
[minion-cvs] More testing and debugging on pinging. automate a touch...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv16676/lib/mixminion/server

Modified Files:
	Pinger.py 
Log Message:
More testing and debugging on pinging. Automate things a touch better. Add a mechanism to dump output. Next steps: get concurrency right (deep hurting)
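
For context, a minimal sketch of how the new dump mechanism introduced in this
revision might be driven: calculateAll(), dumpAllStatus(), and the AtomicFile
import are taken from the diff below, while the PingLog instance and the
status-file path are assumptions made only for illustration.

    import time

    def writePingStatus(pingLog, statusFname):
        # pingLog: an already-constructed mixminion.server.Pinger.PingLog
        #          bound to the pinger's database (construction details are
        #          not shown in this diff and are assumed here).
        # statusFname: hypothetical path for the human-readable status dump.
        now = time.time()
        # calculateAll() recomputes uptimes, one-hop results, and two-hop
        # chain status, and, when outFname is given, writes the status dump
        # atomically via AtomicFile and dumpAllStatus().
        pingLog.calculateAll(outFname=statusFname, now=now)

The calculateAll(outFname=None, now=None) signature matches the code added near
the end of this diff; everything else above is a sketch, not part of the commit.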

Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -d -r1.11 -r1.12
--- Pinger.py	7 Dec 2004 01:44:31 -0000	1.11
+++ Pinger.py	11 Dec 2004 02:48:54 -0000	1.12
@@ -35,7 +35,7 @@
 import mixminion.server.PacketHandler
 import mixminion.server.MMTPServer
 
-from mixminion.Common import MixError, ceilDiv, createPrivateDir, \
+from mixminion.Common import MixError, AtomicFile, ceilDiv, createPrivateDir, \
      floorDiv, formatBase64, formatFnameDate, formatTime, IntervalSet, LOG, \
      parseFnameDate, previousMidnight, readPickled, secureDelete, \
      succeedingMidnight, writePickled
@@ -57,40 +57,6 @@
               'varchar' : 'varchar',
               }
 
-
-_MERGE_ITERS_CODE = """
-_MERGE_DONE = object()
-def _wrapNext(next,DONE=_MERGE_DONE):
-    def fn():
-        try:
-            return next()
-        except StopIteration:
-            return DONE
-    return fn
-def _mergeIters(iterable1, iterable2):
-    DONE = _MERGE_DONE
-    n1 = _wrapNext(iter(iterable1).next)
-    n2 = _wrapNext(iter(iterable2).next)
-    peek1 = n1()
-    peek2 = n2()
-    while peek1 is not DONE and peek2 is not DONE:
-        if peek1 <= peek2:
-            yield peek1
-            peek1 = n1()
-        else:
-            yield peek2
-            peek2 = n2()
-    while peek1 is not DONE:
-        yield peek1
-        peek1 = n1()
-    while peek2 is not DONE:
-        yield peek2
-        peek2 = n2()
-"""
-
-if sys.version_info[:2] >= (2,2):
-    exec _MERGE_ITERS_CODE
-
 class PingerIntervalSchedule:
     """DOCDOC -- defines a set of intervals in time."""
     def __init__(self):
@@ -159,8 +125,8 @@
     def _getInsertOrUpdateFn(self, table, keyCols, valCols):
         update = "UPDATE %s SET %s WHERE %s" % (
             table,
-            ", ".join(["%s = %%s" % k for k in keyCols]),
-            " AND ".join(["%s = %%s" % v for v in valCols]))
+            ", ".join(["%s = %%s" % k for k in valCols]),
+            " AND ".join(["%s = %%s" % v for v in keyCols]))
         insert = "INSERT INTO %s (%s, %s) VALUES (%s)" % (
             table,
             ", ".join(keyCols),
@@ -169,13 +135,12 @@
         def fn(keyVals, valVals):
             assert len(keyVals) == len(keyCols)
             assert len(valVals) == len(valCols)
-            vals = keyVals+valVals
 
-            self.cursor.execute(update, vals)
+            self.cursor.execute(update, (valVals+keyVals))
             if self.cursor.rowcount > 0:
                 return
 
-            self.cursor.execute(insert, vals)
+            self.cursor.execute(insert, (keyVals+valVals))
         return fn
 
 class PingLog(_SQLiteMixin):
@@ -189,6 +154,8 @@
         self.pingIntervals = PingerIntervalSchedule()
         self.brokenChains = {}
         self.interestingChains = {}
+        self._startTime = None
+        self._lastRecalculation = 0
         self._createAllTables()
         self._loadServers()
 
@@ -231,12 +198,12 @@
                                ("wreceived",   "float",   "not null"),
                                ("reliability", "float",   "not null")])
             self._createTable("echolotCurrentOneHopResults",
-                              [("servername",  "varchar", "not null"),
+                              [("servername",  "varchar", "unique not null"),
                                ("at",          "time",    "not null"),
                                ("latency",     "integer", "not null"),
                                ("reliability", "float",   "not null")])
             self._createTable("echolotCurrentTwoHopResults",
-                              [("path",        "varchar", "not null"),
+                              [("path",        "varchar", "unique not null"),
                                ("at",          "time",    "not null"),
                                ("nSent",       "integer", "not null"),
                                ("nReceived",   "integer", "not null"),
@@ -248,7 +215,8 @@
             self._createIndex("pingsPathSR", "pings",
                               ["path", "sentat", "received"])
             self._createIndex("connectsAt", "connects", ["at"])
-            # indices on echolot*results
+            self._createIndex("uptimesNS", "uptimes", ["name", "start"])
+            # indices on echolot*results, uptimes.
 
             self._setUptime = self._getInsertOrUpdateFn(
                 "uptimes", ["start", "end", "name"], ["uptime"])
@@ -294,7 +262,7 @@
             if i:
                 interesting[p]=1
         self.isBroken = broken
-        self.isInterestin = interesting
+        self.isInteresting = interesting
 
     def updateServers(self, names):
         self.lock.acquire()
@@ -358,10 +326,11 @@
         now = self._time(now)
         self._execute(self._SHUTDOWN, (now, now, self._startTime))
 
-    _HEARTBEAT = "UPDATE lifetimes SET stillup = %s WHERE up = %s"
+    _HEARTBEAT = "UPDATE lifetimes SET stillup = %s WHERE up = %s AND stillup < %s"
     def heartbeat(self, now=None):
         if self._startTime is None: self.startup()
-        self._execute(self._HEARTBEAT, (self._time(now), self._startTime))
+        now = self._time(now)
+        self._execute(self._HEARTBEAT, (now, self._startTime, now))
 
     def rotated(self):
         pass
@@ -377,8 +346,8 @@
         finally:
             self.lock.release()
 
-    def connectFailed(self, nickname):
-        self.connected(nickname, 0)
+    def connectFailed(self, nickname, now=None):
+        self.connected(nickname, success=0, now=now)
 
     _QUEUED_PING = ("INSERT INTO pings (hash, path, sentat, received)"
                     "VALUES (%s,%s,%s,%s)")
@@ -403,15 +372,13 @@
         elif n > 1:
             LOG.warn("Received ping with multiple hash entries!")
 
-    def _calculateUptimes(self, startTime, endTime):
+    def _calculateUptimes(self, startTime, endTime, now=None):
         cur = self.cursor
         self.lock.acquire()
         try:
-            cur.execute("DELETE FROM uptimes WHERE start = %s AND end = %s",
-                        startTime, endTime)
-
             # First, calculate my own uptime.
-            self.heartbeat()
+            if now is None: now = time.time()
+            self.heartbeat(now)
 
             timespan = IntervalSet( [(startTime, endTime)] )
 
@@ -428,10 +395,6 @@
                 (self._time(startTime), self._time(endTime), "<self>"),
                 (fracUptime,))
 
-            #cur.execute("INSERT INTO uptimes (start, end, name, uptime)"
-            #            " VALUES (%s, %s, '<self>', %s)",
-            #            (self._time(startTime), self._time(endTime),
-
             # Okay, now everybody else.
             for s in self.serverNames.keys():
                 cur.execute("SELECT at, success FROM connects"
@@ -441,14 +404,12 @@
 
                 lastStatus = None
                 lastTime = None
-                times = [ 0, 0] # uptime, downtime
+                times = [ 0, 0 ] # uptime, downtime
                 for at, success in cur:
                     assert success in (0,1)
                     upAt, downAt = myIntervals.getIntervalContaining(at)
                     if upAt == None:
-                        # Event at edge of interval
-                        #XXXX008 warn better, or not at all.
-                        LOG.warn("Event out of bounds")
+                        # Event outside edge of interval.
                         continue
                     if lastTime is None or upAt > lastTime:
                         lastTime = upAt
@@ -457,6 +418,8 @@
                         t = (at-lastTime)/2.0
                         times[success] += t
                         times[lastStatus] += t
+                    lastStatus = success
+                    lastTime = at
 
                 if times == [0,0]:
                     continue
@@ -466,9 +429,10 @@
         finally:
             self.lock.release()
 
-    def calculateUptimes(self, startAt, endAt):
+    def calculateUptimes(self, startAt, endAt, now=None):
+        if now is None: now = time.time()
         for s, e in self.uptimeIntervals.getIntervals(startAt, endAt):
-            self._calculateUptimes(s, e)
+            self._calculateUptimes(s, e, now=now)
 
     def getUptimes(self, startAt, endAt):
         """DODOC: uptimes for all servers overlapping [startAt, endAt],
@@ -487,17 +451,39 @@
         finally:
             self.lock.release()
 
+    def _roundLatency(self, latency):
+        """Using a median latency can leak the fact that a message was a
+           ping. DOCDOC"""
+        for cutoff, q in [
+            (60, 5), (10*60, 60), (30*60, 2*60),
+            (60*60, 5*60), (3*60*60, 10*60), (12*60*60, 20*60),
+            (24*60*60, 30*60) ]:
+            if latency < cutoff:
+                quantum = q
+                break
+        else:
+            quantum = 60*60
+
+        return int( float(latency)/quantum + 0.5 ) * quantum
+
     _WEIGHT_AGE_PERIOD = 24*60*60
     _WEIGHT_AGE = [ 1, 2, 2, 3, 5, 8, 9, 10, 10, 10, 10, 5 ]
     _PING_GRANULARITY = 24*60*60
-    def _calculateOneHopResults(self, serverName, now=None):
+    def _calculateOneHopResults(self, serverName, startTime, endTime,
+                                now=None, calculateOverallResults=1):
         # hold lock; commit when done.
         cur = self.cursor
         if now is None:
             now = time.time()
-        horizon_size = self._WEIGHT_AGE_PERIOD * len(self._WEIGHT_AGE)
-        startTime =self.pingIntervals.getIntervalContaining(now-horizon_size)[0]
-        nPeriods = len(self.pingIntervals.getIntervals(startTime, now))
+        self._addServer(serverName)
+        if calculateOverallResults:
+            startTime = min(startTime,
+                       now - (len(self._WEIGHT_AGE)*self._WEIGHT_AGE_PERIOD))
+            endTime = max(endTime, now)
+        intervals = self.pingIntervals.getIntervals(startTime, endTime)
+        nPeriods = len(intervals)
+        startTime = intervals[0][0]
+        endTime = intervals[-1][1]
 
         # 1. Compute latencies and number of pings sent in each period.
         #    We need to learn these first so we can tell the percentile
@@ -507,7 +493,7 @@
         nPings = 0
         cur.execute("SELECT sentat, received FROM pings WHERE path = %s"
                     " AND sentat >= %s AND sentat <= %s",
-                    (serverName, startTime, now ))
+                    (serverName, startTime, endTime))
         for sent,received in cur:
             pIdx = floorDiv(sent-startTime, self._PING_GRANULARITY)
             nSent[pIdx] += 1
@@ -527,6 +513,10 @@
             del d
         del dailyLatencies
         allLatencies.sort()
+        #if allLatencies:
+        #    LOG.warn("%s pings in %s intervals. Median latency is %s seconds",
+        #             nPings, nPeriods,
+        #             allLatencies[floorDiv(len(allLatencies),2)])
 
         # 2. Compute the number of pings actually received each day,
         #    and the number of pings received each day weighted by
@@ -536,7 +526,7 @@
         perTotalWeighted = [0]*nPeriods
         cur.execute("SELECT sentat, received FROM pings WHERE path = %s"
                     " AND sentat >= %s AND sentat <= %s",
-                    (serverName, startTime, now))
+                    (serverName, startTime, endTime))
         for sent,received in cur:
             pIdx = floorDiv(sent-startTime, self._PING_GRANULARITY)
             if received:
@@ -545,38 +535,49 @@
             else:
                 mod_age = (now-sent-15*60)*0.8
                 w = bisect.bisect_left(allLatencies, mod_age)/float(nPings)
+                #LOG.warn("Percentile is %s.", w)
 
             perTotalWeights[pIdx] += w
             if received:
                 perTotalWeighted[pIdx] += w
 
         # 2b. Write per-day results into the DB.
-        intervals = self.pingIntervals.getIntervals(startTime, now)
         for pIdx in xrange(len(intervals)):
             s, e = intervals[pIdx]
-            latent = dailyMedianLatency[pIdx]
+            latent = self._roundLatency(dailyMedianLatency[pIdx])
             sent = nSent[pIdx]
             rcvd = nReceived[pIdx]
             wsent = perTotalWeights[pIdx]
             wrcvd = perTotalWeighted[pIdx]
-            if wrcvd:
-                rel = wsent / wrcvd
+            if wsent:
+                rel = wrcvd / wsent
             else:
                 rel = 0.0
+            #if sent:
+            #    LOG.warn("Of pings sent on day %s, %s/%s were received. "
+            #             "rel=%s/%s=%s",
+            #             pIdx, rcvd, sent, wrcvd, wsent, rel)
             self._setOneHop(
                 (serverName, self._time(s), self._time(e)),
                 (sent, rcvd, latent, wsent, wrcvd, rel))
+
+        if not calculateOverallResults:
+            return
+
         # 3. Write current overall results into the DB.
         if allLatencies:
-            latent = allLatencies[floorDiv(len(allLatencies),2)]
+            latent = self._roundLatency(allLatencies[floorDiv(len(allLatencies),2)])
         else:
             latent = 0
         wsent = wrcvd = 0.0
+        nPeriods = len(self._WEIGHT_AGE)
+        perTotalWeights = perTotalWeights[-nPeriods:]
+        perTotalWeighted = perTotalWeighted[-nPeriods:]
         for s, r, w in zip(perTotalWeights, perTotalWeighted, self._WEIGHT_AGE):
             wsent += s*w
             wrcvd += r*w
-        if wrcvd:
-            rel = wsent / wrcvd
+        if wsent:
+            rel = wrcvd / wsent
         else:
             rel = 0.0
         self._setCurOneHop((serverName,), (self._time(now), latent, rel))
@@ -588,12 +589,14 @@
             if now is None:
                 now = time.time()
             for s in self.serverNames.keys():
-                self._calculateOneHopResults(s,now)
+                # For now, always calculate overall results.
+                self._calculateOneHopResults(s,now,now,now,
+                                             calculateOverallResults=1)
             self.connection.commit()
         finally:
             self.lock.release()
 
-    def _calculate2ChainStatus(self, since, path):
+    def _calculate2ChainStatus(self, since, path, now=None):
         # hold lock.
         cur = self.cursor
         cur.execute("SELECT count() FROM pings WHERE path = %s"
@@ -609,7 +612,7 @@
         try:
             rels = [ self.serverReliability[s] for s in servers ]
             product = reduce(operator.mul, rels)
-        except IndexError:
+        except KeyError:
             product = None
 
         if nSent == 0:
@@ -652,6 +655,92 @@
         finally:
             self.lock.release()
 
+    def dumpAllStatus(self,f,since,now=None):
+        self.lock.acquire()
+        try:
+            if now is None: now = time.time()
+            print >>f, "# List of all currently tracked servers."
+            print >>f, "KNOWN_SERVERS =",self.serverNames.keys()
+            cur = self.cursor
+
+            print >>f, "\n# Map from server to list of (period-start, period-end, fractional uptime"
+            print >>f, "SERVER_UPTIMES = {"
+            cur.execute("SELECT start,end,name,uptime FROM uptimes "
+                        "WHERE start >= %s AND end <= %s"
+                        "ORDER BY name, start", (since, now))
+            lastServer = "---"
+            for s,e,n,u in cur:
+                if n != lastServer:
+                    if lastServer != '---': print >>f, "   ],"
+                    lastServer = n
+                    print >>f, "   %r : [" % n
+                print >>f, "      (%s,%s,%.04f),"%(s,e,u)
+            if lastServer != '---': print >>f, "   ]"
+            print >>f, "}"
+
+            print >>f, """
+# Map from server name to list of (period-start, period-end, # pings sent,
+#      # of those pings received, median latency on those pings (sec),
+#      weighted reliability)"""
+            print >>f, "SERVER_DAILY_PING_STATUS = {"
+            cur.execute("SELECT servername,startAt,endAt,nSent,nReceived,"
+                        "  latency,reliability FROM echolotOneHopResults "
+                        "WHERE startat >= %s AND endat <= %s"
+                        "ORDER BY servername, startat", (since, now))
+            lastServer = "---"
+            for n,s,e,nS,nR,lat,r in cur:
+                if n != lastServer:
+                    if lastServer != '---': print >>f, "   ],"
+                    lastServer = n
+                    print >>f, "   %r : [" % n
+                print >>f, "      (%s,%s,%s,%s,%s,%.04f),"%(s,e,nS,nR,lat,r)
+            if lastServer != '---': print >>f, "   ]"
+            print >>f, "}"
+
+            print >>f, "\n# Map from server-name to current (avg latency, avg reliability)"
+            print >>f, "SERVER_CUR_PING_STATUS = {"
+            cur.execute("SELECT servername,latency,reliability FROM "
+                        "echolotCurrentOneHopResults")
+            for n,lat,r in cur:
+                print >>f, "   %r : (%s,%.04f)," %(n,lat,r)
+            print >>f, "}"
+
+            print >>f, "\n# Chains that we want to know more about"
+            print >>f, "INTERESTING_CHAINS = ["
+            cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
+                        "WHERE interesting = 1")
+            for p, in cur:
+                print >>f, "   %r,"%p
+            print >>f, "]"
+
+            print >>f, "\n# Chains that are more unreliable than we'd expect"
+            print >>f, "BROKEN_CHAINS = ["
+            cur.execute("SELECT path FROM echolotCurrentTwoHopResults "
+                        "WHERE broken = 1")
+            for p, in cur:
+                print >>f, "   %r,"%p
+            print >>f, "]"
+            print >>f, "\n"
+
+        finally:
+            self.lock.release()
+
+    def calculateAll(self, outFname=None, now=None):
+        LOG.info("Computing ping results.")
+        LOG.info("Starting to compute server uptimes.")
+        self.calculateUptimes(now, now-24*60*60*12)
+        LOG.info("Starting to compute one-hop ping results")
+        self.calculateOneHopResults(now)
+        LOG.info("Starting to compute two-hop chain status")
+        self.calculateChainStatus(now)
+        if outFname:
+            LOG.info("Writing ping results to disk")
+            f = AtomicFile(outFname, 'w')
+            self.dumpAllStatus(f, now-24*60*60*12, now)
+            f.close()
+        LOG.info("Done computing ping results")
+        self.lastCalculation = now
+
 class PingGenerator:
     """DOCDOC"""
     #XXXX008 add abstract functions.