[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Move non-CLI-specific code from ClientMain into ClientU...

Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv31051/lib/mixminion

Modified Files:
	ClientMain.py ClientUtils.py Config.py test.py 
Added Files:
Log Message:
Move non-CLI-specific code from ClientMain into ClientUtils, ClientDirectory.

--- NEW FILE: ClientDirectory.py ---
# Copyright 2002-2003 Nick Mathewson.  See LICENSE for licensing information.
# Id: ClientMain.py,v 1.89 2003/06/05 18:41:40 nickm Exp $

"""mixminion.ClientDirectory: Code to handle the 'client' side of 
   dealing with mixminion directories.  This includes:
     - downloading and caching directories
     - path generation

__all__ = [ 'ClientDirectory', 'parsePath', 'parsePathLeg' ]

import cPickle
import errno
import os
import re
import signal
import socket
import stat
import time
import urllib2

import mixminion.ClientMain #XXXX
import mixminion.Config
import mixminion.Crypto
import mixminion.ServerInfo

from mixminion.Common import LOG, MixError, MixFatalError, UIError, \
     ceilDiv, createPrivateDir, formatDate, formatFnameTime, openUnique, \
     previousMidnight, readPickled, readPossiblyGzippedFile, \
     replaceFile, tryUnlink, writePickled 
from mixminion.Packet import MBOX_TYPE, SMTP_TYPE, DROP_TYPE

# FFFF This should be made configurable and adjustable.
MIXMINION_DIRECTORY_URL = "http://mixminion.net/directory/Directory.gz";

class ClientDirectory:
    """A ClientDirectory manages a list of server descriptors, either
       imported from the command line or from a directory."""
    # dir: directory where we store everything.
    # lastModified: time when we last modified this directory.
    # lastDownload: time when we last downloaded a directory
    # serverList: List of (ServerInfo, 'D'|'D-'|'I:filename') tuples.  The
    #   second element indicates whether the ServerInfo comes from a
    #   directory or a file.  ('D-' is an unrecommended server.)
    # fullServerList: List of (ServerInfo, 'D'|'D-'|'I:filename')
    #   tuples, including servers not on the Recommended-Servers list.
    # digestMap: Map of (Digest -> 'D'|'D-'|'I:filename').
    # byNickname: Map from nickname.lower() to list of (ServerInfo, source)
    #   tuples.
    # byCapability: Map from capability ('mbox'/'smtp'/'relay'/None) to
    #    list of (ServerInfo, source) tuples.
    # allServers: Same as byCapability[None]
    # __scanning: Flag to prevent recursive invocation of self.rescan().
    # clientVersions: String of allowable client versions as retrieved
    #    from most recent directory.
    # goodServerNicknames: A map from lowercased nicknames of recommended
    #    servers to 1.
    ## Layout:
    # DIR/cache: A cPickled tuple of ("ClientKeystore-0.2",
    #         lastModified, lastDownload, clientVersions, serverlist,
    #         fullServerList, digestMap) DOCDOC is this correct?
    # DIR/dir.gz *or* DIR/dir: A (possibly gzipped) directory file.
    # DIR/imported/: A directory of server descriptors.
    MAGIC = "ClientKeystore-0.3"

    # The amount of time to require a path to be valid, by default.
    # (Servers already have a keyOverlap of a few hours, so there's not so
    #  much need to do this at the client side.)

    def __init__(self, directory):
        """Create a new ClientDirectory to keep directories and descriptors
           under <directory>."""
        self.dir = directory
        createPrivateDir(os.path.join(self.dir, "imported"))
        self.digestMap = {}
        self.__scanning = 0
            mixminion.ClientMain.clientLock() # XXXX disentangle
            mixminion.ClientMain.clientUnlock() # XXXX

        # Mixminion 0.0.1 used an obsolete directory-full-of-servers in
        #   DIR/servers.  If there's nothing there, we remove it.  Otherwise,
        #   we warn.
        # XXXX010 Eventually, we can remove this.
        sdir = os.path.join(self.dir,"servers")
        if os.path.exists(sdir):
            if os.listdir(sdir):
                LOG.warn("Skipping obsolete server directory %s", sdir)
                    LOG.warn("Removing obsolete server directory %s", sdir)
                except OSError, e:
                    LOG.warn("Failed: %s", e)

    def updateDirectory(self, forceDownload=0, now=None):
        """Download a directory from the network as needed."""
        if now is None:
            now = time.time()

        if forceDownload or self.lastDownload < previousMidnight(now):
            LOG.debug("Directory is up to date.")
    def downloadDirectory(self):
        """Download a new directory from the network, validate it, and
           rescan its servers."""
        # FFFF Make configurable
        # Start downloading the directory.
        LOG.info("Downloading directory from %s", url)

        # XXXX Refactor download logic.

        if hasattr(signal, 'alarm'):
            def sigalrmHandler(sig, _):
            signal.signal(signal.SIGALRM, sigalrmHandler)
                infile = urllib2.urlopen(url)
            except IOError, e:
                raise UIError(
                    ("Couldn't connect to directory server: %s.\n"
                     "Try '-D no' to run without downloading a directory.")%e)
            except socket.error, e:
                if getattr(e,"errno",-1) == errno.EINTR:
                    raise UIError("Connection to directory server timed out")
                    raise UIError("Error connecting: %s"%e)
                raise UIError
            if hasattr(signal, 'alarm'):
        # Open a temporary output file.
        if url.endswith(".gz"):
            fname = os.path.join(self.dir, "dir_new.gz")
            outfile = open(fname, 'wb')
            gz = 1
            fname = os.path.join(self.dir, "dir_new")
            outfile = open(fname, 'w')
            gz = 0
        # Read the file off the network.
        while 1:
            s = infile.read(1<<16)
            if not s: break
        # Close open connections.
        # Open and validate the directory
        LOG.info("Validating directory")
            directory = mixminion.ServerInfo.ServerDirectory(
        except mixminion.Config.ConfigError, e:
            raise MixFatalError("Downloaded invalid directory: %s" % e)

        # Make sure that the identity is as expected.
        identity = directory['Signature']['DirectoryIdentity']
        if fp and mixminion.Crypto.pk_fingerprint(identity) != fp:
            raise MixFatalError("Bad identity key on directory")

        tryUnlink(os.path.join(self.dir, "cache"))

        # Install the new directory
        if gz:
            replaceFile(fname, os.path.join(self.dir, "dir.gz"))
            replaceFile(fname, os.path.join(self.dir, "dir"))

        # And regenerate the cache.
        # FFFF Actually, we could be a bit more clever here, and same some
        # FFFF time. But that's for later.

    def rescan(self, force=None, now=None):
        """Regenerate the cache based on files on the disk."""
        self.lastModified = self.lastDownload = -1
        self.serverList = []
        self.fullServerList = []
        self.clientVersions = None
        self.goodServerNicknames = {}

        if force:
            self.digestMap = {}

        # Read the servers from the directory.
        gzipFile = os.path.join(self.dir, "dir.gz")
        dirFile = os.path.join(self.dir, "dir")
        for fname in gzipFile, dirFile:
            if not os.path.exists(fname): continue
            self.lastDownload = self.lastModified = \
                directory = mixminion.ServerInfo.ServerDirectory(
            except mixminion.Config.ConfigError:
                LOG.warn("Ignoring invalid directory (!)")

            for s in directory.getServers():
                self.serverList.append((s, 'D'))
                self.digestMap[s.getDigest()] = 'D'
                self.goodServerNicknames[s.getNickname().lower()] = 1
            for s in directory.getAllServers():
                if self.goodServerNicknames.has_key(s.getNickname().lower()):
                    where = 'D'
                    where = 'D-'
                self.fullServerList.append((s, where))
                self.digestMap[s.getDigest()] = where

            self.clientVersions = (

        # Now check the server in DIR/servers.
        serverDir = os.path.join(self.dir, "imported")
        for fn in os.listdir(serverDir):
            # Try to read a file: is it a server descriptor?
            p = os.path.join(serverDir, fn)
                # Use validatedDigests *only* when not explicitly forced.
                info = mixminion.ServerInfo.ServerInfo(fname=p, assumeValid=0,
            except mixminion.Config.ConfigError:
                LOG.warn("Invalid server descriptor %s", p)
            mtime = os.stat(p)[stat.ST_MTIME]
            if mtime > self.lastModified:
                self.lastModifed = mtime
            self.serverList.append((info, "I:%s"%fn))
            self.fullServerList.append((info, "I:%s"%fn))
            self.digestMap[info.getDigest()] = "I:%s"%fn
            self.goodServerNicknames[info.getNickname().lower()] = 1

        # Regenerate the cache
        # Now try reloading, to make sure we can, and to get __rebuildTables.
        self.__scanning = 1

    def __load(self):
        """Helper method. Read the cached parsed descriptors from disk."""
            cached = readPickled(os.path.join(self.dir, "cache"))
            magic = cached[0]
            if magic == self.MAGIC:
                _, self.lastModified, self.lastDownload, self.clientVersions, \
                   self.serverList, self.fullServerList, self.digestMap \
                   = cached
                LOG.warn("Bad version on directory cache; rebuilding...")
        except (OSError, IOError):
            LOG.info("Couldn't read directory cache; rebuilding")
        except (cPickle.UnpicklingError, ValueError), e:
            LOG.info("Couldn't unpickle directory cache: %s", e)
        if self.__scanning:
            raise MixFatalError("Recursive error while regenerating cache")

    def __save(self):
        """Helper method. Recreate the cache on disk."""
        data = (self.MAGIC,
                self.lastModified, self.lastDownload,
                self.clientVersions, self.serverList, self.fullServerList,
        writePickled(os.path.join(self.dir, "cache"), data)

    def importFromFile(self, filename):
        """Import a new server descriptor stored in 'filename'"""

        contents = readPossiblyGzippedFile(filename)
        info = mixminion.ServerInfo.ServerInfo(string=contents, 

        nickname = info.getNickname()
        lcnickname = nickname.lower()
        identity = info.getIdentity()
        # Make sure that the identity key is consistent with what we know.
        for s, _ in self.serverList:
            if s.getNickname() == nickname:
                if not mixminion.Crypto.pk_same_public_key(identity,
                    raise MixError("Identity key changed for server %s in %s",
                                   nickname, filename)

        # Have we already imported this server?
        if self.digestMap.get(info.getDigest(), "X").startswith("I:"):
            raise UIError("Server descriptor is already imported")

        # Is the server expired?
        if info.isExpiredAt(time.time()):
            raise UIError("Server descriptor is expired")

        # Is the server superseded?
        if self.byNickname.has_key(lcnickname):
            if info.isSupersededBy([s for s,_ in self.byNickname[lcnickname]]):
                raise UIError("Server descriptor is already superseded")

        # Copy the server into DIR/servers.
        fnshort = "%s-%s"%(nickname, formatFnameTime())
        fname = os.path.join(self.dir, "imported", fnshort)
        f = openUnique(fname)[0]
        # Now store into the cache.
        fnshort = os.path.split(fname)[1]
        self.serverList.append((info, 'I:%s'%fnshort))
        self.fullServerList.append((info, 'I:%s'%fnshort))
        self.digestMap[info.getDigest()] = 'I:%s'%fnshort
        self.lastModified = time.time()

    def expungeByNickname(self, nickname):
        """Remove all imported (non-directory) server nicknamed 'nickname'."""
        lcnickname = nickname.lower()
        n = 0 # number removed
        newList = [] # replacement for serverList.

        for info, source in self.serverList:
            if source == 'D' or info.getNickname().lower() != lcnickname:
                newList.append((info, source))
            n += 1
                fn = source[2:]
                os.unlink(os.path.join(self.dir, "imported", fn))
            except OSError, e:
                LOG.error("Couldn't remove %s: %s", fn, e)

        self.serverList = newList
        # Recreate cache if needed.
        if n:
        return n

    def __rebuildTables(self):
        """Helper method.  Reconstruct byNickname, allServers, and byCapability
           from the internal start of this object.
        self.byNickname = {}
        self.allServers = []
        self.byCapability = { 'mbox': [],
                              'smtp': [],
                              'relay': [],
                              'frag': [],
                              None: self.allServers }
        self.goodServerNicknames = {}

        for info, where in self.serverList:
            nn = info.getNickname().lower()
            lists = [ self.allServers, self.byNickname.setdefault(nn, []) ]
            for c in info.getCaps():
                lists.append( self.byCapability[c] )
            for lst in lists:
                lst.append((info, where))
            self.goodServerNicknames[nn] = 1

        for info, where in self.fullServerList:
            nn = info.getNickname().lower()
            if self.goodServerNicknames.get(nn):
            self.byNickname.setdefault(nn, []).append((info, where))

    def listServers(self):
        """Returns a linewise listing of the current servers and their caps.
            This will go away or get refactored in future versions once we
            have client-level modules.
        lines = []
        nicknames = self.byNickname.keys()
        if not nicknames:
            return [ "No servers known" ]
        longestnamelen = max(map(len, nicknames))
        fmtlen = min(longestnamelen, 20)
        nnFormat = "%"+str(fmtlen)+"s:%s"
        for n in nicknames:
            nnreal = self.byNickname[n][0][0].getNickname()
            isGood = self.goodServerNicknames.get(n, 0)
            if isGood:
                status = ""
                status = " (not recommended)"
            for info, where in self.byNickname[n]:
                caps = info.getCaps()
                va = formatDate(info['Server']['Valid-After'])
                vu = formatDate(info['Server']['Valid-Until'])
                line = "      [%s to %s] %s"%(va,vu," ".join(caps))
        return lines

    def __findOne(self, lst, startAt, endAt):
        """Helper method.  Given a list of (ServerInfo, where), return a
           single element that is valid for all time between startAt and

           Watch out: this element is _not_ randomly chosen.
        res = self.__find(lst, startAt, endAt)
        if res:
            return res[0]
        return None

    def __find(self, lst, startAt, endAt):
        """Helper method.  Given a list of (ServerInfo, where), return all
           elements that are valid for all time between startAt and endAt.

           Only one element is returned for each nickname; if multiple
           elements with a given nickname are valid over the given time
           interval, the most-recently-published one is included.
        # FFFF This is not really good: servers may be the same, even if
        # FFFF their nicknames are different.  The logic should probably
        # FFFF go into directory, though.

        u = {} # Map from lcnickname -> latest-expiring info encountered in lst
        for info, _  in lst:
            if not info.isValidFrom(startAt, endAt):
            n = info.getNickname().lower()
            if u.has_key(n):
                if u[n].isNewerThan(info):
            u[n] = info

        return u.values()

    def findByExitTypeAndSize(self, exitType, size, nPackets):
        #XXXX006 remove this method.  It's not really a good interface,
        #XXXX006 and only gets used by the kludgy choose-a-new-last-hop logic
        """Return a server that supports exitType 'exittype' (currently must be
           SMTP_TYPE), and messages of size 'size' bytes."""
        assert exitType == SMTP_TYPE
        servers = self.__find(self.byCapability['smtp'], time.time(),
        servers = servers[:]
        for s in servers:
            maxSize = s['Delivery/SMTP']['Maximum-Size'] * 1024
            maxPackets = s['Delivery/Fragmented'].get('Maximum-Fragments',1)
            if maxSize >= size and maxPackets >= nPackets:
                return s

        return None

    def clean(self, now=None):
        """Remove all expired or superseded descriptors from DIR/servers."""

        if now is None:
            now = time.time()
        cutoff = now - 600

        # List of (ServerInfo,where) not to scratch.
        newServers = []
        for info, where in self.serverList:
            lcnickname = info.getNickname().lower()
            # Find all other SI's with the same name.
            others = [ s for s, _ in self.byNickname[lcnickname] ]
            # Find all digests of servers with the same name, in the directory.
            inDirectory = [ s.getDigest()
                            for s, w in self.byNickname[lcnickname]
                            if w in ('D','D-') ]
            if (where not in ('D', 'D-')
                and (info.isExpiredAt(cutoff)
                     or info.isSupersededBy(others)
                     or info.getDigest() in inDirectory)):
                # If the descriptor is not in the directory, and it is
                # expired, is superseded, or is duplicated by a descriptor
                # from the directory, remove it.
                    os.unlink(os.path.join(self.dir, "imported", where[2:]))
                except OSError, e:
                    LOG.info("Couldn't remove %s: %s", where[2:], e)
                # Don't scratch non-superseded, non-expired servers.
                newServers.append((info, where))

        # If we've actually deleted any servers, replace self.serverList and
        # rebuild.
        if len(self.serverList) != len(newServers):
            self.serverList = newServers
    def getServerInfo(self, name, startAt=None, endAt=None, strict=0):
        """Return the most-recently-published ServerInfo for a given
           'name' valid over a given time range.  If not strict, and no
           such server is found, return None.

           name -- A ServerInfo object, a nickname, or a filename.

        if startAt is None:
            startAt = time.time()
        if endAt is None:
            endAt = startAt + self.DEFAULT_REQUIRED_LIFETIME

        if isinstance(name, mixminion.ServerInfo.ServerInfo):
            # If it's a valid ServerInfo, we're done.
            if name.isValidFrom(startAt, endAt):
                return name
                LOG.error("Server is not currently valid")
        elif self.byNickname.has_key(name.lower()):
            # If it's a nickname, return a serverinfo with that name.
            s = self.__findOne(self.byNickname[name.lower()], startAt, endAt)

            if not s:
                raise UIError(
                    "Couldn't find any currently live descriptor with name %s"
                    % name)

            if not self.goodServerNicknames.has_key(s.getNickname().lower()):
                LOG.warn("Server %s is not recommended",name)
            return s
        elif os.path.exists(os.path.expanduser(name)):
            # If it's a filename, try to read it.
            fname = os.path.expanduser(name)
                return mixminion.ServerInfo.ServerInfo(fname=fname, 
            except OSError, e:
                raise UIError("Couldn't read descriptor %r: %s" %
                               (name, e))
            except mixminion.Config.ConfigError, e:
                raise UIError("Couldn't parse descriptor %r: %s" %
                               (name, e))
        elif strict:
            raise UIError("Couldn't find descriptor for %r" % name)
            return None

    def getPath(self, endCap, template, startAt=None, endAt=None, prng=None):
        """Workhorse method for path selection.  Given a template, and
           a capability that must be supported by the exit node, return
           a list of serverinfos that 'matches' the template, and whose
           last node provides exitCap.

           The template is a list of either: strings or serverinfos as
           expected by 'getServerInfo'; or None to indicate that
           getPath should select a corresponding server.

           All servers are chosen to be valid continuously from
           startAt to endAt.  The last server is not set) is selected
           to have 'endCap' (typically 'mbox' or 'smtp').  Set endCap
           to 'None' if you don't care.

           The path selection algorithm perfers to choose without
           replacement it it can.
        def setSub(s1, s2):
            """Helper function. Given two lists of serverinfo, returns all
               members of s1 that are not members of s2.  ServerInfos are
               considered equivalent if their nicknames are the same,
               ignoring case.
            n = [ inf.getNickname().lower() for inf in s2 ]
            return [ inf for inf in s1 if inf.getNickname().lower() not in n]

        # Fill in startAt, endAt, prng if not provided
        if startAt is None:
            startAt = time.time()
        if endAt is None:
            endAt = startAt + self.DEFAULT_REQUIRED_LIFETIME
        if prng is None:
            prng = mixminion.Crypto.getCommonPRNG()

        # Resolve explicitly-provided servers
        servers = []
        for name in template:
            if name is None:
                servers.append(self.getServerInfo(name, startAt, endAt, 1))

        # If we need to pick the last server, pick it first.
        if servers[-1] is None:
            # Who has the required exit capability....
            endCandidates = self.__find(self.byCapability[endCap],
            if not endCandidates:
                raise UIError("Can't build path: no %s servers known"%endCap)
            # ....that we haven't used yet?
            used = filter(None, servers)
            unusedEndCandidates = setSub(endCandidates, used)
            if unusedEndCandidates:
                # Somebody with the capability is unused
                endCandidates = unusedEndCandidates
            elif len(endCandidates) > 1 and servers[-2] is not None:
                # We can at least avoid of picking someone with the
                # capability who isn't the penultimate node.
                penultimate = servers[-2].getNickname().lower()
                endCandidates = setSub(endCandidates, [penultimate])
                # We're on our own.
                assert len(endCandidates)

            # Finally, fill in the last server.
            servers[-1] = prng.pick(endCandidates)

        # Now figure out which relays we haven't used yet.
        used = filter(None, servers)
        relays = self.__find(self.byCapability['relay'], startAt, endAt)
        if not relays:
            raise UIError("No relays known")
        elif len(relays) == 2:
            LOG.warn("Not enough servers to avoid same-server hops")
        elif len(relays) == 1:
            LOG.warn("Only one relay known")

        # Now fill in the servers. For each relay we need...
        for i in xrange(len(servers)):
            if servers[i] is not None:
            # Find the servers adjacent to it, if any...
            if i>0:
                abutters = filter(None,[ servers[i-1], servers[i+1]])
                abutters = filter(None,[ servers[i+1] ])
            # ...and see if there are any relays left that aren't adjacent.
            candidates = setSub(relays, abutters)
            if candidates:
                # Good.  There are.
                servers[i] = prng.pick(candidates)
                # Nope.  Choose a random relay.
                servers[i] = prng.pick(relays)

        # FFFF We need to make sure that the path isn't totally junky.

        return servers

    def checkClientVersion(self):
        """Check the current client's version against the stated version in
           the most recently downloaded directory; print a warning if this
           version isn't listed as recommended.
        if not self.clientVersions:
        allowed = self.clientVersions.split()
        current = mixminion.__version__
        if current in allowed:
            # This version is recommended.
        current_t = mixminion.version_info
        more_recent_exists = 0
        for a in allowed:
                t = mixminion.parse_version_string(a)
            except ValueError:
                LOG.warn("Couldn't parse recommended version %s", a)
                if mixminion.cmp_versions(current_t, t) < 0:
                    more_recent_exists = 1
            except ValueError:
        if more_recent_exists:
            LOG.warn("This software may be obsolete; "
                      "You should consider upgrading.")
            LOG.warn("This software is newer than any version "
                     "on the recommended list.")

def parsePath(directory, config, path, address, nHops=None,
              startAt=None, endAt=None, halfPath=0,
    """Resolve a path as specified on the command line.  Returns a
       (path-leg-1, path-leg-2) tuple, where each leg is a list of ServerInfo.

       directory -- the ClientDirectory to use.
       config -- unused for now.
       path -- the path, in a format described below.  If the path is
          None, all servers are chosen as if the path were '*'.
       address -- the address to deliver the message to; if it specifies
          an exit node, the exit node is appended to the second leg of the
          path and does not count against the number of hops.  If 'address'
          is None, the exit node must support relay.
       nHops -- the number of hops to use.  Defaults to defaultNHops.
       startAt/endAt -- A time range during which all servers must be valid.
       halfPath -- If true, we generate only the second leg of the path
          and leave the first leg empty.
       defaultNHops -- The default path length to use when we encounter a
          wildcard in the path.  Defaults to 6.

       Paths are ordinarily comma-separated lists of server nicknames or
       server descriptor filenames, as in:

       You can use a colon as a separator to divides the first leg of the path
       from the second:
       If nSwap and a colon are both used, they must match, or MixError is

       You can use a question mark to indicate a randomly chosen server:
       As an abbreviation, you can use star followed by a number to indicate
       that number of randomly chosen servers:
       You can use a star without a number to specify a fill point
       where randomly-selected servers will be added:
       Finally, you can use a tilde followed by a number to specify an
       approximate number of servers to add.  (The actual number will be
       chosen randomly, according to a normal distribution with standard
       deviation 1.5):

       The nHops argument must be consistent with the path, if both are
       specified.  Specifically, if nHops is used _without_ a star on the
       path, nHops must equal the path length; and if nHops is used _with_ a
       star on the path, nHops must be >= the path length.
    if not path:
        path = '*'
    # Break path into a list of entries of the form:
    #        Nickname
    #     or "<swap>"
    #     or "?"
    p = []
    while path:
        if path[0] == "'":
            m = re.match(r"'([^']+|\\')*'", path)
            if not m: 
                raise UIError("Mismatched quotes in path.")
            p.append(m.group(1).replace("\\'", "'"))
            path = path[m.end():]
            if path and path[0] not in ":,":
                raise UIError("Invalid quotes in path.")
        elif path[0] == '"':
            m = re.match(r'"([^"]+|\\")*"', path)
            if not m: 
                raise UIError("Mismatched quotes in path.")
            p.append(m.group(1).replace('\\"', '"'))
            path = path[m.end():]
            if path and path[0] not in ":,":
                raise UIError("Invalid quotes in path.")
            m = re.match(r"[^,:]+",path)
            if not m:
                raise UIError("Invalid path") 
            path = path[m.end():]
        if not path:
        elif path[0] == ',':
            path = path[1:]
        elif path[0] == ':':
            path = path[1:]

    path = []
    for ent in p:
        if re.match(r'\*(\d+)', ent):
        elif re.match(r'\~(\d+)', ent):
            avg = int(ent[1:])
            n = int(mixminion.Crypto.getCommonPRNG().getNormal(avg, 1.5)+0.5)
            if n < 0: n = 0

    # set explicitSwap to true iff the user specified a swap point.
    explicitSwap = path.count("<swap>")
    # set colonPos to the index of the explicit swap point, if any.
    if path.count("<swap>") > 1:
        raise UIError("Can't specify swap point twice")

    # set starPos to the index of the var-length wildcard, if any.
    if path.count("*") > 1:
        raise UIError("Can't have two variable-length wildcards in a path")
    elif path.count("*") == 1:
        starPos = path.index("*")
        starPos = None

    # If there's a variable-length wildcard...
    if starPos is not None:
        # Find out how many hops we should have.
        myNHops = nHops or defaultNHops or 6
        # Figure out how many nodes we need to add.
        haveHops = len(path) - 1
        # A colon will throw the count off.
        if explicitSwap:
            haveHops -= 1
        path[starPos:starPos+1] = ["?"]*max(0,myNHops-haveHops)

    # Figure out how long the first leg should be.
    if explicitSwap:
        # Calculate colon position
        colonPos = path.index("<swap>")
        if halfPath:
            raise UIError("Can't specify swap point with replies")
        firstLegLen = colonPos
        del path[colonPos]
    elif halfPath:
        firstLegLen = 0
        firstLegLen = ceilDiv(len(path), 2)

    # Do we have the right # of hops?
    if nHops is not None and len(path) != nHops:
        raise UIError("Mismatch between specified path lengths")

    # Replace all '?'s in path with [None].
    for i in xrange(len(path)):
        if path[i] == '?': path[i] = None

    # Figure out what capability we need in our exit node, so that
    # we can tell the directory.
    if address is None:
        rt, ri, exitNode = None, None, None
        exitCap = 'relay'
        rt, ri, exitNode = address.getRouting()
        if rt == MBOX_TYPE:
            exitCap = 'mbox'
        elif rt == SMTP_TYPE:
            exitCap = 'smtp'
            exitCap = None

    # If we have an explicit exit node from the address, append it.
    if exitNode is not None:

    # Get a list of serverinfo.
    path = directory.getPath(endCap=exitCap,
                             template=path, startAt=startAt, endAt=endAt)

    # Now sanity-check the servers.

    # Make sure all relay servers support relaying.
    for server in path[:-1]:
        if "relay" not in server.getCaps():
            raise UIError("Server %s does not support relay"
                          % server.getNickname())

    # Make sure the exit server can support the exit capability.
    if exitCap and exitCap not in path[-1].getCaps():
        raise UIError("Server %s does not support %s capability"
                      % (path[-1].getNickname(), exitCap))

    # Split the path into 2 legs.
    path1, path2 = path[:firstLegLen], path[firstLegLen:]
    if not halfPath and len(path1)+len(path2) < 2:
        raise UIError("Path is too short")
    if not halfPath and (not path1 or not path2):
        raise UIError("Each leg of the path must have at least 1 hop")

    # Make sure the path can fit into the headers.
    mixminion.BuildMessage.checkPathLength(path1, path2,

    # Return the two legs of the path.
    return path1, path2

def parsePathLeg(directory, config, path, nHops, address=None,
                 startAt=None, endAt=None, defaultNHops=None):
    """Parse a single leg of a path.  Used for generating SURBs (second leg
       only) or reply messages (first leg only).  Returns a list of

       directory -- the ClientDirectory to use.
       config -- unused for now.
       path -- The path, as described in parsePath, except that ':' is not
       nHops -- the number of hops to use.  Defaults to defaultNHops.
       startAt/endAt -- A time range during which all servers must be valid.
       defaultNHops -- The default path length to use when we encounter a
          wildcard in the path.  Defaults to 6.
    path1, path2 = parsePath(directory, config, path, address, nHops,
                             startAt=startAt, endAt=endAt, halfPath=1,
    assert path1 == []
    return path2

Index: ClientMain.py
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.116
retrieving revision 1.117
diff -u -d -r1.116 -r1.117
--- ClientMain.py	28 Sep 2003 04:12:29 -0000	1.116
+++ ClientMain.py	28 Sep 2003 05:27:55 -0000	1.117
@@ -6,50 +6,31 @@
    Code for Mixminion command-line client.
-__all__ = [ 'Address', 'ClientKeyring', 'ClientDirectory', 'MixminionClient',
-    'parsePath', ]
+__all__ = [ 'Address', 'ClientKeyring', 'MixminionClient' ]
-import binascii
-import errno
-import cPickle
[...1183 lines suppressed...]
-        return SURBLog(self.surbLogFilename)
+        return mixminion.ClientUtils.SURBLog(self.surbLogFilename)
     def pingServer(self, routingInfo):
         """Given an IPV4Info, try to connect to a server and find out if
@@ -1709,12 +606,12 @@
-        return ClientConfig(fname=configFile)
+        return mixminion.Config.ClientConfig(fname=configFile)
     except (IOError, OSError), e:
         print >>sys.stderr, "Error reading configuration file %r:"%configFile
         print >>sys.stderr, "   ", str(e)
-    except ConfigError, e:
+    except mixminion.Config.ConfigError, e:
         print >>sys.stderr, "Error in configuration file %r"%configFile
         print >>sys.stderr, "   ", str(e)

Index: ClientUtils.py
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientUtils.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -d -r1.1 -r1.2
--- ClientUtils.py	6 Sep 2003 21:49:48 -0000	1.1
+++ ClientUtils.py	28 Sep 2003 05:27:56 -0000	1.2
@@ -8,15 +8,20 @@
 __all__ = [ 'NoPassword', 'PasswordManager', 'getPassword_term',
-            'getNewPassword_term', ]
+            'getNewPassword_term', 'SURBLog', 'ClientQueue' ]
+import binascii
 import cPickle
 import getpass
 import os
 import sys
+import time
-from mixminion.Common import readFile, writeFile, MixError
 import mixminion.Crypto
+import mixminion.Filestore
+from mixminion.Common import LOG, MixError, UIError, createPrivateDir, \
+     floorDiv, previousMidnight, readFile, writeFile
 class BadPassword(MixError):
@@ -36,7 +41,7 @@
         if self.passwords.has_key(name):
             return self.passwords[name]
         for othername, pwd in self.passwords.items():
-            if self._confirm(name, pwd):
+            if confirmFn(pwd):
                 self.passwords[name] = pwd
                 return pwd
         pmt = prompt
@@ -83,15 +88,14 @@
 def getNewPassword_term(prompt):
     """Read a new password from the console, then return it."""
-    s1 = "Enter new password for %s:"%which
-    s2 = "Verify password:".rjust(len(s1))
+    s2 = "Verify password:".rjust(len(prompt))
     if os.isatty(sys.stdout.fileno()):
         f = sys.stdout
         f = sys.stderr
     while 1:
-        p1 = self.getPassword_term(s1)
-        p2 = self.getPassword_term(s2)
+        p1 = getPassword_term(prompt)
+        p2 = getPassword_term(s2)
         if p1 == p2:
             return p1
         f.write("Passwords do not match.\n")
@@ -204,3 +208,223 @@
+# ----------------------------------------------------------------------
+class SURBLog(mixminion.Filestore.DBBase):
+    """A SURBLog manipulates a database on disk to remember which SURBs we've
+       used, so we don't reuse them accidentally.
+       """
+    #FFFF Using this feature should be optional.
+    ## Format:
+    # The database holds two kinds of keys:
+    #    "LAST_CLEANED" -> an integer of the last time self.clean() was called.
+    #    20-byte-hash-of-SURB -> str(expiry-time-of-SURB)
+    def __init__(self, filename, forceClean=0):
+        """Open a new SURBLog to store data in the file 'filename'.  If
+           forceClean is true, remove expired entries on startup.
+        """
+        mixminion.ClientMain.clientLock() #XXXX
+        mixminion.Filestore.DBBase.__init__(self, filename, "SURB log")
+        try:
+            lastCleaned = int(self.log['LAST_CLEANED'])
+        except (KeyError, ValueError):
+            lastCleaned = 0
+        if lastCleaned < time.time()-24*60*60 or forceClean:
+            self.clean()
+        self.sync()
+    def findUnusedSURB(self, surbList, verbose=0, now=None):
+        """Given a list of ReplyBlock objects, find the first that is neither
+           expired, about to expire, or used in the past.  Return None if
+           no such reply block exists."""
+        if now is None:
+            now = time.time()
+        nUsed = nExpired = nShortlived = 0
+        result = None
+        for surb in surbList: 
+            expiry = surb.timestamp
+            timeLeft = expiry - now
+            if self.isSURBUsed(surb):
+                nUsed += 1
+            elif timeLeft < 60:
+                nExpired += 1
+            elif timeLeft < 3*60*60:
+                nShortlived += 1
+            else:
+                result = surb
+                break
+        if verbose:
+            if nUsed:
+                LOG.warn("Skipping %s used reply blocks", nUsed)
+            if nExpired:
+                LOG.warn("Skipping %s expired reply blocks", nExpired)
+            if nShortlived:
+                LOG.warn("Skipping %s soon-to-expire reply blocks",nShortlived)
+        return result
+    def close(self):
+        """Release resources associated with the surblog."""
+        mixminion.Filestore.DBBase.close(self)
+        mixminion.ClientMain.clientUnlock()
+    def isSURBUsed(self, surb):
+        """Return true iff the ReplyBlock object 'surb' is marked as used."""
+        return self.has_key(surb)
+    def markSURBUsed(self, surb):
+        """Mark the ReplyBlock object 'surb' as used."""
+        self[surb] = surb.timestamp
+    def clean(self, now=None):
+        """Remove all entries from this SURBLog the correspond to expired
+           SURBs.  This is safe because if a SURB is expired, we'll never be
+           able to use it inadvertently."""
+        if now is None:
+            now = time.time() + 60*60
+        allHashes = self.log.keys()
+        removed = []
+        for hash in allHashes:
+            if self._decodeVal(self.log[hash]) < now:
+                removed.append(hash)
+        del allHashes
+        for hash in removed:
+            del self.log[hash]
+        self.log['LAST_CLEANED'] = str(int(now))
+        self.sync()
+    def _encodeKey(self, surb):
+        return binascii.b2a_hex(mixminion.Crypto.sha1(surb.pack()))
+    def _encodeVal(self, timestamp):
+        return str(timestamp)
+    def _decodeVal(self, timestamp):
+        try:
+            return int(timestamp)
+        except ValueError:
+            return 0
+# ----------------------------------------------------------------------
+class ClientQueue:
+    """A ClientQueue holds packets that have been scheduled for delivery
+       but not yet delivered.  As a matter of policy, we queue messages if
+       the user tells us to, or if deliver has failed and the user didn't
+       tell us not to."""
+    ## Fields:
+    # dir -- a directory to store packets in.
+    # store -- an instance of ObjectStore.  The entries are of the
+    #    format:
+    #           ("PACKET-0",
+    #             a 32K string (the packet),
+    #             an instance of IPV4Info (the first hop),
+    #             the latest midnight preceding the time when this
+    #                 packet was inserted into the queue
+    #           )
+    # XXXX change this to be OO; add nicknames.
+    # XXXX006 write unit tests
+    def __init__(self, directory, prng=None):
+        """Create a new ClientQueue object, storing packets in 'directory'
+           and generating random filenames using 'prng'."""
+        self.dir = directory
+        createPrivateDir(directory)
+        # We used to name entries "pkt_X"; this has changed.
+        # XXXX006 remove this when it's no longer needed.
+        for fn in os.listdir(directory):
+            if fn.startswith("pkt_"):
+                handle = fn[4:]
+                fname_old = os.path.join(directory, fn)
+                fname_new = os.path.join(directory, "msg_"+handle)
+                os.rename(fname_old, fname_new)
+        self.store = mixminion.Filestore.ObjectStore(
+            directory, create=1, scrub=1)
+    def queuePacket(self, message, routing):
+        """Insert the 32K packet 'message' (to be delivered to 'routing')
+           into the queue.  Return the handle of the newly inserted packet."""
+        mixminion.ClientMain.clientLock()
+        try:
+            fmt = ("PACKET-0", message, routing, previousMidnight(time.time()))
+            return self.store.queueObject(fmt)
+        finally:
+            mixminion.ClientMain.clientUnlock()
+    def getHandles(self):
+        """Return a list of the handles of all messages currently in the
+           queue."""
+        mixminion.ClientMain.clientLock()
+        try:
+            return self.store.getAllMessages()
+        finally:
+            mixminion.ClientMain.clientUnlock()
+    def getPacket(self, handle):
+        """Given a handle, return a 3-tuple of the corresponding
+           32K packet, IPV4Info, and time of first queueing.  (The time
+           is rounded down to the closest midnight GMT.)  May raise 
+           CorruptedFile."""
+        obj = self.store.getObject(handle)
+        try:
+            magic, message, routing, when = obj
+        except (ValueError, TypeError):
+            magic = None
+        if magic != "PACKET-0":
+            LOG.error("Unrecognized packet format for %s",handle)
+            return None
+        return message, routing, when
+    def packetExists(self, handle):
+        """Return true iff the queue contains a packet with the handle
+           'handle'."""
+        return self.store.messageExists(handle)
+    def removePacket(self, handle):
+        """Remove the packet named with the handle 'handle'."""
+        self.store.removeMessage(handle)
+        self.store.cleanQueue()
+    def inspectQueue(self, now=None):
+        """Print a message describing how many messages in the queue are headed
+           to which addresses."""
+        if now is None:
+            now = time.time()
+        handles = self.getHandles()
+        if not handles:
+            print "[Queue is empty.]"
+            return
+        timesByServer = {}
+        for h in handles:
+            try:
+                _, routing, when = self.getPacket(h)
+            except mixminion.Filestore.CorruptedFile:
+                continue
+            timesByServer.setdefault(routing, []).append(when)
+        for s in timesByServer.keys():
+            count = len(timesByServer[s])
+            oldest = min(timesByServer[s])
+            days = floorDiv(now - oldest, 24*60*60)
+            if days < 1:
+                days = "<1"
+            print "%2d messages for server at %s:%s (oldest is %s days old)"%(
+                count, s.ip, s.port, days)
+    def cleanQueue(self, maxAge, now=None):
+        """Remove all messages older than maxAge seconds from this
+           queue."""
+        if now is None:
+            now = time.time()
+        cutoff = now - maxAge
+        remove = []
+        for h in self.getHandles():
+            try:
+                when = self.getPacket(h)[2]
+            except mixminion.Filestore.CorruptedFile:
+                continue
+            if when < cutoff:
+                remove.append(h)
+        LOG.info("Removing %s old messages from queue", len(remove))
+        for h in remove:
+            self.store.removeMessage(h)
+        self.store.cleanQueue()

Index: Config.py
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Config.py,v
retrieving revision 1.57
retrieving revision 1.58
diff -u -d -r1.57 -r1.58
--- Config.py	3 Sep 2003 15:49:58 -0000	1.57
+++ Config.py	28 Sep 2003 05:27:56 -0000	1.58
@@ -775,6 +775,7 @@
         return "".join(lines)
 class ClientConfig(_ConfigFile):
+    #XXXX Should this go into ClientUtils or something?
     _restrictFormat = 0
     _restrictKeys = _restrictSections = 1
     _syntax = {

Index: test.py
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.154
retrieving revision 1.155
diff -u -d -r1.154 -r1.155
--- test.py	12 Sep 2003 15:52:46 -0000	1.154
+++ test.py	28 Sep 2003 05:27:56 -0000	1.155
@@ -5896,7 +5896,7 @@
         #XXXX006 finish testing corner cases and pickles.
-class ClientMainTests(TestCase):
+class ClientDirectoryTests(TestCase):
     def testClientDirectory(self):
         """Check out ClientMain's directory implementation"""
         eq = self.assertEquals
@@ -5904,7 +5904,7 @@
         ServerInfo = mixminion.ServerInfo.ServerInfo
         dirname = mix_mktemp()
-        ks = mixminion.ClientMain.ClientDirectory(dirname)
+        ks = mixminion.ClientDirectory.ClientDirectory(dirname)
         ## Write the descriptors to disk.
         edesc = getExampleServerDescriptors()
@@ -5942,7 +5942,7 @@
             self.assertRaises(MixError, ks.getServerInfo, "Joe", startAt=now,
             if i in (0,1,2):
-                ks = mixminion.ClientMain.ClientDirectory(dirname)
+                ks = mixminion.ClientDirectory.ClientDirectory(dirname)
             if i == 1:
             if i == 2:
@@ -5978,8 +5978,8 @@
         # Replace the real URL and fingerprint with the ones we have; for
         # unit testing purposes, we can't rely on an http server.
-        mixminion.ClientMain.MIXMINION_DIRECTORY_URL = fileURL(fname)
-        mixminion.ClientMain.MIXMINION_DIRECTORY_FINGERPRINT = fingerprint
+        mixminion.ClientDirectory.MIXMINION_DIRECTORY_URL = fileURL(fname)
+        mixminion.ClientDirectory.MIXMINION_DIRECTORY_FINGERPRINT = fingerprint
         # Reload the directory.
@@ -5991,7 +5991,7 @@
             if i in (0,1,2):
-                ks = mixminion.ClientMain.ClientDirectory(dirname)
+                ks = mixminion.ClientDirectory.ClientDirectory(dirname)
             if i == 1:
             if i == 2:
@@ -6015,7 +6015,7 @@
             [os.path.join(impdirname, s) for s in
              ("Fred1", "Fred2", "Lola2", "Alice0", "Alice1",
               "Bob3", "Bob4", "Lisa1", "Lisa2") ], identity)
-        mixminion.ClientMain.MIXMINION_DIRECTORY_URL = fileURL(fname)
+        mixminion.ClientDirectory.MIXMINION_DIRECTORY_URL = fileURL(fname)
         # Previous entries.
         self.assertSameSD(ks.getServerInfo("Alice"), edesc["Alice"][0])
@@ -6082,7 +6082,7 @@
             neq(p[1].getNickname(), "Alice")
             neq(p[1].getNickname(), "Joe")
             # 2b. With 3 <= servers < length
-            ks2 = mixminion.ClientMain.ClientDirectory(mix_mktemp())
+            ks2 = mixminion.ClientDirectory.ClientDirectory(mix_mktemp())
             ks2.importFromFile(os.path.join(impdirname, "Joe0"))
             ks2.importFromFile(os.path.join(impdirname, "Alice0"))
             ks2.importFromFile(os.path.join(impdirname, "Lisa1"))
@@ -6159,7 +6159,7 @@
         self.assertSameSD(p[-1], alice[0]) # We ignore endCap with endServers
         ### Now try parsePath.  This should exercise resolvePath as well.
-        ppath = mixminion.ClientMain.parsePath
+        ppath = mixminion.ClientDirectory.parsePath
         paddr = mixminion.ClientMain.parseAddress
         email = paddr("smtp:lloyd@dobler.com")
         mboxWithServer = paddr("mbox:Granola@Lola")
@@ -6342,10 +6342,30 @@
         ## Now try clean()
         ks.clean() # Should do nothing.
-        ks = mixminion.ClientMain.ClientDirectory(dirname)
+        ks = mixminion.ClientDirectory.ClientDirectory(dirname)
         ks.clean(now=now+oneDay*500) # Should zap all of imported servers.
         raises(MixError, ks.getServerInfo, "Lola")
+    def assertSameSD(self, s1, s2):
+        self.assert_(self.isSameServerDesc(s1,s2))
+    def isSameServerDesc(self, s1, s2):
+        """s1 and s2 are either ServerInfo objects or strings containing server
+           descriptors. Returns 1 iff their digest fields match"""
+        ds = []
+        for s in s1, s2:
+            if type(s) == types.StringType:
+                m = re.search(r"^Digest: (\S+)\n", s, re.M)
+                assert m
+                ds.append(base64.decodestring(m.group(1)))
+            elif isinstance(s, mixminion.ServerInfo.ServerInfo):
+                ds.append(s.getDigest())
+            else:
+                return 0
+        return ds[0] == ds[1]
+class ClientMainTests(TestCase):
     def testAddress(self):
         def parseEq(s, tp, addr, server, eq=self.assertEquals):
             "Helper: return true iff parseAddress(s).getRouting() == t,s,a."
@@ -6389,9 +6409,9 @@
         parseFails("0x9999") # No data
         parseFails("0xFEEEF:zymurgy") # Hex literal out of range
-    def testSURBLog(self):
+    def testSURBLog(self): #XXXX move this.
         brb = BuildMessage.buildReplyBlock
-        SURBLog = mixminion.ClientMain.SURBLog
+        SURBLog = mixminion.ClientUtils.SURBLog
         ServerInfo = mixminion.ServerInfo.ServerInfo
         dirname = mix_mktemp()
         fname = os.path.join(dirname, "surblog")
@@ -6572,24 +6592,6 @@
-    def assertSameSD(self, s1, s2):
-        self.assert_(self.isSameServerDesc(s1,s2))
-    def isSameServerDesc(self, s1, s2):
-        """s1 and s2 are either ServerInfo objects or strings containing server
-           descriptors. Returns 1 iff their digest fields match"""
-        ds = []
-        for s in s1, s2:
-            if type(s) == types.StringType:
-                m = re.search(r"^Digest: (\S+)\n", s, re.M)
-                assert m
-                ds.append(base64.decodestring(m.group(1)))
-            elif isinstance(s, mixminion.ServerInfo.ServerInfo):
-                ds.append(s.getDigest())
-            else:
-                return 0
-        return ds[0] == ds[1]
 class FragmentTests(TestCase):
     def testFragmentParams(self):
@@ -6780,7 +6782,7 @@
     tc = loader.loadTestsFromTestCase
     if 0:
-        suite.addTest(tc(ClientMainTests))
+        suite.addTest(tc(ClientDirectoryTests))
         return suite
     testClasses = [MiscTests,
@@ -6799,6 +6801,7 @@
+                   ClientDirectoryTests,