[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Fix pinger logic to use identity digests instead of nicknames.



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria:/tmp/cvs-serv17697/lib/mixminion/server

Modified Files:
	MMTPServer.py Pinger.py ServerMain.py 
Log Message:
Fix pinger logic to use identity digests instead of nicknames.  Why didn't I think of this in the first place?

Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.87
retrieving revision 1.88
diff -u -d -r1.87 -r1.88
--- MMTPServer.py	3 May 2005 03:28:47 -0000	1.87
+++ MMTPServer.py	9 Aug 2005 15:51:32 -0000	1.88
@@ -515,25 +515,26 @@
     ## Fields:
     # _pingLog: The PingLog we will inform about successful or failed
     #    connections, or None if we have no PingLog.
-    # _nickname: The nickname of the server we're trying to connect to.
+    # _identity: The identity digest of the server we're trying to
+    #    connect to.
     # _wasOnceConnected: True iff we have successfully negotiated a protocol
     #    version with the other server.
-    def configurePingLog(self, pingLog, nickname):
+    def configurePingLog(self, pingLog, identity):
         """Must be called after construction: set this _ClientCon to
            report events to the pinglog 'pingLog'; tell it that we are
             connecting to the server named 'nickname'."""
         self._pingLog = pingLog
-        self._nickname = nickname
+        self._identity = identity
         self._wasOnceConnected = 0
     def onProtocolRead(self):
         MMTPClientConnection.onProtocolRead(self)
         if self._isConnected:
             if self._pingLog:
-                self._pingLog.connected(self._nickname)
+                self._pingLog.connected(self._identity)
             self._wasOnceConnected = 1
     def _failPendingPackets(self):
         if not self._wasOnceConnected and self._pingLog:
-            self._pingLog.connectFailed(self._nickname)
+            self._pingLog.connectFailed(self._identity)
         MMTPClientConnection._failPendingPackets(self)
 
 LISTEN_BACKLOG = 128
@@ -772,7 +773,7 @@
             if nickname is not None:
                 # If we recognize this server, then we'll want to tell
                 # the ping log what happens to our connection attempt.
-                con.configurePingLog(self.pingLog, nickname)
+                con.configurePingLog(self.pingLog, keyID)
             #con.allPacketsSent = finished #XXXX007 wrong!
             con.onClosed = finished
         except (socket.error, MixProtocolError), e:

Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.26
retrieving revision 1.27
diff -u -d -r1.26 -r1.27
--- Pinger.py	7 Feb 2005 07:08:55 -0000	1.26
+++ Pinger.py	9 Aug 2005 15:51:32 -0000	1.27
@@ -23,6 +23,7 @@
    pinger anyway.
 """
 
+import binascii
 import bisect
 import calendar
 import cPickle
@@ -250,8 +251,8 @@
     # _lock: an instance of threading.RLock to control access to in-memory
     #    structures.  The databse is responsible for keeping its own structures
     #    consistent.
-    # _serverIDs: A map from lc server nickname to server ID used in the
-    #    database.
+    # _serverIDs: A map from server identity digest to server ID
+    #    used in the database.
     # _intervalIDs: A map from (start,end) to a time interval ID used in the
     #    database
     # _serverReliability: A map from lc server nickname to last computed
@@ -318,7 +319,8 @@
             # Holds lowercased nicknames for all the servers we know about.
             self._db.createTable("server",
                                  [("id",   "integer",     "primary key"),
-                                  ("name", "varchar(32)", "unique not null")])
+                                  ("name", "varchar(32)", "unique not null"),
+                                  ("identity", "varchar(40)", "unique not null")])
 
             # Holds information about our attempts to launch MMTP connections
             # to other servers.  A row in connectionAttempt means: We tried to
@@ -412,6 +414,8 @@
             #### Indices.
 
             self._db.createIndex("serverName", "server", ["name"], unique=1)
+            self._db.createIndex("serverIdentity", "server",
+                                 ["identity"], unique=1)
             self._db.createIndex("statsIntervalSE", "statsInterval",
                                  ["startAt", "endAt"], unique=1)
             self._db.createIndex("myLifespanStartup", "myLifespan", ["startUp"])
@@ -451,11 +455,11 @@
            the database.
         """
         cur = self._db.getCursor()
-        cur.execute("SELECT id, name FROM server")
+        cur.execute("SELECT id, identity FROM server")
         res = cur.fetchall()
         serverIDs = {}
-        for id,name in res:
-            serverIDs[name] = id
+        for idnum,identity in res:
+            serverIDs[binascii.a2b_hex(identity)] = idnum
 
         serverReliability = {}
         cur.execute("SELECT name, reliability FROM "
@@ -485,36 +489,39 @@
         self._brokenChains = broken
         self._interestingChains = interesting
 
-    def updateServers(self, names):
-        """Add the names in 'names' to the database, if they aren't there
-           already.
+    def updateServers(self, descriptorSource):
+        """Add the names 'descriptorSource' to the database, if they
+        aren't there already.
         """
-        for n in names:
-            self._getServerID(n)
+        for s in descriptorInfo.getServerList():
+            self._getServerID(s.getIdentityDigest(), s.getNickname())
         self._db.getConnection().commit()
 
-    def _getServerID(self, name):
-        """Helper: Return the database ID for the server named 'name'.  If the
-           database doesn't know about the server yet, add it.  Does not
-           commit the current transaction.
+    def _getServerID(self, identity, name=None):
+        """Helper: Return the database ID for the server whose hex
+           identity digest is 'identity'.  If the database doesn't
+           know about the server yet, add it.  Does not commit the
+           current transaction.
         """
-        name = name.lower()
         self._lock.acquire()
         try:
             try:
-                return self._serverIDs[name]
+                return self._serverIDs[identity]
             except KeyError:
-                self._serverIDs[name] = 1
+                self._serverIDs[identity] = 1
         finally:
             self._lock.release()
-
         cur = self._db.getCursor()
 
-        cur.execute("INSERT INTO server (name) VALUES (%s)", name)
-        cur.execute("SELECT id FROM server WHERE name = %s", name)
+        if name is None: name = binascii.b2a_hex(identity)
+
+        cur.execute("INSERT INTO server (name,identity) VALUES (%s,%s)",
+                    name.lower(), binascii.b2a_hex(identity))
+        cur.execute("SELECT id FROM server WHERE identity = %s",
+                    binascii.b2a_hex(identity))
         #XXXX catch errors!
         ident, = cur.fetchone()
-        self._serverIDs[name]=ident
+        self._serverIDs[identity]=ident
         return ident
 
     def _getIntervalID(self, start, end):
@@ -604,35 +611,33 @@
 
     _CONNECTED = ("INSERT INTO connectionAttempt (at, server, success) "
                   "VALUES (%s,%s,%s)")
-    def connected(self, nickname, success=1, now=None):
+    def connected(self, identity, success=1, now=None):
         """Note that we attempted to connect to the server named 'nickname'.
            We successfully negotiated a protocol iff success is true.
         """
-        serverID = self._getServerID(nickname)
+        serverID = self._getServerID(identity)
         self._db.getCursor().execute(self._CONNECTED,
                         (self._db.time(now), serverID, self._db.bool(success)))
         self._db.getConnection().commit()
 
-    def connectFailed(self, nickname, now=None):
+    def connectFailed(self, identity, now=None):
         """Note that we attempted to connect to the server named 'nickname',
            but could not negotiate a protocol.
         """
-        self.connected(nickname, success=0, now=now)
+        self.connected(identity, success=0, now=now)
 
     _QUEUED_PING = ("INSERT INTO ping (hash, path, sentat, received)"
                     "VALUES (%s,%s,%s,%s)")
     def queuedPing(self, hash, path, now=None):
-        """Note that we send a probe message along 'path' (a comma-separated
-           sequence of server nicknames, excluding ourself as first and last
-           hop), such that the payload, when delivered, will have 'hash' as
-           its digest.
+        """Note that we send a probe message along 'path' (a list of
+           server identities, excluding ourself as first and last
+           hop), such that the payload, when delivered, will have
+           'hash' as its digest.
         """
         assert len(hash) == mixminion.Crypto.DIGEST_LEN
-        path = path.lower()
-        for s in path.split(","):
-            self._getServerID(s)
+        ids = ",".join([ str(self._getServerID(s)) for s in path ])
         self._db.getCursor().execute(self._QUEUED_PING,
-                             (formatBase64(hash), path, self._db.time(now), 0))
+                             (formatBase64(hash), ids, self._db.time(now), 0))
         self._db.getConnection().commit()
 
     _GOT_PING = "UPDATE ping SET received = %s WHERE hash = %s"
@@ -648,13 +653,13 @@
         elif n > 1:
             LOG.warn("Received ping with multiple hash entries!")
 
-    def _calculateUptimes(self, serverNames, startTime, endTime, now=None):
+    def _calculateUptimes(self, serverIdentities, startTime, endTime, now=None):
         """Helper: calculate the uptime results for a set of servers, named in
-           serverNames, for all intervals between startTime and endTime
+           serverIdentities, for all intervals between startTime and endTime
            inclusive.  Does not commit the current transaction.
         """
         cur = self._db.getCursor()
-        serverNames.sort()
+        serverIdentities.sort()
 
         # First, calculate my own uptime.
         if now is None: now = time.time()
@@ -677,7 +682,7 @@
             self._setUptime((i, self._getServerID("<self>")), (fracUptime,))
 
         # Okay, now everybody else.
-        for s, serverID in self._serverIDs.items():
+        for (identity, serverID) in self._serverIDs.items():
             if s in ('<self>','<unknown>'): continue
             cur.execute("SELECT at, success FROM connectionAttempt"
                         " WHERE server = %s AND at >= %s AND at <= %s"
@@ -712,7 +717,6 @@
             for s,e,intervalID in calcIntervals:
                 uptime = (upIntervals*IntervalSet([(s,e)])).spanLength()
                 downtime = (downIntervals*IntervalSet([(s,e)])).spanLength()
-                if s == 'foobart': print uptime, downtime
                 if uptime < 1 and downtime < 1:
                     continue
                 fraction = float(uptime)/(uptime+downtime)
@@ -724,27 +728,27 @@
         if now is None: now = time.time()
         self._lock.acquire()
         try:
-            serverNames = self._serverIDs.keys()
+            serverIdentities = self._serverIDs.keys()
         finally:
             self._lock.release()
-        serverNames.sort()
-        self._calculateUptimes(serverNames, startAt, endAt, now=now)
+        serverIdentities.sort()
+        self._calculateUptimes(serverIdentities, startAt, endAt, now=now)
         self._db.getConnection().commit()
 
     def getUptimes(self, startAt, endAt):
         """Return uptimes for all servers overlapping [startAt, endAt],
-           as mapping from (start,end) to lowercase nickname to fraction.
+           as mapping from (start,end) to identity to fraction.
         """
         result = {}
         cur = self._db.getCursor()
-        cur.execute("SELECT startat, endat, name, uptime "
+        cur.execute("SELECT startat, endat, identity, uptime "
                     "FROM uptime, statsInterval, server "
                     "WHERE statsInterval.id = uptime.interval "
                     "AND server.id = uptime.server "
                     "AND %s >= startat AND %s <= endat",
                     (self._db.time(startAt), self._db.time(endAt)))
-        for s,e,n,u in cur:
-            result.setdefault((s,e), {})[n] = u
+        for s,e,i,u in cur:
+            result.setdefault((s,e), {})[binascii.a2b_hex(i)] = u
         self._db.getConnection().commit()
         return result
 
@@ -768,7 +772,7 @@
     _WEIGHT_AGE_PERIOD = 24*60*60
     _WEIGHT_AGE = [ 1, 2, 2, 3, 5, 8, 9, 10, 10, 10, 10, 5 ]
     _PING_GRANULARITY = 24*60*60
-    def _calculateOneHopResult(self, serverName, startTime, endTime,
+    def _calculateOneHopResult(self, serverIdentity, startTime, endTime,
                                 now=None, calculateOverallResults=1):
         """Calculate the latency and reliablity for a given server on
            intervals between startTime and endTime, inclusive.  If
@@ -787,7 +791,7 @@
         nPeriods = len(intervals)
         startTime = intervals[0][0]
         endTime = intervals[-1][1]
-        serverID = self._getServerID(serverName)
+        serverID = self._getServerID(serverIdentity)
 
         # 1. Compute latencies and number of pings sent in each period.
         #    We need to learn these first so we can tell the percentile
@@ -797,7 +801,7 @@
         nPings = 0
         cur.execute("SELECT sentat, received FROM ping WHERE path = %s"
                     " AND sentat >= %s AND sentat <= %s",
-                    (serverName, startTime, endTime))
+                    (serverID, startTime, endTime))
         for sent,received in cur:
             pIdx = floorDiv(sent-startTime, self._PING_GRANULARITY)
             nSent[pIdx] += 1
@@ -830,7 +834,7 @@
         perTotalWeighted = [0]*nPeriods
         cur.execute("SELECT sentat, received FROM ping WHERE path = %s"
                     " AND sentat >= %s AND sentat <= %s",
-                    (serverName, startTime, endTime))
+                    (serverID, startTime, endTime))
         for sent,received in cur:
             pIdx = floorDiv(sent-startTime, self._PING_GRANULARITY)
             if received:
@@ -893,15 +897,15 @@
         """
         self._lock.acquire()
         try:
-            serverNames = self._serverIDs.keys()
+            serverIdentities = self._serverIDs.keys()
         finally:
             self._lock.release()
 
         if now is None:
             now = time.time()
-        serverNames.sort()
+        serverIdentities.sort()
         reliability = {}
-        for s in serverNames:
+        for s in serverIdentities:
             if s in ('<self>','<unknown>'): continue
             # For now, always calculate overall results.
             r = self._calculateOneHopResult(s,now,now,now,
@@ -916,14 +920,14 @@
 
     def _calculate2ChainStatus(self, since, s1, s2, now=None):
         """Helper: Calculate the status (broken/interesting/both/neither) for
-           a chain of the servers 's1' and 's2' (given as lc nicknames),
+           a chain of the servers 's1' and 's2' (given as identity digests),
            considering pings sent since 'since'.  Return a tuple of (number of
            pings sent, number of those pings received, is-broken,
            is-interesting).  Does not commit the current transaction.
         """
         # doesn't commit.
         cur = self._db.getCursor()
-        path = "%s,%s"%(s1,s2)
+        path = "%s,%s"%(self._getServerID(s1),self._getServerID(s2))
         cur.execute("SELECT count() FROM ping WHERE path = %s"
                     " AND sentat >= %s",
                     (path,self._db.time(since)))
@@ -968,7 +972,7 @@
         """Calculate the status of all two-hop chains."""
         self._lock.acquire()
         try:
-            serverNames = self._serverIDs.keys()
+            serverIdentities = self._serverIDs.keys()
         finally:
             self._lock.release()
 
@@ -978,11 +982,11 @@
         brokenChains = {}
         interestingChains = {}
         since = now - self._CHAIN_PING_HORIZON
-        serverNames.sort()
+        serverIdentities.sort()
 
-        for s1 in serverNames:
+        for s1 in serverIdentities:
             if s1 in ('<self>','<unknown>'): continue
-            for s2 in serverNames:
+            for s2 in serverIdentities:
                 if s2 == ('<self>','<unknown>'): continue
                 p = "%s,%s"%(s1,s2)
                 nS, nR, isBroken, isInteresting = \
@@ -1010,28 +1014,28 @@
            'since', inclusive."""
         self._lock.acquire()
         try:
-            serverNames = self._serverIDs.keys()
+            serverIdentities = self._serverIDs.keys()
         finally:
             self._lock.release()
 
         if now is None: now = time.time()
         print >>f, "# List of all currently tracked servers."
-        print >>f, "KNOWN_SERVERS =",serverNames
+        print >>f, "KNOWN_SERVERS =",serverIdentities
         cur = self._db.getCursor()
 
         print >>f, "\n# Map from server to list of (period-start, period-end, fractional uptime"
         print >>f, "SERVER_UPTIMES = {"
-        cur.execute("SELECT startAt,endAt,name,uptime FROM uptime, server, statsInterval "
+        cur.execute("SELECT startAt,endAt,identity,name,uptime FROM uptime, server, statsInterval "
                     "WHERE startAt >= %s AND startAt <= %s "
                     "AND uptime.server = server.id "
                     "AND uptime.interval = statsInterval.id "
                     "ORDER BY name, startAt", (since, now))
         lastServer = "---"
-        for s,e,n,u in cur:
-            if n != lastServer:
+        for s,e,i,n,u in cur:
+            if i != lastServer:
                 if lastServer != '---': print >>f, "   ],"
-                lastServer = n
-                print >>f, "   %r : [" % n
+                lastServer = i
+                print >>f, "   (%r,%r) : [" % (i,n)
             print >>f, "      (%s,%s,%.04f),"%(s,e,u)
         if lastServer != '---': print >>f, "   ]"
         print >>f, "}"
@@ -1041,7 +1045,7 @@
 #      # of those pings received, median latency on those pings (sec),
 #      weighted reliability)"""
         print >>f, "SERVER_DAILY_PING_STATUS = {"
-        cur.execute("SELECT name,startAt,endAt,nSent,nReceived,"
+        cur.execute("SELECT identity,name,startAt,endAt,nSent,nReceived,"
                    "  latency,reliability "
                    "FROM echolotOneHopResult, server, statsInterval "
                    "WHERE startat >= %s AND startat <= %s"
@@ -1049,12 +1053,12 @@
                    "AND echolotOneHopResult.interval = statsInterval.id "
                    "ORDER BY name, startat", (since, now))
         lastServer = "---"
-        for n,s,e,nS,nR,lat,r in cur:
+        for i,n,s,e,nS,nR,lat,r in cur:
             if s == '<self>': continue
-            if n != lastServer:
+            if i != lastServer:
                 if lastServer != '---': print >>f, "   ],"
-                lastServer = n
-                print >>f, "   %r : [" % n
+                lastServer = i
+                print >>f, "   (%r,%r) : [" % (i,n)
             print >>f, "      (%s,%s,%s,%s,%s,%.04f),"%(s,e,nS,nR,lat,r)
         if lastServer != '---': print >>f, "   ]"
         print >>f, "}"
@@ -1182,6 +1186,7 @@
             LOG.info("Not sending scheduled ping: %s",e)
             return 0
         verbose_path = ",".join([s.getNickname() for s in (p1+p2[:-1])])
+        identity_list = [ s.getIdentityDigest() for s in p1+p2[:-1] ]
         payload = mixminion.BuildMessage.buildRandomPayload()
         payloadHash = mixminion.Crypto.sha1(payload)
         packet = mixminion.BuildMessage.buildForwardPacket(
@@ -1191,7 +1196,7 @@
         obj = mixminion.server.PacketHandler.RelayedPacket(addr, packet)
         LOG.debug("Pinger queueing ping along path %s [%s]",verbose_path,
                   formatBase64(payloadHash))
-        self.pingLog.queuedPing(payloadHash, verbose_path.lower())
+        self.pingLog.queuedPing(payloadHash, identity_list)
         self.outgoingQueue.queueDeliveryMessage(obj, addr)
         return 1
 

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.146
retrieving revision 1.147
diff -u -d -r1.146 -r1.147
--- ServerMain.py	27 Dec 2004 00:15:58 -0000	1.146
+++ ServerMain.py	9 Aug 2005 15:51:32 -0000	1.147
@@ -71,6 +71,9 @@
 import mixminion.server.ServerConfig
 import mixminion.server.ServerKeys
 import mixminion.server.EventStats as EventStats
+from mixminion.ScheduleUtils import OneTimeEvent, RecurringEvent, \
+     RecurringComplexEvent, RecurringBackgroundEvent, \
+     RecurringComplexBackgroundEvent, Scheduler
 
 from bisect import insort
 from mixminion.Common import LOG, LogStream, MixError, MixFatalError,\
@@ -513,193 +516,7 @@
         signal.signal(signal.SIGHUP, _sigHupHandler)
     signal.signal(signal.SIGTERM, _sigTermHandler)
 
-#----------------------------------------------------------------------
-# Functions to schedule periodic events.  Invocation times are approximate,
-# and not accurate to within more than a minute or two.
-
-class ScheduledEvent:
-    """Abstract base class for a scheduleable event."""
-    def getNextTime(self):
-        """Return the next time when this event should be called.  Return
-           -1 for 'never' and 'None' for 'currently unknown'.
-        """
-        raise NotImplementedError("getNextTime")
-    def __call__(self):
-        """Invoke this event."""
-        raise NotImplementedError("__call__")
-
-class OneTimeEvent:
-    """An event that will be called exactly once."""
-    def __init__(self, when, func):
-        """Create an event to call func() at the time 'when'."""
-        self.when = when
-        self.func = func
-    def getNextTime(self):
-        return self.when
-    def __call__(self):
-        self.func()
-        self.when = -1
-
-class RecurringEvent:
-    """An event that will be called at regular intervals."""
-    def __init__(self, when, func, repeat):
-        """Create an event to call func() at the time 'when', and every
-           'repeat' seconds thereafter."""
-        self.when = when
-        self.func = func
-        self.repeat = repeat
-    def getNextTime(self):
-        return self.when
-    def __call__(self):
-        try:
-            self.func()
-        finally:
-            self.when += self.repeat
-
-class RecurringComplexEvent(RecurringEvent):
-    """An event that will be called at irregular intervals."""
-    def __init__(self, when, func):
-        """Create an event to invoke func() at time 'when'.  func() must
-           return -1 for 'do not call again', or a time when it should next
-           be called."""
-        RecurringEvent.__init__(self, when, func, None)
-    def __call__(self):
-        self.when = self.func()
-
-class RecurringBackgroundEvent:
-    """An event that will be called at regular intervals, and scheduled
-       as a background job.  Does not reschedule the event while it is
-       already in progress."""
-    def __init__(self, when, scheduleJob, func, repeat):
-        """Create an event to invoke 'func' at time 'when' and every
-           'repeat' seconds thereafter.   The function 'scheduleJob' will
-           be invoked with a single callable object in order to run that
-           callable in the background.
-        """
-        self.when = when
-        self.scheduleJob = scheduleJob
-        self.func = func
-        self.repeat = repeat
-        self.running = 0
-        self.lock = threading.Lock()
-    def getNextTime(self):
-        self.lock.acquire()
-        try:
-            if self.running:
-                return None
-            else:
-                return self.when
-        finally:
-            self.lock.release()
-    def __call__(self):
-        self.lock.acquire()
-        try:
-            if self.running:
-                return
-            self.running = 1
-        finally:
-            self.lock.release()
-
-        self.scheduleJob(self._background)
-    def _background(self):
-        """Helper function: this one is actually invoked by the background
-           thread."""
-        self.func()
-        self.lock.acquire()
-        try:
-            now = time.time()
-            while self.when < now:
-                self.when += self.repeat
-            self.running = 0
-        finally:
-            self.lock.release()
-
-class RecurringComplexBackgroundEvent(RecurringBackgroundEvent):
-    """An event to run a job at irregular intervals in the background."""
-    def __init__(self, when, scheduleJob, func):
-        """Create an event to invoke 'func' at time 'when'.  func() must
-           return -1 for 'do not call again', or a time when it should next
-           be called.
-
-           The function 'scheduleJob' will be invoked with a single
-           callable object in order to run that callable in the
-           background.
-        """
-        RecurringBackgroundEvent.__init__(self, when, scheduleJob, func, None)
-    def _background(self):
-        next = self.func()
-        self.lock.acquire()
-        try:
-            self.when = next
-            self.running = 0
-        finally:
-            self.lock.release()
-
-class _Scheduler:
-    """Base class: used to run a bunch of events periodically."""
-    ##Fields:
-    # scheduledEvents: a list of ScheduledEvent objects.
-    # schedLock: a threading.RLock object to protect the list scheduledEvents
-    #   (but not the events themselves).
-    #XXXX008 needs more tests
-    def __init__(self):
-        """Create a new scheduler."""
-        self.scheduledEvents = []
-        self.schedLock = threading.RLock()
-    def firstEventTime(self):
-        """Return the time at which an event will first occur."""
-        self.schedLock.acquire()
-        try:
-            if not self.scheduledEvents:
-                return -1
-            first = 0
-            for e in self.scheduledEvents:
-                t = e.getNextTime()
-                if t in (-1,None): continue
-                if not first or t < first:
-                    first = t
-            return first
-        finally:
-            self.schedLock.release()
-
-    def scheduleEvent(self, event):
-        """Add a ScheduledEvent to this scheduler"""
-        when = event.getNextTime()
-        if when == -1:
-            return
-        self.schedLock.acquire()
-        try:
-            self.scheduledEvents.append(event)
-        finally:
-            self.schedLock.acquire()
-
-    #XXXX008 -- these are only used for testing.
-    def scheduleOnce(self, when, name, cb):
-        self.scheduleEvent(OneTimeEvent(when,cb))
-
-    def scheduleRecurring(self, first, interval, name, cb):
-        self.scheduleEvent(RecurringEvent(first, cb, interval))
-
-    def scheduleRecurringComplex(self, first, name, cb):
-        self.scheduleEvent(RecurringComplexEvent(first, cb))
-
-    def processEvents(self, now=None):
-        """Run all events that need to get called at the time 'now'."""
-        if now is None:
-            now = time.time()
-        self.schedLock.acquire()
-        try:
-            events = [(e.getNextTime(),e) for e in self.scheduledEvents]
-            self.scheduledEvents = [e for t,e in events if t != -1]
-            runnable = [(t,e) for t,e in events
-                        if t not in (-1,None) and t <= now]
-        finally:
-            self.schedLock.release()
-        runnable.sort()
-        for _,e in runnable:
-            e()
-
-class MixminionServer(_Scheduler):
+class MixminionServer(Scheduler):
     """Wraps and drives all the queues, and the async net server.  Handles
        all timed events."""
     ## Fields:
@@ -960,7 +777,7 @@
             if self.pingLog:
                 serverNames = [ s.getNickname()
                                 for s in self.dirClient.getAllServers() ]
-                self.pingLog.updateServers(serverNames)
+                self.pingLog.updateServers(self.dirClient)
 
         return nextUpdate