[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r15021: Ignore streams with directory-related PURPOSEs and fix some (in torflow/branches/gsoc2008: . TorCtl)



Author: mikeperry
Date: 2008-06-07 23:08:50 -0400 (Sat, 07 Jun 2008)
New Revision: 15021

Modified:
   torflow/branches/gsoc2008/TorCtl/PathSupport.py
   torflow/branches/gsoc2008/TorCtl/TorCtl.py
   torflow/branches/gsoc2008/metatroller.py
Log:

Ignore streams with directory-related PURPOSEs and fix some
bugs with streams on circuits that existed before we started.



Modified: torflow/branches/gsoc2008/TorCtl/PathSupport.py
===================================================================
--- torflow/branches/gsoc2008/TorCtl/PathSupport.py	2008-06-08 02:53:32 UTC (rev 15020)
+++ torflow/branches/gsoc2008/TorCtl/PathSupport.py	2008-06-08 03:08:50 UTC (rev 15021)
@@ -851,6 +851,7 @@
     self.bytes_read = 0
     self.bytes_written = 0
     self.failed = False
+    self.ignored = False # Set if PURPOSE=DIR_*
     self.failed_reason = None # Cheating a little.. Only used by StatsHandler
 
   def lifespan(self, now):
@@ -1066,15 +1067,27 @@
           s.target_host, str(s.target_port)]
     if s.reason: output.append("REASON=" + s.reason)
     if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
+    if s.purpose: output.append("PURPOSE=" + s.purpose)
     plog("DEBUG", " ".join(output))
     if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
       s.target_host = "255.255.255.255" # ignore DNS for exit policy check
+
+    # Hack to ignore Tor-handled streams (Currently only directory streams)
+    if s.strm_id in self.streams and self.streams[s.strm_id].ignored:
+      plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
+      return
+
     if s.status == "NEW" or s.status == "NEWRESOLVE":
       if s.status == "NEWRESOLVE" and not s.target_port:
         s.target_port = self.resolve_port
       self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, s.status)
-
-      self.attach_stream_any(self.streams[s.strm_id],
+      # Remember Tor-handled streams (Currently only directory streams)
+      if s.purpose and s.purpose.find("DIR_") == 0:
+        self.streams[s.strm_id].ignored = True
+        plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
+        return
+      else:
+        self.attach_stream_any(self.streams[s.strm_id],
                    self.streams[s.strm_id].detached_from)
     elif s.status == "DETACHED":
       if s.strm_id not in self.streams:
@@ -1102,7 +1115,11 @@
         plog("WARN", "Mismatch of pending: "
           +str(self.streams[s.strm_id].pending_circ.circ_id)+" vs "
           +str(s.circ_id))
-        self.streams[s.strm_id].circ = self.circuits[s.circ_id]
+        # This can happen if the circuit existed before we started up
+        if s.circ_id in self.circuits:
+          self.streams[s.strm_id].circ = self.circuits[s.circ_id]
+        else:
+          plog("NOTICE", "Stream "+str(s.strm_id)+" has unknown circuit: "+str(s.circ_id))
       else:
         self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
       self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
@@ -1356,20 +1373,32 @@
     output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id), s.target_host+':'+str(s.target_port)]
     if s.reason: output.append("REASON=" + s.reason)
     if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
+    if s.purpose: output.append("PURPOSE=" + s.purpose)
     plog("DEBUG", " ".join(output))
      
     # If target_host is not an IP-address
     if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
       s.target_host = "255.255.255.255" # ignore DNS for exit policy check
-    
+   
+    # Hack to ignore Tor-handled streams (Currently only directory streams)
+    if s.strm_id in self.streams and self.streams[s.strm_id].ignored:
+      plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
+      return
+ 
     # NEW or NEWRESOLVE
     if s.status == "NEW" or s.status == "NEWRESOLVE":
       if s.status == "NEWRESOLVE" and not s.target_port:
         s.target_port = self.resolve_port      
       # Set up the new stream
       stream = Stream(s.strm_id, s.target_host, s.target_port, s.status)
-      self.streams[s.strm_id] = stream        
-      self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
+
+      self.streams[s.strm_id] = stream
+      if s.purpose and s.purpose.find("DIR_") == 0:
+        stream.ignored = True
+        plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
+        return
+      else:
+        self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
     
     # DETACHED
     elif s.status == "DETACHED":

Modified: torflow/branches/gsoc2008/TorCtl/TorCtl.py
===================================================================
--- torflow/branches/gsoc2008/TorCtl/TorCtl.py	2008-06-08 02:53:32 UTC (rev 15020)
+++ torflow/branches/gsoc2008/TorCtl/TorCtl.py	2008-06-08 03:08:50 UTC (rev 15021)
@@ -114,7 +114,7 @@
 
 class StreamEvent:
   def __init__(self, event_name, strm_id, status, circ_id, target_host,
-         target_port, reason, remote_reason, source, source_addr):
+         target_port, reason, remote_reason, source, source_addr, purpose):
     self.event_name = event_name
     self.arrived_at = 0
     self.strm_id = strm_id
@@ -126,6 +126,7 @@
     self.remote_reason = remote_reason
     self.source = source
     self.source_addr = source_addr
+    self.purpose = purpose
 
 class ORConnEvent:
   def __init__(self, event_name, status, endpoint, age, read_bytes,
@@ -840,17 +841,18 @@
       event = CircuitEvent(evtype, ident, status, path, reason, remote)
     elif evtype == "STREAM":
       #plog("DEBUG", "STREAM: "+body)
-      m = re.match(r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+):(\d+)(\sREASON=\S+)?(\sREMOTE_REASON=\S+)?(\sSOURCE=\S+)?(\sSOURCE_ADDR=\S+)?", body)
+      m = re.match(r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+):(\d+)(\sREASON=\S+)?(\sREMOTE_REASON=\S+)?(\sSOURCE=\S+)?(\sSOURCE_ADDR=\S+)?(\sPURPOSE=\S+)?", body)
       if not m:
         raise ProtocolError("STREAM event misformatted.")
-      ident,status,circ,target_host,target_port,reason,remote,source,source_addr = m.groups()
+      ident,status,circ,target_host,target_port,reason,remote,source,source_addr,purpose = m.groups()
       ident,circ = map(int, (ident,circ))
       if reason: reason = reason[8:]
       if remote: remote = remote[15:]
       if source: source = source[8:]
       if source_addr: source_addr = source_addr[13:]
+      if purpose: purpose = purpose[9:]
       event = StreamEvent(evtype, ident, status, circ, target_host,
-                    int(target_port), reason, remote, source, source_addr)
+               int(target_port), reason, remote, source, source_addr, purpose)
     elif evtype == "ORCONN":
       m = re.match(r"(\S+)\s+(\S+)(\sAGE=\S+)?(\sREAD=\S+)?(\sWRITTEN=\S+)?(\sREASON=\S+)?(\sNCIRCS=\S+)?", body)
       if not m:

Modified: torflow/branches/gsoc2008/metatroller.py
===================================================================
--- torflow/branches/gsoc2008/metatroller.py	2008-06-08 02:53:32 UTC (rev 15020)
+++ torflow/branches/gsoc2008/metatroller.py	2008-06-08 03:08:50 UTC (rev 15021)
@@ -38,7 +38,7 @@
 __selmgr = PathSupport.SelectionManager(
       pathlen=2,
       order_exits=True,
-      percent_fast=100,
+      percent_fast=80,
       percent_skip=0,
       min_bw=1024,
       use_all_exits=False,
@@ -434,7 +434,7 @@
             +"/"+str(self.strm_count)+"\n")
 
     # Extend times 
-    n = reduce(lambda x, y: x+(y.avg_extend_time() > 0), self.sorted_r, 0)
+    n = 0.01+reduce(lambda x, y: x+(y.avg_extend_time() > 0), self.sorted_r, 0)
     avg_extend = reduce(lambda x, y: x+y.avg_extend_time(), self.sorted_r, 0)/n
     def notlambda(x, y):
       return x+(y.avg_extend_time()-avg_extend)*(y.avg_extend_time()-avg_extend) 
@@ -588,7 +588,7 @@
         r.strm_uncounted += 1
   
   def stream_status_event(self, s):
-    if s.strm_id in self.streams:
+    if s.strm_id in self.streams and not self.streams[s.strm_id].ignored:
       # TODO: Hrmm, consider making this sane in TorCtl.
       if s.reason: lreason = s.reason
       else: lreason = "NONE"
@@ -611,6 +611,10 @@
         if circ and circ.circ_id != s.circ_id:
           plog("WARN", str(s.strm_id) + " has mismatch of "
                 +str(s.circ_id)+" v "+str(circ.circ_id))
+        if s.circ_id and s.circ_id not in self.circuits:
+          plog("NOTICE", "Unknown circuit "+str(s.circ_id)
+                +" for stream "+str(s.strm_id))
+          return
       
       if s.status == "DETACHED":
         if self.streams[s.strm_id].attached_at: