[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Make retry logic check when the last attempt to *this address* failed, not when this message failed
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv22094/lib/mixminion/server
Modified Files:
ServerMain.py ServerQueue.py
Log Message:
Make retry logic check when the last attempt to *this address* failed, not when this message failed. This has subtle anonymity implications; the new way is better.
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.126
retrieving revision 1.127
diff -u -d -r1.126 -r1.127
--- ServerMain.py 14 May 2004 23:41:17 -0000 1.126
+++ ServerMain.py 17 May 2004 05:19:09 -0000 1.127
@@ -307,7 +307,7 @@
mix."""
return now + self.queue.getInterval()
-class OutgoingQueue(mixminion.server.ServerQueue.DeliveryQueue):
+class OutgoingQueue(mixminion.server.ServerQueue.PerAddressDeliveryQueue):
"""DeliveryQueue to send packets via outgoing MMTP connections. All
methods on this class are called from the main thread. The underlying
objects in this queue are instances of RelayedPacket.
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.40
retrieving revision 1.41
diff -u -d -r1.40 -r1.41
--- ServerQueue.py 14 May 2004 23:41:17 -0000 1.40
+++ ServerQueue.py 17 May 2004 05:19:09 -0000 1.41
@@ -23,7 +23,7 @@
from mixminion.Filestore import CorruptedFile
__all__ = [ 'DeliveryQueue', 'TimedMixPool', 'CottrellMixPool',
- 'BinomialCottrellMixPool' ]
+ 'BinomialCottrellMixPool', 'PerAddressDeliveryQueue' ]
def _calculateNext(lastAttempt, firstAttempt, retrySchedule, canDrop, now):
"""DOCDOC"""
@@ -50,8 +50,11 @@
if canDrop:
return None
else:
- attempt += (ceilDiv(lastAttempt-attempt+60,self.retrySchedule[-1]) *
- self.retrySchedule[-1])
+ if not retrySchedule or retrySchedule[-1]<5:
+ #DOCDOC
+ retrySchedule = [3600]
+ attempt += (ceilDiv(lastAttempt-attempt+60,retrySchedule[-1]) *
+ retrySchedule[-1])
return attempt
class _DeliveryState:
@@ -166,10 +169,10 @@
def getHandle(self):
return self.handle
- def succeeded(self):
+ def succeeded(self,now=None):
"""Mark this message as having been successfully deleted, removing
it from the queue."""
- self.queue.deliverySucceeded(self.handle)
+ self.queue.deliverySucceeded(self.handle,now=now)
self.queue = self.message = None
def failed(self, retriable=0, now=None):
@@ -200,7 +203,8 @@
as appropriate.
Users of this class will probably only want to call the
- queueMessage, sendReadyMessages, and nextMessageReadyAt methods.
+ queueDeliveryMessage, sendReadyMessages, and nextMessageReadyAt
+ methods.
This class caches information about the directory state; it won't
play nice if multiple instances are looking at the same directory.
@@ -233,7 +237,7 @@
self.setRetrySchedule(retrySchedule, now)
else:
self.setRetrySchedule([0], now)
- self._repOk()
+ self._repOK()
def setRetrySchedule(self, schedule, now=None):
"""Set the retry schedule for this queue. A retry schedule is
@@ -267,7 +271,7 @@
self._lock.acquire()
self.store.loadAllMetadata(lambda h: _DeliveryState())
self._rebuildNextAttempt(now)
- self._repOk()
+ self._repOK()
finally:
self._lock.release()
@@ -292,9 +296,9 @@
for ds in self.store._metadata_cache.values():
ds.setNextAttempt(rs, now)
- self._repOk()
+ self._repOK()
- def _repOk(self):
+ def _repOK(self):
"""Raise an assertion error if the internal state of this object is
nonsensical."""
# XXXX Later in the release cycle, we should call this *even* less.
@@ -330,7 +334,7 @@
def _inspect(self,handle):
"""Returns a (msg, inserted, lastAttempt, nextAttempt) tuple
for a given message handle. For testing. """
- self._repOk()
+ self._repOK()
o = self.store.getObject(handle)
ds = self.store.getMetadata(handle)
return (o, ds.queuedTime, ds.lastAttempt, ds.nextAttempt)
@@ -354,7 +358,7 @@
"""Sends all messages which are not already being sent, and which
are scheduled to be sent."""
assert self.retrySchedule is not None
- self._repOk()
+ self._repOK()
if now is None:
now = time.time()
LOG.trace("DeliveryQueue checking for deliverable messages in %s",
@@ -388,7 +392,7 @@
if messages:
self._deliverMessages(messages)
- self._repOk()
+ self._repOK()
def _deliverMessages(self, msgList):
"""Abstract method; Invoked with a list of PendingMessage objects
@@ -419,7 +423,7 @@
finally:
self._lock.release()
- def deliverySucceeded(self, handle):
+ def deliverySucceeded(self, handle, now=None):
"""Removes a message from the outgoing queue. This method
should be invoked after the corresponding message has been
successfully delivered.
@@ -489,11 +493,11 @@
finally:
self._lock.release()
-class AddressState:
- """DOCDOC"""
+class _AddressState:
+ """DOCDOsC"""
def __init__(self, address):
self.address = address
- self.lastSuccess = self.lastFailure = self.firstFailure
+ self.lastSuccess = self.lastFailure = self.firstFailure = None
def __getstate__(self):
return ("ADDR-V1", self.address, self.lastSuccess,
@@ -517,10 +521,13 @@
self.firstFailure,
retrySchedule, canDrop=0, now=now)
- def lastActivity(self):
+ def getLastActivity(self):
events = [ e for e in [self.lastSuccess, self.lastFailure]
if e is not None ]
- return max(events)
+ if events:
+ return max(events)
+ else:
+ return None
def succeeded(self, now=None):
if not now:
@@ -529,19 +536,22 @@
self.lastFailure = None
self.firstFailure = None
- def failed(self, now=None):
+ def failed(self, attempt, now=None):
if not now:
now = time.time()
if not self.firstFailure:
- self.firstFailure = now
- self.lastFailure = now
+ self.firstFailure = attempt
+ self.lastFailure = attempt
class PerAddressDeliveryQueue(DeliveryQueue):
"""DOCDOC"""
def __init__(self, location, retrySchedule=None, now=None, name=None):
- self.addressStateDB = mixminion.Filestore.WritethroughShelf(
- os.path.join(location,"addressStatus.db"))
- DeliveryQueue.__init__(self=self, location=location,
+ self.addressStateDB = mixminion.Filestore.WritethroughDict(
+ filename=os.path.join(location,"addressStatus.db"),
+ purpose="address state")
+ if retrySchedule is None:
+ retrySchedule = [3600]
+ DeliveryQueue.__init__(self, location=location,
retrySchedule=retrySchedule, now=now, name=name)
def sync(self):
@@ -561,27 +571,24 @@
def _rebuildNextAttempt(self, now=None):
self._lock.acquire()
try:
- self.addressStates = {}
- for k in self.addressStateDB.keys():
- v = self.addressStateDB[k]
- self.addressStates[k.address] = (k, v)
for ds in self.store._metadata_cache.values():
- if not self.addressStates.has_key(ds.address):
- as = AddressState(ds.address)
+ if not self.addressStateDB.has_key(str(ds.address)):
+ as = _AddressState(ds.address)
self.addressStateDB[str(ds.address)] = as
- self.addressStates[ds.address] = (str(ds.address),as)
if not self.retrySchedule:
rs = [3600]
+ self.totalLifetime = 3600
else:
rs = self.retrySchedule
- self.totalLifetime = reduce(operator.plus,self.retrySchedule,0)
- for k, as in self.addressStates.values():
+ self.totalLifetime = reduce(operator.add,self.retrySchedule,0)
+ for as in self.addressStateDB.values():
as.setNextAttempt(rs, now)
- self._repOk()
+ self._repOK()
finally:
self._lock.release()
- def _repOk(self):
+ def _repOK(self):
+ #XXXX008
pass
def removeExpiredMessages(self, now=None):
@@ -589,26 +596,34 @@
assert self.retrySchedule is not None
self._lock.acquire()
try:
- for k, as in self.addressStates.values():
- if as.lastActivity and (
- as.lastActivity + self.totalLifetime < now):
- del self.addressStates[as.address]
- del self.addressStateDB[k]
+ have = {}
for h, ds in self.store._metadata_cache.items():
if ds.queuedTime + self.totalLifetime < now:
self.removeMessage(h)
+ else:
+ have[ds.address]=1
+
+ for k, as in self.addressStateDB.items():
+ if have.has_key(as.address):
+ continue
+ lastActivity = as.getLastActivity()
+ if lastActivity and (
+ lastActivity + self.totalLifetime < now):
+ del self.addressStateDB[k]
finally:
self._lock.release()
- def _getAddressState(self, address):
- try:
- k, as = self.addressStates[address]
- except KeyError:
- as = AddressState(address)
- k = str(address)
- self.addressStateDB[k] = as
- self.addressStates[address] = as
- return as
+ def _getAddressState(self, address, now=None):
+ try:
+ as = self.addressStateDB[str(address)]
+ except KeyError:
+ as = self.addressStateDB[str(address)] = _AddressState(address)
+ as.setNextAttempt(self.retrySchedule, now)
+ return as
+
+ def queueDeliveryMessage(self, msg, address, now=None):
+ self._getAddressState(address, now=now)
+ return DeliveryQueue.queueDeliveryMessage(self,msg,address,now)
def sendReadyMessages(self, now=None):
if now is None:
@@ -628,11 +643,11 @@
LOG.trace(" [%s] is expired", h)
self.removeMessage(h)
continue
- addressState = self._getAddressState(state.address)
+ addressState = self._getAddressState(state.address, now)
if addressState.nextAttempt <= now:
LOG.trace(" [%s] is ready for next attempt on %s", h,
state.address)
- messages.append(PendingMessage(h,self,addr))
+ messages.append(PendingMessage(h,self,state.address))
state.setPending(now)
else:
LOG.trace(" [%s] will wait for next attempt on %s",h,
@@ -650,20 +665,23 @@
def sync(self):
self.addressStateDB.sync()
- def deliverySucceeded(self, handle):
+ def close(self):
+ self.addressStateDB.close()
+
+ def deliverySucceeded(self, handle, now=None):
assert self.retrySchedule is not None
self._lock.acquire()
try:
LOG.trace("PerAddressDeliveryQueue got successful delivery for %s from %s",
handle, self.qname)
try:
- mState = self.getMetadata(handle)
+ mState = self.store.getMetadata(handle)
except CorruptedFile:
mState = None
if mState:
- aState = self._getAddressState(mState.address)
- changed = aState.succeeded()
- aState.setNextAttempt(self.retrySchedule)
+ aState = self._getAddressState(mState.address, now)
+ changed = aState.succeeded(now=now)
+ aState.setNextAttempt(self.retrySchedule, now)
self.addressStateDB[str(mState.address)] = aState
self.removeMessage(handle)
@@ -677,7 +695,7 @@
self._lock.acquire()
try:
try:
- mState = self.getMetadata(handle)
+ mState = self.store.getMetadata(handle)
except KeyError:
mState = None
except CorruptedFile:
@@ -698,15 +716,39 @@
if not retriable:
LOG.trace(" (Giving up on %s)", handle)
self.removeMessage(handle)
- return
- aState = self._getAddressState(mState.address)
- aState.failed(now=now)
+ aState = self._getAddressState(mState.address, now)
+ aState.failed(attempt=last,now=now)
aState.setNextAttempt(self.retrySchedule,now=now)
- self.addressStateDB[str(aState.address)] = aState
+ self.addressStateDB[str(aState.address)] = aState # flush to db.
finally:
self._lock.release()
+ def _inspect(self,handle):
+ """Returns a (msg, state, addressState) tuple for a given
+ message handle. For testing."""
+ self._repOK()
+ o = self.store.getObject(handle)
+ ds = self.store.getMetadata(handle)
+ as = self._getAddressState(ds.address)
+ return (o, ds, as)
+
+ def _repOK(self):
+ """Raise an assertion error if the internal state of this object is
+ nonsensical."""
+ # XXXX Later in the release cycle, we should call this *even* less.
+ # XXXX It adds ~8-9ms on my laptop for ~400 messages
+ self._lock.acquire()
+ try:
+ DeliveryQueue._repOK(self)
+ for h in self.store._metadata_cache.keys():
+ ds = self.store._metadata_cache[h]
+ as = self._getAddressState(ds.address)
+ assert as.address == ds.address
+ finally:
+ self._lock.release()
+
+
class TimedMixPool(mixminion.Filestore.ObjectStore):
"""A TimedMixPool holds a group of files, and returns some of them
as requested, according to a mixing algorithm that sends a batch