[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] Add more mixing algorithms and support code
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.seul.org:/tmp/cvs-serv3439/lib/mixminion
Modified Files:
Crypto.py Queue.py test.py
Log Message:
Add more mixing algorithms and support code
Crypto, tests:
Add getFloat method to RNG
Add shuffle method to RNG, using logic from Queue.pickRandom
Fix long-standing bug in shuffle that would never select the
last element of the list. (Ow!)
Queue:
Refactor Queue.pickRandom.
Add TimedMixQueue.
Rename MixQueue to CottrellMixQueue
Add BinomialCottrellMixQueue as a proof-of-concept.
Make Mix queues know about timeouts.
Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -d -r1.12 -r1.13
--- Crypto.py 11 Aug 2002 07:50:34 -0000 1.12
+++ Crypto.py 12 Aug 2002 21:05:50 -0000 1.13
@@ -389,6 +389,28 @@
self.bytes = self.bytes[n:]
return res
def shuffle(self, lst, n=None):
    """Rearranges the elements of lst in place so that the first n
       elements are chosen uniformly at random from lst (and appear in
       random order).  Returns a new list holding those first n elements.
       (The other elements are still in lst, but may be in a nonrandom
       order.)  If n is None, shuffles and returns the entire list.  n is
       clamped to the range [0, len(lst)]."""
    size = len(lst)
    if n is None:
        n = size
    else:
        # Clamp n: a negative n would otherwise make the final slice
        # lst[:n] drop elements from the *end* of the list rather than
        # returning an empty selection.
        n = max(0, min(n, size))

    # Partial Fisher-Yates shuffle: each position i < n receives an element
    # drawn uniformly from positions i..size-1, so every selection and
    # ordering is equally likely (assuming a good rng).  Every position
    # below n must be drawn; the only slot that can be skipped is the very
    # last one of the list (i == size-1), which has a single candidate.
    for i in range(min(n, size - 1)):
        swap = i + self.getInt(size - i)
        lst[i], lst[swap] = lst[swap], lst[i]

    return lst[:n]
+
def getInt(self, max):
"""Returns a random integer i s.t. 0 <= i < max.
@@ -419,10 +441,26 @@
bytes = self.getBytes(nBytes)
r = 0
for byte in bytes:
- r = (r << 8) + ord(byte)
+ r = (r << 8) | ord(byte)
r = r & mask
if r < max:
return r
+
def getFloat(self, bytes=3):
    """Return a random float f with 0 <= f < 1, assembled from 'bytes'
       random bytes (i.e. with bytes*8 bits of resolution)."""
    nBits = bytes * 8
    # Requests of 4 or more bytes accumulate in longs so the shifts below
    # cannot overflow a plain int; smaller requests stay on plain ints,
    # which is the faster path on Python < 2.2.
    if bytes > 3:
        scale = long(1) << nBits
        acc = long(0)
    else:
        scale = 1 << nBits
        acc = 0
    for ch in self.getBytes(bytes):
        acc = (acc << 8) | ord(ch)
    return float(acc) / scale
def _prng(self, n):
"""Abstract method: Must be overridden to return n bytes of fresh
Index: Queue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Queue.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- Queue.py 12 Aug 2002 18:12:24 -0000 1.9
+++ Queue.py 12 Aug 2002 21:05:50 -0000 1.10
@@ -129,23 +129,10 @@
If there are fewer than 'count' messages in the queue, all the
messages will be retained."""
- messages = [fn for fn in os.listdir(self.dir) if fn.startswith("msg_")]
-
- n = len(messages)
- if count is None:
- count = n
- else:
- count = min(count, n)
-
- # This permutation algorithm yields all permutation with equal
- # probability (assuming a good rng); others do not.
- for i in range(count-1):
- swap = i+self.rng.getInt(n-i-1)
- v = messages[swap]
- messages[swap] = messages[i]
- messages[i] = v
+ handles = [ fn[4:] for fn in os.listdir(self.dir)
+ if fn.startswith("msg_") ]
- return [m[4:] for m in messages[:count]]
+ return self.rng.shuffle(handles, count)
def getAllMessages(self):
"""Returns handles for all messages currently in the queue.
@@ -332,7 +319,8 @@
def nextMessageReadyAt(self):
"""Return the soonest possible time at which sendReadyMessages
will send something. If some time < now is returned,
- the answer is 'immediately'. """
+ the answer is 'immediately'. If 'None' is returned, there are
+ no messages in the queue."""
if self.sendableAt:
return self.sendableAt[0][0]
else:
@@ -388,27 +376,62 @@
self.queueMessage(addr, msg, retries+1, retryAt)
self.removeMessage(handle)
-class MixQueue(Queue):
- """A MixQueue holds a group of files, and returns some of them
- as requested, according to some mixing algorithm.
-
- It's the responsibility of the user of this class to only invoke it
- at the specified interval."""
- # Right now, we use the 'Cottrell' mixing algorithm with fixed
- # parameters. We always keep 5 messages in the pool, and never send
- # more than 30% of the pool at a time. These parameters should probably
- # be more like 100
- MIN_POOL_SIZE = 5
- MAX_REPLACEMENT_RATE = 0.3
- def __init__(self, location):
- Queue.__init__(self, location, create=1, scrub=1)
class TimedMixQueue(Queue):
    """A TimedMixQueue holds a group of files, and returns some of them
       as requested, according to a mixing algorithm that sends a batch
       of messages every N seconds."""
    def __init__(self, location, interval=600):
        """Create a TimedMixQueue that sends its entire batch of messages
           every 'interval' seconds.  The first batch becomes sendable
           'interval' seconds after construction."""
        Queue.__init__(self, location, create=1, scrub=1)
        self.interval = interval
        # time.time must be *called*: adding an int to the function object
        # itself raises TypeError.
        self.nextSendTime = time.time() + interval
- def pickMessages(self):
- """Return a list of handles."""
- n = self.count()
- nTransmit = min(n-self.MIN_POOL_SIZE,
- int(n*self.MAX_REPLACEMENT_RATE))
def nextMessageReadyAt(self):
    """Return the time (a time.time()-style timestamp) at which the pool
       is next scheduled to send a batch of messages."""
    scheduled = self.nextSendTime
    return scheduled
+
def getReadyMessages(self):
    """Return the handles the pool should send right now.

       Before the scheduled send time this is an empty list and the
       schedule is left unchanged.  Once the time has arrived, the next
       send is rescheduled 'interval' seconds from now and one batch
       (from _getBatch) is returned."""
    current = time.time()
    if current < self.nextSendTime:
        # Not time yet: send nothing.
        return []
    # Rescheduling from the current time (rather than from the old
    # deadline) means a late call never leaves the next deadline already
    # in the past.
    self.nextSendTime = current + self.interval
    return self._getBatch()
+
+ def _getBatch(self):
+ """Internal method: called by getReadyMessages to return a single
+ batch of handles."""
+ return self.pickRandom()
+
class CottrellMixQueue(TimedMixQueue):
    """A CottrellMixQueue holds a group of files, and returns some of them
       as requested, according to the Cottrell (timed dynamic-pool) mixing
       algorithm from Mixmaster."""
    def __init__(self, location, interval=600, minPoolSize=6, maxSendRate=.3):
        """Create a new queue that yields a batch of messages every
           'interval' seconds, never allows its pool size to drop below
           'minPoolSize', and never sends more than maxSendRate * the
           current pool size."""
        TimedMixQueue.__init__(self, location, interval)
        self.minPoolSize = minPoolSize
        self.maxSendRate = maxSendRate

    def _getBatch(self):
        """Return handles for min(pool - minPoolSize,
           int(pool * maxSendRate)) randomly chosen messages, or for no
           messages at all if that number is not positive."""
        pool = self.count()
        nTransmit = min(pool-self.minPoolSize, int(pool*self.maxSendRate))
        # A pool smaller than minPoolSize makes nTransmit negative.  Clamp
        # it to zero so that nothing leaves the pool, rather than relying
        # on how pickRandom treats a negative count.
        nTransmit = max(0, nTransmit)
        return self.pickRandom(nTransmit)
+
class BinomialCottrellMixQueue(CottrellMixQueue):
    """Same algorithm as CottrellMixQueue, but instead of sending N messages
       from the pool of size P, sends each message with probability N/P."""
    def _getBatch(self):
        """Return a randomly ordered list of handles.  Each message in the
           pool is included independently with probability N/P, where N is
           the Cottrell batch size and P is the pool size."""
        pool = self.count()
        nTransmit = min(pool-self.minPoolSize, int(pool*self.maxSendRate))
        # Nothing to send when the pool is below minPoolSize.  This also
        # covers an empty pool (int(0 * maxSendRate) == 0), which would
        # otherwise divide by zero below.
        if nTransmit <= 0:
            return []
        msgProbability = float(nTransmit) / pool
        chosen = [ h for h in self.getAllMessages()
                   if self.rng.getFloat() < msgProbability ]
        # The RNG is an attribute of the queue, not a module-level name.
        return self.rng.shuffle(chosen)
def _secureDelete_bg(files, cleanFile):
pid = os.fork()
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -d -r1.17 -r1.18
--- test.py 11 Aug 2002 07:50:34 -0000 1.17
+++ test.py 12 Aug 2002 21:05:50 -0000 1.18
@@ -405,9 +405,37 @@
PRNG.getBytes(15)+PRNG.getBytes(16000)+
PRNG.getBytes(34764)))
+ # Check getInt, getFloat.
for i in xrange(1,10000,17):
self.failUnless(0 <= PRNG.getInt(10) < 10)
self.failUnless(0 <= PRNG.getInt(i) < i)
+ for i in xrange(100):
+ self.failUnless(0 <= PRNG.getFloat() < 1)
+ self.failUnless(0 <= PRNG.getFloat(4) < 1)
+ self.failUnless(0 <= PRNG.getFloat(5) < 1)
+
+ # Make sure shuffle only shuffles the first n.
+ lst = range(100)
+ PRNG.shuffle(lst,10)
+ later = [ item for item in lst[10:] if item >= 10 ]
+ s = later[:]
+ s.sort()
+ self.failUnless(later == s)
+
+ # Make sure shuffle actually shuffles all positions.
+ lists = [ ]
+ for i in xrange(6):
+ lists.append(PRNG.shuffle(lst)[:])
+ # This will fail accidentally once in 10,000,000,000 attempts.
+ for crossSection in zip(*lists):
+ allEq = 1
+ for z in crossSection:
+ if z != crossSection[0]: allEq = 0
+ self.failIf(allEq)
+ for lst in lists:
+ s = lst[:]
+ s.sort()
+ self.assertEquals(s, range(100))
#----------------------------------------------------------------------
import mixminion.Packet