[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Look, we're alpha again! Time for some delayed perform...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv6726/lib/mixminion/server
Modified Files:
MMTPServer.py Modules.py ServerMain.py ServerQueue.py
Log Message:
Look, we're alpha again! Time for some delayed performance work.
(Some of this still needs docs and tests)
Common, ServerMain:
- To make the server shut down faster, abandon pending work in the
processing and cleaning threads when we're shutting down.
(Formerly, if we got behind cleaning files or processing messages,
we'd wait to finish before we shut down. Now we shut down almost
immediately. This is safe, because the code is written to be safe
in the presence of SIGKILLs anyway.)
test, MMTPServer, Modules, ServerMain:
- When we had N messages to deliver, we used to suck all N into
memory. Clearly, this was a pretty rotten idea: instead, we now
lazy-load messages as they're needed. This makes the mixminion
server use even less memory under load than it did before.
The code is a little hairy, but it seems to work for me. It still
needs documentation.
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.39
retrieving revision 1.40
diff -u -d -r1.39 -r1.40
--- MMTPServer.py 6 Jun 2003 07:17:35 -0000 1.39
+++ MMTPServer.py 13 Jun 2003 01:03:46 -0000 1.40
@@ -32,6 +32,7 @@
from mixminion.Crypto import sha1, getCommonPRNG
from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN
from mixminion.MMTPClient import PeerCertificateCache
+import mixminion.server.EventStats as EventStats
__all__ = [ 'AsyncServer', 'ListenConnection', 'MMTPServerConnection',
'MMTPClientConnection' ]
@@ -698,6 +699,27 @@
NULL_KEYID = "\x00"*20
+class DeliverableMessage:
+ def __init__(self):
+ pass
+ def getContents(self):
+ raise NotImplementedError
+ def succeeded(self):
+ raise NotImplementedError
+ def failed(self,retriable=0):
+ raise NotImplementedError
+
+class DeliverablePacket(DeliverableMessage):
+ def __init__(self, pending):
+ DeliverableMessage.__init__(self)
+ self.pending = pending
+ def succeeded(self):
+ self.pending.succeeded()
+ def failed(self,retriable=0):
+ self.pending.failed(retriable=retriable)
+ def getContents(self):
+ return self.pending.getMessage().getPacket()
+
class MMTPClientConnection(SimpleTLSConnection):
"""Asynchronious implementation of the sending ("client") side of a
mixminion connection."""
@@ -721,9 +743,8 @@
# active: Are we currently able to send messages to a server? Boolean.
PROTOCOL_VERSIONS = [ '0.3' ]
- def __init__(self, context, ip, port, keyID, messageList, handleList,
- sentCallback=None, failCallback=None, finishedCallback=None,
- certCache=None):
+ def __init__(self, context, ip, port, keyID, messageList,
+ finishedCallback=None, certCache=None):
"""Create a connection to send messages to an MMTP server.
Raises socket.error if the connection fails.
@@ -733,12 +754,6 @@
messageList -- a list of message payloads and control strings.
The control string "JUNK" sends 32KB of padding; the control
string "RENEGOTIATE" renegotiates the connection key.
- handleList -- a list of objects corresponding to the entries in
- messageList. Used for callback.
- sentCallback -- None, or a function of (msg, handle) to be called
- whenever a message is successfully sent.
- failCallback -- None, or a function of (msg, handle, retriable)
- to be called when messages can't be sent.
finishedCallback -- None, or a function to be called when this
connection is closed.
certCache -- an instance of PeerCertificateCache to use for
@@ -746,11 +761,12 @@
"""
# Generate junk before connecting to avoid timing attacks
self.junk = []
+ #DOCDOC
self.messageList = []
- self.handleList = []
self.active = 1
- self.addMessages(messageList, handleList)
+ #DOCDOC
+ self.addMessages(messageList)
if certCache is None:
certCache = PeerCertificateCache()
@@ -773,8 +789,6 @@
SimpleTLSConnection.__init__(self, sock, tls, 0, "%s:%s"%(ip,port))
self.finished = self.__setupFinished
- self.sentCallback = sentCallback
- self.failCallback = failCallback
self.finishedCallback = finishedCallback
self.protocol = None
self._curMessage = self._curHandle = None
@@ -787,21 +801,16 @@
the connection is currently shutting down."""
return self.active
- def addMessages(self, messages, handles):
+ def addMessages(self, messages):
"""Given a list of messages and handles, as given to
MMTPServer.__init__, cause this connection to deliver that new
set of messages after it's done with those it's currently sending.
"""
assert self.active
- assert len(messages) == len(handles)
- for m,h in zip(messages, handles):
- if m in ("JUNK", "RENEGOTIATE"):
- assert h is None
for m in messages:
if m == "JUNK":
self.junk.append(getCommonPRNG().getBytes(MESSAGE_LEN))
self.messageList.extend(messages)
- self.handleList.extend(handles)
def getAddr(self):
"""Return an (ip,port,keyID) tuple for this connection"""
@@ -858,10 +867,8 @@
self.shutdown(0)
return
- msg = self._curMessage = self.messageList[0]
- self._curHandle = self.handleList[0]
- del self.messageList[0]
- del self.handleList[0]
+ self._getNextMessage()
+ msg = self._curMessage
if msg == 'RENEGOTIATE':
self.finished = self.beginNextMessage
self.startRenegotiate()
@@ -874,15 +881,27 @@
msg = JUNK_CONTROL+msg+sha1(msg+"JUNK")
self.isJunk = 1
else:
+ EventStats.log.attemptedRelay() #FFFF addr
self.expectedDigest = sha1(msg+"RECEIVED")
self.rejectDigest = sha1(msg+"REJECTED")
msg = SEND_CONTROL+msg+sha1(msg+"SEND")
self.isJunk = 0
-
+
assert len(msg) == SEND_RECORD_LEN
self.beginWrite(msg)
self.finished = self.__sentMessage
+ def _getNextMessage(self):
+ """DOCDOC"""
+ m = self.messageList[0]
+ del self.messageList[0]
+ if hasattr(m, 'getContents'):
+ self._curHandle = m
+ self._curMessage = m.getContents()
+ else:
+ self._curHandle = None
+ self._curMessage = m
+
def __sentMessage(self):
"""Called when we're done sending a message. Begins reading the
server's ACK."""
@@ -915,11 +934,12 @@
debug("Received valid ACK for message from %s", self.address)
if not self.isJunk:
- if not rejected and self.sentCallback is not None:
- self.sentCallback(self._curMessage, self._curHandle)
- elif rejected and self.failCallback is not None:
- self.failCallback(self._curMessage, self._curHandle,
- retriable=1)
+ if not rejected:
+ self._curHandle.succeeded()
+ EventStats.log.successfulRelay() #FFFF addr
+ else:
+ self._curHandle.failed(retriable=1)
+ EventStats.log.failedRelay() #FFFF addr
self._curMessage = self._curHandle = None
@@ -927,14 +947,20 @@
def handleFail(self, retriable):
"""Invoked when we shutdown with an error."""
- if self.failCallback is not None:
- if self._curHandle is not None:
- self.failCallback(self._curMessage, self._curHandle, retriable)
- for msg, handle in zip(self.messageList, self.handleList):
- if handle is None:
- continue
- self.failCallback(msg,handle,retriable)
- self._messageList = self.handleList = []
+ if retriable:
+ statFn = EventStats.log.failedRelay
+ else:
+ statFn = EventStats.log.unretriableRelay
+ if self._curHandle is not None:
+ self._curHandle.failed(retriable)
+ statFn()
+ for msg in self.messageList:
+ try:
+ msg.failed(retriable)
+ statFn()
+ except AttributeError:
+ pass
+ self._messageList = []
self._curMessage = self._curHandle = None
def shutdown(self, err=0, retriable=0):
@@ -946,8 +972,6 @@
if self.finishedCallback is not None:
self.finishedCallback()
self.finishedCallback = None
- self.failCallback = None
- self.sentCallback = None
SimpleTLSConnection.remove(self)
@@ -1014,24 +1038,17 @@
def stopListening(self):
self.listener.shutdown()
- def sendMessages(self, ip, port, keyID, messages, handles):
+ def sendMessages(self, ip, port, keyID, deliverable):
"""Begin sending a set of messages to a given server."""
- for m,h in zip(messages, handles):
- if m in ("JUNK", "RENEGOTIATE"):
- assert h is None
- continue
- assert len(m) == MESSAGE_LEN
- assert len(h) < 32
-
try:
# Is there an existing connection open to the right server?
con = self.clientConByAddr[(ip,port,keyID)]
# If so, is that connection currently sending messages?
if con.isActive():
LOG.debug("Queueing %s messages on open connection to %s",
- len(messages), con.address)
- con.addMessages(messages, handles)
+ len(deliverable), con.address)
+ con.addMessages(deliverable)
return
except KeyError:
pass
@@ -1041,9 +1058,7 @@
addr = (ip, port, keyID)
finished = lambda addr=addr, self=self: self.__clientFinished(addr)
con = MMTPClientConnection(self.context,
- ip, port, keyID, messages, handles,
- sentCallback=self.onMessageSent,
- failCallback=self.onMessageUndeliverable,
+ ip, port, keyID, deliverable,
finishedCallback=finished,
certCache=self.certificateCache)
con.register(self)
@@ -1053,8 +1068,11 @@
except socket.error, e:
LOG.error("Unexpected socket error connecting to %s:%s: %s",
ip, port, e)
- for m,h in zip(messages, handles):
- self.onMessageUndeliverable(m,h,1)
+ for m in messages:
+ try:
+ m.failed(1)
+ except AttributeError:
+ pass
def __clientFinished(self, addr):
"""Called when a client connection runs out of messages to send."""
@@ -1068,12 +1086,4 @@
"""Abstract function. Called when we get a message"""
pass
- def onMessageUndeliverable(self, msg, handle, retriable):
- """Abstract function: Called when an attempt to deliver a
- message fails."""
- pass
- def onMessageSent(self, msg, handle):
- """Abstract function: Called when an attempt to deliver a
- message succeeds."""
- pass
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.42
retrieving revision 1.43
diff -u -d -r1.42 -r1.43
--- Modules.py 5 Jun 2003 18:41:40 -0000 1.42
+++ Modules.py 13 Jun 2003 01:03:46 -0000 1.43
@@ -172,29 +172,30 @@
def _deliverMessages(self, msgList):
- for handle, packet in msgList:
+ for handle in msgList:
try:
EventStats.log.attemptedDelivery() #FFFF
+ packet = handle.getMessage()
result = self.module.processMessage(packet)
if result == DELIVER_OK:
LOG.debug("Successfully delivered message MOD:%s", handle)
- self.deliverySucceeded(handle)
+ handle.succeeded()
EventStats.log.successfulDelivery() #FFFF
elif result == DELIVER_FAIL_RETRY:
LOG.debug("Unable to deliver message MOD:%s; will retry",
handle)
- self.deliveryFailed(handle, 1)
+ handle.failed(1)
EventStats.log.failedDelivery() #FFFF
else:
assert result == DELIVER_FAIL_NORETRY
LOG.error("Unable to deliver message MOD:%s; giving up",
handle)
- self.deliveryFailed(handle, 0)
+ handle.failed(0)
EventStats.log.unretriableDelivery() #FFFF
except:
LOG.error_exc(sys.exc_info(),
"Exception delivering message")
- self.deliveryFailed(handle, 0)
+ handle.failed(0)
EventStats.log.unretriableDelivery() #FFFF
class DeliveryThread(threading.Thread):
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.79
retrieving revision 1.80
diff -u -d -r1.79 -r1.80
--- ServerMain.py 12 Jun 2003 04:07:54 -0000 1.79
+++ ServerMain.py 13 Jun 2003 01:03:46 -0000 1.80
@@ -48,7 +48,7 @@
# We pull this from mixminion.Common, just in case somebody still has
# a copy of the old "mixminion/server/Queue.py" (since renamed to
# ServerQueue.py)
-from mixminion.Common import MessageQueue
+from mixminion.Common import MessageQueue, ClearableQueue, QueueEmpty
import mixminion.Config
import mixminion.Crypto
@@ -256,16 +256,13 @@
for h in handles:
packet = self.queue.getObject(h)
- if type(packet) == type(()):
- #XXXX005 remove this case.
- LOG.error(" (skipping message MIX:%s in obsolete format)", h)
- elif packet.isDelivery():
+ if packet.isDelivery():
h2 = self.moduleManager.queueDecodedMessage(packet)
LOG.debug(" (sending message MIX:%s to exit modules as MOD:%s)"
, h, h2)
-
else:
- h2 = self.outgoingQueue.queueDeliveryMessage(packet)
+ address = packet.getAddress()
+ h2 = self.outgoingQueue.queueDeliveryMessage(packet, address)
LOG.debug(" (sending message MIX:%s to MMTP server as OUT:%s)"
, h, h2)
# In any case, we're through with this message now.
@@ -312,30 +309,31 @@
"Implementation of abstract method from DeliveryQueue."
# Map from addr -> [ (handle, msg) ... ]
msgs = {}
- for handle, packet in msgList:
- if not isinstance(packet,
- mixminion.server.PacketHandler.RelayedPacket):
- LOG.warn("Skipping packet OUT:%s in obsolete format", handle)
- self.deliverySucceeded(handle)
- continue
- addr = packet.getAddress()
- message = packet.getPacket()
- msgs.setdefault(addr, []).append( (handle, message) )
+ for pending in msgList:
+ addr = pending.getAddress()
+ if addr is None:
+ addr = pending.getMessage().getAddress()
+ msgs.setdefault(addr, []).append(pending)
for addr, messages in msgs.items():
if self.addr[:2] == (addr.ip, addr.port):
if self.addr[2] != addr.keyinfo:
LOG.warn("Delivering messages to myself with bad KeyID")
- for h,m in messages:
- LOG.trace("Delivering message OUT:%s to myself.", h)
- self.incomingQueue.queueMessage(m)
- self.deliverySucceeded(h)
+ for pending in messages:
+ LOG.trace("Delivering message OUT:%s to myself.",
+ pending.getHandle())
+ self.incomingQueue.queueMessage(
+ pending.getMessage().getPacket())
+ pending.succeeded()
continue
- handles, messages = zip(*messages)
+ deliverable = [
+ mixminion.server.MMTPServer.DeliverablePacket(pending)
+ for pending in messages ]
LOG.trace("Delivering messages OUT:[%s] to %s:%s",
- " ".join(handles), addr.ip,addr.port)
+ " ".join([p.getHandle() for p in messages]),
+ addr.ip, addr.port)
self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
- list(messages), list(handles))
+ deliverable)
class _MMTPServer(mixminion.server.MMTPServer.MMTPAsyncServer):
"""Implementation of mixminion.server.MMTPServer that knows about
@@ -358,19 +356,6 @@
# FFFF Replace with server.
EventStats.log.receivedPacket()
- def onMessageSent(self, msg, handle):
- self.outgoingQueue.deliverySucceeded(handle)
- EventStats.log.attemptedRelay() # FFFF replace with addr
- EventStats.log.successfulRelay() # FFFF replace with addr
-
- def onMessageUndeliverable(self, msg, handle, retriable):
- self.outgoingQueue.deliveryFailed(handle, retriable)
- EventStats.log.attemptedRelay() # FFFF replace with addr
- if retriable:
- EventStats.log.failedRelay() # FFFF replace with addr
- else:
- EventStats.log.unretriableRelay() # FFFF replace with addr
-
#----------------------------------------------------------------------
class CleaningThread(threading.Thread):
"""Thread that handles file deletion. Some methods of secure deletion
@@ -378,11 +363,11 @@
main thread.
"""
# Fields:
- # mqueue: A MessageQueue holding filenames to delete, or None to indicate
+ # mqueue: A ClearableQueue holding filenames to delete, or None to indicate
# a shutdown.
def __init__(self):
threading.Thread.__init__(self)
- self.mqueue = MessageQueue()
+ self.mqueue = ClearableQueue()
def deleteFile(self, fname):
"""Schedule the file named 'fname' for deletion"""
@@ -399,6 +384,7 @@
"""Tell this thread to shut down once it has deleted all pending
files."""
LOG.info("Telling cleanup thread to shut down.")
+ self.mqueue.clear()
self.mqueue.put(None)
def run(self):
@@ -425,7 +411,7 @@
Currently used to process packets in the background."""
# Fields:
- # mqueue: a MessageQueue of callable objects.
+ # mqueue: a ClearableQueue of callable objects.
class _Shutdown:
"""Callable that raises itself when called. Inserted into the
queue when it's time to shut down."""
@@ -433,12 +419,13 @@
raise self
def __init__(self):
- """Given a MessageQueue object, create a new processing thread."""
+ """Create a new processing thread."""
threading.Thread.__init__(self)
- self.mqueue = MessageQueue()
+ self.mqueue = ClearableQueue()
def shutdown(self):
LOG.info("Telling processing thread to shut down.")
+ self.mqueue.clear()
self.mqueue.put(ProcessingThread._Shutdown())
def addJob(self, job):
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -d -r1.22 -r1.23
--- ServerQueue.py 6 Jun 2003 06:08:40 -0000 1.22
+++ ServerQueue.py 13 Jun 2003 01:03:46 -0000 1.23
@@ -12,7 +12,6 @@
import sys
import cPickle
import threading
-import types
from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
createPrivateDir, readPickled, writePickled, formatTime, readFile
@@ -287,7 +286,6 @@
finally:
self._lock.release()
-
class _DeliveryState:
"""Helper class: holds the state needed to schedule delivery or
eventual abandonmont of a message in a DeliveryQueue."""
@@ -296,7 +294,8 @@
# inserted into the queue.
# lastAttempt: The most recent time at which we attempted to
# deliver the message. (None means 'never').
- def __init__(self, queuedTime=None, lastAttempt=None):
+ # address: Pickleable object holding address information. DOCDOC
+ def __init__(self, queuedTime=None, lastAttempt=None, address=None):
"""Create a new _DeliveryState for a message received at
queuedTime (default now), whose last delivery attempt was
at lastAttempt (default never)."""
@@ -304,17 +303,24 @@
queuedTime = time.time()
self.queuedTime = queuedTime
self.lastAttempt = lastAttempt
+ self.address = None
def __getstate__(self):
# For pickling. All future versions of deliverystate will pickle
# to a tuple, whose first element will be a version string.
- return ("V0", self.queuedTime, self.lastAttempt)
+ return ("V1", self.queuedTime, self.lastAttempt, self.address)
def __setstate__(self, state):
# For pickling.
- if state[0] == "V0":
+ if state[0] == "V1":
self.queuedTime = state[1]
self.lastAttempt = state[2]
+ self.address = state[3]
+ elif state[0] == "V0":
+ #XXXX006 remove this case.
+ self.queuedTime = state[1]
+ self.lastAttempt = state[2]
+ self.address = None
else:
raise MixFatalError("Unrecognized delivery state")
@@ -355,6 +361,34 @@
"""Update time of the last attempted delivery."""
self.lastAttempt = when
+class PendingMessage:
+ """DOCDOC"""
+ def __init__(self, handle, queue, address, message=None):
+ self.handle = handle
+ self.queue = queue
+ self.address = address
+ self.message = message
+
+ def getAddress(self):
+ return self.address
+
+ def getHandle(self):
+ return self.handle
+
+ def succeeded(self):
+ self.queue.deliverySucceeded(self.handle)
+ self.queue = self.message = None
+
+ def failed(self, retriable=0, now=None):
+ self.queue.deliveryFailed(self.handle, retriable, now=now)
+ self.queue = self.message = None
+
+ def getMessage(self):
+ if self.message is None:
+ self.message = self.queue.getObject(self.handle)
+ return self.message
+
+
class DeliveryQueue(Queue):
"""A DeliveryQueue implements a queue that greedily sends messages to
outgoing streams that occasionally fail. All underlying messages
@@ -456,16 +490,6 @@
self.deliveryState[h] = readPickled(fn)
else:
LOG.warn("No metadata for file handle %s", h)
- obj = self.getObject(h)
- #XXXX005 remove this.
- if isinstance(obj, types.TupleType) and len(obj) == 4:
- # This message is in an obsolete format from 0.0.3.
- # We'd repair it, but packets from 0.0.3 are incompatible
- # anyway.
- LOG.info("Removing item %s in obsolete format", h)
- self.removeMessage(h)
- continue
-
self.deliveryState[h] = _DeliveryState()
self._writeState(h)
@@ -534,7 +558,7 @@
def queueMessage(self, msg):
if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")
- def queueDeliveryMessage(self, msg, now=None):
+ def queueDeliveryMessage(self, msg, address=None, now=None):
"""Schedule a message for delivery.
msg -- the message. This can be any pickleable object.
"""
@@ -543,7 +567,7 @@
self._lock.acquire()
handle = self.queueObject(msg)
self.sendable.append(handle)
- ds = self.deliveryState[handle] = _DeliveryState(now)
+ ds = self.deliveryState[handle] = _DeliveryState(now,None,address)
self.nextAttempt[handle] = \
ds.getNextAttempt(self.retrySchedule, now)
LOG.trace("ServerQueue got message %s for %s",
@@ -602,7 +626,12 @@
self.removeMessage(h)
elif next <= now:
LOG.trace(" [%s] is ready for delivery", h)
- messages.append( (h, self.getObject(h)) )
+ state = self.deliveryState.get(h)
+ if state is None:
+ addr = None
+ else:
+ addr = state.address
+ messages.append(PendingMessage(h,self,addr))
self.pending[h] = now
else:
LOG.trace(" [%s] is not yet ready for redelivery", h)
@@ -614,10 +643,6 @@
self._deliverMessages(messages)
self._repOk()
- # FFFF005 This interface is inefficient in space: we don't need to load
- # FFFF005 the messages to tell whether they need to be delivered. We
- # FFFF005 should have _deliverMessages() take a list of handles, not of
- # FFFF005 messages.
def _deliverMessages(self, msgList):
"""Abstract method; Invoked with a list of (handle, message)
tuples every time we have a batch of messages to send.
@@ -625,6 +650,8 @@
For every handle in the list, delierySucceeded or deliveryFailed
should eventually be called, or the message will sit in the queue
indefinitely, without being retried."""
+
+ #DOCDOC
# We could implement this as a single _deliverMessage(h,addr)
# method, but that wouldn't allow implementations to batch