[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] When sending messages along an MMTP connection, from a ...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv16661/lib/mixminion

Modified Files:
	ClientMain.py 
Log Message:
When sending messages along an MMTP connection, from a client,
recognize the possibility that the connection may fail after some
messages have been sent.

This actually matters: the outel pinger has (according to qumqats)
~18000 queued packets stacked up due to the last flushing bug.  That's
an average of ~21 MB per server.  Under the old logic, if there were a
connection error at any point on the 21MB stream, all 21MB would be
retried.  Now, if any packet succeeds, that packet is deleted and not
retried.

This packet affects client->server delivery; server->server relaying
already did the right thing.



Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.142
retrieving revision 1.143
diff -u -d -r1.142 -r1.143
--- ClientMain.py	19 Dec 2003 17:09:36 -0000	1.142
+++ ClientMain.py	21 Dec 2003 18:32:11 -0000	1.143
@@ -472,43 +472,55 @@
         else:
             mword = "packet"
 
+        packetsSentByIndex = {}
+        def callback(idx, packetsSentByIndex=packetsSentByIndex):
+            packetsSentByIndex[idx] = 1
+
         try:
-            try:
-                # May raise TimeoutError
-                LOG.info("Connecting...")
-                mixminion.MMTPClient.sendPackets(routingInfo,
-                                                 pktList,
-                                                 timeout)
-                LOG.info("... %s sent", mword)
-            except:
-                e = sys.exc_info()
-                if noQueue and warnIfLost:
-                    LOG.error("Error with queueing disabled: %s lost", mword)
-                elif lazyQueue:
-                    LOG.info("Error while delivering %s; %s queued",
-                             mword,mword)
-                    self.queuePackets(pktList, routingInfo)
-                else:
-                    LOG.info("Error while delivering %s; leaving in queue",
-                             mword)
-                LOG.info("Error was: %s",e[1])
-            else:
-                try:
-                    clientLock()
-                    for h in handles:
-                        if self.queue.packetExists(h):
-                            self.queue.removePacket(h)
-                    removed = 0
-                    for p in pktList:
-                        if hasattr(p, 'remove'):
-                            p.remove()
-                            removed = 1
-                    if handles or removed:
-                        self.queue.cleanQueue()
-                finally:
-                    clientUnlock()
-        except MixProtocolError, e:
-            raise UIError(str(e))
+            # May raise TimeoutError
+            LOG.info("Connecting...")
+            mixminion.MMTPClient.sendPackets(routingInfo,
+                                             pktList,
+                                             timeout,
+                                             callback=callback)
+
+        except:
+            exc = sys.exc_info()
+        else:
+            exc = None
+        nGood = len(packetsSentByIndex)
+        nBad = len(pktList)-nGood
+
+        try:
+            clientLock()
+            if nGood:
+                LOG.info("... %s %s sent", nGood, mword)
+            for idx in packetsSentByIndex.keys():
+                if handles and handles[idx]:
+                    self.queue.removePacket(handles[idx])
+                elif hasattr(pktList[idx], 'remove'):
+                    pktList[idx].remove()
+
+            if nBad and warnIfLost:
+                LOG.error("Error with queueing disabled: %s/%s lost",
+                          nBad, nGood+nBad)
+            elif nBad and lazyQueue:
+                LOG.info("Error while delivering %s; %s/%s left in queue",
+                         mword,nBad,nGood+nBad)
+
+                badPackets = [ pktList[idx] for idx in xrange(len(pktList))
+                               if not packetsSentByIndex.has_key(idx) ]
+
+                self.queuePackets(badPackets, routingInfo)
+            elif nBad:
+                LOG.info("Error while delivering %s; leaving %s/%s in queue",
+                         mword, nBad, nBad+nGood)
+            if exc and not nBad:
+                LOG.info("Got error after all messages were delivered.")
+            if exc:
+                LOG.info("Error was: %s",exc[1])
+        finally:
+                clientUnlock()
 
     def flushQueue(self, maxPackets=None):
         """Try to send packets in the queue to their destinations.  Do not try