[or-cvs] r15742: Add external property. Keep TorCtl.local just in case. (in torflow/branches/gsoc2008: . TorCtl.local)
Author: mikeperry
Date: 2008-07-07 15:37:47 -0400 (Mon, 07 Jul 2008)
New Revision: 15742
Added:
torflow/branches/gsoc2008/TorCtl.local/
torflow/branches/gsoc2008/TorCtl.local/GeoIPSupport.py
torflow/branches/gsoc2008/TorCtl.local/PathSupport.py
torflow/branches/gsoc2008/TorCtl.local/README
torflow/branches/gsoc2008/TorCtl.local/StatsSupport.py
torflow/branches/gsoc2008/TorCtl.local/TorCtl.py
torflow/branches/gsoc2008/TorCtl.local/TorUtil.py
torflow/branches/gsoc2008/TorCtl.local/__init__.py
Removed:
torflow/branches/gsoc2008/TorCtl.local/GeoIPSupport.py
torflow/branches/gsoc2008/TorCtl.local/PathSupport.py
torflow/branches/gsoc2008/TorCtl.local/README
torflow/branches/gsoc2008/TorCtl.local/StatsSupport.py
torflow/branches/gsoc2008/TorCtl.local/TorCtl.py
torflow/branches/gsoc2008/TorCtl.local/TorUtil.py
torflow/branches/gsoc2008/TorCtl.local/__init__.py
torflow/branches/gsoc2008/TorCtl/
Modified:
torflow/branches/gsoc2008/
Log:
Add external property. Keep TorCtl.local just in case.
Property changes on: torflow/branches/gsoc2008
___________________________________________________________________
Name: svn:externals
+ TorCtl https://tor-svn.freehaven.net/svn/torctl/trunk/python/TorCtl
Copied: torflow/branches/gsoc2008/TorCtl.local (from rev 15734, torflow/branches/gsoc2008/TorCtl)
Deleted: torflow/branches/gsoc2008/TorCtl.local/GeoIPSupport.py
===================================================================
--- torflow/branches/gsoc2008/TorCtl/GeoIPSupport.py 2008-07-07 16:40:57 UTC (rev 15734)
+++ torflow/branches/gsoc2008/TorCtl.local/GeoIPSupport.py 2008-07-07 19:37:47 UTC (rev 15742)
@@ -1,132 +0,0 @@
-#!/usr/bin/python
-
-import struct
-import socket
-import GeoIP
-import TorCtl
-
-from TorUtil import plog
-
-# GeoIP data object: choose database here
-geoip = GeoIP.new(GeoIP.GEOIP_STANDARD)
-#geoip = GeoIP.open("./GeoLiteCity.dat", GeoIP.GEOIP_STANDARD)
-
-class Continent:
-  """ Continent class: the group attribute partitions the continents
-  into groups, used to determine the number of ocean crossings """
- def __init__(self, continent_code):
- self.code = continent_code
- self.group = None
- self.countries = []
-
- def contains(self, country_code):
- return country_code in self.countries
-
-# Set countries to continents
-africa = Continent("AF")
-africa.group = 1
-africa.countries = ["AO","BF","BI","BJ","BV","BW","CD","CF","CG","CI","CM",
- "CV","DJ","DZ","EG","EH","ER","ET","GA","GH","GM","GN","GQ","GW","HM","KE",
- "KM","LR","LS","LY","MA","MG","ML","MR","MU","MW","MZ","NA","NE","NG","RE",
- "RW","SC","SD","SH","SL","SN","SO","ST","SZ","TD","TF","TG","TN","TZ","UG",
- "YT","ZA","ZM","ZR","ZW"]
-
-asia = Continent("AS")
-asia.group = 1
-asia.countries = ["AP","AE","AF","AM","AZ","BD","BH","BN","BT","CC","CN","CX",
- "CY","GE","HK","ID","IL","IN","IO","IQ","IR","JO","JP","KG","KH","KP","KR",
- "KW","KZ","LA","LB","LK","MM","MN","MO","MV","MY","NP","OM","PH","PK","PS",
- "QA","RU","SA","SG","SY","TH","TJ","TM","TP","TR","TW","UZ","VN","YE"]
-
-europe = Continent("EU")
-europe.group = 1
-europe.countries = ["EU","AD","AL","AT","BA","BE","BG","BY","CH","CZ","DE",
- "DK","EE","ES","FI","FO","FR","FX","GB","GI","GR","HR","HU","IE","IS","IT",
- "LI","LT","LU","LV","MC","MD","MK","MT","NL","NO","PL","PT","RO","SE","SI",
- "SJ","SK","SM","UA","VA","YU"]
-
-oceania = Continent("OC")
-oceania.group = 2
-oceania.countries = ["AS","AU","CK","FJ","FM","GU","KI","MH","MP","NC","NF",
- "NR","NU","NZ","PF","PG","PN","PW","SB","TK","TO","TV","UM","VU","WF","WS"]
-
-north_america = Continent("NA")
-north_america.group = 0
-north_america.countries = ["CA","MX","US"]
-
-south_america = Continent("SA")
-south_america.group = 0
-south_america.countries = ["AG","AI","AN","AR","AW","BB","BM","BO","BR","BS",
- "BZ","CL","CO","CR","CU","DM","DO","EC","FK","GD","GF","GL","GP","GS","GT",
- "GY","HN","HT","JM","KN","KY","LC","MQ","MS","NI","PA","PE","PM","PR","PY",
- "SA","SR","SV","TC","TT","UY","VC","VE","VG","VI"]
-
-# List of continents
-continents = [africa, asia, europe, north_america, oceania, south_america]
-
-def get_continent(country_code):
- """ Perform country -- continent mapping """
- for c in continents:
- if c.contains(country_code):
- return c
- plog("INFO", country_code + " is not on any continent")
- return None
-
-def get_country(ip):
- """ Get the country via the library """
- return geoip.country_code_by_addr(ip)
-
-def get_country_from_record(ip):
- """ Get the country code out of a GeoLiteCity record (not used) """
- record = geoip.record_by_addr(ip)
- if record != None:
- return record['country_code']
-
-class GeoIPRouter(TorCtl.Router):
- """ Router class extended to GeoIP """
- def __init__(self, router):
- self.__dict__ = router.__dict__
- self.country_code = get_country(self.get_ip_dotted())
- if self.country_code != None:
- c = get_continent(self.country_code)
- if c != None:
- self.continent = c.code
- self.cont_group = c.group
- else:
- plog("INFO", self.nickname + ": Country code not found")
- self.continent = None
-
- def get_ip_dotted(self):
- """ Convert long int back to dotted quad string """
- return socket.inet_ntoa(struct.pack('>I', self.ip))
-
-class GeoIPConfig:
- """ Class to configure GeoIP-based path building """
- def __init__(self, unique_countries, continent_crossings, ocean_crossings,
- entry_country, middle_country, exit_country, excludes):
- # TODO: Somehow ensure validity of a configuration:
- # - continent_crossings >= ocean_crossings
- # - unique_countries=False --> continent_crossings!=None
- # - echelon? set entry_country to source and exit_country to None
-
- # Do not use a country twice in a route
- # [True --> unique, False --> same or None --> pass]
- self.unique_countries = unique_countries
-
- # Configure max continent crossings in one path
- # [integer number 0-n or None --> ContinentJumper/UniqueContinent]
- self.continent_crossings = continent_crossings
- self.ocean_crossings = ocean_crossings
-
- # Try to find an exit node in the destination country
-    # use exit_country as backup, if the country cannot be found
- self.echelon = False
-
- # Specify countries for positions [single country code or None]
- self.entry_country = entry_country
- self.middle_country = middle_country
- self.exit_country = exit_country
-
- # List of countries not to use in routes
- # [(empty) list of country codes or None]
- self.excludes = excludes
Copied: torflow/branches/gsoc2008/TorCtl.local/GeoIPSupport.py (from rev 15741, torflow/branches/gsoc2008/TorCtl/GeoIPSupport.py)
===================================================================
--- torflow/branches/gsoc2008/TorCtl.local/GeoIPSupport.py (rev 0)
+++ torflow/branches/gsoc2008/TorCtl.local/GeoIPSupport.py 2008-07-07 19:37:47 UTC (rev 15742)
@@ -0,0 +1,132 @@
+#!/usr/bin/python
+
+import struct
+import socket
+import GeoIP
+import TorCtl
+
+from TorUtil import plog
+
+# GeoIP data object: choose database here
+geoip = GeoIP.new(GeoIP.GEOIP_STANDARD)
+#geoip = GeoIP.open("./GeoLiteCity.dat", GeoIP.GEOIP_STANDARD)
+
+class Continent:
+  """ Continent class: the group attribute partitions the continents
+  into groups, used to determine the number of ocean crossings """
+ def __init__(self, continent_code):
+ self.code = continent_code
+ self.group = None
+ self.countries = []
+
+ def contains(self, country_code):
+ return country_code in self.countries
+
+# Set countries to continents
+africa = Continent("AF")
+africa.group = 1
+africa.countries = ["AO","BF","BI","BJ","BV","BW","CD","CF","CG","CI","CM",
+ "CV","DJ","DZ","EG","EH","ER","ET","GA","GH","GM","GN","GQ","GW","HM","KE",
+ "KM","LR","LS","LY","MA","MG","ML","MR","MU","MW","MZ","NA","NE","NG","RE",
+ "RW","SC","SD","SH","SL","SN","SO","ST","SZ","TD","TF","TG","TN","TZ","UG",
+ "YT","ZA","ZM","ZR","ZW"]
+
+asia = Continent("AS")
+asia.group = 1
+asia.countries = ["AP","AE","AF","AM","AZ","BD","BH","BN","BT","CC","CN","CX",
+ "CY","GE","HK","ID","IL","IN","IO","IQ","IR","JO","JP","KG","KH","KP","KR",
+ "KW","KZ","LA","LB","LK","MM","MN","MO","MV","MY","NP","OM","PH","PK","PS",
+ "QA","RU","SA","SG","SY","TH","TJ","TM","TP","TR","TW","UZ","VN","YE"]
+
+europe = Continent("EU")
+europe.group = 1
+europe.countries = ["EU","AD","AL","AT","BA","BE","BG","BY","CH","CZ","DE",
+ "DK","EE","ES","FI","FO","FR","FX","GB","GI","GR","HR","HU","IE","IS","IT",
+ "LI","LT","LU","LV","MC","MD","MK","MT","NL","NO","PL","PT","RO","SE","SI",
+ "SJ","SK","SM","UA","VA","YU"]
+
+oceania = Continent("OC")
+oceania.group = 2
+oceania.countries = ["AS","AU","CK","FJ","FM","GU","KI","MH","MP","NC","NF",
+ "NR","NU","NZ","PF","PG","PN","PW","SB","TK","TO","TV","UM","VU","WF","WS"]
+
+north_america = Continent("NA")
+north_america.group = 0
+north_america.countries = ["CA","MX","US"]
+
+south_america = Continent("SA")
+south_america.group = 0
+south_america.countries = ["AG","AI","AN","AR","AW","BB","BM","BO","BR","BS",
+ "BZ","CL","CO","CR","CU","DM","DO","EC","FK","GD","GF","GL","GP","GS","GT",
+ "GY","HN","HT","JM","KN","KY","LC","MQ","MS","NI","PA","PE","PM","PR","PY",
+ "SA","SR","SV","TC","TT","UY","VC","VE","VG","VI"]
+
+# List of continents
+continents = [africa, asia, europe, north_america, oceania, south_america]
+
+def get_continent(country_code):
+ """ Perform country -- continent mapping """
+ for c in continents:
+ if c.contains(country_code):
+ return c
+ plog("INFO", country_code + " is not on any continent")
+ return None
+
+def get_country(ip):
+ """ Get the country via the library """
+ return geoip.country_code_by_addr(ip)
+
+def get_country_from_record(ip):
+ """ Get the country code out of a GeoLiteCity record (not used) """
+ record = geoip.record_by_addr(ip)
+ if record != None:
+ return record['country_code']
+
+class GeoIPRouter(TorCtl.Router):
+ """ Router class extended to GeoIP """
+ def __init__(self, router):
+ self.__dict__ = router.__dict__
+ self.country_code = get_country(self.get_ip_dotted())
+ if self.country_code != None:
+ c = get_continent(self.country_code)
+ if c != None:
+ self.continent = c.code
+ self.cont_group = c.group
+ else:
+ plog("INFO", self.nickname + ": Country code not found")
+ self.continent = None
+
+ def get_ip_dotted(self):
+ """ Convert long int back to dotted quad string """
+ return socket.inet_ntoa(struct.pack('>I', self.ip))
+
+class GeoIPConfig:
+ """ Class to configure GeoIP-based path building """
+ def __init__(self, unique_countries, continent_crossings, ocean_crossings,
+ entry_country, middle_country, exit_country, excludes):
+ # TODO: Somehow ensure validity of a configuration:
+ # - continent_crossings >= ocean_crossings
+ # - unique_countries=False --> continent_crossings!=None
+ # - echelon? set entry_country to source and exit_country to None
+
+ # Do not use a country twice in a route
+ # [True --> unique, False --> same or None --> pass]
+ self.unique_countries = unique_countries
+
+ # Configure max continent crossings in one path
+ # [integer number 0-n or None --> ContinentJumper/UniqueContinent]
+ self.continent_crossings = continent_crossings
+ self.ocean_crossings = ocean_crossings
+
+ # Try to find an exit node in the destination country
+    # use exit_country as backup, if the country cannot be found
+ self.echelon = False
+
+ # Specify countries for positions [single country code or None]
+ self.entry_country = entry_country
+ self.middle_country = middle_country
+ self.exit_country = exit_country
+
+ # List of countries not to use in routes
+ # [(empty) list of country codes or None]
+ self.excludes = excludes
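As a usage illustration (not part of this change; the address and policy values are arbitrary examples), the lookup helpers and GeoIPConfig combine roughly like this:

  import GeoIPSupport

  # Country/continent lookup for a sample (documentation) address
  cc = GeoIPSupport.get_country("192.0.2.1")
  if cc:
    cont = GeoIPSupport.get_continent(cc)   # Continent instance or None

  # Policy: distinct country per hop, at most one continent crossing and
  # one ocean crossing, no pinned countries, nothing excluded
  geoip_cfg = GeoIPSupport.GeoIPConfig(unique_countries=True,
    continent_crossings=1, ocean_crossings=1, entry_country=None,
    middle_country=None, exit_country=None, excludes=None)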
Deleted: torflow/branches/gsoc2008/TorCtl.local/PathSupport.py
===================================================================
--- torflow/branches/gsoc2008/TorCtl/PathSupport.py 2008-07-07 16:40:57 UTC (rev 15734)
+++ torflow/branches/gsoc2008/TorCtl.local/PathSupport.py 2008-07-07 19:37:47 UTC (rev 15742)
@@ -1,1712 +0,0 @@
-#!/usr/bin/python
-"""
-
-Support classes for path construction
-
-The PathSupport package builds on top of TorCtl.TorCtl. It provides a
-number of interfaces that make path construction easier.
-
-The inheritance diagram for event handling is as follows:
-TorCtl.EventHandler <- PathBuilder <- CircuitHandler <- StreamHandler.
-
-Basically, EventHandler is what gets all the control port events
-packaged in nice clean classes (see help(TorCtl) for information on
-those).
-
-PathBuilder inherits from EventHandler and is what builds all circuits
-based on the requirements specified in the SelectionManager instance
-passed to its constructor. It also handles attaching streams to
-circuits. It only handles building one circuit at a time.
-
-CircuitHandler optionally inherits from PathBuilder, and overrides its
-circuit event handling to manage building a pool of circuits as opposed
-to just one. It still uses the SelectionManager for path selection.
-
-StreamHandler inherits from CircuitHandler, and is what governs the
-attachment of an incoming stream on to one of the multiple circuits of
-the circuit handler.
-
-The SelectionManager is essentially a configuration wrapper around the
-most elegant portions of TorFlow: NodeGenerators, NodeRestrictions, and
-PathRestrictions. In the SelectionManager, a NodeGenerator is used to
-choose the nodes probabilistically according to some distribution while
-obeying the NodeRestrictions. These generators (one per hop) are handed
-off to the PathSelector, which uses the generators to build a complete
-path that satisfies the PathRestriction requirements.
-
-Have a look at the class hierarchy directly below to get a feel for how
-the restrictions fit together, and what options are available.
-
-"""
-
-import TorCtl
-import re
-import struct
-import random
-import socket
-import copy
-import Queue
-import time
-import TorUtil
-from TorUtil import *
-
-__all__ = ["NodeRestrictionList", "PathRestrictionList",
-"PercentileRestriction", "OSRestriction", "ConserveExitsRestriction",
-"FlagsRestriction", "MinBWRestriction", "VersionIncludeRestriction",
-"VersionExcludeRestriction", "ExitPolicyRestriction", "NodeRestriction",
-"PathRestriction", "OrNodeRestriction", "MetaNodeRestriction",
-"AtLeastNNodeRestriction", "NotNodeRestriction", "Subnet16Restriction",
-"UniqueRestriction", "NodeGenerator", "UniformGenerator",
-"OrderedExitGenerator", "BwWeightedGenerator", "PathSelector",
-"Connection", "NickRestriction", "IdHexRestriction", "PathBuilder",
-"CircuitHandler", "StreamHandler", "SelectionManager",
-"CountryCodeRestriction", "CountryRestriction",
-"UniqueCountryRestriction", "SingleCountryRestriction",
-"ContinentRestriction", "ContinentJumperRestriction",
-"UniqueContinentRestriction"]
-
-#################### Path Support Interfaces #####################
-
-class NodeRestriction:
- "Interface for node restriction policies"
- def r_is_ok(self, r):
- "Returns true if Router 'r' is acceptable for this restriction"
- return True
-
-class NodeRestrictionList:
- "Class to manage a list of NodeRestrictions"
- def __init__(self, restrictions):
- "Constructor. 'restrictions' is a list of NodeRestriction instances"
- self.restrictions = restrictions
-
- def r_is_ok(self, r):
-    "Returns true if Router 'r' passes all of the contained restrictions"
- for rs in self.restrictions:
- if not rs.r_is_ok(r): return False
- return True
-
- def add_restriction(self, restr):
- "Add a NodeRestriction 'restr' to the list of restrictions"
- self.restrictions.append(restr)
-
- # TODO: This does not collapse meta restrictions..
- def del_restriction(self, RestrictionClass):
- """Remove all restrictions of type RestrictionClass from the list.
- Does NOT inspect or collapse MetaNode Restrictions (though
- MetaRestrictions can be removed if RestrictionClass is
- MetaNodeRestriction)"""
- self.restrictions = filter(
- lambda r: not isinstance(r, RestrictionClass),
- self.restrictions)
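For instance, a restriction list can be composed and later adjusted like this (sketch; the flag names and bandwidth figures are only sample inputs):

  from PathSupport import NodeRestrictionList, FlagsRestriction, MinBWRestriction

  rstr = NodeRestrictionList([
    FlagsRestriction(["Valid", "Running"], ["BadExit"]),  # mandatory / forbidden flags
    MinBWRestriction(512000)])                            # require ~500 KB/s
  rstr.del_restriction(MinBWRestriction)                  # drop the bandwidth floor...
  rstr.add_restriction(MinBWRestriction(1024000))         # ...and replace it
  # rstr.r_is_ok(router) is then consulted for each candidate Router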
-
-class PathRestriction:
- "Interface for path restriction policies"
- def path_is_ok(self, path):
- "Return true if the list of Routers in path satisfies this restriction"
- return True
-
-class PathRestrictionList:
- """Class to manage a list of PathRestrictions"""
- def __init__(self, restrictions):
- "Constructor. 'restrictions' is a list of PathRestriction instances"
- self.restrictions = restrictions
-
- def path_is_ok(self, path):
-    "Given the list of Routers in 'path', check it against each restriction."
- for rs in self.restrictions:
- if not rs.path_is_ok(path):
- return False
- return True
-
- def add_restriction(self, rstr):
- "Add a PathRestriction 'rstr' to the list"
- self.restrictions.append(rstr)
-
- def del_restriction(self, RestrictionClass):
- "Remove all PathRestrictions of type RestrictionClass from the list."
- self.restrictions = filter(
- lambda r: not isinstance(r, RestrictionClass),
- self.restrictions)
-
-class NodeGenerator:
- "Interface for node generation"
- def __init__(self, sorted_r, rstr_list):
- """Constructor. Takes a bandwidth-sorted list of Routers 'sorted_r'
- and a NodeRestrictionList 'rstr_list'"""
- self.rstr_list = rstr_list # Check me before you yield!
- self.sorted_r = sorted_r
- self.rewind()
-
- def reset_restriction(self, rstr_list):
- "Reset the restriction list to a new list"
- self.rstr_list = rstr_list
-
- def rewind(self):
- "Rewind the generator to the 'beginning'"
- # FIXME: If we apply the restrictions now, we can save cycles
- # during selection, and also some memory overhead (at the cost
- # of a much slower rewind() though..)
- self.routers = filter(lambda r: self.rstr_list.r_is_ok(r), self.sorted_r)
- #self.routers = copy.copy(self.sorted_r)
-
- def mark_chosen(self, r):
- """Mark a router as chosen: remove it from the list of routers
- that can be returned in the future"""
- self.routers.remove(r)
-
- def all_chosen(self):
- "Return true if all the routers have been marked as chosen"
- return not self.routers
-
- def generate(self):
- "Return a python generator that yields routers according to the policy"
- raise NotImplemented()
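A subclass only has to supply generate(); a minimal sketch (not one of the generators defined later in this file) that simply walks the bandwidth-sorted list in order:

  from PathSupport import NodeGenerator

  class OrderedGenerator(NodeGenerator):
    "Yields acceptable routers in bandwidth order"
    def generate(self):
      for r in self.routers:            # rewind() already applied rstr_list
        if self.rstr_list.r_is_ok(r):   # re-check, restrictions may have changed
          yield r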
-
-class Connection(TorCtl.Connection):
- """Extended Connection class that provides a method for building circuits"""
- def __init__(self, sock):
- TorCtl.Connection.__init__(self,sock)
- def build_circuit(self, pathlen, path_sel):
- "Tell Tor to build a circuit chosen by the PathSelector 'path_sel'"
- circ = Circuit()
- circ.path = path_sel.build_path(pathlen)
- circ.exit = circ.path[pathlen-1]
- circ.circ_id = self.extend_circuit(0, circ.id_path())
- return circ
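Once a PathSelector is available, building a circuit is a one-liner (sketch; conn is a Connection as above, and selmgr.path_selector is assumed to have been set up via reconfigure()):

  circ = conn.build_circuit(3, selmgr.path_selector)   # Circuit with circ_id set
  print circ.circ_id, [r.nickname for r in circ.path]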
-
-######################## Node Restrictions ########################
-
-# TODO: We still need more path support implementations
-# - NodeRestrictions:
-# - Uptime/LongLivedPorts (Does/should hibernation count?)
-# - Published/Updated
-# - PathRestrictions:
-# - Family
-# - GeoIP:
-# - OceanPhobicRestrictor (avoids Pacific Ocean or two atlantic crossings)
-# or ContinentRestrictor (avoids doing more than N continent crossings)
-# - Mathematical/empirical study of predecessor expectation
-# - If middle node on the same continent as exit, exit learns nothing
-# - else, exit has a bias on the continent of origin of user
-# - Language and browser accept string determine this anyway
-# - EchelonPhobicRestrictor
-# - Does not cross international boundaries for client->Entry or
-# Exit->destination hops
-
-class PercentileRestriction(NodeRestriction):
- """Restriction to cut out a percentile slice of the network."""
- def __init__(self, pct_skip, pct_fast, r_list):
- """Constructor. Sets up the restriction such that routers in the
- 'pct_skip' to 'pct_fast' percentile of bandwidth rankings are
- returned from the sorted list 'r_list'"""
- self.pct_fast = pct_fast
- self.pct_skip = pct_skip
- self.sorted_r = r_list
-
- def r_is_ok(self, r):
- "Returns true if r is in the percentile boundaries (by rank)"
- # Hrmm.. technically we shouldn't count non-running routers in this..
- # but that is tricky to do efficiently.
- # XXX: Is there any reason why sorted_r should have non-running
- # routers in the first place?
-
- if r.list_rank < len(self.sorted_r)*self.pct_skip/100: return False
- elif r.list_rank > len(self.sorted_r)*self.pct_fast/100: return False
-
- return True
-
-class OSRestriction(NodeRestriction):
- "Restriction based on operating system"
- def __init__(self, ok, bad=[]):
- """Constructor. Accept router OSes that match regexes in 'ok',
- rejects those that match regexes in 'bad'."""
- self.ok = ok
- self.bad = bad
-
- def r_is_ok(self, r):
-    "Returns true if r's OS matches a regex in 'ok' and false if it matches one in 'bad'; otherwise false when 'ok' was given, true when only 'bad' was given."
- for y in self.ok:
- if re.search(y, r.os):
- return True
- for b in self.bad:
- if re.search(b, r.os):
- return False
- if self.ok: return False
- if self.bad: return True
-
-class ConserveExitsRestriction(NodeRestriction):
- "Restriction to reject exits from selection"
- # XXX: Make this adaptive by ip/port
- def r_is_ok(self, r): return not "Exit" in r.flags
-
-class FlagsRestriction(NodeRestriction):
- "Restriction for mandatory and forbidden router flags"
- def __init__(self, mandatory, forbidden=[]):
- """Constructor. 'mandatory' and 'forbidden' are both lists of router
- flags as strings."""
- self.mandatory = mandatory
- self.forbidden = forbidden
-
- def r_is_ok(self, router):
- for m in self.mandatory:
- if not m in router.flags: return False
- for f in self.forbidden:
- if f in router.flags: return False
- return True
-
-class NickRestriction(NodeRestriction):
- """Require that the node nickname is as specified"""
- def __init__(self, nickname):
- self.nickname = nickname
-
- def r_is_ok(self, router):
- return router.nickname == self.nickname
-
-class IdHexRestriction(NodeRestriction):
- """Require that the node idhash is as specified"""
- def __init__(self, idhex):
- if idhex[0] == '$':
- self.idhex = idhex[1:].upper()
- else:
- self.idhex = idhex.upper()
-
- def r_is_ok(self, router):
- return router.idhex == self.idhex
-
-class MinBWRestriction(NodeRestriction):
- """Require a minimum bandwidth"""
- def __init__(self, minbw):
- self.min_bw = minbw
-
- def r_is_ok(self, router): return router.bw >= self.min_bw
-
-class VersionIncludeRestriction(NodeRestriction):
- """Require that the version match one in the list"""
- def __init__(self, eq):
- "Constructor. 'eq' is a list of versions as strings"
- self.eq = map(TorCtl.RouterVersion, eq)
-
- def r_is_ok(self, router):
- """Returns true if the version of 'router' matches one of the
- specified versions."""
- for e in self.eq:
- if e == router.version:
- return True
- return False
-
-class VersionExcludeRestriction(NodeRestriction):
- """Require that the version not match one in the list"""
- def __init__(self, exclude):
- "Constructor. 'exclude' is a list of versions as strings"
- self.exclude = map(TorCtl.RouterVersion, exclude)
-
- def r_is_ok(self, router):
- """Returns false if the version of 'router' matches one of the
- specified versions."""
- for e in self.exclude:
- if e == router.version:
- return False
- return True
-
-class VersionRangeRestriction(NodeRestriction):
- """Require that the versions be inside a specified range"""
- def __init__(self, gr_eq, less_eq=None):
- self.gr_eq = TorCtl.RouterVersion(gr_eq)
- if less_eq: self.less_eq = TorCtl.RouterVersion(less_eq)
- else: self.less_eq = None
-
- def r_is_ok(self, router):
- return (not self.gr_eq or router.version >= self.gr_eq) and \
- (not self.less_eq or router.version <= self.less_eq)
-
-class ExitPolicyRestriction(NodeRestriction):
- """Require that a router exit to an ip+port"""
- def __init__(self, to_ip, to_port):
- self.to_ip = to_ip
- self.to_port = to_port
-
- def r_is_ok(self, r): return r.will_exit_to(self.to_ip, self.to_port)
-
-class MetaNodeRestriction(NodeRestriction):
- """Interface for a NodeRestriction that is an expression consisting of
- multiple other NodeRestrictions"""
- # TODO: these should collapse the restriction and return a new
- # instance for re-insertion (or None)
- def next_rstr(self): raise NotImplemented()
- def del_restriction(self, RestrictionClass): raise NotImplemented()
-
-class OrNodeRestriction(MetaNodeRestriction):
- """MetaNodeRestriction that is the boolean or of two or more
- NodeRestrictions"""
- def __init__(self, rs):
- "Constructor. 'rs' is a list of NodeRestrictions"
- self.rstrs = rs
-
- def r_is_ok(self, r):
- "Returns true if one of 'rs' is true for this router"
- for rs in self.rstrs:
- if rs.r_is_ok(r):
- return True
- return False
-
-class NotNodeRestriction(MetaNodeRestriction):
- """Negates a single restriction"""
- def __init__(self, a):
- self.a = a
-
- def r_is_ok(self, r): return not self.a.r_is_ok(r)
-
-class AtLeastNNodeRestriction(MetaNodeRestriction):
- """MetaNodeRestriction that is true if at least n member
- restrictions are true."""
- def __init__(self, rstrs, n):
- self.rstrs = rstrs
- self.n = n
-
- def r_is_ok(self, r):
- cnt = 0
- for rs in self.rstrs:
- if rs.r_is_ok(r):
- cnt += 1
- if cnt < self.n: return False
- else: return True
-
-
-#################### Path Restrictions #####################
-
-class Subnet16Restriction(PathRestriction):
- """PathRestriction that mandates that no two nodes from the same
- /16 subnet be in the path"""
- def path_is_ok(self, path):
- mask16 = struct.unpack(">I", socket.inet_aton("255.255.0.0"))[0]
- ip16 = path[0].ip & mask16
- for r in path[1:]:
- if ip16 == (r.ip & mask16):
- return False
- return True
-
-class UniqueRestriction(PathRestriction):
- """Path restriction that mandates that the same router can't appear more
- than once in a path"""
- def path_is_ok(self, path):
- for i in xrange(0,len(path)):
- if path[i] in path[:i]:
- return False
- return True
-
-#################### GeoIP Restrictions ###################
-
-class CountryCodeRestriction(NodeRestriction):
- """ Ensure that the country_code is set """
- def r_is_ok(self, r):
- return r.country_code != None
-
-class CountryRestriction(NodeRestriction):
- """ Only accept nodes that are in 'country_code' """
- def __init__(self, country_code):
- self.country_code = country_code
-
- def r_is_ok(self, r):
- return r.country_code == self.country_code
-
-class ExcludeCountriesRestriction(NodeRestriction):
- """ Exclude a list of countries """
- def __init__(self, countries):
- self.countries = countries
-
- def r_is_ok(self, r):
- return not (r.country_code in self.countries)
-
-class UniqueCountryRestriction(PathRestriction):
-  """ Ensure that every router has a distinct country_code """
- def path_is_ok(self, path):
- for i in xrange(0, len(path)-1):
- for j in xrange(i+1, len(path)):
- if path[i].country_code == path[j].country_code:
- return False;
- return True;
-
-class SingleCountryRestriction(PathRestriction):
-  """ Ensure that every router has the same country_code """
- def path_is_ok(self, path):
- country_code = path[0].country_code
- for r in path:
- if country_code != r.country_code:
- return False
- return True
-
-class ContinentRestriction(PathRestriction):
-  """ Allow no more than n continent crossings """
- # TODO: Add src and dest
- def __init__(self, n, src=None, dest=None):
- self.n = n
-
- def path_is_ok(self, path):
- crossings = 0
- prev = None
- # Compute crossings until now
- for r in path:
- # Jump over the first router
- if prev:
- if r.continent != prev.continent:
- crossings += 1
- prev = r
- if crossings > self.n: return False
- else: return True
-
-class ContinentJumperRestriction(PathRestriction):
- """ Ensure continent crossings between all hops """
- def path_is_ok(self, path):
- prev = None
- for r in path:
- # Jump over the first router
- if prev:
- if r.continent == prev.continent:
- return False
- prev = r
- return True
-
-class UniqueContinentRestriction(PathRestriction):
-  """ Ensure that every hop is on a different continent """
- def path_is_ok(self, path):
- for i in xrange(0, len(path)-1):
- for j in xrange(i+1, len(path)):
- if path[i].continent == path[j].continent:
- return False;
- return True;
-
-class OceanPhobicRestriction(PathRestriction):
-  """ Allow no more than n ocean crossings """
- # TODO: Add src and dest
- def __init__(self, n, src=None, dest=None):
- self.n = n
-
- def path_is_ok(self, path):
- crossings = 0
- prev = None
- # Compute ocean crossings until now
- for r in path:
- # Jump over the first router
- if prev:
- if r.cont_group != prev.cont_group:
- crossings += 1
- prev = r
- if crossings > self.n: return False
- else: return True
-
-#################### Node Generators ######################
-
-class UniformGenerator(NodeGenerator):
- """NodeGenerator that produces nodes in the uniform distribution"""
- def generate(self):
- while not self.all_chosen():
- r = random.choice(self.routers)
- if self.rstr_list.r_is_ok(r): yield r
-
-class OrderedExitGenerator(NodeGenerator):
- """NodeGenerator that produces exits in an ordered fashion for a
- specific port"""
- def __init__(self, to_port, sorted_r, rstr_list):
- self.to_port = to_port
- self.next_exit_by_port = {}
- NodeGenerator.__init__(self, sorted_r, rstr_list)
-
- def rewind(self):
- if self.to_port not in self.next_exit_by_port or not self.next_exit_by_port[self.to_port]:
- self.next_exit_by_port[self.to_port] = 0
- self.last_idx = len(self.sorted_r)
- else:
- self.last_idx = self.next_exit_by_port[self.to_port]
-
- def set_port(self, port):
- self.to_port = port
- self.rewind()
-
- def mark_chosen(self, r):
- self.next_exit_by_port[self.to_port] += 1
-
- def all_chosen(self):
- return self.last_idx == self.next_exit_by_port[self.to_port]
-
- def generate(self):
- while True: # A do..while would be real nice here..
- if self.next_exit_by_port[self.to_port] >= len(self.sorted_r):
- self.next_exit_by_port[self.to_port] = 0
- r = self.sorted_r[self.next_exit_by_port[self.to_port]]
- if self.rstr_list.r_is_ok(r): yield r
- else: self.next_exit_by_port[self.to_port] += 1
- if self.last_idx == self.next_exit_by_port[self.to_port]:
- break
-
-class BwWeightedGenerator(NodeGenerator):
- """
-
- This is a generator designed to match the Tor Path Selection
- algorithm. It will generate nodes weighted by their bandwidth,
- but take the appropriate weighting into account against guard
- nodes and exit nodes when they are chosen for positions other
- than guard/exit. For background see:
- routerlist.c::smartlist_choose_by_bandwidth(),
- http://archives.seul.org/or/dev/Jul-2007/msg00021.html,
- http://archives.seul.org/or/dev/Jul-2007/msg00056.html, and
- https://tor-svn.freehaven.net/svn/tor/trunk/doc/spec/path-spec.txt
- The formulas used are from the first or-dev link, but are proven
- optimal and equivalent to the ones now used in routerlist.c in the
- second or-dev link.
-
- """
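A small worked example of the weighting (toy numbers, purely illustrative):

  pathlen = 3
  total_bw = 300e6        # 300 MB/s of usable bandwidth overall
  total_exit_bw = 120e6   # of which 120 MB/s carries the Exit flag
  bw_per_hop = total_bw / pathlen                        # 100 MB/s needed per hop
  exit_weight = (total_exit_bw - bw_per_hop) / total_exit_bw
  # exit_weight ~= 0.17: in non-exit positions an Exit-flagged router is
  # counted at about a sixth of its bandwidth, conserving exit capacity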
- def __init__(self, sorted_r, rstr_list, pathlen, exit=False, guard=False):
- """ Pass exit=True to create a generator for exit-nodes """
- self.max_bandwidth = 10000000
- # Out for an exit-node?
- self.exit = exit
- # Is this a guard node?
- self.guard = guard
- # Different sums of bandwidths
- self.total_bw = 0
- self.total_exit_bw = 0
- self.total_guard_bw = 0
- self.total_weighted_bw = 0
- self.pathlen = pathlen
- NodeGenerator.__init__(self, sorted_r, rstr_list)
-
- def rewind(self):
- NodeGenerator.rewind(self)
- # Set the exit_weight
- # We are choosing a non-exit
- self.total_exit_bw = 0
- self.total_guard_bw = 0
- self.total_bw = 0
- for r in self.sorted_r:
- # Should this be outside the restriction checks?
- # TODO: Check max_bandwidth and cap...
- if self.rstr_list.r_is_ok(r):
- self.total_bw += r.bw
- if "Exit" in r.flags:
- self.total_exit_bw += r.bw
- if "Guard" in r.flags:
- self.total_guard_bw += r.bw
-
- bw_per_hop = (1.0*self.total_bw)/self.pathlen
-
- # Print some debugging info about bandwidth ratios
- if self.total_bw > 0:
- e_ratio = self.total_exit_bw/float(self.total_bw)
- g_ratio = self.total_guard_bw/float(self.total_bw)
- else:
- g_ratio = 0
- e_ratio = 0
- plog("DEBUG",
- "E = " + str(self.total_exit_bw) +
- ", G = " + str(self.total_guard_bw) +
- ", T = " + str(self.total_bw) +
- ", g_ratio = " + str(g_ratio) + ", e_ratio = " +str(e_ratio) +
- ", bw_per_hop = " + str(bw_per_hop))
-
- if self.exit:
- self.exit_weight = 1.0
- else:
- if self.total_exit_bw < bw_per_hop:
- # Don't use exit nodes at all
- self.exit_weight = 0
- else:
- if self.total_exit_bw > 0:
- self.exit_weight = ((self.total_exit_bw-bw_per_hop)/self.total_exit_bw)
- else: self.exit_weight = 0
-
- if self.guard:
- self.guard_weight = 1.0
- else:
- if self.total_guard_bw < bw_per_hop:
-        # Don't use guard nodes at all
- self.guard_weight = 0
- else:
- if self.total_guard_bw > 0:
- self.guard_weight = ((self.total_guard_bw-bw_per_hop)/self.total_guard_bw)
- else: self.guard_weight = 0
-
- for r in self.sorted_r:
- bw = r.bw
- if "Exit" in r.flags:
- bw *= self.exit_weight
- if "Guard" in r.flags:
- bw *= self.guard_weight
- self.total_weighted_bw += bw
-
- self.total_weighted_bw = int(self.total_weighted_bw)
- plog("DEBUG", "Bw: "+str(self.total_weighted_bw)+"/"+str(self.total_bw)
- +". The exit-weight is: "+str(self.exit_weight)
- + ", guard weight is: "+str(self.guard_weight))
-
- def generate(self):
- while True:
- # Choose a suitable random int
- i = random.randint(0, self.total_weighted_bw)
-
- # Go through the routers
- for r in self.routers:
- # Below zero here means next() -> choose a new random int+router
- if i < 0: break
- if self.rstr_list.r_is_ok(r):
- bw = r.bw
- if "Exit" in r.flags:
- bw *= self.exit_weight
- if "Guard" in r.flags:
- bw *= self.guard_weight
-
- i -= bw
- if i < 0:
- plog("DEBUG", "Chosen router with a bandwidth of: " + str(r.bw))
- yield r
-
-####################### Secret Sauce ###########################
-
-class PathError(Exception):
- pass
-
-class NoRouters(PathError):
- pass
-
-class PathSelector:
- """Implementation of path selection policies. Builds a path according
- to entry, middle, and exit generators that satisfies the path
- restrictions."""
- def __init__(self, entry_gen, mid_gen, exit_gen, path_restrict):
- """Constructor. The first three arguments are NodeGenerators with
- their appropriate restrictions. The 'path_restrict' is a
- PathRestrictionList"""
- self.entry_gen = entry_gen
- self.mid_gen = mid_gen
- self.exit_gen = exit_gen
- self.path_restrict = path_restrict
-
- def build_path(self, pathlen):
- """Creates a path of 'pathlen' hops, and returns it as a list of
- Router instances"""
- self.entry_gen.rewind()
- self.mid_gen.rewind()
- self.exit_gen.rewind()
- entry = self.entry_gen.generate()
- mid = self.mid_gen.generate()
- ext = self.exit_gen.generate()
-
- while True:
- path = []
- try:
- if pathlen == 1:
- path = [ext.next()]
- else:
- path.append(entry.next())
- for i in xrange(1, pathlen-1):
- path.append(mid.next())
- path.append(ext.next())
- if self.path_restrict.path_is_ok(path):
- self.entry_gen.mark_chosen(path[0])
- for i in xrange(1, pathlen-1):
- self.mid_gen.mark_chosen(path[i])
- self.exit_gen.mark_chosen(path[pathlen-1])
- break
- except StopIteration:
- plog("NOTICE", "Ran out of routers during buildpath..");
- self.entry_gen.rewind()
- self.mid_gen.rewind()
- self.exit_gen.rewind()
- entry = self.entry_gen.generate()
-        mid = self.mid_gen.generate()
-        ext = self.exit_gen.generate()
- return path
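Putting generators and restrictions together (sketch; sorted_r is assumed to be a bandwidth-sorted list of Routers obtained elsewhere):

  from PathSupport import *

  entry_rstr = NodeRestrictionList([FlagsRestriction(["Guard", "Valid", "Running"])])
  mid_rstr = NodeRestrictionList([FlagsRestriction(["Valid", "Running"])])
  exit_rstr = NodeRestrictionList([FlagsRestriction(["Valid", "Running"], ["BadExit"]),
    ExitPolicyRestriction("255.255.255.255", 80)])
  path_rstr = PathRestrictionList([Subnet16Restriction(), UniqueRestriction()])

  selector = PathSelector(UniformGenerator(sorted_r, entry_rstr),
    UniformGenerator(sorted_r, mid_rstr),
    UniformGenerator(sorted_r, exit_rstr), path_rstr)
  path = selector.build_path(3)   # list of three Router instances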
-
-class SelectionManager:
- """Helper class to handle configuration updates
-
- The methods are NOT threadsafe. They may ONLY be called from
- EventHandler's thread. This means that to update the selection
- manager, you must schedule a config update job using
- PathBuilder.schedule_selmgr() with a worker function to modify
- this object.
- """
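For example, a configuration worker handed to PathBuilder.schedule_selmgr() (sketch; builder and the exit nickname are placeholders):

  def reconfig(selmgr):
    selmgr.pathlen = 2                      # switch to 2-hop paths
    selmgr.exit_name = "SomeExitNickname"   # hypothetical exit nickname
  builder.schedule_selmgr(reconfig)         # runs in the EventHandler's thread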
- def __init__(self, pathlen, order_exits,
- percent_fast, percent_skip, min_bw, use_all_exits,
- uniform, use_exit, use_guards,geoip_config=None,restrict_guards=False):
- self.__ordered_exit_gen = None
- self.pathlen = pathlen
- self.order_exits = order_exits
- self.percent_fast = percent_fast
- self.percent_skip = percent_skip
- self.min_bw = min_bw
- self.use_all_exits = use_all_exits
- self.uniform = uniform
- self.exit_name = use_exit
- self.use_guards = use_guards
- self.geoip_config = geoip_config
-
- self.restrict_guards_only = restrict_guards
-
- def reconfigure(self, sorted_r):
- """This function is called after a configuration change,
- to rebuild the RestrictionLists."""
- if self.use_all_exits:
- self.path_rstr = PathRestrictionList([UniqueRestriction()])
- else:
- self.path_rstr = PathRestrictionList(
- [Subnet16Restriction(), UniqueRestriction()])
-
- if self.use_guards: entry_flags = ["Guard", "Valid", "Running"]
- else: entry_flags = ["Valid", "Running"]
-
-
- if self.restrict_guards_only:
- entry_flags = ["Guard","Valid","Running"]
- nonentry_skip = 0
- nonentry_fast = 100
- else:
- entry_flags = ["Valid","Running"]
- nonentry_skip = self.percent_skip
- nonentry_fast = self.percent_fast
-
- entry_rstr = NodeRestrictionList(
- [PercentileRestriction(self.percent_skip, self.percent_fast, sorted_r),
- ConserveExitsRestriction(),
- FlagsRestriction(entry_flags, [])]
- )
- mid_rstr = NodeRestrictionList(
- [PercentileRestriction(nonentry_skip, nonentry_fast, sorted_r),
- ConserveExitsRestriction(),
- FlagsRestriction(["Running","Fast"], [])]
-
- )
- if self.use_all_exits:
- self.exit_rstr = NodeRestrictionList(
- [FlagsRestriction(["Valid", "Running","Fast"], ["BadExit"])])
- else:
- self.exit_rstr = NodeRestrictionList(
- [PercentileRestriction(nonentry_skip, nonentry_fast, sorted_r),
- FlagsRestriction(["Valid", "Running","Fast"], ["BadExit"])])
-
- if self.exit_name:
- self.exit_rstr.del_restriction(IdHexRestriction)
- self.exit_rstr.del_restriction(NickRestriction)
- if self.exit_name[0] == '$':
- self.exit_rstr.add_restriction(IdHexRestriction(self.exit_name))
- else:
- self.exit_rstr.add_restriction(NickRestriction(self.exit_name))
-
- # GeoIP configuration
- if self.geoip_config:
- # Every node needs country_code
- entry_rstr.add_restriction(CountryCodeRestriction())
- mid_rstr.add_restriction(CountryCodeRestriction())
- self.exit_rstr.add_restriction(CountryCodeRestriction())
-
- # Specified countries for different positions
- if self.geoip_config.entry_country:
- entry_rstr.add_restriction(CountryRestriction(self.geoip_config.entry_country))
- if self.geoip_config.middle_country:
- mid_rstr.add_restriction(CountryRestriction(self.geoip_config.middle_country))
- if self.geoip_config.exit_country:
- self.exit_rstr.add_restriction(CountryRestriction(self.geoip_config.exit_country))
-
- # Excluded countries
- if self.geoip_config.excludes:
- plog("INFO", "Excluded countries: " + str(self.geoip_config.excludes))
- if len(self.geoip_config.excludes) > 0:
- entry_rstr.add_restriction(ExcludeCountriesRestriction(self.geoip_config.excludes))
- mid_rstr.add_restriction(ExcludeCountriesRestriction(self.geoip_config.excludes))
- self.exit_rstr.add_restriction(ExcludeCountriesRestriction(self.geoip_config.excludes))
-
- # Unique countries set? None --> pass
- if self.geoip_config.unique_countries != None:
- if self.geoip_config.unique_countries:
- # If True: unique countries
- self.path_rstr.add_restriction(UniqueCountryRestriction())
- else:
- # False: use the same country for all nodes in a path
- self.path_rstr.add_restriction(SingleCountryRestriction())
-
- # Specify max number of continent crossings, None means UniqueContinents
- if self.geoip_config.continent_crossings == None:
- self.path_rstr.add_restriction(UniqueContinentRestriction())
- else: self.path_rstr.add_restriction(ContinentRestriction(self.geoip_config.continent_crossings))
- # Should even work in combination with continent crossings
- if self.geoip_config.ocean_crossings != None:
- self.path_rstr.add_restriction(OceanPhobicRestriction(self.geoip_config.ocean_crossings))
-
- # This is kind of hokey..
- if self.order_exits:
- if self.__ordered_exit_gen:
- exitgen = self.__ordered_exit_gen
- exitgen.reset_restriction(self.exit_rstr)
- else:
- exitgen = self.__ordered_exit_gen = \
- OrderedExitGenerator(80, sorted_r, self.exit_rstr)
- elif self.uniform:
- # 'real' exits should also be chosen when not using 'order_exits'
- self.exit_rstr.add_restriction(ExitPolicyRestriction("255.255.255.255", 80))
- exitgen = UniformGenerator(sorted_r, self.exit_rstr)
- else:
- self.exit_rstr.add_restriction(ExitPolicyRestriction("255.255.255.255", 80))
- exitgen = BwWeightedGenerator(sorted_r, self.exit_rstr, self.pathlen, exit=True)
-
- if self.uniform:
- self.path_selector = PathSelector(
- UniformGenerator(sorted_r, entry_rstr),
- UniformGenerator(sorted_r, mid_rstr),
- exitgen, self.path_rstr)
- else:
- # Remove ConserveExitsRestrictions for entry and middle positions
- entry_rstr.del_restriction(ConserveExitsRestriction)
- mid_rstr.del_restriction(ConserveExitsRestriction)
- self.path_selector = PathSelector(
- BwWeightedGenerator(sorted_r, entry_rstr, self.pathlen,
- guard=self.use_guards),
- BwWeightedGenerator(sorted_r, mid_rstr, self.pathlen),
- exitgen, self.path_rstr)
- return
-
- def set_target(self, ip, port):
- "Called to update the ExitPolicyRestrictions with a new ip and port"
- self.exit_rstr.del_restriction(ExitPolicyRestriction)
- self.exit_rstr.add_restriction(ExitPolicyRestriction(ip, port))
- if self.__ordered_exit_gen: self.__ordered_exit_gen.set_port(port)
- # Try to choose an exit node in the destination country
- # needs an IP != 255.255.255.255
- if self.geoip_config and self.geoip_config.echelon:
- import GeoIPSupport
- c = GeoIPSupport.get_country(ip)
- if c:
- plog("INFO", "[Echelon] IP "+ip+" is in ["+c+"]")
- self.exit_rstr.del_restriction(CountryRestriction)
- self.exit_rstr.add_restriction(CountryRestriction(c))
- else:
- plog("INFO", "[Echelon] Could not determine destination country of IP "+ip)
- # Try to use a backup country
- if self.geoip_config.exit_country:
- self.exit_rstr.del_restriction(CountryRestriction)
- self.exit_rstr.add_restriction(CountryRestriction(self.geoip_config.exit_country))
-
-class Circuit:
- "Class to describe a circuit"
- def __init__(self):
- self.circ_id = 0
- self.path = [] # routers
- self.exit = None
- self.built = False
- self.failed = False
- self.dirty = False
- self.closed = False
- self.detached_cnt = 0
- self.last_extended_at = time.time()
- self.extend_times = [] # List of all extend-durations
- self.setup_duration = None # Sum of extend-times
- self.pending_streams = [] # Which stream IDs are pending us
-
- def id_path(self):
- "Returns a list of idhex keys for the path of Routers"
- return map(lambda r: r.idhex, self.path)
-
-class Stream:
- "Class to describe a stream"
- def __init__(self, sid, host, port, kind):
- self.strm_id = sid
- self.detached_from = [] # circ id #'s
- self.pending_circ = None
- self.circ = None
- self.host = host
- self.port = port
- self.kind = kind
- self.attached_at = 0
- self.bytes_read = 0
- self.bytes_written = 0
- self.failed = False
- self.ignored = False # Set if PURPOSE=DIR_*
- self.failed_reason = None # Cheating a little.. Only used by StatsHandler
-
- def lifespan(self, now):
- "Returns the age of the stream"
- return now-self.attached_at
-
-# TODO: Make passive "PathWatcher" so people can get aggregate
-# node reliability stats for normal usage without us attaching streams
-
-class PathBuilder(TorCtl.EventHandler):
- """
- PathBuilder implementation. Handles circuit construction, subject
- to the constraints of the SelectionManager selmgr.
-
- Do not access this object from other threads. Instead, use the
- schedule_* functions to schedule work to be done in the thread
- of the EventHandler.
- """
- def __init__(self, c, selmgr, RouterClass):
- """Constructor. 'c' is a Connection, 'selmgr' is a SelectionManager,
- and 'RouterClass' is a class that inherits from Router and is used
- to create annotated Routers."""
- TorCtl.EventHandler.__init__(self)
- self.c = c
- nslist = c.get_network_status()
- self.last_exit = None
- self.new_nym = False
- self.resolve_port = 0
- self.num_circuits = 1
- self.RouterClass = RouterClass
- self.sorted_r = []
- self.name_to_key = {}
- self.routers = {}
- self.circuits = {}
- self.streams = {}
- self.read_routers(nslist)
- self.selmgr = selmgr
- self.selmgr.reconfigure(self.sorted_r)
- self.imm_jobs = Queue.Queue()
- self.low_prio_jobs = Queue.Queue()
- self.run_all_jobs = False
- self.do_reconfigure = False
- plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(nslist))+" routers")
-
- def schedule_immediate(self, job):
- """
- Schedules an immediate job to be run before the next event is
- processed.
- """
- self.imm_jobs.put(job)
-
- def schedule_low_prio(self, job):
- """
- Schedules a job to be run when a non-time critical event arrives.
- """
- self.low_prio_jobs.put(job)
-
- def schedule_selmgr(self, job):
- """
- Schedules an immediate job to be run before the next event is
- processed. Also notifies the selection manager that it needs
- to update itself.
- """
- def notlambda(this):
- job(this.selmgr)
- this.do_reconfigure = True
- self.schedule_immediate(notlambda)
-
-
- def heartbeat_event(self, event):
- """This function handles dispatching scheduled jobs. If you
- extend PathBuilder and want to implement this function for
- some reason, be sure to call the parent class"""
- while not self.imm_jobs.empty():
- imm_job = self.imm_jobs.get_nowait()
- imm_job(self)
-
- if self.do_reconfigure:
- self.selmgr.reconfigure(self.sorted_r)
- self.do_reconfigure = False
-
- if self.run_all_jobs:
- self.run_all_jobs = False
- while not self.low_prio_jobs.empty():
- imm_job = self.low_prio_jobs.get_nowait()
- imm_job(self)
- return
-
- # If event is stream:NEW*/DETACHED or circ BUILT/FAILED,
- # don't run low prio jobs.. No need to delay streams for them.
- if isinstance(event, TorCtl.CircuitEvent):
- if event.status in ("BUILT", "FAILED"):
- return
- elif isinstance(event, TorCtl.StreamEvent):
- if event.status in ("NEW", "NEWRESOLVE", "DETACHED"):
- return
-
- # Do the low prio jobs one at a time in case a
- # higher priority event is queued
- if not self.low_prio_jobs.empty():
- delay_job = self.low_prio_jobs.get_nowait()
- delay_job(self)
-
- def read_routers(self, nslist):
- routers = self.c.read_routers(nslist)
- new_routers = []
- for r in routers:
- self.name_to_key[r.nickname] = "$"+r.idhex
- if r.idhex in self.routers:
- if self.routers[r.idhex].nickname != r.nickname:
- plog("NOTICE", "Router "+r.idhex+" changed names from "
- +self.routers[r.idhex].nickname+" to "+r.nickname)
- # Must do IN-PLACE update to keep all the refs to this router
- # valid and current (especially for stats)
- self.routers[r.idhex].update_to(r)
- else:
- rc = self.RouterClass(r)
- self.routers[rc.idhex] = rc
- new_routers.append(rc)
- self.sorted_r.extend(new_routers)
- self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
- for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
-
- def build_path(self):
- """ Get a path from the SelectionManager's PathSelector, can be used
- e.g. for generating paths without actually creating any circuits """
- return self.selmgr.path_selector.build_path(self.selmgr.pathlen)
-
- def attach_stream_any(self, stream, badcircs):
- "Attach a stream to a valid circuit, avoiding any in 'badcircs'"
- # Newnym, and warn if not built plus pending
- unattached_streams = [stream]
- if self.new_nym:
- self.new_nym = False
- plog("DEBUG", "Obeying new nym")
- for key in self.circuits.keys():
- if (not self.circuits[key].dirty
- and len(self.circuits[key].pending_streams)):
- plog("WARN", "New nym called, destroying circuit "+str(key)
- +" with "+str(len(self.circuits[key].pending_streams))
- +" pending streams")
- unattached_streams.extend(self.circuits[key].pending_streams)
-          del self.circuits[key].pending_streams[:]  # lists have no clear() in Python 2
- # FIXME: Consider actually closing circ if no streams.
- self.circuits[key].dirty = True
-
- for circ in self.circuits.itervalues():
- if circ.built and not circ.dirty and circ.circ_id not in badcircs:
- if circ.exit.will_exit_to(stream.host, stream.port):
- try:
- self.c.attach_stream(stream.strm_id, circ.circ_id)
- stream.pending_circ = circ # Only one possible here
- circ.pending_streams.append(stream)
- except TorCtl.ErrorReply, e:
- # No need to retry here. We should get the failed
- # event for either the circ or stream next
- plog("WARN", "Error attaching stream: "+str(e.args))
- return
- break
- else:
- circ = None
- self.selmgr.set_target(stream.host, stream.port)
- while circ == None:
- try:
- circ = self.c.build_circuit(
- self.selmgr.pathlen,
- self.selmgr.path_selector)
- except TorCtl.ErrorReply, e:
-        # FIXME: How come some routers are non-existent? Shouldn't
- # we have gotten an NS event to notify us they
- # disappeared?
- plog("NOTICE", "Error building circ: "+str(e.args))
- for u in unattached_streams:
- plog("DEBUG",
- "Attaching "+str(u.strm_id)+" pending build of "+str(circ.circ_id))
- u.pending_circ = circ
- circ.pending_streams.extend(unattached_streams)
- self.circuits[circ.circ_id] = circ
- self.last_exit = circ.exit
-
- def circ_status_event(self, c):
- output = [c.event_name, str(c.circ_id), c.status]
- if c.path: output.append(",".join(c.path))
- if c.reason: output.append("REASON=" + c.reason)
- if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
- plog("DEBUG", " ".join(output))
- # Circuits we don't control get built by Tor
- if c.circ_id not in self.circuits:
- plog("DEBUG", "Ignoring circ " + str(c.circ_id))
- return
- if c.status == "EXTENDED":
- self.circuits[c.circ_id].last_extended_at = c.arrived_at
- elif c.status == "FAILED" or c.status == "CLOSED":
- # XXX: Can still get a STREAM FAILED for this circ after this
- circ = self.circuits[c.circ_id]
- del self.circuits[c.circ_id]
- for stream in circ.pending_streams:
- plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
- self.attach_stream_any(stream, stream.detached_from)
- elif c.status == "BUILT":
- self.circuits[c.circ_id].built = True
- try:
- for stream in self.circuits[c.circ_id].pending_streams:
- self.c.attach_stream(stream.strm_id, c.circ_id)
- except TorCtl.ErrorReply, e:
- # No need to retry here. We should get the failed
- # event for either the circ or stream next
- plog("WARN", "Error attaching stream: "+str(e.args))
- return
-
- def stream_status_event(self, s):
- output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id),
- s.target_host, str(s.target_port)]
- if s.reason: output.append("REASON=" + s.reason)
- if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
- if s.purpose: output.append("PURPOSE=" + s.purpose)
- plog("DEBUG", " ".join(output))
-    if not re.match(r"\d+\.\d+\.\d+\.\d+", s.target_host):
- s.target_host = "255.255.255.255" # ignore DNS for exit policy check
-
- # Hack to ignore Tor-handled streams (Currently only directory streams)
- if s.strm_id in self.streams and self.streams[s.strm_id].ignored:
- plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
- return
-
- if s.status == "NEW" or s.status == "NEWRESOLVE":
- if s.status == "NEWRESOLVE" and not s.target_port:
- s.target_port = self.resolve_port
- self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, s.status)
- # Remember Tor-handled streams (Currently only directory streams)
- if s.purpose and s.purpose.find("DIR_") == 0:
- self.streams[s.strm_id].ignored = True
- plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
- return
- else:
- self.attach_stream_any(self.streams[s.strm_id],
- self.streams[s.strm_id].detached_from)
- elif s.status == "DETACHED":
- if s.strm_id not in self.streams:
- plog("WARN", "Detached stream "+str(s.strm_id)+" not found")
- self.streams[s.strm_id] = Stream(s.strm_id, s.target_host,
- s.target_port, "NEW")
- # FIXME Stats (differentiate Resolved streams also..)
- if not s.circ_id:
- plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
- else:
- self.streams[s.strm_id].detached_from.append(s.circ_id)
-
- if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
- self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
- self.streams[s.strm_id].pending_circ = None
- self.attach_stream_any(self.streams[s.strm_id],
- self.streams[s.strm_id].detached_from)
- elif s.status == "SUCCEEDED":
- if s.strm_id not in self.streams:
- plog("NOTICE", "Succeeded stream "+str(s.strm_id)+" not found")
- return
- if s.circ_id and self.streams[s.strm_id].pending_circ.circ_id != s.circ_id:
- # Hrmm.. this can happen on a new-nym.. Very rare, putting warn
- # in because I'm still not sure this is correct
- plog("WARN", "Mismatch of pending: "
- +str(self.streams[s.strm_id].pending_circ.circ_id)+" vs "
- +str(s.circ_id))
- # This can happen if the circuit existed before we started up
- if s.circ_id in self.circuits:
- self.streams[s.strm_id].circ = self.circuits[s.circ_id]
- else:
- plog("NOTICE", "Stream "+str(s.strm_id)+" has unknown circuit: "+str(s.circ_id))
- else:
- self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
- self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
- self.streams[s.strm_id].pending_circ = None
- self.streams[s.strm_id].attached_at = s.arrived_at
- elif s.status == "FAILED" or s.status == "CLOSED":
- # FIXME stats
- if s.strm_id not in self.streams:
- plog("NOTICE", "Failed stream "+str(s.strm_id)+" not found")
- return
-
- if not s.circ_id:
- plog("WARN", "Stream "+str(s.strm_id)+" failed from no circuit!")
-
- # We get failed and closed for each stream. OK to return
- # and let the closed do the cleanup
- if s.status == "FAILED":
- # Avoid busted circuits that will not resolve or carry
- # traffic.
- self.streams[s.strm_id].failed = True
- if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
- else: plog("WARN","Failed stream on unknown circ "+str(s.circ_id))
- return
-
- if self.streams[s.strm_id].pending_circ:
- self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
- del self.streams[s.strm_id]
- elif s.status == "REMAP":
- if s.strm_id not in self.streams:
- plog("WARN", "Remap id "+str(s.strm_id)+" not found")
- else:
-        if not re.match(r"\d+\.\d+\.\d+\.\d+", s.target_host):
- s.target_host = "255.255.255.255"
- plog("NOTICE", "Non-IP remap for "+str(s.strm_id)+" to "
- + s.target_host)
- self.streams[s.strm_id].host = s.target_host
- self.streams[s.strm_id].port = s.target_port
-
- def stream_bw_event(self, s):
- output = [s.event_name, str(s.strm_id), str(s.bytes_read),
- str(s.bytes_written)]
- plog("DEBUG", " ".join(output))
- if not s.strm_id in self.streams:
- plog("WARN", "BW event for unknown stream id: "+str(s.strm_id))
- else:
- self.streams[s.strm_id].bytes_read += s.bytes_read
- self.streams[s.strm_id].bytes_written += s.bytes_written
-
- def ns_event(self, n):
- self.read_routers(n.nslist)
- plog("DEBUG", "Read " + str(len(n.nslist))+" NS => "
- + str(len(self.sorted_r)) + " routers")
-
- def new_desc_event(self, d):
- for i in d.idlist: # Is this too slow?
- self.read_routers(self.c.get_network_status("id/"+i))
- plog("DEBUG", "Read " + str(len(d.idlist))+" Desc => "
- + str(len(self.sorted_r)) + " routers")
-
- def bandwidth_event(self, b): pass # For heartbeat only..
-
-################### CircuitHandler #############################
-
-class CircuitHandler(PathBuilder):
- """ CircuitHandler that extends from PathBuilder to handle multiple
- circuits as opposed to just one. """
- def __init__(self, c, selmgr, num_circuits, RouterClass):
- """Constructor. 'c' is a Connection, 'selmgr' is a SelectionManager,
- 'num_circuits' is the number of circuits to keep in the pool,
- and 'RouterClass' is a class that inherits from Router and is used
- to create annotated Routers."""
- PathBuilder.__init__(self, c, selmgr, RouterClass)
- # Set handler to the connection here to
- # not miss any circuit events on startup
- c.set_event_handler(self)
- self.num_circuits = num_circuits # Size of the circuit pool
- self.check_circuit_pool() # Bring up the pool of circs
-
- def check_circuit_pool(self):
- """ Init or check the status of the circuit-pool """
- # Get current number of circuits
- n = len(self.circuits.values())
- i = self.num_circuits-n
- if i > 0:
- plog("INFO", "Checked pool of circuits: we need to build " +
- str(i) + " circuits")
- # Schedule (num_circs-n) circuit-buildups
- while (n < self.num_circuits):
- self.build_circuit("255.255.255.255", 80)
- plog("DEBUG", "Scheduled circuit No. " + str(n+1))
- n += 1
-
- def build_circuit(self, host, port):
- """ Build a circuit """
- circ = None
- while circ == None:
- try:
- self.selmgr.set_target(host, port)
- circ = self.c.build_circuit(self.selmgr.pathlen,
- self.selmgr.path_selector)
- self.circuits[circ.circ_id] = circ
- return circ
- except TorCtl.ErrorReply, e:
-        # FIXME: How come some routers are non-existent? Shouldn't
- # we have gotten an NS event to notify us they disappeared?
- plog("NOTICE", "Error building circuit: " + str(e.args))
-
- def close_circuit(self, id):
- """ Close a circuit with given id """
- # TODO: Pass streams to another circ before closing?
- self.circuits[id].closed = True
- try: self.c.close_circuit(id)
- except TorCtl.ErrorReply, e:
- plog("ERROR", "Failed closing circuit " + str(id) + ": " + str(e))
-
- def circ_status_event(self, c):
- """ Handle circuit status events """
- output = [c.event_name, str(c.circ_id), c.status]
- if c.path: output.append(",".join(c.path))
- if c.reason: output.append("REASON=" + c.reason)
- if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
- plog("DEBUG", " ".join(output))
-
- # Circuits we don't control get built by Tor
- if c.circ_id not in self.circuits:
- plog("DEBUG", "Ignoring circuit " + str(c.circ_id) +
- " (controlled by Tor)")
- return
-
- # EXTENDED
- if c.status == "EXTENDED":
- # Compute elapsed time
- extend_time = c.arrived_at-self.circuits[c.circ_id].last_extended_at
- self.circuits[c.circ_id].extend_times.append(extend_time)
- plog("INFO", "Circuit " + str(c.circ_id) + " extended in " +
- str(extend_time) + " sec")
- self.circuits[c.circ_id].last_extended_at = c.arrived_at
-
- # FAILED & CLOSED
- elif c.status == "FAILED" or c.status == "CLOSED":
- # XXX: Can still get a STREAM FAILED for this circ after this
- circ = self.circuits[c.circ_id]
- # Actual removal of the circ
- del self.circuits[c.circ_id]
- # Give away pending streams
- for stream in circ.pending_streams:
- plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
- self.attach_stream_any(stream, stream.detached_from)
- # Check if there are enough circs
- self.check_circuit_pool()
- return
-
- # BUILT
- elif c.status == "BUILT":
- circ = self.circuits[c.circ_id]
- circ.built = True
- for stream in circ.pending_streams:
- try:
- self.c.attach_stream(stream.strm_id, c.circ_id)
- except TorCtl.ErrorReply, e:
- # No need to retry here. We should get the failed
- # event for either the circ or stream next
- plog("WARN", "Error attaching stream: " + str(e.args))
- # Compute duration by summing up extend_times
- duration = reduce(lambda x, y: x+y, circ.extend_times, 0.0)
- plog("INFO", "Circuit " + str(c.circ_id) + " needed " +
- str(duration) + " seconds to be built")
- # Save the duration to the circuit for later use
- circ.setup_duration = duration
-
- # OTHER?
- else:
- # If this was e.g. a LAUNCHED
- pass
-
-################### StreamHandler ##############################
-
-class StreamHandler(CircuitHandler):
- """ StreamHandler that extends from the CircuitHandler
- to handle attaching streams to an appropriate circuit
- in the pool. """
- def __init__(self, c, selmgr, num_circs, RouterClass):
- CircuitHandler.__init__(self, c, selmgr, num_circs, RouterClass)
- self.sorted_circs = None # optional sorted list
-
- def clear_dns_cache(self):
- """ Send signal CLEARDNSCACHE """
- lines = self.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
- for _, msg, more in lines:
- plog("DEBUG", "CLEARDNSCACHE: " + msg)
-
- def close_stream(self, id, reason):
- """ Close a stream with given id and reason """
- self.c.close_stream(id, reason)
-
- def create_and_attach(self, stream, unattached_streams):
- """ Create a new circuit and attach (stream + unattached_streams) """
- circ = self.build_circuit(stream.host, stream.port)
- if circ:
- for u in unattached_streams:
- plog("DEBUG", "Attaching " + str(u.strm_id) +
- " pending build of circuit " + str(circ.circ_id))
- u.pending_circ = circ
- circ.pending_streams.extend(unattached_streams)
- self.circuits[circ.circ_id] = circ
- self.last_exit = circ.exit
-
- def attach_stream_any(self, stream, badcircs):
- """ Attach a regular user stream """
- unattached_streams = [stream]
- if self.new_nym:
- self.new_nym = False
- plog("DEBUG", "Obeying new nym")
- for key in self.circuits.keys():
- if (not self.circuits[key].dirty
- and len(self.circuits[key].pending_streams)):
- plog("WARN", "New nym called, destroying circuit "+str(key)
- +" with "+str(len(self.circuits[key].pending_streams))
- +" pending streams")
- unattached_streams.extend(self.circuits[key].pending_streams)
- del self.circuits[key].pending_streams[:]
- # FIXME: Consider actually closing circs if no streams
- self.circuits[key].dirty = True
-
- # Check if there is a sorted list of circs
- if self.sorted_circs: list = self.sorted_circs
- else: list = self.circuits.values()
- for circ in list:
- # Check each circuit
- if circ.built and not circ.closed and circ.circ_id not in badcircs and not circ.dirty:
- if circ.exit.will_exit_to(stream.host, stream.port):
- try:
- self.c.attach_stream(stream.strm_id, circ.circ_id)
- stream.pending_circ = circ # Only one possible here
- circ.pending_streams.append(stream)
- self.last_exit = circ.exit
- except TorCtl.ErrorReply, e:
- # No need to retry here. We should get the failed
- # event for either the circ or stream next
- plog("WARN", "Error attaching stream: " + str(e.args))
- return
- break
- else:
- plog("DEBUG", "Circuit " + str(circ.circ_id) + " won't exit")
- else:
- self.create_and_attach(stream, unattached_streams)
-
- def stream_status_event(self, s):
- """ Catch user stream events """
- # Construct debugging output
- output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id), s.target_host+':'+str(s.target_port)]
- if s.reason: output.append("REASON=" + s.reason)
- if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
- if s.purpose: output.append("PURPOSE=" + s.purpose)
- plog("DEBUG", " ".join(output))
-
- # If target_host is not an IP-address
- if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
- s.target_host = "255.255.255.255" # ignore DNS for exit policy check
-
- # Hack to ignore Tor-handled streams (Currently only directory streams)
- if s.strm_id in self.streams and self.streams[s.strm_id].ignored:
- plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
- return
-
- # NEW or NEWRESOLVE
- if s.status == "NEW" or s.status == "NEWRESOLVE":
- if s.status == "NEWRESOLVE" and not s.target_port:
- s.target_port = self.resolve_port
- # Set up the new stream
- stream = Stream(s.strm_id, s.target_host, s.target_port, s.status)
-
- self.streams[s.strm_id] = stream
- if s.purpose and s.purpose.find("DIR_") == 0:
- stream.ignored = True
- plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
- return
- else:
- self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
-
- # DETACHED
- elif s.status == "DETACHED":
- # Stream not found
- if s.strm_id not in self.streams:
- plog("WARN", "Detached stream " + str(s.strm_id) + " not found")
- self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, "NEW")
- # Circuit not found
- if not s.circ_id:
- plog("WARN", "Stream " + str(s.strm_id) + " detached from no circuit!")
- else:
- self.streams[s.strm_id].detached_from.append(s.circ_id)
- # Detect timeouts on user streams
- if s.reason == "TIMEOUT":
- # TODO: Count timeouts on streams?
- #self.streams[s.strm_id].timeout_counter += 1
- plog("DEBUG", "User stream timed out on circuit " + str(s.circ_id))
- # Stream was pending
- if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
- self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
- # Attach to another circ
- self.streams[s.strm_id].pending_circ = None
- self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
-
- # SUCCEEDED
- if s.status == "SUCCEEDED":
- if s.strm_id not in self.streams:
- plog("NOTICE", "Succeeded stream " + str(s.strm_id) + " not found")
- return
- if s.circ_id and self.streams[s.strm_id].pending_circ.circ_id != s.circ_id:
- # Hrmm.. this can happen on a new-nym.. Very rare, putting warn
- # in because I'm still not sure this is correct
- plog("WARN", "Mismatch of pending: "
- + str(self.streams[s.strm_id].pending_circ.circ_id) + " vs "
- + str(s.circ_id))
- self.streams[s.strm_id].circ = self.circuits[s.circ_id]
- else:
- self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
- self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
- self.streams[s.strm_id].pending_circ = None
- self.streams[s.strm_id].attached_at = s.arrived_at
-
- # FAILED or CLOSED
- elif s.status == "FAILED" or s.status == "CLOSED":
- if s.strm_id not in self.streams:
- plog("NOTICE", "Failed stream " + str(s.strm_id) + " not found")
- return
- # if not s.circ_id:
- # plog("WARN", "Stream " + str(s.strm_id) + " closed/failed from no circuit")
- # We get failed and closed for each stream, let CLOSED do the cleanup
- if s.status == "FAILED":
- # Avoid busted circuits that will not resolve or carry traffic
- self.streams[s.strm_id].failed = True
- if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
- elif self.streams[s.strm_id].attached_at != 0:
- plog("WARN", "Failed stream on unknown circuit " + str(s.circ_id))
- return
- # CLOSED
- if self.streams[s.strm_id].pending_circ:
- self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
- # Actual removal of the stream
- del self.streams[s.strm_id]
-
- # REMAP
- elif s.status == "REMAP":
- if s.strm_id not in self.streams:
- plog("WARN", "Remap id "+str(s.strm_id)+" not found")
- else:
- if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
- s.target_host = "255.255.255.255"
- plog("NOTICE", "Non-IP remap for "+str(s.strm_id) +
- " to " + s.target_host)
- self.streams[s.strm_id].host = s.target_host
- self.streams[s.strm_id].port = s.target_port
-
- def address_mapped_event(self, event):
- """ It is necessary to listen to ADDRMAP events to be able to
- perform DNS lookups using Tor """
- output = [event.event_name, event.from_addr, event.to_addr,
- time.asctime(event.when)]
- plog("DEBUG", " ".join(output))
-
- def unknown_event(self, event):
- plog("DEBUG", "UNKNOWN EVENT '" + event.event_name + "':" +
- event.event_string)
-
-########################## Unit tests ##########################
-
-def do_gen_unit(gen, r_list, weight_bw, num_print):
- trials = 0
- for r in r_list:
- if gen.rstr_list.r_is_ok(r):
- trials += weight_bw(gen, r)
- trials = int(trials/1024)
-
- print "Running "+str(trials)+" trials"
-
- # 0. Reset r.chosen = 0 for all routers
- for r in r_list:
- r.chosen = 0
-
- # 1. Generate 'trials' choices:
- # 1a. r.chosen++
-
- loglevel = TorUtil.loglevel
- TorUtil.loglevel = "INFO"
-
- #gen.rewind() - Just overhead if we create a fresh generator each time
- rtrs = gen.generate()
- for i in xrange(1, trials):
- r = rtrs.next()
- r.chosen += 1
-
- TorUtil.loglevel = loglevel
-
- # 2. Print top num_print routers choices+bandwidth stats+flags
- i = 0
- copy_rlist = copy.copy(r_list)
- copy_rlist.sort(lambda x, y: cmp(y.chosen, x.chosen))
- for r in copy_rlist:
- if not gen.rstr_list.r_is_ok(r): continue
- flag = ""
- bw = int(weight_bw(gen, r))
- if "Exit" in r.flags:
- flag += "E"
- if "Guard" in r.flags:
- flag += "G"
- print str(r.list_rank)+". "+r.nickname+" "+str(r.bw/1024)+"/"+str(bw/1024)+": "+str(r.chosen)+", "+flag
- i += 1
- if i > num_print: break
-
-def do_unit(rst, r_list, plamb):
- print "\n"
- print "-----------------------------------"
- print rst.r_is_ok.im_class
- above_i = 0
- above_bw = 0
- below_i = 0
- below_bw = 0
- for r in r_list:
- if rst.r_is_ok(r):
- print r.nickname+" "+plamb(r)+"="+str(rst.r_is_ok(r))+" "+str(r.bw)
- if r.bw > 400000:
- above_i = above_i + 1
- above_bw += r.bw
- else:
- below_i = below_i + 1
- below_bw += r.bw
-
- print "Routers above: " + str(above_i) + " bw: " + str(above_bw)
- print "Routers below: " + str(below_i) + " bw: " + str(below_bw)
-
-# TODO: Tests:
-# - Test each NodeRestriction and print in/out lines for it
-# - Test NodeGenerator and reapply NodeRestrictions
-# - Same for PathSelector and PathRestrictions
-# - Also Reapply each restriction by hand to path. Verify returns true
-
-if __name__ == '__main__':
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(("127.0.0.1",9051))
- c = Connection(s)
- c.debug(file("control.log", "w"))
- c.authenticate()
- nslist = c.get_network_status()
- sorted_rlist = c.read_routers(c.get_network_status())
-
- sorted_rlist.sort(lambda x, y: cmp(y.bw, x.bw))
- for i in xrange(len(sorted_rlist)): sorted_rlist[i].list_rank = i
-
- def flag_weighting(bwgen, r):
- bw = r.bw
- if "Exit" in r.flags:
- bw *= bwgen.exit_weight
- if "Guard" in r.flags:
- bw *= bwgen.guard_weight
- return bw
-
- do_gen_unit(BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Exit"]),
- 3, exit=True),
- sorted_rlist, flag_weighting, 500)
-
- do_gen_unit(BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Guard"]),
- 3, guard=True),
- sorted_rlist, flag_weighting, 500)
-
- do_gen_unit(
- BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Valid"]), 3),
- sorted_rlist, flag_weighting, 500)
-
- exit(0)
-
- for r in sorted_rlist:
- if r.will_exit_to("211.11.21.22", 465):
- print r.nickname+" "+str(r.bw)
-
- do_unit(FlagsRestriction(["Guard"], []), sorted_rlist, lambda r: " ".join(r.flags))
- do_unit(FlagsRestriction(["Fast"], []), sorted_rlist, lambda r: " ".join(r.flags))
-
- do_unit(ExitPolicyRestriction("2.11.2.2", 80), sorted_rlist,
- lambda r: "exits to 80")
- do_unit(PercentileRestriction(0, 100, sorted_rlist), sorted_rlist,
- lambda r: "")
- do_unit(PercentileRestriction(10, 20, sorted_rlist), sorted_rlist,
- lambda r: "")
- do_unit(OSRestriction([r"[lL]inux", r"BSD", "Darwin"], []), sorted_rlist,
- lambda r: r.os)
- do_unit(OSRestriction([], ["Windows", "Solaris"]), sorted_rlist,
- lambda r: r.os)
-
- do_unit(VersionRangeRestriction("0.1.2.0"), sorted_rlist,
- lambda r: str(r.version))
- do_unit(VersionRangeRestriction("0.1.2.0", "0.1.2.5"), sorted_rlist,
- lambda r: str(r.version))
- do_unit(VersionIncludeRestriction(["0.1.1.26-alpha", "0.1.2.7-ignored"]),
- sorted_rlist, lambda r: str(r.version))
- do_unit(VersionExcludeRestriction(["0.1.1.26"]), sorted_rlist,
- lambda r: str(r.version))
-
- do_unit(ConserveExitsRestriction(), sorted_rlist, lambda r: " ".join(r.flags))
- do_unit(FlagsRestriction([], ["Valid"]), sorted_rlist, lambda r: " ".join(r.flags))
-
- do_unit(IdHexRestriction("$FFCB46DB1339DA84674C70D7CB586434C4370441"),
- sorted_rlist, lambda r: r.idhex)
-
- rl = [AtLeastNNodeRestriction([ExitPolicyRestriction("255.255.255.255", 80), ExitPolicyRestriction("255.255.255.255", 443), ExitPolicyRestriction("255.255.255.255", 6667)], 2), FlagsRestriction([], ["BadExit"])]
-
- exit_rstr = NodeRestrictionList(rl)
-
- ug = UniformGenerator(sorted_rlist, exit_rstr)
-
- rlist = []
- for r in ug.generate():
- print "Checking: " + r.nickname
- for rs in rl:
- if not rs.r_is_ok(r):
- raise PathError()
- if not "Exit" in r.flags:
- print "No exit in flags of "+r.nickname
- rlist.append(r)
- for r in sorted_rlist:
- if "Exit" in r.flags and not r in rlist:
- print r.nickname+" is an exit not in rl!"
-
Copied: torflow/branches/gsoc2008/TorCtl.local/PathSupport.py (from rev 15741, torflow/branches/gsoc2008/TorCtl/PathSupport.py)
===================================================================
--- torflow/branches/gsoc2008/TorCtl.local/PathSupport.py (rev 0)
+++ torflow/branches/gsoc2008/TorCtl.local/PathSupport.py 2008-07-07 19:37:47 UTC (rev 15742)
@@ -0,0 +1,1712 @@
+#!/usr/bin/python
+"""
+
+Support classes for path construction
+
+The PathSupport package builds on top of TorCtl.TorCtl. It provides a
+number of interfaces that make path construction easier.
+
+The inheritance diagram for event handling is as follows:
+TorCtl.EventHandler <- PathBuilder <- CircuitHandler <- StreamHandler.
+
+Basically, EventHandler is what gets all the control port events
+packaged in nice clean classes (see help(TorCtl) for information on
+those).
+
+PathBuilder inherits from EventHandler and is what builds all circuits
+based on the requirements specified in the SelectionManager instance
+passed to its constructor. It also handles attaching streams to
+circuits. It only handles building one circuit at a time.
+
+CircuitHandler inherits from PathBuilder and overrides its
+circuit event handling to manage building a pool of circuits as opposed
+to just one. It still uses the SelectionManager for path selection.
+
+StreamHandler inherits from CircuitHandler, and is what governs the
+attachment of an incoming stream onto one of the multiple circuits of
+the circuit handler.
+
+The SelectionManager is essentially a configuration wrapper around the
+most elegant portions of TorFlow: NodeGenerators, NodeRestrictions, and
+PathRestrictions. In the SelectionManager, a NodeGenerator is used to
+choose the nodes probabilistically according to some distribution while
+obeying the NodeRestrictions. These generators (one per hop) are handed
+off to the PathSelector, which uses the generators to build a complete
+path that satisfies the PathRestriction requirements.
+
+Have a look at the class hierarchy directly below to get a feel for how
+the restrictions fit together, and what options are available.
+
+"""
+
+import TorCtl
+import re
+import struct
+import random
+import socket
+import copy
+import Queue
+import time
+import TorUtil
+from TorUtil import *
+
+__all__ = ["NodeRestrictionList", "PathRestrictionList",
+"PercentileRestriction", "OSRestriction", "ConserveExitsRestriction",
+"FlagsRestriction", "MinBWRestriction", "VersionIncludeRestriction",
+"VersionExcludeRestriction", "ExitPolicyRestriction", "NodeRestriction",
+"PathRestriction", "OrNodeRestriction", "MetaNodeRestriction",
+"AtLeastNNodeRestriction", "NotNodeRestriction", "Subnet16Restriction",
+"UniqueRestriction", "NodeGenerator", "UniformGenerator",
+"OrderedExitGenerator", "BwWeightedGenerator", "PathSelector",
+"Connection", "NickRestriction", "IdHexRestriction", "PathBuilder",
+"CircuitHandler", "StreamHandler", "SelectionManager",
+"CountryCodeRestriction", "CountryRestriction",
+"UniqueCountryRestriction", "SingleCountryRestriction",
+"ContinentRestriction", "ContinentJumperRestriction",
+"UniqueContinentRestriction"]
+
+#################### Path Support Interfaces #####################
+
+class NodeRestriction:
+ "Interface for node restriction policies"
+ def r_is_ok(self, r):
+ "Returns true if Router 'r' is acceptable for this restriction"
+ return True
+
+class NodeRestrictionList:
+ "Class to manage a list of NodeRestrictions"
+ def __init__(self, restrictions):
+ "Constructor. 'restrictions' is a list of NodeRestriction instances"
+ self.restrictions = restrictions
+
+ def r_is_ok(self, r):
+ "Returns true of Router 'r' passes all of the contained restrictions"
+ for rs in self.restrictions:
+ if not rs.r_is_ok(r): return False
+ return True
+
+ def add_restriction(self, restr):
+ "Add a NodeRestriction 'restr' to the list of restrictions"
+ self.restrictions.append(restr)
+
+ # TODO: This does not collapse meta restrictions..
+ def del_restriction(self, RestrictionClass):
+ """Remove all restrictions of type RestrictionClass from the list.
+ Does NOT inspect or collapse MetaNode Restrictions (though
+ MetaRestrictions can be removed if RestrictionClass is
+ MetaNodeRestriction)"""
+ self.restrictions = filter(
+ lambda r: not isinstance(r, RestrictionClass),
+ self.restrictions)
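+ # For example (illustrative sketch): NodeRestrictionList([
+ # FlagsRestriction(["Fast", "Running"]), MinBWRestriction(1024)])
+ # would accept only Fast, Running routers whose advertised bandwidth
+ # is at least 1024.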
+
+class PathRestriction:
+ "Interface for path restriction policies"
+ def path_is_ok(self, path):
+ "Return true if the list of Routers in path satisfies this restriction"
+ return True
+
+class PathRestrictionList:
+ """Class to manage a list of PathRestrictions"""
+ def __init__(self, restrictions):
+ "Constructor. 'restrictions' is a list of PathRestriction instances"
+ self.restrictions = restrictions
+
+ def path_is_ok(self, path):
+ "Given list if Routers in 'path', check it against each restriction."
+ for rs in self.restrictions:
+ if not rs.path_is_ok(path):
+ return False
+ return True
+
+ def add_restriction(self, rstr):
+ "Add a PathRestriction 'rstr' to the list"
+ self.restrictions.append(rstr)
+
+ def del_restriction(self, RestrictionClass):
+ "Remove all PathRestrictions of type RestrictionClass from the list."
+ self.restrictions = filter(
+ lambda r: not isinstance(r, RestrictionClass),
+ self.restrictions)
+
+class NodeGenerator:
+ "Interface for node generation"
+ def __init__(self, sorted_r, rstr_list):
+ """Constructor. Takes a bandwidth-sorted list of Routers 'sorted_r'
+ and a NodeRestrictionList 'rstr_list'"""
+ self.rstr_list = rstr_list # Check me before you yield!
+ self.sorted_r = sorted_r
+ self.rewind()
+
+ def reset_restriction(self, rstr_list):
+ "Reset the restriction list to a new list"
+ self.rstr_list = rstr_list
+
+ def rewind(self):
+ "Rewind the generator to the 'beginning'"
+ # FIXME: If we apply the restrictions now, we can save cycles
+ # during selection, and also some memory overhead (at the cost
+ # of a much slower rewind() though..)
+ self.routers = filter(lambda r: self.rstr_list.r_is_ok(r), self.sorted_r)
+ #self.routers = copy.copy(self.sorted_r)
+
+ def mark_chosen(self, r):
+ """Mark a router as chosen: remove it from the list of routers
+ that can be returned in the future"""
+ self.routers.remove(r)
+
+ def all_chosen(self):
+ "Return true if all the routers have been marked as chosen"
+ return not self.routers
+
+ def generate(self):
+ "Return a python generator that yields routers according to the policy"
+ raise NotImplementedError()
+
+class Connection(TorCtl.Connection):
+ """Extended Connection class that provides a method for building circuits"""
+ def __init__(self, sock):
+ TorCtl.Connection.__init__(self,sock)
+ def build_circuit(self, pathlen, path_sel):
+ "Tell Tor to build a circuit chosen by the PathSelector 'path_sel'"
+ circ = Circuit()
+ circ.path = path_sel.build_path(pathlen)
+ circ.exit = circ.path[pathlen-1]
+ circ.circ_id = self.extend_circuit(0, circ.id_path())
+ return circ
+
+######################## Node Restrictions ########################
+
+# TODO: We still need more path support implementations
+# - NodeRestrictions:
+# - Uptime/LongLivedPorts (Does/should hibernation count?)
+# - Published/Updated
+# - PathRestrictions:
+# - Family
+# - GeoIP:
+# - OceanPhobicRestrictor (avoids Pacific Ocean or two atlantic crossings)
+# or ContinentRestrictor (avoids doing more than N continent crossings)
+# - Mathematical/empirical study of predecessor expectation
+# - If middle node on the same continent as exit, exit learns nothing
+# - else, exit has a bias on the continent of origin of user
+# - Language and browser accept string determine this anyway
+# - EchelonPhobicRestrictor
+# - Does not cross international boundaries for client->Entry or
+# Exit->destination hops
+
+class PercentileRestriction(NodeRestriction):
+ """Restriction to cut out a percentile slice of the network."""
+ def __init__(self, pct_skip, pct_fast, r_list):
+ """Constructor. Sets up the restriction such that routers in the
+ 'pct_skip' to 'pct_fast' percentile of bandwidth rankings are
+ returned from the sorted list 'r_list'"""
+ self.pct_fast = pct_fast
+ self.pct_skip = pct_skip
+ self.sorted_r = r_list
+
+ def r_is_ok(self, r):
+ "Returns true if r is in the percentile boundaries (by rank)"
+ # Hrmm.. technically we shouldn't count non-running routers in this..
+ # but that is tricky to do efficiently.
+ # XXX: Is there any reason why sorted_r should have non-running
+ # routers in the first place?
+
+ if r.list_rank < len(self.sorted_r)*self.pct_skip/100: return False
+ elif r.list_rank > len(self.sorted_r)*self.pct_fast/100: return False
+
+ return True
+
+class OSRestriction(NodeRestriction):
+ "Restriction based on operating system"
+ def __init__(self, ok, bad=[]):
+ """Constructor. Accept router OSes that match regexes in 'ok',
+ rejects those that match regexes in 'bad'."""
+ self.ok = ok
+ self.bad = bad
+
+ def r_is_ok(self, r):
+ "Returns true if r is in 'ok', false if 'r' is in 'bad'. If 'ok'"
+ for y in self.ok:
+ if re.search(y, r.os):
+ return True
+ for b in self.bad:
+ if re.search(b, r.os):
+ return False
+ if self.ok: return False
+ if self.bad: return True
+
+class ConserveExitsRestriction(NodeRestriction):
+ "Restriction to reject exits from selection"
+ # XXX: Make this adaptive by ip/port
+ def r_is_ok(self, r): return not "Exit" in r.flags
+
+class FlagsRestriction(NodeRestriction):
+ "Restriction for mandatory and forbidden router flags"
+ def __init__(self, mandatory, forbidden=[]):
+ """Constructor. 'mandatory' and 'forbidden' are both lists of router
+ flags as strings."""
+ self.mandatory = mandatory
+ self.forbidden = forbidden
+
+ def r_is_ok(self, router):
+ for m in self.mandatory:
+ if not m in router.flags: return False
+ for f in self.forbidden:
+ if f in router.flags: return False
+ return True
+
+class NickRestriction(NodeRestriction):
+ """Require that the node nickname is as specified"""
+ def __init__(self, nickname):
+ self.nickname = nickname
+
+ def r_is_ok(self, router):
+ return router.nickname == self.nickname
+
+class IdHexRestriction(NodeRestriction):
+ """Require that the node idhash is as specified"""
+ def __init__(self, idhex):
+ if idhex[0] == '$':
+ self.idhex = idhex[1:].upper()
+ else:
+ self.idhex = idhex.upper()
+
+ def r_is_ok(self, router):
+ return router.idhex == self.idhex
+
+class MinBWRestriction(NodeRestriction):
+ """Require a minimum bandwidth"""
+ def __init__(self, minbw):
+ self.min_bw = minbw
+
+ def r_is_ok(self, router): return router.bw >= self.min_bw
+
+class VersionIncludeRestriction(NodeRestriction):
+ """Require that the version match one in the list"""
+ def __init__(self, eq):
+ "Constructor. 'eq' is a list of versions as strings"
+ self.eq = map(TorCtl.RouterVersion, eq)
+
+ def r_is_ok(self, router):
+ """Returns true if the version of 'router' matches one of the
+ specified versions."""
+ for e in self.eq:
+ if e == router.version:
+ return True
+ return False
+
+class VersionExcludeRestriction(NodeRestriction):
+ """Require that the version not match one in the list"""
+ def __init__(self, exclude):
+ "Constructor. 'exclude' is a list of versions as strings"
+ self.exclude = map(TorCtl.RouterVersion, exclude)
+
+ def r_is_ok(self, router):
+ """Returns false if the version of 'router' matches one of the
+ specified versions."""
+ for e in self.exclude:
+ if e == router.version:
+ return False
+ return True
+
+class VersionRangeRestriction(NodeRestriction):
+ """Require that the versions be inside a specified range"""
+ def __init__(self, gr_eq, less_eq=None):
+ self.gr_eq = TorCtl.RouterVersion(gr_eq)
+ if less_eq: self.less_eq = TorCtl.RouterVersion(less_eq)
+ else: self.less_eq = None
+
+ def r_is_ok(self, router):
+ return (not self.gr_eq or router.version >= self.gr_eq) and \
+ (not self.less_eq or router.version <= self.less_eq)
+
+class ExitPolicyRestriction(NodeRestriction):
+ """Require that a router exit to an ip+port"""
+ def __init__(self, to_ip, to_port):
+ self.to_ip = to_ip
+ self.to_port = to_port
+
+ def r_is_ok(self, r): return r.will_exit_to(self.to_ip, self.to_port)
+
+class MetaNodeRestriction(NodeRestriction):
+ """Interface for a NodeRestriction that is an expression consisting of
+ multiple other NodeRestrictions"""
+ # TODO: these should collapse the restriction and return a new
+ # instance for re-insertion (or None)
+ def next_rstr(self): raise NotImplementedError()
+ def del_restriction(self, RestrictionClass): raise NotImplementedError()
+
+class OrNodeRestriction(MetaNodeRestriction):
+ """MetaNodeRestriction that is the boolean or of two or more
+ NodeRestrictions"""
+ def __init__(self, rs):
+ "Constructor. 'rs' is a list of NodeRestrictions"
+ self.rstrs = rs
+
+ def r_is_ok(self, r):
+ "Returns true if one of 'rs' is true for this router"
+ for rs in self.rstrs:
+ if rs.r_is_ok(r):
+ return True
+ return False
+
+class NotNodeRestriction(MetaNodeRestriction):
+ """Negates a single restriction"""
+ def __init__(self, a):
+ self.a = a
+
+ def r_is_ok(self, r): return not self.a.r_is_ok(r)
+
+class AtLeastNNodeRestriction(MetaNodeRestriction):
+ """MetaNodeRestriction that is true if at least n member
+ restrictions are true."""
+ def __init__(self, rstrs, n):
+ self.rstrs = rstrs
+ self.n = n
+
+ def r_is_ok(self, r):
+ cnt = 0
+ for rs in self.rstrs:
+ if rs.r_is_ok(r):
+ cnt += 1
+ if cnt < self.n: return False
+ else: return True
+
+
+#################### Path Restrictions #####################
+
+class Subnet16Restriction(PathRestriction):
+ """PathRestriction that mandates that no two nodes from the same
+ /16 subnet be in the path"""
+ def path_is_ok(self, path):
+ mask16 = struct.unpack(">I", socket.inet_aton("255.255.0.0"))[0]
+ ip16 = path[0].ip & mask16
+ for r in path[1:]:
+ if ip16 == (r.ip & mask16):
+ return False
+ return True
+
+class UniqueRestriction(PathRestriction):
+ """Path restriction that mandates that the same router can't appear more
+ than once in a path"""
+ def path_is_ok(self, path):
+ for i in xrange(0,len(path)):
+ if path[i] in path[:i]:
+ return False
+ return True
+
+#################### GeoIP Restrictions ###################
+
+class CountryCodeRestriction(NodeRestriction):
+ """ Ensure that the country_code is set """
+ def r_is_ok(self, r):
+ return r.country_code != None
+
+class CountryRestriction(NodeRestriction):
+ """ Only accept nodes that are in 'country_code' """
+ def __init__(self, country_code):
+ self.country_code = country_code
+
+ def r_is_ok(self, r):
+ return r.country_code == self.country_code
+
+class ExcludeCountriesRestriction(NodeRestriction):
+ """ Exclude a list of countries """
+ def __init__(self, countries):
+ self.countries = countries
+
+ def r_is_ok(self, r):
+ return not (r.country_code in self.countries)
+
+class UniqueCountryRestriction(PathRestriction):
+ """ Ensure every router to have a distinct country_code """
+ def path_is_ok(self, path):
+ for i in xrange(0, len(path)-1):
+ for j in xrange(i+1, len(path)):
+ if path[i].country_code == path[j].country_code:
+ return False
+ return True
+
+class SingleCountryRestriction(PathRestriction):
+ """ Ensure every router to have the same country_code """
+ def path_is_ok(self, path):
+ country_code = path[0].country_code
+ for r in path:
+ if country_code != r.country_code:
+ return False
+ return True
+
+class ContinentRestriction(PathRestriction):
+ """ Do not more than n continent crossings """
+ # TODO: Add src and dest
+ def __init__(self, n, src=None, dest=None):
+ self.n = n
+
+ def path_is_ok(self, path):
+ crossings = 0
+ prev = None
+ # Compute crossings until now
+ for r in path:
+ # Jump over the first router
+ if prev:
+ if r.continent != prev.continent:
+ crossings += 1
+ prev = r
+ if crossings > self.n: return False
+ else: return True
+
+class ContinentJumperRestriction(PathRestriction):
+ """ Ensure continent crossings between all hops """
+ def path_is_ok(self, path):
+ prev = None
+ for r in path:
+ # Jump over the first router
+ if prev:
+ if r.continent == prev.continent:
+ return False
+ prev = r
+ return True
+
+class UniqueContinentRestriction(PathRestriction):
+ """ Ensure every hop to be on a different continent """
+ def path_is_ok(self, path):
+ for i in xrange(0, len(path)-1):
+ for j in xrange(i+1, len(path)):
+ if path[i].continent == path[j].continent:
+ return False
+ return True
+
+class OceanPhobicRestriction(PathRestriction):
+ """ Not more than n ocean crossings """
+ # TODO: Add src and dest
+ def __init__(self, n, src=None, dest=None):
+ self.n = n
+
+ def path_is_ok(self, path):
+ crossings = 0
+ prev = None
+ # Compute ocean crossings until now
+ for r in path:
+ # Jump over the first router
+ if prev:
+ if r.cont_group != prev.cont_group:
+ crossings += 1
+ prev = r
+ if crossings > self.n: return False
+ else: return True
+
+#################### Node Generators ######################
+
+class UniformGenerator(NodeGenerator):
+ """NodeGenerator that produces nodes in the uniform distribution"""
+ def generate(self):
+ while not self.all_chosen():
+ r = random.choice(self.routers)
+ if self.rstr_list.r_is_ok(r): yield r
+
+class OrderedExitGenerator(NodeGenerator):
+ """NodeGenerator that produces exits in an ordered fashion for a
+ specific port"""
+ def __init__(self, to_port, sorted_r, rstr_list):
+ self.to_port = to_port
+ self.next_exit_by_port = {}
+ NodeGenerator.__init__(self, sorted_r, rstr_list)
+
+ def rewind(self):
+ if self.to_port not in self.next_exit_by_port or not self.next_exit_by_port[self.to_port]:
+ self.next_exit_by_port[self.to_port] = 0
+ self.last_idx = len(self.sorted_r)
+ else:
+ self.last_idx = self.next_exit_by_port[self.to_port]
+
+ def set_port(self, port):
+ self.to_port = port
+ self.rewind()
+
+ def mark_chosen(self, r):
+ self.next_exit_by_port[self.to_port] += 1
+
+ def all_chosen(self):
+ return self.last_idx == self.next_exit_by_port[self.to_port]
+
+ def generate(self):
+ while True: # A do..while would be real nice here..
+ if self.next_exit_by_port[self.to_port] >= len(self.sorted_r):
+ self.next_exit_by_port[self.to_port] = 0
+ r = self.sorted_r[self.next_exit_by_port[self.to_port]]
+ if self.rstr_list.r_is_ok(r): yield r
+ else: self.next_exit_by_port[self.to_port] += 1
+ if self.last_idx == self.next_exit_by_port[self.to_port]:
+ break
+
+class BwWeightedGenerator(NodeGenerator):
+ """
+
+ This is a generator designed to match the Tor Path Selection
+ algorithm. It generates nodes weighted by their bandwidth, applying
+ the appropriate down-weighting to guard and exit nodes when they are
+ chosen for positions other than guard/exit. For background see:
+ routerlist.c::smartlist_choose_by_bandwidth(),
+ http://archives.seul.org/or/dev/Jul-2007/msg00021.html,
+ http://archives.seul.org/or/dev/Jul-2007/msg00056.html, and
+ https://tor-svn.freehaven.net/svn/tor/trunk/doc/spec/path-spec.txt
+ The formulas used are from the first or-dev link, but are proven
+ optimal and equivalent to the ones now used in routerlist.c in the
+ second or-dev link.
+
+ """
+ def __init__(self, sorted_r, rstr_list, pathlen, exit=False, guard=False):
+ """ Pass exit=True to create a generator for exit-nodes """
+ self.max_bandwidth = 10000000
+ # Out for an exit-node?
+ self.exit = exit
+ # Is this a guard node?
+ self.guard = guard
+ # Different sums of bandwidths
+ self.total_bw = 0
+ self.total_exit_bw = 0
+ self.total_guard_bw = 0
+ self.total_weighted_bw = 0
+ self.pathlen = pathlen
+ NodeGenerator.__init__(self, sorted_r, rstr_list)
+
+ def rewind(self):
+ NodeGenerator.rewind(self)
+ # Set the exit_weight
+ # We are choosing a non-exit
+ self.total_exit_bw = 0
+ self.total_guard_bw = 0
+ self.total_bw = 0
+ for r in self.sorted_r:
+ # Should this be outside the restriction checks?
+ # TODO: Check max_bandwidth and cap...
+ if self.rstr_list.r_is_ok(r):
+ self.total_bw += r.bw
+ if "Exit" in r.flags:
+ self.total_exit_bw += r.bw
+ if "Guard" in r.flags:
+ self.total_guard_bw += r.bw
+
+ bw_per_hop = (1.0*self.total_bw)/self.pathlen
+
+ # Print some debugging info about bandwidth ratios
+ if self.total_bw > 0:
+ e_ratio = self.total_exit_bw/float(self.total_bw)
+ g_ratio = self.total_guard_bw/float(self.total_bw)
+ else:
+ g_ratio = 0
+ e_ratio = 0
+ plog("DEBUG",
+ "E = " + str(self.total_exit_bw) +
+ ", G = " + str(self.total_guard_bw) +
+ ", T = " + str(self.total_bw) +
+ ", g_ratio = " + str(g_ratio) + ", e_ratio = " +str(e_ratio) +
+ ", bw_per_hop = " + str(bw_per_hop))
+
+ if self.exit:
+ self.exit_weight = 1.0
+ else:
+ if self.total_exit_bw < bw_per_hop:
+ # Don't use exit nodes at all
+ self.exit_weight = 0
+ else:
+ if self.total_exit_bw > 0:
+ self.exit_weight = ((self.total_exit_bw-bw_per_hop)/self.total_exit_bw)
+ else: self.exit_weight = 0
+
+ if self.guard:
+ self.guard_weight = 1.0
+ else:
+ if self.total_guard_bw < bw_per_hop:
+ # Don't use guard nodes at all
+ self.guard_weight = 0
+ else:
+ if self.total_guard_bw > 0:
+ self.guard_weight = ((self.total_guard_bw-bw_per_hop)/self.total_guard_bw)
+ else: self.guard_weight = 0
+
+ for r in self.sorted_r:
+ bw = r.bw
+ if "Exit" in r.flags:
+ bw *= self.exit_weight
+ if "Guard" in r.flags:
+ bw *= self.guard_weight
+ self.total_weighted_bw += bw
+
+ self.total_weighted_bw = int(self.total_weighted_bw)
+ plog("DEBUG", "Bw: "+str(self.total_weighted_bw)+"/"+str(self.total_bw)
+ +". The exit-weight is: "+str(self.exit_weight)
+ + ", guard weight is: "+str(self.guard_weight))
+
+ def generate(self):
+ while True:
+ # Choose a suitable random int
+ i = random.randint(0, self.total_weighted_bw)
+
+ # Go through the routers
+ for r in self.routers:
+ # Below zero here means next() -> choose a new random int+router
+ if i < 0: break
+ if self.rstr_list.r_is_ok(r):
+ bw = r.bw
+ if "Exit" in r.flags:
+ bw *= self.exit_weight
+ if "Guard" in r.flags:
+ bw *= self.guard_weight
+
+ i -= bw
+ if i < 0:
+ plog("DEBUG", "Chosen router with a bandwidth of: " + str(r.bw))
+ yield r
+
+####################### Secret Sauce ###########################
+
+class PathError(Exception):
+ pass
+
+class NoRouters(PathError):
+ pass
+
+class PathSelector:
+ """Implementation of path selection policies. Builds a path according
+ to entry, middle, and exit generators that satisfies the path
+ restrictions."""
+ def __init__(self, entry_gen, mid_gen, exit_gen, path_restrict):
+ """Constructor. The first three arguments are NodeGenerators with
+ their appropriate restrictions. The 'path_restrict' is a
+ PathRestrictionList"""
+ self.entry_gen = entry_gen
+ self.mid_gen = mid_gen
+ self.exit_gen = exit_gen
+ self.path_restrict = path_restrict
+
+ def build_path(self, pathlen):
+ """Creates a path of 'pathlen' hops, and returns it as a list of
+ Router instances"""
+ self.entry_gen.rewind()
+ self.mid_gen.rewind()
+ self.exit_gen.rewind()
+ entry = self.entry_gen.generate()
+ mid = self.mid_gen.generate()
+ ext = self.exit_gen.generate()
+
+ while True:
+ path = []
+ try:
+ if pathlen == 1:
+ path = [ext.next()]
+ else:
+ path.append(entry.next())
+ for i in xrange(1, pathlen-1):
+ path.append(mid.next())
+ path.append(ext.next())
+ if self.path_restrict.path_is_ok(path):
+ self.entry_gen.mark_chosen(path[0])
+ for i in xrange(1, pathlen-1):
+ self.mid_gen.mark_chosen(path[i])
+ self.exit_gen.mark_chosen(path[pathlen-1])
+ break
+ except StopIteration:
+ plog("NOTICE", "Ran out of routers during buildpath..");
+ self.entry_gen.rewind()
+ self.mid_gen.rewind()
+ self.exit_gen.rewind()
+ entry = self.entry_gen.generate()
+ mid = self.mid_gen.generate()
+ ext = self.exit_gen.generate()
+ return path
+
+class SelectionManager:
+ """Helper class to handle configuration updates
+
+ The methods are NOT threadsafe. They may ONLY be called from
+ EventHandler's thread. This means that to update the selection
+ manager, you must schedule a config update job using
+ PathBuilder.schedule_selmgr() with a worker function to modify
+ this object.
+ """
+ def __init__(self, pathlen, order_exits,
+ percent_fast, percent_skip, min_bw, use_all_exits,
+ uniform, use_exit, use_guards, geoip_config=None, restrict_guards=False):
+ self.__ordered_exit_gen = None
+ self.pathlen = pathlen
+ self.order_exits = order_exits
+ self.percent_fast = percent_fast
+ self.percent_skip = percent_skip
+ self.min_bw = min_bw
+ self.use_all_exits = use_all_exits
+ self.uniform = uniform
+ self.exit_name = use_exit
+ self.use_guards = use_guards
+ self.geoip_config = geoip_config
+
+ self.restrict_guards_only = restrict_guards
+
+ def reconfigure(self, sorted_r):
+ """This function is called after a configuration change,
+ to rebuild the RestrictionLists."""
+ if self.use_all_exits:
+ self.path_rstr = PathRestrictionList([UniqueRestriction()])
+ else:
+ self.path_rstr = PathRestrictionList(
+ [Subnet16Restriction(), UniqueRestriction()])
+
+ if self.use_guards: entry_flags = ["Guard", "Valid", "Running"]
+ else: entry_flags = ["Valid", "Running"]
+
+
+ if self.restrict_guards_only:
+ entry_flags = ["Guard","Valid","Running"]
+ nonentry_skip = 0
+ nonentry_fast = 100
+ else:
+ entry_flags = ["Valid","Running"]
+ nonentry_skip = self.percent_skip
+ nonentry_fast = self.percent_fast
+
+ entry_rstr = NodeRestrictionList(
+ [PercentileRestriction(self.percent_skip, self.percent_fast, sorted_r),
+ ConserveExitsRestriction(),
+ FlagsRestriction(entry_flags, [])]
+ )
+ mid_rstr = NodeRestrictionList(
+ [PercentileRestriction(nonentry_skip, nonentry_fast, sorted_r),
+ ConserveExitsRestriction(),
+ FlagsRestriction(["Running","Fast"], [])]
+
+ )
+ if self.use_all_exits:
+ self.exit_rstr = NodeRestrictionList(
+ [FlagsRestriction(["Valid", "Running","Fast"], ["BadExit"])])
+ else:
+ self.exit_rstr = NodeRestrictionList(
+ [PercentileRestriction(nonentry_skip, nonentry_fast, sorted_r),
+ FlagsRestriction(["Valid", "Running","Fast"], ["BadExit"])])
+
+ if self.exit_name:
+ self.exit_rstr.del_restriction(IdHexRestriction)
+ self.exit_rstr.del_restriction(NickRestriction)
+ if self.exit_name[0] == '$':
+ self.exit_rstr.add_restriction(IdHexRestriction(self.exit_name))
+ else:
+ self.exit_rstr.add_restriction(NickRestriction(self.exit_name))
+
+ # GeoIP configuration
+ if self.geoip_config:
+ # Every node needs country_code
+ entry_rstr.add_restriction(CountryCodeRestriction())
+ mid_rstr.add_restriction(CountryCodeRestriction())
+ self.exit_rstr.add_restriction(CountryCodeRestriction())
+
+ # Specified countries for different positions
+ if self.geoip_config.entry_country:
+ entry_rstr.add_restriction(CountryRestriction(self.geoip_config.entry_country))
+ if self.geoip_config.middle_country:
+ mid_rstr.add_restriction(CountryRestriction(self.geoip_config.middle_country))
+ if self.geoip_config.exit_country:
+ self.exit_rstr.add_restriction(CountryRestriction(self.geoip_config.exit_country))
+
+ # Excluded countries
+ if self.geoip_config.excludes:
+ plog("INFO", "Excluded countries: " + str(self.geoip_config.excludes))
+ if len(self.geoip_config.excludes) > 0:
+ entry_rstr.add_restriction(ExcludeCountriesRestriction(self.geoip_config.excludes))
+ mid_rstr.add_restriction(ExcludeCountriesRestriction(self.geoip_config.excludes))
+ self.exit_rstr.add_restriction(ExcludeCountriesRestriction(self.geoip_config.excludes))
+
+ # Unique countries set? None --> pass
+ if self.geoip_config.unique_countries != None:
+ if self.geoip_config.unique_countries:
+ # If True: unique countries
+ self.path_rstr.add_restriction(UniqueCountryRestriction())
+ else:
+ # False: use the same country for all nodes in a path
+ self.path_rstr.add_restriction(SingleCountryRestriction())
+
+ # Specify max number of continent crossings, None means UniqueContinents
+ if self.geoip_config.continent_crossings == None:
+ self.path_rstr.add_restriction(UniqueContinentRestriction())
+ else: self.path_rstr.add_restriction(ContinentRestriction(self.geoip_config.continent_crossings))
+ # Should even work in combination with continent crossings
+ if self.geoip_config.ocean_crossings != None:
+ self.path_rstr.add_restriction(OceanPhobicRestriction(self.geoip_config.ocean_crossings))
+
+ # This is kind of hokey..
+ if self.order_exits:
+ if self.__ordered_exit_gen:
+ exitgen = self.__ordered_exit_gen
+ exitgen.reset_restriction(self.exit_rstr)
+ else:
+ exitgen = self.__ordered_exit_gen = \
+ OrderedExitGenerator(80, sorted_r, self.exit_rstr)
+ elif self.uniform:
+ # 'real' exits should also be chosen when not using 'order_exits'
+ self.exit_rstr.add_restriction(ExitPolicyRestriction("255.255.255.255", 80))
+ exitgen = UniformGenerator(sorted_r, self.exit_rstr)
+ else:
+ self.exit_rstr.add_restriction(ExitPolicyRestriction("255.255.255.255", 80))
+ exitgen = BwWeightedGenerator(sorted_r, self.exit_rstr, self.pathlen, exit=True)
+
+ if self.uniform:
+ self.path_selector = PathSelector(
+ UniformGenerator(sorted_r, entry_rstr),
+ UniformGenerator(sorted_r, mid_rstr),
+ exitgen, self.path_rstr)
+ else:
+ # Remove ConserveExitsRestrictions for entry and middle positions
+ entry_rstr.del_restriction(ConserveExitsRestriction)
+ mid_rstr.del_restriction(ConserveExitsRestriction)
+ self.path_selector = PathSelector(
+ BwWeightedGenerator(sorted_r, entry_rstr, self.pathlen,
+ guard=self.use_guards),
+ BwWeightedGenerator(sorted_r, mid_rstr, self.pathlen),
+ exitgen, self.path_rstr)
+ return
+
+ def set_target(self, ip, port):
+ "Called to update the ExitPolicyRestrictions with a new ip and port"
+ self.exit_rstr.del_restriction(ExitPolicyRestriction)
+ self.exit_rstr.add_restriction(ExitPolicyRestriction(ip, port))
+ if self.__ordered_exit_gen: self.__ordered_exit_gen.set_port(port)
+ # Try to choose an exit node in the destination country
+ # needs an IP != 255.255.255.255
+ if self.geoip_config and self.geoip_config.echelon:
+ import GeoIPSupport
+ c = GeoIPSupport.get_country(ip)
+ if c:
+ plog("INFO", "[Echelon] IP "+ip+" is in ["+c+"]")
+ self.exit_rstr.del_restriction(CountryRestriction)
+ self.exit_rstr.add_restriction(CountryRestriction(c))
+ else:
+ plog("INFO", "[Echelon] Could not determine destination country of IP "+ip)
+ # Try to use a backup country
+ if self.geoip_config.exit_country:
+ self.exit_rstr.del_restriction(CountryRestriction)
+ self.exit_rstr.add_restriction(CountryRestriction(self.geoip_config.exit_country))
+
+class Circuit:
+ "Class to describe a circuit"
+ def __init__(self):
+ self.circ_id = 0
+ self.path = [] # routers
+ self.exit = None
+ self.built = False
+ self.failed = False
+ self.dirty = False
+ self.closed = False
+ self.detached_cnt = 0
+ self.last_extended_at = time.time()
+ self.extend_times = [] # List of all extend-durations
+ self.setup_duration = None # Sum of extend-times
+ self.pending_streams = [] # Which stream IDs are pending us
+
+ def id_path(self):
+ "Returns a list of idhex keys for the path of Routers"
+ return map(lambda r: r.idhex, self.path)
+
+class Stream:
+ "Class to describe a stream"
+ def __init__(self, sid, host, port, kind):
+ self.strm_id = sid
+ self.detached_from = [] # circ id #'s
+ self.pending_circ = None
+ self.circ = None
+ self.host = host
+ self.port = port
+ self.kind = kind
+ self.attached_at = 0
+ self.bytes_read = 0
+ self.bytes_written = 0
+ self.failed = False
+ self.ignored = False # Set if PURPOSE=DIR_*
+ self.failed_reason = None # Cheating a little.. Only used by StatsHandler
+
+ def lifespan(self, now):
+ "Returns the age of the stream"
+ return now-self.attached_at
+
+# TODO: Make passive "PathWatcher" so people can get aggregate
+# node reliability stats for normal usage without us attaching streams
+
+class PathBuilder(TorCtl.EventHandler):
+ """
+ PathBuilder implementation. Handles circuit construction, subject
+ to the constraints of the SelectionManager selmgr.
+
+ Do not access this object from other threads. Instead, use the
+ schedule_* functions to schedule work to be done in the thread
+ of the EventHandler.
+ """
+ def __init__(self, c, selmgr, RouterClass):
+ """Constructor. 'c' is a Connection, 'selmgr' is a SelectionManager,
+ and 'RouterClass' is a class that inherits from Router and is used
+ to create annotated Routers."""
+ TorCtl.EventHandler.__init__(self)
+ self.c = c
+ nslist = c.get_network_status()
+ self.last_exit = None
+ self.new_nym = False
+ self.resolve_port = 0
+ self.num_circuits = 1
+ self.RouterClass = RouterClass
+ self.sorted_r = []
+ self.name_to_key = {}
+ self.routers = {}
+ self.circuits = {}
+ self.streams = {}
+ self.read_routers(nslist)
+ self.selmgr = selmgr
+ self.selmgr.reconfigure(self.sorted_r)
+ self.imm_jobs = Queue.Queue()
+ self.low_prio_jobs = Queue.Queue()
+ self.run_all_jobs = False
+ self.do_reconfigure = False
+ plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(nslist))+" routers")
+
+ def schedule_immediate(self, job):
+ """
+ Schedules an immediate job to be run before the next event is
+ processed.
+ """
+ self.imm_jobs.put(job)
+
+ def schedule_low_prio(self, job):
+ """
+ Schedules a job to be run when a non-time critical event arrives.
+ """
+ self.low_prio_jobs.put(job)
+
+ def schedule_selmgr(self, job):
+ """
+ Schedules an immediate job to be run before the next event is
+ processed. Also notifies the selection manager that it needs
+ to update itself.
+ """
+ def notlambda(this):
+ job(this.selmgr)
+ this.do_reconfigure = True
+ self.schedule_immediate(notlambda)
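+ # Example (illustrative): a scheduled job receives this PathBuilder,
+ # e.g. pb.schedule_low_prio(lambda this: plog("INFO",
+ # "Known routers: "+str(len(this.sorted_r)))), where 'pb' names an
+ # instance of this class.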
+
+
+ def heartbeat_event(self, event):
+ """This function handles dispatching scheduled jobs. If you
+ extend PathBuilder and want to implement this function for
+ some reason, be sure to call the parent class"""
+ while not self.imm_jobs.empty():
+ imm_job = self.imm_jobs.get_nowait()
+ imm_job(self)
+
+ if self.do_reconfigure:
+ self.selmgr.reconfigure(self.sorted_r)
+ self.do_reconfigure = False
+
+ if self.run_all_jobs:
+ self.run_all_jobs = False
+ while not self.low_prio_jobs.empty():
+ imm_job = self.low_prio_jobs.get_nowait()
+ imm_job(self)
+ return
+
+ # If event is stream:NEW*/DETACHED or circ BUILT/FAILED,
+ # don't run low prio jobs.. No need to delay streams for them.
+ if isinstance(event, TorCtl.CircuitEvent):
+ if event.status in ("BUILT", "FAILED"):
+ return
+ elif isinstance(event, TorCtl.StreamEvent):
+ if event.status in ("NEW", "NEWRESOLVE", "DETACHED"):
+ return
+
+ # Do the low prio jobs one at a time in case a
+ # higher priority event is queued
+ if not self.low_prio_jobs.empty():
+ delay_job = self.low_prio_jobs.get_nowait()
+ delay_job(self)
+
+ def read_routers(self, nslist):
+ routers = self.c.read_routers(nslist)
+ new_routers = []
+ for r in routers:
+ self.name_to_key[r.nickname] = "$"+r.idhex
+ if r.idhex in self.routers:
+ if self.routers[r.idhex].nickname != r.nickname:
+ plog("NOTICE", "Router "+r.idhex+" changed names from "
+ +self.routers[r.idhex].nickname+" to "+r.nickname)
+ # Must do IN-PLACE update to keep all the refs to this router
+ # valid and current (especially for stats)
+ self.routers[r.idhex].update_to(r)
+ else:
+ rc = self.RouterClass(r)
+ self.routers[rc.idhex] = rc
+ new_routers.append(rc)
+ self.sorted_r.extend(new_routers)
+ self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
+ for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
+
+ def build_path(self):
+ """ Get a path from the SelectionManager's PathSelector, can be used
+ e.g. for generating paths without actually creating any circuits """
+ return self.selmgr.path_selector.build_path(self.selmgr.pathlen)
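+ # For instance (illustrative), [r.nickname for r in pb.build_path()]
+ # lists the nicknames of one candidate path without creating a circuit,
+ # where 'pb' is a PathBuilder instance.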
+
+ def attach_stream_any(self, stream, badcircs):
+ "Attach a stream to a valid circuit, avoiding any in 'badcircs'"
+ # Newnym, and warn if not built plus pending
+ unattached_streams = [stream]
+ if self.new_nym:
+ self.new_nym = False
+ plog("DEBUG", "Obeying new nym")
+ for key in self.circuits.keys():
+ if (not self.circuits[key].dirty
+ and len(self.circuits[key].pending_streams)):
+ plog("WARN", "New nym called, destroying circuit "+str(key)
+ +" with "+str(len(self.circuits[key].pending_streams))
+ +" pending streams")
+ unattached_streams.extend(self.circuits[key].pending_streams)
+ del self.circuits[key].pending_streams[:]
+ # FIXME: Consider actually closing circ if no streams.
+ self.circuits[key].dirty = True
+
+ for circ in self.circuits.itervalues():
+ if circ.built and not circ.dirty and circ.circ_id not in badcircs:
+ if circ.exit.will_exit_to(stream.host, stream.port):
+ try:
+ self.c.attach_stream(stream.strm_id, circ.circ_id)
+ stream.pending_circ = circ # Only one possible here
+ circ.pending_streams.append(stream)
+ except TorCtl.ErrorReply, e:
+ # No need to retry here. We should get the failed
+ # event for either the circ or stream next
+ plog("WARN", "Error attaching stream: "+str(e.args))
+ return
+ break
+ else:
+ circ = None
+ self.selmgr.set_target(stream.host, stream.port)
+ while circ == None:
+ try:
+ circ = self.c.build_circuit(
+ self.selmgr.pathlen,
+ self.selmgr.path_selector)
+ except TorCtl.ErrorReply, e:
+ # FIXME: How come some routers are non-existent? Shouldn't
+ # we have gotten an NS event to notify us they
+ # disappeared?
+ plog("NOTICE", "Error building circ: "+str(e.args))
+ for u in unattached_streams:
+ plog("DEBUG",
+ "Attaching "+str(u.strm_id)+" pending build of "+str(circ.circ_id))
+ u.pending_circ = circ
+ circ.pending_streams.extend(unattached_streams)
+ self.circuits[circ.circ_id] = circ
+ self.last_exit = circ.exit
+
+ def circ_status_event(self, c):
+ output = [c.event_name, str(c.circ_id), c.status]
+ if c.path: output.append(",".join(c.path))
+ if c.reason: output.append("REASON=" + c.reason)
+ if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
+ plog("DEBUG", " ".join(output))
+ # Circuits we don't control get built by Tor
+ if c.circ_id not in self.circuits:
+ plog("DEBUG", "Ignoring circ " + str(c.circ_id))
+ return
+ if c.status == "EXTENDED":
+ self.circuits[c.circ_id].last_extended_at = c.arrived_at
+ elif c.status == "FAILED" or c.status == "CLOSED":
+ # XXX: Can still get a STREAM FAILED for this circ after this
+ circ = self.circuits[c.circ_id]
+ del self.circuits[c.circ_id]
+ for stream in circ.pending_streams:
+ plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
+ self.attach_stream_any(stream, stream.detached_from)
+ elif c.status == "BUILT":
+ self.circuits[c.circ_id].built = True
+ try:
+ for stream in self.circuits[c.circ_id].pending_streams:
+ self.c.attach_stream(stream.strm_id, c.circ_id)
+ except TorCtl.ErrorReply, e:
+ # No need to retry here. We should get the failed
+ # event for either the circ or stream next
+ plog("WARN", "Error attaching stream: "+str(e.args))
+ return
+
+ def stream_status_event(self, s):
+ output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id),
+ s.target_host, str(s.target_port)]
+ if s.reason: output.append("REASON=" + s.reason)
+ if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
+ if s.purpose: output.append("PURPOSE=" + s.purpose)
+ plog("DEBUG", " ".join(output))
+ if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
+ s.target_host = "255.255.255.255" # ignore DNS for exit policy check
+
+ # Hack to ignore Tor-handled streams (Currently only directory streams)
+ if s.strm_id in self.streams and self.streams[s.strm_id].ignored:
+ plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
+ return
+
+ if s.status == "NEW" or s.status == "NEWRESOLVE":
+ if s.status == "NEWRESOLVE" and not s.target_port:
+ s.target_port = self.resolve_port
+ self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, s.status)
+ # Remember Tor-handled streams (Currently only directory streams)
+ if s.purpose and s.purpose.find("DIR_") == 0:
+ self.streams[s.strm_id].ignored = True
+ plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
+ return
+ else:
+ self.attach_stream_any(self.streams[s.strm_id],
+ self.streams[s.strm_id].detached_from)
+ elif s.status == "DETACHED":
+ if s.strm_id not in self.streams:
+ plog("WARN", "Detached stream "+str(s.strm_id)+" not found")
+ self.streams[s.strm_id] = Stream(s.strm_id, s.target_host,
+ s.target_port, "NEW")
+ # FIXME Stats (differentiate Resolved streams also..)
+ if not s.circ_id:
+ plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
+ else:
+ self.streams[s.strm_id].detached_from.append(s.circ_id)
+
+ if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
+ self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+ self.streams[s.strm_id].pending_circ = None
+ self.attach_stream_any(self.streams[s.strm_id],
+ self.streams[s.strm_id].detached_from)
+ elif s.status == "SUCCEEDED":
+ if s.strm_id not in self.streams:
+ plog("NOTICE", "Succeeded stream "+str(s.strm_id)+" not found")
+ return
+ if s.circ_id and self.streams[s.strm_id].pending_circ.circ_id != s.circ_id:
+ # Hrmm.. this can happen on a new-nym.. Very rare, putting warn
+ # in because I'm still not sure this is correct
+ plog("WARN", "Mismatch of pending: "
+ +str(self.streams[s.strm_id].pending_circ.circ_id)+" vs "
+ +str(s.circ_id))
+ # This can happen if the circuit existed before we started up
+ if s.circ_id in self.circuits:
+ self.streams[s.strm_id].circ = self.circuits[s.circ_id]
+ else:
+ plog("NOTICE", "Stream "+str(s.strm_id)+" has unknown circuit: "+str(s.circ_id))
+ else:
+ self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
+ self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+ self.streams[s.strm_id].pending_circ = None
+ self.streams[s.strm_id].attached_at = s.arrived_at
+ elif s.status == "FAILED" or s.status == "CLOSED":
+ # FIXME stats
+ if s.strm_id not in self.streams:
+ plog("NOTICE", "Failed stream "+str(s.strm_id)+" not found")
+ return
+
+ if not s.circ_id:
+ plog("WARN", "Stream "+str(s.strm_id)+" failed from no circuit!")
+
+ # We get failed and closed for each stream. OK to return
+ # and let the closed do the cleanup
+ if s.status == "FAILED":
+ # Avoid busted circuits that will not resolve or carry
+ # traffic.
+ self.streams[s.strm_id].failed = True
+ if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
+ else: plog("WARN","Failed stream on unknown circ "+str(s.circ_id))
+ return
+
+ if self.streams[s.strm_id].pending_circ:
+ self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+ del self.streams[s.strm_id]
+ elif s.status == "REMAP":
+ if s.strm_id not in self.streams:
+ plog("WARN", "Remap id "+str(s.strm_id)+" not found")
+ else:
+        if not re.match(r"\d+\.\d+\.\d+\.\d+", s.target_host):
+ s.target_host = "255.255.255.255"
+ plog("NOTICE", "Non-IP remap for "+str(s.strm_id)+" to "
+ + s.target_host)
+ self.streams[s.strm_id].host = s.target_host
+ self.streams[s.strm_id].port = s.target_port
+
+ def stream_bw_event(self, s):
+ output = [s.event_name, str(s.strm_id), str(s.bytes_read),
+ str(s.bytes_written)]
+ plog("DEBUG", " ".join(output))
+ if not s.strm_id in self.streams:
+ plog("WARN", "BW event for unknown stream id: "+str(s.strm_id))
+ else:
+ self.streams[s.strm_id].bytes_read += s.bytes_read
+ self.streams[s.strm_id].bytes_written += s.bytes_written
+
+ def ns_event(self, n):
+ self.read_routers(n.nslist)
+ plog("DEBUG", "Read " + str(len(n.nslist))+" NS => "
+ + str(len(self.sorted_r)) + " routers")
+
+ def new_desc_event(self, d):
+ for i in d.idlist: # Is this too slow?
+ self.read_routers(self.c.get_network_status("id/"+i))
+ plog("DEBUG", "Read " + str(len(d.idlist))+" Desc => "
+ + str(len(self.sorted_r)) + " routers")
+
+ def bandwidth_event(self, b): pass # For heartbeat only..
+
+################### CircuitHandler #############################
+
+class CircuitHandler(PathBuilder):
+ """ CircuitHandler that extends from PathBuilder to handle multiple
+ circuits as opposed to just one. """
+ def __init__(self, c, selmgr, num_circuits, RouterClass):
+ """Constructor. 'c' is a Connection, 'selmgr' is a SelectionManager,
+ 'num_circuits' is the number of circuits to keep in the pool,
+ and 'RouterClass' is a class that inherits from Router and is used
+ to create annotated Routers."""
+ PathBuilder.__init__(self, c, selmgr, RouterClass)
+ # Set handler to the connection here to
+ # not miss any circuit events on startup
+ c.set_event_handler(self)
+ self.num_circuits = num_circuits # Size of the circuit pool
+ self.check_circuit_pool() # Bring up the pool of circs
+
+ def check_circuit_pool(self):
+ """ Init or check the status of the circuit-pool """
+ # Get current number of circuits
+ n = len(self.circuits.values())
+ i = self.num_circuits-n
+ if i > 0:
+ plog("INFO", "Checked pool of circuits: we need to build " +
+ str(i) + " circuits")
+ # Schedule (num_circs-n) circuit-buildups
+ while (n < self.num_circuits):
+ self.build_circuit("255.255.255.255", 80)
+ plog("DEBUG", "Scheduled circuit No. " + str(n+1))
+ n += 1
+
+ def build_circuit(self, host, port):
+ """ Build a circuit """
+ circ = None
+    while circ is None:
+ try:
+ self.selmgr.set_target(host, port)
+ circ = self.c.build_circuit(self.selmgr.pathlen,
+ self.selmgr.path_selector)
+ self.circuits[circ.circ_id] = circ
+ return circ
+ except TorCtl.ErrorReply, e:
+        # FIXME: How come some routers are non-existent? Shouldn't
+ # we have gotten an NS event to notify us they disappeared?
+ plog("NOTICE", "Error building circuit: " + str(e.args))
+
+ def close_circuit(self, id):
+ """ Close a circuit with given id """
+ # TODO: Pass streams to another circ before closing?
+ self.circuits[id].closed = True
+ try: self.c.close_circuit(id)
+ except TorCtl.ErrorReply, e:
+ plog("ERROR", "Failed closing circuit " + str(id) + ": " + str(e))
+
+ def circ_status_event(self, c):
+ """ Handle circuit status events """
+ output = [c.event_name, str(c.circ_id), c.status]
+ if c.path: output.append(",".join(c.path))
+ if c.reason: output.append("REASON=" + c.reason)
+ if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
+ plog("DEBUG", " ".join(output))
+
+ # Circuits we don't control get built by Tor
+ if c.circ_id not in self.circuits:
+ plog("DEBUG", "Ignoring circuit " + str(c.circ_id) +
+ " (controlled by Tor)")
+ return
+
+ # EXTENDED
+ if c.status == "EXTENDED":
+ # Compute elapsed time
+ extend_time = c.arrived_at-self.circuits[c.circ_id].last_extended_at
+ self.circuits[c.circ_id].extend_times.append(extend_time)
+ plog("INFO", "Circuit " + str(c.circ_id) + " extended in " +
+ str(extend_time) + " sec")
+ self.circuits[c.circ_id].last_extended_at = c.arrived_at
+
+ # FAILED & CLOSED
+ elif c.status == "FAILED" or c.status == "CLOSED":
+ # XXX: Can still get a STREAM FAILED for this circ after this
+ circ = self.circuits[c.circ_id]
+ # Actual removal of the circ
+ del self.circuits[c.circ_id]
+ # Give away pending streams
+ for stream in circ.pending_streams:
+ plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
+ self.attach_stream_any(stream, stream.detached_from)
+ # Check if there are enough circs
+ self.check_circuit_pool()
+ return
+
+ # BUILT
+ elif c.status == "BUILT":
+ circ = self.circuits[c.circ_id]
+ circ.built = True
+ for stream in circ.pending_streams:
+ try:
+ self.c.attach_stream(stream.strm_id, c.circ_id)
+ except TorCtl.ErrorReply, e:
+ # No need to retry here. We should get the failed
+ # event for either the circ or stream next
+ plog("WARN", "Error attaching stream: " + str(e.args))
+ # Compute duration by summing up extend_times
+ duration = reduce(lambda x, y: x+y, circ.extend_times, 0.0)
+ plog("INFO", "Circuit " + str(c.circ_id) + " needed " +
+ str(duration) + " seconds to be built")
+ # Save the duration to the circuit for later use
+ circ.setup_duration = duration
+
+ # OTHER?
+ else:
+ # If this was e.g. a LAUNCHED
+ pass
+
+################### StreamHandler ##############################
+
+class StreamHandler(CircuitHandler):
+ """ StreamHandler that extends from the CircuitHandler
+ to handle attaching streams to an appropriate circuit
+ in the pool. """
+ def __init__(self, c, selmgr, num_circs, RouterClass):
+ CircuitHandler.__init__(self, c, selmgr, num_circs, RouterClass)
+ self.sorted_circs = None # optional sorted list
+
+ def clear_dns_cache(self):
+ """ Send signal CLEARDNSCACHE """
+ lines = self.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
+ for _, msg, more in lines:
+ plog("DEBUG", "CLEARDNSCACHE: " + msg)
+
+ def close_stream(self, id, reason):
+ """ Close a stream with given id and reason """
+ self.c.close_stream(id, reason)
+
+ def create_and_attach(self, stream, unattached_streams):
+ """ Create a new circuit and attach (stream + unattached_streams) """
+ circ = self.build_circuit(stream.host, stream.port)
+ if circ:
+ for u in unattached_streams:
+ plog("DEBUG", "Attaching " + str(u.strm_id) +
+ " pending build of circuit " + str(circ.circ_id))
+ u.pending_circ = circ
+ circ.pending_streams.extend(unattached_streams)
+ self.circuits[circ.circ_id] = circ
+ self.last_exit = circ.exit
+
+ def attach_stream_any(self, stream, badcircs):
+ """ Attach a regular user stream """
+ unattached_streams = [stream]
+ if self.new_nym:
+ self.new_nym = False
+ plog("DEBUG", "Obeying new nym")
+ for key in self.circuits.keys():
+ if (not self.circuits[key].dirty
+ and len(self.circuits[key].pending_streams)):
+ plog("WARN", "New nym called, destroying circuit "+str(key)
+ +" with "+str(len(self.circuits[key].pending_streams))
+ +" pending streams")
+ unattached_streams.extend(self.circuits[key].pending_streams)
+ del self.circuits[key].pending_streams[:]
+ # FIXME: Consider actually closing circs if no streams
+ self.circuits[key].dirty = True
+
+ # Check if there is a sorted list of circs
+    if self.sorted_circs: circs = self.sorted_circs
+    else: circs = self.circuits.values()
+    for circ in circs:
+ # Check each circuit
+ if circ.built and not circ.closed and circ.circ_id not in badcircs and not circ.dirty:
+ if circ.exit.will_exit_to(stream.host, stream.port):
+ try:
+ self.c.attach_stream(stream.strm_id, circ.circ_id)
+ stream.pending_circ = circ # Only one possible here
+ circ.pending_streams.append(stream)
+ self.last_exit = circ.exit
+ except TorCtl.ErrorReply, e:
+ # No need to retry here. We should get the failed
+ # event for either the circ or stream next
+ plog("WARN", "Error attaching stream: " + str(e.args))
+ return
+ break
+ else:
+ plog("DEBUG", "Circuit " + str(circ.circ_id) + " won't exit")
+ else:
+ self.create_and_attach(stream, unattached_streams)
+
+ def stream_status_event(self, s):
+ """ Catch user stream events """
+ # Construct debugging output
+ output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id), s.target_host+':'+str(s.target_port)]
+ if s.reason: output.append("REASON=" + s.reason)
+ if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
+ if s.purpose: output.append("PURPOSE=" + s.purpose)
+ plog("DEBUG", " ".join(output))
+
+ # If target_host is not an IP-address
+    if not re.match(r"\d+\.\d+\.\d+\.\d+", s.target_host):
+ s.target_host = "255.255.255.255" # ignore DNS for exit policy check
+
+ # Hack to ignore Tor-handled streams (Currently only directory streams)
+ if s.strm_id in self.streams and self.streams[s.strm_id].ignored:
+ plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
+ return
+
+ # NEW or NEWRESOLVE
+ if s.status == "NEW" or s.status == "NEWRESOLVE":
+ if s.status == "NEWRESOLVE" and not s.target_port:
+ s.target_port = self.resolve_port
+ # Set up the new stream
+ stream = Stream(s.strm_id, s.target_host, s.target_port, s.status)
+
+ self.streams[s.strm_id] = stream
+ if s.purpose and s.purpose.find("DIR_") == 0:
+ stream.ignored = True
+ plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
+ return
+ else:
+ self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
+
+ # DETACHED
+ elif s.status == "DETACHED":
+ # Stream not found
+ if s.strm_id not in self.streams:
+ plog("WARN", "Detached stream " + str(s.strm_id) + " not found")
+ self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, "NEW")
+ # Circuit not found
+ if not s.circ_id:
+ plog("WARN", "Stream " + str(s.strm_id) + " detached from no circuit!")
+ else:
+ self.streams[s.strm_id].detached_from.append(s.circ_id)
+ # Detect timeouts on user streams
+ if s.reason == "TIMEOUT":
+ # TODO: Count timeouts on streams?
+ #self.streams[s.strm_id].timeout_counter += 1
+ plog("DEBUG", "User stream timed out on circuit " + str(s.circ_id))
+ # Stream was pending
+ if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
+ self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+ # Attach to another circ
+ self.streams[s.strm_id].pending_circ = None
+ self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
+
+ # SUCCEEDED
+ if s.status == "SUCCEEDED":
+ if s.strm_id not in self.streams:
+ plog("NOTICE", "Succeeded stream " + str(s.strm_id) + " not found")
+ return
+ if s.circ_id and self.streams[s.strm_id].pending_circ.circ_id != s.circ_id:
+ # Hrmm.. this can happen on a new-nym.. Very rare, putting warn
+ # in because I'm still not sure this is correct
+ plog("WARN", "Mismatch of pending: "
+ + str(self.streams[s.strm_id].pending_circ.circ_id) + " vs "
+ + str(s.circ_id))
+        self.streams[s.strm_id].circ = self.circuits.get(s.circ_id)
+ else:
+ self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
+ self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+ self.streams[s.strm_id].pending_circ = None
+ self.streams[s.strm_id].attached_at = s.arrived_at
+
+ # FAILED or CLOSED
+ elif s.status == "FAILED" or s.status == "CLOSED":
+ if s.strm_id not in self.streams:
+ plog("NOTICE", "Failed stream " + str(s.strm_id) + " not found")
+ return
+ # if not s.circ_id:
+ # plog("WARN", "Stream " + str(s.strm_id) + " closed/failed from no circuit")
+ # We get failed and closed for each stream, let CLOSED do the cleanup
+ if s.status == "FAILED":
+ # Avoid busted circuits that will not resolve or carry traffic
+ self.streams[s.strm_id].failed = True
+ if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
+ elif self.streams[s.strm_id].attached_at != 0:
+ plog("WARN", "Failed stream on unknown circuit " + str(s.circ_id))
+ return
+ # CLOSED
+ if self.streams[s.strm_id].pending_circ:
+ self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+ # Actual removal of the stream
+ del self.streams[s.strm_id]
+
+ # REMAP
+ elif s.status == "REMAP":
+ if s.strm_id not in self.streams:
+ plog("WARN", "Remap id "+str(s.strm_id)+" not found")
+ else:
+        if not re.match(r"\d+\.\d+\.\d+\.\d+", s.target_host):
+ s.target_host = "255.255.255.255"
+ plog("NOTICE", "Non-IP remap for "+str(s.strm_id) +
+ " to " + s.target_host)
+ self.streams[s.strm_id].host = s.target_host
+ self.streams[s.strm_id].port = s.target_port
+
+ def address_mapped_event(self, event):
+ """ It is necessary to listen to ADDRMAP events to be able to
+ perform DNS lookups using Tor """
+ output = [event.event_name, event.from_addr, event.to_addr,
+ time.asctime(event.when)]
+ plog("DEBUG", " ".join(output))
+
+ def unknown_event(self, event):
+ plog("DEBUG", "UNKNOWN EVENT '" + event.event_name + "':" +
+ event.event_string)
+
+########################## Unit tests ##########################
+
+def do_gen_unit(gen, r_list, weight_bw, num_print):
+ trials = 0
+ for r in r_list:
+ if gen.rstr_list.r_is_ok(r):
+ trials += weight_bw(gen, r)
+ trials = int(trials/1024)
+
+ print "Running "+str(trials)+" trials"
+
+ # 0. Reset r.chosen = 0 for all routers
+ for r in r_list:
+ r.chosen = 0
+
+ # 1. Generate 'trials' choices:
+ # 1a. r.chosen++
+
+ loglevel = TorUtil.loglevel
+ TorUtil.loglevel = "INFO"
+
+ #gen.rewind() - Just overhead if we create a fresh generator each time
+ rtrs = gen.generate()
+ for i in xrange(1, trials):
+ r = rtrs.next()
+ r.chosen += 1
+
+ TorUtil.loglevel = loglevel
+
+ # 2. Print top num_print routers choices+bandwidth stats+flags
+ i = 0
+ copy_rlist = copy.copy(r_list)
+ copy_rlist.sort(lambda x, y: cmp(y.chosen, x.chosen))
+ for r in copy_rlist:
+ if not gen.rstr_list.r_is_ok(r): continue
+ flag = ""
+ bw = int(weight_bw(gen, r))
+ if "Exit" in r.flags:
+ flag += "E"
+ if "Guard" in r.flags:
+ flag += "G"
+ print str(r.list_rank)+". "+r.nickname+" "+str(r.bw/1024)+"/"+str(bw/1024)+": "+str(r.chosen)+", "+flag
+ i += 1
+ if i > num_print: break
+
+def do_unit(rst, r_list, plamb):
+ print "\n"
+ print "-----------------------------------"
+ print rst.r_is_ok.im_class
+ above_i = 0
+ above_bw = 0
+ below_i = 0
+ below_bw = 0
+ for r in r_list:
+ if rst.r_is_ok(r):
+ print r.nickname+" "+plamb(r)+"="+str(rst.r_is_ok(r))+" "+str(r.bw)
+ if r.bw > 400000:
+ above_i = above_i + 1
+ above_bw += r.bw
+ else:
+ below_i = below_i + 1
+ below_bw += r.bw
+
+ print "Routers above: " + str(above_i) + " bw: " + str(above_bw)
+ print "Routers below: " + str(below_i) + " bw: " + str(below_bw)
+
+# TODO: Tests:
+# - Test each NodeRestriction and print in/out lines for it
+# - Test NodeGenerator and reapply NodeRestrictions
+# - Same for PathSelector and PathRestrictions
+# - Also Reapply each restriction by hand to path. Verify returns true
+
+if __name__ == '__main__':
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(("127.0.0.1",9051))
+ c = Connection(s)
+ c.debug(file("control.log", "w"))
+ c.authenticate()
+ nslist = c.get_network_status()
+ sorted_rlist = c.read_routers(c.get_network_status())
+
+ sorted_rlist.sort(lambda x, y: cmp(y.bw, x.bw))
+ for i in xrange(len(sorted_rlist)): sorted_rlist[i].list_rank = i
+
+ def flag_weighting(bwgen, r):
+ bw = r.bw
+ if "Exit" in r.flags:
+ bw *= bwgen.exit_weight
+ if "Guard" in r.flags:
+ bw *= bwgen.guard_weight
+ return bw
+
+ do_gen_unit(BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Exit"]),
+ 3, exit=True),
+ sorted_rlist, flag_weighting, 500)
+
+ do_gen_unit(BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Guard"]),
+ 3, guard=True),
+ sorted_rlist, flag_weighting, 500)
+
+ do_gen_unit(
+ BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Valid"]), 3),
+ sorted_rlist, flag_weighting, 500)
+
+ exit(0)
+
+ for r in sorted_rlist:
+ if r.will_exit_to("211.11.21.22", 465):
+ print r.nickname+" "+str(r.bw)
+
+ do_unit(FlagsRestriction(["Guard"], []), sorted_rlist, lambda r: " ".join(r.flags))
+ do_unit(FlagsRestriction(["Fast"], []), sorted_rlist, lambda r: " ".join(r.flags))
+
+ do_unit(ExitPolicyRestriction("2.11.2.2", 80), sorted_rlist,
+ lambda r: "exits to 80")
+ do_unit(PercentileRestriction(0, 100, sorted_rlist), sorted_rlist,
+ lambda r: "")
+ do_unit(PercentileRestriction(10, 20, sorted_rlist), sorted_rlist,
+ lambda r: "")
+ do_unit(OSRestriction([r"[lL]inux", r"BSD", "Darwin"], []), sorted_rlist,
+ lambda r: r.os)
+ do_unit(OSRestriction([], ["Windows", "Solaris"]), sorted_rlist,
+ lambda r: r.os)
+
+ do_unit(VersionRangeRestriction("0.1.2.0"), sorted_rlist,
+ lambda r: str(r.version))
+ do_unit(VersionRangeRestriction("0.1.2.0", "0.1.2.5"), sorted_rlist,
+ lambda r: str(r.version))
+ do_unit(VersionIncludeRestriction(["0.1.1.26-alpha", "0.1.2.7-ignored"]),
+ sorted_rlist, lambda r: str(r.version))
+ do_unit(VersionExcludeRestriction(["0.1.1.26"]), sorted_rlist,
+ lambda r: str(r.version))
+
+ do_unit(ConserveExitsRestriction(), sorted_rlist, lambda r: " ".join(r.flags))
+ do_unit(FlagsRestriction([], ["Valid"]), sorted_rlist, lambda r: " ".join(r.flags))
+
+ do_unit(IdHexRestriction("$FFCB46DB1339DA84674C70D7CB586434C4370441"),
+ sorted_rlist, lambda r: r.idhex)
+
+ rl = [AtLeastNNodeRestriction([ExitPolicyRestriction("255.255.255.255", 80), ExitPolicyRestriction("255.255.255.255", 443), ExitPolicyRestriction("255.255.255.255", 6667)], 2), FlagsRestriction([], ["BadExit"])]
+
+ exit_rstr = NodeRestrictionList(rl)
+
+ ug = UniformGenerator(sorted_rlist, exit_rstr)
+
+ rlist = []
+ for r in ug.generate():
+ print "Checking: " + r.nickname
+ for rs in rl:
+ if not rs.r_is_ok(r):
+ raise PathError()
+ if not "Exit" in r.flags:
+ print "No exit in flags of "+r.nickname
+ rlist.append(r)
+ for r in sorted_rlist:
+ if "Exit" in r.flags and not r in rlist:
+ print r.nickname+" is an exit not in rl!"
+
Deleted: torflow/branches/gsoc2008/TorCtl.local/README
===================================================================
--- torflow/branches/gsoc2008/TorCtl/README 2008-07-07 16:40:57 UTC (rev 15734)
+++ torflow/branches/gsoc2008/TorCtl.local/README 2008-07-07 19:37:47 UTC (rev 15742)
@@ -1,10 +0,0 @@
-See the pydoc:
-
-# python
-
->>> import TorCtl
->>> help(TorCtl)
-
->>> import PathSupport
->>> help(PathSupport)
-
Copied: torflow/branches/gsoc2008/TorCtl.local/README (from rev 15741, torflow/branches/gsoc2008/TorCtl/README)
===================================================================
--- torflow/branches/gsoc2008/TorCtl.local/README (rev 0)
+++ torflow/branches/gsoc2008/TorCtl.local/README 2008-07-07 19:37:47 UTC (rev 15742)
@@ -0,0 +1,10 @@
+See the pydoc:
+
+# python
+
+>>> import TorCtl
+>>> help(TorCtl)
+
+>>> import PathSupport
+>>> help(PathSupport)
+
Deleted: torflow/branches/gsoc2008/TorCtl.local/StatsSupport.py
===================================================================
--- torflow/branches/gsoc2008/TorCtl/StatsSupport.py 2008-07-07 16:40:57 UTC (rev 15734)
+++ torflow/branches/gsoc2008/TorCtl.local/StatsSupport.py 2008-07-07 19:37:47 UTC (rev 15742)
@@ -1,676 +0,0 @@
-#!/usr/bin/python
-#StatsSupport.py - functions and classes useful for calculating stream/circuit statistics
-
-"""
-
-Support classes for statistics gathering
-
-The StatsSupport package contains several classes that extend PathSupport to
-gather continuous statistics on the Tor network.
-
-The main entry point is the StatsHandler class, which can be extended or
-instantiated directly. The StatsHandler extends TorCtl.PathSupport.PathBuilder,
-which is itself a TorCtl.EventHandler. The StatsHandler listens to CIRC and
-STREAM events and gathers all manner of statistics on their creation and
-failure before passing the events back up to the PathBuilder code, which
-manages the actual circuit construction and the attachment of streams.
-
-The package also contains a number of support classes that help gather
-additional statistics on the reliability and performance of routers.
-
-For the purpose of accounting failures, the code tracks two main classes of
-failure: 'actual' failure and 'suspected' failure. The general rule is that an
-actual failure is attributed to the node that directly handled the circuit or
-stream. For streams, this is considered to be the exit node. For circuits, it
-is both the extender and the extendee. 'Suspected' failures, on the other
-hand, are attributed to every member of the circuit up until the extendee for
-circuits, and all hops for streams.
-
-For bandwidth accounting, the average stream bandwidth and the average ratio
-of stream bandwidth to advertised bandwidth are tracked, and when the
-statistics are written, a Z-test is performed to calculate the probabilities
-of these values assuming a normal distribution. Note, however, that it has not
-been verified that this distribution is actually normal. It is likely to be
-something else (Pareto, perhaps?).
-
-"""
-
-import sys
-import re
-import random
-import copy
-import time
-import math
-
-import TorUtil, PathSupport, TorCtl
-from TorUtil import *
-from PathSupport import *
-from TorUtil import meta_port, meta_host, control_port, control_host
-
-class ReasonRouterList:
- "Helper class to track which Routers have failed for a given reason"
- def __init__(self, reason):
- self.reason = reason
- self.rlist = {}
-
-  def sort_list(self): raise NotImplementedError()
-
- def write_list(self, f):
- "Write the list of failure counts for this reason 'f'"
- rlist = self.sort_list()
- for r in rlist:
- susp = 0
- tot_failed = r.circ_failed+r.strm_failed
- tot_susp = tot_failed+r.circ_suspected+r.strm_suspected
- f.write(r.idhex+" ("+r.nickname+") F=")
- if self.reason in r.reason_failed:
- susp = r.reason_failed[self.reason]
- f.write(str(susp)+"/"+str(tot_failed))
- f.write(" S=")
- if self.reason in r.reason_suspected:
- susp += r.reason_suspected[self.reason]
- f.write(str(susp)+"/"+str(tot_susp)+"\n")
-
- def add_r(self, r):
- "Add a router to the list for this reason"
- self.rlist[r] = 1
-
- def total_suspected(self):
- "Get a list of total suspected failures for this reason"
- # suspected is disjoint from failed. The failed table
- # may not have an entry
- def notlambda(x, y):
- if self.reason in y.reason_suspected:
- if self.reason in y.reason_failed:
- return (x + y.reason_suspected[self.reason]
- + y.reason_failed[self.reason])
- else:
- return (x + y.reason_suspected[self.reason])
- else:
- if self.reason in y.reason_failed:
- return (x + y.reason_failed[self.reason])
- else: return x
- return reduce(notlambda, self.rlist.iterkeys(), 0)
-
- def total_failed(self):
- "Get a list of total failures for this reason"
- def notlambda(x, y):
- if self.reason in y.reason_failed:
- return (x + y.reason_failed[self.reason])
- else: return x
- return reduce(notlambda, self.rlist.iterkeys(), 0)
-
-class SuspectRouterList(ReasonRouterList):
- """Helper class to track all routers suspected of failing for a given
- reason. The main difference between this and the normal
- ReasonRouterList is the sort order and the verification."""
- def __init__(self, reason): ReasonRouterList.__init__(self,reason)
-
- def sort_list(self):
- rlist = self.rlist.keys()
- rlist.sort(lambda x, y: cmp(y.reason_suspected[self.reason],
- x.reason_suspected[self.reason]))
- return rlist
-
- def _verify_suspected(self):
- return reduce(lambda x, y: x + y.reason_suspected[self.reason],
- self.rlist.iterkeys(), 0)
-
-class FailedRouterList(ReasonRouterList):
- """Helper class to track all routers that failed for a given
- reason. The main difference between this and the normal
- ReasonRouterList is the sort order and the verification."""
- def __init__(self, reason): ReasonRouterList.__init__(self,reason)
-
- def sort_list(self):
- rlist = self.rlist.keys()
- rlist.sort(lambda x, y: cmp(y.reason_failed[self.reason],
- x.reason_failed[self.reason]))
- return rlist
-
- def _verify_failed(self):
- return reduce(lambda x, y: x + y.reason_failed[self.reason],
- self.rlist.iterkeys(), 0)
-class BandwidthStats:
- "Class that manages observed bandwidth through a Router"
- def __init__(self):
- self.byte_list = []
- self.duration_list = []
- self.min_bw = 1e10
- self.max_bw = 0
- self.mean = 0
- self.dev = 0
-
- def _exp(self): # Weighted avg
- "Expectation - weighted average of the bandwidth through this node"
- tot_bw = reduce(lambda x, y: x+y, self.byte_list, 0.0)
- EX = 0.0
- for i in xrange(len(self.byte_list)):
- EX += (self.byte_list[i]*self.byte_list[i])/self.duration_list[i]
- if tot_bw == 0.0: return 0.0
- EX /= tot_bw
- return EX
-
- def _exp2(self): # E[X^2]
- "Second moment of the bandwidth"
- tot_bw = reduce(lambda x, y: x+y, self.byte_list, 0.0)
- EX = 0.0
- for i in xrange(len(self.byte_list)):
- EX += (self.byte_list[i]**3)/(self.duration_list[i]**2)
- if tot_bw == 0.0: return 0.0
- EX /= tot_bw
- return EX
-
- def _dev(self): # Weighted dev
- "Standard deviation of bandwidth"
- EX = self.mean
- EX2 = self._exp2()
- arg = EX2 - (EX*EX)
- if arg < -0.05:
- plog("WARN", "Diff of "+str(EX2)+" and "+str(EX)+"^2 is "+str(arg))
- return math.sqrt(abs(arg))
-
- def add_bw(self, bytes, duration):
- "Add an observed transfer of 'bytes' for 'duration' seconds"
- if not bytes: plog("WARN", "No bytes for bandwidth")
- bytes /= 1024.
- self.byte_list.append(bytes)
- self.duration_list.append(duration)
- bw = bytes/duration
- plog("DEBUG", "Got bandwidth "+str(bw))
- if self.min_bw > bw: self.min_bw = bw
- if self.max_bw < bw: self.max_bw = bw
- self.mean = self._exp()
- self.dev = self._dev()
-
-
-
-class StatsRouter(TorCtl.Router):
- "Extended Router to handle statistics markup"
- def __init__(self, router): # Promotion constructor :)
- """'Promotion Constructor' that converts a Router directly into a
- StatsRouter without a copy."""
- self.__dict__ = router.__dict__
- self.reset()
-
- def reset(self):
- "Reset all stats on this Router"
- self.circ_uncounted = 0
- self.circ_failed = 0
- self.circ_succeeded = 0 # disjoint from failed
- self.circ_suspected = 0
- self.circ_chosen = 0 # above 4 should add to this
- self.strm_failed = 0 # Only exits should have these
- self.strm_succeeded = 0
- self.strm_suspected = 0 # disjoint from failed
- self.strm_uncounted = 0
- self.strm_chosen = 0 # above 4 should add to this
- self.reason_suspected = {}
- self.reason_failed = {}
- self.first_seen = time.time()
- if "Running" in self.flags:
- self.became_active_at = self.first_seen
- self.hibernated_at = 0
- else:
- self.became_active_at = 0
- self.hibernated_at = self.first_seen
- self.total_hibernation_time = 0
- self.total_active_uptime = 0
- self.total_extend_time = 0
- self.total_extended = 0
- self.bwstats = BandwidthStats()
- self.z_ratio = 0
- self.prob_zr = 0
- self.z_bw = 0
- self.prob_zb = 0
-
- def avg_extend_time(self):
- """Return the average amount of time it took for this router
- to extend a circuit one hop"""
- if self.total_extended:
- return self.total_extend_time/self.total_extended
- else: return 0
-
- def bw_ratio(self):
- """Return the ratio of the Router's advertised bandwidth to its
- observed average stream bandwidth"""
- bw = self.bwstats.mean
- if bw == 0.0: return 0
- else: return self.bw/(1024.*bw)
-
- def current_uptime(self):
- if self.became_active_at:
- ret = (self.total_active_uptime+(time.time()-self.became_active_at))
- else:
- ret = self.total_active_uptime
- if ret == 0: return 0.000005 # eh..
- else: return ret
-
- def failed_per_hour(self):
- """Return the number of circuit extend failures per hour for this
- Router"""
- return (3600.*(self.circ_failed+self.strm_failed))/self.current_uptime()
-
-  # XXX: Separate suspected from failed in totals
- def suspected_per_hour(self):
- """Return the number of circuits that failed with this router as an
- earlier hop"""
- return (3600.*(self.circ_suspected+self.strm_suspected
- +self.circ_failed+self.strm_failed))/self.current_uptime()
-
- # These four are for sanity checking
- def _suspected_per_hour(self):
- return (3600.*(self.circ_suspected+self.strm_suspected))/self.current_uptime()
-
- def _uncounted_per_hour(self):
- return (3600.*(self.circ_uncounted+self.strm_uncounted))/self.current_uptime()
-
- def _chosen_per_hour(self):
- return (3600.*(self.circ_chosen+self.strm_chosen))/self.current_uptime()
-
- def _succeeded_per_hour(self):
- return (3600.*(self.circ_succeeded+self.strm_succeeded))/self.current_uptime()
-
- key = """Metatroller Statistics:
- CC=Circuits Chosen CF=Circuits Failed CS=Circuit Suspected
- SC=Streams Chosen SF=Streams Failed SS=Streams Suspected
- FH=Failed per Hour SH=Suspected per Hour ET=avg circuit Extend Time (s)
- EB=mean BW (K) BD=BW std Dev (K) BR=Ratio of observed to avg BW
- ZB=BW z-test value PB=Probability(z-bw) ZR=Ratio z-test value
- PR=Prob(z-ratio) U=Uptime (h)\n"""
-
- def __str__(self):
- return (self.idhex+" ("+self.nickname+")\n"
- +" CC="+str(self.circ_chosen)
- +" CF="+str(self.circ_failed)
- +" CS="+str(self.circ_suspected+self.circ_failed)
- +" SC="+str(self.strm_chosen)
- +" SF="+str(self.strm_failed)
- +" SS="+str(self.strm_suspected+self.strm_failed)
- +" FH="+str(round(self.failed_per_hour(),1))
- +" SH="+str(round(self.suspected_per_hour(),1))+"\n"
- +" ET="+str(round(self.avg_extend_time(),1))
- +" EB="+str(round(self.bwstats.mean,1))
- +" BD="+str(round(self.bwstats.dev,1))
- +" ZB="+str(round(self.z_bw,1))
- +" PB="+(str(round(self.prob_zb,3))[1:])
- +" BR="+str(round(self.bw_ratio(),1))
- +" ZR="+str(round(self.z_ratio,1))
- +" PR="+(str(round(self.prob_zr,3))[1:])
- +" U="+str(round(self.current_uptime()/3600, 1))+"\n")
-
- def sanity_check(self):
- "Makes sure all stats are self-consistent"
- if (self.circ_failed + self.circ_succeeded + self.circ_suspected
- + self.circ_uncounted != self.circ_chosen):
- plog("ERROR", self.nickname+" does not add up for circs")
- if (self.strm_failed + self.strm_succeeded + self.strm_suspected
- + self.strm_uncounted != self.strm_chosen):
- plog("ERROR", self.nickname+" does not add up for streams")
- def check_reasons(reasons, expected, which, rtype):
- count = 0
- for rs in reasons.iterkeys():
- if re.search(r"^"+which, rs): count += reasons[rs]
- if count != expected:
- plog("ERROR", "Mismatch "+which+" "+rtype+" for "+self.nickname)
- check_reasons(self.reason_suspected,self.strm_suspected,"STREAM","susp")
- check_reasons(self.reason_suspected,self.circ_suspected,"CIRC","susp")
- check_reasons(self.reason_failed,self.strm_failed,"STREAM","failed")
- check_reasons(self.reason_failed,self.circ_failed,"CIRC","failed")
- now = time.time()
- tot_hib_time = self.total_hibernation_time
- tot_uptime = self.total_active_uptime
- if self.hibernated_at: tot_hib_time += now - self.hibernated_at
- if self.became_active_at: tot_uptime += now - self.became_active_at
- if round(tot_hib_time+tot_uptime) != round(now-self.first_seen):
- plog("ERROR", "Mismatch of uptimes for "+self.nickname)
-
- per_hour_tot = round(self._uncounted_per_hour()+self.failed_per_hour()+
- self._suspected_per_hour()+self._succeeded_per_hour(), 2)
- chosen_tot = round(self._chosen_per_hour(), 2)
- if per_hour_tot != chosen_tot:
- plog("ERROR", self.nickname+" has mismatch of per hour counts: "
- +str(per_hour_tot) +" vs "+str(chosen_tot))
-
-
-
-class StatsHandler(PathSupport.PathBuilder):
- """An extension of PathSupport.PathBuilder that keeps track of
- router statistics for every circuit and stream"""
- def __init__(self, c, slmgr):
- PathBuilder.__init__(self, c, slmgr, StatsRouter)
- self.circ_count = 0
- self.strm_count = 0
- self.strm_failed = 0
- self.circ_failed = 0
- self.failed_reasons = {}
- self.suspect_reasons = {}
-
- def run_zbtest(self): # Unweighted z-test
- """Run unweighted z-test to calculate the probabilities of a node
- having a given stream bandwidth based on the Normal distribution"""
- n = reduce(lambda x, y: x+(y.bwstats.mean > 0), self.sorted_r, 0)
- if n == 0: return (0, 0)
- avg = reduce(lambda x, y: x+y.bwstats.mean, self.sorted_r, 0)/float(n)
- def notlambda(x, y):
- if y.bwstats.mean <= 0: return x+0
- else: return x+(y.bwstats.mean-avg)*(y.bwstats.mean-avg)
- stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
- if not stddev: return (avg, stddev)
- for r in self.sorted_r:
- if r.bwstats.mean > 0:
- r.z_bw = abs((r.bwstats.mean-avg)/stddev)
- r.prob_zb = TorUtil.zprob(-r.z_bw)
- return (avg, stddev)
-
- def run_zrtest(self): # Unweighted z-test
- """Run unweighted z-test to calculate the probabilities of a node
- having a given ratio of stream bandwidth to advertised bandwidth
- based on the Normal distribution"""
- n = reduce(lambda x, y: x+(y.bw_ratio() > 0), self.sorted_r, 0)
- if n == 0: return (0, 0)
- avg = reduce(lambda x, y: x+y.bw_ratio(), self.sorted_r, 0)/float(n)
- def notlambda(x, y):
- if y.bw_ratio() <= 0: return x+0
- else: return x+(y.bw_ratio()-avg)*(y.bw_ratio()-avg)
- stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
- if not stddev: return (avg, stddev)
- for r in self.sorted_r:
- if r.bw_ratio() > 0:
- r.z_ratio = abs((r.bw_ratio()-avg)/stddev)
- r.prob_zr = TorUtil.zprob(-r.z_ratio)
- return (avg, stddev)
-
- def write_reasons(self, f, reasons, name):
- "Write out all the failure reasons and statistics for all Routers"
- f.write("\n\n\t----------------- "+name+" -----------------\n")
- for rsn in reasons:
- f.write("\n"+rsn.reason+". Failed: "+str(rsn.total_failed())
- +", Suspected: "+str(rsn.total_suspected())+"\n")
- rsn.write_list(f)
-
- def write_routers(self, f, rlist, name):
- "Write out all the usage statistics for all Routers"
- f.write("\n\n\t----------------- "+name+" -----------------\n\n")
- for r in rlist:
- # only print it if we've used it.
- if r.circ_chosen+r.strm_chosen > 0: f.write(str(r))
-
- def write_stats(self, filename):
- "Write out all the statistics the StatsHandler has gathered"
- # TODO: all this shit should be configurable. Some of it only makes
- # sense when scanning in certain modes.
- plog("DEBUG", "Writing stats")
- # Sanity check routers
- for r in self.sorted_r: r.sanity_check()
-
- # Sanity check the router reason lists.
- for r in self.sorted_r:
- for rsn in r.reason_failed:
- if r not in self.failed_reasons[rsn].rlist:
- plog("ERROR", "Router missing from reason table")
- for rsn in r.reason_suspected:
- if r not in self.suspect_reasons[rsn].rlist:
- plog("ERROR", "Router missing from reason table")
-
- # Sanity check the lists the other way
- for rsn in self.failed_reasons.itervalues(): rsn._verify_failed()
- for rsn in self.suspect_reasons.itervalues(): rsn._verify_suspected()
-
- f = file(filename, "w")
- f.write(StatsRouter.key)
- (avg, dev) = self.run_zbtest()
- f.write("\n\nBW stats: u="+str(round(avg,1))+" s="+str(round(dev,1))+"\n")
-
- (avg, dev) = self.run_zrtest()
- f.write("BW ratio stats: u="+str(round(avg,1))+" s="+str(round(dev,1))+"\n")
-
- # Circ, strm infoz
- f.write("Circ failure ratio: "+str(self.circ_failed)
- +"/"+str(self.circ_count)+"\n")
-
- f.write("Stream failure ratio: "+str(self.strm_failed)
- +"/"+str(self.strm_count)+"\n")
-
- # Extend times
- n = 0.01+reduce(lambda x, y: x+(y.avg_extend_time() > 0), self.sorted_r, 0)
- avg_extend = reduce(lambda x, y: x+y.avg_extend_time(), self.sorted_r, 0)/n
- def notlambda(x, y):
- return x+(y.avg_extend_time()-avg_extend)*(y.avg_extend_time()-avg_extend)
- dev_extend = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
-
- f.write("Extend time: u="+str(round(avg_extend,1))
- +" s="+str(round(dev_extend,1)))
-
- # sort+print by bandwidth
- bw_rate = copy.copy(self.sorted_r)
- bw_rate.sort(lambda x, y: cmp(y.bw_ratio(), x.bw_ratio()))
- self.write_routers(f, bw_rate, "Bandwidth Ratios")
-
- failed = copy.copy(self.sorted_r)
- failed.sort(lambda x, y:
- cmp(y.circ_failed+y.strm_failed,
- x.circ_failed+x.strm_failed))
- self.write_routers(f, failed, "Failed Counts")
-
- suspected = copy.copy(self.sorted_r)
- suspected.sort(lambda x, y: # Suspected includes failed
- cmp(y.circ_failed+y.strm_failed+y.circ_suspected+y.strm_suspected,
- x.circ_failed+x.strm_failed+x.circ_suspected+x.strm_suspected))
- self.write_routers(f, suspected, "Suspected Counts")
-
- fail_rate = copy.copy(failed)
- fail_rate.sort(lambda x, y: cmp(y.failed_per_hour(), x.failed_per_hour()))
- self.write_routers(f, fail_rate, "Fail Rates")
-
- suspect_rate = copy.copy(suspected)
- suspect_rate.sort(lambda x, y:
- cmp(y.suspected_per_hour(), x.suspected_per_hour()))
- self.write_routers(f, suspect_rate, "Suspect Rates")
-
- # TODO: Sort by failed/selected and suspect/selected ratios
- # if we ever want to do non-uniform scanning..
-
- # FIXME: Add failed in here somehow..
- susp_reasons = self.suspect_reasons.values()
- susp_reasons.sort(lambda x, y:
- cmp(y.total_suspected(), x.total_suspected()))
- self.write_reasons(f, susp_reasons, "Suspect Reasons")
-
- fail_reasons = self.failed_reasons.values()
- fail_reasons.sort(lambda x, y:
- cmp(y.total_failed(), x.total_failed()))
- self.write_reasons(f, fail_reasons, "Failed Reasons")
- f.close()
-
- # FIXME: sort+print by circ extend time
-
- def reset_stats(self):
- plog("DEBUG", "Resetting stats")
- self.circ_count = 0
- self.strm_count = 0
- self.strm_failed = 0
- self.circ_failed = 0
- self.suspect_reasons.clear()
- self.failed_reasons.clear()
- for r in self.sorted_r: r.reset()
-
- def circ_status_event(self, c):
- if c.circ_id in self.circuits:
- # TODO: Hrmm, consider making this sane in TorCtl.
- if c.reason: lreason = c.reason
- else: lreason = "NONE"
- if c.remote_reason: rreason = c.remote_reason
- else: rreason = "NONE"
- reason = c.event_name+":"+c.status+":"+lreason+":"+rreason
- if c.status == "LAUNCHED":
- # Update circ_chosen count
- self.circ_count += 1
- elif c.status == "EXTENDED":
- delta = c.arrived_at - self.circuits[c.circ_id].last_extended_at
- r_ext = c.path[-1]
- if r_ext[0] != '$': r_ext = self.name_to_key[r_ext]
- self.routers[r_ext[1:]].total_extend_time += delta
- self.routers[r_ext[1:]].total_extended += 1
- elif c.status == "FAILED":
- for r in self.circuits[c.circ_id].path: r.circ_chosen += 1
-
- if len(c.path)-1 < 0: start_f = 0
- else: start_f = len(c.path)-1
-
- # Count failed
- self.circ_failed += 1
- # XXX: Differentiate between extender and extendee
- for r in self.circuits[c.circ_id].path[start_f:len(c.path)+1]:
- r.circ_failed += 1
- if not reason in r.reason_failed:
- r.reason_failed[reason] = 1
- else: r.reason_failed[reason]+=1
- if reason not in self.failed_reasons:
- self.failed_reasons[reason] = FailedRouterList(reason)
- self.failed_reasons[reason].add_r(r)
-
- for r in self.circuits[c.circ_id].path[len(c.path)+1:]:
- r.circ_uncounted += 1
-
- # Don't count if failed was set this round, don't set
- # suspected..
- for r in self.circuits[c.circ_id].path[:start_f]:
- r.circ_suspected += 1
- if not reason in r.reason_suspected:
- r.reason_suspected[reason] = 1
- else: r.reason_suspected[reason]+=1
- if reason not in self.suspect_reasons:
- self.suspect_reasons[reason] = SuspectRouterList(reason)
- self.suspect_reasons[reason].add_r(r)
- elif c.status == "CLOSED":
- # Since PathBuilder deletes the circuit on a failed,
- # we only get this for a clean close
- for r in self.circuits[c.circ_id].path:
- r.circ_chosen += 1
- if lreason in ("REQUESTED", "FINISHED", "ORIGIN"):
- r.circ_succeeded += 1
- else:
- if not reason in r.reason_suspected:
- r.reason_suspected[reason] = 1
- else: r.reason_suspected[reason] += 1
- r.circ_suspected+= 1
- if reason not in self.suspect_reasons:
- self.suspect_reasons[reason] = SuspectRouterList(reason)
- self.suspect_reasons[reason].add_r(r)
- PathBuilder.circ_status_event(self, c)
-
- def count_stream_reason_failed(self, s, reason):
- "Count the routers involved in a failure"
- # Update failed count,reason_failed for exit
- r = self.circuits[s.circ_id].exit
- if not reason in r.reason_failed: r.reason_failed[reason] = 1
- else: r.reason_failed[reason]+=1
- r.strm_failed += 1
- if reason not in self.failed_reasons:
- self.failed_reasons[reason] = FailedRouterList(reason)
- self.failed_reasons[reason].add_r(r)
-
- def count_stream_suspects(self, s, lreason, reason):
- "Count the routers 'suspected' of being involved in a failure"
- if lreason in ("TIMEOUT", "INTERNAL", "TORPROTOCOL" "DESTROY"):
- for r in self.circuits[s.circ_id].path[:-1]:
- r.strm_suspected += 1
- if not reason in r.reason_suspected:
- r.reason_suspected[reason] = 1
- else: r.reason_suspected[reason]+=1
- if reason not in self.suspect_reasons:
- self.suspect_reasons[reason] = SuspectRouterList(reason)
- self.suspect_reasons[reason].add_r(r)
- else:
- for r in self.circuits[s.circ_id].path[:-1]:
- r.strm_uncounted += 1
-
- def stream_status_event(self, s):
- if s.strm_id in self.streams and not self.streams[s.strm_id].ignored:
- # TODO: Hrmm, consider making this sane in TorCtl.
- if s.reason: lreason = s.reason
- else: lreason = "NONE"
- if s.remote_reason: rreason = s.remote_reason
- else: rreason = "NONE"
- reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
- if (s.status in ("DETACHED", "FAILED", "CLOSED", "SUCCEEDED")
- and not s.circ_id):
- # XXX: REMAPs can do this (normal). Also REASON=DESTROY (bug?)
- # Also timeouts.. Those should use the pending circ instead
- # of returning..
- plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
- PathBuilder.stream_status_event(self, s)
- return
-
- # Verify circ id matches stream.circ
- if s.status not in ("NEW", "NEWRESOLVE", "REMAP"):
- circ = self.streams[s.strm_id].circ
- if not circ: circ = self.streams[s.strm_id].pending_circ
- if circ and circ.circ_id != s.circ_id:
- plog("WARN", str(s.strm_id) + " has mismatch of "
- +str(s.circ_id)+" v "+str(circ.circ_id))
- if s.circ_id and s.circ_id not in self.circuits:
- plog("NOTICE", "Unknown circuit "+str(s.circ_id)
- +" for stream "+str(s.strm_id))
- return
-
- if s.status == "DETACHED":
- if self.streams[s.strm_id].attached_at:
- plog("WARN", str(s.strm_id)+" detached after succeeded")
- # Update strm_chosen count
- self.strm_count += 1
- for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
- self.strm_failed += 1
- self.count_stream_suspects(s, lreason, reason)
- self.count_stream_reason_failed(s, reason)
- elif s.status == "FAILED":
- # HACK. We get both failed and closed for the same stream,
- # with different reasons. Might as well record both, since they
- # often differ.
- self.streams[s.strm_id].failed_reason = reason
- elif s.status == "CLOSED":
- # Always get both a closed and a failed..
- # - Check if the circuit exists still
- # Update strm_chosen count
- self.strm_count += 1
- for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
-
- # Update bw stats. XXX: Don't do this for resolve streams
- if self.streams[s.strm_id].attached_at:
- lifespan = self.streams[s.strm_id].lifespan(s.arrived_at)
- for r in self.streams[s.strm_id].circ.path:
- r.bwstats.add_bw(self.streams[s.strm_id].bytes_written+
- self.streams[s.strm_id].bytes_read, lifespan)
-
- if self.streams[s.strm_id].failed:
- reason = self.streams[s.strm_id].failed_reason+":"+lreason+":"+rreason
-
- self.count_stream_suspects(s, lreason, reason)
-
- r = self.circuits[s.circ_id].exit
- if (not self.streams[s.strm_id].failed
- and (lreason == "DONE" or (lreason == "END" and rreason == "DONE"))):
- r.strm_succeeded += 1
- else:
- self.strm_failed += 1
- self.count_stream_reason_failed(s, reason)
- PathBuilder.stream_status_event(self, s)
-
- def ns_event(self, n):
- PathBuilder.ns_event(self, n)
- now = n.arrived_at
- for ns in n.nslist:
- if not ns.idhex in self.routers:
- continue
- r = self.routers[ns.idhex]
- if "Running" in ns.flags:
- if not r.became_active_at:
- r.became_active_at = now
- r.total_hibernation_time += now - r.hibernated_at
- r.hibernated_at = 0
- else:
- if not r.hibernated_at:
- r.hibernated_at = now
- r.total_active_uptime += now - r.became_active_at
- r.became_active_at = 0
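
The bandwidth accounting described in the module docstring above reduces to
two pieces of arithmetic: BandwidthStats._exp computes a byte-weighted average
of the per-transfer bandwidths, and StatsHandler.run_zbtest turns each
router's mean into an unweighted z-score with a normal tail probability. A
small self-contained sketch of the same math (the sample numbers are made up,
and math.erfc stands in for TorUtil.zprob):

  import math

  # Per-transfer observations (kilobytes, seconds), as stored by add_bw().
  transfers = [(120.0, 4.0), (300.0, 5.0), (60.0, 6.0)]

  # Byte-weighted mean bandwidth, as in BandwidthStats._exp():
  #   E[X] = sum(b_i^2 / d_i) / sum(b_i)
  tot_bytes = sum(b for b, d in transfers)
  mean_bw = sum(b * b / d for b, d in transfers) / tot_bytes   # ~46.2 KB/s

  # Unweighted z-test across routers, as in run_zbtest(): compare each
  # router's mean against the population mean/stddev of all means.
  router_means = [mean_bw, 25.0, 40.0, 55.0]   # made-up population
  avg = sum(router_means) / len(router_means)
  stddev = math.sqrt(sum((m - avg) ** 2 for m in router_means)
                     / len(router_means))
  for m in router_means:
    z = abs(m - avg) / stddev
    prob = 0.5 * math.erfc(z / math.sqrt(2))   # one-sided tail, ~zprob(-z)
    print m, z, prob
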
Copied: torflow/branches/gsoc2008/TorCtl.local/StatsSupport.py (from rev 15741, torflow/branches/gsoc2008/TorCtl/StatsSupport.py)
===================================================================
--- torflow/branches/gsoc2008/TorCtl.local/StatsSupport.py (rev 0)
+++ torflow/branches/gsoc2008/TorCtl.local/StatsSupport.py 2008-07-07 19:37:47 UTC (rev 15742)
@@ -0,0 +1,676 @@
+#!/usr/bin/python
+#StatsSupport.py - functions and classes useful for calculating stream/circuit statistics
+
+"""
+
+Support classes for statistics gathering
+
+The StatsSupport package contains several classes that extend PathSupport to
+gather continuous statistics on the Tor network.
+
+The main entry point is the StatsHandler class, which can be extended or
+instantiated directly. The StatsHandler extends TorCtl.PathSupport.PathBuilder,
+which is itself a TorCtl.EventHandler. The StatsHandler listens to CIRC and
+STREAM events and gathers all manner of statistics on their creation and
+failure before passing the events back up to the PathBuilder code, which
+manages the actual circuit construction and the attachment of streams.
+
+The package also contains a number of support classes that help gather
+additional statistics on the reliability and performance of routers.
+
+For the purpose of accounting failures, the code tracks two main classes of
+failure: 'actual' failure and 'suspected' failure. The general rule is that an
+actual failure is attributed to the node that directly handled the circuit or
+stream. For streams, this is considered to be the exit node. For circuits, it
+is both the extender and the extendee. 'Suspected' failures, on the other
+hand, are attributed to every member of the circuit up until the extendee for
+circuits, and all hops for streams.
+
+For bandwidth accounting, the average stream bandwidth and the average ratio
+of stream bandwidth to advertised bandwidth are tracked, and when the
+statistics are written, a Z-test is performed to calculate the probabilities
+of these values assuming a normal distribution. Note, however, that it has not
+been verified that this distribution is actually normal. It is likely to be
+something else (Pareto, perhaps?).
+
+"""
+
+import sys
+import re
+import random
+import copy
+import time
+import math
+
+import TorUtil, PathSupport, TorCtl
+from TorUtil import *
+from PathSupport import *
+from TorUtil import meta_port, meta_host, control_port, control_host
+
+class ReasonRouterList:
+ "Helper class to track which Routers have failed for a given reason"
+ def __init__(self, reason):
+ self.reason = reason
+ self.rlist = {}
+
+  def sort_list(self): raise NotImplementedError()
+
+ def write_list(self, f):
+ "Write the list of failure counts for this reason 'f'"
+ rlist = self.sort_list()
+ for r in rlist:
+ susp = 0
+ tot_failed = r.circ_failed+r.strm_failed
+ tot_susp = tot_failed+r.circ_suspected+r.strm_suspected
+ f.write(r.idhex+" ("+r.nickname+") F=")
+ if self.reason in r.reason_failed:
+ susp = r.reason_failed[self.reason]
+ f.write(str(susp)+"/"+str(tot_failed))
+ f.write(" S=")
+ if self.reason in r.reason_suspected:
+ susp += r.reason_suspected[self.reason]
+ f.write(str(susp)+"/"+str(tot_susp)+"\n")
+
+ def add_r(self, r):
+ "Add a router to the list for this reason"
+ self.rlist[r] = 1
+
+ def total_suspected(self):
+ "Get a list of total suspected failures for this reason"
+ # suspected is disjoint from failed. The failed table
+ # may not have an entry
+ def notlambda(x, y):
+ if self.reason in y.reason_suspected:
+ if self.reason in y.reason_failed:
+ return (x + y.reason_suspected[self.reason]
+ + y.reason_failed[self.reason])
+ else:
+ return (x + y.reason_suspected[self.reason])
+ else:
+ if self.reason in y.reason_failed:
+ return (x + y.reason_failed[self.reason])
+ else: return x
+ return reduce(notlambda, self.rlist.iterkeys(), 0)
+
+ def total_failed(self):
+ "Get a list of total failures for this reason"
+ def notlambda(x, y):
+ if self.reason in y.reason_failed:
+ return (x + y.reason_failed[self.reason])
+ else: return x
+ return reduce(notlambda, self.rlist.iterkeys(), 0)
+
+class SuspectRouterList(ReasonRouterList):
+ """Helper class to track all routers suspected of failing for a given
+ reason. The main difference between this and the normal
+ ReasonRouterList is the sort order and the verification."""
+ def __init__(self, reason): ReasonRouterList.__init__(self,reason)
+
+ def sort_list(self):
+ rlist = self.rlist.keys()
+ rlist.sort(lambda x, y: cmp(y.reason_suspected[self.reason],
+ x.reason_suspected[self.reason]))
+ return rlist
+
+ def _verify_suspected(self):
+ return reduce(lambda x, y: x + y.reason_suspected[self.reason],
+ self.rlist.iterkeys(), 0)
+
+class FailedRouterList(ReasonRouterList):
+ """Helper class to track all routers that failed for a given
+ reason. The main difference between this and the normal
+ ReasonRouterList is the sort order and the verification."""
+ def __init__(self, reason): ReasonRouterList.__init__(self,reason)
+
+ def sort_list(self):
+ rlist = self.rlist.keys()
+ rlist.sort(lambda x, y: cmp(y.reason_failed[self.reason],
+ x.reason_failed[self.reason]))
+ return rlist
+
+ def _verify_failed(self):
+ return reduce(lambda x, y: x + y.reason_failed[self.reason],
+ self.rlist.iterkeys(), 0)
+class BandwidthStats:
+ "Class that manages observed bandwidth through a Router"
+ def __init__(self):
+ self.byte_list = []
+ self.duration_list = []
+ self.min_bw = 1e10
+ self.max_bw = 0
+ self.mean = 0
+ self.dev = 0
+
+ def _exp(self): # Weighted avg
+ "Expectation - weighted average of the bandwidth through this node"
+ tot_bw = reduce(lambda x, y: x+y, self.byte_list, 0.0)
+ EX = 0.0
+ for i in xrange(len(self.byte_list)):
+ EX += (self.byte_list[i]*self.byte_list[i])/self.duration_list[i]
+ if tot_bw == 0.0: return 0.0
+ EX /= tot_bw
+ return EX
+
+ def _exp2(self): # E[X^2]
+ "Second moment of the bandwidth"
+ tot_bw = reduce(lambda x, y: x+y, self.byte_list, 0.0)
+ EX = 0.0
+ for i in xrange(len(self.byte_list)):
+ EX += (self.byte_list[i]**3)/(self.duration_list[i]**2)
+ if tot_bw == 0.0: return 0.0
+ EX /= tot_bw
+ return EX
+
+ def _dev(self): # Weighted dev
+ "Standard deviation of bandwidth"
+ EX = self.mean
+ EX2 = self._exp2()
+ arg = EX2 - (EX*EX)
+ if arg < -0.05:
+ plog("WARN", "Diff of "+str(EX2)+" and "+str(EX)+"^2 is "+str(arg))
+ return math.sqrt(abs(arg))
+
+ def add_bw(self, bytes, duration):
+ "Add an observed transfer of 'bytes' for 'duration' seconds"
+ if not bytes: plog("WARN", "No bytes for bandwidth")
+ bytes /= 1024.
+ self.byte_list.append(bytes)
+ self.duration_list.append(duration)
+ bw = bytes/duration
+ plog("DEBUG", "Got bandwidth "+str(bw))
+ if self.min_bw > bw: self.min_bw = bw
+ if self.max_bw < bw: self.max_bw = bw
+ self.mean = self._exp()
+ self.dev = self._dev()
+
+
+
+class StatsRouter(TorCtl.Router):
+ "Extended Router to handle statistics markup"
+ def __init__(self, router): # Promotion constructor :)
+ """'Promotion Constructor' that converts a Router directly into a
+ StatsRouter without a copy."""
+ self.__dict__ = router.__dict__
+ self.reset()
+
+ def reset(self):
+ "Reset all stats on this Router"
+ self.circ_uncounted = 0
+ self.circ_failed = 0
+ self.circ_succeeded = 0 # disjoint from failed
+ self.circ_suspected = 0
+ self.circ_chosen = 0 # above 4 should add to this
+ self.strm_failed = 0 # Only exits should have these
+ self.strm_succeeded = 0
+ self.strm_suspected = 0 # disjoint from failed
+ self.strm_uncounted = 0
+ self.strm_chosen = 0 # above 4 should add to this
+ self.reason_suspected = {}
+ self.reason_failed = {}
+ self.first_seen = time.time()
+ if "Running" in self.flags:
+ self.became_active_at = self.first_seen
+ self.hibernated_at = 0
+ else:
+ self.became_active_at = 0
+ self.hibernated_at = self.first_seen
+ self.total_hibernation_time = 0
+ self.total_active_uptime = 0
+ self.total_extend_time = 0
+ self.total_extended = 0
+ self.bwstats = BandwidthStats()
+ self.z_ratio = 0
+ self.prob_zr = 0
+ self.z_bw = 0
+ self.prob_zb = 0
+
+ def avg_extend_time(self):
+ """Return the average amount of time it took for this router
+ to extend a circuit one hop"""
+ if self.total_extended:
+ return self.total_extend_time/self.total_extended
+ else: return 0
+
+ def bw_ratio(self):
+ """Return the ratio of the Router's advertised bandwidth to its
+ observed average stream bandwidth"""
+ bw = self.bwstats.mean
+ if bw == 0.0: return 0
+ else: return self.bw/(1024.*bw)
+
+ def current_uptime(self):
+ if self.became_active_at:
+ ret = (self.total_active_uptime+(time.time()-self.became_active_at))
+ else:
+ ret = self.total_active_uptime
+ if ret == 0: return 0.000005 # eh..
+ else: return ret
+
+ def failed_per_hour(self):
+ """Return the number of circuit extend failures per hour for this
+ Router"""
+ return (3600.*(self.circ_failed+self.strm_failed))/self.current_uptime()
+
+  # XXX: Separate suspected from failed in totals
+ def suspected_per_hour(self):
+ """Return the number of circuits that failed with this router as an
+ earlier hop"""
+ return (3600.*(self.circ_suspected+self.strm_suspected
+ +self.circ_failed+self.strm_failed))/self.current_uptime()
+
+ # These four are for sanity checking
+ def _suspected_per_hour(self):
+ return (3600.*(self.circ_suspected+self.strm_suspected))/self.current_uptime()
+
+ def _uncounted_per_hour(self):
+ return (3600.*(self.circ_uncounted+self.strm_uncounted))/self.current_uptime()
+
+ def _chosen_per_hour(self):
+ return (3600.*(self.circ_chosen+self.strm_chosen))/self.current_uptime()
+
+ def _succeeded_per_hour(self):
+ return (3600.*(self.circ_succeeded+self.strm_succeeded))/self.current_uptime()
+
+ key = """Metatroller Statistics:
+ CC=Circuits Chosen CF=Circuits Failed CS=Circuit Suspected
+ SC=Streams Chosen SF=Streams Failed SS=Streams Suspected
+ FH=Failed per Hour SH=Suspected per Hour ET=avg circuit Extend Time (s)
+ EB=mean BW (K) BD=BW std Dev (K) BR=Ratio of observed to avg BW
+ ZB=BW z-test value PB=Probability(z-bw) ZR=Ratio z-test value
+ PR=Prob(z-ratio) U=Uptime (h)\n"""
+
+ def __str__(self):
+ return (self.idhex+" ("+self.nickname+")\n"
+ +" CC="+str(self.circ_chosen)
+ +" CF="+str(self.circ_failed)
+ +" CS="+str(self.circ_suspected+self.circ_failed)
+ +" SC="+str(self.strm_chosen)
+ +" SF="+str(self.strm_failed)
+ +" SS="+str(self.strm_suspected+self.strm_failed)
+ +" FH="+str(round(self.failed_per_hour(),1))
+ +" SH="+str(round(self.suspected_per_hour(),1))+"\n"
+ +" ET="+str(round(self.avg_extend_time(),1))
+ +" EB="+str(round(self.bwstats.mean,1))
+ +" BD="+str(round(self.bwstats.dev,1))
+ +" ZB="+str(round(self.z_bw,1))
+ +" PB="+(str(round(self.prob_zb,3))[1:])
+ +" BR="+str(round(self.bw_ratio(),1))
+ +" ZR="+str(round(self.z_ratio,1))
+ +" PR="+(str(round(self.prob_zr,3))[1:])
+ +" U="+str(round(self.current_uptime()/3600, 1))+"\n")
+
+ def sanity_check(self):
+ "Makes sure all stats are self-consistent"
+ if (self.circ_failed + self.circ_succeeded + self.circ_suspected
+ + self.circ_uncounted != self.circ_chosen):
+ plog("ERROR", self.nickname+" does not add up for circs")
+ if (self.strm_failed + self.strm_succeeded + self.strm_suspected
+ + self.strm_uncounted != self.strm_chosen):
+ plog("ERROR", self.nickname+" does not add up for streams")
+ def check_reasons(reasons, expected, which, rtype):
+ count = 0
+ for rs in reasons.iterkeys():
+ if re.search(r"^"+which, rs): count += reasons[rs]
+ if count != expected:
+ plog("ERROR", "Mismatch "+which+" "+rtype+" for "+self.nickname)
+ check_reasons(self.reason_suspected,self.strm_suspected,"STREAM","susp")
+ check_reasons(self.reason_suspected,self.circ_suspected,"CIRC","susp")
+ check_reasons(self.reason_failed,self.strm_failed,"STREAM","failed")
+ check_reasons(self.reason_failed,self.circ_failed,"CIRC","failed")
+ now = time.time()
+ tot_hib_time = self.total_hibernation_time
+ tot_uptime = self.total_active_uptime
+ if self.hibernated_at: tot_hib_time += now - self.hibernated_at
+ if self.became_active_at: tot_uptime += now - self.became_active_at
+ if round(tot_hib_time+tot_uptime) != round(now-self.first_seen):
+ plog("ERROR", "Mismatch of uptimes for "+self.nickname)
+
+ per_hour_tot = round(self._uncounted_per_hour()+self.failed_per_hour()+
+ self._suspected_per_hour()+self._succeeded_per_hour(), 2)
+ chosen_tot = round(self._chosen_per_hour(), 2)
+ if per_hour_tot != chosen_tot:
+ plog("ERROR", self.nickname+" has mismatch of per hour counts: "
+ +str(per_hour_tot) +" vs "+str(chosen_tot))
+
+
+
+class StatsHandler(PathSupport.PathBuilder):
+ """An extension of PathSupport.PathBuilder that keeps track of
+ router statistics for every circuit and stream"""
+ def __init__(self, c, slmgr):
+ PathBuilder.__init__(self, c, slmgr, StatsRouter)
+ self.circ_count = 0
+ self.strm_count = 0
+ self.strm_failed = 0
+ self.circ_failed = 0
+ self.failed_reasons = {}
+ self.suspect_reasons = {}
+
+ def run_zbtest(self): # Unweighted z-test
+ """Run unweighted z-test to calculate the probabilities of a node
+ having a given stream bandwidth based on the Normal distribution"""
+ n = reduce(lambda x, y: x+(y.bwstats.mean > 0), self.sorted_r, 0)
+ if n == 0: return (0, 0)
+ avg = reduce(lambda x, y: x+y.bwstats.mean, self.sorted_r, 0)/float(n)
+ def notlambda(x, y):
+ if y.bwstats.mean <= 0: return x+0
+ else: return x+(y.bwstats.mean-avg)*(y.bwstats.mean-avg)
+ stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+ if not stddev: return (avg, stddev)
+ for r in self.sorted_r:
+ if r.bwstats.mean > 0:
+ r.z_bw = abs((r.bwstats.mean-avg)/stddev)
+ r.prob_zb = TorUtil.zprob(-r.z_bw)
+ return (avg, stddev)
+
+ def run_zrtest(self): # Unweighted z-test
+ """Run unweighted z-test to calculate the probabilities of a node
+ having a given ratio of stream bandwidth to advertised bandwidth
+ based on the Normal distribution"""
+ n = reduce(lambda x, y: x+(y.bw_ratio() > 0), self.sorted_r, 0)
+ if n == 0: return (0, 0)
+ avg = reduce(lambda x, y: x+y.bw_ratio(), self.sorted_r, 0)/float(n)
+ def notlambda(x, y):
+ if y.bw_ratio() <= 0: return x+0
+ else: return x+(y.bw_ratio()-avg)*(y.bw_ratio()-avg)
+ stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+ if not stddev: return (avg, stddev)
+ for r in self.sorted_r:
+ if r.bw_ratio() > 0:
+ r.z_ratio = abs((r.bw_ratio()-avg)/stddev)
+ r.prob_zr = TorUtil.zprob(-r.z_ratio)
+ return (avg, stddev)
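
The two z-tests above (run_zbtest and run_zrtest) share one computation: over every router with a positive sample, take the population mean and standard deviation, then turn each router's deviation into an absolute z-score and a one-sided Normal tail probability via TorUtil.zprob. A minimal standalone sketch of that calculation, with invented bandwidth values and math.erfc standing in for TorUtil.zprob (whose definition is not part of this diff):

  import math

  means = [34.0, 51.5, 12.2, 48.9, 0.0, 77.1]  # hypothetical per-router mean stream bandwidths (KB)
  live = [m for m in means if m > 0]           # routers with no samples are skipped
  n = len(live)
  avg = sum(live)/n
  stddev = math.sqrt(sum((m-avg)**2 for m in live)/n)
  for m in live:
    z = abs((m-avg)/stddev)
    p = 0.5*math.erfc(z/math.sqrt(2))          # one-sided Normal tail, stand-in for zprob(-z)
    print "bw=%5.1f z=%4.2f p=%.3f" % (m, z, p)

These p values are what StatsRouter.__str__() prints as the PB and PR columns.
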
+
+ def write_reasons(self, f, reasons, name):
+ "Write out all the failure reasons and statistics for all Routers"
+ f.write("\n\n\t----------------- "+name+" -----------------\n")
+ for rsn in reasons:
+ f.write("\n"+rsn.reason+". Failed: "+str(rsn.total_failed())
+ +", Suspected: "+str(rsn.total_suspected())+"\n")
+ rsn.write_list(f)
+
+ def write_routers(self, f, rlist, name):
+ "Write out all the usage statistics for all Routers"
+ f.write("\n\n\t----------------- "+name+" -----------------\n\n")
+ for r in rlist:
+ # only print it if we've used it.
+ if r.circ_chosen+r.strm_chosen > 0: f.write(str(r))
+
+ def write_stats(self, filename):
+ "Write out all the statistics the StatsHandler has gathered"
+ # TODO: all this shit should be configurable. Some of it only makes
+ # sense when scanning in certain modes.
+ plog("DEBUG", "Writing stats")
+ # Sanity check routers
+ for r in self.sorted_r: r.sanity_check()
+
+ # Sanity check the router reason lists.
+ for r in self.sorted_r:
+ for rsn in r.reason_failed:
+ if r not in self.failed_reasons[rsn].rlist:
+ plog("ERROR", "Router missing from reason table")
+ for rsn in r.reason_suspected:
+ if r not in self.suspect_reasons[rsn].rlist:
+ plog("ERROR", "Router missing from reason table")
+
+ # Sanity check the lists the other way
+ for rsn in self.failed_reasons.itervalues(): rsn._verify_failed()
+ for rsn in self.suspect_reasons.itervalues(): rsn._verify_suspected()
+
+ f = file(filename, "w")
+ f.write(StatsRouter.key)
+ (avg, dev) = self.run_zbtest()
+ f.write("\n\nBW stats: u="+str(round(avg,1))+" s="+str(round(dev,1))+"\n")
+
+ (avg, dev) = self.run_zrtest()
+ f.write("BW ratio stats: u="+str(round(avg,1))+" s="+str(round(dev,1))+"\n")
+
+ # Circ, strm infoz
+ f.write("Circ failure ratio: "+str(self.circ_failed)
+ +"/"+str(self.circ_count)+"\n")
+
+ f.write("Stream failure ratio: "+str(self.strm_failed)
+ +"/"+str(self.strm_count)+"\n")
+
+ # Extend times
+ n = 0.01+reduce(lambda x, y: x+(y.avg_extend_time() > 0), self.sorted_r, 0)
+ avg_extend = reduce(lambda x, y: x+y.avg_extend_time(), self.sorted_r, 0)/n
+ def notlambda(x, y):
+ return x+(y.avg_extend_time()-avg_extend)*(y.avg_extend_time()-avg_extend)
+ dev_extend = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+
+ f.write("Extend time: u="+str(round(avg_extend,1))
+ +" s="+str(round(dev_extend,1)))
+
+ # sort+print by bandwidth
+ bw_rate = copy.copy(self.sorted_r)
+ bw_rate.sort(lambda x, y: cmp(y.bw_ratio(), x.bw_ratio()))
+ self.write_routers(f, bw_rate, "Bandwidth Ratios")
+
+ failed = copy.copy(self.sorted_r)
+ failed.sort(lambda x, y:
+ cmp(y.circ_failed+y.strm_failed,
+ x.circ_failed+x.strm_failed))
+ self.write_routers(f, failed, "Failed Counts")
+
+ suspected = copy.copy(self.sorted_r)
+ suspected.sort(lambda x, y: # Suspected includes failed
+ cmp(y.circ_failed+y.strm_failed+y.circ_suspected+y.strm_suspected,
+ x.circ_failed+x.strm_failed+x.circ_suspected+x.strm_suspected))
+ self.write_routers(f, suspected, "Suspected Counts")
+
+ fail_rate = copy.copy(failed)
+ fail_rate.sort(lambda x, y: cmp(y.failed_per_hour(), x.failed_per_hour()))
+ self.write_routers(f, fail_rate, "Fail Rates")
+
+ suspect_rate = copy.copy(suspected)
+ suspect_rate.sort(lambda x, y:
+ cmp(y.suspected_per_hour(), x.suspected_per_hour()))
+ self.write_routers(f, suspect_rate, "Suspect Rates")
+
+ # TODO: Sort by failed/selected and suspect/selected ratios
+ # if we ever want to do non-uniform scanning..
+
+ # FIXME: Add failed in here somehow..
+ susp_reasons = self.suspect_reasons.values()
+ susp_reasons.sort(lambda x, y:
+ cmp(y.total_suspected(), x.total_suspected()))
+ self.write_reasons(f, susp_reasons, "Suspect Reasons")
+
+ fail_reasons = self.failed_reasons.values()
+ fail_reasons.sort(lambda x, y:
+ cmp(y.total_failed(), x.total_failed()))
+ self.write_reasons(f, fail_reasons, "Failed Reasons")
+ f.close()
+
+ # FIXME: sort+print by circ extend time
+
+ def reset_stats(self):
+ plog("DEBUG", "Resetting stats")
+ self.circ_count = 0
+ self.strm_count = 0
+ self.strm_failed = 0
+ self.circ_failed = 0
+ self.suspect_reasons.clear()
+ self.failed_reasons.clear()
+ for r in self.sorted_r: r.reset()
+
+ def circ_status_event(self, c):
+ if c.circ_id in self.circuits:
+ # TODO: Hrmm, consider making this sane in TorCtl.
+ if c.reason: lreason = c.reason
+ else: lreason = "NONE"
+ if c.remote_reason: rreason = c.remote_reason
+ else: rreason = "NONE"
+ reason = c.event_name+":"+c.status+":"+lreason+":"+rreason
+ if c.status == "LAUNCHED":
+ # Update circ_chosen count
+ self.circ_count += 1
+ elif c.status == "EXTENDED":
+ delta = c.arrived_at - self.circuits[c.circ_id].last_extended_at
+ r_ext = c.path[-1]
+ if r_ext[0] != '$': r_ext = self.name_to_key[r_ext]
+ self.routers[r_ext[1:]].total_extend_time += delta
+ self.routers[r_ext[1:]].total_extended += 1
+ elif c.status == "FAILED":
+ for r in self.circuits[c.circ_id].path: r.circ_chosen += 1
+
+ if len(c.path)-1 < 0: start_f = 0
+ else: start_f = len(c.path)-1
+
+ # Count failed
+ self.circ_failed += 1
+ # XXX: Differentiate between extender and extendee
+ for r in self.circuits[c.circ_id].path[start_f:len(c.path)+1]:
+ r.circ_failed += 1
+ if not reason in r.reason_failed:
+ r.reason_failed[reason] = 1
+ else: r.reason_failed[reason]+=1
+ if reason not in self.failed_reasons:
+ self.failed_reasons[reason] = FailedRouterList(reason)
+ self.failed_reasons[reason].add_r(r)
+
+ for r in self.circuits[c.circ_id].path[len(c.path)+1:]:
+ r.circ_uncounted += 1
+
+ # Don't count if failed was set this round, don't set
+ # suspected..
+ for r in self.circuits[c.circ_id].path[:start_f]:
+ r.circ_suspected += 1
+ if not reason in r.reason_suspected:
+ r.reason_suspected[reason] = 1
+ else: r.reason_suspected[reason]+=1
+ if reason not in self.suspect_reasons:
+ self.suspect_reasons[reason] = SuspectRouterList(reason)
+ self.suspect_reasons[reason].add_r(r)
+ elif c.status == "CLOSED":
+ # Since PathBuilder deletes the circuit on a failed,
+ # we only get this for a clean close
+ for r in self.circuits[c.circ_id].path:
+ r.circ_chosen += 1
+ if lreason in ("REQUESTED", "FINISHED", "ORIGIN"):
+ r.circ_succeeded += 1
+ else:
+ if not reason in r.reason_suspected:
+ r.reason_suspected[reason] = 1
+ else: r.reason_suspected[reason] += 1
+ r.circ_suspected+= 1
+ if reason not in self.suspect_reasons:
+ self.suspect_reasons[reason] = SuspectRouterList(reason)
+ self.suspect_reasons[reason].add_r(r)
+ PathBuilder.circ_status_event(self, c)
+
+ def count_stream_reason_failed(self, s, reason):
+ "Count the routers involved in a failure"
+ # Update failed count,reason_failed for exit
+ r = self.circuits[s.circ_id].exit
+ if not reason in r.reason_failed: r.reason_failed[reason] = 1
+ else: r.reason_failed[reason]+=1
+ r.strm_failed += 1
+ if reason not in self.failed_reasons:
+ self.failed_reasons[reason] = FailedRouterList(reason)
+ self.failed_reasons[reason].add_r(r)
+
+ def count_stream_suspects(self, s, lreason, reason):
+ "Count the routers 'suspected' of being involved in a failure"
+ if lreason in ("TIMEOUT", "INTERNAL", "TORPROTOCOL" "DESTROY"):
+ for r in self.circuits[s.circ_id].path[:-1]:
+ r.strm_suspected += 1
+ if not reason in r.reason_suspected:
+ r.reason_suspected[reason] = 1
+ else: r.reason_suspected[reason]+=1
+ if reason not in self.suspect_reasons:
+ self.suspect_reasons[reason] = SuspectRouterList(reason)
+ self.suspect_reasons[reason].add_r(r)
+ else:
+ for r in self.circuits[s.circ_id].path[:-1]:
+ r.strm_uncounted += 1
+
+ def stream_status_event(self, s):
+ if s.strm_id in self.streams and not self.streams[s.strm_id].ignored:
+ # TODO: Hrmm, consider making this sane in TorCtl.
+ if s.reason: lreason = s.reason
+ else: lreason = "NONE"
+ if s.remote_reason: rreason = s.remote_reason
+ else: rreason = "NONE"
+ reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
+ if (s.status in ("DETACHED", "FAILED", "CLOSED", "SUCCEEDED")
+ and not s.circ_id):
+ # XXX: REMAPs can do this (normal). Also REASON=DESTROY (bug?)
+ # Also timeouts.. Those should use the pending circ instead
+ # of returning..
+ plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
+ PathBuilder.stream_status_event(self, s)
+ return
+
+ # Verify circ id matches stream.circ
+ if s.status not in ("NEW", "NEWRESOLVE", "REMAP"):
+ circ = self.streams[s.strm_id].circ
+ if not circ: circ = self.streams[s.strm_id].pending_circ
+ if circ and circ.circ_id != s.circ_id:
+ plog("WARN", str(s.strm_id) + " has mismatch of "
+ +str(s.circ_id)+" v "+str(circ.circ_id))
+ if s.circ_id and s.circ_id not in self.circuits:
+ plog("NOTICE", "Unknown circuit "+str(s.circ_id)
+ +" for stream "+str(s.strm_id))
+ return
+
+ if s.status == "DETACHED":
+ if self.streams[s.strm_id].attached_at:
+ plog("WARN", str(s.strm_id)+" detached after succeeded")
+ # Update strm_chosen count
+ self.strm_count += 1
+ for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
+ self.strm_failed += 1
+ self.count_stream_suspects(s, lreason, reason)
+ self.count_stream_reason_failed(s, reason)
+ elif s.status == "FAILED":
+ # HACK. We get both failed and closed for the same stream,
+ # with different reasons. Might as well record both, since they
+ # often differ.
+ self.streams[s.strm_id].failed_reason = reason
+ elif s.status == "CLOSED":
+ # Always get both a closed and a failed..
+ # - Check if the circuit exists still
+ # Update strm_chosen count
+ self.strm_count += 1
+ for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
+
+ # Update bw stats. XXX: Don't do this for resolve streams
+ if self.streams[s.strm_id].attached_at:
+ lifespan = self.streams[s.strm_id].lifespan(s.arrived_at)
+ for r in self.streams[s.strm_id].circ.path:
+ r.bwstats.add_bw(self.streams[s.strm_id].bytes_written+
+ self.streams[s.strm_id].bytes_read, lifespan)
+
+ if self.streams[s.strm_id].failed:
+ reason = self.streams[s.strm_id].failed_reason+":"+lreason+":"+rreason
+
+ self.count_stream_suspects(s, lreason, reason)
+
+ r = self.circuits[s.circ_id].exit
+ if (not self.streams[s.strm_id].failed
+ and (lreason == "DONE" or (lreason == "END" and rreason == "DONE"))):
+ r.strm_succeeded += 1
+ else:
+ self.strm_failed += 1
+ self.count_stream_reason_failed(s, reason)
+ PathBuilder.stream_status_event(self, s)
+
+ def ns_event(self, n):
+ PathBuilder.ns_event(self, n)
+ now = n.arrived_at
+ for ns in n.nslist:
+ if not ns.idhex in self.routers:
+ continue
+ r = self.routers[ns.idhex]
+ if "Running" in ns.flags:
+ if not r.became_active_at:
+ r.became_active_at = now
+ r.total_hibernation_time += now - r.hibernated_at
+ r.hibernated_at = 0
+ else:
+ if not r.hibernated_at:
+ r.hibernated_at = now
+ r.total_active_uptime += now - r.became_active_at
+ r.became_active_at = 0
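
The Running-flag bookkeeping in ns_event() above is what makes the uptime check in StatsRouter.sanity_check() hold: at any moment, total_active_uptime plus total_hibernation_time, plus whichever interval is currently open, should add up to now - first_seen. A small sketch of that accounting, with invented timestamps:

  first_seen = 1000.0
  became_active_at = first_seen       # assume the router is Running when first seen
  hibernated_at = 0
  total_active = total_hibernation = 0.0

  def running_flag_change(now, running):
    global became_active_at, hibernated_at, total_active, total_hibernation
    if running and hibernated_at:
      total_hibernation += now - hibernated_at
      hibernated_at = 0
      became_active_at = now
    elif not running and became_active_at:
      total_active += now - became_active_at
      became_active_at = 0
      hibernated_at = now

  running_flag_change(1600.0, False)  # dropped Running after 600s of uptime
  running_flag_change(1900.0, True)   # regained it 300s later
  now = 2500.0
  still_open = now - became_active_at
  assert round(total_active + total_hibernation + still_open) == round(now - first_seen)
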
Deleted: torflow/branches/gsoc2008/TorCtl.local/TorCtl.py
===================================================================
--- torflow/branches/gsoc2008/TorCtl/TorCtl.py 2008-07-07 16:40:57 UTC (rev 15734)
+++ torflow/branches/gsoc2008/TorCtl.local/TorCtl.py 2008-07-07 19:37:47 UTC (rev 15742)
@@ -1,1091 +0,0 @@
-#!/usr/bin/python
-# TorCtl.py -- Python module to interface with Tor Control interface.
-# Copyright 2005 Nick Mathewson
-# Copyright 2007 Mike Perry. See LICENSE file.
-
-"""
-Library to control Tor processes.
-
-This library handles sending commands, parsing responses, and delivering
-events to and from the control port. The basic usage is to create a
-socket, wrap that in a TorCtl.Connection, and then add an EventHandler
-to that connection. A simple example with a DebugEventHandler (that just
-echoes the events back to stdout) is present in run_example().
-
-Note that the TorCtl.Connection is fully compatible with the more
-advanced EventHandlers in TorCtl.PathSupport (and of course any other
-custom event handlers that you may extend off of those).
-
-This package also contains a helper class for representing Routers, and
-classes and constants for each event.
-
-"""
-
-__all__ = ["EVENT_TYPE", "TorCtlError", "TorCtlClosed", "ProtocolError",
- "ErrorReply", "NetworkStatus", "ExitPolicyLine", "Router",
- "RouterVersion", "Connection", "parse_ns_body",
- "EventHandler", "DebugEventHandler", "NetworkStatusEvent",
- "NewDescEvent", "CircuitEvent", "StreamEvent", "ORConnEvent",
- "StreamBwEvent", "LogEvent", "AddrMapEvent", "BWEvent",
- "UnknownEvent" ]
-
-import os
-import re
-import struct
-import sys
-import threading
-import Queue
-import datetime
-import traceback
-import socket
-import binascii
-import types
-import time
-from TorUtil import *
-
-# Types of "EVENT" message.
-EVENT_TYPE = Enum2(
- CIRC="CIRC",
- STREAM="STREAM",
- ORCONN="ORCONN",
- STREAM_BW="STREAM_BW",
- BW="BW",
- NS="NS",
- NEWDESC="NEWDESC",
- ADDRMAP="ADDRMAP",
- DEBUG="DEBUG",
- INFO="INFO",
- NOTICE="NOTICE",
- WARN="WARN",
- ERR="ERR")
-
-class TorCtlError(Exception):
- "Generic error raised by TorControl code."
- pass
-
-class TorCtlClosed(TorCtlError):
- "Raised when the controller connection is closed by Tor (not by us.)"
- pass
-
-class ProtocolError(TorCtlError):
- "Raised on violations in Tor controller protocol"
- pass
-
-class ErrorReply(TorCtlError):
- "Raised when Tor controller returns an error"
- pass
-
-class NetworkStatus:
- "Filled in during NS events"
- def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags):
- self.nickname = nickname
- self.idhash = idhash
- self.orhash = orhash
- self.ip = ip
- self.orport = int(orport)
- self.dirport = int(dirport)
- self.flags = flags
- self.idhex = (self.idhash + "=").decode("base64").encode("hex").upper()
- m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated)
- self.updated = datetime.datetime(*map(int, m.groups()))
-
-class NetworkStatusEvent:
- def __init__(self, event_name, nslist):
- self.event_name = event_name
- self.arrived_at = 0
- self.nslist = nslist # List of NetworkStatus objects
-
-class NewDescEvent:
- def __init__(self, event_name, idlist):
- self.event_name = event_name
- self.arrived_at = 0
- self.idlist = idlist
-
-class CircuitEvent:
- def __init__(self, event_name, circ_id, status, path, reason,
- remote_reason):
- self.event_name = event_name
- self.arrived_at = 0
- self.circ_id = circ_id
- self.status = status
- self.path = path
- self.reason = reason
- self.remote_reason = remote_reason
-
-class StreamEvent:
- def __init__(self, event_name, strm_id, status, circ_id, target_host,
- target_port, reason, remote_reason, source, source_addr, purpose):
- self.event_name = event_name
- self.arrived_at = 0
- self.strm_id = strm_id
- self.status = status
- self.circ_id = circ_id
- self.target_host = target_host
- self.target_port = int(target_port)
- self.reason = reason
- self.remote_reason = remote_reason
- self.source = source
- self.source_addr = source_addr
- self.purpose = purpose
-
-class ORConnEvent:
- def __init__(self, event_name, status, endpoint, age, read_bytes,
- wrote_bytes, reason, ncircs):
- self.event_name = event_name
- self.arrived_at = 0
- self.status = status
- self.endpoint = endpoint
- self.age = age
- self.read_bytes = read_bytes
- self.wrote_bytes = wrote_bytes
- self.reason = reason
- self.ncircs = ncircs
-
-class StreamBwEvent:
- def __init__(self, event_name, strm_id, read, written):
- self.event_name = event_name
- self.strm_id = int(strm_id)
- self.bytes_read = int(read)
- self.bytes_written = int(written)
-
-class LogEvent:
- def __init__(self, level, msg):
- self.event_name = self.level = level
- self.msg = msg
-
-class AddrMapEvent:
- def __init__(self, event_name, from_addr, to_addr, when):
- self.event_name = event_name
- self.from_addr = from_addr
- self.to_addr = to_addr
- self.when = when
-
-class BWEvent:
- def __init__(self, event_name, read, written):
- self.event_name = event_name
- self.read = read
- self.written = written
-
-class UnknownEvent:
- def __init__(self, event_name, event_string):
- self.event_name = event_name
- self.event_string = event_string
-
-class ExitPolicyLine:
- """ Class to represent a line in a Router's exit policy in a way
- that can be easily checked. """
- def __init__(self, match, ip_mask, port_low, port_high):
- self.match = match
- if ip_mask == "*":
- self.ip = 0
- self.netmask = 0
- else:
- if not "/" in ip_mask:
- self.netmask = 0xFFFFFFFF
- ip = ip_mask
- else:
- ip, mask = ip_mask.split("/")
- if re.match(r"\d+.\d+.\d+.\d+", mask):
- self.netmask=struct.unpack(">I", socket.inet_aton(mask))[0]
- else:
- self.netmask = ~(2**(32 - int(mask)) - 1)
- self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
- self.ip &= self.netmask
- if port_low == "*":
- self.port_low,self.port_high = (0,65535)
- else:
- if not port_high:
- port_high = port_low
- self.port_low = int(port_low)
- self.port_high = int(port_high)
-
- def check(self, ip, port):
- """Check to see if an ip and port is matched by this line.
- Returns true if the line is an Accept, and False if it is a Reject. """
- ip = struct.unpack(">I", socket.inet_aton(ip))[0]
- if (ip & self.netmask) == self.ip:
- if self.port_low <= port and port <= self.port_high:
- return self.match
- return -1
-
-class RouterVersion:
- """ Represents a Router's version. Overloads all comparison operators
- to check for newer, older, or equivalent versions. """
- def __init__(self, version):
- if version:
- v = re.search("^(\d+).(\d+).(\d+).(\d+)", version).groups()
- self.version = int(v[0])*0x1000000 + int(v[1])*0x10000 + int(v[2])*0x100 + int(v[3])
- self.ver_string = version
- else:
- self.version = version
- self.ver_string = "unknown"
-
- def __lt__(self, other): return self.version < other.version
- def __gt__(self, other): return self.version > other.version
- def __ge__(self, other): return self.version >= other.version
- def __le__(self, other): return self.version <= other.version
- def __eq__(self, other): return self.version == other.version
- def __ne__(self, other): return self.version != other.version
- def __str__(self): return self.ver_string
-
-class Router:
- """
- Class to represent a router from a descriptor. Can either be
- created from the parsed fields, or can be built from a
- descriptor+NetworkStatus
- """
- def __init__(self, idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime):
- self.idhex = idhex
- self.nickname = name
- self.bw = bw
- self.exitpolicy = exitpolicy
- self.flags = flags
- self.down = down
- self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
- self.version = RouterVersion(version)
- self.os = os
- self.list_rank = 0 # position in a sorted list of routers.
- self.uptime = uptime
-
- def __str__(self):
- s = self.idhex, self.nickname
- return s.__str__()
-
- def build_from_desc(desc, ns):
- """
- Static method of Router that parses a descriptor string into this class.
- 'desc' is a full descriptor as a string.
- 'ns' is a TorCtl.NetworkStatus instance for this router (needed for
- the flags, the nickname, and the idhex string).
- Returns a Router instance.
- """
- # XXX: Compile these regular expressions? This is an expensive process
- # Use http://docs.python.org/lib/profile.html to verify this is
- # the part of startup that is slow
- exitpolicy = []
- dead = not ("Running" in ns.flags)
- bw_observed = 0
- version = None
- os = None
- uptime = 0
- ip = 0
- router = "[none]"
-
- for line in desc:
- rt = re.search(r"^router (\S+) (\S+)", line)
- fp = re.search(r"^opt fingerprint (.+).*on (\S+)", line)
- pl = re.search(r"^platform Tor (\S+).*on (\S+)", line)
- ac = re.search(r"^accept (\S+):([^-]+)(?:-(\d+))?", line)
- rj = re.search(r"^reject (\S+):([^-]+)(?:-(\d+))?", line)
- bw = re.search(r"^bandwidth \d+ \d+ (\d+)", line)
- up = re.search(r"^uptime (\d+)", line)
- if re.search(r"^opt hibernating 1", line):
- #dead = 1 # XXX: Technically this may be stale..
- if ("Running" in ns.flags):
- plog("INFO", "Hibernating router "+ns.nickname+" is running..")
- if ac:
- exitpolicy.append(ExitPolicyLine(True, *ac.groups()))
- elif rj:
- exitpolicy.append(ExitPolicyLine(False, *rj.groups()))
- elif bw:
- bw_observed = int(bw.group(1))
- elif pl:
- version, os = pl.groups()
- elif up:
- uptime = int(up.group(1))
- elif rt:
- router,ip = rt.groups()
- if router != ns.nickname:
- plog("NOTICE", "Got different names " + ns.nickname + " vs " +
- router + " for " + ns.idhex)
- if not bw_observed and not dead and ("Valid" in ns.flags):
- plog("INFO", "No bandwidth for live router " + ns.nickname)
- if not version or not os:
- plog("INFO", "No version and/or OS for router " + ns.nickname)
- return Router(ns.idhex, ns.nickname, bw_observed, dead, exitpolicy,
- ns.flags, ip, version, os, uptime)
- build_from_desc = Callable(build_from_desc)
-
- def update_to(self, new):
- """ Somewhat hackish method to update this router to be a copy of
- 'new' """
- if self.idhex != new.idhex:
- plog("ERROR", "Update of router "+self.nickname+"changes idhex!")
- self.idhex = new.idhex
- self.nickname = new.nickname
- self.bw = new.bw
- self.exitpolicy = new.exitpolicy
- self.flags = new.flags
- self.ip = new.ip
- self.version = new.version
- self.os = new.os
- self.uptime = new.uptime
-
- def will_exit_to(self, ip, port):
- """ Check the entire exitpolicy to see if the router will allow
- connections to 'ip':'port' """
- for line in self.exitpolicy:
- ret = line.check(ip, port)
- if ret != -1:
- return ret
- plog("WARN", "No matching exit line for "+self.nickname)
- return False
-
-class Connection:
- """A Connection represents a connection to the Tor process via the
- control port."""
- def __init__(self, sock):
- """Create a Connection to communicate with the Tor process over the
- socket 'sock'.
- """
- self._handler = None
- self._handleFn = None
- self._sendLock = threading.RLock()
- self._queue = Queue.Queue()
- self._thread = None
- self._closedEx = None
- self._closed = 0
- self._closeHandler = None
- self._eventThread = None
- self._eventQueue = Queue.Queue()
- self._s = BufSock(sock)
- self._debugFile = None
-
- def set_close_handler(self, handler):
- """Call 'handler' when the Tor process has closed its connection or
- given us an exception. If we close normally, no arguments are
- provided; otherwise, it will be called with an exception as its
- argument.
- """
- self._closeHandler = handler
-
- def close(self):
- """Shut down this controller connection"""
- self._sendLock.acquire()
- try:
- self._queue.put("CLOSE")
- self._eventQueue.put((time.time(), "CLOSE"))
- finally:
- self._sendLock.release()
-
- def launch_thread(self, daemon=1):
- """Launch a background thread to handle messages from the Tor process."""
- assert self._thread is None
- t = threading.Thread(target=self._loop)
- if daemon:
- t.setDaemon(daemon)
- t.start()
- self._thread = t
- t = threading.Thread(target=self._eventLoop)
- if daemon:
- t.setDaemon(daemon)
- t.start()
- self._eventThread = t
- return self._thread
-
- def _loop(self):
- """Main subthread loop: Read commands from Tor, and handle them either
- as events or as responses to other commands.
- """
- while 1:
- try:
- isEvent, reply = self._read_reply()
- except:
- self._err(sys.exc_info())
- return
-
- if isEvent:
- if self._handler is not None:
- self._eventQueue.put((time.time(), reply))
- else:
- cb = self._queue.get() # atomic..
- if cb == "CLOSE":
- self._s.close()
- self._s = None
- self._closed = 1
- return
- else:
- cb(reply)
-
- def _err(self, (tp, ex, tb), fromEventLoop=0):
- """DOCDOC"""
- # silent death is bad :(
- traceback.print_exception(tp, ex, tb)
- if self._s:
- try:
- self.close()
- except:
- pass
- self._sendLock.acquire()
- try:
- self._closedEx = ex
- self._closed = 1
- finally:
- self._sendLock.release()
- while 1:
- try:
- cb = self._queue.get(timeout=0)
- if cb != "CLOSE":
- cb("EXCEPTION")
- except Queue.Empty:
- break
- if self._closeHandler is not None:
- self._closeHandler(ex)
- return
-
- def _eventLoop(self):
- """DOCDOC"""
- while 1:
- (timestamp, reply) = self._eventQueue.get()
- if reply[0][0] == "650" and reply[0][1] == "OK":
- plog("DEBUG", "Ignoring incompatible syntactic sugar: 650 OK")
- continue
- if reply == "CLOSE":
- return
- try:
- self._handleFn(timestamp, reply)
- except:
- for code, msg, data in reply:
- plog("WARN", "No event for: "+str(code)+" "+str(msg))
- self._err(sys.exc_info(), 1)
- return
-
- def _sendImpl(self, sendFn, msg):
- """DOCDOC"""
- if self._thread is None:
- self.launch_thread(1)
- # This condition will get notified when we've got a result...
- condition = threading.Condition()
- # Here's where the result goes...
- result = []
-
- if self._closedEx is not None:
- raise self._closedEx
- elif self._closed:
- raise TorCtlClosed()
-
- def cb(reply,condition=condition,result=result):
- condition.acquire()
- try:
- result.append(reply)
- condition.notify()
- finally:
- condition.release()
-
- # Sends a message to Tor...
- self._sendLock.acquire() # ensure queue+sendmsg is atomic
- try:
- self._queue.put(cb)
- sendFn(msg) # _doSend(msg)
- finally:
- self._sendLock.release()
-
- # Now wait till the answer is in...
- condition.acquire()
- try:
- while not result:
- condition.wait()
- finally:
- condition.release()
-
- # ...And handle the answer appropriately.
- assert len(result) == 1
- reply = result[0]
- if reply == "EXCEPTION":
- raise self._closedEx
-
- return reply
-
-
- def debug(self, f):
- """DOCDOC"""
- self._debugFile = f
-
- def set_event_handler(self, handler):
- """Cause future events from the Tor process to be sent to 'handler'.
- """
- self._handler = handler
- self._handleFn = handler._handle1
-
- def _read_reply(self):
- lines = []
- while 1:
- line = self._s.readline().strip()
- if self._debugFile:
- self._debugFile.write(" %s\n" % line)
- if len(line)<4:
- raise ProtocolError("Badly formatted reply line: Too short")
- code = line[:3]
- tp = line[3]
- s = line[4:]
- if tp == "-":
- lines.append((code, s, None))
- elif tp == " ":
- lines.append((code, s, None))
- isEvent = (lines and lines[0][0][0] == '6')
- return isEvent, lines
- elif tp != "+":
- raise ProtocolError("Badly formatted reply line: unknown type %r"%tp)
- else:
- more = []
- while 1:
- line = self._s.readline()
- if self._debugFile:
- self._debugFile.write("+++ %s" % line)
- if line in (".\r\n", ".\n", "650 OK\n", "650 OK\r\n"):
- break
- more.append(line)
- lines.append((code, s, unescape_dots("".join(more))))
- isEvent = (lines and lines[0][0][0] == '6')
- if isEvent: # Need "250 OK" if it's not an event. Otherwise, end
- return (isEvent, lines)
-
- # Notreached
- raise TorCtlError()
-
- def _doSend(self, msg):
- if self._debugFile:
- amsg = msg
- lines = amsg.split("\n")
- if len(lines) > 2:
- amsg = "\n".join(lines[:2]) + "\n"
- self._debugFile.write(">>> %s" % amsg)
- self._s.write(msg)
-
- def sendAndRecv(self, msg="", expectedTypes=("250", "251")):
- """Helper: Send a command 'msg' to Tor, and wait for a command
- in response. If the response type is in expectedTypes,
- return a list of (tp,body,extra) tuples. If it is an
- error, raise ErrorReply. Otherwise, raise ProtocolError.
- """
- if type(msg) == types.ListType:
- msg = "".join(msg)
- assert msg.endswith("\r\n")
-
- lines = self._sendImpl(self._doSend, msg)
- # print lines
- for tp, msg, _ in lines:
- if tp[0] in '45':
- raise ErrorReply("%s %s"%(tp, msg))
- if tp not in expectedTypes:
- raise ProtocolError("Unexpectd message type %r"%tp)
-
- return lines
-
- def authenticate(self, secret=""):
- """Send an authenticating secret to Tor. You'll need to call this
- method before Tor can start.
- """
- #hexstr = binascii.b2a_hex(secret)
- self.sendAndRecv("AUTHENTICATE \"%s\"\r\n"%secret)
-
- def get_option(self, name):
- """Get the value of the configuration option named 'name'. To
- retrieve multiple values, pass a list for 'name' instead of
- a string. Returns a list of (key,value) pairs.
- Refer to section 3.3 of control-spec.txt for a list of valid names.
- """
- if not isinstance(name, str):
- name = " ".join(name)
- lines = self.sendAndRecv("GETCONF %s\r\n" % name)
-
- r = []
- for _,line,_ in lines:
- try:
- key, val = line.split("=", 1)
- r.append((key,val))
- except ValueError:
- r.append((line, None))
-
- return r
-
- def set_option(self, key, value):
- """Set the value of the configuration option 'key' to the value 'value'.
- """
- self.set_options([(key, value)])
-
- def set_options(self, kvlist):
- """Given a list of (key,value) pairs, set them as configuration
- options.
- """
- if not kvlist:
- return
- msg = " ".join(["%s=%s"%(k,quote(v)) for k,v in kvlist])
- self.sendAndRecv("SETCONF %s\r\n"%msg)
-
- def reset_options(self, keylist):
- """Reset the options listed in 'keylist' to their default values.
-
- Tor started implementing this command in version 0.1.1.7-alpha;
- previous versions wanted you to set configuration keys to "".
- That no longer works.
- """
- self.sendAndRecv("RESETCONF %s\r\n"%(" ".join(keylist)))
-
- def get_network_status(self, who="all"):
- """Get the entire network status list. Returns a list of
- TorCtl.NetworkStatus instances."""
- return parse_ns_body(self.sendAndRecv("GETINFO ns/"+who+"\r\n")[0][2])
-
- def get_router(self, ns):
- """Fill in a Router class corresponding to a given NS class"""
- desc = self.sendAndRecv("GETINFO desc/id/" + ns.idhex + "\r\n")[0][2].split("\n")
- return Router.build_from_desc(desc, ns)
-
-
- def read_routers(self, nslist):
- """ Given a list a NetworkStatuses in 'nslist', this function will
- return a list of new Router instances.
- """
- bad_key = 0
- new = []
- for ns in nslist:
- try:
- r = self.get_router(ns)
- new.append(r)
- except ErrorReply:
- bad_key += 1
- if "Running" in ns.flags:
- plog("NOTICE", "Running router "+ns.nickname+"="
- +ns.idhex+" has no descriptor")
- except:
- traceback.print_exception(*sys.exc_info())
- continue
-
- return new
-
- def get_info(self, name):
- """Return the value of the internal information field named 'name'.
- Refer to section 3.9 of control-spec.txt for a list of valid names.
- DOCDOC
- """
- if not isinstance(name, str):
- name = " ".join(name)
- lines = self.sendAndRecv("GETINFO %s\r\n"%name)
- d = {}
- for _,msg,more in lines:
- if msg == "OK":
- break
- try:
- k,rest = msg.split("=",1)
- except ValueError:
- raise ProtocolError("Bad info line %r",msg)
- if more:
- d[k] = more
- else:
- d[k] = rest
- return d
-
- def set_events(self, events, extended=False):
- """Change the list of events that the event handler is interested
- in to those in 'events', which is a list of event names.
- Recognized event names are listed in section 3.3 of the control-spec
- """
- if extended:
- plog ("DEBUG", "SETEVENTS EXTENDED %s\r\n" % " ".join(events))
- self.sendAndRecv("SETEVENTS EXTENDED %s\r\n" % " ".join(events))
- else:
- self.sendAndRecv("SETEVENTS %s\r\n" % " ".join(events))
-
- def save_conf(self):
- """Flush all configuration changes to disk.
- """
- self.sendAndRecv("SAVECONF\r\n")
-
- def send_signal(self, sig):
- """Send the signal 'sig' to the Tor process; The allowed values for
- 'sig' are listed in section 3.6 of control-spec.
- """
- sig = { 0x01 : "HUP",
- 0x02 : "INT",
- 0x03 : "NEWNYM",
- 0x0A : "USR1",
- 0x0C : "USR2",
- 0x0F : "TERM" }.get(sig,sig)
- self.sendAndRecv("SIGNAL %s\r\n"%sig)
-
- def resolve(self, host):
- """ Launch a remote hostname lookup request:
- 'host' may be a hostname or IPv4 address
- """
- # TODO: handle "mode=reverse"
- self.sendAndRecv("RESOLVE %s\r\n"%host)
-
- def map_address(self, kvList):
- """ Sends the MAPADDRESS command for each of the tuples in kvList """
- if not kvList:
- return
- m = " ".join([ "%s=%s" for k,v in kvList])
- lines = self.sendAndRecv("MAPADDRESS %s\r\n"%m)
- r = []
- for _,line,_ in lines:
- try:
- key, val = line.split("=", 1)
- except ValueError:
- raise ProtocolError("Bad address line %r",v)
- r.append((key,val))
- return r
-
- def extend_circuit(self, circid, hops):
- """Tell Tor to extend the circuit identified by 'circid' through the
- servers named in the list 'hops'.
- """
- if circid is None:
- circid = "0"
- plog("DEBUG", "Extending circuit")
- lines = self.sendAndRecv("EXTENDCIRCUIT %d %s\r\n"
- %(circid, ",".join(hops)))
- tp,msg,_ = lines[0]
- m = re.match(r'EXTENDED (\S*)', msg)
- if not m:
- raise ProtocolError("Bad extended line %r",msg)
- plog("DEBUG", "Circuit extended")
- return int(m.group(1))
-
- def redirect_stream(self, streamid, newaddr, newport=""):
- """DOCDOC"""
- if newport:
- self.sendAndRecv("REDIRECTSTREAM %d %s %s\r\n"%(streamid, newaddr, newport))
- else:
- self.sendAndRecv("REDIRECTSTREAM %d %s\r\n"%(streamid, newaddr))
-
- def attach_stream(self, streamid, circid, hop=None):
- """Attach a stream to a circuit, specify both by IDs. If hop is given,
- try to use the specified hop in the circuit as the exit node for
- this stream.
- """
- if hop:
- self.sendAndRecv("ATTACHSTREAM %d %d HOP=%d\r\n"%(streamid, circid, hop))
- plog("DEBUG", "Attaching stream: "+str(streamid)+" to hop "+str(hop)+" of circuit "+str(circid))
- else:
- self.sendAndRecv("ATTACHSTREAM %d %d\r\n"%(streamid, circid))
- plog("DEBUG", "Attaching stream: "+str(streamid)+" to circuit "+str(circid))
-
- def close_stream(self, streamid, reason=0, flags=()):
- """DOCDOC"""
- self.sendAndRecv("CLOSESTREAM %d %s %s\r\n"
- %(streamid, reason, "".join(flags)))
-
- def close_circuit(self, circid, reason=0, flags=()):
- """DOCDOC"""
- self.sendAndRecv("CLOSECIRCUIT %d %s %s\r\n"
- %(circid, reason, "".join(flags)))
-
- def post_descriptor(self, desc):
- self.sendAndRecv("+POSTDESCRIPTOR\r\n%s"%escape_dots(desc))
-
-def parse_ns_body(data):
- """Parse the body of an NS event or command into a list of
- NetworkStatus instances"""
- nsgroups = re.compile(r"^r ", re.M).split(data)
- nsgroups.pop(0)
- nslist = []
- for nsline in nsgroups:
- m = re.search(r"^s((?:\s\S*)+)", nsline, re.M)
- flags = m.groups()
- flags = flags[0].strip().split(" ")
- m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline)
- nslist.append(NetworkStatus(*(m.groups() + (flags,))))
- return nslist
-
-class EventHandler:
- """An 'EventHandler' wraps callbacks for the events Tor can return.
- Each event argument is an instance of the corresponding event
- class."""
- def __init__(self):
- """Create a new EventHandler."""
- self._map1 = {
- "CIRC" : self.circ_status_event,
- "STREAM" : self.stream_status_event,
- "ORCONN" : self.or_conn_status_event,
- "STREAM_BW" : self.stream_bw_event,
- "BW" : self.bandwidth_event,
- "DEBUG" : self.msg_event,
- "INFO" : self.msg_event,
- "NOTICE" : self.msg_event,
- "WARN" : self.msg_event,
- "ERR" : self.msg_event,
- "NEWDESC" : self.new_desc_event,
- "ADDRMAP" : self.address_mapped_event,
- "NS" : self.ns_event
- }
-
- def _handle1(self, timestamp, lines):
- """Dispatcher: called from Connection when an event is received."""
- for code, msg, data in lines:
- event = self._decode1(msg, data)
- event.arrived_at = timestamp
- self.heartbeat_event(event)
- self._map1.get(event.event_name, self.unknown_event)(event)
-
- def _decode1(self, body, data):
- """Unpack an event message into a type/arguments-tuple tuple."""
- if " " in body:
- evtype,body = body.split(" ",1)
- else:
- evtype,body = body,""
- evtype = evtype.upper()
- if evtype == "CIRC":
- m = re.match(r"(\d+)\s+(\S+)(\s\S+)?(\s\S+)?(\s\S+)?", body)
- if not m:
- raise ProtocolError("CIRC event misformatted.")
- ident,status,path,reason,remote = m.groups()
- ident = int(ident)
- if path:
- if "REASON=" in path:
- remote = reason
- reason = path
- path=[]
- else:
- path = path.strip().split(",")
- else:
- path = []
- if reason: reason = reason[8:]
- if remote: remote = remote[15:]
- event = CircuitEvent(evtype, ident, status, path, reason, remote)
- elif evtype == "STREAM":
- #plog("DEBUG", "STREAM: "+body)
- m = re.match(r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+):(\d+)(\sREASON=\S+)?(\sREMOTE_REASON=\S+)?(\sSOURCE=\S+)?(\sSOURCE_ADDR=\S+)?(\sPURPOSE=\S+)?", body)
- if not m:
- raise ProtocolError("STREAM event misformatted.")
- ident,status,circ,target_host,target_port,reason,remote,source,source_addr,purpose = m.groups()
- ident,circ = map(int, (ident,circ))
- if reason: reason = reason[8:]
- if remote: remote = remote[15:]
- if source: source = source[8:]
- if source_addr: source_addr = source_addr[13:]
- if purpose: purpose = purpose[9:]
- event = StreamEvent(evtype, ident, status, circ, target_host,
- int(target_port), reason, remote, source, source_addr, purpose)
- elif evtype == "ORCONN":
- m = re.match(r"(\S+)\s+(\S+)(\sAGE=\S+)?(\sREAD=\S+)?(\sWRITTEN=\S+)?(\sREASON=\S+)?(\sNCIRCS=\S+)?", body)
- if not m:
- raise ProtocolError("ORCONN event misformatted.")
- target, status, age, read, wrote, reason, ncircs = m.groups()
-
- #plog("DEBUG", "ORCONN: "+body)
- if ncircs: ncircs = int(ncircs[8:])
- else: ncircs = 0
- if reason: reason = reason[8:]
- if age: age = int(age[5:])
- else: age = 0
- if read: read = int(read[6:])
- else: read = 0
- if wrote: wrote = int(wrote[9:])
- else: wrote = 0
- event = ORConnEvent(evtype, status, target, age, read, wrote,
- reason, ncircs)
- elif evtype == "STREAM_BW":
- m = re.match(r"(\d+)\s+(\d+)\s+(\d+)", body)
- if not m:
- raise ProtocolError("STREAM_BW event misformatted.")
- event = StreamBwEvent(evtype, *m.groups())
- elif evtype == "BW":
- m = re.match(r"(\d+)\s+(\d+)", body)
- if not m:
- raise ProtocolError("BANDWIDTH event misformatted.")
- read, written = map(long, m.groups())
- event = BWEvent(evtype, read, written)
- elif evtype in ("DEBUG", "INFO", "NOTICE", "WARN", "ERR"):
- event = LogEvent(evtype, body)
- elif evtype == "NEWDESC":
- event = NewDescEvent(evtype, body.split(" "))
- elif evtype == "ADDRMAP":
- # TODO: Also parse errors and GMTExpiry
- m = re.match(r'(\S+)\s+(\S+)\s+(\"[^"]+\"|\w+)', body)
- if not m:
- raise ProtocolError("ADDRMAP event misformatted.")
- fromaddr, toaddr, when = m.groups()
- if when.upper() == "NEVER":
- when = None
- else:
- when = time.strptime(when[1:-1], "%Y-%m-%d %H:%M:%S")
- event = AddrMapEvent(evtype, fromaddr, toaddr, when)
- elif evtype == "NS":
- event = NetworkStatusEvent(evtype, parse_ns_body(data))
- else:
- event = UnknownEvent(evtype, body)
-
- return event
-
- def heartbeat_event(self, event):
- """Called before any event is recieved. Convenience function
- for any cleanup/setup/reconfiguration you may need to do.
- """
- pass
-
- def unknown_event(self, event):
- """Called when we get an event type we don't recognize. This
-    is almost always an error.
- """
-    raise NotImplementedError()
-
- def circ_status_event(self, event):
- """Called when a circuit status changes if listening to CIRCSTATUS
- events."""
-    raise NotImplementedError()
-
- def stream_status_event(self, event):
- """Called when a stream status changes if listening to STREAMSTATUS
- events. """
-    raise NotImplementedError()
-
- def stream_bw_event(self, event):
-    raise NotImplementedError()
-
- def or_conn_status_event(self, event):
- """Called when an OR connection's status changes if listening to
- ORCONNSTATUS events."""
-    raise NotImplementedError()
-
- def bandwidth_event(self, event):
- """Called once a second if listening to BANDWIDTH events.
- """
-    raise NotImplementedError()
-
- def new_desc_event(self, event):
- """Called when Tor learns a new server descriptor if listenting to
- NEWDESC events.
- """
-    raise NotImplementedError()
-
- def msg_event(self, event):
- """Called when a log message of a given severity arrives if listening
- to INFO_MSG, NOTICE_MSG, WARN_MSG, or ERR_MSG events."""
-    raise NotImplementedError()
-
- def ns_event(self, event):
-    raise NotImplementedError()
-
- def address_mapped_event(self, event):
- """Called when Tor adds a mapping for an address if listening
- to ADDRESSMAPPED events.
- """
-    raise NotImplementedError()
-
-
-class DebugEventHandler(EventHandler):
- """Trivial debug event handler: reassembles all parsed events to stdout."""
- def circ_status_event(self, circ_event): # CircuitEvent()
- output = [circ_event.event_name, str(circ_event.circ_id),
- circ_event.status]
- if circ_event.path:
- output.append(",".join(circ_event.path))
- if circ_event.reason:
- output.append("REASON=" + circ_event.reason)
- if circ_event.remote_reason:
- output.append("REMOTE_REASON=" + circ_event.remote_reason)
- print " ".join(output)
-
- def stream_status_event(self, strm_event):
- output = [strm_event.event_name, str(strm_event.strm_id),
- strm_event.status, str(strm_event.circ_id),
- strm_event.target_host, str(strm_event.target_port)]
- if strm_event.reason:
- output.append("REASON=" + strm_event.reason)
- if strm_event.remote_reason:
- output.append("REMOTE_REASON=" + strm_event.remote_reason)
- print " ".join(output)
-
- def ns_event(self, ns_event):
- for ns in ns_event.nslist:
- print " ".join((ns_event.event_name, ns.nickname, ns.idhash,
- ns.updated.isoformat(), ns.ip, str(ns.orport),
- str(ns.dirport), " ".join(ns.flags)))
-
- def new_desc_event(self, newdesc_event):
- print " ".join((newdesc_event.event_name, " ".join(newdesc_event.idlist)))
-
- def or_conn_status_event(self, orconn_event):
- if orconn_event.age: age = "AGE="+str(orconn_event.age)
- else: age = ""
- if orconn_event.read_bytes: read = "READ="+str(orconn_event.read_bytes)
- else: read = ""
- if orconn_event.wrote_bytes: wrote = "WRITTEN="+str(orconn_event.wrote_bytes)
- else: wrote = ""
- if orconn_event.reason: reason = "REASON="+orconn_event.reason
- else: reason = ""
- if orconn_event.ncircs: ncircs = "NCIRCS="+str(orconn_event.ncircs)
- else: ncircs = ""
- print " ".join((orconn_event.event_name, orconn_event.endpoint,
- orconn_event.status, age, read, wrote, reason, ncircs))
-
- def msg_event(self, log_event):
- print log_event.event_name+" "+log_event.msg
-
- def bandwidth_event(self, bw_event):
- print bw_event.event_name+" "+str(bw_event.read)+" "+str(bw_event.written)
-
-def parseHostAndPort(h):
- """Given a string of the form 'address:port' or 'address' or
- 'port' or '', return a two-tuple of (address, port)
- """
- host, port = "localhost", 9100
- if ":" in h:
- i = h.index(":")
- host = h[:i]
- try:
- port = int(h[i+1:])
- except ValueError:
- print "Bad hostname %r"%h
- sys.exit(1)
- elif h:
- try:
- port = int(h)
- except ValueError:
- host = h
-
- return host, port
-
-def run_example(host,port):
- """ Example of basic TorCtl usage. See PathSupport for more advanced
- usage.
- """
- print "host is %s:%d"%(host,port)
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect((host,port))
- c = Connection(s)
- c.set_event_handler(DebugEventHandler())
- th = c.launch_thread()
- c.authenticate()
- print "nick",`c.get_option("nickname")`
- print `c.get_info("version")`
- #print `c.get_info("desc/name/moria1")`
- print `c.get_info("network-status")`
- print `c.get_info("addr-mappings/all")`
- print `c.get_info("addr-mappings/config")`
- print `c.get_info("addr-mappings/cache")`
- print `c.get_info("addr-mappings/control")`
-
- print `c.extend_circuit(0,["moria1"])`
- try:
- print `c.extend_circuit(0,[""])`
- except ErrorReply: # wtf?
- print "got error. good."
- except:
- print "Strange error", sys.exc_info()[0]
-
- #send_signal(s,1)
- #save_conf(s)
-
- #set_option(s,"1")
- #set_option(s,"bandwidthburstbytes 100000")
- #set_option(s,"runasdaemon 1")
- #set_events(s,[EVENT_TYPE.WARN])
-# c.set_events([EVENT_TYPE.ORCONN], True)
- c.set_events([EVENT_TYPE.STREAM, EVENT_TYPE.CIRC,
- EVENT_TYPE.NS, EVENT_TYPE.NEWDESC,
- EVENT_TYPE.ORCONN, EVENT_TYPE.BW], True)
-
- th.join()
- return
-
-if __name__ == '__main__':
- if len(sys.argv) > 2:
- print "Syntax: TorControl.py torhost:torport"
- sys.exit(0)
- else:
- sys.argv.append("localhost:9051")
- sh,sp = parseHostAndPort(sys.argv[1])
- run_example(sh,sp)
-
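
Per the module docstring and run_example() above, the intended use of this file is: open a control-port socket, wrap it in a Connection, install an EventHandler, authenticate, then subscribe to events. Condensed into a minimal sketch, assuming a Tor control port on localhost:9051 with no cookie or password authentication configured:

  import socket
  import TorCtl

  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect(("localhost", 9051))
  c = TorCtl.Connection(s)
  c.set_event_handler(TorCtl.DebugEventHandler())   # echo parsed events to stdout
  th = c.launch_thread()                            # reader thread + event thread
  c.authenticate()
  c.set_events([TorCtl.EVENT_TYPE.CIRC, TorCtl.EVENT_TYPE.STREAM])
  th.join()
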
Copied: torflow/branches/gsoc2008/TorCtl.local/TorCtl.py (from rev 15741, torflow/branches/gsoc2008/TorCtl/TorCtl.py)
===================================================================
--- torflow/branches/gsoc2008/TorCtl.local/TorCtl.py (rev 0)
+++ torflow/branches/gsoc2008/TorCtl.local/TorCtl.py 2008-07-07 19:37:47 UTC (rev 15742)
@@ -0,0 +1,1091 @@
+#!/usr/bin/python
+# TorCtl.py -- Python module to interface with Tor Control interface.
+# Copyright 2005 Nick Mathewson
+# Copyright 2007 Mike Perry. See LICENSE file.
+
+"""
+Library to control Tor processes.
+
+This library handles sending commands, parsing responses, and delivering
+events to and from the control port. The basic usage is to create a
+socket, wrap that in a TorCtl.Connection, and then add an EventHandler
+to that connection. A simple example with a DebugEventHandler (that just
+echoes the events back to stdout) is present in run_example().
+
+Note that the TorCtl.Connection is fully compatible with the more
+advanced EventHandlers in TorCtl.PathSupport (and of course any other
+custom event handlers that you may extend off of those).
+
+This package also contains a helper class for representing Routers, and
+classes and constants for each event.
+
+"""
+
+__all__ = ["EVENT_TYPE", "TorCtlError", "TorCtlClosed", "ProtocolError",
+ "ErrorReply", "NetworkStatus", "ExitPolicyLine", "Router",
+ "RouterVersion", "Connection", "parse_ns_body",
+ "EventHandler", "DebugEventHandler", "NetworkStatusEvent",
+ "NewDescEvent", "CircuitEvent", "StreamEvent", "ORConnEvent",
+ "StreamBwEvent", "LogEvent", "AddrMapEvent", "BWEvent",
+ "UnknownEvent" ]
+
+import os
+import re
+import struct
+import sys
+import threading
+import Queue
+import datetime
+import traceback
+import socket
+import binascii
+import types
+import time
+from TorUtil import *
+
+# Types of "EVENT" message.
+EVENT_TYPE = Enum2(
+ CIRC="CIRC",
+ STREAM="STREAM",
+ ORCONN="ORCONN",
+ STREAM_BW="STREAM_BW",
+ BW="BW",
+ NS="NS",
+ NEWDESC="NEWDESC",
+ ADDRMAP="ADDRMAP",
+ DEBUG="DEBUG",
+ INFO="INFO",
+ NOTICE="NOTICE",
+ WARN="WARN",
+ ERR="ERR")
+
+class TorCtlError(Exception):
+ "Generic error raised by TorControl code."
+ pass
+
+class TorCtlClosed(TorCtlError):
+ "Raised when the controller connection is closed by Tor (not by us.)"
+ pass
+
+class ProtocolError(TorCtlError):
+ "Raised on violations in Tor controller protocol"
+ pass
+
+class ErrorReply(TorCtlError):
+ "Raised when Tor controller returns an error"
+ pass
+
+class NetworkStatus:
+ "Filled in during NS events"
+ def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags):
+ self.nickname = nickname
+ self.idhash = idhash
+ self.orhash = orhash
+ self.ip = ip
+ self.orport = int(orport)
+ self.dirport = int(dirport)
+ self.flags = flags
+ self.idhex = (self.idhash + "=").decode("base64").encode("hex").upper()
+ m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated)
+ self.updated = datetime.datetime(*map(int, m.groups()))
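
The idhex line above turns the NS entry's unpadded base64 identity digest into the upper-case hex fingerprint the rest of TorCtl uses as a router key. For instance (digest value invented for illustration):

  idhash = "GgJy1suZd7cHKRur1JV7aLTXMIU"                        # hypothetical NS identity field
  idhex = (idhash + "=").decode("base64").encode("hex").upper()  # 40 hex digits
  print idhex
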
+
+class NetworkStatusEvent:
+ def __init__(self, event_name, nslist):
+ self.event_name = event_name
+ self.arrived_at = 0
+ self.nslist = nslist # List of NetworkStatus objects
+
+class NewDescEvent:
+ def __init__(self, event_name, idlist):
+ self.event_name = event_name
+ self.arrived_at = 0
+ self.idlist = idlist
+
+class CircuitEvent:
+ def __init__(self, event_name, circ_id, status, path, reason,
+ remote_reason):
+ self.event_name = event_name
+ self.arrived_at = 0
+ self.circ_id = circ_id
+ self.status = status
+ self.path = path
+ self.reason = reason
+ self.remote_reason = remote_reason
+
+class StreamEvent:
+ def __init__(self, event_name, strm_id, status, circ_id, target_host,
+ target_port, reason, remote_reason, source, source_addr, purpose):
+ self.event_name = event_name
+ self.arrived_at = 0
+ self.strm_id = strm_id
+ self.status = status
+ self.circ_id = circ_id
+ self.target_host = target_host
+ self.target_port = int(target_port)
+ self.reason = reason
+ self.remote_reason = remote_reason
+ self.source = source
+ self.source_addr = source_addr
+ self.purpose = purpose
+
+class ORConnEvent:
+ def __init__(self, event_name, status, endpoint, age, read_bytes,
+ wrote_bytes, reason, ncircs):
+ self.event_name = event_name
+ self.arrived_at = 0
+ self.status = status
+ self.endpoint = endpoint
+ self.age = age
+ self.read_bytes = read_bytes
+ self.wrote_bytes = wrote_bytes
+ self.reason = reason
+ self.ncircs = ncircs
+
+class StreamBwEvent:
+ def __init__(self, event_name, strm_id, read, written):
+ self.event_name = event_name
+ self.strm_id = int(strm_id)
+ self.bytes_read = int(read)
+ self.bytes_written = int(written)
+
+class LogEvent:
+ def __init__(self, level, msg):
+ self.event_name = self.level = level
+ self.msg = msg
+
+class AddrMapEvent:
+ def __init__(self, event_name, from_addr, to_addr, when):
+ self.event_name = event_name
+ self.from_addr = from_addr
+ self.to_addr = to_addr
+ self.when = when
+
+class BWEvent:
+ def __init__(self, event_name, read, written):
+ self.event_name = event_name
+ self.read = read
+ self.written = written
+
+class UnknownEvent:
+ def __init__(self, event_name, event_string):
+ self.event_name = event_name
+ self.event_string = event_string
+
+class ExitPolicyLine:
+ """ Class to represent a line in a Router's exit policy in a way
+ that can be easily checked. """
+ def __init__(self, match, ip_mask, port_low, port_high):
+ self.match = match
+ if ip_mask == "*":
+ self.ip = 0
+ self.netmask = 0
+ else:
+ if not "/" in ip_mask:
+ self.netmask = 0xFFFFFFFF
+ ip = ip_mask
+ else:
+ ip, mask = ip_mask.split("/")
+ if re.match(r"\d+.\d+.\d+.\d+", mask):
+ self.netmask=struct.unpack(">I", socket.inet_aton(mask))[0]
+ else:
+ self.netmask = ~(2**(32 - int(mask)) - 1)
+ self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+ self.ip &= self.netmask
+ if port_low == "*":
+ self.port_low,self.port_high = (0,65535)
+ else:
+ if not port_high:
+ port_high = port_low
+ self.port_low = int(port_low)
+ self.port_high = int(port_high)
+
+ def check(self, ip, port):
+ """Check to see if an ip and port is matched by this line.
+ Returns true if the line is an Accept, and False if it is a Reject. """
+ ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+ if (ip & self.netmask) == self.ip:
+ if self.port_low <= port and port <= self.port_high:
+ return self.match
+ return -1
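
ExitPolicyLine keeps the policy's network as a masked 32-bit integer, so check() above is just a mask-and-compare plus a port-range test; it returns the line's accept/reject value on a match and -1 when the line does not apply at all. A short sketch with a hypothetical "accept 18.0.0.0/8:80-81" line and made-up addresses:

  from TorCtl import ExitPolicyLine

  line = ExitPolicyLine(True, "18.0.0.0/8", "80", "81")
  print line.check("18.244.0.114", 80)   # True -> accepted by this line
  print line.check("18.244.0.114", 443)  # -1   -> port outside 80-81, line does not apply
  print line.check("128.31.0.34", 80)    # -1   -> address outside 18.0.0.0/8
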
+
+class RouterVersion:
+ """ Represents a Router's version. Overloads all comparison operators
+ to check for newer, older, or equivalent versions. """
+ def __init__(self, version):
+ if version:
+ v = re.search("^(\d+).(\d+).(\d+).(\d+)", version).groups()
+ self.version = int(v[0])*0x1000000 + int(v[1])*0x10000 + int(v[2])*0x100 + int(v[3])
+ self.ver_string = version
+ else:
+ self.version = version
+ self.ver_string = "unknown"
+
+ def __lt__(self, other): return self.version < other.version
+ def __gt__(self, other): return self.version > other.version
+ def __ge__(self, other): return self.version >= other.version
+ def __le__(self, other): return self.version <= other.version
+ def __eq__(self, other): return self.version == other.version
+ def __ne__(self, other): return self.version != other.version
+ def __str__(self): return self.ver_string
+
+class Router:
+ """
+ Class to represent a router from a descriptor. Can either be
+ created from the parsed fields, or can be built from a
+ descriptor+NetworkStatus
+ """
+ def __init__(self, idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime):
+ self.idhex = idhex
+ self.nickname = name
+ self.bw = bw
+ self.exitpolicy = exitpolicy
+ self.flags = flags
+ self.down = down
+ self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+ self.version = RouterVersion(version)
+ self.os = os
+ self.list_rank = 0 # position in a sorted list of routers.
+ self.uptime = uptime
+
+ def __str__(self):
+ s = self.idhex, self.nickname
+ return s.__str__()
+
+ def build_from_desc(desc, ns):
+ """
+ Static method of Router that parses a descriptor string into this class.
+ 'desc' is a full descriptor as a string.
+ 'ns' is a TorCtl.NetworkStatus instance for this router (needed for
+ the flags, the nickname, and the idhex string).
+ Returns a Router instance.
+ """
+ # XXX: Compile these regular expressions? This is an expensive process
+ # Use http://docs.python.org/lib/profile.html to verify this is
+ # the part of startup that is slow
+ exitpolicy = []
+ dead = not ("Running" in ns.flags)
+ bw_observed = 0
+ version = None
+ os = None
+ uptime = 0
+ ip = 0
+ router = "[none]"
+
+ for line in desc:
+ rt = re.search(r"^router (\S+) (\S+)", line)
+ fp = re.search(r"^opt fingerprint (.+).*on (\S+)", line)
+ pl = re.search(r"^platform Tor (\S+).*on (\S+)", line)
+ ac = re.search(r"^accept (\S+):([^-]+)(?:-(\d+))?", line)
+ rj = re.search(r"^reject (\S+):([^-]+)(?:-(\d+))?", line)
+ bw = re.search(r"^bandwidth \d+ \d+ (\d+)", line)
+ up = re.search(r"^uptime (\d+)", line)
+ if re.search(r"^opt hibernating 1", line):
+ #dead = 1 # XXX: Technically this may be stale..
+ if ("Running" in ns.flags):
+ plog("INFO", "Hibernating router "+ns.nickname+" is running..")
+ if ac:
+ exitpolicy.append(ExitPolicyLine(True, *ac.groups()))
+ elif rj:
+ exitpolicy.append(ExitPolicyLine(False, *rj.groups()))
+ elif bw:
+ bw_observed = int(bw.group(1))
+ elif pl:
+ version, os = pl.groups()
+ elif up:
+ uptime = int(up.group(1))
+ elif rt:
+ router,ip = rt.groups()
+ if router != ns.nickname:
+ plog("NOTICE", "Got different names " + ns.nickname + " vs " +
+ router + " for " + ns.idhex)
+ if not bw_observed and not dead and ("Valid" in ns.flags):
+ plog("INFO", "No bandwidth for live router " + ns.nickname)
+ if not version or not os:
+ plog("INFO", "No version and/or OS for router " + ns.nickname)
+ return Router(ns.idhex, ns.nickname, bw_observed, dead, exitpolicy,
+ ns.flags, ip, version, os, uptime)
+ build_from_desc = Callable(build_from_desc)
+
+ def update_to(self, new):
+ """ Somewhat hackish method to update this router to be a copy of
+ 'new' """
+ if self.idhex != new.idhex:
+ plog("ERROR", "Update of router "+self.nickname+"changes idhex!")
+ self.idhex = new.idhex
+ self.nickname = new.nickname
+ self.bw = new.bw
+ self.exitpolicy = new.exitpolicy
+ self.flags = new.flags
+ self.ip = new.ip
+ self.version = new.version
+ self.os = new.os
+ self.uptime = new.uptime
+
+ def will_exit_to(self, ip, port):
+ """ Check the entire exitpolicy to see if the router will allow
+ connections to 'ip':'port' """
+ for line in self.exitpolicy:
+ ret = line.check(ip, port)
+ if ret != -1:
+ return ret
+ plog("WARN", "No matching exit line for "+self.nickname)
+ return False
+
+class Connection:
+ """A Connection represents a connection to the Tor process via the
+ control port."""
+ def __init__(self, sock):
+ """Create a Connection to communicate with the Tor process over the
+ socket 'sock'.
+ """
+ self._handler = None
+ self._handleFn = None
+ self._sendLock = threading.RLock()
+ self._queue = Queue.Queue()
+ self._thread = None
+ self._closedEx = None
+ self._closed = 0
+ self._closeHandler = None
+ self._eventThread = None
+ self._eventQueue = Queue.Queue()
+ self._s = BufSock(sock)
+ self._debugFile = None
+
+ def set_close_handler(self, handler):
+ """Call 'handler' when the Tor process has closed its connection or
+ given us an exception. If we close normally, no arguments are
+ provided; otherwise, it will be called with an exception as its
+ argument.
+ """
+ self._closeHandler = handler
+
+ def close(self):
+ """Shut down this controller connection"""
+ self._sendLock.acquire()
+ try:
+ self._queue.put("CLOSE")
+ self._eventQueue.put((time.time(), "CLOSE"))
+ finally:
+ self._sendLock.release()
+
+ def launch_thread(self, daemon=1):
+ """Launch a background thread to handle messages from the Tor process."""
+ assert self._thread is None
+ t = threading.Thread(target=self._loop)
+ if daemon:
+ t.setDaemon(daemon)
+ t.start()
+ self._thread = t
+ t = threading.Thread(target=self._eventLoop)
+ if daemon:
+ t.setDaemon(daemon)
+ t.start()
+ self._eventThread = t
+ return self._thread
+
+ def _loop(self):
+ """Main subthread loop: Read commands from Tor, and handle them either
+ as events or as responses to other commands.
+ """
+ while 1:
+ try:
+ isEvent, reply = self._read_reply()
+ except:
+ self._err(sys.exc_info())
+ return
+
+ if isEvent:
+ if self._handler is not None:
+ self._eventQueue.put((time.time(), reply))
+ else:
+ cb = self._queue.get() # atomic..
+ if cb == "CLOSE":
+ self._s.close()
+ self._s = None
+ self._closed = 1
+ return
+ else:
+ cb(reply)
+
+ def _err(self, (tp, ex, tb), fromEventLoop=0):
+ """DOCDOC"""
+ # silent death is bad :(
+ traceback.print_exception(tp, ex, tb)
+ if self._s:
+ try:
+ self.close()
+ except:
+ pass
+ self._sendLock.acquire()
+ try:
+ self._closedEx = ex
+ self._closed = 1
+ finally:
+ self._sendLock.release()
+ while 1:
+ try:
+ cb = self._queue.get(timeout=0)
+ if cb != "CLOSE":
+ cb("EXCEPTION")
+ except Queue.Empty:
+ break
+ if self._closeHandler is not None:
+ self._closeHandler(ex)
+ return
+
+ def _eventLoop(self):
+ """DOCDOC"""
+ while 1:
+ (timestamp, reply) = self._eventQueue.get()
+ if reply[0][0] == "650" and reply[0][1] == "OK":
+ plog("DEBUG", "Ignoring incompatible syntactic sugar: 650 OK")
+ continue
+ if reply == "CLOSE":
+ return
+ try:
+ self._handleFn(timestamp, reply)
+ except:
+ for code, msg, data in reply:
+ plog("WARN", "No event for: "+str(code)+" "+str(msg))
+ self._err(sys.exc_info(), 1)
+ return
+
+ def _sendImpl(self, sendFn, msg):
+ """DOCDOC"""
+ if self._thread is None:
+ self.launch_thread(1)
+ # This condition will get notified when we've got a result...
+ condition = threading.Condition()
+ # Here's where the result goes...
+ result = []
+
+ if self._closedEx is not None:
+ raise self._closedEx
+ elif self._closed:
+ raise TorCtlClosed()
+
+ def cb(reply,condition=condition,result=result):
+ condition.acquire()
+ try:
+ result.append(reply)
+ condition.notify()
+ finally:
+ condition.release()
+
+ # Sends a message to Tor...
+ self._sendLock.acquire() # ensure queue+sendmsg is atomic
+ try:
+ self._queue.put(cb)
+ sendFn(msg) # _doSend(msg)
+ finally:
+ self._sendLock.release()
+
+ # Now wait till the answer is in...
+ condition.acquire()
+ try:
+ while not result:
+ condition.wait()
+ finally:
+ condition.release()
+
+ # ...And handle the answer appropriately.
+ assert len(result) == 1
+ reply = result[0]
+ if reply == "EXCEPTION":
+ raise self._closedEx
+
+ return reply
+
+
+ def debug(self, f):
+ """DOCDOC"""
+ self._debugFile = f
+
+ def set_event_handler(self, handler):
+ """Cause future events from the Tor process to be sent to 'handler'.
+ """
+ self._handler = handler
+ self._handleFn = handler._handle1
+
+ def _read_reply(self):
+ lines = []
+ while 1:
+ line = self._s.readline().strip()
+ if self._debugFile:
+ self._debugFile.write(" %s\n" % line)
+ if len(line)<4:
+ raise ProtocolError("Badly formatted reply line: Too short")
+ code = line[:3]
+ tp = line[3]
+ s = line[4:]
+ if tp == "-":
+ lines.append((code, s, None))
+ elif tp == " ":
+ lines.append((code, s, None))
+ isEvent = (lines and lines[0][0][0] == '6')
+ return isEvent, lines
+ elif tp != "+":
+ raise ProtocolError("Badly formatted reply line: unknown type %r"%tp)
+ else:
+ more = []
+ while 1:
+ line = self._s.readline()
+ if self._debugFile:
+ self._debugFile.write("+++ %s" % line)
+ if line in (".\r\n", ".\n", "650 OK\n", "650 OK\r\n"):
+ break
+ more.append(line)
+ lines.append((code, s, unescape_dots("".join(more))))
+ isEvent = (lines and lines[0][0][0] == '6')
+ if isEvent: # Need "250 OK" if it's not an event. Otherwise, end
+ return (isEvent, lines)
+
+ # Notreached
+ raise TorCtlError()
+
+ def _doSend(self, msg):
+ if self._debugFile:
+ amsg = msg
+ lines = amsg.split("\n")
+ if len(lines) > 2:
+ amsg = "\n".join(lines[:2]) + "\n"
+ self._debugFile.write(">>> %s" % amsg)
+ self._s.write(msg)
+
+ def sendAndRecv(self, msg="", expectedTypes=("250", "251")):
+ """Helper: Send a command 'msg' to Tor, and wait for a command
+ in response. If the response type is in expectedTypes,
+ return a list of (tp,body,extra) tuples. If it is an
+ error, raise ErrorReply. Otherwise, raise ProtocolError.
+ """
+ if type(msg) == types.ListType:
+ msg = "".join(msg)
+ assert msg.endswith("\r\n")
+
+ lines = self._sendImpl(self._doSend, msg)
+ # print lines
+ for tp, msg, _ in lines:
+ if tp[0] in '45':
+ raise ErrorReply("%s %s"%(tp, msg))
+ if tp not in expectedTypes:
+ raise ProtocolError("Unexpectd message type %r"%tp)
+
+ return lines
+
+ def authenticate(self, secret=""):
+ """Send an authenticating secret to Tor. You'll need to call this
+    method before Tor will accept most other commands.
+ """
+ #hexstr = binascii.b2a_hex(secret)
+ self.sendAndRecv("AUTHENTICATE \"%s\"\r\n"%secret)
+
+ def get_option(self, name):
+ """Get the value of the configuration option named 'name'. To
+ retrieve multiple values, pass a list for 'name' instead of
+ a string. Returns a list of (key,value) pairs.
+ Refer to section 3.3 of control-spec.txt for a list of valid names.
+ """
+ if not isinstance(name, str):
+ name = " ".join(name)
+ lines = self.sendAndRecv("GETCONF %s\r\n" % name)
+
+ r = []
+ for _,line,_ in lines:
+ try:
+ key, val = line.split("=", 1)
+ r.append((key,val))
+ except ValueError:
+ r.append((line, None))
+
+ return r
+
+ def set_option(self, key, value):
+ """Set the value of the configuration option 'key' to the value 'value'.
+ """
+ self.set_options([(key, value)])
+
+ def set_options(self, kvlist):
+ """Given a list of (key,value) pairs, set them as configuration
+ options.
+ """
+ if not kvlist:
+ return
+ msg = " ".join(["%s=%s"%(k,quote(v)) for k,v in kvlist])
+ self.sendAndRecv("SETCONF %s\r\n"%msg)
+
+ def reset_options(self, keylist):
+ """Reset the options listed in 'keylist' to their default values.
+
+ Tor started implementing this command in version 0.1.1.7-alpha;
+ previous versions wanted you to set configuration keys to "".
+ That no longer works.
+ """
+ self.sendAndRecv("RESETCONF %s\r\n"%(" ".join(keylist)))
+
+ def get_network_status(self, who="all"):
+ """Get the entire network status list. Returns a list of
+ TorCtl.NetworkStatus instances."""
+ return parse_ns_body(self.sendAndRecv("GETINFO ns/"+who+"\r\n")[0][2])
+
+ def get_router(self, ns):
+ """Fill in a Router class corresponding to a given NS class"""
+ desc = self.sendAndRecv("GETINFO desc/id/" + ns.idhex + "\r\n")[0][2].split("\n")
+ return Router.build_from_desc(desc, ns)
+
+
+ def read_routers(self, nslist):
+ """ Given a list a NetworkStatuses in 'nslist', this function will
+ return a list of new Router instances.
+ """
+ bad_key = 0
+ new = []
+ for ns in nslist:
+ try:
+ r = self.get_router(ns)
+ new.append(r)
+ except ErrorReply:
+ bad_key += 1
+ if "Running" in ns.flags:
+ plog("NOTICE", "Running router "+ns.nickname+"="
+ +ns.idhex+" has no descriptor")
+ except:
+ traceback.print_exception(*sys.exc_info())
+ continue
+
+ return new
+
+ def get_info(self, name):
+ """Return the value of the internal information field named 'name'.
+ Refer to section 3.9 of control-spec.txt for a list of valid names.
+ DOCDOC
+ """
+ if not isinstance(name, str):
+ name = " ".join(name)
+ lines = self.sendAndRecv("GETINFO %s\r\n"%name)
+ d = {}
+ for _,msg,more in lines:
+ if msg == "OK":
+ break
+ try:
+ k,rest = msg.split("=",1)
+ except ValueError:
+ raise ProtocolError("Bad info line %r",msg)
+ if more:
+ d[k] = more
+ else:
+ d[k] = rest
+ return d
+
+ def set_events(self, events, extended=False):
+ """Change the list of events that the event handler is interested
+ in to those in 'events', which is a list of event names.
+ Recognized event names are listed in section 3.3 of the control-spec
+ """
+ if extended:
+ plog ("DEBUG", "SETEVENTS EXTENDED %s\r\n" % " ".join(events))
+ self.sendAndRecv("SETEVENTS EXTENDED %s\r\n" % " ".join(events))
+ else:
+ self.sendAndRecv("SETEVENTS %s\r\n" % " ".join(events))
+
+ def save_conf(self):
+ """Flush all configuration changes to disk.
+ """
+ self.sendAndRecv("SAVECONF\r\n")
+
+ def send_signal(self, sig):
+ """Send the signal 'sig' to the Tor process; The allowed values for
+    'sig' are listed in section 3.7 of control-spec.
+ """
+ sig = { 0x01 : "HUP",
+ 0x02 : "INT",
+ 0x03 : "NEWNYM",
+ 0x0A : "USR1",
+ 0x0C : "USR2",
+ 0x0F : "TERM" }.get(sig,sig)
+ self.sendAndRecv("SIGNAL %s\r\n"%sig)
+
+ def resolve(self, host):
+ """ Launch a remote hostname lookup request:
+ 'host' may be a hostname or IPv4 address
+ """
+ # TODO: handle "mode=reverse"
+ self.sendAndRecv("RESOLVE %s\r\n"%host)
+
+ def map_address(self, kvList):
+ """ Sends the MAPADDRESS command for each of the tuples in kvList """
+ if not kvList:
+ return
+ m = " ".join([ "%s=%s" for k,v in kvList])
+ lines = self.sendAndRecv("MAPADDRESS %s\r\n"%m)
+ r = []
+ for _,line,_ in lines:
+ try:
+ key, val = line.split("=", 1)
+ except ValueError:
+ raise ProtocolError("Bad address line %r",v)
+ r.append((key,val))
+ return r
+
+ def extend_circuit(self, circid, hops):
+ """Tell Tor to extend the circuit identified by 'circid' through the
+ servers named in the list 'hops'.
+ """
+ if circid is None:
+ circid = "0"
+ plog("DEBUG", "Extending circuit")
+ lines = self.sendAndRecv("EXTENDCIRCUIT %d %s\r\n"
+ %(circid, ",".join(hops)))
+ tp,msg,_ = lines[0]
+ m = re.match(r'EXTENDED (\S*)', msg)
+ if not m:
+ raise ProtocolError("Bad extended line %r",msg)
+ plog("DEBUG", "Circuit extended")
+ return int(m.group(1))
+
+ def redirect_stream(self, streamid, newaddr, newport=""):
+ """DOCDOC"""
+ if newport:
+ self.sendAndRecv("REDIRECTSTREAM %d %s %s\r\n"%(streamid, newaddr, newport))
+ else:
+ self.sendAndRecv("REDIRECTSTREAM %d %s\r\n"%(streamid, newaddr))
+
+ def attach_stream(self, streamid, circid, hop=None):
+ """Attach a stream to a circuit, specify both by IDs. If hop is given,
+ try to use the specified hop in the circuit as the exit node for
+ this stream.
+ """
+ if hop:
+ self.sendAndRecv("ATTACHSTREAM %d %d HOP=%d\r\n"%(streamid, circid, hop))
+ plog("DEBUG", "Attaching stream: "+str(streamid)+" to hop "+str(hop)+" of circuit "+str(circid))
+ else:
+ self.sendAndRecv("ATTACHSTREAM %d %d\r\n"%(streamid, circid))
+ plog("DEBUG", "Attaching stream: "+str(streamid)+" to circuit "+str(circid))
+
+ def close_stream(self, streamid, reason=0, flags=()):
+ """DOCDOC"""
+ self.sendAndRecv("CLOSESTREAM %d %s %s\r\n"
+ %(streamid, reason, "".join(flags)))
+
+ def close_circuit(self, circid, reason=0, flags=()):
+ """DOCDOC"""
+ self.sendAndRecv("CLOSECIRCUIT %d %s %s\r\n"
+ %(circid, reason, "".join(flags)))
+
+ def post_descriptor(self, desc):
+ self.sendAndRecv("+POSTDESCRIPTOR\r\n%s"%escape_dots(desc))
+
+def parse_ns_body(data):
+ """Parse the body of an NS event or command into a list of
+ NetworkStatus instances"""
+ nsgroups = re.compile(r"^r ", re.M).split(data)
+ nsgroups.pop(0)
+ nslist = []
+ for nsline in nsgroups:
+ m = re.search(r"^s((?:\s\S*)+)", nsline, re.M)
+ flags = m.groups()
+ flags = flags[0].strip().split(" ")
+ m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline)
+ nslist.append(NetworkStatus(*(m.groups() + (flags,))))
+ return nslist
+
+class EventHandler:
+ """An 'EventHandler' wraps callbacks for the events Tor can return.
+ Each event argument is an instance of the corresponding event
+ class."""
+ def __init__(self):
+ """Create a new EventHandler."""
+ self._map1 = {
+ "CIRC" : self.circ_status_event,
+ "STREAM" : self.stream_status_event,
+ "ORCONN" : self.or_conn_status_event,
+ "STREAM_BW" : self.stream_bw_event,
+ "BW" : self.bandwidth_event,
+ "DEBUG" : self.msg_event,
+ "INFO" : self.msg_event,
+ "NOTICE" : self.msg_event,
+ "WARN" : self.msg_event,
+ "ERR" : self.msg_event,
+ "NEWDESC" : self.new_desc_event,
+ "ADDRMAP" : self.address_mapped_event,
+ "NS" : self.ns_event
+ }
+
+ def _handle1(self, timestamp, lines):
+ """Dispatcher: called from Connection when an event is received."""
+ for code, msg, data in lines:
+ event = self._decode1(msg, data)
+ event.arrived_at = timestamp
+ self.heartbeat_event(event)
+ self._map1.get(event.event_name, self.unknown_event)(event)
+
+ def _decode1(self, body, data):
+ """Unpack an event message into a type/arguments-tuple tuple."""
+ if " " in body:
+ evtype,body = body.split(" ",1)
+ else:
+ evtype,body = body,""
+ evtype = evtype.upper()
+ if evtype == "CIRC":
+ m = re.match(r"(\d+)\s+(\S+)(\s\S+)?(\s\S+)?(\s\S+)?", body)
+ if not m:
+ raise ProtocolError("CIRC event misformatted.")
+ ident,status,path,reason,remote = m.groups()
+ ident = int(ident)
+ if path:
+ if "REASON=" in path:
+ remote = reason
+ reason = path
+ path=[]
+ else:
+ path = path.strip().split(",")
+ else:
+ path = []
+ if reason: reason = reason[8:]
+ if remote: remote = remote[15:]
+ event = CircuitEvent(evtype, ident, status, path, reason, remote)
+ elif evtype == "STREAM":
+ #plog("DEBUG", "STREAM: "+body)
+ m = re.match(r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+):(\d+)(\sREASON=\S+)?(\sREMOTE_REASON=\S+)?(\sSOURCE=\S+)?(\sSOURCE_ADDR=\S+)?(\sPURPOSE=\S+)?", body)
+ if not m:
+ raise ProtocolError("STREAM event misformatted.")
+ ident,status,circ,target_host,target_port,reason,remote,source,source_addr,purpose = m.groups()
+ ident,circ = map(int, (ident,circ))
+ if reason: reason = reason[8:]
+ if remote: remote = remote[15:]
+ if source: source = source[8:]
+ if source_addr: source_addr = source_addr[13:]
+ if purpose: purpose = purpose[9:]
+ event = StreamEvent(evtype, ident, status, circ, target_host,
+ int(target_port), reason, remote, source, source_addr, purpose)
+ elif evtype == "ORCONN":
+ m = re.match(r"(\S+)\s+(\S+)(\sAGE=\S+)?(\sREAD=\S+)?(\sWRITTEN=\S+)?(\sREASON=\S+)?(\sNCIRCS=\S+)?", body)
+ if not m:
+ raise ProtocolError("ORCONN event misformatted.")
+ target, status, age, read, wrote, reason, ncircs = m.groups()
+
+ #plog("DEBUG", "ORCONN: "+body)
+ if ncircs: ncircs = int(ncircs[8:])
+ else: ncircs = 0
+ if reason: reason = reason[8:]
+ if age: age = int(age[5:])
+ else: age = 0
+ if read: read = int(read[6:])
+ else: read = 0
+ if wrote: wrote = int(wrote[9:])
+ else: wrote = 0
+ event = ORConnEvent(evtype, status, target, age, read, wrote,
+ reason, ncircs)
+ elif evtype == "STREAM_BW":
+ m = re.match(r"(\d+)\s+(\d+)\s+(\d+)", body)
+ if not m:
+ raise ProtocolError("STREAM_BW event misformatted.")
+ event = StreamBwEvent(evtype, *m.groups())
+ elif evtype == "BW":
+ m = re.match(r"(\d+)\s+(\d+)", body)
+ if not m:
+ raise ProtocolError("BANDWIDTH event misformatted.")
+ read, written = map(long, m.groups())
+ event = BWEvent(evtype, read, written)
+ elif evtype in ("DEBUG", "INFO", "NOTICE", "WARN", "ERR"):
+ event = LogEvent(evtype, body)
+ elif evtype == "NEWDESC":
+ event = NewDescEvent(evtype, body.split(" "))
+ elif evtype == "ADDRMAP":
+ # TODO: Also parse errors and GMTExpiry
+ m = re.match(r'(\S+)\s+(\S+)\s+(\"[^"]+\"|\w+)', body)
+ if not m:
+ raise ProtocolError("ADDRMAP event misformatted.")
+ fromaddr, toaddr, when = m.groups()
+ if when.upper() == "NEVER":
+ when = None
+ else:
+ when = time.strptime(when[1:-1], "%Y-%m-%d %H:%M:%S")
+ event = AddrMapEvent(evtype, fromaddr, toaddr, when)
+ elif evtype == "NS":
+ event = NetworkStatusEvent(evtype, parse_ns_body(data))
+ else:
+ event = UnknownEvent(evtype, body)
+
+ return event
+
+ def heartbeat_event(self, event):
+ """Called before any event is recieved. Convenience function
+ for any cleanup/setup/reconfiguration you may need to do.
+ """
+ pass
+
+ def unknown_event(self, event):
+ """Called when we get an event type we don't recognize. This
+    is almost always an error.
+ """
+    raise NotImplementedError()
+
+ def circ_status_event(self, event):
+ """Called when a circuit status changes if listening to CIRCSTATUS
+ events."""
+    raise NotImplementedError()
+
+ def stream_status_event(self, event):
+ """Called when a stream status changes if listening to STREAMSTATUS
+ events. """
+    raise NotImplementedError()
+
+ def stream_bw_event(self, event):
+    raise NotImplementedError()
+
+ def or_conn_status_event(self, event):
+ """Called when an OR connection's status changes if listening to
+ ORCONNSTATUS events."""
+    raise NotImplementedError()
+
+ def bandwidth_event(self, event):
+ """Called once a second if listening to BANDWIDTH events.
+ """
+    raise NotImplementedError()
+
+ def new_desc_event(self, event):
+ """Called when Tor learns a new server descriptor if listenting to
+ NEWDESC events.
+ """
+    raise NotImplementedError()
+
+ def msg_event(self, event):
+ """Called when a log message of a given severity arrives if listening
+ to INFO_MSG, NOTICE_MSG, WARN_MSG, or ERR_MSG events."""
+    raise NotImplementedError()
+
+ def ns_event(self, event):
+    raise NotImplementedError()
+
+ def address_mapped_event(self, event):
+ """Called when Tor adds a mapping for an address if listening
+ to ADDRESSMAPPED events.
+ """
+    raise NotImplementedError()
+
+
+class DebugEventHandler(EventHandler):
+ """Trivial debug event handler: reassembles all parsed events to stdout."""
+ def circ_status_event(self, circ_event): # CircuitEvent()
+ output = [circ_event.event_name, str(circ_event.circ_id),
+ circ_event.status]
+ if circ_event.path:
+ output.append(",".join(circ_event.path))
+ if circ_event.reason:
+ output.append("REASON=" + circ_event.reason)
+ if circ_event.remote_reason:
+ output.append("REMOTE_REASON=" + circ_event.remote_reason)
+ print " ".join(output)
+
+ def stream_status_event(self, strm_event):
+ output = [strm_event.event_name, str(strm_event.strm_id),
+ strm_event.status, str(strm_event.circ_id),
+ strm_event.target_host, str(strm_event.target_port)]
+ if strm_event.reason:
+ output.append("REASON=" + strm_event.reason)
+ if strm_event.remote_reason:
+ output.append("REMOTE_REASON=" + strm_event.remote_reason)
+ print " ".join(output)
+
+ def ns_event(self, ns_event):
+ for ns in ns_event.nslist:
+ print " ".join((ns_event.event_name, ns.nickname, ns.idhash,
+ ns.updated.isoformat(), ns.ip, str(ns.orport),
+ str(ns.dirport), " ".join(ns.flags)))
+
+ def new_desc_event(self, newdesc_event):
+ print " ".join((newdesc_event.event_name, " ".join(newdesc_event.idlist)))
+
+ def or_conn_status_event(self, orconn_event):
+ if orconn_event.age: age = "AGE="+str(orconn_event.age)
+ else: age = ""
+ if orconn_event.read_bytes: read = "READ="+str(orconn_event.read_bytes)
+ else: read = ""
+ if orconn_event.wrote_bytes: wrote = "WRITTEN="+str(orconn_event.wrote_bytes)
+ else: wrote = ""
+ if orconn_event.reason: reason = "REASON="+orconn_event.reason
+ else: reason = ""
+ if orconn_event.ncircs: ncircs = "NCIRCS="+str(orconn_event.ncircs)
+ else: ncircs = ""
+ print " ".join((orconn_event.event_name, orconn_event.endpoint,
+ orconn_event.status, age, read, wrote, reason, ncircs))
+
+ def msg_event(self, log_event):
+ print log_event.event_name+" "+log_event.msg
+
+ def bandwidth_event(self, bw_event):
+ print bw_event.event_name+" "+str(bw_event.read)+" "+str(bw_event.written)
+
+def parseHostAndPort(h):
+ """Given a string of the form 'address:port' or 'address' or
+ 'port' or '', return a two-tuple of (address, port)
+ """
+ host, port = "localhost", 9100
+ if ":" in h:
+ i = h.index(":")
+ host = h[:i]
+ try:
+ port = int(h[i+1:])
+ except ValueError:
+ print "Bad hostname %r"%h
+ sys.exit(1)
+ elif h:
+ try:
+ port = int(h)
+ except ValueError:
+ host = h
+
+ return host, port
+
+def run_example(host,port):
+ """ Example of basic TorCtl usage. See PathSupport for more advanced
+ usage.
+ """
+ print "host is %s:%d"%(host,port)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((host,port))
+ c = Connection(s)
+ c.set_event_handler(DebugEventHandler())
+ th = c.launch_thread()
+ c.authenticate()
+ print "nick",`c.get_option("nickname")`
+ print `c.get_info("version")`
+ #print `c.get_info("desc/name/moria1")`
+ print `c.get_info("network-status")`
+ print `c.get_info("addr-mappings/all")`
+ print `c.get_info("addr-mappings/config")`
+ print `c.get_info("addr-mappings/cache")`
+ print `c.get_info("addr-mappings/control")`
+
+ print `c.extend_circuit(0,["moria1"])`
+ try:
+ print `c.extend_circuit(0,[""])`
+ except ErrorReply: # wtf?
+ print "got error. good."
+ except:
+ print "Strange error", sys.exc_info()[0]
+
+ #send_signal(s,1)
+ #save_conf(s)
+
+ #set_option(s,"1")
+ #set_option(s,"bandwidthburstbytes 100000")
+ #set_option(s,"runasdaemon 1")
+ #set_events(s,[EVENT_TYPE.WARN])
+# c.set_events([EVENT_TYPE.ORCONN], True)
+ c.set_events([EVENT_TYPE.STREAM, EVENT_TYPE.CIRC,
+ EVENT_TYPE.NS, EVENT_TYPE.NEWDESC,
+ EVENT_TYPE.ORCONN, EVENT_TYPE.BW], True)
+
+ th.join()
+ return
+
+if __name__ == '__main__':
+ if len(sys.argv) > 2:
+ print "Syntax: TorControl.py torhost:torport"
+ sys.exit(0)
+ else:
+ sys.argv.append("localhost:9051")
+ sh,sp = parseHostAndPort(sys.argv[1])
+ run_example(sh,sp)
+
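
The event plumbing above reduces to a small recipe: subclass EventHandler, override the callbacks you care about, hand an instance to Connection.set_event_handler(), then enable the events with set_events(). A minimal sketch, assuming the TorCtl package is importable and a local Tor listens on control port 9051 with no control password (the CircWatcher name is purely illustrative):

  import socket
  from TorCtl import TorCtl

  class CircWatcher(TorCtl.EventHandler):
    # Print every circuit status change Tor reports.
    def circ_status_event(self, event):
      print "CIRC", event.circ_id, event.status, ",".join(event.path)

  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect(("127.0.0.1", 9051))   # assumed ControlPort
  c = TorCtl.Connection(s)
  c.set_event_handler(CircWatcher())
  c.launch_thread()
  c.authenticate()                 # assumes no control password is set
  c.set_events(["CIRC"])
  # A real script would keep the main thread alive, e.g. by joining the
  # thread returned from launch_thread(), as run_example() does.
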
Deleted: torflow/branches/gsoc2008/TorCtl.local/TorUtil.py
===================================================================
--- torflow/branches/gsoc2008/TorCtl/TorUtil.py 2008-07-07 16:40:57 UTC (rev 15734)
+++ torflow/branches/gsoc2008/TorCtl.local/TorUtil.py 2008-07-07 19:37:47 UTC (rev 15742)
@@ -1,260 +0,0 @@
-#!/usr/bin/python
-# TorCtl.py -- Python module to interface with Tor Control interface.
-# Copyright 2007 Mike Perry -- See LICENSE for licensing information.
-# Portions Copyright 2005 Nick Matthewson
-
-"""
-TorUtil -- Support functions for TorCtl.py and metatroller
-"""
-
-import os
-import re
-import sys
-import socket
-import binascii
-import sha
-import math
-import time
-
-__all__ = ["Enum", "Enum2", "Callable", "sort_list", "quote", "escape_dots", "unescape_dots",
- "BufSock", "secret_to_key", "urandom_rng", "s2k_gen", "s2k_check", "plog",
- "ListenSocket", "zprob"]
-
-# TODO: Make functions to read these from a config file. This isn't
-# the right place for them either.. But at least it's unified.
-tor_port = 9050
-tor_host = '127.0.0.1'
-
-meta_port = 9052
-meta_host = '127.0.0.1'
-
-control_port = 9051
-control_host = '127.0.0.1'
-
-class Enum:
- """ Defines an ordered dense name-to-number 1-1 mapping """
- def __init__(self, start, names):
- self.nameOf = {}
- idx = start
- for name in names:
- setattr(self,name,idx)
- self.nameOf[idx] = name
- idx += 1
-
-class Enum2:
- """ Defines an ordered sparse name-to-number 1-1 mapping """
- def __init__(self, **args):
- self.__dict__.update(args)
- self.nameOf = {}
- for k,v in args.items():
- self.nameOf[v] = k
-
-class Callable:
- def __init__(self, anycallable):
- self.__call__ = anycallable
-
-def sort_list(list, key):
- """ Sort a list by a specified key """
- list.sort(lambda x,y: cmp(key(x), key(y))) # Python < 2.4 hack
- return list
-
-def quote(s):
- return re.sub(r'([\r\n\\\"])', r'\\\1', s)
-
-def escape_dots(s, translate_nl=1):
- if translate_nl:
- lines = re.split(r"\r?\n", s)
- else:
- lines = s.split("\r\n")
- if lines and not lines[-1]:
- del lines[-1]
- for i in xrange(len(lines)):
- if lines[i].startswith("."):
- lines[i] = "."+lines[i]
- lines.append(".\r\n")
- return "\r\n".join(lines)
-
-def unescape_dots(s, translate_nl=1):
- lines = s.split("\r\n")
-
- for i in xrange(len(lines)):
- if lines[i].startswith("."):
- lines[i] = lines[i][1:]
-
- if lines and lines[-1]:
- lines.append("")
-
- if translate_nl:
- return "\n".join(lines)
- else:
- return "\r\n".join(lines)
-
-# XXX: Exception handling
-class BufSock:
- def __init__(self, s):
- self._s = s
- self._buf = []
-
- def readline(self):
- if self._buf:
- idx = self._buf[0].find('\n')
- if idx >= 0:
- result = self._buf[0][:idx+1]
- self._buf[0] = self._buf[0][idx+1:]
- return result
-
- while 1:
- s = self._s.recv(128)
- if not s: return None
- # XXX: This really does need an exception
- # raise ConnectionClosed()
- idx = s.find('\n')
- if idx >= 0:
- self._buf.append(s[:idx+1])
- result = "".join(self._buf)
- rest = s[idx+1:]
- if rest:
- self._buf = [ rest ]
- else:
- del self._buf[:]
- return result
- else:
- self._buf.append(s)
-
- def write(self, s):
- self._s.send(s)
-
- def close(self):
- self._s.close()
-
-# SocketServer.TCPServer is nuts..
-class ListenSocket:
- def __init__(self, listen_ip, port):
- msg = None
- self.s = None
- for res in socket.getaddrinfo(listen_ip, port, socket.AF_UNSPEC,
- socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
- af, socktype, proto, canonname, sa = res
- try:
- self.s = socket.socket(af, socktype, proto)
- self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- except socket.error, msg:
- self.s = None
- continue
- try:
- self.s.bind(sa)
- self.s.listen(1)
- except socket.error, msg:
- self.s.close()
- self.s = None
- continue
- break
- if self.s is None:
- raise socket.error(msg)
-
- def accept(self):
- conn, addr = self.s.accept()
- return conn
-
- def close(self):
- self.s.close()
-
-
-def secret_to_key(secret, s2k_specifier):
- """Used to generate a hashed password string. DOCDOC."""
- c = ord(s2k_specifier[8])
- EXPBIAS = 6
- count = (16+(c&15)) << ((c>>4) + EXPBIAS)
-
- d = sha.new()
- tmp = s2k_specifier[:8]+secret
- slen = len(tmp)
- while count:
- if count > slen:
- d.update(tmp)
- count -= slen
- else:
- d.update(tmp[:count])
- count = 0
- return d.digest()
-
-def urandom_rng(n):
- """Try to read some entropy from the platform entropy source."""
- f = open('/dev/urandom', 'rb')
- try:
- return f.read(n)
- finally:
- f.close()
-
-def s2k_gen(secret, rng=None):
- """DOCDOC"""
- if rng is None:
- if hasattr(os, "urandom"):
- rng = os.urandom
- else:
- rng = urandom_rng
- spec = "%s%s"%(rng(8), chr(96))
- return "16:%s"%(
- binascii.b2a_hex(spec + secret_to_key(secret, spec)))
-
-def s2k_check(secret, k):
- """DOCDOC"""
- assert k[:3] == "16:"
-
- k = binascii.a2b_hex(k[3:])
- return secret_to_key(secret, k[:9]) == k[9:]
-
-
-## XXX: Make this a class?
-loglevel = "DEBUG"
-loglevels = {"DEBUG" : 0, "INFO" : 1, "NOTICE" : 2, "WARN" : 3, "ERROR" : 4}
-
-def plog(level, msg): # XXX: Timestamps
- if(loglevels[level] >= loglevels[loglevel]):
- t = time.strftime("%a %b %d %H:%M:%S %Y")
- print level, '[', t, ']:', msg
- sys.stdout.flush()
-
-# Stolen from
-# http://www.nmr.mgh.harvard.edu/Neural_Systems_Group/gary/python/stats.py
-def zprob(z):
- """
-Returns the area under the normal curve 'to the left of' the given z value.
-Thus,
- for z<0, zprob(z) = 1-tail probability
- for z>0, 1.0-zprob(z) = 1-tail probability
- for any z, 2.0*(1.0-zprob(abs(z))) = 2-tail probability
-Adapted from z.c in Gary Perlman's |Stat.
-
-Usage: lzprob(z)
-"""
- Z_MAX = 6.0 # maximum meaningful z-value
- if z == 0.0:
- x = 0.0
- else:
- y = 0.5 * math.fabs(z)
- if y >= (Z_MAX*0.5):
- x = 1.0
- elif (y < 1.0):
- w = y*y
- x = ((((((((0.000124818987 * w
- -0.001075204047) * w +0.005198775019) * w
- -0.019198292004) * w +0.059054035642) * w
- -0.151968751364) * w +0.319152932694) * w
- -0.531923007300) * w +0.797884560593) * y * 2.0
- else:
- y = y - 2.0
- x = (((((((((((((-0.000045255659 * y
- +0.000152529290) * y -0.000019538132) * y
- -0.000676904986) * y +0.001390604284) * y
- -0.000794620820) * y -0.002034254874) * y
- +0.006549791214) * y -0.010557625006) * y
- +0.011630447319) * y -0.009279453341) * y
- +0.005353579108) * y -0.002141268741) * y
- +0.000535310849) * y +0.999936657524
- if z > 0.0:
- prob = ((x+1.0)*0.5)
- else:
- prob = ((1.0-x)*0.5)
- return prob
-
Copied: torflow/branches/gsoc2008/TorCtl.local/TorUtil.py (from rev 15741, torflow/branches/gsoc2008/TorCtl/TorUtil.py)
===================================================================
--- torflow/branches/gsoc2008/TorCtl.local/TorUtil.py (rev 0)
+++ torflow/branches/gsoc2008/TorCtl.local/TorUtil.py 2008-07-07 19:37:47 UTC (rev 15742)
@@ -0,0 +1,260 @@
+#!/usr/bin/python
+# TorCtl.py -- Python module to interface with Tor Control interface.
+# Copyright 2007 Mike Perry -- See LICENSE for licensing information.
+# Portions Copyright 2005 Nick Matthewson
+
+"""
+TorUtil -- Support functions for TorCtl.py and metatroller
+"""
+
+import os
+import re
+import sys
+import socket
+import binascii
+import sha
+import math
+import time
+
+__all__ = ["Enum", "Enum2", "Callable", "sort_list", "quote", "escape_dots", "unescape_dots",
+ "BufSock", "secret_to_key", "urandom_rng", "s2k_gen", "s2k_check", "plog",
+ "ListenSocket", "zprob"]
+
+# TODO: Make functions to read these from a config file. This isn't
+# the right place for them either.. But at least it's unified.
+tor_port = 9050
+tor_host = '127.0.0.1'
+
+meta_port = 9052
+meta_host = '127.0.0.1'
+
+control_port = 9051
+control_host = '127.0.0.1'
+
+class Enum:
+ """ Defines an ordered dense name-to-number 1-1 mapping """
+ def __init__(self, start, names):
+ self.nameOf = {}
+ idx = start
+ for name in names:
+ setattr(self,name,idx)
+ self.nameOf[idx] = name
+ idx += 1
+
+class Enum2:
+ """ Defines an ordered sparse name-to-number 1-1 mapping """
+ def __init__(self, **args):
+ self.__dict__.update(args)
+ self.nameOf = {}
+ for k,v in args.items():
+ self.nameOf[v] = k
+
+class Callable:
+ def __init__(self, anycallable):
+ self.__call__ = anycallable
+
+def sort_list(list, key):
+ """ Sort a list by a specified key """
+ list.sort(lambda x,y: cmp(key(x), key(y))) # Python < 2.4 hack
+ return list
+
+def quote(s):
+ return re.sub(r'([\r\n\\\"])', r'\\\1', s)
+
+def escape_dots(s, translate_nl=1):
+ if translate_nl:
+ lines = re.split(r"\r?\n", s)
+ else:
+ lines = s.split("\r\n")
+ if lines and not lines[-1]:
+ del lines[-1]
+ for i in xrange(len(lines)):
+ if lines[i].startswith("."):
+ lines[i] = "."+lines[i]
+ lines.append(".\r\n")
+ return "\r\n".join(lines)
+
+def unescape_dots(s, translate_nl=1):
+ lines = s.split("\r\n")
+
+ for i in xrange(len(lines)):
+ if lines[i].startswith("."):
+ lines[i] = lines[i][1:]
+
+ if lines and lines[-1]:
+ lines.append("")
+
+ if translate_nl:
+ return "\n".join(lines)
+ else:
+ return "\r\n".join(lines)
+
+# XXX: Exception handling
+class BufSock:
+ def __init__(self, s):
+ self._s = s
+ self._buf = []
+
+ def readline(self):
+ if self._buf:
+ idx = self._buf[0].find('\n')
+ if idx >= 0:
+ result = self._buf[0][:idx+1]
+ self._buf[0] = self._buf[0][idx+1:]
+ return result
+
+ while 1:
+ s = self._s.recv(128)
+ if not s: return None
+ # XXX: This really does need an exception
+ # raise ConnectionClosed()
+ idx = s.find('\n')
+ if idx >= 0:
+ self._buf.append(s[:idx+1])
+ result = "".join(self._buf)
+ rest = s[idx+1:]
+ if rest:
+ self._buf = [ rest ]
+ else:
+ del self._buf[:]
+ return result
+ else:
+ self._buf.append(s)
+
+ def write(self, s):
+ self._s.send(s)
+
+ def close(self):
+ self._s.close()
+
+# SocketServer.TCPServer is nuts..
+class ListenSocket:
+ def __init__(self, listen_ip, port):
+ msg = None
+ self.s = None
+ for res in socket.getaddrinfo(listen_ip, port, socket.AF_UNSPEC,
+ socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
+ af, socktype, proto, canonname, sa = res
+ try:
+ self.s = socket.socket(af, socktype, proto)
+ self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ except socket.error, msg:
+ self.s = None
+ continue
+ try:
+ self.s.bind(sa)
+ self.s.listen(1)
+ except socket.error, msg:
+ self.s.close()
+ self.s = None
+ continue
+ break
+ if self.s is None:
+ raise socket.error(msg)
+
+ def accept(self):
+ conn, addr = self.s.accept()
+ return conn
+
+ def close(self):
+ self.s.close()
+
+
+def secret_to_key(secret, s2k_specifier):
+ """Used to generate a hashed password string. DOCDOC."""
+ c = ord(s2k_specifier[8])
+ EXPBIAS = 6
+ count = (16+(c&15)) << ((c>>4) + EXPBIAS)
+
+ d = sha.new()
+ tmp = s2k_specifier[:8]+secret
+ slen = len(tmp)
+ while count:
+ if count > slen:
+ d.update(tmp)
+ count -= slen
+ else:
+ d.update(tmp[:count])
+ count = 0
+ return d.digest()
+
+def urandom_rng(n):
+ """Try to read some entropy from the platform entropy source."""
+ f = open('/dev/urandom', 'rb')
+ try:
+ return f.read(n)
+ finally:
+ f.close()
+
+def s2k_gen(secret, rng=None):
+ """DOCDOC"""
+ if rng is None:
+ if hasattr(os, "urandom"):
+ rng = os.urandom
+ else:
+ rng = urandom_rng
+ spec = "%s%s"%(rng(8), chr(96))
+ return "16:%s"%(
+ binascii.b2a_hex(spec + secret_to_key(secret, spec)))
+
+def s2k_check(secret, k):
+ """DOCDOC"""
+ assert k[:3] == "16:"
+
+ k = binascii.a2b_hex(k[3:])
+ return secret_to_key(secret, k[:9]) == k[9:]
+
+
+## XXX: Make this a class?
+loglevel = "DEBUG"
+loglevels = {"DEBUG" : 0, "INFO" : 1, "NOTICE" : 2, "WARN" : 3, "ERROR" : 4}
+
+def plog(level, msg): # XXX: Timestamps
+ if(loglevels[level] >= loglevels[loglevel]):
+ t = time.strftime("%a %b %d %H:%M:%S %Y")
+ print level, '[', t, ']:', msg
+ sys.stdout.flush()
+
+# Stolen from
+# http://www.nmr.mgh.harvard.edu/Neural_Systems_Group/gary/python/stats.py
+def zprob(z):
+ """
+Returns the area under the normal curve 'to the left of' the given z value.
+Thus,
+ for z<0, zprob(z) = 1-tail probability
+ for z>0, 1.0-zprob(z) = 1-tail probability
+ for any z, 2.0*(1.0-zprob(abs(z))) = 2-tail probability
+Adapted from z.c in Gary Perlman's |Stat.
+
+Usage: lzprob(z)
+"""
+ Z_MAX = 6.0 # maximum meaningful z-value
+ if z == 0.0:
+ x = 0.0
+ else:
+ y = 0.5 * math.fabs(z)
+ if y >= (Z_MAX*0.5):
+ x = 1.0
+ elif (y < 1.0):
+ w = y*y
+ x = ((((((((0.000124818987 * w
+ -0.001075204047) * w +0.005198775019) * w
+ -0.019198292004) * w +0.059054035642) * w
+ -0.151968751364) * w +0.319152932694) * w
+ -0.531923007300) * w +0.797884560593) * y * 2.0
+ else:
+ y = y - 2.0
+ x = (((((((((((((-0.000045255659 * y
+ +0.000152529290) * y -0.000019538132) * y
+ -0.000676904986) * y +0.001390604284) * y
+ -0.000794620820) * y -0.002034254874) * y
+ +0.006549791214) * y -0.010557625006) * y
+ +0.011630447319) * y -0.009279453341) * y
+ +0.005353579108) * y -0.002141268741) * y
+ +0.000535310849) * y +0.999936657524
+ if z > 0.0:
+ prob = ((x+1.0)*0.5)
+ else:
+ prob = ((1.0-x)*0.5)
+ return prob
+
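
Two of the helpers above are worth a quick illustration: s2k_gen()/s2k_check() produce and verify Tor's salted "16:..." hashed-password strings, and zprob() returns the area under the standard normal curve to the left of z, as its docstring describes. A short sketch, assuming Python 2 (the module uses the old sha library) and that the TorCtl package is on the Python path:

  from TorCtl.TorUtil import s2k_gen, s2k_check, zprob, plog

  # Hash a secret and check that verification round-trips.
  hashed = s2k_gen("my-secret")                 # "16:<hex salt+digest>"
  assert s2k_check("my-secret", hashed)
  assert not s2k_check("wrong-secret", hashed)
  plog("INFO", "hashed secret: " + hashed)

  # Two-tailed probability for a z score, per zprob's docstring.
  p_two_tail = 2.0 * (1.0 - zprob(abs(1.96)))   # roughly 0.05
  plog("INFO", "two-tailed p = %f" % p_two_tail)
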
Deleted: torflow/branches/gsoc2008/TorCtl.local/__init__.py
===================================================================
--- torflow/branches/gsoc2008/TorCtl/__init__.py 2008-07-07 16:40:57 UTC (rev 15734)
+++ torflow/branches/gsoc2008/TorCtl.local/__init__.py 2008-07-07 19:37:47 UTC (rev 15742)
@@ -1,2 +0,0 @@
-
-__all__ = ["TorUtil", "GeoIPSupport", "PathSupport", "TorCtl", "StatsSupport"]
Copied: torflow/branches/gsoc2008/TorCtl.local/__init__.py (from rev 15741, torflow/branches/gsoc2008/TorCtl/__init__.py)
===================================================================
--- torflow/branches/gsoc2008/TorCtl.local/__init__.py (rev 0)
+++ torflow/branches/gsoc2008/TorCtl.local/__init__.py 2008-07-07 19:37:47 UTC (rev 15742)
@@ -0,0 +1,2 @@
+
+__all__ = ["TorUtil", "GeoIPSupport", "PathSupport", "TorCtl", "StatsSupport"]