[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] Refactor ServerMain; debug the universe.



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.seul.org:/tmp/cvs-serv20327/lib/mixminion

Modified Files:
	Common.py Config.py Crypto.py MMTPServer.py Modules.py 
	Queue.py ServerInfo.py ServerMain.py __init__.py benchmark.py 
	test.py 
Log Message:
Refactor ServerMain; debug the universe.

Common.py:
	Debug createPrivateDirs

Config.py:
	Link ModuleManager to ServerConfig

Crypto.py:
	More inlining in lioness_encrypt
	Change default e from 65535 to 65537
	Debug and tune shuffle
	Optimize the heck out of PRNG.getInt
	Optimize getFloat a bit.
	
MMTPServer.py:
	Add FFFFs

Modules.py:
	Add more comments
	Make modules provide their own delivery queues; add a couple of 
	   helper classes to make this easier.
	Add 'immediate delivery' null-object queue for modules that don't
	   want to queue.
	Debug module loading
	Improve queueing and delivery

Queue.py:
	Switch to createPrivateDir
	Debug queueObject
	Remove retryAt logic from DeliveryQueue: messages now get retried
	   at every opportunity
	Remove timing logic from MixQueue
	Debug *MixQueue

ServerInfo.py:
	Rename ServerKeys to ServerKeyset

ServerMain.py: Refactor completely.
	ServerKeyring now holds all of the server's key info
	IncomingQueue, OutgoingQueue, Mix, and _MMTPConnection now encapsulate
	   the internal processing logic
	MixminionServer is now a small class.

benchmark.py:
	Add timing for shuffle
	Make benchmark code run again (It was broken long ago by changes
	   to ServerInfo and _minionlib)

test.py:
	Replace mktemp with a secure, paranoid replacement that can clean
	   up after itself; remove all old clean-up stuff.
	getFloat no longer takes an arg
	Tests for shuffle
	Tests for Queue.queueObject
	Tests for DeliveryQueue
	Tests for *MixQueue
	Cache 2048-bit RSA key; don't generate too many of those!
	Tests for ModuleManager



Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -d -r1.12 -r1.13
--- Common.py	12 Aug 2002 18:12:24 -0000	1.12
+++ Common.py	19 Aug 2002 15:33:55 -0000	1.13
@@ -13,6 +13,7 @@
 import signal
 import sys
 import time
+import stat
 from types import StringType
 
 class MixError(Exception):
@@ -58,7 +59,7 @@
        XXXX we don't check permissions properly yet."""
     if not os.path.exists(d):
 	try:
-	    os.makedirs(s, 0700)
+	    os.makedirs(d, 0700)
 	except OSError, e:
 	    getLog().fatal("Unable to create directory %s"%d)
 	    raise MixFatalError()

Index: Config.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Config.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- Config.py	11 Aug 2002 07:50:34 -0000	1.8
+++ Config.py	19 Aug 2002 15:33:55 -0000	1.9
@@ -261,7 +261,7 @@
 	key = mixminion.Crypto.pk_decode_public_key(asn1)
     except mixminion.Crypto.CryptoError:
 	raise ConfigError("Invalid public key")
-    if key.get_public_key()[1] != 65535:
+    if key.get_public_key()[1] != 65537:
 	raise ConfigError("Invalid exponent on public key")
     return key
 
@@ -695,12 +695,18 @@
     _restrictFormat = 0
     # XXXX Missing: Queue-Size / Queue config options
     # XXXX         timeout options
-    # XXXX       listen timeout??
-    def __init__(self, fname=None, string=None):
+    # XXXX         listen timeout??
+    # XXXX         Retry options
+    def __init__(self, fname=None, string=None, moduleManager=None):
+	# We use a copy of SERVER_SYNTAX, because the ModuleManager will
+	# mess it up.
         self._syntax = SERVER_SYNTAX.copy()
 
         import mixminion.Modules
-        self.moduleManager = mixminion.Modules.ModuleManager()
+	if moduleManager is None:
+	    self.moduleManager = mixminion.Modules.ModuleManager()
+	else:
+	    self.moduleManager = moduleManager
         self._addCallback("Server", self.__loadModules)    
 
         _ConfigFile.__init__(self, fname, string)
@@ -715,13 +721,10 @@
 	   accordingly."""
         self.moduleManager.setPath(section.get('ModulePath', None))
         for mod in section.get('Module', []):
-	    info("Loading module %s", mod)
+	    getLog().info("Loading module %s", mod)
             self.moduleManager.loadExtModule(mod)
 
         self._syntax.update(self.moduleManager.getConfigSyntax())
     
-##         if sections['Server']['PublicKeyLifeTime'][2] < 24*60*60:
-##             raise ConfigError("PublicKeyLifetime must be at least 1 day.")
-##         elif sections['Server']['PublicKeyLifeTime'][2] % (24*60*60) > 30:
-##             getLog().warn("PublicKeyLifetime rounded to the nearest day")
-##             nDays = sections[60*60*24]
+    def getModuleManager(self):
+	return self.moduleManager

Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -d -r1.13 -r1.14
--- Crypto.py	12 Aug 2002 21:05:50 -0000	1.13
+++ Crypto.py	19 Aug 2002 15:33:56 -0000	1.14
@@ -11,6 +11,8 @@
 import os
 import sys
 import stat
+# XXXX is this used?
+import struct
 from types import StringType
 
 import mixminion.Config
@@ -89,13 +91,18 @@
     left = s[:DIGEST_LEN]
     right = s[DIGEST_LEN:]
     del s
-    # Performance note: This business with sha1("".join([key,right,key]))
+    # Performance note: This business with sha1("".join((key,right,key)))
     # may look slow, but it contributes only a 6% to the hashing step,
     # which in turn contributes under 11% of the time for LIONESS.
-    right = ctr_crypt(right, _ml.sha1("".join([key1,left,key1]))[:AES_KEY_LEN])
-    left = _ml.strxor(left,  _ml.sha1("".join([key2,right,key2])))
-    right = ctr_crypt(right, _ml.sha1("".join([key3,left,key3]))[:AES_KEY_LEN])
-    left = _ml.strxor(left,  _ml.sha1("".join([key4,right,key4])))
+    right = _ml.aes_ctr128_crypt(
+	_ml.aes_key(_ml.sha1("".join((key1,left,key1)))[:AES_KEY_LEN]), 
+	right, 0) 
+    left = _ml.strxor(left,  _ml.sha1("".join((key2,right,key2))))
+    right = _ml.aes_ctr128_crypt(
+	_ml.aes_key(_ml.sha1("".join((key3,left,key3)))[:AES_KEY_LEN]), 
+	right, 0)
+    left = _ml.strxor(left,  _ml.sha1("".join((key4,right,key4))))
+
     return left + right
 
 def lioness_decrypt(s,(key1,key2,key3,key4)):
@@ -161,7 +168,7 @@
     data = key.crypt(data, 1, 0)
     return check_oaep(data,OAEP_PARAMETER,bytes)
 
-def pk_generate(bits=1024,e=65535):
+def pk_generate(bits=1024,e=65537):
     """Generate a new RSA keypair with 'bits' bits and exponent 'e'.  It is
        safe to use the default value of 'e'.
     """
@@ -171,7 +178,7 @@
     """Extracts the modulus of a public key."""
     return key.get_public_key()[0]
 
-def pk_from_modulus(n, e=65535L):
+def pk_from_modulus(n, e=65537L):
     """Given a modulus and exponent, creates an RSA public key."""
     return _ml.rsa_make_public_key(long(n),long(e))
 
@@ -400,67 +407,48 @@
 	else:
 	    n = min(n, size)
 
+	if n == size:
+	    series = xrange(n-1)
+	else:
+	    series = xrange(n)
 
         # This permutation algorithm yields all permutation with equal
         # probability (assuming a good rng); others do not.
-        for i in range(n-1):
-            swap = i+self.getInt(size-i)
-            v = lst[swap]
-            lst[swap] = lst[i]
-            lst[i] = v
+	getInt = self.getInt
+        for i in series:
+            swap = i+getInt(size-i)
+	    lst[swap],lst[i] = lst[i],lst[swap]
 
 	return lst[:n]
 
     def getInt(self, max):
         """Returns a random integer i s.t. 0 <= i < max.
 
-           The value of max must be less than 2**32."""
+           The value of max must be less than 2**30."""
 
-        # FFFF This implementation isn't very good.  It determines the number
-        # of bytes in max (nBytes), and a bitmask 1 less than the first power
-        # of 2 less than max.
-        #
-        # Then, it gets nBytes random bytes, ANDs them with the bitmask, and
-        # checks to see whether the result is < max.  If so, it returns.  Else,
-        # it generates more random bytes and tries again.
-        #
-        # On the plus side, this algorithm will obviously give all values
-        # 0 <= i < max with equal probability.  On the minus side, it
-        # requires (on average) 2*nBytes entropy to do so.
-        
-        assert max > 0
-        for bits in xrange(1,33):
-            if max < 1<<bits:
-                nBytes = ceilDiv(bits,8)
-                mask = (1<<bits)-1
-                break
-        if bits == 33:
-            raise "I didn't expect to have to generate a number over 2**32"
+        # FFFF This implementation is about 2-4x as good as the last one, but
+	# FFFF still could be better.  It's faster than getFloat()*max.
 
-        while 1:
-            bytes = self.getBytes(nBytes)
-            r = 0
-            for byte in bytes:
-                r = (r << 8) | ord(byte)
-            r = r & mask
-            if r < max:
-                return r
+        assert 0 < max < 0x3ffffffff
+	_ord = ord
+	while 1:
+	    # Get a random positive int between 0 and 0x7fffffff.
+	    b = self.getBytes(4)
+	    o = ((((((_ord(b[0])&0x7f)<<8) + _ord(b[1]))<<8) + 
+		  _ord(b[2]))<<8) + _ord(b[3])
+	    # Retry if we got a value that would fall in an incomplete
+	    # run of 'max' elements.
+	    if 0x7fffffff - max >= o:
+		return o % max
 
-    def getFloat(self, bytes=3):
+    def getFloat(self):
 	"""Return a floating-point number between 0 and 1.  The number
 	   will have 'bytes' bytes of resolution."""
-        # We need to special-case the <4 byte case to get good performance
-	# on Python<2.2
-        if bytes <= 3:
-	    max = 1<<(bytes*8)
-	    tot = 0
-	else:
-	    max = 1L<<(bytes*8)
-	    tot = 0L
-	bytes = self.getBytes(bytes)
-	for byte in bytes:
-	    tot = (tot << 8) | ord(byte)
-	return float(tot)/max
+	b = self.getBytes(4)
+	_ord = ord
+	o = ((((((_ord(b[0])&0x7f)<<8) + _ord(b[1]))<<8) + 
+	      _ord(b[2]))<<8) + _ord(b[3])
+	return o / 2147483647.0
 
     def _prng(self, n):
         """Abstract method: Must be overridden to return n bytes of fresh

Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/MMTPServer.py,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -d -r1.10 -r1.11
--- MMTPServer.py	12 Aug 2002 18:12:24 -0000	1.10
+++ MMTPServer.py	19 Aug 2002 15:33:56 -0000	1.11
@@ -621,6 +621,9 @@
        MMTPClientConnection"""
     def __init__(self, config):
         self.context = config.getTLSContext(server=1)
+	# FFFF Don't always listen; don't always retransmit!
+	# FFFF Support listening on specific IPs
+	# FFFF File
         self.listener = ListenConnection("127.0.0.1",
                                          config['Outgoing/MMTP']['Port'],
 					 LISTEN_BACKLOG,
@@ -631,7 +634,7 @@
     def _newMMTPConnection(self, sock):
 	"""helper method.  Creates and registers a new server connection when
 	   the listener socket gets a hit."""
-        # XXXX Check whether incoming IP is valid XXXX
+        # XXXX Check whether incoming IP is allowed!
         tls = self.context.sock(sock, serverMode=1)
         sock.setblocking(0)
         con = MMTPServerConnection(sock, tls, self.onMessageReceived)

Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Modules.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- Modules.py	12 Aug 2002 18:12:24 -0000	1.4
+++ Modules.py	19 Aug 2002 15:33:56 -0000	1.5
@@ -5,18 +5,20 @@
 
    Type codes and dispatch functions for routing functionality."""
 
-__all__ = [ 'ModuleManager', 'DROP_TYPE', 'FWD_TYPE', 'SWAP_FWD_TYPE',
+__all__ = [ 'ModuleManager', 'DeliveryModule',
+	    'DROP_TYPE', 'FWD_TYPE', 'SWAP_FWD_TYPE',
 	    'DELIVER_OK', 'DELIVER_FAIL_RETRY', 'DELIVER_FAIL_NORETRY',
 	    'SMTP_TYPE', 'MBOX_TYPE' ]
 
 import os
+import sys
 import smtplib
 
 import mixminion.Config
 import mixminion.Packet
 import mixminion.Queue
 from mixminion.Config import ConfigError, _parseBoolean, _parseCommand
-from mixminion.Common import getLog
+from mixminion.Common import getLog, createPrivateDir
 
 # Return values for processMessage
 DELIVER_OK = 1
@@ -36,8 +38,8 @@
 MBOX_TYPE      = 0x0101  # Send the message to one of a fixed list of addresses
 
 class DeliveryModule:
-    """Abstract base for modules; delivery modules should implement the methods
-       in this class.
+    """Abstract base for modules; delivery modules should implement
+       the methods in this class.
 
        A delivery module has the following responsibilities:
            * It must have a 0-argument contructor.
@@ -51,15 +53,21 @@
 	pass
 
     def getConfigSyntax(self):
+	"""Return a map from section names to section syntax, as described
+	   in Config.py"""
         pass
 
     def validateConfig(self, sections, entries, lines, contents):
+	"""See mixminion.Config.validate"""
         pass
 
     def configure(self, config, manager):
+	"""Configure this object using a given Config object, and (if
+	   required) register it with the module manager."""
         pass
 
     def getServerInfoBlock(self):
+	"""Return a block for inclusion in a server descriptor."""
         pass
 
     def getName(self):
@@ -68,10 +76,17 @@
         pass
 
     def getExitTypes(self):
-	"""Return a list of numeric exit types which this module is willing to
+	"""Return a sequence of numeric exit types that this module can
            handle."""
         pass
 
+    def createDeliveryQueue(self, queueDir):
+	"""Return a DeliveryQueue object suitable for delivering messages
+	   via this module.  The default implementation returns a 
+	   SimpleModuleDeliveryQueue,  which (though adequate) doesn't 
+	   batch messages intended for the same destination."""
+        return _SimpleModuleDeliveryQueue(self, queueDir)
+
     def processMessage(self, message, exitType, exitInfo):
 	"""Given a message with a given exitType and exitInfo, try to deliver
            it.  Return one of:
@@ -81,6 +96,58 @@
 	    DELIVER_FAIL_NORETRY (if the message shouldn't be tried later)."""
         pass
 
+class _ImmediateDeliveryQueue:
+    """Helper class usable as delivery queue for modules that don't
+       actually want a queue.  Such modules should have very speedy
+       processMessage() methods, and should never have delivery fail."""
+    def __init__(self, module):
+	self.module = module
+
+    def queueMessage(self, (exitType, exitInfo), message):
+	try:
+	    res = self.module.processMessage(exitType, exitInfo, message)
+	    if res == DELIVER_OK:
+		return
+	    elif res == DELIVER_FAIL_RETRY:
+		getLog().error("Unable to retry delivery for message")
+	    else:
+		getLog().error("Unable to deliver message")
+	except:
+	    _, e, tb = sys.exc_info()    
+	    getLog().error(
+		"Exception delivering message: %s at line %s of %s", 
+		e, tb.tb_lineno, tb.tb_frame.f_code.co_name)
+
+    def sendReadyMessages(self):
+	pass
+
+class _SimpleModuleDeliveryQueue(mixminion.Queue.DeliveryQueue):
+    """Helper class used as a default delivery queue for modules that
+       don't care about batching messages to like addresses."""
+    def __init__(self, module, directory):
+	mixminion.Queue.DeliveryQueue.__init__(self, directory)
+	self.module = module
+    
+    def deliverMessages(self, msgList):
+	for handle, addr, message, n_retries in msgList:	  
+	    try:
+		exitType, exitInfo = addr
+		result = self.module.processMessage(message,exitType,exitInfo)
+		if result == DELIVER_OK:
+		    self.deliverySucceeded(handle)
+		elif result == DELIVER_FAIL_RETRY:
+		    # XXXX We need to drop undeliverable messages eventually!
+		    self.deliveryFailed(handle, 1)
+		else:
+		    getLog().error("Unable to deliver message")
+		    self.deliveryFailed(handle, 0)
+	    except:
+		_, e, tb = sys.exc_info()
+		getLog().error(
+		    "Exception delivering message: %s at line %s of %s", 
+		    e, tb.tb_lineno, tb.tb_frame.f_code.co_name)
+		self.deliveryFailed(handle, 0)
+
 class ModuleManager:
     """A ModuleManager knows about all of the modules in the systems.
    
@@ -91,14 +158,15 @@
        enabled."""
     ## 
     # Fields
-    #    myntax: extensions to the syntax configuration in Config.py
+    #    syntax: extensions to the syntax configuration in Config.py
     #    modules: a list of DeliveryModule objects
     #    nameToModule: XXXX Docdoc
     #    typeToModule: a map from delivery type to enabled deliverymodule.
     #    path: search path for python modules.
     #    queueRoot: directory where all the queues go.
-    #    queues: a map from module name to queue.
-       
+    #    queues: a map from module name to queue (Queue objects must support
+    #            queueMessage and sendReadyMessages as in DeliveryQueue.)
+    
     def __init__(self):
         self.syntax = {}
         self.modules = []
@@ -133,7 +201,10 @@
 
     def setPath(self, path):
 	"""Sets the search path for Python modules"""
-        self.path = path
+	if path:
+	    self.path = path.split(":")
+	else:
+	    self.path = []
 
     def loadExtModule(self, className):
 	"""Load and register a module from a python file.  Takes a classname
@@ -142,8 +213,8 @@
         ids = className.split(".")
         pyPkg = ".".join(ids[:-1])
         pyClassName = ids[-1]
+	orig_path = sys.path[:]
         try:
-            orig_path = sys.path[:]
 	    sys.path[0:0] = self.path
 	    try:
 		m = __import__(pyPkg, {}, {}, [])
@@ -152,7 +223,7 @@
         finally:
             sys.path = orig_path
 	try:
-	    pyClass = getattr(m, pyClassname)
+	    pyClass = getattr(m, pyClassName)
 	except AttributeError, e:
 	    raise MixError("No class %s in module %s" %(pyClassName,pyPkg))
 	try:
@@ -165,8 +236,8 @@
             m.validateConfig(sections, entries, lines, contents)
 
     def configure(self, config):
-	self.queueRoot = os.path.join(config['Server']['Homedir'],
-				      'work', 'queues', 'deliver')
+	self._setQueueRoot(os.path.join(config['Server']['Homedir'],
+					'work', 'queues', 'deliver'))
 	createPrivateDir(self.queueRoot)
         for m in self.modules:
             m.configure(config, self)
@@ -175,10 +246,15 @@
 	"""Maps all the types for a module object."""
         for t in module.getExitTypes():
             self.typeToModule[t] = module
+
 	queueDir = os.path.join(self.queueRoot, module.getName())
-	queue = mixminion.Queue.Queue(queueDiir, create=1, scrub=1)
+	queue = module.createDeliveryQueue(queueDir)
 	self.queues[module.getName()] = queue
 
+    def cleanQueues(self):
+	for queue in self.queues.values():
+	    queue.cleanQueue()
+	    
     def disableModule(self, module):
 	"""Unmaps all the types for a module object."""
         for t in module.getExitTypes():
@@ -189,30 +265,17 @@
 
     def queueMessage(self, message, exitType, exitInfo):
         mod = self.typeToModule.get(exitType, None)
-        if mod is not None:
-	    queue = self.queues[mod.getName()]
-	    f, handle = queue.openNewMessage()
-	    cPickle.dumps((0, exitType, exitInfo, message), f, 1)
-	    queue.finishMessage(f, handle)
-        else:
-            getLog().error("Unable to queue message with unknown type %s",
+        if mod is None:
+            getLog().error("Unable to handle message with unknown type %s",
                            exitType)
+	    return
+	    
+	queue = self.queues[mod.getName()]
+	queue.queueMessage((exitType,exitInfo), message)
 
-    def processMessages(self):
+    def sendReadyMessages(self):
 	for name, queue in self.queues.items():
-	    if len(queue):
-		XXXX
-
-    def processMessage(self, message, exitType, exitInfo):
-	"""Tries to deliver a message.  Return types are as in 
-           DeliveryModule.processMessage"""
-        mod = self.typeToModule.get(exitType, None)
-        if mod is not None:
-            return mod.processMessage(message, exitType, exitInfo)
-        else:
-            getLog().error("Unable to deliver message with unknown type %s",
-                           exitType)
-            return DELIVER_FAIL_NORETRY
+	    queue.sendReadyMessages()
 
     def getServerInfoBlocks(self):
         return [ m.getServerInfoBlock() for m in self.modules ]
@@ -225,17 +288,21 @@
     def getServerInfoBlock(self):
         return ""
     def configure(self, config, manager):
-	manager.enable(self)
+	manager.enableModule(self)
     def getName(self):
         return "DROP"
     def getExitTypes(self):
         return [ DROP_TYPE ]
+    def createDeliveryQueue(self, directory):
+	return _ImmediateDeliveryQueue(self)
     def processMessage(self, message, exitType, exitInfo):
         getLog().debug("Dropping padding message")
         return DELIVER_OK
 
 #----------------------------------------------------------------------
 class MBoxModule(DeliveryModule):
+    # XXXX This implementation can stall badly if we don't have a fast
+    # XXXX local MTA.
     def __init__(self):
         DeliveryModule.__init__(self)
         self.command = None
@@ -257,20 +324,22 @@
 
     def configure(self, config, moduleManager):
         # XXXX Check this.  error handling
+	
         self.enabled = config['Delivery/MBOX'].get("Enabled", 0)
-        self.server = config['Delivery/MBOX']['SMTPServer']
-        self.addressFile = config['Delivery/MBOX']['AddressFile']
-        self.returnAddress = config['Delivery/MBOX']['ReturnAddress']
-        self.contact = config['Delivery/MBOX']['RemoveContact']
-        if self.enabled:
-            if not self.addressFile:
-                raise ConfigError("Missing AddressFile field in Delivery/MBOX")
-            if not self.returnAddress:
-                raise ConfigError("Missing ReturnAddress field "+
-                                  "in Delivery/MBOX")
-            if not self.contact:
-                raise ConfigError("Missing RemoveContact field "+
-                                  "in Delivery/MBOX")
+	if not self.enabled:
+	    return
+
+	self.server = config['Delivery/MBOX']['SMTPServer']
+	self.addressFile = config['Delivery/MBOX']['AddressFile']
+	self.returnAddress = config['Delivery/MBOX']['ReturnAddress']
+	self.contact = config['Delivery/MBOX']['RemoveContact']
+	if not self.addressFile:
+	    raise ConfigError("Missing AddressFile field in Delivery/MBOX")
+	if not self.returnAddress:
+	    raise ConfigError("Missing ReturnAddress field in Delivery/MBOX")
+	if not self.contact:
+	    raise ConfigError("Missing RemoveContact field in Delivery/MBOX")
+			      
         
         self.nickname = config['Server']['Nickname']
         if not self.nickname:

Index: Queue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Queue.py,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -d -r1.10 -r1.11
--- Queue.py	12 Aug 2002 21:05:50 -0000	1.10
+++ Queue.py	19 Aug 2002 15:33:56 -0000	1.11
@@ -12,10 +12,12 @@
 import stat
 import cPickle
 
-from mixminion.Common import MixError, MixFatalError, secureDelete, getLog
+from mixminion.Common import MixError, MixFatalError, secureDelete, getLog, \
+     createPrivateDir
 from mixminion.Crypto import AESCounterPRNG
 
-__all__ = [ 'Queue' ]
+__all__ = [ 'Queue', 'DeliveryQueue', 'TimedMixQueue', 'CottrellMixQueue', 
+	    'BinomialCottrellMixQueue' ]
 
 # Mode to pass to open(2) for creating a new file, and dying if it already
 # exists.
@@ -79,7 +81,7 @@
         if not os.path.exists(location):
             if create:
                 getLog().info("Trying to create queue %s", location)
-                os.mkdir(location, 0700)
+		createPrivateDir(location)
             else:
                 raise MixFatalError("No directory for queue %s" % location)
  
@@ -90,7 +92,7 @@
             getLog().warn("Worrisome mode %o on directory %s", mode, location)
 
         if scrub:
-            self.cleanQueue(1)
+            self.cleanQueue()
 
 	# Count messages on first time through.
 	self.n_entries = -1
@@ -107,7 +109,7 @@
 	"""Queue an object using cPickle, and return a handle to that 
 	   object."""
         f, handle = self.openNewMessage()
-        cPickle.dump(contents, f, 1)
+        cPickle.dump(object, f, 1)
         self.finishMessage(f, handle)
         return handle
       
@@ -261,7 +263,7 @@
     """A DeliveryQueue implements a queue that greedily sends messages
        to outgoing streams that occasionally fail.  Messages in a 
        DeliveryQueue are no longer unstructured text, but rather
-       tuples of: (n_retries, next_retry_time, addressing info, msg).
+       tuples of: (n_retries, addressing info, msg).
 
        This class is abstract. Implementors of this class should
        subclass it to add a deliverMessages method.  Multiple
@@ -278,11 +280,8 @@
     """
     ###
     # Fields:
-    #    sendableAt -- An inorder list of (time, handle) for all messages 
-    #           that we're not currently sending. Each time is either a
-    #           number of seconds since the epoch at which the corresponding
-    #           message will be sendable, or 0 to indicate that the message
-    #           is sendable _now_.  
+    #    sendable -- A list of handles for all messages 
+    #           that we're not currently sending.
     #    pending -- Dict from handle->1, for all messages that we're
     #           currently sending.
 
@@ -293,63 +292,47 @@
     def _rescan(self):
 	"""Rebuild the internal state of this queue from the underlying
            directory."""
-	self.sendableAt = []
 	self.pending = {}
-	now = time.time()
-	for h in self.getAllmessages():
-	    t = self.getObject(handle)[1] # retry time
-	    self.sendableAt.append[(t, h)]
-	self.sendableAt.sort()
+	self.sendable = self.getAllMessages()
     
-    def queueMessage(self, addr, msg, retry=0, retryAt=0):
+    def queueMessage(self, addr, msg, retry=0):
 	"""Schedule a message for delivery.
 	     addr -- An object to indicate the message's destination
 	     msg -- the message itself
-	     retry -- how many times so far have we tried to send?
-	     retryAt -- when should we next retry? (0 for now)."""
+	     retry -- how many times so far have we tried to send?"""
 
-        self.queueObject( (retry, retryAt, addr, msg) )
+        handle = self.queueObject( (retry, addr, msg) )
+	self.sendable.append(handle)
 
-	if retryAt == 0:
-	    self.sendableAt[0:0] = [(0, handle)]
-	else:
-	    bisect.insort_right(self.sendableAt, (retryAt, handle))
 	return handle
 
-    def nextMessageReadyAt(self):
-	"""Return the soonest possible time at which sendReadyMessages
-	   will send something.  If some time < now is returned,
-	   the answer is 'immediately'.  If 'None' is returned, there are
-	   no messages in the queue."""
-	if self.sendableAt:
-	    return self.sendableAt[0][0]
-	else:
-	    return None
-    
-    def messageContents(self,handle):
-	"""Returns a (n_retries, retryAt, addr, msg) payload for a given
+    def get(self,handle):
+	"""Returns a (n_retries, addr, msg) payload for a given
 	   message handle."""
         return self.getObject(handle)
 	
     def sendReadyMessages(self):
-	"""Sends all messages which are ready to be send, and which are not
-           already being sent."""
-	now = time.time()
-	idx = bisect.bisect_right(self.sendableAt, (now, ""))
+	"""Sends all messages which are not already being sent."""
+
+	handles = self.sendable
 	messages = []
-	sendable = self.sendableAt[:idx]
-	del self.sendableAt[:idx]
-	for when, h in self.sendable:
-	    retries, retryAt, addr, msg = self.getObject(h)
+	self.sendable = []
+	for h in handles:
+	    retries, addr, msg = self.getObject(h)
 	    messages.append((h, addr, msg, retries))
 	    self.pending[h] = 1
 	if messages:
-	    self.deliverMessage(messages)
+	    self.deliverMessages(messages)
 
     def deliverMessages(self, msgList):
 	"""Abstract method; Invoked with a list of  
   	   (handle, addr, message, n_retries) tuples every time we have a batch
-	   of messages to send."""
+	   of messages to send.  
+
+           For every handle in the list, deliverySucceeded or deliveryFailed
+	   should eventually be called, or the message will sit in the queue
+	   indefinitely, without being retried."""
+
         # We could implement this as a single deliverMessage(h,addr,m,n)
 	# method, but that wouldn't allow implementations to batch
 	# messages being sent to the same address.
@@ -363,18 +346,18 @@
 	self.removeMessage(handle)
 	del self.pending[handle]
 
-    def deliveryFailed(self, handle, retryAt=None):
+    def deliveryFailed(self, handle, retriable=0):
 	"""Removes a message from the outgoing queue, or requeues it
 	   for delivery at a later time.  This method should be
 	   invoked after the corresponding message has been
 	   successfully delivered."""
 	del self.pending[handle]
-	if self.retryAt is not None:
+	if retriable:
 	    # Queue the new one before removing the old one, for 
 	    # crash-proofness
-	    retries, retryAt, addr, msg = self.getObject(h)
-	    self.queueMessage(addr, msg, retries+1, retryAt)
-	self.removeMessage(handle)	    
+	    retries,  addr, msg = self.getObject(handle)
+	    self.queueMessage(addr, msg, retries+1)
+	self.removeMessage(handle)    
 
 class TimedMixQueue(Queue):
     """A TimedMixQueue holds a group of files, and returns some of them
@@ -385,27 +368,15 @@
 	   every 'interval' seconds."""
         Queue.__init__(self, location, create=1, scrub=1)
 	self.interval = interval
-	self.nextSendTime = time.time + interval
 
-    def nextMessageReadyAt(self):
-	"""Return the next time at which the pool will be ready to send
-	   messages"""
-	return self.nextSendTime
-
-    def getReadyMessages(self):
+    def getBatch(self):
 	"""Return handles for all messages that the pool is currently ready 
-	   to send."""
-	now = time.time()
-	if now < self.nextSendTime:
-	    return []
-	self.nextSendTime = now + self.interval
-	return self._getBatch()
-
-    def _getBatch(self):
-	"""Internal method: called by getReadyMessages to return a single
-	   batch of handles."""
+	   to send in the next batch"""
 	return self.pickRandom()
 
+    def getInterval(self):
+	return self.interval
+
 class CottrellMixQueue(TimedMixQueue):
     """A CottrellMixQueue holds a group of files, and returns some of them
        as requested, according the Cottrell (timed dynamic-pool) mixing
@@ -416,22 +387,24 @@
 	   and never sends more than maxSendRate * the current pool size."""
 	TimedMixQueue.__init__(self, location, interval)
 	self.minPoolSize = minPoolSize
-	self.maxSendRate = maxSendRate
+	self.maxBatchSize = int(maxSendRate*minPoolSize)
+	if self.maxBatchSize < 1: 
+	    self.maxBatchSize = 1
 
-    def _getBatch(self):
+    def getBatch(self):
 	pool = self.count()
-	nTransmit = min(pool-self.minPoolSize, int(pool*self.maxSendRate))
+	nTransmit = min(pool-self.minPoolSize, self.maxBatchSize)
 	return self.pickRandom(nTransmit)
 
 class BinomialCottrellMixQueue(CottrellMixQueue):
     """Same algorithm as CottrellMixQueue, but instead of sending N messages
        from the pool of size P, sends each message with probability N/P."""
-    def _getBatch(self):
+    def getBatch(self):
 	pool = self.count()
-	nTransmit = min(pool-self.minPoolSize, int(pool*self.maxSendRate))
+	nTransmit = min(pool-self.minPoolSize, self.maxBatchSize)
 	msgProbability = float(nTransmit) / pool
-	return rng.shuffle([ h for h in self.getAllMessages() 
-			        if self.rng.getFloat() < msgProbability ])
+	return self.rng.shuffle([ h for h in self.getAllMessages() 
+				    if self.rng.getFloat() < msgProbability ])
 
 def _secureDelete_bg(files, cleanFile):
     pid = os.fork()

Index: ServerInfo.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ServerInfo.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- ServerInfo.py	12 Aug 2002 18:12:24 -0000	1.9
+++ ServerInfo.py	19 Aug 2002 15:33:56 -0000	1.10
@@ -132,7 +132,7 @@
         return IPV4Info(self.getAddr(), self.getPort(), self.getKeyID())
 
 #----------------------------------------------------------------------
-class ServerKeys:
+class ServerKeyset:
     """A set of expirable keys for use by a server.
 
        A server has one long-lived identity key, and two short-lived
@@ -143,7 +143,7 @@
        Whether we publish or not, we always generate a server descriptor
        to store the keys' lifetimes.
 
-       When we create a new ServerKeys object, the associated keys are not
+       When we create a new ServerKeyset object, the associated keys are not
        read from disk unil the object's load method is called."""
     def __init__(self, keyroot, keyname, hashroot):
 	keydir = self.keydir = os.path.join(keyroot, "key_"+keyname)
@@ -232,7 +232,7 @@
     packetKey = mixminion.Crypto.pk_generate(PACKET_KEY_BYTES*8)
     mmtpKey = mixminion.Crypto.pk_generate(PACKET_KEY_BYTES*8)
 
-    serverKeys = ServerKeys(keydir, keyname, hashdir)
+    serverKeys = ServerKeyset(keydir, keyname, hashdir)
     serverKeys.packetKey = packetKey
     serverKeys.mmtpKey = mmtpKey
     serverKeys.save()

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ServerMain.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- ServerMain.py	12 Aug 2002 18:12:24 -0000	1.3
+++ ServerMain.py	19 Aug 2002 15:33:56 -0000	1.4
@@ -8,7 +8,11 @@
    BUG: No support for encrypting private keys.n"""
 
 import cPickle
+import os
 
+import mixminion._minionlib
+import mixminion.Queue
+from mixminion.ServerInfo import ServerKeySet, ServerInfo
 from mixminion.Common import getLog, MixFatalError, MixError, createPrivateDir
 
 # Directory layout:
@@ -27,80 +31,55 @@
 #                 conf/miniond.conf 
 #                       ....
 
-class ServerState:
-    # XXXX This should be refactored.  keys should be separated from queues.
-    # config
-    # log
-    # homedir
+class ServerKeyring:
+    # homeDir: ----
+    # keysDir: ----
+    # keySloppiness: ----
+    # keyIntervals: list of (start, end, ServerKeySetName)
     def __init__(self, config):
-        self.config = config
-
-        #XXXX DOCDOC
-        # set up directory structure.
-        c = self.config
-        self.homedir = c['Server']['Homedir']
-        createPrivateDir(self.homedir)
-        getLog()._configure(self.config)# ????
-        
-        w = os.path.join(self.homeDir, "work")
-        q = os.path.join(w, "queues")
-        self.incomingDir = os.path.join(q, "incoming")
-        self.mixDir = os.path.join(q, "mix")
-        self.outgoingDir = os.path.join(q, "outgoing")
-        self.deliverDir = os.path.join(q, "deliver")
-
-        tlsDir = os.path.join(w, "tls")
-        self.hashlogsDir = os.path.join(w, "hashlogs")
-        self.keysDir = os.path.join(self.homeDir, "keys")
-        self.confDir = os.path.join(self.homeDir, "conf")
-        
-        for d in [self.homeDir, w, q, self.incomingDir, self.mixDir,
-                  self.outgoingDir, self.deliverDir, tlsDir,
-                  self.hashlogsDir, self.keysDir, self.confDir]:
-            createPrivateDir(d)
-
-        for name in ("incoming", "mix", "outgoing"):
-            loc = getattr(self, name+"Dir")
-            queue = mixminion.Queue.Queue(loc, create=1, scrub=1)
-            setattr(self, name+"Queue", queue)
-
-        self.dhFile = os.path.join(tlsDir, "dhparam")
-
-        self.checkKeys()
-
-    def getDHFile(self):
-        if not os.path.exists(self.dhFile):
-            getLog().info("Generating Diffie-Helman parameters for TLS...")
-            mixminion._minionlib.generate_dh_parameters(self.dhFile, verbose=0)
-            getLog().info("...done")
+	self.configure(config)
 
-        return self.dhFile
+    def configure(self):
+	self.homeDir = config['Server']['Homedir']
+	self.keyDir = os.path.join(self.homeDir, 'keys')
+	self.keySloppiness = config['Server']['PublicKeySloppiness']
+	self.checkKeys()
 
     def checkKeys(self):
-        self.keyIntervals = [] # list of start, end, keysetname
+        self.keyIntervals = [] 
         for dirname in os.listdir(self.keysDir):
             if not dirname.startswith('key_'):
+		getLog().warn("Unexpected directory %s under %s",
+			      dirname, self.keysDir)
                 continue
             keysetname = dirname[4:]
             
             d = os.path.join(self.keysDir, dirname)
             si = os.path.join(self.keysDir, "ServerDesc")
             if os.path.exists(si):
-                inf = mixminion.ServerInfo.ServerInfo(fname=si, assumeValid=1)
+                inf = ServerInfo(fname=si, assumeValid=1)
                 t1 = inf['Server']['Valid-After']
                 t2 = inf['Server']['Valid-Until']
                 self.keyIntervals.append( (t1, t2, keysetname) ) 
 
         self.keyIntervals.sort()
-
+    
     def removeDeadKeys(self):
         now = time.time()
-        cutoff = now - config['Server']['PublicKeySloppiness']
-        names = [ os.path.join(self.keyDir,"key_"+name)
+        cutoff = now - self.keySloppiness
+	dirs = [ os.path.join(self.keyDir,"key_"+name)
                   for va, vu, name in self.keyIntervals if vu < cutoff ]
-        # XXXX DELETE KEYS
-        
+
+	for dirname in dirs:
+	    files = [ os.path.join(dirname,f) 
+		                      for f in os.listdir(dirname) ])
+	    secureDelete(filenames, blocking=1)
+	    os.rmdir(dirname)
+	    
+	self.checkKeys()
+
     def _getLiveKey(self):
+	# returns valid-after, valid-until, name
         now = time.time()
         idx = bisect.bisect_left(self.keyIntervals, (now, None, None))
         return self.keyIntervals[idx]
@@ -108,136 +87,182 @@
     def getNextKeyRotation(self):
         return self._getLiveKey()[1]
 
-    def getServerKeys(self):
-        keyset = self._getLiveKey()[2]
-        sk = mixminion.ServerInfo.ServerKeys(self.keyDir, keyset,
-                                             self.hashlogsDir)
-        sk.load()
-        return sk
+    def getServerKeyset(self):
+	# FFFF Support passwords on keys
+	_, _, name = self._getLiveKey()
+	hashroot = os.path.join(self.homeDir, 'work', 'hashlogs')
+	keyset = ServerKeySet(self.keyDir, name, hashroot)
+	keyset.load
+	return self.keyset
+	
+    def getDHFile(self):
+	dhdir = os.path.join(self.homedir, 'work', 'tls')
+	createPrivateDir(dhdir)
+	dhfile = os.path.join(dhdir, 'dhparam')
+        if not os.path.exists(dhfile):
+            getLog().info("Generating Diffie-Helman parameters for TLS...")
+            mixminion._minionlib.generate_dh_parameters(self.dhfile, verbose=0)
+            getLog().info("...done")
 
+        return dhfile
+			    
     def getTLSContext(self):
-        # XXXX NO SUPPORT FOR ROTATION
-        keys = self.getServerKeys()
-        return mixminion._minionlib.TLSContext_new(keys.certFile,
-                                                   keys.mmtpKey,
-                                                   self.dhFile)
-    
+        keys = self.getServerKeyset()
+        return mixminion._minionlib.TLSContext_new(keys.getCertFileName(),
+						   keys.GetMMTPKey(),
+						   self.getDHFile())
+
     def getPacketHandler(self):
-        keys = self.getServerKeys()
-        return mixminion.PacketHandler.PacketHandler(keys.packetKey,
-                                                     keys.hashlogFile)
+        keys = self.getServerKeyset()
+        return mixminion.PacketHandler.PacketHandler(keys.getPacketKey(),
+                                                     keys.getHashLogFile())
 
-    def getIncomingQueues(self):
-        return self.incomingQueue
+class IncomingQueue(mixminion.Queue.DeliveryQueue):
+    def __init__(self, location, packetHandler):
+	mixminion.Queue.DeliveryQueue.__init__(self, location)
+	self.packetHandler = packetHandler
+	self.mixQueue = None
 
-    def getOutgoingQueue(self):
-        return self.outgoingQueue
+    def connectQueues(self, mixQueue):
+	self.mixQueue = mixQueue
 
-    def getMixQueue(self):
-        return self.mixQueue
+    def queueMessage(self, msg):
+	mixminion.Queue.queueMessage(None, msg)
+    
+    def deliverMessages(self, msgList):
+	ph = self.packetHandler
+	for handle, _, message, n_retries in msgList:
+	    try:
+		res = ph.packetHandler(message)
+		if res is None:
+		    log.info("Padding message dropped")
+		else:
+		    self.mixQueue.queueObject(res)
+		    self.deliverySucceeded(handle)
+	    except mixminion.Crypto.CryptoError, e:
+		log.warn("Invalid PK or misencrypted packet header:"+str(e))
+		self.deliveryFailed(handle)
+	    except mixminion.Packet.ParseError, e:
+		log.warn("Malformed message dropped:"+str(e))
+		self.deliveryFailed(handle)
+	    except mixminion.PacketHandler.ContentError, e:
+		log.warn("Discarding bad packet:"+str(e))
+		self.deliveryFailed(handle)
 
-    def getDeliverMBOXQueue(self, which):
-        return self.deliverMBOXQueue
+class MixQueue:
+    def __init__(self, queue):
+	self.queue = queue
+	self.outgoingQueue = None
+	self.moduleManager = None
 
-class _Server(MMTPServer):
-    def __init__(self, config, serverState):
-        self.incomingQueue = serverState.getIncomingQueue()
-        self.outgoingQueue = serverState.getOutgoingQueue()
+    def connectQueues(self, outgoing, manager):
+	self.outgoingQueue = outgoing
+	self.moduleManager = manager
+
+    def mix(self):
+	handles = self.queue.getBatch()
+	for h in handles:
+	    tp, info = self.queue.getObject(h)
+	    if tp == 'EXIT':
+		rt, ri, app_key, payload = info
+		self.moduleManger.queueMessage((rt, ri), payload)
+	    else:
+		assert tp == 'QUEUE'
+		ipv4, msg = info
+		self.outgoingQueue.queueMessage(ipv4, msg)
+
+class OutgoingQueue(mixminion.Queue.DeliveryQueue):
+    def __init__(self, location):
+	OutgoingQueue.__init__(self, location)
+	self.server = None
+
+    def connectQueues(self, server):
+	self.server = sever
+
+    def deliverMessages(self, msgList):
+	# Map from addr -> [ (handle, msg) ... ]
+	msgs = {}
+	for handle, addr, message, n_retries in msgList:
+	    msgs.setdefault(addr, []).append( (handle, message) )
+	for addr, messages in msgs.items():
+	    messages, handles = zip(*messages)
+	    self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
+				     messages, handles)
+
+class _MMTPConnection(MMTPServer):
+    def __init__(self, config):
         MMTPServer.__init__(self, config)
-        
+
+    def connectQueues(self, incoming, outgoing)
+        self.incomingQueue = incoming
+        self.outgoingQueue = outgoing
+
     def onMessageReceived(self, msg):
         self.incomingQueue.queueMessage(msg)
 
     def onMessageSent(self, msg, handle):
-        self.outgoingQueue.remove(handle)
+        self.outgoingQueue.deliverySucceeded(handle)
 
-def runServer(config):
-    s = ServerState(config)
-    log = getLog()
-    packetHandler = s.getPacketHandler()
-    context = s.getTLSContext()
+    def onMessageUndeliverable(self, msg, handle, retriable):
+	self.outgoingQueue.deliveryFailed(handle, retriable)
 
-    # XXXX Make these configurable; make mixing OO.
-    mixInterval = 60
-    mixPoolMinSize = 5
-    mixPoolMaxRate = 0.5 
 
-    nextMixTime = time.time() + mixInterval
+class MixminionServer:
+    def __init__(self, config, keyring):
+	self.config = config
+	self.keyring = ServerKeyring(config)
+	
+	self.packetHandler = self.keyring.getPacketHandler()
+	self.mmtpConnection = _MMTPConnection(config)
 
-    server = _Server(config, s)
+	# FFFF Modulemanager should know about async so it can patch in if it
+	# FFFF needs to.
+	self.moduleManager = config.getModuleManager()
 
-    incomingQueue = s.getIncomingQueue()
-    outgoingQueue = s.getOutgoingQueue()
-    mixQueue = s.getMixQueue()
-    deliverMBOXQueue = s.getDeliverMBOXQueue()
-    while 1:  # Add shutdown mechanism XXXX
-        server.process(1)
+	homeDir = config['Server']['Homedir']
+	queueDir = os.path.join(homeDir, 'work', 'queues')
 
-	# Possible timing attack here????? XXXXX ????
-	if incomingQueue.count():
-	    for h in incomingQueue.getAllMessages():
-		msg = incomingQueue.messageContents(h)
-		res = None
-		try:
-		    res = packetHandler.processHandler(msg)
-		    if res is None: log.info("Padding message dropped")
-		except miximinion.Packet.ParseError, e:
-		    log.warn("Malformed message dropped:"+str(e))
-		except miximinion.Crypto.CryptoError, e:
-		    log.warn("Invalid PK or misencrypted packet header:"+str(e))
-		except mixminion.PacketHandler.ContentError, e:
-		    log.warn("Discarding bad packet:"+str(e))
+	incomingDir = os.path.join(queueDir, "incoming")
+	self.incomingQueue = IncomingQueue(incomingDir, self.packetHandler)
 
-		if res is not None:
-		    f, newHandle = mixQueue.openNewMessage()
-		    cPickle.dump(res, f, 1)
-		    mixQueue.finishMessage(f, newHandle)
-		    log.info("Message added to mix queue")
-		
-		incomingQueue.removeMessage(h)
+	mixDir = os.path.join(queueDir, "mix")
+	# FFFF The choice of mix algorithm should be configurable
+	self.mixQueue = MixQueue(TimedMixQueue(mixDir, 60))
+
+	outgoingDir = os.path.join(queueDir, "outgoing")
+	self.outgoingQueue = OutgoingQueue(outgoingDir)
+
+	self.incomingQueue.connectQueues(mixQueue=self.mixQueue)
+	self.mixQueue.connectQueues(outgoing=self.outgoingQueue,
+				    manager=self.moduleManager)
+	self.outgoingQueue.connectQueues(server=self.mmtpConnection)
+	self.mmtpConnection.connectQueues(incoming=self.incomingQueue,
+					  outgoing=self.outgoingQueue)
 	
-	if time.time() > nextMixTime:
-	    nextMixTime = time.time() + mixInterval
+    def run(self):
+	now = time.time()
+	nextMix = now + 60 # FFFF Configurable!
+	nextShred = now + 6000
+	
+	while 1:
+	    while time.time() < nextMix:
+		self.mmtpConnection.process(1)
+		self.incomingQueue.sendReadyMessages()
 	    
-	    poolSize = mixQueue.count()
-	    if poolSize > mixPoolMinSize:
-		beginSending = {}
-		n = min(poolSize-mixPoolMinSize, int(poolSize*mixPoolMaxRate))
-		handles = mixQueue.pickRandom(n)
-		for h in handles:
-		    f = mixQueue.openMessage(h)
-		    type, data = cPickle.load(f)
-		    f.close()
-		    if type == 'QUEUE':
-			newHandle = mixQueue.moveMessage(h, outgoingQueue)
-			ipv4info, payload = data
-			beginSending.setdefault(ipv4info,[]).append(
-			    (newHandle, payload))
-		    else:
-			assert type == 'EXIT'
-			# XXXX Use modulemanager for this.
-			rt, ri, appKey, payload = data
-			if rt == Modules.DROP_TYPE:
-			    mixQueue.removeMessage(h)
-			elif rt == Moddules.MBOX_TYPE:
-			    mixQueue.moveMessage(h, deliverMBOXQueue)
-			else:
-			    # XXXX Shouldn't we drop stuff as early as possible,
-			    # XXXX so that we don't wind up with only a single
-			    # XXXX message sent out of the server?
-			    log.warn("Dropping unhandled type 0x%04x",rt)
-			    mixQueue.removeMessage(h)
+	    self.mixQueue.mix()
+	    self.outgoingQueue.sendReadyMessages()
+	    self.moduleManager.sendReadyMessages()
+
+	    now = time.time()
+	    nextMix = now + 60
+	    if now > nextShred:
+		# Configurable shred interval
+		self.incomingQueue.cleanQueue()
+		self.mixQueue.queue.cleanQueue()
+		self.outgoingQueue.cleanQueue()
+		self.moduleManager.cleanQueues()
+		nextShred = now + 6000
 
-		if beginSending:
-		    # XXXX Handle timeouts; handle if we're already sending
-		    # XXXX to a server.
-		    for addr, messages in beginSending.items():
-			handles, payloads = zip(*messages)
-			server.sendMessages(addr.ip, addr.port, addr.keyinfo,
-					    payloads, handles)
-		    
-	    # XXXX Retry and resend after failure.
-	    # XXXX Scan for messages we should begin sending after restart.
 	    # XXXX Remove long-undeliverable messages
-	    # XXXX Deliver MBOX messages.
+
 

Index: __init__.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/__init__.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- __init__.py	5 Jul 2002 19:50:27 -0000	1.4
+++ __init__.py	19 Aug 2002 15:33:56 -0000	1.5
@@ -23,4 +23,3 @@
 import mixminion.Packet
 import mixminion.ServerInfo
 
-

Index: benchmark.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/benchmark.py,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -d -r1.7 -r1.8
--- benchmark.py	25 Jun 2002 11:41:08 -0000	1.7
+++ benchmark.py	19 Aug 2002 15:33:56 -0000	1.8
@@ -12,6 +12,7 @@
    """
 
 from time import time
+from mixminion.test import mix_mktemp
 
 __pychecker__ = 'no-funcdoc no-reimport'
 __all__ = [ 'timeAll', 'testLeaks1', 'testLeaks2' ]
@@ -94,6 +95,7 @@
 
 def cryptoTiming():
     print "#==================== CRYPTO ======================="
+
     print "SHA1 (short)", timeit((lambda: sha1(short)), 100000)
     print "SHA1 (64b)", timeit((lambda: sha1(s64b)), 100000)
     print "SHA1 (2K)", timeit((lambda: sha1(s2K)), 10000)
@@ -153,6 +155,15 @@
     print "aesprng.getInt (513)", \
           timeit((lambda c=c: c.getInt(513)), 10000)
 
+    L10 = [ "x" ] * 10
+    L1000 = [ "x" ] * 1000
+    print "aesprng.shuffle (10/10)", \
+	  timeit((lambda c=c,L=L10: c.shuffle(L)), 1000)
+    print "aesprng.shuffle (1000/1000)", \
+	  timeit((lambda c=c,L=L1000: c.shuffle(L)), 30)
+    print "aesprng.shuffle (10/1000)", \
+	  timeit((lambda c=c,L=L1000: c.shuffle(L,10)), 1000)
+    
     lkey = Keyset("keymaterial foo bar baz").getLionessKeys("T")
     print "lioness E (1K)", timeit((
         lambda lkey=lkey: lioness_encrypt(s1K, lkey)), 1000)
@@ -203,6 +214,7 @@
 
     for (bits,it) in ((2048,10),(4096,10)):
         t = time()
+	print "[generating key...]"
         rsa2 = pk_generate(bits)
         t = time()-t
         print "RSA genrate (%d bit)"%bits, timestr(t)
@@ -216,14 +228,13 @@
     print "Timing overhead: %s...%s" % (timestr(min(o)),timestr(max(o)))
 
 #----------------------------------------------------------------------
-import tempfile
 import os
 import stat
 
 def hashlogTiming():
     print "#==================== HASH LOGS ======================="
     for load in (100, 1000, 10000, 100000):
-        fname = tempfile.mktemp(".db")
+        fname = mix_mktemp(".db")
         try:
             _hashlogTiming(fname,load)
         finally:
@@ -278,16 +289,16 @@
 
 #----------------------------------------------------------------------
 from mixminion.BuildMessage import _buildHeader, buildForwardMessage
-from mixminion.ServerInfo import ServerInfo
+from mixminion.test import FakeServerInfo
 
 def buildMessageTiming():
 
     print "#================= BUILD MESSAGE ====================="
     pk = pk_generate()
     payload = ("Junky qoph flags vext crwd zimb."*1024)[:22*1024]
-    serverinfo = [ServerInfo("127.0.0.1", 48099, pk_get_modulus(pk),"x"*20)
+    serverinfo = [FakeServerInfo("127.0.0.1", 48099, pk,"x"*20)
                   ] * 16
-                             
+    
     def bh(np,it, serverinfo=serverinfo):
         ctr = AESCounterPRNG()
 
@@ -330,8 +341,7 @@
     print "#================= SERVER PROCESS ====================="
 
     pk = pk_generate()
-    n = pk_get_modulus(pk)
-    server = ServerInfo("127.0.0.1", 1, n, "X"*20)
+    server = FakeServerInfo("127.0.0.1", 1, pk, "X"*20)
     sp = PacketHandler(pk, DummyLog())
 
     m_noswap = buildForwardMessage("Hello world", SMTP_TYPE, "f@invalid",
@@ -409,8 +419,7 @@
         lambda s=s2K,k=lionesskey: lioness_encrypt(s,k)),1000)
     prng_128b = timeit_((lambda k=aeskey: prng(k,128)),10000)
 
-    n = pk_get_modulus(pk)
-    server = ServerInfo("127.0.0.1", 1, n, "X"*20)
+    server = FakeServerInfo("127.0.0.1", 1, pk, "X"*20)
     sp = PacketHandler(pk, DummyLog())
 
     m_noswap = buildForwardMessage("Hello world", SMTP_TYPE, "f@invalid",
@@ -444,7 +453,7 @@
 def fileOpsTiming():
     print "#================= File ops ====================="
     installSignalHandlers(child=1,hup=0,term=0)
-    dname = tempfile.mktemp(".d")
+    dname = mix_mktemp(".d")
     try:
 
         os.mkdir(dname)
@@ -487,55 +496,56 @@
     keytxt="a"*16
     key = _ml.aes_key(keytxt)
     while 1:
-        if 1:
-            _ml.sha1(s20k)
-            _ml.aes_ctr128_crypt(key,s20k,0)
-            _ml.aes_ctr128_crypt(key,s20k,2000)
-            _ml.aes_ctr128_crypt(key,"",2000,20000)
-            _ml.aes_ctr128_crypt(key,"",0,20000)
-            _ml.aes_ctr128_crypt(key,s20k,0,2000)
-            try:
-                _ml.aes_ctr128_crypt("abc",s20k,0,2000)
-            except:
-                pass
-            _ml.strxor(s20k,s20k)
-            try:
-                _ml.strxor(s20k,keytxt)
-            except:
-                pass
-            _ml.openssl_seed(s20k)
-            r = _ml.add_oaep_padding("Hello",OAEP_PARAMETER,128)
-            _ml.check_oaep_padding(r,OAEP_PARAMETER,128)
-            try:
-                _ml.check_oaep_padding("hello",OAEP_PARAMETER,128)
-            except:
-                pass
-            try:
-                _ml.add_oaep_padding(s20k,OAEP_PARAMETER,128)
-            except:
-                pass
-            try:
-                _ml.add_oaep_padding("a"*127,OAEP_PARAMETER,128)
-            except:
-                pass
+	_ml.sha1(s20k)
+	_ml.aes_ctr128_crypt(key,s20k,0)
+	_ml.aes_ctr128_crypt(key,s20k,2000)
+	_ml.aes_ctr128_crypt(key,"",2000,20000)
+	_ml.aes_ctr128_crypt(key,"",0,20000)
+	_ml.aes_ctr128_crypt(key,s20k,0,2000)
+	try:
+	    _ml.aes_ctr128_crypt("abc",s20k,0,2000)
+	except:
+	    pass
+	_ml.strxor(s20k,s20k)
+	try:
+	    _ml.strxor(s20k,keytxt)
+	except:
+	    pass
+	_ml.openssl_seed(s20k)
+	r = _ml.add_oaep_padding("Hello",OAEP_PARAMETER,128)
+	_ml.check_oaep_padding(r,OAEP_PARAMETER,128)
+	try:
+	    _ml.check_oaep_padding("hello",OAEP_PARAMETER,128)
+	except:
+	    pass
+	try:
+	    _ml.add_oaep_padding(s20k,OAEP_PARAMETER,128)
+	except:
+	    pass
+	try:
+	    _ml.add_oaep_padding("a"*127,OAEP_PARAMETER,128)
+	except:
+	    pass
 
 def testLeaks2():
     print "Trying to leak (rsa)"
 
     s20 = "a"*20
     p = pk_generate(512)
-    n,e = _ml.rsa_get_public_key(p)
+    n,e = p.get_public_key()
 
     while 1:
         if 1:
             p = pk_generate(512)
             pk_decrypt(pk_encrypt(s20,p),p)
             for public in (0,1):
-                x = _ml.rsa_encode_key(p,public)
+                x = p.encode_key(public)
                 _ml.rsa_decode_key(x,public)
-            _ml.rsa_get_public_key(p)
+            p.get_public_key()
             _ml.rsa_make_public_key(n,e)
 
+	    # XXXX rest of rsa functionality
+
 #----------------------------------------------------------------------
 
 def timeAll():
@@ -550,3 +560,4 @@
     timeAll()
     #testLeaks1()
     #testLeaks2()
+

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -d -r1.18 -r1.19
--- test.py	12 Aug 2002 21:05:50 -0000	1.18
+++ test.py	19 Aug 2002 15:33:56 -0000	1.19
@@ -17,11 +17,11 @@
 import sys
 import threading
 import time
-import atexit
-import tempfile
 import types
 import re
 import binascii
+import stat
+import cPickle
 
 from mixminion.Common import MixError, MixFatalError, MixProtocolError, getLog
 
@@ -30,6 +30,69 @@
 except ImportError:
     import mixminion._unittest as unittest
 
+# Test for acceptable permissions and uid on directory?
+_MM_TESTING_TEMPDIR_PARANOIA = 1
+# Holds 
+_MM_TESTING_TEMPDIR = None
+_MM_TESTING_TEMPDIR_COUNTER = 0
+_MM_TESTING_TEMPDIR_REMOVE_ON_EXIT = 1
+def mix_mktemp(extra=""):
+    '''mktemp wrapper. puts all files under a securely mktemped
+       directory.'''
+    global _MM_TESTING_TEMPDIR
+    global _MM_TESTING_TEMPDIR_COUNTER
+    if _MM_TESTING_TEMPDIR is None:
+	import tempfile
+	temp = tempfile.mktemp()
+	paranoia = _MM_TESTING_TEMPDIR_PARANOIA
+	if paranoia and os.path.exists(temp):
+	    print "I think somebody's trying to exploit mktemp."
+	    sys.exit(1)
+	try:
+	    os.mkdir(temp, 0700)
+	except OSError, e:
+	    print "Something's up with mktemp: %s" % e
+	    sys.exit(1)
+	if not os.path.exists(temp):
+	    print "Couldn't create temp dir %r" %temp
+	    sys.exit(1)
+	st = os.stat(temp)
+	if paranoia and st[stat.ST_MODE] & 077:
+	    print "Couldn't create temp dir %r with secure permissions" %temp
+	    sys.exit(1)
+	if paranoia and st[stat.ST_UID] != os.getuid():
+	    print "The wrong user owns temp dir %r"%temp
+	    sys.exit(1)
+	# XXXX If we're on a bad system, the permissions on /tmp
+	# XXXX might be faulty.  We don't bother checking for that.
+	_MM_TESTING_TEMPDIR = temp
+	if _MM_TESTING_TEMPDIR_REMOVE_ON_EXIT:
+	    import atexit
+	    atexit.register(deltree, temp)
+    
+    _MM_TESTING_TEMPDIR_COUNTER += 1
+    return os.path.join(_MM_TESTING_TEMPDIR,
+			"tmp%05d%s" % (_MM_TESTING_TEMPDIR_COUNTER,extra))
+
+_WAIT_FOR_KIDS = 1
+def deltree(*dirs):
+    global _WAIT_FOR_KIDS
+    if _WAIT_FOR_KIDS:
+	print "Waiting for shred processes to finish."
+	waitForChildren()
+	_WAIT_FOR_KIDS = 0
+    for d in dirs:
+        if os.path.isdir(d):
+            for fn in os.listdir(d):
+		loc = os.path.join(d,fn)
+		if os.path.isdir(loc):
+		    deltree(loc)
+		else:
+		    os.unlink(loc)
+            os.rmdir(d)
+        elif os.path.exists(d):
+            os.unlink(d)
+
 def hexread(s):
     assert (len(s) % 2) == 0
     r = []
@@ -42,35 +105,6 @@
         r.append(chr(c))
     return "".join(r)
 
-def try_unlink(fnames):
-    if isinstance(fnames, types.StringType):
-        fnames = [fnames]
-    for fname in fnames:
-        try:
-            if os.path.isdir(fname):
-                try_unlink([os.path.join(fname,f) for f in os.listdir(fname)])
-                os.rmdir(fname)
-            else:
-                os.unlink(fname)
-        except OSError:
-            pass
-
-def try_unlink_db(fname):
-    '''Try to unlink an anydbm file(s)'''
-    for suffix in ("", ".bak", ".dat", ".dir"):
-        try_unlink(fname+suffix)
-
-_unlink_on_exit_list = []
-
-def unlink_db_on_exit(fname):
-    for suffix in ("", ".bak", ".dat", ".dir"):
-        _unlink_on_exit_list.append(fname+suffix)
-    
-def unlink_on_exit(*files):
-    _unlink_on_exit_list.extend(files)
-
-atexit.register(try_unlink, _unlink_on_exit_list)    
-
 def floatEq(f1,f2):
     return abs(f1-f2) < .00001
 
@@ -199,7 +233,7 @@
                               check,x[:-1]+ch,"B",128)
 
     def test_rsa(self):
-        p = _ml.rsa_generate(1024, 65535)
+        p = _ml.rsa_generate(1024, 65537)
 
         #for all of SIGN, CHECK_SIG, ENCRYPT, DECRYPT...
         for pub1 in (0,1):
@@ -237,7 +271,7 @@
         n,e = p.get_public_key()
         p2 = _ml.rsa_make_public_key(n,e)
         self.assertEquals((n,e), p2.get_public_key())
-        self.assertEquals(65535,e)
+        self.assertEquals(65537,e)
         self.assertEquals(p.encode_key(1), p.encode_key(1))
 
         # Try private-key ops with public key p3.
@@ -253,11 +287,10 @@
         self.failUnlessRaises(TypeError, p2.encode_key, 0)
         self.failUnlessRaises(TypeError, p3.encode_key, 0)
 
-        tf = tempfile.mktemp()
+        tf = mix_mktemp()
         tf_pub = tf + "1"
         tf_prv = tf + "2"
         tf_enc = tf + "3"
-        unlink_on_exit(tf_pub, tf_prv, tf_enc)
 
         p.PEM_write_key(open(tf_pub,'w'), 1)
         p.PEM_write_key(open(tf_prv,'w'), 0)
@@ -411,11 +444,12 @@
             self.failUnless(0 <= PRNG.getInt(i) < i)
 	for i in xrange(100):
 	    self.failUnless(0 <= PRNG.getFloat() < 1)
-	    self.failUnless(0 <= PRNG.getFloat(4) < 1)
-	    self.failUnless(0 <= PRNG.getFloat(5) < 1)
 
-	# Make sure shuffle only shuffles the first n.
 	lst = range(100)
+	# Test shuffle(0)
+	self.assertEquals(PRNG.shuffle(lst,0), [])
+	self.assertEquals(lst, range(100))
+	# Make sure shuffle only shuffles the last n.
 	PRNG.shuffle(lst,10)
 	later = [ item for item in lst[10:] if item >= 10 ]
 	s = later[:]
@@ -432,6 +466,14 @@
 	    for z in crossSection:
 		if z != crossSection[0]: allEq = 0
 	    self.failIf(allEq)
+	foundUnmoved = 0
+	for lst in lists:
+	    for inorder, shuffled in zip(lst, range(100)):
+		if inorder == shuffled:
+		    foundUnmoved = 1
+		    break
+	    if foundUnmoved: break
+	self.failUnless(foundUnmoved)
 	for lst in lists:
 	    s = lst[:]
 	    s.sort()
@@ -589,8 +631,7 @@
 
 class HashLogTests(unittest.TestCase):
     def test_hashlog(self):
-        fname = tempfile.mktemp(".db")
-        unlink_db_on_exit(fname)
+        fname = mix_mktemp(".db")
 
         h = [HashLog(fname, "Xyzzy")]
 
@@ -1018,12 +1059,10 @@
 class PacketHandlerTests(unittest.TestCase):
     def setUp(self):
         from mixminion.PacketHandler import PacketHandler
-        from tempfile import mktemp
         self.pk1 = BMTSupport.pk1
         self.pk2 = BMTSupport.pk2
         self.pk3 = BMTSupport.pk3
-        self.tmpfile = mktemp(".db")
-        unlink_db_on_exit(self.tmpfile)
+        self.tmpfile = mix_mktemp(".db")
         h = self.hlog = HashLog(self.tmpfile, "Z"*20)
 
         self.server1 = FakeServerInfo("127.0.0.1", 1, self.pk1, "X"*20)
@@ -1231,29 +1270,22 @@
 #----------------------------------------------------------------------
 # QUEUE
 
-import stat
 from mixminion.Common import waitForChildren
-from mixminion.Queue import Queue
+from mixminion.Queue import *
 
-def removeTempDirs(*dirs):
-    print "Waiting for shred processes to finish."
-    waitForChildren()
-    for d in dirs:
-        if os.path.isdir(d):
-            for fn in os.listdir(d):
-                os.unlink(os.path.join(d,fn))
-            os.rmdir(d)
-        elif os.path.exists(d):
-            os.unlink(d)
+class TestDeliveryQueue(DeliveryQueue):
+    def __init__(self,d):
+	DeliveryQueue.__init__(self,d)
+	self._msgs = None
+    def deliverMessages(self, msgList):
+	self._msgs = msgList
 
 class QueueTests(unittest.TestCase):
     def setUp(self):
-        import tempfile 
         mixminion.Common.installSignalHandlers(child=1,hup=0,term=0)
-        self.d1 = tempfile.mktemp("q1")
-        self.d2 = tempfile.mktemp("q2")
-        self.d3 = tempfile.mktemp("q3")
-        atexit.register(removeTempDirs, self.d1, self.d2, self.d3)
+        self.d1 = mix_mktemp("q1")
+        self.d2 = mix_mktemp("q2")
+        self.d3 = mix_mktemp("q3")
         
     def testCreateQueue(self):
         # Nonexistant dir.
@@ -1369,11 +1401,104 @@
         self.assertEquals(queue1.count(), 41)
         self.assert_(not os.path.exists(os.path.join(self.d2, "msg_"+h)))
 
+	# Test object functionality
+	obj = [ ("A pair of strings", "in a tuple in a list") ]
+	h1 = queue1.queueObject(obj)
+	h2 = queue1.queueObject(6060842)
+	self.assertEquals(obj, queue1.getObject(h1))
+	self.assertEquals(6060842, queue1.getObject(h2))
+	self.assertEquals(obj, cPickle.loads(queue1.messageContents(h1)))
+
         # Scrub both queues.
         queue1.removeAll()
         queue2.removeAll()
         queue1.cleanQueue()    
         queue2.cleanQueue()
+    
+    def testDeliveryQueues(self):
+	d_d = mix_mktemp("qd")
+
+	queue = TestDeliveryQueue(d_d)
+
+	h1 = queue.queueMessage("Address 1", "Message 1")
+	h2 = queue.queueMessage("Address 2", "Message 2")
+	self.assertEquals((0, "Address 1", "Message 1"), queue.get(h1))
+	queue.sendReadyMessages()
+	msgs = queue._msgs
+	self.assertEquals(2, len(msgs))
+	self.failUnless((h1, "Address 1", "Message 1", 0) in msgs)
+	self.failUnless((h2, "Address 2", "Message 2", 0) in msgs)
+	h3 = queue.queueMessage("Address 3", "Message 3")
+	queue.deliverySucceeded(h1)
+	queue.sendReadyMessages()
+	msgs = queue._msgs
+	self.assertEquals([(h3, "Address 3", "Message 3", 0)], msgs)
+
+	allHandles = queue.getAllMessages()
+	allHandles.sort()
+	exHandles = [h2,h3]
+	exHandles.sort()
+	self.assertEquals(exHandles, allHandles)
+	queue.deliveryFailed(h2, retriable=1)
+	queue.deliveryFailed(h3, retriable=0)
+
+	allHandles = queue.getAllMessages()
+	h4 = allHandles[0]
+	self.assertEquals([h4], queue.getAllMessages())
+	queue.sendReadyMessages()
+	msgs = queue._msgs
+	self.assertEquals([(h4, "Address 2", "Message 2", 1)], msgs)
+	self.assertNotEquals(h2, h4)
+	
+	queue.removeAll()
+	queue.cleanQueue()
+
+    def testDeliveryQueues(self):
+	d_m = mix_mktemp("qm")
+	queue = TimedMixQueue(d_m)
+	h1 = queue.queueMessage("Hello1")
+	h2 = queue.queueMessage("Hello2")
+	h3 = queue.queueMessage("Hello3")
+	b = queue.getBatch()
+	msgs = [h1,h2,h3]
+	msgs.sort()
+	b.sort()
+	self.assertEquals(msgs,b)
+	
+	cmq = CottrellMixQueue(d_m, 600, 6, .5)
+	# Not enough messages
+	self.assertEquals([], cmq.getBatch())
+	self.assertEquals([], cmq.getBatch())
+	# 8 messages: 2 get sent
+	for i in range(5):
+	    cmq.queueMessage("Message %s"%i)
+
+	b1, b2, b3 = cmq.getBatch(), cmq.getBatch(), cmq.getBatch()
+	self.assertEquals(2, len(b1))
+	self.assertEquals(2, len(b2))
+	self.assertEquals(2, len(b3))
+	allEq = 1
+	for x in xrange(13): #fails <one in a trillion
+	    b = cmq.getBatch()
+	    if b != b1:
+		allEq = 0; break
+	self.failIf(allEq)
+	# Don't send more than 3.
+	for x in xrange(100):
+	    cmq.queueMessage("Hello2 %s"%x)
+	for x in xrange(10):
+	    self.assertEquals(3, len(cmq.getBatch()))
+
+	bcmq = BinomialCottrellMixQueue(d_m, 600, 6, .5)
+	allThree = 1
+	for i in range(10):
+	    b = bcmq.getBatch()
+	    if not len(b)==3:
+		allThree = 0
+	self.failIf(allThree)
+
+	bcmq.removeAll()
+	bcmq.cleanQueue()
 
 #---------------------------------------------------------------------
 # LOGGING
@@ -1397,9 +1522,9 @@
         self.failUnless(buf.getvalue().endswith(
             "[ERROR] All your anonymity are belong to us\n"))
         
-        t = tempfile.mktemp("log")
+        t = mix_mktemp("log")
         t1 = t+"1"
-        unlink_on_exit(t, t1)
+
         log.addHandler(_FileLogHandler(t))
         log.info("Abc")
         log.info("Def")
@@ -1413,11 +1538,11 @@
         
 #----------------------------------------------------------------------
 # SIGHANDLERS
-# XXXX
+# FFFF Write tests here
 
 #----------------------------------------------------------------------
 # MMTP
-# XXXX Write more tests
+# FFFF Write more tests
 
 import mixminion.MMTPServer
 import mixminion.MMTPClient
@@ -1432,7 +1557,7 @@
     global certfile
     if isServer:
         if dhfile is None:
-            f = tempfile.mktemp()
+            f = mix_mktemp()
             dhfile = f+"_dh"
             pkfile = f+"_pk"
             certfile = f+"_cert"
@@ -1448,13 +1573,11 @@
 		print "[Generating DH parameters (not caching)...",
 		sys.stdout.flush()
                 _ml.generate_dh_parameters(dhfile, 0)
-                unlink_on_exit(dhfile)
 		print "done.]"
-            pk = _ml.rsa_generate(1024, 65535)
+            pk = _ml.rsa_generate(1024, 65537)
             pk.PEM_write_key(open(pkfile, 'w'), 0)
             _ml.generate_cert(certfile, pk, "Testing certificate",
                               time.time(), time.time()+365*24*60*60)
-            unlink_on_exit(certfile, pkfile)
             
 	pk = _ml.rsa_PEM_read_key(open(pkfile, 'r'), 0)
         return _ml.TLSContext_new(certfile, pk, dhfile)
@@ -1583,7 +1706,7 @@
                 server.process(0.1)
             t.join()
         finally:
-            getLog().setMinSeverity(severity) #unsuppress
+            getLog().setMinSeverity(severity) #unsuppress warning
                     
 #----------------------------------------------------------------------
 # Config files
@@ -1669,8 +1792,7 @@
             "Quz: 88 88\n\n[Sec3]\nIntAS: 9\nIntASD: 10\nIntAMD: 8\n"+
             "IntAMD: 10\nIntRS: 5\n\n"))
         # Test file input
-        fn = tempfile.mktemp()
-        unlink_on_exit(fn)
+        fn = mix_mktemp()
         
         file = open(fn, 'w')
         file.write(longerString)
@@ -1765,7 +1887,7 @@
         self.assertEquals(pa("192.168.0.1",0),
                           ("192.168.0.1", "255.255.255.255", 0, 65535))
 
-        # XXXX Won't work on Windows.
+        # XXXX This won't work on Windows.
         self.assertEquals(C._parseCommand("ls -l"), ("/bin/ls", ['-l']))
         self.assertEquals(C._parseCommand("rm"), ("/bin/rm", []))
         self.assertEquals(C._parseCommand("/bin/ls"), ("/bin/ls", []))
@@ -1821,7 +1943,7 @@
             cmd, opts = C._parseCommand(nonexistcmd)
             if os.path.exists(cmd):
                 # Ok, I guess they would.
-                self.failUnlessEquals(opts, ["-meow"])
+                self.assertEquals(opts, ["-meow"])
             else:
                 self.fail("_parseCommand is not working as expected")
         except ConfigError, e:
@@ -1866,19 +1988,25 @@
 Mode: relay
 """
 
+_IDENTITY_KEY = None
+def _getIdentityKey():
+    global _IDENTITY_KEY
+    if _IDENTITY_KEY is None:
+	_IDENTITY_KEY = mixminion.Crypto.pk_generate(2048)
+    return _IDENTITY_KEY
 
 import mixminion.Config
 import mixminion.ServerInfo
 class ServerInfoTests(unittest.TestCase):
     def testServerInfoGen(self):
-        d = tempfile.mktemp()
+	identity = _getIdentityKey()
+        d = mix_mktemp()
         conf = mixminion.Config.ServerConfig(string=SERVER_CONFIG)
-        identity = mixminion.Crypto.pk_generate(2048)
         if not os.path.exists(d):
             os.mkdir(d, 0700)
-        unlink_on_exit(d)
+
         inf = mixminion.ServerInfo.generateServerDescriptorAndKeys(conf,
-                                                                   identity,
+								   identity,
                                                                    d,
                                                                    "key1",
                                                                    d)
@@ -1913,7 +2041,7 @@
         # Now make sure everything was saved properly
         keydir = os.path.join(d, "key_key1")
         eq(inf, open(os.path.join(keydir, "ServerDesc")).read())
-        keys = mixminion.ServerInfo.ServerKeys(d, "key1", d)
+        keys = mixminion.ServerInfo.ServerKeyset(d, "key1", d)
         packetKey = mixminion.Crypto.pk_PEM_load(
             os.path.join(keydir, "mix.key"))
         eq(packetKey.get_public_key(),
@@ -1958,18 +2086,169 @@
                               mixminion.ServerInfo.ServerInfo,
                               None, badSig)
         
-        
-        
+#----------------------------------------------------------------------
+# Modules and ModuleManager
+from mixminion.Modules import *
+
+# Text of an example module that we write to a file and load dynamically.
+EXAMPLE_MODULE_TEXT = \
+"""
+import mixminion.Modules
+from mixminion.Config import ConfigError
+
+class TestModule(mixminion.Modules.DeliveryModule):
+    def __init__(self):
+	self.processedMessages = []
+    def getName(self):
+	return "TestModule"
+    def getConfigSyntax(self):
+	return { "Example" : { "Foo" : ("REQUIRE", 
+					mixminion.Config._parseInt, None) } }
+    def validateConfig(self, cfg, entries, lines, contents):
+	if cfg['Example'] is not None:
+	    if cfg['Example'].get('Foo',1) % 2 == 0:
+		raise ConfigError("Foo was even")
+    def configure(self,cfg, manager):
+	if cfg['Example']:
+	    self.enabled = 1
+	    self.foo = cfg['Example'].get('Foo',1)
+	    manager.enableModule(self)
+	else:
+	    self.foo = None
+	    self.enabled = 0
+    def getServerInfoBlock(self):
+	if self.enabled:
+	    return "[Example]\\nFoo: %s\\n" % self.foo
+	else:
+	    return None
+    def getExitTypes(self):
+	return (1234,)
+    def processMessage(self, message, exitType, exitInfo):
+	self.processedMessages.append(message)
+	if exitInfo == 'fail?':
+	    return mixminion.Modules.DELIVER_FAIL_RETRY
+	elif exitInfo == 'fail!':
+	    return mixminion.Modules.DELIVER_FAIL_NORETRY
+	else:
+	    return mixminion.Modules.DELIVER_OK
+"""
+
+class ModuleManagerTests(unittest.TestCase):
+    def testModuleManager(self):
+	mod_dir = mix_mktemp()
+	home_dir = mix_mktemp()
+
+	os.mkdir(mod_dir, 0700)
+	f = open(os.path.join(mod_dir, "ExampleMod.py"), 'w')
+	f.write(EXAMPLE_MODULE_TEXT)
+	f.close()
+
+	cfg_test = SERVER_CONFIG_SHORT + """
+Homedir = %s
+ModulePath = %s
+Module ExampleMod.TestModule
+[Example]
+Foo: 99
+""" % (home_dir, mod_dir)
+
+        conf = mixminion.Config.ServerConfig(string=cfg_test)	
+	manager = conf.getModuleManager()
+	exampleMod = None
+	for m in manager.modules:
+	    if m.getName() == "TestModule":
+		exampleMod = m
+	self.failUnless(exampleMod is not None)
+	manager.configure(conf)
+
+	self.assertEquals(99, exampleMod.foo)
+        conf = mixminion.Config.ServerConfig(string=cfg_test)	
+	manager = conf.getModuleManager()
+	exampleMod = None
+	for m in manager.modules:
+	    if m.getName() == "TestModule":
+		exampleMod = m
+	self.failUnless(exampleMod is not None)
+
+	manager.configure(conf)
+	self.failUnless(exampleMod is manager.typeToModule[1234])
+
+	manager.queueMessage("Hello 1", 1234, "fail!")
+	manager.queueMessage("Hello 2", 1234, "fail?")
+	manager.queueMessage("Hello 3", 1234, "good")
+	manager.queueMessage("Drop very much", 
+			     mixminion.Modules.DROP_TYPE,  "")
+	queue = manager.queues['TestModule']
+	self.failUnless(isinstance(queue, 
+			   mixminion.Modules._SimpleModuleDeliveryQueue))
+	self.assertEquals(3, queue.count())
+	self.assertEquals(exampleMod.processedMessages, [])
+	try:
+	    severity = getLog().getMinSeverity()
+	    getLog().setMinSeverity("FATAL") #suppress warning
+	    manager.sendReadyMessages()
+	finally:
+            getLog().setMinSeverity(severity) #unsuppress warning
+	self.assertEquals(1, queue.count())
+	self.assertEquals(3, len(exampleMod.processedMessages))
+	manager.sendReadyMessages()
+	self.assertEquals(1, queue.count())
+	self.assertEquals(4, len(exampleMod.processedMessages))
+	self.assertEquals("Hello 2", exampleMod.processedMessages[-1])
+	
+	# Check serverinfo generation.
+	try:
+	    severity = getLog().getMinSeverity()
+	    getLog().setMinSeverity("ERROR")
+	    info = mixminion.ServerInfo.generateServerDescriptorAndKeys(
+		conf, _getIdentityKey(), home_dir, "key11", home_dir)
+	    self.failUnless(info.find("\n[Example]\nFoo: 99\n") >= 0)
+	finally:
+            getLog().setMinSeverity(severity) #unsuppress warning
+
+	# 
+	# Try again, this time with the test module disabled.
+	# 
+	cfg_test = SERVER_CONFIG_SHORT + """
+Homedir = %s
+ModulePath = %s
+Module ExampleMod.TestModule
+""" % (home_dir, mod_dir)
+
+        conf = mixminion.Config.ServerConfig(string=cfg_test)	
+	manager = conf.getModuleManager()
+	exampleMod = None
+	for m in manager.modules:
+	    if m.getName() == "TestModule":
+		exampleMod = m
+	self.failUnless(exampleMod is not None)
+	manager.configure(conf)
+	
+	self.failIf(exampleMod is manager.typeToModule.get(1234))
+
+	# Config that should fail validation (Foo is even). FFFF Actually check it.
+	cfg_test = SERVER_CONFIG_SHORT + """
+Homedir = %s
+ModulePath = %s
+Module ExampleMod.TestModule
+[Example]
+Foo: 100
+""" % (home_dir, mod_dir)
+	
+	# FFFF Add tests for catching exceptions from buggy modules
+
+#----------------------------------------------------------------------
 def testSuite():
     suite = unittest.TestSuite()
     loader = unittest.TestLoader()
     tc = loader.loadTestsFromTestCase
+
     suite.addTest(tc(MinionlibCryptoTests))
     suite.addTest(tc(CryptoTests))
     suite.addTest(tc(FormatTests))
     suite.addTest(tc(LogTests))
     suite.addTest(tc(ConfigFileTests))
     suite.addTest(tc(ServerInfoTests))
+    suite.addTest(tc(ModuleManagerTests))
     suite.addTest(tc(HashLogTests))
     suite.addTest(tc(BuildMessageTests))
     suite.addTest(tc(PacketHandlerTests))