[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Refactor 2-hop pings; add more utility functions for working with sqlite



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv7910/lib/mixminion/server

Modified Files:
	Pinger.py 
Log Message:
Refactor 2-hop pings; add more utility functions for working with sqlite

Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- Pinger.py	7 Dec 2004 00:15:15 -0000	1.9
+++ Pinger.py	7 Dec 2004 01:26:38 -0000	1.10
@@ -21,6 +21,7 @@
 import bisect
 import calendar
 import cPickle
+import operator
 import os
 import re
 import string
@@ -47,7 +48,6 @@
     sqlite = None
 
 KEEP_HISTORY_DAYS = 15
-USE_HISTORY_DAYS = 12
 HEARTBEAT_INTERVAL = 30*60
 ONE_DAY = 24*60*60
 
@@ -152,6 +152,34 @@
     def _time(self, t=None):
         return long(t or time.time())
 
+    def _bool(self, b):
+        if b:
+            return 1
+        else:
+            return 0
+
+    def _getInsertOrUpdateFn(self, table, keyCols, valCols):
+        update = "UPDATE %s SET %s WHERE %s" % (
+            table,
+            ", ".join(["%s = %%s" % k for k in keyCols]),
+            " AND ".join(["%s = %%s" % v for v in valCols]))
+        insert = "INSERT INTO %s (%s, %s) VALUES (%s)" % (
+            table,
+            ", ".join(keyCols),
+            ", ".join(valCols),
+            ", ".join(["%s"]*(len(valCols)+len(keyCols))))
+        def fn(keyVals, valVals):
+            assert len(keyVals) == len(keyCols)
+            assert len(valVals) == len(valCols)
+            vals = keyVals+valVals
+
+            self.cursor.execute(update, vals)
+            if self.cursor.rowcount > 0:
+                return
+
+            self.cursor.execute(insert, vals)
+        return fn
+
 class PingLog(_SQLiteMixin):
     def __init__(self, connection):
         self.connection = connection
@@ -161,8 +189,10 @@
         self.serverReliability = {}
         self.uptimeIntervals = PingerIntervalSchedule()
         self.pingIntervals = PingerIntervalSchedule()
+        self.brokenChains = {}
+        self.interestingChains = {}
         self._createAllTables()
-
+        self._loadServers()
 
     def _createAllTables(self):
         cur = self.cursor
@@ -175,7 +205,7 @@
             self._createTable("lifetimes",
                               [("up",       "time", "not null"),
                                ("stillup",  "time"),
-                               ("shutdown", "boolean")])
+                               ("shutdown", "time")])
             self._createTable("pings",
                               [("hash",     "varchar", "primary key"),
                                ("path",     "varchar", "not null"),
@@ -196,7 +226,7 @@
             self._createTable("echolotOneHopResults",
                               [("servername",  "varchar", "not null"),
                                ("startAt",     "time",    "not null"),
-                               ("endAt",       "varchar", "not null"),
+                               ("endAt",       "time",    "not null"),
                                ("nSent",       "integer", "not null"),
                                ("nReceived",   "integer", "not null"),
                                ("latency",     "integer", "not null"),
@@ -208,13 +238,36 @@
                                ("at",          "time",    "not null"),
                                ("latency",     "integer", "not null"),
                                ("reliability", "float",   "not null")])
+            self._createTable("echolotCurrentTwoHopResults",
+                              [("path",        "varchar", "not null"),
+                               ("at",          "time",    "not null"),
+                               ("nSent",       "integer", "not null"),
+                               ("nReceived",   "integer", "not null"),
+                               ("broken",      "boolean", "not null"),
+                               ("interesting", "boolean", "not null")])
 
             self._createIndex("lifetimesUp", "lifetimes", ["up"])
             self._createIndex("pingsHash",   "pings", ["hash"], unique=1)
             self._createIndex("pingsPathSR", "pings",
                               ["path", "sentat", "received"])
-            # ???? what else on pings?
             self._createIndex("connectsAt", "connects", ["at"])
+            # indices on echolot*results
+
+            self._setUptime = self._getInsertOrUpdateFn(
+                "uptimes", ["start", "end", "name"], ["uptime"])
+            self._setOneHop = self._getInsertOrUpdateFn(
+                "echolotOneHopResults",
+                ["servername", "startAt", "endAt"],
+                ["nSent", "nReceived", "latency", "wsent", "wreceived",
+                 "reliability"])
+            self._setCurOneHop = self._getInsertOrUpdateFn(
+                "echolotCurrentOneHopResults",
+                ["servername"],
+                ["at", "latency", "reliability"])
+            self._setTwoHop = self._getInsertOrUpdateFn(
+                "echolotCurrentTwoHopResults",
+                ["path"],
+                ["at", "nSent", "nReceived", "broken", "interesting"])
         finally:
             self.lock.release()
 
@@ -227,12 +280,33 @@
         for name, in res:
             self.serverNames[name] = 1
 
-        cur.execute("SELECT servername, MAX(at), reliablity FROM"
-                    " echolotOneHops")
+        cur.execute("SELECT servername, reliability FROM "
+                    "echolotCurrentOneHopResults")
         res = cur.fetchall()
-        for name,_,rel in res:
+        for name,rel in res:
             self.serverReliability[name]=rel
 
+        cur.execute("SELECT path, broken, interesting FROM "
+                    "echolotCurrentTwoHopResults WHERE interesting OR broken")
+        res = cur.fetchall()
+        broken = {}
+        interesting = {}
+        for p, b, i in res:
+            if b:
+                broken[p]=1
+            if i:
+                interesting[p]=1
+        self.isBroken = broken
+        self.isInterestin = interesting
+
+    def updateServers(self, names):
+        self.lock.acquire()
+        try:
+            for n in names:
+                self._addServer(n)
+        finally:
+            self.lock.release()
+
     def _addServer(self, name):
         # hold lock.
         name = name.lower()
@@ -302,7 +376,7 @@
         try:
             self._addServer(nickname)
             self._execute(self._CONNECTED,
-                      (self._time(now), nickname.lower(), success))
+                    (self._time(now), nickname.lower(), self._bool(success)))
         finally:
             self.lock.release()
 
@@ -335,7 +409,6 @@
     def _calculateUptimes(self, startTime, endTime):
         cur = self.cursor
         self.lock.acquire()
-
         try:
             cur.execute("DELETE FROM uptimes WHERE start = %s AND end = %s",
                         startTime, endTime)
@@ -353,11 +426,14 @@
                                         for start,end,shutdown in cur ])
             myIntervals *= timespan
             myUptime = myIntervals.spanLength()
+            fracUptime = float(myUptime)/(endTime-startTime)
+            self._setUptime(
+                (self._time(startTime), self._time(endTime), "<self>"),
+                (fracUptime,))
 
-            cur.execute("INSERT INTO uptimes (start, end, name, uptime)"
-                        " VALUES (%s, %s, '<self>', %s)",
-                        (self._time(startTime), self._time(endTime),
-                         float(myUptime)/(endTime-startTime)))
+            #cur.execute("INSERT INTO uptimes (start, end, name, uptime)"
+            #            " VALUES (%s, %s, '<self>', %s)",
+            #            (self._time(startTime), self._time(endTime),
 
             # Okay, now everybody else.
             for s in self.serverNames.keys():
@@ -374,7 +450,7 @@
                     upAt, downAt = myIntervals.getIntervalContaining(at)
                     if upAt == None:
                         # Event at edge of interval
-                        #XXXX008 warn better?
+                        #XXXX008 warn better, or not at all.
                         LOG.warn("Event out of bounds")
                         continue
                     if lastTime is None or upAt > lastTime:
@@ -388,10 +464,7 @@
                 if times == [0,0]:
                     continue
                 fraction = float(times[1])/(times[0]+times[1])
-                print "Adding ",name,fraction
-                cur.execute("INSERT INTO uptimes (start, end, name, uptime)"
-                            " VALUES (%s, %s, %s, %s)",
-                            (startTime, endTime, s, fraction))
+                self._setUptime((startTime, endTime, s), (fraction,))
             self.connection.commit()
         finally:
             self.lock.release()
@@ -493,14 +566,9 @@
                 rel = wsent / wrcvd
             else:
                 rel = 0.0
-            cur.execute(
-                "INSERT INTO echolotOneHopResults "
-                "(servername, startAt, endAt, nSent, nReceived, latency, "
-                "wsent, wreceived, reliability) "
-                "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
-                (serverName, self._time(s), self._time(e), sent, rcvd,
-                 latent, wsent, wrcvd, rel))
-
+            self._setOneHop(
+                (serverName, self._time(s), self._time(e)),
+                (sent, rcvd, latent, wsent, wrcvd, rel))
         # 3. Write current overall results into the DB.
         if allLatencies:
             latent = allLatencies[floorDiv(len(allLatencies),2)]
@@ -514,10 +582,7 @@
             rel = wsent / wrcvd
         else:
             rel = 0.0
-        cur.execute(
-            "INSERT INTO echolotCurrentOneHopResults "
-            "(servername, at, latency, reliability) VALUES (%s, %s, %s, %s)",
-            (serverName, self._time(now), latent, rel))
+        self._setCurOneHop((serverName,), (self._time(now), latent, rel))
         self.serverReliability[serverName] = rel
 
     def calculateOneHopResults(self, now=None):
@@ -531,38 +596,64 @@
         finally:
             self.lock.release()
 
-    def _calculateChainCounts(self, startAt, endAt, path):
+    def _calculate2ChainStatus(self, since, path):
         # hold lock.
+        cur = self.cursor
         cur.execute("SELECT count() FROM pings WHERE path = %s"
-                    " AND sent >= %s AND sent <= %s",
-                    (path,startAt,endAt))
+                    " AND sentat >= %s",
+                    (path,self._time(since)))
         nSent, = cur.fetchone()
         cur.execute("SELECT count() FROM pings WHERE path = %s"
-                    " AND sent >= %s AND sent <= %s AND received > 0",
-                    (path,startAt,endAt))
+                    " AND sentat >= %s AND received > 0",
+                    (path,since))
         nReceived, = cur.fetchone()
 
         servers = path.split(",")
         try:
-            rels = [ self.reliability[s] for s in servers ]
+            rels = [ self.serverReliability[s] for s in servers ]
             product = reduce(operator.mul, rels)
         except IndexError:
             product = None
 
-        return nSent, nReceived, product
+        if nSent == 0:
+            frac = 0.0
+        else:
+            frac = float(nReceived)/nSent
 
-    def _2chainIsBroken(self, startAt, endAt, path):
-        #lock
-        nSent, nReceived, product = self._calculateChainCounts()
-        frac = float(nReceived)/nSent
-        return nSent >= 3 and product is not None and frac <= product*0.3
+        isBroken = nSent >= 3 and product and frac <= product*0.3
+        isInteresting = ((nSent < 3 and nReceived == 0) or
+                         (product and frac <= product*0.3))
 
-    def _2chainIsInteresting(self, startAt, endAt, path):
-        #lock
-        nSent, nReceived, product = self._calculateChainCounts()
-        frac = float(nReceived)/nSent
-        return (nSent < 3 and nReceived == 0) or (
-            product is not None and frac <= product*0.3)
+        return nSent, nReceived, product, isBroken, isInteresting
+
+    _CHAIN_PING_HORIZON = 12*ONE_DAY
+    def calculateChainStatus(self, now=None):
+        self.lock.acquire()
+        try:
+            if now is None:
+                now = time.time()
+            brokenChains = {}
+            interestingChains = {}
+            since = now - self._CHAIN_PING_HORIZON
+
+            for s1 in self.serverNames.keys():
+                for s2 in self.serverNames.keys():
+                    p = "%s,%s" % (s1, s2)
+
+                    nS, nR, prod, isBroken, isInteresting = \
+                        self._calculate2ChainStatus(since, p)
+                    if isBroken:
+                        brokenChains[p] = 1
+                    if isInteresting:
+                        interestingChains[p] = 1
+                    self._setTwoHop((p,),
+                           (self._time(now), nS, nR, self._bool(isBroken),
+                            self._bool(isInteresting)))
+
+            self.isBroken = isBroken
+            self.isInteresting = isInteresting
+        finally:
+            self.lock.release()
 
 class PingGenerator:
     """DOCDOC"""
@@ -664,7 +755,7 @@
         for s in servers:
             nicknames[s.getNickname()]=1
         for n in nicknames.keys():
-            self._schedulePing((n,),now)
+            self._schedulePing((n,), now)
 
     def _getPeriodStart(self, t):
         return previousMidnight(t)
@@ -719,22 +810,7 @@
         return previousMidnight(t)
 
     def _getPingInterval(self, path):
-        stats = self.pingLog.getLatestStatistics()
-        if stats is None:
-            return self.DULL_INTERVAL
-        pStr = ",".join(path)
-        nSent = stats.summary.nSent.get(pStr,0)
-        nRcvd = stats.summary.nRcvd.get(pStr,0)
-        assert nRcvd <= nSent
-        if nSent < 3 and nRcvd < 1:
-            return self.INTERESTING_INTERVAL
-        try:
-            rel1 = stats.summary.reliability[path[0]]
-            rel2 = stats.summary.reliability[path[1]]
-        except KeyError:
-            return self.DULL_INTERVAL
-
-        if float(nRcvd)/nSent <= rel1*rel2*0.3:
+        if self.pingLog.isInteresting.get(path, 0):
             return self.INTERESTING_INTERVAL
         else:
             return self.DULL_INTERVAL
@@ -764,8 +840,6 @@
             self._schedulePing((n1,n2), now+60)
             #XXXX008 we need to reschedule pings when a new directory arrives
 
-#class GeometricLinkPaddingGenerator
-
 class TestLinkPaddingGenerator(PingGenerator):
     """DOCDOC"""
     def __init__(self, config):