[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Make retry logic check when the last attempt to *this address* failed
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv22094/lib/mixminion
Modified Files:
Filestore.py test.py
Log Message:
Make retry logic check when the last attempt to *this address* failed, not when this message failed. This has subtle anonymity implications; the new way is better.
Index: Filestore.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Filestore.py,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -d -r1.19 -r1.20
--- Filestore.py 14 May 2004 23:43:04 -0000 1.19
+++ Filestore.py 17 May 2004 05:19:07 -0000 1.20
@@ -872,6 +872,12 @@
def keys(self):
return self.cache.keys()
+ def values(self):
+ return self.cache.values()
+
+ def items(self):
+ return self.cache.items()
+
def load(self):
keys = self.db.keys()
self.cache = cache = {}
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.197
retrieving revision 1.198
diff -u -d -r1.197 -r1.198
--- test.py 14 May 2004 23:44:09 -0000 1.197
+++ test.py 17 May 2004 05:19:07 -0000 1.198
@@ -16,6 +16,7 @@
import cPickle
import cStringIO
import gzip
+import operator
import os
import re
import socket
@@ -241,6 +242,8 @@
"""Fail unless 'item not in lst'"""
if (item in lst):
self.fail("%r in %r"%(item,lst))
+ def assertIEquals(self, i1, i2):
+ self.assertEquals(long(i1), long(i2))
#----------------------------------------------------------------------
# Tests for common functionality
@@ -2905,6 +2908,16 @@
def _deliverMessages(self, msgList):
self._msgs = msgList
+class TestPerAddressDeliveryQueue(PerAddressDeliveryQueue):
+ def __init__(self,d,now=None):
+ PerAddressDeliveryQueue.__init__(self,d,now=now)
+ self._msgs = None
+ def sendReadyMessages(self, *x, **y):
+ self._msgs = None
+ PerAddressDeliveryQueue.sendReadyMessages(self, *x,**y)
+ def _deliverMessages(self, msgList):
+ self._msgs = msgList
+
class FStoreTestBase(TestCase):
def unlink(self, fns):
for f in fns:
@@ -3258,6 +3271,16 @@
self.assertUnorderedEq(d.keys(), ["mulligan","bliznert"])
d.close()
+class _TestAddr:
+ def __init__(self,s):
+ self.s = s
+ def __str__(self):
+ return self.s
+ def __hash__(self):
+ return hash(self.s+"_but different.")
+ def __cmp__(self, o):
+ return cmp(self.s, o.s)
+
class QueueTests(FStoreTestBase):
def setUp(self):
mixminion.Common.installSIGCHLDHandler()
@@ -3343,9 +3366,143 @@
queue.removeAll(self.unlink)
queue.cleanQueue(self.unlink)
- def testAddressBasedQueue(self):
- #XXXX008
- pass
+ def testPerAddressDeliveryQueue(self):
+ PADQ = TestPerAddressDeliveryQueue
+ A1 = _TestAddr("FirstAddress")
+ A2 = _TestAddr("SecondAddress")
+ A3 = _TestAddr("ThirdAddress")
+ HOUR = 60*60
+ MIN = 60
+ RETRY = [ HOUR, HOUR, HOUR, HOUR*2, HOUR*4, HOUR*8 ]
+ assert reduce(operator.add, RETRY) == 17*HOUR
+ loc = mix_mktemp()
+ start = time.time()
+ q = PADQ(loc,now=start)
+ q.setRetrySchedule(RETRY)
+ h1 = q.queueDeliveryMessage("Message number one", A1, start)
+ h2 = q.queueDeliveryMessage("Message number two", A2, start)
+ h3 = q.queueDeliveryMessage("Message number three", A1, start)
+ q._repOK()
+
+ m, ds, as = q._inspect(h1)
+ self.assertEquals(m, "Message number one")
+ self.assertEquals(ds.address, A1)
+ self.assertEquals(as.address, A1)
+ self.assert_(not ds.isPending())
+ self.assertEquals(ds.lastAttempt, None)
+ self.assertEquals(as.lastSuccess, None)
+ self.assertEquals(as.lastFailure, None)
+ self.assertEquals(as.firstFailure, None)
+ q.removeExpiredMessages(start)
+ self.assertEquals(q.store.count(), 3)
+
+ q.sendReadyMessages(start+10)
+ msgs = self._pendingMsgDict(q._msgs)
+ self.assertEquals(len(msgs), 3)
+ self.assert_(q._inspect(h1)[1].isPending())
+ self.assertEquals(msgs[h1].getAddress(), A1)
+ self.assertEquals(msgs[h1].getMessage(), "Message number one")
+ # Succeed on address 2.
+ self.assertEquals(msgs[h2].getAddress(), A2)
+ msgs[h2].succeeded(now=start+20)
+ # Succeed on _one_ message from address 1, but not the other.
+ msgs[h1].succeeded(now=start+30)
+ msgs[h3].failed(now=start+40, retriable=1)
+ self.assert_(not q._inspect(h3)[1].isPending())
+
+ as = q.addressStateDB[str(A1)]
+ self.assertIEquals(as.lastSuccess-start, 30)
+ self.assertIEquals(as.firstFailure-start, 10)
+ self.assertIEquals(as.lastFailure-start, 10)
+ self.assertIEquals(as.nextAttempt-start, 10+HOUR)
+ as = q.addressStateDB[str(A2)]
+ self.assertIEquals(as.lastSuccess-start, 20)
+ self.assertEquals(as.firstFailure, None)
+ self.assertEquals(as.lastFailure, None)
+ self.assertIEquals(as.nextAttempt-start, 20)
+
+ # Queue two more messages; the one on A2 gets tried; the ones on
+ # A1 don't get tried for a while.
+ h4 = q.queueDeliveryMessage("This is message four", A1, start+MIN)
+ h5 = q.queueDeliveryMessage("This is message five", A2, start+MIN)
+ q.sendReadyMessages(start+2*MIN)
+ msgs = self._pendingMsgDict(q._msgs)
+ self.assertEquals(len(msgs), 1)
+ self.assertEquals(msgs[h5].getMessage(), "This is message five")
+ msgs[h5].succeeded(now=start+3*MIN)
+
+ # Wait an "hour"; then, the A1 messages both get tried.
+ q.sendReadyMessages(start+HOUR+MIN)
+ msgs = self._pendingMsgDict(q._msgs)
+ self.assertUnorderedEq(msgs.keys(), [h3, h4])
+ as = q.addressStateDB[str(A1)]
+ msgs[h3].failed(now=start+HOUR+MIN+5, retriable=1)
+ msgs[h4].failed(now=start+HOUR+MIN+5, retriable=1)
+
+ self.assertIEquals(as.firstFailure-start, 10)
+ self.assertIEquals(as.lastFailure-start, HOUR+MIN)
+ self.assertIEquals(as.nextAttempt-start, 10+2*HOUR)
+
+ h6 = q.queueDeliveryMessage("The sixth message", A1,
+ now=start+HOUR*17-MIN)
+
+ # 17 hours and 30 seconds after 'start', exactly one message
+ # is ready to be removed.
+ self.assertEquals(3, q.store.count())
+ q.removeExpiredMessages(now=start+HOUR*17+30)
+ self.assertEquals(2, q.store.count())
+ self.assertUnorderedEq(q.store.getAllMessages(), [h4,h6])
+ self.assertUnorderedEq(q.addressStateDB.keys(), [str(A1), str(A2)])
+ # 17 hours and 2 minutes after 'start', another one is ready to be
+ # removed.
+ q.removeExpiredMessages(now=start+HOUR*17+MIN*2)
+ self.assertUnorderedEq(q.store.getAllMessages(), [h6])
+ self.assertUnorderedEq(q.addressStateDB.keys(), [str(A1), str(A2)])
+
+ # 17 hours and 5 minutes: A2 expires (no activity for a long time.)
+ q.removeExpiredMessages(now=start+HOUR*17+MIN*5)
+ self.assertUnorderedEq(q.store.getAllMessages(), [h6])
+ self.assertUnorderedEq(q.addressStateDB.keys(), [str(A1)])
+
+ # Make sure that messages keep getting retried...
+ as = q.addressStateDB[str(A1)]
+ # (Reset nextDelivery on A1, since not all the delivery attempts
+ # have really happened.)
+ q.sendReadyMessages(now=start+HOUR*17+MIN*15)
+ msgs = self._pendingMsgDict(q._msgs)
+ msgs[h6].failed(now=start+HOUR*17+MIN*16, retriable=0)
+ self.assertEquals(as.nextAttempt - start, 25*HOUR+10)
+ self.assertEquals([], q.store.getAllMessages())
+
+ # Test reloading.
+ q.sync()
+ q.close()
+ q = PADQ(loc,now=start+20*HOUR)
+ q.setRetrySchedule(RETRY,now=start+20*HOUR)
+ q._repOK()
+ self.assertEquals(q._getAddressState(A1).nextAttempt - start,
+ 25*HOUR+10)
+ q.close()
+
+ # Test upgrading
+ loc2 = mix_mktemp()
+ dq = TestDeliveryQueue(loc2, now=start)
+ hA = dq.queueDeliveryMessage("Grandfathered msg 1", A2, now=start)
+ hB = dq.queueDeliveryMessage("Grandfathered msg 2", A3, now=start+30)
+ q = PADQ(loc2,now=start+MIN)
+ self.assertUnorderedEq([hA,hB],q.store.getAllMessages())
+ q.sendReadyMessages(now=start+2*MIN)
+ msgs = self._pendingMsgDict(q._msgs)
+ self.assertUnorderedEq(msgs.keys(), [hA,hB])
+ self.assertEquals(msgs[hA].getAddress(),A2)
+ self.assertEquals(msgs[hB].getAddress(),A3)
+ q.close()
+
+ def _pendingMsgDict(self, lst):
+ d = {}
+ for m in lst:
+ d[m.getHandle()] = m
+ return d
def testMixPools(self):
d_m = mix_mktemp("qm")
@@ -7600,8 +7757,8 @@
loader = unittest.TestLoader()
tc = loader.loadTestsFromTestCase
- if 0:
- suite.addTest(tc(FilestoreTests))
+ if 1:
+ suite.addTest(tc(QueueTests))
return suite
testClasses = [MiscTests,
MinionlibCryptoTests,