[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] First batch of pinger implementation; new scheduler; ne...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv19854/server

Modified Files:
	MMTPServer.py ServerConfig.py ServerKeys.py ServerMain.py 
Added Files:
	Pinger.py 
Log Message:
First batch of pinger implementation; new scheduler; new cert rotation policy

Pinger.py:
 - Code to send one-hop pings to measure one-node reliability
 - Initial code to send two-hop pings (not finished; not used.)
 - Code to try sending link padding to servers in order to measure their
   uptime.
 - Code to store record of ping events, and rotate out old records.
 - Code to parse entries from record of ping events.
 - Code to configure ping generators.

 Still missing: code to analyze ping events; code to report results of
 analysis; code to feed status of analysis back into pinger; more tests;
 more docs.

MMTPServer.py:
 - Support link padding on outgoing connections from servers
 - Subclass MMTPClientConnection to tell PingLog about failed and successful
   connections.
 - Hook pinglogs into MMTPClientConnection.

ServerConfig.py:
 - Add an option to turn on pinging

ServerKeys.py, ServerMain.py:
 - Decouple link key rotation from packet key rotation; there's no need
   to keep link keys alive for a long time, or even save them to disk.

ServerKeys.py:
 - Add function to return current descriptor.  How did we get on so
   long without it?  Anyway, the pinger needs it.

ServerMain:
 - IncomingQueue sends decoded ping packets to the PingLog.
 - OutgoingQueue asks the PingGenerator to add link padding as needed.
 - Configure and enable pinger as required.

 - Completely re-work scheduler, for three reasons:
    - The old scheduler was not very OO, and worked very circuitously.
      (I could no longer convince myself it was correct.)
    - The old scheduler wasn't very good at handling regular events
      that needed to be run in the background, and that couldn't be
      rescheduled until they were done running.
    - The old scheduler used an always-sorted-list structure to make
      lookup-next-event O(1) while making schedule-an-event O(N).  But
      neither of these is in the critical path, and maintaining the
      list in order was hazardous.






Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.84
retrieving revision 1.85
diff -u -d -r1.84 -r1.85
--- MMTPServer.py	23 Mar 2004 00:09:24 -0000	1.84
+++ MMTPServer.py	27 Jul 2004 04:33:20 -0000	1.85
@@ -41,8 +41,7 @@
 from mixminion.Filestore import CorruptedFile
 from mixminion.ThreadUtils import MessageQueue, QueueEmpty
 
-__all__ = [ 'AsyncServer', 'ListenConnection', 'MMTPServerConnection',
-            'MMTPClientConnection' ]
+__all__ = [ 'AsyncServer', 'ListenConnection', 'MMTPServerConnection' ]
 
 class SelectAsyncServer:
     """AsyncServer is the core of a general-purpose asynchronous
@@ -499,6 +498,32 @@
     def isJunk(self):
         return 0
 
+class LinkPadding(mixminion.MMTPClient.DeliverableMessage):
+    """DOCDOC"""
+    def __init__(self):
+        self.contents = getCommonPRNG().getBytes(1<<15)
+    def succeeded(self): pass
+    def failed(self,retriable=0): pass
+    def getContents(self): return self.contents
+    def isJunk(self): return 1
+
+class _ClientCon(MMTPClientConnection):
+    """DOCDOC"""
+    def configurePingLog(self, pingLog, nickname):
+        self._pingLog = pingLog
+        self._nickname = nickname
+        self._wasOnceConnected = 0
+    def onProtocolRead(self):
+        MMTPClientConnection.onProtocolRead(self)
+        if self._isConnected:
+            if self._pingLog:
+                self._pingLog.connected(self._nickname)
+            self._wasOnceConnected = 1
+    def _failPendingPackets(self):
+        if not self._wasOnceConnected and self._pingLog:
+            self._pingLog.connectFailed(self._nickname)
+        MMTPClientConnection._failPendingPackets(self)
+
 LISTEN_BACKLOG = 128
 class MMTPAsyncServer(AsyncServer):
     """A helper class to invoke AsyncServer, MMTPServerConnection, and
@@ -574,6 +599,7 @@
         self.dnsCache = None
         self.msgQueue = MessageQueue()
         self.pendingPackets = []
+        self.pingLog = None
 
     def connectDNSCache(self, dnsCache):
         """Use the DNSCache object 'DNSCache' to resolve DNS queries for
@@ -581,6 +607,9 @@
         """
         self.dnsCache = dnsCache
 
+    def connectPingLog(self, pingLog):
+        self.pingLog = pingLog
+
     def setServerContext(self, servercontext):
         """Change the TLS context used for newly received connections.
            Used to rotate keys."""
@@ -723,9 +752,13 @@
             # There isn't any connection to the right server. Open one...
             addr = (ip, port, keyID)
             finished = lambda addr=addr, self=self: self.__clientFinished(addr)
-            con = MMTPClientConnection(
+            con = _ClientCon(
                 family, ip, port, keyID, serverName=serverName,
                 context=self.clientContext, certCache=self.certificateCache)
+            nickname = mixminion.ServerInfo.getNicknameByKeyID(keyID)
+            if nickname is None:
+                nickname = "<unknown>"
+            con.configurePingLog(self.pingLog, nickname)
             #con.allPacketsSent = finished #XXXX007 wrong!
             con.onClosed = finished
         except (socket.error, MixProtocolError), e:

--- NEW FILE: Pinger.py ---
# Copyright 2004 Nick Mathewson.  See LICENSE for licensing information.
# $Id: Pinger.py,v 1.1 2004/07/27 04:33:20 nickm Exp $

"""mixminion.server.Pinger

   Built-in network reliability tester (pinger) for Mixminion servers.

   Our pinger uses a three-part architecture.  First, we periodically
   consider adding testing packets to the outgoing batches.

   Second, we note the outcome of connection attempts; and the timing
   of our sending/receiving test packets.

   Third, we use the timing/uptime information in the second part
   above to try to infer how reliable the other nodes in the network
   are.
"""

import calendar
import os
import re
import struct
import threading
import time

import mixminion.BuildMessage
import mixminion.Crypto
import mixminion.Packet
import mixminion.ServerInfo
import mixminion.server.PacketHandler
import mixminion.server.MMTPServer

from mixminion.Common import ceilDiv, createPrivateDir, formatBase64, \
     formatFnameDate, formatTime, LOG, parseFnameDate, previousMidnight, \
     secureDelete

class PingLog:
    """DOCDOC
       stores record of pinger events
    """
    HISTORY_DAYS = 12
    HEARTBEAT_INTERVAL = 30*60

    def __init__(self, location):
        createPrivateDir(location)
        self.location = location
        self.file = None
        self.fname = None
        self.lock = threading.RLock()
        self.rotate()

    def rotate(self,now=None):
        self.lock.acquire()
        try:
            date = formatFnameDate(now)
            if self.file is not None:
                if self.fname.endswith(date):
                    # no need to rotate.
                    return
                self.rotated()
                self.close()
            self.fname = os.path.join(self.location, "events-"+date)
            self.file = open(self.fname, 'a')
        finally:
            self.lock.release()

    def close(self):
        self.lock.acquire()
        try:
            if self.file is not None:
                self.file.close()
                self.file = None
        finally:
            self.lock.release()

    def clean(self, now=None, deleteFn=None):
        if now is None:
            now = time.time()
        self.lock.acquire()
        try:
            self.rotate(now)
            bad = []
            cutoff = previousMidnight(now) - 24*60*60*(self.HISTORY_DAYS+1)
            for fn in os.listdir(self.location):
                if not fn.startswith("events-"):
                    continue
                try:
                    date = parseFnameDate(fn[7:])
                except ValueError:
                    LOG.warn("Bad events filename %r; removing", fn)
                    bad.append(os.path.join(self.location, fn))
                    continue
                if date < cutoff:
                    LOG.debug("Removing expired events file %s", fn)
                    bad.append(os.path.join(self.location, fn))
            if deleteFn:
                deleteFn(bad)
            else:
                secureDelete(bad, blocking=1)
        finally:
            self.lock.release()

    def flush(self):
        self.lock.acquire()
        try:
            if self.file is not None:
                self.file.flush()
        finally:
            self.lock.release()

    def _write(self,*msg):
        self.lock.acquire()
        try:
            now = time.time()
            m = "%s %s\n" % (formatTime(now)," ".join(msg))
            self.file.write(m)
            self.file.flush()
            # XXXX self.events.append((now, msg))
        finally:
            self.lock.release()

    def startup(self):
        self._write("STARTUP")

    def shutdown(self):
        self._write("SHUTDOWN")

    def rotated(self):
        self._write("ROTATE")

    def connected(self, nickname):
        self._write("CONNECTED",nickname)

    def connectFailed(self, nickname):
        self._write("CONNECT_FAILED",nickname)

    def queuedPing(self, hash, path):
        self._write("PING",formatBase64(hash),path)

    def gotPing(self, hash):
        self._write("GOT_PING",formatBase64(hash))

    def heartbeat(self):
        self._write("STILL_UP")

    def _getAllFilenames(self):
        fns = [ fn for fn in os.listdir(self.location) if
                fn.startswith("events-") ]
        fns.sort()
        return fns

    def processPing(self, packet):#instanceof DeliveryPacket with type==ping
        assert packet.getExitType() != mixminion.Packet.PING_TYPE
        addr = packet.getAddress()
        if len(addr) != mixminion.Crypto.DIGEST_LEN:
            LOG.warn("Ignoring malformed ping packet (exitInfo length %s)",
                     len(packet.address))
            return
        if addr != mixminion.Crypto.sha1(packet.payload):
            LOG.warn("Received ping packet with corrupt payload; ignoring")
            return
        LOG.debug("Received valid ping packet [%s]",formatBase64(addr))
        self.gotPing(addr)

def iteratePingLog(file, func):
    _EVENT_ARGS = {
        "STARTUP" : 0,
        "SHUTDOWN" : 0,
        "ROTATE" : 0,
        "CONNECTED" : 1,
        "CONNECT_FAILED" : 1,
        "PING" : 2,
        "GOT_PING" : 1,
        "STILL_UP" : 0 }
    _PAT = re.compile(
        r"^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2}) (\w+ ?.*)")

    if hasattr(file,"xreadlines"):
        readLines = file.xreadlines
    else:
        readLines = file.readlines

    events = []
    for line in readLines():
        if line.startswith("#"):
            return
        m = _PAT.match(line)
        if not m:
            continue
        gr = m.groups()
        # parse time, event; make sure right # of args.
        tm = calendar.timegm(*gr[:6])
        event = tuple(gr[6].split())
        if _EVENT_ARGS.get(event[0]) != len(event)-1:
            continue
        func(tm, event)

def readEventLog(file):
    result = []
    iterateEventLog(file, lambda tm, event: result.append((tm,event)))
    return result

class PingStatus:
    def __init__(self):
        self.serverStatus = {} #"U"/"D",as-of
        self.serverUptime = {}
        self.serverDowntime = {}
        self.pendingPings = {} #hash64->sent,path
        self.lastEvent = None
    def addEvent(self, tm, event):
        eType = event[0]
        if eType == 'PING':
            self.pendingPings[event[1]] = tm, event[2]
        elif eType == 'GOT_PING':
            try:
                tSent, path = self.pendingPings[event[1]]
            except KeyError:
                # we didn't send it, or can't remember sending it.
                pass
            else:
                self.pingDone(path, tSent, tm)
        elif eType == 'CONNECTED':
            try:
                s, tLast = self.serverStatus[event[1]]
            except KeyError:
                self.serverStatus[event[1]] = ('U', tm)
            else:
                if s == 'D':
                    try:
                        self.serverDowntime[event[1]] += tm-tLast
                    except KeyError:
                        self.serverDowntime[event[1]] = tm-tLast
                    serverStatus[event[1]] = ('U', tm)
        elif eType == 'CONNECT_FAILED':
            try:
                s, tLast = self.serverStatus[event[1]]
            except KeyError:
                self.serverStatus[event[1]] = ('D', tm)
            else:
                if s == 'U':
                    try:
                        self.serverDowntime[event[1]] += tm-tLast
                    except KeyError:
                        self.serverDowntime[event[1]] = tm-tLast
                    serverStatus[event[1]] = ('U', tm)
        elif eType == 'SHUTDOWN':
            self.diedAt(tm)
        elif eType == 'STARTUP':
            if self.lastEvent:
                self.diedAt(self.lastEvent[0])

        self.lastEvent = (tm, event)

    def pingDone(self, path, queuedAt, receivedAt):
        servers = path.split(",")
        if len(servers) == 1:
            self.oneHopPingDone(servers[0], queuedAt, receivedAt)
        elif len(servers) == 2:
            self.twoHopPingDone(servers, queuedAt, receivedAt)
        else:
            pass # never made now.

    def oneHopPingDone(self, nickname, queuedAt, receivedAt):
        pass


    def diedAt(self, diedAt):
        for nickname, (status, tLast) in self.serverStatus.items():
            if status == 'U':
                m = self.serverUptime
            else:
                m = self.serverDowntime
            try:
                m[nickname] += diedAt-tLast
            except KeyError:
                m[nickname] = diedAt-tLast

        self.serverStatus = {}
        self.lastEvent = None

    def getNetworkStatus(self):
        nicknames = {}
        for n in self.serverUptime.keys(): nicknames[n]=1
        for n in self.serverDowntime.keys(): nicknames[n]=1


class PingGenerator:
    """DOCDOC"""
    #XXXX008 add abstract functions.
    def __init__(self, config):
        self.directory = None
        self.pingLog = None
        self.outgoingQueue = None
        self.myNickname = config['Server']['Nickname']

    def connect(self, directory, outgoingQueue, pingLog, keyring):
        pass

    def getFirstPingTime(self):
        return None

    def scheduleAllPings(self, now=None):
        pass

    def sendPings(self, now=None):
        pass

    def addLinkPadding(self, pkts):
        pass

    def _sendOnePing(self, path1, path2):
        assert path1 and path2
        assert path2[-1].getNickname() == self.myNickname
        p1 = self.directory.getPath(path1)
        p2 = self.directory.getPath(path2)
        verbose_path = ",".join([s.getNickname() for s in (p1+p2[:-1])])
        payload = mixminion.BuildMessage.buildRandomPayload()
        payloadHash = mixminion.Crypto.sha1(payload)
        packet = mixminion.BuildMessage.buildForwardPacket(
            payload, exitType=mixminion.Packet.PING_TYPE, exitInfo=payloadHash,
            path1=p1, path2=p2, suppressTag=1)
        addr = p1[0].getMMTPHostInfo()
        obj = mixminion.server.PacketHandler.RelayedPacket(addr, packet)
        LOG.debug("Pinger queueing ping along path %s [%s]",verbose_path,
                  formatBase64(payloadHash))
        self.pingLog.queuedPing(payloadHash, verbose_path)
        self.outgoingQueue.queueDeliveryMessage(obj, addr)

class _PingScheduler:
    def __init__(self):
        self.nextPingTime = {}#path->when
        # PERIOD
        # PING_INTERVAL
    def connect(self, directory, outgoingQueue, pingLog, keyring):
        self.directory = directory
        self.outgoingQueue = outgoingQueue
        self.pingLog = pingLog
        self.keyring = keyring
    def scheduleAllPings(self, now=None):
        raise NotImplemented()
    def _getPeriodStart(self, t):
        raise NotImplemented()
    def _schedulePing(self,path,now=None):
        if now is None: now = time.time()
        periodStart = _getPeriodStart(now)
        t = periodStart + self._getPerturbation(path, periodStart)
        t += self.PING_INTERVAL * ceilDiv(now-t, self.PING_INTERVAL)
        if t>periodStart+self.PERIOD:
            t = periodStart+self.PERIOD+self._getPerturbation(path,
                                                    periodStart+self.PERIOD)
        self.nextPingTime[path] = t
        LOG.trace("Scheduling %d-hop ping for %s at %s", len(path),
                  ",".join(path), formatTime(t,1))
        return t
    def _getPerturbation(self, path, periodStart):
        sha = mixminion.Crypto.sha1(",".join(path) + "@@" + str(day))
        sec = abs(struct.unpack("I", sha[:4])[0]) % self.PING_INTERVAL
        return sec

    def getFirstPingTime(self):
        if self.nextPingTime:
            return min(self.nextPingTime.values())
        else:
            return None


class OneHopPingGenerator(PingGenerator,_PingScheduler):
    """DOCDOC"""
    #XXXX008 make this configurable, but not less than 2 hours.
    PING_INTERVAL = 2*60*60
    PERIOD = 24*60*60
    def __init__(self, config):
        PingGenerator.__init__(self, config)
        _PingScheduler.__init__(self)

    def scheduleAllPings(self, now=None):
        if now is None: now = time.time()
        servers = self.directory.getAllServers()
        nicknames = {}
        for s in servers:
            nicknames[s.getNickname()]=1
        for n in nicknames.keys():
            self._schedulePing((n,),now)

    def _getPeriodStart(self, t):
        return previousMidnight(t)

    def sendPings(self, now=None):
        if now is None: now = time.time()
        servers = self.directory.getAllServers()
        nicknames = {}
        for s in servers:
            nicknames[s.getNickname()] = 1
        pingable = []
        for n in nicknames.keys():
            when = self.nextPingTime.get((n,))
            if when is None:
                # No ping scheduled; server must be new to directory.
                self._schedulePing((n,),now)
                continue
            elif when > now: # Not yet.
                continue
            else:
                # Time for a ping!
                pingable.append(n)
        myDescriptor = self.keyring.getCurrentDescriptor()
        for n in pingable:
            self._sendOnePing([n], [myDescriptor])
            self._schedulePing((n,), now+60)

class TwoHopPingGenerator:
    """DOCDOC"""
    #XXXX008 make this configurable, but not less than 2 hours.
    PING_INTERVAL = 7*24*60*60
    PERIOD = 7*24*60*60
    def __init__(self, config):
        PingGenerator.__init__(self, config)
        _PingScheduler.__init__(self)

    def scheduleAllPings(self, now=None):
        if now is None: now = time.time()
        servers = self.directory.getAllServers()
        nicknames = {}
        for s in servers:
            nicknames[s.getNickname()]=1
        for n1 in nicknames.keys():
            for n2 in nicknames.keys():
                self._schedulePing((n1,n2),now)

    def _getPeriodStart(self, t):
        return previousMidnight(t)

    def sendPings(self, now=None):
        if now is None: now = time.time()
        servers = self.directory.getAllServers()
        nicknames = {}
        for s in servers:
            nicknames[s.getNickname()] = 1
        pingable = []
        for n1 in nicknames.keys():
            for n2 in nicknames.keys():
                when = self.nextPingTime.get((n1,n2))
                if when is None:
                    # No ping scheduled; server must be new to directory.
                    self._schedulePing((n1,n2),now)
                    continue
                elif when > now: # Not yet.
                    continue
                else:
                    # Time for a ping!
                    pingable.append((n1,n2))
        myDescriptor = self.keyring.getCurrentDescriptor()
        for n1, n2 in pingable:
            self._sendOnePing([n1,n2], [myDescriptor])
            self._schedulePing((n1,n2), now+60)

#class GeometricLinkPaddingGenerator

class TestLinkPaddingGenerator(PingGenerator):
    """DOCDOC"""
    def __init__(self, config):
        PingGenerator.__init__(self,config)
        interval = config['Server']['MixInterval'].getSeconds()
        if interval < 60*60:
            self.prob = interval / float(60*60)
        else:
            self.prob = 1.0
    def connect(self, directory, outgoingQueue, pingLog, keyring):
        self.directory = directory
    def addLinkPadding(self, pkts):
        prng = mixminion.Crypto.getCommonPRNG()
        addressSet = {}
        for s in self.directory.getAllServers():
            addressSet[s.getRoutingInfo()]=1
        needPadding = []
        for addr in addressSet.keys():
            if not pkts.get(addr,None):
                if prng.getFloat() < self.prob:
                    LOG.debug("Pinger adding link-padding to test %s",
                            mixminion.ServerInfo.displayServerByRouting(addr))
                    needPadding.append(addr)
        for addr in needPadding:
            padding = mixminion.server.MMTPServer.LinkPadding()
            pkts.setdefault(addr,[]).append(padding)

class CompoundPingGenerator:
    def __init__(self, generators):
        self.gens = generators[:]
    def connect(self, directory, outgoingQueue, pingLog, keyring):
        assert directory
        assert outgoingQueue
        assert pingLog
        assert keyring
        for g in self.gens:
            g.connect(directory, outgoingQueue, pingLog, keyring)
    def scheduleAllPings(self, now=None):
        for g in self.gens:
            g.scheduleAllPings(now)
    def getFirstPingTime(self):
        times = []
        for g in self.gens:
            t = g.getFirstPingTime()
            if t is not None:
                times.append(t)
        if times:
            return min(times)
        else:
            return None
    def sendPings(self,now=None):
        for g in self.gens:
            g.sendPings()
        return self.getFirstPingTime()
    def addLinkPadding(self,pkts):
        for g in self.gens:
            g.addLinkPadding(pkts)

def getPingGenerator(config):
    """DOCDOC"""
    if not config['Pinging'].get('Enabled'):
        return CompoundPingGenerator([])
    pingers = []
    pingers.append(OneHopPingGenerator(config))
    pingers.append(TestLinkPaddingGenerator(config))
    return CompoundPingGenerator(pingers)

Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.56
retrieving revision 1.57
diff -u -d -r1.56 -r1.57
--- ServerConfig.py	2 May 2004 18:45:16 -0000	1.56
+++ ServerConfig.py	27 Jul 2004 04:33:20 -0000	1.57
@@ -357,6 +357,7 @@
                      'MaxBandwidth' : ('ALLOW', "size", None),
                      'MaxBandwidthSpike' : ('ALLOW', "size", None),
                      },
+        'Pinging' : { 'Enabled' : ('ALLOW', 'boolean', 'yes') },
         'DirectoryServers' : { # '__SECTION__' : ('REQUIRE', None, None),
                                'ServerURL' : ('ALLOW*', None, None),
                                'PublishURL' : ('ALLOW*', None, None),

Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.66
retrieving revision 1.67
diff -u -d -r1.66 -r1.67
--- ServerKeys.py	6 Mar 2004 00:04:38 -0000	1.66
+++ ServerKeys.py	27 Jul 2004 04:33:20 -0000	1.67
@@ -33,7 +33,7 @@
 from mixminion.Common import AtomicFile, LOG, MixError, MixFatalError, \
      ceilDiv, createPrivateDir, checkPrivateFile, englishSequence, \
      formatBase64, formatDate, formatTime, previousMidnight, readFile, \
-     secureDelete, tryUnlink, UIError, writeFile
+     replaceFile, secureDelete, tryUnlink, UIError, writeFile
 from mixminion.Config import ConfigError
 
 #----------------------------------------------------------------------
@@ -54,6 +54,16 @@
 #FFFF Make this configurable
 DIRECTORY_UPLOAD_URL = "http://mixminion.net/minion-cgi/publish";
 
+
+# We have our X509 certificate set to expire a bit after public key does,
+# so that slightly-skewed clients don't incorrectly give up while trying to
+# connect to us.  (And so that we don't mess up the world while being
+# slightly skewed.)
+CERTIFICATE_EXPIRY_SLOPPINESS = 2*60*60
+
+# DOCDOC
+CERTIFICATE_LIFETIME = 24*60*60
+
 #----------------------------------------------------------------------
 class ServerKeyring:
     """A ServerKeyring remembers current and future keys, descriptors, and
@@ -86,9 +96,13 @@
         self.keyDir = config.getKeyDir()
         self.hashDir = os.path.join(config.getWorkDir(), 'hashlogs')
         self.dhFile = os.path.join(config.getWorkDir(), 'tls', 'dhparam')
+        self.certFile = os.path.join(config.getWorkDir(), "cert_chain")
         self.keyOverlap = config['Server']['PublicKeyOverlap'].getSeconds()
+        self.nickname = config['Server']['Nickname'] #DOCDOC
         self.nextUpdate = None
         self.currentKeys = None
+        self._tlsContext = None #DOCDOC
+        self._tlsContextExpires = -1 #DOCDOC
         self.checkKeys()
 
     def checkKeys(self):
@@ -452,16 +466,44 @@
 
         return self.dhFile
 
-    def _getTLSContext(self, keys=None):
-        """Create and return a TLS context from the currently live key."""
-        if keys is None:
-            keys = self.getServerKeysets()[-1]
-        return mixminion._minionlib.TLSContext_new(keys.getCertFileName(),
-                                                   keys.getMMTPKey(),
-                                                   self._getDHFile())
+    def _newTLSContext(self, now=None):
+        """Create and return a TLS context."""
+        if now is None:
+            now = time.time()
+        mmtpKey = mixminion.Crypto.pk_generate(MMTP_KEY_BYTES*8)
 
-    def updateKeys(self, packetHandler, mmtpServer, statusFile=None,when=None):
-        """Update the keys and certificates stored in a PacketHandler and an
+        certStarts = now - CERTIFICATE_EXPIRY_SLOPPINESS
+        expires = now + CERTIFICATE_LIFETIME
+        certEnds = now + CERTIFICATE_LIFETIME + CERTIFICATE_EXPIRY_SLOPPINESS
+
+        tmpName = self.certFile + "_tmp"
+        generateCertChain(tmpName, mmtpKey, self.getIdentityKey(),
+                          self.nickname, certStarts, certEnds)
+        replaceFile(tmpName, self.certFile)
+
+        self._tlsContext = (
+                    mixminion._minionlib.TLSContext_new(self.certFile,
+                                                        mmtpKey,
+                                                        self._getDHFile()))
+        self._tlsContextExpires = expires
+        return self._tlsContext
+
+    def _getTLSContext(self, force=0, now=None):
+        if now is None:
+            now = time.time()
+        if force or self._tlsContext is None or self._tlsContextExpires < now:
+            return self._newTLSContext(now=now)
+        else:
+            return self._tlsContext
+
+    def updateMMTPServerTLSContext(self,mmtpServer,force=0,now=None):
+        """DOCDOC"""
+        context = self._getTLSContext(force=force,now=now)
+        mmtpServer.setServerContext(context)
+        return self._tlsContextExpires
+
+    def updateKeys(self, packetHandler, statusFile=None,when=None):
+        """Update the keys stored in a PacketHandler,
            MMTPServer object, so that they contain the currently correct
            keys.  Also removes any dead keys.
 
@@ -475,12 +517,6 @@
         LOG.info("Updating keys: %s currently valid (%s); %s expired (%s)",
                  len(keys), " ".join(keyNames),
                  len(deadKeys), " ".join(deadKeyNames))
-        if mmtpServer is not None:
-            LOG.trace("Using TLS cert from %s: good from %s to %s",
-                      keyNames[-1], formatDate(keys[-1].validAfter),
-                      formatDate(keys[-1].validUntil))
-            context = self._getTLSContext(keys[-1])
-            mmtpServer.setServerContext(context)
         if packetHandler is not None:
             packetKeys = []
             hashLogs = []
@@ -547,12 +583,28 @@
 
     def getAddress(self):
         """Return out current ip/port/keyid tuple"""
-        keys = self.getServerKeysets()[0]
-        desc = keys.getServerDescriptor()
+        desc = self.getCurrentDescriptor()
         return (desc['Incoming/MMTP']['IP'],
                 desc['Incoming/MMTP']['Port'],
                 desc.getKeyDigest())
 
+    def getCurrentDescriptor(self, now=None):
+        """DOCDOC"""
+        self._lock.acquire()
+        if now is None:
+            now = time.time()
+        try:
+            keysets = self.getServerKeysets()
+            for k in keysets:
+                va,vu = k.getLiveness()
+                if va <= now <= vu:
+                    return k.getServerDescriptor()
+
+            LOG.warn("getCurrentDescriptor: no live keysets??")
+            return self.getServerKeysets()[-1].getServerDescriptor()
+        finally:
+            self._lock.release()
+
     def lock(self, blocking=1):
         return self._lock.acquire(blocking)
 
@@ -577,7 +629,6 @@
     # keydir: Directory to store this keyset's data.
     # hashlogFile: filename of this keyset's hashlog.
     # packetKeyFile, mmtpKeyFile: filename of this keyset's short-term keys
-    # certFile: filename of this keyset's X509 certificate
     # descFile: filename of this keyset's server descriptor.
     # publishedFile: filename to store this server's publication time.
     #
@@ -599,6 +650,10 @@
         self.packetKeyFile = os.path.join(keydir, "mix.key")
         self.mmtpKeyFile = os.path.join(keydir, "mmtp.key")
         self.certFile = os.path.join(keydir, "mmtp.cert")
+        if os.path.exists(self.mmtpKeyFile):
+            secureDelete(self.mmtpKeyFile)
+        if os.path.exists(self.certFile):
+            secureDelete(self.certFile)
         self.descFile = os.path.join(keydir, "ServerDesc")
         self.publishedFile = os.path.join(keydir, "published")
         self.serverinfo = None
@@ -611,9 +666,6 @@
     def delete(self):
         """Remove this keyset from disk."""
         files = [self.packetKeyFile,
-                 self.mmtpKeyFile,
-                 self.certFile,
-                 self.certFile+"_tmp",
                  self.descFile,
                  self.publishedFile,
                  self.hashlogFile ]
@@ -625,7 +677,6 @@
     def checkKeys(self):
         """Check whether all the required keys exist and are private."""
         checkPrivateFile(self.packetKeyFile)
-        checkPrivateFile(self.mmtpKeyFile)
 
     def load(self, password=None):
         """Read the short-term keys from disk.  Must be called before
@@ -633,24 +684,18 @@
         self.checkKeys()
         self.packetKey = mixminion.Crypto.pk_PEM_load(self.packetKeyFile,
                                                       password)
-        self.mmtpKey = mixminion.Crypto.pk_PEM_load(self.mmtpKeyFile,
-                                                    password)
     def save(self, password=None):
         """Save this set of keys to disk."""
         mixminion.Crypto.pk_PEM_save(self.packetKey, self.packetKeyFile,
                                      password)
-        mixminion.Crypto.pk_PEM_save(self.mmtpKey, self.mmtpKeyFile,
-                                     password)
 
     def clear(self):
         """Stop holding the keys in memory."""
-        self.packetKey = self.mmtpKey = None
+        self.packetKey = None
 
-    def getCertFileName(self): return self.certFile
     def getHashLogFileName(self): return self.hashlogFile
     def getDescriptorFileName(self): return self.descFile
     def getPacketKey(self): return self.packetKey
-    def getMMTPKey(self): return self.mmtpKey
     def getPacketKeyID(self):
         "Return the sha1 hash of the asn1 encoding of the packet public key"
         return mixminion.Crypto.sha1(self.packetKey.encode_key(1))
@@ -889,11 +934,6 @@
 #----------------------------------------------------------------------
 # Functionality to generate keys and server descriptors
 
-# We have our X509 certificate set to expire a bit after public key does,
-# so that slightly-skewed clients don't incorrectly give up while trying to
-# connect to us.
-CERTIFICATE_EXPIRY_SLOPPINESS = 5*60
-
 def generateServerDescriptorAndKeys(config, identityKey, keydir, keyname,
                                     hashdir, validAt=None, now=None,
                                     useServerKeys=0, validUntil=None):
@@ -915,17 +955,14 @@
         serverKeys = ServerKeyset(keydir, keyname, hashdir)
         serverKeys.load()
         packetKey = serverKeys.packetKey
-        mmtpKey = serverKeys.mmtpKey # not used
     else:
         # First, we generate both of our short-term keys...
         packetKey = mixminion.Crypto.pk_generate(PACKET_KEY_BYTES*8)
-        mmtpKey = mixminion.Crypto.pk_generate(MMTP_KEY_BYTES*8)
 
         # ...and save them to disk, setting up our directory structure while
         # we're at it.
         serverKeys = ServerKeyset(keydir, keyname, hashdir)
         serverKeys.packetKey = packetKey
-        serverKeys.mmtpKey = mmtpKey
         serverKeys.save()
 
     # FFFF unused
@@ -952,13 +989,6 @@
     if not validUntil:
         keyLifetime = config['Server']['PublicKeyLifetime'].getSeconds()
         validUntil = previousMidnight(validAt + keyLifetime + 30)
-    certStarts = validAt - CERTIFICATE_EXPIRY_SLOPPINESS
-    certEnds = validUntil + CERTIFICATE_EXPIRY_SLOPPINESS
-
-    # Create the X509 certificates in any case, in case one of the parameters
-    # has changed.
-    generateCertChain(serverKeys.getCertFileName(),
-                      mmtpKey, identityKey, nickname, certStarts, certEnds)
 
     mmtpProtocolsIn = mixminion.server.MMTPServer.MMTPServerConnection \
                       .PROTOCOL_VERSIONS[:]
@@ -1249,3 +1279,4 @@
 
     return "Mixminion %s; Python %r on %r" % (
         mixminion.__version__, sys.version, uname)
+

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.130
retrieving revision 1.131
diff -u -d -r1.130 -r1.131
--- ServerMain.py	17 May 2004 21:25:27 -0000	1.130
+++ ServerMain.py	27 Jul 2004 04:33:20 -0000	1.131
@@ -21,6 +21,7 @@
 #                          ...
 #               stats.tmp         [Cache of stats from latest period]
 #               dir/...           [Directory dowloaded from directory server.]
+#               pinger/log/       [DOCDOC]
 #
 #    QUEUEDIR defaults to ${WORKDIR}/queues
 #    ${QUEUEDIR}/incoming/        [Queue of received,unprocessed pkts]
@@ -64,6 +65,7 @@
 import mixminion.server.MMTPServer
 import mixminion.server.Modules
 import mixminion.server.PacketHandler
+import mixminion.server.Pinger
 import mixminion.server.ServerQueue
 import mixminion.server.ServerConfig
 import mixminion.server.ServerKeys
@@ -156,6 +158,7 @@
         mixminion.Filestore.StringStore.__init__(self, location, create=1)
         self.packetHandler = packetHandler
         self.mixPool = None
+        self.pingLog = None#DOCDOC
 
     def connectQueues(self, mixPool, processingThread):
         """Sets the target mix queue"""
@@ -166,6 +169,10 @@
             self.processingThread.addJob(
                 lambda self=self, h=h: self.__deliverPacket(h))
 
+    def setPingLog(self, pingLog):
+        "DOCDOC"
+        self.pingLog = pingLog
+
     def queuePacket(self, pkt):
         """Add a packet for delivery"""
         h = mixminion.Filestore.StringStore.queueMessage(self, pkt)
@@ -192,7 +199,17 @@
                 self.removeMessage(handle)
             else:
                 if res.isDelivery():
-                    res.decode()
+                    if res.getExitType() == mixminion.Packet.PING_TYPE:
+                        LOG.debug("Ping packet IN:%s decoded", handle)
+                        if self.pingLog is not None:
+                            self.pingLog.processPing(res)
+                        else:
+                            LOG.debug("Pinging not enabled; discarding packet")
+                        self.removeMessage(handle)
+                        return
+                    else:
+                        #XXXX008 defer decoding to module; don't do it here.
+                        res.decode()
 
                 self.mixPool.queueObject(res)
                 self.removeMessage(handle)
@@ -325,6 +342,7 @@
         mixminion.server.ServerQueue.PerAddressDeliveryQueue.__init__(self, location)
         self.server = None
         self.incomingQueue = None
+        self.pingGenerator = None
         self.keyID = keyid
 
     def configure(self, config):
@@ -332,12 +350,13 @@
         retry = config['Outgoing/MMTP']['Retry']
         self.setRetrySchedule(retry)
 
-    def connectQueues(self, server, incoming):
+    def connectQueues(self, server, incoming, pingGenerator):
         """Set the MMTPServer and IncomingQueue that this
            OutgoingQueue informs of its deliverable packets."""
 
         self.server = server
         self.incomingQueue = incoming
+        self.pingGenerator = pingGenerator#DOCDOC
 
     def _deliverMessages(self, msgList):
         "Implementation of abstract method from DeliveryQueue."
@@ -351,6 +370,8 @@
             except mixminion.Filestore.CorruptedFile:
                 continue
             pkts.setdefault(addr, []).append(pending)
+
+        deliverable = {}
         for routing, packets in pkts.items():
             if self.keyID == routing.keyinfo:
                 for pending in packets:
@@ -361,13 +382,18 @@
                     pending.succeeded()
                 continue
 
-            deliverable = [
+            deliverable[routing] = [
                 mixminion.server.MMTPServer.DeliverablePacket(pending)
                 for pending in packets ]
             LOG.trace("Delivering packets OUT:[%s] to %s",
                       " ".join([p.getHandle() for p in packets]),
                       mixminion.ServerInfo.displayServerByRouting(routing))
-            self.server.sendPacketsByRouting(routing, deliverable)
+
+        if self.pingGenerator is not None:
+            self.pingGenerator.addLinkPadding(deliverable)
+
+        for routing, packets in deliverable.items():
+            self.server.sendPacketsByRouting(routing, packets)
 
 class _MMTPServer(mixminion.server.MMTPServer.MMTPAsyncServer):
     """Implementation of mixminion.server.MMTPServer that knows about
@@ -519,112 +545,190 @@
     signal.signal(signal.SIGTERM, _sigTermHandler)
 
 #----------------------------------------------------------------------
+# Functions to schedule periodic events.  Invocation times are approximate,
+# and not accurate to within more than a minute or two.
+
+class ScheduledEvent:
+    """Abstract base class for a scheduleable event."""
+    def getNextTime(self):
+        """Return the next time when this event should be called.  Return
+           -1 for 'never' and 'None' for 'currently unknown'.
+        """
+        raise NotImplementedError("getNextTime")
+    def __call__(self):
+        """Invoke this event."""
+        raise NotImplementedError("__call__")
+
+class OneTimeEvent:
+    """An event that will be called exactly once."""
+    def __init__(self, when, func):
+        """Create an event to call func() at the time 'when'."""
+        self.when = when
+        self.func = func
+    def getNextTime(self):
+        return self.when
+    def __call__(self):
+        self.func()
+        self.when = -1
+
+class RecurringEvent:
+    """An event that will be called at regular intervals."""
+    def __init__(self, when, func, repeat):
+        """Create an event to call func() at the time 'when', and every
+           'repeat' seconds thereafter."""
+        self.when = when
+        self.func = func
+        self.repeat = repeat
+    def getNextTime(self):
+        return self.when
+    def __call__(self):
+        try:
+            self.func()
+        finally:
+            self.when += self.repeat
+
+class RecurringComplexEvent(RecurringEvent):
+    """An event that will be called at irregular intervals."""
+    def __init__(self, when, func):
+        """Create an event to invoke func() at time 'when'.  func() must
+           return -1 for 'do not call again', or a time when it should next
+           be called."""
+        RecurringEvent.__init__(self, when, func, None)
+    def __call__(self):
+        self.when = self.func()
+
+class RecurringBackgroundEvent:
+    """An event that will be called at regular intervals, and scheduled
+       as a background job.  Does not reschedule the event while it is
+       already in progress."""
+    def __init__(self, when, scheduleJob, func, repeat):
+        """Create an event to invoke 'func' at time 'when' and every
+           'repeat' seconds thereafter.   The function 'scheduleJob' will
+           be invoked with a single callable object in order to run that
+           callable in the background.
+        """
+        self.when = when
+        self.scheduleJob = scheduleJob
+        self.func = func
+        self.repeat = repeat
+        self.running = 0
+        self.lock = threading.Lock()
+    def getNextTime(self):
+        self.lock.acquire()
+        try:
+            if self.running:
+                return None
+            else:
+                return self.when
+        finally:
+            self.lock.release()
+    def __call__(self):
+        self.lock.acquire()
+        try:
+            if self.running:
+                return
+            self.running = 1
+        finally:
+            self.lock.release()
+
+        self.scheduleJob(self._background)
+    def _background(self):
+        """Helper function: this one is actually invoked by the background
+           thread."""
+        self.func()
+        self.lock.acquire()
+        try:
+            now = time.time()
+            while self.when < now:
+                self.when += self.repeat
+            self.running = 0
+        finally:
+            self.lock.release()
+
+class RecurringComplexBackgroundEvent(RecurringBackgroundEvent):
+    """An event to run a job at irregular intervals in the background."""
+    def __init__(self, when, scheduleJob, func):
+        """Create an event to invoke 'func' at time 'when'.  func() must
+           return -1 for 'do not call again', or a time when it should next
+           be called.
+
+           The function 'scheduleJob' will be invoked with a single
+           callable object in order to run that callable in the
+           background.
+        """
+        RecurringBackgroundEvent.__init__(self, when, scheduleJob, func, None)
+    def _background(self):
+        next = self.func()
+        self.lock.acquire()
+        try:
+            self.when = next
+            self.running = 0
+        finally:
+            self.lock.release()
 
 class _Scheduler:
-    """Mixin class for server.  Implements a priority queue of ongoing,
-       scheduled tasks with a loose (few seconds) granularity.
-    """
-    # Fields:
-    #   scheduledEvents: list of (time, identifying-string, callable)
-    #       Sorted by time.  We could use a heap here instead, but
-    #       that doesn't turn into a net benefit until we have a hundred
-    #       events or so.
+    """Base class: used to run a bunch of events periodically."""
+    ##Fields:
+    # scheduledEvents: a list of ScheduledEvent objects.
+    # schedLock: a threading.RLock object to protect the list scheduledEvents
+    #   (but not the events themselves).
+    #XXXX008 needs more tests
     def __init__(self):
-        """Create a new _Scheduler"""
+        """Create a new scheduler."""
         self.scheduledEvents = []
         self.schedLock = threading.RLock()
-
     def firstEventTime(self):
-        """Return the time at which the earliest-scheduled event is
-           supposed to occur.  Returns -1 if no events.
-        """
+        """Return the time at which an event will first occur."""
         self.schedLock.acquire()
         try:
-            if self.scheduledEvents:
-                return self.scheduledEvents[0][0]
-            else:
+            if not self.scheduledEvents:
                 return -1
+            first = 0
+            for e in self.scheduledEvents:
+                t = e.getNextTime()
+                if t in (-1,None): continue
+                if not first or t < first:
+                    first = t
+            return first
         finally:
             self.schedLock.release()
 
-    def scheduleOnce(self, when, name, cb):
-        """Schedule a callback function, 'cb', to be invoked at time 'when.'
-        """
-        assert type(name) is StringType
-        assert type(when) in (IntType, LongType, FloatType)
+    def scheduleEvent(self, event):
+        """Add a ScheduledEvent to this scheduler"""
+        when = event.getNextTime()
+        if when == -1:
+            return
+        self.schedLock.acquire()
         try:
-            self.schedLock.acquire()
-            insort(self.scheduledEvents, (when, name, cb))
+            self.scheduledEvents.append(event)
         finally:
-            self.schedLock.release()
+            self.schedLock.acquire()
+
+    #XXXX008 -- these are only used for testing.
+    def scheduleOnce(self, when, name, cb):
+        self.scheduleEvent(OneTimeEvent(when,cb))
 
     def scheduleRecurring(self, first, interval, name, cb):
-        """Schedule a callback function 'cb' to be invoked at time 'first,'
-           and every 'interval' seconds thereafter.
-        """
-        assert type(name) is StringType
-        assert type(first) in (IntType, LongType, FloatType)
-        assert type(interval) in (IntType, LongType, FloatType)
-        def cbWrapper(cb=cb, interval=interval):
-            cb()
-            return time.time()+interval
-        self.scheduleRecurringComplex(first,name,cbWrapper)
+        self.scheduleEvent(RecurringEvent(first, cb, interval))
 
     def scheduleRecurringComplex(self, first, name, cb):
-        """Schedule a callback function 'cb' to be invoked at time 'first,'
-           and thereafter at times returned by 'cb'.
-
-           (Every time the callback is invoked, if it returns a non-None value,
-           the event is rescheduled for the time it returns.)
-        """
-        assert type(name) is StringType
-        assert type(first) in (IntType, LongType, FloatType)
-        self.scheduleOnce(first, name, _RecurringEvent(name, cb, self))
+        self.scheduleEvent(RecurringComplexEvent(first, cb))
 
     def processEvents(self, now=None):
-        """Run all events that are scheduled to occur before 'now'.
-
-           Note: if an event reschedules itself for a time _before_ now,
-           it will only be run once per invocation of processEvents.
-
-           The right way to run this class is something like:
-               while 1:
-                   interval = time.time() - scheduler.firstEventTime()
-                   if interval > 0:
-                       time.sleep(interval)
-                       # or maybe, select.select(...,...,...,interval)
-                   scheduler.processEvents()
-        """
-        if now is None: now = time.time()
+        """Run all events that need to get called at the time 'now'."""
+        if now is None:
+            now = time.time()
         self.schedLock.acquire()
         try:
-            se = self.scheduledEvents
-            cbs = []
-            while se and se[0][0] <= now:
-                cbs.append(se[0][2])
-                del se[0]
+            events = [(e.getNextTime(),e) for e in self.scheduledEvents]
+            self.scheduledEvents = [e for t,e in events if t != -1]
+            runnable = [(t,e) for t,e in events
+                        if t not in (-1,None) and t <= now]
         finally:
             self.schedLock.release()
-        for cb in cbs:
-            cb()
-
-class _RecurringEvent:
-    """helper for _Scheduler. Calls a callback, then reschedules it."""
-    def __init__(self, name, cb, scheduler):
-        self.name = name
-        self.cb = cb
-        self.scheduler = scheduler
-
-    def __call__(self):
-        nextTime = self.cb()
-        if nextTime is None:
-            LOG.warn("Not rescheduling %s", self.name)
-            return
-        elif nextTime < time.time():
-            raise MixFatalError("Tried to schedule event %s in the past! (%s)"
-                                %(self.name, formatTime(nextTime,1)))
-
-        self.scheduler.scheduleOnce(nextTime, self.name, self)
+        runnable.sort()
+        for _,e in runnable:
+            e()
 
 class MixminionServer(_Scheduler):
     """Wraps and drives all the queues, and the async net server.  Handles
@@ -654,6 +758,7 @@
     #    running in the same directory.  The filename for this lock is
     #    stored in self.pidFile.
     # pidFile: Filename in which we store the pid of the running server.
+    # pingLog: DOCDOC
     def __init__(self, config):
         """Create a new server from a ServerConfig."""
         _Scheduler.__init__(self)
@@ -720,8 +825,9 @@
         self.mmtpServer = _MMTPServer(config, None)
         LOG.debug("Initializing keys")
         self.descriptorFile = os.path.join(homeDir, "current-desc")
-        self.keyring.updateKeys(self.packetHandler, self.mmtpServer,
+        self.keyring.updateKeys(self.packetHandler,
                                 self.descriptorFile)
+        self.keyring.updateMMTPServerTLSContext(self.mmtpServer)
         LOG.debug("Initializing directory client")
         self.dirClient = mixminion.ClientDirectory.ClientDirectory(config)
         try:
@@ -766,6 +872,22 @@
         LOG.debug("Found %d pending packets in outgoing queue",
                        self.outgoingQueue.count())
 
+        pingerEnabled = config['Pinging'].get("Enabled")
+        if pingerEnabled:
+            #FFFF Later, enable this stuff anyway, to make R-G-B mixing work.
+            LOG.debug("Initializing ping log")
+            pingerDir = os.path.join(config.getWorkDir(), "pinger")
+            pingerLogDir = os.path.join(pingerDir, "log")
+            self.pingLog = mixminion.server.Pinger.PingLog(pingerLogDir)
+            self.pingLog.startup()
+
+            LOG.debug("Initializing ping generator")
+
+            self.pingGenerator=mixminion.server.Pinger.getPingGenerator(config)
+        else:
+            self.pingLog = None
+            self.pingGenerator = None
+
         self.cleaningThread = CleaningThread()
         self.processingThread = ProcessingThread()
 
@@ -777,10 +899,22 @@
         self.mixPool.connectQueues(outgoing=self.outgoingQueue,
                                    manager=self.moduleManager)
         self.outgoingQueue.connectQueues(server=self.mmtpServer,
-                                         incoming=self.incomingQueue)
+                                         incoming=self.incomingQueue,
+                                         pingGenerator=self.pingGenerator)
         self.mmtpServer.connectQueues(incoming=self.incomingQueue,
                                       outgoing=self.outgoingQueue)
         self.mmtpServer.connectDNSCache(self.dnsCache)
+        if self.pingGenerator is not None:
+            assert self.pingLog
+            self.pingGenerator.connect(directory=self.dirClient,
+                                       outgoingQueue=self.outgoingQueue,
+                                       pingLog=self.pingLog,
+                                       keyring=self.keyring)
+            self.pingGenerator.scheduleAllPings(time.time())
+
+        if self.pingLog is not None:
+            self.incomingQueue.setPingLog(self.pingLog)
+            self.mmtpServer.connectPingLog(self.pingLog)
 
         self.cleaningThread.start()
         self.processingThread.start()
@@ -802,7 +936,7 @@
             return time.time() + 120
 
         try:
-            self.keyring.updateKeys(self.packetHandler, self.mmtpServer,
+            self.keyring.updateKeys(self.packetHandler,
                                     self.descriptorFile)
             return self.keyring.getNextKeyRotation()
         finally:
@@ -810,49 +944,35 @@
 
     def generateKeys(self):
         """Callback used to schedule key-generation"""
-
-        # We generate and publish keys in the processing thread, so we don't
-        # slow down the server.  We also reschedule from the processing thread,
-        # so that we can take the new keyset into account when calculating
-        # when keys are next needed.
-
-        def c(self=self):
-            try:
-                self.keyring.lock()
-                self.keyring.createKeysAsNeeded()
-                self.updateKeys(lock=0)
-                if self.config['DirectoryServers'].get('Publish'):
-                    self.keyring.publishKeys()
-                self.scheduleOnce(self.keyring.getNextKeygen(),
-                                  "KEY_GEN",
-                                  self.generateKeys)
-            finally:
-                self.keyring.unlock()
-
-        self.processingThread.addJob(c)
+        try:
+            self.keyring.lock()
+            self.keyring.createKeysAsNeeded()
+            self.updateKeys(lock=0)
+            if self.config['DirectoryServers'].get('Publish'):
+                self.keyring.publishKeys()
+            return self.keyring.getNextKeygen()
+        finally:
+            self.keyring.unlock()
 
     def updateDirectoryClient(self):
-        def c(self=self):
-            try:
-                self.dirClient.update()
-                nextUpdate = succeedingMidnight(time.time()+30)
-                prng = mixminion.Crypto.getCommonPRNG()
-                # Randomly retrieve the directory within an hour after
-                # midnight, to avoid hosing the server.
-                nextUpdate += prng.getInt(60)*60
-            except mixminion.ClientDirectory.GotInvalidDirectoryError, e:
-                LOG.warn(str(e))
-                LOG.warn("    I'll try again in an hour.")
-                nextUpdate = min(succeedingMidnight(time.time()+30),
-                                 time.time()+3600)
-            except UIError, e:#XXXX008 This should really be a new exception
-                LOG.warn(str(e))
-                LOG.warn("    I'll try again in an hour.")
-                nextUpdate = min(succeedingMidnight(time.time()+30),
-                                 time.time()+3600)
-            self.scheduleOnce(nextUpdate, "UPDATE_DIR_CLIENT",
-                              self.updateDirectoryClient)
-        self.processingThread.addJob(c)
+        try:
+            self.dirClient.update()
+            nextUpdate = succeedingMidnight(time.time()+30)
+            prng = mixminion.Crypto.getCommonPRNG()
+            # Randomly retrieve the directory within an hour after
+            # midnight, to avoid hosing the server.
+            nextUpdate += prng.getInt(60)*60
+        except mixminion.ClientDirectory.GotInvalidDirectoryError, e:
+            LOG.warn(str(e))
+            LOG.warn("    I'll try again in an hour.")
+            nextUpdate = min(succeedingMidnight(time.time()+30),
+                             time.time()+3600)
+        except UIError, e:#XXXX008 This should really be a new exception
+            LOG.warn(str(e))
+            LOG.warn("    I'll try again in an hour.")
+            nextUpdate = min(succeedingMidnight(time.time()+30),
+                             time.time()+3600)
+        return nextUpdate
 
     def run(self):
         """Run the server; don't return unless we hit an exception."""
@@ -864,43 +984,68 @@
         self.cleanQueues()
 
         now = time.time()
-        self.scheduleRecurring(now+600, 600, "SHRED", self.cleanQueues)
-        self.scheduleRecurring(now+180, 180, "WAIT",
-                               lambda: waitForChildren(blocking=0))
+        self.scheduleEvent(RecurringEvent(now+600, self.cleanQueues, 600))
+        self.scheduleEvent(RecurringEvent(now+180,
+                                     lambda: waitForChildren(blocking=0),
+                                     180))
         if EventStats.log.getNextRotation():
-            self.scheduleRecurring(now+300, 300, "ES_SAVE",
-                                   lambda: EventStats.log.save)
             def _rotateStats():
                 EventStats.log.rotate()
                 return EventStats.log.getNextRotation()
-            self.scheduleRecurringComplex(EventStats.log.getNextRotation(),
-                                          "ES_ROTATE",
-                                          _rotateStats)
+            self.scheduleEvent(RecurringEvent(now+300,
+                                           lambda: EventStats.log.save, 300))
+            self.scheduleEvent(RecurringComplexEvent(
+                EventStats.log.getNextRotation(),
+                _rotateStats))
 
         def _tryTimeout(self=self):
             self.mmtpServer.tryTimeout()
             self.dnsCache.cleanCache()
             return self.mmtpServer.getNextTimeoutTime()
 
-        self.scheduleRecurringComplex(self.mmtpServer.getNextTimeoutTime(now),
-                                      "TIMEOUT",
-                                      _tryTimeout)
+        self.scheduleEvent(RecurringComplexEvent(
+            self.mmtpServer.getNextTimeoutTime(now),
+            _tryTimeout))
 
-        self.scheduleRecurringComplex(self.keyring.getNextKeyRotation(),
-                                      "KEY_ROTATE",
-                                      self.updateKeys)
+        self.scheduleEvent(RecurringComplexEvent(
+            self.keyring.getNextKeyRotation(),
+            self.updateKeys))
 
-        self.scheduleOnce(self.keyring.getNextKeygen(),
-                          "KEY_GEN",
-                          self.generateKeys)
+        self.scheduleEvent(RecurringBackgroundEvent(
+            self.keyring._tlsContextExpires,
+            self.processingThread.addJob,
+            lambda self=self: self.keyring.updateMMTPServerTLSContext(
+                                                self.mmtpServer,
+                                                force=1),
+            mixminion.server.ServerKeys.CERTIFICATE_LIFETIME))
+
+        self.scheduleEvent(RecurringComplexBackgroundEvent(
+            self.keyring.getNextKeygen(),
+            self.processingThread.addJob,
+            self.generateKeys))
+
+        if self.pingGenerator is not None:
+            self.scheduleEvent(RecurringComplexBackgroundEvent(
+                self.pingGenerator.getFirstPingTime(),
+                self.processingThread.addJob,
+                self.pingGenerator.sendPings))
+        if self.pingLog is not None:
+            self.scheduleEvent(RecurringEvent(
+                now+self.pingLog.HEARTBEAT_INTERVAL,
+                self.pingLog.heartbeat,
+                self.pingLog.HEARTBEAT_INTERVAL))
 
         # Makes next update get scheduled.
-        self.updateDirectoryClient()
+        nextUpdate = self.updateDirectoryClient()
+        self.scheduleEvent(RecurringComplexBackgroundEvent(
+            nextUpdate,
+            self.processingThread.addJob,
+            self.updateDirectoryClient))
 
         nextMix = self.mixPool.getNextMixTime(now)
         LOG.debug("First mix at %s", formatTime(nextMix,1))
-        self.scheduleRecurringComplex(self.mixPool.getNextMixTime(now),
-                                      "MIX", self.doMix)
+        self.scheduleEvent(RecurringComplexEvent(
+            self.mixPool.getNextMixTime(now), self.doMix))
 
         LOG.info("Entering main loop: Mixminion %s", mixminion.__version__)
 
@@ -910,15 +1055,16 @@
         if self.config['Server'].get("Daemon",1):
             closeUnusedFDs()
 
+        SCHEDULE_INTERVAL = 60
+        TICK_INTERVAL = self.mmtpServer.TICK_INTERVAL
         while 1:
-            nextEventTime = self.firstEventTime()
             now = time.time()
-            timeLeft = nextEventTime - now
-            tickInterval = self.mmtpServer.TICK_INTERVAL
-            nextTick = now+tickInterval
+            nextEvent = now + SCHEDULE_INTERVAL
+            timeLeft = SCHEDULE_INTERVAL
+            nextTick = now+TICK_INTERVAL
             while timeLeft > 0:
                 # Handle pending network events
-                self.mmtpServer.process(tickInterval)
+                self.mmtpServer.process(TICK_INTERVAL)
                 # Check for signals
                 if STOPPING:
                     LOG.info("Caught SIGTERM; shutting down.")
@@ -938,8 +1084,8 @@
                 now = time.time()
                 if now > nextTick:
                     self.mmtpServer.tick()
-                    nextTick = now+tickInterval
-                timeLeft = nextEventTime - now
+                    nextTick = now+TICK_INTERVAL
+                timeLeft = nextEvent - now
 
             # An event has fired.
             self.processEvents()
@@ -999,9 +1145,13 @@
         self.mixPool.queue.cleanQueue(df)
         self.outgoingQueue.cleanQueue(df)
         self.moduleManager.cleanQueues(df)
+        if self.pingLog:
+            self.pingLog.clean()
 
     def close(self):
         """Release all resources; close all files."""
+        if self.pingLog is not None:
+            self.pingLog.shutdown()
         self.cleaningThread.shutdown()
         self.processingThread.shutdown()
         self.moduleManager.shutdown()