[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] More work on multithreaded server, other goodies. It s...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv10789/lib/mixminion/server

Modified Files:
	Modules.py PacketHandler.py ServerMain.py ServerQueue.py 
Log Message:
More work on multithreaded server, other goodies.  It seems more stable now.

setup.py:
- Remove old mixminion/server/Queue.py where found

ClientMain, ServerMain:
- Don't use stderr if it isn't an error.

Common:
- Call 'Queue' 'MessageQueue' to avoid potential confusion.

Main: 
- Add a disclaimer to the main usage message

Modules, PacketHandler, ServerMain:
- Refactor how we pass packets around so as to move message decoding into the
  processing thread:  Objects are beautiful; tuples are kludgey.

ServerMain:
- Document more of the multithreaded stuff
- refactor
- make locking more sane
- handle queues with obsolete tuple-based messages.
- Remove old comment.
- Reset logs on sighup
- Check for thread liveness (this may be a bad idea)

ServerQueue:
- Simplify clean logic
- make locking more sane.
- Be more robust on erroneous queue state.

test, benchmark:
- Update tests to use new interfaces



Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -d -r1.23 -r1.24
--- Modules.py	9 Jan 2003 18:54:01 -0000	1.23
+++ Modules.py	10 Jan 2003 20:12:05 -0000	1.24
@@ -104,9 +104,8 @@
            """
         return SimpleModuleDeliveryQueue(self, queueDir)
 
-    def processMessage(self, message, tag, exitType, exitInfo):
-        """Given a message with a given exitType and exitInfo, try to deliver
-           it.  'tag' is as decribed in createDeliveryQueue. Return one of:
+    def processMessage(self, packet):
+        """Given a DeliveryPacket object, try to deliver it.  Return one of:
             DELIVER_OK (if the message was successfully delivered),
             DELIVER_FAIL_RETRY (if the message wasn't delivered, but might be
               deliverable later), or
@@ -125,11 +124,12 @@
     def __init__(self, module):
         self.module = module
 
-    def queueDeliveryMessage(self, (exitType, address, tag), message):
+    def queueDeliveryMessage(self, addr, packet, retry=0):
         """Instead of queueing our message, pass it directly to the underlying
            DeliveryModule."""
+        assert addr is None
         try:
-            res = self.module.processMessage(message, tag, exitType, address)
+            res = self.module.processMessage(packet)
             if res == DELIVER_OK:
                 return
             elif res == DELIVER_FAIL_RETRY:
@@ -158,10 +158,10 @@
         self.module = module
 
     def _deliverMessages(self, msgList):
-        for handle, addr, message, n_retries in msgList:
+        for handle, addr, packet, n_retries in msgList:
+            assert addr is None
             try:
-                exitType, address, tag = addr
-                result = self.module.processMessage(message,tag,exitType,address)
+                result = self.module.processMessage(packet)
                 if result == DELIVER_OK:
                     self.deliverySucceeded(handle)
                 elif result == DELIVER_FAIL_RETRY:
@@ -174,12 +174,16 @@
                                    "Exception delivering message")
                 self.deliveryFailed(handle, 0)
 
+    def queueDeliveryMessage(self, addr, packet, retry=0):
+        assert addr is None
+        mixminion.server.ServerQueue.DeliveryQueue.queueDeliveryMessage(
+            self, None, packet, retry)
+
 class DeliveryThread(threading.Thread):
     def __init__(self, moduleManager):
         threading.Thread.__init__(self)
         self.moduleManager = moduleManager
         self.event = threading.Event()
-        self.setDaemon(1)
         self.__stoppinglock = threading.Lock() # UGLY. XXXX
         self.isStopping = 0
 
@@ -194,16 +198,20 @@
         self.event.set()
 
     def run(self):
-        while 1:
-            self.event.wait()
-            self.event.clear()
-            self.__stoppinglock.acquire()
-            stop = self.isStopping
-            self.__stoppinglock.release()
-            if stop:
-                LOG.info("Delivery thread shutting down.")
-                return
-            self.moduleManager._sendReadyMessages()
+        try:
+            while 1:
+                self.event.wait()
+                self.event.clear()
+                self.__stoppinglock.acquire()
+                stop = self.isStopping
+                self.__stoppinglock.release()
+                if stop:
+                    LOG.info("Delivery thread shutting down.")
+                    return
+                self.moduleManager._sendReadyMessages()
+        except:
+            LOG.error_exc(sys.exc_info(),
+                          "Exception in delivery; shutting down thread.")
 
 class ModuleManager:
     """A ModuleManager knows about all of the server modules in the system.
@@ -367,45 +375,24 @@
         if self.enabled.has_key(module.getName()):
             del self.enabled[module.getName()]
 
-    def queueMessage(self, message, tag, exitType, address):
-        """Queue a message for delivery."""
-        # XXXX003 remove the more complex logic here into the PacketHandler
-        # XXXX003 code.
-        # FFFF Support non-exit messages.
-        (exitType, address, tag), message = \
-                   self.decodeMessage(message, tag, exitType, address)
-        self.queueDecodedMessage((exitType, address, tag), message)
+    def queueMessage2(self, packet):
+        self.queueDecodedMessage(packet)
 
-    def queueDecodedMessage(self, (exitType, address, tag), message):
+    def queueDecodedMessage(self, packet):
         #DOCDOC
+        exitType = packet.getExitType()
+        
         mod = self.typeToModule.get(exitType, None)
         if mod is None:
             LOG.error("Unable to handle message with unknown type %s",
-                           exitType)
+                      exitType)
             return
         queue = self.queues[mod.getName()]
         LOG.debug("Delivering message %r (type %04x) via module %s",
-                       message[:8], exitType, mod.getName())
-
-        queue.queueDeliveryMessage((exitType, address, tag), message)
-
-    def decodeMessage(self, message, tag, exitType, address):
-        payload = None
-        try:
-            payload = mixminion.BuildMessage.decodePayload(message, tag)
-        except CompressedDataTooLong:
-            contents = mixminion.Packet.parsePayload(message).getContents()
-            return (exitType, address, 'long'), contents
-        except MixError:
-            return (exitType, address, 'err'), message
-
-        if payload is None:
-            # encrypted message
-            return (exitType, address, tag), message
-        else:
-            # forward message
-            return (exitType, address, None), payload
-
+                  packet.getContents()[:8], exitType, mod.getName())
+       
+        queue.queueDeliveryMessage(None, packet)
+        
     #DOCDOC
     def shutdown(self):
         if self.thread is not None:
@@ -445,7 +432,7 @@
         return [ mixminion.Packet.DROP_TYPE ]
     def createDeliveryQueue(self, directory):
         return ImmediateDeliveryQueue(self)
-    def processMessage(self, message, tag, exitType, exitInfo):
+    def processMessage(self, packet):
         LOG.debug("Dropping padding message")
         return DELIVER_OK
 
@@ -687,11 +674,11 @@
     def getExitTypes(self):
         return [ mixminion.Packet.MBOX_TYPE ]
 
-    def processMessage(self, message, tag, exitType, address):
+    def processMessage(self, packet): #message, tag, exitType, address):
         # Determine that message's address;
-        assert exitType == mixminion.Packet.MBOX_TYPE
+        assert packet.getExitType() == mixminion.Packet.MBOX_TYPE
         LOG.debug("Received MBOX message")
-        info = mixminion.Packet.parseMBOXInfo(address)
+        info = mixminion.Packet.parseMBOXInfo(packet.getAddress())
         try:
             address = self.addresses[info.user]
         except KeyError:
@@ -699,7 +686,7 @@
             return DELIVER_FAIL_NORETRY
 
         # Escape the message if it isn't plaintext ascii
-        msg = _escapeMessageForEmail(message, tag)
+        msg = _escapeMessageForEmail(packet)
 
         # Generate the boilerplate (FFFF Make this configurable)
         fields = { 'user': address,
@@ -795,14 +782,15 @@
 
         manager.enableModule(self)
 
-    def processMessage(self, message, tag, exitType, address):
-        assert exitType == mixminion.Packet.SMTP_TYPE
+    def processMessage(self, packet):
+        assert packet.getExitType() == mixminion.Packet.SMTP_TYPE
         LOG.debug("Received SMTP message")
         # parseSMTPInfo will raise a parse error if the mailbox is invalid.
         try:
-            address = mixminion.Packet.parseSMTPInfo(address).email
+            address = mixminion.Packet.parseSMTPInfo(packet.getAddress()).email
         except ParseError:
-            LOG.warn("Dropping SMTP message to invalid address %r", address)
+            LOG.warn("Dropping SMTP message to invalid address %r",
+                     packet.getAddress())
             return DELIVER_FAIL_NORETRY
 
         # Now, have we blacklisted this address?
@@ -811,7 +799,7 @@
             return DELIVER_FAIL_NORETRY
 
         # Decode and escape the message, and get ready to send it.
-        msg = _escapeMessageForEmail(message, tag)
+        msg = _escapeMessageForEmail(packet)
         msg = "To: %s\n%s\n\n%s"%(address, self.header, msg)
 
         # Send the message.
@@ -873,17 +861,18 @@
         self.tmpQueue.removeAll()
         return _MixmasterSMTPModuleDeliveryQueue(self, queueDir)
 
-    def processMessage(self, message, tag, exitType, smtpAddress):
+    def processMessage(self, packet):
         """Insert a message into the Mixmaster queue"""
-        assert exitType == mixminion.Packet.SMTP_TYPE
+        assert packet.getExitType() == mixminion.Packet.SMTP_TYPE
         # parseSMTPInfo will raise a parse error if the mailbox is invalid.
         try:
-            info = mixminion.Packet.parseSMTPInfo(smtpAddress)
+            info = mixminion.Packet.parseSMTPInfo(packet.getAddress())
         except ParseError:
-            LOG.warn("Dropping SMTP message to invalid address %r",smtpAddress)
+            LOG.warn("Dropping SMTP message to invalid address %r",
+                     packet.getAddress())
             return DELIVER_FAIL_NORETRY
 
-        msg = _escapeMessageForEmail(message, tag)
+        msg = _escapeMessageForEmail(packet)
         handle = self.tmpQueue.queueMessage(msg)
 
         cmd = self.command
@@ -936,7 +925,7 @@
 
 #----------------------------------------------------------------------
 
-def _escapeMessageForEmail(msg, tag):
+def _escapeMessageForEmail(packet):
     """Helper function: Given a message and tag, escape the message if
        it is not plaintext ascii, and wrap it in some standard
        boilerplate.  Add a disclaimer if the message is not ascii.
@@ -949,33 +938,34 @@
                          'long' [if the message might be a zlib bomb'].
 
        Returns None on an invalid message."""
-    m = _escapeMessage(msg, tag, text=1)
-    if m is None:
+    if packet.isError():
         return None
-    code, msg, tag = m
 
-    if code == 'ENC':
+    if packet.isEncrypted():
         junk_msg = """\
 This message is not in plaintext.  It's either 1) a reply; 2) a forward
 message encrypted to you; or 3) junk.\n\n"""
-    elif code == 'ZB':
+    elif packet.isOvercompressed():
         junk_msg = """\
 This message is compressed with zlib.  Ordinarily, I would have decompressed
 it, but it was compressed by more than a factor of 20, which makes me nervous.
 \n"""
-    elif code == 'BIN':
+    elif not packet.isPrintingAscii():
+        assert packet.isPlaintext()
         junk_msg = """\
 This message contains nonprinting characters, so I encoded it with Base64
 before sending it to you.\n\n"""
     else:
+        assert packet.isPlaintext()
         junk_msg = ""
 
-    if tag is not None:
-        tag = "Decoding handle: "+tag+"\n"
+    if packet.isEncrypted():
+        tag = "Decoding handle: %s\n"%packet.getAsciiTag()
     else:
         tag = ""
 
-    if msg and msg[-1] != '\n':
+    contents = packet.getAsciiContents()
+    if contents and contents[-1] != '\n':
         extra_newline = "\n"
     else:
         extra_newline = ""
@@ -983,42 +973,4 @@
     return """\
 %s======= TYPE III ANONYMOUS MESSAGE BEGINS ========
 %s%s%s======== TYPE III ANONYMOUS MESSAGE ENDS =========
-""" %(junk_msg, tag, msg, extra_newline)
-
-def _escapeMessage(message, tag, text=0):
-    """Helper: given a decoded message (and possibly its tag), determine
-       whether the message is a text plaintext message (code='TXT'), a
-       binary plaintext message (code 'BIN'), an encrypted message/reply
-       (code='ENC'), or a plaintext possible zlib bomb ('ZB').  If
-       requested, non-TXT messages are base-64 encoded.
-
-       Returns: (code, message, tag (for ENC) or None (for BIN, TXT).
-       Returns None if the message is invalid.
-
-          message -- A (possibly decoded) message
-          tag -- One of: a 20-byte decoding tag [if the message is encrypted
-                            or a reply]
-                         None [if the message is in plaintext]
-                         'err' [if the message was invalid.]
-                         'long' [if the message might be a zlib bomb'].
-          text -- flag: if true, non-TXT messages must be base64-encoded.
-    """
-    if tag == 'err':
-        return None
-    elif tag == 'long':
-        code = "ZB"
-    elif tag is not None:
-        code = "ENC"
-    else:
-        assert tag is None
-        if isPrintingAscii(message, allowISO=1):
-            code = "TXT"
-        else:
-            code = "BIN"
-
-    if text and (code != "TXT") :
-        message = base64.encodestring(message)
-    if text and tag:
-        tag = base64.encodestring(tag).strip()
-
-    return code, message, tag
+""" %(junk_msg, tag, contents, extra_newline)

Index: PacketHandler.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/PacketHandler.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- PacketHandler.py	16 Dec 2002 02:40:11 -0000	1.3
+++ PacketHandler.py	10 Jan 2003 20:12:05 -0000	1.4
@@ -3,11 +3,17 @@
 
 """mixminion.PacketHandler: Code to process mixminion packets on a server"""
 
+import base64
+
 import mixminion.Crypto as Crypto
 import mixminion.Packet as Packet
 import mixminion.Common as Common
+import mixminion.BuildMessage
 
-__all__ = [ 'PacketHandler', 'ContentError' ]
+from mixminion.BuildMessage import CompressedDataTooLong
+from mixminion.Common import MixError, isPrintingAscii
+
+__all__ = [ 'PacketHandler', 'ContentError', 'DeliveryPacket', 'RelayedPacket']
 
 class ContentError(Common.MixError):
     """Exception raised when a packed is malformatted or unacceptable."""
@@ -53,8 +59,10 @@
         for h in self.hashlog:
             h.close()
 
-    def processMessage(self, msg):
-        """Given a 32K mixminion message, processes it completely.
+    def processMessage(self, msg):    
+        """DOCDOC
+
+           Given a 32K mixminion message, processes it completely.
 
            Returns one of:
                     None [if the mesesage should be dropped.]
@@ -145,11 +153,9 @@
         # If we're an exit node, there's no need to process the headers
         # further.
         if rt >= Packet.MIN_EXIT_TYPE:
-            return ("EXIT",
-                    (rt, subh.getExitAddress(),
-                     keys.get(Crypto.APPLICATION_KEY_MODE),
-                     subh.getTag(),
-                     payload))
+            return DeliveryPacket(rt, subh.getExitAddress(),
+                                  keys.get(Crypto.APPLICATION_KEY_MODE),
+                                  subh.getTag(), payload)
 
         # If we're not an exit node, make sure that what we recognize our
         # routing type.
@@ -186,4 +192,111 @@
         # Construct the message for the next hop.
         msg = Packet.Message(header1, header2, payload).pack()
 
-        return ("QUEUE", (address, msg))
+        return RelayedPacket(address, msg)
+        
+class RelayedPacket:
+    def __init__(self, address, msg):
+        assert isinstance(address, Packet.IPV4Info)
+        self.address = address
+        self.msg = msg
+
+    def isDelivery(self):
+        return 0
+
+    def getAddress(self):
+        return self.address
+
+    def getPacket(self):
+        return self.msg
+
+class DeliveryPacket:
+    def __init__(self, routingType, routingInfo, applicationKey,
+                 tag, payload):
+        self.exitType = routingType
+        self.address = routingInfo
+        self.key = applicationKey
+        self.tag = tag
+        self.payload = payload
+        self.contents = None
+        self.type = None
+
+    def isDelivery(self):
+        return 1
+
+    def getExitType(self): return self.exitType
+    def getAddress(self): return self.address
+    def getTag(self): return self.tag
+    def getApplicationKey(self): return self.key
+    def getPayload(self): return self.payload
+
+    def getContents(self):
+        if self.type is None: self.decode()
+        return self.contents
+
+    def isPlaintext(self):
+        if self.type is None: self.decode()        
+        return self.type == 'plain'
+
+    def isOvercompressed(self):
+        if self.type is None: self.decode()
+        return self.type == 'long'
+
+    def isEncrypted(self):
+        if self.type is None: self.decode()
+        return self.type == 'enc'
+
+    def isPrintingAscii(self):
+        if self.type is None: self.decode()
+        return isPrintingAscii(self.contents, allowISO=1)
+
+    def isError(self):
+        if self.type is None: self.decode()
+        return self.type == 'err'
+
+    def getFakeTag(self):
+        if self.type is None: self.decode()
+        if self.type == 'enc':
+            return self.tag
+        elif self.type == 'plain':
+            return None
+        else:
+            return self.type
+
+    def decode(self):
+        if self.payload is None:
+            return
+        message = self.payload
+        self.contents = None
+        try:
+            self.contents = mixminion.BuildMessage.decodePayload(message,
+                                                                 self.tag)
+            if self.contents is None:
+                # encrypted message
+                self.type = 'enc'
+                self.contents = message
+            else:
+                # forward message
+                self.type = 'plain'
+                # self.contents is right
+        except CompressedDataTooLong, _:
+            self.contents = (mixminion.Packet.parsePayload(message)
+                                             .getContents())
+            self.type = 'long'
+        except MixError:
+            self.contents = message
+            self.type = 'err'
+
+
+        self.payload = None
+
+    def getAsciiContents(self):
+        if self.type is None:
+            self.decode()
+
+        if self.type == 'plain' and isPrintingAscii(self.contents, allowISO=1):
+            return self.contents
+        else:
+            return base64.encodestring(self.contents)
+
+    def getAsciiTag(self):
+        return base64.encodestring(self.tag).strip()

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.30
retrieving revision 1.31
diff -u -d -r1.30 -r1.31
--- ServerMain.py	9 Jan 2003 18:54:01 -0000	1.30
+++ ServerMain.py	10 Jan 2003 20:12:05 -0000	1.31
@@ -20,7 +20,7 @@
 # We pull this from mixminion.Common, just in case somebody still has
 # a copy of the old "mixminion/server/Queue.py" (since renamed to
 # ServerQueue.py)
-from mixminion.Common import Queue
+from mixminion.Common import MessageQueue
 
 import mixminion.Config
 import mixminion.Crypto
@@ -37,9 +37,10 @@
      secureDelete, waitForChildren
 
 class IncomingQueue(mixminion.server.ServerQueue.Queue):
-    """A DeliveryQueue to accept messages from incoming MMTP connections,
-       process them with a packet handler, and send them into a mix pool."""
-
+    """A DeliveryQueue to accept packets from incoming MMTP connections,
+       and hold them until they can be processed.  As packets arrive, and
+       are stored to disk, we notify a message queue so that another thread
+       can read them."""
     def __init__(self, location, packetHandler):
         """Create an IncomingQueue that stores its messages in <location>
            and processes them through <packetHandler>."""
@@ -47,7 +48,7 @@
         self.packetHandler = packetHandler
         self.mixPool = None
         self.moduleManager = None
-        self._queue = Queue.Queue()
+        self._queue = MessageQueue()
         for h in self.getAllMessages():
             assert h is not None
             self._queue.put(h)
@@ -65,9 +66,13 @@
         assert h is not None
         self._queue.put(h)
 
+    def getMessageQueue(self):
+        return self._queue
+
     def deliverMessage(self, handle):
-        "DOCDOC"
-        # DOCDOC called from within thread.
+        """Process a single message with a given handle, and insert it into
+           the Mix pool.  This function is called from within the processing
+           thread."""
         ph = self.packetHandler
         message = self.messageContents(handle)
         try:
@@ -77,20 +82,15 @@
                 LOG.debug("Padding message %s dropped",
                           formatBase64(message[:8]))
                 self.removeMessage(handle)
-            elif res[0] == 'EXIT':
-                # XXXX Ugly, refactor
-                rt, ri, app_key, tag, payload = res[1]
-                res = self.moduleManager.decodeMessage(payload, tag, rt, ri)
-                LOG.debug("Processed message %s; inserting into pool",
-                          formatBase64(message[:8]))
-                self.mixPool.queueObject(('EXIT', res))
             else:
+                if res.isDelivery():
+                    res.decode()
                 LOG.debug("Processed message %s; inserting into pool",
                           formatBase64(message[:8]))
                 self.mixPool.queueObject(res)
                 self.removeMessage(handle)
         except mixminion.Crypto.CryptoError, e:
-            LOG.warn("Invalid PK or misencrypted packet header in message %s: %s",
+            LOG.warn("Invalid PK or misencrypted header in message %s: %s",
                      formatBase64(message[:8]), e)
             self.removeMessage(handle)
         except mixminion.Packet.ParseError, e:
@@ -105,18 +105,19 @@
             LOG.error_exc(sys.exc_info(),
                     "Unexpected error when processing message %s (handle %s)",
                           formatBase64(message[:8]), handle)
-            # ???? Remove?  Don't remove?
+            self.removeMessage(handle) # ???? Really dump this message?
 
 class MixPool:
     """Wraps a mixminion.server.Queue.*MixQueue to send messages to an exit
-       queue and a delivery queue."""
+       queue and a delivery queue.  The files in the MixQueue are instances
+       of RelayedPacket or DeliveryPacket from PacketHandler.
+
+       All methods on this class are invoked from the main thread.
+    """
     def __init__(self, config, queueDir):
         """Create a new MixPool, based on this server's configuration and
            queue location."""
 
-        # DOCDOC lock
-        self.__lock = threading.Lock()
-
         server = config['Server']
         interval = server['MixInterval'][2]
         if server['MixAlgorithm'] == 'TimedMixQueue':
@@ -139,16 +140,15 @@
         self.moduleManager = None
 
     def lock(self):
-        self.__lock.acquire()
+        self.queue.lock()
 
     def unlock(self):
-        self.__lock.release()
+        self.queue.unlock()
 
     def queueObject(self, obj):
         """Insert an object into the queue."""
-        self.__lock.acquire()
+        obj.isDelivery() #XXXX remove this implicit typecheck.
         self.queue.queueObject(obj)
-        self.__lock.release()
 
     def count(self):
         "Return the number of messages in the queue"
@@ -171,16 +171,17 @@
                   self.queue.count(), len(handles))
         
         for h in handles:
-            tp, info = self.queue.getObject(h)
-            if tp == 'EXIT':
-                (exitType, address, tag), payload = info
+            packet = self.queue.getObject(h)
+            #XXXX remove the first case
+            if type(packet) == type(()):
+                LOG.debug("  (skipping message %s in obsolete format)", h)
+            elif packet.isDelivery():
                 LOG.debug("  (sending message %s to exit modules)",
-                          formatBase64(payload[:8]))
-                self.moduleManager.queueDecodedMessage((exitType,address,tag),
-                                                       payload)
+                          formatBase64(packet.getContents()[:8]))
+                self.moduleManager.queueDecodedMessage(packet)
             else:
-                assert tp == 'QUEUE'
-                ipv4, msg = info
+                ipv4 = packet.getAddress()
+                msg = packet.getPacket()
                 LOG.debug("  (sending message %s to MMTP server)",
                           formatBase64(msg[:8]))
                 self.outgoingQueue.queueDeliveryMessage(ipv4, msg)
@@ -192,7 +193,12 @@
         return now + self.queue.getInterval()
 
 class OutgoingQueue(mixminion.server.ServerQueue.DeliveryQueue):
-    """DeliveryQueue to send messages via outgoing MMTP connections."""
+    """DeliveryQueue to send messages via outgoing MMTP connections.  All
+       methods on this class are called from the main thread.  The addresses
+       in this queue are pickled IPV4Info objects.
+
+       All methods in this class are run from the main thread.
+    """
     def __init__(self, location):
         """Create a new OutgoingQueue that stores its messages in a given
            location."""
@@ -217,7 +223,10 @@
 
 class _MMTPServer(mixminion.server.MMTPServer.MMTPAsyncServer):
     """Implementation of mixminion.server.MMTPServer that knows about
-       delivery queues."""
+       delivery queues.
+
+       All methods in this class are run from the main thread.
+       """
     def __init__(self, config, tls):
         mixminion.server.MMTPServer.MMTPAsyncServer.__init__(self, config, tls)
 
@@ -235,16 +244,16 @@
         self.outgoingQueue.deliveryFailed(handle, retriable)
 #----------------------------------------------------------------------
 class CleaningThread(threading.Thread):
+    """Thread that handles file deletion."""
     #DOCDOC
     def __init__(self):
         threading.Thread.__init__(self)
-        self._queue = Queue.Queue()
-        self.setDaemon(1)
+        self.mqueue = MessageQueue()
 
     def deleteFile(self, fname):
         LOG.trace("Scheduling %s for deletion", fname)
         assert fname is not None
-        self._queue.put(fname)
+        self.mqueue.put(fname)
 
     def deleteFiles(self, fnames):
         for f in fnames:
@@ -252,12 +261,12 @@
 
     def shutdown(self):
         LOG.info("Telling cleanup thread to shut down.")
-        self._queue.put(None)
+        self.mqueue.put(None)
 
     def run(self):
         try:
             while 1:
-                fn = self._queue.get()
+                fn = self.mqueue.get()
                 if fn is None:
                     LOG.info("Cleanup thread shutting down.")
                     return
@@ -276,26 +285,25 @@
         threading.Thread.__init__(self)
         # Clean up logic; maybe refactor. ????
         self.incomingQueue = incomingQueue
-        self.setDaemon(1) #????
+        self.mqueue = incomingQueue.getMessageQueue()
 
     def shutdown(self):
         LOG.info("Telling processing thread to shut down.")
-        self.incomingQueue._queue.put(None)
+        self.mqueue.put(None)
 
     def run(self):
-        while 1:
-            handle = self.incomingQueue._queue.get()
-            if handle is None:
-                LOG.info("Processing thread shutting down.")
-                return
-            self.incomingQueue.deliverMessage(handle)
-            # XXXX debugging hack
-            if self.incomingQueue._queue.qsize() == 0:
-                n = self.incomingQueue.count(1)
-                if n != 0:
-                    LOG.trace("_queue was empty, but incoming queue had %s",n)
-
+        try:
+            while 1:
+                handle = self.mqueue.get()
+                if handle is None:
+                    LOG.info("Processing thread shutting down.")
+                    return
+                self.incomingQueue.deliverMessage(handle)
+        except:
+            LOG.error_exc(sys.exc_info(),
+                          "Exception while processing; shutting down thread.")
 
+#----------------------------------------------------------------------
 STOPPING = 0
 def _sigTermHandler(signal_num, _):
     '''(Signal handler for SIGTERM)'''
@@ -432,8 +440,8 @@
         #  'MIX', 'SHRED', and 'TIMEOUT'.  Kept in sorted order.
         scheduledEvents = []
         now = time.time()
-        #XXXX restore
-        scheduledEvents.append( (now + 120, "SHRED") )#FFFF make configurable
+
+        scheduledEvents.append( (now + 600, "SHRED") )#FFFF make configurable
         scheduledEvents.append( (self.mmtpServer.getNextTimeoutTime(now),
                                  "TIMEOUT") )
         nextMix = self.mixPool.getNextMixTime(now)
@@ -442,13 +450,6 @@
         scheduledEvents.sort()
 
         # FFFF Support for automatic key rotation.
-
-        # ???? Our cuurent approach can make the server unresponsive when
-        # ???? mixing many messages at once: We stop answering requests, and
-        # ???? don't start again until we've delivered all the pending
-        # ???? messages!  Also, we process every packet as soon as it arrives,
-        # ???? which can also make the system pause for a few ms at a time.
-        # ????   Possible solutions:  Multiple threads or processes...?
         while 1:
             nextEventTime = scheduledEvents[0][0]
             now = time.time()
@@ -460,8 +461,16 @@
                     LOG.info("Caught sigterm; shutting down.")
                     return
                 elif GOT_HUP:
-                    LOG.info("Ignoring sighup for now, sorry.")
+                    LOG.info("Caught sighup")
+                    LOG.info("Resetting logs")
+                    LOG.reset()
                     GOT_HUP = 0
+                # ???? This could slow us down a good bit.  Move it?
+                if not (self.cleaningThread.isAlive() and
+                        self.processingThread.isAlive() and
+                        self.moduleManager.thread.isAlive()):
+                    LOG.fatal("One of our threads has halted; shutting down.")
+                    return
                 
 ##                 # Process any new messages that have come in, placing them
 ##                 # into the mix pool.
@@ -484,8 +493,7 @@
                        (self.mmtpServer.getNextTimeoutTime(now), "TIMEOUT"))
             elif event == 'SHRED':
                 self.cleanQueues()
-                insort(scheduledEvents,
-                       (now + 120, "SHRED")) #XXXX Restore original value
+                insort(scheduledEvents, (now + 600, "SHRED"))
             elif event == 'MIX':
                 # Before we mix, we need to log the hashes to avoid replays.
                 # FFFF We need to recover on server failure.
@@ -619,7 +627,7 @@
             sys.exit(1)
 
     try:
-        print >>sys.stderr, "Reading configuration from %s"%configFile
+        print "Reading configuration from %s"%configFile
         return mixminion.server.ServerConfig.ServerConfig(fname=configFile)
     except (IOError, OSError), e:
         print >>sys.stderr, "Error reading configuration file %r:"%configFile
@@ -640,11 +648,12 @@
     except:
         info = sys.exc_info()
         LOG.fatal_exc(info,"Exception while configuring server")
-        print >>sys.stderr, "Shutting down because of exception: %s"%info[1]
+        LOG.fatal("Shutting down because of exception: %s", info[0])
+        #XXXX if sys.stderr is still real, send a message there as well.
         sys.exit(1)
 
     if config['Server'].get("Daemon",1):
-        print >>sys.stderr, "Starting server in the background"
+        print "Starting server in the background"
         try:
             daemonize()
         except:
@@ -664,7 +673,8 @@
     except:
         info = sys.exc_info()
         LOG.fatal_exc(info,"Exception while configuring server")
-        print >>sys.stderr, "Shutting down because of exception: %s"%info[1]
+        LOG.fatal("Shutting down because of exception: %s", info[0])
+        #XXXX if sys.stderr is still real, send a message there as well.
         sys.exit(1)            
             
     LOG.info("Starting server: Mixminion %s", mixminion.__version__)
@@ -675,7 +685,8 @@
     except:
         info = sys.exc_info()
         LOG.fatal_exc(info,"Exception while running server")
-        print >>sys.stderr, "Shutting down because of exception: %s"%info[1]
+        LOG.fatal("Shutting down because of exception: %s", info[0])
+        #XXXX if sys.stderr is still real, send a message there as well.
     LOG.info("Server shutting down")
     server.close()
     LOG.info("Server is shut down")
@@ -721,10 +732,10 @@
     LOG.setMinSeverity("INFO")
     mixminion.Crypto.init_crypto(config)
     keyring = mixminion.server.ServerKeys.ServerKeyring(config)
-    print >>sys.stderr, "Creating %s keys..." % keys
+    print "Creating %s keys..." % keys
     for i in xrange(keys):
         keyring.createKeys(1)
-        print >> sys.stderr, ".... (%s/%s done)" % (i+1,keys)
+        print ".... (%s/%s done)" % (i+1,keys)
 
 #----------------------------------------------------------------------
 _REMOVEKEYS_USAGE = """\

Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -d -r1.1 -r1.2
--- ServerQueue.py	9 Jan 2003 06:28:58 -0000	1.1
+++ ServerQueue.py	10 Jan 2003 20:12:05 -0000	1.2
@@ -30,9 +30,9 @@
 # trash.
 INPUT_TIMEOUT = 6000
 
-# If we've been cleaning for more than CLEAN_TIMEOUT seconds, assume the
-# old clean is dead.
-CLEAN_TIMEOUT = 120
+## # If we've been cleaning for more than CLEAN_TIMEOUT seconds, assume the
+## # old clean is dead.
+## CLEAN_TIMEOUT = 120
 
 class Queue:
     """A Queue is an unordered collection of files with secure insert, move,
@@ -47,23 +47,34 @@
        (Where HANDLE is a randomly chosen 12-character selection from the
        characters 'A-Za-z0-9+-'.  [Collision probability is negligable.])
 
-       XXXX Threading notes: Currently, Queue is only threadsafe when XXXX
+       Threading notes:  Although Queue itself is threadsafe, you'll want
+       to synchronize around any multistep operations that you want to
+       run atomically.  Use Queue.lock() and Queue.unlock() for this.
+
+       In the Mixminion server, no queue currently has more than one producer
+       or more than one consumer ... so synchronization turns out to be
+       fairly easy.
        """
-       # How negligible?  A back-of-the-envelope approximation: The chance
-       # of a collision reaches .1% when you have 3e9 messages in a single
-       # queue.  If Alice somehow manages to accumulate a 96 gigabyte
-       # backlog, we'll have bigger problems than name collision... such
-       # as the fact that most Unices behave badly when confronted with
-       # 3 billion files in the same directory... or the fact that,
-       # at today's processor speeds, it will take Alice 3 or 4
-       # CPU-years to clear her backlog.
+       # How negligible are the chances of collision?  A back-of-the-envelope
+       # approximation: The chance of a collision reaches .1% when you have 3e9
+       # messages in a single queue.  If Alice somehow manages to accumulate a
+       # 96 gigabyte backlog, we'll have bigger problems than name
+       # collision... such as the fact that most Unices behave badly when
+       # confronted with 3 billion files in the same directory... or the fact
+       # that, at today's processor speeds, it will take Alice 3 or 4 CPU-years
+       # to clear her backlog.
+       #
+       # Threading: If we ever get more than one producer, we're fine.  With
+       #    more than one consumer, we'll need to modify DeliveryQueue below.
+       
 
     # Fields:   dir--the location of the queue.
     #           n_entries: the number of complete messages in the queue.
     #                 <0 if we haven't counted yet.
     #           _lock: A lock that must be held while modifying or accessing
-    #                 the queue object.  Because of our naming scheme, access
-    #                 to the filesystem itself need not be synchronized.
+    #                 the queue object.  Filesystem operations are allowed
+    #                 without holding the lock, but they must not be visible
+    #                 to users of the queue.
     def __init__(self, location, create=0, scrub=0):
         """Creates a queue object for a given directory, 'location'.  If
            'create' is true, creates the directory if necessary.  If 'scrub'
@@ -89,12 +100,20 @@
         # Count messages on first time through.
         self.n_entries = -1
 
+    def lock(self):
+        """Prevent access to this queue from other threads."""
+        self._lock.acquire()
+
+    def unlock(self):
+        """Release the lock on this queue."""
+        self._lock.release()
+
     def queueMessage(self, contents):
         """Creates a new message in the queue whose contents are 'contents',
            and returns a handle to that message."""
         f, handle = self.openNewMessage()
         f.write(contents)
-        self.finishMessage(f, handle)
+        self.finishMessage(f, handle) # handles locking
         return handle
 
     def queueObject(self, object):
@@ -102,7 +121,7 @@
            object."""
         f, handle = self.openNewMessage()
         cPickle.dump(object, f, 1)
-        self.finishMessage(f, handle)
+        self.finishMessage(f, handle) # handles locking
         return handle
 
     def count(self, recount=0):
@@ -127,10 +146,7 @@
 
            If there are fewer than 'count' messages in the queue, all the
            messages will be retained."""
-        self._lock.acquire()
-        handles = [ fn[4:] for fn in os.listdir(self.dir)
-                           if fn.startswith("msg_") ]
-        self._lock.release()
+        handles = self.getAllMessages() # handles locking
 
         return getCommonPRNG().shuffle(handles, count)
 
@@ -144,7 +160,7 @@
 
     def removeMessage(self, handle):
         """Given a handle, removes the corresponding message from the queue."""
-        self.__changeState(handle, "msg", "rmv")
+        self.__changeState(handle, "msg", "rmv") # handles locking.
 
     def removeAll(self):
         """Removes all messages from this queue."""
@@ -176,28 +192,38 @@
     def getMessagePath(self, handle):
         """Given a handle for an existing message, return the name of the
            file that contains that message."""
+        # We don't need to lock here: the handle is still valid, or it isn't.
         return os.path.join(self.dir, "msg_"+handle)
 
     def openMessage(self, handle):
         """Given a handle for an existing message, returns a file descriptor
            open to read that message."""
+        # We don't need to lock here; the handle is still valid, or it isn't.
         return open(os.path.join(self.dir, "msg_"+handle), 'rb')
 
     def messageContents(self, handle):
         """Given a message handle, returns the contents of the corresponding
            message."""
-        f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
-        s = f.read()
-        f.close()
-        return s
+        try:
+            self._lock.acquire()
+            f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
+            s = f.read()
+            f.close()
+            return s
+        finally:
+            self._lock.release()
 
     def getObject(self, handle):
         """Given a message handle, read and unpickle the contents of the
            corresponding message."""
-        f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
-        res = cPickle.load(f)
-        f.close()
-        return res
+        try:
+            self._lock.acquire()
+            f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
+            res = cPickle.load(f)
+            f.close()
+            return res
+        finally:
+            self._lock.release()
 
     def openNewMessage(self):
         """Returns (file, handle) tuple to create a new message.  Once
@@ -228,36 +254,41 @@
 
            DOCDOC secureDeleteFn
         """
-        # ???? Threading?
-        now = time.time()
-        cleanFile = os.path.join(self.dir,".cleaning")
+        # We don't need to hold the lock here; we synchronize via the
+        # filesystem.
 
-        cleaning = 1
-        while cleaning:
-            try:
-                # Try to get the .cleaning lock file.  If we can create it,
-                # we're the only cleaner around.
-                fd = os.open(cleanFile, os.O_WRONLY+os.O_CREAT+os.O_EXCL, 0600)
-                os.write(fd, str(now))
-                os.close(fd)
-                cleaning = 0
-            except OSError:
-                try:
-                    # If we can't create the file, see if it's too old.  If it
-                    # is too old, delete it and try again.  If it isn't, there
-                    # may be a live clean in progress.
-                    s = os.stat(cleanFile)
-                    if now - s[stat.ST_MTIME] > CLEAN_TIMEOUT:
-                        os.unlink(cleanFile)
-                    else:
-                        return 1
-                except OSError:
-                    # If the 'stat' or 'unlink' calls above fail, then
-                    # .cleaning must not exist, or must not be readable
-                    # by us.
-                    if os.path.exists(cleanFile):
-                        # In the latter case, bail out.
-                        return 1
+# XXXX This logic never worked; now we do all our cleaning in a separate
+# XXXX thread anyway.
+
+##         now = time.time()
+##         cleanFile = os.path.join(self.dir,".cleaning")
+##
+##         cleaning = 1
+##         while cleaning:
+##             try:
+##                 # Try to get the .cleaning lock file.  If we can create it,
+##                 # we're the only cleaner around.
+##                 fd = os.open(cleanFile, os.O_WRONLY+os.O_CREAT+os.O_EXCL, 0600)
+##                 os.write(fd, str(now))
+##                 os.close(fd)
+##                 cleaning = 0
+##             except OSError:
+##                 try:
+##                     # If we can't create the file, see if it's too old.  If it
+##                     # is too old, delete it and try again.  If it isn't, there
+##                     # may be a live clean in progress.
+##                     s = os.stat(cleanFile)
+##                     if now - s[stat.ST_MTIME] > CLEAN_TIMEOUT:
+##                         os.unlink(cleanFile)
+##                     else:
+##                         return 1
+##                 except OSError:
+##                     # If the 'stat' or 'unlink' calls above fail, then
+##                     # .cleaning must not exist, or must not be readable
+##                     # by us.
+##                     if os.path.exists(cleanFile):
+##                         # In the latter case, bail out.
+##                         return 1
 
         rmv = []
         allowedTime = int(time.time()) - INPUT_TIMEOUT
@@ -280,8 +311,17 @@
            to 's2', and changes the internal count."""
         try:
             self._lock.acquire()
-            os.rename(os.path.join(self.dir, s1+"_"+handle),
-                      os.path.join(self.dir, s2+"_"+handle))
+            try:
+                os.rename(os.path.join(self.dir, s1+"_"+handle),
+                          os.path.join(self.dir, s2+"_"+handle))
+            except OSError, e:
+                contents = os.listdir(self.dir)
+                LOG.error("Error while trying to change %s from %s to %s: %s",
+                          handle, s1, s2, e)
+                LOG.error("Directory %s contains: %s", self.dir, contents)
+                self.count(1)
+                return
+                
             if self.n_entries < 0:
                 return
             if s1 == 'msg' and s2 != 'msg':
@@ -315,6 +355,8 @@
        won't play nice if multiple instances are looking at the same
        directory.
     """
+    # XXXX separating addr was a mistake.
+    
     ###
     # Fields:
     #    sendable -- A list of handles for all messages