[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Make retry logic check when the last attempt to *this a...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv22094/lib/mixminion

Modified Files:
	Filestore.py test.py 
Log Message:
Make retry logic check when the last attempt to *this address* failed, not when this message failed. This has subtle anonymity implications; the new way is better.

Index: Filestore.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Filestore.py,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -d -r1.19 -r1.20
--- Filestore.py	14 May 2004 23:43:04 -0000	1.19
+++ Filestore.py	17 May 2004 05:19:07 -0000	1.20
@@ -872,6 +872,12 @@
     def keys(self):
         return self.cache.keys()
 
+    def values(self):
+        return self.cache.values()
+
+    def items(self):
+        return self.cache.items()
+
     def load(self):
         keys = self.db.keys()
         self.cache = cache = {}

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.197
retrieving revision 1.198
diff -u -d -r1.197 -r1.198
--- test.py	14 May 2004 23:44:09 -0000	1.197
+++ test.py	17 May 2004 05:19:07 -0000	1.198
@@ -16,6 +16,7 @@
 import cPickle
 import cStringIO
 import gzip
+import operator
 import os
 import re
 import socket
@@ -241,6 +242,8 @@
         """Fail unless 'item not in lst'"""
         if (item in lst):
             self.fail("%r in %r"%(item,lst))
+    def assertIEquals(self, i1, i2):
+        self.assertEquals(long(i1), long(i2))
 
 #----------------------------------------------------------------------
 # Tests for common functionality
@@ -2905,6 +2908,16 @@
     def _deliverMessages(self, msgList):
         self._msgs = msgList
 
+class TestPerAddressDeliveryQueue(PerAddressDeliveryQueue):
+    def __init__(self,d,now=None):
+        PerAddressDeliveryQueue.__init__(self,d,now=now)
+        self._msgs = None
+    def sendReadyMessages(self, *x, **y):
+        self._msgs = None
+        PerAddressDeliveryQueue.sendReadyMessages(self, *x,**y)
+    def _deliverMessages(self, msgList):
+        self._msgs = msgList
+
 class FStoreTestBase(TestCase):
     def unlink(self, fns):
         for f in fns:
@@ -3258,6 +3271,16 @@
         self.assertUnorderedEq(d.keys(), ["mulligan","bliznert"])
         d.close()
 
+class _TestAddr:
+    def __init__(self,s):
+        self.s = s
+    def __str__(self):
+        return self.s
+    def __hash__(self):
+        return hash(self.s+"_but different.")
+    def __cmp__(self, o):
+        return cmp(self.s, o.s)
+
 class QueueTests(FStoreTestBase):
     def setUp(self):
         mixminion.Common.installSIGCHLDHandler()
@@ -3343,9 +3366,143 @@
         queue.removeAll(self.unlink)
         queue.cleanQueue(self.unlink)
 
-    def testAddressBasedQueue(self):
-        #XXXX008
-        pass
+    def testPerAddressDeliveryQueue(self):
+        PADQ = TestPerAddressDeliveryQueue
+        A1 = _TestAddr("FirstAddress")
+        A2 = _TestAddr("SecondAddress")
+        A3 = _TestAddr("ThirdAddress")
+        HOUR = 60*60
+        MIN = 60
+        RETRY = [ HOUR, HOUR, HOUR, HOUR*2, HOUR*4, HOUR*8 ]
+        assert reduce(operator.add, RETRY) == 17*HOUR
+        loc = mix_mktemp()
+        start = time.time()
+        q = PADQ(loc,now=start)
+        q.setRetrySchedule(RETRY)
+        h1 = q.queueDeliveryMessage("Message number one", A1, start)
+        h2 = q.queueDeliveryMessage("Message number two", A2, start)
+        h3 = q.queueDeliveryMessage("Message number three", A1, start)
+        q._repOK()
+
+        m, ds, as = q._inspect(h1)
+        self.assertEquals(m, "Message number one")
+        self.assertEquals(ds.address, A1)
+        self.assertEquals(as.address, A1)
+        self.assert_(not ds.isPending())
+        self.assertEquals(ds.lastAttempt, None)
+        self.assertEquals(as.lastSuccess, None)
+        self.assertEquals(as.lastFailure, None)
+        self.assertEquals(as.firstFailure, None)
+        q.removeExpiredMessages(start)
+        self.assertEquals(q.store.count(), 3)
+
+        q.sendReadyMessages(start+10)
+        msgs = self._pendingMsgDict(q._msgs)
+        self.assertEquals(len(msgs), 3)
+        self.assert_(q._inspect(h1)[1].isPending())
+        self.assertEquals(msgs[h1].getAddress(), A1)
+        self.assertEquals(msgs[h1].getMessage(), "Message number one")
+        # Succeed on address 2.
+        self.assertEquals(msgs[h2].getAddress(), A2)
+        msgs[h2].succeeded(now=start+20)
+        # Succeed on _one_ message from address 1, but not the other.
+        msgs[h1].succeeded(now=start+30)
+        msgs[h3].failed(now=start+40, retriable=1)
+        self.assert_(not q._inspect(h3)[1].isPending())
+
+        as = q.addressStateDB[str(A1)]
+        self.assertIEquals(as.lastSuccess-start, 30)
+        self.assertIEquals(as.firstFailure-start, 10)
+        self.assertIEquals(as.lastFailure-start, 10)
+        self.assertIEquals(as.nextAttempt-start, 10+HOUR)
+        as = q.addressStateDB[str(A2)]
+        self.assertIEquals(as.lastSuccess-start, 20)
+        self.assertEquals(as.firstFailure, None)
+        self.assertEquals(as.lastFailure, None)
+        self.assertIEquals(as.nextAttempt-start, 20)
+
+        # Queue two more messages; the one on A2 gets tried; the ones on
+        # A1 doesn't get tried for a while.
+        h4 = q.queueDeliveryMessage("This is message four", A1, start+MIN)
+        h5 = q.queueDeliveryMessage("This is message five", A2, start+MIN)
+        q.sendReadyMessages(start+2*MIN)
+        msgs = self._pendingMsgDict(q._msgs)
+        self.assertEquals(len(msgs), 1)
+        self.assertEquals(msgs[h5].getMessage(), "This is message five")
+        msgs[h5].succeeded(now=start+3*MIN)
+
+        # Wait an "hour"; then, the A1 messages both get tried.
+        q.sendReadyMessages(start+HOUR+MIN)
+        msgs = self._pendingMsgDict(q._msgs)
+        self.assertUnorderedEq(msgs.keys(), [h3, h4])
+        as = q.addressStateDB[str(A1)]
+        msgs[h3].failed(now=start+HOUR+MIN+5, retriable=1)
+        msgs[h4].failed(now=start+HOUR+MIN+5, retriable=1)
+
+        self.assertIEquals(as.firstFailure-start, 10)
+        self.assertIEquals(as.lastFailure-start, HOUR+MIN)
+        self.assertIEquals(as.nextAttempt-start, 10+2*HOUR)
+
+        h6 = q.queueDeliveryMessage("The sixth message", A1,
+                                    now=start+HOUR*17-MIN)
+
+        # 17 hours and 30 seconds after 'start', exactly one message
+        # is ready to be removed.
+        self.assertEquals(3, q.store.count())
+        q.removeExpiredMessages(now=start+HOUR*17+30)
+        self.assertEquals(2, q.store.count())
+        self.assertUnorderedEq(q.store.getAllMessages(), [h4,h6])
+        self.assertUnorderedEq(q.addressStateDB.keys(), [str(A1), str(A2)])
+        # 17 hours and 2 minutes after 'start', another one is ready to be
+        # removed.
+        q.removeExpiredMessages(now=start+HOUR*17+MIN*2)
+        self.assertUnorderedEq(q.store.getAllMessages(), [h6])
+        self.assertUnorderedEq(q.addressStateDB.keys(), [str(A1), str(A2)])
+
+        # 17 hours and 5 minutes: A2 expires (no activity for a long time.)
+        q.removeExpiredMessages(now=start+HOUR*17+MIN*5)
+        self.assertUnorderedEq(q.store.getAllMessages(), [h6])
+        self.assertUnorderedEq(q.addressStateDB.keys(), [str(A1)])
+
+        # Make sure that messages keep getting retried...
+        as = q.addressStateDB[str(A1)]
+        # (Reset nextDelivery on A1, since not all the delivery attempts
+        # have really happened.)
+        q.sendReadyMessages(now=start+HOUR*17+MIN*15)
+        msgs = self._pendingMsgDict(q._msgs)
+        msgs[h6].failed(now=start+HOUR*17+MIN*16, retriable=0)
+        self.assertEquals(as.nextAttempt - start, 25*HOUR+10)
+        self.assertEquals([], q.store.getAllMessages())
+
+        # Test reloading.
+        q.sync()
+        q.close()
+        q = PADQ(loc,now=start+20*HOUR)
+        q.setRetrySchedule(RETRY,now=start+20*HOUR)
+        q._repOK()
+        self.assertEquals(q._getAddressState(A1).nextAttempt - start,
+                          25*HOUR+10)
+        q.close()
+
+        # Test upgrading
+        loc2 = mix_mktemp()
+        dq = TestDeliveryQueue(loc2, now=start)
+        hA = dq.queueDeliveryMessage("Grandfathered msg 1", A2, now=start)
+        hB = dq.queueDeliveryMessage("Grandfathered msg 2", A3, now=start+30)
+        q = PADQ(loc2,now=start+MIN)
+        self.assertUnorderedEq([hA,hB],q.store.getAllMessages())
+        q.sendReadyMessages(now=start+2*MIN)
+        msgs = self._pendingMsgDict(q._msgs)
+        self.assertUnorderedEq(msgs.keys(), [hA,hB])
+        self.assertEquals(msgs[hA].getAddress(),A2)
+        self.assertEquals(msgs[hB].getAddress(),A3)
+        q.close()
+
+    def _pendingMsgDict(self, lst):
+        d = {}
+        for m in lst:
+            d[m.getHandle()] = m
+        return d
 
     def testMixPools(self):
         d_m = mix_mktemp("qm")
@@ -7600,8 +7757,8 @@
     loader = unittest.TestLoader()
     tc = loader.loadTestsFromTestCase
 
-    if 0:
-        suite.addTest(tc(FilestoreTests))
+    if 1:
+        suite.addTest(tc(QueueTests))
         return suite
     testClasses = [MiscTests,
                    MinionlibCryptoTests,