[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] clean/flush messages to a single mix; note ping addr:po...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv29226/lib/mixminion

Modified Files:
	ClientMain.py ClientUtils.py ServerInfo.py test.py 
Log Message:
clean/flush messages to a single mix; note ping addr:port in TODO

Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.150
retrieving revision 1.151
diff -u -d -r1.150 -r1.151
--- ClientMain.py	15 Jan 2004 21:03:25 -0000	1.150
+++ ClientMain.py	17 Jan 2004 04:24:57 -0000	1.151
@@ -377,7 +377,6 @@
         """Generate a reply message, but do not send it.  Returns
            a tuple of (packet body, ServerInfo for the first hop.)
 
-
             directory -- an instance of ClientDirectory; used to generate
                paths.
             address -- an instance of ExitAddress, used to tell where to
@@ -530,10 +529,12 @@
         finally:
                 clientUnlock()
 
-    def flushQueue(self, maxPackets=None):
+    def flushQueue(self, maxPackets=None, handles=None):
         """Try to send packets in the queue to their destinations.  Do not try
            to send more than maxPackets packets.  If not all packets will be
            sent, choose the ones to try at random.
+
+           DOCDOC nicknames
         """
         #XXXX write unit tests
         class PacketProxy:
@@ -550,8 +551,9 @@
         LOG.info("Flushing packet queue")
         clientLock()
         try:
-            handles = self.queue.getHandles()
-            LOG.info("Found %s pending packets", len(handles))
+            if handles is None:
+                handles = self.queue.getHandles()
+                LOG.info("Found %s pending packets", len(handles))
             if maxPackets is not None:
                 handles = mixminion.Crypto.getCommonPRNG().shuffle(handles,
                                                                maxPackets)
@@ -586,12 +588,23 @@
         else:
             LOG.info("No packets delivered")
 
-    def cleanQueue(self, maxAge, now=None):
+    def cleanQueue(self, handles):
         """Remove all packets older than maxAge seconds from the
            client queue."""
         try:
             clientLock()
-            self.queue.cleanQueue(maxAge, now)
+            byRouting = self._sortPackets(
+                [ (h, self.queue.getRouting(h)) for h in handles ],
+                shuffle = 0)
+            byName = [ (displayServer(ri), lst) for ri,lst in byRouting ]
+            byName.sort()
+            if not byName:
+                LOG.info("No packets removed.")
+            for name, lst in byName:
+                LOG.info("Removing %s packets for %s", len(lst), name)
+                for h in lst:
+                    self.queue.removePacket(h)
+            self.queue.cleanQueue()
         finally:
             clientUnlock()
 
@@ -1738,6 +1751,7 @@
 EXAMPLES:
   Try to send all currently queued packets.
       %(cmd)s
+DOCDOC
 """.strip()
 
 def flushQueue(cmd, args):
@@ -1753,7 +1767,8 @@
                 sys.exit(1)
     try:
         parser = CLIArgumentParser(options, wantConfig=1, wantLog=1,
-                                   wantClient=1)
+                                   wantClient=1,
+                                   wantClientDirectory=len(args))
     except UsageError, e:
         e.dump()
         print _FLUSH_QUEUE_USAGE % { 'cmd' : cmd }
@@ -1762,10 +1777,16 @@
     parser.init()
     client = parser.client
 
-    client.flushQueue(count)
+    if args:
+        handles = parser.client.queue.getHandlesByDestAndAge(
+            args,parser.directory, None)
+    else:
+        handles = None
+
+    client.flushQueue(count, handles)
 
 _CLEAN_QUEUE_USAGE = """\
-Usage: %(cmd)s <-d n|--days=n> [options]
+Usage: %(cmd)s  [options] [servername...]
   -h, --help                 Print this usage message and exit.
   -v, --verbose              Display extra debugging messages.
   -f <file>, --config=<file> Use a configuration file other than ~.mixminionrc
@@ -1775,12 +1796,13 @@
 EXAMPLES:
   Remove all pending packets older than one week.
       %(cmd)s -d 7
+DOCDODC
 """.strip()
 
 def cleanQueue(cmd, args):
     options, args = getopt.getopt(args, "hvf:d:",
              ["help", "verbose", "config=", "days=",])
-    days = None
+    days = 60
     for o,v in options:
         if o in ('-d','--days'):
             try:
@@ -1789,10 +1811,9 @@
                 print "ERROR: %s expects an integer" % o
                 sys.exit(1)
     try:
-        if days is None:
-            raise UsageError()
         parser = CLIArgumentParser(options, wantConfig=1, wantLog=1,
-                                   wantClient=1)
+                                   wantClient=1,
+                                   wantClientDirectory=len(args))
     except UsageError, e:
         e.dump()
         print _CLEAN_QUEUE_USAGE % { 'cmd' : cmd }
@@ -1800,7 +1821,14 @@
 
     parser.init()
     client = parser.client
-    client.cleanQueue(days*24*60*60)
+    notAfter = time.time() - days*24*60*60
+    if args:
+        handles = parser.client.queue.getHandlesByDestAndAge(
+            args, parser.directory, notAfter)
+    else:
+        handles = parser.client.queue.getHandlesByAge(notAfter)
+
+    client.cleanQueue(handles)
 
 _LIST_QUEUE_USAGE = """\
 Usage: %(cmd)s [options]

Index: ClientUtils.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientUtils.py,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -d -r1.17 -r1.18
--- ClientUtils.py	15 Jan 2004 21:03:26 -0000	1.17
+++ ClientUtils.py	17 Jan 2004 04:24:57 -0000	1.18
@@ -555,7 +555,6 @@
             del self._now
 
 # ----------------------------------------------------------------------
-
 class SURBLog(mixminion.Filestore.DBBase):
     """A SURBLog manipulates a database on disk to remember which SURBs we've
        used, so we don't reuse them accidentally.
@@ -723,11 +722,67 @@
         finally:
             mixminion.ClientMain.clientUnlock()
 
+    def getHandlesByAge(self, notAfter):
+        self.loadMetadata()
+        result = []
+        for h in self.store.getAllMessages():
+            _,_,when = self.store.getMetadata(h)
+            if when <= notAfter: result.append(h)
+        return result
+
+    def getHandlesByDestAndAge(self, destList, directory, notAfter=None,
+                               warnUnused=1):
+        """DOCDOC destset: set of hostnames, ips, or keyids"""
+        destSet = {}
+        reverse = {}
+        for d in destList:
+            if directory:
+                keyid = directory.getKeyIDByNickname(d)
+                if keyid:
+                    destSet[keyid] = 1
+                    reverse[keyid] = d
+                    continue
+            destSet[d] = 1
+
+        self.loadMetadata()
+        result = []
+        foundAny = {}
+        foundMatch = {}
+        for h in self.store.getAllMessages():
+            _, r, when = self.store.getMetadata(h)
+            if (destSet.has_key(r.keyinfo) or
+                (hasattr(r, 'hostname') and destSet.has_key(r.hostname)) or
+                (hasattr(r, 'ip') and destSet.has_key(r.ip))):
+
+                keys = [ getattr(r, 'hostname', None),
+                         getattr(r, 'ip', None),
+                         reverse.get(r.keyinfo, None),
+                         r.keyinfo ]
+                for k in keys: foundAny[k]=1
+                if notAfter and when > notAfter:
+                    continue
+                for k in keys: foundMatch[k]=1
+                result.append(h)
+        if warnUnused:
+            for d in destList:
+                if foundMatch.get(d):
+                    continue
+                elif foundAny.get(d):
+                    LOG.warn("No expired packets found for %r", d)
+                else:
+                    LOG.warn("No pending packets found for %r", d)
+        return result
+
     def getRouting(self, handle):
         """Return the routing information associated with the given handle."""
         self.loadMetadata()
         return self.store.getMetadata(handle)[1]
 
+    def getDate(self, handle):
+        """Return the date a given handle was inserted."""
+        self.loadMetadata()
+        return self.store.getMetadata(handle)[2]
+
     def getPacket(self, handle):
         """Given a handle, return a 3-tuple of the corresponding
            32K packet, {IPV4/Host}Info, and time of first queueing.  (The time
@@ -777,24 +832,8 @@
             res[s] = (count, oldest)
         return res
 
-    def cleanQueue(self, maxAge=None, now=None):
+    def cleanQueue(self):
         """Remove all packets older than maxAge seconds from this queue."""
-        if now is None:
-            now = time.time()
-        if maxAge is not None:
-            cutoff = now - maxAge
-            remove = []
-            self.loadMetadata()
-            for h in self.getHandles():
-                try:
-                    when = self.store.getMetadata(h)[2]
-                except mixminion.Filestore.CorruptedFile:
-                    continue
-                if when < cutoff:
-                    remove.append(h)
-            LOG.info("Removing %s old packets from queue", len(remove))
-            for h in remove:
-                self.store.removeMessage(h)
         self.store.cleanQueue()
         self.store.cleanMetadata()
 
@@ -808,6 +847,10 @@
             packet, routing, when = self.getPacket(h)
             return "V0", routing, when
 
-        self.store.loadAllMetadata(fixupHandle)
+        mixminion.ClientMain.clientLock()
+        try:
+            self.store.loadAllMetadata(fixupHandle)
+        finally:
+            mixminion.ClientMain.clientUnlock()
 
         self.metadataLoaded = 1

Index: ServerInfo.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ServerInfo.py,v
retrieving revision 1.75
retrieving revision 1.76
diff -u -d -r1.75 -r1.76
--- ServerInfo.py	9 Jan 2004 00:46:12 -0000	1.75
+++ ServerInfo.py	17 Jan 2004 04:24:57 -0000	1.76
@@ -325,7 +325,7 @@
         return self['Server']['Packet-Key']
 
     def getKeyDigest(self):
-        """Returns a hash of this server's MMTP key"""
+        """Returns a hash of this server's identity key."""
         return sha1(pk_encode_public_key(self['Server']['Identity']))
 
     def getIPV4Info(self):

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.177
retrieving revision 1.178
diff -u -d -r1.177 -r1.178
--- test.py	8 Jan 2004 23:07:31 -0000	1.177
+++ test.py	17 Jan 2004 04:24:57 -0000	1.178
@@ -6502,7 +6502,7 @@
         p2 = mixminion.Crypto.getCommonPRNG().getBytes(32*1024)
         ipv4 = mixminion.Packet.IPV4Info("10.20.30.40",48099,"KZ"*10)
         host = mixminion.Packet.MMTPHostInfo("bliznerty.potrzebie",48099,
-                                             "KZ"*10)
+                                             "KL"*10)
         self.assertEquals(cq.getHandles(), [])
         self.assert_(not cq.packetExists("Z"))
         h1 = cq.queuePacket(p1, ipv4, now)
@@ -6517,7 +6517,26 @@
         v = cq.getPacket(h2)
         self.assertEquals((host,previousMidnight(now-24*60*60*10)), v[1:])
         self.assertLongStringEq(v[0], p2)
-        cq.cleanQueue(maxAge=24*60*60,now=now)
+
+        class FakeDir:
+            def getKeyIDByNickname(self, n):
+                if n == 'nerty': return "KL"*10
+                return None
+        D = FakeDir()
+        self.assertEquals([h1],
+                       cq.getHandlesByDestAndAge(["10.20.30.40"], None, None))
+        self.assertEquals([h1],
+                       cq.getHandlesByDestAndAge(["10.20.30.40"], D, None))
+        self.assertEquals([h2],
+                cq.getHandlesByDestAndAge(["bliznerty.potrzebie"], D, None))
+        self.assertEquals([h2],
+                cq.getHandlesByDestAndAge(["nerty"], D, None))
+        self.assertUnorderedEq([h1,h2],
+                cq.getHandlesByDestAndAge(["nerty", "10.20.30.40"], D, None))
+        self.assertEquals([h2], cq.getHandlesByAge(now-24*60*60))
+        cq.removePacket(h2)
+
+        cq.cleanQueue()
         self.assertEquals([h1], cq.getHandles())
         v = cq.getPacket(h1)
         self.assertEquals((ipv4,previousMidnight(now)), v[1:])
@@ -7515,7 +7534,7 @@
     tc = loader.loadTestsFromTestCase
 
     if 0:
-        suite.addTest(tc(ClientDirectoryTests))
+        suite.addTest(tc(ClientUtilTests))
         return suite
     testClasses = [MiscTests,
                    MinionlibCryptoTests,