[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r9753: Stats now sane, or at least are now verified to be self-cons (in torflow/trunk: . TorCtl)



Author: mikeperry
Date: 2007-03-07 00:37:10 -0500 (Wed, 07 Mar 2007)
New Revision: 9753

Modified:
   torflow/trunk/TorCtl/PathSupport.py
   torflow/trunk/TorCtl/TorCtl.py
   torflow/trunk/metatroller.py
   torflow/trunk/soat.pl
Log:
Stats now sane, or at least are now verified to be self-consistent in their
insanity. Getting closer to that big pot of data at the end of the rainbow...



Modified: torflow/trunk/TorCtl/PathSupport.py
===================================================================
--- torflow/trunk/TorCtl/PathSupport.py	2007-03-06 23:48:55 UTC (rev 9752)
+++ torflow/trunk/TorCtl/PathSupport.py	2007-03-07 05:37:10 UTC (rev 9753)
@@ -7,6 +7,7 @@
 import socket
 import copy
 import datetime
+import Queue
 from TorUtil import *
 
 __all__ = ["NodeRestrictionList", "PathRestrictionList",
@@ -418,18 +419,14 @@
       The methods are NOT threadsafe. They may ONLY be called from
       EventHandler's thread.
 
-      When updating the configuration, make a copy of this object, modify it,
-      and then submit it to PathBuilder.update_selmgr(). Since assignments
-      are atomic (GIL), this patten is threadsafe.
+      To update the selection manager, schedule a config update job
+      using PathBuilder.schedule_selmgr() with a worker function
+      to modify this object.
       """
-    def __init__(self, resolve_port, num_circuits, pathlen, order_exits,
+    def __init__(self, pathlen, order_exits,
                  percent_fast, percent_skip, min_bw, use_all_exits,
                  uniform, use_exit, use_guards):
         self.__ordered_exit_gen = None 
-        self.last_exit = None
-        self.new_nym = False
-        self.resolve_port = resolve_port
-        self.num_circuits = num_circuits
         self.pathlen = pathlen
         self.order_exits = order_exits
         self.percent_fast = percent_fast
@@ -441,10 +438,6 @@
         self.use_guards = use_guards
 
     def reconfigure(self, sorted_r):
-        """
-        Member variables from this funciton should not be modified by other
-        threads.
-        """
         if self.use_all_exits:
             self.path_rstr = PathRestrictionList([])
         else:
@@ -532,38 +525,57 @@
 # node reliability stats for normal usage without us attaching streams
 
 class PathBuilder(TorCtl.EventHandler):
+    """
+    PathBuilder implementation. Handles circuit construction, subject
+    to the constraints of the SelectionManager selmgr.
+    
+    Do not access this object from other threads. Instead, use the 
+    schedule_* functions to schedule work to be done in the thread
+    of the EventHandler.
+    """
     def __init__(self, c, selmgr, RouterClass):
         TorCtl.EventHandler.__init__(self)
         self.c = c
         nslist = c.get_network_status()
+        self.last_exit = None
+        self.new_nym = False
+        self.resolve_port = 0
+        self.num_circuits = 1
         self.RouterClass = RouterClass
         self.sorted_r = []
         self.routers = {}
         self.circuits = {}
         self.streams = {}
         self.read_routers(nslist)
-        self.selupdate = selmgr # other threads can fully examine this safely
-        self.selmgr = selmgr # other threads can read single values safely
+        self.selmgr = selmgr
         self.selmgr.reconfigure(self.sorted_r)
+        self.imm_jobs = Queue.Queue()
+        self.low_prio_jobs = Queue.Queue()
+        self.do_reconfigure = False
         plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(nslist))+" routers")
 
     def read_routers(self, nslist):
-        new_routers = map(self.RouterClass, self.c.read_routers(nslist))
-        for r in new_routers:
+        routers = self.c.read_routers(nslist)
+        new_routers = []
+        for r in routers:
             if r.idhex in self.routers:
                 if self.routers[r.idhex].nickname != r.nickname:
                     plog("NOTICE", "Router "+r.idhex+" changed names from "
                          +self.routers[r.idhex].nickname+" to "+r.nickname)
-                self.sorted_r.remove(self.routers[r.idhex])
-            self.routers[r.idhex] = r
+                # Must do IN-PLACE update to keep all the refs to this router
+                # valid and current (especially for stats)
+                self.routers[r.idhex].update_to(r)
+            else:
+                self.routers[r.idhex] = self.RouterClass(r)
+                new_routers.append(self.RouterClass(r))
         self.sorted_r.extend(new_routers)
         self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
 
     def attach_stream_any(self, stream, badcircs):
         # Newnym, and warn if not built plus pending
         unattached_streams = [stream]
-        if self.selmgr.new_nym:
-            self.selmgr.new_nym = False
+        if self.new_nym:
+            self.new_nym = False
             plog("DEBUG", "Obeying new nym")
             for key in self.circuits.keys():
                 if len(self.circuits[key].pending_streams):
@@ -606,20 +618,55 @@
                 u.pending_circ = circ
             circ.pending_streams.extend(unattached_streams)
             self.circuits[circ.cid] = circ
-        self.selmgr.last_exit = circ.exit
+        self.last_exit = circ.exit
 
-    #Relying on GIL for weak atomicity instead of locking. 
-    #http://effbot.org/pyfaq/can-t-we-get-rid-of-the-global-interpreter-lock.htm
-    #http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm
-    def update_selmgr(self, selmgr):
-        "This is the _ONLY_ method that can be called from another thread"
-        self.selupdate = selmgr
 
+    def schedule_immediate(self, job):
+        """
+        Schedules an immediate job to be run before the next event is
+        processed.
+        """
+        self.imm_jobs.put(job)
+
+    def schedule_low_prio(self, job):
+        """
+        Schedules a job to be run when a non-time critical event arrives.
+        """
+        self.low_prio_jobs.put(job)
+
+    def schedule_selmgr(self, job):
+        """
+        Schedules an immediate job to be run before the next event is
+        processed. Also notifies the selection manager that it needs
+        to update itself.
+        """
+        def notlambda(this):
+            job(this.selmgr)
+            this.do_reconfigure = True
+        self.schedule_immediate(notlambda)
+
     def heartbeat_event(self, event):
-        if id(self.selupdate) != id(self.selmgr):
-            self.selmgr = self.selupdate
+        while not self.imm_jobs.empty():
+            imm_job = self.imm_jobs.get_nowait()
+            imm_job(self)
+        
+        if self.do_reconfigure:
             self.selmgr.reconfigure(self.sorted_r)
-    
+            self.do_reconfigure = False
+        
+        # If event is stream:NEW*/DETACHED or circ BUILT/FAILED, 
+        # don't run low prio jobs.. No need to delay streams on them.
+        if isinstance(event, TorCtl.CircuitEvent):
+            if event.status in ("BUILT", "FAILED"): return
+        elif isinstance(event, TorCtl.StreamEvent):
+            if event.status in ("NEW", "NEWRESOLVE", "DETACHED"): return
+        
+        # Do the low prio jobs one at a time in case a 
+        # higher priority event is queued   
+        if not self.low_prio_jobs.empty():
+            delay_job = self.low_prio_jobs.get_nowait()
+            delay_job(self)
+
     def circ_status_event(self, c):
         output = [c.event_name, str(c.circ_id), c.status]
         if c.path: output.append(",".join(c.path))
@@ -651,7 +698,7 @@
             s.target_host = "255.255.255.255" # ignore DNS for exit policy check
         if s.status == "NEW" or s.status == "NEWRESOLVE":
             if s.status == "NEWRESOLVE" and not s.target_port:
-                s.target_port = self.selmgr.resolve_port
+                s.target_port = self.resolve_port
             self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, s.status)
 
             self.attach_stream_any(self.streams[s.strm_id],
@@ -660,7 +707,7 @@
             if s.strm_id not in self.streams:
                 plog("WARN", "Detached stream "+str(s.strm_id)+" not found")
                 self.streams[s.strm_id] = Stream(s.strm_id, s.target_host,
-                                            s.target_port)
+                                            s.target_port, "NEW")
             # FIXME Stats (differentiate Resolved streams also..)
             if not s.circ_id:
                 plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
@@ -751,6 +798,10 @@
     c.authenticate()
     nslist = c.get_network_status()
     sorted_rlist = c.read_routers(c.get_network_status())
+    
+    for r in sorted_rlist:
+        if r.will_exit_to("211.11.21.22", 465):
+            print r.nickname+" "+str(r.bw)
 
     do_unit(PercentileRestriction(0, 100, sorted_rlist), sorted_rlist,
                   lambda r: "")

Modified: torflow/trunk/TorCtl/TorCtl.py
===================================================================
--- torflow/trunk/TorCtl/TorCtl.py	2007-03-06 23:48:55 UTC (rev 9752)
+++ torflow/trunk/TorCtl/TorCtl.py	2007-03-07 05:37:10 UTC (rev 9753)
@@ -188,16 +188,24 @@
         self.nickname = name
         self.bw = bw
         self.exitpolicy = exitpolicy
-        self.guard = "Guard" in flags
-        self.badexit = "BadExit" in flags
-        self.valid = "Valid" in flags
-        self.fast = "Fast" in flags
         self.flags = flags
         self.down = down
         self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
         self.version = RouterVersion(version)
         self.os = os
 
+    def update_to(self, new):
+        if self.idhex != new.idhex:
+            plog("ERROR", "Update of router "+self.nickname+"changes idhex!")
+        self.idhex = new.idhex
+        self.nickname = new.nickname
+        self.bw = new.bw
+        self.exitpolicy = new.exitpolicy
+        self.flags = new.flags
+        self.ip = new.ip
+        self.version = new.version
+        self.os = new.os
+
     def will_exit_to(self, ip, port):
         for line in self.exitpolicy:
             ret = line.check(ip, port)
@@ -288,7 +296,7 @@
                 if self._handler is not None:
                     self._eventQueue.put(reply)
             else:
-                cb = self._queue.get() # XXX: lock?
+                cb = self._queue.get() # atomic..
                 cb(reply)
 
     def _err(self, (tp, ex, tb), fromEventLoop=0):
@@ -352,7 +360,7 @@
                 condition.release()
 
         # Sends a message to Tor...
-        self._sendLock.acquire()
+        self._sendLock.acquire() # ensure queue+sendmsg is atomic
         try:
             self._queue.put(cb)
             sendFn(msg) # _doSend(msg)

Modified: torflow/trunk/metatroller.py
===================================================================
--- torflow/trunk/metatroller.py	2007-03-06 23:48:55 UTC (rev 9752)
+++ torflow/trunk/metatroller.py	2007-03-07 05:37:10 UTC (rev 9753)
@@ -30,9 +30,10 @@
 meta_port = 9052
 max_detach = 3
 
-selmgr = PathSupport.SelectionManager(
-            resolve_port=0,
-            num_circuits=1,
+# Do NOT modify this object directly after it is handed to PathBuilder
+# Use PathBuilder.schedule_reconfigure instead.
+# (Modifying the arguments here is OK)
+__selmgr = PathSupport.SelectionManager(
             pathlen=3,
             order_exits=False,
             percent_fast=100,
@@ -49,29 +50,73 @@
 class StatsRouter(TorCtl.Router):
     def __init__(self, router): # Promotion constructor :)
         self.__dict__ = router.__dict__
+        self.reset()
+    
+    def reset(self):
+        self.circ_uncounted = 0
         self.circ_failed = 0
         self.circ_succeeded = 0
         self.circ_suspected = 0
-        self.circ_selections = 0 # above 3 should add to this
+        self.circ_selections = 0 # above 4 should add to this
         self.strm_failed = 0 # Only exits should have these
         self.strm_succeeded = 0
         self.strm_suspected = 0
+        self.strm_uncounted = 0
         self.strm_selections = 0 # above 3 should add to this
         self.reason_suspected = {}
         self.reason_failed = {}
-        self.became_active_at = 0
+        self.first_seen = time.time()
+        if "Running" in self.flags:
+            self.became_active_at = self.first_seen
+            self.hibernated_at = 0
+        else:
+            self.became_active_at = 0
+            self.hibernated_at = self.first_seen
+        self.total_hibernation_time = 0
         self.total_active_uptime = 0
         self.max_bw = 0
         self.min_bw = 0
         self.avg_bw = 0
+    
+    def sanity_check(self):
+        if self.circ_failed + self.circ_succeeded + self.circ_suspected \
+            + self.circ_uncounted != self.circ_selections:
+            plog("ERROR", self.nickname+" does not add up for circs")
+        if self.strm_failed + self.strm_succeeded + self.strm_suspected \
+            + self.strm_uncounted != self.strm_selections:
+            plog("ERROR", self.nickname+" does not add up for streams")
+        def check_reasons(reasons, expected, which, rtype):
+            count = 0
+            for rs in reasons.iterkeys():
+                if re.search(r"^"+which, rs): count += reasons[rs]
+            if count != expected:
+                plog("ERROR", "Mismatch "+which+" "+rtype+" for "+self.nickname)
+        check_reasons(self.reason_suspected,self.strm_suspected,"STREAM","susp")
+        check_reasons(self.reason_suspected,self.circ_suspected,"CIRC","susp")
+        check_reasons(self.reason_failed,self.strm_failed,"STREAM","failed")
+        check_reasons(self.reason_failed,self.circ_failed,"CIRC","failed")
+        now = time.time()
+        tot_hib_time = self.total_hibernation_time
+        tot_uptime = self.total_active_uptime
+        if self.hibernated_at: tot_hib_time += now - self.hibernated_at
+        if self.became_active_at: tot_uptime += now - self.became_active_at
+        if round(tot_hib_time+tot_uptime) != round(now-self.first_seen):
+            plog("ERROR", "Mismatch of uptimes for "+self.nickname)
 
 class StatsHandler(PathSupport.PathBuilder):
     def __init__(self, c, slmgr):
         PathBuilder.__init__(self, c, slmgr, StatsRouter)
-    
-    def heartbeat_event(self, event):
-        PathBuilder.heartbeat_event(self, event)
 
+    def write_stats(self, filename):
+        # FIXME: Sort this by different values.
+        plog("DEBUG", "Writing stats")
+        for r in self.sorted_r:
+            r.sanity_check()
+
+    def reset_stats(self):
+        for r in self.sorted_r:
+            r.reset()
+
     # TODO: Use stream bandwidth events to implement reputation system
     # from
     # http://www.cs.colorado.edu/department/publications/reports/docs/CU-CS-1025-07.pdf
@@ -84,22 +129,27 @@
             else: lreason = "NONE"
             if c.remote_reason: rreason = c.remote_reason
             else: rreason = "NONE"
-            reason = c.status+":"+lreason+":"+rreason
+            reason = c.event_name+":"+c.status+":"+lreason+":"+rreason
             if c.status == "FAILED":
                 # update selection count
                 for r in self.circuits[c.circ_id].path: r.circ_selections += 1
+                
+                if len(c.path)-1 < 0: start_f = 0
+                else: start_f = len(c.path)-1 
 
                 # Count failed
-                for r in self.circuits[c.circ_id].path[len(c.path)-1:len(c.path)+1]:
+                for r in self.circuits[c.circ_id].path[start_f:len(c.path)+1]:
                     r.circ_failed += 1
-                    if not reason in r.reason_failed: r.reason_failed[reason] = 1
+                    if not reason in r.reason_failed:
+                        r.reason_failed[reason] = 1
                     else: r.reason_failed[reason]+=1
-                
+
+                for r in self.circuits[c.circ_id].path[len(c.path)+1:]:
+                    r.circ_uncounted += 1
+
                 # Don't count if failed was set this round, don't set 
                 # suspected..
-                if len(c.path)-2 < 0: end_susp = 0
-                else: end_susp = len(c.path)-2
-                for r in self.circuits[c.circ_id].path[:end_susp]:
+                for r in self.circuits[c.circ_id].path[:start_f]:
                     r.circ_suspected += 1
                     if not reason in r.reason_suspected:
                         r.reason_suspected[reason] = 1
@@ -108,12 +158,12 @@
                 # Since PathBuilder deletes the circuit on a failed, 
                 # we only get this for a clean close
                 # Update circ_selections count
-                for r in self.circuits[c.circ_id].path: r.circ_selections += 1
+                for r in self.circuits[c.circ_id].path:
+                    r.circ_selections += 1
                 
-                if lreason in ("REQUESTED", "FINISHED", "ORIGIN"):
-                    r.circ_succeeded += 1
-                else:
-                    for r in self.circuits[c.circ_id].path:
+                    if lreason in ("REQUESTED", "FINISHED", "ORIGIN"):
+                        r.circ_succeeded += 1
+                    else:
                         if not reason in r.reason_suspected:
                             r.reason_suspected[reason] = 1
                         else: r.reason_suspected[reason] += 1
@@ -127,7 +177,7 @@
             else: lreason = "NONE"
             if s.remote_reason: rreason = s.remote_reason
             else: rreason = "NONE"
-            reason = s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
+            reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
             if s.status in ("DETACHED", "FAILED", "CLOSED", "SUCCEEDED") \
                     and not s.circ_id:
                 plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
@@ -150,16 +200,23 @@
                         if not reason in r.reason_suspected:
                             r.reason_suspected[reason] = 1
                         else: r.reason_suspected[reason]+=1
+                else:
+                    for r in self.circuits[s.circ_id].path[:-1]:
+                        r.strm_uncounted += 1
             elif s.status == "CLOSED":
                 # Always get both a closed and a failed.. 
                 #   - Check if the circuit exists still
                 if s.circ_id in self.circuits:
-                    if lreason in ("TIMEOUT", "INTERNAL", "TORPROTOCOL", "DESTROY"):
+                    if lreason in ("TIMEOUT", "INTERNAL", "TORPROTOCOL" "DESTROY"):
                         for r in self.circuits[s.circ_id].path[:-1]:
                             r.strm_suspected += 1
                             if not reason in r.reason_suspected:
                                 r.reason_suspected[reason] = 1
                             else: r.reason_suspected[reason]+=1
+                    else:
+                        for r in self.circuits[s.circ_id].path[:-1]:
+                            r.strm_uncounted += 1
+                        
                     r = self.circuits[s.circ_id].exit
                     if lreason == "DONE":
                         r.strm_succeeded += 1
@@ -176,16 +233,21 @@
 
     def ns_event(self, n):
         PathBuilder.ns_event(self, n)
+        now = time.time()
         for ns in n.nslist:
             if not ns.idhex in self.routers:
                 continue
+            r = self.routers[ns.idhex]
             if "Running" in ns.flags:
-                if not self.routers[ns.idhex].became_active_at:
-                    self.routers[ns.idhex].became_active_at = time.time()
+                if not r.became_active_at:
+                    r.became_active_at = now
+                    r.total_hibernation_time += now - r.hibernated_at
+                r.hibernated_at = 0
             else:
-                self.routers[ns.idhex].total_active_uptime += \
-                    (time.time() - self.routers[ns.idhex].became_active_at)
-                self.routers[ns.idhex].became_active_at = 0
+                if not r.hibernated_at:
+                    r.hibernated_at = now
+                    r.total_active_uptime += now - r.became_active_at
+                r.became_active_at = 0
                 
 
 def clear_dns_cache(c):
@@ -206,14 +268,17 @@
             continue
         (command, arg) = m.groups()
         if command == "GETLASTEXIT":
-            # local assignment avoids need for lock w/ GIL:
-            le = h.selmgr.last_exit
-            s.write("250 LASTEXIT=$"+le.idhex.upper()+" ("+le.nickname+") OK\r\n")
+            # local assignment avoids need for lock w/ GIL
+            # http://effbot.org/pyfaq/can-t-we-get-rid-of-the-global-interpreter-lock.htm
+            # http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm
+            le = h.last_exit
+            if le:
+                s.write("250 LASTEXIT=$"+le.idhex.upper()+" ("+le.nickname+") OK\r\n")
+            else:
+                s.write("250 LASTEXIT=0 (0) OK\r\n")
         elif command == "NEWEXIT" or command == "NEWNYM":
             clear_dns_cache(c)
-            newmgr = copy.copy(h.selupdate)
-            newmgr.new_nym = True
-            h.update_selmgr(newmgr)
+            h.new_nym = True # GIL hack
             plog("DEBUG", "Got new nym")
             s.write("250 NEWNYM OK\r\n")
         elif command == "GETDNSEXIT":
@@ -224,9 +289,8 @@
             try:
                 if arg:
                     order_exits = int(arg)
-                    newmgr = copy.copy(h.selupdate)
-                    newmgr.order_exits = order_exits
-                    h.update_selmgr(newmgr)
+                    def notlambda(sm): sm.order_exits=order_exits
+                    h.schedule_selmgr(notlambda)
                 s.write("250 ORDEREXITS="+str(order_exits)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -234,9 +298,8 @@
             try:
                 if arg:
                     use_all_exits = int(arg)
-                    newmgr = copy.copy(h.selupdate)
-                    newmgr.use_all_exits = use_all_exits
-                    h.update_selmgr(newmgr)
+                    def notlambda(sm): sm.use_all_exits=use_all_exits
+                    h.schedule_selmgr(notlambda)
                 s.write("250 USEALLEXITS="+str(use_all_exits)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -244,9 +307,8 @@
             try:
                 if arg:
                     num_circuits = int(arg)
-                    newmgr = copy.copy(h.selupdate)
-                    newmgr.num_circuits = num_circuits
-                    h.update_selmgr(newmgr)
+                    def notlambda(pb): pb.num_circuits=num_circuits
+                    h.schedule_immediate(notlambda)
                 s.write("250 PRECIRCUITS="+str(num_circuits)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -254,9 +316,8 @@
             try:
                 if arg:
                     resolve_port = int(arg)
-                    newmgr = copy.copy(h.selupdate)
-                    newmgr.resolve_port = resolve_port
-                    h.update_selmgr(newmgr)
+                    def notlambda(pb): pb.resolve_port=resolve_port
+                    h.schedule_immediate(notlambda)
                 s.write("250 RESOLVEPORT="+str(resolve_port)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -264,9 +325,8 @@
             try:
                 if arg:
                     percent_fast = int(arg)
-                    newmgr = copy.copy(h.selupdate)
-                    newmgr.percent_fast = percent_fast
-                    h.update_selmgr(newmgr)
+                    def notlambda(sm): sm.percent_fast=percent_fast
+                    h.schedule_selmgr(notlambda)
                 s.write("250 PERCENTFAST="+str(percent_fast)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -274,9 +334,8 @@
             try:
                 if arg:
                     percent_skip = int(arg)
-                    newmgr = copy.copy(h.selupdate)
-                    newmgr.percent_skip = percent_skip
-                    h.update_selmgr(newmgr)
+                    def notlambda(sm): sm.percent_skip=percent_skip
+                    h.schedule_selmgr(notlambda)
                 s.write("250 PERCENTSKIP="+str(percent_skip)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -284,9 +343,8 @@
             try:
                 if arg:
                     min_bw = int(arg)
-                    newmgr = copy.copy(h.selupdate)
-                    newmgr.min_bw = min_bw
-                    h.update_selmgr(newmgr)
+                    def notlambda(sm): sm.min_bw=min_bw
+                    h.schedule_selmgr(notlambda)
                 s.write("250 BWCUTOFF="+str(min_bw)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -296,19 +354,32 @@
             try:
                 if arg:
                     pathlen = int(arg)
-                    newmgr = copy.copy(h.selupdate)
-                    newmgr.pathlen = pathlen
-                    h.update_selmgr(newmgr)
+                    # Technically this doesn't need a full selmgr update.. But
+                    # the user shouldn't be changing it very often..
+                    def notlambda(sm): sm.pathlen=pathlen
+                    h.schedule_selmgr(notlambda)
                 s.write("250 PATHLEN="+str(pathlen)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
         elif command == "SETEXIT":
-            s.write("250 OK\r\n")
+            if arg:
+                # XXX: Hrmm.. if teh user is a dumbass, this will fail silently
+                def notlambda(sm): sm.exit_name=arg
+                h.schedule_selmgr(notlambda)
+                s.write("250 OK\r\n")
+            else:
+                s.write("510 Argument expected\r\n")
         elif command == "GUARDNODES":
             s.write("250 OK\r\n")
         elif command == "SAVESTATS":
+            if arg: filename = arg
+            else: filename = "./data/stats-"+time.strftime("20%y-%m-%d-%H:%M:%S")
+            def notlambda(this): this.write_stats(filename)
+            h.schedule_low_prio(notlambda)
             s.write("250 OK\r\n")
         elif command == "RESETSTATS":
+            def notlambda(this): this.reset_stats()
+            h.schedule_low_prio(notlambda)
             s.write("250 OK\r\n")
         elif command == "HELP":
             s.write("250 OK\r\n")
@@ -338,7 +409,7 @@
     c = PathSupport.Connection(s)
     c.debug(file("control.log", "w"))
     c.authenticate()
-    h = StatsHandler(c, selmgr)
+    h = StatsHandler(c, __selmgr)
     c.set_event_handler(h)
     c.set_events([TorCtl.EVENT_TYPE.STREAM,
                   TorCtl.EVENT_TYPE.NS,

Modified: torflow/trunk/soat.pl
===================================================================
--- torflow/trunk/soat.pl	2007-03-06 23:48:55 UTC (rev 9752)
+++ torflow/trunk/soat.pl	2007-03-07 05:37:10 UTC (rev 9753)
@@ -19,7 +19,7 @@
 #baseline md5s of html
 my $SOCKS_PROXY = "127.0.0.1:9060";
 
-my @TO_SCAN = ("ssl");
+my @TO_SCAN = ("ssl", "urls");
 my $ALLOW_NEW_SSL_IPS = 1;
 
 # doc and ppt may also be good ones to check.. They are frequently vulnerable
@@ -1065,6 +1065,9 @@
         plog "NOTICE", "Final URL List:\n " . join("\n ", @DOC_URLS) . "\n\n";
     }
     plog "INFO", "Beginning scan loop\n";
+    print $mcp "SAVESTATS\r\n";
+    $line = <$mcp>;
+    die "Error saving stats: $line" if (not $line =~ /^250/);
         
     while(1) {
         my $scan = $TO_SCAN[int(rand(@TO_SCAN))];
@@ -1083,6 +1086,9 @@
         } elsif($scan eq "ssh") {
             openssh_check_all($mcp, "./known_hosts");
         }
+        print $mcp "SAVESTATS\r\n";
+        $line = <$mcp>;
+        die "Error saving stats: $line" if (not $line =~ /^250/);
 
         write_failrates($mcp, $DOC_DIR . "/naive_fail_rates", 
                 "FAILRATES", \%mt_fail_counts, \%total_fail_counts,