[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] When sending messages along an MMTP connection, from a ...
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv16661/lib/mixminion
Modified Files:
ClientMain.py
Log Message:
When sending messages along an MMTP connection, from a client,
recognize the possibility that the connection may fail after some
messages have been sent.
This actually matters: the outel pinger has (according to qumqats)
~18000 queued packets stacked up due to the last flushing bug. That's
an average of ~21 MB per server. Under the old logic, if there were a
connection error at any point on the 21MB stream, all 21MB would be
retried. Now, if any packet succeeds, that packet is deleted and not
retried.
This patch affects client->server delivery; server->server relaying
already did the right thing.
Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.142
retrieving revision 1.143
diff -u -d -r1.142 -r1.143
--- ClientMain.py 19 Dec 2003 17:09:36 -0000 1.142
+++ ClientMain.py 21 Dec 2003 18:32:11 -0000 1.143
@@ -472,43 +472,55 @@
else:
mword = "packet"
+ packetsSentByIndex = {}
+ def callback(idx, packetsSentByIndex=packetsSentByIndex):
+ packetsSentByIndex[idx] = 1
+
try:
- try:
- # May raise TimeoutError
- LOG.info("Connecting...")
- mixminion.MMTPClient.sendPackets(routingInfo,
- pktList,
- timeout)
- LOG.info("... %s sent", mword)
- except:
- e = sys.exc_info()
- if noQueue and warnIfLost:
- LOG.error("Error with queueing disabled: %s lost", mword)
- elif lazyQueue:
- LOG.info("Error while delivering %s; %s queued",
- mword,mword)
- self.queuePackets(pktList, routingInfo)
- else:
- LOG.info("Error while delivering %s; leaving in queue",
- mword)
- LOG.info("Error was: %s",e[1])
- else:
- try:
- clientLock()
- for h in handles:
- if self.queue.packetExists(h):
- self.queue.removePacket(h)
- removed = 0
- for p in pktList:
- if hasattr(p, 'remove'):
- p.remove()
- removed = 1
- if handles or removed:
- self.queue.cleanQueue()
- finally:
- clientUnlock()
- except MixProtocolError, e:
- raise UIError(str(e))
+ # May raise TimeoutError
+ LOG.info("Connecting...")
+ mixminion.MMTPClient.sendPackets(routingInfo,
+ pktList,
+ timeout,
+ callback=callback)
+
+ except:
+ exc = sys.exc_info()
+ else:
+ exc = None
+ nGood = len(packetsSentByIndex)
+ nBad = len(pktList)-nGood
+
+ try:
+ clientLock()
+ if nGood:
+ LOG.info("... %s %s sent", nGood, mword)
+ for idx in packetsSentByIndex.keys():
+ if handles and handles[idx]:
+ self.queue.removePacket(handles[idx])
+ elif hasattr(pktList[idx], 'remove'):
+ pktList[idx].remove()
+
+ if nBad and warnIfLost:
+ LOG.error("Error with queueing disabled: %s/%s lost",
+ nBad, nGood+nBad)
+ elif nBad and lazyQueue:
+ LOG.info("Error while delivering %s; %s/%s left in queue",
+ mword,nBad,nGood+nBad)
+
+ badPackets = [ pktList[idx] for idx in xrange(len(pktList))
+ if not packetsSentByIndex.has_key(idx) ]
+
+ self.queuePackets(badPackets, routingInfo)
+ elif nBad:
+ LOG.info("Error while delivering %s; leaving %s/%s in queue",
+ mword, nBad, nBad+nGood)
+ if exc and not nBad:
+ LOG.info("Got error after all messages were delivered.")
+ if exc:
+ LOG.info("Error was: %s",exc[1])
+ finally:
+ clientUnlock()
def flushQueue(self, maxPackets=None):
"""Try to send packets in the queue to their destinations. Do not try