[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] Timeouts, descriptor parsing, refactoring, more tests.



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv22424/lib/mixminion/server

Modified Files:
	ServerConfig.py ServerKeys.py ServerMain.py ServerQueue.py 
Log Message:
Timeouts, descriptor parsing, refactoring, more tests.

ClientMain, MMTPClient, Config:
- Support configurable client timeout

Common:
- Make stdout/stderr wrappers not suck

Config, ServerInfo:
- Add support for ServerInfo and directory parsing to ignore unrecognized
  keys

Packet, ServerList, ..., ServerKeys:
- Note work for 0.0.3

ServerMain:
- Change ProcessingThread to be callable-based, not packet-based.

test:
- Tests for log streams
- Tests for client timeouts
- Tests for interval list warnings



Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -d -r1.16 -r1.17
--- ServerConfig.py	14 Jan 2003 09:20:18 -0000	1.16
+++ ServerConfig.py	17 Jan 2003 06:18:06 -0000	1.17
@@ -238,5 +238,4 @@
                           'Deny' : ('ALLOW*', C._parseAddressSet_deny, None) },
         # FFFF Missing: Queue-Size / Queue config options
         # FFFF         listen timeout??
-        # FFFF         Retry options
         }

Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- ServerKeys.py	13 Jan 2003 06:18:55 -0000	1.9
+++ ServerKeys.py	17 Jan 2003 06:18:06 -0000	1.10
@@ -498,6 +498,10 @@
         Valid-Until: %(ValidUntil)s
         Packet-Key: %(PacketKey)s
         """ % fields
+    # XXXX003 add 'packet-formats'
+    #   Packet-Formats: 0.2
+    # XXXX003 add 'software'
+    #   Software: Mixminion %(version)s
     if contact:
         info += "Contact: %s\n"%contact
     if comments:

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.32
retrieving revision 1.33
diff -u -d -r1.32 -r1.33
--- ServerMain.py	13 Jan 2003 06:35:52 -0000	1.32
+++ ServerMain.py	17 Jan 2003 06:18:06 -0000	1.33
@@ -48,15 +48,16 @@
         self.packetHandler = packetHandler
         self.mixPool = None
         self.moduleManager = None
-        self._queue = MessageQueue()
-        for h in self.getAllMessages():
-            assert h is not None
-            self._queue.put(h)
 
-    def connectQueues(self, mixPool, manager):
+    def connectQueues(self, mixPool, manager, processingThread):
         """Sets the target mix queue"""
         self.mixPool = mixPool
         self.moduleManager = manager #XXXX003 refactor.
+        self.processingThread = processingThread
+        for h in self.getAllMessages():
+            assert h is not None
+            self.processingThread.addJob(
+                lambda self=self, h=h: self.__deliverMessage(h))
 
     def queueMessage(self, msg):
         """Add a message for delivery"""
@@ -64,12 +65,10 @@
                   formatBase64(msg[:8]))
         h = mixminion.server.ServerQueue.Queue.queueMessage(self, msg)
         assert h is not None
-        self._queue.put(h)
-
-    def getMessageQueue(self):
-        return self._queue
+        self.processingThread.addJob(
+            lambda self=self, h=h: self.__deliverMessage(h))
 
-    def deliverMessage(self, handle):
+    def __deliverMessage(self, handle):
         """Process a single message with a given handle, and insert it into
            the Mix pool.  This function is called from within the processing
            thread."""
@@ -147,7 +146,7 @@
 
     def queueObject(self, obj):
         """Insert an object into the queue."""
-        obj.isDelivery() #XXXX remove this implicit typecheck.
+        obj.isDelivery() #XXXX003 remove this implicit typecheck.
         self.queue.queueObject(obj)
 
     def count(self):
@@ -172,7 +171,7 @@
         
         for h in handles:
             packet = self.queue.getObject(h)
-            #XXXX remove the first case
+            #XXXX remove the first case after 0.0.3
             if type(packet) == type(()):
                 LOG.debug("  (skipping message %s in obsolete format)", h)
             elif packet.isDelivery():
@@ -216,7 +215,7 @@
         "Implementation of abstract method from DeliveryQueue."
         # Map from addr -> [ (handle, msg) ... ]
         msgs = {}
-        # XXXX SKIP DEAD MESSAGES!!!!
+        # XXXX003 SKIP DEAD MESSAGES!!!!
         for handle, packet, n_retries in msgList:
             addr = packet.getAddress()
             message = packet.getPacket()
@@ -284,26 +283,31 @@
             LOG.error_exc(sys.exc_info(),
                           "Exception while cleaning; shutting down thread.")
 
-class PacketProcessingThread(threading.Thread):
+class ProcessingThread(threading.Thread):
+    class _Shutdown:
+        def __call__(self):
+            raise self
+    
     #DOCDOC
     def __init__(self, incomingQueue):
         threading.Thread.__init__(self)
-        # Clean up logic; maybe refactor. ????
-        self.incomingQueue = incomingQueue
-        self.mqueue = incomingQueue.getMessageQueue()
+        self.mqueue = MessageQueue()
 
     def shutdown(self):
         LOG.info("Telling processing thread to shut down.")
-        self.mqueue.put(None)
+        self.mqueue.put(ProcessingThread._Shutdown())
+
+    def addJob(self, job):
+        self.mqueue.put(job)
 
     def run(self):
         try:
             while 1:
-                handle = self.mqueue.get()
-                if handle is None:
-                    LOG.info("Processing thread shutting down.")
-                    return
-                self.incomingQueue.deliverMessage(handle)
+                job = self.mqueue.get()
+                job()
+        except ProcessingThread._Shutdown:
+            LOG.info("Processing thread shutting down.")
+            return
         except:
             LOG.error_exc(sys.exc_info(),
                           "Exception while processing; shutting down thread.")
@@ -417,19 +421,20 @@
         LOG.debug("Found %d pending messages in outgoing queue",
                        self.outgoingQueue.count())
 
+        self.cleaningThread = CleaningThread()
+        self.processingThread = ProcessingThread(self.incomingQueue)
+
         LOG.debug("Connecting queues")
         self.incomingQueue.connectQueues(mixPool=self.mixPool,
-                                         manager=self.moduleManager)
+                                         manager=self.moduleManager,
+                                        processingThread=self.processingThread)
         self.mixPool.connectQueues(outgoing=self.outgoingQueue,
                                    manager=self.moduleManager)
         self.outgoingQueue.connectQueues(server=self.mmtpServer)
         self.mmtpServer.connectQueues(incoming=self.incomingQueue,
                                       outgoing=self.outgoingQueue)
 
-
-        self.cleaningThread = CleaningThread()
         self.cleaningThread.start()
-        self.processingThread = PacketProcessingThread(self.incomingQueue)
         self.processingThread.start()
         self.moduleManager.startThreading()
 
@@ -478,13 +483,6 @@
                     LOG.fatal("One of our threads has halted; shutting down.")
                     return
                 
-##                 # Process any new messages that have come in, placing them
-##                 # into the mix pool.
-##                 self.incomingQueue.sendReadyMessages()
-##                  ##Prevent child processes from turning into zombies.
-##                  #???? I think we should just install a SIGCHLD handler.
-##                  waitForChildren(onceOnly=1,blocking=0)
-
                 # Calculate remaining time.
                 now = time.time()
                 timeLeft = nextEventTime - now
@@ -581,7 +579,7 @@
         if pid != 0:
             os._exit(0)
     # Chdir to / so that we don't hold the CWD unnecessarily.
-    os.chdir(os.path.normpath("/")) #???? Is this right on Win32?
+    os.chdir(os.path.normpath("/")) # WIN32 Is this right on Windows?
     # Set umask to 000 so that we drop any (possibly nutty) umasks that
     # our users had before.
     os.umask(0000)

Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- ServerQueue.py	13 Jan 2003 06:35:52 -0000	1.3
+++ ServerQueue.py	17 Jan 2003 06:18:06 -0000	1.4
@@ -318,8 +318,6 @@
        won't play nice if multiple instances are looking at the same
        directory.
     """
-    # XXXX separating addr was a mistake.
-    
     ###
     # Fields:
     #    sendable -- A list of handles for all messages
@@ -472,7 +470,6 @@
     """A TimedMixQueue holds a group of files, and returns some of them
        as requested, according to a mixing algorithm that sends a batch
        of messages every N seconds."""
-    # FFFF : interval is unused.
     ## Fields:
     #   interval: scanning interval, in seconds.
     def __init__(self, location, interval=600):
@@ -493,7 +490,6 @@
     """A CottrellMixQueue holds a group of files, and returns some of them
        as requested, according the Cottrell (timed dynamic-pool) mixing
        algorithm from Mixmaster."""
-    # FFFF : interval is unused.
     ## Fields:
     # interval: scanning interval, in seconds.
     # minPool: Minimum number of messages to keep in pool.