[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] More work on server code. Clean up some unnecessary co...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.seul.org:/tmp/cvs-serv15299/lib/mixminion

Modified Files:
	Common.py Config.py Crypto.py MMTPServer.py Packet.py Queue.py 
	ServerMain.py test.py 
Log Message:
More work on server code.  Clean up some unnecessary complications in config system

Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -d -r1.10 -r1.11
--- Common.py	26 Jul 2002 15:47:20 -0000	1.10
+++ Common.py	11 Aug 2002 07:50:34 -0000	1.11
@@ -57,11 +57,11 @@
 _SHRED_CMD = "---"
 _SHRED_OPTS = None
     
-def _shredConfigHook():
+def configureShredCommand(conf):
+    """Initialize the secure delete command from a given Config object.
+       If no object is provided, try some sane defaults."""
     global _SHRED_CMD
     global _SHRED_OPTS
-    import mixminion.Config as Config
-    conf = Config.getConfig()
     cmd, opts = None, None
     if conf is not None:
         val = conf['Host'].get('ShredCommand', None)
@@ -98,9 +98,7 @@
        XXXX let's try to avoid that, to be on the safe side. 
     """
     if _SHRED_CMD == "---":
-        import mixminion.Config as Config
-        _shredConfigHook()
-        Config.addHook(_shredConfigHook)
+        configureShredCommand(None)
 
     if fnames == []:
         return
@@ -125,13 +123,17 @@
 # I'm trying to make this interface look like a subset of the one in
 # the draft PEP-0282 (http://www.python.org/peps/pep-0282.html).
 
-#XXXX XXXX DOC DOC DOCDOC
-
 def _logtime():
-    #XXXX Is this guaranteed to work?
+    'Helper function.  Returns current local time formatted for log.'
+
+    # Note: Python strftime is implemented using that platform libc's
+    # strftime, so in theory, this might barf.  All of the format
+    # elements below are (I think) standard, so we should be ok.
     return time.strftime("%b %d %H:%m:%S")
 
-class FileLogTarget:
+class _FileLogHandler:
+    """Helper class for logging.  Represents a file on disk, and allows the
+       usual close-and-open gimmick for log rotation."""
     def __init__(self, fname):
         self.file = None
         self.fname = fname
@@ -139,14 +141,19 @@
     def reset(self):
         if self.file is not None:
             self.file.close()
-        # XXXX Fail sanely. :)
-        self.file = open(self.fname, 'a')
+	try: 
+	    self.file = open(self.fname, 'a')
+	except OSError, e:
+	    self.file = None
+	    raise MixError("Unable to open log file %r"%self.fname)
     def close(self):
         self.file.close()
     def write(self, severity, message):
+	if self.file is None:
+	    return
         print >> self.file, "%s [%s] %s" % (_logtime(), severity, message)
         
-class ConsoleLogTarget: 
+class _ConsoleLogHandler: 
     def __init__(self, file):
         self.file = file 
     def reset(self): pass
@@ -154,7 +161,6 @@
     def write(self, severity, message):
         print >> self.file, "%s [%s] %s" % (_logtime(), severity, message)
 
-
 _SEVERITIES = { 'TRACE' : -2,
                 'DEBUG' : -1,
                 'INFO' : 0,
@@ -164,19 +170,17 @@
                 'NEVER' : 100}
 
 class Log:
+    """A Log is a set of destinations for system messages, along with the
+       means to filter them to a desired verbosity."""
     def __init__(self, minSeverity):
-        self.handlers = []
-        self.setMinSeverity(minSeverity)
-        onReset(self.reset)
-        onTerminate(self.close)
+	self.configure(None)
+	self.setMinSeverity(minSeverity)
 
-    def _configure(self):
-        import mixminion.Config as Config
-        config = Config.getConfig()
+    def configure(self, config):
         self.handlers = []
         if config == None or not config.has_section("Server"):
             self.setMinSeverity("WARN")
-            self.addHandler(ConsoleLogTarget(sys.stderr))
+            self.addHandler(_ConsoleLogHandler(sys.stderr))
         else:
             self.setMinSeverity(config['Server'].get('LogLevel', "WARN"))
             logfile = config['Server'].get('LogFile',None)
@@ -184,10 +188,14 @@
                 homedir = config['Server']['Homedir']
                 if homedir:
                     logfile = os.path.join(homedir, "log")
-            if not logfile or config['Server'].get('EchoMessages',0):
-                self.addHandler(ConsoleLogTarget(sys.stderr))
+	    self.addHandler(_ConsoleLogHandler(sys.stderr))
             if logfile:
-                self.addHandler(FileLogTarget(logfile))
+		try:
+		    self.addHandler(_FileLogHandler(logfile))
+		except MixError, e:
+		    self.error(str(e))
+            if logfile and not config['Server'].get('EchoMessages',0):
+		del self.handlers[0]
             
     def setMinSeverity(self, minSeverity):
         self.severity = _SEVERITIES.get(minSeverity, 1)
@@ -202,9 +210,14 @@
         self.handlers.append(handler)
 
     def reset(self):
-        # FFFF reload configuration information here?
         for h in self.handlers:
-            h.reset()
+	    try:
+		h.reset()
+	    except MixError, e:
+		if len(self.handlers) > 1:
+		    self.error(str(e))
+		else:
+		    print >>sys.stderr, "Unable to reset log system"
 
     def close(self):
         for h in self.handlers:
@@ -230,18 +243,14 @@
     def fatal(self, message, *args):
         self.log("FATAL", message, *args)
 
-_theLog = None
-
+_THE_LOG = None
 def getLog():
     """Return the MixMinion log object."""
-    global _theLog
-    if _theLog is None:
-        import mixminion.Config as Config
-        _theLog = Log('DEBUG')
-        _theLog._configure()
-        Config.addHook(_theLog._configure)
-        
-    return _theLog
+    global _THE_LOG
+    if _THE_LOG is None:
+        _THE_LOG = Log('WARN')
+
+    return _THE_LOG
 
 #----------------------------------------------------------------------
 # Signal handling

Index: Config.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Config.py,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -d -r1.7 -r1.8
--- Config.py	6 Aug 2002 16:09:21 -0000	1.7
+++ Config.py	11 Aug 2002 07:50:34 -0000	1.8
@@ -46,7 +46,7 @@
    The restricted format is used for server descriptors.   
    """
 
-__all__ = [ 'getConfig', 'loadConfig', 'addHook' ]
+__all__ = [ 'getConfig', 'loadConfig' ]
 
 import os
 import re
@@ -80,26 +80,12 @@
         assert fname is not None
         _theConfiguration = ClientConfig(fname)
 
-    mixminion.Common.onReset(_theConfiguration.reload)
-
 def getConfig():
     """Return the configuration object for this process, or None if we haven't
        been configured yet."""
     return _theConfiguration
 #----------------------------------------------------------------------
 
-_CONFIG_HOOKS = []
-def addHook(hook):
-    '''Add 'hook' (a 0-argument function) to the list of configuration
-       hooks.  Whenever the configuration file is reloaded (as on a
-       SIGHUP), it invokes each of the configuration hooks in the
-       order it was added to the list.'''
-    # This isn't a method of _Config, since we want to be able to call
-    # it before we read the configuration file.
-    _CONFIG_HOOKS.append(hook)
-
-#----------------------------------------------------------------------
-
 class ConfigError(MixError):
     """Thrown when an error is found in a configuration file."""
     pass
@@ -233,7 +219,7 @@
     cmd, opts = c[0], c[1:]
     if os.path.isabs(cmd):
         if not os.path.exists(cmd):
-            raise ConfigError("File not found: %s" % cmd)
+            raise ConfigError("Executable file not found: %s" % cmd)
         else:
             return cmd, opts
     else:
@@ -300,7 +286,6 @@
 	    (0 <= mm < 60)  and (0 <= ss <= 61)):
 	raise ConfigError("Invalid %s %r" % (("date","time")[_timeMode],s))
 
-
     # we set the DST flag to zero so that subtracting time.timezone always
     # gives us gmt.
     return time.mktime((yyyy,MM,dd,hh,mm,ss,0,0,0))-time.timezone
@@ -422,7 +407,10 @@
     #  _sections: A map from secname->key->value.
     #  _sectionEntries: A  map from secname->[ (key, value) ] inorder.
     #  _sectionNames: An inorder list of secnames.
-    #  _callbacks: XXXX DOC
+    #  _callbacks: A map from section name to a callback function that should 
+    #      be invoked with (section,sectionEntries) after each section is
+    #      read.  This shouldn't be used for validation; it's for code that
+    #      needs to change the semantics of the parser.
     #
     # Fields to be set by a subclass:
     #     _syntax is map from sec->{key:
@@ -483,8 +471,6 @@
         f = open(self.fname, 'r')
         try:
             self.__reload(f)
-            for hook in _CONFIG_HOOKS:
-                hook()
         finally:
             f.close()
 
@@ -604,6 +590,7 @@
         self._sectionNames = self_sectionNames
 
     def _addCallback(self, section, cb):
+	"""For use by subclasses.  Adds a callback for a section"""
         if not hasattr(self, '_callbacks'):
             self._callbacks = {}
         self._callbacks[section] = cb
@@ -701,15 +688,20 @@
         }
 
 class ServerConfig(_ConfigFile):
+    ##
+    # Fields: 
+    #   moduleManager
+    #
     _restrictFormat = 0
     # XXXX Missing: Queue-Size / Queue config options
     # XXXX         timeout options
+    # XXXX       listen timeout??
     def __init__(self, fname=None, string=None):
         self._syntax = SERVER_SYNTAX.copy()
 
         import mixminion.Modules
         self.moduleManager = mixminion.Modules.ModuleManager()
-        self._addCallback("Server", self.loadModules)    
+        self._addCallback("Server", self.__loadModules)    
 
         _ConfigFile.__init__(self, fname, string)
 
@@ -717,19 +709,19 @@
         #XXXX write this.
         self.moduleManager.validate(sections, entries, lines, contents)
 
-    def loadModules(self, section, sectionEntries):
+    def __loadModules(self, section, sectionEntries):
+	"""Callback from the [Server] section of a config file.  Parses
+	   the module options, and adds new sections to the syntax 
+	   accordingly."""
         self.moduleManager.setPath(section.get('ModulePath', None))
         for mod in section.get('Module', []):
+	    info("Loading module %s", mod)
             self.moduleManager.loadExtModule(mod)
 
         self._syntax.update(self.moduleManager.getConfigSyntax())
     
-        
-    
 ##         if sections['Server']['PublicKeyLifeTime'][2] < 24*60*60:
 ##             raise ConfigError("PublicKeyLifetime must be at least 1 day.")
 ##         elif sections['Server']['PublicKeyLifeTime'][2] % (24*60*60) > 30:
 ##             getLog().warn("PublicKeyLifetime rounded to the nearest day")
 ##             nDays = sections[60*60*24]
-
-

Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -d -r1.11 -r1.12
--- Crypto.py	28 Jul 2002 22:42:33 -0000	1.11
+++ Crypto.py	11 Aug 2002 07:50:34 -0000	1.12
@@ -9,6 +9,7 @@
    functionality themselves."""
 
 import os
+import sys
 import stat
 from types import StringType
 
@@ -35,14 +36,17 @@
 # Number of bytes in a SHA1 digest
 DIGEST_LEN = 160 >> 3
 
-def init_crypto():
+def init_crypto(config=None):
     """Initialize the crypto subsystem."""
+    configure_trng(config)
     trng(1)
     try:
         # Try to read /dev/urandom
         trng(1)
+    except MixFatalError, e:
+	raise
     except:
-        raise MixFatalError("Couldn't initialize entropy source")
+        raise MixFatalError("Error initializing entropy source")
     openssl_seed(40)
 
 def sha1(s):
@@ -453,40 +457,72 @@
         _theSharedPRNG = AESCounterPRNG()
     return _theSharedPRNG
 
+#----------------------------------------------------------------------
+# TRNG implementation
+
+# Here, we pick default files.
+#
+# This is a tricky point.  We want a device that gives securely-seeded
+# numbers from a really strong entropy source, but we don't need it to
+# block.  On Linux, this is /dev/urandom.  On BSD-ish things, this
+# MAY be /dev/srandom (the man page only says that urandom 'is not
+# guaranteed to be secure).  On Darwin, neither one seems to block.
+# On commercial Unices, your guess is as good as mine.
+PLATFORM_TRNG_DEFAULTS = {
+    'darwin' : [ "/dev/urandom", "/dev/random" ],
+    'linux2' : [ "/dev/urandom" ],
+    '***' : [ "/dev/urandom", "/dev/srandom", "/dev/random" ],
+    }
+
 _TRNG_FILENAME = None
-def _trng_set_filename():
+def configure_trng(config):
+    """Initialize the true entropy source from a given Config object.  If
+       none is provided, tries some sane defaults.""" 
     global _TRNG_FILENAME
-    config = mixminion.Config.getConfig()
     if config is not None:
-        file = config['Host'].get('EntropySource', "/dev/urandom")
+        requestedFile = config['Host'].get('EntropySource', None)
     else:
-        file = "/dev/urandom"
+	requestedFile = None
 
-    if not os.path.exists(file):
-        getLog().error("No such file as %s", file)
-        file = None
-    else:
-        st = os.stat(file)
-        if not (st[stat.ST_MODE] & stat.S_IFCHR):
-            getLog().error("Entropy source %s isn't a character device", file)
-            file = None
+    defaults = 	PLATFORM_TRNG_DEFAULTS.get(sys.platform,
+				   PLATFORM_TRNG_DEFAULTS['***'])
+    files = [ requestedFile ] + defaults
 
-    if file is None and _TRNG_FILENAME is None:
+    randFile = None
+    for file in files:
+	if file is None: 
+	    continue
+
+	verbose = 1#(file == requestedFile)
+	if not os.path.exists(file):
+	    if verbose:
+		getLog().error("No such file as %s", file)
+	else:
+	    st = os.stat(file)
+	    if not (st[stat.ST_MODE] & stat.S_IFCHR):
+		if verbose:
+		    getLog().error("Entropy source %s isn't a character device",
+				   file)
+	    else:
+		randFile = file
+		break
+
+    if randFile is None and _TRNG_FILENAME is None:
         getLog().fatal("No entropy source available")
         raise MixFatalError("No entropy source available")
-    elif file is None:
+    elif randFile is None:
         getLog().warn("Falling back to previous entropy source %s",
                       _TRNG_FILENAME)
     else:
-        _TRNG_FILENAME = file
+	getLog().info("Setting entropy source to %r", randFile)
+        _TRNG_FILENAME = randFile
     
 def _trng_uncached(n):
     '''Underlying access to our true entropy source.'''
     if _TRNG_FILENAME is None:
-        _trng_set_filename()
-        mixminion.Config.addHook(_trng_set_filename)
+        configure_trng(None)
         
-    f = open(_TRNG_FILENAME)
+    f = open(_TRNG_FILENAME, 'rb')
     d = f.read(n)
     f.close()
     return d

Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/MMTPServer.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- MMTPServer.py	6 Aug 2002 16:09:21 -0000	1.8
+++ MMTPServer.py	11 Aug 2002 07:50:34 -0000	1.9
@@ -323,10 +323,7 @@
         while len(out):
             r = self.__con.write(out) # may throw
 
-            if r == 0:
-                self.shutdown() #XXXX
-                return
-
+	    assert r > 0
             out = out[r:]
 
         self.__outbuf = out
@@ -359,7 +356,6 @@
             self.__server.registerReader(self)
         except _ml.TLSClosed:
             warn("Unexpectedly closed connection")
-
             self.__sock.close()
             self.__server.unregister(self) 
         except _ml.TLSError:
@@ -488,9 +484,20 @@
 
 #----------------------------------------------------------------------
         
+# XXXX retry logic.
 class MMTPClientConnection(SimpleTLSConnection):
-    def __init__(self, context, ip, port, keyID, messageList,
+    def __init__(self, context, ip, port, keyID, messageList, handleList,
                  sentCallback=None):
+	"""Create a connection to send messages to an MMTP server.  
+	   ip -- The IP of the destination server.
+	   port -- The port to connect to.
+	   keyID -- None, or the expected SHA1 hash of the server's public key
+	   messageList -- a list of message payloads to send
+	   handleList -- a list of objects corresponding to the messages in
+	      messageList.  Used for callback.
+           sentCallback -- None, or a function of (msg, handle) to be called
+              whenever a message is successfully sent."""
+	
         trace("CLIENT CON")
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.setblocking(0)
@@ -506,6 +513,7 @@
 
         SimpleTLSConnection.__init__(self, sock, tls, 0)
         self.messageList = messageList
+	self.handleList = handleList
         self.finished = self.__setupFinished
         self.sentCallback = sentCallback
 
@@ -581,38 +589,44 @@
 
        debug("Received valid ACK for message.")
        justSent = self.messageList[0]
+       justSentHandle = self.handleList[0]
        del self.messageList[0]
+       del self.handleList[0]
        if self.sentCallback is not None:
-           self.sentCallback(justSent)
-
+           self.sentCallback(justSent, justSetHandle)
+	   
        self.beginNextMessage()
 
+LISTEN_BACKLOG = 10 # ???? Is something else more reasonable
 class MMTPServer(AsyncServer):
-    "XXXX"
+    """A helper class to invoke AsyncServer, MMTPServerConnection, and
+       MMTPClientConnection"""
     def __init__(self, config):
         self.context = config.getTLSContext(server=1)
         self.listener = ListenConnection("127.0.0.1",
-                                         config['Outgoing/MMTP']['Port']
-                                         10, self._newMMTPConnection)
+                                         config['Outgoing/MMTP']['Port'],
+					 LISTEN_BACKLOG,
+                                         self._newMMTPConnection)
         self.config = config
         self.listener.register(self)
 
     def _newMMTPConnection(self, sock):
-        "XXXX"
+	"""helper method.  Creates and registers a new server connection when
+	   the listener socket gets a hit."""
         # XXXX Check whether incoming IP is valid XXXX
         tls = self.context.sock(sock, serverMode=1)
         sock.setblocking(0)
         con = MMTPServerConnection(sock, tls, self.onMessageReceived)
         con.register(self)
         
-    def sendMessages(self, ip, port, keyID, messages):
-        con = MMTPClientConnection(ip, port, keyID, messages,
+    def sendMessages(self, ip, port, keyID, messages, handles):
+	"""Send a set of messages to a given server."""
+        con = MMTPClientConnection(ip, port, keyID, messages, handles,
                                    self.onMessageSent)
         con.register(self)
 
     def onMessageReceived(self, msg):
         pass
 
-    def onMessageSent(self, msg):
+    def onMessageSent(self, msg, handle):
         pass
-    

Index: Packet.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Packet.py,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -d -r1.6 -r1.7
--- Packet.py	6 Aug 2002 16:09:21 -0000	1.6
+++ Packet.py	11 Aug 2002 07:50:34 -0000	1.7
@@ -324,6 +324,13 @@
         """Return the routing info for this address"""
         assert len(self.keyinfo) == DIGEST_LEN
         return struct.pack(IPV4_PAT, _packIP(self.ip), self.port, self.keyinfo)
+    
+    def __hash__(self):
+	return hash(self.pack())
+
+    def __eq__(self, other):
+	return (type(self) == type(other) and self.ip == other.ip and
+		self.port == other.port and self.keyinfo == other.keyinfo)
 
 def parseSMTPInfo(s):
     """Convert the encoding of an SMTP routinginfo into an SMTPInfo object."""

Index: Queue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Queue.py,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -d -r1.7 -r1.8
--- Queue.py	6 Aug 2002 16:09:21 -0000	1.7
+++ Queue.py	11 Aug 2002 07:50:34 -0000	1.8
@@ -56,6 +56,8 @@
     # Fields:   rng--a random number generator for creating new messages
     #                and getting a random slice of the queue.
     #           dir--the location of the queue.
+    #           n_entries: the number of complete messages in the queue.
+    #                 <0 if we haven't counted yet.  
     def __init__(self, location, create=0, scrub=0):
         """Creates a queue object for a given directory, 'location'.  If
            'create' is true, creates the directory if necessary.  If 'scrub'
@@ -89,6 +91,9 @@
         if scrub:
             self.cleanQueue(1)
 
+	# Count messages on first time through.
+	self.n_entries = -1
+
     def queueMessage(self, contents):
         """Creates a new message in the queue whose contents are 'contents',
            and returns a handle to that message."""
@@ -97,13 +102,17 @@
         self.finishMessage(f, handle)
         return handle
 
-    def count(self):
+    def count(self, recount=0):
         """Returns the number of complete messages in the queue."""
-        res = 0
-        for fn in os.listdir(self.dir):
-            if fn.startswith("msg_"):
-                res += 1
-        return res
+	if self.n_entries >= 0 and not recount:
+	    return self.n_entries
+	else:
+	    res = 0
+	    for fn in os.listdir(self.dir):
+		if fn.startswith("msg_"):
+		    res += 1
+	    self.n_entries = res
+	    return res
 
     def pickRandom(self, count=None):
         """Returns a list of 'count' handles to messages in this queue.
@@ -111,7 +120,6 @@
 
            If there are fewer than 'count' messages in the queue, all the
            messages will be retained."""
-
         messages = [fn for fn in os.listdir(self.dir) if fn.startswith("msg_")]
 
         n = len(messages)
@@ -130,6 +138,11 @@
 
         return [m[4:] for m in messages[:count]]
 
+    def getAllMessages(self):
+	"""Returns handles for all messages currently in the queue.
+           Note: this ordering is not guaranteed to be random"""
+        return [fn[4:] for fn in os.listdir(self.dir) if fn.startswith("msg_")]
+
     def removeMessage(self, handle):
         """Given a handle, removes the corresponding message from the queue."""
         self.__changeState(handle, "msg", "rmv")
@@ -140,9 +153,7 @@
         for m in os.listdir(self.dir):
             if m[:4] in ('inp_', 'msg_'):
                 self.__changeState(m[4:], m[:3], "rmv")
-                #removed.append(os.path.join(self.dir, "rmv_"+m[4:]))
-        #    elif m[:4] == 'rmv_':
-        #        removed.append(self.dir)
+	self.n_entries = 0
         self.cleanQueue()
 
     def moveMessage(self, handle, queue):
@@ -175,7 +186,7 @@
         handle = self.__newHandle()
         fname = os.path.join(self.dir, "inp_"+handle)
         fd = os.open(fname, _NEW_MESSAGE_MODE, 0600)
-        return os.fdopen(fd, 'w'), handle
+        return os.fdopen(fd, 'wb'), handle
 
     def finishMessage(self, f, handle):
         """Given a file and a corresponding handle, closes the file
@@ -227,9 +238,15 @@
 
     def __changeState(self, handle, s1, s2):
         """Helper method: changes the state of message 'handle' from 's1'
-           to 's2'."""
+           to 's2', and changes the internal count."""
         os.rename(os.path.join(self.dir, s1+"_"+handle),
                   os.path.join(self.dir, s2+"_"+handle))
+	if self.n_entries < 0:
+	    return
+	if s1 == 'msg' and s2 != 'msg':
+	    self.n_entries -= 1
+	elif s1 != 'msg' and s2 == 'msg':
+	    self.n_entries += 1
 
     def __newHandle(self):
         """Helper method: creates a new random handle."""

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ServerMain.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -d -r1.1 -r1.2
--- ServerMain.py	6 Aug 2002 16:09:21 -0000	1.1
+++ ServerMain.py	11 Aug 2002 07:50:34 -0000	1.2
@@ -5,7 +5,9 @@
 
    The main loop and related functionality for a Mixminion server
 
-   BUG: No support for public key encryption"""
+   BUG: No support for encrypting private keys.n"""
+
+import cPickle
 
 from mixminion.Common import getLog, MixFatalError, MixError
 
@@ -43,6 +45,7 @@
             raise MixFatalError()
 
 class ServerState:
+    # XXXX This should be refactored.  keys should be separated from queues.
     # config
     # log
     # homedir
@@ -153,7 +156,6 @@
 
     def getDeliverMBOXQueue(self, which):
         return self.deliverMBOXQueue
-        
 
 class _Server(MMTPServer):
     def __init__(self, config, serverState):
@@ -164,27 +166,96 @@
     def onMessageReceived(self, msg):
         self.incomingQueue.queueMessage(msg)
 
-    def onMessageSent(self, msg):
-        self.outgoingQueue.remove
-    
+    def onMessageSent(self, msg, handle):
+        self.outgoingQueue.remove(handle)
 
 def runServer(config):
     s = ServerState(config)
+    log = getLog()
     packetHandler = s.getPacketHandler()
     context = s.getTLSContext()
 
-    shouldProcess = len(os.listdir(s.incomingDir))
-    shouldSend = len(os.listdir(s.outgoingDir))
-    shouldMBox = len(os.listdir(s.deliverMBOXDir))
     # XXXX Make these configurable; make mixing OO.
     mixInterval = 60
     mixPoolMinSize = 5
-    mixPoolMaxRate = 5 
+    mixPoolMaxRate = 0.5 
 
     nextMixTime = time.time() + mixInterval
 
-    server = mixminion.MMTPServer.MMTPServer(config)
+    server = _Server(config, s)
+
+    incomingQueue = s.getIncomingQueue()
+    outgoingQueue = s.getOutgoingQueue()
+    mixQueue = s.getMixQueue()
+    deliverMBOXQueue = s.getDeliverMBOXQueue()
+    while 1:  # Add shutdown mechanism XXXX
+        server.process(1)
+
+	# Possible timing attack here????? XXXXX ????
+	if incomingQueue.count():
+	    for h in incomingQueue.getAllMessages():
+		msg = incomingQueue.messageContents(h)
+		res = None
+		try:
+		    res = packetHandler.processHandler(msg)
+		    if res is None: log.info("Padding message dropped")
+		except miximinion.Packet.ParseError, e:
+		    log.warn("Malformed message dropped:"+str(e))
+		except miximinion.Crypto.CryptoError, e:
+		    log.warn("Invalid PK or misencrypted packet header:"+str(e))
+		except mixminion.PacketHandler.ContentError, e:
+		    log.warn("Discarding bad packet:"+str(e))
+
+		if res is not None:
+		    f, newHandle = mixQueue.openNewMessage()
+		    cPickle.dump(res, f, 1)
+		    mixQueue.finishMessage(f, newHandle)
+		    log.info("Message added to mix queue")
+		
+		incomingQueue.removeMessage(h)
+	
+	if time.time() > nextMixTime:
+	    nextMixTime = time.time() + mixInterval
+	    
+	    poolSize = mixQueue.count()
+	    if poolSize > mixPoolMinSize:
+		beginSending = {}
+		n = min(poolSize-mixPoolMinSize, int(poolSize*mixPoolMaxRate))
+		handles = mixQueue.pickRandom(n)
+		for h in handles:
+		    f = mixQueue.openMessage(h)
+		    type, data = cPickle.load(f)
+		    f.close()
+		    if type == 'QUEUE':
+			newHandle = mixQueue.moveMessage(h, outgoingQueue)
+			ipv4info, payload = data
+			beginSending.setdefault(ipv4info,[]).append(
+			    (newHandle, payload))
+		    else:
+			assert type == 'EXIT'
+			# XXXX Use modulemanager for this.
+			rt, ri, appKey, payload = data
+			if rt == Modules.DROP_TYPE:
+			    mixQueue.removeMessage(h)
+			elif rt == Moddules.MBOX_TYPE:
+			    mixQueue.moveMessage(h, deliverMBOXQueue)
+			else:
+			    # XXXX Shouldn't we drop stuff as early as possible,
+			    # XXXX so that we don't wind up with only a single
+			    # XXXX message sent out of the server?
+			    log.warn("Dropping unhandled type 0x%04x",rt)
+			    mixQueue.removeMessage(h)
+
+		if beginSending:
+		    # XXXX Handle timeouts; handle if we're already sending
+		    # XXXX to a server.
+		    for addr, messages in beginSending.items():
+			handles, payloads = zip(*messages)
+			server.sendMessages(addr.ip, addr.port, addr.keyinfo,
+					    payloads, handles)
+		    
+	    # XXXX Retry and resend after failure.
+	    # XXXX Scan for messages we should begin sending after restart.
+	    # XXXX Remove long-undeliverable messages
+	    # XXXX Deliver MBOX messages.
 
-    while 1:
-        
-        

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -d -r1.16 -r1.17
--- test.py	6 Aug 2002 16:09:21 -0000	1.16
+++ test.py	11 Aug 2002 07:50:34 -0000	1.17
@@ -1347,17 +1347,18 @@
         queue1.cleanQueue()    
         queue2.cleanQueue()
 
-#----------------------------------------------------------------------
+#---------------------------------------------------------------------
 # LOGGING
 class LogTests(unittest.TestCase):
     def testLogging(self):
         import cStringIO
-        from mixminion.Common import Log, FileLogTarget, ConsoleLogTarget
+        from mixminion.Common import Log, _FileLogHandler, _ConsoleLogHandler
         log = Log("INFO")
         self.assertEquals(log.getMinSeverity(), "INFO")
+	log.handlers = []
         log.log("WARN", "This message should not appear")
         buf = cStringIO.StringIO()
-        log.addHandler(ConsoleLogTarget(buf))
+        log.addHandler(_ConsoleLogHandler(buf))
         log.trace("Foo")
         self.assertEquals(buf.getvalue(), "")
         log.log("WARN", "Hello%sworld", ", ")
@@ -1371,7 +1372,7 @@
         t = tempfile.mktemp("log")
         t1 = t+"1"
         unlink_on_exit(t, t1)
-        log.addHandler(FileLogTarget(t))
+        log.addHandler(_FileLogHandler(t))
         log.info("Abc")
         log.info("Def")
         os.rename(t,t1)
@@ -1514,7 +1515,8 @@
         messages = ["helloxxx"*4096, "helloyyy"*4096]
         async = mixminion.MMTPServer.AsyncServer()
         clientcon = mixminion.MMTPServer.MMTPClientConnection(
-           _getTLSContext(0), "127.0.0.1", TEST_PORT, keyid, messages[:], None)
+           _getTLSContext(0), "127.0.0.1", TEST_PORT, keyid, messages[:], 
+	   [None, None], None)
         clientcon.register(async)
         def clientThread(clientcon=clientcon, async=async):
             while not clientcon.isShutdown():
@@ -1536,7 +1538,7 @@
         # Again, with bad keyid.
         clientcon = mixminion.MMTPServer.MMTPClientConnection(
            _getTLSContext(0), "127.0.0.1", TEST_PORT, "Z"*20,
-           messages[:], None)
+           messages[:], [None, None], None)
         clientcon.register(async)
         def clientThread(clientcon=clientcon, async=async):
             while not clientcon.isShutdown():