[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] More work on multithreaded server, other goodies. It seems more stable now.
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv10789/lib/mixminion
Modified Files:
ClientMain.py Common.py Main.py benchmark.py test.py
testSupport.py
Log Message:
More work on multithreaded server, other goodies. It seems more stable now.
setup.py:
- Remove old mixminion/server/Queue.py where found
ClientMain, ServerMain:
- Don't use stderr if it isn't an error.
Common:
- Call 'Queue' 'MessageQueue' to avoid potential confusion.
Main:
- Add a disclaimer to the main usage message
Modules, PacketHandler, ServerMain:
- Refactor how we pass packets around so as to move message decoding into the
processing thread: Objects are beautiful; tuples are kludgey.
ServerMain:
- Document more of the multithreaded stuff
- Refactor
- Make locking more sane
- Handle queues with obsolete tuple-based messages.
- Remove old comment.
- Reset logs on sighup
- Check for thread liveness (this may be a bad idea)
ServerQueue:
- Simplify clean logic
- Make locking more sane.
- Be more robust on erroneous queue state.
test, benchmark:
- Update tests to use new interfaces
Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.40
retrieving revision 1.41
diff -u -d -r1.40 -r1.41
--- ClientMain.py 9 Jan 2003 06:56:34 -0000 1.40
+++ ClientMain.py 10 Jan 2003 20:12:04 -0000 1.41
@@ -94,9 +94,7 @@
try:
LOG.warn("Removing obsolete server directory %s", sdir)
os.rmdir(sdir)
- print >>sys.stderr, "OK"
except OSError, e:
- print >>sys.stderr, "BAD"
LOG.warn("Failed: %s", e)
def updateDirectory(self, forceDownload=0, now=None):
@@ -1000,8 +998,7 @@
# options will change between now and 1.0.0
def runClient(cmd, args):
if cmd.endswith(" client"):
- print >>sys.stderr, \
- "The 'client' command is deprecated. Use 'send' instead."
+ print "The 'client' command is deprecated. Use 'send' instead."
options, args = getopt.getopt(args, "hvf:i:t:H:P:D:",
["help", "verbose", "config=", "input=",
@@ -1077,7 +1074,7 @@
keystore.updateDirectory(forceDownload=download)
if address is None:
- print >>sys.stderr, "No recipients specified; exiting."
+ print "No recipients specified; exiting."
sys.exit(0)
try:
@@ -1094,7 +1091,7 @@
# XXXX Clean up this ugly control structure.
if inFile is None and address.getRouting()[0] == DROP_TYPE:
payload = ""
- print >>sys.stderr, "Sending dummy message"
+ LOG.info("Sending dummy message")
else:
if address.getRouting()[0] == DROP_TYPE:
LOG.warn("Sending a payload with a dummy message makes no sense")
@@ -1104,8 +1101,7 @@
if inFile == '-':
f = sys.stdin
- print >>sys.stderr, \
- "Enter your message now. Type Ctrl-D when you are done."
+ print "Enter your message now. Type Ctrl-D when you are done."
else:
f = open(inFile, 'r')
@@ -1113,7 +1109,7 @@
payload = f.read()
f.close()
except KeyboardInterrupt:
- print >>sys.stderr, "Interrupted. Message not sent."
+ print "Interrupted. Message not sent."
sys.exit(1)
client.sendForwardMessage(address, payload, path1, path2)
Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.51
retrieving revision 1.52
diff -u -d -r1.51 -r1.52
--- Common.py 9 Jan 2003 17:42:11 -0000 1.51
+++ Common.py 10 Jan 2003 20:12:05 -0000 1.52
@@ -28,7 +28,9 @@
import traceback
# Imported here so we can get it in mixminion.server without being shadowed
# by the old Queue.py file.
-import Queue
+from Queue import Queue
+MessageQueue = Queue
+del Queue
from types import StringType
Index: Main.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Main.py,v
retrieving revision 1.24
retrieving revision 1.25
diff -u -d -r1.24 -r1.25
--- Main.py 9 Jan 2003 06:50:49 -0000 1.24
+++ Main.py 10 Jan 2003 20:12:05 -0000 1.25
@@ -153,6 +153,13 @@
print ("Copyright 2002-2003 Nick Mathewson. "+
"See LICENSE for licensing information.")
+def printUsage():
+ import mixminion
+ print "Mixminion version %s" % mixminion.__version__
+ print _USAGE
+ print "NOTE: This software is for testing only. The user set is too small"
+ print " to be anonymous, and the code is too alpha to be reliable."
+
def main(args):
"Use <args> to fix path, pick a command and pass it arguments."
# Specifically, args[0] is used to fix sys.path so we can import
@@ -164,7 +171,7 @@
# Check whether we have a recognized command.
if len(args) == 1 or not _COMMANDS.has_key(args[1]):
printVersion(args[0],args[1:])
- print _USAGE
+ printUsage()
sys.exit(1)
# Read the module and function.
Index: benchmark.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/benchmark.py,v
retrieving revision 1.27
retrieving revision 1.28
diff -u -d -r1.27 -r1.28
--- benchmark.py 9 Jan 2003 06:28:58 -0000 1.27
+++ benchmark.py 10 Jan 2003 20:12:05 -0000 1.28
@@ -20,7 +20,8 @@
import mixminion._minionlib as _ml
from mixminion.BuildMessage import _buildHeader, buildForwardMessage, \
- compressData, uncompressData
+ compressData, uncompressData, _encodePayload, decodePayload, \
+ CompressedDataTooLong
from mixminion.Common import secureDelete, installSIGCHLDHandler, \
waitForChildren, formatBase64
from mixminion.Crypto import *
@@ -484,6 +485,31 @@
print "Server process (swap, no log)", timeit(
lambda sp=sp, m_swap=m_swap: sp.processMessage(m_swap), 100)
+def encodingTiming():
+ print "#=============== END-TO-END ENCODING =================="
+ shortP = "hello world"
+ prng = AESCounterPRNG()
+ p = _encodePayload(shortP, 0, prng)
+ t = prng.getBytes(20)
+ print "Decode short payload", timeit(
+ lambda p=p,t=t: decodePayload(p, t), 1000)
+
+ k20 = prng.getBytes(20*1024)
+ p = _encodePayload(k20, 0, prng)
+ t = prng.getBytes(20)
+ print "Decode 20K payload", timeit(
+ lambda p=p,t=t: decodePayload(p, t), 1000)
+
+ comp = "x"*(20*1024)
+ p = _encodePayload(comp, 0, prng)
+ t = prng.getBytes(20)
+ def decode(p=p,t=t):
+ try:
+ decodePayload(p,t)
+ except CompressedDataTooLong:
+ pass
+ print "Decode overcompressed payload", timeit(decode, 1000)
+
#----------------------------------------------------------------------
def timeEfficiency():
print "#================= ACTUAL v. IDEAL ====================="
@@ -673,6 +699,7 @@
buildMessageTiming()
directoryTiming()
fileOpsTiming()
+ encodingTiming()
serverProcessTiming()
hashlogTiming()
timeEfficiency()
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.70
retrieving revision 1.71
diff -u -d -r1.70 -r1.71
--- test.py 9 Jan 2003 06:28:58 -0000 1.70
+++ test.py 10 Jan 2003 20:12:05 -0000 1.71
@@ -1918,23 +1918,20 @@
payload: beginning of expected final payload."""
for sp, rt, ri in zip(sps,routingtypes,routinginfo):
res = sp.processMessage(m)
- self.assertEquals(len(res), 2)
+ self.assert_(isinstance(res, DeliveryPacket) or
+ isinstance(res, RelayedPacket))
if rt in (FWD_TYPE, SWAP_FWD_TYPE):
- self.assertEquals(res[0], "QUEUE")
- self.assertEquals(res[1][0].pack(), ri)
- self.assertEquals(FWD_TYPE, rt)
- m = res[1][1]
+ self.assert_(not res.isDelivery())
+ self.assertEquals(res.getAddress().pack(), ri)
+ m = res.getPacket()
else:
- self.assertEquals(res[0], "EXIT")
- self.assertEquals(res[1][0], rt)
- self.assertEquals(res[1][1], ri)
+ self.assert_(res.isDelivery())
+ self.assertEquals(res.getExitType(), rt)
+ self.assertEquals(res.getAddress(), ri)
if appkey:
- self.assertEquals(appkey, res[1][2])
+ self.assertEquals(res.getApplicationKey(), appkey)
- #tag = res[1][3]
- p = res[1][4]
- p = BuildMessage._decodeForwardPayload(p)
- self.assert_(p.startswith(payload))
+ self.assert_(res.getContents().startswith(payload))
break
def test_successful(self):
@@ -2014,9 +2011,9 @@
reply,s,tag = brbi([self.server3], SMTP_TYPE, "fred@invalid")
m = brm("Y", [self.server2], reply)
m2 = brm("Y", [self.server1], reply)
- q, (a,m) = self.sp2.processMessage(m)
+ m = self.sp2.processMessage(m).getPacket()
self.sp3.processMessage(m)
- q, (a,m2) = self.sp1.processMessage(m2)
+ m2 = self.sp1.processMessage(m2).getPacket()
self.failUnlessRaises(ContentError, self.sp3.processMessage, m2)
# Even duplicate secrets need to go.
@@ -2026,14 +2023,14 @@
reply2,s,t = brbi([self.server2], MBOX_TYPE, "foo",0,prng)
m = brm("Y", [self.server3], reply1)
m2 = brm("Y", [self.server3], reply2)
- q, (a,m) = self.sp3.processMessage(m)
+ m = self.sp3.processMessage(m).getPacket()
self.sp1.processMessage(m)
- q, (a,m2) = self.sp3.processMessage(m2)
+ m2 = self.sp3.processMessage(m2).getPacket()
self.failUnlessRaises(ContentError, self.sp2.processMessage, m2)
# Drop gets dropped.
m = bfm("Z", DROP_TYPE, "", [self.server2], [self.server2])
- q, (a,m) = self.sp2.processMessage(m)
+ m = self.sp2.processMessage(m).getPacket()
res = self.sp2.processMessage(m)
self.assertEquals(res,None)
@@ -2103,8 +2100,8 @@
[self.server3])
m_x = m[:-30] + " "*30
assert len(m_x) == len(m)
- q, (a, m_x) = self.sp1.processMessage(m_x)
- q, (a, m_x) = self.sp2.processMessage(m_x)
+ m_x = self.sp1.processMessage(m_x).getPacket()
+ m_x = self.sp2.processMessage(m_x).getPacket()
self.failUnlessRaises(CryptoError, self.sp3.processMessage, m_x)
@@ -3479,9 +3476,10 @@
return None
def getExitTypes(self):
return (1234,)
- def processMessage(self, message, tag, exitType, exitInfo):
- self.processedMessages.append(message)
- self.processedAll.append( (message, tag, exitType, exitInfo) )
+ def processMessage(self, packet):
+ self.processedMessages.append(packet.getContents())
+ self.processedAll.append( packet )
+ exitInfo = packet.getAddress()
if exitInfo == 'fail?':
return mixminion.server.Modules.DELIVER_FAIL_RETRY
elif exitInfo == 'fail!':
@@ -3492,6 +3490,7 @@
class ModuleManagerTests(unittest.TestCase):
def testModuleManager(self):
+ FDP = FakeDeliveryPacket
mod_dir = mix_mktemp()
home_dir = mix_mktemp()
@@ -3529,11 +3528,12 @@
# Try sending a few messages to the module.
t = "ZZZZ"*5
- manager.queueMessage("Hello 1", t, 1234, "fail!")
- manager.queueMessage("Hello 2", t, 1234, "fail?")
- manager.queueMessage("Hello 3", t, 1234, "good")
- manager.queueMessage("Drop very much", None,
- mixminion.Packet.DROP_TYPE, t)
+ manager.queueMessage2(FDP('plain',1234,'fail!',"Hello 1", t))
+ manager.queueMessage2(FDP('plain',1234,'fail?',"Hello 2", t))
+ manager.queueMessage2(FDP('plain',1234,'good',"Hello 3", t))
+ manager.queueMessage2(FDP('plain',
+ mixminion.Packet.DROP_TYPE, "",
+ "Drop very much", t))
queue = manager.queues['TestModule']
# Did the test module's delivery queue get the messages?
self.failUnless(isinstance(queue,
@@ -3557,54 +3557,54 @@
self.assertEquals(4, len(exampleMod.processedMessages))
self.assertEquals("Hello 2", exampleMod.processedMessages[-1])
- # But, none of them was decodeable: all of them should have been
- # tagged as 'err'
- self.assertEquals('err', exampleMod.processedAll[0][1])
+ self.assert_(exampleMod.processedAll[0].isPlaintext())
- # Try a real message, to make sure that we really decode stuff properly
- msg = mixminion.BuildMessage._encodePayload(
- "A man disguised as an ostrich, actually.",
- 0, Crypto.getCommonPRNG())
- manager.queueMessage(msg, "A"*20, 1234, "Hello")
- exampleMod.processedAll = []
- manager.sendReadyMessages()
- # The retriable message got sent again; the other one, we care about.
- pos = None
- for i in xrange(len(exampleMod.processedAll)):
- if not exampleMod.processedAll[i][0].startswith('Hello'):
- pos = i
- self.assert_(pos is not None)
- self.assertEquals(exampleMod.processedAll[i],
- ("A man disguised as an ostrich, actually.",
- None, 1234, "Hello" ))
+#### All these tests belong as tests of DeliveryPacket
- # Now a non-decodeable message
- manager.queueMessage("XYZZYZZY"*3584, "Z"*20, 1234, "Buenas noches")
- exampleMod.processedAll = []
- manager.sendReadyMessages()
- pos = None
- for i in xrange(len(exampleMod.processedAll)):
- if not exampleMod.processedAll[i][0].startswith('Hello'):
- pos = i
- self.assert_(pos is not None)
- self.assertEquals(exampleMod.processedAll[i],
- ("XYZZYZZY"*3584, "Z"*20, 1234, "Buenas noches"))
+## # Try a real message, to make sure that we really decode stuff properly
+## msg = mixminion.BuildMessage._encodePayload(
+## "A man disguised as an ostrich, actually.",
+## 0, Crypto.getCommonPRNG())
+## manager.queueMessage(msg, "A"*20, 1234, "Hello")
+## exampleMod.processedAll = []
+## manager.sendReadyMessages()
+## # The retriable message got sent again; the other one, we care about.
+## pos = None
+## for i in xrange(len(exampleMod.processedAll)):
+## if not exampleMod.processedAll[i][0].startswith('Hello'):
+## pos = i
+## self.assert_(pos is not None)
+## self.assertEquals(exampleMod.processedAll[i],
+## ("A man disguised as an ostrich, actually.",
+## None, 1234, "Hello" ))
- # Now a message that compressed too much.
- # (first, erase the pending message.)
- manager.queues[exampleMod.getName()].removeAll()
- manager.queues[exampleMod.getName()]._rescan()
+## # Now a non-decodeable message
+## manager.queueMessage("XYZZYZZY"*3584, "Z"*20, 1234, "Buenas noches")
+## exampleMod.processedAll = []
+## manager.sendReadyMessages()
+## pos = None
+## for i in xrange(len(exampleMod.processedAll)):
+## if not exampleMod.processedAll[i][0].startswith('Hello'):
+## pos = i
+## self.assert_(pos is not None)
+## self.assertEquals(exampleMod.processedAll[i],
+## ("XYZZYZZY"*3584, "Z"*20, 1234, "Buenas noches"))
- p = "For whom is the funhouse fun?"*8192
- msg = mixminion.BuildMessage._encodePayload(
- p, 0, Crypto.getCommonPRNG())
- manager.queueMessage(msg, "Z"*20, 1234, "Buenas noches")
- exampleMod.processedAll = []
- self.assertEquals(len(exampleMod.processedAll), 0)
- manager.sendReadyMessages()
- self.assertEquals(len(exampleMod.processedAll), 1)
- self.assertEquals(exampleMod.processedAll[0],
- (BuildMessage.compressData(p), 'long', 1234, "Buenas noches"))
+## # Now a message that compressed too much.
+## # (first, erase the pending message.)
+## manager.queues[exampleMod.getName()].removeAll()
+## manager.queues[exampleMod.getName()]._rescan()
+
+## p = "For whom is the funhouse fun?"*8192
+## msg = mixminion.BuildMessage._encodePayload(
+## p, 0, Crypto.getCommonPRNG())
+## manager.queueMessage(msg, "Z"*20, 1234, "Buenas noches")
+## exampleMod.processedAll = []
+## self.assertEquals(len(exampleMod.processedAll), 0)
+## manager.sendReadyMessages()
+## self.assertEquals(len(exampleMod.processedAll), 1)
+## self.assertEquals(exampleMod.processedAll[0],
+## (BuildMessage.compressData(p), 'long', 1234, "Buenas noches"))
# Check serverinfo generation.
try:
@@ -3649,45 +3649,26 @@
def testDecoding(self):
'test decoding and test encapsulation.'
- em = mixminion.server.Modules._escapeMessage
eme = mixminion.server.Modules._escapeMessageForEmail
message = "Somebody set up us the module!\n\n(What you say?)\n"
binmessage = hexread("00ADD1EDC0FFEED00DAD")*40
tag = ".!..!....!........!."
- #####
- # Test escapeMessage
-
- # plaintext text message, text mode.
- self.assertEquals(em(message, None, 1), ("TXT", message, None))
- # plaintext text message, bin mode.
- self.assertEquals(em(message, None, 0), ("TXT", message, None))
- # plaintext bin message, text mode.
- self.assertEquals(em(binmessage, None, 1),
- ("BIN", base64.encodestring(binmessage), None))
- # plaintext bin message, bin mode.
- self.assertEquals(em(binmessage, None, 0), ("BIN", binmessage, None))
-
- encoded = "baobob "*1024*4
- # "Encoded" message, text mode
- self.assertEquals(em(encoded, tag, 1),
- ("ENC", base64.encodestring(encoded),
- base64.encodestring(tag)[:-1]))
- # "Encoded" message, binary mode
- self.assertEquals(em(encoded, tag, 0),
- ("ENC", encoded, tag))
+ def FDPFast(type,message,tag="xyzzyxyzzzyxyzzyxyzzzy"):
+ return FakeDeliveryPacket(type,0xFFFE,"addr",message,tag)
####
# Tests escapeMessageForEmail
- self.assert_(stringContains(eme(message, None), message))
+ self.assert_(stringContains(eme(FDPFast('plain',message)), message))
expect = "BEGINS ========\n"+\
base64.encodestring(binmessage)+"====="
- self.assert_(stringContains(eme(binmessage, None), expect))
+ self.assert_(stringContains(eme(FDPFast('plain',binmessage)), expect))
expect = "BEGINS ========\nDecoding handle: "+\
base64.encodestring(tag)+\
- base64.encodestring(encoded)+"====="
- self.assert_(stringContains(eme(encoded, tag), expect))
+ base64.encodestring(binmessage)+"====="
+ self.assert_(stringContains(eme(FDPFast('enc',binmessage,tag)),
+ expect))
# Sample address file for testing MBOX
MBOX_ADDRESS_SAMPLE = """\
@@ -3736,6 +3717,18 @@
"""
+class FakeDeliveryPacket(mixminion.server.PacketHandler.DeliveryPacket):
+ "DOCDOC"
+ def __init__(self, type, exitType, exitAddress, contents, tag=None):
+ if tag is None:
+ tag = "-="*10
+ mixminion.server.PacketHandler.DeliveryPacket.__init__(self,
+ exitType, exitAddress, None, tag, None)
+ self.type = type
+ self.payload = None
+ self.contents = contents
+
+
class ModuleTests(unittest.TestCase):
def testEmailAddressSet(self):
EmailAddressSet = mixminion.server.Modules.EmailAddressSet
@@ -3815,6 +3808,7 @@
os.spawnl with a stub function so that we don't actually send
anything."""
manager = self.getManager()
+ FDP = FakeDeliveryPacket
# Configure the module.
module = mixminion.server.Modules.MixmasterSMTPModule()
@@ -3827,8 +3821,8 @@
replaceFunction(os, "spawnl")
try:
# Send a message...
- queue.queueDeliveryMessage((SMTP_TYPE, "foo@bar", None),
- "This is the message")
+ queue.queueDeliveryMessage(None,FDP('plain', SMTP_TYPE, "foo@bar",
+ "This is the message"))
queue.sendReadyMessages()
# And make sure that Mixmaster was invoked correctly.
calls = getReplacedFunctionCallLog()
@@ -3859,6 +3853,8 @@
def testDirectSMTP(self):
"""Check out the SMTP module. (We temporarily relace sendSMTPMessage
with a stub function so that we don't actually send anything.)"""
+ FDP = FakeDeliveryPacket
+
blacklistFile = mix_mktemp()
writeFile(blacklistFile, "Deny onehost wangafu.net\nDeny user fred\n")
@@ -3888,7 +3884,8 @@
"Free to hide no more.")
# Try queueing a valild message and sending it.
- queueMessage((SMTP_TYPE, "users@everywhere", None), haiku)
+ queueMessage(None,
+ FDP('plain', SMTP_TYPE, "users@everywhere", haiku))
self.assertEquals(getReplacedFunctionCallLog(), [])
queue.sendReadyMessages()
# Was sendSMTPMessage invoked correctly?
@@ -3923,8 +3920,9 @@
# an invalid address, and one with a blocked address.
try:
suspendLog()
- queueMessage((SMTP_TYPE, "not.an.addr", None), haiku)
- queueMessage((SMTP_TYPE, "blocked@wangafu.net", None), haiku)
+ queueMessage(None,FDP('plain',SMTP_TYPE, "not.an.addr", haiku))
+ queueMessage(None,FDP('plain',SMTP_TYPE,
+ "blocked@wangafu.net",haiku))
queue.sendReadyMessages()
finally:
s = resumeLog()
@@ -3940,6 +3938,7 @@
def testMBOX(self):
"""Check out the MBOX module. (We temporarily relace sendSMTPMessage
with a stub function so that we don't actually send anything.)"""
+ FDP = FakeDeliveryPacket
# Configure the module
manager = self.getManager()
module = mixminion.server.Modules.MBoxModule()
@@ -3964,21 +3963,24 @@
lambda *args: mixminion.server.Modules.DELIVER_OK)
try:
# Try queueing a message...
- queue.queueDeliveryMessage((MBOX_TYPE, 'mixdiddy', "x"*20),
- hexread("EFFACEAB1EFACADE")*20)
+ queue.queueDeliveryMessage(None,
+ FDP('enc', MBOX_TYPE, 'mixdiddy',
+ hexread("EFFACEAB1EFACADE")*20, "x"*20))
self.assertEquals(getReplacedFunctionCallLog(), [])
# ...and sending it.
queue.sendReadyMessages()
try:
# Also, try sending a message to an unknown address
suspendLog()
- queue.queueDeliveryMessage((MBOX_TYPE, 'mixmuffin', "x"*20),
- hexread("EFFACEAB1EFACADE")*20)
+ queue.queueDeliveryMessage(None,
+ FDP('env', MBOX_TYPE, 'mixmuffin',
+ hexread("EFFACEAB1EFACADE")*20,
+ 'x'*20))
queue.sendReadyMessages()
finally:
m = resumeLog()
- self.assert_(stringContains(m,"Unknown MBOX user 'mixmuffin'"))
- self.assert_(stringContains(m,"Unable to deliver message"))
+ self.assert_(stringContains(m,"Unknown MBOX user 'mixmuffin'"))
+ self.assert_(stringContains(m,"Unable to deliver message"))
# Check that sendSMTPMessage was called correctly.
self.assertEquals(1, len(getReplacedFunctionCallLog()))
@@ -3999,6 +4001,7 @@
def testDirectoryDump(self):
"""Check out the DirectoryStoreModule that we use for testing on
machines with unreliable/nonexistant SMTP."""
+ FDP = FakeDeliveryPacket
eme = mixminion.server.Modules._escapeMessageForEmail
dir = mix_mktemp()
manager = self.getManager()
@@ -4010,19 +4013,20 @@
{'Location': dir, 'UseQueue' : 0}}, manager)
# Try sending a couple of messages.
queue = manager.queues['Testing_DirectoryDump']
- queue.queueDeliveryMessage((0xFFFE, "addr1", "t"*20),
- "This is the message")
+ p1 = FDP('plain',0xFFFE, "addr1","this is the message","t"*20)
+ queue.queueDeliveryMessage(None, p1)
self.assert_(os.path.exists(os.path.join(dir, "0")))
- queue.queueDeliveryMessage((0xFFFE, "addr2", "x"*20),
- "This is message 2")
+ p2 = FDP('plain',0xFFFE, "addr2", "This is message 2", "x"*20)
+ queue.queueDeliveryMessage(None, p2)
self.assert_(os.path.exists(os.path.join(dir, "1")))
- self.assertEquals(eme("This is message 2", "x"*20),
+ self.assertEquals(eme(p2),
readFile(os.path.join(dir, "1")))
# test failure.
try:
suspendLog()
- queue.queueDeliveryMessage((0xFFFE, "FAIL!", "y"*20),
- "This is message X which won't be delivered")
+ queue.queueDeliveryMessage(None,
+ FDP('plain', 0xFFFE, "FAIL!",
+ "This is message X which won't be delivered", "x"*20))
self.assert_(not os.path.exists(os.path.join(dir, "2")))
finally:
m = resumeLog()
@@ -4030,12 +4034,13 @@
try:
suspendLog()
- queue.queueDeliveryMessage((0xFFFE, "fail", "z"*20),
- "This is message X which won't be delivered")
+ queue.queueDeliveryMessage(None,
+ FDP('plain', 0xFFFE, "fail",
+ "This is message X which won't be delivered", "z"*20))
self.assert_(not os.path.exists(os.path.join(dir, "2")))
finally:
m = resumeLog()
- self.assert_(m.endswith("Unable to retry delivery for message\n"))
+ self.assert_(m.endswith("Unable to retry delivery for message\n"))
queue.sendReadyMessages()
@@ -4049,14 +4054,15 @@
self.assertEquals(module.next, 91)
self.assertEquals(len(os.listdir(dir)), 3)
queue = manager.queues['Testing_DirectoryDump']
- queue.queueDeliveryMessage((0xFFFE, "addr91", None),
- "This is message 91")
- queue.queueDeliveryMessage((0xFFFE, "addr92", None),
- "This is message 92")
- queue.queueDeliveryMessage((0xFFFE, "fail", None),
- "This is message 93")
- queue.queueDeliveryMessage((0xFFFE, "FAIL!", None),
- "This is message 94")
+ queue.queueDeliveryMessage(None,
+ FDP('plain',0xFFFE, "addr91", "This is message 91"))
+ queue.queueDeliveryMessage(None,
+ FDP('plain',0xFFFE, "addr92", "This is message 92"))
+ queue.queueDeliveryMessage(None,
+ FDP('plain',0xFFFE, "fail", "This is message 93"))
+ queue.queueDeliveryMessage(None,
+ FDP('plain',0xFFFE, "FAIL!", "This is message 94"))
+
# All 4 messages go into the queue...
self.assertEquals(4, queue.count())
self.assertEquals(3, len(os.listdir(dir)))
Index: testSupport.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/testSupport.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -d -r1.12 -r1.13
--- testSupport.py 16 Dec 2002 02:40:11 -0000 1.12
+++ testSupport.py 10 Jan 2003 20:12:05 -0000 1.13
@@ -80,8 +80,9 @@
else:
return ImmediateDeliveryQueue(self)
- def processMessage(self, message, tag, exitType, exitInfo):
- assert exitType == 0xFFFE
+ def processMessage(self, packet):
+ assert packet.getExitType() == 0xFFFE
+ exitInfo = packet.getAddress()
if exitInfo == 'fail':
return DELIVER_FAIL_RETRY
@@ -90,15 +91,15 @@
LOG.debug("Delivering test message")
- m = _escapeMessageForEmail(message, tag)
+ m = _escapeMessageForEmail(packet)
if m is None:
# Ordinarily, we'd drop corrupt messages, but this module is
# meant for debugging.
m = """\
==========CORRUPT OR UNDECODABLE MESSAGE
Decoding handle: %s%s==========MESSAGE ENDS""" % (
- base64.encodestring(tag),
- base64.encodestring(message))
+ base64.encodestring(packet.getTag()),
+ base64.encodestring(packet.getContents()))
f = open(os.path.join(self.loc, str(self.next)), 'w')
self.next += 1