[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] Move "decode exit message" logic into processing thread...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv24386/lib/mixminion/server

Modified Files:
	Modules.py ServerMain.py 
Log Message:
Move "decode exit message" logic into processing thread.  This prevents a stall of several ms per exit message.

Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -d -r1.22 -r1.23
--- Modules.py	9 Jan 2003 06:28:58 -0000	1.22
+++ Modules.py	9 Jan 2003 18:54:01 -0000	1.23
@@ -372,6 +372,12 @@
         # XXXX003 remove the more complex logic here into the PacketHandler
         # XXXX003 code.
         # FFFF Support non-exit messages.
+        (exitType, address, tag), message = \
+                   self.decodeMessage(message, tag, exitType, address)
+        self.queueDecodedMessage((exitType, address, tag), message)
+
+    def queueDecodedMessage(self, (exitType, address, tag), message):
+        #DOCDOC
         mod = self.typeToModule.get(exitType, None)
         if mod is None:
             LOG.error("Unable to handle message with unknown type %s",
@@ -380,23 +386,25 @@
         queue = self.queues[mod.getName()]
         LOG.debug("Delivering message %r (type %04x) via module %s",
                        message[:8], exitType, mod.getName())
+
+        queue.queueDeliveryMessage((exitType, address, tag), message)
+
+    def decodeMessage(self, message, tag, exitType, address):
         payload = None
         try:
             payload = mixminion.BuildMessage.decodePayload(message, tag)
         except CompressedDataTooLong:
             contents = mixminion.Packet.parsePayload(message).getContents()
-            queue.queueDeliveryMessage((exitType, address, 'long'), contents)
-            return
+            return (exitType, address, 'long'), contents
         except MixError:
-            queue.queueDeliveryMessage((exitType, address, 'err'), message)
-            return
+            return (exitType, address, 'err'), message
 
         if payload is None:
-            # enrypted message
-            queue.queueDeliveryMessage((exitType, address, tag), message)
+            # encrypted message
+            return (exitType, address, tag), message
         else:
             # forward message
-            queue.queueDeliveryMessage((exitType, address, None), payload)
+            return (exitType, address, None), payload
 
     #DOCDOC
     def shutdown(self):

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.29
retrieving revision 1.30
diff -u -d -r1.29 -r1.30
--- ServerMain.py	9 Jan 2003 17:44:00 -0000	1.29
+++ ServerMain.py	9 Jan 2003 18:54:01 -0000	1.30
@@ -46,14 +46,16 @@
         mixminion.server.ServerQueue.Queue.__init__(self, location, create=1)
         self.packetHandler = packetHandler
         self.mixPool = None
+        self.moduleManager = None
         self._queue = Queue.Queue()
         for h in self.getAllMessages():
             assert h is not None
             self._queue.put(h)
 
-    def connectQueues(self, mixPool):
+    def connectQueues(self, mixPool, manager):
         """Sets the target mix queue"""
         self.mixPool = mixPool
+        self.moduleManager = manager #XXXX003 refactor.
 
     def queueMessage(self, msg):
         """Add a message for delivery"""
@@ -75,6 +77,13 @@
                 LOG.debug("Padding message %s dropped",
                           formatBase64(message[:8]))
                 self.removeMessage(handle)
+            elif res[0] == 'EXIT':
+                # XXXX Ugly, refactor
+                rt, ri, app_key, tag, payload = res[1]
+                res = self.moduleManager.decodeMessage(payload, tag, rt, ri)
+                LOG.debug("Processed message %s; inserting into pool",
+                          formatBase64(message[:8]))
+                self.mixPool.queueObject(('EXIT', res))
             else:
                 LOG.debug("Processed message %s; inserting into pool",
                           formatBase64(message[:8]))
@@ -164,10 +173,11 @@
         for h in handles:
             tp, info = self.queue.getObject(h)
             if tp == 'EXIT':
-                rt, ri, app_key, tag, payload = info
+                (exitType, address, tag), payload = info
                 LOG.debug("  (sending message %s to exit modules)",
                           formatBase64(payload[:8]))
-                self.moduleManager.queueMessage(payload, tag, rt, ri)
+                self.moduleManager.queueDecodedMessage((exitType,address,tag),
+                                                       payload)
             else:
                 assert tp == 'QUEUE'
                 ipv4, msg = info
@@ -394,7 +404,8 @@
                        self.outgoingQueue.count())
 
         LOG.debug("Connecting queues")
-        self.incomingQueue.connectQueues(mixPool=self.mixPool)
+        self.incomingQueue.connectQueues(mixPool=self.mixPool,
+                                         manager=self.moduleManager)
         self.mixPool.connectQueues(outgoing=self.outgoingQueue,
                                    manager=self.moduleManager)
         self.outgoingQueue.connectQueues(server=self.mmtpServer)