[minion-cvs] Make ClientDirectory threadsafe, and note that ClientDirectory has gotten entirely too big
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv23590/lib/mixminion
Modified Files:
ClientDirectory.py ClientMain.py test.py
Log Message:
Make ClientDirectory threadsafe, and note that ClientDirectory has gotten entirely too big.
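For readers following the diff: the new code routes all access to ClientDirectory's fields through mixminion.ThreadUtils.RWLock, and all disk access through a separate diskLock. The sketch below only illustrates the locking discipline as it appears in this patch (read_in/read_out for readers, write_in/write_out for writers, write_to_read to downgrade after a state swap); it is not the ThreadUtils implementation, and the Example class is hypothetical.

# Minimal sketch of the RWLock discipline used in this patch. The method
# names come from the calls in the diff; the Example class is illustrative.
from mixminion.ThreadUtils import RWLock

class Example:
    def __init__(self):
        self._lock = RWLock()
        self.field = {}

    def getCopy(self):
        # Reader: always pair read_in with read_out in a finally block.
        self._lock.read_in()
        try:
            return self.field.copy()
        finally:
            self._lock.read_out()

    def replace(self, newValue):
        # Writer: hold the write lock while swapping in new state, then
        # downgrade to a read lock for follow-up work that only reads
        # (the same shape as importFromFile/__removeBySource below).
        self._lock.write_in()
        try:
            self.field = newValue
        except:
            self._lock.write_out()
            raise
        self._lock.write_to_read()
        try:
            snapshot = self.field.copy()
        finally:
            self._lock.read_out()
        return snapshot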
Index: ClientDirectory.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientDirectory.py,v
retrieving revision 1.24
retrieving revision 1.25
diff -u -d -r1.24 -r1.25
--- ClientDirectory.py 7 Jan 2004 02:50:08 -0000 1.24
+++ ClientDirectory.py 8 Jan 2004 22:35:24 -0000 1.25
@@ -17,11 +17,11 @@
import re
import socket
import stat
+import threading
import time
import types
import urllib2
-import mixminion.ClientMain #XXXX -- it would be better not to need this.
import mixminion.Config
import mixminion.Crypto
import mixminion.NetUtils
@@ -34,16 +34,21 @@
from mixminion.Packet import MBOX_TYPE, SMTP_TYPE, DROP_TYPE, FRAGMENT_TYPE, \
parseMBOXInfo, parseRelayInfoByType, parseSMTPInfo, ParseError, \
ServerSideFragmentedMessage
+from mixminion.ThreadUtils import RWLock
# FFFF This should be made configurable and adjustable.
MIXMINION_DIRECTORY_URL = "http://mixminion.net/directory/Directory.gz"
MIXMINION_DIRECTORY_FINGERPRINT = "CD80DD1B8BE7CA2E13C928D57499992D56579CCD"
+#XXXX This class has become unwieldy. It should get refactored into:
+#XXXX "abstract server set", "directory-based server set", "disk-backed server
+#XXXX set", and "path generator".
+
class ClientDirectory:
"""A ClientDirectory manages a list of server descriptors, either
imported from the command line or from a directory."""
##Fields:
- # dir: directory where we store everything.
+ # dir: directory where we store everything. This field is immutable.
# lastModified: time when we last modified this directory.
# lastDownload: time when we last downloaded a directory
# serverList: List of (ServerInfo, 'D'|'D-'|'I:filename') tuples. The
@@ -54,15 +59,22 @@
# digestMap: Map of (Digest -> 'D'|'D-'|'I:filename').
# byNickname: Map from nickname.lower() to list of (ServerInfo, source)
# tuples.
- # byKeyID: Map from desc.getKeyDigest() to list of ServerInfo.
+ # byKeyID: Map from desc.getKeyDigest() to list of (ServerInfo,source)
# byCapability: Map from capability ('mbox'/'smtp'/'relay'/None) to
# list of (ServerInfo, source) tuples.
# allServers: Same as byCapability[None]
# __scanning: Flag to prevent recursive invocation of self.rescan().
+ # __downloading: Flag to prevent simultaneous invocation of
+ # downloadDirectory()
# clientVersions: String of allowable client versions as retrieved
# from most recent directory.
+ # serverVersions: String of allowable server versions as retrieved
+ # from most recent directory.
# goodServerNicknames: A map from lowercased nicknames of recommended
# servers to 1.
+ # _lock: An instance of RWLock, used to synchronize access to all fields.
+ # _diskLock: An instance of RLock or Lockfile, used to synchronize
+ # access to the disk.
## Layout:
# DIR/cache: A cPickled tuple of ("ClientKeystore-0.3",
# lastModified, lastDownload, clientVersions, serverList,
@@ -77,20 +89,27 @@
# much need to do this at the client side.)
DEFAULT_REQUIRED_LIFETIME = 1
- def __init__(self, directory):
+ def __init__(self, directory, diskLock=None):
"""Create a new ClientDirectory to keep directories and descriptors
- under <directory>."""
+ under <directory>. If diskLock is provided, acquire diskLock
+ before accessing the disk."""
self.dir = directory
createPrivateDir(self.dir)
createPrivateDir(os.path.join(self.dir, "imported"))
self.digestMap = {}
self.__scanning = 0
+ self.__downloading = 0
+ self._lock = RWLock()
+ if diskLock is None:
+ self._diskLock = threading.RLock()
+ else:
+ self._diskLock = diskLock
try:
- mixminion.ClientMain.clientLock() # XXXX disentangle
+ self._diskLock.acquire()
self.__load()
self.clean()
finally:
- mixminion.ClientMain.clientUnlock() # XXXX
+ self._diskLock.release()
def updateDirectory(self, forceDownload=0, timeout=None, now=None):
"""Download a directory from the network as needed."""
@@ -106,6 +125,25 @@
"""Download a new directory from the network, validate it, and
rescan its servers. If the operation doesn't complete within
timeout seconds, raise an error."""
+ try:
+ self._lock.write_in()
+ if self.__downloading:
+ LOG.info("Download already in progress")
+ return 0
+ self.__downloading = 1
+ finally:
+ self._lock.write_out()
+
+ try:
+ self._downloadDirectory(timeout)
+ finally:
+ self._lock.write_in()
+ self.__downloading = 0
+ self._lock.write_out()
+
+
+ def _downloadDirectory(self, timeout=None):
+ """Helper: implements downloadDirectory but doesn't hold lock."""
# Start downloading the directory.
url = MIXMINION_DIRECTORY_URL
LOG.info("Downloading directory from %s", url)
@@ -153,10 +191,15 @@
outfile.close()
# Open and validate the directory
LOG.info("Validating directory")
+
+ self._lock.read_in()
+ digestMap = self.digestMap
+ self._lock.read_out()
+
try:
directory = mixminion.ServerInfo.ServerDirectory(
fname=fname,
- validatedDigests=self.digestMap)
+ validatedDigests=digestMap)
except mixminion.Config.ConfigError, e:
raise MixFatalError("Downloaded invalid directory: %s" % e)
@@ -166,13 +209,18 @@
if fp and mixminion.Crypto.pk_fingerprint(identity) != fp:
raise MixFatalError("Bad identity key on directory")
- tryUnlink(os.path.join(self.dir, "cache"))
-
- # Install the new directory
- if gz:
- replaceFile(fname, os.path.join(self.dir, "dir.gz"))
- else:
- replaceFile(fname, os.path.join(self.dir, "dir"))
+ try:
+ self._lock.write_in()
+ self._diskLock.acquire()
+ # Install the new directory
+ tryUnlink(os.path.join(self.dir, "cache"))
+ if gz:
+ replaceFile(fname, os.path.join(self.dir, "dir.gz"))
+ else:
+ replaceFile(fname, os.path.join(self.dir, "dir"))
+ finally:
+ self._diskLock.release()
+ self._lock.write_out()
# And regenerate the cache.
self.rescan()
@@ -181,46 +229,64 @@
def rescan(self, force=None, now=None):
"""Regenerate the cache based on files on the disk."""
- self.lastModified = self.lastDownload = -1
- self.serverList = []
- self.fullServerList = []
- self.clientVersions = None
- self.goodServerNicknames = {}
+
+ # These variables shadow fields of self, until we replace them at the
+ # end.
+ s_lastModified = s_lastDownload = -1
+ s_serverList = []
+ s_fullServerList = []
+ s_clientVersions = None
+ s_serverVersions = None
+ s_goodServerNicknames = {}
if force:
- self.digestMap = {}
+ s_digestMap = {}
+ else:
+ self._lock.read_in()
+ s_digestMap = self.digestMap.copy()
+ self._lock.read_out()
# Read the servers from the directory.
gzipFile = os.path.join(self.dir, "dir.gz")
dirFile = os.path.join(self.dir, "dir")
for fname in gzipFile, dirFile:
- if not os.path.exists(fname): continue
- self.lastDownload = self.lastModified = \
- os.stat(fname)[stat.ST_MTIME]
+ self._diskLock.acquire()
try:
- directory = mixminion.ServerInfo.ServerDirectory(
- fname=fname,
- validatedDigests=self.digestMap)
- except mixminion.Config.ConfigError:
- LOG.warn("Ignoring invalid directory (!)")
+ if not os.path.exists(fname): continue
+ s_lastDownload = s_lastModified = \
+ os.stat(fname)[stat.ST_MTIME]
+ try:
+ directory = mixminion.ServerInfo.ServerDirectory(
+ fname=fname,
+ validatedDigests=s_digestMap)
+ except mixminion.Config.ConfigError:
+ LOG.warn("Ignoring invalid directory (!)")
+ directory = None
+ finally:
+ self._diskLock.release()
+
+ if directory is None:
continue
- for s in directory.getServers():
- self.serverList.append((s, 'D'))
- self.digestMap[s.getDigest()] = 'D'
- self.goodServerNicknames[s.getNickname().lower()] = 1
+ for server in directory.getServers():
+ s_serverList.append((server, 'D'))
+ s_digestMap[server.getDigest()] = 'D'
+ s_goodServerNicknames[server.getNickname().lower()] = 1
- for s in directory.getAllServers():
- if self.goodServerNicknames.has_key(s.getNickname().lower()):
+ for server in directory.getAllServers():
+ lcnick = server.getNickname().lower()
+ if s_goodServerNicknames.has_key(lcnick):
where = 'D'
else:
where = 'D-'
- self.fullServerList.append((s, where))
- self.digestMap[s.getDigest()] = where
+ s_fullServerList.append((server, where))
+ s_digestMap[server.getDigest()] = where
- self.clientVersions = (
+ s_clientVersions = (
directory['Recommended-Software'].get("MixminionClient"))
+ s_serverVersions = (
+ directory['Recommended-Software'].get("MixminionServer"))
break
# Now check the server in DIR/servers.
@@ -230,25 +296,40 @@
# Try to read a file: is it a server descriptor?
p = os.path.join(serverDir, fn)
try:
- # Use validatedDigests *only* when not explicitly forced.
+ # If force is true, we'll always re-check signatures.
+ # Otherwise, we do so only with unrecognized servers.
info = mixminion.ServerInfo.ServerInfo(fname=p, assumeValid=0,
- validatedDigests=self.digestMap)
+ validatedDigests=s_digestMap)
except mixminion.Config.ConfigError:
LOG.warn("Invalid server descriptor %s", p)
continue
mtime = os.stat(p)[stat.ST_MTIME]
- if mtime > self.lastModified:
- self.lastModifed = mtime
- self.serverList.append((info, "I:%s"%fn))
- self.fullServerList.append((info, "I:%s"%fn))
- self.digestMap[info.getDigest()] = "I:%s"%fn
- self.goodServerNicknames[info.getNickname().lower()] = 1
+ if mtime > s_lastModified:
+ s_lastModified = mtime
+ s_serverList.append((info, "I:%s"%fn))
+ s_fullServerList.append((info, "I:%s"%fn))
+ s_digestMap[info.getDigest()] = "I:%s"%fn
+ s_goodServerNicknames[info.getNickname().lower()] = 1
- # Regenerate the cache
- self.__save()
- # Now try reloading, to make sure we can, and to get __rebuildTables.
- self.__scanning = 1
- self.__load()
+ # Now, finally, we're done. Replace the state of this class.
+ try:
+ self._lock.write_in()
+ self.lastModified = s_lastModified
+ self.lastDownload = s_lastDownload
+ self.serverList = s_serverList
+ self.fullServerList = s_fullServerList
+ self.clientVersions = s_clientVersions
+ self.serverVersions = s_serverVersions
+ self.goodServerNicknames = s_goodServerNicknames
+ self.digestMap = s_digestMap
+
+ # Regenerate the cache
+ self.__save()
+ # Now try reloading, to make sure we can, and for __rebuildTables.
+ self.__scanning = 1
+ self.__load()
+ finally:
+ self._lock.write_out()
def __load(self):
"""Helper method. Read the cached parsed descriptors from disk."""
@@ -256,8 +337,9 @@
cached = readPickled(os.path.join(self.dir, "cache"))
magic = cached[0]
if magic == self.MAGIC:
- _, self.lastModified, self.lastDownload, self.clientVersions, \
- self.serverList, self.fullServerList, self.digestMap \
+ _, self.lastModified, self.lastDownload, \
+ self.clientVersions, self.serverList, \
+ self.fullServerList, self.digestMap \
= cached
self.__rebuildTables()
return
@@ -272,12 +354,16 @@
self.rescan()
def __save(self):
- """Helper method. Recreate the cache on disk."""
- data = (self.MAGIC,
- self.lastModified, self.lastDownload,
- self.clientVersions, self.serverList, self.fullServerList,
- self.digestMap)
- writePickled(os.path.join(self.dir, "cache"), data)
+ """Helper method. Recreate the cache on disk. Must hold read lock."""
+ self._diskLock.acquire()
+ try:
+ data = (self.MAGIC,
+ self.lastModified, self.lastDownload,
+ self.clientVersions, self.serverList, self.fullServerList,
+ self.digestMap)
+ writePickled(os.path.join(self.dir, "cache"), data)
+ finally:
+ self._diskLock.release()
def _installAsKeyIDResolver(self):
"""Use this ClientDirectory to identify servers in calls to
@@ -286,77 +372,100 @@
def importFromFile(self, filename):
"""Import a new server descriptor stored in 'filename'"""
+ self._lock.write_in()
+ self._lock.write_out()
+ self._lock.read_in()
+ digestMap = self.digestMap
+ self._lock.read_out()
contents = readPossiblyGzippedFile(filename)
info = mixminion.ServerInfo.ServerInfo(string=contents,
- validatedDigests=self.digestMap)
+ validatedDigests=digestMap)
nickname = info.getNickname()
lcnickname = nickname.lower()
identity = info.getIdentity()
- # Make sure that the identity key is consistent with what we know.
- for s, _ in self.serverList:
- if s.getNickname() == nickname:
- if not mixminion.Crypto.pk_same_public_key(identity,
- s.getIdentity()):
- raise MixError("Identity key changed for server %s in %s"%(
- nickname, filename))
- # Have we already imported this server?
- if self.digestMap.get(info.getDigest(), "X").startswith("I:"):
- raise UIError("Server descriptor is already imported")
+ self._lock.read_in()
+ try:
+ # Make sure that the identity key is consistent with what we know.
+ for s, _ in self.serverList:
+ if s.getNickname() == nickname:
+ if not mixminion.Crypto.pk_same_public_key(identity,
+ s.getIdentity()):
+ raise MixError(
+ "Identity key changed for server %s in %s"%(
+ nickname, filename))
- # Is the server expired?
- if info.isExpiredAt(time.time()):
- raise UIError("Server descriptor is expired")
+ # Have we already imported this server?
+ if self.digestMap.get(info.getDigest(), "X").startswith("I:"):
+ raise UIError("Server descriptor is already imported")
- # Is the server superseded?
- if self.byNickname.has_key(lcnickname):
- if info.isSupersededBy([s for s,_ in self.byNickname[lcnickname]]):
- raise UIError("Server descriptor is already superseded")
+ # Is the server expired?
+ if info.isExpiredAt(time.time()):
+ raise UIError("Server descriptor is expired")
- # Copy the server into DIR/servers.
- fnshort = "%s-%s"%(nickname, formatFnameTime())
- fname = os.path.join(self.dir, "imported", fnshort)
- f = openUnique(fname)[0]
- f.write(contents)
- f.close()
- # Now store into the cache.
- fnshort = os.path.split(fname)[1]
- self.serverList.append((info, 'I:%s'%fnshort))
- self.fullServerList.append((info, 'I:%s'%fnshort))
- self.digestMap[info.getDigest()] = 'I:%s'%fnshort
- self.lastModified = time.time()
- self.__save()
- self.__rebuildTables()
+ # Is the server superseded?
+ if self.byNickname.has_key(lcnickname):
+ if info.isSupersededBy(
+ [s for s,_ in self.byNickname[lcnickname]]):
+ raise UIError("Server descriptor is already superseded")
+ finally:
+ self._lock.read_out()
+
+ self._lock.write_in()
+ try:
+
+ self._diskLock.acquire()
+ try:
+ # Copy the server into DIR/servers.
+ fnshort = "%s-%s"%(nickname, formatFnameTime())
+ fname = os.path.join(self.dir, "imported", fnshort)
+ f = openUnique(fname)[0]
+ f.write(contents)
+ f.close()
+ finally:
+ self._diskLock.release()
+
+ # Now store into the cache.
+ fnshort = os.path.split(fname)[1]
+ self.serverList.append((info, 'I:%s'%fnshort))
+ self.fullServerList.append((info, 'I:%s'%fnshort))
+ self.digestMap[info.getDigest()] = 'I:%s'%fnshort
+ self.lastModified = time.time()
+ self.__rebuildTables()
+ except:
+ self._lock.write_out()
+ raise
+
+ self._lock.write_to_read()
+ try:
+ self.__save()
+ finally:
+ self._lock.read_out()
def expungeByNickname(self, nickname):
"""Remove all imported (non-directory) server nicknamed 'nickname'."""
lcnickname = nickname.lower()
- n = 0 # number removed
- newList = [] # replacement for serverList.
+ badSources = {}
+ try:
+ self._lock.write_in()
+ for info, source in self.serverList:
+ if source[0]=='D' or info.getNickname().lower() != lcnickname:
+ continue
+ badSources[source] = 1
+ finally:
+ self._lock.write_out()
- for info, source in self.serverList:
- if source == 'D' or info.getNickname().lower() != lcnickname:
- newList.append((info, source))
- continue
- n += 1
- try:
- fn = source[2:]
- os.unlink(os.path.join(self.dir, "imported", fn))
- except OSError, e:
- LOG.error("Couldn't remove %s: %s", fn, e)
+ if badSources:
+ self.__removeBySource(badSources)
- self.serverList = newList
- # Recreate cache if needed.
- if n:
- self.rescan()
- return n
+ return len(badSources)
def __rebuildTables(self):
"""Helper method. Reconstruct byNickname, byKeyID,
allServers, and byCapability from the internal state of
- this object. """
+ this object. Caller must hold write lock."""
self.byNickname = {}
self.byKeyID = {}
self.allServers = []
@@ -396,40 +505,45 @@
If 'goodOnly' is true, use only recommended servers.
"""
- result = {}
- if not self.fullServerList:
- return {}
- dirFeatures = [ 'status' ]
- resFeatures = []
- for f in features:
- if f.lower() in dirFeatures:
- resFeatures.append((f, ('+', f.lower())))
- else:
- feature = mixminion.Config.resolveFeatureName(
- f, mixminion.ServerInfo.ServerInfo)
- resFeatures.append((f, feature))
- for sd, _ in self.fullServerList:
- if at and not sd.isValidAt(at):
- continue
- nickname = sd.getNickname()
- isGood = self.goodServerNicknames.get(nickname.lower(), 0)
- if goodOnly and not isGood:
- continue
- va, vu = sd['Server']['Valid-After'], sd['Server']['Valid-Until']
- d = result.setdefault(nickname, {}).setdefault((va,vu), {})
- for feature,(sec,ent) in resFeatures:
- if sec == '+':
- if ent == 'status':
- if isGood:
- d['status'] = "(ok)"
+ self._lock.read_in()
+ try:
+ result = {}
+ if not self.fullServerList:
+ return {}
+ dirFeatures = [ 'status' ]
+ resFeatures = []
+ for f in features:
+ if f.lower() in dirFeatures:
+ resFeatures.append((f, ('+', f.lower())))
+ else:
+ feature = mixminion.Config.resolveFeatureName(
+ f, mixminion.ServerInfo.ServerInfo)
+ resFeatures.append((f, feature))
+ for sd, _ in self.fullServerList:
+ if at and not sd.isValidAt(at):
+ continue
+ nickname = sd.getNickname()
+ isGood = self.goodServerNicknames.get(nickname.lower(), 0)
+ if goodOnly and not isGood:
+ continue
+ va = sd['Server']['Valid-After']
+ vu = sd['Server']['Valid-Until']
+ d = result.setdefault(nickname, {}).setdefault((va,vu), {})
+ for feature,(sec,ent) in resFeatures:
+ if sec == '+':
+ if ent == 'status':
+ if isGood:
+ d['status'] = "(ok)"
+ else:
+ d['status'] = "(not recommended)"
else:
- d['status'] = "(not recommended)"
+ raise AssertionError # Unreached.
else:
- raise AssertionError # Unreached.
- else:
- d[feature] = str(sd.getFeature(sec,ent))
+ d[feature] = str(sd.getFeature(sec,ent))
- return result
+ return result
+ finally:
+ self._lock.read_out()
def __find(self, lst, startAt, endAt):
"""Helper method. Given a list of (ServerInfo, where), return all
@@ -438,6 +552,8 @@
Only one element is returned for each nickname; if multiple
elements with a given nickname are valid over the given time
interval, the most-recently-published one is included.
+
+ Caller must hold read lock.
"""
# FFFF This is not really good: servers may be the same, even if
# FFFF their nicknames are different. The logic should probably
@@ -458,14 +574,20 @@
return u.values()
def getNicknameByKeyID(self, keyid):
- s = self.byKeyID.get(keyid)
- if not s:
- return None
- r = []
- for (desc,_) in s:
- if desc.getNickname().lower() not in r:
- r.append(desc.getNickname())
- return "/".join(r)
+ """Given a keyid, return the nickname of the server with that keyid.
+ Return None if no such server is known."""
+ self._lock.read_in()
+ try:
+ s = self.byKeyID.get(keyid)
+ if not s:
+ return None
+ r = []
+ for (desc,_) in s:
+ if desc.getNickname().lower() not in r:
+ r.append(desc.getNickname())
+ return "/".join(r)
+ finally:
+ self._lock.read_out()
def getNameByRelay(self, routingType, routingInfo):
"""Given a routingType, routingInfo (as string) tuple, return the
@@ -488,7 +610,11 @@
startAt = time.time()
if endAt is None:
endAt = time.time()+self.DEFAULT_REQUIRED_LIFETIME
- return self.__find(self.serverList, startAt, endAt)
+ self._lock.read_in()
+ try:
+ return self.__find(self.serverList, startAt, endAt)
+ finally:
+ self._lock.read_out()
def clean(self, now=None):
"""Remove all expired or superseded descriptors from DIR/servers."""
@@ -496,36 +622,68 @@
now = time.time()
cutoff = now - 600
- # List of (ServerInfo,where) not to scratch.
- newServers = []
- for info, where in self.serverList:
- lcnickname = info.getNickname().lower()
- # Find all other SI's with the same name.
- others = [ s for s, _ in self.byNickname[lcnickname] ]
- # Find all digests of servers with the same name, in the directory.
- inDirectory = [ s.getDigest()
- for s, w in self.byNickname[lcnickname]
- if w in ('D','D-') ]
- if (where not in ('D', 'D-')
- and (info.isExpiredAt(cutoff)
- or info.isSupersededBy(others)
- or info.getDigest() in inDirectory)):
- # If the descriptor is not in the directory, and it is
- # expired, is superseded, or is duplicated by a descriptor
- # from the directory, remove it.
- try:
- os.unlink(os.path.join(self.dir, "imported", where[2:]))
- except OSError, e:
- LOG.info("Couldn't remove %s: %s", where[2:], e)
- else:
- # Don't scratch non-superseded, non-expired servers.
- newServers.append((info, where))
+ self._lock.read_in()
+ try:
+ # List of sources to delete.
+ badSources = {}
+ for info, where in self.fullServerList:
+ lcnickname = info.getNickname().lower()
+ # Find all other SI's with the same name.
+ others = [ s for s, _ in self.byNickname[lcnickname] ]
+ # Find all digests of servers with the same name, in the dir
+ inDirectory = [ s.getDigest()
+ for s, w in self.byNickname[lcnickname]
+ if w in ('D','D-') ]
+ if (where not in ('D', 'D-')
+ and (info.isExpiredAt(cutoff)
+ or info.isSupersededBy(others)
+ or info.getDigest() in inDirectory)):
+ # If the descriptor is not in the directory, and it is
+ # expired, is superseded, or is duplicated by a descriptor
+ # from the directory, remove it.
+ badSources[where]=1
+ finally:
+ self._lock.read_out()
- # If we've actually deleted any servers, replace self.serverList and
- # rebuild.
- if len(self.serverList) != len(newServers):
- self.serverList = newServers
- self.rescan()
+ # If we've actually deleted any servers, adjust.
+ if badSources:
+ self.__removeBySource(badSources)
+
+ def __removeBySource(self, badSources):
+ """Helper method. Removes files from imported list by source"""
+ try:
+ self._lock.write_in()
+ self._diskLock.acquire()
+ try:
+ for s in badSources.keys():
+ fname = os.path.join(self.dir, "imported", s[2:])
+ try:
+ os.unlink(fname)
+ except OSError, e:
+ LOG.info("Couldn't remove %s: %s", s[2:], e)
+ finally:
+ self._diskLock.release()
+ for field in 'fullServerList', 'serverList', 'allServers':
+ val = getattr(self, field)
+ val = [ (i,w) for i,w in val if not badSources.has_key(w) ]
+ setattr(self,field,val)
+ for field in 'byNickname', 'byKeyID', 'byCapability':
+ d = getattr(self,field)
+ for k,v in d.items():
+ v = [ (i,w) for i,w in v if not badSources.has_key(w) ]
+ d[k]=v
+ for k,w in self.digestMap.items():
+ if badSources.has_key(w):
+ del self.digestMap[k]
+ except:
+ self._lock.write_out()
+ raise
+
+ self._lock.write_to_read()
+ try:
+ self.__save()
+ finally:
+ self._lock.read_out()
def getServerInfo(self, name, startAt=None, endAt=None, strict=0):
"""Return the most-recently-published ServerInfo for a given
@@ -546,17 +704,24 @@
return name
else:
LOG.error("Server is not currently valid")
- elif self.byNickname.has_key(name.lower()):
+ return None
+
+ self._lock.read_in()
+ try:
# If it's a nickname, return a serverinfo with that name.
- s = self.__find(self.byNickname[name.lower()], startAt, endAt)
+ lst = self.byNickname.get(name.lower())
+ finally:
+ self._lock.read_out()
- if not s:
+ if lst is not None:
+ sds = self.__find(lst, startAt, endAt)
+ if strict and not sds:
raise UIError(
"Couldn't find any currently live descriptor with name %s"
% name)
-
- s = s[0]
- return s
+ elif not sds:
+ return None
+ return sds[0]
elif os.path.exists(os.path.expanduser(name)):
# If it's a filename, try to read it.
fname = os.path.expanduser(name)
@@ -590,6 +755,17 @@
startAt, endAt -- A duration of time over which the
paths must remain valid.
"""
+ self._lock.read_in()
+ try:
+ return self._generatePaths(nPaths, pathSpec, exitAddress,
+ startAt, endAt, prng)
+ finally:
+ self._lock.read_out()
+
+ def _generatePaths(self, nPaths, pathSpec, exitAddress,
+ startAt=None, endAt=None,
+ prng=None):
+ """Helper: implement generatePaths, without getting lock"""
assert pathSpec.isReply == exitAddress.isReply
if prng is None:
@@ -627,7 +803,7 @@
else:
n1 = len(p1)
- path = self.getPath(p, startAt=startAt, endAt=endAt)
+ path = self._getPath(p, startAt=startAt, endAt=endAt)
path1,path2 = path[:n1], path[n1:]
paths.append( (path1,path2) )
if pathSpec.isReply or pathSpec.isSURB:
@@ -655,6 +831,14 @@
The path selection algorithm is described in path-spec.txt
"""
+ self._lock.read_in()
+ try:
+ return self._getPath(template, startAt, endAt, prng)
+ finally:
+ self._lock.read_out()
+
+ def _getPath(self, template, startAt=None, endAt=None, prng=None):
+ """Helper: implement getPath, without getting lock"""
# Fill in startAt, endAt, prng if not provided
if startAt is None:
startAt = time.time()
@@ -742,6 +926,16 @@
If warnUnrecommended is true, give a warning if the user has
requested any unrecommended servers.
"""
+ self._lock.read_in()
+ try:
+ return self._validatePath(pathSpec, exitAddress, startAt, endAt,
+ warnUnrecommended)
+ finally:
+ self._lock.read_out()
+
+ def _validatePath(self, pathSpec, exitAddress, startAt=None, endAt=None,
+ warnUnrecommended=1):
+ """Helper: implement validatePath without getting lock"""
if startAt is None: startAt = time.time()
if endAt is None: endAt = startAt+self.DEFAULT_REQUIRED_LIFETIME
@@ -799,14 +993,24 @@
warned[lc_nickname] = 1
LOG.warn("Server %s is not recommended",fixed.getNickname())
- def checkClientVersion(self):
+ def checkSoftwareVersion(self,client=1):
"""Check the current client's version against the stated version in
the most recently downloaded directory; print a warning if this
version isn't listed as recommended.
"""
- if not self.clientVersions:
- return
- allowed = self.clientVersions.split()
+ self._lock.read_in()
+ try:
+ if client:
+ if not self.clientVersions:
+ return
+ allowed = self.clientVersions.split()
+ else:
+ if not self.serverVersions:
+ return
+ allowed = self.serverVersions.split()
+ finally:
+ self._lock.read_out()
+
current = mixminion.__version__
if current in allowed:
# This version is recommended.
@@ -1051,7 +1255,7 @@
nickname = desc.getNickname()
if self.headers:
- #XXXX007 remove this eventually.
+ #XXXX008 remove this eventually.
sware = desc['Server'].get("Software","")
if (sware.startswith("Mixminion 0.0.4") or
sware.startswith("Mixminion 0.0.5alpha1")):
Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.148
retrieving revision 1.149
diff -u -d -r1.148 -r1.149
--- ClientMain.py 7 Jan 2004 20:42:49 -0000 1.148
+++ ClientMain.py 8 Jan 2004 22:35:24 -0000 1.149
@@ -463,10 +463,6 @@
else:
handles = self.queuePackets(pktList, routingInfo)
- if len(pktList) > 1:
- mword = "packets"
- else:
- mword = "packet"
packetsSentByIndex = {}
def callback(idx, packetsSentByIndex=packetsSentByIndex):
@@ -490,7 +486,12 @@
try:
clientLock()
if nGood:
+ if len(pktList) > 1:
+ mword = "packets"
+ else:
+ mword = "packet"
LOG.info("... %s %s sent", nGood, mword)
+ LOG.trace("Removing %s successful packets from queue", nGood)
for idx in packetsSentByIndex.keys():
if handles and handles[idx]:
self.queue.removePacket(handles[idx])
@@ -507,8 +508,8 @@
LOG.error("Error with queueing disabled: %s/%s lost",
nBad, nGood+nBad)
elif nBad and lazyQueue:
- LOG.info("Error while delivering %s; %s/%s left in queue",
- mword,nBad,nGood+nBad)
+ LOG.info("Error while delivering packets; %s/%s left in queue",
+ nBad,nGood+nBad)
badPackets = [ pktList[idx] for idx in xrange(len(pktList))
if not packetsSentByIndex.has_key(idx) ]
@@ -893,8 +894,10 @@
if self.wantClientDirectory:
assert self.wantConfig
+ assert _CLIENT_LOCKFILE
LOG.debug("Configuring server list")
- self.directory = mixminion.ClientDirectory.ClientDirectory(userdir)
+ self.directory = mixminion.ClientDirectory.ClientDirectory(
+ userdir, _CLIENT_LOCKFILE)
self.directory._installAsKeyIDResolver()
if self.wantDownload:
@@ -909,7 +912,7 @@
clientUnlock()
if self.wantClientDirectory or self.wantDownload:
- self.directory.checkClientVersion()
+ self.directory.checkSoftwareVersion(client=1)
def parsePath(self):
# Sets: exitAddress, pathSpec.
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.175
retrieving revision 1.176
diff -u -d -r1.175 -r1.176
--- test.py 8 Jan 2004 18:09:49 -0000 1.175
+++ test.py 8 Jan 2004 22:35:24 -0000 1.176
@@ -55,8 +55,10 @@
import mixminion.Filestore
import mixminion.Fragments
import mixminion.MMTPClient
+import mixminion.NetUtils
import mixminion.Packet
import mixminion.ServerInfo
+import mixminion.ThreadUtils
import mixminion.TLSConnection
import mixminion._minionlib as _ml
import mixminion.server.MMTPServer
@@ -722,9 +724,8 @@
"A B C\n-----END X-----\n", ["X"], 1)
def test_clearableQueue(self):
- import Queue
#FFFF This test is inadequate for weird multithreaded
- q = mixminion.Common.ClearableQueue()
+ q = mixminion.ThreadUtils.ClearableQueue()
self.assert_(q.empty())
q.put(1)
q.put(2)
@@ -732,7 +733,20 @@
self.assertEquals(1, q.get())
q.clear()
self.assert_(q.empty())
- self.assertRaises(Queue.Empty, q.get_nowait)
+ self.assertRaises(mixminion.ThreadUtils.QueueEmpty, q.get_nowait)
+
+ def test_rwlock(self):
+ RWLock = mixminion.ThreadUtils.RWLock
+ lock = RWLock()
+ # XXXX007 This only tests non-blocking cases
+ lock.read_in()
+ lock.read_in()
+ lock.read_out()
+ lock.read_out()
+ lock.write_in()
+ lock.write_out()
+ lock.read_in()
+ lock.read_out()
def test_englishSequence(self):
es = englishSequence
@@ -6544,9 +6558,9 @@
self.assertSameSD(edesc["Joe"][1],
ks.getServerInfo("Joe", startAt=now+10*oneDay))
self.assertRaises(MixError, ks.getServerInfo, "Joe",
- startAt=now+30*oneDay)
+ startAt=now+30*oneDay, strict=1)
self.assertRaises(MixError, ks.getServerInfo, "Joe", startAt=now,
- endAt=now+6*oneDay)
+ endAt=now+6*oneDay, strict=1)
if i in (0,1,2):
ks = mixminion.ClientDirectory.ClientDirectory(dirname)
if i == 1:
@@ -6959,7 +6973,7 @@
ks.clean() # Should do nothing.
ks = mixminion.ClientDirectory.ClientDirectory(dirname)
ks.clean(now=now+oneDay*500) # Should zap all of imported servers.
- raises(MixError, ks.getServerInfo, "Lola")
+ raises(MixError, ks.getServerInfo, "Lola", strict=1)
def testFeatureMaps(self):
from mixminion.ClientDirectory import compressFeatureMap
@@ -7501,7 +7515,7 @@
tc = loader.loadTestsFromTestCase
if 0:
- suite.addTest(tc(FragmentTests))
+ suite.addTest(tc(ClientDirectoryTests))
return suite
testClasses = [MiscTests,
MinionlibCryptoTests,
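As the XXXX007 comment in test_rwlock notes, the new test only exercises non-blocking acquisitions. A blocking case could be checked along the following lines; this is a sketch under the same assumed RWLock API and is not part of this commit.

# Sketch (not in this commit): check that a writer blocks while a read
# lock is held, using only the RWLock methods exercised in test_rwlock.
import threading, time
import mixminion.ThreadUtils

def checkWriterBlocksOnReader():
    lock = mixminion.ThreadUtils.RWLock()
    acquired = []
    lock.read_in()                      # hold a read lock in this thread
    def writer():
        lock.write_in()                 # should block until read_out below
        acquired.append(1)
        lock.write_out()
    t = threading.Thread(target=writer)
    t.start()
    time.sleep(0.5)
    assert not acquired                 # writer is still waiting
    lock.read_out()                     # release the read lock
    t.join(5)
    assert acquired                     # writer acquired the lock afterwards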