[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r23018: {arm} Revising externals definitions for torctl, and making a mirr (in arm: . dependencies dependencies/TorCtl release trunk trunk/src)



Author: atagar
Date: 2010-08-23 01:13:01 +0000 (Mon, 23 Aug 2010)
New Revision: 23018

Added:
   arm/dependencies/
   arm/dependencies/TorCtl/
   arm/dependencies/TorCtl/GeoIPSupport.py
   arm/dependencies/TorCtl/PathSupport.py
   arm/dependencies/TorCtl/README
   arm/dependencies/TorCtl/SQLSupport.py
   arm/dependencies/TorCtl/ScanSupport.py
   arm/dependencies/TorCtl/StatsSupport.py
   arm/dependencies/TorCtl/TorCtl.py
   arm/dependencies/TorCtl/TorUtil.py
   arm/dependencies/TorCtl/__init__.py
   arm/dependencies/notes.txt
Removed:
   arm/trunk/src/TorCtl/
Modified:
   arm/release/
   arm/trunk/
   arm/trunk/src/
Log:
Revising externals definitions for torctl, and making a mirror based on the current git trunk.



Added: arm/dependencies/TorCtl/GeoIPSupport.py
===================================================================
--- arm/dependencies/TorCtl/GeoIPSupport.py	                        (rev 0)
+++ arm/dependencies/TorCtl/GeoIPSupport.py	2010-08-23 01:13:01 UTC (rev 23018)
@@ -0,0 +1,140 @@
+#!/usr/bin/python
+
+import struct
+import socket
+import TorCtl
+import StatsSupport
+
+from TorUtil import plog
+try:
+  import GeoIP
+  # GeoIP data object: choose database here
+  geoip = GeoIP.new(GeoIP.GEOIP_STANDARD)
+  #geoip = GeoIP.open("./GeoLiteCity.dat", GeoIP.GEOIP_STANDARD)
+except:
+  plog("NOTICE", "No GeoIP library. GeoIPSupport.py will not work correctly")
+  # XXX: How do we bail entirely..  
+
+
+class Continent:
+  """ Continent class: The group attribute is to partition the continents
+      in groups, to determine the number of ocean crossings """
+  def __init__(self, continent_code):
+    self.code = continent_code 
+    self.group = None
+    self.countries = []
+
+  def contains(self, country_code):
+    return country_code in self.countries
+
+# Set countries to continents
+africa = Continent("AF")
+africa.group = 1
+africa.countries = ["AO","BF","BI","BJ","BV","BW","CD","CF","CG","CI","CM",
+   "CV","DJ","DZ","EG","EH","ER","ET","GA","GH","GM","GN","GQ","GW","HM","KE",
+   "KM","LR","LS","LY","MA","MG","ML","MR","MU","MW","MZ","NA","NE","NG","RE",
+   "RW","SC","SD","SH","SL","SN","SO","ST","SZ","TD","TF","TG","TN","TZ","UG",
+   "YT","ZA","ZM","ZR","ZW"]
+
+asia = Continent("AS")
+asia.group = 1
+asia.countries = ["AP","AE","AF","AM","AZ","BD","BH","BN","BT","CC","CN","CX",
+   "CY","GE","HK","ID","IL","IN","IO","IQ","IR","JO","JP","KG","KH","KP","KR",
+   "KW","KZ","LA","LB","LK","MM","MN","MO","MV","MY","NP","OM","PH","PK","PS",
+   "QA","RU","SA","SG","SY","TH","TJ","TM","TP","TR","TW","UZ","VN","YE"]
+
+europe = Continent("EU")
+europe.group = 1
+europe.countries = ["EU","AD","AL","AT","BA","BE","BG","BY","CH","CZ","DE",
+   "DK","EE","ES","FI","FO","FR","FX","GB","GI","GR","HR","HU","IE","IS","IT",
+   "LI","LT","LU","LV","MC","MD","MK","MT","NL","NO","PL","PT","RO","SE","SI",
+   "SJ","SK","SM","UA","VA","YU"]
+
+oceania = Continent("OC")
+oceania.group = 2
+oceania.countries = ["AS","AU","CK","FJ","FM","GU","KI","MH","MP","NC","NF",
+   "NR","NU","NZ","PF","PG","PN","PW","SB","TK","TO","TV","UM","VU","WF","WS"]
+
+north_america = Continent("NA")
+north_america.group = 0
+north_america.countries = ["CA","MX","US"]
+
+south_america = Continent("SA")
+south_america.group = 0
+south_america.countries = ["AG","AI","AN","AR","AW","BB","BM","BO","BR","BS",
+   "BZ","CL","CO","CR","CU","DM","DO","EC","FK","GD","GF","GL","GP","GS","GT",
+   "GY","HN","HT","JM","KN","KY","LC","MQ","MS","NI","PA","PE","PM","PR","PY",
+   "SA","SR","SV","TC","TT","UY","VC","VE","VG","VI"]
+
+# List of continents
+continents = [africa, asia, europe, north_america, oceania, south_america]
+
+def get_continent(country_code):
+  """ Perform country -- continent mapping """
+  for c in continents:
+    if c.contains(country_code):
+      return c
+  plog("INFO", country_code + " is not on any continent")
+  return None
+
+def get_country(ip):
+  """ Get the country via the library """
+  return geoip.country_code_by_addr(ip)
+
+def get_country_from_record(ip):
+  """ Get the country code out of a GeoLiteCity record (not used) """
+  record = geoip.record_by_addr(ip)
+  if record != None:
+    return record['country_code']
+
+class GeoIPRouter(TorCtl.Router):
+  # TODO: Its really shitty that this has to be a TorCtl.Router
+  # and can't be a StatsRouter..
+  """ Router class extended to GeoIP """
+  def __init__(self, router):
+    self.__dict__ = router.__dict__
+    self.country_code = get_country(self.get_ip_dotted())
+    if self.country_code != None: 
+      c = get_continent(self.country_code)
+      if c != None:
+        self.continent = c.code
+        self.cont_group = c.group
+    else: 
+      plog("INFO", self.nickname + ": Country code not found")
+      self.continent = None
+   
+  def get_ip_dotted(self):
+    """ Convert long int back to dotted quad string """
+    return socket.inet_ntoa(struct.pack('>I', self.ip))
+
+class GeoIPConfig:
+  """ Class to configure GeoIP-based path building """
+  def __init__(self, unique_countries=None, continent_crossings=4,
+     ocean_crossings=None, entry_country=None, middle_country=None,
+     exit_country=None, excludes=None):
+    # TODO: Somehow ensure validity of a configuration:
+    #   - continent_crossings >= ocean_crossings
+    #   - unique_countries=False --> continent_crossings!=None
+    #   - echelon? set entry_country to source and exit_country to None
+
+    # Do not use a country twice in a route 
+    # [True --> unique, False --> same or None --> pass] 
+    self.unique_countries = unique_countries
+    
+    # Configure max continent crossings in one path 
+    # [integer number 0-n or None --> ContinentJumper/UniqueContinent]
+    self.continent_crossings = continent_crossings
+    self.ocean_crossings = ocean_crossings
+ 
+    # Try to find an exit node in the destination country
+    # use exit_country as backup, if country cannot not be found
+    self.echelon = False
+
+    # Specify countries for positions [single country code or None]
+    self.entry_country = entry_country
+    self.middle_country = middle_country
+    self.exit_country = exit_country
+
+    # List of countries not to use in routes 
+    # [(empty) list of country codes or None]
+    self.excludes = excludes

Added: arm/dependencies/TorCtl/PathSupport.py
===================================================================
--- arm/dependencies/TorCtl/PathSupport.py	                        (rev 0)
+++ arm/dependencies/TorCtl/PathSupport.py	2010-08-23 01:13:01 UTC (rev 23018)
@@ -0,0 +1,2108 @@
+#!/usr/bin/python
+"""
+
+Support classes for path construction
+
+The PathSupport package builds on top of TorCtl.TorCtl. It provides a
+number of interfaces that make path construction easier.
+
+The inheritance diagram for event handling is as follows:
+TorCtl.EventHandler <- TorCtl.ConsensusTracker <- PathBuilder 
+  <- CircuitHandler <- StreamHandler.
+
+Basically, EventHandler is what gets all the control port events
+packaged in nice clean classes (see help(TorCtl) for information on
+those). 
+
+ConsensusTracker tracks the NEWCONSENSUS and NEWDESC events to maintain
+a view of the network that is consistent with the Tor client's current
+consensus.
+
+PathBuilder inherits from ConsensusTracker and is what builds all
+circuits based on the requirements specified in the SelectionManager
+instance passed to its constructor. It also handles attaching streams to
+circuits. It only handles one building one circuit at a time.
+
+CircuitHandler optionally inherits from PathBuilder, and overrides its
+circuit event handling to manage building a pool of circuits as opposed
+to just one. It still uses the SelectionManager for path selection.
+
+StreamHandler inherits from CircuitHandler, and is what governs the
+attachment of an incoming stream on to one of the multiple circuits of
+the circuit handler. 
+
+The SelectionManager is essentially a configuration wrapper around the
+most elegant portions of TorFlow: NodeGenerators, NodeRestrictions, and
+PathRestrictions. It extends from a BaseSelectionManager that provides
+a basic example of using these mechanisms for custom implementations.
+
+In the SelectionManager, a NodeGenerator is used to choose the nodes
+probabilistically according to some distribution while obeying the
+NodeRestrictions. These generators (one per hop) are handed off to the
+PathSelector, which uses the generators to build a complete path that
+satisfies the PathRestriction requirements.
+
+Have a look at the class hierarchy directly below to get a feel for how
+the restrictions fit together, and what options are available.
+
+"""
+
+import TorCtl
+import re
+import struct
+import random
+import socket
+import copy
+import Queue
+import time
+import TorUtil
+import traceback
+import threading
+from TorUtil import *
+
+import sys
+if sys.version_info < (2, 5):
+  from sets import Set as set
+
+__all__ = ["NodeRestrictionList", "PathRestrictionList",
+"PercentileRestriction", "OSRestriction", "ConserveExitsRestriction",
+"FlagsRestriction", "MinBWRestriction", "VersionIncludeRestriction",
+"VersionExcludeRestriction", "VersionRangeRestriction",
+"ExitPolicyRestriction", "NodeRestriction", "PathRestriction",
+"OrNodeRestriction", "MetaNodeRestriction", "AtLeastNNodeRestriction",
+"NotNodeRestriction", "Subnet16Restriction", "UniqueRestriction",
+"NodeGenerator", "UniformGenerator", "OrderedExitGenerator",
+"BwWeightedGenerator", "PathSelector", "Connection", "NickRestriction",
+"IdHexRestriction", "PathBuilder", "CircuitHandler", "StreamHandler",
+"SelectionManager", "BaseSelectionManager", "CountryCodeRestriction",
+"CountryRestriction", "UniqueCountryRestriction", "SingleCountryRestriction",
+"ContinentRestriction", "ContinentJumperRestriction",
+"UniqueContinentRestriction", "MetaPathRestriction", "RateLimitedRestriction",
+"SmartSocket"]
+
+#################### Path Support Interfaces #####################
+
+class RestrictionError(Exception):
+  "Error raised for issues with applying restrictions"
+  pass
+
+class NoNodesRemain(RestrictionError):
+  "Error raised for issues with applying restrictions"
+  pass
+
+class NodeRestriction:
+  "Interface for node restriction policies"
+  def r_is_ok(self, r):
+    "Returns true if Router 'r' is acceptable for this restriction"
+    return True  
+
+class PathRestriction:
+  "Interface for path restriction policies"
+  def path_is_ok(self, path):
+    "Return true if the list of Routers in path satisfies this restriction"
+    return True  
+
+# TODO: Or, Not, N of M
+class MetaPathRestriction(PathRestriction):
+  "MetaPathRestrictions are path restriction aggregators."
+  def add_restriction(self, rstr): raise NotImplemented()
+  def del_restriction(self, RestrictionClass): raise NotImplemented()
+ 
+class PathRestrictionList(MetaPathRestriction):
+  """Class to manage a list of PathRestrictions"""
+  def __init__(self, restrictions):
+    "Constructor. 'restrictions' is a list of PathRestriction instances"
+    self.restrictions = restrictions
+  
+  def path_is_ok(self, path):
+    "Given list if Routers in 'path', check it against each restriction."
+    for rs in self.restrictions:
+      if not rs.path_is_ok(path):
+        return False
+    return True
+
+  def add_restriction(self, rstr):
+    "Add a PathRestriction 'rstr' to the list"
+    self.restrictions.append(rstr)
+
+  def del_restriction(self, RestrictionClass):
+    "Remove all PathRestrictions of type RestrictionClass from the list."
+    self.restrictions = filter(
+        lambda r: not isinstance(r, RestrictionClass),
+          self.restrictions)
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(map(str, self.restrictions))+")"
+
+class NodeGenerator:
+  "Interface for node generation"
+  def __init__(self, sorted_r, rstr_list):
+    """Constructor. Takes a bandwidth-sorted list of Routers 'sorted_r' 
+    and a NodeRestrictionList 'rstr_list'"""
+    self.rstr_list = rstr_list
+    self.rebuild(sorted_r)
+
+  def reset_restriction(self, rstr_list):
+    "Reset the restriction list to a new list"
+    self.rstr_list = rstr_list
+    self.rebuild()
+
+  def rewind(self):
+    "Rewind the generator to the 'beginning'"
+    self.routers = copy.copy(self.rstr_routers)
+    if not self.routers:
+      plog("NOTICE", "No routers left after restrictions applied: "+str(self.rstr_list))
+      raise NoNodesRemain(str(self.rstr_list))
+ 
+  def rebuild(self, sorted_r=None):
+    """ Extra step to be performed when new routers are added or when
+    the restrictions change. """
+    if sorted_r != None:
+      self.sorted_r = sorted_r
+    self.rstr_routers = filter(lambda r: self.rstr_list.r_is_ok(r), self.sorted_r)
+    if not self.rstr_routers:
+      plog("NOTICE", "No routers left after restrictions applied: "+str(self.rstr_list))
+      raise NoNodesRemain(str(self.rstr_list))
+
+  def mark_chosen(self, r):
+    """Mark a router as chosen: remove it from the list of routers 
+     that can be returned in the future"""
+    self.routers.remove(r)
+
+  def all_chosen(self):
+    "Return true if all the routers have been marked as chosen"
+    return not self.routers
+
+  def generate(self):
+    "Return a python generator that yields routers according to the policy"
+    raise NotImplemented()
+
+class Connection(TorCtl.Connection):
+  """Extended Connection class that provides a method for building circuits"""
+  def __init__(self, sock):
+    TorCtl.Connection.__init__(self,sock)
+  def build_circuit(self, path):
+    "Tell Tor to build a circuit chosen by the PathSelector 'path_sel'"
+    circ = Circuit()
+    circ.path = path
+    circ.exit = circ.path[len(path)-1]
+    circ.circ_id = self.extend_circuit(0, circ.id_path())
+    return circ
+
+######################## Node Restrictions ########################
+
+# TODO: We still need more path support implementations
+#  - NodeRestrictions:
+#    - Uptime/LongLivedPorts (Does/should hibernation count?)
+#    - Published/Updated
+#    - Add a /8 restriction for ExitPolicy?
+#  - PathRestrictions:
+#    - NodeFamily
+#    - GeoIP:
+#      - Mathematical/empirical study of predecessor expectation
+#        - If middle node on the same continent as exit, exit learns nothing
+#        - else, exit has a bias on the continent of origin of user
+#          - Language and browser accept string determine this anyway
+#      - ContinentRestrictor (avoids doing more than N continent crossings)
+#      - EchelonPhobicRestrictor
+#        - Does not cross international boundaries for client->Entry or
+#          Exit->destination hops
+
+class PercentileRestriction(NodeRestriction):
+  """Restriction to cut out a percentile slice of the network."""
+  def __init__(self, pct_skip, pct_fast, r_list):
+    """Constructor. Sets up the restriction such that routers in the 
+     'pct_skip' to 'pct_fast' percentile of bandwidth rankings are 
+     returned from the sorted list 'r_list'"""
+    self.pct_fast = pct_fast
+    self.pct_skip = pct_skip
+    self.sorted_r = r_list
+
+  def r_is_ok(self, r):
+    "Returns true if r is in the percentile boundaries (by rank)"
+    if r.list_rank < len(self.sorted_r)*self.pct_skip/100: return False
+    elif r.list_rank > len(self.sorted_r)*self.pct_fast/100: return False
+    
+    return True
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.pct_skip)+","+str(self.pct_fast)+")"
+
+class UptimeRestriction(NodeRestriction):
+  """Restriction to filter out routers with uptimes < min_uptime or
+     > max_uptime"""
+  def __init__(self, min_uptime=None, max_uptime=None):
+    self.min_uptime = min_uptime
+    self.max_uptime = max_uptime
+
+  def r_is_ok(self, r):
+    "Returns true if r is in the uptime boundaries"
+    if self.min_uptime and r.uptime < self.min_uptime: return False
+    if self.max_uptime and r.uptime > self.max_uptime: return False
+    return True
+
+class RankRestriction(NodeRestriction):
+  """Restriction to cut out a list-rank slice of the network."""
+  def __init__(self, rank_skip, rank_stop):
+    self.rank_skip = rank_skip
+    self.rank_stop = rank_stop
+
+  def r_is_ok(self, r):
+    "Returns true if r is in the boundaries (by rank)"
+    if r.list_rank < self.rank_skip: return False
+    elif r.list_rank > self.rank_stop: return False
+    
+    return True
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.rank_skip)+","+str(self.rank_stop)+")"
+    
+class OSRestriction(NodeRestriction):
+  "Restriction based on operating system"
+  def __init__(self, ok, bad=[]):
+    """Constructor. Accept router OSes that match regexes in 'ok', 
+       rejects those that match regexes in 'bad'."""
+    self.ok = ok
+    self.bad = bad
+
+  def r_is_ok(self, r):
+    "Returns true if r is in 'ok', false if 'r' is in 'bad'. If 'ok'"
+    for y in self.ok:
+      if re.search(y, r.os):
+        return True
+    for b in self.bad:
+      if re.search(b, r.os):
+        return False
+    if self.ok: return False
+    if self.bad: return True
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.ok)+","+str(self.bad)+")"
+
+class ConserveExitsRestriction(NodeRestriction):
+  "Restriction to reject exits from selection"
+  def __init__(self, exit_ports=None):
+    self.exit_ports = exit_ports
+
+  def r_is_ok(self, r):
+    if self.exit_ports:
+      for port in self.exit_ports:
+        if r.will_exit_to("255.255.255.255", port):
+          return False
+      return True
+    return not "Exit" in r.flags
+
+  def __str__(self):
+    return self.__class__.__name__+"()"
+
+class FlagsRestriction(NodeRestriction):
+  "Restriction for mandatory and forbidden router flags"
+  def __init__(self, mandatory, forbidden=[]):
+    """Constructor. 'mandatory' and 'forbidden' are both lists of router 
+     flags as strings."""
+    self.mandatory = mandatory
+    self.forbidden = forbidden
+
+  def r_is_ok(self, router):
+    for m in self.mandatory:
+      if not m in router.flags: return False
+    for f in self.forbidden:
+      if f in router.flags: return False
+    return True
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.mandatory)+","+str(self.forbidden)+")"
+
+class NickRestriction(NodeRestriction):
+  """Require that the node nickname is as specified"""
+  def __init__(self, nickname):
+    self.nickname = nickname
+
+  def r_is_ok(self, router):
+    return router.nickname == self.nickname
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.nickname)+")"
+
+class IdHexRestriction(NodeRestriction):
+  """Require that the node idhash is as specified"""
+  def __init__(self, idhex):
+    if idhex[0] == '$':
+      self.idhex = idhex[1:].upper()
+    else:
+      self.idhex = idhex.upper()
+
+  def r_is_ok(self, router):
+    return router.idhex == self.idhex
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.idhex)+")"
+ 
+class MinBWRestriction(NodeRestriction):
+  """Require a minimum bandwidth"""
+  def __init__(self, minbw):
+    self.min_bw = minbw
+
+  def r_is_ok(self, router): return router.bw >= self.min_bw
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.min_bw)+")"
+
+class RateLimitedRestriction(NodeRestriction):
+  def __init__(self, limited=True):
+    self.limited = limited
+
+  def r_is_ok(self, router): return router.rate_limited == self.limited
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.limited)+")"
+   
+class VersionIncludeRestriction(NodeRestriction):
+  """Require that the version match one in the list"""
+  def __init__(self, eq):
+    "Constructor. 'eq' is a list of versions as strings"
+    self.eq = map(TorCtl.RouterVersion, eq)
+  
+  def r_is_ok(self, router):
+    """Returns true if the version of 'router' matches one of the 
+     specified versions."""
+    for e in self.eq:
+      if e == router.version:
+        return True
+    return False
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.eq)+")"
+
+class VersionExcludeRestriction(NodeRestriction):
+  """Require that the version not match one in the list"""
+  def __init__(self, exclude):
+    "Constructor. 'exclude' is a list of versions as strings"
+    self.exclude = map(TorCtl.RouterVersion, exclude)
+  
+  def r_is_ok(self, router):
+    """Returns false if the version of 'router' matches one of the 
+     specified versions."""
+    for e in self.exclude:
+      if e == router.version:
+        return False
+    return True
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(map(str, self.exclude))+")"
+
+class VersionRangeRestriction(NodeRestriction):
+  """Require that the versions be inside a specified range""" 
+  def __init__(self, gr_eq, less_eq=None):
+    self.gr_eq = TorCtl.RouterVersion(gr_eq)
+    if less_eq: self.less_eq = TorCtl.RouterVersion(less_eq)
+    else: self.less_eq = None
+  
+  def r_is_ok(self, router):
+    return (not self.gr_eq or router.version >= self.gr_eq) and \
+        (not self.less_eq or router.version <= self.less_eq)
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.gr_eq)+","+str(self.less_eq)+")"
+
+class ExitPolicyRestriction(NodeRestriction):
+  """Require that a router exit to an ip+port"""
+  def __init__(self, to_ip, to_port):
+    self.to_ip = to_ip
+    self.to_port = to_port
+
+  def r_is_ok(self, r): return r.will_exit_to(self.to_ip, self.to_port)
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.to_ip)+","+str(self.to_port)+")"
+
+class MetaNodeRestriction(NodeRestriction):
+  """Interface for a NodeRestriction that is an expression consisting of 
+     multiple other NodeRestrictions"""
+  def add_restriction(self, rstr): raise NotImplemented()
+  # TODO: these should collapse the restriction and return a new
+  # instance for re-insertion (or None)
+  def next_rstr(self): raise NotImplemented()
+  def del_restriction(self, RestrictionClass): raise NotImplemented()
+
+class OrNodeRestriction(MetaNodeRestriction):
+  """MetaNodeRestriction that is the boolean or of two or more
+     NodeRestrictions"""
+  def __init__(self, rs):
+    "Constructor. 'rs' is a list of NodeRestrictions"
+    self.rstrs = rs
+
+  def r_is_ok(self, r):
+    "Returns true if one of 'rs' is true for this router"
+    for rs in self.rstrs:
+      if rs.r_is_ok(r):
+        return True
+    return False
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(map(str, self.rstrs))+")"
+
+class NotNodeRestriction(MetaNodeRestriction):
+  """Negates a single restriction"""
+  def __init__(self, a):
+    self.a = a
+
+  def r_is_ok(self, r): return not self.a.r_is_ok(r)
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.a)+")"
+
+class AtLeastNNodeRestriction(MetaNodeRestriction):
+  """MetaNodeRestriction that is true if at least n member 
+     restrictions are true."""
+  def __init__(self, rstrs, n):
+    self.rstrs = rstrs
+    self.n = n
+
+  def r_is_ok(self, r):
+    cnt = 0
+    for rs in self.rstrs:
+      if rs.r_is_ok(r):
+        cnt += 1
+    if cnt < self.n: return False
+    else: return True
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(map(str, self.rstrs))+","+str(self.n)+")"
+
+class NodeRestrictionList(MetaNodeRestriction):
+  "Class to manage a list of NodeRestrictions"
+  def __init__(self, restrictions):
+    "Constructor. 'restrictions' is a list of NodeRestriction instances"
+    self.restrictions = restrictions
+
+  def r_is_ok(self, r):
+    "Returns true of Router 'r' passes all of the contained restrictions"
+    for rs in self.restrictions:
+      if not rs.r_is_ok(r): return False
+    return True
+
+  def add_restriction(self, restr):
+    "Add a NodeRestriction 'restr' to the list of restrictions"
+    self.restrictions.append(restr)
+
+  # TODO: This does not collapse meta restrictions..
+  def del_restriction(self, RestrictionClass):
+    """Remove all restrictions of type RestrictionClass from the list.
+       Does NOT inspect or collapse MetaNode Restrictions (though 
+       MetaRestrictions can be removed if RestrictionClass is 
+       MetaNodeRestriction)"""
+    self.restrictions = filter(
+        lambda r: not isinstance(r, RestrictionClass),
+          self.restrictions)
+  
+  def clear(self):
+    """ Remove all restrictions """
+    self.restrictions = []
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(map(str, self.restrictions))+")"
+
+
+#################### Path Restrictions #####################
+
+class Subnet16Restriction(PathRestriction):
+  """PathRestriction that mandates that no two nodes from the same 
+     /16 subnet be in the path"""
+  def path_is_ok(self, path):
+    mask16 = struct.unpack(">I", socket.inet_aton("255.255.0.0"))[0]
+    ip16 = path[0].ip & mask16
+    for r in path[1:]:
+      if ip16 == (r.ip & mask16):
+        return False
+    return True
+
+  def __str__(self):
+    return self.__class__.__name__+"()"
+
+class UniqueRestriction(PathRestriction):
+  """Path restriction that mandates that the same router can't appear more
+     than once in a path"""
+  def path_is_ok(self, path):
+    for i in xrange(0,len(path)):
+      if path[i] in path[:i]:
+        return False
+    return True
+
+  def __str__(self):
+    return self.__class__.__name__+"()"
+
+#################### GeoIP Restrictions ###################
+
+class CountryCodeRestriction(NodeRestriction):
+  """ Ensure that the country_code is set """
+  def r_is_ok(self, r):
+    return r.country_code != None
+
+  def __str__(self):
+    return self.__class__.__name__+"()"
+
+class CountryRestriction(NodeRestriction):
+  """ Only accept nodes that are in 'country_code' """
+  def __init__(self, country_code):
+    self.country_code = country_code
+
+  def r_is_ok(self, r):
+    return r.country_code == self.country_code
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.country_code)+")"
+
+class ExcludeCountriesRestriction(NodeRestriction):
+  """ Exclude a list of countries """
+  def __init__(self, countries):
+    self.countries = countries
+
+  def r_is_ok(self, r):
+    return not (r.country_code in self.countries)
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.countries)+")"
+
+class UniqueCountryRestriction(PathRestriction):
+  """ Ensure every router to have a distinct country_code """
+  def path_is_ok(self, path):
+    for i in xrange(0, len(path)-1):
+      for j in xrange(i+1, len(path)):
+        if path[i].country_code == path[j].country_code:
+          return False;
+    return True;
+
+  def __str__(self):
+    return self.__class__.__name__+"()"
+
+class SingleCountryRestriction(PathRestriction):
+  """ Ensure every router to have the same country_code """
+  def path_is_ok(self, path):
+    country_code = path[0].country_code
+    for r in path:
+      if country_code != r.country_code:
+        return False
+    return True
+
+  def __str__(self):
+    return self.__class__.__name__+"()"
+
+class ContinentRestriction(PathRestriction):
+  """ Do not more than n continent crossings """
+  # TODO: Add src and dest
+  def __init__(self, n, src=None, dest=None):
+    self.n = n
+
+  def path_is_ok(self, path):
+    crossings = 0
+    prev = None
+    # Compute crossings until now
+    for r in path:
+      # Jump over the first router
+      if prev:
+        if r.continent != prev.continent:
+          crossings += 1
+      prev = r
+    if crossings > self.n: return False
+    else: return True
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.n)+")"
+
+class ContinentJumperRestriction(PathRestriction):
+  """ Ensure continent crossings between all hops """
+  def path_is_ok(self, path):
+    prev = None
+    for r in path:
+      # Jump over the first router
+      if prev:
+        if r.continent == prev.continent:
+          return False
+      prev = r
+    return True
+
+  def __str__(self):
+    return self.__class__.__name__+"()"
+
+class UniqueContinentRestriction(PathRestriction):
+  """ Ensure every hop to be on a different continent """
+  def path_is_ok(self, path):
+    for i in xrange(0, len(path)-1):
+      for j in xrange(i+1, len(path)):
+        if path[i].continent == path[j].continent:
+          return False;
+    return True;
+
+  def __str__(self):
+    return self.__class__.__name__+"()"
+
+class OceanPhobicRestriction(PathRestriction):
+  """ Not more than n ocean crossings """
+  # TODO: Add src and dest
+  def __init__(self, n, src=None, dest=None):
+    self.n = n
+
+  def path_is_ok(self, path):
+    crossings = 0
+    prev = None
+    # Compute ocean crossings until now
+    for r in path:
+      # Jump over the first router
+      if prev:
+        if r.cont_group != prev.cont_group:
+          crossings += 1
+      prev = r
+    if crossings > self.n: return False
+    else: return True
+
+  def __str__(self):
+    return self.__class__.__name__+"("+str(self.n)+")"
+
+#################### Node Generators ######################
+
+class UniformGenerator(NodeGenerator):
+  """NodeGenerator that produces nodes in the uniform distribution"""
+  def generate(self):
+    # XXX: hrmm.. this is not really the right thing to check
+    while not self.all_chosen():
+      yield random.choice(self.routers)
+     
+class ExactUniformGenerator(NodeGenerator):
+  """NodeGenerator that produces nodes randomly, yet strictly uniformly 
+     over time"""
+  def __init__(self, sorted_r, rstr_list, position=0):
+    self.position = position
+    NodeGenerator.__init__(self, sorted_r, rstr_list)  
+
+  def generate(self):
+    min_gen = min(map(lambda r: r._generated[self.position], self.routers))
+    choices = filter(lambda r: r._generated[self.position]==min_gen, 
+                       self.routers)
+    while choices:
+      r = random.choice(choices)
+      yield r
+      choices.remove(r)
+
+    choices = filter(lambda r: r._generated[self.position]==min_gen,
+                       self.routers)
+    plog("NOTICE", "Ran out of choices in ExactUniformGenerator. Incrementing nodes")
+    for r in choices:
+      r._generated[self.position] += 1
+
+  def mark_chosen(self, r):
+    r._generated[self.position] += 1
+    NodeGenerator.mark_chosen(self, r)
+
+  def rebuild(self, sorted_r=None):
+    plog("DEBUG", "Rebuilding ExactUniformGenerator")
+    NodeGenerator.rebuild(self, sorted_r)
+    for r in self.rstr_routers:
+      lgen = len(r._generated)
+      if lgen < self.position+1:
+        for i in xrange(lgen, self.position+1):
+          r._generated.append(0)
+
+
+class OrderedExitGenerator(NodeGenerator):
+  """NodeGenerator that produces exits in an ordered fashion for a 
+     specific port"""
+  def __init__(self, to_port, sorted_r, rstr_list):
+    self.to_port = to_port
+    self.next_exit_by_port = {}
+    NodeGenerator.__init__(self, sorted_r, rstr_list)
+
+  def rewind(self):
+    NodeGenerator.rewind(self)
+    if self.to_port not in self.next_exit_by_port or not self.next_exit_by_port[self.to_port]:
+      self.next_exit_by_port[self.to_port] = 0
+      self.last_idx = len(self.routers)
+    else:
+      self.last_idx = self.next_exit_by_port[self.to_port]
+
+  def set_port(self, port):
+    self.to_port = port
+    self.rewind()
+     
+  def mark_chosen(self, r):
+    self.next_exit_by_port[self.to_port] += 1
+  
+  def all_chosen(self):
+    return self.last_idx == self.next_exit_by_port[self.to_port]
+
+  def generate(self):
+    while True: # A do..while would be real nice here..
+      if self.next_exit_by_port[self.to_port] >= len(self.routers):
+        self.next_exit_by_port[self.to_port] = 0
+      yield self.routers[self.next_exit_by_port[self.to_port]]
+      self.next_exit_by_port[self.to_port] += 1
+      if self.last_idx == self.next_exit_by_port[self.to_port]:
+        break
+
+class BwWeightedGenerator(NodeGenerator):
+  """
+
+  This is a generator designed to match the Tor Path Selection
+  algorithm.  It will generate nodes weighted by their bandwidth,
+  but take the appropriate weighting into account against guard
+  nodes and exit nodes when they are chosen for positions other
+  than guard/exit. For background see:
+  routerlist.c::smartlist_choose_by_bandwidth(),
+  http://archives.seul.org/or/dev/Jul-2007/msg00021.html,
+  http://archives.seul.org/or/dev/Jul-2007/msg00056.html, and
+  https://tor-svn.freehaven.net/svn/tor/trunk/doc/spec/path-spec.txt
+  The formulas used are from the first or-dev link, but are proven
+  optimal and equivalent to the ones now used in routerlist.c in the 
+  second or-dev link.
+  
+  """ 
+  def __init__(self, sorted_r, rstr_list, pathlen, exit=False, guard=False):
+    """ Pass exit=True to create a generator for exit-nodes """
+    self.max_bandwidth = 10000000
+    # Out for an exit-node?
+    self.exit = exit
+    # Is this a guard node? 
+    self.guard = guard
+    # Different sums of bandwidths
+    self.total_bw = 0
+    self.total_exit_bw = 0
+    self.total_guard_bw = 0
+    self.total_weighted_bw = 0
+    self.pathlen = pathlen
+    NodeGenerator.__init__(self, sorted_r, rstr_list)
+
+  def rebuild(self, sorted_r=None):
+    NodeGenerator.rebuild(self, sorted_r)
+    NodeGenerator.rewind(self)
+    # Set the exit_weight
+    # We are choosing a non-exit
+    self.total_exit_bw = 0
+    self.total_guard_bw = 0
+    self.total_bw = 0
+    for r in self.routers:
+      # TODO: Check max_bandwidth and cap...
+      self.total_bw += r.bw
+      if "Exit" in r.flags:
+        self.total_exit_bw += r.bw
+      if "Guard" in r.flags:
+        self.total_guard_bw += r.bw
+
+    bw_per_hop = (1.0*self.total_bw)/self.pathlen
+
+    # Print some debugging info about bandwidth ratios
+    if self.total_bw > 0:
+      e_ratio = self.total_exit_bw/float(self.total_bw)
+      g_ratio = self.total_guard_bw/float(self.total_bw)
+    else:
+      g_ratio = 0
+      e_ratio = 0
+    plog("DEBUG",
+       "E = " + str(self.total_exit_bw) +
+       ", G = " + str(self.total_guard_bw) +
+       ", T = " + str(self.total_bw) +
+       ", g_ratio = " + str(g_ratio) + ", e_ratio = " +str(e_ratio) +
+       ", bw_per_hop = " + str(bw_per_hop))
+   
+    if self.exit:
+      self.exit_weight = 1.0
+    else:
+      if self.total_exit_bw < bw_per_hop:
+        # Don't use exit nodes at all
+        self.exit_weight = 0
+      else:
+        if self.total_exit_bw > 0:
+          self.exit_weight = ((self.total_exit_bw-bw_per_hop)/self.total_exit_bw)
+        else: self.exit_weight = 0
+
+    if self.guard:
+      self.guard_weight = 1.0
+    else:
+      if self.total_guard_bw < bw_per_hop:
+        # Don't use exit nodes at all
+        self.guard_weight = 0
+      else:
+        if self.total_guard_bw > 0:
+          self.guard_weight = ((self.total_guard_bw-bw_per_hop)/self.total_guard_bw)
+        else: self.guard_weight = 0
+    
+    for r in self.routers:
+      bw = r.bw
+      if "Exit" in r.flags:
+        bw *= self.exit_weight
+      if "Guard" in r.flags:
+        bw *= self.guard_weight
+      self.total_weighted_bw += bw
+
+    self.total_weighted_bw = int(self.total_weighted_bw)
+    plog("DEBUG", "Bw: "+str(self.total_weighted_bw)+"/"+str(self.total_bw)
+          +". The exit-weight is: "+str(self.exit_weight)
+          + ", guard weight is: "+str(self.guard_weight))
+
+  def generate(self):
+    while True:
+      # Choose a suitable random int
+      i = random.randint(0, self.total_weighted_bw)
+
+      # Go through the routers
+      for r in self.routers:
+        # Below zero here means next() -> choose a new random int+router 
+        if i < 0: break
+        bw = r.bw
+        if "Exit" in r.flags:
+          bw *= self.exit_weight
+        if "Guard" in r.flags:
+          bw *= self.guard_weight
+
+        i -= bw
+        if i < 0:
+          plog("DEBUG", "Chosen router with a bandwidth of: " + str(r.bw))
+          yield r
+
+####################### Secret Sauce ###########################
+
+class PathError(Exception):
+  pass
+
+class NoRouters(PathError):
+  pass
+
+class PathSelector:
+  """Implementation of path selection policies. Builds a path according
+     to entry, middle, and exit generators that satisfies the path 
+     restrictions."""
+  def __init__(self, entry_gen, mid_gen, exit_gen, path_restrict):
+    """Constructor. The first three arguments are NodeGenerators with 
+     their appropriate restrictions. The 'path_restrict' is a
+     PathRestrictionList"""
+    self.entry_gen = entry_gen
+    self.mid_gen = mid_gen
+    self.exit_gen = exit_gen
+    self.path_restrict = path_restrict
+
+  def rebuild_gens(self, sorted_r):
+    "Rebuild the 3 generators with a new sorted router list"
+    self.entry_gen.rebuild(sorted_r)
+    self.mid_gen.rebuild(sorted_r)
+    self.exit_gen.rebuild(sorted_r)
+
+  def select_path(self, pathlen):
+    """Creates a path of 'pathlen' hops, and returns it as a list of
+       Router instances"""
+    self.entry_gen.rewind()
+    self.mid_gen.rewind()
+    self.exit_gen.rewind()
+    entry = self.entry_gen.generate()
+    mid = self.mid_gen.generate()
+    ext = self.exit_gen.generate()
+      
+    plog("DEBUG", "Selecting path..")
+
+    while True:
+      path = []
+      plog("DEBUG", "Building path..")
+      try:
+        if pathlen == 1:
+          path = [ext.next()]
+        else:
+          path.append(entry.next())
+          for i in xrange(1, pathlen-1):
+            path.append(mid.next())
+          path.append(ext.next())
+        if self.path_restrict.path_is_ok(path):
+          self.entry_gen.mark_chosen(path[0])
+          for i in xrange(1, pathlen-1):
+            self.mid_gen.mark_chosen(path[i])
+          self.exit_gen.mark_chosen(path[pathlen-1])
+          plog("DEBUG", "Marked path.")
+          break
+        else:
+          plog("DEBUG", "Path rejected by path restrictions.")
+      except StopIteration:
+        plog("NOTICE", "Ran out of routers during buildpath..");
+        self.entry_gen.rewind()
+        self.mid_gen.rewind()
+        self.exit_gen.rewind()
+        entry = self.entry_gen.generate()
+        mid = self.mid_gen.generate()
+        ext = self.exit_gen.generate()
+    for r in path:
+      r.refcount += 1
+      plog("DEBUG", "Circ refcount "+str(r.refcount)+" for "+r.idhex)
+    return path
+
+# TODO: Implement example manager.
+class BaseSelectionManager:
+   """
+   The BaseSelectionManager is a minimalistic node selection manager.
+
+   It is meant to be used with a PathSelector that consists of an
+   entry NodeGenerator, a middle NodeGenerator, and an exit NodeGenerator.
+
+   However, none of these are absolutely necessary. It is possible
+   to completely avoid them if you wish by hacking whatever selection
+   mechanisms you want straight into this interface and then passing
+   an instance to a PathBuilder implementation.
+   """
+   def __init__(self):
+     self.bad_restrictions = False
+     self.consensus = None
+
+   def reconfigure(self, consensus=None):
+     """ 
+     This method is called whenever a significant configuration change
+     occurs. Currently, this only happens via PathBuilder.__init__ and
+     PathBuilder.schedule_selmgr().
+     
+     This method should NOT throw any exceptions.
+     """
+     pass
+
+   def new_consensus(self, consensus):
+     """ 
+     This method is called whenever a consensus change occurs.
+     
+     This method should NOT throw any exceptions.
+     """
+     pass
+
+   def set_exit(self, exit_name):
+     """
+     This method provides notification that a fixed exit is desired.
+
+     This method should NOT throw any exceptions.
+     """
+     pass
+
+   def set_target(self, host, port):
+     """
+     This method provides notification that a new target endpoint is
+     desired.
+
+     May throw a RestrictionError if target is impossible to reach.
+     """
+     pass
+
+   def select_path(self):
+     """
+     Returns a new path in the form of a list() of Router instances.
+
+     May throw a RestrictionError.
+     """
+     pass
+
+class SelectionManager(BaseSelectionManager):
+  """Helper class to handle configuration updates
+    
+    The methods are NOT threadsafe. They may ONLY be called from
+    EventHandler's thread. This means that to update the selection 
+    manager, you must schedule a config update job using 
+    PathBuilder.schedule_selmgr() with a worker function to modify 
+    this object.
+ 
+    XXX: Warning. The constructor of this class is subject to change
+    and may undergo reorganization in the near future. Watch for falling 
+    bits.
+    """
+  # XXX: Hrmm, consider simplifying this. It is confusing and unweildy.
+  def __init__(self, pathlen, order_exits,
+         percent_fast, percent_skip, min_bw, use_all_exits,
+         uniform, use_exit, use_guards,geoip_config=None,
+         restrict_guards=False, extra_node_rstr=None, exit_ports=None):
+    BaseSelectionManager.__init__(self)
+    self.__ordered_exit_gen = None 
+    self.pathlen = pathlen
+    self.order_exits = order_exits
+    self.percent_fast = percent_fast
+    self.percent_skip = percent_skip
+    self.min_bw = min_bw
+    self.use_all_exits = use_all_exits
+    self.uniform = uniform
+    self.exit_id = use_exit
+    self.use_guards = use_guards
+    self.geoip_config = geoip_config
+    self.restrict_guards_only = restrict_guards
+    self.bad_restrictions = False
+    self.consensus = None
+    self.exit_ports = exit_ports
+    self.extra_node_rstr=extra_node_rstr
+
+  def reconfigure(self, consensus=None):
+    try:
+      self._reconfigure(consensus)
+      self.bad_restrictions = False
+    except NoNodesRemain:
+      plog("WARN", "No nodes remain in selection manager")
+      self.bad_restrictions = True
+    return self.bad_restrictions
+
+  def _reconfigure(self, consensus=None):
+    """This function is called after a configuration change, 
+     to rebuild the RestrictionLists."""
+    if consensus: 
+      plog("DEBUG", "Reconfigure with consensus")
+      self.consensus = consensus
+    else:
+      plog("DEBUG", "Reconfigure without consensus")
+
+    sorted_r = self.consensus.sorted_r
+
+    if self.use_all_exits:
+      self.path_rstr = PathRestrictionList([UniqueRestriction()])
+    else:
+      self.path_rstr = PathRestrictionList(
+           [Subnet16Restriction(), UniqueRestriction()])
+  
+    if self.use_guards: entry_flags = ["Guard", "Running", "Fast"]
+    else: entry_flags = ["Running", "Fast"]
+
+    if self.restrict_guards_only:
+      nonentry_skip = 0
+      nonentry_fast = 100
+    else:
+      nonentry_skip = self.percent_skip
+      nonentry_fast = self.percent_fast
+
+    # XXX: sometimes we want the ability to do uniform scans
+    # without the conserve exit restrictions..
+    entry_rstr = NodeRestrictionList(
+      [PercentileRestriction(self.percent_skip, self.percent_fast, sorted_r),
+       OrNodeRestriction(
+           [FlagsRestriction(["BadExit"]),
+           ConserveExitsRestriction(self.exit_ports)]),
+       FlagsRestriction(entry_flags, [])]
+    )
+    mid_rstr = NodeRestrictionList(
+      [PercentileRestriction(nonentry_skip, nonentry_fast, sorted_r),
+       OrNodeRestriction(
+           [FlagsRestriction(["BadExit"]),
+           ConserveExitsRestriction(self.exit_ports)]),
+       FlagsRestriction(["Running","Fast"], [])]
+
+    )
+
+    if self.exit_id:
+      self._set_exit(self.exit_id)
+      plog("DEBUG", "Applying Setexit: "+self.exit_id)
+      self.exit_rstr = NodeRestrictionList([IdHexRestriction(self.exit_id)])
+    elif self.use_all_exits:
+      self.exit_rstr = NodeRestrictionList(
+        [FlagsRestriction(["Running","Fast"], ["BadExit"])])
+    else:
+      self.exit_rstr = NodeRestrictionList(
+        [PercentileRestriction(nonentry_skip, nonentry_fast, sorted_r),
+         FlagsRestriction(["Running","Fast"], ["BadExit"])])
+
+    if self.extra_node_rstr:
+      entry_rstr.add_restriction(self.extra_node_rstr)
+      mid_rstr.add_restriction(self.extra_node_rstr)
+      self.exit_rstr.add_restriction(self.extra_node_rstr)
+
+    # GeoIP configuration
+    if self.geoip_config:
+      # Every node needs country_code 
+      entry_rstr.add_restriction(CountryCodeRestriction())
+      mid_rstr.add_restriction(CountryCodeRestriction())
+      self.exit_rstr.add_restriction(CountryCodeRestriction())
+      
+      # Specified countries for different positions
+      if self.geoip_config.entry_country:
+        entry_rstr.add_restriction(CountryRestriction(self.geoip_config.entry_country))
+      if self.geoip_config.middle_country:
+        mid_rstr.add_restriction(CountryRestriction(self.geoip_config.middle_country))
+      if self.geoip_config.exit_country:
+        self.exit_rstr.add_restriction(CountryRestriction(self.geoip_config.exit_country))
+
+      # Excluded countries
+      if self.geoip_config.excludes:
+        plog("INFO", "Excluded countries: " + str(self.geoip_config.excludes))
+        if len(self.geoip_config.excludes) > 0:
+          entry_rstr.add_restriction(ExcludeCountriesRestriction(self.geoip_config.excludes))
+          mid_rstr.add_restriction(ExcludeCountriesRestriction(self.geoip_config.excludes))
+          self.exit_rstr.add_restriction(ExcludeCountriesRestriction(self.geoip_config.excludes))
+      
+      # Unique countries set? None --> pass
+      if self.geoip_config.unique_countries != None:
+        if self.geoip_config.unique_countries:
+          # If True: unique countries 
+          self.path_rstr.add_restriction(UniqueCountryRestriction())
+        else:
+          # False: use the same country for all nodes in a path
+          self.path_rstr.add_restriction(SingleCountryRestriction())
+      
+      # Specify max number of continent crossings, None means UniqueContinents
+      if self.geoip_config.continent_crossings == None:
+        self.path_rstr.add_restriction(UniqueContinentRestriction())
+      else: self.path_rstr.add_restriction(ContinentRestriction(self.geoip_config.continent_crossings))
+      # Should even work in combination with continent crossings
+      if self.geoip_config.ocean_crossings != None:
+        self.path_rstr.add_restriction(OceanPhobicRestriction(self.geoip_config.ocean_crossings))
+
+    # This is kind of hokey..
+    if self.order_exits:
+      if self.__ordered_exit_gen:
+        exitgen = self.__ordered_exit_gen
+        exitgen.reset_restriction(self.exit_rstr)
+      else:
+        exitgen = self.__ordered_exit_gen = \
+          OrderedExitGenerator(80, sorted_r, self.exit_rstr)
+    elif self.uniform:
+      exitgen = ExactUniformGenerator(sorted_r, self.exit_rstr)
+    else:
+      exitgen = BwWeightedGenerator(sorted_r, self.exit_rstr, self.pathlen, exit=True)
+
+    if self.uniform:
+      self.path_selector = PathSelector(
+         ExactUniformGenerator(sorted_r, entry_rstr),
+         ExactUniformGenerator(sorted_r, mid_rstr),
+         exitgen, self.path_rstr)
+    else:
+      # Remove ConserveExitsRestriction for entry and middle positions
+      # by removing the OrNodeRestriction that contains it...
+      # FIXME: This is a landmine for a poor soul to hit.
+      # Then again, most of the rest of this function is, too.
+      entry_rstr.del_restriction(OrNodeRestriction)
+      mid_rstr.del_restriction(OrNodeRestriction)
+      self.path_selector = PathSelector(
+         BwWeightedGenerator(sorted_r, entry_rstr, self.pathlen,
+                             guard=self.use_guards),
+         BwWeightedGenerator(sorted_r, mid_rstr, self.pathlen),
+         exitgen, self.path_rstr)
+      return
+
+  def _set_exit(self, exit_name):
+    # sets an exit, if bad, sets bad_exit
+    exit_id = None
+    if exit_name:
+      if exit_name[0] == '$':
+        exit_id = exit_name
+      elif exit_name in self.consensus.name_to_key:
+        exit_id = self.consensus.name_to_key[exit_name]
+    self.exit_id = exit_id
+
+  def set_exit(self, exit_name):
+    self._set_exit(exit_name)
+    self.exit_rstr.clear()
+    if not self.exit_id:
+      plog("NOTICE", "Requested null exit "+str(self.exit_id))
+      self.bad_restrictions = True
+    elif self.exit_id[1:] not in self.consensus.routers:
+      plog("NOTICE", "Requested absent exit "+str(self.exit_id))
+      self.bad_restrictions = True
+    elif self.consensus.routers[self.exit_id[1:]].down:
+      e = self.consensus.routers[self.exit_id[1:]]
+      plog("NOTICE", "Requested downed exit "+str(self.exit_id)+" (bw: "+str(e.bw)+", flags: "+str(e.flags)+")")
+      self.bad_restrictions = True
+    elif self.consensus.routers[self.exit_id[1:]].deleted:
+      e = self.consensus.routers[self.exit_id[1:]]
+      plog("NOTICE", "Requested deleted exit "+str(self.exit_id)+" (bw: "+str(e.bw)+", flags: "+str(e.flags)+", Down: "+str(e.down)+", ref: "+str(e.refcount)+")")
+      self.bad_restrictions = True
+    else:
+      self.exit_rstr.add_restriction(IdHexRestriction(self.exit_id))
+      plog("DEBUG", "Added exit restriction for "+self.exit_id)
+      try:
+        self.path_selector.exit_gen.rebuild()
+        self.bad_restrictions = False
+      except RestrictionError, e:
+        plog("WARN", "Restriction error "+str(e)+" after set_exit")
+        self.bad_restrictions = True
+    return self.bad_restrictions
+
+  def new_consensus(self, consensus):
+    self.consensus = consensus
+    try:
+      self.path_selector.rebuild_gens(self.consensus.sorted_r)
+      if self.exit_id:
+        self.set_exit(self.exit_id)
+    except NoNodesRemain:
+      plog("NOTICE", "No viable nodes in consensus for restrictions.")
+      # Punting + performing reconfigure..")
+      #self.reconfigure(consensus)
+
+  def set_target(self, ip, port):
+    # sets an exit policy, if bad, rasies exception..
+    "Called to update the ExitPolicyRestrictions with a new ip and port"
+    if self.bad_restrictions:
+      plog("WARN", "Requested target with bad restrictions")
+      raise RestrictionError()
+    self.exit_rstr.del_restriction(ExitPolicyRestriction)
+    self.exit_rstr.add_restriction(ExitPolicyRestriction(ip, port))
+    if self.__ordered_exit_gen: self.__ordered_exit_gen.set_port(port)
+    # Try to choose an exit node in the destination country
+    # needs an IP != 255.255.255.255
+    if self.geoip_config and self.geoip_config.echelon:
+      import GeoIPSupport
+      c = GeoIPSupport.get_country(ip)
+      if c:
+        plog("INFO", "[Echelon] IP "+ip+" is in ["+c+"]")
+        self.exit_rstr.del_restriction(CountryRestriction)
+        self.exit_rstr.add_restriction(CountryRestriction(c))
+      else: 
+        plog("INFO", "[Echelon] Could not determine destination country of IP "+ip)
+        # Try to use a backup country
+        if self.geoip_config.exit_country:
+          self.exit_rstr.del_restriction(CountryRestriction) 
+          self.exit_rstr.add_restriction(CountryRestriction(self.geoip_config.exit_country))
+    # Need to rebuild exit generator
+    self.path_selector.exit_gen.rebuild()
+
+  def select_path(self):
+    if self.bad_restrictions:
+      plog("WARN", "Requested target with bad restrictions")
+      raise RestrictionError()
+    return self.path_selector.select_path(self.pathlen)
+
+class Circuit:
+  "Class to describe a circuit"
+  def __init__(self):
+    self.circ_id = 0
+    self.path = [] # routers
+    self.exit = None
+    self.built = False
+    self.failed = False
+    self.dirty = False
+    self.requested_closed = False
+    self.detached_cnt = 0
+    self.last_extended_at = time.time()
+    self.extend_times = []      # List of all extend-durations
+    self.setup_duration = None  # Sum of extend-times
+    self.pending_streams = []   # Which stream IDs are pending us
+    # XXX: Unused.. Need to use for refcounting because
+    # sometimes circuit closed events come before the stream
+    # close and we need to track those failures..
+    self.carried_streams = []
+
+  def id_path(self):
+    "Returns a list of idhex keys for the path of Routers"
+    return map(lambda r: r.idhex, self.path)
+
+class Stream:
+  "Class to describe a stream"
+  def __init__(self, sid, host, port, kind):
+    self.strm_id = sid
+    self.detached_from = [] # circ id #'s
+    self.pending_circ = None
+    self.circ = None
+    self.host = host
+    self.port = port
+    self.kind = kind
+    self.attached_at = 0
+    self.bytes_read = 0
+    self.bytes_written = 0
+    self.failed = False
+    self.ignored = False # Set if PURPOSE=DIR_*
+    self.failed_reason = None # Cheating a little.. Only used by StatsHandler
+
+  def lifespan(self, now):
+    "Returns the age of the stream"
+    return now-self.attached_at
+
+_origsocket = socket.socket
+class _SocketWrapper(socket.socket):
+  """ Ghetto wrapper to workaround python same_slots_added() and 
+      socket __base__ braindamage """
+  pass
+
+class SmartSocket(_SocketWrapper):
+  """ A SmartSocket is a socket that tracks global socket creation
+      for local ports. It has a member StreamSelector that can
+      be used as a PathBuilder stream StreamSelector (see below).
+
+      Most users will want to reset the base class of SocksiPy to
+      use this class:
+      __oldsocket = socket.socket
+      socket.socket = PathSupport.SmartSocket
+      import SocksiPy
+      socket.socket = __oldsocket
+   """
+  port_table = set()
+  _table_lock = threading.Lock()
+
+  def connect(self, args):
+    ret = super(SmartSocket, self).connect(args)
+    myaddr = self.getsockname()
+    self.__local_addr = myaddr[0]+":"+str(myaddr[1])
+    SmartSocket._table_lock.acquire()
+    assert(self.__local_addr not in SmartSocket.port_table)
+    SmartSocket.port_table.add(myaddr[0]+":"+str(myaddr[1]))
+    SmartSocket._table_lock.release()
+    plog("DEBUG", "Added "+self.__local_addr+" to our local port list")
+    return ret
+
+  def connect_ex(self, args):
+    ret = super(SmartSocket, self).connect_ex(args)
+    myaddr = ret.getsockname()
+    self.__local_addr = myaddr[0]+":"+str(myaddr[1])
+    SmartSocket._table_lock.acquire()
+    assert(self.__local_addr not in SmartSocket.port_table)
+    SmartSocket.port_table.add(myaddr[0]+":"+str(myaddr[1]))
+    SmartSocket._table_lock.release()
+    plog("DEBUG", "Added "+self.__local_addr+" to our local port list")
+    return ret
+
+  def __del__(self):
+    SmartSocket._table_lock.acquire()
+    SmartSocket.port_table.remove(self.__local_addr)
+    SmartSocket._table_lock.release()
+    plog("DEBUG", "Removed "+self.__local_addr+" from our local port list")
+
+  def table_size():
+    SmartSocket._table_lock.acquire()
+    ret = len(SmartSocket.port_table)
+    SmartSocket._table_lock.release()
+    return ret
+  table_size = Callable(table_size)
+
+  def clear_port_table():
+    """ WARNING: Calling this periodically is a *really good idea*.
+        Relying on __del__ can expose you to race conditions on garbage
+        collection between your processes. """
+    SmartSocket._table_lock.acquire()
+    for i in list(SmartSocket.port_table):
+      plog("DEBUG", "Cleared "+i+" from our local port list")
+      SmartSocket.port_table.remove(i)
+    SmartSocket._table_lock.release()
+  clear_port_table = Callable(clear_port_table)
+
+  def StreamSelector(host, port):
+    to_test = host+":"+str(port)
+    SmartSocket._table_lock.acquire()
+    ret = (to_test in SmartSocket.port_table)
+    SmartSocket._table_lock.release()
+    return ret
+  StreamSelector = Callable(StreamSelector)
+
+
+def StreamSelector(host, port):
+  """ A StreamSelector is a function that takes a host and a port as
+      arguments (parsed from Tor's SOURCE_ADDR field in STREAM NEW
+      events) and decides if it is a stream from this process or not.
+
+      This StreamSelector is just a placeholder that always returns True.
+      When you define your own, be aware that you MUST DO YOUR OWN
+      LOCKING inside this function, as it is called from the Eventhandler
+      thread.
+
+      See PathSupport.SmartSocket.StreamSelctor for an actual
+      implementation.
+
+  """
+  return True
+
+# TODO: Make passive "PathWatcher" so people can get aggregate 
+# node reliability stats for normal usage without us attaching streams
+# Can use __metaclass__ and type
+
+class PathBuilder(TorCtl.ConsensusTracker):
+  """
+  PathBuilder implementation. Handles circuit construction, subject
+  to the constraints of the SelectionManager selmgr.
+  
+  Do not access this object from other threads. Instead, use the 
+  schedule_* functions to schedule work to be done in the thread
+  of the EventHandler.
+  """
+  def __init__(self, c, selmgr, RouterClass=TorCtl.Router,
+               strm_selector=StreamSelector):
+    """Constructor. 'c' is a Connection, 'selmgr' is a SelectionManager,
+    and 'RouterClass' is a class that inherits from Router and is used
+    to create annotated Routers."""
+    TorCtl.ConsensusTracker.__init__(self, c, RouterClass)
+    self.last_exit = None
+    self.new_nym = False
+    self.resolve_port = 0
+    self.num_circuits = 1
+    self.circuits = {}
+    self.streams = {}
+    self.selmgr = selmgr
+    self.selmgr.reconfigure(self.current_consensus())
+    self.imm_jobs = Queue.Queue()
+    self.low_prio_jobs = Queue.Queue()
+    self.run_all_jobs = False
+    self.do_reconfigure = False
+    self.strm_selector = strm_selector
+    plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(self.ns_map))+" routers")
+
+  def schedule_immediate(self, job):
+    """
+    Schedules an immediate job to be run before the next event is
+    processed.
+    """
+    assert(self.c.is_live())
+    self.imm_jobs.put(job)
+
+  def schedule_low_prio(self, job):
+    """
+    Schedules a job to be run when a non-time critical event arrives.
+    """
+    assert(self.c.is_live())
+    self.low_prio_jobs.put(job)
+
+  def reset(self):
+    """
+    Resets accumulated state. Currently only clears the 
+    ExactUniformGenerator state.
+    """
+    plog("DEBUG", "Resetting _generated values for ExactUniformGenerator")
+    for r in self.routers.itervalues():
+      for g in xrange(0, len(r._generated)):
+        r._generated[g] = 0
+
+  def is_urgent_event(event):
+    # If event is stream:NEW*/DETACHED or circ BUILT/FAILED, 
+    # it is high priority and requires immediate action.
+    if isinstance(event, TorCtl.CircuitEvent):
+      if event.status in ("BUILT", "FAILED", "CLOSED"):
+        return True
+    elif isinstance(event, TorCtl.StreamEvent):
+      if event.status in ("NEW", "NEWRESOLVE", "DETACHED"):
+        return True
+    return False
+  is_urgent_event = Callable(is_urgent_event)
+
+  def schedule_selmgr(self, job):
+    """
+    Schedules an immediate job to be run before the next event is
+    processed. Also notifies the selection manager that it needs
+    to update itself.
+    """
+    assert(self.c.is_live())
+    def notlambda(this):
+      job(this.selmgr)
+      this.do_reconfigure = True
+    self.schedule_immediate(notlambda)
+
+     
+  def heartbeat_event(self, event):
+    """This function handles dispatching scheduled jobs. If you 
+       extend PathBuilder and want to implement this function for 
+       some reason, be sure to call the parent class"""
+    while not self.imm_jobs.empty():
+      imm_job = self.imm_jobs.get_nowait()
+      imm_job(self)
+    
+    if self.do_reconfigure:
+      self.selmgr.reconfigure(self.current_consensus())
+      self.do_reconfigure = False
+    
+    if self.run_all_jobs:
+      while not self.low_prio_jobs.empty() and self.run_all_jobs:
+        imm_job = self.low_prio_jobs.get_nowait()
+        imm_job(self)
+      self.run_all_jobs = False
+      return
+
+    # If event is stream:NEW*/DETACHED or circ BUILT/FAILED, 
+    # don't run low prio jobs.. No need to delay streams for them.
+    if PathBuilder.is_urgent_event(event): return
+   
+    # Do the low prio jobs one at a time in case a 
+    # higher priority event is queued   
+    if not self.low_prio_jobs.empty():
+      delay_job = self.low_prio_jobs.get_nowait()
+      delay_job(self)
+
+  def build_path(self):
+    """ Get a path from the SelectionManager's PathSelector, can be used 
+        e.g. for generating paths without actually creating any circuits """
+    return self.selmgr.select_path()
+
+  def close_all_streams(self, reason):
+    """ Close all open streams """
+    for strm in self.streams.itervalues():
+      if not strm.ignored:
+        try:
+          self.c.close_stream(strm.strm_id, reason)
+        except TorCtl.ErrorReply, e:
+          # This can happen. Streams can timeout before this call.
+          plog("NOTICE", "Error closing stream "+str(strm.strm_id)+": "+str(e))
+
+  def close_all_circuits(self):
+    """ Close all open circuits """
+    for circ in self.circuits.itervalues():
+      self.close_circuit(circ.circ_id)
+
+  def close_circuit(self, id):
+    """ Close a circuit with given id """
+    # TODO: Pass streams to another circ before closing?
+    plog("DEBUG", "Requesting close of circuit id: "+str(id))
+    if self.circuits[id].requested_closed: return
+    self.circuits[id].requested_closed = True
+    try: self.c.close_circuit(id)
+    except TorCtl.ErrorReply, e: 
+      plog("ERROR", "Failed closing circuit " + str(id) + ": " + str(e))
+
+  def circuit_list(self):
+    """ Return an iterator or a list of circuits prioritized for 
+        stream selection."""
+    return self.circuits.itervalues()
+
+  def attach_stream_any(self, stream, badcircs):
+    "Attach a stream to a valid circuit, avoiding any in 'badcircs'"
+    # Newnym, and warn if not built plus pending
+    unattached_streams = [stream]
+    if self.new_nym:
+      self.new_nym = False
+      plog("DEBUG", "Obeying new nym")
+      for key in self.circuits.keys():
+        if (not self.circuits[key].dirty
+            and len(self.circuits[key].pending_streams)):
+          plog("WARN", "New nym called, destroying circuit "+str(key)
+             +" with "+str(len(self.circuits[key].pending_streams))
+             +" pending streams")
+          unattached_streams.extend(self.circuits[key].pending_streams)
+          self.circuits[key].pending_streams = []
+        # FIXME: Consider actually closing circ if no streams.
+        self.circuits[key].dirty = True
+      
+    for circ in self.circuit_list():
+      if circ.built and not circ.requested_closed and not circ.dirty \
+          and circ.circ_id not in badcircs:
+        # XXX: Fails for 'tor-resolve 530.19.6.80' -> NEWRESOLVE
+        if circ.exit.will_exit_to(stream.host, stream.port):
+          try:
+            self.c.attach_stream(stream.strm_id, circ.circ_id)
+            stream.pending_circ = circ # Only one possible here
+            circ.pending_streams.append(stream)
+          except TorCtl.ErrorReply, e:
+            # No need to retry here. We should get the failed
+            # event for either the circ or stream next
+            plog("WARN", "Error attaching new stream: "+str(e.args))
+            return
+          break
+    else:
+      circ = None
+      try:
+        self.selmgr.set_target(stream.host, stream.port)
+        circ = self.c.build_circuit(self.selmgr.select_path())
+      except RestrictionError, e:
+        # XXX: Dress this up a bit
+        self.last_exit = None
+        # Kill this stream
+        plog("WARN", "Closing impossible stream "+str(stream.strm_id)+" ("+str(e)+")")
+        try:
+          self.c.close_stream(stream.strm_id, "4") # END_STREAM_REASON_EXITPOLICY
+        except TorCtl.ErrorReply, e:
+          plog("WARN", "Error closing stream: "+str(e))
+        return
+      except TorCtl.ErrorReply, e:
+        plog("WARN", "Error building circ: "+str(e.args))
+        self.last_exit = None
+        # Kill this stream
+        plog("NOTICE", "Closing stream "+str(stream.strm_id))
+        try:
+          self.c.close_stream(stream.strm_id, "5") # END_STREAM_REASON_DESTROY
+        except TorCtl.ErrorReply, e:
+          plog("WARN", "Error closing stream: "+str(e))
+        return
+      for u in unattached_streams:
+        plog("DEBUG",
+           "Attaching "+str(u.strm_id)+" pending build of "+str(circ.circ_id))
+        u.pending_circ = circ
+      circ.pending_streams.extend(unattached_streams)
+      self.circuits[circ.circ_id] = circ
+    self.last_exit = circ.exit
+    plog("DEBUG", "Set last exit to "+self.last_exit.idhex)
+
+  def circ_status_event(self, c):
+    output = [str(time.time()-c.arrived_at), c.event_name, str(c.circ_id),
+              c.status]
+    if c.path: output.append(",".join(c.path))
+    if c.reason: output.append("REASON=" + c.reason)
+    if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
+    plog("DEBUG", " ".join(output))
+    # Circuits we don't control get built by Tor
+    if c.circ_id not in self.circuits:
+      plog("DEBUG", "Ignoring circ " + str(c.circ_id))
+      return
+    if c.status == "EXTENDED":
+      self.circuits[c.circ_id].last_extended_at = c.arrived_at
+    elif c.status == "FAILED" or c.status == "CLOSED":
+      # XXX: Can still get a STREAM FAILED for this circ after this
+      circ = self.circuits[c.circ_id]
+      for r in circ.path:
+        r.refcount -= 1
+        plog("DEBUG", "Close refcount "+str(r.refcount)+" for "+r.idhex)
+        if r.deleted and r.refcount == 0:
+          # XXX: This shouldn't happen with StatsRouters.. 
+          if r.__class__.__name__ == "StatsRouter":
+            plog("WARN", "Purging expired StatsRouter "+r.idhex)
+          else:
+            plog("INFO", "Purging expired router "+r.idhex)
+          del self.routers[r.idhex]
+          self.selmgr.new_consensus(self.current_consensus())
+      del self.circuits[c.circ_id]
+      for stream in circ.pending_streams:
+        # If it was built, let Tor decide to detach or fail the stream
+        if not circ.built:
+          plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
+          self.attach_stream_any(stream, stream.detached_from)
+        else:
+          plog("NOTICE", "Waiting on Tor to hint about stream "+str(stream.strm_id)+" on closed circ "+str(circ.circ_id))
+    elif c.status == "BUILT":
+      self.circuits[c.circ_id].built = True
+      try:
+        for stream in self.circuits[c.circ_id].pending_streams:
+          self.c.attach_stream(stream.strm_id, c.circ_id)
+      except TorCtl.ErrorReply, e:
+        # No need to retry here. We should get the failed
+        # event for either the circ or stream in the next event
+        plog("NOTICE", "Error attaching pending stream: "+str(e.args))
+        return
+
+  def stream_status_event(self, s):
+    output = [str(time.time()-s.arrived_at), s.event_name, str(s.strm_id),
+              s.status, str(s.circ_id),
+          s.target_host, str(s.target_port)]
+    if s.reason: output.append("REASON=" + s.reason)
+    if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
+    if s.purpose: output.append("PURPOSE=" + s.purpose)
+    if s.source_addr: output.append("SOURCE_ADDR="+s.source_addr)
+    if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
+      s.target_host = "255.255.255.255" # ignore DNS for exit policy check
+
+    # Hack to ignore Tor-handled streams
+    if s.strm_id in self.streams and self.streams[s.strm_id].ignored:
+      if s.status == "CLOSED":
+        plog("DEBUG", "Deleting ignored stream: " + str(s.strm_id))
+        del self.streams[s.strm_id]
+      else:
+        plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
+      return
+
+    plog("DEBUG", " ".join(output))
+    # XXX: Copy s.circ_id==0 check+reset from StatsSupport here too?
+
+    if s.status == "NEW" or s.status == "NEWRESOLVE":
+      if s.status == "NEWRESOLVE" and not s.target_port:
+        s.target_port = self.resolve_port
+      if s.circ_id == 0:
+        self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, s.status)
+      elif s.strm_id not in self.streams:
+        plog("NOTICE", "Got new stream "+str(s.strm_id)+" with circuit "
+                       +str(s.circ_id)+" already attached.")
+        self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, s.status)
+        self.streams[s.strm_id].circ_id = s.circ_id
+
+      # Remember Tor-handled streams (Currently only directory streams)
+
+      if s.purpose and s.purpose.find("DIR_") == 0:
+        self.streams[s.strm_id].ignored = True
+        plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
+        return
+      elif s.source_addr:
+        src_addr = s.source_addr.split(":")
+        src_addr[1] = int(src_addr[1])
+        if not self.strm_selector(*src_addr):
+          self.streams[s.strm_id].ignored = True
+          plog("INFO", "Ignoring foreign stream: " + str(s.strm_id))
+          return
+      if s.circ_id == 0:
+        self.attach_stream_any(self.streams[s.strm_id],
+                   self.streams[s.strm_id].detached_from)
+    elif s.status == "DETACHED":
+      if s.strm_id not in self.streams:
+        plog("WARN", "Detached stream "+str(s.strm_id)+" not found")
+        self.streams[s.strm_id] = Stream(s.strm_id, s.target_host,
+                      s.target_port, "NEW")
+      # FIXME Stats (differentiate Resolved streams also..)
+      if not s.circ_id:
+        if s.reason == "TIMEOUT" or s.reason == "EXITPOLICY":
+          plog("NOTICE", "Stream "+str(s.strm_id)+" detached with "+s.reason)
+        else:
+          plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit with reason: "+str(s.reason))
+      else:
+        self.streams[s.strm_id].detached_from.append(s.circ_id)
+
+      if self.streams[s.strm_id].pending_circ and \
+           self.streams[s.strm_id] in \
+                  self.streams[s.strm_id].pending_circ.pending_streams:
+        self.streams[s.strm_id].pending_circ.pending_streams.remove(
+                                                self.streams[s.strm_id])
+      self.streams[s.strm_id].pending_circ = None
+      self.attach_stream_any(self.streams[s.strm_id],
+                   self.streams[s.strm_id].detached_from)
+    elif s.status == "SUCCEEDED":
+      if s.strm_id not in self.streams:
+        plog("NOTICE", "Succeeded stream "+str(s.strm_id)+" not found")
+        return
+      if s.circ_id and self.streams[s.strm_id].pending_circ.circ_id != s.circ_id:
+        # Hrmm.. this can happen on a new-nym.. Very rare, putting warn
+        # in because I'm still not sure this is correct
+        plog("WARN", "Mismatch of pending: "
+          +str(self.streams[s.strm_id].pending_circ.circ_id)+" vs "
+          +str(s.circ_id))
+        # This can happen if the circuit existed before we started up
+        if s.circ_id in self.circuits:
+          self.streams[s.strm_id].circ = self.circuits[s.circ_id]
+        else:
+          plog("NOTICE", "Stream "+str(s.strm_id)+" has unknown circuit: "+str(s.circ_id))
+      else:
+        self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
+      self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+      self.streams[s.strm_id].pending_circ = None
+      self.streams[s.strm_id].attached_at = s.arrived_at
+    elif s.status == "FAILED" or s.status == "CLOSED":
+      # FIXME stats
+      if s.strm_id not in self.streams:
+        plog("NOTICE", "Failed stream "+str(s.strm_id)+" not found")
+        return
+
+      # XXX: Can happen on timeout
+      if not s.circ_id:
+        if s.reason == "TIMEOUT" or s.reason == "EXITPOLICY":
+          plog("NOTICE", "Stream "+str(s.strm_id)+" "+s.status+" with "+s.reason)
+        else:
+          plog("WARN", "Stream "+str(s.strm_id)+" "+s.status+" from no circuit with reason: "+str(s.reason))
+
+      # We get failed and closed for each stream. OK to return 
+      # and let the closed do the cleanup
+      if s.status == "FAILED":
+        # Avoid busted circuits that will not resolve or carry
+        # traffic. 
+        self.streams[s.strm_id].failed = True
+        if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
+        elif s.circ_id != 0: 
+          plog("WARN", "Failed stream "+str(s.strm_id)+" on unknown circ "+str(s.circ_id))
+        return
+
+      if self.streams[s.strm_id].pending_circ:
+        self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+      del self.streams[s.strm_id]
+    elif s.status == "REMAP":
+      if s.strm_id not in self.streams:
+        plog("WARN", "Remap id "+str(s.strm_id)+" not found")
+      else:
+        if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
+          s.target_host = "255.255.255.255"
+          plog("NOTICE", "Non-IP remap for "+str(s.strm_id)+" to "
+                   + s.target_host)
+        self.streams[s.strm_id].host = s.target_host
+        self.streams[s.strm_id].port = s.target_port
+
+  def stream_bw_event(self, s):
+    output = [str(time.time()-s.arrived_at), s.event_name, str(s.strm_id),
+              str(s.bytes_written),
+              str(s.bytes_read)]
+    if not s.strm_id in self.streams:
+      plog("DEBUG", " ".join(output))
+      plog("WARN", "BW event for unknown stream id: "+str(s.strm_id))
+    else:
+      if not self.streams[s.strm_id].ignored:
+        plog("DEBUG", " ".join(output))
+      self.streams[s.strm_id].bytes_read += s.bytes_read
+      self.streams[s.strm_id].bytes_written += s.bytes_written
+
+  def new_consensus_event(self, n):
+    TorCtl.ConsensusTracker.new_consensus_event(self, n)
+    self.selmgr.new_consensus(self.current_consensus())
+
+  def new_desc_event(self, d):
+    if TorCtl.ConsensusTracker.new_desc_event(self, d):
+      self.selmgr.new_consensus(self.current_consensus())
+
+  def bandwidth_event(self, b): pass # For heartbeat only..
+
+################### CircuitHandler #############################
+
+class CircuitHandler(PathBuilder):
+  """ CircuitHandler that extends from PathBuilder to handle multiple
+      circuits as opposed to just one. """
+  def __init__(self, c, selmgr, num_circuits, RouterClass):
+    """Constructor. 'c' is a Connection, 'selmgr' is a SelectionManager,
+    'num_circuits' is the number of circuits to keep in the pool,
+    and 'RouterClass' is a class that inherits from Router and is used
+    to create annotated Routers."""
+    PathBuilder.__init__(self, c, selmgr, RouterClass)
+    # Set handler to the connection here to 
+    # not miss any circuit events on startup
+    c.set_event_handler(self)
+    self.num_circuits = num_circuits    # Size of the circuit pool
+    self.check_circuit_pool()           # Bring up the pool of circs
+    
+  def check_circuit_pool(self):
+    """ Init or check the status of the circuit-pool """
+    # Get current number of circuits
+    n = len(self.circuits.values())
+    i = self.num_circuits-n
+    if i > 0:
+      plog("INFO", "Checked pool of circuits: we need to build " + 
+         str(i) + " circuits")
+    # Schedule (num_circs-n) circuit-buildups
+    while (n < self.num_circuits):      
+      # TODO: Should mimic Tor's learning here
+      self.build_circuit("255.255.255.255", 80) 
+      plog("DEBUG", "Scheduled circuit No. " + str(n+1))
+      n += 1
+
+  def build_circuit(self, host, port):
+    """ Build a circuit """
+    circ = None
+    while circ == None:
+      try:
+        self.selmgr.set_target(host, port)
+        circ = self.c.build_circuit(self.selmgr.select_path())
+        self.circuits[circ.circ_id] = circ
+        return circ
+      except RestrictionError, e:
+        # XXX: Dress this up a bit
+        traceback.print_exc()
+        plog("ERROR", "Impossible restrictions: "+str(e))
+      except TorCtl.ErrorReply, e:
+        traceback.print_exc()
+        plog("WARN", "Error building circuit: " + str(e.args))
+
+  def circ_status_event(self, c):
+    """ Handle circuit status events """
+    output = [c.event_name, str(c.circ_id), c.status]
+    if c.path: output.append(",".join(c.path))
+    if c.reason: output.append("REASON=" + c.reason)
+    if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
+    plog("DEBUG", " ".join(output))
+    
+    # Circuits we don't control get built by Tor
+    if c.circ_id not in self.circuits:
+      plog("DEBUG", "Ignoring circuit " + str(c.circ_id) + 
+         " (controlled by Tor)")
+      return
+    
+    # EXTENDED
+    if c.status == "EXTENDED":
+      # Compute elapsed time
+      extend_time = c.arrived_at-self.circuits[c.circ_id].last_extended_at
+      self.circuits[c.circ_id].extend_times.append(extend_time)
+      plog("INFO", "Circuit " + str(c.circ_id) + " extended in " + 
+         str(extend_time) + " sec")
+      self.circuits[c.circ_id].last_extended_at = c.arrived_at
+    
+    # FAILED & CLOSED
+    elif c.status == "FAILED" or c.status == "CLOSED":
+      PathBuilder.circ_status_event(self, c)
+      # Check if there are enough circs
+      self.check_circuit_pool()
+      return
+    # BUILT
+    elif c.status == "BUILT":
+      PathBuilder.circ_status_event(self, c)
+      # Compute duration by summing up extend_times
+      circ = self.circuits[c.circ_id]
+      duration = reduce(lambda x, y: x+y, circ.extend_times, 0.0)
+      plog("INFO", "Circuit " + str(c.circ_id) + " needed " + 
+         str(duration) + " seconds to be built")
+      # Save the duration to the circuit for later use
+      circ.setup_duration = duration
+      
+    # OTHER?
+    else:
+      # If this was e.g. a LAUNCHED
+      pass
+
+################### StreamHandler ##############################
+
+class StreamHandler(CircuitHandler):
+  """ StreamHandler that extends from the CircuitHandler 
+      to handle attaching streams to an appropriate circuit 
+      in the pool. """
+  def __init__(self, c, selmgr, num_circs, RouterClass):
+    CircuitHandler.__init__(self, c, selmgr, num_circs, RouterClass)
+
+  def clear_dns_cache(self):
+    """ Send signal CLEARDNSCACHE """
+    lines = self.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
+    for _, msg, more in lines:
+      plog("DEBUG", "CLEARDNSCACHE: " + msg)
+
+  def close_stream(self, id, reason):
+    """ Close a stream with given id and reason """
+    self.c.close_stream(id, reason)
+
+  def address_mapped_event(self, event):
+    """ It is necessary to listen to ADDRMAP events to be able to 
+        perform DNS lookups using Tor """
+    output = [event.event_name, event.from_addr, event.to_addr, 
+       time.asctime(event.when)]
+    plog("DEBUG", " ".join(output))
+
+  def unknown_event(self, event):
+    plog("DEBUG", "UNKNOWN EVENT '" + event.event_name + "':" + 
+       event.event_string)
+
+########################## Unit tests ##########################
+
+def do_gen_unit(gen, r_list, weight_bw, num_print):
+  trials = 0
+  for r in r_list:
+    if gen.rstr_list.r_is_ok(r):
+      trials += weight_bw(gen, r)
+  trials = int(trials/1024)
+  
+  print "Running "+str(trials)+" trials"
+
+  # 0. Reset r.chosen = 0 for all routers
+  for r in r_list:
+    r.chosen = 0
+
+  # 1. Generate 'trials' choices:
+  #    1a. r.chosen++
+
+  loglevel = TorUtil.loglevel
+  TorUtil.loglevel = "INFO"
+
+  gen.rewind()
+  rtrs = gen.generate()
+  for i in xrange(1, trials):
+    r = rtrs.next()
+    r.chosen += 1
+
+  TorUtil.loglevel = loglevel
+
+  # 2. Print top num_print routers choices+bandwidth stats+flags
+  i = 0
+  copy_rlist = copy.copy(r_list)
+  copy_rlist.sort(lambda x, y: cmp(y.chosen, x.chosen))
+  for r in copy_rlist:
+    if r.chosen and not gen.rstr_list.r_is_ok(r):
+      print "WARN: Restriction fail at "+r.idhex
+    if not r.chosen and gen.rstr_list.r_is_ok(r):
+      print "WARN: Generation fail at "+r.idhex
+    if not gen.rstr_list.r_is_ok(r): continue
+    flag = ""
+    bw = int(weight_bw(gen, r))
+    if "Exit" in r.flags:
+      flag += "E"
+    if "Guard" in r.flags:
+      flag += "G"
+    print str(r.list_rank)+". "+r.nickname+" "+str(r.bw/1024.0)+"/"+str(bw/1024.0)+": "+str(r.chosen)+", "+flag
+    i += 1
+    if i > num_print: break
+
+def do_unit(rst, r_list, plamb):
+  print "\n"
+  print "-----------------------------------"
+  print rst.r_is_ok.im_class
+  above_i = 0
+  above_bw = 0
+  below_i = 0
+  below_bw = 0
+  for r in r_list:
+    if rst.r_is_ok(r):
+      print r.nickname+" "+plamb(r)+"="+str(rst.r_is_ok(r))+" "+str(r.bw)
+      if r.bw > 400000:
+        above_i = above_i + 1
+        above_bw += r.bw
+      else:
+        below_i = below_i + 1
+        below_bw += r.bw
+        
+  print "Routers above: " + str(above_i) + " bw: " + str(above_bw)
+  print "Routers below: " + str(below_i) + " bw: " + str(below_bw)
+
+# TODO: Tests:
+#  - Test each NodeRestriction and print in/out lines for it
+#  - Test NodeGenerator and reapply NodeRestrictions
+#  - Same for PathSelector and PathRestrictions
+#  - Also Reapply each restriction by hand to path. Verify returns true
+
+if __name__ == '__main__':
+  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+  s.connect((TorUtil.control_host,TorUtil.control_port))
+  c = Connection(s)
+  c.debug(file("control.log", "w"))
+  c.authenticate(TorUtil.control_pass)
+  nslist = c.get_network_status()
+  sorted_rlist = c.read_routers(c.get_network_status())
+
+  sorted_rlist.sort(lambda x, y: cmp(y.bw, x.bw))
+  for i in xrange(len(sorted_rlist)): sorted_rlist[i].list_rank = i
+
+  def flag_weighting(bwgen, r):
+    bw = r.bw
+    if "Exit" in r.flags:
+      bw *= bwgen.exit_weight
+    if "Guard" in r.flags:
+      bw *= bwgen.guard_weight
+    return bw
+
+  def uniform_weighting(bwgen, r):
+    return 10240000
+
+  # XXX: Test OrderedexitGenerators
+  do_gen_unit(
+   UniformGenerator(sorted_rlist,
+                    NodeRestrictionList([PercentileRestriction(20,30,sorted_rlist),
+FlagsRestriction(["Valid"])])),
+                    sorted_rlist, uniform_weighting, 1500)
+
+  
+  do_gen_unit(BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Exit"]),
+                                  3, exit=True),
+              sorted_rlist, flag_weighting, 500)
+  
+  do_gen_unit(BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Guard"]),
+              3, guard=True),
+              sorted_rlist, flag_weighting, 500)
+  
+  do_gen_unit(
+   BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Valid"]), 3),
+   sorted_rlist, flag_weighting, 500)
+
+ 
+  for r in sorted_rlist:
+    if r.will_exit_to("211.11.21.22", 465):
+      print r.nickname+" "+str(r.bw)
+
+  do_unit(FlagsRestriction(["Guard"], []), sorted_rlist, lambda r: " ".join(r.flags))
+  do_unit(FlagsRestriction(["Fast"], []), sorted_rlist, lambda r: " ".join(r.flags))
+
+  do_unit(ExitPolicyRestriction("2.11.2.2", 80), sorted_rlist,
+          lambda r: "exits to 80")
+  do_unit(PercentileRestriction(0, 100, sorted_rlist), sorted_rlist,
+          lambda r: "")
+  do_unit(PercentileRestriction(10, 20, sorted_rlist), sorted_rlist,
+          lambda r: "")
+  do_unit(OSRestriction([r"[lL]inux", r"BSD", "Darwin"], []), sorted_rlist,
+          lambda r: r.os)
+  do_unit(OSRestriction([], ["Windows", "Solaris"]), sorted_rlist,
+          lambda r: r.os)
+   
+  do_unit(VersionRangeRestriction("0.1.2.0"), sorted_rlist,
+          lambda r: str(r.version))
+  do_unit(VersionRangeRestriction("0.1.2.0", "0.1.2.5"), sorted_rlist,
+          lambda r: str(r.version))
+  do_unit(VersionIncludeRestriction(["0.1.1.26-alpha", "0.1.2.7-ignored"]),
+          sorted_rlist, lambda r: str(r.version))
+  do_unit(VersionExcludeRestriction(["0.1.1.26"]), sorted_rlist,
+          lambda r: str(r.version))
+
+  do_unit(ConserveExitsRestriction(), sorted_rlist, lambda r: " ".join(r.flags))
+  do_unit(FlagsRestriction([], ["Valid"]), sorted_rlist, lambda r: " ".join(r.flags))
+
+  do_unit(IdHexRestriction("$FFCB46DB1339DA84674C70D7CB586434C4370441"),
+          sorted_rlist, lambda r: r.idhex)
+
+  rl =  [AtLeastNNodeRestriction([ExitPolicyRestriction("255.255.255.255", 80), ExitPolicyRestriction("255.255.255.255", 443), ExitPolicyRestriction("255.255.255.255", 6667)], 2), FlagsRestriction([], ["BadExit"])]
+
+  exit_rstr = NodeRestrictionList(rl)
+
+  ug = UniformGenerator(sorted_rlist, exit_rstr)
+
+  ug.rewind()
+  rlist = []
+  for r in ug.generate():
+    print "Checking: " + r.nickname
+    for rs in rl:
+      if not rs.r_is_ok(r):
+        raise PathError()
+    if not "Exit" in r.flags:
+      print "No exit in flags of "+r.idhex
+      for e in r.exitpolicy:
+        print " "+str(e)
+      print " 80: "+str(r.will_exit_to("255.255.255.255", 80))
+      print " 443: "+str(r.will_exit_to("255.255.255.255", 443))
+      print " 6667: "+str(r.will_exit_to("255.255.255.255", 6667))
+
+    ug.mark_chosen(r)
+    rlist.append(r)
+  for r in sorted_rlist:
+    if "Exit" in r.flags and not r in rlist:
+      print r.idhex+" is an exit not in rl!"
+        

Added: arm/dependencies/TorCtl/README
===================================================================
--- arm/dependencies/TorCtl/README	                        (rev 0)
+++ arm/dependencies/TorCtl/README	2010-08-23 01:13:01 UTC (rev 23018)
@@ -0,0 +1,42 @@
+                         TorCtl Python Bindings
+
+
+TorCtl is a python Tor controller with extensions to support path
+building and various constraints on node and path selection, as well as
+statistics gathering.
+
+Apps can hook into the TorCtl package at whatever level they wish.
+
+The lowest level of interaction is to use the TorCtl module
+(TorCtl/TorCtl.py). Typically this is done by importing TorCtl.TorCtl
+and creating a TorCtl.Connection and extending from TorCtl.EventHandler.
+This class receives Tor controller events packaged into python classes
+from a TorCtl.Connection.
+
+The next level up is to use the TorCtl.PathSupport module. This is done
+by importing TorCtl.PathSupport and instantiating or extending from
+PathSupport.PathBuilder, which itself extends from TorCtl.EventHandler.
+This class handles circuit construction and stream attachment subject to
+policies defined by PathSupport.NodeRestrictor and
+PathSupport.PathRestrictor implementations.
+
+If you are interested in gathering statistics, you can instead
+instantiate or extend from StatsSupport.StatsHandler, which is
+again an event handler with hooks to record statistics on circuit
+creation, stream bandwidth, and circuit failure information.
+
+All of these modules are pydoced. For more detailed information than
+the above overview, you can do:
+
+# pydoc TorCtl
+# pydoc PathSupport
+# pydoc StatsSupport
+
+There is a minimalistic example of usage of the basic TorCtl.Connection
+and TorCtl.EventHandler in run_example() in TorCtl.py in this directory.
+Other components also have unit tests at the end of their source files.
+
+For more extensive examples of the PathSupport and StatsSupport
+interfaces, see the TorFlow project at git url:
+
+git clone git://git.torproject.org/git/torflow.git

Added: arm/dependencies/TorCtl/SQLSupport.py
===================================================================
--- arm/dependencies/TorCtl/SQLSupport.py	                        (rev 0)
+++ arm/dependencies/TorCtl/SQLSupport.py	2010-08-23 01:13:01 UTC (rev 23018)
@@ -0,0 +1,1083 @@
+#!/usr/bin/python
+
+"""
+
+Support classes for statisics gathering in SQL Databases
+
+DOCDOC
+
+"""
+
+import socket
+import sys
+import time
+import datetime
+import math
+
+import PathSupport, TorCtl
+from TorUtil import *
+from PathSupport import *
+from TorUtil import meta_port, meta_host, control_port, control_host, control_pass
+from TorCtl import EVENT_TYPE, EVENT_STATE, TorCtlError
+
+import sqlalchemy
+import sqlalchemy.orm.exc
+from sqlalchemy.orm import scoped_session, sessionmaker, eagerload, lazyload, eagerload_all
+from sqlalchemy import create_engine, and_, or_, not_, func
+from sqlalchemy.sql import func,select
+from sqlalchemy.schema import ThreadLocalMetaData,MetaData
+from elixir import *
+
+# Nodes with a ratio below this value will be removed from consideration
+# for higher-valued nodes
+MIN_RATIO=0.5
+
+NO_FPE=2**-50
+
+#################### Model #######################
+
+# In elixir, the session (DB connection) is a property of the model..
+# There can only be one for all of the listeners below that use it
+# See http://elixir.ematia.de/trac/wiki/Recipes/MultipleDatabases
+OP=None
+tc_metadata = MetaData()
+tc_metadata.echo=True
+tc_session = scoped_session(sessionmaker(autoflush=True))
+
+def setup_db(db_uri, echo=False, drop=False):
+  tc_engine = create_engine(db_uri, echo=echo)
+  tc_metadata.bind = tc_engine
+  tc_metadata.echo = echo
+
+  setup_all()
+  if drop: drop_all()
+  create_all()
+
+  if sqlalchemy.__version__ < "0.5.0":
+    # DIAF SQLAlchemy. A token gesture at backwards compatibility
+    # wouldn't kill you, you know.
+    tc_session.add = tc_session.save_or_update
+
+class Router(Entity):
+  using_options(shortnames=True, order_by='-published', session=tc_session, metadata=tc_metadata)
+  using_mapper_options(save_on_init=False)
+  idhex = Field(CHAR(40), primary_key=True, index=True)
+  orhash = Field(CHAR(27))
+  published = Field(DateTime)
+  nickname = Field(Text)
+
+  os = Field(Text)
+  rate_limited = Field(Boolean)
+  guard = Field(Boolean)
+  exit = Field(Boolean)
+  stable = Field(Boolean)
+  v2dir = Field(Boolean)
+  v3dir = Field(Boolean)
+  hsdir = Field(Boolean)
+
+  bw = Field(Integer)
+  version = Field(Integer)
+  # FIXME: is mutable=False what we want? Do we care?
+  #router = Field(PickleType(mutable=False)) 
+  circuits = ManyToMany('Circuit')
+  streams = ManyToMany('Stream')
+  detached_streams = ManyToMany('Stream')
+  bw_history = OneToMany('BwHistory')
+  stats = OneToOne('RouterStats', inverse="router")
+
+  def from_router(self, router):
+    self.published = router.published
+    self.bw = router.bw
+    self.idhex = router.idhex
+    self.orhash = router.orhash
+    self.nickname = router.nickname
+    # XXX: Temporary hack. router.os can contain unicode, which makes
+    # us barf. Apparently 'Text' types can't have unicode chars?
+    # self.os = router.os
+    self.rate_limited = router.rate_limited
+    self.guard = "Guard" in router.flags
+    self.exit = "Exit" in router.flags
+    self.stable = "Stable" in router.flags
+    self.v2dir = "V2Dir" in router.flags
+    self.v3dir = "V3Dir" in router.flags
+    self.hsdir = "HSDir" in router.flags
+    self.version = router.version.version
+    #self.router = router
+    return self
+
+class BwHistory(Entity):
+  using_options(shortnames=True, session=tc_session, metadata=tc_metadata)
+  using_mapper_options(save_on_init=False)
+  router = ManyToOne('Router')
+  bw = Field(Integer)
+  desc_bw = Field(Integer)
+  rank = Field(Integer)
+  pub_time = Field(DateTime)
+
+class Circuit(Entity):
+  using_options(shortnames=True, order_by='-launch_time', session=tc_session, metadata=tc_metadata)
+  using_mapper_options(save_on_init=False)
+  routers = ManyToMany('Router')
+  streams = OneToMany('Stream', inverse='circuit')
+  detached_streams = ManyToMany('Stream', inverse='detached_circuits')
+  extensions = OneToMany('Extension', inverse='circ')
+  circ_id = Field(Integer, index=True)
+  launch_time = Field(Float)
+  last_extend = Field(Float)
+
+class FailedCircuit(Circuit):
+  using_mapper_options(save_on_init=False)
+  using_options(shortnames=True, session=tc_session, metadata=tc_metadata)
+  #failed_extend = ManyToOne('Extension', inverse='circ')
+  fail_reason = Field(Text)
+  fail_time = Field(Float)
+
+class BuiltCircuit(Circuit):
+  using_options(shortnames=True, session=tc_session, metadata=tc_metadata)
+  using_mapper_options(save_on_init=False)
+  built_time = Field(Float)
+  tot_delta = Field(Float)
+
+class DestroyedCircuit(Circuit):
+  using_options(shortnames=True, session=tc_session, metadata=tc_metadata)
+  using_mapper_options(save_on_init=False)
+  destroy_reason = Field(Text)
+  destroy_time = Field(Float)
+
+class ClosedCircuit(BuiltCircuit):
+  using_options(shortnames=True, session=tc_session, metadata=tc_metadata)
+  using_mapper_options(save_on_init=False)
+  closed_time = Field(Float)
+
+class Extension(Entity):
+  using_mapper_options(save_on_init=False)
+  using_options(shortnames=True, order_by='-time', session=tc_session, metadata=tc_metadata)
+  circ = ManyToOne('Circuit', inverse='extensions')
+  from_node = ManyToOne('Router')
+  to_node = ManyToOne('Router')
+  hop = Field(Integer)
+  time = Field(Float)
+  delta = Field(Float)
+
+class FailedExtension(Extension):
+  using_options(shortnames=True, session=tc_session, metadata=tc_metadata)
+  #failed_circ = ManyToOne('FailedCircuit', inverse='failed_extend')
+  using_mapper_options(save_on_init=False)
+  reason = Field(Text)
+
+class Stream(Entity):
+  using_options(shortnames=True, session=tc_session, metadata=tc_metadata)
+  using_options(shortnames=True, order_by='-start_time')
+  using_mapper_options(save_on_init=False)
+  tgt_host = Field(Text)
+  tgt_port = Field(Integer)
+  circuit = ManyToOne('Circuit', inverse='streams')
+  detached_circuits = ManyToMany('Circuit', inverse='detatched_streams')
+  ignored = Field(Boolean) # Directory streams
+  strm_id = Field(Integer, index=True)
+  start_time = Field(Float)
+  tot_read_bytes = Field(Integer)
+  tot_write_bytes = Field(Integer)
+  init_status = Field(Text)
+  close_reason = Field(Text) # Shared by Failed and Closed. Unused here.
+
+class FailedStream(Stream):
+  using_options(shortnames=True, session=tc_session, metadata=tc_metadata)
+  using_mapper_options(save_on_init=False)
+  fail_reason = Field(Text)
+  fail_time = Field(Float)
+
+class ClosedStream(Stream):
+  using_options(shortnames=True, session=tc_session, metadata=tc_metadata)
+  using_mapper_options(save_on_init=False)
+  end_time = Field(Float)
+  read_bandwidth = Field(Float)
+  write_bandwidth = Field(Float)
+
+  def tot_bytes(self):
+    return self.tot_read_bytes
+    #return self.tot_read_bytes+self.tot_write_bytes
+
+  def bandwidth(self):
+    return self.tot_bandwidth()
+
+  def tot_bandwidth(self):
+    #return self.read_bandwidth+self.write_bandwidth 
+    return self.read_bandwidth
+
+class RouterStats(Entity):
+  using_options(shortnames=True, session=tc_session, metadata=tc_metadata)
+  using_mapper_options(save_on_init=False)
+  router = ManyToOne('Router', inverse="stats")
+   
+  # Easily derived from BwHistory
+  min_rank = Field(Integer)
+  avg_rank = Field(Float)
+  max_rank = Field(Integer)
+  avg_bw = Field(Float)
+  avg_desc_bw = Field(Float)
+
+  percentile = Field(Float)
+
+  # These can be derived with a single query over 
+  # FailedExtension and Extension
+  circ_fail_to = Field(Float) 
+  circ_fail_from = Field(Float)
+  circ_try_to = Field(Float)
+  circ_try_from = Field(Float)
+
+  circ_from_rate = Field(Float)
+  circ_to_rate = Field(Float)
+  circ_bi_rate = Field(Float)
+
+  circ_to_ratio = Field(Float)
+  circ_from_ratio = Field(Float)
+  circ_bi_ratio = Field(Float)
+
+  avg_first_ext = Field(Float)
+  ext_ratio = Field(Float)
+ 
+  strm_try = Field(Integer)
+  strm_closed = Field(Integer)
+
+  sbw = Field(Float)
+  sbw_dev = Field(Float)
+  sbw_ratio = Field(Float)
+  filt_sbw = Field(Float)
+  filt_sbw_ratio = Field(Float)
+
+  def _compute_stats_relation(stats_clause):
+    for rs in RouterStats.query.\
+                   filter(stats_clause).\
+                   options(eagerload_all('router.circuits.extensions')).\
+                   all():
+      rs.circ_fail_to = 0
+      rs.circ_try_to = 0
+      rs.circ_fail_from = 0
+      rs.circ_try_from = 0
+      tot_extend_time = 0
+      tot_extends = 0
+      for c in rs.router.circuits: 
+        for e in c.extensions: 
+          if e.to_node == r:
+            rs.circ_try_to += 1
+            if isinstance(e, FailedExtension):
+              rs.circ_fail_to += 1
+            elif e.hop == 0:
+              tot_extend_time += e.delta
+              tot_extends += 1
+          elif e.from_node == r:
+            rs.circ_try_from += 1
+            if isinstance(e, FailedExtension):
+              rs.circ_fail_from += 1
+            
+        if isinstance(c, FailedCircuit):
+          pass # TODO: Also count timeouts against earlier nodes?
+        elif isinstance(c, DestroyedCircuit):
+          pass # TODO: Count these somehow..
+
+      if tot_extends > 0: rs.avg_first_ext = (1.0*tot_extend_time)/tot_extends
+      else: rs.avg_first_ext = 0
+      if rs.circ_try_from > 0:
+        rs.circ_from_rate = (1.0*rs.circ_fail_from/rs.circ_try_from)
+      if rs.circ_try_to > 0:
+        rs.circ_to_rate = (1.0*rs.circ_fail_to/rs.circ_try_to)
+      if rs.circ_try_to+rs.circ_try_from > 0:
+        rs.circ_bi_rate = (1.0*rs.circ_fail_to+rs.circ_fail_from)/(rs.circ_try_to+rs.circ_try_from)
+
+      tc_session.add(rs)
+    tc_session.commit()
+  _compute_stats_relation = Callable(_compute_stats_relation)
+
+  def _compute_stats_query(stats_clause):
+    tc_session.clear()
+    # http://www.sqlalchemy.org/docs/04/sqlexpression.html#sql_update
+    to_s = select([func.count(Extension.id)], 
+        and_(stats_clause, Extension.table.c.to_node_idhex
+             == RouterStats.table.c.router_idhex)).as_scalar()
+    from_s = select([func.count(Extension.id)], 
+        and_(stats_clause, Extension.table.c.from_node_idhex
+             == RouterStats.table.c.router_idhex)).as_scalar()
+    f_to_s = select([func.count(FailedExtension.id)], 
+        and_(stats_clause, FailedExtension.table.c.to_node_idhex
+             == RouterStats.table.c.router_idhex,
+             FailedExtension.table.c.row_type=='failedextension')).as_scalar()
+    f_from_s = select([func.count(FailedExtension.id)], 
+        and_(stats_clause, FailedExtension.table.c.from_node_idhex
+                       == RouterStats.table.c.router_idhex,
+             FailedExtension.table.c.row_type=='failedextension')).as_scalar()
+    avg_ext = select([func.avg(Extension.delta)], 
+        and_(stats_clause,
+             Extension.table.c.to_node_idhex==RouterStats.table.c.router_idhex,
+             Extension.table.c.hop==0, 
+             Extension.table.c.row_type=='extension')).as_scalar()
+
+    RouterStats.table.update(stats_clause, values=
+      {RouterStats.table.c.circ_try_to:to_s,
+       RouterStats.table.c.circ_try_from:from_s,
+       RouterStats.table.c.circ_fail_to:f_to_s,
+       RouterStats.table.c.circ_fail_from:f_from_s,
+       RouterStats.table.c.avg_first_ext:avg_ext}).execute()
+
+    RouterStats.table.update(stats_clause, values=
+      {RouterStats.table.c.circ_from_rate:
+         RouterStats.table.c.circ_fail_from/RouterStats.table.c.circ_try_from,
+       RouterStats.table.c.circ_to_rate:
+          RouterStats.table.c.circ_fail_to/RouterStats.table.c.circ_try_to,
+       RouterStats.table.c.circ_bi_rate:
+         (RouterStats.table.c.circ_fail_to+RouterStats.table.c.circ_fail_from)
+                          /
+      (RouterStats.table.c.circ_try_to+RouterStats.table.c.circ_try_from)}).execute()
+
+
+    # TODO: Give the streams relation table a sane name and reduce this too
+    for rs in RouterStats.query.filter(stats_clause).\
+                        options(eagerload('router'),
+                                eagerload('router.detached_streams'),
+                                eagerload('router.streams')).all():
+      tot_bw = 0.0
+      s_cnt = 0
+      tot_bytes = 0.0
+      tot_duration = 0.0
+      for s in rs.router.streams:
+        if isinstance(s, ClosedStream):
+          tot_bytes += s.tot_bytes()
+          tot_duration += s.end_time - s.start_time
+          tot_bw += s.bandwidth()
+          s_cnt += 1
+      # FIXME: Hrmm.. do we want to do weighted avg or pure avg here?
+      # If files are all the same size, it shouldn't matter..
+      if s_cnt > 0:
+        rs.sbw = tot_bw/s_cnt
+      else: rs.sbw = None
+      rs.strm_closed = s_cnt
+      rs.strm_try = len(rs.router.streams)+len(rs.router.detached_streams)
+      if rs.sbw:
+        tot_var = 0.0
+        for s in rs.router.streams:
+          if isinstance(s, ClosedStream):
+            tot_var += (s.bandwidth()-rs.sbw)*(s.bandwidth()-rs.sbw)
+        tot_var /= s_cnt
+        rs.sbw_dev = math.sqrt(tot_var)
+      tc_session.add(rs)
+    tc_session.commit()
+  _compute_stats_query = Callable(_compute_stats_query)
+
+  def _compute_stats(stats_clause):
+    RouterStats._compute_stats_query(stats_clause)
+    #RouterStats._compute_stats_relation(stats_clause)
+  _compute_stats = Callable(_compute_stats)
+
+  def _compute_ranks():
+    tc_session.clear()
+    min_r = select([func.min(BwHistory.rank)],
+        BwHistory.table.c.router_idhex
+            == RouterStats.table.c.router_idhex).as_scalar()
+    avg_r = select([func.avg(BwHistory.rank)],
+        BwHistory.table.c.router_idhex
+            == RouterStats.table.c.router_idhex).as_scalar()
+    max_r = select([func.max(BwHistory.rank)],
+        BwHistory.table.c.router_idhex
+            == RouterStats.table.c.router_idhex).as_scalar()
+    avg_bw = select([func.avg(BwHistory.bw)],
+        BwHistory.table.c.router_idhex
+            == RouterStats.table.c.router_idhex).as_scalar()
+    avg_desc_bw = select([func.avg(BwHistory.desc_bw)],
+        BwHistory.table.c.router_idhex
+            == RouterStats.table.c.router_idhex).as_scalar()
+
+    RouterStats.table.update(values=
+       {RouterStats.table.c.min_rank:min_r,
+        RouterStats.table.c.avg_rank:avg_r,
+        RouterStats.table.c.max_rank:max_r,
+        RouterStats.table.c.avg_bw:avg_bw,
+        RouterStats.table.c.avg_desc_bw:avg_desc_bw}).execute()
+
+    #min_avg_rank = select([func.min(RouterStats.avg_rank)]).as_scalar()
+    max_avg_rank = select([func.max(RouterStats.avg_rank)]).as_scalar()
+
+    RouterStats.table.update(values=
+       {RouterStats.table.c.percentile:
+            (100.0*RouterStats.table.c.avg_rank)/max_avg_rank}).execute()
+    tc_session.commit()
+  _compute_ranks = Callable(_compute_ranks)
+
+  def _compute_ratios(stats_clause):
+    tc_session.clear()
+    avg_from_rate = select([func.avg(RouterStats.circ_from_rate)],
+                           stats_clause).as_scalar()
+    avg_to_rate = select([func.avg(RouterStats.circ_to_rate)],
+                           stats_clause).as_scalar()
+    avg_bi_rate = select([func.avg(RouterStats.circ_bi_rate)],
+                           stats_clause).as_scalar()
+    avg_ext = select([func.avg(RouterStats.avg_first_ext)],
+                           stats_clause).as_scalar()
+    avg_sbw = select([func.avg(RouterStats.sbw)],
+                           stats_clause).as_scalar()
+
+    RouterStats.table.update(stats_clause, values=
+       {RouterStats.table.c.circ_from_ratio:
+         (1-RouterStats.table.c.circ_from_rate)/(1-avg_from_rate),
+        RouterStats.table.c.circ_to_ratio:
+         (1-RouterStats.table.c.circ_to_rate)/(1-avg_to_rate),
+        RouterStats.table.c.circ_bi_ratio:
+         (1-RouterStats.table.c.circ_bi_rate)/(1-avg_bi_rate),
+        RouterStats.table.c.ext_ratio:
+         avg_ext/RouterStats.table.c.avg_first_ext,
+        RouterStats.table.c.sbw_ratio:
+         RouterStats.table.c.sbw/avg_sbw}).execute()
+    tc_session.commit()
+  _compute_ratios = Callable(_compute_ratios)
+
+  def _compute_filtered_relational(min_ratio, stats_clause, filter_clause):
+    badrouters = RouterStats.query.filter(stats_clause).filter(filter_clause).\
+                   filter(RouterStats.sbw_ratio < min_ratio).all()
+
+    # TODO: Turn this into a single query....
+    for rs in RouterStats.query.filter(stats_clause).\
+          options(eagerload_all('router.streams.circuit.routers')).all():
+      tot_sbw = 0
+      sbw_cnt = 0
+      for s in rs.router.streams:
+        if isinstance(s, ClosedStream):
+          skip = False
+          #for br in badrouters:
+          #  if br != rs:
+          #    if br.router in s.circuit.routers:
+          #      skip = True
+          if not skip:
+            # Throw out outliers < mean 
+            # (too much variance for stddev to filter much)
+            if rs.strm_closed == 1 or s.bandwidth() >= rs.sbw:
+              tot_sbw += s.bandwidth()
+              sbw_cnt += 1
+
+      if sbw_cnt: rs.filt_sbw = tot_sbw/sbw_cnt
+      else: rs.filt_sbw = None
+      tc_session.add(rs)
+    if sqlalchemy.__version__ < "0.5.0":
+      avg_sbw = RouterStats.query.filter(stats_clause).avg(RouterStats.filt_sbw)
+    else:
+      avg_sbw = tc_session.query(func.avg(RouterStats.filt_sbw)).filter(stats_clause).scalar()
+    for rs in RouterStats.query.filter(stats_clause).all():
+      if type(rs.filt_sbw) == float and avg_sbw:
+        rs.filt_sbw_ratio = rs.filt_sbw/avg_sbw
+      else:
+        rs.filt_sbw_ratio = None
+      tc_session.add(rs)
+    tc_session.commit()
+  _compute_filtered_relational = Callable(_compute_filtered_relational)
+
+  def _compute_filtered_ratios(min_ratio, stats_clause, filter_clause):
+    RouterStats._compute_filtered_relational(min_ratio, stats_clause, 
+                                             filter_clause)
+    #RouterStats._compute_filtered_query(filter,min_ratio)
+  _compute_filtered_ratios = Callable(_compute_filtered_ratios)
+
+  def reset():
+    tc_session.clear()
+    RouterStats.table.drop()
+    RouterStats.table.create()
+    for r in Router.query.all():
+      rs = RouterStats()
+      rs.router = r
+      r.stats = rs
+      tc_session.add(r)
+    tc_session.commit()
+  reset = Callable(reset)
+
+  def compute(pct_low=0, pct_high=100, stat_clause=None, filter_clause=None):
+    pct_clause = and_(RouterStats.percentile >= pct_low, 
+                         RouterStats.percentile < pct_high)
+    if stat_clause:
+      stat_clause = and_(pct_clause, stat_clause)
+    else:
+      stat_clause = pct_clause
+     
+    RouterStats.reset()
+    RouterStats._compute_ranks() # No filters. Ranks are independent
+    RouterStats._compute_stats(stat_clause)
+    RouterStats._compute_ratios(stat_clause)
+    RouterStats._compute_filtered_ratios(MIN_RATIO, stat_clause, filter_clause)
+    tc_session.commit()
+  compute = Callable(compute)  
+
+  def write_stats(f, pct_low=0, pct_high=100, order_by=None, recompute=False, stat_clause=None, filter_clause=None, disp_clause=None):
+
+    if not order_by:
+      order_by=RouterStats.avg_first_ext
+
+    if recompute:
+      RouterStats.compute(pct_low, pct_high, stat_clause, filter_clause)
+
+    pct_clause = and_(RouterStats.percentile >= pct_low, 
+                         RouterStats.percentile < pct_high)
+
+    # This is Fail City and sqlalchemy is running for mayor.
+    if sqlalchemy.__version__ < "0.5.0":
+      circ_from_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_from_rate)
+      circ_to_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_to_rate)
+      circ_bi_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_bi_rate)
+
+      avg_first_ext = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.avg_first_ext)
+      sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.sbw)
+      filt_sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.filt_sbw)
+      percentile = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.percentile)
+    else:
+      circ_from_rate = tc_session.query(func.avg(RouterStats.circ_from_rate)).filter(pct_clause).filter(stat_clause).scalar()
+      circ_to_rate = tc_session.query(func.avg(RouterStats.circ_to_rate)).filter(pct_clause).filter(stat_clause).scalar()
+      circ_bi_rate = tc_session.query(func.avg(RouterStats.circ_bi_rate)).filter(pct_clause).filter(stat_clause).scalar()
+      
+      avg_first_ext = tc_session.query(func.avg(RouterStats.avg_first_ext)).filter(pct_clause).filter(stat_clause).scalar()
+      sbw = tc_session.query(func.avg(RouterStats.sbw)).filter(pct_clause).filter(stat_clause).scalar()
+      filt_sbw = tc_session.query(func.avg(RouterStats.filt_sbw)).filter(pct_clause).filter(stat_clause).scalar()
+      percentile = tc_session.query(func.avg(RouterStats.percentile)).filter(pct_clause).filter(stat_clause).scalar()
+
+    def cvt(a,b,c=1):
+      if type(a) == float: return round(a/c,b)
+      elif type(a) == int: return a
+      elif type(a) == type(None): return "None"
+      else: return type(a)
+
+    sql_key = """SQLSupport Statistics:
+    CF=Circ From Rate          CT=Circ To Rate      CB=Circ To/From Rate
+    CE=Avg 1st Ext time (s)    SB=Avg Stream BW     FB=Filtered stream bw
+    SD=Strm BW stddev          CC=Circ To Attempts  ST=Strem attempts
+    SC=Streams Closed OK       RF=Circ From Ratio   RT=Circ To Ratio     
+    RB=Circ To/From Ratio      RE=1st Ext Ratio     RS=Stream BW Ratio   
+    RF=Filt Stream Ratio       PR=Percentile Rank\n\n"""
+ 
+    f.write(sql_key)
+    f.write("Average Statistics:\n")
+    f.write("   CF="+str(cvt(circ_from_rate,2)))
+    f.write("  CT="+str(cvt(circ_to_rate,2)))
+    f.write("  CB="+str(cvt(circ_bi_rate,2)))
+    f.write("  CE="+str(cvt(avg_first_ext,2)))
+    f.write("  SB="+str(cvt(sbw,2,1024)))
+    f.write("  FB="+str(cvt(filt_sbw,2,1024)))
+    f.write("  PR="+str(cvt(percentile,2))+"\n\n\n")
+
+    for s in RouterStats.query.filter(pct_clause).filter(stat_clause).\
+             filter(disp_clause).order_by(order_by).all():
+      f.write(s.router.idhex+" ("+s.router.nickname+")\n")
+      f.write("   CF="+str(cvt(s.circ_from_rate,2)))
+      f.write("  CT="+str(cvt(s.circ_to_rate,2)))
+      f.write("  CB="+str(cvt(s.circ_bi_rate,2)))
+      f.write("  CE="+str(cvt(s.avg_first_ext,2)))
+      f.write("  SB="+str(cvt(s.sbw,2,1024)))
+      f.write("  FB="+str(cvt(s.filt_sbw,2,1024)))
+      f.write("  SD="+str(cvt(s.sbw_dev,2,1024))+"\n")
+      f.write("   RF="+str(cvt(s.circ_from_ratio,2)))
+      f.write("  RT="+str(cvt(s.circ_to_ratio,2)))
+      f.write("  RB="+str(cvt(s.circ_bi_ratio,2)))
+      f.write("  RE="+str(cvt(s.ext_ratio,2)))
+      f.write("  RS="+str(cvt(s.sbw_ratio,2)))
+      f.write("  RF="+str(cvt(s.filt_sbw_ratio,2)))
+      f.write("  PR="+str(cvt(s.percentile,1))+"\n")
+      f.write("   CC="+str(cvt(s.circ_try_to,1)))
+      f.write("  ST="+str(cvt(s.strm_try, 1)))
+      f.write("  SC="+str(cvt(s.strm_closed, 1))+"\n\n")
+
+    f.flush()
+  write_stats = Callable(write_stats)  
+  
+
+  def write_bws(f, pct_low=0, pct_high=100, order_by=None, recompute=False, stat_clause=None, filter_clause=None, disp_clause=None):
+    if not order_by:
+      order_by=RouterStats.avg_first_ext
+
+    if recompute:
+      RouterStats.compute(pct_low, pct_high, stat_clause, filter_clause)
+
+    pct_clause = and_(RouterStats.percentile >= pct_low, 
+                         RouterStats.percentile < pct_high)
+
+    # This is Fail City and sqlalchemy is running for mayor.
+    if sqlalchemy.__version__ < "0.5.0":
+      sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.sbw)
+      filt_sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.filt_sbw)
+    else:
+      sbw = tc_session.query(func.avg(RouterStats.sbw)).filter(pct_clause).filter(stat_clause).scalar()
+      filt_sbw = tc_session.query(func.avg(RouterStats.filt_sbw)).filter(pct_clause).filter(stat_clause).scalar()
+
+    f.write(str(int(time.time()))+"\n")
+
+    def cvt(a,b,c=1):
+      if type(a) == float: return int(round(a/c,b))
+      elif type(a) == int: return a
+      elif type(a) == type(None): return "None"
+      else: return type(a)
+
+    for s in RouterStats.query.filter(pct_clause).filter(stat_clause).\
+           filter(disp_clause).order_by(order_by).all():
+      f.write("node_id=$"+s.router.idhex+" nick="+s.router.nickname)
+      f.write(" strm_bw="+str(cvt(s.sbw,0)))
+      f.write(" filt_bw="+str(cvt(s.filt_sbw,0)))
+      f.write(" desc_bw="+str(int(cvt(s.avg_desc_bw,0))))
+      f.write(" ns_bw="+str(int(cvt(s.avg_bw,0)))+"\n")
+
+    f.flush()
+  write_bws = Callable(write_bws)  
+    
+
+##################### End Model ####################
+
+#################### Model Support ################
+def reset_all():
+  # Need to keep routers around.. 
+  for r in Router.query.all():
+    # This appears to be needed. the relation tables do not get dropped 
+    # automatically.
+    r.circuits = []
+    r.streams = []
+    r.detached_streams = []
+    r.bw_history = [] 
+    r.stats = None
+    tc_session.add(r)
+
+  tc_session.commit()
+  tc_session.clear()
+
+  BwHistory.table.drop() # Will drop subclasses
+  Extension.table.drop()
+  Stream.table.drop() 
+  Circuit.table.drop()
+  RouterStats.table.drop()
+
+  RouterStats.table.create()
+  BwHistory.table.create() 
+  Extension.table.create()
+  Stream.table.create() 
+  Circuit.table.create()
+
+  tc_session.commit()
+
+  #for r in Router.query.all():
+  #  if len(r.bw_history) or len(r.circuits) or len(r.streams) or r.stats:
+  #    plog("WARN", "Router still has dropped data!")
+
+  plog("NOTICE", "Reset all SQL stats")
+
+##################### End Model Support ####################
+
+class ConsensusTrackerListener(TorCtl.DualEventListener):
+  def __init__(self):
+    TorCtl.DualEventListener.__init__(self)
+    self.last_desc_at = time.time()+60 # Give tor some time to start up
+    self.consensus = None
+    self.wait_for_signal = False
+
+  CONSENSUS_DONE = 0x7fffffff
+
+  # TODO: What about non-running routers and uptime information?
+  def _update_rank_history(self, idlist):
+    plog("INFO", "Consensus change... Updating rank history")
+    for idhex in idlist:
+      if idhex not in self.consensus.routers: continue
+      rc = self.consensus.routers[idhex]
+      if rc.down: continue
+      try:
+        r = Router.query.options(eagerload('bw_history')).filter_by(
+                                    idhex=idhex).with_labels().one()
+        bwh = BwHistory(router=r, rank=rc.list_rank, bw=rc.bw,
+                        desc_bw=rc.desc_bw, pub_time=r.published)
+        r.bw_history.append(bwh)
+        #tc_session.add(bwh)
+        tc_session.add(r)
+      except sqlalchemy.orm.exc.NoResultFound:
+        plog("WARN", "No descriptor found for consenus router "+str(idhex))
+
+    plog("INFO", "Consensus history updated.")
+    tc_session.commit()
+
+  def _update_db(self, idlist):
+    # FIXME: It is tempting to delay this as well, but we need
+    # this info to be present immediately for circuit construction...
+    plog("INFO", "Consensus change... Updating db")
+    for idhex in idlist:
+      if idhex in self.consensus.routers:
+        rc = self.consensus.routers[idhex]
+        r = Router.query.filter_by(idhex=rc.idhex).first()
+        if r and r.orhash == rc.orhash:
+          # We already have it stored. (Possible spurious NEWDESC)
+          continue
+        if not r: r = Router()
+        r.from_router(rc)
+        tc_session.add(r)
+    plog("INFO", "Consensus db updated")
+    tc_session.commit()
+
+  def update_consensus(self):
+    plog("INFO", "Updating DB with full consensus.")
+    self.consensus = self.parent_handler.current_consensus()
+    self._update_db(self.consensus.ns_map.iterkeys())
+
+  def set_parent(self, parent_handler):
+    if not isinstance(parent_handler, TorCtl.ConsensusTracker):
+      raise TorCtlError("ConsensusTrackerListener can only be attached to ConsensusTracker instances")
+    TorCtl.DualEventListener.set_parent(self, parent_handler)
+
+  def heartbeat_event(self, e):
+    # This sketchiness is to ensure we have an accurate history
+    # of each router's rank+bandwidth for the entire duration of the run..
+    if e.state == EVENT_STATE.PRELISTEN:
+      if not self.consensus: 
+        global OP
+        OP = Router.query.filter_by(
+                 idhex="0000000000000000000000000000000000000000").first()
+        if not OP:
+          OP = Router(idhex="0000000000000000000000000000000000000000", 
+                    orhash="000000000000000000000000000", 
+                    nickname="!!TorClient", 
+                    published=datetime.datetime.utcnow())
+          tc_session.add(OP)
+          tc_session.commit()
+        self.update_consensus()
+      # XXX: This hack exists because update_rank_history is expensive.
+      # However, even if we delay it till the end of the consensus update, 
+      # it still delays event processing for up to 30 seconds on a fast 
+      # machine.
+      # 
+      # The correct way to do this is give SQL processing
+      # to a dedicated worker thread that pulls events off of a secondary
+      # queue, that way we don't block stream handling on this processing.
+      # The problem is we are pretty heavily burdened with the need to 
+      # stay in sync with our parent event handler. A queue will break this 
+      # coupling (even if we could get all the locking right).
+      #
+      # A lighterweight hack might be to just make the scanners pause
+      # on a condition used to signal we are doing this (and other) heavy 
+      # lifting. We could have them possibly check self.last_desc_at..
+      if not self.wait_for_signal and e.arrived_at - self.last_desc_at > 60.0:
+        if self.consensus.consensus_count  < 0.95*(len(self.consensus.ns_map)):
+          plog("INFO", "Not enough router descriptors: "
+                       +str(self.consensus.consensus_count)+"/"
+                       +str(len(self.consensus.ns_map)))
+        elif not PathSupport.PathBuilder.is_urgent_event(e):
+          plog("INFO", "Newdesc timer is up. Assuming we have full consensus")
+          self._update_rank_history(self.consensus.ns_map.iterkeys())
+          self.last_desc_at = ConsensusTrackerListener.CONSENSUS_DONE
+
+  def new_consensus_event(self, n):
+    if n.state == EVENT_STATE.POSTLISTEN:
+      self.last_desc_at = n.arrived_at
+      self.update_consensus()
+
+  def new_desc_event(self, d): 
+    if d.state == EVENT_STATE.POSTLISTEN:
+      self.last_desc_at = d.arrived_at
+      self.consensus = self.parent_handler.current_consensus()
+      self._update_db(d.idlist)
+
+class CircuitListener(TorCtl.PreEventListener):
+  def set_parent(self, parent_handler):
+    if not filter(lambda f: f.__class__ == ConsensusTrackerListener, 
+                  parent_handler.post_listeners):
+       raise TorCtlError("CircuitListener needs a ConsensusTrackerListener")
+    TorCtl.PreEventListener.set_parent(self, parent_handler)
+    # TODO: This is really lame. We only know the extendee of a circuit
+    # if we have built the path ourselves. Otherwise, Tor keeps it a
+    # secret from us. This prevents us from properly tracking failures
+    # for normal Tor usage.
+    if isinstance(parent_handler, PathSupport.PathBuilder):
+      self.track_parent = True
+    else:
+      self.track_parent = False
+
+  def circ_status_event(self, c):
+    if self.track_parent and c.circ_id not in self.parent_handler.circuits:
+      return # Ignore circuits that aren't ours
+    # TODO: Hrmm, consider making this sane in TorCtl.
+    if c.reason: lreason = c.reason
+    else: lreason = "NONE"
+    if c.remote_reason: rreason = c.remote_reason
+    else: rreason = "NONE"
+    reason = c.event_name+":"+c.status+":"+lreason+":"+rreason
+
+    output = [str(c.arrived_at), str(time.time()-c.arrived_at), c.event_name, str(c.circ_id), c.status]
+    if c.path: output.append(",".join(c.path))
+    if c.reason: output.append("REASON=" + c.reason)
+    if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
+    plog("DEBUG", " ".join(output))
+  
+    if c.status == "LAUNCHED":
+      circ = Circuit(circ_id=c.circ_id,launch_time=c.arrived_at,
+                     last_extend=c.arrived_at)
+      if self.track_parent:
+        for r in self.parent_handler.circuits[c.circ_id].path:
+          rq = Router.query.options(eagerload('circuits')).filter_by(
+                                idhex=r.idhex).with_labels().one()
+          circ.routers.append(rq) 
+          #rq.circuits.append(circ) # done automagically?
+          #tc_session.add(rq)
+      tc_session.add(circ)
+      tc_session.commit()
+    elif c.status == "EXTENDED":
+      circ = Circuit.query.options(eagerload('extensions')).filter_by(
+                       circ_id = c.circ_id).first()
+      if not circ: return # Skip circuits from before we came online
+
+      e = Extension(circ=circ, hop=len(c.path)-1, time=c.arrived_at)
+
+      if len(c.path) == 1:
+        e.from_node = OP
+      else:
+        r_ext = c.path[-2]
+        if r_ext[0] != '$': r_ext = self.parent_handler.name_to_key[r_ext]
+        e.from_node = Router.query.filter_by(idhex=r_ext[1:]).one()
+
+      r_ext = c.path[-1]
+      if r_ext[0] != '$': r_ext = self.parent_handler.name_to_key[r_ext]
+
+      e.to_node = Router.query.filter_by(idhex=r_ext[1:]).one()
+      if not self.track_parent:
+        # FIXME: Eager load here?
+        circ.routers.append(e.to_node)
+        e.to_node.circuits.append(circ)
+        tc_session.add(e.to_node)
+ 
+      e.delta = c.arrived_at - circ.last_extend
+      circ.last_extend = c.arrived_at
+      circ.extensions.append(e)
+      tc_session.add(e)
+      tc_session.add(circ)
+      tc_session.commit()
+    elif c.status == "FAILED":
+      circ = Circuit.query.filter_by(circ_id = c.circ_id).first()
+      if not circ: return # Skip circuits from before we came online
+        
+      circ.expunge()
+      if isinstance(circ, BuiltCircuit):
+        # Convert to destroyed circuit
+        Circuit.table.update(Circuit.id ==
+                  circ.id).execute(row_type='destroyedcircuit')
+        circ = DestroyedCircuit.query.filter_by(id=circ.id).one()
+        circ.destroy_reason = reason
+        circ.destroy_time = c.arrived_at
+      else:
+        # Convert to failed circuit
+        Circuit.table.update(Circuit.id ==
+                  circ.id).execute(row_type='failedcircuit')
+        circ = FailedCircuit.query.options(
+                  eagerload('extensions')).filter_by(id=circ.id).one()
+        circ.fail_reason = reason
+        circ.fail_time = c.arrived_at
+        e = FailedExtension(circ=circ, hop=len(c.path), time=c.arrived_at)
+
+        if len(c.path) == 0:
+          e.from_node = OP
+        else:
+          r_ext = c.path[-1]
+          if r_ext[0] != '$': r_ext = self.parent_handler.name_to_key[r_ext]
+ 
+          e.from_node = Router.query.filter_by(idhex=r_ext[1:]).one()
+
+        if self.track_parent:
+          r=self.parent_handler.circuits[c.circ_id].path[len(c.path)]
+          e.to_node = Router.query.filter_by(idhex=r.idhex).one()
+        else:
+          e.to_node = None # We have no idea..
+
+        e.delta = c.arrived_at - circ.last_extend
+        e.reason = reason
+        circ.extensions.append(e)
+        circ.fail_time = c.arrived_at
+        tc_session.add(e)
+
+      tc_session.add(circ)
+      tc_session.commit()
+    elif c.status == "BUILT":
+      circ = Circuit.query.filter_by(
+                     circ_id = c.circ_id).first()
+      if not circ: return # Skip circuits from before we came online
+
+      circ.expunge()
+      # Convert to built circuit
+      Circuit.table.update(Circuit.id ==
+                circ.id).execute(row_type='builtcircuit')
+      circ = BuiltCircuit.query.filter_by(id=circ.id).one()
+      
+      circ.built_time = c.arrived_at
+      circ.tot_delta = c.arrived_at - circ.launch_time
+      tc_session.add(circ)
+      tc_session.commit()
+    elif c.status == "CLOSED":
+      circ = BuiltCircuit.query.filter_by(circ_id = c.circ_id).first()
+      if circ:
+        circ.expunge()
+        if lreason in ("REQUESTED", "FINISHED", "ORIGIN"):
+          # Convert to closed circuit
+          Circuit.table.update(Circuit.id ==
+                    circ.id).execute(row_type='closedcircuit')
+          circ = ClosedCircuit.query.filter_by(id=circ.id).one()
+          circ.closed_time = c.arrived_at
+        else:
+          # Convert to destroyed circuit
+          Circuit.table.update(Circuit.id ==
+                    circ.id).execute(row_type='destroyedcircuit')
+          circ = DestroyedCircuit.query.filter_by(id=circ.id).one()
+          circ.destroy_reason = reason
+          circ.destroy_time = c.arrived_at
+        tc_session.add(circ)
+        tc_session.commit()
+
+class StreamListener(CircuitListener):
+  def stream_bw_event(self, s):
+    strm = Stream.query.filter_by(strm_id = s.strm_id).first()
+    if strm and strm.start_time and strm.start_time < s.arrived_at:
+      plog("DEBUG", "Got stream bw: "+str(s.strm_id))
+      strm.tot_read_bytes += s.bytes_read
+      strm.tot_write_bytes += s.bytes_written
+      tc_session.add(strm)
+      tc_session.commit()
+ 
+  def stream_status_event(self, s):
+    if s.reason: lreason = s.reason
+    else: lreason = "NONE"
+    if s.remote_reason: rreason = s.remote_reason
+    else: rreason = "NONE"
+
+    if s.status in ("NEW", "NEWRESOLVE"):
+      strm = Stream(strm_id=s.strm_id, tgt_host=s.target_host, 
+                    tgt_port=s.target_port, init_status=s.status,
+                    tot_read_bytes=0, tot_write_bytes=0)
+      tc_session.add(strm)
+      tc_session.commit()
+      return
+
+    strm = Stream.query.filter_by(strm_id = s.strm_id).first()
+    if self.track_parent and \
+      (s.strm_id not in self.parent_handler.streams or \
+           self.parent_handler.streams[s.strm_id].ignored):
+      if strm:
+        tc_session.delete(strm)
+        tc_session.commit()
+      return # Ignore streams that aren't ours
+
+    if not strm: 
+      plog("NOTICE", "Ignoring prior stream "+str(s.strm_id))
+      return # Ignore prior streams
+
+    reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+strm.init_status
+
+    if s.status == "SENTCONNECT":
+      # New circuit
+      strm.circuit = Circuit.query.filter_by(circ_id=s.circ_id).first()
+      if not strm.circuit:
+        plog("NOTICE", "Ignoring prior stream "+str(strm.strm_id)+" with old circuit "+str(s.circ_id))
+        tc_session.delete(strm)
+        tc_session.commit()
+        return
+    else:
+      circ = None
+      if s.circ_id:
+        circ = Circuit.query.filter_by(circ_id=s.circ_id).first()
+      elif self.track_parent:
+        circ = self.parent_handler.streams[s.strm_id].circ
+        if not circ: circ = self.parent_handler.streams[s.strm_id].pending_circ
+        if circ:
+          circ = Circuit.query.filter_by(circ_id=circ.circ_id).first()
+
+      if not circ:
+        plog("WARN", "No circuit for "+str(s.strm_id)+" circ: "+str(s.circ_id))
+
+      if not strm.circuit:
+        plog("INFO", "No stream circuit for "+str(s.strm_id)+" circ: "+str(s.circ_id))
+        strm.circuit = circ
+
+      # XXX: Verify circ id matches stream.circ
+    
+    if s.status == "SUCCEEDED":
+      strm.start_time = s.arrived_at
+      for r in strm.circuit.routers: 
+        plog("DEBUG", "Added router "+r.idhex+" to stream "+str(s.strm_id))
+        r.streams.append(strm)
+        tc_session.add(r)
+      tc_session.add(strm)
+      tc_session.commit()
+    elif s.status == "DETACHED":
+      for r in strm.circuit.routers:
+        r.detached_streams.append(strm)
+        tc_session.add(r)
+      #strm.detached_circuits.append(strm.circuit)
+      strm.circuit.detached_streams.append(strm)
+      strm.circuit.streams.remove(strm)
+      strm.circuit = None
+      tc_session.add(strm)
+      tc_session.commit()
+    elif s.status == "FAILED":
+      strm.expunge()
+      # Convert to destroyed circuit
+      Stream.table.update(Stream.id ==
+                strm.id).execute(row_type='failedstream')
+      strm = FailedStream.query.filter_by(id=strm.id).one()
+      strm.fail_time = s.arrived_at
+      strm.fail_reason = reason
+      tc_session.add(strm)
+      tc_session.commit()
+    elif s.status == "CLOSED":
+      if isinstance(strm, FailedStream):
+        strm.close_reason = reason
+      else:
+        strm.expunge()
+        if not (lreason == "DONE" or (lreason == "END" and rreason == "DONE")):
+          # Convert to destroyed circuit
+          Stream.table.update(Stream.id ==
+                    strm.id).execute(row_type='failedstream')
+          strm = FailedStream.query.filter_by(id=strm.id).one()
+          strm.fail_time = s.arrived_at
+        else: 
+          # Convert to destroyed circuit
+          Stream.table.update(Stream.id ==
+                    strm.id).execute(row_type='closedstream')
+          strm = ClosedStream.query.filter_by(id=strm.id).one()
+          strm.read_bandwidth = strm.tot_read_bytes/(s.arrived_at-strm.start_time)
+          strm.write_bandwidth = strm.tot_write_bytes/(s.arrived_at-strm.start_time)
+          strm.end_time = s.arrived_at
+          plog("DEBUG", "Stream "+str(strm.strm_id)+" xmitted "+str(strm.tot_bytes()))
+        strm.close_reason = reason
+      tc_session.add(strm)
+      tc_session.commit()
+
+def run_example(host, port):
+  """ Example of basic TorCtl usage. See PathSupport for more advanced
+      usage.
+  """
+  print "host is %s:%d"%(host,port)
+  setup_db("sqlite:///torflow.sqlite", echo=False)
+
+  #print tc_session.query(((func.count(Extension.id)))).filter(and_(FailedExtension.table.c.row_type=='extension', FailedExtension.table.c.from_node_idhex == "7CAA2F5F998053EF5D2E622563DEB4A6175E49AC")).one()
+  #return
+  #for e in Extension.query.filter(FailedExtension.table.c.row_type=='extension').all():
+  #  if e.from_node: print "From: "+e.from_node.idhex+" "+e.from_node.nickname
+  #  if e.to_node: print "To: "+e.to_node.idhex+" "+e.to_node.nickname
+  #return
+
+  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+  s.connect((host,port))
+  c = Connection(s)
+  th = c.launch_thread()
+  c.authenticate(control_pass)
+  c.set_event_handler(TorCtl.ConsensusTracker(c))
+  c.add_event_listener(ConsensusTrackerListener())
+  c.add_event_listener(CircuitListener())
+
+  print `c.extend_circuit(0,["moria1"])`
+  try:
+    print `c.extend_circuit(0,[""])`
+  except TorCtl.ErrorReply: # wtf?
+    print "got error. good."
+  except:
+    print "Strange error", sys.exc_info()[0]
+   
+  c.set_events([EVENT_TYPE.STREAM, EVENT_TYPE.CIRC,
+          EVENT_TYPE.NEWCONSENSUS, EVENT_TYPE.NEWDESC,
+          EVENT_TYPE.ORCONN, EVENT_TYPE.BW], True)
+
+  th.join()
+  return
+
+  
+if __name__ == '__main__':
+  run_example(control_host,control_port)
+

Added: arm/dependencies/TorCtl/ScanSupport.py
===================================================================
--- arm/dependencies/TorCtl/ScanSupport.py	                        (rev 0)
+++ arm/dependencies/TorCtl/ScanSupport.py	2010-08-23 01:13:01 UTC (rev 23018)
@@ -0,0 +1,251 @@
+import PathSupport
+import SQLSupport
+import threading
+import copy
+import time
+import shutil
+
+from TorUtil import plog
+
+# Note: be careful writing functions for this class. Remember that
+# the PathBuilder has its own thread that it recieves events on
+# independent from your thread that calls into here.
+class ScanHandler(PathSupport.PathBuilder):
+  def set_pct_rstr(self, percent_skip, percent_fast):
+    def notlambda(sm):
+      sm.percent_fast=percent_fast
+      sm.percent_skip=percent_skip
+    self.schedule_selmgr(notlambda)
+
+  def reset_stats(self):
+    def notlambda(this):
+      this.reset()
+    self.schedule_low_prio(notlambda)
+
+  def commit(self):
+    plog("INFO", "Scanner committing jobs...")
+    cond = threading.Condition()
+    def notlambda2(this):
+      cond.acquire()
+      this.run_all_jobs = False
+      plog("INFO", "Commit done.")
+      cond.notify()
+      cond.release()
+
+    def notlambda1(this):
+      plog("INFO", "Committing jobs...")
+      this.run_all_jobs = True
+      self.schedule_low_prio(notlambda2)
+
+    cond.acquire()
+    self.schedule_immediate(notlambda1)
+
+    cond.wait()
+    cond.release()
+    plog("INFO", "Scanner commit done.")
+
+  def close_circuits(self):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      this.close_all_circuits()
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+
+  def close_streams(self, reason):
+    cond = threading.Condition()
+    plog("NOTICE", "Wedged Tor stream. Closing all streams")
+    def notlambda(this):
+      cond.acquire()
+      this.close_all_streams(reason)
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+
+  def new_exit(self):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      this.new_nym = True
+      if this.selmgr.bad_restrictions:
+        plog("NOTICE", "Clearing bad restrictions with reconfigure..")
+        this.selmgr.reconfigure(this.current_consensus())
+      lines = this.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
+      for _,msg,more in lines:
+        plog("DEBUG", msg)
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+
+  def idhex_to_r(self, idhex):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      if idhex in self.routers:
+        cond._result = self.routers[idhex]
+      else:
+        cond._result = None
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+    return cond._result
+
+  def name_to_idhex(self, nick):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      if nick in self.name_to_key:
+        cond._result = self.name_to_key[nick]
+      else:
+        cond._result = None
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+    return cond._result
+
+  def rank_to_percent(self, rank):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      cond._pct = (100.0*rank)/len(this.sorted_r) # lol moar haxx
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+    return cond._pct
+
+  def percent_to_rank(self, pct):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      cond._rank = int(round((pct*len(this.sorted_r))/100.0,0)) # lol moar haxx
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+    return cond._rank
+
+  def get_exit_node(self):
+    ret = copy.copy(self.last_exit) # GIL FTW
+    if ret:
+      plog("DEBUG", "Got last exit of "+ret.idhex)
+    else:
+      plog("DEBUG", "No last exit.")
+    return ret
+
+  def set_exit_node(self, arg):
+    cond = threading.Condition()
+    exit_name = arg
+    plog("DEBUG", "Got Setexit: "+exit_name)
+    def notlambda(sm):
+      plog("DEBUG", "Job for setexit: "+exit_name)
+      cond.acquire()
+      sm.set_exit(exit_name)
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_selmgr(notlambda)
+    cond.wait()
+    cond.release()
+
+class SQLScanHandler(ScanHandler):
+  def attach_sql_listener(self, db_uri):
+    plog("DEBUG", "Got sqlite: "+db_uri)
+    SQLSupport.setup_db(db_uri, echo=False, drop=True)
+    self.sql_consensus_listener = SQLSupport.ConsensusTrackerListener()
+    self.add_event_listener(self.sql_consensus_listener)
+    self.add_event_listener(SQLSupport.StreamListener())
+
+  def write_sql_stats(self, rfilename=None, stats_filter=None):
+    if not rfilename:
+      rfilename="./data/stats/sql-"+time.strftime("20%y-%m-%d-%H:%M:%S")
+    cond = threading.Condition()
+    def notlambda(h):
+      cond.acquire()
+      SQLSupport.RouterStats.write_stats(file(rfilename, "w"),
+                            0, 100, order_by=SQLSupport.RouterStats.sbw,
+                            recompute=True, disp_clause=stats_filter)
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+
+  def write_strm_bws(self, rfilename=None, slice_num=0, stats_filter=None):
+    if not rfilename:
+      rfilename="./data/stats/bws-"+time.strftime("20%y-%m-%d-%H:%M:%S")
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      f=file(rfilename, "w")
+      f.write("slicenum="+str(slice_num)+"\n")
+      SQLSupport.RouterStats.write_bws(f, 0, 100,
+                            order_by=SQLSupport.RouterStats.sbw,
+                            recompute=False, disp_clause=stats_filter)
+      f.close()
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+
+  def save_sql_file(self, sql_file, new_file):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      SQLSupport.tc_session.close()
+      try:
+        shutil.copy(sql_file, new_file)
+      except Exception,e:
+        plog("WARN", "Error moving sql file: "+str(e))
+      SQLSupport.reset_all()
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+
+  def wait_for_consensus(self):
+    cond = threading.Condition()
+    def notlambda(this):
+      if this.sql_consensus_listener.last_desc_at \
+                 != SQLSupport.ConsensusTrackerListener.CONSENSUS_DONE:
+        this.sql_consensus_listener.wait_for_signal = False
+        plog("INFO", "Waiting on consensus result: "+str(this.run_all_jobs))
+        this.schedule_low_prio(notlambda)
+      else:
+        cond.acquire()
+        this.sql_consensus_listener.wait_for_signal = True
+        cond.notify()
+        cond.release()
+    plog("DEBUG", "Checking for consensus")
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+    plog("INFO", "Consensus OK")
+
+

Added: arm/dependencies/TorCtl/StatsSupport.py
===================================================================
--- arm/dependencies/TorCtl/StatsSupport.py	                        (rev 0)
+++ arm/dependencies/TorCtl/StatsSupport.py	2010-08-23 01:13:01 UTC (rev 23018)
@@ -0,0 +1,894 @@
+#!/usr/bin/python
+#StatsSupport.py - functions and classes useful for calculating stream/circuit statistics
+
+"""
+
+Support classes for statisics gathering
+
+The StatsSupport package contains several classes that extend
+PathSupport to gather continuous statistics on the Tor network.
+
+The main entrypoint is to extend or create an instance of the
+StatsHandler class. The StatsHandler extends from
+TorCtl.PathSupport.PathBuilder, which is itself a TorCtl.EventHandler.
+The StatsHandler listens to CIRC and STREAM events and gathers all
+manner of statics on their creation and failure before passing the
+events back up to the PathBuilder code, which manages the actual
+construction and the attachment of streams to circuits.
+
+The package also contains a number of support classes that help gather
+additional statistics on the reliability and performance of routers.
+
+For the purpose of accounting failures, the code tracks two main classes
+of failure: 'actual' failure and 'suspected' failure. The general rule
+is that an actual failure is attributed to the node that directly
+handled the circuit or stream. For streams, this is considered to be the
+exit node. For circuits, it is both the extender and the extendee.
+'Suspected' failures, on the other hand, are attributed to every member
+of the circuit up until the extendee for circuits, and all hops for
+streams.
+
+For bandwidth accounting, the average stream bandwidth and the average
+ratio of stream bandwidth to advertised bandwidth are tracked, and when
+the statistics are written, a Z-test is performed to calculate the
+probabilities of these values assuming a normal distribution. Note,
+however, that it has not been verified that this distribution is
+actually normal. It is likely to be something else (pareto, perhaps?).
+
+"""
+
+import sys
+import re
+import random
+import copy
+import time
+import math
+import traceback
+
+import TorUtil, PathSupport, TorCtl
+from TorUtil import *
+from PathSupport import *
+from TorUtil import meta_port, meta_host, control_port, control_host
+
+class ReasonRouterList:
+  "Helper class to track which Routers have failed for a given reason"
+  def __init__(self, reason):
+    self.reason = reason
+    self.rlist = {}
+
+  def sort_list(self): raise NotImplemented()
+
+  def write_list(self, f):
+    "Write the list of failure counts for this reason 'f'"
+    rlist = self.sort_list()
+    for r in rlist:
+      susp = 0
+      tot_failed = r.circ_failed+r.strm_failed
+      tot_susp = tot_failed+r.circ_suspected+r.strm_suspected
+      f.write(r.idhex+" ("+r.nickname+") F=")
+      if self.reason in r.reason_failed:
+        susp = r.reason_failed[self.reason]
+      f.write(str(susp)+"/"+str(tot_failed))
+      f.write(" S=")
+      if self.reason in r.reason_suspected:
+        susp += r.reason_suspected[self.reason]
+      f.write(str(susp)+"/"+str(tot_susp)+"\n")
+    
+  def add_r(self, r):
+    "Add a router to the list for this reason"
+    self.rlist[r] = 1
+
+  def total_suspected(self):
+    "Get a list of total suspected failures for this reason"
+    # suspected is disjoint from failed. The failed table
+    # may not have an entry
+    def notlambda(x, y):
+      if self.reason in y.reason_suspected:
+        if self.reason in y.reason_failed:
+          return (x + y.reason_suspected[self.reason]
+               + y.reason_failed[self.reason])
+        else:
+          return (x + y.reason_suspected[self.reason])
+      else:
+        if self.reason in y.reason_failed:
+          return (x + y.reason_failed[self.reason])
+        else: return x
+    return reduce(notlambda, self.rlist.iterkeys(), 0)
+
+  def total_failed(self):
+    "Get a list of total failures for this reason"
+    def notlambda(x, y):
+      if self.reason in y.reason_failed:
+        return (x + y.reason_failed[self.reason])
+      else: return x
+    return reduce(notlambda, self.rlist.iterkeys(), 0)
+ 
+class SuspectRouterList(ReasonRouterList):
+  """Helper class to track all routers suspected of failing for a given
+     reason. The main difference between this and the normal
+     ReasonRouterList is the sort order and the verification."""
+  def __init__(self, reason): ReasonRouterList.__init__(self,reason)
+  
+  def sort_list(self):
+    rlist = self.rlist.keys()
+    rlist.sort(lambda x, y: cmp(y.reason_suspected[self.reason],
+                  x.reason_suspected[self.reason]))
+    return rlist
+   
+  def _verify_suspected(self):
+    return reduce(lambda x, y: x + y.reason_suspected[self.reason],
+            self.rlist.iterkeys(), 0)
+
+class FailedRouterList(ReasonRouterList):
+  """Helper class to track all routers that failed for a given
+     reason. The main difference between this and the normal
+     ReasonRouterList is the sort order and the verification."""
+  def __init__(self, reason): ReasonRouterList.__init__(self,reason)
+
+  def sort_list(self):
+    rlist = self.rlist.keys()
+    rlist.sort(lambda x, y: cmp(y.reason_failed[self.reason],
+                  x.reason_failed[self.reason]))
+    return rlist
+
+  def _verify_failed(self):
+    return reduce(lambda x, y: x + y.reason_failed[self.reason],
+            self.rlist.iterkeys(), 0)
+class BandwidthStats:
+  "Class that manages observed bandwidth through a Router"
+  def __init__(self):
+    self.byte_list = []
+    self.duration_list = []
+    self.min_bw = 1e10
+    self.max_bw = 0
+    self.mean = 0
+    self.dev = 0
+
+  def _exp(self): # Weighted avg
+    "Expectation - weighted average of the bandwidth through this node"
+    tot_bw = reduce(lambda x, y: x+y, self.byte_list, 0.0)
+    EX = 0.0
+    for i in xrange(len(self.byte_list)):
+      EX += (self.byte_list[i]*self.byte_list[i])/self.duration_list[i]
+    if tot_bw == 0.0: return 0.0
+    EX /= tot_bw
+    return EX
+
+  def _exp2(self): # E[X^2]
+    "Second moment of the bandwidth"
+    tot_bw = reduce(lambda x, y: x+y, self.byte_list, 0.0)
+    EX = 0.0
+    for i in xrange(len(self.byte_list)):
+      EX += (self.byte_list[i]**3)/(self.duration_list[i]**2)
+    if tot_bw == 0.0: return 0.0
+    EX /= tot_bw
+    return EX
+    
+  def _dev(self): # Weighted dev
+    "Standard deviation of bandwidth"
+    EX = self.mean
+    EX2 = self._exp2()
+    arg = EX2 - (EX*EX)
+    if arg < -0.05:
+      plog("WARN", "Diff of "+str(EX2)+" and "+str(EX)+"^2 is "+str(arg))
+    return math.sqrt(abs(arg))
+
+  def add_bw(self, bytes, duration):
+    "Add an observed transfer of 'bytes' for 'duration' seconds"
+    if not bytes: plog("NOTICE", "No bytes for bandwidth")
+    bytes /= 1024.
+    self.byte_list.append(bytes)
+    self.duration_list.append(duration)
+    bw = bytes/duration
+    plog("DEBUG", "Got bandwidth "+str(bw))
+    if self.min_bw > bw: self.min_bw = bw
+    if self.max_bw < bw: self.max_bw = bw
+    self.mean = self._exp()
+    self.dev = self._dev()
+
+
+class StatsRouter(TorCtl.Router):
+  "Extended Router to handle statistics markup"
+  def __init__(self, router): # Promotion constructor :)
+    """'Promotion Constructor' that converts a Router directly into a 
+    StatsRouter without a copy."""
+    # TODO: Use __bases__ to do this instead?
+    self.__dict__ = router.__dict__
+    self.reset()
+    # StatsRouters should not be destroyed when Tor forgets about them
+    # Give them an extra refcount:
+    self.refcount += 1
+    plog("DEBUG", "Stats refcount "+str(self.refcount)+" for "+self.idhex)
+
+  def reset(self):
+    "Reset all stats on this Router"
+    self.circ_uncounted = 0
+    self.circ_failed = 0
+    self.circ_succeeded = 0 # disjoint from failed
+    self.circ_suspected = 0
+    self.circ_chosen = 0 # above 4 should add to this
+    self.strm_failed = 0 # Only exits should have these
+    self.strm_succeeded = 0
+    self.strm_suspected = 0 # disjoint from failed
+    self.strm_uncounted = 0
+    self.strm_chosen = 0 # above 4 should add to this
+    self.reason_suspected = {}
+    self.reason_failed = {}
+    self.first_seen = time.time()
+    if "Running" in self.flags:
+      self.became_active_at = self.first_seen
+      self.hibernated_at = 0
+    else:
+      self.became_active_at = 0
+      self.hibernated_at = self.first_seen
+    self.total_hibernation_time = 0
+    self.total_active_uptime = 0
+    self.total_extend_time = 0
+    self.total_extended = 0
+    self.bwstats = BandwidthStats()
+    self.z_ratio = 0
+    self.prob_zr = 0
+    self.z_bw = 0
+    self.prob_zb = 0
+    self.rank_history = []
+    self.bw_history = []
+
+  def was_used(self):
+    """Return True if this router was used in this round"""
+    return self.circ_chosen != 0
+
+  def avg_extend_time(self):
+    """Return the average amount of time it took for this router
+     to extend a circuit one hop"""
+    if self.total_extended:
+      return self.total_extend_time/self.total_extended
+    else: return 0
+
+  def bw_ratio(self):
+    """Return the ratio of the Router's advertised bandwidth to its 
+     observed average stream bandwidth"""
+    bw = self.bwstats.mean
+    if bw == 0.0: return 0
+    else: return self.bw/(1024.*bw)
+
+  def adv_ratio(self): # XXX
+    """Return the ratio of the Router's advertised bandwidth to 
+       the overall average observed bandwith"""
+    bw = StatsRouter.global_bw_mean
+    if bw == 0.0: return 0
+    else: return self.bw/bw
+
+  def avg_rank(self):
+    if not self.rank_history: return self.list_rank
+    return (1.0*sum(self.rank_history))/len(self.rank_history)
+
+  def bw_ratio_ratio(self):
+    bwr = self.bw_ratio()
+    if bwr == 0.0: return 0
+    # (avg_reported_bw/our_reported_bw) *
+    # (our_stream_capacity/avg_stream_capacity)
+    return StatsRouter.global_ratio_mean/bwr 
+
+  def strm_bw_ratio(self):
+    """Return the ratio of the Router's stream capacity to the average
+       stream capacity passed in as 'mean'"""
+    bw = self.bwstats.mean
+    if StatsRouter.global_strm_mean == 0.0: return 0
+    else: return (1.0*bw)/StatsRouter.global_strm_mean
+
+  def circ_fail_rate(self):
+    if self.circ_chosen == 0: return 0
+    return (1.0*self.circ_failed)/self.circ_chosen
+
+  def strm_fail_rate(self):
+    if self.strm_chosen == 0: return 0
+    return (1.0*self.strm_failed)/self.strm_chosen
+
+  def circ_suspect_rate(self):
+    if self.circ_chosen == 0: return 1
+    return (1.0*(self.circ_suspected+self.circ_failed))/self.circ_chosen
+
+  def strm_suspect_rate(self):
+    if self.strm_chosen == 0: return 1
+    return (1.0*(self.strm_suspected+self.strm_failed))/self.strm_chosen
+
+  def circ_suspect_ratio(self):
+    if 1.0-StatsRouter.global_cs_mean <= 0.0: return 0
+    return (1.0-self.circ_suspect_rate())/(1.0-StatsRouter.global_cs_mean)
+
+  def strm_suspect_ratio(self):
+    if 1.0-StatsRouter.global_ss_mean <= 0.0: return 0
+    return (1.0-self.strm_suspect_rate())/(1.0-StatsRouter.global_ss_mean)
+
+  def circ_fail_ratio(self):
+    if 1.0-StatsRouter.global_cf_mean <= 0.0: return 0
+    return (1.0-self.circ_fail_rate())/(1.0-StatsRouter.global_cf_mean)
+
+  def strm_fail_ratio(self):
+    if 1.0-StatsRouter.global_sf_mean <= 0.0: return 0
+    return (1.0-self.strm_fail_rate())/(1.0-StatsRouter.global_sf_mean)
+
+  def current_uptime(self):
+    if self.became_active_at:
+      ret = (self.total_active_uptime+(time.time()-self.became_active_at))
+    else:
+      ret = self.total_active_uptime
+    if ret == 0: return 0.000005 # eh..
+    else: return ret
+        
+  def failed_per_hour(self):
+    """Return the number of circuit extend failures per hour for this 
+     Router"""
+    return (3600.*(self.circ_failed+self.strm_failed))/self.current_uptime()
+
+  # XXX: Seperate suspected from failed in totals 
+  def suspected_per_hour(self):
+    """Return the number of circuits that failed with this router as an
+     earlier hop"""
+    return (3600.*(self.circ_suspected+self.strm_suspected
+          +self.circ_failed+self.strm_failed))/self.current_uptime()
+
+  # These four are for sanity checking
+  def _suspected_per_hour(self):
+    return (3600.*(self.circ_suspected+self.strm_suspected))/self.current_uptime()
+
+  def _uncounted_per_hour(self):
+    return (3600.*(self.circ_uncounted+self.strm_uncounted))/self.current_uptime()
+
+  def _chosen_per_hour(self):
+    return (3600.*(self.circ_chosen+self.strm_chosen))/self.current_uptime()
+
+  def _succeeded_per_hour(self):
+    return (3600.*(self.circ_succeeded+self.strm_succeeded))/self.current_uptime()
+  
+  key = """Metatroller Router Statistics:
+  CC=Circuits Chosen   CF=Circuits Failed      CS=Circuit Suspected
+  SC=Streams Chosen    SF=Streams Failed       SS=Streams Suspected
+  FH=Failed per Hour   SH=Suspected per Hour   ET=avg circuit Extend Time (s)
+  EB=mean BW (K)       BD=BW std Dev (K)       BR=Ratio of observed to avg BW
+  ZB=BW z-test value   PB=Probability(z-bw)    ZR=Ratio z-test value
+  PR=Prob(z-ratio)     SR=Global mean/mean BW  U=Uptime (h)\n"""
+
+  global_strm_mean = 0.0
+  global_strm_dev = 0.0
+  global_ratio_mean = 0.0
+  global_ratio_dev = 0.0
+  global_bw_mean = 0.0
+  global_cf_mean = 0.0
+  global_sf_mean = 0.0
+  global_cs_mean = 0.0
+  global_ss_mean = 0.0
+
+  def __str__(self):
+    return (self.idhex+" ("+self.nickname+")\n"
+    +"   CC="+str(self.circ_chosen)
+      +" CF="+str(self.circ_failed)
+      +" CS="+str(self.circ_suspected+self.circ_failed)
+      +" SC="+str(self.strm_chosen)
+      +" SF="+str(self.strm_failed)
+      +" SS="+str(self.strm_suspected+self.strm_failed)
+      +" FH="+str(round(self.failed_per_hour(),1))
+      +" SH="+str(round(self.suspected_per_hour(),1))+"\n"
+    +"   ET="+str(round(self.avg_extend_time(),1))
+      +" EB="+str(round(self.bwstats.mean,1))
+      +" BD="+str(round(self.bwstats.dev,1))
+      +" ZB="+str(round(self.z_bw,1))
+      +" PB="+(str(round(self.prob_zb,3))[1:])
+      +" BR="+str(round(self.bw_ratio(),1))
+      +" ZR="+str(round(self.z_ratio,1))
+      +" PR="+(str(round(self.prob_zr,3))[1:])
+      +" SR="+(str(round(self.strm_bw_ratio(),1)))
+      +" U="+str(round(self.current_uptime()/3600, 1))+"\n")
+
+  def sanity_check(self):
+    "Makes sure all stats are self-consistent"
+    if (self.circ_failed + self.circ_succeeded + self.circ_suspected
+      + self.circ_uncounted != self.circ_chosen):
+      plog("ERROR", self.nickname+" does not add up for circs")
+    if (self.strm_failed + self.strm_succeeded + self.strm_suspected
+      + self.strm_uncounted != self.strm_chosen):
+      plog("ERROR", self.nickname+" does not add up for streams")
+    def check_reasons(reasons, expected, which, rtype):
+      count = 0
+      for rs in reasons.iterkeys():
+        if re.search(r"^"+which, rs): count += reasons[rs]
+      if count != expected:
+        plog("ERROR", "Mismatch "+which+" "+rtype+" for "+self.nickname)
+    check_reasons(self.reason_suspected,self.strm_suspected,"STREAM","susp")
+    check_reasons(self.reason_suspected,self.circ_suspected,"CIRC","susp")
+    check_reasons(self.reason_failed,self.strm_failed,"STREAM","failed")
+    check_reasons(self.reason_failed,self.circ_failed,"CIRC","failed")
+    now = time.time()
+    tot_hib_time = self.total_hibernation_time
+    tot_uptime = self.total_active_uptime
+    if self.hibernated_at: tot_hib_time += now - self.hibernated_at
+    if self.became_active_at: tot_uptime += now - self.became_active_at
+    if round(tot_hib_time+tot_uptime) != round(now-self.first_seen):
+      plog("ERROR", "Mismatch of uptimes for "+self.nickname)
+    
+    per_hour_tot = round(self._uncounted_per_hour()+self.failed_per_hour()+
+         self._suspected_per_hour()+self._succeeded_per_hour(), 2)
+    chosen_tot = round(self._chosen_per_hour(), 2)
+    if per_hour_tot != chosen_tot:
+      plog("ERROR", self.nickname+" has mismatch of per hour counts: "
+                    +str(per_hour_tot) +" vs "+str(chosen_tot))
+
+
+# TODO: Use __metaclass__ and type to make this inheritance flexible?
+class StatsHandler(PathSupport.PathBuilder):
+  """An extension of PathSupport.PathBuilder that keeps track of 
+     router statistics for every circuit and stream"""
+  def __init__(self, c, slmgr, RouterClass=StatsRouter, track_ranks=False):
+    PathBuilder.__init__(self, c, slmgr, RouterClass)
+    self.circ_count = 0
+    self.strm_count = 0
+    self.strm_failed = 0
+    self.circ_failed = 0
+    self.circ_succeeded = 0
+    self.failed_reasons = {}
+    self.suspect_reasons = {}
+    self.track_ranks = track_ranks
+
+  # XXX: Shit, all this stuff should be slice-based
+  def run_zbtest(self): # Unweighted z-test
+    """Run unweighted z-test to calculate the probabilities of a node
+       having a given stream bandwidth based on the Normal distribution"""
+    n = reduce(lambda x, y: x+(y.bwstats.mean > 0), self.sorted_r, 0)
+    if n == 0: return (0, 0)
+    avg = reduce(lambda x, y: x+y.bwstats.mean, self.sorted_r, 0)/float(n)
+    def notlambda(x, y):
+      if y.bwstats.mean <= 0: return x+0
+      else: return x+(y.bwstats.mean-avg)*(y.bwstats.mean-avg)
+    stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+    if not stddev: return (avg, stddev)
+    for r in self.sorted_r:
+      if r.bwstats.mean > 0:
+        r.z_bw = abs((r.bwstats.mean-avg)/stddev)
+        r.prob_zb = TorUtil.zprob(-r.z_bw)
+    return (avg, stddev)
+
+  def run_zrtest(self): # Unweighted z-test
+    """Run unweighted z-test to calculate the probabilities of a node
+       having a given ratio of stream bandwidth to advertised bandwidth
+       based on the Normal distribution"""
+    n = reduce(lambda x, y: x+(y.bw_ratio() > 0), self.sorted_r, 0)
+    if n == 0: return (0, 0)
+    avg = reduce(lambda x, y: x+y.bw_ratio(), self.sorted_r, 0)/float(n)
+    def notlambda(x, y):
+      if y.bw_ratio() <= 0: return x+0
+      else: return x+(y.bw_ratio()-avg)*(y.bw_ratio()-avg)
+    stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+    if not stddev: return (avg, stddev)
+    for r in self.sorted_r:
+      if r.bw_ratio() > 0:
+        r.z_ratio = abs((r.bw_ratio()-avg)/stddev)
+        r.prob_zr = TorUtil.zprob(-r.z_ratio)
+    return (avg, stddev)
+
+  def avg_adv_bw(self):
+    n = reduce(lambda x, y: x+y.was_used(), self.sorted_r, 0)
+    if n == 0: return (0, 0)
+    avg = reduce(lambda x, y: x+y.bw, 
+            filter(lambda r: r.was_used(), self.sorted_r), 0)/float(n)
+    return avg 
+
+  def avg_circ_failure(self):
+    n = reduce(lambda x, y: x+y.was_used(), self.sorted_r, 0)
+    if n == 0: return (0, 0)
+    avg = reduce(lambda x, y: x+y.circ_fail_rate(), 
+            filter(lambda r: r.was_used(), self.sorted_r), 0)/float(n)
+    return avg 
+
+  def avg_stream_failure(self):
+    n = reduce(lambda x, y: x+y.was_used(), self.sorted_r, 0)
+    if n == 0: return (0, 0)
+    avg = reduce(lambda x, y: x+y.strm_fail_rate(), 
+            filter(lambda r: r.was_used(), self.sorted_r), 0)/float(n)
+    return avg 
+
+  def avg_circ_suspects(self):
+    n = reduce(lambda x, y: x+y.was_used(), self.sorted_r, 0)
+    if n == 0: return (0, 0)
+    avg = reduce(lambda x, y: x+y.circ_suspect_rate(), 
+            filter(lambda r: r.was_used(), self.sorted_r), 0)/float(n)
+    return avg 
+
+  def avg_stream_suspects(self):
+    n = reduce(lambda x, y: x+y.was_used(), self.sorted_r, 0)
+    if n == 0: return (0, 0)
+    avg = reduce(lambda x, y: x+y.strm_suspect_rate(), 
+            filter(lambda r: r.was_used(), self.sorted_r), 0)/float(n)
+    return avg 
+
+  def write_reasons(self, f, reasons, name):
+    "Write out all the failure reasons and statistics for all Routers"
+    f.write("\n\n\t----------------- "+name+" -----------------\n")
+    for rsn in reasons:
+      f.write("\n"+rsn.reason+". Failed: "+str(rsn.total_failed())
+          +", Suspected: "+str(rsn.total_suspected())+"\n")
+      rsn.write_list(f)
+
+  def write_routers(self, f, rlist, name):
+    "Write out all the usage statistics for all Routers"
+    f.write("\n\n\t----------------- "+name+" -----------------\n\n")
+    for r in rlist:
+      # only print it if we've used it.
+      if r.circ_chosen+r.strm_chosen > 0: f.write(str(r))
+
+  # FIXME: Maybe move this two up into StatsRouter too?
+  ratio_key = """Metatroller Ratio Statistics:
+  SR=Stream avg ratio     AR=Advertised bw ratio    BRR=Adv. bw avg ratio
+  CSR=Circ suspect ratio  CFR=Circ Fail Ratio       SSR=Stream suspect ratio
+  SFR=Stream fail ratio   CC=Circuit Count          SC=Stream Count
+  P=Percentile Rank       U=Uptime (h)\n"""
+  
+  def write_ratios(self, filename):
+    "Write out bandwith ratio stats StatsHandler has gathered"
+    plog("DEBUG", "Writing ratios to "+filename)
+    f = file(filename, "w")
+    f.write(StatsHandler.ratio_key)
+
+    (avg, dev) = self.run_zbtest()
+    StatsRouter.global_strm_mean = avg
+    StatsRouter.global_strm_dev = dev
+    (avg, dev) = self.run_zrtest()
+    StatsRouter.global_ratio_mean = avg
+    StatsRouter.global_ratio_dev = dev
+
+    StatsRouter.global_bw_mean = self.avg_adv_bw()
+
+    StatsRouter.global_cf_mean = self.avg_circ_failure()
+    StatsRouter.global_sf_mean = self.avg_stream_failure()
+    
+    StatsRouter.global_cs_mean = self.avg_circ_suspects()
+    StatsRouter.global_ss_mean = self.avg_stream_suspects()
+
+    strm_bw_ratio = copy.copy(self.sorted_r)
+    strm_bw_ratio.sort(lambda x, y: cmp(x.strm_bw_ratio(), y.strm_bw_ratio()))
+    for r in strm_bw_ratio:
+      if r.circ_chosen == 0: continue
+      f.write(r.idhex+"="+r.nickname+"\n  ")
+      f.write("SR="+str(round(r.strm_bw_ratio(),4))+" AR="+str(round(r.adv_ratio(), 4))+" BRR="+str(round(r.bw_ratio_ratio(),4))+" CSR="+str(round(r.circ_suspect_ratio(),4))+" CFR="+str(round(r.circ_fail_ratio(),4))+" SSR="+str(round(r.strm_suspect_ratio(),4))+" SFR="+str(round(r.strm_fail_ratio(),4))+" CC="+str(r.circ_chosen)+" SC="+str(r.strm_chosen)+" U="+str(round(r.current_uptime()/3600,1))+" P="+str(round((100.0*r.avg_rank())/len(self.sorted_r),1))+"\n")
+    f.close()
+ 
+  def write_stats(self, filename):
+    "Write out all the statistics the StatsHandler has gathered"
+    # TODO: all this shit should be configurable. Some of it only makes
+    # sense when scanning in certain modes.
+    plog("DEBUG", "Writing stats to "+filename)
+    # Sanity check routers
+    for r in self.sorted_r: r.sanity_check()
+
+    # Sanity check the router reason lists.
+    for r in self.sorted_r:
+      for rsn in r.reason_failed:
+        if rsn not in self.failed_reasons:
+          plog("ERROR", "Router "+r.idhex+" w/o reason "+rsn+" in fail table")
+        elif r not in self.failed_reasons[rsn].rlist:
+          plog("ERROR", "Router "+r.idhex+" missing from fail table")
+      for rsn in r.reason_suspected:
+        if rsn not in self.suspect_reasons:
+          plog("ERROR", "Router "+r.idhex+" w/o reason "+rsn+" in fail table") 
+        elif r not in self.suspect_reasons[rsn].rlist:
+          plog("ERROR", "Router "+r.idhex+" missing from suspect table")
+
+    # Sanity check the lists the other way
+    for rsn in self.failed_reasons.itervalues(): rsn._verify_failed()
+    for rsn in self.suspect_reasons.itervalues(): rsn._verify_suspected()
+
+    f = file(filename, "w")
+    f.write(StatsRouter.key)
+    (avg, dev) = self.run_zbtest()
+    StatsRouter.global_strm_mean = avg
+    StatsRouter.global_strm_dev = dev
+    f.write("\n\nBW stats: u="+str(round(avg,1))+" s="+str(round(dev,1))+"\n")
+
+    (avg, dev) = self.run_zrtest()
+    StatsRouter.global_ratio_mean = avg
+    StatsRouter.global_ratio_dev = dev
+    f.write("BW ratio stats: u="+str(round(avg,1))+" s="+str(round(dev,1))+"\n")
+
+
+    # Circ, strm infoz
+    f.write("Circ failure ratio: "+str(self.circ_failed)
+            +"/"+str(self.circ_count)+"\n")
+
+    f.write("Stream failure ratio: "+str(self.strm_failed)
+            +"/"+str(self.strm_count)+"\n")
+
+    # Extend times 
+    n = 0.01+reduce(lambda x, y: x+(y.avg_extend_time() > 0), self.sorted_r, 0)
+    avg_extend = reduce(lambda x, y: x+y.avg_extend_time(), self.sorted_r, 0)/n
+    def notlambda(x, y):
+      return x+(y.avg_extend_time()-avg_extend)*(y.avg_extend_time()-avg_extend) 
+    dev_extend = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+
+    f.write("Extend time: u="+str(round(avg_extend,1))
+             +" s="+str(round(dev_extend,1)))
+    
+    # sort+print by bandwidth
+    strm_bw_ratio = copy.copy(self.sorted_r)
+    strm_bw_ratio.sort(lambda x, y: cmp(x.strm_bw_ratio(), y.strm_bw_ratio()))
+    self.write_routers(f, strm_bw_ratio, "Stream Ratios")
+
+    # sort+print by bandwidth
+    bw_rate = copy.copy(self.sorted_r)
+    bw_rate.sort(lambda x, y: cmp(y.bw_ratio(), x.bw_ratio()))
+    self.write_routers(f, bw_rate, "Bandwidth Ratios")
+
+    failed = copy.copy(self.sorted_r)
+    failed.sort(lambda x, y:
+          cmp(y.circ_failed+y.strm_failed,
+            x.circ_failed+x.strm_failed))
+    self.write_routers(f, failed, "Failed Counts")
+
+    suspected = copy.copy(self.sorted_r)
+    suspected.sort(lambda x, y: # Suspected includes failed
+       cmp(y.circ_failed+y.strm_failed+y.circ_suspected+y.strm_suspected,
+         x.circ_failed+x.strm_failed+x.circ_suspected+x.strm_suspected))
+    self.write_routers(f, suspected, "Suspected Counts")
+
+    fail_rate = copy.copy(failed)
+    fail_rate.sort(lambda x, y: cmp(y.failed_per_hour(), x.failed_per_hour()))
+    self.write_routers(f, fail_rate, "Fail Rates")
+
+    suspect_rate = copy.copy(suspected)
+    suspect_rate.sort(lambda x, y:
+       cmp(y.suspected_per_hour(), x.suspected_per_hour()))
+    self.write_routers(f, suspect_rate, "Suspect Rates")
+    
+    # TODO: Sort by failed/selected and suspect/selected ratios
+    # if we ever want to do non-uniform scanning..
+
+    # FIXME: Add failed in here somehow..
+    susp_reasons = self.suspect_reasons.values()
+    susp_reasons.sort(lambda x, y:
+       cmp(y.total_suspected(), x.total_suspected()))
+    self.write_reasons(f, susp_reasons, "Suspect Reasons")
+
+    fail_reasons = self.failed_reasons.values()
+    fail_reasons.sort(lambda x, y:
+       cmp(y.total_failed(), x.total_failed()))
+    self.write_reasons(f, fail_reasons, "Failed Reasons")
+    f.close()
+
+    # FIXME: sort+print by circ extend time
+
+  def reset(self):
+    PathSupport.PathBuilder.reset(self)
+    self.reset_stats()
+
+  def reset_stats(self):
+    plog("DEBUG", "Resetting stats")
+    self.circ_count = 0
+    self.strm_count = 0
+    self.strm_failed = 0
+    self.circ_succeeded = 0
+    self.circ_failed = 0
+    self.suspect_reasons.clear()
+    self.failed_reasons.clear()
+    for r in self.routers.itervalues(): r.reset()
+
+  def close_circuit(self, id):
+    PathSupport.PathBuilder.close_circuit(self, id)
+    # Shortcut so we don't have to wait for the CLOSE
+    # events for stats update.
+    self.circ_succeeded += 1
+    for r in self.circuits[id].path:
+      r.circ_chosen += 1
+      r.circ_succeeded += 1
+
+  def circ_status_event(self, c):
+    if c.circ_id in self.circuits:
+      # TODO: Hrmm, consider making this sane in TorCtl.
+      if c.reason: lreason = c.reason
+      else: lreason = "NONE"
+      if c.remote_reason: rreason = c.remote_reason
+      else: rreason = "NONE"
+      reason = c.event_name+":"+c.status+":"+lreason+":"+rreason
+      if c.status == "LAUNCHED":
+        # Update circ_chosen count
+        self.circ_count += 1
+      elif c.status == "EXTENDED":
+        delta = c.arrived_at - self.circuits[c.circ_id].last_extended_at
+        r_ext = c.path[-1]
+        try:
+          if r_ext[0] != '$': r_ext = self.name_to_key[r_ext]
+          self.routers[r_ext[1:]].total_extend_time += delta
+          self.routers[r_ext[1:]].total_extended += 1
+        except KeyError, e:
+          traceback.print_exc()
+          plog("WARN", "No key "+str(e)+" for "+r_ext+" in dict:"+str(self.name_to_key))
+      elif c.status == "FAILED":
+        for r in self.circuits[c.circ_id].path: r.circ_chosen += 1
+        
+        if len(c.path)-1 < 0: start_f = 0
+        else: start_f = len(c.path)-1 
+
+        # Count failed
+        self.circ_failed += 1
+        # XXX: Differentiate between extender and extendee
+        for r in self.circuits[c.circ_id].path[start_f:len(c.path)+1]:
+          r.circ_failed += 1
+          if not reason in r.reason_failed:
+            r.reason_failed[reason] = 1
+          else: r.reason_failed[reason]+=1
+          if reason not in self.failed_reasons:
+             self.failed_reasons[reason] = FailedRouterList(reason)
+          self.failed_reasons[reason].add_r(r)
+
+        for r in self.circuits[c.circ_id].path[len(c.path)+1:]:
+          r.circ_uncounted += 1
+
+        # Don't count if failed was set this round, don't set 
+        # suspected..
+        for r in self.circuits[c.circ_id].path[:start_f]:
+          r.circ_suspected += 1
+          if not reason in r.reason_suspected:
+            r.reason_suspected[reason] = 1
+          else: r.reason_suspected[reason]+=1
+          if reason not in self.suspect_reasons:
+             self.suspect_reasons[reason] = SuspectRouterList(reason)
+          self.suspect_reasons[reason].add_r(r)
+      elif c.status == "CLOSED":
+        # Since PathBuilder deletes the circuit on a failed, 
+        # we only get this for a clean close that was not
+        # requested by us.
+
+        # Don't count circuits we requested closed from
+        # pathbuilder, they are counted there instead.
+        if not self.circuits[c.circ_id].requested_closed:
+          self.circ_succeeded += 1
+          for r in self.circuits[c.circ_id].path:
+            r.circ_chosen += 1
+            if lreason in ("REQUESTED", "FINISHED", "ORIGIN"):
+              r.circ_succeeded += 1
+            else:
+              if not reason in r.reason_suspected:
+                r.reason_suspected[reason] = 1
+              else: r.reason_suspected[reason] += 1
+              r.circ_suspected+= 1
+              if reason not in self.suspect_reasons:
+                self.suspect_reasons[reason] = SuspectRouterList(reason)
+              self.suspect_reasons[reason].add_r(r)
+    PathBuilder.circ_status_event(self, c)
+
+  def count_stream_reason_failed(self, s, reason):
+    "Count the routers involved in a failure"
+    # Update failed count,reason_failed for exit
+    r = self.circuits[s.circ_id].exit
+    if not reason in r.reason_failed: r.reason_failed[reason] = 1
+    else: r.reason_failed[reason]+=1
+    r.strm_failed += 1
+    if reason not in self.failed_reasons:
+      self.failed_reasons[reason] = FailedRouterList(reason)
+    self.failed_reasons[reason].add_r(r)
+
+  def count_stream_suspects(self, s, lreason, reason):
+    "Count the routers 'suspected' of being involved in a failure"
+    if lreason in ("TIMEOUT", "INTERNAL", "TORPROTOCOL" "DESTROY"):
+      for r in self.circuits[s.circ_id].path[:-1]:
+        r.strm_suspected += 1
+        if not reason in r.reason_suspected:
+          r.reason_suspected[reason] = 1
+        else: r.reason_suspected[reason]+=1
+        if reason not in self.suspect_reasons:
+          self.suspect_reasons[reason] = SuspectRouterList(reason)
+        self.suspect_reasons[reason].add_r(r)
+    else:
+      for r in self.circuits[s.circ_id].path[:-1]:
+        r.strm_uncounted += 1
+  
+  def stream_status_event(self, s):
+    if s.strm_id in self.streams and not self.streams[s.strm_id].ignored:
+      # TODO: Hrmm, consider making this sane in TorCtl.
+      if s.reason: lreason = s.reason
+      else: lreason = "NONE"
+      if s.remote_reason: rreason = s.remote_reason
+      else: rreason = "NONE"
+      reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
+      circ = self.streams[s.strm_id].circ
+      if not circ: circ = self.streams[s.strm_id].pending_circ
+      if (s.status in ("DETACHED", "FAILED", "CLOSED", "SUCCEEDED")
+          and not s.circ_id):
+        # XXX: REMAPs can do this (normal). Also REASON=DESTROY (bug?)
+        if circ:
+          plog("INFO", "Stream "+s.status+" of "+str(s.strm_id)+" gave circ 0.  Resetting to stored circ id: "+str(circ.circ_id))
+          s.circ_id = circ.circ_id
+        #elif s.reason == "TIMEOUT" or s.reason == "EXITPOLICY":
+        #  plog("NOTICE", "Stream "+str(s.strm_id)+" detached with "+s.reason)
+        else:
+          plog("WARN", "Stream "+str(s.strm_id)+" detached from no known circuit with reason: "+str(s.reason))
+          PathBuilder.stream_status_event(self, s)
+          return
+
+      # Verify circ id matches stream.circ
+      if s.status not in ("NEW", "NEWRESOLVE", "REMAP"):
+        if s.circ_id and circ and circ.circ_id != s.circ_id:
+          plog("WARN", str(s.strm_id) + " has mismatch of "
+                +str(s.circ_id)+" v "+str(circ.circ_id))
+        if s.circ_id and s.circ_id not in self.circuits:
+          plog("NOTICE", "Unknown circuit "+str(s.circ_id)
+                +" for stream "+str(s.strm_id))
+          PathBuilder.stream_status_event(self, s)
+          return
+      
+      if s.status == "DETACHED":
+        if self.streams[s.strm_id].attached_at:
+          plog("WARN", str(s.strm_id)+" detached after succeeded")
+        # Update strm_chosen count
+        self.strm_count += 1
+        for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
+        self.strm_failed += 1
+        self.count_stream_suspects(s, lreason, reason)
+        self.count_stream_reason_failed(s, reason)
+      elif s.status == "FAILED":
+        # HACK. We get both failed and closed for the same stream,
+        # with different reasons. Might as well record both, since they 
+        # often differ.
+        self.streams[s.strm_id].failed_reason = reason
+      elif s.status == "CLOSED":
+        # Always get both a closed and a failed.. 
+        #   - Check if the circuit exists still
+        # Update strm_chosen count
+        self.strm_count += 1
+        for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
+
+        if self.streams[s.strm_id].failed:
+          reason = self.streams[s.strm_id].failed_reason+":"+lreason+":"+rreason
+
+        self.count_stream_suspects(s, lreason, reason)
+          
+        r = self.circuits[s.circ_id].exit
+        if (not self.streams[s.strm_id].failed
+          and (lreason == "DONE" or (lreason == "END" and rreason == "DONE"))):
+          r.strm_succeeded += 1
+
+          # Update bw stats. XXX: Don't do this for resolve streams
+          if self.streams[s.strm_id].attached_at:
+            lifespan = self.streams[s.strm_id].lifespan(s.arrived_at)
+            for r in self.streams[s.strm_id].circ.path:
+              r.bwstats.add_bw(self.streams[s.strm_id].bytes_written+
+                               self.streams[s.strm_id].bytes_read, lifespan)
+  
+        else:
+          self.strm_failed += 1
+          self.count_stream_reason_failed(s, reason)
+    PathBuilder.stream_status_event(self, s)
+
+  def _check_hibernation(self, r, now):
+    if r.down:
+      if not r.hibernated_at:
+        r.hibernated_at = now
+        r.total_active_uptime += now - r.became_active_at
+      r.became_active_at = 0
+    else:
+      if not r.became_active_at:
+        r.became_active_at = now
+        r.total_hibernation_time += now - r.hibernated_at
+      r.hibernated_at = 0
+
+  def new_consensus_event(self, n):
+    if self.track_ranks:
+      # Record previous rank and history.
+      for ns in n.nslist:
+        if not ns.idhex in self.routers:
+          continue
+        r = self.routers[ns.idhex]
+        r.bw_history.append(r.bw)
+      for r in self.sorted_r:
+        r.rank_history.append(r.list_rank)
+    PathBuilder.new_consensus_event(self, n)
+    now = n.arrived_at
+    for ns in n.nslist:
+      if not ns.idhex in self.routers: continue
+      self._check_hibernation(self.routers[ns.idhex], now)
+
+  def new_desc_event(self, d):
+    if PathBuilder.new_desc_event(self, d):
+      now = d.arrived_at
+      for i in d.idlist:
+        if not i in self.routers: continue
+        self._check_hibernation(self.routers[i], now)
+      
+   

Added: arm/dependencies/TorCtl/TorCtl.py
===================================================================
--- arm/dependencies/TorCtl/TorCtl.py	                        (rev 0)
+++ arm/dependencies/TorCtl/TorCtl.py	2010-08-23 01:13:01 UTC (rev 23018)
@@ -0,0 +1,1645 @@
+#!/usr/bin/python
+# TorCtl.py -- Python module to interface with Tor Control interface.
+# Copyright 2005 Nick Mathewson
+# Copyright 2007 Mike Perry. See LICENSE file.
+
+"""
+Library to control Tor processes.
+
+This library handles sending commands, parsing responses, and delivering
+events to and from the control port. The basic usage is to create a
+socket, wrap that in a TorCtl.Connection, and then add an EventHandler
+to that connection. A simple example with a DebugEventHandler (that just
+echoes the events back to stdout) is present in run_example().
+
+Note that the TorCtl.Connection is fully compatible with the more
+advanced EventHandlers in TorCtl.PathSupport (and of course any other
+custom event handlers that you may extend off of those).
+
+This package also contains a helper class for representing Routers, and
+classes and constants for each event.
+
+"""
+
+__all__ = ["EVENT_TYPE", "TorCtlError", "TorCtlClosed", "ProtocolError",
+           "ErrorReply", "NetworkStatus", "ExitPolicyLine", "Router",
+           "RouterVersion", "Connection", "parse_ns_body",
+           "EventHandler", "DebugEventHandler", "NetworkStatusEvent",
+           "NewDescEvent", "CircuitEvent", "StreamEvent", "ORConnEvent",
+           "StreamBwEvent", "LogEvent", "AddrMapEvent", "BWEvent",
+           "BuildTimeoutSetEvent", "UnknownEvent", "ConsensusTracker",
+           "EventListener", "EVENT_STATE" ]
+
+import os
+import re
+import struct
+import sys
+import threading
+import Queue
+import datetime
+import traceback
+import socket
+import binascii
+import types
+import time
+import copy
+
+from TorUtil import *
+
+if sys.version_info < (2, 5):
+  from sets import Set as set
+  from sha import sha as sha1
+else:
+  from hashlib import sha1
+
+# Types of "EVENT" message.
+EVENT_TYPE = Enum2(
+          CIRC="CIRC",
+          STREAM="STREAM",
+          ORCONN="ORCONN",
+          STREAM_BW="STREAM_BW",
+          BW="BW",
+          NS="NS",
+          NEWCONSENSUS="NEWCONSENSUS",
+          BUILDTIMEOUT_SET="BUILDTIMEOUT_SET",
+          GUARD="GUARD",
+          NEWDESC="NEWDESC",
+          ADDRMAP="ADDRMAP",
+          DEBUG="DEBUG",
+          INFO="INFO",
+          NOTICE="NOTICE",
+          WARN="WARN",
+          ERR="ERR")
+
+EVENT_STATE = Enum2(
+          PRISTINE="PRISTINE",
+          PRELISTEN="PRELISTEN",
+          HEARTBEAT="HEARTBEAT",
+          HANDLING="HANDLING",
+          POSTLISTEN="POSTLISTEN",
+          DONE="DONE")
+
+class TorCtlError(Exception):
+  "Generic error raised by TorControl code."
+  pass
+
+class TorCtlClosed(TorCtlError):
+  "Raised when the controller connection is closed by Tor (not by us.)"
+  pass
+
+class ProtocolError(TorCtlError):
+  "Raised on violations in Tor controller protocol"
+  pass
+
+class ErrorReply(TorCtlError):
+  "Raised when Tor controller returns an error"
+  def __init__(self, *args, **kwargs):
+    if "status" in kwargs:
+      self.status = kwargs.pop("status")
+    if "message" in kwargs:
+      self.message = kwargs.pop("message")
+    TorCtlError.__init__(self, *args, **kwargs)
+
+class NetworkStatus:
+  "Filled in during NS events"
+  def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags, bandwidth=None):
+    self.nickname = nickname
+    self.idhash = idhash
+    self.orhash = orhash
+    self.ip = ip
+    self.orport = int(orport)
+    self.dirport = int(dirport)
+    self.flags = flags
+    self.idhex = (self.idhash + "=").decode("base64").encode("hex").upper()
+    self.bandwidth = bandwidth
+    m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated)
+    self.updated = datetime.datetime(*map(int, m.groups()))
+
+class Event:
+  def __init__(self, event_name):
+    self.event_name = event_name
+    self.arrived_at = 0
+    self.state = EVENT_STATE.PRISTINE
+
+class TimerEvent(Event):
+  def __init__(self, event_name, type):
+    Event.__init__(self, event_name)
+    self.type = type
+
+class NetworkStatusEvent(Event):
+  def __init__(self, event_name, nslist):
+    Event.__init__(self, event_name)
+    self.nslist = nslist # List of NetworkStatus objects
+
+class NewConsensusEvent(NetworkStatusEvent):
+  pass
+
+class NewDescEvent(Event):
+  def __init__(self, event_name, idlist):
+    Event.__init__(self, event_name)
+    self.idlist = idlist
+
+class GuardEvent(Event):
+  def __init__(self, event_name, ev_type, guard, status):
+    Event.__init__(self, event_name)
+    if "~" in guard:
+      (self.idhex, self.nick) = guard[1:].split("~")
+    elif "=" in guard:
+      (self.idhex, self.nick) = guard[1:].split("=")
+    else:
+      self.idhex = guard[1:]
+    self.status = status
+
+class BuildTimeoutSetEvent(Event):
+  def __init__(self, event_name, set_type, total_times, timeout_ms, xm, alpha,
+               quantile):
+    Event.__init__(self, event_name)
+    self.set_type = set_type
+    self.total_times = total_times
+    self.timeout_ms = timeout_ms
+    self.xm = xm
+    self.alpha = alpha
+    self.cutoff_quantile = quantile
+
+class CircuitEvent(Event):
+  def __init__(self, event_name, circ_id, status, path, purpose,
+         reason, remote_reason):
+    Event.__init__(self, event_name)
+    self.circ_id = circ_id
+    self.status = status
+    self.path = path
+    self.purpose = purpose
+    self.reason = reason
+    self.remote_reason = remote_reason
+
+class StreamEvent(Event):
+  def __init__(self, event_name, strm_id, status, circ_id, target_host,
+         target_port, reason, remote_reason, source, source_addr, purpose):
+    Event.__init__(self, event_name)
+    self.strm_id = strm_id
+    self.status = status
+    self.circ_id = circ_id
+    self.target_host = target_host
+    self.target_port = int(target_port)
+    self.reason = reason
+    self.remote_reason = remote_reason
+    self.source = source
+    self.source_addr = source_addr
+    self.purpose = purpose
+
+class ORConnEvent(Event):
+  def __init__(self, event_name, status, endpoint, age, read_bytes,
+         wrote_bytes, reason, ncircs):
+    Event.__init__(self, event_name)
+    self.status = status
+    self.endpoint = endpoint
+    self.age = age
+    self.read_bytes = read_bytes
+    self.wrote_bytes = wrote_bytes
+    self.reason = reason
+    self.ncircs = ncircs
+
+class StreamBwEvent(Event):
+  def __init__(self, event_name, strm_id, written, read):
+    Event.__init__(self, event_name)
+    self.strm_id = int(strm_id)
+    self.bytes_read = int(read)
+    self.bytes_written = int(written)
+
+class LogEvent(Event):
+  def __init__(self, level, msg):
+    Event.__init__(self, level)
+    self.level = level
+    self.msg = msg
+
+class AddrMapEvent(Event):
+  def __init__(self, event_name, from_addr, to_addr, when):
+    Event.__init__(self, event_name)
+    self.from_addr = from_addr
+    self.to_addr = to_addr
+    self.when = when
+
+class AddrMap:
+  def __init__(self, from_addr, to_addr, when):
+    self.from_addr = from_addr
+    self.to_addr = to_addr
+    self.when = when
+
+class BWEvent(Event):
+  def __init__(self, event_name, read, written):
+    Event.__init__(self, event_name)
+    self.read = read
+    self.written = written
+
+class UnknownEvent(Event):
+  def __init__(self, event_name, event_string):
+    Event.__init__(self, event_name)
+    self.event_string = event_string
+
+ipaddress_re = re.compile(r"(\d{1,3}\.){3}\d{1,3}$")
+class ExitPolicyLine:
+  """ Class to represent a line in a Router's exit policy in a way 
+      that can be easily checked. """
+  def __init__(self, match, ip_mask, port_low, port_high):
+    self.match = match
+    if ip_mask == "*":
+      self.ip = 0
+      self.netmask = 0
+    else:
+      if not "/" in ip_mask:
+        self.netmask = 0xFFFFFFFF
+        ip = ip_mask
+      else:
+        ip, mask = ip_mask.split("/")
+        if ipaddress_re.match(mask):
+          self.netmask=struct.unpack(">I", socket.inet_aton(mask))[0]
+        else:
+          self.netmask = 0xffffffff ^ (0xffffffff >> int(mask))
+      self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+    self.ip &= self.netmask
+    if port_low == "*":
+      self.port_low,self.port_high = (0,65535)
+    else:
+      if not port_high:
+        port_high = port_low
+      self.port_low = int(port_low)
+      self.port_high = int(port_high)
+  
+  def check(self, ip, port):
+    """Check to see if an ip and port is matched by this line. 
+     Returns true if the line is an Accept, and False if it is a Reject. """
+    ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+    if (ip & self.netmask) == self.ip:
+      if self.port_low <= port and port <= self.port_high:
+        return self.match
+    return -1
+
+  def __str__(self):
+    retr = ""
+    if self.match:
+      retr += "accept "
+    else:
+      retr += "reject "
+    retr += socket.inet_ntoa(struct.pack(">I",self.ip)) + "/"
+    retr += socket.inet_ntoa(struct.pack(">I",self.netmask)) + ":"
+    retr += str(self.port_low)+"-"+str(self.port_high)
+    return retr
+
+class RouterVersion:
+  """ Represents a Router's version. Overloads all comparison operators
+      to check for newer, older, or equivalent versions. """
+  def __init__(self, version):
+    if version:
+      v = re.search("^(\d+)\.(\d+)\.(\d+)\.(\d+)", version).groups()
+      self.version = int(v[0])*0x1000000 + int(v[1])*0x10000 + int(v[2])*0x100 + int(v[3])
+      self.ver_string = version
+    else: 
+      self.version = version
+      self.ver_string = "unknown"
+
+  def __lt__(self, other): return self.version < other.version
+  def __gt__(self, other): return self.version > other.version
+  def __ge__(self, other): return self.version >= other.version
+  def __le__(self, other): return self.version <= other.version
+  def __eq__(self, other): return self.version == other.version
+  def __ne__(self, other): return self.version != other.version
+  def __str__(self): return self.ver_string
+
+
+# map descriptor keywords to regular expressions.
+desc_re = {
+  "router":          r"(\S+) (\S+)",
+  "opt fingerprint": r"(.+).*on (\S+)",
+  "opt hibernating": r"1$",
+  "platform":  r"Tor (\S+).*on ([\S\s]+)",
+  "accept":    r"(\S+):([^-]+)(?:-(\d+))?",
+  "reject":    r"(\S+):([^-]+)(?:-(\d+))?",
+  "bandwidth": r"(\d+) \d+ (\d+)",
+  "uptime":    r"(\d+)",
+  "contact":   r"(.+)",
+  "published": r"(\S+ \S+)",
+}
+# Compile each regular expression now.
+for kw, reg in desc_re.iteritems():
+  desc_re[kw] = re.compile(reg)
+
+class Router:
+  """ 
+  Class to represent a router from a descriptor. Can either be
+  created from the parsed fields, or can be built from a
+  descriptor+NetworkStatus 
+  """     
+  def __init__(self, *args):
+    if len(args) == 1:
+      for i in args[0].__dict__:
+        self.__dict__[i] =  copy.deepcopy(args[0].__dict__[i])
+      return
+    else:
+      (idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime, published, contact, rate_limited, orhash, ns_bandwidth) = args
+    self.idhex = idhex
+    self.nickname = name
+    if ns_bandwidth != None:
+      self.bw = ns_bandwidth
+    else:
+     self.bw = bw
+    self.desc_bw = bw
+    self.exitpolicy = exitpolicy
+    self.flags = flags # Technicaly from NS doc
+    self.down = down
+    self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+    self.version = RouterVersion(version)
+    self.os = os
+    self.list_rank = 0 # position in a sorted list of routers.
+    self.uptime = uptime
+    self.published = published
+    self.refcount = 0 # How many open circs are we currently in?
+    self.deleted = False # Has Tor already deleted this descriptor?
+    self.contact = contact
+    self.rate_limited = rate_limited
+    self.orhash = orhash
+    self._generated = [] # For ExactUniformGenerator
+
+  def __str__(self):
+    s = self.idhex, self.nickname
+    return s.__str__()
+
+  def build_from_desc(desc, ns):
+    """
+    Static method of Router that parses a descriptor string into this class.
+    'desc' is a full descriptor as a string. 
+    'ns' is a TorCtl.NetworkStatus instance for this router (needed for
+    the flags, the nickname, and the idhex string). 
+    Returns a Router instance.
+    """
+    exitpolicy = []
+    dead = not ("Running" in ns.flags)
+    bw_observed = 0
+    version = None
+    os = None
+    uptime = 0
+    ip = 0
+    router = "[none]"
+    published = "never"
+    contact = None
+
+    for line in desc:
+      # Pull off the keyword...
+      kw, _, rest = line.partition(" ")
+
+      # ...and if it's "opt", extend it by the next keyword
+      # so we get "opt hibernating" as one keyword.
+      if kw == "opt":
+        okw, _, rest = rest.partition(" ")
+        kw += " " + okw
+
+      # try to match the descriptor line by keyword.
+      try:
+        match = desc_re[kw].match(rest)
+      # if we don't handle this keyword, just move on to the next one.
+      except KeyError:
+        continue
+      # if we do handle this keyword but its data is malformed,
+      # move on to the next one without processing it.
+      if not match:
+        continue
+
+      g = match.groups()
+
+      # Handle each keyword individually.
+      # TODO: This could possibly be sped up since we technically already
+      # did the compare with the dictionary lookup... lambda magic time.
+      if kw == "accept":
+        exitpolicy.append(ExitPolicyLine(True, *g))
+      elif kw == "reject":
+        exitpolicy.append(ExitPolicyLine(False, *g))
+      elif kw == "router":
+        router,ip = g
+      elif kw == "bandwidth":
+        bws = map(int, g)
+        bw_observed = min(bws)
+        rate_limited = False
+        if bws[0] < bws[1]:
+          rate_limited = True
+      elif kw == "platform":
+        version, os = g
+      elif kw == "uptime":
+        uptime = int(g[0])
+      elif kw == "published":
+        t = time.strptime(g[0] + " UTC", "20%y-%m-%d %H:%M:%S %Z")
+        published = datetime.datetime(*t[0:6])
+      elif kw == "contact":
+        contact = g[0]
+      elif kw == "opt hibernating":
+        dead = True 
+        if ("Running" in ns.flags):
+          plog("INFO", "Hibernating router "+ns.nickname+" is running, flags: "+" ".join(ns.flags))
+
+    if router != ns.nickname:
+      plog("NOTICE", "Got different names " + ns.nickname + " vs " +
+             router + " for " + ns.idhex)
+    if not bw_observed and not dead and ("Valid" in ns.flags):
+      plog("INFO", "No bandwidth for live router "+ns.nickname+", flags: "+" ".join(ns.flags))
+      dead = True
+    if not version or not os:
+      plog("INFO", "No version and/or OS for router " + ns.nickname)
+    return Router(ns.idhex, ns.nickname, bw_observed, dead, exitpolicy,
+        ns.flags, ip, version, os, uptime, published, contact, rate_limited,
+        ns.orhash, ns.bandwidth)
+  build_from_desc = Callable(build_from_desc)
+
+  def update_to(self, new):
+    """ Somewhat hackish method to update this router to be a copy of
+    'new' """
+    if self.idhex != new.idhex:
+      plog("ERROR", "Update of router "+self.nickname+"changes idhex!")
+    plog("DEBUG", "Updating refcount "+str(self.refcount)+" for "+self.idhex)
+    for i in new.__dict__.iterkeys():
+      if i == "refcount" or i == "_generated": continue
+      self.__dict__[i] = new.__dict__[i]
+    plog("DEBUG", "Updated refcount "+str(self.refcount)+" for "+self.idhex)
+
+  def will_exit_to(self, ip, port):
+    """ Check the entire exitpolicy to see if the router will allow
+        connections to 'ip':'port' """
+    for line in self.exitpolicy:
+      ret = line.check(ip, port)
+      if ret != -1:
+        return ret
+    plog("WARN", "No matching exit line for "+self.nickname)
+    return False
+   
+class Connection:
+  """A Connection represents a connection to the Tor process via the 
+     control port."""
+  def __init__(self, sock):
+    """Create a Connection to communicate with the Tor process over the
+       socket 'sock'.
+    """
+    self._handler = None
+    self._handleFn = None
+    self._sendLock = threading.RLock()
+    self._queue = Queue.Queue()
+    self._thread = None
+    self._closedEx = None
+    self._closed = 0
+    self._closeHandler = None
+    self._eventThread = None
+    self._eventQueue = Queue.Queue()
+    self._s = BufSock(sock)
+    self._debugFile = None
+
+  def set_close_handler(self, handler):
+    """Call 'handler' when the Tor process has closed its connection or
+       given us an exception.  If we close normally, no arguments are
+       provided; otherwise, it will be called with an exception as its
+       argument.
+    """
+    self._closeHandler = handler
+
+  def close(self):
+    """Shut down this controller connection"""
+    self._sendLock.acquire()
+    try:
+      self._queue.put("CLOSE")
+      self._eventQueue.put((time.time(), "CLOSE"))
+      self._closed = 1
+      # XXX: For some reason, this does not cause the readline in
+      # self._read_reply() to return immediately. The _loop() thread
+      # thus tends to stick around until some event causes data to come
+      # back...
+      self._s.close()
+      self._eventThread.join()
+    finally:
+      self._sendLock.release()
+
+  def is_live(self):
+    """ Returns true iff the connection is alive and healthy"""
+    return self._thread.isAlive() and self._eventThread.isAlive() and not \
+           self._closed
+
+  def launch_thread(self, daemon=1):
+    """Launch a background thread to handle messages from the Tor process."""
+    assert self._thread is None
+    t = threading.Thread(target=self._loop, name="TorLoop")
+    if daemon:
+      t.setDaemon(daemon)
+    t.start()
+    self._thread = t
+    t = threading.Thread(target=self._eventLoop, name="EventLoop")
+    if daemon:
+      t.setDaemon(daemon)
+    t.start()
+    self._eventThread = t
+    # eventThread provides a more reliable indication of when we are done.
+    # The _loop thread won't always die when self.close() is called.
+    return self._eventThread
+
+  def _loop(self):
+    """Main subthread loop: Read commands from Tor, and handle them either
+       as events or as responses to other commands.
+    """
+    while 1:
+      try:
+        isEvent, reply = self._read_reply()
+      except TorCtlClosed:
+        plog("NOTICE", "Tor closed control connection. Exiting event thread.")
+        return
+      except Exception,e:
+        if not self._closed:
+          if sys:
+            self._err(sys.exc_info())
+          else:
+            plog("NOTICE", "No sys left at exception shutdown: "+str(e))
+            self._err((e.__class__, e, None))
+          return
+        else:
+          isEvent = 0
+
+      if isEvent:
+        if self._handler is not None:
+          self._eventQueue.put((time.time(), reply))
+      else:
+        cb = self._queue.get() # atomic..
+        if cb == "CLOSE":
+          self._s = None
+          plog("INFO", "Closed control connection. Exiting thread.")
+          return
+        else:
+          cb(reply)
+
+  def _err(self, (tp, ex, tb), fromEventLoop=0):
+    """DOCDOC"""
+    # silent death is bad :(
+    traceback.print_exception(tp, ex, tb)
+    if self._s:
+      try:
+        self.close()
+      except:
+        pass
+    self._sendLock.acquire()
+    try:
+      self._closedEx = ex
+      self._closed = 1
+    finally:
+      self._sendLock.release()
+    while 1:
+      try:
+        cb = self._queue.get(timeout=0)
+        if cb != "CLOSE":
+          cb("EXCEPTION")
+      except Queue.Empty:
+        break
+    if self._closeHandler is not None:
+      self._closeHandler(ex)
+    # I hate you for making me resort to this, python
+    os.kill(os.getpid(), 15)
+    return
+
+  def _eventLoop(self):
+    """DOCDOC"""
+    while 1:
+      (timestamp, reply) = self._eventQueue.get()
+      if reply[0][0] == "650" and reply[0][1] == "OK":
+        plog("DEBUG", "Ignoring incompatible syntactic sugar: 650 OK")
+        continue
+      if reply == "CLOSE":
+        plog("INFO", "Event loop received close message.")
+        return
+      try:
+        self._handleFn(timestamp, reply)
+      except:
+        for code, msg, data in reply:
+            plog("WARN", "No event for: "+str(code)+" "+str(msg))
+        self._err(sys.exc_info(), 1)
+        return
+
+  def _sendImpl(self, sendFn, msg):
+    """DOCDOC"""
+    if self._thread is None and not self._closed:
+      self.launch_thread(1)
+    # This condition will get notified when we've got a result...
+    condition = threading.Condition()
+    # Here's where the result goes...
+    result = []
+
+    if self._closedEx is not None:
+      raise self._closedEx
+    elif self._closed:
+      raise TorCtlClosed()
+
+    def cb(reply,condition=condition,result=result):
+      condition.acquire()
+      try:
+        result.append(reply)
+        condition.notify()
+      finally:
+        condition.release()
+
+    # Sends a message to Tor...
+    self._sendLock.acquire() # ensure queue+sendmsg is atomic
+    try:
+      self._queue.put(cb)
+      sendFn(msg) # _doSend(msg)
+    finally:
+      self._sendLock.release()
+
+    # Now wait till the answer is in...
+    condition.acquire()
+    try:
+      while not result:
+        condition.wait()
+    finally:
+      condition.release()
+
+    # ...And handle the answer appropriately.
+    assert len(result) == 1
+    reply = result[0]
+    if reply == "EXCEPTION":
+      raise self._closedEx
+
+    return reply
+
+
+  def debug(self, f):
+    """DOCDOC"""
+    self._debugFile = f
+
+  def set_event_handler(self, handler):
+    """Cause future events from the Tor process to be sent to 'handler'.
+    """
+    if self._handler:
+      handler.pre_listeners = self._handler.pre_listeners
+      handler.post_listeners = self._handler.post_listeners
+    self._handler = handler
+    self._handler.c = self
+    self._handleFn = handler._handle1
+
+  def add_event_listener(self, listener):
+    if not self._handler:
+      self.set_event_handler(EventHandler())
+    self._handler.add_event_listener(listener)
+
+  def _read_reply(self):
+    lines = []
+    while 1:
+      line = self._s.readline()
+      if not line:
+        self._closed = True
+        raise TorCtlClosed() 
+      line = line.strip()
+      if self._debugFile:
+        self._debugFile.write(str(time.time())+"\t  %s\n" % line)
+      if len(line)<4:
+        raise ProtocolError("Badly formatted reply line: Too short")
+      code = line[:3]
+      tp = line[3]
+      s = line[4:]
+      if tp == "-":
+        lines.append((code, s, None))
+      elif tp == " ":
+        lines.append((code, s, None))
+        isEvent = (lines and lines[0][0][0] == '6')
+        return isEvent, lines
+      elif tp != "+":
+        raise ProtocolError("Badly formatted reply line: unknown type %r"%tp)
+      else:
+        more = []
+        while 1:
+          line = self._s.readline()
+          if self._debugFile:
+            self._debugFile.write("+++ %s" % line)
+          if line in (".\r\n", ".\n", "650 OK\n", "650 OK\r\n"): 
+            break
+          more.append(line)
+        lines.append((code, s, unescape_dots("".join(more))))
+        isEvent = (lines and lines[0][0][0] == '6')
+        if isEvent: # Need "250 OK" if it's not an event. Otherwise, end
+          return (isEvent, lines)
+
+    # Notreached
+    raise TorCtlError()
+
+  def _doSend(self, msg):
+    if self._debugFile:
+      amsg = msg
+      lines = amsg.split("\n")
+      if len(lines) > 2:
+        amsg = "\n".join(lines[:2]) + "\n"
+      self._debugFile.write(str(time.time())+"\t>>> "+amsg)
+    self._s.write(msg)
+
+  def set_timer(self, in_seconds, type=None):
+    event = (("650", "TORCTL_TIMER", type),)
+    threading.Timer(in_seconds, lambda: 
+                  self._eventQueue.put((time.time(), event))).start()
+
+  def set_periodic_timer(self, every_seconds, type=None):
+    event = (("650", "TORCTL_TIMER", type),)
+    def notlambda():
+      plog("DEBUG", "Timer fired for type "+str(type))
+      self._eventQueue.put((time.time(), event))
+      self._eventQueue.put((time.time(), event))
+      threading.Timer(every_seconds, notlambda).start()
+    threading.Timer(every_seconds, notlambda).start()
+
+  def sendAndRecv(self, msg="", expectedTypes=("250", "251")):
+    """Helper: Send a command 'msg' to Tor, and wait for a command
+       in response.  If the response type is in expectedTypes,
+       return a list of (tp,body,extra) tuples.  If it is an
+       error, raise ErrorReply.  Otherwise, raise ProtocolError.
+    """
+    if type(msg) == types.ListType:
+      msg = "".join(msg)
+    assert msg.endswith("\r\n")
+
+    lines = self._sendImpl(self._doSend, msg)
+
+    # print lines
+    for tp, msg, _ in lines:
+      if tp[0] in '45':
+        code = int(tp[:3])
+        raise ErrorReply("%s %s"%(tp, msg), status = code, message = msg)
+      if tp not in expectedTypes:
+        raise ProtocolError("Unexpectd message type %r"%tp)
+
+    return lines
+
+  def authenticate(self, secret=""):
+    """Sends an authenticating secret (password) to Tor.  You'll need to call 
+       this method (or authenticate_cookie) before Tor can start.
+    """
+    #hexstr = binascii.b2a_hex(secret)
+    self.sendAndRecv("AUTHENTICATE \"%s\"\r\n"%secret)
+  
+  def authenticate_cookie(self, cookie):
+    """Sends an authentication cookie to Tor. This may either be a file or 
+       its contents.
+    """
+    
+    # read contents if provided a file
+    if type(cookie) == file: cookie = cookie.read()
+    
+    # unlike passwords the cookie contents isn't enclosed by quotes
+    self.sendAndRecv("AUTHENTICATE %s\r\n" % binascii.b2a_hex(cookie))
+
+  def get_option(self, name):
+    """Get the value of the configuration option named 'name'.  To
+       retrieve multiple values, pass a list for 'name' instead of
+       a string.  Returns a list of (key,value) pairs.
+       Refer to section 3.3 of control-spec.txt for a list of valid names.
+    """
+    if not isinstance(name, str):
+      name = " ".join(name)
+    lines = self.sendAndRecv("GETCONF %s\r\n" % name)
+
+    r = []
+    for _,line,_ in lines:
+      try:
+        key, val = line.split("=", 1)
+        r.append((key,val))
+      except ValueError:
+        r.append((line, None))
+
+    return r
+
+  def set_option(self, key, value):
+    """Set the value of the configuration option 'key' to the value 'value'.
+    """
+    self.set_options([(key, value)])
+
+  def set_options(self, kvlist):
+    """Given a list of (key,value) pairs, set them as configuration
+       options.
+    """
+    if not kvlist:
+      return
+    msg = " ".join(["%s=\"%s\""%(k,quote(v)) for k,v in kvlist])
+    self.sendAndRecv("SETCONF %s\r\n"%msg)
+
+  def reset_options(self, keylist):
+    """Reset the options listed in 'keylist' to their default values.
+
+       Tor started implementing this command in version 0.1.1.7-alpha;
+       previous versions wanted you to set configuration keys to "".
+       That no longer works.
+    """
+    self.sendAndRecv("RESETCONF %s\r\n"%(" ".join(keylist)))
+
+  def get_network_status(self, who="all"):
+    """Get the entire network status list. Returns a list of
+       TorCtl.NetworkStatus instances."""
+    return parse_ns_body(self.sendAndRecv("GETINFO ns/"+who+"\r\n")[0][2])
+
+  def get_address_mappings(self, type="all"):
+    # TODO: Also parse errors and GMTExpiry
+    body = self.sendAndRecv("GETINFO address-mappings/"+type+"\r\n")
+      
+    #print "|"+body[0][1].replace("address-mappings/"+type+"=", "")+"|"
+    #print str(body[0])
+
+    if body[0][1].replace("address-mappings/"+type+"=", "") != "":
+      # one line
+      lines = [body[0][1].replace("address-mappings/"+type+"=", "")]
+    elif not body[0][2]:
+      return []
+    else:
+      lines = body[0][2].split("\n")
+    if not lines: return []
+    ret = []
+    for l in lines:
+      #print "|"+str(l)+"|"
+      if len(l) == 0: continue #Skip last line.. it's empty
+      m = re.match(r'(\S+)\s+(\S+)\s+(\"[^"]+\"|\w+)', l)
+      if not m:
+        raise ProtocolError("ADDRMAP response misformatted.")
+      fromaddr, toaddr, when = m.groups()
+      if when.upper() == "NEVER":  
+        when = None
+      else:
+        when = time.strptime(when[1:-1], "%Y-%m-%d %H:%M:%S")
+      ret.append(AddrMap(fromaddr, toaddr, when))
+    return ret
+
+  def get_router(self, ns):
+    """Fill in a Router class corresponding to a given NS class"""
+    desc = self.sendAndRecv("GETINFO desc/id/" + ns.idhex + "\r\n")[0][2]
+    sig_start = desc.find("\nrouter-signature\n")+len("\nrouter-signature\n")
+    fp_base64 = sha1(desc[:sig_start]).digest().encode("base64")[:-2]
+    r = Router.build_from_desc(desc.split("\n"), ns)
+    if fp_base64 != ns.orhash:
+      plog("INFO", "Router descriptor for "+ns.idhex+" does not match ns fingerprint (NS @ "+str(ns.updated)+" vs Desc @ "+str(r.published)+")")
+      return None
+    else:
+      return r
+
+
+  def read_routers(self, nslist):
+    """ Given a list a NetworkStatuses in 'nslist', this function will 
+        return a list of new Router instances.
+    """
+    bad_key = 0
+    new = []
+    for ns in nslist:
+      try:
+        r = self.get_router(ns)
+        if r:
+          new.append(r)
+      except ErrorReply:
+        bad_key += 1
+        if "Running" in ns.flags:
+          plog("NOTICE", "Running router "+ns.nickname+"="
+             +ns.idhex+" has no descriptor")
+  
+    return new
+
+  def get_info(self, name):
+    """Return the value of the internal information field named 'name'.
+       Refer to section 3.9 of control-spec.txt for a list of valid names.
+       DOCDOC
+    """
+    if not isinstance(name, str):
+      name = " ".join(name)
+    lines = self.sendAndRecv("GETINFO %s\r\n"%name)
+    d = {}
+    for _,msg,more in lines:
+      if msg == "OK":
+        break
+      try:
+        k,rest = msg.split("=",1)
+      except ValueError:
+        raise ProtocolError("Bad info line %r",msg)
+      if more:
+        d[k] = more
+      else:
+        d[k] = rest
+    return d
+
+  def set_events(self, events, extended=False):
+    """Change the list of events that the event handler is interested
+       in to those in 'events', which is a list of event names.
+       Recognized event names are listed in section 3.3 of the control-spec
+    """
+    if extended:
+      plog ("DEBUG", "SETEVENTS EXTENDED %s\r\n" % " ".join(events))
+      self.sendAndRecv("SETEVENTS EXTENDED %s\r\n" % " ".join(events))
+    else:
+      self.sendAndRecv("SETEVENTS %s\r\n" % " ".join(events))
+
+  def save_conf(self):
+    """Flush all configuration changes to disk.
+    """
+    self.sendAndRecv("SAVECONF\r\n")
+
+  def send_signal(self, sig):
+    """Send the signal 'sig' to the Tor process; The allowed values for
+       'sig' are listed in section 3.6 of control-spec.
+    """
+    sig = { 0x01 : "HUP",
+        0x02 : "INT",
+        0x03 : "NEWNYM",
+        0x0A : "USR1",
+        0x0C : "USR2",
+        0x0F : "TERM" }.get(sig,sig)
+    self.sendAndRecv("SIGNAL %s\r\n"%sig)
+
+  def resolve(self, host):
+    """ Launch a remote hostname lookup request:
+        'host' may be a hostname or IPv4 address
+    """
+    # TODO: handle "mode=reverse"
+    self.sendAndRecv("RESOLVE %s\r\n"%host)
+
+  def map_address(self, kvList):
+    """ Sends the MAPADDRESS command for each of the tuples in kvList """
+    if not kvList:
+      return
+    m = " ".join([ "%s=%s" for k,v in kvList])
+    lines = self.sendAndRecv("MAPADDRESS %s\r\n"%m)
+    r = []
+    for _,line,_ in lines:
+      try:
+        key, val = line.split("=", 1)
+      except ValueError:
+        raise ProtocolError("Bad address line %r",v)
+      r.append((key,val))
+    return r
+
+  def extend_circuit(self, circid=None, hops=None):
+    """Tell Tor to extend the circuit identified by 'circid' through the
+       servers named in the list 'hops'.
+    """
+    if circid is None:
+      circid = 0
+    if hops is None:
+      hops = ""
+    plog("DEBUG", "Extending circuit")
+    lines = self.sendAndRecv("EXTENDCIRCUIT %d %s\r\n"
+                  %(circid, ",".join(hops)))
+    tp,msg,_ = lines[0]
+    m = re.match(r'EXTENDED (\S*)', msg)
+    if not m:
+      raise ProtocolError("Bad extended line %r",msg)
+    plog("DEBUG", "Circuit extended")
+    return int(m.group(1))
+
+  def redirect_stream(self, streamid, newaddr, newport=""):
+    """DOCDOC"""
+    if newport:
+      self.sendAndRecv("REDIRECTSTREAM %d %s %s\r\n"%(streamid, newaddr, newport))
+    else:
+      self.sendAndRecv("REDIRECTSTREAM %d %s\r\n"%(streamid, newaddr))
+
+  def attach_stream(self, streamid, circid, hop=None):
+    """Attach a stream to a circuit, specify both by IDs. If hop is given, 
+       try to use the specified hop in the circuit as the exit node for 
+       this stream.
+    """
+    if hop:
+      self.sendAndRecv("ATTACHSTREAM %d %d HOP=%d\r\n"%(streamid, circid, hop))
+      plog("DEBUG", "Attaching stream: "+str(streamid)+" to hop "+str(hop)+" of circuit "+str(circid))
+    else:
+      self.sendAndRecv("ATTACHSTREAM %d %d\r\n"%(streamid, circid))
+      plog("DEBUG", "Attaching stream: "+str(streamid)+" to circuit "+str(circid))
+
+  def close_stream(self, streamid, reason=0, flags=()):
+    """DOCDOC"""
+    self.sendAndRecv("CLOSESTREAM %d %s %s\r\n"
+              %(streamid, reason, "".join(flags)))
+
+  def close_circuit(self, circid, reason=0, flags=()):
+    """DOCDOC"""
+    self.sendAndRecv("CLOSECIRCUIT %d %s %s\r\n"
+              %(circid, reason, "".join(flags)))
+
+  def post_descriptor(self, desc):
+    self.sendAndRecv("+POSTDESCRIPTOR purpose=controller\r\n%s"%escape_dots(desc))
+
+def parse_ns_body(data):
+  """Parse the body of an NS event or command into a list of
+     NetworkStatus instances"""
+  if not data: return []
+  nsgroups = re.compile(r"^r ", re.M).split(data)
+  nsgroups.pop(0)
+  nslist = []
+  for nsline in nsgroups:
+    m = re.search(r"^s((?:[ ]\S*)+)", nsline, re.M)
+    flags = m.groups()
+    flags = flags[0].strip().split(" ")
+    m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline)    
+    w = re.search(r"^w Bandwidth=(\d+)", nsline, re.M)
+    if w:
+      nslist.append(NetworkStatus(*(m.groups()+(flags,)+(int(w.group(1))*1000,))))
+    else:
+      nslist.append(NetworkStatus(*(m.groups() + (flags,))))
+  return nslist
+
+class EventSink:
+  def heartbeat_event(self, event): pass
+  def unknown_event(self, event): pass
+  def circ_status_event(self, event): pass
+  def stream_status_event(self, event): pass
+  def stream_bw_event(self, event): pass
+  def or_conn_status_event(self, event): pass
+  def bandwidth_event(self, event): pass
+  def new_desc_event(self, event): pass
+  def msg_event(self, event): pass
+  def ns_event(self, event): pass
+  def new_consensus_event(self, event): pass
+  def buildtimeout_set_event(self, event): pass
+  def guard_event(self, event): pass
+  def address_mapped_event(self, event): pass
+  def timer_event(self, event): pass
+
+class EventListener(EventSink):
+  """An 'EventListener' is a passive sink for parsed Tor events. It 
+     implements the same interface as EventHandler, but it should
+     not alter Tor's behavior as a result of these events.
+    
+     Do not extend from this class. Instead, extend from one of 
+     Pre, Post, or Dual event listener, to get events 
+     before, after, or before and after the EventHandler handles them.
+     """
+  def __init__(self):
+    """Create a new EventHandler."""
+    self._map1 = {
+      "CIRC" : self.circ_status_event,
+      "STREAM" : self.stream_status_event,
+      "ORCONN" : self.or_conn_status_event,
+      "STREAM_BW" : self.stream_bw_event,
+      "BW" : self.bandwidth_event,
+      "DEBUG" : self.msg_event,
+      "INFO" : self.msg_event,
+      "NOTICE" : self.msg_event,
+      "WARN" : self.msg_event,
+      "ERR" : self.msg_event,
+      "NEWDESC" : self.new_desc_event,
+      "ADDRMAP" : self.address_mapped_event,
+      "NS" : self.ns_event,
+      "NEWCONSENSUS" : self.new_consensus_event,
+      "BUILDTIMEOUT_SET" : self.buildtimeout_set_event,
+      "GUARD" : self.guard_event,
+      "TORCTL_TIMER" : self.timer_event
+      }
+    self.parent_handler = None
+    self._sabotage()
+
+  def _sabotage(self):
+    raise TorCtlError("Error: Do not extend from EventListener directly! Use Pre, Post or DualEventListener instead.")
+ 
+  def listen(self, event):
+    self.heartbeat_event(event)
+    self._map1.get(event.event_name, self.unknown_event)(event)
+
+  def set_parent(self, parent_handler):
+    self.parent_handler = parent_handler
+
+class PreEventListener(EventListener):
+  def _sabotage(self): pass
+class PostEventListener(EventListener):
+  def _sabotage(self): pass
+class DualEventListener(PreEventListener,PostEventListener): 
+  def _sabotage(self): pass
+
+class EventHandler(EventSink):
+  """An 'EventHandler' wraps callbacks for the events Tor can return. 
+     Each event argument is an instance of the corresponding event
+     class."""
+  def __init__(self):
+    """Create a new EventHandler."""
+    self._map1 = {
+      "CIRC" : self.circ_status_event,
+      "STREAM" : self.stream_status_event,
+      "ORCONN" : self.or_conn_status_event,
+      "STREAM_BW" : self.stream_bw_event,
+      "BW" : self.bandwidth_event,
+      "DEBUG" : self.msg_event,
+      "INFO" : self.msg_event,
+      "NOTICE" : self.msg_event,
+      "WARN" : self.msg_event,
+      "ERR" : self.msg_event,
+      "NEWDESC" : self.new_desc_event,
+      "ADDRMAP" : self.address_mapped_event,
+      "NS" : self.ns_event,
+      "NEWCONSENSUS" : self.new_consensus_event,
+      "BUILDTIMEOUT_SET" : self.buildtimeout_set_event,
+      "GUARD" : self.guard_event,
+      "TORCTL_TIMER" : self.timer_event
+      }
+    self.c = None # Gets set by Connection.set_event_hanlder()
+    self.pre_listeners = []
+    self.post_listeners = []
+
+  def _handle1(self, timestamp, lines):
+    """Dispatcher: called from Connection when an event is received."""
+    for code, msg, data in lines:
+      event = self._decode1(msg, data)
+      event.arrived_at = timestamp
+      event.state=EVENT_STATE.PRELISTEN
+      for l in self.pre_listeners:
+        l.listen(event)
+      event.state=EVENT_STATE.HEARTBEAT
+      self.heartbeat_event(event)
+      event.state=EVENT_STATE.HANDLING
+      self._map1.get(event.event_name, self.unknown_event)(event)
+      event.state=EVENT_STATE.POSTLISTEN
+      for l in self.post_listeners:
+        l.listen(event)
+
+  def _decode1(self, body, data):
+    """Unpack an event message into a type/arguments-tuple tuple."""
+    if " " in body:
+      evtype,body = body.split(" ",1)
+    else:
+      evtype,body = body,""
+    evtype = evtype.upper()
+    if evtype == "CIRC":
+      m = re.match(r"(\d+)\s+(\S+)(\s\S+)?(\s\S+)?(\s\S+)?(\s\S+)?", body)
+      if not m:
+        raise ProtocolError("CIRC event misformatted.")
+      ident,status,path,purpose,reason,remote = m.groups()
+      ident = int(ident)
+      if path:
+        if "PURPOSE=" in path:
+          remote = reason
+          reason = purpose
+          purpose=path
+          path=[]
+        elif "REASON=" in path:
+          remote = reason
+          reason = path
+          purpose = ""
+          path=[]
+        else:
+          path_verb = path.strip().split(",")
+          path = []
+          for p in path_verb:
+            path.append(p.replace("~", "=").split("=")[0])
+      else:
+        path = []
+
+      if purpose and "REASON=" in purpose:
+        remote=reason
+        reason=purpose
+        purpose=""
+
+      if purpose: purpose = purpose[9:]
+      if reason: reason = reason[8:]
+      if remote: remote = remote[15:]
+      event = CircuitEvent(evtype, ident, status, path, purpose, reason, remote)
+    elif evtype == "STREAM":
+      #plog("DEBUG", "STREAM: "+body)
+      m = re.match(r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+)?:(\d+)(\sREASON=\S+)?(\sREMOTE_REASON=\S+)?(\sSOURCE=\S+)?(\sSOURCE_ADDR=\S+)?(\s+PURPOSE=\S+)?", body)
+      if not m:
+        raise ProtocolError("STREAM event misformatted.")
+      ident,status,circ,target_host,target_port,reason,remote,source,source_addr,purpose = m.groups()
+      ident,circ = map(int, (ident,circ))
+      if not target_host: # This can happen on SOCKS_PROTOCOL failures
+        target_host = "(none)"
+      if reason: reason = reason[8:]
+      if remote: remote = remote[15:]
+      if source: source = source[8:]
+      if source_addr: source_addr = source_addr[13:]
+      if purpose:
+        purpose = purpose.lstrip()
+        purpose = purpose[8:]
+      event = StreamEvent(evtype, ident, status, circ, target_host,
+               int(target_port), reason, remote, source, source_addr, purpose)
+    elif evtype == "ORCONN":
+      m = re.match(r"(\S+)\s+(\S+)(\sAGE=\S+)?(\sREAD=\S+)?(\sWRITTEN=\S+)?(\sREASON=\S+)?(\sNCIRCS=\S+)?", body)
+      if not m:
+        raise ProtocolError("ORCONN event misformatted.")
+      target, status, age, read, wrote, reason, ncircs = m.groups()
+
+      #plog("DEBUG", "ORCONN: "+body)
+      if ncircs: ncircs = int(ncircs[8:])
+      else: ncircs = 0
+      if reason: reason = reason[8:]
+      if age: age = int(age[5:])
+      else: age = 0
+      if read: read = int(read[6:])
+      else: read = 0
+      if wrote: wrote = int(wrote[9:])
+      else: wrote = 0
+      event = ORConnEvent(evtype, status, target, age, read, wrote,
+                reason, ncircs)
+    elif evtype == "STREAM_BW":
+      m = re.match(r"(\d+)\s+(\d+)\s+(\d+)", body)
+      if not m:
+        raise ProtocolError("STREAM_BW event misformatted.")
+      event = StreamBwEvent(evtype, *m.groups())
+    elif evtype == "BW":
+      m = re.match(r"(\d+)\s+(\d+)", body)
+      if not m:
+        raise ProtocolError("BANDWIDTH event misformatted.")
+      read, written = map(long, m.groups())
+      event = BWEvent(evtype, read, written)
+    elif evtype in ("DEBUG", "INFO", "NOTICE", "WARN", "ERR"):
+      event = LogEvent(evtype, body)
+    elif evtype == "NEWDESC":
+      ids_verb = body.split(" ")
+      ids = []
+      for i in ids_verb:
+        ids.append(i.replace("~", "=").split("=")[0].replace("$",""))
+      event = NewDescEvent(evtype, ids)
+    elif evtype == "ADDRMAP":
+      # TODO: Also parse errors and GMTExpiry
+      m = re.match(r'(\S+)\s+(\S+)\s+(\"[^"]+\"|\w+)', body)
+      if not m:
+        raise ProtocolError("ADDRMAP event misformatted.")
+      fromaddr, toaddr, when = m.groups()
+      if when.upper() == "NEVER":  
+        when = None
+      else:
+        when = time.strptime(when[1:-1], "%Y-%m-%d %H:%M:%S")
+      event = AddrMapEvent(evtype, fromaddr, toaddr, when)
+    elif evtype == "NS":
+      event = NetworkStatusEvent(evtype, parse_ns_body(data))
+    elif evtype == "NEWCONSENSUS":
+      event = NewConsensusEvent(evtype, parse_ns_body(data))
+    elif evtype == "BUILDTIMEOUT_SET":
+      m = re.match(
+        r"(\S+)\sTOTAL_TIMES=(\d+)\sTIMEOUT_MS=(\d+)\sXM=(\d+)\sALPHA=(\S+)\sCUTOFF_QUANTILE=(\S+)",
+        body)
+      set_type, total_times, timeout_ms, xm, alpha, quantile = m.groups()
+      event = BuildTimeoutSetEvent(evtype, set_type, int(total_times),
+                                   int(timeout_ms), int(xm), float(alpha),
+                                   float(quantile))
+    elif evtype == "GUARD":
+      m = re.match(r"(\S+)\s(\S+)\s(\S+)", body)
+      entry, guard, status = m.groups()
+      event = GuardEvent(evtype, entry, guard, status)
+    elif evtype == "TORCTL_TIMER":
+      event = TimerEvent(evtype, data)
+    else:
+      event = UnknownEvent(evtype, body)
+
+    return event
+
+  def add_event_listener(self, evlistener):
+    if isinstance(evlistener, PreEventListener):
+      self.pre_listeners.append(evlistener)
+    if isinstance(evlistener, PostEventListener):
+      self.post_listeners.append(evlistener)
+    evlistener.set_parent(self)
+
+  def heartbeat_event(self, event):
+    """Called before any event is received. Convenience function
+       for any cleanup/setup/reconfiguration you may need to do.
+    """
+    pass
+
+  def unknown_event(self, event):
+    """Called when we get an event type we don't recognize.  This
+       is almost alwyas an error.
+    """
+    pass
+
+  def circ_status_event(self, event):
+    """Called when a circuit status changes if listening to CIRCSTATUS
+       events."""
+    pass
+
+  def stream_status_event(self, event):
+    """Called when a stream status changes if listening to STREAMSTATUS
+       events.  """
+    pass
+
+  def stream_bw_event(self, event):
+    pass
+
+  def or_conn_status_event(self, event):
+    """Called when an OR connection's status changes if listening to
+       ORCONNSTATUS events."""
+    pass
+
+  def bandwidth_event(self, event):
+    """Called once a second if listening to BANDWIDTH events.
+    """
+    pass
+
+  def new_desc_event(self, event):
+    """Called when Tor learns a new server descriptor if listenting to
+       NEWDESC events.
+    """
+    pass
+
+  def msg_event(self, event):
+    """Called when a log message of a given severity arrives if listening
+       to INFO_MSG, NOTICE_MSG, WARN_MSG, or ERR_MSG events."""
+    pass
+
+  def ns_event(self, event):
+    pass
+
+  def new_consensus_event(self, event):
+    pass
+
+  def buildtimeout_set_event(self, event):
+    pass
+
+  def guard_event(self, event):
+    pass
+
+  def address_mapped_event(self, event):
+    """Called when Tor adds a mapping for an address if listening
+       to ADDRESSMAPPED events.
+    """
+    pass
+
+  def timer_event(self, event):
+    pass
+
+class Consensus:
+  """
+  A Consensus is a pickleable container for the members of
+  ConsensusTracker. This should only be used as a temporary 
+  reference, and will change after a NEWDESC or NEWCONSENUS event.
+  If you want a copy of a consensus that is independent
+  of subsequent updates, use copy.deepcopy()
+  """
+
+  def __init__(self, ns_map, sorted_r, router_map, nick_map, consensus_count):
+    self.ns_map = ns_map
+    self.sorted_r = sorted_r
+    self.routers = router_map
+    self.name_to_key = nick_map
+    self.consensus_count = consensus_count
+
+class ConsensusTracker(EventHandler):
+  """
+  A ConsensusTracker is an EventHandler that tracks the current
+  consensus of Tor in self.ns_map, self.routers and self.sorted_r
+  """
+  def __init__(self, c, RouterClass=Router):
+    EventHandler.__init__(self)
+    c.set_event_handler(self)
+    self.ns_map = {}
+    self.routers = {}
+    self.sorted_r = []
+    self.name_to_key = {}
+    self.RouterClass = RouterClass
+    self.consensus_count = 0
+    self.update_consensus()
+
+  # XXX: If there were a potential memory leak through perpetually referenced
+  # objects, this function would be the #1 suspect.
+  def _read_routers(self, nslist):
+    # Routers can fall out of our consensus five different ways:
+    # 1. Their descriptors disappear
+    # 2. Their NS documents disappear
+    # 3. They lose the Running flag
+    # 4. They list a bandwidth of 0
+    # 5. They have 'opt hibernating' set
+    routers = self.c.read_routers(nslist) # Sets .down if 3,4,5
+    self.consensus_count = len(routers)
+    old_idhexes = set(self.routers.keys())
+    new_idhexes = set(map(lambda r: r.idhex, routers)) 
+    for r in routers:
+      if r.idhex in self.routers:
+        if self.routers[r.idhex].nickname != r.nickname:
+          plog("NOTICE", "Router "+r.idhex+" changed names from "
+             +self.routers[r.idhex].nickname+" to "+r.nickname)
+        # Must do IN-PLACE update to keep all the refs to this router
+        # valid and current (especially for stats)
+        self.routers[r.idhex].update_to(r)
+      else:
+        rc = self.RouterClass(r)
+        self.routers[rc.idhex] = rc
+
+    removed_idhexes = old_idhexes - new_idhexes
+    removed_idhexes.update(set(map(lambda r: r.idhex,
+                                   filter(lambda r: r.down, routers))))
+
+    for i in removed_idhexes:
+      if i not in self.routers: continue
+      self.routers[i].down = True
+      if "Running" in self.routers[i].flags:
+        self.routers[i].flags.remove("Running")
+      if self.routers[i].refcount == 0:
+        self.routers[i].deleted = True
+        if self.routers[i].__class__.__name__ == "StatsRouter":
+          plog("WARN", "Expiring non-running StatsRouter "+i)
+        else:
+          plog("INFO", "Expiring non-running router "+i)
+        del self.routers[i]
+      else:
+        plog("INFO", "Postponing expiring non-running router "+i)
+        self.routers[i].deleted = True
+
+    self.sorted_r = filter(lambda r: not r.down, self.routers.itervalues())
+    self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
+    for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
+
+    # XXX: Verification only. Can be removed.
+    self._sanity_check(self.sorted_r)
+
+  def _sanity_check(self, list):
+    if len(self.routers) > 1.5*self.consensus_count:
+      plog("WARN", "Router count of "+str(len(self.routers))+" exceeds consensus count "+str(self.consensus_count)+" by more than 50%")
+
+    if len(self.ns_map) < self.consensus_count:
+      plog("WARN", "NS map count of "+str(len(self.ns_map))+" is below consensus count "+str(self.consensus_count))
+
+    downed =  filter(lambda r: r.down, list)
+    for d in downed:
+      plog("WARN", "Router "+d.idhex+" still present but is down. Del: "+str(d.deleted)+", flags: "+str(d.flags)+", bw: "+str(d.bw))
+ 
+    deleted =  filter(lambda r: r.deleted, list)
+    for d in deleted:
+      plog("WARN", "Router "+d.idhex+" still present but is deleted. Down: "+str(d.down)+", flags: "+str(d.flags)+", bw: "+str(d.bw))
+
+    zero =  filter(lambda r: r.refcount == 0 and r.__class__.__name__ == "StatsRouter", list)
+    for d in zero:
+      plog("WARN", "Router "+d.idhex+" has refcount 0. Del:"+str(d.deleted)+", Down: "+str(d.down)+", flags: "+str(d.flags)+", bw: "+str(d.bw))
+ 
+  def _update_consensus(self, nslist):
+    self.ns_map = {}
+    for n in nslist:
+      self.ns_map[n.idhex] = n
+      self.name_to_key[n.nickname] = "$"+n.idhex
+   
+  def update_consensus(self):
+    self._update_consensus(self.c.get_network_status())
+    self._read_routers(self.ns_map.values())
+
+  def new_consensus_event(self, n):
+    self._update_consensus(n.nslist)
+    self._read_routers(self.ns_map.values())
+    plog("DEBUG", str(time.time()-n.arrived_at)+" Read " + str(len(n.nslist))
+       +" NC => " + str(len(self.sorted_r)) + " routers")
+ 
+  def new_desc_event(self, d):
+    update = False
+    for i in d.idlist:
+      r = None
+      try:
+        ns = self.c.get_network_status("id/"+i)
+        r = self.c.read_routers(ns)
+      except ErrorReply, e:
+        plog("WARN", "Error reply for "+i+" after NEWDESC: "+str(e))
+        continue
+      if not r:
+        plog("WARN", "No router desc for "+i+" after NEWDESC")
+        continue
+      elif len(r) != 1:
+        plog("WARN", "Multiple descs for "+i+" after NEWDESC")
+
+      r = r[0]
+      ns = ns[0]
+      if ns.idhex in self.routers and self.routers[ns.idhex].orhash == r.orhash:
+        plog("NOTICE",
+             "Got extra NEWDESC event for router "+ns.nickname+"="+ns.idhex)
+      else:
+        self.consensus_count += 1
+      self.name_to_key[ns.nickname] = "$"+ns.idhex
+      if r and r.idhex in self.ns_map:
+        if ns.orhash != self.ns_map[r.idhex].orhash:
+          plog("WARN", "Getinfo and consensus disagree for "+r.idhex)
+          continue
+        update = True
+        if r.idhex in self.routers:
+          self.routers[r.idhex].update_to(r)
+        else:
+          self.routers[r.idhex] = self.RouterClass(r)
+    if update:
+      self.sorted_r = filter(lambda r: not r.down, self.routers.itervalues())
+      self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
+      for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
+    plog("DEBUG", str(time.time()-d.arrived_at)+ " Read " + str(len(d.idlist))
+       +" ND => "+str(len(self.sorted_r))+" routers. Update: "+str(update))
+    # XXX: Verification only. Can be removed.
+    self._sanity_check(self.sorted_r)
+    return update
+
+  def current_consensus(self):
+    return Consensus(self.ns_map, self.sorted_r, self.routers, 
+                     self.name_to_key, self.consensus_count)
+
+class DebugEventHandler(EventHandler):
+  """Trivial debug event handler: reassembles all parsed events to stdout."""
+  def circ_status_event(self, circ_event): # CircuitEvent()
+    output = [circ_event.event_name, str(circ_event.circ_id),
+          circ_event.status]
+    if circ_event.path:
+      output.append(",".join(circ_event.path))
+    if circ_event.reason:
+      output.append("REASON=" + circ_event.reason)
+    if circ_event.remote_reason:
+      output.append("REMOTE_REASON=" + circ_event.remote_reason)
+    print " ".join(output)
+
+  def stream_status_event(self, strm_event):
+    output = [strm_event.event_name, str(strm_event.strm_id),
+          strm_event.status, str(strm_event.circ_id),
+          strm_event.target_host, str(strm_event.target_port)]
+    if strm_event.reason:
+      output.append("REASON=" + strm_event.reason)
+    if strm_event.remote_reason:
+      output.append("REMOTE_REASON=" + strm_event.remote_reason)
+    print " ".join(output)
+
+  def ns_event(self, ns_event):
+    for ns in ns_event.nslist:
+      print " ".join((ns_event.event_name, ns.nickname, ns.idhash,
+        ns.updated.isoformat(), ns.ip, str(ns.orport),
+        str(ns.dirport), " ".join(ns.flags)))
+
+  def new_consensus_event(self, nc_event):
+    self.ns_event(nc_event)
+
+  def new_desc_event(self, newdesc_event):
+    print " ".join((newdesc_event.event_name, " ".join(newdesc_event.idlist)))
+   
+  def or_conn_status_event(self, orconn_event):
+    if orconn_event.age: age = "AGE="+str(orconn_event.age)
+    else: age = ""
+    if orconn_event.read_bytes: read = "READ="+str(orconn_event.read_bytes)
+    else: read = ""
+    if orconn_event.wrote_bytes: wrote = "WRITTEN="+str(orconn_event.wrote_bytes)
+    else: wrote = ""
+    if orconn_event.reason: reason = "REASON="+orconn_event.reason
+    else: reason = ""
+    if orconn_event.ncircs: ncircs = "NCIRCS="+str(orconn_event.ncircs)
+    else: ncircs = ""
+    print " ".join((orconn_event.event_name, orconn_event.endpoint,
+            orconn_event.status, age, read, wrote, reason, ncircs))
+
+  def msg_event(self, log_event):
+    print log_event.event_name+" "+log_event.msg
+  
+  def bandwidth_event(self, bw_event):
+    print bw_event.event_name+" "+str(bw_event.read)+" "+str(bw_event.written)
+
+def parseHostAndPort(h):
+  """Given a string of the form 'address:port' or 'address' or
+     'port' or '', return a two-tuple of (address, port)
+  """
+  host, port = "localhost", 9100
+  if ":" in h:
+    i = h.index(":")
+    host = h[:i]
+    try:
+      port = int(h[i+1:])
+    except ValueError:
+      print "Bad hostname %r"%h
+      sys.exit(1)
+  elif h:
+    try:
+      port = int(h)
+    except ValueError:
+      host = h
+
+  return host, port
+
+def run_example(host,port):
+  """ Example of basic TorCtl usage. See PathSupport for more advanced
+      usage.
+  """
+  print "host is %s:%d"%(host,port)
+  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+  s.connect((host,port))
+  c = Connection(s)
+  c.set_event_handler(DebugEventHandler())
+  th = c.launch_thread()
+  c.authenticate()
+  print "nick",`c.get_option("nickname")`
+  print `c.get_info("version")`
+  #print `c.get_info("desc/name/moria1")`
+  print `c.get_info("network-status")`
+  print `c.get_info("addr-mappings/all")`
+  print `c.get_info("addr-mappings/config")`
+  print `c.get_info("addr-mappings/cache")`
+  print `c.get_info("addr-mappings/control")`
+
+  print `c.extend_circuit(0,["moria1"])`
+  try:
+    print `c.extend_circuit(0,[""])`
+  except ErrorReply: # wtf?
+    print "got error. good."
+  except:
+    print "Strange error", sys.exc_info()[0]
+   
+  #send_signal(s,1)
+  #save_conf(s)
+
+  #set_option(s,"1")
+  #set_option(s,"bandwidthburstbytes 100000")
+  #set_option(s,"runasdaemon 1")
+  #set_events(s,[EVENT_TYPE.WARN])
+#  c.set_events([EVENT_TYPE.ORCONN], True)
+  c.set_events([EVENT_TYPE.STREAM, EVENT_TYPE.CIRC,
+          EVENT_TYPE.NEWCONSENSUS, EVENT_TYPE.NEWDESC,
+          EVENT_TYPE.ORCONN, EVENT_TYPE.BW], True)
+
+  th.join()
+  return
+
+if __name__ == '__main__':
+  if len(sys.argv) > 2:
+    print "Syntax: TorControl.py torhost:torport"
+    sys.exit(0)
+  else:
+    sys.argv.append("localhost:9051")
+  sh,sp = parseHostAndPort(sys.argv[1])
+  run_example(sh,sp)
+


Property changes on: arm/dependencies/TorCtl/TorCtl.py
___________________________________________________________________
Added: svn:executable
   + *

Added: arm/dependencies/TorCtl/TorUtil.py
===================================================================
--- arm/dependencies/TorCtl/TorUtil.py	                        (rev 0)
+++ arm/dependencies/TorCtl/TorUtil.py	2010-08-23 01:13:01 UTC (rev 23018)
@@ -0,0 +1,412 @@
+#!/usr/bin/python
+# TorCtl.py -- Python module to interface with Tor Control interface.
+# Copyright 2007 Mike Perry -- See LICENSE for licensing information.
+# Portions Copyright 2005 Nick Matthewson
+
+"""
+TorUtil -- Support functions for TorCtl.py and metatroller
+"""
+
+import os
+import re
+import sys
+import socket
+import binascii
+import math
+import time
+import logging
+import ConfigParser
+
+if sys.version_info < (2, 5):
+  from sha import sha as sha1
+else:
+  from hashlib import sha1
+
+__all__ = ["Enum", "Enum2", "Callable", "sort_list", "quote", "escape_dots", "unescape_dots",
+      "BufSock", "secret_to_key", "urandom_rng", "s2k_gen", "s2k_check", "plog", 
+     "ListenSocket", "zprob", "logfile", "loglevel"]
+
+# TODO: This isn't the right place for these.. But at least it's unified.
+tor_port = 9060
+tor_host = '127.0.0.1'
+
+control_port = 9061
+control_host = '127.0.0.1'
+control_pass = ""
+
+meta_port = 9052
+meta_host = '127.0.0.1'
+
+class Referrer:
+  def __init__(self, cl):
+    self.referrers = {}
+    self.cl_name = cl
+    self.count = 0
+
+  def recurse_store(self, gc, obj, depth, max_depth):
+    if depth >= max_depth: return
+    for r in gc.get_referrers(obj):
+      if hasattr(r, "__class__"):
+        cl = r.__class__.__name__
+        # Skip frames and list iterators.. prob just us
+        if cl in ("frame", "listiterator"): continue 
+        if cl not in self.referrers:
+          self.referrers[cl] = Referrer(cl)
+        self.referrers[cl].count += 1
+        self.referrers[cl].recurse_store(gc, r, depth+1, max_depth)
+
+  def recurse_print(self, rcutoff, depth=""):
+    refs = self.referrers.keys()
+    refs.sort(lambda x, y: self.referrers[y].count - self.referrers[x].count)
+    for r in refs:
+      if self.referrers[r].count > rcutoff:
+        plog("NOTICE", "GC:  "+depth+"Refed by "+r+": "+str(self.referrers[r].count))
+        self.referrers[r].recurse_print(rcutoff, depth+" ")
+
+def dump_class_ref_counts(referrer_depth=2, cutoff=500, rcutoff=1,
+        ignore=('tuple', 'list', 'function', 'dict',
+                 'builtin_function_or_method',
+                 'wrapper_descriptor')):
+  """ Debugging function to track down types of objects
+      that cannot be garbage collected because we hold refs to them 
+      somewhere."""
+  import gc
+  __dump_class_ref_counts(gc, referrer_depth, cutoff, rcutoff, ignore)
+  gc.collect()
+  plog("NOTICE", "GC: Done.")
+
+def __dump_class_ref_counts(gc, referrer_depth, cutoff, rcutoff, ignore):
+  """ loil
+  """
+  plog("NOTICE", "GC: Gathering garbage collection stats...")
+  uncollectable = gc.collect()
+  class_counts = {}
+  referrers = {}
+  plog("NOTICE", "GC: Uncollectable objects: "+str(uncollectable))
+  objs = gc.get_objects()
+  for obj in objs:
+    if hasattr(obj, "__class__"):
+      cl = obj.__class__.__name__
+      if cl in ignore: continue
+      if cl not in class_counts:
+        class_counts[cl] = 0
+        referrers[cl] = Referrer(cl)
+      class_counts[cl] += 1
+  if referrer_depth:
+    for obj in objs:
+      if hasattr(obj, "__class__"):
+        cl = obj.__class__.__name__
+        if cl in ignore: continue
+        if class_counts[cl] > cutoff:
+          referrers[cl].recurse_store(gc, obj, 0, referrer_depth)
+  classes = class_counts.keys()
+  classes.sort(lambda x, y: class_counts[y] - class_counts[x])
+  for c in classes:
+    if class_counts[c] < cutoff: continue
+    plog("NOTICE", "GC: Class "+c+": "+str(class_counts[c]))
+    if referrer_depth:
+      referrers[c].recurse_print(rcutoff)
+
+
+
+def read_config(filename):
+  config = ConfigParser.SafeConfigParser()
+  config.read(filename)
+  global tor_port, tor_host, control_port, control_pass, control_host
+  global meta_port, meta_host
+  global loglevel
+
+  tor_port = config.getint('TorCtl', 'tor_port')
+  meta_port = config.getint('TorCtl', 'meta_port')
+  control_port = config.getint('TorCtl', 'control_port')
+
+  tor_host = config.get('TorCtl', 'tor_host')
+  control_host = config.get('TorCtl', 'control_host')
+  meta_host = config.get('TorCtl', 'meta_host')
+  control_pass = config.get('TorCtl', 'control_pass')
+  loglevel = config.get('TorCtl', 'loglevel')
+
+
+class Enum:
+  """ Defines an ordered dense name-to-number 1-1 mapping """
+  def __init__(self, start, names):
+    self.nameOf = {}
+    idx = start
+    for name in names:
+      setattr(self,name,idx)
+      self.nameOf[idx] = name
+      idx += 1
+
+class Enum2:
+  """ Defines an ordered sparse name-to-number 1-1 mapping """
+  def __init__(self, **args):
+    self.__dict__.update(args)
+    self.nameOf = {}
+    for k,v in args.items():
+      self.nameOf[v] = k
+
+class Callable:
+    def __init__(self, anycallable):
+        self.__call__ = anycallable
+
+def sort_list(list, key):
+  """ Sort a list by a specified key """
+  list.sort(lambda x,y: cmp(key(x), key(y))) # Python < 2.4 hack
+  return list
+
+def quote(s):
+  return re.sub(r'([\r\n\\\"])', r'\\\1', s)
+
+def escape_dots(s, translate_nl=1):
+  if translate_nl:
+    lines = re.split(r"\r?\n", s)
+  else:
+    lines = s.split("\r\n")
+  if lines and not lines[-1]:
+    del lines[-1]
+  for i in xrange(len(lines)):
+    if lines[i].startswith("."):
+      lines[i] = "."+lines[i]
+  lines.append(".\r\n")
+  return "\r\n".join(lines)
+
+def unescape_dots(s, translate_nl=1):
+  lines = s.split("\r\n")
+
+  for i in xrange(len(lines)):
+    if lines[i].startswith("."):
+      lines[i] = lines[i][1:]
+
+  if lines and lines[-1]:
+    lines.append("")
+
+  if translate_nl:
+    return "\n".join(lines)
+  else:
+    return "\r\n".join(lines)
+
+# XXX: Exception handling
+class BufSock:
+  def __init__(self, s):
+    self._s = s
+    self._buf = []
+
+  def readline(self):
+    if self._buf:
+      idx = self._buf[0].find('\n')
+      if idx >= 0:
+        result = self._buf[0][:idx+1]
+        self._buf[0] = self._buf[0][idx+1:]
+        return result
+
+    while 1:
+      s = self._s.recv(128)
+      if not s: return None
+      # XXX: This really does need an exception
+      #  raise ConnectionClosed()
+      idx = s.find('\n')
+      if idx >= 0:
+        self._buf.append(s[:idx+1])
+        result = "".join(self._buf)
+        rest = s[idx+1:]
+        if rest:
+          self._buf = [ rest ]
+        else:
+          del self._buf[:]
+        return result
+      else:
+        self._buf.append(s)
+
+  def write(self, s):
+    self._s.send(s)
+
+  def close(self):
+    self._s.close()
+
+# SocketServer.TCPServer is nuts.. 
+class ListenSocket:
+  def __init__(self, listen_ip, port):
+    msg = None
+    self.s = None
+    for res in socket.getaddrinfo(listen_ip, port, socket.AF_UNSPEC,
+              socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
+      af, socktype, proto, canonname, sa = res
+      try:
+        self.s = socket.socket(af, socktype, proto)
+        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+      except socket.error, msg:
+        self.s = None
+        continue
+      try:
+        self.s.bind(sa)
+        self.s.listen(1)
+      except socket.error, msg:
+        self.s.close()
+        self.s = None
+        continue
+      break
+    if self.s is None:
+      raise socket.error(msg)
+
+  def accept(self):
+    conn, addr = self.s.accept()
+    return conn
+
+  def close(self):
+    self.s.close()
+
+
+def secret_to_key(secret, s2k_specifier):
+  """Used to generate a hashed password string. DOCDOC."""
+  c = ord(s2k_specifier[8])
+  EXPBIAS = 6
+  count = (16+(c&15)) << ((c>>4) + EXPBIAS)
+
+  d = sha1()
+  tmp = s2k_specifier[:8]+secret
+  slen = len(tmp)
+  while count:
+    if count > slen:
+      d.update(tmp)
+      count -= slen
+    else:
+      d.update(tmp[:count])
+      count = 0
+  return d.digest()
+
+def urandom_rng(n):
+  """Try to read some entropy from the platform entropy source."""
+  f = open('/dev/urandom', 'rb')
+  try:
+    return f.read(n)
+  finally:
+    f.close()
+
+def s2k_gen(secret, rng=None):
+  """DOCDOC"""
+  if rng is None:
+    if hasattr(os, "urandom"):
+      rng = os.urandom
+    else:
+      rng = urandom_rng
+  spec = "%s%s"%(rng(8), chr(96))
+  return "16:%s"%(
+    binascii.b2a_hex(spec + secret_to_key(secret, spec)))
+
+def s2k_check(secret, k):
+  """DOCDOC"""
+  assert k[:3] == "16:"
+
+  k =  binascii.a2b_hex(k[3:])
+  return secret_to_key(secret, k[:9]) == k[9:]
+
+## XXX: Make this a class?
+loglevel = "DEBUG"
+#loglevels = {"DEBUG" : 0, "INFO" : 1, "NOTICE" : 2, "WARN" : 3, "ERROR" : 4, "NONE" : 5}
+logfile = None
+logger = None
+
+# Python logging levels are in increments of 10, so place our custom
+# levels in between Python's default levels.
+loglevels = { "DEBUG":  logging.DEBUG,
+              "INFO":   logging.INFO,
+              "NOTICE": logging.INFO + 5,
+              "WARN":   logging.WARN,
+              "ERROR":  logging.ERROR,
+              "NONE":   logging.ERROR + 5 }
+# Set loglevel => name translation.
+for name, value in loglevels.iteritems():
+  logging.addLevelName(value, name)
+
+def plog_use_logger(name):
+  """ Set the Python logger to use with plog() by name.
+      Useful when TorCtl is integrated with an application using logging.
+      The logger specified by name must be set up before the first call
+      to plog()! """
+  global logger, loglevels
+  logger = logging.getLogger(name)
+
+def plog(level, msg, *args):
+  global logger, logfile
+  if not logger:
+    # Default init = old TorCtl format + default behavior
+    # Default behavior = log to stdout if TorUtil.logfile is None,
+    # or to the open file specified otherwise.
+    logger = logging.getLogger("TorCtl")
+    formatter = logging.Formatter("%(levelname)s[%(asctime)s]:%(message)s",
+                                  "%a %b %d %H:%M:%S %Y")
+
+    if not logfile:
+      logfile = sys.stdout
+    # HACK: if logfile is a string, assume is it the desired filename.
+    if type(logfile) is str:
+      f = logging.FileHandler(logfile)
+      f.setFormatter(formatter)
+      logger.addHandler(f)
+    # otherwise, pretend it is a stream.
+    else:
+      ch = logging.StreamHandler(logfile)
+      ch.setFormatter(formatter)
+      logger.addHandler(ch)
+    logger.setLevel(loglevels[loglevel])
+
+  logger.log(loglevels[level], msg, *args)
+
+# The following zprob routine was stolen from
+# http://www.nmr.mgh.harvard.edu/Neural_Systems_Group/gary/python/stats.py
+# pursuant to this license:
+#
+# Copyright (c) 1999-2007 Gary Strangman; All Rights Reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# The above license applies only to the following 39 lines of code.
+def zprob(z):
+    """
+Returns the area under the normal curve 'to the left of' the given z value.
+Thus, 
+    for z<0, zprob(z) = 1-tail probability
+    for z>0, 1.0-zprob(z) = 1-tail probability
+    for any z, 2.0*(1.0-zprob(abs(z))) = 2-tail probability
+Adapted from z.c in Gary Perlman's |Stat.
+
+Usage:   lzprob(z)
+"""
+    Z_MAX = 6.0    # maximum meaningful z-value
+    if z == 0.0:
+        x = 0.0
+    else:
+        y = 0.5 * math.fabs(z)
+        if y >= (Z_MAX*0.5):
+            x = 1.0
+        elif (y < 1.0):
+            w = y*y
+            x = ((((((((0.000124818987 * w
+                        -0.001075204047) * w +0.005198775019) * w
+                      -0.019198292004) * w +0.059054035642) * w
+                    -0.151968751364) * w +0.319152932694) * w
+                  -0.531923007300) * w +0.797884560593) * y * 2.0
+        else:
+            y = y - 2.0
+            x = (((((((((((((-0.000045255659 * y
+                             +0.000152529290) * y -0.000019538132) * y
+                           -0.000676904986) * y +0.001390604284) * y
+                         -0.000794620820) * y -0.002034254874) * y
+                       +0.006549791214) * y -0.010557625006) * y
+                     +0.011630447319) * y -0.009279453341) * y
+                   +0.005353579108) * y -0.002141268741) * y
+                 +0.000535310849) * y +0.999936657524
+    if z > 0.0:
+        prob = ((x+1.0)*0.5)
+    else:
+        prob = ((1.0-x)*0.5)
+    return prob
+

Added: arm/dependencies/TorCtl/__init__.py
===================================================================
--- arm/dependencies/TorCtl/__init__.py	                        (rev 0)
+++ arm/dependencies/TorCtl/__init__.py	2010-08-23 01:13:01 UTC (rev 23018)
@@ -0,0 +1,28 @@
+"""
+TorCtl is a python Tor controller with extensions to support path
+building and various constraints on node and path selection, as well as
+statistics gathering.
+
+Apps can hook into the TorCtl package at whatever level they wish. 
+
+The lowest level of interaction is to use the TorCtl module
+(TorCtl/TorCtl.py). Typically this is done by importing TorCtl.TorCtl
+and creating a TorCtl.Connection and extending from TorCtl.EventHandler.
+This class receives Tor controller events packaged into python classes
+from a TorCtl.Connection.
+
+The next level up is to use the TorCtl.PathSupport module. This is done
+by importing TorCtl.PathSupport and instantiating or extending from
+PathSupport.PathBuilder, which itself extends from TorCtl.EventHandler.
+This class handles circuit construction and stream attachment subject to
+policies defined by PathSupport.NodeRestrictor and
+PathSupport.PathRestrictor implementations.
+
+If you are interested in gathering statistics, you can instead
+instantiate or extend from StatsSupport.StatsHandler, which is
+again an event handler with hooks to record statistics on circuit
+creation, stream bandwidth, and circuit failure information.
+"""
+
+__all__ = ["TorUtil", "GeoIPSupport", "PathSupport", "TorCtl", "StatsSupport",
+           "SQLSupport", "ScanSupport"]

Added: arm/dependencies/notes.txt
===================================================================
--- arm/dependencies/notes.txt	                        (rev 0)
+++ arm/dependencies/notes.txt	2010-08-23 01:13:01 UTC (rev 23018)
@@ -0,0 +1,6 @@
+TorCtl -
+  Last Updated: 8/22/10 (c514a0a7105cebe7cc5fa199750b90369b820bfb):
+  To update run the following:
+    git clone git://git.torproject.org/pytorctl.git
+    git archive master | tar -x -C /path/to/dependences/TorCtl/
+


Property changes on: arm/release
___________________________________________________________________
Modified: svn:externals
   - TorCtl https://svn.torproject.org/svn/torctl/trunk/python/TorCtl

   + TorCtl https://svn.torproject.org/svn/arm/dependencies/TorCtl



Property changes on: arm/trunk
___________________________________________________________________
Deleted: svn:externals
   - TorCtl https://svn.torproject.org/svn/torctl/trunk/python/TorCtl



Property changes on: arm/trunk/src
___________________________________________________________________
Added: svn:externals
   + TorCtl https://svn.torproject.org/svn/arm/dependencies/TorCtl