[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] Refactor client locking (untested)
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv10266/lib/mixminion
Modified Files:
ClientMain.py Common.py benchmark.py test.py
Log Message:
Refactor client locking (untested)
Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.48
retrieving revision 1.49
diff -u -d -r1.48 -r1.49
--- ClientMain.py 6 Feb 2003 20:20:03 -0000 1.48
+++ ClientMain.py 7 Feb 2003 17:23:11 -0000 1.49
@@ -53,6 +53,20 @@
MIXMINION_DIRECTORY_FINGERPRINT = "CD80DD1B8BE7CA2E13C928D57499992D56579CCD"
+
+_CLIENT_LOCKFILE = None
+
+def clientLock():
+ assert _CLIENT_LOCKFILE is not None
+ _CLIENT_LOCKFILE.acquire(blocking=1)
+
+def clientUnlock():
+ _CLIENT_LOCKFILE.release()
+
+def configureClientLock(filename):
+ global _CLIENT_LOCKFILE
+ _CLIENT_LOCKFILE = Lockfile(filename)
+
#XXXX003 rename to server list
class ClientKeystore:
"""A ClientKeystore manages a list of server descriptors, either
@@ -88,14 +102,13 @@
createPrivateDir(self.dir)
createPrivateDir(os.path.join(self.dir, "imported"))
self.digestMap = {}
- self.lockfile = Lockfile(os.path.join(self.dir, "lock"))
self.__scanning = 0
try:
- self.lock()
+ clientLock()
self.__load()
self.clean()
finally:
- self.unlock()
+ clientUnlock()
#XXXX003 Check version against directory's Recommended-Software field.
# Mixminion 0.0.1 used an obsolete directory-full-of-servers in
@@ -112,12 +125,6 @@
except OSError, e:
LOG.warn("Failed: %s", e)
- def lock(self):
- self.lockfile.acquire(blocking=1)
-
- def unlock(self):
- self.lockfile.release()
-
def updateDirectory(self, forceDownload=0, now=None):
"""Download a directory from the network as needed."""
if now is None:
@@ -916,12 +923,9 @@
# DB holds HEX(hash) -> str(expiry)
def __init__(self, filename, forceClean=0):
parent, shortfn = os.path.split(filename)
- lockfilename = os.path.join(parent, "lck_"+shortfn)
createPrivateDir(parent)
LOG.debug("Opening SURB log")
- self.lockfile = Lockfile(lockfilename)
- self.lockfile.acquire(blocking=1)
-
+ # DOCDOC MUST HOLD LOCK WHILE OPEN
self.log = anydbm.open(filename, 'c')
lastCleaned = int(self.log['LAST_CLEANED'])
if lastCleaned < time.time()-24*60*60 or forceClean:
@@ -929,7 +933,6 @@
def close(self):
self.log.close()
- self.lockfile.release()
def isSURBUsed(self, surb):
hash = binascii.b2a_hex(sha1(surb.pack()))
@@ -946,7 +949,7 @@
def clean(self, now=None):
if now is None:
now = time.time() + 60*60
- allHashes = k.keys()
+ allHashes = self.log.keys()
removed = []
for hash in allHashes:
if self.log[hash] < now:
@@ -961,82 +964,61 @@
def __init__(self, directory, prng=None):
self.dir = directory
createPrivateDir(directory)
- self.lockfile = Lockfile(os.path.join(self.dir, "lock"))
if prng is not None:
self.prng = prng
else:
self.prng = mixminion.Crypto.getCommonPRNG()
- def lock(self):
- self.lockfile.acquire(blocking=1)
-
- def unlock(self):
- self.lockfile.release()
-
def poolPacket(self, message, firstHop):
- try:
- self.lock()
- f, handle = self.prng.openNewFile(self.dir, "pkt_", 1)
- cPickle.dump(("PACKET-0", message, firstHop,
- previousMidnight(time.time())), f, 1)
- f.close()
- return handle
- finally:
- self.unlock()
-
+ clientLock()
+ f, handle = self.prng.openNewFile(self.dir, "pkt_", 1)
+ cPickle.dump(("PACKET-0", message, firstHop,
+ previousMidnight(time.time())), f, 1)
+ f.close()
+ return handle
+
def getHandles(self):
- try:
- self.lock()
- fnames = os.listdir(self.dir)
- handles = []
- for fname in fnames:
- if fname.startswith("pkt_"):
- handles.append(fname[4:])
- return handles
- except:
- self.unlock()
+ clientLock()
+ fnames = os.listdir(self.dir)
+ handles = []
+ for fname in fnames:
+ if fname.startswith("pkt_"):
+ handles.append(fname[4:])
+ return handles
def getPacket(self, handle):
- try:
- self.lock()
- f = open(os.path.join(self.dir, "pkt_"+handle), 'rb')
- magic, message, firstHop, when = cPickle.load(f)
- f.close()
- if magic != "PACKET-0":
- LOG.error("Unrecognized packet format for %s",handle)
- return None
- return message, firstHop, when
- finally:
- self.unlock()
+ f = open(os.path.join(self.dir, "pkt_"+handle), 'rb')
+ magic, message, firstHop, when = cPickle.load(f)
+ f.close()
+ if magic != "PACKET-0":
+ LOG.error("Unrecognized packet format for %s",handle)
+ return None
+ return message, firstHop, when
+ def packetExists(self, handle):
+ fname = os.path.join(self.dir, "pkt_"+handle)
+ return os.path.exists(fname)
+
def removePacket(self, handle):
fname = os.path.join(self.dir, "pkt_"+handle)
- try:
- self.lock()
- secureDelete(fname, blocking=1)
- finally:
- self.unlock()
+ secureDelete(fname, blocking=1)
def inspectPool(self, now=None):
if now is None:
now = time.time()
- try:
- self.lock()
- handles = self.getHandles()
- timesByServer = {}
- for h in handles:
- _, server, when = self.getPacket(h)
- timesByServer.setdefault(server, []).append(when)
- for s in timesByServer.keys():
- count = len(timesByServer[s])
- oldest = min(timesByServer[s])
- days = (now - oldest) / (24*60*60)
- if days < 1:
- days = "<1"
- print "%2d messages for server %s:%s (oldest is %s days old)"%(
- count, s.getAddr(), s.getPort(), days)
- finally:
- self.unlock()
+ handles = self.getHandles()
+ timesByServer = {}
+ for h in handles:
+ _, server, when = self.getPacket(h)
+ timesByServer.setdefault(server, []).append(when)
+ for s in timesByServer.keys():
+ count = len(timesByServer[s])
+ oldest = min(timesByServer[s])
+ days = (now - oldest) / (24*60*60)
+ if days < 1:
+ days = "<1"
+ print "%2d messages for server %s:%s (oldest is %s days old)"%(
+ count, s.getAddr(), s.getPort(), days)
class MixminionClient:
"""Access point for client functionality. Currently, this is limited
@@ -1128,6 +1110,7 @@
"""
if now is None:
now = time.time()
+ clientLock()
surbLog = SURBLog(self.surbLogFilename)
try:
for surb in surbList:
@@ -1154,6 +1137,7 @@
raise MixError("No usable SURBs found.")
finally:
surbLog.close()
+ clientUnlock()
def sendMessages(self, msgList, server, noPool=0, lazyPool=0,
warnIfLost=1):
@@ -1183,20 +1167,30 @@
elif lazyPool:
self.poolMessages(msgList, server)
raise
- for h in handles:
- self.pool.removePacket(h)
+ try:
+ clientLock()
+ for h in handles:
+ if self.pool.packetExists(h):
+ self.pool.removePacket(h)
+ finally:
+ clientUnlock()
except socket.error, e:
raise MixError("Error sending packets: %s" % e)
def flushPool(self):
LOG.info("Flushing message pool")
# XXXX This is inefficient in space!
- handles = self.pool.getHandles()
- LOG.info("Found %s pending messages", len(handles))
- messagesByServer = {}
- for h in handles:
- message, server, _ = self.pool.getPacket(h)
- messagesByServer.setdefault(server, []).append((message, h))
+ clientLock()
+ try:
+ handles = self.pool.getHandles()
+ LOG.info("Found %s pending messages", len(handles))
+ messagesByServer = {}
+ for h in handles:
+ message, server, _ = self.pool.getPacket(h)
+ messagesByServer.setdefault(server, []).append((message, h))
+ finally:
+ clientUnlock()
+
for server in messagesByServer.keys():
LOG.debug("Sending %s messages to %s...",
len(messagesByServer[server]), server.getAddr())
@@ -1205,9 +1199,14 @@
try:
self.sendMessages(msgs, server, noPool=1, warnIfLost=0)
LOG.debug("... messages sent.")
- for h in handles:
- self.pool.removePacket(h)
- except MixError, e:
+ try:
+ clientLock()
+ for h in handles:
+ if self.pool.packetExists(h):
+ self.pool.removePacket(h)
+ finally:
+ clientUnlock()
+ except MixError:
LOG.error("Can't deliver messages to %s:%s; leaving in pool",
server.getAddr(), server.getPort())
LOG.info("Pool flushed")
@@ -1215,9 +1214,13 @@
def poolMessages(self, msgList, server):
LOG.trace("Pooling messages")
handles = []
- for msg in msgList:
- h = self.pool.poolPacket(msg, server)
- handles.append(h)
+ try:
+ clientLock()
+ for msg in msgList:
+ h = self.pool.poolPacket(msg, server)
+ handles.append(h)
+ finally:
+ clientUnlock()
LOG.trace("Messages pooled")
return handles
@@ -1445,6 +1448,7 @@
mixminion.Common.configureShredCommand(self.config)
mixminion.Crypto.init_crypto(self.config)
userdir = os.path.expanduser(self.config['User']['UserDir'])
+ configureClientLock(os.path.join(userdir, "lock"))
else:
if self.wantLog:
LOG.setMinSeverity("ERROR")
@@ -1464,10 +1468,10 @@
assert self.wantKeystore
if self.download != 0:
try:
- self.keystore.lock()
+ clientLock()
self.keystore.updateDirectory(forceDownload=self.download)
finally:
- self.keystore.unlock()
+ clientUnlock()
def parsePath(self):
if self.wantReplyPath and self.address is None:
@@ -1700,7 +1704,7 @@
keystore = parser.keystore
try:
- keystore.lock()
+ clientLock()
for filename in args:
print "Importing from", filename
try:
@@ -1708,7 +1712,7 @@
except MixError, e:
print "Error while importing: %s" % e
finally:
- keystore.unlock()
+ clientUnlock()
print "Done."
@@ -1765,10 +1769,10 @@
parser.init()
keystore = parser.keystore
try:
- keystore.lock()
+ clientLock()
keystore.updateDirectory(forceDownload=1)
finally:
- keystore.unlock()
+ clientUnlock()
print "Directory updated"
_CLIENT_DECODE_USAGE = """\
@@ -1836,7 +1840,7 @@
out.close()
_GENERATE_SURB_USAGE = """\
-Usage: %s [options] <files>
+Usage: %s [options]
This space is temporarily left blank.
"""
def generateSURB(cmd, args):
@@ -1894,6 +1898,12 @@
out.write(surb.packAsText())
out.close()
+_INSPECT_SURBS_USAGE = """\
+Usage: %s [options] <files>
+ This space is temporarily left blank.
+ DOCDOC
+"""
+
def inspectSURBs(cmd, args):
options, args = getopt.getopt(args, "hvf:",
["help", "verbose", "config=", ])
@@ -1920,6 +1930,12 @@
for surb in surbs:
print surb.format()
+_FLUSH_POOL_USAGE = """\
+Usage: %s [options]
+ This space is temporarily left blank.
+ DOCDOC
+"""
+
def flushPool(cmd, args):
options, args = getopt.getopt(args, "hvf:",
["help", "verbose", "config=", ])
@@ -1936,6 +1952,13 @@
client.flushPool()
+
+_LIST_POOL_USAGE = """\
+Usage: %s [options]
+ This space is temporarily left blank.
+ DOCDOC
+"""
+
def listPool(cmd, args):
options, args = getopt.getopt(args, "hvf:",
["help", "verbose", "config=", ])
@@ -1949,4 +1972,8 @@
parser.init()
client = parser.client
- client.pool.inspectPool()
+ try:
+ clientLock()
+ client.pool.inspectPool()
+ finally:
+ clientUnlock()
Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.55
retrieving revision 1.56
diff -u -d -r1.55 -r1.56
--- Common.py 6 Feb 2003 20:20:03 -0000 1.55
+++ Common.py 7 Feb 2003 17:23:11 -0000 1.56
@@ -871,6 +871,10 @@
def acquire(self, contents="", blocking=0):
"Raises IOError DOCDOC"
+ if self.count > 0:
+ self.count += 1
+ return
+
assert self.fd is None
self.fd = os.open(self.filename, os.O_RDWR|os.O_CREAT, 0600)
try:
Index: benchmark.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/benchmark.py,v
retrieving revision 1.29
retrieving revision 1.30
diff -u -d -r1.29 -r1.30
--- benchmark.py 5 Feb 2003 06:28:31 -0000 1.29
+++ benchmark.py 7 Feb 2003 17:23:11 -0000 1.30
@@ -22,7 +22,7 @@
from mixminion.BuildMessage import _buildHeader, buildForwardMessage, \
compressData, uncompressData, _encodePayload, decodePayload
from mixminion.Common import secureDelete, installSIGCHLDHandler, \
- waitForChildren, formatBase64
+ waitForChildren, formatBase64, Lockfile
from mixminion.Crypto import *
from mixminion.Crypto import OAEP_PARAMETER
from mixminion.Crypto import _add_oaep_padding, _check_oaep_padding
@@ -610,6 +610,15 @@
dname = mix_mktemp(".d")
os.mkdir(dname)
+
+ lockfile = Lockfile(os.path.join("dname"))
+ t1 = time()
+ for _ in xrange(2000):
+ lockfile.acquire(1)
+ lockfile.release()
+ t = time()-t1
+ print "Lockfile: lock+unlock", timestr(t/2000.)
+
for i in xrange(200):
f = open(os.path.join(dname, str(i)), 'wb')
f.write(s32K)
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.80
retrieving revision 1.81
diff -u -d -r1.80 -r1.81
--- test.py 6 Feb 2003 20:20:03 -0000 1.80
+++ test.py 7 Feb 2003 17:23:11 -0000 1.81
@@ -5302,6 +5302,7 @@
def testAll(name, args):
init_crypto()
+ mixminion.ClientMain.configureClientLock(mix_mktemp())
# Suppress 'files-can't-be-securely-deleted' message while testing
LOG.setMinSeverity("FATAL")