[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r18678: {torctl} Refactor SelectionManager to be more self-contained, more ea (torctl/trunk/python/TorCtl)



Author: mikeperry
Date: 2009-02-23 06:00:22 -0500 (Mon, 23 Feb 2009)
New Revision: 18678

Modified:
   torctl/trunk/python/TorCtl/PathSupport.py
   torctl/trunk/python/TorCtl/TorCtl.py
Log:

Refactor SelectionManager to be more self-contained, more easily
drop-in replaceable, and more intelligent about handling 
conflicting restrictions.



Modified: torctl/trunk/python/TorCtl/PathSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/PathSupport.py	2009-02-23 10:46:31 UTC (rev 18677)
+++ torctl/trunk/python/TorCtl/PathSupport.py	2009-02-23 11:00:22 UTC (rev 18678)
@@ -7,15 +7,20 @@
 number of interfaces that make path construction easier.
 
 The inheritance diagram for event handling is as follows:
-TorCtl.EventHandler <- PathBuilder <- CircuitHandler <- StreamHandler.
+TorCtl.EventHandler <- TorCtl.ConsensusTracker <- PathBuilder 
+  <- CircuitHandler <- StreamHandler.
 
 Basically, EventHandler is what gets all the control port events
 packaged in nice clean classes (see help(TorCtl) for information on
 those). 
 
-PathBuilder inherits from EventHandler and is what builds all circuits
-based on the requirements specified in the SelectionManager instance
-passed to its constructor. It also handles attaching streams to
+ConsensusTracker tracks the NEWCONSENSUS and NEWDESC events to maintain
+a view of the network that is consistent with the Tor client's current
+consensus.
+
+PathBuilder inherits from ConsensusTracker and is what builds all
+circuits based on the requirements specified in the SelectionManager
+instance passed to its constructor. It also handles attaching streams to
 circuits. It only handles one building one circuit at a time.
 
 CircuitHandler optionally inherits from PathBuilder, and overrides its
@@ -28,12 +33,15 @@
 
 The SelectionManager is essentially a configuration wrapper around the
 most elegant portions of TorFlow: NodeGenerators, NodeRestrictions, and
-PathRestrictions. In the SelectionManager, a NodeGenerator is used to
-choose the nodes probabilistically according to some distribution while
-obeying the NodeRestrictions. These generators (one per hop) are handed
-off to the PathSelector, which uses the generators to build a complete
-path that satisfies the PathRestriction requirements.
+PathRestrictions. It extends from a BaseSelectionManager that provides
+a basic example of using these mechanisms for custom implementations.
 
+In the SelectionManager, a NodeGenerator is used to choose the nodes
+probabilistically according to some distribution while obeying the
+NodeRestrictions. These generators (one per hop) are handed off to the
+PathSelector, which uses the generators to build a complete path that
+satisfies the PathRestriction requirements.
+
 Have a look at the class hierarchy directly below to get a feel for how
 the restrictions fit together, and what options are available.
 
@@ -191,11 +199,11 @@
   """Extended Connection class that provides a method for building circuits"""
   def __init__(self, sock):
     TorCtl.Connection.__init__(self,sock)
-  def build_circuit(self, pathlen, path_sel):
+  def build_circuit(self, path):
     "Tell Tor to build a circuit chosen by the PathSelector 'path_sel'"
     circ = Circuit()
-    circ.path = path_sel.build_path(pathlen)
-    circ.exit = circ.path[pathlen-1]
+    circ.path = path
+    circ.exit = circ.path[len(path)-1]
     circ.circ_id = self.extend_circuit(0, circ.id_path())
     return circ
 
@@ -778,7 +786,7 @@
     self.mid_gen.rebuild(sorted_r)
     self.exit_gen.rebuild(sorted_r)
 
-  def build_path(self, pathlen):
+  def select_path(self, pathlen):
     """Creates a path of 'pathlen' hops, and returns it as a list of
        Router instances"""
     self.entry_gen.rewind()
@@ -816,7 +824,67 @@
       r.refcount += 1
     return path
 
-class SelectionManager:
+# TODO: Implement example manager.
+class BaseSelectionManager:
+   """
+   The BaseSelectionManager is a minimalistic node selection manager.
+
+   It is meant to be used with a PathSelector that consists of an
+   entry NodeGenerator, a middle NodeGenerator, and an exit NodeGenerator.
+
+   However, none of these are absolutely necessary. It is possible
+   to completely avoid them if you wish by hacking whatever selection
+   mechanisms you want straight into this interface and then passing
+   an instance to a PathBuilder implementation.
+   """
+   def __init__(self):
+     self.bad_restrictions = False
+     self.consensus = None
+
+   def reconfigure(self, consensus=None):
+     """ 
+     This method is called whenever a significant configuration change
+     occurs. Currently, this only happens via PathBuilder.__init__ and
+     PathBuilder.schedule_selmgr().
+     
+     This method should NOT throw any exceptions.
+     """
+     pass
+
+   def new_consensus(self, consensus):
+     """ 
+     This method is called whenever a consensus change occurs.
+     
+     This method should NOT throw any exceptions.
+     """
+     pass
+
+   def set_exit(self, exit_name):
+     """
+     This method provides notification that a fixed exit is desired.
+
+     This method should NOT throw any exceptions.
+     """
+     pass
+
+   def set_target(self, host, port):
+     """
+     This method provides notification that a new target endpoint is
+     desired.
+
+     May throw a RestrictionError if target is impossible to reach.
+     """
+     pass
+
+   def select_path(self):
+     """
+     Returns a new path in the form of a list() of Router instances.
+
+     May throw a RestrictionError.
+     """
+     pass
+
+class SelectionManager(BaseSelectionManager):
   """Helper class to handle configuration updates
     
     The methods are NOT threadsafe. They may ONLY be called from
@@ -828,6 +896,7 @@
   def __init__(self, pathlen, order_exits,
          percent_fast, percent_skip, min_bw, use_all_exits,
          uniform, use_exit, use_guards,geoip_config=None,restrict_guards=False):
+    BaseSelectionManager.__init__(self)
     self.__ordered_exit_gen = None 
     self.pathlen = pathlen
     self.order_exits = order_exits
@@ -836,15 +905,33 @@
     self.min_bw = min_bw
     self.use_all_exits = use_all_exits
     self.uniform = uniform
-    self.exit_name = use_exit
+    self.exit_id = use_exit
     self.use_guards = use_guards
     self.geoip_config = geoip_config
     self.restrict_guards_only = restrict_guards
+    self.bad_restrictions = False
+    self.consensus = None
 
-  def reconfigure(self, sorted_r):
+  def reconfigure(self, consensus=None):
+    try:
+      self._reconfigure(consensus)
+      self.bad_restrictions = False
+    except NoNodesRemain:
+      plog("WARN", "No nodes remain in selection manager")
+      self.bad_restrictions = True
+    return self.bad_restrictions
+
+  def _reconfigure(self, consensus=None):
     """This function is called after a configuration change, 
      to rebuild the RestrictionLists."""
-    plog("DEBUG", "Reconfigure")
+    if consensus: 
+      plog("DEBUG", "Reconfigure with consensus")
+      self.consensus = consensus
+    else:
+      plog("DEBUG", "Reconfigure without consensus")
+
+    sorted_r = self.consensus.sorted_r
+
     if self.use_all_exits:
       self.path_rstr = PathRestrictionList([UniqueRestriction()])
     else:
@@ -873,12 +960,9 @@
 
     )
 
-    if self.exit_name:
-      plog("DEBUG", "Applying Setexit: "+self.exit_name)
-      if self.exit_name[0] == '$':
-        self.exit_rstr = NodeRestrictionList([IdHexRestriction(self.exit_name)])
-      else:
-        self.exit_rstr = NodeRestrictionList([NickRestriction(self.exit_name)])
+    if self.exit_id:
+      plog("DEBUG", "Applying Setexit: "+self.exit_id)
+      self.exit_rstr = NodeRestrictionList([IdHexRestriction(self.exit_id)])
     elif self.use_all_exits:
       self.exit_rstr = NodeRestrictionList(
         [FlagsRestriction(["Valid", "Running","Fast"], ["BadExit"])])
@@ -957,11 +1041,49 @@
          exitgen, self.path_rstr)
       return
 
+  def set_exit(self, exit_name):
+    # sets an exit, if bad, sets bad_exit
+    exit_id = None
+    self.exit_rstr.del_restriction(IdHexRestriction)
+    if exit_name:
+      if exit_name[0] == '$':
+        exit_id = exit_name
+      elif exit_name in self.consensus.name_to_key:
+        exit_id = self.consensus.name_to_key[exit_id]
+    self.exit_id = exit_id
+    if not exit_id or exit_id[1:] not in self.consensus.routers \
+            or self.consensus.routers[exit_id[1:]].down:
+      plog("NOTICE", "Requested downed exit "+str(exit_id))
+      self.bad_restrictions = True
+    else:
+      self.exit_rstr.add_restriction(IdHexRestriction(exit_id))
+      try:
+        self.path_selector.exit_gen.rebuild()
+        self.bad_restrictions = False
+      except RestrictionError, e:
+        plog("WARN", "Restriction error "+str(e)+" after set_exit")
+        self.bad_restrictions = True
+    return self.bad_restrictions
+
+  def new_consensus(self, consensus):
+    self.consensus = consensus
+    if self.exit_id:
+      self.set_exit(self.exit_id)
+    if self.bad_restrictions:
+      return
+    try:
+      self.path_selector.rebuild_gens(self.consensus.sorted_r)
+    except NoNodesRemain:
+      traceback.print_exc()
+      plog("WARN", "No viable nodes in consensus. Punting + performing reconfigure..")
+      self.reconfigure()
+
   def set_target(self, ip, port):
+    # sets an exit policy, if bad, rasies exception..
     "Called to update the ExitPolicyRestrictions with a new ip and port"
-    if self.exit_name[0:3] == "!up":
-      plog("WARN", "Requested target with non-up node: "+self.exit_name[4:])
-      raise NoNodesRemain()
+    if self.bad_restrictions:
+      plog("WARN", "Requested target with bad restrictions")
+      raise RestrictionError()
     self.exit_rstr.del_restriction(ExitPolicyRestriction)
     self.exit_rstr.add_restriction(ExitPolicyRestriction(ip, port))
     if self.__ordered_exit_gen: self.__ordered_exit_gen.set_port(port)
@@ -983,31 +1105,11 @@
     # Need to rebuild exit generator
     self.path_selector.exit_gen.rebuild()
 
-  def rebuild_gens(self, sorted_r, router_map, nick_map):
-    exit_cleared = False
-    exit_id = None
-    if self.exit_name:
-      if self.exit_name[0] == '$':
-        exit_id = self.exit_name[1:]
-      elif self.exit_name in nick_map:
-        exit_id = nick_map[exit_id][1:]
-      if not exit_id or exit_id not in router_map or router_map[exit_id].down:
-        plog("NOTICE", "Clearing restriction on downed exit "+self.exit_name)
-        exit_cleared = True
-        self.exit_name = None
-        self.reconfigure(sorted_r)
-    try:
-      self.path_selector.rebuild_gens(sorted_r)
-    except NoNodesRemain:
-      traceback.print_exc()
-      plog("WARN", "Punting + Performing reconfigure..")
-      self.reconfigure(sorted_r)
-    if exit_cleared: 
-      # FIXME: This is a pretty ugly hack.. Basically we are forcing
-      # another NoNodesRemain via set_target if the user doesn't request a 
-      # new exit by then...
-      self.exit_name = "!up-"+str(exit_id)
-      self.exit_rstr.add_restriction(NickRestriction(self.exit_name))
+  def select_path(self):
+    if self.bad_restrictions:
+      plog("WARN", "Requested target with bad restrictions")
+      raise RestrictionError()
+    return self.path_selector.select_path(self.pathlen)
 
 class Circuit:
   "Class to describe a circuit"
@@ -1075,12 +1177,12 @@
     self.circuits = {}
     self.streams = {}
     self.selmgr = selmgr
-    self.selmgr.reconfigure(self.sorted_r)
+    self.selmgr.reconfigure(self.current_consensus())
     self.imm_jobs = Queue.Queue()
     self.low_prio_jobs = Queue.Queue()
     self.run_all_jobs = False
     self.do_reconfigure = False
-    plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(self.consensus))+" routers")
+    plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(self.ns_map))+" routers")
 
   def schedule_immediate(self, job):
     """
@@ -1116,7 +1218,7 @@
       imm_job(self)
     
     if self.do_reconfigure:
-      self.selmgr.reconfigure(self.sorted_r)
+      self.selmgr.reconfigure(self.current_consensus())
       self.do_reconfigure = False
     
     if self.run_all_jobs:
@@ -1144,7 +1246,7 @@
   def build_path(self):
     """ Get a path from the SelectionManager's PathSelector, can be used 
         e.g. for generating paths without actually creating any circuits """
-    return self.selmgr.path_selector.build_path(self.selmgr.pathlen)
+    return self.selmgr.select_path()
 
   def close_all_circuits(self):
     """ Close all open circuits """
@@ -1201,28 +1303,22 @@
       circ = None
       try:
         self.selmgr.set_target(stream.host, stream.port)
-      except NoNodesRemain:
+        circ = self.c.build_circuit(self.selmgr.select_path())
+      except RestrictionError, e:
+        # XXX: Dress this up a bit
         self.last_exit = None
         # Kill this stream
-        plog("NOTICE", "Closing stream "+str(stream.strm_id))
+        plog("NOTICE", "Closing impossible stream "+str(stream.strm_id)+" ("+str(e)+")")
         self.c.close_stream(stream.strm_id, "4") # END_STREAM_REASON_EXITPOLICY
         return
-      while circ == None:
-        try:
-          circ = self.c.build_circuit(
-                  self.selmgr.pathlen,
-                  self.selmgr.path_selector)
-        except TorCtl.ErrorReply, e:
-          # FIXME: How come some routers are non-existant? Shouldn't
-          # we have gotten an NS event to notify us they
-          # disappeared?
-          plog("WARN", "Error building circ: "+str(e.args))
-          self.last_exit = None
-          # Kill this stream
-          plog("NOTICE", "Closing stream "+str(stream.strm_id))
-          # END_STREAM_REASON_DESTROY
-          self.c.close_stream(stream.strm_id, "5") 
-          return
+      except TorCtl.ErrorReply, e:
+        plog("WARN", "Error building circ: "+str(e.args))
+        self.last_exit = None
+        # Kill this stream
+        plog("NOTICE", "Closing stream "+str(stream.strm_id))
+        # END_STREAM_REASON_DESTROY
+        self.c.close_stream(stream.strm_id, "5") 
+        return
       for u in unattached_streams:
         plog("DEBUG",
            "Attaching "+str(u.strm_id)+" pending build of "+str(circ.circ_id))
@@ -1253,7 +1349,7 @@
           self.sorted_r.remove(self.routers[r.idhex])
           del self.routers[r.idhex]
           for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
-          self.selmgr.path_selector.rebuild_gens(self.sorted_r)
+          self.selmgr.new_consensus(self.current_consensus())
       del self.circuits[c.circ_id]
       for stream in circ.pending_streams:
         plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
@@ -1387,11 +1483,11 @@
 
   def new_consensus_event(self, n):
     TorCtl.ConsensusTracker.new_consensus_event(self, n)
-    self.selmgr.rebuild_gens(self.sorted_r, self.routers, self.name_to_key)
+    self.selmgr.new_consensus(self.current_consensus())
 
   def new_desc_event(self, d):
     if TorCtl.ConsensusTracker.new_desc_event(self, d):
-      self.selmgr.rebuild_gens(self.sorted_r, self.routers, self.name_to_key)
+      self.selmgr.new_consensus(self.current_consensus())
 
   def bandwidth_event(self, b): pass # For heartbeat only..
 
@@ -1433,13 +1529,15 @@
     while circ == None:
       try:
         self.selmgr.set_target(host, port)
-        circ = self.c.build_circuit(self.selmgr.pathlen, 
-           self.selmgr.path_selector)
+        circ = self.c.build_circuit(self.selmgr.select_path())
         self.circuits[circ.circ_id] = circ
         return circ
+      except RestrictionError, e:
+        # XXX: Dress this up a bit
+        traceback.print_exc()
+        plog("ERROR", "Impossible restrictions: "+str(e))
       except TorCtl.ErrorReply, e:
-        # FIXME: How come some routers are non-existant? Shouldn't
-        # we have gotten an NS event to notify us they disappeared?
+        traceback.print_exc()
         plog("WARN", "Error building circuit: " + str(e.args))
 
   def circ_status_event(self, c):

Modified: torctl/trunk/python/TorCtl/TorCtl.py
===================================================================
--- torctl/trunk/python/TorCtl/TorCtl.py	2009-02-23 10:46:31 UTC (rev 18677)
+++ torctl/trunk/python/TorCtl/TorCtl.py	2009-02-23 11:00:22 UTC (rev 18678)
@@ -1068,16 +1068,31 @@
     """
     raise NotImplemented()
 
+class Consensus:
+  """
+  A Consensus is a pickleable container for the members of
+  ConsensusTracker. This should only be used as a temporary 
+  reference, and will change after a NEWDESC or NEWCONSENUS event.
+  If you want a copy of a consensus that is independent
+  of subsequent updates, use copy.deepcopy()
+  """
+
+  def __init__(self, ns_map, sorted_r, router_map, nick_map):
+    self.ns_map = ns_map
+    self.sorted_r = sorted_r
+    self.routers = router_map
+    self.name_to_key = nick_map
+
 class ConsensusTracker(EventHandler):
   """
   A ConsensusTracker is an EventHandler that tracks the current
-  consensus of Tor in self.consensus, self.routers and self.sorted_r
+  consensus of Tor in self.ns_map, self.routers and self.sorted_r
   """
   def __init__(self, c, RouterClass=Router):
     EventHandler.__init__(self)
     c.set_event_handler(self)
     self.c = c
-    self.consensus = {}
+    self.ns_map = {}
     self.routers = {}
     self.sorted_r = []
     self.name_to_key = {}
@@ -1123,17 +1138,17 @@
     for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
 
   def _update_consensus(self, nslist):
-    self.consensus = {}
+    self.ns_map = {}
     for n in nslist:
-      self.consensus[n.idhex] = n
+      self.ns_map[n.idhex] = n
    
   def update_consensus(self):
     self._update_consensus(self.c.get_network_status())
-    self._read_routers(self.consensus.values())
+    self._read_routers(self.ns_map.values())
 
   def new_consensus_event(self, n):
     self._update_consensus(n.nslist)
-    self._read_routers(self.consensus.values())
+    self._read_routers(self.ns_map.values())
     plog("DEBUG", "Read " + str(len(n.nslist))+" NC => " 
        + str(len(self.sorted_r)) + " routers")
  
@@ -1149,8 +1164,8 @@
         plog("WARN", "Multiple descs for "+i+" after NEWDESC")
       r = r[0]
       ns = ns[0]
-      if r and r.idhex in self.consensus:
-        if ns.orhash != self.consensus[r.idhex].orhash:
+      if r and r.idhex in self.ns_map:
+        if ns.orhash != self.ns_map[r.idhex].orhash:
           plog("WARN", "Getinfo and consensus disagree for "+r.idhex)
           continue
         update = True
@@ -1166,6 +1181,9 @@
        + str(len(self.sorted_r)) + " routers. Update: "+str(update))
     return update
 
+  def current_consensus(self):
+    return Consensus(self.ns_map, self.sorted_r, self.routers, 
+                     self.name_to_key)
 
 class DebugEventHandler(EventHandler):
   """Trivial debug event handler: reassembles all parsed events to stdout."""