[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] Clean up cottrell algorithm



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.seul.org:/tmp/cvs-serv27330/lib/mixminion

Modified Files:
	Queue.py 
Log Message:
Clean up cottrell algorithm

Index: Queue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Queue.py,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -d -r1.17 -r1.18
--- Queue.py	10 Sep 2002 14:45:30 -0000	1.17
+++ Queue.py	21 Oct 2002 02:31:02 -0000	1.18
@@ -16,7 +16,7 @@
      createPrivateDir
 from mixminion.Crypto import AESCounterPRNG
 
-__all__ = [ 'Queue', 'DeliveryQueue', 'TimedMixQueue', 'CottrellMixQueue', 
+__all__ = [ 'Queue', 'DeliveryQueue', 'TimedMixQueue', 'CottrellMixQueue',
 	    'BinomialCottrellMixQueue' ]
 
 # Mode to pass to open(2) for creating a new file, and dying if it already
@@ -29,7 +29,7 @@
 # trash.
 INPUT_TIMEOUT = 600
 
-# If we've been cleaning for more than CLEAN_TIMEOUT seconds, assume the 
+# If we've been cleaning for more than CLEAN_TIMEOUT seconds, assume the
 # old clean is dead.
 CLEAN_TIMEOUT = 60
 
@@ -54,13 +54,13 @@
        # as the fact that most Unices behave badly when confronted with
        # 3 billion files in the same directory... or the fact that,
        # at today's processor speeds, it will take Alice 3 or 4
-       # CPU-years to clear her backlog. 
+       # CPU-years to clear her backlog.
 
     # Fields:   rng--a random number generator for creating new messages
     #                and getting a random slice of the queue.
     #           dir--the location of the queue.
     #           n_entries: the number of complete messages in the queue.
-    #                 <0 if we haven't counted yet.  
+    #                 <0 if we haven't counted yet.
     def __init__(self, location, create=0, scrub=0):
         """Creates a queue object for a given directory, 'location'.  If
            'create' is true, creates the directory if necessary.  If 'scrub'
@@ -68,7 +68,7 @@
            Queue."""
 
         secureDelete([]) # Make sure secureDelete is configured. HACK!
-        
+
         self.rng = AESCounterPRNG()
         self.dir = location
 
@@ -95,13 +95,13 @@
         return handle
 
     def queueObject(self, object):
-	"""Queue an object using cPickle, and return a handle to that 
+	"""Queue an object using cPickle, and return a handle to that
 	   object."""
         f, handle = self.openNewMessage()
         cPickle.dump(object, f, 1)
         self.finishMessage(f, handle)
         return handle
-      
+
     def count(self, recount=0):
         """Returns the number of complete messages in the queue."""
 	if self.n_entries >= 0 and not recount:
@@ -200,13 +200,13 @@
            Returns 1 if a clean is already in progress; otherwise
            returns 0.
         """
-        now = time.time() 
+        now = time.time()
         cleanFile = os.path.join(self.dir,".cleaning")
         try:
             s = os.stat(cleanFile)
             if now - s[stat.ST_MTIME] > CLEAN_TIMEOUT:
                 cleaning = 0
-            cleaning = 1    
+            cleaning = 1
         except OSError:
             cleaning = 0
 
@@ -216,7 +216,7 @@
         f = open(cleanFile, 'w')
         f.write(str(now))
         f.close()
-        
+
         rmv = []
         allowedTime = int(time.time()) - INPUT_TIMEOUT
         for m in os.listdir(self.dir):
@@ -249,7 +249,7 @@
 
 class DeliveryQueue(Queue):
     """A DeliveryQueue implements a queue that greedily sends messages
-       to outgoing streams that occasionally fail.  Messages in a 
+       to outgoing streams that occasionally fail.  Messages in a
        DeliveryQueue are no longer unstructured text, but rather
        tuples of: (n_retries, addressing info, msg).
 
@@ -268,7 +268,7 @@
     """
     ###
     # Fields:
-    #    sendable -- A list of handles for all messages 
+    #    sendable -- A list of handles for all messages
     #           that we're not currently sending.
     #    pending -- Dict from handle->1, for all messages that we're
     #           currently sending.
@@ -282,7 +282,7 @@
            directory."""
 	self.pending = {}
 	self.sendable = self.getAllMessages()
-    
+
     def queueMessage(self, addr, msg, retry=0):
 	"""Schedule a message for delivery.
 	     addr -- An object to indicate the message's destination
@@ -313,9 +313,9 @@
 	    self.deliverMessages(messages)
 
     def deliverMessages(self, msgList):
-	"""Abstract method; Invoked with a list of  
+	"""Abstract method; Invoked with a list of
   	   (handle, addr, message, n_retries) tuples every time we have a batch
-	   of messages to send.  
+	   of messages to send.
 
            For every handle in the list, deliverySucceeded or deliveryFailed
 	   should eventually be called, or the message will sit in the queue
@@ -342,14 +342,14 @@
 	   successfully delivered."""
 	del self.pending[handle]
 	if retriable:
-	    # Queue the new one before removing the old one, for 
+	    # Queue the new one before removing the old one, for
 	    # crash-proofness
 	    retries, addr, msg = self.getObject(handle)
 	    # FFFF This test makes us never retry past the 10th attempt.
 	    # FFFF That's wrong; we should be smarter.
 	    if retries <= 10:
 		self.queueMessage(addr, msg, retries+1)
-	self.removeMessage(handle)    
+	self.removeMessage(handle)
 
 class TimedMixQueue(Queue):
     """A TimedMixQueue holds a group of files, and returns some of them
@@ -362,7 +362,7 @@
 	self.interval = interval
 
     def getBatch(self):
-	"""Return handles for all messages that the pool is currently ready 
+	"""Return handles for all messages that the pool is currently ready
 	   to send in the next batch"""
 	return self.pickRandom()
 
@@ -373,33 +373,59 @@
     """A CottrellMixQueue holds a group of files, and returns some of them
        as requested, according to the Cottrell (timed dynamic-pool) mixing
        algorithm from Mixmaster."""
-    def __init__(self, location, interval=600, threshold=6, retainRate=.7):
+    def __init__(self, location, interval=600, minPool=6, minSend=1,
+		 sendRate=.7):
 	"""Create a new queue that yields a batch of message every 'interval'
-	   seconds, never sends unless it has more than <threshold> messages,
-	   and always keeps <retainRate> * the current pool size."""
+	   seconds, always keeps <minPool> messages in the pool, never sends
+	   unless it has <minPool>+<minSend> messages, and never sends more
+	   than <sendRate> * the current pool size.
+
+	   If 'minSend'==1, this is a real Cottrell (type-II) mix pool.
+	   Otherwise, this is a generic 'timed dynamic-pool' mix pool.  (Note
+	   that there is still a matter of some controversy whether it ever
+	   makes sense to set minSend != 1.)
+	   """
+
+        # Note that there was a bit of confusion here: earlier versions
+	# implemented an algorithm called "mixmaster" that wasn't actually the
+	# mixmaster algorithm.  I picked up the other algorithm from an early
+	# draft of Roger, Paul, and Andrei's 'Batching Taxonomy' paper (since
+	# corrected); they seem to have gotten it from Anja Jerichow's
+	# Phd. thesis ("Generalisation and Security Improvement of
+	# Mix-mediated Anonymous Communication") of 2000.
+	#
+	# *THIS* is the algorithm that the current 'Batching Taxonomy' paper
+	# says that Cottrell says is the real thing.
+
 	TimedMixQueue.__init__(self, location, interval)
-	self.threshold = threshold
-	self.sendRate = 1.0 - retainRate
+	self.minPool = minPool	
+	self.minSend = minSend
+	self.sendRate = sendRate
 
-    def getBatch(self):
-	# XXXX This is not the real cottrell algorithm.  Once somebody
-	# XXXX has explained to me what is going on here, I will implement
-	# XXXX the real one. -NM
+    def _getBatchSize(self):
 	pool = self.count()
-	if pool <= self.threshold:
+	if pool >= (self.minPool + self.minSend):
+	    sendable = pool - self.minPool
+	    return min(sendable, max(1, int(pool * self.sendRate)))
+	else:
+	    return 0
+
+    def getBatch(self):
+	n = self._getBatchSize()
+	if n:
+	    return self.pickRandom(n)
+	else:
 	    return []
-	nTransmit = int(pool * self.sendRate)
-	return self.pickRandom(nTransmit)
 
 class BinomialCottrellMixQueue(CottrellMixQueue):
     """Same algorithm as CottrellMixQueue, but instead of sending N messages
        from the pool of size P, sends each message with probability N/P."""
     def getBatch(self):
-	pool = self.count()
-	if pool <= self.threshold:
+	n = self._getBatchSize()
+	if n == 0:
 	    return []
-	msgProbability = self.sendRate
-	return self.rng.shuffle([ h for h in self.getAllMessages() 
+	msgProbability = n / float(self.count())
+	return self.rng.shuffle([ h for h in self.getAllMessages()
 				    if self.rng.getFloat() < msgProbability ])
 
 def _secureDelete_bg(files, cleanFile):