[minion-cvs] Do not mix tabs and spaces in our source files; use spaces only
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv9270/lib/mixminion/server
Modified Files:
HashLog.py MMTPServer.py Modules.py PacketHandler.py Queue.py
ServerConfig.py ServerKeys.py ServerMain.py
Log Message:
Do not mix tabs and spaces in our source files; use spaces only.
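
A minimal sketch of the check this policy implies: walk a source tree and flag any line whose leading whitespace contains a tab. The directory walk and the .py filter here are illustrative assumptions, not part of this change.

import os

def find_tab_indentation(root):
    """Return (path, lineno) pairs for lines indented with a tab."""
    offenders = []
    for dirpath, _dirs, files in os.walk(root):
        for name in files:
            if not name.endswith(".py"):
                continue
            path = os.path.join(dirpath, name)
            f = open(path)
            try:
                for lineno, line in enumerate(f, 1):
                    indent = line[:len(line) - len(line.lstrip())]
                    if "\t" in indent:
                        offenders.append((path, lineno))
            finally:
                f.close()
    return offenders
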
Index: HashLog.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/HashLog.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- HashLog.py 12 Dec 2002 19:56:47 -0000 1.2
+++ HashLog.py 16 Dec 2002 02:40:11 -0000 1.3
@@ -57,9 +57,9 @@
"""Create a new HashLog to store data in 'filename' for the key
'keyid'."""
parent = os.path.split(filename)[0]
- createPrivateDir(parent)
+ createPrivateDir(parent)
self.log = anydbm.open(filename, 'c')
- LOG.debug("Opening database %s for packet digests", filename)
+ LOG.debug("Opening database %s for packet digests", filename)
if isinstance(self.log, dumbdbm._Database):
LOG.warn("Warning: logging packet digests to a flat file.")
try:
@@ -68,24 +68,24 @@
except KeyError:
self.log["KEYID"] = keyid
- self.journalFileName = filename+"_jrnl"
- self.journal = {}
- if os.path.exists(self.journalFileName):
- f = open(self.journalFileName, 'r')
- # FFFF deal with really big journals?
- j = f.read()
- for i in xrange(0, len(j), DIGEST_LEN):
- self.journal[j[i:i+DIGEST_LEN]] = 1
- f.close()
+ self.journalFileName = filename+"_jrnl"
+ self.journal = {}
+ if os.path.exists(self.journalFileName):
+ f = open(self.journalFileName, 'r')
+ # FFFF deal with really big journals?
+ j = f.read()
+ for i in xrange(0, len(j), DIGEST_LEN):
+ self.journal[j[i:i+DIGEST_LEN]] = 1
+ f.close()
- self.journalFile = os.open(self.journalFileName,
- _JOURNAL_OPEN_FLAGS|os.O_APPEND, 0600)
+ self.journalFile = os.open(self.journalFileName,
+ _JOURNAL_OPEN_FLAGS|os.O_APPEND, 0600)
def seenHash(self, hash):
"""Return true iff 'hash' has been logged before."""
try:
- if self.journal.get(hash,0):
- return 1
+ if self.journal.get(hash,0):
+ return 1
_ = self.log[hash]
return 1
except KeyError:
@@ -93,25 +93,23 @@
def logHash(self, hash):
"""Insert 'hash' into the database."""
- assert len(hash) == DIGEST_LEN
- self.journal[hash] = 1
- os.write(self.journalFile, hash)
+ assert len(hash) == DIGEST_LEN
+ self.journal[hash] = 1
+ os.write(self.journalFile, hash)
def sync(self):
"""Flushes changes to this log to the filesystem."""
- for hash in self.journal.keys():
- self.log[hash] = "1"
+ for hash in self.journal.keys():
+ self.log[hash] = "1"
if hasattr(self.log, "sync"):
self.log.sync()
- os.close(self.journalFile)
- self.journalFile = os.open(self.journalFileName,
- _JOURNAL_OPEN_FLAGS|os.O_TRUNC, 0600)
- self.journal = {}
+ os.close(self.journalFile)
+ self.journalFile = os.open(self.journalFileName,
+ _JOURNAL_OPEN_FLAGS|os.O_TRUNC, 0600)
+ self.journal = {}
def close(self):
"""Closes this log."""
self.sync()
self.log.close()
- os.close(self.journalFile)
-
-
+ os.close(self.journalFile)
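
For readers skimming the HashLog changes above: the class keeps an append-only journal file in front of the dbm so that digests logged between sync() calls survive a crash. A standalone sketch of that pattern, with illustrative names and a plain dict standing in for the dbm:

class TinyHashLog:
    """Journal new digests immediately; fold them into the main map on sync()."""
    def __init__(self, db, journal_path):
        self.db = db                      # stand-in for the anydbm database
        self.journal_path = journal_path
        self.journal = {}
        self.journal_file = open(journal_path, "ab")

    def seenHash(self, digest):
        return digest in self.journal or digest in self.db

    def logHash(self, digest):
        self.journal[digest] = 1
        self.journal_file.write(digest)   # appended before any sync happens
        self.journal_file.flush()

    def sync(self):
        for digest in self.journal:
            self.db[digest] = 1
        self.journal = {}
        self.journal_file.close()
        # reopen with truncation, mirroring the O_TRUNC reopen above
        self.journal_file = open(self.journal_path, "wb")
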
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- MMTPServer.py 15 Dec 2002 04:48:46 -0000 1.3
+++ MMTPServer.py 16 Dec 2002 02:40:11 -0000 1.4
@@ -69,9 +69,9 @@
"""
## trace("%s readers (%s), %s writers (%s)" % (len(self.readers),
-## readers,
-## len(self.writers),
-## writers))
+## readers,
+## len(self.writers),
+## writers))
readfds = self.readers.keys()
writefds = self.writers.keys()
@@ -95,14 +95,14 @@
if self.writers.has_key(fd): del self.writers[fd]
def hasReader(self, reader):
- """Return true iff 'reader' is a reader on this server."""
- fd = reader.fileno()
- return self.readers.get(fd, None) is reader
+ """Return true iff 'reader' is a reader on this server."""
+ fd = reader.fileno()
+ return self.readers.get(fd, None) is reader
def hasWriter(self, writer):
- """Return true iff 'writer' is a writer on this server."""
- fd = writer.fileno()
- return self.writers.get(fd, None) is writer
+ """Return true iff 'writer' is a writer on this server."""
+ fd = writer.fileno()
+ return self.writers.get(fd, None) is writer
def registerReader(self, reader):
"""Register a connection as a reader. The connection's 'handleRead'
@@ -238,7 +238,7 @@
"""
self.__sock = sock
self.__con = tls
- self.fd = self.__con.fileno()
+ self.fd = self.__con.fileno()
if serverMode:
self.__state = self.__acceptFn
@@ -305,26 +305,26 @@
def __shutdownFn(self):
"""Hook to implement shutdown."""
- # This is a bit subtle. The underlying 'shutdown' method
- # needs to be retried till the other guy sends an 'ack'
- # back... but we don't want to keep retrying indefinitely, or
- # else we can deadlock on a connection from ourself to
- # ourself.
- if self.__con.shutdown() == 1: #may throw want*
- trace("Got a 1 on shutdown (fd %s)", self.fd)
- self.__server.unregister(self)
- self.__state = None
- self.__sock.close()
- self.shutdownFinished()
- return
+ # This is a bit subtle. The underlying 'shutdown' method
+ # needs to be retried till the other guy sends an 'ack'
+ # back... but we don't want to keep retrying indefinitely, or
+ # else we can deadlock on a connection from ourself to
+ # ourself.
+ if self.__con.shutdown() == 1: #may throw want*
+ trace("Got a 1 on shutdown (fd %s)", self.fd)
+ self.__server.unregister(self)
+ self.__state = None
+ self.__sock.close()
+ self.shutdownFinished()
+ return
- # If we don't get any response on shutdown, stop blocking; the other
- # side may be hostile, confused, or deadlocking.
- trace("Got a 0 on shutdown (fd %s)", self.fd)
- # ???? Is 'wantread' always correct?
- # ???? Rather than waiting for a read, should we use a timer or
- # ???? something?
- raise _ml.TLSWantRead()
+ # If we don't get any response on shutdown, stop blocking; the other
+ # side may be hostile, confused, or deadlocking.
+ trace("Got a 0 on shutdown (fd %s)", self.fd)
+ # ???? Is 'wantread' always correct?
+ # ???? Rather than waiting for a read, should we use a timer or
+ # ???? something?
+ raise _ml.TLSWantRead()
def __readFn(self):
"""Hook to implement read"""
@@ -347,12 +347,12 @@
if self.__expectReadLen and self.__inbuflen > self.__expectReadLen:
warn("Protocol violation: too much data. Closing connection to %s",
- self.address)
+ self.address)
self.shutdown(err=1, retriable=0)
return
if self.__terminator and stringContains(self.__inbuf[0],
- self.__terminator):
+ self.__terminator):
trace("read found terminator (fd %s)", self.fd)
self.__server.unregister(self)
self.finished()
@@ -368,7 +368,7 @@
while len(out):
r = self.__con.write(out) # may throw
- assert r > 0
+ assert r > 0
out = out[r:]
self.__outbuf = out
@@ -393,7 +393,7 @@
# We have a while loop here so that, upon entering a new
# state, we immediately see if we can go anywhere with it
# without blocking.
- while self.__state is not None:
+ while self.__state is not None:
self.__state()
except _ml.TLSWantWrite:
self.__server.registerWriter(self)
@@ -401,17 +401,17 @@
self.__server.registerReader(self)
except _ml.TLSClosed:
warn("Unexpectedly closed connection to %s", self.address)
- self.handleFail(retriable=1)
+ self.handleFail(retriable=1)
self.__sock.close()
self.__server.unregister(self)
except _ml.TLSError:
if self.__state != self.__shutdownFn:
warn("Unexpected error: closing connection to %s",
- self.address)
+ self.address)
self.shutdown(err=1, retriable=1)
else:
warn("Error while shutting down: closing connection to %s",
- self.address)
+ self.address)
self.__server.unregister(self)
else:
# We are in no state at all.
@@ -427,8 +427,8 @@
def shutdown(self, err=0, retriable=0):
"""Begin a shutdown on this connection"""
- if err:
- self.handleFail(retriable)
+ if err:
+ self.handleFail(retriable)
self.__state = self.__shutdownFn
def fileno(self):
@@ -442,8 +442,8 @@
return self.__con.get_peer_cert_pk()
def handleFail(self, retriable=0):
- """Called when we shutdown with an error."""
- pass
+ """Called when we shutdown with an error."""
+ pass
#----------------------------------------------------------------------
# Implementation for MMTP.
@@ -471,9 +471,9 @@
# finished: callback when we're done with a read or write; see
# SimpleTLSConnection.
def __init__(self, sock, tls, consumer):
- """Create an MMTP connection to receive messages sent along a given
- socket. When valid packets are received, pass them to the
- function 'consumer'."""
+ """Create an MMTP connection to receive messages sent along a given
+ socket. When valid packets are received, pass them to the
+ function 'consumer'."""
SimpleTLSConnection.__init__(self, sock, tls, 1,
"%s:%s"%sock.getpeername())
self.messageConsumer = consumer
@@ -500,7 +500,7 @@
protocols = m.group(1).split(",")
if "0.1" not in protocols:
warn("Unsupported protocol list. Closing connection to %s",
- self.address)
+ self.address)
self.shutdown(err=1); return
else:
trace("protocol ok (fd %s)", self.fd)
@@ -530,17 +530,17 @@
replyDigest = sha1(msg+"RECEIVED")
else:
warn("Unrecognized command from %s. Closing connection.",
- self.address)
+ self.address)
self.shutdown(err=1)
return
if expectedDigest != digest:
warn("Invalid checksum from %s. Closing connection",
- self.address)
+ self.address)
self.shutdown(err=1)
return
else:
debug("%s packet received from %s; Checksum valid.",
- data[:4], self.address)
+ data[:4], self.address)
self.finished = self.__sentAck
self.beginWrite(RECEIVED_CONTROL+replyDigest)
self.messageConsumer(msg)
@@ -564,13 +564,13 @@
# As described in the docstring for __init__ below.
def __init__(self, context, ip, port, keyID, messageList, handleList,
sentCallback=None, failCallback=None):
- """Create a connection to send messages to an MMTP server.
- ip -- The IP of the destination server.
- port -- The port to connect to.
- keyID -- None, or the expected SHA1 hash of the server's public key
- messageList -- a list of message payloads to send
- handleList -- a list of objects corresponding to the messages in
- messageList. Used for callback.
+ """Create a connection to send messages to an MMTP server.
+ ip -- The IP of the destination server.
+ port -- The port to connect to.
+ keyID -- None, or the expected SHA1 hash of the server's public key
+ messageList -- a list of message payloads to send
+ handleList -- a list of objects corresponding to the messages in
+ messageList. Used for callback.
sentCallback -- None, or a function of (msg, handle) to be called
whenever a message is successfully sent.
failCallback -- None, or a function of (msg, handle, retriable)
@@ -590,10 +590,10 @@
SimpleTLSConnection.__init__(self, sock, tls, 0, "%s:%s"%(ip,port))
self.messageList = messageList
- self.handleList = handleList
+ self.handleList = handleList
self.finished = self.__setupFinished
self.sentCallback = sentCallback
- self.failCallback = failCallback
+ self.failCallback = failCallback
debug("Opening client connection (fd %s)", self.fd)
@@ -605,7 +605,7 @@
if self.keyID is not None:
if keyID != self.keyID:
warn("Got unexpected Key ID from %s", self.address)
- # This may work again in a couple of hours
+ # This may work again in a couple of hours
self.shutdown(err=1,retriable=1)
else:
debug("KeyID from %s is valid", self.address)
@@ -627,8 +627,8 @@
inp = self.getInput()
if inp != PROTOCOL_STRING:
warn("Invalid protocol. Closing connection to %s", self.address)
- # This isn't retriable; we don't talk to servers we don't
- # understand.
+ # This isn't retriable; we don't talk to servers we don't
+ # understand.
self.shutdown(err=1,retriable=0)
return
@@ -642,7 +642,7 @@
msg = self.messageList[0]
self.expectedDigest = sha1(msg+"RECEIVED")
msg = SEND_CONTROL+msg+sha1(msg+"SEND")
- assert len(msg) == SEND_RECORD_LEN
+ assert len(msg) == SEND_RECORD_LEN
self.beginWrite(msg)
self.finished = self.__sentMessage
@@ -668,8 +668,8 @@
# FFFF Rehandshake
inp = self.getInput()
if inp != (RECEIVED_CONTROL+self.expectedDigest):
- # We only get bad ACKs if an adversary somehow subverts TLS's
- # checksumming. That's not fixable.
+ # We only get bad ACKs if an adversary somehow subverts TLS's
+ # checksumming. That's not fixable.
self.shutdown(err=1,retriable=0)
return
@@ -684,10 +684,10 @@
self.beginNextMessage()
def handleFail(self, retriable):
- """Invoked when a message is not deliverable."""
- if self.failCallback is not None:
- for msg, handle in zip(self.messageList, self.handleList):
- self.failCallback(msg,handle,retriable)
+ """Invoked when a message is not deliverable."""
+ if self.failCallback is not None:
+ for msg, handle in zip(self.messageList, self.handleList):
+ self.failCallback(msg,handle,retriable)
LISTEN_BACKLOG = 10 # ???? Is something else more reasonable?
class MMTPAsyncServer(AsyncServer):
@@ -695,49 +695,49 @@
MMTPClientConnection, with a function to add new connections, and
callbacks for message success and failure."""
def __init__(self, config, tls):
- AsyncServer.__init__(self)
+ AsyncServer.__init__(self)
self.context = tls
- # FFFF Don't always listen; don't always retransmit!
- # FFFF Support listening on specific IPs
+ # FFFF Don't always listen; don't always retransmit!
+ # FFFF Support listening on specific IPs
self.listener = ListenConnection(config['Incoming/MMTP']['IP'],
config['Incoming/MMTP']['Port'],
- LISTEN_BACKLOG,
+ LISTEN_BACKLOG,
self._newMMTPConnection)
- #self.config = config
+ #self.config = config
self.listener.register(self)
def _newMMTPConnection(self, sock):
- """helper method. Creates and registers a new server connection when
- the listener socket gets a hit."""
+ """helper method. Creates and registers a new server connection when
+ the listener socket gets a hit."""
# FFFF Check whether incoming IP is allowed!
tls = self.context.sock(sock, serverMode=1)
sock.setblocking(0)
con = MMTPServerConnection(sock, tls, self.onMessageReceived)
con.register(self)
- return con
+ return con
def stopListening(self):
- self.listener.shutdown()
+ self.listener.shutdown()
def sendMessages(self, ip, port, keyID, messages, handles):
- """Begin sending a set of messages to a given server."""
- # ???? Can we remove these asserts yet?
- for m,h in zip(messages, handles):
- assert len(m) == MESSAGE_LEN
- assert len(h) < 32
+ """Begin sending a set of messages to a given server."""
+ # ???? Can we remove these asserts yet?
+ for m,h in zip(messages, handles):
+ assert len(m) == MESSAGE_LEN
+ assert len(h) < 32
con = MMTPClientConnection(self.context,
- ip, port, keyID, messages, handles,
+ ip, port, keyID, messages, handles,
self.onMessageSent,
- self.onMessageUndeliverable)
+ self.onMessageUndeliverable)
con.register(self)
def onMessageReceived(self, msg):
pass
def onMessageUndeliverable(self, msg, handle, retriable):
- pass
+ pass
def onMessageSent(self, msg, handle):
pass
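
The hasReader/hasWriter and register* methods touched above all assume fd-keyed dictionaries that feed select(); a stripped-down sketch of that event loop, without the TLS state machine (connection objects are assumed to expose fileno, handleRead, and handleWrite):

import select

class TinyAsyncServer:
    def __init__(self):
        self.readers = {}   # fd -> connection
        self.writers = {}   # fd -> connection

    def registerReader(self, conn):
        fd = conn.fileno()
        self.readers[fd] = conn
        self.writers.pop(fd, None)

    def registerWriter(self, conn):
        fd = conn.fileno()
        self.writers[fd] = conn
        self.readers.pop(fd, None)

    def process(self, timeout=1.0):
        if not self.readers and not self.writers:
            return
        rfds, wfds, _ = select.select(list(self.readers), list(self.writers),
                                      [], timeout)
        for fd in rfds:
            self.readers[fd].handleRead()
        for fd in wfds:
            self.writers[fd].handleWrite()
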
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- Modules.py 15 Dec 2002 04:35:55 -0000 1.3
+++ Modules.py 16 Dec 2002 02:40:11 -0000 1.4
@@ -10,8 +10,8 @@
# FFFF Maybe we should refactor MMTP delivery here too.
__all__ = [ 'ModuleManager', 'DeliveryModule',
- 'DELIVER_OK', 'DELIVER_FAIL_RETRY', 'DELIVER_FAIL_NORETRY'
- ]
+ 'DELIVER_OK', 'DELIVER_FAIL_RETRY', 'DELIVER_FAIL_NORETRY'
+ ]
import os
import re
@@ -43,66 +43,66 @@
validate its configuration, and configure itself.
* If it is advertisable, it must provide a server info block.
* It must know its own name.
- * It must know which types it handles.
- * Of course, it needs to know how to deliver a message."""
+ * It must know which types it handles.
+ * Of course, it needs to know how to deliver a message."""
# FFFF DeliveryModules need to know about the AsyncServer object in
# FFFF case they support asynchronous delivery.
def __init__(self):
- "Zero-argument constructor, as required by Module protocol."
- pass
+ "Zero-argument constructor, as required by Module protocol."
+ pass
def getConfigSyntax(self):
- """Return a map from section names to section syntax, as described
- in Config.py"""
+ """Return a map from section names to section syntax, as described
+ in Config.py"""
raise NotImplementedError("getConfigSyntax")
def validateConfig(self, sections, entries, lines, contents):
- """See mixminion.Config.validate"""
+ """See mixminion.Config.validate"""
pass
def configure(self, config, manager):
- """Configure this object using a given Config object, and (if
- required) register it with the module manager."""
+ """Configure this object using a given Config object, and (if
+ required) register it with the module manager."""
raise NotImplementedError("configure")
def getServerInfoBlock(self):
- """Return a block for inclusion in a server descriptor."""
+ """Return a block for inclusion in a server descriptor."""
raise NotImplementedError("getServerInfoBlock")
def getName(self):
- """Return the name of this module. This name may be used to construct
- directory paths, so it shouldn't contain any funny characters."""
+ """Return the name of this module. This name may be used to construct
+ directory paths, so it shouldn't contain any funny characters."""
raise NotImplementedError("getName")
def getExitTypes(self):
- """Return a sequence of numeric exit types that this module can
+ """Return a sequence of numeric exit types that this module can
handle."""
raise NotImplementedError("getExitTypes")
def createDeliveryQueue(self, queueDir):
- """Return a DeliveryQueue object suitable for delivering messages
- via this module. The default implementation returns a
- SimpleModuleDeliveryQueue, which (though adequate) doesn't
- batch messages intended for the same destination.
+ """Return a DeliveryQueue object suitable for delivering messages
+ via this module. The default implementation returns a
+ SimpleModuleDeliveryQueue, which (though adequate) doesn't
+ batch messages intended for the same destination.
- For the 'address' component of the delivery queue, modules must
- accept a tuple of: (exitType, address, tag). If 'tag' is None,
- the message has been decrypted; if 'tag' is 'err', the message is
- corrupt. Otherwise, the message is either a reply or an encrypted
- forward message
- """
+ For the 'address' component of the delivery queue, modules must
+ accept a tuple of: (exitType, address, tag). If 'tag' is None,
+ the message has been decrypted; if 'tag' is 'err', the message is
+ corrupt. Otherwise, the message is either a reply or an encrypted
+ forward message
+ """
return SimpleModuleDeliveryQueue(self, queueDir)
def processMessage(self, message, tag, exitType, exitInfo):
- """Given a message with a given exitType and exitInfo, try to deliver
+ """Given a message with a given exitType and exitInfo, try to deliver
it. 'tag' is as decribed in createDeliveryQueue. Return one of:
DELIVER_OK (if the message was successfully delivered),
- DELIVER_FAIL_RETRY (if the message wasn't delivered, but might be
+ DELIVER_FAIL_RETRY (if the message wasn't delivered, but might be
deliverable later), or
- DELIVER_FAIL_NORETRY (if the message shouldn't be tried later).
+ DELIVER_FAIL_NORETRY (if the message shouldn't be tried later).
- (This method is only used by your delivery queue; if you use
- a nonstandard delivery queue, you don't need to implement this."""
+ (This method is only used by your delivery queue; if you use
+ a nonstandard delivery queue, you don't need to implement this."""
raise NotImplementedError("processMessage")
class ImmediateDeliveryQueue:
@@ -112,26 +112,26 @@
##Fields:
# module: the underlying DeliveryModule object.
def __init__(self, module):
- self.module = module
+ self.module = module
def queueDeliveryMessage(self, (exitType, address, tag), message):
- """Instead of queueing our message, pass it directly to the underlying
- DeliveryModule."""
- try:
- res = self.module.processMessage(message, tag, exitType, address)
- if res == DELIVER_OK:
- return
- elif res == DELIVER_FAIL_RETRY:
- LOG.error("Unable to retry delivery for message")
- else:
- LOG.error("Unable to deliver message")
- except:
- LOG.error_exc(sys.exc_info(),
- "Exception delivering message")
+ """Instead of queueing our message, pass it directly to the underlying
+ DeliveryModule."""
+ try:
+ res = self.module.processMessage(message, tag, exitType, address)
+ if res == DELIVER_OK:
+ return
+ elif res == DELIVER_FAIL_RETRY:
+ LOG.error("Unable to retry delivery for message")
+ else:
+ LOG.error("Unable to deliver message")
+ except:
+ LOG.error_exc(sys.exc_info(),
+ "Exception delivering message")
def sendReadyMessages(self):
- # We do nothing here; we already delivered the messages
- pass
+ # We do nothing here; we already delivered the messages
+ pass
class SimpleModuleDeliveryQueue(mixminion.server.Queue.DeliveryQueue):
"""Helper class used as a default delivery queue for modules that
@@ -139,25 +139,25 @@
## Fields:
# module: the underlying module.
def __init__(self, module, directory):
- mixminion.server.Queue.DeliveryQueue.__init__(self, directory)
- self.module = module
+ mixminion.server.Queue.DeliveryQueue.__init__(self, directory)
+ self.module = module
def _deliverMessages(self, msgList):
- for handle, addr, message, n_retries in msgList:
- try:
- exitType, address, tag = addr
- result = self.module.processMessage(message,tag,exitType,address)
- if result == DELIVER_OK:
- self.deliverySucceeded(handle)
- elif result == DELIVER_FAIL_RETRY:
- self.deliveryFailed(handle, 1)
- else:
- LOG.error("Unable to deliver message")
- self.deliveryFailed(handle, 0)
- except:
- LOG.error_exc(sys.exc_info(),
- "Exception delivering message")
- self.deliveryFailed(handle, 0)
+ for handle, addr, message, n_retries in msgList:
+ try:
+ exitType, address, tag = addr
+ result = self.module.processMessage(message,tag,exitType,address)
+ if result == DELIVER_OK:
+ self.deliverySucceeded(handle)
+ elif result == DELIVER_FAIL_RETRY:
+ self.deliveryFailed(handle, 1)
+ else:
+ LOG.error("Unable to deliver message")
+ self.deliveryFailed(handle, 0)
+ except:
+ LOG.error_exc(sys.exc_info(),
+ "Exception delivering message")
+ self.deliveryFailed(handle, 0)
class ModuleManager:
"""A ModuleManager knows about all of the server modules in the system.
@@ -190,16 +190,16 @@
# called?
def __init__(self):
- "Create a new ModuleManager"
+ "Create a new ModuleManager"
self.syntax = {}
self.modules = []
self.enabled = {}
- self.nameToModule = {}
+ self.nameToModule = {}
self.typeToModule = {}
- self.path = []
- self.queueRoot = None
- self.queues = {}
+ self.path = []
+ self.queueRoot = None
+ self.queues = {}
self.registerModule(MBoxModule())
self.registerModule(DropModule())
@@ -212,18 +212,18 @@
return self._isConfigured
def _setQueueRoot(self, queueRoot):
- """Sets a directory under which all modules' queue directories
- should go."""
+ """Sets a directory under which all modules' queue directories
+ should go."""
self.queueRoot = queueRoot
def getConfigSyntax(self):
- """Returns a dict to extend the syntax configuration in a Config
- object. Should be called after all modules are registered."""
+ """Returns a dict to extend the syntax configuration in a Config
+ object. Should be called after all modules are registered."""
return self.syntax
def registerModule(self, module):
- """Inform this ModuleManager about a delivery module. This method
- updates the syntax options, but does not enable the module."""
+ """Inform this ModuleManager about a delivery module. This method
+ updates the syntax options, but does not enable the module."""
LOG.info("Loading module %s", module.getName())
self.modules.append(module)
syn = module.getConfigSyntax()
@@ -231,56 +231,56 @@
if self.syntax.has_key(sec):
raise ConfigError("Multiple modules want to define [%s]"% sec)
self.syntax.update(syn)
- self.nameToModule[module.getName()] = module
+ self.nameToModule[module.getName()] = module
def setPath(self, path):
- """Sets the search path for Python modules"""
- if path:
- self.path = path.split(":")
- else:
- self.path = []
+ """Sets the search path for Python modules"""
+ if path:
+ self.path = path.split(":")
+ else:
+ self.path = []
def loadExtModule(self, className):
- """Load and register a module from a python file. Takes a classname
+ """Load and register a module from a python file. Takes a classname
of the format module.Class or package.module.Class. Raises
- MixError if the module can't be loaded."""
+ MixError if the module can't be loaded."""
ids = className.split(".")
pyPkg = ".".join(ids[:-1])
pyClassName = ids[-1]
- orig_path = sys.path[:]
- LOG.info("Loading module %s", className)
+ orig_path = sys.path[:]
+ LOG.info("Loading module %s", className)
try:
- sys.path[0:0] = self.path
- try:
- m = __import__(pyPkg, {}, {}, [pyClassName])
- except ImportError, e:
- raise MixError("%s while importing %s" %(str(e),className))
+ sys.path[0:0] = self.path
+ try:
+ m = __import__(pyPkg, {}, {}, [pyClassName])
+ except ImportError, e:
+ raise MixError("%s while importing %s" %(str(e),className))
finally:
sys.path = orig_path
- try:
- pyClass = getattr(m, pyClassName)
- except AttributeError, e:
- raise MixError("No class %s in module %s" %(pyClassName,pyPkg))
- try:
- self.registerModule(pyClass())
- except Exception, e:
- raise MixError("Error initializing module %s" %className)
+ try:
+ pyClass = getattr(m, pyClassName)
+ except AttributeError, e:
+ raise MixError("No class %s in module %s" %(pyClassName,pyPkg))
+ try:
+ self.registerModule(pyClass())
+ except Exception, e:
+ raise MixError("Error initializing module %s" %className)
def validate(self, sections, entries, lines, contents):
- # (As in ServerConfig)
+ # (As in ServerConfig)
for m in self.modules:
m.validateConfig(sections, entries, lines, contents)
def configure(self, config):
- self._setQueueRoot(os.path.join(config['Server']['Homedir'],
- 'work', 'queues', 'deliver'))
- createPrivateDir(self.queueRoot)
+ self._setQueueRoot(os.path.join(config['Server']['Homedir'],
+ 'work', 'queues', 'deliver'))
+ createPrivateDir(self.queueRoot)
for m in self.modules:
m.configure(config, self)
self._isConfigured = 1
def enableModule(self, module):
- """Sets up the module manager to deliver all messages whose exitTypes
+ """Sets up the module manager to deliver all messages whose exitTypes
are returned by <module>.getExitTypes() to the module."""
for t in module.getExitTypes():
if (self.typeToModule.has_key(t) and
@@ -292,54 +292,54 @@
module.getName(),
map(hex, module.getExitTypes()))
- queueDir = os.path.join(self.queueRoot, module.getName())
- queue = module.createDeliveryQueue(queueDir)
- self.queues[module.getName()] = queue
+ queueDir = os.path.join(self.queueRoot, module.getName())
+ queue = module.createDeliveryQueue(queueDir)
+ self.queues[module.getName()] = queue
self.enabled[module.getName()] = 1
def cleanQueues(self):
- """Remove trash messages from all internal queues."""
- for queue in self.queues.values():
- queue.cleanQueue()
+ """Remove trash messages from all internal queues."""
+ for queue in self.queues.values():
+ queue.cleanQueue()
def disableModule(self, module):
- """Unmaps all the types for a module object."""
+ """Unmaps all the types for a module object."""
LOG.info("Disabling module %s", module.getName())
for t in module.getExitTypes():
if (self.typeToModule.has_key(t) and
self.typeToModule[t].getName() == module.getName()):
del self.typeToModule[t]
- if self.queues.has_key(module.getName()):
- del self.queues[module.getName()]
+ if self.queues.has_key(module.getName()):
+ del self.queues[module.getName()]
if self.enabled.has_key(module.getName()):
del self.enabled[module.getName()]
def queueMessage(self, message, tag, exitType, address):
- """Queue a message for delivery."""
- # FFFF Support non-exit messages.
+ """Queue a message for delivery."""
+ # FFFF Support non-exit messages.
mod = self.typeToModule.get(exitType, None)
if mod is None:
LOG.error("Unable to handle message with unknown type %s",
exitType)
- return
- queue = self.queues[mod.getName()]
- LOG.debug("Delivering message %r (type %04x) via module %s",
- message[:8], exitType, mod.getName())
- try:
- payload = mixminion.BuildMessage.decodePayload(message, tag)
- except MixError:
- queue.queueDeliveryMessage((exitType, address, 'err'), message)
- return
- if payload is None:
- # enrypted message
- queue.queueDeliveryMessage((exitType, address, tag), message)
- else:
- # forward message
- queue.queueDeliveryMessage((exitType, address, None), payload)
+ return
+ queue = self.queues[mod.getName()]
+ LOG.debug("Delivering message %r (type %04x) via module %s",
+ message[:8], exitType, mod.getName())
+ try:
+ payload = mixminion.BuildMessage.decodePayload(message, tag)
+ except MixError:
+ queue.queueDeliveryMessage((exitType, address, 'err'), message)
+ return
+ if payload is None:
+ # enrypted message
+ queue.queueDeliveryMessage((exitType, address, tag), message)
+ else:
+ # forward message
+ queue.queueDeliveryMessage((exitType, address, None), payload)
def sendReadyMessages(self):
- for name, queue in self.queues.items():
- queue.sendReadyMessages()
+ for name, queue in self.queues.items():
+ queue.sendReadyMessages()
def getServerInfoBlocks(self):
return [ m.getServerInfoBlock() for m in self.modules
@@ -353,13 +353,13 @@
def getServerInfoBlock(self):
return ""
def configure(self, config, manager):
- manager.enableModule(self)
+ manager.enableModule(self)
def getName(self):
return "DROP"
def getExitTypes(self):
return [ mixminion.Packet.DROP_TYPE ]
def createDeliveryQueue(self, directory):
- return ImmediateDeliveryQueue(self)
+ return ImmediateDeliveryQueue(self)
def processMessage(self, message, tag, exitType, exitInfo):
LOG.debug("Dropping padding message")
return DELIVER_OK
@@ -388,8 +388,8 @@
self.addresses = {}
def getConfigSyntax(self):
- # FFFF There should be some way to say that fields are required
- # FFFF if the module is enabled.
+ # FFFF There should be some way to say that fields are required
+ # FFFF if the module is enabled.
return { "Delivery/MBOX" :
{ 'Enabled' : ('REQUIRE', _parseBoolean, "no"),
'AddressFile' : ('ALLOW', None, None),
@@ -399,66 +399,66 @@
}
def validateConfig(self, sections, entries, lines, contents):
- sec = sections['Delivery/MBOX']
- if not sec.get('Enabled'):
- return
- for field in ['AddressFile', 'ReturnAddress', 'RemoveContact',
- 'SMTPServer']:
- if not sec.get(field):
- raise ConfigError("Missing field %s in [Delivery/MBOX]"%field)
- if not os.path.exists(sec['AddressFile']):
- raise ConfigError("Address file %s seems not to exist."%
- sec['AddresFile'])
- for field in ['ReturnAddress', 'RemoveContact']:
- if not isSMTPMailbox(sec[field]):
- LOG.warn("Value of %s (%s) doesn't look like an email address",
- field, sec[field])
+ sec = sections['Delivery/MBOX']
+ if not sec.get('Enabled'):
+ return
+ for field in ['AddressFile', 'ReturnAddress', 'RemoveContact',
+ 'SMTPServer']:
+ if not sec.get(field):
+ raise ConfigError("Missing field %s in [Delivery/MBOX]"%field)
+ if not os.path.exists(sec['AddressFile']):
+ raise ConfigError("Address file %s seems not to exist."%
+ sec['AddresFile'])
+ for field in ['ReturnAddress', 'RemoveContact']:
+ if not isSMTPMailbox(sec[field]):
+ LOG.warn("Value of %s (%s) doesn't look like an email address",
+ field, sec[field])
def configure(self, config, moduleManager):
- if not config['Delivery/MBOX'].get("Enabled", 0):
- moduleManager.disableModule(self)
- return
+ if not config['Delivery/MBOX'].get("Enabled", 0):
+ moduleManager.disableModule(self)
+ return
- sec = config['Delivery/MBOX']
- self.server = sec['SMTPServer']
- self.addressFile = sec['AddressFile']
- self.returnAddress = sec['ReturnAddress']
- self.contact = sec['RemoveContact']
- # validate should have caught these.
- assert (self.server and self.addressFile and self.returnAddress
- and self.contact)
+ sec = config['Delivery/MBOX']
+ self.server = sec['SMTPServer']
+ self.addressFile = sec['AddressFile']
+ self.returnAddress = sec['ReturnAddress']
+ self.contact = sec['RemoveContact']
+ # validate should have caught these.
+ assert (self.server and self.addressFile and self.returnAddress
+ and self.contact)
self.nickname = config['Server']['Nickname']
if not self.nickname:
self.nickname = socket.gethostname()
self.addr = config['Incoming/MMTP'].get('IP', "<Unknown IP>")
- # Parse the address file.
- self.addresses = {}
+ # Parse the address file.
+ self.addresses = {}
f = open(self.addressFile)
- address_line_re = re.compile(r'\s*([^\s:=]+)\s*[:=]\s*(\S+)')
- try:
- lineno = 0
- while 1:
+ address_line_re = re.compile(r'\s*([^\s:=]+)\s*[:=]\s*(\S+)')
+ try:
+ lineno = 0
+ while 1:
line = f.readline()
if not line:
break
- line = line.strip()
- lineno += 1
- if line == '' or line[0] == '#':
- continue
- m = address_line_re.match(line)
- if not m:
- raise ConfigError("Bad address on line %s of %s"%(
- lineno,self.addressFile))
- self.addresses[m.group(1)] = m.group(2)
- LOG.trace("Mapping MBOX address %s -> %s", m.group(1),
- m.group(2))
- finally:
- f.close()
+ line = line.strip()
+ lineno += 1
+ if line == '' or line[0] == '#':
+ continue
+ m = address_line_re.match(line)
+ if not m:
+ raise ConfigError("Bad address on line %s of %s"%(
+ lineno,self.addressFile))
+ self.addresses[m.group(1)] = m.group(2)
+ LOG.trace("Mapping MBOX address %s -> %s", m.group(1),
+ m.group(2))
+ finally:
+ f.close()
- moduleManager.enableModule(self)
+ moduleManager.enableModule(self)
def getServerInfoBlock(self):
return """\
@@ -473,20 +473,20 @@
return [ mixminion.Packet.MBOX_TYPE ]
def processMessage(self, message, tag, exitType, address):
- # Determine that message's address;
+ # Determine that message's address;
assert exitType == mixminion.Packet.MBOX_TYPE
LOG.trace("Received MBOX message")
info = mixminion.Packet.parseMBOXInfo(address)
- try:
- address = self.addresses[info.user]
- except KeyError:
+ try:
+ address = self.addresses[info.user]
+ except KeyError:
LOG.error("Unknown MBOX user %r", info.user)
- return DELIVER_FAIL_NORETRY
+ return DELIVER_FAIL_NORETRY
- # Escape the message if it isn't plaintext ascii
+ # Escape the message if it isn't plaintext ascii
msg = _escapeMessageForEmail(message, tag)
- # Generate the boilerplate (FFFF Make this configurable)
+ # Generate the boilerplate (FFFF Make this configurable)
fields = { 'user': address,
'return': self.returnAddress,
'nickname': self.nickname,
@@ -515,7 +515,7 @@
def __init__(self):
DeliveryModule.__init__(self)
def getServerInfoBlock(self):
- return "[Delivery/SMTP]\nVersion: 0.1\n"
+ return "[Delivery/SMTP]\nVersion: 0.1\n"
def getName(self):
return "SMTP"
def getExitTypes(self):
@@ -552,12 +552,12 @@
}
def validateConfig(self, sections, entries, lines, contents):
- # Currently, we accept any configuration options that the config allows
+ # Currently, we accept any configuration options that the config allows
pass
def configure(self, config, manager):
sec = config['Delivery/SMTP-Via-Mixmaster']
- if not sec.get("Enabled", 0):
+ if not sec.get("Enabled", 0):
manager.disableModule(self)
return
cmd = sec['MixCommand']
@@ -565,23 +565,23 @@
self.subject = sec['SubjectLine']
self.command = cmd[0]
self.options = tuple(cmd[1]) + ("-l", self.server,
- "-s", self.subject)
+ "-s", self.subject)
manager.enableModule(self)
def getName(self):
return "SMTP_MIX2"
def createDeliveryQueue(self, queueDir):
- # We create a temporary queue so we can hold files there for a little
- # while before passing their names to mixmaster.
+ # We create a temporary queue so we can hold files there for a little
+ # while before passing their names to mixmaster.
self.tmpQueue = mixminion.server.Queue.Queue(queueDir+"_tmp", 1, 1)
self.tmpQueue.removeAll()
return _MixmasterSMTPModuleDeliveryQueue(self, queueDir)
def processMessage(self, message, tag, exitType, smtpAddress):
- """Insert a message into the Mixmaster queue"""
+ """Insert a message into the Mixmaster queue"""
assert exitType == mixminion.Packet.SMTP_TYPE
- # parseSMTPInfo will raise a parse error if the mailbox is invalid.
+ # parseSMTPInfo will raise a parse error if the mailbox is invalid.
info = mixminion.Packet.parseSMTPInfo(smtpAddress)
msg = _escapeMessageForEmail(message, tag)
@@ -596,8 +596,8 @@
return DELIVER_OK
def flushMixmasterPool(self):
- """Send all pending messages from the Mixmaster queue. This
- should be called after invocations of processMessage."""
+ """Send all pending messages from the Mixmaster queue. This
+ should be called after invocations of processMessage."""
cmd = self.command
LOG.debug("Flushing Mixmaster pool")
os.spawnl(os.P_NOWAIT, cmd, cmd, "-S")
@@ -621,11 +621,11 @@
LOG.trace("Sending message via SMTP host %s to %s", server, toList)
con = smtplib.SMTP(server)
try:
- con.sendmail(fromAddr, toList, message)
- res = DELIVER_OK
+ con.sendmail(fromAddr, toList, message)
+ res = DELIVER_OK
except smtplib.SMTPException, e:
- LOG.warn("Unsuccessful smtp: "+str(e))
- res = DELIVER_FAIL_RETRY
+ LOG.warn("Unsuccessful smtp: "+str(e))
+ res = DELIVER_FAIL_RETRY
con.quit()
con.close()
@@ -640,28 +640,28 @@
boilerplate. Add a disclaimer if the message is not ascii.
msg -- A (possibly decoded) message
- tag -- One of: a 20-byte decoding tag [if the message is encrypted
+ tag -- One of: a 20-byte decoding tag [if the message is encrypted
or a reply]
- None [if the message is in plaintext]
+ None [if the message is in plaintext]
'err' [if the message was invalid.]
Returns None on an invalid message."""
m = _escapeMessage(msg, tag, text=1)
if m is None:
- return None
+ return None
code, msg, tag = m
if code == 'ENC':
- junk_msg = """\
+ junk_msg = """\
This message is not in plaintext. It's either 1) a reply; 2) a forward
message encrypted to you; or 3) junk.\n\n"""
else:
- junk_msg = ""
+ junk_msg = ""
if tag is not None:
- tag = "Decoding handle: "+tag+"\n"
+ tag = "Decoding handle: "+tag+"\n"
else:
- tag = ""
+ tag = ""
return """\
%s============ ANONYMOUS MESSAGE BEGINS
@@ -677,26 +677,26 @@
Returns None if the message is invalid.
message -- A (possibly decoded) message
- tag -- One of: a 20-byte decoding tag [if the message is encrypted
+ tag -- One of: a 20-byte decoding tag [if the message is encrypted
or a reply]
- None [if the message is in plaintext]
+ None [if the message is in plaintext]
'err' [if the message was invalid.]
text -- flag: if true, non-TXT messages must be base64-encoded.
"""
if tag == 'err':
- return None
+ return None
elif tag is not None:
- code = "ENC"
+ code = "ENC"
else:
- assert tag is None
- if isPrintingAscii(message, allowISO=1):
- code = "TXT"
- else:
- code = "BIN"
+ assert tag is None
+ if isPrintingAscii(message, allowISO=1):
+ code = "TXT"
+ else:
+ code = "BIN"
if text and (code != "TXT") :
- message = base64.encodestring(message)
+ message = base64.encodestring(message)
if text and tag:
- tag = base64.encodestring(tag).strip()
+ tag = base64.encodestring(tag).strip()
return code, message, tag
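
The ModuleManager code above routes each message to a per-module delivery queue keyed by exit type; a stripped-down restatement with stand-in module and queue classes (the names and the DROP example are illustrative only):

DELIVER_OK = 0

class TinyDropModule:
    """Stand-in delivery module: claims exit type 0 and discards messages."""
    def getName(self):
        return "DROP"
    def getExitTypes(self):
        return [0]
    def createDeliveryQueue(self, queueDir):
        return TinyImmediateQueue(self)
    def processMessage(self, message, tag, exitType, exitInfo):
        return DELIVER_OK

class TinyImmediateQueue:
    """Delivers straight through to its module, like ImmediateDeliveryQueue."""
    def __init__(self, module):
        self.module = module
    def queueDeliveryMessage(self, addr, message):
        exitType, address, tag = addr
        self.module.processMessage(message, tag, exitType, address)

class TinyModuleManager:
    """Maps exit types to modules; hands each message to the right queue."""
    def __init__(self):
        self.typeToModule = {}
        self.queues = {}
    def enableModule(self, module):
        for t in module.getExitTypes():
            self.typeToModule[t] = module
        self.queues[module.getName()] = module.createDeliveryQueue("unused")
    def queueMessage(self, message, tag, exitType, address):
        mod = self.typeToModule.get(exitType)
        if mod is None:
            return  # the real manager logs an "unknown type" error here
        self.queues[mod.getName()].queueDeliveryMessage(
            (exitType, address, tag), message)

manager = TinyModuleManager()
manager.enableModule(TinyDropModule())
manager.queueMessage("padding", None, 0, None)
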
Index: PacketHandler.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/PacketHandler.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- PacketHandler.py 12 Dec 2002 19:56:47 -0000 1.2
+++ PacketHandler.py 16 Dec 2002 02:40:11 -0000 1.3
@@ -39,19 +39,19 @@
self.privatekey = privatekey
self.hashlog = hashlog
except TypeError:
- # Privatekey is not be subscriptable; we must have only one.
+ # Privatekey is not be subscriptable; we must have only one.
self.privatekey = (privatekey, )
self.hashlog = (hashlog, )
def syncLogs(self):
- """Sync all this PacketHandler's hashlogs."""
- for h in self.hashlog:
- h.sync()
+ """Sync all this PacketHandler's hashlogs."""
+ for h in self.hashlog:
+ h.sync()
def close(self):
- """Close all this PacketHandler's hashlogs."""
- for h in self.hashlog:
- h.close()
+ """Close all this PacketHandler's hashlogs."""
+ for h in self.hashlog:
+ h.close()
def processMessage(self, msg):
"""Given a 32K mixminion message, processes it completely.
@@ -148,7 +148,7 @@
return ("EXIT",
(rt, subh.getExitAddress(),
keys.get(Crypto.APPLICATION_KEY_MODE),
- subh.getTag(),
+ subh.getTag(),
payload))
# If we're not an exit node, make sure that what we recognize our
@@ -169,11 +169,11 @@
keys.getLionessKeys(Crypto.HEADER_ENCRYPT_MODE))
# If we're the swap node, (1) decrypt the payload with a hash of
- # header2... (2) decrypt header2 with a hash of the payload...
- # (3) and swap the headers.
+ # header2... (2) decrypt header2 with a hash of the payload...
+ # (3) and swap the headers.
if rt == Packet.SWAP_FWD_TYPE:
- hkey = Crypto.lioness_keys_from_header(header2)
- payload = Crypto.lioness_decrypt(payload, hkey)
+ hkey = Crypto.lioness_keys_from_header(header2)
+ payload = Crypto.lioness_decrypt(payload, hkey)
hkey = Crypto.lioness_keys_from_payload(payload)
header2 = Crypto.lioness_decrypt(header2, hkey)
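
The swap-node branch above depends on ordering: the payload is decrypted with a key derived from header2, then header2 is decrypted with a key derived from the now-decrypted payload, and only then are the headers swapped. A toy restatement of that ordering, with a hash-keyed XOR stream standing in for the real LIONESS primitives (purely illustrative, not the actual cipher):

import hashlib

def _keystream(key, n):
    out = b""
    counter = 0
    while len(out) < n:
        out += hashlib.sha1(key + str(counter).encode()).digest()
        counter += 1
    return out[:n]

def toy_decrypt(data, key):
    """Stand-in for lioness_decrypt: XOR against a hash-derived keystream."""
    return bytes(a ^ b for a, b in zip(data, _keystream(key, len(data))))

def swap_node_step(payload, header2):
    # (1) decrypt the payload keyed off header2...
    payload = toy_decrypt(payload, hashlib.sha1(header2).digest())
    # (2) ...then decrypt header2 keyed off the decrypted payload...
    header2 = toy_decrypt(header2, hashlib.sha1(payload).digest())
    # (3) ...and the caller swaps header1/header2 afterwards.
    return payload, header2
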
Index: Queue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Queue.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- Queue.py 15 Dec 2002 05:55:30 -0000 1.3
+++ Queue.py 16 Dec 2002 02:40:11 -0000 1.4
@@ -17,7 +17,7 @@
from mixminion.Crypto import getCommonPRNG
__all__ = [ 'Queue', 'DeliveryQueue', 'TimedMixQueue', 'CottrellMixQueue',
- 'BinomialCottrellMixQueue' ]
+ 'BinomialCottrellMixQueue' ]
# Mode to pass to open(2) for creating a new file, and dying if it already
# exists.
@@ -77,13 +77,13 @@
if os.path.exists(location) and not os.path.isdir(location):
raise MixFatalError("%s is not a directory" % location)
- createPrivateDir(location, nocreate=(not create))
+ createPrivateDir(location, nocreate=(not create))
if scrub:
self.cleanQueue()
- # Count messages on first time through.
- self.n_entries = -1
+ # Count messages on first time through.
+ self.n_entries = -1
def queueMessage(self, contents):
"""Creates a new message in the queue whose contents are 'contents',
@@ -94,8 +94,8 @@
return handle
def queueObject(self, object):
- """Queue an object using cPickle, and return a handle to that
- object."""
+ """Queue an object using cPickle, and return a handle to that
+ object."""
f, handle = self.openNewMessage()
cPickle.dump(object, f, 1)
self.finishMessage(f, handle)
@@ -103,15 +103,15 @@
def count(self, recount=0):
"""Returns the number of complete messages in the queue."""
- if self.n_entries >= 0 and not recount:
- return self.n_entries
- else:
- res = 0
- for fn in os.listdir(self.dir):
- if fn.startswith("msg_"):
- res += 1
- self.n_entries = res
- return res
+ if self.n_entries >= 0 and not recount:
+ return self.n_entries
+ else:
+ res = 0
+ for fn in os.listdir(self.dir):
+ if fn.startswith("msg_"):
+ res += 1
+ self.n_entries = res
+ return res
def pickRandom(self, count=None):
"""Returns a list of 'count' handles to messages in this queue.
@@ -120,12 +120,12 @@
If there are fewer than 'count' messages in the queue, all the
messages will be retained."""
handles = [ fn[4:] for fn in os.listdir(self.dir)
- if fn.startswith("msg_") ]
+ if fn.startswith("msg_") ]
- return self.rng.shuffle(handles, count)
+ return self.rng.shuffle(handles, count)
def getAllMessages(self):
- """Returns handles for all messages currently in the queue.
+ """Returns handles for all messages currently in the queue.
Note: this ordering is not guaranteed to be random"""
return [fn[4:] for fn in os.listdir(self.dir) if fn.startswith("msg_")]
@@ -138,7 +138,7 @@
for m in os.listdir(self.dir):
if m[:4] in ('inp_', 'msg_'):
self.__changeState(m[4:], m[:3], "rmv")
- self.n_entries = 0
+ self.n_entries = 0
self.cleanQueue()
def moveMessage(self, handle, queue):
@@ -171,7 +171,7 @@
def getObject(self, handle):
"""Given a message handle, read and unpickle the contents of the
- corresponding message."""
+ corresponding message."""
f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
res = cPickle.load(f)
f.close()
@@ -208,32 +208,32 @@
now = time.time()
cleanFile = os.path.join(self.dir,".cleaning")
- cleaning = 1
- while cleaning:
- try:
- # Try to get the .cleaning lock file. If we can create it,
- # we're the only cleaner around.
- fd = os.open(cleanFile, os.O_WRONLY+os.O_CREAT+os.O_EXCL, 0600)
- os.write(fd, str(now))
- os.close(fd)
- cleaning = 0
- except OSError:
- try:
- # If we can't create the file, see if it's too old. If it
- # is too old, delete it and try again. If it isn't, there
- # may be a live clean in progress.
- s = os.stat(cleanFile)
- if now - s[stat.ST_MTIME] > CLEAN_TIMEOUT:
- os.unlink(cleanFile)
- else:
- return 1
- except OSError:
- # If the 'stat' or 'unlink' calls above fail, then
- # .cleaning must not exist, or must not be readable
- # by us.
- if os.path.exists(cleanFile):
- # In the latter case, bail out.
- return 1
+ cleaning = 1
+ while cleaning:
+ try:
+ # Try to get the .cleaning lock file. If we can create it,
+ # we're the only cleaner around.
+ fd = os.open(cleanFile, os.O_WRONLY+os.O_CREAT+os.O_EXCL, 0600)
+ os.write(fd, str(now))
+ os.close(fd)
+ cleaning = 0
+ except OSError:
+ try:
+ # If we can't create the file, see if it's too old. If it
+ # is too old, delete it and try again. If it isn't, there
+ # may be a live clean in progress.
+ s = os.stat(cleanFile)
+ if now - s[stat.ST_MTIME] > CLEAN_TIMEOUT:
+ os.unlink(cleanFile)
+ else:
+ return 1
+ except OSError:
+ # If the 'stat' or 'unlink' calls above fail, then
+ # .cleaning must not exist, or must not be readable
+ # by us.
+ if os.path.exists(cleanFile):
+ # In the latter case, bail out.
+ return 1
rmv = []
allowedTime = int(time.time()) - INPUT_TIMEOUT
@@ -245,7 +245,7 @@
if s[stat.ST_MTIME] < allowedTime:
self.__changeState(m[4:], "inp", "rmv")
rmv.append(os.path.join(self.dir, m))
- secureDelete(rmv, blocking=1)
+ secureDelete(rmv, blocking=1)
return 0
def __changeState(self, handle, s1, s2):
@@ -253,12 +253,12 @@
to 's2', and changes the internal count."""
os.rename(os.path.join(self.dir, s1+"_"+handle),
os.path.join(self.dir, s2+"_"+handle))
- if self.n_entries < 0:
- return
- if s1 == 'msg' and s2 != 'msg':
- self.n_entries -= 1
- elif s1 != 'msg' and s2 == 'msg':
- self.n_entries += 1
+ if self.n_entries < 0:
+ return
+ if s1 == 'msg' and s2 != 'msg':
+ self.n_entries -= 1
+ elif s1 != 'msg' and s2 == 'msg':
+ self.n_entries += 1
def __newHandle(self):
"""Helper method: creates a new random handle."""
@@ -292,85 +292,85 @@
# currently sending.
def __init__(self, location):
- Queue.__init__(self, location, create=1, scrub=1)
- self._rescan()
+ Queue.__init__(self, location, create=1, scrub=1)
+ self._rescan()
def _rescan(self):
- """Rebuild the internal state of this queue from the underlying
+ """Rebuild the internal state of this queue from the underlying
directory."""
- self.pending = {}
- self.sendable = self.getAllMessages()
+ self.pending = {}
+ self.sendable = self.getAllMessages()
def queueMessage(self, msg):
- if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")
+ if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")
def queueDeliveryMessage(self, addr, msg, retry=0):
- """Schedule a message for delivery.
- addr -- An object to indicate the message's destination
- msg -- the message itself
- retry -- how many times so far have we tried to send?"""
+ """Schedule a message for delivery.
+ addr -- An object to indicate the message's destination
+ msg -- the message itself
+ retry -- how many times so far have we tried to send?"""
handle = self.queueObject( (retry, addr, msg) )
- self.sendable.append(handle)
+ self.sendable.append(handle)
- return handle
+ return handle
def get(self,handle):
- """Returns a (n_retries, addr, msg) payload for a given
- message handle."""
+ """Returns a (n_retries, addr, msg) payload for a given
+ message handle."""
return self.getObject(handle)
def sendReadyMessages(self):
- """Sends all messages which are not already being sent."""
+ """Sends all messages which are not already being sent."""
- handles = self.sendable
- messages = []
- self.sendable = []
- for h in handles:
- retries, addr, msg = self.getObject(h)
- messages.append((h, addr, msg, retries))
- self.pending[h] = 1
- if messages:
- self._deliverMessages(messages)
+ handles = self.sendable
+ messages = []
+ self.sendable = []
+ for h in handles:
+ retries, addr, msg = self.getObject(h)
+ messages.append((h, addr, msg, retries))
+ self.pending[h] = 1
+ if messages:
+ self._deliverMessages(messages)
def _deliverMessages(self, msgList):
- """Abstract method; Invoked with a list of
- (handle, addr, message, n_retries) tuples every time we have a batch
- of messages to send.
+ """Abstract method; Invoked with a list of
+ (handle, addr, message, n_retries) tuples every time we have a batch
+ of messages to send.
For every handle in the list, delierySucceeded or deliveryFailed
- should eventually be called, or the message will sit in the queue
- indefinitely, without being retried."""
+ should eventually be called, or the message will sit in the queue
+ indefinitely, without being retried."""
# We could implement this as a single _deliverMessage(h,addr,m,n)
- # method, but that wouldn't allow implementations to batch
- # messages being sent to the same address.
+ # method, but that wouldn't allow implementations to batch
+ # messages being sent to the same address.
- raise NotImplementedError("_deliverMessages")
+ raise NotImplementedError("_deliverMessages")
def deliverySucceeded(self, handle):
- """Removes a message from the outgoing queue. This method
- should be invoked after the corresponding message has been
- successfully delivered.
+ """Removes a message from the outgoing queue. This method
+ should be invoked after the corresponding message has been
+ successfully delivered.
"""
- self.removeMessage(handle)
- del self.pending[handle]
+ self.removeMessage(handle)
+ del self.pending[handle]
def deliveryFailed(self, handle, retriable=0):
- """Removes a message from the outgoing queue, or requeues it
- for delivery at a later time. This method should be
- invoked after the corresponding message has been
- successfully delivered."""
- del self.pending[handle]
- if retriable:
- # Queue the new one before removing the old one, for
- # crash-proofness
- retries, addr, msg = self.getObject(handle)
- # FFFF This test makes us never retry past the 10th attempt.
- # FFFF That's wrong; we should be smarter.
- if retries <= 10:
- self.queueDeliveryMessage(addr, msg, retries+1)
- self.removeMessage(handle)
+ """Removes a message from the outgoing queue, or requeues it
+ for delivery at a later time. This method should be
+ invoked after the corresponding message has been
+ successfully delivered."""
+ del self.pending[handle]
+ if retriable:
+ # Queue the new one before removing the old one, for
+ # crash-proofness
+ retries, addr, msg = self.getObject(handle)
+ # FFFF This test makes us never retry past the 10th attempt.
+ # FFFF That's wrong; we should be smarter.
+ if retries <= 10:
+ self.queueDeliveryMessage(addr, msg, retries+1)
+ self.removeMessage(handle)
class TimedMixQueue(Queue):
"""A TimedMixQueue holds a group of files, and returns some of them
@@ -380,18 +380,18 @@
## Fields:
# interval: scanning interval, in seconds.
def __init__(self, location, interval=600):
- """Create a TimedMixQueue that sends its entire batch of messages
- every 'interval' seconds."""
+ """Create a TimedMixQueue that sends its entire batch of messages
+ every 'interval' seconds."""
Queue.__init__(self, location, create=1, scrub=1)
- self.interval = interval
+ self.interval = interval
def getBatch(self):
- """Return handles for all messages that the pool is currently ready
- to send in the next batch"""
- return self.pickRandom()
+ """Return handles for all messages that the pool is currently ready
+ to send in the next batch"""
+ return self.pickRandom()
def getInterval(self):
- return self.interval
+ return self.interval
class CottrellMixQueue(TimedMixQueue):
"""A CottrellMixQueue holds a group of files, and returns some of them
@@ -405,61 +405,61 @@
# sending.
# sendRate: Largest fraction of the pool to send at a time.
def __init__(self, location, interval=600, minPool=6, minSend=1,
- sendRate=.7):
- """Create a new queue that yields a batch of message every 'interval'
- seconds, always keeps <minPool> messages in the pool, never sends
- unless it has <minPool>+<minSend> messages, and never sends more
- than <sendRate> * the corrent pool size.
+ sendRate=.7):
+ """Create a new queue that yields a batch of message every 'interval'
+ seconds, always keeps <minPool> messages in the pool, never sends
+ unless it has <minPool>+<minSend> messages, and never sends more
+ than <sendRate> * the corrent pool size.
- If 'minSend'==1, this is a real Cottrell (type-II) mix pool.
- Otherwise, this is a generic 'timed dynamic-pool' mix pool. (Note
- that there is still a matter of some controversy whether it ever
- makes sense to set minSend != 1.)
- """
+ If 'minSend'==1, this is a real Cottrell (type-II) mix pool.
+ Otherwise, this is a generic 'timed dynamic-pool' mix pool. (Note
+ that there is still a matter of some controversy whether it ever
+ makes sense to set minSend != 1.)
+ """
# Note that there was a bit of confusion here: earlier versions
- # implemented an algorithm called "mixmaster" that wasn't actually the
- # mixmaster algorithm. I picked up the other algorithm from an early
- # draft of Roger, Paul, and Andrei's 'Batching Taxonomy' paper (since
- # corrected); they seem to have gotten it from Anja Jerichow's
- # Phd. thesis ("Generalisation and Security Improvement of
- # Mix-mediated Anonymous Communication") of 2000.
- #
- # *THIS* is the algorithm that the current 'Batching Taxonomy' paper
- # says that Cottrell says is the real thing.
+ # implemented an algorithm called "mixmaster" that wasn't actually the
+ # mixmaster algorithm. I picked up the other algorithm from an early
+ # draft of Roger, Paul, and Andrei's 'Batching Taxonomy' paper (since
+ # corrected); they seem to have gotten it from Anja Jerichow's
+ # Phd. thesis ("Generalisation and Security Improvement of
+ # Mix-mediated Anonymous Communication") of 2000.
+ #
+ # *THIS* is the algorithm that the current 'Batching Taxonomy' paper
+ # says that Cottrell says is the real thing.
- TimedMixQueue.__init__(self, location, interval)
- self.minPool = minPool
- self.minSend = minSend
- self.sendRate = sendRate
+ TimedMixQueue.__init__(self, location, interval)
+ self.minPool = minPool
+ self.minSend = minSend
+ self.sendRate = sendRate
def _getBatchSize(self):
- "Helper method: returns the number of messages to send."
- pool = self.count()
- if pool >= (self.minPool + self.minSend):
- sendable = pool - self.minPool
- return min(sendable, max(1, int(pool * self.sendRate)))
- else:
- return 0
+ "Helper method: returns the number of messages to send."
+ pool = self.count()
+ if pool >= (self.minPool + self.minSend):
+ sendable = pool - self.minPool
+ return min(sendable, max(1, int(pool * self.sendRate)))
+ else:
+ return 0
def getBatch(self):
- "Returns a list of handles for the next batch of messages to send."
- n = self._getBatchSize()
- if n:
- return self.pickRandom(n)
- else:
- return []
+ "Returns a list of handles for the next batch of messages to send."
+ n = self._getBatchSize()
+ if n:
+ return self.pickRandom(n)
+ else:
+ return []
class BinomialCottrellMixQueue(CottrellMixQueue):
"""Same algorithm as CottrellMixQueue, but instead of sending N messages
from the pool of size P, sends each message with probability N/P."""
def getBatch(self):
- n = self._getBatchSize()
- if n == 0:
- return []
- msgProbability = n / float(self.count())
- return self.rng.shuffle([ h for h in self.getAllMessages()
- if self.rng.getFloat() < msgProbability ])
+ n = self._getBatchSize()
+ if n == 0:
+ return []
+ msgProbability = n / float(self.count())
+ return self.rng.shuffle([ h for h in self.getAllMessages()
+ if self.rng.getFloat() < msgProbability ])
def _secureDelete_bg(files, cleanFile):
"""Helper method: delete files in another thread, removing 'cleanFile'
Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- ServerConfig.py 15 Dec 2002 05:55:30 -0000 1.2
+++ ServerConfig.py 16 Dec 2002 02:40:11 -0000 1.3
@@ -19,69 +19,69 @@
#
_restrictFormat = 0
def __init__(self, fname=None, string=None, moduleManager=None):
- # We use a copy of SERVER_SYNTAX, because the ModuleManager will
- # mess it up.
+ # We use a copy of SERVER_SYNTAX, because the ModuleManager will
+ # mess it up.
self._syntax = SERVER_SYNTAX.copy()
- if moduleManager is None:
- self.moduleManager = mixminion.server.Modules.ModuleManager()
- else:
- self.moduleManager = moduleManager
+ if moduleManager is None:
+ self.moduleManager = mixminion.server.Modules.ModuleManager()
+ else:
+ self.moduleManager = moduleManager
self._addCallback("Server", self.__loadModules)
mixminion.Config._ConfigFile.__init__(self, fname, string)
def validate(self, sections, entries, lines, contents):
- # Pre-emptively configure the log before validation, so we don't
- # write to the terminal if we've been asked not to.
- if not sections['Server'].get("EchoMessages", 0):
- LOG.handlers = []
- # ???? This can't be the best way to do this.
+ # Pre-emptively configure the log before validation, so we don't
+ # write to the terminal if we've been asked not to.
+ if not sections['Server'].get("EchoMessages", 0):
+ LOG.handlers = []
+ # ???? This can't be the best way to do this.
- # Now, validate the host section.
- mixminion.Config._validateHostSection(sections.get('Host', {}))
- # Server section
- server = sections['Server']
- bits = server['IdentityKeyBits']
- if not (2048 <= bits <= 4096):
- raise ConfigError("IdentityKeyBits must be between 2048 and 4096")
- if server['EncryptIdentityKey']:
- LOG.warn("Identity key encryption not yet implemented")
- if server['EncryptPrivateKey']:
- LOG.warn("Encrypted private keys not yet implemented")
- if server['PublicKeyLifetime'][2] < 24*60*60:
- raise ConfigError("PublicKeyLifetime must be at least 1 day.")
- if server['PublicKeySloppiness'][2] > 20*60:
- raise ConfigError("PublicKeySloppiness must be <= 20 minutes.")
- if [e for e in entries['Server'] if e[0]=='Mode']:
- LOG.warn("Mode specification is not yet supported.")
+ # Now, validate the host section.
+ mixminion.Config._validateHostSection(sections.get('Host', {}))
+ # Server section
+ server = sections['Server']
+ bits = server['IdentityKeyBits']
+ if not (2048 <= bits <= 4096):
+ raise ConfigError("IdentityKeyBits must be between 2048 and 4096")
+ if server['EncryptIdentityKey']:
+ LOG.warn("Identity key encryption not yet implemented")
+ if server['EncryptPrivateKey']:
+ LOG.warn("Encrypted private keys not yet implemented")
+ if server['PublicKeyLifetime'][2] < 24*60*60:
+ raise ConfigError("PublicKeyLifetime must be at least 1 day.")
+ if server['PublicKeySloppiness'][2] > 20*60:
+ raise ConfigError("PublicKeySloppiness must be <= 20 minutes.")
+ if [e for e in entries['Server'] if e[0]=='Mode']:
+ LOG.warn("Mode specification is not yet supported.")
- if not sections['Incoming/MMTP'].get('Enabled'):
- LOG.warn("Disabling incoming MMTP is not yet supported.")
- if [e for e in entries['Incoming/MMTP'] if e[0] in ('Allow', 'Deny')]:
- LOG.warn("Allow/deny are not yet supported")
+ if not sections['Incoming/MMTP'].get('Enabled'):
+ LOG.warn("Disabling incoming MMTP is not yet supported.")
+ if [e for e in entries['Incoming/MMTP'] if e[0] in ('Allow', 'Deny')]:
+ LOG.warn("Allow/deny are not yet supported")
- if not sections['Outgoing/MMTP'].get('Enabled'):
- LOG.warn("Disabling incoming MMTP is not yet supported.")
- if [e for e in entries['Outgoing/MMTP'] if e[0] in ('Allow', 'Deny')]:
- LOG.warn("Allow/deny are not yet supported")
+ if not sections['Outgoing/MMTP'].get('Enabled'):
+ LOG.warn("Disabling incoming MMTP is not yet supported.")
+ if [e for e in entries['Outgoing/MMTP'] if e[0] in ('Allow', 'Deny')]:
+ LOG.warn("Allow/deny are not yet supported")
self.moduleManager.validate(sections, entries, lines, contents)
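
Aside: the numeric limits enforced above are easy to lose in the
reindentation noise, so here they are restated as a tiny standalone checker
(the function name and the use of ValueError are mine; times are in seconds,
matching the [2] slot of parsed intervals):

    DAY = 24*60*60

    def check_server_limits(identity_key_bits, public_key_lifetime, key_sloppiness):
        """Raise ValueError for values that validate() above rejects."""
        if not (2048 <= identity_key_bits <= 4096):
            raise ValueError("IdentityKeyBits must be between 2048 and 4096")
        if public_key_lifetime < DAY:
            raise ValueError("PublicKeyLifetime must be at least 1 day.")
        if key_sloppiness > 20*60:
            raise ValueError("PublicKeySloppiness must be <= 20 minutes.")
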
def __loadModules(self, section, sectionEntries):
- """Callback from the [Server] section of a config file. Parses
- the module options, and adds new sections to the syntax
- accordingly."""
+ """Callback from the [Server] section of a config file. Parses
+ the module options, and adds new sections to the syntax
+ accordingly."""
self.moduleManager.setPath(section.get('ModulePath', None))
for mod in section.get('Module', []):
- LOG.info("Loading module %s", mod)
+ LOG.info("Loading module %s", mod)
self.moduleManager.loadExtModule(mod)
self._syntax.update(self.moduleManager.getConfigSyntax())
def getModuleManager(self):
- "Return the module manager initialized by this server."
- return self.moduleManager
+ "Return the module manager initialized by this server."
+ return self.moduleManager
# alias to make the syntax more terse.
C = mixminion.Config
@@ -93,9 +93,9 @@
'LogFile' : ('ALLOW', None, None),
'LogLevel' : ('ALLOW', C._parseSeverity, "WARN"),
'EchoMessages' : ('ALLOW', C._parseBoolean, "no"),
- 'NoDaemon' : ('ALLOW', C._parseBoolean, "no"),
+ 'NoDaemon' : ('ALLOW', C._parseBoolean, "no"),
'EncryptIdentityKey' : ('REQUIRE', C._parseBoolean, "yes"),
- 'IdentityKeyBits': ('ALLOW', C._parseInt, "2048"),
+ 'IdentityKeyBits': ('ALLOW', C._parseInt, "2048"),
'PublicKeyLifetime' : ('ALLOW', C._parseInterval,
"30 days"),
'PublicKeySloppiness': ('ALLOW', C._parseInterval,
@@ -112,18 +112,18 @@
'Publish' : ('ALLOW', C._parseBoolean, "no"),
'MaxSkew' : ('ALLOW', C._parseInterval,
"10 minutes",) },
- # FFFF Generic multi-port listen/publish options.
+ # FFFF Generic multi-port listen/publish options.
'Incoming/MMTP' : { 'Enabled' : ('REQUIRE', C._parseBoolean, "no"),
- 'IP' : ('ALLOW', C._parseIP, "0.0.0.0"),
+ 'IP' : ('ALLOW', C._parseIP, "0.0.0.0"),
'Port' : ('ALLOW', C._parseInt, "48099"),
'Allow' : ('ALLOW*', C._parseAddressSet_allow, None),
'Deny' : ('ALLOW*', C._parseAddressSet_deny, None) },
'Outgoing/MMTP' : { 'Enabled' : ('REQUIRE', C._parseBoolean, "no"),
'Allow' : ('ALLOW*', C._parseAddressSet_allow, None),
'Deny' : ('ALLOW*', C._parseAddressSet_deny, None) },
- # FFFF Missing: Queue-Size / Queue config options
- # FFFF timeout options
- # FFFF listen timeout??
- # FFFF Retry options
- # FFFF pool options
+ # FFFF Missing: Queue-Size / Queue config options
+ # FFFF timeout options
+ # FFFF listen timeout??
+ # FFFF Retry options
+ # FFFF pool options
}
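
Aside: to make the syntax table concrete, here is a hypothetical
configuration fragment of the kind it describes. Section and key names come
from SERVER_SYNTAX and from the code in this patch; every value, including
the Homedir path, is purely illustrative:

    [Server]
    Homedir: /var/spool/minion
    LogLevel: WARN
    EchoMessages: no
    NoDaemon: no
    EncryptIdentityKey: no
    IdentityKeyBits: 2048
    PublicKeyLifetime: 30 days

    [Incoming/MMTP]
    Enabled: yes
    IP: 0.0.0.0
    Port: 48099

    [Outgoing/MMTP]
    Enabled: yes
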
Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- ServerKeys.py 15 Dec 2002 04:35:55 -0000 1.2
+++ ServerKeys.py 16 Dec 2002 02:40:11 -0000 1.3
@@ -65,105 +65,105 @@
# FFFF Support to put keys/queues in separate directories.
def __init__(self, config):
- "Create a ServerKeyring from a config object"
- self.configure(config)
+ "Create a ServerKeyring from a config object"
+ self.configure(config)
def configure(self, config):
- "Set up a SeverKeyring from a config object"
- self.config = config
- self.homeDir = config['Server']['Homedir']
- self.keyDir = os.path.join(self.homeDir, 'keys')
- self.hashDir = os.path.join(self.homeDir, 'work', 'hashlogs')
- self.keySloppiness = config['Server']['PublicKeySloppiness'][2]
- self.checkKeys()
+ "Set up a SeverKeyring from a config object"
+ self.config = config
+ self.homeDir = config['Server']['Homedir']
+ self.keyDir = os.path.join(self.homeDir, 'keys')
+ self.hashDir = os.path.join(self.homeDir, 'work', 'hashlogs')
+ self.keySloppiness = config['Server']['PublicKeySloppiness'][2]
+ self.checkKeys()
def checkKeys(self):
- """Internal method: read information about all this server's
- currently-prepared keys from disk."""
+ """Internal method: read information about all this server's
+ currently-prepared keys from disk."""
self.keyIntervals = []
- firstKey = sys.maxint
- lastKey = 0
+ firstKey = sys.maxint
+ lastKey = 0
- LOG.debug("Scanning server keystore at %s", self.keyDir)
+ LOG.debug("Scanning server keystore at %s", self.keyDir)
- if not os.path.exists(self.keyDir):
- LOG.info("Creating server keystore at %s", self.keyDir)
- createPrivateDir(self.keyDir)
+ if not os.path.exists(self.keyDir):
+ LOG.info("Creating server keystore at %s", self.keyDir)
+ createPrivateDir(self.keyDir)
- # Iterate over the entries in HOME/keys
+ # Iterate over the entries in HOME/keys
for dirname in os.listdir(self.keyDir):
- # Skip any that aren't directories named "key_INT"
- if not os.path.isdir(os.path.join(self.keyDir,dirname)):
- continue
+ # Skip any that aren't directories named "key_INT"
+ if not os.path.isdir(os.path.join(self.keyDir,dirname)):
+ continue
if not dirname.startswith('key_'):
- LOG.warn("Unexpected directory %s under %s",
- dirname, self.keyDir)
+ LOG.warn("Unexpected directory %s under %s",
+ dirname, self.keyDir)
continue
keysetname = dirname[4:]
- try:
- setNum = int(keysetname)
- # keep track of the first and last used key number
- if setNum < firstKey: firstKey = setNum
- if setNum > lastKey: lastKey = setNum
- except ValueError:
- LOG.warn("Unexpected directory %s under %s",
- dirname, self.keyDir)
- continue
+ try:
+ setNum = int(keysetname)
+ # keep track of the first and last used key number
+ if setNum < firstKey: firstKey = setNum
+ if setNum > lastKey: lastKey = setNum
+ except ValueError:
+ LOG.warn("Unexpected directory %s under %s",
+ dirname, self.keyDir)
+ continue
- # Find the server descriptor...
+ # Find the server descriptor...
d = os.path.join(self.keyDir, dirname)
si = os.path.join(d, "ServerDesc")
if os.path.exists(si):
inf = ServerInfo(fname=si, assumeValid=1)
- # And find out when it's valid.
+ # And find out when it's valid.
t1 = inf['Server']['Valid-After']
t2 = inf['Server']['Valid-Until']
self.keyIntervals.append( (t1, t2, keysetname) )
- LOG.debug("Found key %s (valid from %s to %s)",
- dirname, formatDate(t1), formatDate(t2))
- else:
- LOG.warn("No server descriptor found for key %s"%dirname)
+ LOG.debug("Found key %s (valid from %s to %s)",
+ dirname, formatDate(t1), formatDate(t2))
+ else:
+ LOG.warn("No server descriptor found for key %s"%dirname)
- # Now, sort the key intervals by starting time.
+ # Now, sort the key intervals by starting time.
self.keyIntervals.sort()
- self.keyRange = (firstKey, lastKey)
+ self.keyRange = (firstKey, lastKey)
- # Now we try to see whether we have more or less than 1 key in effect
- # for a given time.
- for idx in xrange(len(self.keyIntervals)-1):
- end = self.keyIntervals[idx][1]
- start = self.keyIntervals[idx+1][0]
- if start < end:
- LOG.warn("Multiple keys for %s. That's unsupported.",
- formatDate(end))
- elif start > end:
- LOG.warn("Gap in key schedule: no key from %s to %s",
- formatDate(end), formatDate(start))
+ # Now we try to see whether we have more or less than 1 key in effect
+ # for a given time.
+ for idx in xrange(len(self.keyIntervals)-1):
+ end = self.keyIntervals[idx][1]
+ start = self.keyIntervals[idx+1][0]
+ if start < end:
+ LOG.warn("Multiple keys for %s. That's unsupported.",
+ formatDate(end))
+ elif start > end:
+ LOG.warn("Gap in key schedule: no key from %s to %s",
+ formatDate(end), formatDate(start))
- self.nextKeyRotation = 0 # Make sure that now > nextKeyRotation before
- # we call _getLiveKey()
- self._getLiveKey() # Set up liveKey, nextKeyRotation.
+ self.nextKeyRotation = 0 # Make sure that now > nextKeyRotation before
+ # we call _getLiveKey()
+ self._getLiveKey() # Set up liveKey, nextKeyRotation.
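
Aside: the scan above boils down to sorting (valid-after, valid-until, name)
tuples and walking adjacent pairs. A standalone sketch of that consistency
check (names are mine):

    def check_key_schedule(intervals):
        """Given (start, end, name) tuples, report overlaps and gaps
        between consecutive keys, as checkKeys() above warns about."""
        problems = []
        intervals = sorted(intervals)
        for (s1, e1, n1), (s2, e2, n2) in zip(intervals, intervals[1:]):
            if s2 < e1:
                problems.append("overlap between %s and %s" % (n1, n2))
            elif s2 > e1:
                problems.append("gap between %s and %s" % (n1, n2))
        return problems
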
def getIdentityKey(self):
- """Return this server's identity key. Generate one if it doesn't
- exist."""
- password = None # FFFF Use this, somehow.
- fn = os.path.join(self.keyDir, "identity.key")
- bits = self.config['Server']['IdentityKeyBits']
- if os.path.exists(fn):
- key = mixminion.Crypto.pk_PEM_load(fn, password)
- keylen = key.get_modulus_bytes()*8
- if keylen != bits:
- LOG.warn(
- "Stored identity key has %s bits, but you asked for %s.",
- keylen, bits)
- else:
- LOG.info("Generating identity key. (This may take a while.)")
- key = mixminion.Crypto.pk_generate(bits)
- mixminion.Crypto.pk_PEM_save(key, fn, password)
- LOG.info("Generated %s-bit identity key.", bits)
+ """Return this server's identity key. Generate one if it doesn't
+ exist."""
+ password = None # FFFF Use this, somehow.
+ fn = os.path.join(self.keyDir, "identity.key")
+ bits = self.config['Server']['IdentityKeyBits']
+ if os.path.exists(fn):
+ key = mixminion.Crypto.pk_PEM_load(fn, password)
+ keylen = key.get_modulus_bytes()*8
+ if keylen != bits:
+ LOG.warn(
+ "Stored identity key has %s bits, but you asked for %s.",
+ keylen, bits)
+ else:
+ LOG.info("Generating identity key. (This may take a while.)")
+ key = mixminion.Crypto.pk_generate(bits)
+ mixminion.Crypto.pk_PEM_save(key, fn, password)
+ LOG.info("Generated %s-bit identity key.", bits)
- return key
+ return key
def removeIdentityKey(self):
"""Remove this server's identity key."""
@@ -176,58 +176,58 @@
LOG.warn("Removing identity key")
secureDelete([fn], blocking=1)
- dhfile = os.path.join(self.homeDir, 'work', 'tls', 'dhparam')
+ dhfile = os.path.join(self.homeDir, 'work', 'tls', 'dhparam')
if os.path.exists(dhfile):
LOG.info("Removing Diffie-Hellman parameters file")
secureDelete([dhfile], blocking=1)
def createKeys(self, num=1, startAt=None):
- """Generate 'num' public keys for this server. If startAt is provided,
+ """Generate 'num' public keys for this server. If startAt is provided,
make the first key become valid at 'startAt'. Otherwise, make the
- first key become valid right after the last key we currently have
- expires. If we have no keys now, make the first key start now."""
+ first key become valid right after the last key we currently have
+ expires. If we have no keys now, make the first key start now."""
# FFFF Use this.
- #password = None
+ #password = None
- if startAt is None:
- if self.keyIntervals:
- startAt = self.keyIntervals[-1][1]+60
- else:
- startAt = time.time()+60
+ if startAt is None:
+ if self.keyIntervals:
+ startAt = self.keyIntervals[-1][1]+60
+ else:
+ startAt = time.time()+60
- startAt = previousMidnight(startAt)
+ startAt = previousMidnight(startAt)
- firstKey, lastKey = self.keyRange
+ firstKey, lastKey = self.keyRange
- for _ in xrange(num):
- if firstKey == sys.maxint:
- keynum = firstKey = lastKey = 1
- elif firstKey > 1:
- firstKey -= 1
- keynum = firstKey
- else:
- lastKey += 1
- keynum = lastKey
+ for _ in xrange(num):
+ if firstKey == sys.maxint:
+ keynum = firstKey = lastKey = 1
+ elif firstKey > 1:
+ firstKey -= 1
+ keynum = firstKey
+ else:
+ lastKey += 1
+ keynum = lastKey
- keyname = "%04d" % keynum
+ keyname = "%04d" % keynum
- nextStart = startAt + self.config['Server']['PublicKeyLifetime'][2]
+ nextStart = startAt + self.config['Server']['PublicKeyLifetime'][2]
- LOG.info("Generating key %s to run from %s through %s (GMT)",
- keyname, formatDate(startAt),
- formatDate(nextStart-3600))
- generateServerDescriptorAndKeys(config=self.config,
- identityKey=self.getIdentityKey(),
- keyname=keyname,
- keydir=self.keyDir,
- hashdir=self.hashDir,
- validAt=startAt)
- startAt = nextStart
+ LOG.info("Generating key %s to run from %s through %s (GMT)",
+ keyname, formatDate(startAt),
+ formatDate(nextStart-3600))
+ generateServerDescriptorAndKeys(config=self.config,
+ identityKey=self.getIdentityKey(),
+ keyname=keyname,
+ keydir=self.keyDir,
+ hashdir=self.hashDir,
+ validAt=startAt)
+ startAt = nextStart
self.checkKeys()
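
Aside: key lifetimes are laid end to end: the first window starts at the
previous midnight, and each later key starts exactly where its predecessor
stops. A sketch of just that scheduling, with previousMidnight() approximated
by integer arithmetic and the actual key generation left out:

    DAY = 24*60*60

    def plan_key_windows(start, lifetime, num):
        """Return [(valid_after, valid_until), ...] for 'num' consecutive
        keys, mirroring the loop in createKeys() above."""
        start = (int(start) // DAY) * DAY   # crude previousMidnight(): midnight GMT
        windows = []
        for _ in range(num):
            end = start + lifetime
            windows.append((start, end))
            start = end                     # the next key starts when this one ends
        return windows
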
def removeDeadKeys(self, now=None):
- """Remove all keys that have expired"""
+ """Remove all keys that have expired"""
self.checkKeys()
if now is None:
@@ -237,84 +237,84 @@
expiryStr = ""
cutoff = now - self.keySloppiness
- dirs = [ os.path.join(self.keyDir,"key_"+name)
+ dirs = [ os.path.join(self.keyDir,"key_"+name)
for va, vu, name in self.keyIntervals if vu < cutoff ]
- for dirname, (va, vu, name) in zip(dirs, self.keyIntervals):
+ for dirname, (va, vu, name) in zip(dirs, self.keyIntervals):
LOG.info("Removing%s key %s (valid from %s through %s)",
expiryStr, name, formatDate(va), formatDate(vu-3600))
- files = [ os.path.join(dirname,f)
+ files = [ os.path.join(dirname,f)
for f in os.listdir(dirname) ]
- secureDelete(files, blocking=1)
- os.rmdir(dirname)
+ secureDelete(files, blocking=1)
+ os.rmdir(dirname)
- self.checkKeys()
+ self.checkKeys()
def _getLiveKey(self, when=None):
- """Find the first key that is now valid. Return (Valid-after,
- valid-until, name)."""
+ """Find the first key that is now valid. Return (Valid-after,
+ valid-until, name)."""
if not self.keyIntervals:
- self.liveKey = None
- self.nextKeyRotation = 0
- return None
+ self.liveKey = None
+ self.nextKeyRotation = 0
+ return None
- w = when
- if when is None:
- when = time.time()
- if when < self.nextKeyRotation:
- return self.liveKey
+ w = when
+ if when is None:
+ when = time.time()
+ if when < self.nextKeyRotation:
+ return self.liveKey
- idx = bisect.bisect(self.keyIntervals, (when, None, None))-1
- k = self.keyIntervals[idx]
- if w is None:
- self.liveKey = k
- self.nextKeyRotation = k[1]
+ idx = bisect.bisect(self.keyIntervals, (when, None, None))-1
+ k = self.keyIntervals[idx]
+ if w is None:
+ self.liveKey = k
+ self.nextKeyRotation = k[1]
- return k
+ return k
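
Aside: the live-key lookup is a bisect over the sorted interval list. The
sketch below bisects on a separate list of start times instead of on the
tuples themselves, and also rejects an expired last key, which the method
above leaves to its callers; the caching of liveKey and nextKeyRotation is
omitted:

    import bisect

    def find_live_key(intervals, when):
        """Return the (start, end, name) tuple whose window covers 'when',
        or None.  'intervals' must be sorted by start time."""
        starts = [start for start, _end, _name in intervals]
        idx = bisect.bisect_right(starts, when) - 1
        if idx < 0:
            return None                     # 'when' precedes the first key
        start, end, name = intervals[idx]
        if when >= end:
            return None                     # the newest matching key has expired
        return (start, end, name)
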
def getNextKeyRotation(self):
- """Return the expiration time of the current key"""
+ """Return the expiration time of the current key"""
return self.nextKeyRotation
def getServerKeyset(self):
- """Return a ServerKeyset object for the currently live key."""
- # FFFF Support passwords on keys
- _, _, name = self._getLiveKey()
- keyset = ServerKeyset(self.keyDir, name, self.hashDir)
- keyset.load()
- return keyset
+ """Return a ServerKeyset object for the currently live key."""
+ # FFFF Support passwords on keys
+ _, _, name = self._getLiveKey()
+ keyset = ServerKeyset(self.keyDir, name, self.hashDir)
+ keyset.load()
+ return keyset
def getDHFile(self):
- """Return the filename for the diffie-helman parameters for the
- server. Creates the file if it doesn't yet exist."""
- dhdir = os.path.join(self.homeDir, 'work', 'tls')
- createPrivateDir(dhdir)
- dhfile = os.path.join(dhdir, 'dhparam')
+ """Return the filename for the diffie-helman parameters for the
+ server. Creates the file if it doesn't yet exist."""
+ dhdir = os.path.join(self.homeDir, 'work', 'tls')
+ createPrivateDir(dhdir)
+ dhfile = os.path.join(dhdir, 'dhparam')
if not os.path.exists(dhfile):
LOG.info("Generating Diffie-Helman parameters for TLS...")
mixminion._minionlib.generate_dh_parameters(dhfile, verbose=0)
LOG.info("...done")
- else:
- LOG.debug("Using existing Diffie-Helman parameter from %s",
- dhfile)
+ else:
+ LOG.debug("Using existing Diffie-Helman parameter from %s",
+ dhfile)
return dhfile
def getTLSContext(self):
- """Create and return a TLS context from the currently live key."""
+ """Create and return a TLS context from the currently live key."""
keys = self.getServerKeyset()
return mixminion._minionlib.TLSContext_new(keys.getCertFileName(),
- keys.getMMTPKey(),
- self.getDHFile())
+ keys.getMMTPKey(),
+ self.getDHFile())
def getPacketHandler(self):
- """Create and return a PacketHandler from the currently live key."""
+ """Create and return a PacketHandler from the currently live key."""
keys = self.getServerKeyset()
- packetKey = keys.getPacketKey()
- hashlog = mixminion.server.HashLog.HashLog(keys.getHashLogFileName(),
- keys.getMMTPKeyID())
+ packetKey = keys.getPacketKey()
+ hashlog = mixminion.server.HashLog.HashLog(keys.getHashLogFileName(),
+ keys.getMMTPKeyID())
return mixminion.server.PacketHandler.PacketHandler(packetKey,
- hashlog)
+ hashlog)
#----------------------------------------------------------------------
@@ -339,21 +339,21 @@
#
# packetKey, mmtpKey: This server's actual short-term keys.
def __init__(self, keyroot, keyname, hashroot):
- """Load a set of keys named "keyname" on a server where all keys
- are stored under the directory "keyroot" and hashlogs are stored
- under "hashroot". """
- keydir = os.path.join(keyroot, "key_"+keyname)
- self.hashlogFile = os.path.join(hashroot, "hash_"+keyname)
- self.packetKeyFile = os.path.join(keydir, "mix.key")
- self.mmtpKeyFile = os.path.join(keydir, "mmtp.key")
- self.certFile = os.path.join(keydir, "mmtp.cert")
+ """Load a set of keys named "keyname" on a server where all keys
+ are stored under the directory "keyroot" and hashlogs are stored
+ under "hashroot". """
+ keydir = os.path.join(keyroot, "key_"+keyname)
+ self.hashlogFile = os.path.join(hashroot, "hash_"+keyname)
+ self.packetKeyFile = os.path.join(keydir, "mix.key")
+ self.mmtpKeyFile = os.path.join(keydir, "mmtp.key")
+ self.certFile = os.path.join(keydir, "mmtp.cert")
self.descFile = os.path.join(keydir, "ServerDesc")
if not os.path.exists(keydir):
- createPrivateDir(keydir)
+ createPrivateDir(keydir)
def load(self, password=None):
"""Read the short-term keys from disk. Must be called before
- getPacketKey or getMMTPKey."""
+ getPacketKey or getMMTPKey."""
self.packetKey = mixminion.Crypto.pk_PEM_load(self.packetKeyFile,
password)
self.mmtpKey = mixminion.Crypto.pk_PEM_load(self.mmtpKeyFile,
@@ -371,7 +371,7 @@
def getMMTPKey(self): return self.mmtpKey
def getMMTPKeyID(self):
"Return the sha1 hash of the asn1 encoding of the MMTP public key"
- return mixminion.Crypto.sha1(self.mmtpKey.encode_key(1))
+ return mixminion.Crypto.sha1(self.mmtpKey.encode_key(1))
#----------------------------------------------------------------------
# Functionality to generate keys and server descriptors
@@ -390,7 +390,7 @@
identityKey -- This server's private identity key
keydir -- The root directory for storing key sets.
keyname -- The name of this new key set within keydir
- hashdir -- The root directory for storing hash logs.
+ hashdir -- The root directory for storing hash logs.
validAt -- The starting time (in seconds) for this key's lifetime."""
# First, we generate both of our short-term keys...
@@ -413,7 +413,7 @@
nickname = socket.gethostname()
if not nickname or nickname.lower().startswith("localhost"):
nickname = config['Incoming/MMTP'].get('IP', "<Unknown host>")
- LOG.warn("No nickname given: defaulting to %r", nickname)
+ LOG.warn("No nickname given: defaulting to %r", nickname)
contact = config['Server']['Contact-Email']
comments = config['Server']['Comments']
if not validAt:
@@ -429,62 +429,62 @@
# Create the X509 certificate.
mixminion.Crypto.generate_cert(serverKeys.getCertFileName(),
- mmtpKey,
- "MMTP certificate for %s" %nickname,
+ mmtpKey,
+ "MMTP certificate for %s" %nickname,
certStarts, certEnds)
fields = {
- "IP": config['Incoming/MMTP'].get('IP', "0.0.0.0"),
- "Port": config['Incoming/MMTP'].get('Port', 0),
- "Nickname": nickname,
- "Identity":
- formatBase64(mixminion.Crypto.pk_encode_public_key(identityKey)),
- "Published": formatTime(time.time()),
- "ValidAfter": formatDate(validAt),
- "ValidUntil": formatDate(validUntil),
- "PacketKey":
- formatBase64(mixminion.Crypto.pk_encode_public_key(packetKey)),
- "KeyID":
- formatBase64(serverKeys.getMMTPKeyID()),
- }
+ "IP": config['Incoming/MMTP'].get('IP', "0.0.0.0"),
+ "Port": config['Incoming/MMTP'].get('Port', 0),
+ "Nickname": nickname,
+ "Identity":
+ formatBase64(mixminion.Crypto.pk_encode_public_key(identityKey)),
+ "Published": formatTime(time.time()),
+ "ValidAfter": formatDate(validAt),
+ "ValidUntil": formatDate(validUntil),
+ "PacketKey":
+ formatBase64(mixminion.Crypto.pk_encode_public_key(packetKey)),
+ "KeyID":
+ formatBase64(serverKeys.getMMTPKeyID()),
+ }
# If we don't know our IP address, try to guess
if fields['IP'] == '0.0.0.0':
- try:
- fields['IP'] = _guessLocalIP()
- LOG.warn("No IP configured; guessing %s",fields['IP'])
- except IPGuessError, e:
- LOG.error("Can't guess IP: %s", str(e))
- raise MixError("Can't guess IP: %s" % str(e))
+ try:
+ fields['IP'] = _guessLocalIP()
+ LOG.warn("No IP configured; guessing %s",fields['IP'])
+ except IPGuessError, e:
+ LOG.error("Can't guess IP: %s", str(e))
+ raise MixError("Can't guess IP: %s" % str(e))
# Fill in a stock server descriptor. Note the empty Digest: and
# Signature: lines.
info = """\
[Server]
- Descriptor-Version: 0.1
+ Descriptor-Version: 0.1
IP: %(IP)s
Nickname: %(Nickname)s
- Identity: %(Identity)s
- Digest:
+ Identity: %(Identity)s
+ Digest:
Signature:
Published: %(Published)s
Valid-After: %(ValidAfter)s
- Valid-Until: %(ValidUntil)s
- Packet-Key: %(PacketKey)s
+ Valid-Until: %(ValidUntil)s
+ Packet-Key: %(PacketKey)s
""" % fields
if contact:
- info += "Contact: %s\n"%contact
+ info += "Contact: %s\n"%contact
if comments:
- info += "Comments: %s\n"%comments
+ info += "Comments: %s\n"%comments
# Only advertise incoming MMTP if we support it.
if config["Incoming/MMTP"].get("Enabled", 0):
- info += """\
+ info += """\
[Incoming/MMTP]
Version: 0.1
Port: %(Port)s
- Key-Digest: %(KeyID)s
- Protocols: 0.1
+ Key-Digest: %(KeyID)s
+ Protocols: 0.1
""" % fields
for k,v in config.getSectionItems("Incoming/MMTP"):
if k not in ("Allow", "Deny"):
@@ -493,9 +493,9 @@
# Only advertise outgoing MMTP if we support it.
if config["Outgoing/MMTP"].get("Enabled", 0):
- info += """\
+ info += """\
[Outgoing/MMTP]
- Version: 0.1
+ Version: 0.1
Protocols: 0.1
"""
for k,v in config.getSectionItems("Outgoing/MMTP"):
@@ -567,43 +567,43 @@
"Try to find a reasonable IP for this host."
global _GUESSED_IP
if _GUESSED_IP is not None:
- return _GUESSED_IP
+ return _GUESSED_IP
# First, let's see what our name resolving subsystem says our
# name is.
ip_set = {}
try:
- ip_set[ socket.gethostbyname(socket.gethostname()) ] = 1
+ ip_set[ socket.gethostbyname(socket.gethostname()) ] = 1
except socket.error:
- try:
- ip_set[ socket.gethostbyname(socket.getfqdn()) ] = 1
- except socket.error:
- pass
+ try:
+ ip_set[ socket.gethostbyname(socket.getfqdn()) ] = 1
+ except socket.error:
+ pass
# And in case that doesn't work, let's see what other addresses we might
# think we have by using 'getsockname'.
for target_addr in ('18.0.0.1', '10.0.0.1', '192.168.0.1',
- '172.16.0.1')+tuple(ip_set.keys()):
- # open a datagram socket so that we don't actually send any packets
- # by connecting.
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect((target_addr, 9)) #discard port
- ip_set[ s.getsockname()[0] ] = 1
- except socket.error:
- pass
+ '172.16.0.1')+tuple(ip_set.keys()):
+ # open a datagram socket so that we don't actually send any packets
+ # by connecting.
+ try:
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ s.connect((target_addr, 9)) #discard port
+ ip_set[ s.getsockname()[0] ] = 1
+ except socket.error:
+ pass
for ip in ip_set.keys():
- if ip.startswith("127.") or ip.startswith("0."):
- del ip_set[ip]
+ if ip.startswith("127.") or ip.startswith("0."):
+ del ip_set[ip]
# FFFF reject 192.168, 10., 172.16.x
if len(ip_set) == 0:
- raise IPGuessError("No address found")
+ raise IPGuessError("No address found")
if len(ip_set) > 1:
- raise IPGuessError("Multiple addresses found: %s" % (
- ", ".join(ip_set.keys())))
+ raise IPGuessError("Multiple addresses found: %s" % (
+ ", ".join(ip_set.keys())))
return ip_set.keys()[0]
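
Aside: the second half of _guessLocalIP uses the connected-UDP-socket trick:
connect() on a datagram socket sends nothing, but it makes the kernel choose
a source address that getsockname() can then report. A minimal stdlib-only
version of just that trick (the function name is mine):

    import socket

    def source_ip_for(target="18.0.0.1"):
        """Return the local IP the kernel would use to reach 'target',
        or None if no route is available."""
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect((target, 9))          # port 9 is 'discard'; no packet is sent
            return s.getsockname()[0]
        except socket.error:
            return None
        finally:
            s.close()
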
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -d -r1.5 -r1.6
--- ServerMain.py 16 Dec 2002 01:37:21 -0000 1.5
+++ ServerMain.py 16 Dec 2002 02:40:11 -0000 1.6
@@ -33,114 +33,114 @@
process them with a packet handler, and send them into a mix pool."""
def __init__(self, location, packetHandler):
- """Create an IncomingQueue that stores its messages in <location>
- and processes them through <packetHandler>."""
- mixminion.server.Queue.DeliveryQueue.__init__(self, location)
- self.packetHandler = packetHandler
- self.mixPool = None
+ """Create an IncomingQueue that stores its messages in <location>
+ and processes them through <packetHandler>."""
+ mixminion.server.Queue.DeliveryQueue.__init__(self, location)
+ self.packetHandler = packetHandler
+ self.mixPool = None
def connectQueues(self, mixPool):
- """Sets the target mix queue"""
- self.mixPool = mixPool
+ """Sets the target mix queue"""
+ self.mixPool = mixPool
def queueMessage(self, msg):
- """Add a message for delivery"""
- LOG.trace("Inserted message %s into incoming queue",
- formatBase64(msg[:8]))
- self.queueDeliveryMessage(None, msg)
+ """Add a message for delivery"""
+ LOG.trace("Inserted message %s into incoming queue",
+ formatBase64(msg[:8]))
+ self.queueDeliveryMessage(None, msg)
def _deliverMessages(self, msgList):
- "Implementation of abstract method from DeliveryQueue."
- ph = self.packetHandler
- for handle, _, message, n_retries in msgList:
- try:
- res = ph.processMessage(message)
- if res is None:
- # Drop padding before it gets to the mix.
- LOG.debug("Padding message %s dropped",
- formatBase64(message[:8]))
- else:
- LOG.debug("Processed message %s; inserting into pool",
- formatBase64(message[:8]))
- self.mixPool.queueObject(res)
- self.deliverySucceeded(handle)
- except mixminion.Crypto.CryptoError, e:
- LOG.warn("Invalid PK or misencrypted packet header: %s", e)
- self.deliveryFailed(handle)
- except mixminion.Packet.ParseError, e:
- LOG.warn("Malformed message dropped: %s", e)
- self.deliveryFailed(handle)
- except mixminion.server.PacketHandler.ContentError, e:
- LOG.warn("Discarding bad packet: %s", e)
- self.deliveryFailed(handle)
+ "Implementation of abstract method from DeliveryQueue."
+ ph = self.packetHandler
+ for handle, _, message, n_retries in msgList:
+ try:
+ res = ph.processMessage(message)
+ if res is None:
+ # Drop padding before it gets to the mix.
+ LOG.debug("Padding message %s dropped",
+ formatBase64(message[:8]))
+ else:
+ LOG.debug("Processed message %s; inserting into pool",
+ formatBase64(message[:8]))
+ self.mixPool.queueObject(res)
+ self.deliverySucceeded(handle)
+ except mixminion.Crypto.CryptoError, e:
+ LOG.warn("Invalid PK or misencrypted packet header: %s", e)
+ self.deliveryFailed(handle)
+ except mixminion.Packet.ParseError, e:
+ LOG.warn("Malformed message dropped: %s", e)
+ self.deliveryFailed(handle)
+ except mixminion.server.PacketHandler.ContentError, e:
+ LOG.warn("Discarding bad packet: %s", e)
+ self.deliveryFailed(handle)
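
Aside: the loop above is the DeliveryQueue contract in miniature: every
handle passed to _deliverMessages() must end in exactly one call to
deliverySucceeded() or deliveryFailed(). A stripped-down sketch, with the
packet handler and queue methods replaced by callables and the three
Mixminion exception classes stood in for by a tuple argument:

    def deliver_all(msg_list, process, enqueue, succeeded, failed,
                    recoverable=(ValueError,)):
        """Process each (handle, message) pair and report its outcome,
        as IncomingQueue._deliverMessages() above does."""
        for handle, message in msg_list:
            try:
                result = process(message)
                if result is not None:      # None means the packet was padding
                    enqueue(result)
                succeeded(handle)           # padding also counts as delivered
            except recoverable:
                failed(handle)
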
class MixPool:
"""Wraps a mixminion.server.Queue.*MixQueue to send messages to an exit
queue and a delivery queue."""
def __init__(self, queue):
- """Create a new MixPool to wrap a given *MixQueue."""
- self.queue = queue
- self.outgoingQueue = None
- self.moduleManager = None
+ """Create a new MixPool to wrap a given *MixQueue."""
+ self.queue = queue
+ self.outgoingQueue = None
+ self.moduleManager = None
def queueObject(self, obj):
- """Insert an object into the queue."""
- self.queue.queueObject(obj)
+ """Insert an object into the queue."""
+ self.queue.queueObject(obj)
def count(self):
- "Return the number of messages in the queue"
- return self.queue.count()
+ "Return the number of messages in the queue"
+ return self.queue.count()
def connectQueues(self, outgoing, manager):
- """Sets the queue for outgoing mixminion packets, and the
- module manager for deliverable messages."""
- self.outgoingQueue = outgoing
- self.moduleManager = manager
+ """Sets the queue for outgoing mixminion packets, and the
+ module manager for deliverable messages."""
+ self.outgoingQueue = outgoing
+ self.moduleManager = manager
def mix(self):
- """Get a batch of messages, and queue them for delivery as
- appropriate."""
- handles = self.queue.getBatch()
- LOG.debug("Mixing %s messages out of %s",
- len(handles), self.queue.count())
- for h in handles:
- tp, info = self.queue.getObject(h)
- if tp == 'EXIT':
- rt, ri, app_key, tag, payload = info
- LOG.debug(" (sending message %s to exit modules)",
- formatBase64(payload[:8]))
- self.moduleManager.queueMessage(payload, tag, rt, ri)
- else:
- assert tp == 'QUEUE'
- ipv4, msg = info
- LOG.debug(" (sending message %s to MMTP server)",
- formatBase64(msg[:8]))
- self.outgoingQueue.queueDeliveryMessage(ipv4, msg)
- self.queue.removeMessage(h)
+ """Get a batch of messages, and queue them for delivery as
+ appropriate."""
+ handles = self.queue.getBatch()
+ LOG.debug("Mixing %s messages out of %s",
+ len(handles), self.queue.count())
+ for h in handles:
+ tp, info = self.queue.getObject(h)
+ if tp == 'EXIT':
+ rt, ri, app_key, tag, payload = info
+ LOG.debug(" (sending message %s to exit modules)",
+ formatBase64(payload[:8]))
+ self.moduleManager.queueMessage(payload, tag, rt, ri)
+ else:
+ assert tp == 'QUEUE'
+ ipv4, msg = info
+ LOG.debug(" (sending message %s to MMTP server)",
+ formatBase64(msg[:8]))
+ self.outgoingQueue.queueDeliveryMessage(ipv4, msg)
+ self.queue.removeMessage(h)
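
Aside: each object pulled from the pool carries a type tag that decides its
next hop: 'EXIT' payloads go to the delivery modules, 'QUEUE' packets go back
out over MMTP. The dispatch, reduced to a skeleton with two plain callables
standing in for the module manager and the outgoing queue:

    def dispatch_mixed(objects, deliver_exit, relay):
        """Route ('EXIT', info) and ('QUEUE', info) objects the way
        MixPool.mix() above does."""
        for tp, info in objects:
            if tp == 'EXIT':
                rt, ri, app_key, tag, payload = info
                deliver_exit(payload, tag, rt, ri)
            else:
                assert tp == 'QUEUE'
                ipv4, msg = info
                relay(ipv4, msg)
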
class OutgoingQueue(mixminion.server.Queue.DeliveryQueue):
"""DeliveryQueue to send messages via outgoing MMTP connections."""
def __init__(self, location):
- """Create a new OutgoingQueue that stores its messages in a given
- location."""
+ """Create a new OutgoingQueue that stores its messages in a given
+ location."""
mixminion.server.Queue.DeliveryQueue.__init__(self, location)
- self.server = None
+ self.server = None
def connectQueues(self, server):
- """Set the MMTPServer that this OutgoingQueue informs of its
- deliverable messages."""
- self.server = server
+ """Set the MMTPServer that this OutgoingQueue informs of its
+ deliverable messages."""
+ self.server = server
def _deliverMessages(self, msgList):
- "Implementation of abstract method from DeliveryQueue."
- # Map from addr -> [ (handle, msg) ... ]
- msgs = {}
- for handle, addr, message, n_retries in msgList:
- msgs.setdefault(addr, []).append( (handle, message) )
- for addr, messages in msgs.items():
- handles, messages = zip(*messages)
- self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
- list(messages), list(handles))
+ "Implementation of abstract method from DeliveryQueue."
+ # Map from addr -> [ (handle, msg) ... ]
+ msgs = {}
+ for handle, addr, message, n_retries in msgList:
+ msgs.setdefault(addr, []).append( (handle, message) )
+ for addr, messages in msgs.items():
+ handles, messages = zip(*messages)
+ self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
+ list(messages), list(handles))
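
Aside: before handing messages to MMTP, the queue regroups them per
destination so each host gets a single connection carrying all of its pending
messages. The grouping idiom above (dict.setdefault to bucket, zip(*pairs) to
split handles from payloads), isolated:

    def group_by_address(msg_list):
        """Turn [(handle, addr, message), ...] into
        {addr: (handles, messages)}, as _deliverMessages() above does."""
        buckets = {}
        for handle, addr, message in msg_list:
            buckets.setdefault(addr, []).append((handle, message))
        grouped = {}
        for addr, pairs in buckets.items():
            handles, messages = zip(*pairs)   # unzip into parallel sequences
            grouped[addr] = (list(handles), list(messages))
        return grouped
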
class _MMTPServer(mixminion.server.MMTPServer.MMTPAsyncServer):
"""Implementation of mixminion.server.MMTPServer that knows about
@@ -159,7 +159,7 @@
self.outgoingQueue.deliverySucceeded(handle)
def onMessageUndeliverable(self, msg, handle, retriable):
- self.outgoingQueue.deliveryFailed(handle, retriable)
+ self.outgoingQueue.deliveryFailed(handle, retriable)
class MixminionServer:
"""Wraps and drives all the queues, and the async net server. Handles
@@ -184,138 +184,138 @@
# outgoingQueue: Holds messages waiting to be sent via MMTP.
def __init__(self, config):
- """Create a new server from a ServerConfig."""
- LOG.debug("Initializing server")
- self.config = config
- homeDir = config['Server']['Homedir']
- createPrivateDir(homeDir)
+ """Create a new server from a ServerConfig."""
+ LOG.debug("Initializing server")
+ self.config = config
+ homeDir = config['Server']['Homedir']
+ createPrivateDir(homeDir)
- # Lock file.
- # FFFF Refactor this part into common?
- self.lockFile = os.path.join(homeDir, "lock")
- self.lockFD = os.open(self.lockFile, os.O_RDWR|os.O_CREAT, 0600)
- try:
- fcntl.flock(self.lockFD, fcntl.LOCK_EX|fcntl.LOCK_NB)
- except IOError:
- raise MixFatalError("Another server seems to be running.")
+ # Lock file.
+ # FFFF Refactor this part into common?
+ self.lockFile = os.path.join(homeDir, "lock")
+ self.lockFD = os.open(self.lockFile, os.O_RDWR|os.O_CREAT, 0600)
+ try:
+ fcntl.flock(self.lockFD, fcntl.LOCK_EX|fcntl.LOCK_NB)
+ except IOError:
+ raise MixFatalError("Another server seems to be running.")
- # The pid file.
- self.pidFile = os.path.join(homeDir, "pid")
-
- self.keyring = mixminion.server.ServerKeys.ServerKeyring(config)
- if self.keyring._getLiveKey() is None:
- LOG.info("Generating a month's worth of keys.")
- LOG.info("(Don't count on this feature in future versions.)")
- # We might not be able to do this, if we password-encrypt keys
- keylife = config['Server']['PublicKeyLifetime'][2]
- nKeys = ceilDiv(30*24*60*60, keylife)
- self.keyring.createKeys(nKeys)
+ # The pid file.
+ self.pidFile = os.path.join(homeDir, "pid")
+
+ self.keyring = mixminion.server.ServerKeys.ServerKeyring(config)
+ if self.keyring._getLiveKey() is None:
+ LOG.info("Generating a month's worth of keys.")
+ LOG.info("(Don't count on this feature in future versions.)")
+ # We might not be able to do this, if we password-encrypt keys
+ keylife = config['Server']['PublicKeyLifetime'][2]
+ nKeys = ceilDiv(30*24*60*60, keylife)
+ self.keyring.createKeys(nKeys)
- LOG.trace("Initializing packet handler")
- self.packetHandler = self.keyring.getPacketHandler()
- LOG.trace("Initializing TLS context")
- tlsContext = self.keyring.getTLSContext()
- LOG.trace("Initializing MMTP server")
- self.mmtpServer = _MMTPServer(config, tlsContext)
+ LOG.trace("Initializing packet handler")
+ self.packetHandler = self.keyring.getPacketHandler()
+ LOG.trace("Initializing TLS context")
+ tlsContext = self.keyring.getTLSContext()
+ LOG.trace("Initializing MMTP server")
+ self.mmtpServer = _MMTPServer(config, tlsContext)
- # FFFF Modulemanager should know about async so it can patch in if it
- # FFFF needs to.
- LOG.trace("Initializing delivery module")
- self.moduleManager = config.getModuleManager()
- self.moduleManager.configure(config)
+ # FFFF Modulemanager should know about async so it can patch in if it
+ # FFFF needs to.
+ LOG.trace("Initializing delivery module")
+ self.moduleManager = config.getModuleManager()
+ self.moduleManager.configure(config)
- queueDir = os.path.join(homeDir, 'work', 'queues')
+ queueDir = os.path.join(homeDir, 'work', 'queues')
- incomingDir = os.path.join(queueDir, "incoming")
- LOG.trace("Initializing incoming queue")
- self.incomingQueue = IncomingQueue(incomingDir, self.packetHandler)
- LOG.trace("Found %d pending messages in incoming queue",
- self.incomingQueue.count())
+ incomingDir = os.path.join(queueDir, "incoming")
+ LOG.trace("Initializing incoming queue")
+ self.incomingQueue = IncomingQueue(incomingDir, self.packetHandler)
+ LOG.trace("Found %d pending messages in incoming queue",
+ self.incomingQueue.count())
- mixDir = os.path.join(queueDir, "mix")
- # FFFF The choice of mix algorithm should be configurable
- LOG.trace("Initializing Mix pool")
- self.mixPool =MixPool(mixminion.server.Queue.TimedMixQueue(mixDir, 60))
- LOG.trace("Found %d pending messages in Mix pool",
- self.mixPool.count())
+ mixDir = os.path.join(queueDir, "mix")
+ # FFFF The choice of mix algorithm should be configurable
+ LOG.trace("Initializing Mix pool")
+ self.mixPool =MixPool(mixminion.server.Queue.TimedMixQueue(mixDir, 60))
+ LOG.trace("Found %d pending messages in Mix pool",
+ self.mixPool.count())
- outgoingDir = os.path.join(queueDir, "outgoing")
- LOG.trace("Initializing outgoing queue")
- self.outgoingQueue = OutgoingQueue(outgoingDir)
- LOG.trace("Found %d pending messages in outgoing queue",
- self.outgoingQueue.count())
+ outgoingDir = os.path.join(queueDir, "outgoing")
+ LOG.trace("Initializing outgoing queue")
+ self.outgoingQueue = OutgoingQueue(outgoingDir)
+ LOG.trace("Found %d pending messages in outgoing queue",
+ self.outgoingQueue.count())
- LOG.trace("Connecting queues")
- self.incomingQueue.connectQueues(mixPool=self.mixPool)
- self.mixPool.connectQueues(outgoing=self.outgoingQueue,
- manager=self.moduleManager)
- self.outgoingQueue.connectQueues(server=self.mmtpServer)
- self.mmtpServer.connectQueues(incoming=self.incomingQueue,
- outgoing=self.outgoingQueue)
+ LOG.trace("Connecting queues")
+ self.incomingQueue.connectQueues(mixPool=self.mixPool)
+ self.mixPool.connectQueues(outgoing=self.outgoingQueue,
+ manager=self.moduleManager)
+ self.outgoingQueue.connectQueues(server=self.mmtpServer)
+ self.mmtpServer.connectQueues(incoming=self.incomingQueue,
+ outgoing=self.outgoingQueue)
def run(self):
- """Run the server; don't return unless we hit an exception."""
- # FFFF Use heapq to schedule events? [I don't think so; there are only
- # FFFF two events, after all!]
+ """Run the server; don't return unless we hit an exception."""
+ # FFFF Use heapq to schedule events? [I don't think so; there are only
+ # FFFF two events, after all!]
- f = open(self.pidFile, 'wt')
- f.write("%s\n" % os.getpid())
- f.close()
+ f = open(self.pidFile, 'wt')
+ f.write("%s\n" % os.getpid())
+ f.close()
- now = time.time()
- MIX_INTERVAL = 600 # FFFF Configurable!
- nextMix = now + MIX_INTERVAL
- nextShred = now + 6000
- #FFFF Unused
- #nextRotate = self.keyring.getNextKeyRotation()
- while 1:
- LOG.trace("Next mix at %s", formatTime(nextMix,1))
- while time.time() < nextMix:
- # Handle pending network events
- self.mmtpServer.process(1)
- # Process any new messages that have come in, placing them
- # into the mix pool.
- self.incomingQueue.sendReadyMessages()
+ now = time.time()
+ MIX_INTERVAL = 600 # FFFF Configurable!
+ nextMix = now + MIX_INTERVAL
+ nextShred = now + 6000
+ #FFFF Unused
+ #nextRotate = self.keyring.getNextKeyRotation()
+ while 1:
+ LOG.trace("Next mix at %s", formatTime(nextMix,1))
+ while time.time() < nextMix:
+ # Handle pending network events
+ self.mmtpServer.process(1)
+ # Process any new messages that have come in, placing them
+ # into the mix pool.
+ self.incomingQueue.sendReadyMessages()
# Prevent child processes from turning into zombies.
waitForChildren(1)
- # Before we mix, we need to log the hashes to avoid replays.
- # FFFF We need to recover on server failure.
- self.packetHandler.syncLogs()
+ # Before we mix, we need to log the hashes to avoid replays.
+ # FFFF We need to recover on server failure.
+ self.packetHandler.syncLogs()
- LOG.trace("Mix interval elapsed")
- # Choose a set of outgoing messages; put them in outgoingqueue and
- # modulemanager
- self.mixPool.mix()
- # Send outgoing messages
- self.outgoingQueue.sendReadyMessages()
- # Send exit messages
- self.moduleManager.sendReadyMessages()
+ LOG.trace("Mix interval elapsed")
+ # Choose a set of outgoing messages; put them in outgoingqueue and
+ # modulemanager
+ self.mixPool.mix()
+ # Send outgoing messages
+ self.outgoingQueue.sendReadyMessages()
+ # Send exit messages
+ self.moduleManager.sendReadyMessages()
- # Choose next mix interval
- now = time.time()
- nextMix = now + MIX_INTERVAL
+ # Choose next mix interval
+ now = time.time()
+ nextMix = now + MIX_INTERVAL
- if now > nextShred:
- # FFFF Configurable shred interval
- LOG.trace("Expunging deleted messages from queues")
- self.incomingQueue.cleanQueue()
- self.mixPool.queue.cleanQueue()
- self.outgoingQueue.cleanQueue()
- self.moduleManager.cleanQueues()
- nextShred = now + 6000
+ if now > nextShred:
+ # FFFF Configurable shred interval
+ LOG.trace("Expunging deleted messages from queues")
+ self.incomingQueue.cleanQueue()
+ self.mixPool.queue.cleanQueue()
+ self.outgoingQueue.cleanQueue()
+ self.moduleManager.cleanQueues()
+ nextShred = now + 6000
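
Aside: run() is a single-threaded scheduler: poll the network until the next
mix deadline, mix, then pick the next deadline; a slower deadline drives
queue shredding. A skeleton of that loop with the Mixminion-specific work
replaced by callables (poll is expected to block briefly, as process(1) does
above; the interval constants are copied from the code):

    import time

    def serve_forever(poll, mix, shred, mix_interval=600, shred_interval=6000):
        """Drive 'poll' continuously, 'mix' every mix_interval seconds, and
        'shred' roughly every shred_interval seconds."""
        now = time.time()
        next_mix = now + mix_interval
        next_shred = now + shred_interval
        while 1:
            while time.time() < next_mix:
                poll()                      # handle pending network events
            mix()                           # flush a batch out of the mix pool
            now = time.time()
            next_mix = now + mix_interval
            if now > next_shred:
                shred()                     # expunge deleted messages from the queues
                next_shred = now + shred_interval
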
def close(self):
- """Release all resources; close all files."""
- self.packetHandler.close()
- try:
- os.unlink(self.lockFile)
- fcntl.flock(self.lockFD, fcntl.LOCK_UN)
- os.close(self.lockFD)
- os.unlink(self.pidFile)
- except OSError:
- pass
+ """Release all resources; close all files."""
+ self.packetHandler.close()
+ try:
+ os.unlink(self.lockFile)
+ fcntl.flock(self.lockFD, fcntl.LOCK_UN)
+ os.close(self.lockFD)
+ os.unlink(self.pidFile)
+ except OSError:
+ pass
#----------------------------------------------------------------------
def usageAndExit(cmd):
@@ -326,66 +326,66 @@
def configFromServerArgs(cmd, args):
options, args = getopt.getopt(args, "hf:", ["help", "config="])
if args:
- usageAndExit(cmd)
+ usageAndExit(cmd)
configFile = "/etc/mixminiond.conf"
for o,v in options:
- if o in ('-h', '--help'):
- usageAndExit(cmd)
- if o in ('-f', '--config'):
- configFile = v
+ if o in ('-h', '--help'):
+ usageAndExit(cmd)
+ if o in ('-f', '--config'):
+ configFile = v
return readConfigFile(configFile)
def readConfigFile(configFile):
try:
- return mixminion.server.ServerConfig.ServerConfig(fname=configFile)
+ return mixminion.server.ServerConfig.ServerConfig(fname=configFile)
except (IOError, OSError), e:
- print >>sys.stderr, "Error reading configuration file %r:"%configFile
- print >>sys.stderr, " ", str(e)
- sys.exit(1)
+ print >>sys.stderr, "Error reading configuration file %r:"%configFile
+ print >>sys.stderr, " ", str(e)
+ sys.exit(1)
except mixminion.Config.ConfigError, e:
- print >>sys.stderr, "Error in configuration file %r"%configFile
- print >>sys.stderr, str(e)
- sys.exit(1)
+ print >>sys.stderr, "Error in configuration file %r"%configFile
+ print >>sys.stderr, str(e)
+ sys.exit(1)
return None #suppress pychecker warning
#----------------------------------------------------------------------
def runServer(cmd, args):
config = configFromServerArgs(cmd, args)
try:
- mixminion.Common.LOG.configure(config)
- LOG.debug("Configuring server")
- mixminion.Common.configureShredCommand(config)
- mixminion.Crypto.init_crypto(config)
+ mixminion.Common.LOG.configure(config)
+ LOG.debug("Configuring server")
+ mixminion.Common.configureShredCommand(config)
+ mixminion.Crypto.init_crypto(config)
- server = MixminionServer(config)
+ server = MixminionServer(config)
except:
- info = sys.exc_info()
- LOG.fatal_exc(info,"Exception while configuring server")
- print >>sys.stderr, "Shutting down because of exception: %s"%info[1]
- sys.exit(1)
+ info = sys.exc_info()
+ LOG.fatal_exc(info,"Exception while configuring server")
+ print >>sys.stderr, "Shutting down because of exception: %s"%info[1]
+ sys.exit(1)
if not config['Server'].get("NoDaemon",0):
- print >>sys.stderr, "Starting server in the background"
- # ??? This 'daemonize' logic should go in Common.
- pid = os.fork()
- if pid != 0:
- os._exit(0)
- sys.stderr.close()
- sys.stdout.close()
- sys.stdin.close()
- sys.stdout = LogStream("STDOUT", "WARN")
- sys.stderr = LogStream("STDERR", "WARN")
+ print >>sys.stderr, "Starting server in the background"
+ # ??? This 'daemonize' logic should go in Common.
+ pid = os.fork()
+ if pid != 0:
+ os._exit(0)
+ sys.stderr.close()
+ sys.stdout.close()
+ sys.stdin.close()
+ sys.stdout = LogStream("STDOUT", "WARN")
+ sys.stderr = LogStream("STDERR", "WARN")
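
Aside: the background-start logic above is a minimal single-fork
daemonization: the parent exits immediately and the child replaces its stdio.
A stdlib-only sketch of the same steps, redirecting to /dev/null instead of
the LogStream wrapper used above (like the code above, it does not call
setsid or double-fork):

    import os, sys

    def daemonize():
        """Fork, let the parent return to the shell, and detach stdio."""
        if os.fork() != 0:
            os._exit(0)                     # parent exits at once
        sys.stdin.close()
        sys.stdout.close()
        sys.stderr.close()
        null = open(os.devnull, 'w')
        sys.stdout = null                   # the real server installs LogStream objects here
        sys.stderr = null
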
LOG.info("Starting server")
try:
- server.run()
+ server.run()
except KeyboardInterrupt:
- pass
+ pass
except:
- info = sys.exc_info()
- LOG.fatal_exc(info,"Exception while running server")
- print >>sys.stderr, "Shutting down because of exception: %s"%info[1]
+ info = sys.exc_info()
+ LOG.fatal_exc(info,"Exception while running server")
+ print >>sys.stderr, "Shutting down because of exception: %s"%info[1]
LOG.info("Server shutting down")
server.close()
LOG.info("Server is shut down")
@@ -403,16 +403,16 @@
usage=0
configFile = '/etc/miniond.conf'
for opt,val in options:
- if opt in ('-h', '--help'):
- usage=1
- elif opt in ('-f', '--config'):
- configFile = val
- elif opt in ('-n', '--keys'):
- try:
- keys = int(val)
- except ValueError:
- print >>sys.stderr,("%s requires an integer" %opt)
- usage = 1
+ if opt in ('-h', '--help'):
+ usage=1
+ elif opt in ('-f', '--config'):
+ configFile = val
+ elif opt in ('-n', '--keys'):
+ try:
+ keys = int(val)
+ except ValueError:
+ print >>sys.stderr,("%s requires an integer" %opt)
+ usage = 1
if usage:
print >>sys.stderr, "Usage: %s [-h] [-f configfile] [-n nKeys]"%cmd
sys.exit(1)
@@ -424,8 +424,8 @@
keyring = mixminion.server.ServerKeys.ServerKeyring(config)
print >>sys.stderr, "Creating %s keys..." % keys
for i in xrange(keys):
- keyring.createKeys(1)
- print >> sys.stderr, ".... (%s/%s done)" % (i+1,keys)
+ keyring.createKeys(1)
+ print >> sys.stderr, ".... (%s/%s done)" % (i+1,keys)
#----------------------------------------------------------------------
def removeKeys(cmd, args):
@@ -441,11 +441,11 @@
removeIdentity = 0
configFile = '/etc/miniond.conf'
for opt,val in options:
- if opt in ('-h', '--help'):
- usage=1
- elif opt in ('-f', '--config'):
- configFile = val
- elif opt == '--remove-identity':
+ if opt in ('-h', '--help'):
+ usage=1
+ elif opt in ('-f', '--config'):
+ configFile = val
+ elif opt == '--remove-identity':
removeIdentity = 1
if usage:
print >>sys.stderr, \