[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] (Flushing patches from my laptop)



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv4734/lib/mixminion/server

Modified Files:
	EventStats.py HashLog.py MMTPServer.py Modules.py 
	PacketHandler.py ServerConfig.py ServerKeys.py ServerMain.py 
	ServerQueue.py 
Log Message:
(Flushing patches from my laptop)

BuildMessage, ClientMain:
	- Improve comments and documentation

Common:
	- Repair file permissions better
	- Don't assume all filesystems have the same block size

MMTPClient, MMTPServer:
	- Increment protocol version; drop 0.1 and 0.2 support.

Packet:
	- Drop obsolete packet format

test:
	- Tests for rejected packets
	- Tests for EventStats
	
EventStats:
	- Debug; make rotation explicit

HashLog:
	- Don't die when a half-created hashlog is found.
	- Sync on startup

Modules:
	- Don't leak zombie mixmaster processes when flushing

PacketHandler:
	- Change key rotation semantics from add/remove to set.

ServerConfig:
	- Upgrade comments from ???? to ????004

ServerKeys:
	- Fix hashlog deletion

ServerMain:
	- Refactor scheduler into a separate class.
	- Drop umask in all cases, even when not daemon.

ServerQueue:
	- Drop some legacy code










Index: EventStats.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/EventStats.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- EventStats.py	26 Apr 2003 14:39:59 -0000	1.2
+++ EventStats.py	5 May 2003 00:38:46 -0000	1.3
@@ -13,7 +13,7 @@
 from time import time
 
 from mixminion.Common import formatTime, LOG, previousMidnight, floorDiv, \
-     createPrivateDir
+     createPrivateDir, MixError
 
 # _EVENTS: a list of all recognized event types. 
 _EVENTS = [ 'ReceivedPacket',
@@ -32,6 +32,12 @@
     def save(self, now=None):
         """Flushes this eventlog to disk."""
         pass
+    def rotate(self, now=None):
+        """DOCDOC"""
+        pass
+    def getNextRotation(self):
+        """DOCDOC"""
+        return 0
     def _log(self, event, arg=None):
         """Notes that an event has occurred.
            event -- the type of event to note
@@ -118,7 +124,7 @@
            periodically writes to 'historyFile' every 'interval' seconds."""
         NilEventLog.__init__(self)
         if os.path.exists(filename):
-            # XXXX If this doesn't work, then we should 
+            # XXXX If this doesn't work, then we should ????004
             f = open(filename, 'rb')
             self.__dict__.update(cPickle.load(f))
             f.close()
@@ -155,8 +161,6 @@
            to invoke."""
         LOG.debug("Syncing statistics to disk")
         if not now: now = time()
-        if now > self.nextRotation:
-            self._rotate()
         tmpfile = self.filename + "_tmp"
         try:
             os.unlink(tmpfile)
@@ -176,9 +180,6 @@
     def _log(self, event, arg=None):
         try:
             self._lock.acquire()
-            if time() > self.nextRotation:
-                self._rotate()
-                self._save()
             try:
                 self.count[event][arg] += 1
             except KeyError:
@@ -189,31 +190,46 @@
         finally:
             self._lock.release()
 
+    def getNextRotation(self):
+        return self.nextRotation
+
+    def rotate(self,now=None):
+        if now is None: now = time()
+        if now < self.nextRotation:
+            raise MixError("Not ready to rotate event stats")
+        try:
+            self._lock.acquire()
+            self._rotate(now)
+        finally:
+            self._lock.release()
+
     def _rotate(self, now=None):
         """Flush all events since the last rotation to the history file,
            and clears the current event log."""
         
         # Must hold lock
         LOG.debug("Flushing statistics log")
-        if now is None: now = time() #????
+        if now is None: now = time()
             
         f = open(self.historyFilename, 'a')
-        self.dump(f)
+        self.dump(f, now)
         f.close()
 
-        self.accumulatedTime = 0
         self.count = {}
         for e in _EVENTS:
             self.count[e] = {}
-        self.lastRotation = self.nextRotation
-        self._setNextRotation()
+        self.lastRotation = now
+        self._save(now)
+        self.accumulatedTime = 0        
+        self._setNextRotation(now)
 
-    def dump(self, f):
+    def dump(self, f, now=None):
         """Write the current data to a file handle 'f'."""
+        if now is None: now = time()
         try:
             self._lock.acquire()
             startTime = self.lastRotation
-            endTime = time()
+            endTime = now
             print >>f, "========== From %s to %s:" % (formatTime(startTime,1),
                                                       formatTime(endTime,1))
             for event in _EVENTS:
@@ -240,8 +256,8 @@
         finally:
             self._lock.release()
 
-
     def _setNextRotation(self, now=None):
+        # DOCDOC
         # ???? Lock to 24-hour cycle
 
         # This is a little weird.  We won't save *until*:
@@ -253,18 +269,14 @@
         #  round to the hour, up to 5 minutes down and 55 up.
         if not now: now = time()
 
-        secToGo = max(0, self.rotateInterval * 0.75 - self.accumulatedTime)
+        accumulatedTime = self.accumulatedTime + (now - self.lastSave)
+        secToGo = max(0, self.rotateInterval * 0.75 - accumulatedTime)
         self.nextRotation = max(self.lastRotation + self.rotateInterval,
                                 now + secToGo)
 
-        if not self.lastRotation: self.lastRotation = now
-        if now - self.lastRotation <= self.rotateInterval * 1.2:
-            base = self.lastRotation
-        else:
-            base = now
+        if self.nextRotation < now:
+            self.nextRotation = now
 
-        self.nextRotation = base + self.rotateInterval
-        
         if (self.rotateInterval % 3600) == 0:
             mid = previousMidnight(self.nextRotation)
             rest = self.nextRotation - mid

Index: HashLog.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/HashLog.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- HashLog.py	26 Apr 2003 14:39:59 -0000	1.9
+++ HashLog.py	5 May 2003 00:38:46 -0000	1.10
@@ -8,6 +8,7 @@
 
 import binascii
 import os
+import stat
 import anydbm, dumbdbm
 import threading
 from mixminion.Common import MixFatalError, LOG, createPrivateDir
@@ -63,9 +64,18 @@
            'keyid'."""
         parent = os.path.split(filename)[0]
         createPrivateDir(parent)
-        # XXXX004 catch empty hashlog.
-        self.log = anydbm.open(filename, 'c')
+
+        # Catch empty logfiles: these can be created if we exit before
+        # syncing the log for the first time.
+        try:
+            if os.stat(filename)[stat.ST_SIZE] == 0:
+                LOG.warn("Half-created database %s found; cleaning up.")
+                os.unlink(filename)
+        except os.error:
+            pass
+
         LOG.debug("Opening database %s for packet digests", filename)
+        self.log = anydbm.open(filename, 'c')
         if isinstance(self.log, dumbdbm._Database):
             LOG.warn("Warning: logging packet digests to a flat file.")
         try:
@@ -73,7 +83,9 @@
                 raise MixFatalError("Log KEYID does not match current KEYID")
         except KeyError:
             self.log["KEYID"] = keyid
+            if hasattr(self.log, 'sync'): self.log.sync()
 
+        # Scan the journal file
         self.journalFileName = filename+"_jrnl"
         self.journal = {}
         if os.path.exists(self.journalFileName):
@@ -85,7 +97,11 @@
 
         self.journalFile = os.open(self.journalFileName,
                     _JOURNAL_OPEN_FLAGS|os.O_APPEND, 0600)
+
         self.__lock = threading.RLock()
+
+        # On startup, we flush everything to disk.
+        self.sync()
 
     def seenHash(self, hash):
         """Return true iff 'hash' has been logged before."""

Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.25
retrieving revision 1.26
diff -u -d -r1.25 -r1.26
--- MMTPServer.py	26 Apr 2003 14:39:59 -0000	1.25
+++ MMTPServer.py	5 May 2003 00:38:46 -0000	1.26
@@ -8,9 +8,7 @@
    of the protocol.
 
    If you just want to send messages into the system, use MMTPClient.
-
-   FFFF As yet unsupported are: Session resumption, key renegotiation,
-   FFFF: Also unsupported: timeouts."""
+   """
 
 # NOTE FOR THE CURIOUS: The 'asyncore' module in the standard library
 #    is another general select/poll wrapper... so why are we using our
@@ -133,10 +131,12 @@
         if r: del self.readers[fd]
         if w: del self.writers[fd]
 
-    def tryTimeout(self, now):
+    def tryTimeout(self, now=None):
         """Timeout any connection that is too old."""
         if self._timeout is None:
             return
+        if now is None:
+            now = time.time()
         # All connections older than 'cutoff' get purged.
         cutoff = now - self._timeout
         # Maintain a set of filenos for connections we've checked, so we don't
@@ -501,7 +501,7 @@
 # Implementation for MMTP.
 
 # The protocol string to send.
-PROTOCOL_STRING      = "MMTP 0.1,0.2\r\n"
+PROTOCOL_STRING      = "MMTP 0.3\r\n"
 # The protocol specification to expect.
 PROTOCOL_RE          = re.compile("MMTP ([^\s\r\n]+)\r\n")
 # Control line for sending a message.
@@ -528,17 +528,19 @@
     #     if negotiation hasn't completed.
     # PROTOCOL_VERSIONS: (static) a list of protocol versions we allow,
     #     in decreasing order of preference.
-    PROTOCOL_VERSIONS = [ '0.2', '0.1' ]
-    def __init__(self, sock, tls, consumer):
+    PROTOCOL_VERSIONS = [ '0.3' ]
+    def __init__(self, sock, tls, consumer, rejectPackets=0):
         """Create an MMTP connection to receive messages sent along a given
            socket.  When valid packets are received, pass them to the
            function 'consumer'."""
         SimpleTLSConnection.__init__(self, sock, tls, 1,
                                      "%s:%s"%sock.getpeername())
         self.messageConsumer = consumer
-        self.junkCallback = lambda : None
+        self.junkCallback = lambda : None #DOCDOC
+        self.rejectCallback = lambda : None #DOCDOC
         self.finished = self.__setupFinished
         self.protocol = None
+        self.rejectPackets = rejectPackets #DOCDOC
 
     def __setupFinished(self):
         """Called once we're done accepting.  Begins reading the protocol
@@ -593,10 +595,16 @@
         if data.startswith(JUNK_CONTROL):
             expectedDigest = sha1(msg+"JUNK")
             replyDigest = sha1(msg+"RECEIVED JUNK")
+            replyControl = RECEIVED_CONTROL
             isJunk = 1
         elif data.startswith(SEND_CONTROL):
             expectedDigest = sha1(msg+"SEND")
-            replyDigest = sha1(msg+"RECEIVED")
+            if self.rejectPackets:
+                replyDigest = sha1(msg+"REJECTED")
+                replyControl = REJECTED_CONTROL                
+            else:
+                replyDigest = sha1(msg+"RECEIVED")
+                replyControl = RECEIVED_CONTROL
             isJunk = 0
         else:
             warn("Unrecognized command from %s.  Closing connection.",
@@ -611,10 +619,12 @@
         else:
             debug("%s packet received from %s; Checksum valid.",
                   data[:4], self.address)
-            self.finished = self.__sentAck
-            self.beginWrite(RECEIVED_CONTROL+replyDigest)
+            self.finished = self.__sentAck            
+            self.beginWrite(replyControl+replyDigest)
             if isJunk:
                 self.junkCallback()
+            elif self.rejectPackets:
+                self.rejectCallback()
             else:
                 self.messageConsumer(msg)
 
@@ -622,7 +632,6 @@
         """Called once we're done sending an ACK.  Begins reading a new
            message."""
         debug("Send ACK for message from %s (fd %s)", self.address, self.fd)
-        #FFFF Rehandshake
         self.finished = self.__receivedMessage
         self.expectRead(SEND_RECORD_LEN)
 
@@ -630,7 +639,6 @@
 
 NULL_KEYID = "\x00"*20
 
-# FFFF We need to note retriable situations better.
 class MMTPClientConnection(SimpleTLSConnection):
     """Asynchronious implementation of the sending ("client") side of a
        mixminion connection."""
@@ -646,7 +654,7 @@
     #     if negotiation hasn't completed.
     # PROTOCOL_VERSIONS: (static) a list of protocol versions we allow,
     #     in the order we offer them.
-    PROTOCOL_VERSIONS = [ '0.1', '0.2' ]
+    PROTOCOL_VERSIONS = [ '0.3' ]
     def __init__(self, context, ip, port, keyID, messageList, handleList,
                  sentCallback=None, failCallback=None, finishedCallback=None,
                  certCache=None):
@@ -668,7 +676,7 @@
               DOCDOC certcache,finishedCallback"""
 
         # Generate junk before connecting to avoid timing attacks
-        self.junk = [] #XXXX doc this field.
+        self.junk = [] #DOCDOC doc this field.
         for m in messageList:
             if m == 'JUNK':
                 self.junk.append(getCommonPRNG().getBytes(MESSAGE_LEN))
@@ -773,10 +781,6 @@
             del self.messageList[0]
             msg = self.junk[0]
             del self.junk[0]
-            if self.protocol == '0.1':
-                debug("Won't send junk to a 0.1 server.")
-                self.beginNextMessage()
-                return
             self.expectedDigest = sha1(msg+"RECEIVED JUNK")
             self.rejectDigest = sha1(msg+"REJECTED") 
             msg = JUNK_CONTROL+msg+sha1(msg+"JUNK")
@@ -809,7 +813,6 @@
           Otherwise, begins shutting down.
        """
        trace("received ack (fd %s)", self.fd)
-       # FFFF Rehandshake
        inp = self.getInput()
        rejected = 0
        if inp == REJECTED_CONTROL+self.rejectDigest:
@@ -863,7 +866,7 @@
 
         self.context = tls
         # FFFF Don't always listen; don't always retransmit!
-        # FFFF Support listening on specific IPs
+        # FFFF Support listening on multiple IPs
 
         if config['Incoming/MMTP'].get('ListenIP',None) is not None:
             IP = config['Incoming/MMTP']['ListenIP']
@@ -889,9 +892,11 @@
            Used to rotate keys."""
         self.context = context
 
-    def getNextTimeoutTime(self, now):
+    def getNextTimeoutTime(self, now=None):
         """Return the time at which we next purge connections, if we have
            last done so at time 'now'."""
+        if now is None:
+            now = time.time()
         return now + self._timeout
 
     def _newMMTPConnection(self, sock):
@@ -958,4 +963,3 @@
 
     def onMessageSent(self, msg, handle):
         pass
-

Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.35
retrieving revision 1.36
diff -u -d -r1.35 -r1.36
--- Modules.py	26 Apr 2003 14:39:59 -0000	1.35
+++ Modules.py	5 May 2003 00:38:46 -0000	1.36
@@ -34,7 +34,7 @@
 from mixminion.Config import ConfigError, _parseBoolean, _parseCommand, \
      _parseIntervalList
 from mixminion.Common import LOG, createPrivateDir, MixError, isSMTPMailbox, \
-     isPrintingAscii
+     isPrintingAscii, waitForChildren
 from mixminion.Packet import ParseError, CompressedDataTooLong
 
 # Return values for processMessage
@@ -225,6 +225,7 @@
                     LOG.info("Delivery thread shutting down.")
                     return
                 self.moduleManager._sendReadyMessages()
+                waitForChildren(blocking=0)
         except:
             LOG.error_exc(sys.exc_info(),
                           "Exception in delivery; shutting down thread.")
@@ -945,7 +946,7 @@
            should be called after invocations of processMessage."""
         cmd = self.command
         LOG.debug("Flushing Mixmaster pool")
-        os.spawnl(os.P_NOWAIT, cmd, cmd, "-S")
+        os.spawnl(os.P_WAIT, cmd, cmd, "-S")
 
 class _MixmasterSMTPModuleDeliveryQueue(SimpleModuleDeliveryQueue):
     """Delivery queue for _MixmasterSMTPModule.  Same as

Index: PacketHandler.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/PacketHandler.py,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -d -r1.14 -r1.15
--- PacketHandler.py	26 Apr 2003 14:39:59 -0000	1.14
+++ PacketHandler.py	5 May 2003 00:38:46 -0000	1.15
@@ -12,6 +12,7 @@
 import mixminion.Common as Common
 import mixminion.BuildMessage
 
+from mixminion.ServerInfo import PACKET_KEY_BYTES
 from mixminion.Common import MixError, isPrintingAscii
 
 __all__ = [ 'PacketHandler', 'ContentError', 'DeliveryPacket', 'RelayedPacket']
@@ -27,8 +28,9 @@
        to drop the message, relay the message, or send the message to
        an exit handler."""
     ## Fields:
-    # privatekey: list of RSA private keys that we accept
-    # hashlog: list of HashLog objects corresponding to the keys.
+    # privatekeys: a list of 2-tuples of
+    #      (1) a RSA private key that we accept
+    #      (2) a HashLog objects corresponding to the given key
     def __init__(self, privatekey, hashlog):
         """Constructs a new packet handler, given a private key object for
            header encryption, and a hashlog object to prevent replays.
@@ -38,57 +40,52 @@
            though: PK decryption is expensive.  Also, a hashlog must be
            provided for each private key.
         """
+        self.privatekeys = []
+        self.lock = threading.Lock()
+        
         try:
-            # Check whether we have a key or a sequence of keys.
             _ = privatekey[0]
-            assert len(hashlog) == len(privatekey)
-
-            self.privatekey = privatekey
-            self.hashlog = hashlog
+            self.setKeys(privatekey, hashlog)
         except TypeError:
-            # Privatekey is not be subscriptable; we must have only one.
-            self.privatekey = [privatekey]
-            self.hashlog = [hashlog]
-
-        self.lock = threading.Lock()
-
-    def addKey(self, key, hashlog):
-        """DOCDOC"""
-        self.lock.acquire()
-        self.privatekey.append(key)
-        self.hashlog.append(hashlog)
-        self.lock.release()
+            self.setKeys([privatekey], [hashlog])
 
-    def removeKey(self, key):
+    def setKeys(self, keys, hashlogs):
         """DOCDOC"""
         self.lock.acquire()
+        newKeys = {}
         try:
-            enc = key.encode_key(1)
-            for i in range(len(self.privatekey)):
-                k = self.privatekey[i]
-                if k.enc(1) == enc:
-                    del self.privatekey[i]
-                    hlog = self.hashlog[i]
-                    del self.hashlog[i]
-                    hlog.close()
-                    return
-            raise KeyError
+            # Build a set of asn.1-encoded public keys in *new* set.
+            for k in keys:
+                newKeys[k.encode_key(1)] = 1
+                if k.get_modulus_bytes() != PACKET_KEY_BYTES:
+                    raise Common.MixFatalError("Incorrect packet key length")
+            # For all old public keys, if they aren't in the new set, close
+            # their hashlogs.
+            for k, h in self.privatekeys:
+                if not newKeys.get(k.encode_key(1)):
+                    h.close()
+            # Now, set the keys.
+            self.privatekeys = zip(keys, hashlogs)
         finally:
             self.lock.release()
-            
+
     def syncLogs(self):
         """Sync all this PacketHandler's hashlogs."""
-        self.lock.acquire()
-        for h in self.hashlog:
-            h.sync()
-        self.lock.release()
+        try:
+            self.lock.acquire()
+            for _, h in self.privatekeys:
+                h.sync()
+        finally:
+            self.lock.release()
 
     def close(self):
         """Close all this PacketHandler's hashlogs."""
-        self.lock.acquire()
-        for h in self.hashlog:
-            h.close()
-        self.lock.release()
+        try:
+            self.lock.acquire()
+            for _, h in self.privatekeys:
+                h.close()
+        finally:        
+            self.lock.release()
 
     def processMessage(self, msg):
         """Given a 32K mixminion message, processes it completely.
@@ -119,7 +116,7 @@
         e = None
         self.lock.acquire()
         try:
-            for pk, hashlog in zip(self.privatekey, self.hashlog):
+            for pk, hashlog in self.privatekeys:
                 try:
                     subh = Crypto.pk_decrypt(encSubh, pk)
                     break

Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -d -r1.22 -r1.23
--- ServerConfig.py	26 Apr 2003 14:39:59 -0000	1.22
+++ ServerConfig.py	5 May 2003 00:38:46 -0000	1.23
@@ -128,7 +128,7 @@
             reasons.append("StatsInterval is too short")
         if not server["EncryptIdentityKey"]:
             reasons.append("Identity key is not encrypted")
-        # ???? Pkey lifetime, sloppiness? 
+        # ????004 Pkey lifetime, sloppiness? 
         if server["MixAlgorithm"] not in _SECURE_MIX_RULES:
             reasons.append("Mix algorithm is not secure")
         else:
@@ -138,17 +138,17 @@
         if server["MixInterval"][2] < 30*60:
             reasons.append("Mix interval under 30 minutes")
         
-        # ???? DIRSERVERS?
+        # ????004 DIRSERVERS?
 
-        # ???? Incoming/MMTP
+        # ????004 Incoming/MMTP
 
-        # ???? Outgoing/MMTP
+        # ????004 Outgoing/MMTP
 
-        # ???? Modules?
+        # ????004 Modules?
 
 def _validateRetrySchedule(mixInterval, entries, sectionname,
                            entryname='Retry'):
-    #XXXX writeme.
+    """DOCDOC"""
     entry = [e for e in entries.get(sectionname,[]) if e[0] == entryname]
     if not entry:
         return

Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -d -r1.19 -r1.20
--- ServerKeys.py	26 Apr 2003 14:39:59 -0000	1.19
+++ ServerKeys.py	5 May 2003 00:38:46 -0000	1.20
@@ -252,7 +252,7 @@
             files = [ os.path.join(dirname,f)
                       for f in os.listdir(dirname) ]
             hashFiles = [ os.path.join(self.hashDir, "hash_"+name) ,
-                          os.path.join(self.hashDir, "hash_"+name+"jrnl") ]
+                          os.path.join(self.hashDir, "hash_"+name+"_jrnl") ]
             files += [ f for f in hashFiles if os.path.exists(f) ]
             secureDelete(files, blocking=1)
             os.rmdir(dirname)

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.50
retrieving revision 1.51
diff -u -d -r1.50 -r1.51
--- ServerMain.py	26 Apr 2003 14:39:59 -0000	1.50
+++ ServerMain.py	5 May 2003 00:38:46 -0000	1.51
@@ -17,6 +17,7 @@
 import string
 import time
 import threading
+from types import *
 # We pull this from mixminion.Common, just in case somebody still has
 # a copy of the old "mixminion/server/Queue.py" (since renamed to
 # ServerQueue.py)
@@ -179,9 +180,9 @@
         
         for h in handles:
             packet = self.queue.getObject(h)
-            #XXXX004 remove the first case after 0.0.3
             if type(packet) == type(()):
-                LOG.debug("  (skipping message %s in obsolete format)", h)
+                #XXXX005 remove this case.
+                LOG.error("  (skipping message %s in obsolete format)", h)
             elif packet.isDelivery():
                 LOG.debug("  (sending message %s to exit modules)",
                           formatBase64(packet.getContents()[:8]))
@@ -274,7 +275,8 @@
 
     def onMessageReceived(self, msg):
         self.incomingQueue.queueMessage(msg)
-        EventStats.receivedPacket("ReceivedPacket", None) # XXXX Replace with server.
+        # XXXX Replace with server.
+        EventStats.log.receivedPacket()
 
     def onMessageSent(self, msg, handle):
         self.outgoingQueue.deliverySucceeded(handle)
@@ -399,7 +401,56 @@
 
 #----------------------------------------------------------------------
 
-class MixminionServer:
+class _Scheduler:
+    """Mixin class for server.  Implements DOCDOC"""
+    # DOCDOC
+    def __init__(self):
+        self.scheduledEvents = []
+
+    def firstEventTime(self):
+        if self.scheduledEvents:
+            return self.scheduledEvents[0][0]
+        else:
+            return -1
+
+    def scheduleOnce(self, when, name, cb):
+        assert type(name) is StringType
+        assert type(when) in (IntType, LongType, FloatType)
+        insort(self.scheduledEvents, (when, name, cb))
+
+    def scheduleRecurring(self, first, interval, name, cb):
+        assert type(name) is StringType
+        assert type(first) in (IntType, LongType, FloatType)
+        assert type(interval) in (IntType, LongType, FloatType)
+        next = lambda t=time.time,i=interval: t()+i
+        insort(self.scheduledEvents, (first, name,
+                                      _RecurringEvent(name, cb, self, next)))
+
+    def scheduleRecurringComplex(self, first, name, cb, nextFn):
+        assert type(name) is StringType
+        assert type(first) in (IntType, LongType, FloatType)
+        insort(self.scheduledEvents, (first, name,
+                                      _RecurringEvent(name, cb, self, nextFn)))
+
+    def processEvents(self, now=None):
+        if now is None: now = time.time()
+        se = self.scheduledEvents
+        while se and se[0][0] <= now:
+            cb = se[0][2]
+            del se[0]
+            cb()
+
+class _RecurringEvent:
+    def __init__(self, name, cb, scheduler, nextFn):
+        self.name = name
+        self.cb = cb
+        self.scheduler = scheduler
+        self.nextFn = nextFn
+    def __call__(self):
+        self.cb()
+        self.scheduler.scheduleOnce(self.nextFn(), self.name, self)
+
+class MixminionServer(_Scheduler):
     """Wraps and drives all the queues, and the async net server.  Handles
        all timed events."""
     ## Fields:
@@ -428,6 +479,7 @@
     # pidFile: Filename in which we store the pid of the running server.
     def __init__(self, config):
         """Create a new server from a ServerConfig."""
+        _Scheduler.__init__(self)
         LOG.debug("Initializing server")
 
         self.config = config
@@ -519,19 +571,28 @@
 
         self.cleanQueues()
 
-        # List of (eventTime, eventName) tuples.  Current names are:
-        #  'MIX', 'SHRED', and 'TIMEOUT'.  Kept in sorted order.
-        scheduledEvents = []
         now = time.time()
+        self.scheduleRecurring(now+600, 600, "SHRED", self.cleanQueues)
+        self.scheduleRecurring(now+180, 180, "WAIT",
+                               lambda: waitForChildren(blocking=0))
+        if EventStats.log.getNextRotation():#XXXX!!!!
+            self.scheduleRecurring(now+300, 300, "ES_SAVE",
+                                   EventStats.log.save)
+            self.scheduleRecurringComplex(EventStats.log.getNextRotation(),
+                                          "ES_ROTATE",
+                                          EventStats.log.rotate,
+                                          EventStats.log.getNextRotation)
+
+        self.scheduleRecurringComplex(self.mmtpServer.getNextTimeoutTime(now),
+                                      "TIMEOUT",
+                                      self.mmtpServer.tryTimeout,
+                                      self.mmtpServer.getNextTimeoutTime)
+
 
-        scheduledEvents.append( (now + 600, "SHRED") )#FFFF make configurable
-        scheduledEvents.append( (now + 180, "WAIT") )#FFFF make configurable
-        scheduledEvents.append( (self.mmtpServer.getNextTimeoutTime(now),
-                                 "TIMEOUT") )
         nextMix = self.mixPool.getNextMixTime(now)
-        scheduledEvents.append( (nextMix, "MIX") )
         LOG.debug("First mix at %s", formatTime(nextMix,1))
-        scheduledEvents.sort()
+        self.scheduleOnce(self.mixPool.getNextMixTime(now),
+                          "MIX", self.doMix)
 
         LOG.info("Entering main loop: Mixminion %s", mixminion.__version__)
 
@@ -543,7 +604,7 @@
 
         # FFFF Support for automatic key rotation.
         while 1:
-            nextEventTime = scheduledEvents[0][0]
+            nextEventTime = self.firstEventTime()
             now = time.time()
             timeLeft = nextEventTime - now
             while timeLeft > 0:
@@ -570,54 +631,35 @@
                 now = time.time()
                 timeLeft = nextEventTime - now
 
-            # It's time for an event.
-            event = scheduledEvents[0][1]
-            del scheduledEvents[0]
+            # An event has fired.
+            self.processEvents()
 
-            if event == 'TIMEOUT':
-                LOG.trace("Timing out old connections")
-                self.mmtpServer.tryTimeout(now)
-                insort(scheduledEvents,
-                       (self.mmtpServer.getNextTimeoutTime(now), "TIMEOUT"))
-            elif event == 'SHRED':
-                self.cleanQueues()
-                insort(scheduledEvents, (now + 600, "SHRED"))
-            elif event == 'WAIT':
-                # Every few minutes, we reap zombies.  Why, you ask?  Isn't
-                # catching sigchild enough?  Nope -- sometimes buggy delivery
-                # software forks, stays along long enough to ignore a child's
-                # SIGCHLD, then dies itself.  Or something -- in any case, we
-                # sure seem to be leaving a bunch of zombie mixmaster processes
-                # around.  This should fix it.
-                waitForChildren(blocking=0)
-                insort(scheduledEvents, (now + 180, "WAIT"))
-            elif event == 'MIX':
-                # Before we mix, we need to log the hashes to avoid replays.
-                try:
-                    # There's a potential threading problem here... in
-                    # between this sync and the 'mix' below, nobody should
-                    # insert into the mix pool.
-                    self.mixPool.lock()
-                    self.packetHandler.syncLogs()
+    def doMix(self):
+        now = time.time()
+        # Before we mix, we need to log the hashes to avoid replays.
+        try:
+            # There's a potential threading problem here... in
+            # between this sync and the 'mix' below, nobody should
+            # insert into the mix pool.
+            self.mixPool.lock()
+            self.packetHandler.syncLogs()
 
-                    LOG.trace("Mix interval elapsed")
-                    # Choose a set of outgoing messages; put them in
-                    # outgoingqueue and modulemanager
-                    self.mixPool.mix()
-                finally:
-                    self.mixPool.unlock()
-                    
-                # Send outgoing messages
-                self.outgoingQueue.sendReadyMessages()
-                # Send exit messages
-                self.moduleManager.sendReadyMessages()
+            LOG.trace("Mix interval elapsed")
+            # Choose a set of outgoing messages; put them in
+            # outgoingqueue and modulemanager
+            self.mixPool.mix()
+        finally:
+            self.mixPool.unlock()
 
-                # Choose next mix interval
-                nextMix = self.mixPool.getNextMixTime(now)
-                insort(scheduledEvents, (nextMix, "MIX"))
-                LOG.trace("Next mix at %s", formatTime(nextMix,1))
-            else:
-                assert event in ("MIX", "SHRED", "TIMEOUT", "WAIT")
+        # Send outgoing messages
+        self.outgoingQueue.sendReadyMessages()
+        # Send exit messages
+        self.moduleManager.sendReadyMessages()
+
+        # Choose next mix interval
+        nextMix = self.mixPool.getNextMixTime(now)
+        self.scheduleOnce(nextMix, "MIX", self.doMix)
+        LOG.trace("Next mix at %s", formatTime(nextMix,1))
 
     def cleanQueues(self):
         """Remove all deleted messages from queues"""
@@ -769,6 +811,8 @@
             LOG.fatal_exc(sys.exc_info(),
                           "Exception while starting server in the background")
             os._exit(0)
+    else:
+        os.umask(0000)
 
     # Configure event log
     try:

Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -d -r1.12 -r1.13
--- ServerQueue.py	27 Mar 2003 10:31:00 -0000	1.12
+++ ServerQueue.py	5 May 2003 00:38:46 -0000	1.13
@@ -319,14 +319,13 @@
     #           currently sending.
     #    retrySchedule: a list of intervals at which delivery of messages
     #           should be reattempted, as described in "setRetrySchedule".
-
     def __init__(self, location, retrySchedule=None):
         Queue.__init__(self, location, create=1, scrub=1)
         self._rescan()
         if retrySchedule is None:
             self.retrySchedule = None
         else:
-            self.retrySchedule = retrySchedule[:]
+            self.setRetrySchedule(retrySchedule)
 
     def setRetrySchedule(self, schedule):
         """Set the retry schedule for this queue.  A retry schedule is
@@ -379,8 +378,6 @@
         """Returns a (n_retries, msg, nextAttempt) tuple for a given
            message handle."""
         o = self.getObject(handle)
-        if len(o) == 3:# XXXX004 For legacy queues; delete after 0.0.3
-            o = o + (0,)
         return o[0], o[2], o[3]
 
     def sendReadyMessages(self, now=None):
@@ -481,7 +478,9 @@
                     if retries <= len(self.retrySchedule):
                         self.queueDeliveryMessage(msg, retries, nextAttempt)
                 elif not self.retrySchedule:
-                    #LEGACY XXXX004
+                    #XXXX005: Make sure this error never occurs.
+                    LOG.error(
+                        "ServerQueue.deliveryFailed without retrySchedule")
                     retries += 1
                     nextAttempt = 0
                     if retries < 10: