[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r18678: {torctl} Refactor SelectionManager to be more self-contained, more ea (torctl/trunk/python/TorCtl)
Author: mikeperry
Date: 2009-02-23 06:00:22 -0500 (Mon, 23 Feb 2009)
New Revision: 18678
Modified:
torctl/trunk/python/TorCtl/PathSupport.py
torctl/trunk/python/TorCtl/TorCtl.py
Log:
Refactor SelectionManager to be more self-contained, more easily
drop-in replaceable, and more intelligent about handling
conflicting restrictions.
Modified: torctl/trunk/python/TorCtl/PathSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/PathSupport.py 2009-02-23 10:46:31 UTC (rev 18677)
+++ torctl/trunk/python/TorCtl/PathSupport.py 2009-02-23 11:00:22 UTC (rev 18678)
@@ -7,15 +7,20 @@
number of interfaces that make path construction easier.
The inheritance diagram for event handling is as follows:
-TorCtl.EventHandler <- PathBuilder <- CircuitHandler <- StreamHandler.
+TorCtl.EventHandler <- TorCtl.ConsensusTracker <- PathBuilder
+ <- CircuitHandler <- StreamHandler.
Basically, EventHandler is what gets all the control port events
packaged in nice clean classes (see help(TorCtl) for information on
those).
-PathBuilder inherits from EventHandler and is what builds all circuits
-based on the requirements specified in the SelectionManager instance
-passed to its constructor. It also handles attaching streams to
+ConsensusTracker tracks the NEWCONSENSUS and NEWDESC events to maintain
+a view of the network that is consistent with the Tor client's current
+consensus.
+
+PathBuilder inherits from ConsensusTracker and is what builds all
+circuits based on the requirements specified in the SelectionManager
+instance passed to its constructor. It also handles attaching streams to
circuits. It only handles one building one circuit at a time.
CircuitHandler optionally inherits from PathBuilder, and overrides its
@@ -28,12 +33,15 @@
The SelectionManager is essentially a configuration wrapper around the
most elegant portions of TorFlow: NodeGenerators, NodeRestrictions, and
-PathRestrictions. In the SelectionManager, a NodeGenerator is used to
-choose the nodes probabilistically according to some distribution while
-obeying the NodeRestrictions. These generators (one per hop) are handed
-off to the PathSelector, which uses the generators to build a complete
-path that satisfies the PathRestriction requirements.
+PathRestrictions. It extends from a BaseSelectionManager that provides
+a basic example of using these mechanisms for custom implementations.
+In the SelectionManager, a NodeGenerator is used to choose the nodes
+probabilistically according to some distribution while obeying the
+NodeRestrictions. These generators (one per hop) are handed off to the
+PathSelector, which uses the generators to build a complete path that
+satisfies the PathRestriction requirements.
+
Have a look at the class hierarchy directly below to get a feel for how
the restrictions fit together, and what options are available.
@@ -191,11 +199,11 @@
"""Extended Connection class that provides a method for building circuits"""
def __init__(self, sock):
TorCtl.Connection.__init__(self,sock)
- def build_circuit(self, pathlen, path_sel):
+ def build_circuit(self, path):
"Tell Tor to build a circuit chosen by the PathSelector 'path_sel'"
circ = Circuit()
- circ.path = path_sel.build_path(pathlen)
- circ.exit = circ.path[pathlen-1]
+ circ.path = path
+ circ.exit = circ.path[len(path)-1]
circ.circ_id = self.extend_circuit(0, circ.id_path())
return circ
@@ -778,7 +786,7 @@
self.mid_gen.rebuild(sorted_r)
self.exit_gen.rebuild(sorted_r)
- def build_path(self, pathlen):
+ def select_path(self, pathlen):
"""Creates a path of 'pathlen' hops, and returns it as a list of
Router instances"""
self.entry_gen.rewind()
@@ -816,7 +824,67 @@
r.refcount += 1
return path
-class SelectionManager:
+# TODO: Implement example manager.
+class BaseSelectionManager:
+ """
+ The BaseSelectionManager is a minimalistic node selection manager.
+
+ It is meant to be used with a PathSelector that consists of an
+ entry NodeGenerator, a middle NodeGenerator, and an exit NodeGenerator.
+
+ However, none of these are absolutely necessary. It is possible
+ to completely avoid them if you wish by hacking whatever selection
+ mechanisms you want straight into this interface and then passing
+ an instance to a PathBuilder implementation.
+ """
+ def __init__(self):
+ self.bad_restrictions = False
+ self.consensus = None
+
+ def reconfigure(self, consensus=None):
+ """
+ This method is called whenever a significant configuration change
+ occurs. Currently, this only happens via PathBuilder.__init__ and
+ PathBuilder.schedule_selmgr().
+
+ This method should NOT throw any exceptions.
+ """
+ pass
+
+ def new_consensus(self, consensus):
+ """
+ This method is called whenever a consensus change occurs.
+
+ This method should NOT throw any exceptions.
+ """
+ pass
+
+ def set_exit(self, exit_name):
+ """
+ This method provides notification that a fixed exit is desired.
+
+ This method should NOT throw any exceptions.
+ """
+ pass
+
+ def set_target(self, host, port):
+ """
+ This method provides notification that a new target endpoint is
+ desired.
+
+ May throw a RestrictionError if target is impossible to reach.
+ """
+ pass
+
+ def select_path(self):
+ """
+ Returns a new path in the form of a list() of Router instances.
+
+ May throw a RestrictionError.
+ """
+ pass
+
+class SelectionManager(BaseSelectionManager):
"""Helper class to handle configuration updates
The methods are NOT threadsafe. They may ONLY be called from
@@ -828,6 +896,7 @@
def __init__(self, pathlen, order_exits,
percent_fast, percent_skip, min_bw, use_all_exits,
uniform, use_exit, use_guards,geoip_config=None,restrict_guards=False):
+ BaseSelectionManager.__init__(self)
self.__ordered_exit_gen = None
self.pathlen = pathlen
self.order_exits = order_exits
@@ -836,15 +905,33 @@
self.min_bw = min_bw
self.use_all_exits = use_all_exits
self.uniform = uniform
- self.exit_name = use_exit
+ self.exit_id = use_exit
self.use_guards = use_guards
self.geoip_config = geoip_config
self.restrict_guards_only = restrict_guards
+ self.bad_restrictions = False
+ self.consensus = None
- def reconfigure(self, sorted_r):
+ def reconfigure(self, consensus=None):
+ try:
+ self._reconfigure(consensus)
+ self.bad_restrictions = False
+ except NoNodesRemain:
+ plog("WARN", "No nodes remain in selection manager")
+ self.bad_restrictions = True
+ return self.bad_restrictions
+
+ def _reconfigure(self, consensus=None):
"""This function is called after a configuration change,
to rebuild the RestrictionLists."""
- plog("DEBUG", "Reconfigure")
+ if consensus:
+ plog("DEBUG", "Reconfigure with consensus")
+ self.consensus = consensus
+ else:
+ plog("DEBUG", "Reconfigure without consensus")
+
+ sorted_r = self.consensus.sorted_r
+
if self.use_all_exits:
self.path_rstr = PathRestrictionList([UniqueRestriction()])
else:
@@ -873,12 +960,9 @@
)
- if self.exit_name:
- plog("DEBUG", "Applying Setexit: "+self.exit_name)
- if self.exit_name[0] == '$':
- self.exit_rstr = NodeRestrictionList([IdHexRestriction(self.exit_name)])
- else:
- self.exit_rstr = NodeRestrictionList([NickRestriction(self.exit_name)])
+ if self.exit_id:
+ plog("DEBUG", "Applying Setexit: "+self.exit_id)
+ self.exit_rstr = NodeRestrictionList([IdHexRestriction(self.exit_id)])
elif self.use_all_exits:
self.exit_rstr = NodeRestrictionList(
[FlagsRestriction(["Valid", "Running","Fast"], ["BadExit"])])
@@ -957,11 +1041,49 @@
exitgen, self.path_rstr)
return
+ def set_exit(self, exit_name):
+ # sets an exit, if bad, sets bad_exit
+ exit_id = None
+ self.exit_rstr.del_restriction(IdHexRestriction)
+ if exit_name:
+ if exit_name[0] == '$':
+ exit_id = exit_name
+ elif exit_name in self.consensus.name_to_key:
+ exit_id = self.consensus.name_to_key[exit_id]
+ self.exit_id = exit_id
+ if not exit_id or exit_id[1:] not in self.consensus.routers \
+ or self.consensus.routers[exit_id[1:]].down:
+ plog("NOTICE", "Requested downed exit "+str(exit_id))
+ self.bad_restrictions = True
+ else:
+ self.exit_rstr.add_restriction(IdHexRestriction(exit_id))
+ try:
+ self.path_selector.exit_gen.rebuild()
+ self.bad_restrictions = False
+ except RestrictionError, e:
+ plog("WARN", "Restriction error "+str(e)+" after set_exit")
+ self.bad_restrictions = True
+ return self.bad_restrictions
+
+ def new_consensus(self, consensus):
+ self.consensus = consensus
+ if self.exit_id:
+ self.set_exit(self.exit_id)
+ if self.bad_restrictions:
+ return
+ try:
+ self.path_selector.rebuild_gens(self.consensus.sorted_r)
+ except NoNodesRemain:
+ traceback.print_exc()
+ plog("WARN", "No viable nodes in consensus. Punting + performing reconfigure..")
+ self.reconfigure()
+
def set_target(self, ip, port):
+ # sets an exit policy, if bad, rasies exception..
"Called to update the ExitPolicyRestrictions with a new ip and port"
- if self.exit_name[0:3] == "!up":
- plog("WARN", "Requested target with non-up node: "+self.exit_name[4:])
- raise NoNodesRemain()
+ if self.bad_restrictions:
+ plog("WARN", "Requested target with bad restrictions")
+ raise RestrictionError()
self.exit_rstr.del_restriction(ExitPolicyRestriction)
self.exit_rstr.add_restriction(ExitPolicyRestriction(ip, port))
if self.__ordered_exit_gen: self.__ordered_exit_gen.set_port(port)
@@ -983,31 +1105,11 @@
# Need to rebuild exit generator
self.path_selector.exit_gen.rebuild()
- def rebuild_gens(self, sorted_r, router_map, nick_map):
- exit_cleared = False
- exit_id = None
- if self.exit_name:
- if self.exit_name[0] == '$':
- exit_id = self.exit_name[1:]
- elif self.exit_name in nick_map:
- exit_id = nick_map[exit_id][1:]
- if not exit_id or exit_id not in router_map or router_map[exit_id].down:
- plog("NOTICE", "Clearing restriction on downed exit "+self.exit_name)
- exit_cleared = True
- self.exit_name = None
- self.reconfigure(sorted_r)
- try:
- self.path_selector.rebuild_gens(sorted_r)
- except NoNodesRemain:
- traceback.print_exc()
- plog("WARN", "Punting + Performing reconfigure..")
- self.reconfigure(sorted_r)
- if exit_cleared:
- # FIXME: This is a pretty ugly hack.. Basically we are forcing
- # another NoNodesRemain via set_target if the user doesn't request a
- # new exit by then...
- self.exit_name = "!up-"+str(exit_id)
- self.exit_rstr.add_restriction(NickRestriction(self.exit_name))
+ def select_path(self):
+ if self.bad_restrictions:
+ plog("WARN", "Requested target with bad restrictions")
+ raise RestrictionError()
+ return self.path_selector.select_path(self.pathlen)
class Circuit:
"Class to describe a circuit"
@@ -1075,12 +1177,12 @@
self.circuits = {}
self.streams = {}
self.selmgr = selmgr
- self.selmgr.reconfigure(self.sorted_r)
+ self.selmgr.reconfigure(self.current_consensus())
self.imm_jobs = Queue.Queue()
self.low_prio_jobs = Queue.Queue()
self.run_all_jobs = False
self.do_reconfigure = False
- plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(self.consensus))+" routers")
+ plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(self.ns_map))+" routers")
def schedule_immediate(self, job):
"""
@@ -1116,7 +1218,7 @@
imm_job(self)
if self.do_reconfigure:
- self.selmgr.reconfigure(self.sorted_r)
+ self.selmgr.reconfigure(self.current_consensus())
self.do_reconfigure = False
if self.run_all_jobs:
@@ -1144,7 +1246,7 @@
def build_path(self):
""" Get a path from the SelectionManager's PathSelector, can be used
e.g. for generating paths without actually creating any circuits """
- return self.selmgr.path_selector.build_path(self.selmgr.pathlen)
+ return self.selmgr.select_path()
def close_all_circuits(self):
""" Close all open circuits """
@@ -1201,28 +1303,22 @@
circ = None
try:
self.selmgr.set_target(stream.host, stream.port)
- except NoNodesRemain:
+ circ = self.c.build_circuit(self.selmgr.select_path())
+ except RestrictionError, e:
+ # XXX: Dress this up a bit
self.last_exit = None
# Kill this stream
- plog("NOTICE", "Closing stream "+str(stream.strm_id))
+ plog("NOTICE", "Closing impossible stream "+str(stream.strm_id)+" ("+str(e)+")")
self.c.close_stream(stream.strm_id, "4") # END_STREAM_REASON_EXITPOLICY
return
- while circ == None:
- try:
- circ = self.c.build_circuit(
- self.selmgr.pathlen,
- self.selmgr.path_selector)
- except TorCtl.ErrorReply, e:
- # FIXME: How come some routers are non-existant? Shouldn't
- # we have gotten an NS event to notify us they
- # disappeared?
- plog("WARN", "Error building circ: "+str(e.args))
- self.last_exit = None
- # Kill this stream
- plog("NOTICE", "Closing stream "+str(stream.strm_id))
- # END_STREAM_REASON_DESTROY
- self.c.close_stream(stream.strm_id, "5")
- return
+ except TorCtl.ErrorReply, e:
+ plog("WARN", "Error building circ: "+str(e.args))
+ self.last_exit = None
+ # Kill this stream
+ plog("NOTICE", "Closing stream "+str(stream.strm_id))
+ # END_STREAM_REASON_DESTROY
+ self.c.close_stream(stream.strm_id, "5")
+ return
for u in unattached_streams:
plog("DEBUG",
"Attaching "+str(u.strm_id)+" pending build of "+str(circ.circ_id))
@@ -1253,7 +1349,7 @@
self.sorted_r.remove(self.routers[r.idhex])
del self.routers[r.idhex]
for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
- self.selmgr.path_selector.rebuild_gens(self.sorted_r)
+ self.selmgr.new_consensus(self.current_consensus())
del self.circuits[c.circ_id]
for stream in circ.pending_streams:
plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
@@ -1387,11 +1483,11 @@
def new_consensus_event(self, n):
TorCtl.ConsensusTracker.new_consensus_event(self, n)
- self.selmgr.rebuild_gens(self.sorted_r, self.routers, self.name_to_key)
+ self.selmgr.new_consensus(self.current_consensus())
def new_desc_event(self, d):
if TorCtl.ConsensusTracker.new_desc_event(self, d):
- self.selmgr.rebuild_gens(self.sorted_r, self.routers, self.name_to_key)
+ self.selmgr.new_consensus(self.current_consensus())
def bandwidth_event(self, b): pass # For heartbeat only..
@@ -1433,13 +1529,15 @@
while circ == None:
try:
self.selmgr.set_target(host, port)
- circ = self.c.build_circuit(self.selmgr.pathlen,
- self.selmgr.path_selector)
+ circ = self.c.build_circuit(self.selmgr.select_path())
self.circuits[circ.circ_id] = circ
return circ
+ except RestrictionError, e:
+ # XXX: Dress this up a bit
+ traceback.print_exc()
+ plog("ERROR", "Impossible restrictions: "+str(e))
except TorCtl.ErrorReply, e:
- # FIXME: How come some routers are non-existant? Shouldn't
- # we have gotten an NS event to notify us they disappeared?
+ traceback.print_exc()
plog("WARN", "Error building circuit: " + str(e.args))
def circ_status_event(self, c):
Modified: torctl/trunk/python/TorCtl/TorCtl.py
===================================================================
--- torctl/trunk/python/TorCtl/TorCtl.py 2009-02-23 10:46:31 UTC (rev 18677)
+++ torctl/trunk/python/TorCtl/TorCtl.py 2009-02-23 11:00:22 UTC (rev 18678)
@@ -1068,16 +1068,31 @@
"""
raise NotImplemented()
+class Consensus:
+ """
+ A Consensus is a pickleable container for the members of
+ ConsensusTracker. This should only be used as a temporary
+ reference, and will change after a NEWDESC or NEWCONSENUS event.
+ If you want a copy of a consensus that is independent
+ of subsequent updates, use copy.deepcopy()
+ """
+
+ def __init__(self, ns_map, sorted_r, router_map, nick_map):
+ self.ns_map = ns_map
+ self.sorted_r = sorted_r
+ self.routers = router_map
+ self.name_to_key = nick_map
+
class ConsensusTracker(EventHandler):
"""
A ConsensusTracker is an EventHandler that tracks the current
- consensus of Tor in self.consensus, self.routers and self.sorted_r
+ consensus of Tor in self.ns_map, self.routers and self.sorted_r
"""
def __init__(self, c, RouterClass=Router):
EventHandler.__init__(self)
c.set_event_handler(self)
self.c = c
- self.consensus = {}
+ self.ns_map = {}
self.routers = {}
self.sorted_r = []
self.name_to_key = {}
@@ -1123,17 +1138,17 @@
for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
def _update_consensus(self, nslist):
- self.consensus = {}
+ self.ns_map = {}
for n in nslist:
- self.consensus[n.idhex] = n
+ self.ns_map[n.idhex] = n
def update_consensus(self):
self._update_consensus(self.c.get_network_status())
- self._read_routers(self.consensus.values())
+ self._read_routers(self.ns_map.values())
def new_consensus_event(self, n):
self._update_consensus(n.nslist)
- self._read_routers(self.consensus.values())
+ self._read_routers(self.ns_map.values())
plog("DEBUG", "Read " + str(len(n.nslist))+" NC => "
+ str(len(self.sorted_r)) + " routers")
@@ -1149,8 +1164,8 @@
plog("WARN", "Multiple descs for "+i+" after NEWDESC")
r = r[0]
ns = ns[0]
- if r and r.idhex in self.consensus:
- if ns.orhash != self.consensus[r.idhex].orhash:
+ if r and r.idhex in self.ns_map:
+ if ns.orhash != self.ns_map[r.idhex].orhash:
plog("WARN", "Getinfo and consensus disagree for "+r.idhex)
continue
update = True
@@ -1166,6 +1181,9 @@
+ str(len(self.sorted_r)) + " routers. Update: "+str(update))
return update
+ def current_consensus(self):
+ return Consensus(self.ns_map, self.sorted_r, self.routers,
+ self.name_to_key)
class DebugEventHandler(EventHandler):
"""Trivial debug event handler: reassembles all parsed events to stdout."""