[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r10147: Rearranged everything so that we have a CircuitManager that g (torflow/trunk)



Author: renner
Date: 2007-05-09 12:36:33 -0400 (Wed, 09 May 2007)
New Revision: 10147

Modified:
   torflow/trunk/op-addon.py
Log:
Rearranged everything so that we have a CircuitManager that gets instantiated by the EventHandler.


Modified: torflow/trunk/op-addon.py
===================================================================
--- torflow/trunk/op-addon.py	2007-05-09 09:16:43 UTC (rev 10146)
+++ torflow/trunk/op-addon.py	2007-05-09 16:36:33 UTC (rev 10147)
@@ -42,7 +42,7 @@
 # Set interval between work loads
 sleep_interval = 20
 # No of idle circuits
-idle_circuits = 6
+idle_circuits = 5
 
 # GeoIP data object
 geoip = GeoIP.new(GeoIP.GEOIP_STANDARD)
@@ -112,9 +112,9 @@
 class Circuit(PathSupport.Circuit):  
   def __init__(self):
     PathSupport.Circuit.__init__(self)
-    self.rtt = None          # double (sec)
+    self.total_rtt = None      # double (sec)
+    self.rtts = []           # list of partial rtts: 0-1-2
     self.timeout_counter = 0 # close on reaching a limit
-    # TODO: self.rtts = []
 
 # Stream class extended to isPing
 class Stream(PathSupport.Stream):
@@ -164,25 +164,31 @@
     plog("DEBUG", "NetworkModel initiated")
 
   def addRouter(self, router):
-    self.graph.add_Node(router)
+    self.graph.add_node(router)
 
+  def addLink(self, source, dest):
+    self.graph.add_edge(source, dest)
+    
 ######################################### END: NetworkModel        #####################
 ######################################### BEGIN: EventHandler      #####################
 
-# TODO: better extend TorCtl.EventHandler() ??
-# TODO: Add a NetworkModel to this!
-
 # We need an EventHandler, this one extends PathBuilder
 class EventHandler(PathSupport.PathBuilder):  
-  def __init__(self, c, slmgr):    
-    self.ping_circs = Queue.Queue()  # circ_ids
+  def __init__(self, c, selmgr):    
+    # Call constructor of superclass
+    PathBuilder.__init__(self, c, selmgr, GeoIPRouter)
+    # Additional stuff
+    self.ping_circs = Queue.Queue()  # (circ_id, hop)-pairs
     self.start_times = {}            # dict mapping circ_id:start_time TODO: cleanup
-    self.circs_sorted = []           # sorted list of circs, generated regularly    
-    PathBuilder.__init__(self, c, slmgr, GeoIPRouter)
+    self.circs_sorted = []           # sorted list of circs, generated regularly
+    # Set up the CircuitManager
+    self.circ_manager = CircuitManager(selmgr, c, self)
+    self.circ_manager.setDaemon(True)
+    self.circ_manager.start()
  
-  # Add a circuit to ping
-  def queue_ping_circ(self, id):
-    self.ping_circs.put(id)
+  # Add a circuit to ping, ping_info is (circ_id, hop)
+  def queue_ping_circ(self, ping_info):
+    self.ping_circs.put(ping_info)
 
   # Send signal "CLEARDNSCACHE"
   def clear_dns_cache(self):
@@ -199,7 +205,7 @@
   def refresh_sorted_list(self):
     # Sort the list for RTTs
     circs_lock.acquire()
-    self.circs_sorted = self.sort_list(self.circuits.values(), lambda x: x.rtt)
+    self.circs_sorted = self.sort_list(self.circuits.values(), lambda x: x.total_rtt)
     circs_lock.release()
     plog("DEBUG", "Refreshed sorted list of circuits")
   
@@ -269,7 +275,7 @@
 
     # Choose from the sorted list!  
     for circ in self.circs_sorted:
-      if circ.built and circ.rtt and circ.circ_id not in badcircs:
+      if circ.built and circ.total_rtt and circ.circ_id not in badcircs:
         if circ.exit.will_exit_to(stream.host, stream.port):
           try:
             self.c.attach_stream(stream.strm_id, circ.circ_id)
@@ -305,21 +311,27 @@
   # Handle a ping stream
   def attach_ping(self, stream):
     plog("DEBUG", "New ping request")
-    # Get circ-id from the Queue
+    # Get info from the Queue
     # TODO: check if empty
-    circ_id = self.ping_circs.get()
+    ping_info = self.ping_circs.get()
+    # Extract ping-infos
+    circ_id = ping_info[0]
+    hop = ping_info[1]
+    # Set circ to stream
     stream.circ = circ_id
     try:
       circs_lock.acquire()
-      # Get the circuit
-      circ = self.circuits[circ_id]
-      if circ:
-        # TODO: Measure to all hops
-        self.c.attach_stream(stream.strm_id, circ.circ_id)
-        # Measure here or move to before attaching?
-        self.start_times[circ_id] = time.time()
-        stream.pending_circ = circ # Only one possible here
-        circ.pending_streams.append(stream)
+      # Get the circuit, TODO: Handle circs that do not exist anymore!
+      if circ_id in self.circuits:
+        circ = self.circuits[circ_id]
+        if circ.built:        
+          self.c.attach_stream(stream.strm_id, circ.circ_id, hop)
+          # Measure here or move to before attaching?
+          self.start_times[circ_id] = time.time()
+          stream.pending_circ = circ # Only one possible here
+          circ.pending_streams.append(stream)
+        else:
+          plog("WARN", "Circuit not built")
       else:
         plog("WARN", "Circuit does not exist")
       circs_lock.release()
@@ -376,7 +388,7 @@
 	    plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
 	    self.c.close_circuit(s.circ_id)
 	  # Set RTT for circ to None
-	  self.circuits[s.circ_id].rtt = None
+	  self.circuits[s.circ_id].total_rtt = None
 	  circs_lock.release()
 	  # Only close the stream
           self.c.close_stream(s.strm_id, 7)
@@ -386,7 +398,7 @@
 	rtt = now - self.start_times[s.circ_id]
         plog("INFO", "Measured RTT: " + str(rtt) + " sec")
 	# Save RTT to circuit
-	self.circuits[s.circ_id].rtt = rtt
+	self.circuits[s.circ_id].total_rtt = rtt
 	circs_lock.release()
 	# TODO: Sort every time ??
 	self.refresh_sorted_list()
@@ -468,7 +480,7 @@
         self.streams[s.strm_id].port = s.target_port
 
 ######################################### END: EventHandler        #####################
-######################################### BEGIN: Addon-class       #####################
+######################################### BEGIN: CircuitManager    #####################
 
 # This is the main class that keeps track of: 
 # -- Connection to Tor
@@ -477,6 +489,7 @@
 # Does work regularly
 # TODO: Close circuits that are too slow
 # TODO: Switch circuit-managing off to get circuits created from Tor
+# TODO: Add a NetworkModel to this!
 
 class CircuitManager(threading.Thread):
   
@@ -484,41 +497,16 @@
   #rtt_circuit_limit = 1
   #rtt_link_limit = 0.33
 
-  def __init__(self, control_host, control_port, selmgr):
-    # Connect to Tor process
-    self.conn = self.connect(control_host, control_port)
-    self.conn.debug(file("control.log", "w"))
-    self.conn.authenticate()
-    # Set the selmgr
+  def __init__(self, selmgr, conn, event_handler):
+    # Set everything
     self.selmgr = selmgr
-    # Set Handler to the connection
-    self.handler = EventHandler(self.conn, self.selmgr)
-    self.conn.set_event_handler(self.handler)
-    # Configure myself
-    self.configure()
+    self.conn = conn
+    self.handler = event_handler
     # Create the Pinger
     self.pinger = Pinger(ping_dummy_host, ping_dummy_port)
     # Call constructor of superclass
     threading.Thread.__init__(self)
-
-  # Return a connection to Tor's control port
-  def connect(self, control_host, control_port):
-    # Create a socket and connect to Tor
-    self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    self.socket.connect((control_host, control_port))
-    return Connection(self.socket)
   
-  # Do the configuration
-  def configure(self):
-    # Set events to listen to
-    self.conn.set_events([TorCtl.EVENT_TYPE.STREAM,
-    	TorCtl.EVENT_TYPE.CIRC,
-    	TorCtl.EVENT_TYPE.NS,	  
-    	TorCtl.EVENT_TYPE.NEWDESC], True)
-    # Set options: We attach streams now & build circuits
-    self.conn.set_option("__LeaveStreamsUnattached", "1")
-    self.conn.set_option("__DisablePredictedCircuits", "1")
-
   # The run()-method
   def run(self):
     while self.isAlive():
@@ -550,8 +538,8 @@
     for c in circs:
       if c.built:
 	id = c.circ_id
-	# Put in the queue, XXX: synchronize!
-	self.handler.queue_ping_circ(id)
+	# Put in the queue (circ, hop), XXX: synchronize!
+	self.handler.queue_ping_circ((id, None))
 	# Measure, TODO: synchronize or GIL --> OK?
 	self.handler.start_times[id] = time.time()
         self.pinger.ping()
@@ -580,21 +568,54 @@
     for c in circs:
       out = "+ Circuit " + str(c.circ_id) + ": "
       for r in c.path: out = out + " " + r.nickname + "(" + str(r.country_code) + ")"
-      if c.rtt: out = out + " (RTT=" + str(c.rtt) + ")"
+      if c.total_rtt: out = out + " (RTT=" + str(c.total_rtt) + ")"
       if not c.built: out = out + " (not yet built)"
       print(out)
 
-  # TODO: Call on exit
-  def cleanup(self, conn, sock):
-    self.conn.set_option("__LeaveStreamsUnattached", "0")
-    self.conn.set_option("__DisablePredictedCircuits", "0")
-    self.sock.close()
+######################################### END: CircuitManager      #####################
 
+# Return a connection to Tor's control port
+def connect(control_host, control_port):
+  # Create a socket and connect to Tor
+  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+  sock.connect((control_host, control_port))
+  return Connection(sock)
+  
+# Do the configuration
+def configure(conn):
+  # Set events to listen to
+  conn.set_events([TorCtl.EVENT_TYPE.STREAM,
+      TorCtl.EVENT_TYPE.CIRC,
+      TorCtl.EVENT_TYPE.NS,	  
+      TorCtl.EVENT_TYPE.NEWDESC], True)
+  # Set options: We attach streams now & build circuits
+  conn.set_option("__LeaveStreamsUnattached", "1")
+  conn.set_option("__DisablePredictedCircuits", "1")
+
+def startup(argv):
+  # Connect to Tor process
+  conn = connect(control_host, control_port)
+  #conn.debug(file("control.log", "w"))
+  conn.authenticate()
+  # Set Handler to the connection
+  handler = EventHandler(conn, __selmgr)
+  conn.set_event_handler(handler)
+  # Configure myself
+  configure(conn)
+  # Go to sleep to be able to get killed from the commandline
+  try:
+    while True:
+      time.sleep(60)
+  except KeyboardInterrupt:
+    cleanup(conn)
+
+# Call this on exit
+def cleanup(conn):
+  plog("INFO", "Cleaning up...")
+  conn.set_option("__LeaveStreamsUnattached", "0")
+  conn.set_option("__DisablePredictedCircuits", "0")
+  conn.close()
+
 if __name__ == '__main__':
-  # TODO: How to keep track of threads? 
-  # Instantiate and start
-  circ_manager = CircuitManager(control_host, control_port, __selmgr)
-  circ_manager.setDaemon(True)
-  circ_manager.start()
-  while circ_manager.isAlive():
-    time.sleep(60)
+  # Call main
+  startup(sys.argv)