[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] Configurable, smart retry logic; lots of refactoring; m...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv3319/lib/mixminion/server

Modified Files:
	Modules.py ServerConfig.py ServerMain.py ServerQueue.py 
Log Message:
Configurable, smart retry logic; lots of refactoring; more tests.

mixminiond.conf, ServerQueue, Modules, ServerMain, test:
- Add more sophisticated, more configurable retry logic

Modules, ServerQueue, ServerMain, test:
- Refactor queueDeliveryMessage to not have a bogus 'addr' argument.

ServerMain:
- Change OutgoingQueue to contain instances of RelayedPacket, not 2-tuples of
  (IPv4Info, Packet).

ServerQueue:
- Remove dead code

test:
- Test payloads for drop messages.
- Test retry logic on server queues.
- Tests for DeliveryPacket methods
- Tests for connection padding and key renegotiation
- Use new queueDeliveryMessage interface
- Fix bug that generated server descs of the form "5 days days"



Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.24
retrieving revision 1.25
diff -u -d -r1.24 -r1.25
--- Modules.py	10 Jan 2003 20:12:05 -0000	1.24
+++ Modules.py	13 Jan 2003 06:35:52 -0000	1.25
@@ -30,7 +30,9 @@
 import mixminion.Config
 import mixminion.Packet
 import mixminion.server.ServerQueue
-from mixminion.Config import ConfigError, _parseBoolean, _parseCommand
+import mixminion.server.ServerConfig
+from mixminion.Config import ConfigError, _parseBoolean, _parseCommand, \
+     _parseIntervalList
 from mixminion.Common import LOG, createPrivateDir, MixError, isSMTPMailbox, \
      isPrintingAscii
 from mixminion.BuildMessage import CompressedDataTooLong
@@ -59,6 +61,10 @@
         "Zero-argument constructor, as required by Module protocol."
         pass
 
+    def getRetrySchedule(self):
+        "Return the retry schedule for this module's queue; None by default."
+        return None
+
     def getConfigSyntax(self):
         """Return a map from section names to section syntax, as described
            in Config.py"""
@@ -102,7 +108,8 @@
            Otherwise, the message is either a reply or an encrypted
            forward message.
            """
-        return SimpleModuleDeliveryQueue(self, queueDir)
+        return SimpleModuleDeliveryQueue(self, queueDir,
+                                   retrySchedule=self.getRetrySchedule())
 
     def processMessage(self, packet):
         """Given a DeliveryPacket object, try to delier it.  Return one of:
@@ -124,10 +131,9 @@
     def __init__(self, module):
         self.module = module
 
-    def queueDeliveryMessage(self, addr, packet, retry=0):
+    def queueDeliveryMessage(self, packet, retry=0, lastAttempt=0):
         """Instead of queueing our message, pass it directly to the underlying
            DeliveryModule."""
-        assert addr is None
         try:
             res = self.module.processMessage(packet)
             if res == DELIVER_OK:
@@ -153,13 +159,13 @@
        don't care about batching messages to like addresses."""
     ## Fields:
     # module: the underlying module.
-    def __init__(self, module, directory):
-        mixminion.server.ServerQueue.DeliveryQueue.__init__(self, directory)
+    def __init__(self, module, directory, retrySchedule=None):
+        mixminion.server.ServerQueue.DeliveryQueue.__init__(self, directory,
+                                                            retrySchedule)
         self.module = module
 
     def _deliverMessages(self, msgList):
-        for handle, addr, packet, n_retries in msgList:
-            assert addr is None
+        for handle, packet, n_retries in msgList:
             try:
                 result = self.module.processMessage(packet)
                 if result == DELIVER_OK:
@@ -174,11 +180,6 @@
                                    "Exception delivering message")
                 self.deliveryFailed(handle, 0)
 
-    def queueDeliveryMessage(self, addr, packet, retry=0):
-        assert addr is None
-        mixminion.server.ServerQueue.DeliveryQueue.queueDeliveryMessage(
-            self, None, packet, retry)
-
 class DeliveryThread(threading.Thread):
     def __init__(self, moduleManager):
         threading.Thread.__init__(self)
@@ -375,9 +376,6 @@
         if self.enabled.has_key(module.getName()):
             del self.enabled[module.getName()]
 
-    def queueMessage2(self, packet):
-        self.queueDecodedMessage(packet)
-
     def queueDecodedMessage(self, packet):
         #DOCDOC
         exitType = packet.getExitType()
@@ -391,7 +389,7 @@
         LOG.debug("Delivering message %r (type %04x) via module %s",
                   packet.getContents()[:8], exitType, mod.getName())
        
-        queue.queueDeliveryMessage(None, packet)
+        queue.queueDeliveryMessage(packet)
         
     #DOCDOC
     def shutdown(self):
@@ -422,6 +420,8 @@
     """Null-object pattern: drops all messages it receives."""
     def getConfigSyntax(self):
         return { }
+    def getRetrySchedule(self):
+        return [ ]
     def getServerInfoBlock(self):
         return ""
     def configure(self, config, manager):
@@ -590,11 +590,16 @@
         DeliveryModule.__init__(self)
         self.addresses = {}
 
+    def getRetrySchedule(self):
+        return self.retrySchedule # Assigned from sec['Retry'] when configured.
+
     def getConfigSyntax(self):
         # FFFF There should be some way to say that fields are required
         # FFFF if the module is enabled.
         return { "Delivery/MBOX" :
                  { 'Enabled' : ('REQUIRE',  _parseBoolean, "no"),
+                   'Retry': ('ALLOW', _parseIntervalList,
+                             "7 hours for 6 days"),
                    'AddressFile' : ('ALLOW', None, None),
                    'ReturnAddress' : ('ALLOW', None, None),
                    'RemoveContact' : ('ALLOW', None, None),
@@ -617,6 +622,10 @@
                 LOG.warn("Value of %s (%s) doesn't look like an email address",
                          field, sec[field])
 
+        mixInterval = sections['Server']['MixInterval'][2]
+        mixminion.server.ServerConfig._validateRetrySchedule(
+            mixInterval, entries, "Delivery/MBOX")
+
     def configure(self, config, moduleManager):
         if not config['Delivery/MBOX'].get("Enabled", 0):
             moduleManager.disableModule(self)
@@ -627,6 +636,7 @@
         self.addressFile = sec['AddressFile']
         self.returnAddress = sec['ReturnAddress']
         self.contact = sec['RemoveContact']
+        self.retrySchedule = sec['Retry']
         # validate should have caught these.
         assert (self.server and self.addressFile and self.returnAddress
                 and self.contact)
@@ -735,9 +745,14 @@
     def __init__(self):
         SMTPModule.__init__(self)
 
+    def getRetrySchedule(self):
+        return self.retrySchedule # Assigned from sec['Retry'] when configured.
+
     def getConfigSyntax(self):
         return { "Delivery/SMTP" :
                  { 'Enabled' : ('REQUIRE', _parseBoolean, "no"),
+                   'Retry': ('ALLOW', _parseIntervalList,
+                             "7 hours for 6 days"),
                    'BlacklistFile' : ('ALLOW', None, None),
                    'SMTPServer' : ('ALLOW', None, 'localhost'),
                    'Message' : ('ALLOW', None, ""),
@@ -762,6 +777,10 @@
             LOG.warn("Return address (%s) doesn't look like an email address",
                      sec['ReturnAddress'])
 
+        mixInterval = sections['Server']['MixInterval'][2]
+        mixminion.server.ServerConfig._validateRetrySchedule(
+            mixInterval, entries, "Delivery/SMTP")
+
     def configure(self, config, manager):
         sec = config['Delivery/SMTP']
         if not sec.get('Enabled'):
@@ -769,6 +788,7 @@
             return
 
         self.server = sec['SMTPServer']
+        self.retrySchedule = sec['Retry']
         if sec['BlacklistFile']:
             self.blacklist = EmailAddressSet(fname=sec['BlacklistFile'])
         else:
@@ -824,9 +844,14 @@
     def __init__(self):
         SMTPModule.__init__(self)
 
+    def getRetrySchedule(self):
+        return self.retrySchedule # Assigned from sec['Retry'] when configured.
+
     def getConfigSyntax(self):
         return { "Delivery/SMTP-Via-Mixmaster" :
                  { 'Enabled' : ('REQUIRE', _parseBoolean, "no"),
+                   'Retry': ('ALLOW', _parseIntervalList,
+                             "7 hours for 6 days"),
                    'MixCommand' : ('REQUIRE', _parseCommand, None),
                    'Server' : ('REQUIRE', None, None),
                    'SubjectLine' : ('ALLOW', None,
@@ -835,9 +860,14 @@
                  }
 
     def validateConfig(self, sections, entries, lines, contents):
-        # Currently, we accept any configuration options that the config allows
-        pass
-
+        #XXXX write more
+        sec = sections['Delivery/SMTP-Via-Mixmaster']
+        if not sec.get("Enabled"):
+            return
+        mixInterval = sections['Server']['MixInterval'][2]
+        mixminion.server.ServerConfig._validateRetrySchedule(
+            mixInterval, entries, "Delivery/SMTP-Via-Mixmaster")
+        
     def configure(self, config, manager):
         sec = config['Delivery/SMTP-Via-Mixmaster']
         if not sec.get("Enabled", 0):
@@ -846,6 +876,7 @@
         cmd = sec['MixCommand']
         self.server = sec['Server']
         self.subject = sec['SubjectLine']
+        self.retrySchedule = sec['Retry']
         self.command = cmd[0]
         self.options = tuple(cmd[1]) + ("-l", self.server,
                                         "-s", self.subject)

Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -d -r1.14 -r1.15
--- ServerConfig.py	6 Jan 2003 12:57:48 -0000	1.14
+++ ServerConfig.py	13 Jan 2003 06:35:52 -0000	1.15
@@ -63,7 +63,8 @@
         if _haveEntry(entries, 'Server', 'Mode'):
             LOG.warn("Mode specification is not yet supported.")
 
-        if server['MixInterval'][2] < 30*60:
+        mixInterval = server['MixInterval'][2]
+        if mixInterval < 30*60:
             LOG.warn("Dangerously low MixInterval")
         if server['MixAlgorithm'] == 'TimedMixQueue':
             if _haveEntry(entries, 'Server', 'MixPoolRate'):
@@ -88,6 +89,8 @@
         if [e for e in entries['Outgoing/MMTP'] if e[0] in ('Allow', 'Deny')]:
             LOG.warn("Allow/deny are not yet supported")
 
+        _validateRetrySchedule(mixInterval, entries, "Outgoing/MMTP")
+
         self.moduleManager.validate(sections, entries, lines, contents)
 
     def __loadModules(self, section, sectionEntries):
@@ -105,6 +108,41 @@
         "Return the module manager initialized by this server."
         return self.moduleManager
 
+def _validateRetrySchedule(mixinterval, entries, sectionname,
+                           entryname='Retry'):
+    """Log a warning for each way the retry schedule looks unreasonable.
+         mixinterval -- the server's mix interval, in the same units as the
+             schedule's intervals (the thresholds below assume seconds).
+         entries -- map from section name to a list of entries; e[0] is
+             the entry's name and e[1] its value (here, a list of intervals).
+         sectionname, entryname -- locate the schedule to check.  If no
+             such entry was given, nothing is checked."""
+    entry = [e for e in entries.get(sectionname,[]) if e[0] == entryname]
+    if not entry:
+        return
+    assert len(entry) == 1
+    sched = entry[0][1]
+    total = reduce(operator.add, sched, 0)
+
+    # Warn if we try for less than a day, or for more than two weeks.
+    if total < 24*60*60:
+        LOG.warn("Dangerously low retry timeout for %s (<1 day)", sectionname)
+    if total > 2*7*24*60*60:
+        LOG.warn("Very high retry timeout for %s (>14 days)", sectionname)
+
+    # Warn if any interval is shorter than the mix interval, or else if any
+    # is shorter than 5 minutes.  (An empty schedule has no shortest entry.)
+    if sched and min(sched) < mixinterval-2:
+        LOG.warn("Rounding retry intervals for %s to the nearest mix interval",
+                 sectionname)
+    elif sched and min(sched) < 5*60:
+        LOG.warn("Very fast retry intervals for %s (< 5 minutes)", sectionname)
+    # Warn if we make fewer than 5 attempts, or more than 50.
+    if len(sched) < 5:
+        LOG.warn("Dangerously low number of retries for %s (<5)", sectionname)
+    if len(sched) > 50:
+        LOG.warn("Very high number of retries for %s (>50)", sectionname)
+
 #======================================================================
 
 _MIX_RULE_NAMES = {
@@ -192,6 +230,8 @@
                           'Deny' : ('ALLOW*', C._parseAddressSet_deny, None)
 			 },
         'Outgoing/MMTP' : { 'Enabled' : ('REQUIRE', C._parseBoolean, "no"),
+                            'Retry' : ('ALLOW', C._parseIntervalList,
+                                    ".5 hour for 1 day, 7 hours for 5 days"),
                           'Allow' : ('ALLOW*', C._parseAddressSet_allow, None),
                           'Deny' : ('ALLOW*', C._parseAddressSet_deny, None) },
         # FFFF Missing: Queue-Size / Queue config options

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.31
retrieving revision 1.32
diff -u -d -r1.31 -r1.32
--- ServerMain.py	10 Jan 2003 20:12:05 -0000	1.31
+++ ServerMain.py	13 Jan 2003 06:35:52 -0000	1.32
@@ -180,11 +180,9 @@
                           formatBase64(packet.getContents()[:8]))
                 self.moduleManager.queueDecodedMessage(packet)
             else:
-                ipv4 = packet.getAddress()
-                msg = packet.getPacket()
                 LOG.debug("  (sending message %s to MMTP server)",
-                          formatBase64(msg[:8]))
-                self.outgoingQueue.queueDeliveryMessage(ipv4, msg)
+                          formatBase64(packet.getPacket()[:8]))
+                self.outgoingQueue.queueDeliveryMessage(packet)
             self.queue.removeMessage(h)
 
     def getNextMixTime(self, now):
@@ -194,8 +192,8 @@
 
 class OutgoingQueue(mixminion.server.ServerQueue.DeliveryQueue):
     """DeliveryQueue to send messages via outgoing MMTP connections.  All
-       methods on this class are called from the main thread.  The addresses
-       in this queue are pickled IPV4Info objects.
+       methods on this class are called from the main thread.  The underlying
+       objects in this queue are instances of RelayedPacket.
 
        All methods in this class are run from the main thread.
     """
@@ -205,6 +203,10 @@
         mixminion.server.ServerQueue.DeliveryQueue.__init__(self, location)
         self.server = None
 
+    def configure(self, config):
+        retry = config['Outgoing/MMTP']['Retry']
+        self.setRetrySchedule(retry)
+
     def connectQueues(self, server):
         """Set the MMTPServer that this OutgoingQueue informs of its
            deliverable messages."""
@@ -214,7 +216,10 @@
         "Implementation of abstract method from DeliveryQueue."
         # Map from addr -> [ (handle, msg) ... ]
         msgs = {}
-        for handle, addr, message, n_retries in msgList:
+        # XXXX SKIP DEAD MESSAGES!!!!
+        for handle, packet, n_retries in msgList:
+            addr = packet.getAddress()
+            message = packet.getPacket()
             msgs.setdefault(addr, []).append( (handle, message) )
         for addr, messages in msgs.items():
             handles, messages = zip(*messages)
@@ -408,6 +413,7 @@
         outgoingDir = os.path.join(queueDir, "outgoing")
         LOG.debug("Initializing outgoing queue")
         self.outgoingQueue = OutgoingQueue(outgoingDir)
+        self.outgoingQueue.configure(config)
         LOG.debug("Found %d pending messages in outgoing queue",
                        self.outgoingQueue.count())
 

Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- ServerQueue.py	10 Jan 2003 20:12:05 -0000	1.2
+++ ServerQueue.py	13 Jan 2003 06:35:52 -0000	1.3
@@ -30,10 +30,6 @@
 # trash.
 INPUT_TIMEOUT = 6000
 
-## # If we've been cleaning for more than CLEAN_TIMEOUT seconds, assume the
-## # old clean is dead.
-## CLEAN_TIMEOUT = 120
-
 class Queue:
     """A Queue is an unordered collection of files with secure insert, move,
        and delete operations.
@@ -257,39 +253,6 @@
         # We don't need to hold the lock here; we synchronize via the
         # filesystem.
 
-# XXXX this logic never worked anyway; now we do all our cleaning in a separate
-# XXXX thread anyway.
-
-##         now = time.time()
-##         cleanFile = os.path.join(self.dir,".cleaning")
-##
-##         cleaning = 1
-##         while cleaning:
-##             try:
-##                 # Try to get the .cleaning lock file.  If we can create it,
-##                 # we're the only cleaner around.
-##                 fd = os.open(cleanFile, os.O_WRONLY+os.O_CREAT+os.O_EXCL, 0600)
-##                 os.write(fd, str(now))
-##                 os.close(fd)
-##                 cleaning = 0
-##             except OSError:
-##                 try:
-##                     # If we can't create the file, see if it's too old.  If it
-##                     # is too old, delete it and try again.  If it isn't, there
-##                     # may be a live clean in progress.
-##                     s = os.stat(cleanFile)
-##                     if now - s[stat.ST_MTIME] > CLEAN_TIMEOUT:
-##                         os.unlink(cleanFile)
-##                     else:
-##                         return 1
-##                 except OSError:
-##                     # If the 'stat' or 'unlink' calls above fail, then
-##                     # .cleaning must not exist, or must not be readable
-##                     # by us.
-##                     if os.path.exists(cleanFile):
-##                         # In the latter case, bail out.
-##                         return 1
-
         rmv = []
         allowedTime = int(time.time()) - INPUT_TIMEOUT
         for m in os.listdir(self.dir):
@@ -364,9 +327,16 @@
     #    pending -- Dict from handle->1, for all messages that we're
     #           currently sending.
 
-    def __init__(self, location):
+    def __init__(self, location, retrySchedule=None):
         Queue.__init__(self, location, create=1, scrub=1)
         self._rescan()
+        if retrySchedule is None:
+            self.retrySchedule = None
+        else:
+            self.retrySchedule = retrySchedule[:]
+
+    def setRetrySchedule(self, schedule): # Replace the retry schedule.
+        self.retrySchedule = schedule[:] # Keep our own copy of the list.
 
     def _rescan(self):
         """Rebuild the internal state of this queue from the underlying
@@ -381,15 +351,16 @@
     def queueMessage(self, msg):
         if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")
 
-    def queueDeliveryMessage(self, addr, msg, retry=0):
+    def queueDeliveryMessage(self,  msg, retry=0, nextAttempt=0):
         """Schedule a message for delivery.
              addr -- An object to indicate the message's destination
              msg -- the message itself
-             retry -- how many times so far have we tried to send?"""
-
+             retry -- how many times so far have we tried to send?
+             DOCDOC"""
         try:
             self._lock.acquire()
-            handle = self.queueObject( (retry, addr, msg) )
+            #DOCDOC nextAttempt 
+            handle = self.queueObject( (retry, None, msg, nextAttempt) )
             self.sendable.append(handle)
         finally:
             self._lock.release()
@@ -397,22 +368,29 @@
         return handle
 
     def get(self,handle):
-        """Returns a (n_retries, addr, msg) payload for a given
+        """Returns a (n_retries, addr, msg, nextAttempt?) payload for a given
            message handle."""
-        return self.getObject(handle)
+        o = self.getObject(handle)
+        if len(o) == 3:# XXXX For legacy queues; delete after 0.0.3
+            o = o + (0,)
+        return o[0], o[2], o[3]
 
-    def sendReadyMessages(self):
+    def sendReadyMessages(self, now=None):
         """Sends all messages which are not already being sent."""
-
+        if now is None:
+            now = time.time()
         try:
             self._lock.acquire()
             handles = self.sendable
             messages = []
             self.sendable = []
             for h in handles:
-                retries, addr, msg = self.getObject(h)
-                messages.append((h, addr, msg, retries))
-                self.pending[h] = 1
+                retries,msg, nextAttempt = self.get(h)
+                if nextAttempt <= now:
+                    messages.append((h, msg, retries))
+                    self.pending[h] = now
+                else:
+                    self.sendable.append(h)
         finally:
             self._lock.release()
         if messages:
@@ -420,7 +398,7 @@
 
     def _deliverMessages(self, msgList):
         """Abstract method; Invoked with a list of
-           (handle, addr, message, n_retries) tuples every time we have a batch
+           (handle, message, n_retries) tuples every time we have a batch
            of messages to send.
 
            For every handle in the list, delierySucceeded or deliveryFailed
@@ -440,6 +418,7 @@
         """
         try:
             self._lock.acquire()
+            #XXXX003 be more robust in the presence of errors here.
             self.removeMessage(handle)
             del self.pending[handle]
         finally:
@@ -451,16 +430,40 @@
            invoked after the corresponding message has been
            successfully delivered."""
         try:
+            #XXXX003 be more robust in the presence of errors here.
             self._lock.acquire()
+            lastAttempt = self.pending[handle]
             del self.pending[handle]
             if retriable:
                 # Queue the new one before removing the old one, for
-                # crash-proofness
-                retries, addr, msg = self.getObject(handle)
-                # FFFF This test makes us never retry past the 10th attempt.
-                # FFFF That's wrong; we should be smarter.
-                if retries <= 10:
-                    self.queueDeliveryMessage(addr, msg, retries+1)
+                # crash-proofness.  First, fetch the old information...
+                retries, msg, schedAttempt = self.get(handle)
+
+                # Multiple retry intervals may have passed between the time
+                # this attempt was scheduled (schedAttempt) and the time it
+                # was actually made (lastAttempt).  Walk forward through the
+                # retry schedule, incrementing 'retries' once per interval,
+                # until we reach the first scheduled time (nextAttempt) that
+                # falls after lastAttempt.
+                #DOCDOC
+                if self.retrySchedule and retries < len(self.retrySchedule):
+                    nextAttempt = schedAttempt
+                    if nextAttempt == 0:
+                        nextAttempt = lastAttempt
+                    while retries < len(self.retrySchedule):
+                        nextAttempt += self.retrySchedule[retries]
+                        retries += 1
+                        if nextAttempt > lastAttempt:
+                            break
+                    if retries <= len(self.retrySchedule):
+                        self.queueDeliveryMessage(msg, retries, nextAttempt)
+                elif not self.retrySchedule:
+                    #LEGACY XXXX003
+                    retries += 1
+                    nextAttempt = 0
+                    if retries < 10:
+                        self.queueDeliveryMessage(msg, retries, nextAttempt)
+                                              
             self.removeMessage(handle)
         finally:
             self._lock.release()