[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r10167: Added code to close circuits that are 'too slow' regarding s (torflow/trunk)
- To: or-cvs@xxxxxxxxxxxxx
- Subject: [or-cvs] r10167: Added code to close circuits that are 'too slow' regarding s (torflow/trunk)
- From: renner@xxxxxxxx
- Date: Fri, 11 May 2007 08:37:40 -0400 (EDT)
- Delivered-to: archiver@seul.org
- Delivered-to: or-cvs-outgoing@seul.org
- Delivered-to: or-cvs@seul.org
- Delivery-date: Fri, 11 May 2007 08:37:49 -0400
- Reply-to: or-dev@xxxxxxxxxxxxx
- Sender: owner-or-cvs@xxxxxxxxxxxxx
Author: renner
Date: 2007-05-11 08:37:27 -0400 (Fri, 11 May 2007)
New Revision: 10167
Modified:
torflow/trunk/op-addon.py
Log:
Added code to close circuits that are 'too slow' regarding some configurable RTT-limit and fixed a bug
(duplicated saves of start-times for measurings).
Modified: torflow/trunk/op-addon.py
===================================================================
--- torflow/trunk/op-addon.py 2007-05-11 10:55:34 UTC (rev 10166)
+++ torflow/trunk/op-addon.py 2007-05-11 12:37:27 UTC (rev 10167)
@@ -34,15 +34,18 @@
control_port = 9051
socks_host = "127.0.0.1"
socks_port = 9050
+# Any ideas/proposals?
ping_dummy_host = "127.0.0.1"
ping_dummy_port = 100
# Close circ after n timeouts
timeout_limit = 3
+# Slow RTT: 1 second??
+slow = 1
# Set interval between work loads
-sleep_interval = 20
+sleep_interval = 10
# No of idle circuits
-idle_circuits = 5
+idle_circuits = 8
# GeoIP data object
geoip = GeoIP.new(GeoIP.GEOIP_STANDARD)
@@ -112,15 +115,18 @@
class Circuit(PathSupport.Circuit):
def __init__(self):
PathSupport.Circuit.__init__(self)
- self.total_rtt = None # double (sec)
- self.rtts = [] # list of partial rtts: 0-1-2
- self.timeout_counter = 0 # close on reaching a limit
+ self.total_rtt = None # double (sec), substitute with..
+ self.rtts = [] # list of partial rtts: 1-2-3
+ self.timeout_counter = 0 # timeout limit
+ self.slowness_counter = 0 # slowness limit
+ self.closed = False # Mark circuits closed
# Stream class extended to isPing
class Stream(PathSupport.Stream):
def __init__(self, sid, host, port, kind):
PathSupport.Stream.__init__(self, sid, host, port, kind)
self.isPing = False
+ self.hop = None # Save hop if this is a ping, None = complete circ
######################################### Router, Circuit, Stream #####################
######################################### BEGIN: Pinger #####################
@@ -179,7 +185,7 @@
PathBuilder.__init__(self, c, selmgr, GeoIPRouter)
# Additional stuff
self.ping_circs = Queue.Queue() # (circ_id, hop)-pairs
- self.start_times = {} # dict mapping circ_id:start_time TODO: cleanup
+ self.start_times = {} # dict mapping (circ_id, hop):start_time TODO: cleanup
self.circs_sorted = [] # sorted list of circs, generated regularly
# Set up the CircuitManager
self.circ_manager = CircuitManager(selmgr, c, self)
@@ -275,7 +281,7 @@
# Choose from the sorted list!
for circ in self.circs_sorted:
- if circ.built and circ.total_rtt and circ.circ_id not in badcircs:
+ if circ.built and circ.total_rtt and circ.circ_id not in badcircs and not circ.closed:
if circ.exit.will_exit_to(stream.host, stream.port):
try:
self.c.attach_stream(stream.strm_id, circ.circ_id)
@@ -324,10 +330,11 @@
# Get the circuit, TODO: Handle circs that do not exist anymore!
if circ_id in self.circuits:
circ = self.circuits[circ_id]
- if circ.built:
+ if circ.built and not circ.closed:
self.c.attach_stream(stream.strm_id, circ.circ_id, hop)
# Measure here or move to before attaching?
- self.start_times[circ_id] = time.time()
+ self.start_times[(circ_id, hop)] = time.time()
+ stream.hop = hop
stream.pending_circ = circ # Only one possible here
circ.pending_streams.append(stream)
else:
@@ -382,10 +389,12 @@
circs_lock.acquire()
if (s.reason == "TIMEOUT"):
self.circuits[s.circ_id].timeout_counter += 1
+ self.circuits[s.circ_id].slowness_counter += 1
plog("DEBUG", str(self.circuits[s.circ_id].timeout_counter) + " timeout(s) on circuit " + str(s.circ_id))
if self.circuits[s.circ_id].timeout_counter >= timeout_limit:
# Close the circuit
plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
+ self.circuits[s.circ_id].closed = True
self.c.close_circuit(s.circ_id)
# Set RTT for circ to None
self.circuits[s.circ_id].total_rtt = None
@@ -395,12 +404,20 @@
return
# This is a successful ping: measure here
now = time.time()
- rtt = now - self.start_times[s.circ_id]
+ rtt = now - self.start_times[(s.circ_id, self.streams[s.strm_id].hop)]
plog("INFO", "Measured RTT: " + str(rtt) + " sec")
# Save RTT to circuit
self.circuits[s.circ_id].total_rtt = rtt
+
+ # Close if slow-max is reached
+ if rtt >= slow:
+ self.circuits[s.circ_id].slowness_counter += 1
+ if self.circuits[s.circ_id].slowness_counter >= timeout_limit:
+ plog("DEBUG", "Slow-max is reached --> closing circuit " + str(s.circ_id))
+ self.circuits[s.circ_id].closed = True
+ self.c.close_circuit(s.circ_id)
circs_lock.release()
- # TODO: Sort every time ??
+ # Resort every time ??
self.refresh_sorted_list()
# Close the stream
self.c.close_stream(s.strm_id, 6)
@@ -414,6 +431,7 @@
if self.circuits[s.circ_id].timeout_counter >= timeout_limit:
# Close the circuit
plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
+ self.circuits[s.circ_id].closed = True
self.c.close_circuit(s.circ_id)
circs_lock.release()
@@ -487,15 +505,10 @@
# -- EventHandler
#
# Does work regularly
-# TODO: Close circuits that are too slow
# TODO: Switch circuit-managing off to get circuits created from Tor
# TODO: Add a NetworkModel to this!
class CircuitManager(threading.Thread):
-
- # Limit of slowness when to close circs: 1 sec?
- #rtt_circuit_limit = 1
- #rtt_link_limit = 0.33
def __init__(self, selmgr, conn, event_handler):
# Set everything
@@ -512,7 +525,22 @@
while self.isAlive():
self.do_work()
time.sleep(sleep_interval)
-
+
+ # Do the work
+ def do_work(self):
+ # Get number of circuits
+ circs_lock.acquire()
+ n = len(self.handler.circuits.values())
+ circs_lock.release()
+ # Schedule (idle_circuits-n) circuit-buildups
+ while (n < idle_circuits):
+ self.build_idle_circuit()
+ plog("DEBUG", "Scheduled circuit No. " + str(n+1))
+ n += 1
+ # Measure RTTs of circuits
+ self.measure()
+ self.print_circuits()
+
# Build an idle circuit
# Better here than in EventHandler's thread
def build_idle_circuit(self):
@@ -538,32 +566,17 @@
for c in circs:
if c.built:
id = c.circ_id
+ # TODO: Measure for all hops, test if result is
+ # bigger each time, else start again
# Put in the queue (circ, hop), XXX: synchronize!
self.handler.queue_ping_circ((id, None))
- # Measure, TODO: synchronize or GIL --> OK?
- self.handler.start_times[id] = time.time()
- self.pinger.ping()
+ # Trigger ping
+ self.pinger.ping()
- # Do the work
- def do_work(self):
- # Get number of circuits
- circs_lock.acquire()
- n = len(self.handler.circuits.values())
- circs_lock.release()
- # Schedule (idle_circuits-n) circuit-buildups
- while (n < idle_circuits):
- self.build_idle_circuit()
- plog("DEBUG", "Scheduled circuit No. " + str(n+1))
- n += 1
- # Measure RTTs of circuits
- self.measure()
- self.print_circuits()
-
# Print circuits
def print_circuits(self):
circs_lock.acquire()
circs = self.handler.circuits.values()
- circs_lock.release()
plog("INFO", "We have " + str(len(circs)) + " circuits")
for c in circs:
out = "+ Circuit " + str(c.circ_id) + ": "
@@ -571,6 +584,7 @@
if c.total_rtt: out = out + " (RTT=" + str(c.total_rtt) + ")"
if not c.built: out = out + " (not yet built)"
print(out)
+ circs_lock.release()
######################################### END: CircuitManager #####################