[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] Sunday's hacks, today! More work towards clean, usable...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.seul.org:/tmp/cvs-serv31913/lib/mixminion

Modified Files:
	Common.py MMTPServer.py Modules.py Queue.py ServerInfo.py 
	ServerMain.py 
Log Message:
Sunday's hacks, today!  More work towards clean, usable, testable server
logic.

HACKING:
	Bring up to date

Makefile, setup:
	Make it work on macos X

Common:
	Move in createPrivateDir from ServerMain
	Document log severities

MMTPServer:
	Track retriable and non-retriable failures better; add failure
	   callback

Modules:
	Document classes
	Add per-module queues
	Begin new processMessage method (incomplete)
	Simplify DropModule
	Switch MBOX to use SMTPlib.
	
Queue:
	Add convenience methods to pickle and unpickle objects
	Add DeliveryQueue class to encapsulate queue/send/retry logic
	Add MixQueue to implement Cottrell mixing.
	
ServerMain:
	Adopt some (but not yet all) of above improvements

aes_ctr.c:
	Fix big-endian *_U32 macros
	Add INCR_U32 macro to improve performance on big-endian machines
	Rewrite code to use INCR_ macros




Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -d -r1.11 -r1.12
--- Common.py	11 Aug 2002 07:50:34 -0000	1.11
+++ Common.py	12 Aug 2002 18:12:24 -0000	1.12
@@ -51,6 +51,28 @@
     return divmod(a-1,b)[0]+1
 
 #----------------------------------------------------------------------
+def createPrivateDir(d):
+    """Create a directory, and all parent directories, checking permissions
+       as we go along.  All superdirectories must be owned by root or us.
+       
+       XXXX we don't check permissions properly yet."""
+    if not os.path.exists(d):
+	try:
+	    os.makedirs(s, 0700)
+	except OSError, e:
+	    getLog().fatal("Unable to create directory %s"%d)
+	    raise MixFatalError()
+    elif not os.path.isdir(d):
+        getLog().fatal("%s is not a directory"%d)
+        raise MixFatalError()
+    else:
+        m = os.stat(d)[stat.ST_MODE]
+        # check permissions
+        if m & 0077:
+            getLog().fatal("Directory %s must be mode 0700" %d)
+            raise MixFatalError()
+
+#----------------------------------------------------------------------
 # Secure filesystem operations.
 #
 
@@ -171,7 +193,18 @@
 
 class Log:
     """A Log is a set of destinations for system messages, along with the
-       means to filter them to a desired verbosity."""
+       means to filter them to a desired verbosity.
+
+       Log messages have 'severities' as follows:
+              TRACE: hyperverbose mode; used for debugging fiddly
+                 little details.  This is a security risk.
+              DEBUG: very verbose mode; used for tracing connections
+	         and messages through the system.  This is a security risk.
+              INFO: non-critical events.
+	      WARN: recoverable errors
+	      ERROR: nonrecoverable errors that affect only a single 
+                 message or a connection.
+	      FATAL: nonrecoverable errors that affect the entire system"""
     def __init__(self, minSeverity):
 	self.configure(None)
 	self.setMinSeverity(minSeverity)

Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/MMTPServer.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- MMTPServer.py	11 Aug 2002 07:50:34 -0000	1.9
+++ MMTPServer.py	12 Aug 2002 18:12:24 -0000	1.10
@@ -165,7 +165,7 @@
         rw.register(self.server)
 
     def shutdown(self):
-        debug("Closing server connection")
+        debug("Closing listener connection")
         self.server.unregister(self)
         del self.server
         self.sock.close()
@@ -289,7 +289,7 @@
             r = self.__con.read(1024) #may throw want*
             if r == 0:
                 trace("read returned 0.")
-                self.shutdown()
+                self.shutdown(err=0)
                 return
             else:
                 assert isinstance(r, StringType)
@@ -304,7 +304,7 @@
 
         if self.__expectReadLen and self.__inbuflen > self.__expectReadLen:
             warn("Protocol violation: too much data. Closing connection.")
-            self.shutdown(err=1)
+            self.shutdown(err=1, retriable=0)
             return
          
         if self.__terminator and self.__inbuf[0].find(self.__terminator) > -1:
@@ -356,12 +356,13 @@
             self.__server.registerReader(self)
         except _ml.TLSClosed:
             warn("Unexpectedly closed connection")
+	    self.handleFail(retriable=1)
             self.__sock.close()
             self.__server.unregister(self) 
         except _ml.TLSError:
             if self.__state != self.__shutdownFn:
                 warn("Unexpected error: closing connection.")
-                self.shutdown(1)
+                self.shutdown(err=1, retriable=1)
             else:
                 warn("Error while shutting down: closing connection.")
                 self.__server.unregister(self)
@@ -377,11 +378,11 @@
         """Called when this connection is successfully shut down."""
         pass
 
-    def shutdown(self, err=0):
+    def shutdown(self, err=0, retriable=0):
         """Begin a shutdown on this connection"""
-        
+	if err:
+	    self.handleFail(retriable)
         self.__state = self.__shutdownFn
-        #self.__server.registerWriter(self)
         
     def fileno(self):
         return self.__con.fileno()
@@ -392,6 +393,10 @@
 
     def getPeerPK(self):
         return self.__con.get_peer_cert_pk()
+
+    def handleFail(self, retriable=0):
+	"""Called when we shutdown with an error."""
+	pass
     
 #----------------------------------------------------------------------
 PROTOCOL_STRING      = "MMTP 1.0\r\n"
@@ -484,10 +489,10 @@
 
 #----------------------------------------------------------------------
         
-# XXXX retry logic.
+# XXXX We need logic to do retries.
 class MMTPClientConnection(SimpleTLSConnection):
     def __init__(self, context, ip, port, keyID, messageList, handleList,
-                 sentCallback=None):
+                 sentCallback=None, failCallback=None):
 	"""Create a connection to send messages to an MMTP server.  
 	   ip -- The IP of the destination server.
 	   port -- The port to connect to.
@@ -496,7 +501,9 @@
 	   handleList -- a list of objects corresponding to the messages in
 	      messageList.  Used for callback.
            sentCallback -- None, or a function of (msg, handle) to be called
-              whenever a message is successfully sent."""
+              whenever a message is successfully sent.
+           failCallback -- None, or a function of (msg, handle, retriable)
+              to be called when messages can't be sent."""
 	
         trace("CLIENT CON")
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -516,6 +523,7 @@
 	self.handleList = handleList
         self.finished = self.__setupFinished
         self.sentCallback = sentCallback
+	self.failCallback = failCallback
 
     def __setupFinished(self):
         """Called when we're done with the client side negotations.
@@ -525,7 +533,8 @@
         if self.keyID is not None:
             if keyID != self.keyID:
                 warn("Got unexpected Key ID from %s", self.ip)
-                self.shutdown(err=1)
+		# This may work again in a couple of hours
+                self.shutdown(err=1,retriable=1)
             else:
                 debug("KeyID is valid")
 
@@ -546,7 +555,9 @@
         inp = self.getInput()
         if inp != PROTOCOL_STRING:
             warn("Invalid protocol.  Closing connection")
-            self.shutdown(err=1)
+	    # This isn't retriable; we don't talk to servers we don't
+	    # understand.
+            self.shutdown(err=1,retriable=0)
             return
 
         self.beginNextMessage()
@@ -584,7 +595,9 @@
        #XXXX Rehandshake
        inp = self.getInput()
        if inp != (RECEIVED_CONTROL+self.expectedDigest):
-           self.shutdown(1)
+	   # We only get bad ACKs if an adversary somehow subverts TLS's
+	   # checksumming.  That's not fixable.
+           self.shutdown(err=1,retriable=0)
            return
 
        debug("Received valid ACK for message.")
@@ -597,6 +610,11 @@
 	   
        self.beginNextMessage()
 
+    def handleFail(self, retriable):
+	if self.failCallback is not None:
+	    for msg, handle in zip(self.messageList, self.handleList):
+		self.failCallback(msg,handle,retriable)
+
 LISTEN_BACKLOG = 10 # ???? Is something else more reasonable
 class MMTPServer(AsyncServer):
     """A helper class to invoke AsyncServer, MMTPServerConnection, and
@@ -618,7 +636,10 @@
         sock.setblocking(0)
         con = MMTPServerConnection(sock, tls, self.onMessageReceived)
         con.register(self)
-        
+    
+    def stopListening(self):
+	self.listener.shutdown()
+
     def sendMessages(self, ip, port, keyID, messages, handles):
 	"""Send a set of messages to a given server."""
         con = MMTPClientConnection(ip, port, keyID, messages, handles,
@@ -627,6 +648,9 @@
 
     def onMessageReceived(self, msg):
         pass
+
+    def onMessageUndeliverable(self, msg, handle, retriable):
+	pass
 
     def onMessageSent(self, msg, handle):
         pass

Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Modules.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- Modules.py	6 Aug 2002 16:09:21 -0000	1.3
+++ Modules.py	12 Aug 2002 18:12:24 -0000	1.4
@@ -5,15 +5,20 @@
 
    Type codes and dispatch functions for routing functionality."""
 
-#__all__ = [ 'ModuleManager' ]
+__all__ = [ 'ModuleManager', 'DROP_TYPE', 'FWD_TYPE', 'SWAP_FWD_TYPE',
+	    'DELIVER_OK', 'DELIVER_FAIL_RETRY', 'DELIVER_FAIL_NORETRY',
+	    'SMTP_TYPE', 'MBOX_TYPE' ]
 
 import os
+import smtplib
 
 import mixminion.Config
 import mixminion.Packet
+import mixminion.Queue
 from mixminion.Config import ConfigError, _parseBoolean, _parseCommand
 from mixminion.Common import getLog
 
+# Return values for processMessage
 DELIVER_OK = 1
 DELIVER_FAIL_RETRY = 2
 DELIVER_FAIL_NORETRY = 3
@@ -30,11 +35,20 @@
 SMTP_TYPE      = 0x0100  # Mail the message
 MBOX_TYPE      = 0x0101  # Send the message to one of a fixed list of addresses
 
-
 class DeliveryModule:
-    "XXXX DOCME"
+    """Abstract base for modules; delivery modules should implement the methods
+       in this class.
+
+       A delivery module has the following responsibilities:
+           * It must have a 0-argument constructor.
+           * If it is configurable, it must be able to specify its options,
+             validate its configuration, and configure itself.
+           * If it is advertisable, it must provide a server info block.
+           * It must know its own name.
+	   * It must know which types it handles.
+	   * Of course, it needs to know how to deliver a message."""
     def __init__(self):
-        pass
+	pass
 
     def getConfigSyntax(self):
         pass
@@ -49,27 +63,67 @@
         pass
 
     def getName(self):
+	"""Return the name of this module.  This name may be used to construct
+	   directory paths, so it shouldn't contain any funny characters."""
         pass
 
     def getExitTypes(self):
+	"""Return a list of numeric exit types which this module is willing to
+           handle."""
         pass
 
     def processMessage(self, message, exitType, exitInfo):
+	"""Given a message with a given exitType and exitInfo, try to deliver
+           it.  Return one of:
+            DELIVER_OK (if the message was successfully delivered), 
+	    DELIVER_FAIL_RETRY (if the message wasn't delivered, but might be 
+              deliverable later), or
+	    DELIVER_FAIL_NORETRY (if the message shouldn't be tried later)."""
         pass
 
 class ModuleManager:
+    """A ModuleManager knows about all of the modules in the systems.
+   
+       A module may be in one of three states: unloaded, registered, or 
+       enabled.  An unloaded module is just a class in a python module.
+       A registered module has been loaded, configured, and listed with
+       the ModuleManager, but will not receive messages until it has been
+       enabled."""
+    ## 
+    # Fields
+    #    syntax: extensions to the syntax configuration in Config.py
+    #    modules: a list of DeliveryModule objects
+    #    nameToModule: XXXX Docdoc
+    #    typeToModule: a map from delivery type to enabled deliverymodule.
+    #    path: search path for python modules.
+    #    queueRoot: directory where all the queues go.
+    #    queues: a map from module name to queue.
+       
     def __init__(self):
         self.syntax = {}
         self.modules = []
+	self.nameToModule = {}
         self.typeToModule = {}
+	self.path = []
+	self.queueRoot = None
+	self.queues = {}
         
         self.registerModule(MBoxModule())
         self.registerModule(DropModule())
 
+    def _setQueueRoot(self, queueRoot):
+	"""Sets a directory under which all modules' queue directories
+	   should go."""
+        self.queueRoot = queueRoot
+
     def getConfigSyntax(self):
+	"""Returns a dict to extend the syntax configuration in a Config
+	   object. Should be called after all modules are registered."""
         return self.syntax
 
     def registerModule(self, module):
+	"""Inform this ModuleManager about a delivery module.  This method
+   	   updates the syntax options, but does not enable the module."""
         self.modules.append(module)
         syn = module.getConfigSyntax()
         for sec, rules in syn.items():
@@ -78,40 +132,80 @@
         self.syntax.update(syn)
 
     def setPath(self, path):
+	"""Sets the search path for Python modules"""
         self.path = path
 
     def loadExtModule(self, className):
-        # CHECK! XXXX Handle errors
+	"""Load and register a module from a python file.  Takes a classname
+           of the format module.Class or package.module.Class.  Raises
+	   MixError if the module can't be loaded."""
         ids = className.split(".")
         pyPkg = ".".join(ids[:-1])
         pyClassName = ids[-1]
         try:
             orig_path = sys.path[:]
-            sys.path.extend(self.path)
-            m = __import__(pyPkg, {}, {}, [])
+	    sys.path[0:0] = self.path
+	    try:
+		m = __import__(pyPkg, {}, {}, [])
+	    except ImportError, e:
+		raise MixError("%s while importing %s" %(str(e),className))
         finally:
             sys.path = orig_path
-        pyClass = getattr(pyPkg, pyClassname)
-        self.registerModule(pyClass())
-
+	try:
+	    pyClass = getattr(m, pyClassname)
+	except AttributeError, e:
+	    raise MixError("No class %s in module %s" %(pyClassName,pyPkg))
+	try:
+	    self.registerModule(pyClass())
+	except Exception, e:
+	    raise MixError("Error initializing module %s" %className)
+	    
     def validate(self, sections, entries, lines, contents):
         for m in self.modules:
             m.validateConfig(sections, entries, lines, contents)
 
     def configure(self, config):
+	self.queueRoot = os.path.join(config['Server']['Homedir'],
+				      'work', 'queues', 'deliver')
+	createPrivateDir(self.queueRoot)
         for m in self.modules:
             m.configure(config, self)
 
     def enableModule(self, module):
+	"""Maps all the types for a module object."""
         for t in module.getExitTypes():
             self.typeToModule[t] = module
+	queueDir = os.path.join(self.queueRoot, module.getName())
+	queue = mixminion.Queue.Queue(queueDiir, create=1, scrub=1)
+	self.queues[module.getName()] = queue
 
     def disableModule(self, module):
+	"""Unmaps all the types for a module object."""
         for t in module.getExitTypes():
             if self.typeToModule.has_key(t):
                 del self.typeToModule[t]
+	if self.queues.has_key(module.getName()):
+	    del self.queues[module.getName()]
+
+    def queueMessage(self, message, exitType, exitInfo):
+        mod = self.typeToModule.get(exitType, None)
+        if mod is not None:
+	    queue = self.queues[mod.getName()]
+	    f, handle = queue.openNewMessage()
+	    cPickle.dumps((0, exitType, exitInfo, message), f, 1)
+	    queue.finishMessage(f, handle)
+        else:
+            getLog().error("Unable to queue message with unknown type %s",
+                           exitType)
+
+    def processMessages(self):
+	for name, queue in self.queues.items():
+	    if len(queue):
+		XXXX
 
     def processMessage(self, message, exitType, exitInfo):
+	"""Tries to deliver a message.  Return types are as in 
+           DeliveryModule.processMessage"""
         mod = self.typeToModule.get(exitType, None)
         if mod is not None:
             return mod.processMessage(message, exitType, exitInfo)
@@ -123,30 +217,21 @@
     def getServerInfoBlocks(self):
         return [ m.getServerInfoBlock() for m in self.modules ]
 
+#----------------------------------------------------------------------
 class DropModule(DeliveryModule):
-    def __init__(self):
-        DeliveryModule.__init__(self)
-
+    """Null-object pattern: drops all messages it receives."""
     def getConfigSyntax(self):
         return { }
-
-    def validateConfig(self, sections, entries, lines, contents):
-        pass
-
-    def configure(self, config, moduleManager):
-        pass
-
     def getServerInfoBlock(self):
         return ""
-    
+    def configure(self, config, manager):
+	manager.enable(self)
     def getName(self):
-        return "DROP module"
-
+        return "DROP"
     def getExitTypes(self):
         return [ DROP_TYPE ]
-
     def processMessage(self, message, exitType, exitInfo):
-        getLog().info("Dropping padding message")
+        getLog().debug("Dropping padding message")
         return DELIVER_OK
 
 #----------------------------------------------------------------------
@@ -163,7 +248,7 @@
                    'AddressFile' : ('ALLOW', None, None),
                    'ReturnAddress' : ('ALLOW', None, None),
                    'RemoveContact' : ('ALLOW', None, None),
-                   'Command' : ('ALLOW', _parseCommand, "sendmail") }
+                   'SMTPServer' : ('ALLOW', None, 'localhost') }
                  }
 
     def validateConfig(self, sections, entries, lines, contents):
@@ -173,7 +258,7 @@
     def configure(self, config, moduleManager):
         # XXXX Check this.  error handling
         self.enabled = config['Delivery/MBOX'].get("Enabled", 0)
-        self.command = config['Delivery/MBOX']['Command']
+        self.server = config['Delivery/MBOX']['SMTPServer']
         self.addressFile = config['Delivery/MBOX']['AddressFile']
         self.returnAddress = config['Delivery/MBOX']['ReturnAddress']
         self.contact = config['Delivery/MBOX']['RemoveContact']
@@ -192,9 +277,6 @@
             self.nickname = socket.gethostname()
         self.addr = config['Server'].get('IP', "<Unknown host>")
 
-        if self.command != ('sendmail', []):
-            getLog().warn("Ignoring mail command in version 0.0.1")
-
         f = open(self.addressfile)
         addresses = f.read()
         f.close()
@@ -221,7 +303,7 @@
                """
     
     def getName(self):
-        return "MBOX module"
+        return "MBOX"
 
     def getExitTypes(self):
         return [ MBOX_TYPE ]
@@ -230,12 +312,14 @@
         assert exitType == MBOX_TYPE
         getLog().trace("Received MBOX message")
         info = mixminion.packet.parseMBOXInfo(exitInfo)
-        if not addresses.has_key(info.user):
+	try:
+	    address = addresses[info.user]
+	except KeyError, e:
             getLog.warn("Unknown MBOX user %r", info.user)
-            return 
+
         msg = _escapeMessageForEmail(message)
 
-        fields = { 'user': addresses[info.user],
+        fields = { 'user': address,
                    'return': self.returnAddr,
                    'nickname': self.nickname,
                    'addr': self.addr,
@@ -249,22 +333,29 @@
 THIS IS AN ANONYMOUS MESSAGE.  The mixminion server '%(nickname)s' at
 %(addr)s has been configured to deliver messages to your address.  If you
 do not want to receive messages in the future, contact %(contact)s and you
-will be removed.  (XXXX Need real boilerplate)
+will be removed.
 
 %(msg)s
 """ % fields
 
-        f = os.popen("sendmail -i -t", 'w')
-        f.write(msg)
-        status = f.close()
-        if status != 0:
-            getLog().error("Unsuccessful sendmail")
-            return DELIVER_FAIL_RETRY
-
-        return DELIVER_OK
+        return sendSMTPMessage(self.server, [address], self.returnAddr, msg)
 
 #----------------------------------------------------------------------
+def sendSMTPMessage(server, toList, fromAddr, message):
+    con = smtplib(server)
+    try:
+	con.sendmail(fromAddr, toList, message)
+	res = DELIVER_OK
+    except smtplib.SMTPException, e:
+	getLog().warn("Unsuccessful smtp: "+str(e))
+	res = DELIVER_FAIL_RETRY #????
 
+    con.quit()
+    con.close()
+
+    return res
+
+#----------------------------------------------------------------------
 
 _allChars = "".join(map(chr, range(256)))
 _nonprinting = "".join(map(chr, range(0x00, 0x07)+range(0x0E, 0x20)))
@@ -286,3 +377,4 @@
 ============ BASE-64 ENCODED ANONYMOUS MESSAGE BEGINS
 %s
 ============ BASE-64 ENCODED ANONYMOUS MESSAGE ENDS\n""" % msg
+

Index: Queue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Queue.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- Queue.py	11 Aug 2002 07:50:34 -0000	1.8
+++ Queue.py	12 Aug 2002 18:12:24 -0000	1.9
@@ -10,6 +10,7 @@
 import base64
 import time
 import stat
+import cPickle
 
 from mixminion.Common import MixError, MixFatalError, secureDelete, getLog
 from mixminion.Crypto import AESCounterPRNG
@@ -86,7 +87,7 @@
         mode = os.stat(location)[stat.ST_MODE]
         if mode & 0077:
             # XXXX be more Draconian.
-            getLog().warn("Worrisome more %o on directory %s", mode, location)
+            getLog().warn("Worrisome mode %o on directory %s", mode, location)
 
         if scrub:
             self.cleanQueue(1)
@@ -102,6 +103,14 @@
         self.finishMessage(f, handle)
         return handle
 
+    def queueObject(self, object):
+	"""Queue an object using cPickle, and return a handle to that 
+	   object."""
+        f, handle = self.openNewMessage()
+        cPickle.dump(contents, f, 1)
+        self.finishMessage(f, handle)
+        return handle
+      
     def count(self, recount=0):
         """Returns the number of complete messages in the queue."""
 	if self.n_entries >= 0 and not recount:
@@ -172,13 +181,21 @@
         return open(os.path.join(self.dir, "msg_"+handle), 'rb')
 
     def messageContents(self, handle):
-        """Given a messagge handle, returns the contents of the corresponding
+        """Given a message handle, returns the contents of the corresponding
            message."""
         f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
         s = f.read()
         f.close()
         return s
 
+    def getObject(self, handle):
+        """Given a message handle, read and unpickle the contents of the
+	   corresponding message."""
+        f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
+        res = cPickle.load(f)
+        f.close()
+        return res
+
     def openNewMessage(self):
         """Returns (file, handle) tuple to create a new message.  Once
            you're done writing, you must call finishMessage to
@@ -252,6 +269,146 @@
         """Helper method: creates a new random handle."""
         junk = self.rng.getBytes(9)
         return base64.encodestring(junk).strip().replace("/","-")
+
+class DeliveryQueue(Queue):
+    """A DeliveryQueue implements a queue that greedily sends messages
+       to outgoing streams that occasionally fail.  Messages in a 
+       DeliveryQueue are no longer unstructured text, but rather
+       tuples of: (n_retries, next_retry_time, addressing info, msg).
+
+       This class is abstract. Implementors of this class should
+       subclass it to add a deliverMessages method.  Multiple
+       invocations of this method may be active at a given time.  Upon
+       success or failure, this method should cause deliverySucceeded
+       or deliveryFailed to be called as appropriate.
+
+       Users of this class will probably only want to call the queueMessage,
+       sendReadyMessages, and nextMessageReadyAt methods.
+
+       This class caches information about the directory state; it
+       won't play nice if multiple instances are looking at the same
+       directory.
+    """
+    ###
+    # Fields:
+    #    sendableAt -- An inorder list of (time, handle) for all messages 
+    #           that we're not currently sending. Each time is either a
+    #           number of seconds since the epoch at which the corresponding
+    #           message will be sendable, or 0 to indicate that the message
+    #           is sendable _now_.  
+    #    pending -- Dict from handle->1, for all messages that we're
+    #           currently sending.
+
+    def __init__(self, location):
+	Queue.__init__(self, location, create=1, scrub=1)
+	self._rescan()
+
+    def _rescan(self):
+	"""Rebuild the internal state of this queue from the underlying
+           directory."""
+	self.sendableAt = []
+	self.pending = {}
+	now = time.time()
+	for h in self.getAllmessages():
+	    t = self.getObject(handle)[1] # retry time
+	    self.sendableAt.append[(t, h)]
+	self.sendableAt.sort()
+    
+    def queueMessage(self, addr, msg, retry=0, retryAt=0):
+	"""Schedule a message for delivery.
+	     addr -- An object to indicate the message's destination
+	     msg -- the message itself
+	     retry -- how many times so far have we tried to send?
+	     retryAt -- when should we next retry? (0 for now)."""
+
+        self.queueObject( (retry, retryAt, addr, msg) )
+
+	if retryAt == 0:
+	    self.sendableAt[0:0] = [(0, handle)]
+	else:
+	    bisect.insort_right(self.sendableAt, (retryAt, handle))
+	return handle
+
+    def nextMessageReadyAt(self):
+	"""Return the soonest possible time at which sendReadyMessages
+	   will send something.  If some time < now is returned,
+	   the answer is 'immediately'. """
+	if self.sendableAt:
+	    return self.sendableAt[0][0]
+	else:
+	    return None
+    
+    def messageContents(self,handle):
+	"""Returns a (n_retries, retryAt, addr, msg) payload for a given
+	   message handle."""
+        return self.getObject(handle)
+	
+    def sendReadyMessages(self):
+	"""Sends all messages which are ready to be send, and which are not
+           already being sent."""
+	now = time.time()
+	idx = bisect.bisect_right(self.sendableAt, (now, ""))
+	messages = []
+	sendable = self.sendableAt[:idx]
+	del self.sendableAt[:idx]
+	for when, h in self.sendable:
+	    retries, retryAt, addr, msg = self.getObject(h)
+	    messages.append((h, addr, msg, retries))
+	    self.pending[h] = 1
+	if messages:
+	    self.deliverMessage(messages)
+
+    def deliverMessages(self, msgList):
+	"""Abstract method; Invoked with a list of  
+  	   (handle, addr, message, n_retries) tuples every time we have a batch
+	   of messages to send."""
+        # We could implement this as a single deliverMessage(h,addr,m,n)
+	# method, but that wouldn't allow implementations to batch
+	# messages being sent to the same address.
+	assert 0
+
+    def deliverySucceeded(self, handle):
+	"""Removes a message from the outgoing queue.  This method
+	   should be invoked after the corresponding message has been
+	   successfully delivered.
+        """
+	self.removeMessage(handle)
+	del self.pending[handle]
+
+    def deliveryFailed(self, handle, retryAt=None):
+	"""Removes a message from the outgoing queue, or requeues it
+	   for delivery at a later time.  This method should be
+	   invoked after an attempt to deliver the corresponding
+	   message has failed."""
+	del self.pending[handle]
+	if self.retryAt is not None:
+	    # Queue the new one before removing the old one, for 
+	    # crash-proofness
+	    retries, retryAt, addr, msg = self.getObject(h)
+	    self.queueMessage(addr, msg, retries+1, retryAt)
+	self.removeMessage(handle)	    
+
+class MixQueue(Queue):
+    """A MixQueue holds a group of files, and returns some of them
+       as requested, according to some mixing algorithm.
+       
+       It's the responsibility of the user of this class to only invoke it
+       at the specified interval."""
+    # Right now, we use the 'Cottrell' mixing algorithm with fixed
+    # parameters.  We always keep 5 messages in the pool, and never send
+    # more than 30% of the pool at a time.  These parameters should probably
+    # be more like 100
+    MIN_POOL_SIZE = 5
+    MAX_REPLACEMENT_RATE = 0.3
+    def __init__(self, location):
+	Queue.__init__(self, location, create=1, scrub=1)
+
+    def pickMessages(self):
+	"""Return a list of handles."""
+	n = self.count()
+	nTransmit = min(n-self.MIN_POOL_SIZE,
+		       int(n*self.MAX_REPLACEMENT_RATE))
+	return self.pickRandom(nTransmit)
 
 def _secureDelete_bg(files, cleanFile):
     pid = os.fork()

Index: ServerInfo.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ServerInfo.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- ServerInfo.py	6 Aug 2002 16:09:21 -0000	1.8
+++ ServerInfo.py	12 Aug 2002 18:12:24 -0000	1.9
@@ -132,7 +132,6 @@
         return IPV4Info(self.getAddr(), self.getPort(), self.getKeyID())
 
 #----------------------------------------------------------------------
-# This should go in a different file.
 class ServerKeys:
     """A set of expirable keys for use by a server.
 
@@ -334,12 +333,13 @@
     finally:
         f.close()
 
-    # for debugging XXXX ### Remove this once we're more confident.
+    # XXXX for debugging: try to parse and validate the thing we just made.
+    # XXXX Remove this once we're more confident.
     ServerInfo(string=info)
 
     return info
 
-#-----------------------b-----------------------------------------------
+#----------------------------------------------------------------------
 def getServerInfoDigest(info):
     """Calculate the digest of a server descriptor"""
     return _getServerInfoDigestImpl(info, None)

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ServerMain.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- ServerMain.py	11 Aug 2002 07:50:34 -0000	1.2
+++ ServerMain.py	12 Aug 2002 18:12:24 -0000	1.3
@@ -9,7 +9,7 @@
 
 import cPickle
 
-from mixminion.Common import getLog, MixFatalError, MixError
+from mixminion.Common import getLog, MixFatalError, MixError, createPrivateDir
 
 # Directory layout:
 #     MINION_HOME/work/queues/incoming/
@@ -27,23 +27,6 @@
 #                 conf/miniond.conf 
 #                       ....
 
-def createDir(d):
-    if not os.path.exists(d):
-        try:
-            os.mkdir(d, 0700)
-        except OSError, e:
-            getLog().fatal("Unable to create directory %s"%d)
-            raise MixFatalError()
-    elif not os.path.isdir(d):
-        getLog().fatal("%s is not a directory"%d)
-        raise MixFatalError()
-    else:
-        m = os.stat(d)[stat.ST_MODE]
-        # check permissions
-        if m & 0077:
-            getLog().fatal("Directory %s must be mode 0700" %d)
-            raise MixFatalError()
-
 class ServerState:
     # XXXX This should be refactored.  keys should be separated from queues.
     # config
@@ -56,8 +39,8 @@
         # set up directory structure.
         c = self.config
         self.homedir = c['Server']['Homedir']
-        createDir(self.homedir)
-        getLog()._configure() # ????
+        createPrivateDir(self.homedir)
+        getLog()._configure(self.config)# ????
         
         w = os.path.join(self.homeDir, "work")
         q = os.path.join(w, "queues")
@@ -65,7 +48,6 @@
         self.mixDir = os.path.join(q, "mix")
         self.outgoingDir = os.path.join(q, "outgoing")
         self.deliverDir = os.path.join(q, "deliver")
-        self.deliverMBOXDir = os.path.join(self.deliverDir, "mbox")
 
         tlsDir = os.path.join(w, "tls")
         self.hashlogsDir = os.path.join(w, "hashlogs")
@@ -75,9 +57,9 @@
         for d in [self.homeDir, w, q, self.incomingDir, self.mixDir,
                   self.outgoingDir, self.deliverDir, tlsDir,
                   self.hashlogsDir, self.keysDir, self.confDir]:
-            createDir(d)
+            createPrivateDir(d)
 
-        for name in ("incoming", "mix", "outgoing", "deliverMBOX"):
+        for name in ("incoming", "mix", "outgoing"):
             loc = getattr(self, name+"Dir")
             queue = mixminion.Queue.Queue(loc, create=1, scrub=1)
             setattr(self, name+"Queue", queue)