[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Deprecate, document, and extend. Implement faster shut...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv25437/minion/lib/mixminion/server

Modified Files:
	MMTPServer.py ServerKeys.py ServerMain.py ServerQueue.py 
Log Message:
Deprecate, document, and extend.  Implement faster shutdown.

Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.40
retrieving revision 1.41
diff -u -d -r1.40 -r1.41
--- MMTPServer.py	13 Jun 2003 01:03:46 -0000	1.40
+++ MMTPServer.py	26 Jun 2003 03:23:53 -0000	1.41
@@ -700,6 +700,7 @@
 NULL_KEYID = "\x00"*20
 
 class DeliverableMessage:
+    """Interface to be impemented by messages deliverable by MMTP """
     def __init__(self):
         pass
     def getContents(self):
@@ -710,6 +711,10 @@
         raise NotImplementedError
 
 class DeliverablePacket(DeliverableMessage):
+    """Implementation of DelierableMessage.
+
+       Wraps a ServerQueue.PendingMessage object for a queue holding
+       PacketHandler.RelayPacket objects."""
     def __init__(self, pending):
         DeliverableMessage.__init__(self)
         self.pending = pending
@@ -724,12 +729,13 @@
     """Asynchronious implementation of the sending ("client") side of a
        mixminion connection."""
     ## Fields:
-    # ip, port, keyID, messageList, handleList, sendCallback, failCallback,
-    # finishedCallback, certCache:
+    # ip, port, keyID, messageList, finishedCallback, certCache:
     #   As described in the docstring for __init__ below.  We remove entries
     #   from the front of messageList/handleList as we begin sending them.
-    # _curMessage, _curHandle: Correspond to the message and handle
-    #     that we are currently trying to deliver.
+    # _curMessage, _curHandle: Correspond to the message that we are
+    #     currently trying to deliver.  If _curHandle is None, _curMessage
+    #     is a control string.  If _curHandle is a DeliverableMessage,
+    #     _curMessage is the corresponding 32KB string.
     # junk: A list of 32KB padding chunks that we're going to send.  We
     #   pregenerate these to avoid timing attacks.  They correspond to
     #   the 'JUNK' entries in messageList.
@@ -752,8 +758,10 @@
            port -- The port to connect to.
            keyID -- None, or the expected SHA1 hash of the server's public key
            messageList -- a list of message payloads and control strings.
-               The control string "JUNK" sends 32KB of padding; the control
-               string "RENEGOTIATE" renegotiates the connection key.
+               Message payloads must implement the DeliverableMessage
+               interface above; allowable control strings are either
+               string "JUNK", which sends 32KB of padding; or the control
+               string "RENEGOTIATE" which renegotiates the connection key.
            finishedCallback -- None, or a function to be called when this
               connection is closed.
            certCache -- an instance of PeerCertificateCache to use for
@@ -761,11 +769,9 @@
         """
         # Generate junk before connecting to avoid timing attacks
         self.junk = []
-        #DOCDOC
         self.messageList = []
         self.active = 1
 
-        #DOCDOC
         self.addMessages(messageList)
 
         if certCache is None:
@@ -802,9 +808,10 @@
         return self.active
 
     def addMessages(self, messages):
-        """Given a list of messages and handles, as given to
+        """Given a list of messages and control strings, as given to
            MMTPServer.__init__, cause this connection to deliver that new
-           set of messages after it's done with those it's currently sending.
+           set of messages after it's done with those it's currently
+           sending.
         """
         assert self.active
         for m in messages:
@@ -866,7 +873,7 @@
         if not self.messageList:
             self.shutdown(0)
             return
-
+        
         self._getNextMessage()
         msg = self._curMessage
         if msg == 'RENEGOTIATE':
@@ -892,7 +899,8 @@
         self.finished = self.__sentMessage
 
     def _getNextMessage(self):
-        """DOCDOC"""
+        """Helper function: pull the next _curHandle, _curMessage pair from
+           self.messageList."""
         m = self.messageList[0]
         del self.messageList[0]
         if hasattr(m, 'getContents'):
@@ -1039,7 +1047,11 @@
         self.listener.shutdown()
 
     def sendMessages(self, ip, port, keyID, deliverable):
-        """Begin sending a set of messages to a given server."""
+        """Begin sending a set of messages to a given server.
+
+           deliverable is a list of objects obeying the DeliverableMessage
+           lsit.
+        """
 
         try:
             # Is there an existing connection open to the right server?

Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.44
retrieving revision 1.45
diff -u -d -r1.44 -r1.45
--- ServerKeys.py	10 Jun 2003 10:40:21 -0000	1.44
+++ ServerKeys.py	26 Jun 2003 03:23:53 -0000	1.45
@@ -352,8 +352,9 @@
         self.checkKeys()
 
     def getDeadKeys(self,now=None):
-        """DOCDOC
-           doesn't checkKeys.
+        """Helper function: return a list of (informative-message, keyset
+           object) for each expired keyset in the keystore.  Does not rescan
+           the keystore or remove dead keys.
         """
         if now is None:
             now = time.time()
@@ -580,6 +581,7 @@
         files = [self.packetKeyFile,
                  self.mmtpKeyFile,
                  self.certFile,
+                 self.certFile+"_tmp",
                  self.descFile,
                  self.publishedFile,
                  self.hashlogFile ]

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.80
retrieving revision 1.81
diff -u -d -r1.80 -r1.81
--- ServerMain.py	13 Jun 2003 01:03:46 -0000	1.80
+++ ServerMain.py	26 Jun 2003 03:23:53 -0000	1.81
@@ -729,6 +729,25 @@
         finally:
             if lock: self.keyring.unlock()
 
+    def scheduleRecurringComplexBackground(self, first, name, cb):
+        """DOCDOC"""
+        #????005 Is this worth it?
+        backgroundJob = [None]
+        scheduler = [None]
+        
+        def _bg(cb=cb, self=self, scheduler=scheduler, name=name):
+            next = cb()
+            if next is not None:
+                self.scheduleOnce(next, name, scheduler[0])
+
+        def _scheduler(backgroundJob=backgroundJob, self=self):
+            self.processingThread.addJob(backgroundJob[0])
+
+        backgroundJob[0] = _bg
+        scheduler[0] = _scheduler
+
+        self.scheduleOnce(first,name,_scheduler)
+
     def generateKeys(self):
         """Callback used to schedule key-generation"""
 
@@ -791,8 +810,8 @@
 
         nextMix = self.mixPool.getNextMixTime(now)
         LOG.debug("First mix at %s", formatTime(nextMix,1))
-        self.scheduleOnce(self.mixPool.getNextMixTime(now),
-                          "MIX", self.doMix)
+        self.scheduleRecurringComplex(self.mixPool.getNextMixTime(now),
+                                      "MIX", self.doMix)
 
         LOG.info("Entering main loop: Mixminion %s", mixminion.__version__)
 
@@ -868,11 +887,10 @@
         # Send exit messages
         self.moduleManager.sendReadyMessages()
 
-        #XXXX005 use new schedulerecurringcomplex interface
         # Choose next mix interval
         nextMix = self.mixPool.getNextMixTime(now)
-        self.scheduleOnce(nextMix, "MIX", self.doMix)
         LOG.trace("Next mix at %s", formatTime(nextMix,1))
+        return nextMix
 
     def cleanQueues(self):
         """Remove all deleted messages from queues"""

Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -d -r1.23 -r1.24
--- ServerQueue.py	13 Jun 2003 01:03:46 -0000	1.23
+++ ServerQueue.py	26 Jun 2003 03:23:53 -0000	1.24
@@ -294,7 +294,9 @@
     #    inserted into the queue.
     # lastAttempt: The most recent time at which we attempted to
     #    deliver the message. (None means 'never').
-    # address: Pickleable object holding address information. DOCDOC
+    # address: Pickleable object holding address information.  Delivery
+    #    code uses this field to group messages by address before loading
+    #    them all from disk.
     def __init__(self, queuedTime=None, lastAttempt=None, address=None):
         """Create a new _DeliveryState for a message received at
            queuedTime (default now), whose last delivery attempt was
@@ -303,7 +305,7 @@
             queuedTime = time.time()
         self.queuedTime = queuedTime
         self.lastAttempt = lastAttempt
-        self.address = None
+        self.address = address
 
     def __getstate__(self):
         # For pickling.  All future versions of deliverystate will pickle
@@ -318,6 +320,7 @@
             self.address = state[3]
         elif state[0] == "V0":
             #XXXX006 remove this case.
+            # 0.0.4 used a format that didn't have an 'address' field.
             self.queuedTime = state[1]
             self.lastAttempt = state[2]
             self.address = None
@@ -362,7 +365,16 @@
         self.lastAttempt = when
 
 class PendingMessage:
-    """DOCDOC"""
+    """PendingMessage represents a message in a DeliveryQueue, for delivery
+       to a specific address.  See DeliveryQueue._deliverMessages for more
+       information about the interface."""
+    ##
+    # queue: the deliveryqueue holding this message
+    # handle: the handle for this message in the queue
+    # address: The address passed to queueDeliveryMessage for this message,
+    #     or None
+    # message: The object queued as this message, or None if the object
+    #     has not yet been loaded.
     def __init__(self, handle, queue, address, message=None):
         self.handle = handle
         self.queue = queue
@@ -376,19 +388,24 @@
         return self.handle
 
     def succeeded(self):
+        """Mark this message as having been successfully deleted, removing
+           it from the queue."""
         self.queue.deliverySucceeded(self.handle)
         self.queue = self.message = None
 
     def failed(self, retriable=0, now=None):
+        """Mark this message as has having failed delivery, either rescheduling
+           it or removing it from the queue."""
         self.queue.deliveryFailed(self.handle, retriable, now=now)
         self.queue = self.message = None
 
     def getMessage(self):
+        """Return the underlying object stored in the delivery queue, loading
+           it from disk if necessary."""
         if self.message is None:
             self.message = self.queue.getObject(self.handle)
         return self.message
 
-
 class DeliveryQueue(Queue):
     """A DeliveryQueue implements a queue that greedily sends messages to
        outgoing streams that occasionally fail.  All underlying messages
@@ -644,14 +661,13 @@
         self._repOk()
 
     def _deliverMessages(self, msgList):
-        """Abstract method; Invoked with a list of (handle, message)
-           tuples every time we have a batch of messages to send.
-
-           For every handle in the list, delierySucceeded or deliveryFailed
-           should eventually be called, or the message will sit in the queue
-           indefinitely, without being retried."""
+        """Abstract method; Invoked with a list of PendingMessage objects
+           every time we have a batch of messages to send.
 
-        #DOCDOC
+           For every PendingMessage object on the list, the object's
+           .succeeded() or .failed() method should eventually be called, or
+           the message will sit in the queue indefinitely, without being
+           retried."""
 
         # We could implement this as a single _deliverMessage(h,addr)
         # method, but that wouldn't allow implementations to batch