
[minion-cvs] Refactor and rewrite echolot-style one-hop ping result ...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv4929/lib/mixminion/server

Modified Files:
	Pinger.py 
Log Message:
Refactor and rewrite echolot-style one-hop ping result calculation. Still need results, 2-hop chain calc, etc
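
For context, the echolot-style calculation this rewrite implements weights each day's pings by age, and counts an unanswered ping against a server only in proportion to how many answered pings came back faster than the time it has had to arrive. A rough, self-contained sketch of that idea (the function name, parameters, and simplified latency cutoff below are illustrative only, not the committed code):

    # Rough sketch of echolot-style one-hop reliability; names and
    # constants here are illustrative, not the committed Pinger.py code.
    import bisect

    def one_hop_reliability(pings, now, age_weights, day=24*60*60):
        """pings: list of (sent, received_or_None) timestamps, one server."""
        n_days = len(age_weights)
        start = now - n_days*day
        # Latencies of answered pings, used to judge the unanswered ones.
        latencies = [r - s for s, r in pings if r is not None]
        latencies.sort()
        sent_w = [0.0]*n_days    # weighted "pings sent" per day
        rcvd_w = [0.0]*n_days    # weighted "pings received" per day
        for s, r in pings:
            idx = int((s - start) // day)
            if not 0 <= idx < n_days:
                continue
            if r is not None:
                w = 1.0
            else:
                # An unanswered ping counts only in proportion to how many
                # answered pings came back faster than the time this one
                # has had to arrive.
                w = bisect.bisect_left(latencies, (now - s)*0.8) / float(len(pings))
            sent_w[idx] += w
            if r is not None:
                rcvd_w[idx] += w
        # Combine the days, weighting recent days more heavily than old ones.
        wsent = wrcvd = 0.0
        for i in range(n_days):
            wsent += sent_w[i] * age_weights[i]
            wrcvd += rcvd_w[i] * age_weights[i]
        if wsent:
            return wrcvd / wsent
        return 0.0

The committed code additionally records a per-day row for each server (echolotOneHopResults) and a single current summary row (echolotCurrentOneHopResults) in the database.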

Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- Pinger.py	2 Dec 2004 23:39:02 -0000	1.8
+++ Pinger.py	7 Dec 2004 00:15:15 -0000	1.9
@@ -38,7 +38,8 @@
 
 from mixminion.Common import MixError, ceilDiv, createPrivateDir, \
      floorDiv, formatBase64, formatFnameDate, formatTime, IntervalSet, LOG, \
-     parseFnameDate, previousMidnight, readPickled, secureDelete, writePickled
+     parseFnameDate, previousMidnight, readPickled, secureDelete, \
+     succeedingMidnight, writePickled
 
 try:
     import sqlite
@@ -92,20 +93,23 @@
 if sys.version_info[:2] >= (2,2):
     exec _MERGE_ITERS_CODE
 
-WEIGHT_AGE_PERIOD = 24*60*60
-WEIGHT_AGE = [ 5, 10, 10, 10, 10, 9, 8, 5, 3, 2, 2, 1, 0, 0, 0, 0, 0 ]
-UPTIME_GRANULARITY = 24*60*60
-PING_GRANULARITY = 24*60*60
-
-class PingLog:
-    def __init__(self, connection):
-        self.connection = connection
-        self.cursor = connection.cursor()
-        self.lock = threading.RLock()
-        self.serverNames = {}
-        self.serverReliability = {}
-        self._createAllTables()
+class PingerIntervalSchedule:
+    """DOCDOC -- defines a set of intervals in time."""
+    def __init__(self):
+        pass
+    def getIntervalContaining(self, t):
+        p = previousMidnight(t)
+        return p, succeedingMidnight(p)
+    def getIntervals(self, startAt, endAt):
+        r = []
+        t = previousMidnight(startAt)
+        while t < endAt:
+            n = succeedingMidnight(t)
+            r.append((t, n))
+            t = n
+        return r
 
+class _SQLiteMixin:
     def _objectExists(self, name, objType):
         # hold lock.
         self.cursor.execute(
@@ -145,10 +149,28 @@
         self.cursor.execute(stmt)
         self.connection.commit()
 
+    def _time(self, t=None):
+        return long(t or time.time())
+
+class PingLog(_SQLiteMixin):
+    def __init__(self, connection):
+        self.connection = connection
+        self.cursor = connection.cursor()
+        self.lock = threading.RLock()
+        self.serverNames = {}
+        self.serverReliability = {}
+        self.uptimeIntervals = PingerIntervalSchedule()
+        self.pingIntervals = PingerIntervalSchedule()
+        self._createAllTables()
+
+
     def _createAllTables(self):
         cur = self.cursor
         self.lock.acquire()
         try:
+            # FFFF This is terrible DB design.  It's not normalized by
+            # FFFF any stretch of the imagination, it doesn't have enough
+            # FFFF constraints, etc etc.
             # Raw data
             self._createTable("lifetimes",
                               [("up",       "time", "not null"),
@@ -171,14 +193,26 @@
                                ("end",   "time", "not null"),
                                ("name",  "varchar", "not null"),
                                ("uptime", "float", "not null")])
-            self._createTable("echolotOneHops",#XXXX008 NOT RIGHT!
-                              [("servername",  "time", "not null"),
-                               ("at",          "time", "not null"),
+            self._createTable("echolotOneHopResults",
+                              [("servername",  "varchar", "not null"),
+                               ("startAt",     "time",    "not null"),
+                               ("endAt",       "varchar", "not null"),
+                               ("nSent",       "integer", "not null"),
+                               ("nReceived",   "integer", "not null"),
                                ("latency",     "integer", "not null"),
-                               ("wreliability","float", "not null")])
+                               ("wsent",       "float",   "not null"),
+                               ("wreceived",   "float",   "not null"),
+                               ("reliability", "float",   "not null")])
+            self._createTable("echolotCurrentOneHopResults",
+                              [("servername",  "varchar", "not null"),
+                               ("at",          "time",    "not null"),
+                               ("latency",     "integer", "not null"),
+                               ("reliability", "float",   "not null")])
 
             self._createIndex("lifetimesUp", "lifetimes", ["up"])
             self._createIndex("pingsHash",   "pings", ["hash"], unique=1)
+            self._createIndex("pingsPathSR", "pings",
+                              ["path", "sentat", "received"])
             # ???? what else on pings?
             self._createIndex("connectsAt", "connects", ["at"])
         finally:
@@ -193,11 +227,11 @@
         for name, in res:
             self.serverNames[name] = 1
 
-        cur.execute("SELECT servername, MAX(at), wreliablity FROM"
-                    " echolotOneHops")
+        cur.execute("SELECT servername, MAX(at), reliability FROM"
+                    " echolotCurrentOneHopResults")
         res = cur.fetchall()
         for name,_,rel in res:
-            self.reliability[name]=rel
+            self.serverReliability[name]=rel
 
     def _addServer(self, name):
         # hold lock.
@@ -242,9 +276,6 @@
         finally:
             self.lock.release()
 
-    def _time(self, t=None):
-        return long(t or time.time())
-
     _STARTUP = "INSERT INTO lifetimes (up, stillup, shutdown) VALUES (%s,%s, 0)"
     def startup(self,now=None):
         self._startTime = now = self._time(now)
@@ -301,7 +332,7 @@
         elif n > 1:
             LOG.warn("Received ping with multiple hash entries!")
 
-    def calculateUptimes(self, startTime, endTime):
+    def _calculateUptimes(self, startTime, endTime):
         cur = self.cursor
         self.lock.acquire()
 
@@ -365,6 +396,10 @@
         finally:
             self.lock.release()
 
+    def calculateUptimes(self, startAt, endAt):
+        for s, e in self.uptimeIntervals.getIntervals(startAt, endAt):
+            self._calculateUptimes(s, e)
+
     def getUptimes(self, startAt, endAt):
         """DODOC: uptimes for all servers overlapping [startAt, endAt],
            as mapping from (start,end) to nickname to fraction.
@@ -382,64 +417,119 @@
         finally:
             self.lock.release()
 
-    def _calculateOneHopResults(self, serverName, startTime, now=None):
+    _WEIGHT_AGE_PERIOD = 24*60*60
+    _WEIGHT_AGE = [ 1, 2, 2, 3, 5, 8, 9, 10, 10, 10, 10, 5 ]
+    _PING_GRANULARITY = 24*60*60
+    def _calculateOneHopResults(self, serverName, now=None):
         # hold lock; commit when done.
         cur = self.cursor
-        GRANULARITY = PING_GRANULARITY
         if now is None:
             now = time.time()
-        nPeriods = ceilDiv(now-startTime, GRANULARITY)
+        horizon_size = self._WEIGHT_AGE_PERIOD * len(self._WEIGHT_AGE)
+        startTime =self.pingIntervals.getIntervalContaining(now-horizon_size)[0]
+        nPeriods = len(self.pingIntervals.getIntervals(startTime, now))
+
+        # 1. Compute latencies and number of pings sent in each period.
+        #    We need to learn these first so we can tell the percentile
+        #    of each ping's latency.
+        dailyLatencies = [[] for _ in xrange(nPeriods)]
         nSent = [0]*nPeriods
-        nReceived = [0]*nPeriods
-        totalWeights = 0.0
-        totalWeighted = 0.0
-        perTotalWeights = [0]*nPeriods
-        perTotalWeighted = [0]*nPeriods
-        allLatentcies = []
         nPings = 0
         cur.execute("SELECT sentat, received FROM pings WHERE path = %s"
                     " AND sentat >= %s AND sentat <= %s",
                     (serverName, startTime, now ))
         for sent,received in cur:
+            pIdx = floorDiv(sent-startTime, self._PING_GRANULARITY)
+            nSent[pIdx] += 1
             nPings += 1
             if received:
-                allLatencies.append(received-sent)
+                dailyLatencies[pIdx].append(received-sent)
+
+        dailyMedianLatency = []
+        allLatencies = []
+        for d in dailyLatencies:
+            d.sort()
+            if d:
+                dailyMedianLatency.append(d[floorDiv(len(d), 2)])
+            else:
+                dailyMedianLatency.append(0)
+            allLatencies.extend(d)
+            del d
+        del dailyLatencies
         allLatencies.sort()
 
+        # 2. Compute the number of pings actually received each day,
+        #    and the number of pings received each day weighted by
+        #    apparent-latency percentile.
+        nReceived = [0]*nPeriods
+        perTotalWeights = [0]*nPeriods
+        perTotalWeighted = [0]*nPeriods
         cur.execute("SELECT sentat, received FROM pings WHERE path = %s"
                     " AND sentat >= %s AND sentat <= %s",
-                    (serverName, startTime, now ))
+                    (serverName, startTime, now))
         for sent,received in cur:
-            pIdx = floorDiv(sent-startTime, GRANULARITY)
-            nSent[pIdx] += 1
+            pIdx = floorDiv(sent-startTime, self._PING_GRANULARITY)
             if received:
                 nReceived[pIdx] += 1
-                w2 = 1
+                w = 1.0
             else:
                 mod_age = (now-sent-15*60)*0.8
-                w2 = bisect.bisect_left(allLatencies, mod_age)/float(nPings)
-
-            if pIdx < len(WEIGHT_AGE):
-                w1 = WEIGHT_AGE[nPeriods-pIdx-1]
-            else:
-                w1 = 0
+                w = bisect.bisect_left(allLatencies, mod_age)/float(nPings)
 
-            perTotalWeights[pIdx] += w1*w2
+            perTotalWeights[pIdx] += w
             if received:
-                perTotalWeighted[pIdx] += w1*w2
+                perTotalWeighted[pIdx] += w
 
-        allLatencies.sort()
+        # 2b. Write per-day results into the DB.
+        intervals = self.pingIntervals.getIntervals(startTime, now)
+        for pIdx in xrange(len(intervals)):
+            s, e = intervals[pIdx]
+            latent = dailyMedianLatency[pIdx]
+            sent = nSent[pIdx]
+            rcvd = nReceived[pIdx]
+            wsent = perTotalWeights[pIdx]
+            wrcvd = perTotalWeighted[pIdx]
+            if wsent:
+                rel = wrcvd / wsent
+            else:
+                rel = 0.0
+            cur.execute(
+                "INSERT INTO echolotOneHopResults "
+                "(servername, startAt, endAt, nSent, nReceived, latency, "
+                "wsent, wreceived, reliability) "
+                "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
+                (serverName, self._time(s), self._time(e), sent, rcvd,
+                 latent, wsent, wrcvd, rel))
+
+        # 3. Write current overall results into the DB.
         if allLatencies:
-            latency = allLatencies[floorDiv(len(allLatencies),2)]
+            latent = allLatencies[floorDiv(len(allLatencies),2)]
         else:
-            latency = 0
-
-        reliability = reduce(operator.add, perTotalWeighted)/float(
-            reduce(operator.add, perTotalWeights))
+            latent = 0
+        wsent = wrcvd = 0.0
+        for s, r, w in zip(perTotalWeights, perTotalWeighted, self._WEIGHT_AGE):
+            wsent += s*w
+            wrcvd += r*w
+        if wsent:
+            rel = wrcvd / wsent
+        else:
+            rel = 0.0
+        cur.execute(
+            "INSERT INTO echolotCurrentOneHopResults "
+            "(servername, at, latency, reliability) VALUES (%s, %s, %s, %s)",
+            (serverName, self._time(now), latent, rel))
+        self.serverReliability[serverName] = rel
 
-        cur.execute("INSERT INTO echolotOneHops (servername, at, latency,"
-                    " wreliability) VALUES (%s, %s, %s, %s)",
-                    (serverName, now, latency, reliability))
+    def calculateOneHopResults(self, now=None):
+        self.lock.acquire()
+        try:
+            if now is None:
+                now = time.time()
+            for s in self.serverNames.keys():
+                self._calculateOneHopResults(s,now)
+            self.connection.commit()
+        finally:
+            self.lock.release()
 
     def _calculateChainCounts(self, startAt, endAt, path):
         # hold lock.
@@ -474,10 +564,6 @@
         return (nSent < 3 and nReceived == 0) or (
             product is not None and frac <= product*0.3)
 
-    def calculateDailyResults(self, server):
-        #XXXX
-        pass
-
 class PingGenerator:
     """DOCDOC"""
     #XXXX008 add abstract functions.
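
As a usage note: the rewritten entry points are intended to be driven periodically by the pinger. A minimal, hypothetical driver, assuming a pysqlite 1.x database that already contains recorded pings and lifetimes, might look roughly like this (the function name and the one-week window are illustrative, not part of this commit):

    # Hypothetical driver, not part of this commit: recompute uptime and
    # one-hop statistics from an existing ping database.
    import time
    import sqlite                    # pysqlite 1.x, as imported by Pinger.py

    def update_ping_stats(dbfile):
        connection = sqlite.connect(dbfile)
        log = PingLog(connection)
        now = time.time()
        # Per-day uptimes over the last week of midnight-to-midnight
        # intervals, then per-day and "current" one-hop results for every
        # server the log knows about.
        log.calculateUptimes(now - 7*24*60*60, now)
        log.calculateOneHopResults(now)

calculateOneHopResults() commits its own transaction, so no extra commit is needed here.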