[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] Refactor client locking (untested)



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv10266/lib/mixminion

Modified Files:
	ClientMain.py Common.py benchmark.py test.py 
Log Message:
Refactor client locking (untested)

Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.48
retrieving revision 1.49
diff -u -d -r1.48 -r1.49
--- ClientMain.py	6 Feb 2003 20:20:03 -0000	1.48
+++ ClientMain.py	7 Feb 2003 17:23:11 -0000	1.49
@@ -53,6 +53,20 @@
 MIXMINION_DIRECTORY_FINGERPRINT = "CD80DD1B8BE7CA2E13C928D57499992D56579CCD"
 
 
+
+_CLIENT_LOCKFILE = None
+
+def clientLock():
+    assert _CLIENT_LOCKFILE is not None
+    _CLIENT_LOCKFILE.acquire(blocking=1)
+
+def clientUnlock():
+    _CLIENT_LOCKFILE.release()
+
+def configureClientLock(filename):
+    global _CLIENT_LOCKFILE
+    _CLIENT_LOCKFILE = Lockfile(filename)
+
 #XXXX003 rename to server list
 class ClientKeystore:
     """A ClientKeystore manages a list of server descriptors, either
@@ -88,14 +102,13 @@
         createPrivateDir(self.dir)
         createPrivateDir(os.path.join(self.dir, "imported"))
         self.digestMap = {}
-        self.lockfile = Lockfile(os.path.join(self.dir, "lock"))
         self.__scanning = 0
         try:
-            self.lock()
+            clientLock()
             self.__load()
             self.clean()
         finally:
-            self.unlock()
+            clientUnlock()
         #XXXX003 Check version against directory's Recommended-Software field.
 
         # Mixminion 0.0.1 used an obsolete directory-full-of-servers in
@@ -112,12 +125,6 @@
                 except OSError, e:
                     LOG.warn("Failed: %s", e)
 
-    def lock(self):
-        self.lockfile.acquire(blocking=1)
-
-    def unlock(self):
-        self.lockfile.release()
-
     def updateDirectory(self, forceDownload=0, now=None):
         """Download a directory from the network as needed."""
         if now is None:
@@ -916,12 +923,9 @@
     # DB holds HEX(hash) -> str(expiry)
     def __init__(self, filename, forceClean=0):
         parent, shortfn = os.path.split(filename)
-        lockfilename = os.path.join(parent, "lck_"+shortfn)
         createPrivateDir(parent)
         LOG.debug("Opening SURB log")
-        self.lockfile = Lockfile(lockfilename)
-        self.lockfile.acquire(blocking=1)
-        
+        # DOCDOC MUST HOLD LOCK WHILE OPEN
         self.log = anydbm.open(filename, 'c')
         lastCleaned = int(self.log['LAST_CLEANED'])
         if lastCleaned < time.time()-24*60*60 or forceClean:
@@ -929,7 +933,6 @@
 
     def close(self):
         self.log.close()
-        self.lockfile.release()
 
     def isSURBUsed(self, surb):
         hash = binascii.b2a_hex(sha1(surb.pack()))
@@ -946,7 +949,7 @@
     def clean(self, now=None):
         if now is None:
             now = time.time() + 60*60
-        allHashes = k.keys()
+        allHashes = self.log.keys()
         removed = []
         for hash in allHashes:
             if self.log[hash] < now:
@@ -961,82 +964,61 @@
     def __init__(self, directory, prng=None):
         self.dir = directory
         createPrivateDir(directory)
-        self.lockfile = Lockfile(os.path.join(self.dir, "lock"))
         if prng is not None:
             self.prng = prng
         else:
             self.prng = mixminion.Crypto.getCommonPRNG()
 
-    def lock(self):
-        self.lockfile.acquire(blocking=1)
-
-    def unlock(self):
-        self.lockfile.release()
-
     def poolPacket(self, message, firstHop):
-        try:
-            self.lock()
-            f, handle = self.prng.openNewFile(self.dir, "pkt_", 1)
-            cPickle.dump(("PACKET-0", message, firstHop,
-                          previousMidnight(time.time())), f, 1)
-            f.close()
-            return handle
-        finally:
-            self.unlock()
-            
+        clientLock()
+        f, handle = self.prng.openNewFile(self.dir, "pkt_", 1)
+        cPickle.dump(("PACKET-0", message, firstHop,
+                      previousMidnight(time.time())), f, 1)
+        f.close()
+        return handle
+    
     def getHandles(self):
-        try:
-            self.lock()
-            fnames = os.listdir(self.dir)
-            handles = []
-            for fname in fnames:
-                if fname.startswith("pkt_"):
-                    handles.append(fname[4:])
-            return handles
-        except:
-            self.unlock()
+        clientLock()
+        fnames = os.listdir(self.dir)
+        handles = []
+        for fname in fnames:
+            if fname.startswith("pkt_"):
+                handles.append(fname[4:])
+        return handles
 
     def getPacket(self, handle):
-        try:
-            self.lock()
-            f = open(os.path.join(self.dir, "pkt_"+handle), 'rb')
-            magic, message, firstHop, when = cPickle.load(f)
-            f.close()
-            if magic != "PACKET-0":
-                LOG.error("Unrecognized packet format for %s",handle)
-                return None
-            return message, firstHop, when
-        finally:
-            self.unlock()
+        f = open(os.path.join(self.dir, "pkt_"+handle), 'rb')
+        magic, message, firstHop, when = cPickle.load(f)
+        f.close()
+        if magic != "PACKET-0":
+            LOG.error("Unrecognized packet format for %s",handle)
+            return None
+        return message, firstHop, when
 
+    def packetExists(self, handle):
+        fname = os.path.join(self.dir, "pkt_"+handle)
+        return os.path.exists(fname)
+        
     def removePacket(self, handle):
         fname = os.path.join(self.dir, "pkt_"+handle)
-        try:
-            self.lock()
-            secureDelete(fname, blocking=1)
-        finally:
-            self.unlock()
+        secureDelete(fname, blocking=1)
 
     def inspectPool(self, now=None):
         if now is None:
             now = time.time()
-        try:
-            self.lock()
-            handles = self.getHandles()
-            timesByServer = {}
-            for h in handles:
-                _, server, when = self.getPacket(h)
-                timesByServer.setdefault(server, []).append(when)
-            for s in timesByServer.keys():
-                count = len(timesByServer[s])
-                oldest = min(timesByServer[s])
-                days = (now - oldest) / (24*60*60)
-                if days < 1:
-                    days = "<1"
-                print "%2d messages for server %s:%s (oldest is %s days old)"%(
-                    count, s.getAddr(), s.getPort(), days)
-        finally:
-            self.unlock()
+        handles = self.getHandles()
+        timesByServer = {}
+        for h in handles:
+            _, server, when = self.getPacket(h)
+            timesByServer.setdefault(server, []).append(when)
+        for s in timesByServer.keys():
+            count = len(timesByServer[s])
+            oldest = min(timesByServer[s])
+            days = (now - oldest) / (24*60*60)
+            if days < 1:
+                days = "<1"
+            print "%2d messages for server %s:%s (oldest is %s days old)"%(
+                count, s.getAddr(), s.getPort(), days)
 
 class MixminionClient:
     """Access point for client functionality.  Currently, this is limited
@@ -1128,6 +1110,7 @@
         """
         if now is None:
             now = time.time()
+        clientLock()
         surbLog = SURBLog(self.surbLogFilename)
         try:
             for surb in surbList:
@@ -1154,6 +1137,7 @@
             raise MixError("No usable SURBs found.")
         finally:
             surbLog.close()
+            clientUnlock()
 
     def sendMessages(self, msgList, server, noPool=0, lazyPool=0,
                      warnIfLost=1):
@@ -1183,20 +1167,30 @@
                 elif lazyPool:
                     self.poolMessages(msgList, server)
                 raise
-            for h in handles:
-                self.pool.removePacket(h)
+            try:
+                clientLock()
+                for h in handles:
+                    if self.pool.packetExists(h):
+                        self.pool.removePacket(h)
+            finally:
+                clientUnlock()
         except socket.error, e:
             raise MixError("Error sending packets: %s" % e)
             
     def flushPool(self):
         LOG.info("Flushing message pool")
         # XXXX This is inefficient in space!
-        handles = self.pool.getHandles()
-        LOG.info("Found %s pending messages", len(handles))
-        messagesByServer = {}
-        for h in handles:
-            message, server, _ = self.pool.getPacket(h)
-            messagesByServer.setdefault(server, []).append((message, h))
+        clientLock()
+        try:
+            handles = self.pool.getHandles()
+            LOG.info("Found %s pending messages", len(handles))
+            messagesByServer = {}
+            for h in handles:
+                message, server, _ = self.pool.getPacket(h)
+                messagesByServer.setdefault(server, []).append((message, h))
+        finally:
+            clientUnlock()
+            
         for server in messagesByServer.keys():
             LOG.debug("Sending %s messages to %s...",
                       len(messagesByServer[server]), server.getAddr())
@@ -1205,9 +1199,14 @@
             try:
                 self.sendMessages(msgs, server, noPool=1, warnIfLost=0)
                 LOG.debug("... messages sent.")
-                for h in handles:
-                    self.pool.removePacket(h)
-            except MixError, e:
+                try:
+                    clientLock()
+                    for h in handles:
+                        if self.pool.packetExists(h):
+                            self.pool.removePacket(h)
+                finally:
+                    clientUnlock()
+            except MixError:
                 LOG.error("Can't deliver messages to %s:%s; leaving in pool",
                           server.getAddr(), server.getPort())
         LOG.info("Pool flushed")
@@ -1215,9 +1214,13 @@
     def poolMessages(self, msgList, server):
         LOG.trace("Pooling messages")
         handles = []
-        for msg in msgList:
-            h = self.pool.poolPacket(msg, server)
-            handles.append(h)
+        try:
+            clientLock()
+            for msg in msgList:
+                h = self.pool.poolPacket(msg, server)
+                handles.append(h)
+        finally:
+            clientUnlock()
         LOG.trace("Messages pooled")
         return handles
 
@@ -1445,6 +1448,7 @@
             mixminion.Common.configureShredCommand(self.config)
             mixminion.Crypto.init_crypto(self.config)
             userdir = os.path.expanduser(self.config['User']['UserDir'])
+            configureClientLock(os.path.join(userdir, "lock"))
         else:
             if self.wantLog:
                 LOG.setMinSeverity("ERROR")
@@ -1464,10 +1468,10 @@
             assert self.wantKeystore
             if self.download != 0:
                 try:
-                    self.keystore.lock()
+                    clientLock()
                     self.keystore.updateDirectory(forceDownload=self.download)
                 finally:
-                    self.keystore.unlock()
+                    clientUnlock()
 
     def parsePath(self):
         if self.wantReplyPath and self.address is None:
@@ -1700,7 +1704,7 @@
     keystore = parser.keystore
 
     try:
-        keystore.lock()
+        clientLock()
         for filename in args:
             print "Importing from", filename
             try:
@@ -1708,7 +1712,7 @@
             except MixError, e:
                 print "Error while importing: %s" % e
     finally:
-        keystore.unlock()
+        clientUnlock()
         
     print "Done."
 
@@ -1765,10 +1769,10 @@
     parser.init()
     keystore = parser.keystore
     try:
-        keystore.lock()
+        clientLock()
         keystore.updateDirectory(forceDownload=1)
     finally:
-        keystore.unlock()
+        clientUnlock()
     print "Directory updated"
 
 _CLIENT_DECODE_USAGE = """\
@@ -1836,7 +1840,7 @@
     out.close()
 
 _GENERATE_SURB_USAGE = """\
-Usage: %s [options] <files>
+Usage: %s [options]
   This space is temporarily left blank.
 """
 def generateSURB(cmd, args):
@@ -1894,6 +1898,12 @@
         out.write(surb.packAsText())
     out.close()
 
+_INSPECT_SURBS_USAGE = """\
+Usage: %s [options] <files>
+  This space is temporarily left blank.
+  DOCDOC
+"""
+
 def inspectSURBs(cmd, args):
     options, args = getopt.getopt(args, "hvf:",
              ["help", "verbose", "config=", ])
@@ -1920,6 +1930,12 @@
         for surb in surbs:
             print surb.format()
 
+_FLUSH_POOL_USAGE = """\
+Usage: %s [options]
+  This space is temporarily left blank.
+  DOCDOC
+"""
+
 def flushPool(cmd, args):
     options, args = getopt.getopt(args, "hvf:",
              ["help", "verbose", "config=", ])
@@ -1936,6 +1952,13 @@
 
     client.flushPool()
 
+
+_LIST_POOL_USAGE = """\
+Usage: %s [options]
+  This space is temporarily left blank.
+  DOCDOC
+"""
+
 def listPool(cmd, args):
     options, args = getopt.getopt(args, "hvf:",
              ["help", "verbose", "config=", ])
@@ -1949,4 +1972,8 @@
 
     parser.init()
     client = parser.client
-    client.pool.inspectPool()
+    try:
+        clientLock()
+        client.pool.inspectPool()
+    finally:
+        clientUnlock()

Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.55
retrieving revision 1.56
diff -u -d -r1.55 -r1.56
--- Common.py	6 Feb 2003 20:20:03 -0000	1.55
+++ Common.py	7 Feb 2003 17:23:11 -0000	1.56
@@ -871,6 +871,10 @@
 
     def acquire(self, contents="", blocking=0):
         "Raises IOError DOCDOC"
+        if self.count > 0:
+            self.count += 1
+            return
+
         assert self.fd is None
         self.fd = os.open(self.filename, os.O_RDWR|os.O_CREAT, 0600)
         try:

Index: benchmark.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/benchmark.py,v
retrieving revision 1.29
retrieving revision 1.30
diff -u -d -r1.29 -r1.30
--- benchmark.py	5 Feb 2003 06:28:31 -0000	1.29
+++ benchmark.py	7 Feb 2003 17:23:11 -0000	1.30
@@ -22,7 +22,7 @@
 from mixminion.BuildMessage import _buildHeader, buildForwardMessage, \
      compressData, uncompressData, _encodePayload, decodePayload
 from mixminion.Common import secureDelete, installSIGCHLDHandler, \
-     waitForChildren, formatBase64
+     waitForChildren, formatBase64, Lockfile
 from mixminion.Crypto import *
 from mixminion.Crypto import OAEP_PARAMETER
 from mixminion.Crypto import _add_oaep_padding, _check_oaep_padding
@@ -610,6 +610,15 @@
     dname = mix_mktemp(".d")
 
     os.mkdir(dname)
+
+    lockfile = Lockfile(os.path.join("dname"))
+    t1 = time()
+    for _ in xrange(2000):
+        lockfile.acquire(1)
+        lockfile.release()
+    t = time()-t1
+    print "Lockfile: lock+unlock", timestr(t/2000.)
+    
     for i in xrange(200):
         f = open(os.path.join(dname, str(i)), 'wb')
         f.write(s32K)

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.80
retrieving revision 1.81
diff -u -d -r1.80 -r1.81
--- test.py	6 Feb 2003 20:20:03 -0000	1.80
+++ test.py	7 Feb 2003 17:23:11 -0000	1.81
@@ -5302,6 +5302,7 @@
 
 def testAll(name, args):
     init_crypto()
+    mixminion.ClientMain.configureClientLock(mix_mktemp())
 
     # Suppress 'files-can't-be-securely-deleted' message while testing
     LOG.setMinSeverity("FATAL")