[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Trivial doc patch to servermain; add new (untested) ser...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv7114/lib/mixminion/server
Modified Files:
ServerMain.py ServerQueue.py
Log Message:
Trivial doc patch to servermain; add new (untested) server queue to handle per-address retry logic.
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.125
retrieving revision 1.126
diff -u -d -r1.125 -r1.126
--- ServerMain.py 26 Apr 2004 16:55:46 -0000 1.125
+++ ServerMain.py 14 May 2004 23:41:17 -0000 1.126
@@ -1,7 +1,7 @@
# Copyright 2002-2004 Nick Mathewson. See LICENSE for licensing information.
# $Id$
-"""mixminion.ServerMain
+"""mixminion.server.ServerMain
The main loop and related functionality for a Mixminion server.
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.39
retrieving revision 1.40
diff -u -d -r1.39 -r1.40
--- ServerQueue.py 2 May 2004 18:45:16 -0000 1.39
+++ ServerQueue.py 14 May 2004 23:41:17 -0000 1.40
@@ -6,23 +6,54 @@
Facilities for retriable delivery queues, and for mix pools.
"""
+import cPickle
import os
+import operator
import time
import stat
import sys
-import cPickle
import threading
import mixminion.Filestore
from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
- createPrivateDir, readPickled, writePickled, formatTime, readFile
+ createPrivateDir, readPickled, writePickled, formatTime, readFile, \
+ ceilDiv
from mixminion.Crypto import getCommonPRNG
from mixminion.Filestore import CorruptedFile
__all__ = [ 'DeliveryQueue', 'TimedMixPool', 'CottrellMixPool',
'BinomialCottrellMixPool' ]
+def _calculateNext(lastAttempt, firstAttempt, retrySchedule, canDrop, now):
+ """DOCDOC"""
+ # If we've never tried to deliver the message, it's ready to
+ # go immediately.
+ if lastAttempt is None:
+ return now
+
+ # Otherwise, we count from the time the message was first queued,
+ # until we find a scheduled delivery that falls after the last
+ # attempted delivery.
+ #
+ # This scheduled delivery may be in the past. That's okay: it only
+ # means that we've missed a scheduled delivery, and we can try again
+ # immediately.
+ attempt = firstAttempt
+ for interval in retrySchedule:
+ attempt += interval
+ if attempt > lastAttempt:
+ return attempt
+
+ # Oops: there are no scheduled deliveries after the last delivery.
+ # Time to drop this message, or go into holding mode.
+ if canDrop:
+ return None
+ else:
+ attempt += (ceilDiv(lastAttempt-attempt+60,self.retrySchedule[-1]) *
+ self.retrySchedule[-1])
+ return attempt
+
class _DeliveryState:
"""Helper class: holds the state needed to schedule delivery or
eventual abandonment of a message in a DeliveryQueue."""
@@ -33,7 +64,7 @@
# deliver the message. (None means 'never').
# address: Pickleable object holding address information. Delivery
# code uses this field to group messages by address before loading
- # them all from disk.
+ # them all from disk. Must be usable as hash key.
# pendingAt: None (if we're not sending this message), or a time
# at which we begain sending this message.
# nextAttempt: None, or the time at which we'll next try to send
@@ -102,32 +133,11 @@
now = time.time()
self.remove = 0
- last = self.lastAttempt
-
- # If we've never tried to deliver the message, it's ready to
- # go immediately.
- if last is None:
- self.nextAttempt = now
- return
-
- # Otherwise, we count from the time the message was first queued,
- # until we find a scheduled delivery that falls after the last
- # attempted delivery.
- #
- # This scheduled delivery may be in the past. That's okay: it only
- # means that we've missed a scheduled delivery, and we can try again
- # immediately.
- attempt = self.queuedTime
- for interval in retrySchedule:
- attempt += interval
- if attempt > last:
- self.nextAttempt = attempt
- return
- # Oops: there are no scheduled deliveries after the last delivery.
- # Time to drop this message.
- self.nextAttempt = None
- self.remove = 1
+ self.nextAttempt = _calculateNext(self.lastAttempt, self.queuedTime,
+ retrySchedule, canDrop=1, now=now)
+ if self.nextAttempt is None:
+ self.remove = 1
def setLastAttempt(self, when):
"""Update time of the last attempted delivery."""
@@ -271,7 +281,7 @@
def _rebuildNextAttempt(self, now=None):
"""Helper: Reconstruct self.nextAttempt from self.retrySchedule and
- self.deliveryState.
+ self.deliveryState. DOCDOC
Callers must hold self._lock.
"""
@@ -360,10 +370,10 @@
if state.isPending():
LOG.trace(" [%s] is pending delivery", h)
continue
- elif state and state.isRemovable():
+ elif state.isRemovable():
LOG.trace(" [%s] is expired", h)
self.removeMessage(h)
- elif (not state) or state.nextAttempt <= now:
+ elif state.nextAttempt <= now:
LOG.trace(" [%s] is ready for delivery", h)
if state is None:
addr = None
@@ -479,6 +489,224 @@
finally:
self._lock.release()
+class AddressState:
+ """DOCDOC"""
+ def __init__(self, address):
+ self.address = address
+ self.lastSuccess = self.lastFailure = self.firstFailure
+
+ def __getstate__(self):
+ return ("ADDR-V1", self.address, self.lastSuccess,
+ self.lastFailure, self.firstFailure)
+
+ def __setstate__(self, state):
+ if state[0] == 'ADDR-V1':
+ _, self.address, self.lastSuccess, self.lastFailure, \
+ self.firstFailure = state
+ else:
+ #XXXX008 This is way too extreme.
+ raise MixFatalError("Unrecognized delivery state")
+
+ self.nextAttempt = None
+
+ def setNextAttempt(self, retrySchedule, now=None):
+ if not now:
+ now = time.time()
+
+ self.nextAttempt = _calculateNext(self.lastFailure,
+ self.firstFailure,
+ retrySchedule, canDrop=0, now=now)
+
+ def lastActivity(self):
+ events = [ e for e in [self.lastSuccess, self.lastFailure]
+ if e is not None ]
+ return max(events)
+
+ def succeeded(self, now=None):
+ if not now:
+ now = time.time()
+ self.lastSuccess = now
+ self.lastFailure = None
+ self.firstFailure = None
+
+ def failed(self, now=None):
+ if not now:
+ now = time.time()
+ if not self.firstFailure:
+ self.firstFailure = now
+ self.lastFailure = now
+
+class PerAddressDeliveryQueue(DeliveryQueue):
+ """DOCDOC"""
+ def __init__(self, location, retrySchedule=None, now=None, name=None):
+ self.addressStateDB = mixminion.Filestore.WritethroughShelf(
+ os.path.join(location,"addressStatus.db"))
+ DeliveryQueue.__init__(self=self, location=location,
+ retrySchedule=retrySchedule, now=now, name=name)
+
+ def sync(self):
+ self._lock.acquire()
+ try:
+ self.addressStateDB.sync()
+ finally:
+ self._lock.release()
+
+ def _rescan(self):
+ try:
+ self._lock.acquire()
+ DeliveryQueue._rescan(self)
+ finally:
+ self._lock.release()
+
+ def _rebuildNextAttempt(self, now=None):
+ self._lock.acquire()
+ try:
+ self.addressStates = {}
+ for k in self.addressStateDB.keys():
+ v = self.addressStateDB[k]
+ self.addressStates[k.address] = (k, v)
+ for ds in self.store._metadata_cache.values():
+ if not self.addressStates.has_key(ds.address):
+ as = AddressState(ds.address)
+ self.addressStateDB[str(ds.address)] = as
+ self.addressStates[ds.address] = (str(ds.address),as)
+ if not self.retrySchedule:
+ rs = [3600]
+ else:
+ rs = self.retrySchedule
+ self.totalLifetime = reduce(operator.plus,self.retrySchedule,0)
+ for k, as in self.addressStates.values():
+ as.setNextAttempt(rs, now)
+ self._repOk()
+ finally:
+ self._lock.release()
+
+ def _repOk(self):
+ pass
+
+ def removeExpiredMessages(self, now=None):
+ """DOCDOC"""
+ assert self.retrySchedule is not None
+ self._lock.acquire()
+ try:
+ for k, as in self.addressStates.values():
+ if as.lastActivity and (
+ as.lastActivity + self.totalLifetime < now):
+ del self.addressStates[as.address]
+ del self.addressStateDB[k]
+ for h, ds in self.store._metadata_cache.items():
+ if ds.queuedTime + self.totalLifetime < now:
+ self.removeMessage(h)
+ finally:
+ self._lock.release()
+
+ def _getAddressState(self, address):
+ try:
+ k, as = self.addressStates[address]
+ except KeyError:
+ as = AddressState(address)
+ k = str(address)
+ self.addressStateDB[k] = as
+ self.addressStates[address] = as
+ return as
+
+ def sendReadyMessages(self, now=None):
+ if now is None:
+ now = time.time()
+ self._lock.acquire()
+ try:
+ messages = []
+ for h in self.store._metadata_cache.keys():
+ try:
+ state = self.store.getMetadata(h)
+ except CorruptedFile:
+ continue
+ if state.isPending():
+ LOG.trace(" [%s] is pending delivery", h)
+ continue
+ elif state.queuedTime + self.totalLifetime < now:
+ LOG.trace(" [%s] is expired", h)
+ self.removeMessage(h)
+ continue
+ addressState = self._getAddressState(state.address)
+ if addressState.nextAttempt <= now:
+ LOG.trace(" [%s] is ready for next attempt on %s", h,
+ state.address)
+ messages.append(PendingMessage(h,self,addr))
+ state.setPending(now)
+ else:
+ LOG.trace(" [%s] will wait for next attempt on %s",h,
+ state.address)
+ finally:
+ self._lock.release()
+
+ if messages:
+ self._deliverMessages(messages)
+
+ def cleanQueue(self, secureDeleteFn=None):
+ self.sync()
+ self.store.cleanQueue(secureDeleteFn)
+
+ def sync(self):
+ self.addressStateDB.sync()
+
+ def deliverySucceeded(self, handle):
+ assert self.retrySchedule is not None
+ self._lock.acquire()
+ try:
+ LOG.trace("PerAddressDeliveryQueue got successful delivery for %s from %s",
+ handle, self.qname)
+ try:
+ mState = self.getMetadata(handle)
+ except CorruptedFile:
+ mState = None
+ if mState:
+ aState = self._getAddressState(mState.address)
+ changed = aState.succeeded()
+ aState.setNextAttempt(self.retrySchedule)
+ self.addressStateDB[str(mState.address)] = aState
+
+ self.removeMessage(handle)
+ finally:
+ self._lock.release()
+
+ def deliveryFailed(self, handle, retriable=0, now=None):
+ assert self.retrySchedule is not None
+ if now is None:
+ now = time.time()
+ self._lock.acquire()
+ try:
+ try:
+ mState = self.getMetadata(handle)
+ except KeyError:
+ mState = None
+ except CorruptedFile:
+ mState = None
+
+ if mState is None:
+ # This should never happen
+ LOG.error_exc(sys.exc_info(),
+ "Handle %s had no state; removing", handle)
+ self.removeMessage(handle)
+ return
+ elif not mState.isPending():
+ LOG.error("Handle %s was not pending", handle)
+ return
+
+ last = mState.pending
+ mState.setNonPending()
+ if not retriable:
+ LOG.trace(" (Giving up on %s)", handle)
+ self.removeMessage(handle)
+ return
+
+ aState = self._getAddressState(mState.address)
+ aState.failed(now=now)
+ aState.setNextAttempt(self.retrySchedule,now=now)
+ self.addressStateDB[str(aState.address)] = aState
+ finally:
+ self._lock.release()
+
class TimedMixPool(mixminion.Filestore.ObjectStore):
"""A TimedMixPool holds a group of files, and returns some of them
as requested, according to a mixing algorithm that sends a batch