
[minion-cvs] Several days worth of hacking. Highlights: Key rotatio...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv2846/lib/mixminion/server

Modified Files:
	EventStats.py MMTPServer.py Modules.py PacketHandler.py 
	ServerConfig.py ServerKeys.py ServerMain.py ServerQueue.py 
Log Message:
Several days' worth of hacking.  Highlights: Key rotation, robust queues.

TODO:
- Update status, add time estimates
- Break down directory work

etc/mixminiond.conf:
- Rename PublicKeySloppiness to PublicKeyOverlap

*:
- Whitespace normalization

ClientMain:
- Improve path syntax to include '?' and '*n'; allow choice-with-replacement
- Use new readPickled functionality from Common
- Add -n argument for flush command
- Add default-path options to ClientConfig
- Be more specific about causes of failure when flushing, and about the
  number of messages flushed.
- Remove --swap-at option: now path syntax is adequate.
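
A rough sketch of how a path specification using the new syntax might be expanded. It assumes that '?' stands for one randomly chosen server, '*n' for n randomly chosen servers, and that "choice-with-replacement" means a random pick may reuse a server already on the path. The function name expand_path, its comma-separated input, and the with_replacement flag are hypothetical, not ClientMain's actual interface.

```python
import random


def expand_path(spec, servers, with_replacement=False, rng=random):
    """Expand a comma-separated path spec into a list of server names.

    Assumed (hypothetical) syntax:
      name  -- a specific server
      ?     -- one randomly chosen server
      *n    -- n randomly chosen servers
    """
    path = []
    for item in spec.split(","):
        item = item.strip()
        if item == "?":
            count = 1
        elif item.startswith("*"):
            count = int(item[1:])
        else:
            path.append(item)          # a named server is used as-is
            continue
        for _ in range(count):
            if with_replacement:
                candidates = list(servers)
            else:
                # Without replacement: skip servers already on the path.
                candidates = [s for s in servers if s not in path]
            if not candidates:
                raise ValueError("Not enough servers to fill path %r" % spec)
            path.append(rng.choice(candidates))
    return path
```

For example, expand_path("Alice,*2,Dave", servers) yields Alice, two distinct random servers other than Alice, then Dave.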

Config, ClientMain, Common:
- Change duration from a 3-tuple to an independent class.  Now we 
  can say duration.getSeconds() rather than duration[2], which makes
  some stuff more readable.
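
A minimal sketch of what a Duration class with getSeconds() could look like. Only getSeconds() is confirmed by the diffs (it replaces the old duration[2]); the constructor arguments, the unit table, and the plural handling are assumptions made for illustration.

```python
# Seconds per unit name; this table is an assumption for the sketch.
_UNIT_SECONDS = {
    "second": 1,
    "minute": 60,
    "hour": 60 * 60,
    "day": 24 * 60 * 60,
    "week": 7 * 24 * 60 * 60,
}


class Duration:
    """A length of time such as "2 hours" (hypothetical sketch)."""

    def __init__(self, number, unit):
        unit = unit.lower()
        if unit.endswith("s"):
            unit = unit[:-1]          # accept plural forms: "hours" -> "hour"
        if unit not in _UNIT_SECONDS:
            raise ValueError("unknown time unit %r" % unit)
        self.number = number
        self.unit = unit

    def getSeconds(self):
        """Return the total length of this duration in seconds."""
        return self.number * _UNIT_SECONDS[self.unit]

    def __repr__(self):
        return "Duration(%r, %r)" % (self.number, self.unit)
```

Call sites then read config['Server']['MixInterval'].getSeconds() instead of indexing a tuple by position.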

Common:
- Debug checkPrivateFile
- Add AtomicFile class to help with standard create/rename pattern.
- Add readPickled/writePickled wrappers
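
A minimal sketch of the create/rename pattern behind AtomicFile, with readPickled/writePickled built on top of it. Only the wrapper names and call shapes (writePickled(filename, obj), readPickled(filename)) come from the EventStats diff; the class body, the ".tmp" suffix, the fsync call, and the context-manager protocol are assumptions, and the code targets modern Python 3 rather than the Python 2 this tree uses.

```python
import os
import pickle
import tempfile


class AtomicFile:
    """Write to "<filename>.tmp", then rename it over <filename>.

    Readers see either the complete old file or the complete new one,
    never a half-written file.
    """

    def __init__(self, filename, mode="wb"):
        self.filename = filename
        self.tmpname = filename + ".tmp"
        self.f = open(self.tmpname, mode)

    def write(self, data):
        self.f.write(data)

    def close(self):
        """Flush to disk, then atomically replace the target file."""
        self.f.flush()
        os.fsync(self.f.fileno())
        self.f.close()
        os.replace(self.tmpname, self.filename)

    def discard(self):
        """Abandon the write; the original file is left untouched."""
        self.f.close()
        os.unlink(self.tmpname)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.close()
        else:
            self.discard()
        return False  # let any exception propagate


def writePickled(filename, obj):
    """Atomically store obj in filename (pickle protocol 1, as before)."""
    with AtomicFile(filename) as f:
        pickle.dump(obj, f, 1)


def readPickled(filename):
    """Load an object previously stored with writePickled."""
    with open(filename, "rb") as f:
        return pickle.load(f)


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "stats")
    writePickled(path, {"count": {}, "lastRotation": 0, "accumulatedTime": 0})
    print(readPickled(path))
```

Because the rename happens only after a successful write, a crash mid-write leaves the previous file intact, which is what the old hand-written tmpfile/os.rename code in EventStats was doing.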

MMTPClient:
- Document PeerCertificateCache

Packet:
- Correct documentation on overflow, underflow.

benchmark:
- Improve format of printed sizes
- Improve pk timing; time with bizarre exponent.
- Add timing for ServerQueues

test:
- Add tests for encodeBase64
- Correct tests for new DeliveryQueue implementation
- Add tests for checkPrivateFile
- Revise tests for _parseInterval in response to new Duration class.
- Add tests for generating new descriptors with existing keys
- Fix test for directory with bad signature: make it fail for the
  right reason
- Deal with new validateConfig interface in Modules
- Add test for scheduler.
- Tests for new path selection code

testSupport: 
- Module code uses new interface

EventStats:
- Document, clean

MMTPServer:
- Better warning on TLSClosed while connecting.
- Document new functionality

Modules:
- validateConfig function no longer needs 'sections' and 'entries':
  make it follow the same interface as other validation fns
- _deliverMessages: use new DeliveryQueue interface
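
A small sketch of the new validateConfig interface. The signature validateConfig(self, config, lines, contents) and the call config.validateRetrySchedule(sectionName) follow the Modules.py diff; StubServerConfig, ExampleModule, validateModules, and the "Delivery/Example" section are hypothetical stand-ins so the example runs on its own.

```python
class StubServerConfig:
    """Hypothetical stand-in for ServerConfig: section lookup by name,
    plus the validateRetrySchedule method that modules now call."""

    def __init__(self, sections):
        self._sections = sections
        self.checkedSchedules = []   # records calls, for illustration only

    def __getitem__(self, sectionName):
        return self._sections.get(sectionName, {})

    def validateRetrySchedule(self, sectionName, entryName="Retry"):
        # The real method warns or raises ConfigError; this stub just records.
        self.checkedSchedules.append((sectionName, entryName))


class ExampleModule:
    """A delivery module using the new validateConfig interface."""

    def validateConfig(self, config, lines, contents):
        sec = config["Delivery/Example"]
        if not sec.get("Enabled"):
            return
        config.validateRetrySchedule("Delivery/Example")


def validateModules(modules, config, lines, contents):
    """Like ModuleManager.validate: hand the whole config to each module."""
    for m in modules:
        m.validateConfig(config, lines, contents)
```

Passing the config object itself, rather than raw 'sections' and 'entries', lets each module reuse shared checks such as the retry-schedule validation.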

PacketHandler:
- Always take a list of keys, never a single one.
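
A toy sketch of the list-of-keys behavior described in the new PacketHandler docstring: try each private key in sequence and, on success, log the packet in the hashlog paired with that key. ToyKey, ToyPacketHandler, the "tag:body" packet format, and the use of Python sets as hashlogs are all hypothetical simplifications; real packets use RSA header decryption and on-disk HashLog objects.

```python
import hashlib


class ToyKey:
    """Hypothetical stand-in for an RSA private key: it 'decrypts'
    only packets tagged with its own name."""

    def __init__(self, name):
        self.name = name

    def decrypt(self, packet):
        tag, _, body = packet.partition(b":")
        if tag != self.name:
            raise ValueError("wrong key")
        return body


class ToyPacketHandler:
    def __init__(self, keys=(), hashlogs=()):
        # Always a sequence of keys, never a single key.
        assert isinstance(keys, (list, tuple))
        assert len(keys) == len(hashlogs)
        self.privatekeys = list(zip(keys, hashlogs))

    def process(self, packet):
        for key, hashlog in self.privatekeys:
            try:
                secret = key.decrypt(packet)
            except ValueError:
                continue              # not for this key; try the next one
            digest = hashlib.sha1(secret).digest()
            if digest in hashlog:
                raise ValueError("replayed packet")
            hashlog.add(digest)       # log it with the matching key's hashlog
            return secret
        raise ValueError("no key could decode packet")
```

During key rotation the handler simply holds both the outgoing and the incoming key, each with its own replay log.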

ServerConfig:
- Refactor validateRetrySchedule
- Use new Duration class
- Rename PublicKeySloppiness to PublicKeyOverlap
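
The new key-timing limits, as they appear in the ServerConfig diff, pulled out into a stand-alone function. Plain second counts stand in for Duration.getSeconds(); the function name check_key_timing and the local ConfigError class are stand-ins for this example.

```python
class ConfigError(Exception):
    """Local stand-in for mixminion's configuration error."""


HOUR = 60 * 60
DAY = 24 * HOUR


def check_key_timing(lifetime_seconds, overlap_seconds):
    """Apply the PublicKeyLifetime / PublicKeyOverlap limits."""
    if lifetime_seconds < DAY:
        raise ConfigError("PublicKeyLifetime must be at least 1 day.")
    if overlap_seconds > 6 * HOUR:
        raise ConfigError("PublicKeyOverlap must be <= 6 hours")
```

Note that the renamed option also gets a much looser bound: the old PublicKeySloppiness was capped at 20 minutes.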

ServerKeys: ***
- Implement key rotation:
   - Notice when to add and remove keys from PacketHandlers, MMTPServer
   - Set keys in packethandlers, mmtpserver 
   - Note that 512-bit DH moduli are kinda silly 
- More code and debugging for descriptor regeneration
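
A sketch of "noticing when to add and remove keys", assuming each key set has a validity interval and that PublicKeyOverlap widens that interval on both sides, so a key is accepted slightly before it becomes current and slightly after it expires. KeySet, activeKeys, and nextRotation are hypothetical names; ServerKeys' real bookkeeping may differ.

```python
from collections import namedtuple

# Hypothetical record: a key set valid on [validAfter, validUntil).
KeySet = namedtuple("KeySet", "name validAfter validUntil")


def activeKeys(keysets, now, overlap):
    """Key sets that packet handlers should accept at time 'now'."""
    return [k for k in keysets
            if k.validAfter - overlap <= now < k.validUntil + overlap]


def nextRotation(keysets, now, overlap):
    """Next time the active set changes, or None if it never does."""
    events = []
    for k in keysets:
        events.append(k.validAfter - overlap)   # key must be added
        events.append(k.validUntil + overlap)   # key can be dropped
    future = [t for t in events if t > now]
    return min(future) if future else None
```

A server loop would sleep until nextRotation(), then push the new activeKeys() list into the PacketHandlers and MMTPServer.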

ServerMain:
- Documentation
- Key rotation
- Respond to refactoring in DeliveryQueue
- Use lambdas to wrap EventStats rotation
- Separate reset method
- Remove obsolete commands

ServerQueue: ***
- Refactor DeliveryQueue so that it has a prayer of working: keep
  message delivery state in a separate file, and update it separately.
  Remember the time of queueing for each message and the time of the last
  attempted delivery; n_retries is gone.  This allows us to change the
  retry schedule without putting messages in an inconsistent state.

  An earlier version put the state for _all_ queued objects in a
  single file: this turned out to be screamingly inefficient.
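
A sketch of why storing times instead of n_retries keeps the queue consistent: the next attempt is derived from the queueing time, the last attempt, and whatever retry schedule is currently configured. It assumes the first attempt happens at queueing time and that the schedule is a list of intervals in seconds; DeliveryState and nextAttempt are hypothetical names, not the real DeliveryQueue API.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DeliveryState:
    """Per-message delivery state (hypothetical field names)."""
    queuedTime: float
    lastAttempt: Optional[float] = None


def nextAttempt(state, retrySchedule):
    """Return when to try delivery next, or None to give up.

    Attempt times are queuedTime, then queuedTime + s[0],
    queuedTime + s[0] + s[1], and so on.
    """
    if state.lastAttempt is None:
        return state.queuedTime          # never tried: deliver now
    t = state.queuedTime
    for interval in retrySchedule:
        t += interval
        if t > state.lastAttempt:
            return t
    return None                          # schedule exhausted
```

If an operator edits the schedule, every message's next attempt is simply recomputed; there is no stored retry counter that could disagree with the new schedule.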

crypt.c, tls.c:
- Documentation fixes




Index: EventStats.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/EventStats.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- EventStats.py	5 May 2003 00:38:46 -0000	1.3
+++ EventStats.py	17 May 2003 00:08:44 -0000	1.4
@@ -7,15 +7,14 @@
 
 __all__ = [ 'EventLog', 'NilEventLog' ]
 
-import cPickle
 import os
 from threading import RLock
 from time import time
 
 from mixminion.Common import formatTime, LOG, previousMidnight, floorDiv, \
-     createPrivateDir, MixError
+     createPrivateDir, MixError, readPickled, writePickled
 
-# _EVENTS: a list of all recognized event types. 
+# _EVENTS: a list of all recognized event types.
 _EVENTS = [ 'ReceivedPacket',
            'AttemptedRelay',
            'SuccessfulRelay', 'FailedRelay', 'UnretriableRelay',
@@ -25,7 +24,7 @@
 
 class NilEventLog:
     """Null implementation of EventLog interface: ignores all events and
-       logs nothing.  
+       logs nothing.
     """
     def __init__(self):
         pass
@@ -33,10 +32,13 @@
         """Flushes this eventlog to disk."""
         pass
     def rotate(self, now=None):
-        """DOCDOC"""
+        """Move the pending events from this EventLog into a
+           summarized text listing, and start a new pool.  Requires
+           that it's time to rotate.
+        """
         pass
     def getNextRotation(self):
-        """DOCDOC"""
+        """Return a time after which it's okay to rotate the log."""
         return 0
     def _log(self, event, arg=None):
         """Notes that an event has occurred.
@@ -79,7 +81,7 @@
            module fails unretriably.
         """
         self._log("UnretriableDelivery", arg)
-        
+
 class EventLog(NilEventLog):
     """An EventLog records events, aggregates them according to some time
        periods, and logs the totals to disk.
@@ -124,10 +126,7 @@
            periodically writes to 'historyFile' every 'interval' seconds."""
         NilEventLog.__init__(self)
         if os.path.exists(filename):
-            # XXXX If this doesn't work, then we should ????004
-            f = open(filename, 'rb')
-            self.__dict__.update(cPickle.load(f))
-            f.close()
+            self.__dict__.update(readPickled(filename))
             assert self.count is not None
             assert self.lastRotation is not None
             assert self.accumulatedTime is not None
@@ -155,7 +154,7 @@
             self._save(now)
         finally:
             self._lock.release()
-            
+
     def _save(self, now=None):
         """Implements 'save' method.  For internal use.  Must hold self._lock
            to invoke."""
@@ -168,14 +167,10 @@
             pass
         self.accumulatedTime += int(now-self.lastSave)
         self.lastSave = now
-        f = open(tmpfile, 'wb')
-        cPickle.dump({ 'count' : self.count,
-                       'lastRotation' : self.lastRotation,
-                       'accumulatedTime' : self.accumulatedTime,
-                       },
-                     f, 1)
-        f.close()
-        os.rename(tmpfile, self.filename)
+        writePickled(self.filename, { 'count' : self.count,
+                                      'lastRotation' : self.lastRotation,
+                                      'accumulatedTime' : self.accumulatedTime,
+                                      })
 
     def _log(self, event, arg=None):
         try:
@@ -206,11 +201,11 @@
     def _rotate(self, now=None):
         """Flush all events since the last rotation to the history file,
            and clears the current event log."""
-        
+
         # Must hold lock
         LOG.debug("Flushing statistics log")
         if now is None: now = time()
-            
+
         f = open(self.historyFilename, 'a')
         self.dump(f, now)
         f.close()
@@ -220,7 +215,7 @@
             self.count[e] = {}
         self.lastRotation = now
         self._save(now)
-        self.accumulatedTime = 0        
+        self.accumulatedTime = 0
         self._setNextRotation(now)
 
     def dump(self, f, now=None):
@@ -257,7 +252,7 @@
             self._lock.release()
 
     def _setNextRotation(self, now=None):
-        # DOCDOC
+        """Helper function: calculate the time when we next rotate the log."""
         # ???? Lock to 24-hour cycle
 
         # This is a little weird.  We won't save *until*:
@@ -283,7 +278,9 @@
             self.nextRotation = mid + 3600 * floorDiv(rest+55*60, 3600)
 
 def configureLog(config):
-    """DOCDOC"""
+    """Given a configuration file, set up the log.  May replace the log global
+       variable.
+    """
     global log
     if config['Server']['LogStats']:
         LOG.info("Enabling statistics logging")
@@ -293,9 +290,11 @@
             statsfile = os.path.join(homedir, "stats")
         workfile = os.path.join(homedir, "work", "stats.tmp")
         log = EventLog(
-            workfile, statsfile, config['Server']['StatsInterval'][2])
+           workfile, statsfile, config['Server']['StatsInterval'].getSeconds())
+        LOG.info("Statistics logging enabled")
     else:
         log = NilEventLog()
         LOG.info("Statistics logging disabled")
 
+# Global variable: The currently configured event log.
 log = NilEventLog()

Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.27
retrieving revision 1.28
diff -u -d -r1.27 -r1.28
--- MMTPServer.py	5 May 2003 02:52:01 -0000	1.27
+++ MMTPServer.py	17 May 2003 00:08:44 -0000	1.28
@@ -444,7 +444,10 @@
         except _ml.TLSWantRead:
             self.__server.registerReader(self)
         except _ml.TLSClosed:
-            warn("Unexpectedly closed connection to %s", self.address)
+            if self.__state is self.__connectFn:
+                warn("Couldn't connect to server %s", self.address)
+            else:
+                warn("Unexpectedly closed connection to %s", self.address)
             self.handleFail(retriable=1)
             self.__sock.close()
             self.__server.unregister(self)
@@ -472,7 +475,6 @@
 
     def shutdownFailed(self):
         """Called when this connection goes down hard."""
-        #DOCDOc
         pass
 
     def shutdown(self, err=0, retriable=0):
@@ -526,19 +528,25 @@
     #     if negotiation hasn't completed.
     # PROTOCOL_VERSIONS: (static) a list of protocol versions we allow,
     #     in decreasing order of preference.
+    # junkCallback: a no-arguments function called when we receive a
+    #     junk packet.
+    # rejectCallback: a no-arguments function called when we reject a packet.
+    # rejectPackets: a flag: should we reject incoming packets?
     PROTOCOL_VERSIONS = [ '0.3' ]
     def __init__(self, sock, tls, consumer, rejectPackets=0):
         """Create an MMTP connection to receive messages sent along a given
            socket.  When valid packets are received, pass them to the
-           function 'consumer'."""
+           function 'consumer'.  If rejectPackets is true, then instead of
+           accepting packets, we refuse them instead--for example, if the
+           because the disk is full."""
         SimpleTLSConnection.__init__(self, sock, tls, 1,
                                      "%s:%s"%sock.getpeername())
         self.messageConsumer = consumer
-        self.junkCallback = lambda : None #DOCDOC
-        self.rejectCallback = lambda : None #DOCDOC
+        self.junkCallback = lambda : None
+        self.rejectCallback = lambda : None
         self.finished = self.__setupFinished
         self.protocol = None
-        self.rejectPackets = rejectPackets #DOCDOC
+        self.rejectPackets = rejectPackets
 
     def __setupFinished(self):
         """Called once we're done accepting.  Begins reading the protocol
@@ -569,7 +577,7 @@
                 self.finished = self.__sentProtocol
                 self.beginWrite("MMTP %s\r\n"% p)
                 return
-            
+
         warn("Unsupported protocol list.  Closing connection to %s",
              self.address)
         self.shutdown(err=1)
@@ -599,7 +607,7 @@
             expectedDigest = sha1(msg+"SEND")
             if self.rejectPackets:
                 replyDigest = sha1(msg+"REJECTED")
-                replyControl = REJECTED_CONTROL                
+                replyControl = REJECTED_CONTROL
             else:
                 replyDigest = sha1(msg+"RECEIVED")
                 replyControl = RECEIVED_CONTROL
@@ -617,7 +625,7 @@
         else:
             debug("%s packet received from %s; Checksum valid.",
                   data[:4], self.address)
-            self.finished = self.__sentAck            
+            self.finished = self.__sentAck
             self.beginWrite(replyControl+replyDigest)
             if isJunk:
                 self.junkCallback()
@@ -641,10 +649,13 @@
     """Asynchronious implementation of the sending ("client") side of a
        mixminion connection."""
     ## Fields:
-    # ip, port, keyID, messageList, handleList, sendCallback, failCallback:
-    #   As described in the docstring for __init__ below.
+    # ip, port, keyID, messageList, handleList, sendCallback, failCallback,
+    # finishedCallback, certCache:
+    #   As described in the docstring for __init__ below.  We remove entries
+    #   from the front of messageList/handleList as we begin sending them.
     # junk: A list of 32KB padding chunks that we're going to send.  We
-    #   pregenerate these to avoid timing attacks.
+    #   pregenerate these to avoid timing attacks.  They correspond to
+    #   the 'JUNK' entries in messageList.
     # isJunk: flag.  Is the current chunk padding?
     # expectedDigest: The digest we expect to receive in response to the
     #   current chunk.
@@ -652,31 +663,34 @@
     #     if negotiation hasn't completed.
     # PROTOCOL_VERSIONS: (static) a list of protocol versions we allow,
     #     in the order we offer them.
+    # _curMessage, _curHandle: Correspond to the message and handle
+    #     that we are currently trying to deliver.
     PROTOCOL_VERSIONS = [ '0.3' ]
     def __init__(self, context, ip, port, keyID, messageList, handleList,
                  sentCallback=None, failCallback=None, finishedCallback=None,
                  certCache=None):
         """Create a connection to send messages to an MMTP server.
            Raises socket.error if the connection fails.
-        
+
            ip -- The IP of the destination server.
            port -- The port to connect to.
            keyID -- None, or the expected SHA1 hash of the server's public key
            messageList -- a list of message payloads and control strings.
                The control string "JUNK" sends 32KB of padding; the control
                string "RENEGOTIATE" renegotiates the connection key.
-           handleList -- a list of objects corresponding to the messages in
+           handleList -- a list of objects corresponding to the entries in
               messageList.  Used for callback.
            sentCallback -- None, or a function of (msg, handle) to be called
               whenever a message is successfully sent.
            failCallback -- None, or a function of (msg, handle, retriable)
               to be called when messages can't be sent.
-              DOCDOC certcache,finishedCallback
-
-              DOCDOC lengths of handles and messsages are equal."""
-
+           finishedCallback -- None, or a function to be called when this
+              connection is closed.
+           certCache -- an instance of PeerCertificateCache to use for
+              checking server certificates.
+        """
         # Generate junk before connecting to avoid timing attacks
-        self.junk = [] #DOCDOC doc this field.
+        self.junk = []
         self.messageList = []
         self.handleList = []
 
@@ -705,14 +719,17 @@
         self.finished = self.__setupFinished
         self.sentCallback = sentCallback
         self.failCallback = failCallback
-        self.finishedCallback = finishedCallback #DOCDOC
+        self.finishedCallback = finishedCallback
         self.protocol = None
-        self._curMessage = self._curHandle = None#DOCDOC
+        self._curMessage = self._curHandle = None
 
-        debug("Opening client connection (fd %s)", self.fd)
+        debug("Opening client connection to %s:%s (fd %s)", ip,port,self.fd)
 
     def addMessages(self, messages, handles):
-        "DOCDOC"
+        """Given a list of messages and handles, as given to
+           MMTPServer.__init__, cause this connection to deliver that new
+           set of messages after it's done with those it's currently sending.
+        """
         assert len(messages) == len(handles)
         for m,h in zip(messages, handles):
             if m in ("JUNK", "RENEGOTIATE"):
@@ -724,7 +741,7 @@
         self.handleList.extend(handles)
 
     def getAddr(self):
-        "DOCDOC"
+        """Return an (ip,port,keyID) tuple for this connection"""
         return self.ip, self.port, self.keyID
 
     def __setupFinished(self):
@@ -756,7 +773,7 @@
            sending a packet, or exits if we're done sending.
         """
         inp = self.getInput()
-        
+
         for p in self.PROTOCOL_VERSIONS:
             if inp == 'MMTP %s\r\n'%p:
                 trace("Speaking MMTP version %s with %s", p, self.address)
@@ -777,7 +794,7 @@
             return
 
         msg = self._curMessage = self.messageList[0]
-        handle = self._curHandle = self.handleList[0]
+        self._curHandle = self.handleList[0]
         del self.messageList[0]
         del self.handleList[0]
         if msg == 'RENEGOTIATE':
@@ -788,12 +805,12 @@
             msg = self.junk[0]
             del self.junk[0]
             self.expectedDigest = sha1(msg+"RECEIVED JUNK")
-            self.rejectDigest = sha1(msg+"REJECTED") 
+            self.rejectDigest = sha1(msg+"REJECTED")
             msg = JUNK_CONTROL+msg+sha1(msg+"JUNK")
             self.isJunk = 1
         else:
             self.expectedDigest = sha1(msg+"RECEIVED")
-            self.rejectDigest = sha1(msg+"REJECTED") 
+            self.rejectDigest = sha1(msg+"REJECTED")
             msg = SEND_CONTROL+msg+sha1(msg+"SEND")
             self.isJunk = 0
 
@@ -856,7 +873,7 @@
 
         if self.finishedCallback is not None:
             self.finishedCallback()
-            
+
     def shutdownFinished(self):
         if self.finishedCallback is not None:
             self.finishedCallback()
@@ -869,9 +886,13 @@
        MMTPClientConnection, with a function to add new connections, and
        callbacks for message success and failure."""
     ##
-    # clientConByAddr
-    # certificateCache
-    #DOCDOC fields
+    # context: a TLSContext object to use for newly received connections.
+    # clientConByAddr: A map from 3-tuples returned by MMTPClientConnection.
+    #     getAddr, to MMTPClientConnection objects.
+    # certificateCache: A PeerCertificateCache object.
+    # listener: A ListenConnection object.
+    # _timeout: The number of seconds of inactivity to allow on a connection
+    #     before formerly shutting it down.
     def __init__(self, config, tls):
         AsyncServer.__init__(self)
 
@@ -894,7 +915,7 @@
                                          self._newMMTPConnection)
         #self.config = config
         self.listener.register(self)
-        self._timeout = config['Server']['Timeout'][2]
+        self._timeout = config['Server']['Timeout'].getSeconds()
         self.clientConByAddr = {}
         self.certificateCache = PeerCertificateCache()
 
@@ -934,7 +955,7 @@
             assert len(h) < 32
 
         try:
-            #DOCDOC 
+            # Is there an existing connection open to the right server?
             con = self.clientConByAddr[(ip,port,keyID)]
             LOG.debug("Queueing %s messages on open connection to %s:%s",
                       len(messages), ip, port)
@@ -944,6 +965,7 @@
             pass
 
         try:
+            # There isn't any connection to the right server. Open one...
             addr = (ip, port, keyID)
             finished = lambda addr=addr, self=self: self.__clientFinished(addr)
             con = MMTPClientConnection(self.context,
@@ -953,6 +975,7 @@
                                      finishedCallback=finished,
                                      certCache=self.certificateCache)
             con.register(self)
+            # ...and register it in clientConByAddr
             assert addr == con.getAddr()
             self.clientConByAddr[addr] = con
         except socket.error, e:
@@ -962,7 +985,7 @@
                 self.onMessageUndeliverable(m,h,1)
 
     def __clientFinished(self, addr):
-        """DOCDOC"""
+        """Called when a client connection runs out of messages to send."""
         try:
             del self.clientConByAddr[addr]
         except KeyError:
@@ -970,10 +993,15 @@
                      addr)
 
     def onMessageReceived(self, msg):
+        """Abstract function.  Called when we get a message"""
         pass
 
     def onMessageUndeliverable(self, msg, handle, retriable):
+        """Abstract function: Called when an attempt to deliver a
+           message fails."""
         pass
 
     def onMessageSent(self, msg, handle):
+        """Abstract function: Called when an attempt to deliver a
+           message succeeds."""
         pass

Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.36
retrieving revision 1.37
diff -u -d -r1.36 -r1.37
--- Modules.py	5 May 2003 00:38:46 -0000	1.36
+++ Modules.py	17 May 2003 00:08:45 -0000	1.37
@@ -70,7 +70,7 @@
            in Config.py"""
         raise NotImplementedError("getConfigSyntax")
 
-    def validateConfig(self, sections, entries, lines, contents):
+    def validateConfig(self, config, lines, contents):
         """See mixminion.Config.validate"""
         pass
 
@@ -139,7 +139,6 @@
             res = self.module.processMessage(packet)
             if res == DELIVER_OK:
                 EventStats.log.successfulDelivery() #XXXX
-                return
             elif res == DELIVER_FAIL_RETRY:
                 LOG.error("Unable to retry delivery for message")
                 EventStats.log.unretriableDelivery() #XXXX
@@ -170,7 +169,7 @@
         self.module = module
 
     def _deliverMessages(self, msgList):
-        for handle, packet, n_retries in msgList:
+        for handle, packet in msgList:
             try:
                 EventStats.log.attemptedDelivery() #XXXX
                 result = self.module.processMessage(packet)
@@ -181,6 +180,7 @@
                     self.deliveryFailed(handle, 1)
                     EventStats.log.failedDelivery() #XXXX
                 else:
+                    assert result == DELIVER_FAIL_NORETRY
                     LOG.error("Unable to deliver message")
                     self.deliveryFailed(handle, 0)
                     EventStats.log.unretriableDelivery() #XXXX
@@ -260,7 +260,7 @@
     #    _isConfigured: flag: has this modulemanager's configure method been
     #            called?
     #    thread: None, or a DeliveryThread object.
-    
+
     def __init__(self):
         "Create a new ModuleManager"
         self.syntax = {}
@@ -346,10 +346,10 @@
         except Exception, e:
             raise MixError("Error initializing module %s" %className)
 
-    def validate(self, sections, entries, lines, contents):
+    def validate(self, config, lines, contents):
         # (As in ServerConfig)
         for m in self.modules:
-            m.validateConfig(sections, entries, lines, contents)
+            m.validateConfig(config, lines, contents)
 
     def configure(self, config):
         self._setQueueRoot(os.path.join(config['Server']['Homedir'],
@@ -399,7 +399,7 @@
            exit module, and queue the packet for delivey by that exit module.
         """
         exitType = packet.getExitType()
-        
+
         mod = self.typeToModule.get(exitType, None)
         if mod is None:
             LOG.error("Unable to handle message with unknown type %s",
@@ -408,9 +408,9 @@
         queue = self.queues[mod.getName()]
         LOG.debug("Delivering message %r (type %04x) via module %s",
                   packet.getContents()[:8], exitType, mod.getName())
-       
+
         queue.queueDeliveryMessage(packet)
-        
+
     def shutdown(self):
         """Tell the delivery thread (if any) to stop."""
         if self.thread is not None:
@@ -632,8 +632,8 @@
                    'SMTPServer' : ('ALLOW', None, 'localhost') }
                  }
 
-    def validateConfig(self, sections, entries, lines, contents):
-        sec = sections['Delivery/MBOX']
+    def validateConfig(self, config, lines, contents):
+        sec = config['Delivery/MBOX']
         if not sec.get('Enabled'):
             return
         for field in ['AddressFile', 'ReturnAddress', 'RemoveContact',
@@ -648,9 +648,7 @@
                 LOG.warn("Value of %s (%s) doesn't look like an email address",
                          field, sec[field])
 
-        mixInterval = sections['Server']['MixInterval'][2]
-        mixminion.server.ServerConfig._validateRetrySchedule(
-            mixInterval, entries, "Delivery/MBOX")
+        config.validateRetrySchedule("Delivery/MBOX")
 
     def configure(self, config, moduleManager):
         if not config['Delivery/MBOX'].get("Enabled", 0):
@@ -790,8 +788,8 @@
                    }
                  }
 
-    def validateConfig(self, sections, entries, lines, contents):
-        sec = sections['Delivery/SMTP']
+    def validateConfig(self, config, lines, contents):
+        sec = config['Delivery/SMTP']
         if not sec.get('Enabled'):
             return
         for field in 'SMTPServer', 'ReturnAddress':
@@ -804,9 +802,7 @@
             LOG.warn("Return address (%s) doesn't look like an email address",
                      sec['ReturnAddress'])
 
-        mixInterval = sections['Server']['MixInterval'][2]
-        mixminion.server.ServerConfig._validateRetrySchedule(
-            mixInterval, entries, "Delivery/SMTP")
+        config.validateRetrySchedule("Delivery/SMTP")
 
     def configure(self, config, manager):
         sec = config['Delivery/SMTP']
@@ -886,15 +882,13 @@
                    }
                  }
 
-    def validateConfig(self, sections, entries, lines, contents):
+    def validateConfig(self, config, lines, contents):
         #XXXX write more
-        sec = sections['Delivery/SMTP-Via-Mixmaster']
+        sec = config['Delivery/SMTP-Via-Mixmaster']
         if not sec.get("Enabled"):
             return
-        mixInterval = sections['Server']['MixInterval'][2]
-        mixminion.server.ServerConfig._validateRetrySchedule(
-            mixInterval, entries, "Delivery/SMTP-Via-Mixmaster")
-        
+        config.validateRetrySchedule("Delivery/SMTP-Via-Mixmaster")
+
     def configure(self, config, manager):
         sec = config['Delivery/SMTP-Via-Mixmaster']
         if not sec.get("Enabled", 0):

Index: PacketHandler.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/PacketHandler.py,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -d -r1.15 -r1.16
--- PacketHandler.py	5 May 2003 00:38:46 -0000	1.15
+++ PacketHandler.py	17 May 2003 00:08:45 -0000	1.16
@@ -5,6 +5,7 @@
 
 import binascii
 import threading
+import types
 
 from mixminion.Common import encodeBase64, formatBase64
 import mixminion.Crypto as Crypto
@@ -31,26 +32,27 @@
     # privatekeys: a list of 2-tuples of
     #      (1) a RSA private key that we accept
     #      (2) a HashLog objects corresponding to the given key
-    def __init__(self, privatekey, hashlog):
-        """Constructs a new packet handler, given a private key object for
-           header encryption, and a hashlog object to prevent replays.
+    def __init__(self, privatekey=(), hashlog=()):
+        """Constructs a new packet handler, given a sequence of
+           private key object for header encryption, and a sequence of
+           corresponding hashlog object to prevent replays.
 
-           A sequence of private keys may be provided, if you'd like the
-           server to accept messages encrypted with any of them.  Beware,
-           though: PK decryption is expensive.  Also, a hashlog must be
-           provided for each private key.
+           The lists must be equally long.  When a new packet is
+           processed, we try each of the private keys in sequence.  If
+           the packet is decodeable with one of the keys, we log it in
+           the corresponding entry of the hashlog list.
         """
         self.privatekeys = []
         self.lock = threading.Lock()
-        
-        try:
-            _ = privatekey[0]
-            self.setKeys(privatekey, hashlog)
-        except TypeError:
-            self.setKeys([privatekey], [hashlog])
+
+        assert type(privatekey) in (types.ListType, types.TupleType)
+
+        self.setKeys(privatekey, hashlog)
 
     def setKeys(self, keys, hashlogs):
-        """DOCDOC"""
+        """Change the keys and hashlogs used by this PacketHandler.
+           Arguments are as to PacketHandler.__init__
+        """
         self.lock.acquire()
         newKeys = {}
         try:
@@ -84,7 +86,7 @@
             self.lock.acquire()
             for _, h in self.privatekeys:
                 h.close()
-        finally:        
+        finally:
             self.lock.release()
 
     def processMessage(self, msg):
@@ -177,7 +179,7 @@
             header1 = header1[overflowLength:]
 
         header1 = subh.underflow + header1
-        
+
         # Decrypt the payload.
         payload = Crypto.lioness_decrypt(msg.payload,
                               keys.getLionessKeys(Crypto.PAYLOAD_ENCRYPT_MODE))
@@ -217,7 +219,7 @@
         msg = Packet.Message(header1, header2, payload).pack()
 
         return RelayedPacket(address, msg)
-        
+
 class RelayedPacket:
     """A packet that is to be relayed to another server; returned by
        returned by PacketHandler.processMessage."""
@@ -373,5 +375,5 @@
         else:
             assert self.isPlaintext()
             tp = 'BIN'
-            
+
         return Packet.TextEncodedMessage(self.contents, tp, tag)

Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -d -r1.23 -r1.24
--- ServerConfig.py	5 May 2003 00:38:46 -0000	1.23
+++ ServerConfig.py	17 May 2003 00:08:45 -0000	1.24
@@ -55,10 +55,10 @@
             LOG.warn("Identity key encryption not yet implemented")
         if server['EncryptPrivateKey']:
             LOG.warn("Encrypted private keys not yet implemented")
-        if server['PublicKeyLifetime'][2] < 24*60*60:
+        if server['PublicKeyLifetime'].getSeconds() < 24*60*60:
             raise ConfigError("PublicKeyLifetime must be at least 1 day.")
-        if server['PublicKeySloppiness'][2] > 20*60:
-            raise ConfigError("PublicKeySloppiness must be <= 20 minutes.")
+        if server['PublicKeyOverlap'].getSeconds() > 6*60*60:
+            raise ConfigError("PublicKeyOverlap must be <= 6 hours")
 
         if _haveEntry(self, 'Server', 'NoDaemon'):
             LOG.warn("The NoDaemon option is obsolete.  Use Daemon instead.")
@@ -66,7 +66,7 @@
         if _haveEntry(self, 'Server', 'Mode'):
             LOG.warn("Mode specification is not yet supported.")
 
-        mixInterval = server['MixInterval'][2]
+        mixInterval = server['MixInterval'].getSeconds()
         if mixInterval < 30*60:
             LOG.warn("Dangerously low MixInterval")
         if server['MixAlgorithm'] == 'TimedMixPool':
@@ -94,11 +94,9 @@
             if e[0] in ('Allow', 'Deny')]:
             LOG.warn("Allow/deny are not yet supported")
 
-        _validateRetrySchedule(mixInterval, self._sectionEntries,
-                               "Outgoing/MMTP")
+        self.validateRetrySchedule("Outgoing/MMTP")
 
-        self.moduleManager.validate(self._sections, self._sectionEntries,
-                                    lines, contents)
+        self.moduleManager.validate(self, lines, contents)
 
     def __loadModules(self, section, sectionEntries):
         """Callback from the [Server] section of a config file.  Parses
@@ -124,20 +122,21 @@
         server = self['Server']
         if server['LogLevel'] in ('TRACE', 'DEBUG'):
             reasons.append("Log is too verbose")
-        if server['LogStats'] and server['StatsInterval'][2] < 24*60*60:
+        if server['LogStats'] and server['StatsInterval'].getSeconds() \
+               < 24*60*60:
             reasons.append("StatsInterval is too short")
         if not server["EncryptIdentityKey"]:
             reasons.append("Identity key is not encrypted")
-        # ????004 Pkey lifetime, sloppiness? 
+        # ????004 Pkey lifetime, sloppiness?
         if server["MixAlgorithm"] not in _SECURE_MIX_RULES:
             reasons.append("Mix algorithm is not secure")
         else:
             if server["MixPoolMinSize"] < 5:
                 reasons.append("MixPoolMinSize is too small")
             #MixPoolRate?
-        if server["MixInterval"][2] < 30*60:
+        if server["MixInterval"].getSeconds() < 30*60:
             reasons.append("Mix interval under 30 minutes")
-        
+
         # ????004 DIRSERVERS?
 
         # ????004 Incoming/MMTP
@@ -146,40 +145,52 @@
 
         # ????004 Modules?
 
-def _validateRetrySchedule(mixInterval, entries, sectionname,
-                           entryname='Retry'):
-    """DOCDOC"""
-    entry = [e for e in entries.get(sectionname,[]) if e[0] == entryname]
-    if not entry:
-        return
-    assert len(entry) == 1
-    sched = entry[0][1]
-    total = reduce(operator.add, sched, 0)
+        return reasons
+
+    def validateRetrySchedule(self, sectionName, entryName='Retry'):
+        """Check whether the retry schedule in self[sectionName][entryName]
+           is reasonable.  Warn or raise ConfigError if it isn't.  Ignore
+           the entry if it isn't there.
+        """
+        entry = self[sectionName].get(entryName,None)
+        if not entry:
+            return
+        mixInterval = self['Server']['MixInterval'].getSeconds()
+        _validateRetrySchedule(mixInterval, entry, sectionName)
+
+def _validateRetrySchedule(mixInterval, schedule, sectionName):
+    """Backend for ServerConfig.validateRetrySchedule -- separated for testing.
+
+       mixInterval -- our batching interval.
+       schedule -- a retry schedule as returned by _parseIntervalList.
+       sectionName -- the name of the retrying subsystem: used for messages.
+    """
+    total = reduce(operator.add, schedule, 0)
 
     # Warn if we try for less than a day.
     if total < 24*60*60:
-        LOG.warn("Dangerously low retry timeout for %s (<1 day)", sectionname)
-        
+        LOG.warn("Dangerously low retry timeout for %s (<1 day)", sectionName)
+
     # Warn if we try for more than two weeks.
     if total > 2*7*24*60*60:
-        LOG.warn("Very high retry timeout for %s (>14 days)", sectionname)
+        LOG.warn("Very high retry timeout for %s (>14 days)", sectionName)
 
     # Warn if any of our intervals are less than the mix interval...
-    if min(sched) < mixInterval-2:
-        LOG.warn("Rounding retry intervals for %s to the nearest mix interval",
-                 sectionname)
+    if min(schedule) < mixInterval-2:
+        LOG.warn("Rounding retry intervals for %s to the nearest mix",
+                 sectionName)
 
     # ... or less than 5 minutes.
-    elif min(sched) < 5*60:
-        LOG.warn("Very fast retry intervals for %s (< 5 minutes)", sectionname)
+    elif min(schedule) < 5*60:
+        LOG.warn("Very fast retry intervals for %s (< 5 minutes)", sectionName)
 
     # Warn if we make fewer than 5 attempts.
-    if len(sched) < 5:
-        LOG.warn("Dangerously low number of retries for %s (<5)", sectionname)
+    if len(schedule) < 5:
+        LOG.warn("Dangerously low number of retries for %s (<5)", sectionName)
 
     # Warn if we make more than 50 attempts.
-    if len(sched) > 50:
-        LOG.warn("Very high number of retries for %s (>50)", sectionname)
+    if len(schedule) > 50:
+        LOG.warn("Very high number of retries for %s (>50)", sectionName)
 
 #======================================================================
 
@@ -240,8 +251,8 @@
                      'IdentityKeyBits': ('ALLOW', C._parseInt, "2048"),
                      'PublicKeyLifetime' : ('ALLOW', C._parseInterval,
                                             "30 days"),
-                     'PublicKeySloppiness': ('ALLOW', C._parseInterval,
-                                             "5 minutes"),
+                     'PublicKeyOverlap': ('ALLOW', C._parseInterval,
+                                          "5 minutes"),
                      'EncryptPrivateKey' : ('ALLOW', C._parseBoolean, "no"),
                      'Mode' : ('REQUIRE', C._parseServerMode, "local"),
                      'Nickname': ('ALLOW', C._parseNickname, None),

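The checks in `_validateRetrySchedule` above can be restated as a pure function, which makes the thresholds easy to read and to test. This is a minimal sketch, not the committed code. `retry_schedule_warnings` is a hypothetical name, it returns the warnings as strings instead of calling `LOG.warn`, and it assumes the schedule is already a list of intervals in seconds, as the original's second-based comparisons imply.

```python
DAY = 24 * 60 * 60


def retry_schedule_warnings(mix_interval, schedule, section_name):
    """Return the warnings _validateRetrySchedule would log (sketch only).

    mix_interval -- the batching interval, in seconds.
    schedule -- retry intervals, in seconds (assumed already parsed).
    section_name -- used only to label the messages.
    """
    if not schedule:
        # validateRetrySchedule ignores a missing or empty entry.
        return []
    warnings = []
    total = sum(schedule)
    # Total retry time should fall between one day and two weeks.
    if total < DAY:
        warnings.append("Dangerously low retry timeout for %s (<1 day)"
                        % section_name)
    if total > 14 * DAY:
        warnings.append("Very high retry timeout for %s (>14 days)"
                        % section_name)
    # No interval should be shorter than the mix interval (2 s of slack)...
    shortest = min(schedule)
    if shortest < mix_interval - 2:
        warnings.append("Rounding retry intervals for %s to the nearest mix"
                        % section_name)
    # ...or, failing that, shorter than five minutes.
    elif shortest < 5 * 60:
        warnings.append("Very fast retry intervals for %s (< 5 minutes)"
                        % section_name)
    # Expect between 5 and 50 attempts.
    if len(schedule) < 5:
        warnings.append("Dangerously low number of retries for %s (<5)"
                        % section_name)
    if len(schedule) > 50:
        warnings.append("Very high number of retries for %s (>50)"
                        % section_name)
    return warnings


# Hourly retries for a week pass every check except the cap on attempts.
print(retry_schedule_warnings(30 * 60, [3600] * 168, "Outgoing/MMTP"))
# ['Very high number of retries for Outgoing/MMTP (>50)']
```

Keeping the checks side-effect free is the same motivation the commit gives for splitting `_validateRetrySchedule` out of `ServerConfig.validateRetrySchedule` ("separated for testing").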
Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -d -r1.20 -r1.21
--- ServerKeys.py	5 May 2003 00:38:46 -0000	1.20
+++ ServerKeys.py	17 May 2003 00:08:45 -0000	1.21
@@ -20,12 +20,14 @@
 import mixminion.Crypto
 import mixminion.Packet
 import mixminion.server.HashLog
-import mixminion.server.PacketHandler
 import mixminion.server.MMTPServer
+import mixminion.server.ServerMain
 
 from mixminion.ServerInfo import ServerInfo, PACKET_KEY_BYTES, MMTP_KEY_BYTES,\
      signServerInfo
-from mixminion.Common import LOG, MixError, MixFatalError, createPrivateDir, \
+
+from mixminion.Common import AtomicFile, LOG, MixError, MixFatalError, \
+     createPrivateDir, \
      checkPrivateFile, formatBase64, formatDate, formatTime, previousMidnight,\
      secureDelete
 
@@ -34,16 +36,17 @@
     """A ServerKeyring remembers current and future keys, descriptors, and
        hash logs for a mixminion server.
 
+       DOCDOC
+
        FFFF We need a way to generate keys as needed, not just a month's
        FFFF worth of keys up front.
        """
     ## Fields:
     # homeDir: server home directory
     # keyDir: server key directory
-    # keySloppiness: fudge-factor: how forgiving are we about key liveness?
+    # keyOverlap: How long after a new key begins do we accept the old one?
     # keyIntervals: list of (start, end, keyset Name)
-    # liveKey: list of (start, end, keyset name for current key.)
-    # nextRotation: time_t when this key expires.
+    # nextUpdate: time_t of the next key rotation event, or None if unknown.
     # keyRange: tuple of (firstKey, lastKey) to represent which key names
     #      have keys on disk.
 
@@ -78,7 +81,8 @@
         self.homeDir = config['Server']['Homedir']
         self.keyDir = os.path.join(self.homeDir, 'keys')
         self.hashDir = os.path.join(self.homeDir, 'work', 'hashlogs')
-        self.keySloppiness = config['Server']['PublicKeySloppiness'][2]
+        self.keyOverlap = config['Server']['PublicKeyOverlap'].getSeconds()
+        self.nextUpdate = None
         self.checkKeys()
 
     def checkKeys(self):
@@ -144,10 +148,6 @@
                 LOG.warn("Gap in key schedule: no key from %s to %s",
                               formatDate(end), formatDate(start))
 
-        self.nextKeyRotation = 0 # Make sure that now > nextKeyRotation before
-                                 # we call _getLiveKey()
-        self._getLiveKey()       # Set up liveKey, nextKeyRotation.
-
     def getIdentityKey(self):
         """Return this server's identity key.  Generate one if it doesn't
            exist."""
@@ -216,7 +216,7 @@
 
             keyname = "%04d" % keynum
 
-            nextStart = startAt + self.config['Server']['PublicKeyLifetime'][2]
+            nextStart = startAt + self.config['Server']['PublicKeyLifetime'].getSeconds()
 
             LOG.info("Generating key %s to run from %s through %s (GMT)",
                      keyname, formatDate(startAt),
@@ -241,7 +241,7 @@
         else:
             expiryStr = ""
 
-        cutoff = now - self.keySloppiness
+        cutoff = now - self.keyOverlap
 
         for va, vu, name in self.keyIntervals:
             if vu >= cutoff:
@@ -259,47 +259,50 @@
 
         self.checkKeys()
 
-    def _getLiveKey(self, when=None):
-        """Find the first key that is now valid.  Return (Valid-after,
+    def _getLiveKeys(self, now=None):
+        """Find all keys that are now valid.  Return list of (Valid-after,
            valid-until, name)."""
         if not self.keyIntervals:
-            self.liveKey = None
-            self.nextKeyRotation = 0
-            return None
-
-        w = when
-        if when is None:
-            when = time.time()
-            if when < self.nextKeyRotation:
-                return self.liveKey
+            return []
+        if now is None:
+            now = time.time()
+        if self.nextUpdate and now > self.nextUpdate:
+            self.nextUpdate = None
 
-        idx = bisect.bisect(self.keyIntervals, (when, None, None))-1
-        k = self.keyIntervals[idx]
-        if w is None:
-            self.liveKey = k
-            self.nextKeyRotation = k[1]
+        cutoff = now-self.keyOverlap
+        # A key is live if
+        #     * it became valid before now, and
+        #     * it did not become invalid until keyOverlap seconds ago
 
-        return k
+        return [ k for k in self.keyIntervals
+                 if k[0] < now and k[1] > cutoff ]
 
-    def getNextKeyRotation(self):
-        """Return the expiration time of the current key"""
-        return self.nextKeyRotation
+    def getServerKeysets(self):
+        """Return a list of ServerKeyset objects for all currently live keys.
 
-    def getServerKeyset(self):
-        """Return a ServerKeyset object for the currently live key."""
+           A key stays live until keyOverlap seconds after it expires."""
         # FFFF Support passwords on keys
-        _, _, name = self._getLiveKey()
-        keyset = ServerKeyset(self.keyDir, name, self.hashDir)
-        keyset.load()
-        return keyset
+        keysets = [ ]
+        for va, vu, name in self._getLiveKeys():
+            ks = ServerKeyset(self.keyDir, name, self.hashDir)
+            ks.validAfter = va
+            ks.validUntil = vu
+            ks.load()
+            keysets.append(ks)
+
+        #XXXX004 there should only be 2.
+        return keysets
 
     def getDHFile(self):
         """Return the filename for the Diffie-Hellman parameters for the
            server.  Creates the file if it doesn't yet exist."""
+        #XXXX Make me private????004
         dhdir = os.path.join(self.homeDir, 'work', 'tls')
         createPrivateDir(dhdir)
         dhfile = os.path.join(dhdir, 'dhparam')
         if not os.path.exists(dhfile):
+            # ???? This is only using 512-bit Diffie-Hellman!  That isn't
+            # ???? remotely enough.
             LOG.info("Generating Diffie-Hellman parameters for TLS...")
             mixminion._minionlib.generate_dh_parameters(dhfile, verbose=0)
             LOG.info("...done")
@@ -309,25 +312,61 @@
 
         return dhfile
 
-    def getTLSContext(self):
+    def _getTLSContext(self, keys=None):
         """Create and return a TLS context from the currently live key."""
-        keys = self.getServerKeyset()
+        if keys is None:
+            keys = self.getServerKeysets()[-1]
         return mixminion._minionlib.TLSContext_new(keys.getCertFileName(),
                                                    keys.getMMTPKey(),
                                                    self.getDHFile())
 
-    def getPacketHandler(self):
-        """Create and return a PacketHandler from the currently live key."""
-        keys = self.getServerKeyset()
-        packetKey = keys.getPacketKey()
-        hashlog = mixminion.server.HashLog.HashLog(keys.getHashLogFileName(),
-                                                 keys.getMMTPKeyID())
-        return mixminion.server.PacketHandler.PacketHandler(packetKey,
-                                                     hashlog)
+    def updateKeys(self, packetHandler, mmtpServer, when=None):
+        """Install live keys in packetHandler/mmtpServer; return next rotation."""
+        self.removeDeadKeys()
+        keys = self.getServerKeysets()
+        LOG.info("Updating keys: %s currently valid", len(keys))
+        if mmtpServer is not None:
+            context = self._getTLSContext(keys[-1])
+            mmtpServer.setContext(context)
+        if packetHandler is not None:
+            packetKeys = []
+            hashLogs = []
+
+            for k in keys:
+                packetKeys.append(k.getPacketKey())
+                hashLogs.append(mixminion.server.HashLog.HashLog(
+                    k.getHashLogFileName(), k.getPacketKeyID()))
+            packetHandler.setKeys(packetKeys, hashLogs)
+
+        return self.getNextKeyRotation(keys)
+
+    def getNextKeyRotation(self, keys=None):
+        if self.nextUpdate is None:
+            if keys is None:
+                keys = self.getServerKeysets()
+            addKeyEvents = []
+            rmKeyEvents = []
+            for k in keys:
+                va, vu = k.getLiveness()
+                rmKeyEvents.append(vu+self.keyOverlap)
+                addKeyEvents.append(vu)
+            add = min(addKeyEvents); rm = min(rmKeyEvents)
+
+            if add < rm:
+                LOG.info("Next event: new key becomes valid at %s",
+                         formatTime(add,1))
+                self.nextUpdate = add
+            else:
+                LOG.info("Next event: old key is removed at %s",
+                         formatTime(rm,1))
+                self.nextUpdate = rm
+
+
+        return self.nextUpdate
 
     def getAddress(self):
         """Return our current ip/port/keyid tuple"""
-        keys = self.getServerKeyset()
+        keys = self.getServerKeysets()[0]
         desc = keys.getServerDescriptor()
         return (desc['Incoming/MMTP']['IP'],
                 desc['Incoming/MMTP']['Port'],
@@ -354,16 +393,24 @@
     # descFile: filename of this keyset's server descriptor.
     #
     # packetKey, mmtpKey: This server's actual short-term keys.
+    # serverinfo, validAfter, validUntil: cached descriptor and liveness.
     def __init__(self, keyroot, keyname, hashroot):
         """Load a set of keys named "keyname" on a server where all keys
            are stored under the directory "keyroot" and hashlogs are stored
            under "hashroot". """
+        self.keyroot = keyroot
+        self.keyname = keyname
+        self.hashroot = hashroot
+
         keydir  = os.path.join(keyroot, "key_"+keyname)
         self.hashlogFile = os.path.join(hashroot, "hash_"+keyname)
         self.packetKeyFile = os.path.join(keydir, "mix.key")
         self.mmtpKeyFile = os.path.join(keydir, "mmtp.key")
         self.certFile = os.path.join(keydir, "mmtp.cert")
         self.descFile = os.path.join(keydir, "ServerDesc")
+        self.serverinfo = None
+        self.validAfter = None
+        self.validUntil = None
         if not os.path.exists(keydir):
             createPrivateDir(keydir)
 
@@ -387,13 +434,33 @@
     def getDescriptorFileName(self): return self.descFile
     def getPacketKey(self): return self.packetKey
     def getMMTPKey(self): return self.mmtpKey
-    def getMMTPKeyID(self):
-        "Return the sha1 hash of the asn1 encoding of the MMTP public key"
-        return mixminion.Crypto.sha1(self.mmtpKey.encode_key(1))
+    def getPacketKeyID(self):
+        "Return the sha1 hash of the asn1 encoding of the packet public key"
+        return mixminion.Crypto.sha1(self.packetKey.encode_key(1))
     def getServerDescriptor(self):
-        return ServerInfo(fname=self.descFile)
+        if self.serverinfo is None:
+            self.serverinfo = ServerInfo(fname=self.descFile)
+        return self.serverinfo
+    def getLiveness(self):
+        if self.validAfter is None or self.validUntil is None:
+            info = self.getServerDescriptor()
+            self.validAfter = info['Server']['Valid-After']
+            self.validUntil = info['Server']['Valid-Until']
+        return self.validAfter, self.validUntil
+    def regenerateServerDescriptor(self, config, identityKey, validAt=None):
+        """Rebuild this keyset's descriptor from its existing keys."""
+        self.load()
+        if validAt is None:
+            validAt = self.getLiveness()[0]
+        generateServerDescriptorAndKeys(config, identityKey,
+                         self.keyroot, self.keyname, self.hashroot,
+                         validAt=validAt, useServerKeys=1)
+        self.serverinfo = self.validAfter = self.validUntil = None
 
 class _WarnWrapper:
+    """Helper for 'checkDescriptorConsistency' to keep its implementation
+       short.  Counts the number of times it's invoked, and delegates to
+       LOG.warn if silence is false."""
     def __init__(self, silence):
         self.silence = silence
         self.called = 0
@@ -403,12 +470,12 @@
             LOG.warn(*args)
 
 def checkDescriptorConsistency(info, config, log=1):
-    """DOCDOC
+    """Given a ServerInfo and a ServerConfig, compare them for consistency.
 
-    Return true iff info may have come from 'config'.  If log is true,
-    warn as well.  Does not check keys.
+       Return true iff info may have come from 'config'.  If 'log' is
+       true, warn as well.  Does not check keys.
     """
-    
+
     if log:
         warn = _WarnWrapper(0)
     else:
@@ -419,7 +486,7 @@
     if config_s['Nickname'] and (info_s['Nickname'] != config_s['Nickname']):
         warn("Mismatched nicknames: %s in configuration; %s published.",
              config_s['Nickname'], info_s['Nickname'])
-    
+
     idBits = info_s['Identity'].get_modulus_bytes()*8
     confIDBits = config_s['IdentityKeyBits']
     if idBits != confIDBits:
@@ -438,7 +505,7 @@
         warn("Mismatched comments field.")
 
     if (previousMidnight(info_s['Valid-Until']) !=
-        previousMidnight(config_s['PublicKeyLifetime'][2] +
+        previousMidnight(config_s['PublicKeyLifetime'].getSeconds() +
                          info_s['Valid-After'])):
         warn("Published lifetime does not match PublicKeyLifetime")
 
@@ -478,7 +545,7 @@
             warn("%s enabled, but not published.", section)
 
     return not warn.called
-        
+
 #----------------------------------------------------------------------
 # Functionality to generate keys and server descriptors
 
@@ -489,7 +556,7 @@
 
 def generateServerDescriptorAndKeys(config, identityKey, keydir, keyname,
                                     hashdir, validAt=None, now=None,
-                                    useServerKeys=None):
+                                    useServerKeys=0):
     #XXXX reorder args
     """Generate and sign a new server descriptor, and generate all the keys to
        go with it.
@@ -501,10 +568,15 @@
           hashdir -- The root directory for storing hash logs.
           validAt -- The starting time (in seconds) for this key's lifetime.
 
-          DOCDOC useServerKeys, 
+          useServerKeys -- If true, reuse the keys already stored in keydir.
           """
 
-    if useServerKeys is None:
+    if useServerKeys:
+        serverKeys = ServerKeyset(keydir, keyname, hashdir)
+        serverKeys.load()
+        packetKey = serverKeys.packetKey
+        mmtpKey = serverKeys.mmtpKey
+    else:
         # First, we generate both of our short-term keys...
         packetKey = mixminion.Crypto.pk_generate(PACKET_KEY_BYTES*8)
         mmtpKey = mixminion.Crypto.pk_generate(MMTP_KEY_BYTES*8)
@@ -515,12 +587,6 @@
         serverKeys.packetKey = packetKey
         serverKeys.mmtpKey = mmtpKey
         serverKeys.save()
-    else:
-        #XXXX drop this once we've tested and added more validation logic.
-        LOG.warn("EXPERIMENTAL FEATURE: Regenerating server descriptor from old keys")
-        serverKeys = useServerKeys
-        packetKey = serverKeys.getPacketKey()
-        mmtpKey = serverKeys.getMMTPKey()
 
     # FFFF unused
     # allowIncoming = config['Incoming/MMTP'].get('Enabled', 0)
@@ -547,15 +613,14 @@
     # Calculate descriptor and X509 certificate lifetimes.
     # (Round validAt to previous midnight.)
     validAt = mixminion.Common.previousMidnight(validAt+30)
-    validUntil = validAt + config['Server']['PublicKeyLifetime'][2]
+    validUntil = validAt + config['Server']['PublicKeyLifetime'].getSeconds()
     certStarts = validAt - CERTIFICATE_EXPIRY_SLOPPINESS
-    certEnds = validUntil + CERTIFICATE_EXPIRY_SLOPPINESS + \
-               config['Server']['PublicKeySloppiness'][2]
+    certEnds = validUntil + CERTIFICATE_EXPIRY_SLOPPINESS
 
-    if useServerKeys is None:
-        # Create the X509 certificates
-        generateCertChain(serverKeys.getCertFileName(),
-                          mmtpKey, identityKey, nickname, certStarts, certEnds)
+    # Always (re)create the X509 certificates, since one of the parameters
+    # may have changed.
+    generateCertChain(serverKeys.getCertFileName(),
+                      mmtpKey, identityKey, nickname, certStarts, certEnds)
 
     mmtpProtocolsIn = mixminion.server.MMTPServer.MMTPServerConnection \
                       .PROTOCOL_VERSIONS[:]
@@ -581,7 +646,7 @@
         "ValidUntil": formatDate(validUntil),
         "PacketKey":
            formatBase64(mixminion.Crypto.pk_encode_public_key(packetKey)),
-        "KeyID": identityKeyID,        
+        "KeyID": identityKeyID,
         "MMTPProtocolsIn" : mmtpProtocolsIn,
         "MMTPProtocolsOut" : mmtpProtocolsOut,
         "PacketFormat" : "%s.%s"%(mixminion.Packet.MAJOR_NO,
@@ -664,11 +729,13 @@
     info = signServerInfo(info, identityKey)
 
     # Write the descriptor
-    f = open(serverKeys.getDescriptorFileName(), 'w')
+    f = AtomicFile(serverKeys.getDescriptorFileName(), 'w')
     try:
         f.write(info)
-    finally:
         f.close()
+    except:
+        f.discard()
+        raise
 
     # This is for debugging: we try to parse and validate the descriptor
     #   we just made.
@@ -755,7 +822,7 @@
 
 def generateCertChain(filename, mmtpKey, identityKey, nickname,
                       certStarts, certEnds):
-    "DOCDOC"
+    """Write a two-certificate chain for mmtpKey and identityKey to 'filename'."""
     fname = filename+"_tmp"
     mixminion.Crypto.generate_cert(fname,
                                    mmtpKey, identityKey,

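The rotation logic in `_getLiveKeys` and `getNextKeyRotation` above treats a key as live from its Valid-After time until `PublicKeyOverlap` seconds after its Valid-Until time. The sketch below restates that rule as standalone functions over plain `(valid_after, valid_until, name)` tuples. `live_keys` and `next_key_event` are hypothetical helpers, not part of mixminion, and the sketch assumes keys are contiguous, so one key's expiry is when its successor takes over. It also skips expiry times that have already passed. As far as we can tell, the committed loop appends every live key's Valid-Until to `addKeyEvents`, so during the overlap window it would pick the old key's (already past) expiry as the next event.

```python
DAY = 24 * 60 * 60


def live_keys(intervals, now, overlap):
    """Return the keys that are live at 'now'.

    intervals -- list of (valid_after, valid_until, name), in seconds.
    A key is live if it became valid before 'now' and expired less than
    'overlap' seconds ago (the same test _getLiveKeys applies).
    """
    cutoff = now - overlap
    return [k for k in intervals if k[0] < now and k[1] > cutoff]


def next_key_event(intervals, now, overlap):
    """Return (when, kind) for the next key event after 'now', or None.

    kind is "new key" when a live key expires (so, assuming contiguous
    keys, its successor takes over), or "remove old key" when an expired
    key leaves its overlap window.
    """
    live = live_keys(intervals, now, overlap)
    if not live:
        return None
    # Each live key is dropped 'overlap' seconds after it expires; by the
    # liveness test above, these times are always later than 'now'.
    remove_at = min(vu + overlap for _, vu, _ in live)
    # Ignore expiry times already in the past (keys in their overlap window).
    expiries = [vu for _, vu, _ in live if vu > now]
    if expiries and min(expiries) < remove_at:
        return (min(expiries), "new key")
    return (remove_at, "remove old key")


keys = [(0, DAY, "0001"), (DAY, 2 * DAY, "0002")]
print(next_key_event(keys, DAY + 100, 300))   # (86700, 'remove old key')
```

In the usage line, 100 seconds after the first key expires both keys are still live, so the next event is dropping key `0001` at the end of its five-minute overlap. Once that happens, the next event becomes key `0002` expiring at `2 * DAY`.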
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.51
retrieving revision 1.52
diff -u -d -r1.51 -r1.52
--- ServerMain.py	5 May 2003 00:38:46 -0000	1.51
+++ ServerMain.py	17 May 2003 00:08:45 -0000	1.52
@@ -39,7 +39,7 @@
      installSIGCHLDHandler, Lockfile, secureDelete, waitForChildren
 
 class IncomingQueue(mixminion.server.ServerQueue.Queue):
-    """A DeliveryQueue to accept packets from incoming MMTP connections,
+    """A Queue to accept packets from incoming MMTP connections,
        and hold them until they can be processed.  As packets arrive, and
        are stored to disk, we notify a message queue so that another thread
        can read them."""
@@ -126,7 +126,7 @@
            queue location."""
 
         server = config['Server']
-        interval = server['MixInterval'][2]
+        interval = server['MixInterval'].getSeconds()
         if server['MixAlgorithm'] == 'TimedMixPool':
             self.queue = mixminion.server.ServerQueue.TimedMixPool(
                 location=queueDir, interval=interval)
@@ -177,7 +177,7 @@
         handles = self.queue.getBatch()
         LOG.debug("%s messages in the mix pool; delivering %s.",
                   self.queue.count(), len(handles))
-        
+
         for h in handles:
             packet = self.queue.getObject(h)
             if type(packet) == type(()):
@@ -209,7 +209,8 @@
     ## Fields:
     # server -- an instance of _MMTPServer
     # addr -- (publishedIP, publishedPort, publishedKeyID)
-    # incomingQueue -- DOCDOC
+    # incomingQueue -- pointer to IncomingQueue object to be used for
+    #        self->self communication.
     def __init__(self, location, (ip,port,keyid)):
         """Create a new OutgoingQueue that stores its messages in a given
            location."""
@@ -224,8 +225,9 @@
         self.setRetrySchedule(retry)
 
     def connectQueues(self, server, incoming):
-        """Set the MMTPServer that this OutgoingQueue informs of its
-           deliverable messages."""
+        """Set the MMTPServer and IncomingQueue that this
+           OutgoingQueue informs of its deliverable messages."""
+
         self.server = server
         self.incomingQueue = incoming
 
@@ -233,7 +235,7 @@
         "Implementation of abstract method from DeliveryQueue."
         # Map from addr -> [ (handle, msg) ... ]
         msgs = {}
-        for handle, packet, n_retries in msgList:
+        for handle, packet in msgList:
             if not isinstance(packet,
                               mixminion.server.PacketHandler.RelayedPacket):
                 LOG.warn("Skipping packet in obsolete format")
@@ -290,7 +292,7 @@
             EventStats.log.failedRelay() # XXXX replace with addr
         else:
             EventStats.log.unretriableRelay() # XXXX replace with addr
-        
+
 #----------------------------------------------------------------------
 class CleaningThread(threading.Thread):
     """Thread that handles file deletion.  Some methods of secure deletion
@@ -402,23 +404,38 @@
 #----------------------------------------------------------------------
 
 class _Scheduler:
-    """Mixin class for server.  Implements DOCDOC"""
-    # DOCDOC
+    """Mixin class for server.  Implements a priority queue of ongoing,
+       scheduled tasks with a loose (few seconds) granularity.
+    """
+    # Fields:
+    #   scheduledEvents: list of (time, identifying-string, callable)
+    #       Sorted by time.  We could use a heap here instead, but
+    #       that doesn't turn into a net benefit until we have a hundred
+    #       events or so.
     def __init__(self):
+        """Create a new _Scheduler"""
         self.scheduledEvents = []
 
     def firstEventTime(self):
+        """Return the time at which the earliest-scheduled event is
+           supposed to occur.  Returns -1 if no events.
+        """
         if self.scheduledEvents:
             return self.scheduledEvents[0][0]
         else:
             return -1
 
     def scheduleOnce(self, when, name, cb):
+        """Schedule a callback function, 'cb', to be invoked at time 'when.'
+        """
         assert type(name) is StringType
         assert type(when) in (IntType, LongType, FloatType)
         insort(self.scheduledEvents, (when, name, cb))
 
     def scheduleRecurring(self, first, interval, name, cb):
+        """Schedule a callback function 'cb' to be invoked at time 'first,'
+           and every 'interval' seconds thereafter.
+        """
         assert type(name) is StringType
         assert type(first) in (IntType, LongType, FloatType)
         assert type(interval) in (IntType, LongType, FloatType)
@@ -427,20 +444,42 @@
                                       _RecurringEvent(name, cb, self, next)))
 
     def scheduleRecurringComplex(self, first, name, cb, nextFn):
+        """Schedule a callback function 'cb' to be invoked at time 'first,'
+           and thereafter at times returned by 'nextFn'.
+
+           (nextFn is called immediately after each invocation of the
+           callback, and should return the time of the next invocation.)
+        """
         assert type(name) is StringType
         assert type(first) in (IntType, LongType, FloatType)
         insort(self.scheduledEvents, (first, name,
                                       _RecurringEvent(name, cb, self, nextFn)))
 
     def processEvents(self, now=None):
+        """Run all events that are scheduled to occur before 'now'.
+
+           Note: if an event reschedules itself for a time _before_ now,
+           it will only be run once per invocation of processEvents.
+
+           The right way to run this class is something like:
+               while 1:
+                   interval = scheduler.firstEventTime() - time.time()
+                   if interval > 0:
+                       time.sleep(interval)
+                       # or maybe, select.select(...,...,...,interval)
+                   scheduler.processEvents()
+        """
         if now is None: now = time.time()
         se = self.scheduledEvents
+        cbs = []
         while se and se[0][0] <= now:
-            cb = se[0][2]
+            cbs.append(se[0][2])
             del se[0]
+        for cb in cbs:
             cb()
 
 class _RecurringEvent:
+    """Helper for _Scheduler. Calls a callback, then reschedules it."""
     def __init__(self, name, cb, scheduler, nextFn):
         self.name = name
         self.cb = cb
@@ -499,20 +538,20 @@
         #XXXX004 Catch ConfigError for bad serverinfo.
         #XXXX004 Check whether config matches serverinfo
         self.keyring = mixminion.server.ServerKeys.ServerKeyring(config)
-        if self.keyring._getLiveKey() is None:
+        if not self.keyring._getLiveKeys():
             LOG.info("Generating a month's worth of keys.")
             LOG.info("(Don't count on this feature in future versions.)")
             # We might not be able to do this, if we password-encrypt keys
-            keylife = config['Server']['PublicKeyLifetime'][2]
+            keylife = config['Server']['PublicKeyLifetime'].getSeconds()
             nKeys = ceilDiv(30*24*60*60, keylife)
             self.keyring.createKeys(nKeys)
 
         LOG.debug("Initializing packet handler")
-        self.packetHandler = self.keyring.getPacketHandler()
-        LOG.debug("Initializing TLS context")
-        tlsContext = self.keyring.getTLSContext()
+        self.packetHandler = mixminion.server.PacketHandler.PacketHandler()
         LOG.debug("Initializing MMTP server")
-        self.mmtpServer = _MMTPServer(config, tlsContext)
+        self.mmtpServer = _MMTPServer(config, None)
+        LOG.debug("Initializing keys")
+        self.keyring.updateKeys(self.packetHandler, self.mmtpServer)
 
         publishedIP, publishedPort, publishedKeyID = self.keyring.getAddress()
 
@@ -562,6 +601,10 @@
         self.processingThread.start()
         self.moduleManager.startThreading()
 
+    def updateKeys(self):
+        """Reload the live keys into the packet handler and MMTP server."""
+        self.keyring.updateKeys(self.packetHandler, self.mmtpServer)
+
     def run(self):
         """Run the server; don't return unless we hit an exception."""
         global GOT_HUP
@@ -575,19 +618,23 @@
         self.scheduleRecurring(now+600, 600, "SHRED", self.cleanQueues)
         self.scheduleRecurring(now+180, 180, "WAIT",
                                lambda: waitForChildren(blocking=0))
-        if EventStats.log.getNextRotation():#XXXX!!!!
+        if EventStats.log.getNextRotation():
             self.scheduleRecurring(now+300, 300, "ES_SAVE",
-                                   EventStats.log.save)
+                                   lambda: EventStats.log.save())
             self.scheduleRecurringComplex(EventStats.log.getNextRotation(),
-                                          "ES_ROTATE",
-                                          EventStats.log.rotate,
-                                          EventStats.log.getNextRotation)
+                                          "ES_ROTATE",
+                                          lambda: EventStats.log.rotate(),
+                                          lambda: EventStats.log.getNextRotation())
 
         self.scheduleRecurringComplex(self.mmtpServer.getNextTimeoutTime(now),
                                       "TIMEOUT",
                                       self.mmtpServer.tryTimeout,
                                       self.mmtpServer.getNextTimeoutTime)
 
+        self.scheduleRecurringComplex(self.keyring.getNextKeyRotation(),
+                                      "KEY_ROTATE",
+                                      self.updateKeys,
+                                      self.keyring.getNextKeyRotation)
 
         nextMix = self.mixPool.getNextMixTime(now)
         LOG.debug("First mix at %s", formatTime(nextMix,1))
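run() registers its periodic jobs with scheduleRecurring(first, interval, name, fn) and scheduleRecurringComplex(first, name, fn, nextFn). The scheduler's own code is not part of this diff, so the following is a minimal sketch under assumed semantics: fire fn at `first`, then reschedule it at whatever nextFn() returns. The class name Scheduler and its processEvents(now) method are hypothetical. The sketch rejects a non-string event name, which catches the name and callback being passed in the wrong order, and it illustrates why a deferred callback has to be written `lambda: log.save()`: `lambda: log.save` only looks the method up and never calls it.

```python
import heapq
import itertools


class Scheduler:
    """Sketch of a recurring-event scheduler (hypothetical internals)."""

    def __init__(self):
        self._heap = []                # entries: (when, seq, name, callback)
        self._seq = itertools.count()  # tie-breaker, so callbacks are never compared

    def scheduleOnce(self, when, name, cb):
        heapq.heappush(self._heap, (when, next(self._seq), name, cb))

    def scheduleRecurringComplex(self, first, name, cb, nextFn):
        # Run cb at 'first'; after each run, reschedule at nextFn().
        if not isinstance(name, str):
            raise TypeError("event name must be a string; "
                            "are the arguments in the wrong order?")

        def wrapper():
            try:
                cb()
            finally:
                # Reschedule even if the callback raised.
                self.scheduleOnce(nextFn(), name, wrapper)

        self.scheduleOnce(first, name, wrapper)

    def scheduleRecurring(self, first, interval, name, cb):
        # Fixed-interval case: each run is 'interval' seconds after the
        # previous scheduled time.
        state = {"next": first}

        def nextFn():
            state["next"] += interval
            return state["next"]

        self.scheduleRecurringComplex(first, name, cb, nextFn)

    def processEvents(self, now):
        # Fire every event due at or before 'now'; return their names in order.
        fired = []
        while self._heap and self._heap[0][0] <= now:
            _, _, name, cb = heapq.heappop(self._heap)
            cb()
            fired.append(name)
        return fired
```

Scheduling both `lambda: log.save()` and `lambda: log.save` and firing them once records exactly one save: the second lambda evaluates to a bound method and discards it.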
@@ -616,9 +663,7 @@
                     return
                 elif GOT_HUP:
                     LOG.info("Caught sighup")
-                    LOG.info("Resetting logs")
-                    LOG.reset()
-                    EventStats.log.save()
+                    self.doReset()
                     GOT_HUP = 0
                 # Make sure that our worker threads are still running.
                 if not (self.cleaningThread.isAlive() and
@@ -626,7 +671,7 @@
                         self.moduleManager.thread.isAlive()):
                     LOG.fatal("One of our threads has halted; shutting down.")
                     return
-                
+
                 # Calculate remaining time until the next event.
                 now = time.time()
                 timeLeft = nextEventTime - now
@@ -634,6 +679,14 @@
             # An event has fired.
             self.processEvents()
 
+    def doReset(self):
+        LOG.info("Resetting logs")
+        LOG.reset()
+        EventStats.log.save()
+        LOG.info("Checking for key rotation")
+        self.keyring.checkKeys()
+        self.updateKeys()
+
     def doMix(self):
         now = time.time()
         # Before we mix, we need to log the hashes to avoid replays.
@@ -788,7 +841,7 @@
 def runServer(cmd, args):
     if cmd.endswith(" server"):
         print "Obsolete command. Use 'mixminion server-start' instead."
-    
+
     config = configFromServerArgs(cmd, args)
     try:
         # Configure the log, but delay disabling stderr until the last
@@ -834,7 +887,7 @@
         LOG.fatal_exc(info,"Exception while configuring server")
         LOG.fatal("Shutting down because of exception: %s", info[0])
         sys.exit(1)
-            
+
     LOG.info("Starting server: Mixminion %s", mixminion.__version__)
     try:
         # We keep the console log open as long as possible so we can catch
@@ -898,12 +951,6 @@
         assert cmd.endswith("reload-server") or cmd.endswith("server-reload")
         reload = 1
 
-    #XXXX004 remove this.
-    if cmd.endswith("stop-server"):
-        print "Obsolete command. Use 'mixminion server-stop' instead."
-    elif cmd.endswith("reload-server"):
-        print "Obsolete command. Use 'mixminion server-reload' instead."
-
     if usage:
         print _SIGNAL_SERVER_USAGE % { 'cmd' : cmd }
         return
@@ -911,7 +958,10 @@
     _signalServer(config, reload)
 
 def _signalServer(config, reload):
-    """DOCDOC"""
+    """Given a configuration file, sends a signal to the corresponding
+       server if it's running.  If 'reload', the signal is HUP.  Else,
+       the signal is TERM.
+    """
     homeDir = config['Server']['Homedir']
     pidFile = os.path.join(homeDir, "pid")
     if not os.path.exists(pidFile):

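Per its new docstring, _signalServer looks for a pid file in the server's Homedir and sends HUP to reload or TERM to stop. A minimal sketch of those two steps, assuming the pid file holds the server's decimal pid on its first line (the file format is not shown in this diff); readPidFile, signalForAction, and signalServer are hypothetical names.

```python
import os
import signal


def readPidFile(homeDir):
    """Return the pid stored in <homeDir>/pid, or None if there is no
    pid file.  Assumes a decimal pid on the file's first line."""
    pidFile = os.path.join(homeDir, "pid")
    if not os.path.exists(pidFile):
        return None
    with open(pidFile) as f:
        return int(f.readline().strip())


def signalForAction(reload):
    # HUP asks the server to reload; TERM asks it to shut down.
    return signal.SIGHUP if reload else signal.SIGTERM


def signalServer(homeDir, reload):
    pid = readPidFile(homeDir)
    if pid is None:
        raise RuntimeError("No server seems to be running in %s" % homeDir)
    os.kill(pid, signalForAction(reload))
```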
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -d -r1.13 -r1.14
--- ServerQueue.py	5 May 2003 00:38:46 -0000	1.13
+++ ServerQueue.py	17 May 2003 00:08:45 -0000	1.14
@@ -12,9 +12,10 @@
 import sys
 import cPickle
 import threading
+import types
 
 from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
-     createPrivateDir
+     createPrivateDir, readPickled, writePickled, formatTime
 from mixminion.Crypto import getCommonPRNG
 
 __all__ = [ 'Queue', 'DeliveryQueue', 'TimedMixPool', 'CottrellMixPool',
@@ -127,7 +128,7 @@
                 return res
         finally:
             self._lock.release()
-            
+
     def pickRandom(self, count=None):
         """Returns a list of 'count' handles to messages in this queue.
            The messages are chosen randomly, and returned in a random order.
@@ -166,7 +167,7 @@
         """Given a handle and a queue, moves the corresponding message from
            this queue to the queue provided.  Returns a new handle for
            the message in the destination queue."""
-        # Since we're switching handle, we don't want to just rename;
+        # Since we're switching handles, we don't want to just rename;
         # We really want to copy and delete the old file.
         try:
             self._lock.acquire()
@@ -219,7 +220,7 @@
            commit your changes, or abortMessage to reject them."""
         file, handle = getCommonPRNG().openNewFile(self.dir, "inp_", 1)
         return file, handle
-    
+
     def finishMessage(self, f, handle):
         """Given a file and a corresponding handle, closes the file
            commits the corresponding message."""
@@ -279,7 +280,7 @@
                 LOG.error("Directory %s contains: %s", self.dir, contents)
                 self.count(1)
                 return
-                
+
             if self.n_entries < 0:
                 return
             if s1 == 'msg' and s2 != 'msg':
@@ -289,27 +290,92 @@
         finally:
             self._lock.release()
 
+
+class _DeliveryState:
+    """Helper class: holds the state needed to schedule delivery or
+       eventual abandonment of a message in a DeliveryQueue."""
+    ## Fields:
+    # queuedTime: time at which the corresponding message was first
+    #    inserted into the queue.
+    # lastAttempt: The most recent time at which we attempted to
+    #    deliver the message. (None means 'never').
+    def __init__(self, queuedTime=None, lastAttempt=None):
+        """Create a new _DeliveryState for a message received at
+           queuedTime (default now), whose last delivery attempt was
+           at lastAttempt (default never)."""
+        if queuedTime is None:
+            queuedTime = time.time()
+        self.queuedTime = queuedTime
+        self.lastAttempt = lastAttempt
+
+    def __getstate__(self):
+        # For pickling.  All future versions of _DeliveryState will pickle
+        #   to a tuple whose first element is a version string.
+        return ("V0", self.queuedTime, self.lastAttempt)
+
+    def __setstate__(self, state):
+        # For pickling.
+        if state[0] == "V0":
+            self.queuedTime = state[1]
+            self.lastAttempt = state[2]
+        else:
+            raise MixFatalError("Unrecognized delivery state")
+
+    def getNextAttempt(self, retrySchedule, now=None):
+        """Return the next time when we should try to deliver this message
+           according to the provided retrySchedule.  If the time returned
+           is in the past, then immediate delivery is okay.  If the time
+           returned is None, this message has expired and should be forgotten.
+        """
+        if not now:
+            now = time.time()
+
+        last = self.lastAttempt
+
+        # If we've never tried to deliver the message, it's ready to
+        # go immediately.
+        if last is None:
+            return now
+
+        # Otherwise, we count from the time the message was first queued,
+        # until we find a scheduled delivery that falls after the last
+        # attempted delivery.
+        #
+        # This scheduled delivery may be in the past.  That's okay: it only
+        # means that we've missed a scheduled delivery, and we can try again
+        # immediately.
+        attempt = self.queuedTime
+        for interval in retrySchedule:
+            attempt += interval
+            if attempt > last:
+                return attempt
+
+        # Oops: there are no scheduled deliveries after the last delivery.
+        # Time to drop this message.
+        return None
+
+    def setLastAttempt(self, when):
+        """Update time of the last attempted delivery."""
+        self.lastAttempt = when
+
 class DeliveryQueue(Queue):
-    """A DeliveryQueue implements a queue that greedily sends messages
-       to outgoing streams that occasionally fail.  Messages in a
-       DeliveryQueue are no longer unstructured text, but rather
-       tuples of: (n_retries, None, message, nextAttempt), where
-       n_retries is the number of delivery attempts so far,
-       the message is an arbitrary pickled object, and nextAttempt is a time
-       before which no further delivery should be attempted.
+    """A DeliveryQueue implements a queue that greedily sends messages to
+       outgoing streams that occasionally fail.  All underlying messages
+       are pickled objects.  Additionally, we store metadata about
+       attempted deliveries in the past, so we know when to schedule the
+       next delivery.
 
-       This class is abstract. Implementors of this class should
-       subclass it to add a _deliverMessages method.  Multiple
-       invocations of this method may be active at a given time.  Upon
-       success or failure, this method should cause deliverySucceeded
-       or deliveryFailed to be called as appropriate.
+       This class is abstract. Implementors of this class should subclass
+       it to add a _deliverMessages method.  Multiple invocations of this
+       method may be active at a given time.  Upon success or failure, this
+       method should cause deliverySucceeded or deliveryFailed to be called
+       as appropriate.
 
-       Users of this class will probably only want to call the queueMessage,
-       sendReadyMessages, and nextMessageReadyAt methods.
+       Users of this class will probably only want to call the
+       queueMessage, sendReadyMessages, and nextMessageReadyAt methods.
 
-       This class caches information about the directory state; it
-       won't play nice if multiple instances are looking at the same
-       directory.
+       This class caches information about the directory state; it won't
+       play nice if multiple instances are looking at the same directory.
     """
     ###
     # Fields:
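_DeliveryState.getNextAttempt counts forward from the time a message was queued and returns the first scheduled attempt strictly after the last one; a never-attempted message is due at once, and an exhausted schedule yields None. The same rule as a standalone function (nextAttemptTime is a hypothetical name, and `now` is required here only to keep the example deterministic), for a schedule of [120, 120, 3600, 3600] and a message queued at time 0:

```python
def nextAttemptTime(queuedTime, lastAttempt, retrySchedule, now):
    """Standalone restatement of _DeliveryState.getNextAttempt."""
    if lastAttempt is None:
        return now                  # never tried: deliver immediately
    attempt = queuedTime
    for interval in retrySchedule:
        attempt += interval
        if attempt > lastAttempt:
            return attempt          # may be in the past: then retry at once
    return None                     # schedule exhausted: drop the message
```

Successive failures at 0, 120, 240, 3840, and 7440 move the next attempt to 120, 240, 3840, 7440, and finally None. If the server was down and the last attempt happened at 5000, the result is 7440; at now=9000 that time is already past, so the message is retried immediately.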
@@ -317,17 +383,36 @@
     #           currently sending.
     #    pending -- Dict from handle->time_sent, for all messages that we're
     #           currently sending.
-    #    retrySchedule: a list of intervals at which delivery of messages
+    #    retrySchedule -- a list of intervals at which delivery of messages
     #           should be reattempted, as described in "setRetrySchedule".
-    def __init__(self, location, retrySchedule=None):
+    #    deliveryState -- a dict from handle->_DeliveryState object for
+    #           all handles.
+    #    nextAttempt -- a dict from handle->time-of-next-scheduled-delivery,
+    #           for all handles.  Not meaningful for handles in 'pending'.
+    #           If the time is in the past, delivery can be tried now.
+    #           If None, the message may be removable.
+    #
+    # XXXX Refactor as many of these fields as possible into _DeliveryState.
+    #
+    # Files:
+    #    meta_* : a pickled _DeliveryState object for each message in the
+    #        queue.
+    #    rmv_meta_*: a dead metafile, waiting for removal.
+
+    def __init__(self, location, retrySchedule=None, now=None):
+        """Create a new DeliveryQueue object that stores its files in
+           <location>.  If retrySchedule is provided, it is interpreted as
+           in setRetrySchedule."""
         Queue.__init__(self, location, create=1, scrub=1)
+        self.retrySchedule = None
         self._rescan()
-        if retrySchedule is None:
-            self.retrySchedule = None
+        if retrySchedule is not None:
+            self.setRetrySchedule(retrySchedule, now)
         else:
-            self.setRetrySchedule(retrySchedule)
+            self.setRetrySchedule([0], now)
+        self._repOk()
 
-    def setRetrySchedule(self, schedule):
+    def setRetrySchedule(self, schedule, now=None):
         """Set the retry schedule for this queue.  A retry schedule is
            a list of integers, each representing a number of seconds.
            For example, a schedule of [ 120, 120, 3600, 3600 ] will
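Each message's _DeliveryState lives in its own meta_<handle> file, written with writePickled and read with readPickled from mixminion.Common. Their implementation is not part of this diff. The sketch below (Python 3, for os.replace) assumes the usual write-to-a-temporary-file-then-rename approach and reuses the versioned __getstate__/__setstate__ format of _DeliveryState. DeliveryState, writePickledAtomic, and readPickledFile are hypothetical stand-ins, and ValueError stands in for MixFatalError.

```python
import os
import pickle
import tempfile


class DeliveryState:
    """Stand-in for _DeliveryState with the same versioned pickle format."""

    def __init__(self, queuedTime, lastAttempt=None):
        self.queuedTime = queuedTime
        self.lastAttempt = lastAttempt

    def __getstate__(self):
        # Always a tuple whose first element names the format version.
        return ("V0", self.queuedTime, self.lastAttempt)

    def __setstate__(self, state):
        if state[0] == "V0":
            self.queuedTime, self.lastAttempt = state[1], state[2]
        else:
            raise ValueError("Unrecognized delivery state %r" % (state[0],))


def writePickledAtomic(fn, obj):
    # Write a temporary file in the same directory, then rename it over
    # the target.  Rename is atomic on POSIX filesystems, so readers see
    # either the old contents or the new ones, never a partial file.
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(fn) or ".", prefix="tmp_")
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(obj, f, protocol=2)
        os.replace(tmp, fn)
    except BaseException:
        if os.path.exists(tmp):
            os.unlink(tmp)
        raise


def readPickledFile(fn):
    with open(fn, "rb") as f:
        return pickle.load(f)
```

Pickling through a version-tagged tuple lets a later release recognize, convert, or reject state written by an older one instead of silently misreading its fields.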
@@ -343,151 +428,307 @@
                 every 30 minutes, messages will only me retried once every
                 30 minutes.
         """
-        self.retrySchedule = schedule[:]
+        try:
+            self._lock.acquire()
+            self.retrySchedule = schedule[:]
+            self._rebuildNextAttempt(now)
+        finally:
+            self._lock.release()
 
-    def _rescan(self):
-        """Rebuild the internal state of this queue from the underlying
-           directory."""
+    def _rescan(self, now=None):
+        """Helper: Rebuild the internal state of this queue from the
+           underlying directory.  Trashes 'pending' and 'sendable'."""
+        fname = os.path.join(self.dir, "metadata")
         try:
             self._lock.acquire()
             self.pending = {}
+            self.nextAttempt = {}
             self.sendable = self.getAllMessages()
+            self._loadState()
+            self._rebuildNextAttempt(now)
+            self._repOk()
+        finally:
+            self._lock.release()
+
+    def _loadState(self):
+        """Read all DeliveryState objects from the disk."""
+        # must hold lock.
+        self.deliveryState = {}
+        for h in self.getAllMessages():
+            fn = os.path.join(self.dir, "meta_"+h)
+            if os.path.exists(fn):
+                self.deliveryState[h] = readPickled(fn)
+            else:
+                LOG.warn("No metadata for file handle %s", h)
+                obj = self.getObject(h)
+                #XXXX005 remove this.
+                if isinstance(obj, types.TupleType) and len(obj) == 4:
+                    # This message is in an obsolete format from 0.0.3.
+                    # We'd repair it, but packets from 0.0.3 are incompatible
+                    # anyway.
+                    LOG.info("Removing item %s in obsolete format", h)
+                    self.removeMessage(h)
+                    continue
+
+                self.deliveryState[h] = _DeliveryState()
+                self._writeState(h)
+
+        for fn in os.listdir(self.dir):
+            if fn.startswith("meta_"):
+                h = fn[5:]
+                if not self.deliveryState.has_key(h):
+                    LOG.warn("Metadata for nonexistent handle %s", h)
+                    os.unlink(os.path.join(self.dir, fn))
+
+    def _writeState(self, h):
+        """Helper method: writes out the metadata for handle 'h'.  If that
+           handle has been removed, removes the metadata.
+        """
+        fn = os.path.join(self.dir, "meta_"+h)
+        ds = self.deliveryState.get(h)
+        if ds is not None:
+            writePickled(fn, self.deliveryState[h])
+        else:
+            try:
+                os.rename(fn, os.path.join(self.dir, "rmv_meta_"+h))
+            except OSError:
+                pass
+
+    def _rebuildNextAttempt(self, now=None):
+        """Helper: Reconstruct self.nextAttempt from self.retrySchedule and
+           self.deliveryState.
+
+           Callers must hold self._lock.
+        """
+        if self.retrySchedule is None:
+            rs = [0]
+        else:
+            rs = self.retrySchedule
+
+        nextAttempt = {}
+        for h, ds in self.deliveryState.items():
+            nextAttempt[h] = ds.getNextAttempt(rs, now)
+        self.nextAttempt = nextAttempt
+        self._repOk()
+
+    def _repOk(self):
+        """Raise an assertion error if the internal state of this object is
+           nonsensical."""
+        #XXXX004 Later in the release cycle, we should call this less.  It
+        #XXXX004 adds ~8-9ms on my laptop for ~400 messages
+        try:
+            self._lock.acquire()
+
+            allHandles = self.getAllMessages()
+            knownHandles = self.pending.keys() + self.sendable
+            allHandles.sort()
+            knownHandles.sort()
+            assert allHandles == knownHandles
+            dsHandles = self.deliveryState.keys()
+            naHandles = self.nextAttempt.keys()
+            dsHandles.sort()
+            naHandles.sort()
+            assert allHandles == dsHandles
+            assert allHandles == naHandles
         finally:
             self._lock.release()
 
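_loadState reconciles the message files with the meta_ files: a message without metadata gets a fresh _DeliveryState, metadata whose message is gone is an orphan to discard, and _repOk then asserts that every handle has exactly one state. A filesystem-free sketch of that reconciliation, omitting the cleanup of messages in the obsolete 0.0.3 format; reconcileHandles and its callback parameters are hypothetical.

```python
def reconcileHandles(messageHandles, metaHandles, loadMeta, freshState):
    """Pair every message handle with a delivery state.

    messageHandles -- handles that have a message file
    metaHandles    -- handles that have a meta_ file
    loadMeta       -- callable: handle -> stored delivery state
    freshState     -- callable: () -> new state for a message lacking metadata

    Returns (deliveryState, orphans), where orphans lists the meta_
    handles that have no message and should be discarded.
    """
    metaSet = set(metaHandles)
    deliveryState = {}
    for h in messageHandles:
        if h in metaSet:
            deliveryState[h] = loadMeta(h)
        else:
            # Missing metadata: treat the message as newly queued.
            deliveryState[h] = freshState()
    # Only metadata without a matching message is an orphan.
    orphans = sorted(metaSet - set(messageHandles))
    # The invariant _repOk checks: one state per message handle.
    assert sorted(deliveryState) == sorted(messageHandles)
    return deliveryState, orphans
```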
     def queueMessage(self, msg):
         if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")
 
-    def queueDeliveryMessage(self, msg, retry=0, nextAttempt=0):
+    def queueDeliveryMessage(self, msg, now=None):
         """Schedule a message for delivery.
-             msg -- the message.  This can be any pickleable object
-             retry -- how many times so far have we tried to send?
-             nextAttempt -- A time before which no further attempts to
-                  deliver should be made.
+             msg -- the message.  This can be any pickleable object.
         """
+        assert self.retrySchedule is not None
         try:
             self._lock.acquire()
-            handle = self.queueObject( (retry, None, msg, nextAttempt) )
+            handle = self.queueObject(msg)
             self.sendable.append(handle)
+            ds = self.deliveryState[handle] = _DeliveryState(now)
+            self.nextAttempt[handle] = \
+                     ds.getNextAttempt(self.retrySchedule, now)
+            #self._saveState()
+            self._writeState(handle)
         finally:
             self._lock.release()
 
+        self._repOk()
         return handle
 
-    def get(self,handle):
-        """Returns a (n_retries, msg, nextAttempt) tuple for a given
-           message handle."""
+    def _inspect(self,handle):
+        """Returns a (msg, inserted, lastAttempt, nextAttempt) tuple
+           for a given message handle.  For testing. """
+        self._repOk()
         o = self.getObject(handle)
-        return o[0], o[2], o[3]
+        return (o,
+                self.deliveryState[handle].queuedTime,
+                self.deliveryState[handle].lastAttempt,
+                self.nextAttempt[handle])
+
+    def removeExpiredMessages(self, now=None):
+        """Remove every message expired in this queue according to the
+           current schedule.  Ordinarily, messages are removed when
+           their last delivery is over.  Occasionally, however,
+           changing the schedule while the system is down can make calling
+           this method useful."""
+        try:
+            self._lock.acquire()
+            for h in self.sendable[:]:
+                if self.nextAttempt[h] is None:
+                    self.removeMessage(h)
+        finally:
+            self._lock.release()
 
     def sendReadyMessages(self, now=None):
-        """Sends all messages which are not already being sent."""
+        """Sends all messages which are not already being sent, and which
+           are scheduled to be sent."""
+        assert self.retrySchedule is not None
         if now is None:
             now = time.time()
+        LOG.trace("ServerQueue checking for deliverable messages in %s",
+                  self.dir)
         try:
             self._lock.acquire()
             handles = self.sendable
             messages = []
             self.sendable = []
             for h in handles:
-                retries,msg, nextAttempt = self.get(h)
-                if nextAttempt <= now:
-                    messages.append((h, msg, retries))
+                assert not self.pending.has_key(h)
+                next = self.nextAttempt[h]
+                if next is None:
+                    LOG.trace("     [%s] is expired.", h)
+                    self.removeMessage(h)
+                elif next <= now:
+                    LOG.trace("     [%s] is ready for delivery", h)
+                    messages.append( (h, self.getObject(h)) )
                     self.pending[h] = now
                 else:
+                    LOG.trace("     [%s] is not yet ready for delivery", h)
                     self.sendable.append(h)
+            for h in self.pending.keys():
+                LOG.trace("     [%s] is pending delivery", h)
         finally:
             self._lock.release()
+        self._repOk()
         if messages:
             self._deliverMessages(messages)
+            self._repOk()
 
     def _deliverMessages(self, msgList):
-        """Abstract method; Invoked with a list of
-           (handle, message, n_retries) tuples every time we have a batch
-           of messages to send.
+        """Abstract method; Invoked with a list of (handle, message)
+           tuples every time we have a batch of messages to send.
 
            For every handle in the list, delierySucceeded or deliveryFailed
            should eventually be called, or the message will sit in the queue
            indefinitely, without being retried."""
 
-        # We could implement this as a single _deliverMessage(h,addr,m,n)
+        # We could implement this as a single _deliverMessage(h,addr)
         # method, but that wouldn't allow implementations to batch
         # messages being sent to the same address.
 
         raise NotImplementedError("_deliverMessages")
 
-    def deliverySucceeded(self, handle):
-        """Removes a message from the outgoing queue.  This method
-           should be invoked after the corresponding message has been
-           successfully delivered.
-        """
+    def removeMessage(self, handle):
         try:
             self._lock.acquire()
+            Queue.removeMessage(self, handle)
+            for d in self.pending, self.deliveryState, self.nextAttempt:
+                try:
+                    del d[handle]
+                except KeyError:
+                    pass
             try:
-                self.removeMessage(handle)
-            except:
-                # This should never happen.
-                LOG.error_exc(sys.exc_info(), "Error removing message")
-            try:
-                del self.pending[handle]
-            except KeyError:
-                # This should never happen.
-                LOG.error_exc(sys.exc_info(),
-                              "Handle %s was not pending", handle)
+                del self.sendable[self.sendable.index(handle)]
+            except ValueError:
+                pass
+
+            self._writeState(handle)
         finally:
             self._lock.release()
 
-    def deliveryFailed(self, handle, retriable=0):
+    def removeAll(self):
+        try:
+            self._lock.acquire()
+            Queue.removeAll(self)
+            for m in os.listdir(self.dir):
+                if m[:5] == 'meta_':
+                    os.rename(os.path.join(self.dir, m),
+                              os.path.join(self.dir, "rmv_"+m))
+            self.deliveryState = {}
+            self.pending = {}
+            self.nextAttempt = {}
+            self.sendable = []
+            self.cleanQueue()
+        finally:
+            self._lock.release()
+
+
+    def deliverySucceeded(self, handle):
+        """Removes a message from the outgoing queue.  This method
+           should be invoked after the corresponding message has been
+           successfully delivered.
+        """
+        assert self.retrySchedule is not None
+
+        LOG.trace("ServerQueue got successful delivery for %s from %s",
+                  handle, self.dir)
+        self.removeMessage(handle)
+
+    def deliveryFailed(self, handle, retriable=0, now=None):
         """Removes a message from the outgoing queue, or requeues it
            for delivery at a later time.  This method should be
            invoked after the corresponding message has been
-           successfully delivered."""
+           unsuccessfully delivered."""
+        assert self.retrySchedule is not None
+        LOG.trace("ServerQueue failed to deliver %s from %s",
+                  handle, self.dir)
         try:
             self._lock.acquire()
             try:
                 lastAttempt = self.pending[handle]
-                del self.pending[handle]
             except KeyError:
                 # This should never happen
                 LOG.error_exc(sys.exc_info(),
                               "Handle %s was not pending", handle)
                 lastAttempt = 0
-                
+
             if retriable:
-                # Queue the new one before removing the old one, for
-                # crash-proofness.  First, fetch the old information...
-                retries, msg, schedAttempt = self.get(handle)
+                # If we can retry the message, update the deliveryState
+                # with the most recent attempt, and see if there's another
+                # attempt in the future.
+                try:
+                    ds = self.deliveryState[handle]
+                except KeyError:
+                    # This should never happen
+                    LOG.error_exc(sys.exc_info(),
+                                  "Handle %s had no state", handle)
+                    ds = self.deliveryState[handle] = _DeliveryState(now)
 
-                # Multiple retry intervals may have passed in between the most
-                # recent failed delivery attempt (lastAttempt) and the time it
-                # was scheduled (schedAttempt).  Increment 'retries' and to
-                # reflect the number of retry intervals that have passed
-                # between first sending the message and nextAttempt.
-                if self.retrySchedule and retries < len(self.retrySchedule):
-                    nextAttempt = schedAttempt
-                    if nextAttempt == 0:
-                        nextAttempt = lastAttempt
-                    # Increment nextAttempt and retries according to the
-                    # retry schedule, until nextAttempt is after lastAttempt.
-                    while retries < len(self.retrySchedule):
-                        nextAttempt += self.retrySchedule[retries]
-                        retries += 1
-                        if nextAttempt > lastAttempt:
-                            break
-                    # If there are more attempts to be made, queue the message.
-                    if retries <= len(self.retrySchedule):
-                        self.queueDeliveryMessage(msg, retries, nextAttempt)
-                elif not self.retrySchedule:
-                    #XXXX005: Make sure this error never occurs.
-                    LOG.error(
-                        "ServerQueue.deliveryFailed without retrySchedule")
-                    retries += 1
-                    nextAttempt = 0
-                    if retries < 10:
-                        self.queueDeliveryMessage(msg, retries, nextAttempt)
+                ds.setLastAttempt(lastAttempt)
+                nextAttempt = ds.getNextAttempt(self.retrySchedule, now)
+                if nextAttempt is not None:
+                    LOG.trace("     (We'll try %s again at %s)", handle,
+                              formatTime(nextAttempt, 1))
+                    # There is another scheduled delivery attempt.  Remember
+                    # it, mark the message sendable again, and save our state.
+                    self.nextAttempt[handle] = nextAttempt
+                    self.sendable.append(handle)
+                    try:
+                        del self.pending[handle]
+                    except KeyError:
+                        LOG.error("Handle %s was not pending", handle)
 
-            # Now, it's okay to remove the failed message.
+                    self._repOk()
+                    self._writeState(handle)
+                    return
+
+                # Otherwise, fallthrough.
+
+            # If we reach this point, the message is undeliverable.
+            LOG.trace("     (Giving up on %s)", handle)
             self.removeMessage(handle)
+            self._repOk()
         finally:
             self._lock.release()
 
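Taken together, queueDeliveryMessage, sendReadyMessages, deliverySucceeded, and deliveryFailed form a small state machine: a handle moves from sendable to pending, then is either removed or made sendable again. The in-memory model below traces that lifecycle without files or locks; MiniDeliveryQueue and its method names are hypothetical, and it reuses the scheduling rule of _DeliveryState.getNextAttempt.

```python
class MiniDeliveryQueue:
    """In-memory sketch of DeliveryQueue bookkeeping (no files, no locks)."""

    def __init__(self, retrySchedule):
        self.retrySchedule = list(retrySchedule)
        self.queued = {}       # handle -> time first queued
        self.last = {}         # handle -> last attempt time (None = never)
        self.nextAttempt = {}  # handle -> next scheduled attempt, or None
        self.sendable = []
        self.pending = {}      # handle -> time the current attempt started

    def _next(self, h, now):
        # Same rule as _DeliveryState.getNextAttempt.
        if self.last[h] is None:
            return now
        attempt = self.queued[h]
        for interval in self.retrySchedule:
            attempt += interval
            if attempt > self.last[h]:
                return attempt
        return None

    def queueMessage(self, h, now):
        self.queued[h], self.last[h] = now, None
        self.nextAttempt[h] = self._next(h, now)
        self.sendable.append(h)

    def sendReady(self, now):
        ready = []
        for h in list(self.sendable):      # copy: _remove mutates the list
            t = self.nextAttempt[h]
            if t is None:
                self._remove(h)            # expired
            elif t <= now:
                self.sendable.remove(h)
                self.pending[h] = now
                ready.append(h)
        return ready

    def _remove(self, h):
        for d in (self.queued, self.last, self.nextAttempt, self.pending):
            d.pop(h, None)
        if h in self.sendable:
            self.sendable.remove(h)

    def succeeded(self, h):
        self._remove(h)

    def failed(self, h, retriable, now):
        started = self.pending.pop(h, 0)   # 0 mirrors the "never pending" fallback
        if retriable:
            self.last[h] = started
            nxt = self._next(h, now)
            if nxt is not None:
                self.nextAttempt[h] = nxt
                self.sendable.append(h)
                return nxt
        self._remove(h)                    # not retriable, or schedule exhausted
        return None
```

With a schedule of [60, 600], a message that keeps failing is tried at 0, 60, and 660 and then dropped.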
@@ -584,4 +825,3 @@
 class BinomialCottrellMixPool(_BinomialMixin,CottrellMixPool):
     """Same algorithm as CottrellMixPool, but instead of sending N messages
        from the pool of size P, sends each message with probability N/P."""
-