[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Check in pending work including fragments and bugfixes.



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv26532/lib/mixminion

Modified Files:
	BuildMessage.py ClientMain.py Common.py Config.py Crypto.py 
	Filestore.py Fragments.py Main.py Packet.py ServerInfo.py 
	__init__.py benchmark.py test.py 
Log Message:
Check in pending work including fragments and bugfixes.

setup.py, README, __init__.py: 
- Bump version to 0.0.5alpha2.

BuildMessage: 
- Refactor message construction code to support fragment-then-build-packets
  path.
- Change decodePayload to return SingletonPayload or FragmentPayload.
- Remove an extraneous sha1() from payload decoding.

ClientMain:
- Minimal, kludged-up support for sending fragmented forward messages.
- Remove obsolete client queue code.

Common: 
- Clean up 'bad mode' error message
- Add check for [...] in basee64-encoded data.

Config:
- Add _parseSize to allow configuration files with values like "1M", "100K",
  etc.
- Check whether the user accidentally passes mixminiond.conf to a client.

Crypto:
- Make 'openNewFile' even more paranoid.
- List failing entropy sources when none is found.

Filestore:
- Add delItem to journaled DB.
- Catch corrupted data in file stores.

Fragments: 
- Debug, appease pychecker.

Main:
- Add missing parenthesis

Packet:
- Add notion of tag-less delivery types. (FRAGMENT)
- Renane 'Message' to 'Packet', which it should have been all along.
- Move uncompression logic from decodePayload into SingletonPayload.
- Add encoding for fragmentation of messages that will be decoded at the
  exit node.
- Add 'Message-type: fragmented' for text-encoded messages.
- Check for overlong mail headers and bad from addresses when sending.

benchmark:
- Debug.
- Add some more timing.

test:
- Add tests for catching [...] in base64'd data
- Add tests for sending bad email headers.
- Use new interface from BuildMessage.
- Propagate other new interfaces.
- Add tests for _parseSize.
- More fragment reassembly tests

MMTPServer:
- Change shutdown logic again.  This may be what finally kills our
  infinite busy-loop bug.
- Appease pychecker.

Modules: 
- Add .close() and .sync() methods to modules.
- Add delivery module to reassemble fragmented messages, and relay them on
  to other delivery modules.
- Implement limits on message sizes.

PacketHandler:
- Stop calling packets messages.
- Add 'getDecodedPayload' interface to DeliveryPacket; add fragments.

ServerKeys, ServerMain:
- Change some errors to UIError, drop some dead code.

ServerQueue: 
- Handle corrupted messages on disk.

mixminiond.conf, Modules, ServerInfo:
- Add MaximumSize to module configuration
- Add configuration and capabilities for fragment reconstruction.



Index: BuildMessage.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/BuildMessage.py,v
retrieving revision 1.54
retrieving revision 1.55
diff -u -d -r1.54 -r1.55
--- BuildMessage.py	18 Aug 2003 00:41:10 -0000	1.54
+++ BuildMessage.py	21 Aug 2003 21:34:02 -0000	1.55
@@ -23,16 +23,18 @@
            'buildReplyMessage', 'buildReplyBlock', 'checkPathLength',
            'encodeMessage', 'decodePayload' ]
 
-def encodeMessage(message, overhead, paddingPRNG=None):
+def encodeMessage(message, overhead, uncompressedFragmentPrefix="",
+                  paddingPRNG=None):
     """Given a message, compress it, fragment it into individual payloads,
        and add extra fields (size, hash, etc) as appropriate.  Return a list
        of strings, each of which is a message payload suitable for use in
        build*Message.
-
-       payload: the initial payload
+ 
+              payload: the initial payload
               overhead: number of bytes to omit from each payload,
                         given the type ofthe message encoding.
                         (0 or ENC_FWD_OVERHEAD)
+              uncompressedFragmentPrefix: DOCDOC 
               paddingPRNG: generator for padding.
 
        Note: If multiple strings are returned, be sure to shuffle them
@@ -59,6 +61,9 @@
         p.computeHash()
         return [ p.pack() ]
 
+    if uncompressedFragmentPrefix:
+        payload = uncompressedFragmentPrefix+payload
+
     # DOCDOC
     messageid = Crypto.getCommonPRNG().getBytes(20)
     p = mixminion.Fragments.FragmentationParams(len(payload), overhead)
@@ -71,12 +76,16 @@
         rawFragments[i] = None
     return fragments
 
+#XXXX006 Most of the build*Message functions here should be 'build*Packet'.
+#XXXX006 All of the build*Message functions should be replaced with their
+#XXXX006 _build*Message variants.
 def buildForwardMessage(payload, exitType, exitInfo, path1, path2,
                         paddingPRNG=None):
     # Compress, pad, and checksum the payload.
     if payload is not None and exitType != DROP_TYPE:
-        payloads = encodeMessage(payload, 0, paddingPRNG)
-        assert len(payloads) == 1
+        payloads = encodeMessage(payload, 0, "", paddingPRNG)
+        if len(payloads) != 1:
+            raise MixError("buildForwardMessage does not support fragmented payloads")
         payload = payloads[0]
         LOG.debug("Encoding forward message for %s-byte payload",len(payload))
     else:
@@ -127,8 +136,9 @@
 
 def buildEncryptedForwardMessage(payload, exitType, exitInfo, path1, path2,
                                  key, paddingPRNG=None, secretRNG=None):
-    payloads = encodeMessage(payload, ENC_FWD_OVERHEAD, paddingPRNG)
-    assert len(payloads) == 1
+    payloads = encodeMessage(payload, ENC_FWD_OVERHEAD, "", paddingPRNG)
+    if len(payloads) != 1:
+        raise UIError("No support yet for fragmented encrypted messages")
     return _buildEncryptedForwardMessage(payloads[0], exitType, exitInfo,
                                          path1, path2, key, paddingPRNG,
                                          secretRNG)
@@ -194,8 +204,9 @@
     return _buildMessage(payload, exitType, exitInfo, path1, path2,paddingPRNG)
 
 def buildReplyMessage(payload, path1, replyBlock, paddingPRNG=None):
-    payloads = encodeMessage(payload, 0, paddingPRNG)
-    assert len(payloads) == 1
+    payloads = encodeMessage(payload, 0, "", paddingPRNG)
+    if len(payloads) != 1:
+        raise UIError("No support yet for fragmented reply messages")
     return _buildReplyMessage(payloads[0], path1, replyBlock, paddingPRNG)
 
 def _buildReplyMessage(payload, path1, replyBlock, paddingPRNG=None):
@@ -337,11 +348,10 @@
     
 #----------------------------------------------------------------------
 # MESSAGE DECODING
-
-def decodePayload(payload, tag, key=None,
-                  userKeys=None):
-    """Given a 28K payload and a 20-byte decoding tag, attempt to decode and
-       decompress the original message.
+def decodePayload(payload, tag, key=None, userKeys=None):
+    """Given a 28K payload and a 20-byte decoding tag, attempt to decode the
+       original message.  Returns either a SingletonPayload instance, a
+       FragmentPayload instance, or None.
 
            key: an RSA key to decode encrypted forward messages, or None
            userKeys: a map from identity names to keys for reply blocks,
@@ -362,7 +372,7 @@
     # If the payload already contains a valid checksum, it's a forward
     # message.
     if _checkPayload(payload):
-        return _decodeForwardPayload(payload)
+        return parsePayload(payload)
 
     # If H(tag|userKey|"Validate") ends with 0, then the message _might_
     # be a reply message using H(tag|userKey|"Generate") as the seed for
@@ -391,7 +401,10 @@
 def _decodeForwardPayload(payload):
     """Helper function: decode a non-encrypted forward payload. Return values
        are the same as decodePayload."""
-    return _decodePayloadImpl(payload)
+    if not _checkPayload(payload):
+        raise MixError("Hash doesn't match")
+
+    return parsePayload(payload)
 
 def _decodeEncryptedForwardPayload(payload, tag, key):
     """Helper function: decode an encrypted forward payload.  Return values
@@ -417,7 +430,10 @@
     rest = rsaPart[SECRET_LEN:] + Crypto.lioness_decrypt(rest, k)
 
     # ... and then, check the checksum and continue.
-    return _decodePayloadImpl(rest)
+    if not _checkPayload(rest):
+        raise MixError("Invalid checksum on encrypted forward payload")
+
+    return parsePayload(rest)
 
 def _decodeReplyPayload(payload, secrets, check=0):
     """Helper function: decode a reply payload, given a known list of packet
@@ -431,10 +447,14 @@
         k = Crypto.Keyset(sec).getLionessKeys(Crypto.PAYLOAD_ENCRYPT_MODE)
         payload = Crypto.lioness_encrypt(payload, k)
         if check and _checkPayload(payload):
-            break
+            return parsePayload(payload)
 
-    # ... and then, check the checksum and continue.
-    return _decodePayloadImpl(payload)
+    # If 'check' is false, then we might still have a good payload.  If
+    # 'check' is true, we don't.
+    if check or not _checkPayload(payload):
+        raise MixError("Invalid checksum on reply payload")
+    
+    return parsePayload(payload)
 
 def _decodeStatelessReplyPayload(payload, tag, userKey):
     """Decode a (state-carrying) reply payload."""
@@ -663,38 +683,15 @@
         header2 = Crypto.lioness_encrypt(header2,hkey)
         payload = Crypto.lioness_encrypt(payload,pkey)
 
-    return Message(header1, header2, payload).pack()
+    return Packet(header1, header2, payload).pack()
 
 #----------------------------------------------------------------------
 # Payload-related helpers
 
-
 def _getRandomTag(rng):
     "Helper: Return a 20-byte string with the MSB of byte 0 set to 0."
     b = ord(rng.getBytes(1)) & 0x7f
     return chr(b) + rng.getBytes(TAG_LEN-1)
-
-def _decodePayloadImpl(payload):
-    """Helper: try to decode an encoded payload: checks only encoding,
-       not encryption."""
-    # Is the hash ok?
-    if not _checkPayload(payload):
-        raise MixError("Hash doesn't match")
-
-    # Parse the payload into its size, checksum, and body.
-    payload = parsePayload(payload)
-
-    if not payload.isSingleton():
-        raise MixError("Message fragments not yet supported")
-
-    # Uncompress the body.
-    contents = payload.getContents()
-    # If the payload would expand to be more than 20K long, and the
-    # compression factor is greater than 20, we warn of a possible zlib
-    # bomb.
-    maxLen = max(20*1024, 20*len(contents))
-
-    return uncompressData(contents, maxLength=maxLen)
 
 def _checkPayload(payload):
     'Return true iff the hash on the given payload seems valid'

Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.106
retrieving revision 1.107
diff -u -d -r1.106 -r1.107
--- ClientMain.py	17 Aug 2003 21:09:56 -0000	1.106
+++ ClientMain.py	21 Aug 2003 21:34:02 -0000	1.107
@@ -19,6 +19,7 @@
 import signal
 import socket
 import stat
+import struct
 import sys
 import time
 import urllib2
@@ -159,7 +160,7 @@
         url = MIXMINION_DIRECTORY_URL
         LOG.info("Downloading directory from %s", url)
 
-        # XXXX005 refactor download logic.
+        # XXXX Refactor download logic.
 
         if hasattr(signal, 'alarm'):
             def sigalrmHandler(sig, _):
@@ -487,6 +488,24 @@
 
         return u.values()
 
+    def findByExitTypeAndSize(self, exitType, size, nPackets):
+        #XXXX006 remove this method.  It's not really a good interface,
+        #XXXX006 and only gets used by the kludgy choose-a-new-last-hop logic
+        """Return a server that supports exitType 'exittype' (currently must be
+           SMTP_TYPE), and messages of size 'size' bytes."""
+        assert exitType == SMTP_TYPE
+        servers = self.__find(self.byCapability['smtp'], time.time(),
+                              time.time()+24*60*60)
+        servers = servers[:]
+        mixminion.Crypto.getCommonPRNG().shuffle(servers)
+        for s in servers:
+            maxSize = s['Delivery/SMTP']['Maximum-Size'] * 1024
+            maxPackets = s['Delivery/Fragmented'].get('Maximum-Fragments',1)
+            if maxSize >= size and maxPackets >= packets:
+                return s
+
+        return None
+
     def clean(self, now=None):
         """Remove all expired or superseded descriptors from DIR/servers."""
 
@@ -1391,123 +1410,6 @@
             self.store.removeMessage(h)
         self.store.cleanQueue()
 
-## class ClientQueue:
-##     """A ClientQueue holds packets that have been scheduled for delivery
-##        but not yet delivered.  As a matter of policy, we queue messages if
-##        the user tells us to, or if deliver has failed and the user didn't
-##        tell us not to."""
-##     ## Fields:
-##     # dir -- a directory to store packets in.
-##     # prng -- an instance of mixminion.Crypto.RNG.
-##     ## Format:
-##     # The directory holds files with names of the form pkt_<handle>.
-##     # Each file holds pickled tuple containing:
-##     #           ("PACKET-0",
-##     #             a 32K string (the packet),
-##     #             an instance of IPV4Info (the first hop),
-##     #             the latest midnight preceding the time when this
-##     #                 packet was inserted into the queue
-##     #           )
-##     # XXXX change this to be OO; add nicknames.
-
-##     # XXXX write unit tests
-
-##     def __init__(self, directory, prng=None):
-##         """Create a new ClientQueue object, storing packets in 'directory'
-##            and generating random filenames using 'prng'."""
-##         self.dir = directory
-##         createPrivateDir(directory)
-##         if prng is not None:
-##             self.prng = prng
-##         else:
-##             self.prng = mixminion.Crypto.getCommonPRNG()
-
-##     def queuePacket(self, message, routing):
-##         """Insert the 32K packet 'message' (to be delivered to 'routing')
-##            into the queue.  Return the handle of the newly inserted packet."""
-##         clientLock()
-##         try:
-##             f, handle = self.prng.openNewFile(self.dir, "pkt_", 1)
-##             cPickle.dump(("PACKET-0", message, routing,
-##                           previousMidnight(time.time())), f, 1)
-##             f.close()
-##             return handle
-##         finally:
-##             clientUnlock()
-
-##     def getHandles(self):
-##         """Return a list of the handles of all messages currently in the
-##            queue."""
-##         clientLock()
-##         try:
-##             fnames = os.listdir(self.dir)
-##             handles = []
-##             for fname in fnames:
-##                 if fname.startswith("pkt_"):
-##                     handles.append(fname[4:])
-##             return handles
-##         finally:
-##             clientUnlock()
-
-##     def getPacket(self, handle):
-##         """Given a handle, return a 3-tuple of the corresponding
-##            32K packet, IPV4Info, and time of first queueing.  (The time
-##            is rounded down to the closest midnight GMT.)"""
-##         fname = os.path.join(self.dir, "pkt_"+handle)
-##         magic, message, routing, when = readPickled(fname)
-##         if magic != "PACKET-0":
-##             LOG.error("Unrecognized packet format for %s",handle)
-##             return None
-##         return message, routing, when
-
-##     def packetExists(self, handle):
-##         """Return true iff the queue contains a packet with the handle
-##            'handle'."""
-##         fname = os.path.join(self.dir, "pkt_"+handle)
-##         return os.path.exists(fname)
-
-##     def removePacket(self, handle):
-##         """Remove the packet named with the handle 'handle'."""
-##         fname = os.path.join(self.dir, "pkt_"+handle)
-##         secureDelete(fname, blocking=1)
-
-##     def inspectQueue(self, now=None):
-##         """Print a message describing how many messages in the queue are headed
-##            to which addresses."""
-##         if now is None:
-##             now = time.time()
-##         handles = self.getHandles()
-##         if not handles:
-##             print "[Queue is empty.]"
-##             return
-##         timesByServer = {}
-##         for h in handles:
-##             _, routing, when = self.getPacket(h)
-##             timesByServer.setdefault(routing, []).append(when)
-##         for s in timesByServer.keys():
-##             count = len(timesByServer[s])
-##             oldest = min(timesByServer[s])
-##             days = floorDiv(now - oldest, 24*60*60)
-##             if days < 1:
-##                 days = "<1"
-##             print "%2d messages for server at %s:%s (oldest is %s days old)"%(
-##                 count, s.ip, s.port, days)
-
-##     def cleanQueue(self, maxAge, now=None):
-##         """Remove all messages older than maxAge seconds from this
-##            queue."""
-##         if now is None:
-##             now = time.time()
-##         cutoff = now - maxAge
-##         remove = []
-##         for h in self.getHandles():
-##             when = self.getPacket(h)[2]
-##             if when < cutoff:
-##                 remove.append(h)
-##         LOG.info("Removing %s old messages from queue", len(remove))
-##         for h in remove:
-##             self.removePacket(h)
-
 class MixminionClient:
     """Access point for client functionality."""
     ## Fields:
@@ -1544,16 +1446,15 @@
                fails."""
         assert not (forceQueue and forceNoQueue)
 
-        message, firstHop = \
-                 self.generateForwardMessage(address, payload,
-                                             servers1, servers2)
+        for packet, firstHop in self.generateForwardMessage(
+            address, payload, servers1, servers2):
 
-        routing = firstHop.getRoutingInfo()
+            routing = firstHop.getRoutingInfo()
 
-        if forceQueue:
-            self.queueMessages([message], routing)
-        else:
-            self.sendMessages([message], routing, noQueue=forceNoQueue)
+            if forceQueue:
+                self.queueMessages([packet], routing)
+            else:
+                self.sendMessages([packet], routing, noQueue=forceNoQueue)
 
     def sendReplyMessage(self, payload, servers, surbList, forceQueue=0,
                          forceNoQueue=0):
@@ -1594,21 +1495,42 @@
 
         return block
 
-    def generateForwardMessage(self, address, payload, servers1, servers2):
-        """Generate a forward message, but do not send it.  Returns
-           a tuple of (the message body, a ServerInfo for the first hop.)
+    def generateForwardMessage(self, address, message, servers1, servers2):
+        """Generate a forward message, but do not send it.  Returns a
+           list of tuples of (the packet body, a ServerInfo for the
+           first hop.)
 
             address -- the results of a parseAddress call
-            payload -- the contents of the message to send  (None for DROP
+            message -- the contents of the message to send  (None for DROP
               messages)
             servers1,servers2 -- lists of ServerInfo.
             """
         routingType, routingInfo, _ = address.getRouting()
-        LOG.info("Generating payload...")
-        msg = mixminion.BuildMessage.buildForwardMessage(
-            payload, routingType, routingInfo, servers1, servers2,
-            self.prng)
-        return msg, servers1[0]
+
+        #XXXX006 we need to factor this long-message logic out to the
+        #XXXX006 common code.  For now, this is a temporary measure.
+        
+        # DOCDOC
+        fragmentedMessagePrefix = struct.pack("!HH", routingType,
+                                               len(routingInfo))+routingInfo
+        LOG.info("Generating payload(s)...")
+        r = []
+        payloads = mixminion.BuildMessage.encodeMessage(message, 0,
+                            fragmentedMessagePrefix)
+        if len(payloads) > 1:
+            routingType = mixminion.Packet.FRAGMENT_TYPE
+            routingInfo = ""
+            if servers2[-1]['Delivery/Fragmented'].get('Maximum-Fragments',1) < len(payloads):
+                raise UIError("Oops; %s won't reassable a message this large.",
+                              servers2.getNickname())
+
+        #XXXX006 don't use the same path for all the packets!
+        for p in payloads:
+            msg = mixminion.BuildMessage._buildForwardMessage(
+                p, routingType, routingInfo, servers1, servers2,
+                self.prng)
+            r.append( (msg, servers1[0]) )
+        return r
 
     def generateReplyMessage(self, payload, servers, surbList, now=None):
         """Generate a forward message, but do not send it.  Returns
@@ -1813,15 +1735,19 @@
         for msg in parseTextEncodedMessages(s, force=force):
             if msg.isOvercompressed() and not force:
                 LOG.warn("Message is a possible zlib bomb; not uncompressing")
-            if not msg.isEncrypted():
+            if msg.isFragment:
+                raise UIError("Sorry -- no support yet for client-side defragmentation.")
+            elif not msg.isEncrypted():
                 results.append(msg.getContents())
             else:
                 surbKeys = self.keys.getSURBKeys()
                 p = mixminion.BuildMessage.decodePayload(msg.getContents(),
                                                          tag=msg.getTag(),
                                                          userKeys=surbKeys)
-                if p:
-                    results.append(p)
+                if p and p.isSingleton():
+                    results.append(p.getUncompressedContents())
+                elif p:
+                    raise UIError("Sorry; no support yet for client-side defragmentation.")
                 else:
                     raise UIError("Unable to decode message")
         if isatty and not force:
@@ -2353,8 +2279,11 @@
 
     # Encode the headers early so that we die before reading the message if
     # they won't work.
-    headerStr = encodeMailHeaders(subject=h_subject, fromAddr=h_from,
-                                  inReplyTo=h_irt, references=h_references)
+    try:
+        headerStr = encodeMailHeaders(subject=h_subject, fromAddr=h_from,
+                                      inReplyTo=h_irt, references=h_references)
+    except MixError, e:
+        raise UIError("Invalid headers: %s"%e)
 
     if inFile in (None, '-') and '-' in parser.replyBlockFiles:
         raise UIError(
@@ -2367,11 +2296,22 @@
     parser.init()
     client = parser.client
 
+    #XXXX006 the logic here is wrong for large messages.  Instead of
+    #XXXX006 [parse pathspec, parse address, generate path, read message,
+    #XXXX006 encode message, build packets, send packets], it should be
+    #XXXX006 [parse pathspec, parse address, check pathspec, read message,
+    #XXXX006 encode message, generate paths, build packets, send packets].
     parser.parsePath()
 
     path1, path2 = parser.getForwardPath()
     address = parser.address
 
+    #XXXX006 remove this ad hoc check.
+    if not parser.usingSURBList and len(headerStr) > 2:
+        sware = path2[-1]['Server'].get('Software', "")
+        if sware.startswith("Mixminion 0.0.4") or sware.startswith("Mixminion 0.0.5alpha1"):
+            LOG.warn("Exit server %s is running old software that may not support headers correctly.", path2[-1].getNickname())
+
     # Get our surb, if any.
     if parser.usingSURBList and inFile in ('-', None):
         # We check to make sure that we have a valid SURB before reading
@@ -2386,11 +2326,11 @@
 
     # XXXX Clean up this ugly control structure.
     if address and inFile is None and address.getRouting()[0] == DROP_TYPE:
-        payload = None
+        message = None
         LOG.info("Sending dummy message")
     else:
         if address and address.getRouting()[0] == DROP_TYPE:
-            raise UIError("Cannot send a payload with a DROP message.")
+            raise UIError("Cannot send a message in a DROP packet")
 
         if inFile is None:
             inFile = "-"
@@ -2399,21 +2339,50 @@
             if inFile == '-':
                 print "Enter your message now.  Type %s when you are done."%(
                         EOF_STR)
-                payload = sys.stdin.read()
+                message = sys.stdin.read()
             else:
-                payload = readFile(inFile)
+                message = readFile(inFile)
         except KeyboardInterrupt:
             print "Interrupted.  Message not sent."
             sys.exit(1)
 
-    payload = "%s%s" % (headerStr, payload)
+    message = "%s%s" % (headerStr, message)
 
     if parser.usingSURBList:
         assert isinstance(path2, ListType)
-        client.sendReplyMessage(payload, path1, path2,
+        client.sendReplyMessage(message, path1, path2,
                                 forceQueue, forceNoQueue)
     else:
-        client.sendForwardMessage(address, payload, path1, path2,
+        # If our message is too large for the exit node to reconstruct,
+        # either choose a new exit node (for SMTP) or bail (for MBOX or
+        # other).
+        #
+        #XXXX006 This logic is wrong; when we refactor paths again, it'll
+        #XXXX006 have to be fixed.  
+        if message:
+            msgLen = len(message)
+            if address.exitType == SMTP_TYPE:
+                maxlen = path2[-1]["Delivery/SMTP"]["Maximum-Size"] * 1024
+                if msgLen > maxLen:
+                    LOG.warn("Message is too long for server %s--looking for another..."
+                             % path2[-1].getNickname())
+                    LOG.warn("(This behavior is a hack, and will go away in 0.0.6.)")
+                    server = parser.directory.findByExitTypeAndSize(SMTP_TYPE, msgLen, 1)
+                    if not server:
+                        raise UIError("No such server found")
+                    LOG.warn("Replacing %s with %s",
+                             path2[-1].getNickname(),
+                             server.getNickname())
+                    path2[-1] = server
+            elif address.exitType == MBOX_TYPE:
+                maxlen = path2[-1]["Delivery/MBOX"]["Maximum-Size"] * 1024
+                if msgLen > maxLen:
+                    raise UIError("Message is too long for MBOX server %s, and client-side reconstruction is not yet supported"
+                                  % path2[-1].getNickname())
+            elif msgLen > 32*1024:
+                LOG.warn("Delivering long message via unrecognized delivery type")
+        
+        client.sendForwardMessage(address, message, path1, path2,
                                   forceQueue, forceNoQueue)
 
 _PING_USAGE = """\

Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.104
retrieving revision 1.105
diff -u -d -r1.104 -r1.105
--- Common.py	9 Aug 2003 02:53:31 -0000	1.104
+++ Common.py	21 Aug 2003 21:34:02 -0000	1.105
@@ -292,6 +292,8 @@
 
         if base64:
             try:
+                if stringContains(s[idx:endIdx], "\n[...]\n"):
+                    raise ValueError("Value seems to be truncated by a Mixminion-Mixmaster gateway")
                 value = binascii.a2b_base64(s[idx:endIdx])
             except (TypeError, binascii.Incomplete, binascii.Error), e:
                 raise ValueError(str(e))
@@ -379,7 +381,7 @@
     mode = st[stat.ST_MODE] & 0777
     if _CHECK_MODE and mode not in (0700, 0600):
         if not fix:
-            raise MixFilePermissionError("Bad mode %o on file %s" 
+            raise MixFilePermissionError("Bad permissions (mode %o) on file %s"
                                          % (mode & 0777, fn))
         newmode = {0:0600,0100:0700}[(mode & 0100)]
         LOG.warn("Repairing permissions on file %s" % fn)

Index: Config.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Config.py,v
retrieving revision 1.53
retrieving revision 1.54
diff -u -d -r1.53 -r1.54
--- Config.py	30 Jul 2003 22:38:03 -0000	1.53
+++ Config.py	21 Aug 2003 21:34:02 -0000	1.54
@@ -106,7 +106,7 @@
 
 # re to match strings of the form '9 seconds', '1 month', etc.
 _interval_re = re.compile(r'''^(\d+\.?\d*|\.\d+)\s+
-                     (sec|second|min|minute|hour|day|week|mon|month|year)s?$''',
+                   (sec|second|min|minute|hour|day|week|mon|month|year)s?$''',
                           re.X)
 _seconds_per_unit = {
     'second': 1,
@@ -173,6 +173,25 @@
     except ValueError:
         raise ConfigError("Expected an integer but got %r" % (integer))
 
+# regular expression to match a size.
+_size_re = re.compile(r'^(\d+\.?\d*|\.\d+)\s*(k|kb|m|mb|b|byte|octet|)s?')
+_size_name_map = { '' : 1L, 'b' : 1L, 'byte' : 1L, 'octet' : 1L,
+                   'k' : 1L<<10, 'kb' : 1L<<10,
+                   'm' : 1L<<20, 'mb' : 1L<<20,
+                   'g' : 1L<<30, 'gb' : 1L<<30 }
+def _parseSize(size):
+    """Validation function.  Converts a config value to a size in octets.
+       Raises ConfigError on failure."""
+    s = size.strip().lower()
+    m = _size_re.match(s)
+    if not m: raise ConfigError("Invalid size %r"%size)
+    val = m.group(1)
+    unit = _size_name_map[m.group(2)]
+    if '.' in val:
+        return long(float(val)*unit)
+    else:
+        return long(val)*unit
+
 # Regular expression to match a dotted quad.
 _ip_re = re.compile(r'^\d+\.\d+\.\d+\.\d+$')
 
@@ -773,7 +792,21 @@
     def __init__(self, fname=None, string=None):
         _ConfigFile.__init__(self, fname, string)
 
-    #XXXX005 Make  prevalidate check to make sure there's no 'Server' section.
+
+
+    def prevalidate(self, contents):
+        # See if we've been passed a server configuration.
+        foundServer = 0
+        foundUser = 0
+        for s, _ in contents:
+            if s == 'Server':
+                foundServer = 1
+            elif s == 'User':
+                foundUser = 1
+        if foundServer and not foundUser:
+            raise ConfigError("Got a server configuration (mixminiond.conf), but expected a client configuration (.mixminionrc)")
+
+        return contents
 
     def validate(self, lines, contents):
         _validateHostSection(self['Host'])

Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.52
retrieving revision 1.53
diff -u -d -r1.52 -r1.53
--- Crypto.py	14 Aug 2003 19:37:24 -0000	1.52
+++ Crypto.py	21 Aug 2003 21:34:02 -0000	1.53
@@ -602,7 +602,7 @@
         #return o / float(0x7fffffff)
         return o / 2147483647.0
 
-    def openNewFile(self, dir, prefix="", binary=1):
+    def openNewFile(self, dir, prefix="", binary=1, conflictPrefix=None):
         """Generate a new random filename within a directory with a given
            prefix within a directory, and open a new file within the directory
            with that filename.  Return 2-tuple of a file object and the
@@ -610,6 +610,8 @@
 
            Random portions are generated by choosing 8 random characters
            from the set 'A-Za-z0-9+-'.
+
+           DOCDOC conflictPrefix
            """
         flags = os.O_WRONLY|os.O_CREAT|os.O_EXCL
         mode = "w"
@@ -622,6 +624,9 @@
             if FS_IS_CASEI:
                 base = base.lower()
             fname = os.path.join(dir, "%s%s"%(prefix,base))
+            if conflictPrefix and os.path.exists(os.path.join(
+                dir,conflictPrefix+base)):
+                continue
             try:
                 fd = os.open(fname, flags, 0600)
                 return os.fdopen(fd, mode), base
@@ -735,7 +740,8 @@
             _ml.openssl_seed(_ml.win32_get_random_bytes(32))
             _theTrueRNG = _XorRNG(_OpensslRNG(), _WinTrueRNG())
         else:
-            LOG.fatal("No entropy source available")
+            LOG.fatal("No entropy source available: Tried all of %s",
+                      files)
             raise MixFatalError("No entropy source available")
     elif randFile is None:
         LOG.warn("Falling back to previous entropy source %s",

Index: Filestore.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Filestore.py,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -d -r1.6 -r1.7
--- Filestore.py	17 Aug 2003 21:09:56 -0000	1.6
+++ Filestore.py	21 Aug 2003 21:34:02 -0000	1.7
@@ -195,8 +195,10 @@
         """Returns (file, handle) tuple to create a new message.  Once
            you're done writing, you must call finishMessage to
            commit your changes, or abortMessage to reject them."""
-        file, handle = getCommonPRNG().openNewFile(self.dir, "inp_", 1)
-        return file, handle
+        while 1:
+            file, handle = getCommonPRNG().openNewFile(self.dir, "inp_", 1,
+                                                       "msg_")
+            return file, handle
 
     def finishMessage(self, f, handle, _ismeta=0):
         """Given a file and a corresponding handle, closes the file
@@ -307,11 +309,18 @@
     def __init__(self): pass
     def getObject(self, handle):
         """Given a message handle, read and unpickle the contents of the
-           corresponding message."""
+           corresponding message.  In rare error cases, defaults to 'None'.
+           """
         try:
             self._lock.acquire()
             f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
-            res = cPickle.load(f)
+            try:
+                res = cPickle.load(f)
+            except cpickle.UnpicklingError, e:
+                LOG.error("Found damaged object %s in filestore %s: %s",
+                          handle, self.dir, str(e))
+                self.removeMessage(handle)
+                res = None
             f.close()
             return res
         finally:
@@ -394,7 +403,13 @@
             except KeyError:
                 pass
             f = open(fname, 'rb')
-            res = cPickle.load(f)
+            try:
+                res = cPickle.load(f)
+            except cpickle.UnpicklingError, e:
+                LOG.error("Found damaged metadata for %s in filestore %s: %s",
+                          handle, self.dir, str(e))
+                self.removeMessage(handle)
+                return None
             f.close()
             self._metadata_cache[handle] = res
             return res
@@ -720,7 +735,20 @@
             self._lock.release()
 
     def delItem(self, k):
-        raise NotImplemented
+        deletedOne = 0
+        try:
+            del self.journal[k]
+            self.sync()
+            deletedOne = 1
+        except KeyError:
+            pass
+        try:
+            del self.log[k]
+            deletedOne = 1
+        except KeyError:
+            pass
+        if not deletedOne:
+            raise KeyError
 
     def sync(self):
         self._lock.acquire()

Index: Fragments.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Fragments.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- Fragments.py	18 Aug 2003 05:11:55 -0000	1.4
+++ Fragments.py	21 Aug 2003 21:34:02 -0000	1.5
@@ -171,7 +171,7 @@
                                  isChunk=0,
                                  chunkNum=None,
                                  overhead=fragmentPacket.getOverhead(),
-                                 insertedDate=previousMidnight(now),
+                                 insertedDate=today,
                                  nym=nym)
         # ... and allocate or find the MessageState for this message.
         state = self._getState(meta)
@@ -249,7 +249,7 @@
         # Delete all internal state; reload FragmentMetadatas from disk.
         self.store.loadAllMetadata(lambda: None)
         meta = self.store._metadata_cache
-        self.states = states = {}
+        self.states = {}
         badMessageIDs = {} # map from bad messageID to 1
         unneededHandles = [] # list of handles that aren't needed.
         for h, fm in meta.items():
@@ -422,7 +422,7 @@
         # chunkno -> idxwithinchunk -> (handle,fragmentmeta)
         self.fragmentsByChunk = []
         self.params = FragmentationParams(length, overhead)
-        for i in xrange(self.params.nChunks):
+        for _ in xrange(self.params.nChunks):
             self.fragmentsByChunk.append({})
         # chunkset: ready chunk num -> 1
         self.readyChunks = {}

Index: Main.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Main.py,v
retrieving revision 1.55
retrieving revision 1.56
diff -u -d -r1.55 -r1.56
--- Main.py	24 Jul 2003 18:01:29 -0000	1.55
+++ Main.py	21 Aug 2003 21:34:02 -0000	1.56
@@ -288,7 +288,7 @@
         print str(e)
         print "(You can disable file permission checking by setting",
         print "the MM_NO_FILE_PARANOIA"
-        print "environment variable."
+        print "environment variable.)"
         sys.exit(1)
     except KeyboardInterrupt:
         print "Interrupted."

Index: Packet.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Packet.py,v
retrieving revision 1.55
retrieving revision 1.56
diff -u -d -r1.55 -r1.56
--- Packet.py	18 Aug 2003 05:11:55 -0000	1.55
+++ Packet.py	21 Aug 2003 21:34:02 -0000	1.56
@@ -15,13 +15,13 @@
             'FRAGMENT_PAYLOAD_OVERHEAD', 'FWD_TYPE', 'FragmentPayload',
             'HEADER_LEN', 'IPV4Info', 'MAJOR_NO', 'MBOXInfo',
             'MBOX_TYPE', 'MINOR_NO', 'MIN_EXIT_TYPE',
-            'MIN_SUBHEADER_LEN', 'Message',
+            'MIN_SUBHEADER_LEN', 'Packet',
             'OAEP_OVERHEAD', 'PAYLOAD_LEN', 'ParseError', 'ReplyBlock',
             'ReplyBlock', 'SECRET_LEN', 'SINGLETON_PAYLOAD_OVERHEAD',
             'SMTPInfo', 'SMTP_TYPE', 'SWAP_FWD_TYPE', 'SingletonPayload',
             'Subheader', 'TAG_LEN', 'TextEncodedMessage',
             'parseHeader', 'parseIPV4Info',
-            'parseMBOXInfo', 'parseMessage', 'parseMessageAndHeaders',
+            'parseMBOXInfo', 'parsePacket', 'parseMessageAndHeaders',
             'parsePayload', 'parseReplyBlock',
             'parseReplyBlocks', 'parseSMTPInfo', 'parseSubheader',
             'parseTextEncodedMessages', 'parseTextReplyBlocks', 
@@ -86,6 +86,9 @@
 FRAGMENT_TYPE  = 0x0103  # Find the actual deliver info in the message payload 
 MAX_EXIT_TYPE  = 0xFFFF
 
+#DOCDOC
+_TYPES_WITHOUT_TAGS = { FRAGMENT_TYPE : 1 }
+
 class ParseError(MixError):
     """Thrown when a message or portion thereof is incorrectly formatted."""
     pass
@@ -93,17 +96,17 @@
 #----------------------------------------------------------------------
 # PACKET-LEVEL STRUCTURES
 
-def parseMessage(s):
-    """Given a 32K string, returns a Message object that breaks it into
+def parsePacket(s):
+    """Given a 32K string, returns a Packet object that breaks it into
        two headers and a payload."""
     if len(s) != MESSAGE_LEN:
         raise ParseError("Bad message length")
 
-    return Message(s[:HEADER_LEN],
+    return Packet(s[:HEADER_LEN],
                    s[HEADER_LEN:HEADER_LEN*2],
                    s[HEADER_LEN*2:])
 
-class Message:
+class Packet:
     """Represents a complete Mixminion packet
 
        Fields: header1, header2, payload"""
@@ -145,7 +148,7 @@
     underflow = ""
     if rlen < len(ri):
         ri, underflow = ri[:rlen], ri[rlen:]
-    if rt >= MIN_EXIT_TYPE and rlen < 20:
+    if rt >= MIN_EXIT_TYPE and not _TYPES_WITHOUT_TAGS.get(rt) and rlen < 20:
         raise ParseError("Subheader missing tag")
     return Subheader(major,minor,secret,digest,rt,ri,rlen,underflow)
 
@@ -189,15 +192,21 @@
         """Return the part of the routingInfo that contains the delivery
            address.  (Requires that routingType is an exit type.)"""
         assert self.routingtype >= MIN_EXIT_TYPE
-        assert len(self.routinginfo) >= TAG_LEN
-        return self.routinginfo[TAG_LEN:]
+        if _TYPES_WITHOUT_TAGS.get(self.routingtype):
+            return self.routinginfo
+        else:
+            assert len(self.routinginfo) >= TAG_LEN            
+            return self.routinginfo[TAG_LEN:]
 
     def getTag(self):
         """Return the part of the routingInfo that contains the decoding
            tag. (Requires that routingType is an exit type.)"""
         assert self.routingtype >= MIN_EXIT_TYPE
-        assert len(self.routinginfo) >= TAG_LEN
-        return self.routinginfo[:TAG_LEN]
+        if _TYPES_WITHOUT_TAGS.get(self.routingtype):
+            return ""
+        else:
+            assert len(self.routinginfo) >= TAG_LEN
+            return self.routinginfo[:TAG_LEN]
 
     def setRoutingInfo(self, info):
         """Change the routinginfo, and the routinglength to correspond."""
@@ -314,6 +323,17 @@
         """Returns the non-padding portion of this payload's data"""
         return self.data[:self.size]
 
+    def getUncompressedContents(self, force=None):
+        """Return the original message from this payload's data, removing
+           compression.  Raise CompressedDataTooLong if the data is too
+           long, and force is not true."""
+        d = self.data[:self.size]
+        if force:
+            return uncompressData(d)
+        else:
+            maxLen = max(20*1024, 20*len(d))
+            return uncompressData(d, maxLen)
+
     def pack(self):
         """Check for reasonable values of fields, and return a packed payload.
         """
@@ -365,8 +385,32 @@
     def getOverhead(self):
         return PAYLOAD_LEN - FRAGMENT_PAYLOAD_OVERHEAD - len(self.data)
 
+#----------------------------------------------------------------------
+#DOCDOC
+
+SSF_UNPACK_PATTERN = "!HH"
+SSF_PREFIX_LEN = 4
+
+def parseServerSideFragmentedMessage(s):
+    if len(s) < SSF_PREFIX_LEN:
+        raise ParseError("Server-side fragmented message too short")
     
+    rt, rl = struct.unpack(SSF_UNPACK_PATTERN, s[:SSF_PREFIX_LEN])
+    if len(s) < SSF_PREFIX_LEN + rl:
+        raise ParseError("Server-side fragmented message too short")
+    ri = s[SSF_PREFIX_LEN:SSF_PREFIX_LEN+rl]
+    comp = s[SSF_PREFIX_LEN+rl:]
+    return ServerSideFragmentedMessage(rt, ri, comp)
 
+class ServerSideFragmentedMessage:
+    """DOCDOC"""
+    def __init__(self, routingtype, routinginfo, compressedContents):
+        self.routingtype = routingtype
+        self.routinginfo = routinginfo
+        self.compressedContents = compressedContents
+    def pack(self):
+        return struct.pack(SSF_UNPACK_PATTERN, self.routingtype,
+                           len(self.routinginfo)) + self.compressedContents
 
 #----------------------------------------------------------------------
 # REPLY BLOCKS
@@ -592,7 +636,6 @@
 MESSAGE_ARMOR_NAME = "TYPE III ANONYMOUS MESSAGE"
  
 def parseTextEncodedMessages(msg,force=0):
-
     """Given a text-encoded Type III packet, return a list of
        TextEncodedMessage objects or raise ParseError.
        
@@ -620,6 +663,8 @@
             msgType = "BIN"
         elif d['Message-type'] == 'encrypted':
             msgType = "ENC"
+        elif d['Message-type'] == 'fragment':
+            msgType = "FRAG"
         else:
             raise ParseError("Unknown message type: %r"%d["Message-type"])
 
@@ -630,7 +675,7 @@
         if msgType == 'LONG' and force:
             msg = uncompressData(msg)
             
-        if msgType in ('TXT','BIN','LONG'):
+        if msgType in ('TXT','BIN','LONG','FRAG'):
             res.append(TextEncodedMessage(val, msgType))
         else:
             assert msgType == 'ENC'
@@ -650,8 +695,10 @@
     def __init__(self, contents, messageType, tag=None):
         """Create a new TextEncodedMessage given a set of contents, a
            messageType ('TXT', 'ENC', 'LONG', or 'BIN'), and optionally
-           a tag."""
-        assert messageType in ('TXT', 'ENC', 'LONG', 'BIN')
+           a tag.
+           DOCDOC FRAG
+           """
+        assert messageType in ('TXT', 'ENC', 'LONG', 'BIN', 'FRAG')
         assert tag is None or (messageType == 'ENC' and len(tag) == 20)
         self.contents = contents
         self.messageType = messageType
@@ -668,6 +715,9 @@
     def isOvercompressed(self):
         """Return true iff this is an overcompressed plaintext packet."""
         return self.messageType == 'LONG'
+    def isFragment(self):
+        """DOCDOC"""
+        return self.messageType == 'FRAG'
     def getContents(self):
         """Return the (unencoded) contents of this packet."""
         return self.contents
@@ -681,7 +731,8 @@
                    { 'TXT' : "plaintext",
                      'LONG' : "overcompressed",
                      'BIN' : "binary",
-                     'ENC' : "encrypted" }[self.messageType]),
+                     'ENC' : "encrypted",
+                     'FRAG' : 'fragment' }[self.messageType]),
                   ]
         if self.messageType == 'ENC':
             fields.append(("Decoding-handle",
@@ -693,14 +744,20 @@
 #----------------------------------------------------------------------
 # Header encoding
 
+#DOCDOC
+MAX_HEADER_LEN = 900
+
 def encodeMailHeaders(subject=None, fromAddr=None, inReplyTo=None,
                       references=None):
-    """DOCDOC"""
-    #XXXX005 check values
+    """DOCDOC.  Raise MixError on failure."""
     headers = {}
     if subject:
         headers['SUBJECT'] = subject
     if fromAddr:
+        for badchar in '"[]:':
+            if badchar in fromAddr:
+                raise MixError("Forbidden character %r in from address"%
+                               badchar)
         headers['FROM'] = fromAddr
     if inReplyTo:
         headers['IN-REPLY-TO'] = inReplyTo
@@ -709,7 +766,7 @@
     return encodeMessageHeaders(headers)
 
 def encodeMessageHeaders(headers):
-    """DOCDOC dict
+    """DOCDOC dict, max size
 
        Requires that headers are in acceptable format.
     """
@@ -720,6 +777,8 @@
         item = "%s:%s\n"%(k,v)
         if not HEADER_RE.match(item) or "\n" in k or "\n" in v:
             raise ParseError("Invalid value for %s header"%k)
+        if len(v) > 900:
+            raise ParseError("The %s header is too long"%k.lower())
         items.append(item)
     items.append("\n")
     return "".join(items)
@@ -736,7 +795,7 @@
         m = HEADER_RE.match(msg)
         if m:
             k,v = m.groups()
-            if len(v) > 900:
+            if len(v) > MAX_HEADER_LEN:
                 LOG.warn("Rejecting overlong exit header %r:%r...",k,v[:30])
             else:
                 headers[k] = v

Index: ServerInfo.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ServerInfo.py,v
retrieving revision 1.52
retrieving revision 1.53
diff -u -d -r1.52 -r1.53
--- ServerInfo.py	30 Jul 2003 22:38:03 -0000	1.52
+++ ServerInfo.py	21 Aug 2003 21:34:02 -0000	1.53
@@ -85,9 +85,17 @@
                      },
         "Delivery/MBOX" : {
                      "Version": ("REQUIRE", None, None),
+                     # XXXX006 change to 'REQUIRE'
+                     "Maximum-Size": ("ALLOW", C._parseInt, "32"),
                      },
         "Delivery/SMTP" : {
                      "Version": ("REQUIRE", None, None),
+                     # XXXX006 change to 'REQUIRE'
+                     "Maximum-Size": ("ALLOW", C._parseInt, "32"),
+                     },
+        "Delivery/Fragmented" : {
+                     "Version": ("REQUIRE", None, None),
+                     "Maximum-Fragments": ("REQUIRE", C._parseInt, None),
                      },
         # We never read these values, except to see whether we should
         # regenerate them.  Depending on these options would violate
@@ -244,6 +252,8 @@
         # XXXX This next check is highly bogus.
         if self['Outgoing/MMTP'].get('Version'):
             caps.append('relay')
+        if self['Delivery/Fragmented'].get('Version'):
+            caps.append('frag')
         return caps
 
     def isValidated(self):

Index: __init__.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/__init__.py,v
retrieving revision 1.43
retrieving revision 1.44
diff -u -d -r1.43 -r1.44
--- __init__.py	13 Jul 2003 03:45:34 -0000	1.43
+++ __init__.py	21 Aug 2003 21:34:02 -0000	1.44
@@ -7,7 +7,7 @@
    """
 
 # This version string is generated from setup.py; don't edit it.
-__version__ = "0.0.5alpha1"
+__version__ = "0.0.5alpha2"
 # This 5-tuple encodes the version number for comparison.  Don't edit it.
 # The first 3 numbers are the version number; the 4th is:
 #          0 for alpha
@@ -18,7 +18,7 @@
 # The 4th or 5th number may be a string.  If so, it is not meant to
 #   succeed or precede any other sub-version with the same a.b.c version
 #   number.
-version_info = (0, 0, 5, 0, 1)
+version_info = (0, 0, 5, 0, 2)
 __all__ = [ 'server', 'directory' ]
 
 def version_tuple_to_string(t):

Index: benchmark.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/benchmark.py,v
retrieving revision 1.46
retrieving revision 1.47
diff -u -d -r1.46 -r1.47
--- benchmark.py	14 Aug 2003 19:37:24 -0000	1.46
+++ benchmark.py	21 Aug 2003 21:34:03 -0000	1.47
@@ -24,7 +24,7 @@
 import mixminion.server.ServerQueue
 
 from mixminion.BuildMessage import _buildHeader, buildForwardMessage, \
-     compressData, uncompressData, encodePayloads, decodePayload
+     compressData, uncompressData, encodeMessage, decodePayload
 from mixminion.Common import secureDelete, installSIGCHLDHandler, \
      waitForChildren, formatBase64, Lockfile
 from mixminion.Crypto import *
@@ -569,23 +569,27 @@
     print "#=============== END-TO-END ENCODING =================="
     shortP = "hello world"
     prng = AESCounterPRNG()
-    p = encodePayloads(shortP, 0, prng)[0]
+    p = encodeMessage(shortP, 0, prng)[0]
     t = prng.getBytes(20)
-    print "Decode short payload", timeit(
+    print "Check short payload", timeit(
+        lambda p=p: mixminion.BuildMessage._checkPayload(p), 1000)
+    print "Decode without uncompress", timeit(
         lambda p=p,t=t: decodePayload(p, t), 1000)
+    print "Decode short payload", timeit(
+        lambda p=p,t=t: decodePayload(p, t).getUncompressedContents(), 1000)
 
     k20 = prng.getBytes(20*1024)
-    p = encodePayloads(k20, 0, prng)[0]
+    p = encodeMessage(k20, 0, prng)[0]
     t = prng.getBytes(20)
     print "Decode 20K payload", timeit(
-        lambda p=p,t=t: decodePayload(p, t), 1000)
+        lambda p=p,t=t: decodePayload(p, t).getUncompressedContents(), 1000)
 
     comp = "x"*(20*1024)
-    p = encodePayloads(comp, 0, prng)[0]
+    p = encodeMessage(comp, 0, prng)[0]
     t = prng.getBytes(20)
     def decode(p=p,t=t):
         try:
-            decodePayload(p,t)
+            decodePayload(p,t).getUncompressedContents()
         except CompressedDataTooLong:
             pass
     print "Decode overcompressed payload", timeit(decode, 1000)
@@ -1001,7 +1005,7 @@
 #----------------------------------------------------------------------
 def timeAll(name, args):
     if 0:
-        testLeaks_FEC()
+        timeEfficiency()
         return
     fecTiming()    
     cryptoTiming()

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.146
retrieving revision 1.147
diff -u -d -r1.146 -r1.147
--- test.py	18 Aug 2003 05:11:55 -0000	1.146
+++ test.py	21 Aug 2003 21:34:03 -0000	1.147
@@ -620,6 +620,13 @@
                     if not mode:
                         self.assertEndsWith(b, "\n")
 
+        # Test unarmor and [...]
+        enc = armorText(inp2*50, "MUNGED", [], base64=1)
+        enc = enc.split("\n")
+        enc[-10:-5] = [ "[...]" ]
+        enc = "\n".join(enc)
+        self.assertRaises(ValueError, unarmorText, enc, ["MUNGED"], 1)
+
         # Test base64fn and concatenation.
         enc1 = armorText(inp2, "THIS THAT", [("H-64", "0")], 0)
         enc2 = armorText(inp2, "THIS THAT", [("H-64", "1")], 1)
@@ -1262,13 +1269,13 @@
         # correctly.
         # (Generate a nice random string to make sure we're slicing right.)
         m = Crypto.prng("HappyFunAESKey!!", 32768)
-        msg = parseMessage(m)
+        msg = parsePacket(m)
         self.assert_(msg.pack() == m)
         self.assert_(msg.header1 == m[:2048])
         self.assert_(msg.header2 == m[2048:4096])
         self.assert_(msg.payload == m[4096:])
-        self.failUnlessRaises(ParseError, parseMessage, m[:-1])
-        self.failUnlessRaises(ParseError, parseMessage, m+"x")
+        self.failUnlessRaises(ParseError, parsePacket, m[:-1])
+        self.failUnlessRaises(ParseError, parsePacket, m+"x")
 
     def test_ipv4info(self):
         # Check the IPV4Info structure used to hold the addresses for the
@@ -1454,27 +1461,33 @@
 
     def testHeaders(self):
         emh = encodeMessageHeaders
+        eMh = encodeMailHeaders
         pmh = parseMessageAndHeaders
         eq = self.assertEquals
 
+        # Nil headers
         eq(emh({}), "\n")
 
+        # Parse and unparse headers
         encoded = emh({"ABC": "x y zzy", "X": "<42>"}) + "Hello whirled"
         eq(encoded, "ABC:x y zzy\nX:<42>\n\nHello whirled")
         m,h = pmh(encoded)
         eq(m, "Hello whirled")
         eq(h, {"ABC": "x y zzy", "X": "<42>"})
 
+        # Handle leading newlines properly
         encoded = emh({}) + "A short message"
         eq(encoded, "\nA short message")
         eq(("A short message", {}), pmh(encoded))
         eq(("\nA short message", {}), pmh("\n"+encoded))
 
+        # Parse headers with problems.
         try:
             suspendLog()
             m,h = pmh("A message that doesn't start with a header")
             eq(m, "A message that doesn't start with a header")
             eq(h, {})
+            # Over-long header gets skipped.
             encoded = "X:"+("Y"*1000)+"\nY:X\n\nZ"
             m,h = pmh(encoded)
             eq(h, {"Y": "X"})
@@ -1482,6 +1495,14 @@
         finally:
             resumeLog()
 
+        eq("\n", eMh())
+        eq("SUBJECT:foo\n\n", eMh(subject='foo'))
+        eq("FROM:fred@bluto\n\n", eMh(fromAddr='fred@bluto'))
+        eq("FROM:a\nIN-REPLY-TO:b\nREFERENCES:c\nSUBJECT:d\n\n",
+           eMh(fromAddr='a',inReplyTo='b',references='c',subject='d'))
+        self.assertRaises(MixError, eMh, fromAddr='fred"')
+        self.assertRaises(MixError, eMh, subject=("X"*910))
+
 #----------------------------------------------------------------------
 class HashLogTests(TestCase):
     def test_hashlog(self):
@@ -1638,6 +1659,7 @@
 
     def test_payload_helpers(self):
         "test helpers for payload encoding"
+        
         p = AESCounterPRNG()
         for _ in xrange(10):
             t = BuildMessage._getRandomTag(p)
@@ -1649,7 +1671,7 @@
 
         for m in (p.getBytes(3000), p.getBytes(10000), "", "q", "blznerty"):
             for ov in 0, 42-20+16: # encrypted forward overhead
-                plds = BuildMessage.encodeMessage(m,ov,p)
+                plds = BuildMessage.encodeMessage(m,ov,"",p)
                 assert len(plds) == 1
                 pld = plds[0]
                 self.assertEquals(28*1024, len(pld)+ov)
@@ -1659,18 +1681,15 @@
                 self.assert_(BuildMessage._checkPayload(pld))
                 self.assertEquals(len(comp), ord(pld[0])*256+ord(pld[1]))
                 self.assertEquals(0, ord(pld[0])&0x80)
-                self.assertEquals(m, BuildMessage._decodePayloadImpl(pld))
-
-        self.failUnlessRaises(MixError, BuildMessage._decodePayloadImpl, b)
+                py = mixminion.Packet.parsePayload(pld)
+                self.assertEquals(m, py.getUncompressedContents())
 
-        # Check fragments (not yet supported)
-        pldFrag = chr(ord(pld[0])|0x80)+pld[1:]
-        self.failUnlessRaises(MixError,BuildMessage._decodePayloadImpl,pldFrag)
+        self.failUnlessRaises(MixError, BuildMessage._decodeForwardPayload, b)
 
         # Check impossibly long messages
         pldSize = "\x7f\xff"+pld[2:] #sha1(pld[22:])+pld[22:]
         self.failUnlessRaises(ParseError,
-                              BuildMessage._decodePayloadImpl,pldSize)
+                              BuildMessage._decodeForwardPayload,pldSize)
 
     def test_buildheader_1hop(self):
         bhead = BuildMessage._buildHeader
@@ -1848,7 +1867,7 @@
         message = consMsg(secrets1, secrets2, h1, h2, pld)
 
         self.assertEquals(len(message), mixminion.Packet.MESSAGE_LEN)
-        msg = mixminion.Packet.parseMessage(message)
+        msg = mixminion.Packet.parsePacket(message)
         head1, head2, payload = msg.header1, msg.header2, msg.payload
         self.assert_(h1 == head1)
 
@@ -1875,7 +1894,7 @@
         ### Reply case
         message = consMsg(secrets1, None, h1, h2, pld)
         self.assertEquals(len(message), mixminion.Packet.MESSAGE_LEN)
-        msg = mixminion.Packet.parseMessage(message)
+        msg = mixminion.Packet.parsePacket(message)
         head1, head2, payload = msg.header1, msg.header2, msg.payload
         self.assert_(h1 == head1)
 
@@ -1929,13 +1948,14 @@
             p = lioness_decrypt(p,ks.getLionessKeys(PAYLOAD_ENCRYPT_MODE))
 
         if decoder is None:
-            p = BuildMessage._decodeForwardPayload(p)
+            p = BuildMessage._decodeForwardPayload(p).getUncompressedContents()
         else:
             p = decoder(p, tag)
 
         self.assertEquals(payload, p[:len(payload)])
 
     def test_build_fwd_message(self):
+        
         bfm = BuildMessage.buildForwardMessage
         befm = BuildMessage.buildEncryptedForwardMessage
         payload = "Hello!!!!"
@@ -1954,14 +1974,13 @@
                                (self.server2.getRoutingInfo().pack(),
                                 "Goodbye") ),
                              "Hello!!!!")
-
         m = bfm(payload, 500, "Goodbye", [self.server1], [self.server3])
-
+        
         messages = {}
 
         def decoder0(p,t,messages=messages):
             messages['fwd'] = (p,t)
-            return BuildMessage._decodeForwardPayload(p)
+            return BuildMessage._decodeForwardPayload(p).getUncompressedContents()
 
         self.do_message_test(m,
                              ( (self.pk1,), None,
@@ -1972,7 +1991,7 @@
                                ("Goodbye",) ),
                              "Hello!!!!",
                              decoder=decoder0)
-
+        
         # Drop message gets no tag, random payload
         m = bfm(payload, DROP_TYPE, "", [self.server1], [self.server3])
 
@@ -2002,8 +2021,8 @@
                      rsakey)
             def decoder(p,t,key=rsakey,messages=messages):
                 messages['efwd'+str(key.get_modulus_bytes())] = (p,t)
-                return BuildMessage._decodeEncryptedForwardPayload(p,t,key)
-
+                payload = BuildMessage._decodeEncryptedForwardPayload(p,t,key)
+                return payload.getUncompressedContents()
             self.do_message_test(m,
                                  ( (self.pk1, self.pk2), None,
                                    (FWD_TYPE, SWAP_FWD_TYPE),
@@ -2015,7 +2034,7 @@
                                     "Phello") ),
                                  payload,
                                  decoder=decoder)
-
+            
         # Now do more tests on final messages: is the format as expected?
         p,t = messages['fwd']
         self.assertEquals(20, len(t))
@@ -2069,7 +2088,8 @@
         messages = {}
         def decoder(p,t,secrets=secrets_1,messages=messages):
             messages['repl'] = p,t
-            return BuildMessage._decodeReplyPayload(p,secrets)
+            payload = BuildMessage._decodeReplyPayload(p,secrets)
+            return payload.getUncompressedContents()
 
         self.do_message_test(m,
                              ((self.pk3, self.pk1), None,
@@ -2145,8 +2165,10 @@
 
         def decoder2(p,t,messages=messages):
             messages['srepl'] = p,t
-            return BuildMessage._decodeStatelessReplyPayload(p,t,
+            payload = BuildMessage._decodeStatelessReplyPayload(p,t,
                                                          "Tyrone Slothrop")
+            return payload.getUncompressedContents()
+        
         self.do_message_test(m,
                              ((self.pk3, self.pk1), None,
                               (FWD_TYPE,SWAP_FWD_TYPE),
@@ -2183,6 +2205,13 @@
         self.assertStartsWith(p[22:], comp)
         self.assertEquals(sha1(p[22:]), p[2:22])
 
+    def assertPayloadDecodesTo(self, decoded, encoded, tag, key, userKeys):
+        p = BuildMessage.decodePayload(encoded, tag, key, userKeys)
+        if p is None:
+            self.assertEquals(decoded, p)
+        else:
+            self.assertEquals(decoded, p.getUncompressedContents())
+
     def test_decoding(self):
         # Now we create a bunch of fake payloads and try to decode them.
 
@@ -2195,8 +2224,10 @@
         self.assertEquals(len(comp), 109)
         encoded1 = (comp+ "RWE/HGW"*4096)[:28*1024-22]
         encoded1 = '\x00\x6D'+sha1(encoded1)+encoded1
+
         # Forward message.
-        self.assertEquals(payload, BuildMessage._decodeForwardPayload(encoded1))
+        self.assertEquals(payload,
+                          BuildMessage._decodeForwardPayload(encoded1).getUncompressedContents())
         # Encoded forward message
         efwd = (comp+"RWE/HGW"*4096)[:28*1024-22-38]
         efwd = '\x00\x6D'+sha1(efwd)+efwd
@@ -2207,7 +2238,7 @@
         efwd_t = efwd_rsa[:20]
         efwd_p = efwd_rsa[20:]+efwd_lioness
         self.assertEquals(payload,
-             BuildMessage._decodeEncryptedForwardPayload(efwd_p,efwd_t,rsa1))
+             BuildMessage._decodeEncryptedForwardPayload(efwd_p,efwd_t,rsa1).getUncompressedContents())
 
 ##      # Stateful reply
 ##      secrets = [ "Is that you, Des","troyer?Rinehart?" ]
@@ -2232,7 +2263,7 @@
             key = Keyset(k).getLionessKeys(PAYLOAD_ENCRYPT_MODE)
             m = lioness_decrypt(m,key)
         self.assertEquals(payload,
-                     BuildMessage._decodeStatelessReplyPayload(m,tag,passwd))
+                     BuildMessage._decodeStatelessReplyPayload(m,tag,passwd).getUncompressedContents())
         repl2, repl2tag = m, tag
 
         # Okay, now let's try out 'decodePayload' (and thereby test its
@@ -2245,51 +2276,46 @@
             ##for d in (sdict, None): # stateful replies disabled.
                 for p in (passwd, None):
                     for tag in ("zzzz"*5, "pzzz"*5):
-                        self.assertEquals(payload,
-                                          decodePayload(encoded1, tag, pk, p))
+                        self.assertPayloadDecodesTo(payload,
+                                                    encoded1, tag, pk, p)
 
         # efwd
         ##for d in (sdict, None): # stateful replies disabled
         if 1:
             for p in (passwd, None):
-                self.assertEquals(payload,
-                        decodePayload(efwd_p, efwd_t, rsa1, p))
-                self.assertEquals(None,
-                        decodePayload(efwd_p, efwd_t, None, p))
-                self.assertEquals(None,
-                        decodePayload(efwd_p, efwd_t, self.pk2, p))
+                self.assertPayloadDecodesTo(payload, efwd_p, efwd_t, rsa1, p)
+                self.assertPayloadDecodesTo(None, efwd_p, efwd_t, None, p)
+                self.assertPayloadDecodesTo(None, efwd_p, efwd_t, self.pk2, p)
 
         # Stateful replies are disabled.
 
         # repl (stateless)
         for pk in (self.pk1, None):
             #for sd in (sdict, None): #Stateful replies are disabled
-                self.assertEquals(payload,
-                            decodePayload(repl2, repl2tag, pk, passwd))
+                self.assertPayloadDecodesTo(payload,
+                                            repl2, repl2tag, pk, passwd)
                 try:
                     suspendLog("INFO")
-                    self.assertEquals(payload,
-                                      decodePayload(repl2, repl2tag, pk,
-                                     userKeys={ "Fred": passwd, "": "z"*20 }))
+                    self.assertPayloadDecodesTo(payload, repl2, repl2tag, pk,
+                                     userKeys={ "Fred": passwd, "": "z"*20 })
                 finally:
                     s = resumeLog()
                 self.assert_(stringContains(s,
                                "Decoded reply message to identity 'Fred'"))
-                self.assertEquals(None,
-                            decodePayload(repl2, repl2tag, pk, "Bliznerty"))
-                self.assertEquals(None,
-                            decodePayload(repl2, repl2tag, pk,
-                                   userKeys={ "Fred":"Bliznerty", "":"z"*20}))
-                self.assertEquals(None,
-                            decodePayload(repl2, repl2tag, pk, None))
+                self.assertPayloadDecodesTo(None,
+                                            repl2, repl2tag, pk, "Bliznerty")
+                self.assertPayloadDecodesTo(None, repl2, repl2tag, pk,
+                                   userKeys={ "Fred":"Bliznerty", "":"z"*20})
+                self.assertPayloadDecodesTo(None, repl2, repl2tag, pk, None)
 
         # Try decoding a payload that looks like a zlib bomb.  An easy way to
         # get such a payload is to compress 25K of zeroes.
         nils = "\x00"*(25*1024)
         overcompressed_payload = \
-             BuildMessage.encodeMessage(nils, 0, AESCounterPRNG())[0]
+             BuildMessage.encodeMessage(nils, 0)[0]
+        p = BuildMessage.decodePayload(overcompressed_payload, "X"*20)
         self.failUnlessRaises(CompressedDataTooLong,
-             BuildMessage.decodePayload, overcompressed_payload, "X"*20)
+                              p.getUncompressedContents)
 
         # And now the cases that fail hard.  This can only happen on:
         #   1) *: Hash checks out, but zlib or size is wrong.  Already tested.
@@ -2306,13 +2332,11 @@
         for p in (passwd, None):
             self.failUnlessRaises(MixError, decodePayload,
                                   efwd_pbad, efwd_t, rsa1, p)
-            self.assertEquals(None,
-                      decodePayload(efwd_pbad, efwd_t, self.pk2, p))
+            self.assertPayloadDecodesTo(None, efwd_pbad, efwd_t, self.pk2, p)
 
         # Bad repl
         repl2_bad = repl2[:-1] + chr(ord(repl2[-1])^0xaa)
-        self.assertEquals(None,
-                  decodePayload(repl2_bad, repl2tag, None, passwd))
+        self.assertPayloadDecodesTo(None, repl2_bad, repl2tag, None, passwd)
 
 #----------------------------------------------------------------------
 # Having tested BuildMessage without using PacketHandler, we can now use
@@ -3922,6 +3946,14 @@
         self.assert_(int(h2) == int(m120) == 7200)
         m120.reduce()
         self.assertEquals(str(m120), "2 hours")
+        # size
+        self.assertEquals(C._parseSize(" 30 bytes"), 30L)
+        self.assertEquals(C._parseSize(" 3000 b"), 3000L)
+        self.assertEquals(C._parseSize("50k"), 50*1024L)
+        self.assertEquals(C._parseSize("50k "), 50*1024L)
+        self.assertEquals(C._parseSize("50 "), 50L)
+        self.assertEquals(C._parseSize("50"), 50L)
+        self.assertEquals(C._parseSize("12.3M"), long(12.3*(1<<20L)))
         # IntervalList
         self.assertEquals(C._parseIntervalList(" 5 sec, 1 min, 2 hours"),
                           [ 5, 60, 7200 ])
@@ -5008,7 +5040,9 @@
                            "Retry": [0,0,0,0],
                            "SubjectLine":'foobar',
                            "FromTag" : '[NotReally]',
-                           'MixCommand' : ('ls', ['-z'])}},
+                           'MixCommand' : ('ls', ['-z']),
+                           "MaximumSize" : 32*1024,
+                           }},
                          manager)
         queue = manager.queues['SMTP_MIX2']
         replaceFunction(os, "spawnl")
@@ -5210,7 +5244,9 @@
                            "ReturnAddress": "returnaddress@x",
                            "RemoveContact": "removeaddress@x",
                            "Retry": [0,0,0,3],
-                           "SMTPServer" : "foo.bar.baz"}}, manager)
+                           "SMTPServer" : "foo.bar.baz",
+                           "MaximumSize" : 32*1024,
+                           }}, manager)
         # Check that the address file was read correctly.
         self.assertEquals({'mix-minion': 'mixminion@thishost',
                            'mixdaddy':   'mixminion@thathost',
@@ -6230,8 +6266,8 @@
 
             for fn, args, kwargs in getCalls():
                 self.assertEquals(fn, "buildForwardMessage")
-                self.assertEquals(args[0:3],
-                                  (payload, SMTP_TYPE, "joe@cledonism.net"))
+                self.assertEquals(args[1:3],
+                                  (SMTP_TYPE, "joe@cledonism.net"))
                 self.assert_(len(args[3]) == len(args[4]) == 2)
                 self.assertEquals(["Lola", "Joe", "Alice", "Joe"],
                      [x['Server']['Nickname'] for x in args[3]+args[4]])
@@ -6399,17 +6435,11 @@
         em = mixminion.BuildMessage.encodeMessage
         pp = mixminion.Packet.parsePayload
         M1 = Crypto.getCommonPRNG().getBytes(1024*30)
-        M2 = Crypto.getCommonPRNG().getBytes(1024*150)
-        M3 = Crypto.getCommonPRNG().getBytes(1024*200)
-        M4 = Crypto.getCommonPRNG().getBytes(1024*900)
-        M5 = Crypto.getCommonPRNG().getBytes(1024*900)
+        M2 = Crypto.getCommonPRNG().getBytes(1024*900)
         pkts1 = [ pp(x) for x in em(M1,0) ]
-        pkts2 = [ pp(x) for x in em(M2,38) ]
-        pkts3 = [ pp(x) for x in em(M3,0) ]
-        pkts4 = [ pp(x) for x in em(M4,0) ]
-        pkts5 = [ pp(x) for x in em(M5,0) ]
-        self.assertEquals(map(len, [pkts1,pkts2,pkts3,pkts4,pkts5]),
-                          [3, 11, 11, 66, 66])
+        pkts2 = [ pp(x) for x in em(M2,0) ]
+        self.assertEquals(map(len, [pkts1,pkts2]),
+                          [3, 66])
         
         loc = mix_mktemp()
         pool = mixminion.Fragments.FragmentPool(loc)
@@ -6444,39 +6474,72 @@
         # Reconstruct: large message, stop half-way, reload, finish.
         ####
         # enough for chunk1 -- 17 messages
-        for p in pkts4[5:22]: pool.addFragment(p)
+        for p in pkts2[5:22]: pool.addFragment(p)
         # enough for half of chunk2: 8 messages
-        for p in pkts4[22:30]: pool.addFragment(p)
+        for p in pkts2[22:30]: pool.addFragment(p)
         pool.unchunkMessages()
         # close and re-open messages
         pool.close()
         pool = mixminion.Fragments.FragmentPool(loc)
         # Enough for the rest of message 4...  8 from 2, 17 from 3.
-        for p in pkts4[36:44]+pkts4[49:66]:
+        for p in pkts2[36:44]+pkts2[49:66]:
             pool.addFragment(p)
             pool.unchunkMessages()
         self.assertEquals(len(pool.listReadyMessages()), 1)
         mid = pool.listReadyMessages()[0]
-        self.assertLongStringEq(M4, uncompressData(pool.getReadyMessage(mid)))
+        self.assertLongStringEq(M2, uncompressData(pool.getReadyMessage(mid)))
         pool.markMessageCompleted(mid)
         pool.close()
         pool = mixminion.Fragments.FragmentPool(loc)
-        pool.addFragment(pkts4[48])
+        pool.addFragment(pkts2[48])
         self.assertEquals([], pool.store.getAllMessages())
 
         # free some RAM
-        del M4
-        del M1
-        del pkts4
-        del pkts1
+        M1 = M2 = None
+        pkts1 = pkts2 = None
 
         ####
         # Try an interleaved case with two smallish msgs and one big one.
         # Provoke an error in the big one part way through.
         ####
-        
+        M1 = Crypto.getCommonPRNG().getBytes(1024*150)
+        M2 = Crypto.getCommonPRNG().getBytes(1024*200) 
+        M3 = Crypto.getCommonPRNG().getBytes(1024*900)
+        pkts1 = [ pp(x) for x in em(M1,38) ]
+        pkts2 = [ pp(x) for x in em(M2,0) ]
+        pkts3 = [ pp(x) for x in em(M3,0) ]
+        self.assertEquals(map(len,[pkts1,pkts2,pkts3]),
+                          [11,11,66])
+        for i in xrange(7,11):
+            pool.addFragment(pkts1[i])
+            pool.addFragment(pkts2[11-i])
+        for i in xrange(26):
+            pool.addFragment(pkts3[i])
+        pool.unchunkMessages()
+        self.assertEquals(pool.store.count(), 4+4+1+4)
+        # Change index of message to impossible value, make sure m3 gets
+        # dropped.
+        pkts3[60].index = 66
+        pool.addFragment(pkts3[60])
+        self.assertEquals(pool.store.count(), 4+4)
+        for i in xrange(40,55):
+            pool.addFragment(pkts3[i])
+        self.assertEquals(pool.store.count(), 4+4)
 
-        
+        for i in xrange(1,5):
+            pool.addFragment(pkts1[i])
+            pool.addFragment(pkts2[11-i])
+        pool.unchunkMessages()
+        msgids = pool.listReadyMessages()
+        self.assertUnorderedEq([pkts1[0].msgID, pkts2[0].msgID],
+                               msgids)
+        self.assertLongStringEq(M1,
+                 uncompressData(pool.getReadyMessage(pkts1[0].msgID)))
+        pool.markMessageCompleted(pkts1[0].msgID)
+
+        # Force pkts2 to rejected by expiring it.
+        pool.expireMessages(time.time()+48*60*60)
+        self.assertEquals(pool.listReadyMessages(), [])
         
 #----------------------------------------------------------------------
 def testSuite():
@@ -6485,34 +6548,37 @@
     loader = unittest.TestLoader()
     tc = loader.loadTestsFromTestCase
 
-    if 1:
+    if 0:
         suite.addTest(tc(FragmentTests))
         return suite
+    testClasses = [MiscTests,
+                   MinionlibCryptoTests,
+                   MinionlibFECTests,
+                   CryptoTests,
+                   PacketTests,
+                   LogTests,
+                   FileParanoiaTests,
+                   ConfigFileTests,
+                   HashLogTests,
+                   BuildMessageTests,
+                   PacketHandlerTests,
+                   FilestoreTests,
+                   FragmentTests,
+                   QueueTests,
+                   EventStatsTests,
+                   ModuleTests,
 
-    suite.addTest(tc(MiscTests))
-    suite.addTest(tc(MinionlibCryptoTests))
-    suite.addTest(tc(MinionlibFECTests))
-    suite.addTest(tc(CryptoTests))
-    suite.addTest(tc(PacketTests))
-    suite.addTest(tc(LogTests))
-    suite.addTest(tc(FileParanoiaTests))
-    suite.addTest(tc(ConfigFileTests))
-    suite.addTest(tc(HashLogTests))
-    suite.addTest(tc(BuildMessageTests))
-    suite.addTest(tc(PacketHandlerTests))
-    suite.addTest(tc(FilestoreTests))
-    suite.addTest(tc(QueueTests))
-    suite.addTest(tc(EventStatsTests))
-    suite.addTest(tc(ModuleTests))
+                   ClientMainTests,
+                   ServerKeysTests,
+                   ServerMainTests,
 
-    suite.addTest(tc(ClientMainTests))
-    suite.addTest(tc(ServerKeysTests))
-    suite.addTest(tc(ServerMainTests))
+                   # These tests are slowest, so we do them last.
+                   ModuleManagerTests,
+                   ServerInfoTests,
+                   MMTPTests]
 
-    # These tests are slowest, so we do them last.
-    suite.addTest(tc(ModuleManagerTests))
-    suite.addTest(tc(ServerInfoTests))
-    suite.addTest(tc(MMTPTests))
+    for cl in testClasses:
+        suite.addTest(tc(cl))
 
     return suite