
[minion-cvs] More debugging and testing for fragment pools; also doc...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv31037/lib/mixminion

Modified Files:
	Fragments.py Packet.py test.py 
Log Message:
More debugging and testing for fragment pools; also document Fragments module

Index: Fragments.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Fragments.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- Fragments.py	18 Aug 2003 00:41:10 -0000	1.3
+++ Fragments.py	18 Aug 2003 05:11:55 -0000	1.4
@@ -17,14 +17,30 @@
 
 __all__ = [ "FragmentPool", "FragmentationParams" ]
 
-MAX_FRAGMENTS_PER_CHUNK = 32
-EXP_FACTOR = 1.33333333333
+# Largest number of allowed fragments in a single chunk.  Must be a power
+# of two.
+MAX_FRAGMENTS_PER_CHUNK = 16
+# Minimum proportion of extra packets to add to each chunk.
+EXP_FACTOR = 1.3333333333333333
 
 class FragmentationParams:
-    """DOCDOC"""
+    """Class to track the padding, chunking, and fragmentation required
+       for a message of a given length to be packed into fragments of a
+       given capacity."""
     ## Fields:
-    # k, n, length, fec, chunkSize, fragmentCapacity, dataFragments,
-    # totalFragments, paddingLen, paddedLen
+    # length -- size (in octets) of the original message.
+    # k -- number of input packets for each chunk (also number of packets
+    #    from a chunk required to reconstruct it.)
+    # n -- number of output packets for each chunk.
+    # fragmentCapacity -- number of bytes we can fit in a single fragment.
+    #    (28KB - overhead)
+    # chunkSize -- number of input bytes in a single chunk.  Equal to
+    #    k*fragmentCapacity.
+    # nChunks -- number of total chunks in message.
+    # paddingLen -- bytes added to message before fragmentation
+    # paddedLen -- length of message after padding; equal to chunkSize*nChunks
+    # fec -- mixminion._minionlib.FEC object to encode/decode chunks;
+    #    lazy-initialized by getFEC()
     def __init__(self, length, overhead):
         assert overhead in (0, ENC_FWD_OVERHEAD)
         self.length = length
@@ -35,7 +51,7 @@
         assert minFragments >= 2
         # Number of data fragments per chunk.
         self.k = 2
-        while self.k < minFragments and self.k < 16:
+        while self.k < minFragments and self.k < MAX_FRAGMENTS_PER_CHUNK:
             self.k *= 2
         # Number of chunks.
         self.nChunks = ceilDiv(minFragments, self.k)
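
(For reference, a standalone sketch of the parameter arithmetic that the new
field comments and the loop above describe -- not part of the commit; the
fragment capacity below is an illustrative stand-in for the real
"28KB - overhead" figure.)

    import math

    MAX_FRAGMENTS_PER_CHUNK = 16
    EXP_FACTOR = 4.0 / 3.0                     # same value as 1.3333333333333333

    def ceilDiv(a, b):
        return (a + b - 1) // b

    def sketch_params(length, fragmentCapacity=28 * 1024 - 50):
        minFragments = ceilDiv(length, fragmentCapacity)
        k = 2
        while k < minFragments and k < MAX_FRAGMENTS_PER_CHUNK:
            k *= 2                             # doubling keeps k a power of two
        nChunks = ceilDiv(minFragments, k)     # chunks in the whole message
        n = int(math.ceil(EXP_FACTOR * k))     # output packets per chunk
        paddedLen = k * fragmentCapacity * nChunks
        return k, n, nChunks, paddedLen - length

    # A ~100KB message needs 4 data fragments: k=4, n=6, one chunk.
    print(sketch_params(100 * 1024))
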
@@ -51,16 +67,23 @@
         self.fec = None
 
     def getFEC(self):
+        """Return a FEC object to fragment or defragment messages with
+           these parameters"""
         if self.fec is None:
             self.fec = _getFEC(self.k, self.n)
         return self.fec
 
     def getPosition(self, index):
-        """DOCDOC"""
+        """Return a chunk,index-within-chunk tuple for a packet with index
+           'index'"""
         chunk, pos = divmod(index, self.n)
         return chunk, pos
 
     def getFragments(self, s, paddingPRNG=None):
+        """Given a string of length self.length, whiten it, pad it,
+           and fragment it.  Return a list of the fragments, in order.
+           (Note -- after building the fragment packets, be sure to shuffle
+           them into a random order.)"""
         if paddingPRNG is None:
             paddingPRNG = getCommonPRNG()
 
@@ -87,73 +110,202 @@
         return fragments
 
 # ======================================================================
-#DOCDOC this entire section
-
 class FragmentPool:
-    """DOCDOC"""
-    ##
-    # messages : map from 
+    """Class to hold and manage fragmented messages as they are
+       reconstructed."""
+    ## Fields:
+    # states -- map from messageid to MessageState.  Reconstructed by
+    #    rescan().
+    # db -- instance of FragmentDB.
+    # store -- instance of StringMetadataStore.  The messages are either
+    #    the contents of individual fragments or reconstructed chunks.
+    #    The metadata are instances of FragmentMetadata.
     def __init__(self, dir):
-        self.store = mixminion.Filestore.StringMetadataStore(dir,create=1,
-                                                             scrub=1)
-        self.db = _FragmentDB(dir+"_db")
+        """Open a FragmentPool storing fragments in 'dir' and records of
+           old messages in 'dir_db'.
+           """
+        self.store = mixminion.Filestore.StringMetadataStore(
+            dir,create=1,scrub=1)
+        self.db = FragmentDB(dir+"_db")
         self.rescan()
 
     def sync(self):
+        """Flush pending changes to disk."""
         self.db.sync()
 
     def close(self):
+        """Release open resources for this pool."""
         self.db.close()
+        del self.db
+        del self.store
+        del self.states
 
-    def getState(self, fm):
+    def addFragment(self, fragmentPacket, nym=None, now=None):
+        """Given an instance of mixminion.Packet.FragmentPayload, record
+           the fragment if appropriate and update the state of the
+           fragment pool if necessary.
+
+              fragmentPacket -- the new fragment to add.
+              nym -- a string representing the identity that received this
+                  fragment.  [Tracking nyms is important, to prevent an
+                  attack where an adversary sends 2 fragments to 'MarkTwain'
+                  and 2 fragments to 'SClemens' and then observes that the
+                  message is reconstructed.]
+        """
+        if now is None:
+            now = time.time()
+        today = previousMidnight(now)
+
+        # If the message has already been rejected or completed, we can
+        # drop this packet.
+        s = self.db.getStatusAndTime(fragmentPacket.msgID)
+        if s:
+            LOG.debug("Dropping fragment of %s message %r",
+                      s[0].lower(), fragmentPacket.msgID)
+            return
+
+        # Otherwise, create a new metadata object for this fragment...
+        meta = FragmentMetadata(messageid=fragmentPacket.msgID,
+                                 idx=fragmentPacket.index,
+                                 size=fragmentPacket.msgLen,
+                                 isChunk=0,
+                                 chunkNum=None,
+                                 overhead=fragmentPacket.getOverhead(),
+                                 insertedDate=previousMidnight(now),
+                                 nym=nym)
+        # ... and allocate or find the MessageState for this message.
+        state = self._getState(meta)
         try:
-            return self.states[fm.messageid]
-        except KeyError:
-            state = MessageState(messageid=fm.messageid,
-                                 hash=fm.hash,
-                                 length=fm.size,
-                                 overhead=fm.overhead)
-            self.states[fm.messageid] = state
-            return state
+            # Check whether we can/should add this fragment, but do not
+            # add it yet.
+            state.addFragment(None, meta, noop=1)
+            # No exception was thrown; queue the message.
+            h = self.store.queueMessageAndMetadata(fragmentPacket.data, meta)
+            # And *now* update the message state.
+            state.addFragment(h, meta)
+        except MismatchedFragment:
+            # Remove the other fragments, mark msgid as bad.
+            self._deleteMessageIDs({ meta.messageid : 1}, "REJECTED", now)
+        except UnneededFragment:
+            # Discard this fragment; we don't need it.
+            LOG.debug("Dropping unneeded fragment %s of message %r",
+                      fragmentPacket.index, fragmentPacket.msgID)
+
+    def getReadyMessage(self, msgid):
+        """Return the complete message associated with messageid 'msgid'.
+           (If no such complete message is found, return None.)  The
+           resulting message is unwhitened, but not uncompressed."""
+        s = self.states.get(msgid)
+        if not s or not s.isDone():
+            return None
+
+        hs = s.getChunkHandles()
+        msg = "".join([self.store.messageContents(h) for h in hs])
+        msg = unwhiten(msg[:s.params.length])
+        return msg                      
+
+    def markMessageCompleted(self, msgid, rejected=0):
+        """Release all resources associated with the messageid 'msgid', and
+           reject future packets for that messageid.  If 'rejected', the
+           message has been abandoned and not sent; otherwise, the message
+           has been sent.
+        """
+        s = self.states.get(msgid)
+        if not s or not s.isDone():
+            return None
+        if rejected:
+            self._deleteMessageIDs({msgid: 1}, "REJECTED")
+        else:
+            self._deleteMessageIDs({msgid: 1}, "COMPLETED")
+
+    def listReadyMessages(self):
+        """Return a list of all messageIDs that have been completely
+           reconstructed."""
+        return [ msgid
+                 for msgid,state in self.states.items()
+                 if state.isDone() ]
+
+    def unchunkMessages(self):
+        """If any messages are ready for partial or full reconstruction,
+           reconstruct as many of their chunks as possible."""
+        for msgid, state in self.states.items():
+            if not state.hasReadyChunks():
+                continue
+            state.reconstruct(self.store)
+            
+    def expireMessages(self, cutoff):
+        """Remove all pending messages that were first inserted before
+           'cutoff'. """
+        expiredMessageIDs = {}
+        for s in self.states.values():
+            if s.inserted < cutoff:
+                expiredMessageIDs[s.messageid] = 1
+        self._deleteMessageIDs(expiredMessageIDs, "REJECTED")
         
     def rescan(self):
+        """Check all fragment metadata objects on disk, and reconstruct our
+           internal view of message states.
+        """
+        # Delete all internal state; reload FragmentMetadatas from disk.
         self.store.loadAllMetadata(lambda: None)
         meta = self.store._metadata_cache
         self.states = states = {}
-        badMessageIDs = {}
-        unneededHandles = []
+        badMessageIDs = {} # map from bad messageID to 1
+        unneededHandles = [] # list of handles that aren't needed.
         for h, fm in meta.items():
             if not fm:
                 LOG.debug("Removing fragment %s with missing metadata", h)
                 self.store.removeMessage(h)
+                continue
             try:
                 mid = fm.messageid
                 if badMessageIDs.has_key(mid):
+                    # We've already rejected fragments associated with this ID.
                     continue
-                state = self.getState(fm)
+                # All is well; try to register the fragment/chunk.  If it's
+                # redundant or inconsistent, raise an exception.
+                state = self._getState(fm)
                 if fm.isChunk:
                     state.addChunk(h, fm)
                 else:
                     state.addFragment(h, fm)
             except MismatchedFragment:
+                # Mark the message ID for this fragment as inconsistent.
                 badMessageIDs[mid] = 1
             except UnneededFragment:
+                LOG.warn("Found redundant fragment %s in pool", h)
+                # Remember that this message is unneeded.
                 unneededHandles.append(h)
 
+        # Check for fragments superseded by chunks -- those are unneeded too.
+        for s in self.states.values():
+            unneededHandles.extend(s.getUnneededFragmentHandles())
+
+        # Delete unneeded fragments.
         for h in unneededHandles:
-            fm = meta[h]
+            try:
+                fm = meta[h]
+            except KeyError:
+                continue
             LOG.debug("Removing unneeded fragment %s from message ID %r",
                       fm.idx, fm.messageid)
             self.store.removeMessage(h)
 
+        # Now nuke inconsistent messages.
         self._deleteMessageIDs(badMessageIDs, "REJECTED")
 
     def _deleteMessageIDs(self, messageIDSet, why, today=None):
+        """Helper function. Remove all the fragments and chunks associated
+           with a given message, and mark the message as delivered or
+           undeliverable.
+           
+              messageIDSet -- a map from 20-byte messageID to 1.
+              why -- 'REJECTED' or 'COMPLETED'.
+        """
         assert why in ("REJECTED", "COMPLETED")
         if today is None:
-            today = previousMidnight(time.time())
-        else:
-            today = previousMidnight(today)
+            today = time.time()
+        today = previousMidnight(today)
         if why == 'REJECTED':
             LOG.debug("Removing bogus messages by IDs: %s",
                       messageIDSet.keys())
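
(A minimal usage sketch of the pool lifecycle documented above.  It is not
taken from the Mixminion sources: the pool directory, the iterable of parsed
mixminion.Packet.FragmentPayload objects, and the deliver() step are all
placeholders.)

    import time
    import mixminion.Fragments

    pool = mixminion.Fragments.FragmentPool("/tmp/example-fragment-pool")
    for fp in fragment_payloads:              # parsed FragmentPayload objects
        pool.addFragment(fp, nym="MarkTwain") # nym = identity that received it
    pool.unchunkMessages()                    # rebuild any chunks that are ready
    for msgid in pool.listReadyMessages():
        msg = pool.getReadyMessage(msgid)     # unwhitened, still compressed
        deliver(msg)                          # placeholder delivery step
        pool.markMessageCompleted(msgid)      # reject future fragments for msgid
    pool.expireMessages(time.time() - 2*24*60*60)  # drop messages over 2 days old
    pool.sync()
    pool.close()
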
@@ -170,133 +322,101 @@
             if messageIDSet.has_key(fm.messageid):
                 self.store.removeMessage(h)
 
-
-    def _getFragmentMetadata(self, fragmentPacket):
-        now=time.time()
-        return  _FragmentMetadata(messageid=fragmentPacket.msgID,
-                                  idx=fragmentPacket.index,
-                                  hash=fragmentPacket.hash,
-                                  size=fragmentPacket.msgLen,
-                                  isChunk=0,
-                                  chunkNum=None,
-                                  overhead=fragmentPacket.getOverhead(),
-                                  insertedDate=previousMidnight(now))
-        
-    def addFragment(self, fragmentPacket, now=None):
-        #print "---"
-        if now is None:
-            now = time.time()
-        today = previousMidnight(now)
-
-        s = self.db.getStatusAndTime(fragmentPacket.msgID)
-        if s:
-            #print "A"
-            LOG.debug("Dropping fragment of %s message %r",
-                      s[0].lower(), fragmentPacket.msgID)
-            return
-            
-        meta = self._getFragmentMetadata(fragmentPacket)
-        state = self.getState(meta)
+    def _getState(self, fm):
+        """Helper function.  Return the MessageState object associated with
+           a given FragmentMetadata; allocate it if necessary."""
         try:
-            # print "B"
-            state.addFragment(None, meta, noop=1)
-            h = self.store.queueMessageAndMetadata(fragmentPacket.data, meta)
-            state.addFragment(h, meta)
-            #print "C"
-        except MismatchedFragment:
-            # remove other fragments, mark msgid as bad.
-            #print "D"
-            self._deleteMessageIDs({ meta.messageid : 1}, "REJECTED", now)
-        except UnneededFragment:
-            #print "E"
-            LOG.debug("Dropping unneeded fragment %s of message %r",
-                      fragmentPacket.index, fragmentPacket.msgID)
-
-    def getReadyMessage(self, msgid):
-        s = self.states.get(msgid)
-        if not s or not s.isDone():
-            return None
-
-        hs = s.getChunkHandles()
-        msg = "".join([self.store.messageContents(h) for h in hs])
-        msg = unwhiten(msg[:s.params.length])
-        return msg                      
-
-    def markMessageCompleted(self, msgid, rejected=0):
-        s = self.states.get(msgid)
-        if not s or not s.isDone():
-            return None
-        if rejected:
-            self._deleteMessageIDs({msgid: 1}, "REJECTED")
-        else:
-            self._deleteMessageIDs({msgid: 1}, "COMPLETED")
-
-    def listReadyMessages(self):
-        return [ msgid
-                 for msgid,state in self.states.items()
-                 if state.isDone() ]
-
-    def unchunkMessages(self):
-        for msgid, state in self.states.items():
-            if not state.hasReadyChunks():
-                continue
-            # refactor as much of this as possible into state. XXXX 
-            for chunkno, lst in state.getReadyChunks():
-                vs = []
-                minDate = min([fm.insertedDate for h,fm in lst])
-                for h,fm in lst:
-                    vs.append((state.params.getPosition(fm.idx)[1],
-                               self.store.messageContents(h)))
-                chunkText = "".join(state.params.getFEC().decode(vs))
-                del vs
-                fm2 = _FragmentMetadata(state.messageid, state.hash,
-                                        1, state.params.length, 1,
-                                        chunkno,
-                                        state.overhead,
-                                        minDate)
-                h2 = self.store.queueMessageAndMetadata(chunkText, fm2)
-                #XXXX005 handle if crash comes here!
-                for h,fm in lst:
-                    self.store.removeMessage(h)
-                state.fragmentsByChunk[chunkno] = {}
-                state.addChunk(h2, fm2)
+            return self.states[fm.messageid]
+        except KeyError:
+            state = MessageState(messageid=fm.messageid,
+                                 length=fm.size,
+                                 overhead=fm.overhead,
+                                 inserted=fm.insertedDate,
+                                 nym=fm.nym)
+            self.states[fm.messageid] = state
+            return state
 
 # ======================================================================
 
 class MismatchedFragment(Exception):
+    """Exception raised when a fragment isn't compatible with the other
+       fragments with a given message ID.  Because fragments are
+       integrity-checked on their way in, inconsistent fragments mean the
+       message is corrupt."""
     pass
 
 class UnneededFragment(Exception):
+    """Exception raised when a fragment is unneeded, and doesn't need to be
+       stored to disk."""
     pass
 
-class _FragmentMetadata:
-    def __init__(self, messageid, hash, idx, size, isChunk, chunkNum, overhead,
-                 insertedDate):
+class FragmentMetadata:
+    """Persistent metadata object to hold the state of a given fragment or
+       reconstructed chunk."""
+    ## Fields
+    # messageid -- unique 20-byte identifier for the message this fragment
+    #    comes from.
+    # idx -- index of the fragment within the message.  In the case of a
+    #    chunk, it's equal to chunkNum.
+    # size -- total length of the message.
+    # isChunk -- true iff this is a reconstructed chunk.
+    # chunkNum -- number of the chunk to which this fragment belongs.
+    # overhead -- Payload overhead for this fragment.  Equal to 0 or
+    #    ENC_FWD_OVERHEAD.
+    # insertedDate -- Midnight GMT before the day this fragment was received.
+    # nym -- name of the identity that received this fragment.
+    def __init__(self, messageid, idx, size, isChunk, chunkNum, overhead,
+                 insertedDate, nym):
         self.messageid = messageid
-        self.hash = hash
         self.idx = idx
         self.size = size
         self.isChunk = isChunk
         self.chunkNum = chunkNum
         self.overhead = overhead
         self.insertedDate = insertedDate
+        self.nym = nym
 
     def __getstate__(self):
-        return ("V0", self.messageid, self.hash, self.idx, self.size,
-                self.isChunk, self.chunkNum, self.insertedDate)
+        return ("V0", self.messageid, self.idx, self.size,
+                self.isChunk, self.chunkNum, self.overhead, self.insertedDate,
+                self.nym)
 
-    def __setstate__(self, o):
+    def __setstate__(self, state):
         if state[0] == 'V0':
-            (_, self.messageid, self.hash, self.idx, self.size,
-             self.isChunk, self.chunkNum, self.insertedDate) = state
+            (_, self.messageid, self.idx, self.size,
+             self.isChunk, self.chunkNum, self.overhead, self.insertedDate,
+             self.nym) = state
         else:
             raise MixFatalError("Unrecognized fragment state")
 
 class MessageState:
-    def __init__(self, messageid, hash, length, overhead):
+    """Helper class.  Tracks the status of the reconstruction of a
+       single message.  MessageState objects are not persistent, and must
+       be reconstructed from FragmentMetadata objects whenever a
+       fragment pool is rescanned.
+    """
+    ## Fields:
+    # messageid -- the 20-byte message ID of this message.
+    # overhead -- the payload overhead for fragments of this message.
+    # inserted -- midnight (GMT) before the day on which the first packet
+    #     associated with this message was inserted.
+    # nym -- the name of the identity receiving this message.  Used to
+    #     prevent linkage attacks. XXXX005 specify this need!
+    #
+    # params -- an instance of FragmentationParams for this message.
+    # chunks -- a map from chunk number to tuples of (handle within the pool,
+    #     FragmentMetadata object).  For completed chunks.
+    # fragmentsByChunk -- a map from chunk number to maps from
+    #     index-within-chunk to (handle,FragmentMetadata)
+    # readyChunks -- a map whose keys are the numbers of chunks that
+    #     are ready for reconstruction, but haven't been reconstructed
+    #     yet.
+    def __init__(self, messageid, length, overhead, inserted, nym):
+        """Create a new MessageState."""
         self.messageid = messageid
-        self.hash = hash
         self.overhead = overhead
+        self.inserted = inserted
+        self.nym = nym
         # chunkno -> handle,fragmentmeta
         self.chunks = {} 
         # chunkno -> idxwithinchunk -> (handle,fragmentmeta)
@@ -308,40 +428,52 @@
         self.readyChunks = {}
         
     def isDone(self):
+        """Return true iff we have reconstructed all the chunks for this
+           message."""
         return len(self.chunks) == self.params.nChunks
 
     def getChunkHandles(self):
+        """Requires self.isDone().  Return an in-order list for the handles
+           of the reconstructed chunks of this message."""
+        assert self.isDone()
         return [ self.chunks[i][0] for i in xrange(self.params.nChunks) ]
 
     def addChunk(self, h, fm):
-        # h is handle
-        # fm is fragmentmetadata
+        """Register a chunk with handle h and FragmentMetadata fm.  If the
+           chunk is inconsistent with other fragments of this message,
+           raise MismatchedFragment."""
         assert fm.isChunk
         assert fm.messageid == self.messageid
         if (fm.size != self.params.length or
-            fm.hash != self.hash or
             fm.overhead != self.overhead or
             self.chunks.has_key(fm.chunkNum)):
-            #print "MIS-C-1"
             raise MismatchedFragment
-        
+
+        if fm.nym != self.nym:
+            raise MismatchedFragment
+
+        if self.inserted > fm.insertedDate:
+            self.inserted = fm.insertedDate
         self.chunks[fm.chunkNum] = (h,fm)
 
         if self.fragmentsByChunk[fm.chunkNum]:
             LOG.warn("Found a chunk with unneeded fragments for message %r",
                      self.messageid)
-            #XXXX005 the old fragments need to be removed.
+
+        if self.readyChunks.get(fm.chunkNum):
+            del self.readyChunks[fm.chunkNum]
             
     def addFragment(self, h, fm, noop=0):
-        # h is handle
-        # fm is fragmentmetadata
+        """Register a fragment with handle h and FragmentMetadata fm.  If the
+           fragment is inconsistent with the other fragments of this message,
+           raise MismatchedFragment.  If the fragment isn't needed (because
+           enough fragments for its chunk have already been received),
+           raise UnneededFragment.  If 'noop' is true, do not add this
+           fragment -- just raise exceptions as needed."""
         assert fm.messageid == self.messageid
 
         if (fm.size != self.params.length or
             fm.overhead != self.overhead):
-            #print "MIS-1"
-            #print (fm.hash, fm.size, fm.overhead)
-            #print (self.hash, self.params.length, self.overhead)
             raise MismatchedFragment
         
         chunkNum, pos = self.params.getPosition(fm.idx)
@@ -350,45 +482,86 @@
 
         if (self.chunks.has_key(chunkNum) or
             len(self.fragmentsByChunk[chunkNum]) >= self.params.k):
-            #print "UNN-2"
             raise UnneededFragment
         
         if self.fragmentsByChunk[chunkNum].has_key(pos):
-            #print "MIS-3"
             raise MismatchedFragment
 
         if noop:
             return
         assert h
+        if self.inserted > fm.insertedDate:
+            self.inserted = fm.insertedDate        
         self.fragmentsByChunk[chunkNum][pos] = (h, fm)
 
         if len(self.fragmentsByChunk[chunkNum]) >= self.params.k:
             self.readyChunks[chunkNum] = 1
 
     def hasReadyChunks(self):
+        """Return true iff some of the chunks in this message are pending
+           reconstruction."""
         return len(self.readyChunks) != 0
 
-    def getReadyChunks(self):
-        """DOCDOC"""
-        # return list of [ (chunkno, [(h, fm)...]) )
-        r = []
+    def reconstruct(self, store):
+        """If any of the chunks in this message are pending reconstruction,
+           reconstruct them in the given store."""
+        if not self.readyChunks:
+            return
         for chunkno in self.readyChunks.keys():
+            # Get the first K fragments in the chunk. (list of h,fm)
             ch = self.fragmentsByChunk[chunkno].values()[:self.params.k]
-            r.append( (chunkno, ch) )
-        return r
+            minDate = min([fm.insertedDate for h, fm in ch])
+            # Build a list of (position-within-chunk, fragment-contents).
+            frags = [(self.params.getPosition(fm.idx)[1],
+                      store.messageContents(h)) for h,fm in ch]
+            chunkText = "".join(self.params.getFEC().decode(frags))
+            del frags
+            fm2 = FragmentMetadata(messageid=self.messageid,
+                                   idx=chunkno, size=self.params.length,
+                                   isChunk=1, chunkNum=chunkno,
+                                   overhead=self.overhead,
+                                   insertedDate=minDate, nym=self.nym)
+            # Queue the chunk.
+            h2 = store.queueMessageAndMetadata(chunkText, fm2)
+            del chunkText
+            # Remove superseded fragments.
+            for h, fm in ch:
+                store.removeMessage(h)
+            # Update this MessageState object.
+            self.fragmentsByChunk[chunkno] = {}
+            del self.readyChunks[chunkno]
+            self.addChunk(h2, fm2)
 
+    def getUnneededFragmentHandles(self):
+        """Returns any handles for fragments that have been superceded by
+           chunks."""
+        r = []
+        for chunkno in self.chunks.keys():
+            r.extend([ h for h,_ in self.fragmentsByChunk[chunkno].values()])
+        return r
 
-class _FragmentDB(mixminion.Filestore.DBBase):
+class FragmentDB(mixminion.Filestore.DBBase):
+    """Internal class. Uses a database background (such as dbm, berkely db,
+       gdbm, etc.) to remember which message IDs have already been
+       reconstructed or noted as corrupt
+    """
     def __init__(self, location):
+        """Open a new FragmentDB; stores its data in files beginning with
+           'location'."""
         mixminion.Filestore.DBBase.__init__(self, location, "fragment")
         self.sync()
-    def markStatus(self, msgid, status, today):
+    def markStatus(self, msgid, status, today=None):
+        """Note fragments for a message with message ID 'msgid' should no
+           longer be stored.  'status' is one of 'COMPLETED' or 'REJECTED',
+           depending on whether the message was delivered or undeliverable."""
         assert status in ("COMPLETED", "REJECTED")
         if today is None:
             today = time.time()
         today = previousMidnight(today)
         self[msgid] = (status, today)
     def getStatusAndTime(self, msgid):
+        """Given a messageID, return a 2-tuple of status,resolutiondate.
+           Return None if the message is still deliverable."""
         return self.get(msgid, None)
     def _encodeKey(self, k):
         return binascii.b2a_hex(k)
@@ -400,19 +573,30 @@
         status = {"C":"COMPLETED", "R":"REJECTED"}[v[0]]
         tm = int(v[2:])
         return status, tm
-
             
 # ======================================================================
+# Internal lazily generated cache from (k,n) to _minionlib.FEC objects.
+# Note that we only ever use a limited set of (k,n) values.
+def _blankFECtable():
+    """Return a map from permissible k,n tuples to FEC objects"""
+    f = {}
+    k = 2
+    while k <= MAX_FRAGMENTS_PER_CHUNK:
+        f[(k, int(math.ceil(EXP_FACTOR*k)))] = None
+        k *= 2
+    return f
 
-_fectab = {}
+# global map.
+_fectab = _blankFECtable()
 
 def _getFEC(k,n):
-    """DOCDOC: Note race condition """
-    try:
-        return _fectab[(k,n)]
-    except KeyError:
-        f = mixminion._minionlib.FEC_generate(k,n)
-        _fectab[(k,n)] = f
-        return f
-    
-    
+    """Given k and n parameters, return a FEC object to fragment and
+       reconstruct messages given those parameters."""
+    # There's a possible race condition here where two threads note
+    # that a given set of parameters haven't been generated, and both
+    # generate them.  This is harmless.
+    f = _fectab[(k,n)]
+    if f is None:
+        f = _fectab[(k,n)] = mixminion._minionlib.FEC_generate(k,n)
+    return f
+
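
(A quick sanity check, illustrative only: with MAX_FRAGMENTS_PER_CHUNK = 16
and EXP_FACTOR = 4/3, the pre-built table above contains exactly these
(k, n) keys.)

    import math

    EXP_FACTOR = 4.0 / 3.0
    keys, k = [], 2
    while k <= 16:                  # MAX_FRAGMENTS_PER_CHUNK
        keys.append((k, int(math.ceil(EXP_FACTOR * k))))
        k *= 2
    print(keys)                     # [(2, 3), (4, 6), (8, 11), (16, 22)]
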

Index: Packet.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Packet.py,v
retrieving revision 1.54
retrieving revision 1.55
diff -u -d -r1.54 -r1.55
--- Packet.py	8 Aug 2003 21:42:47 -0000	1.54
+++ Packet.py	18 Aug 2003 05:11:55 -0000	1.55
@@ -82,6 +82,8 @@
 MIN_EXIT_TYPE  = 0x0100  # The numerically first exit type.
 SMTP_TYPE      = 0x0100  # Mail the message
 MBOX_TYPE      = 0x0101  # Send the message to one of a fixed list of addresses
+NEWS_TYPE      = 0x0102  # Post the message to newsgroups; maybe mail it too
+FRAGMENT_TYPE  = 0x0103  # Find the actual delivery info in the message payload
 MAX_EXIT_TYPE  = 0xFFFF
 
 class ParseError(MixError):

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.145
retrieving revision 1.146
diff -u -d -r1.145 -r1.146
--- test.py	18 Aug 2003 00:41:10 -0000	1.145
+++ test.py	18 Aug 2003 05:11:55 -0000	1.146
@@ -6407,7 +6407,7 @@
         pkts2 = [ pp(x) for x in em(M2,38) ]
         pkts3 = [ pp(x) for x in em(M3,0) ]
         pkts4 = [ pp(x) for x in em(M4,0) ]
-        pkts5 = [ pp(x) for x in em(M4,0) ]
+        pkts5 = [ pp(x) for x in em(M5,0) ]
         self.assertEquals(map(len, [pkts1,pkts2,pkts3,pkts4,pkts5]),
                           [3, 11, 11, 66, 66])
         
@@ -6417,8 +6417,10 @@
         pool.unchunkMessages()
 
         #DOCDOC comment this rats' nets
-        
+
+        ####
         # Reconstruct: simple case.
+        ####
         pool.addFragment(pkts1[2])
         self.assertEquals(1, pool.store.count())
         self.assertEquals([], pool.listReadyMessages())
@@ -6437,11 +6439,45 @@
         self.assertEquals([], pool.listReadyMessages())
         pool.addFragment(pkts1[1])
         self.assertEquals([], pool.store.getAllMessages())
+
+        ####
+        # Reconstruct: large message, stop half-way, reload, finish.
+        ####
+        # enough for chunk 1 -- 17 packets
+        for p in pkts4[5:22]: pool.addFragment(p)
+        # enough for half of chunk 2 -- 8 packets
+        for p in pkts4[22:30]: pool.addFragment(p)
+        pool.unchunkMessages()
+        # close and re-open the pool
+        pool.close()
+        pool = mixminion.Fragments.FragmentPool(loc)
+        # Enough for the rest of message 4: 8 from chunk 2, 17 from chunk 3.
+        for p in pkts4[36:44]+pkts4[49:66]:
+            pool.addFragment(p)
+            pool.unchunkMessages()
+        self.assertEquals(len(pool.listReadyMessages()), 1)
+        mid = pool.listReadyMessages()[0]
+        self.assertLongStringEq(M4, uncompressData(pool.getReadyMessage(mid)))
+        pool.markMessageCompleted(mid)
+        pool.close()
+        pool = mixminion.Fragments.FragmentPool(loc)
+        pool.addFragment(pkts4[48])
+        self.assertEquals([], pool.store.getAllMessages())
+
+        # free some RAM
+        del M4
+        del M1
+        del pkts4
+        del pkts1
+
+        ####
+        # Try an interleaved case with two smallish msgs and one big one.
+        # Provoke an error in the big one part way through.
+        ####
         
+
         
         
-        
-
 #----------------------------------------------------------------------
 def testSuite():
     """Return a PyUnit test suite containing all the unit test cases."""
@@ -6449,7 +6485,7 @@
     loader = unittest.TestLoader()
     tc = loader.loadTestsFromTestCase
 
-    if 0:
+    if 1:
         suite.addTest(tc(FragmentTests))
         return suite