[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Fix pinger crash when a server leaves the directory



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv7787/lib/mixminion/server

Modified Files:
	Pinger.py ServerMain.py 
Log Message:
Fix pinger crash when a server leaves the directory

Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -d -r1.21 -r1.22
--- Pinger.py	20 Dec 2004 05:07:26 -0000	1.21
+++ Pinger.py	22 Dec 2004 04:47:10 -0000	1.22
@@ -39,7 +39,7 @@
 from mixminion.Common import MixError, AtomicFile, ceilDiv, createPrivateDir, \
      floorDiv, formatBase64, formatFnameDate, formatTime, IntervalSet, LOG, \
      parseFnameDate, previousMidnight, readPickled, secureDelete, \
-     succeedingMidnight, writePickled
+     succeedingMidnight, UIError, writePickled
 
 try:
     import sqlite
@@ -332,7 +332,6 @@
         self._interestingChains = interesting
 
     def updateServers(self, names):
-        #XXXX008 call when a new directory arrives.
         self._lock.acquire()
         try:
             for n in names:
@@ -897,6 +896,9 @@
     def connect(self, directory, outgoingQueue, pingLog, keyring):
         pass
 
+    def directoryUpdated(self):
+        pass
+
     def getFirstPingTime(self):
         return None
 
@@ -912,8 +914,12 @@
     def _sendOnePing(self, path1, path2):
         assert path1 and path2
         assert path2[-1].getNickname() == self.myNickname
-        p1 = self.directory.getPath(path1)
-        p2 = self.directory.getPath(path2)
+        try:
+            p1 = self.directory.getPath(path1)
+            p2 = self.directory.getPath(path2)
+        except UIError, e:
+            LOG.info("Not sending scheduled ping: %s",e)
+            return 0
         verbose_path = ",".join([s.getNickname() for s in (p1+p2[:-1])])
         payload = mixminion.BuildMessage.buildRandomPayload()
         payloadHash = mixminion.Crypto.sha1(payload)
@@ -926,6 +932,7 @@
                   formatBase64(payloadHash))
         self.pingLog.queuedPing(payloadHash, verbose_path)
         self.outgoingQueue.queueDeliveryMessage(obj, addr)
+        return 1
 
 class _PingScheduler:
     def __init__(self):
@@ -961,9 +968,11 @@
             t = periodEnd+self._getPerturbation(path,
                                                 periodEnd,
                                                 interval)
+        oldTime = self.nextPingTime.get(path, None)
         self.nextPingTime[path] = t
-        LOG.trace("Scheduling %d-hop ping for %s at %s", len(path),
-                  ",".join(path), formatTime(t,1))
+        if oldTime != t:
+            LOG.trace("Scheduling %d-hop ping for %s at %s", len(path),
+                      ",".join(path), formatTime(t,1))
         return t
     def _getPerturbation(self, path, periodStart, interval):
         sha = mixminion.Crypto.sha1("%s@@%s@@%s"%(",".join(path),
@@ -985,15 +994,23 @@
     def __init__(self, config):
         PingGenerator.__init__(self, config)
         _PingScheduler.__init__(self)
-        self._ping_interval = config['Pinging']['ServerPingPeriod'].getSeconds()
+        sec = config['Pinging']
+        self._ping_interval = sec['ServerPingPeriod'].getSeconds()
         self._period_length = self._calcPeriodLen(self._ping_interval)
 
+    def directoryUpdated(self):
+        self.scheduleAllPings()
+
     def scheduleAllPings(self, now=None):
         if now is None: now = int(time.time())
         servers = self.directory.getAllServers()
         nicknames = {}
         for s in servers:
             nicknames[s.getNickname()]=1
+        for (n,) in self.nextPingTime.keys():
+            if not nicknames.has_key(n):
+                LOG.trace("Unscheduling 1-hop ping for %s", n)
+                del self.nextPingTime[(n,)]
         for n in nicknames.keys():
             self._schedulePing((n,), now)
 
@@ -1023,25 +1040,33 @@
                 pingable.append(n)
         myDescriptor = self.keyring.getCurrentDescriptor()
         for n in pingable:
-            self._sendOnePing([n], [myDescriptor])
-            self._schedulePing((n,), now+60)
+            if self._sendOnePing([n], [myDescriptor]):
+                self._schedulePing((n,), now+60)
 
 class TwoHopPingGenerator(_PingScheduler, PingGenerator):
     """DOCDOC"""
     def __init__(self, config):
         PingGenerator.__init__(self, config)
         _PingScheduler.__init__(self)
-        self._dull_interval = config['Pinging']['DullChainPingPeriod'].getSeconds()
-        self._interesting_interval = config['Pinging']['ChainPingPeriod'].getSeconds()
+        sec = config['Pinging']
+        self._dull_interval = sec['DullChainPingPeriod'].getSeconds()
+        self._interesting_interval = sec['ChainPingPeriod'].getSeconds()
         self._period_length = self._calcPeriodLen(
             max(self._interesting_interval,self._dull_interval))
 
+    def directoryUpdated(self):
+        self.scheduleAllPings()
+
     def scheduleAllPings(self, now=None):
         if now is None: now = time.time()
         servers = self.directory.getAllServers()
         nicknames = {}
         for s in servers:
             nicknames[s.getNickname()]=1
+        for n1,n2 in self.nextPingTime.keys():
+            if not (nicknames.has_key(n1) and nicknames.has_key(n2)):
+                LOG.trace("Unscheduling 2-hop ping for %s,%s",n1,n2)
+                del self.nextPingTime[(n1,n2)]
         for n1 in nicknames.keys():
             for n2 in nicknames.keys():
                 self._schedulePing((n1,n2),now)
@@ -1076,9 +1101,8 @@
                     pingable.append((n1,n2))
         myDescriptor = self.keyring.getCurrentDescriptor()
         for n1, n2 in pingable:
-            self._sendOnePing([n1,n2], [myDescriptor])
-            self._schedulePing((n1,n2), now+60)
-            #XXXX008 we need to reschedule pings when a new directory arrives
+            if self._sendOnePing([n1,n2], [myDescriptor]):
+                self._schedulePing((n1,n2), now+60)
 
 class TestLinkPaddingGenerator(PingGenerator):
     """DOCDOC"""
@@ -1118,6 +1142,9 @@
         assert keyring
         for g in self.gens:
             g.connect(directory, outgoingQueue, pingLog, keyring)
+    def directoryUpdated(self):
+        for g in self.gens:
+            g.directoryUpdated()
     def scheduleAllPings(self, now=None):
         for g in self.gens:
             g.scheduleAllPings(now)

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.143
retrieving revision 1.144
diff -u -d -r1.143 -r1.144
--- ServerMain.py	20 Dec 2004 05:07:26 -0000	1.143
+++ ServerMain.py	22 Dec 2004 04:47:11 -0000	1.144
@@ -922,7 +922,7 @@
         finally:
             self.keyring.unlock()
 
-    def updateDirectoryClient(self):
+    def updateDirectoryClient(self, reschedulePings=1):
         try:
             self.dirClient.update()
             nextUpdate = succeedingMidnight(time.time()+30)
@@ -935,11 +935,22 @@
             LOG.warn("    I'll try again in an hour.")
             nextUpdate = min(succeedingMidnight(time.time()+30),
                              time.time()+3600)
+            reschedulePings = 0
         except UIError, e:#XXXX008 This should really be a new exception
             LOG.warn(str(e))
             LOG.warn("    I'll try again in an hour.")
             nextUpdate = min(succeedingMidnight(time.time()+30),
                              time.time()+3600)
+            reschedulePings = 0
+
+        if reschedulePings:
+            if self.pingGenerator:
+                self.pingGenerator.directoryUpdated()
+            if self.pingLog:
+                serverNames = [ s.getNickname()
+                                for s in self.dirClient.getAllServers() ]
+                self.pingLog.updateServers(serverNames)
+
         return nextUpdate
 
     def run(self):
@@ -1008,12 +1019,13 @@
                 now+60,
                 lambda self=self: self.pingLog.calculateAll(
                   os.path.join(self.config.getWorkDir(), "pinger", "status")),
-                self.config['Pinging']['RecomputeInterval']))
+                self.config['Pinging']['RecomputeInterval'].getSeconds()))
 
         # Makes next update get scheduled.
-        nextUpdate = self.updateDirectoryClient()
+        nextUpdate = self.updateDirectoryClient(reschedulePings=0)
         self.scheduleEvent(RecurringComplexBackgroundEvent(
             nextUpdate,
+            time.time()+4*60,
             self.processingThread.addJob,
             self.updateDirectoryClient))
 
@@ -1263,7 +1275,6 @@
 
     return config
 
-
 def readConfigFile(configFile):
     """Given a filename from the command line (or None if the user didn't
        specify a configuration file), find the configuration file, parse it,