[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Rearrange database tables to please weasel and other fo...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv4812/lib/mixminion/server
Modified Files:
Pinger.py
Log Message:
Rearrange database tables to please weasel and other folks who know how dbs are supposed to look
Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -d -r1.18 -r1.19
--- Pinger.py 13 Dec 2004 07:02:30 -0000 1.18
+++ Pinger.py 17 Dec 2004 20:40:04 -0000 1.19
@@ -50,15 +50,8 @@
HEARTBEAT_INTERVAL = 30*60
ONE_DAY = 24*60*60
-# Map from logical type to type used in (sqlite) database.
-REALTYPES = { 'time' : 'integer',
- 'boolean' : 'integer',
- 'integer' : 'integer',
- 'float' : 'float',
- 'varchar' : 'varchar',
- }
-class PingerIntervalSchedule:
+class IntervalSchedule:
"""DOCDOC -- defines a set of intervals in time."""
def __init__(self):
pass
@@ -77,6 +70,15 @@
class SQLiteDatabase:
#XXXX can only be used from one thread at a time.
LOCKING_IS_COARSE = 1
+ # Map from logical type to type used in (sqlite) database.
+ REALTYPES = { 'timestamp' : 'integer',
+ 'bool' : 'integer',
+ 'integer' : 'integer',
+ 'float' : 'float',
+ 'varchar' : 'varchar',
+ 'char' : 'char',
+ }
+
def __init__(self, location):
parent = os.path.split(location)[0]
createPrivateDir(parent)
@@ -106,17 +108,27 @@
rs = self._theCursor.fetchall()
return len(rs) > 0
- def createTable(self, name, rows):
+ def createTable(self, name, rows, constraints=()):
if self._objectExists(name,"table"):
return
body = []
for r in rows:
+ cname = r[0]
+ ctype = r[1]
+ if '(' in ctype:
+ idx = ctype.find('(')
+ ctype = self.REALTYPES[ctype[:idx]]+ctype[idx:]
+ else:
+ ctype = self.REALTYPES[ctype]
+
if len(r) == 2:
- body.append("%s %s"%(r[0],REALTYPES[r[1]]))
+ body.append("%s %s"%(cname, ctype))
else:
assert len(r) == 3
- body.append("%s %s %s"%(r[0],REALTYPES[r[1]],r[2]))
+ body.append("%s %s %s"%(cname, ctype, r[2]))
+
+ body.extend(constraints)
stmt = "CREATE TABLE %s (%s)" % (name,", ".join(body))
self._theCursor.execute(stmt)
@@ -145,11 +157,16 @@
return 0
def getInsertOrUpdateFn(self, table, keyCols, valCols):
- update = "UPDATE %s SET %s WHERE %s" % (
- table,
- ", ".join(["%s = %%s" % k for k in valCols]),
- " AND ".join(["%s = %%s" % v for v in keyCols]))
- insert = "INSERT INTO %s (%s, %s) VALUES (%s)" % (
+## update = "UPDATE %s SET %s WHERE %s" % (
+## table,
+## ", ".join(["%s = %%s" % k for k in valCols]),
+## " AND ".join(["%s = %%s" % v for v in keyCols]))
+## insert = "INSERT INTO %s (%s, %s) VALUES (%s)" % (
+## table,
+## ", ".join(keyCols),
+## ", ".join(valCols),
+## ", ".join(["%s"]*(len(valCols)+len(keyCols))))
+ stmt = "INSERT OR REPLACE INTO %s (%s, %s) VALUES (%s)"% (
table,
", ".join(keyCols),
", ".join(valCols),
@@ -158,11 +175,7 @@
assert len(keyVals) == len(keyCols)
assert len(valVals) == len(valCols)
- self._theCursor.execute(update, (valVals+keyVals))
- if self._theCursor.rowcount > 0:
- return
-
- self._theCursor.execute(insert, (keyVals+valVals))
+ self._theCursor.execute(stmt, (keyVals+valVals))
return fn
class PingLog:
@@ -171,10 +184,10 @@
self._time = self._db.time
self._bool = self._db.bool
self._lock = threading.RLock()
- self._serverNames = {}
+ self._serverIDs = {}
+ self._intervalIDs = {}
self._serverReliability = {}
- self._uptimeIntervals = PingerIntervalSchedule()
- self._pingIntervals = PingerIntervalSchedule()
+ self._intervals = IntervalSchedule()
self._brokenChains = {}
self._interestingChains = {}
self._startTime = None
@@ -185,76 +198,101 @@
def _createAllTables(self):
self._lock.acquire()
try:
- # FFFF This is terrible DB design. It's not normalized by
- # FFFF any stretch of the imagination, it doesn't have enough
- # FFFF constraints, etc etc.
+ # FFFF There are still a few sucky bits of this DB design.
+ # FFFF First, we depend on SQLite's behavior when inserting null
+ # FFFF into an integer primary key column. (It picks a new integer
+ # FFFF for us, without our having to give a sequence.)
+ # FFFF Second, paths probably want to have their own table.
+
# Raw data
- self._db.createTable("lifetimes",
- [("up", "time", "not null"),
- ("stillup", "time"),
- ("shutdown", "time")])
- self._db.createTable("pings",
- [("hash", "varchar", "primary key"),
- ("path", "varchar", "not null"),
- ("sentat", "time", "not null"),
- ("received", "time")])
- self._db.createTable("connects",
- [("at", "time", "not null"),
- ("server", "varchar", "not null"),
- ("success", "boolean", "not null")])
- self._db.createTable("servers",
- [("name", "varchar", "unique not null")])
+ self._db.createTable("myLifespan",
+ [("startup", "timestamp", "not null"),
+ ("stillup", "timestamp", "not null"),
+ ("shutdown", "timestamp")])
+ self._db.createTable("ping",
+ [("hash", "char(28)", "primary key"),
+ ("path", "varchar(200)", "not null"),
+ ("sentat", "timestamp", "not null"),
+ ("received", "timestamp")])
+ self._db.createTable("server",
+ [("id", "integer", "primary key"),
+ ("name", "varchar(32)", "unique not null")])
+ self._db.createTable(
+ "connectionAttempt",
+ [("at", "timestamp", "not null"),
+ ("server", "integer", "not null REFERENCES server(id)"),
+ ("success", "bool", "not null")])
+
# Results
- self._db.createTable("uptimes",
- [("start", "time", "not null"),
- ("end", "time", "not null"),
- ("name", "varchar", "not null"),
- ("uptime", "float", "not null")])
- self._db.createTable("echolotOneHopResults",
- [("servername", "varchar", "not null"),
- ("startAt", "time", "not null"),
- ("endAt", "time", "not null"),
- ("nSent", "integer", "not null"),
- ("nReceived", "integer", "not null"),
- ("latency", "integer", "not null"),
- ("wsent", "float", "not null"),
- ("wreceived", "float", "not null"),
- ("reliability", "float", "not null")])
- self._db.createTable("echolotCurrentOneHopResults",
- [("servername", "varchar", "unique not null"),
- ("at", "time", "not null"),
- ("latency", "integer", "not null"),
- ("reliability", "float", "not null")])
- self._db.createTable("echolotCurrentTwoHopResults",
- [("path", "varchar", "unique not null"),
- ("at", "time", "not null"),
- ("nSent", "integer", "not null"),
- ("nReceived", "integer", "not null"),
- ("broken", "boolean", "not null"),
- ("interesting", "boolean", "not null")])
+ self._db.createTable("statsInterval",
+ [("id", "integer", "primary key"),
+ ("startAt", "timestamp", "not null"),
+ ("endAt", "timestamp", "not null")])
- self._db.createIndex("lifetimesUp", "lifetimes", ["up"])
- self._db.createIndex("pingsHash", "pings", ["hash"], unique=1)
- self._db.createIndex("pingsPathSR", "pings",
+ self._db.createTable(
+ "uptime",
+ [("interval", "integer", "not null REFERENCES statsinterval(id)"),
+ ("server", "integer", "not null REFERENCES server(id)"),
+ ("uptime", "float", "not null")],
+ ["PRIMARY KEY (interval, server)"])
+
+ self._db.createTable(
+ "echolotOneHopResult",
+ [("server", "integer", "not null REFERENCES server(id)"),
+ ("interval", "integer", "not null REFERENCES statsInterval(id)"),
+ ("nSent", "integer", "not null"),
+ ("nReceived","integer", "not null"),
+ ("latency", "integer", "not null"),
+ ("wsent", "float", "not null"),
+ ("wreceived","float", "not null"),
+ ("reliability", "float", "not null")],
+ ["PRIMARY KEY (server, interval)"])
+
+ self._db.createTable(
+ "echolotCurrentOneHopResult",
+ [("server", "integer",
+ "primary key REFERENCES server(id)"),
+ ("at", "timestamp", "not null"),
+ ("latency", "integer", "not null"),
+ ("reliability", "float", "not null")])
+
+ self._db.createTable(
+ "echolotCurrentTwoHopResult",
+ [("server1", "integer", "not null REFERENCES server(id)"),
+ ("server2", "integer", "not null REFERENCES server(id)"),
+ ("at", "timestamp", "not null"),
+ ("nSent", "integer", "not null"),
+ ("nReceived", "integer", "not null"),
+ ("broken", "bool", "not null"),
+ ("interesting", "bool", "not null")],
+ ["PRIMARY KEY (server1, server2)"])
+
+ self._db.createIndex("serverName", "server", ["name"], unique=1)
+ self._db.createIndex("statsIntervalSE", "statsInterval",
+ ["startAt", "endAt"], unique=1)
+ self._db.createIndex("myLifespanStartup", "myLifespan", ["startUp"])
+ self._db.createIndex("pingHash", "ping", ["hash"], unique=1)
+ self._db.createIndex("pingPathSR", "ping",
["path", "sentat", "received"])
- self._db.createIndex("connectsAt", "connects", ["at"])
- self._db.createIndex("uptimesNS", "uptimes", ["name", "start"])
+ self._db.createIndex("connectionAttemptServerAt",
+ "connectionAttempt", ["server","at"])
+
# indices on echolot*results, uptimes.
self._setUptime = self._db.getInsertOrUpdateFn(
- "uptimes", ["start", "end", "name"], ["uptime"])
+ "uptime", ["interval", "server"], ["uptime"])
self._setOneHop = self._db.getInsertOrUpdateFn(
- "echolotOneHopResults",
- ["servername", "startAt", "endAt"],
+ "echolotOneHopResult",
+ ["server", "interval"],
["nSent", "nReceived", "latency", "wsent", "wreceived",
"reliability"])
self._setCurOneHop = self._db.getInsertOrUpdateFn(
- "echolotCurrentOneHopResults",
- ["servername"],
+ "echolotCurrentOneHopResult",
+ ["server"],
["at", "latency", "reliability"])
self._setTwoHop = self._db.getInsertOrUpdateFn(
- "echolotCurrentTwoHopResults",
- ["path"],
+ "echolotCurrentTwoHopResult",
+ ["server1", "server2"],
["at", "nSent", "nReceived", "broken", "interesting"])
finally:
self._lock.release()
@@ -262,30 +300,34 @@
def _loadServers(self):
# hold lock.
cur = self._db.getCursor()
- cur.execute("SELECT name FROM servers")
+ cur.execute("SELECT id, name FROM server")
res = cur.fetchall()
- serverNames = {}
- serverReliability = {}
- for name, in res:
- serverNames[name] = 1
+ serverIDs = {}
+ for id,name in res:
+ serverIDs[name] = id
- cur.execute("SELECT servername, reliability FROM "
- "echolotCurrentOneHopResults")
+ serverReliability = {}
+ cur.execute("SELECT name, reliability FROM "
+ "echolotCurrentOneHopResult, server "
+ "WHERE server.id = echolotCurrentOneHopResult.server")
res = cur.fetchall()
for name,rel in res:
serverReliability[name]=rel
- cur.execute("SELECT path, broken, interesting FROM "
- "echolotCurrentTwoHopResults WHERE interesting OR broken")
+ cur.execute("SELECT s1.name, s2.name, broken, interesting FROM "
+ "echolotCurrentTwoHopResult, server as S1, server as S2 "
+ "WHERE (interesting = 1 OR broken = 1) AND "
+ "S1.id = server1 AND S2.id = server2")
res = cur.fetchall()
broken = {}
interesting = {}
- for p, b, i in res:
+ for s1, s2, b, i in res:
+ p = "%s,%s"%(s1,s2)
if b:
broken[p]=1
if i:
interesting[p]=1
- self._serverNames = serverNames
+ self._serverIDs = serverIDs
self._serverReliability = serverReliability
self._brokenChains = broken
self._interestingChains = interesting
@@ -298,26 +340,53 @@
finally:
self._lock.release()
- def _addServer(self, name):
+ def _getServerID(self, name):
# doesn't commit.
name = name.lower()
self._lock.acquire()
try:
- if self._serverNames.has_key(name):
- return
- self._serverNames[name] = 1
+ try:
+ return self._serverIDs[name]
+ except KeyError:
+ self._serverIDs[name] = 1
finally:
self._lock.release()
- self._db.getCursor().execute("INSERT INTO servers (name) VALUES (%s)", name)
+ cur = self._db.getCursor()
+
+ cur.execute("INSERT INTO server (name) VALUES (%s)", name)
+ cur.execute("SELECT id FROM server WHERE name = %s", name)
+ #XXXX catch errors!
+ ident, = cur.fetchone()
+ self._serverIDs[name]=ident
+ return ident
+
+ def _getIntervalID(self, start, end):
+ # CACHE THESE? FFFF
+ start = self._db.time(start)
+ end = self._db.time(end)
+ cur = self._db.getCursor()
+ cur.execute("SELECT id FROM statsInterval WHERE "
+ "startAt = %s AND endAt = %s", start, end)
+ r = cur.fetchall()
+ if len(r) == 1:
+ return r[0][0]
+
+ cur.execute("INSERT INTO statsInterval (startAt, endAt) "
+ "VALUES (%s, %s)", start, end)
+ cur.execute("SELECT id FROM statsInterval WHERE "
+ "startAt = %s AND endAt = %s", start, end)
+ r = cur.fetchall()
+ assert len(r) == 1
+ return r[0][0]
def rotate(self, now=None):
if now is None: now = time.time()
cutoff = self._time(now - KEEP_HISTORY_DAYS * ONE_DAY)
cur = self._db.getCursor()
- cur.execute("DELETE FROM lifetimes WHERE stillup < %s", cutoff)
- cur.execute("DELETE FROM pings WHERE sentat < %s", cutoff)
- cur.execute("DELETE FROM connects WHERE at < %s", cutoff)
+ cur.execute("DELETE FROM myLifespan WHERE stillup < %s", cutoff)
+ cur.execute("DELETE FROM ping WHERE sentat < %s", cutoff)
+ cur.execute("DELETE FROM connectionAttempt WHERE at < %s", cutoff)
self._db.getConnection().commit()
def flush(self):
@@ -326,7 +395,7 @@
def close(self):
self._db.close()
- _STARTUP = "INSERT INTO lifetimes (up, stillup, shutdown) VALUES (%s,%s, 0)"
+ _STARTUP = "INSERT INTO myLifespan (startup, stillup, shutdown) VALUES (%s,%s, 0)"
def startup(self,now=None):
self._lock.acquire()
self._startTime = now = self._time(now)
@@ -334,43 +403,43 @@
self._db.getCursor().execute(self._STARTUP, (now,now))
self._db.getConnection().commit()
- _SHUTDOWN = "UPDATE lifetimes SET stillup = %s, shutdown = %s WHERE up = %s"
+ _SHUTDOWN = "UPDATE myLifespan SET stillup = %s, shutdown = %s WHERE startup = %s"
def shutdown(self, now=None):
if self._startTime is None: self.startup()
now = self._time(now)
self._db.getCursor().execute(self._SHUTDOWN, (now, now, self._startTime))
self._db.getConnection().commit()
- _HEARTBEAT = "UPDATE lifetimes SET stillup = %s WHERE up = %s AND stillup < %s"
+ _HEARTBEAT = "UPDATE myLifespan SET stillup = %s WHERE startup = %s AND stillup < %s"
def heartbeat(self, now=None):
if self._startTime is None: self.startup()
now = self._time(now)
self._db.getCursor().execute(self._HEARTBEAT, (now, self._startTime, now))
self._db.getConnection().commit()
- _CONNECTED = ("INSERT INTO connects (at, server, success) "
+ _CONNECTED = ("INSERT INTO connectionAttempt (at, server, success) "
"VALUES (%s,%s,%s)")
def connected(self, nickname, success=1, now=None):
- self._addServer(nickname)
+ serverID = self._getServerID(nickname)
self._db.getCursor().execute(self._CONNECTED,
- (self._time(now), nickname.lower(), self._bool(success)))
+ (self._time(now), serverID, self._bool(success)))
self._db.getConnection().commit()
def connectFailed(self, nickname, now=None):
self.connected(nickname, success=0, now=now)
- _QUEUED_PING = ("INSERT INTO pings (hash, path, sentat, received)"
+ _QUEUED_PING = ("INSERT INTO ping (hash, path, sentat, received)"
"VALUES (%s,%s,%s,%s)")
def queuedPing(self, hash, path, now=None):
assert len(hash) == mixminion.Crypto.DIGEST_LEN
path = path.lower()
for s in path.split(","):
- self._addServer(s)
+ self._getServerID(s)
self._db.getCursor().execute(self._QUEUED_PING,
(formatBase64(hash), path, self._time(now), 0))
self._db.getConnection().commit()
- _GOT_PING = "UPDATE pings SET received = %s WHERE hash = %s"
+ _GOT_PING = "UPDATE ping SET received = %s WHERE hash = %s"
def gotPing(self, hash, now=None):
assert len(hash) == mixminion.Crypto.DIGEST_LEN
self._db.getCursor().execute(self._GOT_PING, (self._time(now), formatBase64(hash)))
@@ -390,9 +459,10 @@
self.heartbeat(now)
timespan = IntervalSet( [(startTime, endTime)] )
+ intervalID = self._getIntervalID(startTime, endTime)
- cur.execute("SELECT up, stillup, shutdown FROM lifetimes WHERE "
- "up <= %s AND stillup >= %s",
+ cur.execute("SELECT startup, stillup, shutdown FROM myLifespan WHERE "
+ "startup <= %s AND stillup >= %s",
self._time(endTime), self._time(startTime))
myUptime = 0
myIntervals = IntervalSet([ (start, max(end,shutdown))
@@ -401,15 +471,15 @@
myUptime = myIntervals.spanLength()
fracUptime = float(myUptime)/(endTime-startTime)
self._setUptime(
- (self._time(startTime), self._time(endTime), "<self>"),
+ (intervalID, self._getServerID("<self>")),
(fracUptime,))
# Okay, now everybody else.
- for s in self._serverNames.keys():
- cur.execute("SELECT at, success FROM connects"
+ for s, serverID in self._serverIDs.items():
+ cur.execute("SELECT at, success FROM connectionAttempt"
" WHERE server = %s AND at >= %s AND at <= %s"
" ORDER BY at",
- s, startTime, endTime)
+ serverID, startTime, endTime)
lastStatus = None
lastTime = None
@@ -435,17 +505,17 @@
if times == [0,0]:
continue
fraction = float(times[1])/(times[0]+times[1])
- self._setUptime((startTime, endTime, s), (fraction,))
+ self._setUptime((intervalID, serverID), (fraction,))
def calculateUptimes(self, startAt, endAt, now=None):
if now is None: now = time.time()
self._lock.acquire()
try:
- serverNames = self._serverNames.keys()
+ serverNames = self._serverIDs.keys()
finally:
self._lock.release()
serverNames.sort()
- for s, e in self._uptimeIntervals.getIntervals(startAt, endAt):
+ for s, e in self._intervals.getIntervals(startAt, endAt):
self._calculateUptimes(serverNames, s, e, now=now)
self._db.getConnection().commit()
@@ -455,8 +525,11 @@
"""
result = {}
cur = self._db.getCursor()
- cur.execute("SELECT start, end, name, uptime FROM uptimes "
- "WHERE %s >= start AND %s <= end",
+ cur.execute("SELECT startat, endat, name, uptime "
+ "FROM uptime, statsInterval, server "
+ "WHERE statsInterval.id = uptime.interval "
+ "AND server.id = uptime.server "
+ "AND %s >= startat AND %s <= endat",
(self._time(startAt), self._time(endAt)))
for s,e,n,u in cur:
result.setdefault((s,e), {})[n] = u
@@ -481,7 +554,7 @@
_WEIGHT_AGE_PERIOD = 24*60*60
_WEIGHT_AGE = [ 1, 2, 2, 3, 5, 8, 9, 10, 10, 10, 10, 5 ]
_PING_GRANULARITY = 24*60*60
- def _calculateOneHopResults(self, serverName, startTime, endTime,
+ def _calculateOneHopResult(self, serverName, startTime, endTime,
now=None, calculateOverallResults=1):
# commit when done; serverName must exist.
cur = self._db.getCursor()
@@ -491,10 +564,11 @@
startTime = min(startTime,
now - (len(self._WEIGHT_AGE)*self._WEIGHT_AGE_PERIOD))
endTime = max(endTime, now)
- intervals = self._pingIntervals.getIntervals(startTime, endTime)
+ intervals = self._intervals.getIntervals(startTime, endTime)
nPeriods = len(intervals)
startTime = intervals[0][0]
endTime = intervals[-1][1]
+ serverID = self._getServerID(serverName)
# 1. Compute latencies and number of pings sent in each period.
# We need to learn these first so we can tell the percentile
@@ -502,7 +576,7 @@
dailyLatencies = [[] for _ in xrange(nPeriods)]
nSent = [0]*nPeriods
nPings = 0
- cur.execute("SELECT sentat, received FROM pings WHERE path = %s"
+ cur.execute("SELECT sentat, received FROM ping WHERE path = %s"
" AND sentat >= %s AND sentat <= %s",
(serverName, startTime, endTime))
for sent,received in cur:
@@ -535,7 +609,7 @@
nReceived = [0]*nPeriods
perTotalWeights = [0]*nPeriods
perTotalWeighted = [0]*nPeriods
- cur.execute("SELECT sentat, received FROM pings WHERE path = %s"
+ cur.execute("SELECT sentat, received FROM ping WHERE path = %s"
" AND sentat >= %s AND sentat <= %s",
(serverName, startTime, endTime))
for sent,received in cur:
@@ -554,7 +628,8 @@
# 2b. Write per-day results into the DB.
for pIdx in xrange(len(intervals)):
- s, e = intervals[pIdx]
+ s,e = intervals[pIdx]
+ intervalID = self._getIntervalID(s,e)
latent = self._roundLatency(dailyMedianLatency[pIdx])
sent = nSent[pIdx]
rcvd = nReceived[pIdx]
@@ -569,7 +644,7 @@
# "rel=%s/%s=%s",
# pIdx, rcvd, sent, wrcvd, wsent, rel)
self._setOneHop(
- (serverName, self._time(s), self._time(e)),
+ (serverID, intervalID),
(sent, rcvd, latent, wsent, wrcvd, rel))
if not calculateOverallResults:
@@ -591,13 +666,13 @@
rel = wrcvd / wsent
else:
rel = 0.0
- self._setCurOneHop((serverName,), (self._time(now), latent, rel))
+ self._setCurOneHop((serverID,), (self._time(now), latent, rel))
return rel
- def calculateOneHopResults(self, now=None):
+ def calculateOneHopResult(self, now=None):
self._lock.acquire()
try:
- serverNames = self._serverNames.keys()
+ serverNames = self._serverIDs.keys()
finally:
self._lock.release()
@@ -607,7 +682,7 @@
reliability = {}
for s in serverNames:
# For now, always calculate overall results.
- r = self._calculateOneHopResults(s,now,now,now,
+ r = self._calculateOneHopResult(s,now,now,now,
calculateOverallResults=1)
reliability[s] = r
self._db.getConnection().commit()
@@ -617,22 +692,21 @@
finally:
self._lock.release()
- def _calculate2ChainStatus(self, since, path, now=None):
+ def _calculate2ChainStatus(self, since, s1, s2, now=None):
# doesn't commit.
cur = self._db.getCursor()
- cur.execute("SELECT count() FROM pings WHERE path = %s"
+ path = "%s,%s"%(s1,s2)
+ cur.execute("SELECT count() FROM ping WHERE path = %s"
" AND sentat >= %s",
(path,self._time(since)))
nSent, = cur.fetchone()
- cur.execute("SELECT count() FROM pings WHERE path = %s"
+ cur.execute("SELECT count() FROM ping WHERE path = %s"
" AND sentat >= %s AND received > 0",
(path,since))
nReceived, = cur.fetchone()
- servers = path.split(",")
try:
- rels = [ self._serverReliability[s] for s in servers ]
- product = reduce(operator.mul, rels)
+ product = self._serverReliability[s1] * self._serverReliability[s2]
except KeyError:
product = None
@@ -651,7 +725,7 @@
def calculateChainStatus(self, now=None):
self._lock.acquire()
try:
- serverNames = self._serverNames.keys()
+ serverNames = self._serverIDs.keys()
finally:
self._lock.release()
@@ -665,17 +739,18 @@
for s1 in serverNames:
for s2 in serverNames:
- p = "%s,%s" % (s1, s2)
-
+ p = "%s,%s"%(s1,s2)
nS, nR, prod, isBroken, isInteresting = \
- self._calculate2ChainStatus(since, p)
+ self._calculate2ChainStatus(since, s1, s2)
if isBroken:
brokenChains[p] = 1
if isInteresting:
interestingChains[p] = 1
- self._setTwoHop((p,),
- (self._time(now), nS, nR, self._bool(isBroken),
- self._bool(isInteresting)))
+
+ self._setTwoHop(
+ (self._getServerID(s1), self._getServerID(s2)),
+ (self._time(now), nS, nR, self._bool(isBroken),
+ self._bool(isInteresting)))
self._db.getConnection().commit()
self._lock.acquire()
@@ -688,7 +763,7 @@
def dumpAllStatus(self,f,since,now=None):
self._lock.acquire()
try:
- serverNames = self._serverNames.keys()
+ serverNames = self._serverIDs.keys()
finally:
self._lock.release()
@@ -699,9 +774,11 @@
print >>f, "\n# Map from server to list of (period-start, period-end, fractional uptime"
print >>f, "SERVER_UPTIMES = {"
- cur.execute("SELECT start,end,name,uptime FROM uptimes "
- "WHERE start >= %s AND start <= %s"
- "ORDER BY name, start", (since, now))
+ cur.execute("SELECT startAt,endAt,name,uptime FROM uptime, server, statsInterval "
+ "WHERE startAt >= %s AND startAt <= %s "
+ "AND uptime.server = server.id "
+ "AND uptime.interval = statsInterval.id "
+ "ORDER BY name, startAt", (since, now))
lastServer = "---"
for s,e,n,u in cur:
if n != lastServer:
@@ -717,10 +794,13 @@
# # of those pings received, median latency on those pings (sec),
# weighted reliability)"""
print >>f, "SERVER_DAILY_PING_STATUS = {"
- cur.execute("SELECT servername,startAt,endAt,nSent,nReceived,"
- " latency,reliability FROM echolotOneHopResults "
- "WHERE startat >= %s AND startat <= %s"
- "ORDER BY servername, startat", (since, now))
+ cur.execute("SELECT name,startAt,endAt,nSent,nReceived,"
+ " latency,reliability "
+ "FROM echolotOneHopResult, server, statsInterval "
+ "WHERE startat >= %s AND startat <= %s"
+ "AND echolotOneHopResult.server = server.id "
+ "AND echolotOneHopResult.interval = statsInterval.id "
+ "ORDER BY name, startat", (since, now))
lastServer = "---"
for n,s,e,nS,nR,lat,r in cur:
if n != lastServer:
@@ -733,26 +813,29 @@
print >>f, "\n# Map from server-name to current (avg latency, avg reliability)"
print >>f, "SERVER_CUR_PING_STATUS = {"
- cur.execute("SELECT servername,latency,reliability FROM "
- "echolotCurrentOneHopResults")
+ cur.execute("SELECT name,latency,reliability FROM "
+ "echolotCurrentOneHopResult, server WHERE "
+ "echolotCurrentOneHopResult.server = server.id")
for n,lat,r in cur:
print >>f, " %r : (%s,%.04f)," %(n,lat,r)
print >>f, "}"
print >>f, "\n# Chains that we want to know more about"
print >>f, "INTERESTING_CHAINS = ["
- cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
- "WHERE interesting = 1")
- for p, in cur:
- print >>f, " %r,"%p
+ cur.execute("SELECT S1.name, S2.name FROM echolotCurrentTwoHopResult, "
+ " server as S1, server as S2 WHERE "
+ "interesting = 1 AND S1.id = server1 AND S2.id = server2")
+ for s1,s2 in cur:
+ print >>f, " '%s,%s',"%(s1,s2)
print >>f, "]"
print >>f, "\n# Chains that are more unreliable than we'd expect"
print >>f, "BROKEN_CHAINS = ["
- cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
- "WHERE broken = 1")
- for p, in cur:
- print >>f, " %r,"%p
+ cur.execute("SELECT S1.name, S2.name FROM echolotCurrentTwoHopResult, "
+ " server as S1, server as S2 WHERE "
+ "broken = 1 AND S1.id = server1 AND S2.id = server2")
+ for s1,s2 in cur:
+ print >>f, " '%s,%s',"%(s1,s2)
print >>f, "]"
print >>f, "\n"
self._db.getConnection().commit()
@@ -763,7 +846,7 @@
LOG.info("Starting to compute server uptimes.")
self.calculateUptimes(now-24*60*60*12, now)
LOG.info("Starting to compute one-hop ping results")
- self.calculateOneHopResults(now)
+ self.calculateOneHopResult(now)
LOG.info("Starting to compute two-hop chain status")
self.calculateChainStatus(now)
if outFname: