[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Fragmentation backend now works for at least one test c...
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv9529/lib/mixminion
Modified Files:
BuildMessage.py Fragments.py test.py
Log Message:
Fragmentation backend now works for at least one test case. More tests to come. Woohoo.
Index: BuildMessage.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/BuildMessage.py,v
retrieving revision 1.53
retrieving revision 1.54
diff -u -d -r1.53 -r1.54
--- BuildMessage.py 14 Aug 2003 19:37:24 -0000 1.53
+++ BuildMessage.py 18 Aug 2003 00:41:10 -0000 1.54
@@ -21,9 +21,9 @@
__all__ = ['buildForwardMessage', 'buildEncryptedMessage',
'buildReplyMessage', 'buildReplyBlock', 'checkPathLength',
- 'encodePayloads', 'decodePayload' ]
+ 'encodeMessage', 'decodePayload' ]
-def encodePayloads(message, overhead, paddingPRNG):
+def encodeMessage(message, overhead, paddingPRNG=None):
"""Given a message, compress it, fragment it into individual payloads,
and add extra fields (size, hash, etc) as appropriate. Return a list
of strings, each of which is a message payload suitable for use in
@@ -60,7 +60,7 @@
return [ p.pack() ]
# DOCDOC
- messageid = getCommonPRNG().getBytes(20)
+ messageid = Crypto.getCommonPRNG().getBytes(20)
p = mixminion.Fragments.FragmentationParams(len(payload), overhead)
rawFragments = p.getFragments(payload)
fragments = []
@@ -75,7 +75,7 @@
paddingPRNG=None):
# Compress, pad, and checksum the payload.
if payload is not None and exitType != DROP_TYPE:
- payloads = encodePayloads(payload, 0, paddingPRNG)
+ payloads = encodeMessage(payload, 0, paddingPRNG)
assert len(payloads) == 1
payload = payloads[0]
LOG.debug("Encoding forward message for %s-byte payload",len(payload))
@@ -127,7 +127,7 @@
def buildEncryptedForwardMessage(payload, exitType, exitInfo, path1, path2,
key, paddingPRNG=None, secretRNG=None):
- payloads = encodePayloads(payload, ENC_FWD_OVERHEAD, paddingPRNG)
+ payloads = encodeMessage(payload, ENC_FWD_OVERHEAD, paddingPRNG)
assert len(payloads) == 1
return _buildEncryptedForwardMessage(payloads[0], exitType, exitInfo,
path1, path2, key, paddingPRNG,
@@ -194,7 +194,7 @@
return _buildMessage(payload, exitType, exitInfo, path1, path2,paddingPRNG)
def buildReplyMessage(payload, path1, replyBlock, paddingPRNG=None):
- payloads = encodePayloads(payload, 0, paddingPRNG)
+ payloads = encodeMessage(payload, 0, paddingPRNG)
assert len(payloads) == 1
return _buildReplyMessage(payloads[0], path1, replyBlock, paddingPRNG)
Index: Fragments.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Fragments.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- Fragments.py 17 Aug 2003 21:09:56 -0000 1.2
+++ Fragments.py 18 Aug 2003 00:41:10 -0000 1.3
@@ -5,15 +5,22 @@
Code to fragment and reassemble messages."""
+import binascii
+import math
+import time
import mixminion._minionlib
import mixminion.Filestore
-from mixminion.Crypto import getCommonPRNG, whiten, unwhiten
+from mixminion.Crypto import ceilDiv, getCommonPRNG, whiten, unwhiten
from mixminion.Common import LOG, previousMidnight, MixError, MixFatalError
+from mixminion.Packet import ENC_FWD_OVERHEAD, PAYLOAD_LEN, \
+ FRAGMENT_PAYLOAD_OVERHEAD
+
+__all__ = [ "FragmentPool", "FragmentationParams" ]
MAX_FRAGMENTS_PER_CHUNK = 32
EXP_FACTOR = 1.33333333333
-class _FragmentationParams:
+class FragmentationParams:
"""DOCDOC"""
## Fields:
# k, n, length, fec, chunkSize, fragmentCapacity, dataFragments,
@@ -25,20 +32,21 @@
# minimum number of payloads to hold msg, without fragmentation
# or padding.
minFragments = ceilDiv(length, self.fragCapacity)
+ assert minFragments >= 2
# Number of data fragments per chunk.
self.k = 2
- while k < minFragments and k < 16:
+ while self.k < minFragments and self.k < 16:
self.k *= 2
# Number of chunks.
- self.nChunks = ceilDiv(minFragments, k)
+ self.nChunks = ceilDiv(minFragments, self.k)
+ # Number of total fragments per chunk.
+ self.n = int(math.ceil(EXP_FACTOR * self.k))
# Data in a single chunk
self.chunkSize = self.fragCapacity * self.k
# Length of data to fill chunks
self.paddedLen = self.nChunks * self.fragCapacity * self.k
# Length of padding needed to fill all chunks with data.
self.paddingLen = self.paddedLen - length
- # Number of total fragments per chunk.
- self.n = math.ceil(EXP_FACTOR * k)
# FEC object
self.fec = None
@@ -57,7 +65,7 @@
paddingPRNG = getCommonPRNG()
self.getFEC()
- assert s.length == self.length
+ assert len(s) == self.length
s = whiten(s)
s += paddingPRNG.getBytes(self.paddingLen)
assert len(s) == self.paddedLen
@@ -74,13 +82,187 @@
blocks.append( chunks[i][j*self.fragCapacity:
(j+1)*self.fragCapacity] )
chunks[i] = None
- for j in xrange(p.n):
+ for j in xrange(self.n):
fragments.append( self.fec.encode(j, blocks) )
return fragments
# ======================================================================
#DOCDOC this entire section
+class FragmentPool:
+ """DOCDOC"""
+ ##
+ # messages : map from
+ def __init__(self, dir):
+ self.store = mixminion.Filestore.StringMetadataStore(dir,create=1,
+ scrub=1)
+ self.db = _FragmentDB(dir+"_db")
+ self.rescan()
+
+ def sync(self):
+ self.db.sync()
+
+ def close(self):
+ self.db.close()
+
+ def getState(self, fm):
+ try:
+ return self.states[fm.messageid]
+ except KeyError:
+ state = MessageState(messageid=fm.messageid,
+ hash=fm.hash,
+ length=fm.size,
+ overhead=fm.overhead)
+ self.states[fm.messageid] = state
+ return state
+
+ def rescan(self):
+ self.store.loadAllMetadata(lambda: None)
+ meta = self.store._metadata_cache
+ self.states = states = {}
+ badMessageIDs = {}
+ unneededHandles = []
+ for h, fm in meta.items():
+ if not fm:
+ LOG.debug("Removing fragment %s with missing metadata", h)
+ self.store.removeMessage(h)
+ try:
+ mid = fm.messageid
+ if badMessageIDs.has_key(mid):
+ continue
+ state = self.getState(fm)
+ if fm.isChunk:
+ state.addChunk(h, fm)
+ else:
+ state.addFragment(h, fm)
+ except MismatchedFragment:
+ badMessageIDs[mid] = 1
+ except UnneededFragment:
+ unneededHandles.append(h)
+
+ for h in unneededHandles:
+ fm = meta[h]
+ LOG.debug("Removing unneeded fragment %s from message ID %r",
+ fm.idx, fm.messageid)
+ self.store.removeMessage(h)
+
+ self._deleteMessageIDs(badMessageIDs, "REJECTED")
+
+ def _deleteMessageIDs(self, messageIDSet, why, today=None):
+ assert why in ("REJECTED", "COMPLETED")
+ if today is None:
+ today = previousMidnight(time.time())
+ else:
+ today = previousMidnight(today)
+ if why == 'REJECTED':
+ LOG.debug("Removing bogus messages by IDs: %s",
+ messageIDSet.keys())
+ else:
+ LOG.debug("Removing completed messages by IDs: %s",
+ messageIDSet.keys())
+ for mid in messageIDSet.keys():
+ self.db.markStatus(mid, why, today)
+ try:
+ del self.states[mid]
+ except KeyError:
+ pass
+ for h, fm in self.store._metadata_cache.items():
+ if messageIDSet.has_key(fm.messageid):
+ self.store.removeMessage(h)
+
+
+ def _getFragmentMetadata(self, fragmentPacket):
+ now=time.time()
+ return _FragmentMetadata(messageid=fragmentPacket.msgID,
+ idx=fragmentPacket.index,
+ hash=fragmentPacket.hash,
+ size=fragmentPacket.msgLen,
+ isChunk=0,
+ chunkNum=None,
+ overhead=fragmentPacket.getOverhead(),
+ insertedDate=previousMidnight(now))
+
+ def addFragment(self, fragmentPacket, now=None):
+ #print "---"
+ if now is None:
+ now = time.time()
+ today = previousMidnight(now)
+
+ s = self.db.getStatusAndTime(fragmentPacket.msgID)
+ if s:
+ #print "A"
+ LOG.debug("Dropping fragment of %s message %r",
+ s[0].lower(), fragmentPacket.msgID)
+ return
+
+ meta = self._getFragmentMetadata(fragmentPacket)
+ state = self.getState(meta)
+ try:
+ # print "B"
+ state.addFragment(None, meta, noop=1)
+ h = self.store.queueMessageAndMetadata(fragmentPacket.data, meta)
+ state.addFragment(h, meta)
+ #print "C"
+ except MismatchedFragment:
+ # remove other fragments, mark msgid as bad.
+ #print "D"
+ self._deleteMessageIDs({ meta.messageid : 1}, "REJECTED", now)
+ except UnneededFragment:
+ #print "E"
+ LOG.debug("Dropping unneeded fragment %s of message %r",
+ fragmentPacket.index, fragmentPacket.msgID)
+
+ def getReadyMessage(self, msgid):
+ s = self.states.get(msgid)
+ if not s or not s.isDone():
+ return None
+
+ hs = s.getChunkHandles()
+ msg = "".join([self.store.messageContents(h) for h in hs])
+ msg = unwhiten(msg[:s.params.length])
+ return msg
+
+ def markMessageCompleted(self, msgid, rejected=0):
+ s = self.states.get(msgid)
+ if not s or not s.isDone():
+ return None
+ if rejected:
+ self._deleteMessageIDs({msgid: 1}, "REJECTED")
+ else:
+ self._deleteMessageIDs({msgid: 1}, "COMPLETED")
+
+ def listReadyMessages(self):
+ return [ msgid
+ for msgid,state in self.states.items()
+ if state.isDone() ]
+
+ def unchunkMessages(self):
+ for msgid, state in self.states.items():
+ if not state.hasReadyChunks():
+ continue
+ # refactor as much of this as possible into state. XXXX
+ for chunkno, lst in state.getReadyChunks():
+ vs = []
+ minDate = min([fm.insertedDate for h,fm in lst])
+ for h,fm in lst:
+ vs.append((state.params.getPosition(fm.idx)[1],
+ self.store.messageContents(h)))
+ chunkText = "".join(state.params.getFEC().decode(vs))
+ del vs
+ fm2 = _FragmentMetadata(state.messageid, state.hash,
+ 1, state.params.length, 1,
+ chunkno,
+ state.overhead,
+ minDate)
+ h2 = self.store.queueMessageAndMetadata(chunkText, fm2)
+ #XXXX005 handle if crash comes here!
+ for h,fm in lst:
+ self.store.removeMessage(h)
+ state.fragmentsByChunk[chunkno] = {}
+ state.addChunk(h2, fm2)
+
+# ======================================================================
+
class MismatchedFragment(Exception):
pass
@@ -119,7 +301,7 @@
self.chunks = {}
# chunkno -> idxwithinchunk -> (handle,fragmentmeta)
self.fragmentsByChunk = []
- self.params = _FragmentationParams(length, overhead)
+ self.params = FragmentationParams(length, overhead)
for i in xrange(self.params.nChunks):
self.fragmentsByChunk.append({})
# chunkset: ready chunk num -> 1
@@ -140,31 +322,47 @@
fm.hash != self.hash or
fm.overhead != self.overhead or
self.chunks.has_key(fm.chunkNum)):
+ #print "MIS-C-1"
raise MismatchedFragment
self.chunks[fm.chunkNum] = (h,fm)
- def addFragment(self, h, fm):
+ if self.fragmentsByChunk[fm.chunkNum]:
+ LOG.warn("Found a chunk with unneeded fragments for message %r",
+ self.messageid)
+ #XXXX005 the old fragments need to be removed.
+
+ def addFragment(self, h, fm, noop=0):
# h is handle
# fm is fragmentmetadata
assert fm.messageid == self.messageid
- if (fm.hash != self.hash or
- fm.size != self.params.length or
+ if (fm.size != self.params.length or
fm.overhead != self.overhead):
+ #print "MIS-1"
+ #print (fm.hash, fm.size, fm.overhead)
+ #print (self.hash, self.params.length, self.overhead)
raise MismatchedFragment
- chunkNum, pos = self.params.getPosition(idx)
+ chunkNum, pos = self.params.getPosition(fm.idx)
+ if chunkNum >= self.params.nChunks:
+ raise MismatchedFragment
- if self.chunks.has_key(chunkNum):
+ if (self.chunks.has_key(chunkNum) or
+ len(self.fragmentsByChunk[chunkNum]) >= self.params.k):
+ #print "UNN-2"
raise UnneededFragment
if self.fragmentsByChunk[chunkNum].has_key(pos):
+ #print "MIS-3"
raise MismatchedFragment
+ if noop:
+ return
+ assert h
self.fragmentsByChunk[chunkNum][pos] = (h, fm)
- if len(self.fragmentsByChunk(chunkNum)) >= self.params.k:
+ if len(self.fragmentsByChunk[chunkNum]) >= self.params.k:
self.readyChunks[chunkNum] = 1
def hasReadyChunks(self):
@@ -179,14 +377,16 @@
r.append( (chunkno, ch) )
return r
+
class _FragmentDB(mixminion.Filestore.DBBase):
def __init__(self, location):
mixminion.Filestore.DBBase.__init__(self, location, "fragment")
self.sync()
def markStatus(self, msgid, status, today):
assert status in ("COMPLETED", "REJECTED")
- if now is None:
- now = time.time()
+ if today is None:
+ today = time.time()
+ today = previousMidnight(today)
self[msgid] = (status, today)
def getStatusAndTime(self, msgid):
return self.get(msgid, None)
@@ -198,138 +398,9 @@
{"COMPLETED":"C", "REJECTED":"R"}[status], str(tm))
def _decodeVal(self, v):
status = {"C":"COMPLETED", "R":"REJECTED"}[v[0]]
- tm = int(tm[2:])
+ tm = int(v[2:])
return status, tm
-class FragmentPool:
- """DOCDOC"""
- ##
- # messages : map from
- def __init__(self, dir):
- self.store = mixminion.Filestore.StringMetadataStore(dir,create=1,
- scrub=1)
- self.log = _FragmentDB(dir+"_db")
- self.rescan()
-
- def getState(self, fm):
- try:
- return self.states[fm.messageid]
- except KeyError:
- state = MessageState(messageid=fm.messageid,
- hash=fm.hash,
- length=fm.size,
- overhead=fm.overhead)
- self.states[fm.messageid] = state
- return state
-
- def rescan(self):
- self.store.loadAllMetadata()
- meta = self.store._metadata_cache
- self.states = states = {}
- badMessageIDs = {}
- unneededHandles = []
- for h, fm in meta.items():
- try:
- mid = fm.messageid
- if badMessageIDs.has_key(mid):
- continue
- state = self.getState(fm)
- if fm.isChunk:
- state.addChunk(h, fm)
- else:
- state.addFragment(h, fm)
- except MismatchedFragment:
- badMessageIDs[mid] = 1
- except UnneededFragment:
- unneededHandles.append(h)
-
- for h in unneededHandles:
- fm = meta[h]
- LOG.debug("Removing unneeded fragment %s from message ID %r",
- fm.idx, fm.messageid)
- self.removeMessage(h)
-
- self._abortMessageIDs(badMessageIDs, today)
-
- def _abortMessageIDs(self, messageIDSet, today=None):
- if today is None:
- today = previousMidnight(time.time())
- else:
- today = previousMidnight(today)
- LOG.debug("Removing bogus messages by IDs: %s", messageIDSet.keys())
- for mid in messageIDSet.keys():
- self.markStatus(mid, "REJECTED", today)
- for h, fm in self._metadata_cache.items():
- if messageIDSet.has_key(fm.messageid):
- self.removeMessage(h)
-
- def _getPacketMetadata(self, fragmentPacket):
- return _FragmentMetadata(messageid=fragmentPacket.msgID,
- idx=fragmentPacket.index,
- hash=fragmentPacket.hash,
- size=fragmentPacket.msgLen,
- isChunk=0,
- chunkNum=None,
- overhead=fragmentPacket.getOverhead(),
- insertedDate=previousMidnight(now))
-
- def addFragment(self, fragmentPacket, now=None):
- if now is None:
- now = time.time()
- today = previousMidnight(now)
-
- meta = self._getFragmentMetadata(fragmentPacket)
- state = self.getState(meta)
- try:
- state.addFragment(fragmentPacket)
- h = self.store.queueMessageAndMetadata(fragmentPacket.data, meta)
- except MismatchedFragment:
- # remove other fragments, mark msgid as bad.
- self._abortMessageIDs({ meta.id : 1}, now)
- except UnneededFragment:
- LOG.debug("Dropping unneeded fragment %s of message %r",
- fragmentPacket.idx, fragmentPacket.msgID)
-
- def getReadyMessage(self, msgid):
- s = self.states.get(msgid)
- if not s or not s.isDone():
- return None
-
- hs = s.getChunkHandles()
- return "".join([self.state.getMessage(h) for h in hs])
-
- def deleteMessage(self, msgid):
- s = self.states.get(msgid)
- if not s or not s.isDone():
- return None
-
- hs = s.getChunkHandles()
- for h in hs:
- self.store.removeMessage(h)
-
- def getReadyMessages(self):
- return [ msgid
- for msgid,state in self.states.items()
- if state.isDone() ]
-
- def unchunkMessages(self):
- for msgid, state in self.states.items():
- if not state.hasReadyChunks():
- continue
- for chunkno, lst in state.getReadyChunks():
- vs = []
- minDate = min([fm.insertedDate for h,fm in lst])
- for h,fm in lst:
- vs.append((state.getPos(fm.index)[1],
- self.store.getMessage(h)))
- chunkText = self.store.params.getFEC().decode(vs)
- fm2 = _FragmentMetadata(state.mesageid, state.hash,
- state.idx, 1, chunkno, state.overhead,
- minDate)
- h2 = self.store.queueMessage(chunkText)
- self.store.setMetadata(h2, fm2)
- for h,fm in lst:
- self.store.removeMessage(h)
# ======================================================================
@@ -343,4 +414,5 @@
f = mixminion._minionlib.FEC_generate(k,n)
_fectab[(k,n)] = f
return f
+
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.144
retrieving revision 1.145
diff -u -d -r1.144 -r1.145
--- test.py 17 Aug 2003 21:09:56 -0000 1.144
+++ test.py 18 Aug 2003 00:41:10 -0000 1.145
@@ -43,6 +43,7 @@
import mixminion.Config
import mixminion.Crypto as Crypto
import mixminion.Filestore
+import mixminion.Fragments
import mixminion.MMTPClient
import mixminion.Packet
import mixminion.ServerInfo
@@ -1648,7 +1649,7 @@
for m in (p.getBytes(3000), p.getBytes(10000), "", "q", "blznerty"):
for ov in 0, 42-20+16: # encrypted forward overhead
- plds = BuildMessage.encodePayloads(m,ov,p)
+ plds = BuildMessage.encodeMessage(m,ov,p)
assert len(plds) == 1
pld = plds[0]
self.assertEquals(28*1024, len(pld)+ov)
@@ -2286,7 +2287,7 @@
# get such a payload is to compress 25K of zeroes.
nils = "\x00"*(25*1024)
overcompressed_payload = \
- BuildMessage.encodePayloads(nils, 0, AESCounterPRNG())[0]
+ BuildMessage.encodeMessage(nils, 0, AESCounterPRNG())[0]
self.failUnlessRaises(CompressedDataTooLong,
BuildMessage.decodePayload, overcompressed_payload, "X"*20)
@@ -6325,6 +6326,123 @@
return ds[0] == ds[1]
#----------------------------------------------------------------------
+class FragmentTests(TestCase):
+ def testFragmentParams(self):
+ K28 = 28*1024
+ FP = mixminion.Fragments.FragmentationParams
+ # 1 chunk, small
+ fp1 = FP(K28 + 1, 0)
+ self.assertEquals((fp1.length, fp1.fragCapacity, fp1.k, fp1.n,
+ fp1.nChunks, fp1.chunkSize, fp1.paddedLen,
+ fp1.paddingLen),
+ (K28+1, K28 - 47, 2, 3,
+ 1, 2*(K28-47), 2*(K28-47),
+ fp1.paddedLen-(K28+1)))
+ # 1 chunk, a bit larger, with enc-fwd overhead
+ fp1 = FP(150*1024, 38)
+ self.assertEquals((fp1.length, fp1.fragCapacity, fp1.k, fp1.n,
+ fp1.nChunks, fp1.chunkSize, fp1.paddedLen,
+ fp1.paddingLen),
+ (150*1024, K28 - 47 - 38, 8, 11,
+ 1, 8*(K28-47-38), 8*(K28-47-38),
+ fp1.paddedLen-(150*1024)))
+ # 2 chunks.
+ fp1 = FP(K28 * 20, 0)
+ self.assertEquals((fp1.length, fp1.fragCapacity, fp1.k, fp1.n,
+ fp1.nChunks, fp1.chunkSize, fp1.paddedLen,
+ fp1.paddingLen),
+ (K28*20, K28 - 47, 16, 22,
+ 2, 16*(K28-47), 32*(K28-47),
+ fp1.paddedLen-(K28*20)))
+
+
+ # 3 chunks.
+ fp1 = FP(K28 * 32 + 101, 0)
+ self.assertEquals((fp1.length, fp1.fragCapacity, fp1.k, fp1.n,
+ fp1.nChunks, fp1.chunkSize, fp1.paddedLen,
+ fp1.paddingLen),
+ (K28*32+101, K28 - 47, 16, 22,
+ 3, 16*(K28-47), 48*(K28-47),
+ fp1.paddedLen-(K28*32+101)))
+
+
+ def testFragmentation(self):
+ FP = mixminion.Fragments.FragmentationParams
+
+ # One chunk.
+ msg = Crypto.getCommonPRNG().getBytes(150*1024)
+ fp1 = FP(len(msg), 38)
+ fec = mixminion.Fragments._getFEC(8, 11)
+ blocks = fp1.getFragments(msg)
+ self.assertEquals(len(blocks), 11)
+ for b in blocks:
+ self.assertEquals(len(b), 28*1024 - 47 - 38)
+ m2 = "".join(fec.decode(zip(range(3,11),blocks[3:])))
+ self.assertLongStringEq(msg, unwhiten(m2[:150*1024]))
+
+ # Three chunks.
+ msg = Crypto.getCommonPRNG().getBytes(28*32*1024 + 101)
+ fp1 = FP(len(msg), 0)
+ fec = mixminion.Fragments._getFEC(16, 22)
+ blocks = fp1.getFragments(msg)
+ self.assertEquals(len(blocks), 66)
+ for b in blocks: self.assertEquals(len(b), 28*1024 - 47)
+ chunks = []
+ for chunkno in 0,1,2:
+ blocksInChunk = zip(range(22), blocks[chunkno*22:(chunkno+1)*22])
+ receivedBlocks = Crypto.getCommonPRNG().shuffle(blocksInChunk, 16)
+ self.assertEquals(16, len(receivedBlocks))
+ chunks.append("".join(fec.decode(receivedBlocks)))
+ self.assertLongStringEq(msg, unwhiten(("".join(chunks))[:len(msg)]))
+
+ def testFragmentPool(self):
+ em = mixminion.BuildMessage.encodeMessage
+ pp = mixminion.Packet.parsePayload
+ M1 = Crypto.getCommonPRNG().getBytes(1024*30)
+ M2 = Crypto.getCommonPRNG().getBytes(1024*150)
+ M3 = Crypto.getCommonPRNG().getBytes(1024*200)
+ M4 = Crypto.getCommonPRNG().getBytes(1024*900)
+ M5 = Crypto.getCommonPRNG().getBytes(1024*900)
+ pkts1 = [ pp(x) for x in em(M1,0) ]
+ pkts2 = [ pp(x) for x in em(M2,38) ]
+ pkts3 = [ pp(x) for x in em(M3,0) ]
+ pkts4 = [ pp(x) for x in em(M4,0) ]
+ pkts5 = [ pp(x) for x in em(M4,0) ]
+ self.assertEquals(map(len, [pkts1,pkts2,pkts3,pkts4,pkts5]),
+ [3, 11, 11, 66, 66])
+
+ loc = mix_mktemp()
+ pool = mixminion.Fragments.FragmentPool(loc)
+ self.assertEquals([], pool.listReadyMessages())
+ pool.unchunkMessages()
+
+ #DOCDOC comment this rats' nets
+
+ # Reconstruct: simple case.
+ pool.addFragment(pkts1[2])
+ self.assertEquals(1, pool.store.count())
+ self.assertEquals([], pool.listReadyMessages())
+ pool.addFragment(pkts1[0])
+ self.assertEquals(2, pool.store.count())
+ pool.addFragment(pkts1[1]) # should be 'unnneedeed'
+ self.assertEquals(2, pool.store.count())
+ self.assertEquals([], pool.listReadyMessages())
+ pool.unchunkMessages()
+ self.assertEquals([pkts1[0].msgID], pool.listReadyMessages())
+ mid = pool.listReadyMessages()[0]
+ #print len(M1), len(pool.getReadyMessage(mid))
+ self.assertLongStringEq(M1, uncompressData(pool.getReadyMessage(mid)))
+ self.assertLongStringEq(M1, uncompressData(pool.getReadyMessage(mid)))
+ pool.markMessageCompleted(mid)
+ self.assertEquals([], pool.listReadyMessages())
+ pool.addFragment(pkts1[1])
+ self.assertEquals([], pool.store.getAllMessages())
+
+
+
+
+
+#----------------------------------------------------------------------
def testSuite():
"""Return a PyUnit test suite containing all the unit test cases."""
suite = unittest.TestSuite()
@@ -6332,7 +6450,7 @@
tc = loader.loadTestsFromTestCase
if 0:
- suite.addTest(tc(FilestoreTests))
+ suite.addTest(tc(FragmentTests))
return suite
suite.addTest(tc(MiscTests))