[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Improve logging, speed up tests, bulletproof queues.
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv15090/lib/mixminion/server
Modified Files:
MMTPServer.py Modules.py ServerMain.py ServerQueue.py
Log Message:
Improve logging, speed up tests, bulletproof queues.
Also add a better line counter (one that has been sitting in my ~/bin
for a long time).
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.28
retrieving revision 1.29
diff -u -d -r1.28 -r1.29
--- MMTPServer.py 17 May 2003 00:08:44 -0000 1.28
+++ MMTPServer.py 26 May 2003 20:04:24 -0000 1.29
@@ -265,9 +265,9 @@
self.__state = self.__connectFn
if address is not None:
- self.address = address
+ self.address = "%s (fd %s)" % (address, self.fd)
else:
- self.address = "remote host"
+ self.address = "remote host (fd %s)" % (address, self.fd)
def isShutdown(self):
"""Returns true iff this connection is finished shutting down"""
@@ -411,7 +411,7 @@
def tryTimeout(self, cutoff):
if self.lastActivity <= cutoff:
- warn("Socket %s to %s timed out", self.fd, self.address)
+ warn("Connection to %s timed out", self.address)
# ???? I'm not sure this is right. Instead of just killing
# ???? the socket, should we shut down the SSL too?
self.__server.unregister(self)
@@ -541,6 +541,7 @@
because the disk is full."""
SimpleTLSConnection.__init__(self, sock, tls, 1,
"%s:%s"%sock.getpeername())
+
self.messageConsumer = consumer
self.junkCallback = lambda : None
self.rejectCallback = lambda : None
@@ -559,7 +560,7 @@
"""Called once we're done reading the protocol string. Either
rejects, or sends our response.
"""
- trace("done w/ client sendproto (fd %s)", self.fd)
+ trace("done w/ client sendproto to %s", self.address)
inp = self.getInput()
m = PROTOCOL_RE.match(inp)
@@ -571,8 +572,7 @@
protocols = m.group(1).split(",")
for p in self.PROTOCOL_VERSIONS:
if p in protocols:
- trace("Using protocol %s with %s (fd %s)",
- p, self.address, self.fd)
+ trace("Using protocol %s with %s", p, self.address)
self.protocol = p
self.finished = self.__sentProtocol
self.beginWrite("MMTP %s\r\n"% p)
@@ -587,7 +587,7 @@
"""Called once we're done sending our protocol response. Begins
reading a packet from the line.
"""
- trace("done w/ server sendproto (fd %s)", self.fd)
+ trace("done w/ server sendproto to %s", self.address)
self.finished = self.__receivedMessage
self.expectRead(SEND_RECORD_LEN)
@@ -637,7 +637,7 @@
def __sentAck(self):
"""Called once we're done sending an ACK. Begins reading a new
message."""
- debug("Send ACK for message from %s (fd %s)", self.address, self.fd)
+ debug("Send ACK for message from %s", self.address)
self.finished = self.__receivedMessage
self.expectRead(SEND_RECORD_LEN)
@@ -723,7 +723,7 @@
self.protocol = None
self._curMessage = self._curHandle = None
- debug("Opening client connection to %s:%s (fd %s)", ip,port,self.fd)
+ debug("Opening client connection to %s", self.address)
def addMessages(self, messages, handles):
"""Given a list of messages and handles, as given to
@@ -752,11 +752,12 @@
self.certCache.check(self.getTLSConnection(), self.keyID,
self.address)
except MixProtocolError, e:
- warn("%s. Shutting down connection",e)
+ warn("Certificate error: %s. Shutting down connection to %s",
+ e, self.address)
self.shutdown(err=1,retriable=1)
return
else:
- debug("KeyID from %s is valid", self.address)
+ debug("KeyID is valid from %s", self.address)
self.beginWrite("MMTP %s\r\n"%(",".join(self.PROTOCOL_VERSIONS)))
self.finished = self.__sentProtocol
@@ -782,6 +783,7 @@
return
warn("Invalid protocol. Closing connection to %s", self.address)
+
# This isn't retriable; we don't talk to servers we don't
# understand.
self.shutdown(err=1,retriable=0)
@@ -822,7 +824,7 @@
"""Called when we're done sending a message. Begins reading the
server's ACK."""
- debug("Message delivered to %s (fd %s)", self.address, self.fd)
+ debug("Message delivered to %s", self.address)
self.finished = self.__receivedAck
self.expectRead(RECEIVED_RECORD_LEN)
@@ -835,11 +837,11 @@
If there are more messages to send, begins sending the next.
Otherwise, begins shutting down.
"""
- trace("received ack (fd %s)", self.fd)
+ trace("received ack from %s", self.address)
inp = self.getInput()
rejected = 0
if inp == REJECTED_CONTROL+self.rejectDigest:
- debug("Message rejected from %s (fd %s)", self.address, self.fd)
+ debug("Message rejected from %s", self.address)
rejected = 1
elif inp != (RECEIVED_CONTROL+self.expectedDigest):
# We only get bad ACKs if an adversary somehow subverts TLS's
@@ -957,8 +959,8 @@
try:
# Is there an existing connection open to the right server?
con = self.clientConByAddr[(ip,port,keyID)]
- LOG.debug("Queueing %s messages on open connection to %s:%s",
- len(messages), ip, port)
+ LOG.debug("Queueing %s messages on open connection to %s",
+ len(messages), con.address)
con.addMessages(messages, handles)
return
except KeyError:
@@ -989,8 +991,8 @@
try:
del self.clientConByAddr[addr]
except KeyError:
- LOG.warn("Didn't find client connection to %s in address map",
- addr)
+ warn("Didn't find client connection to %s in address map",
+ addr)
def onMessageReceived(self, msg):
"""Abstract function. Called when we get a message"""
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.37
retrieving revision 1.38
diff -u -d -r1.37 -r1.38
--- Modules.py 17 May 2003 00:08:45 -0000 1.37
+++ Modules.py 26 May 2003 20:04:25 -0000 1.38
@@ -150,6 +150,8 @@
"Exception delivering message")
EventStats.log.unretriableDeliery() #XXXX
+ return "<nil>"
+
def sendReadyMessages(self):
# We do nothing here; we already delivered the messages
pass
@@ -167,6 +169,7 @@
mixminion.server.ServerQueue.DeliveryQueue.__init__(self, directory,
retrySchedule)
self.module = module
+
def _deliverMessages(self, msgList):
for handle, packet in msgList:
@@ -174,14 +177,18 @@
EventStats.log.attemptedDelivery() #XXXX
result = self.module.processMessage(packet)
if result == DELIVER_OK:
+ LOG.debug("Successfully delivered message MOD:%s", handle)
self.deliverySucceeded(handle)
EventStats.log.successfulDelivery() #XXXX
elif result == DELIVER_FAIL_RETRY:
+ LOG.debug("Unable to deliver message MOD:%s; will retry",
+ handle)
self.deliveryFailed(handle, 1)
EventStats.log.failedDelivery() #XXXX
else:
assert result == DELIVER_FAIL_NORETRY
- LOG.error("Unable to deliver message")
+ LOG.error("Unable to deliver message MOD:%s; giving up",
+ handle)
self.deliveryFailed(handle, 0)
EventStats.log.unretriableDelivery() #XXXX
except:
@@ -404,12 +411,12 @@
if mod is None:
LOG.error("Unable to handle message with unknown type %s",
exitType)
- return
+ return "<nil>"
queue = self.queues[mod.getName()]
LOG.debug("Delivering message %r (type %04x) via module %s",
packet.getContents()[:8], exitType, mod.getName())
- queue.queueDeliveryMessage(packet)
+ return queue.queueDeliveryMessage(packet)
def shutdown(self):
"""Tell the delivery thread (if any) to stop."""
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.54
retrieving revision 1.55
diff -u -d -r1.54 -r1.55
--- ServerMain.py 23 May 2003 22:49:30 -0000 1.54
+++ ServerMain.py 26 May 2003 20:04:25 -0000 1.55
@@ -65,9 +65,8 @@
def queueMessage(self, msg):
"""Add a message for delivery"""
- LOG.trace("Inserted message %s into incoming queue",
- formatBase64(msg[:8]))
h = mixminion.server.ServerQueue.Queue.queueMessage(self, msg)
+ LOG.trace("Inserting message IN:%s into incoming queue", h)
assert h is not None
self.processingThread.addJob(
lambda self=self, h=h: self.__deliverMessage(h))
@@ -82,32 +81,29 @@
res = ph.processMessage(message)
if res is None:
# Drop padding before it gets to the mix.
- LOG.debug("Padding message %s dropped",
- formatBase64(message[:8]))
+ LOG.debug("Padding message IN:%s dropped", handle)
self.removeMessage(handle)
else:
if res.isDelivery():
res.decode()
- LOG.debug("Processed message %s; inserting into pool",
- formatBase64(message[:8]))
- self.mixPool.queueObject(res)
+
+ h2 = self.mixPool.queueObject(res)
self.removeMessage(handle)
+ LOG.debug("Processed message IN:%s; inserting into mix pool as MIX:%s",
+ handle, h2)
except mixminion.Crypto.CryptoError, e:
- LOG.warn("Invalid PK or misencrypted header in message %s: %s",
- formatBase64(message[:8]), e)
+ LOG.warn("Invalid PK or misencrypted header in message IN:%s: %s",
+ handle, e)
self.removeMessage(handle)
except mixminion.Packet.ParseError, e:
- LOG.warn("Malformed message %s dropped: %s",
- formatBase64(message[:8]), e)
+ LOG.warn("Malformed message IN:%s dropped: %s", handle, e)
self.removeMessage(handle)
except mixminion.server.PacketHandler.ContentError, e:
- LOG.warn("Discarding bad packet %s: %s",
- formatBase64(message[:8]), e)
+ LOG.warn("Discarding bad packet IN:%s: %s", handle, e)
self.removeMessage(handle)
except:
LOG.error_exc(sys.exc_info(),
- "Unexpected error when processing message %s (handle %s)",
- formatBase64(message[:8]), handle)
+ "Unexpected error when processing IN:%s", handle)
self.removeMessage(handle) # ???? Really dump this message?
class MixPool:
@@ -156,7 +152,7 @@
def queueObject(self, obj):
"""Insert an object into the pool."""
- self.queue.queueObject(obj)
+ return self.queue.queueObject(obj)
def count(self):
"Return the number of messages in the pool"
@@ -182,15 +178,16 @@
packet = self.queue.getObject(h)
if type(packet) == type(()):
#XXXX005 remove this case.
- LOG.error(" (skipping message %s in obsolete format)", h)
+ LOG.error(" (skipping message MIX:%s in obsolete format)", h)
elif packet.isDelivery():
- LOG.debug(" (sending message %s to exit modules)",
- formatBase64(packet.getContents()[:8]))
- self.moduleManager.queueDecodedMessage(packet)
+ h2 = self.moduleManager.queueDecodedMessage(packet)
+ LOG.debug(" (sending message MIX:%s to exit modules as MOD:%s)"
+ , h, h2)
+
else:
- LOG.debug(" (sending message %s to MMTP server)",
- formatBase64(packet.getPacket()[:8]))
- self.outgoingQueue.queueDeliveryMessage(packet)
+ h2 = self.outgoingQueue.queueDeliveryMessage(packet)
+ LOG.debug(" (sending message MIX:%s to MMTP server as OUT:%s)"
+ , h, h2)
# In any case, we're through with this message now.
self.queue.removeMessage(h)
@@ -238,7 +235,7 @@
for handle, packet in msgList:
if not isinstance(packet,
mixminion.server.PacketHandler.RelayedPacket):
- LOG.warn("Skipping packet in obsolete format")
+ LOG.warn("Skipping packet OUT:%s in obsolete format", handle)
self.deliverySucceeded(handle)
continue
addr = packet.getAddress()
@@ -249,13 +246,14 @@
if self.addr[2] != addr.keyinfo:
LOG.warn("Delivering messages to myself with bad KeyID")
for h,m in messages:
- LOG.trace("Delivering message %s to myself.",
- formatBase64(m[:8]))
+ LOG.trace("Delivering message OUT:%s to myself.", h)
self.incomingQueue.queueMessage(m)
self.deliverySucceeded(h)
continue
handles, messages = zip(*messages)
+ LOG.trace("Delivering messages OUT:[%s] to %s:%s",
+ " ".join(handles), addr.ip,addr.port)
self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
list(messages), list(handles))
@@ -308,7 +306,7 @@
def deleteFile(self, fname):
"""Schedule the file named 'fname' for deletion"""
- LOG.trace("Scheduling %s for deletion", fname)
+ #LOG.trace("Scheduling %s for deletion", fname)
assert fname is not None
self.mqueue.put(fname)
@@ -334,7 +332,7 @@
LOG.info("Cleanup thread shutting down.")
return
if os.path.exists(fn):
- LOG.trace("Deleting %s", fn)
+ #LOG.trace("Deleting %s", fn)
secureDelete(fn, blocking=1)
else:
LOG.warn("Delete thread didn't find file %s",fn)
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -d -r1.14 -r1.15
--- ServerQueue.py 17 May 2003 00:08:45 -0000 1.14
+++ ServerQueue.py 26 May 2003 20:04:27 -0000 1.15
@@ -151,7 +151,7 @@
"""Given a handle, removes the corresponding message from the queue."""
self.__changeState(handle, "msg", "rmv") # handles locking.
- def removeAll(self):
+ def removeAll(self, secureDeleteFn=None):
"""Removes all messages from this queue."""
try:
self._lock.acquire()
@@ -159,7 +159,7 @@
if m[:4] in ('inp_', 'msg_'):
self.__changeState(m[4:], m[:3], "rmv")
self.n_entries = 0
- self.cleanQueue()
+ self.cleanQueue(secureDeleteFn)
finally:
self._lock.release()
@@ -548,7 +548,8 @@
ds = self.deliveryState[handle] = _DeliveryState(now)
self.nextAttempt[handle] = \
ds.getNextAttempt(self.retrySchedule, now)
- #self._saveState()
+ LOG.trace("ServerQueue got message %s for %s",
+ handle, self.dir)
self._writeState(handle)
finally:
self._lock.release()
@@ -633,28 +634,40 @@
try:
self._lock.acquire()
Queue.removeMessage(self, handle)
- for d in self.pending, self.deliveryState, self.nextAttempt:
- try:
- del d[handle]
- except KeyError:
- pass
+ try:
+ del self.pending[handle]
+ pending = 1
+ except KeyError:
+ pending = 0
+
+ try:
+ del self.deliveryState[handle]
+ except KeyError:
+ LOG.error("Removing message %s with no delivery state", handle)
+ try:
+ del self.nextAttempt[handle]
+ except KeyError:
+ LOG.error("Removing message %s with no nextAttempt", handle)
+
try:
del self.sendable[self.sendable.index(handle)]
except ValueError:
- pass
+ if not pending:
+ LOG.error("Removing message %s in neither "
+ "'sendable' nor 'pending' list", handle)
self._writeState(handle)
finally:
self._lock.release()
- def removeAll(self):
+ def removeAll(self, secureDeleteFn=None):
try:
self._lock.acquire()
- Queue.removeAll(self)
for m in os.listdir(self.dir):
if m[:5] == 'meta_':
os.rename(os.path.join(self.dir, m),
os.path.join(self.dir, "rmv_"+m))
+ Queue.removeAll(self, secureDeleteFn)
self.deliveryState = {}
self.pending = {}
self.nextAttempt = {}
@@ -708,7 +721,7 @@
ds.setLastAttempt(lastAttempt)
nextAttempt = ds.getNextAttempt(self.retrySchedule, now)
if nextAttempt is not None:
- LOG.trace(" (We'll %s try again at %s)", handle,
+ LOG.trace(" (We'll try %s again at %s)", handle,
formatTime(nextAttempt, 1))
# There is another scheduled delivery attempt. Remember
# it, mark the message sendable again, and save our state.