[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r10417: Added missing countries to GeoIPSupport.py and removed excep (in torflow/trunk: . TorCtl)
Author: renner
Date: 2007-05-31 09:53:02 -0400 (Thu, 31 May 2007)
New Revision: 10417
Modified:
torflow/trunk/TorCtl/GeoIPSupport.py
torflow/trunk/TorCtl/PathSupport.py
torflow/trunk/TorCtl/TorUtil.py
torflow/trunk/op-addon.py
Log:
Added the missing countries to GeoIPSupport.py and removed the special-case exception for the US in PathSupport.py. Moved
sort_list(list, key) to TorUtil because it is needed in several places. Introduced functionality to measure and compute the
RTTs of single links between routers, plus a NetworkModel to store them. Began refactoring the event handler code into a hierarchy of handlers (CircuitHandler -> StreamHandler -> PingHandler).
Modified: torflow/trunk/TorCtl/GeoIPSupport.py
===================================================================
--- torflow/trunk/TorCtl/GeoIPSupport.py 2007-05-31 07:20:44 UTC (rev 10416)
+++ torflow/trunk/TorCtl/GeoIPSupport.py 2007-05-31 13:53:02 UTC (rev 10417)
@@ -4,6 +4,7 @@
import socket
import GeoIP
import TorCtl
+
from TorUtil import plog
# GeoIP data object: choose database here
@@ -19,10 +20,13 @@
def contains(self, country_code):
return country_code in self.countries
-# The continents
+# Setup the continents
africa = Continent("AF")
-# TODO: Add more countries
-africa.countries = ["CI"]
+africa.countries = ["AO","BF","BI","BJ","BV","BW","CD","CF","CG","CI","CM","CV","DJ","DZ",
+ "EG","EH","ER","ET","GA","GH","GM","GN","GQ","GW","HM","KE","KM","LR",
+ "LS","LY","MA","MG","ML","MR","MU","MW","MZ","NA","NE","NG","RE","RW",
+ "SC","SD","SH","SL","SN","SO","ST","SZ","TD","TF","TG","TN","TZ","UG",
+ "YT","ZA","ZM","ZR","ZW"]
asia = Continent("AS")
asia.countries = ["AP","AE","AF","AM","AZ","BD","BH","BN","BT","CC","CN","CX","CY","GE",
Modified: torflow/trunk/TorCtl/PathSupport.py
===================================================================
--- torflow/trunk/TorCtl/PathSupport.py 2007-05-31 07:20:44 UTC (rev 10416)
+++ torflow/trunk/TorCtl/PathSupport.py 2007-05-31 13:53:02 UTC (rev 10417)
@@ -326,8 +326,6 @@
def r_is_ok(self, path, router):
for r in path:
if router.country_code == r.country_code:
- # Exceptionally allow US because of so many states
- if router.country_code == "US": return True
return False
return True
Modified: torflow/trunk/TorCtl/TorUtil.py
===================================================================
--- torflow/trunk/TorCtl/TorUtil.py 2007-05-31 07:20:44 UTC (rev 10416)
+++ torflow/trunk/TorCtl/TorUtil.py 2007-05-31 13:53:02 UTC (rev 10417)
@@ -15,9 +15,9 @@
import sha
import math
-__all__ = ["Enum", "Enum2", "quote", "escape_dots", "unescape_dots",
- "BufSock", "secret_to_key", "urandom_rng", "s2k_gen", "s2k_check",
- "plog", "ListenSocket", "zprob"]
+__all__ = ["Enum", "Enum2", "sort_list", "quote", "escape_dots", "unescape_dots",
+ "BufSock", "secret_to_key", "urandom_rng", "s2k_gen", "s2k_check", "plog",
+ "ListenSocket", "zprob"]
class Enum:
# Helper: define an ordered dense name-to-number 1-1 mapping.
@@ -37,6 +37,11 @@
for k,v in args.items():
self.nameOf[v] = k
+def sort_list(list, key):
+ """ Sort a list by a specified key """
+ list.sort(lambda x,y: cmp(key(x), key(y))) # Python < 2.4 hack
+ return list
+
def quote(s):
return re.sub(r'([\r\n\\\"])', r'\\\1', s)
Modified: torflow/trunk/op-addon.py
===================================================================
--- torflow/trunk/op-addon.py 2007-05-31 07:20:44 UTC (rev 10416)
+++ torflow/trunk/op-addon.py 2007-05-31 13:53:02 UTC (rev 10417)
@@ -13,22 +13,26 @@
# TODO: import 'with'-statement for Lock objects (Python 2.5: "with some_lock: do something")
import re
import sys
+import copy
import math
import time
-import sched
import socket
import threading
import Queue
+
# Non-standard packages
import socks
-#import networkx
+import networkx
+
+# TorCtl
import TorCtl.PathSupport
import TorCtl.GeoIPSupport
+# From .. import ..
from TorCtl import *
-from TorCtl.TorUtil import plog
+from TorCtl.TorUtil import plog, sort_list
-# Move these to config file
+# TODO: Move these to config-file
control_host = "127.0.0.1"
control_port = 9051
socks_host = "127.0.0.1"
@@ -38,22 +42,34 @@
ping_dummy_host = "127.0.0.1"
ping_dummy_port = 100
-# Close circ after n timeouts or slownesses
-timeout_limit = 2
+# Close circ after n timeouts or avg measured slownesses
+timeout_limit = 1
+slowness_limit = 3
# Slow RTT := x seconds
-slow = 1
-# Set interval between work loads in sec
-sleep_interval = 30
+slow = 1.5
+# Note: Tor-internal lifetime of a circuit is 10 min --> 600/sleep_interval = max-age
+# Set interval between working loads in sec
+sleep_interval = 10
# No of idle circuits to build preemptively
+# TODO: Also configure ports to use
idle_circuits = 6
-# Lock object for regulating access to the circuit list
+# Set to True if we want to measure partial circuits
+measure_partial_circs = False
+
+# Still needed: Lock object for regulating access to the circuit list
circs_lock = threading.Lock()
-# Infos about this proxy TODO: Save in some class
+# Infos about this proxy TODO: Save in some class: Router?
my_ip = None
my_country = None
+# Do configuration here TODO: use my_country for src
+path_config = GeoIPSupport.GeoIPConfig(unique_countries = False,
+ src_country = None,
+ crossings = 1,
+ excludes = ["FR"])
+
# Configure Selection Manager here!!
# Do NOT modify this object directly after it is handed to
# PathBuilder, Use PathBuilder.schedule_selmgr instead.
@@ -66,11 +82,12 @@
use_all_exits=False,
uniform=True,
use_exit=None,
- use_guards=False,
- use_geoip=True)
+ use_guards=True,
+ geoip_config=path_config)
######################################### BEGIN: Connection #####################
+# Circuit building code here
class Connection(TorCtl.Connection):
def build_circuit(self, pathlen, path_sel):
circ = Circuit()
@@ -87,158 +104,226 @@
circ.circ_id = self.extend_circuit(0, circ.id_path())
return circ
+ def build_circuit_80(self, selmgr):
+ """ Set target port to 80 """
+ selmgr.set_target("255.255.255.255", 80)
+ return self.build_circuit(selmgr.pathlen, selmgr.path_selector)
+
+ def build_circuit_from_path(self, path):
+ """ Build circuit using a given path """
+ circ = Circuit()
+ if len(path) > 0:
+ circ.circ_id = self.extend_circuit(0, path)
+
######################################### END: Connection #####################
-######################################### Router, Circuit, Stream #####################
+######################################### Circuit, Stream #####################
# Circuit class extended to RTTs
class Circuit(PathSupport.Circuit):
def __init__(self):
PathSupport.Circuit.__init__(self)
- self.total_rtt = None # double (sec), substitute with..
- self.rtts = {} # dict of partial rtts, for pathlen 3: 1-2-None
- self.timeout_counter = 0 # timeout limit
- self.slowness_counter = 0 # slowness limit
- self.closed = False # Mark circuits closed
+ # RTT stuff
+ self.current_rtt = None # double (sec): current value
+ self.part_rtts = {} # dict of partial rtts, pathlen 3: 1-2-None
+ self.rtt_history = [] # rtt history for computing stats:
+ self.avg_rtt = 0 # avg rtt value
+ self.dev_rtt = 0 # standard deviation
+ # Counters and flags
+ self.age = 0 # age in rounds
+ self.timeout_counter = 0 # timeout limit
+ self.slowness_counter = 0 # slowness limit
+ self.closed = False # mark circuit closed
-# Stream class extended to isPing and hop
+ def get_avg_rtt(self):
+ """ Compute average from history """
+ if len(self.rtt_history) > 0:
+ sum = reduce(lambda x, y: x+y, self.rtt_history, 0.0)
+ return sum/len(self.rtt_history)
+ else:
+ return 0.0
+
+ def get_dev_rtt(self):
+ """ Return the stddev of measured rtts """
+ if len(self.rtt_history) > 0:
+ avg = self.get_avg_rtt()
+ sum = reduce(lambda x, y: x + ((y-avg)**2.0), self.rtt_history, 0.0)
+ return math.sqrt(sum/len(self.rtt_history))
+ else:
+ return 0.0
+
+ def refresh_stats(self):
+ """ Refresh the stats """
+ self.avg_rtt = self.get_avg_rtt()
+ self.dev_rtt = self.get_dev_rtt()
+
class Stream(PathSupport.Stream):
+ """ Stream class extended to isPing and hop """
def __init__(self, sid, host, port, kind):
PathSupport.Stream.__init__(self, sid, host, port, kind)
- self.isPing = False
- self.hop = None # Save hop if this is a ping, None = complete circ
+ self.isPing = False # set this to mark as a ping-stream
+ self.hop = None # save hop if this is a ping, hop=None means complete circ
-######################################### Router, Circuit, Stream #####################
-######################################### BEGIN: Pinger #####################
+######################################### Circuit, Stream #####################
+######################################### BEGIN: NetworkModel #####################
-# A simple "Pinger": try to connect
-# to somewhere via Tor using Socks4a
-class Pinger:
- # Constructor
- def __init__(self, host, port):
- self.connect_host = host
- self.connect_port = port
+class LinkInfo:
+ """ This class contains infos about a link: source, destination, RTT
+ plus: rtt_history, methods to compute stats, etc. """
+ def __init__(self, src, dest, rtt=0):
+ # Set src and dest
+ self.src = src
+ self.dest = dest
+ # Setup the history and record RTT
+ self.rtt_history = []
+ self.set_rtt(rtt)
- # Hmm, there is no "try .. except .. finally .." in Python < 2.5 !!
- def ping(self):
- s = None
- try:
- try:
- s = socks.socksocket()
- s.setproxy(socks.PROXY_TYPE_SOCKS4, socks_host, socks_port)
- s.connect((self.connect_host, self.connect_port))
- except socks.Socks4Error, e:
- # Don't do nothing, this will actually happen
- # print("Got Exception: " + str(e))
- pass
- finally:
- # Close the socket if open
- if s:
- s.close()
+ def set_rtt(self, rtt):
+ self.rtt = rtt
+ self.rtt_history.append(rtt)
-######################################### END: Pinger #####################
-######################################### BEGIN: NetworkModel #####################
+class PathProposal:
+ """ Instances of this class are path-proposals """
+ def __init__(self, links):
+ # This is a list of LinkInfo objects
+ self.links = links
+ # Compute the expected RTT
+ self.rtt = reduce(lambda x,y: x + y.rtt, self.links, 0.0)
+
+ def to_string(self):
+ """ Create a string for printing out information """
+ s = ""
+ for l in self.links:
+ # Get the single objects
+ s += l.src.nickname + "--" + l.dest.nickname + " (" + str(l.rtt) + ") " + ", "
+ return "Route proposal: " + s + "--> " + str(self.rtt) + " sec"
-# This will be used to record measured RTTs
-# of single links and to find fast routes
class NetworkModel:
- def __init__(self):
- # TODO: Use XDiGraph()
- self.graph = networkx.XGraph(selfloops=False, multiedges=False)
- # Add this OP to the model
- self.addRouter("ROOT")
- plog("DEBUG", "NetworkModel initiated")
+ """ This class is used to record measured RTTs for single links in a model of the
+ 'currently explored subnet' (currently this is an undirected graph!) """
+ def __init__(self, rooter):
+ """ Constructor: pass the root of all our circuits """
+ # Use XDiGraph() (= directed)?
+ self.graph = networkx.XGraph(name="Explored Tor Subnet", selfloops=False, multiedges=False)
+ # Initially add THIS proxy to the model
+ self.root = rooter
+ self.graph.add_node(self.root)
+ plog("DEBUG", "NetworkModel initiated: added " + self.root.nickname)
- def addRouter(self, router):
- self.graph.add_node(router)
+ def add_link(self, src, dest, rtt):
+ """ Add a link to the graph given src, dest & rtt """
+ self.graph.add_edge(src, dest, LinkInfo(src, dest, rtt))
- def addLink(self, source, dest):
- self.graph.add_edge(source, dest)
-
-######################################### END: NetworkModel #####################
-######################################### BEGIN: EventHandler #####################
+ def get_link_info(self, path):
+ """ From a path given as list of routers, return link-infos """
+ links = []
+ for i in xrange(0, len(path)-1):
+ # TODO: Check if edge exists
+ links.append(self.graph.get_edge(path[i], path[i+1]))
+ return links
-# DRAFT for a new CircuitManager
-class NewCircuitManager:
- def __init__(self, c):
- self.conn = c # connection to Tor
- self.circuits = {} # dict mapping id:circuit
- self.circs_sorted = [] # list of circs sorted for rtt
+ def find_circuits(self):
+ # Reset list of proposals and prefixes for DFS
+ self.proposals = []
+ self.prefixes = {}
+ # Start the search
+ self.visit(self.root, [])
+ # Sort proposals for their RTTs
+ sort_list(self.proposals, lambda x: x.rtt)
+ # Print all of them for debugging/info
+ for p in self.proposals:
+ print(p.to_string())
- # Sort a list by a specified key
- def sort_list(self, list, key):
- list.sort(lambda x,y: cmp(key(x), key(y))) # Python < 2.4 hack
- return list
+ def visit(self, node, path, i=1):
+ """ Recursive Depth-First-Search: Maybe use some existing method? """
+ if node not in path:
+ path.append(node)
+ # Root -- Exit
+ if len(path) == 4:
+ self.proposals.append(PathProposal(self.get_link_info(path)))
+ else:
+ self.prefixes[i] = path
+ # G is also a dict
+ for n in self.graph[node]:
+ if n not in self.prefixes[i]:
+ self.visit(n, copy.copy(self.prefixes[i]), i+1)
- def refresh_sorted_list(self):
- self.circs_sorted = self.sort_list(self.circuits.values(), lambda x: x.total_rtt)
+ def print_graph(self):
+ """ Print current info about the graph """
+ print(self.graph.info())
+ # Print edges
+ #for e in self.graph.edges():
+ # src, dest, link = e
+ # plog("INFO", "Edge: " + src.nickname + " -- " + dest.nickname + ", RTT = " + str(link.rtt) + " sec")
- def add_circuit(self, circ):
- self.circuits[circ.circ_id] = circ
+######################################### END: NetworkModel #####################
+######################################### BEGIN: EventHandlers #####################
- def del_circuit(self, circ_id):
- # TODO: Test
- del self.circuits[circ_id]
+# CircuitHandler extending PathBuilder
+class CircuitHandler(PathSupport.PathBuilder):
+ def __init__(self, c, selmgr):
+ # Init the PathBuilder
+ PathSupport.PathBuilder.__init__(self, c, selmgr, GeoIPSupport.GeoIPRouter)
+ # Maintain a sorted list of circs that gets regenerated regularly
+ self.circs_sorted = []
+ # Bring up the circuits
+ self.check_circuit_pool()
+
+ def check_circuit_pool(self):
+ """ Init or check the status of our pool of circuits """
+ # Get number of circuits
+ circs_lock.acquire()
+ n = len(self.circuits.values())
+ circs_lock.release()
+ # Some Logging
+ plog("INFO", "Checked circuit-pool: building " + str(idle_circuits-n) + " circuits")
+ # Schedule (idle_circuits-n) circuit-buildups
+ while (n < idle_circuits):
+ self.build_idle_circuit()
+ plog("DEBUG", "Scheduled circuit No. " + str(n+1))
+ n += 1
+ self.print_circuits()
- def new_circuit(self):
+ def build_idle_circuit(self):
+ """ Build an idle circuit """
circ = None
while circ == None:
try:
# Build the circuit
- circ = self.conn.build_circuit(self.selmgr.pathlen, self.selmgr.path_selector)
- self.add_circuit(circ)
+ circ = self.c.build_circuit_80(self.selmgr)
+ # Using lock:
+ circs_lock.acquire()
+ self.circuits[circ.circ_id] = circ
+ circs_lock.release()
except TorCtl.ErrorReply, e:
# FIXME: How come some routers are non-existant? Shouldn't
# we have gotten an NS event to notify us they disappeared?
- plog("NOTICE", "Error building circ: " + str(e.args))
-
- def close_circuit(self, circ_id):
- # try .. except
- self.conn.close_circuit(circ_id)
-
- def attach_stream(self, stream):
- pass
+ plog("NOTICE", "Error building circuit: " + str(e.args))
-###########################################
+ def print_circuits(self):
+ """ Print out our circuits plus some info """
+ circs_lock.acquire()
+ #circs = self.circs_sorted
+ circs = self.circuits.values()
+ plog("INFO", "We have " + str(len(circs)) + " circuits")
+ for c in circs:
+ out = "+ Circuit " + str(c.circ_id) + ": "
+ for r in c.path: out += " " + r.nickname + "(" + str(r.country_code) + ")"
+ if not c.built: out += " (not yet built)"
+ else: out += " (age=" + str(c.age) + ")"
+ if c.current_rtt: out += ": " "RTT last/avg/dev: " + str(c.current_rtt) + "/" + str(c.avg_rtt) + "/"+ str(c.dev_rtt) + ""
+ print(out)
+ circs_lock.release()
-# We need an EventHandler, this one extends PathBuilder
-class EventHandler(PathSupport.PathBuilder):
- def __init__(self, c, selmgr):
- # Call constructor of superclass
- PathSupport.PathBuilder.__init__(self, c, selmgr, GeoIPSupport.GeoIPRouter)
- # Additional stuff
- self.ping_circs = Queue.Queue() # (circ_id, hop)-pairs
- self.start_times = {} # dict mapping (circ_id, hop):start_time TODO: cleanup
- self.circs_sorted = [] # sorted list of circs, generated regularly
- # Set up the CircuitManager, only pass self.circuits instead of self?
- self.circ_manager = CircuitManager(selmgr, c, self)
- self.circ_manager.setDaemon(True)
- self.circ_manager.start()
-
- # Add a circuit to ping, ping_info is (circ_id, hop)
- def queue_ping_circ(self, ping_info):
- self.ping_circs.put(ping_info)
-
- # Send signal "CLEARDNSCACHE"
- def clear_dns_cache(self):
- lines = self.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
- for _, msg, more in lines:
- plog("DEBUG", "CLEARDNSCACHE: " + msg)
-
- # Sort a list by a specified key
- def sort_list(self, list, key):
- list.sort(lambda x,y: cmp(key(x), key(y))) # Python < 2.4 hack
- return list
-
# Call after each measuring
def refresh_sorted_list(self):
- # Sort the list for RTTs
+ # Sort the list for average RTTs
circs_lock.acquire()
- self.circs_sorted = self.sort_list(self.circuits.values(), lambda x: x.total_rtt)
+ self.circs_sorted = sort_list(self.circuits.values(), lambda x: x.avg_rtt)
circs_lock.release()
plog("DEBUG", "Refreshed sorted list of circuits")
-
- # Do something when circuit-events occur
+
def circ_status_event(self, c):
+ """ Handle circuit status events """
# Construct output for logging
output = [c.event_name, str(c.circ_id), c.status]
if c.path: output.append(",".join(c.path))
@@ -252,9 +337,13 @@
plog("DEBUG", "Ignoring circuit " + str(c.circ_id) + " (controlled by Tor or not yet in the list)")
circs_lock.release()
return
+
+ # EXTENDED
if c.status == "EXTENDED":
self.circuits[c.circ_id].last_extended_at = c.arrived_at
circs_lock.release()
+
+ # FAILED & CLOSED
elif c.status == "FAILED" or c.status == "CLOSED":
# XXX: Can still get a STREAM FAILED for this circ after this
circ = self.circuits[c.circ_id]
@@ -265,16 +354,19 @@
for stream in circ.pending_streams:
if stream.isPing:
#plog("DEBUG", "Finding new circ for ping stream " + str(stream.strm_id))
+ # Close the stream?
pass
if not stream.isPing:
plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
self.attach_stream_any(stream, stream.detached_from)
# Refresh the list
self.refresh_sorted_list()
+ # Check if there are enough circs
+ self.check_circuit_pool()
return
- # TODO: Check if there are enough circs?
+
+ # BUILT
elif c.status == "BUILT":
- # TODO: Perform a measuring directly?
self.circuits[c.circ_id].built = True
try:
for stream in self.circuits[c.circ_id].pending_streams:
@@ -286,15 +378,34 @@
circs_lock.release()
return
circs_lock.release()
+
+ # OTHER?
else:
# If this was e.g. a LAUNCHED
circs_lock.release()
+######################################### END: CircuitHandler #####################
+######################################### BEGIN: StreamHandler #####################
+
+# StreamHandler that extends CircuitHandler
+class StreamHandler(CircuitHandler):
+ def __init__(self, c, selmgr):
+ # Call constructor of superclass
+ CircuitHandler.__init__(self, c, selmgr)
+
+ # Send signal "CLEARDNSCACHE"
+ def clear_dns_cache(self):
+ lines = self.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
+ for _, msg, more in lines:
+ plog("DEBUG", "CLEARDNSCACHE: " + msg)
+
# Attach a regular user stream
def attach_stream_any(self, stream, badcircs):
+
# To be able to always choose the fastest:
# slows down attaching?
self.clear_dns_cache()
+
# Newnym, and warn if not built plus pending
unattached_streams = [stream]
if self.new_nym:
@@ -314,7 +425,8 @@
# Choose from the sorted list!
for circ in self.circs_sorted:
- if circ.built and circ.total_rtt and circ.circ_id not in badcircs and not circ.closed:
+ # Only attach if we already measured
+ if circ.built and circ.current_rtt and circ.circ_id not in badcircs and not circ.closed:
if circ.exit.will_exit_to(stream.host, stream.port):
try:
self.c.attach_stream(stream.strm_id, circ.circ_id)
@@ -348,11 +460,83 @@
circs_lock.release()
self.last_exit = circ.exit
+ # Catch user stream events
+ def handle_other_events(self, s):
+ # SUCCEEDED
+ if s.status == "SUCCEEDED":
+ if s.strm_id not in self.streams:
+ plog("NOTICE", "Succeeded stream " + str(s.strm_id) + " not found")
+ return
+ if s.circ_id and self.streams[s.strm_id].pending_circ.circ_id != s.circ_id:
+ # Hrmm.. this can happen on a new-nym.. Very rare, putting warn
+ # in because I'm still not sure this is correct
+ plog("WARN", "Mismatch of pending: "
+ + str(self.streams[s.strm_id].pending_circ.circ_id) + " vs "
+ + str(s.circ_id))
+ circs_lock.acquire()
+ self.streams[s.strm_id].circ = self.circuits[s.circ_id]
+ circs_lock.release()
+ else:
+ self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
+ self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+ self.streams[s.strm_id].pending_circ = None
+ self.streams[s.strm_id].attached_at = s.arrived_at
+
+ # FAILED or CLOSED
+ elif s.status == "FAILED" or s.status == "CLOSED":
+ if s.strm_id not in self.streams:
+ plog("NOTICE", "Failed stream " + str(s.strm_id) + " not found")
+ return
+ #if not s.circ_id: plog("WARN", "Stream " + str(s.strm_id) + " closed/failed from no circuit")
+ # We get failed and closed for each stream. OK to return
+ # and let the CLOSED do the cleanup
+ if s.status == "FAILED":
+ # Avoid busted circuits that will not resolve or carry traffic
+ self.streams[s.strm_id].failed = True
+ circs_lock.acquire()
+ if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
+ elif self.streams[s.strm_id].attached_at != 0:
+ plog("WARN","Failed stream on unknown circuit " + str(s.circ_id))
+ circs_lock.release()
+ return
+ # CLOSED
+ if self.streams[s.strm_id].pending_circ:
+ self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+ # Actual removal of the stream
+ del self.streams[s.strm_id]
+
+ # REMAP
+ elif s.status == "REMAP":
+ if s.strm_id not in self.streams:
+ plog("WARN", "Remap id "+str(s.strm_id)+" not found")
+ else:
+ if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
+ s.target_host = "255.255.255.255"
+ plog("NOTICE", "Non-IP remap for "+str(s.strm_id) + " to " + s.target_host)
+ self.streams[s.strm_id].host = s.target_host
+ self.streams[s.strm_id].port = s.target_port
+
+######################################### END: StreamHandler #####################
+######################################### BEGIN: PingHandler #####################
+
+# This class extends the StreamHandler
+class PingHandler(StreamHandler):
+ def __init__(self, c, selmgr, router, partial):
+ # Init the StreamHandler
+ StreamHandler.__init__(self, c, selmgr)
+ # Anything ping-related
+ self.ping_queue = Queue.Queue() # (circ_id, hop)-pairs
+ self.start_times = {} # dict mapping (circ_id, hop):start_time TODO: cleanup
+ # Start the Pinger that schedules the ping-connections
+ self.ping_manager = Pinger(self, router, partial)
+ self.ping_manager.setDaemon(True)
+ self.ping_manager.start()
+
# Attach a ping stream to its circuit
- def attach_ping(self, stream):
+ def attach_ping(self, stream, arrived_at):
plog("DEBUG", "New ping request")
# Get info from the Queue TODO: check if empty
- ping_info = self.ping_circs.get()
+ ping_info = self.ping_queue.get()
# Extract ping-info
circ_id = ping_info[0]
hop = ping_info[1]
@@ -363,10 +547,11 @@
# Get the circuit
if circ_id in self.circuits:
circ = self.circuits[circ_id]
+ # TODO: and not circ.busy
if circ.built and not circ.closed:
self.c.attach_stream(stream.strm_id, circ.circ_id, hop)
# Measure here or move to before attaching?
- self.start_times[(circ_id, hop)] = time.time()
+ self.start_times[(circ_id, hop)] = arrived_at
stream.hop = hop
stream.pending_circ = circ # Only one possible here
circ.pending_streams.append(stream)
@@ -380,14 +565,15 @@
except TorCtl.ErrorReply, e:
plog("WARN", "Error attaching stream: " + str(e.args))
- # Catch stream status events
def stream_status_event(self, s):
+ """ Catch stream status events: Handle NEW and DETACHED here,
+ pass other events to StreamHandler """
# Construct debugging output
output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id), s.target_host, str(s.target_port)]
if s.reason: output.append("REASON=" + s.reason)
if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
plog("DEBUG", " ".join(output))
-
+
# If target_host is not an IP-address
if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
s.target_host = "255.255.255.255" # ignore DNS for exit policy check
@@ -403,7 +589,7 @@
if (stream.host == ping_dummy_host) & (stream.port == ping_dummy_port):
# Set isPing
stream.isPing = True
- self.attach_ping(stream)
+ self.attach_ping(stream, s.arrived_at)
else:
self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
@@ -430,29 +616,37 @@
# Close the circuit
plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
self.circuits[s.circ_id].closed = True
- self.c.close_circuit(s.circ_id)
+ try: self.c.close_circuit(s.circ_id)
+ except TorCtl.ErrorReply, e:
+ plog("ERROR", "Failed closing circuit " + str(s.circ_id) + ": " + str(e))
# Set RTT for circ to None
- self.circuits[s.circ_id].total_rtt = None
+ self.circuits[s.circ_id].current_rtt = None
circs_lock.release()
# Only close the stream
self.c.close_stream(s.strm_id, 7)
return
# This is a successful ping: measure here
- now = time.time()
hop = self.streams[s.strm_id].hop
- rtt = now - self.start_times[(s.circ_id, hop)]
+ # Compute RTT using arrived_at
+ rtt = s.arrived_at - self.start_times[(s.circ_id, hop)]
plog("INFO", "Measured RTT: " + str(rtt) + " sec")
# Save RTT to circuit
- self.circuits[s.circ_id].rtts[hop] = rtt
- # Additionally save total_rtt ?
+ self.circuits[s.circ_id].part_rtts[hop] = rtt
if hop == None:
- self.circuits[s.circ_id].total_rtt = rtt
+ # This is a total circuit measuring
+ self.circuits[s.circ_id].current_rtt = rtt
+ self.circuits[s.circ_id].rtt_history.append(rtt)
+ plog("DEBUG", "Added RTT to history: " + str(self.circuits[s.circ_id].rtt_history))
+ # Refresh the stats
+ self.circuits[s.circ_id].refresh_stats()
+ self.circuits[s.circ_id].age += 1
- # Close if slow-max is reached
- if rtt >= slow:
+ # Close if slow-max is reached on avg_rtt
+ if self.circuits[s.circ_id].avg_rtt >= slow:
self.circuits[s.circ_id].slowness_counter += 1
- if self.circuits[s.circ_id].slowness_counter >= timeout_limit and not self.circuits[s.circ_id].closed:
- plog("DEBUG", "Slow-max is reached --> closing circuit " + str(s.circ_id))
+ plog("DEBUG", "Slow circuit " + str(s.circ_id))
+ if self.circuits[s.circ_id].slowness_counter >= slowness_limit and not self.circuits[s.circ_id].closed:
+ plog("DEBUG", "Slow-max (" + str(slowness_limit) + ") is reached --> closing circuit " + str(s.circ_id))
self.circuits[s.circ_id].closed = True
self.c.close_circuit(s.circ_id)
@@ -465,15 +659,9 @@
# Detect timeouts on user streams
if s.reason == "TIMEOUT":
- circs_lock.acquire()
- self.circuits[s.circ_id].timeout_counter += 1
- plog("DEBUG", str(self.circuits[s.circ_id].timeout_counter) + " timeout(s) on circuit " + str(s.circ_id))
- if self.circuits[s.circ_id].timeout_counter >= timeout_limit and not self.circuits[s.circ_id].closed:
- # Close the circuit
- plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
- self.circuits[s.circ_id].closed = True
- self.c.close_circuit(s.circ_id)
- circs_lock.release()
+ # Don't increase counter, cause could be a machine that's not responding
+ #self.circuits[s.circ_id].timeout_counter += 1
+ plog("DEBUG", "User stream timed out on circuit " + str(s.circ_id))
# Stream was pending
if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
@@ -482,155 +670,121 @@
self.streams[s.strm_id].pending_circ = None
self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
- # SUCCEEDED
- elif s.status == "SUCCEEDED":
- if s.strm_id not in self.streams:
- plog("NOTICE", "Succeeded stream " + str(s.strm_id) + " not found")
- return
- if s.circ_id and self.streams[s.strm_id].pending_circ.circ_id != s.circ_id:
- # Hrmm.. this can happen on a new-nym.. Very rare, putting warn
- # in because I'm still not sure this is correct
- plog("WARN", "Mismatch of pending: "
- + str(self.streams[s.strm_id].pending_circ.circ_id) + " vs "
- + str(s.circ_id))
- circs_lock.acquire()
- self.streams[s.strm_id].circ = self.circuits[s.circ_id]
- circs_lock.release()
- else:
- self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
- self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
- self.streams[s.strm_id].pending_circ = None
- self.streams[s.strm_id].attached_at = s.arrived_at
-
- # FAILED or CLOSED
- elif s.status == "FAILED" or s.status == "CLOSED":
- if s.strm_id not in self.streams:
- plog("NOTICE", "Failed stream " + str(s.strm_id) + " not found")
- return
- #if not s.circ_id: plog("WARN", "Stream " + str(s.strm_id) + " closed/failed from no circuit")
- # We get failed and closed for each stream. OK to return
- # and let the CLOSED do the cleanup
- if s.status == "FAILED":
- # Avoid busted circuits that will not resolve or carry traffic
- self.streams[s.strm_id].failed = True
- circs_lock.acquire()
- if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
- elif self.streams[s.strm_id].attached_at != 0:
- plog("WARN","Failed stream on unknown circuit " + str(s.circ_id))
- circs_lock.release()
- return
- # CLOSED
- if self.streams[s.strm_id].pending_circ:
- self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
- # Actual removal of the stream
- del self.streams[s.strm_id]
+ else:
+ self.handle_other_events(s)
- # REMAP
- elif s.status == "REMAP":
- if s.strm_id not in self.streams:
- plog("WARN", "Remap id "+str(s.strm_id)+" not found")
- else:
- if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
- s.target_host = "255.255.255.255"
- plog("NOTICE", "Non-IP remap for "+str(s.strm_id) + " to " + s.target_host)
- self.streams[s.strm_id].host = s.target_host
- self.streams[s.strm_id].port = s.target_port
+######################################### END: PingHandler #####################
+######################################### BEGIN: Pinger #####################
-######################################### END: EventHandler #####################
-######################################### BEGIN: CircuitManager #####################
-
-# This is the main class that keeps track of:
-# -- Connection to Tor
-# -- EventHandler
-#
-# Does work regularly
-# TODO: Switch circuit-managing off to get circuits created from Tor
-# TODO: Add a NetworkModel to this!
-# TODO: Make this to contain the circuit-list and use a pinger-thread
-
-class CircuitManager(threading.Thread):
-
- def __init__(self, selmgr, conn, event_handler):
- # Set everything
- self.selmgr = selmgr
- self.conn = conn
- self.handler = event_handler
- # Create the Pinger
- self.pinger = Pinger(ping_dummy_host, ping_dummy_port)
+class Pinger(threading.Thread):
+ """ Separate thread that triggers the Socks4-connections for pings """
+ def __init__(self, ping_handler, router=None, partial=False):
+ self.handler = ping_handler # The EventHandler
+ self.router = router # This router is us
+ self.measure_partial = partial # Flag to switch off partial measurings
+ if self.measure_partial:
+ # Create the model for recording link-RTTs
+ self.model = NetworkModel(self.router)
+ # The Pinger-object
+ #self.pinger = Pinger(ping_dummy_host, ping_dummy_port)
# Call constructor of superclass
threading.Thread.__init__(self)
- # The run()-method
def run(self):
+ """ The run()-method """
while self.isAlive():
+ time.sleep(sleep_interval)
self.do_work()
- time.sleep(sleep_interval)
-
- # Do the work
+
def do_work(self):
- # Get number of circuits
- circs_lock.acquire()
- n = len(self.handler.circuits.values())
- circs_lock.release()
- # Schedule (idle_circuits-n) circuit-buildups
- while (n < idle_circuits):
- self.build_idle_circuit()
- plog("DEBUG", "Scheduled circuit No. " + str(n+1))
- n += 1
+ """ Do the work """
# Measure RTTs of circuits
self.measure()
- self.print_circuits()
+ # self.handler.schedule_low_prio(lambda x: x.measure())
+ # Compute link-RTTs
+ if self.measure_partial:
+ self.compute_link_RTTs()
+ # Print circuits for info
+ self.handler.schedule_low_prio(lambda x: x.print_circuits())
- # Build an idle circuit
- # Better here than in EventHandler's thread
- def build_idle_circuit(self):
- circ = None
- while circ == None:
- try:
- # Build the circuit
- circ = self.conn.build_circuit(self.selmgr.pathlen, self.selmgr.path_selector)
- # Using lock:
- circs_lock.acquire()
- self.handler.circuits[circ.circ_id] = circ
- circs_lock.release()
- except TorCtl.ErrorReply, e:
- # FIXME: How come some routers are non-existent? Shouldn't
- # we have gotten an NS event to notify us they disappeared?
- plog("NOTICE", "Error building circ: " + str(e.args))
-
- # Measure RTTs of all circuits
def measure(self):
+ """ Measure RTTs of all circuits """
+ # XXX: Schedule in the EventHandler or leave out queueing!
circs_lock.acquire()
circs = self.handler.circuits.values()
circs_lock.release()
for c in circs:
if c.built:
- # Get length of c ...
- id = c.circ_id
- # TODO: Measure for all hops, test if result is
- # bigger each time, else start again
- #self.handler.queue_ping_circ((id, 2))
- # Trigger ping
- #self.pinger.ping()
- # Put in the queue (circ, hop), XXX: synchronize!
- self.handler.queue_ping_circ((id, None))
- # Trigger ping
- self.pinger.ping()
+ # Get id & length of c
+ id = c.circ_id
+ if self.measure_partial:
+ path_len = len(c.path)
+ for i in xrange(1, path_len):
+ # Put in the queue: (circ, hop)
+ self.handler.ping_queue.put((id, i))
+ self.ping()
+ # And for the whole circuit ...
+ self.handler.ping_queue.put((id, None))
+ self.ping()
- # Print circuits
- def print_circuits(self):
+ # Hmm, there is no "try .. except .. finally .." in Python < 2.5 !!
+ def ping(self):
+ """ This creates a connection to dummy_host/_port using Socks4 """
+ s = None
+ try:
+ try:
+ s = socks.socksocket()
+ s.setproxy(socks.PROXY_TYPE_SOCKS4, socks_host, socks_port)
+ s.connect((ping_dummy_host, ping_dummy_port))
+ except socks.Socks4Error, e:
+ # Do nothing here, this will actually happen
+ # print("Got Exception: " + str(e))
+ pass
+ finally:
+ # Close the socket if open
+ if s:
+ s.close()
+
+ def compute_link_RTTs(self):
+ """ Get a copy(?) of the circs and check if we can compute links """
+ # XXX: Get the circuits
circs_lock.acquire()
circs = self.handler.circuits.values()
- plog("INFO", "We have " + str(len(circs)) + " circuits")
+ circs_lock.release()
for c in circs:
- out = "+ Circuit " + str(c.circ_id) + ": "
- for r in c.path: out = out + " " + r.nickname + "(" + str(r.country_code) + ")"
- if c.total_rtt: out = out + " (RTT=" + str(c.total_rtt) + ")"
- if not c.built: out = out + " (not yet built)"
- print(out)
- circs_lock.release()
+ # Get the length
+ path_len = len(c.path)
+ # Go through the path
+ for i in xrange(1,path_len):
+ if i in c.part_rtts:
+ # First hop --> add Link from Root to 1
+ if i == 1:
+ link_rtt = c.part_rtts[i]
+ self.model.add_link(self.router, c.path[i-1], link_rtt)
+ # Handle i -- (i+1)
+ if i+1 in c.part_rtts:
+ link_rtt = c.part_rtts[i+1] - c.part_rtts[i]
+ if link_rtt > 0:
+ plog("INFO", "Computed link RTT = " + str(link_rtt))
+ # Save to NetworkModel
+ self.model.add_link(c.path[i-1], c.path[i], link_rtt)
+ else:
+ plog("WARN", "Negative RTT: " + str(link_rtt))
+ # Handle (n-1) -- n
+ elif None in c.part_rtts:
+ # We have a total value
+ link_rtt = c.part_rtts[None] - c.part_rtts[i]
+ if link_rtt > 0:
+ plog("INFO", "Computed link RTT = " + str(link_rtt))
+ # Save to NetworkModel
+ self.model.add_link(c.path[i-1], c.path[i], link_rtt)
+ else:
+ plog("WARN", "Negative RTT: " + str(link_rtt))
+ # Print out the model
+ self.model.print_graph()
+ self.model.find_circuits()
-######################################### END: CircuitManager #####################
+######################################### END: Pinger #####################
# Return a connection to Tor's control port
def connect(control_host, control_port):
@@ -641,7 +795,8 @@
# Do the configuration
def configure(conn):
- # Get our own IP and country here, TODO: use try .. except?
+ global my_ip, my_country
+ # Get our own IP and country
try:
info = conn.get_info("address")
my_ip = info["address"]
@@ -664,11 +819,13 @@
conn = connect(control_host, control_port)
#conn.debug(file("control.log", "w"))
conn.authenticate()
+ # Configure myself
+ configure(conn)
+ # Setup a router instance here
+ router = GeoIPSupport.GeoIPRouter(TorCtl.Router(None,"ROOT",None,False,None,None,my_ip,None,None))
# Set Handler to the connection
- handler = EventHandler(conn, __selmgr)
+ handler = PingHandler(conn, __selmgr, router, measure_partial_circs)
conn.set_event_handler(handler)
- # Configure myself
- configure(conn)
# Go to sleep to be able to get killed from the commandline
try:
while True: