[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] More debugging and testing for fragment pools; also doc...
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv31037/lib/mixminion
Modified Files:
Fragments.py Packet.py test.py
Log Message:
More debugging and testing for fragment pools; also document Fragments module
Index: Fragments.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Fragments.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- Fragments.py 18 Aug 2003 00:41:10 -0000 1.3
+++ Fragments.py 18 Aug 2003 05:11:55 -0000 1.4
@@ -17,14 +17,30 @@
__all__ = [ "FragmentPool", "FragmentationParams" ]
-MAX_FRAGMENTS_PER_CHUNK = 32
-EXP_FACTOR = 1.33333333333
+# Largest number of allowed fragments in a single chunk. Must be a power
+# of two.
+MAX_FRAGMENTS_PER_CHUNK = 16
+# Minimum proportion of extra packets to add to each chunk.
+EXP_FACTOR = 1.3333333333333333
class FragmentationParams:
- """DOCDOC"""
+ """Class to track the padding, chunking, and fragmentation required
+ for a message of a given length to be packed into fragments of a
+ given capacity."""
## Fields:
- # k, n, length, fec, chunkSize, fragmentCapacity, dataFragments,
- # totalFragments, paddingLen, paddedLen
+ # length -- size (in octets) of the original message.
+ # k -- number of input packets for each chunk (also number of packets
+ # from a chunk required to reconstruct it.)
+ # n -- number of output packets for each chunk.
+ # fragmentCapacity -- number of bytes we can fit in a single fragment.
+ # (28KB - overhead)
+ # chunkSize -- number of input bytes in a single chunk. Equal to
+ # k*fragmentCapacity.
+ # nChunks -- number of total chunks in message.
+ # paddingLen -- bytes added to message before fragmentation
+ # paddedLen -- length of message after padding; equal to chunkSize*nChunks
+ # fec -- mixminion._minionlib.FEC object to encode/decode chunks;
+ # lazy-initialized by getFEC()
def __init__(self, length, overhead):
assert overhead in (0, ENC_FWD_OVERHEAD)
self.length = length
@@ -35,7 +51,7 @@
assert minFragments >= 2
# Number of data fragments per chunk.
self.k = 2
- while self.k < minFragments and self.k < 16:
+ while self.k < minFragments and self.k < MAX_FRAGMENTS_PER_CHUNK:
self.k *= 2
# Number of chunks.
self.nChunks = ceilDiv(minFragments, self.k)
@@ -51,16 +67,23 @@
self.fec = None
def getFEC(self):
+ """Return a FEC object to fragment or defragment messages with
+ these parameters"""
if self.fec is None:
self.fec = _getFEC(self.k, self.n)
return self.fec
def getPosition(self, index):
- """DOCDOC"""
+ """Return a chunk,index-within-chunk tuple for a packet with index
+ 'index'"""
chunk, pos = divmod(index, self.n)
return chunk, pos
def getFragments(self, s, paddingPRNG=None):
+ """Given a string of length self.length, whiten it, pad it,
+ and fragment it. Return a list of the fragments, in order.
+ (Note -- after building the fragment packets, be sure to shuffle
+ them into a random order.)"""
if paddingPRNG is None:
paddingPRNG = getCommonPRNG()
@@ -87,73 +110,202 @@
return fragments
# ======================================================================
-#DOCDOC this entire section
-
class FragmentPool:
- """DOCDOC"""
- ##
- # messages : map from
+ """Class to hold and manage fragmented messages as they are
+ reconstructed."""
+ ## Fields:
+ # states -- map from messageid to MessageState. Reconstructed by
+ # rescan().
+ # db -- instance of FragmentDB.
+ # store -- instance of StringMetadataStore. The messages are either
+ # the contents of individual fragments or reconstructed chunks.
+ # The metadata are instances of FragmentMetadata.
def __init__(self, dir):
- self.store = mixminion.Filestore.StringMetadataStore(dir,create=1,
- scrub=1)
- self.db = _FragmentDB(dir+"_db")
+ """Open a FragmentPool storing fragments in 'dir' and records of
+ old messages in 'dir_db'.
+ """
+ self.store = mixminion.Filestore.StringMetadataStore(
+ dir,create=1,scrub=1)
+ self.db = FragmentDB(dir+"_db")
self.rescan()
def sync(self):
+ """Flush pending changes to disk."""
self.db.sync()
def close(self):
+ """Release open resources for this pool."""
self.db.close()
+ del self.db
+ del self.store
+ del self.states
- def getState(self, fm):
+ def addFragment(self, fragmentPacket, nym=None, now=None):
+ """Given an instance of mixminion.Packet.FragmentPayload, record
+ the fragment if appropriate and update the state of the
+ fragment pool if necessary.
+
+ fragmentPacket -- the new fragment to add.
+ nym -- a string representing the identity that received this
+ fragment. [Tracking nyms is important, to prevent an
+ attack where we send 2 fragments to 'MarkTwain' and 2
+ fragments to 'SClemens', and see that the message is
+ reconstructed.]
+ """
+ if now is None:
+ now = time.time()
+ today = previousMidnight(now)
+
+ # If the message has already been rejected or completed, we can
+ # drop this packet.
+ s = self.db.getStatusAndTime(fragmentPacket.msgID)
+ if s:
+ LOG.debug("Dropping fragment of %s message %r",
+ s[0].lower(), fragmentPacket.msgID)
+ return
+
+ # Otherwise, create a new metadata object for this fragment...
+ meta = FragmentMetadata(messageid=fragmentPacket.msgID,
+ idx=fragmentPacket.index,
+ size=fragmentPacket.msgLen,
+ isChunk=0,
+ chunkNum=None,
+ overhead=fragmentPacket.getOverhead(),
+ insertedDate=previousMidnight(now),
+ nym=nym)
+ # ... and allocate or find the MessageState for this message.
+ state = self._getState(meta)
try:
- return self.states[fm.messageid]
- except KeyError:
- state = MessageState(messageid=fm.messageid,
- hash=fm.hash,
- length=fm.size,
- overhead=fm.overhead)
- self.states[fm.messageid] = state
- return state
+ # Check whether we can/should add this message, but do not
+ # add it.
+ state.addFragment(None, meta, noop=1)
+ # No exception was thrown; queue the message.
+ h = self.store.queueMessageAndMetadata(fragmentPacket.data, meta)
+ # And *now* update the message state.
+ state.addFragment(h, meta)
+ except MismatchedFragment:
+ # Remove the other fragments, mark msgid as bad.
+ self._deleteMessageIDs({ meta.messageid : 1}, "REJECTED", now)
+ except UnneededFragment:
+ # Discard this fragment; we don't need it.
+ LOG.debug("Dropping unneeded fragment %s of message %r",
+ fragmentPacket.index, fragmentPacket.msgID)
+
+ def getReadyMessage(self, msgid):
+ """Return the complete message associated with messageid 'msgid'.
+ (If no such complete message is found, return None.) The
+ resulting message is unwhitened, but not uncompressed."""
+ s = self.states.get(msgid)
+ if not s or not s.isDone():
+ return None
+
+ hs = s.getChunkHandles()
+ msg = "".join([self.store.messageContents(h) for h in hs])
+ msg = unwhiten(msg[:s.params.length])
+ return msg
+
+ def markMessageCompleted(self, msgid, rejected=0):
+ """Release all resources associated with the messageid 'msgid', and
+ reject future packets for that messageid. If 'rejected', the
+ message has been abandoned and not sent; otherwise, the message
+ has been sent.
+ """
+ s = self.states.get(msgid)
+ if not s or not s.isDone():
+ return None
+ if rejected:
+ self._deleteMessageIDs({msgid: 1}, "REJECTED")
+ else:
+ self._deleteMessageIDs({msgid: 1}, "COMPLETED")
+
+ def listReadyMessages(self):
+ """Return a list of all messageIDs that have been completely
+ reconstructed."""
+ return [ msgid
+ for msgid,state in self.states.items()
+ if state.isDone() ]
+
+ def unchunkMessages(self):
+ """If any messages are ready for partial or full reconstruction,
+ reconstruct as many of their chunks as possible."""
+ for msgid, state in self.states.items():
+ if not state.hasReadyChunks():
+ continue
+ state.reconstruct(self.store)
+
+ def expireMessages(self, cutoff):
+ """Remove all pending messages that were first inserted before
+ 'cutoff'. """
+ expiredMessageIDs = {}
+ for s in self.states.values():
+ if s.inserted < cutoff:
+ expiredMessageIDs[s.messageid] = 1
+ self._deleteMessageIDs(expiredMessageIDs, "REJECTED")
def rescan(self):
+ """Check all fragment metadata objects on disk, and reconstruct our
+ internal view of message states.
+ """
+ # Delete all internal state; reload FragmentMetadatas from disk.
self.store.loadAllMetadata(lambda: None)
meta = self.store._metadata_cache
self.states = states = {}
- badMessageIDs = {}
- unneededHandles = []
+ badMessageIDs = {} # map from bad messageID to 1
+ unneededHandles = [] # list of handles that aren't needed.
for h, fm in meta.items():
if not fm:
LOG.debug("Removing fragment %s with missing metadata", h)
self.store.removeMessage(h)
+ continue
try:
mid = fm.messageid
if badMessageIDs.has_key(mid):
+ # We've already rejected fragments associated with this ID.
continue
- state = self.getState(fm)
+ # All is well; try to register the fragment/chunk. If it's
+ # redundant or inconsistent, raise an exception.
+ state = self._getState(fm)
if fm.isChunk:
state.addChunk(h, fm)
else:
state.addFragment(h, fm)
except MismatchedFragment:
+ # Mark the message ID for this fragment as inconsistent.
badMessageIDs[mid] = 1
except UnneededFragment:
+ LOG.warn("Found redundant fragment %s in pool", h)
+ # Remember that this message is unneeded.
unneededHandles.append(h)
+ # Check for fragments superseded by chunks -- those are unneeded too.
+ for s in self.states.values():
+ unneededHandles.extend(s.getUnneededFragmentHandles())
+
+ # Delete unneeded fragments.
for h in unneededHandles:
- fm = meta[h]
+ try:
+ fm = meta[h]
+ except KeyError:
+ continue
LOG.debug("Removing unneeded fragment %s from message ID %r",
fm.idx, fm.messageid)
self.store.removeMessage(h)
+ # Now nuke inconsistent messages.
self._deleteMessageIDs(badMessageIDs, "REJECTED")
def _deleteMessageIDs(self, messageIDSet, why, today=None):
+ """Helper function. Remove all the fragments and chunks associated
+ with a given message, and mark the message as delivered or
+ undeliverable.
+
+ messageIDSet -- a map from 20-byte messageID to 1.
+ why -- 'REJECTED' or 'COMPLETED'.
+ """
assert why in ("REJECTED", "COMPLETED")
if today is None:
- today = previousMidnight(time.time())
- else:
- today = previousMidnight(today)
+ today = time.time()
+ today = previousMidnight(today)
if why == 'REJECTED':
LOG.debug("Removing bogus messages by IDs: %s",
messageIDSet.keys())
@@ -170,133 +322,101 @@
if messageIDSet.has_key(fm.messageid):
self.store.removeMessage(h)
-
- def _getFragmentMetadata(self, fragmentPacket):
- now=time.time()
- return _FragmentMetadata(messageid=fragmentPacket.msgID,
- idx=fragmentPacket.index,
- hash=fragmentPacket.hash,
- size=fragmentPacket.msgLen,
- isChunk=0,
- chunkNum=None,
- overhead=fragmentPacket.getOverhead(),
- insertedDate=previousMidnight(now))
-
- def addFragment(self, fragmentPacket, now=None):
- #print "---"
- if now is None:
- now = time.time()
- today = previousMidnight(now)
-
- s = self.db.getStatusAndTime(fragmentPacket.msgID)
- if s:
- #print "A"
- LOG.debug("Dropping fragment of %s message %r",
- s[0].lower(), fragmentPacket.msgID)
- return
-
- meta = self._getFragmentMetadata(fragmentPacket)
- state = self.getState(meta)
+ def _getState(self, fm):
+ """Helper function. Return the MessageState object associated with
+ a given FragmentMetadata; allocate it if necessary."""
try:
- # print "B"
- state.addFragment(None, meta, noop=1)
- h = self.store.queueMessageAndMetadata(fragmentPacket.data, meta)
- state.addFragment(h, meta)
- #print "C"
- except MismatchedFragment:
- # remove other fragments, mark msgid as bad.
- #print "D"
- self._deleteMessageIDs({ meta.messageid : 1}, "REJECTED", now)
- except UnneededFragment:
- #print "E"
- LOG.debug("Dropping unneeded fragment %s of message %r",
- fragmentPacket.index, fragmentPacket.msgID)
-
- def getReadyMessage(self, msgid):
- s = self.states.get(msgid)
- if not s or not s.isDone():
- return None
-
- hs = s.getChunkHandles()
- msg = "".join([self.store.messageContents(h) for h in hs])
- msg = unwhiten(msg[:s.params.length])
- return msg
-
- def markMessageCompleted(self, msgid, rejected=0):
- s = self.states.get(msgid)
- if not s or not s.isDone():
- return None
- if rejected:
- self._deleteMessageIDs({msgid: 1}, "REJECTED")
- else:
- self._deleteMessageIDs({msgid: 1}, "COMPLETED")
-
- def listReadyMessages(self):
- return [ msgid
- for msgid,state in self.states.items()
- if state.isDone() ]
-
- def unchunkMessages(self):
- for msgid, state in self.states.items():
- if not state.hasReadyChunks():
- continue
- # refactor as much of this as possible into state. XXXX
- for chunkno, lst in state.getReadyChunks():
- vs = []
- minDate = min([fm.insertedDate for h,fm in lst])
- for h,fm in lst:
- vs.append((state.params.getPosition(fm.idx)[1],
- self.store.messageContents(h)))
- chunkText = "".join(state.params.getFEC().decode(vs))
- del vs
- fm2 = _FragmentMetadata(state.messageid, state.hash,
- 1, state.params.length, 1,
- chunkno,
- state.overhead,
- minDate)
- h2 = self.store.queueMessageAndMetadata(chunkText, fm2)
- #XXXX005 handle if crash comes here!
- for h,fm in lst:
- self.store.removeMessage(h)
- state.fragmentsByChunk[chunkno] = {}
- state.addChunk(h2, fm2)
+ return self.states[fm.messageid]
+ except KeyError:
+ state = MessageState(messageid=fm.messageid,
+ length=fm.size,
+ overhead=fm.overhead,
+ inserted=fm.insertedDate,
+ nym=fm.nym)
+ self.states[fm.messageid] = state
+ return state
# ======================================================================
class MismatchedFragment(Exception):
+ """Exception raised when a fragment isn't compatible with the other
+ fragments with a given message ID. Because fragments are
+ integrity-checked on their way in, inconsistent fragments mean the
+ message is corrupt."""
pass
class UnneededFragment(Exception):
+ """Exception raised when a fragment is unneeded, and doesn't need to be
+ stored to disk."""
pass
-class _FragmentMetadata:
- def __init__(self, messageid, hash, idx, size, isChunk, chunkNum, overhead,
- insertedDate):
+class FragmentMetadata:
+ """Persistent metadata object to hold the state of a given fragment or
+ reconstructed chunk."""
+ ## Fields
+ # messageid -- unique 20-byte identifier for the message this fragment
+ # comes from.
+ # idx -- index of the fragment within the message. In the case of a
+ # chunk, it's equal to chunkNum.
+ # size -- total length of the message.
+ # isChunk -- true iff this is a reconstructed chunk.
+ # chunkNum -- number of the chunk to which this fragment belongs.
+ # overhead -- Payload overhead for this fragment. Equal to 0 or
+ # ENC_FWD_OVERHEAD.
+ # insertedDate -- Midnight GMT before the day this fragment was received.
+ # nym -- name of the identity that received this fragment.
+ def __init__(self, messageid, idx, size, isChunk, chunkNum, overhead,
+ insertedDate, nym):
self.messageid = messageid
- self.hash = hash
self.idx = idx
self.size = size
self.isChunk = isChunk
self.chunkNum = chunkNum
self.overhead = overhead
self.insertedDate = insertedDate
+ self.nym = nym
def __getstate__(self):
- return ("V0", self.messageid, self.hash, self.idx, self.size,
- self.isChunk, self.chunkNum, self.insertedDate)
+ return ("V0", self.messageid, self.idx, self.size,
+ self.isChunk, self.chunkNum, self.overhead, self.insertedDate,
+ self.nym)
- def __setstate__(self, o):
+ def __setstate__(self, state):
if state[0] == 'V0':
- (_, self.messageid, self.hash, self.idx, self.size,
- self.isChunk, self.chunkNum, self.insertedDate) = state
+ (_, self.messageid, self.idx, self.size,
+ self.isChunk, self.chunkNum, self.overhead, self.insertedDate,
+ self.nym) = state
else:
raise MixFatalError("Unrecognized fragment state")
class MessageState:
- def __init__(self, messageid, hash, length, overhead):
+ """Helper class. Tracks the status of the reconstruction of a
+ single message. MessageState objects are not persistent, and must
+ be reconstructed from FragmentMetadata objects whenever a
+ fragment pool is rescanned.
+ """
+ ## Fields:
+ # messageid -- the 20-byte message ID of this message.
+ # overhead -- the payload overhead shared by the fragments of this message
+ # inserted -- the midnight (GMT) of the day on which the first packet
+ # associated with this message was inserted.
+ # nym -- the name of the identity receiving this message. Used to
+ # prevent linkage attacks. XXXX005 specify this need!
+ #
+ # params -- an instance of FragmentationParams for this message.
+ # chunks -- a map from chunk number to tuples of (handle within the pool,
+ # FragmentMetadata object). For completed chunks.
+ # fragmentsByChunk -- a map from chunk number to maps from
+ # index-within-chunk to (handle,FragmentMetadata)
+ # readyChunks -- a map whose keys are the numbers of chunks that
+ # are ready for reconstruction, but haven't been reconstructed
+ # yet.
+ def __init__(self, messageid, length, overhead, inserted, nym):
+ """Create a new MessageState."""
self.messageid = messageid
- self.hash = hash
self.overhead = overhead
+ self.inserted = inserted
+ self.nym = nym
# chunkno -> handle,fragmentmeta
self.chunks = {}
# chunkno -> idxwithinchunk -> (handle,fragmentmeta)
@@ -308,40 +428,52 @@
self.readyChunks = {}
def isDone(self):
+ """Return true iff we have reconstructed all the chunks for this
+ message."""
return len(self.chunks) == self.params.nChunks
def getChunkHandles(self):
+ """Requires self.isDone(). Return an in-order list for the handles
+ of the reconstructed chunks of this message."""
+ assert self.isDone()
return [ self.chunks[i][0] for i in xrange(self.params.nChunks) ]
def addChunk(self, h, fm):
- # h is handle
- # fm is fragmentmetadata
+ """Register a chunk with handle h and FragmentMetadata fm. If the
+ chunk is inconsistent with other fragments of this message,
+ raise MismatchedFragment."""
assert fm.isChunk
assert fm.messageid == self.messageid
if (fm.size != self.params.length or
- fm.hash != self.hash or
fm.overhead != self.overhead or
self.chunks.has_key(fm.chunkNum)):
- #print "MIS-C-1"
raise MismatchedFragment
-
+
+ if fm.nym != self.nym:
+ raise MismatchedFragment
+
+ if self.inserted > fm.insertedDate:
+ self.inserted = fm.insertedDate
self.chunks[fm.chunkNum] = (h,fm)
if self.fragmentsByChunk[fm.chunkNum]:
LOG.warn("Found a chunk with unneeded fragments for message %r",
self.messageid)
- #XXXX005 the old fragments need to be removed.
+
+ if self.readyChunks.get(fm.chunkNum):
+ del self.readyChunks[fm.chunkNum]
def addFragment(self, h, fm, noop=0):
- # h is handle
- # fm is fragmentmetadata
+ """Register a fragment with handle h and FragmentMetadata fm. If the
+ fragment is inconsistent with the other fragments of this message,
+ raise MismatchedFragment. If the fragment isn't needed (because
+ enough fragments for its chunk have already been received),
+ raise UnneededFragment. If 'noop' is true, do not add this
+ fragment--just raise exceptions as needed."""
assert fm.messageid == self.messageid
if (fm.size != self.params.length or
fm.overhead != self.overhead):
- #print "MIS-1"
- #print (fm.hash, fm.size, fm.overhead)
- #print (self.hash, self.params.length, self.overhead)
raise MismatchedFragment
chunkNum, pos = self.params.getPosition(fm.idx)
@@ -350,45 +482,86 @@
if (self.chunks.has_key(chunkNum) or
len(self.fragmentsByChunk[chunkNum]) >= self.params.k):
- #print "UNN-2"
raise UnneededFragment
if self.fragmentsByChunk[chunkNum].has_key(pos):
- #print "MIS-3"
raise MismatchedFragment
if noop:
return
assert h
+ if self.inserted > fm.insertedDate:
+ self.inserted = fm.insertedDate
self.fragmentsByChunk[chunkNum][pos] = (h, fm)
if len(self.fragmentsByChunk[chunkNum]) >= self.params.k:
self.readyChunks[chunkNum] = 1
def hasReadyChunks(self):
+ """Return true iff some of the chunks in this message are pending
+ reconstruction."""
return len(self.readyChunks) != 0
- def getReadyChunks(self):
- """DOCDOC"""
- # return list of [ (chunkno, [(h, fm)...]) )
- r = []
+ def reconstruct(self, store):
+ """If any of the chunks in this message are pending reconstruction,
+ reconstruct them in a given store."""
+ if not self.readyChunks:
+ return
for chunkno in self.readyChunks.keys():
+ # Get the first K fragments in the chunk. (list of h,fm)
ch = self.fragmentsByChunk[chunkno].values()[:self.params.k]
- r.append( (chunkno, ch) )
- return r
+ minDate = min([fm.insertedDate for h, fm in ch])
+ # Build a list of (position-within-chunk, fragment-contents).
+ frags = [(self.params.getPosition(fm.idx)[1],
+ store.messageContents(h)) for h,fm in ch]
+ chunkText = "".join(self.params.getFEC().decode(frags))
+ del frags
+ fm2 = FragmentMetadata(messageid=self.messageid,
+ idx=chunkno, size=self.params.length,
+ isChunk=1, chunkNum=chunkno,
+ overhead=self.overhead,
+ insertedDate=minDate, nym=self.nym)
+ # Queue the chunk.
+ h2 = store.queueMessageAndMetadata(chunkText, fm2)
+ del chunkText
+ # Remove superseded fragments.
+ for h, fm in ch:
+ store.removeMessage(h)
+ # Update this MessageState object.
+ self.fragmentsByChunk[chunkno] = {}
+ del self.readyChunks[chunkno]
+ self.addChunk(h2, fm2)
+ def getUnneededFragmentHandles(self):
+ """Returns any handles for fragments that have been superseded by
+ chunks."""
+ r = []
+ for chunkno in self.chunks.keys():
+ r.extend([ h for h,_ in self.fragmentsByChunk[chunkno]])
+ return r
-class _FragmentDB(mixminion.Filestore.DBBase):
+class FragmentDB(mixminion.Filestore.DBBase):
+ """Internal class. Uses a database backend (such as dbm, Berkeley DB,
+ gdbm, etc.) to remember which message IDs have already been
+ reconstructed or noted as corrupt
+ """
def __init__(self, location):
+ """Open a new FragmentDB; stores its data in files beginning with
+ 'location'."""
mixminion.Filestore.DBBase.__init__(self, location, "fragment")
self.sync()
- def markStatus(self, msgid, status, today):
+ def markStatus(self, msgid, status, today=None):
+ """Note that fragments for a message with message ID 'msgid' should no
+ longer be stored. 'status' is one of 'COMPLETED' or 'REJECTED',
+ depending on whether the message was delivered or undeliverable."""
assert status in ("COMPLETED", "REJECTED")
if today is None:
today = time.time()
today = previousMidnight(today)
self[msgid] = (status, today)
def getStatusAndTime(self, msgid):
+ """Given a messageID, return a 2-tuple of status,resolutiondate.
+ Return None if the message is still deliverable."""
return self.get(msgid, None)
def _encodeKey(self, k):
return binascii.b2a_hex(k)
@@ -400,19 +573,30 @@
status = {"C":"COMPLETED", "R":"REJECTED"}[v[0]]
tm = int(v[2:])
return status, tm
-
# ======================================================================
+# Internal lazy-generated cache from (k,n) to _minionlib.FEC object.
+# Note that we only use k,n for a limited set of k,n.
+def _blankFECtable():
+ """Return a map from permissible k,n tuples to FEC objects"""
+ f = {}
+ k = 2
+ while k <= MAX_FRAGMENTS_PER_CHUNK:
+ f[(k, int(math.ceil(EXP_FACTOR*k)))] = None
+ k *= 2
+ return f
-_fectab = {}
+# global map.
+_fectab = _blankFECtable()
def _getFEC(k,n):
- """DOCDOC: Note race condition """
- try:
- return _fectab[(k,n)]
- except KeyError:
- f = mixminion._minionlib.FEC_generate(k,n)
- _fectab[(k,n)] = f
- return f
-
-
+ """Given k and n parameters, return a FEC object to fragment and
+ reconstruct messages given those parameters."""
+ # There's a possible race condition here where two threads note
+ # that a given set of parameters haven't been generated, and both
+ # generate them. This is harmless.
+ f = _fectab[(k,n)]
+ if f is None:
+ f = _fectab[(k,n)] = mixminion._minionlib.FEC_generate(k,n)
+ return f
+
Index: Packet.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Packet.py,v
retrieving revision 1.54
retrieving revision 1.55
diff -u -d -r1.54 -r1.55
--- Packet.py 8 Aug 2003 21:42:47 -0000 1.54
+++ Packet.py 18 Aug 2003 05:11:55 -0000 1.55
@@ -82,6 +82,8 @@
MIN_EXIT_TYPE = 0x0100 # The numerically first exit type.
SMTP_TYPE = 0x0100 # Mail the message
MBOX_TYPE = 0x0101 # Send the message to one of a fixed list of addresses
+NEWS_TYPE = 0x0102 # Post the message to some ngs, and maybe mail it too
+FRAGMENT_TYPE = 0x0103 # Find the actual deliver info in the message payload
MAX_EXIT_TYPE = 0xFFFF
class ParseError(MixError):
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.145
retrieving revision 1.146
diff -u -d -r1.145 -r1.146
--- test.py 18 Aug 2003 00:41:10 -0000 1.145
+++ test.py 18 Aug 2003 05:11:55 -0000 1.146
@@ -6407,7 +6407,7 @@
pkts2 = [ pp(x) for x in em(M2,38) ]
pkts3 = [ pp(x) for x in em(M3,0) ]
pkts4 = [ pp(x) for x in em(M4,0) ]
- pkts5 = [ pp(x) for x in em(M4,0) ]
+ pkts5 = [ pp(x) for x in em(M5,0) ]
self.assertEquals(map(len, [pkts1,pkts2,pkts3,pkts4,pkts5]),
[3, 11, 11, 66, 66])
@@ -6417,8 +6417,10 @@
pool.unchunkMessages()
#DOCDOC comment this rats' nets
-
+
+ ####
# Reconstruct: simple case.
+ ####
pool.addFragment(pkts1[2])
self.assertEquals(1, pool.store.count())
self.assertEquals([], pool.listReadyMessages())
@@ -6437,11 +6439,45 @@
self.assertEquals([], pool.listReadyMessages())
pool.addFragment(pkts1[1])
self.assertEquals([], pool.store.getAllMessages())
+
+ ####
+ # Reconstruct: large message, stop half-way, reload, finish.
+ ####
+ # enough for chunk1 -- 17 messages
+ for p in pkts4[5:22]: pool.addFragment(p)
+ # enough for half of chunk2: 8 messages
+ for p in pkts4[22:30]: pool.addFragment(p)
+ pool.unchunkMessages()
+ # close and re-open messages
+ pool.close()
+ pool = mixminion.Fragments.FragmentPool(loc)
+ # Enough for the rest of message 4... 8 from 2, 17 from 3.
+ for p in pkts4[36:44]+pkts4[49:66]:
+ pool.addFragment(p)
+ pool.unchunkMessages()
+ self.assertEquals(len(pool.listReadyMessages()), 1)
+ mid = pool.listReadyMessages()[0]
+ self.assertLongStringEq(M4, uncompressData(pool.getReadyMessage(mid)))
+ pool.markMessageCompleted(mid)
+ pool.close()
+ pool = mixminion.Fragments.FragmentPool(loc)
+ pool.addFragment(pkts4[48])
+ self.assertEquals([], pool.store.getAllMessages())
+
+ # free some RAM
+ del M4
+ del M1
+ del pkts4
+ del pkts1
+
+ ####
+ # Try an interleaved case with two smallish msgs and one big one.
+ # Provoke an error in the big one part way through.
+ ####
+
-
-
#----------------------------------------------------------------------
def testSuite():
"""Return a PyUnit test suite containing all the unit test cases."""
@@ -6449,7 +6485,7 @@
loader = unittest.TestLoader()
tc = loader.loadTestsFromTestCase
- if 0:
+ if 1:
suite.addTest(tc(FragmentTests))
return suite