[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r18268: {torflow} Change indent from 4->2 spaces. This makes it easier to have (torflow/trunk/NetworkScanners)
Author: mikeperry
Date: 2009-01-25 06:56:57 -0500 (Sun, 25 Jan 2009)
New Revision: 18268
Modified:
torflow/trunk/NetworkScanners/libsoat.py
torflow/trunk/NetworkScanners/soat.py
torflow/trunk/NetworkScanners/soatcli.py
Log:
Change indent from 4->2 spaces. This makes it easier to have
class methods without having no screen space left..
Modified: torflow/trunk/NetworkScanners/libsoat.py
===================================================================
--- torflow/trunk/NetworkScanners/libsoat.py 2009-01-25 11:47:53 UTC (rev 18267)
+++ torflow/trunk/NetworkScanners/libsoat.py 2009-01-25 11:56:57 UTC (rev 18268)
@@ -31,167 +31,167 @@
# classes to use with pickle to dump test results into files
class TestResult(object):
- ''' Parent class for all test result classes '''
- def __init__(self, exit_node, site, status):
- self.exit_node = exit_node
- self.site = site
- self.timestamp = time.time()
- self.status = status
+ ''' Parent class for all test result classes '''
+ def __init__(self, exit_node, site, status):
+ self.exit_node = exit_node
+ self.site = site
+ self.timestamp = time.time()
+ self.status = status
class SSLTestResult(TestResult):
- ''' Represents the result of an openssl test '''
- def __init__(self, exit_node, ssl_site, cert_file, status):
- super(SSLTestResult, self).__init__(exit_node, ssl_site, status)
- self.cert = cert_file
+ ''' Represents the result of an openssl test '''
+ def __init__(self, exit_node, ssl_site, cert_file, status):
+ super(SSLTestResult, self).__init__(exit_node, ssl_site, status)
+ self.cert = cert_file
class HttpTestResult(TestResult):
- ''' Represents the result of a http test '''
- def __init__(self, exit_node, website, tag_prints, status):
- super(HttpTestResult, self).__init__(exit_node, website, status)
- self.tag_prints = tag_prints
+ ''' Represents the result of a http test '''
+ def __init__(self, exit_node, website, tag_prints, status):
+ super(HttpTestResult, self).__init__(exit_node, website, status)
+ self.tag_prints = tag_prints
class SSHTestResult(TestResult):
- ''' Represents the result of an ssh test '''
- def __init__(self, exit_node, ssh_site, status):
- super(SSHTestResult, self).__init__(exit_node, ssh_site, status)
+ ''' Represents the result of an ssh test '''
+ def __init__(self, exit_node, ssh_site, status):
+ super(SSHTestResult, self).__init__(exit_node, ssh_site, status)
class DNSTestResult(TestResult):
- ''' Represents the result of a dns test '''
- def __init__(self, exit_node, dns_site, status):
- super(DNSTestResult, self).__init__(exit_node, dns_site, status)
+ ''' Represents the result of a dns test '''
+ def __init__(self, exit_node, dns_site, status):
+ super(DNSTestResult, self).__init__(exit_node, dns_site, status)
class DNSRebindTestResult(TestResult):
- ''' Represents the result of a dns rebind test '''
- def __init__(self, exit_node, dns_rebind_site, status):
- super(DNSRebindTestResult, self).__init__(exit_node, dns_rebind_site, status)
+ ''' Represents the result of a dns rebind test '''
+ def __init__(self, exit_node, dns_rebind_site, status):
+ super(DNSRebindTestResult, self).__init__(exit_node, dns_rebind_site, status)
class SMTPTestResult(TestResult):
- ''' Represents the result of an smtp test '''
- def __init__(self, exit_node, smtp_site, status):
- super(SMTPTestResult, self).__init__(exit_node, smtp_site, status)
+ ''' Represents the result of an smtp test '''
+ def __init__(self, exit_node, smtp_site, status):
+ super(SMTPTestResult, self).__init__(exit_node, smtp_site, status)
class IMAPTestResult(TestResult):
- ''' Represents the result of an imap test '''
- def __init__(self, exit_node, imap_site, status):
- super(IMAPTestResult, self).__init__(exit_node, imap_site, status)
+ ''' Represents the result of an imap test '''
+ def __init__(self, exit_node, imap_site, status):
+ super(IMAPTestResult, self).__init__(exit_node, imap_site, status)
class POPTestResult(TestResult):
- ''' Represents the result of a pop test '''
- def __init__(self, exit_node, pop_site, status):
- super(POPTestResult, self).__init__(exit_node, pop_site, status)
+ ''' Represents the result of a pop test '''
+ def __init__(self, exit_node, pop_site, status):
+ super(POPTestResult, self).__init__(exit_node, pop_site, status)
class DataHandler:
- ''' Class for saving and managing test result data '''
- def filterResults(self, results, protocols=[], show_good=False,
- show_bad=False, show_inconclusive=False):
- ''' filter results based on protocol and success level '''
+ ''' Class for saving and managing test result data '''
+ def filterResults(self, results, protocols=[], show_good=False,
+ show_bad=False, show_inconclusive=False):
+ ''' filter results based on protocol and success level '''
- protocol_filters = []
- status_filters = []
+ protocol_filters = []
+ status_filters = []
- for protocol in protocols:
- protocol_filters.append(lambda x, p=protocol: x.__class__.__name__.lower()[:-10].endswith(p))
- if show_good:
- status_filters.append(lambda x: x.status == TEST_SUCCESS)
- if show_bad:
- status_filters.append(lambda x: x.status == TEST_FAILURE)
- if show_inconclusive:
- status_filters.append(lambda x: x.status == TEST_INCONCLUSIVE)
+ for protocol in protocols:
+ protocol_filters.append(lambda x, p=protocol: x.__class__.__name__.lower()[:-10].endswith(p))
+ if show_good:
+ status_filters.append(lambda x: x.status == TEST_SUCCESS)
+ if show_bad:
+ status_filters.append(lambda x: x.status == TEST_FAILURE)
+ if show_inconclusive:
+ status_filters.append(lambda x: x.status == TEST_INCONCLUSIVE)
- if len(protocol_filters) == 0 or len(status_filters) == 0:
- return []
-
- protocol_filter = lambda x: reduce(operator.__or__, [f(x) for f in protocol_filters])
- status_filter = lambda x: reduce(operator.__or__, [f(x) for f in status_filters])
+ if len(protocol_filters) == 0 or len(status_filters) == 0:
+ return []
+
+ protocol_filter = lambda x: reduce(operator.__or__, [f(x) for f in protocol_filters])
+ status_filter = lambda x: reduce(operator.__or__, [f(x) for f in status_filters])
- return [x for x in results if (protocol_filter(x) and status_filter(x))]
-
- def filterByNode(self, results, id):
- ''' filter by node'''
- return filter(lambda x: x.exit_node == id, results)
+ return [x for x in results if (protocol_filter(x) and status_filter(x))]
+
+ def filterByNode(self, results, id):
+ ''' filter by node'''
+ return filter(lambda x: x.exit_node == id, results)
- def getAll(self):
- ''' get all available results'''
- return self.__getResults(data_dir)
+ def getAll(self):
+ ''' get all available results'''
+ return self.__getResults(data_dir)
- def getSsh(self):
- ''' get results of ssh tests '''
- return self.__getResults(data_dir + 'ssh/')
-
- def getHttp(self):
- ''' get results of http tests '''
- return self.__getResults(data_dir + 'http/')
+ def getSsh(self):
+ ''' get results of ssh tests '''
+ return self.__getResults(data_dir + 'ssh/')
+
+ def getHttp(self):
+ ''' get results of http tests '''
+ return self.__getResults(data_dir + 'http/')
- def getSsl(self):
- ''' get results of ssl tests '''
- return self.__getResults(data_dir + 'ssl/')
+ def getSsl(self):
+ ''' get results of ssl tests '''
+ return self.__getResults(data_dir + 'ssl/')
- def getSmtp(self):
- ''' get results of smtp tests '''
- return self.__getResults(data_dir + 'smtp/')
+ def getSmtp(self):
+ ''' get results of smtp tests '''
+ return self.__getResults(data_dir + 'smtp/')
- def getPop(self):
- ''' get results of pop tests '''
- return self.__getResults(data_dir + 'pop/')
+ def getPop(self):
+ ''' get results of pop tests '''
+ return self.__getResults(data_dir + 'pop/')
- def getImap(self):
- ''' get results of imap tests '''
- return self.__getResults(data_dir + 'imap/')
+ def getImap(self):
+ ''' get results of imap tests '''
+ return self.__getResults(data_dir + 'imap/')
- def getDns(self):
- ''' get results of basic dns tests '''
- return self.__getResults(data_dir + 'dns')
+ def getDns(self):
+ ''' get results of basic dns tests '''
+ return self.__getResults(data_dir + 'dns')
- def getDnsRebind(self):
- ''' get results of dns rebind tests '''
- return self.__getResults(data_dir + 'dnsbrebind/')
+ def getDnsRebind(self):
+ ''' get results of dns rebind tests '''
+ return self.__getResults(data_dir + 'dnsbrebind/')
- def __getResults(self, dir):
- '''
- recursively traverse the directory tree starting with dir
- gather test results from files ending with .result
- '''
- results = []
+ def __getResults(self, dir):
+ '''
+ recursively traverse the directory tree starting with dir
+ gather test results from files ending with .result
+ '''
+ results = []
- for root, dirs, files in os.walk(dir):
- for file in files:
- if file.endswith('result'):
- fh = open(os.path.join(root, file))
- result = pickle.load(fh)
- results.append(result)
+ for root, dirs, files in os.walk(dir):
+ for file in files:
+ if file.endswith('result'):
+ fh = open(os.path.join(root, file))
+ result = pickle.load(fh)
+ results.append(result)
- return results
+ return results
- def safeFilename(self, str):
- '''
- remove characters illegal in some systems
- and trim the string to a reasonable length
- '''
- replaced = (str.replace('/','_').replace('\\','_').replace('?','_').replace(':','_').
- replace('|','_').replace('*','_').replace('<','_').replace('>','_').replace('"',''))
- return replaced[:200]
+ def safeFilename(self, str):
+ '''
+ remove characters illegal in some systems
+ and trim the string to a reasonable length
+ '''
+ replaced = (str.replace('/','_').replace('\\','_').replace('?','_').replace(':','_').
+ replace('|','_').replace('*','_').replace('<','_').replace('>','_').replace('"',''))
+ return replaced[:200]
- def saveResult(self, result):
- ''' generic method for saving test results '''
- address = ''
- if result.__class__.__name__ == 'HttpTestResult':
- address = self.safeFilename(result.site[7:])
- elif result.__class__.__name__ == 'SSLTestResult':
- address = self.safeFilename(result.site[8:])
- elif 'TestResult' in result.__class__.__name__:
- address = self.safeFilename(result.site)
- else:
- raise Exception, 'This doesn\'t seems to be a result instance.'
+ def saveResult(self, result):
+ ''' generic method for saving test results '''
+ address = ''
+ if result.__class__.__name__ == 'HttpTestResult':
+ address = self.safeFilename(result.site[7:])
+ elif result.__class__.__name__ == 'SSLTestResult':
+ address = self.safeFilename(result.site[8:])
+ elif 'TestResult' in result.__class__.__name__:
+ address = self.safeFilename(result.site)
+ else:
+ raise Exception, 'This doesn\'t seems to be a result instance.'
- dir = data_dir + result.__class__.__name__[:-10].lower() + '/'
- if result.status == TEST_SUCCESS:
- dir += 'successful/'
- if result.status == TEST_INCONCLUSIVE:
- dir += 'inconclusive/'
- if result.status == TEST_FAILURE:
- dir += 'failed/'
-
- result_file = open(dir + result.exit_node[1:] + "-" + address + '.result', 'w')
- pickle.dump(result, result_file)
- result_file.close()
+ dir = data_dir + result.__class__.__name__[:-10].lower() + '/'
+ if result.status == TEST_SUCCESS:
+ dir += 'successful/'
+ if result.status == TEST_INCONCLUSIVE:
+ dir += 'inconclusive/'
+ if result.status == TEST_FAILURE:
+ dir += 'failed/'
+
+ result_file = open(dir + result.exit_node[1:] + "-" + address + '.result', 'w')
+ pickle.dump(result, result_file)
+ result_file.close()
Modified: torflow/trunk/NetworkScanners/soat.py
===================================================================
--- torflow/trunk/NetworkScanners/soat.py 2009-01-25 11:47:53 UTC (rev 18267)
+++ torflow/trunk/NetworkScanners/soat.py 2009-01-25 11:56:57 UTC (rev 18268)
@@ -73,13 +73,13 @@
result_per_type = 5
firefox_headers = {
- 'User-Agent':'Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.8.1) Gecko/20061010 Firefox/2.0',
- 'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
- 'Accept-Language':"en-us,en;q=0.5",
- 'Accept-Encoding':"gzip,deflate",
- 'Accept-Charset': "ISO-8859-1,utf-8;q=0.7,*;q=0.7",
- 'Keep-Alive':"300",
- 'Connection':"keep-alive"
+ 'User-Agent':'Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.8.1) Gecko/20061010 Firefox/2.0',
+ 'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
+ 'Accept-Language':"en-us,en;q=0.5",
+ 'Accept-Encoding':"gzip,deflate",
+ 'Accept-Charset': "ISO-8859-1,utf-8;q=0.7,*;q=0.7",
+ 'Keep-Alive':"300",
+ 'Connection':"keep-alive"
}
# http://www.voidspace.org.uk/python/articles/cookielib.shtml
@@ -91,13 +91,13 @@
#
ports_to_check = [
- ["pop", ExitPolicyRestriction('255.255.255.255', 110), "pops", ExitPolicyRestriction('255.255.255.255', 995)],
- ["imap", ExitPolicyRestriction('255.255.255.255', 143), "imaps", ExitPolicyRestriction('255.255.255.255', 993)],
- ["telnet", ExitPolicyRestriction('255.255.255.255', 23), "ssh", ExitPolicyRestriction('255.255.255.255', 22)],
- ["smtp", ExitPolicyRestriction('255.255.255.255', 25), "smtps", ExitPolicyRestriction('255.255.255.255', 465)],
- ["http", ExitPolicyRestriction('255.255.255.255', 80), "https",
+ ["pop", ExitPolicyRestriction('255.255.255.255', 110), "pops", ExitPolicyRestriction('255.255.255.255', 995)],
+ ["imap", ExitPolicyRestriction('255.255.255.255', 143), "imaps", ExitPolicyRestriction('255.255.255.255', 993)],
+ ["telnet", ExitPolicyRestriction('255.255.255.255', 23), "ssh", ExitPolicyRestriction('255.255.255.255', 22)],
+ ["smtp", ExitPolicyRestriction('255.255.255.255', 25), "smtps", ExitPolicyRestriction('255.255.255.255', 465)],
+ ["http", ExitPolicyRestriction('255.255.255.255', 80), "https",
ExitPolicyRestriction('255.255.255.255', 443)],
- ["plaintext", NodeRestrictionList([
+ ["plaintext", NodeRestrictionList([
ExitPolicyRestriction('255.255.255.255',110),
ExitPolicyRestriction('255.255.255.255',143),
ExitPolicyRestriction('255.255.255.255',23),
@@ -120,13 +120,13 @@
# refer to: www.iana.org/assignments/ipv4-address-space, www.iana.org/assignments/multicast-addresses
#
ipv4_nonpublic = [
- '00000000', # default route and its network: 0.0.0.0/8
- '00001010', # private 10.0.0.0/8
- '01111111', # loopback 127.0.0.0/8
- '1010100111111110', # link-local 169.254.0.0/16
- '101011000001', # private 172.16.0.0/12
- '1100000010101000', # private 192.168.0.0/16
- '111' # multicast & experimental 224.0.0.0/3
+ '00000000', # default route and its network: 0.0.0.0/8
+ '00001010', # private 10.0.0.0/8
+ '01111111', # loopback 127.0.0.0/8
+ '1010100111111110', # link-local 169.254.0.0/16
+ '101011000001', # private 172.16.0.0/12
+ '1100000010101000', # private 192.168.0.0/16
+ '111' # multicast & experimental 224.0.0.0/3
]
# tags and attributes to check in the http test: XXX these should be reviewed
@@ -145,1336 +145,1336 @@
# Http request handling
def http_request(address, cookie_jar=None):
- ''' perform a http GET-request and return the content received '''
- request = urllib2.Request(address)
- for h in firefox_headers.iterkeys():
- request.add_header(h, firefox_headers[h])
+ ''' perform a http GET-request and return the content received '''
+ request = urllib2.Request(address)
+ for h in firefox_headers.iterkeys():
+ request.add_header(h, firefox_headers[h])
- content = ""
- try:
- if cookie_jar != None:
- opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie_jar))
- reply = opener.open(request)
- if "__filename" in cookie_jar.__dict__:
- cookie_jar.save(cookie_jar.__filename)
- else:
- reply = urllib2.urlopen(request)
- content = decompress_response_data(reply)
- except (ValueError, urllib2.URLError):
- plog('WARN', 'The http-request address ' + address + ' is malformed')
- return ""
- except (IndexError, TypeError):
- plog('WARN', 'An error occured while negotiating socks5 with Tor')
- return ""
- except KeyboardInterrupt:
- raise KeyboardInterrupt
- except:
- plog('WARN', 'An unknown HTTP error occured for '+address)
- traceback.print_exc()
- return ""
+ content = ""
+ try:
+ if cookie_jar != None:
+ opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie_jar))
+ reply = opener.open(request)
+ if "__filename" in cookie_jar.__dict__:
+ cookie_jar.save(cookie_jar.__filename)
+ else:
+ reply = urllib2.urlopen(request)
+ content = decompress_response_data(reply)
+ except (ValueError, urllib2.URLError):
+ plog('WARN', 'The http-request address ' + address + ' is malformed')
+ return ""
+ except (IndexError, TypeError):
+ plog('WARN', 'An error occured while negotiating socks5 with Tor')
+ return ""
+ except KeyboardInterrupt:
+ raise KeyboardInterrupt
+ except:
+ plog('WARN', 'An unknown HTTP error occured for '+address)
+ traceback.print_exc()
+ return ""
- return content
+ return content
# a simple interface to handle a socket connection
class Client:
- def __init__(self, host, port):
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.connect((host, port))
- self.buffer = self.sock.makefile('rb')
+ def __init__(self, host, port):
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.connect((host, port))
+ self.buffer = self.sock.makefile('rb')
- def writeline(self, line):
- self.sock.send(line + linebreak)
+ def writeline(self, line):
+ self.sock.send(line + linebreak)
- def readline(self):
- response = self.buffer.readline()
- if not response:
- raise EOFError
- elif response[-2:] == linebreak:
- response = response[:-2]
- elif response[-1:] in linebreak:
- response = response[:-1]
- return response
+ def readline(self):
+ response = self.buffer.readline()
+ if not response:
+ raise EOFError
+ elif response[-2:] == linebreak:
+ response = response[:-2]
+ elif response[-1:] in linebreak:
+ response = response[:-1]
+ return response
class DNSRebindScanner(EventHandler):
- '''
- A tor control event handler extending TorCtl.EventHandler
- Monitors for REMAP events (see check_dns_rebind())
- '''
- def __init__(self, exit_node_scanner):
- EventHandler.__init__(self)
- self.__soat = exit_node_scanner
+ '''
+ A tor control event handler extending TorCtl.EventHandler
+ Monitors for REMAP events (see check_dns_rebind())
+ '''
+ def __init__(self, exit_node_scanner):
+ EventHandler.__init__(self)
+ self.__soat = exit_node_scanner
- def stream_status_event(self, event):
- if event.status == 'REMAP':
- octets = map(lambda x: int2bin(x).zfill(8), event.target_host.split('.'))
- ipbin = ''.join(octets)
- for network in ipv4_nonpublic:
- if ipbin[:len(network)] == network:
- handler = DataHandler()
- node = self.__soat.get_exit_node()
- plog("ERROR", "DNS Rebeind failure via "+node)
- result = DNSRebindTestResult(node, '', TEST_FAILURE)
- handler.saveResult(result)
+ def stream_status_event(self, event):
+ if event.status == 'REMAP':
+ octets = map(lambda x: int2bin(x).zfill(8), event.target_host.split('.'))
+ ipbin = ''.join(octets)
+ for network in ipv4_nonpublic:
+ if ipbin[:len(network)] == network:
+ handler = DataHandler()
+ node = self.__soat.get_exit_node()
+ plog("ERROR", "DNS Rebeind failure via "+node)
+ result = DNSRebindTestResult(node, '', TEST_FAILURE)
+ handler.saveResult(result)
class ExitNodeScanner:
- ''' The scanner class '''
- def __init__(self):
- '''
- Establish a connection to metatroller & control port,
- configure metatroller, load the number of previously tested nodes
- '''
- # establish a metatroller connection
- plog('INFO', 'ExitNodeScanner starting up...')
- try:
- self.__meta = Client(meta_host, meta_port)
- except socket.error:
- plog('ERROR', 'Couldn\'t connect to metatroller. Is it on?')
- exit()
+ ''' The scanner class '''
+ def __init__(self):
+ '''
+ Establish a connection to metatroller & control port,
+ configure metatroller, load the number of previously tested nodes
+ '''
+ # establish a metatroller connection
+ plog('INFO', 'ExitNodeScanner starting up...')
+ try:
+ self.__meta = Client(meta_host, meta_port)
+ except socket.error:
+ plog('ERROR', 'Couldn\'t connect to metatroller. Is it on?')
+ exit()
+
+ # skip two lines of metatroller introduction
+ data = self.__meta.readline()
+ data = self.__meta.readline()
- # skip two lines of metatroller introduction
- data = self.__meta.readline()
- data = self.__meta.readline()
-
- # configure metatroller
- commands = [
- 'PATHLEN 2',
- 'PERCENTFAST 10', # Cheat to win!
- 'USEALLEXITS 1',
- 'UNIFORM 0',
- 'BWCUTOFF 1',
- 'ORDEREXITS 1',
- 'GUARDNODES 0',
- 'RESETSTATS']
- plog('INFO', 'Executing preliminary configuration commands')
- for c in commands:
- self.__meta.writeline(c)
- reply = self.__meta.readline()
- if reply[:3] != '250': # first three chars indicate the reply code
- reply += self.__meta.readline()
- plog('ERROR', 'Error configuring metatroller (' + c + ' failed)')
- plog('ERROR', reply)
- exit()
+ # configure metatroller
+ commands = [
+ 'PATHLEN 2',
+ 'PERCENTFAST 10', # Cheat to win!
+ 'USEALLEXITS 1',
+ 'UNIFORM 0',
+ 'BWCUTOFF 1',
+ 'ORDEREXITS 1',
+ 'GUARDNODES 0',
+ 'RESETSTATS']
+ plog('INFO', 'Executing preliminary configuration commands')
+ for c in commands:
+ self.__meta.writeline(c)
+ reply = self.__meta.readline()
+ if reply[:3] != '250': # first three chars indicate the reply code
+ reply += self.__meta.readline()
+ plog('ERROR', 'Error configuring metatroller (' + c + ' failed)')
+ plog('ERROR', reply)
+ exit()
- # establish a control port connection
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect((control_host, control_port))
- c = Connection(s)
- c.authenticate()
- self.__control = c
- except socket.error, e:
- plog('ERROR', 'Couldn\'t connect to the control port')
- plog('ERROR', e)
- exit()
- except AttributeError, e:
- plog('ERROR', 'A service other that the Tor control port is listening on ' + control_host + ':' + control_port)
- plog('ERROR', e)
- exit()
+ # establish a control port connection
+ try:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((control_host, control_port))
+ c = Connection(s)
+ c.authenticate()
+ self.__control = c
+ except socket.error, e:
+ plog('ERROR', 'Couldn\'t connect to the control port')
+ plog('ERROR', e)
+ exit()
+ except AttributeError, e:
+ plog('ERROR', 'A service other that the Tor control port is listening on ' + control_host + ':' + control_port)
+ plog('ERROR', e)
+ exit()
- # get a data handler
- self.__datahandler = DataHandler()
+ # get a data handler
+ self.__datahandler = DataHandler()
- # TODO get stats about previous runs
- plog('INFO', 'Loading the previous run stats')
+ # TODO get stats about previous runs
+ plog('INFO', 'Loading the previous run stats')
- ssh_results = self.__datahandler.getSsh()
- ssl_results = self.__datahandler.getSsl()
- http_results = self.__datahandler.getHttp()
+ ssh_results = self.__datahandler.getSsh()
+ ssl_results = self.__datahandler.getSsl()
+ http_results = self.__datahandler.getHttp()
- # get lists of tested nodes
- self.ssh_tested = Set([x.exit_node for x in ssh_results])
- self.http_tested = Set([x.exit_node for x in http_results])
- self.ssl_tested = Set([x.exit_node for x in ssl_results])
-
- # get the number of failures
- self.ssh_fail = [self.__datahandler.filterResults(ssh_results, protocols=["ssh"], show_bad=True)]
- self.http_fail = [self.__datahandler.filterResults(http_results, protocols=["http"], show_bad=True)]
- self.ssl_fail = [self.__datahandler.filterResults(ssl_results, protocols=["ssl"], show_bad=True)]
+ # get lists of tested nodes
+ self.ssh_tested = Set([x.exit_node for x in ssh_results])
+ self.http_tested = Set([x.exit_node for x in http_results])
+ self.ssl_tested = Set([x.exit_node for x in ssl_results])
+
+ # get the number of failures
+ self.ssh_fail = [self.__datahandler.filterResults(ssh_results, protocols=["ssh"], show_bad=True)]
+ self.http_fail = [self.__datahandler.filterResults(http_results, protocols=["http"], show_bad=True)]
+ self.ssl_fail = [self.__datahandler.filterResults(ssl_results, protocols=["ssl"], show_bad=True)]
- plog('INFO', 'ExitNodeScanner up and ready')
+ plog('INFO', 'ExitNodeScanner up and ready')
- def get_exit_node(self):
- ''' ask metatroller for the last exit used '''
- self.__meta.writeline("GETLASTEXIT")
- reply = self.__meta.readline()
-
- if reply[:3] != '250':
- reply += self.__meta.readline()
- plog('ERROR', reply)
- return 0
-
- p = re.compile('250 LASTEXIT=[\S]+')
- m = p.match(reply)
- self.__exit = m.group()[13:] # drop the irrelevant characters
- plog('INFO','Current node: ' + self.__exit)
- return self.__exit
+ def get_exit_node(self):
+ ''' ask metatroller for the last exit used '''
+ self.__meta.writeline("GETLASTEXIT")
+ reply = self.__meta.readline()
+
+ if reply[:3] != '250':
+ reply += self.__meta.readline()
+ plog('ERROR', reply)
+ return 0
+
+ p = re.compile('250 LASTEXIT=[\S]+')
+ m = p.match(reply)
+ self.__exit = m.group()[13:] # drop the irrelevant characters
+ plog('INFO','Current node: ' + self.__exit)
+ return self.__exit
- def get_new_circuit(self):
- ''' tell metatroller to close the current circuit and open a new one '''
- plog('DEBUG', 'Trying to construct a new circuit')
- self.__meta.writeline("NEWEXIT")
- reply = self.__meta.readline()
+ def get_new_circuit(self):
+ ''' tell metatroller to close the current circuit and open a new one '''
+ plog('DEBUG', 'Trying to construct a new circuit')
+ self.__meta.writeline("NEWEXIT")
+ reply = self.__meta.readline()
- if reply[:3] != '250':
- plog('ERROR', 'Choosing a new exit failed')
- plog('ERROR', reply)
+ if reply[:3] != '250':
+ plog('ERROR', 'Choosing a new exit failed')
+ plog('ERROR', reply)
- def set_new_exit(self, exit):
- '''
- tell metatroller to set the given node as the exit in the next circuit
- '''
- plog('DEBUG', 'Trying to set ' + `exit` + ' as the exit for the next circuit')
- self.__meta.writeline("SETEXIT $"+exit)
- reply = self.__meta.readline()
+ def set_new_exit(self, exit):
+ '''
+ tell metatroller to set the given node as the exit in the next circuit
+ '''
+ plog('DEBUG', 'Trying to set ' + `exit` + ' as the exit for the next circuit')
+ self.__meta.writeline("SETEXIT $"+exit)
+ reply = self.__meta.readline()
- if reply[:3] != '250':
- plog('ERROR', 'Setting ' + exit + ' as the new exit failed')
- plog('ERROR', reply)
+ if reply[:3] != '250':
+ plog('ERROR', 'Setting ' + exit + ' as the new exit failed')
+ plog('ERROR', reply)
- def report_bad_exit(self, exit):
- '''
- report an evil exit to the control port using AuthDirBadExit
- Note: currently not used
- '''
- # self.__contol.set_option('AuthDirBadExit', exit) ?
- pass
+ def report_bad_exit(self, exit):
+ '''
+ report an evil exit to the control port using AuthDirBadExit
+ Note: currently not used
+ '''
+ # self.__contol.set_option('AuthDirBadExit', exit) ?
+ pass
- def get_nodes_for_port(self, port):
- ''' ask control port for a list of nodes that allow exiting to a given port '''
- routers = self.__control.read_routers(self.__control.get_network_status())
- restriction = NodeRestrictionList([FlagsRestriction(["Running", "Valid"]), ExitPolicyRestriction('255.255.255.255', port)])
- return [x for x in routers if restriction.r_is_ok(x)]
+ def get_nodes_for_port(self, port):
+ ''' ask control port for a list of nodes that allow exiting to a given port '''
+ routers = self.__control.read_routers(self.__control.get_network_status())
+ restriction = NodeRestrictionList([FlagsRestriction(["Running", "Valid"]), ExitPolicyRestriction('255.255.255.255', port)])
+ return [x for x in routers if restriction.r_is_ok(x)]
- def check_all_exits_port_consistency(self):
- '''
- an independent test that finds nodes that allow connections over a common protocol
- while disallowing connections over its secure version (for instance http/https)
- '''
+ def check_all_exits_port_consistency(self):
+ '''
+ an independent test that finds nodes that allow connections over a common protocol
+ while disallowing connections over its secure version (for instance http/https)
+ '''
- # get the structure
- routers = self.__control.read_routers(self.__control.get_network_status())
- bad_exits = Set([])
- specific_bad_exits = [None]*len(ports_to_check)
- for i in range(len(ports_to_check)):
- specific_bad_exits[i] = []
+ # get the structure
+ routers = self.__control.read_routers(self.__control.get_network_status())
+ bad_exits = Set([])
+ specific_bad_exits = [None]*len(ports_to_check)
+ for i in range(len(ports_to_check)):
+ specific_bad_exits[i] = []
- # check exit policies
- for router in routers:
- for i in range(len(ports_to_check)):
- [common_protocol, common_restriction, secure_protocol, secure_restriction] = ports_to_check[i]
- if common_restriction.r_is_ok(router) and not secure_restriction.r_is_ok(router):
- bad_exits.add(router)
- specific_bad_exits[i].append(router)
- plog('INFO', 'Router ' + router.nickname + ' allows ' + common_protocol + ' but not ' + secure_protocol)
-
- # report results
- plog('INFO', 'Total exits: ' + `len(routers)`)
- for i in range(len(ports_to_check)):
- [common_protocol, _, secure_protocol, _] = ports_to_check[i]
- plog('INFO', 'Exits with ' + common_protocol + ' / ' + secure_protocol + ' problem: ' + `len(specific_bad_exits[i])` + ' (~' + `(len(specific_bad_exits[i]) * 100 / len(routers))` + '%)')
- plog('INFO', 'Total bad exits: ' + `len(bad_exits)` + ' (~' + `(len(bad_exits) * 100 / len(routers))` + '%)')
+ # check exit policies
+ for router in routers:
+ for i in range(len(ports_to_check)):
+ [common_protocol, common_restriction, secure_protocol, secure_restriction] = ports_to_check[i]
+ if common_restriction.r_is_ok(router) and not secure_restriction.r_is_ok(router):
+ bad_exits.add(router)
+ specific_bad_exits[i].append(router)
+ plog('INFO', 'Router ' + router.nickname + ' allows ' + common_protocol + ' but not ' + secure_protocol)
+
+ # report results
+ plog('INFO', 'Total exits: ' + `len(routers)`)
+ for i in range(len(ports_to_check)):
+ [common_protocol, _, secure_protocol, _] = ports_to_check[i]
+ plog('INFO', 'Exits with ' + common_protocol + ' / ' + secure_protocol + ' problem: ' + `len(specific_bad_exits[i])` + ' (~' + `(len(specific_bad_exits[i]) * 100 / len(routers))` + '%)')
+ plog('INFO', 'Total bad exits: ' + `len(bad_exits)` + ' (~' + `(len(bad_exits) * 100 / len(routers))` + '%)')
- def check_http(self, address):
- ''' check whether a http connection to a given address is molested '''
- plog('INFO', 'Conducting an http test with destination ' + address)
+ def check_http(self, address):
+ ''' check whether a http connection to a given address is molested '''
+ plog('INFO', 'Conducting an http test with destination ' + address)
- defaultsocket = socket.socket
- socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, tor_host, tor_port)
- socket.socket = socks.socksocket
+ defaultsocket = socket.socket
+ socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, tor_host, tor_port)
+ socket.socket = socks.socksocket
- pcontent = http_request(address)
+ pcontent = http_request(address)
- # reset the connection to direct
- socket.socket = defaultsocket
+ # reset the connection to direct
+ socket.socket = defaultsocket
- exit_node = self.get_exit_node()
- if exit_node == 0 or exit_node == '0' or not exit_node:
- plog('INFO', 'We had no exit node to test, skipping to the next test.')
- return TEST_SUCCESS
+ exit_node = self.get_exit_node()
+ if exit_node == 0 or exit_node == '0' or not exit_node:
+ plog('INFO', 'We had no exit node to test, skipping to the next test.')
+ return TEST_SUCCESS
- # an address representation acceptable for a filename
- address_file = self.__datahandler.safeFilename(address[7:])
+ # an address representation acceptable for a filename
+ address_file = self.__datahandler.safeFilename(address[7:])
- # if we have no content, we had a connection error
- if pcontent == "":
- result = HttpTestResult(exit_node, address, 0, TEST_INCONCLUSIVE)
- self.__datahandler.saveResult(result)
- return TEST_INCONCLUSIVE
+ # if we have no content, we had a connection error
+ if pcontent == "":
+ result = HttpTestResult(exit_node, address, 0, TEST_INCONCLUSIVE)
+ self.__datahandler.saveResult(result)
+ return TEST_INCONCLUSIVE
- elements = SoupStrainer(lambda name, attrs : name in tags_to_check or
- len(Set(attrs).intersection(Set(attrs_to_check))) > 0)
- pcontent = pcontent.decode('ascii', 'ignore')
- psoup = BeautifulSoup(pcontent, parseOnlyThese=elements)
+ elements = SoupStrainer(lambda name, attrs : name in tags_to_check or
+ len(Set(attrs).intersection(Set(attrs_to_check))) > 0)
+ pcontent = pcontent.decode('ascii', 'ignore')
+ psoup = BeautifulSoup(pcontent, parseOnlyThese=elements)
- # load the original tag structure
- # if we don't have any yet, get it
- soup = 0
- try:
- tag_file = open(http_tags_dir + address_file + '.tags', 'r')
- soup = BeautifulSoup(tag_file.read())
- tag_file.close()
- except IOError:
- content = http_request(address)
- content = content.decode('ascii','ignore')
- soup = BeautifulSoup(content, parseOnlyThese=elements)
- tag_file = open(http_tags_dir + address_file + '.tags', 'w')
- tag_file.write(soup.__str__() + ' ') # the space is needed in case we have some page with no matching tags at all
- tag_file.close()
- except TypeError, e:
- plog('ERROR', 'Failed parsing the tag tree for ' + address)
- plog('ERROR', e)
- return TEST_INCONCLUSIVE
- if soup == 0:
- plog('ERROR', 'Failed to get the correct tag structure for ' + address)
- return TEST_INCONCLUSIVE
+ # load the original tag structure
+ # if we don't have any yet, get it
+ soup = 0
+ try:
+ tag_file = open(http_tags_dir + address_file + '.tags', 'r')
+ soup = BeautifulSoup(tag_file.read())
+ tag_file.close()
+ except IOError:
+ content = http_request(address)
+ content = content.decode('ascii','ignore')
+ soup = BeautifulSoup(content, parseOnlyThese=elements)
+ tag_file = open(http_tags_dir + address_file + '.tags', 'w')
+ tag_file.write(soup.__str__() + ' ') # the space is needed in case we have some page with no matching tags at all
+ tag_file.close()
+ except TypeError, e:
+ plog('ERROR', 'Failed parsing the tag tree for ' + address)
+ plog('ERROR', e)
+ return TEST_INCONCLUSIVE
+ if soup == 0:
+ plog('ERROR', 'Failed to get the correct tag structure for ' + address)
+ return TEST_INCONCLUSIVE
- self.http_tested.add(exit_node)
+ self.http_tested.add(exit_node)
- # compare the content
- # if content matches, everything is ok
- if psoup == soup:
- result = HttpTestResult(exit_node, address, 0, TEST_SUCCESS)
- self.__datahandler.saveResult(result)
- return TEST_SUCCESS
+ # compare the content
+ # if content matches, everything is ok
+ if psoup == soup:
+ result = HttpTestResult(exit_node, address, 0, TEST_SUCCESS)
+ self.__datahandler.saveResult(result)
+ return TEST_SUCCESS
- # if content doesnt match, update the direct content
- content_new = http_request(address)
- content_new = content_new.decode('ascii', 'ignore')
- if not content_new:
- result = HttpTestResult(exit_node, address, 0, TEST_INCONCLUSIVE)
- self.__datahandler.saveResult(result)
- return TEST_INCONCLUSIVE
+ # if content doesnt match, update the direct content
+ content_new = http_request(address)
+ content_new = content_new.decode('ascii', 'ignore')
+ if not content_new:
+ result = HttpTestResult(exit_node, address, 0, TEST_INCONCLUSIVE)
+ self.__datahandler.saveResult(result)
+ return TEST_INCONCLUSIVE
- soup_new = BeautifulSoup(content_new, parseOnlyThese=elements)
- # compare the new and old content
- # if they match, means the node has been changing the content
- if soup == soup_new:
- result = HttpTestResult(exit_node, address, 0, TEST_FAILURE)
- self.__datahandler.saveResult(result)
- tag_file = open(http_tags_dir + exit_node[1:] + '_' + address_file + '.tags', 'w')
- tag_file.write(psoup.__str__())
- tag_file.close()
- plog("ERROR", "HTTP Failure at "+exit_node)
- return TEST_FAILURE
+ soup_new = BeautifulSoup(content_new, parseOnlyThese=elements)
+ # compare the new and old content
+ # if they match, means the node has been changing the content
+ if soup == soup_new:
+ result = HttpTestResult(exit_node, address, 0, TEST_FAILURE)
+ self.__datahandler.saveResult(result)
+ tag_file = open(http_tags_dir + exit_node[1:] + '_' + address_file + '.tags', 'w')
+ tag_file.write(psoup.__str__())
+ tag_file.close()
+ plog("ERROR", "HTTP Failure at "+exit_node)
+ return TEST_FAILURE
- # if content has changed outside of tor, update the saved file
- tag_file = open(http_tags_dir + address_file + '.tags', 'w')
- tag_file.write(soup_new.__str__())
- tag_file.close()
+ # if content has changed outside of tor, update the saved file
+ tag_file = open(http_tags_dir + address_file + '.tags', 'w')
+ tag_file.write(soup_new.__str__())
+ tag_file.close()
- # compare the node content and the new content
- # if it matches, everything is ok
- if psoup == soup_new:
- result = HttpTestResult(exit_node, address, 0, TEST_SUCCESS)
- self.__datahandler.saveResult(result)
- return TEST_SUCCESS
+ # compare the node content and the new content
+ # if it matches, everything is ok
+ if psoup == soup_new:
+ result = HttpTestResult(exit_node, address, 0, TEST_SUCCESS)
+ self.__datahandler.saveResult(result)
+ return TEST_SUCCESS
- # if it doesn't match, means the node has been changing the content
- result = HttpTestResult(exit_node, address, 0, TEST_FAILURE)
- self.__datahandler.saveResult(result)
- tag_file = open(http_tags_dir + exit_node[1:] + '_' + address_file + '.tags', 'w')
- tag_file.write(psoup.__str__())
- tag_file.close()
-
- plog("ERROR", "HTTP Failure at "+exit_node)
- return TEST_FAILURE
+ # if it doesn't match, means the node has been changing the content
+ result = HttpTestResult(exit_node, address, 0, TEST_FAILURE)
+ self.__datahandler.saveResult(result)
+ tag_file = open(http_tags_dir + exit_node[1:] + '_' + address_file + '.tags', 'w')
+ tag_file.write(psoup.__str__())
+ tag_file.close()
+
+ plog("ERROR", "HTTP Failure at "+exit_node)
+ return TEST_FAILURE
- def check_openssh(self, address):
- ''' check whether an openssh connection to a given address is molested '''
- # TODO
- #ssh = pyssh.Ssh('username', 'host', 22)
- #ssh.set_sshpath(pyssh.SSH_PATH)
- #response = self.ssh.sendcmd('ls')
- #print response
+ def check_openssh(self, address):
+ ''' check whether an openssh connection to a given address is molested '''
+ # TODO
+ #ssh = pyssh.Ssh('username', 'host', 22)
+ #ssh.set_sshpath(pyssh.SSH_PATH)
+ #response = self.ssh.sendcmd('ls')
+ #print response
- return 0
+ return 0
- def check_openssl(self, address):
- ''' check whether an https connection to a given address is molested '''
- plog('INFO', 'Conducting an ssl test with destination ' + address)
+ def check_openssl(self, address):
+ ''' check whether an https connection to a given address is molested '''
+ plog('INFO', 'Conducting an ssl test with destination ' + address)
- # an address representation acceptable for a filename
- address_file = self.__datahandler.safeFilename(address[8:])
+ # an address representation acceptable for a filename
+ address_file = self.__datahandler.safeFilename(address[8:])
- # get the cert via tor
+ # get the cert via tor
- defaultsocket = socket.socket
- socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, tor_host, tor_port)
- socket.socket = socks.socksocket
+ defaultsocket = socket.socket
+ socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, tor_host, tor_port)
+ socket.socket = socks.socksocket
- cert = self.ssl_request(address)
+ cert = self.ssl_request(address)
- # reset the connection method back to direct
- socket.socket = defaultsocket
+ # reset the connection method back to direct
+ socket.socket = defaultsocket
- exit_node = self.get_exit_node()
- if exit_node == 0 or exit_node == '0' or not exit_node:
- plog('WARN', 'We had no exit node to test, skipping to the next test.')
- return TEST_FAILURE
+ exit_node = self.get_exit_node()
+ if exit_node == 0 or exit_node == '0' or not exit_node:
+ plog('WARN', 'We had no exit node to test, skipping to the next test.')
+ return TEST_FAILURE
- # if we got no cert, there was an ssl error
- if cert == 0:
- result = SSLTestResult(exit_node, address, 0, TEST_INCONCLUSIVE)
- self.__datahandler.saveResult(result)
- return TEST_INCONCLUSIVE
+ # if we got no cert, there was an ssl error
+ if cert == 0:
+ result = SSLTestResult(exit_node, address, 0, TEST_INCONCLUSIVE)
+ self.__datahandler.saveResult(result)
+ return TEST_INCONCLUSIVE
- # load the original cert and compare
- # if we don't have the original cert yet, get it
- original_cert = 0
- try:
- cert_file = open(ssl_certs_dir + address_file + '.pem', 'r')
- cert_string = cert_file.read()
- original_cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_string)
- except IOError:
- plog('INFO', 'Opening a direct ssl connection to ' + address)
- original_cert = self.ssl_request(address)
- if not original_cert:
- plog('WARN', 'Error getting the correct cert for ' + address)
- return TEST_INCONCLUSIVE
- if original_cert.has_expired():
- plog('WARN', 'The ssl cert for ' + address + 'seems to have expired. Skipping to the next test...')
- return TEST_INCONCLUSIVE
- cert_file = open(ssl_certs_dir + address_file + '.pem', 'w')
- cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, original_cert))
- cert_file.close()
- except OpenSSL.crypto.Error:
- plog('NOTICE', 'There are non-related files in ' + ssl_certs_dir + '. You should probably clean it.')
- return TEST_INCONCLUSIVE
- if not original_cert:
- plog('WARN', 'Error getting the correct cert for ' + address)
- return TEST_INCONCLUSIVE
+ # load the original cert and compare
+ # if we don't have the original cert yet, get it
+ original_cert = 0
+ try:
+ cert_file = open(ssl_certs_dir + address_file + '.pem', 'r')
+ cert_string = cert_file.read()
+ original_cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_string)
+ except IOError:
+ plog('INFO', 'Opening a direct ssl connection to ' + address)
+ original_cert = self.ssl_request(address)
+ if not original_cert:
+ plog('WARN', 'Error getting the correct cert for ' + address)
+ return TEST_INCONCLUSIVE
+ if original_cert.has_expired():
+ plog('WARN', 'The ssl cert for ' + address + 'seems to have expired. Skipping to the next test...')
+ return TEST_INCONCLUSIVE
+ cert_file = open(ssl_certs_dir + address_file + '.pem', 'w')
+ cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, original_cert))
+ cert_file.close()
+ except OpenSSL.crypto.Error:
+ plog('NOTICE', 'There are non-related files in ' + ssl_certs_dir + '. You should probably clean it.')
+ return TEST_INCONCLUSIVE
+ if not original_cert:
+ plog('WARN', 'Error getting the correct cert for ' + address)
+ return TEST_INCONCLUSIVE
- # get an easily comparable representation of the certs
- cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
- original_cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, original_cert)
+ # get an easily comparable representation of the certs
+ cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
+ original_cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, original_cert)
- # in any case we can consider the node looked at
- self.ssl_tested.add(exit_node)
+ # in any case we can consider the node looked at
+ self.ssl_tested.add(exit_node)
- # if certs match, everything is ok
- if cert_pem == original_cert_pem:
- cert_file = ssl_certs_dir + address_file + '.pem'
- result = SSLTestResult(exit_node, address, cert_file, TEST_SUCCESS)
- self.__datahandler.saveResult(result)
- return TEST_SUCCESS
-
- # if certs dont match, open up a direct connection and update the cert
- plog('INFO', 'Opening a direct ssl connection to ' + address)
- original_cert_new = self.ssl_request(address)
- if original_cert_new == 0:
- plog('WARN', 'Error getting the correct cert for ' + address)
- result = SSLTestResult(exit_node, address, 0, TEST_INCONCLUSIVE)
- self.__datahandler.saveResult(result)
- return TEST_INCONCLUSIVE
+ # if certs match, everything is ok
+ if cert_pem == original_cert_pem:
+ cert_file = ssl_certs_dir + address_file + '.pem'
+ result = SSLTestResult(exit_node, address, cert_file, TEST_SUCCESS)
+ self.__datahandler.saveResult(result)
+ return TEST_SUCCESS
+
+ # if certs dont match, open up a direct connection and update the cert
+ plog('INFO', 'Opening a direct ssl connection to ' + address)
+ original_cert_new = self.ssl_request(address)
+ if original_cert_new == 0:
+ plog('WARN', 'Error getting the correct cert for ' + address)
+ result = SSLTestResult(exit_node, address, 0, TEST_INCONCLUSIVE)
+ self.__datahandler.saveResult(result)
+ return TEST_INCONCLUSIVE
- original_cert_new_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, original_cert_new)
+ original_cert_new_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, original_cert_new)
- # compare the old and new cert
- # if certs match, means the exit node has been messing with the cert
- if original_cert_pem == original_cert_new_pem:
- plog('ERROR', 'Exit node ' + exit_node + ' seems to be meddling with certificates. (' + address + ')')
+ # compare the old and new cert
+ # if certs match, means the exit node has been messing with the cert
+ if original_cert_pem == original_cert_new_pem:
+ plog('ERROR', 'Exit node ' + exit_node + ' seems to be meddling with certificates. (' + address + ')')
- cert_file_name = ssl_certs_dir + address_file + '_' + exit_node[1:] + '.pem'
- cert_file = open(cert_file_name, 'w')
- cert_file.write(cert_pem)
- cert_file.close()
+ cert_file_name = ssl_certs_dir + address_file + '_' + exit_node[1:] + '.pem'
+ cert_file = open(cert_file_name, 'w')
+ cert_file.write(cert_pem)
+ cert_file.close()
- result = SSLTestResult(exit_node, address, cert_file_name, TEST_FAILURE)
- self.__datahandler.saveResult(result)
- return TEST_FAILURE
+ result = SSLTestResult(exit_node, address, cert_file_name, TEST_FAILURE)
+ self.__datahandler.saveResult(result)
+ return TEST_FAILURE
- # if comparsion fails, replace the old cert with the new one
- # XXX: Hrmm, probably should store as a seperate IP file in this case
- # so we don't keep alternating on sites that have round robin
- # DNS and different certs for each IP..
- cert_file = open(ssl_certs_dir + address_file + '.pem', 'w')
- cert_file.write(original_cert_new_pem)
- cert_file.close()
-
- # compare the new cert and the node cert
- # if certs match, everything is ok
- if cert_pem == original_cert_new_pem:
- cert_file = ssl_certs_dir + address_file + '.pem'
- result = SSLTestResult(exit_node, address, cert_file, TEST_SUCCESS)
- self.__datahandler.saveResult(result)
- return TEST_SUCCESS
+ # if comparsion fails, replace the old cert with the new one
+ # XXX: Hrmm, probably should store as a seperate IP file in this case
+ # so we don't keep alternating on sites that have round robin
+ # DNS and different certs for each IP..
+ cert_file = open(ssl_certs_dir + address_file + '.pem', 'w')
+ cert_file.write(original_cert_new_pem)
+ cert_file.close()
+
+ # compare the new cert and the node cert
+ # if certs match, everything is ok
+ if cert_pem == original_cert_new_pem:
+ cert_file = ssl_certs_dir + address_file + '.pem'
+ result = SSLTestResult(exit_node, address, cert_file, TEST_SUCCESS)
+ self.__datahandler.saveResult(result)
+ return TEST_SUCCESS
- # if certs dont match, means the exit node has been messing with the cert
- plog('ERROR', 'Exit node ' + exit_node + ' seems to be meddling with certificates. (' + address + ')')
+ # if certs dont match, means the exit node has been messing with the cert
+ plog('ERROR', 'Exit node ' + exit_node + ' seems to be meddling with certificates. (' + address + ')')
- cert_file_name = ssl_certs_dir + address + '_' + exit_node[1:] + '.pem'
- cert_file = open(cert_file_name, 'w')
- cert_file.write(cert_pem)
- cert_file.close()
+ cert_file_name = ssl_certs_dir + address + '_' + exit_node[1:] + '.pem'
+ cert_file = open(cert_file_name, 'w')
+ cert_file.write(cert_pem)
+ cert_file.close()
- result = SSLTestResult(exit_node, address, cert_file_name, TEST_FAILURE)
- self.__datahandler.saveResult(result)
+ result = SSLTestResult(exit_node, address, cert_file_name, TEST_FAILURE)
+ self.__datahandler.saveResult(result)
- return TEST_FAILURE
+ return TEST_FAILURE
- def check_smtp(self, address, port=''):
- '''
- check whether smtp + tls connection to a given address is molested
- this is done by going through the STARTTLS sequence and comparing server
- responses for the direct and tor connections
- '''
+ def check_smtp(self, address, port=''):
+ '''
+ check whether smtp + tls connection to a given address is molested
+ this is done by going through the STARTTLS sequence and comparing server
+ responses for the direct and tor connections
+ '''
- plog('INFO', 'Conducting an smtp test with destination ' + address)
+ plog('INFO', 'Conducting an smtp test with destination ' + address)
- defaultsocket = socket.socket
- socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, tor_host, tor_port)
- socket.socket = socks.socksocket
+ defaultsocket = socket.socket
+ socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, tor_host, tor_port)
+ socket.socket = socks.socksocket
- ehlo1_reply = 0
- has_starttls = 0
- ehlo2_reply = 0
+ ehlo1_reply = 0
+ has_starttls = 0
+ ehlo2_reply = 0
- try:
- s = smtplib.SMTP(address, port)
- ehlo1_reply = s.ehlo()[0]
- if ehlo1_reply != 250:
- raise smtplib.SMTPException('First ehlo failed')
- has_starttls = s.has_extn('starttls')
- if not has_starttls:
- raise smtplib.SMTPException('It seems the server doesn\'t support starttls')
- s.starttls()
- # TODO check certs?
- ehlo2_reply = s.ehlo()[0]
- if ehlo2_reply != 250:
- raise smtplib.SMTPException('Second ehlo failed')
- except socket.gaierror, e:
- plog('WARN', 'A connection error occured while testing smtp at ' + address)
- plog('WARN', e)
- socket.socket = defaultsocket
- return TEST_INCONCLUSIVE
- except smtplib.SMTPException, e:
- plog('WARN','An error occured while testing smtp at ' + address)
- plog('WARN', e)
- return TEST_INCONCLUSIVE
- # reset the connection method back to direct
- socket.socket = defaultsocket
+ try:
+ s = smtplib.SMTP(address, port)
+ ehlo1_reply = s.ehlo()[0]
+ if ehlo1_reply != 250:
+ raise smtplib.SMTPException('First ehlo failed')
+ has_starttls = s.has_extn('starttls')
+ if not has_starttls:
+ raise smtplib.SMTPException('It seems the server doesn\'t support starttls')
+ s.starttls()
+ # TODO check certs?
+ ehlo2_reply = s.ehlo()[0]
+ if ehlo2_reply != 250:
+ raise smtplib.SMTPException('Second ehlo failed')
+ except socket.gaierror, e:
+ plog('WARN', 'A connection error occured while testing smtp at ' + address)
+ plog('WARN', e)
+ socket.socket = defaultsocket
+ return TEST_INCONCLUSIVE
+ except smtplib.SMTPException, e:
+ plog('WARN','An error occured while testing smtp at ' + address)
+ plog('WARN', e)
+ return TEST_INCONCLUSIVE
+ # reset the connection method back to direct
+ socket.socket = defaultsocket
- # check whether the test was valid at all
- exit_node = self.get_exit_node()
- if exit_node == 0 or exit_node == '0':
- plog('INFO', 'We had no exit node to test, skipping to the next test.')
- return TEST_SUCCESS
+ # check whether the test was valid at all
+ exit_node = self.get_exit_node()
+ if exit_node == 0 or exit_node == '0':
+ plog('INFO', 'We had no exit node to test, skipping to the next test.')
+ return TEST_SUCCESS
- # now directly
+ # now directly
- ehlo1_reply_d = 0
- has_starttls_d = 0
- ehlo2_reply_d = 0
+ ehlo1_reply_d = 0
+ has_starttls_d = 0
+ ehlo2_reply_d = 0
- try:
- s = smtplib.SMTP(address, port)
- ehlo1_reply_d = s.ehlo()[0]
- if ehlo1_reply != 250:
- raise smtplib.SMTPException('First ehlo failed')
- has_starttls_d = s.has_extn('starttls')
- if not has_starttls_d:
- raise smtplib.SMTPException('It seems that the server doesn\'t support starttls')
- s.starttls()
- ehlo2_reply_d = s.ehlo()[0]
- if ehlo2_reply_d != 250:
- raise smtplib.SMTPException('Second ehlo failed')
- except socket.gaierror, e:
- plog('WARN', 'A connection error occured while testing smtp at ' + address)
- plog('WARN', e)
- socket.socket = defaultsocket
- return TEST_INCONCLUSIVE
- except smtplib.SMTPException, e:
- plog('WARN', 'An error occurred while testing smtp at ' + address)
- plog('WARN', e)
- return TEST_INCONCLUSIVE
+ try:
+ s = smtplib.SMTP(address, port)
+ ehlo1_reply_d = s.ehlo()[0]
+ if ehlo1_reply != 250:
+ raise smtplib.SMTPException('First ehlo failed')
+ has_starttls_d = s.has_extn('starttls')
+ if not has_starttls_d:
+ raise smtplib.SMTPException('It seems that the server doesn\'t support starttls')
+ s.starttls()
+ ehlo2_reply_d = s.ehlo()[0]
+ if ehlo2_reply_d != 250:
+ raise smtplib.SMTPException('Second ehlo failed')
+ except socket.gaierror, e:
+ plog('WARN', 'A connection error occured while testing smtp at ' + address)
+ plog('WARN', e)
+ socket.socket = defaultsocket
+ return TEST_INCONCLUSIVE
+ except smtplib.SMTPException, e:
+ plog('WARN', 'An error occurred while testing smtp at ' + address)
+ plog('WARN', e)
+ return TEST_INCONCLUSIVE
- print ehlo1_reply, ehlo1_reply_d, has_starttls, has_starttls_d, ehlo2_reply, ehlo2_reply_d
+ print ehlo1_reply, ehlo1_reply_d, has_starttls, has_starttls_d, ehlo2_reply, ehlo2_reply_d
- # compare
- if ehlo1_reply != ehlo1_reply_d or has_starttls != has_starttls_d or ehlo2_reply != ehlo2_reply_d:
- result = SMTPTestResult(exit_node, address, TEST_FAILURE)
- self.__datahandler.saveResult(result)
- # XXX: Log?
- return TEST_FAILURE
+ # compare
+ if ehlo1_reply != ehlo1_reply_d or has_starttls != has_starttls_d or ehlo2_reply != ehlo2_reply_d:
+ result = SMTPTestResult(exit_node, address, TEST_FAILURE)
+ self.__datahandler.saveResult(result)
+ # XXX: Log?
+ return TEST_FAILURE
- result = SMTPTestResult(exit_node, address, TEST_SUCCESS)
- self.__datahandler.saveResult(result)
- return TEST_SUCCESS
+ result = SMTPTestResult(exit_node, address, TEST_SUCCESS)
+ self.__datahandler.saveResult(result)
+ return TEST_SUCCESS
- def check_pop(self, address, port=''):
- '''
- check whether a pop + tls connection to a given address is molested
- it is implied that the server reads/sends messages compliant with RFC1939 & RFC2449
- '''
+ def check_pop(self, address, port=''):
+ '''
+ check whether a pop + tls connection to a given address is molested
+ it is implied that the server reads/sends messages compliant with RFC1939 & RFC2449
+ '''
- plog('INFO', 'Conducting a pop test with destination ' + address)
+ plog('INFO', 'Conducting a pop test with destination ' + address)
- if not port:
- port = 110
+ if not port:
+ port = 110
- defaultsocket = socket.socket
- socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, tor_host, tor_port)
- socket.socket = socks.socksocket
+ defaultsocket = socket.socket
+ socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, tor_host, tor_port)
+ socket.socket = socks.socksocket
- capabilities_ok = False
- starttls_present = False
- tls_started = None
- tls_succeeded = None
+ capabilities_ok = False
+ starttls_present = False
+ tls_started = None
+ tls_succeeded = None
- try:
- pop = Client(address, port)
-
- # read the server greeting
- server_greeting = pop.readline()
+ try:
+ pop = Client(address, port)
+
+ # read the server greeting
+ server_greeting = pop.readline()
- # get the server capabilities
- pop.writeline('CAPA')
- capabilities = ''
- while 1:
- curr = pop.readline()
- if '+OK' in curr:
- capabilities_ok = True
- elif curr == '.':
- break
- elif 'STLS' in curr:
- starttls_present = True
-
- if not capabilities_ok:
- return TEST_INCONCLUSIVE
+ # get the server capabilities
+ pop.writeline('CAPA')
+ capabilities = ''
+ while 1:
+ curr = pop.readline()
+ if '+OK' in curr:
+ capabilities_ok = True
+ elif curr == '.':
+ break
+ elif 'STLS' in curr:
+ starttls_present = True
+
+ if not capabilities_ok:
+ return TEST_INCONCLUSIVE
- # try to start tls negotiation
- if starttls_present:
- pop.writeline('STLS')
+ # try to start tls negotiation
+ if starttls_present:
+ pop.writeline('STLS')
- starttls_response = pop.readline()
- starttls_started = '+OK' in starttls_response
+ starttls_response = pop.readline()
+ starttls_started = '+OK' in starttls_response
- # negotiate TLS and issue some request to feel good about it
- # TODO check certs?
- ctx = SSL.Context(SSL.SSLv23_METHOD)
- c = SSL.Connection(ctx, pop.sock)
- c.set_connect_state()
- c.do_handshake()
- c.send('CAPA' + linebreak)
-
- while tls_succeeded == None:
- line = ''
- char = None
- while char != '\n':
- char = c.read(1)
- if not char:
- break
- elif char == '.':
- tls_succeeded = False
- line += char
+ # negotiate TLS and issue some request to feel good about it
+ # TODO check certs?
+ ctx = SSL.Context(SSL.SSLv23_METHOD)
+ c = SSL.Connection(ctx, pop.sock)
+ c.set_connect_state()
+ c.do_handshake()
+ c.send('CAPA' + linebreak)
+
+ while tls_succeeded == None:
+ line = ''
+ char = None
+ while char != '\n':
+ char = c.read(1)
+ if not char:
+ break
+ elif char == '.':
+ tls_succeeded = False
+ line += char
- if '-ERR' in line:
- tls_succeeded = False
- elif '+OK' in line:
- tls_succeeded = True
- elif not line:
- tls_succeeded = False
+ if '-ERR' in line:
+ tls_succeeded = False
+ elif '+OK' in line:
+ tls_succeeded = True
+ elif not line:
+ tls_succeeded = False
- except socket.error, e:
- plog('WARN', 'Connection to ' + address + ':' + port + ' refused')
- plog('WARN', e)
- socket.socket = defaultsocket
- return TEST_INCONCLUSIVE
- except OpenSSL.SSL.SysCallError, e:
- plog('WARN', 'Error while negotiating an SSL connection to ' + address + ':' + port)
- plog('WARN', e)
- socket.socket = defaultsocket
- return TEST_INCONCLUSIVE
+ except socket.error, e:
+ plog('WARN', 'Connection to ' + address + ':' + port + ' refused')
+ plog('WARN', e)
+ socket.socket = defaultsocket
+ return TEST_INCONCLUSIVE
+ except OpenSSL.SSL.SysCallError, e:
+ plog('WARN', 'Error while negotiating an SSL connection to ' + address + ':' + port)
+ plog('WARN', e)
+ socket.socket = defaultsocket
+ return TEST_INCONCLUSIVE
- # reset the connection to default
- socket.socket = defaultsocket
+ # reset the connection to default
+ socket.socket = defaultsocket
- # check whether the test was valid at all
- exit_node = self.get_exit_node()
- if exit_node == 0 or exit_node == '0':
- plog('INFO', 'We had no exit node to test, skipping to the next test.')
- return TEST_SUCCESS
+ # check whether the test was valid at all
+ exit_node = self.get_exit_node()
+ if exit_node == 0 or exit_node == '0':
+ plog('INFO', 'We had no exit node to test, skipping to the next test.')
+ return TEST_SUCCESS
- # do the same for the direct connection
+ # do the same for the direct connection
- capabilities_ok_d = False
- starttls_present_d = False
- tls_started_d = None
- tls_succeeded_d = None
+ capabilities_ok_d = False
+ starttls_present_d = False
+ tls_started_d = None
+ tls_succeeded_d = None
- try:
- pop = Client(address, port)
-
- # read the server greeting
- server_greeting = pop.readline()
+ try:
+ pop = Client(address, port)
+
+ # read the server greeting
+ server_greeting = pop.readline()
- # get the server capabilities
- pop.writeline('CAPA')
- capabilities = ''
- while 1:
- curr = pop.readline()
- if '+OK' in curr:
- capabilities_ok_d = True
- elif curr == '.':
- break
- elif 'STLS' in curr:
- starttls_present_d = True
-
- if not capabilities_ok_d:
- return TEST_INCONCLUSIVE
+ # get the server capabilities
+ pop.writeline('CAPA')
+ capabilities = ''
+ while 1:
+ curr = pop.readline()
+ if '+OK' in curr:
+ capabilities_ok_d = True
+ elif curr == '.':
+ break
+ elif 'STLS' in curr:
+ starttls_present_d = True
+
+ if not capabilities_ok_d:
+ return TEST_INCONCLUSIVE
- # try to start tls negotiation
- if starttls_present_d:
- pop.writeline('STLS')
+ # try to start tls negotiation
+ if starttls_present_d:
+ pop.writeline('STLS')
- starttls_started_d = '+OK' in starttls_response
+ starttls_started_d = '+OK' in starttls_response
- # negotiate TLS, issue some request to feel good about it
- ctx = SSL.Context(SSL.SSLv23_METHOD)
- c = SSL.Connection(ctx, pop.sock)
- c.set_connect_state()
- c.do_handshake()
- c.send('CAPA' + linebreak)
-
- while tls_succeeded_d == None:
- line = ''
- char = None
- while char != '\n':
- char = c.read(1)
- if not char:
- break
- elif char == '.':
- tls_succeeded_d = False
- line += char
+ # negotiate TLS, issue some request to feel good about it
+ ctx = SSL.Context(SSL.SSLv23_METHOD)
+ c = SSL.Connection(ctx, pop.sock)
+ c.set_connect_state()
+ c.do_handshake()
+ c.send('CAPA' + linebreak)
+
+ while tls_succeeded_d == None:
+ line = ''
+ char = None
+ while char != '\n':
+ char = c.read(1)
+ if not char:
+ break
+ elif char == '.':
+ tls_succeeded_d = False
+ line += char
- if '-ERR' in line:
- tls_succeeded_d = False
- elif '+OK' in line:
- tls_succeeded_d = True
- elif not line:
- tls_succeeded_d = False
+ if '-ERR' in line:
+ tls_succeeded_d = False
+ elif '+OK' in line:
+ tls_succeeded_d = True
+ elif not line:
+ tls_succeeded_d = False
- except socket.error, e:
- plog('WARN', 'Connection to ' + address + ':' + port + ' refused')
- plog('WARN', e)
- socket.socket = defaultsocket
- return TEST_INCONCLUSIVE
- except OpenSSL.SSL.SysCallError, e:
- plog('WARN', 'Error while negotiating an SSL connection to ' + address + ':' + port)
- plog('WARN', e)
- socket.socket = defaultsocket
- return TEST_INCONCLUSIVE
+ except socket.error, e:
+ plog('WARN', 'Connection to ' + address + ':' + port + ' refused')
+ plog('WARN', e)
+ socket.socket = defaultsocket
+ return TEST_INCONCLUSIVE
+ except OpenSSL.SSL.SysCallError, e:
+ plog('WARN', 'Error while negotiating an SSL connection to ' + address + ':' + port)
+ plog('WARN', e)
+ socket.socket = defaultsocket
+ return TEST_INCONCLUSIVE
- # compare
- if (capabilities_ok != capabilities_ok_d or starttls_present != starttls_present_d or
- tls_started != tls_started_d or tls_succeeded != tls_succeeded_d):
- result = POPTestResult(exit_node, address, TEST_FAILURE)
- self.__datahandler.saveResult(result)
- # XXX: Log?
- return TEST_FAILURE
-
- result = POPTestResult(exit_node, address, TEST_SUCCESS)
- self.__datahandler.saveResult(result)
- return TEST_SUCCESS
+ # compare
+ if (capabilities_ok != capabilities_ok_d or starttls_present != starttls_present_d or
+ tls_started != tls_started_d or tls_succeeded != tls_succeeded_d):
+ result = POPTestResult(exit_node, address, TEST_FAILURE)
+ self.__datahandler.saveResult(result)
+ # XXX: Log?
+ return TEST_FAILURE
+
+ result = POPTestResult(exit_node, address, TEST_SUCCESS)
+ self.__datahandler.saveResult(result)
+ return TEST_SUCCESS
- def check_imap(self, address, port=''):
- '''
- check whether an imap + tls connection to a given address is molested
- it is implied that the server reads/sends messages compliant with RFC3501
- '''
- plog('INFO', 'Conducting an imap test with destination ' + address)
+ def check_imap(self, address, port=''):
+ '''
+ check whether an imap + tls connection to a given address is molested
+ it is implied that the server reads/sends messages compliant with RFC3501
+ '''
+ plog('INFO', 'Conducting an imap test with destination ' + address)
- if not port:
- port = 143
+ if not port:
+ port = 143
- defaultsocket = socket.socket
- socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, tor_host, tor_port)
- socket.socket = socks.socksocket
-
- capabilities_ok = None
- starttls_present = None
- tls_started = None
- tls_succeeded = None
+ defaultsocket = socket.socket
+ socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, tor_host, tor_port)
+ socket.socket = socks.socksocket
+
+ capabilities_ok = None
+ starttls_present = None
+ tls_started = None
+ tls_succeeded = None
- try:
- imap = Client(address, port)
+ try:
+ imap = Client(address, port)
- # read server greeting
- server_greeting = imap.readline()
+ # read server greeting
+ server_greeting = imap.readline()
- # get server capabilities
- imap.writeline('a001 CAPABILITY')
- capabilities = imap.readline() # first line - list of capabilities
- capabilities_ok = 'OK' in imap.readline() # second line - the request status
-
- if not capabilities_ok:
- return TEST_INCONCLUSIVE
+ # get server capabilities
+ imap.writeline('a001 CAPABILITY')
+ capabilities = imap.readline() # first line - list of capabilities
+ capabilities_ok = 'OK' in imap.readline() # second line - the request status
+
+ if not capabilities_ok:
+ return TEST_INCONCLUSIVE
- # check if starttls is present
- starttls_present = 'STARTTLS' in capabilities
+ # check if starttls is present
+ starttls_present = 'STARTTLS' in capabilities
- if starttls_present:
- imap.writeline('a002 STARTTLS')
- tls_started = 'OK' in imap.readline()
+ if starttls_present:
+ imap.writeline('a002 STARTTLS')
+ tls_started = 'OK' in imap.readline()
- # negotiate TLS, issue a request to feel good about it
- # TODO check the cert aswell ?
- ctx = SSL.Context(SSL.SSLv23_METHOD)
- c = SSL.Connection(ctx, imap.sock)
- c.set_connect_state()
- c.do_handshake()
- c.send('a003 CAPABILITY' + linebreak)
-
- while tls_succeeded == None:
- line = ''
- char = None
- while char != '\n':
- char = c.read(1)
- if not char:
- break
- line += char
+ # negotiate TLS, issue a request to feel good about it
+ # TODO check the cert aswell ?
+ ctx = SSL.Context(SSL.SSLv23_METHOD)
+ c = SSL.Connection(ctx, imap.sock)
+ c.set_connect_state()
+ c.do_handshake()
+ c.send('a003 CAPABILITY' + linebreak)
+
+ while tls_succeeded == None:
+ line = ''
+ char = None
+ while char != '\n':
+ char = c.read(1)
+ if not char:
+ break
+ line += char
- if 'Error' in line or 'error' in line:
- tls_succeeded = False
- elif 'OK' in line:
- tls_succeeded = True
- elif not line:
- tls_succeeded = False
+ if 'Error' in line or 'error' in line:
+ tls_succeeded = False
+ elif 'OK' in line:
+ tls_succeeded = True
+ elif not line:
+ tls_succeeded = False
+
+ except socket.error, e:
+ plog('WARN', 'Connection to ' + address + ':' + port + ' refused')
+ plog('WARN', e)
+ socket.socket = defaultsocket
+ return TEST_INCONCLUSIVE
+ except OpenSSL.SSL.SysCallError, e:
+ plog('WARN', 'Error while negotiating an SSL connection to ' + address + ':' + port)
+ plog('WARN', e)
+ socket.socket = defaultsocket
+ return TEST_INCONCLUSIVE
- except socket.error, e:
- plog('WARN', 'Connection to ' + address + ':' + port + ' refused')
- plog('WARN', e)
- socket.socket = defaultsocket
- return TEST_INCONCLUSIVE
- except OpenSSL.SSL.SysCallError, e:
- plog('WARN', 'Error while negotiating an SSL connection to ' + address + ':' + port)
- plog('WARN', e)
- socket.socket = defaultsocket
- return TEST_INCONCLUSIVE
-
- socket.socket = defaultsocket
+ socket.socket = defaultsocket
- # check whether the test was valid at all
- exit_node = self.get_exit_node()
- if exit_node == 0 or exit_node == '0':
- plog('INFO', 'We had no exit node to test, skipping to the next test.')
- return TEST_SUCCESS
+ # check whether the test was valid at all
+ exit_node = self.get_exit_node()
+ if exit_node == 0 or exit_node == '0':
+ plog('INFO', 'We had no exit node to test, skipping to the next test.')
+ return TEST_SUCCESS
- # do the same for the direct connection
- capabilities_ok_d = None
- starttls_present_d = None
- tls_started_d = None
- tls_succeeded_d = None
+ # do the same for the direct connection
+ capabilities_ok_d = None
+ starttls_present_d = None
+ tls_started_d = None
+ tls_succeeded_d = None
- try:
- imap = Client(address, port)
+ try:
+ imap = Client(address, port)
- # read server greeting
- server_greeting = imap.readline()
+ # read server greeting
+ server_greeting = imap.readline()
- # get server capabilities
- imap.writeline('a001 CAPABILITY')
- capabilities = imap.readline() # first line - list of capabilities
- capabilities_ok_d = 'OK' in imap.readline() # second line - the request status
+ # get server capabilities
+ imap.writeline('a001 CAPABILITY')
+ capabilities = imap.readline() # first line - list of capabilities
+ capabilities_ok_d = 'OK' in imap.readline() # second line - the request status
- if not capabilities_ok_d:
- return TEST_INCONCLUSIVE
+ if not capabilities_ok_d:
+ return TEST_INCONCLUSIVE
- # check if starttls is present
- starttls_present_d = 'STARTTLS' in capabilities
+ # check if starttls is present
+ starttls_present_d = 'STARTTLS' in capabilities
- if starttls_present_d:
- imap.writeline('a002 STARTTLS')
- tls_started = 'OK' in imap.readline()
+ if starttls_present_d:
+ imap.writeline('a002 STARTTLS')
+ tls_started = 'OK' in imap.readline()
- # negotiate TLS, issue some request to feel good about it
- ctx = SSL.Context(SSL.SSLv23_METHOD)
- c = SSL.Connection(ctx, imap.sock)
- c.set_connect_state()
- c.do_handshake()
- c.send('a003 CAPABILITY' + linebreak)
+ # negotiate TLS, issue some request to feel good about it
+ ctx = SSL.Context(SSL.SSLv23_METHOD)
+ c = SSL.Connection(ctx, imap.sock)
+ c.set_connect_state()
+ c.do_handshake()
+ c.send('a003 CAPABILITY' + linebreak)
- while tls_succeeded_d == None:
- line = ''
- char = None
- while char != '\n':
- char = c.read(1)
- if not char:
- break
- line += char
+ while tls_succeeded_d == None:
+ line = ''
+ char = None
+ while char != '\n':
+ char = c.read(1)
+ if not char:
+ break
+ line += char
- if 'Error' in line or 'error' in line:
- tls_succeeded_d = False
- elif 'OK' in line:
- tls_succeeded_d = True
- elif not line:
- tls_succeeded_d = False
+ if 'Error' in line or 'error' in line:
+ tls_succeeded_d = False
+ elif 'OK' in line:
+ tls_succeeded_d = True
+ elif not line:
+ tls_succeeded_d = False
- except socket.error, e:
- plog('WARN', 'Connection to ' + address + ':' + port + ' refused')
- plog('WARN', e)
- socket.socket = defaultsocket
- return TEST_INCONCLUSIVE
- except OpenSSL.SSL.SysCallError, e:
- plog('WARN', 'Error while negotiating an SSL connection to ' + address + ':' + port)
- plog('WARN', e)
- socket.socket = defaultsocket
- return TEST_INCONCLUSIVE
+ except socket.error, e:
+ plog('WARN', 'Connection to ' + address + ':' + port + ' refused')
+ plog('WARN', e)
+ socket.socket = defaultsocket
+ return TEST_INCONCLUSIVE
+ except OpenSSL.SSL.SysCallError, e:
+ plog('WARN', 'Error while negotiating an SSL connection to ' + address + ':' + port)
+ plog('WARN', e)
+ socket.socket = defaultsocket
+ return TEST_INCONCLUSIVE
- # compare
- if (capabilities_ok != capabilities_ok_d or starttls_present != starttls_present_d or
- tls_started != tls_started_d or tls_succeeded != tls_succeeded_d):
- result = IMAPTestResult(exit_node, address, TEST_FAILURE)
- self.__datahandler.saveResult(result)
- # XXX: log?
- return TEST_FAILURE
+ # compare
+ if (capabilities_ok != capabilities_ok_d or starttls_present != starttls_present_d or
+ tls_started != tls_started_d or tls_succeeded != tls_succeeded_d):
+ result = IMAPTestResult(exit_node, address, TEST_FAILURE)
+ self.__datahandler.saveResult(result)
+ # XXX: log?
+ return TEST_FAILURE
- result = IMAPTestResult(exit_node, address, TEST_SUCCESS)
- self.__datahandler.saveResult(result)
- return TEST_SUCCESS
+ result = IMAPTestResult(exit_node, address, TEST_SUCCESS)
+ self.__datahandler.saveResult(result)
+ return TEST_SUCCESS
- def check_dns(self, address):
- ''' A basic comparison DNS test. Rather unreliable. '''
- # TODO Spawns a lot of false positives (for ex. doesn't work for google.com).
- plog('INFO', 'Conducting a basic dns test for destination ' + address)
+ def check_dns(self, address):
+ ''' A basic comparison DNS test. Rather unreliable. '''
+ # TODO Spawns a lot of false positives (for ex. doesn't work for google.com).
+ plog('INFO', 'Conducting a basic dns test for destination ' + address)
- ip = tor_resolve(address)
+ ip = tor_resolve(address)
- # check whether the test was valid at all
- exit_node = self.get_exit_node()
- if exit_node == 0 or exit_node == '0':
- plog('INFO', 'We had no exit node to test, skipping to the next test.')
- return TEST_SUCCESS
+ # check whether the test was valid at all
+ exit_node = self.get_exit_node()
+ if exit_node == 0 or exit_node == '0':
+ plog('INFO', 'We had no exit node to test, skipping to the next test.')
+ return TEST_SUCCESS
- ips_d = Set([])
- try:
- results = socket.getaddrinfo(address,None)
- for result in results:
- ips_d.add(result[4][0])
- except socket.herror, e:
- plog('WARN', 'An error occured while performing a basic dns test')
- plog('WARN', e)
- return TEST_INCONCLUSIVE
+ ips_d = Set([])
+ try:
+ results = socket.getaddrinfo(address,None)
+ for result in results:
+ ips_d.add(result[4][0])
+ except socket.herror, e:
+ plog('WARN', 'An error occured while performing a basic dns test')
+ plog('WARN', e)
+ return TEST_INCONCLUSIVE
- if ip in ips_d:
- result = DNSTestResult(exit_node, address, TEST_SUCCESS)
- return TEST_SUCCESS
- else:
- plog('ERROR', 'The basic DNS test suspects ' + exit_node + ' to be malicious.')
- result = DNSTestResult(exit_node, address, TEST_FAILURE)
- return TEST_FAILURE
+ if ip in ips_d:
+ result = DNSTestResult(exit_node, address, TEST_SUCCESS)
+ return TEST_SUCCESS
+ else:
+ plog('ERROR', 'The basic DNS test suspects ' + exit_node + ' to be malicious.')
+ result = DNSTestResult(exit_node, address, TEST_FAILURE)
+ return TEST_FAILURE
- def check_dns_rebind(self):
- '''
- A DNS-rebind attack test that runs in the background and monitors REMAP events
- The test makes sure that external hosts are not resolved to private addresses
- '''
- plog('INFO', 'Monitoring REMAP events for weirdness')
- self.__dnshandler = DNSRebindScanner(self)
- self.__control.set_event_handler(self.__dnshandler)
- self.__control.set_events([TorCtl.EVENT_TYPE.STREAM], True)
+ def check_dns_rebind(self):
+ '''
+ A DNS-rebind attack test that runs in the background and monitors REMAP events
+ The test makes sure that external hosts are not resolved to private addresses
+ '''
+ plog('INFO', 'Monitoring REMAP events for weirdness')
+ self.__dnshandler = DNSRebindScanner(self)
+ self.__control.set_event_handler(self.__dnshandler)
+ self.__control.set_events([TorCtl.EVENT_TYPE.STREAM], True)
- def ssh_request(self):
- pass
+ def ssh_request(self):
+ pass
- def ssl_request(self, address):
- ''' initiate an ssl connection and return the server certificate '''
- address=str(address) # Unicode hostnames not supported..
-
- # specify the context
- ctx = SSL.Context(SSL.SSLv23_METHOD)
- ctx.set_verify_depth(1)
+ def ssl_request(self, address):
+ ''' initiate an ssl connection and return the server certificate '''
+ address=str(address) # Unicode hostnames not supported..
+
+ # specify the context
+ ctx = SSL.Context(SSL.SSLv23_METHOD)
+ ctx.set_verify_depth(1)
- # ready the certificate request
- request = crypto.X509Req()
+ # ready the certificate request
+ request = crypto.X509Req()
- # open an ssl connection
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- c = SSL.Connection(ctx, s)
- c.set_connect_state()
-
- try:
- c.connect((address, 443))
- c.send(crypto.dump_certificate_request(crypto.FILETYPE_PEM,request))
- except socket.error, e:
- plog('WARN','An error occured while opening an ssl connection to ' + address)
- plog('WARN', e)
- return 0
- except (IndexError, TypeError):
- plog('WARN', 'An error occured while negotiating socks5 with Tor (timeout?)')
- return 0
- except KeyboardInterrupt:
- raise KeyboardInterrupt
- except:
- plog('WARN', 'An unknown SSL error occured for '+address)
- traceback.print_exc()
- return 0
-
- # return the cert
- return c.get_peer_certificate()
+ # open an ssl connection
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ c = SSL.Connection(ctx, s)
+ c.set_connect_state()
+
+ try:
+ c.connect((address, 443))
+ c.send(crypto.dump_certificate_request(crypto.FILETYPE_PEM,request))
+ except socket.error, e:
+ plog('WARN','An error occured while opening an ssl connection to ' + address)
+ plog('WARN', e)
+ return 0
+ except (IndexError, TypeError):
+ plog('WARN', 'An error occured while negotiating socks5 with Tor (timeout?)')
+ return 0
+ except KeyboardInterrupt:
+ raise KeyboardInterrupt
+ except:
+ plog('WARN', 'An unknown SSL error occured for '+address)
+ traceback.print_exc()
+ return 0
+
+ # return the cert
+ return c.get_peer_certificate()
# some helpful methods
def load_wordlist(file):
- ''' load a list of strings from a file (which contains words separated by newlines) '''
- plog('INFO', 'Loading the wordlist')
-
- wordlist = []
- fh = None
- try:
- fh = open(file, 'r')
- except IOError, e:
- plog('ERROR', 'Reading the wordlist file failed.')
- plog('ERROR', e)
-
- try:
- for line in fh:
- wordlist.append(line[:-1]) # get rid of the linebreaks
- finally:
- fh.close()
+ ''' load a list of strings from a file (which contains words separated by newlines) '''
+ plog('INFO', 'Loading the wordlist')
+
+ wordlist = []
+ fh = None
+ try:
+ fh = open(file, 'r')
+ except IOError, e:
+ plog('ERROR', 'Reading the wordlist file failed.')
+ plog('ERROR', e)
+
+ try:
+ for line in fh:
+ wordlist.append(line[:-1]) # get rid of the linebreaks
+ finally:
+ fh.close()
- return wordlist
+ return wordlist
def decompress_response_data(response):
- encoding = None
+ encoding = None
- # a reponse to a httplib.HTTPRequest
- if (response.__class__.__name__ == "HTTPResponse"):
- encoding = response.getheader("Content-Encoding")
- # a response to urllib2.urlopen()
- elif (response.__class__.__name__ == "addinfourl"):
- encoding = response.info().get("Content-Encoding")
+ # a reponse to a httplib.HTTPRequest
+ if (response.__class__.__name__ == "HTTPResponse"):
+ encoding = response.getheader("Content-Encoding")
+ # a response to urllib2.urlopen()
+ elif (response.__class__.__name__ == "addinfourl"):
+ encoding = response.info().get("Content-Encoding")
- if encoding == 'gzip' or encoding == 'x-gzip':
- return gzip.GzipFile('', 'rb', 9, StringIO.StringIO(response.read())).read()
- elif encoding == 'deflate':
- return StringIO.StringIO(zlib.decompress(response.read())).read()
- else:
- return response.read()
+ if encoding == 'gzip' or encoding == 'x-gzip':
+ return gzip.GzipFile('', 'rb', 9, StringIO.StringIO(response.read())).read()
+ elif encoding == 'deflate':
+ return StringIO.StringIO(zlib.decompress(response.read())).read()
+ else:
+ return response.read()
def get_urls(wordlist, host_only=False, filetypes=['any'], results_per_type=5, protocol='any', g_results_per_page=10):
- '''
- construct a list of urls based on the wordlist, filetypes and protocol.
-
- Note: since we currently use google, which doesn't index by protocol,
- searches for anything but 'any' could be rather slow
- '''
- plog('INFO', 'Searching google for relevant sites...')
+ '''
+ construct a list of urls based on the wordlist, filetypes and protocol.
+
+ Note: since we currently use google, which doesn't index by protocol,
+ searches for anything but 'any' could be rather slow
+ '''
+ plog('INFO', 'Searching google for relevant sites...')
- urllist = []
- for filetype in filetypes:
- type_urls = []
+ urllist = []
+ for filetype in filetypes:
+ type_urls = []
- while len(type_urls) < results_per_type:
- query = random.choice(wordlist)
- if filetype != 'any':
- query += ' filetype:' + filetype
- if protocol != 'any':
- query += ' inurl:' + protocol # this isn't too reliable, but we'll re-filter results later
- #query += '&num=' + `g_results_per_page`
+ while len(type_urls) < results_per_type:
+ query = random.choice(wordlist)
+ if filetype != 'any':
+ query += ' filetype:' + filetype
+ if protocol != 'any':
+ query += ' inurl:' + protocol # this isn't too reliable, but we'll re-filter results later
+ #query += '&num=' + `g_results_per_page`
- # search google for relevant pages
- # note: google only accepts requests from idenitified browsers
- # TODO gracefully handle the case when google doesn't want to give us result anymore
- host = 'www.google.com'
- params = urllib.urlencode({'q' : query})
- search_path = '/search' + '?' + params
- search_url = "http://"+host+search_path
- connection = None
- response = None
+ # search google for relevant pages
+ # note: google only accepts requests from idenitified browsers
+ # TODO gracefully handle the case when google doesn't want to give us result anymore
+ host = 'www.google.com'
+ params = urllib.urlencode({'q' : query})
+ search_path = '/search' + '?' + params
+ search_url = "http://"+host+search_path
+ connection = None
+ response = None
- try:
- # XXX: This does not handle http error codes.. (like 302!)
- content = http_request(search_url, google_cookies)
- except socket.gaierror, e:
- plog('ERROR', 'Scraping of http://'+host+search_path+" failed")
- traceback.print_exc()
- return list(Set(urllist))
- except:
- plog('ERROR', 'Scraping of http://'+host+search_path+" failed")
- traceback.print_exc()
- # Bloody hack just to run some tests overnight
- return [protocol+"://www.eff.org", protocol+"://www.fastmail.fm", protocol+"://www.torproject.org", protocol+"://secure.wikileaks.org/"]
+ try:
+ # XXX: This does not handle http error codes.. (like 302!)
+ content = http_request(search_url, google_cookies)
+ except socket.gaierror, e:
+ plog('ERROR', 'Scraping of http://'+host+search_path+" failed")
+ traceback.print_exc()
+ return list(Set(urllist))
+ except:
+ plog('ERROR', 'Scraping of http://'+host+search_path+" failed")
+ traceback.print_exc()
+ # Bloody hack just to run some tests overnight
+ return [protocol+"://www.eff.org", protocol+"://www.fastmail.fm", protocol+"://www.torproject.org", protocol+"://secure.wikileaks.org/"]
- links = SoupStrainer('a')
- try:
- soup = BeautifulSoup(content, parseOnlyThese=links)
- except Exception, e:
- plog('ERROR', 'Soup-scraping of http://'+host+search_path+" failed")
- traceback.print_exc()
- print "Content is: "+str(content)
- return [protocol+"://www.eff.org", protocol+"://www.fastmail.fm", protocol+"://www.torproject.org", protocol+"://secure.wikileaks.org/"]
-
- # get the links and do some additional filtering
- for link in soup.findAll('a', {'class' : 'l'}):
- url = link['href']
- if (protocol != 'any' and url[:len(protocol)] != protocol or
- filetype != 'any' and url[-len(filetype):] != filetype):
- pass
- else:
- if host_only:
- host = urlparse.urlparse(link['href'])[1]
- type_urls.append(host)
- else:
- type_urls.append(link['href'])
-
- if type_urls > results_per_type:
- type_urls = random.sample(type_urls, results_per_type) # make sure we don't get more urls than needed
- urllist.extend(type_urls)
-
- return list(Set(urllist))
+ links = SoupStrainer('a')
+ try:
+ soup = BeautifulSoup(content, parseOnlyThese=links)
+ except Exception, e:
+ plog('ERROR', 'Soup-scraping of http://'+host+search_path+" failed")
+ traceback.print_exc()
+ print "Content is: "+str(content)
+ return [protocol+"://www.eff.org", protocol+"://www.fastmail.fm", protocol+"://www.torproject.org", protocol+"://secure.wikileaks.org/"]
+
+ # get the links and do some additional filtering
+ for link in soup.findAll('a', {'class' : 'l'}):
+ url = link['href']
+ if (protocol != 'any' and url[:len(protocol)] != protocol or
+ filetype != 'any' and url[-len(filetype):] != filetype):
+ pass
+ else:
+ if host_only:
+ host = urlparse.urlparse(link['href'])[1]
+ type_urls.append(host)
+ else:
+ type_urls.append(link['href'])
+
+ if type_urls > results_per_type:
+ type_urls = random.sample(type_urls, results_per_type) # make sure we don't get more urls than needed
+ urllist.extend(type_urls)
+
+ return list(Set(urllist))
def tor_resolve(address):
- ''' performs a DNS query explicitly via tor '''
- return commands.getoutput("tor-resolve " + address)
+ ''' performs a DNS query explicitly via tor '''
+ return commands.getoutput("tor-resolve " + address)
def int2bin(n):
- '''
- simple decimal -> binary conversion, needed for comparing IP addresses
- '''
- n = int(n)
- if n < 0:
- raise ValueError, "Negative values are not accepted."
- elif n == 0:
- return '0'
- else:
- bin = ''
- while n > 0:
- bin += str(n % 2)
- n = n >> 1
- return bin[::-1]
+ '''
+ simple decimal -> binary conversion, needed for comparing IP addresses
+ '''
+ n = int(n)
+ if n < 0:
+ raise ValueError, "Negative values are not accepted."
+ elif n == 0:
+ return '0'
+ else:
+ bin = ''
+ while n > 0:
+ bin += str(n % 2)
+ n = n >> 1
+ return bin[::-1]
class NoURLsFound(Exception):
- pass
+ pass
class Test:
- def __init__(self, scanner, proto, port, url_fcn, test_fcn):
- self.proto = proto
- self.port = port
- self.scanner = scanner
- self.url_fcn = url_fcn
- self.test_fcn = test_fcn
- self.rewind()
+ def __init__(self, scanner, proto, port, url_fcn, test_fcn):
+ self.proto = proto
+ self.port = port
+ self.scanner = scanner
+ self.url_fcn = url_fcn
+ self.test_fcn = test_fcn
+ self.rewind()
- def run_test(self):
- self.tests_run += 1
- return self.test_fcn(random.choice(self.urls))
+ def run_test(self):
+ self.tests_run += 1
+ return self.test_fcn(random.choice(self.urls))
- def get_node(self):
- return random.choice(self.nodes)
+ def get_node(self):
+ return random.choice(self.nodes)
- def mark_chosen(self, node):
- self.nodes_marked += 1
- self.nodes.remove(node)
-
- def finished(self):
- return not self.nodes
+ def mark_chosen(self, node):
+ self.nodes_marked += 1
+ self.nodes.remove(node)
+
+ def finished(self):
+ return not self.nodes
- def percent_complete(self):
- return round(100.0*self.nodes_marked/self.total_nodes, 1)
+ def percent_complete(self):
+ return round(100.0*self.nodes_marked/self.total_nodes, 1)
- def rewind(self):
- self.tests_run = 0
- self.nodes_marked = 0
- self.urls = self.url_fcn()
- if not self.urls:
- raise NoURLsFound("No URLS found for protocol "+self.proto)
- urls = "\n\t".join(self.urls)
- plog("INFO", "Using the following urls for "+self.proto+" scan:\n\t"+urls)
- self.nodes = self.scanner.get_nodes_for_port(self.port)
- self.node_map = {}
- for n in self.nodes:
- self.node_map[n.idhex] = n
- self.total_nodes = len(self.nodes)
+ def rewind(self):
+ self.tests_run = 0
+ self.nodes_marked = 0
+ self.urls = self.url_fcn()
+ if not self.urls:
+ raise NoURLsFound("No URLS found for protocol "+self.proto)
+ urls = "\n\t".join(self.urls)
+ plog("INFO", "Using the following urls for "+self.proto+" scan:\n\t"+urls)
+ self.nodes = self.scanner.get_nodes_for_port(self.port)
+ self.node_map = {}
+ for n in self.nodes:
+ self.node_map[n.idhex] = n
+ self.total_nodes = len(self.nodes)
#
# main logic
#
def main(argv):
- # make sure we have something to test for
- if len(argv) < 2:
- print ''
- print 'Please provide at least one test option:'
- print '--ssl (~works)'
- print '--http (gives some false positives)'
- print '--ssh (doesn\'t work yet)'
- print '--smtp (~works)'
- print '--pop (~works)'
- print '--imap (~works)'
- print '--dnsrebind (works with the ssl test)'
- print '--policies (~works)'
- print ''
- return
+ # make sure we have something to test for
+ if len(argv) < 2:
+ print ''
+ print 'Please provide at least one test option:'
+ print '--ssl (~works)'
+ print '--http (gives some false positives)'
+ print '--ssh (doesn\'t work yet)'
+ print '--smtp (~works)'
+ print '--pop (~works)'
+ print '--imap (~works)'
+ print '--dnsrebind (works with the ssl test)'
+ print '--policies (~works)'
+ print ''
+ return
- opts = ['ssl','http','ssh','smtp','pop','imap','dns','dnsrebind','policies']
- flags, trailer = getopt.getopt(argv[1:], [], opts)
-
- # get specific test types
- do_ssl = ('--ssl','') in flags
- do_http = ('--http','') in flags
- do_ssh = ('--ssh','') in flags
- do_smtp = ('--smtp','') in flags
- do_pop = ('--pop','') in flags
- do_imap = ('--imap','') in flags
- do_dns_rebind = ('--dnsrebind','') in flags
- do_consistency = ('--policies','') in flags
+ opts = ['ssl','http','ssh','smtp','pop','imap','dns','dnsrebind','policies']
+ flags, trailer = getopt.getopt(argv[1:], [], opts)
+
+ # get specific test types
+ do_ssl = ('--ssl','') in flags
+ do_http = ('--http','') in flags
+ do_ssh = ('--ssh','') in flags
+ do_smtp = ('--smtp','') in flags
+ do_pop = ('--pop','') in flags
+ do_imap = ('--imap','') in flags
+ do_dns_rebind = ('--dnsrebind','') in flags
+ do_consistency = ('--policies','') in flags
- # load the wordlist to search for sites lates on
- wordlist = load_wordlist(wordlist_file)
+ # load the wordlist to search for sites lates on
+ wordlist = load_wordlist(wordlist_file)
- # initiate the scanner
- scanner = ExitNodeScanner()
+ # initiate the scanner
+ scanner = ExitNodeScanner()
- # initiate the passive dns rebind attack monitor
- if do_dns_rebind:
- scanner.check_dns_rebind()
+ # initiate the passive dns rebind attack monitor
+ if do_dns_rebind:
+ scanner.check_dns_rebind()
- # check for sketchy exit policies
- if do_consistency:
- scanner.check_all_exits_port_consistency()
+ # check for sketchy exit policies
+ if do_consistency:
+ scanner.check_all_exits_port_consistency()
- # maybe only the consistency test was required
- if not (do_ssl or do_http or do_ssh or do_smtp or do_pop or do_imap):
- plog('INFO', 'Done.')
- return
+ # maybe only the consistency test was required
+ if not (do_ssl or do_http or do_ssh or do_smtp or do_pop or do_imap):
+ plog('INFO', 'Done.')
+ return
- # Load the cookie jar
- global google_cookies
- google_cookies = cookielib.LWPCookieJar()
- if os.path.isfile(google_cookie_file):
- google_cookies.load(google_cookie_file)
- google_cookies.__filename = google_cookie_file
+ # Load the cookie jar
+ global google_cookies
+ google_cookies = cookielib.LWPCookieJar()
+ if os.path.isfile(google_cookie_file):
+ google_cookies.load(google_cookie_file)
+ google_cookies.__filename = google_cookie_file
- # declare some variables and assign values if neccessary
- http_fail = 0
+ # declare some variables and assign values if neccessary
+ http_fail = 0
- tests = {}
+ tests = {}
- # FIXME: Create an event handler that updates these lists
- if do_ssl:
- try:
- tests["SSL"] = Test(scanner, "SSL", 443,
- lambda:
- get_urls(wordlist, protocol='https', host_only=True, results_per_type=10,
+ # FIXME: Create an event handler that updates these lists
+ if do_ssl:
+ try:
+ tests["SSL"] = Test(scanner, "SSL", 443,
+ lambda:
+ get_urls(wordlist, protocol='https', host_only=True, results_per_type=10,
g_results_per_page=20), lambda u: scanner.check_openssl(u))
- except NoURLsFound, e:
- plog('ERROR', e.message)
+ except NoURLsFound, e:
+ plog('ERROR', e.message)
- if do_http:
- http_fail = len(scanner.http_fail)
- try:
- tests["HTTP"] = Test(scanner, "HTTP", 80,
- lambda:
- get_urls(wordlist, protocol='http', results_per_type=10, g_results_per_page=20), lambda u: scanner.check_http(u))
- except NoURLsFound, e:
- plog('ERROR', e.message)
+ if do_http:
+ http_fail = len(scanner.http_fail)
+ try:
+ tests["HTTP"] = Test(scanner, "HTTP", 80,
+ lambda:
+ get_urls(wordlist, protocol='http', results_per_type=10, g_results_per_page=20), lambda u: scanner.check_http(u))
+ except NoURLsFound, e:
+ plog('ERROR', e.message)
- if do_ssh:
- try:
- tests["SSH"] = Test(scanner, "SSH", 22, lambda: [],
- lambda u: scanner.check_openssh(u))
- except NoURLsFound, e:
- plog('ERROR', e.message)
+ if do_ssh:
+ try:
+ tests["SSH"] = Test(scanner, "SSH", 22, lambda: [],
+ lambda u: scanner.check_openssh(u))
+ except NoURLsFound, e:
+ plog('ERROR', e.message)
- if do_smtp:
- try:
- tests["SMTP"] = Test(scanner, "SMTP", 587,
- lambda: [('smtp.gmail.com','587')],
- lambda u: scanner.check_smtp(*u))
- except NoURLsFound, e:
- plog('ERROR', e.message)
-
- if do_pop:
- try:
- tests["POP"] = Test(scanner, "POP", 110, lambda: [],
- lambda u: scanner.check_pop(*u))
- except NoURLsFound, e:
- plog('ERROR', e.message)
+ if do_smtp:
+ try:
+ tests["SMTP"] = Test(scanner, "SMTP", 587,
+ lambda: [('smtp.gmail.com','587')],
+ lambda u: scanner.check_smtp(*u))
+ except NoURLsFound, e:
+ plog('ERROR', e.message)
+
+ if do_pop:
+ try:
+ tests["POP"] = Test(scanner, "POP", 110, lambda: [],
+ lambda u: scanner.check_pop(*u))
+ except NoURLsFound, e:
+ plog('ERROR', e.message)
- if do_imap:
- try:
- tests["IMAP"] = Test(scanner, "IMAP", 143, lambda: [],
- lambda u: scanner.check_imap(*u))
- except NoURLsFound, e:
- plog('ERROR', e.message)
+ if do_imap:
+ try:
+ tests["IMAP"] = Test(scanner, "IMAP", 143, lambda: [],
+ lambda u: scanner.check_imap(*u))
+ except NoURLsFound, e:
+ plog('ERROR', e.message)
- # maybe no tests could be initialized
- if not (do_ssl or do_http or do_ssh or do_smtp or do_pop or do_imap):
- plog('INFO', 'Done.')
- sys.exit(0)
-
- # start testing
- while 1:
- # Get as much milage out of each exit as we safely can:
- # Run a random subset of our tests in random order
- avail_tests = tests.values()
- n_tests = random.choice(xrange(1,len(avail_tests)+1))
-
- to_run = random.sample(avail_tests, n_tests)
+ # maybe no tests could be initialized
+ if not (do_ssl or do_http or do_ssh or do_smtp or do_pop or do_imap):
+ plog('INFO', 'Done.')
+ sys.exit(0)
+
+ # start testing
+ while 1:
+ # Get as much milage out of each exit as we safely can:
+ # Run a random subset of our tests in random order
+ avail_tests = tests.values()
+ n_tests = random.choice(xrange(1,len(avail_tests)+1))
+
+ to_run = random.sample(avail_tests, n_tests)
- common_nodes = None
- # Do set intersection and reuse nodes for shared tests
- for test in to_run:
- if not common_nodes: common_nodes = Set(map(lambda n: n.idhex, test.nodes))
- else: common_nodes &= Set(map(lambda n: n.idhex, test.nodes))
+ common_nodes = None
+ # Do set intersection and reuse nodes for shared tests
+ for test in to_run:
+ if not common_nodes: common_nodes = Set(map(lambda n: n.idhex, test.nodes))
+ else: common_nodes &= Set(map(lambda n: n.idhex, test.nodes))
- if common_nodes:
- current_exit_idhex = random.choice(list(common_nodes))
- plog("DEBUG", "Chose to run "+str(n_tests)+" tests via "+current_exit_idhex+" (tests share "+str(len(common_nodes))+" exit nodes")
+ if common_nodes:
+ current_exit_idhex = random.choice(list(common_nodes))
+ plog("DEBUG", "Chose to run "+str(n_tests)+" tests via "+current_exit_idhex+" (tests share "+str(len(common_nodes))+" exit nodes")
- scanner.set_new_exit(current_exit_idhex)
- scanner.get_new_circuit()
- for test in to_run:
- # Keep testing failures and inconclusives
- result = test.run_test()
- if result == TEST_SUCCESS:
- test.mark_chosen(test.node_map[current_exit_idhex])
- plog("INFO", test.proto+" test via "+current_exit_idhex+" has result "+str(result))
- plog("INFO", test.proto+" attempts: "+str(test.tests_run)+". Completed: "+str(test.nodes_marked)+"/"+str(test.total_nodes)+" ("+str(test.percent_complete())+"%)")
- else:
- plog("NOTICE", "No nodes in common between "+", ".join(map(lambda t: t.proto, to_run)))
- for test in to_run:
- current_exit = test.get_node()
- scanner.set_new_exit(current_exit.idhex)
- scanner.get_new_circuit()
- # Keep testing failures and inconclusives
- result = test.run_test()
- plog("INFO", test.proto+" test via "+current_exit.idhex+" has result "+str(result))
- plog("INFO", test.proto+" attempts: "+str(test.tests_run)+". Completed: "+str(test.nodes_marked)+"/"+str(test.total_nodes)+" ("+str(test.percent_complete())+"%)")
- if result == TEST_SUCCESS:
- test.mark_chosen(current_exit)
-
+ scanner.set_new_exit(current_exit_idhex)
+ scanner.get_new_circuit()
+ for test in to_run:
+ # Keep testing failures and inconclusives
+ result = test.run_test()
+ if result == TEST_SUCCESS:
+ test.mark_chosen(test.node_map[current_exit_idhex])
+ plog("INFO", test.proto+" test via "+current_exit_idhex+" has result "+str(result))
+ plog("INFO", test.proto+" attempts: "+str(test.tests_run)+". Completed: "+str(test.nodes_marked)+"/"+str(test.total_nodes)+" ("+str(test.percent_complete())+"%)")
+ else:
+ plog("NOTICE", "No nodes in common between "+", ".join(map(lambda t: t.proto, to_run)))
+ for test in to_run:
+ current_exit = test.get_node()
+ scanner.set_new_exit(current_exit.idhex)
+ scanner.get_new_circuit()
+ # Keep testing failures and inconclusives
+ result = test.run_test()
+ plog("INFO", test.proto+" test via "+current_exit.idhex+" has result "+str(result))
+ plog("INFO", test.proto+" attempts: "+str(test.tests_run)+". Completed: "+str(test.nodes_marked)+"/"+str(test.total_nodes)+" ("+str(test.percent_complete())+"%)")
+ if result == TEST_SUCCESS:
+ test.mark_chosen(current_exit)
+
- # Check each test for rewind
- for test in tests.itervalues():
- if test.finished():
- plog("NOTICE", test.proto+" test has finished all nodes. Rewinding")
- test.rewind()
-
- #
- # managing url lists
- # if we've been having too many false positives lately, get a new target list
- # XXX: Do this on a per-url basis
+ # Check each test for rewind
+ for test in tests.itervalues():
+ if test.finished():
+ plog("NOTICE", test.proto+" test has finished all nodes. Rewinding")
+ test.rewind()
+
+ #
+ # managing url lists
+ # if we've been having too many false positives lately, get a new target list
+ # XXX: Do this on a per-url basis
- #if do_http and len(scanner.http_fail) - http_fail >= len(http_urls):
- # http_urls = get_urls(wordlist, protocol='http', results_per_type=10, g_results_per_page=20)
- # http_fail = len(scanner.http_fail)
-
+ #if do_http and len(scanner.http_fail) - http_fail >= len(http_urls):
+ # http_urls = get_urls(wordlist, protocol='http', results_per_type=10, g_results_per_page=20)
+ # http_fail = len(scanner.http_fail)
+
#
# initiate the program
#
if __name__ == '__main__':
- try:
- main(sys.argv)
- except KeyboardInterrupt:
- plog('INFO', "Ctrl + C was pressed. Exiting ... ")
- traceback.print_exc()
- except Exception, e:
- plog('ERROR', "An unexpected error occured.")
- traceback.print_exc()
+ try:
+ main(sys.argv)
+ except KeyboardInterrupt:
+ plog('INFO', "Ctrl + C was pressed. Exiting ... ")
+ traceback.print_exc()
+ except Exception, e:
+ plog('ERROR', "An unexpected error occured.")
+ traceback.print_exc()
Modified: torflow/trunk/NetworkScanners/soatcli.py
===================================================================
--- torflow/trunk/NetworkScanners/soatcli.py 2009-01-25 11:47:53 UTC (rev 18267)
+++ torflow/trunk/NetworkScanners/soatcli.py 2009-01-25 11:56:57 UTC (rev 18268)
@@ -20,187 +20,187 @@
#
class StatsConsole:
- ''' Class to display statistics from CLI'''
+ ''' Class to display statistics from CLI'''
+
+ def Listen(self):
+ while 1:
+ input = raw_input(">>>")
+ if input == 'e' or input == 'exit':
+ exit()
+ elif input == 's' or input == 'summary':
+ self.Summary()
+ elif input == 'h' or input == 'help' or len(input) > 6:
+ self.Help()
+ else:
+ self.Reply(input)
+
+ def Summary(self):
+ dh = DataHandler()
+ data = dh.getAll()
- def Listen(self):
- while 1:
- input = raw_input(">>>")
- if input == 'e' or input == 'exit':
- exit()
- elif input == 's' or input == 'summary':
- self.Summary()
- elif input == 'h' or input == 'help' or len(input) > 6:
- self.Help()
- else:
- self.Reply(input)
-
- def Summary(self):
- dh = DataHandler()
- data = dh.getAll()
-
- nodeSet = Set([])
- sshSet = Set([])
- sslSet = Set([])
- httpSet = Set([])
- smtpSet = Set([])
- popSet = Set([])
- imapSet = Set([])
- dnsSet = Set([])
- dnsrebindSet = Set([])
+ nodeSet = Set([])
+ sshSet = Set([])
+ sslSet = Set([])
+ httpSet = Set([])
+ smtpSet = Set([])
+ popSet = Set([])
+ imapSet = Set([])
+ dnsSet = Set([])
+ dnsrebindSet = Set([])
- total = len(data)
- good = bad = inconclusive = 0
- ssh = http = ssl = pop = imap = smtp = dns = dnsrebind = 0
+ total = len(data)
+ good = bad = inconclusive = 0
+ ssh = http = ssl = pop = imap = smtp = dns = dnsrebind = 0
- for result in data:
- nodeSet.add(result.exit_node)
-
- if result.status == 0:
- good += 1
- elif result.status == 1:
- inconclusive += 1
- elif result.status == 2:
- bad += 1
-
- if result.__class__.__name__ == 'SSHTestResult':
- sshSet.add(result.exit_node)
- ssh += 1
- elif result.__class__.__name__ == 'HttpTestResult':
- httpSet.add(result.exit_node)
- http += 1
- elif result.__class__.__name__ == 'SSLTestResult':
- sslSet.add(result.exit_node)
- ssl += 1
- elif result.__class__.__name__ == 'IMAPTestResult':
- imapSet.add(result.exit_node)
- imap += 1
- elif result.__class__.__name__ == 'POPTestResult':
- popSet.add(result.exit_node)
- pop += 1
- elif result.__class__.__name__ == 'SMTPTestResult':
- smtpSet.add(result.exit_node)
- smtp += 1
- elif result.__class__.__name__ == 'DNSTestResult':
- dnsSet.add(result.exit_node)
- dns += 1
- elif result.__class__.__name__ == 'DNSRebindTestResult':
- dnsrebindSet.add(result.exit_node)
- dnsrebind += 1
+ for result in data:
+ nodeSet.add(result.exit_node)
+
+ if result.status == 0:
+ good += 1
+ elif result.status == 1:
+ inconclusive += 1
+ elif result.status == 2:
+ bad += 1
+
+ if result.__class__.__name__ == 'SSHTestResult':
+ sshSet.add(result.exit_node)
+ ssh += 1
+ elif result.__class__.__name__ == 'HttpTestResult':
+ httpSet.add(result.exit_node)
+ http += 1
+ elif result.__class__.__name__ == 'SSLTestResult':
+ sslSet.add(result.exit_node)
+ ssl += 1
+ elif result.__class__.__name__ == 'IMAPTestResult':
+ imapSet.add(result.exit_node)
+ imap += 1
+ elif result.__class__.__name__ == 'POPTestResult':
+ popSet.add(result.exit_node)
+ pop += 1
+ elif result.__class__.__name__ == 'SMTPTestResult':
+ smtpSet.add(result.exit_node)
+ smtp += 1
+ elif result.__class__.__name__ == 'DNSTestResult':
+ dnsSet.add(result.exit_node)
+ dns += 1
+ elif result.__class__.__name__ == 'DNSRebindTestResult':
+ dnsrebindSet.add(result.exit_node)
+ dnsrebind += 1
- swidth = 25
- nwidth = 10
- width = swidth + nwidth
+ swidth = 25
+ nwidth = 10
+ width = swidth + nwidth
- header_format = '%-*s%*s'
- format = '%-*s%*i'
+ header_format = '%-*s%*s'
+ format = '%-*s%*i'
- print '=' * width
- print header_format % (swidth, 'Parameter', nwidth, 'Count')
- print '-' * width
+ print '=' * width
+ print header_format % (swidth, 'Parameter', nwidth, 'Count')
+ print '-' * width
- stats = [
- ('Tests completed', total),
- ('Nodes tested', len(nodeSet)),
- ('Nodes SSL-tested', len(sslSet)),
- ('Nodes HTTP-tested', len(httpSet)),
- ('Nodes SSH-tested', len(sshSet)),
- ('Nodes POP-tested', len(popSet)),
- ('Nodes IMAP-tested', len(imapSet)),
- ('Nodes SMTP-tested', len(smtpSet)),
- ('Nodes DNS-tested', len(dnsSet)),
- ('Nodes DNSRebind-tested', len(dnsrebindSet)),
- ('Failed tests', bad),
- ('Succeeded tests', good),
- ('Inconclusive tests', inconclusive),
- ('SSH tests', ssh),
- ('HTTP tests', http),
- ('SSL tests', ssl),
- ('POP tests', pop),
- ('IMAP tests', imap),
- ('SMTP tests', smtp),
- ('DNS tests', dns),
- ('DNS rebind tests', dnsrebind)
- ]
+ stats = [
+ ('Tests completed', total),
+ ('Nodes tested', len(nodeSet)),
+ ('Nodes SSL-tested', len(sslSet)),
+ ('Nodes HTTP-tested', len(httpSet)),
+ ('Nodes SSH-tested', len(sshSet)),
+ ('Nodes POP-tested', len(popSet)),
+ ('Nodes IMAP-tested', len(imapSet)),
+ ('Nodes SMTP-tested', len(smtpSet)),
+ ('Nodes DNS-tested', len(dnsSet)),
+ ('Nodes DNSRebind-tested', len(dnsrebindSet)),
+ ('Failed tests', bad),
+ ('Succeeded tests', good),
+ ('Inconclusive tests', inconclusive),
+ ('SSH tests', ssh),
+ ('HTTP tests', http),
+ ('SSL tests', ssl),
+ ('POP tests', pop),
+ ('IMAP tests', imap),
+ ('SMTP tests', smtp),
+ ('DNS tests', dns),
+ ('DNS rebind tests', dnsrebind)
+ ]
- for (k,v) in stats:
- print format % (swidth, k, nwidth, v)
- print '=' * width
+ for (k,v) in stats:
+ print format % (swidth, k, nwidth, v)
+ print '=' * width
- def Reply(self, input):
+ def Reply(self, input):
- good = bad = inconclusive = False
- protocols = []
+ good = bad = inconclusive = False
+ protocols = []
- if 'a' in input:
- good = bad = inconclusive = True
- protocols.extend(["ssh", "http", "ssl", "imap", "pop", "smtp"])
- else:
- good = 'g' in input
- bad = 'b' in input
- inconclusive = 'i' in input
+ if 'a' in input:
+ good = bad = inconclusive = True
+ protocols.extend(["ssh", "http", "ssl", "imap", "pop", "smtp"])
+ else:
+ good = 'g' in input
+ bad = 'b' in input
+ inconclusive = 'i' in input
- if 's' in input:
- protocols.append("ssh")
- if 'h' in input:
- protocols.append("http")
- if 'l' in input:
- protocols.append("ssl")
- if 'p' in input:
- protocols.append("imap")
- if 'o' in input:
- protocols.append("pop")
- if 't' in input:
- protocols.append("smtp")
- if 'd' in input:
- protocols.append("dns")
- if 'r' in input:
- protocols.append("dnsrebind")
+ if 's' in input:
+ protocols.append("ssh")
+ if 'h' in input:
+ protocols.append("http")
+ if 'l' in input:
+ protocols.append("ssl")
+ if 'p' in input:
+ protocols.append("imap")
+ if 'o' in input:
+ protocols.append("pop")
+ if 't' in input:
+ protocols.append("smtp")
+ if 'd' in input:
+ protocols.append("dns")
+ if 'r' in input:
+ protocols.append("dnsrebind")
- dh = DataHandler()
- data = dh.getAll()
- filtered = dh.filterResults(data, protocols, good, bad, inconclusive)
+ dh = DataHandler()
+ data = dh.getAll()
+ filtered = dh.filterResults(data, protocols, good, bad, inconclusive)
- nodewidth = 45
- typewidth = 10
- sitewidth = 30
- timewidth = 30
- statuswidth = 6
- width = nodewidth + typewidth + sitewidth + timewidth + statuswidth
+ nodewidth = 45
+ typewidth = 10
+ sitewidth = 30
+ timewidth = 30
+ statuswidth = 6
+ width = nodewidth + typewidth + sitewidth + timewidth + statuswidth
- format = '%-*s%-*s%-*s%-*s%-*s'
+ format = '%-*s%-*s%-*s%-*s%-*s'
- print '=' * width
- print format % (nodewidth, 'Exit node', typewidth, 'Test type', sitewidth, 'Remote site',
- timewidth, 'Time', statuswidth, 'Status')
- print '-' * width
- for result in filtered:
- print format % (nodewidth, `result.exit_node`,
- typewidth, result.__class__.__name__[:-10],
- sitewidth, result.site,
- timewidth, time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(result.timestamp)),
- statuswidth, `result.status`)
- print '=' * width
+ print '=' * width
+ print format % (nodewidth, 'Exit node', typewidth, 'Test type', sitewidth, 'Remote site',
+ timewidth, 'Time', statuswidth, 'Status')
+ print '-' * width
+ for result in filtered:
+ print format % (nodewidth, `result.exit_node`,
+ typewidth, result.__class__.__name__[:-10],
+ sitewidth, result.site,
+ timewidth, time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(result.timestamp)),
+ statuswidth, `result.status`)
+ print '=' * width
- def Help(self):
- print ''
- print 'Options:'
- print '* summmary (s) - display a short summary about all tests done so far'
- print '* exit (e) - terminate the program'
- print '* help (h) - display this help text'
- print '* all (a) - list all the results'
- print '* (shlgbi) - display a filtered list of test results. Letters are optional and mean the following:'
- print ' s - show ssh results'
- print ' h - show http results'
- print ' l - show ssl results'
- print ' g - show good results'
- print ' b - show bad results'
- print ' i - show inconclusive results'
- print ' p - show imap results'
- print ' o - show pop results'
- print ' t - show smtp results'
- print ' d - show dns results'
- print ' r - show dnsrebind results'
- print ''
+ def Help(self):
+ print ''
+ print 'Options:'
+ print '* summmary (s) - display a short summary about all tests done so far'
+ print '* exit (e) - terminate the program'
+ print '* help (h) - display this help text'
+ print '* all (a) - list all the results'
+ print '* (shlgbi) - display a filtered list of test results. Letters are optional and mean the following:'
+ print ' s - show ssh results'
+ print ' h - show http results'
+ print ' l - show ssl results'
+ print ' g - show good results'
+ print ' b - show bad results'
+ print ' i - show inconclusive results'
+ print ' p - show imap results'
+ print ' o - show pop results'
+ print ' t - show smtp results'
+ print ' d - show dns results'
+ print ' r - show dnsrebind results'
+ print ''
#
# Displaying stats in a graphical setting (first check if we have wx)
@@ -208,211 +208,211 @@
nowx = False
try:
- import wx
- from wx.lib.mixins.listctrl import ListCtrlAutoWidthMixin, ColumnSorterMixin
+ import wx
+ from wx.lib.mixins.listctrl import ListCtrlAutoWidthMixin, ColumnSorterMixin
except:
- nowx = True
+ nowx = True
if not nowx:
- class ListMixin(wx.ListCtrl, ListCtrlAutoWidthMixin, ColumnSorterMixin):
- def __init__(self, parent, map):
- wx.ListCtrl.__init__(self, parent, -1, style=wx.LC_REPORT)
- ListCtrlAutoWidthMixin.__init__(self)
- ColumnSorterMixin.__init__(self, len(map))
- self.itemDataMap = map
+ class ListMixin(wx.ListCtrl, ListCtrlAutoWidthMixin, ColumnSorterMixin):
+ def __init__(self, parent, map):
+ wx.ListCtrl.__init__(self, parent, -1, style=wx.LC_REPORT)
+ ListCtrlAutoWidthMixin.__init__(self)
+ ColumnSorterMixin.__init__(self, len(map))
+ self.itemDataMap = map
- def GetListCtrl(self):
- return self
+ def GetListCtrl(self):
+ return self
- # menu item ids
- ID_EXIT = 1
+ # menu item ids
+ ID_EXIT = 1
- ID_SHOW_GOOD = 11
- ID_SHOW_BAD = 12
- ID_SHOW_UNSURE = 13
+ ID_SHOW_GOOD = 11
+ ID_SHOW_BAD = 12
+ ID_SHOW_UNSURE = 13
- ID_SHOW_SSL = 21
- ID_SHOW_HTTP = 22
- ID_SHOW_SSH = 23
- ID_SHOW_SMTP = 24
- ID_SHOW_IMAP = 25
- ID_SHOW_POP = 26
- ID_SHOW_DNS = 27
- ID_SHOW_DNSREBIND = 28
+ ID_SHOW_SSL = 21
+ ID_SHOW_HTTP = 22
+ ID_SHOW_SSH = 23
+ ID_SHOW_SMTP = 24
+ ID_SHOW_IMAP = 25
+ ID_SHOW_POP = 26
+ ID_SHOW_DNS = 27
+ ID_SHOW_DNSREBIND = 28
- ID_NODE = 31
+ ID_NODE = 31
- class MainFrame(wx.Frame):
- ''' the main application window for displaying statistics with a GUI'''
- def __init__(self):
- wx.Frame.__init__(self, None, title="Soat test results", size=(900,500))
-
- # get the data
+ class MainFrame(wx.Frame):
+ ''' the main application window for displaying statistics with a GUI'''
+ def __init__(self):
+ wx.Frame.__init__(self, None, title="Soat test results", size=(900,500))
+
+ # get the data
- self.dataHandler = DataHandler()
- self.dataList = self.dataHandler.getAll()
- self.filteredList = self.dataList
+ self.dataHandler = DataHandler()
+ self.dataList = self.dataHandler.getAll()
+ self.filteredList = self.dataList
- # display it
-
- self.CreateStatusBar()
- self.initMenuBar()
- self.initContent()
+ # display it
+
+ self.CreateStatusBar()
+ self.initMenuBar()
+ self.initContent()
- self.Center()
- self.Show()
+ self.Center()
+ self.Show()
+
+ def initMenuBar(self):
+ fileMenu = wx.Menu()
+ fileMenu.Append(ID_EXIT, "E&xit", "Exit the program")
- def initMenuBar(self):
- fileMenu = wx.Menu()
- fileMenu.Append(ID_EXIT, "E&xit", "Exit the program")
-
- viewMenu = wx.Menu()
- self.showGood = viewMenu.Append(ID_SHOW_GOOD, 'Show &Good', 'Show sucessful test results', kind=wx.ITEM_CHECK)
- self.showBad = viewMenu.Append(ID_SHOW_BAD, 'Show &Bad', 'Show unsucessful test results', kind=wx.ITEM_CHECK)
- self.showUnsure = viewMenu.Append(ID_SHOW_UNSURE, 'Show &Inconclusive', 'Show inconclusive test results', kind=wx.ITEM_CHECK)
- viewMenu.AppendSeparator()
- self.showSSL = viewMenu.Append(ID_SHOW_SSL, 'Show SS&L', 'Show SSL test results', kind=wx.ITEM_CHECK)
- self.showHTTP = viewMenu.Append(ID_SHOW_HTTP, 'Show &HTTP', 'Show HTTP test results', kind=wx.ITEM_CHECK)
- self.showSSH = viewMenu.Append(ID_SHOW_SSH, 'Show &SSH', 'Show SSH test results', kind=wx.ITEM_CHECK)
- viewMenu.AppendSeparator()
- self.showSMTP = viewMenu.Append(ID_SHOW_SMTP, 'Show SMTP', 'Show SMTP test results', kind=wx.ITEM_CHECK)
- self.showIMAP = viewMenu.Append(ID_SHOW_IMAP, 'Show IMAP', 'Show IMAP test results', kind=wx.ITEM_CHECK)
- self.showPOP = viewMenu.Append(ID_SHOW_POP, 'Show POP', 'Show POP test results', kind=wx.ITEM_CHECK)
- viewMenu.AppendSeparator()
- self.showDNS = viewMenu.Append(ID_SHOW_DNS, 'Show DNS', 'Show DNS test results', kind=wx.ITEM_CHECK)
- self.showDNSRebind = viewMenu.Append(ID_SHOW_DNSREBIND, 'Show DNSRebind', 'Show DNS rebind test results', kind=wx.ITEM_CHECK)
- viewMenu.AppendSeparator()
- viewMenu.Append(ID_NODE, '&Find node...', 'View test results for a given node [NOT IMPLEMENTED]')
-
- menuBar = wx.MenuBar()
- menuBar.Append(fileMenu,"&File")
- menuBar.Append(viewMenu,"&View")
+ viewMenu = wx.Menu()
+ self.showGood = viewMenu.Append(ID_SHOW_GOOD, 'Show &Good', 'Show sucessful test results', kind=wx.ITEM_CHECK)
+ self.showBad = viewMenu.Append(ID_SHOW_BAD, 'Show &Bad', 'Show unsucessful test results', kind=wx.ITEM_CHECK)
+ self.showUnsure = viewMenu.Append(ID_SHOW_UNSURE, 'Show &Inconclusive', 'Show inconclusive test results', kind=wx.ITEM_CHECK)
+ viewMenu.AppendSeparator()
+ self.showSSL = viewMenu.Append(ID_SHOW_SSL, 'Show SS&L', 'Show SSL test results', kind=wx.ITEM_CHECK)
+ self.showHTTP = viewMenu.Append(ID_SHOW_HTTP, 'Show &HTTP', 'Show HTTP test results', kind=wx.ITEM_CHECK)
+ self.showSSH = viewMenu.Append(ID_SHOW_SSH, 'Show &SSH', 'Show SSH test results', kind=wx.ITEM_CHECK)
+ viewMenu.AppendSeparator()
+ self.showSMTP = viewMenu.Append(ID_SHOW_SMTP, 'Show SMTP', 'Show SMTP test results', kind=wx.ITEM_CHECK)
+ self.showIMAP = viewMenu.Append(ID_SHOW_IMAP, 'Show IMAP', 'Show IMAP test results', kind=wx.ITEM_CHECK)
+ self.showPOP = viewMenu.Append(ID_SHOW_POP, 'Show POP', 'Show POP test results', kind=wx.ITEM_CHECK)
+ viewMenu.AppendSeparator()
+ self.showDNS = viewMenu.Append(ID_SHOW_DNS, 'Show DNS', 'Show DNS test results', kind=wx.ITEM_CHECK)
+ self.showDNSRebind = viewMenu.Append(ID_SHOW_DNSREBIND, 'Show DNSRebind', 'Show DNS rebind test results', kind=wx.ITEM_CHECK)
+ viewMenu.AppendSeparator()
+ viewMenu.Append(ID_NODE, '&Find node...', 'View test results for a given node [NOT IMPLEMENTED]')
+
+ menuBar = wx.MenuBar()
+ menuBar.Append(fileMenu,"&File")
+ menuBar.Append(viewMenu,"&View")
- self.SetMenuBar(menuBar)
+ self.SetMenuBar(menuBar)
- wx.EVT_MENU(self, ID_EXIT, self.OnExit)
+ wx.EVT_MENU(self, ID_EXIT, self.OnExit)
- wx.EVT_MENU(self, ID_SHOW_GOOD, self.GenerateFilteredList)
- wx.EVT_MENU(self, ID_SHOW_BAD, self.GenerateFilteredList)
- wx.EVT_MENU(self, ID_SHOW_UNSURE, self.GenerateFilteredList)
- viewMenu.Check(ID_SHOW_GOOD, True)
- viewMenu.Check(ID_SHOW_BAD, True)
- viewMenu.Check(ID_SHOW_UNSURE, True)
-
- for i in range(ID_SHOW_SSL, ID_SHOW_DNSREBIND + 1):
- viewMenu.Check(i, True)
- wx.EVT_MENU(self, i, self.GenerateFilteredList)
+ wx.EVT_MENU(self, ID_SHOW_GOOD, self.GenerateFilteredList)
+ wx.EVT_MENU(self, ID_SHOW_BAD, self.GenerateFilteredList)
+ wx.EVT_MENU(self, ID_SHOW_UNSURE, self.GenerateFilteredList)
+ viewMenu.Check(ID_SHOW_GOOD, True)
+ viewMenu.Check(ID_SHOW_BAD, True)
+ viewMenu.Check(ID_SHOW_UNSURE, True)
+
+ for i in range(ID_SHOW_SSL, ID_SHOW_DNSREBIND + 1):
+ viewMenu.Check(i, True)
+ wx.EVT_MENU(self, i, self.GenerateFilteredList)
- def initContent(self):
- base = wx.Panel(self, -1)
- sizer = wx.GridBagSizer(0,0)
+ def initContent(self):
+ base = wx.Panel(self, -1)
+ sizer = wx.GridBagSizer(0,0)
- box = wx.StaticBox(base, -1, 'Summary')
- boxSizer = wx.StaticBoxSizer(box, wx.HORIZONTAL)
+ box = wx.StaticBox(base, -1, 'Summary')
+ boxSizer = wx.StaticBoxSizer(box, wx.HORIZONTAL)
- total = wx.StaticText(base, -1, 'Total tests: ' + `len(self.filteredList)`)
- boxSizer.Add(total, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 10)
+ total = wx.StaticText(base, -1, 'Total tests: ' + `len(self.filteredList)`)
+ boxSizer.Add(total, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 10)
- nodes = wx.StaticText(base, -1, 'Nodes scanned: ' + `len(Set([x.exit_node for x in self.filteredList]))`)
- boxSizer.Add(nodes, 0, wx.LEFT | wx.TOP | wx.BOTTOM , 10)
+ nodes = wx.StaticText(base, -1, 'Nodes scanned: ' + `len(Set([x.exit_node for x in self.filteredList]))`)
+ boxSizer.Add(nodes, 0, wx.LEFT | wx.TOP | wx.BOTTOM , 10)
- bad = wx.StaticText(base, -1, 'Failed tests: ' + `len([x for x in self.filteredList if x.status == 2])`)
- boxSizer.Add(bad, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 10)
+ bad = wx.StaticText(base, -1, 'Failed tests: ' + `len([x for x in self.filteredList if x.status == 2])`)
+ boxSizer.Add(bad, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 10)
- suspicious = wx.StaticText(base, -1, 'Inconclusive tests: ' + `len([x for x in self.filteredList if x.status == 1])`)
- boxSizer.Add(suspicious, 0, wx.ALL, 10)
+ suspicious = wx.StaticText(base, -1, 'Inconclusive tests: ' + `len([x for x in self.filteredList if x.status == 1])`)
+ boxSizer.Add(suspicious, 0, wx.ALL, 10)
- sizer.Add(boxSizer, (0,0), (1, 5), wx.EXPAND | wx.ALL, 15)
+ sizer.Add(boxSizer, (0,0), (1, 5), wx.EXPAND | wx.ALL, 15)
- dataMap = {}
- self.fillDataMap(dataMap)
-
- self.listCtrl = ListMixin(base, dataMap)
- self.listCtrl.InsertColumn(0, 'exit node', width=380)
- self.listCtrl.InsertColumn(1, 'type', width=70)
- self.listCtrl.InsertColumn(2, 'site', width=180)
- self.listCtrl.InsertColumn(3, 'time', width=180)
- self.listCtrl.InsertColumn(4, 'status', wx.LIST_FORMAT_CENTER, width=50)
+ dataMap = {}
+ self.fillDataMap(dataMap)
+
+ self.listCtrl = ListMixin(base, dataMap)
+ self.listCtrl.InsertColumn(0, 'exit node', width=380)
+ self.listCtrl.InsertColumn(1, 'type', width=70)
+ self.listCtrl.InsertColumn(2, 'site', width=180)
+ self.listCtrl.InsertColumn(3, 'time', width=180)
+ self.listCtrl.InsertColumn(4, 'status', wx.LIST_FORMAT_CENTER, width=50)
- self.fillListCtrl(dataMap)
-
- sizer.Add(self.listCtrl, (1,0), (1,5), wx.EXPAND | wx.LEFT | wx.BOTTOM | wx.RIGHT, border=15)
+ self.fillListCtrl(dataMap)
+
+ sizer.Add(self.listCtrl, (1,0), (1,5), wx.EXPAND | wx.LEFT | wx.BOTTOM | wx.RIGHT, border=15)
- sizer.AddGrowableCol(3)
- sizer.AddGrowableRow(1)
+ sizer.AddGrowableCol(3)
+ sizer.AddGrowableRow(1)
- base.SetSizerAndFit(sizer)
+ base.SetSizerAndFit(sizer)
- # make a nasty dictionary from the current self.filteredList object so columns would be sortable
- def fillDataMap(self, dataMap):
- for i in range(len(self.filteredList)):
- dataMap.update([(i,(self.filteredList[i].exit_node,
- self.filteredList[i].__class__.__name__[:-10],
- self.filteredList[i].site,
- time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(self.filteredList[i].timestamp)),
- self.filteredList[i].status))])
+ # make a nasty dictionary from the current self.filteredList object so columns would be sortable
+ def fillDataMap(self, dataMap):
+ for i in range(len(self.filteredList)):
+ dataMap.update([(i,(self.filteredList[i].exit_node,
+ self.filteredList[i].__class__.__name__[:-10],
+ self.filteredList[i].site,
+ time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(self.filteredList[i].timestamp)),
+ self.filteredList[i].status))])
- # fill the result listing with data
- def fillListCtrl(self, dataMap):
- if self.listCtrl.GetItemCount() > 0:
- self.listCtrl.DeleteAllItems()
+ # fill the result listing with data
+ def fillListCtrl(self, dataMap):
+ if self.listCtrl.GetItemCount() > 0:
+ self.listCtrl.DeleteAllItems()
- for k, i in dataMap.items():
- index = self.listCtrl.InsertStringItem(sys.maxint, `i[0]`)
- self.listCtrl.SetStringItem(index, 1, i[1])
- self.listCtrl.SetStringItem(index, 2, `i[2]`)
- self.listCtrl.SetStringItem(index, 3, i[3])
- self.listCtrl.SetStringItem(index, 4, `i[4]`)
- self.listCtrl.SetItemData(index,k)
+ for k, i in dataMap.items():
+ index = self.listCtrl.InsertStringItem(sys.maxint, `i[0]`)
+ self.listCtrl.SetStringItem(index, 1, i[1])
+ self.listCtrl.SetStringItem(index, 2, `i[2]`)
+ self.listCtrl.SetStringItem(index, 3, i[3])
+ self.listCtrl.SetStringItem(index, 4, `i[4]`)
+ self.listCtrl.SetItemData(index,k)
- def OnExit(self,e):
- self.Close(True)
+ def OnExit(self,e):
+ self.Close(True)
- def GenerateFilteredList(self, e):
- protocols = []
- if self.showSSH.IsChecked():
- protocols.append("ssh")
- if self.showHTTP.IsChecked():
- protocols.append("http")
- if self.showSSL.IsChecked():
- protocols.append("ssl")
- if self.showIMAP.IsChecked():
- protocols.append("imap")
- if self.showPOP.IsChecked():
- protocols.append("pop")
- if self.showSMTP.IsChecked():
- protocols.append("smtp")
- if self.showDNS.IsChecked():
- protocols.append("dns")
- if self.showDNSRebind.IsChecked():
- protocols.append("dnsrebind")
+ def GenerateFilteredList(self, e):
+ protocols = []
+ if self.showSSH.IsChecked():
+ protocols.append("ssh")
+ if self.showHTTP.IsChecked():
+ protocols.append("http")
+ if self.showSSL.IsChecked():
+ protocols.append("ssl")
+ if self.showIMAP.IsChecked():
+ protocols.append("imap")
+ if self.showPOP.IsChecked():
+ protocols.append("pop")
+ if self.showSMTP.IsChecked():
+ protocols.append("smtp")
+ if self.showDNS.IsChecked():
+ protocols.append("dns")
+ if self.showDNSRebind.IsChecked():
+ protocols.append("dnsrebind")
- self.filteredList = list(self.dataHandler.filterResults(self.dataList, protocols,
- self.showGood.IsChecked(), self.showBad.IsChecked(), self.showUnsure.IsChecked()))
+ self.filteredList = list(self.dataHandler.filterResults(self.dataList, protocols,
+ self.showGood.IsChecked(), self.showBad.IsChecked(), self.showUnsure.IsChecked()))
- dataMap = {}
- self.fillDataMap(dataMap)
- self.fillListCtrl(dataMap)
- self.listCtrl.RefreshItems(0, len(dataMap))
+ dataMap = {}
+ self.fillDataMap(dataMap)
+ self.fillListCtrl(dataMap)
+ self.listCtrl.RefreshItems(0, len(dataMap))
if __name__ == "__main__":
- if len(sys.argv) == 1:
- console = StatsConsole()
- console.Listen()
- elif len(sys.argv) == 2 and sys.argv[1] == 'wx':
- if nowx:
- print 'wxpython doesn\'t seem to be installed on your system'
- print 'you can use the console interface instead (see help)'
- else:
- app = wx.App(0)
- MainFrame()
- app.MainLoop()
+ if len(sys.argv) == 1:
+ console = StatsConsole()
+ console.Listen()
+ elif len(sys.argv) == 2 and sys.argv[1] == 'wx':
+ if nowx:
+ print 'wxpython doesn\'t seem to be installed on your system'
+ print 'you can use the console interface instead (see help)'
else:
- print ''
- print 'This app displays results of tests carried out by soat.py (in a user-friendly way).'
- print ''
- print 'Usage:'
- print 'python soatstats.py - app starts console-only'
- print 'python soatstats.py wx - app starts with a wxpython gui'
- print ''
+ app = wx.App(0)
+ MainFrame()
+ app.MainLoop()
+ else:
+ print ''
+ print 'This app displays results of tests carried out by soat.py (in a user-friendly way).'
+ print ''
+ print 'Usage:'
+ print 'python soatstats.py - app starts console-only'
+ print 'python soatstats.py wx - app starts with a wxpython gui'
+ print ''