[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r9717: Refactored all circuit and stream handling into PathSupport. (in torflow/trunk: . TorCtl)



Author: mikeperry
Date: 2007-03-03 01:56:05 -0500 (Sat, 03 Mar 2007)
New Revision: 9717

Modified:
   torflow/trunk/TorCtl/PathSupport.py
   torflow/trunk/metatroller.py
Log:
Refactored all circuit and stream handling into PathSupport. The only thing
that will live in metatroller.py now is statistics.



Modified: torflow/trunk/TorCtl/PathSupport.py
===================================================================
--- torflow/trunk/TorCtl/PathSupport.py	2007-03-03 05:11:38 UTC (rev 9716)
+++ torflow/trunk/TorCtl/PathSupport.py	2007-03-03 06:56:05 UTC (rev 9717)
@@ -15,7 +15,8 @@
 "VersionExcludeRestriction", "ExitPolicyRestriction", "OrNodeRestriction",
 "AtLeastNNodeRestriction", "NotNodeRestriction", "Subnet16Restriction",
 "UniqueRestriction", "UniformGenerator", "OrderedExitGenerator",
-"PathSelector", "Connection", "NickRestriction", "IdHexRestriction"]
+"PathSelector", "Connection", "NickRestriction", "IdHexRestriction",
+"PathBuilder"]
 
 #################### Path Support Interfaces #####################
 
@@ -408,7 +409,320 @@
                 return r
         raise NoRouters();
 
+class SelectionManager:
+    """Helper class to handle configuration updates
+      
+      The methods are NOT threadsafe. They may ONLY be called from
+      EventHandler's thread.
 
+      When updating the configuration, make a copy of this object, modify it,
+      and then submit it to PathBuilder.update_selmgr(). Since assignments
+      are atomic (GIL), this pattern is threadsafe.
+      """
+    def __init__(self, resolve_port, num_circuits, pathlen, order_exits,
+                 percent_fast, percent_skip, min_bw, use_all_exits,
+                 uniform, use_exit, use_guards):
+        self.__ordered_exit_gen = None # except this one ;)
+        self.last_exit = None
+        self.new_nym = False
+        self.resolve_port = resolve_port
+        self.num_circuits = num_circuits
+        self.pathlen = pathlen
+        self.order_exits = order_exits
+        self.percent_fast = percent_fast
+        self.percent_skip = percent_skip
+        self.min_bw = min_bw
+        self.use_all_exits = use_all_exits
+        self.uniform = uniform
+        self.exit_name = use_exit
+        self.use_guards = use_guards
+
+    def reconfigure(self, sorted_r):
+        """
+        Member variables from this function should not be modified by other
+        threads.
+        """
+        if self.use_all_exits:
+            self.path_rstr = PathRestrictionList([])
+        else:
+            self.path_rstr = PathRestrictionList(
+                     [Subnet16Restriction(), UniqueRestriction()])
+            
+        self.entry_rstr = NodeRestrictionList(
+            [
+             PercentileRestriction(self.percent_skip, self.percent_fast,
+                sorted_r),
+             ConserveExitsRestriction(),
+             FlagsRestriction(["Guard", "Valid", "Running"], [])
+             ], sorted_r)
+        self.mid_rstr = NodeRestrictionList(
+            [PercentileRestriction(self.percent_skip, self.percent_fast,
+                sorted_r),
+             ConserveExitsRestriction(),
+             FlagsRestriction(["Valid", "Running"], [])], sorted_r)
+
+        if self.use_all_exits:
+            self.exit_rstr = NodeRestrictionList(
+                [FlagsRestriction(["Valid", "Running"], ["BadExit"])], sorted_r)
+        else:
+            self.exit_rstr = NodeRestrictionList(
+                [PercentileRestriction(self.percent_skip, self.percent_fast,
+                   sorted_r),
+                 FlagsRestriction(["Valid", "Running"], ["BadExit"])],
+                 sorted_r)
+
+        if self.exit_name:
+            if self.exit_name[0] == '$':
+                self.exit_rstr.add_restriction(IdHexRestriction(self.exit_name))
+            else:
+                self.exit_rstr.add_restriction(NickRestriction(self.exit_name))
+
+        # This is kind of hokey..
+        if self.order_exits:
+            if self.__ordered_exit_gen:
+                exitgen = self.__ordered_exit_gen
+            else:
+                exitgen = self.__ordered_exit_gen = \
+                    OrderedExitGenerator(self.exit_rstr, 80)
+        else:
+            exitgen = UniformGenerator(self.exit_rstr)
+
+        if self.uniform:
+            self.path_selector = PathSelector(
+                 UniformGenerator(self.entry_rstr),
+                 UniformGenerator(self.mid_rstr),
+                 exitgen, self.path_rstr)
+        else:
+            raise NotImplemented()
+
+    def set_target(self, ip, port):
+        self.exit_rstr.del_restriction(ExitPolicyRestriction)
+        self.exit_rstr.add_restriction(ExitPolicyRestriction(ip, port))
+        if self.__ordered_exit_gen: self.__ordered_exit_gen.set_port(port)
+
+    def update_routers(self, new_rlist):
+        self.entry_rstr.update_routers(new_rlist)
+        self.mid_rstr.update_routers(new_rlist)
+        self.exit_rstr.update_routers(new_rlist)
+
+class Circuit(TorCtl.Circuit):
+    def __init__(self, circuit): # Promotion constructor
+        # perf shortcut since we don't care about the 'circuit' 
+        # instance after this
+        self.__dict__ = circuit.__dict__
+        self.built = False
+        self.detached_cnt = 0
+        self.created_at = datetime.datetime.now()
+        self.pending_streams = [] # Which stream IDs are pending us
+
+class Stream:
+    def __init__(self, sid, host, port):
+        self.sid = sid
+        self.detached_from = [] # circ id #'s
+        self.pending_circ = None
+        self.circ = None
+        self.host = host
+        self.port = port
+
+# TODO: Make passive "PathWatcher" so people can get aggregate 
+# node reliability stats for normal usage without us attaching streams
+
+class PathBuilder(TorCtl.EventHandler):
+    def __init__(self, c, selmgr, RouterClass):
+        TorCtl.EventHandler.__init__(self)
+        self.c = c
+        nslist = c.get_network_status()
+        self.RouterClass = RouterClass
+        self.sorted_r = []
+        self.routers = {}
+        self.circuits = {}
+        self.streams = {}
+        self.read_routers(nslist)
+        self.selupdate = selmgr # other threads can fully examine this safely
+        self.selmgr = selmgr # other threads can read single values safely
+        self.selmgr.reconfigure(self.sorted_r)
+        plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(nslist))+" routers")
+
+    def read_routers(self, nslist):
+        new_routers = map(self.RouterClass, self.c.read_routers(nslist))
+        for r in new_routers:
+            if r.idhex in self.routers:
+                if self.routers[r.idhex].nickname != r.nickname:
+                    plog("NOTICE", "Router "+r.idhex+" changed names from "
+                         +self.routers[r.idhex].nickname+" to "+r.nickname)
+                self.sorted_r.remove(self.routers[r.idhex])
+            self.routers[r.idhex] = r
+        self.sorted_r.extend(new_routers)
+        self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
+
+    def attach_stream_any(self, stream, badcircs):
+        # Newnym, and warn if not built plus pending
+        unattached_streams = [stream]
+        if self.selmgr.new_nym:
+            self.selmgr.new_nym = False
+            plog("DEBUG", "Obeying new nym")
+            for key in self.circuits.keys():
+                if len(self.circuits[key].pending_streams):
+                    plog("WARN", "New nym called, destroying circuit "+str(key)
+                         +" with "+str(len(self.circuits[key].pending_streams))
+                         +" pending streams")
+                    unattached_streams.extend(self.circuits[key].pending_streams)
+                # FIXME: Consider actually closing circ if no streams.
+                del self.circuits[key]
+            
+        for circ in self.circuits.itervalues():
+            if circ.built and circ.cid not in badcircs:
+                if circ.exit.will_exit_to(stream.host, stream.port):
+                    try:
+                        self.c.attach_stream(stream.sid, circ.cid)
+                        stream.pending_circ = circ # Only one possible here
+                        circ.pending_streams.append(stream)
+                    except TorCtl.ErrorReply, e:
+                        # No need to retry here. We should get the failed
+                        # event for either the circ or stream next
+                        plog("NOTICE", "Error attaching stream: "+str(e.args))
+                        return
+                    break
+        else:
+            circ = None
+            while circ == None:
+                self.selmgr.set_target(stream.host, stream.port)
+                try:
+                    circ = Circuit(self.c.build_circuit(
+                                    self.selmgr.pathlen,
+                                    self.selmgr.path_selector))
+                except TorCtl.ErrorReply, e:
+                    # FIXME: How come some routers are non-existent? Shouldn't
+                    # we have gotten an NS event to notify us they
+                    # disappeared?
+                    plog("NOTICE", "Error building circ: "+str(e.args))
+            for u in unattached_streams:
+                plog("DEBUG",
+                     "Attaching "+str(u.sid)+" pending build of "+str(circ.cid))
+                u.pending_circ = circ
+            circ.pending_streams.extend(unattached_streams)
+            self.circuits[circ.cid] = circ
+        self.selmgr.last_exit = circ.exit
+
+    #Relying on GIL for weak atomicity instead of locking. 
+    #http://effbot.org/pyfaq/can-t-we-get-rid-of-the-global-interpreter-lock.htm
+    #http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm
+    def update_selmgr(self, selmgr):
+        "This is the _ONLY_ method that can be called from another thread"
+        self.selupdate = selmgr
+
+    def heartbeat_event(self, event):
+        if id(self.selupdate) != id(self.selmgr):
+            self.selmgr = self.selupdate
+            self.selmgr.reconfigure(self.sorted_r)
+    
+    def circ_status_event(self, c):
+        output = [c.event_name, str(c.circ_id), c.status]
+        if c.path: output.append(",".join(c.path))
+        if c.reason: output.append("REASON=" + c.reason)
+        if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
+        plog("DEBUG", " ".join(output))
+        # Circuits we don't control get built by Tor
+        if c.circ_id not in self.circuits:
+            plog("DEBUG", "Ignoring circ " + str(c.circ_id))
+            return
+        if c.status == "FAILED" or c.status == "CLOSED":
+            circ = self.circuits[c.circ_id]
+            del self.circuits[c.circ_id]
+            for stream in circ.pending_streams:
+                plog("DEBUG", "Finding new circ for " + str(stream.sid))
+                self.attach_stream_any(stream, stream.detached_from)
+        elif c.status == "BUILT":
+            self.circuits[c.circ_id].built = True
+            for stream in self.circuits[c.circ_id].pending_streams:
+                self.c.attach_stream(stream.sid, c.circ_id)
+
+    def stream_status_event(self, s):
+        output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id),
+                  s.target_host, str(s.target_port)]
+        if s.reason: output.append("REASON=" + s.reason)
+        if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
+        plog("DEBUG", " ".join(output))
+        if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
+            s.target_host = "255.255.255.255" # ignore DNS for exit policy check
+        if s.status == "NEW" or s.status == "NEWRESOLVE":
+            if s.status == "NEWRESOLVE" and not s.target_port:
+                s.target_port = self.selmgr.resolve_port
+            self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port)
+
+            self.attach_stream_any(self.streams[s.strm_id],
+                                   self.streams[s.strm_id].detached_from)
+        elif s.status == "DETACHED":
+            if s.strm_id not in self.streams:
+                plog("WARN", "Detached stream "+str(s.strm_id)+" not found")
+                self.streams[s.strm_id] = Stream(s.strm_id, s.target_host,
+                                            s.target_port)
+            # FIXME Stats (differentiate Resolved streams also..)
+            if not s.circ_id:
+                plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
+            else:
+                self.streams[s.strm_id].detached_from.append(s.circ_id)
+
+            
+            if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
+                self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+            self.streams[s.strm_id].pending_circ = None
+            self.attach_stream_any(self.streams[s.strm_id],
+                                   self.streams[s.strm_id].detached_from)
+        elif s.status == "SUCCEEDED":
+            if s.strm_id not in self.streams:
+                plog("NOTICE", "Succeeded stream "+str(s.strm_id)+" not found")
+                return
+            self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
+            self.streams[s.strm_id].circ.pending_streams.remove(self.streams[s.strm_id])
+            self.streams[s.strm_id].pending_circ = None
+        elif s.status == "FAILED" or s.status == "CLOSED":
+            # FIXME stats
+            if s.strm_id not in self.streams:
+                plog("NOTICE", "Failed stream "+str(s.strm_id)+" not found")
+                return
+
+            if not s.circ_id:
+                plog("WARN", "Stream "+str(s.strm_id)+" failed from no circuit!")
+
+            # We get failed and closed for each stream. OK to return 
+            # and let the closed do the cleanup
+            # (FIXME: be careful about double stats)
+            if s.status == "FAILED":
+                # Avoid busted circuits that will not resolve or carry
+                # traffic. FIXME: Failed count before doing this?
+                if s.circ_id in self.circuits: del self.circuits[s.circ_id]
+                else: plog("WARN","Failed stream on unknown circ "+str(s.circ_id))
+                return
+
+            if self.streams[s.strm_id].pending_circ:
+                self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+            del self.streams[s.strm_id]
+        elif s.status == "REMAP":
+            if s.strm_id not in self.streams:
+                plog("WARN", "Remap id "+str(s.strm_id)+" not found")
+            else:
+                if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
+                    s.target_host = "255.255.255.255"
+                    plog("NOTICE", "Non-IP remap for "+str(s.strm_id)+" to "
+                                   + s.target_host)
+                self.streams[s.strm_id].host = s.target_host
+                self.streams[s.strm_id].port = s.target_port
+
+
+    def ns_event(self, n):
+        self.read_routers(n.nslist)
+        plog("DEBUG", "Read " + str(len(n.nslist))+" NS => " 
+             + str(len(self.sorted_r)) + " routers")
+        self.selmgr.update_routers(self.sorted_r)
+    
+    def new_desc_event(self, d):
+        for i in d.idlist: # Is this too slow?
+            self.read_routers(self.c.get_network_status("id/"+i))
+        plog("DEBUG", "Read " + str(len(d.idlist))+" Desc => " 
+             + str(len(self.sorted_r)) + " routers")
+        self.selmgr.update_routers(self.sorted_r)
+
 ########################## Unit tests ##########################
 
 
@@ -438,7 +752,6 @@
                   lambda r: "")
     do_unit(PercentileRestriction(10, 20, sorted_rlist), sorted_rlist,
                   lambda r: "")
-    exit(0)
     do_unit(OSRestriction([r"[lL]inux", r"BSD", "Darwin"], []), sorted_rlist,
                   lambda r: r.os)
     do_unit(OSRestriction([], ["Windows", "Solaris"]), sorted_rlist,
@@ -471,11 +784,11 @@
         print "Checking: " + r.nickname
         for rs in rl:
             if not rs.r_is_ok(r):
-                raise PathException()
+                raise PathError()
             if not "Exit" in r.flags:
                 print "No exit in flags of "+r.nickname
         rlist.append(r)
-    for r in sorted_r:
+    for r in sorted_rlist:
         if "Exit" in r.flags and not r in rlist:
             print r.nickname+" is an exit not in rl!"
                 

Modified: torflow/trunk/metatroller.py
===================================================================
--- torflow/trunk/metatroller.py	2007-03-03 05:11:38 UTC (rev 9716)
+++ torflow/trunk/metatroller.py	2007-03-03 06:56:05 UTC (rev 9717)
@@ -19,14 +19,6 @@
 from TorCtl.TorUtil import *
 from TorCtl.PathSupport import *
 
-routers = {} # indexed by idhex
-name_to_key = {}
-
-sorted_r = []
-
-circuits = {} # map from ID # to circuit object
-streams = {} # map from stream id to circuit
-
 mt_version = "0.1.0-dev"
 
 # TODO: Move these to config file
@@ -37,351 +29,50 @@
 meta_port = 9052
 max_detach = 3
 
-# Thread shared variables. Relying on GIL for weak atomicity (really
-# we only care about corruption.. GIL prevents that, so no locking needed)
-# http://effbot.org/pyfaq/can-t-we-get-rid-of-the-global-interpreter-lock.htm
-# http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm
+selmgr = PathSupport.SelectionManager(
+            resolve_port=0,
+            num_circuits=1,
+            pathlen=3,
+            order_exits=False,
+            percent_fast=100,
+            percent_skip=0,
+            min_bw=1024,
+            use_all_exits=False,
+            uniform=True,
+            use_exit=None,
+            use_guards=False)
 
-last_exit = None
-resolve_port = 0
-num_circuits = 1 # TODO: Use
-new_nym = False
-pathlen=3
 
-
-
 # Technically we could just add member vars as we need them, but this
 # is a bit more clear
-class MetaRouter(TorCtl.Router):
+class StatsRouter(TorCtl.Router):
     def __init__(self, router): # Promotion constructor :)
         self.__dict__ = router.__dict__
         self.failed = 0
         self.suspected = 0
         self.circ_selections = 0
         self.strm_selections = 0
+        self.unhibernated_at = 0
+        self.active_uptime = 0
         self.reason_suspected = {}
         self.reason_failed = {}
 
-class MetaCircuit(TorCtl.Circuit):
-    def __init__(self, circuit): # Promotion
-        self.__dict__ = circuit.__dict__
-        self.built = False
-        self.detached_cnt = 0
-        self.used_cnt = 0
-        self.created_at = datetime.datetime.now()
-        # XXX: build time (export to stats)
-        # XXX: Study timeout's effect on success rates...
-        self.pending_streams = [] # Which stream IDs are pending us
+class StatsHandler(PathSupport.PathBuilder):
+    def __init__(self, c, slmgr):
+        PathBuilder.__init__(self, c, slmgr, StatsRouter)
 
-    
-class Stream:
-    def __init__(self, sid, host, port):
-        self.sid = sid
-        self.detached_from = [] # circ id #'s
-        self.pending_circ = None
-        self.circ = None
-        self.host = host
-        self.port = port
+    def circ_status_event(self, event):
+        PathBuilder.circ_status_event(self, event)
 
-# XXX: Scheduled to be moved to PathSupport
-class SelectionManager:
-    """Helper class to handle configuration updates
-      
-      The methods are NOT threadsafe. They may ONLY be called from
-      EventHandler's thread.
+    def stream_status_event(self, event):
+        PathBuilder.stream_status_event(self, event)
 
-      However, the variables defined in init can be modified anywhere."""
-    def __init__(self, order_exits, percent_fast, percent_skip, min_bw,
-                 use_all_exits, uniform, use_exit, use_guards):
-        "The member variables defined here may be modified by other threads"
-        self.__ordered_exit_gen = None # except this one ;)
-        self.order_exits = order_exits
-        self.percent_fast = percent_fast
-        self.percent_skip = percent_skip
-        self.min_bw = min_bw
-        self.use_all_exits = use_all_exits
-        self.uniform = uniform
-        self.exit_name = use_exit
-        self.use_guards = use_guards
-
-    def reconfigure(self):
-        """
-        Member variables from this funciton should not be modified by other
-        threads.
-        """
-        if self.use_all_exits:
-            self.path_rstr = PathRestrictionList([])
-        else:
-            self.path_rstr = PathRestrictionList(
-                     [Subnet16Restriction(), UniqueRestriction()])
-            
-        self.entry_rstr = NodeRestrictionList(
-            [
-             PercentileRestriction(self.percent_skip, self.percent_fast,
-                sorted_r),
-             ConserveExitsRestriction(),
-             FlagsRestriction(["Guard", "Valid", "Running"], [])
-             ], sorted_r)
-        self.mid_rstr = NodeRestrictionList(
-            [PercentileRestriction(self.percent_skip, self.percent_fast,
-                sorted_r),
-             ConserveExitsRestriction(),
-             FlagsRestriction(["Valid", "Running"], [])], sorted_r)
-
-        if self.use_all_exits:
-            self.exit_rstr = NodeRestrictionList(
-                [FlagsRestriction(["Valid", "Running"], ["BadExit"])], sorted_r)
-        else:
-            self.exit_rstr = NodeRestrictionList(
-                [PercentileRestriction(self.percent_skip, self.percent_fast,
-                   sorted_r),
-                 FlagsRestriction(["Valid", "Running"], ["BadExit"])],
-                 sorted_r)
-
-        if self.exit_name:
-            if self.exit_name[0] == '$':
-                self.exit_rstr.add_restriction(IdHexRestriction(self.exit_name))
-            else:
-                self.exit_rstr.add_restriction(NickRestriction(self.exit_name))
-
-        # This is kind of hokey..
-        if self.order_exits:
-            if self.__ordered_exit_gen:
-                exitgen = self.__ordered_exit_gen
-            else:
-                exitgen = self.__ordered_exit_gen = \
-                    OrderedExitGenerator(self.exit_rstr, 80)
-        else:
-            exitgen = UniformGenerator(self.exit_rstr)
-
-        if self.uniform:
-            self.path_selector = PathSelector(
-                 UniformGenerator(self.entry_rstr),
-                 UniformGenerator(self.mid_rstr),
-                 exitgen, self.path_rstr)
-        else:
-            raise NotImplemented()
-
-    def set_target(self, ip, port):
-        self.exit_rstr.del_restriction(ExitPolicyRestriction)
-        self.exit_rstr.add_restriction(ExitPolicyRestriction(ip, port))
-        if self.__ordered_exit_gen: self.__ordered_exit_gen.set_port(port)
-
-    def update_routers(self, new_rlist):
-        self.entry_rstr.update_routers(new_rlist)
-        self.mid_rstr.update_routers(new_rlist)
-        self.exit_rstr.update_routers(new_rlist)
-
-
-selmgr = SelectionManager(
-         order_exits=False,
-         percent_fast=100,
-         percent_skip=0,
-         min_bw=1024,
-         use_all_exits=False,
-         uniform=True,
-         use_exit=None,
-         use_guards=False)
-
-# TODO: Make passive mode so people can get aggregate node reliability 
-# stats for normal usage without us attaching streams
-
-# XXX: Scheduled to be moved to PathSupport and refactored
-class SnakeHandler(TorCtl.EventHandler):
-    def __init__(self, c, slnmgr):
-        TorCtl.EventHandler.__init__(self)
-        self.c = c
-        nslist = c.get_network_status()
-        self.read_routers(nslist)
-        self.selmgr = slnmgr
-        self.selmgr.reconfigure()
-        plog("INFO", "Read "+str(len(sorted_r))+"/"+str(len(nslist))+" routers")
-
-    def read_routers(self, nslist):
-        new_routers = map(MetaRouter, self.c.read_routers(nslist))
-        for r in new_routers:
-            if r.idhex in routers:
-                if routers[r.idhex].nickname != r.nickname:
-                    plog("NOTICE", "Router "+r.idhex+" changed names from "
-                         +routers[r.idhex].nickname+" to "+r.nickname)
-                sorted_r.remove(routers[r.idhex])
-            routers[r.idhex] = r
-            name_to_key[r.nickname] = r.idhex
-        sorted_r.extend(new_routers)
-        sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
-
-    def attach_stream_any(self, stream, badcircs):
-        # Newnym, and warn if not built plus pending
-        unattached_streams = [stream]
-        global new_nym
-        if new_nym:
-            new_nym = False
-            plog("DEBUG", "Obeying new nym")
-            for key in circuits.keys():
-                if len(circuits[key].pending_streams):
-                    plog("WARN", "New nym called, destroying circuit "+str(key)
-                         +" with "+str(len(circuits[key].pending_streams))
-                         +" pending streams")
-                    unattached_streams.extend(circuits[key].pending_streams)
-                # FIXME: Consider actually closing circ if no streams.
-                del circuits[key]
-            
-        for circ in circuits.itervalues():
-            if circ.built and circ.cid not in badcircs:
-                if circ.exit.will_exit_to(stream.host, stream.port):
-                    try:
-                        self.c.attach_stream(stream.sid, circ.cid)
-                        stream.pending_circ = circ # Only one possible here
-                        circ.pending_streams.append(stream)
-                    except TorCtl.ErrorReply, e:
-                        # No need to retry here. We should get the failed
-                        # event for either the circ or stream next
-                        plog("NOTICE", "Error attaching stream: "+str(e.args))
-                        return
-                    break
-        else:
-            circ = None
-            while circ == None:
-                self.selmgr.set_target(stream.host, stream.port)
-                try:
-                    circ = MetaCircuit(self.c.build_circuit(pathlen,
-                                    self.selmgr.path_selector))
-                except TorCtl.ErrorReply, e:
-                    # FIXME: How come some routers are non-existant? Shouldn't
-                    # we have gotten an NS event to notify us they
-                    # disappeared?
-                    plog("NOTICE", "Error building circ: "+str(e.args))
-            for u in unattached_streams:
-                plog("DEBUG",
-                     "Attaching "+str(u.sid)+" pending build of "+str(circ.cid))
-                u.pending_circ = circ
-            circ.pending_streams.extend(unattached_streams)
-            circuits[circ.cid] = circ
-        global last_exit # Last attempted exit
-        last_exit = circ.exit
-
-    def heartbeat_event(self, event):
-        global selmgr
-        if id(self.selmgr) != id(selmgr):
-            self.selmgr = selmgr
-            self.selmgr.reconfigure()
-
-    def circ_status_event(self, c):
-        output = [c.event_name, str(c.circ_id), c.status]
-        if c.path: output.append(",".join(c.path))
-        if c.reason: output.append("REASON=" + c.reason)
-        if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
-        plog("DEBUG", " ".join(output))
-        # Circuits we don't control get built by Tor
-        if c.circ_id not in circuits:
-            plog("DEBUG", "Ignoring circ " + str(c.circ_id))
-            return
-        if c.status == "FAILED" or c.status == "CLOSED":
-            circ = circuits[c.circ_id]
-            del circuits[c.circ_id]
-            for stream in circ.pending_streams:
-                plog("DEBUG", "Finding new circ for " + str(stream.sid))
-                self.attach_stream_any(stream, stream.detached_from)
-        elif c.status == "BUILT":
-            circuits[c.circ_id].built = True
-            for stream in circuits[c.circ_id].pending_streams:
-                self.c.attach_stream(stream.sid, c.circ_id)
-                circuits[c.circ_id].used_cnt += 1
-
-    def stream_status_event(self, s):
-        output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id),
-                  s.target_host, str(s.target_port)]
-        if s.reason: output.append("REASON=" + s.reason)
-        if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
-        plog("DEBUG", " ".join(output))
-        if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
-            s.target_host = "255.255.255.255" # ignore DNS for exit policy check
-        if s.status == "NEW" or s.status == "NEWRESOLVE":
-            if s.status == "NEWRESOLVE" and not s.target_port:
-                s.target_port = resolve_port
-            streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port)
-
-            self.attach_stream_any(streams[s.strm_id],
-                                   streams[s.strm_id].detached_from)
-        elif s.status == "DETACHED":
-            if s.strm_id not in streams:
-                plog("WARN", "Detached stream "+str(s.strm_id)+" not found")
-                streams[s.strm_id] = Stream(s.strm_id, s.target_host,
-                                            s.target_port)
-            # FIXME Stats (differentiate Resolved streams also..)
-            if not s.circ_id:
-                plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
-            else:
-                streams[s.strm_id].detached_from.append(s.circ_id)
-
-            
-            if streams[s.strm_id] in streams[s.strm_id].pending_circ.pending_streams:
-                streams[s.strm_id].pending_circ.pending_streams.remove(streams[s.strm_id])
-            streams[s.strm_id].pending_circ = None
-            self.attach_stream_any(streams[s.strm_id],
-                                   streams[s.strm_id].detached_from)
-        elif s.status == "SUCCEEDED":
-            if s.strm_id not in streams:
-                plog("NOTICE", "Succeeded stream "+str(s.strm_id)+" not found")
-                return
-            streams[s.strm_id].circ = streams[s.strm_id].pending_circ
-            streams[s.strm_id].circ.pending_streams.remove(streams[s.strm_id])
-            streams[s.strm_id].pending_circ = None
-            streams[s.strm_id].circ.used_cnt += 1
-        elif s.status == "FAILED" or s.status == "CLOSED":
-            # FIXME stats
-            if s.strm_id not in streams:
-                plog("NOTICE", "Failed stream "+str(s.strm_id)+" not found")
-                return
-
-            if not s.circ_id:
-                plog("WARN", "Stream "+str(s.strm_id)+" failed from no circuit!")
-
-            # We get failed and closed for each stream. OK to return 
-            # and let the closed do the cleanup
-            # (FIXME: be careful about double stats)
-            if s.status == "FAILED":
-                # Avoid busted circuits that will not resolve or carry
-                # traffic. FIXME: Failed count before doing this?
-                if s.circ_id in circuits: del circuits[s.circ_id]
-                else: plog("WARN","Failed stream on unknown circ "+str(s.circ_id))
-                return
-
-            if streams[s.strm_id].pending_circ:
-                streams[s.strm_id].pending_circ.pending_streams.remove(streams[s.strm_id])
-            del streams[s.strm_id]
-        elif s.status == "REMAP":
-            if s.strm_id not in streams:
-                plog("WARN", "Remap id "+str(s.strm_id)+" not found")
-            else:
-                if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
-                    s.target_host = "255.255.255.255"
-                    plog("NOTICE", "Non-IP remap for "+str(s.strm_id)+" to "
-                                   + s.target_host)
-                streams[s.strm_id].host = s.target_host
-                streams[s.strm_id].port = s.target_port
-
-
-    def ns_event(self, n):
-        self.read_routers(n.nslist)
-        plog("DEBUG", "Read " + str(len(n.nslist))+" NS => " 
-             + str(len(sorted_r)) + " routers")
-        self.selmgr.update_routers(sorted_r)
-    
-    def new_desc_event(self, d):
-        for i in d.idlist: # Is this too slow?
-            self.read_routers(self.c.get_network_status("id/"+i))
-        plog("DEBUG", "Read " + str(len(d.idlist))+" Desc => " 
-             + str(len(sorted_r)) + " routers")
-        self.selmgr.update_routers(sorted_r)
-
 def clear_dns_cache(c):
     lines = c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
     for _,msg,more in lines:
         plog("DEBUG", msg)
  
-def commandloop(s, c):
-    global selmgr
+def commandloop(s, c, h):
     s.write("220 Welcome to the Tor Metatroller "+mt_version+"! Try HELP for Info\r\n\r\n")
     while 1:
         buf = s.readline()
@@ -394,12 +85,14 @@
             continue
         (command, arg) = m.groups()
         if command == "GETLASTEXIT":
-            le = last_exit # Consistency (avoids need for lock w/ GIL)
+            # Copy last_exit to a local so both uses below see the same router; no lock is needed under the GIL:
+            le = h.selmgr.last_exit
             s.write("250 LASTEXIT=$"+le.idhex.upper()+" ("+le.nickname+") OK\r\n")
         elif command == "NEWEXIT" or command == "NEWNYM":
-            global new_nym
-            new_nym = True
             clear_dns_cache(c)
+            newmgr = copy.copy(h.selupdate)
+            newmgr.new_nym = True
+            h.update_selmgr(newmgr)
             plog("DEBUG", "Got new nym")
             s.write("250 NEWNYM OK\r\n")
         elif command == "GETDNSEXIT":
@@ -410,9 +103,9 @@
             try:
                 if arg:
                     order_exits = int(arg)
-                    newmgr = copy.copy(selmgr)
+                    newmgr = copy.copy(h.selupdate)
                     newmgr.order_exits = order_exits
-                    selmgr = newmgr
+                    h.update_selmgr(newmgr)
                 s.write("250 ORDEREXITS="+str(order_exits)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -420,23 +113,29 @@
             try:
                 if arg:
                     use_all_exits = int(arg)
-                    newmgr = copy.copy(selmgr)
+                    newmgr = copy.copy(h.selupdate)
                     newmgr.use_all_exits = use_all_exits
-                    selmgr = newmgr
+                    h.update_selmgr(newmgr)
                 s.write("250 USEALLEXITS="+str(use_all_exits)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
         elif command == "PRECIRCUITS":
-            global num_circuits
             try:
-                if arg: num_circuits = int(arg)
+                if arg:
+                    num_circuits = int(arg)
+                    newmgr = copy.copy(h.selupdate)
+                    newmgr.num_circuits = num_circuits
+                    h.update_selmgr(newmgr)
                 s.write("250 PRECIRCUITS="+str(num_circuits)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
         elif command == "RESOLVEPORT":
-            global resolve_port
             try:
-                if arg: resolve_port = int(arg)
+                if arg:
+                    resolve_port = int(arg)
+                    newmgr = copy.copy(h.selupdate)
+                    newmgr.resolve_port = resolve_port
+                    h.update_selmgr(newmgr)
                 s.write("250 RESOLVEPORT="+str(resolve_port)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -444,9 +143,9 @@
             try:
                 if arg:
                     percent_fast = int(arg)
-                    newmgr = copy.copy(selmgr)
+                    newmgr = copy.copy(h.selupdate)
                     newmgr.percent_fast = percent_fast
-                    selmgr = newmgr
+                    h.update_selmgr(newmgr)
                 s.write("250 PERCENTFAST="+str(percent_fast)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -454,9 +153,9 @@
             try:
                 if arg:
                     percent_skip = int(arg)
-                    newmgr = copy.copy(selmgr)
+                    newmgr = copy.copy(h.selupdate)
                     newmgr.percent_skip = percent_skip
-                    selmgr = newmgr
+                    h.update_selmgr(newmgr)
                 s.write("250 PERCENTSKIP="+str(percent_skip)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -464,18 +163,21 @@
             try:
                 if arg:
                     min_bw = int(arg)
-                    newmgr = copy.copy(selmgr)
+                    newmgr = copy.copy(h.selupdate)
                     newmgr.min_bw = min_bw
-                    selmgr = newmgr
+                    h.update_selmgr(newmgr)
                 s.write("250 BWCUTOFF="+str(min_bw)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
         elif command == "UNIFORM":
             s.write("250 OK\r\n")
         elif command == "PATHLEN":
-            global pathlen
             try:
-                if arg: pathlen = int(arg)
+                if arg:
+                    pathlen = int(arg)
+                    newmgr = copy.copy(h.selupdate)
+                    newmgr.pathlen = pathlen
+                    h.update_selmgr(newmgr)
                 s.write("250 PATHLEN="+str(pathlen)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -498,14 +200,14 @@
     c.set_option("__LeaveStreamsUnattached", "0")
     s.close()
 
-def listenloop(c):
+def listenloop(c, h):
     """Loop that handles metatroller commands"""
     srv = ListenSocket(meta_host, meta_port)
     atexit.register(cleanup, *(c, srv))
     while 1:
         client = srv.accept()
         if not client: break
-        thr = threading.Thread(None, lambda: commandloop(BufSock(client), c))
+        thr = threading.Thread(None, lambda: commandloop(BufSock(client), c, h))
         thr.run()
     srv.close()
 
@@ -515,16 +217,17 @@
     c = PathSupport.Connection(s)
     c.debug(file("control.log", "w"))
     c.authenticate()
-    c.set_event_handler(SnakeHandler(c, selmgr))
+    h = StatsHandler(c, selmgr)
+    c.set_event_handler(h)
     c.set_events([TorCtl.EVENT_TYPE.STREAM,
                   TorCtl.EVENT_TYPE.NS,
                   TorCtl.EVENT_TYPE.CIRC,
                   TorCtl.EVENT_TYPE.NEWDESC], True)
     c.set_option("__LeaveStreamsUnattached", "1")
-    return c
+    return (c,h)
 
 def main(argv):
-    listenloop(startup())
+    listenloop(*startup())
 
 if __name__ == '__main__':
     main(sys.argv)