[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Pull Modules.py bugfixes into main branch.
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv1259
Modified Files:
Modules.py
Log Message:
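Pull Modules.py bugfixes into main branch:
- sync() on the module collection now calls each enabled module's sync() instead of close().
- close() on the module collection now disables each enabled module before closing it.
- FragmentModule gains a threading.RLock, shared with its FragmentDeliveryQueue, that is held around all fragment-pool operations (create/sync/close, addFragment, cleanQueue, sendReadyMessages).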
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.57
retrieving revision 1.58
diff -u -d -r1.57 -r1.58
--- Modules.py 28 Sep 2003 05:26:32 -0000 1.57
+++ Modules.py 13 Oct 2003 17:12:58 -0000 1.58
@@ -489,13 +489,14 @@
def close(self):
"""Release all resources held by all modules."""
for module in self.enabled.keys():
- self.nameToModule[module].close()
+ mod = self.nameToModule[module]
+ self.disableModule(mod)
+ mod.close()
def sync(self):
"""Flush all state held by all modules to disk."""
for module in self.enabled.keys():
- self.nameToModule[module].close()
-
+ self.nameToModule[module].sync()
#----------------------------------------------------------------------
class DropModule(DeliveryModule):
@@ -542,6 +543,7 @@
self.maxMessageSize = None
self.maxInterval = None
self.maxFragments = None
+ self.lock = threading.RLock()
def getConfigSyntax(self):
return { "Delivery/Fragmented" :
{ 'Enabled' : ('REQUIRE', _parseBoolean, "no"),
@@ -573,15 +575,27 @@
def getExitTypes(self):
return [ mixminion.Packet.FRAGMENT_TYPE ]
def createDeliveryQueue(self, queueDir):
- self.close()
- self._queue = FragmentDeliveryQueue(self, queueDir, self.manager)
- return self._queue
+ self.lock.acquire()
+ try:
+ self.close()
+ self._queue = FragmentDeliveryQueue(self, queueDir, self.manager)
+ return self._queue
+ finally:
+ self.lock.release()
def sync(self):
- self._queue.pool.sync()
+ self.lock.acquire()
+ try:
+ self._queue.pool.sync()
+ finally:
+ self.lock.release()
def close(self):
- if self._queue:
- self._queue.pool.close()
- self._queue = None
+ self.lock.acquire()
+ try:
+ if self._queue:
+ self._queue.pool.close()
+ self._queue = None
+ finally:
+ self.lock.release()
class FragmentDeliveryQueue:
"""Delivery queue for FragmentModule.
@@ -596,6 +610,7 @@
self.directory = directory
self.manager = manager
self.pool = mixminion.Fragments.FragmentPool(self.directory)
+ self.lock = self.module.lock
def getPriority(self):
# We want to make sure that fragmented messages get reassembled
@@ -617,35 +632,47 @@
# Should be instance of FragmentPayload.
payload = packet.getDecodedPayload()
assert payload is not None
- self.pool.addFragment(payload)
+ self.lock.acquire()
+ try:
+ self.pool.addFragment(payload)
+ finally:
+ self.lock.release()
def cleanQueue(self, deleteFn=None):
- self.pool.cleanQueue(deleteFn)
+ try:
+ self.lock.acquire()
+ self.pool.cleanQueue(deleteFn)
+ finally:
+ self.lock.release()
def sendReadyMessages(self):
- self.pool.unchunkMessages()
- ready = self.pool.listReadyMessages()
- for msgid in ready:
- msg = self.pool.getReadyMessage(msgid)
- try:
- ssfm = mixminion.Packet.parseServerSideFragmentedMessage(msg)
- del msg
- except ParseError:
- LOG.warn("Dropping malformed server-side fragmented message")
- self.pool.markMessageCompleted(msgid, rejected=1)
- continue
- if len(ssfm.compressedContents) > self.module.maxMessageSize:
- LOG.warn("Dropping over-long fragmented message")
- self.pool.markMessageCompleted(msgid, rejected=1)
- continue
-
- fm = _FragmentedDeliveryMessage(ssfm)
- self.manager.queueDecodedMessage(fm)
- self.pool.markMessageCompleted(msgid)
+ self.lock.acquire()
+ try:
+ self.pool.unchunkMessages()
+ ready = self.pool.listReadyMessages()
+ for msgid in ready:
+ msg = self.pool.getReadyMessage(msgid)
+ try:
+ ssfm = mixminion.Packet.parseServerSideFragmentedMessage(msg)
+ del msg
+ except ParseError:
+ LOG.warn("Dropping malformed server-side fragmented message")
+ self.pool.markMessageCompleted(msgid, rejected=1)
+ continue
+ if len(ssfm.compressedContents) > self.module.maxMessageSize:
+ LOG.warn("Dropping over-long fragmented message")
+ self.pool.markMessageCompleted(msgid, rejected=1)
+ continue
- cutoff = previousMidnight(time.time()) - self.module.maxInterval
- self.pool.expireMessages(cutoff)
+ fm = _FragmentedDeliveryMessage(ssfm)
+ self.manager.queueDecodedMessage(fm)
+ self.pool.markMessageCompleted(msgid)
+ cutoff = previousMidnight(time.time()) - self.module.maxInterval
+ self.pool.expireMessages(cutoff)
+ finally:
+ self.lock.release()
+
class _FragmentedDeliveryMessage:
"""Helper class: obeys the interface of mixminion.server.PacketHandler.
DeliveryMessage, but contains a long message reassembled from