[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] More work on server code. Clean up some unnecessary co...
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.seul.org:/tmp/cvs-serv15299/lib/mixminion
Modified Files:
Common.py Config.py Crypto.py MMTPServer.py Packet.py Queue.py
ServerMain.py test.py
Log Message:
More work on server code. Clean up some unnecessary complications in config system
Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -d -r1.10 -r1.11
--- Common.py 26 Jul 2002 15:47:20 -0000 1.10
+++ Common.py 11 Aug 2002 07:50:34 -0000 1.11
@@ -57,11 +57,11 @@
_SHRED_CMD = "---"
_SHRED_OPTS = None
-def _shredConfigHook():
+def configureShredCommand(conf):
+ """Initialize the secure delete command from a given Config object.
+ If no object is provided, try some sane defaults."""
global _SHRED_CMD
global _SHRED_OPTS
- import mixminion.Config as Config
- conf = Config.getConfig()
cmd, opts = None, None
if conf is not None:
val = conf['Host'].get('ShredCommand', None)
@@ -98,9 +98,7 @@
XXXX let's try to avoid that, to be on the safe side.
"""
if _SHRED_CMD == "---":
- import mixminion.Config as Config
- _shredConfigHook()
- Config.addHook(_shredConfigHook)
+ configureShredCommand(None)
if fnames == []:
return
@@ -125,13 +123,17 @@
# I'm trying to make this interface look like a subset of the one in
# the draft PEP-0282 (http://www.python.org/peps/pep-0282.html).
-#XXXX XXXX DOC DOC DOCDOC
-
def _logtime():
- #XXXX Is this guaranteed to work?
+ 'Helper function. Returns current local time formatted for log.'
+
+ # Note: Python strftime is implemented using that platform libc's
+ # strftime, so in theory, this might barf. All of the format
+ # elements below are (I think) standard, so we should be ok.
return time.strftime("%b %d %H:%m:%S")
-class FileLogTarget:
+class _FileLogHandler:
+ """Helper class for logging. Represents a file on disk, and allows the
+ usual close-and-open gimmick for log rotation."""
def __init__(self, fname):
self.file = None
self.fname = fname
@@ -139,14 +141,19 @@
def reset(self):
if self.file is not None:
self.file.close()
- # XXXX Fail sanely. :)
- self.file = open(self.fname, 'a')
+ try:
+ self.file = open(self.fname, 'a')
+ except OSError, e:
+ self.file = None
+ raise MixError("Unable to open log file %r"%self.fname)
def close(self):
self.file.close()
def write(self, severity, message):
+ if self.file is None:
+ return
print >> self.file, "%s [%s] %s" % (_logtime(), severity, message)
-class ConsoleLogTarget:
+class _ConsoleLogHandler:
def __init__(self, file):
self.file = file
def reset(self): pass
@@ -154,7 +161,6 @@
def write(self, severity, message):
print >> self.file, "%s [%s] %s" % (_logtime(), severity, message)
-
_SEVERITIES = { 'TRACE' : -2,
'DEBUG' : -1,
'INFO' : 0,
@@ -164,19 +170,17 @@
'NEVER' : 100}
class Log:
+ """A Log is a set of destinations for system messages, along with the
+ means to filter them to a desired verbosity."""
def __init__(self, minSeverity):
- self.handlers = []
- self.setMinSeverity(minSeverity)
- onReset(self.reset)
- onTerminate(self.close)
+ self.configure(None)
+ self.setMinSeverity(minSeverity)
- def _configure(self):
- import mixminion.Config as Config
- config = Config.getConfig()
+ def configure(self, config):
self.handlers = []
if config == None or not config.has_section("Server"):
self.setMinSeverity("WARN")
- self.addHandler(ConsoleLogTarget(sys.stderr))
+ self.addHandler(_ConsoleLogHandler(sys.stderr))
else:
self.setMinSeverity(config['Server'].get('LogLevel', "WARN"))
logfile = config['Server'].get('LogFile',None)
@@ -184,10 +188,14 @@
homedir = config['Server']['Homedir']
if homedir:
logfile = os.path.join(homedir, "log")
- if not logfile or config['Server'].get('EchoMessages',0):
- self.addHandler(ConsoleLogTarget(sys.stderr))
+ self.addHandler(_ConsoleLogHandler(sys.stderr))
if logfile:
- self.addHandler(FileLogTarget(logfile))
+ try:
+ self.addHandler(_FileLogHandler(logfile))
+ except MixError, e:
+ self.error(str(e))
+ if logfile and not config['Server'].get('EchoMessages',0):
+ del self.handlers[0]
def setMinSeverity(self, minSeverity):
self.severity = _SEVERITIES.get(minSeverity, 1)
@@ -202,9 +210,14 @@
self.handlers.append(handler)
def reset(self):
- # FFFF reload configuration information here?
for h in self.handlers:
- h.reset()
+ try:
+ h.reset()
+ except MixError, e:
+ if len(self.handlers) > 1:
+ self.error(str(e))
+ else:
+ print >>sys.stderr, "Unable to reset log system"
def close(self):
for h in self.handlers:
@@ -230,18 +243,14 @@
def fatal(self, message, *args):
self.log("FATAL", message, *args)
-_theLog = None
-
+_THE_LOG = None
def getLog():
"""Return the MixMinion log object."""
- global _theLog
- if _theLog is None:
- import mixminion.Config as Config
- _theLog = Log('DEBUG')
- _theLog._configure()
- Config.addHook(_theLog._configure)
-
- return _theLog
+ global _THE_LOG
+ if _THE_LOG is None:
+ _THE_LOG = Log('WARN')
+
+ return _THE_LOG
#----------------------------------------------------------------------
# Signal handling
Index: Config.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Config.py,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -d -r1.7 -r1.8
--- Config.py 6 Aug 2002 16:09:21 -0000 1.7
+++ Config.py 11 Aug 2002 07:50:34 -0000 1.8
@@ -46,7 +46,7 @@
The restricted format is used for server descriptors.
"""
-__all__ = [ 'getConfig', 'loadConfig', 'addHook' ]
+__all__ = [ 'getConfig', 'loadConfig' ]
import os
import re
@@ -80,26 +80,12 @@
assert fname is not None
_theConfiguration = ClientConfig(fname)
- mixminion.Common.onReset(_theConfiguration.reload)
-
def getConfig():
"""Return the configuration object for this process, or None if we haven't
been configured yet."""
return _theConfiguration
#----------------------------------------------------------------------
-_CONFIG_HOOKS = []
-def addHook(hook):
- '''Add 'hook' (a 0-argument function) to the list of configuration
- hooks. Whenever the configuration file is reloaded (as on a
- SIGHUP), it invokes each of the configuration hooks in the
- order it was added to the list.'''
- # This isn't a method of _Config, since we want to be able to call
- # it before we read the configuration file.
- _CONFIG_HOOKS.append(hook)
-
-#----------------------------------------------------------------------
-
class ConfigError(MixError):
"""Thrown when an error is found in a configuration file."""
pass
@@ -233,7 +219,7 @@
cmd, opts = c[0], c[1:]
if os.path.isabs(cmd):
if not os.path.exists(cmd):
- raise ConfigError("File not found: %s" % cmd)
+ raise ConfigError("Executable file not found: %s" % cmd)
else:
return cmd, opts
else:
@@ -300,7 +286,6 @@
(0 <= mm < 60) and (0 <= ss <= 61)):
raise ConfigError("Invalid %s %r" % (("date","time")[_timeMode],s))
-
# we set the DST flag to zero so that subtracting time.timezone always
# gives us gmt.
return time.mktime((yyyy,MM,dd,hh,mm,ss,0,0,0))-time.timezone
@@ -422,7 +407,10 @@
# _sections: A map from secname->key->value.
# _sectionEntries: A map from secname->[ (key, value) ] inorder.
# _sectionNames: An inorder list of secnames.
- # _callbacks: XXXX DOC
+ # _callbacks: A map from section name to a callback function that should
+ # be invoked with (section,sectionEntries) after each section is
+ # read. This shouldn't be used for validation; it's for code that
+ # needs to change the semantics of the parser.
#
# Fields to be set by a subclass:
# _syntax is map from sec->{key:
@@ -483,8 +471,6 @@
f = open(self.fname, 'r')
try:
self.__reload(f)
- for hook in _CONFIG_HOOKS:
- hook()
finally:
f.close()
@@ -604,6 +590,7 @@
self._sectionNames = self_sectionNames
def _addCallback(self, section, cb):
+ """For use by subclasses. Adds a callback for a section"""
if not hasattr(self, '_callbacks'):
self._callbacks = {}
self._callbacks[section] = cb
@@ -701,15 +688,20 @@
}
class ServerConfig(_ConfigFile):
+ ##
+ # Fields:
+ # moduleManager
+ #
_restrictFormat = 0
# XXXX Missing: Queue-Size / Queue config options
# XXXX timeout options
+ # XXXX listen timeout??
def __init__(self, fname=None, string=None):
self._syntax = SERVER_SYNTAX.copy()
import mixminion.Modules
self.moduleManager = mixminion.Modules.ModuleManager()
- self._addCallback("Server", self.loadModules)
+ self._addCallback("Server", self.__loadModules)
_ConfigFile.__init__(self, fname, string)
@@ -717,19 +709,19 @@
#XXXX write this.
self.moduleManager.validate(sections, entries, lines, contents)
- def loadModules(self, section, sectionEntries):
+ def __loadModules(self, section, sectionEntries):
+ """Callback from the [Server] section of a config file. Parses
+ the module options, and adds new sections to the syntax
+ accordingly."""
self.moduleManager.setPath(section.get('ModulePath', None))
for mod in section.get('Module', []):
+ info("Loading module %s", mod)
self.moduleManager.loadExtModule(mod)
self._syntax.update(self.moduleManager.getConfigSyntax())
-
-
## if sections['Server']['PublicKeyLifeTime'][2] < 24*60*60:
## raise ConfigError("PublicKeyLifetime must be at least 1 day.")
## elif sections['Server']['PublicKeyLifeTime'][2] % (24*60*60) > 30:
## getLog().warn("PublicKeyLifetime rounded to the nearest day")
## nDays = sections[60*60*24]
-
-
Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -d -r1.11 -r1.12
--- Crypto.py 28 Jul 2002 22:42:33 -0000 1.11
+++ Crypto.py 11 Aug 2002 07:50:34 -0000 1.12
@@ -9,6 +9,7 @@
functionality themselves."""
import os
+import sys
import stat
from types import StringType
@@ -35,14 +36,17 @@
# Number of bytes in a SHA1 digest
DIGEST_LEN = 160 >> 3
-def init_crypto():
+def init_crypto(config=None):
"""Initialize the crypto subsystem."""
+ configure_trng(config)
trng(1)
try:
# Try to read /dev/urandom
trng(1)
+ except MixFatalError, e:
+ raise
except:
- raise MixFatalError("Couldn't initialize entropy source")
+ raise MixFatalError("Error initializing entropy source")
openssl_seed(40)
def sha1(s):
@@ -453,40 +457,72 @@
_theSharedPRNG = AESCounterPRNG()
return _theSharedPRNG
+#----------------------------------------------------------------------
+# TRNG implementation
+
+# Here, we pick default files.
+#
+# This is a tricky point. We want a device that gives securely-seeded
+# numbers from a really strong entropy source, but we don't need it to
+# block. On Linux, this is /dev/urandom. On BSD-ish things, this
+# MAY be /dev/srandom (the man page only says that urandom 'is not
+# guaranteed to be secure). On Darwin, neither one seems to block.
+# On commercial Unices, your guess is as good as mine.
+PLATFORM_TRNG_DEFAULTS = {
+ 'darwin' : [ "/dev/urandom", "/dev/random" ],
+ 'linux2' : [ "/dev/urandom" ],
+ '***' : [ "/dev/urandom", "/dev/srandom", "/dev/random" ],
+ }
+
_TRNG_FILENAME = None
-def _trng_set_filename():
+def configure_trng(config):
+ """Initialize the true entropy source from a given Config object. If
+ none is provided, tries some sane defaults."""
global _TRNG_FILENAME
- config = mixminion.Config.getConfig()
if config is not None:
- file = config['Host'].get('EntropySource', "/dev/urandom")
+ requestedFile = config['Host'].get('EntropySource', None)
else:
- file = "/dev/urandom"
+ requestedFile = None
- if not os.path.exists(file):
- getLog().error("No such file as %s", file)
- file = None
- else:
- st = os.stat(file)
- if not (st[stat.ST_MODE] & stat.S_IFCHR):
- getLog().error("Entropy source %s isn't a character device", file)
- file = None
+ defaults = PLATFORM_TRNG_DEFAULTS.get(sys.platform,
+ PLATFORM_TRNG_DEFAULTS['***'])
+ files = [ requestedFile ] + defaults
- if file is None and _TRNG_FILENAME is None:
+ randFile = None
+ for file in files:
+ if file is None:
+ continue
+
+ verbose = 1#(file == requestedFile)
+ if not os.path.exists(file):
+ if verbose:
+ getLog().error("No such file as %s", file)
+ else:
+ st = os.stat(file)
+ if not (st[stat.ST_MODE] & stat.S_IFCHR):
+ if verbose:
+ getLog().error("Entropy source %s isn't a character device",
+ file)
+ else:
+ randFile = file
+ break
+
+ if randFile is None and _TRNG_FILENAME is None:
getLog().fatal("No entropy source available")
raise MixFatalError("No entropy source available")
- elif file is None:
+ elif randFile is None:
getLog().warn("Falling back to previous entropy source %s",
_TRNG_FILENAME)
else:
- _TRNG_FILENAME = file
+ getLog().info("Setting entropy source to %r", randFile)
+ _TRNG_FILENAME = randFile
def _trng_uncached(n):
'''Underlying access to our true entropy source.'''
if _TRNG_FILENAME is None:
- _trng_set_filename()
- mixminion.Config.addHook(_trng_set_filename)
+ configure_trng(None)
- f = open(_TRNG_FILENAME)
+ f = open(_TRNG_FILENAME, 'rb')
d = f.read(n)
f.close()
return d
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/MMTPServer.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- MMTPServer.py 6 Aug 2002 16:09:21 -0000 1.8
+++ MMTPServer.py 11 Aug 2002 07:50:34 -0000 1.9
@@ -323,10 +323,7 @@
while len(out):
r = self.__con.write(out) # may throw
- if r == 0:
- self.shutdown() #XXXX
- return
-
+ assert r > 0
out = out[r:]
self.__outbuf = out
@@ -359,7 +356,6 @@
self.__server.registerReader(self)
except _ml.TLSClosed:
warn("Unexpectedly closed connection")
-
self.__sock.close()
self.__server.unregister(self)
except _ml.TLSError:
@@ -488,9 +484,20 @@
#----------------------------------------------------------------------
+# XXXX retry logic.
class MMTPClientConnection(SimpleTLSConnection):
- def __init__(self, context, ip, port, keyID, messageList,
+ def __init__(self, context, ip, port, keyID, messageList, handleList,
sentCallback=None):
+ """Create a connection to send messages to an MMTP server.
+ ip -- The IP of the destination server.
+ port -- The port to connect to.
+ keyID -- None, or the expected SHA1 hash of the server's public key
+ messageList -- a list of message payloads to send
+ handleList -- a list of objects corresponding to the messages in
+ messageList. Used for callback.
+ sentCallback -- None, or a function of (msg, handle) to be called
+ whenever a message is successfully sent."""
+
trace("CLIENT CON")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
@@ -506,6 +513,7 @@
SimpleTLSConnection.__init__(self, sock, tls, 0)
self.messageList = messageList
+ self.handleList = handleList
self.finished = self.__setupFinished
self.sentCallback = sentCallback
@@ -581,38 +589,44 @@
debug("Received valid ACK for message.")
justSent = self.messageList[0]
+ justSentHandle = self.handleList[0]
del self.messageList[0]
+ del self.handleList[0]
if self.sentCallback is not None:
- self.sentCallback(justSent)
-
+ self.sentCallback(justSent, justSetHandle)
+
self.beginNextMessage()
+LISTEN_BACKLOG = 10 # ???? Is something else more reasonable
class MMTPServer(AsyncServer):
- "XXXX"
+ """A helper class to invoke AsyncServer, MMTPServerConnection, and
+ MMTPClientConnection"""
def __init__(self, config):
self.context = config.getTLSContext(server=1)
self.listener = ListenConnection("127.0.0.1",
- config['Outgoing/MMTP']['Port']
- 10, self._newMMTPConnection)
+ config['Outgoing/MMTP']['Port'],
+ LISTEN_BACKLOG,
+ self._newMMTPConnection)
self.config = config
self.listener.register(self)
def _newMMTPConnection(self, sock):
- "XXXX"
+ """helper method. Creates and registers a new server connection when
+ the listener socket gets a hit."""
# XXXX Check whether incoming IP is valid XXXX
tls = self.context.sock(sock, serverMode=1)
sock.setblocking(0)
con = MMTPServerConnection(sock, tls, self.onMessageReceived)
con.register(self)
- def sendMessages(self, ip, port, keyID, messages):
- con = MMTPClientConnection(ip, port, keyID, messages,
+ def sendMessages(self, ip, port, keyID, messages, handles):
+ """Send a set of messages to a given server."""
+ con = MMTPClientConnection(ip, port, keyID, messages, handles,
self.onMessageSent)
con.register(self)
def onMessageReceived(self, msg):
pass
- def onMessageSent(self, msg):
+ def onMessageSent(self, msg, handle):
pass
-
Index: Packet.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Packet.py,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -d -r1.6 -r1.7
--- Packet.py 6 Aug 2002 16:09:21 -0000 1.6
+++ Packet.py 11 Aug 2002 07:50:34 -0000 1.7
@@ -324,6 +324,13 @@
"""Return the routing info for this address"""
assert len(self.keyinfo) == DIGEST_LEN
return struct.pack(IPV4_PAT, _packIP(self.ip), self.port, self.keyinfo)
+
+ def __hash__(self):
+ return hash(self.pack())
+
+ def __eq__(self, other):
+ return (type(self) == type(other) and self.ip == other.ip and
+ self.port == other.port and self.keyinfo == other.keyinfo)
def parseSMTPInfo(s):
"""Convert the encoding of an SMTP routinginfo into an SMTPInfo object."""
Index: Queue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Queue.py,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -d -r1.7 -r1.8
--- Queue.py 6 Aug 2002 16:09:21 -0000 1.7
+++ Queue.py 11 Aug 2002 07:50:34 -0000 1.8
@@ -56,6 +56,8 @@
# Fields: rng--a random number generator for creating new messages
# and getting a random slice of the queue.
# dir--the location of the queue.
+ # n_entries: the number of complete messages in the queue.
+ # <0 if we haven't counted yet.
def __init__(self, location, create=0, scrub=0):
"""Creates a queue object for a given directory, 'location'. If
'create' is true, creates the directory if necessary. If 'scrub'
@@ -89,6 +91,9 @@
if scrub:
self.cleanQueue(1)
+ # Count messages on first time through.
+ self.n_entries = -1
+
def queueMessage(self, contents):
"""Creates a new message in the queue whose contents are 'contents',
and returns a handle to that message."""
@@ -97,13 +102,17 @@
self.finishMessage(f, handle)
return handle
- def count(self):
+ def count(self, recount=0):
"""Returns the number of complete messages in the queue."""
- res = 0
- for fn in os.listdir(self.dir):
- if fn.startswith("msg_"):
- res += 1
- return res
+ if self.n_entries >= 0 and not recount:
+ return self.n_entries
+ else:
+ res = 0
+ for fn in os.listdir(self.dir):
+ if fn.startswith("msg_"):
+ res += 1
+ self.n_entries = res
+ return res
def pickRandom(self, count=None):
"""Returns a list of 'count' handles to messages in this queue.
@@ -111,7 +120,6 @@
If there are fewer than 'count' messages in the queue, all the
messages will be retained."""
-
messages = [fn for fn in os.listdir(self.dir) if fn.startswith("msg_")]
n = len(messages)
@@ -130,6 +138,11 @@
return [m[4:] for m in messages[:count]]
+ def getAllMessages(self):
+ """Returns handles for all messages currently in the queue.
+ Note: this ordering is not guaranteed to be random"""
+ return [fn[4:] for fn in os.listdir(self.dir) if fn.startswith("msg_")]
+
def removeMessage(self, handle):
"""Given a handle, removes the corresponding message from the queue."""
self.__changeState(handle, "msg", "rmv")
@@ -140,9 +153,7 @@
for m in os.listdir(self.dir):
if m[:4] in ('inp_', 'msg_'):
self.__changeState(m[4:], m[:3], "rmv")
- #removed.append(os.path.join(self.dir, "rmv_"+m[4:]))
- # elif m[:4] == 'rmv_':
- # removed.append(self.dir)
+ self.n_entries = 0
self.cleanQueue()
def moveMessage(self, handle, queue):
@@ -175,7 +186,7 @@
handle = self.__newHandle()
fname = os.path.join(self.dir, "inp_"+handle)
fd = os.open(fname, _NEW_MESSAGE_MODE, 0600)
- return os.fdopen(fd, 'w'), handle
+ return os.fdopen(fd, 'wb'), handle
def finishMessage(self, f, handle):
"""Given a file and a corresponding handle, closes the file
@@ -227,9 +238,15 @@
def __changeState(self, handle, s1, s2):
"""Helper method: changes the state of message 'handle' from 's1'
- to 's2'."""
+ to 's2', and changes the internal count."""
os.rename(os.path.join(self.dir, s1+"_"+handle),
os.path.join(self.dir, s2+"_"+handle))
+ if self.n_entries < 0:
+ return
+ if s1 == 'msg' and s2 != 'msg':
+ self.n_entries -= 1
+ elif s1 != 'msg' and s2 == 'msg':
+ self.n_entries += 1
def __newHandle(self):
"""Helper method: creates a new random handle."""
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ServerMain.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -d -r1.1 -r1.2
--- ServerMain.py 6 Aug 2002 16:09:21 -0000 1.1
+++ ServerMain.py 11 Aug 2002 07:50:34 -0000 1.2
@@ -5,7 +5,9 @@
The main loop and related functionality for a Mixminion server
- BUG: No support for public key encryption"""
+ BUG: No support for encrypting private keys.n"""
+
+import cPickle
from mixminion.Common import getLog, MixFatalError, MixError
@@ -43,6 +45,7 @@
raise MixFatalError()
class ServerState:
+ # XXXX This should be refactored. keys should be separated from queues.
# config
# log
# homedir
@@ -153,7 +156,6 @@
def getDeliverMBOXQueue(self, which):
return self.deliverMBOXQueue
-
class _Server(MMTPServer):
def __init__(self, config, serverState):
@@ -164,27 +166,96 @@
def onMessageReceived(self, msg):
self.incomingQueue.queueMessage(msg)
- def onMessageSent(self, msg):
- self.outgoingQueue.remove
-
+ def onMessageSent(self, msg, handle):
+ self.outgoingQueue.remove(handle)
def runServer(config):
s = ServerState(config)
+ log = getLog()
packetHandler = s.getPacketHandler()
context = s.getTLSContext()
- shouldProcess = len(os.listdir(s.incomingDir))
- shouldSend = len(os.listdir(s.outgoingDir))
- shouldMBox = len(os.listdir(s.deliverMBOXDir))
# XXXX Make these configurable; make mixing OO.
mixInterval = 60
mixPoolMinSize = 5
- mixPoolMaxRate = 5
+ mixPoolMaxRate = 0.5
nextMixTime = time.time() + mixInterval
- server = mixminion.MMTPServer.MMTPServer(config)
+ server = _Server(config, s)
+
+ incomingQueue = s.getIncomingQueue()
+ outgoingQueue = s.getOutgoingQueue()
+ mixQueue = s.getMixQueue()
+ deliverMBOXQueue = s.getDeliverMBOXQueue()
+ while 1: # Add shutdown mechanism XXXX
+ server.process(1)
+
+ # Possible timing attack here????? XXXXX ????
+ if incomingQueue.count():
+ for h in incomingQueue.getAllMessages():
+ msg = incomingQueue.messageContents(h)
+ res = None
+ try:
+ res = packetHandler.processHandler(msg)
+ if res is None: log.info("Padding message dropped")
+ except miximinion.Packet.ParseError, e:
+ log.warn("Malformed message dropped:"+str(e))
+ except miximinion.Crypto.CryptoError, e:
+ log.warn("Invalid PK or misencrypted packet header:"+str(e))
+ except mixminion.PacketHandler.ContentError, e:
+ log.warn("Discarding bad packet:"+str(e))
+
+ if res is not None:
+ f, newHandle = mixQueue.openNewMessage()
+ cPickle.dump(res, f, 1)
+ mixQueue.finishMessage(f, newHandle)
+ log.info("Message added to mix queue")
+
+ incomingQueue.removeMessage(h)
+
+ if time.time() > nextMixTime:
+ nextMixTime = time.time() + mixInterval
+
+ poolSize = mixQueue.count()
+ if poolSize > mixPoolMinSize:
+ beginSending = {}
+ n = min(poolSize-mixPoolMinSize, int(poolSize*mixPoolMaxRate))
+ handles = mixQueue.pickRandom(n)
+ for h in handles:
+ f = mixQueue.openMessage(h)
+ type, data = cPickle.load(f)
+ f.close()
+ if type == 'QUEUE':
+ newHandle = mixQueue.moveMessage(h, outgoingQueue)
+ ipv4info, payload = data
+ beginSending.setdefault(ipv4info,[]).append(
+ (newHandle, payload))
+ else:
+ assert type == 'EXIT'
+ # XXXX Use modulemanager for this.
+ rt, ri, appKey, payload = data
+ if rt == Modules.DROP_TYPE:
+ mixQueue.removeMessage(h)
+ elif rt == Moddules.MBOX_TYPE:
+ mixQueue.moveMessage(h, deliverMBOXQueue)
+ else:
+ # XXXX Shouldn't we drop stuff as early as possible,
+ # XXXX so that we don't wind up with only a single
+ # XXXX message sent out of the server?
+ log.warn("Dropping unhandled type 0x%04x",rt)
+ mixQueue.removeMessage(h)
+
+ if beginSending:
+ # XXXX Handle timeouts; handle if we're already sending
+ # XXXX to a server.
+ for addr, messages in beginSending.items():
+ handles, payloads = zip(*messages)
+ server.sendMessages(addr.ip, addr.port, addr.keyinfo,
+ payloads, handles)
+
+ # XXXX Retry and resend after failure.
+ # XXXX Scan for messages we should begin sending after restart.
+ # XXXX Remove long-undeliverable messages
+ # XXXX Deliver MBOX messages.
- while 1:
-
-
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -d -r1.16 -r1.17
--- test.py 6 Aug 2002 16:09:21 -0000 1.16
+++ test.py 11 Aug 2002 07:50:34 -0000 1.17
@@ -1347,17 +1347,18 @@
queue1.cleanQueue()
queue2.cleanQueue()
-#----------------------------------------------------------------------
+#---------------------------------------------------------------------
# LOGGING
class LogTests(unittest.TestCase):
def testLogging(self):
import cStringIO
- from mixminion.Common import Log, FileLogTarget, ConsoleLogTarget
+ from mixminion.Common import Log, _FileLogHandler, _ConsoleLogHandler
log = Log("INFO")
self.assertEquals(log.getMinSeverity(), "INFO")
+ log.handlers = []
log.log("WARN", "This message should not appear")
buf = cStringIO.StringIO()
- log.addHandler(ConsoleLogTarget(buf))
+ log.addHandler(_ConsoleLogHandler(buf))
log.trace("Foo")
self.assertEquals(buf.getvalue(), "")
log.log("WARN", "Hello%sworld", ", ")
@@ -1371,7 +1372,7 @@
t = tempfile.mktemp("log")
t1 = t+"1"
unlink_on_exit(t, t1)
- log.addHandler(FileLogTarget(t))
+ log.addHandler(_FileLogHandler(t))
log.info("Abc")
log.info("Def")
os.rename(t,t1)
@@ -1514,7 +1515,8 @@
messages = ["helloxxx"*4096, "helloyyy"*4096]
async = mixminion.MMTPServer.AsyncServer()
clientcon = mixminion.MMTPServer.MMTPClientConnection(
- _getTLSContext(0), "127.0.0.1", TEST_PORT, keyid, messages[:], None)
+ _getTLSContext(0), "127.0.0.1", TEST_PORT, keyid, messages[:],
+ [None, None], None)
clientcon.register(async)
def clientThread(clientcon=clientcon, async=async):
while not clientcon.isShutdown():
@@ -1536,7 +1538,7 @@
# Again, with bad keyid.
clientcon = mixminion.MMTPServer.MMTPClientConnection(
_getTLSContext(0), "127.0.0.1", TEST_PORT, "Z"*20,
- messages[:], None)
+ messages[:], [None, None], None)
clientcon.register(async)
def clientThread(clientcon=clientcon, async=async):
while not clientcon.isShutdown():