[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Document and debug changes to ClientDirectory.



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv16549

Modified Files:
	ClientDirectory.py 
Log Message:
Document and debug changes to ClientDirectory.

When downloading directories, compare HTTP "Date" header to the
current date, and warn about skew.



Index: ClientDirectory.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientDirectory.py,v
retrieving revision 1.41
retrieving revision 1.42
diff -u -d -r1.41 -r1.42
--- ClientDirectory.py	17 May 2004 21:40:15 -0000	1.41
+++ ClientDirectory.py	27 Jul 2004 03:05:23 -0000	1.42
@@ -21,6 +21,7 @@
 import threading
 import time
 import types
+import rfc822
 import urllib2
 
 import mixminion.Config
@@ -49,13 +50,27 @@
     """Raised when we have downloaded an invalid directory."""
 
 class _DescriptorSourceSharedState:
-    """DOCDOC"""
+    """Holds data shared in common across several descriptor sources.
+       Remembers which descriptors we've validated in the past, so we don't
+       need to do public-key operations when we see a descriptor more than
+       once.
+    """
+    ##Fields:
+    # digestMap: A map from 20-byte descriptor digests to server descriptor
+    #   expiry times.
+    # _changed: True iff digestMap has been changed since we last loaded or
+    #   saved.  (When saving, other classes set _changed to 0 for us.)
+
+    # Used to identify version when pickling
     MAGIC = "DSSS-0.1"
+    # How long do we hang on expired digests?
     EXPIRY_SLOPPINESS = 7200
     def __init__(self):
+        """Create a new _DescriptorSourceSharedState"""
         self.digestMap = {}
         self._changed = 1
     def clean(self, now=None):
+        """Forget about all descriptor digests that are expired."""
         if now is None:
             now = time.time()
         cutoff = now + self.EXPIRY_SLOPPINESS
@@ -64,10 +79,17 @@
                 del self.digestMap[k]
                 self._changed = 1
     def hasChanged(self):
+        """Return true iff this object has changed since we last loaded,
+           or since we last explicitly set self._changed to false."""
         return self._changed
     def _addDigest(self,s):
-        self.digestMap[s.getDigest()] = s['Server']['Valid-Until']
-        self._changed = 1
+        """Add the digest for the ServerInfo 's' to the set of validated
+           descriptors.
+        """
+        d = s.getDigest()
+        if not self.digestMap.has_key(d):
+            self.digestMap[d] = s['Server']['Valid-Until']
+            self._changed = 1
     def __getstate__(self):
         return self.MAGIC, self.digestMap
     def __setstate__(self,state):
@@ -81,36 +103,70 @@
             self._changed = 0
 
 class DescriptorSource:
-    """DOCDOC"""
+    """Abstract class for a container holding server descriptors.
+
+       Note that DescriptorSources are managed by
+       CachingDescriptorSource and loadCachingDescriptorSource below;
+       subclasses should assume that they are called accordingly.
+    """
+    ## Fields:
+    # _s: an instance of _DescriptorSourceSharedState
+    # _changed: true iff there is information in this DescriptorSource that
+    #   needs to be flushed to disk.
     def __init__(self):
+        """Create a new DescriptorSource"""
         assert self.__class__ != DescriptorSource
         self._s = None
         self._changed = 0
     def hasChanged(self):
+        """Return true iff self has information that needs to be
+           flushed to disk"""
         return self._changed
     def getServerList(self):
+        """Return a list of all ServerInfo objects in self."""
         raise NotImplemented()
     def getRecommendedNicknames(self):
+        """Return a list of the nicknames of all 'recommended' ServerInfo
+           objects in self."""
         return []
     def configure(self, config):
+        """Configure self based on a client or server configuration in
+           'config'. """
         pass
     def update(self, force=0, now=None, lock=None):
+        """Retrieve any new information needed for this store, for
+           example by downloading a fresh directory.  If 'force' is
+           true, then update the information regardless of whether we
+           think we need to.  If 'lock' is provided, use 'lock' to protect
+           the critical sections of the update.
+        """
         pass
     def rescan(self, force=0):
+        """Update the state of self based on any local changes."""
         pass
     def clean(self, now=None):
+        """Remove any out-of-date information from this object."""
         pass
     def save(self):
+        """Flush any changes from this object to disk."""
         self._changed = 0
     def _setSharedState(self,state):
+        """Helper: set the shared state for this object to 'state'."""
         self._s = state
 
 class FSBackedDescriptorSource(DescriptorSource):
-    # directory
-    # servers { fname -> (mtime, server descriptor) }
+    """An FSBackedDescriptorSource holds a number of server descriptors in a
+       filesystem, one per file.  All files are kept in a single directory,
+       and are managed by the FSBackedDescriptorSource object.
+    """
+    ## Fields:
+    # directory: the location for this store on the filesystem
+    # servers: A map from filename within the directory to tuples of
+    #     (file mtime, ServerInfo).
     MAGIC = "FBBDS-0.1"
     EXPIRY_SLOPPINESS = 7200
     def __init__(self, state):
+        """Create a new FSBackedDescriptorSource"""
         DescriptorSource.__init__(self)
         self._setSharedState(state)
         self.directory = None
@@ -122,26 +178,38 @@
         return [ s for _,s in self.servers.values() ]
 
     def getRecommendedNicknames(self):
-        # XXXX008 !!!! Major security implications here. Reconsider.
+        # XXXX008 !!!! Major security implications here: are all the
+        # descriptors on disk considered recommended?  Reconsider
+        # this.
         return [ s.getNickname().lower() for _,s in self.servers.values() ]
 
     def configure(self, config):
+        """Set up the directory where imported descriptors are stored."""
         self.directory = os.path.join(config.getDirectoryRoot(),
                                       "imported")
         createPrivateDir(self.directory)
 
     def rescan(self, force=0):
+        """Scan all the files in the underlying directory.  If there are any
+           new descriptors, or if any descriptors have changed, then
+           rescan them.  If force is true, reload all descriptors no matter
+           what.
+        """
         if self.directory is None:
             return
         if force:
             self.servers = {}
+            self._changed = 1
 
+        fnames = {}
+        # Rescan any files that are new, or that have changed.
         for fname in os.listdir(self.directory):
+            fnames[fname] = 1
             fullname = os.path.join(self.directory, fname)
             try:
                 mtime = long(os.stat(fullname)[stat.ST_MTIME])
             except OSError:
-                LOG.warn("Missing file %s", fullname)
+                LOG.warn("Unable to stat file %s", fullname)
                 del self.servers[fname]
                 continue
             if (self.servers.has_key(fname) and
@@ -157,9 +225,15 @@
                 continue
             self.servers[fname] = (mtime, s)
             self._s._addDigest(s)
-        self._changed = 1
+            self._changed = 1
+        # Remove any servers whose files are missing.
+        for fname in self.servers.keys():
+            if not fnames.has_key(fname):
+                del self.servers[fname]
+                self._changed = 1
 
     def clean(self, now=None):
+        """Remove all expired or superseded servers from the directory."""
         if now is None:
             now = time.time()
         cutoff = now + self.EXPIRY_SLOPPINESS
@@ -181,9 +255,15 @@
             self._removeOne(fname)
 
     def _removeOne(self, fname):
+        """Helper: remove the file fname from the directory."""
         tryUnlink(os.path.join(self.directory, fname))
 
     def importFromFile(self, sourceFname):
+        """Read and validate a descriptor stored in the possibly
+           gzipped file 'sourceFname'.  If the descriptor is valid, and not
+           already stored in this directory, and not superseded, then copy it
+           in.  Does *not* remove any descriptors superseded by 'sourceFname'.
+        """
         contents = readPossiblyGzippedFile(sourceFname)
         try:
             s = mixminion.ServerInfo.ServerInfo(string=contents,assumeValid=0)
@@ -213,6 +293,8 @@
         self._changed = 1
 
     def expungeByNickname(self, nickname):
+        """Remove all descriptors for the server 'nickname' from the directory.
+        """
         badFnames = {}
         for fname, (_, sd) in self.servers.items():
             if sd.getNickname().lower() == nickname.lower():
@@ -239,9 +321,22 @@
             self._changed = 0
 
 class DirectoryBackedDescriptorSource(DescriptorSource):
-    """DOCDOC"""
+    """A DirectoryBackedDescriptorSource gets server descriptors by
+        reading directories from directory servers, and caching them
+        on disk.
+    """
+    ## Fields:
+    # fnameBase: The name of the file where we'll store a cached directory.
+    #   We may append '.gz' or '_new' or '_new.gz' as appropriate.
+    # serverDir: An instance of mixminion.ServerInfo.ServerDirectory, or
+    #   None.
+    # lastDownload: When did we last download the directory?
+    # __downloading: Boolean: are we currently downloading a new directory?
+    # timeout: How long do we wait when trying to download?  A number
+    #   of seconds, or None.
     MAGIC = "BDBS-0.1"
     def __init__(self, state):
+        """Create a new DirectoryBackedDescriptorSource"""
         DescriptorSource.__init__(self)
         self._setSharedState(state)
         self.fnameBase = None
@@ -264,15 +359,14 @@
             return self.serverDir.getRecommendedNicknames()
 
     def getRecommendedVersions(self):
+        """Return a 2-tuple of the software versions recommended for clients
+           and servers by the directory."""
         if self.serverDir == None:
             return [], []
         sec = self.serverDir['Recommended-Software']
         return (sec.get("MixminionClient",[]),
                 sec.get("MixminionServer",[]))
 
-    def getRecommendedServerVersions(self):
-        return self.serverDir['Recommended-Software'].get("MixminionClient",[])
-
     def configure(self, config):
         self.fnameBase = os.path.join(config.getDirectoryRoot(), "dir")
         createPrivateDir(config.getDirectoryRoot())
@@ -287,6 +381,7 @@
             return
         try:
             serverDir = None
+            # Check "dir" and "dir.gz"
             for ext in "", ".gz":
                 fname = self.fnameBase + ext
                 if not os.path.exists(fname):
@@ -306,10 +401,15 @@
         self._changed = 1
 
     def update(self, force=0, now=None, lock=None):
-        self.updateDirectory(forceDownload=force,now=now)
+        self.updateDirectory(forceDownload=force,now=now,lock=lock)
 
     def updateDirectory(self, forceDownload=0, url=None,
                         now=None, lock=None):
+        """Download a directory if necessary, or if 'forceDownload' is true.
+
+           Same behavior as self.update, except that you may configure
+           a non-default URL.
+        """
         if now is None:
             now = time.time()
         if url is None:
@@ -317,12 +417,13 @@
 
         if (self.serverDir is None or forceDownload or
             self.lastDownload < previousMidnight(now)):
-            self.downloadDirectory(url=url,lock=None)
+            self.downloadDirectory(url=url,lock=lock)
         else:
             LOG.debug("Directory is up to date.")
 
     def downloadDirectory(self, url=MIXMINION_DIRECTORY_URL,
                           lock=None):
+        """Fetch a new directory."""
         if self.__downloading:
             LOG.info("Download already in progress")
             return 0
@@ -330,7 +431,29 @@
         self._downloadDirectoryImpl(url,lock)
         self.__downloading = 0
 
+    def _warnIfSkewed(self, dateHeader, expected=None):
+        """We just fetched a directory and got 'dateHeader' as the
+           Date header in the HTTP response.  Parse the date, and warn
+           if the date is too far from what we believe the current
+           time to be.
+        """
+        if expected is None:
+            expected = time.time()
+        try:
+            parsedDate = rfc822.parsedate_tz(dateHeader)
+        except ValueError:
+            LOG.warn("Invalid date header from directory: %r",dateHeader)
+            return
+        if not parsedDate: return
+
+        LOG.trace("Directory server said date is %r", dateHeader)
+        skew = (expected - rfc822.mktime_tz(parsedDate))/60.0
+        if abs(skew) > 30:
+            LOG.warn("The directory said that the date is %r; we are skewed by %+d minutes",
+                     dateHeader, skew)
+
     def _downloadDirectoryImpl(self, url, lock=None):
+        """Helper function: does the actual work of fetching a directory."""
         LOG.info("Downloading directory from %s", url)
         # XXXX Refactor download logic.
         if self.timeout:
@@ -345,6 +468,7 @@
                 request = urllib2.Request(url,
                           headers={ 'Pragma' : 'no-cache',
                                     'Cache-Control' : 'no-cache', })
+                startTime = time.time()
                 infile = urllib2.urlopen(request)
             except IOError, e:
                 #XXXX008 the "-D no" note makes no sense for servers.
@@ -381,6 +505,9 @@
         infile.close()
         outfile.close()
 
+        dateHeader = infile.info().get("Date","")
+        if dateHeader: self._warnIfSkewed(dateHeader, expected=startTime)
+
         # Open and validate the directory
         LOG.info("Validating directory")
 
@@ -436,9 +563,18 @@
         self.timeout = None
 
 class CachingDescriptorSource(DescriptorSource):
-    """DOCDOC"""
+    """A CachingDescriptorSource aggregates several base DescriptorSources,
+       combines their results, and handles caching their descriptors.
+
+       Method calls to methods supported only by a single DescriptorSource
+       are delegated to the appropriate object.
+    """
+    ##Fields:
+    # bases: a list of DescriptorSource objects to delegate to.
+    # cacheFile: filename to store our cache in.
     MAGIC = "CDS-0.1"
     def __init__(self,state):
+        """Create a new CachingDescriptorSource."""
         DescriptorSource.__init__(self)
         self.bases = []
         self._setSharedState(state)
@@ -513,14 +649,23 @@
         self._s._changed = 0
 
     def __getattr__(self, attr):
+        candidate = None
         for b in self.bases:
             o = getattr(b,attr,None)
             if isinstance(o, types.MethodType):
-                return o
-        raise AttributeError(attr)
+                if candidate is None:
+                    candidate = o
+                else:
+                    raise AttributeError("Too many options for %s"%attr)
+        if candidate is None:
+            raise AttributeError(attr)
+        else:
+            return candidate
 
 def loadCachingDescriptorSource(config):
-    """DOCDOC"""
+    """Return an instance of CachingDescriptorSource for our current
+       configuration, loading it from disk as necessary.
+    """
     if hasattr(config, 'isServerConfig') and config.isServerConfig():
         isServer = 1
     else:
@@ -556,6 +701,28 @@
     return store
 
 class ClientDirectory:
+    """Utility wrapper around a CachingDescriptorSource to handle common
+       functionality such as server lookup, path generation, and so on.
+    """
+    ## Fields:
+    # _lock: An instance of RWLock; protects all modifications to self or
+    #    to the underlying sources.
+    # _diskLock: A lock to protect all access to the disk.
+    # store: An instance of DescriptorSource.
+    ## Fields derived from self.source:
+    # allServers: A list of all known server descriptors.
+    # clientVersions, serverVersions: Lists of recommended software.
+    # goodNicknames: a dict whose keys are all recommended
+    #   nicknames. (lowercase)
+    # goodServers: A list of all server descriptors whose nicknames
+    #   are recommended.
+    # byNickname: A map from lowercase nickname to a list of ServerInfo with
+    #   that nickname.
+    # byKeyID: A map from identity key digest to a list of ServerInfo for that
+    #   server.
+    # blockedNicknames: a map from lowercase nickname to a list of the purposes
+    #   ('entry', 'exit', or '*') for which the corresponding server shouldn't
+    #   be selected in automatic path generation.  Set by configure.
     def __init__(self, config=None, store=None, diskLock=None):
         self._lock = RWLock()
         if diskLock is None:
@@ -575,7 +742,10 @@
         self.__scan()
 
     def __scan(self):
-        # Must hold write lock if other threads can reach this object.
+        """Helper: update all fields derived from self.store.
+
+           Must hold write lock if other threads can reach this object.
+        """
         self.allServers = self.store.getServerList()
         self.clientVersions, self.serverVersions = \
                              self.store.getRecommendedVersions()
@@ -595,6 +765,9 @@
                 self.goodServers.append(s)
 
     def flush(self):
+        """Save any pending changes to disk, and update all derivative
+           fields that would need to change.
+        """
         self._diskLock.acquire()
         self._lock.write_in()
         try:
@@ -606,8 +779,13 @@
             self._diskLock.release()
 
     def __scanAsNeeded(self):
+        """Helper: if there are any changes in the underlying store, then
+           flush them to disk.
+
+           Callers should hold no locks; a little faster than just calling
+           'flush'.
+        """
         #XXXX008 some calls are probably needless.
-        # should hold no locks.
         self._lock.read_in()
         try:
             if not self.store.hasChanged():
@@ -618,6 +796,7 @@
         self.flush()
 
     def configure(self,config):
+        """Configure the underlying store based on 'config'."""
         self._lock.write_in()
         try:
             self.store.configure(config)
@@ -639,6 +818,7 @@
             self._lock.write_out()
 
     def save(self):
+        """Flush all changes to disk, whether we need to or not."""
         self._diskLock.acquire()
         self._lock.read_in()
         try:
@@ -648,6 +828,7 @@
             self._diskLock.release()
 
     def rescan(self,force=0):
+        """Rescan the underlying source."""
         self._diskLock.acquire()
         self._lock.write_in()
         try:
@@ -655,8 +836,10 @@
         finally:
             self._lock.write_out()
             self._diskLock.release()
+        self.__scanAsNeeded()
 
     def update(self, force=0, now=None):
+        """Download a directory as needed."""
         self._diskLock.acquire()
         try:
             self.store.update(force=force,now=now,lock=self._lock)
@@ -665,6 +848,7 @@
         self.__scanAsNeeded()
 
     def clean(self, now=None):
+        """Remove expired and superseded descriptors."""
         self._diskLock.acquire()
         self._lock.write_in()
         try:
@@ -675,6 +859,7 @@
         self.__scanAsNeeded()
 
     def importFromFile(self, sourceFname):
+        """See FSBackedDescriptorSource.importFromFile"""
         self._diskLock.acquire()
         self._lock.write_in()
         try:
@@ -688,6 +873,7 @@
         self.__scanAsNeeded()
 
     def expungeByNickname(self, nickname):
+        """See FSBackedDescriptorSource.expungeByNickname"""
         self._diskLock.acquire()
         self._lock.write_in()
         try:
@@ -700,6 +886,10 @@
             self._diskLock.release()
         self.__scanAsNeeded()
 
+    def getAllServers(self):
+        """Return a list of all known server descriptors."""
+        return self.allServers
+
     def _installAsKeyIDResolver(self):
         """Use this ClientDirectory to identify servers in calls to
            ServerInfo.displayServer*.
@@ -1059,8 +1249,7 @@
         return paths
 
     def getPath(self, template, startAt=None, endAt=None, prng=None):
-        """Workhorse method for path selection.  Given a template, and
-           a capability that must be supported by the exit node, return
+        """Workhorse method for path selection.  Given a template, return
            a list of serverinfos that 'matches' the template, and whose
            last node provides exitCap.