[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r18570: {torctl} Refactor new consensus tracking code into a TorCtl.ConsensusTracker (torctl/trunk/python/TorCtl)



Author: mikeperry
Date: 2009-02-16 06:45:43 -0500 (Mon, 16 Feb 2009)
New Revision: 18570

Modified:
   torctl/trunk/python/TorCtl/PathSupport.py
   torctl/trunk/python/TorCtl/StatsSupport.py
   torctl/trunk/python/TorCtl/TorCtl.py
Log:

Refactor new consensus tracking code into a
TorCtl.ConsensusTracker.



Modified: torctl/trunk/python/TorCtl/PathSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/PathSupport.py	2009-02-16 11:23:10 UTC (rev 18569)
+++ torctl/trunk/python/TorCtl/PathSupport.py	2009-02-16 11:45:43 UTC (rev 18570)
@@ -48,7 +48,6 @@
 import Queue
 import time
 import TorUtil
-import sets
 from TorUtil import *
 
 __all__ = ["NodeRestrictionList", "PathRestrictionList",
@@ -1025,7 +1024,7 @@
 # node reliability stats for normal usage without us attaching streams
 # Can use __metaclass__ and type
 
-class PathBuilder(TorCtl.EventHandler):
+class PathBuilder(TorCtl.ConsensusTracker):
   """
   PathBuilder implementation. Handles circuit construction, subject
   to the constraints of the SelectionManager selmgr.
@@ -1038,27 +1037,20 @@
     """Constructor. 'c' is a Connection, 'selmgr' is a SelectionManager,
     and 'RouterClass' is a class that inherits from Router and is used
     to create annotated Routers."""
-    TorCtl.EventHandler.__init__(self)
-    self.c = c
-    nslist = c.get_network_status()
+    TorCtl.ConsensusTracker.__init__(self, c, RouterClass)
     self.last_exit = None
     self.new_nym = False
     self.resolve_port = 0
     self.num_circuits = 1
-    self.RouterClass = RouterClass
-    self.sorted_r = []
-    self.name_to_key = {}
-    self.routers = {}
     self.circuits = {}
     self.streams = {}
-    self.read_routers(nslist)
     self.selmgr = selmgr
     self.selmgr.reconfigure(self.sorted_r)
     self.imm_jobs = Queue.Queue()
     self.low_prio_jobs = Queue.Queue()
     self.run_all_jobs = False
     self.do_reconfigure = False
-    plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(nslist))+" routers")
+    plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(self.consensus))+" routers")
 
   def schedule_immediate(self, job):
     """
@@ -1119,44 +1111,6 @@
       delay_job = self.low_prio_jobs.get_nowait()
       delay_job(self)
 
-  def read_routers(self, nslist):
-    old_idhexes = sets.Set(self.routers.keys())
-    new_idhexes = sets.Set(map(lambda ns: ns.idhex, nslist))
-    removed_idhexes = old_idhexes - new_idhexes
-    removed_idhexes.union_update(sets.Set(map(lambda ns: ns.idhex,
-                      filter(lambda ns: "Running" not in ns.flags, nslist))))
-
-    for i in removed_idhexes:
-      if i not in self.routers: continue
-      self.routers[i].down = True
-      self.routers[i].flags.remove("Running")
-      if self.routers[i].refcount == 0:
-        self.routers[i].deleted = True
-        plog("INFO", "Expiring non-running router "+i)
-        self.sorted_r.remove(self.routers[i])
-        del self.routers[i]
-      else:
-        plog("INFO", "Postponing expiring non-running router "+i)
-        self.routers[i].deleted = True
-
-    routers = self.c.read_routers(nslist)
-    for r in routers:
-      self.name_to_key[r.nickname] = "$"+r.idhex
-      if r.idhex in self.routers:
-        if self.routers[r.idhex].nickname != r.nickname:
-          plog("NOTICE", "Router "+r.idhex+" changed names from "
-             +self.routers[r.idhex].nickname+" to "+r.nickname)
-        # Must do IN-PLACE update to keep all the refs to this router
-        # valid and current (especially for stats)
-        self.routers[r.idhex].update_to(r)
-      else:
-        rc = self.RouterClass(r)
-        self.routers[rc.idhex] = rc
-
-    self.sorted_r = filter(lambda r: not r.down, self.routers.itervalues())
-    self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
-    for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
-
   def build_path(self):
     """ Get a path from the SelectionManager's PathSelector, can be used 
         e.g. for generating paths without actually creating any circuits """
@@ -1395,12 +1349,14 @@
       self.streams[s.strm_id].bytes_read += s.bytes_read
       self.streams[s.strm_id].bytes_written += s.bytes_written
 
-  def newconsensus_event(self, n):
-    self.read_routers(n.nslist)
+  def new_consensus_event(self, n):
+    TorCtl.ConsensusTracker.new_desc_event(self, n)
     self.selmgr.path_selector.rebuild_gens(self.sorted_r)
-    plog("DEBUG", "Read " + str(len(n.nslist))+" NC => " 
-       + str(len(self.sorted_r)) + " routers")
-  
+
+  def new_desc_event(self, d):
+    if TorCtl.ConsensusTracker.new_desc_event(self, d):
+      self.selmgr.path_selector.rebuild_gens(self.sorted_r)
+
   def bandwidth_event(self, b): pass # For heartbeat only..
 
 ################### CircuitHandler #############################

Modified: torctl/trunk/python/TorCtl/StatsSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/StatsSupport.py	2009-02-16 11:23:10 UTC (rev 18569)
+++ torctl/trunk/python/TorCtl/StatsSupport.py	2009-02-16 11:45:43 UTC (rev 18570)
@@ -677,21 +677,30 @@
           self.count_stream_reason_failed(s, reason)
     PathBuilder.stream_status_event(self, s)
 
-  def newconsensus_event(self, n):
-    PathBuilder.newconsensus_event(self, n)
+  def _check_hibernation(self, r, now):
+    if r.down:
+      if not r.hibernated_at:
+        r.hibernated_at = now
+        r.total_active_uptime += now - r.became_active_at
+      r.became_active_at = 0
+    else:
+      if not r.became_active_at:
+        r.became_active_at = now
+        r.total_hibernation_time += now - r.hibernated_at
+      r.hibernated_at = 0
+
+  def new_consensus_event(self, n):
+    PathBuilder.new_consensus_event(self, n)
     now = n.arrived_at
     for ns in n.nslist:
-      if not ns.idhex in self.routers:
-        continue
-      r = self.routers[ns.idhex]
-      if r.down:
-        if not r.hibernated_at:
-          r.hibernated_at = now
-          r.total_active_uptime += now - r.became_active_at
-        r.became_active_at = 0
-      else:
-        if not r.became_active_at:
-          r.became_active_at = now
-          r.total_hibernation_time += now - r.hibernated_at
-        r.hibernated_at = 0
+      if not ns.idhex in self.routers: continue
+      self._check_hibernation(self.routers[ns.idhex], now)
 
+  def new_desc_event(self, d):
+    if PathBuilder.new_desc_event(self, d):
+      now = d.arrived_at
+      for i in d.idlist:
+        if not i in self.routers: continue
+        self._check_hibernation(self.routers[i], now)
+      
+   

Modified: torctl/trunk/python/TorCtl/TorCtl.py
===================================================================
--- torctl/trunk/python/TorCtl/TorCtl.py	2009-02-16 11:23:10 UTC (rev 18569)
+++ torctl/trunk/python/TorCtl/TorCtl.py	2009-02-16 11:45:43 UTC (rev 18570)
@@ -27,7 +27,7 @@
            "EventHandler", "DebugEventHandler", "NetworkStatusEvent",
            "NewDescEvent", "CircuitEvent", "StreamEvent", "ORConnEvent",
            "StreamBwEvent", "LogEvent", "AddrMapEvent", "BWEvent",
-           "UnknownEvent" ]
+           "UnknownEvent", "ConsensusTracker" ]
 
 import os
 import re
@@ -42,6 +42,8 @@
 import types
 import time
 import sha
+import sets
+import copy
 from TorUtil import *
 
 # Types of "EVENT" message.
@@ -257,7 +259,13 @@
   created from the parsed fields, or can be built from a
   descriptor+NetworkStatus 
   """     
-  def __init__(self, idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime):
+  def __init__(self, *args):
+    if len(args) == 1:
+      for i in args[0].__dict__:
+        self.__dict__[i] =  copy.deepcopy(args[0].__dict__[i])
+      return
+    else:
+      (idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime) = args
     self.idhex = idhex
     self.nickname = name
     self.bw = bw
@@ -694,8 +702,10 @@
     sig_start = desc.find("\nrouter-signature\n")+len("\nrouter-signature\n")
     fp_base64 = sha.sha(desc[:sig_start]).digest().encode("base64")[:-2]
     if fp_base64 != ns.orhash:
-      plog("WARN", "Router descriptor for "+ns.idhex+" does not match ns fingerprint ("+ns.orhash+" vs "+fp_base64+")")
-    return Router.build_from_desc(desc.split("\n"), ns)
+      plog("NOTICE", "Router descriptor for "+ns.idhex+" does not match ns fingerprint ("+ns.orhash+" vs "+fp_base64+")")
+      return None
+    else:
+      return Router.build_from_desc(desc.split("\n"), ns)
 
 
   def read_routers(self, nslist):
@@ -707,7 +717,7 @@
     for ns in nslist:
       try:
         r = self.get_router(ns)
-        new.append(r)
+        if r: new.append(r)
       except ErrorReply:
         bad_key += 1
         if "Running" in ns.flags:
@@ -873,7 +883,7 @@
       "NEWDESC" : self.new_desc_event,
       "ADDRMAP" : self.address_mapped_event,
       "NS" : self.ns_event,
-      "NEWCONSENSUS" : self.newconsensus_event
+      "NEWCONSENSUS" : self.new_consensus_event
       }
 
   def _handle1(self, timestamp, lines):
@@ -1042,7 +1052,7 @@
   def ns_event(self, event):
     raise NotImplemented()
 
-  def newconsensus_event(self, event):
+  def new_consensus_event(self, event):
     raise NotImplemented()
 
   def address_mapped_event(self, event):
@@ -1051,7 +1061,98 @@
     """
     raise NotImplemented()
 
+class ConsensusTracker(EventHandler):
+  """
+  A ConsensusTracker is an EventHandler that tracks the current
+  consensus of Tor in self.consensus, self.routers and self.sorted_r
+  """
+  def __init__(self, c, RouterClass=Router):
+    EventHandler.__init__(self)
+    c.set_event_handler(self)
+    self.c = c
+    self.consensus = {}
+    self.routers = {}
+    self.sorted_r = []
+    self.name_to_key = {}
+    self.RouterClass = RouterClass
+    self.update_consensus()
+    self._read_routers(self.consensus.values())
 
+  def _read_routers(self, nslist):
+    old_idhexes = sets.Set(self.routers.keys())
+    new_idhexes = sets.Set(map(lambda ns: ns.idhex, nslist))
+    removed_idhexes = old_idhexes - new_idhexes
+    removed_idhexes.union_update(sets.Set(map(lambda ns: ns.idhex,
+                      filter(lambda ns: "Running" not in ns.flags, nslist))))
+
+    for i in removed_idhexes:
+      if i not in self.routers: continue
+      self.routers[i].down = True
+      self.routers[i].flags.remove("Running")
+      if self.routers[i].refcount == 0:
+        self.routers[i].deleted = True
+        plog("INFO", "Expiring non-running router "+i)
+        self.sorted_r.remove(self.routers[i])
+        del self.routers[i]
+      else:
+        plog("INFO", "Postponing expiring non-running router "+i)
+        self.routers[i].deleted = True
+
+    routers = self.c.read_routers(nslist)
+    for r in routers:
+      self.name_to_key[r.nickname] = "$"+r.idhex
+      if r.idhex in self.routers:
+        if self.routers[r.idhex].nickname != r.nickname:
+          plog("NOTICE", "Router "+r.idhex+" changed names from "
+             +self.routers[r.idhex].nickname+" to "+r.nickname)
+        # Must do IN-PLACE update to keep all the refs to this router
+        # valid and current (especially for stats)
+        self.routers[r.idhex].update_to(r)
+      else:
+        rc = self.RouterClass(r)
+        self.routers[rc.idhex] = rc
+
+    self.sorted_r = filter(lambda r: not r.down, self.routers.itervalues())
+    self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
+    for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
+
+  def _update_consensus(self, nslist):
+    self.consensus = {}
+    for n in nslist:
+      self.consensus[n.idhex] = n
+   
+  def update_consensus(self):
+    self._update_consensus(self.c.get_network_status())
+
+  def new_consensus_event(self, n):
+    self._update_consensus(n.nslist)
+    self._read_routers(self.consensus.values())
+    plog("DEBUG", "Read " + str(len(n.nslist))+" NC => " 
+       + str(len(self.sorted_r)) + " routers")
+ 
+  def new_desc_event(self, d):
+    update = False
+    for i in d.idlist:
+      ns = self.c.get_network_status("id/"+i)
+      r = self.c.read_router(ns)
+      if r and r.idhex in self.consensus:
+        if ns.orhash != self.consensus[r.idhex].orhash:
+          plog("WARN", "Getinfo and consensus disagree for "+r.idhex)
+          continue
+        update = True
+        if r.idhex in self.routers:
+          self.routers[r.idhex].update_to(r)
+        else:
+          self.routers[r.idhex] = self.RouterClass(r)
+    if update:
+      self.sorted_r = filter(lambda r: not r.down, self.routers.itervalues())
+      self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
+      for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
+    plog("DEBUG", "Read " + str(len(d.idlist))+" ND => " 
+       + str(len(self.sorted_r)) + " routers. Update: "+str(update))
+    return update
+
+
 class DebugEventHandler(EventHandler):
   """Trivial debug event handler: reassembles all parsed events to stdout."""
   def circ_status_event(self, circ_event): # CircuitEvent()
@@ -1081,7 +1182,7 @@
         ns.updated.isoformat(), ns.ip, str(ns.orport),
         str(ns.dirport), " ".join(ns.flags)))
 
-  def newconsensus_event(self, nc_event):
+  def new_consensus_event(self, nc_event):
     self.ns_event(nc_event)
 
   def new_desc_event(self, newdesc_event):