[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Key rotation bugfixes



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv32574/lib/mixminion/server

Modified Files:
	ServerKeys.py ServerMain.py 
Log Message:
Key rotation bugfixes

Recreate DELKEYS target.

Add lock to scheduler; refactor scheduleRecurringComplex.

Refactor rotation timing so we don't reschedule key generation before
key generation has finished.

Change version listed in directory to 0.0.4a3.

Fix bug in getNextKeyRotation.



Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.30
retrieving revision 1.31
diff -u -d -r1.30 -r1.31
--- ServerKeys.py	28 May 2003 17:26:53 -0000	1.30
+++ ServerKeys.py	29 May 2003 02:01:34 -0000	1.31
@@ -128,9 +128,11 @@
             t1, t2 = keyset.getLiveness()
             self.keySets.append( (t1, t2, keyset) )
                 
-            LOG.debug("Found key %s (valid from %s to %s)",
+            LOG.trace("Found key %s (valid from %s to %s)",
                       dirname, formatDate(t1), formatDate(t2))
 
+        LOG.debug("Found %s keys.", len(self.keySets)
+
         # Now, sort the key intervals by starting time.
         self.keySets.sort()
         self.keyRange = (firstKey, lastKey)
@@ -199,7 +201,7 @@
         else:
             keySets = [ ks for ks in keySets if not ks.isPublished() ]
             if not keySets:
-                LOG.debug("publishKeys: no unpublished keys found")
+                LOG.trace("publishKeys: no unpublished keys found")
                 return
             LOG.info("Publishing %s keys to directory server...",len(keySets))
 
@@ -249,12 +251,13 @@
         else:
             lastExpiry = now
 
-        timeToCover = lastExpiry + PREPUBLICATION_INTERVAL - now
+        needToCoverUntil = now+PUBLICATION_LATENCY+PREPUBLICATION_INTERVAL
+        timeToCover = needToCoverUntil-lastExpiry
         
         lifetime = self.config['Server']['PublicKeyLifetime'].getSeconds()
-        nKeys = ceilDiv(timeToCover, lifetime)
+        nKeys = int(ceilDiv(timeToCover, lifetime))
 
-        LOG.debug("Creating %s keys", nKeys)
+        LOG.info("Creating %s keys", nKeys)
         self.createKeys(num=nKeys)
 
     def createKeys(self, num=1, startAt=None):
@@ -323,7 +326,7 @@
         # PREPUBLICATION_INTERVAL seconds after that, and we assume that
         # a key takes up to PUBLICATION_LATENCY seconds to make it into the
         # directory.
-        nextKeygen = lastExpiry - PUBLICATION_LATENCY
+        nextKeygen = lastExpiry - PUBLICATION_LATENCY - PREPUBLICATION_INTERVAL
 
         LOG.info("Last expiry at %s; next keygen at %s",
                  formatTime(lastExpiry,1), formatTime(nextKeygen, 1))
@@ -429,32 +432,41 @@
         self.nextUpdate = None
         self.getNextKeyRotation(keys)
 
-    def getNextKeyRotation(self, keys=None):
+    def getNextKeyRotation(self, curKeys=None):
         """DOCDOC"""
         if self.nextUpdate is None:
-            if keys is None:
+            if curKeys is None:
                 if self.currentKeys is None:
-                    keys = self.getServerKeysets()
+                    curKeys = self.getServerKeysets()
                 else:
-                    keys = self.currentKeys
-            addKeyEvents = []
-            rmKeyEvents = []
-            for k in keys:
+                    curKeys = self.currentKeys
+            events = []
+            curNames = {}
+            #DOCDOC
+            for k in curKeys:
                 va, vu = k.getLiveness()
-                rmKeyEvents.append(vu+self.keyOverlap)
-                addKeyEvents.append(vu)
-            add = min(addKeyEvents); rm = min(rmKeyEvents)
+                events.append((vu+self.keyOverlap, "RM"))
+                curNames[k.keyname] = 1
+            for va, vu, k in self.keySets:
+                if curNames.has_key(k.keyname): continue
+                events.append((va, "ADD"))
 
-            if add < rm:
-                LOG.info("Next key event: new key becomes valid at %s",
-                         formatTime(add,1))
-                self.nextUpdate = add
-            else:
+            events.sort()
+            if not events:
+                LOG.info("No future key rotation events.")
+                self.nextUpdate = sys.maxint
+                return self.nextUpdate
+
+            self.nextUpdate, eventType = events[0]
+            if eventType == "RM":
                 LOG.info("Next key event: old key is removed at %s",
-                         formatTime(rm,1))
-                self.nextUpdate = rm
+                         formatTime(self.nextUpdate,1))
+            else:
+                assert eventType == "ADD"
+                LOG.info("Next key event: new key becomes valid at %s",
+                         formatTime(self.nextUpdate,1))
 
-        return self.nextUpdate
+        return self.nextUpdate 
 
     def getAddress(self):
         """Return out current ip/port/keyid tuple"""

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.61
retrieving revision 1.62
diff -u -d -r1.61 -r1.62
--- ServerMain.py	28 May 2003 08:41:04 -0000	1.61
+++ ServerMain.py	29 May 2003 02:01:34 -0000	1.62
@@ -448,7 +448,8 @@
 STOPPING = 0 # Set to one if we get SIGTERM
 def _sigTermHandler(signal_num, _):
     '''(Signal handler for SIGTERM)'''
-    signal.signal(signal_num, _sigTermHandler)
+    # Don't suppress subsequent signals!
+    # signal.signal(signal_num, _sigTermHandler)
     global STOPPING
     STOPPING = 1
 
@@ -478,22 +479,31 @@
     def __init__(self):
         """Create a new _Scheduler"""
         self.scheduledEvents = []
+        self.schedLock = threading.RLock()
 
     def firstEventTime(self):
         """Return the time at which the earliest-scheduled event is
            supposed to occur.  Returns -1 if no events.
         """
-        if self.scheduledEvents:
-            return self.scheduledEvents[0][0]
-        else:
-            return -1
-
+        self.schedLock.acquire()
+        try:
+            if self.scheduledEvents:
+                return self.scheduledEvents[0][0]
+            else:
+                return -1
+        finally:
+            self.schedLock.release()
+            
     def scheduleOnce(self, when, name, cb):
         """Schedule a callback function, 'cb', to be invoked at time 'when.'
         """
         assert type(name) is StringType
         assert type(when) in (IntType, LongType, FloatType)
-        insort(self.scheduledEvents, (when, name, cb))
+        try:
+            self.schedLock.acquire()
+            insort(self.scheduledEvents, (when, name, cb))
+        finally:
+            self.schedLock.release()
 
     def scheduleRecurring(self, first, interval, name, cb):
         """Schedule a callback function 'cb' to be invoked at time 'first,'
@@ -502,21 +512,23 @@
         assert type(name) is StringType
         assert type(first) in (IntType, LongType, FloatType)
         assert type(interval) in (IntType, LongType, FloatType)
-        next = lambda t=time.time,i=interval: t()+i
-        insort(self.scheduledEvents, (first, name,
-                                      _RecurringEvent(name, cb, self, next)))
+        def cbWrapper(cb=cb, interval=interval):
+            cb()
+            return time.time()+interval
+        self.scheduleRecurringComplex(first,name,cbWrapper)
 
-    def scheduleRecurringComplex(self, first, name, cb, nextFn):
+    def scheduleRecurringComplex(self, first, name, cb):
         """Schedule a callback function 'cb' to be invoked at time 'first,'
            and thereafter at times returned by 'nextFn'.
 
            (nextFn is called immediately after the callback is invoked,
            every time it is invoked, and should return a time at which.)
+
+           DOCDOC
         """
         assert type(name) is StringType
         assert type(first) in (IntType, LongType, FloatType)
-        insort(self.scheduledEvents, (first, name,
-                                      _RecurringEvent(name, cb, self, nextFn)))
+        self.scheduleOnce(first, name, _RecurringEvent(name, cb, self))
 
     def processEvents(self, now=None):
         """Run all events that are scheduled to occur before 'now'.
@@ -533,24 +545,35 @@
                    scheduler.processEvents()
         """
         if now is None: now = time.time()
-        se = self.scheduledEvents
-        cbs = []
-        while se and se[0][0] <= now:
-            cbs.append(se[0][2])
-            del se[0]
+        self.schedLock.acquire()
+        try:
+            se = self.scheduledEvents
+            cbs = []
+            while se and se[0][0] <= now:
+                cbs.append(se[0][2])
+                del se[0]
+        finally:
+            self.schedLock.release()
         for cb in cbs:
             cb()
 
 class _RecurringEvent:
     """helper for _Scheduler. Calls a callback, then reschedules it."""
-    def __init__(self, name, cb, scheduler, nextFn):
+    def __init__(self, name, cb, scheduler):
         self.name = name
         self.cb = cb
         self.scheduler = scheduler
-        self.nextFn = nextFn
+
     def __call__(self):
-        self.cb()
-        self.scheduler.scheduleOnce(self.nextFn(), self.name, self)
+        nextTime = self.cb()
+        if nextTime is None:
+            LOG.warn("Not rescheduling %s", self.name)
+            return
+        elif nextTime < time.time():
+            raise MixFatalError("Tried to schedule event %s in the past! (%s)",
+                                self.name, formatTime(nextTime,1))
+
+        self.scheduler.scheduleOnce(nextTime, self.name, self)
 
 class MixminionServer(_Scheduler):
     """Wraps and drives all the queues, and the async net server.  Handles
@@ -604,7 +627,6 @@
         # The pid file.
         self.pidFile = os.path.join(homeDir, "pid")
 
-
         # Try to read the keyring.  If we have a pre-0.0.4 version of
         # mixminion, we might have some bad server descriptors lying
         # around.  If so, tell the user to run server-upgrade.
@@ -682,40 +704,40 @@
         self.processingThread.start()
         self.moduleManager.startThreading()
 
-    def updateKeys(self):
+    def updateKeys(self, lock=1):
         """DOCDOC"""
         # We don't dare to block here -- we could block the main thread for 
         # as long as it takes to generate several new RSA keys, which would
         # stomp responsiveness on slow computers.
         # ???? Could there be a more elegant approach to this?
-        if not self.keyring.lock(0):
+        if lock and not self.keyring.lock(0):
             LOG.warn("generateKeys in progress:"
                      " updateKeys delaying for 2 minutes")
             # This will cause getNextKeyRotation to return 2 minutes later
             # than now.
-            self.keyring.nextUpdate = time.time() + 120
-            return
+            return time.time() + 120
 
         try:
             self.keyring.updateKeys(self.packetHandler, self.mmtpServer)
+            return self.keyring.getNextKeyRotation()
         finally:
-            self.keyring.unlock()
+            if lock: self.keyring.unlock()
 
     def generateKeys(self):
         """DOCDOC"""
+        
         def c(self=self):
             try:
                 self.keyring.lock()
                 self.keyring.createKeysAsNeeded()
-            finally:
-                self.keyring.unlock()
-            self.updateKeys()
-            try:
-                self.keyring.lock()
+                self.updateKeys(lock=0)
                 if self.config['DirectoryServers'].get('Publish'):
                     self.keyring.publishKeys()
+                self.scheduleOnce(self.keyring.getNextKeyRotation(),
+                                  self.generateKeys)
             finally:
                 self.keyring.unlock()
+
         self.processingThread.addJob(c)
         
     def run(self):
@@ -734,25 +756,28 @@
         if EventStats.log.getNextRotation():
             self.scheduleRecurring(now+300, 300, "ES_SAVE",
                                    lambda: EventStats.log.save)
+            def _rotateStats():
+                EventStats.log.rotate()
+                return EventStats.log.getNextRotation()
             self.scheduleRecurringComplex(EventStats.log.getNextRotation(),
-                                        "ES_ROTATE",
-                                        lambda: EventStats.log.rotate,
-                                        lambda: EventStats.log.getNextRotation)
+                                          "ES_ROTATE",
+                                          _rotateStats)
+
+        def _tryTimeout(self=self):
+            self.mmtpServer.tryTimeout()
+            return self.mmtpServer.getNextTimeoutTime()
 
         self.scheduleRecurringComplex(self.mmtpServer.getNextTimeoutTime(now),
                                       "TIMEOUT",
-                                      self.mmtpServer.tryTimeout,
-                                      self.mmtpServer.getNextTimeoutTime)
+                                      _tryTimeout)
 
         self.scheduleRecurringComplex(self.keyring.getNextKeyRotation(),
                                       "KEY_ROTATE",
-                                      self.updateKeys,
-                                      self.keyring.getNextKeyRotation)
+                                      self.updateKeys)
 
-        self.scheduleRecurringComplex(self.keyring.getNextKeygen(),
-                                      "KEY_GEN",
-                                      self.generateKeys,
-                                      self.keyring.getNextKeygen)
+        self.scheduleOnce(self.keyring.getNextKeygen(),
+                          "KEY_GEN",
+                          self.generateKeys)
 
         nextMix = self.mixPool.getNextMixTime(now)
         LOG.debug("First mix at %s", formatTime(nextMix,1))
@@ -777,10 +802,10 @@
                 self.mmtpServer.process(2)
                 # Check for signals
                 if STOPPING:
-                    LOG.info("Caught sigterm; shutting down.")
+                    LOG.info("Caught SIGTERM; shutting down.")
                     return
                 elif GOT_HUP:
-                    LOG.info("Caught sighup")
+                    LOG.info("Caught SIGHUP")
                     self.doReset()
                     GOT_HUP = 0
                 # Make sure that our worker threads are still running.
@@ -1123,6 +1148,44 @@
     writeFile(os.path.join(homeDir, 'version'),
               SERVER_HOMEDIR_VERSION, 0644)
 
+
+#----------------------------------------------------------------------
+_DELKEYS_USAGE = """\
+Usage: mixminion server-DELKEYS [options]
+Options:
+  -h, --help:                Print this usage message and exit.
+  -f <file>, --config=<file> Use a configuration file other than
+                                /etc/mixminiond.conf
+""".strip()
+
+def runDELKEYS(cmd, args):
+    """Remove all keys server descriptors for old versions of this
+       server.  If any are found, nuke the keysets, """
+
+    config = configFromServerArgs(cmd, args, usage=_DELKEYS_USAGE)
+    assert config
+
+    mixminion.Common.configureShredCommand(config)
+    mixminion.Crypto.init_crypto(config)
+
+    checkHomedirVersion(config)
+
+    homeDir = config['Server']['Homedir']
+    keyDir = os.path.join(homeDir, 'keys')
+    hashDir = os.path.join(homeDir, 'work', 'hashlogs')
+    keysets = []
+    if not os.path.exists(keyDir):
+        print >>sys.stderr, "No server keys to delete"
+    else:
+        deleted = 0
+        for fn in os.listdir(keyDir):
+            if fn.startswith("key_"):
+                name = fn[4:]
+                ks = mixminion.server.ServerKeys.ServerKeyset(
+                    keyDir, name, hashDir)
+                ks.delete()
+                deleted += 1
+        print "%s keys deleted"%deleted
 
 #----------------------------------------------------------------------
 _PRINT_STATS_USAGE = """\