[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] Configurable, smart retry logic; lots of refactoring; more tests.
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv3319/lib/mixminion/server
Modified Files:
Modules.py ServerConfig.py ServerMain.py ServerQueue.py
Log Message:
Configurable, smart retry logic; lots of refactoring; more tests.
mixminiond.conf, ServerQueue, Modules, ServerMain, test:
- Add more sophisticated, more configurable retry logic
Modules, ServerQueue, ServerMain, test:
- Refactor queueDeliveryMessage to not have a bogus 'addr' argument.
ServerMain:
- Change OutgoingQueue to contain instances of RelayedPacket, not 2-tuples of
(IPv4Info, Packet).
ServerQueue:
- Remove dead code
test:
- Test payloads for drop messages.
- Test retry logic on server queues.
- Tests for DeliveryPacket methods
- Tests for connection padding and key renegotiation
- Use new queueDeliveryMessage interface
- Fix bug that generated server descs of the form "5 days days"
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.24
retrieving revision 1.25
diff -u -d -r1.24 -r1.25
--- Modules.py 10 Jan 2003 20:12:05 -0000 1.24
+++ Modules.py 13 Jan 2003 06:35:52 -0000 1.25
@@ -30,7 +30,9 @@
import mixminion.Config
import mixminion.Packet
import mixminion.server.ServerQueue
-from mixminion.Config import ConfigError, _parseBoolean, _parseCommand
+import mixminion.server.ServerConfig
+from mixminion.Config import ConfigError, _parseBoolean, _parseCommand, \
+ _parseIntervalList
from mixminion.Common import LOG, createPrivateDir, MixError, isSMTPMailbox, \
isPrintingAscii
from mixminion.BuildMessage import CompressedDataTooLong
@@ -59,6 +61,10 @@
"Zero-argument constructor, as required by Module protocol."
pass
+ def getRetrySchedule(self):
+ "DOCDOC"
+ return None
+
def getConfigSyntax(self):
"""Return a map from section names to section syntax, as described
in Config.py"""
@@ -102,7 +108,8 @@
Otherwise, the message is either a reply or an encrypted
forward message.
"""
- return SimpleModuleDeliveryQueue(self, queueDir)
+ return SimpleModuleDeliveryQueue(self, queueDir,
+ retrySchedule=self.getRetrySchedule())
def processMessage(self, packet):
"""Given a DeliveryPacket object, try to delier it. Return one of:
@@ -124,10 +131,9 @@
def __init__(self, module):
self.module = module
- def queueDeliveryMessage(self, addr, packet, retry=0):
+ def queueDeliveryMessage(self, packet, retry=0, lastAttempt=0):
"""Instead of queueing our message, pass it directly to the underlying
DeliveryModule."""
- assert addr is None
try:
res = self.module.processMessage(packet)
if res == DELIVER_OK:
@@ -153,13 +159,13 @@
don't care about batching messages to like addresses."""
## Fields:
# module: the underlying module.
- def __init__(self, module, directory):
- mixminion.server.ServerQueue.DeliveryQueue.__init__(self, directory)
+ def __init__(self, module, directory, retrySchedule=None):
+ mixminion.server.ServerQueue.DeliveryQueue.__init__(self, directory,
+ retrySchedule)
self.module = module
def _deliverMessages(self, msgList):
- for handle, addr, packet, n_retries in msgList:
- assert addr is None
+ for handle, packet, n_retries in msgList:
try:
result = self.module.processMessage(packet)
if result == DELIVER_OK:
@@ -174,11 +180,6 @@
"Exception delivering message")
self.deliveryFailed(handle, 0)
- def queueDeliveryMessage(self, addr, packet, retry=0):
- assert addr is None
- mixminion.server.ServerQueue.DeliveryQueue.queueDeliveryMessage(
- self, None, packet, retry)
-
class DeliveryThread(threading.Thread):
def __init__(self, moduleManager):
threading.Thread.__init__(self)
@@ -375,9 +376,6 @@
if self.enabled.has_key(module.getName()):
del self.enabled[module.getName()]
- def queueMessage2(self, packet):
- self.queueDecodedMessage(packet)
-
def queueDecodedMessage(self, packet):
#DOCDOC
exitType = packet.getExitType()
@@ -391,7 +389,7 @@
LOG.debug("Delivering message %r (type %04x) via module %s",
packet.getContents()[:8], exitType, mod.getName())
- queue.queueDeliveryMessage(None, packet)
+ queue.queueDeliveryMessage(packet)
#DOCDOC
def shutdown(self):
@@ -422,6 +420,8 @@
"""Null-object pattern: drops all messages it receives."""
def getConfigSyntax(self):
return { }
+ def getRetrySchedule(self):
+ return [ ]
def getServerInfoBlock(self):
return ""
def configure(self, config, manager):
@@ -590,11 +590,16 @@
DeliveryModule.__init__(self)
self.addresses = {}
+ def getRetrySchedule(self):
+ return self.retrySchedule #DOCDOC
+
def getConfigSyntax(self):
# FFFF There should be some way to say that fields are required
# FFFF if the module is enabled.
return { "Delivery/MBOX" :
{ 'Enabled' : ('REQUIRE', _parseBoolean, "no"),
+ 'Retry': ('ALLOW', _parseIntervalList,
+ "7 hours for 6 days"),
'AddressFile' : ('ALLOW', None, None),
'ReturnAddress' : ('ALLOW', None, None),
'RemoveContact' : ('ALLOW', None, None),
@@ -617,6 +622,10 @@
LOG.warn("Value of %s (%s) doesn't look like an email address",
field, sec[field])
+ mixInterval = sections['Server']['MixInterval'][2]
+ mixminion.server.ServerConfig._validateRetrySchedule(
+ mixInterval, entries, "Delivery/MBOX")
+
def configure(self, config, moduleManager):
if not config['Delivery/MBOX'].get("Enabled", 0):
moduleManager.disableModule(self)
@@ -627,6 +636,7 @@
self.addressFile = sec['AddressFile']
self.returnAddress = sec['ReturnAddress']
self.contact = sec['RemoveContact']
+ self.retrySchedule = sec['Retry']
# validate should have caught these.
assert (self.server and self.addressFile and self.returnAddress
and self.contact)
@@ -735,9 +745,14 @@
def __init__(self):
SMTPModule.__init__(self)
+ def getRetrySchedule(self):
+ return self.retrySchedule #DOCDOC
+
def getConfigSyntax(self):
return { "Delivery/SMTP" :
{ 'Enabled' : ('REQUIRE', _parseBoolean, "no"),
+ 'Retry': ('ALLOW', _parseIntervalList,
+ "7 hours for 6 days"),
'BlacklistFile' : ('ALLOW', None, None),
'SMTPServer' : ('ALLOW', None, 'localhost'),
'Message' : ('ALLOW', None, ""),
@@ -762,6 +777,10 @@
LOG.warn("Return address (%s) doesn't look like an email address",
sec['ReturnAddress'])
+ mixInterval = sections['Server']['MixInterval'][2]
+ mixminion.server.ServerConfig._validateRetrySchedule(
+ mixInterval, entries, "Delivery/SMTP")
+
def configure(self, config, manager):
sec = config['Delivery/SMTP']
if not sec.get('Enabled'):
@@ -769,6 +788,7 @@
return
self.server = sec['SMTPServer']
+ self.retrySchedule = sec['Retry']
if sec['BlacklistFile']:
self.blacklist = EmailAddressSet(fname=sec['BlacklistFile'])
else:
@@ -824,9 +844,14 @@
def __init__(self):
SMTPModule.__init__(self)
+ def getRetrySchedule(self):
+ return self.retrySchedule #DOCDOC
+
def getConfigSyntax(self):
return { "Delivery/SMTP-Via-Mixmaster" :
{ 'Enabled' : ('REQUIRE', _parseBoolean, "no"),
+ 'Retry': ('ALLOW', _parseIntervalList,
+ "7 hours for 6 days"),
'MixCommand' : ('REQUIRE', _parseCommand, None),
'Server' : ('REQUIRE', None, None),
'SubjectLine' : ('ALLOW', None,
@@ -835,9 +860,14 @@
}
def validateConfig(self, sections, entries, lines, contents):
- # Currently, we accept any configuration options that the config allows
- pass
-
+ #XXXX write more
+ sec = sections['Delivery/SMTP-Via-Mixmaster']
+ if not sec.get("Enabled"):
+ return
+ mixInterval = sections['Server']['MixInterval'][2]
+ mixminion.server.ServerConfig._validateRetrySchedule(
+ mixInterval, entries, "Delivery/SMTP-Via-Mixmaster")
+
def configure(self, config, manager):
sec = config['Delivery/SMTP-Via-Mixmaster']
if not sec.get("Enabled", 0):
@@ -846,6 +876,7 @@
cmd = sec['MixCommand']
self.server = sec['Server']
self.subject = sec['SubjectLine']
+ self.retrySchedule = sec['Retry']
self.command = cmd[0]
self.options = tuple(cmd[1]) + ("-l", self.server,
"-s", self.subject)
Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -d -r1.14 -r1.15
--- ServerConfig.py 6 Jan 2003 12:57:48 -0000 1.14
+++ ServerConfig.py 13 Jan 2003 06:35:52 -0000 1.15
@@ -63,7 +63,8 @@
if _haveEntry(entries, 'Server', 'Mode'):
LOG.warn("Mode specification is not yet supported.")
- if server['MixInterval'][2] < 30*60:
+ mixInterval = server['MixInterval'][2]
+ if mixInterval < 30*60:
LOG.warn("Dangerously low MixInterval")
if server['MixAlgorithm'] == 'TimedMixQueue':
if _haveEntry(entries, 'Server', 'MixPoolRate'):
@@ -88,6 +89,8 @@
if [e for e in entries['Outgoing/MMTP'] if e[0] in ('Allow', 'Deny')]:
LOG.warn("Allow/deny are not yet supported")
+ _validateRetrySchedule(mixInterval, entries, "Outgoing/MMTP")
+
self.moduleManager.validate(sections, entries, lines, contents)
def __loadModules(self, section, sectionEntries):
@@ -105,6 +108,41 @@
"Return the module manager initialized by this server."
return self.moduleManager
+def _validateRetrySchedule(mixinterval, entries, sectionname,
+ entryname='Retry'):
+ #XXXX writeme.
+ entry = [e for e in entries.get(sectionname,[]) if e[0] == entryname]
+ if not entry:
+ return
+ assert len(entry) == 1
+ sched = entry[0][1]
+ total = reduce(operator.add, sched, 0)
+
+ # Warn if we try for less than a day.
+ if total < 24*60*60:
+ LOG.warn("Dangerously low retry timeout for %s (<1 day)", sectionname)
+
+ # Warn if we try for more than two weeks.
+ if total > 2*7*24*60*60:
+ LOG.warn("Very high retry timeout for %s (>14 days)", sectionname)
+
+ # Warn if any of our intervals are less than the mix interval...
+ if min(sched) < mixInterval-2:
+ LOG.warn("Rounding retry intervals for %s to the nearest mix interval",
+ sectionname)
+
+ # ... or less than 5 minutes.
+ elif min(sched) < 5*60:
+ LOG.warn("Very fast retry intervals for %s (< 5 minutes)", sectionname)
+
+ # Warn if we make fewer than 5 attempts.
+ if len(sched) < 5:
+ LOG.warn("Dangerously low number of retries for %s (<5)", sectionname)
+
+ # Warn if we make more than 50 attempts.
+ if len(sched) > 50:
+ LOG.warn("Very high number of retries for %s (>50)", sectionname)
+
#======================================================================
_MIX_RULE_NAMES = {
@@ -192,6 +230,8 @@
'Deny' : ('ALLOW*', C._parseAddressSet_deny, None)
},
'Outgoing/MMTP' : { 'Enabled' : ('REQUIRE', C._parseBoolean, "no"),
+ 'Retry' : ('ALLOW', C._parseIntervalList,
+ ".5 hour for 1 day, 7 hours for 5 days"),
'Allow' : ('ALLOW*', C._parseAddressSet_allow, None),
'Deny' : ('ALLOW*', C._parseAddressSet_deny, None) },
# FFFF Missing: Queue-Size / Queue config options
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.31
retrieving revision 1.32
diff -u -d -r1.31 -r1.32
--- ServerMain.py 10 Jan 2003 20:12:05 -0000 1.31
+++ ServerMain.py 13 Jan 2003 06:35:52 -0000 1.32
@@ -180,11 +180,9 @@
formatBase64(packet.getContents()[:8]))
self.moduleManager.queueDecodedMessage(packet)
else:
- ipv4 = packet.getAddress()
- msg = packet.getPacket()
LOG.debug(" (sending message %s to MMTP server)",
- formatBase64(msg[:8]))
- self.outgoingQueue.queueDeliveryMessage(ipv4, msg)
+ formatBase64(packet.getPacket()[:8]))
+ self.outgoingQueue.queueDeliveryMessage(packet)
self.queue.removeMessage(h)
def getNextMixTime(self, now):
@@ -194,8 +192,8 @@
class OutgoingQueue(mixminion.server.ServerQueue.DeliveryQueue):
"""DeliveryQueue to send messages via outgoing MMTP connections. All
- methods on this class are called from the main thread. The addresses
- in this queue are pickled IPV4Info objects.
+ methods on this class are called from the main thread. The underlying
+ objects in this queue are instances of RelayedPacket.
All methods in this class are run from the main thread.
"""
@@ -205,6 +203,10 @@
mixminion.server.ServerQueue.DeliveryQueue.__init__(self, location)
self.server = None
+ def configure(self, config):
+ retry = config['Outgoing/MMTP']['Retry']
+ self.setRetrySchedule(retry)
+
def connectQueues(self, server):
"""Set the MMTPServer that this OutgoingQueue informs of its
deliverable messages."""
@@ -214,7 +216,10 @@
"Implementation of abstract method from DeliveryQueue."
# Map from addr -> [ (handle, msg) ... ]
msgs = {}
- for handle, addr, message, n_retries in msgList:
+ # XXXX SKIP DEAD MESSAGES!!!!
+ for handle, packet, n_retries in msgList:
+ addr = packet.getAddress()
+ message = packet.getPacket()
msgs.setdefault(addr, []).append( (handle, message) )
for addr, messages in msgs.items():
handles, messages = zip(*messages)
@@ -408,6 +413,7 @@
outgoingDir = os.path.join(queueDir, "outgoing")
LOG.debug("Initializing outgoing queue")
self.outgoingQueue = OutgoingQueue(outgoingDir)
+ self.outgoingQueue.configure(config)
LOG.debug("Found %d pending messages in outgoing queue",
self.outgoingQueue.count())
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- ServerQueue.py 10 Jan 2003 20:12:05 -0000 1.2
+++ ServerQueue.py 13 Jan 2003 06:35:52 -0000 1.3
@@ -30,10 +30,6 @@
# trash.
INPUT_TIMEOUT = 6000
-## # If we've been cleaning for more than CLEAN_TIMEOUT seconds, assume the
-## # old clean is dead.
-## CLEAN_TIMEOUT = 120
-
class Queue:
"""A Queue is an unordered collection of files with secure insert, move,
and delete operations.
@@ -257,39 +253,6 @@
# We don't need to hold the lock here; we synchronize via the
# filesystem.
-# XXXX this logic never worked anyway; now we do all our cleaning in a separate
-# XXXX thread anyway.
-
-## now = time.time()
-## cleanFile = os.path.join(self.dir,".cleaning")
-##
-## cleaning = 1
-## while cleaning:
-## try:
-## # Try to get the .cleaning lock file. If we can create it,
-## # we're the only cleaner around.
-## fd = os.open(cleanFile, os.O_WRONLY+os.O_CREAT+os.O_EXCL, 0600)
-## os.write(fd, str(now))
-## os.close(fd)
-## cleaning = 0
-## except OSError:
-## try:
-## # If we can't create the file, see if it's too old. If it
-## # is too old, delete it and try again. If it isn't, there
-## # may be a live clean in progress.
-## s = os.stat(cleanFile)
-## if now - s[stat.ST_MTIME] > CLEAN_TIMEOUT:
-## os.unlink(cleanFile)
-## else:
-## return 1
-## except OSError:
-## # If the 'stat' or 'unlink' calls above fail, then
-## # .cleaning must not exist, or must not be readable
-## # by us.
-## if os.path.exists(cleanFile):
-## # In the latter case, bail out.
-## return 1
-
rmv = []
allowedTime = int(time.time()) - INPUT_TIMEOUT
for m in os.listdir(self.dir):
@@ -364,9 +327,16 @@
# pending -- Dict from handle->1, for all messages that we're
# currently sending.
- def __init__(self, location):
+ def __init__(self, location, retrySchedule=None):
Queue.__init__(self, location, create=1, scrub=1)
self._rescan()
+ if retrySchedule is None:
+ self.retrySchedule = None
+ else:
+ self.retrySchedule = retrySchedule[:]
+
+ def setRetrySchedule(self, schedule):#DOCDOC
+ self.retrySchedule = schedule[:]
def _rescan(self):
"""Rebuild the internal state of this queue from the underlying
@@ -381,15 +351,16 @@
def queueMessage(self, msg):
if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")
- def queueDeliveryMessage(self, addr, msg, retry=0):
+ def queueDeliveryMessage(self, msg, retry=0, nextAttempt=0):
"""Schedule a message for delivery.
addr -- An object to indicate the message's destination
msg -- the message itself
- retry -- how many times so far have we tried to send?"""
-
+ retry -- how many times so far have we tried to send?
+ DOCDOC"""
try:
self._lock.acquire()
- handle = self.queueObject( (retry, addr, msg) )
+ #DOCDOC nextAttempt
+ handle = self.queueObject( (retry, None, msg, nextAttempt) )
self.sendable.append(handle)
finally:
self._lock.release()
@@ -397,22 +368,29 @@
return handle
def get(self,handle):
- """Returns a (n_retries, addr, msg) payload for a given
+ """Returns a (n_retries, addr, msg, nextAttempt?) payload for a given
message handle."""
- return self.getObject(handle)
+ o = self.getObject(handle)
+ if len(o) == 3:# XXXX For legacy queues; delete after 0.0.3
+ o = o + (0,)
+ return o[0], o[2], o[3]
- def sendReadyMessages(self):
+ def sendReadyMessages(self, now=None):
"""Sends all messages which are not already being sent."""
-
+ if now is None:
+ now = time.time()
try:
self._lock.acquire()
handles = self.sendable
messages = []
self.sendable = []
for h in handles:
- retries, addr, msg = self.getObject(h)
- messages.append((h, addr, msg, retries))
- self.pending[h] = 1
+ retries,msg, nextAttempt = self.get(h)
+ if nextAttempt <= now:
+ messages.append((h, msg, retries))
+ self.pending[h] = now
+ else:
+ self.sendable.append(h)
finally:
self._lock.release()
if messages:
@@ -420,7 +398,7 @@
def _deliverMessages(self, msgList):
"""Abstract method; Invoked with a list of
- (handle, addr, message, n_retries) tuples every time we have a batch
+ (handle, message, n_retries) tuples every time we have a batch
of messages to send.
For every handle in the list, deliverySucceeded or deliveryFailed
@@ -440,6 +418,7 @@
"""
try:
self._lock.acquire()
+ #XXXX003 be more robust in the presence of errors here.
self.removeMessage(handle)
del self.pending[handle]
finally:
@@ -451,16 +430,40 @@
invoked after the corresponding message has been
successfully delivered."""
try:
+ #XXXX003 be more robust in the presence of errors here.
self._lock.acquire()
+ lastAttempt = self.pending[handle]
del self.pending[handle]
if retriable:
# Queue the new one before removing the old one, for
- # crash-proofness
- retries, addr, msg = self.getObject(handle)
- # FFFF This test makes us never retry past the 10th attempt.
- # FFFF That's wrong; we should be smarter.
- if retries <= 10:
- self.queueDeliveryMessage(addr, msg, retries+1)
+ # crash-proofness. First, fetch the old information...
+ retries, msg, schedAttempt = self.get(handle)
+
+ # Multiple retry intervals may have passed between the time this
+ # attempt was scheduled (schedAttempt) and the time delivery was
+ # actually attempted (lastAttempt). Advance 'nextAttempt' through
+ # the retry schedule, incrementing 'retries' once per interval,
+ # until 'nextAttempt' falls after lastAttempt or the schedule is
+ # exhausted.
+ #DOCDOC
+ if self.retrySchedule and retries < len(self.retrySchedule):
+ nextAttempt = schedAttempt
+ if nextAttempt == 0:
+ nextAttempt = lastAttempt
+ while retries < len(self.retrySchedule):
+ nextAttempt += self.retrySchedule[retries]
+ retries += 1
+ if nextAttempt > lastAttempt:
+ break
+ if retries <= len(self.retrySchedule):
+ self.queueDeliveryMessage(msg, retries, nextAttempt)
+ elif not self.retrySchedule:
+ #LEGACY XXXX003
+ retries += 1
+ nextAttempt = 0
+ if retries < 10:
+ self.queueDeliveryMessage(msg, retries, nextAttempt)
+
self.removeMessage(handle)
finally:
self._lock.release()