[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Simplify uptime processing in pinger; make it run without crashing.



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv8972/lib/mixminion/server

Modified Files:
	Pinger.py 
Log Message:
Simplify uptime processing in pinger; make it run without crashing.

Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -d -r1.7 -r1.8
--- Pinger.py	2 Dec 2004 06:47:08 -0000	1.7
+++ Pinger.py	2 Dec 2004 23:39:02 -0000	1.8
@@ -37,8 +37,8 @@
 import mixminion.server.MMTPServer
 
 from mixminion.Common import MixError, ceilDiv, createPrivateDir, \
-     floorDiv, formatBase64, formatFnameDate, formatTime, LOG, parseFnameDate,\
-     previousMidnight, readPickled, secureDelete, writePickled
+     floorDiv, formatBase64, formatFnameDate, formatTime, IntervalSet, LOG, \
+     parseFnameDate, previousMidnight, readPickled, secureDelete, writePickled
 
 try:
     import sqlite
@@ -50,24 +50,26 @@
 HEARTBEAT_INTERVAL = 30*60
 ONE_DAY = 24*60*60
 
-REALTYPES = { 'time' : 'integer',
+# Map from logical type to type used in (sqlite) database.
+REALTYPES = { 'time'    : 'integer',
               'boolean' : 'integer',
               'integer' : 'integer',
-              'float' : 'float',
+              'float'   : 'float',
               'varchar' : 'varchar',
               }
 
+
+_MERGE_ITERS_CODE = """
 _MERGE_DONE = object()
-def _wrapNext(next):
+def _wrapNext(next,DONE=_MERGE_DONE):
     def fn():
         try:
             return next()
         except StopIteration:
-            return _MERGE_DONE
+            return DONE
     return fn
-
-_MERGE_ITERS = """
-def mergeIters(iterable1, iterable2):
+def _mergeIters(iterable1, iterable2):
+    DONE = _MERGE_DONE
     n1 = _wrapNext(iter(iterable1).next)
     n2 = _wrapNext(iter(iterable2).next)
     peek1 = n1()
@@ -87,49 +89,8 @@
         peek2 = n2()
 """
 
-class _MergeIters:
-    """Given two iterators yielding their items in order, yield the ordered
-       merge of their items.
-    """
-    _DONE = object()
-    def __init__(self, iterable1, iterable2):
-        self.n1 = iter(iterable1).next
-        self.n2 = iter(iterable2).next
-        try:
-            self.peek1 = self.n1()
-        except StopIteration:
-            self.peek1 = self._DONE
-        try:
-            self.peek2 = self.n2()
-        except StopIteration:
-            self.peek2 = self._DONE()
-    def __iter__(self):
-        return self
-    def __next__(self):
-        if self.peek1 is self._DONE:
-            if self.peek2 is self._DONE:
-                raise StopIteration
-            else:
-                p1 = 0
-        elif self.peek2 is self._DONE:
-            p1 = 1
-        else:
-            p1 = self.peek1 < self.peek2
-
-        if p1:
-            r = self.peek1
-            try:
-                self.peek1 = self.n1()
-            except StopIteration:
-                self.peek1 = self._DONE
-        else:
-            try:
-                self.peek2 = self.n2()
-            except StopIteration:
-                self.peek2 = self._DONE
-
-        return r
-
+if sys.version_info[:2] >= (2,2):
+    exec _MERGE_ITERS_CODE
 
 WEIGHT_AGE_PERIOD = 24*60*60
 WEIGHT_AGE = [ 5, 10, 10, 10, 10, 9, 8, 5, 3, 2, 2, 1, 0, 0, 0, 0, 0 ]
@@ -167,7 +128,6 @@
                 body.append("%s %s %s"%(r[0],REALTYPES[r[1]],r[2]))
 
         stmt = "CREATE TABLE %s (%s)" % (name,", ".join(body))
-        #print "about to execute <<%s>>"%stmt
         self.cursor.execute(stmt)
         self.connection.commit()
 
@@ -182,7 +142,6 @@
             u = ""
         stmt = "CREATE %sINDEX %s ON %s (%s)"%(
             u, name, tablename, ", ".join(columns))
-        #print "About to execute <<%s>>"%stmt
         self.cursor.execute(stmt)
         self.connection.commit()
 
@@ -242,10 +201,11 @@
 
     def _addServer(self, name):
         # hold lock.
+        name = name.lower()
         if self.serverNames.has_key(name):
             return
-        print "!!!!",name
         self.cursor.execute("INSERT INTO servers (name) VALUES (%s)", name)
+        self.serverNames[name] = 1
 
     def rotate(self, now=None):
         self.lock.acquire()
@@ -278,54 +238,64 @@
     def _execute(self, sql, args):
         self.lock.acquire()
         try:
-            #print "about to execute %s %s"%(sql,args)
             self.cursor.execute(sql, args)
         finally:
             self.lock.release()
 
-    def _time(self, t):
-        return long(t)
-
-    def _now(self):
-        return long(time.time())
+    def _time(self, t=None):
+        return long(t or time.time())
 
     _STARTUP = "INSERT INTO lifetimes (up, stillup, shutdown) VALUES (%s,%s, 0)"
-    def startup(self):
-        self._startTime = now = self._now()
+    def startup(self,now=None):
+        self._startTime = now = self._time(now)
         self._execute(self._STARTUP, (now,now))
 
     _SHUTDOWN = "UPDATE lifetimes SET stillup = %s, shutdown = %s WHERE up = %s"
-    def shutdown(self):
+    def shutdown(self, now=None):
         if self._startTime is None: self.startup()
-        self._execute(self._SHUTDOWN, (self._now(), self._now(), self._startTime))
+        now = self._time(now)
+        self._execute(self._SHUTDOWN, (now, now, self._startTime))
 
     _HEARTBEAT = "UPDATE lifetimes SET stillup = %s WHERE up = %s"
-    def heartbeat(self):
+    def heartbeat(self, now=None):
         if self._startTime is None: self.startup()
-        self._execute(self._HEARTBEAT, (self._now(), self._startTime))
+        self._execute(self._HEARTBEAT, (self._time(now), self._startTime))
 
     def rotated(self):
         pass
 
     _CONNECTED = ("INSERT INTO connects (at, server, success) "
                   "VALUES (%s,%s,%s)")
-    def connected(self, nickname, success=1):
-        self._execute(self._CONNECTED, (self._now(), nickname.lower(), success))
+    def connected(self, nickname, success=1, now=None):
+        self.lock.acquire()
+        try:
+            self._addServer(nickname)
+            self._execute(self._CONNECTED,
+                      (self._time(now), nickname.lower(), success))
+        finally:
+            self.lock.release()
 
     def connectFailed(self, nickname):
         self.connected(nickname, 0)
 
     _QUEUED_PING = ("INSERT INTO pings (hash, path, sentat, received)"
                     "VALUES (%s,%s,%s,%s)")
-    def queuedPing(self, hash, path):
+    def queuedPing(self, hash, path, now=None):
         assert len(hash) == mixminion.Crypto.DIGEST_LEN
-        self._execute(self._QUEUED_PING,
-                      (formatBase64(hash), path, self._now(), 0))
+        self.lock.acquire()
+        try:
+            path = path.lower()
+            for s in path.split(","):
+                self._addServer(s)
+            self._execute(self._QUEUED_PING,
+                          (formatBase64(hash), path, self._time(now), 0))
+        finally:
+            self.lock.release()
 
     _GOT_PING = "UPDATE pings SET received = %s WHERE hash = %s"
-    def gotPing(self, hash):
+    def gotPing(self, hash, now=None):
         assert len(hash) == mixminion.Crypto.DIGEST_LEN
-        n = self._execute(self._GOT_PING, (self._now(), formatBase64(hash)))
+        n = self._execute(self._GOT_PING, (self._time(now), formatBase64(hash)))
         if n == 0:
             LOG.warn("Received ping with no record of its hash")
         elif n > 1:
@@ -339,84 +309,83 @@
             cur.execute("DELETE FROM uptimes WHERE start = %s AND end = %s",
                         startTime, endTime)
 
-            lifetimeEvents = []
             # First, calculate my own uptime.
             self.heartbeat()
 
-            lifetimeEvents = []
-            cur.execute("SELECT MAX(up) FROM lifetimes WHERE up < %s",startTime)
-            res = cur.fetchall()
-            if res:
-                cur.execute("SELECT up,stillup FROM lifetimes WHERE up >= %s"
-                            " AND up <= %s", res[0][0], endTime)
-            else:
-                cur.execute("SELECT up,stillup FROM lifetimes WHERE"
-                            " up <= %s", endTime)
-            events = cur.fetchall()
+            timespan = IntervalSet( [(startTime, endTime)] )
+
+            cur.execute("SELECT up, stillup, shutdown FROM lifetimes WHERE "
+                        "up <= %s AND stillup >= %s",
+                        self._time(endTime), self._time(startTime))
             myUptime = 0
-            for start,end in events:
-                lifetimeEvents.append((start,'up'))
-                lifetimeEvents.append((end,'down'))
-                if start < startTime: start = startTime
-                if end > endTime: end = endTime
-                myUptime += end-start
+            myIntervals = IntervalSet([ (start, max(end,shutdown))
+                                        for start,end,shutdown in cur ])
+            myIntervals *= timespan
+            myUptime = myIntervals.spanLength()
 
             cur.execute("INSERT INTO uptimes (start, end, name, uptime)"
                         " VALUES (%s, %s, '<self>', %s)",
                         (self._time(startTime), self._time(endTime),
                          float(myUptime)/(endTime-startTime)))
-            lifetimeEvents.sort()
-            lifetimeEvents.append((sys.maxint,1))
 
             # Okay, now everybody else.
             for s in self.serverNames.keys():
-                cur.execute("SELECT at, success FROM events"
+                cur.execute("SELECT at, success FROM connects"
                             " WHERE server = %s AND at >= %s AND at <= %s"
                             " ORDER BY at",
                             s, startTime, endTime)
-                myStatus = None
-                mySince = None
-                hisStatus = None
-                hisLast = None
-                hisTimes = [ 0,0 ]
-                for at, event in _MixIters(lifetimeEvents, iter(cur)):
-                    if event == 'up':
-                        if not myStatus: mySince = at
-                        myStatus = 1
-                    elif event == 'down':
-                        if myStatus: mySince = at
-                        myStatus = 0
-                    else:
-                        # event is 0 or 1; I tried to connect at
-                        # 'at' and succeeded or failed.
-                        if hisLast and hisLast > mySince:
-                            # The last thing that happened: I tried and
-                            # succeeeded or failed.
-                            t = (at-hisLast)/2
-                            hisTimes[hisStatus] += t
-                            hisTimes[event] += t
-                        elif myStatus:
-                            # The last thing that happened: I came online.
-                            hisTimes[event] += at-mySince
-                        else:
-                            # The last thing that happened: I went offline??
-                            LOG.warn("Internal error in calculateUptimes")
-                        hisStatus = event
-                        hisLast = at
-                if hisTimes == [0,0]:
+
+                lastStatus = None
+                lastTime = None
+                times = [ 0, 0] # uptime, downtime
+                for at, success in cur:
+                    assert success in (0,1)
+                    upAt, downAt = myIntervals.getIntervalContaining(at)
+                    if upAt == None:
+                        # Event at edge of interval
+                        #XXXX008 warn better?
+                        LOG.warn("Event out of bounds")
+                        continue
+                    if lastTime is None or upAt > lastTime:
+                        lastTime = upAt
+                        lastStatus = None
+                    if lastStatus is not None:
+                        t = (at-lastTime)/2.0
+                        times[success] += t
+                        times[lastStatus] += t
+
+                if times == [0,0]:
                     continue
-                fraction = float(hisTimes[1])/(hisTimes[0]+hisTimes[1])
+                fraction = float(times[1])/(times[0]+times[1])
+                print "Adding ",name,fraction
                 cur.execute("INSERT INTO uptimes (start, end, name, uptime)"
-                            " VALUES (%s, %s, %s, %s",
+                            " VALUES (%s, %s, %s, %s)",
                             (startTime, endTime, s, fraction))
             self.connection.commit()
         finally:
             self.lock.release()
 
+    def getUptimes(self, startAt, endAt):
+        """DODOC: uptimes for all servers overlapping [startAt, endAt],
+           as mapping from (start,end) to nickname to fraction.
+        """
+        result = {}
+        cur = self.cursor
+        self.lock.acquire()
+        try:
+            cur.execute("SELECT start, end, name, uptime FROM uptimes "
+                        "WHERE %s >= start AND %s <= end",
+                        (self._time(startAt), self._time(endAt)))
+            for s,e,n,u in cur:
+                result.setdefault((s,e), {})[n] = u
+            return result
+        finally:
+            self.lock.release()
+
     def _calculateOneHopResults(self, serverName, startTime, now=None):
         # hold lock; commit when done.
         cur = self.cursor
-        GRANULARITY = 24*60*60
+        GRANULARITY = PING_GRANULARITY
         if now is None:
             now = time.time()
         nPeriods = ceilDiv(now-startTime, GRANULARITY)