[minion-cvs] [HALFTESTED] Make the server slightly multithreaded to k...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv10668/lib/mixminion/server

Modified Files:
	HashLog.py Modules.py ServerMain.py __init__.py 
Added Files:
	ServerQueue.py 
Removed Files:
	Queue.py 
Log Message:
[HALFTESTED] Make the server slightly multithreaded to keep it from stalling.

Previously, we'd become unresponsive to the network while processing
packets, overwriting files, or sending messages via SMTP or Mixmaster.
Now we have a single separate thread to handle each of those, so we
will no longer see stalls of 30ms up to 5 minutes (!!).

The ideal is still for networking code to use the async server in
the main thread, whenever convenient and possible.

(NOTE: THIS CODE HAS WORKED FOR ME ON MY LAPTOP.  It is not tested or
documented enough to convince me that it is correct.  It has not been
tested on Linux. There are subtle issues about synchronizing access to
server queues that I need to hash out with myself before I trust this
code.  By all means feel free to play with it.... but if it breaks [as
Linus likes to say], you get to keep both pieces.)

BuildMessage:
- Make use of new Crypto.getTrueRNG.

Common:
- Import 'Queue' from the python distribution here.
- Make blocking securedelete work well with sigchild handler.
- Add 'blocking' option to waitForChildren.
- Remove unused, unusable signal stuff.

Crypto:
- Optimize getCommonPRNG for the common case
- Add a lock to TRNG
- Add an accessor function: getTrueRNG.

benchmark:
- Add note about limitations of hashlog benchmarks.

test, server.Queue<deleted>, server.ServerQueue<new>:
- Rename Queue.py to ServerQueue.py to avoid conflicting with Queue from
  the standard python distribution.
- Add locking -- possibly not completely right, and certainly not well
  documented, to StandardQueue.
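
For readers skimming the log, here is a rough sketch of the idea behind the
queue locking: a message changes state by renaming its file (inp_ -> msg_ ->
rmv_), and only the cached in-memory count needs a lock. This is an
illustration in modern Python, not the code in this commit; TinyDirQueue,
queue_message, remove_message, and demo are hypothetical names, and
secrets.token_hex merely stands in for the real handle generation.

```python
import os
import secrets
import tempfile
import threading

class TinyDirQueue:
    """Hypothetical, simplified directory queue (not the Mixminion class)."""
    def __init__(self, location):
        self.dir = location
        self._lock = threading.RLock()  # guards n_entries, not the filesystem
        self.n_entries = 0              # assumption: start from an empty directory

    def _change_state(self, handle, s1, s2):
        # The rename is atomic on POSIX; the lock keeps the count consistent.
        with self._lock:
            os.rename(os.path.join(self.dir, s1 + "_" + handle),
                      os.path.join(self.dir, s2 + "_" + handle))
            if s1 == "msg" and s2 != "msg":
                self.n_entries -= 1
            elif s1 != "msg" and s2 == "msg":
                self.n_entries += 1

    def queue_message(self, contents):
        handle = secrets.token_hex(6)   # stand-in for the real random handle
        path = os.path.join(self.dir, "inp_" + handle)
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)
        fd = os.open(path, flags, 0o600)
        with os.fdopen(fd, "wb") as f:
            f.write(contents)
        self._change_state(handle, "inp", "msg")  # commit the message
        return handle

    def remove_message(self, handle):
        self._change_state(handle, "msg", "rmv")  # mark for later deletion

def demo():
    with tempfile.TemporaryDirectory() as d:
        q = TinyDirQueue(d)
        h = q.queue_message(b"hello")
        before = q.n_entries
        q.remove_message(h)
        return before, q.n_entries, sorted(os.listdir(d)) == ["rmv_" + h]
```

The sketch uses a re-entrant lock so that a method already holding the lock
can call the state-change helper without deadlocking.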

HashLog:
- Add locks to hashlog.

ServerMain, Modules:
- The big threading patch, part 1.  Go from a single-thread model to one
  with 4 threads:
        * Main thread that runs the async server
	* Processing thread that handles all the crypto and packet magic
	* Cleaning thread that overwrites dead files
	* Delivery thread that handles all outgoing messages not bound
	  for the main server.  (Right now, that's everything but MMTP,
	  including SMTP and SMTP-via-Mixmaster).
  The code works for me, but needs some rethinking and cleanup in places.
- Catch HUP and TERM; take appropriate action on TERM.
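
As a rough illustration of the model described above (one dedicated thread
per kind of slow work, fed by the main thread), here is a minimal sketch in
modern Python. It is not the code in this commit: WorkerThread, add_job, and
demo are hypothetical names, and it uses a job queue with a None sentinel
where the patch uses an Event for the delivery thread.

```python
import queue
import threading

class WorkerThread(threading.Thread):
    """Hypothetical helper: runs queued callables one at a time."""
    def __init__(self, name):
        threading.Thread.__init__(self, name=name)
        self.daemon = True            # don't keep the process alive on exit
        self._jobs = queue.Queue()    # thread-safe hand-off from the main thread

    def add_job(self, job):
        self._jobs.put(job)

    def shutdown(self):
        self._jobs.put(None)          # sentinel: stop after earlier jobs finish

    def run(self):
        while True:
            job = self._jobs.get()    # blocks until there is work
            if job is None:
                return
            job()

def demo():
    results = []
    workers = [WorkerThread(n) for n in ("processing", "cleaning", "delivery")]
    for w in workers:
        w.start()
    for w in workers:
        # The main thread only enqueues work; it never blocks on the work itself.
        w.add_job(lambda name=w.name: results.append(name))
    for w in workers:
        w.shutdown()
        w.join()
    return sorted(results)
```

Calling demo() runs one job on each of the three workers and returns their
names once all of them have shut down.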



--- NEW FILE: ServerQueue.py ---
# Copyright 2002 Nick Mathewson.  See LICENSE for licensing information.
# $Id: ServerQueue.py,v 1.1 2003/01/09 06:28:58 nickm Exp $

"""mixminion.server.ServerQueue

   Facility for fairly secure, directory-based, unordered queues.
   """

import os
import base64
import time
import stat
import cPickle
import threading

from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
     createPrivateDir
from mixminion.Crypto import getCommonPRNG

__all__ = [ 'Queue', 'DeliveryQueue', 'TimedMixQueue', 'CottrellMixQueue',
            'BinomialCottrellMixQueue' ]

# Mode to pass to open(2) for creating a new file, and dying if it already
# exists.
_NEW_MESSAGE_FLAGS = os.O_WRONLY+os.O_CREAT+os.O_EXCL
# On windows or mac, binary != text.
_NEW_MESSAGE_FLAGS += getattr(os, 'O_BINARY', 0)

# Any inp_* files older than INPUT_TIMEOUT seconds old are assumed to be
# trash.
INPUT_TIMEOUT = 6000

# If we've been cleaning for more than CLEAN_TIMEOUT seconds, assume the
# old clean is dead.
CLEAN_TIMEOUT = 120

class Queue:
    """A Queue is an unordered collection of files with secure insert, move,
       and delete operations.

       Implementation: a queue is a directory of 'messages'.  Each
       filename in the directory has a name in one of the following
       formats:
             rmv_HANDLE  (A message waiting to be deleted)
             msg_HANDLE  (A message waiting in the queue.)
             inp_HANDLE  (An incomplete message being created.)
       (Where HANDLE is a randomly chosen 12-character selection from the
       characters 'A-Za-z0-9+-'.  [Collision probability is negligible.])

       XXXX Threading notes: Currently, Queue is only threadsafe when XXXX
       """
       # How negligible?  A back-of-the-envelope approximation: The chance
       # of a collision reaches .1% when you have 3e9 messages in a single
       # queue.  If Alice somehow manages to accumulate a 96 gigabyte
       # backlog, we'll have bigger problems than name collision... such
       # as the fact that most Unices behave badly when confronted with
       # 3 billion files in the same directory... or the fact that,
       # at today's processor speeds, it will take Alice 3 or 4
       # CPU-years to clear her backlog.

    # Fields:   dir--the location of the queue.
    #           n_entries: the number of complete messages in the queue.
    #                 <0 if we haven't counted yet.
    #           _lock: A lock that must be held while modifying or accessing
    #                 the queue object.  Because of our naming scheme, access
    #                 to the filesystem itself need not be synchronized.
    def __init__(self, location, create=0, scrub=0):
        """Creates a queue object for a given directory, 'location'.  If
           'create' is true, creates the directory if necessary.  If 'scrub'
           is true, removes any incomplete or invalidated messages from the
           Queue."""

        secureDelete([]) # Make sure secureDelete is configured. HACK!

        self._lock = threading.RLock()
        self.dir = location

        if not os.path.isabs(location):
            LOG.warn("Queue path %s isn't absolute.", location)

        if os.path.exists(location) and not os.path.isdir(location):
            raise MixFatalError("%s is not a directory" % location)

        createPrivateDir(location, nocreate=(not create))

        if scrub:
            self.cleanQueue()

        # Count messages on first time through.
        self.n_entries = -1

    def queueMessage(self, contents):
        """Creates a new message in the queue whose contents are 'contents',
           and returns a handle to that message."""
        f, handle = self.openNewMessage()
        f.write(contents)
        self.finishMessage(f, handle)
        return handle

    def queueObject(self, object):
        """Queue an object using cPickle, and return a handle to that
           object."""
        f, handle = self.openNewMessage()
        cPickle.dump(object, f, 1)
        self.finishMessage(f, handle)
        return handle

    def count(self, recount=0):
        """Returns the number of complete messages in the queue."""
        try:
            self._lock.acquire()
            if self.n_entries >= 0 and not recount:
                return self.n_entries
            else:
                res = 0
                for fn in os.listdir(self.dir):
                    if fn.startswith("msg_"):
                        res += 1
                self.n_entries = res
                return res
        finally:
            self._lock.release()
            
    def pickRandom(self, count=None):
        """Returns a list of 'count' handles to messages in this queue.
           The messages are chosen randomly, and returned in a random order.

           If there are fewer than 'count' messages in the queue, all the
           messages will be returned."""
        self._lock.acquire()
        try:
            handles = [ fn[4:] for fn in os.listdir(self.dir)
                               if fn.startswith("msg_") ]
        finally:
            # Release the lock even if listdir raises.
            self._lock.release()

        return getCommonPRNG().shuffle(handles, count)

    def getAllMessages(self):
        """Returns handles for all messages currently in the queue.
           Note: this ordering is not guaranteed to be random"""
        self._lock.acquire()
        try:
            hs = [fn[4:] for fn in os.listdir(self.dir)
                  if fn.startswith("msg_")]
        finally:
            # Release the lock even if listdir raises.
            self._lock.release()
        return hs

    def removeMessage(self, handle):
        """Given a handle, removes the corresponding message from the queue."""
        self.__changeState(handle, "msg", "rmv")

    def removeAll(self):
        """Removes all messages from this queue."""
        try:
            self._lock.acquire()
            for m in os.listdir(self.dir):
                if m[:4] in ('inp_', 'msg_'):
                    self.__changeState(m[4:], m[:3], "rmv")
            self.n_entries = 0
            self.cleanQueue()
        finally:
            self._lock.release()

    def moveMessage(self, handle, queue):
        """Given a handle and a queue, moves the corresponding message from
           this queue to the queue provided.  Returns a new handle for
           the message in the destination queue."""
        # Since we're switching handle, we don't want to just rename;
        # We really want to copy and delete the old file.
        try:
            self._lock.acquire()
            newHandle = queue.queueMessage(self.messageContents(handle))
            self.removeMessage(handle)
        finally:
            self._lock.release()

        return newHandle

    def getMessagePath(self, handle):
        """Given a handle for an existing message, return the name of the
           file that contains that message."""
        return os.path.join(self.dir, "msg_"+handle)

    def openMessage(self, handle):
        """Given a handle for an existing message, returns a file object
           open to read that message."""
        return open(os.path.join(self.dir, "msg_"+handle), 'rb')

    def messageContents(self, handle):
        """Given a message handle, returns the contents of the corresponding
           message."""
        f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
        s = f.read()
        f.close()
        return s

    def getObject(self, handle):
        """Given a message handle, read and unpickle the contents of the
           corresponding message."""
        f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
        res = cPickle.load(f)
        f.close()
        return res

    def openNewMessage(self):
        """Returns (file, handle) tuple to create a new message.  Once
           you're done writing, you must call finishMessage to
           commit your changes, or abortMessage to reject them."""
        handle = self.__newHandle()
        fname = os.path.join(self.dir, "inp_"+handle)
        fd = os.open(fname, _NEW_MESSAGE_FLAGS, 0600)
        return os.fdopen(fd, 'wb'), handle

    def finishMessage(self, f, handle):
        """Given a file and a corresponding handle, closes the file and
           commits the corresponding message."""
        f.close()
        self.__changeState(handle, "inp", "msg")

    def abortMessage(self, f, handle):
        """Given a file and a corresponding handle, closes the file and
           rejects the corresponding message."""
        f.close()
        self.__changeState(handle, "inp", "rmv")

    def cleanQueue(self, secureDeleteFn=None):
        """Removes all timed-out or trash messages from the queue.

           Returns 1 if a clean is already in progress; otherwise
           returns 0.

           DOCDOC secureDeleteFn
        """
        # ???? Threading?
        now = time.time()
        cleanFile = os.path.join(self.dir,".cleaning")

        cleaning = 1
        while cleaning:
            try:
                # Try to get the .cleaning lock file.  If we can create it,
                # we're the only cleaner around.
                fd = os.open(cleanFile, os.O_WRONLY+os.O_CREAT+os.O_EXCL, 0600)
                os.write(fd, str(now))
                os.close(fd)
                cleaning = 0
            except OSError:
                try:
                    # If we can't create the file, see if it's too old.  If it
                    # is too old, delete it and try again.  If it isn't, there
                    # may be a live clean in progress.
                    s = os.stat(cleanFile)
                    if now - s[stat.ST_MTIME] > CLEAN_TIMEOUT:
                        os.unlink(cleanFile)
                    else:
                        return 1
                except OSError:
                    # If the 'stat' or 'unlink' calls above fail, then
                    # .cleaning must not exist, or must not be readable
                    # by us.
                    if os.path.exists(cleanFile):
                        # In the latter case, bail out.
                        return 1

        rmv = []
        allowedTime = int(time.time()) - INPUT_TIMEOUT
        for m in os.listdir(self.dir):
            if m.startswith("rmv_"):
                rmv.append(os.path.join(self.dir, m))
            elif m.startswith("inp_"):
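                # listdir returns bare names; stat the file inside self.dir.
                s = os.stat(os.path.join(self.dir, m))
                if s[stat.ST_MTIME] < allowedTime:
                    self.__changeState(m[4:], "inp", "rmv")
                    # The file is now named rmv_HANDLE, so delete that path.
                    rmv.append(os.path.join(self.dir, "rmv_"+m[4:]))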
        if secureDeleteFn:
            secureDeleteFn(rmv)
        else:
            secureDelete(rmv, blocking=1)
        return 0

    def __changeState(self, handle, s1, s2):
        """Helper method: changes the state of message 'handle' from 's1'
           to 's2', and changes the internal count."""
        try:
            self._lock.acquire()
            os.rename(os.path.join(self.dir, s1+"_"+handle),
                      os.path.join(self.dir, s2+"_"+handle))
            if self.n_entries < 0:
                return
            if s1 == 'msg' and s2 != 'msg':
                self.n_entries -= 1
            elif s1 != 'msg' and s2 == 'msg':
                self.n_entries += 1
        finally:
            self._lock.release()

    def __newHandle(self):
        """Helper method: creates a new random handle."""
        junk = getCommonPRNG().getBytes(9)
        return base64.encodestring(junk).strip().replace("/","-")

class DeliveryQueue(Queue):
    """A DeliveryQueue implements a queue that greedily sends messages
       to outgoing streams that occasionally fail.  Messages in a
       DeliveryQueue are no longer unstructured text, but rather
       tuples of: (n_retries, addressing info, msg).

       This class is abstract. Implementors of this class should
       subclass it to add a _deliverMessages method.  Multiple
       invocations of this method may be active at a given time.  Upon
       success or failure, this method should cause deliverySucceeded
       or deliveryFailed to be called as appropriate.

       Users of this class will probably only want to call the queueMessage,
       sendReadyMessages, and nextMessageReadyAt methods.

       This class caches information about the directory state; it
       won't play nice if multiple instances are looking at the same
       directory.
    """
    ###
    # Fields:
    #    sendable -- A list of handles for all messages
    #           that we're not currently sending.
    #    pending -- Dict from handle->1, for all messages that we're
    #           currently sending.

    def __init__(self, location):
        Queue.__init__(self, location, create=1, scrub=1)
        self._rescan()

    def _rescan(self):
        """Rebuild the internal state of this queue from the underlying
           directory."""
        try:
            self._lock.acquire()
            self.pending = {}
            self.sendable = self.getAllMessages()
        finally:
            self._lock.release()

    def queueMessage(self, msg):
        if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")

    def queueDeliveryMessage(self, addr, msg, retry=0):
        """Schedule a message for delivery.
             addr -- An object to indicate the message's destination
             msg -- the message itself
             retry -- how many times so far have we tried to send?"""

        try:
            self._lock.acquire()
            handle = self.queueObject( (retry, addr, msg) )
            self.sendable.append(handle)
        finally:
            self._lock.release()

        return handle

    def get(self,handle):
        """Returns a (n_retries, addr, msg) payload for a given
           message handle."""
        return self.getObject(handle)

    def sendReadyMessages(self):
        """Sends all messages which are not already being sent."""

        try:
            self._lock.acquire()
            handles = self.sendable
            messages = []
            self.sendable = []
            for h in handles:
                retries, addr, msg = self.getObject(h)
                messages.append((h, addr, msg, retries))
                self.pending[h] = 1
        finally:
            self._lock.release()
        if messages:
            self._deliverMessages(messages)

    def _deliverMessages(self, msgList):
        """Abstract method; Invoked with a list of
           (handle, addr, message, n_retries) tuples every time we have a batch
           of messages to send.

           For every handle in the list, deliverySucceeded or deliveryFailed
           should eventually be called, or the message will sit in the queue
           indefinitely, without being retried."""

        # We could implement this as a single _deliverMessage(h,addr,m,n)
        # method, but that wouldn't allow implementations to batch
        # messages being sent to the same address.

        raise NotImplementedError("_deliverMessages")

    def deliverySucceeded(self, handle):
        """Removes a message from the outgoing queue.  This method
           should be invoked after the corresponding message has been
           successfully delivered.
        """
        try:
            self._lock.acquire()
            self.removeMessage(handle)
            del self.pending[handle]
        finally:
            self._lock.release()

    def deliveryFailed(self, handle, retriable=0):
        """Removes a message from the outgoing queue, or requeues it
           for delivery at a later time.  This method should be
           invoked after delivery of the corresponding message has
           failed."""
        try:
            self._lock.acquire()
            del self.pending[handle]
            if retriable:
                # Queue the new one before removing the old one, for
                # crash-proofness
                retries, addr, msg = self.getObject(handle)
                # FFFF This test makes us never retry past the 10th attempt.
                # FFFF That's wrong; we should be smarter.
                if retries <= 10:
                    self.queueDeliveryMessage(addr, msg, retries+1)
            self.removeMessage(handle)
        finally:
            self._lock.release()

class TimedMixQueue(Queue):
    """A TimedMixQueue holds a group of files, and returns some of them
       as requested, according to a mixing algorithm that sends a batch
       of messages every N seconds."""
    # FFFF : interval is unused.
    ## Fields:
    #   interval: scanning interval, in seconds.
    def __init__(self, location, interval=600):
        """Create a TimedMixQueue that sends its entire batch of messages
           every 'interval' seconds."""
        Queue.__init__(self, location, create=1, scrub=1)
        self.interval = interval

    def getBatch(self):
        """Return handles for all messages that the pool is currently ready
           to send in the next batch"""
        return self.pickRandom()

    def getInterval(self):
        return self.interval

class CottrellMixQueue(TimedMixQueue):
    """A CottrellMixQueue holds a group of files, and returns some of them
       as requested, according to the Cottrell (timed dynamic-pool) mixing
       algorithm from Mixmaster."""
    # FFFF : interval is unused.
    ## Fields:
    # interval: scanning interval, in seconds.
    # minPool: Minimum number of messages to keep in pool.
    # minSend: Minimum number of messages above minPool before we consider
    #      sending.
    # sendRate: Largest fraction of the pool to send at a time.
    def __init__(self, location, interval=600, minPool=6, minSend=1,
                 sendRate=.7):
        """Create a new queue that yields a batch of messages every 'interval'
           seconds, always keeps <minPool> messages in the pool, never sends
           unless it has <minPool>+<minSend> messages, and never sends more
           than <sendRate> * the current pool size.

           If 'minSend'==1, this is a real Cottrell (type II style) mix pool.
           Otherwise, this is a generic 'timed dynamic-pool' mix pool.  (Note
           that there is still a matter of some controversy whether it ever
           makes sense to set minSend != 1.)
           """

        # Note that there was a bit of confusion here: earlier versions
        # implemented an algorithm called "mixmaster" that wasn't actually the
        # mixmaster algorithm.  I picked up the other algorithm from an early
        # draft of Roger, Paul, and Andrei's 'Batching Taxonomy' paper (since
        # corrected); they seem to have gotten it from Anja Jerichow's
        # Ph.D. thesis ("Generalisation and Security Improvement of
        # Mix-mediated Anonymous Communication") of 2000.
        #
        # *THIS* is the algorithm that the current 'Batching Taxonomy' paper
        # says that Cottrell says is the real thing.

        TimedMixQueue.__init__(self, location, interval)
        self.minPool = minPool
        self.minSend = minSend
        self.sendRate = sendRate

    def _getBatchSize(self):
        "Helper method: returns the number of messages to send."
        pool = self.count()
        if pool >= (self.minPool + self.minSend):
            sendable = pool - self.minPool
            return min(sendable, max(1, int(pool * self.sendRate)))
        else:
            return 0

    def getBatch(self):
        "Returns a list of handles for the next batch of messages to send."
        n = self._getBatchSize()
        if n:
            return self.pickRandom(n)
        else:
            return []

class BinomialCottrellMixQueue(CottrellMixQueue):
    """Same algorithm as CottrellMixQueue, but instead of sending N messages
       from the pool of size P, sends each message with probability N/P."""
    def getBatch(self):
        n = self._getBatchSize()
        if n == 0:
            return []
        msgProbability = n / float(self.count())
        rng = getCommonPRNG()
        return rng.shuffle([ h for h in self.getAllMessages()
                             if rng.getFloat() < msgProbability ])

Index: HashLog.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/HashLog.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- HashLog.py	16 Dec 2002 02:40:11 -0000	1.3
+++ HashLog.py	9 Jan 2003 06:28:58 -0000	1.4
@@ -8,6 +8,7 @@
 
 import os
 import anydbm, dumbdbm
+import threading
 from mixminion.Common import MixFatalError, LOG, createPrivateDir
 from mixminion.Packet import DIGEST_LEN
 
@@ -80,36 +81,53 @@
 
         self.journalFile = os.open(self.journalFileName,
                     _JOURNAL_OPEN_FLAGS|os.O_APPEND, 0600)
+        self.__lock = threading.RLock()
 
     def seenHash(self, hash):
         """Return true iff 'hash' has been logged before."""
         try:
-            if self.journal.get(hash,0):
+            self.__lock.acquire()
+            try:
+                if self.journal.get(hash,0):
+                    return 1
+                _ = self.log[hash]
                 return 1
-            _ = self.log[hash]
-            return 1
-        except KeyError:
-            return 0
+            except KeyError:
+                return 0
+        finally:
+            self.__lock.release()
 
     def logHash(self, hash):
         """Insert 'hash' into the database."""
         assert len(hash) == DIGEST_LEN
-        self.journal[hash] = 1
-        os.write(self.journalFile, hash)
+        try:
+            self.__lock.acquire()
+            self.journal[hash] = 1
+            os.write(self.journalFile, hash)
+        finally:
+            self.__lock.release()
 
     def sync(self):
         """Flushes changes to this log to the filesystem."""
-        for hash in self.journal.keys():
-            self.log[hash] = "1"
-        if hasattr(self.log, "sync"):
-            self.log.sync()
-        os.close(self.journalFile)
-        self.journalFile = os.open(self.journalFileName,
-                   _JOURNAL_OPEN_FLAGS|os.O_TRUNC, 0600)
-        self.journal = {}
+        try:
+            self.__lock.acquire()
+            for hash in self.journal.keys():
+                self.log[hash] = "1"
+            if hasattr(self.log, "sync"):
+                self.log.sync()
+            os.close(self.journalFile)
+            self.journalFile = os.open(self.journalFileName,
+                       _JOURNAL_OPEN_FLAGS|os.O_TRUNC, 0600)
+            self.journal = {}
+        finally:
+            self.__lock.release()
 
     def close(self):
         """Closes this log."""
-        self.sync()
-        self.log.close()
-        os.close(self.journalFile)
+        try:
+            self.__lock.acquire()
+            self.sync()
+            self.log.close()
+            os.close(self.journalFile)
+        finally:
+            self.__lock.release()

Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -d -r1.21 -r1.22
--- Modules.py	8 Jan 2003 07:58:10 -0000	1.21
+++ Modules.py	9 Jan 2003 06:28:58 -0000	1.22
@@ -19,6 +19,7 @@
 import smtplib
 import socket
 import base64
+import threading
 
 if sys.version_info[:2] >= (2,3):
     import textwrap
@@ -28,7 +29,7 @@
 import mixminion.BuildMessage
 import mixminion.Config
 import mixminion.Packet
-import mixminion.server.Queue
+import mixminion.server.ServerQueue
 from mixminion.Config import ConfigError, _parseBoolean, _parseCommand
 from mixminion.Common import LOG, createPrivateDir, MixError, isSMTPMailbox, \
      isPrintingAscii
@@ -118,7 +119,7 @@
 class ImmediateDeliveryQueue:
     """Helper class usable as delivery queue for modules that don't
        actually want a queue.  Such modules should have very speedy
-       processMessage() methods, and should never have deliery fail."""
+       processMessage() methods, and should never have delivery fail."""
     ##Fields:
     #  module: the underlying DeliveryModule object.
     def __init__(self, module):
@@ -143,17 +144,17 @@
         # We do nothing here; we already delivered the messages
         pass
 
-    def cleanQueue(self):
+    def cleanQueue(self, deleteFn=None):
         # There is no underlying queue to worry about here; do nothing.
         pass
 
-class SimpleModuleDeliveryQueue(mixminion.server.Queue.DeliveryQueue):
+class SimpleModuleDeliveryQueue(mixminion.server.ServerQueue.DeliveryQueue):
     """Helper class used as a default delivery queue for modules that
        don't care about batching messages to like addresses."""
     ## Fields:
     # module: the underlying module.
     def __init__(self, module, directory):
-        mixminion.server.Queue.DeliveryQueue.__init__(self, directory)
+        mixminion.server.ServerQueue.DeliveryQueue.__init__(self, directory)
         self.module = module
 
     def _deliverMessages(self, msgList):
@@ -173,6 +174,37 @@
                                    "Exception delivering message")
                 self.deliveryFailed(handle, 0)
 
+class DeliveryThread(threading.Thread):
+    def __init__(self, moduleManager):
+        threading.Thread.__init__(self)
+        self.moduleManager = moduleManager
+        self.event = threading.Event()
+        self.setDaemon(1)
+        self.__stoppinglock = threading.Lock() # UGLY. XXXX
+        self.isStopping = 0
+
+    def beginSending(self):
+        self.event.set()
+
+    def shutdown(self):
+        LOG.info("Telling delivery thread to shut down.")
+        self.__stoppinglock.acquire()
+        self.isStopping = 1
+        self.__stoppinglock.release()
+        self.event.set()
+
+    def run(self):
+        while 1:
+            self.event.wait()
+            self.event.clear()
+            self.__stoppinglock.acquire()
+            stop = self.isStopping
+            self.__stoppinglock.release()
+            if stop:
+                LOG.info("Delivery thread shutting down.")
+                return
+            self.moduleManager._sendReadyMessages()
+
 class ModuleManager:
     """A ModuleManager knows about all of the server modules in the system.
 
@@ -202,8 +234,9 @@
     #            queueMessage and sendReadyMessages as in DeliveryQueue.)
     #    _isConfigured: flag: has this modulemanager's configure method been
     #            called?
+    # DOCDOC threaded, thread.
 
-    def __init__(self):
+    def __init__(self, threaded=0):
         "Create a new ModuleManager"
         self.syntax = {}
         self.modules = []
@@ -221,6 +254,11 @@
         self.registerModule(MixmasterSMTPModule())
 
         self._isConfigured = 0
+        self.thread = None
+
+    def startThreading(self):
+        self.thread = DeliveryThread(self)
+        self.thread.start()
 
     def isConfigured(self):
         """Return true iff this object's configure method has been called"""
@@ -312,10 +350,10 @@
         self.queues[module.getName()] = queue
         self.enabled[module.getName()] = 1
 
-    def cleanQueues(self):
+    def cleanQueues(self, deleteFn=None):
         """Remove trash messages from all internal queues."""
         for queue in self.queues.values():
-            queue.cleanQueue()
+            queue.cleanQueue(deleteFn)
 
     def disableModule(self, module):
         """Unmaps all the types for a module object."""
@@ -331,6 +369,8 @@
 
     def queueMessage(self, message, tag, exitType, address):
         """Queue a message for delivery."""
+        # XXXX003 remove the more complex logic here into the PacketHandler
+        # XXXX003 code.
         # FFFF Support non-exit messages.
         mod = self.typeToModule.get(exitType, None)
         if mod is None:
@@ -358,7 +398,23 @@
             # forward message
             queue.queueDeliveryMessage((exitType, address, None), payload)
 
+    #DOCDOC
+    def shutdown(self):
+        if self.thread is not None:
+            self.thread.shutdown()
+
+    def join(self):
+        if self.thread is not None:
+            self.thread.join()
+
+    # XXXX refactor, document
     def sendReadyMessages(self):
+        if self.thread is not None:
+            self.thread.beginSending()
+        else:
+            self._sendReadyMessages()
+
+    def _sendReadyMessages(self):
         for name, queue in self.queues.items():
             queue.sendReadyMessages()
 
@@ -805,7 +861,7 @@
     def createDeliveryQueue(self, queueDir):
         # We create a temporary queue so we can hold files there for a little
         # while before passing their names to mixmaster.
-        self.tmpQueue = mixminion.server.Queue.Queue(queueDir+"_tmp", 1, 1)
+        self.tmpQueue = mixminion.server.ServerQueue.Queue(queueDir+"_tmp", 1, 1)
         self.tmpQueue.removeAll()
         return _MixmasterSMTPModuleDeliveryQueue(self, queueDir)
 

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.26
retrieving revision 1.27
diff -u -d -r1.26 -r1.27
--- ServerMain.py	8 Jan 2003 03:56:54 -0000	1.26
+++ ServerMain.py	9 Jan 2003 06:28:58 -0000	1.27
@@ -14,31 +14,42 @@
 import getopt
 import os
 import sys
+import signal
 import time
+import threading
+# We pull this from mixminion.Common, just in case somebody still has
+# a copy of the old "mixminion/server/Queue.py" (since renamed to
+# ServerQueue.py)
+from mixminion.Common import Queue
 
 import mixminion.Config
 import mixminion.Crypto
 import mixminion.server.MMTPServer
 import mixminion.server.Modules
 import mixminion.server.PacketHandler
-import mixminion.server.Queue
+import mixminion.server.ServerQueue
 import mixminion.server.ServerConfig
 import mixminion.server.ServerKeys
 
 from bisect import insort
 from mixminion.Common import LOG, LogStream, MixError, MixFatalError, ceilDiv,\
-     createPrivateDir, formatBase64, formatTime, waitForChildren
+     createPrivateDir, formatBase64, formatTime, installSIGCHLDHandler, \
+     secureDelete, waitForChildren
 
-class IncomingQueue(mixminion.server.Queue.DeliveryQueue):
+class IncomingQueue(mixminion.server.ServerQueue.Queue):
     """A DeliveryQueue to accept messages from incoming MMTP connections,
        process them with a packet handler, and send them into a mix pool."""
 
     def __init__(self, location, packetHandler):
         """Create an IncomingQueue that stores its messages in <location>
            and processes them through <packetHandler>."""
-        mixminion.server.Queue.DeliveryQueue.__init__(self, location)
+        mixminion.server.ServerQueue.Queue.__init__(self, location)
         self.packetHandler = packetHandler
         self.mixPool = None
+        self._queue = Queue.Queue()
+        for h in self.getAllMessages():
+            assert h is not None
+            self._queue.put(h)
 
     def connectQueues(self, mixPool):
         """Sets the target mix queue"""
@@ -48,33 +59,41 @@
         """Add a message for delivery"""
         LOG.trace("Inserted message %s into incoming queue",
                   formatBase64(msg[:8]))
-        self.queueDeliveryMessage(None, msg)
+        h = mixminion.server.ServerQueue.Queue.queueMessage(self, msg)
+        assert h is not None
+        self._queue.put(h)
 
-    def _deliverMessages(self, msgList):
-        "Implementation of abstract method from DeliveryQueue."
+    def deliverMessage(self, handle):
+        "Process the stored message <handle>; pool the result or drop it."
+        # Called from within the processing thread.
         ph = self.packetHandler
-        for handle, _, message, n_retries in msgList:
-            try:
-                res = ph.processMessage(message)
-                if res is None:
-                    # Drop padding before it gets to the mix.
-                    LOG.debug("Padding message %s dropped",
-                              formatBase64(message[:8]))
-                    self.deliverySucceeded(handle)
-                else:
-                    LOG.debug("Processed message %s; inserting into pool",
-                              formatBase64(message[:8]))
-                    self.mixPool.queueObject(res)
-                    self.deliverySucceeded(handle)
-            except mixminion.Crypto.CryptoError, e:
-                LOG.warn("Invalid PK or misencrypted packet header: %s", e)
-                self.deliveryFailed(handle)
-            except mixminion.Packet.ParseError, e:
-                LOG.warn("Malformed message dropped: %s", e)
-                self.deliveryFailed(handle)
-            except mixminion.server.PacketHandler.ContentError, e:
-                LOG.warn("Discarding bad packet: %s", e)
-                self.deliveryFailed(handle)
+        message = self.messageContents(handle)
+        try:
+            res = ph.processMessage(message)
+            if res is None:
+                # Drop padding before it gets to the mix.
+                LOG.debug("Padding message %s dropped",
+                          formatBase64(message[:8]))
+                self.removeMessage(handle)
+            else:
+                LOG.debug("Processed message %s; inserting into pool",
+                          formatBase64(message[:8]))
+                self.mixPool.queueObject(res)
+                self.removeMessage(handle)
+        except mixminion.Crypto.CryptoError, e:
+            LOG.warn("Invalid PK or misencrypted packet header: %s", e)
+            self.removeMessage(handle)
+        except mixminion.Packet.ParseError, e:
+            LOG.warn("Malformed message dropped: %s", e)
+            self.removeMessage(handle)
+        except mixminion.server.PacketHandler.ContentError, e:
+            LOG.warn("Discarding bad packet: %s", e)
+            self.removeMessage(handle)
+        except:
+            LOG.error_exc(sys.exc_info(),
+                    "Unexpected error when processing message %s (handle %s)",
+                          formatBase64(message[:8]), handle)
+            # ???? Remove?  Don't remove?
 
 class MixPool:
     """Wraps a mixminion.server.Queue.*MixQueue to send messages to an exit
@@ -83,18 +102,21 @@
         """Create a new MixPool, based on this server's configuration and
            queue location."""
 
+        # DOCDOC lock
+        self.__lock = threading.Lock()
+
         server = config['Server']
         interval = server['MixInterval'][2]
         if server['MixAlgorithm'] == 'TimedMixQueue':
-            self.queue = mixminion.server.Queue.TimedMixQueue(
+            self.queue = mixminion.server.ServerQueue.TimedMixQueue(
                 location=queueDir, interval=interval)
         elif server['MixAlgorithm'] == 'CottrellMixQueue':
-            self.queue = mixminion.server.Queue.CottrellMixQueue(
+            self.queue = mixminion.server.ServerQueue.CottrellMixQueue(
                 location=queueDir, interval=interval,
                 minPool=server.get("MixPoolMinSize", 5),
                 sendRate=server.get("MixPoolRate", 0.6))
         elif server['MixAlgorithm'] == 'BinomialCottrellMixQueue':
-            self.queue = mixminion.server.Queue.BinomialCottrellMixQueue(
+            self.queue = mixminion.server.ServerQueue.BinomialCottrellMixQueue(
                 location=queueDir, interval=interval,
                 minPool=server.get("MixPoolMinSize", 5),
                 sendRate=server.get("MixPoolRate", 0.6))
@@ -104,9 +126,17 @@
         self.outgoingQueue = None
         self.moduleManager = None
 
+    def lock(self):
+        self.__lock.acquire()
+
+    def unlock(self):
+        self.__lock.release()
+
     def queueObject(self, obj):
         """Insert an object into the queue."""
+        self.__lock.acquire()
         self.queue.queueObject(obj)
+        self.__lock.release()
 
     def count(self):
         "Return the number of messages in the queue"
@@ -148,12 +178,12 @@
            mix."""
         return now + self.queue.getInterval()
 
-class OutgoingQueue(mixminion.server.Queue.DeliveryQueue):
+class OutgoingQueue(mixminion.server.ServerQueue.DeliveryQueue):
     """DeliveryQueue to send messages via outgoing MMTP connections."""
     def __init__(self, location):
         """Create a new OutgoingQueue that stores its messages in a given
            location."""
-        mixminion.server.Queue.DeliveryQueue.__init__(self, location)
+        mixminion.server.ServerQueue.DeliveryQueue.__init__(self, location)
         self.server = None
 
     def connectQueues(self, server):
@@ -190,6 +220,84 @@
 
     def onMessageUndeliverable(self, msg, handle, retriable):
         self.outgoingQueue.deliveryFailed(handle, retriable)
+#----------------------------------------------------------------------
+class CleaningThread(threading.Thread):
+    """Thread that securely deletes queued files in the background."""
+    def __init__(self):
+        threading.Thread.__init__(self)
+        self._queue = Queue.Queue()
+        self.setDaemon(1)
+
+    def deleteFile(self, fname):
+        LOG.trace("Scheduling %s for deletion", fname)
+        assert fname is not None
+        self._queue.put(fname)
+
+    def deleteFiles(self, fnames):
+        for f in fnames:
+            self.deleteFile(f)
+
+    def shutdown(self):
+        LOG.info("Telling cleanup thread to shut down") #????info
+        self._queue.put(None)
+
+    def run(self):
+        try:
+            while 1:
+                fn = self._queue.get()
+                if fn is None:
+                    LOG.info("Cleanup thread shutting down.")
+                    return
+                if os.path.exists(fn):
+                    LOG.trace("Deleting %s", fn)
+                    secureDelete(fn, blocking=1)
+                else:
+                    LOG.warn("Delete thread didn't find file %s",fn)
+        except:
+            LOG.error_exc(sys.exc_info(),
+                          "Exception while cleaning; shutting down thread.")
+
+class PacketProcessingThread(threading.Thread):
+    """Thread that passes handles from an IncomingQueue to deliverMessage."""
+    def __init__(self, incomingQueue):
+        threading.Thread.__init__(self)
+        # Clean up logic; maybe refactor. ????
+        self.incomingQueue = incomingQueue
+        self.setDaemon(1) #????
+
+    def shutdown(self):
+        LOG.info("Telling processing thread to shut down.")
+        self.incomingQueue._queue.put(None)
+
+    def run(self):
+        while 1:
+            handle = self.incomingQueue._queue.get()
+            if handle is None:
+                LOG.info("Processing thread shutting down.")
+                return
+            self.incomingQueue.deliverMessage(handle)
+
+
+STOPPING = 0
+def _sigTermHandler(signal_num, _):
+    '''(Signal handler for SIGTERM)'''
+    signal.signal(signal_num, _sigTermHandler)
+    global STOPPING
+    STOPPING = 1
+
+GOT_HUP = 0
+def _sigHupHandler(signal_num, _):
+    '''(Signal handler for SIGHUP)'''
+    signal.signal(signal_num, _sigHupHandler)
+    global GOT_HUP
+    GOT_HUP = 1
+
+def installSignalHandlers():
+    "Install the SIGHUP and SIGTERM handlers defined above."
+    signal.signal(signal.SIGHUP, _sigHupHandler)
+    signal.signal(signal.SIGTERM, _sigTermHandler)
+
+#----------------------------------------------------------------------
 
 class MixminionServer:
     """Wraps and drives all the queues, and the async net server.  Handles
@@ -212,10 +320,15 @@
     # moduleManager: Instance of ModuleManager.  Map routing types to
     #    outgoing queues, and processes non-MMTP exit messages.
     # outgoingQueue: Holds messages waiting to be sent via MMTP.
-
+    # DOCDOC cleaningThread, processingthread
+    
     def __init__(self, config):
         """Create a new server from a ServerConfig."""
         LOG.debug("Initializing server")
+
+        installSIGCHLDHandler()
+        installSignalHandlers()
+        
         self.config = config
         homeDir = config['Server']['Homedir']
         createPrivateDir(homeDir)
@@ -283,10 +396,15 @@
         self.mmtpServer.connectQueues(incoming=self.incomingQueue,
                                       outgoing=self.outgoingQueue)
 
+        self.cleaningThread = CleaningThread()
+        self.cleaningThread.start()
+        self.processingThread = PacketProcessingThread(self.incomingQueue)
+        self.processingThread.start()
+        self.moduleManager.startThreading()
 
     def run(self):
         """Run the server; don't return unless we hit an exception."""
-
+        global GOT_HUP
         f = open(self.pidFile, 'wt')
         f.write("%s\n" % os.getpid())
         f.close()
@@ -297,7 +415,8 @@
         #  'MIX', 'SHRED', and 'TIMEOUT'.  Kept in sorted order.
         scheduledEvents = []
         now = time.time()
-        scheduledEvents.append( (now + 600, "SHRED") )#FFFF make configurable
+        #XXXX restore
+        scheduledEvents.append( (now + 120, "SHRED") )#FFFF make configurable
         scheduledEvents.append( (self.mmtpServer.getNextTimeoutTime(now),
                                  "TIMEOUT") )
         nextMix = self.mixPool.getNextMixTime(now)
@@ -319,12 +438,21 @@
             timeLeft = nextEventTime - now
             while timeLeft > 0:
                 # Handle pending network events
-                self.mmtpServer.process(timeLeft)
-                # Process any new messages that have come in, placing them
-                # into the mix pool.
-                self.incomingQueue.sendReadyMessages()
-                # Prevent child processes from turning into zombies.
-                waitForChildren(1)
+                self.mmtpServer.process(2)
+                if STOPPING:
+                    LOG.info("Caught sigterm; shutting down.")
+                    return
+                elif GOT_HUP:
+                    LOG.info("Ignoring sighup for now, sorry.")
+                    GOT_HUP = 0
+                
+##                 # Process any new messages that have come in, placing them
+##                 # into the mix pool.
+##                 self.incomingQueue.sendReadyMessages()
+##                  ##Prevent child processes from turning into zombies.
+##                  #???? I think we should just install a SIGCHLD handler.
+##                  waitForChildren(onceOnly=1,blocking=0)
+
                 # Calculate remaining time.
                 now = time.time()
                 timeLeft = nextEventTime - now
@@ -338,19 +466,27 @@
                 insort(scheduledEvents,
                        (self.mmtpServer.getNextTimeoutTime(now), "TIMEOUT"))
             elif event == 'SHRED':
-                LOG.debug("Overwriting deleted files")
                 self.cleanQueues()
                 insort(scheduledEvents,
-                       (now + 600, "SHRED"))
+                       (now + 120, "SHRED")) #XXXX Restore original value
             elif event == 'MIX':
                 # Before we mix, we need to log the hashes to avoid replays.
                 # FFFF We need to recover on server failure.
-                self.packetHandler.syncLogs()
 
-                LOG.trace("Mix interval elapsed")
-                # Choose a set of outgoing messages; put them in
-                # outgoingqueue and modulemanger
-                self.mixPool.mix()
+                try:
+                    # There's a potential threading problem here... in
+                    # between this sync and the 'mix' below, nobody should
+                    # insert into the mix pool.
+                    self.mixPool.lock()
+                    self.packetHandler.syncLogs()
+
+                    LOG.trace("Mix interval elapsed")
+                    # Choose a set of outgoing messages; put them in
+                    # outgoingqueue and modulemanager
+                    self.mixPool.mix()
+                finally:
+                    self.mixPool.unlock()
+                    
                 # Send outgoing messages
                 self.outgoingQueue.sendReadyMessages()
                 # Send exit messages
@@ -366,13 +502,22 @@
     def cleanQueues(self):
         """Remove all deleted messages from queues"""
         LOG.trace("Expunging deleted messages from queues")
-        self.incomingQueue.cleanQueue()
-        self.mixPool.queue.cleanQueue()
-        self.outgoingQueue.cleanQueue()
-        self.moduleManager.cleanQueues()
+        df = self.cleaningThread.deleteFiles
+        self.incomingQueue.cleanQueue(df)
+        self.mixPool.queue.cleanQueue(df)
+        self.outgoingQueue.cleanQueue(df)
+        self.moduleManager.cleanQueues(df)
 
     def close(self):
         """Release all resources; close all files."""
+        self.cleaningThread.shutdown()
+        self.processingThread.shutdown()
+        self.moduleManager.shutdown()
+
+        self.cleaningThread.join()
+        self.processingThread.join()
+        self.moduleManager.join()
+        
         self.packetHandler.close()
         try:
             os.unlink(self.lockFile)
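
Note on the MixPool locking above: queueObject acquires and releases the
lock without try/finally, so an exception from the underlying queue would
leave the lock held and stall the next mix.  Below is a minimal sketch of
the same locking written with `with` blocks in modern Python 3.  `Pool` is
a hypothetical stand-in for MixPool, and its sync_and_mix simply drains
every item, which is a simplification and not how the real mix algorithms
choose messages.

```python
import threading

class Pool:
    """Hypothetical stand-in for MixPool."""
    def __init__(self):
        self._lock = threading.Lock()
        self._items = []

    def queue_object(self, obj):
        # 'with' releases the lock even if the body raises.
        with self._lock:
            if obj is None:
                raise ValueError("cannot queue None")
            self._items.append(obj)

    def sync_and_mix(self, sync_logs):
        # Hold the lock across both steps so nothing is inserted
        # between syncing the logs and choosing the batch.
        with self._lock:
            sync_logs()
            batch, self._items = self._items, []
            return batch
```

The lock here is not reentrant, so code running inside sync_and_mix must
not call queue_object on the same pool.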

Index: __init__.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/__init__.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- __init__.py	31 Dec 2002 04:36:12 -0000	1.2
+++ __init__.py	9 Jan 2003 06:28:58 -0000	1.3
@@ -8,11 +8,3 @@
 
 __all__ = [ ]
 
-#'MMTPServer', 'Queue', 'HashLog', 'PacketHandler', 'Modules',
-#	    'ServerMain' ]
-## import mixminion.server.MMTPServer as MMTPServer
-## import mixminion.server.PacketHandler as PacketHandler
-## import mixminion.server.HashLog as HashLog
-## import mixminion.server.Modules as Modules
-## import mixminion.server.Queue as Queue
-## import mixminion.server.ServerMain as ServerMain

--- Queue.py DELETED ---