[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] Configurable, smart retry logic; lots of refactoring; more tests.
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv3319/lib/mixminion
Modified Files:
test.py
Log Message:
Configurable, smart retry logic; lots of refactoring; more tests.
mixminiond.conf, ServerQueue, Modules, ServerMain, test:
- Add more sophisticated, more configurable retry logic
Modules, ServerQueue, ServerMain, test:
- Refactor queueDeliveryMessage to not have a bogus 'addr' argument.
ServerMain:
- Change OutgoingQueue to contain instances of RelayedPacket, not 2-tuples of
(IPv4Info, Packet).
ServerQueue:
- Remove dead code
test:
- Test payloads for drop messages.
- Test retry logic on server queues.
- Tests for DeliveryPacket methods
- Tests for connection padding and key renegotiation
- Use new queueDeliveryMessage interface
- Fix bug that generated server descs of the form "5 days days"
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.72
retrieving revision 1.73
diff -u -d -r1.72 -r1.73
--- test.py 12 Jan 2003 04:27:19 -0000 1.72
+++ test.py 13 Jan 2003 06:35:52 -0000 1.73
@@ -37,7 +37,6 @@
replaceAttribute, undoReplacedAttributes, replaceFunction, \
getReplacedFunctionCallLog, clearReplacedFunctionCallLog
-
import mixminion.BuildMessage as BuildMessage
import mixminion.ClientMain
import mixminion.Config
@@ -1560,9 +1559,7 @@
"Goodbye") ),
"Hello!!!!")
- m = bfm(payload, 500, "Goodbye",
- [self.server1,],
- [self.server3,])
+ m = bfm(payload, 500, "Goodbye", [self.server1], [self.server3])
messages = {}
@@ -1580,6 +1577,25 @@
"Hello!!!!",
decoder=decoder0)
+ # Drop message gets no tag, random payload
+ m = bfm(payload, DROP_TYPE, "", [self.server1], [self.server3])
+
+ def decoderDrop(p,t,self=self):
+ self.assertEquals(None, t)
+ self.failIf(BuildMessage._checkPayload(p))
+ return ""
+
+ self.do_message_test(m,
+ ( (self.pk1,), None,
+ (SWAP_FWD_TYPE,),
+ (self.server3.getRoutingInfo().pack(),) ),
+ ( (self.pk3,), None,
+ (DROP_TYPE,),
+ ("",) ),
+ "",
+ decoder=decoderDrop)
+
+
# Encrypted forward message
rsa1, rsa2 = self.pk1, self.pk512
payload = "<<<<Hello>>>>" * 100
@@ -1933,6 +1949,7 @@
self.assert_(res.getContents().startswith(payload))
break
+ return res
def test_successful(self):
bfm = BuildMessage.buildForwardMessage
@@ -1951,14 +1968,14 @@
# A one-hop/one-hop message.
m = bfm(p, SMTP_TYPE, "nobody@invalid", [self.server1], [self.server3])
-
- self.do_test_chain(m,
- [self.sp1,self.sp3],
- [FWD_TYPE, SMTP_TYPE],
- [self.server3.getRoutingInfo().pack(),
- "nobody@invalid"],
- p)
-
+
+ pkt = self.do_test_chain(m,
+ [self.sp1,self.sp3],
+ [FWD_TYPE, SMTP_TYPE],
+ [self.server3.getRoutingInfo().pack(),
+ "nobody@invalid"],
+ p)
+
# Try servers with multiple keys
m = bfm(p, SMTP_TYPE, "nobody@invalid", [self.server2], [self.server3])
self.do_test_chain(m, [self.sp2_3, self.sp2_3], [FWD_TYPE, SMTP_TYPE],
@@ -1985,6 +2002,77 @@
longemail],
p)
+ def test_deliverypacket(self):
+ # Test out DeliveryPacket.*: with a plaintext ascii packet.
+ bfm = BuildMessage.buildForwardMessage
+ befm = BuildMessage.buildEncryptedForwardMessage
+
+ p = "That gum you like, it's coming back in style."
+ m = bfm(p, SMTP_TYPE, "nobody@invalid", [self.server1], [self.server3])
+
+ pkt = self.do_test_chain(m,
+ [self.sp1,self.sp3],
+ [FWD_TYPE, SMTP_TYPE],
+ [self.server3.getRoutingInfo().pack(),
+ "nobody@invalid"],
+ p)
+
+ self.assertEquals(SMTP_TYPE, pkt.getExitType())
+ self.assertEquals("nobody@invalid", pkt.getAddress())
+ self.assertEquals(20, len(pkt.getTag()))
+ self.assertEquals(p, pkt.getContents())
+ self.assert_(pkt.isDelivery())
+ self.assert_(pkt.isPlaintext())
+ self.failIf(pkt.isOvercompressed())
+ self.assert_(pkt.isPrintingAscii())
+ self.failIf(pkt.isError())
+ self.assertEquals(p, pkt.getAsciiContents())
+ self.assertEquals(base64.encodestring(pkt.getTag()).strip(),
+ pkt.getAsciiTag())
+ # with a plaintext, nonascii packet.
+ pbin = hexread("0123456789ABCDEFFEDCBA9876543210")
+ m = bfm(pbin, SMTP_TYPE, "nobody@invalid",
+ [self.server1], [self.server3])
+ pkt = self.do_test_chain(m,
+ [self.sp1,self.sp3],
+ [FWD_TYPE, SMTP_TYPE],
+ [self.server3.getRoutingInfo().pack(),
+ "nobody@invalid"],
+ pbin)
+ self.assertEquals(pbin, pkt.getContents())
+ self.assert_(pkt.isPlaintext())
+ self.failIf(pkt.isPrintingAscii())
+ self.assertEquals(base64.encodestring(pkt.getContents()),
+ pkt.getAsciiContents())
+ # with an overcompressed content
+ pcomp = " "*4096
+ m = bfm(pcomp, SMTP_TYPE, "nobody@invalid",
+ [self.server1], [self.server3])
+ pkt = self.do_test_chain(m,
+ [self.sp1,self.sp3],
+ [FWD_TYPE, SMTP_TYPE],
+ [self.server3.getRoutingInfo().pack(),
+ "nobody@invalid"],
+ "")
+ self.assert_(not pkt.isPlaintext())
+ self.assert_(pkt.isOvercompressed())
+ self.assert_(pkt.getAsciiContents(),
+ base64.encodestring(BuildMessage.compressData(pcomp)))
+
+ m = befm(p, SMTP_TYPE, "nobody@invalid", [self.server1],
+ [self.server3], getRSAKey(0,1024))
+ pkt = self.do_test_chain(m,
+ [self.sp1,self.sp3],
+ [FWD_TYPE, SMTP_TYPE],
+ [self.server3.getRoutingInfo().pack(),
+ "nobody@invalid"],
+ "")
+ self.assert_(pkt.isEncrypted())
+ self.assert_(not pkt.isPrintingAscii())
+ self.assertEquals(len(pkt.getContents()), 28*1024)
+ self.assertEquals(base64.encodestring(pkt.getContents()),
+ pkt.getAsciiContents())
+
def test_rejected(self):
bfm = BuildMessage.buildForwardMessage
brm = BuildMessage.buildReplyMessage
@@ -2104,6 +2192,7 @@
m_x = self.sp2.processMessage(m_x).getPacket()
self.failUnlessRaises(CryptoError, self.sp3.processMessage, m_x)
+
#----------------------------------------------------------------------
# QUEUE
@@ -2113,6 +2202,9 @@
def __init__(self,d):
DeliveryQueue.__init__(self,d)
self._msgs = None
+ def sendReadyMessages(self, *x, **y):
+ self._msgs = None
+ DeliveryQueue.sendReadyMessages(self, *x,**y)
def _deliverMessages(self, msgList):
self._msgs = msgList
@@ -2252,26 +2344,27 @@
d_d = mix_mktemp("qd")
queue = TestDeliveryQueue(d_d)
-
+ queue.setRetrySchedule([10, 10, 10, 10]) # Retry up to 40 sec.
+ now = time.time()
# First, make sure the queue stores messages correctly.
- h1 = queue.queueDeliveryMessage("Address 1", "Message 1")
- h2 = queue.queueDeliveryMessage("Address 2", "Message 2")
- self.assertEquals((0, "Address 1", "Message 1"), queue.get(h1))
+ h1 = queue.queueDeliveryMessage("Message 1")
+ h2 = queue.queueDeliveryMessage("Message 2")
+ self.assertEquals((0, "Message 1", 0), queue.get(h1))
# Call sendReadyMessages to begin 'sending' msg1 and msg2.
- queue.sendReadyMessages()
+ queue.sendReadyMessages(now)
msgs = queue._msgs
self.assertEquals(2, len(msgs))
# _deliverMessages should have gotten them both.
- self.failUnless((h1, "Address 1", "Message 1", 0) in msgs)
- self.failUnless((h2, "Address 2", "Message 2", 0) in msgs)
+ self.failUnless((h1, "Message 1", 0) in msgs)
+ self.failUnless((h2, "Message 2", 0) in msgs)
# Add msg3, and acknowledge that msg1 succeeded. msg2 is now in limbo
- h3 = queue.queueDeliveryMessage("Address 3", "Message 3")
+ h3 = queue.queueDeliveryMessage("Message 3")
queue.deliverySucceeded(h1)
# Only msg3 should get sent out, since msg2 is still in progress.
- queue.sendReadyMessages()
+ queue.sendReadyMessages(now+1)
msgs = queue._msgs
- self.assertEquals([(h3, "Address 3", "Message 3", 0)], msgs)
+ self.assertEquals([(h3, "Message 3", 0)], msgs)
# Now, make sure that msg1 is gone from the pool.
allHandles = queue.getAllMessages()
@@ -2288,12 +2381,34 @@
allHandles = queue.getAllMessages()
h4 = allHandles[0]
self.assertEquals([h4], queue.getAllMessages())
- # When we try to send messages again, msg2 should be atttempted.
- queue.sendReadyMessages()
+ # When we try to send messages again after 5 seconds, nothing happens.
+ queue.sendReadyMessages(now+5)
msgs = queue._msgs
- self.assertEquals([(h4, "Address 2", "Message 2", 1)], msgs)
+ self.assertEquals(None, msgs)
+ # When we try to send again after 11 seconds, message 2 fires.
+ queue.sendReadyMessages(now+11)
+ msgs = queue._msgs
+ self.assertEquals([(h4, "Message 2", 1)], msgs)
self.assertNotEquals(h2, h4)
-
+ queue.deliveryFailed(h4, retriable=1)
+ # At 30 seconds, message 2 fires.
+ h5 = queue.getAllMessages()[0]
+ queue.sendReadyMessages(now+30)
+ msgs = queue._msgs
+ self.assertEquals([(h5, "Message 2", 2)], msgs)
+ self.assertNotEquals(h5, h4)
+ queue.deliveryFailed(h5, retriable=1)
+ # At 45 sec, it fires one last time. It will have gotten up to #4
+ # already.
+ h6 = queue.getAllMessages()[0]
+ queue.sendReadyMessages(now+45)
+ msgs = queue._msgs
+ self.assertEquals([(h6, "Message 2", 4)], msgs)
+ self.assertNotEquals(h6, h5)
+ queue.deliveryFailed(h6, retriable=1)
+ # Now Message 2 is timed out.
+ self.assertEquals([], queue.getAllMessages())
+
queue.removeAll()
queue.cleanQueue()
@@ -2570,12 +2685,16 @@
messagesIn = []
def receivedHook(pkt,m=messagesIn):
m.append(pkt)
+ server.nJunkPackets = 0
+ def junkCallback(server=server): server.nJunkPackets += 1
def conFactory(sock, context=_getTLSContext(1),
- receiveMessage=receivedHook):
+ receiveMessage=receivedHook,junkCallback=junkCallback):
tls = context.sock(sock, serverMode=1)
sock.setblocking(0)
- return mixminion.server.MMTPServer.MMTPServerConnection(sock,tls,
- receiveMessage)
+ con = mixminion.server.MMTPServer.MMTPServerConnection(sock,tls,
+ receiveMessage)
+ con.junkCallback = junkCallback
+ return con
listener = mixminion.server.MMTPServer.ListenConnection("127.0.0.1",
TEST_PORT, 5, conFactory)
listener.register(server)
@@ -2618,10 +2737,12 @@
messages = ["helloxxx"*4096, "helloyyy"*4096]
+ # Send m1, then junk, then renegotiate, then m2.
server.process(0.1)
t = threading.Thread(None,
mixminion.MMTPClient.sendMessages,
- args=("127.0.0.1", TEST_PORT, keyid, messages))
+ args=("127.0.0.1", TEST_PORT, keyid,
+ [messages[0],"JUNK","RENEGOTIATE",messages[1]]))
t.start()
while len(messagesIn) < 2:
server.process(0.1)
@@ -2631,6 +2752,7 @@
server.process(0.1)
self.failUnless(messagesIn == messages)
+ self.assertEquals(1, server.nJunkPackets)
# Now, with bad keyid.
t = threading.Thread(None,
@@ -2648,19 +2770,19 @@
self.listener = listener
self.server = server
+ # Send m1, then junk, then renegotiate, then junk, then m2.
tlscon = mixminion.server.MMTPServer.SimpleTLSConnection
messages = ["helloxxx"*4096, "helloyyy"*4096]
async = mixminion.server.MMTPServer.AsyncServer()
clientcon = mixminion.server.MMTPServer.MMTPClientConnection(
_getTLSContext(0), "127.0.0.1", TEST_PORT, keyid,
- messages[:]+["JUNK","RENEGOTIATE","JUNK"],
+ [messages[0],"JUNK","RENEGOTIATE","JUNK",messages[1]],
[None, None], None)
clientcon.register(async)
def clientThread(clientcon=clientcon, async=async):
while not clientcon.isShutdown():
async.process(2)
-
server.process(0.1)
startTime = time.time()
t = threading.Thread(None, clientThread)
@@ -2682,7 +2804,8 @@
self.failUnless(c is not None)
self.failUnless(len(c) == 1)
self.failUnless(startTime <= c[0].lastActivity <= endTime)
-
+ self.assertEquals(2, server.nJunkPackets)
+
# Again, with bad keyid.
clientcon = mixminion.server.MMTPServer.MMTPClientConnection(
_getTLSContext(0), "127.0.0.1", TEST_PORT, "Z"*20,
@@ -2703,7 +2826,7 @@
t.join()
finally:
resumeLog() #unsuppress warning
-
+
def _testTimeout(self):
server, listener, messagesIn, keyid = _getMMTPServer()
self.listener = listener
@@ -2754,7 +2877,7 @@
finally:
logMessage = resumeLog()
# Did we log the timeout?
- self.assert_(stringContains(logMessage, "timed out"))#XXXX
+ self.assert_(stringContains(logMessage, "timed out"))
# Was the one message we expected in fact transmitted?
self.assertEquals([messagesIn[0]], ["helloxxx"*4096])
@@ -2940,6 +3063,15 @@
self.failUnless(floatEq(C._parseInterval("1.5 minutes")[2],
90))
self.assertEquals(C._parseInterval("2 houRS"), (2,"hour",7200))
+ # IntervalList
+ self.assertEquals(C._parseIntervalList(" 5 sec, 1 min, 2 hours"),
+ [ 5, 60, 7200 ])#XXXX mode
+ self.assertEquals([5,5,5,5,5,5, 8*3600,8*3600,8*3600,8*3600,],
+ C._parseIntervalList("5 sec for 30 sec, 8 hours for 1.3 days"))
+ self.assertEquals([60], C._parseIntervalList("1 min for 1 min"))
+ self.assertEquals([60,60], C._parseIntervalList("1 min for 1.5 min"))
+ self.assertEquals([60,60],
+ C._parseIntervalList("EVERY 1 min for 1.5 min"))
# int
self.assertEquals(C._parseInt("99"), 99)
# IP
@@ -3011,6 +3143,10 @@
fails(C._parseInterval, "seconds")
fails(C._parseInterval, "15")
fails(C._parseInterval, " 10 intervals")
+ fails(C._parseIntervalList, "1 min for 1 min for 1 min")
+ fails(C._parseIntervalList, "1 min for 2 fnords")
+ fails(C._parseIntervalList, "0 min for 2 hours")
+ fails(C._parseIntervalList, "every 30 minutes")
fails(C._parseInt, "9.9")
fails(C._parseInt, "9abc")
fails(C._parseIP, "256.0.0.1")
@@ -3138,9 +3274,9 @@
eq(info['Incoming/MMTP']['Version'], "0.1")
eq(info['Incoming/MMTP']['Port'], 48099)
- eq(info['Incoming/MMTP']['Protocols'], "0.1")
+ eq(info['Incoming/MMTP']['Protocols'], "0.1,0.2")
eq(info['Outgoing/MMTP']['Version'], "0.1")
- eq(info['Outgoing/MMTP']['Protocols'], "0.1")
+ eq(info['Outgoing/MMTP']['Protocols'], "0.1,0.2")
eq(info['Incoming/MMTP']['Allow'], [("192.168.0.16", "255.255.255.255",
1,1024),
("0.0.0.0", "0.0.0.0",
@@ -3517,12 +3653,12 @@
# Try sending a few messages to the module.
t = "ZZZZ"*5
- manager.queueMessage2(FDP('plain',1234,'fail!',"Hello 1", t))
- manager.queueMessage2(FDP('plain',1234,'fail?',"Hello 2", t))
- manager.queueMessage2(FDP('plain',1234,'good',"Hello 3", t))
- manager.queueMessage2(FDP('plain',
- mixminion.Packet.DROP_TYPE, "",
- "Drop very much", t))
+ manager.queueDecodedMessage(FDP('plain',1234,'fail!',"Hello 1", t))
+ manager.queueDecodedMessage(FDP('plain',1234,'fail?',"Hello 2", t))
+ manager.queueDecodedMessage(FDP('plain',1234,'good',"Hello 3", t))
+ manager.queueDecodedMessage(FDP('plain',
+ mixminion.Packet.DROP_TYPE, "",
+ "Drop very much", t))
queue = manager.queues['TestModule']
# Did the test module's delivery queue get the messages?
self.failUnless(isinstance(queue,
@@ -3803,6 +3939,7 @@
module = mixminion.server.Modules.MixmasterSMTPModule()
module.configure({"Delivery/SMTP-Via-Mixmaster" :
{"Enabled":1, "Server": "nonesuch",
+ "Retry": [0,0,0,0],
"SubjectLine":'foobar',
'MixCommand' : ('ls', ['-z'])}},
manager)
@@ -3810,8 +3947,8 @@
replaceFunction(os, "spawnl")
try:
# Send a message...
- queue.queueDeliveryMessage(None,FDP('plain', SMTP_TYPE, "foo@bar",
- "This is the message"))
+ queue.queueDeliveryMessage(FDP('plain', SMTP_TYPE, "foo@bar",
+ "This is the message"))
queue.sendReadyMessages()
# And make sure that Mixmaster was invoked correctly.
calls = getReplacedFunctionCallLog()
@@ -3860,7 +3997,6 @@
queue = manager.queues["SMTP"]
queueMessage = queue.queueDeliveryMessage
-
# Make sure blacklist got read.
self.assert_(module.blacklist.contains("nobody@wangafu.net"))
@@ -3873,8 +4009,7 @@
"Free to hide no more.")
# Try queueing a valild message and sending it.
- queueMessage(None,
- FDP('plain', SMTP_TYPE, "users@everywhere", haiku))
+ queueMessage(FDP('plain', SMTP_TYPE, "users@everywhere", haiku))
self.assertEquals(getReplacedFunctionCallLog(), [])
queue.sendReadyMessages()
# Was sendSMTPMessage invoked correctly?
@@ -3909,9 +4044,9 @@
# an invalid address, and one with a blocked address.
try:
suspendLog()
- queueMessage(None,FDP('plain',SMTP_TYPE, "not.an.addr", haiku))
- queueMessage(None,FDP('plain',SMTP_TYPE,
- "blocked@wangafu.net",haiku))
+ queueMessage(FDP('plain',SMTP_TYPE, "not.an.addr", haiku))
+ queueMessage(FDP('plain',SMTP_TYPE,
+ "blocked@wangafu.net",haiku))
queue.sendReadyMessages()
finally:
s = resumeLog()
@@ -3940,6 +4075,7 @@
"AddressFile": addrfile,
"ReturnAddress": "returnaddress@x",
"RemoveContact": "removeaddress@x",
+ "Retry": [0,0,0,0],
"SMTPServer" : "foo.bar.baz"}}, manager)
# Check that the address file was read correctly.
self.assertEquals({'mix-minion': 'mixminion@thishost',
@@ -3952,8 +4088,7 @@
lambda *args: mixminion.server.Modules.DELIVER_OK)
try:
# Try queueing a message...
- queue.queueDeliveryMessage(None,
- FDP('enc', MBOX_TYPE, 'mixdiddy',
+ queue.queueDeliveryMessage(FDP('enc', MBOX_TYPE, 'mixdiddy',
hexread("EFFACEAB1EFACADE")*20, "x"*20))
self.assertEquals(getReplacedFunctionCallLog(), [])
# ...and sending it.
@@ -3961,9 +4096,9 @@
try:
# Also, try sending a message to an unknown address
suspendLog()
- queue.queueDeliveryMessage(None,
- FDP('env', MBOX_TYPE, 'mixmuffin',
- hexread("EFFACEAB1EFACADE")*20,
+ queue.queueDeliveryMessage(
+ FDP('env', MBOX_TYPE, 'mixmuffin',
+ hexread("EFFACEAB1EFACADE")*20,
'x'*20))
queue.sendReadyMessages()
finally:
@@ -4003,17 +4138,17 @@
# Try sending a couple of messages.
queue = manager.queues['Testing_DirectoryDump']
p1 = FDP('plain',0xFFFE, "addr1","this is the message","t"*20)
- queue.queueDeliveryMessage(None, p1)
+ queue.queueDeliveryMessage(p1)
self.assert_(os.path.exists(os.path.join(dir, "0")))
p2 = FDP('plain',0xFFFE, "addr2", "This is message 2", "x"*20)
- queue.queueDeliveryMessage(None, p2)
+ queue.queueDeliveryMessage(p2)
self.assert_(os.path.exists(os.path.join(dir, "1")))
self.assertEquals(eme(p2),
readFile(os.path.join(dir, "1")))
# test failure.
try:
suspendLog()
- queue.queueDeliveryMessage(None,
+ queue.queueDeliveryMessage(
FDP('plain', 0xFFFE, "FAIL!",
"This is message X which won't be delivered", "x"*20))
self.assert_(not os.path.exists(os.path.join(dir, "2")))
@@ -4023,7 +4158,7 @@
try:
suspendLog()
- queue.queueDeliveryMessage(None,
+ queue.queueDeliveryMessage(
FDP('plain', 0xFFFE, "fail",
"This is message X which won't be delivered", "z"*20))
self.assert_(not os.path.exists(os.path.join(dir, "2")))
@@ -4043,13 +4178,13 @@
self.assertEquals(module.next, 91)
self.assertEquals(len(os.listdir(dir)), 3)
queue = manager.queues['Testing_DirectoryDump']
- queue.queueDeliveryMessage(None,
+ queue.queueDeliveryMessage(
FDP('plain',0xFFFE, "addr91", "This is message 91"))
- queue.queueDeliveryMessage(None,
+ queue.queueDeliveryMessage(
FDP('plain',0xFFFE, "addr92", "This is message 92"))
- queue.queueDeliveryMessage(None,
+ queue.queueDeliveryMessage(
FDP('plain',0xFFFE, "fail", "This is message 93"))
- queue.queueDeliveryMessage(None,
+ queue.queueDeliveryMessage(
FDP('plain',0xFFFE, "FAIL!", "This is message 94"))
# All 4 messages go into the queue...
@@ -4234,7 +4369,7 @@
Mode: relay
Homedir: %(homedir)s
EncryptIdentityKey: No
-PublicKeyLifetime: %(lifetime)s days
+PublicKeyLifetime: %(lifetime)s
IdentityKeyBits: 2048
EncryptPrivateKey: no
Nickname: %(nickname)s
@@ -4968,7 +5103,7 @@
tc = loader.loadTestsFromTestCase
if 0:
- suite.addTest(tc(MMTPTests))
+ suite.addTest(tc(BuildMessageTests))
return suite
suite.addTest(tc(MiscTests))