[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Improve logging, speed up tests, bulletproof queues.



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv15090/lib/mixminion/server

Modified Files:
	MMTPServer.py Modules.py ServerMain.py ServerQueue.py 
Log Message:
Improve logging, speed up tests, bulletproof queues.

Also add a better linecounter (that's been sitting in my ~/bin for a
long time).



Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.28
retrieving revision 1.29
diff -u -d -r1.28 -r1.29
--- MMTPServer.py	17 May 2003 00:08:44 -0000	1.28
+++ MMTPServer.py	26 May 2003 20:04:24 -0000	1.29
@@ -265,9 +265,9 @@
             self.__state = self.__connectFn
 
         if address is not None:
-            self.address = address
+            self.address = "%s (fd %s)" % (address, self.fd)
         else:
-            self.address = "remote host"
+            self.address = "remote host (fd %s)" % (address, self.fd)
 
     def isShutdown(self):
         """Returns true iff this connection is finished shutting down"""
@@ -411,7 +411,7 @@
 
     def tryTimeout(self, cutoff):
         if self.lastActivity <= cutoff:
-            warn("Socket %s to %s timed out", self.fd, self.address)
+            warn("Connection to %s timed out", self.address)
             # ????     I'm not sure this is right.  Instead of just killing
             # ???? the socket, should we shut down the SSL too?
             self.__server.unregister(self)
@@ -541,6 +541,7 @@
            because the disk is full."""
         SimpleTLSConnection.__init__(self, sock, tls, 1,
                                      "%s:%s"%sock.getpeername())
+        
         self.messageConsumer = consumer
         self.junkCallback = lambda : None
         self.rejectCallback = lambda : None
@@ -559,7 +560,7 @@
         """Called once we're done reading the protocol string.  Either
            rejects, or sends our response.
         """
-        trace("done w/ client sendproto (fd %s)", self.fd)
+        trace("done w/ client sendproto to %s", self.address)
         inp = self.getInput()
         m = PROTOCOL_RE.match(inp)
 
@@ -571,8 +572,7 @@
         protocols = m.group(1).split(",")
         for p in self.PROTOCOL_VERSIONS:
             if p in protocols:
-                trace("Using protocol %s with %s (fd %s)",
-                      p, self.address, self.fd)
+                trace("Using protocol %s with %s", p, self.address)
                 self.protocol = p
                 self.finished = self.__sentProtocol
                 self.beginWrite("MMTP %s\r\n"% p)
@@ -587,7 +587,7 @@
         """Called once we're done sending our protocol response.  Begins
            reading a packet from the line.
         """
-        trace("done w/ server sendproto (fd %s)", self.fd)
+        trace("done w/ server sendproto to %s", self.address)
         self.finished = self.__receivedMessage
         self.expectRead(SEND_RECORD_LEN)
 
@@ -637,7 +637,7 @@
     def __sentAck(self):
         """Called once we're done sending an ACK.  Begins reading a new
            message."""
-        debug("Send ACK for message from %s (fd %s)", self.address, self.fd)
+        debug("Send ACK for message from %s", self.address)
         self.finished = self.__receivedMessage
         self.expectRead(SEND_RECORD_LEN)
 
@@ -723,7 +723,7 @@
         self.protocol = None
         self._curMessage = self._curHandle = None
 
-        debug("Opening client connection to %s:%s (fd %s)", ip,port,self.fd)
+        debug("Opening client connection to %s", self.address)
 
     def addMessages(self, messages, handles):
         """Given a list of messages and handles, as given to
@@ -752,11 +752,12 @@
             self.certCache.check(self.getTLSConnection(), self.keyID,
                                  self.address)
         except MixProtocolError, e:
-            warn("%s.  Shutting down connection",e)
+            warn("Certificate error: %s.  Shutting down connection to %s",
+                 e, self.address)
             self.shutdown(err=1,retriable=1)
             return
         else:
-            debug("KeyID from %s is valid", self.address)
+            debug("KeyID is valid from %s", self.address)
 
         self.beginWrite("MMTP %s\r\n"%(",".join(self.PROTOCOL_VERSIONS)))
         self.finished = self.__sentProtocol
@@ -782,6 +783,7 @@
                 return
 
         warn("Invalid protocol.  Closing connection to %s", self.address)
+             
         # This isn't retriable; we don't talk to servers we don't
         # understand.
         self.shutdown(err=1,retriable=0)
@@ -822,7 +824,7 @@
         """Called when we're done sending a message.  Begins reading the
            server's ACK."""
 
-        debug("Message delivered to %s (fd %s)", self.address, self.fd)
+        debug("Message delivered to %s", self.address)
         self.finished = self.__receivedAck
         self.expectRead(RECEIVED_RECORD_LEN)
 
@@ -835,11 +837,11 @@
           If there are more messages to send, begins sending the next.
           Otherwise, begins shutting down.
        """
-       trace("received ack (fd %s)", self.fd)
+       trace("received ack from %s", self.address)
        inp = self.getInput()
        rejected = 0
        if inp == REJECTED_CONTROL+self.rejectDigest:
-           debug("Message rejected from %s (fd %s)", self.address, self.fd)
+           debug("Message rejected from %s", self.address)
            rejected = 1
        elif inp != (RECEIVED_CONTROL+self.expectedDigest):
            # We only get bad ACKs if an adversary somehow subverts TLS's
@@ -957,8 +959,8 @@
         try:
             # Is there an existing connection open to the right server?
             con = self.clientConByAddr[(ip,port,keyID)]
-            LOG.debug("Queueing %s messages on open connection to %s:%s",
-                      len(messages), ip, port)
+            LOG.debug("Queueing %s messages on open connection to %s",
+                      len(messages), con.address)
             con.addMessages(messages, handles)
             return
         except KeyError:
@@ -989,8 +991,8 @@
         try:
             del self.clientConByAddr[addr]
         except KeyError:
-            LOG.warn("Didn't find client connection to %s in address map",
-                     addr)
+            warn("Didn't find client connection to %s in address map",
+                 addr)
 
     def onMessageReceived(self, msg):
         """Abstract function.  Called when we get a message"""

Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.37
retrieving revision 1.38
diff -u -d -r1.37 -r1.38
--- Modules.py	17 May 2003 00:08:45 -0000	1.37
+++ Modules.py	26 May 2003 20:04:25 -0000	1.38
@@ -150,6 +150,8 @@
                                "Exception delivering message")
             EventStats.log.unretriableDeliery() #XXXX
 
+        return "<nil>"
+
     def sendReadyMessages(self):
         # We do nothing here; we already delivered the messages
         pass
@@ -167,6 +169,7 @@
         mixminion.server.ServerQueue.DeliveryQueue.__init__(self, directory,
                                                             retrySchedule)
         self.module = module
+        
 
     def _deliverMessages(self, msgList):
         for handle, packet in msgList:
@@ -174,14 +177,18 @@
                 EventStats.log.attemptedDelivery() #XXXX
                 result = self.module.processMessage(packet)
                 if result == DELIVER_OK:
+                    LOG.debug("Successfully delivered message MOD:%s", handle)
                     self.deliverySucceeded(handle)
                     EventStats.log.successfulDelivery() #XXXX
                 elif result == DELIVER_FAIL_RETRY:
+                    LOG.debug("Unable to deliver message MOD:%s; will retry",
+                              handle)
                     self.deliveryFailed(handle, 1)
                     EventStats.log.failedDelivery() #XXXX
                 else:
                     assert result == DELIVER_FAIL_NORETRY
-                    LOG.error("Unable to deliver message")
+                    LOG.error("Unable to deliver message MOD:%s; giving up",
+                              handle)
                     self.deliveryFailed(handle, 0)
                     EventStats.log.unretriableDelivery() #XXXX
             except:
@@ -404,12 +411,12 @@
         if mod is None:
             LOG.error("Unable to handle message with unknown type %s",
                       exitType)
-            return
+            return "<nil>"
         queue = self.queues[mod.getName()]
         LOG.debug("Delivering message %r (type %04x) via module %s",
                   packet.getContents()[:8], exitType, mod.getName())
 
-        queue.queueDeliveryMessage(packet)
+        return queue.queueDeliveryMessage(packet)
 
     def shutdown(self):
         """Tell the delivery thread (if any) to stop."""

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.54
retrieving revision 1.55
diff -u -d -r1.54 -r1.55
--- ServerMain.py	23 May 2003 22:49:30 -0000	1.54
+++ ServerMain.py	26 May 2003 20:04:25 -0000	1.55
@@ -65,9 +65,8 @@
 
     def queueMessage(self, msg):
         """Add a message for delivery"""
-        LOG.trace("Inserted message %s into incoming queue",
-                  formatBase64(msg[:8]))
         h = mixminion.server.ServerQueue.Queue.queueMessage(self, msg)
+        LOG.trace("Inserting message IN:%s into incoming queue", h)
         assert h is not None
         self.processingThread.addJob(
             lambda self=self, h=h: self.__deliverMessage(h))
@@ -82,32 +81,29 @@
             res = ph.processMessage(message)
             if res is None:
                 # Drop padding before it gets to the mix.
-                LOG.debug("Padding message %s dropped",
-                          formatBase64(message[:8]))
+                LOG.debug("Padding message IN:%s dropped", handle)
                 self.removeMessage(handle)
             else:
                 if res.isDelivery():
                     res.decode()
-                LOG.debug("Processed message %s; inserting into pool",
-                          formatBase64(message[:8]))
-                self.mixPool.queueObject(res)
+
+                h2 = self.mixPool.queueObject(res)
                 self.removeMessage(handle)
+                LOG.debug("Processed message IN:%s; inserting into mix pool as MIX:%s",
+                          handle, h2)
         except mixminion.Crypto.CryptoError, e:
-            LOG.warn("Invalid PK or misencrypted header in message %s: %s",
-                     formatBase64(message[:8]), e)
+            LOG.warn("Invalid PK or misencrypted header in message IN:%s: %s",
+                     handle, e)
             self.removeMessage(handle)
         except mixminion.Packet.ParseError, e:
-            LOG.warn("Malformed message %s dropped: %s",
-                     formatBase64(message[:8]), e)
+            LOG.warn("Malformed message IN:%s dropped: %s", handle, e)
             self.removeMessage(handle)
         except mixminion.server.PacketHandler.ContentError, e:
-            LOG.warn("Discarding bad packet %s: %s",
-                     formatBase64(message[:8]), e)
+            LOG.warn("Discarding bad packet IN:%s: %s", handle, e)
             self.removeMessage(handle)
         except:
             LOG.error_exc(sys.exc_info(),
-                    "Unexpected error when processing message %s (handle %s)",
-                          formatBase64(message[:8]), handle)
+                    "Unexpected error when processing IN:%s", handle)
             self.removeMessage(handle) # ???? Really dump this message?
 
 class MixPool:
@@ -156,7 +152,7 @@
 
     def queueObject(self, obj):
         """Insert an object into the pool."""
-        self.queue.queueObject(obj)
+        return self.queue.queueObject(obj)
 
     def count(self):
         "Return the number of messages in the pool"
@@ -182,15 +178,16 @@
             packet = self.queue.getObject(h)
             if type(packet) == type(()):
                 #XXXX005 remove this case.
-                LOG.error("  (skipping message %s in obsolete format)", h)
+                LOG.error("  (skipping message MIX:%s in obsolete format)", h)
             elif packet.isDelivery():
-                LOG.debug("  (sending message %s to exit modules)",
-                          formatBase64(packet.getContents()[:8]))
-                self.moduleManager.queueDecodedMessage(packet)
+                h2 = self.moduleManager.queueDecodedMessage(packet)
+                LOG.debug("  (sending message MIX:%s to exit modules as MOD:%s)"
+                          , h, h2)
+
             else:
-                LOG.debug("  (sending message %s to MMTP server)",
-                          formatBase64(packet.getPacket()[:8]))
-                self.outgoingQueue.queueDeliveryMessage(packet)
+                h2 = self.outgoingQueue.queueDeliveryMessage(packet)
+                LOG.debug("  (sending message MIX:%s to MMTP server as OUT:%s)"
+                          , h, h2)
             # In any case, we're through with this message now.
             self.queue.removeMessage(h)
 
@@ -238,7 +235,7 @@
         for handle, packet in msgList:
             if not isinstance(packet,
                               mixminion.server.PacketHandler.RelayedPacket):
-                LOG.warn("Skipping packet in obsolete format")
+                LOG.warn("Skipping packet OUT:%s in obsolete format", handle)
                 self.deliverySucceeded(handle)
                 continue
             addr = packet.getAddress()
@@ -249,13 +246,14 @@
                 if self.addr[2] != addr.keyinfo:
                     LOG.warn("Delivering messages to myself with bad KeyID")
                 for h,m in messages:
-                    LOG.trace("Delivering message %s to myself.",
-                              formatBase64(m[:8]))
+                    LOG.trace("Delivering message OUT:%s to myself.", h)
                     self.incomingQueue.queueMessage(m)
                     self.deliverySucceeded(h)
                 continue
 
             handles, messages = zip(*messages)
+            LOG.trace("Delivering messages OUT:[%s] to %s:%s",
+                      " ".join(handles), addr.ip,addr.port)
             self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
                                      list(messages), list(handles))
 
@@ -308,7 +306,7 @@
 
     def deleteFile(self, fname):
         """Schedule the file named 'fname' for deletion"""
-        LOG.trace("Scheduling %s for deletion", fname)
+        #LOG.trace("Scheduling %s for deletion", fname)
         assert fname is not None
         self.mqueue.put(fname)
 
@@ -334,7 +332,7 @@
                     LOG.info("Cleanup thread shutting down.")
                     return
                 if os.path.exists(fn):
-                    LOG.trace("Deleting %s", fn)
+                    #LOG.trace("Deleting %s", fn)
                     secureDelete(fn, blocking=1)
                 else:
                     LOG.warn("Delete thread didn't find file %s",fn)

Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -d -r1.14 -r1.15
--- ServerQueue.py	17 May 2003 00:08:45 -0000	1.14
+++ ServerQueue.py	26 May 2003 20:04:27 -0000	1.15
@@ -151,7 +151,7 @@
         """Given a handle, removes the corresponding message from the queue."""
         self.__changeState(handle, "msg", "rmv") # handles locking.
 
-    def removeAll(self):
+    def removeAll(self, secureDeleteFn=None):
         """Removes all messages from this queue."""
         try:
             self._lock.acquire()
@@ -159,7 +159,7 @@
                 if m[:4] in ('inp_', 'msg_'):
                     self.__changeState(m[4:], m[:3], "rmv")
             self.n_entries = 0
-            self.cleanQueue()
+            self.cleanQueue(secureDeleteFn)
         finally:
             self._lock.release()
 
@@ -548,7 +548,8 @@
             ds = self.deliveryState[handle] = _DeliveryState(now)
             self.nextAttempt[handle] = \
                      ds.getNextAttempt(self.retrySchedule, now)
-            #self._saveState()
+            LOG.trace("ServerQueue got message %s for %s",
+                      handle, self.dir)
             self._writeState(handle)
         finally:
             self._lock.release()
@@ -633,28 +634,40 @@
         try:
             self._lock.acquire()
             Queue.removeMessage(self, handle)
-            for d in self.pending, self.deliveryState, self.nextAttempt:
-                try:
-                    del d[handle]
-                except KeyError:
-                    pass
+            try:
+                del self.pending[handle]
+                pending = 1
+            except KeyError:
+                pending = 0
+
+            try:
+                del self.deliveryState[handle]
+            except KeyError:
+                LOG.error("Removing message %s with no delivery state", handle)
+            try:
+                del self.nextAttempt[handle]
+            except KeyError:
+                LOG.error("Removing message %s with no nextAttempt", handle)
+
             try:
                 del self.sendable[self.sendable.index(handle)]
             except ValueError:
-                pass
+                if not pending:
+                    LOG.error("Removing message %s in neither "
+                              "'sendable' nor 'pending' list", handle)
 
             self._writeState(handle)
         finally:
             self._lock.release()
 
-    def removeAll(self):
+    def removeAll(self, secureDeleteFn=None):
         try:
             self._lock.acquire()
-            Queue.removeAll(self)
             for m in os.listdir(self.dir):
                 if m[:5] == 'meta_':
                     os.rename(os.path.join(self.dir, m),
                               os.path.join(self.dir, "rmv_"+m))
+            Queue.removeAll(self, secureDeleteFn)
             self.deliveryState = {}
             self.pending = {}
             self.nextAttempt = {}
@@ -708,7 +721,7 @@
                 ds.setLastAttempt(lastAttempt)
                 nextAttempt = ds.getNextAttempt(self.retrySchedule, now)
                 if nextAttempt is not None:
-                    LOG.trace("     (We'll %s try again at %s)", handle,
+                    LOG.trace("     (We'll try %s again at %s)", handle,
                               formatTime(nextAttempt, 1))
                     # There is another scheduled delivery attempt.  Remember
                     # it, mark the message sendable again, and save our state.