[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r10167: Added code to close circuits that are 'too slow' regarding s (torflow/trunk)



Author: renner
Date: 2007-05-11 08:37:27 -0400 (Fri, 11 May 2007)
New Revision: 10167

Modified:
   torflow/trunk/op-addon.py
Log:
 Added code to close circuits that are 'too slow' regarding some configurable RTT-limit and fixed a bug 
 (duplicated saves of start-times for measurings).


Modified: torflow/trunk/op-addon.py
===================================================================
--- torflow/trunk/op-addon.py	2007-05-11 10:55:34 UTC (rev 10166)
+++ torflow/trunk/op-addon.py	2007-05-11 12:37:27 UTC (rev 10167)
@@ -34,15 +34,18 @@
 control_port = 9051
 socks_host = "127.0.0.1"
 socks_port = 9050
+# Any ideas/proposals?
 ping_dummy_host = "127.0.0.1"
 ping_dummy_port = 100
 
 # Close circ after n timeouts
 timeout_limit = 3
+# Slow RTT: 1 second?? 
+slow = 1
 # Set interval between work loads
-sleep_interval = 20
+sleep_interval = 10
 # No of idle circuits
-idle_circuits = 5
+idle_circuits = 8
 
 # GeoIP data object
 geoip = GeoIP.new(GeoIP.GEOIP_STANDARD)
@@ -112,15 +115,18 @@
 class Circuit(PathSupport.Circuit):  
   def __init__(self):
     PathSupport.Circuit.__init__(self)
-    self.total_rtt = None      # double (sec)
-    self.rtts = []           # list of partial rtts: 0-1-2
-    self.timeout_counter = 0 # close on reaching a limit
+    self.total_rtt = None      # double (sec), substitute with..
+    self.rtts = []             # list of partial rtts: 1-2-3
+    self.timeout_counter = 0   # timeout limit
+    self.slowness_counter = 0  # slowness limit
+    self.closed = False        # Mark circuits closed
 
 # Stream class extended to isPing
 class Stream(PathSupport.Stream):
   def __init__(self, sid, host, port, kind):
     PathSupport.Stream.__init__(self, sid, host, port, kind)
     self.isPing = False
+    self.hop = None	# Save hop if this is a ping, None = complete circ
 
 ######################################### Router, Circuit, Stream  #####################
 ######################################### BEGIN: Pinger            #####################
@@ -179,7 +185,7 @@
     PathBuilder.__init__(self, c, selmgr, GeoIPRouter)
     # Additional stuff
     self.ping_circs = Queue.Queue()  # (circ_id, hop)-pairs
-    self.start_times = {}            # dict mapping circ_id:start_time TODO: cleanup
+    self.start_times = {}            # dict mapping (circ_id, hop):start_time TODO: cleanup
     self.circs_sorted = []           # sorted list of circs, generated regularly
     # Set up the CircuitManager
     self.circ_manager = CircuitManager(selmgr, c, self)
@@ -275,7 +281,7 @@
 
     # Choose from the sorted list!  
     for circ in self.circs_sorted:
-      if circ.built and circ.total_rtt and circ.circ_id not in badcircs:
+      if circ.built and circ.total_rtt and circ.circ_id not in badcircs and not circ.closed:
         if circ.exit.will_exit_to(stream.host, stream.port):
           try:
             self.c.attach_stream(stream.strm_id, circ.circ_id)
@@ -324,10 +330,11 @@
       # Get the circuit, TODO: Handle circs that do not exist anymore!
       if circ_id in self.circuits:
         circ = self.circuits[circ_id]
-        if circ.built:        
+        if circ.built and not circ.closed:        
           self.c.attach_stream(stream.strm_id, circ.circ_id, hop)
           # Measure here or move to before attaching?
-          self.start_times[circ_id] = time.time()
+          self.start_times[(circ_id, hop)] = time.time()
+	  stream.hop = hop
           stream.pending_circ = circ # Only one possible here
           circ.pending_streams.append(stream)
         else:
@@ -382,10 +389,12 @@
         circs_lock.acquire()
         if (s.reason == "TIMEOUT"):
 	  self.circuits[s.circ_id].timeout_counter += 1
+	  self.circuits[s.circ_id].slowness_counter += 1
 	  plog("DEBUG", str(self.circuits[s.circ_id].timeout_counter) + " timeout(s) on circuit " + str(s.circ_id))
 	  if self.circuits[s.circ_id].timeout_counter >= timeout_limit:
 	    # Close the circuit
 	    plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
+	    self.circuits[s.circ_id].closed = True
 	    self.c.close_circuit(s.circ_id)
 	  # Set RTT for circ to None
 	  self.circuits[s.circ_id].total_rtt = None
@@ -395,12 +404,20 @@
 	  return
         # This is a successful ping: measure here
 	now = time.time()
-	rtt = now - self.start_times[s.circ_id]
+	rtt = now - self.start_times[(s.circ_id, self.streams[s.strm_id].hop)]
         plog("INFO", "Measured RTT: " + str(rtt) + " sec")
 	# Save RTT to circuit
 	self.circuits[s.circ_id].total_rtt = rtt
+	
+	# Close if slow-max is reached
+        if rtt >= slow:
+	  self.circuits[s.circ_id].slowness_counter += 1
+	  if self.circuits[s.circ_id].slowness_counter >= timeout_limit:
+	    plog("DEBUG", "Slow-max is reached --> closing circuit " + str(s.circ_id))
+	    self.circuits[s.circ_id].closed = True
+	    self.c.close_circuit(s.circ_id)
 	circs_lock.release()
-	# TODO: Sort every time ??
+	# Resort every time ??
 	self.refresh_sorted_list()
 	# Close the stream
         self.c.close_stream(s.strm_id, 6)
@@ -414,6 +431,7 @@
 	if self.circuits[s.circ_id].timeout_counter >= timeout_limit:
 	  # Close the circuit
 	  plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
+	  self.circuits[s.circ_id].closed = True
 	  self.c.close_circuit(s.circ_id)
 	circs_lock.release()
 
@@ -487,15 +505,10 @@
 # -- EventHandler
 #
 # Does work regularly
-# TODO: Close circuits that are too slow
 # TODO: Switch circuit-managing off to get circuits created from Tor
 # TODO: Add a NetworkModel to this!
 
 class CircuitManager(threading.Thread):
-  
-  # Limit of slowness when to close circs: 1 sec?
-  #rtt_circuit_limit = 1
-  #rtt_link_limit = 0.33
 
   def __init__(self, selmgr, conn, event_handler):
     # Set everything
@@ -512,7 +525,22 @@
     while self.isAlive():
       self.do_work()
       time.sleep(sleep_interval)
-  
+ 
+  # Do the work
+  def do_work(self):
+    # Get number of circuits
+    circs_lock.acquire()
+    n = len(self.handler.circuits.values())
+    circs_lock.release() 
+    # Schedule (idle_circuits-n) circuit-buildups
+    while (n < idle_circuits):      
+      self.build_idle_circuit()
+      plog("DEBUG", "Scheduled circuit No. " + str(n+1))
+      n += 1
+    # Measure RTTs of circuits
+    self.measure()
+    self.print_circuits()
+
   # Build an idle circuit
   # Better here than in EventHandler's thread
   def build_idle_circuit(self):
@@ -538,32 +566,17 @@
     for c in circs:
       if c.built:
 	id = c.circ_id
+	# TODO: Measure for all hops, test if result is 
+	# bigger each time, else start again
 	# Put in the queue (circ, hop), XXX: synchronize!
 	self.handler.queue_ping_circ((id, None))
-	# Measure, TODO: synchronize or GIL --> OK?
-	self.handler.start_times[id] = time.time()
-        self.pinger.ping()
+        # Trigger ping
+	self.pinger.ping()
 
-  # Do the work
-  def do_work(self):
-    # Get number of circuits
-    circs_lock.acquire()
-    n = len(self.handler.circuits.values())
-    circs_lock.release() 
-    # Schedule (idle_circuits-n) circuit-buildups
-    while (n < idle_circuits):      
-      self.build_idle_circuit()
-      plog("DEBUG", "Scheduled circuit No. " + str(n+1))
-      n += 1
-    # Measure RTTs of circuits
-    self.measure()
-    self.print_circuits()
-
   # Print circuits
   def print_circuits(self):
     circs_lock.acquire()
     circs = self.handler.circuits.values()
-    circs_lock.release()
     plog("INFO", "We have " + str(len(circs)) + " circuits")
     for c in circs:
       out = "+ Circuit " + str(c.circ_id) + ": "
@@ -571,6 +584,7 @@
       if c.total_rtt: out = out + " (RTT=" + str(c.total_rtt) + ")"
       if not c.built: out = out + " (not yet built)"
       print(out)
+    circs_lock.release()
 
 ######################################### END: CircuitManager      #####################