[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r18570: {torctl} Refactor new consensensus tracking code into a TorCtl.Consen (torctl/trunk/python/TorCtl)
Author: mikeperry
Date: 2009-02-16 06:45:43 -0500 (Mon, 16 Feb 2009)
New Revision: 18570
Modified:
torctl/trunk/python/TorCtl/PathSupport.py
torctl/trunk/python/TorCtl/StatsSupport.py
torctl/trunk/python/TorCtl/TorCtl.py
Log:
Refactor new consensensus tracking code into a
TorCtl.ConsensusTracker.
Modified: torctl/trunk/python/TorCtl/PathSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/PathSupport.py 2009-02-16 11:23:10 UTC (rev 18569)
+++ torctl/trunk/python/TorCtl/PathSupport.py 2009-02-16 11:45:43 UTC (rev 18570)
@@ -48,7 +48,6 @@
import Queue
import time
import TorUtil
-import sets
from TorUtil import *
__all__ = ["NodeRestrictionList", "PathRestrictionList",
@@ -1025,7 +1024,7 @@
# node reliability stats for normal usage without us attaching streams
# Can use __metaclass__ and type
-class PathBuilder(TorCtl.EventHandler):
+class PathBuilder(TorCtl.ConsensusTracker):
"""
PathBuilder implementation. Handles circuit construction, subject
to the constraints of the SelectionManager selmgr.
@@ -1038,27 +1037,20 @@
"""Constructor. 'c' is a Connection, 'selmgr' is a SelectionManager,
and 'RouterClass' is a class that inherits from Router and is used
to create annotated Routers."""
- TorCtl.EventHandler.__init__(self)
- self.c = c
- nslist = c.get_network_status()
+ TorCtl.ConsensusTracker.__init__(self, c, RouterClass)
self.last_exit = None
self.new_nym = False
self.resolve_port = 0
self.num_circuits = 1
- self.RouterClass = RouterClass
- self.sorted_r = []
- self.name_to_key = {}
- self.routers = {}
self.circuits = {}
self.streams = {}
- self.read_routers(nslist)
self.selmgr = selmgr
self.selmgr.reconfigure(self.sorted_r)
self.imm_jobs = Queue.Queue()
self.low_prio_jobs = Queue.Queue()
self.run_all_jobs = False
self.do_reconfigure = False
- plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(nslist))+" routers")
+ plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(self.consensus))+" routers")
def schedule_immediate(self, job):
"""
@@ -1119,44 +1111,6 @@
delay_job = self.low_prio_jobs.get_nowait()
delay_job(self)
- def read_routers(self, nslist):
- old_idhexes = sets.Set(self.routers.keys())
- new_idhexes = sets.Set(map(lambda ns: ns.idhex, nslist))
- removed_idhexes = old_idhexes - new_idhexes
- removed_idhexes.union_update(sets.Set(map(lambda ns: ns.idhex,
- filter(lambda ns: "Running" not in ns.flags, nslist))))
-
- for i in removed_idhexes:
- if i not in self.routers: continue
- self.routers[i].down = True
- self.routers[i].flags.remove("Running")
- if self.routers[i].refcount == 0:
- self.routers[i].deleted = True
- plog("INFO", "Expiring non-running router "+i)
- self.sorted_r.remove(self.routers[i])
- del self.routers[i]
- else:
- plog("INFO", "Postponing expiring non-running router "+i)
- self.routers[i].deleted = True
-
- routers = self.c.read_routers(nslist)
- for r in routers:
- self.name_to_key[r.nickname] = "$"+r.idhex
- if r.idhex in self.routers:
- if self.routers[r.idhex].nickname != r.nickname:
- plog("NOTICE", "Router "+r.idhex+" changed names from "
- +self.routers[r.idhex].nickname+" to "+r.nickname)
- # Must do IN-PLACE update to keep all the refs to this router
- # valid and current (especially for stats)
- self.routers[r.idhex].update_to(r)
- else:
- rc = self.RouterClass(r)
- self.routers[rc.idhex] = rc
-
- self.sorted_r = filter(lambda r: not r.down, self.routers.itervalues())
- self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
- for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
-
def build_path(self):
""" Get a path from the SelectionManager's PathSelector, can be used
e.g. for generating paths without actually creating any circuits """
@@ -1395,12 +1349,14 @@
self.streams[s.strm_id].bytes_read += s.bytes_read
self.streams[s.strm_id].bytes_written += s.bytes_written
- def newconsensus_event(self, n):
- self.read_routers(n.nslist)
+ def new_consensus_event(self, n):
+ TorCtl.ConsensusTracker.new_desc_event(self, n)
self.selmgr.path_selector.rebuild_gens(self.sorted_r)
- plog("DEBUG", "Read " + str(len(n.nslist))+" NC => "
- + str(len(self.sorted_r)) + " routers")
-
+
+ def new_desc_event(self, d):
+ if TorCtl.ConsensusTracker.new_desc_event(self, d):
+ self.selmgr.path_selector.rebuild_gens(self.sorted_r)
+
def bandwidth_event(self, b): pass # For heartbeat only..
################### CircuitHandler #############################
Modified: torctl/trunk/python/TorCtl/StatsSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/StatsSupport.py 2009-02-16 11:23:10 UTC (rev 18569)
+++ torctl/trunk/python/TorCtl/StatsSupport.py 2009-02-16 11:45:43 UTC (rev 18570)
@@ -677,21 +677,30 @@
self.count_stream_reason_failed(s, reason)
PathBuilder.stream_status_event(self, s)
- def newconsensus_event(self, n):
- PathBuilder.newconsensus_event(self, n)
+ def _check_hibernation(self, r, now):
+ if r.down:
+ if not r.hibernated_at:
+ r.hibernated_at = now
+ r.total_active_uptime += now - r.became_active_at
+ r.became_active_at = 0
+ else:
+ if not r.became_active_at:
+ r.became_active_at = now
+ r.total_hibernation_time += now - r.hibernated_at
+ r.hibernated_at = 0
+
+ def new_consensus_event(self, n):
+ PathBuilder.new_consensus_event(self, n)
now = n.arrived_at
for ns in n.nslist:
- if not ns.idhex in self.routers:
- continue
- r = self.routers[ns.idhex]
- if r.down:
- if not r.hibernated_at:
- r.hibernated_at = now
- r.total_active_uptime += now - r.became_active_at
- r.became_active_at = 0
- else:
- if not r.became_active_at:
- r.became_active_at = now
- r.total_hibernation_time += now - r.hibernated_at
- r.hibernated_at = 0
+ if not ns.idhex in self.routers: continue
+ self._check_hibernation(self.routers[ns.idhex], now)
+ def new_desc_event(self, d):
+ if PathBuilder.new_desc_event(self, d):
+ now = d.arrived_at
+ for i in d.idlist:
+ if not i in self.routers: continue
+ self._check_hibernation(self.routers[i], now)
+
+
Modified: torctl/trunk/python/TorCtl/TorCtl.py
===================================================================
--- torctl/trunk/python/TorCtl/TorCtl.py 2009-02-16 11:23:10 UTC (rev 18569)
+++ torctl/trunk/python/TorCtl/TorCtl.py 2009-02-16 11:45:43 UTC (rev 18570)
@@ -27,7 +27,7 @@
"EventHandler", "DebugEventHandler", "NetworkStatusEvent",
"NewDescEvent", "CircuitEvent", "StreamEvent", "ORConnEvent",
"StreamBwEvent", "LogEvent", "AddrMapEvent", "BWEvent",
- "UnknownEvent" ]
+ "UnknownEvent", "ConsensusTracker" ]
import os
import re
@@ -42,6 +42,8 @@
import types
import time
import sha
+import sets
+import copy
from TorUtil import *
# Types of "EVENT" message.
@@ -257,7 +259,13 @@
created from the parsed fields, or can be built from a
descriptor+NetworkStatus
"""
- def __init__(self, idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime):
+ def __init__(self, *args):
+ if len(args) == 1:
+ for i in args[0].__dict__:
+ self.__dict__[i] = copy.deepcopy(args[0].__dict__[i])
+ return
+ else:
+ (idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime) = args
self.idhex = idhex
self.nickname = name
self.bw = bw
@@ -694,8 +702,10 @@
sig_start = desc.find("\nrouter-signature\n")+len("\nrouter-signature\n")
fp_base64 = sha.sha(desc[:sig_start]).digest().encode("base64")[:-2]
if fp_base64 != ns.orhash:
- plog("WARN", "Router descriptor for "+ns.idhex+" does not match ns fingerprint ("+ns.orhash+" vs "+fp_base64+")")
- return Router.build_from_desc(desc.split("\n"), ns)
+ plog("NOTICE", "Router descriptor for "+ns.idhex+" does not match ns fingerprint ("+ns.orhash+" vs "+fp_base64+")")
+ return None
+ else:
+ return Router.build_from_desc(desc.split("\n"), ns)
def read_routers(self, nslist):
@@ -707,7 +717,7 @@
for ns in nslist:
try:
r = self.get_router(ns)
- new.append(r)
+ if r: new.append(r)
except ErrorReply:
bad_key += 1
if "Running" in ns.flags:
@@ -873,7 +883,7 @@
"NEWDESC" : self.new_desc_event,
"ADDRMAP" : self.address_mapped_event,
"NS" : self.ns_event,
- "NEWCONSENSUS" : self.newconsensus_event
+ "NEWCONSENSUS" : self.new_consensus_event
}
def _handle1(self, timestamp, lines):
@@ -1042,7 +1052,7 @@
def ns_event(self, event):
raise NotImplemented()
- def newconsensus_event(self, event):
+ def new_consensus_event(self, event):
raise NotImplemented()
def address_mapped_event(self, event):
@@ -1051,7 +1061,98 @@
"""
raise NotImplemented()
+class ConsensusTracker(EventHandler):
+ """
+ A ConsensusTracker is an EventHandler that tracks the current
+ consensus of Tor in self.consensus, self.routers and self.sorted_r
+ """
+ def __init__(self, c, RouterClass=Router):
+ EventHandler.__init__(self)
+ c.set_event_handler(self)
+ self.c = c
+ self.consensus = {}
+ self.routers = {}
+ self.sorted_r = []
+ self.name_to_key = {}
+ self.RouterClass = RouterClass
+ self.update_consensus()
+ self._read_routers(self.consensus.values())
+ def _read_routers(self, nslist):
+ old_idhexes = sets.Set(self.routers.keys())
+ new_idhexes = sets.Set(map(lambda ns: ns.idhex, nslist))
+ removed_idhexes = old_idhexes - new_idhexes
+ removed_idhexes.union_update(sets.Set(map(lambda ns: ns.idhex,
+ filter(lambda ns: "Running" not in ns.flags, nslist))))
+
+ for i in removed_idhexes:
+ if i not in self.routers: continue
+ self.routers[i].down = True
+ self.routers[i].flags.remove("Running")
+ if self.routers[i].refcount == 0:
+ self.routers[i].deleted = True
+ plog("INFO", "Expiring non-running router "+i)
+ self.sorted_r.remove(self.routers[i])
+ del self.routers[i]
+ else:
+ plog("INFO", "Postponing expiring non-running router "+i)
+ self.routers[i].deleted = True
+
+ routers = self.c.read_routers(nslist)
+ for r in routers:
+ self.name_to_key[r.nickname] = "$"+r.idhex
+ if r.idhex in self.routers:
+ if self.routers[r.idhex].nickname != r.nickname:
+ plog("NOTICE", "Router "+r.idhex+" changed names from "
+ +self.routers[r.idhex].nickname+" to "+r.nickname)
+ # Must do IN-PLACE update to keep all the refs to this router
+ # valid and current (especially for stats)
+ self.routers[r.idhex].update_to(r)
+ else:
+ rc = self.RouterClass(r)
+ self.routers[rc.idhex] = rc
+
+ self.sorted_r = filter(lambda r: not r.down, self.routers.itervalues())
+ self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
+ for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
+
+ def _update_consensus(self, nslist):
+ self.consensus = {}
+ for n in nslist:
+ self.consensus[n.idhex] = n
+
+ def update_consensus(self):
+ self._update_consensus(self.c.get_network_status())
+
+ def new_consensus_event(self, n):
+ self._update_consensus(n.nslist)
+ self._read_routers(self.consensus.values())
+ plog("DEBUG", "Read " + str(len(n.nslist))+" NC => "
+ + str(len(self.sorted_r)) + " routers")
+
+ def new_desc_event(self, d):
+ update = False
+ for i in d.idlist:
+ ns = self.c.get_network_status("id/"+i)
+ r = self.c.read_router(ns)
+ if r and r.idhex in self.consensus:
+ if ns.orhash != self.consensus[r.idhex].orhash:
+ plog("WARN", "Getinfo and consensus disagree for "+r.idhex)
+ continue
+ update = True
+ if r.idhex in self.routers:
+ self.routers[r.idhex].update_to(r)
+ else:
+ self.routers[r.idhex] = self.RouterClass(r)
+ if update:
+ self.sorted_r = filter(lambda r: not r.down, self.routers.itervalues())
+ self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
+ for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
+ plog("DEBUG", "Read " + str(len(d.idlist))+" ND => "
+ + str(len(self.sorted_r)) + " routers. Update: "+str(update))
+ return update
+
+
class DebugEventHandler(EventHandler):
"""Trivial debug event handler: reassembles all parsed events to stdout."""
def circ_status_event(self, circ_event): # CircuitEvent()
@@ -1081,7 +1182,7 @@
ns.updated.isoformat(), ns.ip, str(ns.orport),
str(ns.dirport), " ".join(ns.flags)))
- def newconsensus_event(self, nc_event):
+ def new_consensus_event(self, nc_event):
self.ns_event(nc_event)
def new_desc_event(self, newdesc_event):