[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] More testing and debugging on pinging. automate a touch...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv16676/lib/mixminion/server
Modified Files:
Pinger.py
Log Message:
More testing and debugging on pinging. automate a touch better. Add a mechanism to dump output. Next steps: get concurrency right (deep hurting)
Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -d -r1.11 -r1.12
--- Pinger.py 7 Dec 2004 01:44:31 -0000 1.11
+++ Pinger.py 11 Dec 2004 02:48:54 -0000 1.12
@@ -35,7 +35,7 @@
import mixminion.server.PacketHandler
import mixminion.server.MMTPServer
-from mixminion.Common import MixError, ceilDiv, createPrivateDir, \
+from mixminion.Common import MixError, AtomicFile, ceilDiv, createPrivateDir, \
floorDiv, formatBase64, formatFnameDate, formatTime, IntervalSet, LOG, \
parseFnameDate, previousMidnight, readPickled, secureDelete, \
succeedingMidnight, writePickled
@@ -57,40 +57,6 @@
'varchar' : 'varchar',
}
-
-_MERGE_ITERS_CODE = """
-_MERGE_DONE = object()
-def _wrapNext(next,DONE=_MERGE_DONE):
- def fn():
- try:
- return next()
- except StopIteration:
- return DONE
- return fn
-def _mergeIters(iterable1, iterable2):
- DONE = _MERGE_DONE
- n1 = _wrapNext(iter(iterable1).next)
- n2 = _wrapNext(iter(iterable2).next)
- peek1 = n1()
- peek2 = n2()
- while peek1 is not DONE and peek2 is not DONE:
- if peek1 <= peek2:
- yield peek1
- peek1 = n1()
- else:
- yield peek2
- peek2 = n2()
- while peek1 is not DONE:
- yield peek1
- peek1 = n1()
- while peek2 is not DONE:
- yield peek2
- peek2 = n2()
-"""
-
-if sys.version_info[:2] >= (2,2):
- exec _MERGE_ITERS_CODE
-
class PingerIntervalSchedule:
"""DOCDOC -- defines a set of intervals in time."""
def __init__(self):
@@ -159,8 +125,8 @@
def _getInsertOrUpdateFn(self, table, keyCols, valCols):
update = "UPDATE %s SET %s WHERE %s" % (
table,
- ", ".join(["%s = %%s" % k for k in keyCols]),
- " AND ".join(["%s = %%s" % v for v in valCols]))
+ ", ".join(["%s = %%s" % k for k in valCols]),
+ " AND ".join(["%s = %%s" % v for v in keyCols]))
insert = "INSERT INTO %s (%s, %s) VALUES (%s)" % (
table,
", ".join(keyCols),
@@ -169,13 +135,12 @@
def fn(keyVals, valVals):
assert len(keyVals) == len(keyCols)
assert len(valVals) == len(valCols)
- vals = keyVals+valVals
- self.cursor.execute(update, vals)
+ self.cursor.execute(update, (valVals+keyVals))
if self.cursor.rowcount > 0:
return
- self.cursor.execute(insert, vals)
+ self.cursor.execute(insert, (keyVals+valVals))
return fn
class PingLog(_SQLiteMixin):
@@ -189,6 +154,8 @@
self.pingIntervals = PingerIntervalSchedule()
self.brokenChains = {}
self.interestingChains = {}
+ self._startTime = None
+ self._lastRecalculation = 0
self._createAllTables()
self._loadServers()
@@ -231,12 +198,12 @@
("wreceived", "float", "not null"),
("reliability", "float", "not null")])
self._createTable("echolotCurrentOneHopResults",
- [("servername", "varchar", "not null"),
+ [("servername", "varchar", "unique not null"),
("at", "time", "not null"),
("latency", "integer", "not null"),
("reliability", "float", "not null")])
self._createTable("echolotCurrentTwoHopResults",
- [("path", "varchar", "not null"),
+ [("path", "varchar", "unique not null"),
("at", "time", "not null"),
("nSent", "integer", "not null"),
("nReceived", "integer", "not null"),
@@ -248,7 +215,8 @@
self._createIndex("pingsPathSR", "pings",
["path", "sentat", "received"])
self._createIndex("connectsAt", "connects", ["at"])
- # indices on echolot*results
+ self._createIndex("uptimesNS", "uptimes", ["name", "start"])
+ # indices on echolot*results, uptimes.
self._setUptime = self._getInsertOrUpdateFn(
"uptimes", ["start", "end", "name"], ["uptime"])
@@ -294,7 +262,7 @@
if i:
interesting[p]=1
self.isBroken = broken
- self.isInterestin = interesting
+ self.isInteresting = interesting
def updateServers(self, names):
self.lock.acquire()
@@ -358,10 +326,11 @@
now = self._time(now)
self._execute(self._SHUTDOWN, (now, now, self._startTime))
- _HEARTBEAT = "UPDATE lifetimes SET stillup = %s WHERE up = %s"
+ _HEARTBEAT = "UPDATE lifetimes SET stillup = %s WHERE up = %s AND stillup < %s"
def heartbeat(self, now=None):
if self._startTime is None: self.startup()
- self._execute(self._HEARTBEAT, (self._time(now), self._startTime))
+ now = self._time(now)
+ self._execute(self._HEARTBEAT, (now, self._startTime, now))
def rotated(self):
pass
@@ -377,8 +346,8 @@
finally:
self.lock.release()
- def connectFailed(self, nickname):
- self.connected(nickname, 0)
+ def connectFailed(self, nickname, now=None):
+ self.connected(nickname, success=0, now=now)
_QUEUED_PING = ("INSERT INTO pings (hash, path, sentat, received)"
"VALUES (%s,%s,%s,%s)")
@@ -403,15 +372,13 @@
elif n > 1:
LOG.warn("Received ping with multiple hash entries!")
- def _calculateUptimes(self, startTime, endTime):
+ def _calculateUptimes(self, startTime, endTime, now=None):
cur = self.cursor
self.lock.acquire()
try:
- cur.execute("DELETE FROM uptimes WHERE start = %s AND end = %s",
- startTime, endTime)
-
# First, calculate my own uptime.
- self.heartbeat()
+ if now is None: now = time.time()
+ self.heartbeat(now)
timespan = IntervalSet( [(startTime, endTime)] )
@@ -428,10 +395,6 @@
(self._time(startTime), self._time(endTime), "<self>"),
(fracUptime,))
- #cur.execute("INSERT INTO uptimes (start, end, name, uptime)"
- # " VALUES (%s, %s, '<self>', %s)",
- # (self._time(startTime), self._time(endTime),
-
# Okay, now everybody else.
for s in self.serverNames.keys():
cur.execute("SELECT at, success FROM connects"
@@ -441,14 +404,12 @@
lastStatus = None
lastTime = None
- times = [ 0, 0] # uptime, downtime
+ times = [ 0, 0 ] # uptime, downtime
for at, success in cur:
assert success in (0,1)
upAt, downAt = myIntervals.getIntervalContaining(at)
if upAt == None:
- # Event at edge of interval
- #XXXX008 warn better, or not at all.
- LOG.warn("Event out of bounds")
+ # Event outside edge of interval.
continue
if lastTime is None or upAt > lastTime:
lastTime = upAt
@@ -457,6 +418,8 @@
t = (at-lastTime)/2.0
times[success] += t
times[lastStatus] += t
+ lastStatus = success
+ lastTime = at
if times == [0,0]:
continue
@@ -466,9 +429,10 @@
finally:
self.lock.release()
- def calculateUptimes(self, startAt, endAt):
+ def calculateUptimes(self, startAt, endAt, now=None):
+ if now is None: now = time.time()
for s, e in self.uptimeIntervals.getIntervals(startAt, endAt):
- self._calculateUptimes(s, e)
+ self._calculateUptimes(s, e, now=now)
def getUptimes(self, startAt, endAt):
"""DODOC: uptimes for all servers overlapping [startAt, endAt],
@@ -487,17 +451,39 @@
finally:
self.lock.release()
+    def _roundLatency(self, latency):
+        """Return `latency` (seconds) rounded to the nearest multiple of a
+           quantum that grows with the latency: 5s below one minute, up to
+           one hour at or beyond one day.  Reporting exact median latencies
+           can leak the fact that a message was a ping; coarse rounding
+           limits that.  DOCDOC
+        """
+        # Latencies not below any bound in the table get a one-hour quantum.
+        quantum = 60*60
+        # (exclusive upper bound, quantum), ordered by increasing bound.
+        for bound, step in ((60, 5), (10*60, 60), (30*60, 2*60),
+                            (60*60, 5*60), (3*60*60, 10*60),
+                            (12*60*60, 20*60), (24*60*60, 30*60)):
+            if latency < bound:
+                quantum = step
+                break
+
+        nSteps = int(float(latency)/quantum + 0.5)
+        return nSteps * quantum
+
_WEIGHT_AGE_PERIOD = 24*60*60
_WEIGHT_AGE = [ 1, 2, 2, 3, 5, 8, 9, 10, 10, 10, 10, 5 ]
_PING_GRANULARITY = 24*60*60
- def _calculateOneHopResults(self, serverName, now=None):
+ def _calculateOneHopResults(self, serverName, startTime, endTime,
+ now=None, calculateOverallResults=1):
# hold lock; commit when done.
cur = self.cursor
if now is None:
now = time.time()
- horizon_size = self._WEIGHT_AGE_PERIOD * len(self._WEIGHT_AGE)
- startTime =self.pingIntervals.getIntervalContaining(now-horizon_size)[0]
- nPeriods = len(self.pingIntervals.getIntervals(startTime, now))
+ self._addServer(serverName)
+ if calculateOverallResults:
+ startTime = min(startTime,
+ now - (len(self._WEIGHT_AGE)*self._WEIGHT_AGE_PERIOD))
+ endTime = max(endTime, now)
+ intervals = self.pingIntervals.getIntervals(startTime, endTime)
+ nPeriods = len(intervals)
+ startTime = intervals[0][0]
+ endTime = intervals[-1][1]
# 1. Compute latencies and number of pings sent in each period.
# We need to learn these first so we can tell the percentile
@@ -507,7 +493,7 @@
nPings = 0
cur.execute("SELECT sentat, received FROM pings WHERE path = %s"
" AND sentat >= %s AND sentat <= %s",
- (serverName, startTime, now ))
+ (serverName, startTime, endTime))
for sent,received in cur:
pIdx = floorDiv(sent-startTime, self._PING_GRANULARITY)
nSent[pIdx] += 1
@@ -527,6 +513,10 @@
del d
del dailyLatencies
allLatencies.sort()
+ #if allLatencies:
+ # LOG.warn("%s pings in %s intervals. Median latency is %s seconds",
+ # nPings, nPeriods,
+ # allLatencies[floorDiv(len(allLatencies),2)])
# 2. Compute the number of pings actually received each day,
# and the number of pings received each day weighted by
@@ -536,7 +526,7 @@
perTotalWeighted = [0]*nPeriods
cur.execute("SELECT sentat, received FROM pings WHERE path = %s"
" AND sentat >= %s AND sentat <= %s",
- (serverName, startTime, now))
+ (serverName, startTime, endTime))
for sent,received in cur:
pIdx = floorDiv(sent-startTime, self._PING_GRANULARITY)
if received:
@@ -545,38 +535,49 @@
else:
mod_age = (now-sent-15*60)*0.8
w = bisect.bisect_left(allLatencies, mod_age)/float(nPings)
+ #LOG.warn("Percentile is %s.", w)
perTotalWeights[pIdx] += w
if received:
perTotalWeighted[pIdx] += w
# 2b. Write per-day results into the DB.
- intervals = self.pingIntervals.getIntervals(startTime, now)
for pIdx in xrange(len(intervals)):
s, e = intervals[pIdx]
- latent = dailyMedianLatency[pIdx]
+ latent = self._roundLatency(dailyMedianLatency[pIdx])
sent = nSent[pIdx]
rcvd = nReceived[pIdx]
wsent = perTotalWeights[pIdx]
wrcvd = perTotalWeighted[pIdx]
- if wrcvd:
- rel = wsent / wrcvd
+ if wsent:
+ rel = wrcvd / wsent
else:
rel = 0.0
+ #if sent:
+ # LOG.warn("Of pings sent on day %s, %s/%s were received. "
+ # "rel=%s/%s=%s",
+ # pIdx, rcvd, sent, wrcvd, wsent, rel)
self._setOneHop(
(serverName, self._time(s), self._time(e)),
(sent, rcvd, latent, wsent, wrcvd, rel))
+
+ if not calculateOverallResults:
+ return
+
# 3. Write current overall results into the DB.
if allLatencies:
- latent = allLatencies[floorDiv(len(allLatencies),2)]
+ latent = self._roundLatency(allLatencies[floorDiv(len(allLatencies),2)])
else:
latent = 0
wsent = wrcvd = 0.0
+ nPeriods = len(self._WEIGHT_AGE)
+ perTotalWeights = perTotalWeights[-nPeriods:]
+ perTotalWeighted = perTotalWeighted[-nPeriods:]
for s, r, w in zip(perTotalWeights, perTotalWeighted, self._WEIGHT_AGE):
wsent += s*w
wrcvd += r*w
- if wrcvd:
- rel = wsent / wrcvd
+ if wsent:
+ rel = wrcvd / wsent
else:
rel = 0.0
self._setCurOneHop((serverName,), (self._time(now), latent, rel))
@@ -588,12 +589,14 @@
if now is None:
now = time.time()
for s in self.serverNames.keys():
- self._calculateOneHopResults(s,now)
+ # For now, always calculate overall results.
+ self._calculateOneHopResults(s,now,now,now,
+ calculateOverallResults=1)
self.connection.commit()
finally:
self.lock.release()
- def _calculate2ChainStatus(self, since, path):
+ def _calculate2ChainStatus(self, since, path, now=None):
# hold lock.
cur = self.cursor
cur.execute("SELECT count() FROM pings WHERE path = %s"
@@ -609,7 +612,7 @@
try:
rels = [ self.serverReliability[s] for s in servers ]
product = reduce(operator.mul, rels)
- except IndexError:
+ except KeyError:
product = None
if nSent == 0:
@@ -652,6 +655,92 @@
finally:
self.lock.release()
+    def dumpAllStatus(self,f,since,now=None):
+        """Write all tracked uptime and ping results to the file object f.
+
+           The output is a sequence of Python-syntax assignments:
+           KNOWN_SERVERS, SERVER_UPTIMES and SERVER_DAILY_PING_STATUS
+           (restricted to periods with start >= since and end <= now),
+           then SERVER_CUR_PING_STATUS, INTERESTING_CHAINS and
+           BROKEN_CHAINS.  `f` is anything usable with 'print >>f'.
+           `now` defaults to the current time.  self.lock is held for
+           the whole dump.
+        """
+        self.lock.acquire()
+        try:
+            if now is None: now = time.time()
+            print >>f, "# List of all currently tracked servers."
+            print >>f, "KNOWN_SERVERS =",self.serverNames.keys()
+            cur = self.cursor
+
+            # NOTE(review): time columns are written elsewhere via
+            # self._time(...); confirm raw `since`/`now` compare correctly.
+            print >>f, "\n# Map from server to list of (period-start, period-end, fractional uptime"
+            print >>f, "SERVER_UPTIMES = {"
+            # Each SQL fragment ends with a space so that the '%s'
+            # placeholder is not glued onto the following 'ORDER BY'.
+            cur.execute("SELECT start,end,name,uptime FROM uptimes "
+                        "WHERE start >= %s AND end <= %s "
+                        "ORDER BY name, start", (since, now))
+            lastServer = "---"
+            for s,e,n,u in cur:
+                if n != lastServer:
+                    # Rows are ordered by name: close the previous
+                    # server's list before opening a new one.
+                    if lastServer != '---': print >>f, " ],"
+                    lastServer = n
+                    print >>f, " %r : [" % n
+                print >>f, " (%s,%s,%.04f),"%(s,e,u)
+            if lastServer != '---': print >>f, " ]"
+            print >>f, "}"
+
+            print >>f, """
+# Map from server name to list of (period-start, period-end, # pings sent,
+# # of those pings received, median latency on those pings (sec),
+# weighted reliability)"""
+            print >>f, "SERVER_DAILY_PING_STATUS = {"
+            cur.execute("SELECT servername,startAt,endAt,nSent,nReceived,"
+                        " latency,reliability FROM echolotOneHopResults "
+                        "WHERE startat >= %s AND endat <= %s "
+                        "ORDER BY servername, startat", (since, now))
+            lastServer = "---"
+            for n,s,e,nS,nR,lat,r in cur:
+                if n != lastServer:
+                    if lastServer != '---': print >>f, " ],"
+                    lastServer = n
+                    print >>f, " %r : [" % n
+                print >>f, " (%s,%s,%s,%s,%s,%.04f),"%(s,e,nS,nR,lat,r)
+            if lastServer != '---': print >>f, " ]"
+            print >>f, "}"
+
+            print >>f, "\n# Map from server-name to current (avg latency, avg reliability)"
+            print >>f, "SERVER_CUR_PING_STATUS = {"
+            cur.execute("SELECT servername,latency,reliability FROM "
+                        "echolotCurrentOneHopResults")
+            for n,lat,r in cur:
+                print >>f, " %r : (%s,%.04f)," %(n,lat,r)
+            print >>f, "}"
+
+            print >>f, "\n# Chains that we want to know more about"
+            print >>f, "INTERESTING_CHAINS = ["
+            cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
+                        "WHERE interesting = 1")
+            for p, in cur:
+                print >>f, " %r,"%p
+            print >>f, "]"
+
+            print >>f, "\n# Chains that are more unreliable than we'd expect"
+            print >>f, "BROKEN_CHAINS = ["
+            cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
+                        "WHERE broken = 1")
+            for p, in cur:
+                print >>f, " %r,"%p
+            print >>f, "]"
+            print >>f, "\n"
+
+        finally:
+            self.lock.release()
+
+    def calculateAll(self, outFname=None, now=None):
+        """Recompute all ping statistics, and optionally dump them to disk.
+
+           Recomputes server uptimes over the twelve days ending at `now`
+           (default: the current time), then one-hop results, then
+           two-hop chain status.  If `outFname` is given, writes the
+           results for that same window to it via AtomicFile and
+           dumpAllStatus.  Records `now` as the last calculation time.
+        """
+        if now is None:
+            # Resolve once so every stage agrees on the same instant; the
+            # window arithmetic below cannot operate on None.
+            now = time.time()
+        horizon = 12*24*60*60   # twelve days
+        startAt = now - horizon
+        LOG.info("Computing ping results.")
+        LOG.info("Starting to compute server uptimes.")
+        # calculateUptimes takes (startAt, endAt): the window must run
+        # forward in time and end at `now`.
+        self.calculateUptimes(startAt, now, now=now)
+        LOG.info("Starting to compute one-hop ping results")
+        self.calculateOneHopResults(now)
+        LOG.info("Starting to compute two-hop chain status")
+        self.calculateChainStatus(now)
+        if outFname:
+            LOG.info("Writing ping results to disk")
+            f = AtomicFile(outFname, 'w')
+            self.dumpAllStatus(f, startAt, now)
+            f.close()
+        LOG.info("Done computing ping results")
+        # __init__ initializes self._lastRecalculation; keep it in sync
+        # while still setting the name this method originally used.
+        self.lastCalculation = self._lastRecalculation = now
+
class PingGenerator:
"""DOCDOC"""
#XXXX008 add abstract functions.