[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] Sunday's hacks, today! More work towards clean, usable...
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.seul.org:/tmp/cvs-serv31913/lib/mixminion
Modified Files:
Common.py MMTPServer.py Modules.py Queue.py ServerInfo.py
ServerMain.py
Log Message:
Sunday's hacks, today! More work towards clean, usable, testable server
logic.
HACKING:
Bring up to date
Makefile, setup:
Make it work on macos X
Common:
Move in createPrivateDirs from servermain
Document log severities
MMTPServer:
Track retriable and non-retriable failures better; add failure
callback
Modules:
Document classes
Add per-module queues
Begin new processMessage method (incomplete)
Simplify DropModule
Switch MBOX to use SMTPlib.
Queue:
Add convenience methods to pickle and unpickle objects
Add DeliveryQueue class to encapsulate queue/send/retry logic
Add MixQueue to implement Cottrell mixing.
ServerMain:
Adopt some (but not yet all) of above improvements
aes_ctr.c:
Fix big-endian *_U32 macros
Add INCR_U32 macro to improve performance on big-endian machines
Rewrite code to use INCR_ macros
Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -d -r1.11 -r1.12
--- Common.py 11 Aug 2002 07:50:34 -0000 1.11
+++ Common.py 12 Aug 2002 18:12:24 -0000 1.12
@@ -51,6 +51,28 @@
return divmod(a-1,b)[0]+1
#----------------------------------------------------------------------
+def createPrivateDir(d):
+ """Create a directory, and all parent directories, checking permissions
+ as we go along. All superdirectories must be owned by root or us.
+
+ XXXX we don't check permissions properly yet."""
+ if not os.path.exists(d):
+ try:
+ os.makedirs(s, 0700)
+ except OSError, e:
+ getLog().fatal("Unable to create directory %s"%d)
+ raise MixFatalError()
+ elif not os.path.isdir(d):
+ getLog().fatal("%s is not a directory"%d)
+ raise MixFatalError()
+ else:
+ m = os.stat(d)[stat.ST_MODE]
+ # check permissions
+ if m & 0077:
+ getLog().fatal("Directory %s must be mode 0700" %d)
+ raise MixFatalError()
+
+#----------------------------------------------------------------------
# Secure filesystem operations.
#
@@ -171,7 +193,18 @@
class Log:
"""A Log is a set of destinations for system messages, along with the
- means to filter them to a desired verbosity."""
+ means to filter them to a desired verbosity.
+
+ Log messages have 'severities' as follows:
+ TRACE: hyperverbose mode; used for debugging fiddly
+ little details. This is a security risk.
+ DEBUG: very verbose mode; used for tracing connections
+ and messages through the system. This is a security risk.
+ INFO: non-critical events.
+ WARN: recoverable errors
+ ERROR: nonrecoverable errors that affect only a single
+ message or a connection.
+ FATAL: nonrecoverable errors that affect the entire system"""
def __init__(self, minSeverity):
self.configure(None)
self.setMinSeverity(minSeverity)
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/MMTPServer.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- MMTPServer.py 11 Aug 2002 07:50:34 -0000 1.9
+++ MMTPServer.py 12 Aug 2002 18:12:24 -0000 1.10
@@ -165,7 +165,7 @@
rw.register(self.server)
def shutdown(self):
- debug("Closing server connection")
+ debug("Closing listener connection")
self.server.unregister(self)
del self.server
self.sock.close()
@@ -289,7 +289,7 @@
r = self.__con.read(1024) #may throw want*
if r == 0:
trace("read returned 0.")
- self.shutdown()
+ self.shutdown(err=0)
return
else:
assert isinstance(r, StringType)
@@ -304,7 +304,7 @@
if self.__expectReadLen and self.__inbuflen > self.__expectReadLen:
warn("Protocol violation: too much data. Closing connection.")
- self.shutdown(err=1)
+ self.shutdown(err=1, retriable=0)
return
if self.__terminator and self.__inbuf[0].find(self.__terminator) > -1:
@@ -356,12 +356,13 @@
self.__server.registerReader(self)
except _ml.TLSClosed:
warn("Unexpectedly closed connection")
+ self.handleFail(retriable=1)
self.__sock.close()
self.__server.unregister(self)
except _ml.TLSError:
if self.__state != self.__shutdownFn:
warn("Unexpected error: closing connection.")
- self.shutdown(1)
+ self.shutdown(err=1, retriable=1)
else:
warn("Error while shutting down: closing connection.")
self.__server.unregister(self)
@@ -377,11 +378,11 @@
"""Called when this connection is successfully shut down."""
pass
- def shutdown(self, err=0):
+ def shutdown(self, err=0, retriable=0):
"""Begin a shutdown on this connection"""
-
+ if err:
+ self.handleFail(retriable)
self.__state = self.__shutdownFn
- #self.__server.registerWriter(self)
def fileno(self):
return self.__con.fileno()
@@ -392,6 +393,10 @@
def getPeerPK(self):
return self.__con.get_peer_cert_pk()
+
+ def handleFail(self, retriable=0):
+ """Called when we shutdown with an error."""
+ pass
#----------------------------------------------------------------------
PROTOCOL_STRING = "MMTP 1.0\r\n"
@@ -484,10 +489,10 @@
#----------------------------------------------------------------------
-# XXXX retry logic.
+# XXXX We need logic to do retries.
class MMTPClientConnection(SimpleTLSConnection):
def __init__(self, context, ip, port, keyID, messageList, handleList,
- sentCallback=None):
+ sentCallback=None, failCallback=None):
"""Create a connection to send messages to an MMTP server.
ip -- The IP of the destination server.
port -- The port to connect to.
@@ -496,7 +501,9 @@
handleList -- a list of objects corresponding to the messages in
messageList. Used for callback.
sentCallback -- None, or a function of (msg, handle) to be called
- whenever a message is successfully sent."""
+ whenever a message is successfully sent.
+ failCallback -- None, or a function of (msg, handle, retriable)
+ to be called when messages can't be sent."""
trace("CLIENT CON")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -516,6 +523,7 @@
self.handleList = handleList
self.finished = self.__setupFinished
self.sentCallback = sentCallback
+ self.failCallback = failCallback
def __setupFinished(self):
"""Called when we're done with the client side negotations.
@@ -525,7 +533,8 @@
if self.keyID is not None:
if keyID != self.keyID:
warn("Got unexpected Key ID from %s", self.ip)
- self.shutdown(err=1)
+ # This may work again in a couple of hours
+ self.shutdown(err=1,retriable=1)
else:
debug("KeyID is valid")
@@ -546,7 +555,9 @@
inp = self.getInput()
if inp != PROTOCOL_STRING:
warn("Invalid protocol. Closing connection")
- self.shutdown(err=1)
+ # This isn't retriable; we don't talk to servers we don't
+ # understand.
+ self.shutdown(err=1,retriable=0)
return
self.beginNextMessage()
@@ -584,7 +595,9 @@
#XXXX Rehandshake
inp = self.getInput()
if inp != (RECEIVED_CONTROL+self.expectedDigest):
- self.shutdown(1)
+ # We only get bad ACKs if an adversary somehow subverts TLS's
+ # checksumming. That's not fixable.
+ self.shutdown(err=1,retriable=0)
return
debug("Received valid ACK for message.")
@@ -597,6 +610,11 @@
self.beginNextMessage()
+ def handleFail(self, retriable):
+ if self.failCallback is not None:
+ for msg, handle in zip(self.messageList, self.handleList):
+ self.failCallback(msg,handle,retriable)
+
LISTEN_BACKLOG = 10 # ???? Is something else more reasonable
class MMTPServer(AsyncServer):
"""A helper class to invoke AsyncServer, MMTPServerConnection, and
@@ -618,7 +636,10 @@
sock.setblocking(0)
con = MMTPServerConnection(sock, tls, self.onMessageReceived)
con.register(self)
-
+
+ def stopListening(self):
+ self.listener.shutdown()
+
def sendMessages(self, ip, port, keyID, messages, handles):
"""Send a set of messages to a given server."""
con = MMTPClientConnection(ip, port, keyID, messages, handles,
@@ -627,6 +648,9 @@
def onMessageReceived(self, msg):
pass
+
+ def onMessageUndeliverable(self, msg, handle, retriable):
+ pass
def onMessageSent(self, msg, handle):
pass
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Modules.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- Modules.py 6 Aug 2002 16:09:21 -0000 1.3
+++ Modules.py 12 Aug 2002 18:12:24 -0000 1.4
@@ -5,15 +5,20 @@
Type codes and dispatch functions for routing functionality."""
-#__all__ = [ 'ModuleManager' ]
+__all__ = [ 'ModuleManager', 'DROP_TYPE', 'FWD_TYPE', 'SWAP_FWD_TYPE',
+ 'DELIVER_OK', 'DELIVER_FAIL_RETRY', 'DELIVER_FAIL_NORETRY',
+ 'SMTP_TYPE', 'MBOX_TYPE' ]
import os
+import smtplib
import mixminion.Config
import mixminion.Packet
+import mixminion.Queue
from mixminion.Config import ConfigError, _parseBoolean, _parseCommand
from mixminion.Common import getLog
+# Return values for processMessage
DELIVER_OK = 1
DELIVER_FAIL_RETRY = 2
DELIVER_FAIL_NORETRY = 3
@@ -30,11 +35,20 @@
SMTP_TYPE = 0x0100 # Mail the message
MBOX_TYPE = 0x0101 # Send the message to one of a fixed list of addresses
-
class DeliveryModule:
- "XXXX DOCME"
+ """Abstract base for modules; delivery modules should implement the methods
+ in this class.
+
+ A delivery module has the following responsibilities:
+ * It must have a 0-argument constructor.
+ * If it is configurable, it must be able to specify its options,
+ validate its configuration, and configure itself.
+ * If it is advertisable, it must provide a server info block.
+ * It must know its own name.
+ * It must know which types it handles.
+ * Of course, it needs to know how to deliver a message."""
def __init__(self):
- pass
+ pass
def getConfigSyntax(self):
pass
@@ -49,27 +63,67 @@
pass
def getName(self):
+ """Return the name of this module. This name may be used to construct
+ directory paths, so it shouldn't contain any funny characters."""
pass
def getExitTypes(self):
+ """Return a list of numeric exit types which this module is willing to
+ handle."""
pass
def processMessage(self, message, exitType, exitInfo):
+ """Given a message with a given exitType and exitInfo, try to deliver
+ it. Return one of:
+ DELIVER_OK (if the message was successfully delivered),
+ DELIVER_FAIL_RETRY (if the message wasn't delivered, but might be
+ deliverable later), or
+ DELIVER_FAIL_NORETRY (if the message shouldn't be tried later)."""
pass
class ModuleManager:
+ """A ModuleManager knows about all of the modules in the system.
+
+ A module may be in one of three states: unloaded, registered, or
+ enabled. An unloaded module is just a class in a python module.
+ A registered module has been loaded, configured, and listed with
+ the ModuleManager, but will not receive messages until it has been
+ enabled."""
+ ##
+ # Fields
+ # syntax: extensions to the syntax configuration in Config.py
+ # modules: a list of DeliveryModule objects
+ # nameToModule: XXXX Docdoc
+ # typeToModule: a map from delivery type to enabled deliverymodule.
+ # path: search path for python modules.
+ # queueRoot: directory where all the queues go.
+ # queues: a map from module name to queue.
+
def __init__(self):
self.syntax = {}
self.modules = []
+ self.nameToModule = {}
self.typeToModule = {}
+ self.path = []
+ self.queueRoot = None
+ self.queues = {}
self.registerModule(MBoxModule())
self.registerModule(DropModule())
+ def _setQueueRoot(self, queueRoot):
+ """Sets a directory under which all modules' queue directories
+ should go."""
+ self.queueRoot = queueRoot
+
def getConfigSyntax(self):
+ """Returns a dict to extend the syntax configuration in a Config
+ object. Should be called after all modules are registered."""
return self.syntax
def registerModule(self, module):
+ """Inform this ModuleManager about a delivery module. This method
+ updates the syntax options, but does not enable the module."""
self.modules.append(module)
syn = module.getConfigSyntax()
for sec, rules in syn.items():
@@ -78,40 +132,80 @@
self.syntax.update(syn)
def setPath(self, path):
+ """Sets the search path for Python modules"""
self.path = path
def loadExtModule(self, className):
- # CHECK! XXXX Handle errors
+ """Load and register a module from a python file. Takes a classname
+ of the format module.Class or package.module.Class. Raises
+ MixError if the module can't be loaded."""
ids = className.split(".")
pyPkg = ".".join(ids[:-1])
pyClassName = ids[-1]
try:
orig_path = sys.path[:]
- sys.path.extend(self.path)
- m = __import__(pyPkg, {}, {}, [])
+ sys.path[0:0] = self.path
+ try:
+ m = __import__(pyPkg, {}, {}, [])
+ except ImportError, e:
+ raise MixError("%s while importing %s" %(str(e),className))
finally:
sys.path = orig_path
- pyClass = getattr(pyPkg, pyClassname)
- self.registerModule(pyClass())
-
+ try:
+ pyClass = getattr(m, pyClassname)
+ except AttributeError, e:
+ raise MixError("No class %s in module %s" %(pyClassName,pyPkg))
+ try:
+ self.registerModule(pyClass())
+ except Exception, e:
+ raise MixError("Error initializing module %s" %className)
+
def validate(self, sections, entries, lines, contents):
for m in self.modules:
m.validateConfig(sections, entries, lines, contents)
def configure(self, config):
+ self.queueRoot = os.path.join(config['Server']['Homedir'],
+ 'work', 'queues', 'deliver')
+ createPrivateDir(self.queueRoot)
for m in self.modules:
m.configure(config, self)
def enableModule(self, module):
+ """Maps all the types for a module object."""
for t in module.getExitTypes():
self.typeToModule[t] = module
+ queueDir = os.path.join(self.queueRoot, module.getName())
+ queue = mixminion.Queue.Queue(queueDiir, create=1, scrub=1)
+ self.queues[module.getName()] = queue
def disableModule(self, module):
+ """Unmaps all the types for a module object."""
for t in module.getExitTypes():
if self.typeToModule.has_key(t):
del self.typeToModule[t]
+ if self.queues.has_key(module.getName()):
+ del self.queues[module.getName()]
+
+ def queueMessage(self, message, exitType, exitInfo):
+ mod = self.typeToModule.get(exitType, None)
+ if mod is not None:
+ queue = self.queues[mod.getName()]
+ f, handle = queue.openNewMessage()
+ cPickle.dumps((0, exitType, exitInfo, message), f, 1)
+ queue.finishMessage(f, handle)
+ else:
+ getLog().error("Unable to queue message with unknown type %s",
+ exitType)
+
+ def processMessages(self):
+ for name, queue in self.queues.items():
+ if len(queue):
+ XXXX
def processMessage(self, message, exitType, exitInfo):
+ """Tries to deliver a message. Return types are as in
+ DeliveryModule.processMessage"""
mod = self.typeToModule.get(exitType, None)
if mod is not None:
return mod.processMessage(message, exitType, exitInfo)
@@ -123,30 +217,21 @@
def getServerInfoBlocks(self):
return [ m.getServerInfoBlock() for m in self.modules ]
+#----------------------------------------------------------------------
class DropModule(DeliveryModule):
- def __init__(self):
- DeliveryModule.__init__(self)
-
+ """Null-object pattern: drops all messages it receives."""
def getConfigSyntax(self):
return { }
-
- def validateConfig(self, sections, entries, lines, contents):
- pass
-
- def configure(self, config, moduleManager):
- pass
-
def getServerInfoBlock(self):
return ""
-
+ def configure(self, config, manager):
+ manager.enable(self)
def getName(self):
- return "DROP module"
-
+ return "DROP"
def getExitTypes(self):
return [ DROP_TYPE ]
-
def processMessage(self, message, exitType, exitInfo):
- getLog().info("Dropping padding message")
+ getLog().debug("Dropping padding message")
return DELIVER_OK
#----------------------------------------------------------------------
@@ -163,7 +248,7 @@
'AddressFile' : ('ALLOW', None, None),
'ReturnAddress' : ('ALLOW', None, None),
'RemoveContact' : ('ALLOW', None, None),
- 'Command' : ('ALLOW', _parseCommand, "sendmail") }
+ 'SMTPServer' : ('ALLOW', None, 'localhost') }
}
def validateConfig(self, sections, entries, lines, contents):
@@ -173,7 +258,7 @@
def configure(self, config, moduleManager):
# XXXX Check this. error handling
self.enabled = config['Delivery/MBOX'].get("Enabled", 0)
- self.command = config['Delivery/MBOX']['Command']
+ self.server = config['Delivery/MBOX']['SMTPServer']
self.addressFile = config['Delivery/MBOX']['AddressFile']
self.returnAddress = config['Delivery/MBOX']['ReturnAddress']
self.contact = config['Delivery/MBOX']['RemoveContact']
@@ -192,9 +277,6 @@
self.nickname = socket.gethostname()
self.addr = config['Server'].get('IP', "<Unknown host>")
- if self.command != ('sendmail', []):
- getLog().warn("Ignoring mail command in version 0.0.1")
-
f = open(self.addressfile)
addresses = f.read()
f.close()
@@ -221,7 +303,7 @@
"""
def getName(self):
- return "MBOX module"
+ return "MBOX"
def getExitTypes(self):
return [ MBOX_TYPE ]
@@ -230,12 +312,14 @@
assert exitType == MBOX_TYPE
getLog().trace("Received MBOX message")
info = mixminion.packet.parseMBOXInfo(exitInfo)
- if not addresses.has_key(info.user):
+ try:
+ address = addresses[info.user]
+ except KeyError, e:
getLog.warn("Unknown MBOX user %r", info.user)
- return
+
msg = _escapeMessageForEmail(message)
- fields = { 'user': addresses[info.user],
+ fields = { 'user': address,
'return': self.returnAddr,
'nickname': self.nickname,
'addr': self.addr,
@@ -249,22 +333,29 @@
THIS IS AN ANONYMOUS MESSAGE. The mixminion server '%(nickname)s' at
%(addr)s has been configured to deliver messages to your address. If you
do not want to receive messages in the future, contact %(contact)s and you
-will be removed. (XXXX Need real boilerplate)
+will be removed.
%(msg)s
""" % fields
- f = os.popen("sendmail -i -t", 'w')
- f.write(msg)
- status = f.close()
- if status != 0:
- getLog().error("Unsuccessful sendmail")
- return DELIVER_FAIL_RETRY
-
- return DELIVER_OK
+ return sendSMTPMessage(self.server, [address], self.returnAddr, msg)
#----------------------------------------------------------------------
+def sendSMTPMessage(server, toList, fromAddr, message):
+ con = smtplib(server)
+ try:
+ con.sendmail(fromAddr, toList, message)
+ res = DELIVER_OK
+ except smtplib.SMTPException, e:
+ getLog().warn("Unsuccessful smtp: "+str(e))
+ res = DELIVER_FAIL_RETRY #????
+ con.quit()
+ con.close()
+
+ return res
+
+#----------------------------------------------------------------------
_allChars = "".join(map(chr, range(256)))
_nonprinting = "".join(map(chr, range(0x00, 0x07)+range(0x0E, 0x20)))
@@ -286,3 +377,4 @@
============ BASE-64 ENCODED ANONYMOUS MESSAGE BEGINS
%s
============ BASE-64 ENCODED ANONYMOUS MESSAGE ENDS\n""" % msg
+
Index: Queue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Queue.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- Queue.py 11 Aug 2002 07:50:34 -0000 1.8
+++ Queue.py 12 Aug 2002 18:12:24 -0000 1.9
@@ -10,6 +10,7 @@
import base64
import time
import stat
+import cPickle
from mixminion.Common import MixError, MixFatalError, secureDelete, getLog
from mixminion.Crypto import AESCounterPRNG
@@ -86,7 +87,7 @@
mode = os.stat(location)[stat.ST_MODE]
if mode & 0077:
# XXXX be more Draconian.
- getLog().warn("Worrisome more %o on directory %s", mode, location)
+ getLog().warn("Worrisome mode %o on directory %s", mode, location)
if scrub:
self.cleanQueue(1)
@@ -102,6 +103,14 @@
self.finishMessage(f, handle)
return handle
+ def queueObject(self, object):
+ """Queue an object using cPickle, and return a handle to that
+ object."""
+ f, handle = self.openNewMessage()
+ cPickle.dump(contents, f, 1)
+ self.finishMessage(f, handle)
+ return handle
+
def count(self, recount=0):
"""Returns the number of complete messages in the queue."""
if self.n_entries >= 0 and not recount:
@@ -172,13 +181,21 @@
return open(os.path.join(self.dir, "msg_"+handle), 'rb')
def messageContents(self, handle):
- """Given a messagge handle, returns the contents of the corresponding
+ """Given a message handle, returns the contents of the corresponding
message."""
f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
s = f.read()
f.close()
return s
+ def getObject(self, handle):
+ """Given a message handle, read and unpickle the contents of the
+ corresponding message."""
+ f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
+ res = cPickle.load(f)
+ f.close()
+ return res
+
def openNewMessage(self):
"""Returns (file, handle) tuple to create a new message. Once
you're done writing, you must call finishMessage to
@@ -252,6 +269,146 @@
"""Helper method: creates a new random handle."""
junk = self.rng.getBytes(9)
return base64.encodestring(junk).strip().replace("/","-")
+
+class DeliveryQueue(Queue):
+ """A DeliveryQueue implements a queue that greedily sends messages
+ to outgoing streams that occasionally fail. Messages in a
+ DeliveryQueue are no longer unstructured text, but rather
+ tuples of: (n_retries, next_retry_time, addressing info, msg).
+
+ This class is abstract. Implementors of this class should
+ subclass it to add a deliverMessages method. Multiple
+ invocations of this method may be active at a given time. Upon
+ success or failure, this method should cause deliverySucceeded
+ or deliveryFailed to be called as appropriate.
+
+ Users of this class will probably only want to call the queueMessage,
+ sendReadyMessages, and nextMessageReadyAt methods.
+
+ This class caches information about the directory state; it
+ won't play nice if multiple instances are looking at the same
+ directory.
+ """
+ ###
+ # Fields:
+ # sendableAt -- An inorder list of (time, handle) for all messages
+ # that we're not currently sending. Each time is either a
+ # number of seconds since the epoch at which the corresponding
+ # message will be sendable, or 0 to indicate that the message
+ # is sendable _now_.
+ # pending -- Dict from handle->1, for all messages that we're
+ # currently sending.
+
+ def __init__(self, location):
+ Queue.__init__(self, location, create=1, scrub=1)
+ self._rescan()
+
+ def _rescan(self):
+ """Rebuild the internal state of this queue from the underlying
+ directory."""
+ self.sendableAt = []
+ self.pending = {}
+ now = time.time()
+ for h in self.getAllmessages():
+ t = self.getObject(handle)[1] # retry time
+ self.sendableAt.append[(t, h)]
+ self.sendableAt.sort()
+
+ def queueMessage(self, addr, msg, retry=0, retryAt=0):
+ """Schedule a message for delivery.
+ addr -- An object to indicate the message's destination
+ msg -- the message itself
+ retry -- how many times so far have we tried to send?
+ retryAt -- when should we next retry? (0 for now)."""
+
+ self.queueObject( (retry, retryAt, addr, msg) )
+
+ if retryAt == 0:
+ self.sendableAt[0:0] = [(0, handle)]
+ else:
+ bisect.insort_right(self.sendableAt, (retryAt, handle))
+ return handle
+
+ def nextMessageReadyAt(self):
+ """Return the soonest possible time at which sendReadyMessages
+ will send something. If some time < now is returned,
+ the answer is 'immediately'. """
+ if self.sendableAt:
+ return self.sendableAt[0][0]
+ else:
+ return None
+
+ def messageContents(self,handle):
+ """Returns a (n_retries, retryAt, addr, msg) payload for a given
+ message handle."""
+ return self.getObject(handle)
+
+ def sendReadyMessages(self):
+ """Sends all messages which are ready to be sent, and which are not
+ already being sent."""
+ now = time.time()
+ idx = bisect.bisect_right(self.sendableAt, (now, ""))
+ messages = []
+ sendable = self.sendableAt[:idx]
+ del self.sendableAt[:idx]
+ for when, h in self.sendable:
+ retries, retryAt, addr, msg = self.getObject(h)
+ messages.append((h, addr, msg, retries))
+ self.pending[h] = 1
+ if messages:
+ self.deliverMessage(messages)
+
+ def deliverMessages(self, msgList):
+ """Abstract method; Invoked with a list of
+ (handle, addr, message, n_retries) tuples every time we have a batch
+ of messages to send."""
+ # We could implement this as a single deliverMessage(h,addr,m,n)
+ # method, but that wouldn't allow implementations to batch
+ # messages being sent to the same address.
+ assert 0
+
+ def deliverySucceeded(self, handle):
+ """Removes a message from the outgoing queue. This method
+ should be invoked after the corresponding message has been
+ successfully delivered.
+ """
+ self.removeMessage(handle)
+ del self.pending[handle]
+
+ def deliveryFailed(self, handle, retryAt=None):
+ """Removes a message from the outgoing queue, or requeues it
+ for delivery at a later time. This method should be
+ invoked after an attempt to deliver the corresponding
+ message has failed."""
+ del self.pending[handle]
+ if self.retryAt is not None:
+ # Queue the new one before removing the old one, for
+ # crash-proofness
+ retries, retryAt, addr, msg = self.getObject(h)
+ self.queueMessage(addr, msg, retries+1, retryAt)
+ self.removeMessage(handle)
+
+class MixQueue(Queue):
+ """A MixQueue holds a group of files, and returns some of them
+ as requested, according to some mixing algorithm.
+
+ It's the responsibility of the user of this class to only invoke it
+ at the specified interval."""
+ # Right now, we use the 'Cottrell' mixing algorithm with fixed
+ # parameters. We always keep 5 messages in the pool, and never send
+ # more than 30% of the pool at a time. These parameters should probably
+ # be more like 100
+ MIN_POOL_SIZE = 5
+ MAX_REPLACEMENT_RATE = 0.3
+ def __init__(self, location):
+ Queue.__init__(self, location, create=1, scrub=1)
+
+ def pickMessages(self):
+ """Return a list of handles."""
+ n = self.count()
+ nTransmit = min(n-self.MIN_POOL_SIZE,
+ int(n*self.MAX_REPLACEMENT_RATE))
+ return self.pickRandom(nTransmit)
def _secureDelete_bg(files, cleanFile):
pid = os.fork()
Index: ServerInfo.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ServerInfo.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- ServerInfo.py 6 Aug 2002 16:09:21 -0000 1.8
+++ ServerInfo.py 12 Aug 2002 18:12:24 -0000 1.9
@@ -132,7 +132,6 @@
return IPV4Info(self.getAddr(), self.getPort(), self.getKeyID())
#----------------------------------------------------------------------
-# This should go in a different file.
class ServerKeys:
"""A set of expirable keys for use by a server.
@@ -334,12 +333,13 @@
finally:
f.close()
- # for debugging XXXX ### Remove this once we're more confident.
+ # XXXX for debugging: try to parse and validate the thing we just made.
+ # XXXX Remove this once we're more confident.
ServerInfo(string=info)
return info
-#-----------------------b-----------------------------------------------
+#----------------------------------------------------------------------
def getServerInfoDigest(info):
"""Calculate the digest of a server descriptor"""
return _getServerInfoDigestImpl(info, None)
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ServerMain.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- ServerMain.py 11 Aug 2002 07:50:34 -0000 1.2
+++ ServerMain.py 12 Aug 2002 18:12:24 -0000 1.3
@@ -9,7 +9,7 @@
import cPickle
-from mixminion.Common import getLog, MixFatalError, MixError
+from mixminion.Common import getLog, MixFatalError, MixError, createPrivateDir
# Directory layout:
# MINION_HOME/work/queues/incoming/
@@ -27,23 +27,6 @@
# conf/miniond.conf
# ....
-def createDir(d):
- if not os.path.exists(d):
- try:
- os.mkdir(d, 0700)
- except OSError, e:
- getLog().fatal("Unable to create directory %s"%d)
- raise MixFatalError()
- elif not os.path.isdir(d):
- getLog().fatal("%s is not a directory"%d)
- raise MixFatalError()
- else:
- m = os.stat(d)[stat.ST_MODE]
- # check permissions
- if m & 0077:
- getLog().fatal("Directory %s must be mode 0700" %d)
- raise MixFatalError()
-
class ServerState:
# XXXX This should be refactored. keys should be separated from queues.
# config
@@ -56,8 +39,8 @@
# set up directory structure.
c = self.config
self.homedir = c['Server']['Homedir']
- createDir(self.homedir)
- getLog()._configure() # ????
+ createPrivateDir(self.homedir)
+ getLog()._configure(self.config)# ????
w = os.path.join(self.homeDir, "work")
q = os.path.join(w, "queues")
@@ -65,7 +48,6 @@
self.mixDir = os.path.join(q, "mix")
self.outgoingDir = os.path.join(q, "outgoing")
self.deliverDir = os.path.join(q, "deliver")
- self.deliverMBOXDir = os.path.join(self.deliverDir, "mbox")
tlsDir = os.path.join(w, "tls")
self.hashlogsDir = os.path.join(w, "hashlogs")
@@ -75,9 +57,9 @@
for d in [self.homeDir, w, q, self.incomingDir, self.mixDir,
self.outgoingDir, self.deliverDir, tlsDir,
self.hashlogsDir, self.keysDir, self.confDir]:
- createDir(d)
+ createPrivateDir(d)
- for name in ("incoming", "mix", "outgoing", "deliverMBOX"):
+ for name in ("incoming", "mix", "outgoing"):
loc = getattr(self, name+"Dir")
queue = mixminion.Queue.Queue(loc, create=1, scrub=1)
setattr(self, name+"Queue", queue)