[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Get locking right: since sqlite does database locking so poorly, but other dbs have good transaction systems, make all database access happen in a separate thread when we are using sqlite, and allow for saner stuff with better DBs.
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv7675/lib/mixminion/server
Modified Files:
Pinger.py ServerMain.py
Log Message:
Get locking right: since sqlite does database locking so poorly, but other dbs have good transaction systems, make all database access happen in a separate thread when we are using sqlite, and allow for saner stuff with better DBs.
Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -d -r1.12 -r1.13
--- Pinger.py 11 Dec 2004 02:48:54 -0000 1.12
+++ Pinger.py 12 Dec 2004 02:48:16 -0000 1.13
@@ -32,6 +32,7 @@
import mixminion.Crypto
import mixminion.Packet
import mixminion.ServerInfo
+import mixminion.ThreadUtils
import mixminion.server.PacketHandler
import mixminion.server.MMTPServer
@@ -73,17 +74,37 @@
t = n
return r
-class _SQLiteMixin:
+class SQLiteDatabase:
+ #XXXX can only be used from one thread at a time.
+ LOCKING_IS_COARSE = 1
+ def __init__(self, location):
+ self._theConnection = sqlite.connect(location, autocommit=0)
+ self._theCursor = self._theConnection.cursor()
+
+ def close(self):
+ self._theConnection.close()
+ self._theConnection = self._theCursor = None
+
+ def getConnection(self):
+ return self._theConnection
+
+ def getCursor(self):
+ return self._theCursor
+
+ def lock(self):
+ self.dbLock.acquire()
+
+ def unlock(self):
+ self.dbLock.release()
+
def _objectExists(self, name, objType):
- # hold lock.
- self.cursor.execute(
+ self._theCursor.execute(
"SELECT * FROM SQLITE_MASTER WHERE type = %s AND name = %s",
(objType,name))
- rs = self.cursor.fetchall()
+ rs = self._theCursor.fetchall()
return len(rs) > 0
- def _createTable(self, name, rows):
- # hold lock
+ def createTable(self, name, rows):
if self._objectExists(name,"table"):
return
@@ -96,11 +117,10 @@
body.append("%s %s %s"%(r[0],REALTYPES[r[1]],r[2]))
stmt = "CREATE TABLE %s (%s)" % (name,", ".join(body))
- self.cursor.execute(stmt)
- self.connection.commit()
+ self._theCursor.execute(stmt)
+ self._theConnection.commit()
- def _createIndex(self, name, tablename, columns, unique=0):
- #hold lock
+ def createIndex(self, name, tablename, columns, unique=0):
if self._objectExists(name, "index"):
return
@@ -110,19 +130,19 @@
u = ""
stmt = "CREATE %sINDEX %s ON %s (%s)"%(
u, name, tablename, ", ".join(columns))
- self.cursor.execute(stmt)
- self.connection.commit()
+ self._theCursor.execute(stmt)
+ self._theConnection.commit()
- def _time(self, t=None):
+ def time(self, t=None):
return long(t or time.time())
- def _bool(self, b):
+ def bool(self, b):
if b:
return 1
else:
return 0
- def _getInsertOrUpdateFn(self, table, keyCols, valCols):
+ def getInsertOrUpdateFn(self, table, keyCols, valCols):
update = "UPDATE %s SET %s WHERE %s" % (
table,
", ".join(["%s = %%s" % k for k in valCols]),
@@ -136,120 +156,122 @@
assert len(keyVals) == len(keyCols)
assert len(valVals) == len(valCols)
- self.cursor.execute(update, (valVals+keyVals))
- if self.cursor.rowcount > 0:
+ self._theCursor.execute(update, (valVals+keyVals))
+ if self._theCursor.rowcount > 0:
return
- self.cursor.execute(insert, (keyVals+valVals))
+ self._theCursor.execute(insert, (keyVals+valVals))
return fn
-class PingLog(_SQLiteMixin):
- def __init__(self, connection):
- self.connection = connection
- self.cursor = connection.cursor()
- self.lock = threading.RLock()
- self.serverNames = {}
- self.serverReliability = {}
- self.uptimeIntervals = PingerIntervalSchedule()
- self.pingIntervals = PingerIntervalSchedule()
- self.brokenChains = {}
- self.interestingChains = {}
+class PingLog:
+ def __init__(self, db):
+ self._db = db
+ self._time = self._db.time
+ self._bool = self._db.bool
+ self._lock = threading.RLock()
+ self._serverNames = {}
+ self._serverReliability = {}
+ self._uptimeIntervals = PingerIntervalSchedule()
+ self._pingIntervals = PingerIntervalSchedule()
+ self._brokenChains = {}
+ self._interestingChains = {}
self._startTime = None
self._lastRecalculation = 0
self._createAllTables()
self._loadServers()
def _createAllTables(self):
- self.lock.acquire()
+ self._lock.acquire()
try:
# FFFF This is terrible DB design. It's not normalized by
# FFFF any stretch of the imagination, it doesn't have enough
# FFFF constraints, etc etc.
# Raw data
- self._createTable("lifetimes",
- [("up", "time", "not null"),
- ("stillup", "time"),
- ("shutdown", "time")])
- self._createTable("pings",
- [("hash", "varchar", "primary key"),
- ("path", "varchar", "not null"),
- ("sentat", "time", "not null"),
- ("received", "time")])
- self._createTable("connects",
- [("at", "time", "not null"),
- ("server", "varchar", "not null"),
- ("success", "boolean", "not null")])
- self._createTable("servers",
- [("name", "varchar", "unique not null")])
+ self._db.createTable("lifetimes",
+ [("up", "time", "not null"),
+ ("stillup", "time"),
+ ("shutdown", "time")])
+ self._db.createTable("pings",
+ [("hash", "varchar", "primary key"),
+ ("path", "varchar", "not null"),
+ ("sentat", "time", "not null"),
+ ("received", "time")])
+ self._db.createTable("connects",
+ [("at", "time", "not null"),
+ ("server", "varchar", "not null"),
+ ("success", "boolean", "not null")])
+ self._db.createTable("servers",
+ [("name", "varchar", "unique not null")])
# Results
- self._createTable("uptimes",
- [("start", "time", "not null"),
- ("end", "time", "not null"),
- ("name", "varchar", "not null"),
- ("uptime", "float", "not null")])
- self._createTable("echolotOneHopResults",
- [("servername", "varchar", "not null"),
- ("startAt", "time", "not null"),
- ("endAt", "time", "not null"),
- ("nSent", "integer", "not null"),
- ("nReceived", "integer", "not null"),
- ("latency", "integer", "not null"),
- ("wsent", "float", "not null"),
- ("wreceived", "float", "not null"),
- ("reliability", "float", "not null")])
- self._createTable("echolotCurrentOneHopResults",
- [("servername", "varchar", "unique not null"),
- ("at", "time", "not null"),
- ("latency", "integer", "not null"),
- ("reliability", "float", "not null")])
- self._createTable("echolotCurrentTwoHopResults",
- [("path", "varchar", "unique not null"),
- ("at", "time", "not null"),
- ("nSent", "integer", "not null"),
- ("nReceived", "integer", "not null"),
- ("broken", "boolean", "not null"),
- ("interesting", "boolean", "not null")])
+ self._db.createTable("uptimes",
+ [("start", "time", "not null"),
+ ("end", "time", "not null"),
+ ("name", "varchar", "not null"),
+ ("uptime", "float", "not null")])
+ self._db.createTable("echolotOneHopResults",
+ [("servername", "varchar", "not null"),
+ ("startAt", "time", "not null"),
+ ("endAt", "time", "not null"),
+ ("nSent", "integer", "not null"),
+ ("nReceived", "integer", "not null"),
+ ("latency", "integer", "not null"),
+ ("wsent", "float", "not null"),
+ ("wreceived", "float", "not null"),
+ ("reliability", "float", "not null")])
+ self._db.createTable("echolotCurrentOneHopResults",
+ [("servername", "varchar", "unique not null"),
+ ("at", "time", "not null"),
+ ("latency", "integer", "not null"),
+ ("reliability", "float", "not null")])
+ self._db.createTable("echolotCurrentTwoHopResults",
+ [("path", "varchar", "unique not null"),
+ ("at", "time", "not null"),
+ ("nSent", "integer", "not null"),
+ ("nReceived", "integer", "not null"),
+ ("broken", "boolean", "not null"),
+ ("interesting", "boolean", "not null")])
- self._createIndex("lifetimesUp", "lifetimes", ["up"])
- self._createIndex("pingsHash", "pings", ["hash"], unique=1)
- self._createIndex("pingsPathSR", "pings",
- ["path", "sentat", "received"])
- self._createIndex("connectsAt", "connects", ["at"])
- self._createIndex("uptimesNS", "uptimes", ["name", "start"])
+ self._db.createIndex("lifetimesUp", "lifetimes", ["up"])
+ self._db.createIndex("pingsHash", "pings", ["hash"], unique=1)
+ self._db.createIndex("pingsPathSR", "pings",
+ ["path", "sentat", "received"])
+ self._db.createIndex("connectsAt", "connects", ["at"])
+ self._db.createIndex("uptimesNS", "uptimes", ["name", "start"])
# indices on echolot*results, uptimes.
- self._setUptime = self._getInsertOrUpdateFn(
+ self._setUptime = self._db.getInsertOrUpdateFn(
"uptimes", ["start", "end", "name"], ["uptime"])
- self._setOneHop = self._getInsertOrUpdateFn(
+ self._setOneHop = self._db.getInsertOrUpdateFn(
"echolotOneHopResults",
["servername", "startAt", "endAt"],
["nSent", "nReceived", "latency", "wsent", "wreceived",
"reliability"])
- self._setCurOneHop = self._getInsertOrUpdateFn(
+ self._setCurOneHop = self._db.getInsertOrUpdateFn(
"echolotCurrentOneHopResults",
["servername"],
["at", "latency", "reliability"])
- self._setTwoHop = self._getInsertOrUpdateFn(
+ self._setTwoHop = self._db.getInsertOrUpdateFn(
"echolotCurrentTwoHopResults",
["path"],
["at", "nSent", "nReceived", "broken", "interesting"])
finally:
- self.lock.release()
+ self._lock.release()
def _loadServers(self):
# hold lock.
- self.serverNames = {}
- cur = self.cursor
+ cur = self._db.getCursor()
cur.execute("SELECT name FROM servers")
res = cur.fetchall()
+ serverNames = {}
+ serverReliability = {}
for name, in res:
- self.serverNames[name] = 1
+ serverNames[name] = 1
cur.execute("SELECT servername, reliability FROM "
"echolotCurrentOneHopResults")
res = cur.fetchall()
for name,rel in res:
- self.serverReliability[name]=rel
+ serverReliability[name]=rel
cur.execute("SELECT path, broken, interesting FROM "
"echolotCurrentTwoHopResults WHERE interesting OR broken")
@@ -261,90 +283,76 @@
broken[p]=1
if i:
interesting[p]=1
- self.isBroken = broken
- self.isInteresting = interesting
+ self._serverNames = serverNames
+ self._serverReliability = serverReliability
+ self._brokenChains = broken
+ self._interestingChains = interesting
def updateServers(self, names):
- self.lock.acquire()
+ self._lock.acquire()
try:
for n in names:
self._addServer(n)
finally:
- self.lock.release()
+ self._lock.release()
def _addServer(self, name):
- # hold lock.
+ # doesn't commit.
name = name.lower()
- if self.serverNames.has_key(name):
- return
- self.cursor.execute("INSERT INTO servers (name) VALUES (%s)", name)
- self.serverNames[name] = 1
+ self._lock.acquire()
+ try:
+ if self._serverNames.has_key(name):
+ return
+ self._serverNames[name] = 1
+ finally:
+ self._lock.release()
+
+ self._db.getCursor().execute("INSERT INTO servers (name) VALUES (%s)", name)
def rotate(self, now=None):
- self.lock.acquire()
if now is None: now = time.time()
cutoff = self._time(now - KEEP_HISTORY_DAYS * ONE_DAY)
- cur = self.cursor
- try:
- cur.execute("DELETE FROM lifetimes WHERE stillup < %s", cutoff)
- cur.execute("DELETE FROM pings WHERE sentat < %s", cutoff)
- cur.execute("DELETE FROM connects WHERE at < %s", cutoff)
- self.connection.commit()
- finally:
- self.lock.release()
+ cur = self._db.getCursor()
+ cur.execute("DELETE FROM lifetimes WHERE stillup < %s", cutoff)
+ cur.execute("DELETE FROM pings WHERE sentat < %s", cutoff)
+ cur.execute("DELETE FROM connects WHERE at < %s", cutoff)
+ self._db.getConnection().commit()
def flush(self):
- self.lock.acquire()
- try:
- self.connection.commit()
- finally:
- self.lock.release()
+ self._db.getConnection().commit()
def close(self):
- self.lock.acquire()
- try:
- self.connection.close()
- del self.connection
- finally:
- self.lock.release()
-
- def _execute(self, sql, args):
- self.lock.acquire()
- try:
- self.cursor.execute(sql, args)
- finally:
- self.lock.release()
+ self._db.close()
_STARTUP = "INSERT INTO lifetimes (up, stillup, shutdown) VALUES (%s,%s, 0)"
def startup(self,now=None):
+ self._lock.acquire()
self._startTime = now = self._time(now)
- self._execute(self._STARTUP, (now,now))
+ self._lock.release()
+ self._db.getCursor().execute(self._STARTUP, (now,now))
+ self._db.getConnection().commit()
_SHUTDOWN = "UPDATE lifetimes SET stillup = %s, shutdown = %s WHERE up = %s"
def shutdown(self, now=None):
if self._startTime is None: self.startup()
now = self._time(now)
- self._execute(self._SHUTDOWN, (now, now, self._startTime))
+ self._db.getCursor().execute(self._SHUTDOWN, (now, now, self._startTime))
+ self._db.getConnection().commit()
_HEARTBEAT = "UPDATE lifetimes SET stillup = %s WHERE up = %s AND stillup < %s"
def heartbeat(self, now=None):
if self._startTime is None: self.startup()
now = self._time(now)
- self._execute(self._HEARTBEAT, (now, self._startTime, now))
-
- def rotated(self):
- pass
+ self._db.getCursor().execute(self._HEARTBEAT, (now, self._startTime, now))
+ self._db.getConnection().commit()
_CONNECTED = ("INSERT INTO connects (at, server, success) "
"VALUES (%s,%s,%s)")
def connected(self, nickname, success=1, now=None):
- self.lock.acquire()
- try:
- self._addServer(nickname)
- self._execute(self._CONNECTED,
- (self._time(now), nickname.lower(), self._bool(success)))
- finally:
- self.lock.release()
+ self._addServer(nickname)
+ self._db.getCursor().execute(self._CONNECTED,
+ (self._time(now), nickname.lower(), self._bool(success)))
+ self._db.getConnection().commit()
def connectFailed(self, nickname, now=None):
self.connected(nickname, success=0, now=now)
@@ -353,103 +361,103 @@
"VALUES (%s,%s,%s,%s)")
def queuedPing(self, hash, path, now=None):
assert len(hash) == mixminion.Crypto.DIGEST_LEN
- self.lock.acquire()
- try:
- path = path.lower()
- for s in path.split(","):
- self._addServer(s)
- self._execute(self._QUEUED_PING,
- (formatBase64(hash), path, self._time(now), 0))
- finally:
- self.lock.release()
+ path = path.lower()
+ for s in path.split(","):
+ self._addServer(s)
+ self._db.getCursor().execute(self._QUEUED_PING,
+ (formatBase64(hash), path, self._time(now), 0))
+ self._db.getConnection().commit()
_GOT_PING = "UPDATE pings SET received = %s WHERE hash = %s"
def gotPing(self, hash, now=None):
assert len(hash) == mixminion.Crypto.DIGEST_LEN
- n = self._execute(self._GOT_PING, (self._time(now), formatBase64(hash)))
+ self._db.getCursor().execute(self._GOT_PING, (self._time(now), formatBase64(hash)))
+ n = self._db.getCursor().rowcount
if n == 0:
LOG.warn("Received ping with no record of its hash")
elif n > 1:
LOG.warn("Received ping with multiple hash entries!")
- def _calculateUptimes(self, startTime, endTime, now=None):
- cur = self.cursor
- self.lock.acquire()
- try:
- # First, calculate my own uptime.
- if now is None: now = time.time()
- self.heartbeat(now)
+ def _calculateUptimes(self, serverNames, startTime, endTime, now=None):
+ # commit when one done
+ cur = self._db.getCursor()
+ serverNames.sort()
- timespan = IntervalSet( [(startTime, endTime)] )
+ # First, calculate my own uptime.
+ if now is None: now = time.time()
+ self.heartbeat(now)
- cur.execute("SELECT up, stillup, shutdown FROM lifetimes WHERE "
- "up <= %s AND stillup >= %s",
- self._time(endTime), self._time(startTime))
- myUptime = 0
- myIntervals = IntervalSet([ (start, max(end,shutdown))
- for start,end,shutdown in cur ])
- myIntervals *= timespan
- myUptime = myIntervals.spanLength()
- fracUptime = float(myUptime)/(endTime-startTime)
- self._setUptime(
- (self._time(startTime), self._time(endTime), "<self>"),
- (fracUptime,))
+ timespan = IntervalSet( [(startTime, endTime)] )
- # Okay, now everybody else.
- for s in self.serverNames.keys():
- cur.execute("SELECT at, success FROM connects"
- " WHERE server = %s AND at >= %s AND at <= %s"
- " ORDER BY at",
- s, startTime, endTime)
+ cur.execute("SELECT up, stillup, shutdown FROM lifetimes WHERE "
+ "up <= %s AND stillup >= %s",
+ self._time(endTime), self._time(startTime))
+ myUptime = 0
+ myIntervals = IntervalSet([ (start, max(end,shutdown))
+ for start,end,shutdown in cur ])
+ myIntervals *= timespan
+ myUptime = myIntervals.spanLength()
+ fracUptime = float(myUptime)/(endTime-startTime)
+ self._setUptime(
+ (self._time(startTime), self._time(endTime), "<self>"),
+ (fracUptime,))
- lastStatus = None
- lastTime = None
- times = [ 0, 0 ] # uptime, downtime
- for at, success in cur:
- assert success in (0,1)
- upAt, downAt = myIntervals.getIntervalContaining(at)
- if upAt == None:
- # Event outside edge of interval.
- continue
- if lastTime is None or upAt > lastTime:
- lastTime = upAt
- lastStatus = None
- if lastStatus is not None:
- t = (at-lastTime)/2.0
- times[success] += t
- times[lastStatus] += t
- lastStatus = success
- lastTime = at
+ # Okay, now everybody else.
+ for s in self._serverNames.keys():
+ cur.execute("SELECT at, success FROM connects"
+ " WHERE server = %s AND at >= %s AND at <= %s"
+ " ORDER BY at",
+ s, startTime, endTime)
- if times == [0,0]:
+ lastStatus = None
+ lastTime = None
+ times = [ 0, 0 ] # uptime, downtime
+ for at, success in cur:
+ assert success in (0,1)
+ upAt, downAt = myIntervals.getIntervalContaining(at)
+ if upAt == None:
+ # Event outside edge of interval.
continue
- fraction = float(times[1])/(times[0]+times[1])
- self._setUptime((startTime, endTime, s), (fraction,))
- self.connection.commit()
- finally:
- self.lock.release()
+ if lastTime is None or upAt > lastTime:
+ lastTime = upAt
+ lastStatus = None
+ if lastStatus is not None:
+ t = (at-lastTime)/2.0
+ times[success] += t
+ times[lastStatus] += t
+ lastStatus = success
+ lastTime = at
+
+ if times == [0,0]:
+ continue
+ fraction = float(times[1])/(times[0]+times[1])
+ self._setUptime((startTime, endTime, s), (fraction,))
def calculateUptimes(self, startAt, endAt, now=None):
if now is None: now = time.time()
- for s, e in self.uptimeIntervals.getIntervals(startAt, endAt):
- self._calculateUptimes(s, e, now=now)
+ self._lock.acquire()
+ try:
+ serverNames = self._serverNames.keys()
+ finally:
+ self._lock.release()
+ serverNames.sort()
+ for s, e in self._uptimeIntervals.getIntervals(startAt, endAt):
+ self._calculateUptimes(serverNames, s, e, now=now)
+ self._db.getConnection().commit()
def getUptimes(self, startAt, endAt):
"""DODOC: uptimes for all servers overlapping [startAt, endAt],
as mapping from (start,end) to nickname to fraction.
"""
result = {}
- cur = self.cursor
- self.lock.acquire()
- try:
- cur.execute("SELECT start, end, name, uptime FROM uptimes "
- "WHERE %s >= start AND %s <= end",
- (self._time(startAt), self._time(endAt)))
- for s,e,n,u in cur:
- result.setdefault((s,e), {})[n] = u
- return result
- finally:
- self.lock.release()
+ cur = self._db.getCursor()
+ cur.execute("SELECT start, end, name, uptime FROM uptimes "
+ "WHERE %s >= start AND %s <= end",
+ (self._time(startAt), self._time(endAt)))
+ for s,e,n,u in cur:
+ result.setdefault((s,e), {})[n] = u
+ self._db.getConnection().commit()
+ return result
def _roundLatency(self, latency):
"""Using a median latency can leak the fact that a message was a
@@ -471,16 +479,15 @@
_PING_GRANULARITY = 24*60*60
def _calculateOneHopResults(self, serverName, startTime, endTime,
now=None, calculateOverallResults=1):
- # hold lock; commit when done.
- cur = self.cursor
+ # commit when done; serverName must exist.
+ cur = self._db.getCursor()
if now is None:
now = time.time()
- self._addServer(serverName)
if calculateOverallResults:
startTime = min(startTime,
now - (len(self._WEIGHT_AGE)*self._WEIGHT_AGE_PERIOD))
endTime = max(endTime, now)
- intervals = self.pingIntervals.getIntervals(startTime, endTime)
+ intervals = self._pingIntervals.getIntervals(startTime, endTime)
nPeriods = len(intervals)
startTime = intervals[0][0]
endTime = intervals[-1][1]
@@ -562,7 +569,7 @@
(sent, rcvd, latent, wsent, wrcvd, rel))
if not calculateOverallResults:
- return
+ return None
# 3. Write current overall results into the DB.
if allLatencies:
@@ -581,24 +588,34 @@
else:
rel = 0.0
self._setCurOneHop((serverName,), (self._time(now), latent, rel))
- self.serverReliability[serverName] = rel
+ return rel
def calculateOneHopResults(self, now=None):
- self.lock.acquire()
+ self._lock.acquire()
try:
- if now is None:
- now = time.time()
- for s in self.serverNames.keys():
- # For now, always calculate overall results.
- self._calculateOneHopResults(s,now,now,now,
+ serverNames = self._serverNames.keys()
+ finally:
+ self._lock.release()
+
+ if now is None:
+ now = time.time()
+ serverNames.sort()
+ reliability = {}
+ for s in serverNames:
+ # For now, always calculate overall results.
+ r = self._calculateOneHopResults(s,now,now,now,
calculateOverallResults=1)
- self.connection.commit()
+ reliability[s] = r
+ self._db.getConnection().commit()
+ self._lock.acquire()
+ try:
+ self._serverReliability.update(reliability)
finally:
- self.lock.release()
+ self._lock.release()
def _calculate2ChainStatus(self, since, path, now=None):
- # hold lock.
- cur = self.cursor
+ # doesn't commit.
+ cur = self._db.getCursor()
cur.execute("SELECT count() FROM pings WHERE path = %s"
" AND sentat >= %s",
(path,self._time(since)))
@@ -610,7 +627,7 @@
servers = path.split(",")
try:
- rels = [ self.serverReliability[s] for s in servers ]
+ rels = [ self._serverReliability[s] for s in servers ]
product = reduce(operator.mul, rels)
except KeyError:
product = None
@@ -628,102 +645,113 @@
_CHAIN_PING_HORIZON = 12*ONE_DAY
def calculateChainStatus(self, now=None):
- self.lock.acquire()
+ self._lock.acquire()
try:
- if now is None:
- now = time.time()
- brokenChains = {}
- interestingChains = {}
- since = now - self._CHAIN_PING_HORIZON
+ serverNames = self._serverNames.keys()
+ finally:
+ self._lock.release()
- for s1 in self.serverNames.keys():
- for s2 in self.serverNames.keys():
- p = "%s,%s" % (s1, s2)
+ if now is None:
+ now = time.time()
- nS, nR, prod, isBroken, isInteresting = \
- self._calculate2ChainStatus(since, p)
- if isBroken:
- brokenChains[p] = 1
- if isInteresting:
- interestingChains[p] = 1
- self._setTwoHop((p,),
- (self._time(now), nS, nR, self._bool(isBroken),
- self._bool(isInteresting)))
+ brokenChains = {}
+ interestingChains = {}
+ since = now - self._CHAIN_PING_HORIZON
+ serverNames.sort()
- self.isBroken = isBroken
- self.isInteresting = isInteresting
+ for s1 in serverNames:
+ for s2 in serverNames:
+ p = "%s,%s" % (s1, s2)
+
+ nS, nR, prod, isBroken, isInteresting = \
+ self._calculate2ChainStatus(since, p)
+ if isBroken:
+ brokenChains[p] = 1
+ if isInteresting:
+ interestingChains[p] = 1
+ self._setTwoHop((p,),
+ (self._time(now), nS, nR, self._bool(isBroken),
+ self._bool(isInteresting)))
+ self._db.getConnection().commit()
+
+ self._lock.acquire()
+ try:
+ self._brokenChains = isBroken
+ self._interestingChains = isInteresting
finally:
- self.lock.release()
+ self._lock.release()
def dumpAllStatus(self,f,since,now=None):
- self.lock.acquire()
+ self._lock.acquire()
try:
- if now is None: now = time.time()
- print >>f, "# List of all currently tracked servers."
- print >>f, "KNOWN_SERVERS =",self.serverNames.keys()
- cur = self.cursor
+ serverNames = self._serverNames.keys()
+ finally:
+ self._lock.release()
- print >>f, "\n# Map from server to list of (period-start, period-end, fractional uptime"
- print >>f, "SERVER_UPTIMES = {"
- cur.execute("SELECT start,end,name,uptime FROM uptimes "
- "WHERE start >= %s AND end <= %s"
- "ORDER BY name, start", (since, now))
- lastServer = "---"
- for s,e,n,u in cur:
- if n != lastServer:
- if lastServer != '---': print >>f, " ],"
- lastServer = n
- print >>f, " %r : [" % n
- print >>f, " (%s,%s,%.04f),"%(s,e,u)
- if lastServer != '---': print >>f, " ]"
- print >>f, "}"
+ if now is None: now = time.time()
+ print >>f, "# List of all currently tracked servers."
+ print >>f, "KNOWN_SERVERS =",serverNames
+ cur = self._db.getCursor()
- print >>f, """
+ print >>f, "\n# Map from server to list of (period-start, period-end, fractional uptime"
+ print >>f, "SERVER_UPTIMES = {"
+ cur.execute("SELECT start,end,name,uptime FROM uptimes "
+ "WHERE start >= %s AND end <= %s"
+ "ORDER BY name, start", (since, now))
+ lastServer = "---"
+ for s,e,n,u in cur:
+ if n != lastServer:
+ if lastServer != '---': print >>f, " ],"
+ lastServer = n
+ print >>f, " %r : [" % n
+ print >>f, " (%s,%s,%.04f),"%(s,e,u)
+ if lastServer != '---': print >>f, " ]"
+ print >>f, "}"
+
+ print >>f, """
# Map from server name to list of (period-start, period-end, # pings sent,
# # of those pings received, median latency on those pings (sec),
# weighted reliability)"""
- print >>f, "SERVER_DAILY_PING_STATUS = {"
- cur.execute("SELECT servername,startAt,endAt,nSent,nReceived,"
- " latency,reliability FROM echolotOneHopResults "
- "WHERE startat >= %s AND endat <= %s"
- "ORDER BY servername, startat", (since, now))
- lastServer = "---"
- for n,s,e,nS,nR,lat,r in cur:
- if n != lastServer:
- if lastServer != '---': print >>f, " ],"
- lastServer = n
- print >>f, " %r : [" % n
- print >>f, " (%s,%s,%s,%s,%s,%.04f),"%(s,e,nS,nR,lat,r)
- if lastServer != '---': print >>f, " ]"
- print >>f, "}"
-
- print >>f, "\n# Map from server-name to current (avg latency, avg reliability)"
- print >>f, "SERVER_CUR_PING_STATUS = {"
- cur.execute("SELECT servername,latency,reliability FROM "
- "echolotCurrentOneHopResults")
- for n,lat,r in cur:
- print >>f, " %r : (%s,%.04f)," %(n,lat,r)
- print >>f, "}"
+ print >>f, "SERVER_DAILY_PING_STATUS = {"
+ cur.execute("SELECT servername,startAt,endAt,nSent,nReceived,"
+ " latency,reliability FROM echolotOneHopResults "
+ "WHERE startat >= %s AND endat <= %s"
+ "ORDER BY servername, startat", (since, now))
+ lastServer = "---"
+ for n,s,e,nS,nR,lat,r in cur:
+ if n != lastServer:
+ if lastServer != '---': print >>f, " ],"
+ lastServer = n
+ print >>f, " %r : [" % n
+ print >>f, " (%s,%s,%s,%s,%s,%.04f),"%(s,e,nS,nR,lat,r)
+ if lastServer != '---': print >>f, " ]"
+ print >>f, "}"
- print >>f, "\n# Chains that we want to know more about"
- print >>f, "INTERESTING_CHAINS = ["
- cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
- "WHERE interesting = 1")
- for p, in cur:
- print >>f, " %r,"%p
- print >>f, "]"
+ print >>f, "\n# Map from server-name to current (avg latency, avg reliability)"
+ print >>f, "SERVER_CUR_PING_STATUS = {"
+ cur.execute("SELECT servername,latency,reliability FROM "
+ "echolotCurrentOneHopResults")
+ for n,lat,r in cur:
+ print >>f, " %r : (%s,%.04f)," %(n,lat,r)
+ print >>f, "}"
- print >>f, "\n# Chains that are more unreliable than we'd expect"
- print >>f, "BROKEN_CHAINS = ["
- cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
- "WHERE broken = 1")
- for p, in cur:
- print >>f, " %r,"%p
- print >>f, "]"
- print >>f, "\n"
+ print >>f, "\n# Chains that we want to know more about"
+ print >>f, "INTERESTING_CHAINS = ["
+ cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
+ "WHERE interesting = 1")
+ for p, in cur:
+ print >>f, " %r,"%p
+ print >>f, "]"
- finally:
- self.lock.release()
+ print >>f, "\n# Chains that are more unreliable than we'd expect"
+ print >>f, "BROKEN_CHAINS = ["
+ cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
+ "WHERE broken = 1")
+ for p, in cur:
+ print >>f, " %r,"%p
+ print >>f, "]"
+ print >>f, "\n"
+ self._db.getConnection().commit()
def calculateAll(self, outFname=None, now=None):
LOG.info("Computing ping results.")
@@ -815,6 +843,7 @@
",".join(path), formatTime(t,1))
return t
def _getPerturbation(self, path, periodStart, interval):
+ #XXXX add a secret seed
sha = mixminion.Crypto.sha1(",".join(path) + "@@" + str(interval))
sec = abs(struct.unpack("I", sha[:4])[0]) % interval
return sec
@@ -998,7 +1027,20 @@
"""DOCDOC"""
return sys.version_info[:2] >= (2,2) and sqlite is not None
-def openPingLog(location):
+DATABASE_CLASSES = { 'sqlite' : SQLiteDatabase }
+
+def openPingLog(config, location=None, databaseThread=None):
# FFFF eventually, we should maybe support more than pysqlite. But let's
# FFFF not generalize until we have a 2nd case.
- return PingLog(sqlite.connect(location))
+ database = 'sqlite'
+
+ assert DATABASE_CLASSES.has_key(database)
+ if location is None:
+ location = os.path.join(config.getWorkingDir(), "pinger", "pingdb")
+ db = DATABASE_CLASSES[database](location)
+ log = PingLog(db)
+
+ if db.LOCKING_IS_COARSE and databaseThread is not None:
+ log = BackgroundingDecorator(databaseThread, log)
+
+ return log
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.137
retrieving revision 1.138
diff -u -d -r1.137 -r1.138
--- ServerMain.py 6 Dec 2004 20:02:23 -0000 1.137
+++ ServerMain.py 12 Dec 2004 02:48:16 -0000 1.138
@@ -55,7 +55,8 @@
# We pull this from mixminion.ThreadUtils just in case somebody still has
# a copy of the old "mixminion/server/Queue.py" (since renamed to
# ServerQueue.py)
-from mixminion.ThreadUtils import MessageQueue, ClearableQueue, QueueEmpty
+from mixminion.ThreadUtils import MessageQueue, ClearableQueue, QueueEmpty, \
+ ProcessingThread
import mixminion.ClientDirectory
import mixminion.Config
@@ -482,45 +483,7 @@
LOG.error_exc(sys.exc_info(),
"Exception while cleaning; shutting down thread.")
-class ProcessingThread(threading.Thread):
- """Background thread to handle CPU-intensive functions.
-
- Currently used to process packets in the background."""
- # Fields:
- # mqueue: a ClearableQueue of callable objects.
- class _Shutdown:
- """Callable that raises itself when called. Inserted into the
- queue when it's time to shut down."""
- def __call__(self):
- raise self
-
- def __init__(self):
- """Create a new processing thread."""
- threading.Thread.__init__(self)
- self.mqueue = ClearableQueue()
-
- def shutdown(self):
- LOG.info("Telling processing thread to shut down.")
- self.mqueue.clear()
- self.mqueue.put(ProcessingThread._Shutdown())
-
- def addJob(self, job):
- """Adds a job to the message queue. A job is a callable object
- to be invoked by the processing thread. If the job raises
- ProcessingThread._Shutdown, the processing thread stops running."""
- self.mqueue.put(job)
- def run(self):
- try:
- while 1:
- job = self.mqueue.get()
- job()
- except ProcessingThread._Shutdown:
- LOG.info("Processing thread shutting down.")
- return
- except:
- LOG.error_exc(sys.exc_info(),
- "Exception while processing; shutting down thread.")
#----------------------------------------------------------------------
STOPPING = 0 # Set to one if we get SIGTERM
@@ -876,8 +839,7 @@
LOG.debug("Initializing ping log")
pingerDir = os.path.join(config.getWorkDir(), "pinger")
pingerLogDir = os.path.join(pingerDir, "log")
- self.pingLog = mixminion.server.Pinger.PingStatusLog(pingerLogDir)
- self.pingLog.startup()
+ self.pingLog = mixminion.server.Pinger.openPingLog(config)
LOG.debug("Initializing ping generator")