[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Make ClientDirectory threadsafe, and note that ClientDi...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv23590/lib/mixminion

Modified Files:
	ClientDirectory.py ClientMain.py test.py 
Log Message:
Make ClientDirectory threadsafe, and note that ClientDirectory has gotten entirely too big.

Index: ClientDirectory.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientDirectory.py,v
retrieving revision 1.24
retrieving revision 1.25
diff -u -d -r1.24 -r1.25
--- ClientDirectory.py	7 Jan 2004 02:50:08 -0000	1.24
+++ ClientDirectory.py	8 Jan 2004 22:35:24 -0000	1.25
@@ -17,11 +17,11 @@
 import re
 import socket
 import stat
+import threading
 import time
 import types
 import urllib2
 
-import mixminion.ClientMain #XXXX -- it would be better not to need this.
 import mixminion.Config
 import mixminion.Crypto
 import mixminion.NetUtils
@@ -34,16 +34,21 @@
 from mixminion.Packet import MBOX_TYPE, SMTP_TYPE, DROP_TYPE, FRAGMENT_TYPE, \
      parseMBOXInfo, parseRelayInfoByType, parseSMTPInfo, ParseError, \
      ServerSideFragmentedMessage
+from mixminion.ThreadUtils import RWLock
 
 # FFFF This should be made configurable and adjustable.
 MIXMINION_DIRECTORY_URL = "http://mixminion.net/directory/Directory.gz";
 MIXMINION_DIRECTORY_FINGERPRINT = "CD80DD1B8BE7CA2E13C928D57499992D56579CCD"
 
+#XXXX This class has become unwieldy.  It should get refactored into:
+#XXXX "abstract server set", "directory-based server set", "disk-backed server
+#XXXX set", and "path generator".
+
 class ClientDirectory:
     """A ClientDirectory manages a list of server descriptors, either
        imported from the command line or from a directory."""
     ##Fields:
-    # dir: directory where we store everything.
+    # dir: directory where we store everything.  This field is immutable.
     # lastModified: time when we last modified this directory.
     # lastDownload: time when we last downloaded a directory
     # serverList: List of (ServerInfo, 'D'|'D-'|'I:filename') tuples.  The
@@ -54,15 +59,22 @@
     # digestMap: Map of (Digest -> 'D'|'D-'|'I:filename').
     # byNickname: Map from nickname.lower() to list of (ServerInfo, source)
     #   tuples.
-    # byKeyID: Map from desc.getKeyDigest() to list of ServerInfo.
+    # byKeyID: Map from desc.getKeyDigest() to list of (ServerInfo,source)
     # byCapability: Map from capability ('mbox'/'smtp'/'relay'/None) to
     #    list of (ServerInfo, source) tuples.
     # allServers: Same as byCapability[None]
     # __scanning: Flag to prevent recursive invocation of self.rescan().
+    # __downloading: Flag to prevent simultaneous invocation of 
+    #    downloadDirectory()
     # clientVersions: String of allowable client versions as retrieved
     #    from most recent directory.
+    # serverVersions: String of allowable server versions as retrieved
+    #    from most recent directory.
     # goodServerNicknames: A map from lowercased nicknames of recommended
     #    servers to 1.
+    # _lock: An instance of RWLock, used to synchronize access to all fields.
+    # _diskLock: An instance of RLock or Lockfile, used to synchronize
+    #    access to the disk.
     ## Layout:
     # DIR/cache: A cPickled tuple of ("ClientKeystore-0.3",
     #         lastModified, lastDownload, clientVersions, serverList,
@@ -77,20 +89,27 @@
     #  much need to do this at the client side.)
     DEFAULT_REQUIRED_LIFETIME = 1
 
-    def __init__(self, directory):
+    def __init__(self, directory, diskLock=None):
         """Create a new ClientDirectory to keep directories and descriptors
-           under <directory>."""
+           under <directory>.  If diskLock is provided, acquire diskLock
+           before accessing the disk."""
         self.dir = directory
         createPrivateDir(self.dir)
         createPrivateDir(os.path.join(self.dir, "imported"))
         self.digestMap = {}
         self.__scanning = 0
+        self.__downloading = 0
+        self._lock = RWLock()
+        if diskLock is None:
+            self._diskLock = threading.RLock()
+        else:
+            self._diskLock = diskLock
         try:
-            mixminion.ClientMain.clientLock() # XXXX disentangle
+            self._diskLock.acquire()
             self.__load()
             self.clean()
         finally:
-            mixminion.ClientMain.clientUnlock() # XXXX
+            self._diskLock.release()
 
     def updateDirectory(self, forceDownload=0, timeout=None, now=None):
         """Download a directory from the network as needed."""
@@ -106,6 +125,25 @@
         """Download a new directory from the network, validate it, and
            rescan its servers.  If the operation doesn't complete within
            timeout seconds, raise an error."""
+        try:
+            self._lock.write_in()
+            if self.__downloading:
+                LOG.info("Download already in progress")
+                return 0
+            self.__downloading = 1
+        finally:
+            self._lock.write_out()
+            
+        try:
+            self._downloadDirectory(timeout)
+        finally:
+            self._lock.write_in()
+            self.__downloading = 0
+            self._lock.write_out()
+
+
+    def _downloadDirectory(self, timeout=None):
+        """Helper: implements downloadDirectory but doesn't hold lock."""
         # Start downloading the directory.
         url = MIXMINION_DIRECTORY_URL
         LOG.info("Downloading directory from %s", url)
@@ -153,10 +191,15 @@
         outfile.close()
         # Open and validate the directory
         LOG.info("Validating directory")
+
+        self._lock.read_in()
+        digestMap = self.digestMap
+        self._lock.read_out()
+
         try:
             directory = mixminion.ServerInfo.ServerDirectory(
                 fname=fname,
-                validatedDigests=self.digestMap)
+                validatedDigests=digestMap)
         except mixminion.Config.ConfigError, e:
             raise MixFatalError("Downloaded invalid directory: %s" % e)
 
@@ -166,13 +209,18 @@
         if fp and mixminion.Crypto.pk_fingerprint(identity) != fp:
             raise MixFatalError("Bad identity key on directory")
 
-        tryUnlink(os.path.join(self.dir, "cache"))
-
-        # Install the new directory
-        if gz:
-            replaceFile(fname, os.path.join(self.dir, "dir.gz"))
-        else:
-            replaceFile(fname, os.path.join(self.dir, "dir"))
+        try:
+            self._lock.write_in()
+            self._diskLock.acquire()
+            # Install the new directory
+            tryUnlink(os.path.join(self.dir, "cache"))
+            if gz:
+                replaceFile(fname, os.path.join(self.dir, "dir.gz"))
+            else:
+                replaceFile(fname, os.path.join(self.dir, "dir"))
+        finally:
+            self._diskLock.release()
+            self._lock.write_out()
 
         # And regenerate the cache.
         self.rescan()
@@ -181,46 +229,64 @@
 
     def rescan(self, force=None, now=None):
         """Regenerate the cache based on files on the disk."""
-        self.lastModified = self.lastDownload = -1
-        self.serverList = []
-        self.fullServerList = []
-        self.clientVersions = None
-        self.goodServerNicknames = {}
+
+        # These variables shadow fields of self, until we replace them at the
+        # end.
+        s_lastModified = s_lastDownload = -1
+        s_serverList = []
+        s_fullServerList = []
+        s_clientVersions = None
+        s_serverVersions = None
+        s_goodServerNicknames = {}
 
         if force:
-            self.digestMap = {}
+            s_digestMap = {}
+        else:
+            self._lock.read_in()
+            s_digestMap = self.digestMap.copy()
+            self._lock.read_out()
 
         # Read the servers from the directory.
         gzipFile = os.path.join(self.dir, "dir.gz")
         dirFile = os.path.join(self.dir, "dir")
         for fname in gzipFile, dirFile:
-            if not os.path.exists(fname): continue
-            self.lastDownload = self.lastModified = \
-                                os.stat(fname)[stat.ST_MTIME]
+            self._diskLock.acquire()
             try:
-                directory = mixminion.ServerInfo.ServerDirectory(
-                    fname=fname,
-                    validatedDigests=self.digestMap)
-            except mixminion.Config.ConfigError:
-                LOG.warn("Ignoring invalid directory (!)")
+                if not os.path.exists(fname): continue
+                s_lastDownload = s_lastModified = \
+                                 os.stat(fname)[stat.ST_MTIME]
+                try:
+                    directory = mixminion.ServerInfo.ServerDirectory(
+                        fname=fname,
+                        validatedDigests=s_digestMap)
+                except mixminion.Config.ConfigError:
+                    LOG.warn("Ignoring invalid directory (!)")
+                    directory = None
+            finally:
+                self._diskLock.release()
+
+            if directory is None:
                 continue
 
-            for s in directory.getServers():
-                self.serverList.append((s, 'D'))
-                self.digestMap[s.getDigest()] = 'D'
-                self.goodServerNicknames[s.getNickname().lower()] = 1
+            for server in directory.getServers():
+                s_serverList.append((server, 'D'))
+                s_digestMap[server.getDigest()] = 'D'
+                s_goodServerNicknames[server.getNickname().lower()] = 1
 
-            for s in directory.getAllServers():
-                if self.goodServerNicknames.has_key(s.getNickname().lower()):
+            for server in directory.getAllServers():
+                lcnick = server.getNickname().lower()
+                if s_goodServerNicknames.has_key(lcnick):
                     where = 'D'
                 else:
                     where = 'D-'
 
-                self.fullServerList.append((s, where))
-                self.digestMap[s.getDigest()] = where
+                s_fullServerList.append((server, where))
+                s_digestMap[server.getDigest()] = where
 
-            self.clientVersions = (
+            s_clientVersions = (
                 directory['Recommended-Software'].get("MixminionClient"))
+            s_clientVersions = (
+                directory['Recommended-Software'].get("MixminionServer"))
             break
 
         # Now check the server in DIR/servers.
@@ -230,25 +296,40 @@
             # Try to read a file: is it a server descriptor?
             p = os.path.join(serverDir, fn)
             try:
-                # Use validatedDigests *only* when not explicitly forced.
+                # If force is true, we'll always re-check signatures.
+                # Otherwise, we do so only with unrecognized servers.
                 info = mixminion.ServerInfo.ServerInfo(fname=p, assumeValid=0,
-                                  validatedDigests=self.digestMap)
+                                  validatedDigests=s_digestMap)
             except mixminion.Config.ConfigError:
                 LOG.warn("Invalid server descriptor %s", p)
                 continue
             mtime = os.stat(p)[stat.ST_MTIME]
-            if mtime > self.lastModified:
-                self.lastModifed = mtime
-            self.serverList.append((info, "I:%s"%fn))
-            self.fullServerList.append((info, "I:%s"%fn))
-            self.digestMap[info.getDigest()] = "I:%s"%fn
-            self.goodServerNicknames[info.getNickname().lower()] = 1
+            if mtime > s_lastModified:
+                s_lastModifed = mtime
+            s_serverList.append((info, "I:%s"%fn))
+            s_fullServerList.append((info, "I:%s"%fn))
+            s_digestMap[info.getDigest()] = "I:%s"%fn
+            s_goodServerNicknames[info.getNickname().lower()] = 1
 
-        # Regenerate the cache
-        self.__save()
-        # Now try reloading, to make sure we can, and to get __rebuildTables.
-        self.__scanning = 1
-        self.__load()
+        # Now, finally, we're done.  Replace the state of this class.
+        try:
+            self._lock.write_in()
+            self.lastModified = s_lastModified
+            self.lastDownload = s_lastDownload
+            self.serverList = s_serverList
+            self.fullServerList = s_fullServerList
+            self.clientVersions = s_clientVersions
+            self.serverVersions = s_serverVersions
+            self.goodServerNicknames = s_goodServerNicknames
+            self.digestMap = s_digestMap
+            
+            # Regenerate the cache
+            self.__save()
+            # Now try reloading, to make sure we can, and for __rebuildTables.
+            self.__scanning = 1
+            self.__load()
+        finally:
+            self._lock.write_out()
 
     def __load(self):
         """Helper method. Read the cached parsed descriptors from disk."""
@@ -256,8 +337,9 @@
             cached = readPickled(os.path.join(self.dir, "cache"))
             magic = cached[0]
             if magic == self.MAGIC:
-                _, self.lastModified, self.lastDownload, self.clientVersions, \
-                   self.serverList, self.fullServerList, self.digestMap \
+                _, self.lastModified, self.lastDownload, \
+                   self.clientVersions, self.serverList, \
+                   self.fullServerList, self.digestMap \
                    = cached
                 self.__rebuildTables()
                 return
@@ -272,12 +354,16 @@
         self.rescan()
 
     def __save(self):
-        """Helper method. Recreate the cache on disk."""
-        data = (self.MAGIC,
-                self.lastModified, self.lastDownload,
-                self.clientVersions, self.serverList, self.fullServerList,
-                self.digestMap)
-        writePickled(os.path.join(self.dir, "cache"), data)
+        """Helper method. Recreate the cache on disk.  Must hold read lock."""
+        self._diskLock.acquire()
+        try:
+            data = (self.MAGIC,
+                    self.lastModified, self.lastDownload,
+                    self.clientVersions, self.serverList, self.fullServerList,
+                    self.digestMap)
+            writePickled(os.path.join(self.dir, "cache"), data)
+        finally:
+            self._diskLock.release()
 
     def _installAsKeyIDResolver(self):
         """Use this ClientDirectory to identify servers in calls to
@@ -286,77 +372,100 @@
 
     def importFromFile(self, filename):
         """Import a new server descriptor stored in 'filename'"""
+        self._lock.write_in()
+        self._lock.write_out()
+        self._lock.read_in()
+        digestMap = self.digestMap
+        self._lock.read_out()
 
         contents = readPossiblyGzippedFile(filename)
         info = mixminion.ServerInfo.ServerInfo(string=contents,
-                                               validatedDigests=self.digestMap)
+                                               validatedDigests=digestMap)
 
         nickname = info.getNickname()
         lcnickname = nickname.lower()
         identity = info.getIdentity()
-        # Make sure that the identity key is consistent with what we know.
-        for s, _ in self.serverList:
-            if s.getNickname() == nickname:
-                if not mixminion.Crypto.pk_same_public_key(identity,
-                                                           s.getIdentity()):
-                    raise MixError("Identity key changed for server %s in %s"%(
-                                   nickname, filename))
 
-        # Have we already imported this server?
-        if self.digestMap.get(info.getDigest(), "X").startswith("I:"):
-            raise UIError("Server descriptor is already imported")
+        self._lock.read_in()
+        try:
+            # Make sure that the identity key is consistent with what we know.
+            for s, _ in self.serverList:
+                if s.getNickname() == nickname:
+                    if not mixminion.Crypto.pk_same_public_key(identity,
+                                                              s.getIdentity()):
+                        raise MixError(
+                            "Identity key changed for server %s in %s"%(
+                                nickname, filename))
 
-        # Is the server expired?
-        if info.isExpiredAt(time.time()):
-            raise UIError("Server descriptor is expired")
+            # Have we already imported this server?
+            if self.digestMap.get(info.getDigest(), "X").startswith("I:"):
+                raise UIError("Server descriptor is already imported")
 
-        # Is the server superseded?
-        if self.byNickname.has_key(lcnickname):
-            if info.isSupersededBy([s for s,_ in self.byNickname[lcnickname]]):
-                raise UIError("Server descriptor is already superseded")
+            # Is the server expired?
+            if info.isExpiredAt(time.time()):
+                raise UIError("Server descriptor is expired")
 
-        # Copy the server into DIR/servers.
-        fnshort = "%s-%s"%(nickname, formatFnameTime())
-        fname = os.path.join(self.dir, "imported", fnshort)
-        f = openUnique(fname)[0]
-        f.write(contents)
-        f.close()
-        # Now store into the cache.
-        fnshort = os.path.split(fname)[1]
-        self.serverList.append((info, 'I:%s'%fnshort))
-        self.fullServerList.append((info, 'I:%s'%fnshort))
-        self.digestMap[info.getDigest()] = 'I:%s'%fnshort
-        self.lastModified = time.time()
-        self.__save()
-        self.__rebuildTables()
+            # Is the server superseded?
+            if self.byNickname.has_key(lcnickname):
+                if info.isSupersededBy(
+                    [s for s,_ in self.byNickname[lcnickname]]):
+                    raise UIError("Server descriptor is already superseded")
+        finally:
+            self._lock.read_out()
+
+        self._lock.write_in()
+        try:
+            
+            self._diskLock.acquire()
+            try:
+                # Copy the server into DIR/servers.
+                fnshort = "%s-%s"%(nickname, formatFnameTime())
+                fname = os.path.join(self.dir, "imported", fnshort)
+                f = openUnique(fname)[0]
+                f.write(contents)
+                f.close()
+            finally:
+                self._diskLock.release()
+
+            # Now store into the cache.
+            fnshort = os.path.split(fname)[1]
+            self.serverList.append((info, 'I:%s'%fnshort))
+            self.fullServerList.append((info, 'I:%s'%fnshort))
+            self.digestMap[info.getDigest()] = 'I:%s'%fnshort
+            self.lastModified = time.time()
+            self.__rebuildTables()
+        except:
+            self._lock.write_out()
+            raise
+
+        self._lock.write_to_read()
+        try:
+            self.__save()
+        finally:
+            self._lock.read_out()
 
     def expungeByNickname(self, nickname):
         """Remove all imported (non-directory) server nicknamed 'nickname'."""
         lcnickname = nickname.lower()
-        n = 0 # number removed
-        newList = [] # replacement for serverList.
+        badSources = {}
+        try:
+            self._lock.write_in()
+            for info, source in self.serverList:
+                if source[0]=='D' or info.getNickname().lower() != lcnickname:
+                    continue
+                badSources[source] = 1
+        finally:
+            self._lock.write_out()
 
-        for info, source in self.serverList:
-            if source == 'D' or info.getNickname().lower() != lcnickname:
-                newList.append((info, source))
-                continue
-            n += 1
-            try:
-                fn = source[2:]
-                os.unlink(os.path.join(self.dir, "imported", fn))
-            except OSError, e:
-                LOG.error("Couldn't remove %s: %s", fn, e)
+        if badSources:
+            self.__removeBySource(badSources)
 
-        self.serverList = newList
-        # Recreate cache if needed.
-        if n:
-            self.rescan()
-        return n
+        return len(badSources)
 
     def __rebuildTables(self):
         """Helper method.  Reconstruct byNickname, byKeyID,
            allServers, and byCapability from the internal start of
-           this object.  """
+           this object.  Caller must hold write lock."""
         self.byNickname = {}
         self.byKeyID = {}
         self.allServers = []
@@ -396,40 +505,45 @@
 
            If 'goodOnly' is true, use only recommended servers.
         """
-        result = {}
-        if not self.fullServerList:
-            return {}
-        dirFeatures = [ 'status' ]
-        resFeatures = []
-        for f in features:
-            if f.lower() in dirFeatures:
-                resFeatures.append((f, ('+', f.lower())))
-            else:
-                feature = mixminion.Config.resolveFeatureName(
-                    f, mixminion.ServerInfo.ServerInfo)
-                resFeatures.append((f, feature))
-        for sd, _ in self.fullServerList:
-            if at and not sd.isValidAt(at):
-                continue
-            nickname = sd.getNickname()
-            isGood = self.goodServerNicknames.get(nickname.lower(), 0)
-            if goodOnly and not isGood:
-                continue
-            va, vu = sd['Server']['Valid-After'], sd['Server']['Valid-Until']
-            d = result.setdefault(nickname, {}).setdefault((va,vu), {})
-            for feature,(sec,ent) in resFeatures:
-                if sec == '+':
-                    if ent == 'status':
-                        if isGood:
-                            d['status'] = "(ok)"
+        self._lock.read_in()
+        try:
+            result = {}
+            if not self.fullServerList:
+                return {}
+            dirFeatures = [ 'status' ]
+            resFeatures = []
+            for f in features:
+                if f.lower() in dirFeatures:
+                    resFeatures.append((f, ('+', f.lower())))
+                else:
+                    feature = mixminion.Config.resolveFeatureName(
+                        f, mixminion.ServerInfo.ServerInfo)
+                    resFeatures.append((f, feature))
+            for sd, _ in self.fullServerList:
+                if at and not sd.isValidAt(at):
+                    continue
+                nickname = sd.getNickname()
+                isGood = self.goodServerNicknames.get(nickname.lower(), 0)
+                if goodOnly and not isGood:
+                    continue
+                va = sd['Server']['Valid-After']
+                vu = sd['Server']['Valid-Until']
+                d = result.setdefault(nickname, {}).setdefault((va,vu), {})
+                for feature,(sec,ent) in resFeatures:
+                    if sec == '+':
+                        if ent == 'status':
+                            if isGood:
+                                d['status'] = "(ok)"
+                            else:
+                                d['status'] = "(not recommended)"
                         else:
-                            d['status'] = "(not recommended)"
+                            raise AssertionError # Unreached.
                     else:
-                        raise AssertionError # Unreached.
-                else:
-                    d[feature] = str(sd.getFeature(sec,ent))
+                        d[feature] = str(sd.getFeature(sec,ent))
 
-        return result
+            return result
+        finally:
+            self._lock.read_out()
 
     def __find(self, lst, startAt, endAt):
         """Helper method.  Given a list of (ServerInfo, where), return all
@@ -438,6 +552,8 @@
            Only one element is returned for each nickname; if multiple
            elements with a given nickname are valid over the given time
            interval, the most-recently-published one is included.
+
+           Caller must hold read lock.
            """
         # FFFF This is not really good: servers may be the same, even if
         # FFFF their nicknames are different.  The logic should probably
@@ -458,14 +574,20 @@
         return u.values()
 
     def getNicknameByKeyID(self, keyid):
-        s = self.byKeyID.get(keyid)
-        if not s:
-            return None
-        r = []
-        for (desc,_) in s:
-            if desc.getNickname().lower() not in r:
-                r.append(desc.getNickname())
-        return "/".join(r)
+        """Given a keyid, return the nickname of the server with that keyid.
+           Return None if no such server is known."""
+        self._lock.read_in()
+        try:
+            s = self.byKeyID.get(keyid)
+            if not s:
+                return None
+            r = []
+            for (desc,_) in s:
+                if desc.getNickname().lower() not in r:
+                    r.append(desc.getNickname())
+            return "/".join(r)
+        finally:
+            self._lock.read_out()
 
     def getNameByRelay(self, routingType, routingInfo):
         """Given a routingType, routingInfo (as string) tuple, return the
@@ -488,7 +610,11 @@
             startAt = time.time()
         if endAt is None:
             endAt = time.time()+self.DEFAULT_REQUIRED_LIFETIME
-        return self.__find(self.serverList, startAt, endAt)
+        self._lock.read_in()
+        try:
+            return self.__find(self.serverList, startAt, endAt)
+        finally:
+            self._lock.read_out()
 
     def clean(self, now=None):
         """Remove all expired or superseded descriptors from DIR/servers."""
@@ -496,36 +622,68 @@
             now = time.time()
         cutoff = now - 600
 
-        # List of (ServerInfo,where) not to scratch.
-        newServers = []
-        for info, where in self.serverList:
-            lcnickname = info.getNickname().lower()
-            # Find all other SI's with the same name.
-            others = [ s for s, _ in self.byNickname[lcnickname] ]
-            # Find all digests of servers with the same name, in the directory.
-            inDirectory = [ s.getDigest()
-                            for s, w in self.byNickname[lcnickname]
-                            if w in ('D','D-') ]
-            if (where not in ('D', 'D-')
-                and (info.isExpiredAt(cutoff)
-                     or info.isSupersededBy(others)
-                     or info.getDigest() in inDirectory)):
-                # If the descriptor is not in the directory, and it is
-                # expired, is superseded, or is duplicated by a descriptor
-                # from the directory, remove it.
-                try:
-                    os.unlink(os.path.join(self.dir, "imported", where[2:]))
-                except OSError, e:
-                    LOG.info("Couldn't remove %s: %s", where[2:], e)
-            else:
-                # Don't scratch non-superseded, non-expired servers.
-                newServers.append((info, where))
+        self._lock.read_in()
+        try:
+            # List of sources to delete.
+            badSources = {}
+            for info, where in self.fullServerList:
+                lcnickname = info.getNickname().lower()
+                # Find all other SI's with the same name.
+                others = [ s for s, _ in self.byNickname[lcnickname] ]
+                # Find all digests of servers with the same name, in the dir
+                inDirectory = [ s.getDigest()
+                                for s, w in self.byNickname[lcnickname]
+                                if w in ('D','D-') ]
+                if (where not in ('D', 'D-')
+                    and (info.isExpiredAt(cutoff)
+                         or info.isSupersededBy(others)
+                         or info.getDigest() in inDirectory)):
+                    # If the descriptor is not in the directory, and it is
+                    # expired, is superseded, or is duplicated by a descriptor
+                    # from the directory, remove it.
+                    badSources[where]=1
+        finally:
+            self._lock.read_out()
 
-        # If we've actually deleted any servers, replace self.serverList and
-        # rebuild.
-        if len(self.serverList) != len(newServers):
-            self.serverList = newServers
-            self.rescan()
+        # If we've actually deleted any servers, adjust.
+        if badSources:
+            self.__removeBySource(badSources)
+
+    def __removeBySource(self, badSources):
+        """Helper method.  Removes files from imported list by source"""
+        try:
+            self._lock.write_in()
+            self._diskLock.acquire()
+            try:
+                for s in badSources.keys():
+                    fname = os.path.join(self.dir, "imported", s[2:])
+                    try:
+                        os.unlink(fname)
+                    except OSError, e:
+                        LOG.info("Couldn't remove %s: %s", s[2:], e)
+            finally:
+                self._diskLock.release()
+            for field in 'fullServerList', 'serverList', 'allServers':
+                val = getattr(self, field)
+                val = [ (i,w) for i,w in val if not badSources.has_key(w) ]
+                setattr(self,field,val)
+            for field in 'byNickname', 'byKeyID', 'byCapability':
+                d = getattr(self,field)
+                for k,v in d.items():
+                    v = [ (i,w) for i,w in v if not badSources.has_key(w) ]
+                    d[k]=v
+            for k,w in self.digestMap.items():
+                if badSources.has_key(w):
+                    del self.digestMap[k]
+        except:
+            self._lock.write_out()
+            raise
+
+        self._lock.write_to_read()
+        try:
+            self.__save()
+        finally:
+            self._lock.read_out()
 
     def getServerInfo(self, name, startAt=None, endAt=None, strict=0):
         """Return the most-recently-published ServerInfo for a given
@@ -546,17 +704,24 @@
                 return name
             else:
                 LOG.error("Server is not currently valid")
-        elif self.byNickname.has_key(name.lower()):
+                return None
+
+        self._lock.read_in()
+        try:
             # If it's a nickname, return a serverinfo with that name.
-            s = self.__find(self.byNickname[name.lower()], startAt, endAt)
+            lst = self.byNickname.get(name.lower())
+        finally:
+            self._lock.read_out()
 
-            if not s:
+        if lst is not None:
+            sds = self.__find(lst, startAt, endAt)
+            if strict and not sds:
                 raise UIError(
                     "Couldn't find any currently live descriptor with name %s"
                     % name)
-
-            s = s[0]
-            return s
+            elif not sds:
+                return None
+            return sds[0]
         elif os.path.exists(os.path.expanduser(name)):
             # If it's a filename, try to read it.
             fname = os.path.expanduser(name)
@@ -590,6 +755,17 @@
                 startAt, endAt -- A duration of time over which the
                    paths must remain valid.
         """
+        self._lock.read_in()
+        try:
+            return self._generatePaths(nPaths, pathSpec, exitAddress,
+                                       startAt, endAt, prng)
+        finally:
+            self._lock.read_out()
+
+    def _generatePaths(self, nPaths, pathSpec, exitAddress,
+                       startAt=None, endAt=None,
+                       prng=None):
+        """Helper: implement generatePaths, without getting lock"""
         assert pathSpec.isReply == exitAddress.isReply
 
         if prng is None:
@@ -627,7 +803,7 @@
             else:
                 n1 = len(p1)
 
-            path = self.getPath(p, startAt=startAt, endAt=endAt)
+            path = self._getPath(p, startAt=startAt, endAt=endAt)
             path1,path2 = path[:n1], path[n1:]
             paths.append( (path1,path2) )
             if pathSpec.isReply or pathSpec.isSURB:
@@ -655,6 +831,14 @@
 
            The path selection algorithm is described in path-spec.txt
         """
+        self._lock.read_in()
+        try:
+            return self._getPath(template, startAt, endAt, prng)
+        finally:
+            self._lock.read_out()
+
+    def _getPath(self, template, startAt=None, endAt=None, prng=None):
+        """Helper: implement getPath, without getting lock"""
         # Fill in startAt, endAt, prng if not provided
         if startAt is None:
             startAt = time.time()
@@ -742,6 +926,16 @@
            If warnUnrecommended is true, give a warning if the user has
            requested any unrecommended servers.
            """
+        self._lock.read_in()
+        try:
+            return self._validatePath(pathSpec, exitAddress, startAt, endAt,
+                                      warnUnrecommended)
+        finally:
+            self._lock.read_out()
+
+    def _validatePath(self, pathSpec, exitAddress, startAt=None, endAt=None,
+                     warnUnrecommended=1):    
+        """Helper: implement validatePath without getting lock"""
         if startAt is None: startAt = time.time()
         if endAt is None: endAt = startAt+self.DEFAULT_REQUIRED_LIFETIME
 
@@ -799,14 +993,24 @@
                 warned[lc_nickname] = 1
                 LOG.warn("Server %s is not recommended",fixed.getNickname())
 
-    def checkClientVersion(self):
+    def checkSoftwareVersion(self,client=1):
         """Check the current client's version against the stated version in
            the most recently downloaded directory; print a warning if this
            version isn't listed as recommended.
            """
-        if not self.clientVersions:
-            return
-        allowed = self.clientVersions.split()
+        self._lock.read_in()
+        try:
+            if client:
+                if not self.clientVersions:
+                    return
+                allowed = self.clientVersions.split()
+            else:
+                if not self.serverVersions:
+                    return
+                allowed = self.serverVersions.split()
+        finally:
+            self._lock.read_out()
+
         current = mixminion.__version__
         if current in allowed:
             # This version is recommended.
@@ -1051,7 +1255,7 @@
         nickname = desc.getNickname()
 
         if self.headers:
-            #XXXX007 remove this eventually.
+            #XXXX008 remove this eventually.
             sware = desc['Server'].get("Software","")
             if (sware.startswith("Mixminion 0.0.4") or
                 sware.startswith("Mixminion 0.0.5alpha1")):

Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.148
retrieving revision 1.149
diff -u -d -r1.148 -r1.149
--- ClientMain.py	7 Jan 2004 20:42:49 -0000	1.148
+++ ClientMain.py	8 Jan 2004 22:35:24 -0000	1.149
@@ -463,10 +463,6 @@
         else:
             handles = self.queuePackets(pktList, routingInfo)
 
-        if len(pktList) > 1:
-            mword = "packets"
-        else:
-            mword = "packet"
 
         packetsSentByIndex = {}
         def callback(idx, packetsSentByIndex=packetsSentByIndex):
@@ -490,7 +486,12 @@
         try:
             clientLock()
             if nGood:
+                if len(pktList) > 1:
+                    mword = "packets"
+                else:
+                    mword = "packet"
                 LOG.info("... %s %s sent", nGood, mword)
+                LOG.trace("Removing %s successful packets from queue", nGood)
             for idx in packetsSentByIndex.keys():
                 if handles and handles[idx]:
                     self.queue.removePacket(handles[idx])
@@ -507,8 +508,8 @@
                 LOG.error("Error with queueing disabled: %s/%s lost",
                           nBad, nGood+nBad)
             elif nBad and lazyQueue:
-                LOG.info("Error while delivering %s; %s/%s left in queue",
-                         mword,nBad,nGood+nBad)
+                LOG.info("Error while delivering packets; %s/%s left in queue",
+                         nBad,nGood+nBad)
 
                 badPackets = [ pktList[idx] for idx in xrange(len(pktList))
                                if not packetsSentByIndex.has_key(idx) ]
@@ -893,8 +894,10 @@
 
         if self.wantClientDirectory:
             assert self.wantConfig
+            assert _CLIENT_LOCKFILE
             LOG.debug("Configuring server list")
-            self.directory = mixminion.ClientDirectory.ClientDirectory(userdir)
+            self.directory = mixminion.ClientDirectory.ClientDirectory(
+                userdir, _CLIENT_LOCKFILE)
             self.directory._installAsKeyIDResolver()
 
         if self.wantDownload:
@@ -909,7 +912,7 @@
                     clientUnlock()
 
         if self.wantClientDirectory or self.wantDownload:
-            self.directory.checkClientVersion()
+            self.directory.checkSoftwareVersion(client=1)
 
     def parsePath(self):
         # Sets: exitAddress, pathSpec.

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.175
retrieving revision 1.176
diff -u -d -r1.175 -r1.176
--- test.py	8 Jan 2004 18:09:49 -0000	1.175
+++ test.py	8 Jan 2004 22:35:24 -0000	1.176
@@ -55,8 +55,10 @@
 import mixminion.Filestore
 import mixminion.Fragments
 import mixminion.MMTPClient
+import mixminion.NetUtils
 import mixminion.Packet
 import mixminion.ServerInfo
+import mixminion.ThreadUtils
 import mixminion.TLSConnection
 import mixminion._minionlib as _ml
 import mixminion.server.MMTPServer
@@ -722,9 +724,8 @@
                           "A B C\n-----END X-----\n", ["X"], 1)
 
     def test_clearableQueue(self):
-        import Queue
         #FFFF This test is inadequate for weird multithreaded
-        q = mixminion.Common.ClearableQueue()
+        q = mixminion.ThreadUtils.ClearableQueue()
         self.assert_(q.empty())
         q.put(1)
         q.put(2)
@@ -732,7 +733,20 @@
         self.assertEquals(1, q.get())
         q.clear()
         self.assert_(q.empty())
-        self.assertRaises(Queue.Empty, q.get_nowait)
+        self.assertRaises(mixminion.ThreadUtils.QueueEmpty, q.get_nowait)
+
+    def test_rwlock(self):
+        RWLock = mixminion.ThreadUtils.RWLock
+        lock = RWLock()
+        # XXXX007 This only tests non-blocking cases
+        lock.read_in()
+        lock.read_in()
+        lock.read_out()
+        lock.read_out()
+        lock.write_in()
+        lock.write_out()
+        lock.read_in()
+        lock.read_out()
 
     def test_englishSequence(self):
         es = englishSequence
@@ -6544,9 +6558,9 @@
             self.assertSameSD(edesc["Joe"][1],
                               ks.getServerInfo("Joe", startAt=now+10*oneDay))
             self.assertRaises(MixError, ks.getServerInfo, "Joe",
-                              startAt=now+30*oneDay)
+                              startAt=now+30*oneDay, strict=1)
             self.assertRaises(MixError, ks.getServerInfo, "Joe", startAt=now,
-                              endAt=now+6*oneDay)
+                              endAt=now+6*oneDay, strict=1)
             if i in (0,1,2):
                 ks = mixminion.ClientDirectory.ClientDirectory(dirname)
             if i == 1:
@@ -6959,7 +6973,7 @@
         ks.clean() # Should do nothing.
         ks = mixminion.ClientDirectory.ClientDirectory(dirname)
         ks.clean(now=now+oneDay*500) # Should zap all of imported servers.
-        raises(MixError, ks.getServerInfo, "Lola")
+        raises(MixError, ks.getServerInfo, "Lola", strict=1)
 
     def testFeatureMaps(self):
         from mixminion.ClientDirectory import compressFeatureMap
@@ -7501,7 +7515,7 @@
     tc = loader.loadTestsFromTestCase
 
     if 0:
-        suite.addTest(tc(FragmentTests))
+        suite.addTest(tc(ClientDirectoryTests))
         return suite
     testClasses = [MiscTests,
                    MinionlibCryptoTests,