[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r9587: Found a bug in TorCtl revolving around multi-line events get (torflow/trunk)



Author: mikeperry
Date: 2007-02-14 17:25:41 -0500 (Wed, 14 Feb 2007)
New Revision: 9587

Modified:
   torflow/trunk/TorCtl.py
   torflow/trunk/TorUtil.py
   torflow/trunk/metatroller.py
   torflow/trunk/soat.pl
Log:
Found a bug in TorCtl revolving around multi-line events getting attached to
the OK response from a command, which caused everything to hang. I believe
this bug was present in the original TorCtl, so I decided to cave to Nick's
request to make TorCtl events forward-compatible (using classes, not dicts). 

Maybe now I can guilt him into doing ADDRMAP SOURCE for me ;)



Modified: torflow/trunk/TorCtl.py
===================================================================
--- torflow/trunk/TorCtl.py	2007-02-14 16:46:55 UTC (rev 9586)
+++ torflow/trunk/TorCtl.py	2007-02-14 22:25:41 UTC (rev 9587)
@@ -51,14 +51,88 @@
     "Raised when Tor controller returns an error"
     pass
 
-class NotImplemented(TorCtlError):
-    "Raised when user doesn't implement EventHandler event"
-    pass
-
 class NetworkStatus:
     "Filled in during NS events"
-    pass
+    def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags):
+        self.nickname = nickname
+        self.idhash = idhash
+        self.orhash = orhash
+        self.ip = ip
+        self.orport = int(orport)
+        self.dirport = int(dirport)
+        self.flags = flags
+        self.idhex = (self.idhash + "=").decode("base64").encode("hex")
+        m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated)
+        self.updated = datetime.datetime(*map(int, m.groups()))
 
+class NetworkStatusEvent:
+    def __init__(self, event_name, nslist):
+        self.event_name = event_name
+        self.nslist = nslist # List of NetworkStatus objects
+
+class NewDescEvent:
+    def __init__(self, event_name, idlist):
+        self.event_name = event_name
+        self.idlist = idlist
+
+class CircuitEvent:
+    def __init__(self, event_name, circ_id, status, path, reason,
+                 remote_reason):
+        self.event_name = event_name
+        self.circ_id = circ_id
+        self.status = status
+        self.path = path
+        self.reason = reason
+        self.remote_reason = remote_reason
+
+class StreamEvent:
+    def __init__(self, event_name, strm_id, status, circ_id, target_host,
+                 target_port, reason, remote_reason):
+        self.event_name = event_name
+        self.strm_id = strm_id
+        self.status = status
+        self.circ_id = circ_id
+        self.target_host = target_host
+        self.target_port = int(target_port)
+        self.reason = reason
+        self.remote_reason = remote_reason
+
+class ORConnEvent:
+    def __init__(self, event_name, status, router_name, age, read_bytes,
+                 wrote_bytes, reason, ncircs):
+        self.event_name = event_name
+        self.status = status
+        self.router_name = router_name
+        self.age = age
+        self.read_bytes = read_bytes
+        self.wrote_bytes = wrote_bytes
+        self.reason = reason
+        self.ncircs = ncircs
+
+class LogEvent:
+    def __init__(self, level, msg):
+        self.event_name = self.level = level
+        self.msg = msg
+
+class AddrMapEvent:
+    def __init__(self, event_name, from_addr, to_addr, when, by_exit):
+        self.event_name = event_name
+        self.from_addr = from_addr
+        self.to_addr = to_addr
+        self.when = when
+        self.by_exit = by_exit # XOXOXOX <3 ;) @ nickm
+
+class BWEvent:
+    def __init__(self, event_name, read, written):
+        self.event_name = event_name
+        self.read = read
+        self.written = written
+
+class UnknownEvent:
+    def __init__(self, event_name, event_string):
+        self.event_name = event_name
+        self.event_string = event_string
+
 class NodeSelector:
     "Interface for node selection policies"
     def __init__(self, target_ip, target_port):
@@ -195,7 +269,6 @@
            as events or as responses to other commands.
         """
         while 1:
-            ex = None
             try:
                 isEvent, reply = self._read_reply()
             except:
@@ -256,7 +329,6 @@
         # Here's where the result goes...
         result = []
 
-        plog("DEBUG", "Sending: "+msg)
         if self._closedEx is not None:
             raise self._closedEx
         elif self._closed:
@@ -274,7 +346,7 @@
         self._sendLock.acquire()
         try:
             self._queue.put(cb)
-            sendFn(msg)
+            sendFn(msg) # _doSend(msg)
         finally:
             self._sendLock.release()
 
@@ -328,12 +400,17 @@
                 more = []
                 while 1:
                     line = self._s.readline()
-                    if self._debugFile and tp != "+":
-                        self._debugFile.write("    %s" % line)
+                    if self._debugFile:
+                        self._debugFile.write("+++ %s" % line)
                     if line in (".\r\n", ".\n"):
                         break
                     more.append(line)
                 lines.append((code, s, unescape_dots("".join(more))))
+                isEvent = (lines and lines[0][0][0] == '6')
+                if isEvent: # Need "250 OK" if it's not an event. Otherwise, end
+                    return (isEvent, lines)
+
+        # XXX: Notreached
         isEvent = (lines and lines[0][0][0] == '6')
         return (isEvent, lines)
 
@@ -429,8 +506,8 @@
         exitpolicy = []
         dead = not ("Running" in ns.flags)
         bw_observed = 0
-        if router != ns.name:
-            plog("NOTICE", "Got different names " + ns.name + " vs " +
+        if router != ns.nickname:
+            plog("NOTICE", "Got different names " + ns.nickname + " vs " +
                          router + " for " + ns.idhex)
         for line in desc:
             ac = re.search(r"^accept (\S+):([^-]+)(?:-(\d+))?", line)
@@ -445,8 +522,8 @@
             elif bw:
                 bw_observed = int(bw.group(1))
         if not bw_observed and not dead and ("Valid" in ns.flags):
-            plog("NOTICE", "No bandwidth for live router " + ns.name)
-        return Router(ns.idhex, ns.name, bw_observed, exitpolicy, dead,
+            plog("NOTICE", "No bandwidth for live router " + ns.nickname)
+        return Router(ns.idhex, ns.nickname, bw_observed, exitpolicy, dead,
                 ("Guard" in ns.flags), ("Valid" in ns.flags),
                 ("BadExit" in ns.flags), ("Fast" in ns.flags))
 
@@ -576,17 +653,11 @@
     nsgroups.pop(0)
     nslist = []
     for nsline in nsgroups:
-        ns = NetworkStatus()
-        m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline)
-        ns.name,ns.idhash,ns.orhash,updated,ns.ip,ns.orport,ns.dirport = m.groups()
-        ns.idhex = (ns.idhash + "=").decode("base64").encode("hex")
-        ns.orport,ns.dirport = map(int, (ns.orport,ns.dirport))
-        m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated)
-        ns.updated = datetime.datetime(*map(int, m.groups()))
         m = re.search(r"^s((?:\s\S*)+)", nsline, re.M)
         flags = m.groups()
-        ns.flags = flags[0].strip().split(" ")
-        nslist.append(ns)
+        flags = flags[0].strip().split(" ")
+        m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline)
+        nslist.append(NetworkStatus(*(m.groups() + (flags,))))
     return nslist
 
 class EventHandler:
@@ -611,8 +682,8 @@
     def handle1(self, lines):
         """Dispatcher: called from Connection when an event is received."""
         for code, msg, data in lines:
-            evtype, args = self.decode1(msg, data)
-            self._map1.get(evtype, self.unknown_event)(evtype, *args)
+            event = self.decode1(msg, data)
+            self._map1.get(event.event_name, self.unknown_event)(event)
 
     def decode1(self, body, data):
         """Unpack an event message into a type/arguments-tuple tuple."""
@@ -638,7 +709,8 @@
                 path = []
             if reason: reason = reason[8:]
             if remote: remote = remote[15:]
-            args = ident, status, path, reason, remote
+            event = CircuitEvent(evtype, ident, status, path, reason,
+                                       remote)
         elif evtype == "STREAM":
             m = re.match(r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+):(\d+)(\s\S+)?(\s\S+)?", body)
             if not m:
@@ -647,7 +719,8 @@
             ident,circ = map(int, (ident,circ))
             if reason: reason = reason[8:]
             if remote: remote = remote[15:]
-            args = ident, status, circ, target_host, int(target_port), reason, remote
+            event = StreamEvent(evtype, ident, status, circ, target_host,
+                                      int(target_port), reason, remote)
         elif evtype == "ORCONN":
             m = re.match(r"(\S+)\s+(\S+)(\s\S+)?(\s\S+)?(\s\S+)?(\s\S+)?(\s\S+)?", body)
             if not m:
@@ -655,14 +728,13 @@
             target, status, age, read, wrote, reason, ncircs = m.groups()
 
             # XXX: Special hacks for bandwidth stat research
-            if status == "READ": 
+            if status == "READ":
                 read = " READ=" + str(age)
                 age = 0
             if status == "WRITE":
                 wrote = " WRITTEN=" + str(age)
                 age = 0
 
-
             if reason and not ncircs:
                 if "NCIRCS=" in reason:
                     ncircs = reason
@@ -676,17 +748,18 @@
             else: read = 0
             if wrote: wrote = int(wrote[9:])
             else: wrote = 0
-            args = status, target, age, read, wrote, reason, ncircs
+            event = ORConnEvent(evtype, status, target, age, read, wrote,
+                                reason, ncircs)
         elif evtype == "BW":
             m = re.match(r"(\d+)\s+(\d+)", body)
             if not m:
                 raise ProtocolError("BANDWIDTH event misformatted.")
             read, written = map(long, m.groups())
-            args = read, written
+            event = BWEvent(evtype, read, written)
         elif evtype in ("DEBUG", "INFO", "NOTICE", "WARN", "ERR"):
-            args = evtype, body
+            event = LogEvent(evtype, body)
         elif evtype == "NEWDESC":
-            args = (body.split(" "),)
+            event = NewDescEvent(evtype, body.split(" "))
         elif evtype == "ADDRMAP":
             m = re.match(r'(\S+)\s+(\S+)\s+(\"[^"]+\"|\w+)', body)
             if not m:
@@ -697,21 +770,21 @@
             else:
                 when = time.localtime(
                     time.strptime(when[1:-1], "%Y-%m-%d %H:%M:%S"))
-            args = fromaddr, toaddr, when
+            event = AddrMapEvent(evtype, fromaddr, toaddr, when, "Unknown")
         elif evtype == "NS":
-            args = (parse_ns_body(data),)
+            event = NetworkStatusEvent(evtype, parse_ns_body(data))
         else:
-            args = (body,)
+            event = UnknownEvent(evtype, body)
 
-        return evtype, args
+        return event
 
-    def unknown_event(self, eventtype, evtype, *args):
+    def unknown_event(self, event):
         """Called when we get an event type we don't recognize.  This
            is almost alwyas an error.
         """
         raise NotImplemented()
 
-    def circ_status(self, eventtype, circID, status, path, reason, remote):
+    def circ_status(self, event):
         """Called when a circuit status changes if listening to CIRCSTATUS
            events.  'status' is a member of CIRC_STATUS; circID is a numeric
            circuit ID, and 'path' is the circuit's path so far as a list of
@@ -719,42 +792,41 @@
         """
         raise NotImplemented()
 
-    def stream_status(self, eventtype, streamID, status, circID, target_host, target_port, reason, remote):
+    def stream_status(self, event):
         """Called when a stream status changes if listening to STREAMSTATUS
            events.  'status' is a member of STREAM_STATUS; streamID is a
            numeric stream ID, and 'target' is the destination of the stream.
         """
         raise NotImplemented()
 
-    def or_conn_status(self, eventtype, status, target, age, read, wrote, 
-                       reason, ncircs):
+    def or_conn_status(self, event):
         """Called when an OR connection's status changes if listening to
            ORCONNSTATUS events. 'status' is a member of OR_CONN_STATUS; target
            is the OR in question.
         """
         raise NotImplemented()
 
-    def bandwidth(self, eventtype, read, written):
+    def bandwidth(self, event):
         """Called once a second if listening to BANDWIDTH events.  'read' is
            the number of bytes read; 'written' is the number of bytes written.
         """
         raise NotImplemented()
 
-    def new_desc(self, eventtype, identities):
+    def new_desc(self, event):
         """Called when Tor learns a new server descriptor if listenting to
            NEWDESC events.
         """
         raise NotImplemented()
 
-    def msg(self, eventtype, severity, message):
+    def msg(self, event):
         """Called when a log message of a given severity arrives if listening
            to INFO_MSG, NOTICE_MSG, WARN_MSG, or ERR_MSG events."""
         raise NotImplemented()
 
-    def ns(self, eventtype, nslist):
+    def ns(self, event):
         raise NotImplemented()
 
-    def address_mapped(self, eventtype, fromAddr, toAddr, expiry=None):
+    def address_mapped(self, event):
         """Called when Tor adds a mapping for an address if listening
            to ADDRESSMAPPED events.
         """
@@ -763,43 +835,56 @@
 
 class DebugEventHandler(EventHandler):
     """Trivial debug event handler: reassembles all parsed events to stdout."""
-    def circ_status(self, eventtype, circID, status, path, reason, remote):
-        output = [eventtype, str(circID), status]
-        if path: output.append(",".join(path))
-        if reason: output.append("REASON=" + reason)
-        if remote: output.append("REMOTE_REASON=" + remote)
+    def circ_status(self, circ_event): # CircuitEvent()
+        output = [circ_event.event_name, str(circ_event.circ_id),
+                  circ_event.status]
+        if circ_event.path:
+            output.append(",".join(circ_event.path))
+        if circ_event.reason:
+            output.append("REASON=" + circ_event.reason)
+        if circ_event.remote_reason:
+            output.append("REMOTE_REASON=" + circ_event.remote_reason)
         print " ".join(output)
 
-    def stream_status(self, eventtype, streamID, status, circID, target_host, target_port, reason, remote):
-        output = [eventtype, str(streamID), status, str(circID), target_host,
-str(target_port)]
-        if reason: output.append("REASON=" + reason)
-        if remote: output.append("REMOTE_REASON=" + remote)
+    def stream_status(self, strm_event):
+        output = [strm_event.event_name, str(strm_event.strm_id),
+                  strm_event.status, str(strm_event.circ_id),
+                  strm_event.target_host, str(strm_event.target_port)]
+        if strm_event.reason:
+            output.append("REASON=" + strm_event.reason)
+        if strm_event.remote_reason:
+            output.append("REMOTE_REASON=" + strm_event.remote_reason)
         print " ".join(output)
 
-    def ns(self, eventtype, nslist):
-        for ns in nslist:
-            print " ".join((eventtype, ns.name, ns.idhash,
+    def ns(self, ns_event):
+        for ns in ns_event.nslist:
+            print " ".join((ns_event.event_name, ns.nickname, ns.idhash,
               ns.updated.isoformat(), ns.ip, str(ns.orport),
               str(ns.dirport), " ".join(ns.flags)))
 
-    def new_desc(self, eventtype, identities):
-        print " ".join((eventtype, " ".join(identities)))
+    def new_desc(self, newdesc_event):
+        print " ".join((newdesc_event.event_name, " ".join(newdesc_event.idlist)))
    
-    def or_conn_status(self, eventtype, status, target, age, read, wrote,
-                       reason, ncircs):
-        if age: age = "AGE="+str(age)
+    def or_conn_status(self, orconn_event):
+        if orconn_event.age: age = "AGE="+str(orconn_event.age)
         else: age = ""
-        if read: read = "READ="+str(read)
+        if orconn_event.read_bytes: read = "READ="+str(orconn_event.read_bytes)
         else: read = ""
-        if wrote: wrote = "WRITTEN="+str(wrote)
+        if orconn_event.wrote_bytes: wrote = "WRITTEN="+str(orconn_event.wrote_bytes)
         else: wrote = ""
-        if reason: reason = "REASON="+reason
+        if orconn_event.reason: reason = "REASON="+orconn_event.reason
         else: reason = ""
-        if ncircs: ncircs = "NCIRCS="+str(ncircs)
+        if orconn_event.ncircs: ncircs = "NCIRCS="+str(orconn_event.ncircs)
         else: ncircs = ""
-        print " ".join((eventtype, target, status, age, read, wrote, reason, ncircs))
+        print " ".join((orconn_event.event_name, orconn_event.router_name,
+                        orconn_event.status, age, read, wrote, reason, ncircs))
 
+    def msg(self, log_event):
+        print log_event.event_name+" "+log_event.msg
+    
+    def bandwidth(self, bw_event):
+        print bw_event.event_name+" "+str(bw_event.read)+" "+str(bw_event.written)
+
 def parseHostAndPort(h):
     """Given a string of the form 'address:port' or 'address' or
        'port' or '', return a two-tuple of (address, port)
@@ -861,18 +946,17 @@
 #    c.set_events([EVENT_TYPE.ORCONN], True)
     c.set_events([EVENT_TYPE.STREAM, EVENT_TYPE.CIRC,
                   EVENT_TYPE.NS, EVENT_TYPE.NEWDESC,
-                  EVENT_TYPE.ORCONN], True)
+                  EVENT_TYPE.ORCONN, EVENT_TYPE.BW], True)
 
     th.join()
     return
 
 if __name__ == '__main__':
-    import socket
     if len(sys.argv) > 2:
         print "Syntax: TorControl.py torhost:torport"
         sys.exit(0)
     else:
-        sys.argv.append("localhost:9061")
+        sys.argv.append("localhost:9051")
     sh,sp = parseHostAndPort(sys.argv[1])
     run_example(sh,sp)
 

Modified: torflow/trunk/TorUtil.py
===================================================================
--- torflow/trunk/TorUtil.py	2007-02-14 16:46:55 UTC (rev 9586)
+++ torflow/trunk/TorUtil.py	2007-02-14 22:25:41 UTC (rev 9587)
@@ -9,15 +9,9 @@
 
 import os
 import re
-import struct
 import sys
-import threading
-import Queue
-import datetime
-import traceback
 import socket
 import binascii
-import types
 import sha
 
 __all__ = ["Enum", "Enum2", "quote", "escape_dots", "unescape_dots",
@@ -195,4 +189,5 @@
 def plog(level, msg): # XXX: Timestamps
     if(loglevels[level] >= loglevels[loglevel]):
         print level + ": " + msg
+        sys.stdout.flush()
 

Modified: torflow/trunk/metatroller.py
===================================================================
--- torflow/trunk/metatroller.py	2007-02-14 16:46:55 UTC (rev 9586)
+++ torflow/trunk/metatroller.py	2007-02-14 22:25:41 UTC (rev 9587)
@@ -28,7 +28,6 @@
 circuits = {} # map from ID # to circuit object
 streams = {} # map from stream id to circuit
 
-
 version = "0.1.0-dev"
 
 # TODO: Move these to config file
@@ -91,7 +90,7 @@
 #  - Restrictors (puts self.r_is_ok() into list):
 #    - Subnet16
 #    - AvoidWastingExits
-#    - VersionRange (Less than, greater than, in-range)
+#    - VersionRange (Less than, greater than, in-range, not-equal)
 #    - OSSelector (ex Yes: Linux, *BSD; No: Windows, Solaris)
 #    - OceanPhobicRestrictor (avoids Pacific Ocean or two atlantic crossings)
 #      or ContinentRestrictor (avoids doing more than N continent crossings)
@@ -183,8 +182,8 @@
     bad_key = 0
     for ns in nslist:
         try:
-            key_to_name[ns.idhex] = ns.name
-            name_to_key[ns.name] = ns.idhex
+            key_to_name[ns.idhex] = ns.nickname
+            name_to_key[ns.nickname] = ns.idhex
             r = MetaRouter(c.get_router(ns))
             if ns.idhex in routers:
                 if routers[ns.idhex].name != r.name:
@@ -196,7 +195,7 @@
         except TorCtl.ErrorReply:
             bad_key += 1
             if "Running" in ns.flags:
-                plog("NOTICE", "Running router "+ns.name+"="
+                plog("NOTICE", "Running router "+ns.nickname+"="
                      +ns.idhex+" has no descriptor")
             pass
         except:
@@ -241,112 +240,140 @@
         for circ in circuits.itervalues():
             if circ.built and circ.cid not in badcircs:
                 if circ.exit.will_exit_to(stream.host, stream.port):
-                    self.c.attach_stream(stream.sid, circ.cid)
-                    stream.pending_circ = circ # Only one stream possible here
-                    circ.pending_streams.append(stream)
+                    try:
+                        self.c.attach_stream(stream.sid, circ.cid)
+                        stream.pending_circ = circ # Only one possible here
+                        circ.pending_streams.append(stream)
+                    except TorCtl.ErrorReply, e:
+                        # No need to retry here. We should get the failed
+                        # event for either the circ or stream next
+                        plog("NOTICE", "Error attaching stream: "+str(e.args))
+                        return
                     break
         else:
-            circ = MetaCircuit(self.c.build_circuit(pathlen,
-                                 UniformSelector(stream.host, stream.port)))
+            circ = None
+            while circ == None:
+                try:
+                    circ = MetaCircuit(self.c.build_circuit(pathlen,
+                                    UniformSelector(stream.host, stream.port)))
+                except TorCtl.ErrorReply, e:
+                    # FIXME: How come some routers are nonexistent? Shouldn't
+                    # we have gotten an NS event to notify us they
+                    # disappeared?
+                    plog("NOTICE", "Error building circ: "+str(e.args))
             for u in unattached_streams:
-                plog("DEBUG", "Attach pending build: "+str(u.sid))
+                plog("DEBUG",
+                     "Attaching "+str(u.sid)+" pending build of "+str(circ.cid))
                 u.pending_circ = circ
             circ.pending_streams.extend(unattached_streams)
             circuits[circ.cid] = circ
         global last_exit # Last attempted exit
         last_exit = circ.exit
 
-    def circ_status(self, eventtype, circID, status, path, reason, remote):
-        output = [eventtype, str(circID), status]
-        if path: output.append(",".join(path))
-        if reason: output.append("REASON=" + reason)
-        if remote: output.append("REMOTE_REASON=" + remote)
+    def circ_status(self, c):
+        output = [c.event_name, str(c.circ_id), c.status]
+        if c.path: output.append(",".join(c.path))
+        if c.reason: output.append("REASON=" + c.reason)
+        if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
         plog("DEBUG", " ".join(output))
         # Circuits we don't control get built by Tor
-        if circID not in circuits:
-            plog("DEBUG", "Ignoring circ " + str(circID))
+        if c.circ_id not in circuits:
+            plog("DEBUG", "Ignoring circ " + str(c.circ_id))
             return
-        if status == "FAILED" or status == "CLOSED":
-            circ = circuits[circID]
-            del circuits[circID]
+        if c.status == "FAILED" or c.status == "CLOSED":
+            circ = circuits[c.circ_id]
+            del circuits[c.circ_id]
             for stream in circ.pending_streams:
                 plog("DEBUG", "Finding new circ for " + str(stream.sid))
                 self.attach_stream_any(stream, stream.detached_from)
-        elif status == "BUILT":
-            circuits[circID].built = True
-            for stream in circuits[circID].pending_streams:
-                self.c.attach_stream(stream.sid, circID)
-                circuits[circID].used_cnt += 1
+        elif c.status == "BUILT":
+            circuits[c.circ_id].built = True
+            for stream in circuits[c.circ_id].pending_streams:
+                self.c.attach_stream(stream.sid, c.circ_id)
+                circuits[c.circ_id].used_cnt += 1
 
-    def stream_status(self, eventtype, streamID, status, circID, target_host, target_port, reason, remote):
-        output = [eventtype, str(streamID), status, str(circID), target_host,
-                  str(target_port)]
-        if reason: output.append("REASON=" + reason)
-        if remote: output.append("REMOTE_REASON=" + remote)
+    def stream_status(self, s):
+        output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id),
+                  s.target_host, str(s.target_port)]
+        if s.reason: output.append("REASON=" + s.reason)
+        if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
         plog("DEBUG", " ".join(output))
-        if not re.match(r"\d+.\d+.\d+.\d+", target_host):
-            target_host = "255.255.255.255" # ignore DNS for exit policy check
-        if status == "NEW" or status == "NEWRESOLVE":
+        if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
+            s.target_host = "255.255.255.255" # ignore DNS for exit policy check
+        if s.status == "NEW" or s.status == "NEWRESOLVE":
             global circuits
-            streams[streamID] = Stream(streamID, target_host, target_port)
+            streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port)
 
-            self.attach_stream_any(streams[streamID],
-                                   streams[streamID].detached_from)
-        elif status == "DETACHED":
+            self.attach_stream_any(streams[s.strm_id],
+                                   streams[s.strm_id].detached_from)
+        elif s.status == "DETACHED":
             global circuits
-            if streamID not in streams:
-                plog("WARN", "Detached stream "+str(streamID)+" not found")
-                streams[streamID] = Stream(streamID, target_host, target_port)
+            if s.strm_id not in streams:
+                plog("WARN", "Detached stream "+str(s.strm_id)+" not found")
+                streams[s.strm_id] = Stream(s.strm_id, s.target_host,
+                                            s.target_port)
             # FIXME Stats (differentiate Resolved streams also..)
-            if not circID:
-                plog("WARN", "Stream "+str(streamID)+" detached from no circuit!")
+            if not s.circ_id:
+                plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
             else:
-                streams[streamID].detached_from.append(circID)
+                streams[s.strm_id].detached_from.append(s.circ_id)
 
             
-            if streams[streamID] in streams[streamID].pending_circ.pending_streams:
-                streams[streamID].pending_circ.pending_streams.remove(streams[streamID])
-            streams[streamID].pending_circ = None
-            self.attach_stream_any(streams[streamID],
-                                   streams[streamID].detached_from)
-        elif status == "SUCCEEDED":
-            if streamID not in streams:
-                plog("NOTICE", "Succeeded stream "+str(streamID)+" not found")
+            if streams[s.strm_id] in streams[s.strm_id].pending_circ.pending_streams:
+                streams[s.strm_id].pending_circ.pending_streams.remove(streams[s.strm_id])
+            streams[s.strm_id].pending_circ = None
+            self.attach_stream_any(streams[s.strm_id],
+                                   streams[s.strm_id].detached_from)
+        elif s.status == "SUCCEEDED":
+            if s.strm_id not in streams:
+                plog("NOTICE", "Succeeded stream "+str(s.strm_id)+" not found")
                 return
-            streams[streamID].circ = streams[streamID].pending_circ
-            streams[streamID].circ.pending_streams.remove(streams[streamID])
-            streams[streamID].pending_circ = None
-            streams[streamID].circ.used_cnt += 1
-        elif status == "FAILED" or status == "CLOSED":
+            streams[s.strm_id].circ = streams[s.strm_id].pending_circ
+            streams[s.strm_id].circ.pending_streams.remove(streams[s.strm_id])
+            streams[s.strm_id].pending_circ = None
+            streams[s.strm_id].circ.used_cnt += 1
+        elif s.status == "FAILED" or s.status == "CLOSED":
             # FIXME stats
-            if status == "FAILED": # We get failed and closed for each stream
+            if s.strm_id not in streams:
+                plog("NOTICE", "Failed stream "+str(s.strm_id)+" not found")
                 return
-            if streamID not in streams:
-                plog("NOTICE", "Failed stream "+str(streamID)+" not found")
+
+            if not s.circ_id:
+                plog("WARN", "Stream "+str(s.strm_id)+" failed from no circuit!")
+
+            # We get failed and closed for each stream. OK to return 
+            # and let the closed do the cleanup
+            # (FIXME: be careful about double stats)
+            if s.status == "FAILED":
+                # Avoid busted circuits that will not resolve or carry
+                # traffic. FIXME: Failed count before doing this?
+                if s.circ_id in circuits: del circuits[s.circ_id]
+                else: plog("WARN","Failed stream on unknown circ "+str(s.circ_id))
                 return
-            if streams[streamID].pending_circ:
-                streams[streamID].pending_circ.pending_streams.remove(streams[streamID])
-            del streams[streamID]
-        elif status == "REMAP":
-            if streamID not in streams:
-                plog("WARN", "Remap id "+str(streamID)+" not found")
+
+            if streams[s.strm_id].pending_circ:
+                streams[s.strm_id].pending_circ.pending_streams.remove(streams[s.strm_id])
+            del streams[s.strm_id]
+        elif s.status == "REMAP":
+            if s.strm_id not in streams:
+                plog("WARN", "Remap id "+str(s.strm_id)+" not found")
             else:
-                if not re.match(r"\d+.\d+.\d+.\d+", target_host):
-                    target_host = "255.255.255.255"
-                    plog("NOTICE", "Non-IP remap for "+str(streamID)+" to "
-                                   + target_host)
-                streams[streamID].host = target_host
-                streams[streamID].port = target_port
+                if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
+                    s.target_host = "255.255.255.255"
+                    plog("NOTICE", "Non-IP remap for "+str(s.strm_id)+" to "
+                                   + s.target_host)
+                streams[s.strm_id].host = s.target_host
+                streams[s.strm_id].port = s.target_port
 
-    def ns(self, eventtype, nslist):
-        read_routers(self.c, nslist)
-        plog("DEBUG", "Read " + str(len(nslist)) + eventtype + " => " 
+    def ns(self, n):
+        read_routers(self.c, n.nslist)
+        plog("DEBUG", "Read " + str(len(n.nslist))+" NS => " 
              + str(len(sorted_r)) + " routers")
     
-    def new_desc(self, eventtype, identities):
-        for i in identities: # Is this too slow?
+    def new_desc(self, d):
+        for i in d.idlist: # Is this too slow?
             read_routers(self.c, self.c.get_network_status("id/"+i))
-        plog("DEBUG", "Read " + str(len(identities)) + eventtype + " => " 
+        plog("DEBUG", "Read " + str(len(d.idlist))+" Desc => " 
              + str(len(sorted_r)) + " routers")
         
 
@@ -470,6 +497,7 @@
     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     s.connect((control_host,control_port))
     c = TorCtl.get_connection(s)
+    c.debug(file("control.log", "w"))
     c.set_event_handler(SnakeHandler(c))
     c.launch_thread()
     c.authenticate()

Modified: torflow/trunk/soat.pl
===================================================================
--- torflow/trunk/soat.pl	2007-02-14 16:46:55 UTC (rev 9586)
+++ torflow/trunk/soat.pl	2007-02-14 22:25:41 UTC (rev 9587)
@@ -19,7 +19,7 @@
 #baseline md5s of html
 my $SOCKS_PROXY = "127.0.0.1:9050";
 
-my @TO_SCAN = ("ssl", "urls");
+my @TO_SCAN = ("ssl");
 my $ALLOW_NEW_SSL_IPS = 1;
 
 # doc and ppt may also be good ones to check.. They are frequently vulnerable