[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] Client-side pooling, locking, refactoring. (Untested)



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv31315/lib/mixminion

Modified Files:
	ClientMain.py Common.py MMTPClient.py Main.py Packet.py 
	test.py 
Log Message:
Client-side pooling, locking, refactoring.  (Untested)

(Untested; use at your own risk!)

ClientMain:
        - Add basic locking functionality
        - Add a simple pool to hold messages when we've been asked to queue
          them, or when we can't deliver them.
        - Add a log to remember which SURBs we've used
        - Handle files that contain many SURBs
        - Refactor command-line argument parsing to minimize code duplication
        - Add SURB inspection command.

Common:
        - Add a succeedingMidnight function

Common, ServerMain:
        - Move Lockfile to Common

Packet:
        - Add support for multiple reply blocks in one file.



Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.47
retrieving revision 1.48
diff -u -d -r1.47 -r1.48
--- ClientMain.py	5 Feb 2003 06:30:53 -0000	1.47
+++ ClientMain.py	6 Feb 2003 20:20:03 -0000	1.48
@@ -20,6 +20,8 @@
 #      - Per-system directory location is a neat idea, but individual users
 #        must check signature.  That's a way better idea for later.
 
+import anydbm
+import binascii
 import cPickle
 import getopt
 import getpass
@@ -29,26 +31,29 @@
 import sys
 import time
[...1221 lines suppressed...]
+
+    parser.init()
+    client = parser.client
+
+    client.flushPool()
+
+def listPool(cmd, args):
+    options, args = getopt.getopt(args, "hvf:",
+             ["help", "verbose", "config=", ])
+    try:
+        parser = CLIArgumentParser(options, wantConfig=1, wantLog=1,
+                                   wantClient=1)
+    except UsageError, e:
+        e.dump()
+        print _LIST_POOL_USAGE % cmd
+        sys.exit(1)
+
+    parser.init()
+    client = parser.client
+    client.pool.inspectPool()

Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.54
retrieving revision 1.55
diff -u -d -r1.54 -r1.55
--- Common.py	4 Feb 2003 02:02:51 -0000	1.54
+++ Common.py	6 Feb 2003 20:20:03 -0000	1.55
@@ -16,6 +16,7 @@
 import base64
 import bisect
 import calendar
+import fcntl
 import gzip
 import os
 import re
@@ -572,6 +573,12 @@
     yyyy,MM,dd = time.gmtime(when)[0:3]
     return calendar.timegm((yyyy,MM,dd,0,0,0,0,0,0))
 
+def succeedingMidnight(when):
+    "DOCDOC"
+    #XXXX003 test me
+    yyyy,MM,dd = time.gmtime(when)[0:3]
+    return calendar.timegm((yyyy,MM,dd+1,0,0,0,0,0,0))
+
 def formatTime(when,localtime=0):
     """Given a time in seconds since the epoch, returns a time value in the
        format used by server descriptors (YYYY/MM/DD HH:MM:SS) in GMT"""
@@ -853,3 +860,42 @@
             pass
         idx += 1
         fname = os.path.join(base, "%s.%s"%(rest,idx))
+
+#----------------------------------------------------------------------
+class Lockfile:
+    "DOCDOC"
+    def __init__(self, filename):
+        self.filename = filename
+        self.count = 0
+        self.fd = None
+
+    def acquire(self, contents="", blocking=0):
+        "Raises IOError DOCDOC"
+        assert self.fd is None
+        self.fd = os.open(self.filename, os.O_RDWR|os.O_CREAT, 0600)
+        try:
+            if blocking:
+                fcntl.flock(self.fd, fcntl.LOCK_EX|fcntl.LOCK_NB)
+            else:
+                fcntl.flock(self.fd, fcntl.LOCK_EX)
+            self.count += 1
+        except:
+            os.close(self.fd)
+            self.fd = None
+            raise
+
+    def release(self):
+        assert self.fd is not None
+        self.count -= 1
+        if self.count > 0:
+            return
+        try:
+            os.unlink(self.filename)
+            fcntl.flock(self.fd, fcntl.LOCK_UN)
+            os.close(self.fd)
+            self.fd = None
+        except OSError:
+            pass
+    
+                
+        

Index: MMTPClient.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/MMTPClient.py,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -d -r1.19 -r1.20
--- MMTPClient.py	4 Feb 2003 02:33:46 -0000	1.19
+++ MMTPClient.py	6 Feb 2003 20:20:03 -0000	1.20
@@ -58,6 +58,8 @@
 
     def connect(self, connectTimeout=None):
         """Negotiate the handshake and protocol."""
+        #DOCDOC connectTimeout
+        # FFFF There should be a way to specify timeout for communication.
         def sigalarmHandler(sig, _):
             assert sig == signal.SIGALRM
         if connectTimeout:

Index: Main.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Main.py,v
retrieving revision 1.26
retrieving revision 1.27
diff -u -d -r1.26 -r1.27
--- Main.py	5 Feb 2003 05:34:55 -0000	1.26
+++ Main.py	6 Feb 2003 20:20:03 -0000	1.27
@@ -122,6 +122,9 @@
     "update-servers" : ( 'mixminion.ClientMain', 'updateServers' ),
     "decode" :         ( 'mixminion.ClientMain', 'clientDecode' ),
     "generate-surb" :  ( 'mixminion.ClientMain', 'generateSURB' ),
+    "inspect-surbs" :  ( 'mixminion.ClientMain', 'inspectSURBs' ),
+    "flush" :          ( 'mixminion.ClientMain', 'flushPool' ),
+    "inspect-pool" :   ( 'mixminion.ClientMain', 'listPool' ),
     "server" :         ( 'mixminion.server.ServerMain', 'runServer' ),
     "server-keygen" :  ( 'mixminion.server.ServerMain', 'runKeygen'),
     "server-DELKEYS" : ( 'mixminion.server.ServerMain', 'removeKeys'),
@@ -139,6 +142,7 @@
   "       update-servers [Download a fresh server directory]\n"+
   "       decode         [Decode or decrypt a received message]\n"+
   "       generate-surb  [Generate a single-use reply block]\n"+
+  "       inspect-surbs  [DOCDOC]\n"+
   "                               (For Servers)\n"+
   "       server         [Begin running a Mixminon server]\n"+
   "       server-keygen  [Generate keys for a Mixminion server]\n"+
@@ -151,7 +155,7 @@
   "For help on sending a message, run 'mixminion send --help'"
 )
 
-def printVersion(cmd,args):
+def printVersion():
     import mixminion
     print "Mixminion version %s" % mixminion.__version__
     print ("Copyright 2002-2003 Nick Mathewson.  "+
@@ -174,7 +178,6 @@
 
     # Check whether we have a recognized command.
     if len(args) == 1 or not _COMMANDS.has_key(args[1]):
-        printVersion(args[0],args[1:])
         printUsage()
         sys.exit(1)
 
@@ -185,7 +188,9 @@
 
     # Invoke the command.
     try:
-        commandStr = " ".join(args[0:2])
+        cmdFile = os.path.split(args[0])[1]
+        cmdName = args[1]
+        commandStr = "%s %s" % (cmdFile, cmdName)
         func(commandStr, args[2:])
     except getopt.GetoptError, e:
         sys.stderr.write(str(e)+"\n")

Index: Packet.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Packet.py,v
retrieving revision 1.27
retrieving revision 1.28
diff -u -d -r1.27 -r1.28
--- Packet.py	5 Feb 2003 06:30:53 -0000	1.27
+++ Packet.py	6 Feb 2003 20:20:03 -0000	1.28
@@ -30,8 +30,9 @@
 import sys
 import zlib
 from socket import inet_ntoa, inet_aton
-from mixminion.Common import MixError, MixFatalError, floorDiv, isSMTPMailbox,\
-     LOG
+from mixminion.Common import MixError, MixFatalError, floorDiv, formatTime, \
+     isSMTPMailbox, LOG
+from mixminion.Crypto import sha1
 
 if sys.version_info[:3] < (2,2,0):
     import mixminion._zlibutil as zlibutil
@@ -427,8 +428,18 @@
         idx = m.end()
     return blocks
 
-def parseReplyBlock(s):
-    """Return a new ReplyBlock object for an encoded reply block."""        
+def parseReplyBlocks(s):
+    "DOCDOC"
+    blocks = []
+    while s:
+        block, length = parseReplyBlock(s, allowMore=1, returnLen=1)
+        blocks.append(block)
+        s = s[length:]
+    return blocks
+
+def parseReplyBlock(s, allowMore=0, returnLen=0):
+    """Return a new ReplyBlock object for an encoded reply block."""
+    # DOCDOC withIdx
     if len(s) < MIN_RB_LEN:
         raise ParseError("Reply block too short")
     try:
@@ -445,10 +456,17 @@
                          major,minor)
 
     ri = s[MIN_RB_LEN:]
-    if len(ri) != rlen:
+    length = rlen + MIN_RB_LEN
+    if allowMore:
+        ri = ri[:rlen]
+    elif len(ri) != rlen:
         raise ParseError("Misformatted reply block")
 
-    return ReplyBlock(header, timestamp, rt, ri, key)
+    surb =  ReplyBlock(header, timestamp, rt, ri, key)
+    if returnLen:
+        return surb, length
+    else:
+        return surb
 
 class ReplyBlock:
     """A mixminion reply block, including the address of the first hop
@@ -462,6 +480,17 @@
         self.routingInfo = ri
         self.encryptionKey = key
 
+    def format(self):
+        hash = binascii.b2a_hex(sha1(self.pack()))
+        expiry = formatTime(self.timestamp)
+        if self.routingType == SWAP_FWD_TYPE:
+            server = parseIPV4Info(self.routingInfo).format()
+        else:
+            server = "????"
+        return """Reply block hash: %s
+Expires at: %s GMT
+First server is: %s""" % (hash, expiry, server)
+
     def pack(self):
         """Returns the external representation of this reply block"""
         return struct.pack(RB_UNPACK_PATTERN,
@@ -506,6 +535,10 @@
         self.ip = ip
         self.port = port
         self.keyinfo = keyinfo
+
+    def format(self):
+        return "%s:%s (keyid=%s)"%(self.ip, self.port,
+                                   binascii.b2a_hex(self.keyinfo))
 
     def pack(self):
         """Return the routing info for this address"""

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.79
retrieving revision 1.80
diff -u -d -r1.79 -r1.80
--- test.py	5 Feb 2003 07:10:54 -0000	1.79
+++ test.py	6 Feb 2003 20:20:03 -0000	1.80
@@ -1128,7 +1128,7 @@
         p = ptem(mt1.pack())[0]
         eq(p.pack(), mt1.pack())
         eq(p.getContents(), "Hello, whirled\n")
-        eq(p.isText(), 1)
+        self.assert_(p.isText())
         p = ptem("This message is a test of the emergent broadcast system?\n "
                  +mt2.pack())[0]
         eq(p.pack(), mt2.pack())
@@ -1138,16 +1138,16 @@
         p, i = ptem(s)
         p2, _ = ptem(s, idx=i)
         eq(p.pack(), mb1.pack())
-        eq(p.isBinary(), 1)
+        self.assert_(p.isBinary())
         eq(p.getContents(), v)
         eq(p2.pack(), ml1.pack())
-        eq(p2.isOvercompressed(), 1)
+        self.assert_(p2.isOvercompressed())
         eq(p2.getContents(), v)
         # An encoded message
         p = ptem(menc1.pack())[0]
         eq(p.pack(), menc1.pack())
         eq(p.getContents(), v)
-        eq(p.isEncrypted(), 1)
+        self.assert_(p.isEncrypted())
         eq(p.getTag(), "9"*20)
 
 #----------------------------------------------------------------------
@@ -2874,28 +2874,35 @@
             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
             sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
             sock.bind(("127.0.0.1", TEST_PORT))
+            #sock.listen(5)
+            #s, _ = sock.accept()
             while pausing[0] > 0:
                 time.sleep(.1)
                 pausing[0] -= .1
             time.sleep(2)
+            #s.close()
             sock.close()
-        pausing = [3]
+        pausing = [4]
         t = threading.Thread(None, threadfn, args=(pausing,))
         t.start()
         
         now = time.time()
+        timedout = 0
         try:
-            mixminion.MMTPClient.sendMessages("127.0.0.1",
-                                              #Is there a better IP????
-                                              TEST_PORT, "Z"*20, ["JUNK"],
-                                              connectTimeout=1)
-            self.fail("Expected the connection to time out")
-        except mixminion.MMTPClient.TimeoutError:
-            pass
-        passed = time.time() - now
+            try:
+                mixminion.MMTPClient.sendMessages("127.0.0.1",
+                                                  TEST_PORT, "Z"*20, ["JUNK"],
+                                                  connectTimeout=1)
+                timedout = 0
+            except mixminion.MMTPClient.TimeoutError:
+                timedout = 1
+        finally:
+            passed = time.time() - now
+            pausing[0] = 0
+            t.join()
+            
         self.assert_(passed < 2)
-        pausing[0] = 0
-        t.join()
+        self.assert_(timedout)
 
     def _testNonblockingTransmission(self):
         server, listener, messagesIn, keyid = _getMMTPServer()
@@ -5266,7 +5273,7 @@
     tc = loader.loadTestsFromTestCase
 
     if 0:
-        suite.addTest(tc(PacketTests))
+        suite.addTest(tc(MMTPTests))
         return suite
 
     suite.addTest(tc(MiscTests))