[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r9715: Added IdHexRestriction and NickRestriction from sjmurdoch. A (in torflow/trunk: . TorCtl)



Author: mikeperry
Date: 2007-03-02 15:16:36 -0500 (Fri, 02 Mar 2007)
New Revision: 9715

Modified:
   torflow/trunk/TorCtl/PathSupport.py
   torflow/trunk/TorCtl/TorCtl.py
   torflow/trunk/metatroller.py
   torflow/trunk/soat.pl
Log:
Added IdHexRestriction and NickRestriction from sjmurdoch. Also added a
concurency manager for handling config changes from the metaport. 

Note: Refactoring to follow to make room for plugable statistics. Stand clear
for falling classes (SnakeHandler and SelectionManager -> PathSupport.py).



Modified: torflow/trunk/TorCtl/PathSupport.py
===================================================================
--- torflow/trunk/TorCtl/PathSupport.py	2007-03-02 20:00:37 UTC (rev 9714)
+++ torflow/trunk/TorCtl/PathSupport.py	2007-03-02 20:16:36 UTC (rev 9715)
@@ -15,7 +15,7 @@
 "VersionExcludeRestriction", "ExitPolicyRestriction", "OrNodeRestriction",
 "AtLeastNNodeRestriction", "NotNodeRestriction", "Subnet16Restriction",
 "UniqueRestriction", "UniformGenerator", "OrderedExitGenerator",
-"PathSelector", "Connection"]
+"PathSelector", "Connection", "NickRestriction", "IdHexRestriction"]
 
 #################### Path Support Interfaces #####################
 
@@ -25,9 +25,9 @@
     def reset(self, router_list): pass
 
 class NodeRestrictionList:
-    def __init__(self, restrictions, sorted_r):
+    def __init__(self, restrictions, sorted_rlist):
         self.restrictions = restrictions
-        self.update_routers(sorted_r)
+        self.update_routers(sorted_rlist)
 
     def __check_r(self, r):
         for rst in self.restrictions:
@@ -35,10 +35,10 @@
         self.restricted_bw += r.bw
         return True
 
-    def update_routers(self, sorted_r):
-        self._sorted_r = sorted_r
+    def update_routers(self, sorted_rlist):
+        self._sorted_r = sorted_rlist
         self.restricted_bw = 0
-        for rs in self.restrictions: rs.reset(sorted_r)
+        for rs in self.restrictions: rs.reset(sorted_rlist)
         self.restricted_r = filter(self.__check_r, self._sorted_r)
 
     def add_restriction(self, restr):
@@ -142,7 +142,7 @@
 # TODO: We still need more path support implementations
 #  - BwWeightedGenerator
 #  - NodeRestrictions:
-#    - Uptime
+#    - Uptime/LongLivedPorts (Does/should hibernation count?)
 #    - Published/Updated
 #    - GeoIP
 #      - NodeCountry
@@ -159,8 +159,8 @@
 class PercentileRestriction(NodeRestriction):
     """If used, this restriction MUST be FIRST in the RestrictionList."""
     def __init__(self, pct_skip, pct_fast, r_list):
+        self.pct_fast = pct_fast
         self.pct_skip = pct_skip
-        self.pct_fast = pct_fast
         self.sorted_r = r_list
         self.position = 0
 
@@ -215,6 +215,25 @@
             if f in router.flags: return False
         return True
 
+class NickRestriction(NodeRestriction):
+    """Require that the node nickname is as specified"""
+    def __init__(self, nickname):
+        self.nickname = nickname
+
+    def r_is_ok(self, router):
+        return router.nickname == self.nickname
+
+class IdHexRestriction(NodeRestriction):
+    """Require that the node idhash is as specified"""
+    def __init__(self, idhex):
+        if idhex[0] == '$':
+            self.idhex = idhex[1:].upper()
+        else:
+            self.idhex = idhex.upper()
+
+    def r_is_ok(self, router):
+        return router.idhex.upper() == self.idhex
+    
 class MinBWRestriction(NodeRestriction):
     def __init__(self, minbw):
         self.min_bw = minbw
@@ -333,7 +352,11 @@
             self.last_idx = len(self.routers)
         else:
             self.last_idx = self.next_exit_by_port[self.to_port]
-   
+
+    def set_port(self, port):
+        self.to_port = port
+        self.rewind()
+       
     # Just in case: 
     def mark_chosen(self, r): raise NotImplemented()
     def all_chosen(self): raise NotImplemented()
@@ -409,35 +432,37 @@
     c.debug(file("control.log", "w"))
     c.authenticate()
     nslist = c.get_network_status()
-    sorted_r = c.read_routers(c.get_network_status())
+    sorted_rlist = c.read_routers(c.get_network_status())
 
-    pct_rst = PercentileRestriction(10, 20, sorted_r)
-    oss_rst = OSRestriction([r"[lL]inux", r"BSD", "Darwin"], [])
-    prop_rst = OSRestriction([], ["Windows", "Solaris"])
-
+    do_unit(PercentileRestriction(0, 100, sorted_rlist), sorted_rlist,
+                  lambda r: "")
+    do_unit(PercentileRestriction(10, 20, sorted_rlist), sorted_rlist,
+                  lambda r: "")
+    exit(0)
+    do_unit(OSRestriction([r"[lL]inux", r"BSD", "Darwin"], []), sorted_rlist,
+                  lambda r: r.os)
+    do_unit(OSRestriction([], ["Windows", "Solaris"]), sorted_rlist,
+                  lambda r: r.os)
    
-    do_unit(VersionRangeRestriction("0.1.2.0"), sorted_r,
+    do_unit(VersionRangeRestriction("0.1.2.0"), sorted_rlist,
                   lambda r: str(r.version))
-    do_unit(VersionRangeRestriction("0.1.2.0", "0.1.2.5"), sorted_r,
+    do_unit(VersionRangeRestriction("0.1.2.0", "0.1.2.5"), sorted_rlist,
                   lambda r: str(r.version))
     do_unit(VersionIncludeRestriction(["0.1.1.26-alpha", "0.1.2.7-ignored"]),
-                  sorted_r, lambda r: str(r.version))
-    do_unit(VersionExcludeRestriction(["0.1.1.26"]), sorted_r,
+                  sorted_rlist, lambda r: str(r.version))
+    do_unit(VersionExcludeRestriction(["0.1.1.26"]), sorted_rlist,
                   lambda r: str(r.version))
 
-    do_unit(ConserveExitsRestriction(), sorted_r, lambda r: " ".join(r.flags))
-    do_unit(FlagsRestriction([], ["Valid"]), sorted_r, lambda r: " ".join(r.flags))
+    do_unit(ConserveExitsRestriction(), sorted_rlist, lambda r: " ".join(r.flags))
+    do_unit(FlagsRestriction([], ["Valid"]), sorted_rlist, lambda r: " ".join(r.flags))
 
-    # TODO: Cross check ns exit flag with this list
-    #do_unit(ExitPolicyRestriction("255.255.255.255", 25), sorted_r)
+    # XXX: Need unittest
+    do_unit(IdHexRestriction("$FFCB46DB1339DA84674C70D7CB586434C4370441"),
+                  sorted_rlist, lambda r: r.idhex)
 
-    #do_unit(pct_rst, sorted_r)
-    #do_unit(oss_rst, sorted_r)
-    #do_unit(alpha_rst, sorted_r)
-    
     rl =  [AtLeastNNodeRestriction([ExitPolicyRestriction("255.255.255.255", 80), ExitPolicyRestriction("255.255.255.255", 443), ExitPolicyRestriction("255.255.255.255", 6667)], 2), FlagsRestriction([], ["BadExit"])]
 
-    exit_rstr = NodeRestrictionList(rl, sorted_r)
+    exit_rstr = NodeRestrictionList(rl, sorted_rlist)
 
     ug = UniformGenerator(exit_rstr)
 

Modified: torflow/trunk/TorCtl/TorCtl.py
===================================================================
--- torflow/trunk/TorCtl/TorCtl.py	2007-03-02 20:00:37 UTC (rev 9714)
+++ torflow/trunk/TorCtl/TorCtl.py	2007-03-02 20:16:36 UTC (rev 9715)
@@ -288,7 +288,7 @@
                 if self._handler is not None:
                     self._eventQueue.put(reply)
             else:
-                cb = self._queue.get()
+                cb = self._queue.get() # XXX: lock?
                 cb(reply)
 
     def _err(self, (tp, ex, tb), fromEventLoop=0):
@@ -524,9 +524,9 @@
             rj = re.search(r"^reject (\S+):([^-]+)(?:-(\d+))?", line)
             bw = re.search(r"^bandwidth \d+ \d+ (\d+)", line)
             if re.search(r"^opt hibernating 1", line):
-                dead = 1 # XXX: Technically this may be stale..
+                #dead = 1 # XXX: Technically this may be stale..
                 if ("Running" in ns.flags):
-                    plog("NOTICE", "Hibernating router is running..")
+                    plog("NOTICE", "Hibernating router "+ns.nickname+" is running..")
             if ac:
                 exitpolicy.append(ExitPolicyLine(True, *ac.groups()))
             elif rj:
@@ -698,7 +698,7 @@
         """Dispatcher: called from Connection when an event is received."""
         for code, msg, data in lines:
             event = self.decode1(msg, data)
-            self.heartbeat_event()
+            self.heartbeat_event(event)
             self._map1.get(event.event_name, self.unknown_event)(event)
 
     def decode1(self, body, data):
@@ -783,9 +783,9 @@
 
         return event
 
-    def heartbeat_event(self):
-        """Called every time any event is recieved. Convenience function
-           for any cleanup you may need to do.
+    def heartbeat_event(self, event):
+        """Called before any event is recieved. Convenience function
+           for any cleanup/setup/reconfiguration you may need to do.
         """
         pass
 

Modified: torflow/trunk/metatroller.py
===================================================================
--- torflow/trunk/metatroller.py	2007-03-02 20:00:37 UTC (rev 9714)
+++ torflow/trunk/metatroller.py	2007-03-02 20:16:36 UTC (rev 9715)
@@ -14,6 +14,7 @@
 import datetime
 import threading
 import struct
+import copy
 from TorCtl import *
 from TorCtl.TorUtil import *
 from TorCtl.PathSupport import *
@@ -35,7 +36,6 @@
 meta_host = "127.0.0.1"
 meta_port = 9052
 max_detach = 3
-order_exits = False
 
 # Thread shared variables. Relying on GIL for weak atomicity (really
 # we only care about corruption.. GIL prevents that, so no locking needed)
@@ -44,14 +44,12 @@
 
 last_exit = None
 resolve_port = 0
-percent_fast = 100
-percent_skip = 0
-pathlen = 3
-min_bw = 0
 num_circuits = 1 # TODO: Use
-use_all_exits = False
 new_nym = False
+pathlen=3
 
+
+
 # Technically we could just add member vars as we need them, but this
 # is a bit more clear
 class MetaRouter(TorCtl.Router):
@@ -71,6 +69,8 @@
         self.detached_cnt = 0
         self.used_cnt = 0
         self.created_at = datetime.datetime.now()
+        # XXX: build time (export to stats)
+        # XXX: Study timeout's effect on success rates...
         self.pending_streams = [] # Which stream IDs are pending us
 
     
@@ -83,35 +83,119 @@
         self.host = host
         self.port = port
 
+# XXX: Scheduled to be moved to PathSupport
+class SelectionManager:
+    """Helper class to handle configuration updates
+      
+      The methods are NOT threadsafe. They may ONLY be called from
+      EventHandler's thread.
+
+      However, the variables defined in init can be modified anywhere."""
+    def __init__(self, order_exits, percent_fast, percent_skip, min_bw,
+                 use_all_exits, uniform, use_exit, use_guards):
+        "The member variables defined here may be modified by other threads"
+        self.__ordered_exit_gen = None # except this one ;)
+        self.order_exits = order_exits
+        self.percent_fast = percent_fast
+        self.percent_skip = percent_skip
+        self.min_bw = min_bw
+        self.use_all_exits = use_all_exits
+        self.uniform = uniform
+        self.exit_name = use_exit
+        self.use_guards = use_guards
+
+    def reconfigure(self):
+        """
+        Member variables from this funciton should not be modified by other
+        threads.
+        """
+        if self.use_all_exits:
+            self.path_rstr = PathRestrictionList([])
+        else:
+            self.path_rstr = PathRestrictionList(
+                     [Subnet16Restriction(), UniqueRestriction()])
+            
+        self.entry_rstr = NodeRestrictionList(
+            [
+             PercentileRestriction(self.percent_skip, self.percent_fast,
+                sorted_r),
+             ConserveExitsRestriction(),
+             FlagsRestriction(["Guard", "Valid", "Running"], [])
+             ], sorted_r)
+        self.mid_rstr = NodeRestrictionList(
+            [PercentileRestriction(self.percent_skip, self.percent_fast,
+                sorted_r),
+             ConserveExitsRestriction(),
+             FlagsRestriction(["Valid", "Running"], [])], sorted_r)
+
+        if self.use_all_exits:
+            self.exit_rstr = NodeRestrictionList(
+                [FlagsRestriction(["Valid", "Running"], ["BadExit"])], sorted_r)
+        else:
+            self.exit_rstr = NodeRestrictionList(
+                [PercentileRestriction(self.percent_skip, self.percent_fast,
+                   sorted_r),
+                 FlagsRestriction(["Valid", "Running"], ["BadExit"])],
+                 sorted_r)
+
+        if self.exit_name:
+            if self.exit_name[0] == '$':
+                self.exit_rstr.add_restriction(IdHexRestriction(self.exit_name))
+            else:
+                self.exit_rstr.add_restriction(NickRestriction(self.exit_name))
+
+        # This is kind of hokey..
+        if self.order_exits:
+            if self.__ordered_exit_gen:
+                exitgen = self.__ordered_exit_gen
+            else:
+                exitgen = self.__ordered_exit_gen = \
+                    OrderedExitGenerator(self.exit_rstr, 80)
+        else:
+            exitgen = UniformGenerator(self.exit_rstr)
+
+        if self.uniform:
+            self.path_selector = PathSelector(
+                 UniformGenerator(self.entry_rstr),
+                 UniformGenerator(self.mid_rstr),
+                 exitgen, self.path_rstr)
+        else:
+            raise NotImplemented()
+
+    def set_target(self, ip, port):
+        self.exit_rstr.del_restriction(ExitPolicyRestriction)
+        self.exit_rstr.add_restriction(ExitPolicyRestriction(ip, port))
+        if self.__ordered_exit_gen: self.__ordered_exit_gen.set_port(port)
+
+    def update_routers(self, new_rlist):
+        self.entry_rstr.update_routers(new_rlist)
+        self.mid_rstr.update_routers(new_rlist)
+        self.exit_rstr.update_routers(new_rlist)
+
+
+selmgr = SelectionManager(
+         order_exits=False,
+         percent_fast=100,
+         percent_skip=0,
+         min_bw=1024,
+         use_all_exits=False,
+         uniform=True,
+         use_exit=None,
+         use_guards=False)
+
 # TODO: Make passive mode so people can get aggregate node reliability 
 # stats for normal usage without us attaching streams
 
-# Make eventhandler
+# XXX: Scheduled to be moved to PathSupport and refactored
 class SnakeHandler(TorCtl.EventHandler):
-    def __init__(self, c):
+    def __init__(self, c, slnmgr):
         TorCtl.EventHandler.__init__(self)
         self.c = c
         nslist = c.get_network_status()
         self.read_routers(nslist)
+        self.selmgr = slnmgr
+        self.selmgr.reconfigure()
         plog("INFO", "Read "+str(len(sorted_r))+"/"+str(len(nslist))+" routers")
-        self.path_rstr = PathRestrictionList(
-                 [Subnet16Restriction(), UniqueRestriction()])
-        self.entry_rstr = NodeRestrictionList(
-            [PercentileRestriction(percent_skip, percent_fast, sorted_r),
-             ConserveExitsRestriction(),
-             FlagsRestriction(["Guard", "Valid", "Running"], [])], sorted_r)
-        self.mid_rstr = NodeRestrictionList(
-            [PercentileRestriction(percent_skip, percent_fast, sorted_r),
-             ConserveExitsRestriction(),
-             FlagsRestriction(["Valid", "Running"], [])], sorted_r)
-        self.exit_rstr = NodeRestrictionList(
-            [PercentileRestriction(percent_skip, percent_fast, sorted_r),
-             FlagsRestriction(["Valid", "Running", "Exit"], ["BadExit"])],
-             sorted_r)
-        self.path_selector = PathSelector(
-             UniformGenerator(self.entry_rstr),
-             UniformGenerator(self.mid_rstr),
-             OrderedExitGenerator(self.exit_rstr, 80), self.path_rstr)
 
     def read_routers(self, nslist):
         new_routers = map(MetaRouter, self.c.read_routers(nslist))
@@ -140,7 +224,6 @@
                          +" pending streams")
                     unattached_streams.extend(circuits[key].pending_streams)
                 # FIXME: Consider actually closing circ if no streams.
-                # Or send Tor a SIGNAL NEWNYM and let it do it.
                 del circuits[key]
             
         for circ in circuits.itervalues():
@@ -159,12 +242,10 @@
         else:
             circ = None
             while circ == None:
-                self.exit_rstr.del_restriction(ExitPolicyRestriction)
-                self.exit_rstr.add_restriction(
-                     ExitPolicyRestriction(stream.host, stream.port))
+                self.selmgr.set_target(stream.host, stream.port)
                 try:
                     circ = MetaCircuit(self.c.build_circuit(pathlen,
-                                    self.path_selector))
+                                    self.selmgr.path_selector))
                 except TorCtl.ErrorReply, e:
                     # FIXME: How come some routers are non-existant? Shouldn't
                     # we have gotten an NS event to notify us they
@@ -179,9 +260,11 @@
         global last_exit # Last attempted exit
         last_exit = circ.exit
 
-    def heartbeat_event(self):
-        # XXX: Config updates to selectors
-        pass
+    def heartbeat_event(self, event):
+        global selmgr
+        if id(self.selmgr) != id(selmgr):
+            self.selmgr = selmgr
+            self.selmgr.reconfigure()
 
     def circ_status_event(self, c):
         output = [c.event_name, str(c.circ_id), c.status]
@@ -214,13 +297,13 @@
         if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
             s.target_host = "255.255.255.255" # ignore DNS for exit policy check
         if s.status == "NEW" or s.status == "NEWRESOLVE":
-            global circuits
+            if s.status == "NEWRESOLVE" and not s.target_port:
+                s.target_port = resolve_port
             streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port)
 
             self.attach_stream_any(streams[s.strm_id],
                                    streams[s.strm_id].detached_from)
         elif s.status == "DETACHED":
-            global circuits
             if s.strm_id not in streams:
                 plog("WARN", "Detached stream "+str(s.strm_id)+" not found")
                 streams[s.strm_id] = Stream(s.strm_id, s.target_host,
@@ -283,20 +366,22 @@
         self.read_routers(n.nslist)
         plog("DEBUG", "Read " + str(len(n.nslist))+" NS => " 
              + str(len(sorted_r)) + " routers")
-        self.entry_rstr.update_routers(sorted_r)
-        self.mid_rstr.update_routers(sorted_r)
-        self.exit_rstr.update_routers(sorted_r)
+        self.selmgr.update_routers(sorted_r)
     
     def new_desc_event(self, d):
         for i in d.idlist: # Is this too slow?
             self.read_routers(self.c.get_network_status("id/"+i))
         plog("DEBUG", "Read " + str(len(d.idlist))+" Desc => " 
              + str(len(sorted_r)) + " routers")
-        self.entry_rstr.update_routers(sorted_r)
-        self.mid_rstr.update_routers(sorted_r)
-        self.exit_rstr.update_routers(sorted_r)
-        
-def commandloop(s):
+        self.selmgr.update_routers(sorted_r)
+
+def clear_dns_cache(c):
+    lines = c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
+    for _,msg,more in lines:
+        plog("DEBUG", msg)
+ 
+def commandloop(s, c):
+    global selmgr
     s.write("220 Welcome to the Tor Metatroller "+mt_version+"! Try HELP for Info\r\n\r\n")
     while 1:
         buf = s.readline()
@@ -314,23 +399,30 @@
         elif command == "NEWEXIT" or command == "NEWNYM":
             global new_nym
             new_nym = True
+            clear_dns_cache(c)
             plog("DEBUG", "Got new nym")
             s.write("250 NEWNYM OK\r\n")
         elif command == "GETDNSEXIT":
-            pass # TODO
+            pass # TODO: Takes a hostname? Or prints most recent?
         elif command == "RESETSTATS":
             s.write("250 OK\r\n")
         elif command == "ORDEREXITS":
-            global order_exits
             try:
-                if arg: order_exits = int(arg)
+                if arg:
+                    order_exits = int(arg)
+                    newmgr = copy.copy(selmgr)
+                    newmgr.order_exits = order_exits
+                    selmgr = newmgr
                 s.write("250 ORDEREXITS="+str(order_exits)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
         elif command == "USEALLEXITS":
-            global use_all_exits
             try:
-                if arg: use_all_exits = int(arg)
+                if arg:
+                    use_all_exits = int(arg)
+                    newmgr = copy.copy(selmgr)
+                    newmgr.use_all_exits = use_all_exits
+                    selmgr = newmgr
                 s.write("250 USEALLEXITS="+str(use_all_exits)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -349,23 +441,32 @@
             except ValueError:
                 s.write("510 Integer expected\r\n")
         elif command == "PERCENTFAST":
-            global percent_fast
             try:
-                if arg: percent_fast = int(arg)
+                if arg:
+                    percent_fast = int(arg)
+                    newmgr = copy.copy(selmgr)
+                    newmgr.percent_fast = percent_fast
+                    selmgr = newmgr
                 s.write("250 PERCENTFAST="+str(percent_fast)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
         elif command == "PERCENTSKIP":
-            global percent_skip
             try:
-                if arg: percent_skip = int(arg)
+                if arg:
+                    percent_skip = int(arg)
+                    newmgr = copy.copy(selmgr)
+                    newmgr.percent_skip = percent_skip
+                    selmgr = newmgr
                 s.write("250 PERCENTSKIP="+str(percent_skip)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
         elif command == "BWCUTOFF":
-            global min_bw
             try:
-                if arg: min_bw = int(arg)
+                if arg:
+                    min_bw = int(arg)
+                    newmgr = copy.copy(selmgr)
+                    newmgr.min_bw = min_bw
+                    selmgr = newmgr
                 s.write("250 BWCUTOFF="+str(min_bw)+" OK\r\n")
             except ValueError:
                 s.write("510 Integer expected\r\n")
@@ -404,7 +505,7 @@
     while 1:
         client = srv.accept()
         if not client: break
-        thr = threading.Thread(None, lambda: commandloop(BufSock(client)))
+        thr = threading.Thread(None, lambda: commandloop(BufSock(client), c))
         thr.run()
     srv.close()
 
@@ -414,7 +515,7 @@
     c = PathSupport.Connection(s)
     c.debug(file("control.log", "w"))
     c.authenticate()
-    c.set_event_handler(SnakeHandler(c))
+    c.set_event_handler(SnakeHandler(c, selmgr))
     c.set_events([TorCtl.EVENT_TYPE.STREAM,
                   TorCtl.EVENT_TYPE.NS,
                   TorCtl.EVENT_TYPE.CIRC,

Modified: torflow/trunk/soat.pl
===================================================================
--- torflow/trunk/soat.pl	2007-03-02 20:00:37 UTC (rev 9714)
+++ torflow/trunk/soat.pl	2007-03-02 20:16:36 UTC (rev 9715)
@@ -51,7 +51,7 @@
         "mail.google.com",
         "www.relakks.com",
         "www.hushmail.com",
-        "login.yahoo.com",
+#        "login.yahoo.com",
         "www.fastmail.fm",
         "addons.mozilla.org"
 );