[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Actually calculate ping statistics and generate two-hop pings at a frequency based on these stats.
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv23752/lib/mixminion/server
Modified Files:
Pinger.py ServerMain.py
Log Message:
Actually calculate ping statistics and generate two-hop pings at a frequency based on these stats.
Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -d -r1.1 -r1.2
--- Pinger.py 27 Jul 2004 04:33:20 -0000 1.1
+++ Pinger.py 27 Jul 2004 23:12:16 -0000 1.2
@@ -16,7 +16,9 @@
are.
"""
+import bisect
import calendar
+import cPickle
import os
import re
import struct
@@ -34,13 +36,16 @@
formatFnameDate, formatTime, LOG, parseFnameDate, previousMidnight, \
secureDelete
+KEEP_HISTORY_DAYS = 15
+USE_HISTORY_DAYS = 12
+HEARTBEAT_INTERVAL = 30*60
+ONE_DAY = 24*60*60
+
class PingLog:
"""DOCDOC
stores record of pinger events
"""
- HISTORY_DAYS = 12
HEARTBEAT_INTERVAL = 30*60
-
def __init__(self, location):
createPrivateDir(location)
self.location = location
@@ -49,21 +54,29 @@
self.lock = threading.RLock()
self.rotate()
+ def _getDateString(self, now):
+ return formatFnameDate(now)
+
def rotate(self,now=None):
self.lock.acquire()
try:
- date = formatFnameDate(now)
+ date = self._getDateString(now)
if self.file is not None:
if self.fname.endswith(date):
# no need to rotate.
return
self.rotated()
self.close()
+ self._rotateHook()
self.fname = os.path.join(self.location, "events-"+date)
self.file = open(self.fname, 'a')
+ self.rotated()
finally:
self.lock.release()
+ def _rotateHook(self):
+ pass
+
def close(self):
self.lock.acquire()
try:
@@ -73,6 +86,23 @@
finally:
self.lock.release()
+ def _parseFname(self,fn):
+ if not fn.startswith("events-"):
+ return None,None
+ try:
+ date = parseFnameDate(fn[7:15])
+ except ValueError:
+ return Nne,None
+ tp = fn[15:]
+ if tp == "":
+ return date,"log"
+ elif tp == ".stat.gz":
+ return date,"stat"
+ elif tp == ".pend.gz":
+ return date,"pend"
+ else:
+ return None,None
+
def clean(self, now=None, deleteFn=None):
if now is None:
now = time.time()
@@ -80,19 +110,24 @@
try:
self.rotate(now)
bad = []
- cutoff = previousMidnight(now) - 24*60*60*(self.HISTORY_DAYS+1)
+ lastPending = None
+ cutoff = previousMidnight(now) - ONE_DAY*(KEEP_HISTORY_DAYS)
+ filenames = os.listdir(self.location)
+ filenames.sort() # consider files in order of time.
for fn in os.listdir(self.location):
- if not fn.startswith("events-"):
- continue
- try:
- date = parseFnameDate(fn[7:])
- except ValueError:
- LOG.warn("Bad events filename %r; removing", fn)
- bad.append(os.path.join(self.location, fn))
+ date,type = self._parseFname(fn)
+ if not date:
+ LOG.warn("Unrecognized events file %s",fn)
continue
- if date < cutoff:
+ elif date < cutoff:
LOG.debug("Removing expired events file %s", fn)
bad.append(os.path.join(self.location, fn))
+ elif type == "pend":
+ if self.lastPending:
+ LOG.debug("Removing old pending-pings file %s",
+ lastPending)
+ bad.append(os.path.join(self.location,lastPending))
+ self.lastPending = fn
if deleteFn:
deleteFn(bad)
else:
@@ -108,6 +143,9 @@
finally:
self.lock.release()
+ def _event(self,tm,event):
+ pass
+
def _write(self,*msg):
self.lock.acquire()
try:
@@ -115,7 +153,7 @@
m = "%s %s\n" % (formatTime(now)," ".join(msg))
self.file.write(m)
self.file.flush()
- # XXXX self.events.append((now, msg))
+ self._event(now, msg)
finally:
self.lock.release()
@@ -162,6 +200,159 @@
LOG.debug("Received valid ping packet [%s]",formatBase64(addr))
self.gotPing(addr)
+ def calculateStatistics(self,now=None):
+ pass
+
+ def getLatestStatistics(self):
+ return None
+
+class PingStatusLog(PingLog):
+ MAX_PING_AGE = 12 * ONE_DAY
+
+ def __init__(self, location):
+ PingLog.__init__(self,location)
+ self.pingStatus = None
+ self.latestStatistics = None
+ self._loadDepth=0
+ self._loadPingStatus()
+
+ def _event(self,tm,event):
+ # lock is held.
+ self.pingStatus.addEvent(tm,event)
+
+ def _rotateHook(self,fname=None):
+ # hold lock
+ if fname is None:
+ fname = self.fname
+ pr = self.pingStatus.splitResults(_nocopy=1)
+ pend = pr.pendingPings
+ pr.pendingPings=None
+ # separate these for space savings.
+ writePickled(fname+".pend.gz",pend,gzipped=1)
+ writePickled(fname+".stat.gz",pr,gzipped=1)
+
+ def _rescanImpl(self):
+ # hold lock; restore pingStatus.
+ fnames = [ fn for fn in os.listdir(self.location) if
+ self._parseFname(fn)[1] == 'log' ]
+ fnames.sort() # go by date.
+
+ self.pingStatus = ps = PingStatus()
+ for fn in fnames[:-1]:
+ f = open(fn, 'r')
+ try:
+ ps.addFile(f)
+ finally:
+ f.close()
+ if fn != self.fname:
+ self._rotateHook(fn)
+
+ def _loadPingStatusImpl(self, binFname, pendFname, logFname):
+ # lock is held if any refs to objects are held.
+ if binFname:
+ results = readPickled(os.path.join(location,binFname),gzipped=1)
+ if results._version != PeriodResults.MAGIC:
+ return 1
+ pending = readPickled(os.path.join(location,pendFname),gzipped=1)
+ results.pendingPings=pending
+ ps = PingStatus(results)
+ else:
+ ps = PingStatus()
+ if logFname and os.path.exists(os.path.join(location,logFname)):
+ f = open(os.path.join(location,logFname), 'r')
+ ps.addFile(f)
+ f.close()
+ self.pingStatus = ps
+ return 0
+
+ def _loadPingStatus(self):
+ # lock is held if any refs to objects are held.
+ dateSet = {}
+ for fn in os.listDir(self.location):
+ date, tp = self._parseFname(fn)
+ dateSet.setdefault(date,{})[tp]=fn
+ dates = dateSet.keys()
+ dates.sort()
+ rescan = 0
+ # All files but the current one should have stat. The last stat
+ # should have a pend. Otherwise, rescan.
+ lastStat = None
+ for d in dates:
+ l,s=dateSet[d].get('log'), dateSet[d].get('stat')
+ if not l or l == self.fname:
+ continue
+ if not s:
+ rescan = 1
+ lastStat = d
+ if lastStat and not dateSet[lastStat].has_key('pend'):
+ rescan = 1
+
+ if rescan:
+ if self._loadDepth:
+ raise MixError("Recursive rescan")
+ self._rescanImpl()
+ self._loadDepth += 1
+ try:
+ self._loadPingStatus()
+ finally:
+ self._loadDepth -= 1
+ return
+
+ try:
+ if lastStat:
+ rescan = self._loadPingStatusImpl(dateSet[lastStat]['stat'],
+ dateSet[lastStat]['pend'],
+ self.fname)
+ else:
+ rescan = self._loadPingStatusImpl(None,None,self.fname)
+ except (cPickle.UnpicklingError, OSError, ValueError):
+ rescan = 1
+
+ if rescan:
+ #XXXX duplicate code.
+ if self._loadDepth:
+ raise MixError("Recursive rescan")
+ self._rescanImpl()
+ self._loadDepth += 1
+ try:
+ self._loadPingStatus()
+ finally:
+ self._loadDepth -= 1
+
+ def calculateStatistics(self,now=None):
+ if now is None:
+ now = time.time()
+ self.rotate()
+ cutoff = previousMidnight(now)-KEEP_HISTORY_DAYS*ONE_DAY
+ stats = []
+ for fn in os.listDir(self.location):
+ date, type = self._parseFname(fn)
+ if type != 'stat': continue
+ if date < cutoff: continue
+ if fn == self.fname: continue
+ stats.append((date,
+ readPickle(os.path.join(self.location,fn),gzipped=1)))
+ stats.sort()
+ stats = [ pr for _,pr in stats ]
+ self.lock.acquire()
+ try:
+ stats.append(self.pingStatus.checkPointResults())
+ finally:
+ self.lock.release()
+
+ stats = calculatePingResults(stats, now)
+
+ self.lock.acquire()
+ try:
+ self.latestStatistics = stats
+ finally:
+ self.lock.release()
+
+ return stats
+
+ def getLatestStatistics(self):
+ return self.latestStatistics
+
def iteratePingLog(file, func):
_EVENT_ARGS = {
"STARTUP" : 0,
@@ -195,30 +386,96 @@
continue
func(tm, event)
-def readEventLog(file):
- result = []
- iterateEventLog(file, lambda tm, event: result.append((tm,event)))
- return result
+def readPingLog(file):
+ results = []
+ iteratePingLog(file, lambda t,e,r=results:r.append(t,e))
+ return results
+
+class PeriodResults:
+ MAGIC = "PeriodResults-0.0"
+ def __init__(self, start, end, liveness, serverUptime, serverStatus,
+ pings, pendingPings):
+ self.start = start
+ self.end = end
+ self.liveness = liveness
+ self.serverUptime = serverUptime # nickname->n_sec.
+ self.serverDowntime = serverDowntime # nickname->n_sec.
+ self.serverStatus = serverStatus
+ self.pings = pings # must be a copy.
+ self.pendingPings = pendingPings # must be a copy.
+ self._version = self.MAGIC
class PingStatus:
- def __init__(self):
- self.serverStatus = {} #"U"/"D",as-of
+ def __init__(self, lastResults=None):
+ if lastResults is not None:
+ # must copy
+ self.start = lastResults.start
+ self.liveness = lastResults.liveness
+ self.serverUptime = lastResults.serverUptime
+ self.pings = lastResults.pings
+ self.pendingPings = lastResults.pendingPings
+ else:
+ self.start = None
+ self.liveness = 0 # n_sec
+ self.serverUptime = {} # map from nickname->n_sec
+ self.serverDowntime = {} # map from nickname->n_sec
+ self.pings = {} # map from path->[(sent,received)...]
+ self.pendingPings = {} # map from pinghash->(sent,path)
+
+ self.lastEventTime = None
+ self.serverStatus = {} # nickname->"U"/"D", as-of
+ self.lastUpdated = None
+
+ def checkpointResults(self, _nocopy=0):
+ if self.lastEventTime is None:
+ return None
+ if _nocopy:
+ c = lambda x:x
+ else:
+ c = lambda x:x.copy()
+ self.update(self.lastEventTime)
+ return PeriodResults(self.start, self.lastEventTime,
+ self.liveness, c(self.serverUptime),
+ c(self.serverDowntime),
+ c(self.serverStatus),
+ c(self.pings),
+ c(self.pendingPings))
+
+ def splitResults(self, _nocopy=0):
+ if self.lastEventTime is None:
+ return None
+ pr = self.checkpointResults(_nocopy=1)
+ if _nocopy:
+ c = lambda x:x
+ else:
+ c = lambda x:x.copy()
self.serverUptime = {}
self.serverDowntime = {}
- self.pendingPings = {} #hash64->sent,path
- self.lastEvent = None
+ self.serverStatus = c(self.serverStatus)
+ self.pings = self.pings
+ self.pendingPings = c(self.pendingPings)
+
+ def expirePings(self, cutoff):
+ for h,(sent,path) in self.pendingPings.items():
+ if sent<cutoff:
+ self.pings[path].setdefault(path,[]).append((sent,None))
+ del self.pendingPings[h]
+
def addEvent(self, tm, event):
eType = event[0]
if eType == 'PING':
self.pendingPings[event[1]] = tm, event[2]
elif eType == 'GOT_PING':
+ h = event[1]
try:
- tSent, path = self.pendingPings[event[1]]
+ tSent, path = self.pendingPing[h]
except KeyError:
# we didn't send it, or can't remember sending it.
- pass
+ LOG.warn("Received a ping I don't remember sending (%s)",
+ event[1])
else:
- self.pingDone(path, tSent, tm)
+ del self.pendingPings[event[1]]
+ self.pings.setdefault(path, []).append((tSent,tm))
elif eType == 'CONNECTED':
try:
s, tLast = self.serverStatus[event[1]]
@@ -246,43 +503,171 @@
elif eType == 'SHUTDOWN':
self.diedAt(tm)
elif eType == 'STARTUP':
- if self.lastEvent:
- self.diedAt(self.lastEvent[0])
-
- self.lastEvent = (tm, event)
+ if self.lastEventTime:
+ self.diedAt(self.lastEventTime)
- def pingDone(self, path, queuedAt, receivedAt):
- servers = path.split(",")
- if len(servers) == 1:
- self.oneHopPingDone(servers[0], queuedAt, receivedAt)
- elif len(servers) == 2:
- self.twoHopPingDone(servers, queuedAt, receivedAt)
- else:
- pass # never made now.
+ if self.start is None:
+ self.start = tm
- def oneHopPingDone(self, nickname, queuedAt, receivedAt):
- pass
+ self.lastEventTime = tm
+ def addFile(self, file):
+ iteratePingLog(file, self.addEvent)
- def diedAt(self, diedAt):
+ def update(self, liveAt):
for nickname, (status, tLast) in self.serverStatus.items():
if status == 'U':
m = self.serverUptime
else:
m = self.serverDowntime
+ if liveAt < tLast: continue
try:
- m[nickname] += diedAt-tLast
+ m[nickname] += liveAt-tLast
except KeyError:
- m[nickname] = diedAt-tLast
+ m[nickname] = liveAt-tLast
+
+ if self.lastUpdated is not None and self.lastUpdated < liveAt:
+ self.liveness += liveAt-self.last
+
+ self.lastUpdated = liveAt
+
+ def diedAt(self, diedAt):
+ self.update(diedAt)
self.serverStatus = {}
- self.lastEvent = None
+ self.lastEventTime = None
- def getNetworkStatus(self):
- nicknames = {}
- for n in self.serverUptime.keys(): nicknames[n]=1
- for n in self.serverDowntime.keys(): nicknames[n]=1
+class OneDayPingResults:
+ #XXXX008 move to ClientDirectory?
+ def __init__(self):
+ self.start = 0
+ self.end = 0
+ self.uptime = {} # nickname->pct
+ self.reliability = {} # path->pct
+ self.latency = {} # path->avg
+
+class PingResults:
+ #XXXX008 move to ClientDirectory?
+ def __init__(self, days, summary):
+ self.days = days # list of OneDayPingResults
+ self.summary = summary
+
+GRACE_PERIOD = ONE_DAY
+WEIGHT_AGE = [ 5, 10, 10, 10, 10, 9, 8, 5, 3, 2, 2, 1, 0, 0, 0, 0, 0 ]
+
+def calculatePingResults(periods, endAt):
+ startAt = previousMidnight(endAt) - ONE_DAY*(USE_HISTORY_DAYS)
+
+ results = [ OneDayPingResults() for _ in xrange(USE_HISTORY_DAYS+1) ]
+ summary = OneDayPingResults()
+ summary.start = startAt
+ summary.end = endAt
+ for idx in xrange(USE_HISTORY_DAYS+1):
+ results[idx].start = startAt+(ONE_DAY*idx)
+ results[idx].end = startAt+(ONE_DAY*idx) - 1
+
+ pingsByDay = [ {} for _ in xrange(USE_HISTORY_DAYS+1) ]
+ allPaths = {}
+ for p in periods:
+ for paths,timings in p.pings.items():
+ allPaths[path]=1
+ for send,recv in timings:
+ day = floorDiv(send-startAt, ONE_DAY)
+ if day<0: continue
+ pingsByDay[day].setdefault(path,[]).append((send,recv))
+ for send,path in periods[-1].values():
+ if send+GRACE_PERIOD > endAt:
+ continue
+ day = floorDiv(send-startAt, ONE_DAY)
+ if day<0: continue
+ pingsByDay[day].setdefault(path,[]).append((send,None))
+ allPaths[path]=1
+
+ maxDelay = {}
+ delays = {}
+ summary.nSent = {}
+ summary.nRcvd = {}
+ for path in allPaths.keys():
+ maxDelay[path] = 0
+ delays[path] = []
+ summary.nSent[path]=0
+ summary.nRcvd[path]=0
+ for idx in xrange(KEEP_HISTORY_DAYS+1):
+ pbd = pingsByDay[idx]
+ for path, pings in pdb.keys():
+ nRcvd = 0
+ nLost = 0
+ totalDelay = 0.0
+ for (send,recv) in pings:
+ if recv is None:
+ nLost += 1
+ continue
+ else:
+ nRcvd += 1
+ delay = recv-send
+ totalDelay += delay
+ if delay > maxDelay[path]:
+ maxDelay[path]=delay
+ delays[path].append(delay)
+ results[idx].reliability[path] = float(nRcvd)/(nRcvd+nLost)
+ results[idx].latency[path] = totalDelay/nRcvd
+ summary.nSent[path] += len(pings)
+ summary.nRcvd[path] += nRcvd
+
+ totalWeight = {}
+ totalWeightedDelay = {}
+ for path in allPaths.keys():
+ delays[path].sort()
+ totalWeight[path] = 0
+ totalWeightReceived[path] = 0
+ for idx in xrange(HISTORY_DAYS+1):
+ pbd = pingsByDay[idx]
+ weightAge = WEIGHT_AGE[-idx]
+ for path, pings in pdb.keys():
+ if not delays[path]:
+ continue
+ d = delays[path]
+ for send,recv in pings:
+ if recv is not None:
+ totalWeightReceived[path] += weightAge
+ totalWeight[path] += weightAge
+ else:
+ fracMax = (endAt-send-15*60) * 0.8
+ weightLatent = (bisect.bisect(d, fracMax)/len(d))
+ totalWeight[path] += weightLatent*weightAge
+
+ for path in allPaths.keys():
+ if not totalWeight[path]:
+ summary.reliability[path]=None
+ continue
+ summary.reliability[path] = (
+ (float(totalWeightReceived[path])/totalWeight[path]))
+ d = delays[path]
+ summary.latency[path] = d[floorDiv(len(d),2)]
+
+ allServers = {}
+ for p in periods:
+ for s in p.serverUptime.keys(): allServers[s]=1
+ for s in p.serverDowntime.keys(): allServers[s]=1
+
+ for s in allServers.keys():
+ upTotal = 0
+ downTotal = 0
+ for p in periods:
+ day = floorDiv(p.start-startAt, ONE_DAY)
+ if day<0: continue
+ up = p.serverUptime.get(s,0)
+ down = p.serverUptime.get(s,0)
+ upTotal += up
+ downTotal += down
+ if up+down < 60*60:
+ continue
+ results[day].uptime[s] = float(up)/(up+down)
+ if upTotal+downTotal < 60*60: continue
+ summary.uptime[s] = float(upTotal)/(upTotal+downTotal)
+
+ return PingResults(results, s)
class PingGenerator:
"""DOCDOC"""
@@ -292,6 +677,7 @@
self.pingLog = None
self.outgoingQueue = None
self.myNickname = config['Server']['Nickname']
+ self.latestStatistics = None
def connect(self, directory, outgoingQueue, pingLog, keyring):
pass
@@ -340,21 +726,25 @@
raise NotImplemented()
def _getPeriodStart(self, t):
raise NotImplemented()
+ def _getPingInterval(self, path):
+ raise NotImplemented()
def _schedulePing(self,path,now=None):
if now is None: now = time.time()
periodStart = _getPeriodStart(now)
- t = periodStart + self._getPerturbation(path, periodStart)
- t += self.PING_INTERVAL * ceilDiv(now-t, self.PING_INTERVAL)
+ interval = self._getPingInterval(path)
+ t = periodStart + self._getPerturbation(path, periodStart, interval)
+ t += interval * ceilDiv(now-t, interval)
if t>periodStart+self.PERIOD:
t = periodStart+self.PERIOD+self._getPerturbation(path,
- periodStart+self.PERIOD)
+ periodStart+self.PERIOD,
+ interval)
self.nextPingTime[path] = t
LOG.trace("Scheduling %d-hop ping for %s at %s", len(path),
",".join(path), formatTime(t,1))
return t
- def _getPerturbation(self, path, periodStart):
+ def _getPerturbation(self, path, periodStart, interval):
sha = mixminion.Crypto.sha1(",".join(path) + "@@" + str(day))
- sec = abs(struct.unpack("I", sha[:4])[0]) % self.PING_INTERVAL
+ sec = abs(struct.unpack("I", sha[:4])[0]) % interval
return sec
def getFirstPingTime(self):
@@ -363,12 +753,11 @@
else:
return None
-
class OneHopPingGenerator(PingGenerator,_PingScheduler):
"""DOCDOC"""
#XXXX008 make this configurable, but not less than 2 hours.
PING_INTERVAL = 2*60*60
- PERIOD = 24*60*60
+ PERIOD = ONE_DAY
def __init__(self, config):
PingGenerator.__init__(self, config)
_PingScheduler.__init__(self)
@@ -385,6 +774,9 @@
def _getPeriodStart(self, t):
return previousMidnight(t)
+ def _getInterval(self, path):
+ return self.PING_INTERVAL
+
def sendPings(self, now=None):
if now is None: now = time.time()
servers = self.directory.getAllServers()
@@ -411,8 +803,9 @@
class TwoHopPingGenerator:
"""DOCDOC"""
#XXXX008 make this configurable, but not less than 2 hours.
- PING_INTERVAL = 7*24*60*60
- PERIOD = 7*24*60*60
+ DULL_INTERVAL = 4*ONE_DAY
+ INTERESTING_INTERVAL = ONE_DAY
+ PERIOD = 8*ONE_DAY
def __init__(self, config):
PingGenerator.__init__(self, config)
_PingScheduler.__init__(self)
@@ -430,6 +823,27 @@
def _getPeriodStart(self, t):
return previousMidnight(t)
+ def _getPingInterval(self, path):
+ stats = self.pingLog.getLatestStatistics()
+ if stats is None:
+ return self.DULL_INTERVAL
+ pStr = ",".join(path)
+ nSent = stats.summary.nSent.get(pStr,0)
+ nRcvd = stats.summary.nRcvd.get(pStr,0)
+ assert nRcvd <= nSent
+ if nSent < 3 and nRcvd < 1:
+ return self.INTERESTING_INTERVAL
+ try:
+ rel1 = stats.summary.reliability[path[0]]
+ rel2 = stats.summary.reliability[path[1]]
+ except KeyError:
+ return self.DULL_INTERVAL
+
+ if float(nRcvd)/nSent <= rel1*rel2*0.3:
+ return self.INTERESTING_INTERVAL
+ else:
+ return self.DULL_INTERVAL
+
def sendPings(self, now=None):
if now is None: now = time.time()
servers = self.directory.getAllServers()
@@ -520,5 +934,6 @@
return CompoundPingGenerator([])
pingers = []
pingers.append(OneHopPingGenerator(config))
+ pingers.append(TwoHopPingGenerator(config))
pingers.append(TestLinkPaddingGenerator(config))
return CompoundPingGenerator(pingers)
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.132
retrieving revision 1.133
diff -u -d -r1.132 -r1.133
--- ServerMain.py 27 Jul 2004 04:42:33 -0000 1.132
+++ ServerMain.py 27 Jul 2004 23:12:16 -0000 1.133
@@ -878,7 +878,7 @@
LOG.debug("Initializing ping log")
pingerDir = os.path.join(config.getWorkDir(), "pinger")
pingerLogDir = os.path.join(pingerDir, "log")
- self.pingLog = mixminion.server.Pinger.PingLog(pingerLogDir)
+ self.pingLog = mixminion.server.Pinger.PingStatusLog(pingerLogDir)
self.pingLog.startup()
LOG.debug("Initializing ping generator")
@@ -1034,6 +1034,11 @@
now+self.pingLog.HEARTBEAT_INTERVAL,
self.pingLog.heartbeat,
self.pingLog.HEARTBEAT_INTERVAL))
+ self.scheduleEvent(RecurringBackgroundEvent(
+ now+3*60,
+ self.processingThread.addJob,
+ self.pingLog.calculateStatistics,
+ 27*60*60))
# Makes next update get scheduled.
nextUpdate = self.updateDirectoryClient()