[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Work on getting fragments happy -- not done yet
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv3409/lib/mixminion
Modified Files:
BuildMessage.py Crypto.py Packet.py
Log Message:
Work on getting fragments happy -- not done yet
Index: BuildMessage.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/BuildMessage.py,v
retrieving revision 1.51
retrieving revision 1.52
diff -u -d -r1.51 -r1.52
--- BuildMessage.py 28 Jul 2003 01:02:33 -0000 1.51
+++ BuildMessage.py 8 Aug 2003 21:42:46 -0000 1.52
@@ -604,6 +604,7 @@
EXP_FACTOR = 1.33333333333
def _encodePayloads(message, overhead, paddingPRNG):
+ """DOCDOC"""
assert overhead in (0, ENC_FWD_OVERHEAD)
origLength = len(message)
payload = compress(message)
@@ -619,42 +620,75 @@
# We pad the payload, and construct a new SingletonPayload,
# including this payload's size and checksum.
payload += paddingPRNG.getBytes(paddingLen)
- return [SingletonPayload(length, Crypto.sha1(payload), payload).pack()]
-
- # XXXX005 Whiten payload!!!
+ p = SingletonPayload(length, None, payload)
+ p.computeHash()
+ return [ p.pack() ]
- fragCapacity = PAYLOAD_LEN - FRAGMENT_PAYLOAD_OVERHEAD - overhead
- minFragments = ceilDiv(length, fragCapacity)
- k = 2
- while k < minFragments and k < 16:
- k *= 2
- nChunks = ceilDiv(minFragments, k)
- paddingLen = nChunks*fragCapacity*k
- payload += paddingPRNG.getBytes(paddingLen)
+ # DOCDOC
+ payload = whiten(payload)
+ p = _FragmentationParams(len(payload), overhead)
+
+ payload += paddingPRNG.getBytes(p.paddingLen)
+ assert len(payload) == p.paddedLen
chunks = []
- for i in xrange(nChunks):
- chunk[i] = payload[fragCapacity*k*i:fragCapacity*k*(i+1)]
- n = math.ceil(EXP_FACTOR*k)
- fragments = []
- fec = getFEC(k,n)
+ for i in xrange(p.nChunks):
+ chunk[i] = payload[i*p.chunkSize:(i+1)*p.chunkSize]
+ del payload
messageid = getCommonPRNG().getBytes(20)
idx = 0
- for i in xrange(nChunks):
+ fragments = []
+ for i in xrange(p.nChunks):
blocks = []
- for j in xrange(k):
- blocks[j] = chunks[i][j*fragCapacity:(j+1)*fragCapacity]
+ for j in xrange(p.k):
+ blocks[j] = chunks[i][j*p.fragCapacity:(j+1)*p.fragCapacity]
chunks[i] = None
- for j in xrange(n):
- frag = fec.encode(j, blocks)
- fragments.append(idx, Crypto.sha1())
-
+ for j in xrange(p.n):
+ frag = p.fec.encode(j, blocks)
+ pyld = FragmentPayload(idx, None, messageid, p.length, frag)
+ pyld.computeHash()
+ fragments.append(pyld.pack())
idx += 1
+ return fragments
+class _FragmentationParams:
+ """DOCDOC"""
+ ## Fields:
+ # k, n, length, fec, chunkSize, fragmentCapacity, dataFragments,
+ # totalFragments, paddingLen, paddedLen
+ def __init__(self, length, overhead):
+ assert overhead in (0, ENC_FWD_OVERHEAD)
+ self.length = length
+ self.fragCapacity = PAYLOAD_LEN - FRAGMENT_PAYLOAD_OVERHEAD - overhead
+ # minimum number of payloads to hold msg, without fragmentation
+ # or padding.
+ minFragments = ceilDiv(length, self.fragCapacity)
+ # Number of data fragments per chunk.
+ self.k = 2
+ while k < minFragments and k < 16:
+ self.k *= 2
+ # Number of chunks.
+ self.nChunks = ceilDiv(minFragments, k)
+ # Data in a single chunk
+ self.chunkSize = self.fragCapacity * self.k
+ # Length of data to fill chunks
+ self.paddedLen = self.nChunks * self.fragCapacity * self.k
+ # Length of padding needed to fill all chunks with data.
+ self.paddingLen = self.paddedLen - length
+ # Number of total fragments per chunk.
+ self.n = math.ceil(EXP_FACTOR * k)
+ # FEC object
+ self.fec = None
-
-
+ def getFEC(self):
+ if self.fec is None:
+ self.fec = _getFEC(self.k, self.n)
+ return self.fec
+ def getPosition(self, index):
+ """DOCDOC"""
+ chunk, pos = divmod(index, self.n)
+ return chunk, pos
def _encodePayload(payload, overhead, paddingPRNG):
"""Helper: compress a payload, pad it, and add extra fields (size and hash)
@@ -748,3 +782,268 @@
return routing, sizes, totalSize
+# ======================================================================
+
+class MismatchedFragment(Exception):
+ pass
+
+class UnneededFragment(Exception):
+ pass
+
+class _FragmentMetadata:
+ def __init__(self, messageid, hash, idx, size, isChunk, chunkNum, overhead,
+ insertedDate):
+ self.messageid = messageid
+ self.hash = hash
+ self.idx = idx
+ self.size = size
+ self.isChunk = isChunk
+ self.chunkNum = chunkNum
+ self.overhead = overhead
+ self.insertedDate = insertedDate
+
+ def __getstate__(self):
+ return ("V0", self.messageid, self.hash, self.idx, self.size,
+ self.isChunk, self.chunkNum, self.insertedDate)
+
+ def __setstate__(self, o):
+ if state[0] == 'V0':
+ (_, self.messageid, self.hash, self.idx, self.size,
+ self.isChunk, self.chunkNum, self.insertedDate) = state
+ else:
+ raise MixFatalError("Unrecognized fragment state")
+
+class MessageState:
+ def __init__(self, messageid, hash, length, overhead):
+ self.messageid = messageid
+ self.hash = hash
+ self.overhead = overhead
+ # chunkno -> handle,fragmentmeta
+ self.chunks = {}
+ # chunkno -> idxwithinchunk -> (handle,fragmentmeta)
+ self.fragmentsByChunk = []
+ self.params = _FragmentationParams(length, overhead)
+ for i in xrange(self.params.nChunks):
+ self.fragmentsByChunk.append({})
+ # chunkset: ready chunk num -> 1
+ self.readyChunks = {}
+
+ def isDone(self):
+ return len(self.chunks) == self.params.nChunks
+
+ def getChunkHandles(self):
+ return [ self.chunks[i][0] for i in xrange(self.params.nChunks) ]
+
+ def addChunk(self, h, fm):
+ # h is handle
+ # fm is fragmentmetadata
+ assert fm.isChunk
+ assert fm.messageid == self.messageid
+ if (fm.size != self.params.length or
+ fm.hash != self.hash or
+ fm.overhead != self.overhead or
+ self.chunks.has_key(fm.chunkNum)):
+ raise MismatchedFragment
+
+ self.chunks[fm.chunkNum] = (h,fm)
+
+ def addFragment(self, h, fm):
+ # h is handle
+ # fm is fragmentmetadata
+ assert fm.messageid == self.messageid
+
+ if (fm.hash != self.hash or
+ fm.size != self.params.length or
+ fm.overhead != self.overhead):
+ raise MismatchedFragment
+
+ chunkNum, pos = self.params.getPosition(idx)
+
+ if self.chunks.has_key(chunkNum):
+ raise UnneededFragment
+
+ if self.fragmentsByChunk[chunkNum].has_key(pos):
+ raise MismatchedFragment
+
+ self.fragmentsByChunk[chunkNum][pos] = (h, fm)
+
+ if len(self.fragmentsByChunk(chunkNum)) >= self.params.k:
+ self.readyChunks[chunkNum] = 1
+
+ def hasReadyChunks(self):
+ return len(self.readyChunks) != 0
+
+ def getReadyChunks(self):
+ """DOCDOC"""
+ # return list of [ (chunkno, [(h, fm)...]) )
+ r = []
+ for chunkno in self.readyChunks.keys():
+ ch = self.fragmentsByChunk[chunkno].values()[:self.params.k]
+ r.append( (chunkno, ch) )
+ return r
+
+class _FragmentDB(mixminion.Filestore.DBBase):
+ def __init__(self, location):
+ mixminion.Filestore.DBBase.__init__(self, location, "fragment")
+ def markStatus(self, msgid, status, today):
+ assert status in ("COMPLETED", "REJECTED")
+ if now is None:
+ now = time.time()
+ self[msgid] = (status, today)
+ def getStatusAndTime(self, msgid):
+ return self.get(msgid, None)
+ def _encodeKey(self, k):
+ return binascii.b2a_hex(k)
+ def _encodeVal(self, v):
+ status, tm = v
+ return "%s-%s"%(
+ {"COMPLETED":"C", "REJECTED":"R"}[status], str(tm))
+ def _decodeVal(self, v):
+ status = {"C":"COMPLETED", "R":"REJECTED"}[v[0]]
+ tm = int(tm[2:])
+ return status, tm
+
+class FragmentPool:
+ """DOCDOC"""
+ ##
+ # messages : map from
+ def __init__(self, dir):
+ self.store = mixminion.Filestore.StringMetadataStore(dir,create=1,
+ scrub=1)
+ self.log = _FragmentDB(dir+"_db")
+ self.rescan()
+
+ def getState(self, fm):
+ try:
+ return self.states[fm.messageid]
+ except KeyError:
+ state = MessageState(messageid=fm.messageid,
+ hash=fm.hash,
+ length=fm.size,
+ overhead=fm.overhead)
+ self.states[fm.messageid] = state
+ return state
+
+ def rescan(self):
+ self.store.loadAllMetadata()
+ meta = self.store._metadata_cache
+ self.states = states = {}
+ badMessageIDs = {}
+ unneededHandles = []
+ for h, fm in meta.items():
+ try:
+ mid = fm.messageid
+ if badMessageIDs.has_key(mid):
+ continue
+ state = self.getState(fm)
+ if fm.isChunk:
+ state.addChunk(h, fm)
+ else:
+ state.addFragment(h, fm)
+ except MismatchedFragment:
+ badMessageIDs[mid] = 1
+ except UnneededFragment:
+ unneededHandles.append(h)
+
+ for h in unneededHandles:
+ fm = meta[h]
+ LOG.debug("Removing unneeded fragment %s from message ID %r",
+ fm.idx, fm.messageid)
+ self.removeMessage(h)
+
+ self._abortMessageIDs(badMessageIDs, today)
+
+ def _abortMessageIDs(self, messageIDSet, today=None):
+ if today is None:
+ today = previousMidnight(time.time())
+ LOG.debug("Removing bogus messages by IDs: %s", messageIDSet.keys())
+ for mid in messageIDSet.keys():
+ self.markStatus(mid, "REJECTED", today)
+ for h, fm in self._metadata_cache.items():
+ if messageIDSet.has_key(fm.messageid):
+ self.removeMessage(h)
+
+ def _getPacketMetadata(self, fragmentPacket):
+ return _FragmentMetadata(messageid=fragmentPacket.msgID,
+ idx=fragmentPacket.index,
+ hash=fragmentPacket.hash,
+ size=fragmentPacket.msgLen,
+ isChunk=0,
+ chunkNum=None,
+ overhead=fragmentPacket.getOverhead(),
+ insertedDate=previousMidnight(now))
+
+ def addFragment(self, fragmentPacket, now=None):
+ if now is None:
+ now = time.time()
+ today = previousMidnight(now)
+
+ meta = self._getFragmentMetadata(fragmentPacket)
+ h = self.store.queueMessage(fragmentPacket.data)
+ self.store.setMetadata(h, meta)
+
+ state = self.getState(fm)
+ try:
+ state.addFragment(fragmentPacket)
+ except MismatchedFragment:
+ self._abortMessageIDs({ meta.id
+ self.removeMessage(h)
+ # XXXX remove other fragments, mark msgid as bad.
+ except UnneededFragment:
+ LOG.debug("Dropping unneeded fragment %s of message %r",
+ fragmentPacket.idx, fragmentPacket.msgID)
+ self.removeMessage(h)
+
+ def getReadyMessage(self, msgid):
+ s = self.states.get(msgid)
+ if not s or not s.isDone():
+ return None
+
+ hs = s.getChunkHandles()
+ return "".join([self.state.getMessage(h) for h in hs])
+
+ def deleteMessage(self, msgid):
+ s = self.states.get(msgid)
+ if not s or not s.isDone():
+ return None
+
+ hs = s.getChunkHandles()
+ for h in hs:
+ self.store.removeMessage(h)
+
+ def getReadyMessages(self):
+ return [ msgid
+ for msgid,state in self.states.items()
+ if state.isDone() ]
+
+ def unchunkMessages(self):
+ for msgid, state in self.states.items():
+ if not state.hasReadyChunks():
+ continue
+ for chunkno, lst in state.getReadyChunks():
+ vs = []
+ minDate = min([fm.insertedDate for h,fm in lst])
+ for h,fm in lst:
+ vs.append((state.getPos(fm.index)[1],
+ self.store.getMessage(h)))
+ chunkText = self.store.params.getFEC().decode(vs)
+ fm2 = _FragmentMetadata(state.mesageid, state.hash,
+ state.idx, 1, chunkno, state.overhead,
+ minDate)
+ h2 = self.store.queueMessage(chunkText)
+ self.store.setMetadata(h2, fm2)
+ for h,fm in lst:
+ self.store.removeMessage(h)
+
+# ======================================================================
+
+_fectab = {}
+
+def _getFEC(k,n):
+ """DOCDOC: Note race condition """
+ try:
+ return _fectab[(k,n)]
+ except KeyError:
+ f = mixminion._minionlib.FEC_generate(k,n)
+ _fectab[(k,n)] = f
+ return f
Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.50
retrieving revision 1.51
diff -u -d -r1.50 -r1.51
--- Crypto.py 24 Jul 2003 17:37:16 -0000 1.50
+++ Crypto.py 8 Aug 2003 21:42:47 -0000 1.51
@@ -29,6 +29,7 @@
'pk_encode_public_key', 'pk_encrypt', 'pk_fingerprint',
'pk_from_modulus', 'pk_generate', 'pk_get_modulus',
'pk_same_public_key', 'pk_sign', 'prng', 'sha1', 'strxor', 'trng',
+ 'unwhiten', 'whiten',
'AES_KEY_LEN', 'DIGEST_LEN', 'HEADER_SECRET_MODE', 'PRNG_MODE',
'RANDOM_JUNK_MODE', 'HEADER_ENCRYPT_MODE', 'APPLICATION_KEY_MODE',
'PAYLOAD_ENCRYPT_MODE', 'HIDE_HEADER_MODE' ]
@@ -181,6 +182,16 @@
right = ctr_crypt(right, _ml.sha1(left)[:AES_KEY_LEN])
left = _ml.strxor(left, _ml.sha1("".join((key1,right,key1))))
return left + right
+
+def whiten(s):
+ """DOCDOC"""
+ keys = Keyset("WHITEN").getLionessKeys("WHITEN")
+ return lioness_encrypt(s, keys)
+
+def unwhiten(s):
+ """DOCDOC"""
+ keys = Keyset("WHITEN").getLionessKeys("WHITEN")
+ return lioness_decrypt(s, keys)
def openssl_seed(count):
"""Seeds the openssl rng with 'count' bytes of real entropy."""
Index: Packet.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Packet.py,v
retrieving revision 1.53
retrieving revision 1.54
diff -u -d -r1.53 -r1.54
--- Packet.py 28 Jul 2003 01:02:33 -0000 1.53
+++ Packet.py 8 Aug 2003 21:42:47 -0000 1.54
@@ -245,12 +245,12 @@
# Length of the 'MessageID' field in a fragment payload
FRAGMENT_MESSAGEID_LEN = 20
# Maximum number of fragments associated with a given message
-MAX_N_FRAGMENTS = 0x7ffff
+MAX_N_FRAGMENTS = 0x7ffffff
# Number of bytes taken up by header fields in a singleton payload.
SINGLETON_PAYLOAD_OVERHEAD = 2 + DIGEST_LEN
# Number of bytes taken up by header fields in a fragment payload.
-FRAGMENT_PAYLOAD_OVERHEAD = 2 + DIGEST_LEN + FRAGMENT_MESSAGEID_LEN + 4
+FRAGMENT_PAYLOAD_OVERHEAD = 3 + DIGEST_LEN + FRAGMENT_MESSAGEID_LEN + 4
# Number of bytes taken up from OAEP padding in an encrypted forward
# payload, minus bytes saved by spilling the RSA-encrypted block into the
# tag, minus the bytes taken by the session key.
@@ -265,9 +265,10 @@
bit0 = ord(payload[0]) & 0x80
if bit0:
# We have a fragment
- idx, hash, msgID, msgLen = struct.unpack(FRAGMENT_UNPACK_PATTERN,
- payload[:FRAGMENT_PAYLOAD_OVERHEAD])
- idx &= 0x7f
+ idxhi, idxlo, hash, msgID, msgLen = \
+ struct.unpack(FRAGMENT_UNPACK_PATTERN,
+ payload[:FRAGMENT_PAYLOAD_OVERHEAD])
+ idx = ((idxhi & 0x7f) << 16) + idxlo
contents = payload[FRAGMENT_PAYLOAD_OVERHEAD:]
if msgLen <= len(contents):
raise ParseError("Payload has an invalid size field")
@@ -284,14 +285,13 @@
# A singleton payload starts with a 0 bit, 15 bits of size, and a 20-byte hash
SINGLETON_UNPACK_PATTERN = "!H%ds" % (DIGEST_LEN)
-# A fragment payload starts with a 1 bit, a 15-bit packet index, a
+# A fragment payload starts with a 1 bit, a 23-bit packet index, a
# 20-byte hash, a 20-byte message ID, and 4 bytes of message size.
-FRAGMENT_UNPACK_PATTERN = "!H%ds%dsL" % (DIGEST_LEN, FRAGMENT_MESSAGEID_LEN)
+FRAGMENT_UNPACK_PATTERN = "!BH%ds%dsL" % (DIGEST_LEN, FRAGMENT_MESSAGEID_LEN)
class _Payload:
pass
-
class SingletonPayload(_Payload):
"""Represents the payload for a standalone mixminion message.
Fields: size, hash, data. (Note that data is padded.)"""
@@ -300,6 +300,10 @@
self.hash = hash
self.data = data
+ def computeHash(self):
+ """DOCDOC"""
+ self.hash = sha1(self.data)
+
def isSingleton(self):
"""Returns true; this is a singleton payload."""
return 1
@@ -333,9 +337,10 @@
self.data = data
def computeHash(self):
+ """DOCDOC"""
self.hash = "X"*DIGEST_LEN
p = self.pack()
- self.hash = sha1("XXXX")
+ self.hash = sha1(p[23:])
def isSingleton(self):
"""Return false; not a singleton"""
@@ -349,10 +354,14 @@
assert len(self.data) < self.msgLen < 0x100000000L
assert (PAYLOAD_LEN - FRAGMENT_PAYLOAD_OVERHEAD - len(self.data)) in \
(0, ENC_FWD_OVERHEAD)
- idx = self.index | 0x8000
- header = struct.pack(FRAGMENT_UNPACK_PATTERN, idx, self.hash,
- self.msgID, self.msgLen)
+ idxhi = ((self.index & 0xff0000) >> 16) | 0x80
+ idxlo = self.index & 0x00fffff
+ header = struct.pack(FRAGMENT_UNPACK_PATTERN, idxhi, idxlo,
+ self.hash, self.msgID, self.msgLen)
return "%s%s" % (header, self.data)
+
+ def getOverhead(self):
+ return PAYLOAD_LEN - FRAGMENT_PAYLOAD_OVERHEAD - len(self.data)