
[minion-cvs] Get locking right: since sqlite does database locking s...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv7675/lib/mixminion/server

Modified Files:
	Pinger.py ServerMain.py 
Log Message:
Get locking right: since sqlite does database locking so poorly (while other DBs have good transaction systems), make all database access happen in a separate thread when we are using sqlite, and allow for saner behavior with better DBs.
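
What the log message describes, in outline: each database wrapper advertises
whether its locking is coarse (SQLiteDatabase below sets LOCKING_IS_COARSE = 1),
and when it is, openPingLog wraps the PingLog in a decorator that queues every
call onto a single database thread, so the sqlite file is only ever touched
from one thread.  A minimal sketch of that queue-and-proxy pattern follows, in
the same Python 2 style as the code below; DBWorkerThread and BackgroundingProxy
are illustrative stand-ins, not the actual ProcessingThread/BackgroundingDecorator
code in mixminion.ThreadUtils:

import threading
import Queue    # Python 2 stdlib; the code below targets Python >= 2.2

class DBWorkerThread(threading.Thread):
    """Runs queued jobs one at a time, so the database is only ever
       touched from this thread."""
    def __init__(self):
        threading.Thread.__init__(self)
        self.setDaemon(1)
        self.jobs = Queue.Queue()
    def addJob(self, job):
        self.jobs.put(job)
    def stop(self):
        self.jobs.put(None)
    def run(self):
        while 1:
            job = self.jobs.get()
            if job is None:          # sentinel: time to shut down
                return
            job()                    # all database access happens here

class BackgroundingProxy:
    """Wraps an object so that every method call is queued onto the worker
       thread instead of running in the caller's thread."""
    def __init__(self, thread, underlying):
        self._thread = thread
        self._underlying = underlying
    def __getattr__(self, name):
        method = getattr(self._underlying, name)
        def queued(*args, **kwargs):
            self._thread.addJob(lambda: method(*args, **kwargs))
        return queued

Calls proxied this way cannot hand a result back to the caller, which fits
with how the refactored PingLog below keeps its read-mostly state
(_serverNames, _serverReliability, _brokenChains, _interestingChains) in
memory behind its own RLock instead of querying the database on every read.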

Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -d -r1.12 -r1.13
--- Pinger.py	11 Dec 2004 02:48:54 -0000	1.12
+++ Pinger.py	12 Dec 2004 02:48:16 -0000	1.13
@@ -32,6 +32,7 @@
 import mixminion.Crypto
 import mixminion.Packet
 import mixminion.ServerInfo
+import mixminion.ThreadUtils
 import mixminion.server.PacketHandler
 import mixminion.server.MMTPServer
 
@@ -73,17 +74,37 @@
             t = n
         return r
 
-class _SQLiteMixin:
+class SQLiteDatabase:
+    #XXXX can only be used from one thread at a time.
+    LOCKING_IS_COARSE = 1
+    def __init__(self, location):
+        self._theConnection = sqlite.connect(location, autocommit=0)
+        self._theCursor = self._theConnection.cursor()
+
+    def close(self):
+        self._theConnection.close()
+        self._theConnection = self._theCursor = None
+
+    def getConnection(self):
+        return self._theConnection
+
+    def getCursor(self):
+        return self._theCursor
+
+    def lock(self):
+        self.dbLock.acquire()
+
+    def unlock(self):
+        self.dbLock.release()
+
     def _objectExists(self, name, objType):
-        # hold lock.
-        self.cursor.execute(
+        self._theCursor.execute(
             "SELECT * FROM SQLITE_MASTER WHERE type = %s AND name = %s",
             (objType,name))
-        rs = self.cursor.fetchall()
+        rs = self._theCursor.fetchall()
         return len(rs) > 0
 
-    def _createTable(self, name, rows):
-        # hold lock
+    def createTable(self, name, rows):
         if self._objectExists(name,"table"):
             return
 
@@ -96,11 +117,10 @@
                 body.append("%s %s %s"%(r[0],REALTYPES[r[1]],r[2]))
 
         stmt = "CREATE TABLE %s (%s)" % (name,", ".join(body))
-        self.cursor.execute(stmt)
-        self.connection.commit()
+        self._theCursor.execute(stmt)
+        self._theConnection.commit()
 
-    def _createIndex(self, name, tablename, columns, unique=0):
-        #hold lock
+    def createIndex(self, name, tablename, columns, unique=0):
         if self._objectExists(name, "index"):
             return
 
@@ -110,19 +130,19 @@
             u = ""
         stmt = "CREATE %sINDEX %s ON %s (%s)"%(
             u, name, tablename, ", ".join(columns))
-        self.cursor.execute(stmt)
-        self.connection.commit()
+        self._theCursor.execute(stmt)
+        self._theConnection.commit()
 
-    def _time(self, t=None):
+    def time(self, t=None):
         return long(t or time.time())
 
-    def _bool(self, b):
+    def bool(self, b):
         if b:
             return 1
         else:
             return 0
 
-    def _getInsertOrUpdateFn(self, table, keyCols, valCols):
+    def getInsertOrUpdateFn(self, table, keyCols, valCols):
         update = "UPDATE %s SET %s WHERE %s" % (
             table,
             ", ".join(["%s = %%s" % k for k in valCols]),
@@ -136,120 +156,122 @@
             assert len(keyVals) == len(keyCols)
             assert len(valVals) == len(valCols)
 
-            self.cursor.execute(update, (valVals+keyVals))
-            if self.cursor.rowcount > 0:
+            self._theCursor.execute(update, (valVals+keyVals))
+            if self._theCursor.rowcount > 0:
                 return
 
-            self.cursor.execute(insert, (keyVals+valVals))
+            self._theCursor.execute(insert, (keyVals+valVals))
         return fn
 
-class PingLog(_SQLiteMixin):
-    def __init__(self, connection):
-        self.connection = connection
-        self.cursor = connection.cursor()
-        self.lock = threading.RLock()
-        self.serverNames = {}
-        self.serverReliability = {}
-        self.uptimeIntervals = PingerIntervalSchedule()
-        self.pingIntervals = PingerIntervalSchedule()
-        self.brokenChains = {}
-        self.interestingChains = {}
+class PingLog:
+    def __init__(self, db):
+        self._db = db
+        self._time = self._db.time
+        self._bool = self._db.bool
+        self._lock = threading.RLock()
+        self._serverNames = {}
+        self._serverReliability = {}
+        self._uptimeIntervals = PingerIntervalSchedule()
+        self._pingIntervals = PingerIntervalSchedule()
+        self._brokenChains = {}
+        self._interestingChains = {}
         self._startTime = None
         self._lastRecalculation = 0
         self._createAllTables()
         self._loadServers()
 
     def _createAllTables(self):
-        self.lock.acquire()
+        self._lock.acquire()
         try:
             # FFFF This is terrible DB design.  It's not normalized by
             # FFFF any stretch of the imagination, it doesn't have enough
             # FFFF constraints, etc etc.
             # Raw data
-            self._createTable("lifetimes",
-                              [("up",       "time", "not null"),
-                               ("stillup",  "time"),
-                               ("shutdown", "time")])
-            self._createTable("pings",
-                              [("hash",     "varchar", "primary key"),
-                               ("path",     "varchar", "not null"),
-                               ("sentat",   "time",    "not null"),
-                               ("received", "time")])
-            self._createTable("connects",
-                              [("at",       "time", "not null"),
-                               ("server",   "varchar", "not null"),
-                               ("success",  "boolean", "not null")])
-            self._createTable("servers",
-                              [("name", "varchar", "unique not null")])
+            self._db.createTable("lifetimes",
+                                 [("up",       "time", "not null"),
+                                  ("stillup",  "time"),
+                                  ("shutdown", "time")])
+            self._db.createTable("pings",
+                                 [("hash",     "varchar", "primary key"),
+                                  ("path",     "varchar", "not null"),
+                                  ("sentat",   "time",    "not null"),
+                                  ("received", "time")])
+            self._db.createTable("connects",
+                                 [("at",       "time", "not null"),
+                                  ("server",   "varchar", "not null"),
+                                  ("success",  "boolean", "not null")])
+            self._db.createTable("servers",
+                                 [("name", "varchar", "unique not null")])
             # Results
-            self._createTable("uptimes",
-                              [("start", "time", "not null"),
-                               ("end",   "time", "not null"),
-                               ("name",  "varchar", "not null"),
-                               ("uptime", "float", "not null")])
-            self._createTable("echolotOneHopResults",
-                              [("servername",  "varchar", "not null"),
-                               ("startAt",     "time",    "not null"),
-                               ("endAt",       "time",    "not null"),
-                               ("nSent",       "integer", "not null"),
-                               ("nReceived",   "integer", "not null"),
-                               ("latency",     "integer", "not null"),
-                               ("wsent",       "float",   "not null"),
-                               ("wreceived",   "float",   "not null"),
-                               ("reliability", "float",   "not null")])
-            self._createTable("echolotCurrentOneHopResults",
-                              [("servername",  "varchar", "unique not null"),
-                               ("at",          "time",    "not null"),
-                               ("latency",     "integer", "not null"),
-                               ("reliability", "float",   "not null")])
-            self._createTable("echolotCurrentTwoHopResults",
-                              [("path",        "varchar", "unique not null"),
-                               ("at",          "time",    "not null"),
-                               ("nSent",       "integer", "not null"),
-                               ("nReceived",   "integer", "not null"),
-                               ("broken",      "boolean", "not null"),
-                               ("interesting", "boolean", "not null")])
+            self._db.createTable("uptimes",
+                                 [("start", "time", "not null"),
+                                  ("end",   "time", "not null"),
+                                  ("name",  "varchar", "not null"),
+                                  ("uptime", "float", "not null")])
+            self._db.createTable("echolotOneHopResults",
+                                 [("servername",  "varchar", "not null"),
+                                  ("startAt",     "time",    "not null"),
+                                  ("endAt",       "time",    "not null"),
+                                  ("nSent",       "integer", "not null"),
+                                  ("nReceived",   "integer", "not null"),
+                                  ("latency",     "integer", "not null"),
+                                  ("wsent",       "float",   "not null"),
+                                  ("wreceived",   "float",   "not null"),
+                                  ("reliability", "float",   "not null")])
+            self._db.createTable("echolotCurrentOneHopResults",
+                                 [("servername",  "varchar", "unique not null"),
+                                  ("at",          "time",    "not null"),
+                                  ("latency",     "integer", "not null"),
+                                  ("reliability", "float",   "not null")])
+            self._db.createTable("echolotCurrentTwoHopResults",
+                                 [("path",        "varchar", "unique not null"),
+                                  ("at",          "time",    "not null"),
+                                  ("nSent",       "integer", "not null"),
+                                  ("nReceived",   "integer", "not null"),
+                                  ("broken",      "boolean", "not null"),
+                                  ("interesting", "boolean", "not null")])
 
-            self._createIndex("lifetimesUp", "lifetimes", ["up"])
-            self._createIndex("pingsHash",   "pings", ["hash"], unique=1)
-            self._createIndex("pingsPathSR", "pings",
-                              ["path", "sentat", "received"])
-            self._createIndex("connectsAt", "connects", ["at"])
-            self._createIndex("uptimesNS", "uptimes", ["name", "start"])
+            self._db.createIndex("lifetimesUp", "lifetimes", ["up"])
+            self._db.createIndex("pingsHash",   "pings", ["hash"], unique=1)
+            self._db.createIndex("pingsPathSR", "pings",
+                                 ["path", "sentat", "received"])
+            self._db.createIndex("connectsAt", "connects", ["at"])
+            self._db.createIndex("uptimesNS", "uptimes", ["name", "start"])
             # indices on echolot*results, uptimes.
 
-            self._setUptime = self._getInsertOrUpdateFn(
+            self._setUptime = self._db.getInsertOrUpdateFn(
                 "uptimes", ["start", "end", "name"], ["uptime"])
-            self._setOneHop = self._getInsertOrUpdateFn(
+            self._setOneHop = self._db.getInsertOrUpdateFn(
                 "echolotOneHopResults",
                 ["servername", "startAt", "endAt"],
                 ["nSent", "nReceived", "latency", "wsent", "wreceived",
                  "reliability"])
-            self._setCurOneHop = self._getInsertOrUpdateFn(
+            self._setCurOneHop = self._db.getInsertOrUpdateFn(
                 "echolotCurrentOneHopResults",
                 ["servername"],
                 ["at", "latency", "reliability"])
-            self._setTwoHop = self._getInsertOrUpdateFn(
+            self._setTwoHop = self._db.getInsertOrUpdateFn(
                 "echolotCurrentTwoHopResults",
                 ["path"],
                 ["at", "nSent", "nReceived", "broken", "interesting"])
         finally:
-            self.lock.release()
+            self._lock.release()
 
     def _loadServers(self):
         # hold lock.
-        self.serverNames = {}
-        cur = self.cursor
+        cur = self._db.getCursor()
         cur.execute("SELECT name FROM servers")
         res = cur.fetchall()
+        serverNames = {}
+        serverReliability = {}
         for name, in res:
-            self.serverNames[name] = 1
+            serverNames[name] = 1
 
         cur.execute("SELECT servername, reliability FROM "
                     "echolotCurrentOneHopResults")
         res = cur.fetchall()
         for name,rel in res:
-            self.serverReliability[name]=rel
+            serverReliability[name]=rel
 
         cur.execute("SELECT path, broken, interesting FROM "
                     "echolotCurrentTwoHopResults WHERE interesting OR broken")
@@ -261,90 +283,76 @@
                 broken[p]=1
             if i:
                 interesting[p]=1
-        self.isBroken = broken
-        self.isInteresting = interesting
+        self._serverNames = serverNames
+        self._serverReliability = serverReliability
+        self._brokenChains = broken
+        self._interestingChains = interesting
 
     def updateServers(self, names):
-        self.lock.acquire()
+        self._lock.acquire()
         try:
             for n in names:
                 self._addServer(n)
         finally:
-            self.lock.release()
+            self._lock.release()
 
     def _addServer(self, name):
-        # hold lock.
+        # doesn't commit.
         name = name.lower()
-        if self.serverNames.has_key(name):
-            return
-        self.cursor.execute("INSERT INTO servers (name) VALUES (%s)", name)
-        self.serverNames[name] = 1
+        self._lock.acquire()
+        try:
+            if self._serverNames.has_key(name):
+                return
+            self._serverNames[name] = 1
+        finally:
+            self._lock.release()
+
+        self._db.getCursor().execute("INSERT INTO servers (name) VALUES (%s)", name)
 
     def rotate(self, now=None):
-        self.lock.acquire()
         if now is None: now = time.time()
         cutoff = self._time(now - KEEP_HISTORY_DAYS * ONE_DAY)
-        cur = self.cursor
-        try:
-            cur.execute("DELETE FROM lifetimes WHERE stillup < %s", cutoff)
-            cur.execute("DELETE FROM pings WHERE sentat < %s", cutoff)
-            cur.execute("DELETE FROM connects WHERE at < %s", cutoff)
-            self.connection.commit()
-        finally:
-            self.lock.release()
+        cur = self._db.getCursor()
+        cur.execute("DELETE FROM lifetimes WHERE stillup < %s", cutoff)
+        cur.execute("DELETE FROM pings WHERE sentat < %s", cutoff)
+        cur.execute("DELETE FROM connects WHERE at < %s", cutoff)
+        self._db.getConnection().commit()
 
     def flush(self):
-        self.lock.acquire()
-        try:
-            self.connection.commit()
-        finally:
-            self.lock.release()
+        self._db.getConnection().commit()
 
     def close(self):
-        self.lock.acquire()
-        try:
-            self.connection.close()
-            del self.connection
-        finally:
-            self.lock.release()
-
-    def _execute(self, sql, args):
-        self.lock.acquire()
-        try:
-            self.cursor.execute(sql, args)
-        finally:
-            self.lock.release()
+        self._db.close()
 
     _STARTUP = "INSERT INTO lifetimes (up, stillup, shutdown) VALUES (%s,%s, 0)"
     def startup(self,now=None):
+        self._lock.acquire()
         self._startTime = now = self._time(now)
-        self._execute(self._STARTUP, (now,now))
+        self._lock.release()
+        self._db.getCursor().execute(self._STARTUP, (now,now))
+        self._db.getConnection().commit()
 
     _SHUTDOWN = "UPDATE lifetimes SET stillup = %s, shutdown = %s WHERE up = %s"
     def shutdown(self, now=None):
         if self._startTime is None: self.startup()
         now = self._time(now)
-        self._execute(self._SHUTDOWN, (now, now, self._startTime))
+        self._db.getCursor().execute(self._SHUTDOWN, (now, now, self._startTime))
+        self._db.getConnection().commit()
 
     _HEARTBEAT = "UPDATE lifetimes SET stillup = %s WHERE up = %s AND stillup < %s"
     def heartbeat(self, now=None):
         if self._startTime is None: self.startup()
         now = self._time(now)
-        self._execute(self._HEARTBEAT, (now, self._startTime, now))
-
-    def rotated(self):
-        pass
+        self._db.getCursor().execute(self._HEARTBEAT, (now, self._startTime, now))
+        self._db.getConnection().commit()
 
     _CONNECTED = ("INSERT INTO connects (at, server, success) "
                   "VALUES (%s,%s,%s)")
     def connected(self, nickname, success=1, now=None):
-        self.lock.acquire()
-        try:
-            self._addServer(nickname)
-            self._execute(self._CONNECTED,
-                    (self._time(now), nickname.lower(), self._bool(success)))
-        finally:
-            self.lock.release()
+        self._addServer(nickname)
+        self._db.getCursor().execute(self._CONNECTED,
+                  (self._time(now), nickname.lower(), self._bool(success)))
+        self._db.getConnection().commit()
 
     def connectFailed(self, nickname, now=None):
         self.connected(nickname, success=0, now=now)
@@ -353,103 +361,103 @@
                     "VALUES (%s,%s,%s,%s)")
     def queuedPing(self, hash, path, now=None):
         assert len(hash) == mixminion.Crypto.DIGEST_LEN
-        self.lock.acquire()
-        try:
-            path = path.lower()
-            for s in path.split(","):
-                self._addServer(s)
-            self._execute(self._QUEUED_PING,
-                          (formatBase64(hash), path, self._time(now), 0))
-        finally:
-            self.lock.release()
+        path = path.lower()
+        for s in path.split(","):
+            self._addServer(s)
+        self._db.getCursor().execute(self._QUEUED_PING,
+                             (formatBase64(hash), path, self._time(now), 0))
+        self._db.getConnection().commit()
 
     _GOT_PING = "UPDATE pings SET received = %s WHERE hash = %s"
     def gotPing(self, hash, now=None):
         assert len(hash) == mixminion.Crypto.DIGEST_LEN
-        n = self._execute(self._GOT_PING, (self._time(now), formatBase64(hash)))
+        self._db.getCursor().execute(self._GOT_PING, (self._time(now), formatBase64(hash)))
+        n = self._db.getCursor().rowcount
         if n == 0:
             LOG.warn("Received ping with no record of its hash")
         elif n > 1:
             LOG.warn("Received ping with multiple hash entries!")
 
-    def _calculateUptimes(self, startTime, endTime, now=None):
-        cur = self.cursor
-        self.lock.acquire()
-        try:
-            # First, calculate my own uptime.
-            if now is None: now = time.time()
-            self.heartbeat(now)
+    def _calculateUptimes(self, serverNames, startTime, endTime, now=None):
+        # commit when one done
+        cur = self._db.getCursor()
+        serverNames.sort()
 
-            timespan = IntervalSet( [(startTime, endTime)] )
+        # First, calculate my own uptime.
+        if now is None: now = time.time()
+        self.heartbeat(now)
 
-            cur.execute("SELECT up, stillup, shutdown FROM lifetimes WHERE "
-                        "up <= %s AND stillup >= %s",
-                        self._time(endTime), self._time(startTime))
-            myUptime = 0
-            myIntervals = IntervalSet([ (start, max(end,shutdown))
-                                        for start,end,shutdown in cur ])
-            myIntervals *= timespan
-            myUptime = myIntervals.spanLength()
-            fracUptime = float(myUptime)/(endTime-startTime)
-            self._setUptime(
-                (self._time(startTime), self._time(endTime), "<self>"),
-                (fracUptime,))
+        timespan = IntervalSet( [(startTime, endTime)] )
 
-            # Okay, now everybody else.
-            for s in self.serverNames.keys():
-                cur.execute("SELECT at, success FROM connects"
-                            " WHERE server = %s AND at >= %s AND at <= %s"
-                            " ORDER BY at",
-                            s, startTime, endTime)
+        cur.execute("SELECT up, stillup, shutdown FROM lifetimes WHERE "
+                    "up <= %s AND stillup >= %s",
+                    self._time(endTime), self._time(startTime))
+        myUptime = 0
+        myIntervals = IntervalSet([ (start, max(end,shutdown))
+                                    for start,end,shutdown in cur ])
+        myIntervals *= timespan
+        myUptime = myIntervals.spanLength()
+        fracUptime = float(myUptime)/(endTime-startTime)
+        self._setUptime(
+            (self._time(startTime), self._time(endTime), "<self>"),
+            (fracUptime,))
 
-                lastStatus = None
-                lastTime = None
-                times = [ 0, 0 ] # uptime, downtime
-                for at, success in cur:
-                    assert success in (0,1)
-                    upAt, downAt = myIntervals.getIntervalContaining(at)
-                    if upAt == None:
-                        # Event outside edge of interval.
-                        continue
-                    if lastTime is None or upAt > lastTime:
-                        lastTime = upAt
-                        lastStatus = None
-                    if lastStatus is not None:
-                        t = (at-lastTime)/2.0
-                        times[success] += t
-                        times[lastStatus] += t
-                    lastStatus = success
-                    lastTime = at
+        # Okay, now everybody else.
+        for s in serverNames:
+            cur.execute("SELECT at, success FROM connects"
+                        " WHERE server = %s AND at >= %s AND at <= %s"
+                        " ORDER BY at",
+                        s, startTime, endTime)
 
-                if times == [0,0]:
+            lastStatus = None
+            lastTime = None
+            times = [ 0, 0 ] # uptime, downtime
+            for at, success in cur:
+                assert success in (0,1)
+                upAt, downAt = myIntervals.getIntervalContaining(at)
+                if upAt == None:
+                    # Event outside edge of interval.
                     continue
-                fraction = float(times[1])/(times[0]+times[1])
-                self._setUptime((startTime, endTime, s), (fraction,))
-            self.connection.commit()
-        finally:
-            self.lock.release()
+                if lastTime is None or upAt > lastTime:
+                    lastTime = upAt
+                    lastStatus = None
+                if lastStatus is not None:
+                    t = (at-lastTime)/2.0
+                    times[success] += t
+                    times[lastStatus] += t
+                lastStatus = success
+                lastTime = at
+
+            if times == [0,0]:
+                continue
+            fraction = float(times[1])/(times[0]+times[1])
+            self._setUptime((startTime, endTime, s), (fraction,))
 
     def calculateUptimes(self, startAt, endAt, now=None):
         if now is None: now = time.time()
-        for s, e in self.uptimeIntervals.getIntervals(startAt, endAt):
-            self._calculateUptimes(s, e, now=now)
+        self._lock.acquire()
+        try:
+            serverNames = self._serverNames.keys()
+        finally:
+            self._lock.release()
+        serverNames.sort()
+        for s, e in self._uptimeIntervals.getIntervals(startAt, endAt):
+            self._calculateUptimes(serverNames, s, e, now=now)
+        self._db.getConnection().commit()
 
     def getUptimes(self, startAt, endAt):
         """DODOC: uptimes for all servers overlapping [startAt, endAt],
            as mapping from (start,end) to nickname to fraction.
         """
         result = {}
-        cur = self.cursor
-        self.lock.acquire()
-        try:
-            cur.execute("SELECT start, end, name, uptime FROM uptimes "
-                        "WHERE %s >= start AND %s <= end",
-                        (self._time(startAt), self._time(endAt)))
-            for s,e,n,u in cur:
-                result.setdefault((s,e), {})[n] = u
-            return result
-        finally:
-            self.lock.release()
+        cur = self._db.getCursor()
+        cur.execute("SELECT start, end, name, uptime FROM uptimes "
+                    "WHERE %s >= start AND %s <= end",
+                    (self._time(startAt), self._time(endAt)))
+        for s,e,n,u in cur:
+            result.setdefault((s,e), {})[n] = u
+        self._db.getConnection().commit()
+        return result
 
     def _roundLatency(self, latency):
         """Using a median latency can leak the fact that a message was a
@@ -471,16 +479,15 @@
     _PING_GRANULARITY = 24*60*60
     def _calculateOneHopResults(self, serverName, startTime, endTime,
                                 now=None, calculateOverallResults=1):
-        # hold lock; commit when done.
-        cur = self.cursor
+        # commit when done; serverName must exist.
+        cur = self._db.getCursor()
         if now is None:
             now = time.time()
-        self._addServer(serverName)
         if calculateOverallResults:
             startTime = min(startTime,
                        now - (len(self._WEIGHT_AGE)*self._WEIGHT_AGE_PERIOD))
             endTime = max(endTime, now)
-        intervals = self.pingIntervals.getIntervals(startTime, endTime)
+        intervals = self._pingIntervals.getIntervals(startTime, endTime)
         nPeriods = len(intervals)
         startTime = intervals[0][0]
         endTime = intervals[-1][1]
@@ -562,7 +569,7 @@
                 (sent, rcvd, latent, wsent, wrcvd, rel))
 
         if not calculateOverallResults:
-            return
+            return None
 
         # 3. Write current overall results into the DB.
         if allLatencies:
@@ -581,24 +588,34 @@
         else:
             rel = 0.0
         self._setCurOneHop((serverName,), (self._time(now), latent, rel))
-        self.serverReliability[serverName] = rel
+        return rel
 
     def calculateOneHopResults(self, now=None):
-        self.lock.acquire()
+        self._lock.acquire()
         try:
-            if now is None:
-                now = time.time()
-            for s in self.serverNames.keys():
-                # For now, always calculate overall results.
-                self._calculateOneHopResults(s,now,now,now,
+            serverNames = self._serverNames.keys()
+        finally:
+            self._lock.release()
+
+        if now is None:
+            now = time.time()
+        serverNames.sort()
+        reliability = {}
+        for s in serverNames:
+            # For now, always calculate overall results.
+            r = self._calculateOneHopResults(s,now,now,now,
                                              calculateOverallResults=1)
-            self.connection.commit()
+            reliability[s] = r
+        self._db.getConnection().commit()
+        self._lock.acquire()
+        try:
+            self._serverReliability.update(reliability)
         finally:
-            self.lock.release()
+            self._lock.release()
 
     def _calculate2ChainStatus(self, since, path, now=None):
-        # hold lock.
-        cur = self.cursor
+        # doesn't commit.
+        cur = self._db.getCursor()
         cur.execute("SELECT count() FROM pings WHERE path = %s"
                     " AND sentat >= %s",
                     (path,self._time(since)))
@@ -610,7 +627,7 @@
 
         servers = path.split(",")
         try:
-            rels = [ self.serverReliability[s] for s in servers ]
+            rels = [ self._serverReliability[s] for s in servers ]
             product = reduce(operator.mul, rels)
         except KeyError:
             product = None
@@ -628,102 +645,113 @@
 
     _CHAIN_PING_HORIZON = 12*ONE_DAY
     def calculateChainStatus(self, now=None):
-        self.lock.acquire()
+        self._lock.acquire()
         try:
-            if now is None:
-                now = time.time()
-            brokenChains = {}
-            interestingChains = {}
-            since = now - self._CHAIN_PING_HORIZON
+            serverNames = self._serverNames.keys()
+        finally:
+            self._lock.release()
 
-            for s1 in self.serverNames.keys():
-                for s2 in self.serverNames.keys():
-                    p = "%s,%s" % (s1, s2)
+        if now is None:
+            now = time.time()
 
-                    nS, nR, prod, isBroken, isInteresting = \
-                        self._calculate2ChainStatus(since, p)
-                    if isBroken:
-                        brokenChains[p] = 1
-                    if isInteresting:
-                        interestingChains[p] = 1
-                    self._setTwoHop((p,),
-                           (self._time(now), nS, nR, self._bool(isBroken),
-                            self._bool(isInteresting)))
+        brokenChains = {}
+        interestingChains = {}
+        since = now - self._CHAIN_PING_HORIZON
+        serverNames.sort()
 
-            self.isBroken = isBroken
-            self.isInteresting = isInteresting
+        for s1 in serverNames:
+            for s2 in serverNames:
+                p = "%s,%s" % (s1, s2)
+
+                nS, nR, prod, isBroken, isInteresting = \
+                    self._calculate2ChainStatus(since, p)
+                if isBroken:
+                    brokenChains[p] = 1
+                if isInteresting:
+                    interestingChains[p] = 1
+                self._setTwoHop((p,),
+                       (self._time(now), nS, nR, self._bool(isBroken),
+                        self._bool(isInteresting)))
+        self._db.getConnection().commit()
+
+        self._lock.acquire()
+        try:
+            self._brokenChains = brokenChains
+            self._interestingChains = interestingChains
         finally:
-            self.lock.release()
+            self._lock.release()
 
     def dumpAllStatus(self,f,since,now=None):
-        self.lock.acquire()
+        self._lock.acquire()
         try:
-            if now is None: now = time.time()
-            print >>f, "# List of all currently tracked servers."
-            print >>f, "KNOWN_SERVERS =",self.serverNames.keys()
-            cur = self.cursor
+            serverNames = self._serverNames.keys()
+        finally:
+            self._lock.release()
 
-            print >>f, "\n# Map from server to list of (period-start, period-end, fractional uptime"
-            print >>f, "SERVER_UPTIMES = {"
-            cur.execute("SELECT start,end,name,uptime FROM uptimes "
-                        "WHERE start >= %s AND end <= %s"
-                        "ORDER BY name, start", (since, now))
-            lastServer = "---"
-            for s,e,n,u in cur:
-                if n != lastServer:
-                    if lastServer != '---': print >>f, "   ],"
-                    lastServer = n
-                    print >>f, "   %r : [" % n
-                print >>f, "      (%s,%s,%.04f),"%(s,e,u)
-            if lastServer != '---': print >>f, "   ]"
-            print >>f, "}"
+        if now is None: now = time.time()
+        print >>f, "# List of all currently tracked servers."
+        print >>f, "KNOWN_SERVERS =",serverNames
+        cur = self._db.getCursor()
 
-            print >>f, """
+        print >>f, "\n# Map from server to list of (period-start, period-end, fractional uptime"
+        print >>f, "SERVER_UPTIMES = {"
+        cur.execute("SELECT start,end,name,uptime FROM uptimes "
+                    "WHERE start >= %s AND end <= %s"
+                    "ORDER BY name, start", (since, now))
+        lastServer = "---"
+        for s,e,n,u in cur:
+            if n != lastServer:
+                if lastServer != '---': print >>f, "   ],"
+                lastServer = n
+                print >>f, "   %r : [" % n
+            print >>f, "      (%s,%s,%.04f),"%(s,e,u)
+        if lastServer != '---': print >>f, "   ]"
+        print >>f, "}"
+
+        print >>f, """
 # Map from server name to list of (period-start, period-end, # pings sent,
 #      # of those pings received, median latency on those pings (sec),
 #      weighted reliability)"""
-            print >>f, "SERVER_DAILY_PING_STATUS = {"
-            cur.execute("SELECT servername,startAt,endAt,nSent,nReceived,"
-                        "  latency,reliability FROM echolotOneHopResults "
-                        "WHERE startat >= %s AND endat <= %s"
-                        "ORDER BY servername, startat", (since, now))
-            lastServer = "---"
-            for n,s,e,nS,nR,lat,r in cur:
-                if n != lastServer:
-                    if lastServer != '---': print >>f, "   ],"
-                    lastServer = n
-                    print >>f, "   %r : [" % n
-                print >>f, "      (%s,%s,%s,%s,%s,%.04f),"%(s,e,nS,nR,lat,r)
-            if lastServer != '---': print >>f, "   ]"
-            print >>f, "}"
-
-            print >>f, "\n# Map from server-name to current (avg latency, avg reliability)"
-            print >>f, "SERVER_CUR_PING_STATUS = {"
-            cur.execute("SELECT servername,latency,reliability FROM "
-                        "echolotCurrentOneHopResults")
-            for n,lat,r in cur:
-                print >>f, "   %r : (%s,%.04f)," %(n,lat,r)
-            print >>f, "}"
+        print >>f, "SERVER_DAILY_PING_STATUS = {"
+        cur.execute("SELECT servername,startAt,endAt,nSent,nReceived,"
+                    "  latency,reliability FROM echolotOneHopResults "
+                    "WHERE startat >= %s AND endat <= %s"
+                    "ORDER BY servername, startat", (since, now))
+        lastServer = "---"
+        for n,s,e,nS,nR,lat,r in cur:
+            if n != lastServer:
+                if lastServer != '---': print >>f, "   ],"
+                lastServer = n
+                print >>f, "   %r : [" % n
+            print >>f, "      (%s,%s,%s,%s,%s,%.04f),"%(s,e,nS,nR,lat,r)
+        if lastServer != '---': print >>f, "   ]"
+        print >>f, "}"
 
-            print >>f, "\n# Chains that we want to know more about"
-            print >>f, "INTERESTING_CHAINS = ["
-            cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
-                        "WHERE interesting = 1")
-            for p, in cur:
-                print >>f, "   %r,"%p
-            print >>f, "]"
+        print >>f, "\n# Map from server-name to current (avg latency, avg reliability)"
+        print >>f, "SERVER_CUR_PING_STATUS = {"
+        cur.execute("SELECT servername,latency,reliability FROM "
+                    "echolotCurrentOneHopResults")
+        for n,lat,r in cur:
+            print >>f, "   %r : (%s,%.04f)," %(n,lat,r)
+        print >>f, "}"
 
-            print >>f, "\n# Chains that are more unreliable than we'd expect"
-            print >>f, "BROKEN_CHAINS = ["
-            cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
-                        "WHERE broken = 1")
-            for p, in cur:
-                print >>f, "   %r,"%p
-            print >>f, "]"
-            print >>f, "\n"
+        print >>f, "\n# Chains that we want to know more about"
+        print >>f, "INTERESTING_CHAINS = ["
+        cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
+                    "WHERE interesting = 1")
+        for p, in cur:
+            print >>f, "   %r,"%p
+        print >>f, "]"
 
-        finally:
-            self.lock.release()
+        print >>f, "\n# Chains that are more unreliable than we'd expect"
+        print >>f, "BROKEN_CHAINS = ["
+        cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
+                    "WHERE broken = 1")
+        for p, in cur:
+            print >>f, "   %r,"%p
+        print >>f, "]"
+        print >>f, "\n"
+        self._db.getConnection().commit()
 
     def calculateAll(self, outFname=None, now=None):
         LOG.info("Computing ping results.")
@@ -815,6 +843,7 @@
                   ",".join(path), formatTime(t,1))
         return t
     def _getPerturbation(self, path, periodStart, interval):
+        #XXXX add a secret seed
         sha = mixminion.Crypto.sha1(",".join(path) + "@@" + str(interval))
         sec = abs(struct.unpack("I", sha[:4])[0]) % interval
         return sec
@@ -998,7 +1027,20 @@
     """DOCDOC"""
     return sys.version_info[:2] >= (2,2) and sqlite is not None
 
-def openPingLog(location):
+DATABASE_CLASSES = { 'sqlite' : SQLiteDatabase }
+
+def openPingLog(config, location=None, databaseThread=None):
     # FFFF eventually, we should maybe support more than pysqlite.  But let's
     # FFFF not generalize until we have a 2nd case.
-    return PingLog(sqlite.connect(location))
+    database = 'sqlite'
+
+    assert DATABASE_CLASSES.has_key(database)
+    if location is None:
+        location = os.path.join(config.getWorkingDir(), "pinger", "pingdb")
+    db = DATABASE_CLASSES[database](location)
+    log = PingLog(db)
+
+    if db.LOCKING_IS_COARSE and databaseThread is not None:
+        log = BackgroundingDecorator(databaseThread, log)
+
+    return log
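
The getInsertOrUpdateFn helper above builds a portable upsert: run an UPDATE
first and fall back to an INSERT when no row matched.  A self-contained
illustration of the same pattern, using the standard-library sqlite3 module
(so the paramstyle is ? rather than the %s that the pysqlite binding above
expects):

import sqlite3

def makeInsertOrUpdate(conn, table, keyCols, valCols):
    update = "UPDATE %s SET %s WHERE %s" % (
        table,
        ", ".join(["%s = ?" % c for c in valCols]),
        " AND ".join(["%s = ?" % c for c in keyCols]))
    insert = "INSERT INTO %s (%s) VALUES (%s)" % (
        table,
        ", ".join(keyCols + valCols),
        ", ".join(["?"] * (len(keyCols) + len(valCols))))
    def fn(keyVals, valVals):
        cur = conn.cursor()
        cur.execute(update, tuple(valVals) + tuple(keyVals))
        if cur.rowcount == 0:          # nothing matched; insert a new row
            cur.execute(insert, tuple(keyVals) + tuple(valVals))
    return fn

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE uptimes (start INTEGER, end INTEGER, "
             "name TEXT, uptime REAL)")
setUptime = makeInsertOrUpdate(conn, "uptimes",
                               ["start", "end", "name"], ["uptime"])
setUptime((0, 3600, "<self>"), (0.98,))    # no matching row: INSERT
setUptime((0, 3600, "<self>"), (0.99,))    # row exists now: UPDATE
conn.commit()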

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.137
retrieving revision 1.138
diff -u -d -r1.137 -r1.138
--- ServerMain.py	6 Dec 2004 20:02:23 -0000	1.137
+++ ServerMain.py	12 Dec 2004 02:48:16 -0000	1.138
@@ -55,7 +55,8 @@
 # We pull this from mixminion.ThreadUtils just in case somebody still has
 # a copy of the old "mixminion/server/Queue.py" (since renamed to
 # ServerQueue.py)
-from mixminion.ThreadUtils import MessageQueue, ClearableQueue, QueueEmpty
+from mixminion.ThreadUtils import MessageQueue, ClearableQueue, QueueEmpty, \
+     ProcessingThread
 
 import mixminion.ClientDirectory
 import mixminion.Config
@@ -482,45 +483,7 @@
             LOG.error_exc(sys.exc_info(),
                           "Exception while cleaning; shutting down thread.")
 
-class ProcessingThread(threading.Thread):
-    """Background thread to handle CPU-intensive functions.
-
-       Currently used to process packets in the background."""
-    # Fields:
-    #   mqueue: a ClearableQueue of callable objects.
-    class _Shutdown:
-        """Callable that raises itself when called.  Inserted into the
-           queue when it's time to shut down."""
-        def __call__(self):
-            raise self
-
-    def __init__(self):
-        """Create a new processing thread."""
-        threading.Thread.__init__(self)
-        self.mqueue = ClearableQueue()
-
-    def shutdown(self):
-        LOG.info("Telling processing thread to shut down.")
-        self.mqueue.clear()
-        self.mqueue.put(ProcessingThread._Shutdown())
-
-    def addJob(self, job):
-        """Adds a job to the message queue.  A job is a callable object
-           to be invoked by the processing thread.  If the job raises
-           ProcessingThread._Shutdown, the processing thread stops running."""
-        self.mqueue.put(job)
 
-    def run(self):
-        try:
-            while 1:
-                job = self.mqueue.get()
-                job()
-        except ProcessingThread._Shutdown:
-            LOG.info("Processing thread shutting down.")
-            return
-        except:
-            LOG.error_exc(sys.exc_info(),
-                          "Exception while processing; shutting down thread.")
 
 #----------------------------------------------------------------------
 STOPPING = 0 # Set to one if we get SIGTERM
@@ -876,8 +839,7 @@
             LOG.debug("Initializing ping log")
             pingerDir = os.path.join(config.getWorkDir(), "pinger")
             pingerLogDir = os.path.join(pingerDir, "log")
-            self.pingLog = mixminion.server.Pinger.PingStatusLog(pingerLogDir)
-            self.pingLog.startup()
+            self.pingLog = mixminion.server.Pinger.openPingLog(config)
 
             LOG.debug("Initializing ping generator")