[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] Clean up cottrell algorithm
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.seul.org:/tmp/cvs-serv27330/lib/mixminion
Modified Files:
Queue.py
Log Message:
Clean up cottrell algorithm
Index: Queue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Queue.py,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -d -r1.17 -r1.18
--- Queue.py 10 Sep 2002 14:45:30 -0000 1.17
+++ Queue.py 21 Oct 2002 02:31:02 -0000 1.18
@@ -16,7 +16,7 @@
createPrivateDir
from mixminion.Crypto import AESCounterPRNG
-__all__ = [ 'Queue', 'DeliveryQueue', 'TimedMixQueue', 'CottrellMixQueue',
+__all__ = [ 'Queue', 'DeliveryQueue', 'TimedMixQueue', 'CottrellMixQueue',
'BinomialCottrellMixQueue' ]
# Mode to pass to open(2) for creating a new file, and dying if it already
@@ -29,7 +29,7 @@
# trash.
INPUT_TIMEOUT = 600
-# If we've been cleaning for more than CLEAN_TIMEOUT seconds, assume the
+# If we've been cleaning for more than CLEAN_TIMEOUT seconds, assume the
# old clean is dead.
CLEAN_TIMEOUT = 60
@@ -54,13 +54,13 @@
# as the fact that most Unices behave badly when confronted with
# 3 billion files in the same directory... or the fact that,
# at today's processor speeds, it will take Alice 3 or 4
- # CPU-years to clear her backlog.
+ # CPU-years to clear her backlog.
# Fields: rng--a random number generator for creating new messages
# and getting a random slice of the queue.
# dir--the location of the queue.
# n_entries: the number of complete messages in the queue.
- # <0 if we haven't counted yet.
+ # <0 if we haven't counted yet.
def __init__(self, location, create=0, scrub=0):
"""Creates a queue object for a given directory, 'location'. If
'create' is true, creates the directory if necessary. If 'scrub'
@@ -68,7 +68,7 @@
Queue."""
secureDelete([]) # Make sure secureDelete is configured. HACK!
-
+
self.rng = AESCounterPRNG()
self.dir = location
@@ -95,13 +95,13 @@
return handle
def queueObject(self, object):
- """Queue an object using cPickle, and return a handle to that
+ """Queue an object using cPickle, and return a handle to that
object."""
f, handle = self.openNewMessage()
cPickle.dump(object, f, 1)
self.finishMessage(f, handle)
return handle
-
+
def count(self, recount=0):
"""Returns the number of complete messages in the queue."""
if self.n_entries >= 0 and not recount:
@@ -200,13 +200,13 @@
Returns 1 if a clean is already in progress; otherwise
returns 0.
"""
- now = time.time()
+ now = time.time()
cleanFile = os.path.join(self.dir,".cleaning")
try:
s = os.stat(cleanFile)
if now - s[stat.ST_MTIME] > CLEAN_TIMEOUT:
cleaning = 0
- cleaning = 1
+ cleaning = 1
except OSError:
cleaning = 0
@@ -216,7 +216,7 @@
f = open(cleanFile, 'w')
f.write(str(now))
f.close()
-
+
rmv = []
allowedTime = int(time.time()) - INPUT_TIMEOUT
for m in os.listdir(self.dir):
@@ -249,7 +249,7 @@
class DeliveryQueue(Queue):
"""A DeliveryQueue implements a queue that greedily sends messages
- to outgoing streams that occasionally fail. Messages in a
+ to outgoing streams that occasionally fail. Messages in a
DeliveryQueue are no longer unstructured text, but rather
tuples of: (n_retries, addressing info, msg).
@@ -268,7 +268,7 @@
"""
###
# Fields:
- # sendable -- A list of handles for all messages
+ # sendable -- A list of handles for all messages
# that we're not currently sending.
# pending -- Dict from handle->1, for all messages that we're
# currently sending.
@@ -282,7 +282,7 @@
directory."""
self.pending = {}
self.sendable = self.getAllMessages()
-
+
def queueMessage(self, addr, msg, retry=0):
"""Schedule a message for delivery.
addr -- An object to indicate the message's destination
@@ -313,9 +313,9 @@
self.deliverMessages(messages)
def deliverMessages(self, msgList):
- """Abstract method; Invoked with a list of
+ """Abstract method; Invoked with a list of
(handle, addr, message, n_retries) tuples every time we have a batch
- of messages to send.
+ of messages to send.
For every handle in the list, delierySucceeded or deliveryFailed
should eventually be called, or the message will sit in the queue
@@ -342,14 +342,14 @@
successfully delivered."""
del self.pending[handle]
if retriable:
- # Queue the new one before removing the old one, for
+ # Queue the new one before removing the old one, for
# crash-proofness
retries, addr, msg = self.getObject(handle)
# FFFF This test makes us never retry past the 10th attempt.
# FFFF That's wrong; we should be smarter.
if retries <= 10:
self.queueMessage(addr, msg, retries+1)
- self.removeMessage(handle)
+ self.removeMessage(handle)
class TimedMixQueue(Queue):
"""A TimedMixQueue holds a group of files, and returns some of them
@@ -362,7 +362,7 @@
self.interval = interval
def getBatch(self):
- """Return handles for all messages that the pool is currently ready
+ """Return handles for all messages that the pool is currently ready
to send in the next batch"""
return self.pickRandom()
@@ -373,33 +373,59 @@
"""A CottrellMixQueue holds a group of files, and returns some of them
as requested, according the Cottrell (timed dynamic-pool) mixing
algorithm from Mixmaster."""
- def __init__(self, location, interval=600, threshold=6, retainRate=.7):
+ def __init__(self, location, interval=600, minPool=6, minSend=1,
+ sendRate=.7):
"""Create a new queue that yields a batch of message every 'interval'
- seconds, never sends unless it has more than <threshold> messages,
- and always keeps <retainRate> * the current pool size."""
+ seconds, always keeps <minPool> messages in the pool, never sends
+ unless it has <minPool>+<minSend> messages, and never sends more
+ than <sendRate> * the current pool size.
+
+ If 'minSend'==1, this is a real Cottrell (type-II) mix pool.
+ Otherwise, this is a generic 'timed dynamic-pool' mix pool. (Note
+ that there is still a matter of some controversy whether it ever
+ makes sense to set minSend != 1.)
+ """
+
+ # Note that there was a bit of confusion here: earlier versions
+ # implemented an algorithm called "mixmaster" that wasn't actually the
+ # mixmaster algorithm. I picked up the other algorithm from an early
+ # draft of Roger, Paul, and Andrei's 'Batching Taxonomy' paper (since
+ # corrected); they seem to have gotten it from Anja Jerichow's
+ # Ph.D. thesis ("Generalisation and Security Improvement of
+ # Mix-mediated Anonymous Communication") of 2000.
+ #
+ # *THIS* is the algorithm that, according to the current 'Batching
+ # Taxonomy' paper, Cottrell identifies as the real thing.
+
TimedMixQueue.__init__(self, location, interval)
- self.threshold = threshold
- self.sendRate = 1.0 - retainRate
+ self.minPool = minPool
+ self.minSend = minSend
+ self.sendRate = sendRate
- def getBatch(self):
- # XXXX This is not the real cottrell algorithm. Once somebody
- # XXXX has explained to me what is going on here, I will implement
- # XXXX the real one. -NM
+ def _getBatchSize(self):
pool = self.count()
- if pool <= self.threshold:
+ if pool >= (self.minPool + self.minSend):
+ sendable = pool - self.minPool
+ return min(sendable, max(1, int(pool * self.sendRate)))
+ else:
+ return 0
+
+ def getBatch(self):
+ n = self._getBatchSize()
+ if n:
+ return self.pickRandom(n)
+ else:
return []
- nTransmit = int(pool * self.sendRate)
- return self.pickRandom(nTransmit)
class BinomialCottrellMixQueue(CottrellMixQueue):
"""Same algorithm as CottrellMixQueue, but instead of sending N messages
from the pool of size P, sends each message with probability N/P."""
def getBatch(self):
- pool = self.count()
- if pool <= self.threshold:
+ n = self._getBatchSize()
+ if n == 0:
return []
- msgProbability = self.sendRate
- return self.rng.shuffle([ h for h in self.getAllMessages()
+ msgProbability = n / float(self.count())
+ return self.rng.shuffle([ h for h in self.getAllMessages()
if self.rng.getFloat() < msgProbability ])
def _secureDelete_bg(files, cleanFile):