[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Maint branches: fix two bugs.
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv31676/lib/mixminion/server
Modified Files:
Tag: mixminion-v0-0-5-patches
Modules.py
Log Message:
Maint branches: fix two bugs.
- Possible synchronization problems when shutting down Fragments module
and adding messages at the same time.
- The server would die with an exception when you tried to HUP it, if
it was running the Fragments module.
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.54.2.2
retrieving revision 1.54.2.3
diff -u -d -r1.54.2.2 -r1.54.2.3
--- Modules.py 28 Sep 2003 03:57:33 -0000 1.54.2.2
+++ Modules.py 13 Oct 2003 16:27:45 -0000 1.54.2.3
@@ -486,13 +486,14 @@
def close(self):
"""Release all resources held by all modules."""
for module in self.enabled.keys():
- self.nameToModule[module].close()
+ mod = self.nameToModule[module]
+ self.disableModule(mod)
+ mod.close()
def sync(self):
"""Flush all state held by all modules to disk."""
for module in self.enabled.keys():
- self.nameToModule[module].close()
-
+ self.nameToModule[module].sync()
#----------------------------------------------------------------------
class DropModule(DeliveryModule):
@@ -539,6 +540,7 @@
self.maxMessageSize = None
self.maxInterval = None
self.maxFragments = None
+ self.lock = threading.RLock()
def getConfigSyntax(self):
return { "Delivery/Fragmented" :
{ 'Enabled' : ('REQUIRE', _parseBoolean, "no"),
@@ -570,15 +572,27 @@
def getExitTypes(self):
return [ mixminion.Packet.FRAGMENT_TYPE ]
def createDeliveryQueue(self, queueDir):
- self.close()
- self._queue = FragmentDeliveryQueue(self, queueDir, self.manager)
- return self._queue
+ self.lock.acquire()
+ try:
+ self.close()
+ self._queue = FragmentDeliveryQueue(self, queueDir, self.manager)
+ return self._queue
+ finally:
+ self.lock.release()
def sync(self):
- self._queue.pool.sync()
+ self.lock.acquire()
+ try:
+ self._queue.pool.sync()
+ finally:
+ self.lock.release()
def close(self):
- if self._queue:
- self._queue.pool.close()
- self._queue = None
+ self.lock.acquire()
+ try:
+ if self._queue:
+ self._queue.pool.close()
+ self._queue = None
+ finally:
+ self.lock.release()
class FragmentDeliveryQueue:
"""Delivery queue for FragmentModule.
@@ -593,6 +607,7 @@
self.directory = directory
self.manager = manager
self.pool = mixminion.Fragments.FragmentPool(self.directory)
+ self.lock = self.module.lock
def getPriority(self):
# We want to make sure that fragmented messages get reassembled
@@ -614,35 +629,47 @@
# Should be instance of FragmentPayload.
payload = packet.getDecodedPayload()
assert payload is not None
- self.pool.addFragment(payload)
+ self.lock.acquire()
+ try:
+ self.pool.addFragment(payload)
+ finally:
+ self.lock.release()
def cleanQueue(self, deleteFn=None):
- self.pool.cleanQueue(deleteFn)
+ try:
+ self.lock.acquire()
+ self.pool.cleanQueue(deleteFn)
+ finally:
+ self.lock.release()
def sendReadyMessages(self):
- self.pool.unchunkMessages()
- ready = self.pool.listReadyMessages()
- for msgid in ready:
- msg = self.pool.getReadyMessage(msgid)
- try:
- ssfm = mixminion.Packet.parseServerSideFragmentedMessage(msg)
- del msg
- except ParseError:
- LOG.warn("Dropping malformed server-side fragmented message")
- self.pool.markMessageCompleted(msgid, rejected=1)
- continue
- if len(ssfm.compressedContents) > self.module.maxMessageSize:
- LOG.warn("Dropping over-long fragmented message")
- self.pool.markMessageCompleted(msgid, rejected=1)
- continue
-
- fm = _FragmentedDeliveryMessage(ssfm)
- self.manager.queueDecodedMessage(fm)
- self.pool.markMessageCompleted(msgid)
+ self.lock.acquire()
+ try:
+ self.pool.unchunkMessages()
+ ready = self.pool.listReadyMessages()
+ for msgid in ready:
+ msg = self.pool.getReadyMessage(msgid)
+ try:
+ ssfm = mixminion.Packet.parseServerSideFragmentedMessage(msg)
+ del msg
+ except ParseError:
+ LOG.warn("Dropping malformed server-side fragmented message")
+ self.pool.markMessageCompleted(msgid, rejected=1)
+ continue
+ if len(ssfm.compressedContents) > self.module.maxMessageSize:
+ LOG.warn("Dropping over-long fragmented message")
+ self.pool.markMessageCompleted(msgid, rejected=1)
+ continue
- cutoff = previousMidnight(time.time()) - self.module.maxInterval
- self.pool.expireMessages(cutoff)
+ fm = _FragmentedDeliveryMessage(ssfm)
+ self.manager.queueDecodedMessage(fm)
+ self.pool.markMessageCompleted(msgid)
+ cutoff = previousMidnight(time.time()) - self.module.maxInterval
+ self.pool.expireMessages(cutoff)
+ finally:
+ self.lock.release()
+
class _FragmentedDeliveryMessage:
"""Helper class: obeys the interface of mixminion.server.PacketHandler.
DeliveryMessage, but contains a long message reassembled from