[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Add new Fragments.py module to hold fragmentation and u...
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv32726
Modified Files:
BuildMessage.py Crypto.py Filestore.py benchmark.py test.py
Added Files:
Fragments.py
Log Message:
Add new Fragments.py module to hold fragmentation and unfragmentation code
--- NEW FILE: Fragments.py ---
# Copyright 2002-2003 Nick Mathewson. See LICENSE for licensing information.
# $Id: Fragments.py,v 1.1 2003/08/14 19:37:24 nickm Exp $
"""mixminion.Fragments
Code to fragment and reassemble messages."""
import binascii
import math
import time

import mixminion._minionlib
import mixminion.Filestore
from mixminion.Crypto import getCommonPRNG, whiten, unwhiten
from mixminion.Common import LOG, previousMidnight, MixError, MixFatalError
# NOTE(review): nothing else in this module, as shown, refers to
# MAX_FRAGMENTS_PER_CHUNK; presumably an upper bound on the number of
# fragments in one chunk -- confirm before relying on it.
MAX_FRAGMENTS_PER_CHUNK = 32
# Expansion factor: _FragmentationParams multiplies the number of data
# fragments per chunk (k) by EXP_FACTOR and rounds up to get the total number
# of fragments per chunk (n).
EXP_FACTOR = 1.33333333333
class _FragmentationParams:
    """Computes and holds the parameters used to split a message of a given
       length into fragments.  The message is divided into chunks of 'k'
       data fragments each; every chunk is then expanded into 'n' encoded
       fragments by a FEC object.
    """
    ## Fields:
    # length: length of the message, as given to the constructor.
    # fragCapacity: number of bytes of message data carried by one fragment.
    # k: number of data fragments per chunk (2, 4, 8 or 16).
    # n: total number of fragments per chunk: ceil(EXP_FACTOR * k).
    # nChunks: number of chunks needed to hold the message.
    # chunkSize: number of bytes in one chunk (fragCapacity * k).
    # paddedLen: number of bytes needed to fill every chunk completely.
    # paddingLen: number of padding bytes to add (paddedLen - length).
    # fec: FEC object for (k, n), or None if getFEC() was never called.
    #
    # NOTE(review): ENC_FWD_OVERHEAD, PAYLOAD_LEN, FRAGMENT_PAYLOAD_OVERHEAD
    # and ceilDiv are not among the names this module visibly imports;
    # confirm that they are brought into scope.
    def __init__(self, length, overhead):
        """Compute the parameters for a message of 'length' bytes.
           'overhead' is the number of bytes unavailable in each payload;
           it must be 0 or ENC_FWD_OVERHEAD.
        """
        assert overhead in (0, ENC_FWD_OVERHEAD)
        self.length = length
        self.fragCapacity = PAYLOAD_LEN - FRAGMENT_PAYLOAD_OVERHEAD - overhead
        # minimum number of payloads to hold msg, without fragmentation
        # or padding.
        minFragments = ceilDiv(length, self.fragCapacity)
        # Number of data fragments per chunk: the smallest power of two
        # (at least 2, at most 16) that is >= minFragments.
        self.k = 2
        while self.k < minFragments and self.k < 16:
            self.k *= 2
        # Number of chunks.
        self.nChunks = ceilDiv(minFragments, self.k)
        # Data in a single chunk
        self.chunkSize = self.fragCapacity * self.k
        # Length of data to fill chunks
        self.paddedLen = self.nChunks * self.fragCapacity * self.k
        # Length of padding needed to fill all chunks with data.
        self.paddingLen = self.paddedLen - length
        # Number of total fragments per chunk.  math.ceil returns a float;
        # n is used as a loop bound and as a divisor for fragment indices,
        # so it must be an integer.
        self.n = int(math.ceil(EXP_FACTOR * self.k))
        # FEC object
        self.fec = None

    def getFEC(self):
        """Return the FEC object for this (k, n), fetching it from
           _getFEC on first use."""
        if self.fec is None:
            self.fec = _getFEC(self.k, self.n)
        return self.fec

    def getPosition(self, index):
        """Given the index of a fragment within the whole message, return
           a tuple of (chunk number, position of the fragment within
           that chunk)."""
        chunk, pos = divmod(index, self.n)
        return chunk, pos

    def getFragments(self, s, paddingPRNG=None):
        """Given a string 's' of exactly self.length bytes, whiten it, pad
           it to fill all chunks, and return a list of nChunks*n encoded
           fragments.  The fragment at list index i is at position
           getPosition(i).

           paddingPRNG: generator used for the padding bytes; defaults to
              getCommonPRNG().
        """
        if paddingPRNG is None:
            paddingPRNG = getCommonPRNG()
        fec = self.getFEC()
        assert len(s) == self.length
        s = whiten(s)
        s += paddingPRNG.getBytes(self.paddingLen)
        assert len(s) == self.paddedLen
        chunks = []
        for i in xrange(self.nChunks):
            chunks.append( s[i*self.chunkSize:(i+1)*self.chunkSize] )
        del s
        fragments = []
        for i in xrange(self.nChunks):
            blocks = []
            for j in xrange(self.k):
                blocks.append( chunks[i][j*self.fragCapacity:
                                         (j+1)*self.fragCapacity] )
            # Drop our reference to the chunk once it has been split up.
            chunks[i] = None
            for j in xrange(self.n):
                fragments.append( fec.encode(j, blocks) )
        return fragments

# Public alias: BuildMessage.encodePayloads refers to this class as
# mixminion.Fragments.FragmentationParams.
FragmentationParams = _FragmentationParams
# ======================================================================
#DOCDOC this entire section
class MismatchedFragment(Exception):
    """Raised by MessageState when a fragment or chunk disagrees with what
       is already known about its message (size, hash or overhead), or
       when its slot is already occupied."""
class UnneededFragment(Exception):
    """Raised by MessageState.addFragment when a fragment belongs to a
       chunk that has already been reconstructed."""
class _FragmentMetadata:
    """Describes a single stored fragment or reconstructed chunk of a
       fragmented message."""
    ## Fields:
    # messageid: ID of the message this fragment/chunk belongs to.
    # hash, size, overhead: values that MessageState requires to be the
    #    same for every fragment and chunk of one message.
    # idx: index of the fragment within the message.
    # isChunk: true iff this describes a reconstructed chunk.
    # chunkNum: number of the chunk (None for fragments built by
    #    FragmentPool._getPacketMetadata).
    # insertedDate: date value recorded when the fragment was stored.
    def __init__(self, messageid, hash, idx, size, isChunk, chunkNum, overhead,
                 insertedDate):
        """Create a new _FragmentMetadata; each argument is stored in the
           field of the same name."""
        self.messageid = messageid
        self.hash = hash
        self.idx = idx
        self.size = size
        self.isChunk = isChunk
        self.chunkNum = chunkNum
        self.overhead = overhead
        self.insertedDate = insertedDate

    def __getstate__(self):
        """Return the pickled representation: a tuple tagged "V0"."""
        # 'overhead' must be saved too: MessageState compares it for every
        # fragment and chunk it is given.
        return ("V0", self.messageid, self.hash, self.idx, self.size,
                self.isChunk, self.chunkNum, self.overhead,
                self.insertedDate)

    def __setstate__(self, o):
        """Restore this object from a tuple made by __getstate__.  Raises
           MixFatalError if the tuple is not in a recognized format."""
        if o[0] == 'V0' and len(o) == 9:
            (_, self.messageid, self.hash, self.idx, self.size,
             self.isChunk, self.chunkNum, self.overhead,
             self.insertedDate) = o
        else:
            raise MixFatalError("Unrecognized fragment state")
class MessageState:
    """Tracks which fragments and reconstructed chunks are held for a
       single fragmented message."""
    ## Fields:
    # messageid, hash, overhead: values that every fragment and chunk of
    #    this message must match.
    # params: _FragmentationParams for this message's length and overhead.
    # chunks: map from chunkno to (handle, fragmentmeta) for every
    #    reconstructed chunk.
    # fragmentsByChunk: list, indexed by chunkno, of maps from
    #    index-within-chunk to (handle, fragmentmeta).
    # readyChunks: map from chunkno to 1 for every chunk that has at least
    #    params.k fragments and has not been reconstructed.
    def __init__(self, messageid, hash, length, overhead):
        """Create an empty MessageState for the message 'messageid' with
           the given hash, length and overhead."""
        self.messageid = messageid
        self.hash = hash
        self.overhead = overhead
        # chunkno -> handle,fragmentmeta
        self.chunks = {}
        # chunkno -> idxwithinchunk -> (handle,fragmentmeta)
        self.fragmentsByChunk = []
        self.params = _FragmentationParams(length, overhead)
        for i in xrange(self.params.nChunks):
            self.fragmentsByChunk.append({})
        # chunkset: ready chunk num -> 1
        self.readyChunks = {}

    def isDone(self):
        """Return true iff every chunk of the message has been
           reconstructed."""
        return len(self.chunks) == self.params.nChunks

    def getChunkHandles(self):
        """Return the handles of all chunks, in chunk order.  Raises
           KeyError unless isDone() is true."""
        return [ self.chunks[i][0] for i in xrange(self.params.nChunks) ]

    def addChunk(self, h, fm):
        """Record a reconstructed chunk with handle 'h' and fragment
           metadata 'fm'.  Raises MismatchedFragment if the chunk does not
           match this message or if we already have that chunk."""
        assert fm.isChunk
        assert fm.messageid == self.messageid
        if (fm.size != self.params.length or
            fm.hash != self.hash or
            fm.overhead != self.overhead or
            self.chunks.has_key(fm.chunkNum)):
            raise MismatchedFragment
        self.chunks[fm.chunkNum] = (h,fm)
        # Once we hold the chunk itself there is nothing left to decode,
        # however many of its fragments we have.
        if self.readyChunks.has_key(fm.chunkNum):
            del self.readyChunks[fm.chunkNum]

    def addFragment(self, h, fm):
        """Record a fragment with handle 'h' and fragment metadata 'fm'.
           Raises MismatchedFragment if the fragment does not match this
           message, lies outside it, or duplicates a fragment we already
           hold.  Raises UnneededFragment if its chunk has already been
           reconstructed."""
        assert fm.messageid == self.messageid
        if (fm.hash != self.hash or
            fm.size != self.params.length or
            fm.overhead != self.overhead):
            raise MismatchedFragment
        chunkNum, pos = self.params.getPosition(fm.idx)
        # An index beyond the last chunk cannot belong to a message of
        # this length; reject it rather than fail on the list lookup.
        if chunkNum < 0 or chunkNum >= self.params.nChunks:
            raise MismatchedFragment
        if self.chunks.has_key(chunkNum):
            raise UnneededFragment
        if self.fragmentsByChunk[chunkNum].has_key(pos):
            raise MismatchedFragment
        self.fragmentsByChunk[chunkNum][pos] = (h, fm)
        if len(self.fragmentsByChunk[chunkNum]) >= self.params.k:
            self.readyChunks[chunkNum] = 1

    def hasReadyChunks(self):
        """Return true iff some chunk has enough fragments to be
           reconstructed."""
        return len(self.readyChunks) != 0

    def getReadyChunks(self):
        """Return a list of (chunkno, [(handle, fragmentmeta), ...]) for
           every ready chunk, with exactly params.k fragments listed for
           each."""
        r = []
        for chunkno in self.readyChunks.keys():
            ch = self.fragmentsByChunk[chunkno].values()[:self.params.k]
            r.append( (chunkno, ch) )
        return r
class _FragmentDB(mixminion.Filestore.DBBase):
    """Database mapping message IDs to a (status, time) pair, where status
       is "COMPLETED" or "REJECTED"."""
    def __init__(self, location):
        """Open the database stored at 'location'."""
        mixminion.Filestore.DBBase.__init__(self, location, "fragment")

    def markStatus(self, msgid, status, today):
        """Record 'status' ("COMPLETED" or "REJECTED") for the message
           'msgid', along with the time 'today'.  If 'today' is None, the
           current time is used."""
        assert status in ("COMPLETED", "REJECTED")
        if today is None:
            today = time.time()
        self[msgid] = (status, today)

    def getStatusAndTime(self, msgid):
        """Return the (status, time) pair recorded for 'msgid', or None if
           there is none."""
        return self.get(msgid, None)

    def _encodeKey(self, k):
        """Encode a message ID as a hexadecimal string."""
        return binascii.b2a_hex(k)

    def _encodeVal(self, v):
        """Encode a (status, time) pair as a one-letter status code, a
           dash, and the time in whole seconds."""
        status, tm = v
        # Store the time as an integer: _decodeVal parses it with int(),
        # which cannot read the string form of a float.
        return "%s-%s"%(
            {"COMPLETED":"C", "REJECTED":"R"}[status], str(int(tm)))

    def _decodeVal(self, v):
        """Decode a string made by _encodeVal into a (status, time)
           pair."""
        status = {"C":"COMPLETED", "R":"REJECTED"}[v[0]]
        # Skip the status letter and the dash.
        tm = int(v[2:])
        return status, tm
class FragmentPool:
    """Holds the fragments and reconstructed chunks of incoming fragmented
       messages on disk, and tracks the state of every message."""
    ## Fields:
    # store: StringMetadataStore holding the body of every fragment and
    #    chunk, each with a _FragmentMetadata object as its metadata.
    # log: _FragmentDB recording the status of rejected message IDs.
    # states: map from message ID to MessageState.
    def __init__(self, dir):
        """Open (creating if necessary) the pool stored in the directory
           'dir', and load the state of everything already in it."""
        self.store = mixminion.Filestore.StringMetadataStore(dir,create=1,
                                                             scrub=1)
        self.log = _FragmentDB(dir+"_db")
        self.rescan()

    def getState(self, fm):
        """Return the MessageState for the message that the fragment
           metadata 'fm' belongs to, creating one if needed."""
        try:
            return self.states[fm.messageid]
        except KeyError:
            state = MessageState(messageid=fm.messageid,
                                 hash=fm.hash,
                                 length=fm.size,
                                 overhead=fm.overhead)
            self.states[fm.messageid] = state
            return state

    def rescan(self):
        """Rebuild self.states from the contents of the store, removing
           unneeded fragments and rejecting every message that has
           inconsistent fragments."""
        # loadAllMetadata needs a function to supply metadata for entries
        # that have none.  We cannot invent any, so we supply None and
        # discard those entries below.
        self.store.loadAllMetadata(lambda h: None)
        meta = self.store._metadata_cache
        self.states = {}
        badMessageIDs = {}
        unneededHandles = []
        orphanHandles = []
        for h, fm in meta.items():
            if fm is None:
                orphanHandles.append(h)
                continue
            mid = fm.messageid
            if badMessageIDs.has_key(mid):
                continue
            try:
                state = self.getState(fm)
                if fm.isChunk:
                    state.addChunk(h, fm)
                else:
                    state.addFragment(h, fm)
            except MismatchedFragment:
                badMessageIDs[mid] = 1
            except UnneededFragment:
                unneededHandles.append(h)

        for h in orphanHandles:
            LOG.warn("Removing fragment %s with no metadata", h)
            self.store.removeMessage(h)

        for h in unneededHandles:
            fm = meta[h]
            LOG.debug("Removing unneeded fragment %s from message ID %r",
                      fm.idx, fm.messageid)
            self.store.removeMessage(h)

        self._abortMessageIDs(badMessageIDs)

    def _abortMessageIDs(self, messageIDSet, today=None):
        """Mark every message ID that is a key of 'messageIDSet' as
           REJECTED, and remove all of its fragments, chunks and state.
           'today' is the time to record; it defaults to the current
           time."""
        if not messageIDSet:
            return
        if today is None:
            today = previousMidnight(time.time())
        else:
            today = previousMidnight(today)
        LOG.debug("Removing bogus messages by IDs: %s", messageIDSet.keys())
        for mid in messageIDSet.keys():
            self.log.markStatus(mid, "REJECTED", today)
            # The state would refer to handles that we are about to remove.
            if self.states.has_key(mid):
                del self.states[mid]
        for h, fm in self.store._metadata_cache.items():
            if fm is not None and messageIDSet.has_key(fm.messageid):
                self.store.removeMessage(h)

    def _getPacketMetadata(self, fragmentPacket, now=None):
        """Return a new _FragmentMetadata describing 'fragmentPacket'.
           'now' is the time of insertion; it defaults to the current
           time."""
        if now is None:
            now = time.time()
        return _FragmentMetadata(messageid=fragmentPacket.msgID,
                                 idx=fragmentPacket.index,
                                 hash=fragmentPacket.hash,
                                 size=fragmentPacket.msgLen,
                                 isChunk=0,
                                 chunkNum=None,
                                 overhead=fragmentPacket.getOverhead(),
                                 insertedDate=previousMidnight(now))

    def addFragment(self, fragmentPacket, now=None):
        """Store 'fragmentPacket' and record it in the state of its
           message.  If it contradicts what we know about the message, the
           whole message is rejected; if it is not needed, it is
           dropped."""
        if now is None:
            now = time.time()
        meta = self._getPacketMetadata(fragmentPacket, now)
        state = self.getState(meta)
        # MessageState.addFragment needs the handle, so the fragment has to
        # be stored before the state can accept or refuse it.
        h = self.store.queueMessageAndMetadata(fragmentPacket.data, meta)
        try:
            state.addFragment(h, meta)
        except MismatchedFragment:
            # Remove the other fragments and mark the msgid as bad.
            # NOTE(review): this assumes the store's metadata cache is
            # write-through, so that the fragment just stored is removed
            # as well -- confirm.
            self._abortMessageIDs({ meta.messageid : 1}, now)
        except UnneededFragment:
            LOG.debug("Dropping unneeded fragment %s of message %r",
                      fragmentPacket.index, fragmentPacket.msgID)
            self.store.removeMessage(h)

    def getReadyMessage(self, msgid):
        """Return the reassembled message 'msgid', or None if we do not
           have all of its chunks."""
        s = self.states.get(msgid)
        if not s or not s.isDone():
            return None
        hs = s.getChunkHandles()
        msg = "".join([self.store.messageContents(h) for h in hs])
        # _FragmentationParams.getFragments whitens the message and then
        # appends padding; undo both, in reverse order.
        return unwhiten(msg[:s.params.length])

    def deleteMessage(self, msgid):
        """Remove every chunk of the completed message 'msgid', and its
           state.  Does nothing if the message is not complete."""
        s = self.states.get(msgid)
        if not s or not s.isDone():
            return None
        hs = s.getChunkHandles()
        for h in hs:
            self.store.removeMessage(h)
        # The state refers only to handles that no longer exist.
        del self.states[msgid]

    def getReadyMessages(self):
        """Return a list of the IDs of all completely received
           messages."""
        return [ msgid
                 for msgid,state in self.states.items()
                 if state.isDone() ]

    def unchunkMessages(self):
        """For every chunk that has enough fragments, decode the chunk,
           store it, and remove the fragments it was built from."""
        for msgid, state in self.states.items():
            if not state.hasReadyChunks():
                continue
            for chunkno, lst in state.getReadyChunks():
                vs = []
                minDate = min([fm.insertedDate for h,fm in lst])
                for h,fm in lst:
                    vs.append((state.params.getPosition(fm.idx)[1],
                               self.store.messageContents(h)))
                chunkText = state.params.getFEC().decode(vs)
                del vs
                # 'size' must be the length of the whole message: that is
                # what MessageState.addChunk compares it against.
                fm2 = _FragmentMetadata(messageid=state.messageid,
                                        hash=state.hash,
                                        idx=None,
                                        size=state.params.length,
                                        isChunk=1,
                                        chunkNum=chunkno,
                                        overhead=state.overhead,
                                        insertedDate=minDate)
                h2 = self.store.queueMessage(chunkText)
                self.store.setMetadata(h2, fm2)
                # Every fragment of this chunk is now redundant, whether or
                # not it was used for decoding.
                oldFragments = state.fragmentsByChunk[chunkno].values()
                state.addChunk(h2, fm2)
                state.fragmentsByChunk[chunkno] = {}
                if state.readyChunks.has_key(chunkno):
                    del state.readyChunks[chunkno]
                for h,fm in oldFragments:
                    self.store.removeMessage(h)
# ======================================================================
# Map from (k,n) to the FEC object generated for those parameters.
_fectab = {}
def _getFEC(k,n):
    """Return the FEC object for the parameters (k,n).  The object is
       generated with mixminion._minionlib.FEC_generate the first time it
       is asked for, and remembered in _fectab afterwards.

       Note race condition: the table is read and filled in without any
       locking."""
    key = (k,n)
    if not _fectab.has_key(key):
        _fectab[key] = mixminion._minionlib.FEC_generate(k,n)
    return _fectab[key]
Index: BuildMessage.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/BuildMessage.py,v
retrieving revision 1.52
retrieving revision 1.53
diff -u -d -r1.52 -r1.53
--- BuildMessage.py 8 Aug 2003 21:42:46 -0000 1.52
+++ BuildMessage.py 14 Aug 2003 19:37:24 -0000 1.53
@@ -11,6 +11,7 @@
import types
import mixminion.Crypto as Crypto
+import mixminion.Fragments
from mixminion.Packet import *
from mixminion.Common import MixError, MixFatalError, LOG, UIError
import mixminion._minionlib
@@ -18,15 +19,78 @@
if sys.version_info[:3] < (2,2,0):
import mixminion._zlibutil as zlibutil
-__all__ = ['buildForwardMessage', 'buildEncryptedMessage', 'buildReplyMessage',
- 'buildReplyBlock', 'checkPathLength', 'decodePayload' ]
+__all__ = ['buildForwardMessage', 'buildEncryptedMessage',
+ 'buildReplyMessage', 'buildReplyBlock', 'checkPathLength',
+ 'encodePayloads', 'decodePayload' ]
+
+def encodePayloads(message, overhead, paddingPRNG):
+ """Given a message, compress it, fragment it into individual payloads,
+ and add extra fields (size, hash, etc) as appropriate. Return a list
+ of strings, each of which is a message payload suitable for use in
+ build*Message.
+
+ payload: the initial payload
+ overhead: number of bytes to omit from each payload,
+ given the type of the message encoding.
+ (0 or ENC_FWD_OVERHEAD)
+ paddingPRNG: generator for padding.
+
+ Note: If multiple strings are returned, be sure to shuffle them
+ before transmitting them to the network.
+ """
+ assert overhead in (0, ENC_FWD_OVERHEAD)
+ if paddingPRNG is None:
+ paddingPRNG = Crypto.getCommonPRNG()
+ origLength = len(message)
+ payload = compressData(message)
+ length = len(payload)
+
+ if length > 1024 and length*20 <= origLength:
+ LOG.warn("Message is very compressible and will look like a zlib bomb")
+
+ paddingLen = PAYLOAD_LEN - SINGLETON_PAYLOAD_OVERHEAD - overhead - length
+
+ # If the compressed payload fits in 28K, we're set.
+ if paddingLen >= 0:
+ # We pad the payload, and construct a new SingletonPayload,
+ # including this payload's size and checksum.
+ payload += paddingPRNG.getBytes(paddingLen)
+ p = SingletonPayload(length, None, payload)
+ p.computeHash()
+ return [ p.pack() ]
+
+ # DOCDOC
+ messageid = getCommonPRNG().getBytes(20)
+ p = mixminion.Fragments.FragmentationParams(len(payload), overhead)
+ rawFragments = p.getFragments(payload)
+ fragments = []
+ for i in xrange(len(rawFragments)):
+ pyld = FragmentPayload(i, None, messageid, p.length, rawFragments[i])
+ pyld.computeHash()
+ fragments.append(pyld.pack())
+ rawFragments[i] = None
+ return fragments
def buildForwardMessage(payload, exitType, exitInfo, path1, path2,
paddingPRNG=None):
+ # Compress, pad, and checksum the payload.
+ if payload is not None and exitType != DROP_TYPE:
+ payloads = encodePayloads(payload, 0, paddingPRNG)
+ assert len(payloads) == 1
+ payload = payloads[0]
+ LOG.debug("Encoding forward message for %s-byte payload",len(payload))
+ else:
+ payload = (paddingPRNG or Crypto.getCommonPRNG()).getBytes(PAYLOAD_LEN)
+ LOG.debug("Generating DROP message with %s bytes", PAYLOAD_LEN)
+
+ return _buildForwardMessage(payload, exitType, exitInfo, path1, path2,
+ paddingPRNG)
+
+def _buildForwardMessage(payload, exitType, exitInfo, path1, path2,
+ paddingPRNG=None):
"""Construct a forward message.
- payload: The payload to deliver. Must compress to under 28K-22b.
- If it does not, MixError is raised. If the payload is
- None, 28K of random data is sent.
+ payload: The payload to deliver. Must be exactly 28K. If the
+ payload is None, 28K of random data is sent.
exitType: The routing type for the final node. (2 bytes, >=0x100)
exitInfo: The routing info for the final node, not including tag.
path1: Sequence of ServerInfo objects for the first leg of the path
@@ -46,15 +110,8 @@
suppressTag = 0
if exitType == DROP_TYPE:
suppressTag = 1
- payload = None
- # Compress, pad, and checksum the payload.
- if payload is not None:
- payload = _encodePayload(payload, 0, paddingPRNG)
- LOG.debug("Encoding forward message for %s-byte payload",len(payload))
- else:
- payload = paddingPRNG.getBytes(PAYLOAD_LEN)
- LOG.debug("Generating DROP message with %s bytes", PAYLOAD_LEN)
+ assert len(payload) == PAYLOAD_LEN
LOG.debug(" Using path %s:%s",
",".join([s.getNickname() for s in path1]),
@@ -70,10 +127,17 @@
def buildEncryptedForwardMessage(payload, exitType, exitInfo, path1, path2,
key, paddingPRNG=None, secretRNG=None):
+ payloads = encodePayloads(payload, ENC_FWD_OVERHEAD, paddingPRNG)
+ assert len(payloads) == 1
+ return _buildEncryptedForwardMessage(payloads[0], exitType, exitInfo,
+ path1, path2, key, paddingPRNG,
+ secretRNG)
+
+def _buildEncryptedForwardMessage(payload, exitType, exitInfo, path1, path2,
+ key, paddingPRNG=None, secretRNG=None):
"""Construct a forward message encrypted with the public key of a
given user.
- payload: The payload to deliver. Must compress to under 28K-60b.
- If it does not, MixError is raised.
+ payload: The payload to deliver. Must be 28K-42b long.
exitType: The routing type for the final node. (2 bytes, >=0x100)
exitInfo: The routing info for the final node, not including tag.
path1: Sequence of ServerInfo objects for the first leg of the path
@@ -93,10 +157,9 @@
[s.getNickname() for s in path2])
LOG.debug(" Delivering to %04x:%r", exitType, exitInfo)
- # Compress, pad, and checksum the payload.
# (For encrypted-forward messages, we have overhead for OAEP padding
# and the session key, but we save 20 bytes by spilling into the tag.)
- payload = _encodePayload(payload, ENC_FWD_OVERHEAD, paddingPRNG)
+ assert len(payload) == PAYLOAD_LEN - ENC_FWD_OVERHEAD
# Generate the session key, and prepend it to the payload.
sessionKey = secretRNG.getBytes(SECRET_LEN)
@@ -131,8 +194,14 @@
return _buildMessage(payload, exitType, exitInfo, path1, path2,paddingPRNG)
def buildReplyMessage(payload, path1, replyBlock, paddingPRNG=None):
+ payloads = encodePayloads(payload, 0, paddingPRNG)
+ assert len(payloads) == 1
+ return _buildReplyMessage(payloads[0], path1, replyBlock, paddingPRNG)
+
+def _buildReplyMessage(payload, path1, replyBlock, paddingPRNG=None):
"""Build a message using a reply block. 'path1' is a sequence of
- ServerInfo for the nodes on the first leg of the path.
+ ServerInfo for the nodes on the first leg of the path. 'payload'
+ must be exactly 28K long.
"""
if paddingPRNG is None:
paddingPRNG = Crypto.getCommonPRNG()
@@ -141,8 +210,7 @@
len(payload))
LOG.debug(" Using path %s/??",[s.getNickname() for s in path1])
- # Compress, pad, and checksum the payload.
- payload = _encodePayload(payload, 0, paddingPRNG)
+ assert len(payload) == PAYLOAD_LEN
# Encrypt the payload so that it won't appear as plaintext to the
# crossover note. (We use 'decrypt' so that the message recipient can
@@ -600,125 +668,6 @@
#----------------------------------------------------------------------
# Payload-related helpers
-MAX_FRAGMENTS_PER_CHUNK = 32
-EXP_FACTOR = 1.33333333333
-
-def _encodePayloads(message, overhead, paddingPRNG):
- """DOCDOC"""
- assert overhead in (0, ENC_FWD_OVERHEAD)
- origLength = len(message)
- payload = compress(message)
- length = len(payload)
-
- if length > 1024 and length*20 <= origLength:
- LOG.warn("Message is very compressible and will look like a zlib bomb")
-
- paddingLen = PAYLOAD_LEN - SINGLETON_PAYLOAD_OVERHEAD - overhead - length
-
- # If the compressed payload fits in 28K, we're set.
- if paddingLen >= 0:
- # We pad the payload, and construct a new SingletonPayload,
- # including this payload's size and checksum.
- payload += paddingPRNG.getBytes(paddingLen)
- p = SingletonPayload(length, None, payload)
- p.computeHash()
- return [ p.pack() ]
-
- # DOCDOC
- payload = whiten(payload)
- p = _FragmentationParams(len(payload), overhead)
-
- payload += paddingPRNG.getBytes(p.paddingLen)
- assert len(payload) == p.paddedLen
- chunks = []
- for i in xrange(p.nChunks):
- chunk[i] = payload[i*p.chunkSize:(i+1)*p.chunkSize]
- del payload
- messageid = getCommonPRNG().getBytes(20)
-
- idx = 0
- fragments = []
- for i in xrange(p.nChunks):
- blocks = []
- for j in xrange(p.k):
- blocks[j] = chunks[i][j*p.fragCapacity:(j+1)*p.fragCapacity]
- chunks[i] = None
- for j in xrange(p.n):
- frag = p.fec.encode(j, blocks)
- pyld = FragmentPayload(idx, None, messageid, p.length, frag)
- pyld.computeHash()
- fragments.append(pyld.pack())
- idx += 1
- return fragments
-
-class _FragmentationParams:
- """DOCDOC"""
- ## Fields:
- # k, n, length, fec, chunkSize, fragmentCapacity, dataFragments,
- # totalFragments, paddingLen, paddedLen
- def __init__(self, length, overhead):
- assert overhead in (0, ENC_FWD_OVERHEAD)
- self.length = length
- self.fragCapacity = PAYLOAD_LEN - FRAGMENT_PAYLOAD_OVERHEAD - overhead
- # minimum number of payloads to hold msg, without fragmentation
- # or padding.
- minFragments = ceilDiv(length, self.fragCapacity)
- # Number of data fragments per chunk.
- self.k = 2
- while k < minFragments and k < 16:
- self.k *= 2
- # Number of chunks.
- self.nChunks = ceilDiv(minFragments, k)
- # Data in a single chunk
- self.chunkSize = self.fragCapacity * self.k
- # Length of data to fill chunks
- self.paddedLen = self.nChunks * self.fragCapacity * self.k
- # Length of padding needed to fill all chunks with data.
- self.paddingLen = self.paddedLen - length
- # Number of total fragments per chunk.
- self.n = math.ceil(EXP_FACTOR * k)
- # FEC object
- self.fec = None
-
- def getFEC(self):
- if self.fec is None:
- self.fec = _getFEC(self.k, self.n)
- return self.fec
-
- def getPosition(self, index):
- """DOCDOC"""
- chunk, pos = divmod(index, self.n)
- return chunk, pos
-
-def _encodePayload(payload, overhead, paddingPRNG):
- """Helper: compress a payload, pad it, and add extra fields (size and hash)
- payload: the initial payload
- overhead: number of bytes to omit from result
- (0 or ENC_FWD_OVERHEAD)
- paddingPRNG: generator for padding.
-
- BUG: This should eventually support K-of-N.
- """
- assert overhead in (0, ENC_FWD_OVERHEAD)
-
- # Compress the data, and figure out how much padding we'll need.
- origLength = len(payload)
- payload = compressData(payload)
- length = len(payload)
-
- if length > 1024 and length*20 <= origLength:
- LOG.warn("Message is very compressible and will look like a zlib bomb")
-
- paddingLen = PAYLOAD_LEN - SINGLETON_PAYLOAD_OVERHEAD - overhead - length
-
- # If the compressed payload doesn't fit in 28K, then we need to bail out.
- if paddingLen < 0:
- raise MixError("Payload too long for singleton message")
-
- # Otherwise, we pad the payload, and construct a new SingletonPayload,
- # including this payload's size and checksum.
- payload += paddingPRNG.getBytes(paddingLen)
- return SingletonPayload(length, Crypto.sha1(payload), payload).pack()
def _getRandomTag(rng):
"Helper: Return a 20-byte string with the MSB of byte 0 set to 0."
@@ -782,268 +731,3 @@
return routing, sizes, totalSize
-# ======================================================================
-
-class MismatchedFragment(Exception):
- pass
-
-class UnneededFragment(Exception):
- pass
-
-class _FragmentMetadata:
- def __init__(self, messageid, hash, idx, size, isChunk, chunkNum, overhead,
- insertedDate):
- self.messageid = messageid
- self.hash = hash
- self.idx = idx
- self.size = size
- self.isChunk = isChunk
- self.chunkNum = chunkNum
- self.overhead = overhead
- self.insertedDate = insertedDate
-
- def __getstate__(self):
- return ("V0", self.messageid, self.hash, self.idx, self.size,
- self.isChunk, self.chunkNum, self.insertedDate)
-
- def __setstate__(self, o):
- if state[0] == 'V0':
- (_, self.messageid, self.hash, self.idx, self.size,
- self.isChunk, self.chunkNum, self.insertedDate) = state
- else:
- raise MixFatalError("Unrecognized fragment state")
-
-class MessageState:
- def __init__(self, messageid, hash, length, overhead):
- self.messageid = messageid
- self.hash = hash
- self.overhead = overhead
- # chunkno -> handle,fragmentmeta
- self.chunks = {}
- # chunkno -> idxwithinchunk -> (handle,fragmentmeta)
- self.fragmentsByChunk = []
- self.params = _FragmentationParams(length, overhead)
- for i in xrange(self.params.nChunks):
- self.fragmentsByChunk.append({})
- # chunkset: ready chunk num -> 1
- self.readyChunks = {}
-
- def isDone(self):
- return len(self.chunks) == self.params.nChunks
-
- def getChunkHandles(self):
- return [ self.chunks[i][0] for i in xrange(self.params.nChunks) ]
-
- def addChunk(self, h, fm):
- # h is handle
- # fm is fragmentmetadata
- assert fm.isChunk
- assert fm.messageid == self.messageid
- if (fm.size != self.params.length or
- fm.hash != self.hash or
- fm.overhead != self.overhead or
- self.chunks.has_key(fm.chunkNum)):
- raise MismatchedFragment
-
- self.chunks[fm.chunkNum] = (h,fm)
-
- def addFragment(self, h, fm):
- # h is handle
- # fm is fragmentmetadata
- assert fm.messageid == self.messageid
-
- if (fm.hash != self.hash or
- fm.size != self.params.length or
- fm.overhead != self.overhead):
- raise MismatchedFragment
-
- chunkNum, pos = self.params.getPosition(idx)
-
- if self.chunks.has_key(chunkNum):
- raise UnneededFragment
-
- if self.fragmentsByChunk[chunkNum].has_key(pos):
- raise MismatchedFragment
-
- self.fragmentsByChunk[chunkNum][pos] = (h, fm)
-
- if len(self.fragmentsByChunk(chunkNum)) >= self.params.k:
- self.readyChunks[chunkNum] = 1
-
- def hasReadyChunks(self):
- return len(self.readyChunks) != 0
-
- def getReadyChunks(self):
- """DOCDOC"""
- # return list of [ (chunkno, [(h, fm)...]) )
- r = []
- for chunkno in self.readyChunks.keys():
- ch = self.fragmentsByChunk[chunkno].values()[:self.params.k]
- r.append( (chunkno, ch) )
- return r
-
-class _FragmentDB(mixminion.Filestore.DBBase):
- def __init__(self, location):
- mixminion.Filestore.DBBase.__init__(self, location, "fragment")
- def markStatus(self, msgid, status, today):
- assert status in ("COMPLETED", "REJECTED")
- if now is None:
- now = time.time()
- self[msgid] = (status, today)
- def getStatusAndTime(self, msgid):
- return self.get(msgid, None)
- def _encodeKey(self, k):
- return binascii.b2a_hex(k)
- def _encodeVal(self, v):
- status, tm = v
- return "%s-%s"%(
- {"COMPLETED":"C", "REJECTED":"R"}[status], str(tm))
- def _decodeVal(self, v):
- status = {"C":"COMPLETED", "R":"REJECTED"}[v[0]]
- tm = int(tm[2:])
- return status, tm
-
-class FragmentPool:
- """DOCDOC"""
- ##
- # messages : map from
- def __init__(self, dir):
- self.store = mixminion.Filestore.StringMetadataStore(dir,create=1,
- scrub=1)
- self.log = _FragmentDB(dir+"_db")
- self.rescan()
-
- def getState(self, fm):
- try:
- return self.states[fm.messageid]
- except KeyError:
- state = MessageState(messageid=fm.messageid,
- hash=fm.hash,
- length=fm.size,
- overhead=fm.overhead)
- self.states[fm.messageid] = state
- return state
-
- def rescan(self):
- self.store.loadAllMetadata()
- meta = self.store._metadata_cache
- self.states = states = {}
- badMessageIDs = {}
- unneededHandles = []
- for h, fm in meta.items():
- try:
- mid = fm.messageid
- if badMessageIDs.has_key(mid):
- continue
- state = self.getState(fm)
- if fm.isChunk:
- state.addChunk(h, fm)
- else:
- state.addFragment(h, fm)
- except MismatchedFragment:
- badMessageIDs[mid] = 1
- except UnneededFragment:
- unneededHandles.append(h)
-
- for h in unneededHandles:
- fm = meta[h]
- LOG.debug("Removing unneeded fragment %s from message ID %r",
- fm.idx, fm.messageid)
- self.removeMessage(h)
-
- self._abortMessageIDs(badMessageIDs, today)
-
- def _abortMessageIDs(self, messageIDSet, today=None):
- if today is None:
- today = previousMidnight(time.time())
- LOG.debug("Removing bogus messages by IDs: %s", messageIDSet.keys())
- for mid in messageIDSet.keys():
- self.markStatus(mid, "REJECTED", today)
- for h, fm in self._metadata_cache.items():
- if messageIDSet.has_key(fm.messageid):
- self.removeMessage(h)
-
- def _getPacketMetadata(self, fragmentPacket):
- return _FragmentMetadata(messageid=fragmentPacket.msgID,
- idx=fragmentPacket.index,
- hash=fragmentPacket.hash,
- size=fragmentPacket.msgLen,
- isChunk=0,
- chunkNum=None,
- overhead=fragmentPacket.getOverhead(),
- insertedDate=previousMidnight(now))
-
- def addFragment(self, fragmentPacket, now=None):
- if now is None:
- now = time.time()
- today = previousMidnight(now)
-
- meta = self._getFragmentMetadata(fragmentPacket)
- h = self.store.queueMessage(fragmentPacket.data)
- self.store.setMetadata(h, meta)
-
- state = self.getState(fm)
- try:
- state.addFragment(fragmentPacket)
- except MismatchedFragment:
- self._abortMessageIDs({ meta.id
- self.removeMessage(h)
- # XXXX remove other fragments, mark msgid as bad.
- except UnneededFragment:
- LOG.debug("Dropping unneeded fragment %s of message %r",
- fragmentPacket.idx, fragmentPacket.msgID)
- self.removeMessage(h)
-
- def getReadyMessage(self, msgid):
- s = self.states.get(msgid)
- if not s or not s.isDone():
- return None
-
- hs = s.getChunkHandles()
- return "".join([self.state.getMessage(h) for h in hs])
-
- def deleteMessage(self, msgid):
- s = self.states.get(msgid)
- if not s or not s.isDone():
- return None
-
- hs = s.getChunkHandles()
- for h in hs:
- self.store.removeMessage(h)
-
- def getReadyMessages(self):
- return [ msgid
- for msgid,state in self.states.items()
- if state.isDone() ]
-
- def unchunkMessages(self):
- for msgid, state in self.states.items():
- if not state.hasReadyChunks():
- continue
- for chunkno, lst in state.getReadyChunks():
- vs = []
- minDate = min([fm.insertedDate for h,fm in lst])
- for h,fm in lst:
- vs.append((state.getPos(fm.index)[1],
- self.store.getMessage(h)))
- chunkText = self.store.params.getFEC().decode(vs)
- fm2 = _FragmentMetadata(state.mesageid, state.hash,
- state.idx, 1, chunkno, state.overhead,
- minDate)
- h2 = self.store.queueMessage(chunkText)
- self.store.setMetadata(h2, fm2)
- for h,fm in lst:
- self.store.removeMessage(h)
-
-# ======================================================================
-
-_fectab = {}
-
-def _getFEC(k,n):
- """DOCDOC: Note race condition """
- try:
- return _fectab[(k,n)]
- except KeyError:
- f = mixminion._minionlib.FEC_generate(k,n)
- _fectab[(k,n)] = f
- return f
Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.51
retrieving revision 1.52
diff -u -d -r1.51 -r1.52
--- Crypto.py 8 Aug 2003 21:42:47 -0000 1.51
+++ Crypto.py 14 Aug 2003 19:37:24 -0000 1.52
@@ -184,12 +184,16 @@
return left + right
def whiten(s):
- """DOCDOC"""
+ """Return a whitened version of a string 's', using the whitening
+ algorithm from 'E2E-spec.txt'.
+
+ The function 'unwhiten' inverts 'whiten', but if any portion of
+ whiten(s) is not known, no part of 's' can be recovered."""
keys = Keyset("WHITEN").getLionessKeys("WHITEN")
return lioness_encrypt(s, keys)
def unwhiten(s):
- """DOCDOC"""
+ """Given a whitened string, return the original string."""
keys = Keyset("WHITEN").getLionessKeys("WHITEN")
return lioness_decrypt(s, keys)
Index: Filestore.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Filestore.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- Filestore.py 8 Aug 2003 21:40:42 -0000 1.4
+++ Filestore.py 14 Aug 2003 19:37:24 -0000 1.5
@@ -14,13 +14,14 @@
# time to refactor the common code.
import anydbm
+import binascii
+import cPickle
import dumbdbm
-import os
import errno
-import time
+import os
import stat
-import cPickle
import threading
+import time
from mixminion.Common import MixFatalError, secureDelete, LOG, \
createPrivateDir, readFile, tryUnlink
@@ -28,7 +29,13 @@
__all__ = [ "StringStore", "StringMetadataStore",
"ObjectStore", "ObjectMetadataStore",
- "MixedStore", "MixedMetadataStore" ]
+ "MixedStore", "MixedMetadataStore",
+ "DBBase", "JournaledDBBase", "BooleanJournaledDBBase"
+ ]
+
+
+# ======================================================================
+# Filestores.
# Mode to pass to open(2) for creating a new file, and dying if it already
# exists.
@@ -88,7 +95,6 @@
'create' is true, creates the directory if necessary. If 'scrub'
is true, removes any incomplete or invalidated messages from the
store."""
-
secureDelete([]) # Make sure secureDelete is configured. HACK!
self._lock = threading.RLock()
@@ -192,20 +198,24 @@
file, handle = getCommonPRNG().openNewFile(self.dir, "inp_", 1)
return file, handle
- def finishMessage(self, f, handle, ismeta=0):
+ def finishMessage(self, f, handle, _ismeta=0):
"""Given a file and a corresponding handle, closes the file
commits the corresponding message."""
+ # if '_ismeta' is true, we're finishing not a message, but the
+ # metadata for a message
f.close()
- if ismeta:
+ if _ismeta:
self._changeState(handle, "inpm", "meta")
else:
self._changeState(handle, "inp", "msg")
- def abortMessage(self, f, handle, ismeta=0):
+ def abortMessage(self, f, handle, _ismeta=0):
"""Given a file and a corresponding handle, closes the file
rejects the corresponding message."""
+ # if '_ismeta' is true, we're finishing not a message, but the
+ # metadata for a message
f.close()
- if ismeta:
+ if _ismeta:
self._changeState(handle, "inpm", "rmvm")
else:
self._changeState(handle, "inp", "rmv")
@@ -240,7 +250,6 @@
secureDeleteFn(rmv)
else:
secureDelete(rmv, blocking=1)
- return 0
def _changeState(self, handle, s1, s2):
"""Helper method: changes the state of message 'handle' from 's1'
@@ -251,6 +260,7 @@
os.rename(os.path.join(self.dir, s1+"_"+handle),
os.path.join(self.dir, s2+"_"+handle))
except OSError, e:
+ # WWWW On windows, replacing metadata can create an error!
contents = os.listdir(self.dir)
LOG.error("Error while trying to change %s from %s to %s: %s",
handle, s1, s2, e)
@@ -268,6 +278,9 @@
self._lock.release()
class StringStoreMixin:
+ """Combine the 'StringStoreMixin' class with a BaseStore in order
+ to implement a BaseStore that stores strings.
+ """
def __init__(self): pass
def messageContents(self, handle):
"""Given a message handle, returns the contents of the corresponding
@@ -287,23 +300,10 @@
self.finishMessage(f, handle) # handles locking
return handle
- def moveMessage(self, handle, other):
- """Given a handle and a queue, moves the corresponding message
- from this filestore to the filestore provided. Returns a
- new handle for the message in the destination queue."""
-
- # Since we're switching handles, we don't want to just rename;
- # We really want to copy and delete the old file.
- try:
- self._lock.acquire()
- newHandle = other.queueMessage(self.messageContents(handle))
- self.removeMessage(handle)
- finally:
- self._lock.release()
-
- return newHandle
-
class ObjectStoreMixin:
+ """Combine the 'ObjectStoreMixin' class with a BaseStore in order
+ to implement a BaseStore that stores objects.
+ """
def __init__(self): pass
def getObject(self, handle):
"""Given a message handle, read and unpickle the contents of the
@@ -326,11 +326,50 @@
return handle
class BaseMetadataStore(BaseStore):
+ """A BaseMetadataStore is a BaseStore that stores a metadata
+ object for every object in the store. We assume metadata to be
+ relatively volatile compared to the underlying stored objects.
+ Metadata is not always wiped before removal.
+
+ The representation of a store with metadata is the same as that
+ of a simple store, except that:
+ 1) The meta_, rmvm_, and inpm_ tags are used.
+ 2) For every file in msg_ state, there is a corresponding meta_
+ file.
+ """
+ ##Fields:
+ # _metadata_cache: map from handle to cached metadata object. This is
+ # a write-through cache.
def __init__(self, location, create=0, scrub=0):
+ """Create a new BaseMetadataStore to store files in 'location'. The
+ 'create' and 'scrub' arguments are as for BaseStore(...)."""
BaseStore.__init__(self, location=location, create=create, scrub=scrub)
self._metadata_cache = {}
+ if scrub:
+ self.cleanMetadata()
+
+ def cleanMetadata(self,secureDeleteFn=None):
+ """Find all orphaned metadata files and remove them."""
+ hSet = {}
+ for h in self.getAllMessages():
+ hSet[h] = 1
+ rmv = []
+ for h in [fn[5:] for fn in os.listdir(self.dir)
+ if fn.startswith("meta_")]:
+ if not hSet.get(h):
+ rmv.append("meta_"+h)
+ if rmv:
+ LOG.warn("Removing %s orphaned metadata files from %s",
+ len(rmv), self.dir)
+ if secureDeleteFn:
+ secureDeleteFn(rmv)
+ else:
+ secureDelete(rmv, blocking=1)
def loadAllMetadata(self, newDataFn):
+ """For all objects in the store, load their metadata into the internal
+ cache. If any object is missing its metadata, create metadata for
+ it by invoking newDataFn(handle)."""
try:
self._lock.acquire()
self._metadata_cache = {}
@@ -344,6 +383,7 @@
self._lock.release()
def getMetadata(self, handle):
+ """Return the metadata associated with a given handle."""
fname = os.path.join(self.dir, "meta_"+handle)
if not os.path.exists(fname):
raise KeyError(handle)
@@ -362,21 +402,25 @@
self._lock.release()
def setMetadata(self, handle, object):
- """DOCDOC"""
+ """Change the metadata associated with a given handle."""
try:
self._lock.acquire()
fname = os.path.join(self.dir, "inpm_"+handle)
f = os.fdopen(os.open(fname, _NEW_FILE_FLAGS, 0600), "wb")
cPickle.dump(object, f, 1)
- self.finishMessage(f, handle, ismeta=1)
+ self.finishMessage(f, handle, _ismeta=1)
self._metadata_cache[handle] = object
return handle
finally:
self._lock.release()
def removeMessage(self, handle):
+ """Given a handle, removes the corresponding message from the
+ filestore. """
try:
self._lock.acquire()
+ # Remove the message before the metadata, so we don't have
+ # a message without metadata.
BaseStore.removeMessage(self, handle)
if os.path.exists(os.path.join(self.dir, "meta_"+handle)):
self._changeState(handle, "meta", "rmvm")
@@ -388,25 +432,55 @@
finally:
self._lock.release()
+class StringMetadataStoreMixin(StringStoreMixin):
+ """Add this mixin class to a BaseMetadataStore in order to get a
+ filestore that stores strings with metadata."""
+ def __init__(self):
+ StringStoreMixin.__init__(self)
+ def queueMessage(self, message):
+ LOG.warn("Called 'queueMessage' on a metadata store.")
+ return self.queueMessageAndMetadata(message, None)
+ def queueMessageAndMetadata(self, message, metadata):
+ f, handle = self.openNewMessage()
+ f.write(message)
+ self.setMetadata(handle, metadata)
+ self.finishMessage(f, handle) # handles locking
+ return handle
+
+class ObjectMetadataStoreMixin(ObjectStoreMixin):
+ """Add this mixin class to a BaseMetadataStore in order to get a
+ filestore that stores objects with metadata."""
+ def __init__(self):
+ ObjectStoreMixin.__init__(self)
+ def queueObject(self, object):
+ LOG.warn("Called 'queueObject' on a metadata store.")
+ return self.queueObjectAndMetadata(object, None)  # forward the 'object' parameter; 'message' is not defined in this method
+ def queueObjectAndMetadata(self, object, metadata):
+ f, handle = self.openNewMessage()
+ cPickle.dump(object, f, 1)
+ self.setMetadata(handle, metadata)
+ self.finishMessage(f, handle) # handles locking
+ return handle
+
class StringStore(BaseStore, StringStoreMixin):
def __init__(self, location, create=0, scrub=0):
BaseStore.__init__(self, location, create, scrub)
StringStoreMixin.__init__(self)
-class StringMetadataStore(BaseMetadataStore, StringStoreMixin):
+class StringMetadataStore(BaseMetadataStore, StringMetadataStoreMixin):
def __init__(self, location, create=0, scrub=0):
BaseMetadataStore.__init__(self, location, create, scrub)
- StringStoreMixin.__init__(self)
+ StringMetadataStoreMixin.__init__(self)
class ObjectStore(BaseStore, ObjectStoreMixin):
def __init__(self, location, create=0, scrub=0):
BaseStore.__init__(self, location, create, scrub)
ObjectStoreMixin.__init__(self)
-class ObjectMetadataStore(BaseMetadataStore, ObjectStoreMixin):
+class ObjectMetadataStore(BaseMetadataStore, ObjectMetadataStoreMixin):
def __init__(self, location, create=0, scrub=0):
BaseMetadataStore.__init__(self, location, create, scrub)
- ObjectStoreMixin.__init__(self)
+ ObjectMetadataStoreMixin.__init__(self)
class MixedStore(BaseStore, StringStoreMixin, ObjectStoreMixin):
def __init__(self, location, create=0, scrub=0):
@@ -414,31 +488,51 @@
StringStoreMixin.__init__(self)
ObjectStoreMixin.__init__(self)
-class MixedMetadataStore(BaseMetadataStore, StringStoreMixin,
- ObjectStoreMixin):
+class MixedMetadataStore(BaseMetadataStore, StringMetadataStoreMixin,
+ ObjectMetadataStoreMixin):
def __init__(self, location, create=0, scrub=0):
BaseMetadataStore.__init__(self, location, create, scrub)
- StringStoreMixin.__init__(self)
- ObjectStoreMixin.__init__(self)
+ StringMetadataStoreMixin.__init__(self)
+ ObjectMetadataStoreMixin.__init__(self)
# ======================================================================
# Database wrappers
class DBBase:
- # _lock
+ """A DBBase is a persistent store that maps keys to values, using
+ a Python anydbm object.
+
+ It differs from the standard python 'shelve' module:
+ - by handling broken database files,
+ - by warning when using dumbdbm,
+ - by providing a 'sync' feature,
+ - by bypassing the pickle module's overhead,
+ - by providing thread-safety
+
+ To use this class for non-string keys or values, override the
+ _{en|de}code{Key|Value} methods."""
+ ## Fields:
+ # _lock -- A threading.RLock to protect access to database.
+ # filename -- The name of the underlying database file. Note that some
+ # database implementations (such as dumbdbm) create multiple files,
+ # using <filename> as a prefix.
+ # log -- The underlying anydbm object.
def __init__(self, filename, purpose=""):
+ """Create a DBBase object for a database stored in 'filename',
+ creating the underlying database if needed."""
self._lock = threading.RLock()
self.filename = filename
parent = os.path.split(filename)[0]
createPrivateDir(parent)
+ # If the file can't be read, bail.
try:
st = os.stat(filename)
except OSError, e:
if e.errno != errno.ENOENT:
raise
st = None
-
+ # If the file is empty, delete it and start over.
if st and st[stat.ST_SIZE] == 0:
LOG.warn("Half-created database %s found; cleaning up.", filename)
tryUnlink(filename)
@@ -461,10 +555,16 @@
# flush the journal, and so on.
def _encodeKey(self, k):
+ """Given a key for this mapping (a Python object), return a string
+ usable as a key by the underlying database."""
return k
def _encodeVal(self, v):
+ """Given a value for this mapping (a Python object), return a string
+ usable as a value by the underlying database."""
return v
def _decodeVal(self, v):
+ """Given a string-encoded value as used in the underlying database,
+ return the original Python object."""
return v
def has_key(self, k):
@@ -477,6 +577,9 @@
def __getitem__(self, k):
return self.getItem(k)
+ def keys(self):
+ return map(self._decodeKey, self.log.keys())
+
def get(self, k, default=None):
try:
return self[k]
@@ -486,6 +589,9 @@
def __setitem__(self, k, v):
self.setItem(k, v)
+ def __delitem__(self, k):
+ self.delItem(k)
+
def getItem(self, k):
try:
self._lock.acquire()
@@ -499,8 +605,16 @@
self.log[self._encodeKey(k)] = self._encodeVal(v)
finally:
self._lock.release()
+
+ def delItem(self, k):
+ try:
+ self._lock.acquire()
+ del self.log[self._encodeKey(k)]
+ finally:
+ self._lock.release()
def sync(self):
+ """Flush all pending changes to disk"""
self._lock.acquire()
try:
self.log.sync()
@@ -508,6 +622,7 @@
self._lock.release()
def close(self):
+ """Release resources associated with this database."""
self._lock.acquire()
try:
self.log.close()
@@ -515,11 +630,30 @@
finally:
self._lock.release()
+# Flags for use when opening the journal.
_JOURNAL_OPEN_FLAGS = os.O_WRONLY|os.O_CREAT|getattr(os,'O_SYNC',0)|getattr(os,'O_BINARY',0)
class JournaledDBBase(DBBase):
+ """Optimized version of DBBase that requires fewer sync() operations.
+ Uses a journal file to cache keys and values until they can be written
+ to the underlying database. Keys and values must all encode to strings
+ of the same length."""
+ # Largest allowed number of journal entries before we flush the journal
+ # to disk.
MAX_JOURNAL = 128
+ ## Fields:
+ # klen -- required length of encoded keys
+ # vlen -- required length of encoded values
+ # vdflt -- If vlen is 0, default value used when reading journaled value
+ # from disk.
+ # journal -- map from journal-encoded key to journal-encoded value.
+ # journalFileName -- filename to use for journal file.
+ # journalFile -- fd for the journal file
+
def __init__(self, location, purpose, klen, vlen, vdflt):
+ """Create a new JournaledDBBase that stores its files to match the
+ pattern 'location*', whose journal-encoded keys are all of length
+ klen, whose journal-encoded values are all of length vlen."""
DBBase.__init__(self, location, purpose)
self.klen = klen
@@ -528,6 +662,7 @@
self.journalFileName = location+"_jrnl"
self.journal = {}
+ # If there's a journal file, snarf it into memory.
if os.path.exists(self.journalFileName):
j = readFile(self.journalFileName, 1)
for i in xrange(0, len(j), klen+vlen):
@@ -564,6 +699,10 @@
finally:
self._lock.release()
+ def keys(self):
+ return map(self._decodeKey, self.log.keys()) + \
+ map(self._jDecodeKey, self.journal.keys())
+
def setItem(self, k, v):
jk = self._jEncodeKey(k)
jv = self._jEncodeVal(v)
@@ -580,6 +719,9 @@
finally:
self._lock.release()
+ def delItem(self, k):
+ raise NotImplementedError  # this override always raises; NotImplemented is a comparison sentinel, not an exception
+
def sync(self):
self._lock.acquire()
try:
@@ -604,3 +746,19 @@
os.close(self.journalFile)
finally:
self._lock.release()
+
+class BooleanJournaledDBBase(JournaledDBBase):
+ """Specialization of JournaledDBBase that encodes a set of keys, mapping
+ each key to the value '1'."""
+ def __init__(self, location, purpose, klen):
+ JournaledDBBase.__init__(self,location,purpose,klen,0,"1")
+ def _encodeKey(self, k):
+ return binascii.b2a_hex(k)
+ def _jEncodeVal(self, v):
+ return ""
+ def _jDecodeVal(self, k):
+ return 1
+ def _encodeVal(self, v):
+ return "1"
+ def _decodeVal(self, v):
+ return 1
Index: benchmark.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/benchmark.py,v
retrieving revision 1.45
retrieving revision 1.46
diff -u -d -r1.45 -r1.46
--- benchmark.py 13 Jul 2003 03:45:34 -0000 1.45
+++ benchmark.py 14 Aug 2003 19:37:24 -0000 1.46
@@ -24,7 +24,7 @@
import mixminion.server.ServerQueue
from mixminion.BuildMessage import _buildHeader, buildForwardMessage, \
- compressData, uncompressData, _encodePayload, decodePayload
+ compressData, uncompressData, encodePayloads, decodePayload
from mixminion.Common import secureDelete, installSIGCHLDHandler, \
waitForChildren, formatBase64, Lockfile
from mixminion.Crypto import *
@@ -479,7 +479,8 @@
#----------------------------------------------------------------------
def serverQueueTiming():
print "#================= SERVER QUEUES ====================="
- Queue = mixminion.server.ServerQueue.Queue
+ import mixminion.Filestore
+ Queue = mixminion.Filestore.MixedStore
DeliveryQueue = mixminion.server.ServerQueue.DeliveryQueue
d1 = mix_mktemp()
q1 = Queue(d1, create=1)
@@ -568,19 +569,19 @@
print "#=============== END-TO-END ENCODING =================="
shortP = "hello world"
prng = AESCounterPRNG()
- p = _encodePayload(shortP, 0, prng)
+ p = encodePayloads(shortP, 0, prng)[0]
t = prng.getBytes(20)
print "Decode short payload", timeit(
lambda p=p,t=t: decodePayload(p, t), 1000)
k20 = prng.getBytes(20*1024)
- p = _encodePayload(k20, 0, prng)
+ p = encodePayloads(k20, 0, prng)[0]
t = prng.getBytes(20)
print "Decode 20K payload", timeit(
lambda p=p,t=t: decodePayload(p, t), 1000)
comp = "x"*(20*1024)
- p = _encodePayload(comp, 0, prng)
+ p = encodePayloads(comp, 0, prng)[0]
t = prng.getBytes(20)
def decode(p=p,t=t):
try:
@@ -1002,7 +1003,6 @@
if 0:
testLeaks_FEC()
return
-
fecTiming()
cryptoTiming()
rsaTiming()
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.142
retrieving revision 1.143
diff -u -d -r1.142 -r1.143
--- test.py 8 Aug 2003 21:40:40 -0000 1.142
+++ test.py 14 Aug 2003 19:37:25 -0000 1.143
@@ -1648,7 +1648,9 @@
for m in (p.getBytes(3000), p.getBytes(10000), "", "q", "blznerty"):
for ov in 0, 42-20+16: # encrypted forward overhead
- pld = BuildMessage._encodePayload(m,ov,p)
+ plds = BuildMessage.encodePayloads(m,ov,p)
+ assert len(plds) == 1
+ pld = plds[0]
self.assertEquals(28*1024, len(pld)+ov)
comp = compressData(m)
self.assertStartsWith(pld[22:], comp)
@@ -2284,7 +2286,7 @@
# get such a payload is to compress 25K of zeroes.
nils = "\x00"*(25*1024)
overcompressed_payload = \
- BuildMessage._encodePayload(nils, 0, AESCounterPRNG())
+ BuildMessage.encodePayloads(nils, 0, AESCounterPRNG())[0]
self.failUnlessRaises(CompressedDataTooLong,
BuildMessage.decodePayload, overcompressed_payload, "X"*20)
@@ -2721,7 +2723,8 @@
# Move the first 30 messages to queue2
q2h = []
for h in handles[:30]:
- nh = queue1.moveMessage(h, queue2)
+ nh = queue2.queueMessage(queue1.messageContents(h))
+ queue1.removeMessage(h)
q2h.append(nh)
# Look at the messages in queue2, 15 then 30 at a time.
@@ -2792,15 +2795,14 @@
Store = mixminion.Filestore.StringMetadataStore
queue = Store(d_d, create=1)
- h1 = queue.queueMessage("abc")
- queue.setMetadata(h1, [2,3])
+ h1 = queue.queueMessageAndMetadata("abc", [2,3])
self.assertEquals(readPickled(os.path.join(d_d, "meta_"+h1)), [2,3])
self.assertEquals(queue.getMetadata(h1), [2,3])
- h2 = queue.queueMessage("def")
- queue.setMetadata(h2, [5,6])
- h3 = queue.queueMessage("ghi")
- self.assertEquals(queue._metadata_cache, { h1 : [2,3], h2 : [5,6] })
-
+ h2 = queue.queueMessageAndMetadata("def", [5,6])
+ h3 = queue.queueMessageAndMetadata("ghi", None)
+ self.assertEquals(queue._metadata_cache, { h1 : [2,3], h2 : [5,6],
+ h3 : None })
+ os.unlink(os.path.join(d_d, "meta_"+h3))
queue = Store(d_d, create=0)
self.assertEquals(queue.getMetadata(h2), [5,6])
self.assertEquals(queue._metadata_cache, { h2 : [5,6] })