[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Untested logic for client-side message reassembly
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv14961/lib/mixminion
Modified Files:
BuildMessage.py ClientMain.py ClientUtils.py Fragments.py
Log Message:
Untested logic for client-side message reassembly
Index: BuildMessage.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/BuildMessage.py,v
retrieving revision 1.68
retrieving revision 1.69
diff -u -d -r1.68 -r1.69
--- BuildMessage.py 7 Jan 2004 02:50:08 -0000 1.68
+++ BuildMessage.py 16 Feb 2004 22:30:03 -0000 1.69
@@ -333,7 +333,7 @@
#----------------------------------------------------------------------
# MESSAGE DECODING
-def decodePayload(payload, tag, key=None, userKeys=()):
+def decodePayload(payload, tag, key=None, userKeys=(), retNym=None):
"""Given a 28K payload and a 20-byte decoding tag, attempt to decode the
original message. Returns either a SingletonPayload instance, a
FragmentPayload instance, or None.
@@ -347,6 +347,8 @@
If we can successfully decrypt the payload, we return it. If we
might be able to decrypt the payload given more/different keys,
we return None. If the payload is corrupt, we raise MixError.
+
+ DOCDOC retNym
"""
if userKeys is None:
userKeys = []
@@ -378,6 +380,8 @@
p = _decodeStatelessReplyPayload(payload, tag, userKey)
if name:
LOG.info("Decoded reply message to identity %r", name)
+ if retNym is not None:
+ retNym.append(name)
return p
except MixError:
pass
Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.154
retrieving revision 1.155
diff -u -d -r1.154 -r1.155
--- ClientMain.py 7 Feb 2004 07:25:14 -0000 1.154
+++ ClientMain.py 16 Feb 2004 22:30:03 -0000 1.155
@@ -212,9 +212,10 @@
# Initialize PRNG
self.prng = mixminion.Crypto.getCommonPRNG()
self.queue = mixminion.ClientUtils.ClientQueue(os.path.join(userdir, "queue"))
+ self.pool = mixminion.ClientUtils.ClientFragmentPool(os.path.join(userdir, "fragments"))
def _sortPackets(self, packets, shuffle=1):
- """Helper function. Takes a list of tuples of (packet,
+ """Helper function. Takes a list of tuples of (packet,
ServerInfo/routingInfo),
groups packets with the same routingInfos, and returns a list of
tuples of (routingInfo, [packet list]).
@@ -238,6 +239,7 @@
self.prng.shuffle(pktList)
return result
+
def sendForwardMessage(self, directory, address, pathSpec, message,
startAt, endAt, forceQueue=0, forceNoQueue=0,
forceNoServerSideFragments=0):
@@ -637,23 +639,33 @@
"""
#XXXX write unit tests
results = []
+ foundAFragment = 0
for msg in parseTextEncodedMessages(s, force=force):
if msg.isOvercompressed() and not force:
LOG.warn("Message is a possible zlib bomb; not uncompressing")
- if msg.isFragment():
- raise UIError("Sorry -- no support yet for client-side defragmentation.")
- elif not msg.isEncrypted():
- results.append(msg.getContents())
+
+ if not msg.isEncrypted():
+ if msg.isFragment():
+ self.pool.addFragment(msg.getContents(), "---")
+ else:
+ results.append(msg.getContents())
else:
assert msg.isEncrypted()
surbKeys = self.keys.getSURBKeys()
+ nym = []
p = mixminion.BuildMessage.decodePayload(msg.getContents(),
tag=msg.getTag(),
- userKeys=surbKeys)
- if p and p.isSingleton():
- results.append(p.getUncompressedContents())
- elif p:
- raise UIError("Sorry; no support yet for client-side defragmentation.")
+ userKeys=surbKeys,
+ rNym=nym)
+ if p:
+ if nym:
+ nym=nym[0]
+ else:
+ nym="default"
+ if p.isSingleton():
+ results.append(p.getUncompressedContents())
+ else:
+ self.pool.addFragment(p,nym)
else:
raise UIError("Unable to decode message")
if isatty and not force:
@@ -1277,7 +1289,7 @@
port = int(addrport[1])
except ValueError:
raise UIError("Invalid port: %r"%addrport[1])
-
+
else:
arg = "%s:48099"%arg
port = 48099
Index: ClientUtils.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientUtils.py,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -d -r1.19 -r1.20
--- ClientUtils.py 2 Feb 2004 07:05:49 -0000 1.19
+++ ClientUtils.py 16 Feb 2004 22:30:03 -0000 1.20
@@ -8,7 +8,8 @@
"""
__all__ = [ 'NoPassword', 'PasswordManager', 'getPassword_term',
- 'getNewPassword_term', 'SURBLog', 'ClientQueue' ]
+ 'getNewPassword_term', 'SURBLog', 'ClientQueue',
+ 'ClientFragmentPool' ]
import binascii
import cPickle
@@ -16,9 +17,11 @@
import os
import sys
import time
+import types
import struct
import mixminion.Filestore
+import mixminion.Packet
from mixminion.Common import LOG, MixError, UIError, ceilDiv, \
createPrivateDir, floorDiv, previousMidnight, readFile, \
@@ -853,3 +856,83 @@
mixminion.ClientMain.clientUnlock()
self.metadataLoaded = 1
+
+# ----------------------------------------------------------------------
+
+class ClientFragmentPool:
+ """DOCDOC"""
+ def __init__(self, directory):
+ createPrivateDir(directory)
+ self.dir = directory
+ self.pool = None
+
+ def __getPool(self):
+ if self.pool is None:
+ import mixminion.Fragments
+ self.pool = mixminion.Fragments.FragmentPool(self.directory)
+ return self.pool
+
+ def close(self):
+ if self.pool is not None:
+ self.pool.close()
+ self.pool = None
+
+ def addFragment(self, fragment, nym=None):
+ """fragment is instance of fragmentPayload or is a string payload
+ DOCDOC"""
+ pool = self.__getPool()
+ if isinstance(fragment, types.StringType):
+ try:
+ fragment = mixminion.Packet.parsePayload(fragment)
+ except ParseError, s:
+ raise UIError("Corrupted fragment payload: %s"%s)
+ if not fragment.isFragment():
+ raise UIError("Non-fragment payload marked as a fragment.")
+
+ assert isinstance(fragment, mixminion.Packet.FragmentPayload)
+
+ return pool.addFragment(fragment, nym=nym, verbose=1)
+
+ def process(self):
+ pool = self.__getPool()
+ pool.unchunkMessages()
+ pool.cleanQueue()
+
+ def expireMessages(self, cutoff):
+ pool = self.__getPool()
+ pool.expireMessages(cutoff)
+ self.cleanQueue()
+
+ def getMessage(self, msgid):
+ pool = self.__getPool()
+ msg = pool.getReadyMessage(msgid)
+ if msg is not None:
+ return msg
+
+ state = pool.getStateByMsgID(msgid)
+ if state is None:
+ raise UIError("No such message as '%s'" % msgid)
+ elif not state.isDone():
+ raise UIError("Message '%s' is still missing fragments."%msgid)
+ else:
+ raise MixFatalError("Can't decode message %s; I don't know why!")
+
+ def removeMessages(self, msgids):
+ pool = self.__getPool()
+ for i in msgids:
+ if pool.getStateByMsgID(m) is None:
+ raise UIError("No such message as %s")
+ pool._deleteMessageIDs(msgids, "?")
+ pool.cleanQueue()
+
+ def listMessages(self):
+ pool = self.__getPool()
+ return pool.listMessages()
+
+ def formatMessageList(self):
+ msgs = self.listMessages()
+ result = []
+ for msgid in msgs.keys():
+ result.append(msgid+(": to <%(nym)s>. %(size)s bytes (%(have)s/%(need)s packets received)"
+ % msgs[msgid]))
+ return result
Index: Fragments.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Fragments.py,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -d -r1.11 -r1.12
--- Fragments.py 3 Jan 2004 07:35:23 -0000 1.11
+++ Fragments.py 16 Feb 2004 22:30:03 -0000 1.12
@@ -145,7 +145,7 @@
del self.store
del self.states
- def addFragment(self, fragmentPacket, nym=None, now=None):
+ def addFragment(self, fragmentPacket, nym=None, now=None, verbose=0):
"""Given an instance of mixminion.Packet.FragmentPayload, record
the fragment if appropriate and update the state of the
fragment pool if necessary.
@@ -156,7 +156,13 @@
attack where we send 2 fragments to 'MarkTwain' and 2
fragments to 'SClemens', and see that the message is
reconstructed.]
+
+ DOCDOC return, verbose
"""
+ if verbose:
+ say = LOG.info
+ else:
+ say = LOG.debug
if now is None:
now = time.time()
today = previousMidnight(now)
@@ -165,9 +171,9 @@
# drop this packet.
s = self.db.getStatusAndTime(fragmentPacket.msgID)
if s:
- LOG.debug("Dropping fragment of %s message %r",
- s[0].lower(), disp64(fragmentPacket.msgID,13))
- return
+ say("Dropping fragment of %s message %r",
+ s[0].lower(), disp64(fragmentPacket.msgID,12))
+ return None
# Otherwise, create a new metadata object for this fragment...
meta = FragmentMetadata(messageid=fragmentPacket.msgID,
@@ -188,17 +194,21 @@
h = self.store.queueMessageAndMetadata(fragmentPacket.data, meta)
# And *now* update the message state.
state.addFragment(h, meta)
- LOG.debug("Stored fragment %s of message %s",
- fragmentPacket.index, disp64(fragmentPacket.msgID,13))
- except MismatchedFragment:
+ say("Stored fragment %s of message %s",
+ fragmentPacket.index, disp64(fragmentPacket.msgID,12))
+ return fragmentPacket.msgID
+ except MismatchedFragment, s:
# Remove the other fragments, mark msgid as bad.
- LOG.warn("Found inconsistent fragment %s in message %s",
- fragmentPacket.index, disp64(fragmentPacket.msgID,13))
+ LOG.warn("Found inconsistent fragment %s in message %s: %s",
+ fragmentPacket.index, disp64(fragmentPacket.msgID,12),
+ s)
self._deleteMessageIDs({ meta.messageid : 1}, "REJECTED", now)
+ return None
except UnneededFragment:
# Discard this fragment; we don't need it.
LOG.debug("Dropping unneeded fragment %s of message %s",
- fragmentPacket.index, disp64(fragmentPacket.msgID,13))
+ fragmentPacket.index, disp64(fragmentPacket.msgID,12))
+ return None
def getReadyMessage(self, msgid):
"""Return the complete message associated with messageid 'msgid'.
@@ -310,9 +320,9 @@
undeliverable.
messageIDSet -- a map from 20-byte messageID to 1.
- why -- 'REJECTED' or 'COMPLETED'.
+ why -- 'REJECTED' or 'COMPLETED' or '?'
"""
- assert why in ("REJECTED", "COMPLETED")
+ assert why in ("REJECTED", "COMPLETED", "?")
if not messageIDSet:
return
if today is None:
@@ -321,9 +331,12 @@
if why == 'REJECTED':
LOG.debug("Removing bogus messages by IDs: %s",
messageIDSet.keys())
- else:
+ elif why == "COMPLETED":
LOG.debug("Removing completed messages by IDs: %s",
messageIDSet.keys())
+ else:
+ LOG.debug("Removing messages by IDs: %s",
+ messageIDSet.keys())
for mid in messageIDSet.keys():
self.db.markStatus(mid, why, today)
try:
@@ -348,6 +361,33 @@
self.states[fm.messageid] = state
return state
+ def getStateByMsgID(self, msgid):
+ """DOCDOC"""
+ if len(msgid) == 20:
+ return self.state.get(msgid,None)
+ elif len(msgid) == 12:
+ target = binascii.a2b_base64(msgid)
+ for i in self.states.keys():
+ if i.startswith(target):
+ return self.states[i]
+ return None
+
+ def listMessages(self):
+ """DOCDOC
+ pretty-id => { 'size':x, 'nym':x, 'have':x, 'need':x }
+ """
+ result = {}
+ for msgid in self.states.keys():
+ state = self.states[msgid]
+ have, need = state.getCompleteness()
+ result[disp64(msgid,12)] = {
+ 'size' : state.params.length,
+ 'nym' : state.nym,
+ 'have' : have,
+ 'need' : need
+ }
+ return result
+
# ======================================================================
class MismatchedFragment(Exception):
@@ -451,19 +491,28 @@
assert self.isDone()
return [ self.chunks[i][0] for i in xrange(self.params.nChunks) ]
+ def getCompleteness(self):
+ """(have,need) DOCDOC"""
+ need = self.params.k * self.params.nChunks
+ have = self.params.k * len(self.chunks)
+ for d in self.fragmentsByChunk.values():
+ have += min(len(d),self.params.k)
+ return have, need
+
def addChunk(self, h, fm):
"""Register a chunk with handle h and FragmentMetadata fm. If the
chunk is inconsistent with other fragments of this message,
raise MismatchedFragment."""
assert fm.isChunk
assert fm.messageid == self.messageid
- if (fm.size != self.params.length or
- fm.overhead != self.overhead or
- self.chunks.has_key(fm.chunkNum)):
- raise MismatchedFragment
-
+ if fm.size != self.params.length:
+ raise MismatchedFragment("Mismatched message length")
+ if fm.overhead != self.overhead:
+ raise MismatchedFragment("Mismatched packet overhead")
+ if self.chunks.has_key(fm.chunkNum):
+ raise MismatchedFragment("Duplicate chunks")
if fm.nym != self.nym:
- raise MismatchedFragment
+ raise MismatchedFragment("Fragments received for differing identities")
if self.inserted > fm.insertedDate:
self.inserted = fm.insertedDate
@@ -485,9 +534,12 @@
fragment--just raise exceptions as needed."""
assert fm.messageid == self.messageid
- if (fm.size != self.params.length or
- fm.overhead != self.overhead):
- raise MismatchedFragment
+ if fm.size != self.params.length:
+ raise MismatchedFragment("mismatched message size")
+ if fm.overhead != self.overhead:
+ raise MismatchedFragment("mismatched fragment payload size")
+ if fm.nym != self.nym:
+ raise MismatchedFragment("mismatched identities")
chunkNum, pos = self.params.getPosition(fm.idx)
if chunkNum >= self.params.nChunks:
@@ -498,7 +550,7 @@
raise UnneededFragment
if self.fragmentsByChunk[chunkNum].has_key(pos):
- raise MismatchedFragment
+ raise MismatchedFragment("multiple fragments for one position")
if noop:
return