[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r9849: Turns out PathSupport was broken for ExitPolicyRestriction usage (in torflow/trunk: . TorCtl)



Author: mikeperry
Date: 2007-03-16 14:40:47 -0400 (Fri, 16 Mar 2007)
New Revision: 9849

Modified:
   torflow/trunk/TorCtl/PathSupport.py
   torflow/trunk/TorCtl/TorCtl.py
   torflow/trunk/metatroller.py
   torflow/trunk/nodemon.py
   torflow/trunk/speedracer.pl
Log:
Turns out PathSupport was broken for ExitPolicyRestriction usage. Fixed. Also
made significant performance improvements by no longer persisting filtered
router lists in NodeRestrictionList, and fixed some stats. Also added
timestamps to events when they first arrive off the wire, so that accounting
is not skewed while large NS events are arriving or while we are doing lots
of processing.



Modified: torflow/trunk/TorCtl/PathSupport.py
===================================================================
--- torflow/trunk/TorCtl/PathSupport.py	2007-03-16 11:13:15 UTC (rev 9848)
+++ torflow/trunk/TorCtl/PathSupport.py	2007-03-16 18:40:47 UTC (rev 9849)
@@ -24,38 +24,24 @@
 class NodeRestriction:
   "Interface for node restriction policies"
   def r_is_ok(self, r): return True  
-  def reset(self, router_list): pass
 
 class NodeRestrictionList:
-  def __init__(self, restrictions, sorted_rlist):
+  def __init__(self, restrictions):
     self.restrictions = restrictions
-    self.update_routers(sorted_rlist)
 
-  def __check_r(self, r):
-    for rst in self.restrictions:
-      if not rst.r_is_ok(r): return False
-    self.restricted_bw += r.bw
+  def r_is_ok(self, r):
+    for rs in self.restrictions:
+      if not rs.r_is_ok(r): return False
     return True
 
-  def update_routers(self, sorted_rlist):
-    self._sorted_r = sorted_rlist
-    self.restricted_bw = 0
-    for rs in self.restrictions: rs.reset(sorted_rlist)
-    self.restricted_r = filter(self.__check_r, self._sorted_r)
-
   def add_restriction(self, restr):
     self.restrictions.append(restr)
-    for r in self.restricted_r:
-      if not restr.r_is_ok(r):
-        self.restricted_r.remove(r)
-        self.restricted_bw -= r.bw
-  
-  # XXX: This does not collapse meta restrictions..
+
+  # TODO: This does not collapse meta restrictions..
   def del_restriction(self, RestrictionClass):
     self.restrictions = filter(
         lambda r: not isinstance(r, RestrictionClass),
           self.restrictions)
-    self.update_routers(self._sorted_r)
 
 class PathRestriction:
   "Interface for path restriction policies"
@@ -96,28 +82,18 @@
 
 class NodeGenerator:
   "Interface for node generation"
-  def __init__(self, restriction_list):
-    self.restriction_list = restriction_list
+  def __init__(self, sorted_r, rstr_list):
+    self.rstr_list = rstr_list # Check me before you yield!
+    self.sorted_r = sorted_r
     self.rewind()
 
   def rewind(self):
-    # TODO: Hrmm... Is there any way to handle termination other 
-    # than to make a list of routers that we pop from? Random generators 
-    # will not terminate if no node matches the selector without this..
-    # Not so much an issue now, but in a few years, the Tor network
-    # will be large enough that having all these list copies will
-    # be obscene... Possible candidate for a python list comprehension
-    self.routers = copy.copy(self.restriction_list.restricted_r)
-    self.bw = self.restriction_list.restricted_bw
+    self.routers = copy.copy(self.sorted_r)
 
   def mark_chosen(self, r):
     self.routers.remove(r)
-    self.bw -= r.bw
 
   def all_chosen(self):
-    if not self.routers and self.bw or not self.bw and self.routers:
-      plog("WARN", str(len(self.routers))+" routers left but bw="
-         +str(self.bw))
     return not self.routers
 
   def next_r(self): raise NotImplemented()
@@ -161,34 +137,19 @@
 #          Exit->destination hops
 
 class PercentileRestriction(NodeRestriction):
-  """If used, this restriction MUST be FIRST in the RestrictionList."""
   def __init__(self, pct_skip, pct_fast, r_list):
     self.pct_fast = pct_fast
     self.pct_skip = pct_skip
     self.sorted_r = r_list
-    self.position = 0
 
-  def reset(self, r_list):
-    self.sorted_r = r_list
-    self.position = 0
-    
-  # XXX: Don't count non-running routers in this
   def r_is_ok(self, r):
-    ret = True
-    if self.position == len(self.sorted_r):
-      self.position = 0
-      plog("WARN", "Resetting PctFastRestriction")
-    if self.position != self.sorted_r.index(r): # XXX expensive?
-      plog("WARN", "Router"+r.nickname+" at mismatched index: "
-             +self.position+" vs "+self.sorted_r.index(r))
+    # Hrmm.. technically we shouldn't count non-running routers in this..
+    # but that is tricky to do efficiently
     
-    if self.position < len(self.sorted_r)*self.pct_skip/100:
-      ret = False
-    elif self.position > len(self.sorted_r)*self.pct_fast/100:
-      ret = False
+    if r.list_rank < len(self.sorted_r)*self.pct_skip/100: return False
+    elif r.list_rank > len(self.sorted_r)*self.pct_fast/100: return False
     
-    self.position += 1
-    return ret
+    return True
     
 class OSRestriction(NodeRestriction):
   def __init__(self, ok, bad=[]):
@@ -206,6 +167,7 @@
     if self.bad: return True
 
 class ConserveExitsRestriction(NodeRestriction):
+  # FIXME: Make this adaptive
   def r_is_ok(self, r): return not "Exit" in r.flags
 
 class FlagsRestriction(NodeRestriction):
@@ -283,7 +245,7 @@
   def r_is_ok(self, r): return r.will_exit_to(self.to_ip, self.to_port)
 
 class MetaNodeRestriction(NodeRestriction):
-  # XXX: these should collapse the restriction and return a new
+  # TODO: these should collapse the restriction and return a new
   # instance for re-insertion (or None)
   def next_rstr(self): raise NotImplemented()
   def del_restriction(self, RestrictionClass): raise NotImplemented()
@@ -340,20 +302,18 @@
     while not self.all_chosen():
       r = random.choice(self.routers)
       self.mark_chosen(r)
-      yield r
+      if self.rstr_list.r_is_ok(r): yield r
 
-# XXX: Either this is busted or the ExitPolicyRestriction is..
 class OrderedExitGenerator(NodeGenerator):
-  def __init__(self, restriction_list, to_port):
+  def __init__(self, to_port, sorted_r, rstr_list):
     self.to_port = to_port
     self.next_exit_by_port = {}
-    NodeGenerator.__init__(self, restriction_list)
+    NodeGenerator.__init__(self, sorted_r, rstr_list)
 
   def rewind(self):
-    NodeGenerator.rewind(self)
     if self.to_port not in self.next_exit_by_port or not self.next_exit_by_port[self.to_port]:
       self.next_exit_by_port[self.to_port] = 0
-      self.last_idx = len(self.routers)
+      self.last_idx = len(self.sorted_r)
     else:
       self.last_idx = self.next_exit_by_port[self.to_port]
 
@@ -367,11 +327,11 @@
 
   def next_r(self):
     while True: # A do..while would be real nice here..
-      if self.next_exit_by_port[self.to_port] >= len(self.routers):
+      if self.next_exit_by_port[self.to_port] >= len(self.sorted_r):
         self.next_exit_by_port[self.to_port] = 0
-      r = self.routers[self.next_exit_by_port[self.to_port]]
+      r = self.sorted_r[self.next_exit_by_port[self.to_port]]
       self.next_exit_by_port[self.to_port] += 1
-      yield r
+      if self.rstr_list.r_is_ok(r): yield r
       if self.last_idx == self.next_exit_by_port[self.to_port]:
         break
 
@@ -438,35 +398,35 @@
 
   def reconfigure(self, sorted_r):
     if self.use_all_exits:
-      self.path_rstr = PathRestrictionList([])
+      self.path_rstr = PathRestrictionList([UniqueRestriction()])
     else:
       self.path_rstr = PathRestrictionList(
            [Subnet16Restriction(), UniqueRestriction()])
+  
+    if self.use_guards: entry_flags = ["Guard", "Valid", "Running"]
+    else: entry_flags = ["Valid", "Running"]
       
-    self.entry_rstr = NodeRestrictionList(
-      [
-       PercentileRestriction(self.percent_skip, self.percent_fast,
-        sorted_r),
+    entry_rstr = NodeRestrictionList(
+      [PercentileRestriction(self.percent_skip, self.percent_fast, sorted_r),
        ConserveExitsRestriction(),
-       FlagsRestriction(["Guard", "Valid", "Running"], [])
-       ], sorted_r)
-    self.mid_rstr = NodeRestrictionList(
-      [PercentileRestriction(self.percent_skip, self.percent_fast,
-        sorted_r),
+       FlagsRestriction(entry_flags, [])]
+    )
+    mid_rstr = NodeRestrictionList(
+      [PercentileRestriction(self.percent_skip, self.percent_fast, sorted_r),
        ConserveExitsRestriction(),
-       FlagsRestriction(["Valid", "Running"], [])], sorted_r)
-
+       FlagsRestriction(["Running"], [])]
+    )
     if self.use_all_exits:
       self.exit_rstr = NodeRestrictionList(
-        [FlagsRestriction(["Valid", "Running"], ["BadExit"])], sorted_r)
+        [FlagsRestriction(["Valid", "Running"], ["BadExit"])])
     else:
       self.exit_rstr = NodeRestrictionList(
-        [PercentileRestriction(self.percent_skip, self.percent_fast,
-           sorted_r),
-         FlagsRestriction(["Valid", "Running"], ["BadExit"])],
-         sorted_r)
+        [PercentileRestriction(self.percent_skip, self.percent_fast, sorted_r),
+         FlagsRestriction(["Valid", "Running"], ["BadExit"])])
 
     if self.exit_name:
+      self.exit_rstr.del_restriction(IdHexRestriction)
+      self.exit_rstr.del_restriction(NickRestriction)
       if self.exit_name[0] == '$':
         self.exit_rstr.add_restriction(IdHexRestriction(self.exit_name))
       else:
@@ -478,14 +438,14 @@
         exitgen = self.__ordered_exit_gen
       else:
         exitgen = self.__ordered_exit_gen = \
-          OrderedExitGenerator(self.exit_rstr, 80)
+          OrderedExitGenerator(80, sorted_r, self.exit_rstr)
     else:
-      exitgen = UniformGenerator(self.exit_rstr)
+      exitgen = UniformGenerator(sorted_r, self.exit_rstr)
 
     if self.uniform:
       self.path_selector = PathSelector(
-         UniformGenerator(self.entry_rstr),
-         UniformGenerator(self.mid_rstr),
+         UniformGenerator(sorted_r, entry_rstr),
+         UniformGenerator(sorted_r, mid_rstr),
          exitgen, self.path_rstr)
     else:
       raise NotImplemented()
@@ -495,17 +455,13 @@
     self.exit_rstr.add_restriction(ExitPolicyRestriction(ip, port))
     if self.__ordered_exit_gen: self.__ordered_exit_gen.set_port(port)
 
-  def update_routers(self, new_rlist):
-    self.entry_rstr.update_routers(new_rlist)
-    self.mid_rstr.update_routers(new_rlist)
-    self.exit_rstr.update_routers(new_rlist)
-
 class Circuit:
   def __init__(self):
     self.cid = 0
     self.path = [] # routers
     self.exit = None
     self.built = False
+    self.dirty = False
     self.detached_cnt = 0
     self.last_extended_at = time.time()
     self.pending_streams = [] # Which stream IDs are pending us
@@ -525,9 +481,7 @@
     self.bytes_read = 0
     self.bytes_written = 0
 
-  # XXX: Use event timestamps
-  def lifespan(self): return time.time()-self.attached_at
-  def write_bw(self): return self.bytes_written/self.lifespan()
+  def lifespan(self, now): return now-self.attached_at
 
 # TODO: Make passive "PathWatcher" so people can get aggregate 
 # node reliability stats for normal usage without us attaching streams
@@ -606,13 +560,12 @@
       return
     
     # If event is stream:NEW*/DETACHED or circ BUILT/FAILED, 
-    # don't run low prio jobs.. No need to delay streams or delay bandwidth
-    # counting for them.
+    # don't run low prio jobs.. No need to delay streams for them.
     if isinstance(event, TorCtl.CircuitEvent):
-      if event.status in ("BUILT", "FAILED", "EXTENDED"):
+      if event.status in ("BUILT", "FAILED"):
         return
     elif isinstance(event, TorCtl.StreamEvent):
-      if event.status in ("NEW", "NEWRESOLVE", "DETACHED", "FAILED", "CLOSED"):
+      if event.status in ("NEW", "NEWRESOLVE", "DETACHED"):
         return
     
     # Do the low prio jobs one at a time in case a 
@@ -639,6 +592,7 @@
         new_routers.append(rc)
     self.sorted_r.extend(new_routers)
     self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
+    for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
 
   def attach_stream_any(self, stream, badcircs):
     # Newnym, and warn if not built plus pending
@@ -653,11 +607,10 @@
              +" pending streams")
           unattached_streams.extend(self.circuits[key].pending_streams)
         # FIXME: Consider actually closing circ if no streams.
-        # XXX: Circ chosen&failed count before doing this?
-        del self.circuits[key]
+        self.circuits[key].dirty = True
       
     for circ in self.circuits.itervalues():
-      if circ.built and circ.cid not in badcircs:
+      if circ.built and not circ.dirty and circ.cid not in badcircs:
         if circ.exit.will_exit_to(stream.host, stream.port):
           try:
             self.c.attach_stream(stream.sid, circ.cid)
@@ -671,8 +624,8 @@
           break
     else:
       circ = None
+      self.selmgr.set_target(stream.host, stream.port)
       while circ == None:
-        self.selmgr.set_target(stream.host, stream.port)
         try:
           circ = self.c.build_circuit(
                   self.selmgr.pathlen,
@@ -701,7 +654,7 @@
       plog("DEBUG", "Ignoring circ " + str(c.circ_id))
       return
     if c.status == "EXTENDED":
-      self.circuits[c.circ_id].last_extended_at = time.time()
+      self.circuits[c.circ_id].last_extended_at = c.arrived_at
     elif c.status == "FAILED" or c.status == "CLOSED":
       circ = self.circuits[c.circ_id]
       del self.circuits[c.circ_id]
@@ -757,7 +710,7 @@
       self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
       self.streams[s.strm_id].circ.pending_streams.remove(self.streams[s.strm_id])
       self.streams[s.strm_id].pending_circ = None
-      self.streams[s.strm_id].attached_at = time.time()
+      self.streams[s.strm_id].attached_at = s.arrived_at
     elif s.status == "FAILED" or s.status == "CLOSED":
       # FIXME stats
       if s.strm_id not in self.streams:
@@ -773,8 +726,7 @@
       if s.status == "FAILED":
         # Avoid busted circuits that will not resolve or carry
         # traffic. 
-        # XXX: Circ chosen&failed count before doing this?
-        if s.circ_id in self.circuits: del self.circuits[s.circ_id]
+        if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
         else: plog("WARN","Failed stream on unknown circ "+str(s.circ_id))
         return
 
@@ -806,14 +758,12 @@
     self.read_routers(n.nslist)
     plog("DEBUG", "Read " + str(len(n.nslist))+" NS => " 
        + str(len(self.sorted_r)) + " routers")
-    self.selmgr.update_routers(self.sorted_r)
   
   def new_desc_event(self, d):
     for i in d.idlist: # Is this too slow?
       self.read_routers(self.c.get_network_status("id/"+i))
     plog("DEBUG", "Read " + str(len(d.idlist))+" Desc => " 
        + str(len(self.sorted_r)) + " routers")
-    self.selmgr.update_routers(self.sorted_r)
 
   def bandwidth_event(self, b): pass # For heartbeat only..
 
@@ -846,6 +796,9 @@
     if r.will_exit_to("211.11.21.22", 465):
       print r.nickname+" "+str(r.bw)
 
+  do_unit(ExitPolicyRestriction("2.11.2.2", 80), sorted_rlist,
+          lambda r: "exits to 80")
+  exit(0)
   do_unit(PercentileRestriction(0, 100, sorted_rlist), sorted_rlist,
           lambda r: "")
   do_unit(PercentileRestriction(10, 20, sorted_rlist), sorted_rlist,
@@ -867,15 +820,14 @@
   do_unit(ConserveExitsRestriction(), sorted_rlist, lambda r: " ".join(r.flags))
   do_unit(FlagsRestriction([], ["Valid"]), sorted_rlist, lambda r: " ".join(r.flags))
 
-  # XXX: Need unittest
   do_unit(IdHexRestriction("$FFCB46DB1339DA84674C70D7CB586434C4370441"),
           sorted_rlist, lambda r: r.idhex)
 
   rl =  [AtLeastNNodeRestriction([ExitPolicyRestriction("255.255.255.255", 80), ExitPolicyRestriction("255.255.255.255", 443), ExitPolicyRestriction("255.255.255.255", 6667)], 2), FlagsRestriction([], ["BadExit"])]
 
-  exit_rstr = NodeRestrictionList(rl, sorted_rlist)
+  exit_rstr = NodeRestrictionList(rl)
 
-  ug = UniformGenerator(exit_rstr)
+  ug = UniformGenerator(sorted_rlist, exit_rstr)
 
   rlist = []
   for r in ug.next_r():

Modified: torflow/trunk/TorCtl/TorCtl.py
===================================================================
--- torflow/trunk/TorCtl/TorCtl.py	2007-03-16 11:13:15 UTC (rev 9848)
+++ torflow/trunk/TorCtl/TorCtl.py	2007-03-16 18:40:47 UTC (rev 9849)
@@ -71,17 +71,20 @@
 class NetworkStatusEvent:
   def __init__(self, event_name, nslist):
     self.event_name = event_name
+    self.arrived_at = 0
     self.nslist = nslist # List of NetworkStatus objects
 
 class NewDescEvent:
   def __init__(self, event_name, idlist):
     self.event_name = event_name
+    self.arrived_at = 0
     self.idlist = idlist
 
 class CircuitEvent:
   def __init__(self, event_name, circ_id, status, path, reason,
          remote_reason):
     self.event_name = event_name
+    self.arrived_at = 0
     self.circ_id = circ_id
     self.status = status
     self.path = path
@@ -92,6 +95,7 @@
   def __init__(self, event_name, strm_id, status, circ_id, target_host,
          target_port, reason, remote_reason):
     self.event_name = event_name
+    self.arrived_at = 0
     self.strm_id = strm_id
     self.status = status
     self.circ_id = circ_id
@@ -104,6 +108,7 @@
   def __init__(self, event_name, status, endpoint, age, read_bytes,
          wrote_bytes, reason, ncircs):
     self.event_name = event_name
+    self.arrived_at = 0
     self.status = status
     self.endpoint = endpoint
     self.age = age
@@ -200,6 +205,7 @@
     self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
     self.version = RouterVersion(version)
     self.os = os
+    self.list_rank = 0 # position in a sorted list of routers.
 
   def update_to(self, new):
     if self.idhex != new.idhex:
@@ -218,7 +224,7 @@
       ret = line.check(ip, port)
       if ret != -1:
         return ret
-    plog("NOTICE", "No matching exit line for "+self.nickname)
+    plog("WARN", "No matching exit line for "+self.nickname)
     return False
    
 class Connection:
@@ -253,7 +259,7 @@
     self._sendLock.acquire()
     try:
       self._queue.put("CLOSE")
-      self._eventQueue.put("CLOSE")
+      self._eventQueue.put((time.time(), "CLOSE"))
       self._s.close()
       self._s = None
       self._closed = 1
@@ -286,9 +292,9 @@
         self._err(sys.exc_info())
         return
 
-      if isEvent: # XXX: timestamp these, and pass timestamp to EventHandler
+      if isEvent:
         if self._handler is not None:
-          self._eventQueue.put(reply)
+          self._eventQueue.put((time.time(), reply))
       else:
         cb = self._queue.get() # atomic..
         cb(reply)
@@ -322,11 +328,11 @@
   def _eventLoop(self):
     """DOCDOC"""
     while 1:
-      reply = self._eventQueue.get()
+      (timestamp, reply) = self._eventQueue.get()
       if reply == "CLOSE":
         return
       try:
-        self._handleFn(reply)
+        self._handleFn(timestamp, reply)
       except:
         self._err(sys.exc_info(), 1)
         return
@@ -421,9 +427,8 @@
         if isEvent: # Need "250 OK" if it's not an event. Otherwise, end
           return (isEvent, lines)
 
-    # XXX: Notreached
-    isEvent = (lines and lines[0][0][0] == '6')
-    return (isEvent, lines)
+    # Notreached
+    raise TorCtlError()
 
   def _doSend(self, msg):
     if self._debugFile:
@@ -700,10 +705,11 @@
       "NS" : self.ns_event
       }
 
-  def handle1(self, lines):
+  def handle1(self, timestamp, lines):
     """Dispatcher: called from Connection when an event is received."""
     for code, msg, data in lines:
       event = self.decode1(msg, data)
+      event.arrived_at = timestamp
       self.heartbeat_event(event)
       self._map1.get(event.event_name, self.unknown_event)(event)
 

Modified: torflow/trunk/metatroller.py
===================================================================
--- torflow/trunk/metatroller.py	2007-03-16 11:13:15 UTC (rev 9848)
+++ torflow/trunk/metatroller.py	2007-03-16 18:40:47 UTC (rev 9849)
@@ -23,7 +23,6 @@
 mt_version = "0.1.0-dev"
 
 # TODO: Move these to config file
-# TODO: Option to ignore guard flag
 control_host = "127.0.0.1"
 control_port = 9061
 meta_host = "127.0.0.1"
@@ -308,12 +307,13 @@
 
   def run_zbtest(self): # Unweighted z-test
     n = reduce(lambda x, y: x+(y.bwstats.mean > 0), self.sorted_r, 0)
-    if n == 0: return
+    if n == 0: return (0, 0)
     avg = reduce(lambda x, y: x+y.bwstats.mean, self.sorted_r, 0)/float(n)
     def notlambda(x, y):
       if y.bwstats.mean <= 0: return x+0
       else: return x+(y.bwstats.mean-avg)*(y.bwstats.mean-avg)
     stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+    if not stddev: return (avg, stddev)
     for r in self.sorted_r:
       if r.bwstats.mean > 0:
         r.z_bw = abs((r.bwstats.mean-avg)/stddev)
@@ -322,12 +322,13 @@
 
   def run_zrtest(self): # Unweighted z-test
     n = reduce(lambda x, y: x+(y.bw_ratio() > 0), self.sorted_r, 0)
-    if n == 0: return
+    if n == 0: return (0, 0)
     avg = reduce(lambda x, y: x+y.bw_ratio(), self.sorted_r, 0)/float(n)
     def notlambda(x, y):
       if y.bw_ratio() <= 0: return x+0
       else: return x+(y.bw_ratio()-avg)*(y.bw_ratio()-avg)
     stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+    if not stddev: return (avg, stddev)
     for r in self.sorted_r:
       if r.bw_ratio() > 0:
         r.z_ratio = abs((r.bw_ratio()-avg)/stddev)
@@ -404,7 +405,7 @@
     # TODO: Sort by failed/selected and suspect/selected ratios
     # if we ever want to do non-uniform scanning..
 
-    # XXX: Add failed in here somehow..
+    # FIXME: Add failed in here somehow..
     susp_reasons = self.suspect_reasons.values()
     susp_reasons.sort(lambda x, y:
        cmp(y.total_suspected(), x.total_suspected()))
@@ -440,7 +441,7 @@
       else: rreason = "NONE"
       reason = c.event_name+":"+c.status+":"+lreason+":"+rreason
       if c.status == "EXTENDED":
-        delta = time.time() - self.circuits[c.circ_id].last_extended_at
+        delta = c.arrived_at - self.circuits[c.circ_id].last_extended_at
         r_ext = c.path[-1]
         if r_ext[0] != '$': r_ext = self.name_to_key[r_ext]
         self.routers[r_ext[1:]].total_extend_time += delta
@@ -495,7 +496,6 @@
     PathBuilder.circ_status_event(self, c)
   
   def stream_status_event(self, s):
-    # XXX: Verify circ id matches stream.circ
     if s.strm_id in self.streams:
       # TODO: Hrmm, consider making this sane in TorCtl.
       if s.reason: lreason = s.reason
@@ -511,6 +511,15 @@
         plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
         PathBuilder.stream_status_event(self, s)
         return
+
+      # Verify circ id matches stream.circ
+      if s.status not in ("NEW" or "NEWRESOLVE"):
+        circ = self.streams[s.strm_id].circ
+        if not circ: circ = self.streams[s.strm_id].pending_circ
+        if circ and circ.cid != s.circ_id:
+          plog("WARN", str(s.strm_id) + " has mismatch of "
+                +str(s.circ_id)+" v "+str(circ.cid))
+
       if s.status == "DETACHED" or s.status == "FAILED":
         # Update strm_chosen count
         # FIXME: use SENTRESOLVE/SENTCONNECT instead?
@@ -520,11 +529,10 @@
         if self.streams[s.strm_id].attached_at:
           if s.status == "DETACHED":
             plog("WARN", str(s.strm_id)+" detached after succeeded")
-          lifespan = self.streams[s.strm_id].lifespan()
+          lifespan = self.streams[s.strm_id].lifespan(s.arrived_at)
           for r in self.streams[s.strm_id].circ.path:
             r.bwstats.add_bw(self.streams[s.strm_id].bytes_written+
-                             self.streams[s.strm_id].bytes_read,
-                             lifespan)
+                             self.streams[s.strm_id].bytes_read, lifespan)
  
         # Update failed count,reason_failed for exit
         r = self.circuits[s.circ_id].exit
@@ -559,11 +567,10 @@
 
           # Update bw stats
           if self.streams[s.strm_id].attached_at:
-            lifespan = self.streams[s.strm_id].lifespan()
+            lifespan = self.streams[s.strm_id].lifespan(s.arrived_at)
             for r in self.streams[s.strm_id].circ.path:
               r.bwstats.add_bw(self.streams[s.strm_id].bytes_written+
-                               self.streams[s.strm_id].bytes_read,
-                               lifespan)
+                               self.streams[s.strm_id].bytes_read, lifespan)
 
           if lreason in ("TIMEOUT", "INTERNAL", "TORPROTOCOL" "DESTROY"):
             for r in self.circuits[s.circ_id].path[:-1]:
@@ -593,7 +600,7 @@
 
   def ns_event(self, n):
     PathBuilder.ns_event(self, n)
-    now = time.time()
+    now = n.arrived_at
     for ns in n.nslist:
       if not ns.idhex in self.routers:
         continue

Modified: torflow/trunk/nodemon.py
===================================================================
--- torflow/trunk/nodemon.py	2007-03-16 11:13:15 UTC (rev 9848)
+++ torflow/trunk/nodemon.py	2007-03-16 18:40:47 UTC (rev 9849)
@@ -41,7 +41,7 @@
 key_to_name = {}
 name_to_key = {}
 
-# XXX: Move these to config file
+# TODO: Move these to config file
 control_host = "127.0.0.1"
 control_port = 9051
 max_detach = 3

Modified: torflow/trunk/speedracer.pl
===================================================================
--- torflow/trunk/speedracer.pl	2007-03-16 11:13:15 UTC (rev 9848)
+++ torflow/trunk/speedracer.pl	2007-03-16 18:40:47 UTC (rev 9849)
@@ -16,11 +16,12 @@
 
 # http://bitter.stalin.se/torfile
 # http://www.sigma.su.se/~who/torfile
-my $URL = "http://130.237.152.195/~who/torfile";; 
-my $COUNT = 400;
+my $URL = "http://bitter.stalin.se/torfile1";; 
+my $COUNT = 500;
 my $START_PCT = 0;
-my $STOP_PCT = 21;
-my $PCT_STEP = 7;
+my $STOP_PCT = 20;
+my $PCT_STEP = 5;
+my $DOUBLE_FETCH = 0;
 
 my $LOG_LEVEL = "DEBUG";
 my %log_levels = ("DEBUG", 0, "INFO", 1, "NOTICE", 2, "WARN", 3, "ERROR", 4);
@@ -136,41 +137,47 @@
         } while($ret != 0 || $delta_build >= 550.0);
 
         $build_exit = query_exit($mcp);
+        $fetch_exit = $build_exit;
 
         plog "DEBUG", "Got 1st via $build_exit\n";
 
         # Now do it for real
-        
-        do {
-            $i++;
-            $t0 = [gettimeofday()];
-            $ret = 
+        if($DOUBLE_FETCH) {
+            do {
+                $i++;
+                $t0 = [gettimeofday()];
+                $ret = 
 #                system("tsocks wget -U \"$USER_AGENT\" \'$URL\' -O - >& /dev/null");
-                system("curl $CURL_PROXY -m 600 -A \"$USER_AGENT\" \'$URL\' >& /dev/null");
+                    system("curl $CURL_PROXY -m 600 -A \"$USER_AGENT\" \'$URL\' >& /dev/null");
 
-            if($ret == 2) {
-                plog "NOTICE", "wget got Sigint. Dying\n";
-                exit;
-            }
-            plog "NOTICE", "wget failed with ret=$ret.. Retrying with clock still running\n" 
-                if($ret != 0);
-            $delta_fetch = tv_interval $t0;
-            plog "NOTICE", "Timer exceeded limit: $delta_fetch\n"
-                if($delta_fetch >= 550.0);
-        } while($ret != 0 || $delta_fetch >= 550.0);
+                if($ret == 2) {
+                    plog "NOTICE", "wget got Sigint. Dying\n";
+                    exit;
+                }
+                plog "NOTICE", "wget failed with ret=$ret.. Retrying with clock still running\n" 
+                    if($ret != 0);
+                $delta_fetch = tv_interval $t0;
+                plog "NOTICE", "Timer exceeded limit: $delta_fetch\n"
+                    if($delta_fetch >= 550.0);
+            } while($ret != 0 || $delta_fetch >= 550.0);
 
-        $fetch_exit = query_exit($mcp);
+            $fetch_exit = query_exit($mcp);
 
-        if($fetch_exit eq $build_exit) {
+            if($fetch_exit eq $build_exit) {
+                $tot_build_time += $delta_build;
+                push(@build_times, $delta_build);
+                plog "DEBUG", "$skip-$pct% circuit build+fetch took $delta_build for $fetch_exit\n";
+
+                push(@fetch_times, $delta_fetch);
+                $tot_fetch_time += $delta_fetch;
+                plog "DEBUG", "$skip-$pct% fetch took $delta_fetch for $fetch_exit\n";
+            } else {
+                plog "NOTICE", "Ignoring strange exit swap $build_exit -> $fetch_exit. Circuit failure?\n";
+            }
+        } else {
             $tot_build_time += $delta_build;
             push(@build_times, $delta_build);
             plog "DEBUG", "$skip-$pct% circuit build+fetch took $delta_build for $fetch_exit\n";
-
-            push(@fetch_times, $delta_fetch);
-            $tot_fetch_time += $delta_fetch;
-            plog "DEBUG", "$skip-$pct% fetch took $delta_fetch for $fetch_exit\n";
-        } else {
-            plog "NOTICE", "Ignoring strange exit swap $build_exit -> $fetch_exit. Circuit failure?\n";
         }
     }
     my $avg_build_time = $tot_build_time/($#build_times+1);
@@ -180,17 +187,19 @@
             ($_ - $avg_build_time)*($_ - $avg_build_time);
     }
     $build_dev = sqrt($build_dev / ($#build_times+1));
-    
-    my $avg_fetch_time = $tot_fetch_time/($#fetch_times+1);
-    my $fetch_dev = 0;
-    foreach(@fetch_times) {
-        $fetch_dev += 
-            ($_ - $avg_fetch_time)*($_ - $avg_fetch_time);
+   
+    if($DOUBLE_FETCH) { 
+        my $avg_fetch_time = $tot_fetch_time/($#fetch_times+1);
+        my $fetch_dev = 0;
+        foreach(@fetch_times) {
+            $fetch_dev += 
+                ($_ - $avg_fetch_time)*($_ - $avg_fetch_time);
+        }
+        $fetch_dev = sqrt($fetch_dev / ($#fetch_times+1));
+        plog "INFO", "RANGE $skip-$pct " . ($#fetch_times+1) . " fetches: avg=$avg_fetch_time, dev=$fetch_dev\n";
     }
-    $fetch_dev = sqrt($fetch_dev / ($#fetch_times+1));
     plog "INFO", "RANGE $skip-$pct " . ($#build_times+1) . " build+fetches: avg=$avg_build_time, dev=$build_dev\n";
-    plog "INFO", "RANGE $skip-$pct " . ($#fetch_times+1) . " fetches: avg=$avg_fetch_time, dev=$fetch_dev\n";
-    plog "INFO", "  " . ($COUNT*2) . " fetches took $i tries\n";
+    plog "INFO", "  " . ($COUNT*($DOUBLE_FETCH+1)) . " fetches took $i tries\n";
 }
 
 sub main