[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] clean/flush messages to a single mix; note ping addr:port in TODO
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv29226/lib/mixminion
Modified Files:
ClientMain.py ClientUtils.py ServerInfo.py test.py
Log Message:
clean/flush messages to a single mix; note ping addr:port in TODO
Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.150
retrieving revision 1.151
diff -u -d -r1.150 -r1.151
--- ClientMain.py 15 Jan 2004 21:03:25 -0000 1.150
+++ ClientMain.py 17 Jan 2004 04:24:57 -0000 1.151
@@ -377,7 +377,6 @@
"""Generate a reply message, but do not send it. Returns
a tuple of (packet body, ServerInfo for the first hop.)
-
directory -- an instance of ClientDirectory; used to generate
paths.
address -- an instance of ExitAddress, used to tell where to
@@ -530,10 +529,12 @@
finally:
clientUnlock()
- def flushQueue(self, maxPackets=None):
+ def flushQueue(self, maxPackets=None, handles=None):
"""Try to send packets in the queue to their destinations. Do not try
to send more than maxPackets packets. If not all packets will be
sent, choose the ones to try at random.
+
+ DOCDOC nicknames
"""
#XXXX write unit tests
class PacketProxy:
@@ -550,8 +551,9 @@
LOG.info("Flushing packet queue")
clientLock()
try:
- handles = self.queue.getHandles()
- LOG.info("Found %s pending packets", len(handles))
+ if handles is None:
+ handles = self.queue.getHandles()
+ LOG.info("Found %s pending packets", len(handles))
if maxPackets is not None:
handles = mixminion.Crypto.getCommonPRNG().shuffle(handles,
maxPackets)
@@ -586,12 +588,23 @@
else:
LOG.info("No packets delivered")
- def cleanQueue(self, maxAge, now=None):
+ def cleanQueue(self, handles):
"""Remove all packets older than maxAge seconds from the
client queue."""
try:
clientLock()
- self.queue.cleanQueue(maxAge, now)
+ byRouting = self._sortPackets(
+ [ (h, self.queue.getRouting(h)) for h in handles ],
+ shuffle = 0)
+ byName = [ (displayServer(ri), lst) for ri,lst in byRouting ]
+ byName.sort()
+ if not byName:
+ LOG.info("No packets removed.")
+ for name, lst in byName:
+ LOG.info("Removing %s packets for %s", len(lst), name)
+ for h in lst:
+ self.queue.removePacket(h)
+ self.queue.cleanQueue()
finally:
clientUnlock()
@@ -1738,6 +1751,7 @@
EXAMPLES:
Try to send all currently queued packets.
%(cmd)s
+DOCDOC
""".strip()
def flushQueue(cmd, args):
@@ -1753,7 +1767,8 @@
sys.exit(1)
try:
parser = CLIArgumentParser(options, wantConfig=1, wantLog=1,
- wantClient=1)
+ wantClient=1,
+ wantClientDirectory=len(args))
except UsageError, e:
e.dump()
print _FLUSH_QUEUE_USAGE % { 'cmd' : cmd }
@@ -1762,10 +1777,16 @@
parser.init()
client = parser.client
- client.flushQueue(count)
+ if args:
+ handles = parser.client.queue.getHandlesByDestAndAge(
+ args,parser.directory, None)
+ else:
+ handles = None
+
+ client.flushQueue(count, handles)
_CLEAN_QUEUE_USAGE = """\
-Usage: %(cmd)s <-d n|--days=n> [options]
+Usage: %(cmd)s [options] [servername...]
-h, --help Print this usage message and exit.
-v, --verbose Display extra debugging messages.
-f <file>, --config=<file> Use a configuration file other than ~/.mixminionrc
@@ -1775,12 +1796,13 @@
EXAMPLES:
Remove all pending packets older than one week.
%(cmd)s -d 7
+DOCDOC
""".strip()
def cleanQueue(cmd, args):
options, args = getopt.getopt(args, "hvf:d:",
["help", "verbose", "config=", "days=",])
- days = None
+ days = 60
for o,v in options:
if o in ('-d','--days'):
try:
@@ -1789,10 +1811,9 @@
print "ERROR: %s expects an integer" % o
sys.exit(1)
try:
- if days is None:
- raise UsageError()
parser = CLIArgumentParser(options, wantConfig=1, wantLog=1,
- wantClient=1)
+ wantClient=1,
+ wantClientDirectory=len(args))
except UsageError, e:
e.dump()
print _CLEAN_QUEUE_USAGE % { 'cmd' : cmd }
@@ -1800,7 +1821,14 @@
parser.init()
client = parser.client
- client.cleanQueue(days*24*60*60)
+ notAfter = time.time() - days*24*60*60
+ if args:
+ handles = parser.client.queue.getHandlesByDestAndAge(
+ args, parser.directory, notAfter)
+ else:
+ handles = parser.client.queue.getHandlesByAge(notAfter)
+
+ client.cleanQueue(handles)
_LIST_QUEUE_USAGE = """\
Usage: %(cmd)s [options]
Index: ClientUtils.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientUtils.py,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -d -r1.17 -r1.18
--- ClientUtils.py 15 Jan 2004 21:03:26 -0000 1.17
+++ ClientUtils.py 17 Jan 2004 04:24:57 -0000 1.18
@@ -555,7 +555,6 @@
del self._now
# ----------------------------------------------------------------------
-
class SURBLog(mixminion.Filestore.DBBase):
"""A SURBLog manipulates a database on disk to remember which SURBs we've
used, so we don't reuse them accidentally.
@@ -723,11 +722,67 @@
finally:
mixminion.ClientMain.clientUnlock()
+ def getHandlesByAge(self, notAfter):
+ self.loadMetadata()
+ result = []
+ for h in self.store.getAllMessages():
+ _,_,when = self.store.getMetadata(h)
+ if when <= notAfter: result.append(h)
+ return result
+
+ def getHandlesByDestAndAge(self, destList, directory, notAfter=None,
+ warnUnused=1):
+ """DOCDOC destset: set of hostnames, ips, or keyids"""
+ destSet = {}
+ reverse = {}
+ for d in destList:
+ if directory:
+ keyid = directory.getKeyIDByNickname(d)
+ if keyid:
+ destSet[keyid] = 1
+ reverse[keyid] = d
+ continue
+ destSet[d] = 1
+
+ self.loadMetadata()
+ result = []
+ foundAny = {}
+ foundMatch = {}
+ for h in self.store.getAllMessages():
+ _, r, when = self.store.getMetadata(h)
+ if (destSet.has_key(r.keyinfo) or
+ (hasattr(r, 'hostname') and destSet.has_key(r.hostname)) or
+ (hasattr(r, 'ip') and destSet.has_key(r.ip))):
+
+ keys = [ getattr(r, 'hostname', None),
+ getattr(r, 'ip', None),
+ reverse.get(r.keyinfo, None),
+ r.keyinfo ]
+ for k in keys: foundAny[k]=1
+ if notAfter and when > notAfter:
+ continue
+ for k in keys: foundMatch[k]=1
+ result.append(h)
+ if warnUnused:
+ for d in destList:
+ if foundMatch.get(d):
+ continue
+ elif foundAny.get(d):
+ LOG.warn("No expired packets found for %r", d)
+ else:
+ LOG.warn("No pending packets found for %r", d)
+ return result
+
def getRouting(self, handle):
"""Return the routing information associated with the given handle."""
self.loadMetadata()
return self.store.getMetadata(handle)[1]
+ def getDate(self, handle):
+ """Return the date a given handle was inserted."""
+ self.loadMetadata()
+ return self.store.getMetadata(handle)[2]
+
def getPacket(self, handle):
"""Given a handle, return a 3-tuple of the corresponding
32K packet, {IPV4/Host}Info, and time of first queueing. (The time
@@ -777,24 +832,8 @@
res[s] = (count, oldest)
return res
- def cleanQueue(self, maxAge=None, now=None):
+ def cleanQueue(self):
"""Remove all packets older than maxAge seconds from this queue."""
- if now is None:
- now = time.time()
- if maxAge is not None:
- cutoff = now - maxAge
- remove = []
- self.loadMetadata()
- for h in self.getHandles():
- try:
- when = self.store.getMetadata(h)[2]
- except mixminion.Filestore.CorruptedFile:
- continue
- if when < cutoff:
- remove.append(h)
- LOG.info("Removing %s old packets from queue", len(remove))
- for h in remove:
- self.store.removeMessage(h)
self.store.cleanQueue()
self.store.cleanMetadata()
@@ -808,6 +847,10 @@
packet, routing, when = self.getPacket(h)
return "V0", routing, when
- self.store.loadAllMetadata(fixupHandle)
+ mixminion.ClientMain.clientLock()
+ try:
+ self.store.loadAllMetadata(fixupHandle)
+ finally:
+ mixminion.ClientMain.clientUnlock()
self.metadataLoaded = 1
Index: ServerInfo.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ServerInfo.py,v
retrieving revision 1.75
retrieving revision 1.76
diff -u -d -r1.75 -r1.76
--- ServerInfo.py 9 Jan 2004 00:46:12 -0000 1.75
+++ ServerInfo.py 17 Jan 2004 04:24:57 -0000 1.76
@@ -325,7 +325,7 @@
return self['Server']['Packet-Key']
def getKeyDigest(self):
- """Returns a hash of this server's MMTP key"""
+ """Returns a hash of this server's identity key."""
return sha1(pk_encode_public_key(self['Server']['Identity']))
def getIPV4Info(self):
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.177
retrieving revision 1.178
diff -u -d -r1.177 -r1.178
--- test.py 8 Jan 2004 23:07:31 -0000 1.177
+++ test.py 17 Jan 2004 04:24:57 -0000 1.178
@@ -6502,7 +6502,7 @@
p2 = mixminion.Crypto.getCommonPRNG().getBytes(32*1024)
ipv4 = mixminion.Packet.IPV4Info("10.20.30.40",48099,"KZ"*10)
host = mixminion.Packet.MMTPHostInfo("bliznerty.potrzebie",48099,
- "KZ"*10)
+ "KL"*10)
self.assertEquals(cq.getHandles(), [])
self.assert_(not cq.packetExists("Z"))
h1 = cq.queuePacket(p1, ipv4, now)
@@ -6517,7 +6517,26 @@
v = cq.getPacket(h2)
self.assertEquals((host,previousMidnight(now-24*60*60*10)), v[1:])
self.assertLongStringEq(v[0], p2)
- cq.cleanQueue(maxAge=24*60*60,now=now)
+
+ class FakeDir:
+ def getKeyIDByNickname(self, n):
+ if n == 'nerty': return "KL"*10
+ return None
+ D = FakeDir()
+ self.assertEquals([h1],
+ cq.getHandlesByDestAndAge(["10.20.30.40"], None, None))
+ self.assertEquals([h1],
+ cq.getHandlesByDestAndAge(["10.20.30.40"], D, None))
+ self.assertEquals([h2],
+ cq.getHandlesByDestAndAge(["bliznerty.potrzebie"], D, None))
+ self.assertEquals([h2],
+ cq.getHandlesByDestAndAge(["nerty"], D, None))
+ self.assertUnorderedEq([h1,h2],
+ cq.getHandlesByDestAndAge(["nerty", "10.20.30.40"], D, None))
+ self.assertEquals([h2], cq.getHandlesByAge(now-24*60*60))
+ cq.removePacket(h2)
+
+ cq.cleanQueue()
self.assertEquals([h1], cq.getHandles())
v = cq.getPacket(h1)
self.assertEquals((ipv4,previousMidnight(now)), v[1:])
@@ -7515,7 +7534,7 @@
tc = loader.loadTestsFromTestCase
if 0:
- suite.addTest(tc(ClientDirectoryTests))
+ suite.addTest(tc(ClientUtilTests))
return suite
testClasses = [MiscTests,
MinionlibCryptoTests,