[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Document pinging and related code; clean some interface...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv7890/lib/mixminion/server

Modified Files:
	MMTPServer.py Pinger.py ServerMain.py 
Log Message:
Document pinging and related code; clean some interfaces up slightly.

Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.85
retrieving revision 1.86
diff -u -d -r1.85 -r1.86
--- MMTPServer.py	27 Jul 2004 04:33:20 -0000	1.85
+++ MMTPServer.py	27 Dec 2004 00:15:57 -0000	1.86
@@ -499,7 +499,9 @@
         return 0
 
 class LinkPadding(mixminion.MMTPClient.DeliverableMessage):
-    """DOCDOC"""
+    """An instance of DeliverableMessage to implement link-padding
+       (junk) MMTP commands.
+    """
     def __init__(self):
         self.contents = getCommonPRNG().getBytes(1<<15)
     def succeeded(self): pass
@@ -508,8 +510,18 @@
     def isJunk(self): return 1
 
 class _ClientCon(MMTPClientConnection):
-    """DOCDOC"""
+    """A subclass of MMTPClientConnection that reports events to the
+       pinger subsystem."""
+    ## Fields:
+    # _pingLog: The PingLog we will inform about successful or failed
+    #    connections, or None if we have no PingLog.
+    # _nickname: The nickname of the server we're trying to connect to.
+    # _wasOnceConnected: True iff we have successfully negotiated a protocol
+    #    version with the other server.
     def configurePingLog(self, pingLog, nickname):
+        """Must be called after construction: set this _ClientCon to
+           report events to the pinglog 'pingLog'; tell it that we are
+            connecting to the server named 'nickname'."""
         self._pingLog = pingLog
         self._nickname = nickname
         self._wasOnceConnected = 0
@@ -608,6 +620,7 @@
         self.dnsCache = dnsCache
 
     def connectPingLog(self, pingLog):
+        """Report successful or failed connection attempts to 'pingLog'."""
         self.pingLog = pingLog
 
     def setServerContext(self, servercontext):

Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -d -r1.22 -r1.23
--- Pinger.py	22 Dec 2004 04:47:10 -0000	1.22
+++ Pinger.py	27 Dec 2004 00:15:57 -0000	1.23
@@ -6,16 +6,21 @@
    Built-in network reliability tester (pinger) for Mixminion servers.
 
    Our pinger uses a three-part architecture.  First, we periodically
-   consider adding testing packets to the outgoing batches.
+   consider adding testing packets to the mix pool, and consider
+   adding link padding to outgoing batches.  These functions are
+   performed by PingGenerator objects.
 
-   Second, we note the outcome of connection attempts; and the timing
-   of our sending/receiving test packets.
+   Second, we note the outcome of connection attempts, and the timing
+   of our sending/receiving test packets.  These functions are
+   performed by a PingLog object.
 
    Third, we use the timing/uptime information in the second part
    above to try to infer how reliable the other nodes in the network
-   are.
+   are.  This is also done by PingLog.
 
-   This module requires python 2.2 or later, and the sqlite module.
+   This module requires Python 2.2 or later, and the sqlite module.
+   I'm okay with this, since most servers shouldn't be running a
+   pinger anyway.
 """
 
 import bisect
@@ -46,17 +51,24 @@
 except ImportError:
     sqlite = None
 
+# How often should the server store the fact that it is still alive (seconds).
 HEARTBEAT_INTERVAL = 30*60
+# Number of seconds in a day.
 ONE_DAY = 24*60*60
 
 class IntervalSchedule:
-    """DOCDOC -- defines a set of intervals in time."""
+    """A partition of time into a series of intervals.  (Currently, only
+       days are supported.)"""
     def __init__(self):
         pass
     def getIntervalContaining(self, t):
+        """Return a 2-tuple of the start and the end of the interval containing
+           't'.  The bottom of the interval is closed; the top is open."""
         p = previousMidnight(t)
         return p, succeedingMidnight(p)
     def getIntervals(self, startAt, endAt):
+        """Return a list of all intervals between the one containing
+           startAt and the one containing endAt, inclusive."""
         r = []
         t = previousMidnight(startAt)
         while t < endAt:
@@ -66,7 +78,25 @@
         return r
 
 class SQLiteDatabase:
-    #XXXX can only be used from one thread at a time.
+    """Helper class.  Encapsulates the properties of the SQLite
+       database implementation.
+
+       SQLite is a very minimal relational database, and SQLite 2.8
+       (the one that PySQLite wraps) is even more minimal.  It only
+       has two underlying types: string and number.  It only enforces
+       them when doing comparisons.  The locking is very
+       coarse-grained.  Foreign key constraints aren't checked.
+
+       On the plus side, SQLite has no administrative overhead.  You
+       don't need to run a daemon; you don't need to create users; you
+       only need to install a library.
+    """
+    ## Fields:
+    # _theConnection: A SQLite database connection.  Only one thread should
+    #    use this connection at a time.
+    # _theCursor: A cursor for the connection.
+    # Defined to 1: this database should only be used from one thread
+    # at a time.
     LOCKING_IS_COARSE = 1
     # Map from logical type to type used in (sqlite) database.
     REALTYPES = { 'timestamp' : 'integer',
@@ -78,53 +108,70 @@
               }
 
     def __init__(self, location):
+        """Create a SQLite database storing its data in the file 'location'."""
         parent = os.path.split(location)[0]
         createPrivateDir(parent)
         self._theConnection = sqlite.connect(location, autocommit=0)
         self._theCursor = self._theConnection.cursor()
 
     def close(self):
+        """Release resources held by this database."""
         self._theConnection.close()
         self._theConnection = self._theCursor = None
 
     def getConnection(self):
+        """Return a database connection object.  You do not need to close it
+           when you are done."""
+        # If LOCKING_IS_COARSE is true, this should return a singleton
+        # connection.  If LOCKING_IS_COARSE is false, this should
+        # return a thread-local connection.
         return self._theConnection
 
     def getCursor(self):
+        """Return a database cursor object."""
         return self._theCursor
 
-    def lock(self):
-        self.dbLock.acquire()
-
-    def unlock(self):
-        self.dbLock.release()
-
     def _objectExists(self, name, objType):
+        """Helper: Return true iff this database has an object called
+           'name' of the specified type.  objType should be one of
+           'view', 'table', or 'index'.
+        """
         self._theCursor.execute(
             "SELECT * FROM SQLITE_MASTER WHERE type = %s AND name = %s",
             (objType,name))
         rs = self._theCursor.fetchall()
         return len(rs) > 0
 
-    def createTable(self, name, rows, constraints=()):
+    def createTable(self, name, columns, constraints=()):
+        """If there is no table called 'name', create it.  Its columns
+           are given in 'columns', a list of tuples.  Each tuple is
+           either a 2-tuple containing a column name and a type, or a
+           3-tuple containing a column name, a type, and a series of
+           constraints.  Additional table-level constraints can be
+           given in a list as 'constraints'.
+        """
+        # This is more complicated than just saying "CREATE TABLE
+        # foo", but it lets us map types and constraints differently
+        # on differently broken databases.
+
         if self._objectExists(name,"table"):
             return
 
         body = []
-        for r in rows:
-            cname = r[0]
-            ctype = r[1]
+        for c in columns:
+            cname = c[0]
+            ctype = c[1]
             if '(' in ctype:
                 idx = ctype.find('(')
                 ctype = self.REALTYPES[ctype[:idx]]+ctype[idx:]
             else:
                 ctype = self.REALTYPES[ctype]
 
-            if len(r) == 2:
+            if len(c) == 2:
                 body.append("%s %s"%(cname, ctype))
             else:
-                assert len(r) == 3
-                body.append("%s %s %s"%(cname, ctype, r[2]))
+                assert len(c) == 3
+                body.append("%s %s %s"%(cname, ctype, c[2]))
 
         body.extend(constraints)
 
@@ -133,6 +180,10 @@
         self._theConnection.commit()
 
     def createIndex(self, name, tablename, columns, unique=0):
+        """If the index 'name', doesn't already exist, create an index
+           of that name on the table 'tablename', indexing the columns
+           'columns'.  If 'unique', create an index of unique values.
+        """
         if self._objectExists(name, "index"):
             return
 
@@ -146,15 +197,28 @@
         self._theConnection.commit()
 
     def time(self, t=None):
+        """Convert 't' (a Unix time in seconds since the epoch) into the
+           format the database wants for its timestamp fields.  If 't' is None,
+           use the current time instead.
+        """
         return long(t or time.time())
 
     def bool(self, b):
+        """Convert the boolean 'b' into the format the database wants for its
+           boolean fields."""
         if b:
             return 1
         else:
             return 0
 
     def getInsertOrUpdateFn(self, table, keyCols, valCols):
+        """Return a function that takes two arguments: a tuple of
+           values for the columns in keyCols, and a tuple of values
+           for the columns in valCols.  If some row in 'table' has the
+           given values for the key columns, this function will update
+           that row and set the values of the value columns.
+           Otherwise, the function will insert a new row with the
+           given value columns."""
 ##         update = "UPDATE %s SET %s WHERE %s" % (
 ##             table,
 ##             ", ".join(["%s = %%s" % k for k in valCols]),
@@ -177,10 +241,35 @@
         return fn
 
 class PingLog:
+    """A PingLog stores a series of pinging-related events to a
+       persistent relational database, and calculates server statistics based
+       on those events.
+    """
+    ## Fields:
+    # _db: the underlying database.
+    # _lock: an instance of threading.RLock to control access to in-memory
+    #    structures.  The database is responsible for keeping its own structures
+    #    consistent.
+    # _serverIDs: A map from lc server nickname to server ID used in the
+    #    database.
+    # _intervalIDs: A map from (start,end) to a time interval ID used in the
+    #    database
+    # _serverReliability: A map from lc server nickname to last computed
+    #    server reliability (a float between 0 and 1).
+    # _intervals: An instance of IntervalSchedule.
+    # _brokenChains, _interestingChains: Maps from comma-separated
+    #   lowercase nicknames for servers in chains that we believe to be
+    #   broken or "interesting" to 1.
+    # _startTime: The 'startup' time for the current myLifespan row.
+    # _lastRecalculation: The last time this process recomputed all
+    #   the stats, or 0 for 'never'.
+    # _set{Uptime|OneHop|CurOneHop|TwoHop}: Functions generated by
+    #   getInsertOrUpdateFn.
+
+    # FFFF Maybe refactor this into data storage and stats computation.
     def __init__(self, db):
+        """Create a new PingLog, storing events into the database 'db'."""
         self._db = db
-        self._time = self._db.time
-        self._bool = self._db.bool
         self._lock = threading.RLock()
         self._serverIDs = {}
         self._intervalIDs = {}
@@ -194,6 +283,8 @@
         self._loadServers()
 
     def _createAllTables(self):
+        """Helper: check for the existence of all the tables and indices
+           we plan to use, and create the missing ones."""
         self._lock.acquire()
         try:
             # FFFF There are still a few sucky bits of this DB design.
@@ -202,31 +293,56 @@
             # FFFF for us, without our having to give a sequence.)
             # FFFF Second, paths probably want to have their own table.
 
-            # Raw data
+            #### Tables holding raw data.
+
+            # Holds intervals over which this server was running.  A
+            # row in myLifespan means: "This server started running at
+            # 'startup', and was still running at 'stillup'. If
+            # shutdown is provided, that's when the server shut down."
             self._db.createTable("myLifespan",
                                  [("startup",  "timestamp", "not null"),
                                   ("stillup",  "timestamp", "not null"),
                                   ("shutdown", "timestamp")])
+
+            # Holds information about probe packets sent into the network.  A
+            # row in ping means: We send a probe packet along the path 'path'
+            # at the time 'sentat'.  The payload of the message we receive
+            # will hash to 'hash' (base-64 encoded).  If 'received' is not
+            # null, we received the packet again at 'received'.
             self._db.createTable("ping",
                                  [("hash",     "char(28)",     "primary key"),
                                   ("path",     "varchar(200)", "not null"),
                                   ("sentat",   "timestamp",    "not null"),
                                   ("received", "timestamp")])
+
+            # Holds lowercased nicknames for all the servers we know about.
             self._db.createTable("server",
                                  [("id",   "integer",     "primary key"),
                                   ("name", "varchar(32)", "unique not null")])
+
+            # Holds information about our attempts to launch MMTP connections
+            # to other servers.  A row in connectionAttempt means: We tried to
+            # connect to 'server' at the time 'at'.  If 'success', we
+            # successfully connected and negotiated a protocol version.
+            # Otherwise, we failed before we could negotiate a protocol version.
             self._db.createTable(
                 "connectionAttempt",
                 [("at",       "timestamp", "not null"),
                  ("server",   "integer",   "not null REFERENCES server(id)"),
                  ("success",  "bool",      "not null")])
 
-            # Results
+            #### Tables holding results.
+
+            # Maps spans of time (currently, days) to identifiers.
             self._db.createTable("statsInterval",
                                  [("id",      "integer",   "primary key"),
                                   ("startAt", "timestamp", "not null"),
                                   ("endAt",   "timestamp", "not null")])
 
+            # Holds estimated server uptimes.  Each row in uptime means:
+            # during 'interval', our successful and failed connections to
+            # 'server' make us believe that it was running and on the network
+            # about 'uptime' fraction of the time.
             self._db.createTable(
                 "uptime",
                 [("interval", "integer", "not null REFERENCES statsinterval(id)"),
@@ -234,6 +350,18 @@
                  ("uptime",   "float",   "not null")],
                 ["PRIMARY KEY (interval, server)"])
 
+            # Holds estimates for server latency and reliability for a given
+            # interval.  Each row in echolotOneHopResult means: during
+            # 'interval', we sent 'nSent' one-hop probe messages to 'server'.
+            # We eventually received 'nReceived' of them.  The median latency
+            # (rounded off) of those we received was 'latency' seconds.
+            # Weighting unreceived probe messages by the fraction we would
+            # have expected to see by the time we computed the results times
+            # 0.8, and weighting received probes by 1.0, the weighted number
+            # of sent and received pings are in 'wsent' and 'wreceived', and
+            # the weighted fraction received is 'reliability'.
+
+            # (Yes, Echolot is dark magic.)
             self._db.createTable(
                 "echolotOneHopResult",
                   [("server",   "integer", "not null REFERENCES server(id)"),
@@ -246,6 +374,12 @@
                    ("reliability", "float",   "not null")],
                 ["PRIMARY KEY (server, interval)"])
 
+            # Holds estimates for server latency and reliability over the last
+            # several (12) days.  A row in echolotCurrentOneHopResult means:
+            # We most recently computed single-hop probe statistics for server
+            # at 'at'. Over the last several days, its median latency
+            # (rounded) has been 'latency' seconds, and its reliability,
+            # weighted by relevance of day, has been 'reliability'.
             self._db.createTable(
                 "echolotCurrentOneHopResult",
                 [("server",      "integer",
@@ -254,6 +388,16 @@
                  ("latency",     "integer",   "not null"),
                  ("reliability", "float",     "not null")])
 
+            # Holds estimates for two-hop chain reliability. Each row means:
+            # We most recently calculated the reliability for the two-hop chain
+            # 'server1,server2' at 'at'.  Over the last several (12) days, we
+            # sent nSent probes, and have received nReceived of them.  Iff
+            # 'broken', the fraction received is so much lower than what we'd
+            # expect that we have concluded that the chain is probably broken.
+            # Iff 'interesting', then there is not enough data to be sure, so
+            # we're going to probe this chain a bit more frequently for a
+            # while.
+
             self._db.createTable(
                 "echolotCurrentTwoHopResult",
                 [("server1",     "integer",   "not null REFERENCES server(id)"),
@@ -265,6 +409,8 @@
                  ("interesting", "bool",      "not null")],
                 ["PRIMARY KEY (server1, server2)"])
 
+            #### Indices.
+
             self._db.createIndex("serverName", "server", ["name"], unique=1)
             self._db.createIndex("statsIntervalSE", "statsInterval",
                                  ["startAt", "endAt"], unique=1)
@@ -275,7 +421,8 @@
             self._db.createIndex("connectionAttemptServerAt",
                                  "connectionAttempt", ["server","at"])
 
-            # indices on echolot*results, uptimes.
+            # XXXX008 We should probably have indices on echolot*results,
+            # uptimes.
 
             self._setUptime = self._db.getInsertOrUpdateFn(
                 "uptime", ["interval", "server"], ["uptime"])
@@ -296,7 +443,10 @@
             self._lock.release()
 
     def _loadServers(self):
-        # hold lock.
+        """Helper function; callers must hold lock.  Load _serverIDs,
+           _serverReliability, _brokenChains, and _interestingChains from
+           the database.
+        """
         cur = self._db.getCursor()
         cur.execute("SELECT id, name FROM server")
         res = cur.fetchall()
@@ -332,15 +482,18 @@
         self._interestingChains = interesting
 
     def updateServers(self, names):
-        self._lock.acquire()
-        try:
-            for n in names:
-                self._getServerID(n)
-        finally:
-            self._lock.release()
+        """Add the names in 'names' to the database, if they aren't there
+           already.
+        """
+        for n in names:
+            self._getServerID(n)
+        self._db.getConnection().commit()
 
     def _getServerID(self, name):
-        # doesn't commit.
+        """Helper: Return the database ID for the server named 'name'.  If the
+           database doesn't know about the server yet, add it.  Does not
+           commit the current transaction.
+        """
         name = name.lower()
         self._lock.acquire()
         try:
@@ -361,6 +514,10 @@
         return ident
 
     def _getIntervalID(self, start, end):
+        """Helper: Return the database ID for the interval spanning from
+           'start' to 'end'.  If the database doesn't know about the interval
+           yet, add it.  Does not commit the current transaction.
+        """
         # CACHE THESE? FFFF
         start = self._db.time(start)
         end = self._db.time(end)
@@ -380,10 +537,14 @@
         return r[0][0]
 
     def rotate(self, dataCutoff, resultsCutoff):
+        """Remove expired entries from the database. Remove any raw data from
+           before 'dataCutoff', and any computed statistics from before
+           'resultsCutoff'.
+        """
         #if now is None: now = time.time()
         #sec = config['Pinging']
-        #dataCutoff = self._time(now - sec['RetainPingData'])
-        #resultsCutoff = self._time(now - sec['RetainPingResults'])
+        #dataCutoff = self._db.time(now - sec['RetainPingData'])
+        #resultsCutoff = self._db.time(now - sec['RetainPingResults'])
 
         cur = self._db.getCursor()
         cur.execute("DELETE FROM myLifespan WHERE stillup < %s", dataCutoff)
@@ -401,59 +562,82 @@
         self._db.getConnection().commit()
 
     def flush(self):
+        """Write any pending information to disk."""
         self._db.getConnection().commit()
 
     def close(self):
+        """Release all resources held by this PingLog and the underlying
+           database."""
         self._db.close()
 
     _STARTUP = "INSERT INTO myLifespan (startup, stillup, shutdown) VALUES (%s,%s, 0)"
     def startup(self,now=None):
+        """Called when the server has just started.  Starts tracking a new
+           interval of this server's lifetime."""
         self._lock.acquire()
-        self._startTime = now = self._time(now)
+        self._startTime = now = self._db.time(now)
         self._lock.release()
         self._db.getCursor().execute(self._STARTUP, (now,now))
         self._db.getConnection().commit()
 
     _SHUTDOWN = "UPDATE myLifespan SET stillup = %s, shutdown = %s WHERE startup = %s"
     def shutdown(self, now=None):
+        """Called when the server is shutting down. Stops tracking the current
+           interval of this server's lifetime."""
         if self._startTime is None: self.startup()
-        now = self._time(now)
+        now = self._db.time(now)
         self._db.getCursor().execute(self._SHUTDOWN, (now, now, self._startTime))
         self._db.getConnection().commit()
 
     _HEARTBEAT = "UPDATE myLifespan SET stillup = %s WHERE startup = %s AND stillup < %s"
     def heartbeat(self, now=None):
+        """Called periodically.  Notes that the server is still running as of
+           the time 'now'."""
         if self._startTime is None: self.startup()
-        now = self._time(now)
+        now = self._db.time(now)
         self._db.getCursor().execute(self._HEARTBEAT, (now, self._startTime, now))
         self._db.getConnection().commit()
 
     _CONNECTED = ("INSERT INTO connectionAttempt (at, server, success) "
                   "VALUES (%s,%s,%s)")
     def connected(self, nickname, success=1, now=None):
+        """Note that we attempted to connect to the server named 'nickname'.
+           We successfully negotiated a protocol iff success is true.
+        """
         serverID = self._getServerID(nickname)
         self._db.getCursor().execute(self._CONNECTED,
-                        (self._time(now), serverID, self._bool(success)))
+                        (self._db.time(now), serverID, self._db.bool(success)))
         self._db.getConnection().commit()
 
     def connectFailed(self, nickname, now=None):
+        """Note that we attempted to connect to the server named 'nickname',
+           but could not negotiate a protocol.
+        """
         self.connected(nickname, success=0, now=now)
 
     _QUEUED_PING = ("INSERT INTO ping (hash, path, sentat, received)"
                     "VALUES (%s,%s,%s,%s)")
     def queuedPing(self, hash, path, now=None):
+        """Note that we send a probe message along 'path' (a comma-separated
+           sequence of server nicknames, excluding ourself as first and last
+           hop), such that the payload, when delivered, will have 'hash' as
+           its digest.
+        """
         assert len(hash) == mixminion.Crypto.DIGEST_LEN
         path = path.lower()
         for s in path.split(","):
             self._getServerID(s)
         self._db.getCursor().execute(self._QUEUED_PING,
-                             (formatBase64(hash), path, self._time(now), 0))
+                             (formatBase64(hash), path, self._db.time(now), 0))
         self._db.getConnection().commit()
 
     _GOT_PING = "UPDATE ping SET received = %s WHERE hash = %s"
     def gotPing(self, hash, now=None):
+        """Note that we have received a probe message whose payload had 'hash'
+           as its digest.
+        """
         assert len(hash) == mixminion.Crypto.DIGEST_LEN
-        self._db.getCursor().execute(self._GOT_PING, (self._time(now), formatBase64(hash)))
+        self._db.getCursor().execute(self._GOT_PING, (self._db.time(now), formatBase64(hash)))
         n = self._db.getCursor().rowcount
         if n == 0:
             LOG.warn("Received ping with no record of its hash")
@@ -461,7 +645,10 @@
             LOG.warn("Received ping with multiple hash entries!")
 
     def _calculateUptimes(self, serverNames, startTime, endTime, now=None):
-        # commit when one done
+        """Helper: calculate the uptime results for a set of servers, named in
+           serverNames, for all intervals between startTime and endTime
+           inclusive.  Does not commit the current transaction.
+        """
         cur = self._db.getCursor()
         serverNames.sort()
 
@@ -475,7 +662,7 @@
 
         cur.execute("SELECT startup, stillup, shutdown FROM myLifespan WHERE "
                     "startup <= %s AND stillup >= %s",
-                    self._time(endTime), self._time(startTime))
+                    self._db.time(endTime), self._db.time(startTime))
         myUptime = 0
         myIntervals = IntervalSet([ (start, max(end,shutdown))
                                     for start,end,shutdown in cur ])
@@ -528,6 +715,8 @@
                 self._setUptime((intervalID, serverID), (fraction,))
 
     def calculateUptimes(self, startAt, endAt, now=None):
+        """Calculate the uptimes for all servers for all intervals between
+           startAt and endAt, inclusive."""
         if now is None: now = time.time()
         self._lock.acquire()
         try:
@@ -539,8 +728,8 @@
         self._db.getConnection().commit()
 
     def getUptimes(self, startAt, endAt):
-        """DODOC: uptimes for all servers overlapping [startAt, endAt],
-           as mapping from (start,end) to nickname to fraction.
+        """Return uptimes for all servers overlapping [startAt, endAt],
+           as mapping from (start,end) to lowercase nickname to fraction.
         """
         result = {}
         cur = self._db.getCursor()
@@ -549,15 +738,17 @@
                     "WHERE statsInterval.id = uptime.interval "
                     "AND server.id = uptime.server "
                     "AND %s >= startat AND %s <= endat",
-                    (self._time(startAt), self._time(endAt)))
+                    (self._db.time(startAt), self._db.time(endAt)))
         for s,e,n,u in cur:
             result.setdefault((s,e), {})[n] = u
         self._db.getConnection().commit()
         return result
 
     def _roundLatency(self, latency):
-        """Using a median latency can leak the fact that a message was a
-           ping. DOCDOC"""
+        """Return 'latency', rounded to an even unit of time.  Revealing
+           median latency directly can leak the fact that a message was a
+           ping.
+        """
         for cutoff, q in [
             (60, 5), (10*60, 60), (30*60, 2*60),
             (60*60, 5*60), (3*60*60, 10*60), (12*60*60, 20*60),
@@ -575,6 +766,11 @@
     _PING_GRANULARITY = 24*60*60
     def _calculateOneHopResult(self, serverName, startTime, endTime,
                                 now=None, calculateOverallResults=1):
+        """Calculate the latency and reliability for a given server on
+           intervals between startTime and endTime, inclusive.  If
+           calculateOverallResults is true, also compute the current overall
+           results for that server.
+        """
         # commit when done; serverName must exist.
         cur = self._db.getCursor()
         if now is None:
@@ -685,10 +881,12 @@
             rel = wrcvd / wsent
         else:
             rel = 0.0
-        self._setCurOneHop((serverID,), (self._time(now), latent, rel))
+        self._setCurOneHop((serverID,), (self._db.time(now), latent, rel))
         return rel
 
     def calculateOneHopResult(self, now=None):
+        """Calculate latency and reliability for all servers.
+        """
         self._lock.acquire()
         try:
             serverNames = self._serverIDs.keys()
@@ -713,36 +911,46 @@
             self._lock.release()
 
     def _calculate2ChainStatus(self, since, s1, s2, now=None):
+        """Helper: Calculate the status (broken/interesting/both/neither) for
+           a chain of the servers 's1' and 's2' (given as lc nicknames),
+           considering pings sent since 'since'.  Return a tuple of (number of
+           pings sent, number of those pings received, is-broken,
+           is-interesting).  Does not commit the current transaction.
+        """
         # doesn't commit.
         cur = self._db.getCursor()
         path = "%s,%s"%(s1,s2)
         cur.execute("SELECT count() FROM ping WHERE path = %s"
                     " AND sentat >= %s",
-                    (path,self._time(since)))
+                    (path,self._db.time(since)))
         nSent, = cur.fetchone()
         cur.execute("SELECT count() FROM ping WHERE path = %s"
                     " AND sentat >= %s AND received > 0",
                     (path,since))
         nReceived, = cur.fetchone()
 
+        # Product: expected reliability.
         try:
             product = self._serverReliability[s1] * self._serverReliability[s2]
         except KeyError:
             product = None
 
+        # Frac: actual reliability
         if nSent == 0:
             frac = 0.0
         else:
             frac = float(nReceived)/nSent
 
         isBroken = nSent >= 3 and product and frac <= product*0.3
+
         isInteresting = ((nSent < 3 and nReceived == 0) or
                          (product and frac <= product*0.3))
 
-        return nSent, nReceived, product, isBroken, isInteresting
+        return nSent, nReceived, isBroken, isInteresting
 
     _CHAIN_PING_HORIZON = 12*ONE_DAY
     def calculateChainStatus(self, now=None):
+        """Calculate the status of all two-hop chains."""
         self._lock.acquire()
         try:
             serverNames = self._serverIDs.keys()
@@ -762,7 +970,7 @@
             for s2 in serverNames:
                 if s2 == '<self>': continue
                 p = "%s,%s"%(s1,s2)
-                nS, nR, prod, isBroken, isInteresting = \
+                nS, nR, isBroken, isInteresting = \
                     self._calculate2ChainStatus(since, s1, s2)
                 if isBroken:
                     brokenChains[p] = 1
@@ -771,8 +979,8 @@
 
                 self._setTwoHop(
                     (self._getServerID(s1), self._getServerID(s2)),
-                    (self._time(now), nS, nR, self._bool(isBroken),
-                     self._bool(isInteresting)))
+                    (self._db.time(now), nS, nR, self._db.bool(isBroken),
+                     self._db.bool(isInteresting)))
         self._db.getConnection().commit()
 
         self._lock.acquire()
@@ -783,6 +991,8 @@
             self._lock.release()
 
     def dumpAllStatus(self,f,since,now=None):
+        """Write statistics into the file object 'f' for all intervals since
+           'since', inclusive."""
         self._lock.acquire()
         try:
             serverNames = self._serverIDs.keys()
@@ -867,6 +1077,10 @@
         self._db.getConnection().commit()
 
     def calculateAll(self, outFname=None, now=None):
+        """Recalculate all statistics, writing the results into a file called
+           'outFname'.  If 'outFname' is None, only save the results into the
+           database.
+        """
         if now is None: now=time.time()
         LOG.info("Computing ping results.")
         LOG.info("Starting to compute server uptimes.")
@@ -884,34 +1098,66 @@
         self.lastCalculation = now
 
 class PingGenerator:
-    """DOCDOC"""
-    #XXXX008 add abstract functions.
+    """Abstract class: A PingGenerator periodically sends traffic into the
+       network, or adds link padding to outgoing connections.
+    """
+    ## Fields:
+    # directory: an instance of ClientDirectory
+    # pingLog: an instance of PingLog
+    # outgoingQueue: an instance of OutgoingQueue, if we're going to send
+    #   pings
+    # myNickname: the nickname of this server
     def __init__(self, config):
+        """Create a new PingGenerator with a given configuration"""
         self.directory = None
         self.pingLog = None
         self.outgoingQueue = None
         self.myNickname = config['Server']['Nickname']
-        self.latestStatistics = None
 
     def connect(self, directory, outgoingQueue, pingLog, keyring):
+        """Use the provided directory/queue/pingLog/keyring as needed.
+           This will be called before other methods of this generator."""
         pass
 
     def directoryUpdated(self):
+        """Called when the directory has changed."""
         pass
 
     def getFirstPingTime(self):
+        """Return the next time when we want sendPings() to be called.  Valid
+           once scheduleAllPings has been called."""
         return None
 
     def scheduleAllPings(self, now=None):
+        """Figure out when we want to ping what."""
         pass
 
     def sendPings(self, now=None):
+        """Send all pings that are currently pending by adding them to the
+           outgoing queue we were connected to.
+        """
         pass
 
     def addLinkPadding(self, pkts):
+        """Given a map from addresses (MMTPHostInfo) to lists of
+           DeliverableMessage, add link padding objects
+           (mixminion.server.MMTPServer.LinkPadding) as desired.  This will be
+           called on all outgoing message batches.
+        """
         pass
 
     def _sendOnePing(self, path1, path2):
+        """Helper called by subclasses.  Add a ping down a two-stage path;
+           queue the ping in self.outgoingQueue, and log the event in
+           self.pingLog.  Path1 and path2 are lists of ServerInfo objects, or
+           nicknames to be resolved by self.directory.getPath.
+
+           self.path2 must end with self.keyring.getCurrentDescriptor().
+           NOTE: Don't use self.myNickname in path1 or path2; we may not
+           be in the directory.
+
+           Return 1 if we are able to queue the ping, 0 otherwise.
+        """
         assert path1 and path2
         assert path2[-1].getNickname() == self.myNickname
         try:
@@ -935,6 +1181,15 @@
         return 1
 
 class _PingScheduler:
+    """Helper class: use an echolot-like approach to schedule pings.
+
+       We divide time into a series of 'periods'.  Within each 'period', we
+       ping servers at regular 'intervals', with the first interval offset
+       from the start of the period by a random-looking amount.
+    """
+    ## Fields:
+    # nextPingTime: map from path to the next time we plan to ping it.
+    # seed: a secret random value used to compute perturbations.
     def __init__(self):
         self.nextPingTime = {}#path->when
     def connect(self, directory, outgoingQueue, pingLog, keyring):
@@ -944,19 +1199,30 @@
         self.keyring = keyring
         self.seed = keyring.getPingerSeed()
     def _calcPeriodLen(self, interval):
+        """Helper: Given an interval length, return the period length
+           we'll use.
+        """
         period = ONE_DAY
         while period < interval*2:
             period *= 2
         return period
     def scheduleAllPings(self, now=None):
+        # Subclasses should override this to call _schedulePing on all paths.
         raise NotImplemented()
     def _getPeriodStart(self, t):
+        """Abstract: Return the start of the period containing the time t."""
+        #XXXX008 should take _calcPeriodLen into account?
         raise NotImplemented()
     def _getPingInterval(self, path):
+        """Abstract: Return the interval of pings for the path 'path'."""
         raise NotImplemented()
     def _getPeriodLength(self):
+        """Abstract: Return the length of the period."""
         raise NotImplemented()
     def _schedulePing(self,path,now=None):
+        """Helper: schedule a single ping along the path 'path', adding
+           it to self.nextPingTime.
+        """
         if now is None: now = int(time.time())
         periodStart = self._getPeriodStart(now)
         periodEnd = periodStart + self._period_length
@@ -975,6 +1241,9 @@
                       ",".join(path), formatTime(t,1))
         return t
     def _getPerturbation(self, path, periodStart, interval):
+        """Return the offset to be used for the ping intervals for 'path'
+           of interval 'interval' within the period starting at 'periodStart'.
+        """
         sha = mixminion.Crypto.sha1("%s@@%s@@%s"%(",".join(path),
                                                   interval,
                                                   self.seed))
@@ -990,7 +1259,8 @@
             return None
 
 class OneHopPingGenerator(_PingScheduler,PingGenerator):
-    """DOCDOC"""
+    """A OneHopPingGenerator uses the Echolot ping algorithm to schedule
+       single-hop pings to all known servers."""
     def __init__(self, config):
         PingGenerator.__init__(self, config)
         _PingScheduler.__init__(self)
@@ -1044,7 +1314,12 @@
                 self._schedulePing((n,), now+60)
 
 class TwoHopPingGenerator(_PingScheduler, PingGenerator):
-    """DOCDOC"""
+    """A TwoHopPingGenerator uses the Echolot ping algorithm to schedule
+       two-hop pings to all known pairs of servers.
+
+       If we conclude that a chain of servers is 'interesting' (possibly
+       broken, or too little data to tell), we ping it more frequently.
+    """
     def __init__(self, config):
         PingGenerator.__init__(self, config)
         _PingScheduler.__init__(self)
@@ -1105,7 +1380,8 @@
                 self._schedulePing((n1,n2), now+60)
 
 class TestLinkPaddingGenerator(PingGenerator):
-    """DOCDOC"""
+    """A PingGenerator to ensure that we randomly probe all known server
+       addresses for liveness, from time to time."""
     def __init__(self, config):
         PingGenerator.__init__(self,config)
         mixInterval = config['Server']['MixInterval'].getSeconds()
@@ -1133,6 +1409,8 @@
             pkts.setdefault(addr,[]).append(padding)
 
 class CompoundPingGenerator:
+    """A CompoundPingGenerator wraps several PingGenerators as a single
+       PingGenerator object."""
     def __init__(self, generators):
         self.gens = generators[:]
     def connect(self, directory, outgoingQueue, pingLog, keyring):
@@ -1167,8 +1445,9 @@
             g.addLinkPadding(pkts)
 
 def getPingGenerator(config):
-    """DOCDOC"""
-    if not config['Pinging'].get('Enabled'):
+    """Return the PingGenerator (if any) requested in config."""
+    #XXXX008 make it possible to turn off some of the components.
+    if not config['Pinging'].get('Enabled') or not canRunPinger():
         return CompoundPingGenerator([])
     pingers = []
     pingers.append(OneHopPingGenerator(config))
@@ -1177,12 +1456,22 @@
     return CompoundPingGenerator(pingers)
 
 def canRunPinger():
-    """DOCDOC"""
+    """Return true iff we have the required libraries installed to run a pinger.
+    """
     return sys.version_info[:2] >= (2,2) and sqlite is not None
 
+# Map from database type name to database implementation class.
 DATABASE_CLASSES = { 'sqlite' : SQLiteDatabase }
 
 def openPingLog(config, location=None, databaseThread=None):
+    """Open a ping log based on the ServerConfig 'config'.  If 'location' is
+       provided, store the files in 'location'; otherwise, deduce where to
+       store the files from 'config'.  If databaseThread is provided and the
+       database does not do well with multithreading (either no locking, or
+       locking too coarse-grained to use), then background all calls to
+       PingLog in databaseThread.
+    """
+
     # FFFF eventually, we should maybe support more than pysqlite.  But let's
     # FFFF not generalize until we have a 2nd case.
     database = 'sqlite'

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.145
retrieving revision 1.146
diff -u -d -r1.145 -r1.146
--- ServerMain.py	27 Dec 2004 00:09:21 -0000	1.145
+++ ServerMain.py	27 Dec 2004 00:15:58 -0000	1.146
@@ -153,13 +153,14 @@
     # packetHandler -- an instance of PacketHandler.
     # mixPool -- an instance of MixPool
     # processingThread -- an instance of ProcessingThread
+    # pingLog -- an instance of PingLog, or None
     def __init__(self, location, packetHandler):
         """Create an IncomingQueue that stores its packets in <location>
            and processes them through <packetHandler>."""
         mixminion.Filestore.StringStore.__init__(self, location, create=1)
         self.packetHandler = packetHandler
         self.mixPool = None
-        self.pingLog = None#DOCDOC
+        self.pingLog = None
 
     def connectQueues(self, mixPool, processingThread):
         """Sets the target mix queue"""
@@ -171,7 +172,9 @@
                 lambda self=self, h=h: self.__deliverPacket(h))
 
     def setPingLog(self, pingLog):
-        "DOCDOC"
+        """Configure this queue to inform 'pingLog' about received
+           ping messages.
+        """
         self.pingLog = pingLog
 
     def queuePacket(self, pkt):
@@ -336,8 +339,10 @@
     ## Fields:
     # server -- an instance of _MMTPServer
     # addr -- the key ID we published in our descriptor.
-    # incomingQueue -- pointer to IncomingQueue object to be used for
+    # incomingQueue -- the IncomingQueue object to be used for
     #        self->self communication.
+    # pingGenerator -- the pingGenerator that may want to add link padding
+    #        to outgoing packet sets, or None.
     def __init__(self, location, keyID):
         """Create a new OutgoingQueue that stores its packets in a given
            location."""
@@ -358,7 +363,7 @@
 
         self.server = server
         self.incomingQueue = incoming
-        self.pingGenerator = pingGenerator#DOCDOC
+        self.pingGenerator = pingGenerator
 
     def _deliverMessages(self, msgList):
         "Implementation of abstract method from DeliveryQueue."
@@ -718,12 +723,18 @@
     # cleaningThread: Thread used to remove packets in the background
     # processingThread: Thread to handle CPU-intensive activity without
     #    slowing down network interactivity.
-    # databaseThread: DOCDOC
+    # databaseThread: Thread to handle pinger database activity that may
+    #    be slow.  (If the database has good locking, this is only statistics
+    #    recomputation.  If the database has dumb locking, this is all
+    #    database activity.)
     # lockFile: An instance of Lockfile to prevent multiple servers from
     #    running in the same directory.  The filename for this lock is
     #    stored in self.pidFile.
     # pidFile: Filename in which we store the pid of the running server.
-    # pingLog: DOCDOC
+    # pingLog: None, or an instance of PingLog that needs to be informed
+    #    about network probing activity.
+    # pingGenerator: None, or an instance of PingGenerator that will decide
+    #    when to generate probe traffic.
     def __init__(self, config):
         """Create a new server from a ServerConfig."""
         _Scheduler.__init__(self)