[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Make retry logic check when the last attempt to *this address* failed, not when this message failed



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv22094/lib/mixminion/server

Modified Files:
	ServerMain.py ServerQueue.py 
Log Message:
Make retry logic check when the last attempt to *this address* failed, not when this message failed. This has subtle anonymity implications; the new way is better.

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.126
retrieving revision 1.127
diff -u -d -r1.126 -r1.127
--- ServerMain.py	14 May 2004 23:41:17 -0000	1.126
+++ ServerMain.py	17 May 2004 05:19:09 -0000	1.127
@@ -307,7 +307,7 @@
            mix."""
         return now + self.queue.getInterval()
 
-class OutgoingQueue(mixminion.server.ServerQueue.DeliveryQueue):
+class OutgoingQueue(mixminion.server.ServerQueue.PerAddressDeliveryQueue):
     """DeliveryQueue to send packets via outgoing MMTP connections.  All
        methods on this class are called from the main thread.  The underlying
        objects in this queue are instances of RelayedPacket.

Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.40
retrieving revision 1.41
diff -u -d -r1.40 -r1.41
--- ServerQueue.py	14 May 2004 23:41:17 -0000	1.40
+++ ServerQueue.py	17 May 2004 05:19:09 -0000	1.41
@@ -23,7 +23,7 @@
 from mixminion.Filestore import CorruptedFile
 
 __all__ = [ 'DeliveryQueue', 'TimedMixPool', 'CottrellMixPool',
-            'BinomialCottrellMixPool' ]
+            'BinomialCottrellMixPool', 'PerAddressDeliveryQueue' ]
 
 def _calculateNext(lastAttempt, firstAttempt, retrySchedule, canDrop, now):
     """DOCDOC"""
@@ -50,8 +50,11 @@
     if canDrop:
         return None
     else:
-        attempt += (ceilDiv(lastAttempt-attempt+60,self.retrySchedule[-1]) *
-                    self.retrySchedule[-1])
+        if not retrySchedule or retrySchedule[-1]<5:
+            #DOCDOC
+            retrySchedule = [3600]
+        attempt += (ceilDiv(lastAttempt-attempt+60,retrySchedule[-1]) *
+                    retrySchedule[-1])
         return attempt
 
 class _DeliveryState:
@@ -166,10 +169,10 @@
     def getHandle(self):
         return self.handle
 
-    def succeeded(self):
+    def succeeded(self,now=None):
         """Mark this message as having been successfully deleted, removing
            it from the queue."""
-        self.queue.deliverySucceeded(self.handle)
+        self.queue.deliverySucceeded(self.handle,now=now)
         self.queue = self.message = None
 
     def failed(self, retriable=0, now=None):
@@ -200,7 +203,8 @@
        as appropriate.
 
        Users of this class will probably only want to call the
-       queueMessage, sendReadyMessages, and nextMessageReadyAt methods.
+       queueDeliveryMessage, sendReadyMessages, and nextMessageReadyAt
+       methods.
 
        This class caches information about the directory state; it won't
        play nice if multiple instances are looking at the same directory.
@@ -233,7 +237,7 @@
             self.setRetrySchedule(retrySchedule, now)
         else:
             self.setRetrySchedule([0], now)
-        self._repOk()
+        self._repOK()
 
     def setRetrySchedule(self, schedule, now=None):
         """Set the retry schedule for this queue.  A retry schedule is
@@ -267,7 +271,7 @@
             self._lock.acquire()
             self.store.loadAllMetadata(lambda h: _DeliveryState())
             self._rebuildNextAttempt(now)
-            self._repOk()
+            self._repOK()
         finally:
             self._lock.release()
 
@@ -292,9 +296,9 @@
 
         for ds in self.store._metadata_cache.values():
             ds.setNextAttempt(rs, now)
-        self._repOk()
+        self._repOK()
 
-    def _repOk(self):
+    def _repOK(self):
         """Raise an assertion error if the internal state of this object is
            nonsensical."""
         # XXXX Later in the release cycle, we should call this *even* less.
@@ -330,7 +334,7 @@
     def _inspect(self,handle):
         """Returns a (msg, inserted, lastAttempt, nextAttempt) tuple
            for a given message handle.  For testing. """
-        self._repOk()
+        self._repOK()
         o = self.store.getObject(handle)
         ds = self.store.getMetadata(handle)
         return (o, ds.queuedTime, ds.lastAttempt, ds.nextAttempt)
@@ -354,7 +358,7 @@
         """Sends all messages which are not already being sent, and which
            are scheduled to be sent."""
         assert self.retrySchedule is not None
-        self._repOk()
+        self._repOK()
         if now is None:
             now = time.time()
         LOG.trace("DeliveryQueue checking for deliverable messages in %s",
@@ -388,7 +392,7 @@
 
         if messages:
             self._deliverMessages(messages)
-        self._repOk()
+        self._repOK()
 
     def _deliverMessages(self, msgList):
         """Abstract method; Invoked with a list of PendingMessage objects
@@ -419,7 +423,7 @@
         finally:
             self._lock.release()
 
-    def deliverySucceeded(self, handle):
+    def deliverySucceeded(self, handle, now=None):
         """Removes a message from the outgoing queue.  This method
            should be invoked after the corresponding message has been
            successfully delivered.
@@ -489,11 +493,11 @@
         finally:
             self._lock.release()
 
-class AddressState:
-    """DOCDOC"""
+class _AddressState:
+    """DOCDOsC"""
     def __init__(self, address):
         self.address = address
-        self.lastSuccess = self.lastFailure = self.firstFailure
+        self.lastSuccess = self.lastFailure = self.firstFailure = None
 
     def __getstate__(self):
         return ("ADDR-V1", self.address, self.lastSuccess,
@@ -517,10 +521,13 @@
                                           self.firstFailure,
                                           retrySchedule, canDrop=0, now=now)
 
-    def lastActivity(self):
+    def getLastActivity(self):
         events = [ e for e in [self.lastSuccess, self.lastFailure]
                    if e is not None ]
-        return max(events)
+        if events:
+            return max(events)
+        else:
+            return None
 
     def succeeded(self, now=None):
         if not now:
@@ -529,19 +536,22 @@
         self.lastFailure = None
         self.firstFailure = None
 
-    def failed(self, now=None):
+    def failed(self, attempt, now=None):
         if not now:
             now = time.time()
         if not self.firstFailure:
-            self.firstFailure = now
-        self.lastFailure = now
+            self.firstFailure = attempt
+        self.lastFailure = attempt
 
 class PerAddressDeliveryQueue(DeliveryQueue):
     """DOCDOC"""
     def __init__(self, location, retrySchedule=None, now=None, name=None):
-        self.addressStateDB = mixminion.Filestore.WritethroughShelf(
-            os.path.join(location,"addressStatus.db"))
-        DeliveryQueue.__init__(self=self, location=location,
+        self.addressStateDB = mixminion.Filestore.WritethroughDict(
+            filename=os.path.join(location,"addressStatus.db"),
+            purpose="address state")
+        if retrySchedule is None:
+            retrySchedule = [3600]
+        DeliveryQueue.__init__(self, location=location,
                                retrySchedule=retrySchedule, now=now, name=name)
 
     def sync(self):
@@ -561,27 +571,24 @@
     def _rebuildNextAttempt(self, now=None):
         self._lock.acquire()
         try:
-            self.addressStates = {}
-            for k in self.addressStateDB.keys():
-                v = self.addressStateDB[k]
-                self.addressStates[k.address] = (k, v)
             for ds in self.store._metadata_cache.values():
-                if not self.addressStates.has_key(ds.address):
-                    as = AddressState(ds.address)
+                if not self.addressStateDB.has_key(str(ds.address)):
+                    as = _AddressState(ds.address)
                     self.addressStateDB[str(ds.address)] = as
-                    self.addressStates[ds.address] = (str(ds.address),as)
             if not self.retrySchedule:
                 rs = [3600]
+                self.totalLifetime = 3600
             else:
                 rs = self.retrySchedule
-                self.totalLifetime = reduce(operator.plus,self.retrySchedule,0)
-            for k, as in self.addressStates.values():
+                self.totalLifetime = reduce(operator.add,self.retrySchedule,0)
+            for as in self.addressStateDB.values():
                 as.setNextAttempt(rs, now)
-            self._repOk()
+            self._repOK()
         finally:
             self._lock.release()
 
-    def _repOk(self):
+    def _repOK(self):
+        #XXXX008
         pass
 
     def removeExpiredMessages(self, now=None):
@@ -589,26 +596,34 @@
         assert self.retrySchedule is not None
         self._lock.acquire()
         try:
-            for k, as in self.addressStates.values():
-                if as.lastActivity and (
-                    as.lastActivity + self.totalLifetime < now):
-                    del self.addressStates[as.address]
-                    del self.addressStateDB[k]
+            have = {}
             for h, ds in self.store._metadata_cache.items():
                 if ds.queuedTime + self.totalLifetime < now:
                     self.removeMessage(h)
+                else:
+                    have[ds.address]=1
+
+            for k, as in self.addressStateDB.items():
+                if have.has_key(as.address):
+                    continue
+                lastActivity = as.getLastActivity()
+                if lastActivity and (
+                    lastActivity + self.totalLifetime < now):
+                    del self.addressStateDB[k]
         finally:
             self._lock.release()
 
-    def _getAddressState(self, address):
-       try:
-           k, as = self.addressStates[address]
-       except KeyError:
-           as = AddressState(address)
-           k = str(address)
-           self.addressStateDB[k] = as
-           self.addressStates[address] = as
-       return as
+    def _getAddressState(self, address, now=None):
+        try:
+            as = self.addressStateDB[str(address)]
+        except KeyError:
+            as = self.addressStateDB[str(address)] = _AddressState(address)
+            as.setNextAttempt(self.retrySchedule, now)
+        return as
+
+    def queueDeliveryMessage(self, msg, address, now=None):
+        self._getAddressState(address, now=now)
+        return DeliveryQueue.queueDeliveryMessage(self,msg,address,now)
 
     def sendReadyMessages(self, now=None):
         if now is None:
@@ -628,11 +643,11 @@
                     LOG.trace("     [%s] is expired", h)
                     self.removeMessage(h)
                     continue
-                addressState = self._getAddressState(state.address)
+                addressState = self._getAddressState(state.address, now)
                 if addressState.nextAttempt <= now:
                     LOG.trace("     [%s] is ready for next attempt on %s", h,
                               state.address)
-                    messages.append(PendingMessage(h,self,addr))
+                    messages.append(PendingMessage(h,self,state.address))
                     state.setPending(now)
                 else:
                     LOG.trace("     [%s] will wait for next attempt on %s",h,
@@ -650,20 +665,23 @@
     def sync(self):
         self.addressStateDB.sync()
 
-    def deliverySucceeded(self, handle):
+    def close(self):
+        self.addressStateDB.close()
+
+    def deliverySucceeded(self, handle, now=None):
         assert self.retrySchedule is not None
         self._lock.acquire()
         try:
             LOG.trace("PerAddressDeliveryQueue got successful delivery for %s from %s",
                       handle, self.qname)
             try:
-                mState = self.getMetadata(handle)
+                mState = self.store.getMetadata(handle)
             except CorruptedFile:
                 mState = None
             if mState:
-                aState = self._getAddressState(mState.address)
-                changed = aState.succeeded()
-                aState.setNextAttempt(self.retrySchedule)
+                aState = self._getAddressState(mState.address, now)
+                changed = aState.succeeded(now=now)
+                aState.setNextAttempt(self.retrySchedule, now)
                 self.addressStateDB[str(mState.address)] = aState
 
             self.removeMessage(handle)
@@ -677,7 +695,7 @@
         self._lock.acquire()
         try:
             try:
-                mState = self.getMetadata(handle)
+                mState = self.store.getMetadata(handle)
             except KeyError:
                 mState = None
             except CorruptedFile:
@@ -698,15 +716,39 @@
             if not retriable:
                 LOG.trace("     (Giving up on %s)", handle)
                 self.removeMessage(handle)
-                return
 
-            aState = self._getAddressState(mState.address)
-            aState.failed(now=now)
+            aState = self._getAddressState(mState.address, now)
+            aState.failed(attempt=last,now=now)
             aState.setNextAttempt(self.retrySchedule,now=now)
-            self.addressStateDB[str(aState.address)] = aState
+            self.addressStateDB[str(aState.address)] = aState # flush to db.
         finally:
             self._lock.release()
 
+    def _inspect(self,handle):
+        """Returns a (msg, state, addressState) tuple for a given
+           message handle.  For testing."""
+        self._repOK()
+        o = self.store.getObject(handle)
+        ds = self.store.getMetadata(handle)
+        as = self._getAddressState(ds.address)
+        return (o, ds, as)
+
+    def _repOK(self):
+        """Raise an assertion error if the internal state of this object is
+           nonsensical."""
+        # XXXX Later in the release cycle, we should call this *even* less.
+        # XXXX It adds ~8-9ms on my laptop for ~400 messages
+        self._lock.acquire()
+        try:
+            DeliveryQueue._repOK(self)
+            for h in self.store._metadata_cache.keys():
+                ds = self.store._metadata_cache[h]
+                as = self._getAddressState(ds.address)
+                assert as.address == ds.address
+        finally:
+            self._lock.release()
+
+
 class TimedMixPool(mixminion.Filestore.ObjectStore):
     """A TimedMixPool holds a group of files, and returns some of them
        as requested, according to a mixing algorithm that sends a batch