[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] Refactor ServerMain; debug the universe.
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.seul.org:/tmp/cvs-serv20327/lib/mixminion
Modified Files:
Common.py Config.py Crypto.py MMTPServer.py Modules.py
Queue.py ServerInfo.py ServerMain.py __init__.py benchmark.py
test.py
Log Message:
Refactor ServerMain; debug the universe.
Common.py:
Debug createPrivateDirs
Config.py:
Link ModuleManager to ServerConfig
Crypto.py:
More inlining in lioness_encrypt
Change default e from 65535 to 65537
Debug and tune shuffle
Optimize the heck out of PRNG.getInt
Optimize getFloat a bit.
MMTPServer.py:
Add FFFFs
Modules.py:
Add more comments
Make modules provide their own delivery queues; add a couple of
helper classes to make this easier.
Add 'immediate delivery' null-object queue for modules that don't
want to queue.
Debug module loading
Improve queueing and delivery
Queue.py:
Switch to createPrivateDir
Debug queueObject
Remove retryAt logic from DeliveryQueue: messages now get retried
at every opportunity
Remove timing logic from MixQueue
Debug *MixQueue
ServerInfo.py:
Rename ServerKeys to ServerKeyset
ServerMain.py: Refactor completely.
ServerKeyring now holds all of the server's key info
IncomingQueue, OutgoingQueue, Mix, and _MMTPConnection now encapsulate
the internal processing logic
MixminionServer is now a small class.
benchmark.py:
Add timing for shuffle
Make benchmark code run again (It was broken long ago by changes
to ServerInfo and _minionlib)
test.py:
Replace mktemp with a secure, paranoid replacement that can clean
up after itself; remove all old clean-up stuff.
getFloat no longer takes an arg
Tests for shuffle
Tests for Queue.queueObject
Tests for DeliveryQueue
Tests for *MixQueue
Cache 2048-bit RSA key; don't generate too many of those!
Tests for ModuleManager
Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -d -r1.12 -r1.13
--- Common.py 12 Aug 2002 18:12:24 -0000 1.12
+++ Common.py 19 Aug 2002 15:33:55 -0000 1.13
@@ -13,6 +13,7 @@
import signal
import sys
import time
+import stat
from types import StringType
class MixError(Exception):
@@ -58,7 +59,7 @@
XXXX we don't check permissions properly yet."""
if not os.path.exists(d):
try:
- os.makedirs(s, 0700)
+ os.makedirs(d, 0700)
except OSError, e:
getLog().fatal("Unable to create directory %s"%d)
raise MixFatalError()
Index: Config.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Config.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- Config.py 11 Aug 2002 07:50:34 -0000 1.8
+++ Config.py 19 Aug 2002 15:33:55 -0000 1.9
@@ -261,7 +261,7 @@
key = mixminion.Crypto.pk_decode_public_key(asn1)
except mixminion.Crypto.CryptoError:
raise ConfigError("Invalid public key")
- if key.get_public_key()[1] != 65535:
+ if key.get_public_key()[1] != 65537:
raise ConfigError("Invalid exponent on public key")
return key
@@ -695,12 +695,18 @@
_restrictFormat = 0
# XXXX Missing: Queue-Size / Queue config options
# XXXX timeout options
- # XXXX listen timeout??
- def __init__(self, fname=None, string=None):
+ # XXXX listen timeout??
+ # XXXX Retry options
+ def __init__(self, fname=None, string=None, moduleManager=None):
+ # We use a copy of SERVER_SYNTAX, because the ModuleManager will
+ # mess it up.
self._syntax = SERVER_SYNTAX.copy()
import mixminion.Modules
- self.moduleManager = mixminion.Modules.ModuleManager()
+ if moduleManager is None:
+ self.moduleManager = mixminion.Modules.ModuleManager()
+ else:
+ self.moduleManager = moduleManager
self._addCallback("Server", self.__loadModules)
_ConfigFile.__init__(self, fname, string)
@@ -715,13 +721,10 @@
accordingly."""
self.moduleManager.setPath(section.get('ModulePath', None))
for mod in section.get('Module', []):
- info("Loading module %s", mod)
+ getLog().info("Loading module %s", mod)
self.moduleManager.loadExtModule(mod)
self._syntax.update(self.moduleManager.getConfigSyntax())
-## if sections['Server']['PublicKeyLifeTime'][2] < 24*60*60:
-## raise ConfigError("PublicKeyLifetime must be at least 1 day.")
-## elif sections['Server']['PublicKeyLifeTime'][2] % (24*60*60) > 30:
-## getLog().warn("PublicKeyLifetime rounded to the nearest day")
-## nDays = sections[60*60*24]
+ def getModuleManager(self):
+ return self.moduleManager
Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -d -r1.13 -r1.14
--- Crypto.py 12 Aug 2002 21:05:50 -0000 1.13
+++ Crypto.py 19 Aug 2002 15:33:56 -0000 1.14
@@ -11,6 +11,8 @@
import os
import sys
import stat
+# XXXX is this used?
+import struct
from types import StringType
import mixminion.Config
@@ -89,13 +91,18 @@
left = s[:DIGEST_LEN]
right = s[DIGEST_LEN:]
del s
- # Performance note: This business with sha1("".join([key,right,key]))
+ # Performance note: This business with sha1("".join((key,right,key)))
# may look slow, but it contributes only a 6% to the hashing step,
# which in turn contributes under 11% of the time for LIONESS.
- right = ctr_crypt(right, _ml.sha1("".join([key1,left,key1]))[:AES_KEY_LEN])
- left = _ml.strxor(left, _ml.sha1("".join([key2,right,key2])))
- right = ctr_crypt(right, _ml.sha1("".join([key3,left,key3]))[:AES_KEY_LEN])
- left = _ml.strxor(left, _ml.sha1("".join([key4,right,key4])))
+ right = _ml.aes_ctr128_crypt(
+ _ml.aes_key(_ml.sha1("".join((key1,left,key1)))[:AES_KEY_LEN]),
+ right, 0)
+ left = _ml.strxor(left, _ml.sha1("".join((key2,right,key2))))
+ right = _ml.aes_ctr128_crypt(
+ _ml.aes_key(_ml.sha1("".join((key3,left,key3)))[:AES_KEY_LEN]),
+ right, 0)
+ left = _ml.strxor(left, _ml.sha1("".join((key4,right,key4))))
+
return left + right
def lioness_decrypt(s,(key1,key2,key3,key4)):
@@ -161,7 +168,7 @@
data = key.crypt(data, 1, 0)
return check_oaep(data,OAEP_PARAMETER,bytes)
-def pk_generate(bits=1024,e=65535):
+def pk_generate(bits=1024,e=65537):
"""Generate a new RSA keypair with 'bits' bits and exponent 'e'. It is
safe to use the default value of 'e'.
"""
@@ -171,7 +178,7 @@
"""Extracts the modulus of a public key."""
return key.get_public_key()[0]
-def pk_from_modulus(n, e=65535L):
+def pk_from_modulus(n, e=65537L):
"""Given a modulus and exponent, creates an RSA public key."""
return _ml.rsa_make_public_key(long(n),long(e))
@@ -400,67 +407,48 @@
else:
n = min(n, size)
+ if n == size:
+ series = xrange(n-1)
+ else:
+ series = xrange(n)
# This permutation algorithm yields all permutation with equal
# probability (assuming a good rng); others do not.
- for i in range(n-1):
- swap = i+self.getInt(size-i)
- v = lst[swap]
- lst[swap] = lst[i]
- lst[i] = v
+ getInt = self.getInt
+ for i in series:
+ swap = i+getInt(size-i)
+ lst[swap],lst[i] = lst[i],lst[swap]
return lst[:n]
def getInt(self, max):
"""Returns a random integer i s.t. 0 <= i < max.
- The value of max must be less than 2**32."""
+ The value of max must be less than 2**30."""
- # FFFF This implementation isn't very good. It determines the number
- # of bytes in max (nBytes), and a bitmask 1 less than the first power
- # of 2 less than max.
- #
- # Then, it gets nBytes random bytes, ANDs them with the bitmask, and
- # checks to see whether the result is < max. If so, it returns. Else,
- # it generates more random bytes and tries again.
- #
- # On the plus side, this algorithm will obviously give all values
- # 0 <= i < max with equal probability. On the minus side, it
- # requires (on average) 2*nBytes entropy to do so.
-
- assert max > 0
- for bits in xrange(1,33):
- if max < 1<<bits:
- nBytes = ceilDiv(bits,8)
- mask = (1<<bits)-1
- break
- if bits == 33:
- raise "I didn't expect to have to generate a number over 2**32"
+ # FFFF This implementation is about 2-4x as good as the last one, but
+ # FFFF still could be better. It's faster than getFloat()*max.
- while 1:
- bytes = self.getBytes(nBytes)
- r = 0
- for byte in bytes:
- r = (r << 8) | ord(byte)
- r = r & mask
- if r < max:
- return r
+ assert 0 < max < 0x3ffffffff
+ _ord = ord
+ while 1:
+ # Get a random positive int between 0 and 0x7fffffff.
+ b = self.getBytes(4)
+ o = ((((((_ord(b[0])&0x7f)<<8) + _ord(b[1]))<<8) +
+ _ord(b[2]))<<8) + _ord(b[3])
+ # Retry if we got a value that would fall in an incomplete
+ # run of 'max' elements.
+ if 0x7fffffff - max >= o:
+ return o % max
- def getFloat(self, bytes=3):
+ def getFloat(self):
"""Return a floating-point number between 0 and 1. The number
will have 'bytes' bytes of resolution."""
- # We need to special-case the <4 byte case to get good performance
- # on Python<2.2
- if bytes <= 3:
- max = 1<<(bytes*8)
- tot = 0
- else:
- max = 1L<<(bytes*8)
- tot = 0L
- bytes = self.getBytes(bytes)
- for byte in bytes:
- tot = (tot << 8) | ord(byte)
- return float(tot)/max
+ b = self.getBytes(4)
+ _ord = ord
+ o = ((((((_ord(b[0])&0x7f)<<8) + _ord(b[1]))<<8) +
+ _ord(b[2]))<<8) + _ord(b[3])
+ return o / 2147483647.0
def _prng(self, n):
"""Abstract method: Must be overridden to return n bytes of fresh
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/MMTPServer.py,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -d -r1.10 -r1.11
--- MMTPServer.py 12 Aug 2002 18:12:24 -0000 1.10
+++ MMTPServer.py 19 Aug 2002 15:33:56 -0000 1.11
@@ -621,6 +621,9 @@
MMTPClientConnection"""
def __init__(self, config):
self.context = config.getTLSContext(server=1)
+ # FFFF Don't always listen; don't always retransmit!
+ # FFFF Support listening on specific IPs
+ # FFFF File
self.listener = ListenConnection("127.0.0.1",
config['Outgoing/MMTP']['Port'],
LISTEN_BACKLOG,
@@ -631,7 +634,7 @@
def _newMMTPConnection(self, sock):
"""helper method. Creates and registers a new server connection when
the listener socket gets a hit."""
- # XXXX Check whether incoming IP is valid XXXX
+ # XXXX Check whether incoming IP is allowed!
tls = self.context.sock(sock, serverMode=1)
sock.setblocking(0)
con = MMTPServerConnection(sock, tls, self.onMessageReceived)
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Modules.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- Modules.py 12 Aug 2002 18:12:24 -0000 1.4
+++ Modules.py 19 Aug 2002 15:33:56 -0000 1.5
@@ -5,18 +5,20 @@
Type codes and dispatch functions for routing functionality."""
-__all__ = [ 'ModuleManager', 'DROP_TYPE', 'FWD_TYPE', 'SWAP_FWD_TYPE',
+__all__ = [ 'ModuleManager', 'DeliveryModule',
+ 'DROP_TYPE', 'FWD_TYPE', 'SWAP_FWD_TYPE',
'DELIVER_OK', 'DELIVER_FAIL_RETRY', 'DELIVER_FAIL_NORETRY',
'SMTP_TYPE', 'MBOX_TYPE' ]
import os
+import sys
import smtplib
import mixminion.Config
import mixminion.Packet
import mixminion.Queue
from mixminion.Config import ConfigError, _parseBoolean, _parseCommand
-from mixminion.Common import getLog
+from mixminion.Common import getLog, createPrivateDir
# Return values for processMessage
DELIVER_OK = 1
@@ -36,8 +38,8 @@
MBOX_TYPE = 0x0101 # Send the message to one of a fixed list of addresses
class DeliveryModule:
- """Abstract base for modules; delivery modules should implement the methods
- in this class.
+ """Abstract base for modules; delivery modules should implement
+ the methods in this class.
A delivery module has the following responsibilities:
* It must have a 0-argument contructor.
@@ -51,15 +53,21 @@
pass
def getConfigSyntax(self):
+ """Return a map from section names to section syntax, as described
+ in Config.py"""
pass
def validateConfig(self, sections, entries, lines, contents):
+ """See mixminion.Config.validate"""
pass
def configure(self, config, manager):
+ """Configure this object using a given Config object, and (if
+ required) register it with the module manager."""
pass
def getServerInfoBlock(self):
+ """Return a block for inclusion in a server descriptor."""
pass
def getName(self):
@@ -68,10 +76,17 @@
pass
def getExitTypes(self):
- """Return a list of numeric exit types which this module is willing to
+ """Return a sequence of numeric exit types that this module can
handle."""
pass
+ def createDeliveryQueue(self, queueDir):
+ """Return a DeliveryQueue object suitable for delivering messages
+ via this module. The default implementation returns a
+ SimpleModuleDeliveryQueue, which (though adequate) doesn't
+ batch messages intended for the same destination."""
+ return _SimpleModuleDeliveryQueue(self, queueDir)
+
def processMessage(self, message, exitType, exitInfo):
"""Given a message with a given exitType and exitInfo, try to deliver
it. Return one of:
@@ -81,6 +96,58 @@
DELIVER_FAIL_NORETRY (if the message shouldn't be tried later)."""
pass
+class _ImmediateDeliveryQueue:
+ """Helper class usable as delivery queue for modules that don't
+ actually want a queue. Such modules should have very speedy
+ processMessage() methods, and should never have delivery fail."""
+ def __init__(self, module):
+ self.module = module
+
+ def queueMessage(self, (exitType, exitInfo), message):
+ try:
+ res = self.module.processMessage(exitType, exitInfo, message)
+ if res == DELIVER_OK:
+ return
+ elif res == DELIVER_FAIL_RETRY:
+ getLog().error("Unable to retry delivery for message")
+ else:
+ getLog().error("Unable to deliver message")
+ except:
+ _, e, tb = sys.exc_info()
+ getLog().error(
+ "Exception delivering message: %s at line %s of %s",
+ e, tb.tb_lineno, tb.tb_frame.f_code.co_name)
+
+ def sendReadyMessages(self):
+ pass
+
+class _SimpleModuleDeliveryQueue(mixminion.Queue.DeliveryQueue):
+ """Helper class used as a default delivery queue for modules that
+ don't care about batching messages to like addresses."""
+ def __init__(self, module, directory):
+ mixminion.Queue.DeliveryQueue.__init__(self, directory)
+ self.module = module
+
+ def deliverMessages(self, msgList):
+ for handle, addr, message, n_retries in msgList:
+ try:
+ exitType, exitInfo = addr
+ result = self.module.processMessage(message,exitType,exitInfo)
+ if result == DELIVER_OK:
+ self.deliverySucceeded(handle)
+ elif result == DELIVER_FAIL_RETRY:
+ # XXXX We need to drop undeliverable messages eventually!
+ self.deliveryFailed(handle, 1)
+ else:
+ getLog().error("Unable to deliver message")
+ self.deliveryFailed(handle, 0)
+ except:
+ _, e, tb = sys.exc_info()
+ getLog().error(
+ "Exception delivering message: %s at line %s of %s",
+ e, tb.tb_lineno, tb.tb_frame.f_code.co_name)
+ self.deliveryFailed(handle, 0)
+
class ModuleManager:
"""A ModuleManager knows about all of the modules in the systems.
@@ -91,14 +158,15 @@
enabled."""
##
# Fields
- # myntax: extensions to the syntax configuration in Config.py
+ # syntax: extensions to the syntax configuration in Config.py
# modules: a list of DeliveryModule objects
# nameToModule: XXXX Docdoc
# typeToModule: a map from delivery type to enabled deliverymodule.
# path: search path for python modules.
# queueRoot: directory where all the queues go.
- # queues: a map from module name to queue.
-
+ # queues: a map from module name to queue (Queue objects must support
+ # queueMessage and sendReadyMessages as in DeliveryQueue.)
+
def __init__(self):
self.syntax = {}
self.modules = []
@@ -133,7 +201,10 @@
def setPath(self, path):
"""Sets the search path for Python modules"""
- self.path = path
+ if path:
+ self.path = path.split(":")
+ else:
+ self.path = []
def loadExtModule(self, className):
"""Load and register a module from a python file. Takes a classname
@@ -142,8 +213,8 @@
ids = className.split(".")
pyPkg = ".".join(ids[:-1])
pyClassName = ids[-1]
+ orig_path = sys.path[:]
try:
- orig_path = sys.path[:]
sys.path[0:0] = self.path
try:
m = __import__(pyPkg, {}, {}, [])
@@ -152,7 +223,7 @@
finally:
sys.path = orig_path
try:
- pyClass = getattr(m, pyClassname)
+ pyClass = getattr(m, pyClassName)
except AttributeError, e:
raise MixError("No class %s in module %s" %(pyClassName,pyPkg))
try:
@@ -165,8 +236,8 @@
m.validateConfig(sections, entries, lines, contents)
def configure(self, config):
- self.queueRoot = os.path.join(config['Server']['Homedir'],
- 'work', 'queues', 'deliver')
+ self._setQueueRoot(os.path.join(config['Server']['Homedir'],
+ 'work', 'queues', 'deliver'))
createPrivateDir(self.queueRoot)
for m in self.modules:
m.configure(config, self)
@@ -175,10 +246,15 @@
"""Maps all the types for a module object."""
for t in module.getExitTypes():
self.typeToModule[t] = module
+
queueDir = os.path.join(self.queueRoot, module.getName())
- queue = mixminion.Queue.Queue(queueDiir, create=1, scrub=1)
+ queue = module.createDeliveryQueue(queueDir)
self.queues[module.getName()] = queue
+ def cleanQueues(self):
+ for queue in self.queues.values():
+ queue.cleanQueue()
+
def disableModule(self, module):
"""Unmaps all the types for a module object."""
for t in module.getExitTypes():
@@ -189,30 +265,17 @@
def queueMessage(self, message, exitType, exitInfo):
mod = self.typeToModule.get(exitType, None)
- if mod is not None:
- queue = self.queues[mod.getName()]
- f, handle = queue.openNewMessage()
- cPickle.dumps((0, exitType, exitInfo, message), f, 1)
- queue.finishMessage(f, handle)
- else:
- getLog().error("Unable to queue message with unknown type %s",
+ if mod is None:
+ getLog().error("Unable to handle message with unknown type %s",
exitType)
+ return
+
+ queue = self.queues[mod.getName()]
+ queue.queueMessage((exitType,exitInfo), message)
- def processMessages(self):
+ def sendReadyMessages(self):
for name, queue in self.queues.items():
- if len(queue):
- XXXX
-
- def processMessage(self, message, exitType, exitInfo):
- """Tries to deliver a message. Return types are as in
- DeliveryModule.processMessage"""
- mod = self.typeToModule.get(exitType, None)
- if mod is not None:
- return mod.processMessage(message, exitType, exitInfo)
- else:
- getLog().error("Unable to deliver message with unknown type %s",
- exitType)
- return DELIVER_FAIL_NORETRY
+ queue.sendReadyMessages()
def getServerInfoBlocks(self):
return [ m.getServerInfoBlock() for m in self.modules ]
@@ -225,17 +288,21 @@
def getServerInfoBlock(self):
return ""
def configure(self, config, manager):
- manager.enable(self)
+ manager.enableModule(self)
def getName(self):
return "DROP"
def getExitTypes(self):
return [ DROP_TYPE ]
+ def createDeliveryQueue(self, directory):
+ return _ImmediateDeliveryQueue(self)
def processMessage(self, message, exitType, exitInfo):
getLog().debug("Dropping padding message")
return DELIVER_OK
#----------------------------------------------------------------------
class MBoxModule(DeliveryModule):
+ # XXXX This implementation can stall badly if we don't have a fast
+ # XXXX local MTA.
def __init__(self):
DeliveryModule.__init__(self)
self.command = None
@@ -257,20 +324,22 @@
def configure(self, config, moduleManager):
# XXXX Check this. error handling
+
self.enabled = config['Delivery/MBOX'].get("Enabled", 0)
- self.server = config['Delivery/MBOX']['SMTPServer']
- self.addressFile = config['Delivery/MBOX']['AddressFile']
- self.returnAddress = config['Delivery/MBOX']['ReturnAddress']
- self.contact = config['Delivery/MBOX']['RemoveContact']
- if self.enabled:
- if not self.addressFile:
- raise ConfigError("Missing AddressFile field in Delivery/MBOX")
- if not self.returnAddress:
- raise ConfigError("Missing ReturnAddress field "+
- "in Delivery/MBOX")
- if not self.contact:
- raise ConfigError("Missing RemoveContact field "+
- "in Delivery/MBOX")
+ if not self.enabled:
+ return
+
+ self.server = config['Delivery/MBOX']['SMTPServer']
+ self.addressFile = config['Delivery/MBOX']['AddressFile']
+ self.returnAddress = config['Delivery/MBOX']['ReturnAddress']
+ self.contact = config['Delivery/MBOX']['RemoveContact']
+ if not self.addressFile:
+ raise ConfigError("Missing AddressFile field in Delivery/MBOX")
+ if not self.returnAddress:
+ raise ConfigError("Missing ReturnAddress field in Delivery/MBOX")
+ if not self.contact:
+ raise ConfigError("Missing RemoveContact field in Delivery/MBOX")
+
self.nickname = config['Server']['Nickname']
if not self.nickname:
Index: Queue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Queue.py,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -d -r1.10 -r1.11
--- Queue.py 12 Aug 2002 21:05:50 -0000 1.10
+++ Queue.py 19 Aug 2002 15:33:56 -0000 1.11
@@ -12,10 +12,12 @@
import stat
import cPickle
-from mixminion.Common import MixError, MixFatalError, secureDelete, getLog
+from mixminion.Common import MixError, MixFatalError, secureDelete, getLog, \
+ createPrivateDir
from mixminion.Crypto import AESCounterPRNG
-__all__ = [ 'Queue' ]
+__all__ = [ 'Queue', 'DeliveryQueue', 'TimedMixQueue', 'CottrellMixQueue',
+ 'BinomialCottrellMixQueue' ]
# Mode to pass to open(2) for creating a new file, and dying if it already
# exists.
@@ -79,7 +81,7 @@
if not os.path.exists(location):
if create:
getLog().info("Trying to create queue %s", location)
- os.mkdir(location, 0700)
+ createPrivateDir(location)
else:
raise MixFatalError("No directory for queue %s" % location)
@@ -90,7 +92,7 @@
getLog().warn("Worrisome mode %o on directory %s", mode, location)
if scrub:
- self.cleanQueue(1)
+ self.cleanQueue()
# Count messages on first time through.
self.n_entries = -1
@@ -107,7 +109,7 @@
"""Queue an object using cPickle, and return a handle to that
object."""
f, handle = self.openNewMessage()
- cPickle.dump(contents, f, 1)
+ cPickle.dump(object, f, 1)
self.finishMessage(f, handle)
return handle
@@ -261,7 +263,7 @@
"""A DeliveryQueue implements a queue that greedily sends messages
to outgoing streams that occasionally fail. Messages in a
DeliveryQueue are no longer unstructured text, but rather
- tuples of: (n_retries, next_retry_time, addressing info, msg).
+ tuples of: (n_retries, addressing info, msg).
This class is abstract. Implementors of this class should
subclass it to add a deliverMessages method. Multiple
@@ -278,11 +280,8 @@
"""
###
# Fields:
- # sendableAt -- An inorder list of (time, handle) for all messages
- # that we're not currently sending. Each time is either a
- # number of seconds since the epoch at which the corresponding
- # message will be sendable, or 0 to indicate that the message
- # is sendable _now_.
+ # sendable -- A list of handles for all messages
+ # that we're not currently sending.
# pending -- Dict from handle->1, for all messages that we're
# currently sending.
@@ -293,63 +292,47 @@
def _rescan(self):
"""Rebuild the internal state of this queue from the underlying
directory."""
- self.sendableAt = []
self.pending = {}
- now = time.time()
- for h in self.getAllmessages():
- t = self.getObject(handle)[1] # retry time
- self.sendableAt.append[(t, h)]
- self.sendableAt.sort()
+ self.sendable = self.getAllMessages()
- def queueMessage(self, addr, msg, retry=0, retryAt=0):
+ def queueMessage(self, addr, msg, retry=0):
"""Schedule a message for delivery.
addr -- An object to indicate the message's destination
msg -- the message itself
- retry -- how many times so far have we tried to send?
- retryAt -- when should we next retry? (0 for now)."""
+ retry -- how many times so far have we tried to send?"""
- self.queueObject( (retry, retryAt, addr, msg) )
+ handle = self.queueObject( (retry, addr, msg) )
+ self.sendable.append(handle)
- if retryAt == 0:
- self.sendableAt[0:0] = [(0, handle)]
- else:
- bisect.insort_right(self.sendableAt, (retryAt, handle))
return handle
- def nextMessageReadyAt(self):
- """Return the soonest possible time at which sendReadyMessages
- will send something. If some time < now is returned,
- the answer is 'immediately'. If 'None' is returned, there are
- no messages in the queue."""
- if self.sendableAt:
- return self.sendableAt[0][0]
- else:
- return None
-
- def messageContents(self,handle):
- """Returns a (n_retries, retryAt, addr, msg) payload for a given
+ def get(self,handle):
+ """Returns a (n_retries, addr, msg) payload for a given
message handle."""
return self.getObject(handle)
def sendReadyMessages(self):
- """Sends all messages which are ready to be send, and which are not
- already being sent."""
- now = time.time()
- idx = bisect.bisect_right(self.sendableAt, (now, ""))
+ """Sends all messages which are not already being sent."""
+
+ handles = self.sendable
messages = []
- sendable = self.sendableAt[:idx]
- del self.sendableAt[:idx]
- for when, h in self.sendable:
- retries, retryAt, addr, msg = self.getObject(h)
+ self.sendable = []
+ for h in handles:
+ retries, addr, msg = self.getObject(h)
messages.append((h, addr, msg, retries))
self.pending[h] = 1
if messages:
- self.deliverMessage(messages)
+ self.deliverMessages(messages)
def deliverMessages(self, msgList):
"""Abstract method; Invoked with a list of
(handle, addr, message, n_retries) tuples every time we have a batch
- of messages to send."""
+ of messages to send.
+
+ For every handle in the list, deliverySucceeded or deliveryFailed
+ should eventually be called, or the message will sit in the queue
+ indefinitely, without being retried."""
+
# We could implement this as a single deliverMessage(h,addr,m,n)
# method, but that wouldn't allow implementations to batch
# messages being sent to the same address.
@@ -363,18 +346,18 @@
self.removeMessage(handle)
del self.pending[handle]
- def deliveryFailed(self, handle, retryAt=None):
+ def deliveryFailed(self, handle, retriable=0):
"""Removes a message from the outgoing queue, or requeues it
for delivery at a later time. This method should be
invoked after the corresponding message has been
successfully delivered."""
del self.pending[handle]
- if self.retryAt is not None:
+ if retriable:
# Queue the new one before removing the old one, for
# crash-proofness
- retries, retryAt, addr, msg = self.getObject(h)
- self.queueMessage(addr, msg, retries+1, retryAt)
- self.removeMessage(handle)
+ retries, addr, msg = self.getObject(handle)
+ self.queueMessage(addr, msg, retries+1)
+ self.removeMessage(handle)
class TimedMixQueue(Queue):
"""A TimedMixQueue holds a group of files, and returns some of them
@@ -385,27 +368,15 @@
every 'interval' seconds."""
Queue.__init__(self, location, create=1, scrub=1)
self.interval = interval
- self.nextSendTime = time.time + interval
- def nextMessageReadyAt(self):
- """Return the next time at which the pool will be ready to send
- messages"""
- return self.nextSendTime
-
- def getReadyMessages(self):
+ def getBatch(self):
"""Return handles for all messages that the pool is currently ready
- to send."""
- now = time.time()
- if now < self.nextSendTime:
- return []
- self.nextSendTime = now + self.interval
- return self._getBatch()
-
- def _getBatch(self):
- """Internal method: called by getReadyMessages to return a single
- batch of handles."""
+ to send in the next batch"""
return self.pickRandom()
+ def getInterval(self):
+ return self.interval
+
class CottrellMixQueue(TimedMixQueue):
"""A CottrellMixQueue holds a group of files, and returns some of them
as requested, according the Cottrell (timed dynamic-pool) mixing
@@ -416,22 +387,24 @@
and never sends more than maxSendRate * the current pool size."""
TimedMixQueue.__init__(self, location, interval)
self.minPoolSize = minPoolSize
- self.maxSendRate = maxSendRate
+ self.maxBatchSize = int(maxSendRate*minPoolSize)
+ if self.maxBatchSize < 1:
+ self.maxBatchSize = 1
- def _getBatch(self):
+ def getBatch(self):
pool = self.count()
- nTransmit = min(pool-self.minPoolSize, int(pool*self.maxSendRate))
+ nTransmit = min(pool-self.minPoolSize, self.maxBatchSize)
return self.pickRandom(nTransmit)
class BinomialCottrellMixQueue(CottrellMixQueue):
"""Same algorithm as CottrellMixQueue, but instead of sending N messages
from the pool of size P, sends each message with probability N/P."""
- def _getBatch(self):
+ def getBatch(self):
pool = self.count()
- nTransmit = min(pool-self.minPoolSize, int(pool*self.maxSendRate))
+ nTransmit = min(pool-self.minPoolSize, self.maxBatchSize)
msgProbability = float(nTransmit) / pool
- return rng.shuffle([ h for h in self.getAllMessages()
- if self.rng.getFloat() < msgProbability ])
+ return self.rng.shuffle([ h for h in self.getAllMessages()
+ if self.rng.getFloat() < msgProbability ])
def _secureDelete_bg(files, cleanFile):
pid = os.fork()
Index: ServerInfo.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ServerInfo.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- ServerInfo.py 12 Aug 2002 18:12:24 -0000 1.9
+++ ServerInfo.py 19 Aug 2002 15:33:56 -0000 1.10
@@ -132,7 +132,7 @@
return IPV4Info(self.getAddr(), self.getPort(), self.getKeyID())
#----------------------------------------------------------------------
-class ServerKeys:
+class ServerKeyset:
"""A set of expirable keys for use by a server.
A server has one long-lived identity key, and two short-lived
@@ -143,7 +143,7 @@
Whether we publish or not, we always generate a server descriptor
to store the keys' lifetimes.
- When we create a new ServerKeys object, the associated keys are not
+ When we create a new ServerKeyset object, the associated keys are not
read from disk unil the object's load method is called."""
def __init__(self, keyroot, keyname, hashroot):
keydir = self.keydir = os.path.join(keyroot, "key_"+keyname)
@@ -232,7 +232,7 @@
packetKey = mixminion.Crypto.pk_generate(PACKET_KEY_BYTES*8)
mmtpKey = mixminion.Crypto.pk_generate(PACKET_KEY_BYTES*8)
- serverKeys = ServerKeys(keydir, keyname, hashdir)
+ serverKeys = ServerKeyset(keydir, keyname, hashdir)
serverKeys.packetKey = packetKey
serverKeys.mmtpKey = mmtpKey
serverKeys.save()
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ServerMain.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- ServerMain.py 12 Aug 2002 18:12:24 -0000 1.3
+++ ServerMain.py 19 Aug 2002 15:33:56 -0000 1.4
@@ -8,7 +8,11 @@
BUG: No support for encrypting private keys.n"""
import cPickle
+import os
+import mixminion._minionlib
+import mixminion.Queue
+from mixminion.ServerInfo import ServerKeySet, ServerInfo
from mixminion.Common import getLog, MixFatalError, MixError, createPrivateDir
# Directory layout:
@@ -27,80 +31,55 @@
# conf/miniond.conf
# ....
-class ServerState:
- # XXXX This should be refactored. keys should be separated from queues.
- # config
- # log
- # homedir
+class ServerKeyring:
+ # homeDir: ----
+ # keysDir: ----
+ # keySloppiness: ----
+ # keyIntervals: list of (start, end, ServerKeySetName)
def __init__(self, config):
- self.config = config
-
- #XXXX DOCDOC
- # set up directory structure.
- c = self.config
- self.homedir = c['Server']['Homedir']
- createPrivateDir(self.homedir)
- getLog()._configure(self.config)# ????
-
- w = os.path.join(self.homeDir, "work")
- q = os.path.join(w, "queues")
- self.incomingDir = os.path.join(q, "incoming")
- self.mixDir = os.path.join(q, "mix")
- self.outgoingDir = os.path.join(q, "outgoing")
- self.deliverDir = os.path.join(q, "deliver")
-
- tlsDir = os.path.join(w, "tls")
- self.hashlogsDir = os.path.join(w, "hashlogs")
- self.keysDir = os.path.join(self.homeDir, "keys")
- self.confDir = os.path.join(self.homeDir, "conf")
-
- for d in [self.homeDir, w, q, self.incomingDir, self.mixDir,
- self.outgoingDir, self.deliverDir, tlsDir,
- self.hashlogsDir, self.keysDir, self.confDir]:
- createPrivateDir(d)
-
- for name in ("incoming", "mix", "outgoing"):
- loc = getattr(self, name+"Dir")
- queue = mixminion.Queue.Queue(loc, create=1, scrub=1)
- setattr(self, name+"Queue", queue)
-
- self.dhFile = os.path.join(tlsDir, "dhparam")
-
- self.checkKeys()
-
- def getDHFile(self):
- if not os.path.exists(self.dhFile):
- getLog().info("Generating Diffie-Helman parameters for TLS...")
- mixminion._minionlib.generate_dh_parameters(self.dhFile, verbose=0)
- getLog().info("...done")
+ self.configure(config)
- return self.dhFile
+ def configure(self):
+ self.homeDir = config['Server']['Homedir']
+ self.keyDir = os.path.join(self.homeDir, 'keys')
+ self.keySloppiness = config['Server']['PublicKeySloppiness']
+ self.checkKeys()
def checkKeys(self):
- self.keyIntervals = [] # list of start, end, keysetname
+ self.keyIntervals = []
for dirname in os.listdir(self.keysDir):
if not dirname.startswith('key_'):
+ getLog().warn("Unexpected directory %s under %s",
+ dirname, self.keysDir)
continue
keysetname = dirname[4:]
d = os.path.join(self.keysDir, dirname)
si = os.path.join(self.keysDir, "ServerDesc")
if os.path.exists(si):
- inf = mixminion.ServerInfo.ServerInfo(fname=si, assumeValid=1)
+ inf = ServerInfo(fname=si, assumeValid=1)
t1 = inf['Server']['Valid-After']
t2 = inf['Server']['Valid-Until']
self.keyIntervals.append( (t1, t2, keysetname) )
self.keyIntervals.sort()
-
+
def removeDeadKeys(self):
now = time.time()
- cutoff = now - config['Server']['PublicKeySloppiness']
- names = [ os.path.join(self.keyDir,"key_"+name)
+ cutoff = now - self.keySloppiness
+ dirs = [ os.path.join(self.keyDir,"key_"+name)
for va, vu, name in self.keyIntervals if vu < cutoff ]
- # XXXX DELETE KEYS
-
+
+ for dirname in dirs:
+ files = [ os.path.join(dirname,f)
+ for f in os.listdir(dirname) ])
+ secureDelete(filenames, blocking=1)
+ os.rmdir(dirname)
+
+ self.checkKeys()
+
def _getLiveKey(self):
+ # returns valid-after, valid-until, name
now = time.time()
idx = bisect.bisect_left(self.keyIntervals, (now, None, None))
return self.keyIntervals[idx]
@@ -108,136 +87,182 @@
def getNextKeyRotation(self):
return self._getLiveKey()[1]
- def getServerKeys(self):
- keyset = self._getLiveKey()[2]
- sk = mixminion.ServerInfo.ServerKeys(self.keyDir, keyset,
- self.hashlogsDir)
- sk.load()
- return sk
+ def getServerKeyset(self):
+ # FFFF Support passwords on keys
+ _, _, name = self._getLiveKey()
+ hashroot = os.path.join(self.homeDir, 'work', 'hashlogs')
+ keyset = ServerKeySet(self.keyDir, name, hashroot)
+ keyset.load
+ return self.keyset
+
+ def getDHFile(self):
+ dhdir = os.path.join(self.homedir, 'work', 'tls')
+ createPrivateDir(dhdir)
+ dhfile = os.path.join(dhdir, 'dhparam')
+ if not os.path.exists(dhfile):
+ getLog().info("Generating Diffie-Helman parameters for TLS...")
+ mixminion._minionlib.generate_dh_parameters(self.dhfile, verbose=0)
+ getLog().info("...done")
+ return dhfile
+
def getTLSContext(self):
- # XXXX NO SUPPORT FOR ROTATION
- keys = self.getServerKeys()
- return mixminion._minionlib.TLSContext_new(keys.certFile,
- keys.mmtpKey,
- self.dhFile)
-
+ keys = self.getServerKeyset()
+ return mixminion._minionlib.TLSContext_new(keys.getCertFileName(),
+ keys.GetMMTPKey(),
+ self.getDHFile())
+
def getPacketHandler(self):
- keys = self.getServerKeys()
- return mixminion.PacketHandler.PacketHandler(keys.packetKey,
- keys.hashlogFile)
+ keys = self.getServerKeyset()
+ return mixminion.PacketHandler.PacketHandler(keys.getPacketKey(),
+ keys.getHashLogFile())
- def getIncomingQueues(self):
- return self.incomingQueue
+class IncomingQueue(mixminion.Queue.DeliveryQueue):
+ def __init__(self, location, packetHandler):
+ mixminion.Queue.DeliveryQueue.__init__(self, location)
+ self.packetHandler = packetHandler
+ self.mixQueue = None
- def getOutgoingQueue(self):
- return self.outgoingQueue
+ def connectQueues(self, mixQueue):
+ self.mixQueue = mixQueue
- def getMixQueue(self):
- return self.mixQueue
+ def queueMessage(self, msg):
+ mixminion.Queue.queueMessage(None, msg)
+
+ def deliverMessages(self, msgList):
+ ph = self.packetHandler
+ for handle, _, message, n_retries in msgList:
+ try:
+ res = ph.packetHandler(message)
+ if res is None:
+ log.info("Padding message dropped")
+ else:
+ self.mixQueue.queueObject(res)
+ self.deliverySucceeded(handle)
+ except mixminion.Crypto.CryptoError, e:
+ log.warn("Invalid PK or misencrypted packet header:"+str(e))
+ self.deliveryFailed(handle)
+ except mixminion.Packet.ParseError, e:
+ log.warn("Malformed message dropped:"+str(e))
+ self.deliveryFailed(handle)
+ except mixminion.PacketHandler.ContentError, e:
+ log.warn("Discarding bad packet:"+str(e))
+ self.deliveryFailed(handle)
- def getDeliverMBOXQueue(self, which):
- return self.deliverMBOXQueue
+class MixQueue:
+ def __init__(self, queue):
+ self.queue = queue
+ self.outgoingQueue = None
+ self.moduleManager = None
-class _Server(MMTPServer):
- def __init__(self, config, serverState):
- self.incomingQueue = serverState.getIncomingQueue()
- self.outgoingQueue = serverState.getOutgoingQueue()
+ def connectQueues(self, outgoing, manager):
+ self.outgoingQueue = outgoing
+ self.moduleManager = manager
+
+ def mix(self):
+ handles = self.queue.getBatch()
+ for h in handles:
+ tp, info = self.queue.getObject(h)
+ if tp == 'EXIT':
+ rt, ri, app_key, payload = info
+ self.moduleManger.queueMessage((rt, ri), payload)
+ else:
+ assert tp == 'QUEUE'
+ ipv4, msg = info
+ self.outgoingQueue.queueMessage(ipv4, msg)
+
+class OutgoingQueue(mixminion.Queue.DeliveryQueue):
+ def __init__(self, location):
+ OutgoingQueue.__init__(self, location)
+ self.server = None
+
+ def connectQueues(self, server):
+ self.server = sever
+
+ def deliverMessages(self, msgList):
+ # Map from addr -> [ (handle, msg) ... ]
+ msgs = {}
+ for handle, addr, message, n_retries in msgList:
+ msgs.setdefault(addr, []).append( (handle, message) )
+ for addr, messages in msgs.items():
+ messages, handles = zip(*messages)
+ self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
+ messages, handles)
+
+class _MMTPConnection(MMTPServer):
+ def __init__(self, config):
MMTPServer.__init__(self, config)
-
+
+ def connectQueues(self, incoming, outgoing)
+ self.incomingQueue = incoming
+ self.outgoingQueue = outgoing
+
def onMessageReceived(self, msg):
self.incomingQueue.queueMessage(msg)
def onMessageSent(self, msg, handle):
- self.outgoingQueue.remove(handle)
+ self.outgoingQueue.deliverySucceeded(handle)
-def runServer(config):
- s = ServerState(config)
- log = getLog()
- packetHandler = s.getPacketHandler()
- context = s.getTLSContext()
+ def onMessageUndeliverable(self, msg, handle, retriable):
+ self.outgoingQueue.deliveryFailed(handle, retriable)
- # XXXX Make these configurable; make mixing OO.
- mixInterval = 60
- mixPoolMinSize = 5
- mixPoolMaxRate = 0.5
- nextMixTime = time.time() + mixInterval
+class MixminionServer:
+ def __init__(self, config, keyring):
+ self.config = config
+ self.keyring = ServerKeyring(config)
+
+ self.packetHandler = self.keyring.getPacketHandler()
+ self.mmtpConnection = _MMTPConnection(config)
- server = _Server(config, s)
+ # FFFF Modulemanager should know about async so it can patch in if it
+ # FFFF needs to.
+ self.moduleManager = config.getModuleManager()
- incomingQueue = s.getIncomingQueue()
- outgoingQueue = s.getOutgoingQueue()
- mixQueue = s.getMixQueue()
- deliverMBOXQueue = s.getDeliverMBOXQueue()
- while 1: # Add shutdown mechanism XXXX
- server.process(1)
+ homeDir = config['Server']['Homedir']
+ queueDir = os.path.join(homeDir, 'work', 'queues')
- # Possible timing attack here????? XXXXX ????
- if incomingQueue.count():
- for h in incomingQueue.getAllMessages():
- msg = incomingQueue.messageContents(h)
- res = None
- try:
- res = packetHandler.processHandler(msg)
- if res is None: log.info("Padding message dropped")
- except miximinion.Packet.ParseError, e:
- log.warn("Malformed message dropped:"+str(e))
- except miximinion.Crypto.CryptoError, e:
- log.warn("Invalid PK or misencrypted packet header:"+str(e))
- except mixminion.PacketHandler.ContentError, e:
- log.warn("Discarding bad packet:"+str(e))
+ incomingDir = os.path.join(queueDir, "incoming")
+ self.incomingQueue = IncomingQueue(incomingDir, self.packetHandler)
- if res is not None:
- f, newHandle = mixQueue.openNewMessage()
- cPickle.dump(res, f, 1)
- mixQueue.finishMessage(f, newHandle)
- log.info("Message added to mix queue")
-
- incomingQueue.removeMessage(h)
+ mixDir = os.path.join(queueDir, "mix")
+ # FFFF The choice of mix algorithm should be configurable
+ self.mixQueue = MixQueue(TimedMixQueue(mixDir, 60))
+
+ outgoingDir = os.path.join(queueDir, "outgoing")
+ self.outgoingQueue = OutgoingQueue(outgoingDir)
+
+ self.incomingQueue.connectQueues(mixQueue=self.mixQueue)
+ self.mixQueue.connectQueues(outgoing=self.outgoingQueue,
+ manager=self.moduleManager)
+ self.outgoingQueue.connectQueues(server=self.mmtpConnection)
+ self.mmtpConnection.connectQueues(incoming=self.incomingQueue,
+ outgoing=self.outgoingQueue)
- if time.time() > nextMixTime:
- nextMixTime = time.time() + mixInterval
+ def run(self):
+ now = time.time()
+ nextMix = now + 60 # FFFF Configurable!
+ nextShred = now + 6000
+
+ while 1:
+ while time.time() < nextMix:
+ self.mmtpConnection.process(1)
+ self.incomingQueue.sendReadyMessages()
- poolSize = mixQueue.count()
- if poolSize > mixPoolMinSize:
- beginSending = {}
- n = min(poolSize-mixPoolMinSize, int(poolSize*mixPoolMaxRate))
- handles = mixQueue.pickRandom(n)
- for h in handles:
- f = mixQueue.openMessage(h)
- type, data = cPickle.load(f)
- f.close()
- if type == 'QUEUE':
- newHandle = mixQueue.moveMessage(h, outgoingQueue)
- ipv4info, payload = data
- beginSending.setdefault(ipv4info,[]).append(
- (newHandle, payload))
- else:
- assert type == 'EXIT'
- # XXXX Use modulemanager for this.
- rt, ri, appKey, payload = data
- if rt == Modules.DROP_TYPE:
- mixQueue.removeMessage(h)
- elif rt == Moddules.MBOX_TYPE:
- mixQueue.moveMessage(h, deliverMBOXQueue)
- else:
- # XXXX Shouldn't we drop stuff as early as possible,
- # XXXX so that we don't wind up with only a single
- # XXXX message sent out of the server?
- log.warn("Dropping unhandled type 0x%04x",rt)
- mixQueue.removeMessage(h)
+ self.mixQueue.mix()
+ self.outgoingQueue.sendReadyMessages()
+ self.moduleManager.sendReadyMessages()
+
+ now = time.time()
+ nextMix = now + 60
+ if now > nextShred:
+ # FFFF The shred interval should be configurable
+ self.incomingQueue.cleanQueue()
+ self.mixQueue.queue.cleanQueue()
+ self.outgoingQueue.cleanQueue()
+ self.moduleManager.cleanQueues()
+ nextShred = now + 6000
- if beginSending:
- # XXXX Handle timeouts; handle if we're already sending
- # XXXX to a server.
- for addr, messages in beginSending.items():
- handles, payloads = zip(*messages)
- server.sendMessages(addr.ip, addr.port, addr.keyinfo,
- payloads, handles)
-
- # XXXX Retry and resend after failure.
- # XXXX Scan for messages we should begin sending after restart.
# XXXX Remove long-undeliverable messages
- # XXXX Deliver MBOX messages.
+
Index: __init__.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/__init__.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- __init__.py 5 Jul 2002 19:50:27 -0000 1.4
+++ __init__.py 19 Aug 2002 15:33:56 -0000 1.5
@@ -23,4 +23,3 @@
import mixminion.Packet
import mixminion.ServerInfo
-
Index: benchmark.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/benchmark.py,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -d -r1.7 -r1.8
--- benchmark.py 25 Jun 2002 11:41:08 -0000 1.7
+++ benchmark.py 19 Aug 2002 15:33:56 -0000 1.8
@@ -12,6 +12,7 @@
"""
from time import time
+from mixminion.test import mix_mktemp
__pychecker__ = 'no-funcdoc no-reimport'
__all__ = [ 'timeAll', 'testLeaks1', 'testLeaks2' ]
@@ -94,6 +95,7 @@
def cryptoTiming():
print "#==================== CRYPTO ======================="
+
print "SHA1 (short)", timeit((lambda: sha1(short)), 100000)
print "SHA1 (64b)", timeit((lambda: sha1(s64b)), 100000)
print "SHA1 (2K)", timeit((lambda: sha1(s2K)), 10000)
@@ -153,6 +155,15 @@
print "aesprng.getInt (513)", \
timeit((lambda c=c: c.getInt(513)), 10000)
+ L10 = [ "x" ] * 10
+ L1000 = [ "x" ] * 1000
+ print "aesprng.shuffle (10/10)", \
+ timeit((lambda c=c,L=L10: c.shuffle(L)), 1000)
+ print "aesprng.shuffle (1000/1000)", \
+ timeit((lambda c=c,L=L1000: c.shuffle(L)), 30)
+ print "aesprng.shuffle (10/1000)", \
+ timeit((lambda c=c,L=L1000: c.shuffle(L,10)), 1000)
+
lkey = Keyset("keymaterial foo bar baz").getLionessKeys("T")
print "lioness E (1K)", timeit((
lambda lkey=lkey: lioness_encrypt(s1K, lkey)), 1000)
@@ -203,6 +214,7 @@
for (bits,it) in ((2048,10),(4096,10)):
t = time()
+ print "[generating key...]"
rsa2 = pk_generate(bits)
t = time()-t
print "RSA genrate (%d bit)"%bits, timestr(t)
@@ -216,14 +228,13 @@
print "Timing overhead: %s...%s" % (timestr(min(o)),timestr(max(o)))
#----------------------------------------------------------------------
-import tempfile
import os
import stat
def hashlogTiming():
print "#==================== HASH LOGS ======================="
for load in (100, 1000, 10000, 100000):
- fname = tempfile.mktemp(".db")
+ fname = mix_mktemp(".db")
try:
_hashlogTiming(fname,load)
finally:
@@ -278,16 +289,16 @@
#----------------------------------------------------------------------
from mixminion.BuildMessage import _buildHeader, buildForwardMessage
-from mixminion.ServerInfo import ServerInfo
+from mixminion.test import FakeServerInfo
def buildMessageTiming():
print "#================= BUILD MESSAGE ====================="
pk = pk_generate()
payload = ("Junky qoph flags vext crwd zimb."*1024)[:22*1024]
- serverinfo = [ServerInfo("127.0.0.1", 48099, pk_get_modulus(pk),"x"*20)
+ serverinfo = [FakeServerInfo("127.0.0.1", 48099, pk,"x"*20)
] * 16
-
+
def bh(np,it, serverinfo=serverinfo):
ctr = AESCounterPRNG()
@@ -330,8 +341,7 @@
print "#================= SERVER PROCESS ====================="
pk = pk_generate()
- n = pk_get_modulus(pk)
- server = ServerInfo("127.0.0.1", 1, n, "X"*20)
+ server = FakeServerInfo("127.0.0.1", 1, pk, "X"*20)
sp = PacketHandler(pk, DummyLog())
m_noswap = buildForwardMessage("Hello world", SMTP_TYPE, "f@invalid",
@@ -409,8 +419,7 @@
lambda s=s2K,k=lionesskey: lioness_encrypt(s,k)),1000)
prng_128b = timeit_((lambda k=aeskey: prng(k,128)),10000)
- n = pk_get_modulus(pk)
- server = ServerInfo("127.0.0.1", 1, n, "X"*20)
+ server = FakeServerInfo("127.0.0.1", 1, pk, "X"*20)
sp = PacketHandler(pk, DummyLog())
m_noswap = buildForwardMessage("Hello world", SMTP_TYPE, "f@invalid",
@@ -444,7 +453,7 @@
def fileOpsTiming():
print "#================= File ops ====================="
installSignalHandlers(child=1,hup=0,term=0)
- dname = tempfile.mktemp(".d")
+ dname = mix_mktemp(".d")
try:
os.mkdir(dname)
@@ -487,55 +496,56 @@
keytxt="a"*16
key = _ml.aes_key(keytxt)
while 1:
- if 1:
- _ml.sha1(s20k)
- _ml.aes_ctr128_crypt(key,s20k,0)
- _ml.aes_ctr128_crypt(key,s20k,2000)
- _ml.aes_ctr128_crypt(key,"",2000,20000)
- _ml.aes_ctr128_crypt(key,"",0,20000)
- _ml.aes_ctr128_crypt(key,s20k,0,2000)
- try:
- _ml.aes_ctr128_crypt("abc",s20k,0,2000)
- except:
- pass
- _ml.strxor(s20k,s20k)
- try:
- _ml.strxor(s20k,keytxt)
- except:
- pass
- _ml.openssl_seed(s20k)
- r = _ml.add_oaep_padding("Hello",OAEP_PARAMETER,128)
- _ml.check_oaep_padding(r,OAEP_PARAMETER,128)
- try:
- _ml.check_oaep_padding("hello",OAEP_PARAMETER,128)
- except:
- pass
- try:
- _ml.add_oaep_padding(s20k,OAEP_PARAMETER,128)
- except:
- pass
- try:
- _ml.add_oaep_padding("a"*127,OAEP_PARAMETER,128)
- except:
- pass
+ _ml.sha1(s20k)
+ _ml.aes_ctr128_crypt(key,s20k,0)
+ _ml.aes_ctr128_crypt(key,s20k,2000)
+ _ml.aes_ctr128_crypt(key,"",2000,20000)
+ _ml.aes_ctr128_crypt(key,"",0,20000)
+ _ml.aes_ctr128_crypt(key,s20k,0,2000)
+ try:
+ _ml.aes_ctr128_crypt("abc",s20k,0,2000)
+ except:
+ pass
+ _ml.strxor(s20k,s20k)
+ try:
+ _ml.strxor(s20k,keytxt)
+ except:
+ pass
+ _ml.openssl_seed(s20k)
+ r = _ml.add_oaep_padding("Hello",OAEP_PARAMETER,128)
+ _ml.check_oaep_padding(r,OAEP_PARAMETER,128)
+ try:
+ _ml.check_oaep_padding("hello",OAEP_PARAMETER,128)
+ except:
+ pass
+ try:
+ _ml.add_oaep_padding(s20k,OAEP_PARAMETER,128)
+ except:
+ pass
+ try:
+ _ml.add_oaep_padding("a"*127,OAEP_PARAMETER,128)
+ except:
+ pass
def testLeaks2():
print "Trying to leak (rsa)"
s20 = "a"*20
p = pk_generate(512)
- n,e = _ml.rsa_get_public_key(p)
+ n,e = p.get_public_key()
while 1:
if 1:
p = pk_generate(512)
pk_decrypt(pk_encrypt(s20,p),p)
for public in (0,1):
- x = _ml.rsa_encode_key(p,public)
+ x = p.encode_key(public)
_ml.rsa_decode_key(x,public)
- _ml.rsa_get_public_key(p)
+ p.get_public_key()
_ml.rsa_make_public_key(n,e)
+ # XXXX rest of rsa functionality
+
#----------------------------------------------------------------------
def timeAll():
@@ -550,3 +560,4 @@
timeAll()
#testLeaks1()
#testLeaks2()
+
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -d -r1.18 -r1.19
--- test.py 12 Aug 2002 21:05:50 -0000 1.18
+++ test.py 19 Aug 2002 15:33:56 -0000 1.19
@@ -17,11 +17,11 @@
import sys
import threading
import time
-import atexit
-import tempfile
import types
import re
import binascii
+import stat
+import cPickle
from mixminion.Common import MixError, MixFatalError, MixProtocolError, getLog
@@ -30,6 +30,69 @@
except ImportError:
import mixminion._unittest as unittest
+# Test for acceptable permissions and uid on directory?
+_MM_TESTING_TEMPDIR_PARANOIA = 1
+# Holds the path of the private temp directory once mix_mktemp has created it.
+_MM_TESTING_TEMPDIR = None
+_MM_TESTING_TEMPDIR_COUNTER = 0
+_MM_TESTING_TEMPDIR_REMOVE_ON_EXIT = 1
+def mix_mktemp(extra=""):
+ '''mktemp wrapper. puts all files under a securely mktemped
+ directory.'''
+ global _MM_TESTING_TEMPDIR
+ global _MM_TESTING_TEMPDIR_COUNTER
+ if _MM_TESTING_TEMPDIR is None:
+ import tempfile
+ temp = tempfile.mktemp()
+ paranoia = _MM_TESTING_TEMPDIR_PARANOIA
+ if paranoia and os.path.exists(temp):
+ print "I think somebody's trying to exploit mktemp."
+ sys.exit(1)
+ try:
+ os.mkdir(temp, 0700)
+ except OSError, e:
+ print "Something's up with mktemp: %s" % e
+ sys.exit(1)
+ if not os.path.exists(temp):
+ print "Couldn't create temp dir %r" %temp
+ sys.exit(1)
+ st = os.stat(temp)
+ if paranoia and st[stat.ST_MODE] & 077:
+ print "Couldn't create temp dir %r with secure permissions" %temp
+ sys.exit(1)
+ if paranoia and st[stat.ST_UID] != os.getuid():
+ print "The wrong user owns temp dir %r"%temp
+ sys.exit(1)
+ # XXXX If we're on a bad system, the permissions on /tmp
+ # XXXX might be faulty. We don't bother checking for that.
+ _MM_TESTING_TEMPDIR = temp
+ if _MM_TESTING_TEMPDIR_REMOVE_ON_EXIT:
+ import atexit
+ atexit.register(deltree, temp)
+
+ _MM_TESTING_TEMPDIR_COUNTER += 1
+ return os.path.join(_MM_TESTING_TEMPDIR,
+ "tmp%05d%s" % (_MM_TESTING_TEMPDIR_COUNTER,extra))
+
+_WAIT_FOR_KIDS = 1
+def deltree(*dirs):
+ global _WAIT_FOR_KIDS
+ if _WAIT_FOR_KIDS:
+ print "Waiting for shred processes to finish."
+ waitForChildren()
+ _WAIT_FOR_KIDS = 0
+ for d in dirs:
+ if os.path.isdir(d):
+ for fn in os.listdir(d):
+ loc = os.path.join(d,fn)
+ if os.path.isdir(loc):
+ deltree(loc)
+ else:
+ os.unlink(loc)
+ os.rmdir(d)
+ elif os.path.exists(d):
+ os.unlink(d)
+
def hexread(s):
assert (len(s) % 2) == 0
r = []
@@ -42,35 +105,6 @@
r.append(chr(c))
return "".join(r)
-def try_unlink(fnames):
- if isinstance(fnames, types.StringType):
- fnames = [fnames]
- for fname in fnames:
- try:
- if os.path.isdir(fname):
- try_unlink([os.path.join(fname,f) for f in os.listdir(fname)])
- os.rmdir(fname)
- else:
- os.unlink(fname)
- except OSError:
- pass
-
-def try_unlink_db(fname):
- '''Try to unlink an anydbm file(s)'''
- for suffix in ("", ".bak", ".dat", ".dir"):
- try_unlink(fname+suffix)
-
-_unlink_on_exit_list = []
-
-def unlink_db_on_exit(fname):
- for suffix in ("", ".bak", ".dat", ".dir"):
- _unlink_on_exit_list.append(fname+suffix)
-
-def unlink_on_exit(*files):
- _unlink_on_exit_list.extend(files)
-
-atexit.register(try_unlink, _unlink_on_exit_list)
-
def floatEq(f1,f2):
return abs(f1-f2) < .00001
@@ -199,7 +233,7 @@
check,x[:-1]+ch,"B",128)
def test_rsa(self):
- p = _ml.rsa_generate(1024, 65535)
+ p = _ml.rsa_generate(1024, 65537)
#for all of SIGN, CHECK_SIG, ENCRYPT, DECRYPT...
for pub1 in (0,1):
@@ -237,7 +271,7 @@
n,e = p.get_public_key()
p2 = _ml.rsa_make_public_key(n,e)
self.assertEquals((n,e), p2.get_public_key())
- self.assertEquals(65535,e)
+ self.assertEquals(65537,e)
self.assertEquals(p.encode_key(1), p.encode_key(1))
# Try private-key ops with public key p3.
@@ -253,11 +287,10 @@
self.failUnlessRaises(TypeError, p2.encode_key, 0)
self.failUnlessRaises(TypeError, p3.encode_key, 0)
- tf = tempfile.mktemp()
+ tf = mix_mktemp()
tf_pub = tf + "1"
tf_prv = tf + "2"
tf_enc = tf + "3"
- unlink_on_exit(tf_pub, tf_prv, tf_enc)
p.PEM_write_key(open(tf_pub,'w'), 1)
p.PEM_write_key(open(tf_prv,'w'), 0)
@@ -411,11 +444,12 @@
self.failUnless(0 <= PRNG.getInt(i) < i)
for i in xrange(100):
self.failUnless(0 <= PRNG.getFloat() < 1)
- self.failUnless(0 <= PRNG.getFloat(4) < 1)
- self.failUnless(0 <= PRNG.getFloat(5) < 1)
- # Make sure shuffle only shuffles the first n.
lst = range(100)
+ # Test shuffle(0)
+ self.assertEquals(PRNG.shuffle(lst,0), [])
+ self.assertEquals(lst, range(100))
+ # Make sure shuffle only shuffles the last n.
PRNG.shuffle(lst,10)
later = [ item for item in lst[10:] if item >= 10 ]
s = later[:]
@@ -432,6 +466,14 @@
for z in crossSection:
if z != crossSection[0]: allEq = 0
self.failIf(allEq)
+ foundUnmoved = 0
+ for lst in lists:
+ for inorder, shuffled in zip(lst, range(100)):
+ if inorder == shuffled:
+ foundUnmoved = 1
+ break
+ if foundUnmoved: break
+ self.failUnless(foundUnmoved)
for lst in lists:
s = lst[:]
s.sort()
@@ -589,8 +631,7 @@
class HashLogTests(unittest.TestCase):
def test_hashlog(self):
- fname = tempfile.mktemp(".db")
- unlink_db_on_exit(fname)
+ fname = mix_mktemp(".db")
h = [HashLog(fname, "Xyzzy")]
@@ -1018,12 +1059,10 @@
class PacketHandlerTests(unittest.TestCase):
def setUp(self):
from mixminion.PacketHandler import PacketHandler
- from tempfile import mktemp
self.pk1 = BMTSupport.pk1
self.pk2 = BMTSupport.pk2
self.pk3 = BMTSupport.pk3
- self.tmpfile = mktemp(".db")
- unlink_db_on_exit(self.tmpfile)
+ self.tmpfile = mix_mktemp(".db")
h = self.hlog = HashLog(self.tmpfile, "Z"*20)
self.server1 = FakeServerInfo("127.0.0.1", 1, self.pk1, "X"*20)
@@ -1231,29 +1270,22 @@
#----------------------------------------------------------------------
# QUEUE
-import stat
from mixminion.Common import waitForChildren
-from mixminion.Queue import Queue
+from mixminion.Queue import *
-def removeTempDirs(*dirs):
- print "Waiting for shred processes to finish."
- waitForChildren()
- for d in dirs:
- if os.path.isdir(d):
- for fn in os.listdir(d):
- os.unlink(os.path.join(d,fn))
- os.rmdir(d)
- elif os.path.exists(d):
- os.unlink(d)
+class TestDeliveryQueue(DeliveryQueue):
+ def __init__(self,d):
+ DeliveryQueue.__init__(self,d)
+ self._msgs = None
+ def deliverMessages(self, msgList):
+ self._msgs = msgList
class QueueTests(unittest.TestCase):
def setUp(self):
- import tempfile
mixminion.Common.installSignalHandlers(child=1,hup=0,term=0)
- self.d1 = tempfile.mktemp("q1")
- self.d2 = tempfile.mktemp("q2")
- self.d3 = tempfile.mktemp("q3")
- atexit.register(removeTempDirs, self.d1, self.d2, self.d3)
+ self.d1 = mix_mktemp("q1")
+ self.d2 = mix_mktemp("q2")
+ self.d3 = mix_mktemp("q3")
def testCreateQueue(self):
# Nonexistant dir.
@@ -1369,11 +1401,104 @@
self.assertEquals(queue1.count(), 41)
self.assert_(not os.path.exists(os.path.join(self.d2, "msg_"+h)))
+ # Test object functionality
+ obj = [ ("A pair of strings", "in a tuple in a list") ]
+ h1 = queue1.queueObject(obj)
+ h2 = queue1.queueObject(6060842)
+ self.assertEquals(obj, queue1.getObject(h1))
+ self.assertEquals(6060842, queue1.getObject(h2))
+ self.assertEquals(obj, cPickle.loads(queue1.messageContents(h1)))
+
# Scrub both queues.
queue1.removeAll()
queue2.removeAll()
queue1.cleanQueue()
queue2.cleanQueue()
+
+ def testDeliveryQueues(self):
+ d_d = mix_mktemp("qd")
+
+ queue = TestDeliveryQueue(d_d)
+
+ h1 = queue.queueMessage("Address 1", "Message 1")
+ h2 = queue.queueMessage("Address 2", "Message 2")
+ self.assertEquals((0, "Address 1", "Message 1"), queue.get(h1))
+ queue.sendReadyMessages()
+ msgs = queue._msgs
+ self.assertEquals(2, len(msgs))
+ self.failUnless((h1, "Address 1", "Message 1", 0) in msgs)
+ self.failUnless((h2, "Address 2", "Message 2", 0) in msgs)
+ h3 = queue.queueMessage("Address 3", "Message 3")
+ queue.deliverySucceeded(h1)
+ queue.sendReadyMessages()
+ msgs = queue._msgs
+ self.assertEquals([(h3, "Address 3", "Message 3", 0)], msgs)
+
+ allHandles = queue.getAllMessages()
+ allHandles.sort()
+ exHandles = [h2,h3]
+ exHandles.sort()
+ self.assertEquals(exHandles, allHandles)
+ queue.deliveryFailed(h2, retriable=1)
+ queue.deliveryFailed(h3, retriable=0)
+
+ allHandles = queue.getAllMessages()
+ h4 = allHandles[0]
+ self.assertEquals([h4], queue.getAllMessages())
+ queue.sendReadyMessages()
+ msgs = queue._msgs
+ self.assertEquals([(h4, "Address 2", "Message 2", 1)], msgs)
+ self.assertNotEquals(h2, h4)
+
+ queue.removeAll()
+ queue.cleanQueue()
+
+ def testDeliveryQueues(self):
+ d_m = mix_mktemp("qm")
+ queue = TimedMixQueue(d_m)
+ h1 = queue.queueMessage("Hello1")
+ h2 = queue.queueMessage("Hello2")
+ h3 = queue.queueMessage("Hello3")
+ b = queue.getBatch()
+ msgs = [h1,h2,h3]
+ msgs.sort()
+ b.sort()
+ self.assertEquals(msgs,b)
+
+ cmq = CottrellMixQueue(d_m, 600, 6, .5)
+ # Not enough messages
+ self.assertEquals([], cmq.getBatch())
+ self.assertEquals([], cmq.getBatch())
+ # 8 messages: 2 get sent
+ for i in range(5):
+ cmq.queueMessage("Message %s"%i)
+
+ b1, b2, b3 = cmq.getBatch(), cmq.getBatch(), cmq.getBatch()
+ self.assertEquals(2, len(b1))
+ self.assertEquals(2, len(b2))
+ self.assertEquals(2, len(b3))
+ allEq = 1
+ for x in xrange(13): #fails <one in a trillion
+ b = cmq.getBatch()
+ if b != b1:
+ allEq = 0; break
+ self.failIf(allEq)
+ # Don't send more than 3.
+ for x in xrange(100):
+ cmq.queueMessage("Hello2 %s"%x)
+ for x in xrange(10):
+ self.assertEquals(3, len(cmq.getBatch()))
+
+ bcmq = BinomialCottrellMixQueue(d_m, 600, 6, .5)
+ allThree = 1
+ for i in range(10):
+ b = bcmq.getBatch()
+ if not len(b)==3:
+ allThree = 0
+ self.failIf(allThree)
+
+ bcmq.removeAll()
+ bcmq.cleanQueue()
#---------------------------------------------------------------------
# LOGGING
@@ -1397,9 +1522,9 @@
self.failUnless(buf.getvalue().endswith(
"[ERROR] All your anonymity are belong to us\n"))
- t = tempfile.mktemp("log")
+ t = mix_mktemp("log")
t1 = t+"1"
- unlink_on_exit(t, t1)
+
log.addHandler(_FileLogHandler(t))
log.info("Abc")
log.info("Def")
@@ -1413,11 +1538,11 @@
#----------------------------------------------------------------------
# SIGHANDLERS
-# XXXX
+# FFFF Write tests here
#----------------------------------------------------------------------
# MMTP
-# XXXX Write more tests
+# FFFF Write more tests
import mixminion.MMTPServer
import mixminion.MMTPClient
@@ -1432,7 +1557,7 @@
global certfile
if isServer:
if dhfile is None:
- f = tempfile.mktemp()
+ f = mix_mktemp()
dhfile = f+"_dh"
pkfile = f+"_pk"
certfile = f+"_cert"
@@ -1448,13 +1573,11 @@
print "[Generating DH parameters (not caching)...",
sys.stdout.flush()
_ml.generate_dh_parameters(dhfile, 0)
- unlink_on_exit(dhfile)
print "done.]"
- pk = _ml.rsa_generate(1024, 65535)
+ pk = _ml.rsa_generate(1024, 65537)
pk.PEM_write_key(open(pkfile, 'w'), 0)
_ml.generate_cert(certfile, pk, "Testing certificate",
time.time(), time.time()+365*24*60*60)
- unlink_on_exit(certfile, pkfile)
pk = _ml.rsa_PEM_read_key(open(pkfile, 'r'), 0)
return _ml.TLSContext_new(certfile, pk, dhfile)
@@ -1583,7 +1706,7 @@
server.process(0.1)
t.join()
finally:
- getLog().setMinSeverity(severity) #unsuppress
+ getLog().setMinSeverity(severity) #unsuppress warning
#----------------------------------------------------------------------
# Config files
@@ -1669,8 +1792,7 @@
"Quz: 88 88\n\n[Sec3]\nIntAS: 9\nIntASD: 10\nIntAMD: 8\n"+
"IntAMD: 10\nIntRS: 5\n\n"))
# Test file input
- fn = tempfile.mktemp()
- unlink_on_exit(fn)
+ fn = mix_mktemp()
file = open(fn, 'w')
file.write(longerString)
@@ -1765,7 +1887,7 @@
self.assertEquals(pa("192.168.0.1",0),
("192.168.0.1", "255.255.255.255", 0, 65535))
- # XXXX Won't work on Windows.
+ # XXXX This won't work on Windows.
self.assertEquals(C._parseCommand("ls -l"), ("/bin/ls", ['-l']))
self.assertEquals(C._parseCommand("rm"), ("/bin/rm", []))
self.assertEquals(C._parseCommand("/bin/ls"), ("/bin/ls", []))
@@ -1821,7 +1943,7 @@
cmd, opts = C._parseCommand(nonexistcmd)
if os.path.exists(cmd):
# Ok, I guess they would.
- self.failUnlessEquals(opts, ["-meow"])
+ self.assertEquals(opts, ["-meow"])
else:
self.fail("_parseCommand is not working as expected")
except ConfigError, e:
@@ -1866,19 +1988,25 @@
Mode: relay
"""
+_IDENTITY_KEY = None
+def _getIdentityKey():
+ global _IDENTITY_KEY
+ if _IDENTITY_KEY is None:
+ _IDENTITY_KEY = mixminion.Crypto.pk_generate(2048)
+ return _IDENTITY_KEY
import mixminion.Config
import mixminion.ServerInfo
class ServerInfoTests(unittest.TestCase):
def testServerInfoGen(self):
- d = tempfile.mktemp()
+ identity = _getIdentityKey()
+ d = mix_mktemp()
conf = mixminion.Config.ServerConfig(string=SERVER_CONFIG)
- identity = mixminion.Crypto.pk_generate(2048)
if not os.path.exists(d):
os.mkdir(d, 0700)
- unlink_on_exit(d)
+
inf = mixminion.ServerInfo.generateServerDescriptorAndKeys(conf,
- identity,
+ identity,
d,
"key1",
d)
@@ -1913,7 +2041,7 @@
# Now make sure everything was saved properly
keydir = os.path.join(d, "key_key1")
eq(inf, open(os.path.join(keydir, "ServerDesc")).read())
- keys = mixminion.ServerInfo.ServerKeys(d, "key1", d)
+ keys = mixminion.ServerInfo.ServerKeyset(d, "key1", d)
packetKey = mixminion.Crypto.pk_PEM_load(
os.path.join(keydir, "mix.key"))
eq(packetKey.get_public_key(),
@@ -1958,18 +2086,169 @@
mixminion.ServerInfo.ServerInfo,
None, badSig)
-
-
+#----------------------------------------------------------------------
+# Modules and ModuleManager
+from mixminion.Modules import *
+
+# Text of an example module that the test writes to a temp dir and loads dynamically via ModulePath.
+EXAMPLE_MODULE_TEXT = \
+"""
+import mixminion.Modules
+from mixminion.Config import ConfigError
+
+class TestModule(mixminion.Modules.DeliveryModule):
+ def __init__(self):
+ self.processedMessages = []
+ def getName(self):
+ return "TestModule"
+ def getConfigSyntax(self):
+ return { "Example" : { "Foo" : ("REQUIRE",
+ mixminion.Config._parseInt, None) } }
+ def validateConfig(self, cfg, entries, lines, contents):
+ if cfg['Example'] is not None:
+ if cfg['Example'].get('Foo',1) % 2 == 0:
+ raise ConfigError("Foo was even")
+ def configure(self,cfg, manager):
+ if cfg['Example']:
+ self.enabled = 1
+ self.foo = cfg['Example'].get('Foo',1)
+ manager.enableModule(self)
+ else:
+ self.foo = None
+ self.enabled = 0
+ def getServerInfoBlock(self):
+ if self.enabled:
+ return "[Example]\\nFoo: %s\\n" % self.foo
+ else:
+ return None
+ def getExitTypes(self):
+ return (1234,)
+ def processMessage(self, message, exitType, exitInfo):
+ self.processedMessages.append(message)
+ if exitInfo == 'fail?':
+ return mixminion.Modules.DELIVER_FAIL_RETRY
+ elif exitInfo == 'fail!':
+ return mixminion.Modules.DELIVER_FAIL_NORETRY
+ else:
+ return mixminion.Modules.DELIVER_OK
+"""
+
+class ModuleManagerTests(unittest.TestCase):
+ def testModuleManager(self):
+ mod_dir = mix_mktemp()
+ home_dir = mix_mktemp()
+
+ os.mkdir(mod_dir, 0700)
+ f = open(os.path.join(mod_dir, "ExampleMod.py"), 'w')
+ f.write(EXAMPLE_MODULE_TEXT)
+ f.close()
+
+ cfg_test = SERVER_CONFIG_SHORT + """
+Homedir = %s
+ModulePath = %s
+Module ExampleMod.TestModule
+[Example]
+Foo: 99
+""" % (home_dir, mod_dir)
+
+ conf = mixminion.Config.ServerConfig(string=cfg_test)
+ manager = conf.getModuleManager()
+ exampleMod = None
+ for m in manager.modules:
+ if m.getName() == "TestModule":
+ exampleMod = m
+ self.failUnless(exampleMod is not None)
+ manager.configure(conf)
+
+ self.assertEquals(99, exampleMod.foo)
+ conf = mixminion.Config.ServerConfig(string=cfg_test)
+ manager = conf.getModuleManager()
+ exampleMod = None
+ for m in manager.modules:
+ if m.getName() == "TestModule":
+ exampleMod = m
+ self.failUnless(exampleMod is not None)
+
+ manager.configure(conf)
+ self.failUnless(exampleMod is manager.typeToModule[1234])
+
+ manager.queueMessage("Hello 1", 1234, "fail!")
+ manager.queueMessage("Hello 2", 1234, "fail?")
+ manager.queueMessage("Hello 3", 1234, "good")
+ manager.queueMessage("Drop very much",
+ mixminion.Modules.DROP_TYPE, "")
+ queue = manager.queues['TestModule']
+ self.failUnless(isinstance(queue,
+ mixminion.Modules._SimpleModuleDeliveryQueue))
+ self.assertEquals(3, queue.count())
+ self.assertEquals(exampleMod.processedMessages, [])
+ try:
+ severity = getLog().getMinSeverity()
+ getLog().setMinSeverity("FATAL") #suppress warning
+ manager.sendReadyMessages()
+ finally:
+ getLog().setMinSeverity(severity) #unsuppress warning
+ self.assertEquals(1, queue.count())
+ self.assertEquals(3, len(exampleMod.processedMessages))
+ manager.sendReadyMessages()
+ self.assertEquals(1, queue.count())
+ self.assertEquals(4, len(exampleMod.processedMessages))
+ self.assertEquals("Hello 2", exampleMod.processedMessages[-1])
+
+ # Check serverinfo generation.
+ try:
+ severity = getLog().getMinSeverity()
+ getLog().setMinSeverity("ERROR")
+ info = mixminion.ServerInfo.generateServerDescriptorAndKeys(
+ conf, _getIdentityKey(), home_dir, "key11", home_dir)
+ self.failUnless(info.find("\n[Example]\nFoo: 99\n") >= 0)
+ finally:
+ getLog().setMinSeverity(severity) #unsuppress warning
+
+ #
+ # Try again, this time with the test module disabled.
+ #
+ cfg_test = SERVER_CONFIG_SHORT + """
+Homedir = %s
+ModulePath = %s
+Module ExampleMod.TestModule
+""" % (home_dir, mod_dir)
+
+ conf = mixminion.Config.ServerConfig(string=cfg_test)
+ manager = conf.getModuleManager()
+ exampleMod = None
+ for m in manager.modules:
+ if m.getName() == "TestModule":
+ exampleMod = m
+ self.failUnless(exampleMod is not None)
+ manager.configure(conf)
+
+ self.failIf(exampleMod is manager.typeToModule.get(1234))
+
+ # Failing validation
+ cfg_test = SERVER_CONFIG_SHORT + """
+Homedir = %s
+ModulePath = %s
+Module ExampleMod.TestModule
+[Example]
+Foo: 100
+""" % (home_dir, mod_dir)
+
+ # FFFF Add tests for catching exceptions from buggy modules
+
+#----------------------------------------------------------------------
def testSuite():
suite = unittest.TestSuite()
loader = unittest.TestLoader()
tc = loader.loadTestsFromTestCase
+
suite.addTest(tc(MinionlibCryptoTests))
suite.addTest(tc(CryptoTests))
suite.addTest(tc(FormatTests))
suite.addTest(tc(LogTests))
suite.addTest(tc(ConfigFileTests))
suite.addTest(tc(ServerInfoTests))
+ suite.addTest(tc(ModuleManagerTests))
suite.addTest(tc(HashLogTests))
suite.addTest(tc(BuildMessageTests))
suite.addTest(tc(PacketHandlerTests))