[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Document and debug changes to ClientDirectory.
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv16549
Modified Files:
ClientDirectory.py
Log Message:
Document and debug changes to ClientDirectory.
When downloading directories, compare HTTP "Date" header to the
current date, and warn about skew.
Index: ClientDirectory.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientDirectory.py,v
retrieving revision 1.41
retrieving revision 1.42
diff -u -d -r1.41 -r1.42
--- ClientDirectory.py 17 May 2004 21:40:15 -0000 1.41
+++ ClientDirectory.py 27 Jul 2004 03:05:23 -0000 1.42
@@ -21,6 +21,7 @@
import threading
import time
import types
+import rfc822
import urllib2
import mixminion.Config
@@ -49,13 +50,27 @@
"""Raised when we have downloaded an invalid directory."""
class _DescriptorSourceSharedState:
- """DOCDOC"""
+ """Holds data shared in common across several descriptor sources.
+ Remembers which descriptors we've validated in the past, so we don't
+ need to do public-key operations when we see a descriptor more than
+ once.
+ """
+ ##Fields:
+ # digestMap: A map from 20-byte descriptor digests to server descriptor
+ # expiry times.
+ # _changed: True iff digestMap has been changed since we last loaded or
+ # saved. (When saving, other classes set _changed to 0 for us.)
+
+ # Used to identify version when pickling
MAGIC = "DSSS-0.1"
+ # How long do we hang on expired digests?
EXPIRY_SLOPPINESS = 7200
def __init__(self):
+ """Create a new _DescriptorSourceSharedState"""
self.digestMap = {}
self._changed = 1
def clean(self, now=None):
+ """Forget about all descriptor digests that are expired."""
if now is None:
now = time.time()
cutoff = now + self.EXPIRY_SLOPPINESS
@@ -64,10 +79,17 @@
del self.digestMap[k]
self._changed = 1
def hasChanged(self):
+ """Return true iff this object has changed since we last loaded,
+ or since we last explicitly set self._changed to false."""
return self._changed
def _addDigest(self,s):
- self.digestMap[s.getDigest()] = s['Server']['Valid-Until']
- self._changed = 1
+ """Add the digest for the ServerInfo 's' to the set of validated
+ descriptors.
+ """
+ d = s.getDigest()
+ if not self.digestMap.has_key(d):
+ self.digestMap[d] = s['Server']['Valid-Until']
+ self._changed = 1
def __getstate__(self):
return self.MAGIC, self.digestMap
def __setstate__(self,state):
@@ -81,36 +103,70 @@
self._changed = 0
class DescriptorSource:
- """DOCDOC"""
+ """Abstract class for a container holding server descriptors.
+
+ Note that DescriptorSources are managed by
+ CachingDescriptorSource and loadCachingDescriptorSource below;
+ subclasses should assume that they are called accordingly.
+ """
+ ## Fields:
+ # _s: an instance of _DescriptorSourceSharedState
+ # _changed: true iff there is information in this DescriptorSource that
+ # needs to be flushed to disk.
def __init__(self):
+ """Create a new DescriptorSource"""
assert self.__class__ != DescriptorSource
self._s = None
self._changed = 0
def hasChanged(self):
+ """Return true iff self has information that needs to be
+ flushed to disk"""
return self._changed
def getServerList(self):
+ """Return a list of all ServerInfo objects in self."""
raise NotImplemented()
def getRecommendedNicknames(self):
+ """Return a list of the nicknames of all 'recommended' ServerInfo
+ objects in self."""
return []
def configure(self, config):
+ """Configure self based on a client or server configuration in
+ 'config'. """
pass
def update(self, force=0, now=None, lock=None):
+ """Retrieve any new information needed for this store, for
+ example by downloading a fresh directory. If 'force' is
+ true, then update the information regardless of whether we
+ think we need to. If 'lock' is provided, use 'lock' to protect
+ the critical sections of the update.
+ """
pass
def rescan(self, force=0):
+ """Update the state of self based on any local changes."""
pass
def clean(self, now=None):
+ """Remove any out-of-date information from this object."""
pass
def save(self):
+ """Flush any changes from this object to disk."""
self._changed = 0
def _setSharedState(self,state):
+ """Helper: set the shared state for this object to 'state'."""
self._s = state
class FSBackedDescriptorSource(DescriptorSource):
- # directory
- # servers { fname -> (mtime, server descriptor) }
+ """A FSBackedDescriptorSource holds a number of server descriptors in a
+ filesystem, one per file. All files are kept in a single directory,
+ and are managed by the FSBackedDescriptorSource object.
+ """
+ ## Fields:
+ # directory: the location for this store on the filesystem
+ # servers: A map from filename within the directory to tuples of
+ # (file mtime, ServerInfo).
MAGIC = "FBBDS-0.1"
EXPIRY_SLOPPINESS = 7200
def __init__(self, state):
+ """Create a new FSBackedDescriptorSource"""
DescriptorSource.__init__(self)
self._setSharedState(state)
self.directory = None
@@ -122,26 +178,38 @@
return [ s for _,s in self.servers.values() ]
def getRecommendedNicknames(self):
- # XXXX008 !!!! Major security implications here. Reconsider.
+ # XXXX008 !!!! Major security implications here: are all the
+ # descriptors on disk considered recommended? Reconsider
+ # this.
return [ s.getNickname().lower() for _,s in self.servers.values() ]
def configure(self, config):
+ """Set up the directory where imported descriptors are stored."""
self.directory = os.path.join(config.getDirectoryRoot(),
"imported")
createPrivateDir(self.directory)
def rescan(self, force=0):
+ """Scan all the files in the underlying directory. If there are any
+ new descriptors, or if any descriptors have changed, then
+ rescan them. If force is true, reload all descriptors no matter
+ what.
+ """
if self.directory is None:
return
if force:
self.servers = {}
+ self._changed = 1
+ fnames = {}
+ # Rescan any files that are new, or that have changed.
for fname in os.listdir(self.directory):
+ fnames[fname] = 1
fullname = os.path.join(self.directory, fname)
try:
mtime = long(os.stat(fullname)[stat.ST_MTIME])
except OSError:
- LOG.warn("Missing file %s", fullname)
+ LOG.warn("Unable to stat file %s", fullname)
del self.servers[fname]
continue
if (self.servers.has_key(fname) and
@@ -157,9 +225,15 @@
continue
self.servers[fname] = (mtime, s)
self._s._addDigest(s)
- self._changed = 1
+ self._changed = 1
+ # Remove any servers whose files are missing.
+ for fname in self.servers.keys():
+ if not fnames.has_key(fname):
+ del self.servers[fname]
+ self._changed = 1
def clean(self, now=None):
+ """Remove all expired or superseded servers from the directory."""
if now is None:
now = time.time()
cutoff = now + self.EXPIRY_SLOPPINESS
@@ -181,9 +255,15 @@
self._removeOne(fname)
def _removeOne(self, fname):
+ """Helper: remove the file fname from the directory."""
tryUnlink(os.path.join(self.directory, fname))
def importFromFile(self, sourceFname):
+ """Read and validate a descriptor stored in the possibly
+ gzipped file 'sourceFname'. If the descriptor is valid, and not
+ already stored in this directory, and not superseded, then copy it
+ in. Does *not* remove any descriptors superseded by sourceFName.
+ """
contents = readPossiblyGzippedFile(sourceFname)
try:
s = mixminion.ServerInfo.ServerInfo(string=contents,assumeValid=0)
@@ -213,6 +293,8 @@
self._changed = 1
def expungeByNickname(self, nickname):
+ """Remove all descriptors for the server 'nickname' from the directory.
+ """
badFnames = {}
for fname, (_, sd) in self.servers.items():
if sd.getNickname().lower() == nickname.lower():
@@ -239,9 +321,22 @@
self._changed = 0
class DirectoryBackedDescriptorSource(DescriptorSource):
- """DOCDOC"""
+ """A DirectoryBakedDescriptorSource gets server descriptors by
+ reading directories from directory servers, and caching them
+ on disk.
+ """
+ ## Fields:
+ # fnameBase: The name of the file where we'll store a cached directory.
+ # We may append '.gz' or '_new' or '_new.gz' as appropriate.
+ # serverDir: An instance of mixminion.ServerInfo.ServerDirectory, or
+ # None.
+ # lastDownload: When did we last download the directory?
+ # __downloading: Boolean: are we currently downloading a new directory?
+ # timeout: How long do we wait when trying to download? A number
+ # of seconds, or None.
MAGIC = "BDBS-0.1"
def __init__(self, state):
+ """Create a new DirectoryBackedDescriptorSource"""
DescriptorSource.__init__(self)
self._setSharedState(state)
self.fnameBase = None
@@ -264,15 +359,14 @@
return self.serverDir.getRecommendedNicknames()
def getRecommendedVersions(self):
+ """Return a 2-tuple of the software versions recommended for clients
+ and servers by the directory."""
if self.serverDir == None:
return [], []
sec = self.serverDir['Recommended-Software']
return (sec.get("MixminionClient",[]),
sec.get("MixminionServer",[]))
- def getRecommendedServerVersions(self):
- return self.serverDir['Recommended-Software'].get("MixminionClient",[])
-
def configure(self, config):
self.fnameBase = os.path.join(config.getDirectoryRoot(), "dir")
createPrivateDir(config.getDirectoryRoot())
@@ -287,6 +381,7 @@
return
try:
serverDir = None
+ # Check "dir" and "dir.gz"
for ext in "", ".gz":
fname = self.fnameBase + ext
if not os.path.exists(fname):
@@ -306,10 +401,15 @@
self._changed = 1
def update(self, force=0, now=None, lock=None):
- self.updateDirectory(forceDownload=force,now=now)
+ self.updateDirectory(forceDownload=force,now=now,lock=lock)
def updateDirectory(self, forceDownload=0, url=None,
now=None, lock=None):
+ """Download a directory if necessary, or if 'forceDownload' is true.
+
+ Same behavior as self.update, except that you may configure
+ a non-default URL.
+ """
if now is None:
now = time.time()
if url is None:
@@ -317,12 +417,13 @@
if (self.serverDir is None or forceDownload or
self.lastDownload < previousMidnight(now)):
- self.downloadDirectory(url=url,lock=None)
+ self.downloadDirectory(url=url,lock=lock)
else:
LOG.debug("Directory is up to date.")
def downloadDirectory(self, url=MIXMINION_DIRECTORY_URL,
lock=None):
+ """Fetch a new directory."""
if self.__downloading:
LOG.info("Download already in progress")
return 0
@@ -330,7 +431,29 @@
self._downloadDirectoryImpl(url,lock)
self.__downloading = 0
+ def _warnIfSkewed(self, dateHeader, expected=None):
+ """We just fetched a directory and got 'dateHeader' as the
+ Date header in the HTTP response. Parse the date, and warn
+ if the date is too far from what we believe the current
+ time to be.
+ """
+ if expected is None:
+ expected = time.time()
+ try:
+ parsedDate = rfc822.parsedate_tz(dateHeader)
+ except ValueError:
+ LOG.warn("Invalid date header from directory: %r",dateHeader)
+ return
+ if not parsedDate: return
+
+ LOG.trace("Directory server said date is %r", dateHeader)
+ skew = (expected - rfc822.mktime_tz(parsedDate))/60.0
+ if abs(skew) > 30:
+ LOG.warn("The directory said that the date is %r; we are skewed by %+d minutes",
+ dateHeader, skew)
+
def _downloadDirectoryImpl(self, url, lock=None):
+ """Helper function: does the actual work of fetching a directory."""
LOG.info("Downloading directory from %s", url)
# XXXX Refactor download logic.
if self.timeout:
@@ -345,6 +468,7 @@
request = urllib2.Request(url,
headers={ 'Pragma' : 'no-cache',
'Cache-Control' : 'no-cache', })
+ startTime = time.time()
infile = urllib2.urlopen(request)
except IOError, e:
#XXXX008 the "-D no" note makes no sense for servers.
@@ -381,6 +505,9 @@
infile.close()
outfile.close()
+ dateHeader = infile.info().get("Date","")
+ if dateHeader: self._warnIfSkewed(dateHeader, expected=startTime)
+
# Open and validate the directory
LOG.info("Validating directory")
@@ -436,9 +563,18 @@
self.timeout = None
class CachingDescriptorSource(DescriptorSource):
- """DOCDOC"""
+ """A CachingDescriptorSource aggregates several base DescriptorSources,
+ combines their results, and handles caching their descriptors.
+
+ Method calls to methods supported only by a single DescriptorSource
+ are delegated to the appropriate object.
+ """
+ ##Fields:
+ # bases: a list of DescriptorSource objects to delegate to.
+ # cacheFile: filename to store our cache in.
MAGIC = "CDS-0.1"
def __init__(self,state):
+ """Create a new CachingDescriptorSource."""
DescriptorSource.__init__(self)
self.bases = []
self._setSharedState(state)
@@ -513,14 +649,23 @@
self._s._changed = 0
def __getattr__(self, attr):
+ candidate = None
for b in self.bases:
o = getattr(b,attr,None)
if isinstance(o, types.MethodType):
- return o
- raise AttributeError(attr)
+ if candidate is None:
+ candidate = o
+ else:
+ raise AttributeError("Too many options for %s"%attr)
+ if candidate is None:
+ raise AttributeError(attr)
+ else:
+ return candidate
def loadCachingDescriptorSource(config):
- """DOCDOC"""
+ """Return an instance of CachingDescriptorSource for our current
+ configuration, loading it from disk as necessary.
+ """
if hasattr(config, 'isServerConfig') and config.isServerConfig():
isServer = 1
else:
@@ -556,6 +701,28 @@
return store
class ClientDirectory:
+ """Utility wrapper around a CachingDescriptorSource to handle common
+ functionality such as server lookup, path generation, and so on.
+ """
+ ## Fields:
+ # _lock: An instance of RWLock; protects all modifications to self or
+ # to the underlying sources.
+ # _diskLock: A lock to protect all access to the disk.
+ # store: An instance of DescriptorSource.
+ ## Fields derived from self.source:
+ # allServers: A list of all known server descriptors.
+ # clientVersions, serverVersions: Lists of recommended software.
+ # goodNicknames: a dict whose keys are all recommended
+ # nicknames. (lowercase)
+ # goodServers: A list of all server descriptors whose nicknames
+ # are recommended.
+ # byNickname: A map from lowercase nickname to a list of ServerInfo with
+ # that nickname.
+ # byKeyID: A map from identity key digest to a list of ServerInfo for that
+ # server.
+ # blockedNicknames: a map from lowercase nickname to a list of the purposes
+ # ('entry', 'exit', or '*') for which the corresponding server shouldn't
+ # be selected in automatic path generation. Set by configure.
def __init__(self, config=None, store=None, diskLock=None):
self._lock = RWLock()
if diskLock is None:
@@ -575,7 +742,10 @@
self.__scan()
def __scan(self):
- # Must hold write lock if other threads can reach this object.
+ """Helper: update all fields derived from self.store.
+
+ Must hold write lock if other threads can reach this object.
+ """
self.allServers = self.store.getServerList()
self.clientVersions, self.serverVersions = \
self.store.getRecommendedVersions()
@@ -595,6 +765,9 @@
self.goodServers.append(s)
def flush(self):
+ """Save any pending changes to disk, and update all derivative
+ fields that would need to change.
+ """
self._diskLock.acquire()
self._lock.write_in()
try:
@@ -606,8 +779,13 @@
self._diskLock.release()
def __scanAsNeeded(self):
+ """Helper: if there are any changes in the underlying store, then
+ flush them to disk.
+
+ Callers should hold no locks; a little faster than just calling
+ 'flush'.
+ """
#XXXX008 some calls are probably needless.
- # should hold no locks.
self._lock.read_in()
try:
if not self.store.hasChanged():
@@ -618,6 +796,7 @@
self.flush()
def configure(self,config):
+ """Configure the underlying store from 'config' and update derived fields."""
self._lock.write_in()
try:
self.store.configure(config)
@@ -639,6 +818,7 @@
self._lock.write_out()
def save(self):
+ """Flush all changes to disk, whether we need to or not."""
self._diskLock.acquire()
self._lock.read_in()
try:
@@ -648,6 +828,7 @@
self._diskLock.release()
def rescan(self,force=0):
+ """Rescan the underlying source."""
self._diskLock.acquire()
self._lock.write_in()
try:
@@ -655,8 +836,10 @@
finally:
self._lock.write_out()
self._diskLock.release()
+ self.__scanAsNeeded()
def update(self, force=0, now=None):
+ """Download a directory as needed."""
self._diskLock.acquire()
try:
self.store.update(force=force,now=now,lock=self._lock)
@@ -665,6 +848,7 @@
self.__scanAsNeeded()
def clean(self, now=None):
+ """Remove expired and superseded descriptors."""
self._diskLock.acquire()
self._lock.write_in()
try:
@@ -675,6 +859,7 @@
self.__scanAsNeeded()
def importFromFile(self, sourceFname):
+ """See FSBackedDescriptorSource.importFromFile"""
self._diskLock.acquire()
self._lock.write_in()
try:
@@ -688,6 +873,7 @@
self.__scanAsNeeded()
def expungeByNickname(self, nickname):
+ """See FSBackedDescriptorSource.expungeByNickname"""
self._diskLock.acquire()
self._lock.write_in()
try:
@@ -700,6 +886,10 @@
self._diskLock.release()
self.__scanAsNeeded()
+ def getAllServers(self):
+ """Return a list of all known server descriptors."""
+ return self.allServers
+
def _installAsKeyIDResolver(self):
"""Use this ClientDirectory to identify servers in calls to
ServerInfo.displayServer*.
@@ -1059,8 +1249,7 @@
return paths
def getPath(self, template, startAt=None, endAt=None, prng=None):
- """Workhorse method for path selection. Given a template, and
- a capability that must be supported by the exit node, return
+ """Workhorse method for path selection. Given a template, return
a list of serverinfos that 'matches' the template, and whose
last node provides exitCap.