[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Backport fix for removing packets on partially successful delivery.
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv17100/lib/mixminion
Modified Files:
Tag: mixminion-v0-0-6-patches
ClientMain.py
Log Message:
Backport fix for removing packets on partially successful delivery.
Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.138.2.4
retrieving revision 1.138.2.5
diff -u -d -r1.138.2.4 -r1.138.2.5
--- ClientMain.py 19 Dec 2003 17:12:25 -0000 1.138.2.4
+++ ClientMain.py 21 Dec 2003 18:33:31 -0000 1.138.2.5
@@ -472,43 +472,55 @@
else:
mword = "packet"
+ packetsSentByIndex = {}
+ def callback(idx, packetsSentByIndex=packetsSentByIndex):
+ packetsSentByIndex[idx] = 1
+
try:
- try:
- # May raise TimeoutError
- LOG.info("Connecting...")
- mixminion.MMTPClient.sendPackets(routingInfo,
- pktList,
- timeout)
- LOG.info("... %s sent", mword)
- except:
- e = sys.exc_info()
- if noQueue and warnIfLost:
- LOG.error("Error with queueing disabled: %s lost", mword)
- elif lazyQueue:
- LOG.info("Error while delivering %s; %s queued",
- mword,mword)
- self.queuePackets(pktList, routingInfo)
- else:
- LOG.info("Error while delivering %s; leaving in queue",
- mword)
- LOG.info("Error was: %s",e[1])
- else:
- try:
- clientLock()
- for h in handles:
- if self.queue.packetExists(h):
- self.queue.removePacket(h)
- removed = 0
- for p in pktList:
- if hasattr(p, 'remove'):
- p.remove()
- removed = 1
- if handles or removed:
- self.queue.cleanQueue()
- finally:
- clientUnlock()
- except MixProtocolError, e:
- raise UIError(str(e))
+ # May raise TimeoutError
+ LOG.info("Connecting...")
+ mixminion.MMTPClient.sendPackets(routingInfo,
+ pktList,
+ timeout,
+ callback=callback)
+
+ except:
+ exc = sys.exc_info()
+ else:
+ exc = None
+ nGood = len(packetsSentByIndex)
+ nBad = len(pktList)-nGood
+
+ try:
+ clientLock()
+ if nGood:
+ LOG.info("... %s %s sent", nGood, mword)
+ for idx in packetsSentByIndex.keys():
+ if handles and handles[idx]:
+ self.queue.removePacket(handles[idx])
+ elif hasattr(pktList[idx], 'remove'):
+ pktList[idx].remove()
+
+ if nBad and warnIfLost:
+ LOG.error("Error with queueing disabled: %s/%s lost",
+ nBad, nGood+nBad)
+ elif nBad and lazyQueue:
+ LOG.info("Error while delivering %s; %s/%s left in queue",
+ mword,nBad,nGood+nBad)
+
+ badPackets = [ pktList[idx] for idx in xrange(len(pktList))
+ if not packetsSentByIndex.has_key(idx) ]
+
+ self.queuePackets(badPackets, routingInfo)
+ elif nBad:
+ LOG.info("Error while delivering %s; leaving %s/%s in queue",
+ mword, nBad, nBad+nGood)
+ if exc and not nBad:
+ LOG.info("Got error after all messages were delivered.")
+ if exc:
+ LOG.info("Error was: %s",exc[1])
+ finally:
+ clientUnlock()
def flushQueue(self, maxPackets=None):
"""Try to send packets in the queue to their destinations. Do not try