[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] Add more mixing algorithms and support code



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.seul.org:/tmp/cvs-serv3439/lib/mixminion

Modified Files:
	Crypto.py Queue.py test.py 
Log Message:
Add more mixing algorithms and support code

Crypto, tests:
	Add getFloat method to RNG
	Add shuffle method to RNG, using logic from Queue.pickRandom
	Fix long-standing bug in shuffle that would never select the
		last element of the list.  (Ow!)

Queue:
	Refactor Queue.pickRandom.
	Add TimedMixQueue.
	Rename MixQueue to CottrellMixQueue
	Add BinomialCottrellMixQueue as a proof-of-concept.
	Make Mix queues know about timeouts.



Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -d -r1.12 -r1.13
--- Crypto.py	11 Aug 2002 07:50:34 -0000	1.12
+++ Crypto.py	12 Aug 2002 21:05:50 -0000	1.13
@@ -389,6 +389,28 @@
             self.bytes = self.bytes[n:]
             return res
 
+    def shuffle(self, lst, n=None):
+	"""Rearranges the elements of lst so that the first n elements
+	   are randomly chosen from lst.  Returns the first n elements.
+	   (Other elements are still in lst, but may be in a nonrandom
+	   order.)  If n is None, shuffles and returns the entire list"""
+        size = len(lst)
+	if n is None:
+	    n = size
+	else:
+	    n = min(n, size)
+
+
+        # This permutation algorithm yields all permutations with equal
+        # probability (assuming a good rng); others do not.
+        for i in range(n-1):
+            swap = i+self.getInt(size-i)
+            v = lst[swap]
+            lst[swap] = lst[i]
+            lst[i] = v
+
+	return lst[:n]
+
     def getInt(self, max):
         """Returns a random integer i s.t. 0 <= i < max.
 
@@ -419,10 +441,26 @@
             bytes = self.getBytes(nBytes)
             r = 0
             for byte in bytes:
-                r = (r << 8) + ord(byte)
+                r = (r << 8) | ord(byte)
             r = r & mask
             if r < max:
                 return r
+
+    def getFloat(self, bytes=3):
+	"""Return a floating-point number between 0 and 1.  The number
+	   will have 'bytes' bytes of resolution."""
+        # We need to special-case the <4 byte case to get good performance
+	# on Python<2.2
+        if bytes <= 3:
+	    max = 1<<(bytes*8)
+	    tot = 0
+	else:
+	    max = 1L<<(bytes*8)
+	    tot = 0L
+	bytes = self.getBytes(bytes)
+	for byte in bytes:
+	    tot = (tot << 8) | ord(byte)
+	return float(tot)/max
 
     def _prng(self, n):
         """Abstract method: Must be overridden to return n bytes of fresh

Index: Queue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Queue.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- Queue.py	12 Aug 2002 18:12:24 -0000	1.9
+++ Queue.py	12 Aug 2002 21:05:50 -0000	1.10
@@ -129,23 +129,10 @@
 
            If there are fewer than 'count' messages in the queue, all the
            messages will be retained."""
-        messages = [fn for fn in os.listdir(self.dir) if fn.startswith("msg_")]
-
-        n = len(messages)
-        if count is None:
-            count = n
-        else:
-            count = min(count, n)
-
-        # This permutation algorithm yields all permutation with equal
-        # probability (assuming a good rng); others do not.
-        for i in range(count-1):
-            swap = i+self.rng.getInt(n-i-1)
-            v = messages[swap]
-            messages[swap] = messages[i]
-            messages[i] = v
+        handles = [ fn[4:] for fn in os.listdir(self.dir)
+		           if fn.startswith("msg_") ]
 
-        return [m[4:] for m in messages[:count]]
+	return self.rng.shuffle(handles, count)
 
     def getAllMessages(self):
 	"""Returns handles for all messages currently in the queue.
@@ -332,7 +319,8 @@
     def nextMessageReadyAt(self):
 	"""Return the soonest possible time at which sendReadyMessages
 	   will send something.  If some time < now is returned,
-	   the answer is 'immediately'. """
+	   the answer is 'immediately'.  If 'None' is returned, there are
+	   no messages in the queue."""
 	if self.sendableAt:
 	    return self.sendableAt[0][0]
 	else:
@@ -388,27 +376,62 @@
 	    self.queueMessage(addr, msg, retries+1, retryAt)
 	self.removeMessage(handle)	    
 
-class MixQueue(Queue):
-    """A MixQueue holds a group of files, and returns some of them
-       as requested, according to some mixing algorithm.
-       
-       It's the responsibility of the user of this class to only invoke it
-       at the specified interval."""
-    # Right now, we use the 'Cottrell' mixing algorithm with fixed
-    # parameters.  We always keep 5 messages in the pool, and never send
-    # more than 30% of the pool at a time.  These parameters should probably
-    # be more like 100
-    MIN_POOL_SIZE = 5
-    MAX_REPLACEMENT_RATE = 0.3
-    def __init__(self, location):
-	Queue.__init__(self, location, create=1, scrub=1)
+class TimedMixQueue(Queue):
+    """A TimedMixQueue holds a group of files, and returns some of them
+       as requested, according to a mixing algorithm that sends a batch
+       of messages every N seconds."""
+    def __init__(self, location, interval=600):
+	"""Create a TimedMixQueue that sends its entire batch of messages
+	   every 'interval' seconds."""
+        Queue.__init__(self, location, create=1, scrub=1)
+	self.interval = interval
+	self.nextSendTime = time.time + interval
 
-    def pickMessages(self):
-	"""Return a list of handles."""
-	n = self.count()
-	nTransmit = min(n-self.MIN_POOL_SIZE,
-		       int(n*self.MAX_REPLACEMENT_RATE))
+    def nextMessageReadyAt(self):
+	"""Return the next time at which the pool will be ready to send
+	   messages"""
+	return self.nextSendTime
+
+    def getReadyMessages(self):
+	"""Return handles for all messages that the pool is currently ready 
+	   to send."""
+	now = time.time()
+	if now < self.nextSendTime:
+	    return []
+	self.nextSendTime = now + self.interval
+	return self._getBatch()
+
+    def _getBatch(self):
+	"""Internal method: called by getReadyMessages to return a single
+	   batch of handles."""
+	return self.pickRandom()
+
+class CottrellMixQueue(TimedMixQueue):
+    """A CottrellMixQueue holds a group of files, and returns some of them
+       as requested, according to the Cottrell (timed dynamic-pool) mixing
+       algorithm from Mixmaster."""
+    def __init__(self, location, interval=600, minPoolSize=6, maxSendRate=.3):
+	"""Create a new queue that yields a batch of messages every 'interval'
+	   seconds, never allows its pool size to drop below 'minPoolSize',
+	   and never sends more than maxSendRate * the current pool size."""
+	TimedMixQueue.__init__(self, location, interval)
+	self.minPoolSize = minPoolSize
+	self.maxSendRate = maxSendRate
+
+    def _getBatch(self):
+	pool = self.count()
+	nTransmit = min(pool-self.minPoolSize, int(pool*self.maxSendRate))
 	return self.pickRandom(nTransmit)
+
+class BinomialCottrellMixQueue(CottrellMixQueue):
+    """Same algorithm as CottrellMixQueue, but instead of sending N messages
+       from the pool of size P, sends each message with probability N/P."""
+    def _getBatch(self):
+	pool = self.count()
+	nTransmit = min(pool-self.minPoolSize, int(pool*self.maxSendRate))
+	msgProbability = float(nTransmit) / pool
+	return rng.shuffle([ h for h in self.getAllMessages() 
+			        if self.rng.getFloat() < msgProbability ])
 
 def _secureDelete_bg(files, cleanFile):
     pid = os.fork()

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -d -r1.17 -r1.18
--- test.py	11 Aug 2002 07:50:34 -0000	1.17
+++ test.py	12 Aug 2002 21:05:50 -0000	1.18
@@ -405,9 +405,37 @@
                           PRNG.getBytes(15)+PRNG.getBytes(16000)+
                           PRNG.getBytes(34764)))
 
+	# Check getInt, getFloat.
         for i in xrange(1,10000,17):
             self.failUnless(0 <= PRNG.getInt(10) < 10)
             self.failUnless(0 <= PRNG.getInt(i) < i)
+	for i in xrange(100):
+	    self.failUnless(0 <= PRNG.getFloat() < 1)
+	    self.failUnless(0 <= PRNG.getFloat(4) < 1)
+	    self.failUnless(0 <= PRNG.getFloat(5) < 1)
+
+	# Make sure shuffle only shuffles the first n.
+	lst = range(100)
+	PRNG.shuffle(lst,10)
+	later = [ item for item in lst[10:] if item >= 10 ]
+	s = later[:]
+	s.sort()
+	self.failUnless(later == s)
+
+	# Make sure shuffle actually shuffles all positions.
+	lists = [  ]
+	for i in xrange(6):
+	    lists.append(PRNG.shuffle(lst)[:])
+	# This will fail accidentally about once in 100,000,000 runs (1e-10 per position).
+	for crossSection in zip(*lists):
+	    allEq = 1
+	    for z in crossSection:
+		if z != crossSection[0]: allEq = 0
+	    self.failIf(allEq)
+	for lst in lists:
+	    s = lst[:]
+	    s.sort()
+	    self.assertEquals(s, range(100))
 
 #----------------------------------------------------------------------
 import mixminion.Packet