[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Document pinging and related code; clean some interface...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv7890/lib/mixminion/server
Modified Files:
MMTPServer.py Pinger.py ServerMain.py
Log Message:
Document pinging and related code; clean some interfaces up slightly.
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.85
retrieving revision 1.86
diff -u -d -r1.85 -r1.86
--- MMTPServer.py 27 Jul 2004 04:33:20 -0000 1.85
+++ MMTPServer.py 27 Dec 2004 00:15:57 -0000 1.86
@@ -499,7 +499,9 @@
return 0
class LinkPadding(mixminion.MMTPClient.DeliverableMessage):
- """DOCDOC"""
+ """An instance of DeliverableMessage to implement link-padding
+ (junk) MMTP commands.
+ """
def __init__(self):
self.contents = getCommonPRNG().getBytes(1<<15)
def succeeded(self): pass
@@ -508,8 +510,18 @@
def isJunk(self): return 1
class _ClientCon(MMTPClientConnection):
- """DOCDOC"""
+ """A subclass of MMTPClientConnection that reports events to the
+ pinger subsystem."""
+ ## Fields:
+ # _pingLog: The PingLog we will inform about successful or failed
+ # connections, or None if we have no PingLog.
+ # _nickname: The nickname of the server we're trying to connect to.
+ # _wasOnceConnected: True iff we have successfully negotiated a protocol
+ # version with the other server.
def configurePingLog(self, pingLog, nickname):
+ """Must be called after construction: set this _ClientCon to
+ report events to the pinglog 'pingLog'; tell it that we are
+ connecting to the server named 'nickname'."""
self._pingLog = pingLog
self._nickname = nickname
self._wasOnceConnected = 0
@@ -608,6 +620,7 @@
self.dnsCache = dnsCache
def connectPingLog(self, pingLog):
+ """Report successful or failed connection attempts to 'pingLog'."""
self.pingLog = pingLog
def setServerContext(self, servercontext):
Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -d -r1.22 -r1.23
--- Pinger.py 22 Dec 2004 04:47:10 -0000 1.22
+++ Pinger.py 27 Dec 2004 00:15:57 -0000 1.23
@@ -6,16 +6,21 @@
Built-in network reliability tester (pinger) for Mixminion servers.
Our pinger uses a three-part architecture. First, we periodically
- consider adding testing packets to the outgoing batches.
+ consider adding testing packets to the mix pool, and consider
+ adding link padding to outgoing batches. These functions are
+ performed by PingGenerator objects.
- Second, we note the outcome of connection attempts; and the timing
- of our sending/receiving test packets.
+ Second, we note the outcome of connection attempts, and the timing
+ of our sending/receiving test packets. These functions are
+ performed by a PingLog object.
Third, we use the timing/uptime information in the second part
above to try to infer how reliable the other nodes in the network
- are.
+ are. This is also done by PingLog.
- This module requires python 2.2 or later, and the sqlite module.
+ This module requires Python 2.2 or later, and the sqlite module.
+ I'm okay with this, since most servers shouldn't be running a
+ pinger anyway.
"""
import bisect
@@ -46,17 +51,24 @@
except ImportError:
sqlite = None
+# How often should the server store the fact that it is still alive (seconds).
HEARTBEAT_INTERVAL = 30*60
+# Number of seconds in a day.
ONE_DAY = 24*60*60
class IntervalSchedule:
- """DOCDOC -- defines a set of intervals in time."""
+ """A partition of time into a series of intervals. (Currently, only
+ days are supported.)"""
def __init__(self):
pass
def getIntervalContaining(self, t):
+ """Return a 2-tuple of the start and the end of the interval containing
+ 't'. The bottom of the interval is closed; the top is open."""
p = previousMidnight(t)
return p, succeedingMidnight(p)
def getIntervals(self, startAt, endAt):
+ """Return a list of all intervals between the one containing
+ startAt and the one containing endAt, inclusive."""
r = []
t = previousMidnight(startAt)
while t < endAt:
@@ -66,7 +78,25 @@
return r
class SQLiteDatabase:
- #XXXX can only be used from one thread at a time.
+ """Helper class. Encapsulates the properties of the SQLite
+ database implementation.
+
+ SQLite is a very minimal relational database, and SQLite 2.8
+ (the one that PySQLite wraps) is even more minimal. It only
+ has two underlying types: string and number. It only enforces
+ them when doing comparisons. The locking is very
+ coarse-grained. Foreign key constraints aren't checked.
+
+ On the plus side, SQLite has no administrative overhead. You
+ don't need to run a daemon; you don't need to create users; you
+ only need to install a library.
+ """
+ ## Fields:
+ # _theConnection: A SQLite database connection. Only one thread should
+ # use this connection at a time.
+ # _theCursor: A cursor for the connection.
+ # Defined to 1: this database should only be used from one thread
+ # at a time.
LOCKING_IS_COARSE = 1
# Map from logical type to type used in (sqlite) database.
REALTYPES = { 'timestamp' : 'integer',
@@ -78,53 +108,70 @@
}
def __init__(self, location):
+ """Create a SQLite database storing its data in the file 'location'."""
parent = os.path.split(location)[0]
createPrivateDir(parent)
self._theConnection = sqlite.connect(location, autocommit=0)
self._theCursor = self._theConnection.cursor()
def close(self):
+ """Release resources held by this database."""
self._theConnection.close()
self._theConnection = self._theCursor = None
def getConnection(self):
+ """Return a database connection object. You do not need to close it
+ when you are done."""
+ # If LOCKING_IS_COARSE is true, this should return a singleton
+ # connection. If LOCKING_IS_COARSE is false, this should
+ # return a thread-local connection.
return self._theConnection
def getCursor(self):
+ """Return a database cursor object."""
return self._theCursor
- def lock(self):
- self.dbLock.acquire()
-
- def unlock(self):
- self.dbLock.release()
-
def _objectExists(self, name, objType):
+ """Helper: Return true iff this database has an object called
+ 'name' of the specified type. objType should be one of
+ 'view', 'table', or 'index'.
+ """
self._theCursor.execute(
"SELECT * FROM SQLITE_MASTER WHERE type = %s AND name = %s",
(objType,name))
rs = self._theCursor.fetchall()
return len(rs) > 0
- def createTable(self, name, rows, constraints=()):
+ def createTable(self, name, columns, constraints=()):
+ """If there is no table called 'name', create it. Its columns
+ are given in 'columns', a list of tuples. Each tuple is
+ either a 2-tuple containing a column name and a type, or a
+ 3-tuple containing a column name, a type, and a series of
+ constraints. Additional table-level constraints can be
+ given in a list as 'constraints'.
+ """
+ # This is more complicated than just saying "CREATE TABLE
+ # foo", but it lets us map types and constraints differently
+ # on differently broken databases.
+
if self._objectExists(name,"table"):
return
body = []
- for r in rows:
- cname = r[0]
- ctype = r[1]
+ for c in columns:
+ cname = c[0]
+ ctype = c[1]
if '(' in ctype:
idx = ctype.find('(')
ctype = self.REALTYPES[ctype[:idx]]+ctype[idx:]
else:
ctype = self.REALTYPES[ctype]
- if len(r) == 2:
+ if len(c) == 2:
body.append("%s %s"%(cname, ctype))
else:
- assert len(r) == 3
- body.append("%s %s %s"%(cname, ctype, r[2]))
+ assert len(c) == 3
+ body.append("%s %s %s"%(cname, ctype, c[2]))
body.extend(constraints)
@@ -133,6 +180,10 @@
self._theConnection.commit()
def createIndex(self, name, tablename, columns, unique=0):
+ """If the index 'name', doesn't already exist, create an index
+ of that name on the table 'tablename', indexing the columns
+ 'columns'. If 'unique', create an index of unique values.
+ """
if self._objectExists(name, "index"):
return
@@ -146,15 +197,28 @@
self._theConnection.commit()
def time(self, t=None):
+ """Convert 't' (a Unix time in seconds since the epoch) into the
+ format the database wants for its timestamp fields. If 't' is None,
+ use the current time instead.
+ """
return long(t or time.time())
def bool(self, b):
+ """Convert the boolean 'b' into the format the database wants for its
+ boolean fields."""
if b:
return 1
else:
return 0
def getInsertOrUpdateFn(self, table, keyCols, valCols):
+ """Return a function that takes two arguments: a tuple of
+ values for the columns in keyCols, and a tuple of values
+ for the columns in valCols. If some row in 'table' has the
+ given values for the key columns, this function will update
+ that row and set the values of the value columns.
+ Otherwise, the function will insert a new row with the
+ given value columns."""
## update = "UPDATE %s SET %s WHERE %s" % (
## table,
## ", ".join(["%s = %%s" % k for k in valCols]),
@@ -177,10 +241,35 @@
return fn
class PingLog:
+ """A PingLog stores a series of pinging-related events to a
+ persistent relational database, and calculates server statistics based
+ on those events.
+ """
+ ## Fields:
+ # _db: the underlying database.
+ # _lock: an instance of threading.RLock to control access to in-memory
+ # structures. The database is responsible for keeping its own structures
+ # consistent.
+ # _serverIDs: A map from lc server nickname to server ID used in the
+ # database.
+ # _intervalIDs: A map from (start,end) to a time interval ID used in the
+ # database
+ # _serverReliability: A map from lc server nickname to last computed
+ # server reliability (a float between 0 and 1).
+ # _intervals: An instance of IntervalSchedule.
+ # _brokenChains, _interestingChains: Maps from comma-separated
+ # lowercase nicknames for servers in chains that we believe to be
+ # broken or "interesting" to 1.
+ # _startTime: The 'startup' time for the current myLifespan row.
+ # _lastRecalculation: The last time this process recomputed all
+ # the stats, or 0 for 'never'.
+ # _set{Uptime|OneHop|CurOneHop|TwoHop}: Functions generated by
+ # getInsertOrUpdateFn.
+
+ # FFFF Maybe refactor this into data storage and stats computation.
def __init__(self, db):
+ """Create a new PingLog, storing events into the database 'db'."""
self._db = db
- self._time = self._db.time
- self._bool = self._db.bool
self._lock = threading.RLock()
self._serverIDs = {}
self._intervalIDs = {}
@@ -194,6 +283,8 @@
self._loadServers()
def _createAllTables(self):
+ """Helper: check for the existence of all the tables and indices
+ we plan to use, and create the missing ones."""
self._lock.acquire()
try:
# FFFF There are still a few sucky bits of this DB design.
@@ -202,31 +293,56 @@
# FFFF for us, without our having to give a sequence.)
# FFFF Second, paths probably want to have their own table.
- # Raw data
+ #### Tables holding raw data.
+
+ # Holds intervals over which this server was running. A
+ # row in myLifespan means: "This server started running at
+ # 'startup', and was still running at 'stillup'. If
+ # shutdown is provided, that's when the server shut down."
self._db.createTable("myLifespan",
[("startup", "timestamp", "not null"),
("stillup", "timestamp", "not null"),
("shutdown", "timestamp")])
+
+ # Holds information about probe packets sent into the network. A
+ # row in ping means: We send a probe packet along the path 'path'
+ # at the time 'sentat'. The payload of the message we receive
+ # will hash to 'hash' (base-64 encoded). If 'received' is not
+ # null, we received the packet again at 'received'.
self._db.createTable("ping",
[("hash", "char(28)", "primary key"),
("path", "varchar(200)", "not null"),
("sentat", "timestamp", "not null"),
("received", "timestamp")])
+
+ # Holds lowercased nicknames for all the servers we know about.
self._db.createTable("server",
[("id", "integer", "primary key"),
("name", "varchar(32)", "unique not null")])
+
+ # Holds information about our attempts to launch MMTP connections
+ # to other servers. A row in connectionAttempt means: We tried to
+ # connect to 'serever' at the time 'at'. If 'success', we
+ # successfully connected and negotiated a protocol version.
+ # Otherwise, we failed before we could negotiate a protocol version.
self._db.createTable(
"connectionAttempt",
[("at", "timestamp", "not null"),
("server", "integer", "not null REFERENCES server(id)"),
("success", "bool", "not null")])
- # Results
+ #### Tables holding results.
+
+ # Maps spans of time (currently, days) to identifiers.
self._db.createTable("statsInterval",
[("id", "integer", "primary key"),
("startAt", "timestamp", "not null"),
("endAt", "timestamp", "not null")])
+ # Holds estimated server uptimes. Each row in uptime means:
+ # during 'interval', our successful and failed connections to
+ # 'server' make us believe that it was running and on the network
+ # about 'uptime' fraction of the time.
self._db.createTable(
"uptime",
[("interval", "integer", "not null REFERENCES statsinterval(id)"),
@@ -234,6 +350,18 @@
("uptime", "float", "not null")],
["PRIMARY KEY (interval, server)"])
+ # Holds estimates for server latency and reliability for a given
+ # interval. Each row in echolotOneHopResult means: during
+ # 'interval', we sent 'nSent' one-hop probe messages to 'server'.
+ # We eventually received 'nReceived' of them. The median latency
+ # (rounded off) of those we received was 'latency' seconds.
+ # Weighting unreceived probe messages by the fraction we would
+ # have expected to see by the time we computed the results times
+ # 0.8, and weighting received probes by 1.0, the weighted number
+ # of sent and received pings are in 'wsent' and 'wreceived', and
+ # the weighted fraction received is 'reliability'.
+
+ # (Yes, Echolot is dark magic.)
self._db.createTable(
"echolotOneHopResult",
[("server", "integer", "not null REFERENCES server(id)"),
@@ -246,6 +374,12 @@
("reliability", "float", "not null")],
["PRIMARY KEY (server, interval)"])
+ # Holds estimates for server latency and reliability over the last
+ # several (12) days. A row in echolotCurrentOneHopResults means:
+ # We most recently computed single-hop probe statistics for server
+ # at 'at'. Over the last several days, its median latency
+ # (rounded) has been 'latency' seconds, and its reliability,
+ # weighted by relevance of day, has been 'reliability'.
self._db.createTable(
"echolotCurrentOneHopResult",
[("server", "integer",
@@ -254,6 +388,16 @@
("latency", "integer", "not null"),
("reliability", "float", "not null")])
+ # Holds estimates for two-hop chain reliability. Each row means:
+ # We most recently calculated the reliability for the two-hop chain
+ # 'server1,server2' at 'at'. Over the last several (12) days, we
+ # sent nSent probes, and have received nReceived of them. Iff
+ # 'broken', the fraction received is so much lower than what we'd
+ # expect that we have concluded that the chain is probably broken.
+ # Iff 'interesting', then there is not enough data to be sure, so
+ # we're going to probe this chain a bit more frequently for a
+ # while.
+
self._db.createTable(
"echolotCurrentTwoHopResult",
[("server1", "integer", "not null REFERENCES server(id)"),
@@ -265,6 +409,8 @@
("interesting", "bool", "not null")],
["PRIMARY KEY (server1, server2)"])
+ #### Indices.
+
self._db.createIndex("serverName", "server", ["name"], unique=1)
self._db.createIndex("statsIntervalSE", "statsInterval",
["startAt", "endAt"], unique=1)
@@ -275,7 +421,8 @@
self._db.createIndex("connectionAttemptServerAt",
"connectionAttempt", ["server","at"])
- # indices on echolot*results, uptimes.
+ # XXXX008 We should probably have indices on echolot*results,
+ # uptimes.
self._setUptime = self._db.getInsertOrUpdateFn(
"uptime", ["interval", "server"], ["uptime"])
@@ -296,7 +443,10 @@
self._lock.release()
def _loadServers(self):
- # hold lock.
+ """Helper function; callers must hold lock. Load _serverIDs,
+ _serverReliability, _brokenChains, and _interestingChains from
+ the database.
+ """
cur = self._db.getCursor()
cur.execute("SELECT id, name FROM server")
res = cur.fetchall()
@@ -332,15 +482,18 @@
self._interestingChains = interesting
def updateServers(self, names):
- self._lock.acquire()
- try:
- for n in names:
- self._getServerID(n)
- finally:
- self._lock.release()
+ """Add the names in 'names' to the database, if they aren't there
+ already.
+ """
+ for n in names:
+ self._getServerID(n)
+ self._db.getConnection().commit()
def _getServerID(self, name):
- # doesn't commit.
+ """Helper: Return the database ID for the server named 'name'. If the
+ database doesn't know about the server yet, add it. Does not
+ commit the current transaction.
+ """
name = name.lower()
self._lock.acquire()
try:
@@ -361,6 +514,10 @@
return ident
def _getIntervalID(self, start, end):
+ """Helper: Return the database ID for the interval spanning from
+ 'start' to 'end'. If the database doesn't know about the interval
+ yet, add it. Does not commit the current transaction.
+ """
# CACHE THESE? FFFF
start = self._db.time(start)
end = self._db.time(end)
@@ -380,10 +537,14 @@
return r[0][0]
def rotate(self, dataCutoff, resultsCutoff):
+ """Remove expired entries from the database. Remove any raw data from
+ before 'dataCutoff', and any computed statistics from before
+ 'resultsCutoff'.
+ """
#if now is None: now = time.time()
#sec = config['Pinging']
- #dataCutoff = self._time(now - sec['RetainPingData'])
- #resultsCutoff = self._time(now - sec['RetainPingResults'])
+ #dataCutoff = self._db.time(now - sec['RetainPingData'])
+ #resultsCutoff = self._db.time(now - sec['RetainPingResults'])
cur = self._db.getCursor()
cur.execute("DELETE FROM myLifespan WHERE stillup < %s", dataCutoff)
@@ -401,59 +562,82 @@
self._db.getConnection().commit()
def flush(self):
+ """Write any pending information to disk."""
self._db.getConnection().commit()
def close(self):
+ """Release all resources held by this PingLog and the underlying
+ database."""
self._db.close()
_STARTUP = "INSERT INTO myLifespan (startup, stillup, shutdown) VALUES (%s,%s, 0)"
def startup(self,now=None):
+ """Called when the server has just started. Starts tracking a new
+ interval of this server's lifetime."""
self._lock.acquire()
- self._startTime = now = self._time(now)
+ self._startTime = now = self._db.time(now)
self._lock.release()
self._db.getCursor().execute(self._STARTUP, (now,now))
self._db.getConnection().commit()
_SHUTDOWN = "UPDATE myLifespan SET stillup = %s, shutdown = %s WHERE startup = %s"
def shutdown(self, now=None):
+ """Called when the server is shutting down. Stops tracking the current
+ interval of this server's lifetime."""
if self._startTime is None: self.startup()
- now = self._time(now)
+ now = self._db.time(now)
self._db.getCursor().execute(self._SHUTDOWN, (now, now, self._startTime))
self._db.getConnection().commit()
_HEARTBEAT = "UPDATE myLifespan SET stillup = %s WHERE startup = %s AND stillup < %s"
def heartbeat(self, now=None):
+ """Called periodically. Notes that the server is still running as of
+ the time 'now'."""
if self._startTime is None: self.startup()
- now = self._time(now)
+ now = self._db.time(now)
self._db.getCursor().execute(self._HEARTBEAT, (now, self._startTime, now))
self._db.getConnection().commit()
_CONNECTED = ("INSERT INTO connectionAttempt (at, server, success) "
"VALUES (%s,%s,%s)")
def connected(self, nickname, success=1, now=None):
+ """Note that we attempted to connect to the server named 'nickname'.
+ We successfully negotiated a protocol iff success is true.
+ """
serverID = self._getServerID(nickname)
self._db.getCursor().execute(self._CONNECTED,
- (self._time(now), serverID, self._bool(success)))
+ (self._db.time(now), serverID, self._db.bool(success)))
self._db.getConnection().commit()
def connectFailed(self, nickname, now=None):
+ """Note that we attempted to connect to the server named 'nickname',
+ but could not negotiate a protocol.
+ """
self.connected(nickname, success=0, now=now)
_QUEUED_PING = ("INSERT INTO ping (hash, path, sentat, received)"
"VALUES (%s,%s,%s,%s)")
def queuedPing(self, hash, path, now=None):
+ """Note that we sent a probe message along 'path' (a comma-separated
+ sequence of server nicknames, excluding ourself as first and last
+ hop), such that the payload, when delivered, will have 'hash' as
+ its digest.
+ """
assert len(hash) == mixminion.Crypto.DIGEST_LEN
path = path.lower()
for s in path.split(","):
self._getServerID(s)
self._db.getCursor().execute(self._QUEUED_PING,
- (formatBase64(hash), path, self._time(now), 0))
+ (formatBase64(hash), path, self._db.time(now), 0))
self._db.getConnection().commit()
_GOT_PING = "UPDATE ping SET received = %s WHERE hash = %s"
def gotPing(self, hash, now=None):
+ """Note that we have received a probe message whose payload had 'hash'
+ as its digest.
+ """
assert len(hash) == mixminion.Crypto.DIGEST_LEN
- self._db.getCursor().execute(self._GOT_PING, (self._time(now), formatBase64(hash)))
+ self._db.getCursor().execute(self._GOT_PING, (self._db.time(now), formatBase64(hash)))
n = self._db.getCursor().rowcount
if n == 0:
LOG.warn("Received ping with no record of its hash")
@@ -461,7 +645,10 @@
LOG.warn("Received ping with multiple hash entries!")
def _calculateUptimes(self, serverNames, startTime, endTime, now=None):
- # commit when one done
+ """Helper: calculate the uptime results for a set of servers, named in
+ serverNames, for all intervals between startTime and endTime
+ inclusive. Does not commit the current transaction.
+ """
cur = self._db.getCursor()
serverNames.sort()
@@ -475,7 +662,7 @@
cur.execute("SELECT startup, stillup, shutdown FROM myLifespan WHERE "
"startup <= %s AND stillup >= %s",
- self._time(endTime), self._time(startTime))
+ self._db.time(endTime), self._db.time(startTime))
myUptime = 0
myIntervals = IntervalSet([ (start, max(end,shutdown))
for start,end,shutdown in cur ])
@@ -528,6 +715,8 @@
self._setUptime((intervalID, serverID), (fraction,))
def calculateUptimes(self, startAt, endAt, now=None):
+ """Calculate the uptimes for all servers for all intervals between
+ startAt and endAt, inclusive."""
if now is None: now = time.time()
self._lock.acquire()
try:
@@ -539,8 +728,8 @@
self._db.getConnection().commit()
def getUptimes(self, startAt, endAt):
- """DODOC: uptimes for all servers overlapping [startAt, endAt],
- as mapping from (start,end) to nickname to fraction.
+ """Return uptimes for all servers overlapping [startAt, endAt],
+ as mapping from (start,end) to lowercase nickname to fraction.
"""
result = {}
cur = self._db.getCursor()
@@ -549,15 +738,17 @@
"WHERE statsInterval.id = uptime.interval "
"AND server.id = uptime.server "
"AND %s >= startat AND %s <= endat",
- (self._time(startAt), self._time(endAt)))
+ (self._db.time(startAt), self._db.time(endAt)))
for s,e,n,u in cur:
result.setdefault((s,e), {})[n] = u
self._db.getConnection().commit()
return result
def _roundLatency(self, latency):
- """Using a median latency can leak the fact that a message was a
- ping. DOCDOC"""
+ """Return 'latency', rounded to an even unit of time. Revealing
+ median latency directly can leak the fact that a message was a
+ ping.
+ """
for cutoff, q in [
(60, 5), (10*60, 60), (30*60, 2*60),
(60*60, 5*60), (3*60*60, 10*60), (12*60*60, 20*60),
@@ -575,6 +766,11 @@
_PING_GRANULARITY = 24*60*60
def _calculateOneHopResult(self, serverName, startTime, endTime,
now=None, calculateOverallResults=1):
+ """Calculate the latency and reliability for a given server on
+ intervals between startTime and endTime, inclusive. If
+ calculateOverallResults is true, also compute the current overall
+ results for that server.
+ """
# commit when done; serverName must exist.
cur = self._db.getCursor()
if now is None:
@@ -685,10 +881,12 @@
rel = wrcvd / wsent
else:
rel = 0.0
- self._setCurOneHop((serverID,), (self._time(now), latent, rel))
+ self._setCurOneHop((serverID,), (self._db.time(now), latent, rel))
return rel
def calculateOneHopResult(self, now=None):
+ """Calculate latency and reliability for all servers.
+ """
self._lock.acquire()
try:
serverNames = self._serverIDs.keys()
@@ -713,36 +911,46 @@
self._lock.release()
def _calculate2ChainStatus(self, since, s1, s2, now=None):
+ """Helper: Calculate the status (broken/interesting/both/neither) for
+ a chain of the servers 's1' and 's2' (given as lc nicknames),
+ considering pings sent since 'since'. Return a tuple of (number of
+ pings sent, number of those pings received, is-broken,
+ is-interesting). Does not commit the current transaction.
+ """
# doesn't commit.
cur = self._db.getCursor()
path = "%s,%s"%(s1,s2)
cur.execute("SELECT count() FROM ping WHERE path = %s"
" AND sentat >= %s",
- (path,self._time(since)))
+ (path,self._db.time(since)))
nSent, = cur.fetchone()
cur.execute("SELECT count() FROM ping WHERE path = %s"
" AND sentat >= %s AND received > 0",
(path,since))
nReceived, = cur.fetchone()
+ # Product: expected reliability.
try:
product = self._serverReliability[s1] * self._serverReliability[s2]
except KeyError:
product = None
+ # Frac: actual reliability
if nSent == 0:
frac = 0.0
else:
frac = float(nReceived)/nSent
isBroken = nSent >= 3 and product and frac <= product*0.3
+
isInteresting = ((nSent < 3 and nReceived == 0) or
(product and frac <= product*0.3))
- return nSent, nReceived, product, isBroken, isInteresting
+ return nSent, nReceived, isBroken, isInteresting
_CHAIN_PING_HORIZON = 12*ONE_DAY
def calculateChainStatus(self, now=None):
+ """Calculate the status of all two-hop chains."""
self._lock.acquire()
try:
serverNames = self._serverIDs.keys()
@@ -762,7 +970,7 @@
for s2 in serverNames:
if s2 == '<self>': continue
p = "%s,%s"%(s1,s2)
- nS, nR, prod, isBroken, isInteresting = \
+ nS, nR, isBroken, isInteresting = \
self._calculate2ChainStatus(since, s1, s2)
if isBroken:
brokenChains[p] = 1
@@ -771,8 +979,8 @@
self._setTwoHop(
(self._getServerID(s1), self._getServerID(s2)),
- (self._time(now), nS, nR, self._bool(isBroken),
- self._bool(isInteresting)))
+ (self._db.time(now), nS, nR, self._db.bool(isBroken),
+ self._db.bool(isInteresting)))
self._db.getConnection().commit()
self._lock.acquire()
@@ -783,6 +991,8 @@
self._lock.release()
def dumpAllStatus(self,f,since,now=None):
+ """Write statistics into the file object 'f' for all intervals since
+ 'since', inclusive."""
self._lock.acquire()
try:
serverNames = self._serverIDs.keys()
@@ -867,6 +1077,10 @@
self._db.getConnection().commit()
def calculateAll(self, outFname=None, now=None):
+ """Recalculate all statistics, writing the results into a file called
+ 'outFname'. If 'outFname' is None, only save the results into the
+ database.
+ """
if now is None: now=time.time()
LOG.info("Computing ping results.")
LOG.info("Starting to compute server uptimes.")
@@ -884,34 +1098,66 @@
self.lastCalculation = now
class PingGenerator:
- """DOCDOC"""
- #XXXX008 add abstract functions.
+ """Abstract class: A PingGenerator periodically sends traffic into the
+ network, or adds link padding to outgoing connections.
+ """
+ ## Fields:
+ # directory: an instance of ClientDirectory
+ # pingLog: an instance of PingLog
+ # outgoingQueue: an instance of outgoingQueue, if we're going to send
+ # pings
+ # myNickname: the nickname of this server
def __init__(self, config):
+ """Create a new PingGenerator with a given configuration"""
self.directory = None
self.pingLog = None
self.outgoingQueue = None
self.myNickname = config['Server']['Nickname']
- self.latestStatistics = None
def connect(self, directory, outgoingQueue, pingLog, keyring):
+ """Use the provided directory/queue/pingLog/keyring as needed.
+ This will be called before other methods of this generator."""
pass
def directoryUpdated(self):
+ """Called when the directory has changed."""
pass
def getFirstPingTime(self):
+ """Return the next time when we want sendPings() to be called. Valid
+ once scheduleAllPings has been called."""
return None
def scheduleAllPings(self, now=None):
+ """Figure out when we want to ping what."""
pass
def sendPings(self, now=None):
+ """Send all pings that are currently pending by adding them to the
+ outgoing queue we were connected to.
+ """
pass
def addLinkPadding(self, pkts):
+ """Given a map from addresses (MMTPHostInfo) to lists of
+ DeliverableMessage, add link padding objects
+ (mixminion.server.MMTPServer.LinkPadding) as desired. This will be
+ called on all outgoing message batches.
+ """
pass
def _sendOnePing(self, path1, path2):
+ """Helper called by subclasses. Add a ping down a two-stage path;
+ queue the ping in self.outgoingQueue, and log the event in
+ self.pingLog. Path1 and path2 are lists of ServerInfo objects, or
+ nicknames to be resolved by self.directory.getPath.
+
+ self.path2 must end with self.keyring.getCurrentDescriptor().
+ NOTE: Don't use self.myNickname in path1 or path2; we may not
+ be in the directory.
+
+ Return 1 if we are able to queue the ping, 0 otherwise.
+ """
assert path1 and path2
assert path2[-1].getNickname() == self.myNickname
try:
@@ -935,6 +1181,15 @@
return 1
class _PingScheduler:
+ """Helper class: use an echolot-like approach to schedule pings.
+
+ We divide time into a series of 'periods'. Within each 'period', we
+ ping servers at regular 'intervals', with the first interval offset
+ from the start of the period by a random-looking amount.
+ """
+ ## Fields:
+ # nextPingTime: map from path to the next time we plan to ping it.
+ # seed: a secret random value used to compute perturbations.
def __init__(self):
self.nextPingTime = {}#path->when
def connect(self, directory, outgoingQueue, pingLog, keyring):
@@ -944,19 +1199,30 @@
self.keyring = keyring
self.seed = keyring.getPingerSeed()
def _calcPeriodLen(self, interval):
+ """Helper: Given a period length, return the interval length we'll
+ use.
+ """
period = ONE_DAY
while period < interval*2:
period *= 2
return period
def scheduleAllPings(self, now=None):
+ # Subclasses should override this to call _schedulePing on all paths.
raise NotImplemented()
def _getPeriodStart(self, t):
+ """Abstract: Return the start of the period containing the time t."""
+ #XXXX008 should take _calcPeriodLen into account?
raise NotImplemented()
def _getPingInterval(self, path):
+ """Abstract: Return the interval of pings for the path 'path'."""
raise NotImplemented()
def _getPeriodLength(self):
+ """Abstract: Return the length of the period."""
raise NotImplemented()
def _schedulePing(self,path,now=None):
+ """Helper: schedule a single ping along the path 'path', adding
+ it to self.nextPingTime.
+ """
if now is None: now = int(time.time())
periodStart = self._getPeriodStart(now)
periodEnd = periodStart + self._period_length
@@ -975,6 +1241,9 @@
",".join(path), formatTime(t,1))
return t
def _getPerturbation(self, path, periodStart, interval):
+ """Return the offset to be used for the ping intervals for 'path'
+ of interval 'interval' within the period starting at 'periodStart'.
+ """
sha = mixminion.Crypto.sha1("%s@@%s@@%s"%(",".join(path),
interval,
self.seed))
@@ -990,7 +1259,8 @@
return None
class OneHopPingGenerator(_PingScheduler,PingGenerator):
- """DOCDOC"""
+ """A OneHopPingGenerator uses the Echolot ping algorithm to schedule
+ single-hop pings to all known servers."""
def __init__(self, config):
PingGenerator.__init__(self, config)
_PingScheduler.__init__(self)
@@ -1044,7 +1314,12 @@
self._schedulePing((n,), now+60)
class TwoHopPingGenerator(_PingScheduler, PingGenerator):
- """DOCDOC"""
+ """A TwoHopPingGenerator uses the Echolot ping algorithm to schedule
+ two-hop pings to all known pairs of servers.
+
+ If we conclude that a chain of servers is 'interesting' (possibly
+ broken, or too little data to tell), we ping it more frequently.
+ """
def __init__(self, config):
PingGenerator.__init__(self, config)
_PingScheduler.__init__(self)
@@ -1105,7 +1380,8 @@
self._schedulePing((n1,n2), now+60)
class TestLinkPaddingGenerator(PingGenerator):
- """DOCDOC"""
+ """A PingGenerator to ensure that we randomly probe all known server
+ addresses for liveness, from time to time."""
def __init__(self, config):
PingGenerator.__init__(self,config)
mixInterval = config['Server']['MixInterval'].getSeconds()
@@ -1133,6 +1409,8 @@
pkts.setdefault(addr,[]).append(padding)
class CompoundPingGenerator:
+ """A CompoundPingGenerator wraps several PingGenerators as a single
+ PingGenerator object."""
def __init__(self, generators):
self.gens = generators[:]
def connect(self, directory, outgoingQueue, pingLog, keyring):
@@ -1167,8 +1445,9 @@
g.addLinkPadding(pkts)
def getPingGenerator(config):
- """DOCDOC"""
- if not config['Pinging'].get('Enabled'):
+ """Return the PingGenerator (if any) requested in config."""
+ #XXXX008 make it possible to turn off some of the components.
+ if not config['Pinging'].get('Enabled') or not canRunPinger():
return CompoundPingGenerator([])
pingers = []
pingers.append(OneHopPingGenerator(config))
@@ -1177,12 +1456,22 @@
return CompoundPingGenerator(pingers)
def canRunPinger():
- """DOCDOC"""
+ """Return true iff we have the required libraries installed to run a pinger.
+ """
return sys.version_info[:2] >= (2,2) and sqlite is not None
+# Map from database type name to database implementation class.
DATABASE_CLASSES = { 'sqlite' : SQLiteDatabase }
def openPingLog(config, location=None, databaseThread=None):
+ """Open a ping log based on the ServerConfig 'config'. If 'location' is
+ provided, store the files in 'location'; otherwise, deduce where to
+ store the files from 'config'. If databaseThread is provided and the
+ database does not do well with multithreading (either no locking, or
+ locking too coarse-grained to use), then background all calls to
+ PingLog in databaseThread.
+ """
+
# FFFF eventually, we should maybe support more than pysqlite. But let's
# FFFF not generalize until we have a 2nd case.
database = 'sqlite'
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.145
retrieving revision 1.146
diff -u -d -r1.145 -r1.146
--- ServerMain.py 27 Dec 2004 00:09:21 -0000 1.145
+++ ServerMain.py 27 Dec 2004 00:15:58 -0000 1.146
@@ -153,13 +153,14 @@
# packetHandler -- an instance of PacketHandler.
# mixPool -- an instance of MixPool
# processingThread -- an instance of ProcessingThread
+ # pingLog -- an instance of pingLog, or None
def __init__(self, location, packetHandler):
"""Create an IncomingQueue that stores its packets in <location>
and processes them through <packetHandler>."""
mixminion.Filestore.StringStore.__init__(self, location, create=1)
self.packetHandler = packetHandler
self.mixPool = None
- self.pingLog = None#DOCDOC
+ self.pingLog = None
def connectQueues(self, mixPool, processingThread):
"""Sets the target mix queue"""
@@ -171,7 +172,9 @@
lambda self=self, h=h: self.__deliverPacket(h))
def setPingLog(self, pingLog):
- "DOCDOC"
+ """Configure this queue to inform 'pingLog' about received
+ ping messages.
+ """
self.pingLog = pingLog
def queuePacket(self, pkt):
@@ -336,8 +339,10 @@
## Fields:
# server -- an instance of _MMTPServer
# addr -- the key ID we published in our descriptor.
- # incomingQueue -- pointer to IncomingQueue object to be used for
+ # incomingQueue -- the IncomingQueue object to be used for
# self->self communication.
+ # pingGenerator -- the pingGenerator that may want to add link padding
+ # to outgoing packet sets, or None.
def __init__(self, location, keyID):
"""Create a new OutgoingQueue that stores its packets in a given
location."""
@@ -358,7 +363,7 @@
self.server = server
self.incomingQueue = incoming
- self.pingGenerator = pingGenerator#DOCDOC
+ self.pingGenerator = pingGenerator
def _deliverMessages(self, msgList):
"Implementation of abstract method from DeliveryQueue."
@@ -718,12 +723,18 @@
# cleaningThread: Thread used to remove packets in the background
# processingThread: Thread to handle CPU-intensive activity without
# slowing down network interactivity.
- # databaseThread: DOCDOC
+ # databaseThread: Thread to handle pinger database activity that may
+ # be slow. (If the database has good locking, this is only statistics
+ # recomputation. If the database has dumb locking, this is all
+ # database activity.)
# lockFile: An instance of Lockfile to prevent multiple servers from
# running in the same directory. The filename for this lock is
# stored in self.pidFile.
# pidFile: Filename in which we store the pid of the running server.
- # pingLog: DOCDOC
+ # pingLog: None, or an instance of PingLog that needs to be informed
+ # about network probing activity.
+ # pingGenerator: None, or an instance of PingGenerator that will decide
+ # when to generate probe traffic.
def __init__(self, config):
"""Create a new server from a ServerConfig."""
_Scheduler.__init__(self)