[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Trivial doc patch to servermain; add new (untested) ser...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv7114/lib/mixminion/server

Modified Files:
	ServerMain.py ServerQueue.py 
Log Message:
Trivial doc patch to servermain; add new (untested) server queue to handle per-address retry logic.

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.125
retrieving revision 1.126
diff -u -d -r1.125 -r1.126
--- ServerMain.py	26 Apr 2004 16:55:46 -0000	1.125
+++ ServerMain.py	14 May 2004 23:41:17 -0000	1.126
@@ -1,7 +1,7 @@
 # Copyright 2002-2004 Nick Mathewson.  See LICENSE for licensing information.
 # $Id$
 
-"""mixminion.ServerMain
+"""mixminion.server.ServerMain
 
    The main loop and related functionality for a Mixminion server.
 

Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.39
retrieving revision 1.40
diff -u -d -r1.39 -r1.40
--- ServerQueue.py	2 May 2004 18:45:16 -0000	1.39
+++ ServerQueue.py	14 May 2004 23:41:17 -0000	1.40
@@ -6,23 +6,54 @@
    Facilities for retriable delivery queues, and for mix pools.
    """
 
+import cPickle
 import os
+import operator
 import time
 import stat
 import sys
-import cPickle
 import threading
 
 import mixminion.Filestore
 
 from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
-     createPrivateDir, readPickled, writePickled, formatTime, readFile
+     createPrivateDir, readPickled, writePickled, formatTime, readFile, \
+     ceilDiv
 from mixminion.Crypto import getCommonPRNG
 from mixminion.Filestore import CorruptedFile
 
 __all__ = [ 'DeliveryQueue', 'TimedMixPool', 'CottrellMixPool',
             'BinomialCottrellMixPool' ]
 
+def _calculateNext(lastAttempt, firstAttempt, retrySchedule, canDrop, now):
+    """DOCDOC"""
+    # If we've never tried to deliver the message, it's ready to
+    # go immediately.
+    if lastAttempt is None:
+        return now
+
+    # Otherwise, we count from the time the message was first queued,
+    # until we find a scheduled delivery that falls after the last
+    # attempted delivery.
+    #
+    # This scheduled delivery may be in the past.  That's okay: it only
+    # means that we've missed a scheduled delivery, and we can try again
+    # immediately.
+    attempt = firstAttempt
+    for interval in retrySchedule:
+        attempt += interval
+        if attempt > lastAttempt:
+            return attempt
+
+    # Oops: there are no scheduled deliveries after the last delivery.
+    # Time to drop this message, or go into holding mode.
+    if canDrop:
+        return None
+    else:
+        attempt += (ceilDiv(lastAttempt-attempt+60,self.retrySchedule[-1]) *
+                    self.retrySchedule[-1])
+        return attempt
+
 class _DeliveryState:
     """Helper class: holds the state needed to schedule delivery or
        eventual abandonment of a message in a DeliveryQueue."""
@@ -33,7 +64,7 @@
     #    deliver the message. (None means 'never').
     # address: Pickleable object holding address information.  Delivery
     #    code uses this field to group messages by address before loading
-    #    them all from disk.
+    #    them all from disk.  Must be usable as hash key.
     # pendingAt: None (if we're not sending this message), or a time
     #    at which we begain sending this message.
     # nextAttempt: None, or the time at which we'll next try to send
@@ -102,32 +133,11 @@
             now = time.time()
 
         self.remove = 0
-        last = self.lastAttempt
-
-        # If we've never tried to deliver the message, it's ready to
-        # go immediately.
-        if last is None:
-            self.nextAttempt = now
-            return
-
-        # Otherwise, we count from the time the message was first queued,
-        # until we find a scheduled delivery that falls after the last
-        # attempted delivery.
-        #
-        # This scheduled delivery may be in the past.  That's okay: it only
-        # means that we've missed a scheduled delivery, and we can try again
-        # immediately.
-        attempt = self.queuedTime
-        for interval in retrySchedule:
-            attempt += interval
-            if attempt > last:
-                self.nextAttempt = attempt
-                return
 
-        # Oops: there are no scheduled deliveries after the last delivery.
-        # Time to drop this message.
-        self.nextAttempt = None
-        self.remove = 1
+        self.nextAttempt = _calculateNext(self.lastAttempt, self.queuedTime,
+                                          retrySchedule, canDrop=1, now=now)
+        if self.nextAttempt is None:
+            self.remove = 1
 
     def setLastAttempt(self, when):
         """Update time of the last attempted delivery."""
@@ -271,7 +281,7 @@
 
     def _rebuildNextAttempt(self, now=None):
         """Helper: Reconstruct self.nextAttempt from self.retrySchedule and
-           self.deliveryState.
+           self.deliveryState. DOCDOC
 
            Callers must hold self._lock.
         """
@@ -360,10 +370,10 @@
                 if state.isPending():
                     LOG.trace("     [%s] is pending delivery", h)
                     continue
-                elif state and state.isRemovable():
+                elif state.isRemovable():
                     LOG.trace("     [%s] is expired", h)
                     self.removeMessage(h)
-                elif (not state) or state.nextAttempt <= now:
+                elif state.nextAttempt <= now:
                     LOG.trace("     [%s] is ready for delivery", h)
                     if state is None:
                         addr = None
@@ -479,6 +489,224 @@
         finally:
             self._lock.release()
 
+class AddressState:
+    """DOCDOC"""
+    def __init__(self, address):
+        self.address = address
+        self.lastSuccess = self.lastFailure = self.firstFailure
+
+    def __getstate__(self):
+        return ("ADDR-V1", self.address, self.lastSuccess,
+                self.lastFailure, self.firstFailure)
+
+    def __setstate__(self, state):
+        if state[0] == 'ADDR-V1':
+            _, self.address, self.lastSuccess, self.lastFailure, \
+               self.firstFailure = state
+        else:
+            #XXXX008 This is way too extreme.
+            raise MixFatalError("Unrecognized delivery state")
+
+        self.nextAttempt = None
+
+    def setNextAttempt(self, retrySchedule, now=None):
+        if not now:
+            now = time.time()
+
+        self.nextAttempt = _calculateNext(self.lastFailure,
+                                          self.firstFailure,
+                                          retrySchedule, canDrop=0, now=now)
+
+    def lastActivity(self):
+        events = [ e for e in [self.lastSuccess, self.lastFailure]
+                   if e is not None ]
+        return max(events)
+
+    def succeeded(self, now=None):
+        if not now:
+            now = time.time()
+        self.lastSuccess = now
+        self.lastFailure = None
+        self.firstFailure = None
+
+    def failed(self, now=None):
+        if not now:
+            now = time.time()
+        if not self.firstFailure:
+            self.firstFailure = now
+        self.lastFailure = now
+
+class PerAddressDeliveryQueue(DeliveryQueue):
+    """DOCDOC"""
+    def __init__(self, location, retrySchedule=None, now=None, name=None):
+        self.addressStateDB = mixminion.Filestore.WritethroughShelf(
+            os.path.join(location,"addressStatus.db"))
+        DeliveryQueue.__init__(self=self, location=location,
+                               retrySchedule=retrySchedule, now=now, name=name)
+
+    def sync(self):
+        self._lock.acquire()
+        try:
+            self.addressStateDB.sync()
+        finally:
+            self._lock.release()
+
+    def _rescan(self):
+        try:
+            self._lock.acquire()
+            DeliveryQueue._rescan(self)
+        finally:
+            self._lock.release()
+
+    def _rebuildNextAttempt(self, now=None):
+        self._lock.acquire()
+        try:
+            self.addressStates = {}
+            for k in self.addressStateDB.keys():
+                v = self.addressStateDB[k]
+                self.addressStates[k.address] = (k, v)
+            for ds in self.store._metadata_cache.values():
+                if not self.addressStates.has_key(ds.address):
+                    as = AddressState(ds.address)
+                    self.addressStateDB[str(ds.address)] = as
+                    self.addressStates[ds.address] = (str(ds.address),as)
+            if not self.retrySchedule:
+                rs = [3600]
+            else:
+                rs = self.retrySchedule
+                self.totalLifetime = reduce(operator.plus,self.retrySchedule,0)
+            for k, as in self.addressStates.values():
+                as.setNextAttempt(rs, now)
+            self._repOk()
+        finally:
+            self._lock.release()
+
+    def _repOk(self):
+        pass
+
+    def removeExpiredMessages(self, now=None):
+        """DOCDOC"""
+        assert self.retrySchedule is not None
+        self._lock.acquire()
+        try:
+            for k, as in self.addressStates.values():
+                if as.lastActivity and (
+                    as.lastActivity + self.totalLifetime < now):
+                    del self.addressStates[as.address]
+                    del self.addressStateDB[k]
+            for h, ds in self.store._metadata_cache.items():
+                if ds.queuedTime + self.totalLifetime < now:
+                    self.removeMessage(h)
+        finally:
+            self._lock.release()
+
+    def _getAddressState(self, address):
+       try:
+           k, as = self.addressStates[address]
+       except KeyError:
+           as = AddressState(address)
+           k = str(address)
+           self.addressStateDB[k] = as
+           self.addressStates[address] = as
+       return as
+
+    def sendReadyMessages(self, now=None):
+        if now is None:
+            now = time.time()
+        self._lock.acquire()
+        try:
+            messages = []
+            for h in self.store._metadata_cache.keys():
+                try:
+                    state = self.store.getMetadata(h)
+                except CorruptedFile:
+                    continue
+                if state.isPending():
+                    LOG.trace("     [%s] is pending delivery", h)
+                    continue
+                elif state.queuedTime + self.totalLifetime < now:
+                    LOG.trace("     [%s] is expired", h)
+                    self.removeMessage(h)
+                    continue
+                addressState = self._getAddressState(state.address)
+                if addressState.nextAttempt <= now:
+                    LOG.trace("     [%s] is ready for next attempt on %s", h,
+                              state.address)
+                    messages.append(PendingMessage(h,self,addr))
+                    state.setPending(now)
+                else:
+                    LOG.trace("     [%s] will wait for next attempt on %s",h,
+                              state.address)
+        finally:
+            self._lock.release()
+
+        if messages:
+            self._deliverMessages(messages)
+
+    def cleanQueue(self, secureDeleteFn=None):
+        self.sync()
+        self.store.cleanQueue(secureDeleteFn)
+
+    def sync(self):
+        self.addressStateDB.sync()
+
+    def deliverySucceeded(self, handle):
+        assert self.retrySchedule is not None
+        self._lock.acquire()
+        try:
+            LOG.trace("PerAddressDeliveryQueue got successful delivery for %s from %s",
+                      handle, self.qname)
+            try:
+                mState = self.getMetadata(handle)
+            except CorruptedFile:
+                mState = None
+            if mState:
+                aState = self._getAddressState(mState.address)
+                changed = aState.succeeded()
+                aState.setNextAttempt(self.retrySchedule)
+                self.addressStateDB[str(mState.address)] = aState
+
+            self.removeMessage(handle)
+        finally:
+            self._lock.release()
+
+    def deliveryFailed(self, handle, retriable=0, now=None):
+        assert self.retrySchedule is not None
+        if now is None:
+            now = time.time()
+        self._lock.acquire()
+        try:
+            try:
+                mState = self.getMetadata(handle)
+            except KeyError:
+                mState = None
+            except CorruptedFile:
+                mState = None
+
+            if mState is None:
+                # This should never happen
+                LOG.error_exc(sys.exc_info(),
+                              "Handle %s had no state; removing", handle)
+                self.removeMessage(handle)
+                return
+            elif not mState.isPending():
+                LOG.error("Handle %s was not pending", handle)
+                return
+
+            last = mState.pending
+            mState.setNonPending()
+            if not retriable:
+                LOG.trace("     (Giving up on %s)", handle)
+                self.removeMessage(handle)
+                return
+
+            aState = self._getAddressState(mState.address)
+            aState.failed(now=now)
+            aState.setNextAttempt(self.retrySchedule,now=now)
+            self.addressStateDB[str(aState.address)] = aState
+        finally:
+            self._lock.release()
+
 class TimedMixPool(mixminion.Filestore.ObjectStore):
     """A TimedMixPool holds a group of files, and returns some of them
        as requested, according to a mixing algorithm that sends a batch