[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Untested logic for client-side message reassembly



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv14961/lib/mixminion

Modified Files:
	BuildMessage.py ClientMain.py ClientUtils.py Fragments.py 
Log Message:
Untested logic for client-side message reassembly

Index: BuildMessage.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/BuildMessage.py,v
retrieving revision 1.68
retrieving revision 1.69
diff -u -d -r1.68 -r1.69
--- BuildMessage.py	7 Jan 2004 02:50:08 -0000	1.68
+++ BuildMessage.py	16 Feb 2004 22:30:03 -0000	1.69
@@ -333,7 +333,7 @@
 
 #----------------------------------------------------------------------
 # MESSAGE DECODING
-def decodePayload(payload, tag, key=None, userKeys=()):
+def decodePayload(payload, tag, key=None, userKeys=(), retNym=None):
     """Given a 28K payload and a 20-byte decoding tag, attempt to decode the
        original message.  Returns either a SingletonPayload instance, a
        FragmentPayload instance, or None.
@@ -347,6 +347,8 @@
        If we can successfully decrypt the payload, we return it.  If we
        might be able to decrypt the payload given more/different keys,
        we return None.  If the payload is corrupt, we raise MixError.
+
+       retNym: if not None, a list that receives the matching user key's name.
     """
     if userKeys is None:
         userKeys = []
@@ -378,6 +380,8 @@
                 p = _decodeStatelessReplyPayload(payload, tag, userKey)
                 if name:
                     LOG.info("Decoded reply message to identity %r", name)
+                if retNym is not None:
+                    retNym.append(name)
                 return p
             except MixError:
                 pass

Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.154
retrieving revision 1.155
diff -u -d -r1.154 -r1.155
--- ClientMain.py	7 Feb 2004 07:25:14 -0000	1.154
+++ ClientMain.py	16 Feb 2004 22:30:03 -0000	1.155
@@ -212,9 +212,10 @@
         # Initialize PRNG
         self.prng = mixminion.Crypto.getCommonPRNG()
         self.queue = mixminion.ClientUtils.ClientQueue(os.path.join(userdir, "queue"))
+        self.pool = mixminion.ClientUtils.ClientFragmentPool(os.path.join(userdir, "fragments"))
 
     def _sortPackets(self, packets, shuffle=1):
-        """Helper function.  Takes a list of tuples of (packet, 
+        """Helper function.  Takes a list of tuples of (packet,
            ServerInfo/routigInforoutingInfo),
            groups packets with the same routingInfos, and returns a list of
            tuples of (routingInfo, [packet list]).
@@ -238,6 +239,7 @@
                 self.prng.shuffle(pktList)
         return result
 
+
     def sendForwardMessage(self, directory, address, pathSpec, message,
                            startAt, endAt, forceQueue=0, forceNoQueue=0,
                            forceNoServerSideFragments=0):
@@ -637,23 +639,33 @@
         """
         #XXXX write unit tests
         results = []
+        foundAFragment = 0
         for msg in parseTextEncodedMessages(s, force=force):
             if msg.isOvercompressed() and not force:
                 LOG.warn("Message is a possible zlib bomb; not uncompressing")
-            if msg.isFragment():
-                raise UIError("Sorry -- no support yet for client-side defragmentation.")
-            elif not msg.isEncrypted():
-                results.append(msg.getContents())
+
+            if not msg.isEncrypted():
+                if msg.isFragment():
+                    self.pool.addFragment(msg.getContents(), "---")
+                else:
+                    results.append(msg.getContents())
             else:
                 assert msg.isEncrypted()
                 surbKeys = self.keys.getSURBKeys()
+                nym = []
                 p = mixminion.BuildMessage.decodePayload(msg.getContents(),
                                                          tag=msg.getTag(),
-                                                         userKeys=surbKeys)
-                if p and p.isSingleton():
-                    results.append(p.getUncompressedContents())
-                elif p:
-                    raise UIError("Sorry; no support yet for client-side defragmentation.")
+                                                         userKeys=surbKeys,
+                                                         retNym=nym)
+                if p:
+                    if nym:
+                        nym=nym[0]
+                    else:
+                        nym="default"
+                    if p.isSingleton():
+                        results.append(p.getUncompressedContents())
+                    else:
+                        self.pool.addFragment(p,nym)
                 else:
                     raise UIError("Unable to decode message")
         if isatty and not force:
@@ -1277,7 +1289,7 @@
                     port = int(addrport[1])
                 except ValueError:
                     raise UIError("Invalid port: %r"%addrport[1])
-                
+
             else:
                 arg = "%s:48099"%arg
                 port = 48099

Index: ClientUtils.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientUtils.py,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -d -r1.19 -r1.20
--- ClientUtils.py	2 Feb 2004 07:05:49 -0000	1.19
+++ ClientUtils.py	16 Feb 2004 22:30:03 -0000	1.20
@@ -8,7 +8,8 @@
    """
 
 __all__ = [ 'NoPassword', 'PasswordManager', 'getPassword_term',
-            'getNewPassword_term', 'SURBLog', 'ClientQueue' ]
+            'getNewPassword_term', 'SURBLog', 'ClientQueue',
+            'ClientFragmentPool' ]
 
 import binascii
 import cPickle
@@ -16,9 +17,11 @@
 import os
 import sys
 import time
+import types
 import struct
 
 import mixminion.Filestore
+import mixminion.Packet
 
 from mixminion.Common import LOG, MixError, UIError, ceilDiv, \
      createPrivateDir, floorDiv, previousMidnight, readFile, \
@@ -853,3 +856,83 @@
             mixminion.ClientMain.clientUnlock()
 
         self.metadataLoaded = 1
+
+# ----------------------------------------------------------------------
+
+class ClientFragmentPool:
+    """Client-side wrapper around a lazily opened mixminion.Fragments.FragmentPool."""
+    def __init__(self, directory):
+        createPrivateDir(directory)
+        self.dir = directory
+        self.pool = None
+
+    def __getPool(self):
+        if self.pool is None:
+            import mixminion.Fragments
+            self.pool = mixminion.Fragments.FragmentPool(self.dir)
+        return self.pool
+
+    def close(self):
+        if self.pool is not None:
+            self.pool.close()
+            self.pool = None
+
+    def addFragment(self, fragment, nym=None):
+        """Add 'fragment' (a FragmentPayload, or an encoded payload string) to
+           the pool on behalf of 'nym'; return what the pool's addFragment does."""
+        pool = self.__getPool()
+        if isinstance(fragment, types.StringType):
+            try:
+                fragment = mixminion.Packet.parsePayload(fragment)
+            except mixminion.Packet.ParseError, s:
+                raise UIError("Corrupted fragment payload: %s"%s)
+            if not fragment.isFragment():
+                raise UIError("Non-fragment payload marked as a fragment.")
+
+        assert isinstance(fragment, mixminion.Packet.FragmentPayload)
+
+        return pool.addFragment(fragment, nym=nym, verbose=1)
+
+    def process(self):
+        pool = self.__getPool()
+        pool.unchunkMessages()
+        pool.cleanQueue()
+
+    def expireMessages(self, cutoff):
+        pool = self.__getPool()
+        pool.expireMessages(cutoff)
+        pool.cleanQueue()
+
+    def getMessage(self, msgid):
+        pool = self.__getPool()
+        msg = pool.getReadyMessage(msgid)
+        if msg is not None:
+            return msg
+
+        state = pool.getStateByMsgID(msgid)
+        if state is None:
+            raise UIError("No such message as '%s'" % msgid)
+        elif not state.isDone():
+            raise UIError("Message '%s' is still missing fragments."%msgid)
+        else:
+            raise MixFatalError("Can't decode message %s; I don't know why!" % msgid)
+
+    def removeMessages(self, msgids):
+        pool = self.__getPool()
+        for i in msgids:
+            if pool.getStateByMsgID(i) is None:
+                raise UIError("No such message as %s" % i)
+        pool._deleteMessageIDs(msgids, "?")
+        pool.cleanQueue()
+
+    def listMessages(self):
+        pool = self.__getPool()
+        return pool.listMessages()
+
+    def formatMessageList(self):
+        msgs = self.listMessages()
+        result = []
+        for msgid in msgs.keys():
+            result.append(msgid+(": to <%(nym)s>. %(size)s bytes (%(have)s/%(need)s packets received)"
+                                 % msgs[msgid]))
+        return result

Index: Fragments.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Fragments.py,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -d -r1.11 -r1.12
--- Fragments.py	3 Jan 2004 07:35:23 -0000	1.11
+++ Fragments.py	16 Feb 2004 22:30:03 -0000	1.12
@@ -145,7 +145,7 @@
         del self.store
         del self.states
 
-    def addFragment(self, fragmentPacket, nym=None, now=None):
+    def addFragment(self, fragmentPacket, nym=None, now=None, verbose=0):
         """Given an instance of mixminion.Packet.FragmentPayload, record
            the fragment if appropriate and update the state of the
            fragment pool if necessary.
@@ -156,7 +156,13 @@
                   attack where we send 2 fragments to 'MarkTwain' and 2
                   fragments to 'SClemens', and see that the message is
                   reconstructed.]
+
+           Return the msgID if stored, else None; 'verbose' logs at INFO level.
         """
+        if verbose:
+            say = LOG.info
+        else:
+            say = LOG.debug
         if now is None:
             now = time.time()
         today = previousMidnight(now)
@@ -165,9 +171,9 @@
         # drop this packet.
         s = self.db.getStatusAndTime(fragmentPacket.msgID)
         if s:
-            LOG.debug("Dropping fragment of %s message %r",
-                      s[0].lower(), disp64(fragmentPacket.msgID,13))
-            return
+            say("Dropping fragment of %s message %r",
+                s[0].lower(), disp64(fragmentPacket.msgID,12))
+            return None
 
         # Otherwise, create a new metadata object for this fragment...
         meta = FragmentMetadata(messageid=fragmentPacket.msgID,
@@ -188,17 +194,21 @@
             h = self.store.queueMessageAndMetadata(fragmentPacket.data, meta)
             # And *now* update the message state.
             state.addFragment(h, meta)
-            LOG.debug("Stored fragment %s of message %s",
-                      fragmentPacket.index, disp64(fragmentPacket.msgID,13))
-        except MismatchedFragment:
+            say("Stored fragment %s of message %s",
+                fragmentPacket.index, disp64(fragmentPacket.msgID,12))
+            return fragmentPacket.msgID
+        except MismatchedFragment, s:
             # Remove the other fragments, mark msgid as bad.
-            LOG.warn("Found inconsistent fragment %s in message %s",
-                      fragmentPacket.index, disp64(fragmentPacket.msgID,13))
+            LOG.warn("Found inconsistent fragment %s in message %s: %s",
+                     fragmentPacket.index, disp64(fragmentPacket.msgID,12),
+                     s)
             self._deleteMessageIDs({ meta.messageid : 1}, "REJECTED", now)
+            return None
         except UnneededFragment:
             # Discard this fragment; we don't need it.
             LOG.debug("Dropping unneeded fragment %s of message %s",
-                      fragmentPacket.index, disp64(fragmentPacket.msgID,13))
+                      fragmentPacket.index, disp64(fragmentPacket.msgID,12))
+            return None
 
     def getReadyMessage(self, msgid):
         """Return the complete message associated with messageid 'msgid'.
@@ -310,9 +320,9 @@
            undeliverable.
 
               messageIDSet -- a map from 20-byte messageID to 1.
-              why -- 'REJECTED' or 'COMPLETED'.
+              why -- 'REJECTED' or 'COMPLETED' or '?'
         """
-        assert why in ("REJECTED", "COMPLETED")
+        assert why in ("REJECTED", "COMPLETED", "?")
         if not messageIDSet:
             return
         if today is None:
@@ -321,9 +331,12 @@
         if why == 'REJECTED':
             LOG.debug("Removing bogus messages by IDs: %s",
                       messageIDSet.keys())
-        else:
+        elif why == "COMPLETED":
             LOG.debug("Removing completed messages by IDs: %s",
                       messageIDSet.keys())
+        else:
+            LOG.debug("Removing messages by IDs: %s",
+                      messageIDSet.keys())
         for mid in messageIDSet.keys():
             self.db.markStatus(mid, why, today)
             try:
@@ -348,6 +361,33 @@
             self.states[fm.messageid] = state
             return state
 
+    def getStateByMsgID(self, msgid):
+        """Look up a message state by 20-byte ID or 12-char base64 ID prefix."""
+        if len(msgid) == 20:
+            return self.states.get(msgid,None)
+        elif len(msgid) == 12:
+            target = binascii.a2b_base64(msgid)
+            for i in self.states.keys():
+                if i.startswith(target):
+                    return self.states[i]
+        return None
+
+    def listMessages(self):
+        """Return a map describing every message state in self.states:
+           pretty-id => { 'size':x, 'nym':x, 'have':x, 'need':x }
+        """
+        result = {}
+        for msgid in self.states.keys():
+            state = self.states[msgid]
+            have, need = state.getCompleteness()
+            result[disp64(msgid,12)] = {
+                'size' : state.params.length,
+                'nym' : state.nym,
+                'have' : have,
+                'need' : need
+                }
+        return result
+
 # ======================================================================
 
 class MismatchedFragment(Exception):
@@ -451,19 +491,28 @@
         assert self.isDone()
         return [ self.chunks[i][0] for i in xrange(self.params.nChunks) ]
 
+    def getCompleteness(self):
+        """Return (have, need): 'need' is params.k * params.nChunks."""
+        need = self.params.k * self.params.nChunks
+        have = self.params.k * len(self.chunks)
+        for d in self.fragmentsByChunk.values():
+            have += min(len(d),self.params.k)
+        return have, need
+
     def addChunk(self, h, fm):
         """Register a chunk with handle h and FragmentMetadata fm.  If the
            chunk is inconsistent with other fragments of this message,
            raise MismatchedFragment."""
         assert fm.isChunk
         assert fm.messageid == self.messageid
-        if (fm.size != self.params.length or
-            fm.overhead != self.overhead or
-            self.chunks.has_key(fm.chunkNum)):
-            raise MismatchedFragment
-
+        if fm.size != self.params.length:
+            raise MismatchedFragment("Mismatched message length")
+        if fm.overhead != self.overhead:
+            raise MismatchedFragment("Mismatched packet overhead")
+        if self.chunks.has_key(fm.chunkNum):
+            raise MismatchedFragment("Duplicate chunks")
         if fm.nym != self.nym:
-            raise MismatchedFragment
+            raise MismatchedFragment("Fragments received for differing identities")
 
         if self.inserted > fm.insertedDate:
             self.inserted = fm.insertedDate
@@ -485,9 +534,12 @@
            fragment--just raise exceptions as needed."""
         assert fm.messageid == self.messageid
 
-        if (fm.size != self.params.length or
-            fm.overhead != self.overhead):
-            raise MismatchedFragment
+        if fm.size != self.params.length:
+            raise MismatchedFragment("mismatched message size")
+        if fm.overhead != self.overhead:
+            raise MismatchedFragment("mismatched fragment payload size")
+        if fm.nym != self.nym:
+            raise MismatchedFragment("mismatched identities")
 
         chunkNum, pos = self.params.getPosition(fm.idx)
         if chunkNum >= self.params.nChunks:
@@ -498,7 +550,7 @@
             raise UnneededFragment
 
         if self.fragmentsByChunk[chunkNum].has_key(pos):
-            raise MismatchedFragment
+            raise MismatchedFragment("multiple fragments for one position")
 
         if noop:
             return