[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] More pinger work. Almost done!
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv18077/lib/mixminion/server
Modified Files:
Pinger.py ServerConfig.py ServerKeys.py ServerMain.py
Log Message:
More pinger work. Almost done!
- Make nearly all intervals configurable
- Change update logic to handle the time between the last connection
attempt and the end of the day correctly.
- Actually delete ancient data as promised.
- Schedule pings based on server-held secret, to make ping timing less
predictable.
Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -d -r1.19 -r1.20
--- Pinger.py 17 Dec 2004 20:40:04 -0000 1.19
+++ Pinger.py 20 Dec 2004 04:16:21 -0000 1.20
@@ -46,11 +46,9 @@
except ImportError:
sqlite = None
-KEEP_HISTORY_DAYS = 15
HEARTBEAT_INTERVAL = 30*60
ONE_DAY = 24*60*60
-
class IntervalSchedule:
"""DOCDOC -- defines a set of intervals in time."""
def __init__(self):
@@ -322,6 +320,7 @@
broken = {}
interesting = {}
for s1, s2, b, i in res:
+ if s1 == '<self>' or s2 == '<self>': continue
p = "%s,%s"%(s1,s2)
if b:
broken[p]=1
@@ -333,10 +332,11 @@
self._interestingChains = interesting
def updateServers(self, names):
+ #XXXX008 call when a new directory arrives.
self._lock.acquire()
try:
for n in names:
- self._addServer(n)
+ self._getServerID(n)
finally:
self._lock.release()
@@ -380,13 +380,25 @@
assert len(r) == 1
return r[0][0]
- def rotate(self, now=None):
- if now is None: now = time.time()
- cutoff = self._time(now - KEEP_HISTORY_DAYS * ONE_DAY)
+ def rotate(self, dataCutoff, resultsCutoff):
+ #if now is None: now = time.time()
+ #sec = config['Pinging']
+ #dataCutoff = self._time(now - sec['RetainPingData'])
+ #resultsCutoff = self._time(now - sec['RetainPingResults'])
+
cur = self._db.getCursor()
- cur.execute("DELETE FROM myLifespan WHERE stillup < %s", cutoff)
- cur.execute("DELETE FROM ping WHERE sentat < %s", cutoff)
- cur.execute("DELETE FROM connectionAttempt WHERE at < %s", cutoff)
+ cur.execute("DELETE FROM myLifespan WHERE stillup < %s", dataCutoff)
+ cur.execute("DELETE FROM ping WHERE sentat < %s", dataCutoff)
+ cur.execute("DELETE FROM connectionAttempt WHERE at < %s", dataCutoff)
+
+ cur.execute("DELETE FROM uptime WHERE interval IN "
+ "( SELECT id FROM statsInterval WHERE endAt < %s )",
+ resultsCutoff)
+ cur.execute("DELETE FROM echolotOneHopResult WHERE interval IN "
+ "( SELECT id FROM statsInterval WHERE endAt < %s )",
+ resultsCutoff)
+ cur.execute("DELETE FROM statsInterval WHERE endAt < %s", resultsCutoff)
+
self._db.getConnection().commit()
def flush(self):
@@ -459,7 +471,8 @@
self.heartbeat(now)
timespan = IntervalSet( [(startTime, endTime)] )
- intervalID = self._getIntervalID(startTime, endTime)
+ calcIntervals = [ (s,e,self._getIntervalID(s,e)) for s,e in
+ self._intervals.getIntervals(startTime,endTime)]
cur.execute("SELECT startup, stillup, shutdown FROM myLifespan WHERE "
"startup <= %s AND stillup >= %s",
@@ -468,22 +481,22 @@
myIntervals = IntervalSet([ (start, max(end,shutdown))
for start,end,shutdown in cur ])
myIntervals *= timespan
- myUptime = myIntervals.spanLength()
- fracUptime = float(myUptime)/(endTime-startTime)
- self._setUptime(
- (intervalID, self._getServerID("<self>")),
- (fracUptime,))
+ for s, e, i in calcIntervals:
+ uptime = (myIntervals * IntervalSet([(s,e)])).spanLength()
+ fracUptime = float(uptime)/(e-s)
+ self._setUptime((i, self._getServerID("<self>")), (fracUptime,))
# Okay, now everybody else.
for s, serverID in self._serverIDs.items():
+ if s == '<self>': continue
cur.execute("SELECT at, success FROM connectionAttempt"
" WHERE server = %s AND at >= %s AND at <= %s"
" ORDER BY at",
serverID, startTime, endTime)
+ intervals = [[], []] #uptimes, downtimes
lastStatus = None
lastTime = None
- times = [ 0, 0 ] # uptime, downtime
for at, success in cur:
assert success in (0,1)
upAt, downAt = myIntervals.getIntervalContaining(at)
@@ -496,16 +509,24 @@
lastTime = upAt
lastStatus = None
if lastStatus is not None:
- t = (at-lastTime)/2.0
- times[success] += t
- times[lastStatus] += t
+ t = (at+lastTime)/2.0
+ intervals[lastStatus].append((lastTime,t))
+ intervals[success].append((t,at))
lastStatus = success
lastTime = at
+ downIntervals = IntervalSet(intervals[0])
+ upIntervals = IntervalSet(intervals[1])
+ downIntervals *= myIntervals
+ upIntervals *= myIntervals
- if times == [0,0]:
- continue
- fraction = float(times[1])/(times[0]+times[1])
- self._setUptime((intervalID, serverID), (fraction,))
+ for s,e,intervalID in calcIntervals:
+ uptime = (upIntervals*IntervalSet([(s,e)])).spanLength()
+ downtime = (downIntervals*IntervalSet([(s,e)])).spanLength()
+ if s == 'foobart': print uptime, downtime
+ if uptime < 1 and downtime < 1:
+ continue
+ fraction = float(uptime)/(uptime+downtime)
+ self._setUptime((intervalID, serverID), (fraction,))
def calculateUptimes(self, startAt, endAt, now=None):
if now is None: now = time.time()
@@ -515,8 +536,7 @@
finally:
self._lock.release()
serverNames.sort()
- for s, e in self._intervals.getIntervals(startAt, endAt):
- self._calculateUptimes(serverNames, s, e, now=now)
+ self._calculateUptimes(serverNames, startAt, endAt, now=now)
self._db.getConnection().commit()
def getUptimes(self, startAt, endAt):
@@ -681,6 +701,7 @@
serverNames.sort()
reliability = {}
for s in serverNames:
+ if s == '<self>': continue
# For now, always calculate overall results.
r = self._calculateOneHopResult(s,now,now,now,
calculateOverallResults=1)
@@ -738,7 +759,9 @@
serverNames.sort()
for s1 in serverNames:
+ if s1 == '<self>': continue
for s2 in serverNames:
+ if s2 == '<self>': continue
p = "%s,%s"%(s1,s2)
nS, nR, prod, isBroken, isInteresting = \
self._calculate2ChainStatus(since, s1, s2)
@@ -803,6 +826,7 @@
"ORDER BY name, startat", (since, now))
lastServer = "---"
for n,s,e,nS,nR,lat,r in cur:
+ if s == '<self>': continue
if n != lastServer:
if lastServer != '---': print >>f, " ],"
lastServer = n
@@ -817,6 +841,7 @@
"echolotCurrentOneHopResult, server WHERE "
"echolotCurrentOneHopResult.server = server.id")
for n,lat,r in cur:
+ if n == '<self>': continue
print >>f, " %r : (%s,%.04f)," %(n,lat,r)
print >>f, "}"
@@ -826,6 +851,7 @@
" server as S1, server as S2 WHERE "
"interesting = 1 AND S1.id = server1 AND S2.id = server2")
for s1,s2 in cur:
+ if s1 == '<self>' or s2 == '<self>': continue
print >>f, " '%s,%s',"%(s1,s2)
print >>f, "]"
@@ -835,6 +861,7 @@
" server as S1, server as S2 WHERE "
"broken = 1 AND S1.id = server1 AND S2.id = server2")
for s1,s2 in cur:
+ if s1 == '<self>' or s2 == '<self>': continue
print >>f, " '%s,%s',"%(s1,s2)
print >>f, "]"
print >>f, "\n"
@@ -903,36 +930,47 @@
class _PingScheduler:
def __init__(self):
self.nextPingTime = {}#path->when
- # PERIOD
- # PING_INTERVAL
def connect(self, directory, outgoingQueue, pingLog, keyring):
self.directory = directory
self.outgoingQueue = outgoingQueue
self.pingLog = pingLog
self.keyring = keyring
+ self.seed = keyring.getPingerSeed()
+ def _calcPeriodLen(self, interval):
+ period = ONE_DAY
+ while period < interval*2:
+ period *= 2
+ return period
def scheduleAllPings(self, now=None):
raise NotImplemented()
def _getPeriodStart(self, t):
raise NotImplemented()
def _getPingInterval(self, path):
raise NotImplemented()
+ def _getPeriodLength(self):
+ raise NotImplemented()
def _schedulePing(self,path,now=None):
if now is None: now = int(time.time())
periodStart = self._getPeriodStart(now)
+ periodEnd = periodStart + self._period_length
+
interval = self._getPingInterval(path)
t = periodStart + self._getPerturbation(path, periodStart, interval)
t += interval * ceilDiv(now-t, interval)
- if t>periodStart+self.PERIOD:
- t = periodStart+self.PERIOD+self._getPerturbation(path,
- periodStart+self.PERIOD,
- interval)
+ if t>periodEnd:
+ t = periodEnd+self._getPerturbation(path,
+ periodEnd,
+ interval)
self.nextPingTime[path] = t
LOG.trace("Scheduling %d-hop ping for %s at %s", len(path),
",".join(path), formatTime(t,1))
return t
def _getPerturbation(self, path, periodStart, interval):
- #XXXX add a secret seed
- sha = mixminion.Crypto.sha1(",".join(path) + "@@" + str(interval))
+ sha = mixminion.Crypto.sha1("%s@@%s@@%s"%(",".join(path),
+ interval,
+ self.seed))
+ # This modulo calculation biases the result, but less than 0.1 percent,
+ # so I don't really care.
sec = abs(struct.unpack("I", sha[:4])[0]) % interval
return sec
@@ -944,12 +982,11 @@
class OneHopPingGenerator(_PingScheduler,PingGenerator):
"""DOCDOC"""
- #XXXX008 make this configurable, but not less than 2 hours.
- PING_INTERVAL = 2*60*60
- PERIOD = ONE_DAY
def __init__(self, config):
PingGenerator.__init__(self, config)
_PingScheduler.__init__(self)
+ self._ping_interval = config['Pinging']['ServerPingPeriod']
+ self._period_length = self._calcPeriodLen(self._pingInterval)
def scheduleAllPings(self, now=None):
if now is None: now = int(time.time())
@@ -964,7 +1001,7 @@
return previousMidnight(t)
def _getPingInterval(self, path):
- return self.PING_INTERVAL
+ return self._ping_interval
def sendPings(self, now=None):
if now is None: now = time.time()
@@ -991,13 +1028,13 @@
class TwoHopPingGenerator(_PingScheduler, PingGenerator):
"""DOCDOC"""
- #XXXX008 make this configurable, but not less than 2 hours.
- DULL_INTERVAL = 4*ONE_DAY
- INTERESTING_INTERVAL = ONE_DAY
- PERIOD = 8*ONE_DAY
def __init__(self, config):
PingGenerator.__init__(self, config)
_PingScheduler.__init__(self)
+ self._dull_interval = self['Pinging']['DullChainPingPeriod'].getSeconds()
+ self._interesting_interval = self['Pinging']['ChainPingPeriod'].getSeconds()
+ self._period_length = self._calcPeriodLen(
+ max(self._interesting_interval,self._dull_interval))
def scheduleAllPings(self, now=None):
if now is None: now = time.time()
@@ -1014,9 +1051,9 @@
def _getPingInterval(self, path):
if self.pingLog._interestingChains.get(path, 0):
- return self.INTERESTING_INTERVAL
+ return self._interesting_interval
else:
- return self.DULL_INTERVAL
+ return self._dull_interval
def sendPings(self, now=None):
if now is None: now = time.time()
@@ -1047,9 +1084,10 @@
"""DOCDOC"""
def __init__(self, config):
PingGenerator.__init__(self,config)
- interval = config['Server']['MixInterval'].getSeconds()
- if interval < 60*60:
- self.prob = interval / float(60*60)
+ mixInterval = config['Server']['MixInterval'].getSeconds()
+ probeInterval = config['Pinging']['ServerProbePeriod'].getSeconds()
+ if mixInterval < probeInterval:
+ self.prob = mixInterval / float(probeInterval)
else:
self.prob = 1.0
def connect(self, directory, outgoingQueue, pingLog, keyring):
Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.58
retrieving revision 1.59
diff -u -d -r1.58 -r1.59
--- ServerConfig.py 24 Aug 2004 22:16:09 -0000 1.58
+++ ServerConfig.py 20 Dec 2004 04:16:21 -0000 1.59
@@ -357,7 +357,18 @@
'MaxBandwidth' : ('ALLOW', "size", None),
'MaxBandwidthSpike' : ('ALLOW', "size", None),
},
- 'Pinging' : { 'Enabled' : ('ALLOW', 'boolean', 'yes') },
+ #DOCDOC
+ 'Pinging' : { 'Enabled' : ('ALLOW', 'boolean', 'yes'),
+ 'RecomputeInterval' : ('ALLOW', 'interval', '30 min'),
+ 'ServerPingPeriod' : ('ALLOW', 'interval', '2 hours'),
+ 'DullChainPingPeriod' : ('ALLOW', 'interval', '4 days'),
+ 'ChainPingPeriod' : ('ALLOW', 'interval', '1 day'),
+ 'ServerProbePeriod' : ('ALLOW', 'interval', '1 hour'),
+ # XXXX008 enforce > 15 days.
+ 'RetainData' : ('ALLOW', 'interval', '30 days'),
+ # XXXX008 enforce > 15 days
+ 'RetainResults' : ('ALLOW', 'interval', '1 year'),
+ },
'DirectoryServers' : { # '__SECTION__' : ('REQUIRE', None, None),
'ServerURL' : ('ALLOW*', None, None),
'PublishURL' : ('ALLOW*', None, None),
Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.70
retrieving revision 1.71
diff -u -d -r1.70 -r1.71
--- ServerKeys.py 6 Dec 2004 20:02:23 -0000 1.70
+++ ServerKeys.py 20 Dec 2004 04:16:21 -0000 1.71
@@ -103,6 +103,7 @@
self.currentKeys = None
self._tlsContext = None #DOCDOC
self._tlsContextExpires = -1 #DOCDOC
+ self.pingerSeed = None
self.checkKeys()
def checkKeys(self):
@@ -243,6 +244,24 @@
return key
+ def getPingerSeed(self):
+ """DOCDOC"""
+ if self.pingerSeed is not None:
+ return self.pingerSeed
+
+ fn = os.path.join(self.keyDir, "pinger.seed")
+ if os.path.exists(fn):
+ checkPrivateFile(fn)
+ r = readFile(fn)
+ if len(r) == mixminion.Crypto.DIGEST_LEN:
+ self.pingerSeed = r
+ return r
+
+ self.pingerSeed = r = mixminion.Crypto.trng(mixminion.Crypto.DIGEST_LEN)
+ createPrivateDir(self.keyDir)
+ writeFile(fn, r, 0600)
+ return r
+
def getIdentityKeyDigest(self):
"""DOCDOC"""
k = self.getIdentityKey()
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.141
retrieving revision 1.142
diff -u -d -r1.141 -r1.142
--- ServerMain.py 13 Dec 2004 01:06:43 -0000 1.141
+++ ServerMain.py 20 Dec 2004 04:16:21 -0000 1.142
@@ -1004,13 +1004,11 @@
mixminion.server.Pinger.HEARTBEAT_INTERVAL))
# FFFF if we aren't using a LOCKING_IS_COURSE database, we will
# FFFF still want this to happen in another thread.
- #XXXX008 use symbolic constants here
self.scheduleEvent(RecurringEvent(
- now+1*60, #3*60
+ now+60,
lambda self=self: self.pingLog.calculateAll(
- os.path.join(self.config.getWorkDir(), "pinger", "status")),
- #1*60*60))
- 10*60))
+ os.path.join(self.config.getWorkDir(), "pinger", "status")),
+ self.config['Pinging']['RecomputeInterval']))
# Makes next update get scheduled.
nextUpdate = self.updateDirectoryClient()
@@ -1123,7 +1121,9 @@
self.outgoingQueue.cleanQueue(df)
self.moduleManager.cleanQueues(df)
if self.pingLog:
- self.pingLog.rotate()
+ now = time.time()
+ self.pingLog.rotate(now-self.config['Pinging']['RetainData'],
+ now-self.config['Pinging']['RetainResults'])
def close(self):
"""Release all resources; close all files."""