[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r9807: Indent level 4 -> 2 spaces. (in torflow/trunk: . TorCtl)
Author: mikeperry
Date: 2007-03-12 21:36:12 -0400 (Mon, 12 Mar 2007)
New Revision: 9807
Modified:
torflow/trunk/TorCtl/PathSupport.py
torflow/trunk/TorCtl/TorCtl.py
torflow/trunk/TorCtl/TorUtil.py
torflow/trunk/metatroller.py
torflow/trunk/nodemon.py
Log:
Indent level 4 -> 2 spaces.
Modified: torflow/trunk/TorCtl/PathSupport.py
===================================================================
--- torflow/trunk/TorCtl/PathSupport.py 2007-03-12 21:07:19 UTC (rev 9806)
+++ torflow/trunk/TorCtl/PathSupport.py 2007-03-13 01:36:12 UTC (rev 9807)
@@ -22,835 +22,835 @@
#################### Path Support Interfaces #####################
class NodeRestriction:
- "Interface for node restriction policies"
- def r_is_ok(self, r): return True
- def reset(self, router_list): pass
+ "Interface for node restriction policies"
+ def r_is_ok(self, r): return True
+ def reset(self, router_list): pass
class NodeRestrictionList:
- def __init__(self, restrictions, sorted_rlist):
- self.restrictions = restrictions
- self.update_routers(sorted_rlist)
+ def __init__(self, restrictions, sorted_rlist):
+ self.restrictions = restrictions
+ self.update_routers(sorted_rlist)
- def __check_r(self, r):
- for rst in self.restrictions:
- if not rst.r_is_ok(r): return False
- self.restricted_bw += r.bw
- return True
+ def __check_r(self, r):
+ for rst in self.restrictions:
+ if not rst.r_is_ok(r): return False
+ self.restricted_bw += r.bw
+ return True
- def update_routers(self, sorted_rlist):
- self._sorted_r = sorted_rlist
- self.restricted_bw = 0
- for rs in self.restrictions: rs.reset(sorted_rlist)
- self.restricted_r = filter(self.__check_r, self._sorted_r)
+ def update_routers(self, sorted_rlist):
+ self._sorted_r = sorted_rlist
+ self.restricted_bw = 0
+ for rs in self.restrictions: rs.reset(sorted_rlist)
+ self.restricted_r = filter(self.__check_r, self._sorted_r)
- def add_restriction(self, restr):
- self.restrictions.append(restr)
- for r in self.restricted_r:
- if not restr.r_is_ok(r):
- self.restricted_r.remove(r)
- self.restricted_bw -= r.bw
-
- # XXX: This does not collapse meta restrictions..
- def del_restriction(self, RestrictionClass):
- self.restrictions = filter(
- lambda r: not isinstance(r, RestrictionClass),
- self.restrictions)
- self.update_routers(self._sorted_r)
+ def add_restriction(self, restr):
+ self.restrictions.append(restr)
+ for r in self.restricted_r:
+ if not restr.r_is_ok(r):
+ self.restricted_r.remove(r)
+ self.restricted_bw -= r.bw
+
+ # XXX: This does not collapse meta restrictions..
+ def del_restriction(self, RestrictionClass):
+ self.restrictions = filter(
+ lambda r: not isinstance(r, RestrictionClass),
+ self.restrictions)
+ self.update_routers(self._sorted_r)
class PathRestriction:
- "Interface for path restriction policies"
- def r_is_ok(self, path, r): return True
- def entry_is_ok(self, path, r): return self.r_is_ok(path, r)
- def middle_is_ok(self, path, r): return self.r_is_ok(path, r)
- def exit_is_ok(self, path, r): return self.r_is_ok(path, r)
+ "Interface for path restriction policies"
+ def r_is_ok(self, path, r): return True
+ def entry_is_ok(self, path, r): return self.r_is_ok(path, r)
+ def middle_is_ok(self, path, r): return self.r_is_ok(path, r)
+ def exit_is_ok(self, path, r): return self.r_is_ok(path, r)
class PathRestrictionList:
- def __init__(self, restrictions):
- self.restrictions = restrictions
-
- def entry_is_ok(self, path, r):
- for rs in self.restrictions:
- if not rs.entry_is_ok(path, r):
- return False
- return True
+ def __init__(self, restrictions):
+ self.restrictions = restrictions
+
+ def entry_is_ok(self, path, r):
+ for rs in self.restrictions:
+ if not rs.entry_is_ok(path, r):
+ return False
+ return True
- def middle_is_ok(self, path, r):
- for rs in self.restrictions:
- if not rs.middle_is_ok(path, r):
- return False
- return True
+ def middle_is_ok(self, path, r):
+ for rs in self.restrictions:
+ if not rs.middle_is_ok(path, r):
+ return False
+ return True
- def exit_is_ok(self, path, r):
- for rs in self.restrictions:
- if not rs.exit_is_ok(path, r):
- return False
- return True
+ def exit_is_ok(self, path, r):
+ for rs in self.restrictions:
+ if not rs.exit_is_ok(path, r):
+ return False
+ return True
- def add_restriction(self, rstr):
- self.restrictions.append(rstr)
+ def add_restriction(self, rstr):
+ self.restrictions.append(rstr)
- def del_restriction(self, RestrictionClass):
- self.restrictions = filter(
- lambda r: not isinstance(r, RestrictionClass),
- self.restrictions)
+ def del_restriction(self, RestrictionClass):
+ self.restrictions = filter(
+ lambda r: not isinstance(r, RestrictionClass),
+ self.restrictions)
class NodeGenerator:
- "Interface for node generation"
- def __init__(self, restriction_list):
- self.restriction_list = restriction_list
- self.rewind()
+ "Interface for node generation"
+ def __init__(self, restriction_list):
+ self.restriction_list = restriction_list
+ self.rewind()
- def rewind(self):
- # TODO: Hrmm... Is there any way to handle termination other
- # than to make a list of routers that we pop from? Random generators
- # will not terminate if no node matches the selector without this..
- # Not so much an issue now, but in a few years, the Tor network
- # will be large enough that having all these list copies will
- # be obscene... Possible candidate for a python list comprehension
- self.routers = copy.copy(self.restriction_list.restricted_r)
- self.bw = self.restriction_list.restricted_bw
+ def rewind(self):
+ # TODO: Hrmm... Is there any way to handle termination other
+ # than to make a list of routers that we pop from? Random generators
+ # will not terminate if no node matches the selector without this..
+ # Not so much an issue now, but in a few years, the Tor network
+ # will be large enough that having all these list copies will
+ # be obscene... Possible candidate for a python list comprehension
+ self.routers = copy.copy(self.restriction_list.restricted_r)
+ self.bw = self.restriction_list.restricted_bw
- def mark_chosen(self, r):
- self.routers.remove(r)
- self.bw -= r.bw
+ def mark_chosen(self, r):
+ self.routers.remove(r)
+ self.bw -= r.bw
- def all_chosen(self):
- if not self.routers and self.bw or not self.bw and self.routers:
- plog("WARN", str(len(self.routers))+" routers left but bw="
- +str(self.bw))
- return not self.routers
+ def all_chosen(self):
+ if not self.routers and self.bw or not self.bw and self.routers:
+ plog("WARN", str(len(self.routers))+" routers left but bw="
+ +str(self.bw))
+ return not self.routers
- def next_r(self): raise NotImplemented()
+ def next_r(self): raise NotImplemented()
class Connection(TorCtl.Connection):
- def build_circuit(self, pathlen, path_sel):
- circ = TorCtl.Circuit()
- if pathlen == 1:
- circ.exit = path_sel.exit_chooser(circ.path)
- circ.path = [circ.exit]
- circ.cid = self.extend_circuit(0, circ.id_path())
- else:
- circ.path.append(path_sel.entry_chooser(circ.path))
- for i in xrange(1, pathlen-1):
- circ.path.append(path_sel.middle_chooser(circ.path))
- circ.exit = path_sel.exit_chooser(circ.path)
- circ.path.append(circ.exit)
- circ.cid = self.extend_circuit(0, circ.id_path())
- circ.created_at = datetime.datetime.now()
- return circ
+ def build_circuit(self, pathlen, path_sel):
+ circ = TorCtl.Circuit()
+ if pathlen == 1:
+ circ.exit = path_sel.exit_chooser(circ.path)
+ circ.path = [circ.exit]
+ circ.cid = self.extend_circuit(0, circ.id_path())
+ else:
+ circ.path.append(path_sel.entry_chooser(circ.path))
+ for i in xrange(1, pathlen-1):
+ circ.path.append(path_sel.middle_chooser(circ.path))
+ circ.exit = path_sel.exit_chooser(circ.path)
+ circ.path.append(circ.exit)
+ circ.cid = self.extend_circuit(0, circ.id_path())
+ circ.created_at = datetime.datetime.now()
+ return circ
######################## Node Restrictions ########################
# TODO: We still need more path support implementations
# - BwWeightedGenerator
# - NodeRestrictions:
-# - Uptime/LongLivedPorts (Does/should hibernation count?)
-# - Published/Updated
-# - GeoIP
-# - NodeCountry
+# - Uptime/LongLivedPorts (Does/should hibernation count?)
+# - Published/Updated
+# - GeoIP
+# - NodeCountry
# - PathRestrictions
-# - Family
-# - GeoIP:
-# - OceanPhobicRestrictor (avoids Pacific Ocean or two atlantic crossings)
-# or ContinentRestrictor (avoids doing more than N continent crossings)
-# - Mathematical/empirical study of predecessor expectation
-# - If middle node on the same continent as exit, exit learns nothing
-# - else, exit has a bias on the continent of origin of user
-# - Language and browser accept string determine this anyway
-# - EchelonPhobicRestrictor
-# - Does not cross international boundaries for client->Entry or
-# Exit->destination hops
+# - Family
+# - GeoIP:
+# - OceanPhobicRestrictor (avoids Pacific Ocean or two atlantic crossings)
+# or ContinentRestrictor (avoids doing more than N continent crossings)
+# - Mathematical/empirical study of predecessor expectation
+# - If middle node on the same continent as exit, exit learns nothing
+# - else, exit has a bias on the continent of origin of user
+# - Language and browser accept string determine this anyway
+# - EchelonPhobicRestrictor
+# - Does not cross international boundaries for client->Entry or
+# Exit->destination hops
class PercentileRestriction(NodeRestriction):
- """If used, this restriction MUST be FIRST in the RestrictionList."""
- def __init__(self, pct_skip, pct_fast, r_list):
- self.pct_fast = pct_fast
- self.pct_skip = pct_skip
- self.sorted_r = r_list
- self.position = 0
+ """If used, this restriction MUST be FIRST in the RestrictionList."""
+ def __init__(self, pct_skip, pct_fast, r_list):
+ self.pct_fast = pct_fast
+ self.pct_skip = pct_skip
+ self.sorted_r = r_list
+ self.position = 0
- def reset(self, r_list):
- self.sorted_r = r_list
- self.position = 0
-
- def r_is_ok(self, r):
- ret = True
- if self.position == len(self.sorted_r):
- self.position = 0
- plog("WARN", "Resetting PctFastRestriction")
- if self.position != self.sorted_r.index(r): # XXX expensive?
- plog("WARN", "Router"+r.nickname+" at mismatched index: "
- +self.position+" vs "+self.sorted_r.index(r))
-
- if self.position < len(self.sorted_r)*self.pct_skip/100:
- ret = False
- elif self.position > len(self.sorted_r)*self.pct_fast/100:
- ret = False
-
- self.position += 1
- return ret
-
+ def reset(self, r_list):
+ self.sorted_r = r_list
+ self.position = 0
+
+ def r_is_ok(self, r):
+ ret = True
+ if self.position == len(self.sorted_r):
+ self.position = 0
+ plog("WARN", "Resetting PctFastRestriction")
+ if self.position != self.sorted_r.index(r): # XXX expensive?
+ plog("WARN", "Router"+r.nickname+" at mismatched index: "
+ +self.position+" vs "+self.sorted_r.index(r))
+
+ if self.position < len(self.sorted_r)*self.pct_skip/100:
+ ret = False
+ elif self.position > len(self.sorted_r)*self.pct_fast/100:
+ ret = False
+
+ self.position += 1
+ return ret
+
class OSRestriction(NodeRestriction):
- def __init__(self, ok, bad=[]):
- self.ok = ok
- self.bad = bad
+ def __init__(self, ok, bad=[]):
+ self.ok = ok
+ self.bad = bad
- def r_is_ok(self, r):
- for y in self.ok:
- if re.search(y, r.os):
- return True
- for b in self.bad:
- if re.search(b, r.os):
- return False
- if self.ok: return False
- if self.bad: return True
+ def r_is_ok(self, r):
+ for y in self.ok:
+ if re.search(y, r.os):
+ return True
+ for b in self.bad:
+ if re.search(b, r.os):
+ return False
+ if self.ok: return False
+ if self.bad: return True
class ConserveExitsRestriction(NodeRestriction):
- def r_is_ok(self, r): return not "Exit" in r.flags
+ def r_is_ok(self, r): return not "Exit" in r.flags
class FlagsRestriction(NodeRestriction):
- def __init__(self, mandatory, forbidden=[]):
- self.mandatory = mandatory
- self.forbidden = forbidden
+ def __init__(self, mandatory, forbidden=[]):
+ self.mandatory = mandatory
+ self.forbidden = forbidden
- def r_is_ok(self, router):
- for m in self.mandatory:
- if not m in router.flags: return False
- for f in self.forbidden:
- if f in router.flags: return False
- return True
+ def r_is_ok(self, router):
+ for m in self.mandatory:
+ if not m in router.flags: return False
+ for f in self.forbidden:
+ if f in router.flags: return False
+ return True
class NickRestriction(NodeRestriction):
- """Require that the node nickname is as specified"""
- def __init__(self, nickname):
- self.nickname = nickname
+ """Require that the node nickname is as specified"""
+ def __init__(self, nickname):
+ self.nickname = nickname
- def r_is_ok(self, router):
- return router.nickname == self.nickname
+ def r_is_ok(self, router):
+ return router.nickname == self.nickname
class IdHexRestriction(NodeRestriction):
- """Require that the node idhash is as specified"""
- def __init__(self, idhex):
- if idhex[0] == '$':
- self.idhex = idhex[1:].upper()
- else:
- self.idhex = idhex.upper()
+ """Require that the node idhash is as specified"""
+ def __init__(self, idhex):
+ if idhex[0] == '$':
+ self.idhex = idhex[1:].upper()
+ else:
+ self.idhex = idhex.upper()
- def r_is_ok(self, router):
- return router.idhex == self.idhex
-
+ def r_is_ok(self, router):
+ return router.idhex == self.idhex
+
class MinBWRestriction(NodeRestriction):
- def __init__(self, minbw):
- self.min_bw = minbw
+ def __init__(self, minbw):
+ self.min_bw = minbw
- def r_is_ok(self, router): return router.bw >= self.min_bw
-
+ def r_is_ok(self, router): return router.bw >= self.min_bw
+
class VersionIncludeRestriction(NodeRestriction):
- def __init__(self, eq):
- self.eq = map(TorCtl.RouterVersion, eq)
-
- def r_is_ok(self, router):
- for e in self.eq:
- if e == router.version:
- return True
- return False
+ def __init__(self, eq):
+ self.eq = map(TorCtl.RouterVersion, eq)
+
+ def r_is_ok(self, router):
+ for e in self.eq:
+ if e == router.version:
+ return True
+ return False
class VersionExcludeRestriction(NodeRestriction):
- def __init__(self, exclude):
- self.exclude = map(TorCtl.RouterVersion, exclude)
-
- def r_is_ok(self, router):
- for e in self.exclude:
- if e == router.version:
- return False
- return True
+ def __init__(self, exclude):
+ self.exclude = map(TorCtl.RouterVersion, exclude)
+
+ def r_is_ok(self, router):
+ for e in self.exclude:
+ if e == router.version:
+ return False
+ return True
class VersionRangeRestriction(NodeRestriction):
- def __init__(self, gr_eq, less_eq=None):
- self.gr_eq = TorCtl.RouterVersion(gr_eq)
- if less_eq: self.less_eq = TorCtl.RouterVersion(less_eq)
- else: self.less_eq = None
-
+ def __init__(self, gr_eq, less_eq=None):
+ self.gr_eq = TorCtl.RouterVersion(gr_eq)
+ if less_eq: self.less_eq = TorCtl.RouterVersion(less_eq)
+ else: self.less_eq = None
+
- def r_is_ok(self, router):
- return (not self.gr_eq or router.version >= self.gr_eq) and \
- (not self.less_eq or router.version <= self.less_eq)
+ def r_is_ok(self, router):
+ return (not self.gr_eq or router.version >= self.gr_eq) and \
+ (not self.less_eq or router.version <= self.less_eq)
class ExitPolicyRestriction(NodeRestriction):
- def __init__(self, to_ip, to_port):
- self.to_ip = to_ip
- self.to_port = to_port
+ def __init__(self, to_ip, to_port):
+ self.to_ip = to_ip
+ self.to_port = to_port
- def r_is_ok(self, r): return r.will_exit_to(self.to_ip, self.to_port)
+ def r_is_ok(self, r): return r.will_exit_to(self.to_ip, self.to_port)
class MetaNodeRestriction(NodeRestriction):
- # XXX: these should collapse the restriction and return a new
- # instance for re-insertion (or None)
- def next_rstr(self): raise NotImplemented()
- def del_restriction(self, RestrictionClass): raise NotImplemented()
+ # XXX: these should collapse the restriction and return a new
+ # instance for re-insertion (or None)
+ def next_rstr(self): raise NotImplemented()
+ def del_restriction(self, RestrictionClass): raise NotImplemented()
class OrNodeRestriction(MetaNodeRestriction):
- def __init__(self, rs):
- self.rstrs = rs
+ def __init__(self, rs):
+ self.rstrs = rs
- def r_is_ok(self, r):
- for rs in self.rstrs:
- if rs.r_is_ok(r):
- return True
- return False
+ def r_is_ok(self, r):
+ for rs in self.rstrs:
+ if rs.r_is_ok(r):
+ return True
+ return False
class NotNodeRestriction(MetaNodeRestriction):
- def __init__(self, a):
- self.a = a
+ def __init__(self, a):
+ self.a = a
- def r_is_ok(self, r): return not self.a.r_is_ok(r)
+ def r_is_ok(self, r): return not self.a.r_is_ok(r)
class AtLeastNNodeRestriction(MetaNodeRestriction):
- def __init__(self, rstrs, n):
- self.rstrs = rstrs
- self.n = n
+ def __init__(self, rstrs, n):
+ self.rstrs = rstrs
+ self.n = n
- def r_is_ok(self, r):
- cnt = 0
- for rs in self.rstrs:
- if rs.r_is_ok(r):
- cnt += 1
- if cnt < self.n: return False
- else: return True
+ def r_is_ok(self, r):
+ cnt = 0
+ for rs in self.rstrs:
+ if rs.r_is_ok(r):
+ cnt += 1
+ if cnt < self.n: return False
+ else: return True
#################### Path Restrictions #####################
class Subnet16Restriction(PathRestriction):
- def r_is_ok(self, path, router):
- mask16 = struct.unpack(">I", socket.inet_aton("255.255.0.0"))[0]
- ip16 = router.ip & mask16
- for r in path:
- if ip16 == (r.ip & mask16):
- return False
- return True
+ def r_is_ok(self, path, router):
+ mask16 = struct.unpack(">I", socket.inet_aton("255.255.0.0"))[0]
+ ip16 = router.ip & mask16
+ for r in path:
+ if ip16 == (r.ip & mask16):
+ return False
+ return True
class UniqueRestriction(PathRestriction):
- def r_is_ok(self, path, r): return not r in path
+ def r_is_ok(self, path, r): return not r in path
#################### Node Generators ######################
class UniformGenerator(NodeGenerator):
- def next_r(self):
- while not self.all_chosen():
- r = random.choice(self.routers)
- self.mark_chosen(r)
- yield r
+ def next_r(self):
+ while not self.all_chosen():
+ r = random.choice(self.routers)
+ self.mark_chosen(r)
+ yield r
class OrderedExitGenerator(NodeGenerator):
- def __init__(self, restriction_list, to_port):
- self.to_port = to_port
- self.next_exit_by_port = {}
- NodeGenerator.__init__(self, restriction_list)
+ def __init__(self, restriction_list, to_port):
+ self.to_port = to_port
+ self.next_exit_by_port = {}
+ NodeGenerator.__init__(self, restriction_list)
- def rewind(self):
- NodeGenerator.rewind(self)
- if self.to_port not in self.next_exit_by_port or not self.next_exit_by_port[self.to_port]:
- self.next_exit_by_port[self.to_port] = 0
- self.last_idx = len(self.routers)
- else:
- self.last_idx = self.next_exit_by_port[self.to_port]
+ def rewind(self):
+ NodeGenerator.rewind(self)
+ if self.to_port not in self.next_exit_by_port or not self.next_exit_by_port[self.to_port]:
+ self.next_exit_by_port[self.to_port] = 0
+ self.last_idx = len(self.routers)
+ else:
+ self.last_idx = self.next_exit_by_port[self.to_port]
- def set_port(self, port):
- self.to_port = port
- self.rewind()
-
- # Just in case:
- def mark_chosen(self, r): raise NotImplemented()
- def all_chosen(self): raise NotImplemented()
+ def set_port(self, port):
+ self.to_port = port
+ self.rewind()
+
+ # Just in case:
+ def mark_chosen(self, r): raise NotImplemented()
+ def all_chosen(self): raise NotImplemented()
- def next_r(self):
- while True: # A do..while would be real nice here..
- if self.next_exit_by_port[self.to_port] >= len(self.routers):
- self.next_exit_by_port[self.to_port] = 0
- r = self.routers[self.next_exit_by_port[self.to_port]]
- self.next_exit_by_port[self.to_port] += 1
- yield r
- if self.last_idx == self.next_exit_by_port[self.to_port]:
- break
+ def next_r(self):
+ while True: # A do..while would be real nice here..
+ if self.next_exit_by_port[self.to_port] >= len(self.routers):
+ self.next_exit_by_port[self.to_port] = 0
+ r = self.routers[self.next_exit_by_port[self.to_port]]
+ self.next_exit_by_port[self.to_port] += 1
+ yield r
+ if self.last_idx == self.next_exit_by_port[self.to_port]:
+ break
####################### Secret Sauce ###########################
class PathError(Exception):
- pass
+ pass
class NoRouters(PathError):
- pass
+ pass
class PathSelector:
- "Implementation of path selection policies"
- def __init__(self, entry_gen, mid_gen, exit_gen, path_restrict):
- self.entry_gen = entry_gen
- self.mid_gen = mid_gen
- self.exit_gen = exit_gen
- self.path_restrict = path_restrict
+ "Implementation of path selection policies"
+ def __init__(self, entry_gen, mid_gen, exit_gen, path_restrict):
+ self.entry_gen = entry_gen
+ self.mid_gen = mid_gen
+ self.exit_gen = exit_gen
+ self.path_restrict = path_restrict
- def entry_chooser(self, path):
- self.entry_gen.rewind()
- for r in self.entry_gen.next_r():
- if self.path_restrict.entry_is_ok(path, r):
- return r
- raise NoRouters();
-
- def middle_chooser(self, path):
- self.mid_gen.rewind()
- for r in self.mid_gen.next_r():
- if self.path_restrict.middle_is_ok(path, r):
- return r
- raise NoRouters();
+ def entry_chooser(self, path):
+ self.entry_gen.rewind()
+ for r in self.entry_gen.next_r():
+ if self.path_restrict.entry_is_ok(path, r):
+ return r
+ raise NoRouters();
+
+ def middle_chooser(self, path):
+ self.mid_gen.rewind()
+ for r in self.mid_gen.next_r():
+ if self.path_restrict.middle_is_ok(path, r):
+ return r
+ raise NoRouters();
- def exit_chooser(self, path):
- self.exit_gen.rewind()
- for r in self.exit_gen.next_r():
- if self.path_restrict.exit_is_ok(path, r):
- return r
- raise NoRouters();
+ def exit_chooser(self, path):
+ self.exit_gen.rewind()
+ for r in self.exit_gen.next_r():
+ if self.path_restrict.exit_is_ok(path, r):
+ return r
+ raise NoRouters();
class SelectionManager:
- """Helper class to handle configuration updates
-
- The methods are NOT threadsafe. They may ONLY be called from
- EventHandler's thread.
+ """Helper class to handle configuration updates
+
+ The methods are NOT threadsafe. They may ONLY be called from
+ EventHandler's thread.
- To update the selection manager, schedule a config update job
- using PathBuilder.schedule_selmgr() with a worker function
- to modify this object.
- """
- def __init__(self, pathlen, order_exits,
- percent_fast, percent_skip, min_bw, use_all_exits,
- uniform, use_exit, use_guards):
- self.__ordered_exit_gen = None
- self.pathlen = pathlen
- self.order_exits = order_exits
- self.percent_fast = percent_fast
- self.percent_skip = percent_skip
- self.min_bw = min_bw
- self.use_all_exits = use_all_exits
- self.uniform = uniform
- self.exit_name = use_exit
- self.use_guards = use_guards
+ To update the selection manager, schedule a config update job
+ using PathBuilder.schedule_selmgr() with a worker function
+ to modify this object.
+ """
+ def __init__(self, pathlen, order_exits,
+ percent_fast, percent_skip, min_bw, use_all_exits,
+ uniform, use_exit, use_guards):
+ self.__ordered_exit_gen = None
+ self.pathlen = pathlen
+ self.order_exits = order_exits
+ self.percent_fast = percent_fast
+ self.percent_skip = percent_skip
+ self.min_bw = min_bw
+ self.use_all_exits = use_all_exits
+ self.uniform = uniform
+ self.exit_name = use_exit
+ self.use_guards = use_guards
- def reconfigure(self, sorted_r):
- if self.use_all_exits:
- self.path_rstr = PathRestrictionList([])
- else:
- self.path_rstr = PathRestrictionList(
- [Subnet16Restriction(), UniqueRestriction()])
-
- self.entry_rstr = NodeRestrictionList(
- [
- PercentileRestriction(self.percent_skip, self.percent_fast,
- sorted_r),
- ConserveExitsRestriction(),
- FlagsRestriction(["Guard", "Valid", "Running"], [])
- ], sorted_r)
- self.mid_rstr = NodeRestrictionList(
- [PercentileRestriction(self.percent_skip, self.percent_fast,
- sorted_r),
- ConserveExitsRestriction(),
- FlagsRestriction(["Valid", "Running"], [])], sorted_r)
+ def reconfigure(self, sorted_r):
+ if self.use_all_exits:
+ self.path_rstr = PathRestrictionList([])
+ else:
+ self.path_rstr = PathRestrictionList(
+ [Subnet16Restriction(), UniqueRestriction()])
+
+ self.entry_rstr = NodeRestrictionList(
+ [
+ PercentileRestriction(self.percent_skip, self.percent_fast,
+ sorted_r),
+ ConserveExitsRestriction(),
+ FlagsRestriction(["Guard", "Valid", "Running"], [])
+ ], sorted_r)
+ self.mid_rstr = NodeRestrictionList(
+ [PercentileRestriction(self.percent_skip, self.percent_fast,
+ sorted_r),
+ ConserveExitsRestriction(),
+ FlagsRestriction(["Valid", "Running"], [])], sorted_r)
- if self.use_all_exits:
- self.exit_rstr = NodeRestrictionList(
- [FlagsRestriction(["Valid", "Running"], ["BadExit"])], sorted_r)
- else:
- self.exit_rstr = NodeRestrictionList(
- [PercentileRestriction(self.percent_skip, self.percent_fast,
- sorted_r),
- FlagsRestriction(["Valid", "Running"], ["BadExit"])],
- sorted_r)
+ if self.use_all_exits:
+ self.exit_rstr = NodeRestrictionList(
+ [FlagsRestriction(["Valid", "Running"], ["BadExit"])], sorted_r)
+ else:
+ self.exit_rstr = NodeRestrictionList(
+ [PercentileRestriction(self.percent_skip, self.percent_fast,
+ sorted_r),
+ FlagsRestriction(["Valid", "Running"], ["BadExit"])],
+ sorted_r)
- if self.exit_name:
- if self.exit_name[0] == '$':
- self.exit_rstr.add_restriction(IdHexRestriction(self.exit_name))
- else:
- self.exit_rstr.add_restriction(NickRestriction(self.exit_name))
+ if self.exit_name:
+ if self.exit_name[0] == '$':
+ self.exit_rstr.add_restriction(IdHexRestriction(self.exit_name))
+ else:
+ self.exit_rstr.add_restriction(NickRestriction(self.exit_name))
- # This is kind of hokey..
- if self.order_exits:
- if self.__ordered_exit_gen:
- exitgen = self.__ordered_exit_gen
- else:
- exitgen = self.__ordered_exit_gen = \
- OrderedExitGenerator(self.exit_rstr, 80)
- else:
- exitgen = UniformGenerator(self.exit_rstr)
+ # This is kind of hokey..
+ if self.order_exits:
+ if self.__ordered_exit_gen:
+ exitgen = self.__ordered_exit_gen
+ else:
+ exitgen = self.__ordered_exit_gen = \
+ OrderedExitGenerator(self.exit_rstr, 80)
+ else:
+ exitgen = UniformGenerator(self.exit_rstr)
- if self.uniform:
- self.path_selector = PathSelector(
- UniformGenerator(self.entry_rstr),
- UniformGenerator(self.mid_rstr),
- exitgen, self.path_rstr)
- else:
- raise NotImplemented()
+ if self.uniform:
+ self.path_selector = PathSelector(
+ UniformGenerator(self.entry_rstr),
+ UniformGenerator(self.mid_rstr),
+ exitgen, self.path_rstr)
+ else:
+ raise NotImplemented()
- def set_target(self, ip, port):
- self.exit_rstr.del_restriction(ExitPolicyRestriction)
- self.exit_rstr.add_restriction(ExitPolicyRestriction(ip, port))
- if self.__ordered_exit_gen: self.__ordered_exit_gen.set_port(port)
+ def set_target(self, ip, port):
+ self.exit_rstr.del_restriction(ExitPolicyRestriction)
+ self.exit_rstr.add_restriction(ExitPolicyRestriction(ip, port))
+ if self.__ordered_exit_gen: self.__ordered_exit_gen.set_port(port)
- def update_routers(self, new_rlist):
- self.entry_rstr.update_routers(new_rlist)
- self.mid_rstr.update_routers(new_rlist)
- self.exit_rstr.update_routers(new_rlist)
+ def update_routers(self, new_rlist):
+ self.entry_rstr.update_routers(new_rlist)
+ self.mid_rstr.update_routers(new_rlist)
+ self.exit_rstr.update_routers(new_rlist)
class Circuit(TorCtl.Circuit):
- def __init__(self, circuit): # Promotion constructor
- # perf shortcut since we don't care about the 'circuit'
- # instance after this
- self.__dict__ = circuit.__dict__
- self.built = False
- self.detached_cnt = 0
- self.created_at = datetime.datetime.now()
- self.pending_streams = [] # Which stream IDs are pending us
+ def __init__(self, circuit): # Promotion constructor
+ # perf shortcut since we don't care about the 'circuit'
+ # instance after this
+ self.__dict__ = circuit.__dict__
+ self.built = False
+ self.detached_cnt = 0
+ self.created_at = datetime.datetime.now()
+ self.pending_streams = [] # Which stream IDs are pending us
class Stream:
- def __init__(self, sid, host, port, kind):
- self.sid = sid
- self.detached_from = [] # circ id #'s
- self.pending_circ = None
- self.circ = None
- self.host = host
- self.port = port
- self.kind = kind
+ def __init__(self, sid, host, port, kind):
+ self.sid = sid
+ self.detached_from = [] # circ id #'s
+ self.pending_circ = None
+ self.circ = None
+ self.host = host
+ self.port = port
+ self.kind = kind
# TODO: Make passive "PathWatcher" so people can get aggregate
# node reliability stats for normal usage without us attaching streams
class PathBuilder(TorCtl.EventHandler):
- """
- PathBuilder implementation. Handles circuit construction, subject
- to the constraints of the SelectionManager selmgr.
-
- Do not access this object from other threads. Instead, use the
- schedule_* functions to schedule work to be done in the thread
- of the EventHandler.
- """
- def __init__(self, c, selmgr, RouterClass):
- TorCtl.EventHandler.__init__(self)
- self.c = c
- nslist = c.get_network_status()
- self.last_exit = None
- self.new_nym = False
- self.resolve_port = 0
- self.num_circuits = 1
- self.RouterClass = RouterClass
- self.sorted_r = []
- self.routers = {}
- self.circuits = {}
- self.streams = {}
- self.read_routers(nslist)
- self.selmgr = selmgr
- self.selmgr.reconfigure(self.sorted_r)
- self.imm_jobs = Queue.Queue()
- self.low_prio_jobs = Queue.Queue()
- self.do_reconfigure = False
- plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(nslist))+" routers")
+ """
+ PathBuilder implementation. Handles circuit construction, subject
+ to the constraints of the SelectionManager selmgr.
+
+ Do not access this object from other threads. Instead, use the
+ schedule_* functions to schedule work to be done in the thread
+ of the EventHandler.
+ """
+ def __init__(self, c, selmgr, RouterClass):
+ TorCtl.EventHandler.__init__(self)
+ self.c = c
+ nslist = c.get_network_status()
+ self.last_exit = None
+ self.new_nym = False
+ self.resolve_port = 0
+ self.num_circuits = 1
+ self.RouterClass = RouterClass
+ self.sorted_r = []
+ self.routers = {}
+ self.circuits = {}
+ self.streams = {}
+ self.read_routers(nslist)
+ self.selmgr = selmgr
+ self.selmgr.reconfigure(self.sorted_r)
+ self.imm_jobs = Queue.Queue()
+ self.low_prio_jobs = Queue.Queue()
+ self.do_reconfigure = False
+ plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(nslist))+" routers")
- def read_routers(self, nslist):
- routers = self.c.read_routers(nslist)
- new_routers = []
- for r in routers:
- if r.idhex in self.routers:
- if self.routers[r.idhex].nickname != r.nickname:
- plog("NOTICE", "Router "+r.idhex+" changed names from "
- +self.routers[r.idhex].nickname+" to "+r.nickname)
- # Must do IN-PLACE update to keep all the refs to this router
- # valid and current (especially for stats)
- self.routers[r.idhex].update_to(r)
- else:
- rc = self.RouterClass(r)
- self.routers[r.idhex] = rc
- new_routers.append(rc)
- self.sorted_r.extend(new_routers)
- self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
+ def read_routers(self, nslist):
+ routers = self.c.read_routers(nslist)
+ new_routers = []
+ for r in routers:
+ if r.idhex in self.routers:
+ if self.routers[r.idhex].nickname != r.nickname:
+ plog("NOTICE", "Router "+r.idhex+" changed names from "
+ +self.routers[r.idhex].nickname+" to "+r.nickname)
+ # Must do IN-PLACE update to keep all the refs to this router
+ # valid and current (especially for stats)
+ self.routers[r.idhex].update_to(r)
+ else:
+ rc = self.RouterClass(r)
+ self.routers[r.idhex] = rc
+ new_routers.append(rc)
+ self.sorted_r.extend(new_routers)
+ self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
- def attach_stream_any(self, stream, badcircs):
- # Newnym, and warn if not built plus pending
- unattached_streams = [stream]
- if self.new_nym:
- self.new_nym = False
- plog("DEBUG", "Obeying new nym")
- for key in self.circuits.keys():
- if len(self.circuits[key].pending_streams):
- plog("WARN", "New nym called, destroying circuit "+str(key)
- +" with "+str(len(self.circuits[key].pending_streams))
- +" pending streams")
- unattached_streams.extend(self.circuits[key].pending_streams)
- # FIXME: Consider actually closing circ if no streams.
- del self.circuits[key]
-
- for circ in self.circuits.itervalues():
- if circ.built and circ.cid not in badcircs:
- if circ.exit.will_exit_to(stream.host, stream.port):
- try:
- self.c.attach_stream(stream.sid, circ.cid)
- stream.pending_circ = circ # Only one possible here
- circ.pending_streams.append(stream)
- except TorCtl.ErrorReply, e:
- # No need to retry here. We should get the failed
- # event for either the circ or stream next
- plog("WARN", "Error attaching stream: "+str(e.args))
- return
- break
- else:
- circ = None
- while circ == None:
- self.selmgr.set_target(stream.host, stream.port)
- try:
- circ = Circuit(self.c.build_circuit(
- self.selmgr.pathlen,
- self.selmgr.path_selector))
- except TorCtl.ErrorReply, e:
- # FIXME: How come some routers are non-existant? Shouldn't
- # we have gotten an NS event to notify us they
- # disappeared?
- plog("NOTICE", "Error building circ: "+str(e.args))
- for u in unattached_streams:
- plog("DEBUG",
- "Attaching "+str(u.sid)+" pending build of "+str(circ.cid))
- u.pending_circ = circ
- circ.pending_streams.extend(unattached_streams)
- self.circuits[circ.cid] = circ
- self.last_exit = circ.exit
+ def attach_stream_any(self, stream, badcircs):
+ # Newnym, and warn if not built plus pending
+ unattached_streams = [stream]
+ if self.new_nym:
+ self.new_nym = False
+ plog("DEBUG", "Obeying new nym")
+ for key in self.circuits.keys():
+ if len(self.circuits[key].pending_streams):
+ plog("WARN", "New nym called, destroying circuit "+str(key)
+ +" with "+str(len(self.circuits[key].pending_streams))
+ +" pending streams")
+ unattached_streams.extend(self.circuits[key].pending_streams)
+ # FIXME: Consider actually closing circ if no streams.
+ del self.circuits[key]
+
+ for circ in self.circuits.itervalues():
+ if circ.built and circ.cid not in badcircs:
+ if circ.exit.will_exit_to(stream.host, stream.port):
+ try:
+ self.c.attach_stream(stream.sid, circ.cid)
+ stream.pending_circ = circ # Only one possible here
+ circ.pending_streams.append(stream)
+ except TorCtl.ErrorReply, e:
+ # No need to retry here. We should get the failed
+ # event for either the circ or stream next
+ plog("WARN", "Error attaching stream: "+str(e.args))
+ return
+ break
+ else:
+ circ = None
+ while circ == None:
+ self.selmgr.set_target(stream.host, stream.port)
+ try:
+ circ = Circuit(self.c.build_circuit(
+ self.selmgr.pathlen,
+ self.selmgr.path_selector))
+ except TorCtl.ErrorReply, e:
+ # FIXME: How come some routers are non-existant? Shouldn't
+ # we have gotten an NS event to notify us they
+ # disappeared?
+ plog("NOTICE", "Error building circ: "+str(e.args))
+ for u in unattached_streams:
+ plog("DEBUG",
+ "Attaching "+str(u.sid)+" pending build of "+str(circ.cid))
+ u.pending_circ = circ
+ circ.pending_streams.extend(unattached_streams)
+ self.circuits[circ.cid] = circ
+ self.last_exit = circ.exit
- def schedule_immediate(self, job):
- """
- Schedules an immediate job to be run before the next event is
- processed.
- """
- self.imm_jobs.put(job)
+ def schedule_immediate(self, job):
+ """
+ Schedules an immediate job to be run before the next event is
+ processed.
+ """
+ self.imm_jobs.put(job)
- def schedule_low_prio(self, job):
- """
- Schedules a job to be run when a non-time critical event arrives.
- """
- self.low_prio_jobs.put(job)
+ def schedule_low_prio(self, job):
+ """
+ Schedules a job to be run when a non-time critical event arrives.
+ """
+ self.low_prio_jobs.put(job)
- def schedule_selmgr(self, job):
- """
- Schedules an immediate job to be run before the next event is
- processed. Also notifies the selection manager that it needs
- to update itself.
- """
- def notlambda(this):
- job(this.selmgr)
- this.do_reconfigure = True
- self.schedule_immediate(notlambda)
+ def schedule_selmgr(self, job):
+ """
+ Schedules an immediate job to be run before the next event is
+ processed. Also notifies the selection manager that it needs
+ to update itself.
+ """
+ def notlambda(this):
+ job(this.selmgr)
+ this.do_reconfigure = True
+ self.schedule_immediate(notlambda)
- def heartbeat_event(self, event):
- while not self.imm_jobs.empty():
- imm_job = self.imm_jobs.get_nowait()
- imm_job(self)
-
- if self.do_reconfigure:
- self.selmgr.reconfigure(self.sorted_r)
- self.do_reconfigure = False
-
- # If event is stream:NEW*/DETACHED or circ BUILT/FAILED,
- # don't run low prio jobs.. No need to delay streams on them.
- if isinstance(event, TorCtl.CircuitEvent):
- if event.status in ("BUILT", "FAILED"): return
- elif isinstance(event, TorCtl.StreamEvent):
- if event.status in ("NEW", "NEWRESOLVE", "DETACHED"): return
-
- # Do the low prio jobs one at a time in case a
- # higher priority event is queued
- if not self.low_prio_jobs.empty():
- delay_job = self.low_prio_jobs.get_nowait()
- delay_job(self)
+ def heartbeat_event(self, event):
+ while not self.imm_jobs.empty():
+ imm_job = self.imm_jobs.get_nowait()
+ imm_job(self)
+
+ if self.do_reconfigure:
+ self.selmgr.reconfigure(self.sorted_r)
+ self.do_reconfigure = False
+
+ # If event is stream:NEW*/DETACHED or circ BUILT/FAILED,
+ # don't run low prio jobs.. No need to delay streams on them.
+ if isinstance(event, TorCtl.CircuitEvent):
+ if event.status in ("BUILT", "FAILED"): return
+ elif isinstance(event, TorCtl.StreamEvent):
+ if event.status in ("NEW", "NEWRESOLVE", "DETACHED"): return
+
+ # Do the low prio jobs one at a time in case a
+ # higher priority event is queued
+ if not self.low_prio_jobs.empty():
+ delay_job = self.low_prio_jobs.get_nowait()
+ delay_job(self)
- def circ_status_event(self, c):
- output = [c.event_name, str(c.circ_id), c.status]
- if c.path: output.append(",".join(c.path))
- if c.reason: output.append("REASON=" + c.reason)
- if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
- plog("DEBUG", " ".join(output))
- # Circuits we don't control get built by Tor
- if c.circ_id not in self.circuits:
- plog("DEBUG", "Ignoring circ " + str(c.circ_id))
- return
- if c.status == "FAILED" or c.status == "CLOSED":
- circ = self.circuits[c.circ_id]
- del self.circuits[c.circ_id]
- for stream in circ.pending_streams:
- plog("DEBUG", "Finding new circ for " + str(stream.sid))
- self.attach_stream_any(stream, stream.detached_from)
- elif c.status == "BUILT":
- self.circuits[c.circ_id].built = True
- try:
- for stream in self.circuits[c.circ_id].pending_streams:
- self.c.attach_stream(stream.sid, c.circ_id)
- except TorCtl.ErrorReply, e:
- # No need to retry here. We should get the failed
- # event for either the circ or stream next
- plog("WARN", "Error attaching stream: "+str(e.args))
- return
+ def circ_status_event(self, c):
+ output = [c.event_name, str(c.circ_id), c.status]
+ if c.path: output.append(",".join(c.path))
+ if c.reason: output.append("REASON=" + c.reason)
+ if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
+ plog("DEBUG", " ".join(output))
+ # Circuits we don't control get built by Tor
+ if c.circ_id not in self.circuits:
+ plog("DEBUG", "Ignoring circ " + str(c.circ_id))
+ return
+ if c.status == "FAILED" or c.status == "CLOSED":
+ circ = self.circuits[c.circ_id]
+ del self.circuits[c.circ_id]
+ for stream in circ.pending_streams:
+ plog("DEBUG", "Finding new circ for " + str(stream.sid))
+ self.attach_stream_any(stream, stream.detached_from)
+ elif c.status == "BUILT":
+ self.circuits[c.circ_id].built = True
+ try:
+ for stream in self.circuits[c.circ_id].pending_streams:
+ self.c.attach_stream(stream.sid, c.circ_id)
+ except TorCtl.ErrorReply, e:
+ # No need to retry here. We should get the failed
+ # event for either the circ or stream next
+ plog("WARN", "Error attaching stream: "+str(e.args))
+ return
- def stream_status_event(self, s):
- output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id),
- s.target_host, str(s.target_port)]
- if s.reason: output.append("REASON=" + s.reason)
- if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
- plog("DEBUG", " ".join(output))
- if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
- s.target_host = "255.255.255.255" # ignore DNS for exit policy check
- if s.status == "NEW" or s.status == "NEWRESOLVE":
- if s.status == "NEWRESOLVE" and not s.target_port:
- s.target_port = self.resolve_port
- self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, s.status)
+ def stream_status_event(self, s):
+ output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id),
+ s.target_host, str(s.target_port)]
+ if s.reason: output.append("REASON=" + s.reason)
+ if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
+ plog("DEBUG", " ".join(output))
+ if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
+ s.target_host = "255.255.255.255" # ignore DNS for exit policy check
+ if s.status == "NEW" or s.status == "NEWRESOLVE":
+ if s.status == "NEWRESOLVE" and not s.target_port:
+ s.target_port = self.resolve_port
+ self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, s.status)
- self.attach_stream_any(self.streams[s.strm_id],
- self.streams[s.strm_id].detached_from)
- elif s.status == "DETACHED":
- if s.strm_id not in self.streams:
- plog("WARN", "Detached stream "+str(s.strm_id)+" not found")
- self.streams[s.strm_id] = Stream(s.strm_id, s.target_host,
- s.target_port, "NEW")
- # FIXME Stats (differentiate Resolved streams also..)
- if not s.circ_id:
- plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
- else:
- self.streams[s.strm_id].detached_from.append(s.circ_id)
+ self.attach_stream_any(self.streams[s.strm_id],
+ self.streams[s.strm_id].detached_from)
+ elif s.status == "DETACHED":
+ if s.strm_id not in self.streams:
+ plog("WARN", "Detached stream "+str(s.strm_id)+" not found")
+ self.streams[s.strm_id] = Stream(s.strm_id, s.target_host,
+ s.target_port, "NEW")
+ # FIXME Stats (differentiate Resolved streams also..)
+ if not s.circ_id:
+ plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
+ else:
+ self.streams[s.strm_id].detached_from.append(s.circ_id)
-
- if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
- self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
- self.streams[s.strm_id].pending_circ = None
- self.attach_stream_any(self.streams[s.strm_id],
- self.streams[s.strm_id].detached_from)
- elif s.status == "SUCCEEDED":
- if s.strm_id not in self.streams:
- plog("NOTICE", "Succeeded stream "+str(s.strm_id)+" not found")
- return
- self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
- self.streams[s.strm_id].circ.pending_streams.remove(self.streams[s.strm_id])
- self.streams[s.strm_id].pending_circ = None
- elif s.status == "FAILED" or s.status == "CLOSED":
- # FIXME stats
- if s.strm_id not in self.streams:
- plog("NOTICE", "Failed stream "+str(s.strm_id)+" not found")
- return
+
+ if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
+ self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+ self.streams[s.strm_id].pending_circ = None
+ self.attach_stream_any(self.streams[s.strm_id],
+ self.streams[s.strm_id].detached_from)
+ elif s.status == "SUCCEEDED":
+ if s.strm_id not in self.streams:
+ plog("NOTICE", "Succeeded stream "+str(s.strm_id)+" not found")
+ return
+ self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
+ self.streams[s.strm_id].circ.pending_streams.remove(self.streams[s.strm_id])
+ self.streams[s.strm_id].pending_circ = None
+ elif s.status == "FAILED" or s.status == "CLOSED":
+ # FIXME stats
+ if s.strm_id not in self.streams:
+ plog("NOTICE", "Failed stream "+str(s.strm_id)+" not found")
+ return
- if not s.circ_id:
- plog("WARN", "Stream "+str(s.strm_id)+" failed from no circuit!")
+ if not s.circ_id:
+ plog("WARN", "Stream "+str(s.strm_id)+" failed from no circuit!")
- # We get failed and closed for each stream. OK to return
- # and let the closed do the cleanup
- # (FIXME: be careful about double stats)
- if s.status == "FAILED":
- # Avoid busted circuits that will not resolve or carry
- # traffic. FIXME: Failed count before doing this?
- if s.circ_id in self.circuits: del self.circuits[s.circ_id]
- else: plog("WARN","Failed stream on unknown circ "+str(s.circ_id))
- return
+ # We get failed and closed for each stream. OK to return
+ # and let the closed do the cleanup
+ # (FIXME: be careful about double stats)
+ if s.status == "FAILED":
+ # Avoid busted circuits that will not resolve or carry
+ # traffic. FIXME: Failed count before doing this?
+ if s.circ_id in self.circuits: del self.circuits[s.circ_id]
+ else: plog("WARN","Failed stream on unknown circ "+str(s.circ_id))
+ return
- if self.streams[s.strm_id].pending_circ:
- self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
- del self.streams[s.strm_id]
- elif s.status == "REMAP":
- if s.strm_id not in self.streams:
- plog("WARN", "Remap id "+str(s.strm_id)+" not found")
- else:
- if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
- s.target_host = "255.255.255.255"
- plog("NOTICE", "Non-IP remap for "+str(s.strm_id)+" to "
- + s.target_host)
- self.streams[s.strm_id].host = s.target_host
- self.streams[s.strm_id].port = s.target_port
+ if self.streams[s.strm_id].pending_circ:
+ self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+ del self.streams[s.strm_id]
+ elif s.status == "REMAP":
+ if s.strm_id not in self.streams:
+ plog("WARN", "Remap id "+str(s.strm_id)+" not found")
+ else:
+ if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
+ s.target_host = "255.255.255.255"
+ plog("NOTICE", "Non-IP remap for "+str(s.strm_id)+" to "
+ + s.target_host)
+ self.streams[s.strm_id].host = s.target_host
+ self.streams[s.strm_id].port = s.target_port
- def ns_event(self, n):
- self.read_routers(n.nslist)
- plog("DEBUG", "Read " + str(len(n.nslist))+" NS => "
- + str(len(self.sorted_r)) + " routers")
- self.selmgr.update_routers(self.sorted_r)
-
- def new_desc_event(self, d):
- for i in d.idlist: # Is this too slow?
- self.read_routers(self.c.get_network_status("id/"+i))
- plog("DEBUG", "Read " + str(len(d.idlist))+" Desc => "
- + str(len(self.sorted_r)) + " routers")
- self.selmgr.update_routers(self.sorted_r)
+ def ns_event(self, n):
+ self.read_routers(n.nslist)
+ plog("DEBUG", "Read " + str(len(n.nslist))+" NS => "
+ + str(len(self.sorted_r)) + " routers")
+ self.selmgr.update_routers(self.sorted_r)
+
+ def new_desc_event(self, d):
+ for i in d.idlist: # Is this too slow?
+ self.read_routers(self.c.get_network_status("id/"+i))
+ plog("DEBUG", "Read " + str(len(d.idlist))+" Desc => "
+ + str(len(self.sorted_r)) + " routers")
+ self.selmgr.update_routers(self.sorted_r)
########################## Unit tests ##########################
def do_unit(rst, r_list, plamb):
- print "\n"
- print "-----------------------------------"
- print rst.r_is_ok.im_class
- for r in r_list:
- print r.nickname+" "+plamb(r)+"="+str(rst.r_is_ok(r))
+ print "\n"
+ print "-----------------------------------"
+ print rst.r_is_ok.im_class
+ for r in r_list:
+ print r.nickname+" "+plamb(r)+"="+str(rst.r_is_ok(r))
# TODO: Tests:
# - Test each NodeRestriction and print in/out lines for it
# - Test NodeGenerator and reapply NodeRestrictions
# - Same for PathSelector and PathRestrictions
-# - Also Reapply each restriction by hand to path. Verify returns true
+# - Also Reapply each restriction by hand to path. Verify returns true
if __name__ == '__main__':
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(("127.0.0.1",9061))
- c = Connection(s)
- c.debug(file("control.log", "w"))
- c.authenticate()
- nslist = c.get_network_status()
- sorted_rlist = c.read_routers(c.get_network_status())
-
- for r in sorted_rlist:
- if r.will_exit_to("211.11.21.22", 465):
- print r.nickname+" "+str(r.bw)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(("127.0.0.1",9061))
+ c = Connection(s)
+ c.debug(file("control.log", "w"))
+ c.authenticate()
+ nslist = c.get_network_status()
+ sorted_rlist = c.read_routers(c.get_network_status())
+
+ for r in sorted_rlist:
+ if r.will_exit_to("211.11.21.22", 465):
+ print r.nickname+" "+str(r.bw)
- do_unit(PercentileRestriction(0, 100, sorted_rlist), sorted_rlist,
- lambda r: "")
- do_unit(PercentileRestriction(10, 20, sorted_rlist), sorted_rlist,
- lambda r: "")
- do_unit(OSRestriction([r"[lL]inux", r"BSD", "Darwin"], []), sorted_rlist,
- lambda r: r.os)
- do_unit(OSRestriction([], ["Windows", "Solaris"]), sorted_rlist,
- lambda r: r.os)
+ do_unit(PercentileRestriction(0, 100, sorted_rlist), sorted_rlist,
+ lambda r: "")
+ do_unit(PercentileRestriction(10, 20, sorted_rlist), sorted_rlist,
+ lambda r: "")
+ do_unit(OSRestriction([r"[lL]inux", r"BSD", "Darwin"], []), sorted_rlist,
+ lambda r: r.os)
+ do_unit(OSRestriction([], ["Windows", "Solaris"]), sorted_rlist,
+ lambda r: r.os)
- do_unit(VersionRangeRestriction("0.1.2.0"), sorted_rlist,
- lambda r: str(r.version))
- do_unit(VersionRangeRestriction("0.1.2.0", "0.1.2.5"), sorted_rlist,
- lambda r: str(r.version))
- do_unit(VersionIncludeRestriction(["0.1.1.26-alpha", "0.1.2.7-ignored"]),
- sorted_rlist, lambda r: str(r.version))
- do_unit(VersionExcludeRestriction(["0.1.1.26"]), sorted_rlist,
- lambda r: str(r.version))
+ do_unit(VersionRangeRestriction("0.1.2.0"), sorted_rlist,
+ lambda r: str(r.version))
+ do_unit(VersionRangeRestriction("0.1.2.0", "0.1.2.5"), sorted_rlist,
+ lambda r: str(r.version))
+ do_unit(VersionIncludeRestriction(["0.1.1.26-alpha", "0.1.2.7-ignored"]),
+ sorted_rlist, lambda r: str(r.version))
+ do_unit(VersionExcludeRestriction(["0.1.1.26"]), sorted_rlist,
+ lambda r: str(r.version))
- do_unit(ConserveExitsRestriction(), sorted_rlist, lambda r: " ".join(r.flags))
- do_unit(FlagsRestriction([], ["Valid"]), sorted_rlist, lambda r: " ".join(r.flags))
+ do_unit(ConserveExitsRestriction(), sorted_rlist, lambda r: " ".join(r.flags))
+ do_unit(FlagsRestriction([], ["Valid"]), sorted_rlist, lambda r: " ".join(r.flags))
- # XXX: Need unittest
- do_unit(IdHexRestriction("$FFCB46DB1339DA84674C70D7CB586434C4370441"),
- sorted_rlist, lambda r: r.idhex)
+ # XXX: Need unittest
+ do_unit(IdHexRestriction("$FFCB46DB1339DA84674C70D7CB586434C4370441"),
+ sorted_rlist, lambda r: r.idhex)
- rl = [AtLeastNNodeRestriction([ExitPolicyRestriction("255.255.255.255", 80), ExitPolicyRestriction("255.255.255.255", 443), ExitPolicyRestriction("255.255.255.255", 6667)], 2), FlagsRestriction([], ["BadExit"])]
+ rl = [AtLeastNNodeRestriction([ExitPolicyRestriction("255.255.255.255", 80), ExitPolicyRestriction("255.255.255.255", 443), ExitPolicyRestriction("255.255.255.255", 6667)], 2), FlagsRestriction([], ["BadExit"])]
- exit_rstr = NodeRestrictionList(rl, sorted_rlist)
+ exit_rstr = NodeRestrictionList(rl, sorted_rlist)
- ug = UniformGenerator(exit_rstr)
+ ug = UniformGenerator(exit_rstr)
- rlist = []
- for r in ug.next_r():
- print "Checking: " + r.nickname
- for rs in rl:
- if not rs.r_is_ok(r):
- raise PathError()
- if not "Exit" in r.flags:
- print "No exit in flags of "+r.nickname
- rlist.append(r)
- for r in sorted_rlist:
- if "Exit" in r.flags and not r in rlist:
- print r.nickname+" is an exit not in rl!"
-
+ rlist = []
+ for r in ug.next_r():
+ print "Checking: " + r.nickname
+ for rs in rl:
+ if not rs.r_is_ok(r):
+ raise PathError()
+ if not "Exit" in r.flags:
+ print "No exit in flags of "+r.nickname
+ rlist.append(r)
+ for r in sorted_rlist:
+ if "Exit" in r.flags and not r in rlist:
+ print r.nickname+" is an exit not in rl!"
+
Modified: torflow/trunk/TorCtl/TorCtl.py
===================================================================
--- torflow/trunk/TorCtl/TorCtl.py 2007-03-12 21:07:19 UTC (rev 9806)
+++ torflow/trunk/TorCtl/TorCtl.py 2007-03-13 01:36:12 UTC (rev 9807)
@@ -25,952 +25,952 @@
# Types of "EVENT" message.
EVENT_TYPE = Enum2(
- CIRC="CIRC",
- STREAM="STREAM",
- ORCONN="ORCONN",
- BW="BW",
- NS="NS",
- NEWDESC="NEWDESC",
- DEBUG="DEBUG",
- INFO="INFO",
- NOTICE="NOTICE",
- WARN="WARN",
- ERR="ERR")
+ CIRC="CIRC",
+ STREAM="STREAM",
+ ORCONN="ORCONN",
+ BW="BW",
+ NS="NS",
+ NEWDESC="NEWDESC",
+ DEBUG="DEBUG",
+ INFO="INFO",
+ NOTICE="NOTICE",
+ WARN="WARN",
+ ERR="ERR")
class TorCtlError(Exception):
- "Generic error raised by TorControl code."
- pass
+ "Generic error raised by TorControl code."
+ pass
class TorCtlClosed(TorCtlError):
- "Raised when the controller connection is closed by Tor (not by us.)"
- pass
+ "Raised when the controller connection is closed by Tor (not by us.)"
+ pass
class ProtocolError(TorCtlError):
- "Raised on violations in Tor controller protocol"
- pass
+ "Raised on violations in Tor controller protocol"
+ pass
class ErrorReply(TorCtlError):
- "Raised when Tor controller returns an error"
- pass
+ "Raised when Tor controller returns an error"
+ pass
class NetworkStatus:
- "Filled in during NS events"
- def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags):
- self.nickname = nickname
- self.idhash = idhash
- self.orhash = orhash
- self.ip = ip
- self.orport = int(orport)
- self.dirport = int(dirport)
- self.flags = flags
- self.idhex = (self.idhash + "=").decode("base64").encode("hex").upper()
- m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated)
- self.updated = datetime.datetime(*map(int, m.groups()))
+ "Filled in during NS events"
+ def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags):
+ self.nickname = nickname
+ self.idhash = idhash
+ self.orhash = orhash
+ self.ip = ip
+ self.orport = int(orport)
+ self.dirport = int(dirport)
+ self.flags = flags
+ self.idhex = (self.idhash + "=").decode("base64").encode("hex").upper()
+ m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated)
+ self.updated = datetime.datetime(*map(int, m.groups()))
class NetworkStatusEvent:
- def __init__(self, event_name, nslist):
- self.event_name = event_name
- self.nslist = nslist # List of NetworkStatus objects
+ def __init__(self, event_name, nslist):
+ self.event_name = event_name
+ self.nslist = nslist # List of NetworkStatus objects
class NewDescEvent:
- def __init__(self, event_name, idlist):
- self.event_name = event_name
- self.idlist = idlist
+ def __init__(self, event_name, idlist):
+ self.event_name = event_name
+ self.idlist = idlist
class CircuitEvent:
- def __init__(self, event_name, circ_id, status, path, reason,
- remote_reason):
- self.event_name = event_name
- self.circ_id = circ_id
- self.status = status
- self.path = path
- self.reason = reason
- self.remote_reason = remote_reason
+ def __init__(self, event_name, circ_id, status, path, reason,
+ remote_reason):
+ self.event_name = event_name
+ self.circ_id = circ_id
+ self.status = status
+ self.path = path
+ self.reason = reason
+ self.remote_reason = remote_reason
class StreamEvent:
- def __init__(self, event_name, strm_id, status, circ_id, target_host,
- target_port, reason, remote_reason):
- self.event_name = event_name
- self.strm_id = strm_id
- self.status = status
- self.circ_id = circ_id
- self.target_host = target_host
- self.target_port = int(target_port)
- self.reason = reason
- self.remote_reason = remote_reason
+ def __init__(self, event_name, strm_id, status, circ_id, target_host,
+ target_port, reason, remote_reason):
+ self.event_name = event_name
+ self.strm_id = strm_id
+ self.status = status
+ self.circ_id = circ_id
+ self.target_host = target_host
+ self.target_port = int(target_port)
+ self.reason = reason
+ self.remote_reason = remote_reason
class ORConnEvent:
- def __init__(self, event_name, status, endpoint, age, read_bytes,
- wrote_bytes, reason, ncircs):
- self.event_name = event_name
- self.status = status
- self.endpoint = endpoint
- self.age = age
- self.read_bytes = read_bytes
- self.wrote_bytes = wrote_bytes
- self.reason = reason
- self.ncircs = ncircs
+ def __init__(self, event_name, status, endpoint, age, read_bytes,
+ wrote_bytes, reason, ncircs):
+ self.event_name = event_name
+ self.status = status
+ self.endpoint = endpoint
+ self.age = age
+ self.read_bytes = read_bytes
+ self.wrote_bytes = wrote_bytes
+ self.reason = reason
+ self.ncircs = ncircs
class LogEvent:
- def __init__(self, level, msg):
- self.event_name = self.level = level
- self.msg = msg
+ def __init__(self, level, msg):
+ self.event_name = self.level = level
+ self.msg = msg
class AddrMapEvent:
- def __init__(self, event_name, from_addr, to_addr, when, by_exit):
- self.event_name = event_name
- self.from_addr = from_addr
- self.to_addr = to_addr
- self.when = when
- self.by_exit = by_exit # XOXOXOX <3 ;) @ nickm
+ def __init__(self, event_name, from_addr, to_addr, when, by_exit):
+ self.event_name = event_name
+ self.from_addr = from_addr
+ self.to_addr = to_addr
+ self.when = when
+ self.by_exit = by_exit # XOXOXOX <3 ;) @ nickm
class BWEvent:
- def __init__(self, event_name, read, written):
- self.event_name = event_name
- self.read = read
- self.written = written
+ def __init__(self, event_name, read, written):
+ self.event_name = event_name
+ self.read = read
+ self.written = written
class UnknownEvent:
- def __init__(self, event_name, event_string):
- self.event_name = event_name
- self.event_string = event_string
+ def __init__(self, event_name, event_string):
+ self.event_name = event_name
+ self.event_string = event_string
class ExitPolicyLine:
- def __init__(self, match, ip_mask, port_low, port_high):
- self.match = match
- if ip_mask == "*":
- self.ip = 0
- self.netmask = 0
+ def __init__(self, match, ip_mask, port_low, port_high):
+ self.match = match
+ if ip_mask == "*":
+ self.ip = 0
+ self.netmask = 0
+ else:
+ if not "/" in ip_mask:
+ self.netmask = 0xFFFFFFFF
+ ip = ip_mask
+ else:
+ ip, mask = ip_mask.split("/")
+ if re.match(r"\d+.\d+.\d+.\d+", mask):
+ self.netmask=struct.unpack(">I", socket.inet_aton(mask))[0]
else:
- if not "/" in ip_mask:
- self.netmask = 0xFFFFFFFF
- ip = ip_mask
- else:
- ip, mask = ip_mask.split("/")
- if re.match(r"\d+.\d+.\d+.\d+", mask):
- self.netmask=struct.unpack(">I", socket.inet_aton(mask))[0]
- else:
- self.netmask = ~(2**(32 - int(mask)) - 1)
- self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
- self.ip &= self.netmask
- if port_low == "*":
- self.port_low,self.port_high = (0,65535)
- else:
- if not port_high:
- port_high = port_low
- self.port_low = int(port_low)
- self.port_high = int(port_high)
-
- def check(self, ip, port):
- ip = struct.unpack(">I", socket.inet_aton(ip))[0]
- if (ip & self.netmask) == self.ip:
- if self.port_low <= port and port <= self.port_high:
- return self.match
- return -1
+ self.netmask = ~(2**(32 - int(mask)) - 1)
+ self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+ self.ip &= self.netmask
+ if port_low == "*":
+ self.port_low,self.port_high = (0,65535)
+ else:
+ if not port_high:
+ port_high = port_low
+ self.port_low = int(port_low)
+ self.port_high = int(port_high)
+
+ def check(self, ip, port):
+ ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+ if (ip & self.netmask) == self.ip:
+ if self.port_low <= port and port <= self.port_high:
+ return self.match
+ return -1
class RouterVersion:
- def __init__(self, version):
- v = re.search("^(\d+).(\d+).(\d+).(\d+)", version).groups()
- self.version = int(v[0])*0x1000000 + int(v[1])*0x10000 + int(v[2])*0x100 + int(v[3])
- self.ver_string = version
+ def __init__(self, version):
+ v = re.search("^(\d+).(\d+).(\d+).(\d+)", version).groups()
+ self.version = int(v[0])*0x1000000 + int(v[1])*0x10000 + int(v[2])*0x100 + int(v[3])
+ self.ver_string = version
- def __lt__(self, other): return self.version < other.version
- def __gt__(self, other): return self.version > other.version
- def __ge__(self, other): return self.version >= other.version
- def __le__(self, other): return self.version <= other.version
- def __eq__(self, other): return self.version == other.version
- def __ne__(self, other): return self.version != other.version
- def __str__(self): return self.ver_string
+ def __lt__(self, other): return self.version < other.version
+ def __gt__(self, other): return self.version > other.version
+ def __ge__(self, other): return self.version >= other.version
+ def __le__(self, other): return self.version <= other.version
+ def __eq__(self, other): return self.version == other.version
+ def __ne__(self, other): return self.version != other.version
+ def __str__(self): return self.ver_string
class Router:
- def __init__(self, idhex, name, bw, down, exitpolicy, flags, ip, version, os):
- self.idhex = idhex
- self.nickname = name
- self.bw = bw
- self.exitpolicy = exitpolicy
- self.flags = flags
- self.down = down
- self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
- self.version = RouterVersion(version)
- self.os = os
+ def __init__(self, idhex, name, bw, down, exitpolicy, flags, ip, version, os):
+ self.idhex = idhex
+ self.nickname = name
+ self.bw = bw
+ self.exitpolicy = exitpolicy
+ self.flags = flags
+ self.down = down
+ self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+ self.version = RouterVersion(version)
+ self.os = os
- def update_to(self, new):
- if self.idhex != new.idhex:
- plog("ERROR", "Update of router "+self.nickname+"changes idhex!")
- self.idhex = new.idhex
- self.nickname = new.nickname
- self.bw = new.bw
- self.exitpolicy = new.exitpolicy
- self.flags = new.flags
- self.ip = new.ip
- self.version = new.version
- self.os = new.os
+ def update_to(self, new):
+ if self.idhex != new.idhex:
+ plog("ERROR", "Update of router "+self.nickname+"changes idhex!")
+ self.idhex = new.idhex
+ self.nickname = new.nickname
+ self.bw = new.bw
+ self.exitpolicy = new.exitpolicy
+ self.flags = new.flags
+ self.ip = new.ip
+ self.version = new.version
+ self.os = new.os
- def will_exit_to(self, ip, port):
- for line in self.exitpolicy:
- ret = line.check(ip, port)
- if ret != -1:
- return ret
- plog("NOTICE", "No matching exit line for "+self.nickname)
- return False
+ def will_exit_to(self, ip, port):
+ for line in self.exitpolicy:
+ ret = line.check(ip, port)
+ if ret != -1:
+ return ret
+ plog("NOTICE", "No matching exit line for "+self.nickname)
+ return False
class Circuit:
- def __init__(self):
- self.cid = 0
- self.created_at = 0 # time
- self.path = [] # routers
- self.exit = 0
-
- def id_path(self): return map(lambda r: r.idhex, self.path)
+ def __init__(self):
+ self.cid = 0
+ self.created_at = 0 # time
+ self.path = [] # routers
+ self.exit = 0
+
+ def id_path(self): return map(lambda r: r.idhex, self.path)
class Connection:
- """A Connection represents a connection to the Tor process."""
- def __init__(self, sock):
- """Create a Connection to communicate with the Tor process over the
- socket 'sock'.
- """
- self._handler = None
- self._handleFn = None
- self._sendLock = threading.RLock()
- self._queue = Queue.Queue()
- self._thread = None
- self._closedEx = None
- self._closed = 0
- self._closeHandler = None
- self._eventThread = None
- self._eventQueue = Queue.Queue()
- self._s = BufSock(sock)
- self._debugFile = None
+ """A Connection represents a connection to the Tor process."""
+ def __init__(self, sock):
+ """Create a Connection to communicate with the Tor process over the
+ socket 'sock'.
+ """
+ self._handler = None
+ self._handleFn = None
+ self._sendLock = threading.RLock()
+ self._queue = Queue.Queue()
+ self._thread = None
+ self._closedEx = None
+ self._closed = 0
+ self._closeHandler = None
+ self._eventThread = None
+ self._eventQueue = Queue.Queue()
+ self._s = BufSock(sock)
+ self._debugFile = None
- def set_close_handler(self, handler):
- """Call 'handler' when the Tor process has closed its connection or
- given us an exception. If we close normally, no arguments are
- provided; otherwise, it will be called with an exception as its
- argument.
- """
- self._closeHandler = handler
+ def set_close_handler(self, handler):
+ """Call 'handler' when the Tor process has closed its connection or
+ given us an exception. If we close normally, no arguments are
+ provided; otherwise, it will be called with an exception as its
+ argument.
+ """
+ self._closeHandler = handler
- def close(self):
- """Shut down this controller connection"""
- self._sendLock.acquire()
- try:
- self._queue.put("CLOSE")
- self._eventQueue.put("CLOSE")
- self._s.close()
- self._s = None
- self._closed = 1
- finally:
- self._sendLock.release()
+ def close(self):
+ """Shut down this controller connection"""
+ self._sendLock.acquire()
+ try:
+ self._queue.put("CLOSE")
+ self._eventQueue.put("CLOSE")
+ self._s.close()
+ self._s = None
+ self._closed = 1
+ finally:
+ self._sendLock.release()
- def launch_thread(self, daemon=1):
- """Launch a background thread to handle messages from the Tor process."""
- assert self._thread is None
- t = threading.Thread(target=self._loop)
- if daemon:
- t.setDaemon(daemon)
- t.start()
- self._thread = t
- t = threading.Thread(target=self._eventLoop)
- if daemon:
- t.setDaemon(daemon)
- t.start()
- self._eventThread = t
- return self._thread
+ def launch_thread(self, daemon=1):
+ """Launch a background thread to handle messages from the Tor process."""
+ assert self._thread is None
+ t = threading.Thread(target=self._loop)
+ if daemon:
+ t.setDaemon(daemon)
+ t.start()
+ self._thread = t
+ t = threading.Thread(target=self._eventLoop)
+ if daemon:
+ t.setDaemon(daemon)
+ t.start()
+ self._eventThread = t
+ return self._thread
- def _loop(self):
- """Main subthread loop: Read commands from Tor, and handle them either
- as events or as responses to other commands.
- """
- while 1:
- try:
- isEvent, reply = self._read_reply()
- except:
- self._err(sys.exc_info())
- return
+ def _loop(self):
+ """Main subthread loop: Read commands from Tor, and handle them either
+ as events or as responses to other commands.
+ """
+ while 1:
+ try:
+ isEvent, reply = self._read_reply()
+ except:
+ self._err(sys.exc_info())
+ return
- if isEvent:
- if self._handler is not None:
- self._eventQueue.put(reply)
- else:
- cb = self._queue.get() # atomic..
- cb(reply)
+ if isEvent:
+ if self._handler is not None:
+ self._eventQueue.put(reply)
+ else:
+ cb = self._queue.get() # atomic..
+ cb(reply)
- def _err(self, (tp, ex, tb), fromEventLoop=0):
- """DOCDOC"""
- # silent death is bad :(
- traceback.print_exception(tp, ex, tb)
- if self._s:
- try:
- self.close()
- except:
- pass
- self._sendLock.acquire()
- try:
- self._closedEx = ex
- self._closed = 1
- finally:
- self._sendLock.release()
- while 1:
- try:
- cb = self._queue.get(timeout=0)
- if cb != "CLOSE":
- cb("EXCEPTION")
- except Queue.Empty:
- break
- if self._closeHandler is not None:
- self._closeHandler(ex)
+ def _err(self, (tp, ex, tb), fromEventLoop=0):
+ """DOCDOC"""
+ # silent death is bad :(
+ traceback.print_exception(tp, ex, tb)
+ if self._s:
+ try:
+ self.close()
+ except:
+ pass
+ self._sendLock.acquire()
+ try:
+ self._closedEx = ex
+ self._closed = 1
+ finally:
+ self._sendLock.release()
+ while 1:
+ try:
+ cb = self._queue.get(timeout=0)
+ if cb != "CLOSE":
+ cb("EXCEPTION")
+ except Queue.Empty:
+ break
+ if self._closeHandler is not None:
+ self._closeHandler(ex)
+ return
+
+ def _eventLoop(self):
+ """DOCDOC"""
+ while 1:
+ reply = self._eventQueue.get()
+ if reply == "CLOSE":
return
+ try:
+ self._handleFn(reply)
+ except:
+ self._err(sys.exc_info(), 1)
+ return
- def _eventLoop(self):
- """DOCDOC"""
- while 1:
- reply = self._eventQueue.get()
- if reply == "CLOSE":
- return
- try:
- self._handleFn(reply)
- except:
- self._err(sys.exc_info(), 1)
- return
+ def _sendImpl(self, sendFn, msg):
+ """DOCDOC"""
+ if self._thread is None:
+ self.launch_thread(1)
+ # This condition will get notified when we've got a result...
+ condition = threading.Condition()
+ # Here's where the result goes...
+ result = []
- def _sendImpl(self, sendFn, msg):
- """DOCDOC"""
- if self._thread is None:
- self.launch_thread(1)
- # This condition will get notified when we've got a result...
- condition = threading.Condition()
- # Here's where the result goes...
- result = []
+ if self._closedEx is not None:
+ raise self._closedEx
+ elif self._closed:
+ raise TorCtlClosed()
- if self._closedEx is not None:
- raise self._closedEx
- elif self._closed:
- raise TorCtlClosed()
+ def cb(reply,condition=condition,result=result):
+ condition.acquire()
+ try:
+ result.append(reply)
+ condition.notify()
+ finally:
+ condition.release()
- def cb(reply,condition=condition,result=result):
- condition.acquire()
- try:
- result.append(reply)
- condition.notify()
- finally:
- condition.release()
+ # Sends a message to Tor...
+ self._sendLock.acquire() # ensure queue+sendmsg is atomic
+ try:
+ self._queue.put(cb)
+ sendFn(msg) # _doSend(msg)
+ finally:
+ self._sendLock.release()
- # Sends a message to Tor...
- self._sendLock.acquire() # ensure queue+sendmsg is atomic
- try:
- self._queue.put(cb)
- sendFn(msg) # _doSend(msg)
- finally:
- self._sendLock.release()
+ # Now wait till the answer is in...
+ condition.acquire()
+ try:
+ while not result:
+ condition.wait()
+ finally:
+ condition.release()
- # Now wait till the answer is in...
- condition.acquire()
- try:
- while not result:
- condition.wait()
- finally:
- condition.release()
+ # ...And handle the answer appropriately.
+ assert len(result) == 1
+ reply = result[0]
+ if reply == "EXCEPTION":
+ raise self._closedEx
- # ...And handle the answer appropriately.
- assert len(result) == 1
- reply = result[0]
- if reply == "EXCEPTION":
- raise self._closedEx
+ return reply
- return reply
+ def debug(self, f):
+ """DOCDOC"""
+ self._debugFile = f
- def debug(self, f):
- """DOCDOC"""
- self._debugFile = f
+ def set_event_handler(self, handler):
+ """Cause future events from the Tor process to be sent to 'handler'.
+ """
+ self._handler = handler
+ self._handleFn = handler.handle1
- def set_event_handler(self, handler):
- """Cause future events from the Tor process to be sent to 'handler'.
- """
- self._handler = handler
- self._handleFn = handler.handle1
-
- def _read_reply(self):
- lines = []
+ def _read_reply(self):
+ lines = []
+ while 1:
+ line = self._s.readline().strip()
+ if self._debugFile:
+ self._debugFile.write(" %s\n" % line)
+ if len(line)<4:
+ raise ProtocolError("Badly formatted reply line: Too short")
+ code = line[:3]
+ tp = line[3]
+ s = line[4:]
+ if tp == "-":
+ lines.append((code, s, None))
+ elif tp == " ":
+ lines.append((code, s, None))
+ isEvent = (lines and lines[0][0][0] == '6')
+ return isEvent, lines
+ elif tp != "+":
+ raise ProtocolError("Badly formatted reply line: unknown type %r"%tp)
+ else:
+ more = []
while 1:
- line = self._s.readline().strip()
- if self._debugFile:
- self._debugFile.write(" %s\n" % line)
- if len(line)<4:
- raise ProtocolError("Badly formatted reply line: Too short")
- code = line[:3]
- tp = line[3]
- s = line[4:]
- if tp == "-":
- lines.append((code, s, None))
- elif tp == " ":
- lines.append((code, s, None))
- isEvent = (lines and lines[0][0][0] == '6')
- return isEvent, lines
- elif tp != "+":
- raise ProtocolError("Badly formatted reply line: unknown type %r"%tp)
- else:
- more = []
- while 1:
- line = self._s.readline()
- if self._debugFile:
- self._debugFile.write("+++ %s" % line)
- if line in (".\r\n", ".\n"):
- break
- more.append(line)
- lines.append((code, s, unescape_dots("".join(more))))
- isEvent = (lines and lines[0][0][0] == '6')
- if isEvent: # Need "250 OK" if it's not an event. Otherwise, end
- return (isEvent, lines)
-
- # XXX: Notreached
+ line = self._s.readline()
+ if self._debugFile:
+ self._debugFile.write("+++ %s" % line)
+ if line in (".\r\n", ".\n"):
+ break
+ more.append(line)
+ lines.append((code, s, unescape_dots("".join(more))))
isEvent = (lines and lines[0][0][0] == '6')
- return (isEvent, lines)
+ if isEvent: # Need "250 OK" if it's not an event. Otherwise, end
+ return (isEvent, lines)
- def _doSend(self, msg):
- if self._debugFile:
- amsg = msg
- lines = amsg.split("\n")
- if len(lines) > 2:
- amsg = "\n".join(lines[:2]) + "\n"
- self._debugFile.write(">>> %s" % amsg)
- self._s.write(msg)
+ # XXX: Notreached
+ isEvent = (lines and lines[0][0][0] == '6')
+ return (isEvent, lines)
- def sendAndRecv(self, msg="", expectedTypes=("250", "251")):
- """Helper: Send a command 'msg' to Tor, and wait for a command
- in response. If the response type is in expectedTypes,
- return a list of (tp,body,extra) tuples. If it is an
- error, raise ErrorReply. Otherwise, raise ProtocolError.
- """
- if type(msg) == types.ListType:
- msg = "".join(msg)
- assert msg.endswith("\r\n")
+ def _doSend(self, msg):
+ if self._debugFile:
+ amsg = msg
+ lines = amsg.split("\n")
+ if len(lines) > 2:
+ amsg = "\n".join(lines[:2]) + "\n"
+ self._debugFile.write(">>> %s" % amsg)
+ self._s.write(msg)
- lines = self._sendImpl(self._doSend, msg)
- # print lines
- for tp, msg, _ in lines:
- if tp[0] in '45':
- raise ErrorReply("%s %s"%(tp, msg))
- if tp not in expectedTypes:
- raise ProtocolError("Unexpectd message type %r"%tp)
+ def sendAndRecv(self, msg="", expectedTypes=("250", "251")):
+ """Helper: Send a command 'msg' to Tor, and wait for a command
+ in response. If the response type is in expectedTypes,
+ return a list of (tp,body,extra) tuples. If it is an
+ error, raise ErrorReply. Otherwise, raise ProtocolError.
+ """
+ if type(msg) == types.ListType:
+ msg = "".join(msg)
+ assert msg.endswith("\r\n")
- return lines
+ lines = self._sendImpl(self._doSend, msg)
+ # print lines
+ for tp, msg, _ in lines:
+ if tp[0] in '45':
+ raise ErrorReply("%s %s"%(tp, msg))
+ if tp not in expectedTypes:
+ raise ProtocolError("Unexpectd message type %r"%tp)
- def authenticate(self, secret=""):
- """Send an authenticating secret to Tor. You'll need to call this
- method before Tor can start.
- """
- hexstr = binascii.b2a_hex(secret)
- self.sendAndRecv("AUTHENTICATE %s\r\n"%hexstr)
+ return lines
- def get_option(self, name):
- """Get the value of the configuration option named 'name'. To
- retrieve multiple values, pass a list for 'name' instead of
- a string. Returns a list of (key,value) pairs.
- Refer to section 3.3 of control-spec.txt for a list of valid names.
- """
- if not isinstance(name, str):
- name = " ".join(name)
- lines = self.sendAndRecv("GETCONF %s\r\n" % name)
+ def authenticate(self, secret=""):
+ """Send an authenticating secret to Tor. You'll need to call this
+ method before Tor can start.
+ """
+ hexstr = binascii.b2a_hex(secret)
+ self.sendAndRecv("AUTHENTICATE %s\r\n"%hexstr)
- r = []
- for _,line,_ in lines:
- try:
- key, val = line.split("=", 1)
- r.append((key,val))
- except ValueError:
- r.append((line, None))
+ def get_option(self, name):
+ """Get the value of the configuration option named 'name'. To
+ retrieve multiple values, pass a list for 'name' instead of
+ a string. Returns a list of (key,value) pairs.
+ Refer to section 3.3 of control-spec.txt for a list of valid names.
+ """
+ if not isinstance(name, str):
+ name = " ".join(name)
+ lines = self.sendAndRecv("GETCONF %s\r\n" % name)
- return r
+ r = []
+ for _,line,_ in lines:
+ try:
+ key, val = line.split("=", 1)
+ r.append((key,val))
+ except ValueError:
+ r.append((line, None))
- def set_option(self, key, value):
- """Set the value of the configuration option 'key' to the value 'value'.
- """
- self.set_options([(key, value)])
+ return r
- def set_options(self, kvlist):
- """Given a list of (key,value) pairs, set them as configuration
- options.
- """
- if not kvlist:
- return
- msg = " ".join(["%s=%s"%(k,quote(v)) for k,v in kvlist])
- self.sendAndRecv("SETCONF %s\r\n"%msg)
+ def set_option(self, key, value):
+ """Set the value of the configuration option 'key' to the value 'value'.
+ """
+ self.set_options([(key, value)])
- def reset_options(self, keylist):
- """Reset the options listed in 'keylist' to their default values.
+ def set_options(self, kvlist):
+ """Given a list of (key,value) pairs, set them as configuration
+ options.
+ """
+ if not kvlist:
+ return
+ msg = " ".join(["%s=%s"%(k,quote(v)) for k,v in kvlist])
+ self.sendAndRecv("SETCONF %s\r\n"%msg)
- Tor started implementing this command in version 0.1.1.7-alpha;
- previous versions wanted you to set configuration keys to "".
- That no longer works.
- """
- self.sendAndRecv("RESETCONF %s\r\n"%(" ".join(keylist)))
+ def reset_options(self, keylist):
+ """Reset the options listed in 'keylist' to their default values.
- def get_network_status(self, who="all"):
- """Get the entire network status list"""
- return parse_ns_body(self.sendAndRecv("GETINFO ns/"+who+"\r\n")[0][2])
+ Tor started implementing this command in version 0.1.1.7-alpha;
+ previous versions wanted you to set configuration keys to "".
+ That no longer works.
+ """
+ self.sendAndRecv("RESETCONF %s\r\n"%(" ".join(keylist)))
- def get_router(self, ns):
- """Fill in a Router class corresponding to a given NS class"""
- desc = self.sendAndRecv("GETINFO desc/id/" + ns.idhex + "\r\n")[0][2].split("\n")
- line = desc.pop(0)
- m = re.search(r"^router\s+(\S+)\s+(\S+)", line)
- router,ip = m.groups()
- exitpolicy = []
- dead = not ("Running" in ns.flags)
- bw_observed = 0
- if router != ns.nickname:
- plog("NOTICE", "Got different names " + ns.nickname + " vs " +
- router + " for " + ns.idhex)
- # XXX: Compile these regular expressions? This is an expensive process
- # Use http://docs.python.org/lib/profile.html to verify this is
- # the part of startup that is slow
- for line in desc:
- pl = re.search(r"^platform Tor (\S+) on (\S+)", line)
- ac = re.search(r"^accept (\S+):([^-]+)(?:-(\d+))?", line)
- rj = re.search(r"^reject (\S+):([^-]+)(?:-(\d+))?", line)
- bw = re.search(r"^bandwidth \d+ \d+ (\d+)", line)
- if re.search(r"^opt hibernating 1", line):
- #dead = 1 # XXX: Technically this may be stale..
- if ("Running" in ns.flags):
- plog("INFO", "Hibernating router "+ns.nickname+" is running..")
- if ac:
- exitpolicy.append(ExitPolicyLine(True, *ac.groups()))
- elif rj:
- exitpolicy.append(ExitPolicyLine(False, *rj.groups()))
- elif bw:
- bw_observed = int(bw.group(1))
- elif pl:
- version, os = pl.groups()
- if not bw_observed and not dead and ("Valid" in ns.flags):
- plog("INFO", "No bandwidth for live router " + ns.nickname)
- return Router(ns.idhex, ns.nickname, bw_observed, dead, exitpolicy,
- ns.flags, ip, version, os)
+ def get_network_status(self, who="all"):
+ """Get the entire network status list"""
+ return parse_ns_body(self.sendAndRecv("GETINFO ns/"+who+"\r\n")[0][2])
- def read_routers(self, nslist):
- bad_key = 0
- new = []
- for ns in nslist:
- try:
- r = self.get_router(ns)
- new.append(r)
- except ErrorReply:
- bad_key += 1
- if "Running" in ns.flags:
- plog("NOTICE", "Running router "+ns.nickname+"="
- +ns.idhex+" has no descriptor")
- except:
- traceback.print_exception(*sys.exc_info())
- continue
-
- return new
+ def get_router(self, ns):
+ """Fill in a Router class corresponding to a given NS class"""
+ desc = self.sendAndRecv("GETINFO desc/id/" + ns.idhex + "\r\n")[0][2].split("\n")
+ line = desc.pop(0)
+ m = re.search(r"^router\s+(\S+)\s+(\S+)", line)
+ router,ip = m.groups()
+ exitpolicy = []
+ dead = not ("Running" in ns.flags)
+ bw_observed = 0
+ if router != ns.nickname:
+ plog("NOTICE", "Got different names " + ns.nickname + " vs " +
+ router + " for " + ns.idhex)
+ # XXX: Compile these regular expressions? This is an expensive process
+ # Use http://docs.python.org/lib/profile.html to verify this is
+ # the part of startup that is slow
+ for line in desc:
+ pl = re.search(r"^platform Tor (\S+) on (\S+)", line)
+ ac = re.search(r"^accept (\S+):([^-]+)(?:-(\d+))?", line)
+ rj = re.search(r"^reject (\S+):([^-]+)(?:-(\d+))?", line)
+ bw = re.search(r"^bandwidth \d+ \d+ (\d+)", line)
+ if re.search(r"^opt hibernating 1", line):
+ #dead = 1 # XXX: Technically this may be stale..
+ if ("Running" in ns.flags):
+ plog("INFO", "Hibernating router "+ns.nickname+" is running..")
+ if ac:
+ exitpolicy.append(ExitPolicyLine(True, *ac.groups()))
+ elif rj:
+ exitpolicy.append(ExitPolicyLine(False, *rj.groups()))
+ elif bw:
+ bw_observed = int(bw.group(1))
+ elif pl:
+ version, os = pl.groups()
+ if not bw_observed and not dead and ("Valid" in ns.flags):
+ plog("INFO", "No bandwidth for live router " + ns.nickname)
+ return Router(ns.idhex, ns.nickname, bw_observed, dead, exitpolicy,
+ ns.flags, ip, version, os)
- def get_info(self, name):
- """Return the value of the internal information field named 'name'.
- Refer to section 3.9 of control-spec.txt for a list of valid names.
- DOCDOC
- """
- if not isinstance(name, str):
- name = " ".join(name)
- lines = self.sendAndRecv("GETINFO %s\r\n"%name)
- d = {}
- for _,msg,more in lines:
- if msg == "OK":
- break
- try:
- k,rest = msg.split("=",1)
- except ValueError:
- raise ProtocolError("Bad info line %r",msg)
- if more:
- d[k] = more
- else:
- d[k] = rest
- return d
+ def read_routers(self, nslist):
+ bad_key = 0
+ new = []
+ for ns in nslist:
+ try:
+ r = self.get_router(ns)
+ new.append(r)
+ except ErrorReply:
+ bad_key += 1
+ if "Running" in ns.flags:
+ plog("NOTICE", "Running router "+ns.nickname+"="
+ +ns.idhex+" has no descriptor")
+ except:
+ traceback.print_exception(*sys.exc_info())
+ continue
+
+ return new
- def set_events(self, events, extended=False):
- """Change the list of events that the event handler is interested
- in to those in 'events', which is a list of event names.
- Recognized event names are listed in section 3.3 of the control-spec
- """
- if extended:
- plog ("DEBUG", "SETEVENTS EXTENDED %s\r\n" % " ".join(events))
- self.sendAndRecv("SETEVENTS EXTENDED %s\r\n" % " ".join(events))
- else:
- self.sendAndRecv("SETEVENTS %s\r\n" % " ".join(events))
+ def get_info(self, name):
+ """Return the value of the internal information field named 'name'.
+ Refer to section 3.9 of control-spec.txt for a list of valid names.
+ DOCDOC
+ """
+ if not isinstance(name, str):
+ name = " ".join(name)
+ lines = self.sendAndRecv("GETINFO %s\r\n"%name)
+ d = {}
+ for _,msg,more in lines:
+ if msg == "OK":
+ break
+ try:
+ k,rest = msg.split("=",1)
+ except ValueError:
+ raise ProtocolError("Bad info line %r",msg)
+ if more:
+ d[k] = more
+ else:
+ d[k] = rest
+ return d
- def save_conf(self):
- """Flush all configuration changes to disk.
- """
- self.sendAndRecv("SAVECONF\r\n")
+ def set_events(self, events, extended=False):
+ """Change the list of events that the event handler is interested
+ in to those in 'events', which is a list of event names.
+ Recognized event names are listed in section 3.3 of the control-spec
+ """
+ if extended:
+ plog ("DEBUG", "SETEVENTS EXTENDED %s\r\n" % " ".join(events))
+ self.sendAndRecv("SETEVENTS EXTENDED %s\r\n" % " ".join(events))
+ else:
+ self.sendAndRecv("SETEVENTS %s\r\n" % " ".join(events))
- def send_signal(self, sig):
- """Send the signal 'sig' to the Tor process; The allowed values for
- 'sig' are listed in section 3.6 of control-spec.
- """
- sig = { 0x01 : "HUP",
- 0x02 : "INT",
- 0x0A : "USR1",
- 0x0C : "USR2",
- 0x0F : "TERM" }.get(sig,sig)
- self.sendAndRecv("SIGNAL %s\r\n"%sig)
+ def save_conf(self):
+ """Flush all configuration changes to disk.
+ """
+ self.sendAndRecv("SAVECONF\r\n")
- def map_address(self, kvList):
- if not kvList:
- return
- m = " ".join([ "%s=%s" for k,v in kvList])
- lines = self.sendAndRecv("MAPADDRESS %s\r\n"%m)
- r = []
- for _,line,_ in lines:
- try:
- key, val = line.split("=", 1)
- except ValueError:
- raise ProtocolError("Bad address line %r",v)
- r.append((key,val))
- return r
+ def send_signal(self, sig):
+ """Send the signal 'sig' to the Tor process; The allowed values for
+ 'sig' are listed in section 3.6 of control-spec.
+ """
+ sig = { 0x01 : "HUP",
+ 0x02 : "INT",
+ 0x0A : "USR1",
+ 0x0C : "USR2",
+ 0x0F : "TERM" }.get(sig,sig)
+ self.sendAndRecv("SIGNAL %s\r\n"%sig)
- def extend_circuit(self, circid, hops):
- """Tell Tor to extend the circuit identified by 'circid' through the
- servers named in the list 'hops'.
- """
- if circid is None:
- circid = "0"
- plog("DEBUG", "Extending circuit")
- lines = self.sendAndRecv("EXTENDCIRCUIT %d %s\r\n"
- %(circid, ",".join(hops)))
- tp,msg,_ = lines[0]
- m = re.match(r'EXTENDED (\S*)', msg)
- if not m:
- raise ProtocolError("Bad extended line %r",msg)
- plog("DEBUG", "Circuit extended")
- return int(m.group(1))
+ def map_address(self, kvList):
+ if not kvList:
+ return
+ m = " ".join([ "%s=%s" for k,v in kvList])
+ lines = self.sendAndRecv("MAPADDRESS %s\r\n"%m)
+ r = []
+ for _,line,_ in lines:
+ try:
+ key, val = line.split("=", 1)
+ except ValueError:
+ raise ProtocolError("Bad address line %r",v)
+ r.append((key,val))
+ return r
- def redirect_stream(self, streamid, newaddr, newport=""):
- """DOCDOC"""
- if newport:
- self.sendAndRecv("REDIRECTSTREAM %d %s %s\r\n"%(streamid, newaddr, newport))
- else:
- self.sendAndRecv("REDIRECTSTREAM %d %s\r\n"%(streamid, newaddr))
+ def extend_circuit(self, circid, hops):
+ """Tell Tor to extend the circuit identified by 'circid' through the
+ servers named in the list 'hops'.
+ """
+ if circid is None:
+ circid = "0"
+ plog("DEBUG", "Extending circuit")
+ lines = self.sendAndRecv("EXTENDCIRCUIT %d %s\r\n"
+ %(circid, ",".join(hops)))
+ tp,msg,_ = lines[0]
+ m = re.match(r'EXTENDED (\S*)', msg)
+ if not m:
+ raise ProtocolError("Bad extended line %r",msg)
+ plog("DEBUG", "Circuit extended")
+ return int(m.group(1))
- def attach_stream(self, streamid, circid):
- """DOCDOC"""
- plog("DEBUG", "Attaching stream: "+str(streamid)+" to "+str(circid))
- self.sendAndRecv("ATTACHSTREAM %d %d\r\n"%(streamid, circid))
+ def redirect_stream(self, streamid, newaddr, newport=""):
+ """DOCDOC"""
+ if newport:
+ self.sendAndRecv("REDIRECTSTREAM %d %s %s\r\n"%(streamid, newaddr, newport))
+ else:
+ self.sendAndRecv("REDIRECTSTREAM %d %s\r\n"%(streamid, newaddr))
- def close_stream(self, streamid, reason=0, flags=()):
- """DOCDOC"""
- self.sendAndRecv("CLOSESTREAM %d %s %s\r\n"
- %(streamid, reason, "".join(flags)))
+ def attach_stream(self, streamid, circid):
+ """DOCDOC"""
+ plog("DEBUG", "Attaching stream: "+str(streamid)+" to "+str(circid))
+ self.sendAndRecv("ATTACHSTREAM %d %d\r\n"%(streamid, circid))
- def close_circuit(self, circid, reason=0, flags=()):
- """DOCDOC"""
- self.sendAndRecv("CLOSECIRCUIT %d %s %s\r\n"
- %(circid, reason, "".join(flags)))
+ def close_stream(self, streamid, reason=0, flags=()):
+ """DOCDOC"""
+ self.sendAndRecv("CLOSESTREAM %d %s %s\r\n"
+ %(streamid, reason, "".join(flags)))
- def post_descriptor(self, desc):
- self.sendAndRecv("+POSTDESCRIPTOR\r\n%s"%escape_dots(desc))
+ def close_circuit(self, circid, reason=0, flags=()):
+ """DOCDOC"""
+ self.sendAndRecv("CLOSECIRCUIT %d %s %s\r\n"
+ %(circid, reason, "".join(flags)))
+ def post_descriptor(self, desc):
+ self.sendAndRecv("+POSTDESCRIPTOR\r\n%s"%escape_dots(desc))
+
def parse_ns_body(data):
- "Parse the body of an NS event or command."
- nsgroups = re.compile(r"^r ", re.M).split(data)
- nsgroups.pop(0)
- nslist = []
- for nsline in nsgroups:
- m = re.search(r"^s((?:\s\S*)+)", nsline, re.M)
- flags = m.groups()
- flags = flags[0].strip().split(" ")
- m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline)
- nslist.append(NetworkStatus(*(m.groups() + (flags,))))
- return nslist
+ "Parse the body of an NS event or command."
+ nsgroups = re.compile(r"^r ", re.M).split(data)
+ nsgroups.pop(0)
+ nslist = []
+ for nsline in nsgroups:
+ m = re.search(r"^s((?:\s\S*)+)", nsline, re.M)
+ flags = m.groups()
+ flags = flags[0].strip().split(" ")
+ m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline)
+ nslist.append(NetworkStatus(*(m.groups() + (flags,))))
+ return nslist
class EventHandler:
- """An 'EventHandler' wraps callbacks for the events Tor can return."""
- def __init__(self):
- """Create a new EventHandler."""
- self._map1 = {
- "CIRC" : self.circ_status_event,
- "STREAM" : self.stream_status_event,
- "ORCONN" : self.or_conn_status_event,
- "BW" : self.bandwidth_event,
- "DEBUG" : self.msg_event,
- "INFO" : self.msg_event,
- "NOTICE" : self.msg_event,
- "WARN" : self.msg_event,
- "ERR" : self.msg_event,
- "NEWDESC" : self.new_desc_event,
- "ADDRMAP" : self.address_mapped_event,
- "NS" : self.ns_event
- }
+ """An 'EventHandler' wraps callbacks for the events Tor can return."""
+ def __init__(self):
+ """Create a new EventHandler."""
+ self._map1 = {
+ "CIRC" : self.circ_status_event,
+ "STREAM" : self.stream_status_event,
+ "ORCONN" : self.or_conn_status_event,
+ "BW" : self.bandwidth_event,
+ "DEBUG" : self.msg_event,
+ "INFO" : self.msg_event,
+ "NOTICE" : self.msg_event,
+ "WARN" : self.msg_event,
+ "ERR" : self.msg_event,
+ "NEWDESC" : self.new_desc_event,
+ "ADDRMAP" : self.address_mapped_event,
+ "NS" : self.ns_event
+ }
- def handle1(self, lines):
- """Dispatcher: called from Connection when an event is received."""
- for code, msg, data in lines:
- event = self.decode1(msg, data)
- self.heartbeat_event(event)
- self._map1.get(event.event_name, self.unknown_event)(event)
+ def handle1(self, lines):
+ """Dispatcher: called from Connection when an event is received."""
+ for code, msg, data in lines:
+ event = self.decode1(msg, data)
+ self.heartbeat_event(event)
+ self._map1.get(event.event_name, self.unknown_event)(event)
- def decode1(self, body, data):
- """Unpack an event message into a type/arguments-tuple tuple."""
- if " " in body:
- evtype,body = body.split(" ",1)
+ def decode1(self, body, data):
+ """Unpack an event message into a type/arguments-tuple tuple."""
+ if " " in body:
+ evtype,body = body.split(" ",1)
+ else:
+ evtype,body = body,""
+ evtype = evtype.upper()
+ if evtype == "CIRC":
+ m = re.match(r"(\d+)\s+(\S+)(\s\S+)?(\s\S+)?(\s\S+)?", body)
+ if not m:
+ raise ProtocolError("CIRC event misformatted.")
+ ident,status,path,reason,remote = m.groups()
+ ident = int(ident)
+ if path:
+ if "REASON=" in path:
+ remote = reason
+ reason = path
+ path=[]
else:
- evtype,body = body,""
- evtype = evtype.upper()
- if evtype == "CIRC":
- m = re.match(r"(\d+)\s+(\S+)(\s\S+)?(\s\S+)?(\s\S+)?", body)
- if not m:
- raise ProtocolError("CIRC event misformatted.")
- ident,status,path,reason,remote = m.groups()
- ident = int(ident)
- if path:
- if "REASON=" in path:
- remote = reason
- reason = path
- path=[]
- else:
- path = path.strip().split(",")
- else:
- path = []
- if reason: reason = reason[8:]
- if remote: remote = remote[15:]
- event = CircuitEvent(evtype, ident, status, path, reason,
- remote)
- elif evtype == "STREAM":
- m = re.match(r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+):(\d+)(\s\S+)?(\s\S+)?", body)
- if not m:
- raise ProtocolError("STREAM event misformatted.")
- ident,status,circ,target_host,target_port,reason,remote = m.groups()
- ident,circ = map(int, (ident,circ))
- if reason: reason = reason[8:]
- if remote: remote = remote[15:]
- event = StreamEvent(evtype, ident, status, circ, target_host,
- int(target_port), reason, remote)
- elif evtype == "ORCONN":
- m = re.match(r"(\S+)\s+(\S+)(\sAGE=\S+)?(\sREAD=\S+)?(\sWRITTEN=\S+)?(\sREASON=\S+)?(\sNCIRCS=\S+)?", body)
- if not m:
- raise ProtocolError("ORCONN event misformatted.")
- target, status, age, read, wrote, reason, ncircs = m.groups()
+ path = path.strip().split(",")
+ else:
+ path = []
+ if reason: reason = reason[8:]
+ if remote: remote = remote[15:]
+ event = CircuitEvent(evtype, ident, status, path, reason,
+ remote)
+ elif evtype == "STREAM":
+ m = re.match(r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+):(\d+)(\s\S+)?(\s\S+)?", body)
+ if not m:
+ raise ProtocolError("STREAM event misformatted.")
+ ident,status,circ,target_host,target_port,reason,remote = m.groups()
+ ident,circ = map(int, (ident,circ))
+ if reason: reason = reason[8:]
+ if remote: remote = remote[15:]
+ event = StreamEvent(evtype, ident, status, circ, target_host,
+ int(target_port), reason, remote)
+ elif evtype == "ORCONN":
+ m = re.match(r"(\S+)\s+(\S+)(\sAGE=\S+)?(\sREAD=\S+)?(\sWRITTEN=\S+)?(\sREASON=\S+)?(\sNCIRCS=\S+)?", body)
+ if not m:
+ raise ProtocolError("ORCONN event misformatted.")
+ target, status, age, read, wrote, reason, ncircs = m.groups()
- #plog("DEBUG", "ORCONN: "+body)
- if ncircs: ncircs = int(ncircs[8:])
- else: ncircs = 0
- if reason: reason = reason[8:]
- if age: age = int(age[5:])
- else: age = 0
- if read: read = int(read[6:])
- else: read = 0
- if wrote: wrote = int(wrote[9:])
- else: wrote = 0
- event = ORConnEvent(evtype, status, target, age, read, wrote,
- reason, ncircs)
- elif evtype == "BW":
- m = re.match(r"(\d+)\s+(\d+)", body)
- if not m:
- raise ProtocolError("BANDWIDTH event misformatted.")
- read, written = map(long, m.groups())
- event = BWEvent(evtype, read, written)
- elif evtype in ("DEBUG", "INFO", "NOTICE", "WARN", "ERR"):
- event = LogEvent(evtype, body)
- elif evtype == "NEWDESC":
- event = NewDescEvent(evtype, body.split(" "))
- elif evtype == "ADDRMAP":
- m = re.match(r'(\S+)\s+(\S+)\s+(\"[^"]+\"|\w+)', body)
- if not m:
- raise ProtocolError("BANDWIDTH event misformatted.")
- fromaddr, toaddr, when = m.groups()
- if when.upper() == "NEVER":
- when = None
- else:
- when = time.localtime(
- time.strptime(when[1:-1], "%Y-%m-%d %H:%M:%S"))
- event = AddrMapEvent(evtype, fromaddr, toaddr, when, "Unknown")
- elif evtype == "NS":
- event = NetworkStatusEvent(evtype, parse_ns_body(data))
- else:
- event = UnknownEvent(evtype, body)
+ #plog("DEBUG", "ORCONN: "+body)
+ if ncircs: ncircs = int(ncircs[8:])
+ else: ncircs = 0
+ if reason: reason = reason[8:]
+ if age: age = int(age[5:])
+ else: age = 0
+ if read: read = int(read[6:])
+ else: read = 0
+ if wrote: wrote = int(wrote[9:])
+ else: wrote = 0
+ event = ORConnEvent(evtype, status, target, age, read, wrote,
+ reason, ncircs)
+ elif evtype == "BW":
+ m = re.match(r"(\d+)\s+(\d+)", body)
+ if not m:
+ raise ProtocolError("BANDWIDTH event misformatted.")
+ read, written = map(long, m.groups())
+ event = BWEvent(evtype, read, written)
+ elif evtype in ("DEBUG", "INFO", "NOTICE", "WARN", "ERR"):
+ event = LogEvent(evtype, body)
+ elif evtype == "NEWDESC":
+ event = NewDescEvent(evtype, body.split(" "))
+ elif evtype == "ADDRMAP":
+ m = re.match(r'(\S+)\s+(\S+)\s+(\"[^"]+\"|\w+)', body)
+ if not m:
+ raise ProtocolError("BANDWIDTH event misformatted.")
+ fromaddr, toaddr, when = m.groups()
+ if when.upper() == "NEVER":
+ when = None
+ else:
+ when = time.localtime(
+ time.strptime(when[1:-1], "%Y-%m-%d %H:%M:%S"))
+ event = AddrMapEvent(evtype, fromaddr, toaddr, when, "Unknown")
+ elif evtype == "NS":
+ event = NetworkStatusEvent(evtype, parse_ns_body(data))
+ else:
+ event = UnknownEvent(evtype, body)
- return event
+ return event
- def heartbeat_event(self, event):
- """Called before any event is recieved. Convenience function
- for any cleanup/setup/reconfiguration you may need to do.
- """
- pass
+ def heartbeat_event(self, event):
+ """Called before any event is recieved. Convenience function
+ for any cleanup/setup/reconfiguration you may need to do.
+ """
+ pass
- def unknown_event(self, event):
- """Called when we get an event type we don't recognize. This
- is almost alwyas an error.
- """
- raise NotImplemented()
+ def unknown_event(self, event):
+ """Called when we get an event type we don't recognize. This
+ is almost alwyas an error.
+ """
+ raise NotImplemented()
- def circ_status_event(self, event):
- """Called when a circuit status changes if listening to CIRCSTATUS
- events. 'status' is a member of CIRC_STATUS; circID is a numeric
- circuit ID, and 'path' is the circuit's path so far as a list of
- names.
- """
- raise NotImplemented()
+ def circ_status_event(self, event):
+ """Called when a circuit status changes if listening to CIRCSTATUS
+ events. 'status' is a member of CIRC_STATUS; circID is a numeric
+ circuit ID, and 'path' is the circuit's path so far as a list of
+ names.
+ """
+ raise NotImplemented()
- def stream_status_event(self, event):
- """Called when a stream status changes if listening to STREAMSTATUS
- events. 'status' is a member of STREAM_STATUS; streamID is a
- numeric stream ID, and 'target' is the destination of the stream.
- """
- raise NotImplemented()
+ def stream_status_event(self, event):
+ """Called when a stream status changes if listening to STREAMSTATUS
+ events. 'status' is a member of STREAM_STATUS; streamID is a
+ numeric stream ID, and 'target' is the destination of the stream.
+ """
+ raise NotImplemented()
- def or_conn_status_event(self, event):
- """Called when an OR connection's status changes if listening to
- ORCONNSTATUS events. 'status' is a member of OR_CONN_STATUS; target
- is the OR in question.
- """
- raise NotImplemented()
+ def or_conn_status_event(self, event):
+ """Called when an OR connection's status changes if listening to
+ ORCONNSTATUS events. 'status' is a member of OR_CONN_STATUS; target
+ is the OR in question.
+ """
+ raise NotImplemented()
- def bandwidth_event(self, event):
- """Called once a second if listening to BANDWIDTH events. 'read' is
- the number of bytes read; 'written' is the number of bytes written.
- """
- raise NotImplemented()
+ def bandwidth_event(self, event):
+ """Called once a second if listening to BANDWIDTH events. 'read' is
+ the number of bytes read; 'written' is the number of bytes written.
+ """
+ raise NotImplemented()
- def new_desc_event(self, event):
- """Called when Tor learns a new server descriptor if listenting to
- NEWDESC events.
- """
- raise NotImplemented()
+ def new_desc_event(self, event):
+ """Called when Tor learns a new server descriptor if listenting to
+ NEWDESC events.
+ """
+ raise NotImplemented()
- def msg_event(self, event):
- """Called when a log message of a given severity arrives if listening
- to INFO_MSG, NOTICE_MSG, WARN_MSG, or ERR_MSG events."""
- raise NotImplemented()
+ def msg_event(self, event):
+ """Called when a log message of a given severity arrives if listening
+ to INFO_MSG, NOTICE_MSG, WARN_MSG, or ERR_MSG events."""
+ raise NotImplemented()
- def ns_event(self, event):
- raise NotImplemented()
+ def ns_event(self, event):
+ raise NotImplemented()
- def address_mapped_event(self, event):
- """Called when Tor adds a mapping for an address if listening
- to ADDRESSMAPPED events.
- """
- raise NotImplemented()
+ def address_mapped_event(self, event):
+ """Called when Tor adds a mapping for an address if listening
+ to ADDRESSMAPPED events.
+ """
+ raise NotImplemented()
class DebugEventHandler(EventHandler):
- """Trivial debug event handler: reassembles all parsed events to stdout."""
- def circ_status_event(self, circ_event): # CircuitEvent()
- output = [circ_event.event_name, str(circ_event.circ_id),
- circ_event.status]
- if circ_event.path:
- output.append(",".join(circ_event.path))
- if circ_event.reason:
- output.append("REASON=" + circ_event.reason)
- if circ_event.remote_reason:
- output.append("REMOTE_REASON=" + circ_event.remote_reason)
- print " ".join(output)
+ """Trivial debug event handler: reassembles all parsed events to stdout."""
+ def circ_status_event(self, circ_event): # CircuitEvent()
+ output = [circ_event.event_name, str(circ_event.circ_id),
+ circ_event.status]
+ if circ_event.path:
+ output.append(",".join(circ_event.path))
+ if circ_event.reason:
+ output.append("REASON=" + circ_event.reason)
+ if circ_event.remote_reason:
+ output.append("REMOTE_REASON=" + circ_event.remote_reason)
+ print " ".join(output)
- def stream_status_event(self, strm_event):
- output = [strm_event.event_name, str(strm_event.strm_id),
- strm_event.status, str(strm_event.circ_id),
- strm_event.target_host, str(strm_event.target_port)]
- if strm_event.reason:
- output.append("REASON=" + strm_event.reason)
- if strm_event.remote_reason:
- output.append("REMOTE_REASON=" + strm_event.remote_reason)
- print " ".join(output)
+ def stream_status_event(self, strm_event):
+ output = [strm_event.event_name, str(strm_event.strm_id),
+ strm_event.status, str(strm_event.circ_id),
+ strm_event.target_host, str(strm_event.target_port)]
+ if strm_event.reason:
+ output.append("REASON=" + strm_event.reason)
+ if strm_event.remote_reason:
+ output.append("REMOTE_REASON=" + strm_event.remote_reason)
+ print " ".join(output)
- def ns_event(self, ns_event):
- for ns in ns_event.nslist:
- print " ".join((ns_event.event_name, ns.nickname, ns.idhash,
- ns.updated.isoformat(), ns.ip, str(ns.orport),
- str(ns.dirport), " ".join(ns.flags)))
+ def ns_event(self, ns_event):
+ for ns in ns_event.nslist:
+ print " ".join((ns_event.event_name, ns.nickname, ns.idhash,
+ ns.updated.isoformat(), ns.ip, str(ns.orport),
+ str(ns.dirport), " ".join(ns.flags)))
- def new_desc_event(self, newdesc_event):
- print " ".join((newdesc_event.event_name, " ".join(newdesc_event.idlist)))
+ def new_desc_event(self, newdesc_event):
+ print " ".join((newdesc_event.event_name, " ".join(newdesc_event.idlist)))
- def or_conn_status_event(self, orconn_event):
- if orconn_event.age: age = "AGE="+str(orconn_event.age)
- else: age = ""
- if orconn_event.read_bytes: read = "READ="+str(orconn_event.read_bytes)
- else: read = ""
- if orconn_event.wrote_bytes: wrote = "WRITTEN="+str(orconn_event.wrote_bytes)
- else: wrote = ""
- if orconn_event.reason: reason = "REASON="+orconn_event.reason
- else: reason = ""
- if orconn_event.ncircs: ncircs = "NCIRCS="+str(orconn_event.ncircs)
- else: ncircs = ""
- print " ".join((orconn_event.event_name, orconn_event.endpoint,
- orconn_event.status, age, read, wrote, reason, ncircs))
+ def or_conn_status_event(self, orconn_event):
+ if orconn_event.age: age = "AGE="+str(orconn_event.age)
+ else: age = ""
+ if orconn_event.read_bytes: read = "READ="+str(orconn_event.read_bytes)
+ else: read = ""
+ if orconn_event.wrote_bytes: wrote = "WRITTEN="+str(orconn_event.wrote_bytes)
+ else: wrote = ""
+ if orconn_event.reason: reason = "REASON="+orconn_event.reason
+ else: reason = ""
+ if orconn_event.ncircs: ncircs = "NCIRCS="+str(orconn_event.ncircs)
+ else: ncircs = ""
+ print " ".join((orconn_event.event_name, orconn_event.endpoint,
+ orconn_event.status, age, read, wrote, reason, ncircs))
- def msg_event(self, log_event):
- print log_event.event_name+" "+log_event.msg
-
- def bandwidth_event(self, bw_event):
- print bw_event.event_name+" "+str(bw_event.read)+" "+str(bw_event.written)
+ def msg_event(self, log_event):
+ print log_event.event_name+" "+log_event.msg
+
+ def bandwidth_event(self, bw_event):
+ print bw_event.event_name+" "+str(bw_event.read)+" "+str(bw_event.written)
def parseHostAndPort(h):
- """Given a string of the form 'address:port' or 'address' or
- 'port' or '', return a two-tuple of (address, port)
- """
- host, port = "localhost", 9100
- if ":" in h:
- i = h.index(":")
- host = h[:i]
- try:
- port = int(h[i+1:])
- except ValueError:
- print "Bad hostname %r"%h
- sys.exit(1)
- elif h:
- try:
- port = int(h)
- except ValueError:
- host = h
+ """Given a string of the form 'address:port' or 'address' or
+ 'port' or '', return a two-tuple of (address, port)
+ """
+ host, port = "localhost", 9100
+ if ":" in h:
+ i = h.index(":")
+ host = h[:i]
+ try:
+ port = int(h[i+1:])
+ except ValueError:
+ print "Bad hostname %r"%h
+ sys.exit(1)
+ elif h:
+ try:
+ port = int(h)
+ except ValueError:
+ host = h
- return host, port
+ return host, port
def run_example(host,port):
- print "host is %s:%d"%(host,port)
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect((host,port))
- c = Connection(s)
- c.set_event_handler(DebugEventHandler())
- th = c.launch_thread()
- c.authenticate()
- print "nick",`c.get_option("nickname")`
- print `c.get_info("version")`
- #print `c.get_info("desc/name/moria1")`
- print `c.get_info("network-status")`
- print `c.get_info("addr-mappings/all")`
- print `c.get_info("addr-mappings/config")`
- print `c.get_info("addr-mappings/cache")`
- print `c.get_info("addr-mappings/control")`
+ print "host is %s:%d"%(host,port)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((host,port))
+ c = Connection(s)
+ c.set_event_handler(DebugEventHandler())
+ th = c.launch_thread()
+ c.authenticate()
+ print "nick",`c.get_option("nickname")`
+ print `c.get_info("version")`
+ #print `c.get_info("desc/name/moria1")`
+ print `c.get_info("network-status")`
+ print `c.get_info("addr-mappings/all")`
+ print `c.get_info("addr-mappings/config")`
+ print `c.get_info("addr-mappings/cache")`
+ print `c.get_info("addr-mappings/control")`
- print `c.extend_circuit(0,["moria1"])`
- try:
- print `c.extend_circuit(0,[""])`
- except ErrorReply: # wtf?
- print "got error. good."
- except:
- print "Strange error", sys.exc_info()[0]
+ print `c.extend_circuit(0,["moria1"])`
+ try:
+ print `c.extend_circuit(0,[""])`
+ except ErrorReply: # wtf?
+ print "got error. good."
+ except:
+ print "Strange error", sys.exc_info()[0]
- #send_signal(s,1)
- #save_conf(s)
+ #send_signal(s,1)
+ #save_conf(s)
- #set_option(s,"1")
- #set_option(s,"bandwidthburstbytes 100000")
- #set_option(s,"runasdaemon 1")
- #set_events(s,[EVENT_TYPE.WARN])
-# c.set_events([EVENT_TYPE.ORCONN], True)
- c.set_events([EVENT_TYPE.STREAM, EVENT_TYPE.CIRC,
- EVENT_TYPE.NS, EVENT_TYPE.NEWDESC,
- EVENT_TYPE.ORCONN, EVENT_TYPE.BW], True)
+ #set_option(s,"1")
+ #set_option(s,"bandwidthburstbytes 100000")
+ #set_option(s,"runasdaemon 1")
+ #set_events(s,[EVENT_TYPE.WARN])
+# c.set_events([EVENT_TYPE.ORCONN], True)
+ c.set_events([EVENT_TYPE.STREAM, EVENT_TYPE.CIRC,
+ EVENT_TYPE.NS, EVENT_TYPE.NEWDESC,
+ EVENT_TYPE.ORCONN, EVENT_TYPE.BW], True)
- th.join()
- return
+ th.join()
+ return
if __name__ == '__main__':
- if len(sys.argv) > 2:
- print "Syntax: TorControl.py torhost:torport"
- sys.exit(0)
- else:
- sys.argv.append("localhost:9051")
- sh,sp = parseHostAndPort(sys.argv[1])
- run_example(sh,sp)
+ if len(sys.argv) > 2:
+ print "Syntax: TorControl.py torhost:torport"
+ sys.exit(0)
+ else:
+ sys.argv.append("localhost:9051")
+ sh,sp = parseHostAndPort(sys.argv[1])
+ run_example(sh,sp)
Modified: torflow/trunk/TorCtl/TorUtil.py
===================================================================
--- torflow/trunk/TorCtl/TorUtil.py 2007-03-12 21:07:19 UTC (rev 9806)
+++ torflow/trunk/TorCtl/TorUtil.py 2007-03-13 01:36:12 UTC (rev 9807)
@@ -15,171 +15,171 @@
import sha
__all__ = ["Enum", "Enum2", "quote", "escape_dots", "unescape_dots",
- "BufSock", "secret_to_key", "urandom_rng", "s2k_gen", "s2k_check",
- "plog", "ListenSocket"]
+ "BufSock", "secret_to_key", "urandom_rng", "s2k_gen", "s2k_check",
+ "plog", "ListenSocket"]
class Enum:
- # Helper: define an ordered dense name-to-number 1-1 mapping.
- def __init__(self, start, names):
- self.nameOf = {}
- idx = start
- for name in names:
- setattr(self,name,idx)
- self.nameOf[idx] = name
- idx += 1
+ # Helper: define an ordered dense name-to-number 1-1 mapping.
+ def __init__(self, start, names):
+ self.nameOf = {}
+ idx = start
+ for name in names:
+ setattr(self,name,idx)
+ self.nameOf[idx] = name
+ idx += 1
class Enum2:
- # Helper: define an ordered sparse name-to-number 1-1 mapping.
- def __init__(self, **args):
- self.__dict__.update(args)
- self.nameOf = {}
- for k,v in args.items():
- self.nameOf[v] = k
+ # Helper: define an ordered sparse name-to-number 1-1 mapping.
+ def __init__(self, **args):
+ self.__dict__.update(args)
+ self.nameOf = {}
+ for k,v in args.items():
+ self.nameOf[v] = k
def quote(s):
- return re.sub(r'([\r\n\\\"])', r'\\\1', s)
+ return re.sub(r'([\r\n\\\"])', r'\\\1', s)
def escape_dots(s, translate_nl=1):
- if translate_nl:
- lines = re.split(r"\r?\n", s)
- else:
- lines = s.split("\r\n")
- if lines and not lines[-1]:
- del lines[-1]
- for i in xrange(len(lines)):
- if lines[i].startswith("."):
- lines[i] = "."+lines[i]
- lines.append(".\r\n")
- return "\r\n".join(lines)
+ if translate_nl:
+ lines = re.split(r"\r?\n", s)
+ else:
+ lines = s.split("\r\n")
+ if lines and not lines[-1]:
+ del lines[-1]
+ for i in xrange(len(lines)):
+ if lines[i].startswith("."):
+ lines[i] = "."+lines[i]
+ lines.append(".\r\n")
+ return "\r\n".join(lines)
def unescape_dots(s, translate_nl=1):
- lines = s.split("\r\n")
+ lines = s.split("\r\n")
- for i in xrange(len(lines)):
- if lines[i].startswith("."):
- lines[i] = lines[i][1:]
+ for i in xrange(len(lines)):
+ if lines[i].startswith("."):
+ lines[i] = lines[i][1:]
- if lines and lines[-1]:
- lines.append("")
+ if lines and lines[-1]:
+ lines.append("")
- if translate_nl:
- return "\n".join(lines)
- else:
- return "\r\n".join(lines)
+ if translate_nl:
+ return "\n".join(lines)
+ else:
+ return "\r\n".join(lines)
class BufSock:
- def __init__(self, s):
- self._s = s
- self._buf = []
+ def __init__(self, s):
+ self._s = s
+ self._buf = []
- def readline(self):
- if self._buf:
- idx = self._buf[0].find('\n')
- if idx >= 0:
- result = self._buf[0][:idx+1]
- self._buf[0] = self._buf[0][idx+1:]
- return result
+ def readline(self):
+ if self._buf:
+ idx = self._buf[0].find('\n')
+ if idx >= 0:
+ result = self._buf[0][:idx+1]
+ self._buf[0] = self._buf[0][idx+1:]
+ return result
- while 1:
- s = self._s.recv(128)
- if not s: return None
- # XXX: This really does need an exception
- # raise ConnectionClosed()
- idx = s.find('\n')
- if idx >= 0:
- self._buf.append(s[:idx+1])
- result = "".join(self._buf)
- rest = s[idx+1:]
- if rest:
- self._buf = [ rest ]
- else:
- del self._buf[:]
- return result
- else:
- self._buf.append(s)
+ while 1:
+ s = self._s.recv(128)
+ if not s: return None
+ # XXX: This really does need an exception
+ # raise ConnectionClosed()
+ idx = s.find('\n')
+ if idx >= 0:
+ self._buf.append(s[:idx+1])
+ result = "".join(self._buf)
+ rest = s[idx+1:]
+ if rest:
+ self._buf = [ rest ]
+ else:
+ del self._buf[:]
+ return result
+ else:
+ self._buf.append(s)
- def write(self, s):
- self._s.send(s)
+ def write(self, s):
+ self._s.send(s)
- def close(self):
- self._s.close()
+ def close(self):
+ self._s.close()
# SocketServer.TCPServer is nuts..
class ListenSocket:
- def __init__(self, listen_ip, port):
- msg = None
+ def __init__(self, listen_ip, port):
+ msg = None
+ self.s = None
+ for res in socket.getaddrinfo(listen_ip, port, socket.AF_UNSPEC,
+ socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
+ af, socktype, proto, canonname, sa = res
+ try:
+ self.s = socket.socket(af, socktype, proto)
+ self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ except socket.error, msg:
self.s = None
- for res in socket.getaddrinfo(listen_ip, port, socket.AF_UNSPEC,
- socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
- af, socktype, proto, canonname, sa = res
- try:
- self.s = socket.socket(af, socktype, proto)
- self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- except socket.error, msg:
- self.s = None
- continue
- try:
- self.s.bind(sa)
- self.s.listen(1)
- except socket.error, msg:
- self.s.close()
- self.s = None
- continue
- break
- if self.s is None:
- raise socket.error(msg)
+ continue
+ try:
+ self.s.bind(sa)
+ self.s.listen(1)
+ except socket.error, msg:
+ self.s.close()
+ self.s = None
+ continue
+ break
+ if self.s is None:
+ raise socket.error(msg)
- def accept(self):
- conn, addr = self.s.accept()
- return conn
+ def accept(self):
+ conn, addr = self.s.accept()
+ return conn
- def close(self):
- self.s.close()
+ def close(self):
+ self.s.close()
def secret_to_key(secret, s2k_specifier):
- """Used to generate a hashed password string. DOCDOC."""
- c = ord(s2k_specifier[8])
- EXPBIAS = 6
- count = (16+(c&15)) << ((c>>4) + EXPBIAS)
+ """Used to generate a hashed password string. DOCDOC."""
+ c = ord(s2k_specifier[8])
+ EXPBIAS = 6
+ count = (16+(c&15)) << ((c>>4) + EXPBIAS)
- d = sha.new()
- tmp = s2k_specifier[:8]+secret
- slen = len(tmp)
- while count:
- if count > slen:
- d.update(tmp)
- count -= slen
- else:
- d.update(tmp[:count])
- count = 0
- return d.digest()
+ d = sha.new()
+ tmp = s2k_specifier[:8]+secret
+ slen = len(tmp)
+ while count:
+ if count > slen:
+ d.update(tmp)
+ count -= slen
+ else:
+ d.update(tmp[:count])
+ count = 0
+ return d.digest()
def urandom_rng(n):
- """Try to read some entropy from the platform entropy source."""
- f = open('/dev/urandom', 'rb')
- try:
- return f.read(n)
- finally:
- f.close()
+ """Try to read some entropy from the platform entropy source."""
+ f = open('/dev/urandom', 'rb')
+ try:
+ return f.read(n)
+ finally:
+ f.close()
def s2k_gen(secret, rng=None):
- """DOCDOC"""
- if rng is None:
- if hasattr(os, "urandom"):
- rng = os.urandom
- else:
- rng = urandom_rng
- spec = "%s%s"%(rng(8), chr(96))
- return "16:%s"%(
- binascii.b2a_hex(spec + secret_to_key(secret, spec)))
+ """DOCDOC"""
+ if rng is None:
+ if hasattr(os, "urandom"):
+ rng = os.urandom
+ else:
+ rng = urandom_rng
+ spec = "%s%s"%(rng(8), chr(96))
+ return "16:%s"%(
+ binascii.b2a_hex(spec + secret_to_key(secret, spec)))
def s2k_check(secret, k):
- """DOCDOC"""
- assert k[:3] == "16:"
+ """DOCDOC"""
+ assert k[:3] == "16:"
- k = binascii.a2b_hex(k[3:])
- return secret_to_key(secret, k[:9]) == k[9:]
+ k = binascii.a2b_hex(k[3:])
+ return secret_to_key(secret, k[:9]) == k[9:]
## XXX: Make this a class?
@@ -187,7 +187,7 @@
loglevels = {"DEBUG" : 0, "INFO" : 1, "NOTICE" : 2, "WARN" : 3, "ERROR" : 4}
def plog(level, msg): # XXX: Timestamps
- if(loglevels[level] >= loglevels[loglevel]):
- print level + ": " + msg
- sys.stdout.flush()
+ if(loglevels[level] >= loglevels[loglevel]):
+ print level + ": " + msg
+ sys.stdout.flush()
Modified: torflow/trunk/metatroller.py
===================================================================
--- torflow/trunk/metatroller.py 2007-03-12 21:07:19 UTC (rev 9806)
+++ torflow/trunk/metatroller.py 2007-03-13 01:36:12 UTC (rev 9807)
@@ -34,601 +34,599 @@
# Use PathBuilder.schedule_reconfigure instead.
# (Modifying the arguments here is OK)
__selmgr = PathSupport.SelectionManager(
- pathlen=3,
- order_exits=False,
- percent_fast=100,
- percent_skip=0,
- min_bw=1024,
- use_all_exits=False,
- uniform=True,
- use_exit=None,
- use_guards=False)
+ pathlen=3,
+ order_exits=False,
+ percent_fast=100,
+ percent_skip=0,
+ min_bw=1024,
+ use_all_exits=False,
+ uniform=True,
+ use_exit=None,
+ use_guards=False)
# Technically we could just add member vars as we need them, but this
# is a bit more clear
class StatsRouter(TorCtl.Router):
- def __init__(self, router): # Promotion constructor :)
- self.__dict__ = router.__dict__
- self.reset()
-
- def reset(self):
- self.circ_uncounted = 0
- self.circ_failed = 0
- self.circ_succeeded = 0 # disjoint from failed (for verification only)
- self.circ_suspected = 0
- self.circ_chosen = 0 # above 4 should add to this
- self.strm_failed = 0 # Only exits should have these
- self.strm_succeeded = 0
- self.strm_suspected = 0 # disjoint from failed (for verification only)
- self.strm_uncounted = 0
- self.strm_chosen = 0 # above 3 should add to this
- self.reason_suspected = {}
- self.reason_failed = {}
- self.first_seen = time.time()
- if "Running" in self.flags:
- self.became_active_at = self.first_seen
- self.hibernated_at = 0
- else:
- self.became_active_at = 0
- self.hibernated_at = self.first_seen
- self.total_hibernation_time = 0
- self.total_active_uptime = 0
- self.max_bw = 0
- self.min_bw = 0
- self.avg_bw = 0
+ def __init__(self, router): # Promotion constructor :)
+ self.__dict__ = router.__dict__
+ self.reset()
+
+ def reset(self):
+ self.circ_uncounted = 0
+ self.circ_failed = 0
+ self.circ_succeeded = 0 # disjoint from failed (for verification only)
+ self.circ_suspected = 0
+ self.circ_chosen = 0 # above 4 should add to this
+ self.strm_failed = 0 # Only exits should have these
+ self.strm_succeeded = 0
+ self.strm_suspected = 0 # disjoint from failed (for verification only)
+ self.strm_uncounted = 0
+ self.strm_chosen = 0 # above 3 should add to this
+ self.reason_suspected = {}
+ self.reason_failed = {}
+ self.first_seen = time.time()
+ if "Running" in self.flags:
+ self.became_active_at = self.first_seen
+ self.hibernated_at = 0
+ else:
+ self.became_active_at = 0
+ self.hibernated_at = self.first_seen
+ self.total_hibernation_time = 0
+ self.total_active_uptime = 0
+ self.max_bw = 0
+ self.min_bw = 0
+ self.avg_bw = 0
- def current_uptime(self):
- if self.became_active_at:
- ret = (self.total_active_uptime+(time.time()-self.became_active_at))
- else:
- ret = self.total_active_uptime
- if ret == 0: return 0.000005 # eh..
- else: return ret
-
- def failed_per_hour(self):
- return (3600.*(self.circ_failed+self.strm_failed))/self.current_uptime()
+ def current_uptime(self):
+ if self.became_active_at:
+ ret = (self.total_active_uptime+(time.time()-self.became_active_at))
+ else:
+ ret = self.total_active_uptime
+ if ret == 0: return 0.000005 # eh..
+ else: return ret
+
+ def failed_per_hour(self):
+ return (3600.*(self.circ_failed+self.strm_failed))/self.current_uptime()
- def suspected_per_hour(self):
- return (3600.*(self.circ_suspected+self.strm_suspected
- +self.circ_failed+self.strm_failed))/self.current_uptime()
+ def suspected_per_hour(self):
+ return (3600.*(self.circ_suspected+self.strm_suspected
+ +self.circ_failed+self.strm_failed))/self.current_uptime()
- # These four are for sanity checking
- def _suspected_per_hour(self):
- return (3600.*(self.circ_suspected+self.strm_suspected))/self.current_uptime()
+ # These four are for sanity checking
+ def _suspected_per_hour(self):
+ return (3600.*(self.circ_suspected+self.strm_suspected))/self.current_uptime()
- def _uncounted_per_hour(self):
- return (3600.*(self.circ_uncounted+self.strm_uncounted))/self.current_uptime()
+ def _uncounted_per_hour(self):
+ return (3600.*(self.circ_uncounted+self.strm_uncounted))/self.current_uptime()
- def _chosen_per_hour(self):
- return (3600.*(self.circ_chosen+self.strm_chosen))/self.current_uptime()
+ def _chosen_per_hour(self):
+ return (3600.*(self.circ_chosen+self.strm_chosen))/self.current_uptime()
- def _succeeded_per_hour(self):
- return (3600.*(self.circ_succeeded+self.strm_succeeded))/self.current_uptime()
-
- def __str__(self):
- return (self.idhex+" ("+self.nickname+")\n\t"
- +" CF=" +str(self.circ_failed)
- +" CS="+ str(self.circ_suspected+self.circ_failed)
- +" CC="+ str(self.circ_chosen)
- +" SF="+ str(self.strm_failed)
- +" SS="+ str(self.strm_suspected+self.strm_failed)
- +" SC="+ str(self.strm_chosen)
- +" FR="+ str(round(self.failed_per_hour(),2))
- +" SR="+ str(round(self.suspected_per_hour(),2))
- +" Up="+str(round(self.current_uptime()/3600, 1))+"h\n")
+ def _succeeded_per_hour(self):
+ return (3600.*(self.circ_succeeded+self.strm_succeeded))/self.current_uptime()
+
+ def __str__(self):
+ return (self.idhex+" ("+self.nickname+")\n\t"
+ +" CF="+str(self.circ_failed)
+ +" CS="+str(self.circ_suspected+self.circ_failed)
+ +" CC="+str(self.circ_chosen)
+ +" SF="+str(self.strm_failed)
+ +" SS="+str(self.strm_suspected+self.strm_failed)
+ +" SC="+str(self.strm_chosen)
+ +" FH="+str(round(self.failed_per_hour(),2))
+ +" SH="+str(round(self.suspected_per_hour(),2))
+ +" Up="+str(round(self.current_uptime()/3600, 1))+"h\n")
- def sanity_check(self):
- if self.circ_failed + self.circ_succeeded + self.circ_suspected \
- + self.circ_uncounted != self.circ_chosen:
- plog("ERROR", self.nickname+" does not add up for circs")
- if self.strm_failed + self.strm_succeeded + self.strm_suspected \
- + self.strm_uncounted != self.strm_chosen:
- plog("ERROR", self.nickname+" does not add up for streams")
- def check_reasons(reasons, expected, which, rtype):
- count = 0
- for rs in reasons.iterkeys():
- if re.search(r"^"+which, rs): count += reasons[rs]
- if count != expected:
- plog("ERROR", "Mismatch "+which+" "+rtype+" for "+self.nickname)
- check_reasons(self.reason_suspected,self.strm_suspected,"STREAM","susp")
- check_reasons(self.reason_suspected,self.circ_suspected,"CIRC","susp")
- check_reasons(self.reason_failed,self.strm_failed,"STREAM","failed")
- check_reasons(self.reason_failed,self.circ_failed,"CIRC","failed")
- now = time.time()
- tot_hib_time = self.total_hibernation_time
- tot_uptime = self.total_active_uptime
- if self.hibernated_at: tot_hib_time += now - self.hibernated_at
- if self.became_active_at: tot_uptime += now - self.became_active_at
- if round(tot_hib_time+tot_uptime) != round(now-self.first_seen):
- plog("ERROR", "Mismatch of uptimes for "+self.nickname)
-
- per_hour_tot = round(self._uncounted_per_hour()+self.failed_per_hour()+
- self._suspected_per_hour()+self._succeeded_per_hour(), 2)
- chosen_tot = round(self._chosen_per_hour(), 2)
- if per_hour_tot != chosen_tot:
- plog("ERROR", self.nickname+" has mismatch of per hour counts: "+str(per_hour_tot) +" vs "+str(chosen_tot))
+ def sanity_check(self):
+ if self.circ_failed + self.circ_succeeded + self.circ_suspected \
+ + self.circ_uncounted != self.circ_chosen:
+ plog("ERROR", self.nickname+" does not add up for circs")
+ if self.strm_failed + self.strm_succeeded + self.strm_suspected \
+ + self.strm_uncounted != self.strm_chosen:
+ plog("ERROR", self.nickname+" does not add up for streams")
+ def check_reasons(reasons, expected, which, rtype):
+ count = 0
+ for rs in reasons.iterkeys():
+ if re.search(r"^"+which, rs): count += reasons[rs]
+ if count != expected:
+ plog("ERROR", "Mismatch "+which+" "+rtype+" for "+self.nickname)
+ check_reasons(self.reason_suspected,self.strm_suspected,"STREAM","susp")
+ check_reasons(self.reason_suspected,self.circ_suspected,"CIRC","susp")
+ check_reasons(self.reason_failed,self.strm_failed,"STREAM","failed")
+ check_reasons(self.reason_failed,self.circ_failed,"CIRC","failed")
+ now = time.time()
+ tot_hib_time = self.total_hibernation_time
+ tot_uptime = self.total_active_uptime
+ if self.hibernated_at: tot_hib_time += now - self.hibernated_at
+ if self.became_active_at: tot_uptime += now - self.became_active_at
+ if round(tot_hib_time+tot_uptime) != round(now-self.first_seen):
+ plog("ERROR", "Mismatch of uptimes for "+self.nickname)
+
+ per_hour_tot = round(self._uncounted_per_hour()+self.failed_per_hour()+
+ self._suspected_per_hour()+self._succeeded_per_hour(), 2)
+ chosen_tot = round(self._chosen_per_hour(), 2)
+ if per_hour_tot != chosen_tot:
+ plog("ERROR", self.nickname+" has mismatch of per hour counts: "+str(per_hour_tot) +" vs "+str(chosen_tot))
class ReasonRouterList:
- "Helper class to track which reasons are in which routers."
- def __init__(self, reason):
- self.reason = reason
- self.rlist = {}
+ "Helper class to track which reasons are in which routers."
+ def __init__(self, reason):
+ self.reason = reason
+ self.rlist = {}
- def sort_list(self): raise NotImplemented()
+ def sort_list(self): raise NotImplemented()
- def write_list(self, f):
- rlist = self.sort_list()
- for r in rlist:
- f.write(r.idhex+" ("+r.nickname+") Fail=")
- if self.reason in r.reason_failed:
- f.write(str(r.reason_failed[self.reason]))
- else: f.write("0")
- f.write(" Susp=")
- if self.reason in r.reason_suspected:
- f.write(str(r.reason_suspected[self.reason])+"\n")
- else: f.write("0\n")
-
- def add_r(self, r):
- self.rlist[r] = 1
+ def write_list(self, f):
+ rlist = self.sort_list()
+ for r in rlist:
+ f.write(r.idhex+" ("+r.nickname+") Fail=")
+ if self.reason in r.reason_failed:
+ f.write(str(r.reason_failed[self.reason]))
+ else: f.write("0")
+ f.write(" Susp=")
+ if self.reason in r.reason_suspected:
+ f.write(str(r.reason_suspected[self.reason])+"\n")
+ else: f.write("0\n")
+
+ def add_r(self, r):
+ self.rlist[r] = 1
- def total_suspected(self):
- # suspected is disjoint from failed. The failed table
- # may not have an entry
- def notlambda(x, y):
- if self.reason in y.reason_suspected:
- if self.reason in y.reason_failed:
- return (x + y.reason_suspected[self.reason]
- + y.reason_failed[self.reason])
- else:
- return (x + y.reason_suspected[self.reason])
- else:
- if self.reason in y.reason_failed:
- return (x + y.reason_failed[self.reason])
- else: return x
- return reduce(notlambda, self.rlist.iterkeys(), 0)
+ def total_suspected(self):
+ # suspected is disjoint from failed. The failed table
+ # may not have an entry
+ def notlambda(x, y):
+ if self.reason in y.reason_suspected:
+ if self.reason in y.reason_failed:
+ return (x + y.reason_suspected[self.reason]
+ + y.reason_failed[self.reason])
+ else:
+ return (x + y.reason_suspected[self.reason])
+ else:
+ if self.reason in y.reason_failed:
+ return (x + y.reason_failed[self.reason])
+ else: return x
+ return reduce(notlambda, self.rlist.iterkeys(), 0)
- def total_failed(self):
- def notlambda(x, y):
- if self.reason in y.reason_failed:
- return (x + y.reason_failed[self.reason])
- else: return x
- return reduce(notlambda, self.rlist.iterkeys(), 0)
+ def total_failed(self):
+ def notlambda(x, y):
+ if self.reason in y.reason_failed:
+ return (x + y.reason_failed[self.reason])
+ else: return x
+ return reduce(notlambda, self.rlist.iterkeys(), 0)
class SuspectRouterList(ReasonRouterList):
- def __init__(self, reason): ReasonRouterList.__init__(self,reason)
-
- def sort_list(self):
- rlist = self.rlist.keys()
- rlist.sort(lambda x, y: cmp(y.reason_suspected[self.reason],
- x.reason_suspected[self.reason]))
- return rlist
+ def __init__(self, reason): ReasonRouterList.__init__(self,reason)
+
+ def sort_list(self):
+ rlist = self.rlist.keys()
+ rlist.sort(lambda x, y: cmp(y.reason_suspected[self.reason],
+ x.reason_suspected[self.reason]))
+ return rlist
- def _verify_suspected(self):
- return reduce(lambda x, y: x + y.reason_suspected[self.reason],
- self.rlist.iterkeys(), 0)
+ def _verify_suspected(self):
+ return reduce(lambda x, y: x + y.reason_suspected[self.reason],
+ self.rlist.iterkeys(), 0)
class FailedRouterList(ReasonRouterList):
- def __init__(self, reason): ReasonRouterList.__init__(self,reason)
+ def __init__(self, reason): ReasonRouterList.__init__(self,reason)
- def sort_list(self):
- rlist = self.rlist.keys()
- rlist.sort(lambda x, y: cmp(y.reason_failed[self.reason],
- x.reason_failed[self.reason]))
- return rlist
+ def sort_list(self):
+ rlist = self.rlist.keys()
+ rlist.sort(lambda x, y: cmp(y.reason_failed[self.reason],
+ x.reason_failed[self.reason]))
+ return rlist
- def _verify_failed(self):
- return reduce(lambda x, y: x + y.reason_failed[self.reason],
- self.rlist.iterkeys(), 0)
+ def _verify_failed(self):
+ return reduce(lambda x, y: x + y.reason_failed[self.reason],
+ self.rlist.iterkeys(), 0)
class StatsHandler(PathSupport.PathBuilder):
- def __init__(self, c, slmgr):
- PathBuilder.__init__(self, c, slmgr, StatsRouter)
- self.failed_reasons = {}
- self.suspect_reasons = {}
+ def __init__(self, c, slmgr):
+ PathBuilder.__init__(self, c, slmgr, StatsRouter)
+ self.failed_reasons = {}
+ self.suspect_reasons = {}
- def write_reasons(self, f, reasons, name):
- f.write("\n\n\t------------------- "+name+" -------------------\n")
- for rsn in reasons:
- f.write("\nReason="+rsn.reason+". Failed: "+str(rsn.total_failed())
- +", Suspected: "+str(rsn.total_suspected())+"\n")
- rsn.write_list(f)
+ def write_reasons(self, f, reasons, name):
+ f.write("\n\n\t------------------- "+name+" -------------------\n")
+ for rsn in reasons:
+ f.write("\nReason="+rsn.reason+". Failed: "+str(rsn.total_failed())
+ +", Suspected: "+str(rsn.total_suspected())+"\n")
+ rsn.write_list(f)
- def write_routers(self, f, rlist, name):
- f.write("\n\n\t------------------- "+name+" -------------------\n\n")
- for r in rlist:
- f.write(str(r))
+ def write_routers(self, f, rlist, name):
+ f.write("\n\n\t------------------- "+name+" -------------------\n\n")
+ for r in rlist:
+ f.write(str(r))
- def write_stats(self, filename):
- plog("DEBUG", "Writing stats")
- # Sanity check routers
- # TODO: all sanity checks should be turned off once its stable.
- for r in self.sorted_r: r.sanity_check()
+ def write_stats(self, filename):
+ plog("DEBUG", "Writing stats")
+ # Sanity check routers
+ # TODO: all sanity checks should be turned off once its stable.
+ for r in self.sorted_r: r.sanity_check()
- # Sanity check the router reason lists.
- for r in self.sorted_r:
- for rsn in r.reason_failed:
- if r not in self.failed_reasons[rsn].rlist:
- plog("ERROR", "Router missing from reason table")
- for rsn in r.reason_suspected:
- if r not in self.suspect_reasons[rsn].rlist:
- plog("ERROR", "Router missing from reason table")
+ # Sanity check the router reason lists.
+ for r in self.sorted_r:
+ for rsn in r.reason_failed:
+ if r not in self.failed_reasons[rsn].rlist:
+ plog("ERROR", "Router missing from reason table")
+ for rsn in r.reason_suspected:
+ if r not in self.suspect_reasons[rsn].rlist:
+ plog("ERROR", "Router missing from reason table")
- # Sanity check the lists the other way
- for rsn in self.failed_reasons.itervalues(): rsn._verify_failed()
- for rsn in self.suspect_reasons.itervalues(): rsn._verify_suspected()
+ # Sanity check the lists the other way
+ for rsn in self.failed_reasons.itervalues(): rsn._verify_failed()
+ for rsn in self.suspect_reasons.itervalues(): rsn._verify_suspected()
- f = file(filename, "w")
+ f = file(filename, "w")
- # FIXME: Print out key/legend header
- failed = copy.copy(self.sorted_r)
- failed.sort(lambda x, y:
- cmp(y.circ_failed+y.strm_failed,
- x.circ_failed+x.strm_failed))
- self.write_routers(f, failed, "Failed Counts")
+ # FIXME: Print out key/legend header
+ failed = copy.copy(self.sorted_r)
+ failed.sort(lambda x, y:
+ cmp(y.circ_failed+y.strm_failed,
+ x.circ_failed+x.strm_failed))
+ self.write_routers(f, failed, "Failed Counts")
- suspected = copy.copy(self.sorted_r)
- suspected.sort(lambda x, y: # Suspected includes failed
- cmp(y.circ_failed+y.strm_failed+y.circ_suspected+y.strm_suspected,
- x.circ_failed+x.strm_failed+x.circ_suspected+x.strm_suspected))
- self.write_routers(f, suspected, "Suspected Counts")
+ suspected = copy.copy(self.sorted_r)
+ suspected.sort(lambda x, y: # Suspected includes failed
+ cmp(y.circ_failed+y.strm_failed+y.circ_suspected+y.strm_suspected,
+ x.circ_failed+x.strm_failed+x.circ_suspected+x.strm_suspected))
+ self.write_routers(f, suspected, "Suspected Counts")
- fail_rate = copy.copy(failed)
- fail_rate.sort(lambda x, y:
- cmp(y.failed_per_hour(), x.failed_per_hour()))
- self.write_routers(f, fail_rate, "Fail Rates")
+ fail_rate = copy.copy(failed)
+ fail_rate.sort(lambda x, y:
+ cmp(y.failed_per_hour(), x.failed_per_hour()))
+ self.write_routers(f, fail_rate, "Fail Rates")
- suspect_rate = copy.copy(suspected)
- suspect_rate.sort(lambda x, y:
- cmp(y.suspected_per_hour(), x.suspected_per_hour()))
- self.write_routers(f, suspect_rate, "Suspect Rates")
+ suspect_rate = copy.copy(suspected)
+ suspect_rate.sort(lambda x, y:
+ cmp(y.suspected_per_hour(), x.suspected_per_hour()))
+ self.write_routers(f, suspect_rate, "Suspect Rates")
- # TODO: Sort by failed/selected and suspect/selected ratios
- # if we ever want to do non-uniform scanning..
+ # TODO: Sort by failed/selected and suspect/selected ratios
+ # if we ever want to do non-uniform scanning..
- susp_reasons = self.suspect_reasons.values()
- susp_reasons.sort(lambda x, y:
- cmp(y.total_suspected(), x.total_suspected()))
- self.write_reasons(f, susp_reasons, "Suspect Reasons")
+ susp_reasons = self.suspect_reasons.values()
+ susp_reasons.sort(lambda x, y:
+ cmp(y.total_suspected(), x.total_suspected()))
+ self.write_reasons(f, susp_reasons, "Suspect Reasons")
- fail_reasons = self.failed_reasons.values()
- fail_reasons.sort(lambda x, y:
- cmp(y.total_failed(), x.total_failed()))
- self.write_reasons(f, fail_reasons, "Failed Reasons")
- f.close()
+ fail_reasons = self.failed_reasons.values()
+ fail_reasons.sort(lambda x, y:
+ cmp(y.total_failed(), x.total_failed()))
+ self.write_reasons(f, fail_reasons, "Failed Reasons")
+ f.close()
- def reset_stats(self):
- for r in self.sorted_r:
- r.reset()
+ def reset_stats(self):
+ for r in self.sorted_r:
+ r.reset()
- # TODO: Use stream bandwidth events to implement reputation system
- # from
- # http://www.cs.colorado.edu/department/publications/reports/docs/CU-CS-1025-07.pdf
- # aha! the way to detect lying nodes as a client is to test
- # their bandwidths in tiers.. only make circuits of nodes of
- # the same bandwidth.. Then look for nodes with odd avg bandwidths
+ # TODO: Use stream bandwidth events to implement reputation system
+ # from
+ # http://www.cs.colorado.edu/department/publications/reports/docs/CU-CS-1025-07.pdf
+ # aha! the way to detect lying nodes as a client is to test
+ # their bandwidths in tiers.. only make circuits of nodes of
+ # the same bandwidth.. Then look for nodes with odd avg bandwidths
- def circ_status_event(self, c):
- if c.circ_id in self.circuits:
- # TODO: Hrmm, consider making this sane in TorCtl.
- if c.reason: lreason = c.reason
- else: lreason = "NONE"
- if c.remote_reason: rreason = c.remote_reason
- else: rreason = "NONE"
- reason = c.event_name+":"+c.status+":"+lreason+":"+rreason
- if c.status == "FAILED":
- # update selection count
- for r in self.circuits[c.circ_id].path: r.circ_chosen += 1
-
- if len(c.path)-1 < 0: start_f = 0
- else: start_f = len(c.path)-1
+ def circ_status_event(self, c):
+ if c.circ_id in self.circuits:
+ # TODO: Hrmm, consider making this sane in TorCtl.
+ if c.reason: lreason = c.reason
+ else: lreason = "NONE"
+ if c.remote_reason: rreason = c.remote_reason
+ else: rreason = "NONE"
+ reason = c.event_name+":"+c.status+":"+lreason+":"+rreason
+ if c.status == "FAILED":
+ # update selection count
+ for r in self.circuits[c.circ_id].path: r.circ_chosen += 1
+
+ if len(c.path)-1 < 0: start_f = 0
+ else: start_f = len(c.path)-1
- # Count failed
- for r in self.circuits[c.circ_id].path[start_f:len(c.path)+1]:
- r.circ_failed += 1
- if not reason in r.reason_failed:
- r.reason_failed[reason] = 1
- else: r.reason_failed[reason]+=1
- if reason not in self.failed_reasons:
- self.failed_reasons[reason] = FailedRouterList(reason)
- self.failed_reasons[reason].add_r(r)
+ # Count failed
+ for r in self.circuits[c.circ_id].path[start_f:len(c.path)+1]:
+ r.circ_failed += 1
+ if not reason in r.reason_failed:
+ r.reason_failed[reason] = 1
+ else: r.reason_failed[reason]+=1
+ if reason not in self.failed_reasons:
+ self.failed_reasons[reason] = FailedRouterList(reason)
+ self.failed_reasons[reason].add_r(r)
- for r in self.circuits[c.circ_id].path[len(c.path)+1:]:
- r.circ_uncounted += 1
+ for r in self.circuits[c.circ_id].path[len(c.path)+1:]:
+ r.circ_uncounted += 1
- # Don't count if failed was set this round, don't set
- # suspected..
- for r in self.circuits[c.circ_id].path[:start_f]:
- r.circ_suspected += 1
- if not reason in r.reason_suspected:
- r.reason_suspected[reason] = 1
- else: r.reason_suspected[reason]+=1
- if reason not in self.suspect_reasons:
- self.suspect_reasons[reason] = SuspectRouterList(reason)
- self.suspect_reasons[reason].add_r(r)
- elif c.status == "CLOSED":
- # Since PathBuilder deletes the circuit on a failed,
- # we only get this for a clean close
- # Update circ_chosen count
- for r in self.circuits[c.circ_id].path:
- r.circ_chosen += 1
-
- if lreason in ("REQUESTED", "FINISHED", "ORIGIN"):
- r.circ_succeeded += 1
- else:
- if not reason in r.reason_suspected:
- r.reason_suspected[reason] = 1
- else: r.reason_suspected[reason] += 1
- r.circ_suspected+= 1
- if reason not in self.suspect_reasons:
- self.suspect_reasons[reason] = SuspectRouterList(reason)
- self.suspect_reasons[reason].add_r(r)
- PathBuilder.circ_status_event(self, c)
-
- def stream_status_event(self, s):
- if s.strm_id in self.streams:
- # TODO: Hrmm, consider making this sane in TorCtl.
- if s.reason: lreason = s.reason
- else: lreason = "NONE"
- if s.remote_reason: rreason = s.remote_reason
- else: rreason = "NONE"
- reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
- if s.status in ("DETACHED", "FAILED", "CLOSED", "SUCCEEDED") \
- and not s.circ_id:
- # XXX: REMAPs can do this (normal). Also REASON=DESTROY (bug?)
- # Also timeouts.. Those should use the pending circ instead
- # of returning..
- plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
- PathBuilder.stream_status_event(self, s)
- return
- if s.status == "DETACHED" or s.status == "FAILED":
- # Update strm_chosen count
- # FIXME: use SENTRESOLVE/SENTCONNECT instead?
- for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
- # Update failed count,reason_failed for exit
- r = self.circuits[s.circ_id].exit
- if not reason in r.reason_failed: r.reason_failed[reason] = 1
- else: r.reason_failed[reason]+=1
- r.strm_failed += 1
- if reason not in self.failed_reasons:
- self.failed_reasons[reason] = FailedRouterList(reason)
- self.failed_reasons[reason].add_r(r)
-
- # If reason=timeout, update suspected for all
- if lreason in ("TIMEOUT", "INTERNAL", "TORPROTOCOL", "DESTROY"):
- for r in self.circuits[s.circ_id].path[:-1]:
- r.strm_suspected += 1
- if not reason in r.reason_suspected:
- r.reason_suspected[reason] = 1
- else: r.reason_suspected[reason]+=1
- if reason not in self.suspect_reasons:
- self.suspect_reasons[reason] = SuspectRouterList(reason)
- self.suspect_reasons[reason].add_r(r)
- else:
- for r in self.circuits[s.circ_id].path[:-1]:
- r.strm_uncounted += 1
- elif s.status == "CLOSED":
- # Always get both a closed and a failed..
- # - Check if the circuit exists still
- # XXX: Save both closed and failed reason in stream object
- if s.circ_id in self.circuits:
- # Update strm_chosen count
- for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
- if lreason in ("TIMEOUT", "INTERNAL", "TORPROTOCOL" "DESTROY"):
- for r in self.circuits[s.circ_id].path[:-1]:
- r.strm_suspected += 1
- if not reason in r.reason_suspected:
- r.reason_suspected[reason] = 1
- else: r.reason_suspected[reason]+=1
- if reason not in self.suspect_reasons:
- self.suspect_reasons[reason] = SuspectRouterList(reason)
- self.suspect_reasons[reason].add_r(r)
- else:
- for r in self.circuits[s.circ_id].path[:-1]:
- r.strm_uncounted += 1
-
- r = self.circuits[s.circ_id].exit
- if lreason == "DONE":
- r.strm_succeeded += 1
- else:
- if not reason in r.reason_failed:
- r.reason_failed[reason] = 1
- else: r.reason_failed[reason]+=1
- r.strm_failed += 1
- if reason not in self.failed_reasons:
- self.failed_reasons[reason] = FailedRouterList(reason)
- self.failed_reasons[reason].add_r(r)
+ # Don't count if failed was set this round, don't set
+ # suspected..
+ for r in self.circuits[c.circ_id].path[:start_f]:
+ r.circ_suspected += 1
+ if not reason in r.reason_suspected:
+ r.reason_suspected[reason] = 1
+ else: r.reason_suspected[reason]+=1
+ if reason not in self.suspect_reasons:
+ self.suspect_reasons[reason] = SuspectRouterList(reason)
+ self.suspect_reasons[reason].add_r(r)
+ elif c.status == "CLOSED":
+ # Since PathBuilder deletes the circuit on a failed,
+ # we only get this for a clean close
+ # Update circ_chosen count
+ for r in self.circuits[c.circ_id].path:
+ r.circ_chosen += 1
+
+ if lreason in ("REQUESTED", "FINISHED", "ORIGIN"):
+ r.circ_succeeded += 1
+ else:
+ if not reason in r.reason_suspected:
+ r.reason_suspected[reason] = 1
+ else: r.reason_suspected[reason] += 1
+ r.circ_suspected+= 1
+ if reason not in self.suspect_reasons:
+ self.suspect_reasons[reason] = SuspectRouterList(reason)
+ self.suspect_reasons[reason].add_r(r)
+ PathBuilder.circ_status_event(self, c)
+
+ def stream_status_event(self, s):
+ if s.strm_id in self.streams:
+ # TODO: Hrmm, consider making this sane in TorCtl.
+ if s.reason: lreason = s.reason
+ else: lreason = "NONE"
+ if s.remote_reason: rreason = s.remote_reason
+ else: rreason = "NONE"
+ reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
+ if s.status in ("DETACHED", "FAILED", "CLOSED", "SUCCEEDED") \
+ and not s.circ_id:
+ # XXX: REMAPs can do this (normal). Also REASON=DESTROY (bug?)
+ # Also timeouts.. Those should use the pending circ instead
+ # of returning..
+ plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
PathBuilder.stream_status_event(self, s)
+ return
+ if s.status == "DETACHED" or s.status == "FAILED":
+ # Update strm_chosen count
+ # FIXME: use SENTRESOLVE/SENTCONNECT instead?
+ for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
+ # Update failed count,reason_failed for exit
+ r = self.circuits[s.circ_id].exit
+ if not reason in r.reason_failed: r.reason_failed[reason] = 1
+ else: r.reason_failed[reason]+=1
+ r.strm_failed += 1
+ if reason not in self.failed_reasons:
+ self.failed_reasons[reason] = FailedRouterList(reason)
+ self.failed_reasons[reason].add_r(r)
- def ns_event(self, n):
- PathBuilder.ns_event(self, n)
- now = time.time()
- for ns in n.nslist:
- if not ns.idhex in self.routers:
- continue
- r = self.routers[ns.idhex]
- if "Running" in ns.flags:
- if not r.became_active_at:
- r.became_active_at = now
- r.total_hibernation_time += now - r.hibernated_at
- r.hibernated_at = 0
- else:
- if not r.hibernated_at:
- r.hibernated_at = now
- r.total_active_uptime += now - r.became_active_at
- r.became_active_at = 0
-
+ # If reason=timeout, update suspected for all
+ if lreason in ("TIMEOUT", "INTERNAL", "TORPROTOCOL", "DESTROY"):
+ for r in self.circuits[s.circ_id].path[:-1]:
+ r.strm_suspected += 1
+ if not reason in r.reason_suspected:
+ r.reason_suspected[reason] = 1
+ else: r.reason_suspected[reason]+=1
+ if reason not in self.suspect_reasons:
+ self.suspect_reasons[reason] = SuspectRouterList(reason)
+ self.suspect_reasons[reason].add_r(r)
+ else:
+ for r in self.circuits[s.circ_id].path[:-1]:
+ r.strm_uncounted += 1
+ elif s.status == "CLOSED":
+ # Always get both a closed and a failed..
+ # - Check if the circuit exists still
+ # XXX: Save both closed and failed reason in stream object
+ if s.circ_id in self.circuits:
+ # Update strm_chosen count
+ for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
+ if lreason in ("TIMEOUT", "INTERNAL", "TORPROTOCOL" "DESTROY"):
+ for r in self.circuits[s.circ_id].path[:-1]:
+ r.strm_suspected += 1
+ if not reason in r.reason_suspected:
+ r.reason_suspected[reason] = 1
+ else: r.reason_suspected[reason]+=1
+ if reason not in self.suspect_reasons:
+ self.suspect_reasons[reason] = SuspectRouterList(reason)
+ self.suspect_reasons[reason].add_r(r)
+ else:
+ for r in self.circuits[s.circ_id].path[:-1]:
+ r.strm_uncounted += 1
+
+ r = self.circuits[s.circ_id].exit
+ if lreason == "DONE":
+ r.strm_succeeded += 1
+ else:
+ if not reason in r.reason_failed:
+ r.reason_failed[reason] = 1
+ else: r.reason_failed[reason]+=1
+ r.strm_failed += 1
+ if reason not in self.failed_reasons:
+ self.failed_reasons[reason] = FailedRouterList(reason)
+ self.failed_reasons[reason].add_r(r)
+ PathBuilder.stream_status_event(self, s)
+ def ns_event(self, n):
+ PathBuilder.ns_event(self, n)
+ now = time.time()
+ for ns in n.nslist:
+ if not ns.idhex in self.routers:
+ continue
+ r = self.routers[ns.idhex]
+ if "Running" in ns.flags:
+ if not r.became_active_at:
+ r.became_active_at = now
+ r.total_hibernation_time += now - r.hibernated_at
+ r.hibernated_at = 0
+ else:
+ if not r.hibernated_at:
+ r.hibernated_at = now
+ r.total_active_uptime += now - r.became_active_at
+ r.became_active_at = 0
+
+
def clear_dns_cache(c):
- lines = c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
- for _,msg,more in lines:
- plog("DEBUG", msg)
+ lines = c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
+ for _,msg,more in lines:
+ plog("DEBUG", msg)
def commandloop(s, c, h):
- s.write("220 Welcome to the Tor Metatroller "+mt_version+"! Try HELP for Info\r\n\r\n")
- while 1:
- buf = s.readline()
- if not buf: break
-
- m = re.search(r"^(\S+)(?:\s(\S+))?", buf)
- if not m:
- s.write("500 Guido insults you for thinking '"+buf+
- "' could possibly be a metatroller command\r\n")
- continue
- (command, arg) = m.groups()
- if command == "GETLASTEXIT":
- # local assignment avoids need for lock w/ GIL
- # http://effbot.org/pyfaq/can-t-we-get-rid-of-the-global-interpreter-lock.htm
- # http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm
- le = h.last_exit
- if le:
- s.write("250 LASTEXIT=$"+le.idhex+" ("+le.nickname+") OK\r\n")
- else:
- s.write("250 LASTEXIT=0 (0) OK\r\n")
- elif command == "NEWEXIT" or command == "NEWNYM":
- clear_dns_cache(c)
- h.new_nym = True # GIL hack
- plog("DEBUG", "Got new nym")
- s.write("250 NEWNYM OK\r\n")
- elif command == "GETDNSEXIT":
- pass # TODO: Takes a hostname? Or prints most recent?
- elif command == "RESETSTATS":
- s.write("250 OK\r\n")
- elif command == "ORDEREXITS":
- try:
- if arg:
- order_exits = int(arg)
- def notlambda(sm): sm.order_exits=order_exits
- h.schedule_selmgr(notlambda)
- s.write("250 ORDEREXITS="+str(order_exits)+" OK\r\n")
- except ValueError:
- s.write("510 Integer expected\r\n")
- elif command == "USEALLEXITS":
- try:
- if arg:
- use_all_exits = int(arg)
- def notlambda(sm): sm.use_all_exits=use_all_exits
- h.schedule_selmgr(notlambda)
- s.write("250 USEALLEXITS="+str(use_all_exits)+" OK\r\n")
- except ValueError:
- s.write("510 Integer expected\r\n")
- elif command == "PRECIRCUITS":
- try:
- if arg:
- num_circuits = int(arg)
- def notlambda(pb): pb.num_circuits=num_circuits
- h.schedule_immediate(notlambda)
- s.write("250 PRECIRCUITS="+str(num_circuits)+" OK\r\n")
- except ValueError:
- s.write("510 Integer expected\r\n")
- elif command == "RESOLVEPORT":
- try:
- if arg:
- resolve_port = int(arg)
- def notlambda(pb): pb.resolve_port=resolve_port
- h.schedule_immediate(notlambda)
- s.write("250 RESOLVEPORT="+str(resolve_port)+" OK\r\n")
- except ValueError:
- s.write("510 Integer expected\r\n")
- elif command == "PERCENTFAST":
- try:
- if arg:
- percent_fast = int(arg)
- def notlambda(sm): sm.percent_fast=percent_fast
- h.schedule_selmgr(notlambda)
- s.write("250 PERCENTFAST="+str(percent_fast)+" OK\r\n")
- except ValueError:
- s.write("510 Integer expected\r\n")
- elif command == "PERCENTSKIP":
- try:
- if arg:
- percent_skip = int(arg)
- def notlambda(sm): sm.percent_skip=percent_skip
- h.schedule_selmgr(notlambda)
- s.write("250 PERCENTSKIP="+str(percent_skip)+" OK\r\n")
- except ValueError:
- s.write("510 Integer expected\r\n")
- elif command == "BWCUTOFF":
- try:
- if arg:
- min_bw = int(arg)
- def notlambda(sm): sm.min_bw=min_bw
- h.schedule_selmgr(notlambda)
- s.write("250 BWCUTOFF="+str(min_bw)+" OK\r\n")
- except ValueError:
- s.write("510 Integer expected\r\n")
- elif command == "UNIFORM":
- s.write("250 OK\r\n")
- elif command == "PATHLEN":
- try:
- if arg:
- pathlen = int(arg)
- # Technically this doesn't need a full selmgr update.. But
- # the user shouldn't be changing it very often..
- def notlambda(sm): sm.pathlen=pathlen
- h.schedule_selmgr(notlambda)
- s.write("250 PATHLEN="+str(pathlen)+" OK\r\n")
- except ValueError:
- s.write("510 Integer expected\r\n")
- elif command == "SETEXIT":
- if arg:
- # FIXME: Hrmm.. if teh user is a dumbass this will fail silently
- def notlambda(sm): sm.exit_name=arg
- h.schedule_selmgr(notlambda)
- s.write("250 OK\r\n")
- else:
- s.write("510 Argument expected\r\n")
- elif command == "GUARDNODES":
- s.write("250 OK\r\n")
- elif command == "SAVESTATS":
- if arg: filename = arg
- else: filename = "./data/stats-"+time.strftime("20%y-%m-%d-%H:%M:%S")
- def notlambda(this): this.write_stats(filename)
- h.schedule_low_prio(notlambda)
- s.write("250 OK\r\n")
- elif command == "RESETSTATS":
- def notlambda(this): this.reset_stats()
- h.schedule_low_prio(notlambda)
- s.write("250 OK\r\n")
- elif command == "HELP":
- s.write("250 OK\r\n")
- else:
- s.write("510 Guido slaps you for thinking '"+command+
- "' could possibly be a metatroller command\r\n")
- s.close()
+ s.write("220 Welcome to the Tor Metatroller "+mt_version+"! Try HELP for Info\r\n\r\n")
+ while 1:
+ buf = s.readline()
+ if not buf: break
+
+ m = re.search(r"^(\S+)(?:\s(\S+))?", buf)
+ if not m:
+ s.write("500 "+buf+" is not a metatroller command\r\n")
+ continue
+ (command, arg) = m.groups()
+ if command == "GETLASTEXIT":
+ # local assignment avoids need for lock w/ GIL
+ # http://effbot.org/pyfaq/can-t-we-get-rid-of-the-global-interpreter-lock.htm
+ # http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm
+ le = h.last_exit
+ if le:
+ s.write("250 LASTEXIT=$"+le.idhex+" ("+le.nickname+") OK\r\n")
+ else:
+ s.write("250 LASTEXIT=0 (0) OK\r\n")
+ elif command == "NEWEXIT" or command == "NEWNYM":
+ clear_dns_cache(c)
+ h.new_nym = True # GIL hack
+ plog("DEBUG", "Got new nym")
+ s.write("250 NEWNYM OK\r\n")
+ elif command == "GETDNSEXIT":
+ pass # TODO: Takes a hostname? Or prints most recent?
+ elif command == "RESETSTATS":
+ s.write("250 OK\r\n")
+ elif command == "ORDEREXITS":
+ try:
+ if arg:
+ order_exits = int(arg)
+ def notlambda(sm): sm.order_exits=order_exits
+ h.schedule_selmgr(notlambda)
+ s.write("250 ORDEREXITS="+str(order_exits)+" OK\r\n")
+ except ValueError:
+ s.write("510 Integer expected\r\n")
+ elif command == "USEALLEXITS":
+ try:
+ if arg:
+ use_all_exits = int(arg)
+ def notlambda(sm): sm.use_all_exits=use_all_exits
+ h.schedule_selmgr(notlambda)
+ s.write("250 USEALLEXITS="+str(use_all_exits)+" OK\r\n")
+ except ValueError:
+ s.write("510 Integer expected\r\n")
+ elif command == "PRECIRCUITS":
+ try:
+ if arg:
+ num_circuits = int(arg)
+ def notlambda(pb): pb.num_circuits=num_circuits
+ h.schedule_immediate(notlambda)
+ s.write("250 PRECIRCUITS="+str(num_circuits)+" OK\r\n")
+ except ValueError:
+ s.write("510 Integer expected\r\n")
+ elif command == "RESOLVEPORT":
+ try:
+ if arg:
+ resolve_port = int(arg)
+ def notlambda(pb): pb.resolve_port=resolve_port
+ h.schedule_immediate(notlambda)
+ s.write("250 RESOLVEPORT="+str(resolve_port)+" OK\r\n")
+ except ValueError:
+ s.write("510 Integer expected\r\n")
+ elif command == "PERCENTFAST":
+ try:
+ if arg:
+ percent_fast = int(arg)
+ def notlambda(sm): sm.percent_fast=percent_fast
+ h.schedule_selmgr(notlambda)
+ s.write("250 PERCENTFAST="+str(percent_fast)+" OK\r\n")
+ except ValueError:
+ s.write("510 Integer expected\r\n")
+ elif command == "PERCENTSKIP":
+ try:
+ if arg:
+ percent_skip = int(arg)
+ def notlambda(sm): sm.percent_skip=percent_skip
+ h.schedule_selmgr(notlambda)
+ s.write("250 PERCENTSKIP="+str(percent_skip)+" OK\r\n")
+ except ValueError:
+ s.write("510 Integer expected\r\n")
+ elif command == "BWCUTOFF":
+ try:
+ if arg:
+ min_bw = int(arg)
+ def notlambda(sm): sm.min_bw=min_bw
+ h.schedule_selmgr(notlambda)
+ s.write("250 BWCUTOFF="+str(min_bw)+" OK\r\n")
+ except ValueError:
+ s.write("510 Integer expected\r\n")
+ elif command == "UNIFORM":
+ s.write("250 OK\r\n")
+ elif command == "PATHLEN":
+ try:
+ if arg:
+ pathlen = int(arg)
+ # Technically this doesn't need a full selmgr update.. But
+ # the user shouldn't be changing it very often..
+ def notlambda(sm): sm.pathlen=pathlen
+ h.schedule_selmgr(notlambda)
+ s.write("250 PATHLEN="+str(pathlen)+" OK\r\n")
+ except ValueError:
+ s.write("510 Integer expected\r\n")
+ elif command == "SETEXIT":
+ if arg:
+ # FIXME: Hrmm.. if teh user is a dumbass this will fail silently
+ def notlambda(sm): sm.exit_name=arg
+ h.schedule_selmgr(notlambda)
+ s.write("250 OK\r\n")
+ else:
+ s.write("510 Argument expected\r\n")
+ elif command == "GUARDNODES":
+ s.write("250 OK\r\n")
+ elif command == "SAVESTATS":
+ if arg: filename = arg
+ else: filename = "./data/stats-"+time.strftime("20%y-%m-%d-%H:%M:%S")
+ def notlambda(this): this.write_stats(filename)
+ h.schedule_low_prio(notlambda)
+ s.write("250 OK\r\n")
+ elif command == "RESETSTATS":
+ def notlambda(this): this.reset_stats()
+ h.schedule_low_prio(notlambda)
+ s.write("250 OK\r\n")
+ elif command == "HELP":
+ s.write("250 OK\r\n")
+ else:
+ s.write("500 "+buf+" is not a metatroller command\r\n")
+ s.close()
def cleanup(c, s):
- c.set_option("__LeaveStreamsUnattached", "0")
- s.close()
+ c.set_option("__LeaveStreamsUnattached", "0")
+ s.close()
def listenloop(c, h):
- """Loop that handles metatroller commands"""
- srv = ListenSocket(meta_host, meta_port)
- atexit.register(cleanup, *(c, srv))
- while 1:
- client = srv.accept()
- if not client: break
- thr = threading.Thread(None, lambda: commandloop(BufSock(client), c, h))
- thr.run()
- srv.close()
+ """Loop that handles metatroller commands"""
+ srv = ListenSocket(meta_host, meta_port)
+ atexit.register(cleanup, *(c, srv))
+ while 1:
+ client = srv.accept()
+ if not client: break
+ thr = threading.Thread(None, lambda: commandloop(BufSock(client), c, h))
+ thr.run()
+ srv.close()
def startup():
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect((control_host,control_port))
- c = PathSupport.Connection(s)
- c.debug(file("control.log", "w"))
- c.authenticate()
- h = StatsHandler(c, __selmgr)
- c.set_event_handler(h)
- c.set_events([TorCtl.EVENT_TYPE.STREAM,
- TorCtl.EVENT_TYPE.NS,
- TorCtl.EVENT_TYPE.CIRC,
- TorCtl.EVENT_TYPE.NEWDESC], True)
- c.set_option("__LeaveStreamsUnattached", "1")
- return (c,h)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((control_host,control_port))
+ c = PathSupport.Connection(s)
+ c.debug(file("control.log", "w"))
+ c.authenticate()
+ h = StatsHandler(c, __selmgr)
+ c.set_event_handler(h)
+ c.set_events([TorCtl.EVENT_TYPE.STREAM,
+ TorCtl.EVENT_TYPE.NS,
+ TorCtl.EVENT_TYPE.CIRC,
+ TorCtl.EVENT_TYPE.NEWDESC], True)
+ c.set_option("__LeaveStreamsUnattached", "1")
+ return (c,h)
def main(argv):
- listenloop(*startup())
+ listenloop(*startup())
if __name__ == '__main__':
- main(sys.argv)
+ main(sys.argv)
Modified: torflow/trunk/nodemon.py
===================================================================
--- torflow/trunk/nodemon.py 2007-03-12 21:07:19 UTC (rev 9806)
+++ torflow/trunk/nodemon.py 2007-03-13 01:36:12 UTC (rev 9807)
@@ -15,26 +15,26 @@
import thread
class Reason:
- def __init__(self, reason): self.reason = reason
- ncircs = 0
- count = 0
+ def __init__(self, reason): self.reason = reason
+ ncircs = 0
+ count = 0
class RouterStats(TorCtl.Router):
- # Behold, a "Promotion Constructor"!
- # Also allows null superclasses! Python is awesome
- def __init__(self, r=None):
- if r:
- self.__dict__ = r.__dict__
- else:
- self.down = 0
- self.reasons = {} # For a fun time, move this outside __init__
- tot_ncircs = 0
- tot_count = 0
- tot_read = 0
- tot_wrote = 0
- running_read = 0
- running_wrote = 0
- tot_age = 0
+ # Behold, a "Promotion Constructor"!
+ # Also allows null superclasses! Python is awesome
+ def __init__(self, r=None):
+ if r:
+ self.__dict__ = r.__dict__
+ else:
+ self.down = 0
+ self.reasons = {} # For a fun time, move this outside __init__
+ tot_ncircs = 0
+ tot_count = 0
+ tot_read = 0
+ tot_wrote = 0
+ running_read = 0
+ running_wrote = 0
+ tot_age = 0
errors = {}
errors_lock = thread.allocate_lock()
@@ -47,179 +47,179 @@
max_detach = 3
def read_routers(c, nslist):
- bad_key = 0
- errors_lock.acquire()
- for ns in nslist:
- try:
- key_to_name[ns.idhex] = ns.nickname
- name_to_key[ns.nickname] = ns.idhex
- r = RouterStats(c.get_router(ns))
- if ns.nickname in errors:
- if errors[ns.nickname].idhex != r.idhex:
- plog("NOTICE", "Router "+r.nickname+" has multiple keys: "
- +errors[ns.nickname].idhex+" and "+r.idhex)
- errors[r.nickname] = r # XXX: We get names only from ORCONN :(
- except TorCtl.ErrorReply:
- bad_key += 1
- if "Running" in ns.flags:
- plog("INFO", "Running router "+ns.nickname+"="
- +ns.idhex+" has no descriptor")
- pass
- except:
- traceback.print_exception(*sys.exc_info())
- continue
- errors_lock.release()
+ bad_key = 0
+ errors_lock.acquire()
+ for ns in nslist:
+ try:
+ key_to_name[ns.idhex] = ns.nickname
+ name_to_key[ns.nickname] = ns.idhex
+ r = RouterStats(c.get_router(ns))
+ if ns.nickname in errors:
+ if errors[ns.nickname].idhex != r.idhex:
+ plog("NOTICE", "Router "+r.nickname+" has multiple keys: "
+ +errors[ns.nickname].idhex+" and "+r.idhex)
+ errors[r.nickname] = r # XXX: We get names only from ORCONN :(
+ except TorCtl.ErrorReply:
+ bad_key += 1
+ if "Running" in ns.flags:
+ plog("INFO", "Running router "+ns.nickname+"="
+ +ns.idhex+" has no descriptor")
+ pass
+ except:
+ traceback.print_exception(*sys.exc_info())
+ continue
+ errors_lock.release()
# Make eventhandler
class NodeHandler(TorCtl.EventHandler):
- def __init__(self, c):
- TorCtl.EventHandler.__init__(self)
- self.c = c
+ def __init__(self, c):
+ TorCtl.EventHandler.__init__(self)
+ self.c = c
- def or_conn_status_event(self, o):
- # XXX: Count all routers as one?
- if re.search(r"^\$", o.endpoint):
- if o.endpoint not in key_to_name:
- o.endpoint = "AllClients:HASH"
- else: o.endpoint = key_to_name[o.endpoint]
- elif o.endpoint not in name_to_key:
- plog("DEBUG", "IP? " + o.endpoint)
- o.endpoint = "AllClients:IP"
+ def or_conn_status_event(self, o):
+ # XXX: Count all routers as one?
+ if re.search(r"^\$", o.endpoint):
+ if o.endpoint not in key_to_name:
+ o.endpoint = "AllClients:HASH"
+ else: o.endpoint = key_to_name[o.endpoint]
+ elif o.endpoint not in name_to_key:
+ plog("DEBUG", "IP? " + o.endpoint)
+ o.endpoint = "AllClients:IP"
- if o.status == "READ" or o.status == "WRITE":
- #plog("DEBUG", "Read: " + str(read) + " wrote: " + str(wrote))
- errors_lock.acquire()
- if o.endpoint not in errors:
- plog("NOTICE", "Buh?? No "+o.endpoint)
- errors[o.endpoint] = RouterStats()
- errors[o.endpoint].nickname = o.endpoint
- errors[o.endpoint].running_read += o.read_bytes
- errors[o.endpoint].running_wrote += o.wrote_bytes
- errors_lock.release()
+ if o.status == "READ" or o.status == "WRITE":
+ #plog("DEBUG", "Read: " + str(read) + " wrote: " + str(wrote))
+ errors_lock.acquire()
+ if o.endpoint not in errors:
+ plog("NOTICE", "Buh?? No "+o.endpoint)
+ errors[o.endpoint] = RouterStats()
+ errors[o.endpoint].nickname = o.endpoint
+ errors[o.endpoint].running_read += o.read_bytes
+ errors[o.endpoint].running_wrote += o.wrote_bytes
+ errors_lock.release()
-
- if o.status == "CLOSED" or o.status == "FAILED":
- errors_lock.acquire()
- if o.endpoint not in errors:
- plog("NOTICE", "Buh?? No "+o.endpoint)
- errors[o.endpoint] = RouterStats()
- errors[o.endpoint].nickname = o.endpoint
- if o.status == "FAILED" and not errors[o.endpoint].down:
- o.status = o.status + "(Running)"
- o.reason = o.status+":"+o.reason
- if o.reason not in errors[o.endpoint].reasons:
- errors[o.endpoint].reasons[o.reason] = Reason(o.reason)
- errors[o.endpoint].reasons[o.reason].ncircs += o.ncircs
- errors[o.endpoint].reasons[o.reason].count += 1
- errors[o.endpoint].tot_ncircs += o.ncircs
- errors[o.endpoint].tot_count += 1
- if o.age: errors[o.endpoint].tot_age += o.age
- if o.read_bytes: errors[o.endpoint].tot_read += o.read_bytes
- if o.wrote_bytes: errors[o.endpoint].tot_wrote += o.wrote_bytes
- errors_lock.release()
- else: return
+
+ if o.status == "CLOSED" or o.status == "FAILED":
+ errors_lock.acquire()
+ if o.endpoint not in errors:
+ plog("NOTICE", "Buh?? No "+o.endpoint)
+ errors[o.endpoint] = RouterStats()
+ errors[o.endpoint].nickname = o.endpoint
+ if o.status == "FAILED" and not errors[o.endpoint].down:
+ o.status = o.status + "(Running)"
+ o.reason = o.status+":"+o.reason
+ if o.reason not in errors[o.endpoint].reasons:
+ errors[o.endpoint].reasons[o.reason] = Reason(o.reason)
+ errors[o.endpoint].reasons[o.reason].ncircs += o.ncircs
+ errors[o.endpoint].reasons[o.reason].count += 1
+ errors[o.endpoint].tot_ncircs += o.ncircs
+ errors[o.endpoint].tot_count += 1
+ if o.age: errors[o.endpoint].tot_age += o.age
+ if o.read_bytes: errors[o.endpoint].tot_read += o.read_bytes
+ if o.wrote_bytes: errors[o.endpoint].tot_wrote += o.wrote_bytes
+ errors_lock.release()
+ else: return
- if o.age: age = "AGE="+str(o.age)
- else: age = ""
- if o.read_bytes: read = "READ="+str(o.read_bytes)
- else: read = ""
- if o.wrote_bytes: wrote = "WRITTEN="+str(o.wrote_bytes)
- else: wrote = ""
- if o.reason: reason = "REASON="+o.reason
- else: reason = ""
- if o.ncircs: ncircs = "NCIRCS="+str(o.ncircs)
- else: ncircs = ""
- plog("DEBUG",
- " ".join((o.event_name, o.endpoint, o.status, age, read, wrote,
- reason, ncircs)))
+ if o.age: age = "AGE="+str(o.age)
+ else: age = ""
+ if o.read_bytes: read = "READ="+str(o.read_bytes)
+ else: read = ""
+ if o.wrote_bytes: wrote = "WRITTEN="+str(o.wrote_bytes)
+ else: wrote = ""
+ if o.reason: reason = "REASON="+o.reason
+ else: reason = ""
+ if o.ncircs: ncircs = "NCIRCS="+str(o.ncircs)
+ else: ncircs = ""
+ plog("DEBUG",
+ " ".join((o.event_name, o.endpoint, o.status, age, read, wrote,
+ reason, ncircs)))
- def ns_event(self, n):
- read_routers(self.c, n.nslist)
+ def ns_event(self, n):
+ read_routers(self.c, n.nslist)
- def new_desc_event(self, d):
- for i in d.idlist: # Is this too slow?
- read_routers(self.c, self.c.get_network_status("id/"+i))
+ def new_desc_event(self, d):
+ for i in d.idlist: # Is this too slow?
+ read_routers(self.c, self.c.get_network_status("id/"+i))
def bw_stats(key, f):
- routers = errors.values()
- routers.sort(lambda x,y: cmp(key(y), key(x))) # Python < 2.4 hack
+ routers = errors.values()
+ routers.sort(lambda x,y: cmp(key(y), key(x))) # Python < 2.4 hack
- for r in routers:
- f.write(r.nickname+"="+str(key(r))+"\n")
+ for r in routers:
+ f.write(r.nickname+"="+str(key(r))+"\n")
+
+ f.close()
+
- f.close()
-
-
def save_stats(s):
- errors_lock.acquire()
- # Yes yes, adding + 0.005 to age is bloody.. but who cares,
- # 1. Routers sorted by bytes read
- bw_stats(lambda x: x.tot_read, file("./data/r_by_rbytes", "w"))
- # 2. Routers sorted by bytes written
- bw_stats(lambda x: x.tot_wrote, file("./data/r_by_wbytes", "w"))
- # 3. Routers sorted by tot bytes
- bw_stats(lambda x: x.tot_read+x.tot_wrote, file("./data/r_by_tbytes", "w"))
- # 4. Routers sorted by downstream bw
- bw_stats(lambda x: x.tot_read/(x.tot_age+0.005),
- file("./data/r_by_rbw", "w"))
- # 5. Routers sorted by upstream bw
- bw_stats(lambda x: x.tot_wrote/(x.tot_age+0.005), file("./data/r_by_wbw", "w"))
- # 6. Routers sorted by total bw
- bw_stats(lambda x: (x.tot_read+x.tot_wrote)/(x.tot_age+0.005),
- file("./data/r_by_tbw", "w"))
+ errors_lock.acquire()
+ # Yes yes, adding + 0.005 to age is bloody.. but who cares,
+ # 1. Routers sorted by bytes read
+ bw_stats(lambda x: x.tot_read, file("./data/r_by_rbytes", "w"))
+ # 2. Routers sorted by bytes written
+ bw_stats(lambda x: x.tot_wrote, file("./data/r_by_wbytes", "w"))
+ # 3. Routers sorted by tot bytes
+ bw_stats(lambda x: x.tot_read+x.tot_wrote, file("./data/r_by_tbytes", "w"))
+ # 4. Routers sorted by downstream bw
+ bw_stats(lambda x: x.tot_read/(x.tot_age+0.005),
+ file("./data/r_by_rbw", "w"))
+ # 5. Routers sorted by upstream bw
+ bw_stats(lambda x: x.tot_wrote/(x.tot_age+0.005), file("./data/r_by_wbw", "w"))
+ # 6. Routers sorted by total bw
+ bw_stats(lambda x: (x.tot_read+x.tot_wrote)/(x.tot_age+0.005),
+ file("./data/r_by_tbw", "w"))
- bw_stats(lambda x: x.running_read,
- file("./data/r_by_rrunbytes", "w"))
- bw_stats(lambda x: x.running_wrote,
- file("./data/r_by_wrunbytes", "w"))
- bw_stats(lambda x: x.running_read+x.running_wrote,
- file("./data/r_by_trunbytes", "w"))
-
-
- f = file("./data/reasons", "w")
- routers = errors.values()
- def notlambda(x, y):
- if y.tot_ncircs or x.tot_ncircs:
- return cmp(y.tot_ncircs, x.tot_ncircs)
- else:
- return cmp(y.tot_count, x.tot_count)
- routers.sort(notlambda)
+ bw_stats(lambda x: x.running_read,
+ file("./data/r_by_rrunbytes", "w"))
+ bw_stats(lambda x: x.running_wrote,
+ file("./data/r_by_wrunbytes", "w"))
+ bw_stats(lambda x: x.running_read+x.running_wrote,
+ file("./data/r_by_trunbytes", "w"))
+
+
+ f = file("./data/reasons", "w")
+ routers = errors.values()
+ def notlambda(x, y):
+ if y.tot_ncircs or x.tot_ncircs:
+ return cmp(y.tot_ncircs, x.tot_ncircs)
+ else:
+ return cmp(y.tot_count, x.tot_count)
+ routers.sort(notlambda)
- for r in routers:
- f.write(r.nickname+" " +str(r.tot_ncircs)+"/"+str(r.tot_count)+"\n")
- for reason in r.reasons.itervalues():
- f.write("\t"+reason.reason+" "+str(reason.ncircs)+
- "/"+str(reason.count)+"\n")
+ for r in routers:
+ f.write(r.nickname+" " +str(r.tot_ncircs)+"/"+str(r.tot_count)+"\n")
+ for reason in r.reasons.itervalues():
+ f.write("\t"+reason.reason+" "+str(reason.ncircs)+
+ "/"+str(reason.count)+"\n")
- errors_lock.release()
- f.close()
- s.enter(60, 1, save_stats, (s,))
+ errors_lock.release()
+ f.close()
+ s.enter(60, 1, save_stats, (s,))
def startmon(c):
- global key_to_name, name_to_key
- nslist = c.get_network_status()
- read_routers(c, nslist)
-
- s=sched.scheduler(time.time, time.sleep)
+ global key_to_name, name_to_key
+ nslist = c.get_network_status()
+ read_routers(c, nslist)
+
+ s=sched.scheduler(time.time, time.sleep)
- s.enter(60, 1, save_stats, (s,))
- s.run();
+ s.enter(60, 1, save_stats, (s,))
+ s.run();
def main(argv):
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect((control_host,control_port))
- c = TorCtl.get_connection(s)
- c.set_event_handler(NodeHandler(c))
- c.launch_thread()
- c.authenticate()
- c.set_events([TorCtl.EVENT_TYPE.ORCONN,
- TorCtl.EVENT_TYPE.NS,
- TorCtl.EVENT_TYPE.NEWDESC], True)
- startmon(c)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((control_host,control_port))
+ c = TorCtl.get_connection(s)
+ c.set_event_handler(NodeHandler(c))
+ c.launch_thread()
+ c.authenticate()
+ c.set_events([TorCtl.EVENT_TYPE.ORCONN,
+ TorCtl.EVENT_TYPE.NS,
+ TorCtl.EVENT_TYPE.NEWDESC], True)
+ startmon(c)
if __name__ == '__main__':
- main(sys.argv)
+ main(sys.argv)