[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Pull Modules.py bugfixes into main branch.



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv1259

Modified Files:
	Modules.py 
Log Message:
Pull Modules.py bugfixes into main branch.

Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.57
retrieving revision 1.58
diff -u -d -r1.57 -r1.58
--- Modules.py	28 Sep 2003 05:26:32 -0000	1.57
+++ Modules.py	13 Oct 2003 17:12:58 -0000	1.58
@@ -489,13 +489,14 @@
     def close(self):
         """Release all resources held by all modules."""
         for module in self.enabled.keys():
-            self.nameToModule[module].close()
+            mod = self.nameToModule[module]
+            self.disableModule(mod)
+            mod.close()
 
     def sync(self):
         """Flush all state held by all modules to disk."""
         for module in self.enabled.keys():
-            self.nameToModule[module].close()
-
+            self.nameToModule[module].sync()
 
 #----------------------------------------------------------------------
 class DropModule(DeliveryModule):
@@ -542,6 +543,7 @@
         self.maxMessageSize = None
         self.maxInterval = None
         self.maxFragments = None
+        self.lock = threading.RLock()
     def getConfigSyntax(self):
         return { "Delivery/Fragmented" :
                  { 'Enabled' : ('REQUIRE',  _parseBoolean, "no"),
@@ -573,15 +575,27 @@
     def getExitTypes(self):
         return [ mixminion.Packet.FRAGMENT_TYPE ]
     def createDeliveryQueue(self, queueDir):
-        self.close()
-        self._queue = FragmentDeliveryQueue(self, queueDir, self.manager)
-        return self._queue
+        self.lock.acquire()
+        try:
+            self.close()
+            self._queue = FragmentDeliveryQueue(self, queueDir, self.manager)
+            return self._queue
+        finally:
+            self.lock.release()
     def sync(self):
-        self._queue.pool.sync()
+        self.lock.acquire()
+        try:
+            self._queue.pool.sync()
+        finally:
+            self.lock.release()
     def close(self):
-        if self._queue:
-            self._queue.pool.close()
-            self._queue = None
+        self.lock.acquire()
+        try:
+            if self._queue:
+                self._queue.pool.close()
+                self._queue = None
+        finally:
+            self.lock.release()
     
 class FragmentDeliveryQueue:
     """Delivery queue for FragmentModule.
@@ -596,6 +610,7 @@
         self.directory = directory
         self.manager = manager
         self.pool = mixminion.Fragments.FragmentPool(self.directory)
+        self.lock = self.module.lock
 
     def getPriority(self):
         # We want to make sure that fragmented messages get reassembled
@@ -617,35 +632,47 @@
         # Should be instance of FragmentPayload.
         payload = packet.getDecodedPayload()
         assert payload is not None
-        self.pool.addFragment(payload)
+        self.lock.acquire()
+        try:
+            self.pool.addFragment(payload)
+        finally:
+            self.lock.release()
 
     def cleanQueue(self, deleteFn=None):
-        self.pool.cleanQueue(deleteFn)
+        try:
+            self.lock.acquire()
+            self.pool.cleanQueue(deleteFn)
+        finally:
+            self.lock.release()
 
     def sendReadyMessages(self):
-        self.pool.unchunkMessages()
-        ready = self.pool.listReadyMessages()
-        for msgid in ready:
-            msg = self.pool.getReadyMessage(msgid)
-            try:
-                ssfm = mixminion.Packet.parseServerSideFragmentedMessage(msg)
-                del msg
-            except ParseError:
-                LOG.warn("Dropping malformed server-side fragmented message")
-                self.pool.markMessageCompleted(msgid, rejected=1)
-                continue
-            if len(ssfm.compressedContents) > self.module.maxMessageSize:
-                LOG.warn("Dropping over-long fragmented message")
-                self.pool.markMessageCompleted(msgid, rejected=1)
-                continue
-
-            fm = _FragmentedDeliveryMessage(ssfm)
-            self.manager.queueDecodedMessage(fm)
-            self.pool.markMessageCompleted(msgid)
+        self.lock.acquire()
+        try:
+            self.pool.unchunkMessages()
+            ready = self.pool.listReadyMessages()
+            for msgid in ready:
+                msg = self.pool.getReadyMessage(msgid)
+                try:
+                    ssfm = mixminion.Packet.parseServerSideFragmentedMessage(msg)
+                    del msg
+                except ParseError:
+                    LOG.warn("Dropping malformed server-side fragmented message")
+                    self.pool.markMessageCompleted(msgid, rejected=1)
+                    continue
+                if len(ssfm.compressedContents) > self.module.maxMessageSize:
+                    LOG.warn("Dropping over-long fragmented message")
+                    self.pool.markMessageCompleted(msgid, rejected=1)
+                    continue
 
-        cutoff = previousMidnight(time.time()) - self.module.maxInterval
-        self.pool.expireMessages(cutoff)
+                fm = _FragmentedDeliveryMessage(ssfm)
+                self.manager.queueDecodedMessage(fm)
+                self.pool.markMessageCompleted(msgid)
 
+            cutoff = previousMidnight(time.time()) - self.module.maxInterval
+            self.pool.expireMessages(cutoff)
+        finally:
+            self.lock.release()
+            
 class _FragmentedDeliveryMessage:
     """Helper class: obeys the interface of mixminion.server.PacketHandler.
        DeliveryMessage, but contains a long message reassembled from