[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r12279: Latest revision of op-addon.py containing additional methods (in torflow/trunk: . TorCtl)
Author: renner
Date: 2007-10-30 06:41:28 -0400 (Tue, 30 Oct 2007)
New Revision: 12279
Added:
torflow/trunk/stream-server.pl
Modified:
torflow/trunk/TorCtl/PathSupport.py
torflow/trunk/bw-informer.py
torflow/trunk/op-addon.py
torflow/trunk/pathrc.example
Log:
Latest revision of op-addon.py containing additional methods that were used
to perform evaluations (involving stream-server.pl) + new version of
bw-informer.py
Modified: torflow/trunk/TorCtl/PathSupport.py
===================================================================
--- torflow/trunk/TorCtl/PathSupport.py 2007-10-30 08:59:34 UTC (rev 12278)
+++ torflow/trunk/TorCtl/PathSupport.py 2007-10-30 10:41:28 UTC (rev 12279)
@@ -456,7 +456,9 @@
self.total_exit_bw += r.bw
bw_per_hop = (1.0*self.total_bw)/self.pathlen
- ratio = self.total_exit_bw/float(self.total_bw)
+ if self.total_bw > 0:
+ ratio = self.total_exit_bw/float(self.total_bw)
+ else: ratio = 0
plog("DEBUG", "E = " + str(self.total_exit_bw) +
", T = " + str(self.total_bw) +
", ratio = " + str(ratio) +
@@ -467,7 +469,9 @@
# add a ConserveExitsRestriction?
self.weight = 0
else:
- self.weight = ((self.total_exit_bw-bw_per_hop)/self.total_exit_bw)
+ if self.total_exit_bw > 0:
+ self.weight = ((self.total_exit_bw-bw_per_hop)/self.total_exit_bw)
+ else: self.weight = 0
plog("DEBUG", "The exit-weight is: " + str(self.weight))
def next_r(self):
Modified: torflow/trunk/bw-informer.py
===================================================================
--- torflow/trunk/bw-informer.py 2007-10-30 08:59:34 UTC (rev 12278)
+++ torflow/trunk/bw-informer.py 2007-10-30 10:41:28 UTC (rev 12279)
@@ -1,65 +1,71 @@
#!/usr/bin/python
+
"""
RWTH Aachen University, Informatik IV
Copyright (C) 2007 Johannes Renner
Contact: renner@xxxxxxxxxxxxxxxxxxxxxxxxxxxx
"""
-# Addon for Onion Routers (prototype-v0.0-alpha):
-# Provides bandwidth-data about single TLS-connections as well as
-# total bandwidth-data about the router for requesting clients.
-# This software works in a passive way: It does _not_ control the
-# Tor process, e.g. close connections, _only_ records traffic.
-# TODO: Howto make the document be served by Tor via http?
-# TODO: Check nicknames for uniqueness
+# Addon for onion routers:
+# Shall provide information about available bandwidth on single
+# TLS-connections as well as globally available bandwidth for
+# requesting clients in an anonymity-preserving way (?).
+# TODO: Make the document be served by Tor via HTTP
+
+import re
import sys
import sched
import time
import socket
import atexit
import threading
+import traceback
from TorCtl import *
from TorCtl.TorUtil import *
-# Move these to config file:
+# Set the version here
+VERSION = "0.0-alpha"
+
+# Move these to a config file:
# Tor host and port
control_host = "127.0.0.1"
control_port = 9051
# Listen host and port
-listen_host = "127.0.0.1"
+listen_host = "137.226.12.177"
listen_port = 9053
# Duration of single measuring interval (seconds)
-interval = 30
-# No of inactive rounds to decrease max until
-# we set it to zero, this leads to 1 hour
-inactive_limit = 3600/interval
+interval = 20
+
# Alpha for computing new max values, let max
# decrease slowly if no traffic or not topped
-alpha = 0.01
-# Minimum 'available' bandwidth (bytes/sec)
+alpha = .9999
+# Minimum 'available' bandwidth (byte/sec)
# to show up on the document
-available_min = 10000
+available_min = 0
# Global variable marks the start of an interval
start = time.time()
-# variable that contains the status-document
+# Overall start time
+total_start = time.time()
+
+# Variable that contains the status-document
bw_status = "no status document available yet :(\r\n"
# Dictionary that contains all stats
stats = {}
stats_lock = threading.Lock()
+# Dicts that contain mappings
+key_to_name = {}
+name_to_key = {}
-#key_to_name = {}
-#name_to_key = {}
-
-# We use one class for recording global stats and link stats
+# We use the same class for recording global stats and link stats
class LinkBandwidthStats(TorCtl.Router):
def __init__(self, r=None):
if r:
- self.__dict__ = r.dict
+ self.__dict__ = r.__dict__
else:
self.down = 0
# Total counters
@@ -68,15 +74,14 @@
self.tot_ncircs = 0
self.tot_read = 0
self.tot_written = 0
+ self.tot_bytes = 0 # total read + written
# Interval stats
self.int_read = 0 # count bytes read & written ..
self.int_written = 0 # in the last interval
self.int_bytes = 0 # sum of both, gets set on update()
- self.avg_throughput = 0.0 # avg throughput for the last interval
+ self.curr_throughput = 0.0 # avg throughput for the last interval
self.max_throughput = 0.0 # throughput max-value
self.available = 0.0 # max - avg
- self.inactive_count = 0 # counter for inactive rounds
- self.inactive = False # inactive flag
def read(self, bytes_read):
self.tot_read += bytes_read
@@ -95,50 +100,35 @@
# Most important method here
def update(self, elapsed):
# Compute the interval-bytes read+written
- self.int_bytes = self.int_read + self.int_written
- # If nothing read or written this round
- if self.int_bytes == 0:
- # Increase counter
- self.inactive_count += 1
- if self.inactive_count >= inactive_limit:
- # Limit reached: set max to 0 to get this deleted from stats
- plog("DEBUG", "Inactive limit reached --> setting max to 0: " + self.nickname)
- self.max_throughput = 0
- self.inactive = True
- # Not needed since inactive --> del
- #reset_interval_counters()
- return
- else:
- # We have read or written something
- self.inactive_count = 0
+ self.int_bytes = self.int_read + self.int_written
+ # Compute total bytes
+ self.tot_bytes = self.tot_read + self.tot_written
# Compute avg interval throughput
- self.avg_throughput = self.int_bytes/elapsed
+ self.curr_throughput = self.int_bytes/elapsed
# Max handling ..
- if self.avg_throughput > self.max_throughput:
+ if self.curr_throughput > self.max_throughput:
# We have a new max!
- self.max_throughput = self.avg_throughput
- plog("DEBUG", self.nickname + " reached new max: " + str(self.max_throughput) + " bytes/sec")
+ self.max_throughput = self.curr_throughput
+ plog("DEBUG", self.nickname + " reached new max: " +
+ str(self.max_throughput) + " byte/sec")
else:
# Saving old max for debugging only
- #old_max = self.max_throughput
+ old_max = self.max_throughput
# Decrease the max-value using alpha-formula
- self.max_throughput = max(self.avg_throughput, (self.max_throughput*(1-alpha) + self.avg_throughput*alpha))
- #plog("DEBUG", self.nickname + ": max decreased from " + str(old_max) + " to " + str(self.max_throughput))
+ self.max_throughput = max(self.curr_throughput, (self.max_throughput*alpha + self.curr_throughput*(1-alpha)))
+ #plog("DEBUG", self.nickname + ": max decreased from "
+ # + str(old_max) + " to " + str(self.max_throughput))
- # Also set inactive if nothing read/written and max decreased to zero
- if self.int_bytes == 0 and self.max_throughput == 0:
- self.inactive = True
# Compute the difference as 'available'
- # TODO: Do it in the clients, or deliver ONLY this value??
- self.available = self.max_throughput - self.avg_throughput
+ # TODO: Add the frac part from the approaches
+ self.available = self.max_throughput - self.curr_throughput
# Reset the counters
self.reset_interval_counters()
# Special instance of LinkBandwidthStats for recording of bw-events
global_stats = LinkBandwidthStats()
-# TODO: Get my hostname/nickname?
-global_stats.nickname = "This Router"
+global_stats.nickname = "Global stats"
# We need an EventHandler
# extend from TorCtl.EventHandler
@@ -155,24 +145,24 @@
if event.written: global_stats.written(event.written)
# Method to handle ORCONN-events
- def or_conn_status_event(self, o):
- # XXX: Count all routers as one?
+ def or_conn_status_event(self, o):
+ # Count all clients as one:
# If o.endpoint is an idhash
- #if re.search(r"^\$", o.endpoint):
- #if o.endpoint not in key_to_name:
- #o.endpoint = "AllClients:HASH"
- #else: o.endpoint = key_to_name[o.endpoint]
+ if re.search(r"^\$", o.endpoint):
+ if o.endpoint not in key_to_name:
+ o.endpoint = "AllClients:HASH"
+ else: o.endpoint = key_to_name[o.endpoint]
# If it is no idhash and not in name_to_key
- #elif o.endpoint not in name_to_key:
- #plog("DEBUG", "IP? " + o.endpoint)
- #o.endpoint = "AllClients:IP"
+ elif o.endpoint not in name_to_key:
+ plog("DEBUG", "IP? " + o.endpoint)
+ o.endpoint = "AllClients:IP"
# If NEW, LAUNCHED or CONNECTED
if o.status == "NEW" or o.status == "LAUNCHED" or o.status == "CONNECTED":
plog("NOTICE", "Connection to " + o.endpoint + " is now " + o.status)
# If status is READ or WRITE
- if o.status == "READ" or o.status == "WRITE":
+ elif o.status == "READ" or o.status == "WRITE":
#plog("DEBUG", o.endpoint + ", read: " + str(o.read_bytes) + " wrote: " + str(o.wrote_bytes))
stats_lock.acquire()
# If not in stats: add!
@@ -181,14 +171,12 @@
stats[o.endpoint].nickname = o.endpoint
plog("NOTICE", "+ Added " + o.endpoint + " to the stats")
# Add number of bytes to total and interval
- if o.read_bytes:
- stats[o.endpoint].read(o.read_bytes)
- if o.wrote_bytes:
- stats[o.endpoint].written(o.wrote_bytes)
+ if o.read_bytes: stats[o.endpoint].read(o.read_bytes)
+ if o.wrote_bytes: stats[o.endpoint].written(o.wrote_bytes)
stats_lock.release()
# If CLOSED or FAILED
- if o.status == "CLOSED" or o.status == "FAILED":
+ elif o.status == "CLOSED" or o.status == "FAILED":
# Don't record reasons!
stats_lock.acquire()
if o.endpoint not in stats:
@@ -206,21 +194,28 @@
#if o.read_bytes: stats[o.endpoint].tot_read += o.read_bytes
#if o.wrote_bytes: stats[o.endpoint].tot_wrote += o.wrote_bytes
stats_lock.release()
- else: return
- # This is only for constructing debug output
- if o.age: age = "AGE="+str(o.age)
- else: age = ""
- if o.read_bytes: read = "READ="+str(o.read_bytes)
- else: read = ""
- if o.wrote_bytes: wrote = "WRITTEN="+str(o.wrote_bytes)
- else: wrote = ""
- if o.reason: reason = "REASON="+o.reason
- else: reason = ""
- if o.ncircs: ncircs = "NCIRCS="+str(o.ncircs)
- else: ncircs = ""
- plog("DEBUG", " ".join((o.event_name, o.endpoint, o.status, age, read, wrote, reason, ncircs)))
+ # This is only for constructing debug output
+ if o.age: age = "AGE="+str(o.age)
+ else: age = ""
+ if o.read_bytes: read = "READ="+str(o.read_bytes)
+ else: read = ""
+ if o.wrote_bytes: wrote = "WRITTEN="+str(o.wrote_bytes)
+ else: wrote = ""
+ if o.reason: reason = "REASON="+o.reason
+ else: reason = ""
+ if o.ncircs: ncircs = "NCIRCS="+str(o.ncircs)
+ else: ncircs = ""
+ plog("DEBUG", " ".join((o.event_name, o.endpoint, o.status, age, read, wrote, reason, ncircs)))
+ # NS-EventHandler methods
+ def ns_event(self, n):
+ read_routers(self.c, n.nslist)
+
+ def new_desc_event(self, d):
+ for i in d.idlist: # Is this too slow?
+ read_routers(self.c, self.c.get_network_status("id/"+i))
+
# Sort a list by a specified key
def sort_list(list, key):
list.sort(lambda x,y: cmp(key(y), key(x))) # Python < 2.4 hack
@@ -231,6 +226,31 @@
f.write(bw_status)
f.close()
+# Read the routers
+def read_routers(c, nslist):
+ global key_to_name, name_to_key
+ bad_key = 0
+ stats_lock.acquire()
+ for ns in nslist:
+ try:
+ key_to_name[ns.idhex] = ns.nickname
+ name_to_key[ns.nickname] = ns.idhex
+ r = LinkBandwidthStats(c.get_router(ns))
+ if ns.nickname in stats:
+ if stats[ns.nickname].idhex != r.idhex:
+ plog("NOTICE", "Router "+r.nickname+" has multiple keys: "
+ +stats[ns.nickname].idhex+" and "+r.idhex)
+ stats[r.nickname] = r # XXX: We get names only from ORCONN :(
+ except TorCtl.ErrorReply:
+ bad_key += 1
+ if "Running" in ns.flags:
+ plog("INFO", "Running router "+ns.nickname+"="+ns.idhex+" has no descriptor")
+ pass
+ except:
+ traceback.print_exception(*sys.exc_info())
+ continue
+ stats_lock.release()
+
# Update stats and reset every router's counters
# (Requires stats_lock.acquire())
def update_stats(elapsed):
@@ -241,15 +261,12 @@
for l in links:
# Update & reset stats
l.update(elapsed)
- # If inactive --> delete
- if l.inactive:
- del stats[l.nickname]
- plog("NOTICE", "- No traffic on link to " + l.nickname + " --> deleted from stats")
# Create the new status document
# (Requires stats_lock.acquire())
-# TODO: Compress the data:
+# TODO: Somehow compress the data:
# - if available==max --> only deliver max?
+# - only deliver available?
# - leave out links with available==0 ?
# - No, avail==0 means new max, but not nothing available!
# - clustering/classification?
@@ -258,21 +275,20 @@
# Fill in global_stats
new_status += str(global_stats.available) + " "
new_status += str(global_stats.max_throughput) + " "
- new_status += str(global_stats.avg_throughput) + "\r\n"
- new_status += "--------------------\r\n"
- # TODO: Better sort for available or max?
+ new_status += str(global_stats.curr_throughput) + "\r\n"
+ # Sort the document for available
key = lambda x: x.available
- links_sorted = sort_list(stats.values(), key)
+ links_sorted = sort_list(stats.values(), key)
for l in links_sorted:
# Cutoff at available_min
- if key(l) >= available_min:
- new_status += l.nickname + " " + str(key(l)) + " "
- new_status += str(l.max_throughput) + " " + str(l.avg_throughput) + "\r\n"
+ if key(l) >= available_min and l.nickname != "AllClients:HASH":
+ new_status += l.nickname + " " + str(key(l)) + " "
+ new_status += str(l.max_throughput) + " " + str(l.curr_throughput) + "\r\n"
# Critical: Exchange global bw_status document
global bw_status
bw_status = new_status
-# This is the method where the main work gets done
+# This is the method where the main work is done
# Schedule the call every 'interval' seconds
def do_work(s):
global start
@@ -289,8 +305,8 @@
# Release lock
stats_lock.release()
- # Write to file, TODO: Write to Tor-dir, find out!
- write_file(file("./data/bw-document", "w"))
+ # Write to file, TODO: Write to Tor-dir: data/status/
+ write_file(file("./data/bw-informer/bw-document", "w"))
# Some debugging
plog("INFO", "Created new document for the last interval (" + str(elapsed) + ") seconds\n") # + bw_status)
# Reschedule
@@ -298,12 +314,14 @@
s.enter(interval, 1, do_work, (s,))
# Run a scheduler that does work every interval
-def start_sched():
- #global key_to_name, name_to_key
- #nslist = c.get_network_status()
- #read_routers(c, nslist)
+def start_sched(c):
+ # Get the network status
+ nslist = c.get_network_status()
+ read_routers(c, nslist)
+ # Setup scheduler
s = sched.scheduler(time.time, time.sleep)
start = time.time()
+ total_start = time.time()
s.enter(interval, 1, do_work, (s,))
try:
s.run()
@@ -338,7 +356,7 @@
# Main function
def main(argv):
- plog("INFO", "This is bandwidth-informer v0.0-alpha")
+ plog("INFO", "bw-informer v" + VERSION)
# Create connection to Tor
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((control_host, control_port))
@@ -348,19 +366,22 @@
c.set_event_handler(LinkHandler(c))
# Close connection on exit
atexit.register(cleanup, *(c,))
- # Start the thread
+ # Start the connection thread
c.launch_thread()
c.authenticate()
- # Only listen to ORCONN
- c.set_events([TorCtl.EVENT_TYPE.ORCONN, TorCtl.EVENT_TYPE.BW], True)
+ # Listen to some events
+ c.set_events([TorCtl.EVENT_TYPE.ORCONN,
+ TorCtl.EVENT_TYPE.BW,
+ TorCtl.EVENT_TYPE.NS,
+ TorCtl.EVENT_TYPE.NEWDESC], True)
# TODO: Set extra-info for descriptor here
# Start server thread
thr = threading.Thread(None, lambda: start_server())
- thr.setName("Server")
+ thr.setName("BW-Server")
thr.setDaemon(1)
thr.start()
- # Start the monitor here
- start_sched()
+ # Start the actual monitor here
+ start_sched(c)
# Program entry point
if __name__ == '__main__':
Modified: torflow/trunk/op-addon.py
===================================================================
--- torflow/trunk/op-addon.py 2007-10-30 08:59:34 UTC (rev 12278)
+++ torflow/trunk/op-addon.py 2007-10-30 10:41:28 UTC (rev 12279)
@@ -24,15 +24,15 @@
## CONFIGURATION ##############################################################
# Set the version
-VERSION = "0.0.01-alpha"
-# Path to data-directory
+VERSION = "0.0.10"
+# Path to the data directory
DATADIR = "data/op-addon/"
# Our IP-address
IP = None
# Simulation modus
SIMULATE = False
-# Try to get the config-file from the commandline
+# Try to get the config-file from the commandline first
if len(sys.argv) == 1:
CONFIG_FILE = "pathrc.example"
elif len(sys.argv) == 2:
@@ -60,16 +60,16 @@
plog("ERROR", "Config file '" + CONFIG_FILE + "' does not exist, exiting.")
sys.exit(0)
-# Configuration sections
+# Different configuration sections
HOST_PORT = "HOST_PORT"
CIRC_MANAGEMENT = "CIRC_MANAGEMENT"
NODE_SELECTION = "NODE_SELECTION"
GEOIP = "GEOIP"
-TESTING = "TESTING"
+EVALUATE = "EVALUATE"
RTT = "RTT"
MODEL = "MODEL"
-# Measure the circuits
+# Measure RTTs of circuits
ping_circs = config.getboolean(RTT, "ping_circs")
network_model = False
if ping_circs:
@@ -86,12 +86,12 @@
# Close a circ after n timeouts
timeout_limit = config.getint(RTT, "timeout_limit")
- # Set to True if we want to measure partial circuits
- # This also enables circuit creation from the model
+ # Set to True to measure RTTs of partial circuits,
+ # also enables circuit creation from the model
network_model = config.getboolean(MODEL, "network_model")
if network_model:
import networkx
- # RTT-threshhold when creating circs from the model
+ # RTT-threshold when creating circs from the model
max_rtt = config.getfloat(MODEL, "max_rtt")
# Minimum number of proposals to choose from
min_proposals = config.getint(MODEL, "min_proposals")
@@ -102,10 +102,11 @@
# Testing mode: Collect latencies of circuits and links in the
# network. Close circuits after num_xx_tests measures and involve
# a FileHandler to write data to a file
- TESTING_MODE = config.getboolean(TESTING, "testing_mode")
- if TESTING_MODE:
- num_rtt_tests = config.getint(TESTING, "num_rtt_tests")
- num_records = config.getint(TESTING, "num_records")
+ EVAL_MODE = config.getboolean(EVALUATE, "evaluate")
+ if EVAL_MODE:
+ num_rtt_tests = config.getint(EVALUATE, "num_rtt_tests")
+ num_bw_tests = config.getint(EVALUATE, "num_bw_tests")
+ num_records = config.getint(EVALUATE, "num_records")
def get_geoip_config():
""" Read the geoip-configuration from the config-file """
@@ -161,7 +162,7 @@
def build_circuit_from_path(self, path):
""" Build circuit using a given path (= router-objects),
- used to build circs from NetworkModel """
+ used to build circuits from a NetworkModel """
circ = Circuit()
circ.path = path
circ.exit = path[len(path)-1]
@@ -258,6 +259,7 @@
def get_line_count(self):
self.filehandle = open(self.filename)
lines = self.filehandle.readlines()
+ # Close handle?
return len(lines)
## Circuit & Stream ###########################################################
@@ -267,30 +269,30 @@
def __init__(self):
PathSupport.Circuit.__init__(self)
# RTT stuff
- self.part_rtts = {} # Dict of partial rtts, pathlen 3: 1-2-None
- self.current_rtt = None # Double (sec): current value
- self.stats = Stats() # Stats about total RTT contains history
+ self.part_rtts = {} # Dict of partial RTTs, pathlen 3: 1-2-None
+ self.current_rtt = None # Double (sec): current ranking of this circ
+ self.stats = Stats() # Stats about total RTT, contains the history
# Counters and flags
self.age = 0 # Age in rounds
self.timeout_counter = 0 # Timeout limit
- self.rtt_created = False # Created from the model
+ self.rtt_created = False # Created from the model
# XXX: BW stuff
self.bw = 0
self.bw_tested = False
def add_rtt(self, rtt):
""" Add a new value and refresh stats and current """
- # Set current
+ # Set current circuit-ranking
if self.current_rtt == None:
self.current_rtt = rtt
else:
- # Weight the current value with the last
+ # Weight the current value with the previous
self.current_rtt = (self.current_rtt * 0.5) + (rtt * 0.5)
plog("DEBUG", "Computing new current RTT from " + str(rtt) + " to " +
str(self.current_rtt))
- # Add new RTT to the stats
+ # Add a new RTT to the stats
self.stats.add_value(rtt)
- # Increase age
+ # Increase the age
self.age += 1
def to_string(self):
@@ -316,7 +318,7 @@
## NetworkModel ###############################################################
-class LinkInfo:
+class TorLink:
""" This class contains infos about a link: source, destination, RTT
plus: rtt_history, methods to compute stats, etc. """
def __init__(self, src, dest, rtt=0):
@@ -330,7 +332,8 @@
def add_rtt(self, rtt):
# Compute new current value from the last
- if self.current_rtt == None: self.current_rtt = rtt
+ if self.current_rtt == None:
+ self.current_rtt = rtt
else:
self.current_rtt = (self.current_rtt * 0.5) + (rtt * 0.5)
plog("DEBUG", "Computing new current RTT from " + str(rtt) + " to " +
@@ -339,23 +342,23 @@
class PathProposal:
""" Instances of this class are path-proposals found in the model """
def __init__(self, links, path):
- # This is a list of LinkInfo objects
+ # This is a list of TorLink objects
self.links = links
- # Cut off ROOT here
+ # Cut off the ROOT here
self.path = path[1:len(path)]
# Compute the expected RTT
self.rtt = reduce(lambda x,y: x + y.current_rtt, self.links, 0.0)
self.rtt_score = 0 # RTT score
self.bw_score = 0 # BW score
- self.min_bw = 0 # Minimum BW of routers in self.path
- self.ranking_index = None # Index computed from BW and RTT
+ self.min_bw = 0 # Minimum bw of routers in path
+ self.ranking_index = None # Index computed from bw and RTT
def to_string(self):
""" Create a string for printing out information """
s = ""
for l in self.links:
s += str(l.src) + "--" + l.dest + " (" + str(l.current_rtt) + ") " + ", "
- return s + "--> " + str(self.rtt) + " sec"
+ return s + "--> " + str(self.rtt) + " sec"
class NetworkModel:
""" This class is used to record measured RTTs of single links in a model
@@ -375,7 +378,7 @@
self.up_to_date = False
except:
plog("INFO", "Could not load a model, creating a new one ..")
- self.graph = networkx.XGraph(name="Explored Tor Subnet")
+ self.graph = networkx.XGraph(name="Tor Subnet")
self.graph.add_node(None)
self.up_to_date = True
self.print_info()
@@ -385,18 +388,18 @@
""" Write the graph to a binary file """
start = time.time()
networkx.write_gpickle(self.graph, self.pickle_path)
- plog("INFO", "Saved network-model to '" + self.pickle_path +
+ plog("INFO", "Stored Tor-graph to '" + self.pickle_path +
"' in " + str(time.time()-start) + " sec")
def load_graph(self):
""" Load a graph from a binary file and return it """
graph = networkx.read_gpickle(self.pickle_path)
- plog("INFO", "Loaded graph from '" + self.pickle_path + "'")
+ plog("INFO", "Loaded Tor-graph from '" + self.pickle_path + "'")
return graph
def add_link(self, src, dest, rtt):
- """ Add link to the graph given src, dest (router-ids) & RTT (LinkInfo) """
- self.graph.add_edge(src, dest, LinkInfo(src, dest, rtt))
+ """ Add link to the graph given src, dest (router-ids) & RTT (TorLink) """
+ self.graph.add_edge(src, dest, TorLink(src, dest, rtt))
def add_circuit(self, c):
""" Check if we can compute RTTs of single links for a circuit
@@ -615,7 +618,7 @@
self.circ_stats = CircuitBuildingStats() # record setup-durations
self.stats_logger = FileHandler(DATADIR + "circ-setup-stats")
self.setup_logger = None # FileHandler(DATADIR + "circ-setup-durations")
- if TESTING_MODE:
+ if EVAL_MODE:
self.testing_logger = FileHandler(DATADIR + "circ-data")
self.bw_queue = Queue.Queue() # circ_ids to bw-test
# Queue containing circs to be tested
@@ -671,7 +674,19 @@
# TODO: Check if there are any circs, else set 'frequency' to 10?
circs = self.circuits.values()
for c in circs:
- self.enqueue_circ(c)
+ # XXX: First test BW, then enqueue for RTTs
+ if EVAL_MODE and num_bw_tests > 0:
+ if self.model:
+ if c.rtt_created and c.bw_tested:
+ self.enqueue_circ(c)
+ elif not c.rtt_created:
+ self.enqueue_circ(c)
+ elif not c.bw_tested:
+ pass
+ else:
+ self.enqueue_circ(c)
+ else:
+ self.enqueue_circ(c)
def enqueue_circ(self, c):
""" Enqueue a circuit for measuring RTT """
@@ -743,8 +758,8 @@
plog("DEBUG", "Added RTT to history: " +
str(self.circuits[s.circ_id].stats.values))
- # TESTING_MODE: close if num_rtt_tests is reached
- if TESTING_MODE:
+ # EVAL_MODE: close if num_rtt_tests is reached
+ if EVAL_MODE:
if self.circuits[s.circ_id].age == num_rtt_tests:
plog("DEBUG", "Closing circ " + str(s.circ_id) +
": num_rtt_tests is reached")
@@ -763,10 +778,69 @@
# Add the links of this circuit to the model
self.model.add_circuit(self.circuits[s.circ_id])
+ def handle_bw_test(self, s):
+ """ Handle special streams to measure the bandwidth of circs """
+ output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id),
+ s.target_host, str(s.target_port)]
+ if s.reason: output.append("REASON=" + s.reason)
+ if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
+ plog("DEBUG", " ".join(output))
+ # NEW
+ if s.status == "NEW":
+ stream = Stream(s.strm_id, s.target_host, s.target_port, s.status)
+ self.streams[s.strm_id] = stream
+ # Set next circ_id to stream
+ stream.circ = self.bw_queue.get()
+ try:
+ if stream.circ in self.circuits:
+ circ = self.circuits[stream.circ]
+ if circ.built and not circ.closed:
+ self.c.attach_stream(stream.strm_id, circ.circ_id)
+ else:
+ plog("WARN", "Circuit not built or closed")
+ self.close_stream(s.strm_id, 5)
+ else:
+ # Go to next test if circuit is gone or we get an ErrorReply
+ plog("WARN", "Circuit " + str(circ_id) +
+ " does not exist anymore --> closing stream")
+ # Close stream, XXX: Reason?
+ self.close_stream(s.strm_id, 5)
+ except TorCtl.ErrorReply, e:
+ plog("WARN", "Error attaching stream " + str(stream.strm_id) +
+ " :" + str(e.args))
+ self.close_stream(s.strm_id, 5)
+ # SUCCEEDED
+ if s.status == "SUCCEEDED":
+ self.streams[s.strm_id].attached_at = s.arrived_at
+ # CLOSED with reason DONE
+ if s.status == "CLOSED" and s.reason == "DONE":
+ stream = self.streams[s.strm_id]
+ # Since bytes are counted from events, use the timestamp
+ # of the last stream_bw event for computing the lifespan
+ #lifespan = stream.lifespan(s.arrived_at)
+ lifespan = stream.lifespan(stream.bw_timestamp)
+ plog("INFO", "Lifespan is " + str(lifespan))
+ # Compute bandwidth
+ total_bytes = stream.bytes_read + stream.bytes_written
+ plog("DEBUG", "Total number of bytes (read+written) is " + str(total_bytes))
+ bw = total_bytes/float(lifespan)
+ plog("INFO", "Got bandwidth: " + str(bw))
+ self.circuits[s.circ_id].bw = bw
+ self.circuits[s.circ_id].bw_tested = True
+ # DETACHED with remote reason EXITPOLICY or TIMEOUT
+ if s.status == "DETACHED":
+ if s.remote_reason in ["EXITPOLICY","TIMEOUT"]:
+ # Close circuit and stream
+ self.close_stream(s.strm_id, 5)
+ self.close_circuit(s.circ_id)
+
def stream_status_event(self, s):
""" Separate pings from regular streams directly """
if not (s.target_host == ping_dummy_host and
s.target_port == ping_dummy_port):
+ # XXX: Catch bandwidth-streams
+ if s.target_host == IP and s.target_port == 8041:
+ return self.handle_bw_test(s)
# TODO: Handle echelon here?
# - perform DNS request (or use REMAP?)
@@ -774,8 +848,9 @@
# - check if there is already a circuit with exit node
# in destination country
- # This is no ping, call the other method
- return PathSupport.StreamHandler.stream_status_event(self, s)
+ # This is NO test: call the underlying method to attach
+ else:
+ return PathSupport.StreamHandler.stream_status_event(self, s)
# Construct debugging output
output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id),
@@ -817,19 +892,19 @@
# Close the stream
self.close_stream(s.strm_id, 5)
- # CLOSED + END is also ping, some routers send it when measuring
- # latency to a single hop, better measure on FAILED?
+ # CLOSED + END is also a ping, some routers send it when
+ # measuring RTT to a single hop, better measure on FAILED?
elif s.status == "CLOSED":
if s.reason == "END":
# Only record
self.record_ping(s)
def circ_status_event(self, c):
- """ Override to record statistics on circuit-setups and -failures """
+ """ Override this to record statistics on circuit-setups and -failures """
if c.circ_id not in self.circuits:
return PathSupport.CircuitHandler.circ_status_event(self, c)
- # Catch FAILED/CLOSED now since circ will be removed
+ # Catch FAILED/CLOSED now: circ will be removed
elif c.status == "FAILED" or c.status == "CLOSED":
circ = self.circuits[c.circ_id]
# Setup a message for logging
@@ -878,7 +953,33 @@
self.circ_stats.add_value(circ.setup_duration)
self.stats_logger.write(self.circ_stats.to_string())
self.refresh_sorted_list()
+
+ # XXX: Initialize a bw-test here
+ if EVAL_MODE and num_bw_tests > 0:
+ if self.model:
+ # Only test bandwidth on rtt_created circs
+ if circ.rtt_created:
+ self.start_bw_test(c.circ_id)
+ else: self.start_bw_test(c.circ_id)
+
+ def start_bw_test(self, circ_id):
+ """ Perform a bandwidth-test on circuit with given circ_id """
+ plog("INFO", "Starting BW-test on circuit " + str(circ_id))
+ # Enqueue the circuit
+ self.bw_queue.put(circ_id)
+ # Start the stream-thread requesting 1000000 bytes (note: 512 KB = 524288)
+ bw_tester = BwTester(1000000)
+ bw_tester.setDaemon(True)
+ bw_tester.start()
+ def stream_bw_event(self, s):
+ """ Record the timestamp of the last stream_bw event to any stream """
+ if not s.strm_id in self.streams:
+ plog("WARN", "BW event for unknown stream id: "+str(s.strm_id))
+ else:
+ self.streams[s.strm_id].bw_timestamp = s.arrived_at
+ PathSupport.PathBuilder.stream_bw_event(self, s)
+
def build_circuit(self, host, port):
""" Override from CircuitHandler to support circuit-creation from model """
if self.model:
@@ -905,7 +1006,7 @@
plog("DEBUG", "Current number of proposals is "+
str(len(self.model.proposals)))
if len(self.model.proposals) >= min_proposals:
- # Give weights for single scores
+ # TODO: Set weights for single scores here!
self.model.update_ranking(1, 0)
# As long as there are enough
while len(self.model.proposals) >= min_proposals:
@@ -985,6 +1086,47 @@
# Close the socket if open
if s: s.close()
+## BW-Tester ##################################################################
+
+class BwTester(threading.Thread):
+ """ Thread that connects to our own IP and downloads a stream """
+ def __init__(self, bytes):
+ self.bytes = bytes # Amount of bytes to request
+ threading.Thread.__init__(self) # Call the thread-constructor
+
+ def run(self):
+ """ The run()-method """
+ self.run_test()
+
+ # No "try .. except .. finally .." in Python < 2.5 !
+ def run_test(self):
+ """ Create a connection to stream-server.pl using SOCKS4 """
+ s = None
+ try:
+ try:
+ s = socks.socksocket()
+ s.setproxy(socks.PROXY_TYPE_SOCKS4, socks_host, socks_port)
+ s.connect((IP, 8041))
+ plog("INFO", "Connected to " + IP)
+ # Request bytes
+ s.send(str(self.bytes) + "\n")
+ plog("INFO", "Sent request for " + str(self.bytes) + " bytes")
+ byte_counter = 0
+ while 1:
+ buffer = s.recv(4096)
+ if buffer:
+ #plog("INFO", "Received " + str(len(buffer)) + " bytes")
+ byte_counter += len(buffer)
+ if byte_counter >= self.bytes:
+ plog("INFO", "Received " + str(byte_counter) + " bytes in total")
+ s.send("close\n")
+ break
+ except socks.Socks4Error, e:
+ print("Got Exception: " + str(e))
+ finally:
+ # Close the socket if open
+ if s: s.close()
+
## End of Classes #############################################################
def connect():
@@ -1028,8 +1170,8 @@
TorCtl.EVENT_TYPE.NS,
TorCtl.EVENT_TYPE.NEWDESC], True)
# Set options: We attach streams now & build circuits
+ conn.set_option("__DisablePredictedCircuits", "1")
conn.set_option("__LeaveStreamsUnattached", "1")
- conn.set_option("__DisablePredictedCircuits", "1")
def startup(argv):
# Connect to Tor process
@@ -1040,14 +1182,14 @@
configure(conn)
# Get the size of the circuit-pool from config
num_circs = config.getint(CIRC_MANAGEMENT, "idle_circuits")
- # Set an EventHandler to the connection
+ # Set an EventHandler to the connection
if ping_circs:
if network_model:
handler = PingHandler(conn, __selmgr, num_circs,
GeoIPSupport.GeoIPRouter, True)
else:
handler = PingHandler(conn, __selmgr, num_circs,
- GeoIPSupport.GeoIPRouter)
+ GeoIPSupport.GeoIPRouter)
else:
# No pings, only a StreamHandler
handler = PathSupport.StreamHandler(conn, __selmgr, num_circs,
@@ -1075,7 +1217,7 @@
def simulate(n):
""" Simulate circuit creations """
- plog("INFO", "Starting simulation ..")
+ plog("INFO", "Running a simulation ..")
# Connect to Tor process
conn = connect()
setup_location(conn)
@@ -1083,13 +1225,13 @@
path_list = []
# Instantiate a PathBuilder
path_builder = PathSupport.PathBuilder(conn, __selmgr, GeoIPSupport.GeoIPRouter)
- plog("INFO", "Creating "+str(n)+" paths")
+ plog("INFO", "Generating "+str(n)+" paths")
if network_model:
model = NetworkModel(path_builder.routers)
model.set_target("255.255.255.255", 80, max_rtt)
model.generate_proposals()
- # Give weights for single scores (RTT, advertised BW)
- model.update_ranking(1, 1)
+ # TODO: Set weights for single scores (RTT, advertised BW) here!
+ model.update_ranking(1, 0)
while n > 0:
# Probabilistic selection
choice = model.weighted_selection(lambda x: x.ranking_index)
@@ -1108,7 +1250,7 @@
sys.exit(1)
def evaluate(path_list):
- """ Currently evaluates only lists of 3-hop paths """
+ """ Currently evaluates lists of 3-hop paths only """
import sets
entries = sets.Set()
middles = sets.Set()
@@ -1205,7 +1347,7 @@
return max_entropy
if __name__ == '__main__':
- plog("INFO", "OP-Addon v" + VERSION)
+ plog("INFO", "Starting OP-Addon v" + VERSION)
if SIMULATE:
if len(sys.argv) == 3:
simulate(10)
Modified: torflow/trunk/pathrc.example
===================================================================
--- torflow/trunk/pathrc.example 2007-10-30 08:59:34 UTC (rev 12278)
+++ torflow/trunk/pathrc.example 2007-10-30 10:41:28 UTC (rev 12279)
@@ -25,12 +25,12 @@
use_all_exits = yes
# UniformGenerator with optionally ordered exits,
-# (uniform = no) means bandwidth-weighted selection
+# (uniform = no) --> bandwidth-weighted selection
uniform = no
order_exits = no
# Make use of guard-nodes (yes|no) or a specific
-# exit node (nickname or IDHex)
+# exit node (nickname or IDHex) for all paths
use_guards = yes
#use_exit = xyz
@@ -42,8 +42,16 @@
# yes|no for unique|distinct countries,
# ! comment to don't care
-#unique_countries = yes
+unique_countries = yes
+# Maximum number of continent crossings: 0-n
+# ! comment out to enforce distinct continents
+# ! set >= pathlen to effectively disable this limit
+continent_crossings = 2
+# Maximum number of ocean crossings: 0-n
+# ! comment out to not restrict ocean crossings
+ocean_crossings = 1
+
# If echelon is set, OP-Addon will try to find an
# exit in the destination country of the current
# request (exit_country may be used as backup)
@@ -55,30 +63,26 @@
#middle_country = RU
#exit_country = US
-# Maximum number of continent-crossings: 0-n
-# ! comment out to enforce distinct continents
-# ! set >= pathlen to not care about
-continent_crossings = 2
-# Maximum number of ocean crossings: 0-n
-# ! comment out to don't care
-ocean_crossings = 0
-
# TODO: excludes = [".."]
-[TESTING]
+[EVALUATE]
-# Testing mode: Close every circuit after testing
+# Evaluation mode: close every circuit after measuring performance
# yes|no
-testing_mode = no
+evaluate = no
# Number of latency-tests per circuit (int: 0-n)
-num_rtt_tests = 5
+num_rtt_tests = 3
+# Number of bandwidth-tests per circuit (int:0 or 1)
+# Requires stream-server.pl listening on the same host
+num_bw_tests = 0
+
# Amount of circuits to test (int)
num_records = 300
[RTT]
-# Measure latencies of complete circuits
+# Ping the latencies of complete circuits
# yes|no
ping_circs = yes
@@ -86,34 +90,34 @@
socks_host = 127.0.0.1
socks_port = 9050
-# Host and port dummies to be used
+# Host- and port-dummies to be used
# for ping-connections
ping_dummy_host = 127.0.0.1
ping_dummy_port = 100
# Time interval to wait before triggering
-# pings and frequency in seconds (float)
+# pings and frequency of pings in seconds (float)
initial_interval = 10
frequency = 5
-# Close circ after n timeouts on measurings
-# Set to 0 to not close circs (int)
+# Close a circuit after n timeouts on measurements
+# Set to 0 to never close circs (int)
timeout_limit = 1
[MODEL]
-# Set to True if you want to measure latencies of single
-# links and enable circuit creation from the model
+# Set to 'yes' to measure latencies of single links
+# and enable circuit creation from the model
# yes|no
network_model = no
-# RTT-threshhold in seconds when creating circs (float):
-# 0: no threshhold, choose from all proposals
-max_rtt = 1
-# Minimum number of proposals to choose from (int)
-min_proposals = 100
# Min ratio of circs created with the backup-method,
# controls growing of the model (float in [0,1])
# 0: no growing
# 1: growing only
min_ratio = 0.5
+# RTT-threshold in seconds when creating circs (float):
+# 0: no threshold, choose from all proposals
+max_rtt = 0
+# Minimum number of proposals to choose from (int)
+min_proposals = 100
Added: torflow/trunk/stream-server.pl
===================================================================
--- torflow/trunk/stream-server.pl (rev 0)
+++ torflow/trunk/stream-server.pl 2007-10-30 10:41:28 UTC (rev 12279)
@@ -0,0 +1,44 @@
+#!/usr/bin/perl -w
+
+# stream-server.pl: minimal forking TCP server used for bandwidth-tests.
+# Protocol (line-based): a line containing a number N is answered with
+# N bytes of payload, a line containing "close" ends the session.
+
+use strict;
+use IO::Socket::INET;
+
+# do not buffer STDOUT: pending output would otherwise be inherited by
+# every forked child and printed again when the child exits
+$| = 1;
+
+# specify the port
+my $port = 8041;
+
+# create the socket, give up with a message if this is not possible
+my $server = IO::Socket::INET->new(Listen=>100, LocalPort=>$port, Proto=>'tcp', Reuse=>'yes')
+  or die "Cannot listen on port ".$port.": ".$@."\n";
+
+# maximum number of payload bytes that are written at once; every such
+# chunk is followed by an additional "\r\n"
+my $line_count = 1000000;
+
+# let the system reap terminated children, so that they do not
+# accumulate as zombies
+$SIG{CHLD} = 'IGNORE';
+
+# print some startup-information
+print "pid ".$$.": listening on port ".$server->sockport."\n";
+
+# main loop
+while(my $client = $server->accept) {
+  my $pid = fork();
+  if(!defined($pid)) {
+    # fork failed: drop this client, but keep the server running
+    warn "pid ".$$.": fork failed: ".$!."\n";
+    close($client);
+  } elsif($pid) {
+    # parent
+    close($client);
+  } else {
+    # child: the listening socket is only needed by the parent
+    close($server);
+    print "pid ".$$.": accepted connection from ".$client->peerhost."\n";
+    while(my $line = <$client>) {
+      if ($line =~ /(\d+)/) {
+        # send the requested number of bytes in chunks of at most $line_count
+        my $counter = $1;
+        while($counter>0) {
+          my $send = ($counter>$line_count) ? $line_count : $counter;
+          print $client "X" x $send;
+          print $client "\r\n";
+          $counter -= $send;
+        }
+      }
+      elsif ($line =~ m/close/) {
+        print "pid ".$$.": closing connection to ".$client->peerhost."\n";
+        close($client);
+        exit(0);
+      }
+    }
+    # the client went away without sending "close": the child has to
+    # exit here too, otherwise it would return to the accept-loop
+    close($client);
+    exit(0);
+  }
+}
Property changes on: torflow/trunk/stream-server.pl
___________________________________________________________________
Name: svn:executable
+ *