[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Finish ServerQueue->Filestore refactoring



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv30795/lib/mixminion

Modified Files:
	ClientMain.py Crypto.py Filestore.py test.py 
Log Message:
Finish ServerQueue->Filestore refactoring

Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.102
retrieving revision 1.103
diff -u -d -r1.102 -r1.103
--- ClientMain.py	15 Jul 2003 15:30:56 -0000	1.102
+++ ClientMain.py	24 Jul 2003 17:37:16 -0000	1.103
@@ -28,6 +28,7 @@
 
 import mixminion.BuildMessage
 import mixminion.Crypto
+import mixminion.Filestore
 import mixminion.MMTPClient
 
 from mixminion.Common import AtomicFile, IntervalSet, LOG, floorDiv, \
@@ -1293,27 +1294,33 @@
     # XXXX change this to be OO; add nicknames.
 
     # XXXX write unit tests
+    #
+    # DOCDOC fields have changed.
 
     def __init__(self, directory, prng=None):
         """Create a new ClientQueue object, storing packets in 'directory'
            and generating random filenames using 'prng'."""
         self.dir = directory
         createPrivateDir(directory)
-        if prng is not None:
-            self.prng = prng
-        else:
-            self.prng = mixminion.Crypto.getCommonPRNG()
+        # We used to name entries "pkt_X"; this has changed.
+        #
+        for fn in os.listdir(directory):
+            if fn.startswith("pkt_"):
+                handle = fn[4:]
+                fname_old = os.path.join(directory, fn)
+                fname_new = os.path.join(directory, "msg_"+handle)
+                os.rename(fname_old, fname_new)
+        
+        self.store = mixminion.Filestore.ObjectStore(
+            directory, create=1, scrub=1)
 
     def queuePacket(self, message, routing):
         """Insert the 32K packet 'message' (to be delivered to 'routing')
            into the queue.  Return the handle of the newly inserted packet."""
         clientLock()
         try:
-            f, handle = self.prng.openNewFile(self.dir, "pkt_", 1)
-            cPickle.dump(("PACKET-0", message, routing,
-                          previousMidnight(time.time())), f, 1)
-            f.close()
-            return handle
+            fmt = ("PACKET-0", message, routing, previousMidnight(time.time()))
+            return self.store.queueObject(fmt)
         finally:
             clientUnlock()
 
@@ -1322,12 +1329,7 @@
            queue."""
         clientLock()
         try:
-            fnames = os.listdir(self.dir)
-            handles = []
-            for fname in fnames:
-                if fname.startswith("pkt_"):
-                    handles.append(fname[4:])
-            return handles
+            return self.store.getAllMessages()
         finally:
             clientUnlock()
 
@@ -1335,8 +1337,11 @@
         """Given a handle, return a 3-tuple of the corresponding
            32K packet, IPV4Info, and time of first queueing.  (The time
            is rounded down to the closest midnight GMT.)"""
-        fname = os.path.join(self.dir, "pkt_"+handle)
-        magic, message, routing, when = readPickled(fname)
+        obj = self.store.getObject(handle)
+        try:
+            magic, message, routing, when = obj
+        except (ValueError, TypeError):
+            magic = None
         if magic != "PACKET-0":
             LOG.error("Unrecognized packet format for %s",handle)
             return None
@@ -1345,13 +1350,12 @@
     def packetExists(self, handle):
         """Return true iff the queue contains a packet with the handle
            'handle'."""
-        fname = os.path.join(self.dir, "pkt_"+handle)
-        return os.path.exists(fname)
+        return self.store.messageExists(handle)
 
     def removePacket(self, handle):
         """Remove the packet named with the handle 'handle'."""
-        fname = os.path.join(self.dir, "pkt_"+handle)
-        secureDelete(fname, blocking=1)
+        self.store.removeMessage(handle)
+        self.store.cleanQueue()
 
     def inspectQueue(self, now=None):
         """Print a message describing how many messages in the queue are headed
@@ -1388,11 +1392,128 @@
                 remove.append(h)
         LOG.info("Removing %s old messages from queue", len(remove))
         for h in remove:
-            self.removePacket(h)
+            self.store.removeMessage(h)
+        self.store.cleanQueue()
+
+## class ClientQueue:
+##     """A ClientQueue holds packets that have been scheduled for delivery
+##        but not yet delivered.  As a matter of policy, we queue messages if
+##        the user tells us to, or if delivery has failed and the user didn't
+##        tell us not to."""
+##     ## Fields:
+##     # dir -- a directory to store packets in.
+##     # prng -- an instance of mixminion.Crypto.RNG.
+##     ## Format:
+##     # The directory holds files with names of the form pkt_<handle>.
+##     # Each file holds pickled tuple containing:
+##     #           ("PACKET-0",
+##     #             a 32K string (the packet),
+##     #             an instance of IPV4Info (the first hop),
+##     #             the latest midnight preceding the time when this
+##     #                 packet was inserted into the queue
+##     #           )
+##     # XXXX change this to be OO; add nicknames.
+
+##     # XXXX write unit tests
+
+##     def __init__(self, directory, prng=None):
+##         """Create a new ClientQueue object, storing packets in 'directory'
+##            and generating random filenames using 'prng'."""
+##         self.dir = directory
+##         createPrivateDir(directory)
+##         if prng is not None:
+##             self.prng = prng
+##         else:
+##             self.prng = mixminion.Crypto.getCommonPRNG()
+
+##     def queuePacket(self, message, routing):
+##         """Insert the 32K packet 'message' (to be delivered to 'routing')
+##            into the queue.  Return the handle of the newly inserted packet."""
+##         clientLock()
+##         try:
+##             f, handle = self.prng.openNewFile(self.dir, "pkt_", 1)
+##             cPickle.dump(("PACKET-0", message, routing,
+##                           previousMidnight(time.time())), f, 1)
+##             f.close()
+##             return handle
+##         finally:
+##             clientUnlock()
+
+##     def getHandles(self):
+##         """Return a list of the handles of all messages currently in the
+##            queue."""
+##         clientLock()
+##         try:
+##             fnames = os.listdir(self.dir)
+##             handles = []
+##             for fname in fnames:
+##                 if fname.startswith("pkt_"):
+##                     handles.append(fname[4:])
+##             return handles
+##         finally:
+##             clientUnlock()
+
+##     def getPacket(self, handle):
+##         """Given a handle, return a 3-tuple of the corresponding
+##            32K packet, IPV4Info, and time of first queueing.  (The time
+##            is rounded down to the closest midnight GMT.)"""
+##         fname = os.path.join(self.dir, "pkt_"+handle)
+##         magic, message, routing, when = readPickled(fname)
+##         if magic != "PACKET-0":
+##             LOG.error("Unrecognized packet format for %s",handle)
+##             return None
+##         return message, routing, when
+
+##     def packetExists(self, handle):
+##         """Return true iff the queue contains a packet with the handle
+##            'handle'."""
+##         fname = os.path.join(self.dir, "pkt_"+handle)
+##         return os.path.exists(fname)
+
+##     def removePacket(self, handle):
+##         """Remove the packet named with the handle 'handle'."""
+##         fname = os.path.join(self.dir, "pkt_"+handle)
+##         secureDelete(fname, blocking=1)
+
+##     def inspectQueue(self, now=None):
+##         """Print a message describing how many messages in the queue are headed
+##            to which addresses."""
+##         if now is None:
+##             now = time.time()
+##         handles = self.getHandles()
+##         if not handles:
+##             print "[Queue is empty.]"
+##             return
+##         timesByServer = {}
+##         for h in handles:
+##             _, routing, when = self.getPacket(h)
+##             timesByServer.setdefault(routing, []).append(when)
+##         for s in timesByServer.keys():
+##             count = len(timesByServer[s])
+##             oldest = min(timesByServer[s])
+##             days = floorDiv(now - oldest, 24*60*60)
+##             if days < 1:
+##                 days = "<1"
+##             print "%2d messages for server at %s:%s (oldest is %s days old)"%(
+##                 count, s.ip, s.port, days)
+
+##     def cleanQueue(self, maxAge, now=None):
+##         """Remove all messages older than maxAge seconds from this
+##            queue."""
+##         if now is None:
+##             now = time.time()
+##         cutoff = now - maxAge
+##         remove = []
+##         for h in self.getHandles():
+##             when = self.getPacket(h)[2]
+##             if when < cutoff:
+##                 remove.append(h)
+##         LOG.info("Removing %s old messages from queue", len(remove))
+##         for h in remove:
+##             self.removePacket(h)
 
 class MixminionClient:
-    """Access point for client functionality.  Currently, this is limited
-       to generating and sending forward messages"""
+    """Access point for client functionality."""
     ## Fields:
     # config: The ClientConfig object with the current configuration
     # prng: A pseudo-random number generator for padding and path selection

Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.49
retrieving revision 1.50
diff -u -d -r1.49 -r1.50
--- Crypto.py	14 Jul 2003 15:38:50 -0000	1.49
+++ Crypto.py	24 Jul 2003 17:37:16 -0000	1.50
@@ -479,6 +479,9 @@
 # Magic number used for normal distribution
 NV_MAGICCONST = 4 * math.exp(-0.5)/math.sqrt(2.0)
 
+# Flag: is the filesystem case-insensitive?
+FS_IS_CASEI = sys.platform in ('cygwin', 'win32')
+
 class RNG:
     '''Base implementation class for random number generators.  Works
        by requesting a bunch of bytes via self._prng, and doling them
@@ -601,6 +604,8 @@
         while 1:
             bytes = self.getBytes(6)
             base = binascii.b2a_base64(bytes).strip().replace("/","-")
+            if FS_IS_CASEI:
+                base = base.lower()
             fname = os.path.join(dir, "%s%s"%(prefix,base))
             try:
                 fd = os.open(fname, flags, 0600)

Index: Filestore.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Filestore.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -d -r1.1 -r1.2
--- Filestore.py	24 Jul 2003 03:22:56 -0000	1.1
+++ Filestore.py	24 Jul 2003 17:37:16 -0000	1.2
@@ -146,6 +146,10 @@
         self._lock.release()
         return hs
 
+    def messageExists(self, handle):
+        """Return true iff a file named 'msg_<handle>' exists in self.dir."""
+        return os.path.exists(os.path.join(self.dir, "msg_"+handle))
+
     def removeMessage(self, handle):
         """Given a handle, removes the corresponding message from the
            filestore.  """

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.140
retrieving revision 1.141
diff -u -d -r1.140 -r1.141
--- test.py	15 Jul 2003 15:30:56 -0000	1.140
+++ test.py	24 Jul 2003 17:37:16 -0000	1.141
@@ -42,6 +42,7 @@
 import mixminion.ClientMain
 import mixminion.Config
 import mixminion.Crypto as Crypto
+import mixminion.Filestore
 import mixminion.MMTPClient
 import mixminion.Packet
 import mixminion.ServerInfo
@@ -160,9 +161,41 @@
     Crypto.pk_generate = _pk_generate_replacement
 
 #----------------------------------------------------------------------
+# Add some helpful functions to unittest.TestCase
+
+class TestCase(unittest.TestCase):
+    """unittest.TestCase plus extra assertions: float, long-string, unordered-list, prefix, suffix."""
+    def __init__(self, *args, **kwargs):
+        unittest.TestCase.__init__(self, *args, **kwargs)
+    def assertFloatEq(self, f1, f2):
+        if not floatEq(f1, f2):
+            self.fail("%s != %s" % (f1, f2))
+    def assertLongStringEq(self, s1, s2):
+        if s1 != s2:
+            d = findFirstDiff(s1, s2)
+            self.fail("Strings unequal.  First difference at %s: %r vs %r"
+                      % (d, s1[d:d+10], s2[d:d+10]))
+    def assertUnorderedEq(self, l1, l2):
+        l1 = list(l1)[:]
+        l2 = list(l2)[:]
+        l1.sort()
+        l2.sort()
+        self.assertEquals(l1, l2)
+    def assertStartsWith(self, s1, s2):
+        if not s1.startswith(s2):
+            if len(s1) > min(40, len(s2)+5):
+                s1 = s1[:len(s2)+5]+"..."
+            self.fail("%r does not start with %r"%(s1,s2))
+    def assertEndsWith(self, s1, s2):
+        if not s1.endswith(s2):
+            if len(s1) > min(40, len(s2)+5):
+                s1 = "..."+s1[-(len(s2)+5):]
+            self.fail("%r does not end with %r"%(s1,s2))
+
+#----------------------------------------------------------------------
 # Tests for common functionality
 
-class MiscTests(unittest.TestCase):
+class MiscTests(TestCase):
     def testDiv(self):
         self.assertEquals(floorDiv(10,1), 10)
         self.assertEquals(floorDiv(10,2), 5)
@@ -584,7 +617,7 @@
                     self.assertEquals(h, headers)
                     self.assertEquals(b.strip(), inp.strip())
                     if not mode:
-                        self.assert_(b.endswith("\n"))
+                        self.assertEndsWith(b, "\n")
 
         # Test base64fn and concatenation.
         enc1 = armorText(inp2, "THIS THAT", [("H-64", "0")], 0)
@@ -624,7 +657,7 @@
         
 #----------------------------------------------------------------------
 
-class MinionlibCryptoTests(unittest.TestCase):
+class MinionlibCryptoTests(TestCase):
     """Tests for cryptographic C extensions."""
     def test_sha1(self):
         s1 = _ml.sha1
@@ -846,7 +879,7 @@
         self.assertEquals(p.encode_key(0), p2.encode_key(0))
 
 #----------------------------------------------------------------------
-class MinionlibFECTests(unittest.TestCase):
+class MinionlibFECTests(TestCase):
     def do_fec_test(self, k, n, sz):
         r = getCommonPRNG()
 
@@ -905,7 +938,7 @@
 
 #----------------------------------------------------------------------
 
-class CryptoTests(unittest.TestCase):
+class CryptoTests(TestCase):
     """Tests for Python cryptographic library"""
     def test_initcrypto(self):
         init_crypto()
@@ -1135,13 +1168,11 @@
             if foundUnmoved: break
         self.failUnless(foundUnmoved)
         for lst in lists:
-            s = lst[:]
-            s.sort()
-            self.assertEquals(s, range(100))
+            self.assertUnorderedEq(lst, range(100))
 
 #----------------------------------------------------------------------
 
-class PacketTests(unittest.TestCase):
+class PacketTests(TestCase):
     def test_subheader(self):
         s = Subheader(3,0,"abcdeabcdeabcdef",
                       "ABCDEFGHIJABCDEFGHIJ",
@@ -1442,7 +1473,7 @@
             resumeLog()
 
 #----------------------------------------------------------------------
-class HashLogTests(unittest.TestCase):
+class HashLogTests(TestCase):
     def test_hashlog(self):
         # Create a new,empty hashlog.
         fname = mix_mktemp(".db")
@@ -1561,7 +1592,7 @@
            to this server."""
         return IPV4Info(self.addr, self.port, self.keyid)
 
-class BuildMessageTests(unittest.TestCase):
+class BuildMessageTests(TestCase):
     def setUp(self):
         self.pk1 = getRSAKey(0,2048)
         self.pk2 = getRSAKey(1,2048)
@@ -1610,7 +1641,7 @@
                 pld = BuildMessage._encodePayload(m,ov,p)
                 self.assertEquals(28*1024, len(pld)+ov)
                 comp = compressData(m)
-                self.assert_(pld[22:].startswith(comp))
+                self.assertStartsWith(pld[22:], comp)
                 self.assertEquals(sha1(pld[22:]),pld[2:22])
                 self.assert_(BuildMessage._checkPayload(pld))
                 self.assertEquals(len(comp), ord(pld[0])*256+ord(pld[1]))
@@ -1979,7 +2010,7 @@
         self.assertEquals(0, ord(t[0]) & 0x80)
         comp = compressData("Hello!!!!")
         self.assertEquals(len(comp), ord(p[0])*256 +ord(p[1]))
-        self.assert_(p[22:].startswith(comp))
+        self.assertStartsWith(p[22:], comp)
         self.assertEquals(sha1(p[22:]), p[2:22])
 
         for rsakey in (rsa1, rsa2):
@@ -1994,7 +2025,7 @@
             comp = compressData(payload)
             self.assert_(len(comp), ord(msg[0])*256 + ord(msg[1]))
             self.assertEquals(sha1(msg[22:]), msg[2:22])
-            self.assert_(msg[22:].startswith(comp))
+            self.assertStartsWith(msg[22:], comp)
 
     def test_buildreply(self):
         brbi = BuildMessage._buildReplyBlockImpl
@@ -2060,10 +2091,10 @@
                           self.server3.getRoutingInfo().pack())
         self.assertEquals(reply.pack(), parseReplyBlock(reply.pack()).pack())
         txt = reply.packAsText()
-        self.assert_(txt.startswith(
-            "-----BEGIN TYPE III REPLY BLOCK-----\nVersion: 0.2\n\n"))
-        self.assert_(txt.endswith(
-            "-----END TYPE III REPLY BLOCK-----\n"))
+        self.assertStartsWith(txt,
+            "-----BEGIN TYPE III REPLY BLOCK-----\nVersion: 0.2\n\n")
+        self.assertEndsWith(txt,
+            "-----END TYPE III REPLY BLOCK-----\n")
         parsed = parseTextReplyBlocks(txt)
         self.assertEquals(1, len(parsed))
         self.assertEquals(reply.pack(), parsed[0].pack())
@@ -2123,7 +2154,7 @@
                                Crypto.PAYLOAD_ENCRYPT_MODE))
         comp = compressData('Information???')
         self.assertEquals(len(comp), ord(p[0])*256 +ord(p[1]))
-        self.assert_(p[22:].startswith(comp))
+        self.assertStartsWith(p[22:], comp)
         self.assertEquals(sha1(p[22:]), p[2:22])
 
         p,t = messages['srepl']
@@ -2136,7 +2167,7 @@
                                       Crypto.PAYLOAD_ENCRYPT_MODE))
         comp = compressData(payload)
         self.assertEquals(len(comp), ord(p[0])*256 +ord(p[1]))
-        self.assert_(p[22:].startswith(comp))
+        self.assertStartsWith(p[22:], comp)
         self.assertEquals(sha1(p[22:]), p[2:22])
 
     def test_decoding(self):
@@ -2276,7 +2307,7 @@
 #
 # (of course, we still need to build failing messages by hand)
 
-class PacketHandlerTests(unittest.TestCase):
+class PacketHandlerTests(TestCase):
     def setUp(self):
         self.pk1 = getRSAKey(0,2048)
         self.pk2 = getRSAKey(1,2048)
@@ -2320,7 +2351,7 @@
                 if appkey:
                     self.assertEquals(res.getApplicationKey(), appkey)
 
-                self.assert_(res.getContents().startswith(payload))
+                self.assertStartsWith(res.getContents(), payload)
                 break
         return res
 
@@ -2595,7 +2626,6 @@
 #----------------------------------------------------------------------
 # QUEUE
 
-
 class TestDeliveryQueue(DeliveryQueue):
     def __init__(self,d,now=None):
         DeliveryQueue.__init__(self,d,now=now)
@@ -2606,7 +2636,7 @@
     def _deliverMessages(self, msgList):
         self._msgs = msgList
 
-class QueueTests(unittest.TestCase):
+class QueueTests(TestCase):
     def setUp(self):
         mixminion.Common.installSIGCHLDHandler()
         self.d1 = mix_mktemp("q1")
@@ -2618,16 +2648,18 @@
             os.unlink(f)
 
     def testCreateQueue(self):
+        Store = mixminion.Filestore.MixedStore
+        
         # Nonexistent dir.
-        self.failUnlessRaises(MixFatalError, Queue, self.d1)
+        self.failUnlessRaises(MixFatalError, Store, self.d1)
         # File in place of dir
         writeFile(self.d1, "   ")
-        self.failUnlessRaises(MixFatalError, Queue, self.d1)
-        self.failUnlessRaises(MixFatalError, Queue, self.d1, create=1)
+        self.failUnlessRaises(MixFatalError, Store, self.d1)
+        self.failUnlessRaises(MixFatalError, Store, self.d1, create=1)
         os.unlink(self.d1)
 
         # Try to create
-        queue = Queue(self.d1, create=1)
+        queue = Store(self.d1, create=1)
         self.failUnless(os.path.isdir(self.d1))
         if not ON_WINDOWS:
             self.assertEquals(0700, os.stat(self.d1)[stat.ST_MODE] & 0777)
@@ -2638,10 +2670,10 @@
         self.assertEquals(2, queue.count())
 
         # Make sure recreate doesn't bonk
-        queue = Queue(self.d1, create=1)
+        queue = Store(self.d1, create=1)
 
         # Use a queue we haven't just made.
-        queue = Queue(self.d1)
+        queue = Store(self.d1)
         self.assertEquals(2, queue.count())
         self.assertEquals(queue.messageContents(h2), "Hello world 2")
         queue.removeMessage(h2)
@@ -2650,8 +2682,10 @@
         queue.removeAll(self.unlink)
 
     def testQueueOps(self):
-        queue1 = Queue(self.d2, create=1)
-        queue2 = Queue(self.d3, create=1)
+        Store = mixminion.Filestore.MixedStore
+        
+        queue1 = Store(self.d2, create=1)
+        queue2 = Store(self.d3, create=1)
 
         # Put 100 messages in queue1
         handles = [queue1.queueMessage("Sample message %s" % i)
@@ -2685,7 +2719,7 @@
             seen = {}
             for h in group:
                 c = queue2.messageContents(h)
-                self.failUnless(c.startswith("Sample message "))
+                self.assertStartsWith(c, "Sample message ")
                 i = atoi(c[15:])
                 self.failIf(seen.has_key(i))
                 seen[i]=1
@@ -2743,6 +2777,40 @@
         queue1.cleanQueue(self.unlink)
         queue2.cleanQueue(self.unlink)
 
+    def testMetadataQueues(self):
+        d_d = mix_mktemp("q_md")
+        Store = mixminion.Filestore.StringMetadataStore
+
+        queue = Store(d_d, create=1)
+        h1 = queue.queueMessage("abc")
+        queue.setMetadata(h1, [2,3])
+        self.assertEquals(readPickled(os.path.join(d_d, "meta_"+h1)), [2,3])
+        self.assertEquals(queue.getMetadata(h1), [2,3])
+        h2 = queue.queueMessage("def")
+        queue.setMetadata(h2, [5,6])
+        h3 = queue.queueMessage("ghi")
+        self.assertEquals(queue._metadata_cache, { h1 : [2,3], h2 : [5,6] })
+
+        queue = Store(d_d, create=0)
+        self.assertEquals(queue.getMetadata(h2), [5,6])
+        self.assertEquals(queue._metadata_cache, { h2 : [5,6] })
+        try:
+            suspendLog()
+            queue.loadAllMetadata(lambda h: h)
+        finally:
+            s = resumeLog()
+        self.assertEndsWith(s, "Missing metadata for file %s\n"%h3)
+        self.assertEquals(queue._metadata_cache, { h1 : [2,3], h2 : [5,6],
+                                                   h3: h3})
+        self.assertEquals(readPickled(os.path.join(d_d, "meta_"+h3)), h3)
+        queue.removeMessage(h2)
+        self.assertEquals(queue._metadata_cache, { h1 : [2,3], h3: h3 })
+        self.assert_(os.path.exists(os.path.join(d_d, "rmvm_"+h2)))
+        self.assert_(os.path.exists(os.path.join(d_d, "rmv_"+h2)))
+        queue.cleanQueue()
+        self.assert_(not os.path.exists(os.path.join(d_d, "rmvm_"+h2)))
+        self.assert_(not os.path.exists(os.path.join(d_d, "rmv_"+h2)))
+
     def testDeliveryQueues(self):
         d_d = mix_mktemp("qd")
 
@@ -2771,11 +2839,7 @@
         self.assertEquals([(h3, "Message 3")], msgs)
 
         # Now, make sure that msg1 is gone from the pool.
-        allHandles = queue.getAllMessages()
-        allHandles.sort()
-        exHandles = [h2,h3]
-        exHandles.sort()
-        self.assertEquals(exHandles, allHandles)
+        self.assertUnorderedEq(queue.getAllMessages(), [h2, h3])
 
         # Now, fail msg2 retriably, and fail msg3 hard.  Only one message
         # should be left.  (It will have a different handle from the old
@@ -2785,9 +2849,7 @@
         allHandles = queue.getAllMessages()
         h4 = allHandles[0]
         queue.cleanQueue(self.unlink)
-        files = os.listdir(d_d)
-        files.sort()
-        self.assertEquals(files, ["meta_"+h4, "msg_"+h4])
+        self.assertUnorderedEq(os.listdir(d_d), ["meta_"+h4, "msg_"+h4])
         self.assertEquals([h4], queue.getAllMessages())
         self.assertEquals(("Message 2", now, now, now+10), queue._inspect(h2))
 
@@ -2836,14 +2898,10 @@
 
         # Trivial 'TimedMixPool'
         queue = TimedMixPool(d_m)
-        h1 = queue.queueMessage("Hello1")
-        h2 = queue.queueMessage("Hello2")
-        h3 = queue.queueMessage("Hello3")
-        b = queue.getBatch()
-        msgs = [h1,h2,h3]
-        msgs.sort()
-        b.sort()
-        self.assertEquals(msgs,b)
+        h1 = queue.queueObject("Hello1")
+        h2 = queue.queueObject("Hello2")
+        h3 = queue.queueObject("Hello3")
+        self.assertUnorderedEq(queue.getBatch(), [h1,h2,h3])
 
         # Now, test the CottrellMixPool.
         cmq = CottrellMixPool(d_m, 600, 6, sendRate=.3)
@@ -2852,7 +2910,7 @@
         self.assertEquals([], cmq.getBatch())
         # 8 messages: 2 get sent
         for i in range(5):
-            cmq.queueMessage("Message %s"%i)
+            cmq.queueObject("Message %s"%i)
         self.assertEquals(8, cmq.count())
         b1, b2, b3 = cmq.getBatch(), cmq.getBatch(), cmq.getBatch()
         self.assertEquals(2, len(b1))
@@ -2869,11 +2927,10 @@
 
         # Send 30 when there are 100 messages.
         for x in xrange(92):
-            cmq.queueMessage("Hello2 %s"%x)
+            cmq.queueObject("Hello2 %s"%x)
         for x in xrange(10):
             self.assertEquals(30, len(cmq.getBatch()))
 
-
         # Binomial Cottrell pool
         bcmq = BinomialCottrellMixPool(d_m, 600, 6, sendRate=.3)
         # (Just make sure that we don't always return the same number of
@@ -2893,7 +2950,7 @@
 
 #---------------------------------------------------------------------
 # LOGGING
-class LogTests(unittest.TestCase):
+class LogTests(TestCase):
     def testLogging(self):
 
         # Create a new loghandler, and try sending a few messages to it.
@@ -2906,12 +2963,11 @@
         log.trace("Foo")
         self.assertEquals(buf.getvalue(), "")
         log.log("WARN", "Hello%sworld", ", ")
-        self.failUnless(buf.getvalue().endswith(
-            "[WARN] Hello, world\n"))
+        self.assertEndsWith(buf.getvalue(), "[WARN] Hello, world\n")
         self.failUnless(buf.getvalue().index('\n') == len(buf.getvalue())-1)
         log.error("All your anonymity are belong to us")
-        self.failUnless(buf.getvalue().endswith(
-            "[ERROR] All your anonymity are belong to us\n"))
+        self.assertEndsWith(buf.getvalue(), 
+            "[ERROR] All your anonymity are belong to us\n")
 
         buf.truncate(0)
 
@@ -2966,7 +3022,7 @@
 # File paranoia
 
 
-class FileParanoiaTests(unittest.TestCase):
+class FileParanoiaTests(TestCase):
     def ensureParanoia(self, whatkind):
         tempdir = mixminion.testSupport._MM_TESTING_TEMPDIR
 
@@ -3244,7 +3300,7 @@
         assert not (self._failed or self._succeeded)
         self._succeeded = 1
 
-class MMTPTests(unittest.TestCase):
+class MMTPTests(TestCase):
     #XXXX This class is bulky, and has lots of cut-and-paste.  It could do
     #XXXX with a refactoring.
     def doTest(self, fn):
@@ -3568,7 +3624,7 @@
         self._restrictFormat = restrict
         _ConfigFile.__init__(self,fname,string)
 
-class ConfigFileTests(unittest.TestCase):
+class ConfigFileTests(TestCase):
     def testValidFiles(self):
         
         TCF = TestConfigFile
@@ -3698,8 +3754,7 @@
         # interval
         self.assertEquals(str(C._parseInterval(" 1 sec ")),"1 second")
         self.assertEquals(str(C._parseInterval(" 99 sec ")),"99 seconds")
-        self.failUnless(floatEq(float(C._parseInterval("1.5 minutes")),
-                                90))
+        self.assertFloatEq(float(C._parseInterval("1.5 minutes")), 90)
         h2 = C._parseInterval("2 houRS")
         m120 = C._parseInterval("120 minutes")
         self.assertEquals(str(h2), "2 hours")
@@ -3775,13 +3830,13 @@
 
         SC = mixminion.server.ServerConfig
         # Fractions
-        self.assert_(floatEq(SC._parseFraction("90 %"), .90))
-        self.assert_(floatEq(SC._parseFraction(" 90%"), .90))
-        self.assert_(floatEq(SC._parseFraction(".02"), .02))
-        self.assert_(floatEq(SC._parseFraction("1"), 1))
-        self.assert_(floatEq(SC._parseFraction("0"), 0))
-        self.assert_(floatEq(SC._parseFraction("100%"), 1))
-        self.assert_(floatEq(SC._parseFraction("0%"), 0))
+        self.assertFloatEq(SC._parseFraction("90 %"), .90)
+        self.assertFloatEq(SC._parseFraction(" 90%"), .90)
+        self.assertFloatEq(SC._parseFraction(".02"), .02)
+        self.assertFloatEq(SC._parseFraction("1"), 1)
+        self.assertFloatEq(SC._parseFraction("0"), 0)
+        self.assertFloatEq(SC._parseFraction("100%"), 1)
+        self.assertFloatEq(SC._parseFraction("0%"), 0)
         # Mix algorithms
         self.assertEquals(SC._parseMixRule(" Cottrell"), "CottrellMixPool")
         self.assertEquals(SC._parseMixRule("binomialCottrell"),
@@ -3925,7 +3980,7 @@
 Nickname: fred-the-bunny
 """
 
-class ServerInfoTests(unittest.TestCase):
+class ServerInfoTests(TestCase):
     def test_ServerInfo(self):
         # Try generating a serverinfo and see if its values are as expected.
         identity = getRSAKey(1, 2048)
@@ -4014,8 +4069,8 @@
         self.assert_(not info.isNewerThan(time.time()+60))
 
         # Now check whether we still validate the same after some corruption
-        self.assert_(inf.startswith("[Server]\n"))
-        self.assert_(inf.endswith("\n"))
+        self.assertStartsWith(inf, "[Server]\n")
+        self.assertEndsWith(inf, "\n")
         self.assert_(stringContains(inf, "b.c\n"))
         inf2 = inf.replace("[Server]\n", "[Server] \r")
         inf2 = inf2.replace("b.c\n", "b.c\r\n")
@@ -4191,7 +4246,7 @@
                               identity, now)
         # (Fred1, and Lola1 should get included.)
         d = readFile(lst.getDirectoryFilename())
-        self.assert_(d.startswith("[Directory]\n"))
+        self.assertStartsWith(d, "[Directory]\n")
         eq(2, d.count("[Server]\n"))
         self.assert_(stringContains(d, examples["Fred"][1]))
         self.assert_(stringContains(d, examples["Lola"][1]))
@@ -4315,7 +4370,7 @@
 
 #----------------------------------------------------------------------
 # EventStats
-class EventStatsTests(unittest.TestCase):
+class EventStatsTests(TestCase):
     def testNilLog(self):
         import mixminion.server.EventStats as ES
         ES.configureLog({'Server': {'LogStats' : 0}})
@@ -4406,7 +4461,7 @@
                           'Homedir' : homedir,
                  'StatsInterval' : mixminion.Config._parseInterval("1 hour")}})
         eq(ES.log.count['UnretriableDelivery'], {})
-        self.assert_(floatEq(ES.log.lastSave, time.time()))
+        self.assertFloatEq(ES.log.lastSave, time.time())
 
         # Test time configured properly.
         # 1) Rotation interval is a multiple of hours.
@@ -4505,7 +4560,7 @@
             return mixminion.server.Modules.DELIVER_OK
 """
 
-class ModuleManagerTests(unittest.TestCase):
+class ModuleManagerTests(TestCase):
     def testModuleManager(self):
         FDP = FakeDeliveryPacket
         mod_dir = mix_mktemp()
@@ -4705,7 +4760,7 @@
         self.payload = None
         self.contents = contents
 
-class ModuleTests(unittest.TestCase):
+class ModuleTests(TestCase):
     def testEmailAddressSet(self):
         EmailAddressSet = mixminion.server.Modules.EmailAddressSet
         def has(set, item, self=self):
@@ -4815,9 +4870,9 @@
             fn = os.path.join(os.path.split(fn)[0],
                               "rmv_"+os.path.split(fn)[1][4:])
             m = readFile(fn)
-            self.assert_(m.startswith(
+            self.assertStartsWith(m,
                 "To: foo@bar\nFrom: nobody\n"
-                "Subject: foobar\nX-Anonymous: yes\n\n"))
+                "Subject: foobar\nX-Anonymous: yes\n\n")
             self.assert_(stringContains(m, "This is the message"))
 
             ## What about the flush command?
@@ -4919,10 +4974,7 @@
 Free to speak, to free ourselves
 Free to hide no more.
 -----END TYPE III ANONYMOUS MESSAGE-----\n"""
-            d = findFirstDiff(EXPECTED_SMTP_PACKET, args[3])
-            if d != -1:
-                print d, "near", repr(args[3][d-10:d+10])
-            self.assert_(EXPECTED_SMTP_PACKET == args[3])
+            self.assertLongStringEq(EXPECTED_SMTP_PACKET, args[3])
             clearReplacedFunctionCallLog()
 
             # Now, with headers.
@@ -4959,10 +5011,7 @@
 Free to speak, to free ourselves
 Free to hide no more.
 -----END TYPE III ANONYMOUS MESSAGE-----\n'''
-            d = findFirstDiff(EXPECTED_SMTP_PACKET, args[3])
-            if d != -1:
-                print d, "near", repr(args[3][d-10:d+10])
-            self.assert_(EXPECTED_SMTP_PACKET == args[3])
+            self.assertLongStringEq(EXPECTED_SMTP_PACKET, args[3])
             clearReplacedFunctionCallLog()
 
             # Now, try a bunch of messages that won't be delivered: one with
@@ -5040,10 +5089,7 @@
                                ['mixminion@theotherhost'],
                                'returnaddress@x'),
                               args[:3])
-            d = findFirstDiff(MBOX_EXPECTED_MESSAGE, args[3])
-            if d != -1:
-                print d, "near", repr(args[3][d:d+10])
-            self.assertEquals(MBOX_EXPECTED_MESSAGE, args[3])
+            self.assertLongStringEq(MBOX_EXPECTED_MESSAGE, args[3])
         finally:
             undoReplacedAttributes()
             clearReplacedFunctionCallLog()
@@ -5081,7 +5127,7 @@
             self.assert_(not os.path.exists(os.path.join(dir, "2")))
         finally:
             m = resumeLog()
-        self.assert_(m.endswith("Unable to deliver message\n"))
+        self.assertEndsWith(m, "Unable to deliver message\n")
 
         try:
             suspendLog()
@@ -5091,7 +5137,7 @@
             self.assert_(not os.path.exists(os.path.join(dir, "2")))
         finally:
             m = resumeLog()
-        self.assert_(m.endswith("Unable to retry delivery for message\n"))
+        self.assertEndsWith(m, "Unable to retry delivery for message\n")
 
         queue.sendReadyMessages()
 
@@ -5171,7 +5217,7 @@
         resumeLog()
     return mixminion.server.ServerKeys.ServerKeyring(conf)
 
-class ServerKeysTests(unittest.TestCase):
+class ServerKeysTests(TestCase):
     def testServerKeyring(self):
         keyring = _getServerKeyring()
         home = _FAKE_HOME
@@ -5280,7 +5326,7 @@
 
 #----------------------------------------------------------------------
 
-class ServerMainTests(unittest.TestCase):
+class ServerMainTests(TestCase):
     def testScheduler(self):
         _Scheduler = mixminion.server.ServerMain._Scheduler
         lst=[]
@@ -5338,7 +5384,7 @@
         self.assertEquals(pool.getNextMixTime(100), 100+12*60*60)
         self.assertEquals(pool.queue.minPool, 10)
         self.assertEquals(pool.queue.minSend, 1)
-        self.assert_(floatEq(pool.queue.sendRate, .4))
+        self.assertFloatEq(pool.queue.sendRate, .4)
 
         pool = MixPool(configBCottrell, mixDir)
         self.assert_(isinstance(pool.queue,
@@ -5346,7 +5392,7 @@
         self.assertEquals(pool.getNextMixTime(100), 100+6*60*60)
         self.assertEquals(pool.queue.minPool, 10)
         self.assertEquals(pool.queue.minSend, 1)
-        self.assert_(floatEq(pool.queue.sendRate, .4))
+        self.assertFloatEq(pool.queue.sendRate, .4)
 
         # FFFF test other mix pool behavior
 
@@ -5459,7 +5505,7 @@
 # variable to hold the latest instance of FakeBCC.
 BCC_INSTANCE = None
 
-class ClientMainTests(unittest.TestCase):
+class ClientMainTests(TestCase):
     def testClientDirectory(self):
         """Check out ClientMain's directory implementation"""
         eq = self.assertEquals
@@ -6127,7 +6173,7 @@
     tc = loader.loadTestsFromTestCase
 
     if 0:
-        suite.addTest(tc(ClientMainTests))
+        suite.addTest(tc(QueueTests))
         return suite
 
     suite.addTest(tc(MiscTests))