[minion-cvs] Break and fix server repeatedly until DNS-based routing...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv21642/lib/mixminion/server

Modified Files:
	DNSFarm.py MMTPServer.py PacketHandler.py ServerConfig.py 
	ServerKeys.py ServerMain.py ServerQueue.py 
Log Message:
Break and fix server repeatedly until DNS-based routing works.  Hurray.

TODO:
 - Reflect state of work
 - Defer client-side fragment reassembly

setup.py
 - Bump version to 0.0.6alpha2

BuildMessage.py
 - Choose relay types and routing info based on server capabilities -- don't
   just assume that IPV4 is right for everyone.

ClientDirectory.py
 - Document everything.
 - Remove spurious isSURB field from ExitAddress
 - Fix theoretical bug that would crash path generation with non-mixminion
   servers.
 - Deprecate unadorned '*' in paths.
 - Note bug with path length checking.

ClientMain.py
 - Deprecate -H; use -P foo instead.
 - Make list-servers work

Config.py, NetUtils.py:
 - Refactor _parseIP and _parseIP6 validation functions into NetUtils.

Config.py
 - Add documentation

NetUtils.py
 - Add documentation
 - Debug getIP
 - Add function to detect static IP4 or IP6 addresses.

Packet.py
 - Debug parseRelayInfoByType
 - Documentation

ServerInfo.py:
 - Documentation
 - Disable hostname-based routing with 0.0.6alpha1 servers.  (I don't
   want to break Tonga and peertech.)
 
test.py:
 - Add tests for DNS functionality
 - Add tests for DNS farm functionality
 - Add tests for new ServerInfo methods

DNSFarm.py
 - Add documentation
 - Make DNSCache.shutdown() more failsafe, and make shutdown(wait=1) not
   deadlock the server.
 - Add special-case test to skip hostname lookup for static IP addresses
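
The static-IP special case above could look roughly like the sketch below. It
is not the code from NetUtils.py or DNSFarm.py: the names name_is_static_ip and
lookup_with_shortcut are hypothetical, the return shape (family, address) is an
assumption, and it is written for modern Python 3 rather than the Python 2 of
this commit.

```python
import socket

def name_is_static_ip(name):
    """Hypothetical helper: return (family, address) if 'name' is a literal
    IPv4 or IPv6 address, or None if it looks like a hostname."""
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            # inet_pton only parses the text; it performs no DNS lookup.
            socket.inet_pton(family, name)
        except (OSError, ValueError):
            continue
        return (family, name)
    return None

def lookup_with_shortcut(name, callback, queue_lookup):
    """Answer literal addresses immediately; hand everything else to
    'queue_lookup', a stand-in for the real asynchronous resolver."""
    literal = name_is_static_ip(name)
    if literal is not None:
        callback(name, literal)
        return
    queue_lookup(name, callback)
```

The point of the shortcut is that a literal address never occupies a DNS
thread or a cache slot; only real hostnames reach the queue.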

MMTPServer.py
 - Make sure that we get real RelayPacket objects.
 - Choose whether to use IP4 or IP6 connections in MMTPClientConnection

PacketHandler.py
 - Accept HOST relay types and MMTPHostInfo routinginfo.

ServerConfig.py:
 - Remove obsolete __DEBUG_GC option.

ServerMain.py:
 - Change same-server detection mechanism to only look at key ID.

ServerQueue.py:
 - Add assert to catch weird bug case.
 



Index: DNSFarm.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/DNSFarm.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- DNSFarm.py	7 Nov 2003 08:11:36 -0000	1.3
+++ DNSFarm.py	10 Nov 2003 04:12:20 -0000	1.4
@@ -1,86 +1,137 @@
 # Copyright 2003 Nick Mathewson.  See LICENSE for licensing information.
 # $Id$
 
-"""mixminion.server.DNSFarm DOCDOC"""
+"""mixminion.server.DNSFarm: code to implement asynchronous DNS resolves with
+   background threads and cache the results.
+   """
 
 import socket
 import threading
 import time
 import sys
 from mixminion.Common import LOG, TimeoutQueue, QueueEmpty
-from mixminion.NetUtils import getIP
+from mixminion.NetUtils import getIP, nameIsStaticIP
 
 __all__ = [ 'DNSCache' ]
 
 class _Pending:
+    """Class to represent resolves that we're waiting for an answer on."""
     def __cmp__(self,o):
         return cmp(type(self), type(o))
-PENDING = _Pending
+PENDING = _Pending()
 
+# We never shutdown threads if doing so would leave fewer than MIN_THREADS.
 MIN_THREADS = 2
+# We never shutdown threads if doing so would leave fewer than MIN_FREE_THREADS
+# idle threads.
 MIN_FREE_THREADS = 1
+# We never start a new thread if doing so would make more than MAX_THREADS.
 MAX_THREADS = 8
+# Subject to MIN_THREADS and MIN_FREE_THREADS, we shutdown threads when
+# they're idle for more than MAX_THREAD_IDLE seconds.
 MAX_THREAD_IDLE = 5*60
-MAX_ENTRY_TTL = 15*60
-
+# We clear entries from the DNS cache when they're more than MAX_ENTRY_TTL
+# seconds old.
+MAX_ENTRY_TTL = 30*60
 
 class DNSCache:
-    """DOCDOC"""
+    """Class to cache answers to DNS requests and manage DNS threads."""
+    ## Fields:
+    # _isShutdown: boolean: are the threads shutting down?  (While the
+    #     threads are shutting down, we don't answer any requests.)
+    # cache: map from name to PENDING or getIP result.
+    # callbacks: map from name to list of callback functions. (See lookup
+    #     for definition of callback.)
+    # lock: Lock to control access to this class's shared state.
+    # nBusyThreads: Number of threads that are currently handling requests.
+    # nLiveThreads: Number of threads that are currently running.
+    # queue: Instance of TimeoutQueue that holds either names to resolve,
+    #     or instances of None to shutdown threads.
+    # threads: List of DNSThreads, some of which may be dead.
     def __init__(self):
-        self.cache = {} # name -> getIP return / PENDING
+        """Create a new DNSCache"""
+        self.cache = {}
         self.callbacks = {}
         self.lock = threading.RLock()
         self.queue = TimeoutQueue()
         self.threads = []
         self.nLiveThreads = 0
         self.nBusyThreads = 0
+        self._isShutdown = 0
         self.cleanCache()
     def getNonblocking(self, name):
+        """Return the cached result for the lookup of name.  If we're
+           waiting for an answer, return PENDING.  If there is no cached
+           result, return None.
+        """
         try:
             self.lock.acquire()
             return self.cache.get(name)
         finally:
             self.lock.release()
     def lookup(self,name,cb):
+        """Look up the name 'name', and pass the result to the callback
+           function 'cb' when we're done.  The result will be of the
+           same form as the return value of NetUtils.getIP: either
+           (Family, Address, Time) or ('NOENT', Reason, Time).
+
+           Note: The callback may be invoked from a different thread.  Either
+           this thread or a DNS thread will block until the callback finishes,
+           so it shouldn't be especially time-consuming.
+        """
+        # Check for a static IP first; no need to resolve that.
+        v = nameIsStaticIP(name)
+        if v is not None:
+            cb(name,v)
+            return
         try:
             self.lock.acquire()
             v = self.cache.get(name)
+            # If we don't have a cached answer, add cb to self.callbacks
             if v is None or v is PENDING:
                 self.callbacks.setdefault(name, []).append(cb)
-            #XXXX006 We should check for literal addresses before we queue
+            # If we aren't looking up the answer, start looking it up.
             if v is None:
                 self._beginLookup(name)
         finally:
             self.lock.release()
+        # If we _did_ have an answer, invoke the callback now.
         if v is not None and v is not PENDING:
             cb(name,v)
+
     def shutdown(self, wait=0):
+        """Tell all the DNS threads to shut down.  If 'wait' is true,
+           wait until all the threads have completed."""
         try:
             self.lock.acquire()
+            self._isShutdown = 1
             self.queue.clear()
             for _ in xrange(self.nLiveThreads*2):
                 self.queue.put(None)
-            if wait:
-                for thr in self.threads:
-                    thr.join()
         finally:
             self.lock.release()
-    def cleanCache(self):
+
+        if wait:
+            for thr in self.threads:
+                thr.join()
+
+    def cleanCache(self,now=None):
+        """Remove all expired entries from the cache."""
+        if now is None:
+            now = time.time()
         try:
             self.lock.acquire()
 
-            # Purge old entries
-            now = time.time()
-            cache = self.cache            
+            # Purge old entries from the cache.
+            cache = self.cache
             for name in cache.keys():
                 v = cache[name]
                 if v is PENDING: continue
                 if now-v[2] > MAX_ENTRY_TTL:
                     del cache[name]
 
-            # Remove dead threads from self.threads
-            self.threads = [ thr for thr in self.threads
-                             if thr.isLive() ]
+            # Remove dead threads from self.threads.
+            self.threads = [ thr for thr in self.threads if thr.isAlive() ]
 
             # Make sure we have enough threads.
             if len(self.threads) < MIN_THREADS:
@@ -89,18 +140,35 @@
                     self.threads[-1].start()
         finally:
             self.lock.release()
+            
     def _beginLookup(self,name):
-        # Must hold lock
+        """Helper function: Begin looking up 'name'.
+
+           Caller must hold self.lock
+        """
         self.cache[name] = PENDING
+        if self._isShutdown:
+            # If we've shut down the threads, don't queue the request at
+            # all; it'll stay pending indefinitely.
+            return
+        # Queue the request.
         self.queue.put(name)
+        # If there aren't enough idle threads, and if we haven't maxed
+        # out the threads, start a new one.
         if (self.nLiveThreads < self.nBusyThreads + MIN_FREE_THREADS
             and self.nLiveThreads < MAX_THREADS):
-            self.threads.append(DNSThread(self))
-            self.threads[-1].start()
+            thread = DNSThread(self)
+            thread.start()
+            self.threads.append(thread)
     def _lookupDone(self,name,val):
+        """Helper function: invoked when we get the answer 'val' for
+           a lookup of 'name'.
+           """
         try:
             self.lock.acquire()
+            # Insert the value in the cache.
             self.cache[name]=val
+            # Get the callbacks for the name, if any.
             cbs = self.callbacks.get(name,[])
             try:
                 del self.callbacks[name]
@@ -108,34 +176,51 @@
                 pass
         finally:
             self.lock.release()
+        # Now that we've released the lock, invoke the callbacks.
         for cb in cbs:
             cb(name,val)
     def _adjLiveThreads(self,n):
+        """Helper: adjust the number of live threads by n"""
         self.lock.acquire()
         self.nLiveThreads += n
         self.lock.release()
     def _adjBusyThreads(self,n):
+        """Helper: adjust the number of busy threads by n"""
         self.lock.acquire()
         self.nBusyThreads += n
         self.lock.release()
 
 class DNSThread(threading.Thread):
+    """Helper class: used by DNSCache to implement name resolution."""
+    ## Fields:
+    # dnscache: The DNSCache object that should receive our answers
     def __init__(self, dnscache):
+        """Create a new DNSThread"""
         threading.Thread.__init__(self)
-        self.dnscache = dnscache
-        self.setDaemon(1)
+        self.dnscache = dnscache 
+        self.setDaemon(1) # When the process exits, don't wait for this thread.
     def run(self):
-        self.dnscache._adjLiveThreads(1)
+        """Thread body: pull questions from the DNS thread queue and
+           answer them."""
+        queue = self.dnscache.queue
+        _lookupDone = self.dnscache._lookupDone
+        _adjBusyThreads = self.dnscache._adjBusyThreads
+        _adjLiveThreads = self.dnscache._adjLiveThreads
         try:
+            _adjLiveThreads(1)
             try:
                 while 1:
-                    hostname = self.dnscache.queue.get(timeout=MAX_THREAD_IDLE)
+                    # Get a question from the queue, but don't wait more than
+                    # MAX_THREAD_IDLE seconds
+                    hostname = queue.get(timeout=MAX_THREAD_IDLE)
+                    # If the question is None, shutdown.
                     if hostname is None:
                         return
-                    self.dnscache._adjBusyThreads(1)
+                    # Else, resolve the IP and send the answer to the dnscache
+                    _adjBusyThreads(1)
                     result = getIP(hostname)
-                    self.dnscache._lookupDone(hostname, result)
-                    self.dnscache._adjBusyThreads(-1)
+                    _lookupDone(hostname, result)
+                    _adjBusyThreads(-1)
             except QueueEmpty:
                 LOG.debug("DNS thread shutting down: idle for %s seconds.",
                          MAX_THREAD_IDLE)
@@ -143,6 +228,5 @@
                 LOG.error_exc(sys.exc_info(),
                               "Exception in DNS thread; shutting down.")
         finally:
-            self.dnscache.adjLiveThreads(-1)
-
+            _adjLiveThreads(-1)
 

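The shutdown() change in the DNSFarm.py diff above moves the join() calls
outside the locked region. A plausible reading is that worker threads need the
same lock on their way out (to decrement the live-thread count), so joining
them while holding it can never finish. The sketch below illustrates that
pattern only; WorkerPool and its fields are hypothetical names, not part of
mixminion, and it uses modern Python 3.

```python
import queue
import threading

class WorkerPool:
    """Hypothetical stand-in for a pool of resolver threads."""
    def __init__(self, n_workers=2):
        self.lock = threading.RLock()
        self.queue = queue.Queue()
        self.n_live = 0
        self.threads = []
        for _ in range(n_workers):
            t = threading.Thread(target=self._run, daemon=True)
            self.threads.append(t)
            t.start()

    def _run(self):
        with self.lock:
            self.n_live += 1
        try:
            while True:
                item = self.queue.get()
                if item is None:      # None is the shutdown signal
                    return
        finally:
            # The worker needs the lock to exit cleanly, so nobody may
            # hold it while waiting for the worker to finish.
            with self.lock:
                self.n_live -= 1

    def shutdown(self, wait=False):
        with self.lock:
            for _ in self.threads:
                self.queue.put(None)
        # Join only after the lock has been released.
        if wait:
            for t in self.threads:
                t.join()
```

If the join loop were moved back inside the "with self.lock" block, every
worker would block in its finally clause and shutdown(wait=True) would hang.
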
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.55
retrieving revision 1.56
diff -u -d -r1.55 -r1.56
--- MMTPServer.py	7 Nov 2003 08:11:36 -0000	1.55
+++ MMTPServer.py	10 Nov 2003 04:12:20 -0000	1.56
@@ -32,7 +32,7 @@
 from mixminion.Crypto import sha1, getCommonPRNG
 from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN, IPV4Info, MMTPHostInfo
 from mixminion.MMTPClient import PeerCertificateCache
-from mixminion.NetUtils import IN_PROGRESS_ERRNOS, getProtocolSupport
+from mixminion.NetUtils import IN_PROGRESS_ERRNOS, getProtocolSupport, AF_INET, AF_INET6
 import mixminion.server.EventStats as EventStats
 from mixminion.Filestore import CorruptedFile
 
@@ -763,6 +763,9 @@
        PacketHandler.RelayPacket objects."""
     def __init__(self, pending):
         DeliverableMessage.__init__(self)
+        assert hasattr(pending, 'succeeded')
+        assert hasattr(pending, 'failed')
+        assert hasattr(pending, 'getMessage')        
         self.pending = pending
     def succeeded(self):
         self.pending.succeeded()
@@ -823,8 +826,11 @@
         if certCache is None:
             certCache = PeerCertificateCache()
         self.certCache = certCache
-
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        if ':' in ip:
+            family = AF_INET6
+        else:
+            family = AF_INET
+        sock = socket.socket(family, socket.SOCK_STREAM)
         sock.setblocking(0)
         self.keyID = keyID
         self.ip = ip
@@ -1087,8 +1093,8 @@
             port = config['Incoming/MMTP']['Port']
 
         self.listeners = [] #DOCDOC
-        for (supported, addr, family) in [(ip4_supported,IP,socket.AF_INET),
-                                          (ip6_supported,IP6,socket.AF_INET6)]:
+        for (supported, addr, family) in [(ip4_supported,IP,AF_INET),
+                                          (ip6_supported,IP6,AF_INET6)]:
             if not supported or not addr:
                 continue
             listener = ListenConnection(family, addr, port,
@@ -1136,7 +1142,7 @@
     def sendMessagesByRouting(self, routing, deliverable):
         """DOCDOC"""
         if isinstance(routing, IPV4Info):
-            self.sendMessages(socket.AF_INET, routing.ip, routing.port,
+            self.sendMessages(AF_INET, routing.ip, routing.port,
                               routing.keyinfo, deliverable)
         else:
             assert isinstance(routing, MMTPHostInfo)

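The MMTPClientConnection hunk above picks the socket family by checking
whether the address text contains a colon. A minimal sketch of that heuristic
follows, in modern Python 3; family_for_address and make_nonblocking_socket
are hypothetical helper names, and the sketch assumes the argument is already
a literal address rather than a hostname.

```python
import socket

def family_for_address(ip):
    """Guess the address family from the textual form of a literal address.
    IPv6 literals always contain ':'; dotted-quad IPv4 literals never do."""
    if ":" in ip:
        return socket.AF_INET6
    return socket.AF_INET

def make_nonblocking_socket(ip):
    """Create a non-blocking TCP socket of the family matching 'ip'."""
    sock = socket.socket(family_for_address(ip), socket.SOCK_STREAM)
    sock.setblocking(False)
    return sock
```

The check is cheap but purely syntactic: it does not validate the address, so
malformed input only fails later, at connect time.
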
Index: PacketHandler.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/PacketHandler.py,v
retrieving revision 1.29
retrieving revision 1.30
diff -u -d -r1.29 -r1.30
--- PacketHandler.py	13 Oct 2003 17:30:24 -0000	1.29
+++ PacketHandler.py	10 Nov 2003 04:12:20 -0000	1.30
@@ -212,7 +212,8 @@
 
         # If we're not an exit node, make sure that what we recognize our
         # routing type.
-        if rt not in (Packet.SWAP_FWD_IPV4_TYPE, Packet.FWD_IPV4_TYPE):
+        if rt not in (Packet.SWAP_FWD_IPV4_TYPE, Packet.FWD_IPV4_TYPE,
+                      Packet.SWAP_FWD_HOST_TYPE, Packet.FWD_HOST_TYPE):
             raise ContentError("Unrecognized Mixminion routing type")
 
         # Decrypt header 2.
@@ -232,7 +233,7 @@
             header1, header2 = header2, header1
 
         # Build the address object for the next hop
-        address = Packet.parseIPV4Info(subh.routinginfo)
+        address = Packet.parseRelayInfoByType(rt, subh.routinginfo)
 
         # Construct the message for the next hop.
         pkt = Packet.Packet(header1, header2, payload).pack()
@@ -246,9 +247,9 @@
     # address -- an instance of IPV4Info
     # msg -- a 32K packet.
     def __init__(self, address, msg):
-        """Create a new packet, given an instance of IPV4Info and a 32K
-           packet."""
-        assert isinstance(address, Packet.IPV4Info)
+        """Create a new packet, given an instance of IPV4Info or
+           MMTPHostInfo and a 32K packet."""
+        assert isinstance(address, Packet.IPV4Info) or isinstance(address, Packet.MMTPHostInfo)
         assert len(msg) == 1<<15
         self.address = address
         self.msg = msg
@@ -258,8 +259,8 @@
         return 0
 
     def getAddress(self):
-        """Return an instance of IPV4Info indicating the address where this
-           packet is to be delivered."""
+        """Return an instance of IPV4Info or MMTPHostInfo indicating
+           the address where this packet is to be delivered."""
         return self.address
 
     def getPacket(self):

Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.39
retrieving revision 1.40
diff -u -d -r1.39 -r1.40
--- ServerConfig.py	7 Nov 2003 07:03:28 -0000	1.39
+++ ServerConfig.py	10 Nov 2003 04:12:20 -0000	1.40
@@ -289,8 +289,6 @@
                      'MixPoolRate' : ('ALLOW', "fraction", "60%"),
                      'MixPoolMinSize' : ('ALLOW', "int", "5"),
 		     'Timeout' : ('ALLOW', "interval", "5 min"),
-                     #XXXX006 remove this.
-                     '__DEBUG_GC' : ('ALLOW', "boolean", "no"),
                      },
         'DirectoryServers' : { # '__SECTION__' : ('REQUIRE', None, None),
                                'ServerURL' : ('ALLOW*', None, None),

Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.56
retrieving revision 1.57
diff -u -d -r1.56 -r1.57
--- ServerKeys.py	7 Nov 2003 08:11:36 -0000	1.56
+++ ServerKeys.py	10 Nov 2003 04:12:20 -0000	1.57
@@ -1196,6 +1196,7 @@
                     LOG.warn("Hostname %r resolves to reserved address %s",
                              name,addr)
     except socket.error, e:
+        # XXXX006 Turn into a warning?
         raise UIError("Cannot resolve hostname %r: %s"%(name,e))
     _KNOWN_LOCAL_HOSTNAMES[name] = 1
 

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.99
retrieving revision 1.100
diff -u -d -r1.99 -r1.100
--- ServerMain.py	7 Nov 2003 08:11:36 -0000	1.99
+++ ServerMain.py	10 Nov 2003 04:12:20 -0000	1.100
@@ -300,7 +300,7 @@
         mixminion.server.ServerQueue.DeliveryQueue.__init__(self, location)
         self.server = None
         self.incomingQueue = None
-        self.addr = (ip,port,keyid) #XXXX006 need to detect same host.
+        self.keyID = keyid
 
     def configure(self, config):
         """Set up this queue according to a ServerConfig object."""
@@ -327,9 +327,7 @@
                 continue
             msgs.setdefault(addr, []).append(pending)
         for routing, messages in msgs.items():
-            if self.addr[:2] == (routing.ip, routing.port): #XXX006 detect host
-                if self.addr[2] != routing.keyinfo:
-                    LOG.warn("Delivering messages to myself with bad KeyID")
+            if self.keyID == routing.keyinfo:
                 for pending in messages:
                     LOG.trace("Delivering message OUT:%s to myself.",
                               pending.getHandle())
@@ -1056,12 +1054,6 @@
         LOG.fatal_exc(info,"Exception while configuring server")
         LOG.fatal("Shutting down because of exception: %s", info[0])
         sys.exit(1)
-
-    # Undocumented feature to cajole python into dumping gc info.
-    if config['Server']['__DEBUG_GC']:
-        import gc
-        gc.set_debug(gc.DEBUG_STATS|gc.DEBUG_COLLECTABLE|gc.DEBUG_UNCOLLECTABLE
-                     |gc.DEBUG_INSTANCES|gc.DEBUG_OBJECTS)
 
     if daemonMode:
         LOG.info("Starting server in the background")

Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.34
retrieving revision 1.35
diff -u -d -r1.34 -r1.35
--- ServerQueue.py	28 Sep 2003 04:12:29 -0000	1.34
+++ ServerQueue.py	10 Nov 2003 04:12:21 -0000	1.35
@@ -176,6 +176,7 @@
     def getMessage(self):
         """Return the underlying object stored in the delivery queue, loading
            it from disk if necessary. May raise CorruptedFile."""
+        assert self.handle is not None
         if self.message is None:
             self.message = self.queue.store.getObject(self.handle)
         return self.message