
[minion-cvs] Add new Fragments.py module to hold fragmentation and unfragmentation code



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv32726

Modified Files:
	BuildMessage.py Crypto.py Filestore.py benchmark.py test.py 
Added Files:
	Fragments.py 
Log Message:
Add new Fragments.py module to hold fragmentation and unfragmentation code

--- NEW FILE: Fragments.py ---
# Copyright 2002-2003 Nick Mathewson.  See LICENSE for licensing information.
# $Id: Fragments.py,v 1.1 2003/08/14 19:37:24 nickm Exp $

"""mixminion.BuildMessage

   Code to fragment and reassemble messages."""

import binascii
import math
import time

import mixminion._minionlib
import mixminion.Filestore
from mixminion.Common import LOG, ceilDiv, previousMidnight, MixError, \
     MixFatalError
from mixminion.Crypto import getCommonPRNG, whiten, unwhiten
from mixminion.Packet import ENC_FWD_OVERHEAD, PAYLOAD_LEN, \
     FRAGMENT_PAYLOAD_OVERHEAD

MAX_FRAGMENTS_PER_CHUNK = 32
EXP_FACTOR = 1.33333333333

class _FragmentationParams:
    """DOCDOC"""
    ## Fields:
    # k, n, length, fec, chunkSize, fragmentCapacity, dataFragments,
    # totalFragments, paddingLen, paddedLen
    def __init__(self, length, overhead):
        assert overhead in (0, ENC_FWD_OVERHEAD)
        self.length = length
        self.fragCapacity = PAYLOAD_LEN - FRAGMENT_PAYLOAD_OVERHEAD - overhead
        # minimum number of payloads to hold msg, without fragmentation
        # or padding.
        minFragments = ceilDiv(length, self.fragCapacity)
        # Number of data fragments per chunk.
        self.k = 2
        while self.k < minFragments and self.k < 16:
            self.k *= 2
        # Number of chunks.
        self.nChunks = ceilDiv(minFragments, self.k)
        # Data in a single chunk.
        self.chunkSize = self.fragCapacity * self.k
        # Length of data to fill chunks.
        self.paddedLen = self.nChunks * self.fragCapacity * self.k
        # Length of padding needed to fill all chunks with data.
        self.paddingLen = self.paddedLen - length
        # Number of total fragments per chunk.
        self.n = int(math.ceil(EXP_FACTOR * self.k))
        # FEC object
        self.fec = None
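    # Worked example (illustrative figures; the real fragCapacity depends
    # on PAYLOAD_LEN and FRAGMENT_PAYLOAD_OVERHEAD from mixminion.Packet):
    # with fragCapacity of roughly 28KB and a 100KB message, minFragments
    # is 4, so k=4 data fragments per chunk, nChunks=1, and
    # n = ceil(1.333... * 4) = 6 total fragments.  Any k of the n
    # fragments suffice to rebuild a chunk, so up to 1/3 of the packets
    # for this message may be lost in transit.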

    def getFEC(self):
        if self.fec is None:
            self.fec = _getFEC(self.k, self.n)
        return self.fec

    def getPosition(self, index):
        """DOCDOC"""
        chunk, pos = divmod(index, self.n)
        return chunk, pos

    def getFragments(self, s, paddingPRNG=None):
        if paddingPRNG is None:
            paddingPRNG = getCommonPRNG()

        self.getFEC()
        assert len(s) == self.length
        s = whiten(s)
        s += paddingPRNG.getBytes(self.paddingLen)
        assert len(s) == self.paddedLen
        
        chunks = []
        for i in xrange(self.nChunks):
            chunks.append( s[i*self.chunkSize:(i+1)*self.chunkSize] )
        del s

        fragments = []
        for i in xrange(self.nChunks):
            blocks = []
            for j in xrange(self.k):
                blocks.append( chunks[i][j*self.fragCapacity:
                                         (j+1)*self.fragCapacity] )
            chunks[i] = None
            for j in xrange(self.n):
                fragments.append( self.fec.encode(j, blocks) )
        return fragments
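    # Example (illustrative): with nChunks=2 and n=6, getFragments()
    # returns 12 encoded strings in chunk order, so the fragment with
    # absolute index 7 belongs to chunk 1, position 1 -- exactly the
    # mapping getPosition() computes with divmod(index, n).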

# ======================================================================
# Reassembly: fragment metadata, per-message state, and the on-disk pool.

class MismatchedFragment(Exception):
    pass

class UnneededFragment(Exception):
    pass

class _FragmentMetadata:
    def __init__(self, messageid, hash, idx, size, isChunk, chunkNum, overhead,
                 insertedDate):
        self.messageid = messageid
        self.hash = hash
        self.idx = idx
        self.size = size
        self.isChunk = isChunk
        self.chunkNum = chunkNum
        self.overhead = overhead
        self.insertedDate = insertedDate

    def __getstate__(self):
        return ("V0", self.messageid, self.hash, self.idx, self.size,
                self.isChunk, self.chunkNum, self.overhead,
                self.insertedDate)

    def __setstate__(self, state):
        # The "V0" version tag lets us migrate the pickled format later.
        if state[0] == 'V0':
            (_, self.messageid, self.hash, self.idx, self.size,
             self.isChunk, self.chunkNum, self.overhead,
             self.insertedDate) = state
        else:
            raise MixFatalError("Unrecognized fragment state")

class MessageState:
    def __init__(self, messageid, hash, length, overhead):
        self.messageid = messageid
        self.hash = hash
        self.overhead = overhead
        # chunkno -> handle,fragmentmeta
        self.chunks = {} 
        # chunkno -> idxwithinchunk -> (handle,fragmentmeta)
        self.fragmentsByChunk = []
        self.params = _FragmentationParams(length, overhead)
        for i in xrange(self.params.nChunks):
            self.fragmentsByChunk.append({})
        # chunkset: ready chunk num -> 1
        self.readyChunks = {}
        
    def isDone(self):
        return len(self.chunks) == self.params.nChunks

    def getChunkHandles(self):
        return [ self.chunks[i][0] for i in xrange(self.params.nChunks) ]

    def addChunk(self, h, fm):
        # h is handle
        # fm is fragmentmetadata
        assert fm.isChunk
        assert fm.messageid == self.messageid
        if (fm.size != self.params.length or
            fm.hash != self.hash or
            fm.overhead != self.overhead or
            self.chunks.has_key(fm.chunkNum)):
            raise MismatchedFragment
        
        self.chunks[fm.chunkNum] = (h,fm)

    def addFragment(self, h, fm):
        # h is handle
        # fm is fragmentmetadata
        assert fm.messageid == self.messageid

        if (fm.hash != self.hash or
            fm.size != self.params.length or
            fm.overhead != self.overhead):
            raise MismatchedFragment
        
        chunkNum, pos = self.params.getPosition(fm.idx)

        if self.chunks.has_key(chunkNum):
            raise UnneededFragment
        
        if self.fragmentsByChunk[chunkNum].has_key(pos):
            raise MismatchedFragment

        self.fragmentsByChunk[chunkNum][pos] = (h, fm)

        if len(self.fragmentsByChunk[chunkNum]) >= self.params.k:
            self.readyChunks[chunkNum] = 1

    def hasReadyChunks(self):
        return len(self.readyChunks) != 0

    def getReadyChunks(self):
        """DOCDOC"""
        # return list of [ (chunkno, [(h, fm)...]) )
        r = []
        for chunkno in self.readyChunks.keys():
            ch = self.fragmentsByChunk[chunkno].values()[:self.params.k]
            r.append( (chunkno, ch) )
        return r
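    # Readiness example (illustrative): with k=4, a chunk is marked ready
    # as soon as any 4 of its n fragments have arrived; getReadyChunks()
    # then hands back exactly k (handle, metadata) pairs per ready chunk
    # for FEC decoding in FragmentPool.unchunkMessages().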

class _FragmentDB(mixminion.Filestore.DBBase):
    def __init__(self, location):
        mixminion.Filestore.DBBase.__init__(self, location, "fragment")
    def markStatus(self, msgid, status, today=None):
        assert status in ("COMPLETED", "REJECTED")
        if today is None:
            today = time.time()
        self[msgid] = (status, today)
    def getStatusAndTime(self, msgid):
        return self.get(msgid, None)
    def _encodeKey(self, k):
        return binascii.b2a_hex(k)
    def _encodeVal(self, v):
        status, tm = v
        return "%s-%s"%(
            {"COMPLETED":"C", "REJECTED":"R"}[status], str(tm))
    def _decodeVal(self, v):
        status = {"C":"COMPLETED", "R":"REJECTED"}[v[0]]
        tm = int(v[2:])
        return status, tm
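# Encoding example (illustrative timestamp): _FragmentDB stores the value
#     ("COMPLETED", 1060905600)   as the string   "C-1060905600",
# and _decodeVal("C-1060905600") returns ("COMPLETED", 1060905600).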

class FragmentPool:
    """DOCDOC"""
    ##
    # messages : map from 
    def __init__(self, dir):
        self.store = mixminion.Filestore.StringMetadataStore(dir,create=1,
                                                             scrub=1)
        self.log = _FragmentDB(dir+"_db")
        self.rescan()

    def getState(self, fm):
        try:
            return self.states[fm.messageid]
        except KeyError:
            state = MessageState(messageid=fm.messageid,
                                 hash=fm.hash,
                                 length=fm.size,
                                 overhead=fm.overhead)
            self.states[fm.messageid] = state
            return state
        
    def rescan(self):
        self.store.loadAllMetadata(lambda h: None)
        meta = self.store._metadata_cache
        self.states = states = {}
        badMessageIDs = {}
        unneededHandles = []
        for h, fm in meta.items():
            try:
                mid = fm.messageid
                if badMessageIDs.has_key(mid):
                    continue
                state = self.getState(fm)
                if fm.isChunk:
                    state.addChunk(h, fm)
                else:
                    state.addFragment(h, fm)
            except MismatchedFragment:
                badMessageIDs[mid] = 1
            except UnneededFragment:
                unneededHandles.append(h)

        for h in unneededHandles:
            fm = meta[h]
            LOG.debug("Removing unneeded fragment %s from message ID %r",
                      fm.idx, fm.messageid)
            self.store.removeMessage(h)

        self._abortMessageIDs(badMessageIDs)

    def _abortMessageIDs(self, messageIDSet, today=None):
        if today is None:
            today = previousMidnight(time.time())
        else:
            today = previousMidnight(today)
        LOG.debug("Removing bogus messages by IDs: %s", messageIDSet.keys())
        for mid in messageIDSet.keys():
            self.markStatus(mid, "REJECTED", today)
        for h, fm in self._metadata_cache.items():
            if messageIDSet.has_key(fm.messageid):
                self.removeMessage(h)

    def _getPacketMetadata(self, fragmentPacket, now=None):
        """Return a _FragmentMetadata object for a newly received
           FragmentPayload."""
        if now is None:
            now = time.time()
        return _FragmentMetadata(messageid=fragmentPacket.msgID,
                                 idx=fragmentPacket.index,
                                 hash=fragmentPacket.hash,
                                 size=fragmentPacket.msgLen,
                                 isChunk=0,
                                 chunkNum=None,
                                 overhead=fragmentPacket.getOverhead(),
                                 insertedDate=previousMidnight(now))
        
    def addFragment(self, fragmentPacket, now=None):
        if now is None:
            now = time.time()

        meta = self._getPacketMetadata(fragmentPacket, now)
        state = self.getState(meta)
        h = self.store.queueMessageAndMetadata(fragmentPacket.data, meta)
        try:
            state.addFragment(h, meta)
        except MismatchedFragment:
            # Remove the other fragments and mark the message ID as bad.
            # (_abortMessageIDs also removes the fragment queued above.)
            self._abortMessageIDs({ meta.messageid : 1 }, now)
        except UnneededFragment:
            LOG.debug("Dropping unneeded fragment %s of message %r",
                      fragmentPacket.index, fragmentPacket.msgID)
            self.store.removeMessage(h)

    def getReadyMessage(self, msgid):
        s = self.states.get(msgid)
        if not s or not s.isDone():
            return None

        hs = s.getChunkHandles()
        return "".join([self.state.getMessage(h) for h in hs])

    def deleteMessage(self, msgid):
        s = self.states.get(msgid)
        if not s or not s.isDone():
            return None

        hs = s.getChunkHandles()
        for h in hs:
            self.store.removeMessage(h)

    def getReadyMessages(self):
        return [ msgid
                 for msgid,state in self.states.items()
                 if state.isDone() ]

    def unchunkMessages(self):
        for msgid, state in self.states.items():
            if not state.hasReadyChunks():
                continue
            for chunkno, lst in state.getReadyChunks():
                vs = []
                minDate = min([fm.insertedDate for h,fm in lst])
                for h,fm in lst:
                    vs.append((state.params.getPosition(fm.idx)[1],
                               self.store.messageContents(h)))
                # decode() yields the k data blocks of this chunk.
                chunkText = "".join(state.params.getFEC().decode(vs))
                fm2 = _FragmentMetadata(state.messageid, state.hash,
                                        None, state.params.length, 1,
                                        chunkno, state.overhead, minDate)
                h2 = self.store.queueMessageAndMetadata(chunkText, fm2)
                state.addChunk(h2, fm2)
                for h,fm in lst:
                    self.store.removeMessage(h)
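    # (Each pass replaces the k fragment files of a decoded chunk with a
    # single chunk file, so isDone() and getReadyMessage() only ever deal
    # in whole chunks.)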
            
# ======================================================================

_fectab = {}

def _getFEC(k,n):
    """DOCDOC: Note race condition """
    try:
        return _fectab[(k,n)]
    except KeyError:
        f = mixminion._minionlib.FEC_generate(k,n)
        _fectab[(k,n)] = f
        return f
    

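A sketch of how the pieces above fit together, under stated assumptions: the
incoming-packet iterable, pool directory, and surrounding glue are
hypothetical, and the reassembled string is still whitened, padded, and
compressed (getFragments whitens before encoding; final payload decoding
lives in BuildMessage):

import mixminion.Fragments
from mixminion.Crypto import unwhiten

pool = mixminion.Fragments.FragmentPool("/tmp/frag-pool")  # hypothetical path
for pkt in incomingFragmentPayloads:  # hypothetical: parsed FragmentPayloads
    pool.addFragment(pkt)
pool.unchunkMessages()                # FEC-decode any chunks that are ready
for msgid in pool.getReadyMessages():
    data = unwhiten(pool.getReadyMessage(msgid))
    # 'data' is still compressed and padded; final decoding is done by the
    # payload-handling code in BuildMessage.
    pool.deleteMessage(msgid)
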
Index: BuildMessage.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/BuildMessage.py,v
retrieving revision 1.52
retrieving revision 1.53
diff -u -d -r1.52 -r1.53
--- BuildMessage.py	8 Aug 2003 21:42:46 -0000	1.52
+++ BuildMessage.py	14 Aug 2003 19:37:24 -0000	1.53
@@ -11,6 +11,7 @@
 import types
 
 import mixminion.Crypto as Crypto
+import mixminion.Fragments
 from mixminion.Packet import *
 from mixminion.Common import MixError, MixFatalError, LOG, UIError
 import mixminion._minionlib
@@ -18,15 +19,78 @@
 if sys.version_info[:3] < (2,2,0):
     import mixminion._zlibutil as zlibutil
 
-__all__ = ['buildForwardMessage', 'buildEncryptedMessage', 'buildReplyMessage',
-           'buildReplyBlock', 'checkPathLength', 'decodePayload' ]
+__all__ = ['buildForwardMessage', 'buildEncryptedMessage',
+           'buildReplyMessage', 'buildReplyBlock', 'checkPathLength',
+           'encodePayloads', 'decodePayload' ]
+
+def encodePayloads(message, overhead, paddingPRNG):
+    """Given a message, compress it, fragment it into individual payloads,
+       and add extra fields (size, hash, etc) as appropriate.  Return a list
+       of strings, each of which is a message payload suitable for use in
+       build*Message.
+
+       message: the initial message to encode.
+              overhead: number of bytes to omit from each payload,
+                        given the type of the message encoding.
+                        (0 or ENC_FWD_OVERHEAD)
+              paddingPRNG: generator for padding.
+
+       Note: If multiple strings are returned, be sure to shuffle them
+       before transmitting them to the network.
+    """
+    assert overhead in (0, ENC_FWD_OVERHEAD)
+    if paddingPRNG is None:
+        paddingPRNG = Crypto.getCommonPRNG()
+    origLength = len(message)
+    payload = compressData(message)
+    length = len(payload)
+
+    if length > 1024 and length*20 <= origLength:
+        LOG.warn("Message is very compressible and will look like a zlib bomb")
+
+    paddingLen = PAYLOAD_LEN - SINGLETON_PAYLOAD_OVERHEAD - overhead - length
+
+    # If the compressed payload fits in 28K, we're set.
+    if paddingLen >= 0:
+        # We pad the payload, and construct a new SingletonPayload,
+        # including this payload's size and checksum.
+        payload += paddingPRNG.getBytes(paddingLen)
+        p = SingletonPayload(length, None, payload)
+        p.computeHash()
+        return [ p.pack() ]
+
+    # Otherwise, we whiten and fragment the message into multiple payloads.
+    messageid = Crypto.getCommonPRNG().getBytes(20)
+    p = mixminion.Fragments._FragmentationParams(len(payload), overhead)
+    rawFragments = p.getFragments(payload)
+    fragments = []
+    for i in xrange(len(rawFragments)):
+        pyld = FragmentPayload(i, None, messageid, p.length, rawFragments[i])
+        pyld.computeHash()
+        fragments.append(pyld.pack())
+        rawFragments[i] = None
+    return fragments
 
 def buildForwardMessage(payload, exitType, exitInfo, path1, path2,
                         paddingPRNG=None):
+    # Compress, pad, and checksum the payload.
+    if payload is not None and exitType != DROP_TYPE:
+        payloads = encodePayloads(payload, 0, paddingPRNG)
+        assert len(payloads) == 1
+        payload = payloads[0]
+        LOG.debug("Encoding forward message for %s-byte payload",len(payload))
+    else:
+        payload = (paddingPRNG or Crypto.getCommonPRNG()).getBytes(PAYLOAD_LEN)
+        LOG.debug("Generating DROP message with %s bytes", PAYLOAD_LEN)
+
+    return _buildForwardMessage(payload, exitType, exitInfo, path1, path2,
+                                paddingPRNG)
+
+def _buildForwardMessage(payload, exitType, exitInfo, path1, path2,
+                        paddingPRNG=None):
     """Construct a forward message.
-            payload: The payload to deliver.  Must compress to under 28K-22b.
-                  If it does not, MixError is raised.  If the payload is
-                  None, 28K of random data is sent.
+            payload: The payload to deliver.  Must be exactly 28K.  If the
+                  payload is None, 28K of random data is sent.
             exitType: The routing type for the final node. (2 bytes, >=0x100)
             exitInfo: The routing info for the final node, not including tag.
             path1: Sequence of ServerInfo objects for the first leg of the path
@@ -46,15 +110,8 @@
     suppressTag = 0
     if exitType == DROP_TYPE:
         suppressTag = 1
-        payload = None
 
-    # Compress, pad, and checksum the payload.
-    if payload is not None:
-        payload = _encodePayload(payload, 0, paddingPRNG)
-        LOG.debug("Encoding forward message for %s-byte payload",len(payload))
-    else:
-        payload = paddingPRNG.getBytes(PAYLOAD_LEN)
-        LOG.debug("Generating DROP message with %s bytes", PAYLOAD_LEN)
+    assert len(payload) == PAYLOAD_LEN
 
     LOG.debug("  Using path %s:%s",
                    ",".join([s.getNickname() for s in path1]),
@@ -70,10 +127,17 @@
 
 def buildEncryptedForwardMessage(payload, exitType, exitInfo, path1, path2,
                                  key, paddingPRNG=None, secretRNG=None):
+    payloads = encodePayloads(payload, ENC_FWD_OVERHEAD, paddingPRNG)
+    assert len(payloads) == 1
+    return _buildEncryptedForwardMessage(payloads[0], exitType, exitInfo,
+                                         path1, path2, key, paddingPRNG,
+                                         secretRNG)
+
+def _buildEncryptedForwardMessage(payload, exitType, exitInfo, path1, path2,
+                                 key, paddingPRNG=None, secretRNG=None):
     """Construct a forward message encrypted with the public key of a
        given user.
-            payload: The payload to deliver.  Must compress to under 28K-60b.
-                  If it does not, MixError is raised.
+            payload: The payload to deliver.  Must be 28K-42b long.
             exitType: The routing type for the final node. (2 bytes, >=0x100)
             exitInfo: The routing info for the final node, not including tag.
             path1: Sequence of ServerInfo objects for the first leg of the path
@@ -93,10 +157,9 @@
                    [s.getNickname() for s in path2])
     LOG.debug("  Delivering to %04x:%r", exitType, exitInfo)
 
-    # Compress, pad, and checksum the payload.
     # (For encrypted-forward messages, we have overhead for OAEP padding
     #   and the session  key, but we save 20 bytes by spilling into the tag.)
-    payload = _encodePayload(payload, ENC_FWD_OVERHEAD, paddingPRNG)
+    assert len(payload) == PAYLOAD_LEN - ENC_FWD_OVERHEAD
 
     # Generate the session key, and prepend it to the payload.
     sessionKey = secretRNG.getBytes(SECRET_LEN)
@@ -131,8 +194,14 @@
     return _buildMessage(payload, exitType, exitInfo, path1, path2,paddingPRNG)
 
 def buildReplyMessage(payload, path1, replyBlock, paddingPRNG=None):
+    payloads = encodePayloads(payload, 0, paddingPRNG)
+    assert len(payloads) == 1
+    return _buildReplyMessage(payloads[0], path1, replyBlock, paddingPRNG)
+
+def _buildReplyMessage(payload, path1, replyBlock, paddingPRNG=None):
     """Build a message using a reply block.  'path1' is a sequence of
-       ServerInfo for the nodes on the first leg of the path.
+       ServerInfo for the nodes on the first leg of the path.  'payload'
+       must be exactly 28K long.
     """
     if paddingPRNG is None:
         paddingPRNG = Crypto.getCommonPRNG()
@@ -141,8 +210,7 @@
                    len(payload))
     LOG.debug("  Using path %s/??",[s.getNickname() for s in path1])
 
-    # Compress, pad, and checksum the payload.
-    payload = _encodePayload(payload, 0, paddingPRNG)
+    assert len(payload) == PAYLOAD_LEN
 
     # Encrypt the payload so that it won't appear as plaintext to the
     #  crossover note.  (We use 'decrypt' so that the message recipient can
@@ -600,125 +668,6 @@
 #----------------------------------------------------------------------
 # Payload-related helpers
 
-MAX_FRAGMENTS_PER_CHUNK = 32
-EXP_FACTOR = 1.33333333333
-
-def _encodePayloads(message, overhead, paddingPRNG):
-    """DOCDOC"""
-    assert overhead in (0, ENC_FWD_OVERHEAD)
-    origLength = len(message)
-    payload = compress(message)
-    length = len(payload)
-
-    if length > 1024 and length*20 <= origLength:
-        LOG.warn("Message is very compressible and will look like a zlib bomb")
-
-    paddingLen = PAYLOAD_LEN - SINGLETON_PAYLOAD_OVERHEAD - overhead - length
-
-    # If the compressed payload fits in 28K, we're set.
-    if paddingLen >= 0:
-        # We pad the payload, and construct a new SingletonPayload,
-        # including this payload's size and checksum.
-        payload += paddingPRNG.getBytes(paddingLen)
-        p = SingletonPayload(length, None, payload)
-        p.computeHash()
-        return [ p.pack() ]
-
-    # DOCDOC
-    payload = whiten(payload)
-    p = _FragmentationParams(len(payload), overhead)
-    
-    payload += paddingPRNG.getBytes(p.paddingLen)
-    assert len(payload) == p.paddedLen
-    chunks = []
-    for i in xrange(p.nChunks):
-        chunk[i] = payload[i*p.chunkSize:(i+1)*p.chunkSize]
-    del payload
-    messageid = getCommonPRNG().getBytes(20)
-    
-    idx = 0
-    fragments = []
-    for i in xrange(p.nChunks):
-        blocks = []
-        for j in xrange(p.k):
-            blocks[j] = chunks[i][j*p.fragCapacity:(j+1)*p.fragCapacity]
-        chunks[i] = None
-        for j in xrange(p.n):
-            frag = p.fec.encode(j, blocks)
-            pyld = FragmentPayload(idx, None, messageid, p.length, frag)
-            pyld.computeHash()
-            fragments.append(pyld.pack())
-            idx += 1
-    return fragments
-
-class _FragmentationParams:
-    """DOCDOC"""
-    ## Fields:
-    # k, n, length, fec, chunkSize, fragmentCapacity, dataFragments,
-    # totalFragments, paddingLen, paddedLen
-    def __init__(self, length, overhead):
-        assert overhead in (0, ENC_FWD_OVERHEAD)
-        self.length = length
-        self.fragCapacity = PAYLOAD_LEN - FRAGMENT_PAYLOAD_OVERHEAD - overhead
-        # minimum number of payloads to hold msg, without fragmentation
-        # or padding.
-        minFragments = ceilDiv(length, self.fragCapacity)
-        # Number of data fragments per chunk.
-        self.k = 2
-        while k < minFragments and k < 16:
-            self.k *= 2
-        # Number of chunks.
-        self.nChunks = ceilDiv(minFragments, k)
-        # Data in  a single chunk
-        self.chunkSize = self.fragCapacity * self.k
-        # Length of data to fill chunks
-        self.paddedLen = self.nChunks * self.fragCapacity * self.k
-        # Length of padding needed to fill all chunks with data.
-        self.paddingLen = self.paddedLen - length
-        # Number of total fragments per chunk.
-        self.n = math.ceil(EXP_FACTOR * k)
-        # FEC object
-        self.fec = None
-
-    def getFEC(self):
-        if self.fec is None:
-            self.fec = _getFEC(self.k, self.n)
-        return self.fec
-
-    def getPosition(self, index):
-        """DOCDOC"""
-        chunk, pos = divmod(index, self.n)
-        return chunk, pos
-
-def _encodePayload(payload, overhead, paddingPRNG):
-    """Helper: compress a payload, pad it, and add extra fields (size and hash)
-              payload: the initial payload
-              overhead: number of bytes to omit from result
-                        (0 or ENC_FWD_OVERHEAD)
-              paddingPRNG: generator for padding.
-
-       BUG: This should eventually support K-of-N.
-    """
-    assert overhead in (0, ENC_FWD_OVERHEAD)
-
-    # Compress the data, and figure out how much padding we'll need.
-    origLength = len(payload)
-    payload = compressData(payload)
-    length = len(payload)
-
-    if length > 1024 and length*20 <= origLength:
-        LOG.warn("Message is very compressible and will look like a zlib bomb")
-
-    paddingLen = PAYLOAD_LEN - SINGLETON_PAYLOAD_OVERHEAD - overhead - length
-
-    # If the compressed payload doesn't fit in 28K, then we need to bail out.
-    if paddingLen < 0:
-        raise MixError("Payload too long for singleton message")
-
-    # Otherwise, we pad the payload, and construct a new SingletonPayload,
-    # including this payload's size and checksum.
-    payload += paddingPRNG.getBytes(paddingLen)
-    return SingletonPayload(length, Crypto.sha1(payload), payload).pack()
 
 def _getRandomTag(rng):
     "Helper: Return a 20-byte string with the MSB of byte 0 set to 0."
@@ -782,268 +731,3 @@
 
     return routing, sizes, totalSize
 
-# ======================================================================
-
-class MismatchedFragment(Exception):
-    pass
-
-class UnneededFragment(Exception):
-    pass
-
-class _FragmentMetadata:
-    def __init__(self, messageid, hash, idx, size, isChunk, chunkNum, overhead,
-                 insertedDate):
-        self.messageid = messageid
-        self.hash = hash
-        self.idx = idx
-        self.size = size
-        self.isChunk = isChunk
-        self.chunkNum = chunkNum
-        self.overhead = overhead
-        self.insertedDate = insertedDate
-
-    def __getstate__(self):
-        return ("V0", self.messageid, self.hash, self.idx, self.size,
-                self.isChunk, self.chunkNum, self.insertedDate)
-
-    def __setstate__(self, o):
-        if state[0] == 'V0':
-            (_, self.messageid, self.hash, self.idx, self.size,
-             self.isChunk, self.chunkNum, self.insertedDate) = state
-        else:
-            raise MixFatalError("Unrecognized fragment state")
-
-class MessageState:
-    def __init__(self, messageid, hash, length, overhead):
-        self.messageid = messageid
-        self.hash = hash
-        self.overhead = overhead
-        # chunkno -> handle,fragmentmeta
-        self.chunks = {} 
-        # chunkno -> idxwithinchunk -> (handle,fragmentmeta)
-        self.fragmentsByChunk = []
-        self.params = _FragmentationParams(length, overhead)
-        for i in xrange(self.params.nChunks):
-            self.fragmentsByChunk.append({})
-        # chunkset: ready chunk num -> 1
-        self.readyChunks = {}
-        
-    def isDone(self):
-        return len(self.chunks) == self.params.nChunks
-
-    def getChunkHandles(self):
-        return [ self.chunks[i][0] for i in xrange(self.params.nChunks) ]
-
-    def addChunk(self, h, fm):
-        # h is handle
-        # fm is fragmentmetadata
-        assert fm.isChunk
-        assert fm.messageid == self.messageid
-        if (fm.size != self.params.length or
-            fm.hash != self.hash or
-            fm.overhead != self.overhead or
-            self.chunks.has_key(fm.chunkNum)):
-            raise MismatchedFragment
-        
-        self.chunks[fm.chunkNum] = (h,fm)
-
-    def addFragment(self, h, fm):
-        # h is handle
-        # fm is fragmentmetadata
-        assert fm.messageid == self.messageid
-
-        if (fm.hash != self.hash or
-            fm.size != self.params.length or
-            fm.overhead != self.overhead):
-            raise MismatchedFragment
-        
-        chunkNum, pos = self.params.getPosition(idx)
-
-        if self.chunks.has_key(chunkNum):
-            raise UnneededFragment
-        
-        if self.fragmentsByChunk[chunkNum].has_key(pos):
-            raise MismatchedFragment
-
-        self.fragmentsByChunk[chunkNum][pos] = (h, fm)
-
-        if len(self.fragmentsByChunk(chunkNum)) >= self.params.k:
-            self.readyChunks[chunkNum] = 1
-
-    def hasReadyChunks(self):
-        return len(self.readyChunks) != 0
-
-    def getReadyChunks(self):
-        """DOCDOC"""
-        # return list of [ (chunkno, [(h, fm)...]) )
-        r = []
-        for chunkno in self.readyChunks.keys():
-            ch = self.fragmentsByChunk[chunkno].values()[:self.params.k]
-            r.append( (chunkno, ch) )
-        return r
-
-class _FragmentDB(mixminion.Filestore.DBBase):
-    def __init__(self, location):
-        mixminion.Filestore.DBBase.__init__(self, location, "fragment")
-    def markStatus(self, msgid, status, today):
-        assert status in ("COMPLETED", "REJECTED")
-        if now is None:
-            now = time.time()
-        self[msgid] = (status, today)
-    def getStatusAndTime(self, msgid):
-        return self.get(msgid, None)
-    def _encodeKey(self, k):
-        return binascii.b2a_hex(k)
-    def _encodeVal(self, v):
-        status, tm = v
-        return "%s-%s"%(
-            {"COMPLETED":"C", "REJECTED":"R"}[status], str(tm))
-    def _decodeVal(self, v):
-        status = {"C":"COMPLETED", "R":"REJECTED"}[v[0]]
-        tm = int(tm[2:])
-        return status, tm
-
-class FragmentPool:
-    """DOCDOC"""
-    ##
-    # messages : map from 
-    def __init__(self, dir):
-        self.store = mixminion.Filestore.StringMetadataStore(dir,create=1,
-                                                             scrub=1)
-        self.log = _FragmentDB(dir+"_db")
-        self.rescan()
-
-    def getState(self, fm):
-        try:
-            return self.states[fm.messageid]
-        except KeyError:
-            state = MessageState(messageid=fm.messageid,
-                                 hash=fm.hash,
-                                 length=fm.size,
-                                 overhead=fm.overhead)
-            self.states[fm.messageid] = state
-            return state
-        
-    def rescan(self):
-        self.store.loadAllMetadata()
-        meta = self.store._metadata_cache
-        self.states = states = {}
-        badMessageIDs = {}
-        unneededHandles = []
-        for h, fm in meta.items():
-            try:
-                mid = fm.messageid
-                if badMessageIDs.has_key(mid):
-                    continue
-                state = self.getState(fm)
-                if fm.isChunk:
-                    state.addChunk(h, fm)
-                else:
-                    state.addFragment(h, fm)
-            except MismatchedFragment:
-                badMessageIDs[mid] = 1
-            except UnneededFragment:
-                unneededHandles.append(h)
-
-        for h in unneededHandles:
-            fm = meta[h]
-            LOG.debug("Removing unneeded fragment %s from message ID %r",
-                      fm.idx, fm.messageid)
-            self.removeMessage(h)
-
-        self._abortMessageIDs(badMessageIDs, today)
-
-    def _abortMessageIDs(self, messageIDSet, today=None):
-        if today is None:
-            today = previousMidnight(time.time())
-        LOG.debug("Removing bogus messages by IDs: %s", messageIDSet.keys())
-        for mid in messageIDSet.keys():
-            self.markStatus(mid, "REJECTED", today)
-        for h, fm in self._metadata_cache.items():
-            if messageIDSet.has_key(fm.messageid):
-                self.removeMessage(h)
-
-    def _getPacketMetadata(self, fragmentPacket):
-        return  _FragmentMetadata(messageid=fragmentPacket.msgID,
-                                  idx=fragmentPacket.index,
-                                  hash=fragmentPacket.hash,
-                                  size=fragmentPacket.msgLen,
-                                  isChunk=0,
-                                  chunkNum=None,
-                                  overhead=fragmentPacket.getOverhead(),
-                                  insertedDate=previousMidnight(now))
-        
-    def addFragment(self, fragmentPacket, now=None):
-        if now is None:
-            now = time.time()
-        today = previousMidnight(now)
-
-        meta = self._getFragmentMetadata(fragmentPacket)
-        h = self.store.queueMessage(fragmentPacket.data)
-        self.store.setMetadata(h, meta)
-
-        state = self.getState(fm)
-        try:
-            state.addFragment(fragmentPacket)
-        except MismatchedFragment:
-            self._abortMessageIDs({ meta.id
-            self.removeMessage(h)
-            # XXXX remove other fragments, mark msgid as bad.
-        except UnneededFragment:
-            LOG.debug("Dropping unneeded fragment %s of message %r",
-                      fragmentPacket.idx, fragmentPacket.msgID)
-            self.removeMessage(h)
-
-    def getReadyMessage(self, msgid):
-        s = self.states.get(msgid)
-        if not s or not s.isDone():
-            return None
-
-        hs = s.getChunkHandles()
-        return "".join([self.state.getMessage(h) for h in hs])
-
-    def deleteMessage(self, msgid):
-        s = self.states.get(msgid)
-        if not s or not s.isDone():
-            return None
-
-        hs = s.getChunkHandles()
-        for h in hs:
-            self.store.removeMessage(h)
-
-    def getReadyMessages(self):
-        return [ msgid
-                 for msgid,state in self.states.items()
-                 if state.isDone() ]
-
-    def unchunkMessages(self):
-        for msgid, state in self.states.items():
-            if not state.hasReadyChunks():
-                continue
-            for chunkno, lst in state.getReadyChunks():
-                vs = []
-                minDate = min([fm.insertedDate for h,fm in lst])
-                for h,fm in lst:
-                    vs.append((state.getPos(fm.index)[1],
-                               self.store.getMessage(h)))
-                chunkText = self.store.params.getFEC().decode(vs)
-                fm2 = _FragmentMetadata(state.mesageid, state.hash,
-                                        state.idx, 1, chunkno, state.overhead,
-                                        minDate)
-                h2 = self.store.queueMessage(chunkText)
-                self.store.setMetadata(h2, fm2)
-                for h,fm in lst:
-                    self.store.removeMessage(h)
-            
-# ======================================================================
-
-_fectab = {}
-
-def _getFEC(k,n):
-    """DOCDOC: Note race condition """
-    try:
-        return _fectab[(k,n)]
-    except KeyError:
-        f = mixminion._minionlib.FEC_generate(k,n)
-        _fectab[(k,n)] = f
-        return f

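Caller-side, the new encodePayloads interface looks like the sketch below
(the shuffle method on the common PRNG is an assumption here; the docstring
only requires that fragments be shuffled somehow before transmission, and
'message' stands for any input string):

import mixminion.BuildMessage as BuildMessage
import mixminion.Crypto as Crypto

prng = Crypto.getCommonPRNG()
payloads = BuildMessage.encodePayloads(message, 0, prng)
if len(payloads) > 1:
    prng.shuffle(payloads)    # assumed RNG helper; hides fragment order
for p in payloads:
    assert len(p) == 28*1024  # each payload packs to exactly 28K (overhead 0)
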
Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.51
retrieving revision 1.52
diff -u -d -r1.51 -r1.52
--- Crypto.py	8 Aug 2003 21:42:47 -0000	1.51
+++ Crypto.py	14 Aug 2003 19:37:24 -0000	1.52
@@ -184,12 +184,16 @@
     return left + right
 
 def whiten(s):
-    """DOCDOC"""
+    """Return a whitened version of a string 's', using the whitening
+       algorithm from 'E2E-spec.txt'.
+
+       The function 'unwhiten' inverts 'whiten', but if any portion of
+       whiten(s) is not known, no part of 's' can be recovered."""
     keys = Keyset("WHITEN").getLionessKeys("WHITEN")
     return lioness_encrypt(s, keys)
 
 def unwhiten(s):
-    """DOCDOC"""
+    """Given a whitened string, return the original string."""
     keys = Keyset("WHITEN").getLionessKeys("WHITEN")
     return lioness_decrypt(s, keys)
 

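A quick property check of the documented behavior (input size illustrative;
whiten is a length-preserving LIONESS transform under the fixed 'WHITEN'
keyset):

from mixminion.Crypto import whiten, unwhiten

s = "x" * 1024
w = whiten(s)
assert len(w) == len(s) and w != s  # same length, unrecognizable contents
assert unwhiten(w) == s             # unwhiten exactly inverts whiten
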
Index: Filestore.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Filestore.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- Filestore.py	8 Aug 2003 21:40:42 -0000	1.4
+++ Filestore.py	14 Aug 2003 19:37:24 -0000	1.5
@@ -14,13 +14,14 @@
 # time to refactor the common code.
 
 import anydbm
+import binascii
+import cPickle
 import dumbdbm
-import os
 import errno
-import time
+import os
 import stat
-import cPickle
 import threading
+import time
 
 from mixminion.Common import MixFatalError, secureDelete, LOG, \
      createPrivateDir, readFile, tryUnlink
@@ -28,7 +29,13 @@
 
 __all__ = [ "StringStore", "StringMetadataStore",
             "ObjectStore", "ObjectMetadataStore", 
-            "MixedStore", "MixedMetadataStore" ]
+            "MixedStore", "MixedMetadataStore",
+            "DBBase", "JournaledDBBase", "BooleanJournaledDBBase"
+            ]
+
+
+# ======================================================================
+# Filestores.
 
 # Mode to pass to open(2) for creating a new file, and dying if it already
 # exists.
@@ -88,7 +95,6 @@
            'create' is true, creates the directory if necessary.  If 'scrub'
            is true, removes any incomplete or invalidated messages from the
            store."""
-
         secureDelete([]) # Make sure secureDelete is configured. HACK!
 
         self._lock = threading.RLock()
@@ -192,20 +198,24 @@
         file, handle = getCommonPRNG().openNewFile(self.dir, "inp_", 1)
         return file, handle
 
-    def finishMessage(self, f, handle, ismeta=0):
+    def finishMessage(self, f, handle, _ismeta=0):
         """Given a file and a corresponding handle, closes the file
            commits the corresponding message."""
+        # if '_ismeta' is true, we're finishing not a message, but the
+        # metadata for a message
         f.close()
-        if ismeta:
+        if _ismeta:
             self._changeState(handle, "inpm", "meta")
         else:
             self._changeState(handle, "inp", "msg")
 
-    def abortMessage(self, f, handle, ismeta=0):
+    def abortMessage(self, f, handle, _ismeta=0):
         """Given a file and a corresponding handle, closes the file
            rejects the corresponding message."""
+        # if '_ismeta' is true, we're finishing not a message, but the
+        # metadata for a message
         f.close()
-        if ismeta:
+        if _ismeta:
             self._changeState(handle, "inpm", "rmvm")
         else:
             self._changeState(handle, "inp", "rmv")
@@ -240,7 +250,6 @@
             secureDeleteFn(rmv)
         else:
             secureDelete(rmv, blocking=1)
-        return 0
 
     def _changeState(self, handle, s1, s2):
         """Helper method: changes the state of message 'handle' from 's1'
@@ -251,6 +260,7 @@
                 os.rename(os.path.join(self.dir, s1+"_"+handle),
                           os.path.join(self.dir, s2+"_"+handle))
             except OSError, e:
+                # WWWW On Windows, replacing metadata can create an error!
                 contents = os.listdir(self.dir)
                 LOG.error("Error while trying to change %s from %s to %s: %s",
                           handle, s1, s2, e)
@@ -268,6 +278,9 @@
             self._lock.release()
 
 class StringStoreMixin:
+    """Combine the 'StringStoreMixin' class with a BaseStore in order
+       to implement a BaseStore that stores strings.
+    """
     def __init__(self): pass
     def messageContents(self, handle):
         """Given a message handle, returns the contents of the corresponding
@@ -287,23 +300,10 @@
         self.finishMessage(f, handle) # handles locking
         return handle
 
-    def moveMessage(self, handle, other):
-        """Given a handle and a queue, moves the corresponding message
-           from this filestore to the filestore provided.  Returns a
-           new handle for the message in the destination queue."""
-
-        # Since we're switching handles, we don't want to just rename;
-        # We really want to copy and delete the old file.
-        try:
-            self._lock.acquire()
-            newHandle = other.queueMessage(self.messageContents(handle))
-            self.removeMessage(handle)
-        finally:
-            self._lock.release()
-
-        return newHandle    
-
 class ObjectStoreMixin:
+    """Combine the 'ObjectStoreMixin' class with a BaseStore in order
+       to implement a BaseStore that stores pickled objects.
+    """
     def __init__(self): pass
     def getObject(self, handle):
         """Given a message handle, read and unpickle the contents of the
@@ -326,11 +326,50 @@
         return handle
 
 class BaseMetadataStore(BaseStore):
+    """A BaseMetadataStore is a BaseStore that stores a metadata
+       object for every object in the store.  We assume metadata to be
+       relatively volatile compared to the underlying stored objects.
+       Metadata is not always wiped before removal.
+
+       The representation of a store with metadata is the same as that
+       of a simple store, except that:
+           1) The meta_, rmvm_, and inpm_ tags are used.
+           2) For every file in msg_ state, there is a corresponding meta_
+              file.
+    """
+    ##Fields:
+    # _metadata_cache: map from handle to cached metadata object.  This is
+    #    a write-through cache.
     def __init__(self, location, create=0, scrub=0):
+        """Create a new BaseMetadataStore to store files in 'location'. The
+           'create' and 'scrub' arguments are as for BaseStore(...)."""
         BaseStore.__init__(self, location=location, create=create, scrub=scrub)
         self._metadata_cache = {}
+        if scrub:
+            self.cleanMetadata()
+
+    def cleanMetadata(self,secureDeleteFn=None):
+        """Find all orphaned metadata files and remove them."""
+        hSet = {}
+        for h in self.getAllMessages():
+            hSet[h] = 1
+        rmv = []
+        for h in [fn[5:] for fn in os.listdir(self.dir)
+                  if fn.startswith("meta_")]:
+            if not hSet.get(h):
+                rmv.append("meta_"+h)
+        if rmv:
+            LOG.warn("Removing %s orphaned metadata files from %s",
+                     len(rmv), self.dir)
+            if secureDeleteFn:
+                secureDeleteFn(rmv)
+            else:
+                secureDelete(rmv, blocking=1)
 
     def loadAllMetadata(self, newDataFn):
+        """For all objects in the store, load their metadata into the internal
+           cache.  If any object is missing its metadata, create metadata for
+           it by invoking newDataFn(handle)."""
         try:
             self._lock.acquire()
             self._metadata_cache = {}
@@ -344,6 +383,7 @@
             self._lock.release()
 
     def getMetadata(self, handle):
+        """Return the metadata associated with a given handle."""
         fname = os.path.join(self.dir, "meta_"+handle)
         if not os.path.exists(fname):
             raise KeyError(handle)
@@ -362,21 +402,25 @@
             self._lock.release()
 
     def setMetadata(self, handle, object):
-        """DOCDOC"""
+        """Change the metadata associated with a given handle."""
         try:
             self._lock.acquire()
             fname = os.path.join(self.dir, "inpm_"+handle)
             f = os.fdopen(os.open(fname, _NEW_FILE_FLAGS, 0600), "wb")
             cPickle.dump(object, f, 1)
-            self.finishMessage(f, handle, ismeta=1)
+            self.finishMessage(f, handle, _ismeta=1)
             self._metadata_cache[handle] = object
             return handle
         finally:
             self._lock.release()
 
     def removeMessage(self, handle):
+        """Given a handle, removes the corresponding message from the
+           filestore.  """
         try:
             self._lock.acquire()
+            # Remove the message before the metadata, so we don't have
+            # a message without metadata.
             BaseStore.removeMessage(self, handle)
             if os.path.exists(os.path.join(self.dir, "meta_"+handle)):
                 self._changeState(handle, "meta", "rmvm")
@@ -388,25 +432,55 @@
         finally:
             self._lock.release()
 
+class StringMetadataStoreMixin(StringStoreMixin):
+    """Add this mixin class to a BaseMetadataStore in order to get a
+       filestore that stores strings with metadata."""
+    def __init__(self):
+        StringStoreMixin.__init__(self)
+    def queueMessage(self, message):
+        LOG.warn("Called 'queueMessage' on a metadata store.")
+        return self.queueMessageAndMetadata(message, None)
+    def queueMessageAndMetadata(self, message, metadata):
+        f, handle = self.openNewMessage()
+        f.write(message)
+        self.setMetadata(handle, metadata)
+        self.finishMessage(f, handle) # handles locking
+        return handle
+
+class ObjectMetadataStoreMixin(ObjectStoreMixin):
+    """Add this mixin class to a BaseMetadataStore in order to get a
+       filestore that stores objects with metadata."""
+    def __init__(self):
+        ObjectStoreMixin.__init__(self)
+    def queueObject(self, object):
+        LOG.warn("Called 'queueObject' on a metadata store.")
+        return self.queueObjectAndMetadata(object, None)
+    def queueObjectAndMetadata(self, object, metadata):
+        f, handle = self.openNewMessage()
+        cPickle.dump(object, f, 1)
+        self.setMetadata(handle, metadata)
+        self.finishMessage(f, handle) # handles locking
+        return handle
+
 class StringStore(BaseStore, StringStoreMixin):
     def __init__(self, location, create=0, scrub=0):
         BaseStore.__init__(self, location, create, scrub)
         StringStoreMixin.__init__(self)
 
-class StringMetadataStore(BaseMetadataStore, StringStoreMixin):
+class StringMetadataStore(BaseMetadataStore, StringMetadataStoreMixin):
     def __init__(self, location, create=0, scrub=0):
         BaseMetadataStore.__init__(self, location, create, scrub)
-        StringStoreMixin.__init__(self)
+        StringMetadataStoreMixin.__init__(self)
 
 class ObjectStore(BaseStore, ObjectStoreMixin):
     def __init__(self, location, create=0, scrub=0):
         BaseStore.__init__(self, location, create, scrub)
         ObjectStoreMixin.__init__(self)
 
-class ObjectMetadataStore(BaseMetadataStore, ObjectStoreMixin):
+class ObjectMetadataStore(BaseMetadataStore, ObjectMetadataStoreMixin):
     def __init__(self, location, create=0, scrub=0):
         BaseMetadataStore.__init__(self, location, create, scrub)
-        ObjectStoreMixin.__init__(self)
+        ObjectMetadataStoreMixin.__init__(self)
         
 class MixedStore(BaseStore, StringStoreMixin, ObjectStoreMixin):
     def __init__(self, location, create=0, scrub=0):
@@ -414,31 +488,51 @@
         StringStoreMixin.__init__(self)
         ObjectStoreMixin.__init__(self)
 
-class MixedMetadataStore(BaseMetadataStore, StringStoreMixin,
-                         ObjectStoreMixin):
+class MixedMetadataStore(BaseMetadataStore, StringMetadataStoreMixin,
+                         ObjectMetadataStoreMixin):
     def __init__(self, location, create=0, scrub=0):
         BaseMetadataStore.__init__(self, location, create, scrub)
-        StringStoreMixin.__init__(self)
-        ObjectStoreMixin.__init__(self)
+        StringMetadataStoreMixin.__init__(self)
+        ObjectMetadataStoreMixin.__init__(self)
 
 # ======================================================================
 # Database wrappers
 
 class DBBase:
-    # _lock
+    """A DBBase is a persistant store that maps keys to values, using
+       a Python anydbm object.
+
+       It differs from the standard python 'shelve' module:
+          - by handling broken database files,
+          - by warning when using dumbdbm,
+          - by providing a 'sync' feature,
+          - by bypassing the pickle module's overhead,
+          - by providing thread-safety
+
+       To use this class for non-string keys or values, override the
+       _{en|de}code{Key|Val} methods."""
+    ## Fields:
+    # _lock -- A threading.RLock to protect access to database.
+    # filename -- The name of the underlying database file.  Note that some
+    #       database implementations (such as dumbdbm) create multiple files,
+    #       using <filename> as a prefix.
+    # log -- The underlying anydbm object.
     def __init__(self, filename, purpose=""):
+        """Create a DBBase object for a database stored in 'filename',
+           creating the underlying database if needed."""
         self._lock = threading.RLock()
         self.filename = filename
         parent = os.path.split(filename)[0]
         createPrivateDir(parent)
 
+        # If the file can't be read, bail.
         try:
             st = os.stat(filename)
         except OSError, e:
             if e.errno != errno.ENOENT:
                 raise
             st = None
-
+        # If the file is empty, delete it and start over.
         if st and st[stat.ST_SIZE] == 0:
             LOG.warn("Half-created database %s found; cleaning up.", filename)
             tryUnlink(filename)
@@ -461,10 +555,16 @@
         # flush the journal, and so on.
 
     def _encodeKey(self, k):
+        """Given a key for this mapping (a Python object), return a string
+           usable as a key by the underlying databse."""
         return k
     def _encodeVal(self, v):
+        """Given a value for this mapping (a Python object), return a string
+           usable as a value by the underlying databse."""
         return v
     def _decodeVal(self, v):
+        """Given a string-encoded value as used in the underlying database,
+           return the original Python object."""
         return v
 
     def has_key(self, k):
@@ -477,6 +577,9 @@
     def __getitem__(self, k):
         return self.getItem(k)
 
+    def keys(self):
+        return map(self._decodeKey, self.log.keys())
+
     def get(self, k, default=None):
         try:
             return self[k]
@@ -486,6 +589,9 @@
     def __setitem__(self, k, v):
         self.setItem(k, v)
 
+    def __delitem__(self, k):
+        self.delItem(k)
+
     def getItem(self, k):
         try:
             self._lock.acquire()
@@ -499,8 +605,16 @@
             self.log[self._encodeKey(k)] = self._encodeVal(v)
         finally:
             self._lock.release()
+
+    def delItem(self, k):
+        try:
+            self._lock.acquire()
+            del self.log[self._encodeKey(k)]
+        finally:
+            self._lock.release()
         
     def sync(self):
+        """Flush all pending changes to disk"""
         self._lock.acquire()
         try:
             self.log.sync()
@@ -508,6 +622,7 @@
             self._lock.release()
 
     def close(self):
+        """Release resources associated with this database."""
         self._lock.acquire()
         try:
             self.log.close()
@@ -515,11 +630,30 @@
         finally:
             self._lock.release()
 
+# Flags for use when opening the journal.
 _JOURNAL_OPEN_FLAGS = os.O_WRONLY|os.O_CREAT|getattr(os,'O_SYNC',0)|getattr(os,'O_BINARY',0)
 
 class JournaledDBBase(DBBase):
+    """Optimized version of DBBase that requires fewer sync() operations.
+       Uses a journal file to cache keys and values until they can be written
+       to the underlying database.  Keys and values must all encode to stings
+       of the same length."""
+    # Largest allowed number of journal entries before we flush the journal
+    # to disk.
     MAX_JOURNAL = 128
+    ## Fields:
+    # klen -- required length of encoded keys
+    # vlen -- required length of encoded values
+    # vdflt -- If vlen is 0, default value used when reading journaled value
+    #      from disk.
+    # journal -- map from journal-encoded key to journal-encoded value.
+    # journalFileName -- filename to use for journal file.
+    # journalFile -- fd for the journal file
+    
     def __init__(self, location, purpose, klen, vlen, vdflt):
+        """Create a new JournaledDBBase that stores its files to match the
+           pattern 'location*', whose journal-encoded keys are all of length
+           klen, whose journal-encoded values are all of length vlen."""
         DBBase.__init__(self, location, purpose)
 
         self.klen = klen
@@ -528,6 +662,7 @@
 
         self.journalFileName = location+"_jrnl"
         self.journal = {}
+        # If there's a journal file, snarf it into memory.
         if os.path.exists(self.journalFileName):
             j = readFile(self.journalFileName, 1)
             for i in xrange(0, len(j), klen+vlen):
@@ -564,6 +699,10 @@
         finally:
             self._lock.release()
 
+    def keys(self):
+        return map(self._decodeKey,  self.log.keys()) + \
+               map(self._jDecodeKey, self.journal.keys())
+
     def setItem(self, k, v):
         jk = self._jEncodeKey(k)
         jv = self._jEncodeVal(v)
@@ -580,6 +719,9 @@
         finally:
             self._lock.release()
 
+    def delItem(self, k):
+        raise NotImplementedError
+
     def sync(self):
         self._lock.acquire()
         try:
@@ -604,3 +746,19 @@
             os.close(self.journalFile)
         finally:
             self._lock.release()
+
+class BooleanJournaledDBBase(JournaledDBBase):
+    """Specialization of JournaledDBBase that encodes a set of keys, mapping
+       each key to the value '1'."""
+    def __init__(self, location, purpose, klen):
+        JournaledDBBase.__init__(self,location,purpose,klen,0,"1")
+    def _encodeKey(self, k):
+        return binascii.b2a_hex(k)
+    def _jEncodeVal(self, v):
+        return ""
+    def _jDecodeVal(self, k):
+        return 1
+    def _encodeVal(self, v):
+        return "1"
+    def _decodeVal(self, v):
+        return 1

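To use DBBase for non-string keys or values (as its new docstring suggests),
a subclass overrides the encode/decode hooks; a minimal hypothetical sketch
modeled on _FragmentDB from Fragments.py (the "count" purpose string and the
class itself are made up for illustration):

import binascii
import mixminion.Filestore

class CountDB(mixminion.Filestore.DBBase):
    """Hypothetical: maps 20-byte binary IDs to integer counts."""
    def __init__(self, location):
        mixminion.Filestore.DBBase.__init__(self, location, "count")
    def _encodeKey(self, k):
        return binascii.b2a_hex(k)  # the underlying anydbm wants strings
    def _encodeVal(self, v):
        return str(v)
    def _decodeVal(self, v):
        return int(v)
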
Index: benchmark.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/benchmark.py,v
retrieving revision 1.45
retrieving revision 1.46
diff -u -d -r1.45 -r1.46
--- benchmark.py	13 Jul 2003 03:45:34 -0000	1.45
+++ benchmark.py	14 Aug 2003 19:37:24 -0000	1.46
@@ -24,7 +24,7 @@
 import mixminion.server.ServerQueue
 
 from mixminion.BuildMessage import _buildHeader, buildForwardMessage, \
-     compressData, uncompressData, _encodePayload, decodePayload
+     compressData, uncompressData, encodePayloads, decodePayload
 from mixminion.Common import secureDelete, installSIGCHLDHandler, \
      waitForChildren, formatBase64, Lockfile
 from mixminion.Crypto import *
@@ -479,7 +479,8 @@
 #----------------------------------------------------------------------
 def serverQueueTiming():
     print "#================= SERVER QUEUES ====================="
-    Queue = mixminion.server.ServerQueue.Queue
+    import mixminion.Filestore
+    Queue = mixminion.Filestore.MixedStore
     DeliveryQueue = mixminion.server.ServerQueue.DeliveryQueue
     d1 = mix_mktemp()
     q1 = Queue(d1, create=1)
@@ -568,19 +569,19 @@
     print "#=============== END-TO-END ENCODING =================="
     shortP = "hello world"
     prng = AESCounterPRNG()
-    p = _encodePayload(shortP, 0, prng)
+    p = encodePayloads(shortP, 0, prng)[0]
     t = prng.getBytes(20)
     print "Decode short payload", timeit(
         lambda p=p,t=t: decodePayload(p, t), 1000)
 
     k20 = prng.getBytes(20*1024)
-    p = _encodePayload(k20, 0, prng)
+    p = encodePayloads(k20, 0, prng)[0]
     t = prng.getBytes(20)
     print "Decode 20K payload", timeit(
         lambda p=p,t=t: decodePayload(p, t), 1000)
 
     comp = "x"*(20*1024)
-    p = _encodePayload(comp, 0, prng)
+    p = encodePayloads(comp, 0, prng)[0]
     t = prng.getBytes(20)
     def decode(p=p,t=t):
         try:
@@ -1002,7 +1003,6 @@
     if 0:
         testLeaks_FEC()
         return
-
     fecTiming()    
     cryptoTiming()
     rsaTiming()

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.142
retrieving revision 1.143
diff -u -d -r1.142 -r1.143
--- test.py	8 Aug 2003 21:40:40 -0000	1.142
+++ test.py	14 Aug 2003 19:37:25 -0000	1.143
@@ -1648,7 +1648,9 @@
 
         for m in (p.getBytes(3000), p.getBytes(10000), "", "q", "blznerty"):
             for ov in 0, 42-20+16: # encrypted forward overhead
-                pld = BuildMessage._encodePayload(m,ov,p)
+                plds = BuildMessage.encodePayloads(m,ov,p)
+                assert len(plds) == 1
+                pld = plds[0]
                 self.assertEquals(28*1024, len(pld)+ov)
                 comp = compressData(m)
                 self.assertStartsWith(pld[22:], comp)
@@ -2284,7 +2286,7 @@
         # get such a payload is to compress 25K of zeroes.
         nils = "\x00"*(25*1024)
         overcompressed_payload = \
-             BuildMessage._encodePayload(nils, 0, AESCounterPRNG())
+             BuildMessage.encodePayloads(nils, 0, AESCounterPRNG())[0]
         self.failUnlessRaises(CompressedDataTooLong,
              BuildMessage.decodePayload, overcompressed_payload, "X"*20)
 
@@ -2721,7 +2723,8 @@
         # Move the first 30 messages to queue2
         q2h = []
         for h in handles[:30]:
-            nh = queue1.moveMessage(h, queue2)
+            nh = queue2.queueMessage(queue1.messageContents(h))
+            queue1.removeMessage(h)
             q2h.append(nh)
 
         # Look at the messages in queue2, 15 then 30 at a time.
@@ -2792,15 +2795,14 @@
         Store = mixminion.Filestore.StringMetadataStore
 
         queue = Store(d_d, create=1)
-        h1 = queue.queueMessage("abc")
-        queue.setMetadata(h1, [2,3])
+        h1 = queue.queueMessageAndMetadata("abc", [2,3])
         self.assertEquals(readPickled(os.path.join(d_d, "meta_"+h1)), [2,3])
         self.assertEquals(queue.getMetadata(h1), [2,3])
-        h2 = queue.queueMessage("def")
-        queue.setMetadata(h2, [5,6])
-        h3 = queue.queueMessage("ghi")
-        self.assertEquals(queue._metadata_cache, { h1 : [2,3], h2 : [5,6] })
-
+        h2 = queue.queueMessageAndMetadata("def", [5,6])
+        h3 = queue.queueMessageAndMetadata("ghi", None)
+        self.assertEquals(queue._metadata_cache, { h1 : [2,3], h2 : [5,6],
+                                                   h3 : None })
+        os.unlink(os.path.join(d_d, "meta_"+h3))
         queue = Store(d_d, create=0)
         self.assertEquals(queue.getMetadata(h2), [5,6])
         self.assertEquals(queue._metadata_cache, { h2 : [5,6] })