
[minion-cvs] Check in pending work including fragments and bugfixes.



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv26532/lib/mixminion/server

Modified Files:
	MMTPServer.py Modules.py PacketHandler.py ServerKeys.py 
	ServerMain.py ServerQueue.py 
Log Message:
Check in pending work including fragments and bugfixes.

setup.py, README, __init__.py: 
- Bump version to 0.0.5alpha2.

BuildMessage: 
- Refactor message construction code to support fragment-then-build-packets
  path.
- Change decodePayload to return SingletonPayload or FragmentPayload.
- Remove an extraneous sha1() from payload decoding.

ClientMain:
- Minimal, kludged-up support for sending fragmented forward messages.
- Remove obsolete client queue code.

Common: 
- Clean up 'bad mode' error message.
- Add check for [...] in base64-encoded data.

Config:
- Add _parseSize to allow configuration files with values like "1M", "100K",
  etc.
- Check whether the user accidentally passes mixminiond.conf to a client.
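
As an illustration of the size syntax mentioned above, here is a minimal sketch of
a parser for values like "1M" and "100K". It is not the code from Config.py: the
name parse_size, the accepted suffixes, and the choice of binary multiples
(1K = 1024 bytes) are assumptions made for this example, and it is written for
Python 3 rather than the Python 2 used in the diffs below.

```python
import re

# Hypothetical unit table; assumes binary multiples (1K = 1024 bytes).
_UNITS = {"": 1, "K": 1024, "M": 1024 ** 2, "G": 1024 ** 3}

# A decimal number, an optional K/M/G suffix, and an optional trailing "B".
_SIZE_RE = re.compile(r"^\s*(\d+)\s*([KMG]?)B?\s*$", re.IGNORECASE)

def parse_size(text):
    """Convert a string such as '100K' or '1M' into a number of bytes.

    Raises ValueError if the string is not in the expected form.
    """
    m = _SIZE_RE.match(text)
    if m is None:
        raise ValueError("Unrecognized size: %r" % (text,))
    number, unit = m.groups()
    return int(number) * _UNITS[unit.upper()]
```

Under these assumptions, parse_size("100K") gives 102400 and parse_size("1M")
gives 1048576; a bare number is taken as a byte count.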

Crypto:
- Make 'openNewFile' even more paranoid.
- List failing entropy sources when none is found.

Filestore:
- Add delItem to journaled DB.
- Catch corrupted data in file stores.

Fragments: 
- Debug, appease pychecker.

Main:
- Add missing parenthesis

Packet:
- Add notion of tag-less delivery types (FRAGMENT).
- Rename 'Message' to 'Packet', which it should have been all along.
- Move uncompression logic from decodePayload into SingletonPayload.
- Add encoding for fragmentation of messages that will be decoded at the
  exit node.
- Add 'Message-type: fragmented' for text-encoded messages.
- Check for overlong mail headers and bad from addresses when sending.

benchmark:
- Debug.
- Add some more timing.

test:
- Add tests for catching [...] in base64'd data
- Add tests for sending bad email headers.
- Use new interface from BuildMessage.
- Propagate other new interfaces.
- Add tests for _parseSize.
- Add more fragment reassembly tests.

MMTPServer:
- Change shutdown logic again.  This may be what finally kills our
  infinite busy-loop bug.
- Appease pychecker.
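
The changed shutdown logic (see the MMTPServer.py diff in this message) amounts
to: try the TLS shutdown; if it reports "not finished", wait for a read and try
again; if it reports "not finished" a second time, give up and close anyway. The
sketch below isolates that decision in Python 3. The class name ShutdownTracker,
the step() method, and the string results are hypothetical; the real code closes
the socket or raises _ml.TLSWantRead instead of returning a value.

```python
class ShutdownTracker:
    """Decide what to do after each attempt to shut down a TLS connection."""

    def __init__(self, shutdown_fn):
        # shutdown_fn is a hypothetical callable that returns a true value
        # once the shutdown handshake has completed, and a false one otherwise.
        self._shutdown = shutdown_fn
        self._awaiting_shutdown = False

    def step(self):
        """Return 'closed' if the connection should be closed now, or
        'want_read' if the caller should wait for a read and call again."""
        done = self._shutdown()
        if not done and self._awaiting_shutdown:
            # Shutdown returned zero twice: stop waiting on the peer.
            done = True
        if done:
            return "closed"
        # First incomplete shutdown: remember it and wait for the peer.
        self._awaiting_shutdown = True
        return "want_read"
```

Because the second incomplete result is treated as final, a peer that never
answers the shutdown can delay the close by at most one extra round.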

Modules: 
- Add .close() and .sync() methods to modules.
- Add delivery module to reassemble fragmented messages, and relay them on
  to other delivery modules.
- Implement limits on message sizes.
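
A rough sketch of how the size limits behave, based on the Modules.py diff in
this message: a configured MaximumSize below 32K is raised to 32K, messages
longer than the limit are dropped, and the limit is advertised in kilobytes,
rounded up. The function names here are hypothetical and the code is Python 3;
only the 32*1024 floor, the length comparison, and the division by 1024 come
from the diff.

```python
MIN_MAX_SIZE = 32 * 1024  # floor applied to MaximumSize in the diff

def ceil_div(a, b):
    """Integer division rounding up (stand-in for mixminion.Common.ceilDiv)."""
    return (a + b - 1) // b

def effective_max_size(configured):
    """Raise too-small configured limits to the 32K floor."""
    return max(configured, MIN_MAX_SIZE)

def accepts(contents, max_size):
    """Return True if a message body fits within the limit."""
    return len(contents) <= max_size

def advertised_kb(max_size):
    """Value reported as Maximum-Size, in kilobytes rounded up."""
    return ceil_div(max_size, 1024)
```

For example, a configured limit of 1000 bytes becomes 32768, and the default
"100K" limit (102400 bytes, if K means 1024) is advertised as 100.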

PacketHandler:
- Stop calling packets messages.
- Add 'getDecodedPayload' interface to DeliveryPacket; add fragments.

ServerKeys, ServerMain:
- Change some errors to UIError, drop some dead code.

ServerQueue: 
- Handle corrupted messages on disk.

mixminiond.conf, Modules, ServerInfo:
- Add MaximumSize to module configuration.
- Add configuration and capabilities for fragment reconstruction.



Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.46
retrieving revision 1.47
diff -u -d -r1.46 -r1.47
--- MMTPServer.py	15 Jul 2003 04:41:18 -0000	1.46
+++ MMTPServer.py	21 Aug 2003 21:34:03 -0000	1.47
@@ -278,6 +278,7 @@
         self.__failed = 0
         self.__inbuf = []
         self.__inbuflen = 0
+        self.__awaitingShutdown = 0
 
         if serverMode:
             self.__connecting = 0
@@ -352,27 +353,44 @@
         # back... but we don't want to keep retrying indefinitely, or
         # else we can deadlock on a connection from ourself to
         # ourself.
-        if self.__con.shutdown() == 1: #may throw want*
-            #trace("Got a 1 on shutdown (fd %s)", self.fd)
-            self.__sock.close()
-            self.__state = None
-            self.shutdownFinished()
-            return
 
-        # If we don't get any response on shutdown, stop blocking; the other
-        # side may be hostile, confused, or deadlocking.
-        #trace("Got a 0 on shutdown (fd %s)", self.fd)
-        # ???? Is 'wantread' always correct?
-        # ???? Rather than waiting for a read, should we use a timer or
-        # ????       something?
-        raise _ml.TLSWantRead()
+        #DOCDOC this insanity.
+        while 1:
+            done = self.__con.shutdown() # may throw want*
+            if not done and self.__awaitingShutdown:
+                error("Shutdown returned zero twice from %s -- bailing",
+                      self.address)
+                done = 1
+            if done:
+                debug("Got a completed shutdown from %s", self.address)
+                self.__sock.close()
+                self.__state = None
+                self.shutdownFinished()
+                return
+            else:
+                trace("Shutdown returned zero -- entering read mode")
+                self.__awaitingShutdown = 1
+                #DODOC is this right?
+                if 1:
+                    self.finished = self.__readTooMuch
+                    self.expectRead(128)
+                raise _ml.TLSWantRead()
+
+    def __readTooMuch(self):
+        """DOCDOC"""
+        print "AAAARGH."
+        self.__sock.close()
+        self.state = None
 
     def __readFn(self):
         """Hook to implement read"""
         while 1:
             r = self.__con.read(1024) #may throw want*
             if r == 0:
-                trace("read returned 0 -- shutting down (fd %s)", self.fd)
+                if self.__awaitingShutdown:
+                    debug("read returned 0: shutdown complete (fd %s)",self.fd)
+                else:
+                    debug("read returned 0: shutting down (fd %s)", self.fd)
                 self.shutdown(err=0)
                 return
             else:
@@ -477,8 +495,8 @@
                 self.handleFail(retriable=1)
             self.remove()
         except _ml.TLSError, e:
-            if self.__state != self.__shutdownFn:
-                warn("Unexpected error: %s. Closing connection to %s.",
+            if self.__state != self.__shutdownFn and (not self.__awaitingShutdown):
+                warn("Unexpected TLS error: %s. Closing connection to %s.",
                      e, self.address)
                 self.shutdown(err=1, retriable=1)
                 self.__handleAll() # Try another round of the loop.
@@ -990,7 +1008,7 @@
                 statFn()
             except AttributeError:
                 pass
-        self._messageList = []
+        self.messageList = []
         self._curMessage = self._curHandle = None
 
     def shutdown(self, err=0, retriable=0):
@@ -1103,7 +1121,7 @@
             LOG.error("Unexpected socket error connecting to %s:%s: %s",
                       ip, port, e)
             EventStats.log.failedConnect() #FFFF addr
-            for m in self.messageList:
+            for m in con.messageList:
                 try:
                     m.failed(1)
                 except AttributeError:
@@ -1120,5 +1138,3 @@
     def onMessageReceived(self, msg):
         """Abstract function.  Called when we get a message"""
         pass
-
-

Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.49
retrieving revision 1.50
diff -u -d -r1.49 -r1.50
--- Modules.py	24 Jul 2003 17:37:16 -0000	1.49
+++ Modules.py	21 Aug 2003 21:34:03 -0000	1.50
@@ -19,6 +19,7 @@
 import smtplib
 import socket
 import threading
+import time
 
 if sys.version_info[:2] >= (2,3):
     import textwrap
@@ -28,15 +29,18 @@
 import mixminion.BuildMessage
 import mixminion.Config
 import mixminion.Filestore
+import mixminion.Fragments
 import mixminion.Packet
 import mixminion.server.ServerQueue
 import mixminion.server.ServerConfig
 import mixminion.server.EventStats as EventStats
+import mixminion.server.PacketHandler
 from mixminion.Config import ConfigError, _parseBoolean, _parseCommand, \
-     _parseIntervalList
-from mixminion.Common import LOG, createPrivateDir, MixError, isSMTPMailbox, \
-     isPrintingAscii, readFile, waitForChildren
-from mixminion.Packet import ParseError, CompressedDataTooLong
+     _parseInterval, _parseIntervalList, _parseSize
+from mixminion.Common import LOG, MixError, ceilDiv, createPrivateDir, \
+     encodeBase64, isPrintingAscii, isSMTPMailbox, previousMidnight, \
+     readFile, waitForChildren
+from mixminion.Packet import ParseError, CompressedDataTooLong, uncompressData
 
 # Return values for processMessage
 DELIVER_OK = 1
@@ -123,6 +127,13 @@
             a nonstandard delivery queue, you don't need to implement this.)"""
         raise NotImplementedError("processMessage")
 
+    def sync(self):
+        """DOCDOC"""
+
+    def close(self):
+        """DOCDOC"""
+        pass
+
 class ImmediateDeliveryQueue:
     """Helper class usable as delivery queue for modules that don't
        actually want a queue.  Such modules should have very speedy
@@ -149,7 +160,7 @@
         except:
             LOG.error_exc(sys.exc_info(),
                                "Exception delivering message")
-            EventStats.log.unretriableDeliery() #FFFF
+            EventStats.log.unretriableDelivery() #FFFF
 
         return "<nil>"
 
@@ -232,6 +243,7 @@
                 stop = self.__stoppingevent.isSet()
                 if stop:
                     LOG.info("Delivery thread shutting down.")
+                    self.moduleManager.close()
                     return
                 self.moduleManager._sendReadyMessages()
                 waitForChildren(blocking=0)
@@ -447,9 +459,22 @@
             queue.sendReadyMessages()
 
     def getServerInfoBlocks(self):
+        """DOCDOC"""
         return [ m.getServerInfoBlock() for m in self.modules
                        if self.enabled.get(m.getName(),0) ]
 
+    def close(self):
+        """DOCDOC"""
+        #XXXX005 this method must be called on shutdown!
+        for module in self.enabled:
+            module.close()
+
+    def sync(self):
+        """DOCDOC"""
+        #XXXX005 this method must be called on reset!
+        for module in self.enabled:
+            module.sync()
+
 #----------------------------------------------------------------------
 class DropModule(DeliveryModule):
     """Null-object pattern: drops all messages it receives."""
@@ -472,6 +497,158 @@
         return DELIVER_OK
 
 #----------------------------------------------------------------------
+class FragmentModule(DeliveryModule):
+    """DOCDOC"""
+    def __init__(self):
+        DeliveryModule.__init__(self)
+        self._queue = None
+        self.manager = None
+        self.maxMessageSize = None
+        self.maxInterval = None
+        self.maxFragments = None
+    def getConfigSyntax(self):
+        return { "Delivery/Fragmented" :
+                 { 'Enabled' : ('REQUIRE',  _parseBoolean, "no"),
+                   'MaximumSize' : ('REQUIRE', _parseSize, None),
+                   'MaximumInterval' : ('ALLOW', _parseInterval, "2 days" )
+                   } }
+    def getRetrySchedule(self):
+        return [ ]
+    def configure(self, config, manager):
+        sec = config['Delivery/Fragmented']
+        if not sec.get("Enabled"):
+            manager.disableModule(self)
+            self.close()
+            return
+        self.maxMessageSize = sec['MaximumSize']
+        self.maxInterval = sec['MaximumInterval']
+        # How many packets could it take to encode a max-size message?
+        fp = mixminion.Fragments.FragmentParams(self.maxMessageSize, 0)
+        self.maxFragments = fp.nChunks * fp.n
+        self.manager = manager
+    def getServerInfoBlock(self):
+        return """[Delivery/Fragmented]
+                  Version: 0.1
+                  Maximum-Fragments: %s
+               """ % self.maxFragments
+    def getName(self):
+        return "FRAGMENT"
+    def getExitTypes(self):
+        return [ mixminion.Packet.FRAGMENT_TYPE ]
+    def createDeliveryQueue(self, queueDir):
+        self.close()
+        self._queue = FragmentDeliveryQueue(self, queueDir, self.manager)
+        return self._queue
+    def sync(self):
+        self._queue.pool.sync()
+    def close(self):
+        if self._queue:
+            self._queue.pool.close()
+            self._queue = None
+    
+class FragmentDeliveryQueue:
+    def __init__(self, module, directory, manager):
+        self.module = module
+        self.directory = directory
+        self.manager = manager
+        self.pool = mixminion.Fragments.FragmentPool(self.directory)
+
+    def queueDeliveryMessage(self, packet, retry=0, lastAttempt=0):
+        if not packet.isFragment():
+            LOG.warn("Dropping FRAGMENT packet with non-fragment payload.")
+            return
+        if packet.getAddress():
+            LOG.warn("Dropping FRAGMENT packet with spurious addressing info.")
+            return
+        # Should be instance of FragmentPayload.
+        payload = packet.getDecodedPayload()
+        assert payload is not None
+        self.pool.addFragment(payload)
+
+    def sendReadyMessages(self):
+        self.pool.unchunkMessages()
+        ready = self.pool.listReadyMessages()
+        for msgid in ready:
+            msg = self.pool.getReadyMessage(msgid)
+            try:
+                ssfm = mixminion.Packet.parseServerSideFragmentedMessage(msg)
+                del msg
+            except ParseError:
+                LOG.warn("Dropping malformed server-side fragmented message")
+                self.pool.markMessageCompleted(msgid, rejected=1)
+                continue
+            if len(ssfm.compressedContents) > self.module.maxMessageSize:
+                LOG.warn("Dropping over-long fragmented message")
+                self.pool.markMessageCompleted(msgid, rejected=1)
+                continue
+            
+            fm = _FragmentedDeliveryMessage(ssfm)
+            self.manager.queueDecodedMessage(fm)
+            self.pool.markMessageCompleted(msgid)
+
+        cutoff = previousMidnight(time.time()) - self.module.maxInterval
+        self.pool.expireMessages(cutoff)
+        
+
+class _FragmentedDeliveryMessage:
+    def __init__(self, ssfm):
+        self.m = ssfm
+        self.contents = None
+    def __getstate__(self):
+        return ( self.m , )
+    def __setstate__(self, s):
+        self.m = s[0]
+        self.contents = None
+    def isDelivery(self): return 1
+    def getExitType(self): return self.m.routingtype
+    def getAddress(self): return self.m.exitinfo
+    def getContents(self):
+        if self.contents is None: self.decode()
+        return self.contents
+    def isPlaintext(self): return 1
+    def isFragment(self): return 0
+    def isEncrypted(self): return 0
+    def isError(self):
+        if self.contents is None: self.decode()
+        return self.isError
+    def isOvercompressed(self):
+        if self.contents is None: self.decode()
+        return self.isOvercompressed
+    def isPrintingAscii(self):
+        if self.contents is None: self.decode()
+        return isPrintingAscii(self.contents, allowISO=1)
+    def getAsciiContents(self):
+        if self.contents is None: self.decode()
+        if isPrintingAscii(self.contents, allowISO=1):
+            return self.contents
+        else:
+            return encodeBase64(self.contents)
+    def getHeaders(self):
+        """DOCDOC"""
+        if self.contents is None:
+            self.decode()
+        assert self.headers is not None
+        return self.headers
+    def getTextEncodedMessage(self):
+        if self.isOvercompressed():
+            tp = 'LONG'
+        elif self.isPrintingAscii():
+            tp = 'TXT'
+        else:
+            tp = 'BIN'
+        return mixminion.Packet.TextEncodedMessage(self.contents, tp, None)
+    def decode(self):
+        maxLen = 20*len(self.m.compressedContents)
+        try:
+            c = uncompressData(self.m.uncompressedContents, maxLen)
+        except CompressedDataTooLong:
+            self.contents = self.m.uncompressedContents
+            self.headers = {}
+            return
+        self.contents, self.headers = \
+                       mixminion.Packet.parseMessageAndHeaders(c)
+        
+#----------------------------------------------------------------------
 class EmailAddressSet:
     """A set of email addresses stored on disk, for use in blacklisting email
        addresses.  The file format is line-based.  Lines starting with #
@@ -606,7 +783,12 @@
     def _formatEmailMessage(self, address, packet):
         """DOCDOC"""
         #DOCDOC implied fields
-        #   subject, fromTag, returnAddress, header
+        #   subject, fromTag, returnAddress, header, maxMessageSize
+
+        if len(packet.getContents()) > self.maxMessageSize:
+            LOG.warn("Dropping over-long message (message is %sb; max is %sb)",
+                     len(packet.getContents()), self.maxMessageSize)
+            return None
 
         headers = packet.getHeaders()
         subject = headers.get("SUBJECT", self.subject)
@@ -622,6 +804,9 @@
             morelines.append("In-Reply-To: %s\n" % headers['IN-REPLY-TO'])
         if headers.has_key("REFERENCES"):
             morelines.append("References: %s\n" % headers['REFERENCES'])
+        #FFFF In the long run, we may want to reject messages with
+        #FFFF unrecognized headers.  But while we're in alpha, it'd
+        #FFFF be too much of a headache.
 
         # Decode and escape the message, and get ready to send it.
         msg = _escapeMessageForEmail(packet)
@@ -652,6 +837,7 @@
     #   addr: our IP address, or "<Unknown IP>": for use in our boilerplate.
     def __init__(self):
         DeliveryModule.__init__(self)
+        self.maxMessageSize = None
         self.addresses = {}
 
     def getRetrySchedule(self):
@@ -669,7 +855,9 @@
                    'AddressFile' : ('ALLOW', None, None),
                    'ReturnAddress' : ('ALLOW', None, None),
                    'RemoveContact' : ('ALLOW', None, None),
-                   'SMTPServer' : ('ALLOW', None, 'localhost') }
+                   'SMTPServer' : ('ALLOW', None, 'localhost'),
+                   'MaximumSize' : ('ALLOW', _parseSize, "100K"),
+                   }
                  }
 
     def validateConfig(self, config, lines, contents):
@@ -709,6 +897,10 @@
         if not self.nickname:
             self.nickname = socket.gethostname()
         self.addr = config['Incoming/MMTP'].get('IP', "<Unknown IP>")
+        self.maxMessageSize = sec['MaximumSize']
+        if self.maxMessageSize < 32*1024:
+            LOG.warn("Ignoring low maximum message sze")
+            self.maxMessageSize = 32*1024
 
         # These fields are needed by MailBase
         self.subject = "Type III Anonymous Message"
@@ -772,6 +964,8 @@
 
         # Generate the boilerplate (FFFF Make this more configurable)
         msg = self._formatEmailMessage(address, packet)
+        if not msg:
+            return DELIVER_FAIL_NORETRY
 
         # Deliver the message
         return sendSMTPMessage(self.server, [address], self.returnAddress, msg)
@@ -779,11 +973,12 @@
 #----------------------------------------------------------------------
 class SMTPModule(DeliveryModule, MailBase):
     """Placeholder for real exit node implementation.
-       For now, use MixmasterSMTPModule"""
+       For now, use MixmasterSMTPModule DOCDOC"""
     def __init__(self):
         DeliveryModule.__init__(self)
     def getServerInfoBlock(self):
-        return "[Delivery/SMTP]\nVersion: 0.1\n"
+        return "[Delivery/SMTP]\nVersion: 0.1\nMaximum-Size: %s\n" % (
+            ceilDiv(self.maxMessageSize,1024) )
     def getName(self):
         return "SMTP"
     def getExitTypes(self):
@@ -821,7 +1016,7 @@
                    'FromTag' : ('ALLOW', None, "[Anon]"),
                    'SubjectLine' : ('ALLOW', None,
                                     'Type III Anonymous Message'),
-
+                   'MaximumSize' : ('ALLOW', _parseSize, "100K"),
                    }
                  }
 
@@ -862,6 +1057,11 @@
         else:
             self.header = "X-Anonymous: yes"
 
+        self.maxMessageSize = sec['MaximumSize']
+        if self.maxMessageSize < 32*1024:
+            LOG.warn("Ignoring low maximum message sze")
+            self.maxMessageSize = 32*1024
+
         manager.enableModule(self)
 
     def processMessage(self, packet):
@@ -881,6 +1081,9 @@
             return DELIVER_FAIL_NORETRY
 
         msg = self._formatEmailMessage(address, packet)
+        if not msg:
+            return DELIVER_FAIL_NORETRY
+
         # Send the message.
         return sendSMTPMessage(self.server, [address], self.returnAddress, msg)
 
@@ -918,6 +1121,7 @@
                    'FromTag' : ('ALLOW', None, "[Anon]"),
                    'SubjectLine' : ('ALLOW', None,
                                     'Type III Anonymous Message'),
+                   'MaximumSize' : ('ALLOW', _parseSize, "100K"),
                    }
                  }
 
@@ -942,6 +1146,10 @@
         self.options = tuple(cmd[1]) + ("-l", self.server)
         self.returnAddress = "nobody"
         self.header = "X-Anonymous: yes"
+        self.maxMessageSize = sec['MaximumSize']
+        if self.maxMessageSize < 32*1024:
+            LOG.warn("Ignoring low maximum message sze")
+            self.maxMessageSize = 32*1024
         manager.enableModule(self)
 
     def getName(self):
@@ -966,6 +1174,9 @@
             return DELIVER_FAIL_NORETRY
 
         msg = self._formatEmailMessage(info.email, packet)
+        if not msg:
+            return DELIVER_FAIL_NORETRY
+
         handle = self.tmpQueue.queueMessage(msg)
 
         cmd = self.command
@@ -1032,7 +1243,7 @@
 #----------------------------------------------------------------------
 
 def _escapeMessageForEmail(packet):
-    """Helper function: Given a message and tag, escape the message if
+    """Helper function: Given a DeliveryPacket, escape the message if
        it is not plaintext ascii, and wrap it in some standard
        boilerplate.  Add a disclaimer if the message is not ascii.
        Extracts headers if possible.  Returns a 2-tuple of message/headers.

Index: PacketHandler.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/PacketHandler.py,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -d -r1.20 -r1.21
--- PacketHandler.py	24 Jul 2003 18:01:29 -0000	1.20
+++ PacketHandler.py	21 Aug 2003 21:34:03 -0000	1.21
@@ -108,8 +108,8 @@
            forwarded messages.  You must prevent timing attacks elsewhere."""
 
         # Break into headers and payload
-        msg = Packet.parseMessage(msg)
-        header1 = Packet.parseHeader(msg.header1)
+        pkt = Packet.parsePacket(msg)
+        header1 = Packet.parseHeader(pkt.header1)
         encSubh = header1[:Packet.ENC_SUBHEADER_LEN]
         header1 = header1[Packet.ENC_SUBHEADER_LEN:]
 
@@ -182,7 +182,7 @@
         header1 = subh.underflow + header1
 
         # Decrypt the payload.
-        payload = Crypto.lioness_decrypt(msg.payload,
+        payload = Crypto.lioness_decrypt(pkt.payload,
                               keys.getLionessKeys(Crypto.PAYLOAD_ENCRYPT_MODE))
 
         # If we're an exit node, there's no need to process the headers
@@ -198,7 +198,7 @@
             raise ContentError("Unrecognized Mixminion routing type")
 
         # Decrypt header 2.
-        header2 = Crypto.lioness_decrypt(msg.header2,
+        header2 = Crypto.lioness_decrypt(pkt.header2,
                            keys.getLionessKeys(Crypto.HEADER_ENCRYPT_MODE))
 
         # If we're the swap node, (1) decrypt the payload with a hash of
@@ -217,9 +217,9 @@
         address = Packet.parseIPV4Info(subh.routinginfo)
 
         # Construct the message for the next hop.
-        msg = Packet.Message(header1, header2, payload).pack()
+        pkt = Packet.Packet(header1, header2, payload).pack()
 
-        return RelayedPacket(address, msg)
+        return RelayedPacket(address, pkt)
 
 class RelayedPacket:
     """A packet that is to be relayed to another server; returned by
@@ -262,6 +262,8 @@
     # type -- until decode is called, None.  After decode is called,
     #     one of 'plain' (plaintext message), 'long' (overcompressed message),
     #     'enc' (encrypted message), or 'err' (malformed message).
+    # isfrag -- DOCDOC
+    # dPayload -- DOCDOC
     def __init__(self, routingType, routingInfo, applicationKey,
                  tag, payload):
         """Construct a new DeliveryPacket."""
@@ -277,6 +279,8 @@
         self.contents = None
         self.type = None
         self.headers = None#DOCDOC
+        self.isfrag = 0
+        self.dPayload = None
 
     def isDelivery(self):
         """Return true iff this packet is a delivery (non-relay) packet."""
@@ -293,6 +297,11 @@
         if self.type is None: self.decode()
         return self.contents
 
+    def getDecodedPayload(self):
+        """DOCDOC"""
+        if self.type is None: self.decode()
+        return self.dPayload
+
     def isPlaintext(self):
         """Return true iff this packet is a plaintext, forward packet."""
         if self.type is None: self.decode()
@@ -304,6 +313,11 @@
         if self.type is None: self.decode()
         return self.type == 'long'
 
+    def isFragment(self):
+        """DOCDOC"""
+        if self.type is None: self.decode()
+        return self.isfrag
+
     def isEncrypted(self):
         """Return true iff this packet may be an encrypted forward or
            reply packet."""
@@ -328,18 +342,25 @@
         message = self.payload
         self.contents = None
         try:
-            self.contents = mixminion.BuildMessage.decodePayload(message,
+            self.dPayload = mixminion.BuildMessage.decodePayload(message,
                                                                  self.tag)
-            if self.contents is None:
+            if self.dPayload is None:
                 # encrypted message
                 self.type = 'enc'
                 self.contents = message
                 self.headers = {}
-            else:
-                # forward message
+            elif self.dPayload.isSingleton():
+                # forward message, singleton.
                 self.type = 'plain'
+                body = self.dPayload.getUncompressedContents()
                 self.contents, self.headers = \
-                               Packet.parseMessageAndHeaders(self.contents)
+                               Packet.parseMessageAndHeaders(body)
+            else:
+                # forward message, fragment.
+                self.isfrag = 1
+                self.type = 'plain'
+                self.contents = message
+                self.headers = {}
         except Packet.CompressedDataTooLong, _:
             self.contents = Packet.parsePayload(message).getContents()
             self.type = 'long'
@@ -384,6 +405,9 @@
         elif self.isPrintingAscii():
             assert self.isPlaintext()
             tp = 'TXT'
+        elif self.isFragment():
+            assert self.isPlaintext()
+            tp = 'FRAG'
         else:
             assert self.isPlaintext()
             tp = 'BIN'

Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.48
retrieving revision 1.49
diff -u -d -r1.48 -r1.49
--- ServerKeys.py	13 Jul 2003 03:45:35 -0000	1.48
+++ ServerKeys.py	21 Aug 2003 21:34:03 -0000	1.49
@@ -32,7 +32,7 @@
 from mixminion.Common import AtomicFile, LOG, MixError, MixFatalError, \
      ceilDiv, createPrivateDir, checkPrivateFile, formatBase64, formatDate, \
      formatTime, previousMidnight, readFile, secureDelete, tryUnlink, \
-     writeFile
+     UIError, writeFile
 
 #----------------------------------------------------------------------
 
@@ -941,7 +941,7 @@
             LOG.warn("No IP configured; guessing %s",fields['IP'])
         except IPGuessError, e:
             LOG.error("Can't guess IP: %s", str(e))
-            raise MixError("Can't guess IP: %s" % str(e))
+            raise UIError("Can't guess IP: %s" % str(e))
 
     # Fill in a stock server descriptor.  Note the empty Digest: and
     # Signature: lines.

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.87
retrieving revision 1.88
diff -u -d -r1.87 -r1.88
--- ServerMain.py	24 Jul 2003 17:37:16 -0000	1.87
+++ ServerMain.py	21 Aug 2003 21:34:03 -0000	1.88
@@ -624,7 +624,7 @@
         try:
             self.lockFile.acquire()
         except LockfileLocked:
-            raise MixFatalError("Another server seems to be running.")
+            raise UIError("Another server seems to be running.")
 
         # The pid file.
         self.pidFile = os.path.join(homeDir, "pid")
@@ -734,25 +734,6 @@
             return self.keyring.getNextKeyRotation()
         finally:
             if lock: self.keyring.unlock()
-
-    def scheduleRecurringComplexBackground(self, first, name, cb):
-        """DOCDOC"""
-        #????005 Is this worth it?
-        backgroundJob = [None]
-        scheduler = [None]
-        
-        def _bg(cb=cb, self=self, scheduler=scheduler, name=name):
-            next = cb()
-            if next is not None:
-                self.scheduleOnce(next, name, scheduler[0])
-
-        def _scheduler(backgroundJob=backgroundJob, self=self):
-            self.processingThread.addJob(backgroundJob[0])
-
-        backgroundJob[0] = _bg
-        scheduler[0] = _scheduler
-
-        self.scheduleOnce(first,name,_scheduler)
 
     def generateKeys(self):
         """Callback used to schedule key-generation"""

Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.30
retrieving revision 1.31
diff -u -d -r1.30 -r1.31
--- ServerQueue.py	14 Aug 2003 19:37:25 -0000	1.30
+++ ServerQueue.py	21 Aug 2003 21:34:03 -0000	1.31
@@ -186,6 +186,8 @@
            it from disk if necessary."""
         if self.message is None:
             self.message = self.queue.store.getObject(self.handle)
+            #XXXX There's an error case where getObject returns none
+            #XXXX if the data is corrupt on disk.
         return self.message
 
 class DeliveryQueue:
@@ -363,7 +365,9 @@
             messages = []
             for h in self.store._metadata_cache.keys():
                 state = self.store.getMetadata(h)
-                if state.isPending():
+                if state is None:
+                    continue
+                elif state.isPending():
                     LOG.trace("     [%s] is pending delivery", h)
                     continue
                 elif state and state.isRemovable():
@@ -439,6 +443,9 @@
             try:
                 ds = self.store.getMetadata(handle)
             except KeyError:
+                ds = None
+
+            if ds is None:
                 # This should never happen
                 LOG.error_exc(sys.exc_info(),
                               "Handle %s had no state", handle)