[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r9831: Z-tests written for stream bandwidth and observed bandwidth (in torflow/trunk: . TorCtl)



Author: mikeperry
Date: 2007-03-15 14:58:51 -0400 (Thu, 15 Mar 2007)
New Revision: 9831

Added:
   torflow/trunk/speedracer.pl
Modified:
   torflow/trunk/TorCtl/PathSupport.py
   torflow/trunk/TorCtl/TorCtl.py
   torflow/trunk/TorCtl/TorUtil.py
   torflow/trunk/metatroller.py
   torflow/trunk/soat.pl
Log:
Z-tests written for stream bandwidth and observed bandwidth/stream bandwidth.
Still a little flaky unfortunately.. Really need to add timestamps to events
as soon as they are read, and also some stats are in fact self-consistent in
their insanity rather than sane. But at least no regressions are present in
this code, so I'm gonna commit it. Bugfixes to follow.



Modified: torflow/trunk/TorCtl/PathSupport.py
===================================================================
--- torflow/trunk/TorCtl/PathSupport.py	2007-03-15 16:28:01 UTC (rev 9830)
+++ torflow/trunk/TorCtl/PathSupport.py	2007-03-15 18:58:51 UTC (rev 9831)
@@ -6,8 +6,8 @@
 import random
 import socket
 import copy
-import datetime
 import Queue
+import time
 from TorUtil import *
 
 __all__ = ["NodeRestrictionList", "PathRestrictionList",
@@ -124,7 +124,7 @@
 
 class Connection(TorCtl.Connection):
   def build_circuit(self, pathlen, path_sel):
-    circ = TorCtl.Circuit()
+    circ = Circuit()
     if pathlen == 1:
       circ.exit = path_sel.exit_chooser(circ.path)
       circ.path = [circ.exit]
@@ -136,7 +136,6 @@
       circ.exit = path_sel.exit_chooser(circ.path)
       circ.path.append(circ.exit)
       circ.cid = self.extend_circuit(0, circ.id_path())
-    circ.created_at = datetime.datetime.now()
     return circ
 
 ######################## Node Restrictions ########################
@@ -144,22 +143,22 @@
 # TODO: We still need more path support implementations
 #  - BwWeightedGenerator
 #  - NodeRestrictions:
-#  - Uptime/LongLivedPorts (Does/should hibernation count?)
-#  - Published/Updated
-#  - GeoIP
-#    - NodeCountry
-#  - PathRestrictions
-#  - Family
-#  - GeoIP:
-#    - OceanPhobicRestrictor (avoids Pacific Ocean or two atlantic crossings)
-#    or ContinentRestrictor (avoids doing more than N continent crossings)
-#    - Mathematical/empirical study of predecessor expectation
-#      - If middle node on the same continent as exit, exit learns nothing
-#      - else, exit has a bias on the continent of origin of user
-#      - Language and browser accept string determine this anyway
-#    - EchelonPhobicRestrictor
-#    - Does not cross international boundaries for client->Entry or
-#      Exit->destination hops
+#    - Uptime/LongLivedPorts (Does/should hibernation count?)
+#    - Published/Updated
+#    - GeoIP
+#      - NodeCountry
+#  - PathRestrictions:
+#    - Family
+#    - GeoIP:
+#      - OceanPhobicRestrictor (avoids Pacific Ocean or two atlantic crossings)
+#        or ContinentRestrictor (avoids doing more than N continent crossings)
+#        - Mathematical/empirical study of predecessor expectation
+#          - If middle node on the same continent as exit, exit learns nothing
+#          - else, exit has a bias on the continent of origin of user
+#            - Language and browser accept string determine this anyway
+#      - EchelonPhobicRestrictor
+#        - Does not cross international boundaries for client->Entry or
+#          Exit->destination hops
 
 class PercentileRestriction(NodeRestriction):
   """If used, this restriction MUST be FIRST in the RestrictionList."""
@@ -173,6 +172,7 @@
     self.sorted_r = r_list
     self.position = 0
     
+  # XXX: Don't count non-running routers in this
   def r_is_ok(self, r):
     ret = True
     if self.position == len(self.sorted_r):
@@ -255,7 +255,6 @@
         return True
     return False
 
-
 class VersionExcludeRestriction(NodeRestriction):
   def __init__(self, exclude):
     self.exclude = map(TorCtl.RouterVersion, exclude)
@@ -272,7 +271,6 @@
     if less_eq: self.less_eq = TorCtl.RouterVersion(less_eq)
     else: self.less_eq = None
   
-
   def r_is_ok(self, router):
     return (not self.gr_eq or router.version >= self.gr_eq) and \
         (not self.less_eq or router.version <= self.less_eq)
@@ -344,6 +342,7 @@
       self.mark_chosen(r)
       yield r
 
+# XXX: Either this is busted or the ExitPolicyRestriction is..
 class OrderedExitGenerator(NodeGenerator):
   def __init__(self, restriction_list, to_port):
     self.to_port = to_port
@@ -501,15 +500,17 @@
     self.mid_rstr.update_routers(new_rlist)
     self.exit_rstr.update_routers(new_rlist)
 
-class Circuit(TorCtl.Circuit):
-  def __init__(self, circuit): # Promotion constructor
-    # perf shortcut since we don't care about the 'circuit' 
-    # instance after this
-    self.__dict__ = circuit.__dict__
+class Circuit:
+  def __init__(self):
+    self.cid = 0
+    self.path = [] # routers
+    self.exit = None
     self.built = False
     self.detached_cnt = 0
-    self.created_at = datetime.datetime.now()
+    self.last_extended_at = time.time()
     self.pending_streams = [] # Which stream IDs are pending us
+  
+  def id_path(self): return map(lambda r: r.idhex, self.path)
 
 class Stream:
   def __init__(self, sid, host, port, kind):
@@ -520,7 +521,14 @@
     self.host = host
     self.port = port
     self.kind = kind
+    self.attached_at = 0
+    self.bytes_read = 0
+    self.bytes_written = 0
 
+  # XXX: Use event timestamps
+  def lifespan(self): return time.time()-self.attached_at
+  def write_bw(self): return self.bytes_written/self.lifespan()
+
 # TODO: Make passive "PathWatcher" so people can get aggregate 
 # node reliability stats for normal usage without us attaching streams
 
@@ -543,6 +551,7 @@
     self.num_circuits = 1
     self.RouterClass = RouterClass
     self.sorted_r = []
+    self.name_to_key = {}
     self.routers = {}
     self.circuits = {}
     self.streams = {}
@@ -551,13 +560,72 @@
     self.selmgr.reconfigure(self.sorted_r)
     self.imm_jobs = Queue.Queue()
     self.low_prio_jobs = Queue.Queue()
+    self.run_all_jobs = False
     self.do_reconfigure = False
     plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(nslist))+" routers")
 
+  def schedule_immediate(self, job):
+    """
+    Schedules an immediate job to be run before the next event is
+    processed.
+    """
+    self.imm_jobs.put(job)
+
+  def schedule_low_prio(self, job):
+    """
+    Schedules a job to be run when a non-time critical event arrives.
+    """
+    self.low_prio_jobs.put(job)
+
+  def schedule_selmgr(self, job):
+    """
+    Schedules an immediate job to be run before the next event is
+    processed. Also notifies the selection manager that it needs
+    to update itself.
+    """
+    def notlambda(this):
+      job(this.selmgr)
+      this.do_reconfigure = True
+    self.schedule_immediate(notlambda)
+
+     
+  def heartbeat_event(self, event):
+    while not self.imm_jobs.empty():
+      imm_job = self.imm_jobs.get_nowait()
+      imm_job(self)
+    
+    if self.do_reconfigure:
+      self.selmgr.reconfigure(self.sorted_r)
+      self.do_reconfigure = False
+    
+    if self.run_all_jobs:
+      self.run_all_jobs = False
+      while not self.low_prio_jobs.empty():
+        imm_job = self.low_prio_jobs.get_nowait()
+        imm_job(self)
+      return
+    
+    # If event is stream:NEW*/DETACHED or circ BUILT/FAILED, 
+    # don't run low prio jobs.. No need to delay streams or delay bandwidth
+    # counting for them.
+    if isinstance(event, TorCtl.CircuitEvent):
+      if event.status in ("BUILT", "FAILED", "EXTENDED"):
+        return
+    elif isinstance(event, TorCtl.StreamEvent):
+      if event.status in ("NEW", "NEWRESOLVE", "DETACHED", "FAILED", "CLOSED"):
+        return
+    
+    # Do the low prio jobs one at a time in case a 
+    # higher priority event is queued   
+    if not self.low_prio_jobs.empty():
+      delay_job = self.low_prio_jobs.get_nowait()
+      delay_job(self)
+
   def read_routers(self, nslist):
     routers = self.c.read_routers(nslist)
     new_routers = []
     for r in routers:
+      self.name_to_key[r.nickname] = "$"+r.idhex
       if r.idhex in self.routers:
         if self.routers[r.idhex].nickname != r.nickname:
           plog("NOTICE", "Router "+r.idhex+" changed names from "
@@ -567,7 +635,7 @@
         self.routers[r.idhex].update_to(r)
       else:
         rc = self.RouterClass(r)
-        self.routers[r.idhex] = rc
+        self.routers[rc.idhex] = rc
         new_routers.append(rc)
     self.sorted_r.extend(new_routers)
     self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
@@ -585,6 +653,7 @@
              +" pending streams")
           unattached_streams.extend(self.circuits[key].pending_streams)
         # FIXME: Consider actually closing circ if no streams.
+        # XXX: Circ chosen&failed count before doing this?
         del self.circuits[key]
       
     for circ in self.circuits.itervalues():
@@ -605,9 +674,9 @@
       while circ == None:
         self.selmgr.set_target(stream.host, stream.port)
         try:
-          circ = Circuit(self.c.build_circuit(
+          circ = self.c.build_circuit(
                   self.selmgr.pathlen,
-                  self.selmgr.path_selector))
+                  self.selmgr.path_selector)
         except TorCtl.ErrorReply, e:
           # FIXME: How come some routers are non-existant? Shouldn't
           # we have gotten an NS event to notify us they
@@ -621,53 +690,6 @@
       self.circuits[circ.cid] = circ
     self.last_exit = circ.exit
 
-
-  def schedule_immediate(self, job):
-    """
-    Schedules an immediate job to be run before the next event is
-    processed.
-    """
-    self.imm_jobs.put(job)
-
-  def schedule_low_prio(self, job):
-    """
-    Schedules a job to be run when a non-time critical event arrives.
-    """
-    self.low_prio_jobs.put(job)
-
-  def schedule_selmgr(self, job):
-    """
-    Schedules an immediate job to be run before the next event is
-    processed. Also notifies the selection manager that it needs
-    to update itself.
-    """
-    def notlambda(this):
-      job(this.selmgr)
-      this.do_reconfigure = True
-    self.schedule_immediate(notlambda)
-
-  def heartbeat_event(self, event):
-    while not self.imm_jobs.empty():
-      imm_job = self.imm_jobs.get_nowait()
-      imm_job(self)
-    
-    if self.do_reconfigure:
-      self.selmgr.reconfigure(self.sorted_r)
-      self.do_reconfigure = False
-    
-    # If event is stream:NEW*/DETACHED or circ BUILT/FAILED, 
-    # don't run low prio jobs.. No need to delay streams on them.
-    if isinstance(event, TorCtl.CircuitEvent):
-      if event.status in ("BUILT", "FAILED"): return
-    elif isinstance(event, TorCtl.StreamEvent):
-      if event.status in ("NEW", "NEWRESOLVE", "DETACHED"): return
-    
-    # Do the low prio jobs one at a time in case a 
-    # higher priority event is queued   
-    if not self.low_prio_jobs.empty():
-      delay_job = self.low_prio_jobs.get_nowait()
-      delay_job(self)
-
   def circ_status_event(self, c):
     output = [c.event_name, str(c.circ_id), c.status]
     if c.path: output.append(",".join(c.path))
@@ -678,7 +700,9 @@
     if c.circ_id not in self.circuits:
       plog("DEBUG", "Ignoring circ " + str(c.circ_id))
       return
-    if c.status == "FAILED" or c.status == "CLOSED":
+    if c.status == "EXTENDED":
+      self.circuits[c.circ_id].last_extended_at = time.time()
+    elif c.status == "FAILED" or c.status == "CLOSED":
       circ = self.circuits[c.circ_id]
       del self.circuits[c.circ_id]
       for stream in circ.pending_streams:
@@ -720,7 +744,6 @@
         plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
       else:
         self.streams[s.strm_id].detached_from.append(s.circ_id)
-
       
       if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
         self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
@@ -734,6 +757,7 @@
       self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
       self.streams[s.strm_id].circ.pending_streams.remove(self.streams[s.strm_id])
       self.streams[s.strm_id].pending_circ = None
+      self.streams[s.strm_id].attached_at = time.time()
     elif s.status == "FAILED" or s.status == "CLOSED":
       # FIXME stats
       if s.strm_id not in self.streams:
@@ -748,7 +772,8 @@
       # (FIXME: be careful about double stats)
       if s.status == "FAILED":
         # Avoid busted circuits that will not resolve or carry
-        # traffic. FIXME: Failed count before doing this?
+        # traffic. 
+        # XXX: Circ chosen&failed count before doing this?
         if s.circ_id in self.circuits: del self.circuits[s.circ_id]
         else: plog("WARN","Failed stream on unknown circ "+str(s.circ_id))
         return
@@ -767,7 +792,16 @@
         self.streams[s.strm_id].host = s.target_host
         self.streams[s.strm_id].port = s.target_port
 
-
+  def stream_bw_event(self, s):
+    output = [s.event_name, str(s.strm_id), str(s.bytes_read),
+              str(s.bytes_written)]
+    plog("DEBUG", " ".join(output))
+    if not s.strm_id in self.streams:
+      plog("WARN", "BW event for unknown stream id: "+str(s.strm_id))
+    else:
+      self.streams[s.strm_id].bytes_read += s.bytes_read
+      self.streams[s.strm_id].bytes_written += s.bytes_written
+ 
   def ns_event(self, n):
     self.read_routers(n.nslist)
     plog("DEBUG", "Read " + str(len(n.nslist))+" NS => " 
@@ -781,6 +815,8 @@
        + str(len(self.sorted_r)) + " routers")
     self.selmgr.update_routers(self.sorted_r)
 
+  def bandwidth_event(self, b): pass # For heartbeat only..
+
 ########################## Unit tests ##########################
 
 

Modified: torflow/trunk/TorCtl/TorCtl.py
===================================================================
--- torflow/trunk/TorCtl/TorCtl.py	2007-03-15 16:28:01 UTC (rev 9830)
+++ torflow/trunk/TorCtl/TorCtl.py	2007-03-15 18:58:51 UTC (rev 9831)
@@ -28,6 +28,7 @@
           CIRC="CIRC",
           STREAM="STREAM",
           ORCONN="ORCONN",
+          STREAM_BW="STREAM_BW",
           BW="BW",
           NS="NS",
           NEWDESC="NEWDESC",
@@ -111,18 +112,24 @@
     self.reason = reason
     self.ncircs = ncircs
 
+class StreamBwEvent:
+  def __init__(self, event_name, strm_id, read, written):
+    self.event_name = event_name
+    self.strm_id = int(strm_id)
+    self.bytes_read = int(read)
+    self.bytes_written = int(written)
+
 class LogEvent:
   def __init__(self, level, msg):
     self.event_name = self.level = level
     self.msg = msg
 
 class AddrMapEvent:
-  def __init__(self, event_name, from_addr, to_addr, when, by_exit):
+  def __init__(self, event_name, from_addr, to_addr, when):
     self.event_name = event_name
     self.from_addr = from_addr
     self.to_addr = to_addr
     self.when = when
-    self.by_exit = by_exit # XOXOXOX <3 ;) @ nickm
 
 class BWEvent:
   def __init__(self, event_name, read, written):
@@ -214,16 +221,6 @@
     plog("NOTICE", "No matching exit line for "+self.nickname)
     return False
    
-class Circuit:
-  def __init__(self):
-    self.cid = 0
-    self.created_at = 0 # time
-    self.path = [] # routers
-    self.exit = 0
-  
-  def id_path(self): return map(lambda r: r.idhex, self.path)
-
-
 class Connection:
   """A Connection represents a connection to the Tor process."""
   def __init__(self, sock):
@@ -289,7 +286,7 @@
         self._err(sys.exc_info())
         return
 
-      if isEvent:
+      if isEvent: # XXX: timestamp these, and pass timestamp to EventHandler
         if self._handler is not None:
           self._eventQueue.put(reply)
       else:
@@ -691,6 +688,7 @@
       "CIRC" : self.circ_status_event,
       "STREAM" : self.stream_status_event,
       "ORCONN" : self.or_conn_status_event,
+      "STREAM_BW" : self.stream_bw_event,
       "BW" : self.bandwidth_event,
       "DEBUG" : self.msg_event,
       "INFO" : self.msg_event,
@@ -763,6 +761,11 @@
       else: wrote = 0
       event = ORConnEvent(evtype, status, target, age, read, wrote,
                 reason, ncircs)
+    elif evtype == "STREAM_BW":
+      m = re.match(r"(\d+)\s+(\d+)\s+(\d+)", body)
+      if not m:
+        raise ProtocolError("STREAM_BW event misformatted.")
+      event = StreamBwEvent(evtype, *m.groups())
     elif evtype == "BW":
       m = re.match(r"(\d+)\s+(\d+)", body)
       if not m:
@@ -783,7 +786,7 @@
       else:
         when = time.localtime(
           time.strptime(when[1:-1], "%Y-%m-%d %H:%M:%S"))
-      event = AddrMapEvent(evtype, fromaddr, toaddr, when, "Unknown")
+      event = AddrMapEvent(evtype, fromaddr, toaddr, when)
     elif evtype == "NS":
       event = NetworkStatusEvent(evtype, parse_ns_body(data))
     else:
@@ -805,29 +808,24 @@
 
   def circ_status_event(self, event):
     """Called when a circuit status changes if listening to CIRCSTATUS
-       events.  'status' is a member of CIRC_STATUS; circID is a numeric
-       circuit ID, and 'path' is the circuit's path so far as a list of
-       names.
-    """
+       events."""
     raise NotImplemented()
 
   def stream_status_event(self, event):
     """Called when a stream status changes if listening to STREAMSTATUS
-       events.  'status' is a member of STREAM_STATUS; streamID is a
-       numeric stream ID, and 'target' is the destination of the stream.
-    """
+       events.  """
     raise NotImplemented()
 
+  def stream_bw_event(self, event):
+    raise NotImplemented()
+
   def or_conn_status_event(self, event):
     """Called when an OR connection's status changes if listening to
-       ORCONNSTATUS events. 'status' is a member of OR_CONN_STATUS; target
-       is the OR in question.
-    """
+       ORCONNSTATUS events."""
     raise NotImplemented()
 
   def bandwidth_event(self, event):
-    """Called once a second if listening to BANDWIDTH events.  'read' is
-       the number of bytes read; 'written' is the number of bytes written.
+    """Called once a second if listening to BANDWIDTH events.
     """
     raise NotImplemented()
 

Modified: torflow/trunk/TorCtl/TorUtil.py
===================================================================
--- torflow/trunk/TorCtl/TorUtil.py	2007-03-15 16:28:01 UTC (rev 9830)
+++ torflow/trunk/TorCtl/TorUtil.py	2007-03-15 18:58:51 UTC (rev 9831)
@@ -13,10 +13,11 @@
 import socket
 import binascii
 import sha
+import math
 
 __all__ = ["Enum", "Enum2", "quote", "escape_dots", "unescape_dots",
       "BufSock", "secret_to_key", "urandom_rng", "s2k_gen", "s2k_check",
-      "plog", "ListenSocket"]
+      "plog", "ListenSocket", "zprob"]
 
 class Enum:
   # Helper: define an ordered dense name-to-number 1-1 mapping.
@@ -191,3 +192,46 @@
     print level + ": " + msg
     sys.stdout.flush()
 
+# Stolen from
+# http://www.nmr.mgh.harvard.edu/Neural_Systems_Group/gary/python/stats.py
+def zprob(z):
+    """
+Returns the area under the normal curve 'to the left of' the given z value.
+Thus, 
+    for z<0, zprob(z) = 1-tail probability
+    for z>0, 1.0-zprob(z) = 1-tail probability
+    for any z, 2.0*(1.0-zprob(abs(z))) = 2-tail probability
+Adapted from z.c in Gary Perlman's |Stat.
+
+Usage:   zprob(z)
+"""
+    Z_MAX = 6.0    # maximum meaningful z-value
+    if z == 0.0:
+        x = 0.0
+    else:
+        y = 0.5 * math.fabs(z)
+        if y >= (Z_MAX*0.5):
+            x = 1.0
+        elif (y < 1.0):
+            w = y*y
+            x = ((((((((0.000124818987 * w
+                        -0.001075204047) * w +0.005198775019) * w
+                      -0.019198292004) * w +0.059054035642) * w
+                    -0.151968751364) * w +0.319152932694) * w
+                  -0.531923007300) * w +0.797884560593) * y * 2.0
+        else:
+            y = y - 2.0
+            x = (((((((((((((-0.000045255659 * y
+                             +0.000152529290) * y -0.000019538132) * y
+                           -0.000676904986) * y +0.001390604284) * y
+                         -0.000794620820) * y -0.002034254874) * y
+                       +0.006549791214) * y -0.010557625006) * y
+                     +0.011630447319) * y -0.009279453341) * y
+                   +0.005353579108) * y -0.002141268741) * y
+                 +0.000535310849) * y +0.999936657524
+    if z > 0.0:
+        prob = ((x+1.0)*0.5)
+    else:
+        prob = ((1.0-x)*0.5)
+    return prob
+

Modified: torflow/trunk/metatroller.py
===================================================================
--- torflow/trunk/metatroller.py	2007-03-15 16:28:01 UTC (rev 9830)
+++ torflow/trunk/metatroller.py	2007-03-15 18:58:51 UTC (rev 9831)
@@ -11,11 +11,11 @@
 import traceback
 import re
 import random
-import datetime
 import threading
 import struct
 import copy
 import time
+import math
 from TorCtl import *
 from TorCtl.TorUtil import *
 from TorCtl.PathSupport import *
@@ -31,7 +31,7 @@
 max_detach = 3
 
 # Do NOT modify this object directly after it is handed to PathBuilder
-# Use PathBuilder.schedule_reconfigure instead.
+# Use PathBuilder.schedule_selmgr instead.
 # (Modifying the arguments here is OK)
 __selmgr = PathSupport.SelectionManager(
       pathlen=3,
@@ -44,7 +44,53 @@
       use_exit=None,
       use_guards=False)
 
+class BandwidthStats:
+  def __init__(self):
+    self.byte_list = []
+    self.duration_list = []
+    self.min_bw = 1e10
+    self.max_bw = 0
+    self.mean = 0
+    self.dev = 0
 
+  def _exp(self): # Weighted avg
+    tot_bw = reduce(lambda x, y: x+y, self.byte_list, 0.0)
+    EX = 0.0
+    for i in xrange(len(self.byte_list)):
+      EX += (self.byte_list[i]*self.byte_list[i])/self.duration_list[i]
+    if tot_bw == 0.0: return 0.0
+    EX /= tot_bw
+    return EX
+
+  def _exp2(self): # E[X^2]
+    tot_bw = reduce(lambda x, y: x+y, self.byte_list, 0.0)
+    EX = 0.0
+    for i in xrange(len(self.byte_list)):
+      EX += (self.byte_list[i]**3)/(self.duration_list[i]**2)
+    if tot_bw == 0.0: return 0.0
+    EX /= tot_bw
+    return EX
+    
+  def _dev(self): # Weighted dev
+    EX = self.mean
+    EX2 = self._exp2()
+    arg = EX2 - (EX*EX)
+    if arg < -0.05:
+      plog("WARN", "Diff of "+str(EX2)+" and "+str(EX)+"^2 is "+str(arg))
+    return math.sqrt(abs(arg))
+
+  def add_bw(self, bytes, duration):
+    if not bytes: plog("WARN", "No bytes for bandwidth")
+    bytes /= 1024.
+    self.byte_list.append(bytes)
+    self.duration_list.append(duration)
+    bw = bytes/duration
+    plog("DEBUG", "Got bandwidth "+str(bw))
+    if self.min_bw > bw: self.min_bw = bw
+    if self.max_bw < bw: self.max_bw = bw
+    self.mean = self._exp()
+    self.dev = self._dev()
+
 # Technically we could just add member vars as we need them, but this
 # is a bit more clear
 class StatsRouter(TorCtl.Router):
@@ -62,7 +108,7 @@
     self.strm_succeeded = 0
     self.strm_suspected = 0 # disjoint from failed (for verification only)
     self.strm_uncounted = 0
-    self.strm_chosen = 0 # above 3 should add to this
+    self.strm_chosen = 0 # above 4 should add to this
     self.reason_suspected = {}
     self.reason_failed = {}
     self.first_seen = time.time()
@@ -74,10 +120,24 @@
       self.hibernated_at = self.first_seen
     self.total_hibernation_time = 0
     self.total_active_uptime = 0
-    self.max_bw = 0
-    self.min_bw = 0
-    self.avg_bw = 0
+    self.total_extend_time = 0
+    self.total_extended = 0
+    self.bwstats = BandwidthStats()
+    self.z_ratio = 0
+    self.prob_zr = 0
+    self.z_bw = 0
+    self.prob_zb = 0
 
+  def avg_extend_time(self):
+    if self.total_extended:
+      return self.total_extend_time/self.total_extended
+    else: return 0
+
+  def bw_ratio(self):
+    bw = self.bwstats.mean
+    if bw == 0.0: return 0
+    else: return self.bw/(1024.*bw)
+
   def current_uptime(self):
     if self.became_active_at:
       ret = (self.total_active_uptime+(time.time()-self.became_active_at))
@@ -105,25 +165,41 @@
 
   def _succeeded_per_hour(self):
     return (3600.*(self.circ_succeeded+self.strm_succeeded))/self.current_uptime()
-    
+  
+  key = """Metatroller Statistics:
+  CC=Circuits Chosen   CF=Circuits Failed     CS=Circuit Suspected
+  SC=Streams Chosen    SF=Streams Failed      SS=Streams Suspected
+  FH=Failed per Hour   SH=Suspected per Hour  ET=avg circuit Extend Time (s)
+  EB=mean BW (K)       BD=BW std Dev (K)      BR=Ratio of observed to avg BW
+  ZB=BW z-test value   PB=Probability(z-bw)   ZR=Ratio z-test value
+  PR=Prob(z-ratio)     U=Uptime (h)\n"""
+
   def __str__(self):
-    return (self.idhex+" ("+self.nickname+")\n\t"
+    return (self.idhex+" ("+self.nickname+")\n"
+    +"   CC="+str(self.circ_chosen)
       +" CF="+str(self.circ_failed)
       +" CS="+str(self.circ_suspected+self.circ_failed)
-      +" CC="+str(self.circ_chosen)
+      +" SC="+str(self.strm_chosen)
       +" SF="+str(self.strm_failed)
       +" SS="+str(self.strm_suspected+self.strm_failed)
-      +" SC="+str(self.strm_chosen)
-      +" FH="+str(round(self.failed_per_hour(),2))
-      +" SH="+str(round(self.suspected_per_hour(),2))
-      +" Up="+str(round(self.current_uptime()/3600, 1))+"h\n")
+      +" FH="+str(round(self.failed_per_hour(),1))
+      +" SH="+str(round(self.suspected_per_hour(),1))+"\n"
+    +"   ET="+str(round(self.avg_extend_time(),1))
+      +" EB="+str(round(self.bwstats.mean,1))
+      +" BD="+str(round(self.bwstats.dev,1))
+      +" ZB="+str(round(self.z_bw,1))
+      +" PB="+(str(round(self.prob_zb,3))[1:])
+      +" BR="+str(round(self.bw_ratio(),1))
+      +" ZR="+str(round(self.z_ratio,1))
+      +" PR="+(str(round(self.prob_zr,3))[1:])
+      +" U="+str(round(self.current_uptime()/3600, 1))+"\n")
 
   def sanity_check(self):
-    if self.circ_failed + self.circ_succeeded + self.circ_suspected \
-      + self.circ_uncounted != self.circ_chosen:
+    if (self.circ_failed + self.circ_succeeded + self.circ_suspected
+      + self.circ_uncounted != self.circ_chosen):
       plog("ERROR", self.nickname+" does not add up for circs")
-    if self.strm_failed + self.strm_succeeded + self.strm_suspected \
-      + self.strm_uncounted != self.strm_chosen:
+    if (self.strm_failed + self.strm_succeeded + self.strm_suspected
+      + self.strm_uncounted != self.strm_chosen):
       plog("ERROR", self.nickname+" does not add up for streams")
     def check_reasons(reasons, expected, which, rtype):
       count = 0
@@ -147,7 +223,8 @@
          self._suspected_per_hour()+self._succeeded_per_hour(), 2)
     chosen_tot = round(self._chosen_per_hour(), 2)
     if per_hour_tot != chosen_tot:
-      plog("ERROR", self.nickname+" has mismatch of per hour counts: "+str(per_hour_tot) +" vs "+str(chosen_tot))
+      plog("ERROR", self.nickname+" has mismatch of per hour counts: "
+                    +str(per_hour_tot) +" vs "+str(chosen_tot))
 
 class ReasonRouterList:
   "Helper class to track which reasons are in which routers."
@@ -160,14 +237,15 @@
   def write_list(self, f):
     rlist = self.sort_list()
     for r in rlist:
+      susp = 0
       f.write(r.idhex+" ("+r.nickname+") Fail=")
       if self.reason in r.reason_failed:
-        f.write(str(r.reason_failed[self.reason]))
-      else: f.write("0")
+        susp = r.reason_failed[self.reason]
+      f.write(str(susp))
       f.write(" Susp=")
       if self.reason in r.reason_suspected:
-        f.write(str(r.reason_suspected[self.reason])+"\n")
-      else: f.write("0\n")
+        susp += r.reason_suspected[self.reason]
+      f.write(str(susp)+"\n")
     
   def add_r(self, r):
     self.rlist[r] = 1
@@ -221,23 +299,53 @@
     return reduce(lambda x, y: x + y.reason_failed[self.reason],
             self.rlist.iterkeys(), 0)
 
+
 class StatsHandler(PathSupport.PathBuilder):
   def __init__(self, c, slmgr):
     PathBuilder.__init__(self, c, slmgr, StatsRouter)
     self.failed_reasons = {}
     self.suspect_reasons = {}
 
+  def run_zbtest(self): # Unweighted z-test
+    n = reduce(lambda x, y: x+(y.bwstats.mean > 0), self.sorted_r, 0)
+    if n == 0: return
+    avg = reduce(lambda x, y: x+y.bwstats.mean, self.sorted_r, 0)/float(n)
+    def notlambda(x, y):
+      if y.bwstats.mean <= 0: return x+0
+      else: return x+(y.bwstats.mean-avg)*(y.bwstats.mean-avg)
+    stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+    for r in self.sorted_r:
+      if r.bwstats.mean > 0:
+        r.z_bw = abs((r.bwstats.mean-avg)/stddev)
+        r.prob_zb = TorUtil.zprob(-r.z_bw)
+    return (avg, stddev)
+
+  def run_zrtest(self): # Unweighted z-test
+    n = reduce(lambda x, y: x+(y.bw_ratio() > 0), self.sorted_r, 0)
+    if n == 0: return
+    avg = reduce(lambda x, y: x+y.bw_ratio(), self.sorted_r, 0)/float(n)
+    def notlambda(x, y):
+      if y.bw_ratio() <= 0: return x+0
+      else: return x+(y.bw_ratio()-avg)*(y.bw_ratio()-avg)
+    stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+    for r in self.sorted_r:
+      if r.bw_ratio() > 0:
+        r.z_ratio = abs((r.bw_ratio()-avg)/stddev)
+        r.prob_zr = TorUtil.zprob(-r.z_ratio)
+    return (avg, stddev)
+
   def write_reasons(self, f, reasons, name):
-    f.write("\n\n\t------------------- "+name+" -------------------\n")
+    f.write("\n\n\t----------------- "+name+" -----------------\n")
     for rsn in reasons:
       f.write("\nReason="+rsn.reason+". Failed: "+str(rsn.total_failed())
           +", Suspected: "+str(rsn.total_suspected())+"\n")
       rsn.write_list(f)
 
   def write_routers(self, f, rlist, name):
-    f.write("\n\n\t------------------- "+name+" -------------------\n\n")
+    f.write("\n\n\t----------------- "+name+" -----------------\n\n")
     for r in rlist:
-      f.write(str(r))
+      # only print it if we've used it.
+      if r.circ_chosen+r.strm_chosen > 0: f.write(str(r))
 
   def write_stats(self, filename):
     plog("DEBUG", "Writing stats")
@@ -259,7 +367,18 @@
     for rsn in self.suspect_reasons.itervalues(): rsn._verify_suspected()
 
     f = file(filename, "w")
+    f.write(StatsRouter.key)
+    (avg, dev) = self.run_zbtest()
+    f.write("\n\nBW stats: u="+str(round(avg,1))+" s="+str(round(dev,1))+"\n")
 
+    (avg, dev) = self.run_zrtest()
+    f.write("BW ratio stats: u="+str(round(avg,1)) +" s="+str(round(dev,1)))
+
+    # sort+print by bandwidth
+    bw_rate = copy.copy(self.sorted_r)
+    bw_rate.sort(lambda x, y: cmp(y.bw_ratio(), x.bw_ratio()))
+    self.write_routers(f, bw_rate, "Bandwidth Ratios")
+
     # FIXME: Print out key/legend header
     failed = copy.copy(self.sorted_r)
     failed.sort(lambda x, y:
@@ -274,18 +393,18 @@
     self.write_routers(f, suspected, "Suspected Counts")
 
     fail_rate = copy.copy(failed)
-    fail_rate.sort(lambda x, y:
-       cmp(y.failed_per_hour(), x.failed_per_hour()))
+    fail_rate.sort(lambda x, y: cmp(y.failed_per_hour(), x.failed_per_hour()))
     self.write_routers(f, fail_rate, "Fail Rates")
 
     suspect_rate = copy.copy(suspected)
     suspect_rate.sort(lambda x, y:
        cmp(y.suspected_per_hour(), x.suspected_per_hour()))
     self.write_routers(f, suspect_rate, "Suspect Rates")
-
+    
     # TODO: Sort by failed/selected and suspect/selected ratios
     # if we ever want to do non-uniform scanning..
 
+    # XXX: Add failed in here somehow..
     susp_reasons = self.suspect_reasons.values()
     susp_reasons.sort(lambda x, y:
        cmp(y.total_suspected(), x.total_suspected()))
@@ -297,9 +416,13 @@
     self.write_reasons(f, fail_reasons, "Failed Reasons")
     f.close()
 
+    # FIXME: sort+print by circ extend time
+
   def reset_stats(self):
-    for r in self.sorted_r:
-      r.reset()
+    plog("DEBUG", "Resetting stats")
+    self.suspect_reasons.clear()
+    self.failed_reasons.clear()
+    for r in self.sorted_r: r.reset()
 
   # TODO: Use stream bandwidth events to implement reputation system
   # from
@@ -316,7 +439,13 @@
       if c.remote_reason: rreason = c.remote_reason
       else: rreason = "NONE"
       reason = c.event_name+":"+c.status+":"+lreason+":"+rreason
-      if c.status == "FAILED":
+      if c.status == "EXTENDED":
+        delta = time.time() - self.circuits[c.circ_id].last_extended_at
+        r_ext = c.path[-1]
+        if r_ext[0] != '$': r_ext = self.name_to_key[r_ext]
+        self.routers[r_ext[1:]].total_extend_time += delta
+        self.routers[r_ext[1:]].total_extended += 1
+      elif c.status == "FAILED":
         # update selection count
         for r in self.circuits[c.circ_id].path: r.circ_chosen += 1
         
@@ -366,6 +495,7 @@
     PathBuilder.circ_status_event(self, c)
   
   def stream_status_event(self, s):
+    # XXX: Verify circ id matches stream.circ
     if s.strm_id in self.streams:
       # TODO: Hrmm, consider making this sane in TorCtl.
       if s.reason: lreason = s.reason
@@ -373,8 +503,8 @@
       if s.remote_reason: rreason = s.remote_reason
       else: rreason = "NONE"
       reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
-      if s.status in ("DETACHED", "FAILED", "CLOSED", "SUCCEEDED") \
-          and not s.circ_id:
+      if (s.status in ("DETACHED", "FAILED", "CLOSED", "SUCCEEDED")
+          and not s.circ_id):
         # XXX: REMAPs can do this (normal). Also REASON=DESTROY (bug?)
         # Also timeouts.. Those should use the pending circ instead
         # of returning..
@@ -385,6 +515,17 @@
         # Update strm_chosen count
         # FIXME: use SENTRESOLVE/SENTCONNECT instead?
         for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
+        
+        # Update bw stats
+        if self.streams[s.strm_id].attached_at:
+          if s.status == "DETACHED":
+            plog("WARN", str(s.strm_id)+" detached after succeeded")
+          lifespan = self.streams[s.strm_id].lifespan()
+          for r in self.streams[s.strm_id].circ.path:
+            r.bwstats.add_bw(self.streams[s.strm_id].bytes_written+
+                             self.streams[s.strm_id].bytes_read,
+                             lifespan)
+ 
         # Update failed count,reason_failed for exit
         r = self.circuits[s.circ_id].exit
         if not reason in r.reason_failed: r.reason_failed[reason] = 1
@@ -411,9 +552,19 @@
         # Always get both a closed and a failed.. 
         #   - Check if the circuit exists still
         # XXX: Save both closed and failed reason in stream object
+        #      and rely on a flag instead of this
         if s.circ_id in self.circuits:
           # Update strm_chosen count
           for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
+
+          # Update bw stats
+          if self.streams[s.strm_id].attached_at:
+            lifespan = self.streams[s.strm_id].lifespan()
+            for r in self.streams[s.strm_id].circ.path:
+              r.bwstats.add_bw(self.streams[s.strm_id].bytes_written+
+                               self.streams[s.strm_id].bytes_read,
+                               lifespan)
+
           if lreason in ("TIMEOUT", "INTERNAL", "TORPROTOCOL" "DESTROY"):
             for r in self.circuits[s.circ_id].path[:-1]:
               r.strm_suspected += 1
@@ -428,7 +579,7 @@
               r.strm_uncounted += 1
             
           r = self.circuits[s.circ_id].exit
-          if lreason == "DONE":
+          if lreason == "DONE" or (lreason == "END" and rreason == "DONE"):
             r.strm_succeeded += 1
           else:
             if not reason in r.reason_failed:
@@ -491,8 +642,6 @@
       s.write("250 NEWNYM OK\r\n")
     elif command == "GETDNSEXIT":
       pass # TODO: Takes a hostname? Or prints most recent?
-    elif command == "RESETSTATS":
-      s.write("250 OK\r\n")
     elif command == "ORDEREXITS":
       try:
         if arg:
@@ -586,9 +735,15 @@
       h.schedule_low_prio(notlambda)
       s.write("250 OK\r\n")
     elif command == "RESETSTATS":
+      plog("DEBUG", "Got resetstats")
       def notlambda(this): this.reset_stats()
       h.schedule_low_prio(notlambda)
       s.write("250 OK\r\n")
+    elif command == "COMMIT":
+      plog("DEBUG", "Got commit")
+      def notlambda(this): this.run_all_jobs = True
+      h.schedule_immediate(notlambda)
+      s.write("250 OK\r\n")
     elif command == "HELP":
       s.write("250 OK\r\n")
     else:
@@ -619,8 +774,10 @@
   h = StatsHandler(c, __selmgr)
   c.set_event_handler(h)
   c.set_events([TorCtl.EVENT_TYPE.STREAM,
+          TorCtl.EVENT_TYPE.BW,
           TorCtl.EVENT_TYPE.NS,
           TorCtl.EVENT_TYPE.CIRC,
+          TorCtl.EVENT_TYPE.STREAM_BW,
           TorCtl.EVENT_TYPE.NEWDESC], True)
   c.set_option("__LeaveStreamsUnattached", "1")
   return (c,h)

Modified: torflow/trunk/soat.pl
===================================================================
--- torflow/trunk/soat.pl	2007-03-15 16:28:01 UTC (rev 9830)
+++ torflow/trunk/soat.pl	2007-03-15 18:58:51 UTC (rev 9831)
@@ -60,32 +60,6 @@
 my $LOG_LEVEL = "DEBUG";
 my %log_levels = ("DEBUG", 0, "INFO", 1, "NOTICE", 2, "WARN", 3, "ERROR", 4);
 
-
-my %mt_circ_sel_counts;
-my %mt_strm_sel_counts;
-my %mt_reason_counts;
-my %mt_fail_counts;
-my %mt_fail_totals; # actually 5 vars in metatroller
-
-my %mt_suspect_circ_sel_counts;
-my %mt_suspect_strm_sel_counts;
-my %mt_suspect_reason_counts;
-my %mt_suspect_counts;
-
-my %total_circ_sel_counts;
-my %total_strm_sel_counts;
-my %total_reason_counts;
-my %total_fail_counts;
-my %total_fail_totals; # actually 5 vars in metatroller
-
-my %total_suspect_circ_sel_counts;
-my %total_suspect_strm_sel_counts;
-my %total_suspect_counts;
-my %total_suspect_reason_counts;
-
-
-my %key_names;
-
 sub plog
 {
     my $level = shift;
@@ -681,321 +655,6 @@
     plog "INFO", "Checked all ssh hosts\n";
 }
 
-sub parse_failrates
-{
-    my $file = shift;
-    my $hash_fail = shift;
-    my $hash_circ = shift;
-    my $hash_strm = shift;
-    my $hash_totals = shift;
-
-    while(1) {
-        my $line = <$file>;
-        
-        if($line =~ /^250 FAILCOUNT/) {
-            $line =~ /FAILCOUNT ([\S]+) \(([\S]+)\) = ([\d]+)\//;
-            my $key = $1;
-            my $name = $2;
-            my $fail = $3;
-            # Hack because for reason failure lines there is an extra element
-            # (middle=total failures)
-            $line =~ /\/([\d]+)\+([\d]+)/;
-            my $circ_sel = $1;
-            my $strm_sel = $2;
-
-            if($key_names{$key} and (not $key_names{$key} eq $name)) {
-                plog "NOTICE", "$name for $key is not the same as $key_names{$key}\n";
-            }
-            $key_names{$key} = $name;
-            if($$hash_fail{$key}) {
-                $$hash_fail{$key} += $fail;
-            } else {
-                $$hash_fail{$key} = $fail;
-            }
-
-            if($hash_circ) {
-                if($$hash_circ{$key}) {
-                    $$hash_circ{$key} += $circ_sel;
-                } else {
-                    $$hash_circ{$key} = $circ_sel;
-                }
-            }
-            if($hash_strm) {
-                if($$hash_strm{$key}) {
-                    $$hash_strm{$key} += $strm_sel;
-                } else {
-                    $$hash_strm{$key} = $strm_sel;
-                }
-            }
-
-        } else {
-            if($hash_totals) {
-                if($line =~ /^250 FAILTOTALS ([\d]+)\/([\d]+) ([\d]+)\+([\d]+)\/([\d]+)/) {
-                    $$hash_totals{"CIRCUITFAILED"} = $1;
-                    $$hash_totals{"CIRCUITCOUNTS"} = $2;
-                    $$hash_totals{"STREAMDETACHED"} = $3;
-                    $$hash_totals{"STREAMFAILURES"} = $4;
-                    $$hash_totals{"STREAMCOUNTS"} = $5;
-                    plog "DEBUG", "Got fail totals\n";
-                }
-            }
-            last;
-        }
-    }
-}
-
-sub read_failrates
-{
-    my $mcp = shift;
-    my $file = shift;
-    my $cmd = shift;
-    my $mt_fcnt = shift;
-    my $tot_fcnt = shift; 
-    my $mt_circ_sel_cnt = shift;
-    my $tot_circ_sel_cnt = shift;
-    my $mt_strm_sel_cnt = shift;
-    my $tot_strm_sel_cnt = shift;
-    my $mt_totals = shift;
-    my $tot_totals = shift;
-    
-    return;
-    # Hack to avoid counting previous run
-    print $mcp "$cmd\r\n";
-    parse_failrates($mcp, $mt_fcnt, $mt_circ_sel_cnt, $mt_strm_sel_cnt, $mt_totals);
-
-    if(not open(FAILFILE, '<', $file)) {
-        return;
-    }
-
-    parse_failrates(*FAILFILE, $tot_fcnt, $tot_circ_sel_cnt, $tot_strm_sel_cnt, $tot_totals);
-    close(FAILFILE);
-}
-
-sub write_failrates
-{
-    my $mcp = shift;
-    my $file = shift;
-    my $cmd = shift;
-    my $mt_fcnt = shift;
-    my $tot_fcnt = shift;
-    my $mt_circ_sel_cnt = shift;
-    my $tot_circ_sel_cnt = shift;
-    my $mt_strm_sel_cnt = shift;
-    my $tot_strm_sel_cnt = shift;
-    my $mt_totals = shift;
-    my $tot_totals = shift;
-    my %fail_counts;
-    my %circ_counts;
-    my %strm_counts;
-    my %rate_totals;
-    return;
-
-    print $mcp "$cmd\r\n";
-    parse_failrates($mcp, \%fail_counts, \%circ_counts, \%strm_counts, \%rate_totals);
-
-    # Get delta
-    foreach(keys(%fail_counts)) {
-        if(not $$tot_fcnt{$_}) {
-            $$tot_circ_sel_cnt{$_} = 0;
-            $$tot_strm_sel_cnt{$_} = 0;
-            $$tot_fcnt{$_} = 0;
-        }
-
-        if(not $$mt_fcnt{$_}) {
-            $$mt_fcnt{$_} = 0;
-            $$mt_circ_sel_cnt{$_} = 0;
-            $$mt_strm_sel_cnt{$_} = 0;
-        }
-
-        # Update our totals only if the change is positive (ie no restart)
-        if(($fail_counts{$_} - $$mt_fcnt{$_}) > 0) {
-            $$tot_fcnt{$_} += ($fail_counts{$_} - $$mt_fcnt{$_});
-            $$tot_circ_sel_cnt{$_} += ($circ_counts{$_} - $$mt_circ_sel_cnt{$_});
-            $$tot_strm_sel_cnt{$_} += ($strm_counts{$_} - $$mt_strm_sel_cnt{$_});
-        }
-
-        # Store MT totals
-        $$mt_fcnt{$_} = $fail_counts{$_};
-        $$mt_circ_sel_cnt{$_} = $circ_counts{$_};
-        $$mt_strm_sel_cnt{$_} = $strm_counts{$_};
-    }
-
-    if($tot_totals) {
-        
-        if(($rate_totals{"STREAMCOUNTS"} - $$mt_totals{"STREAMCOUNTS"}) > 0) {
-
-            $$tot_totals{"CIRCUITFAILED"} += 
-                $rate_totals{"CIRCUITFAILED"} - $$mt_totals{"CIRCUITFAILED"};
-            $$tot_totals{"CIRCUITCOUNTS"} +=
-                $rate_totals{"CIRCUITCOUNTS"} - $$mt_totals{"CIRCUITCOUNTS"};
-            $$tot_totals{"STREAMDETACHED"} +=
-                $rate_totals{"STREAMDETACHED"} - $$mt_totals{"STREAMDETACHED"};
-            $$tot_totals{"STREAMFAILURES"} +=
-                $rate_totals{"STREAMFAILURES"} - $$mt_totals{"STREAMFAILURES"};
-            $$tot_totals{"STREAMCOUNTS"} +=
-                $rate_totals{"STREAMCOUNTS"} - $$mt_totals{"STREAMCOUNTS"};
-
-        }
-        $$mt_totals{"CIRCUITFAILED"} = $rate_totals{"CIRCUITFAILED"};
-        $$mt_totals{"CIRCUITCOUNTS"} = $rate_totals{"CIRCUITCOUNTS"};
-        $$mt_totals{"STREAMDETACHED"} = $rate_totals{"STREAMDETACHED"};
-        $$mt_totals{"STREAMFAILURES"} = $rate_totals{"STREAMFAILURES"};
-        $$mt_totals{"STREAMCOUNTS"} = $rate_totals{"STREAMCOUNTS"};
-
-    }
-
-    # Use global, not arg (which may be undef)
-    my @sorted_r = sort {
-        $$tot_fcnt{$b} <=> $$tot_fcnt{$a}
-    } keys %$tot_fcnt;
-
-    if(not open(FAILFILE, '>', $file)) {
-        die "Can't open $file to save fail rate table";
-    }
-
-    foreach(@sorted_r) {
-        print FAILFILE "250 FAILCOUNT $_ ($key_names{$_}) = $$tot_fcnt{$_}/$$tot_circ_sel_cnt{$_}+$$tot_strm_sel_cnt{$_}\n";
-    }
-
-    if($tot_totals) { 
-        print FAILFILE "250 FAILTOTALS " . $$tot_totals{"CIRCUITFAILED"} ."/". 
-            $$tot_totals{"CIRCUITCOUNTS"} ." ". $$tot_totals{"STREAMDETACHED"} ."+". 
-             $$tot_totals{"STREAMFAILURES"} ."/". $$tot_totals{"STREAMCOUNTS"} ." OK\n";
-    } else { 
-        print FAILFILE "250 OK\n";
-    }
-    
-    close(FAILFILE);
-    plog "INFO", "Updated failure counts\n";
-}
-
-
-sub parse_reasons
-{
-    my $file = shift;
-    my $hash = shift;
-
-    while( 1 ) {
-        my $line = <$file>;
-        my $reason;
-        if($line =~ /^250 REASON=([\S]+)/) {
-            $reason = $1;
-        } elsif($line =~ /^250 OK/) {
-            plog "DEBUG", "End reason parsing on $line\n";
-            return;
-        } else {
-            plog "NOTICE", "Weird end line $line\n";
-            return;
-        }
-
-        if(not $$hash{$reason}) {
-            $$hash{$reason} = {};
-        }
-
-        parse_failrates($file, $$hash{$reason});
-    }
-}
-
-sub read_reasons
-{
-    my $mcp = shift;
-    my $file = shift;
-    my $cmd = shift;
-    my $mt_rc = shift;
-    my $tot_rc = shift;
-    return;
-
-    # Hack to avoid double-counting
-    print $mcp "$cmd\r\n";
-    parse_reasons($mcp, $mt_rc);
-    
-    if(not open(FAILFILE, '<', $file)) {
-        return;
-    }
-
-    parse_reasons(*FAILFILE, $tot_rc);
-    close(FAILFILE);
-}
-
-sub write_reasons
-{
-    my $mcp = shift;
-    my $file = shift;
-    my $cmd = shift;
-    my $mt_rcnt = shift;
-    my $tot_rcnt = shift;
-    my $tot_fcnt = shift;
-    my $tot_circ_sel_cnt = shift;
-    my $tot_strm_sel_cnt = shift;
-    my %curr_reason_counts;
-
-    return;
-    print $mcp "$cmd\r\n";
-    parse_reasons($mcp, \%curr_reason_counts);
-
-    # Get delta
-    foreach(keys(%curr_reason_counts)) {
-        if(not $$tot_rcnt{$_}) {
-            $$tot_rcnt{$_} = {};
-        } 
-        if(not $$mt_rcnt{$_}) {
-            $$mt_rcnt{$_} = {};
-        } 
-        my $curr_hash = $curr_reason_counts{$_};
-        my $total_hash = $$tot_rcnt{$_};
-        my $mt_hash = $$mt_rcnt{$_};
-
-        foreach(keys(%$curr_hash)) {
-
-            if(not $$total_hash{$_}) {
-                $$total_hash{$_} = 0;
-            }
-
-            if(not $$mt_hash{$_}) {
-                $$mt_hash{$_} = 0;
-            }
-
-            # Update our totals 
-            if(($$curr_hash{$_} - $$mt_hash{$_}) > 0) {
-                $$total_hash{$_} += ($$curr_hash{$_} - $$mt_hash{$_});
-            }
-
-            # Store MT totals
-            $$mt_hash{$_} = $$curr_hash{$_};
-        }
-    }
-
-    my $failed_total = 0;
-    foreach(keys(%$tot_fcnt)) {
-        $failed_total += $$tot_fcnt{$_};
-    }
-
-    if(not open(FAILFILE, '>', $file)) {
-        die "Can't open $file to save fail rate table";
-    }
-
-    foreach(keys(%$tot_rcnt)) {
-        print FAILFILE "250 REASON=$_\r\n";
-        my $reason_hash = $$tot_rcnt{$_};
-        my $reason_total = 0;
-
-        my @sorted_r = sort {
-            $$reason_hash{$b} <=> $$reason_hash{$a}
-        } keys %$reason_hash;
-
-        foreach(@sorted_r) {
-            print FAILFILE "250 FAILCOUNT $_ ($key_names{$_}) = $$reason_hash{$_}/$$tot_fcnt{$_}/$$tot_circ_sel_cnt{$_}+$$tot_strm_sel_cnt{$_}\r\n";
-            $reason_total += $$reason_hash{$_};
-        }
-        print FAILFILE "250 REASONTOTAL $reason_total/$failed_total\r\n";
-    }
-    print FAILFILE "250 OK\r\n";
-   
-    close(FAILFILE);
-    plog "INFO", "Updated failure counts\n";
-}
-
-
 sub main
 {
     my $mcp = IO::Socket::INET->new(
@@ -1035,22 +694,6 @@
     delete $ENV{"HTTP_PROXY"};
     delete $ENV{"proxy"};
     delete $ENV{"PROXY"};
-   
-    read_failrates($mcp, $DOC_DIR . "/naive_fail_rates",
-            "FAILRATES", \%mt_fail_counts, \%total_fail_counts,
-            \%mt_circ_sel_counts, \%total_circ_sel_counts,
-            \%mt_strm_sel_counts, \%total_strm_sel_counts,
-            \%mt_fail_totals, \%total_fail_totals);
-    read_failrates($mcp, $DOC_DIR . "/suspected_rates",
-            "SUSPECTRATES", \%mt_suspect_counts, \%total_suspect_counts,
-            \%mt_suspect_circ_sel_counts, \%total_suspect_circ_sel_counts,
-            \%mt_suspect_strm_sel_counts, \%total_suspect_strm_sel_counts);
-    
-    read_reasons($mcp, $DOC_DIR . "/naive_fail_reasons", 
-            "FAILREASONS", \%mt_reason_counts, \%total_reason_counts);
-    read_reasons($mcp, $DOC_DIR . "/suspected_reasons", 
-            "SUSPECTREASONS", \%mt_suspect_reason_counts, 
-            \%total_suspect_reason_counts);
   
     if(is_in("urls", \@TO_SCAN)) {   
         @DOC_URLS = (@DOC_URLS, get_doc_list());
@@ -1089,25 +732,6 @@
         print $mcp "SAVESTATS\r\n";
         $line = <$mcp>;
         die "Error saving stats: $line" if (not $line =~ /^250/);
-
-        write_failrates($mcp, $DOC_DIR . "/naive_fail_rates", 
-                "FAILRATES", \%mt_fail_counts, \%total_fail_counts, 
-                \%mt_circ_sel_counts, \%total_circ_sel_counts,
-                \%mt_strm_sel_counts, \%total_strm_sel_counts,
-                \%mt_fail_totals, \%total_fail_totals);
-        write_failrates($mcp, $DOC_DIR . "/suspected_rates", 
-                "SUSPECTRATES", \%mt_suspect_counts, \%total_suspect_counts,
-                \%mt_suspect_circ_sel_counts, \%total_suspect_circ_sel_counts,
-                \%mt_suspect_strm_sel_counts, \%total_suspect_strm_sel_counts);
-        
-        write_reasons($mcp, $DOC_DIR . "/naive_fail_reasons",
-                "FAILREASONS", \%mt_reason_counts, \%total_reason_counts,
-                \%total_fail_counts, \%total_circ_sel_counts, \%total_strm_sel_counts);
-        write_reasons($mcp, $DOC_DIR . "/suspected_reasons",
-                "SUSPECTREASONS", \%mt_suspect_reason_counts,
-                \%total_suspect_reason_counts, \%total_suspect_counts,
-                \%total_suspect_circ_sel_counts,
-                \%total_suspect_strm_sel_counts);
     }
 }
 

Added: torflow/trunk/speedracer.pl
===================================================================
--- torflow/trunk/speedracer.pl	2007-03-15 16:28:01 UTC (rev 9830)
+++ torflow/trunk/speedracer.pl	2007-03-15 18:58:51 UTC (rev 9831)
@@ -0,0 +1,251 @@
+#!/usr/bin/perl -w
+
+
+use strict;
+use IO::Socket;
+use IO::Socket::INET;
+use Time::HiRes qw( usleep ualarm gettimeofday tv_interval );
+
+my $META_PORT = "9052";
+my $META_HOST = "127.0.0.1";
+
+my $USER_AGENT = "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; .NET CLR 1.0.3705; .NET CLR 1.1.4322)";
+
+my $SOCKS_PROXY = "127.0.0.1:9060";
+my $CURL_PROXY = "--socks5 $SOCKS_PROXY";
+
+# http://bitter.stalin.se/torfile
+# http://www.sigma.su.se/~who/torfile
+my $URL = "http://130.237.152.195/~who/torfile";
+my $COUNT = 400;
+my $START_PCT = 0;
+my $STOP_PCT = 21;
+my $PCT_STEP = 7;
+
+my $LOG_LEVEL = "DEBUG";
+my %log_levels = ("DEBUG", 0, "INFO", 1, "NOTICE", 2, "WARN", 3, "ERROR", 4);
+
+
+sub plog
+{
+    my $level = shift;
+    my $msg = shift;
+    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime(time);
+
+    $year += 1900; # lame.
+    $mon += 1;
+    
+    print "$level \[" . localtime() . "\]: " . $msg if($log_levels{$level} >= $log_levels{$LOG_LEVEL})
+    #print "$level\[$year-$mon-$mday $hour:$min:$sec\]: " . $msg if($log_levels{$level} >= $log_levels{$LOG_LEVEL})
+}
+
+sub is_in
+{
+    my $element = shift;
+    my $ary = shift;
+    my $is_there = 0;
+    foreach (@$ary) {
+        if ($_ eq $element) {
+            $is_there = 1;
+            last;
+        }
+    }
+
+    return $is_there;
+}
+
+sub compare_arrays {
+    my ($first, $second) = @_;
+    no warnings;  # silence spurious -w undef complaints
+        return 0 unless @$first == @$second;
+    for (my $i = 0; $i < @$first; $i++) {
+        return 0 if $first->[$i] ne $second->[$i];
+    }
+    return 1;
+}
+
+sub query_exit
+{
+    my $mcp = shift;
+    my $line;
+    print $mcp "GETLASTEXIT\r\n";
+    $line = <$mcp>;
+    $line =~ /LASTEXIT=([\S]+)/;
+
+    return $1;    
+}
+
+
+sub speedrace
+{
+    my $mcp = shift;
+    my $skip = shift;
+    my $pct = shift;
+    my @build_times;
+    my @fetch_times;
+    my $tot_fetch_time = 0;
+    my $tot_build_time = 0;
+    my $i = 0;
+    my $line;
+
+    # Weak new-nym
+    print $mcp "PERCENTSKIP $skip\r\n";
+    $line = <$mcp>;
+    die "Error setting percentskip: $line" if (not $line =~ /^250/);
+
+    print $mcp "PERCENTFAST $pct\r\n";
+    $line = <$mcp>;
+    die "Error setting percentfast: $line" if (not $line =~ /^250/);
+
+    # So this is a really big hack. Since metatroller builds circuits on 
+    # the fly whereas tor has a pool of pre-built circuits to use,
+    # we want to get it to build a circuit for us but not count 
+    # that construction time. The way we do this is to issue
+    # a NEWNYM and then get the url TWICE. 
+
+    while($#build_times+1 < $COUNT) {
+        my $t0;
+        my $delta_build;
+        my $delta_fetch;
+        my $fetch_exit;
+        my $build_exit;
+        my $ret;
+
+        print $mcp "NEWNYM\r\n";
+        $line = <$mcp>;
+        die "Error sending NEWNYM: $line" if (not $line =~ /^250/);
+ 
+        # Build the circuit... 
+        do {
+            $i++;
+            
+            $t0 = [gettimeofday()];
+            $ret = 
+#                system("tsocks wget -U \"$USER_AGENT\" \'$URL\' -O - >& /dev/null");
+                system("curl $CURL_PROXY -m 600 -A \"$USER_AGENT\" \'$URL\' >& /dev/null");
+
+            if($ret == 2) {
+                plog "NOTICE", "wget got Sigint. Dying\n";
+                exit;
+            }
+            plog "NOTICE", "wget failed with ret=$ret.. Retrying...\n" 
+                if($ret != 0);
+            $delta_build = tv_interval $t0;
+            plog "NOTICE", "Timer exceeded limit: $delta_build\n"
+                if($delta_build >= 550.0);
+        } while($ret != 0 || $delta_build >= 550.0);
+
+        $build_exit = query_exit($mcp);
+
+        plog "DEBUG", "Got 1st via $build_exit\n";
+
+        # Now do it for real
+        
+        do {
+            $i++;
+            $t0 = [gettimeofday()];
+            $ret = 
+#                system("tsocks wget -U \"$USER_AGENT\" \'$URL\' -O - >& /dev/null");
+                system("curl $CURL_PROXY -m 600 -A \"$USER_AGENT\" \'$URL\' >& /dev/null");
+
+            if($ret == 2) {
+                plog "NOTICE", "wget got Sigint. Dying\n";
+                exit;
+            }
+            plog "NOTICE", "wget failed with ret=$ret.. Retrying with clock still running\n" 
+                if($ret != 0);
+            $delta_fetch = tv_interval $t0;
+            plog "NOTICE", "Timer exceeded limit: $delta_fetch\n"
+                if($delta_fetch >= 550.0);
+        } while($ret != 0 || $delta_fetch >= 550.0);
+
+        $fetch_exit = query_exit($mcp);
+
+        if($fetch_exit eq $build_exit) {
+            $tot_build_time += $delta_build;
+            push(@build_times, $delta_build);
+            plog "DEBUG", "$skip-$pct% circuit build+fetch took $delta_build for $fetch_exit\n";
+
+            push(@fetch_times, $delta_fetch);
+            $tot_fetch_time += $delta_fetch;
+            plog "DEBUG", "$skip-$pct% fetch took $delta_fetch for $fetch_exit\n";
+        } else {
+            plog "NOTICE", "Ignoring strange exit swap $build_exit -> $fetch_exit. Circuit failure?\n";
+        }
+    }
+    my $avg_build_time = $tot_build_time/($#build_times+1);
+    my $build_dev = 0;
+    foreach(@build_times) {
+        $build_dev += 
+            ($_ - $avg_build_time)*($_ - $avg_build_time);
+    }
+    $build_dev = sqrt($build_dev / ($#build_times+1));
+    
+    my $avg_fetch_time = $tot_fetch_time/($#fetch_times+1);
+    my $fetch_dev = 0;
+    foreach(@fetch_times) {
+        $fetch_dev += 
+            ($_ - $avg_fetch_time)*($_ - $avg_fetch_time);
+    }
+    $fetch_dev = sqrt($fetch_dev / ($#fetch_times+1));
+    plog "INFO", "RANGE $skip-$pct " . ($#build_times+1) . " build+fetches: avg=$avg_build_time, dev=$build_dev\n";
+    plog "INFO", "RANGE $skip-$pct " . ($#fetch_times+1) . " fetches: avg=$avg_fetch_time, dev=$fetch_dev\n";
+    plog "INFO", "  " . ($COUNT*2) . " fetches took $i tries\n";
+}
+
+sub main
+{
+    my $mcp = IO::Socket::INET->new(
+            Proto    => "tcp",
+            PeerAddr => $META_HOST,
+            PeerPort => $META_PORT)
+        or die "The Metatroller is not enabled";
+    my $line = <$mcp>;
+    $line = <$mcp>;
+
+    delete $ENV{"http_proxy"};
+    delete $ENV{"HTTP_PROXY"};
+    delete $ENV{"proxy"};
+    delete $ENV{"PROXY"};
+
+    print $mcp "GUARDNODES 0\r\n";
+    $line = <$mcp>;
+    die "Error setting Guard Nodes: $line" if (not $line =~ /^250/);
+
+    print $mcp "UNIFORM 1\r\n";
+    $line = <$mcp>;
+    die "Error setting UNIFORM: $line" if (not $line =~ /^250/);
+
+    print $mcp "ORDEREXITS 1\r\n";
+    $line = <$mcp>;
+    die "Error setting ORDEREXITS: $line" if (not $line =~ /^250/);
+
+    print $mcp "PATHLEN 2\r\n";
+    $line = <$mcp>;
+    die "Error setting PATHLEN: $line" if (not $line =~ /^250/);
+
+    my $pct = $START_PCT;
+    plog "INFO", "Beginning time loop\n";
+        
+    while($pct < $STOP_PCT) {
+        print $mcp "RESETSTATS\r\n";
+        $line = <$mcp>;
+        die "Error on RESETSTATS: $line" if (not $line =~ /^250/);
+        print $mcp "COMMIT\r\n";
+        $line = <$mcp>;
+        die "Error on COMMIT: $line" if (not $line =~ /^250/);
+        plog "DEBUG", "Reset stats\n";
+        speedrace($mcp, $pct, $pct+$PCT_STEP); 
+        plog "DEBUG", "speedroced\n";
+        print $mcp "SAVESTATS ./speed/stats-$pct:".($pct+$PCT_STEP)."\r\n";
+        $line = <$mcp>;
+        die "Error on SAVESTATS: $line" if (not $line =~ /^250/);
+        plog "DEBUG", "Wrote stats\n";
+        $pct += $PCT_STEP; 
+        print $mcp "COMMIT\r\n";
+        $line = <$mcp>;
+        die "Error on COMMIT: $line" if (not $line =~ /^250/);
+    }
+}
+
+main();


Property changes on: torflow/trunk/speedracer.pl
___________________________________________________________________
Name: svn:executable
   + *