[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r10705: Added the recording of global-circuit building stats like av (torflow/trunk)
Author: renner
Date: 2007-06-29 11:48:20 -0400 (Fri, 29 Jun 2007)
New Revision: 10705
Modified:
torflow/trunk/op-addon.py
Log:
Added the recording of global-circuit building stats like average setup duration, min/max, median and total no. of timeouts occurred.
Modified: torflow/trunk/op-addon.py
===================================================================
--- torflow/trunk/op-addon.py 2007-06-29 13:54:56 UTC (rev 10704)
+++ torflow/trunk/op-addon.py 2007-06-29 15:48:20 UTC (rev 10705)
@@ -31,12 +31,14 @@
from TorCtl import *
from TorCtl.TorUtil import plog, sort_list
-# TODO: Move these to config-file
+# TODO: Move all of the configuration to a .pathrc
control_host = "127.0.0.1"
control_port = 9051
socks_host = "127.0.0.1"
socks_port = 9050
-# Any ideas/proposals?
+
+# Host and port to use for ping streams
+# Choose randomly from a set of hosts/ports to prevent from fasttracking?
ping_dummy_host = "127.0.0.1"
ping_dummy_port = 100
@@ -46,34 +48,32 @@
# Measure complete circuits
measure_circs = True
-# Note: Tor-internal lifetime of a circuit is 10 min --> 600/sleep_interval = max-age
# Sleep interval between working loads in sec
initial_interval = 8
-sleep_interval = 1
+sleep_interval = 2
# Close circ after n timeouts or avg measured slownesses
timeout_limit = 1
# Slow RTT := x seconds, close circs slower & create only circs faster than this
-slowness_limit = 5
+slowness_limit = 2
slow = 1.
# Set to True if we want to measure partial circuits
# This also enables circuit creation from the model
-measure_partial_circs = True
+measure_partial_circs = False
# Minimum number of proposals to choose from
-min_proposals = 10
+min_proposals = 1
# Min ratio of traditionally created circs
# ensures growing of the explored subnet
-min_ratio = 1./3.
+min_ratio = 1./2.
-# Testing mode: Close circuits after num_tests measures +
-# involves a FileHandler to write collected data to a file
-testing_mode = True
-# Number of tests per circuit
+# Testing mode: Collect latencies of circuits and links in the network + global circ_stats
+# Close circuits after num_tests measures + involve FileHandlers to write data to files
+testing_mode = False
num_tests = 5
# Do geoip-configuration here
-# TODO: Set src_country below when setting up our location
-path_config = GeoIPSupport.GeoIPConfig(unique_countries = None,
+# Set src_country below when setting up our location
+path_config = GeoIPSupport.GeoIPConfig(unique_countries = True,
entry_country = None,
exit_country = None,
max_crossings = 1,
@@ -91,7 +91,7 @@
use_all_exits=False,
uniform=True,
use_exit=None,
- use_guards=True,
+ use_guards=False,
geoip_config=path_config)
######################################### BEGIN: Connection #####################
@@ -174,12 +174,19 @@
return self.values[(len(self.values)-1)/2]
else: return 0.0
+######################################## FileHandler ######################
+
class FileHandler:
- """ FileHandler for appending collected data to a file """
+ """ FileHandler class for writing/appending collected data to a file """
def __init__(self, filename):
self.filename = filename
def write(self, line):
+ self.filehandle = open(self.filename, 'w')
+ self.filehandle.write(line + "\n")
+ self.filehandle.close()
+
+ def append(self, line):
self.filehandle = open(self.filename, 'a')
self.filehandle.write(line + "\n")
self.filehandle.close()
@@ -200,12 +207,18 @@
self.slowness_counter = 0 # slowness limit
self.closed = False # mark circuit closed
self.rtt_created = False # if this was created from the model
+ # List of extend-times
+ self.extend_times = [] # list of all extend-times, sum up for setup duration
def add_rtt(self, rtt):
""" Add a new value and refresh the stats """
# Set current
- self.current_rtt = rtt
- # Add to the stats
+ if self.current_rtt == None:
+ self.current_rtt = rtt
+ else:
+ self.current_rtt = (self.current_rtt * 0.5) + (rtt * 0.5)
+ plog("DEBUG", "Computing new current RTT from " + str(rtt) + " to " + str(self.current_rtt))
+ # Add new RTT to the stats
self.stats.add_value(rtt)
# Increase age
self.age += 1
@@ -217,9 +230,9 @@
if not self.built: s += " (not yet built)"
else: s += " (age=" + str(self.age) + ")"
if self.current_rtt:
- s += ": " "RTT (current/median/mean/dev): "
- s += str(self.current_rtt) + "/" + str(self.stats.median) + "/"
- s += str(self.stats.mean) + "/" + str(self.stats.dev)
+ s += ": " "RTT [current (median/mean/dev)]: "
+ s += str(self.current_rtt) + " (" + str(self.stats.median) + "/"
+ s += str(self.stats.mean) + "/" + str(self.stats.dev) + ")"
if self.rtt_created: s += "*"
return s
@@ -227,8 +240,24 @@
""" Stream class extended to hop """
def __init__(self, sid, host, port, kind):
PathSupport.Stream.__init__(self, sid, host, port, kind)
- self.hop = None # save hop if this is a ping, hop=None means complete circ
+ self.hop = None # save hop if this is a ping, hop=None means complete circ
+######################################### BEGIN: CircuitStats #####################
+
+class CircuitBuildingStats(Stats):
+ """ Create an instance of this and gather overall circuit stats """
+ def __init__(self):
+ Stats.__init__(self)
+ self.timeouts = 0 # count occurrences of timeouts
+
+ def to_string(self):
+ """ Create a string for writing to a file """
+ s = "Circuit buildups: "
+ s += str(len(self.values)) + " records, median=" + str(self.median) + " sec, avg=" + str(self.mean) + " sec"
+ s += ", dev=" + str(self.dev) + " sec (min=" + str(self.min) + " sec, max=" + str(self.max) + " sec)\n"
+ s += "Total number of timeouts: " + str(self.timeouts)
+ return s
+
######################################### BEGIN: NetworkModel #####################
class LinkInfo:
@@ -239,14 +268,16 @@
self.src = src
self.dest = dest
# The current value
- self.current_rtt = 0.0
- # Setup the stats and record the first RTT
- self.stats = Stats()
+ self.current_rtt = None
+ # Set the RTT
self.add_rtt(rtt)
def add_rtt(self, rtt):
- self.current_rtt = rtt
- self.stats.add_value(rtt)
+ # Compute new current value from the last
+ if self.current_rtt == None: self.current_rtt = rtt
+ else:
+ self.current_rtt = (self.current_rtt * 0.5) + (rtt * 0.5)
+ plog("DEBUG", "Computing new current RTT from " + str(rtt) + " to " + str(self.current_rtt))
class PathProposal:
""" Instances of this class are path-proposals found in the model """
@@ -257,7 +288,12 @@
self.path = path[1:len(path)]
# Compute the expected RTT (from current value?)
self.rtt = reduce(lambda x,y: x + y.current_rtt, self.links, 0.0)
-
+
+ # TODO: Call before actually creating a circuit
+ def will_exit_to(self, host, port):
+ """ Check for the last router in the path """
+ return self.path(len(self.path)-1).will_exit_to(host, port)
+
def to_string(self):
""" Create a string for printing out information """
s = ""
@@ -345,9 +381,9 @@
def __init__(self, c, selmgr, num_circuits):
# Init the PathBuilder
PathSupport.PathBuilder.__init__(self, c, selmgr, GeoIPSupport.GeoIPRouter)
- self.num_circuits = num_circuits # size of the circuit pool
- self.check_circuit_pool() # bring up the pool of circs
- self.sorted_circs = None # attribute to hold a sorted list of the circuits
+ self.num_circuits = num_circuits # size of the circuit pool
+ self.check_circuit_pool() # bring up the pool of circs
+ self.circ_stats = CircuitBuildingStats() # record buildup-times, no. of timeouts
def check_circuit_pool(self):
""" Init or check the status of our pool of circuits """
@@ -413,6 +449,11 @@
# EXTENDED
if c.status == "EXTENDED":
+ # Compute elapsed time
+ extend_time = c.arrived_at - self.circuits[c.circ_id].last_extended_at
+ # Add to the list
+ self.circuits[c.circ_id].extend_times.append(extend_time)
+ plog("DEBUG", "Circuit " + str(c.circ_id) + " extended in " + str(extend_time) + " sec")
self.circuits[c.circ_id].last_extended_at = c.arrived_at
# FAILED & CLOSED
@@ -432,6 +473,11 @@
# BUILT
elif c.status == "BUILT":
self.circuits[c.circ_id].built = True
+ # Compute duration by summing up extend_times
+ duration = reduce(lambda x, y: x+y, self.circuits[c.circ_id].extend_times, 0.0)
+ plog("DEBUG", "Circuit " + str(c.circ_id) + " needed " + str(duration) + " seconds to be built")
+ # Add duration to circ_stats
+ self.circ_stats.add_value(duration)
try:
for stream in self.circuits[c.circ_id].pending_streams:
self.c.attach_stream(stream.strm_id, c.circ_id)
@@ -453,7 +499,8 @@
def __init__(self, c, selmgr, num_circs):
# Call constructor of superclass
CircuitHandler.__init__(self, c, selmgr, num_circs)
-
+ self.sorted_circs = None # attribute to hold a sorted list of the circuits
+
def clear_dns_cache(self):
""" Send signal CLEARDNSCACHE """
lines = self.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
@@ -553,7 +600,7 @@
self.streams[s.strm_id].detached_from.append(s.circ_id)
# Detect timeouts on user streams
if s.reason == "TIMEOUT":
- # TODO: Increase a timeout counter on the stream?
+ # TODO: Count timeouts on the stream?
#self.streams[s.strm_id].timeout_counter += 1
plog("DEBUG", "User stream timed out on circuit " + str(s.circ_id))
# Stream was pending
@@ -625,9 +672,11 @@
if self.partial_circs:
self.router = router # this object represents this OR
self.model = NetworkModel(self.router) # model for recording link-RTTs
+ # For saving stats about circuit building
+ self.circ_filehandler = FileHandler("data/circ_stats")
# Handle testing_mode
if testing_mode:
- self.filehandler = FileHandler("data/circuits")
+ self.latency_filehandler = FileHandler("data/mean_latencies")
# Init the StreamHandler
StreamHandler.__init__(self, c, selmgr, num_circs)
# Start the Pinger that triggers the connections
@@ -638,8 +687,12 @@
self.sorted_circs = [] # list of circs sorted by mean RTT
def refresh_sorted_list(self):
- """ Sort the list for their mean RTTs or something else? """
- self.sorted_circs = sort_list(self.circuits.values(), lambda x: x.stats.mean)
+ """ Sort the list for their current RTTs """
+ def notlambda(x):
+ # If not measured yet, return a max value
+ if x.current_rtt == None: return 10
+ else: return x.current_rtt
+ self.sorted_circs = sort_list(self.circuits.values(), notlambda)
plog("DEBUG", "Refreshed sorted list of circuits")
def enqueue_pings(self):
@@ -651,7 +704,7 @@
# Get id of c
id = c.circ_id
if self.partial_circs:
- # If partial measures wanted: get length
+ # If partial measurings wanted: get length
path_len = len(c.path)
for i in xrange(1, path_len):
self.ping_queue.put((id, i))
@@ -659,57 +712,67 @@
# And for the whole circuit ...
self.ping_queue.put((id, None))
plog("DEBUG", "Enqueued circuit " + str(id) + " hop None")
-
- def compute_link_RTTs(self):
- """ Get the circs and check if we can compute RTTs of single links and store these in the model """
+
+ # XXX: Not used
+ def compute_all_RTTs(self):
+ """ Get the circs and compute everything """
circs = self.circuits.values()
# Measure also the duration
start = time.time()
for c in circs:
- # Get the length
- path_len = len(c.path)
- # Go through the path
- for i in xrange(1,path_len):
- if i in c.part_rtts:
- # First hop --> add Link from Root to 1
- if i == 1:
- link_rtt = c.part_rtts[i]
- self.model.add_link(self.router, c.path[i-1], link_rtt)
- # Handle i -- (i+1)
- if i+1 in c.part_rtts:
- link_rtt = c.part_rtts[i+1] - c.part_rtts[i]
- if link_rtt > 0:
- plog("INFO", "Computed link-RTT: " + str(link_rtt))
- # Save to NetworkModel
- self.model.add_link(c.path[i-1], c.path[i], link_rtt)
- else:
- plog("WARN", "Negative link-RTT: " + str(link_rtt))
- # Handle (n-1) -- n
- elif None in c.part_rtts:
- # We have a total value
- link_rtt = c.part_rtts[None] - c.part_rtts[i]
- if link_rtt > 0:
- plog("INFO", "Computed link-RTT: " + str(link_rtt))
- # Save to NetworkModel
- self.model.add_link(c.path[i-1], c.path[i], link_rtt)
- else:
- plog("WARN", "Negative link-RTT: " + str(link_rtt))
+ self.compute_link_RTTs(c)
plog("DEBUG", "Computation of link-RTTs took us " + str(time.time()-start) + " seconds")
# Print out the model
self.model.print_graph()
self.model.find_circuits()
+ def compute_link_RTTs(self, c):
+ """ Check if we can compute RTTs of single links for circuit c and store these in the model """
+ # Get the length
+ path_len = len(c.path)
+ # Go through the path
+ for i in xrange(1,path_len):
+ if i in c.part_rtts:
+ # First hop --> add Link from Root to 1
+ if i == 1:
+ link_rtt = c.part_rtts[i]
+ self.model.add_link(self.router, c.path[i-1], link_rtt)
+ # Handle i -- (i+1)
+ if i+1 in c.part_rtts:
+ link_rtt = c.part_rtts[i+1] - c.part_rtts[i]
+ if link_rtt > 0:
+ plog("INFO", "Computed link-RTT " + str(i) + ": " + str(link_rtt))
+ # Save to NetworkModel
+ self.model.add_link(c.path[i-1], c.path[i], link_rtt)
+ else:
+ plog("WARN", "Negative link-RTT " + str(i) + ": " + str(link_rtt))
+ # Handle (n-1) -- n
+ elif None in c.part_rtts:
+ # We have a total value
+ link_rtt = c.part_rtts[None] - c.part_rtts[i]
+ if link_rtt > 0:
+ plog("INFO", "Computed link-RTT " + str(i) + ": " + str(link_rtt))
+ # Save to NetworkModel
+ self.model.add_link(c.path[i-1], c.path[i], link_rtt)
+ else:
+ plog("WARN", "Negative link-RTT " + str(i) + ": " + str(link_rtt))
+
def attach_ping(self, stream):
""" Attach a ping stream to its circuit """
if self.ping_queue.empty():
# This round has finished
- plog("INFO", "Queue is empty --> no circuits to test, closing stream " + str(stream.strm_id))
+ plog("INFO", "Queue is empty --> round has finished, closing stream " + str(stream.strm_id))
self.close_stream(stream.strm_id, 5)
# Call the rest from here?
self.print_circuits()
if self.partial_circs:
- self.compute_link_RTTs()
+ # Print out the model
+ self.model.print_graph()
+ self.model.find_circuits()
+ # Enqueue again all circs
self.enqueue_pings()
+ # Write circ_stats to a file every round
+ self.circ_filehandler.write(self.circ_stats.to_string())
else:
# Get the info and extract
@@ -759,15 +822,15 @@
# Save stats to a file for generating plots etc.
if self.partial_circs:
if self.circuits[s.circ_id].rtt_created:
- # TODO: Do we have to check if this circuit is _really_ new?
- self.filehandler.write(str(self.circuits[s.circ_id].stats.mean))
+ # TODO: Do we want to check if this circuit is _really_ new?
+ self.latency_filehandler.append(str(self.circuits[s.circ_id].stats.mean))
else:
- self.filehandler.write(str(self.circuits[s.circ_id].stats.mean))
+ self.latency_filehandler.append(str(self.circuits[s.circ_id].stats.mean))
# Close the circuit
self.close_circuit(s.circ_id)
- # Close if slow-max is reached on mean RTT
- if self.circuits[s.circ_id].stats.mean >= slow:
+ # Close if slow-max is reached on current RTTs
+ if self.circuits[s.circ_id].current_rtt >= slow:
self.circuits[s.circ_id].slowness_counter += 1
if self.circuits[s.circ_id].slowness_counter >= slowness_limit and not self.circuits[s.circ_id].closed:
plog("DEBUG", "Slow-max (" + str(slowness_limit) + ") is reached --> closing circuit " + str(s.circ_id))
@@ -775,6 +838,9 @@
# Resort only if this is for the complete circ
self.refresh_sorted_list()
+ if self.partial_circs == True:
+ self.compute_link_RTTs(self.circuits[s.circ_id])
+
def stream_status_event(self, s):
""" Separate pings from regular streams directly """
if not (s.target_host == ping_dummy_host and s.target_port == ping_dummy_port):
@@ -808,6 +874,8 @@
if self.circuits[s.circ_id].timeout_counter >= timeout_limit and not self.circuits[s.circ_id].closed:
# Close the circuit
plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
+ # Increase total counter for timeouts
+ self.circ_stats.timeouts += 1
self.close_circuit(s.circ_id)
# Set RTT for this circ to None
self.circuits[s.circ_id].current_rtt = None