[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Refactor and rewrite echolot-style one-hop ping result ...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv4929/lib/mixminion/server
Modified Files:
Pinger.py
Log Message:
Refactor and rewrite echolot-style one-hop ping result calculation. Still need results, 2-hop chain calc, etc
Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- Pinger.py 2 Dec 2004 23:39:02 -0000 1.8
+++ Pinger.py 7 Dec 2004 00:15:15 -0000 1.9
@@ -38,7 +38,8 @@
from mixminion.Common import MixError, ceilDiv, createPrivateDir, \
floorDiv, formatBase64, formatFnameDate, formatTime, IntervalSet, LOG, \
- parseFnameDate, previousMidnight, readPickled, secureDelete, writePickled
+ parseFnameDate, previousMidnight, readPickled, secureDelete, \
+ succeedingMidnight, writePickled
try:
import sqlite
@@ -92,20 +93,23 @@
if sys.version_info[:2] >= (2,2):
exec _MERGE_ITERS_CODE
-WEIGHT_AGE_PERIOD = 24*60*60
-WEIGHT_AGE = [ 5, 10, 10, 10, 10, 9, 8, 5, 3, 2, 2, 1, 0, 0, 0, 0, 0 ]
-UPTIME_GRANULARITY = 24*60*60
-PING_GRANULARITY = 24*60*60
-
-class PingLog:
- def __init__(self, connection):
- self.connection = connection
- self.cursor = connection.cursor()
- self.lock = threading.RLock()
- self.serverNames = {}
- self.serverReliability = {}
- self._createAllTables()
+class PingerIntervalSchedule:
+ """DOCDOC -- defines a set of intervals in time."""
+ def __init__(self):
+ pass
+ def getIntervalContaining(self, t):
+ p = previousMidnight(t)
+ return p, succeedingMidnight(p)
+ def getIntervals(self, startAt, endAt):
+ r = []
+ t = previousMidnight(startAt)
+ while t < endAt:
+ n = succeedingMidnight(t)
+ r.append((t, n))
+ t = n
+ return r
+class _SQLiteMixin:
def _objectExists(self, name, objType):
# hold lock.
self.cursor.execute(
@@ -145,10 +149,28 @@
self.cursor.execute(stmt)
self.connection.commit()
+ def _time(self, t=None):
+ return long(t or time.time())
+
+class PingLog(_SQLiteMixin):
+ def __init__(self, connection):
+ self.connection = connection
+ self.cursor = connection.cursor()
+ self.lock = threading.RLock()
+ self.serverNames = {}
+ self.serverReliability = {}
+ self.uptimeIntervals = PingerIntervalSchedule()
+ self.pingIntervals = PingerIntervalSchedule()
+ self._createAllTables()
+
+
def _createAllTables(self):
cur = self.cursor
self.lock.acquire()
try:
+ # FFFF This is terrible DB design. It's not normalized by
+ # FFFF any stretch of the imagination, it doesn't have enough
+ # FFFF constraints, etc etc.
# Raw data
self._createTable("lifetimes",
[("up", "time", "not null"),
@@ -171,14 +193,26 @@
("end", "time", "not null"),
("name", "varchar", "not null"),
("uptime", "float", "not null")])
- self._createTable("echolotOneHops",#XXXX008 NOT RIGHT!
- [("servername", "time", "not null"),
- ("at", "time", "not null"),
+ self._createTable("echolotOneHopResults",
+ [("servername", "varchar", "not null"),
+ ("startAt", "time", "not null"),
+ ("endAt", "varchar", "not null"),
+ ("nSent", "integer", "not null"),
+ ("nReceived", "integer", "not null"),
("latency", "integer", "not null"),
- ("wreliability","float", "not null")])
+ ("wsent", "float", "not null"),
+ ("wreceived", "float", "not null"),
+ ("reliability", "float", "not null")])
+ self._createTable("echolotCurrentOneHopResults",
+ [("servername", "varchar", "not null"),
+ ("at", "time", "not null"),
+ ("latency", "integer", "not null"),
+ ("reliability", "float", "not null")])
self._createIndex("lifetimesUp", "lifetimes", ["up"])
self._createIndex("pingsHash", "pings", ["hash"], unique=1)
+ self._createIndex("pingsPathSR", "pings",
+ ["path", "sentat", "received"])
# ???? what else on pings?
self._createIndex("connectsAt", "connects", ["at"])
finally:
@@ -193,11 +227,11 @@
for name, in res:
self.serverNames[name] = 1
- cur.execute("SELECT servername, MAX(at), wreliablity FROM"
+ cur.execute("SELECT servername, MAX(at), reliability FROM"
+ " echolotCurrentOneHopResults")
res = cur.fetchall()
for name,_,rel in res:
- self.reliability[name]=rel
+ self.serverReliability[name]=rel
def _addServer(self, name):
# hold lock.
@@ -242,9 +276,6 @@
finally:
self.lock.release()
- def _time(self, t=None):
- return long(t or time.time())
-
_STARTUP = "INSERT INTO lifetimes (up, stillup, shutdown) VALUES (%s,%s, 0)"
def startup(self,now=None):
self._startTime = now = self._time(now)
@@ -301,7 +332,7 @@
elif n > 1:
LOG.warn("Received ping with multiple hash entries!")
- def calculateUptimes(self, startTime, endTime):
+ def _calculateUptimes(self, startTime, endTime):
cur = self.cursor
self.lock.acquire()
@@ -365,6 +396,10 @@
finally:
self.lock.release()
+ def calculateUptimes(self, startAt, endAt):
+ for s, e in self.uptimeIntervals.getIntervals(startAt, endAt):
+ self._calculateUptimes(s, e)
+
def getUptimes(self, startAt, endAt):
"""DOCDOC: uptimes for all servers overlapping [startAt, endAt],
as mapping from (start,end) to nickname to fraction.
@@ -382,64 +417,119 @@
finally:
self.lock.release()
- def _calculateOneHopResults(self, serverName, startTime, now=None):
+ _WEIGHT_AGE_PERIOD = 24*60*60
+ _WEIGHT_AGE = [ 1, 2, 2, 3, 5, 8, 9, 10, 10, 10, 10, 5 ]
+ _PING_GRANULARITY = 24*60*60
+ def _calculateOneHopResults(self, serverName, now=None):
# hold lock; commit when done.
cur = self.cursor
- GRANULARITY = PING_GRANULARITY
if now is None:
now = time.time()
- nPeriods = ceilDiv(now-startTime, GRANULARITY)
+ horizon_size = self._WEIGHT_AGE_PERIOD * len(self._WEIGHT_AGE)
+ startTime = self.pingIntervals.getIntervalContaining(now - horizon_size)[0]
+ nPeriods = len(self.pingIntervals.getIntervals(startTime, now))
+
+ # 1. Compute latencies and number of pings sent in each period.
+ # We need to learn these first so we can tell the percentile
+ # of each ping's latency.
+ dailyLatencies = [[] for _ in xrange(nPeriods)]
nSent = [0]*nPeriods
- nReceived = [0]*nPeriods
- totalWeights = 0.0
- totalWeighted = 0.0
- perTotalWeights = [0]*nPeriods
- perTotalWeighted = [0]*nPeriods
- allLatentcies = []
nPings = 0
cur.execute("SELECT sentat, received FROM pings WHERE path = %s"
" AND sentat >= %s AND sentat <= %s",
(serverName, startTime, now ))
for sent,received in cur:
+ pIdx = floorDiv(sent-startTime, self._PING_GRANULARITY)
+ nSent[pIdx] += 1
nPings += 1
if received:
- allLatencies.append(received-sent)
+ dailyLatencies[pIdx].append(received-sent)
+
+ dailyMedianLatency = []
+ allLatencies = []
+ for d in dailyLatencies:
+ d.sort()
+ if d:
+ dailyMedianLatency.append(d[floorDiv(len(d), 2)])
+ else:
+ dailyMedianLatency.append(0)
+ allLatencies.extend(d)
+ del d
+ del dailyLatencies
allLatencies.sort()
+ # 2. Compute the number of pings actually received each day,
+ # and the number of pings received each day weighted by
+ # apparent-latency percentile.
+ nReceived = [0]*nPeriods
+ perTotalWeights = [0]*nPeriods
+ perTotalWeighted = [0]*nPeriods
cur.execute("SELECT sentat, received FROM pings WHERE path = %s"
" AND sentat >= %s AND sentat <= %s",
- (serverName, startTime, now ))
+ (serverName, startTime, now))
for sent,received in cur:
- pIdx = floorDiv(sent-startTime, GRANULARITY)
- nSent[pIdx] += 1
+ pIdx = floorDiv(sent-startTime, self._PING_GRANULARITY)
if received:
nReceived[pIdx] += 1
- w2 = 1
+ w = 1.0
else:
mod_age = (now-sent-15*60)*0.8
- w2 = bisect.bisect_left(allLatencies, mod_age)/float(nPings)
-
- if pIdx < len(WEIGHT_AGE):
- w1 = WEIGHT_AGE[nPeriods-pIdx-1]
- else:
- w1 = 0
+ w = bisect.bisect_left(allLatencies, mod_age)/float(nPings)
- perTotalWeights[pIdx] += w1*w2
+ perTotalWeights[pIdx] += w
if received:
- perTotalWeighted[pIdx] += w1*w2
+ perTotalWeighted[pIdx] += w
- allLatencies.sort()
+ # 2b. Write per-day results into the DB.
+ intervals = self.pingIntervals.getIntervals(startTime, now)
+ for pIdx in xrange(len(intervals)):
+ s, e = intervals[pIdx]
+ latent = dailyMedianLatency[pIdx]
+ sent = nSent[pIdx]
+ rcvd = nReceived[pIdx]
+ wsent = perTotalWeights[pIdx]
+ wrcvd = perTotalWeighted[pIdx]
+ if wsent:
+ rel = wrcvd / wsent
+ else:
+ rel = 0.0
+ cur.execute(
+ "INSERT INTO echolotOneHopResults "
+ "(servername, startAt, endAt, nSent, nReceived, latency, "
+ "wsent, wreceived, reliability) "
+ "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
+ (serverName, self._time(s), self._time(e), sent, rcvd,
+ latent, wsent, wrcvd, rel))
+
+ # 3. Write current overall results into the DB.
if allLatencies:
- latency = allLatencies[floorDiv(len(allLatencies),2)]
+ latent = allLatencies[floorDiv(len(allLatencies),2)]
else:
- latency = 0
-
- reliability = reduce(operator.add, perTotalWeighted)/float(
- reduce(operator.add, perTotalWeights))
+ latent = 0
+ wsent = wrcvd = 0.0
+ for s, r, w in zip(perTotalWeights, perTotalWeighted, self._WEIGHT_AGE):
+ wsent += s*w
+ wrcvd += r*w
+ if wsent:
+ rel = wrcvd / wsent
+ else:
+ rel = 0.0
+ cur.execute(
+ "INSERT INTO echolotCurrentOneHopResults "
+ "(servername, at, latency, reliability) VALUES (%s, %s, %s, %s)",
+ (serverName, self._time(now), latent, rel))
+ self.serverReliability[serverName] = rel
- cur.execute("INSERT INTO echolotOneHops (servername, at, latency,"
- " wreliability) VALUES (%s, %s, %s, %s)",
- (serverName, now, latency, reliability))
+ def calculateOneHopResults(self, now=None):
+ self.lock.acquire()
+ try:
+ if now is None:
+ now = time.time()
+ for s in self.serverNames.keys():
+ self._calculateOneHopResults(s,now)
+ self.connection.commit()
+ finally:
+ self.lock.release()
def _calculateChainCounts(self, startAt, endAt, path):
# hold lock.
@@ -474,10 +564,6 @@
return (nSent < 3 and nReceived == 0) or (
product is not None and frac <= product*0.3)
- def calculateDailyResults(self, server):
- #XXXX
- pass
-
class PingGenerator:
"""DOCDOC"""
#XXXX008 add abstract functions.