[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Actually calculate ping statistics and generate two-hop pings at a frequency based on these stats.



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv23752/lib/mixminion/server

Modified Files:
	Pinger.py ServerMain.py 
Log Message:
Actually calculate ping statistics and generate two-hop pings at a frequency based on these stats.

Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -d -r1.1 -r1.2
--- Pinger.py	27 Jul 2004 04:33:20 -0000	1.1
+++ Pinger.py	27 Jul 2004 23:12:16 -0000	1.2
@@ -16,7 +16,9 @@
    are.
 """
 
+import bisect
 import calendar
+import cPickle
 import os
 import re
 import struct
@@ -34,13 +36,16 @@
      formatFnameDate, formatTime, LOG, parseFnameDate, previousMidnight, \
      secureDelete
 
+KEEP_HISTORY_DAYS = 15
+USE_HISTORY_DAYS = 12
+HEARTBEAT_INTERVAL = 30*60
+ONE_DAY = 24*60*60
+
 class PingLog:
     """DOCDOC
        stores record of pinger events
     """
-    HISTORY_DAYS = 12
     HEARTBEAT_INTERVAL = 30*60
-
     def __init__(self, location):
         createPrivateDir(location)
         self.location = location
@@ -49,21 +54,29 @@
         self.lock = threading.RLock()
         self.rotate()
 
+    def _getDateString(self, now):
+        return formatFnameDate(now)
+
     def rotate(self,now=None):
         self.lock.acquire()
         try:
-            date = formatFnameDate(now)
+            date = self._getDateString(now)
             if self.file is not None:
                 if self.fname.endswith(date):
                     # no need to rotate.
                     return
                 self.rotated()
                 self.close()
+                self._rotateHook()
             self.fname = os.path.join(self.location, "events-"+date)
             self.file = open(self.fname, 'a')
+            self.rotated()
         finally:
             self.lock.release()
 
+    def _rotateHook(self):
+        pass
+
     def close(self):
         self.lock.acquire()
         try:
@@ -73,6 +86,23 @@
         finally:
             self.lock.release()
 
+    def _parseFname(self,fn):
+        if not fn.startswith("events-"):
+            return None,None
+        try:
+            date = parseFnameDate(fn[7:15])
+        except ValueError:
+            return Nne,None
+        tp = fn[15:]
+        if tp == "":
+            return date,"log"
+        elif tp == ".stat.gz":
+            return date,"stat"
+        elif tp == ".pend.gz":
+            return date,"pend"
+        else:
+            return None,None
+
     def clean(self, now=None, deleteFn=None):
         if now is None:
             now = time.time()
@@ -80,19 +110,24 @@
         try:
             self.rotate(now)
             bad = []
-            cutoff = previousMidnight(now) - 24*60*60*(self.HISTORY_DAYS+1)
+            lastPending = None
+            cutoff = previousMidnight(now) - ONE_DAY*(KEEP_HISTORY_DAYS)
+            filenames = os.listdir(self.location)
+            filenames.sort() # consider files in order of time.
             for fn in os.listdir(self.location):
-                if not fn.startswith("events-"):
-                    continue
-                try:
-                    date = parseFnameDate(fn[7:])
-                except ValueError:
-                    LOG.warn("Bad events filename %r; removing", fn)
-                    bad.append(os.path.join(self.location, fn))
+                date,type = self._parseFname(fn)
+                if not date:
+                    LOG.warn("Unrecognized events file %s",fn)
                     continue
-                if date < cutoff:
+                elif date < cutoff:
                     LOG.debug("Removing expired events file %s", fn)
                     bad.append(os.path.join(self.location, fn))
+                elif type == "pend":
+                    if self.lastPending:
+                        LOG.debug("Removing old pending-pings file %s",
+                                  lastPending)
+                        bad.append(os.path.join(self.location,lastPending))
+                        self.lastPending = fn
             if deleteFn:
                 deleteFn(bad)
             else:
@@ -108,6 +143,9 @@
         finally:
             self.lock.release()
 
+    def _event(self,tm,event):
+        pass
+
     def _write(self,*msg):
         self.lock.acquire()
         try:
@@ -115,7 +153,7 @@
             m = "%s %s\n" % (formatTime(now)," ".join(msg))
             self.file.write(m)
             self.file.flush()
-            # XXXX self.events.append((now, msg))
+            self._event(now, msg)
         finally:
             self.lock.release()
 
@@ -162,6 +200,159 @@
         LOG.debug("Received valid ping packet [%s]",formatBase64(addr))
         self.gotPing(addr)
 
+    def calculateStatistics(self,now=None):
+        pass
+
+    def getLatestStatistics(self):
+        return None
+
+class PingStatusLog(PingLog):
+    MAX_PING_AGE = 12 * ONE_DAY
+
+    def __init__(self, location):
+        PingLog.__init__(self,location)
+        self.pingStatus = None
+        self.latestStatistics = None
+        self._loadDepth=0
+        self._loadPingStatus()
+
+    def _event(self,tm,event):
+        # lock is held.
+        self.pingStatus.addEvent(tm,event)
+
+    def _rotateHook(self,fname=None):
+        # hold lock
+        if fname is None:
+            fname = self.fname
+        pr = self.pingStatus.splitResults(_nocopy=1)
+        pend = pr.pendingPings
+        pr.pendingPings=None
+        # separate these for space savings.
+        writePickled(fname+".pend.gz",pend,gzipped=1)
+        writePickled(fname+".stat.gz",pr,gzipped=1)
+
+    def _rescanImpl(self):
+        # hold lock; restore pingStatus.
+        fnames = [ fn for fn in os.listdir(self.location) if
+                   self._parseFname(fn)[1] == 'log' ]
+        fnames.sort() # go by date.
+
+        self.pingStatus = ps = PingStatus()
+        for fn in fnames[:-1]:
+            f = open(fn, 'r')
+            try:
+                ps.addFile(f)
+            finally:
+                f.close()
+            if fn != self.fname:
+                self._rotateHook(fn)
+
+    def _loadPingStatusImpl(self, binFname, pendFname, logFname):
+        # lock is held if any refs to objects are held.
+        if binFname:
+            results = readPickled(os.path.join(location,binFname),gzipped=1)
+            if results._version != PeriodResults.MAGIC:
+                return 1
+            pending = readPickled(os.path.join(location,pendFname),gzipped=1)
+            results.pendingPings=pending
+            ps = PingStatus(results)
+        else:
+            ps = PingStatus()
+        if logFname and os.path.exists(os.path.join(location,logFname)):
+            f = open(os.path.join(location,logFname), 'r')
+            ps.addFile(f)
+            f.close()
+        self.pingStatus = ps
+        return 0
+
+    def _loadPingStatus(self):
+        # lock is held if any refs to objects are held.
+        dateSet = {}
+        for fn in os.listDir(self.location):
+            date, tp = self._parseFname(fn)
+            dateSet.setdefault(date,{})[tp]=fn
+        dates = dateSet.keys()
+        dates.sort()
+        rescan = 0
+        # All files but the current one should have stat.  The last stat
+        # should have a pend.  Otherwise, rescan.
+        lastStat = None
+        for d in dates:
+            l,s=dateSet[d].get('log'), dateSet[d].get('stat')
+            if not l or l == self.fname:
+                continue
+            if not s:
+                rescan = 1
+            lastStat = d
+        if lastStat and not dateSet[lastStat].has_key('pend'):
+            rescan = 1
+
+        if rescan:
+            if self._loadDepth:
+                raise MixError("Recursive rescan")
+            self._rescanImpl()
+            self._loadDepth += 1
+            try:
+                self._loadPingStatus()
+            finally:
+                self._loadDepth -= 1
+            return
+
+        try:
+            if lastStat:
+                rescan = self._loadPingStatusImpl(dateSet[lastStat]['stat'],
+                                                  dateSet[lastStat]['pend'],
+                                                  self.fname)
+            else:
+                rescan = self._loadPingStatusImpl(None,None,self.fname)
+        except (cPickle.UnpicklingError, OSError, ValueError):
+            rescan = 1
+
+        if rescan:
+            #XXXX duplicate code.
+            if self._loadDepth:
+                raise MixError("Recursive rescan")
+            self._rescanImpl()
+            self._loadDepth += 1
+            try:
+                self._loadPingStatus()
+            finally:
+                self._loadDepth -= 1
+
+    def calculateStatistics(self,now=None):
+        if now is None:
+            now = time.time()
+        self.rotate()
+        cutoff = previousMidnight(now)-KEEP_HISTORY_DAYS*ONE_DAY
+        stats = []
+        for fn in os.listDir(self.location):
+            date, type = self._parseFname(fn)
+            if type != 'stat': continue
+            if date < cutoff: continue
+            if fn == self.fname: continue
+            stats.append((date,
+                       readPickle(os.path.join(self.location,fn),gzipped=1)))
+        stats.sort()
+        stats = [ pr for _,pr in stats ]
+        self.lock.acquire()
+        try:
+            stats.append(self.pingStatus.checkPointResults())
+        finally:
+            self.lock.release()
+
+        stats = calculatePingResults(stats, now)
+
+        self.lock.acquire()
+        try:
+            self.latestStatistics = stats
+        finally:
+            self.lock.release()
+
+        return stats
+
+    def getLatestStatistics(self):
+        return self.latestStatistics
+
 def iteratePingLog(file, func):
     _EVENT_ARGS = {
         "STARTUP" : 0,
@@ -195,30 +386,96 @@
             continue
         func(tm, event)
 
-def readEventLog(file):
-    result = []
-    iterateEventLog(file, lambda tm, event: result.append((tm,event)))
-    return result
+def readPingLog(file):
+    results = []
+    iteratePingLog(file, lambda t,e,r=results:r.append(t,e))
+    return results
+
+class PeriodResults:
+    MAGIC = "PeriodResults-0.0"
+    def __init__(self, start, end, liveness, serverUptime, serverStatus,
+                 pings, pendingPings):
+        self.start = start
+        self.end = end
+        self.liveness = liveness
+        self.serverUptime = serverUptime # nickname->n_sec.
+        self.serverDowntime = serverDowntime # nickname->n_sec.
+        self.serverStatus = serverStatus
+        self.pings = pings # must be a copy.
+        self.pendingPings = pendingPings # must be a copy.
+        self._version = self.MAGIC
 
 class PingStatus:
-    def __init__(self):
-        self.serverStatus = {} #"U"/"D",as-of
+    def __init__(self, lastResults=None):
+        if lastResults is not None:
+            # must copy
+            self.start = lastResults.start
+            self.liveness = lastResults.liveness
+            self.serverUptime = lastResults.serverUptime
+            self.pings = lastResults.pings
+            self.pendingPings = lastResults.pendingPings
+        else:
+            self.start = None
+            self.liveness = 0 # n_sec
+            self.serverUptime = {} # map from nickname->n_sec
+            self.serverDowntime = {} # map from nickname->n_sec
+            self.pings = {} # map from path->[(sent,received)...]
+            self.pendingPings = {} # map from pinghash->(sent,path)
+
+        self.lastEventTime = None
+        self.serverStatus = {} # nickname->"U"/"D", as-of
+        self.lastUpdated = None
+
+    def checkpointResults(self, _nocopy=0):
+        if self.lastEventTime is None:
+            return None
+        if _nocopy:
+            c = lambda x:x
+        else:
+            c = lambda x:x.copy()
+        self.update(self.lastEventTime)
+        return PeriodResults(self.start, self.lastEventTime,
+                             self.liveness, c(self.serverUptime),
+                             c(self.serverDowntime),
+                             c(self.serverStatus),
+                             c(self.pings),
+                             c(self.pendingPings))
+
+    def splitResults(self, _nocopy=0):
+        if self.lastEventTime is None:
+            return None
+        pr = self.checkpointResults(_nocopy=1)
+        if _nocopy:
+            c = lambda x:x
+        else:
+            c = lambda x:x.copy()
         self.serverUptime = {}
         self.serverDowntime = {}
-        self.pendingPings = {} #hash64->sent,path
-        self.lastEvent = None
+        self.serverStatus = c(self.serverStatus)
+        self.pings = self.pings
+        self.pendingPings = c(self.pendingPings)
+
+    def expirePings(self, cutoff):
+        for h,(sent,path) in self.pendingPings.items():
+            if sent<cutoff:
+                self.pings[path].setdefault(path,[]).append((sent,None))
+                del self.pendingPings[h]
+
     def addEvent(self, tm, event):
         eType = event[0]
         if eType == 'PING':
             self.pendingPings[event[1]] = tm, event[2]
         elif eType == 'GOT_PING':
+            h = event[1]
             try:
-                tSent, path = self.pendingPings[event[1]]
+                tSent, path = self.pendingPing[h]
             except KeyError:
                 # we didn't send it, or can't remember sending it.
-                pass
+                LOG.warn("Received a ping I don't remember sending (%s)",
+                         event[1])
             else:
-                self.pingDone(path, tSent, tm)
+                del self.pendingPings[event[1]]
+                self.pings.setdefault(path, []).append((tSent,tm))
         elif eType == 'CONNECTED':
             try:
                 s, tLast = self.serverStatus[event[1]]
@@ -246,43 +503,171 @@
         elif eType == 'SHUTDOWN':
             self.diedAt(tm)
         elif eType == 'STARTUP':
-            if self.lastEvent:
-                self.diedAt(self.lastEvent[0])
-
-        self.lastEvent = (tm, event)
+            if self.lastEventTime:
+                self.diedAt(self.lastEventTime)
 
-    def pingDone(self, path, queuedAt, receivedAt):
-        servers = path.split(",")
-        if len(servers) == 1:
-            self.oneHopPingDone(servers[0], queuedAt, receivedAt)
-        elif len(servers) == 2:
-            self.twoHopPingDone(servers, queuedAt, receivedAt)
-        else:
-            pass # never made now.
+        if self.start is None:
+            self.start = tm
 
-    def oneHopPingDone(self, nickname, queuedAt, receivedAt):
-        pass
+        self.lastEventTime = tm
 
+    def addFile(self, file):
+        iteratePingLog(file, self.addEvent)
 
-    def diedAt(self, diedAt):
+    def update(self, liveAt):
         for nickname, (status, tLast) in self.serverStatus.items():
             if status == 'U':
                 m = self.serverUptime
             else:
                 m = self.serverDowntime
+            if liveAt < tLast: continue
             try:
-                m[nickname] += diedAt-tLast
+                m[nickname] += liveAt-tLast
             except KeyError:
-                m[nickname] = diedAt-tLast
+                m[nickname] = liveAt-tLast
+
+        if self.lastUpdated is not None and self.lastUpdated < liveAt:
+            self.liveness += liveAt-self.last
+
+        self.lastUpdated = liveAt
+
+    def diedAt(self, diedAt):
+        self.update(diedAt)
 
         self.serverStatus = {}
-        self.lastEvent = None
+        self.lastEventTime = None
 
-    def getNetworkStatus(self):
-        nicknames = {}
-        for n in self.serverUptime.keys(): nicknames[n]=1
-        for n in self.serverDowntime.keys(): nicknames[n]=1
+class OneDayPingResults:
+    #XXXX008 move to ClientDirectory?
+    def __init__(self):
+        self.start = 0
+        self.end = 0
+        self.uptime = {} # nickname->pct
+        self.reliability = {} # path->pct
+        self.latency = {} # path->avg
+
+class PingResults:
+    #XXXX008 move to ClientDirectory?
+    def __init__(self, days, summary):
+        self.days = days # list of OneDayPingResults
+        self.summary = summary
+
+GRACE_PERIOD = ONE_DAY
+WEIGHT_AGE = [ 5, 10, 10, 10, 10, 9, 8, 5, 3, 2, 2, 1, 0, 0, 0, 0, 0 ]
+
+def calculatePingResults(periods, endAt):
+    startAt = previousMidnight(endAt) - ONE_DAY*(USE_HISTORY_DAYS)
+
+    results = [ OneDayPingResults() for _ in xrange(USE_HISTORY_DAYS+1) ]
+    summary = OneDayPingResults()
+    summary.start = startAt
+    summary.end = endAt
+    for idx in xrange(USE_HISTORY_DAYS+1):
+        results[idx].start = startAt+(ONE_DAY*idx)
+        results[idx].end = startAt+(ONE_DAY*idx) - 1
+
+    pingsByDay = [ {} for _ in xrange(USE_HISTORY_DAYS+1) ]
+    allPaths = {}
+    for p in periods:
+        for paths,timings in p.pings.items():
+            allPaths[path]=1
+            for send,recv in timings:
+                day = floorDiv(send-startAt, ONE_DAY)
+                if day<0: continue
+                pingsByDay[day].setdefault(path,[]).append((send,recv))
+    for send,path in periods[-1].values():
+        if send+GRACE_PERIOD > endAt:
+            continue
+        day = floorDiv(send-startAt, ONE_DAY)
+        if day<0: continue
+        pingsByDay[day].setdefault(path,[]).append((send,None))
+        allPaths[path]=1
+
+    maxDelay = {}
+    delays = {}
+    summary.nSent = {}
+    summary.nRcvd = {}
+    for path in allPaths.keys():
+        maxDelay[path] = 0
+        delays[path] = []
+        summary.nSent[path]=0
+        summary.nRcvd[path]=0
+    for idx in xrange(KEEP_HISTORY_DAYS+1):
+        pbd = pingsByDay[idx]
+        for path, pings in pdb.keys():
+            nRcvd = 0
+            nLost = 0
+            totalDelay = 0.0
+            for (send,recv) in pings:
+                if recv is None:
+                    nLost += 1
+                    continue
+                else:
+                    nRcvd += 1
+                    delay = recv-send
+                    totalDelay += delay
+                    if delay > maxDelay[path]:
+                        maxDelay[path]=delay
+                    delays[path].append(delay)
+            results[idx].reliability[path] = float(nRcvd)/(nRcvd+nLost)
+            results[idx].latency[path] = totalDelay/nRcvd
+            summary.nSent[path] += len(pings)
+            summary.nRcvd[path] += nRcvd
+
+    totalWeight = {}
+    totalWeightedDelay = {}
+    for path in allPaths.keys():
+        delays[path].sort()
+        totalWeight[path] = 0
+        totalWeightReceived[path] = 0
 
+    for idx in xrange(HISTORY_DAYS+1):
+        pbd = pingsByDay[idx]
+        weightAge = WEIGHT_AGE[-idx]
+        for path, pings in pdb.keys():
+            if not delays[path]:
+                continue
+            d = delays[path]
+            for send,recv in pings:
+                if recv is not None:
+                    totalWeightReceived[path] += weightAge
+                    totalWeight[path] += weightAge
+                else:
+                    fracMax = (endAt-send-15*60) * 0.8
+                    weightLatent = (bisect.bisect(d, fracMax)/len(d))
+                    totalWeight[path] += weightLatent*weightAge
+
+    for path in allPaths.keys():
+        if not totalWeight[path]:
+            summary.reliability[path]=None
+            continue
+        summary.reliability[path] = (
+            (float(totalWeightReceived[path])/totalWeight[path]))
+        d = delays[path]
+        summary.latency[path] = d[floorDiv(len(d),2)]
+
+    allServers = {}
+    for p in periods:
+        for s in p.serverUptime.keys(): allServers[s]=1
+        for s in p.serverDowntime.keys(): allServers[s]=1
+
+    for s in allServers.keys():
+        upTotal = 0
+        downTotal = 0
+        for p in periods:
+            day = floorDiv(p.start-startAt, ONE_DAY)
+            if day<0: continue
+            up = p.serverUptime.get(s,0)
+            down = p.serverUptime.get(s,0)
+            upTotal += up
+            downTotal += down
+            if up+down < 60*60:
+                continue
+            results[day].uptime[s] = float(up)/(up+down)
+        if upTotal+downTotal < 60*60: continue
+        summary.uptime[s] = float(upTotal)/(upTotal+downTotal)
+
+    return PingResults(results, s)
 
 class PingGenerator:
     """DOCDOC"""
@@ -292,6 +677,7 @@
         self.pingLog = None
         self.outgoingQueue = None
         self.myNickname = config['Server']['Nickname']
+        self.latestStatistics = None
 
     def connect(self, directory, outgoingQueue, pingLog, keyring):
         pass
@@ -340,21 +726,25 @@
         raise NotImplemented()
     def _getPeriodStart(self, t):
         raise NotImplemented()
+    def _getPingInterval(self, path):
+        raise NotImplemented()
     def _schedulePing(self,path,now=None):
         if now is None: now = time.time()
         periodStart = _getPeriodStart(now)
-        t = periodStart + self._getPerturbation(path, periodStart)
-        t += self.PING_INTERVAL * ceilDiv(now-t, self.PING_INTERVAL)
+        interval = self._getPingInterval(path)
+        t = periodStart + self._getPerturbation(path, periodStart, interval)
+        t += interval * ceilDiv(now-t, interval)
         if t>periodStart+self.PERIOD:
             t = periodStart+self.PERIOD+self._getPerturbation(path,
-                                                    periodStart+self.PERIOD)
+                                                    periodStart+self.PERIOD,
+                                                              interval)
         self.nextPingTime[path] = t
         LOG.trace("Scheduling %d-hop ping for %s at %s", len(path),
                   ",".join(path), formatTime(t,1))
         return t
-    def _getPerturbation(self, path, periodStart):
+    def _getPerturbation(self, path, periodStart, interval):
         sha = mixminion.Crypto.sha1(",".join(path) + "@@" + str(day))
-        sec = abs(struct.unpack("I", sha[:4])[0]) % self.PING_INTERVAL
+        sec = abs(struct.unpack("I", sha[:4])[0]) % interval
         return sec
 
     def getFirstPingTime(self):
@@ -363,12 +753,11 @@
         else:
             return None
 
-
 class OneHopPingGenerator(PingGenerator,_PingScheduler):
     """DOCDOC"""
     #XXXX008 make this configurable, but not less than 2 hours.
     PING_INTERVAL = 2*60*60
-    PERIOD = 24*60*60
+    PERIOD = ONE_DAY
     def __init__(self, config):
         PingGenerator.__init__(self, config)
         _PingScheduler.__init__(self)
@@ -385,6 +774,9 @@
     def _getPeriodStart(self, t):
         return previousMidnight(t)
 
+    def _getInterval(self, path):
+        return self.PING_INTERVAL
+
     def sendPings(self, now=None):
         if now is None: now = time.time()
         servers = self.directory.getAllServers()
@@ -411,8 +803,9 @@
 class TwoHopPingGenerator:
     """DOCDOC"""
     #XXXX008 make this configurable, but not less than 2 hours.
-    PING_INTERVAL = 7*24*60*60
-    PERIOD = 7*24*60*60
+    DULL_INTERVAL = 4*ONE_DAY
+    INTERESTING_INTERVAL = ONE_DAY
+    PERIOD = 8*ONE_DAY
     def __init__(self, config):
         PingGenerator.__init__(self, config)
         _PingScheduler.__init__(self)
@@ -430,6 +823,27 @@
     def _getPeriodStart(self, t):
         return previousMidnight(t)
 
+    def _getPingInterval(self, path):
+        stats = self.pingLog.getLatestStatistics()
+        if stats is None:
+            return self.DULL_INTERVAL
+        pStr = ",".join(path)
+        nSent = stats.summary.nSent.get(pStr,0)
+        nRcvd = stats.summary.nRcvd.get(pStr,0)
+        assert nRcvd <= nSent
+        if nSent < 3 and nRcvd < 1:
+            return self.INTERESTING_INTERVAL
+        try:
+            rel1 = stats.summary.reliability[path[0]]
+            rel2 = stats.summary.reliability[path[1]]
+        except KeyError:
+            return self.DULL_INTERVAL
+
+        if float(nRcvd)/nSent <= rel1*rel2*0.3:
+            return self.INTERESTING_INTERVAL
+        else:
+            return self.DULL_INTERVAL
+
     def sendPings(self, now=None):
         if now is None: now = time.time()
         servers = self.directory.getAllServers()
@@ -520,5 +934,6 @@
         return CompoundPingGenerator([])
     pingers = []
     pingers.append(OneHopPingGenerator(config))
+    pingers.append(TwoHopPingGenerator(config))
     pingers.append(TestLinkPaddingGenerator(config))
     return CompoundPingGenerator(pingers)

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.132
retrieving revision 1.133
diff -u -d -r1.132 -r1.133
--- ServerMain.py	27 Jul 2004 04:42:33 -0000	1.132
+++ ServerMain.py	27 Jul 2004 23:12:16 -0000	1.133
@@ -878,7 +878,7 @@
             LOG.debug("Initializing ping log")
             pingerDir = os.path.join(config.getWorkDir(), "pinger")
             pingerLogDir = os.path.join(pingerDir, "log")
-            self.pingLog = mixminion.server.Pinger.PingLog(pingerLogDir)
+            self.pingLog = mixminion.server.Pinger.PingStatusLog(pingerLogDir)
             self.pingLog.startup()
 
             LOG.debug("Initializing ping generator")
@@ -1034,6 +1034,11 @@
                 now+self.pingLog.HEARTBEAT_INTERVAL,
                 self.pingLog.heartbeat,
                 self.pingLog.HEARTBEAT_INTERVAL))
+            self.scheduleEvent(RecurringBackgroundEvent(
+                now+3*60,
+                self.processingThread.addJob,
+                self.pingLog.calculateStatistics,
+                27*60*60))
 
         # Makes next update get scheduled.
         nextUpdate = self.updateDirectoryClient()