[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Fragmentation backend now works for at least one test case...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv9529/lib/mixminion

Modified Files:
	BuildMessage.py Fragments.py test.py 
Log Message:
Fragmentation backend now works for at least one test case.  More tests to come. Woohoo.

Index: BuildMessage.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/BuildMessage.py,v
retrieving revision 1.53
retrieving revision 1.54
diff -u -d -r1.53 -r1.54
--- BuildMessage.py	14 Aug 2003 19:37:24 -0000	1.53
+++ BuildMessage.py	18 Aug 2003 00:41:10 -0000	1.54
@@ -21,9 +21,9 @@
 
 __all__ = ['buildForwardMessage', 'buildEncryptedMessage',
            'buildReplyMessage', 'buildReplyBlock', 'checkPathLength',
-           'encodePayloads', 'decodePayload' ]
+           'encodeMessage', 'decodePayload' ]
 
-def encodePayloads(message, overhead, paddingPRNG):
+def encodeMessage(message, overhead, paddingPRNG=None):
     """Given a message, compress it, fragment it into individual payloads,
        and add extra fields (size, hash, etc) as appropriate.  Return a list
        of strings, each of which is a message payload suitable for use in
@@ -60,7 +60,7 @@
         return [ p.pack() ]
 
     # DOCDOC
-    messageid = getCommonPRNG().getBytes(20)
+    messageid = Crypto.getCommonPRNG().getBytes(20)
     p = mixminion.Fragments.FragmentationParams(len(payload), overhead)
     rawFragments = p.getFragments(payload)
     fragments = []
@@ -75,7 +75,7 @@
                         paddingPRNG=None):
     # Compress, pad, and checksum the payload.
     if payload is not None and exitType != DROP_TYPE:
-        payloads = encodePayloads(payload, 0, paddingPRNG)
+        payloads = encodeMessage(payload, 0, paddingPRNG)
         assert len(payloads) == 1
         payload = payloads[0]
         LOG.debug("Encoding forward message for %s-byte payload",len(payload))
@@ -127,7 +127,7 @@
 
 def buildEncryptedForwardMessage(payload, exitType, exitInfo, path1, path2,
                                  key, paddingPRNG=None, secretRNG=None):
-    payloads = encodePayloads(payload, ENC_FWD_OVERHEAD, paddingPRNG)
+    payloads = encodeMessage(payload, ENC_FWD_OVERHEAD, paddingPRNG)
     assert len(payloads) == 1
     return _buildEncryptedForwardMessage(payloads[0], exitType, exitInfo,
                                          path1, path2, key, paddingPRNG,
@@ -194,7 +194,7 @@
     return _buildMessage(payload, exitType, exitInfo, path1, path2,paddingPRNG)
 
 def buildReplyMessage(payload, path1, replyBlock, paddingPRNG=None):
-    payloads = encodePayloads(payload, 0, paddingPRNG)
+    payloads = encodeMessage(payload, 0, paddingPRNG)
     assert len(payloads) == 1
     return _buildReplyMessage(payloads[0], path1, replyBlock, paddingPRNG)
 

Index: Fragments.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Fragments.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- Fragments.py	17 Aug 2003 21:09:56 -0000	1.2
+++ Fragments.py	18 Aug 2003 00:41:10 -0000	1.3
@@ -5,15 +5,22 @@
 
    Code to fragment and reassemble messages."""
 
+import binascii
+import math
+import time
 import mixminion._minionlib
 import mixminion.Filestore
-from mixminion.Crypto import getCommonPRNG, whiten, unwhiten
+from mixminion.Crypto import ceilDiv, getCommonPRNG, whiten, unwhiten
 from mixminion.Common import LOG, previousMidnight, MixError, MixFatalError
+from mixminion.Packet import ENC_FWD_OVERHEAD, PAYLOAD_LEN, \
+     FRAGMENT_PAYLOAD_OVERHEAD
+
+__all__ = [ "FragmentPool", "FragmentationParams" ]
 
 MAX_FRAGMENTS_PER_CHUNK = 32
 EXP_FACTOR = 1.33333333333
 
-class _FragmentationParams:
+class FragmentationParams:
     """DOCDOC"""
     ## Fields:
     # k, n, length, fec, chunkSize, fragmentCapacity, dataFragments,
@@ -25,20 +32,21 @@
         # minimum number of payloads to hold msg, without fragmentation
         # or padding.
         minFragments = ceilDiv(length, self.fragCapacity)
+        assert minFragments >= 2
         # Number of data fragments per chunk.
         self.k = 2
-        while k < minFragments and k < 16:
+        while self.k < minFragments and self.k < 16:
             self.k *= 2
         # Number of chunks.
-        self.nChunks = ceilDiv(minFragments, k)
+        self.nChunks = ceilDiv(minFragments, self.k)
+        # Number of total fragments per chunk.
+        self.n = int(math.ceil(EXP_FACTOR * self.k))
         # Data in  a single chunk
         self.chunkSize = self.fragCapacity * self.k
         # Length of data to fill chunks
         self.paddedLen = self.nChunks * self.fragCapacity * self.k
         # Length of padding needed to fill all chunks with data.
         self.paddingLen = self.paddedLen - length
-        # Number of total fragments per chunk.
-        self.n = math.ceil(EXP_FACTOR * k)
         # FEC object
         self.fec = None
 
@@ -57,7 +65,7 @@
             paddingPRNG = getCommonPRNG()
 
         self.getFEC()
-        assert s.length == self.length
+        assert len(s) == self.length
         s = whiten(s)
         s += paddingPRNG.getBytes(self.paddingLen)
         assert len(s) == self.paddedLen
@@ -74,13 +82,187 @@
                 blocks.append( chunks[i][j*self.fragCapacity:
                                          (j+1)*self.fragCapacity] )
             chunks[i] = None
-            for j in xrange(p.n):
+            for j in xrange(self.n):
                 fragments.append( self.fec.encode(j, blocks) )
         return fragments
 
 # ======================================================================
 #DOCDOC this entire section
 
+class FragmentPool:
+    """DOCDOC"""
+    ##
+    # messages : map from 
+    def __init__(self, dir):
+        self.store = mixminion.Filestore.StringMetadataStore(dir,create=1,
+                                                             scrub=1)
+        self.db = _FragmentDB(dir+"_db")
+        self.rescan()
+
+    def sync(self):
+        self.db.sync()
+
+    def close(self):
+        self.db.close()
+
+    def getState(self, fm):
+        try:
+            return self.states[fm.messageid]
+        except KeyError:
+            state = MessageState(messageid=fm.messageid,
+                                 hash=fm.hash,
+                                 length=fm.size,
+                                 overhead=fm.overhead)
+            self.states[fm.messageid] = state
+            return state
+        
+    def rescan(self):
+        self.store.loadAllMetadata(lambda: None)
+        meta = self.store._metadata_cache
+        self.states = states = {}
+        badMessageIDs = {}
+        unneededHandles = []
+        for h, fm in meta.items():
+            if not fm:
+                LOG.debug("Removing fragment %s with missing metadata", h)
+                self.store.removeMessage(h)
+            try:
+                mid = fm.messageid
+                if badMessageIDs.has_key(mid):
+                    continue
+                state = self.getState(fm)
+                if fm.isChunk:
+                    state.addChunk(h, fm)
+                else:
+                    state.addFragment(h, fm)
+            except MismatchedFragment:
+                badMessageIDs[mid] = 1
+            except UnneededFragment:
+                unneededHandles.append(h)
+
+        for h in unneededHandles:
+            fm = meta[h]
+            LOG.debug("Removing unneeded fragment %s from message ID %r",
+                      fm.idx, fm.messageid)
+            self.store.removeMessage(h)
+
+        self._deleteMessageIDs(badMessageIDs, "REJECTED")
+
+    def _deleteMessageIDs(self, messageIDSet, why, today=None):
+        assert why in ("REJECTED", "COMPLETED")
+        if today is None:
+            today = previousMidnight(time.time())
+        else:
+            today = previousMidnight(today)
+        if why == 'REJECTED':
+            LOG.debug("Removing bogus messages by IDs: %s",
+                      messageIDSet.keys())
+        else:
+            LOG.debug("Removing completed messages by IDs: %s",
+                      messageIDSet.keys())
+        for mid in messageIDSet.keys():
+            self.db.markStatus(mid, why, today)
+            try:
+                del self.states[mid]
+            except KeyError:
+                pass
+        for h, fm in self.store._metadata_cache.items():
+            if messageIDSet.has_key(fm.messageid):
+                self.store.removeMessage(h)
+
+
+    def _getFragmentMetadata(self, fragmentPacket):
+        now=time.time()
+        return  _FragmentMetadata(messageid=fragmentPacket.msgID,
+                                  idx=fragmentPacket.index,
+                                  hash=fragmentPacket.hash,
+                                  size=fragmentPacket.msgLen,
+                                  isChunk=0,
+                                  chunkNum=None,
+                                  overhead=fragmentPacket.getOverhead(),
+                                  insertedDate=previousMidnight(now))
+        
+    def addFragment(self, fragmentPacket, now=None):
+        #print "---"
+        if now is None:
+            now = time.time()
+        today = previousMidnight(now)
+
+        s = self.db.getStatusAndTime(fragmentPacket.msgID)
+        if s:
+            #print "A"
+            LOG.debug("Dropping fragment of %s message %r",
+                      s[0].lower(), fragmentPacket.msgID)
+            return
+            
+        meta = self._getFragmentMetadata(fragmentPacket)
+        state = self.getState(meta)
+        try:
+            # print "B"
+            state.addFragment(None, meta, noop=1)
+            h = self.store.queueMessageAndMetadata(fragmentPacket.data, meta)
+            state.addFragment(h, meta)
+            #print "C"
+        except MismatchedFragment:
+            # remove other fragments, mark msgid as bad.
+            #print "D"
+            self._deleteMessageIDs({ meta.messageid : 1}, "REJECTED", now)
+        except UnneededFragment:
+            #print "E"
+            LOG.debug("Dropping unneeded fragment %s of message %r",
+                      fragmentPacket.index, fragmentPacket.msgID)
+
+    def getReadyMessage(self, msgid):
+        s = self.states.get(msgid)
+        if not s or not s.isDone():
+            return None
+
+        hs = s.getChunkHandles()
+        msg = "".join([self.store.messageContents(h) for h in hs])
+        msg = unwhiten(msg[:s.params.length])
+        return msg                      
+
+    def markMessageCompleted(self, msgid, rejected=0):
+        s = self.states.get(msgid)
+        if not s or not s.isDone():
+            return None
+        if rejected:
+            self._deleteMessageIDs({msgid: 1}, "REJECTED")
+        else:
+            self._deleteMessageIDs({msgid: 1}, "COMPLETED")
+
+    def listReadyMessages(self):
+        return [ msgid
+                 for msgid,state in self.states.items()
+                 if state.isDone() ]
+
+    def unchunkMessages(self):
+        for msgid, state in self.states.items():
+            if not state.hasReadyChunks():
+                continue
+            # refactor as much of this as possible into state. XXXX 
+            for chunkno, lst in state.getReadyChunks():
+                vs = []
+                minDate = min([fm.insertedDate for h,fm in lst])
+                for h,fm in lst:
+                    vs.append((state.params.getPosition(fm.idx)[1],
+                               self.store.messageContents(h)))
+                chunkText = "".join(state.params.getFEC().decode(vs))
+                del vs
+                fm2 = _FragmentMetadata(state.messageid, state.hash,
+                                        1, state.params.length, 1,
+                                        chunkno,
+                                        state.overhead,
+                                        minDate)
+                h2 = self.store.queueMessageAndMetadata(chunkText, fm2)
+                #XXXX005 handle if crash comes here!
+                for h,fm in lst:
+                    self.store.removeMessage(h)
+                state.fragmentsByChunk[chunkno] = {}
+                state.addChunk(h2, fm2)
+
+# ======================================================================
+
 class MismatchedFragment(Exception):
     pass
 
@@ -119,7 +301,7 @@
         self.chunks = {} 
         # chunkno -> idxwithinchunk -> (handle,fragmentmeta)
         self.fragmentsByChunk = []
-        self.params = _FragmentationParams(length, overhead)
+        self.params = FragmentationParams(length, overhead)
         for i in xrange(self.params.nChunks):
             self.fragmentsByChunk.append({})
         # chunkset: ready chunk num -> 1
@@ -140,31 +322,47 @@
             fm.hash != self.hash or
             fm.overhead != self.overhead or
             self.chunks.has_key(fm.chunkNum)):
+            #print "MIS-C-1"
             raise MismatchedFragment
         
         self.chunks[fm.chunkNum] = (h,fm)
 
-    def addFragment(self, h, fm):
+        if self.fragmentsByChunk[fm.chunkNum]:
+            LOG.warn("Found a chunk with unneeded fragments for message %r",
+                     self.messageid)
+            #XXXX005 the old fragments need to be removed.
+            
+    def addFragment(self, h, fm, noop=0):
         # h is handle
         # fm is fragmentmetadata
         assert fm.messageid == self.messageid
 
-        if (fm.hash != self.hash or
-            fm.size != self.params.length or
+        if (fm.size != self.params.length or
             fm.overhead != self.overhead):
+            #print "MIS-1"
+            #print (fm.hash, fm.size, fm.overhead)
+            #print (self.hash, self.params.length, self.overhead)
             raise MismatchedFragment
         
-        chunkNum, pos = self.params.getPosition(idx)
+        chunkNum, pos = self.params.getPosition(fm.idx)
+        if chunkNum >= self.params.nChunks:
+            raise MismatchedFragment
 
-        if self.chunks.has_key(chunkNum):
+        if (self.chunks.has_key(chunkNum) or
+            len(self.fragmentsByChunk[chunkNum]) >= self.params.k):
+            #print "UNN-2"
             raise UnneededFragment
         
         if self.fragmentsByChunk[chunkNum].has_key(pos):
+            #print "MIS-3"
             raise MismatchedFragment
 
+        if noop:
+            return
+        assert h
         self.fragmentsByChunk[chunkNum][pos] = (h, fm)
 
-        if len(self.fragmentsByChunk(chunkNum)) >= self.params.k:
+        if len(self.fragmentsByChunk[chunkNum]) >= self.params.k:
             self.readyChunks[chunkNum] = 1
 
     def hasReadyChunks(self):
@@ -179,14 +377,16 @@
             r.append( (chunkno, ch) )
         return r
 
+
 class _FragmentDB(mixminion.Filestore.DBBase):
     def __init__(self, location):
         mixminion.Filestore.DBBase.__init__(self, location, "fragment")
         self.sync()
     def markStatus(self, msgid, status, today):
         assert status in ("COMPLETED", "REJECTED")
-        if now is None:
-            now = time.time()
+        if today is None:
+            today = time.time()
+        today = previousMidnight(today)
         self[msgid] = (status, today)
     def getStatusAndTime(self, msgid):
         return self.get(msgid, None)
@@ -198,138 +398,9 @@
             {"COMPLETED":"C", "REJECTED":"R"}[status], str(tm))
     def _decodeVal(self, v):
         status = {"C":"COMPLETED", "R":"REJECTED"}[v[0]]
-        tm = int(tm[2:])
+        tm = int(v[2:])
         return status, tm
 
-class FragmentPool:
-    """DOCDOC"""
-    ##
-    # messages : map from 
-    def __init__(self, dir):
-        self.store = mixminion.Filestore.StringMetadataStore(dir,create=1,
-                                                             scrub=1)
-        self.log = _FragmentDB(dir+"_db")
-        self.rescan()
-
-    def getState(self, fm):
-        try:
-            return self.states[fm.messageid]
-        except KeyError:
-            state = MessageState(messageid=fm.messageid,
-                                 hash=fm.hash,
-                                 length=fm.size,
-                                 overhead=fm.overhead)
-            self.states[fm.messageid] = state
-            return state
-        
-    def rescan(self):
-        self.store.loadAllMetadata()
-        meta = self.store._metadata_cache
-        self.states = states = {}
-        badMessageIDs = {}
-        unneededHandles = []
-        for h, fm in meta.items():
-            try:
-                mid = fm.messageid
-                if badMessageIDs.has_key(mid):
-                    continue
-                state = self.getState(fm)
-                if fm.isChunk:
-                    state.addChunk(h, fm)
-                else:
-                    state.addFragment(h, fm)
-            except MismatchedFragment:
-                badMessageIDs[mid] = 1
-            except UnneededFragment:
-                unneededHandles.append(h)
-
-        for h in unneededHandles:
-            fm = meta[h]
-            LOG.debug("Removing unneeded fragment %s from message ID %r",
-                      fm.idx, fm.messageid)
-            self.removeMessage(h)
-
-        self._abortMessageIDs(badMessageIDs, today)
-
-    def _abortMessageIDs(self, messageIDSet, today=None):
-        if today is None:
-            today = previousMidnight(time.time())
-        else:
-            today = previousMidnight(today)
-        LOG.debug("Removing bogus messages by IDs: %s", messageIDSet.keys())
-        for mid in messageIDSet.keys():
-            self.markStatus(mid, "REJECTED", today)
-        for h, fm in self._metadata_cache.items():
-            if messageIDSet.has_key(fm.messageid):
-                self.removeMessage(h)
-
-    def _getPacketMetadata(self, fragmentPacket):
-        return  _FragmentMetadata(messageid=fragmentPacket.msgID,
-                                  idx=fragmentPacket.index,
-                                  hash=fragmentPacket.hash,
-                                  size=fragmentPacket.msgLen,
-                                  isChunk=0,
-                                  chunkNum=None,
-                                  overhead=fragmentPacket.getOverhead(),
-                                  insertedDate=previousMidnight(now))
-        
-    def addFragment(self, fragmentPacket, now=None):
-        if now is None:
-            now = time.time()
-        today = previousMidnight(now)
-
-        meta = self._getFragmentMetadata(fragmentPacket)
-        state = self.getState(meta)
-        try:
-            state.addFragment(fragmentPacket)
-            h = self.store.queueMessageAndMetadata(fragmentPacket.data, meta)
-        except MismatchedFragment:
-            # remove other fragments, mark msgid as bad.            
-            self._abortMessageIDs({ meta.id : 1}, now)
-        except UnneededFragment:
-            LOG.debug("Dropping unneeded fragment %s of message %r",
-                      fragmentPacket.idx, fragmentPacket.msgID)
-
-    def getReadyMessage(self, msgid):
-        s = self.states.get(msgid)
-        if not s or not s.isDone():
-            return None
-
-        hs = s.getChunkHandles()
-        return "".join([self.state.getMessage(h) for h in hs])
-
-    def deleteMessage(self, msgid):
-        s = self.states.get(msgid)
-        if not s or not s.isDone():
-            return None
-
-        hs = s.getChunkHandles()
-        for h in hs:
-            self.store.removeMessage(h)
-
-    def getReadyMessages(self):
-        return [ msgid
-                 for msgid,state in self.states.items()
-                 if state.isDone() ]
-
-    def unchunkMessages(self):
-        for msgid, state in self.states.items():
-            if not state.hasReadyChunks():
-                continue
-            for chunkno, lst in state.getReadyChunks():
-                vs = []
-                minDate = min([fm.insertedDate for h,fm in lst])
-                for h,fm in lst:
-                    vs.append((state.getPos(fm.index)[1],
-                               self.store.getMessage(h)))
-                chunkText = self.store.params.getFEC().decode(vs)
-                fm2 = _FragmentMetadata(state.mesageid, state.hash,
-                                        state.idx, 1, chunkno, state.overhead,
-                                        minDate)
-                h2 = self.store.queueMessage(chunkText)
-                self.store.setMetadata(h2, fm2)
-                for h,fm in lst:
-                    self.store.removeMessage(h)
             
 # ======================================================================
 
@@ -343,4 +414,5 @@
         f = mixminion._minionlib.FEC_generate(k,n)
         _fectab[(k,n)] = f
         return f
+    
     

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.144
retrieving revision 1.145
diff -u -d -r1.144 -r1.145
--- test.py	17 Aug 2003 21:09:56 -0000	1.144
+++ test.py	18 Aug 2003 00:41:10 -0000	1.145
@@ -43,6 +43,7 @@
 import mixminion.Config
 import mixminion.Crypto as Crypto
 import mixminion.Filestore
+import mixminion.Fragments
 import mixminion.MMTPClient
 import mixminion.Packet
 import mixminion.ServerInfo
@@ -1648,7 +1649,7 @@
 
         for m in (p.getBytes(3000), p.getBytes(10000), "", "q", "blznerty"):
             for ov in 0, 42-20+16: # encrypted forward overhead
-                plds = BuildMessage.encodePayloads(m,ov,p)
+                plds = BuildMessage.encodeMessage(m,ov,p)
                 assert len(plds) == 1
                 pld = plds[0]
                 self.assertEquals(28*1024, len(pld)+ov)
@@ -2286,7 +2287,7 @@
         # get such a payload is to compress 25K of zeroes.
         nils = "\x00"*(25*1024)
         overcompressed_payload = \
-             BuildMessage.encodePayloads(nils, 0, AESCounterPRNG())[0]
+             BuildMessage.encodeMessage(nils, 0, AESCounterPRNG())[0]
         self.failUnlessRaises(CompressedDataTooLong,
              BuildMessage.decodePayload, overcompressed_payload, "X"*20)
 
@@ -6325,6 +6326,123 @@
         return ds[0] == ds[1]
 
 #----------------------------------------------------------------------
+class FragmentTests(TestCase):
+    def testFragmentParams(self):
+        K28 = 28*1024
+        FP = mixminion.Fragments.FragmentationParams
+        # 1 chunk, small
+        fp1 = FP(K28 + 1, 0)
+        self.assertEquals((fp1.length, fp1.fragCapacity, fp1.k, fp1.n,
+                           fp1.nChunks, fp1.chunkSize, fp1.paddedLen,
+                           fp1.paddingLen),
+                          (K28+1, K28 - 47, 2, 3,
+                           1, 2*(K28-47), 2*(K28-47),
+                           fp1.paddedLen-(K28+1)))
+        # 1 chunk, a bit larger, with enc-fwd overhead
+        fp1 = FP(150*1024, 38)
+        self.assertEquals((fp1.length, fp1.fragCapacity, fp1.k, fp1.n,
+                           fp1.nChunks, fp1.chunkSize, fp1.paddedLen,
+                           fp1.paddingLen),
+                          (150*1024, K28 - 47 - 38, 8, 11,
+                           1, 8*(K28-47-38), 8*(K28-47-38),
+                           fp1.paddedLen-(150*1024)))
+        # 2 chunks.
+        fp1 = FP(K28 * 20, 0)
+        self.assertEquals((fp1.length, fp1.fragCapacity, fp1.k, fp1.n,
+                           fp1.nChunks, fp1.chunkSize, fp1.paddedLen,
+                           fp1.paddingLen),
+                          (K28*20, K28 - 47, 16, 22,
+                           2, 16*(K28-47), 32*(K28-47),
+                           fp1.paddedLen-(K28*20)))
+        
+
+        # 3 chunks.
+        fp1 = FP(K28 * 32 + 101, 0)
+        self.assertEquals((fp1.length, fp1.fragCapacity, fp1.k, fp1.n,
+                           fp1.nChunks, fp1.chunkSize, fp1.paddedLen,
+                           fp1.paddingLen),
+                          (K28*32+101, K28 - 47, 16, 22,
+                           3, 16*(K28-47), 48*(K28-47),
+                           fp1.paddedLen-(K28*32+101)))
+        
+
+    def testFragmentation(self):
+        FP = mixminion.Fragments.FragmentationParams
+        
+        # One chunk.
+        msg = Crypto.getCommonPRNG().getBytes(150*1024)
+        fp1 = FP(len(msg), 38)
+        fec = mixminion.Fragments._getFEC(8, 11)
+        blocks = fp1.getFragments(msg)
+        self.assertEquals(len(blocks), 11)
+        for b in blocks:
+            self.assertEquals(len(b), 28*1024 - 47 - 38)
+        m2 = "".join(fec.decode(zip(range(3,11),blocks[3:])))
+        self.assertLongStringEq(msg, unwhiten(m2[:150*1024]))
+        
+        # Three chunks.
+        msg = Crypto.getCommonPRNG().getBytes(28*32*1024 + 101)
+        fp1 = FP(len(msg), 0)
+        fec = mixminion.Fragments._getFEC(16, 22)
+        blocks = fp1.getFragments(msg)
+        self.assertEquals(len(blocks), 66)
+        for b in blocks: self.assertEquals(len(b), 28*1024 - 47)
+        chunks = []
+        for chunkno in 0,1,2:
+            blocksInChunk = zip(range(22), blocks[chunkno*22:(chunkno+1)*22])
+            receivedBlocks = Crypto.getCommonPRNG().shuffle(blocksInChunk, 16)
+            self.assertEquals(16, len(receivedBlocks))
+            chunks.append("".join(fec.decode(receivedBlocks)))
+        self.assertLongStringEq(msg, unwhiten(("".join(chunks))[:len(msg)]))
+        
+    def testFragmentPool(self):
+        em = mixminion.BuildMessage.encodeMessage
+        pp = mixminion.Packet.parsePayload
+        M1 = Crypto.getCommonPRNG().getBytes(1024*30)
+        M2 = Crypto.getCommonPRNG().getBytes(1024*150)
+        M3 = Crypto.getCommonPRNG().getBytes(1024*200)
+        M4 = Crypto.getCommonPRNG().getBytes(1024*900)
+        M5 = Crypto.getCommonPRNG().getBytes(1024*900)
+        pkts1 = [ pp(x) for x in em(M1,0) ]
+        pkts2 = [ pp(x) for x in em(M2,38) ]
+        pkts3 = [ pp(x) for x in em(M3,0) ]
+        pkts4 = [ pp(x) for x in em(M4,0) ]
+        pkts5 = [ pp(x) for x in em(M4,0) ]
+        self.assertEquals(map(len, [pkts1,pkts2,pkts3,pkts4,pkts5]),
+                          [3, 11, 11, 66, 66])
+        
+        loc = mix_mktemp()
+        pool = mixminion.Fragments.FragmentPool(loc)
+        self.assertEquals([], pool.listReadyMessages())
+        pool.unchunkMessages()
+
+        #DOCDOC comment this rat's nest
+        
+        # Reconstruct: simple case.
+        pool.addFragment(pkts1[2])
+        self.assertEquals(1, pool.store.count())
+        self.assertEquals([], pool.listReadyMessages())
+        pool.addFragment(pkts1[0])
+        self.assertEquals(2, pool.store.count())
+        pool.addFragment(pkts1[1]) # should be 'unneeded'
+        self.assertEquals(2, pool.store.count())
+        self.assertEquals([], pool.listReadyMessages())
+        pool.unchunkMessages()
+        self.assertEquals([pkts1[0].msgID], pool.listReadyMessages())
+        mid = pool.listReadyMessages()[0]
+        #print len(M1), len(pool.getReadyMessage(mid))
+        self.assertLongStringEq(M1, uncompressData(pool.getReadyMessage(mid)))
+        self.assertLongStringEq(M1, uncompressData(pool.getReadyMessage(mid)))
+        pool.markMessageCompleted(mid)
+        self.assertEquals([], pool.listReadyMessages())
+        pool.addFragment(pkts1[1])
+        self.assertEquals([], pool.store.getAllMessages())
+        
+        
+        
+        
+
+#----------------------------------------------------------------------
 def testSuite():
     """Return a PyUnit test suite containing all the unit test cases."""
     suite = unittest.TestSuite()
@@ -6332,7 +6450,7 @@
     tc = loader.loadTestsFromTestCase
 
     if 0:
-        suite.addTest(tc(FilestoreTests))
+        suite.addTest(tc(FragmentTests))
         return suite
 
     suite.addTest(tc(MiscTests))