[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Work on getting fragments happy -- not done yet



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv3409/lib/mixminion

Modified Files:
	BuildMessage.py Crypto.py Packet.py 
Log Message:
Work on getting fragments happy -- not done yet

Index: BuildMessage.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/BuildMessage.py,v
retrieving revision 1.51
retrieving revision 1.52
diff -u -d -r1.51 -r1.52
--- BuildMessage.py	28 Jul 2003 01:02:33 -0000	1.51
+++ BuildMessage.py	8 Aug 2003 21:42:46 -0000	1.52
@@ -604,6 +604,7 @@
 EXP_FACTOR = 1.33333333333
 
 def _encodePayloads(message, overhead, paddingPRNG):
+    """Compress 'message'; return a list of packed payloads (singleton or FEC fragments)."""
     assert overhead in (0, ENC_FWD_OVERHEAD)
     origLength = len(message)
     payload = compress(message)
@@ -619,42 +620,75 @@
         # We pad the payload, and construct a new SingletonPayload,
         # including this payload's size and checksum.
         payload += paddingPRNG.getBytes(paddingLen)
-        return [SingletonPayload(length, Crypto.sha1(payload), payload).pack()]
-
-    # XXXX005 Whiten payload!!!
+        p = SingletonPayload(length, None, payload)
+        p.computeHash()
+        return [ p.pack() ]
 
-    fragCapacity = PAYLOAD_LEN - FRAGMENT_PAYLOAD_OVERHEAD - overhead
-    minFragments = ceilDiv(length, fragCapacity)
-    k = 2
-    while k < minFragments and k < 16:
-        k *= 2
-    nChunks = ceilDiv(minFragments, k)
-    paddingLen = nChunks*fragCapacity*k
-    payload += paddingPRNG.getBytes(paddingLen)
+    # Whiten the compressed message, then pad it to fill whole chunks.
+    payload = whiten(payload)
+    p = _FragmentationParams(len(payload), overhead)
+
+    payload += paddingPRNG.getBytes(p.paddingLen)
+    assert len(payload) == p.paddedLen
     chunks = []
-    for i in xrange(nChunks):
-        chunk[i] = payload[fragCapacity*k*i:fragCapacity*k*(i+1)]
-    n = math.ceil(EXP_FACTOR*k)
-    fragments = []
-    fec = getFEC(k,n)
+    for i in xrange(p.nChunks):
+        chunks.append(payload[i*p.chunkSize:(i+1)*p.chunkSize])
+    del payload
     messageid = getCommonPRNG().getBytes(20)
     
     idx = 0
-    for i in xrange(nChunks):
+    fragments = []
+    for i in xrange(p.nChunks):
         blocks = []
-        for j in xrange(k):
-            blocks[j] = chunks[i][j*fragCapacity:(j+1)*fragCapacity]
+        for j in xrange(p.k):
+            blocks.append(chunks[i][j*p.fragCapacity:(j+1)*p.fragCapacity])
         chunks[i] = None
-        for j in xrange(n):
-            frag = fec.encode(j, blocks)
-            fragments.append(idx, Crypto.sha1())
-
+        for j in xrange(p.n):
+            frag = p.getFEC().encode(j, blocks)
+            pyld = FragmentPayload(idx, None, messageid, p.length, frag)
+            pyld.computeHash()
+            fragments.append(pyld.pack())
             idx += 1
+    return fragments
 
+class _FragmentationParams:
+    """Sizes and FEC parameters for splitting 'length' bytes into chunks."""
+    ## Fields:
+    # k, n, length, fec, chunkSize, fragCapacity, nChunks, paddingLen,
+    # paddedLen
+    def __init__(self, length, overhead):
+        assert overhead in (0, ENC_FWD_OVERHEAD)
+        self.length = length
+        self.fragCapacity = PAYLOAD_LEN - FRAGMENT_PAYLOAD_OVERHEAD - overhead
+        # minimum number of payloads to hold msg, without fragmentation
+        # or padding.
+        minFragments = ceilDiv(length, self.fragCapacity)
+        # Number of data fragments per chunk: a power of 2, at most 16.
+        self.k = 2
+        while self.k < minFragments and self.k < 16:
+            self.k *= 2
+        # Number of chunks.
+        self.nChunks = ceilDiv(minFragments, self.k)
+        # Data in a single chunk
+        self.chunkSize = self.fragCapacity * self.k
+        # Length of data to fill chunks
+        self.paddedLen = self.nChunks * self.chunkSize
+        # Length of padding needed to fill all chunks with data.
+        self.paddingLen = self.paddedLen - length
+        # Number of total fragments per chunk (an int, for xrange and FEC).
+        self.n = int(math.ceil(EXP_FACTOR * self.k))
+        # FEC object; built lazily by getFEC().
+        self.fec = None
 
-        
-    
+    def getFEC(self):  # build the FEC codec for (k, n) once, then reuse it
+        if self.fec is None:
+            self.fec = _getFEC(self.k, self.n)
+        return self.fec
 
+    def getPosition(self, index):
+        """Map a fragment index to (chunk number, position within chunk)."""
+        chunk, pos = divmod(index, self.n)
+        return chunk, pos
 
 def _encodePayload(payload, overhead, paddingPRNG):
     """Helper: compress a payload, pad it, and add extra fields (size and hash)
@@ -748,3 +782,268 @@
 
     return routing, sizes, totalSize
 
+# ======================================================================
+
+class MismatchedFragment(Exception):
+    """Raised when a fragment or chunk conflicts with its message's state."""
+
+class UnneededFragment(Exception):
+    """Raised when a fragment arrives for a chunk that is already present."""
+
+class _FragmentMetadata:
+    """Metadata stored with each fragment or chunk in the fragment store."""
+    def __init__(self, messageid, hash, idx, size, isChunk, chunkNum, overhead,
+                 insertedDate):
+        self.messageid = messageid
+        self.hash = hash
+        self.idx = idx
+        self.size = size
+        self.isChunk = isChunk
+        self.chunkNum = chunkNum
+        self.overhead = overhead
+        self.insertedDate = insertedDate
+
+    def __getstate__(self):
+        return ("V0", self.messageid, self.hash, self.idx, self.size,
+                self.isChunk, self.chunkNum, self.overhead, self.insertedDate)
+
+    def __setstate__(self, state):
+        if state[0] != 'V0':
+            raise MixFatalError("Unrecognized fragment state")
+        (_, self.messageid, self.hash, self.idx, self.size, self.isChunk,
+         self.chunkNum, self.overhead, self.insertedDate) = state
+
+class MessageState:
+    """Reassembly state for the fragments and chunks of one message ID."""
+    def __init__(self, messageid, hash, length, overhead):
+        self.messageid = messageid
+        self.hash = hash
+        self.overhead = overhead
+        # chunkno -> (handle, fragmentmeta)
+        self.chunks = {}
+        self.params = _FragmentationParams(length, overhead)
+        # chunkno -> idxwithinchunk -> (handle, fragmentmeta)
+        self.fragmentsByChunk = [ {} for _ in xrange(self.params.nChunks) ]
+        # chunkset: ready chunk num -> 1
+        self.readyChunks = {}
+
+    def isDone(self):
+        """Return true iff every chunk of the message has been stored."""
+        return len(self.chunks) == self.params.nChunks
+
+    def getChunkHandles(self):
+        """Return the store handles of all chunks, in chunk order."""
+        return [ self.chunks[i][0] for i in xrange(self.params.nChunks) ]
+
+    def addChunk(self, h, fm):
+        """Record reassembled chunk 'fm', stored under handle 'h'."""
+        assert fm.isChunk
+        assert fm.messageid == self.messageid
+        if (fm.size != self.params.length or
+            fm.hash != self.hash or
+            fm.overhead != self.overhead or
+            self.chunks.has_key(fm.chunkNum)):
+            raise MismatchedFragment
+        self.chunks[fm.chunkNum] = (h,fm)
+
+    def addFragment(self, h, fm):
+        """Record fragment 'fm' stored under handle 'h'.  Raise
+           MismatchedFragment if it conflicts with this state, or
+           UnneededFragment if its chunk is already present."""
+        assert fm.messageid == self.messageid
+        if (fm.hash != self.hash or
+            fm.size != self.params.length or
+            fm.overhead != self.overhead):
+            raise MismatchedFragment
+
+        chunkNum, pos = self.params.getPosition(fm.idx)
+        # An index past the last chunk can't belong to this message.
+        if chunkNum >= self.params.nChunks:
+            raise MismatchedFragment
+        if self.chunks.has_key(chunkNum):
+            raise UnneededFragment
+        if self.fragmentsByChunk[chunkNum].has_key(pos):
+            raise MismatchedFragment
+
+        self.fragmentsByChunk[chunkNum][pos] = (h, fm)
+        if len(self.fragmentsByChunk[chunkNum]) >= self.params.k:
+            self.readyChunks[chunkNum] = 1
+
+    def hasReadyChunks(self):
+        """Return true iff some chunk has at least k fragments."""
+        return len(self.readyChunks) != 0
+
+    def getReadyChunks(self):
+        """Return a list of (chunkno, [(handle, fragmentmeta), ...]) for each
+           ready chunk, holding k of that chunk's fragments."""
+        r = []
+        for chunkno in self.readyChunks.keys():
+            ch = self.fragmentsByChunk[chunkno].values()[:self.params.k]
+            r.append( (chunkno, ch) )
+        return r
+
+class _FragmentDB(mixminion.Filestore.DBBase):
+    """DBBase map from message ID to (status, day) for finished messages."""
+    def __init__(self, location):
+        mixminion.Filestore.DBBase.__init__(self, location, "fragment")
+    def markStatus(self, msgid, status, today=None):
+        assert status in ("COMPLETED", "REJECTED")
+        if today is None:
+            today = previousMidnight(time.time())
+        self[msgid] = (status, today)
+    def getStatusAndTime(self, msgid):
+        return self.get(msgid, None)
+    def _encodeKey(self, k):
+        return binascii.b2a_hex(k)
+    def _encodeVal(self, v):
+        status, tm = v
+        return "%s-%s" % ({"COMPLETED":"C", "REJECTED":"R"}[status], str(tm))
+    def _decodeVal(self, v):
+        status = {"C":"COMPLETED", "R":"REJECTED"}[v[0]]
+        tm = int(v[2:])
+        return status, tm
+
+class FragmentPool:
+    """Holds fragments of incoming messages until they can be reassembled."""
+    ## Fields: store (fragment/chunk data, with _FragmentMetadata per handle);
+    # log (_FragmentDB); states (map from message ID to MessageState).
+    def __init__(self, dir):
+        self.store = mixminion.Filestore.StringMetadataStore(dir,create=1,
+                                                             scrub=1)
+        self.log = _FragmentDB(dir+"_db")
+        self.rescan()
+
+    def getState(self, fm):
+        """Return the MessageState for fm's message ID, creating it if needed."""
+        try:
+            return self.states[fm.messageid]
+        except KeyError:
+            state = MessageState(messageid=fm.messageid, hash=fm.hash,
+                                 length=fm.size, overhead=fm.overhead)
+            self.states[fm.messageid] = state
+            return state
+
+    def rescan(self):
+        """Rebuild self.states from the metadata of every stored packet."""
+        self.store.loadAllMetadata()
+        meta = self.store._metadata_cache
+        self.states = {}
+        badMessageIDs = {}
+        unneededHandles = []
+        # Add chunks first, so later fragments of those chunks are unneeded.
+        items = ([ (h,fm) for h,fm in meta.items() if fm.isChunk ] +
+                 [ (h,fm) for h,fm in meta.items() if not fm.isChunk ])
+        for h, fm in items:
+            mid = fm.messageid
+            if badMessageIDs.has_key(mid):
+                continue
+            try:
+                state = self.getState(fm)
+                if fm.isChunk:
+                    state.addChunk(h, fm)
+                else:
+                    state.addFragment(h, fm)
+            except MismatchedFragment:
+                badMessageIDs[mid] = 1
+            except UnneededFragment:
+                unneededHandles.append(h)
+        for h in unneededHandles:
+            fm = meta[h]
+            LOG.debug("Removing unneeded fragment %s from message ID %r",
+                      fm.idx, fm.messageid)
+            self.store.removeMessage(h)
+        self._abortMessageIDs(badMessageIDs)
+
+    def _abortMessageIDs(self, messageIDSet, today=None):
+        """Mark message IDs REJECTED; drop their states and stored pieces."""
+        if today is None:
+            today = previousMidnight(time.time())
+        LOG.debug("Removing bogus messages by IDs: %s", messageIDSet.keys())
+        for mid in messageIDSet.keys():
+            self.log.markStatus(mid, "REJECTED", today)
+            if self.states.has_key(mid):
+                del self.states[mid]
+        for h, fm in self.store._metadata_cache.items():
+            if messageIDSet.has_key(fm.messageid):
+                self.store.removeMessage(h)
+
+    def _getPacketMetadata(self, fragmentPacket, now=None):
+        """Return a new _FragmentMetadata describing fragmentPacket."""
+        if now is None:
+            now = time.time()
+        return _FragmentMetadata(messageid=fragmentPacket.msgID,
+                idx=fragmentPacket.index, hash=fragmentPacket.hash,
+                size=fragmentPacket.msgLen, isChunk=0, chunkNum=None,
+                overhead=fragmentPacket.getOverhead(),
+                insertedDate=previousMidnight(now))
+
+    def addFragment(self, fragmentPacket, now=None):
+        """Store fragmentPacket; on a conflict, reject its whole message."""
+        if now is None:
+            now = time.time()
+        meta = self._getPacketMetadata(fragmentPacket, now)
+        h = self.store.queueMessage(fragmentPacket.data)
+        self.store.setMetadata(h, meta)
+        state = self.getState(meta)
+        try:
+            state.addFragment(h, meta)
+        except MismatchedFragment:
+            self._abortMessageIDs({meta.messageid: 1}, previousMidnight(now))
+        except UnneededFragment:
+            LOG.debug("Dropping unneeded fragment %s of message %r",
+                      fragmentPacket.index, fragmentPacket.msgID)
+            self.store.removeMessage(h)
+
+    def getReadyMessage(self, msgid):
+        """Return the concatenated chunks of msgid, or None if incomplete."""
+        s = self.states.get(msgid)
+        if not s or not s.isDone():
+            return None
+        return "".join([self.store.getMessage(h) for h in s.getChunkHandles()])
+
+    def deleteMessage(self, msgid):
+        """Remove the chunks of a complete message and forget its state."""
+        s = self.states.get(msgid)
+        if not s or not s.isDone():
+            return None
+        for h in s.getChunkHandles():
+            self.store.removeMessage(h)
+        del self.states[msgid]
+
+    def getReadyMessages(self):
+        """Return the IDs of all messages whose chunks are all present."""
+        return [ mid for mid, s in self.states.items() if s.isDone() ]
+
+    def unchunkMessages(self):
+        """FEC-decode each ready chunk, store it, and drop its fragments."""
+        for state in self.states.values():
+            for chunkno, lst in state.getReadyChunks():
+                minDate = min([fm.insertedDate for h,fm in lst])
+                vs = [ (state.params.getPosition(fm.idx)[1],
+                        self.store.getMessage(h)) for h,fm in lst ]
+                # NOTE(review): assumes decode() returns the chunk's text.
+                chunkText = state.params.getFEC().decode(vs)
+                fm2 = _FragmentMetadata(state.messageid, state.hash, None,
+                                        state.params.length, 1, chunkno,
+                                        state.overhead, minDate)
+                h2 = self.store.queueMessage(chunkText)
+                self.store.setMetadata(h2, fm2)
+                # Remove every fragment of this chunk, not just the k used.
+                for h,fm in state.fragmentsByChunk[chunkno].values():
+                    self.store.removeMessage(h)
+                state.fragmentsByChunk[chunkno] = {}
+                del state.readyChunks[chunkno]
+                state.addChunk(h2, fm2)
+            
+# ======================================================================
+
+_fectab = {}
+
+def _getFEC(k,n):
+    """Return the FEC object for parameters (k, n), memoized in _fectab.
+       Race: two threads missing the cache at once may both call
+       FEC_generate; the later result is the one kept in the cache."""
+    key = (k, n)
+    if not _fectab.has_key(key):
+        _fectab[key] = mixminion._minionlib.FEC_generate(k, n)
+    return _fectab[key]

Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.50
retrieving revision 1.51
diff -u -d -r1.50 -r1.51
--- Crypto.py	24 Jul 2003 17:37:16 -0000	1.50
+++ Crypto.py	8 Aug 2003 21:42:47 -0000	1.51
@@ -29,6 +29,7 @@
             'pk_encode_public_key', 'pk_encrypt', 'pk_fingerprint',
             'pk_from_modulus', 'pk_generate', 'pk_get_modulus',
             'pk_same_public_key', 'pk_sign', 'prng', 'sha1', 'strxor', 'trng',
+            'unwhiten', 'whiten',
             'AES_KEY_LEN', 'DIGEST_LEN', 'HEADER_SECRET_MODE', 'PRNG_MODE',
             'RANDOM_JUNK_MODE', 'HEADER_ENCRYPT_MODE', 'APPLICATION_KEY_MODE',
             'PAYLOAD_ENCRYPT_MODE', 'HIDE_HEADER_MODE' ]
@@ -181,6 +182,16 @@
     right = ctr_crypt(right, _ml.sha1(left)[:AES_KEY_LEN])
     left = _ml.strxor(left, _ml.sha1("".join((key1,right,key1))))
     return left + right
+
+def whiten(s):
+    """Return 's' LIONESS-encrypted under fixed keys derived from the
+       constant "WHITEN" keyset.  unwhiten() reverses this."""
+    return lioness_encrypt(s, Keyset("WHITEN").getLionessKeys("WHITEN"))
+
+def unwhiten(s):
+    """Inverse of whiten(): return 's' LIONESS-decrypted under the same
+       fixed "WHITEN" keys."""
+    return lioness_decrypt(s, Keyset("WHITEN").getLionessKeys("WHITEN"))
 
 def openssl_seed(count):
     """Seeds the openssl rng with 'count' bytes of real entropy."""

Index: Packet.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Packet.py,v
retrieving revision 1.53
retrieving revision 1.54
diff -u -d -r1.53 -r1.54
--- Packet.py	28 Jul 2003 01:02:33 -0000	1.53
+++ Packet.py	8 Aug 2003 21:42:47 -0000	1.54
@@ -245,12 +245,12 @@
 # Length of the 'MessageID' field in a fragment payload
 FRAGMENT_MESSAGEID_LEN = 20
 # Maximum number of fragments associated with a given message
-MAX_N_FRAGMENTS = 0x7ffff
+MAX_N_FRAGMENTS = 0x7fffff  # must fit the 23-bit fragment index field
 
 # Number of bytes taken up by header fields in a singleton payload.
 SINGLETON_PAYLOAD_OVERHEAD = 2 + DIGEST_LEN
 # Number of bytes taken up by header fields in a fragment payload.
-FRAGMENT_PAYLOAD_OVERHEAD = 2 + DIGEST_LEN + FRAGMENT_MESSAGEID_LEN + 4
+FRAGMENT_PAYLOAD_OVERHEAD = 3 + DIGEST_LEN + FRAGMENT_MESSAGEID_LEN + 4
 # Number of bytes taken up from OAEP padding in an encrypted forward
 # payload, minus bytes saved by spilling the RSA-encrypted block into the
 # tag, minus the bytes taken by the session key.
@@ -265,9 +265,10 @@
     bit0 = ord(payload[0]) & 0x80
     if bit0:
         # We have a fragment
-        idx, hash, msgID, msgLen = struct.unpack(FRAGMENT_UNPACK_PATTERN,
-                                         payload[:FRAGMENT_PAYLOAD_OVERHEAD])
-        idx &= 0x7f
+        idxhi, idxlo, hash, msgID, msgLen = \
+               struct.unpack(FRAGMENT_UNPACK_PATTERN,
+                             payload[:FRAGMENT_PAYLOAD_OVERHEAD])
+        idx = ((idxhi & 0x7f) << 16) + idxlo  # clear the fragment flag bit
         contents = payload[FRAGMENT_PAYLOAD_OVERHEAD:]
         if msgLen <= len(contents):
             raise ParseError("Payload has an invalid size field")
@@ -284,14 +285,13 @@
 # A singleton payload starts with a 0 bit, 15 bits of size, and a 20-byte hash
 SINGLETON_UNPACK_PATTERN = "!H%ds" % (DIGEST_LEN)
 
-# A fragment payload starts with a 1 bit, a 15-bit packet index, a
+# A fragment payload starts with a 1 bit, a 23-bit index (7+16 bits), a
 # 20-byte hash, a 20-byte message ID, and 4 bytes of message size.
-FRAGMENT_UNPACK_PATTERN = "!H%ds%dsL" % (DIGEST_LEN, FRAGMENT_MESSAGEID_LEN)
+FRAGMENT_UNPACK_PATTERN = "!BH%ds%dsL" % (DIGEST_LEN, FRAGMENT_MESSAGEID_LEN)
 
 class _Payload:
     pass
 
-
 class SingletonPayload(_Payload):
     """Represents the payload for a standalone mixminion message.
        Fields:  size, hash, data.  (Note that data is padded.)"""
@@ -300,6 +300,10 @@
         self.hash = hash
         self.data = data
 
+    def computeHash(self):
+        """Set self.hash to the SHA-1 digest of self.data (the padded data)."""
+        self.hash = sha1(self.data)
+
     def isSingleton(self):
         """Returns true; this is a singleton payload."""
         return 1
@@ -333,9 +337,10 @@
         self.data = data
 
     def computeHash(self):
+        """Set self.hash to SHA-1 of the packed payload after the hash field."""
         self.hash = "X"*DIGEST_LEN
         p = self.pack()
-        self.hash = sha1("XXXX")
+        self.hash = sha1(p[3+DIGEST_LEN:])  # skip index bytes and hash
 
     def isSingleton(self):
         """Return false; not a singleton"""
@@ -349,10 +354,14 @@
         assert len(self.data) < self.msgLen < 0x100000000L
         assert (PAYLOAD_LEN - FRAGMENT_PAYLOAD_OVERHEAD - len(self.data)) in \
                (0, ENC_FWD_OVERHEAD)
-        idx = self.index | 0x8000
-        header = struct.pack(FRAGMENT_UNPACK_PATTERN, idx, self.hash,
-                             self.msgID, self.msgLen)
+        idxhi = ((self.index & 0x7f0000) >> 16) | 0x80  # top 7 bits + flag
+        idxlo = self.index & 0xffff  # low 16 bits, packed as 'H'
+        header = struct.pack(FRAGMENT_UNPACK_PATTERN, idxhi, idxlo,
+                             self.hash, self.msgID, self.msgLen)
         return "%s%s" % (header, self.data)
+
+    def getOverhead(self):  # payload bytes used by neither header nor data
+        return PAYLOAD_LEN - FRAGMENT_PAYLOAD_OVERHEAD - len(self.data)