[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Look, we're alpha again! Time for some delayed perform...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv6726/lib/mixminion

Modified Files:
	Common.py test.py 
Log Message:
Look, we're alpha again!  Time for some delayed performance work.

(Some of this still needs docs and tests)

Common, ServerMain:
- To make the server shut down faster, abandon pending work in the 
  processing and cleaning threads when we're shutting down.

  (Formerly, if we got behind cleaning files or processing messages,
  we'd wait to finish before we shut down.  Now we shut down almost
  immediately.  This is safe, because the code is written to be safe
  in the presence of SIGKILLs anyway.)

test, MMTPServer, Modules, ServerMain
- When we had N messages to deliver, we used to suck all N into
  memory.  Clearly, this was a pretty rotten idea: instead, we now
  lazy-load messages as they're needed.  This makes the mixminion
  server use even less memory under load than it did before.

  The code is a little hairy, but it seems to work for me.  It still
  needs documentation.



Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.89
retrieving revision 1.90
diff -u -d -r1.89 -r1.90
--- Common.py	6 Jun 2003 06:04:57 -0000	1.89
+++ Common.py	13 Jun 2003 01:03:45 -0000	1.90
@@ -36,9 +36,11 @@
 import traceback
 # Imported here so we can get it in mixminion.server without being shadowed
 # by the old Queue.py file.
-from Queue import Queue
+from Queue import Queue, Empty
 MessageQueue = Queue
+QueueEmpty = Empty
 del Queue
+del Empty
 
 from types import StringType
 
@@ -1312,3 +1314,23 @@
             pass
 
         self.fd = None
+
+#----------------------------------------------------------------------
+# Threading operations
+
+class ClearableQueue(MessageQueue):
+    """DOCDOC"""
+    #XXXX005 testme
+    def clear(self):
+        if not self.esema.acquire(0):
+            return
+        self.mutex.acquire()
+        was_full = self._full()
+        self._clear()
+        assert self._empty()
+        if was_full:
+            self.fsema.release()
+        self.mutex.release()
+
+    def _clear(self):
+        del self.queue[:]

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.120
retrieving revision 1.121
diff -u -d -r1.120 -r1.121
--- test.py	6 Jun 2003 07:17:35 -0000	1.120
+++ test.py	13 Jun 2003 01:03:45 -0000	1.121
@@ -2599,24 +2599,24 @@
         queue = TestDeliveryQueue(d_d, now)
         queue.setRetrySchedule([10, 10, 10, 10]) # Retry up to 40 sec.
         # First, make sure the queue stores messages correctly.
-        h1 = queue.queueDeliveryMessage("Message 1", now)
-        h2 = queue.queueDeliveryMessage("Message 2", now)
+        h1 = queue.queueDeliveryMessage("Message 1", now=now)
+        h2 = queue.queueDeliveryMessage("Message 2", now=now)
         self.assertEquals(("Message 1", now, None, now), queue._inspect(h1))
         self.assertEquals(("Message 2", now, None, now), queue._inspect(h2))
 
         # Call sendReadyMessages to begin 'sending' msg1 and msg2.
         queue.sendReadyMessages(now)
-        msgs = queue._msgs
+        msgs = [ (msg.getHandle(), msg.getMessage()) for msg in queue._msgs ]
         self.assertEquals(2, len(msgs))
         # _deliverMessages should have gotten them both.
         self.failUnless((h1, "Message 1") in msgs)
         self.failUnless((h2, "Message 2") in msgs)
         # Add msg3, and acknowledge that msg1 succeeded.  msg2 is now in limbo
-        h3 = queue.queueDeliveryMessage("Message 3", now)
+        h3 = queue.queueDeliveryMessage("Message 3", now=now)
         queue.deliverySucceeded(h1)
         # Only msg3 should get sent out, since msg2 is still in progress.
         queue.sendReadyMessages(now+1)
-        msgs = queue._msgs
+        msgs = [ (msg.getHandle(), msg.getMessage()) for msg in queue._msgs ]
         self.assertEquals([(h3, "Message 3")], msgs)
 
         # Now, make sure that msg1 is gone from the pool.
@@ -2648,20 +2648,19 @@
 
         # When we try to send messages again after 5 seconds, nothing happens.
         queue.sendReadyMessages(now+5)
-        msgs = queue._msgs
-        self.assertEquals(None, msgs)
+        self.assertEquals(None, queue._msgs)
         # When we try to send again after after 11 seconds, message 2 fires.
         queue.sendReadyMessages(now+11)
-        msgs = queue._msgs
+        msgs = [ (msg.getHandle(), msg.getMessage()) for msg in queue._msgs ]
         self.assertEquals([(h4, "Message 2")], msgs)
         self.assertEquals(h2, h4)
-        queue.deliveryFailed(h4, retriable=1, now=now+15)
+        queue._msgs[0].failed(retriable=1, now=now+15)
         self.assertEquals(("Message 2", now, now+11, now+20),
                           queue._inspect(h2))
         # At 31 seconds, message 2 fires.
         h5 = queue.getAllMessages()[0]
         queue.sendReadyMessages(now+31)
-        msgs = queue._msgs
+        msgs = [ (msg.getHandle(), msg.getMessage()) for msg in queue._msgs ]
         self.assertEquals([(h5, "Message 2")], msgs)
         self.assertEquals(h5, h4)
         queue.deliveryFailed(h5, retriable=1, now=now+33)
@@ -2671,7 +2670,7 @@
         # already.
         h6 = queue.getAllMessages()[0]
         queue.sendReadyMessages(now+45)
-        msgs = queue._msgs
+        msgs = [ (msg.getHandle(), msg.getMessage()) for msg in queue._msgs ]
         self.assertEquals([(h6, "Message 2")], msgs)
         self.assertEquals(h6, h5)
         queue.deliveryFailed(h6, retriable=1, now=now+100)
@@ -2681,19 +2680,6 @@
         queue.removeAll(self.unlink)
         queue.cleanQueue(self.unlink)
 
-        # Make sure old-style messages get nuked.
-        writePickled(os.path.join(d_d, "msg_ABCDEFGH"),
-                     (5, None, "xyzzy", 6))
-        try:
-            suspendLog("TRACE")
-            queue = TestDeliveryQueue(d_d, now+4)
-        finally:
-            s = resumeLog()
-        self.assert_(stringContains(s, "No metadata for file handle ABCDEFGH"))
-        self.assert_(stringContains(s, "Removing item ABCDEFGH"))
-        queue.setRetrySchedule([10, 10, 10, 10]) # Retry up to 40 sec.
-        self.assertEquals([], queue.getAllMessages())
-
     def testMixPools(self):
         d_m = mix_mktemp("qm")
 
@@ -3075,6 +3061,21 @@
 
     return server, listener, messagesIn, keyid
 
+class FakeDeliverable:
+    def __init__(self, s):
+        self._failed = self._succeeded = 0
+        self._retriable = -1
+        self._contents = s
+    def getContents(self):
+        return self._contents
+    def failed(self, retriable):
+        assert not (self._failed or self._succeeded)
+        self._failed = 1
+        self._retriable = retriable
+    def succeeded(self):
+        assert not (self._failed or self._succeeded)
+        self._succeeded = 1
+
 class MMTPTests(unittest.TestCase):
     #XXXX This class is bulky, and has lots of cut-and-paste.  It could do
     #XXXX with a refactoring.
@@ -3194,11 +3195,12 @@
         # Send m1, then junk, then renegotiate, then junk, then m2.
         tlscon = mixminion.server.MMTPServer.SimpleTLSConnection
         messages = ["helloxxx"*4096, "helloyyy"*4096]
+        deliv = [FakeDeliverable(m) for m in messages]
         async = mixminion.server.MMTPServer.AsyncServer()
         clientcon = mixminion.server.MMTPServer.MMTPClientConnection(
            _getTLSContext(0), "127.0.0.1", TEST_PORT, keyid,
-           [messages[0],"JUNK","RENEGOTIATE","JUNK",messages[1]],
-           [None]*5, None)
+           [deliv[0],"JUNK","RENEGOTIATE","JUNK",deliv[1]],
+           None)
         clientcon.register(async)
         def clientThread(clientcon=clientcon, async=async):
             while not clientcon.isShutdown():
@@ -3226,11 +3228,14 @@
         self.failUnless(len(c) == 1)
         self.failUnless(startTime <= c[0].lastActivity <= endTime)
         self.assertEquals(2, server.nJunkPackets)
+        self.assert_(deliv[0]._succeeded)
+        self.assert_(deliv[1]._succeeded)
 
         # Again, with bad keyid.
+        deliv = [FakeDeliverable(m) for m in messages]
         clientcon = mixminion.server.MMTPServer.MMTPClientConnection(
-           _getTLSContext(0), "127.0.0.1", TEST_PORT, "Z"*20,
-           messages[:], [None, None], None)
+            _getTLSContext(0), "127.0.0.1", TEST_PORT, "Z"*20,
+            deliv[:], None)
         clientcon.register(async)
         def clientThread2(clientcon=clientcon, async=async):
             while not clientcon.isShutdown():
@@ -3248,6 +3253,9 @@
         finally:
             resumeLog()  #unsuppress warning
 
+        self.assert_(deliv[0]._failed)
+        self.assert_(deliv[1]._failed)
+
     def _testTimeout(self):
         server, listener, messagesIn, keyid = _getMMTPServer()
         self.listener = listener
@@ -3322,7 +3330,6 @@
         self.server = server
 
         messages = ["helloxxx"*4096, "helloyyy"*4096]
-
         # Send 2 messages -- both should be rejected.
         server.process(0.1)
         routing = IPV4Info("127.0.0.1", TEST_PORT, keyid)
@@ -3347,13 +3354,12 @@
 
         # Send m1, then junk, then renegotiate, then junk, then m2.
         messages = ["helloxxx"*4096, "helloyyy"*4096]
+        deliv = [FakeDeliverable(m) for m in messages]
         async = mixminion.server.MMTPServer.AsyncServer()
-        _failed_args = []
-        def _failed(msg, handle, retriable, _f=_failed_args):
-            _f.append((msg,handle,retriable))
+
         clientcon = mixminion.server.MMTPServer.MMTPClientConnection(
            _getTLSContext(0), "127.0.0.1", TEST_PORT, keyid,
-           messages, [None,None], None, failCallback=_failed)
+           deliv)
         clientcon.register(async)
         def clientThread(clientcon=clientcon, async=async):
             while not clientcon.isShutdown():
@@ -3367,8 +3373,8 @@
             server.process(0.1)
         t.join()
         self.assertEquals(len(messagesIn), 0)
-        self.assertEquals(_failed_args, [(messages[0], None, 1),
-                                         (messages[1], None, 1)])
+        self.assertEquals(deliv[0]._retriable, 1)
+        self.assertEquals(deliv[1]._retriable, 1)
 
 #----------------------------------------------------------------------
 # Config files
@@ -5857,7 +5863,7 @@
     tc = loader.loadTestsFromTestCase
 
     if 0:
-        suite.addTest(tc(MiscTests))
+        suite.addTest(tc(MMTPTests))
         return suite
 
     suite.addTest(tc(MiscTests))