[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r9362: Initial commit of python version of metatroller and new TorC (torflow/trunk)
Author: mikeperry
Date: 2007-01-15 21:35:33 -0500 (Mon, 15 Jan 2007)
New Revision: 9362
Added:
torflow/trunk/TorCtl.py
torflow/trunk/metatroller.py
Log:
Initial commit of python version of metatroller and new TorCtl. They seem to
be in working form, more or less :)
Added: torflow/trunk/TorCtl.py
===================================================================
--- torflow/trunk/TorCtl.py 2007-01-16 00:01:42 UTC (rev 9361)
+++ torflow/trunk/TorCtl.py 2007-01-16 02:35:33 UTC (rev 9362)
@@ -0,0 +1,966 @@
+#!/usr/bin/python
+# TorCtl.py -- Python module to interface with Tor Control interface.
+# Copyright 2005 Nick Mathewson -- See LICENSE for licensing information.
+#$Id: TorCtl.py 6882 2005-11-19 19:42:31Z nickm $
+
+"""
+TorCtl -- Library to control Tor processes. See TorCtlDemo.py for example use.
+"""
+
+import os
+import re
+import struct
+import sys
+import threading
+import Queue
+import datetime
+import traceback
+import socket
+import binascii
+import types
+
+# XXX: Make a TorUtil.py
+class _Enum:
+ # Helper: define an ordered dense name-to-number 1-1 mapping.
+ def __init__(self, start, names):
+ self.nameOf = {}
+ idx = start
+ for name in names:
+ setattr(self,name,idx)
+ self.nameOf[idx] = name
+ idx += 1
+
+class _Enum2:
+ # Helper: define an ordered sparse name-to-number 1-1 mapping.
+ def __init__(self, **args):
+ self.__dict__.update(args)
+ self.nameOf = {}
+ for k,v in args.items():
+ self.nameOf[v] = k
+
+def _quote(s):
+ return re.sub(r'([\r\n\\\"])', r'\\\1', s)
+
+def _escape_dots(s, translate_nl=1):
+ if translate_nl:
+ lines = re.split(r"\r?\n", s)
+ else:
+ lines = s.split("\r\n")
+ if lines and not lines[-1]:
+ del lines[-1]
+ for i in xrange(len(lines)):
+ if lines[i].startswith("."):
+ lines[i] = "."+lines[i]
+ lines.append(".\r\n")
+ return "\r\n".join(lines)
+
+def _unescape_dots(s, translate_nl=1):
+ lines = s.split("\r\n")
+
+ for i in xrange(len(lines)):
+ if lines[i].startswith("."):
+ lines[i] = lines[i][1:]
+
+ if lines and lines[-1]:
+ lines.append("")
+
+ if translate_nl:
+ return "\n".join(lines)
+ else:
+ return "\r\n".join(lines)
+
+class _BufSock:
+ def __init__(self, s):
+ self._s = s
+ self._buf = []
+
+ def readline(self):
+ if self._buf:
+ idx = self._buf[0].find('\n')
+ if idx >= 0:
+ result = self._buf[0][:idx+1]
+ self._buf[0] = self._buf[0][idx+1:]
+ return result
+
+ while 1:
+ s = self._s.recv(128)
+ if not s:
+ raise TorCtlClosed()
+ idx = s.find('\n')
+ if idx >= 0:
+ self._buf.append(s[:idx+1])
+ result = "".join(self._buf)
+ rest = s[idx+1:]
+ if rest:
+ self._buf = [ rest ]
+ else:
+ del self._buf[:]
+ return result
+ else:
+ self._buf.append(s)
+
+ def write(self, s):
+ self._s.send(s)
+
+ def close(self):
+ self._s.close()
+
+def secret_to_key(secret, s2k_specifier):
+ """Used to generate a hashed password string. DOCDOC."""
+ c = ord(s2k_specifier[8])
+ EXPBIAS = 6
+ count = (16+(c&15)) << ((c>>4) + EXPBIAS)
+
+ d = sha.new()
+ tmp = s2k_specifier[:8]+secret
+ slen = len(tmp)
+ while count:
+ if count > slen:
+ d.update(tmp)
+ count -= slen
+ else:
+ d.update(tmp[:count])
+ count = 0
+ return d.digest()
+
+def urandom_rng(n):
+ """Try to read some entropy from the platform entropy source."""
+ f = open('/dev/urandom', 'rb')
+ try:
+ return f.read(n)
+ finally:
+ f.close()
+
+def s2k_gen(secret, rng=None):
+ """DOCDOC"""
+ if rng is None:
+ if hasattr(os, "urandom"):
+ rng = os.urandom
+ else:
+ rng = urandom_rng
+ spec = "%s%s"%(rng(8), chr(96))
+ return "16:%s"%(
+ binascii.b2a_hex(spec + secret_to_key(secret, spec)))
+
+def s2k_check(secret, k):
+ """DOCDOC"""
+ assert k[:3] == "16:"
+
+ k = binascii.a2b_hex(k[3:])
+ return secret_to_key(secret, k[:9]) == k[9:]
+
+# Types of "EVENT" message.
+EVENT_TYPE = _Enum2(
+ CIRC="CIRC",
+ STREAM="STREAM",
+ ORCONN="ORCONN",
+ BW="BW",
+ NS="NS",
+ NEWDESC="NEWDESC",
+ DEBUG="DEBUG",
+ INFO="INFO",
+ NOTICE="NOTICE",
+ WARN="WARN",
+ ERR="ERR")
+
+loglevel = "DEBUG"
+loglevels = {"DEBUG" : 0, "INFO" : 1, "NOTICE" : 2, "WARN" : 3, "ERROR" : 4}
+
+def plog(level, msg): # XXX: Timestamps
+ if(loglevels[level] >= loglevels[loglevel]):
+ print level + ": " + msg
+
+class TorCtlError(Exception):
+ "Generic error raised by TorControl code."
+ pass
+
+class TorCtlClosed(TorCtlError):
+ "Raised when the controller connection is closed by Tor (not by us.)"
+ pass
+
+class ProtocolError(TorCtlError):
+ "Raised on violations in Tor controller protocol"
+ pass
+
+class ErrorReply(TorCtlError):
+ "Raised when Tor controller returns an error"
+ pass
+
+class NetworkStatus:
+ "Filled in during NS events"
+ pass
+
+class ExitPolicyLine:
+ def __init__(self, match, ip_mask, port_low, port_high):
+ self.match = match
+ if ip_mask == "*":
+ self.ip = 0
+ self.netmask = 0
+ else:
+ if ip_mask.find("/") == -1:
+ self.netmask = 0xFFFFFFFF
+ ip = ip_mask
+ else:
+ ip, mask = ip_mask.split("/")
+ if re.match(r"\d+.\d+.\d+.\d+", mask):
+ self.netmask=struct.unpack(">I", socket.inet_aton(mask))[0]
+ else:
+ self.netmask = ~(2**(32 - int(mask)) - 1)
+ self.ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+ self.ip &= self.netmask
+ if port_low == "*":
+ self.port_low,self.port_high = (0,65535)
+ else:
+ if not port_high:
+ port_high = port_low
+ self.port_low = int(port_low)
+ self.port_high = int(port_high)
+
+ def check(self, ip, port):
+ ip = struct.unpack(">I", socket.inet_aton(ip))[0]
+ if (ip & self.netmask) == self.ip:
+ if self.port_low <= port and port <= self.port_high:
+ return self.match
+ return -1
+
+# XXX: Parse out version and OS
+class Router:
+ def __init__(self, idhex, name, bw, exitpolicy, down, guard, valid,
+ badexit, fast):
+ self.idhex = idhex
+ self.name = name
+ self.bw = bw
+ self.exitpolicy = exitpolicy
+ self.guard = guard
+ self.down = down
+ self.badexit = badexit
+ self.valid = valid
+ self.fast = fast
+
+ def will_exit_to(self, ip, port):
+ for line in self.exitpolicy:
+ ret = line.check(ip, port)
+ if ret != -1:
+ if ret: plog("DEBUG", "Match: "+str(ret)+" for "+self.name)
+ return ret
+ plog("NOTICE", "No matching exit line for "+self.name)
+ return 0
+
+class Circuit:
+ def __init__(self):
+ self.cid = 0
+ self.created_at = 0 # time
+ self.path = [] # routers
+ self.exit = 0
+
+
+class _ConnectionBase:
+ def __init__(self):
+ self._s = None
+ self._handler = None
+ self._handleFn = None
+ self._sendLock = threading.RLock()
+ self._queue = Queue.Queue()
+ self._thread = None
+ self._closedEx = None
+ self._closed = 0
+ self._closeHandler = None
+ self._eventThread = None
+ self._eventQueue = Queue.Queue()
+
+ def set_event_handler(self, handler):
+ """Cause future events from the Tor process to be sent to 'handler'.
+ """
+ raise NotImplemented
+
+ def set_close_handler(self, handler):
+ """Call 'handler' when the Tor process has closed its connection or
+ given us an exception. If we close normally, no arguments are
+ provided; otherwise, it will be called with an exception as its
+ argument.
+ """
+ self._closeHandler = handler
+
+ def close(self):
+ """Shut down this controller connection"""
+ self._sendLock.acquire()
+ try:
+ self._queue.put("CLOSE")
+ self._eventQueue.put("CLOSE")
+ self._s.close()
+ self._s = None
+ self._closed = 1
+ finally:
+ self._sendLock.release()
+
+# def _read_reply(self):
+# """DOCDOC"""
+# raise NotImplementd
+
+ def launch_thread(self, daemon=1):
+ """Launch a background thread to handle messages from the Tor process."""
+ assert self._thread is None
+ t = threading.Thread(target=self._loop)
+ if daemon:
+ t.setDaemon(daemon)
+ t.start()
+ self._thread = t
+ t = threading.Thread(target=self._eventLoop)
+ if daemon:
+ t.setDaemon(daemon)
+ t.start()
+ self._eventThread = t
+ return self._thread
+
+ def _loop(self):
+ """Main subthread loop: Read commands from Tor, and handle them either
+ as events or as responses to other commands.
+ """
+ while 1:
+ ex = None
+ try:
+ isEvent, reply = self._read_reply()
+ except:
+ self._err(sys.exc_info())
+ return
+
+ if isEvent:
+ if self._handler is not None:
+ self._eventQueue.put(reply)
+ else:
+ cb = self._queue.get()
+ cb(reply)
+
+ def _err(self, (tp, ex, tb), fromEventLoop=0):
+ """DOCDOC"""
+ # silent death is bad :(
+ traceback.print_exception(tp, ex, tb)
+ if self._s:
+ try:
+ self.close()
+ except:
+ pass
+ self._sendLock.acquire()
+ try:
+ self._closedEx = ex
+ self._closed = 1
+ finally:
+ self._sendLock.release()
+ while 1:
+ try:
+ cb = self._queue.get(timeout=0)
+ if cb != "CLOSE":
+ cb("EXCEPTION")
+ except Queue.Empty:
+ break
+ if self._closeHandler is not None:
+ self._closeHandler(ex)
+ return
+
+ def _eventLoop(self):
+ """DOCDOC"""
+ while 1:
+ reply = self._eventQueue.get()
+ if reply == "CLOSE":
+ return
+ try:
+ self._handleFn(reply)
+ except:
+ self._err(sys.exc_info(), 1)
+ return
+
+ def _sendImpl(self, sendFn, msg):
+ """DOCDOC"""
+ if self._thread is None:
+ self.launch_thread(1)
+ # This condition will get notified when we've got a result...
+ condition = threading.Condition()
+ # Here's where the result goes...
+ result = []
+
+ if self._closedEx is not None:
+ raise self._closedEx
+ elif self._closed:
+ raise TorCtlClosed()
+
+ def cb(reply,condition=condition,result=result):
+ condition.acquire()
+ try:
+ result.append(reply)
+ condition.notify()
+ finally:
+ condition.release()
+
+ # Sends a message to Tor...
+ self._sendLock.acquire()
+ try:
+ self._queue.put(cb)
+ sendFn(msg)
+ finally:
+ self._sendLock.release()
+
+ # Now wait till the answer is in...
+ condition.acquire()
+ try:
+ while not result:
+ condition.wait()
+ finally:
+ condition.release()
+
+ # ...And handle the answer appropriately.
+ assert len(result) == 1
+ reply = result[0]
+ if reply == "EXCEPTION":
+ raise self._closedEx
+
+ return reply
+
+
+class Connection(_ConnectionBase):
+ """A Connection represents a connection to the Tor process."""
+ def __init__(self, sock):
+ """Create a Connection to communicate with the Tor process over the
+ socket 'sock'.
+ """
+ _ConnectionBase.__init__(self)
+ self._s = _BufSock(sock)
+ self._debugFile = None
+
+ def debug(self, f):
+ """DOCDOC"""
+ self._debugFile = f
+
+ def set_event_handler(self, handler):
+ """Cause future events from the Tor process to be sent to 'handler'.
+ """
+ self._handler = handler
+ self._handleFn = handler.handle1
+
+ def _read_reply(self):
+ lines = []
+ while 1:
+ line = self._s.readline().strip()
+ if self._debugFile:
+ self._debugFile.write(" %s\n" % line)
+ if len(line)<4:
+ raise ProtocolError("Badly formatted reply line: Too short")
+ code = line[:3]
+ tp = line[3]
+ s = line[4:]
+ if tp == "-":
+ lines.append((code, s, None))
+ elif tp == " ":
+ lines.append((code, s, None))
+ isEvent = (lines and lines[0][0][0] == '6')
+ return isEvent, lines
+ elif tp != "+":
+ raise ProtocolError("Badly formatted reply line: unknown type %r"%tp)
+ else:
+ more = []
+ while 1:
+ line = self._s.readline()
+ if self._debugFile and tp != "+":
+ self._debugFile.write(" %s" % line)
+ if line in (".\r\n", ".\n"):
+ break
+ more.append(line)
+ lines.append((code, s, _unescape_dots("".join(more))))
+ isEvent = (lines and lines[0][0][0] == '6')
+ return (isEvent, lines)
+
+ def _doSend(self, msg):
+ if self._debugFile:
+ amsg = msg
+ lines = amsg.split("\n")
+ if len(lines) > 2:
+ amsg = "\n".join(lines[:2]) + "\n"
+ self._debugFile.write(">>> %s" % amsg)
+ self._s.write(msg)
+
+ def _sendAndRecv(self, msg="", expectedTypes=("250", "251")):
+ """Helper: Send a command 'msg' to Tor, and wait for a command
+ in response. If the response type is in expectedTypes,
+ return a list of (tp,body,extra) tuples. If it is an
+ error, raise ErrorReply. Otherwise, raise ProtocolError.
+ """
+ if type(msg) == types.ListType:
+ msg = "".join(msg)
+ assert msg.endswith("\r\n")
+
+ lines = self._sendImpl(self._doSend, msg)
+ # print lines
+ for tp, msg, _ in lines:
+ if tp[0] in '45':
+ raise ErrorReply("%s %s"%(tp, msg))
+ if tp not in expectedTypes:
+ raise ProtocolError("Unexpectd message type %r"%tp)
+
+ return lines
+
+ def authenticate(self, secret=""):
+ """Send an authenticating secret to Tor. You'll need to call this
+ method before Tor can start.
+ """
+ hexstr = binascii.b2a_hex(secret)
+ self._sendAndRecv("AUTHENTICATE %s\r\n"%hexstr)
+
+ def get_option(self, name):
+ """Get the value of the configuration option named 'name'. To
+ retrieve multiple values, pass a list for 'name' instead of
+ a string. Returns a list of (key,value) pairs.
+ Refer to section 3.3 of control-spec.txt for a list of valid names.
+ """
+ if not isinstance(name, str):
+ name = " ".join(name)
+ lines = self._sendAndRecv("GETCONF %s\r\n" % name)
+
+ r = []
+ for _,line,_ in lines:
+ try:
+ key, val = line.split("=", 1)
+ r.append((key,val))
+ except ValueError:
+ r.append((line, None))
+
+ return r
+
+ def set_option(self, key, value):
+ """Set the value of the configuration option 'key' to the value 'value'.
+ """
+ self.set_options([(key, value)])
+
+ def set_options(self, kvlist):
+ """Given a list of (key,value) pairs, set them as configuration
+ options.
+ """
+ if not kvlist:
+ return
+ msg = " ".join(["%s=%s"%(k,_quote(v)) for k,v in kvlist])
+ self._sendAndRecv("SETCONF %s\r\n"%msg)
+
+ def reset_options(self, keylist):
+ """Reset the options listed in 'keylist' to their default values.
+
+ Tor started implementing this command in version 0.1.1.7-alpha;
+ previous versions wanted you to set configuration keys to "".
+ That no longer works.
+ """
+ self._sendAndRecv("RESETCONF %s\r\n"%(" ".join(keylist)))
+
+ def get_network_status(self, who="all"):
+ """Get the entire network status list"""
+ return parse_ns_body(self._sendAndRecv("GETINFO ns/"+who+"\r\n")[0][2])
+
+ def get_router(self, ns):
+ """Fill in a Router class corresponding to a given NS class"""
+ desc = self._sendAndRecv("GETINFO desc/id/" + ns.idhex + "\r\n")[0][2].split("\n")
+ line = desc.pop(0)
+ m = re.search(r"^router\s+(\S+)\s+", line)
+ router = m.group(1)
+ exitpolicy = []
+ dead = not ("Running" in ns.flags)
+ bw_observed = 0
+ if router != ns.name:
+ plog("NOTICE", "Got different names " + ns.name + " vs " +
+ router + " for " + ns.idhex)
+ for line in desc:
+ ac = re.search(r"^accept (\S+):([^-]+)(?:-(\d+))?", line)
+ rj = re.search(r"^reject (\S+):([^-]+)(?:-(\d+))?", line)
+ bw = re.search(r"^bandwidth \d+ \d+ (\d+)", line)
+ if re.search(r"^opt hibernating 1", line):
+ dead = 1
+ if ac:
+ exitpolicy.append(ExitPolicyLine(1, *ac.groups()))
+ elif rj:
+ exitpolicy.append(ExitPolicyLine(0, *rj.groups()))
+ elif bw:
+ bw_observed = int(bw.group(1))
+ if not bw_observed and not dead and ("Valid" in ns.flags):
+ plog("NOTICE", "No bandwidth for live router " + ns.name)
+ return Router(ns.idhex, ns.name, bw_observed, exitpolicy, dead,
+ ("Guard" in ns.flags), ("Valid" in ns.flags),
+ ("BadExit" in ns.flags), ("Fast" in ns.flags))
+
+ def get_info(self, name):
+ """Return the value of the internal information field named 'name'.
+ Refer to section 3.9 of control-spec.txt for a list of valid names.
+ DOCDOC
+ """
+ if not isinstance(name, str):
+ name = " ".join(name)
+ lines = self._sendAndRecv("GETINFO %s\r\n"%name)
+ d = {}
+ for _,msg,more in lines:
+ if msg == "OK":
+ break
+ try:
+ k,rest = msg.split("=",1)
+ except ValueError:
+ raise ProtocolError("Bad info line %r",msg)
+ if more:
+ d[k] = more
+ else:
+ d[k] = rest
+ return d
+
+ def set_events(self, events, extended=False):
+ """Change the list of events that the event handler is interested
+ in to those in 'events', which is a list of event names.
+ Recognized event names are listed in section 3.3 of the control-spec
+ """
+ if extended:
+ print ("SETEVENTS EXTENDED %s\r\n" % " ".join(events))
+ self._sendAndRecv("SETEVENTS EXTENDED %s\r\n" % " ".join(events))
+ else:
+ self._sendAndRecv("SETEVENTS %s\r\n" % " ".join(events))
+
+ def save_conf(self):
+ """Flush all configuration changes to disk.
+ """
+ self._sendAndRecv("SAVECONF\r\n")
+
+ def send_signal(self, sig):
+ """Send the signal 'sig' to the Tor process; The allowed values for
+ 'sig' are listed in section 3.6 of control-spec.
+ """
+ sig = { 0x01 : "HUP",
+ 0x02 : "INT",
+ 0x0A : "USR1",
+ 0x0C : "USR2",
+ 0x0F : "TERM" }.get(sig,sig)
+ self._sendAndRecv("SIGNAL %s\r\n"%sig)
+
+ def map_address(self, kvList):
+ if not kvList:
+ return
+ m = " ".join([ "%s=%s" for k,v in kvList])
+ lines = self._sendAndRecv("MAPADDRESS %s\r\n"%m)
+ r = []
+ for _,line,_ in lines:
+ try:
+ key, val = line.split("=", 1)
+ except ValueError:
+ raise ProtocolError("Bad address line %r",v)
+ r.append((key,val))
+ return r
+
+ def extend_circuit(self, circid, hops):
+ """Tell Tor to extend the circuit identified by 'circid' through the
+ servers named in the list 'hops'.
+ """
+ if circid is None:
+ circid = "0"
+ lines = self._sendAndRecv("EXTENDCIRCUIT %s %s\r\n"
+ %(circid, ",".join(hops)))
+ tp,msg,_ = lines[0]
+ m = re.match(r'EXTENDED (\S*)', msg)
+ if not m:
+ raise ProtocolError("Bad extended line %r",msg)
+ return int(m.group(1))
+
+ def build_circuit(self, pathlen, entry_chooser, middle_chooser, exit_chooser):
+ circ = Circuit()
+ if pathlen == 1:
+ circ.exit = exit_chooser(circ.path)
+ circ.path = [circ.exit]
+ circ.cid = self.extend_circuit(0, circ.path)
+ else:
+ circ.path.append(entry_chooser(circ.path))
+ for i in xrange(1, pathlen-1):
+ circ.path.append(middle_chooser(circ.path))
+ circ.path.append(exit_chooser(circ.path))
+ circ.cid = self.extend_circuit(0, circ.path)
+ circ.created_at = datetime.datetime.now()
+ return circ
+
+
+ def redirect_stream(self, streamid, newaddr, newport=""):
+ """DOCDOC"""
+ if newport:
+ self._sendAndRecv("REDIRECTSTREAM %s %s %s\r\n"%(streamid, newaddr, newport))
+ else:
+ self._sendAndRecv("REDIRECTSTREAM %s %s\r\n"%(streamid, newaddr))
+
+ def attach_stream(self, streamid, circid):
+ """DOCDOC"""
+ self._sendAndRecv("ATTACHSTREAM %s %s\r\n"%(streamid, circid))
+
+ def close_stream(self, streamid, reason=0, flags=()):
+ """DOCDOC"""
+ self._sendAndRecv("CLOSESTREAM %s %s %s\r\n"
+ %(streamid, reason, "".join(flags)))
+
+ def close_circuit(self, circid, reason=0, flags=()):
+ """DOCDOC"""
+ self._sendAndRecv("CLOSECIRCUIT %s %s %s\r\n"
+ %(circid, reason, "".join(flags)))
+
+ def post_descriptor(self, desc):
+ self._sendAndRecv("+POSTDESCRIPTOR\r\n%s"%_escape_dots(desc))
+
+def parse_ns_body(data):
+ "Parse the body of an NS event or command."
+ nsgroups = re.compile(r"^r ", re.M).split(data)
+ nsgroups.pop(0)
+ nslist = []
+ for nsline in nsgroups:
+ ns = NetworkStatus()
+ m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline)
+ ns.name,ns.idhash,ns.orhash,updated,ns.ip,ns.orport,ns.dirport = m.groups()
+ ns.idhex = (ns.idhash + "=").decode("base64").encode("hex")
+ ns.orport,ns.dirport = map(int, (ns.orport,ns.dirport))
+ m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated)
+ ns.updated = datetime.datetime(*map(int, m.groups()))
+ m = re.search(r"^s((?:\s\S*)+)", nsline, re.M)
+ flags = m.groups()
+ ns.flags = flags[0].strip().split(" ")
+ nslist.append(ns)
+ return nslist
+
+class EventHandler:
+ """An 'EventHandler' wraps callbacks for the events Tor can return."""
+ def __init__(self):
+ """Create a new EventHandler."""
+ self._map1 = {
+ "CIRC" : self.circ_status,
+ "STREAM" : self.stream_status,
+ "ORCONN" : self.or_conn_status,
+ "BW" : self.bandwidth,
+ "DEBUG" : self.msg,
+ "INFO" : self.msg,
+ "NOTICE" : self.msg,
+ "WARN" : self.msg,
+ "ERR" : self.msg,
+ "NEWDESC" : self.new_desc,
+ "ADDRMAP" : self.address_mapped,
+ "NS" : self.ns
+ }
+
+ def handle1(self, lines):
+ """Dispatcher: called from Connection when an event is received."""
+ for code, msg, data in lines:
+ evtype, args = self.decode1(msg, data)
+ self._map1.get(evtype, self.unknown_event)(evtype, *args)
+
+ def decode1(self, body, data):
+ """Unpack an event message into a type/arguments-tuple tuple."""
+ if " " in body:
+ evtype,body = body.split(" ",1)
+ else:
+ evtype,body = body,""
+ evtype = evtype.upper()
+ if evtype == "CIRC":
+ m = re.match(r"(\d+)\s+(\S+)(\s\S+)?(\s\S+)?(\s\S+)?", body)
+ if not m:
+ raise ProtocolError("CIRC event misformatted.")
+ ident,status,path,reason,remote = m.groups()
+ ident = int(ident)
+ if path:
+ if path.find("REASON=") != -1:
+ remote = reason
+ reason = path
+ path=[]
+ else:
+ path = path.strip().split(",")
+ else:
+ path = []
+ if reason: reason = reason[8:]
+ if remote: remote = remote[15:]
+ args = ident, status, path, reason, remote
+ elif evtype == "STREAM":
+ m = re.match(r"(\S+)\s+(\S+)\s+(\S+)\s+(\S+):(\d+)(\s\S+)?(\s\S+)?", body)
+ if not m:
+ raise ProtocolError("STREAM event misformatted.")
+ ident,status,circ,target_host,target_port,reason,remote = m.groups()
+ ident,circ = map(int, (ident,circ))
+ if reason: reason = reason[8:]
+ if remote: remote = remote[15:]
+ args = ident, status, circ, target_host, int(target_port), reason, remote
+ elif evtype == "ORCONN":
+ m = re.match(r"(\S+)\s+(\S+)", body)
+ if not m:
+ raise ProtocolError("ORCONN event misformatted.")
+ target, status = m.groups()
+ args = status, target
+ elif evtype == "BW":
+ m = re.match(r"(\d+)\s+(\d+)", body)
+ if not m:
+ raise ProtocolError("BANDWIDTH event misformatted.")
+ read, written = map(long, m.groups())
+ args = read, written
+ elif evtype in ("DEBUG", "INFO", "NOTICE", "WARN", "ERR"):
+ args = evtype, body
+ elif evtype == "NEWDESC":
+ args = (body.split(" "),)
+ elif evtype == "ADDRMAP":
+ m = re.match(r'(\S+)\s+(\S+)\s+(\"[^"]+\"|\w+)')
+ if not m:
+ raise ProtocolError("BANDWIDTH event misformatted.")
+ fromaddr, toaddr, when = m.groups()
+ if when.upper() == "NEVER":
+ when = None
+ else:
+ when = time.localtime(
+ time.strptime(when[1:-1], "%Y-%m-%d %H:%M:%S"))
+ args = fromaddr, toaddr, when
+ elif evtype == "NS":
+ args = (parse_ns_body(data),)
+ else:
+ args = (body,)
+
+ return evtype, args
+
+ def unknown_event(self, eventtype, evtype, *args):
+ """Called when we get an event type we don't recognize. This
+ is almost alwyas an error.
+ """
+ raise NotImplemented
+
+ def circ_status(self, eventtype, circID, status, path, reason, remote):
+ """Called when a circuit status changes if listening to CIRCSTATUS
+ events. 'status' is a member of CIRC_STATUS; circID is a numeric
+ circuit ID, and 'path' is the circuit's path so far as a list of
+ names.
+ """
+ raise NotImplemented
+
+ def stream_status(self, eventtype, streamID, status, circID, target_host, target_port, reason, remote):
+ """Called when a stream status changes if listening to STREAMSTATUS
+ events. 'status' is a member of STREAM_STATUS; streamID is a
+ numeric stream ID, and 'target' is the destination of the stream.
+ """
+ raise NotImplemented
+
+ def or_conn_status(self, eventtype, status, target):
+ """Called when an OR connection's status changes if listening to
+ ORCONNSTATUS events. 'status' is a member of OR_CONN_STATUS; target
+ is the OR in question.
+ """
+ raise NotImplemented
+
+ def bandwidth(self, eventtype, read, written):
+ """Called once a second if listening to BANDWIDTH events. 'read' is
+ the number of bytes read; 'written' is the number of bytes written.
+ """
+ raise NotImplemented
+
+ def new_desc(self, eventtype, identities):
+ """Called when Tor learns a new server descriptor if listenting to
+ NEWDESC events.
+ """
+ raise NotImplemented
+
+ def msg(self, eventtype, severity, message):
+ """Called when a log message of a given severity arrives if listening
+ to INFO_MSG, NOTICE_MSG, WARN_MSG, or ERR_MSG events."""
+ raise NotImplemented
+
+ def ns(self, eventtype, nslist):
+ raise NotImplemented
+
+ def address_mapped(self, eventtype, fromAddr, toAddr, expiry=None):
+ """Called when Tor adds a mapping for an address if listening
+ to ADDRESSMAPPED events.
+ """
+ raise NotImplemented
+
+
+class DebugEventHandler(EventHandler):
+ """Trivial debug event handler: reassembles all parsed events to stdout."""
+ def circ_status(self, eventtype, circID, status, path, reason, remote):
+ output = [eventtype, str(circID), status]
+ if path: output.append(",".join(path))
+ if reason: output.append("REASON=" + reason)
+ if remote: output.append("REMOTE_REASON=" + remote)
+ print " ".join(output)
+
+ def stream_status(self, eventtype, streamID, status, circID, target_host, target_port, reason, remote):
+ output = [eventtype, str(streamID), status, str(circID), target_host,
+str(target_port)]
+ if reason: output.append("REASON=" + reason)
+ if remote: output.append("REMOTE_REASON=" + remote)
+ print " ".join(output)
+
+ def ns(self, eventtype, nslist):
+ for ns in nslist:
+ print " ".join((eventtype, ns.name, ns.idhash,
+ ns.updated.isoformat(), ns.ip, str(ns.orport),
+ str(ns.dirport), " ".join(ns.flags)))
+
+ def new_desc(self, eventtype, identities):
+ print " ".join((eventtype, " ".join(identities)))
+
+def parseHostAndPort(h):
+ """Given a string of the form 'address:port' or 'address' or
+ 'port' or '', return a two-tuple of (address, port)
+ """
+ host, port = "localhost", 9100
+ if ":" in h:
+ i = h.index(":")
+ host = h[:i]
+ try:
+ port = int(h[i+1:])
+ except ValueError:
+ print "Bad hostname %r"%h
+ sys.exit(1)
+ elif h:
+ try:
+ port = int(h)
+ except ValueError:
+ host = h
+
+ return host, port
+
+def get_connection(sock):
+ """Given a socket attached to a Tor control port, detect the version of Tor
+ and return an appropriate 'Connection' object."""
+ return Connection(sock)
+
+def run_example(host,port):
+ print "host is %s:%d"%(host,port)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((host,port))
+ c = get_connection(s)
+ c.set_event_handler(DebugEventHandler())
+ th = c.launch_thread()
+ c.authenticate()
+ print "nick",`c.get_option("nickname")`
+ print `c.get_info("version")`
+ #print `c.get_info("desc/name/moria1")`
+ print `c.get_info("network-status")`
+ print `c.get_info("addr-mappings/all")`
+ print `c.get_info("addr-mappings/config")`
+ print `c.get_info("addr-mappings/cache")`
+ print `c.get_info("addr-mappings/control")`
+
+ print `c.extend_circuit(0,["moria1"])`
+ try:
+ print `c.extend_circuit(0,[""])`
+ except ErrorReply: # wtf?
+ print "got error. good."
+ except:
+ print "Strange error", sys.exc_info()[0]
+
+ #send_signal(s,1)
+ #save_conf(s)
+
+ #set_option(s,"1")
+ #set_option(s,"bandwidthburstbytes 100000")
+ #set_option(s,"runasdaemon 1")
+ #set_events(s,[EVENT_TYPE.WARN])
+ c.set_events([EVENT_TYPE.STREAM, EVENT_TYPE.CIRC,
+ EVENT_TYPE.NS, EVENT_TYPE.NEWDESC], True)
+
+ th.join()
+ return
+
+if __name__ == '__main__':
+ import socket
+ if len(sys.argv) > 2:
+ print "Syntax: TorControl.py torhost:torport"
+ sys.exit(0)
+ else:
+ sys.argv.append("localhost:9061")
+ sh,sp = parseHostAndPort(sys.argv[1])
+ run_example(sh,sp)
+
Property changes on: torflow/trunk/TorCtl.py
___________________________________________________________________
Name: svn:executable
+ *
Added: torflow/trunk/metatroller.py
===================================================================
--- torflow/trunk/metatroller.py 2007-01-16 00:01:42 UTC (rev 9361)
+++ torflow/trunk/metatroller.py 2007-01-16 02:35:33 UTC (rev 9362)
@@ -0,0 +1,202 @@
+#!/usr/bin/python
+
+"""
+Metatroller - Tor Meta controller
+"""
+
+# Metatroller.
+
+
+import TorCtl
+import atexit
+import sys
+import socket
+import struct
+import traceback
+import re
+import random
+from TorCtl import plog
+
+routers = {} # indexed by idhex
+name_to_key = {}
+key_to_name = {}
+
+total_r_bw = 0
+sorted_r = []
+sorted_g = []
+total_g_bw = 0
+
+circuits = {} # map from ID # to circuit object
+streams = {} # map from stream id to circuit
+
+exit_port_idx = {} # Used in ordered exits mode
+
+# XXX: Option to ignore guard flag
+
+# XXX: Move these to config file
+control_host = "127.0.0.1"
+control_port = 9061
+max_detach = 3
+
+
+# Technically we could just add member vars as we need them, but this
+# is a bit more clear
+class MetaRouter(TorCtl.Router):
+ def __init__(self, router):
+ # XXX: Copy router stuff over
+ self.failed = 0
+ self.suspected = 0
+ self.circ_selections = 0
+ self.strm_selections = 0
+ self.reason_suspected = {}
+ self.reason_failed = {}
+
+class MetaCircuit(TorCtl.Circuit):
+ def __init__(self, circuit):
+ # XXX: Copy circuit stuff over
+ self.detached_cnt = 0
+ self.used_cnt = 0
+
+class Stream:
+ def __init__(self):
+ self.detached_from = [] # circ id #'s
+ self.detached_cnt = 0
+
+# XXX: Technically we should obey fast and valid????
+def choose_entry_uniform(path):
+ r = random.choice(sorted_g)
+ while r.idhex in path:
+ r = random.choice(sorted_g)
+ return r.idhex
+
+def choose_middle_uniform(path):
+ r = random.choice(sorted_r)
+ while r.idhex in path:
+ r = random.choice(sorted_r)
+ return r.idhex
+
+def choose_exit_uniform(path, target_ip, target_port):
+ allowed = []
+ print target_ip, target_port
+ for r in sorted_r:
+ if r.will_exit_to(target_ip, target_port):
+ plog("DEBUG", r.name + " will exit")
+ allowed.append(r)
+ r = random.choice(allowed)
+ while r.idhex in path:
+ r = random.choice(allowed)
+ return r.idhex
+
+def read_routers(c, nslist):
+ bad_key = 0
+ for ns in nslist:
+ try:
+ key_to_name[ns.idhex] = ns.name
+ name_to_key[ns.name] = ns.idhex
+ r = c.get_router(ns)
+ if ns.idhex in routers:
+ if routers[ns.idhex].name != r.name:
+ plog("NOTICE", "Router "+r.idhex+" changed names from "
+ +routers[ns.idhex].name+" to "+r.name)
+ sorted_r.remove(routers[ns.idhex])
+ routers[ns.idhex] = r
+ sorted_r.append(r)
+ except TorCtl.ErrorReply:
+ bad_key += 1
+ if "Running" in ns.flags:
+ plog("NOTICE", "Running router "+ns.name+"="
+ +ns.idhex+" has no descriptor")
+ pass
+ except:
+ #print sys.exc_info()[1]
+ traceback.print_exception(*sys.exc_info())
+ continue
+ sorted_r.sort(cmp, lambda sr: sr.bw)
+
+ global total_r_bw, total_g_bw # lame....
+ for r in sorted_r:
+ if not r.down:
+ total_r_bw += r.bw
+ if r.guard and r.valid:
+ total_g_bw += r.bw
+ sorted_g.append(r)
+
+ plog("DEBUG", str(bad_key) + " bad keys")
+
+# Make eventhandler
+class SnakeHandler(TorCtl.EventHandler):
+ def __init__(self, c):
+ TorCtl.EventHandler.__init__(self)
+ self.c = c
+
+ def circ_status(self, eventtype, circID, status, path, reason, remote):
+ output = [eventtype, str(circID), status]
+ if path: output.append(",".join(path))
+ if reason: output.append("REASON=" + reason)
+ if remote: output.append("REMOTE_REASON=" + remote)
+ plog("DEBUG", " ".join(output))
+
+ def stream_status(self, eventtype, streamID, status, circID, target_host, target_port, reason, remote):
+ output = [eventtype, str(streamID), status, str(circID), target_host,
+str(target_port)]
+ if reason: output.append("REASON=" + reason)
+ if remote: output.append("REMOTE_REASON=" + remote)
+ plog("DEBUG", " ".join(output))
+ if not re.match(r"\d+.\d+.\d+.\d+", target_host):
+ target_host = "255.255.255.255" # ignore DNS for exit policy check
+ if status == "NEW":
+ attach_circ = 0
+ global circuits
+ for circ in circuits.itervalues():
+ if circ.exit.will_exit_to(target_host, target_port):
+ attach_circ = circ
+ break
+ else:
+ attach_circ = self.c.build_circuit(3, choose_entry_uniform,
+ choose_middle_uniform,
+ lambda path:
+ choose_exit_uniform(path,
+ target_host, target_port))
+ circuits[attach_circ.cid] = attach_circ
+ # TODO: attach
+ elif status == "DETACHED":
+ pass
+ elif status == "FAILED":
+ pass
+
+ def ns(self, eventtype, nslist):
+ read_routers(self.c, nslist)
+ plog("DEBUG", "Read " + str(len(nslist)) + " NS dox => "
+ + str(len(sorted_r)) + " routers")
+
+ def new_desc(self, eventtype, identities):
+ for i in identities: # Is this too slow?
+ read_routers(self.c, self.c.get_network_status("id/"+i))
+ plog("DEBUG", "Read " + str(len(identities)) + " desc => "
+ + str(len(sorted_r)) + " routers")
+
+def deconf():
+ pass
+
+def metaloop(c):
+ """Loop that handles metatroller commands"""
+ nslist = c.get_network_status()
+ read_routers(c, nslist)
+ plog("INFO", "Read "+str(len(sorted_r))+"/"+str(len(nslist))+" routers")
+
+def main(argv):
+ atexit.register(deconf)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((control_host,control_port))
+ c = TorCtl.get_connection(s)
+ c.set_event_handler(SnakeHandler(c))
+ th = c.launch_thread()
+ c.authenticate()
+ c.set_events([TorCtl.EVENT_TYPE.STREAM,
+ TorCtl.EVENT_TYPE.NS,
+ TorCtl.EVENT_TYPE.CIRC,
+ TorCtl.EVENT_TYPE.NEWDESC], True)
+ metaloop(c)
+ th.join()
+
+main(sys.argv)
Property changes on: torflow/trunk/metatroller.py
___________________________________________________________________
Name: svn:executable
+ *