[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r10147: Rearranged everything so that we have a CiruitManager that g (torflow/trunk)
- To: or-cvs@xxxxxxxxxxxxx
- Subject: [or-cvs] r10147: Rearranged everything so that we have a CiruitManager that g (torflow/trunk)
- From: renner@xxxxxxxx
- Date: Wed, 9 May 2007 12:36:34 -0400 (EDT)
- Delivered-to: archiver@seul.org
- Delivered-to: or-cvs-outgoing@seul.org
- Delivered-to: or-cvs@seul.org
- Delivery-date: Wed, 09 May 2007 12:36:44 -0400
- Reply-to: or-dev@xxxxxxxxxxxxx
- Sender: owner-or-cvs@xxxxxxxxxxxxx
Author: renner
Date: 2007-05-09 12:36:33 -0400 (Wed, 09 May 2007)
New Revision: 10147
Modified:
torflow/trunk/op-addon.py
Log:
Rearranged everything so that we have a CiruitManager that gets instantiated by the EventHandler.
Modified: torflow/trunk/op-addon.py
===================================================================
--- torflow/trunk/op-addon.py 2007-05-09 09:16:43 UTC (rev 10146)
+++ torflow/trunk/op-addon.py 2007-05-09 16:36:33 UTC (rev 10147)
@@ -42,7 +42,7 @@
# Set interval between work loads
sleep_interval = 20
# No of idle circuits
-idle_circuits = 6
+idle_circuits = 5
# GeoIP data object
geoip = GeoIP.new(GeoIP.GEOIP_STANDARD)
@@ -112,9 +112,9 @@
class Circuit(PathSupport.Circuit):
def __init__(self):
PathSupport.Circuit.__init__(self)
- self.rtt = None # double (sec)
+ self.total_rtt = None # double (sec)
+ self.rtts = [] # list of partial rtts: 0-1-2
self.timeout_counter = 0 # close on reaching a limit
- # TODO: self.rtts = []
# Stream class extended to isPing
class Stream(PathSupport.Stream):
@@ -164,25 +164,31 @@
plog("DEBUG", "NetworkModel initiated")
def addRouter(self, router):
- self.graph.add_Node(router)
+ self.graph.add_node(router)
+ def addLink(self, source, dest):
+ self.graph.add_edge(source, dest)
+
######################################### END: NetworkModel #####################
######################################### BEGIN: EventHandler #####################
-# TODO: better extend TorCtl.EventHandler() ??
-# TODO: Add a NetworkModel to this!
-
# We need an EventHandler, this one extends PathBuilder
class EventHandler(PathSupport.PathBuilder):
- def __init__(self, c, slmgr):
- self.ping_circs = Queue.Queue() # circ_ids
+ def __init__(self, c, selmgr):
+ # Call constructor of superclass
+ PathBuilder.__init__(self, c, selmgr, GeoIPRouter)
+ # Additional stuff
+ self.ping_circs = Queue.Queue() # (circ_id, hop)-pairs
self.start_times = {} # dict mapping circ_id:start_time TODO: cleanup
- self.circs_sorted = [] # sorted list of circs, generated regularly
- PathBuilder.__init__(self, c, slmgr, GeoIPRouter)
+ self.circs_sorted = [] # sorted list of circs, generated regularly
+ # Set up the CircuitManager
+ self.circ_manager = CircuitManager(selmgr, c, self)
+ self.circ_manager.setDaemon(True)
+ self.circ_manager.start()
- # Add a circuit to ping
- def queue_ping_circ(self, id):
- self.ping_circs.put(id)
+ # Add a circuit to ping, ping_info is (circ_id, hop)
+ def queue_ping_circ(self, ping_info):
+ self.ping_circs.put(ping_info)
# Send signal "CLEARDNSCACHE"
def clear_dns_cache(self):
@@ -199,7 +205,7 @@
def refresh_sorted_list(self):
# Sort the list for RTTs
circs_lock.acquire()
- self.circs_sorted = self.sort_list(self.circuits.values(), lambda x: x.rtt)
+ self.circs_sorted = self.sort_list(self.circuits.values(), lambda x: x.total_rtt)
circs_lock.release()
plog("DEBUG", "Refreshed sorted list of circuits")
@@ -269,7 +275,7 @@
# Choose from the sorted list!
for circ in self.circs_sorted:
- if circ.built and circ.rtt and circ.circ_id not in badcircs:
+ if circ.built and circ.total_rtt and circ.circ_id not in badcircs:
if circ.exit.will_exit_to(stream.host, stream.port):
try:
self.c.attach_stream(stream.strm_id, circ.circ_id)
@@ -305,21 +311,27 @@
# Handle a ping stream
def attach_ping(self, stream):
plog("DEBUG", "New ping request")
- # Get circ-id from the Queue
+ # Get info from the Queue
# TODO: check if empty
- circ_id = self.ping_circs.get()
+ ping_info = self.ping_circs.get()
+ # Extract ping-infos
+ circ_id = ping_info[0]
+ hop = ping_info[1]
+ # Set circ to stream
stream.circ = circ_id
try:
circs_lock.acquire()
- # Get the circuit
- circ = self.circuits[circ_id]
- if circ:
- # TODO: Measure to all hops
- self.c.attach_stream(stream.strm_id, circ.circ_id)
- # Measure here or move to before attaching?
- self.start_times[circ_id] = time.time()
- stream.pending_circ = circ # Only one possible here
- circ.pending_streams.append(stream)
+ # Get the circuit, TODO: Handle circs that do not exist anymore!
+ if circ_id in self.circuits:
+ circ = self.circuits[circ_id]
+ if circ.built:
+ self.c.attach_stream(stream.strm_id, circ.circ_id, hop)
+ # Measure here or move to before attaching?
+ self.start_times[circ_id] = time.time()
+ stream.pending_circ = circ # Only one possible here
+ circ.pending_streams.append(stream)
+ else:
+ plog("WARN", "Circuit not built")
else:
plog("WARN", "Circuit does not exist")
circs_lock.release()
@@ -376,7 +388,7 @@
plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
self.c.close_circuit(s.circ_id)
# Set RTT for circ to None
- self.circuits[s.circ_id].rtt = None
+ self.circuits[s.circ_id].total_rtt = None
circs_lock.release()
# Only close the stream
self.c.close_stream(s.strm_id, 7)
@@ -386,7 +398,7 @@
rtt = now - self.start_times[s.circ_id]
plog("INFO", "Measured RTT: " + str(rtt) + " sec")
# Save RTT to circuit
- self.circuits[s.circ_id].rtt = rtt
+ self.circuits[s.circ_id].total_rtt = rtt
circs_lock.release()
# TODO: Sort every time ??
self.refresh_sorted_list()
@@ -468,7 +480,7 @@
self.streams[s.strm_id].port = s.target_port
######################################### END: EventHandler #####################
-######################################### BEGIN: Addon-class #####################
+######################################### BEGIN: CircuitManager #####################
# This is the main class that keeps track of:
# -- Connection to Tor
@@ -477,6 +489,7 @@
# Does work regularly
# TODO: Close circuits that are too slow
# TODO: Switch circuit-managing off to get circuits created from Tor
+# TODO: Add a NetworkModel to this!
class CircuitManager(threading.Thread):
@@ -484,41 +497,16 @@
#rtt_circuit_limit = 1
#rtt_link_limit = 0.33
- def __init__(self, control_host, control_port, selmgr):
- # Connect to Tor process
- self.conn = self.connect(control_host, control_port)
- self.conn.debug(file("control.log", "w"))
- self.conn.authenticate()
- # Set the selmgr
+ def __init__(self, selmgr, conn, event_handler):
+ # Set everything
self.selmgr = selmgr
- # Set Handler to the connection
- self.handler = EventHandler(self.conn, self.selmgr)
- self.conn.set_event_handler(self.handler)
- # Configure myself
- self.configure()
+ self.conn = conn
+ self.handler = event_handler
# Create the Pinger
self.pinger = Pinger(ping_dummy_host, ping_dummy_port)
# Call constructor of superclass
threading.Thread.__init__(self)
-
- # Return a connection to Tor's control port
- def connect(self, control_host, control_port):
- # Create a socket and connect to Tor
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.socket.connect((control_host, control_port))
- return Connection(self.socket)
- # Do the configuration
- def configure(self):
- # Set events to listen to
- self.conn.set_events([TorCtl.EVENT_TYPE.STREAM,
- TorCtl.EVENT_TYPE.CIRC,
- TorCtl.EVENT_TYPE.NS,
- TorCtl.EVENT_TYPE.NEWDESC], True)
- # Set options: We attach streams now & build circuits
- self.conn.set_option("__LeaveStreamsUnattached", "1")
- self.conn.set_option("__DisablePredictedCircuits", "1")
-
# The run()-method
def run(self):
while self.isAlive():
@@ -550,8 +538,8 @@
for c in circs:
if c.built:
id = c.circ_id
- # Put in the queue, XXX: synchronize!
- self.handler.queue_ping_circ(id)
+ # Put in the queue (circ, hop), XXX: synchronize!
+ self.handler.queue_ping_circ((id, None))
# Measure, TODO: synchronize or GIL --> OK?
self.handler.start_times[id] = time.time()
self.pinger.ping()
@@ -580,21 +568,54 @@
for c in circs:
out = "+ Circuit " + str(c.circ_id) + ": "
for r in c.path: out = out + " " + r.nickname + "(" + str(r.country_code) + ")"
- if c.rtt: out = out + " (RTT=" + str(c.rtt) + ")"
+ if c.total_rtt: out = out + " (RTT=" + str(c.total_rtt) + ")"
if not c.built: out = out + " (not yet built)"
print(out)
- # TODO: Call on exit
- def cleanup(self, conn, sock):
- self.conn.set_option("__LeaveStreamsUnattached", "0")
- self.conn.set_option("__DisablePredictedCircuits", "0")
- self.sock.close()
+######################################### END: CircuitManager #####################
+# Return a connection to Tor's control port
+def connect(control_host, control_port):
+ # Create a socket and connect to Tor
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((control_host, control_port))
+ return Connection(sock)
+
+# Do the configuration
+def configure(conn):
+ # Set events to listen to
+ conn.set_events([TorCtl.EVENT_TYPE.STREAM,
+ TorCtl.EVENT_TYPE.CIRC,
+ TorCtl.EVENT_TYPE.NS,
+ TorCtl.EVENT_TYPE.NEWDESC], True)
+ # Set options: We attach streams now & build circuits
+ conn.set_option("__LeaveStreamsUnattached", "1")
+ conn.set_option("__DisablePredictedCircuits", "1")
+
+def startup(argv):
+ # Connect to Tor process
+ conn = connect(control_host, control_port)
+ #conn.debug(file("control.log", "w"))
+ conn.authenticate()
+ # Set Handler to the connection
+ handler = EventHandler(conn, __selmgr)
+ conn.set_event_handler(handler)
+ # Configure myself
+ configure(conn)
+ # Go to sleep to be able to get killed from the commandline
+ try:
+ while True:
+ time.sleep(60)
+ except KeyboardInterrupt:
+ cleanup(conn)
+
+# Call this on exit
+def cleanup(conn):
+ plog("INFO", "Cleaning up...")
+ conn.set_option("__LeaveStreamsUnattached", "0")
+ conn.set_option("__DisablePredictedCircuits", "0")
+ conn.close()
+
if __name__ == '__main__':
- # TODO: How to keep track of threads?
- # Instantiate and start
- circ_manager = CircuitManager(control_host, control_port, __selmgr)
- circ_manager.setDaemon(True)
- circ_manager.start()
- while circ_manager.isAlive():
- time.sleep(60)
+ # Call main
+ startup(sys.argv)