[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r9498: All your streams are now belong to us. Streams are now attac (torflow/trunk)
Author: mikeperry
Date: 2007-02-06 06:38:32 -0500 (Tue, 06 Feb 2007)
New Revision: 9498
Modified:
torflow/trunk/TorCtl.py
torflow/trunk/metatroller.py
Log:
All your streams are now belong to us. Streams are now attached to
circuits. Multiple concurrent streams tested (briefly).
Also, added NodeSelector interface to TorCtl.
Modified: torflow/trunk/TorCtl.py
===================================================================
--- torflow/trunk/TorCtl.py 2007-02-06 05:47:35 UTC (rev 9497)
+++ torflow/trunk/TorCtl.py 2007-02-06 11:38:32 UTC (rev 9498)
@@ -54,6 +54,21 @@
"Filled in during NS events"
pass
+class NodeSelector:
+ "Interface for node selection policies"
+ def __init__(self, target_ip, target_port):
+ self.to_ip = target_ip
+ self.to_port = target_port
+
+ def entry_chooser(self, path):
+ raise NotImplemented
+
+ def middle_chooser(self, path):
+ raise NotImplemented
+
+ def exit_chooser(self, path):
+ raise NotImplemented
+
class ExitPolicyLine:
def __init__(self, match, ip_mask, port_low, port_high):
self.match = match
@@ -506,17 +521,17 @@
raise ProtocolError("Bad extended line %r",msg)
return int(m.group(1))
- def build_circuit(self, pathlen, entry_chooser, middle_chooser, exit_chooser):
+ def build_circuit(self, pathlen, nodesel):
circ = Circuit()
if pathlen == 1:
- circ.exit = exit_chooser(circ.path)
+ circ.exit = nodesel.exit_chooser(circ.path)
circ.path = [circ.exit.idhex]
circ.cid = self.extend_circuit(0, circ.path)
else:
- circ.path.append(entry_chooser(circ.path).idhex)
+ circ.path.append(nodesel.entry_chooser(circ.path).idhex)
for i in xrange(1, pathlen-1):
- circ.path.append(middle_chooser(circ.path).idhex)
- circ.exit = exit_chooser(circ.path)
+ circ.path.append(nodesel.middle_chooser(circ.path).idhex)
+ circ.exit = nodesel.exit_chooser(circ.path)
circ.path.append(circ.exit.idhex)
circ.cid = self.extend_circuit(0, circ.path)
circ.created_at = datetime.datetime.now()
Modified: torflow/trunk/metatroller.py
===================================================================
--- torflow/trunk/metatroller.py 2007-02-06 05:47:35 UTC (rev 9497)
+++ torflow/trunk/metatroller.py 2007-02-06 11:38:32 UTC (rev 9498)
@@ -14,6 +14,7 @@
import traceback
import re
import random
+import time
from TorUtil import *
routers = {} # indexed by idhex
@@ -30,9 +31,8 @@
exit_port_idx = {} # Used in ordered exits mode
-# XXX: Option to ignore guard flag
-
-# XXX: Move these to config file
+# TODO: Move these to config file
+# TODO: Option to ignore guard flag
control_host = "127.0.0.1"
control_port = 9061
max_detach = 3
@@ -53,37 +53,49 @@
class MetaCircuit(TorCtl.Circuit):
def __init__(self, circuit): # Promotion
self.__dict__ = circuit.__dict__
+ self.built = False
self.detached_cnt = 0
self.used_cnt = 0
- self.created_at = 0
+ self.created_at = time.time()
+ self.pending_streams = [] # Which stream IDs are pending us
+
class Stream:
- def __init__(self):
+ def __init__(self, sid, host, port):
+ self.sid = sid
self.detached_from = [] # circ id #'s
- self.detached_cnt = 0
+ self.pending_circ = None
+ self.circ = None
+ self.host = host
+ self.port = port
-# XXX: Technically we should obey fast and valid????
-def choose_entry_uniform(path):
- r = random.choice(sorted_g)
- while r.idhex in path:
+# TODO: Obviously we need other node selector implementations
+
+class UniformSelector(TorCtl.NodeSelector):
+ "Uniform node selection"
+ # FIXME: Technically we should obey fast and valid
+ def entry_chooser(self, path):
r = random.choice(sorted_g)
- return r
+ while r.idhex in path:
+ r = random.choice(sorted_g)
+ return r
-def choose_middle_uniform(path):
- r = random.choice(sorted_r)
- while r.idhex in path:
+ def middle_chooser(self, path):
r = random.choice(sorted_r)
- return r
+ while r.idhex in path:
+ r = random.choice(sorted_r)
+ return r
-def choose_exit_uniform(path, target_ip, target_port):
- allowed = []
- for r in sorted_r:
- if r.will_exit_to(target_ip, target_port):
- allowed.append(r)
- r = random.choice(allowed)
- while r.idhex in path:
+ def exit_chooser(self, path):
+ allowed = []
+ for r in sorted_r:
+ if r.will_exit_to(self.to_ip, self.to_port):
+ allowed.append(r)
r = random.choice(allowed)
- return r
+ while r.idhex in path:
+ r = random.choice(allowed)
+ return r
+
def read_routers(c, nslist):
bad_key = 0
@@ -124,42 +136,84 @@
TorCtl.EventHandler.__init__(self)
self.c = c
+ def attach_stream_any(self, stream, badcircs):
+ for circ in circuits.itervalues():
+ if circ.built and circ.cid not in badcircs:
+ if circ.exit.will_exit_to(stream.host, stream.port):
+ self.c.attach_stream(stream.sid, circ.cid)
+ stream.pending_circ = None
+ stream.circ = circ
+ circ.used_cnt += 1
+ break
+ else:
+ circ = MetaCircuit(self.c.build_circuit(3,
+ UniformSelector(stream.host, stream.port)))
+ stream.pending_circ = circ
+ circ.pending_streams.append(stream)
+ circuits[circ.cid] = circ
+
def circ_status(self, eventtype, circID, status, path, reason, remote):
output = [eventtype, str(circID), status]
if path: output.append(",".join(path))
if reason: output.append("REASON=" + reason)
if remote: output.append("REMOTE_REASON=" + remote)
plog("DEBUG", " ".join(output))
+ # Circuits we don't control get built by Tor
+ if circID not in circuits: return
+ if status == "FAILED" or status == "CLOSED":
+ circ = circuits[circID]
+ del circuits[circID]
+ for stream in circ.pending_streams:
+ self.attach_stream_any(stream, stream.detached_from)
+ elif status == "BUILT":
+ circuits[circID].built = True
+ for stream in circuits[circID].pending_streams:
+ self.c.attach_stream(stream.sid, circID)
+ circuits[circID].used_cnt += 1
def stream_status(self, eventtype, streamID, status, circID, target_host, target_port, reason, remote):
output = [eventtype, str(streamID), status, str(circID), target_host,
-str(target_port)]
+ str(target_port)]
if reason: output.append("REASON=" + reason)
if remote: output.append("REMOTE_REASON=" + remote)
plog("DEBUG", " ".join(output))
if not re.match(r"\d+.\d+.\d+.\d+", target_host):
target_host = "255.255.255.255" # ignore DNS for exit policy check
- if status == "NEW":
- attach_circ = 0
+ if status == "NEW" or status == "NEWRESOLVE":
global circuits
- for circ in circuits.itervalues():
- if circ.exit.will_exit_to(target_host, target_port):
- attach_circ = circ
- break
+ streams[streamID] = Stream(streamID, target_host, target_port)
+
+ self.attach_stream_any(streams[streamID],
+ streams[streamID].detached_from)
+ elif status == "DETACHED":
+ global circuits
+ if streamID not in streams:
+ plog("WARN", "Detached stream "+str(streamID)+" not found")
+ streams[streamID] = Stream(streamID, target_host, target_port)
+ # FIXME Stats
+ if not circID:
+ plog("WARN", "Stream "+str(streamID)+" detached from no circuit!")
else:
- attach_circ = MetaCircuit(
- self.c.build_circuit(3, choose_entry_uniform,
- choose_middle_uniform,
- lambda path:
- choose_exit_uniform(path,
- target_host, target_port)))
- circuits[attach_circ.cid] = attach_circ
- # TODO: attach
- elif status == "DETACHED":
- pass
- elif status == "FAILED":
- pass
+ streams[streamID].detached_from.append(circID)
+ self.attach_stream_any(streams[streamID],
+ streams[streamID].detached_from)
+ elif status == "FAILED" or status == "CLOSED":
+ # FIXME stats
+ if streams[streamID].pending_circ:
+ streams[streamID].pending_circ.pending_streams.remove(streams[streamID])
+ del streams[streamID]
+ elif status == "REMAP":
+ if streamID not in streams:
+ plog("WARN", "Remap id "+str(streamID)+" not found")
+ else:
+ if not re.match(r"\d+.\d+.\d+.\d+", target_host):
+ target_host = "255.255.255.255"
+ plog("NOTICE", "Non-IP remap for "+str(streamID)+" to "
+ + target_host)
+ streams[streamID].host = target_host
+ streams[streamID].port = target_port
+
def ns(self, eventtype, nslist):
read_routers(self.c, nslist)
plog("DEBUG", "Read " + str(len(nslist)) + " NS dox => "
@@ -171,23 +225,23 @@
plog("DEBUG", "Read " + str(len(identities)) + " desc => "
+ str(len(sorted_r)) + " routers")
-def deconf():
- pass
-
def metaloop(c):
"""Loop that handles metatroller commands"""
nslist = c.get_network_status()
read_routers(c, nslist)
plog("INFO", "Read "+str(len(sorted_r))+"/"+str(len(nslist))+" routers")
+ # XXX: Loop for commands on socket
def main(argv):
- atexit.register(deconf)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((control_host,control_port))
c = TorCtl.get_connection(s)
c.set_event_handler(SnakeHandler(c))
th = c.launch_thread()
c.authenticate()
+ atexit.register(lambda:
+ c.set_option("__LeaveStreamsUnattached", "0"))
+ c.set_option("__LeaveStreamsUnattached", "1")
c.set_events([TorCtl.EVENT_TYPE.STREAM,
TorCtl.EVENT_TYPE.NS,
TorCtl.EVENT_TYPE.CIRC,