[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] More work on multithreaded server, other goodies. It seems more stable now.
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv10789/lib/mixminion/server
Modified Files:
Modules.py PacketHandler.py ServerMain.py ServerQueue.py
Log Message:
More work on multithreaded server, other goodies. It seems more stable now.
setup.py:
- Remove old mixminion/server/Queue.py where found
ClientMain, ServerMain:
- Don't use stderr if it isn't an error.
Common:
- Call 'Queue' 'MessageQueue' to avoid potential confusion.
Main:
- Add a disclaimer to the main usage message
Modules, PacketHandler, ServerMain:
- Refactor how we pass packets around so as to move message decoding into the
processing thread: Objects are beautiful; tuples are kludgey.
ServerMain:
- Document more of the multithreaded stuff
- refactor
- make locking more sane
- handle queues with obsolete tuple-based messages.
- Remove old comment.
- Reset logs on sighup
- Check for thread liveness (this may be a bad idea)
ServerQueue:
- Simplify clean logic
- make locking more sane.
- Be more robust on erroneous queue state.
test, benchmark:
- Update tests to use new interfaces
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -d -r1.23 -r1.24
--- Modules.py 9 Jan 2003 18:54:01 -0000 1.23
+++ Modules.py 10 Jan 2003 20:12:05 -0000 1.24
@@ -104,9 +104,8 @@
"""
return SimpleModuleDeliveryQueue(self, queueDir)
- def processMessage(self, message, tag, exitType, exitInfo):
- """Given a message with a given exitType and exitInfo, try to deliver
- it. 'tag' is as decribed in createDeliveryQueue. Return one of:
+ def processMessage(self, packet):
+ """Given a DeliveryPacket object, try to deliver it. Return one of:
DELIVER_OK (if the message was successfully delivered),
DELIVER_FAIL_RETRY (if the message wasn't delivered, but might be
deliverable later), or
@@ -125,11 +124,12 @@
def __init__(self, module):
self.module = module
- def queueDeliveryMessage(self, (exitType, address, tag), message):
+ def queueDeliveryMessage(self, addr, packet, retry=0):
"""Instead of queueing our message, pass it directly to the underlying
DeliveryModule."""
+ assert addr is None
try:
- res = self.module.processMessage(message, tag, exitType, address)
+ res = self.module.processMessage(packet)
if res == DELIVER_OK:
return
elif res == DELIVER_FAIL_RETRY:
@@ -158,10 +158,10 @@
self.module = module
def _deliverMessages(self, msgList):
- for handle, addr, message, n_retries in msgList:
+ for handle, addr, packet, n_retries in msgList:
+ assert addr is None
try:
- exitType, address, tag = addr
- result = self.module.processMessage(message,tag,exitType,address)
+ result = self.module.processMessage(packet)
if result == DELIVER_OK:
self.deliverySucceeded(handle)
elif result == DELIVER_FAIL_RETRY:
@@ -174,12 +174,16 @@
"Exception delivering message")
self.deliveryFailed(handle, 0)
+ def queueDeliveryMessage(self, addr, packet, retry=0):
+ assert addr is None
+ mixminion.server.ServerQueue.DeliveryQueue.queueDeliveryMessage(
+ self, None, packet, retry)
+
class DeliveryThread(threading.Thread):
def __init__(self, moduleManager):
threading.Thread.__init__(self)
self.moduleManager = moduleManager
self.event = threading.Event()
- self.setDaemon(1)
self.__stoppinglock = threading.Lock() # UGLY. XXXX
self.isStopping = 0
@@ -194,16 +198,20 @@
self.event.set()
def run(self):
- while 1:
- self.event.wait()
- self.event.clear()
- self.__stoppinglock.acquire()
- stop = self.isStopping
- self.__stoppinglock.release()
- if stop:
- LOG.info("Delivery thread shutting down.")
- return
- self.moduleManager._sendReadyMessages()
+ try:
+ while 1:
+ self.event.wait()
+ self.event.clear()
+ self.__stoppinglock.acquire()
+ stop = self.isStopping
+ self.__stoppinglock.release()
+ if stop:
+ LOG.info("Delivery thread shutting down.")
+ return
+ self.moduleManager._sendReadyMessages()
+ except:
+ LOG.error_exc(sys.exc_info(),
+ "Exception in delivery; shutting down thread.")
class ModuleManager:
"""A ModuleManager knows about all of the server modules in the system.
@@ -367,45 +375,24 @@
if self.enabled.has_key(module.getName()):
del self.enabled[module.getName()]
- def queueMessage(self, message, tag, exitType, address):
- """Queue a message for delivery."""
- # XXXX003 remove the more complex logic here into the PacketHandler
- # XXXX003 code.
- # FFFF Support non-exit messages.
- (exitType, address, tag), message = \
- self.decodeMessage(message, tag, exitType, address)
- self.queueDecodedMessage((exitType, address, tag), message)
+ def queueMessage2(self, packet):
+ self.queueDecodedMessage(packet)
- def queueDecodedMessage(self, (exitType, address, tag), message):
+ def queueDecodedMessage(self, packet):
#DOCDOC
+ exitType = packet.getExitType()
+
mod = self.typeToModule.get(exitType, None)
if mod is None:
LOG.error("Unable to handle message with unknown type %s",
- exitType)
+ exitType)
return
queue = self.queues[mod.getName()]
LOG.debug("Delivering message %r (type %04x) via module %s",
- message[:8], exitType, mod.getName())
-
- queue.queueDeliveryMessage((exitType, address, tag), message)
-
- def decodeMessage(self, message, tag, exitType, address):
- payload = None
- try:
- payload = mixminion.BuildMessage.decodePayload(message, tag)
- except CompressedDataTooLong:
- contents = mixminion.Packet.parsePayload(message).getContents()
- return (exitType, address, 'long'), contents
- except MixError:
- return (exitType, address, 'err'), message
-
- if payload is None:
- # encrypted message
- return (exitType, address, tag), message
- else:
- # forward message
- return (exitType, address, None), payload
-
+ packet.getContents()[:8], exitType, mod.getName())
+
+ queue.queueDeliveryMessage(None, packet)
+
#DOCDOC
def shutdown(self):
if self.thread is not None:
@@ -445,7 +432,7 @@
return [ mixminion.Packet.DROP_TYPE ]
def createDeliveryQueue(self, directory):
return ImmediateDeliveryQueue(self)
- def processMessage(self, message, tag, exitType, exitInfo):
+ def processMessage(self, packet):
LOG.debug("Dropping padding message")
return DELIVER_OK
@@ -687,11 +674,11 @@
def getExitTypes(self):
return [ mixminion.Packet.MBOX_TYPE ]
- def processMessage(self, message, tag, exitType, address):
+ def processMessage(self, packet): #message, tag, exitType, address):
# Determine that message's address;
- assert exitType == mixminion.Packet.MBOX_TYPE
+ assert packet.getExitType() == mixminion.Packet.MBOX_TYPE
LOG.debug("Received MBOX message")
- info = mixminion.Packet.parseMBOXInfo(address)
+ info = mixminion.Packet.parseMBOXInfo(packet.getAddress())
try:
address = self.addresses[info.user]
except KeyError:
@@ -699,7 +686,7 @@
return DELIVER_FAIL_NORETRY
# Escape the message if it isn't plaintext ascii
- msg = _escapeMessageForEmail(message, tag)
+ msg = _escapeMessageForEmail(packet)
# Generate the boilerplate (FFFF Make this configurable)
fields = { 'user': address,
@@ -795,14 +782,15 @@
manager.enableModule(self)
- def processMessage(self, message, tag, exitType, address):
- assert exitType == mixminion.Packet.SMTP_TYPE
+ def processMessage(self, packet):
+ assert packet.getExitType() == mixminion.Packet.SMTP_TYPE
LOG.debug("Received SMTP message")
# parseSMTPInfo will raise a parse error if the mailbox is invalid.
try:
- address = mixminion.Packet.parseSMTPInfo(address).email
+ address = mixminion.Packet.parseSMTPInfo(packet.getAddress()).email
except ParseError:
- LOG.warn("Dropping SMTP message to invalid address %r", address)
+ LOG.warn("Dropping SMTP message to invalid address %r",
+ packet.getAddress())
return DELIVER_FAIL_NORETRY
# Now, have we blacklisted this address?
@@ -811,7 +799,7 @@
return DELIVER_FAIL_NORETRY
# Decode and escape the message, and get ready to send it.
- msg = _escapeMessageForEmail(message, tag)
+ msg = _escapeMessageForEmail(packet)
msg = "To: %s\n%s\n\n%s"%(address, self.header, msg)
# Send the message.
@@ -873,17 +861,18 @@
self.tmpQueue.removeAll()
return _MixmasterSMTPModuleDeliveryQueue(self, queueDir)
- def processMessage(self, message, tag, exitType, smtpAddress):
+ def processMessage(self, packet):
"""Insert a message into the Mixmaster queue"""
- assert exitType == mixminion.Packet.SMTP_TYPE
+ assert packet.getExitType() == mixminion.Packet.SMTP_TYPE
# parseSMTPInfo will raise a parse error if the mailbox is invalid.
try:
- info = mixminion.Packet.parseSMTPInfo(smtpAddress)
+ info = mixminion.Packet.parseSMTPInfo(packet.getAddress())
except ParseError:
- LOG.warn("Dropping SMTP message to invalid address %r",smtpAddress)
+ LOG.warn("Dropping SMTP message to invalid address %r",
+ packet.getAddress())
return DELIVER_FAIL_NORETRY
- msg = _escapeMessageForEmail(message, tag)
+ msg = _escapeMessageForEmail(packet)
handle = self.tmpQueue.queueMessage(msg)
cmd = self.command
@@ -936,7 +925,7 @@
#----------------------------------------------------------------------
-def _escapeMessageForEmail(msg, tag):
+def _escapeMessageForEmail(packet):
"""Helper function: Given a message and tag, escape the message if
it is not plaintext ascii, and wrap it in some standard
boilerplate. Add a disclaimer if the message is not ascii.
@@ -949,33 +938,34 @@
'long' [if the message might be a zlib bomb'].
Returns None on an invalid message."""
- m = _escapeMessage(msg, tag, text=1)
- if m is None:
+ if packet.isError():
return None
- code, msg, tag = m
- if code == 'ENC':
+ if packet.isEncrypted():
junk_msg = """\
This message is not in plaintext. It's either 1) a reply; 2) a forward
message encrypted to you; or 3) junk.\n\n"""
- elif code == 'ZB':
+ elif packet.isOvercompressed():
junk_msg = """\
This message is compressed with zlib. Ordinarily, I would have decompressed
it, but it was compressed by more than a factor of 20, which makes me nervous.
\n"""
- elif code == 'BIN':
+ elif not packet.isPrintingAscii():
+ assert packet.isPlaintext()
junk_msg = """\
This message contains nonprinting characters, so I encoded it with Base64
before sending it to you.\n\n"""
else:
+ assert packet.isPlaintext()
junk_msg = ""
- if tag is not None:
- tag = "Decoding handle: "+tag+"\n"
+ if packet.isEncrypted():
+ tag = "Decoding handle: %s\n"%packet.getAsciiTag()
else:
tag = ""
- if msg and msg[-1] != '\n':
+ contents = packet.getAsciiContents()
+ if contents and contents[-1] != '\n':
extra_newline = "\n"
else:
extra_newline = ""
@@ -983,42 +973,4 @@
return """\
%s======= TYPE III ANONYMOUS MESSAGE BEGINS ========
%s%s%s======== TYPE III ANONYMOUS MESSAGE ENDS =========
-""" %(junk_msg, tag, msg, extra_newline)
-
-def _escapeMessage(message, tag, text=0):
- """Helper: given a decoded message (and possibly its tag), determine
- whether the message is a text plaintext message (code='TXT'), a
- binary plaintext message (code 'BIN'), an encrypted message/reply
- (code='ENC'), or a plaintext possible zlib bomb ('ZB'). If
- requested, non-TXT messages are base-64 encoded.
-
- Returns: (code, message, tag (for ENC) or None (for BIN, TXT).
- Returns None if the message is invalid.
-
- message -- A (possibly decoded) message
- tag -- One of: a 20-byte decoding tag [if the message is encrypted
- or a reply]
- None [if the message is in plaintext]
- 'err' [if the message was invalid.]
- 'long' [if the message might be a zlib bomb'].
- text -- flag: if true, non-TXT messages must be base64-encoded.
- """
- if tag == 'err':
- return None
- elif tag == 'long':
- code = "ZB"
- elif tag is not None:
- code = "ENC"
- else:
- assert tag is None
- if isPrintingAscii(message, allowISO=1):
- code = "TXT"
- else:
- code = "BIN"
-
- if text and (code != "TXT") :
- message = base64.encodestring(message)
- if text and tag:
- tag = base64.encodestring(tag).strip()
-
- return code, message, tag
+""" %(junk_msg, tag, contents, extra_newline)
Index: PacketHandler.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/PacketHandler.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- PacketHandler.py 16 Dec 2002 02:40:11 -0000 1.3
+++ PacketHandler.py 10 Jan 2003 20:12:05 -0000 1.4
@@ -3,11 +3,17 @@
"""mixminion.PacketHandler: Code to process mixminion packets on a server"""
+import base64
+
import mixminion.Crypto as Crypto
import mixminion.Packet as Packet
import mixminion.Common as Common
+import mixminion.BuildMessage
-__all__ = [ 'PacketHandler', 'ContentError' ]
+from mixminion.BuildMessage import CompressedDataTooLong
+from mixminion.Common import MixError, isPrintingAscii
+
+__all__ = [ 'PacketHandler', 'ContentError', 'DeliveryPacket', 'RelayedPacket']
class ContentError(Common.MixError):
"""Exception raised when a packed is malformatted or unacceptable."""
@@ -53,8 +59,10 @@
for h in self.hashlog:
h.close()
- def processMessage(self, msg):
- """Given a 32K mixminion message, processes it completely.
+ def processMessage(self, msg):
+ """DOCDOC
+
+ Given a 32K mixminion message, processes it completely.
Returns one of:
None [if the mesesage should be dropped.]
@@ -145,11 +153,9 @@
# If we're an exit node, there's no need to process the headers
# further.
if rt >= Packet.MIN_EXIT_TYPE:
- return ("EXIT",
- (rt, subh.getExitAddress(),
- keys.get(Crypto.APPLICATION_KEY_MODE),
- subh.getTag(),
- payload))
+ return DeliveryPacket(rt, subh.getExitAddress(),
+ keys.get(Crypto.APPLICATION_KEY_MODE),
+ subh.getTag(), payload)
# If we're not an exit node, make sure that what we recognize our
# routing type.
@@ -186,4 +192,111 @@
# Construct the message for the next hop.
msg = Packet.Message(header1, header2, payload).pack()
- return ("QUEUE", (address, msg))
+ return RelayedPacket(address, msg)
+
+class RelayedPacket:
+ def __init__(self, address, msg):
+ assert isinstance(address, Packet.IPV4Info)
+ self.address = address
+ self.msg = msg
+
+ def isDelivery(self):
+ return 0
+
+ def getAddress(self):
+ return self.address
+
+ def getPacket(self):
+ return self.msg
+
+class DeliveryPacket:
+ def __init__(self, routingType, routingInfo, applicationKey,
+ tag, payload):
+ self.exitType = routingType
+ self.address = routingInfo
+ self.key = applicationKey
+ self.tag = tag
+ self.payload = payload
+ self.contents = None
+ self.type = None
+
+ def isDelivery(self):
+ return 1
+
+ def getExitType(self): return self.exitType
+ def getAddress(self): return self.address
+ def getTag(self): return self.tag
+ def getApplicationKey(self): return self.key
+ def getPayload(self): return self.payload
+
+ def getContents(self):
+ if self.type is None: self.decode()
+ return self.contents
+
+ def isPlaintext(self):
+ if self.type is None: self.decode()
+ return self.type == 'plain'
+
+ def isOvercompressed(self):
+ if self.type is None: self.decode()
+ return self.type == 'long'
+
+ def isEncrypted(self):
+ if self.type is None: self.decode()
+ return self.type == 'enc'
+
+ def isPrintingAscii(self):
+ if self.type is None: self.decode()
+ return isPrintingAscii(self.contents, allowISO=1)
+
+ def isError(self):
+ if self.type is None: self.decode()
+ return self.type == 'err'
+
+ def getFakeTag(self):
+ if self.type is None: self.decode()
+ if self.type == 'enc':
+ return self.tag
+ elif self.type == 'plain':
+ return None
+ else:
+ return self.type
+
+ def decode(self):
+ if self.payload is None:
+ return
+ message = self.payload
+ self.contents = None
+ try:
+ self.contents = mixminion.BuildMessage.decodePayload(message,
+ self.tag)
+ if self.contents is None:
+ # encrypted message
+ self.type = 'enc'
+ self.contents = message
+ else:
+ # forward message
+ self.type = 'plain'
+ # self.contents is right
+ except CompressedDataTooLong, _:
+ self.contents = (mixminion.Packet.parsePayload(message)
+ .getContents())
+ self.type = 'long'
+ except MixError:
+ self.contents = message
+ self.type = 'err'
+
+
+ self.payload = None
+
+ def getAsciiContents(self):
+ if self.type is None:
+ self.decode()
+
+ if self.type == 'plain' and isPrintingAscii(self.contents, allowISO=1):
+ return self.contents
+ else:
+ return base64.encodestring(self.contents)
+
+ def getAsciiTag(self):
+ return base64.encodestring(self.tag).strip()
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.30
retrieving revision 1.31
diff -u -d -r1.30 -r1.31
--- ServerMain.py 9 Jan 2003 18:54:01 -0000 1.30
+++ ServerMain.py 10 Jan 2003 20:12:05 -0000 1.31
@@ -20,7 +20,7 @@
# We pull this from mixminion.Common, just in case somebody still has
# a copy of the old "mixminion/server/Queue.py" (since renamed to
# ServerQueue.py)
-from mixminion.Common import Queue
+from mixminion.Common import MessageQueue
import mixminion.Config
import mixminion.Crypto
@@ -37,9 +37,10 @@
secureDelete, waitForChildren
class IncomingQueue(mixminion.server.ServerQueue.Queue):
- """A DeliveryQueue to accept messages from incoming MMTP connections,
- process them with a packet handler, and send them into a mix pool."""
-
+ """A DeliveryQueue to accept packets from incoming MMTP connections,
+ and hold them until they can be processed. As packets arrive, and
+ are stored to disk, we notify a message queue so that another thread
+ can read them."""
def __init__(self, location, packetHandler):
"""Create an IncomingQueue that stores its messages in <location>
and processes them through <packetHandler>."""
@@ -47,7 +48,7 @@
self.packetHandler = packetHandler
self.mixPool = None
self.moduleManager = None
- self._queue = Queue.Queue()
+ self._queue = MessageQueue()
for h in self.getAllMessages():
assert h is not None
self._queue.put(h)
@@ -65,9 +66,13 @@
assert h is not None
self._queue.put(h)
+ def getMessageQueue(self):
+ return self._queue
+
def deliverMessage(self, handle):
- "DOCDOC"
- # DOCDOC called from within thread.
+ """Process a single message with a given handle, and insert it into
+ the Mix pool. This function is called from within the processing
+ thread."""
ph = self.packetHandler
message = self.messageContents(handle)
try:
@@ -77,20 +82,15 @@
LOG.debug("Padding message %s dropped",
formatBase64(message[:8]))
self.removeMessage(handle)
- elif res[0] == 'EXIT':
- # XXXX Ugly, refactor
- rt, ri, app_key, tag, payload = res[1]
- res = self.moduleManager.decodeMessage(payload, tag, rt, ri)
- LOG.debug("Processed message %s; inserting into pool",
- formatBase64(message[:8]))
- self.mixPool.queueObject(('EXIT', res))
else:
+ if res.isDelivery():
+ res.decode()
LOG.debug("Processed message %s; inserting into pool",
formatBase64(message[:8]))
self.mixPool.queueObject(res)
self.removeMessage(handle)
except mixminion.Crypto.CryptoError, e:
- LOG.warn("Invalid PK or misencrypted packet header in message %s: %s",
+ LOG.warn("Invalid PK or misencrypted header in message %s: %s",
formatBase64(message[:8]), e)
self.removeMessage(handle)
except mixminion.Packet.ParseError, e:
@@ -105,18 +105,19 @@
LOG.error_exc(sys.exc_info(),
"Unexpected error when processing message %s (handle %s)",
formatBase64(message[:8]), handle)
- # ???? Remove? Don't remove?
+ self.removeMessage(handle) # ???? Really dump this message?
class MixPool:
"""Wraps a mixminion.server.Queue.*MixQueue to send messages to an exit
- queue and a delivery queue."""
+ queue and a delivery queue. The files in the MixQueue are instances
+ of RelayedPacket or DeliveryPacket from PacketHandler.
+
+ All methods on this class are invoked from the main thread.
+ """
def __init__(self, config, queueDir):
"""Create a new MixPool, based on this server's configuration and
queue location."""
- # DOCDOC lock
- self.__lock = threading.Lock()
-
server = config['Server']
interval = server['MixInterval'][2]
if server['MixAlgorithm'] == 'TimedMixQueue':
@@ -139,16 +140,15 @@
self.moduleManager = None
def lock(self):
- self.__lock.acquire()
+ self.queue.lock()
def unlock(self):
- self.__lock.release()
+ self.queue.unlock()
def queueObject(self, obj):
"""Insert an object into the queue."""
- self.__lock.acquire()
+ obj.isDelivery() #XXXX remove this implicit typecheck.
self.queue.queueObject(obj)
- self.__lock.release()
def count(self):
"Return the number of messages in the queue"
@@ -171,16 +171,17 @@
self.queue.count(), len(handles))
for h in handles:
- tp, info = self.queue.getObject(h)
- if tp == 'EXIT':
- (exitType, address, tag), payload = info
+ packet = self.queue.getObject(h)
+ #XXXX remove the first case
+ if type(packet) == type(()):
+ LOG.debug(" (skipping message %s in obsolete format)", h)
+ elif packet.isDelivery():
LOG.debug(" (sending message %s to exit modules)",
- formatBase64(payload[:8]))
- self.moduleManager.queueDecodedMessage((exitType,address,tag),
- payload)
+ formatBase64(packet.getContents()[:8]))
+ self.moduleManager.queueDecodedMessage(packet)
else:
- assert tp == 'QUEUE'
- ipv4, msg = info
+ ipv4 = packet.getAddress()
+ msg = packet.getPacket()
LOG.debug(" (sending message %s to MMTP server)",
formatBase64(msg[:8]))
self.outgoingQueue.queueDeliveryMessage(ipv4, msg)
@@ -192,7 +193,12 @@
return now + self.queue.getInterval()
class OutgoingQueue(mixminion.server.ServerQueue.DeliveryQueue):
- """DeliveryQueue to send messages via outgoing MMTP connections."""
+ """DeliveryQueue to send messages via outgoing MMTP connections. All
+ methods on this class are called from the main thread. The addresses
+ in this queue are pickled IPV4Info objects.
+
+ All methods in this class are run from the main thread.
+ """
def __init__(self, location):
"""Create a new OutgoingQueue that stores its messages in a given
location."""
@@ -217,7 +223,10 @@
class _MMTPServer(mixminion.server.MMTPServer.MMTPAsyncServer):
"""Implementation of mixminion.server.MMTPServer that knows about
- delivery queues."""
+ delivery queues.
+
+ All methods in this class are run from the main thread.
+ """
def __init__(self, config, tls):
mixminion.server.MMTPServer.MMTPAsyncServer.__init__(self, config, tls)
@@ -235,16 +244,16 @@
self.outgoingQueue.deliveryFailed(handle, retriable)
#----------------------------------------------------------------------
class CleaningThread(threading.Thread):
+ """Thread that handles file deletion."""
#DOCDOC
def __init__(self):
threading.Thread.__init__(self)
- self._queue = Queue.Queue()
- self.setDaemon(1)
+ self.mqueue = MessageQueue()
def deleteFile(self, fname):
LOG.trace("Scheduling %s for deletion", fname)
assert fname is not None
- self._queue.put(fname)
+ self.mqueue.put(fname)
def deleteFiles(self, fnames):
for f in fnames:
@@ -252,12 +261,12 @@
def shutdown(self):
LOG.info("Telling cleanup thread to shut down.")
- self._queue.put(None)
+ self.mqueue.put(None)
def run(self):
try:
while 1:
- fn = self._queue.get()
+ fn = self.mqueue.get()
if fn is None:
LOG.info("Cleanup thread shutting down.")
return
@@ -276,26 +285,25 @@
threading.Thread.__init__(self)
# Clean up logic; maybe refactor. ????
self.incomingQueue = incomingQueue
- self.setDaemon(1) #????
+ self.mqueue = incomingQueue.getMessageQueue()
def shutdown(self):
LOG.info("Telling processing thread to shut down.")
- self.incomingQueue._queue.put(None)
+ self.mqueue.put(None)
def run(self):
- while 1:
- handle = self.incomingQueue._queue.get()
- if handle is None:
- LOG.info("Processing thread shutting down.")
- return
- self.incomingQueue.deliverMessage(handle)
- # XXXX debugging hack
- if self.incomingQueue._queue.qsize() == 0:
- n = self.incomingQueue.count(1)
- if n != 0:
- LOG.trace("_queue was empty, but incoming queue had %s",n)
-
+ try:
+ while 1:
+ handle = self.mqueue.get()
+ if handle is None:
+ LOG.info("Processing thread shutting down.")
+ return
+ self.incomingQueue.deliverMessage(handle)
+ except:
+ LOG.error_exc(sys.exc_info(),
+ "Exception while processing; shutting down thread.")
+#----------------------------------------------------------------------
STOPPING = 0
def _sigTermHandler(signal_num, _):
'''(Signal handler for SIGTERM)'''
@@ -432,8 +440,8 @@
# 'MIX', 'SHRED', and 'TIMEOUT'. Kept in sorted order.
scheduledEvents = []
now = time.time()
- #XXXX restore
- scheduledEvents.append( (now + 120, "SHRED") )#FFFF make configurable
+
+ scheduledEvents.append( (now + 600, "SHRED") )#FFFF make configurable
scheduledEvents.append( (self.mmtpServer.getNextTimeoutTime(now),
"TIMEOUT") )
nextMix = self.mixPool.getNextMixTime(now)
@@ -442,13 +450,6 @@
scheduledEvents.sort()
# FFFF Support for automatic key rotation.
-
- # ???? Our cuurent approach can make the server unresponsive when
- # ???? mixing many messages at once: We stop answering requests, and
- # ???? don't start again until we've delivered all the pending
- # ???? messages! Also, we process every packet as soon as it arrives,
- # ???? which can also make the system pause for a few ms at a time.
- # ???? Possible solutions: Multiple threads or processes...?
while 1:
nextEventTime = scheduledEvents[0][0]
now = time.time()
@@ -460,8 +461,16 @@
LOG.info("Caught sigterm; shutting down.")
return
elif GOT_HUP:
- LOG.info("Ignoring sighup for now, sorry.")
+ LOG.info("Caught sighup")
+ LOG.info("Resetting logs")
+ LOG.reset()
GOT_HUP = 0
+ # ???? This could slow us down a good bit. Move it?
+ if not (self.cleaningThread.isAlive() and
+ self.processingThread.isAlive() and
+ self.moduleManager.thread.isAlive()):
+ LOG.fatal("One of our threads has halted; shutting down.")
+ return
## # Process any new messages that have come in, placing them
## # into the mix pool.
@@ -484,8 +493,7 @@
(self.mmtpServer.getNextTimeoutTime(now), "TIMEOUT"))
elif event == 'SHRED':
self.cleanQueues()
- insort(scheduledEvents,
- (now + 120, "SHRED")) #XXXX Restore original value
+ insort(scheduledEvents, (now + 600, "SHRED"))
elif event == 'MIX':
# Before we mix, we need to log the hashes to avoid replays.
# FFFF We need to recover on server failure.
@@ -619,7 +627,7 @@
sys.exit(1)
try:
- print >>sys.stderr, "Reading configuration from %s"%configFile
+ print "Reading configuration from %s"%configFile
return mixminion.server.ServerConfig.ServerConfig(fname=configFile)
except (IOError, OSError), e:
print >>sys.stderr, "Error reading configuration file %r:"%configFile
@@ -640,11 +648,12 @@
except:
info = sys.exc_info()
LOG.fatal_exc(info,"Exception while configuring server")
- print >>sys.stderr, "Shutting down because of exception: %s"%info[1]
+ LOG.fatal("Shutting down because of exception: %s", info[0])
+ #XXXX if sys.stderr is still real, send a message there as well.
sys.exit(1)
if config['Server'].get("Daemon",1):
- print >>sys.stderr, "Starting server in the background"
+ print "Starting server in the background"
try:
daemonize()
except:
@@ -664,7 +673,8 @@
except:
info = sys.exc_info()
LOG.fatal_exc(info,"Exception while configuring server")
- print >>sys.stderr, "Shutting down because of exception: %s"%info[1]
+ LOG.fatal("Shutting down because of exception: %s", info[0])
+ #XXXX if sys.stderr is still real, send a message there as well.
sys.exit(1)
LOG.info("Starting server: Mixminion %s", mixminion.__version__)
@@ -675,7 +685,8 @@
except:
info = sys.exc_info()
LOG.fatal_exc(info,"Exception while running server")
- print >>sys.stderr, "Shutting down because of exception: %s"%info[1]
+ LOG.fatal("Shutting down because of exception: %s", info[0])
+ #XXXX if sys.stderr is still real, send a message there as well.
LOG.info("Server shutting down")
server.close()
LOG.info("Server is shut down")
@@ -721,10 +732,10 @@
LOG.setMinSeverity("INFO")
mixminion.Crypto.init_crypto(config)
keyring = mixminion.server.ServerKeys.ServerKeyring(config)
- print >>sys.stderr, "Creating %s keys..." % keys
+ print "Creating %s keys..." % keys
for i in xrange(keys):
keyring.createKeys(1)
- print >> sys.stderr, ".... (%s/%s done)" % (i+1,keys)
+ print ".... (%s/%s done)" % (i+1,keys)
#----------------------------------------------------------------------
_REMOVEKEYS_USAGE = """\
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -d -r1.1 -r1.2
--- ServerQueue.py 9 Jan 2003 06:28:58 -0000 1.1
+++ ServerQueue.py 10 Jan 2003 20:12:05 -0000 1.2
@@ -30,9 +30,9 @@
# trash.
INPUT_TIMEOUT = 6000
-# If we've been cleaning for more than CLEAN_TIMEOUT seconds, assume the
-# old clean is dead.
-CLEAN_TIMEOUT = 120
+## # If we've been cleaning for more than CLEAN_TIMEOUT seconds, assume the
+## # old clean is dead.
+## CLEAN_TIMEOUT = 120
class Queue:
"""A Queue is an unordered collection of files with secure insert, move,
@@ -47,23 +47,34 @@
(Where HANDLE is a randomly chosen 12-character selection from the
characters 'A-Za-z0-9+-'. [Collision probability is negligable.])
- XXXX Threading notes: Currently, Queue is only threadsafe when XXXX
+ Threading notes: Although Queue itself is threadsafe, you'll want
+ to synchronize around any multistep operations that you want to
+ run atomically. Use Queue.lock() and Queue.unlock() for this.
+
+ In the Mixminion server, no queue currently has more than one producer
+ or more than one consumer ... so synchronization turns out to be
+ fairly easy.
"""
- # How negligible? A back-of-the-envelope approximation: The chance
- # of a collision reaches .1% when you have 3e9 messages in a single
- # queue. If Alice somehow manages to accumulate a 96 gigabyte
- # backlog, we'll have bigger problems than name collision... such
- # as the fact that most Unices behave badly when confronted with
- # 3 billion files in the same directory... or the fact that,
- # at today's processor speeds, it will take Alice 3 or 4
- # CPU-years to clear her backlog.
+ # How negligible are the chances of collision? A back-of-the-envelope
+ # approximation: The chance of a collision reaches .1% when you have 3e9
+ # messages in a single queue. If Alice somehow manages to accumulate a
+ # 96 gigabyte backlog, we'll have bigger problems than name
+ # collision... such as the fact that most Unices behave badly when
+ # confronted with 3 billion files in the same directory... or the fact
+ # that, at today's processor speeds, it will take Alice 3 or 4 CPU-years
+ # to clear her backlog.
+ #
+ # Threading: If we ever get more than one producer, we're fine. With
+ # more than one consumer, we'll need to modify DeliveryQueue below.
+
# Fields: dir--the location of the queue.
# n_entries: the number of complete messages in the queue.
# <0 if we haven't counted yet.
# _lock: A lock that must be held while modifying or accessing
- # the queue object. Because of our naming scheme, access
- # to the filesystem itself need not be synchronized.
+ # the queue object. Filesystem operations are allowed
+ # without holding the lock, but they must not be visible
+ # to users of the queue.
def __init__(self, location, create=0, scrub=0):
"""Creates a queue object for a given directory, 'location'. If
'create' is true, creates the directory if necessary. If 'scrub'
@@ -89,12 +100,20 @@
# Count messages on first time through.
self.n_entries = -1
+ def lock(self):
+ """Prevent access to this queue from other threads."""
+ self._lock.acquire()
+
+ def unlock(self):
+ """Release the lock on this queue."""
+ self._lock.release()
+
def queueMessage(self, contents):
"""Creates a new message in the queue whose contents are 'contents',
and returns a handle to that message."""
f, handle = self.openNewMessage()
f.write(contents)
- self.finishMessage(f, handle)
+ self.finishMessage(f, handle) # handles locking
return handle
def queueObject(self, object):
@@ -102,7 +121,7 @@
object."""
f, handle = self.openNewMessage()
cPickle.dump(object, f, 1)
- self.finishMessage(f, handle)
+ self.finishMessage(f, handle) # handles locking
return handle
def count(self, recount=0):
@@ -127,10 +146,7 @@
If there are fewer than 'count' messages in the queue, all the
messages will be retained."""
- self._lock.acquire()
- handles = [ fn[4:] for fn in os.listdir(self.dir)
- if fn.startswith("msg_") ]
- self._lock.release()
+ handles = self.getAllMessages() # handles locking
return getCommonPRNG().shuffle(handles, count)
@@ -144,7 +160,7 @@
def removeMessage(self, handle):
"""Given a handle, removes the corresponding message from the queue."""
- self.__changeState(handle, "msg", "rmv")
+ self.__changeState(handle, "msg", "rmv") # handles locking.
def removeAll(self):
"""Removes all messages from this queue."""
@@ -176,28 +192,38 @@
def getMessagePath(self, handle):
"""Given a handle for an existing message, return the name of the
file that contains that message."""
+ # We don't need to lock here: the handle is still valid, or it isn't.
return os.path.join(self.dir, "msg_"+handle)
def openMessage(self, handle):
"""Given a handle for an existing message, returns a file descriptor
open to read that message."""
+ # We don't need to lock here; the handle is still valid, or it isn't.
return open(os.path.join(self.dir, "msg_"+handle), 'rb')
def messageContents(self, handle):
"""Given a message handle, returns the contents of the corresponding
message."""
- f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
- s = f.read()
- f.close()
- return s
+ try:
+ self._lock.acquire()
+ f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
+ s = f.read()
+ f.close()
+ return s
+ finally:
+ self._lock.release()
def getObject(self, handle):
"""Given a message handle, read and unpickle the contents of the
corresponding message."""
- f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
- res = cPickle.load(f)
- f.close()
- return res
+ try:
+ self._lock.acquire()
+ f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
+ res = cPickle.load(f)
+ f.close()
+ return res
+ finally:
+ self._lock.release()
def openNewMessage(self):
"""Returns (file, handle) tuple to create a new message. Once
@@ -228,36 +254,41 @@
DOCDOC secureDeleteFn
"""
- # ???? Threading?
- now = time.time()
- cleanFile = os.path.join(self.dir,".cleaning")
+ # We don't need to hold the lock here; we synchronize via the
+ # filesystem.
- cleaning = 1
- while cleaning:
- try:
- # Try to get the .cleaning lock file. If we can create it,
- # we're the only cleaner around.
- fd = os.open(cleanFile, os.O_WRONLY+os.O_CREAT+os.O_EXCL, 0600)
- os.write(fd, str(now))
- os.close(fd)
- cleaning = 0
- except OSError:
- try:
- # If we can't create the file, see if it's too old. If it
- # is too old, delete it and try again. If it isn't, there
- # may be a live clean in progress.
- s = os.stat(cleanFile)
- if now - s[stat.ST_MTIME] > CLEAN_TIMEOUT:
- os.unlink(cleanFile)
- else:
- return 1
- except OSError:
- # If the 'stat' or 'unlink' calls above fail, then
- # .cleaning must not exist, or must not be readable
- # by us.
- if os.path.exists(cleanFile):
- # In the latter case, bail out.
- return 1
+# XXXX This logic never worked, and we now do all our cleaning in a separate
+# XXXX thread anyway.
+
+## now = time.time()
+## cleanFile = os.path.join(self.dir,".cleaning")
+##
+## cleaning = 1
+## while cleaning:
+## try:
+## # Try to get the .cleaning lock file. If we can create it,
+## # we're the only cleaner around.
+## fd = os.open(cleanFile, os.O_WRONLY+os.O_CREAT+os.O_EXCL, 0600)
+## os.write(fd, str(now))
+## os.close(fd)
+## cleaning = 0
+## except OSError:
+## try:
+## # If we can't create the file, see if it's too old. If it
+## # is too old, delete it and try again. If it isn't, there
+## # may be a live clean in progress.
+## s = os.stat(cleanFile)
+## if now - s[stat.ST_MTIME] > CLEAN_TIMEOUT:
+## os.unlink(cleanFile)
+## else:
+## return 1
+## except OSError:
+## # If the 'stat' or 'unlink' calls above fail, then
+## # .cleaning must not exist, or must not be readable
+## # by us.
+## if os.path.exists(cleanFile):
+## # In the latter case, bail out.
+## return 1
rmv = []
allowedTime = int(time.time()) - INPUT_TIMEOUT
@@ -280,8 +311,17 @@
to 's2', and changes the internal count."""
try:
self._lock.acquire()
- os.rename(os.path.join(self.dir, s1+"_"+handle),
- os.path.join(self.dir, s2+"_"+handle))
+ try:
+ os.rename(os.path.join(self.dir, s1+"_"+handle),
+ os.path.join(self.dir, s2+"_"+handle))
+ except OSError, e:
+ contents = os.listdir(self.dir)
+ LOG.error("Error while trying to change %s from %s to %s: %s",
+ handle, s1, s2, e)
+ LOG.error("Directory %s contains: %s", self.dir, contents)
+ self.count(1)
+ return
+
if self.n_entries < 0:
return
if s1 == 'msg' and s2 != 'msg':
@@ -315,6 +355,8 @@
won't play nice if multiple instances are looking at the same
directory.
"""
+ # XXXX separating addr was a mistake.
+
###
# Fields:
# sendable -- A list of handles for all messages