[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] More pinger work. Almost done!



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv18077/lib/mixminion/server

Modified Files:
	Pinger.py ServerConfig.py ServerKeys.py ServerMain.py 
Log Message:
More pinger work.  Almost done!
- Make nearly all intervals configurable
- Change update logic to handle the time between the last connection
  attempt and the end of the day correctly.
- Actually delete ancient data as promised.
- Schedule pings based on a server-held secret, to make ping timing less
  predictable.


Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -d -r1.19 -r1.20
--- Pinger.py	17 Dec 2004 20:40:04 -0000	1.19
+++ Pinger.py	20 Dec 2004 04:16:21 -0000	1.20
@@ -46,11 +46,9 @@
 except ImportError:
     sqlite = None
 
-KEEP_HISTORY_DAYS = 15
 HEARTBEAT_INTERVAL = 30*60
 ONE_DAY = 24*60*60
 
-
 class IntervalSchedule:
     """DOCDOC -- defines a set of intervals in time."""
     def __init__(self):
@@ -322,6 +320,7 @@
         broken = {}
         interesting = {}
         for s1, s2, b, i in res:
+            if s1 == '<self>' or s2 == '<self>': continue # '<self>' only tracks our own uptime
             p = "%s,%s"%(s1,s2)
             if b:
                 broken[p]=1
@@ -333,10 +332,11 @@
         self._interestingChains = interesting
 
     def updateServers(self, names):
+        # XXXX008 Call this whenever a new directory arrives.
         self._lock.acquire()
         try:
             for n in names:
-                self._addServer(n)
+                self._getServerID(n)
         finally:
             self._lock.release()
 
@@ -380,13 +380,25 @@
         assert len(r) == 1
         return r[0][0]
 
-    def rotate(self, now=None):
-        if now is None: now = time.time()
-        cutoff = self._time(now - KEEP_HISTORY_DAYS * ONE_DAY)
+    def rotate(self, dataCutoff, resultsCutoff):
+        """Delete ping data recorded before dataCutoff, and computed results
+           for statistics intervals ending before resultsCutoff."""
+        dataCutoff = self._time(dataCutoff)
+        resultsCutoff = self._time(resultsCutoff)
+
         cur = self._db.getCursor()
-        cur.execute("DELETE FROM myLifespan WHERE stillup < %s", cutoff)
-        cur.execute("DELETE FROM ping WHERE sentat < %s", cutoff)
-        cur.execute("DELETE FROM connectionAttempt WHERE at < %s", cutoff)
+        cur.execute("DELETE FROM myLifespan WHERE stillup < %s", dataCutoff)
+        cur.execute("DELETE FROM ping WHERE sentat < %s", dataCutoff)
+        cur.execute("DELETE FROM connectionAttempt WHERE at < %s", dataCutoff)
+        # Result rows are found via statsInterval, so delete them first.
+        cur.execute("DELETE FROM uptime WHERE interval IN "
+                    "( SELECT id FROM statsInterval WHERE endAt < %s )",
+                    resultsCutoff)
+        cur.execute("DELETE FROM echolotOneHopResult WHERE interval IN "
+                    "( SELECT id FROM statsInterval WHERE endAt < %s )",
+                    resultsCutoff)
+        cur.execute("DELETE FROM statsInterval WHERE endAt < %s", resultsCutoff)
+
         self._db.getConnection().commit()
 
     def flush(self):
@@ -459,7 +471,8 @@
         self.heartbeat(now)
 
         timespan = IntervalSet( [(startTime, endTime)] )
-        intervalID = self._getIntervalID(startTime, endTime)
+        calcIntervals = [ (s,e,self._getIntervalID(s,e)) for s,e in
+                          self._intervals.getIntervals(startTime,endTime)]
 
         cur.execute("SELECT startup, stillup, shutdown FROM myLifespan WHERE "
                     "startup <= %s AND stillup >= %s",
@@ -468,22 +481,22 @@
         myIntervals = IntervalSet([ (start, max(end,shutdown))
                                     for start,end,shutdown in cur ])
         myIntervals *= timespan
-        myUptime = myIntervals.spanLength()
-        fracUptime = float(myUptime)/(endTime-startTime)
-        self._setUptime(
-            (intervalID, self._getServerID("<self>")),
-            (fracUptime,))
+        for s, e, i in calcIntervals:
+            uptime = (myIntervals * IntervalSet([(s,e)])).spanLength()
+            fracUptime = float(uptime)/(e-s)
+            self._setUptime((i, self._getServerID("<self>")), (fracUptime,))
 
         # Okay, now everybody else.
         for s, serverID in self._serverIDs.items():
+            if s == '<self>': continue
             cur.execute("SELECT at, success FROM connectionAttempt"
                         " WHERE server = %s AND at >= %s AND at <= %s"
                         " ORDER BY at",
                         serverID, startTime, endTime)
 
+            intervals = [[], []] # indexed by success: [downtimes, uptimes]
             lastStatus = None
             lastTime = None
-            times = [ 0, 0 ] # uptime, downtime
             for at, success in cur:
                 assert success in (0,1)
                 upAt, downAt = myIntervals.getIntervalContaining(at)
@@ -496,16 +509,24 @@
                     lastTime = upAt
                     lastStatus = None
                 if lastStatus is not None:
-                    t = (at-lastTime)/2.0
-                    times[success] += t
-                    times[lastStatus] += t
+                    t = (at+lastTime)/2.0
+                    intervals[lastStatus].append((lastTime,t))
+                    intervals[success].append((t,at))
                 lastStatus = success
                 lastTime = at
+            downIntervals = IntervalSet(intervals[0])
+            upIntervals = IntervalSet(intervals[1])
+            downIntervals *= myIntervals
+            upIntervals *= myIntervals
 
-            if times == [0,0]:
-                continue
-            fraction = float(times[1])/(times[0]+times[1])
-            self._setUptime((intervalID, serverID), (fraction,))
+            for iStart, iEnd, intervalID in calcIntervals:
+                uptime = (upIntervals*IntervalSet([(iStart,iEnd)])).spanLength()
+                downtime = (downIntervals*IntervalSet([(iStart,iEnd)])).spanLength()
+                # Skip intervals where we have (almost) no data on this server.
+                if uptime < 1 and downtime < 1:
+                    continue
+                fraction = float(uptime)/(uptime+downtime)
+                self._setUptime((intervalID, serverID), (fraction,))
 
     def calculateUptimes(self, startAt, endAt, now=None):
         if now is None: now = time.time()
@@ -515,8 +536,7 @@
         finally:
             self._lock.release()
         serverNames.sort()
-        for s, e in self._intervals.getIntervals(startAt, endAt):
-            self._calculateUptimes(serverNames, s, e, now=now)
+        self._calculateUptimes(serverNames, startAt, endAt, now=now)
         self._db.getConnection().commit()
 
     def getUptimes(self, startAt, endAt):
@@ -681,6 +701,7 @@
         serverNames.sort()
         reliability = {}
         for s in serverNames:
+            if s == '<self>': continue
             # For now, always calculate overall results.
             r = self._calculateOneHopResult(s,now,now,now,
                                              calculateOverallResults=1)
@@ -738,7 +759,9 @@
         serverNames.sort()
 
         for s1 in serverNames:
+            if s1 == '<self>': continue
             for s2 in serverNames:
+                if s2 == '<self>': continue
                 p = "%s,%s"%(s1,s2)
                 nS, nR, prod, isBroken, isInteresting = \
                     self._calculate2ChainStatus(since, s1, s2)
@@ -803,6 +826,7 @@
                    "ORDER BY name, startat", (since, now))
         lastServer = "---"
         for n,s,e,nS,nR,lat,r in cur:
+            if n == '<self>': continue # n is the server name; s is startat
             if n != lastServer:
                 if lastServer != '---': print >>f, "   ],"
                 lastServer = n
@@ -817,6 +841,7 @@
                     "echolotCurrentOneHopResult, server WHERE "
                     "echolotCurrentOneHopResult.server = server.id")
         for n,lat,r in cur:
+            if n == '<self>': continue
             print >>f, "   %r : (%s,%.04f)," %(n,lat,r)
         print >>f, "}"
 
@@ -826,6 +851,7 @@
                     "   server as S1, server as S2 WHERE "
                     "interesting = 1 AND S1.id = server1 AND S2.id = server2")
         for s1,s2 in cur:
+            if s1 == '<self>' or s2 == '<self>': continue
             print >>f, "   '%s,%s',"%(s1,s2)
         print >>f, "]"
 
@@ -835,6 +861,7 @@
                     "   server as S1, server as S2 WHERE "
                     "broken = 1 AND S1.id = server1 AND S2.id = server2")
         for s1,s2 in cur:
+            if s1 == '<self>' or s2 == '<self>': continue
             print >>f, "   '%s,%s',"%(s1,s2)
         print >>f, "]"
         print >>f, "\n"
@@ -903,36 +930,47 @@
 class _PingScheduler:
     def __init__(self):
         self.nextPingTime = {}#path->when
-        # PERIOD
-        # PING_INTERVAL
     def connect(self, directory, outgoingQueue, pingLog, keyring):
         self.directory = directory
         self.outgoingQueue = outgoingQueue
         self.pingLog = pingLog
         self.keyring = keyring
+        self.seed = keyring.getPingerSeed()
+    def _calcPeriodLen(self, interval):
+        period = ONE_DAY
+        while period < interval*2:
+            period *= 2
+        return period
     def scheduleAllPings(self, now=None):
         raise NotImplemented()
     def _getPeriodStart(self, t):
         raise NotImplemented()
     def _getPingInterval(self, path):
         raise NotImplemented()
+    def _getPeriodLength(self):
+        raise NotImplementedError()
     def _schedulePing(self,path,now=None):
         if now is None: now = int(time.time())
         periodStart = self._getPeriodStart(now)
+        periodEnd = periodStart + self._period_length
+        # Pings that would fall after periodEnd are re-perturbed from there.
         interval = self._getPingInterval(path)
         t = periodStart + self._getPerturbation(path, periodStart, interval)
         t += interval * ceilDiv(now-t, interval)
-        if t>periodStart+self.PERIOD:
-            t = periodStart+self.PERIOD+self._getPerturbation(path,
-                                                    periodStart+self.PERIOD,
-                                                              interval)
+        if t>periodEnd:
+            t = periodEnd+self._getPerturbation(path,
+                                                periodEnd,
+                                                interval)
         self.nextPingTime[path] = t
         LOG.trace("Scheduling %d-hop ping for %s at %s", len(path),
                   ",".join(path), formatTime(t,1))
         return t
     def _getPerturbation(self, path, periodStart, interval):
-        #XXXX add a secret seed
-        sha = mixminion.Crypto.sha1(",".join(path) + "@@" + str(interval))
+        sha = mixminion.Crypto.sha1("%s@@%s@@%s"%(",".join(path),
+                                                  interval,
+                                                  self.seed))
+        # This modulo calculation biases the result, but less than 0.1 percent,
+        # so I don't really care.
         sec = abs(struct.unpack("I", sha[:4])[0]) % interval
         return sec
 
@@ -944,12 +982,11 @@
 
 class OneHopPingGenerator(_PingScheduler,PingGenerator):
     """DOCDOC"""
-    #XXXX008 make this configurable, but not less than 2 hours.
-    PING_INTERVAL = 2*60*60
-    PERIOD = ONE_DAY
     def __init__(self, config):
         PingGenerator.__init__(self, config)
         _PingScheduler.__init__(self)
+        self._ping_interval = config['Pinging']['ServerPingPeriod'].getSeconds()
+        self._period_length = self._calcPeriodLen(self._ping_interval)
 
     def scheduleAllPings(self, now=None):
         if now is None: now = int(time.time())
@@ -964,7 +1001,7 @@
         return previousMidnight(t)
 
     def _getPingInterval(self, path):
-        return self.PING_INTERVAL
+        return self._ping_interval
 
     def sendPings(self, now=None):
         if now is None: now = time.time()
@@ -991,13 +1028,13 @@
 
 class TwoHopPingGenerator(_PingScheduler, PingGenerator):
     """DOCDOC"""
-    #XXXX008 make this configurable, but not less than 2 hours.
-    DULL_INTERVAL = 4*ONE_DAY
-    INTERESTING_INTERVAL = ONE_DAY
-    PERIOD = 8*ONE_DAY
     def __init__(self, config):
         PingGenerator.__init__(self, config)
         _PingScheduler.__init__(self)
+        self._dull_interval = config['Pinging']['DullChainPingPeriod'].getSeconds()
+        self._interesting_interval = config['Pinging']['ChainPingPeriod'].getSeconds()
+        self._period_length = self._calcPeriodLen(
+            max(self._interesting_interval,self._dull_interval))
 
     def scheduleAllPings(self, now=None):
         if now is None: now = time.time()
@@ -1014,9 +1051,9 @@
 
     def _getPingInterval(self, path):
         if self.pingLog._interestingChains.get(path, 0):
-            return self.INTERESTING_INTERVAL
+            return self._interesting_interval
         else:
-            return self.DULL_INTERVAL
+            return self._dull_interval
 
     def sendPings(self, now=None):
         if now is None: now = time.time()
@@ -1047,9 +1084,10 @@
     """DOCDOC"""
     def __init__(self, config):
         PingGenerator.__init__(self,config)
-        interval = config['Server']['MixInterval'].getSeconds()
-        if interval < 60*60:
-            self.prob = interval / float(60*60)
+        mixInterval = config['Server']['MixInterval'].getSeconds()
+        probeInterval = config['Pinging']['ServerProbePeriod'].getSeconds()
+        if mixInterval < probeInterval: # probe about once per probeInterval
+            self.prob = mixInterval / float(probeInterval)
         else:
             self.prob = 1.0
     def connect(self, directory, outgoingQueue, pingLog, keyring):

Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.58
retrieving revision 1.59
diff -u -d -r1.58 -r1.59
--- ServerConfig.py	24 Aug 2004 22:16:09 -0000	1.58
+++ ServerConfig.py	20 Dec 2004 04:16:21 -0000	1.59
@@ -357,7 +357,18 @@
                      'MaxBandwidth' : ('ALLOW', "size", None),
                      'MaxBandwidthSpike' : ('ALLOW', "size", None),
                      },
-        'Pinging' : { 'Enabled' : ('ALLOW', 'boolean', 'yes') },
+        # Pinger settings: ping/recompute frequencies and data retention.
+        'Pinging' : { 'Enabled' : ('ALLOW', 'boolean', 'yes'),
+                      'RecomputeInterval' : ('ALLOW', 'interval', '30 min'),
+                      'ServerPingPeriod' : ('ALLOW', 'interval', '2 hours'),
+                      'DullChainPingPeriod' : ('ALLOW', 'interval', '4 days'),
+                      'ChainPingPeriod' : ('ALLOW', 'interval', '1 day'),
+                      'ServerProbePeriod' : ('ALLOW', 'interval', '1 hour'),
+                      # XXXX008 enforce > 15 days.
+                      'RetainData' : ('ALLOW', 'interval', '30 days'),
+                      # XXXX008 enforce > 15 days.
+                      'RetainResults' : ('ALLOW', 'interval', '1 year'),
+                      },
         'DirectoryServers' : { # '__SECTION__' : ('REQUIRE', None, None),
                                'ServerURL' : ('ALLOW*', None, None),
                                'PublishURL' : ('ALLOW*', None, None),

Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.70
retrieving revision 1.71
diff -u -d -r1.70 -r1.71
--- ServerKeys.py	6 Dec 2004 20:02:23 -0000	1.70
+++ ServerKeys.py	20 Dec 2004 04:16:21 -0000	1.71
@@ -103,6 +103,7 @@
         self.currentKeys = None
         self._tlsContext = None #DOCDOC
         self._tlsContextExpires = -1 #DOCDOC
+        self.pingerSeed = None # cached value returned by getPingerSeed()
         self.checkKeys()
 
     def checkKeys(self):
@@ -243,6 +244,24 @@
 
         return key
 
+    def getPingerSeed(self):
+        """Return the secret pinger seed, creating and saving it if needed."""
+        if self.pingerSeed is not None:
+            return self.pingerSeed
+        # Reuse the stored seed if it exists and has the expected length.
+        fn = os.path.join(self.keyDir, "pinger.seed")
+        if os.path.exists(fn):
+            checkPrivateFile(fn)
+            r = readFile(fn)
+            if len(r) == mixminion.Crypto.DIGEST_LEN:
+                self.pingerSeed = r
+                return r
+        # Otherwise generate a fresh random seed and store it privately.
+        self.pingerSeed = r = mixminion.Crypto.trng(mixminion.Crypto.DIGEST_LEN)
+        createPrivateDir(self.keyDir)
+        writeFile(fn, r, 0600)
+        return r
+
     def getIdentityKeyDigest(self):
         """DOCDOC"""
         k = self.getIdentityKey()

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.141
retrieving revision 1.142
diff -u -d -r1.141 -r1.142
--- ServerMain.py	13 Dec 2004 01:06:43 -0000	1.141
+++ ServerMain.py	20 Dec 2004 04:16:21 -0000	1.142
@@ -1004,13 +1004,11 @@
                 mixminion.server.Pinger.HEARTBEAT_INTERVAL))
             # FFFF if we aren't using a LOCKING_IS_COURSE database, we will
             # FFFF still want this to happen in another thread.
-            #XXXX008 use symbolic constants here
             self.scheduleEvent(RecurringEvent(
-                now+1*60, #3*60
+                now+60,
                 lambda self=self: self.pingLog.calculateAll(
-                 os.path.join(self.config.getWorkDir(), "pinger", "status")),
-                #1*60*60))
-                10*60))
+                  os.path.join(self.config.getWorkDir(), "pinger", "status")),
+                self.config['Pinging']['RecomputeInterval'].getSeconds()))
 
         # Makes next update get scheduled.
         nextUpdate = self.updateDirectoryClient()
@@ -1123,7 +1121,9 @@
         self.outgoingQueue.cleanQueue(df)
         self.moduleManager.cleanQueues(df)
         if self.pingLog:
-            self.pingLog.rotate()
+            now, sec = time.time(), self.config['Pinging']
+            self.pingLog.rotate(now - sec['RetainData'].getSeconds(),
+                                now - sec['RetainResults'].getSeconds())
 
     def close(self):
         """Release all resources; close all files."""