[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] Move "decode exit message" logic into processing thread...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv24386/lib/mixminion/server
Modified Files:
Modules.py ServerMain.py
Log Message:
Move "decode exit message" logic into processing thread. This prevents a stall of several ms per exit message
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -d -r1.22 -r1.23
--- Modules.py 9 Jan 2003 06:28:58 -0000 1.22
+++ Modules.py 9 Jan 2003 18:54:01 -0000 1.23
@@ -372,6 +372,12 @@
# XXXX003 remove the more complex logic here into the PacketHandler
# XXXX003 code.
# FFFF Support non-exit messages.
+ (exitType, address, tag), message = \
+ self.decodeMessage(message, tag, exitType, address)
+ self.queueDecodedMessage((exitType, address, tag), message)
+
+ def queueDecodedMessage(self, (exitType, address, tag), message):
+ #DOCDOC
mod = self.typeToModule.get(exitType, None)
if mod is None:
LOG.error("Unable to handle message with unknown type %s",
@@ -380,23 +386,25 @@
queue = self.queues[mod.getName()]
LOG.debug("Delivering message %r (type %04x) via module %s",
message[:8], exitType, mod.getName())
+
+ queue.queueDeliveryMessage((exitType, address, tag), message)
+
+ def decodeMessage(self, message, tag, exitType, address):
payload = None
try:
payload = mixminion.BuildMessage.decodePayload(message, tag)
except CompressedDataTooLong:
contents = mixminion.Packet.parsePayload(message).getContents()
- queue.queueDeliveryMessage((exitType, address, 'long'), contents)
- return
+ return (exitType, address, 'long'), contents
except MixError:
- queue.queueDeliveryMessage((exitType, address, 'err'), message)
- return
+ return (exitType, address, 'err'), message
if payload is None:
- # enrypted message
- queue.queueDeliveryMessage((exitType, address, tag), message)
+ # encrypted message
+ return (exitType, address, tag), message
else:
# forward message
- queue.queueDeliveryMessage((exitType, address, None), payload)
+ return (exitType, address, None), payload
#DOCDOC
def shutdown(self):
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.29
retrieving revision 1.30
diff -u -d -r1.29 -r1.30
--- ServerMain.py 9 Jan 2003 17:44:00 -0000 1.29
+++ ServerMain.py 9 Jan 2003 18:54:01 -0000 1.30
@@ -46,14 +46,16 @@
mixminion.server.ServerQueue.Queue.__init__(self, location, create=1)
self.packetHandler = packetHandler
self.mixPool = None
+ self.moduleManager = None
self._queue = Queue.Queue()
for h in self.getAllMessages():
assert h is not None
self._queue.put(h)
- def connectQueues(self, mixPool):
+ def connectQueues(self, mixPool, manager):
"""Sets the target mix queue"""
self.mixPool = mixPool
+ self.moduleManager = manager #XXXX003 refactor.
def queueMessage(self, msg):
"""Add a message for delivery"""
@@ -75,6 +77,13 @@
LOG.debug("Padding message %s dropped",
formatBase64(message[:8]))
self.removeMessage(handle)
+ elif res[0] == 'EXIT':
+ # XXXX Ugly, refactor
+ rt, ri, app_key, tag, payload = res[1]
+ res = self.moduleManager.decodeMessage(payload, tag, rt, ri)
+ LOG.debug("Processed message %s; inserting into pool",
+ formatBase64(message[:8]))
+ self.mixPool.queueObject(('EXIT', res))
else:
LOG.debug("Processed message %s; inserting into pool",
formatBase64(message[:8]))
@@ -164,10 +173,11 @@
for h in handles:
tp, info = self.queue.getObject(h)
if tp == 'EXIT':
- rt, ri, app_key, tag, payload = info
+ (exitType, address, tag), payload = info
LOG.debug(" (sending message %s to exit modules)",
formatBase64(payload[:8]))
- self.moduleManager.queueMessage(payload, tag, rt, ri)
+ self.moduleManager.queueDecodedMessage((exitType,address,tag),
+ payload)
else:
assert tp == 'QUEUE'
ipv4, msg = info
@@ -394,7 +404,8 @@
self.outgoingQueue.count())
LOG.debug("Connecting queues")
- self.incomingQueue.connectQueues(mixPool=self.mixPool)
+ self.incomingQueue.connectQueues(mixPool=self.mixPool,
+ manager=self.moduleManager)
self.mixPool.connectQueues(outgoing=self.outgoingQueue,
manager=self.moduleManager)
self.outgoingQueue.connectQueues(server=self.mmtpServer)