[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Look, we"re alpha again! Time for some delayed perform...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv6726/lib/mixminion/server

Modified Files:
	MMTPServer.py Modules.py ServerMain.py ServerQueue.py 
Log Message:
Look, we're alpha again!  Time for some delayed performance work.

(Some of this still needs docs and tests)

Common, ServerMain:
- To make the server shut down faster, abandon pending work in the 
  processing and cleaning threads when we're shutting down.

  (Formerly, if we got behind cleaning files or processing messages,
  we'd wait to finish before we shut down.  Now we shut down almost
  immediately.  This is safe, because the code is written to be safe
  in the presence of SIGKILL's anyway.)

test, MMTPServer, Modules, ServerMain
- When we had N messages to deliver, we used to suck all N into
  memory.  Clearly, this was a pretty rotten idea: instead, we now
  lazy-load messages as they're needed.  This makes the mixminion
  server use even less memory under load than it did before.

  The code is a little hairy, but it seems to work for me.  It still
  needs documentation.



Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.39
retrieving revision 1.40
diff -u -d -r1.39 -r1.40
--- MMTPServer.py	6 Jun 2003 07:17:35 -0000	1.39
+++ MMTPServer.py	13 Jun 2003 01:03:46 -0000	1.40
@@ -32,6 +32,7 @@
 from mixminion.Crypto import sha1, getCommonPRNG
 from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN
 from mixminion.MMTPClient import PeerCertificateCache
+import mixminion.server.EventStats as EventStats
 
 __all__ = [ 'AsyncServer', 'ListenConnection', 'MMTPServerConnection',
             'MMTPClientConnection' ]
@@ -698,6 +699,27 @@
 
 NULL_KEYID = "\x00"*20
 
+class DeliverableMessage:
+    def __init__(self):
+        pass
+    def getContents(self):
+        raise NotImplementedError
+    def succeeded(self):
+        raise NotImplementedError
+    def failed(self,retriable=0):
+        raise NotImplementedError
+
+class DeliverablePacket(DeliverableMessage):
+    def __init__(self, pending):
+        DeliverableMessage.__init__(self)
+        self.pending = pending
+    def succeeded(self):
+        self.pending.succeeded()
+    def failed(self,retriable=0):
+        self.pending.failed(retriable=retriable)
+    def getContents(self):
+        return self.pending.getMessage().getPacket()
+
 class MMTPClientConnection(SimpleTLSConnection):
     """Asynchronious implementation of the sending ("client") side of a
        mixminion connection."""
@@ -721,9 +743,8 @@
     # active: Are we currently able to send messages to a server?  Boolean.
 
     PROTOCOL_VERSIONS = [ '0.3' ]
-    def __init__(self, context, ip, port, keyID, messageList, handleList,
-                 sentCallback=None, failCallback=None, finishedCallback=None,
-                 certCache=None):
+    def __init__(self, context, ip, port, keyID, messageList,
+                 finishedCallback=None, certCache=None):
         """Create a connection to send messages to an MMTP server.
            Raises socket.error if the connection fails.
 
@@ -733,12 +754,6 @@
            messageList -- a list of message payloads and control strings.
                The control string "JUNK" sends 32KB of padding; the control
                string "RENEGOTIATE" renegotiates the connection key.
-           handleList -- a list of objects corresponding to the entries in
-              messageList.  Used for callback.
-           sentCallback -- None, or a function of (msg, handle) to be called
-              whenever a message is successfully sent.
-           failCallback -- None, or a function of (msg, handle, retriable)
-              to be called when messages can't be sent.
            finishedCallback -- None, or a function to be called when this
               connection is closed.
            certCache -- an instance of PeerCertificateCache to use for
@@ -746,11 +761,12 @@
         """
         # Generate junk before connecting to avoid timing attacks
         self.junk = []
+        #DOCDOC
         self.messageList = []
-        self.handleList = []
         self.active = 1
 
-        self.addMessages(messageList, handleList)
+        #DOCDOC
+        self.addMessages(messageList)
 
         if certCache is None:
             certCache = PeerCertificateCache()
@@ -773,8 +789,6 @@
 
         SimpleTLSConnection.__init__(self, sock, tls, 0, "%s:%s"%(ip,port))
         self.finished = self.__setupFinished
-        self.sentCallback = sentCallback
-        self.failCallback = failCallback
         self.finishedCallback = finishedCallback
         self.protocol = None
         self._curMessage = self._curHandle = None
@@ -787,21 +801,16 @@
            the connection is currently shutting down."""
         return self.active
 
-    def addMessages(self, messages, handles):
+    def addMessages(self, messages):
         """Given a list of messages and handles, as given to
            MMTPServer.__init__, cause this connection to deliver that new
            set of messages after it's done with those it's currently sending.
         """
         assert self.active
-        assert len(messages) == len(handles)
-        for m,h in zip(messages, handles):
-            if m in ("JUNK", "RENEGOTIATE"):
-                assert h is None
         for m in messages:
             if m == "JUNK":
                 self.junk.append(getCommonPRNG().getBytes(MESSAGE_LEN))
         self.messageList.extend(messages)
-        self.handleList.extend(handles)
 
     def getAddr(self):
         """Return an (ip,port,keyID) tuple for this connection"""
@@ -858,10 +867,8 @@
             self.shutdown(0)
             return
 
-        msg = self._curMessage = self.messageList[0]
-        self._curHandle = self.handleList[0]
-        del self.messageList[0]
-        del self.handleList[0]
+        self._getNextMessage()
+        msg = self._curMessage
         if msg == 'RENEGOTIATE':
             self.finished = self.beginNextMessage
             self.startRenegotiate()
@@ -874,15 +881,27 @@
             msg = JUNK_CONTROL+msg+sha1(msg+"JUNK")
             self.isJunk = 1
         else:
+            EventStats.log.attemptedRelay() #FFFF addr
             self.expectedDigest = sha1(msg+"RECEIVED")
             self.rejectDigest = sha1(msg+"REJECTED")
             msg = SEND_CONTROL+msg+sha1(msg+"SEND")
             self.isJunk = 0
-
+        
         assert len(msg) == SEND_RECORD_LEN
         self.beginWrite(msg)
         self.finished = self.__sentMessage
 
+    def _getNextMessage(self):
+        """DOCDOC"""
+        m = self.messageList[0]
+        del self.messageList[0]
+        if hasattr(m, 'getContents'):
+            self._curHandle = m
+            self._curMessage = m.getContents()
+        else:
+            self._curHandle = None
+            self._curMessage = m
+
     def __sentMessage(self):
         """Called when we're done sending a message.  Begins reading the
            server's ACK."""
@@ -915,11 +934,12 @@
            debug("Received valid ACK for message from %s", self.address)
 
        if not self.isJunk:
-           if not rejected and self.sentCallback is not None:
-               self.sentCallback(self._curMessage, self._curHandle)
-           elif rejected and self.failCallback is not None:
-               self.failCallback(self._curMessage, self._curHandle,
-                                 retriable=1)
+           if not rejected:
+               self._curHandle.succeeded()
+               EventStats.log.successfulRelay() #FFFF addr
+           else:
+               self._curHandle.failed(retriable=1)
+               EventStats.log.failedRelay() #FFFF addr
 
        self._curMessage = self._curHandle = None
 
@@ -927,14 +947,20 @@
 
     def handleFail(self, retriable):
         """Invoked when we shutdown with an error."""
-        if self.failCallback is not None:
-            if self._curHandle is not None:
-                self.failCallback(self._curMessage, self._curHandle, retriable)
-            for msg, handle in zip(self.messageList, self.handleList):
-                if handle is None:
-                    continue
-                self.failCallback(msg,handle,retriable)
-        self._messageList = self.handleList = []
+        if retriable:
+            statFn = EventStats.log.failedRelay
+        else:
+            statFn = EventStats.log.unretriableRelay
+        if self._curHandle is not None:
+            self._curHandle.failed(retriable)
+            statFn()
+        for msg in self.messageList:
+            try:
+                msg.failed(retriable)
+                statFn()
+            except AttributeError:
+                pass
+        self._messageList = []
         self._curMessage = self._curHandle = None
 
     def shutdown(self, err=0, retriable=0):
@@ -946,8 +972,6 @@
         if self.finishedCallback is not None:
             self.finishedCallback()
         self.finishedCallback = None
-        self.failCallback = None
-        self.sentCallback = None
         
         SimpleTLSConnection.remove(self)
 
@@ -1014,24 +1038,17 @@
     def stopListening(self):
         self.listener.shutdown()
 
-    def sendMessages(self, ip, port, keyID, messages, handles):
+    def sendMessages(self, ip, port, keyID, deliverable):
         """Begin sending a set of messages to a given server."""
 
-        for m,h in zip(messages, handles):
-            if m in ("JUNK", "RENEGOTIATE"):
-                assert h is None
-                continue
-            assert len(m) == MESSAGE_LEN
-            assert len(h) < 32
-
         try:
             # Is there an existing connection open to the right server?
             con = self.clientConByAddr[(ip,port,keyID)]
             # If so, is that connection currently sending messages?
             if con.isActive():
                 LOG.debug("Queueing %s messages on open connection to %s",
-                          len(messages), con.address)
-                con.addMessages(messages, handles)
+                          len(deliverable), con.address)
+                con.addMessages(deliverable)
                 return
         except KeyError:
             pass
@@ -1041,9 +1058,7 @@
             addr = (ip, port, keyID)
             finished = lambda addr=addr, self=self: self.__clientFinished(addr)
             con = MMTPClientConnection(self.context,
-                                     ip, port, keyID, messages, handles,
-                                     sentCallback=self.onMessageSent,
-                                     failCallback=self.onMessageUndeliverable,
+                                     ip, port, keyID, deliverable,
                                      finishedCallback=finished,
                                      certCache=self.certificateCache)
             con.register(self)
@@ -1053,8 +1068,11 @@
         except socket.error, e:
             LOG.error("Unexpected socket error connecting to %s:%s: %s",
                       ip, port, e)
-            for m,h in zip(messages, handles):
-                self.onMessageUndeliverable(m,h,1)
+            for m in messages:
+                try:
+                    m.failed(1)
+                except AttributeError:
+                    pass
 
     def __clientFinished(self, addr):
         """Called when a client connection runs out of messages to send."""
@@ -1068,12 +1086,4 @@
         """Abstract function.  Called when we get a message"""
         pass
 
-    def onMessageUndeliverable(self, msg, handle, retriable):
-        """Abstract function: Called when an attempt to deliver a
-           message fails."""
-        pass
 
-    def onMessageSent(self, msg, handle):
-        """Abstract function: Called when an attempt to deliver a
-           message succeeds."""
-        pass

Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.42
retrieving revision 1.43
diff -u -d -r1.42 -r1.43
--- Modules.py	5 Jun 2003 18:41:40 -0000	1.42
+++ Modules.py	13 Jun 2003 01:03:46 -0000	1.43
@@ -172,29 +172,30 @@
         
 
     def _deliverMessages(self, msgList):
-        for handle, packet in msgList:
+        for handle in msgList:
             try:
                 EventStats.log.attemptedDelivery() #FFFF
+                packet = handle.getMessage()
                 result = self.module.processMessage(packet)
                 if result == DELIVER_OK:
                     LOG.debug("Successfully delivered message MOD:%s", handle)
-                    self.deliverySucceeded(handle)
+                    handle.succeeded()
                     EventStats.log.successfulDelivery() #FFFF
                 elif result == DELIVER_FAIL_RETRY:
                     LOG.debug("Unable to deliver message MOD:%s; will retry",
                               handle)
-                    self.deliveryFailed(handle, 1)
+                    handle.failed(1)
                     EventStats.log.failedDelivery() #FFFF
                 else:
                     assert result == DELIVER_FAIL_NORETRY
                     LOG.error("Unable to deliver message MOD:%s; giving up",
                               handle)
-                    self.deliveryFailed(handle, 0)
+                    handle.failed(0)
                     EventStats.log.unretriableDelivery() #FFFF
             except:
                 LOG.error_exc(sys.exc_info(),
                                    "Exception delivering message")
-                self.deliveryFailed(handle, 0)
+                handle.failed(0)
                 EventStats.log.unretriableDelivery() #FFFF
 
 class DeliveryThread(threading.Thread):

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.79
retrieving revision 1.80
diff -u -d -r1.79 -r1.80
--- ServerMain.py	12 Jun 2003 04:07:54 -0000	1.79
+++ ServerMain.py	13 Jun 2003 01:03:46 -0000	1.80
@@ -48,7 +48,7 @@
 # We pull this from mixminion.Common, just in case somebody still has
 # a copy of the old "mixminion/server/Queue.py" (since renamed to
 # ServerQueue.py)
-from mixminion.Common import MessageQueue
+from mixminion.Common import MessageQueue, ClearableQueue, QueueEmpty
 
 import mixminion.Config
 import mixminion.Crypto
@@ -256,16 +256,13 @@
 
         for h in handles:
             packet = self.queue.getObject(h)
-            if type(packet) == type(()):
-                #XXXX005 remove this case.
-                LOG.error("  (skipping message MIX:%s in obsolete format)", h)
-            elif packet.isDelivery():
+            if packet.isDelivery():
                 h2 = self.moduleManager.queueDecodedMessage(packet)
                 LOG.debug("  (sending message MIX:%s to exit modules as MOD:%s)"
                           , h, h2)
-
             else:
-                h2 = self.outgoingQueue.queueDeliveryMessage(packet)
+                address = packet.getAddress()
+                h2 = self.outgoingQueue.queueDeliveryMessage(packet, address)
                 LOG.debug("  (sending message MIX:%s to MMTP server as OUT:%s)"
                           , h, h2)
             # In any case, we're through with this message now.
@@ -312,30 +309,31 @@
         "Implementation of abstract method from DeliveryQueue."
         # Map from addr -> [ (handle, msg) ... ]
         msgs = {}
-        for handle, packet in msgList:
-            if not isinstance(packet,
-                              mixminion.server.PacketHandler.RelayedPacket):
-                LOG.warn("Skipping packet OUT:%s in obsolete format", handle)
-                self.deliverySucceeded(handle)
-                continue
-            addr = packet.getAddress()
-            message = packet.getPacket()
-            msgs.setdefault(addr, []).append( (handle, message) )
+        for pending in msgList:
+            addr = pending.getAddress()
+            if addr is None:
+                addr = pending.getMessage().getAddress()
+            msgs.setdefault(addr, []).append(pending)
         for addr, messages in msgs.items():
             if self.addr[:2] == (addr.ip, addr.port):
                 if self.addr[2] != addr.keyinfo:
                     LOG.warn("Delivering messages to myself with bad KeyID")
-                for h,m in messages:
-                    LOG.trace("Delivering message OUT:%s to myself.", h)
-                    self.incomingQueue.queueMessage(m)
-                    self.deliverySucceeded(h)
+                for pending in messages:
+                    LOG.trace("Delivering message OUT:%s to myself.",
+                              pending.getHandle())
+                    self.incomingQueue.queueMessage(
+                        pending.getMessage().getPacket())
+                    pending.succeeded()
                 continue
 
-            handles, messages = zip(*messages)
+            deliverable = [
+                mixminion.server.MMTPServer.DeliverablePacket(pending)
+                for pending in messages ]
             LOG.trace("Delivering messages OUT:[%s] to %s:%s",
-                      " ".join(handles), addr.ip,addr.port)
+                      " ".join([p.getHandle() for p in messages]),
+                      addr.ip, addr.port)
             self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
-                                     list(messages), list(handles))
+                                     deliverable)
 
 class _MMTPServer(mixminion.server.MMTPServer.MMTPAsyncServer):
     """Implementation of mixminion.server.MMTPServer that knows about
@@ -358,19 +356,6 @@
         # FFFF Replace with server.
         EventStats.log.receivedPacket()
 
-    def onMessageSent(self, msg, handle):
-        self.outgoingQueue.deliverySucceeded(handle)
-        EventStats.log.attemptedRelay() # FFFF replace with addr
-        EventStats.log.successfulRelay() # FFFF replace with addr
-
-    def onMessageUndeliverable(self, msg, handle, retriable):
-        self.outgoingQueue.deliveryFailed(handle, retriable)
-        EventStats.log.attemptedRelay() # FFFF replace with addr
-        if retriable:
-            EventStats.log.failedRelay() # FFFF replace with addr
-        else:
-            EventStats.log.unretriableRelay() # FFFF replace with addr
-
 #----------------------------------------------------------------------
 class CleaningThread(threading.Thread):
     """Thread that handles file deletion.  Some methods of secure deletion
@@ -378,11 +363,11 @@
        main thread.
     """
     # Fields:
-    #   mqueue: A MessageQueue holding filenames to delete, or None to indicate
+    #   mqueue: A ClearableQueue holding filenames to delete, or None to indicate
     #     a shutdown.
     def __init__(self):
         threading.Thread.__init__(self)
-        self.mqueue = MessageQueue()
+        self.mqueue = ClearableQueue()
 
     def deleteFile(self, fname):
         """Schedule the file named 'fname' for deletion"""
@@ -399,6 +384,7 @@
         """Tell this thread to shut down once it has deleted all pending
            files."""
         LOG.info("Telling cleanup thread to shut down.")
+        self.mqueue.clear()
         self.mqueue.put(None)
 
     def run(self):
@@ -425,7 +411,7 @@
 
        Currently used to process packets in the background."""
     # Fields:
-    #   mqueue: a MessageQueue of callable objects.
+    #   mqueue: a ClearableQueue of callable objects.
     class _Shutdown:
         """Callable that raises itself when called.  Inserted into the
            queue when it's time to shut down."""
@@ -433,12 +419,13 @@
             raise self
 
     def __init__(self):
-        """Given a MessageQueue object, create a new processing thread."""
+        """Create a new processing thread."""
         threading.Thread.__init__(self)
-        self.mqueue = MessageQueue()
+        self.mqueue = ClearableQueue()
 
     def shutdown(self):
         LOG.info("Telling processing thread to shut down.")
+        self.mqueue.clear()
         self.mqueue.put(ProcessingThread._Shutdown())
 
     def addJob(self, job):

Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -d -r1.22 -r1.23
--- ServerQueue.py	6 Jun 2003 06:08:40 -0000	1.22
+++ ServerQueue.py	13 Jun 2003 01:03:46 -0000	1.23
@@ -12,7 +12,6 @@
 import sys
 import cPickle
 import threading
-import types
 
 from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
      createPrivateDir, readPickled, writePickled, formatTime, readFile
@@ -287,7 +286,6 @@
         finally:
             self._lock.release()
 
-
 class _DeliveryState:
     """Helper class: holds the state needed to schedule delivery or
        eventual abandonmont of a message in a DeliveryQueue."""
@@ -296,7 +294,8 @@
     #    inserted into the queue.
     # lastAttempt: The most recent time at which we attempted to
     #    deliver the message. (None means 'never').
-    def __init__(self, queuedTime=None, lastAttempt=None):
+    # address: Pickleable object holding address information. DOCDOC
+    def __init__(self, queuedTime=None, lastAttempt=None, address=None):
         """Create a new _DeliveryState for a message received at
            queuedTime (default now), whose last delivery attempt was
            at lastAttempt (default never)."""
@@ -304,17 +303,24 @@
             queuedTime = time.time()
         self.queuedTime = queuedTime
         self.lastAttempt = lastAttempt
+        self.address = None
 
     def __getstate__(self):
         # For pickling.  All future versions of deliverystate will pickle
         #   to a tuple, whose first element will be a version string.
-        return ("V0", self.queuedTime, self.lastAttempt)
+        return ("V1", self.queuedTime, self.lastAttempt, self.address)
 
     def __setstate__(self, state):
         # For pickling.
-        if state[0] == "V0":
+        if state[0] == "V1":
             self.queuedTime = state[1]
             self.lastAttempt = state[2]
+            self.address = state[3]
+        elif state[0] == "V0":
+            #XXXX006 remove this case.
+            self.queuedTime = state[1]
+            self.lastAttempt = state[2]
+            self.address = None
         else:
             raise MixFatalError("Unrecognized delivery state")
 
@@ -355,6 +361,34 @@
         """Update time of the last attempted delivery."""
         self.lastAttempt = when
 
+class PendingMessage:
+    """DOCDOC"""
+    def __init__(self, handle, queue, address, message=None):
+        self.handle = handle
+        self.queue = queue
+        self.address = address
+        self.message = message
+
+    def getAddress(self):
+        return self.address
+
+    def getHandle(self):
+        return self.handle
+
+    def succeeded(self):
+        self.queue.deliverySucceeded(self.handle)
+        self.queue = self.message = None
+
+    def failed(self, retriable=0, now=None):
+        self.queue.deliveryFailed(self.handle, retriable, now=now)
+        self.queue = self.message = None
+
+    def getMessage(self):
+        if self.message is None:
+            self.message = self.queue.getObject(self.handle)
+        return self.message
+
+
 class DeliveryQueue(Queue):
     """A DeliveryQueue implements a queue that greedily sends messages to
        outgoing streams that occasionally fail.  All underlying messages
@@ -456,16 +490,6 @@
                 self.deliveryState[h] = readPickled(fn)
             else:
                 LOG.warn("No metadata for file handle %s", h)
-                obj = self.getObject(h)
-                #XXXX005 remove this.
-                if isinstance(obj, types.TupleType) and len(obj) == 4:
-                    # This message is in an obsolete format from 0.0.3.
-                    # We'd repair it, but packets from 0.0.3 are incompatible
-                    # anyway.
-                    LOG.info("Removing item %s in obsolete format", h)
-                    self.removeMessage(h)
-                    continue
-                
                 self.deliveryState[h] = _DeliveryState()
                 self._writeState(h)
 
@@ -534,7 +558,7 @@
     def queueMessage(self, msg):
         if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")
 
-    def queueDeliveryMessage(self, msg, now=None):
+    def queueDeliveryMessage(self, msg, address=None, now=None):
         """Schedule a message for delivery.
              msg -- the message.  This can be any pickleable object.
         """
@@ -543,7 +567,7 @@
             self._lock.acquire()
             handle = self.queueObject(msg)
             self.sendable.append(handle)
-            ds = self.deliveryState[handle] = _DeliveryState(now)
+            ds = self.deliveryState[handle] = _DeliveryState(now,None,address)
             self.nextAttempt[handle] = \
                      ds.getNextAttempt(self.retrySchedule, now)
             LOG.trace("ServerQueue got message %s for %s",
@@ -602,7 +626,12 @@
                     self.removeMessage(h)
                 elif next <= now:
                     LOG.trace("     [%s] is ready for delivery", h)
-                    messages.append( (h, self.getObject(h)) )
+                    state = self.deliveryState.get(h)
+                    if state is None:
+                        addr = None
+                    else:
+                        addr = state.address
+                    messages.append(PendingMessage(h,self,addr))
                     self.pending[h] = now
                 else:
                     LOG.trace("     [%s] is not yet ready for redelivery", h)
@@ -614,10 +643,6 @@
             self._deliverMessages(messages)
         self._repOk()
 
-    # FFFF005 This interface is inefficient in space: we don't need to load
-    # FFFF005 the messages to tell whether they need to be delivered.  We
-    # FFFF005 should have _deliverMessages() take a list of handles, not of
-    # FFFF005 messages.
     def _deliverMessages(self, msgList):
         """Abstract method; Invoked with a list of (handle, message)
            tuples every time we have a batch of messages to send.
@@ -625,6 +650,8 @@
            For every handle in the list, delierySucceeded or deliveryFailed
            should eventually be called, or the message will sit in the queue
            indefinitely, without being retried."""
+
+        #DOCDOC
 
         # We could implement this as a single _deliverMessage(h,addr)
         # method, but that wouldn't allow implementations to batch