[minion-cvs] Refactor 2-hop pings; add more utility functions for working with sqlite
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv7910/lib/mixminion/server
Modified Files:
Pinger.py
Log Message:
Refactor 2-hop pings; add more utility functions for working with sqlite
Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- Pinger.py 7 Dec 2004 00:15:15 -0000 1.9
+++ Pinger.py 7 Dec 2004 01:26:38 -0000 1.10
@@ -21,6 +21,7 @@
import bisect
import calendar
import cPickle
+import operator
import os
import re
import string
@@ -47,7 +48,6 @@
sqlite = None
KEEP_HISTORY_DAYS = 15
-USE_HISTORY_DAYS = 12
HEARTBEAT_INTERVAL = 30*60
ONE_DAY = 24*60*60
@@ -152,6 +152,34 @@
def _time(self, t=None):
return long(t or time.time())
+ def _bool(self, b):
+ if b:
+ return 1
+ else:
+ return 0
+
+ def _getInsertOrUpdateFn(self, table, keyCols, valCols):
+ update = "UPDATE %s SET %s WHERE %s" % (
+ table,
+ ", ".join(["%s = %%s" % k for k in keyCols]),
+ " AND ".join(["%s = %%s" % v for v in valCols]))
+ insert = "INSERT INTO %s (%s, %s) VALUES (%s)" % (
+ table,
+ ", ".join(keyCols),
+ ", ".join(valCols),
+ ", ".join(["%s"]*(len(valCols)+len(keyCols))))
+ def fn(keyVals, valVals):
+ assert len(keyVals) == len(keyCols)
+ assert len(valVals) == len(valCols)
+
+ # The UPDATE takes the new values first, then the key columns;
+ # the INSERT takes the key columns first, matching the column list above.
+ self.cursor.execute(update, valVals+keyVals)
+ if self.cursor.rowcount > 0:
+ return
+
+ self.cursor.execute(insert, keyVals+valVals)
+ return fn
+
class PingLog(_SQLiteMixin):
def __init__(self, connection):
self.connection = connection
@@ -161,8 +189,10 @@
self.serverReliability = {}
self.uptimeIntervals = PingerIntervalSchedule()
self.pingIntervals = PingerIntervalSchedule()
+ self.isBroken = {}
+ self.isInteresting = {}
self._createAllTables()
-
+ self._loadServers()
def _createAllTables(self):
cur = self.cursor
@@ -175,7 +205,7 @@
self._createTable("lifetimes",
[("up", "time", "not null"),
("stillup", "time"),
- ("shutdown", "boolean")])
+ ("shutdown", "time")])
self._createTable("pings",
[("hash", "varchar", "primary key"),
("path", "varchar", "not null"),
@@ -196,7 +226,7 @@
self._createTable("echolotOneHopResults",
[("servername", "varchar", "not null"),
("startAt", "time", "not null"),
- ("endAt", "varchar", "not null"),
+ ("endAt", "time", "not null"),
("nSent", "integer", "not null"),
("nReceived", "integer", "not null"),
("latency", "integer", "not null"),
@@ -208,13 +238,36 @@
("at", "time", "not null"),
("latency", "integer", "not null"),
("reliability", "float", "not null")])
+ self._createTable("echolotCurrentTwoHopResults",
+ [("path", "varchar", "not null"),
+ ("at", "time", "not null"),
+ ("nSent", "integer", "not null"),
+ ("nReceived", "integer", "not null"),
+ ("broken", "boolean", "not null"),
+ ("interesting", "boolean", "not null")])
self._createIndex("lifetimesUp", "lifetimes", ["up"])
self._createIndex("pingsHash", "pings", ["hash"], unique=1)
self._createIndex("pingsPathSR", "pings",
["path", "sentat", "received"])
- # ???? what else on pings?
self._createIndex("connectsAt", "connects", ["at"])
+ # indices on echolot*results
+
+ self._setUptime = self._getInsertOrUpdateFn(
+ "uptimes", ["start", "end", "name"], ["uptime"])
+ self._setOneHop = self._getInsertOrUpdateFn(
+ "echolotOneHopResults",
+ ["servername", "startAt", "endAt"],
+ ["nSent", "nReceived", "latency", "wsent", "wreceived",
+ "reliability"])
+ self._setCurOneHop = self._getInsertOrUpdateFn(
+ "echolotCurrentOneHopResults",
+ ["servername"],
+ ["at", "latency", "reliability"])
+ self._setTwoHop = self._getInsertOrUpdateFn(
+ "echolotCurrentTwoHopResults",
+ ["path"],
+ ["at", "nSent", "nReceived", "broken", "interesting"])
finally:
self.lock.release()
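SQLite has no separate boolean storage class; columns declared "boolean" end up holding plain integers, which is presumably why the new _bool() helper normalizes flags to 0/1 before they reach tables like echolotCurrentTwoHopResults. A quick standalone illustration with Python's standard sqlite3 module (timestamp and counts invented):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE echolotCurrentTwoHopResults ("
                 " path varchar not null,"
                 " at integer not null,"
                 " nSent integer not null,"
                 " nReceived integer not null,"
                 " broken boolean not null,"
                 " interesting boolean not null)")
    # The flags go in as 0/1, just as _bool() would produce them.
    conn.execute("INSERT INTO echolotCurrentTwoHopResults VALUES (?,?,?,?,?,?)",
                 ("alice,bob", 1102381598, 12, 9, 0, 1))
    rows = conn.execute("SELECT path, broken, interesting"
                        " FROM echolotCurrentTwoHopResults"
                        " WHERE interesting OR broken").fetchall()
    # rows -> the single flagged path, mirroring the query _loadServers runs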
@@ -227,12 +280,33 @@
for name, in res:
self.serverNames[name] = 1
- cur.execute("SELECT servername, MAX(at), reliablity FROM"
- " echolotOneHops")
+ cur.execute("SELECT servername, reliability FROM "
+ "echolotCurrentOneHopResults")
res = cur.fetchall()
- for name,_,rel in res:
+ for name,rel in res:
self.serverReliability[name]=rel
+ cur.execute("SELECT path, broken, interesting FROM "
+ "echolotCurrentTwoHopResults WHERE interesting OR broken")
+ res = cur.fetchall()
+ broken = {}
+ interesting = {}
+ for p, b, i in res:
+ if b:
+ broken[p]=1
+ if i:
+ interesting[p]=1
+ self.isBroken = broken
+ self.isInteresting = interesting
+
+ def updateServers(self, names):
+ self.lock.acquire()
+ try:
+ for n in names:
+ self._addServer(n)
+ finally:
+ self.lock.release()
+
def _addServer(self, name):
# hold lock.
name = name.lower()
@@ -302,7 +376,7 @@
try:
self._addServer(nickname)
self._execute(self._CONNECTED,
- (self._time(now), nickname.lower(), success))
+ (self._time(now), nickname.lower(), self._bool(success)))
finally:
self.lock.release()
@@ -335,7 +409,6 @@
def _calculateUptimes(self, startTime, endTime):
cur = self.cursor
self.lock.acquire()
-
try:
cur.execute("DELETE FROM uptimes WHERE start = %s AND end = %s",
startTime, endTime)
@@ -353,11 +426,14 @@
for start,end,shutdown in cur ])
myIntervals *= timespan
myUptime = myIntervals.spanLength()
+ fracUptime = float(myUptime)/(endTime-startTime)
+ self._setUptime(
+ (self._time(startTime), self._time(endTime), "<self>"),
+ (fracUptime,))
- cur.execute("INSERT INTO uptimes (start, end, name, uptime)"
- " VALUES (%s, %s, '<self>', %s)",
- (self._time(startTime), self._time(endTime),
- float(myUptime)/(endTime-startTime)))
+ #cur.execute("INSERT INTO uptimes (start, end, name, uptime)"
+ # " VALUES (%s, %s, '<self>', %s)",
+ # (self._time(startTime), self._time(endTime),
# Okay, now everybody else.
for s in self.serverNames.keys():
@@ -374,7 +450,7 @@
upAt, downAt = myIntervals.getIntervalContaining(at)
if upAt == None:
# Event at edge of interval
- #XXXX008 warn better?
+ #XXXX008 warn better, or not at all.
LOG.warn("Event out of bounds")
continue
if lastTime is None or upAt > lastTime:
@@ -388,10 +464,7 @@
if times == [0,0]:
continue
fraction = float(times[1])/(times[0]+times[1])
- print "Adding ",name,fraction
- cur.execute("INSERT INTO uptimes (start, end, name, uptime)"
- " VALUES (%s, %s, %s, %s)",
- (startTime, endTime, s, fraction))
+ self._setUptime((startTime, endTime, s), (fraction,))
self.connection.commit()
finally:
self.lock.release()
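For a remote server, the hunk above accumulates the seconds it was observed down in times[0] and observed up in times[1], restricted to the intervals when this server was itself up, and stores the up fraction via _setUptime. The core arithmetic in isolation (a sketch; the real loop walks connection events rather than ready-made durations):

    def uptime_fraction(observations):
        # observations: (seconds, was_up) pairs, covering only the spans in
        # which we were up ourselves and could observe the peer at all.
        down = up = 0
        for secs, was_up in observations:
            if was_up:
                up += secs
            else:
                down += secs
        if up + down == 0:
            return None     # no evidence either way; the caller skips such servers
        return float(up) / (up + down)

    # Observed up for 20 hours and down for 4 hours of our own uptime:
    assert abs(uptime_fraction([(20*60*60, 1), (4*60*60, 0)]) - 5.0/6.0) < 1e-9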
@@ -493,14 +566,9 @@
rel = wsent / wrcvd
else:
rel = 0.0
- cur.execute(
- "INSERT INTO echolotOneHopResults "
- "(servername, startAt, endAt, nSent, nReceived, latency, "
- "wsent, wreceived, reliability) "
- "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
- (serverName, self._time(s), self._time(e), sent, rcvd,
- latent, wsent, wrcvd, rel))
-
+ self._setOneHop(
+ (serverName, self._time(s), self._time(e)),
+ (sent, rcvd, latent, wsent, wrcvd, rel))
# 3. Write current overall results into the DB.
if allLatencies:
latent = allLatencies[floorDiv(len(allLatencies),2)]
@@ -514,10 +582,7 @@
rel = wsent / wrcvd
else:
rel = 0.0
- cur.execute(
- "INSERT INTO echolotCurrentOneHopResults "
- "(servername, at, latency, reliability) VALUES (%s, %s, %s, %s)",
- (serverName, self._time(now), latent, rel))
+ self._setCurOneHop((serverName,), (self._time(now), latent, rel))
self.serverReliability[serverName] = rel
def calculateOneHopResults(self, now=None):
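In the hunk above, the current one-hop latency is taken as the middle element of the already-sorted allLatencies list (assuming floorDiv is mixminion's integer-division helper), i.e. an upper median:

    def median_of_sorted(values):
        # Upper-middle element for even-length input, exact middle otherwise.
        return values[len(values) // 2]

    assert median_of_sorted([120, 340, 900]) == 340
    assert median_of_sorted([120, 340, 900, 2500]) == 900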
@@ -531,38 +596,64 @@
finally:
self.lock.release()
- def _calculateChainCounts(self, startAt, endAt, path):
+ def _calculate2ChainStatus(self, since, path):
# hold lock.
+ cur = self.cursor
cur.execute("SELECT count() FROM pings WHERE path = %s"
- " AND sent >= %s AND sent <= %s",
- (path,startAt,endAt))
+ " AND sentat >= %s",
+ (path,self._time(since)))
nSent, = cur.fetchone()
cur.execute("SELECT count() FROM pings WHERE path = %s"
- " AND sent >= %s AND sent <= %s AND received > 0",
- (path,startAt,endAt))
+ " AND sentat >= %s AND received > 0",
+ (path,self._time(since)))
nReceived, = cur.fetchone()
servers = path.split(",")
try:
- rels = [ self.reliability[s] for s in servers ]
+ rels = [ self.serverReliability[s] for s in servers ]
product = reduce(operator.mul, rels)
except IndexError:
product = None
- return nSent, nReceived, product
+ if nSent == 0:
+ frac = 0.0
+ else:
+ frac = float(nReceived)/nSent
- def _2chainIsBroken(self, startAt, endAt, path):
- #lock
- nSent, nReceived, product = self._calculateChainCounts()
- frac = float(nReceived)/nSent
- return nSent >= 3 and product is not None and frac <= product*0.3
+ isBroken = nSent >= 3 and product and frac <= product*0.3
+ isInteresting = ((nSent < 3 and nReceived == 0) or
+ (product and frac <= product*0.3))
- def _2chainIsInteresting(self, startAt, endAt, path):
- #lock
- nSent, nReceived, product = self._calculateChainCounts()
- frac = float(nReceived)/nSent
- return (nSent < 3 and nReceived == 0) or (
- product is not None and frac <= product*0.3)
+ return nSent, nReceived, product, isBroken, isInteresting
+
+ _CHAIN_PING_HORIZON = 12*ONE_DAY
+ def calculateChainStatus(self, now=None):
+ self.lock.acquire()
+ try:
+ if now is None:
+ now = time.time()
+ brokenChains = {}
+ interestingChains = {}
+ since = now - self._CHAIN_PING_HORIZON
+
+ for s1 in self.serverNames.keys():
+ for s2 in self.serverNames.keys():
+ p = "%s,%s" % (s1, s2)
+
+ nS, nR, prod, isBroken, isInteresting = \
+ self._calculate2ChainStatus(since, p)
+ if isBroken:
+ brokenChains[p] = 1
+ if isInteresting:
+ interestingChains[p] = 1
+ self._setTwoHop((p,),
+ (self._time(now), nS, nR, self._bool(isBroken),
+ self._bool(isInteresting)))
+
+ self.isBroken = brokenChains
+ self.isInteresting = interestingChains
+ finally:
+ self.lock.release()
class PingGenerator:
"""DOCDOC"""
@@ -664,7 +755,7 @@
for s in servers:
nicknames[s.getNickname()]=1
for n in nicknames.keys():
- self._schedulePing((n,),now)
+ self._schedulePing((n,), now)
def _getPeriodStart(self, t):
return previousMidnight(t)
@@ -719,22 +810,7 @@
return previousMidnight(t)
def _getPingInterval(self, path):
- stats = self.pingLog.getLatestStatistics()
- if stats is None:
- return self.DULL_INTERVAL
- pStr = ",".join(path)
- nSent = stats.summary.nSent.get(pStr,0)
- nRcvd = stats.summary.nRcvd.get(pStr,0)
- assert nRcvd <= nSent
- if nSent < 3 and nRcvd < 1:
- return self.INTERESTING_INTERVAL
- try:
- rel1 = stats.summary.reliability[path[0]]
- rel2 = stats.summary.reliability[path[1]]
- except KeyError:
- return self.DULL_INTERVAL
-
- if float(nRcvd)/nSent <= rel1*rel2*0.3:
+ if self.pingLog.isInteresting.get(",".join(path), 0):
return self.INTERESTING_INTERVAL
else:
return self.DULL_INTERVAL
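With the statistics work moved into PingLog, the scheduler's decision collapses to a dictionary lookup keyed by the comma-joined path; schematically (the interval constants and paths here are invented for illustration):

    INTERESTING_INTERVAL = 2*60*60   # ping chains we are unsure about more often
    DULL_INTERVAL = 24*60*60         # chains that look healthy get a daily ping

    def ping_interval(is_interesting, path):
        # `path` is a tuple of nicknames; the PingLog table is keyed by "a,b".
        if is_interesting.get(",".join(path), 0):
            return INTERESTING_INTERVAL
        return DULL_INTERVAL

    assert ping_interval({"alice,bob": 1}, ("alice", "bob")) == INTERESTING_INTERVAL
    assert ping_interval({}, ("alice", "carol")) == DULL_INTERVAL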
@@ -764,8 +840,6 @@
self._schedulePing((n1,n2), now+60)
#XXXX008 we need to reschedule pings when a new directory arrives
-#class GeometricLinkPaddingGenerator
-
class TestLinkPaddingGenerator(PingGenerator):
"""DOCDOC"""
def __init__(self, config):