[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Most of the back-end for the Great Mixminion Directory ...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/directory
In directory moria.mit.edu:/tmp/cvs-serv11598/src/minion/lib/mixminion/directory
Modified Files:
DirFormats.py Directory.py ServerInbox.py ServerList.py
Log Message:
Most of the back-end for the Great Mixminion Directory Rewrite. Still needs more testing and more glue code.
Index: DirFormats.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/directory/DirFormats.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- DirFormats.py 13 Dec 2004 07:06:10 -0000 1.4
+++ DirFormats.py 3 May 2005 03:26:50 -0000 1.5
@@ -59,9 +59,13 @@
valid = []
for server in servers:
try:
- s = mixminion.ServerInfo.ServerInfo(
- string=str(server), validatedDigests=validatedDigests,
- _keepContents=1)
+ if isinstance(server, mixminion.ServerInfo.ServerInfo):
+ assert server._originalContents
+ s = server
+ else:
+ s = mixminion.ServerInfo.ServerInfo(
+ string=str(server), validatedDigests=validatedDigests,
+ _keepContents=1)
except ConfigError,e:
LOG.warn("Rejecting malformed serverinfo: %s",e)
else:
Index: Directory.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/directory/Directory.py,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -d -r1.18 -r1.19
--- Directory.py 27 Jul 2004 03:13:02 -0000 1.18
+++ Directory.py 3 May 2005 03:26:50 -0000 1.19
@@ -10,12 +10,13 @@
import os
import stat
+import time
import mixminion.Config
import mixminion.Crypto
from mixminion.Common import LOG, MixError, MixFatalError, UIError, \
- formatBase64, writePickled, readPickled
+ formatBase64, writePickled, readPickled, formatTime
class Directory:
"""Wrapper class for directory filestores.
@@ -222,126 +223,126 @@
raise mixminion.Config.ConfigError("User %s is not in group %s"
%(cgiuser, cgigrp))
-class MismatchedID(Exception):
- """Exception class: raised when the identity key on a new server
- descriptor doesn't match the identity key known for that nickname."""
- pass
-
-class IDCache:
- """Cache to hold a set of nickname->identity key mappings"""
- ##Fields:
- # cache: map from lowercased nickname to ID fingerprint.
- # location: filename to hold pickled cache.
- # dirty: are all the values in 'self.cache' flushed to disk? (boolean)
- # postSave: None, or a function to call after every save.
- ##Pickled Format:
- # ("V0", {lcnickname -> ID Fingerprint} )
- def __init__(self, location, postSave=None):
- """Create an identity cache object.
-
- location -- name of file to hold pickled cache.
- postSave -- optionally, a function to be called after every
- save."""
- self.location = location
- self.dirty = 0
- self.cache = None
- self.postSave = postSave
-
- def emptyCache(self):
- """Remove all entries from this cache."""
- self.dirty = 1
- self.cache = {}
-
- def containsID(self, nickname, ID):
- """Check the identity for the server named 'nickname'. If the
- server is not known, return false. If the identity matches the
- identity key fingerprint 'ID', return true. If the server is
- known, but the identity does not match, raise MismatchedID.
- """
- if self.cache is None: self.load()
+class VoteFile:
+ """File listing dirserver's current disposition towards various
+ nickname/identity combinations. Each can be voted 'yes', 'no',
+ or 'abstain'.
+ """
+ ## Fields:
+ # status: identity fingerprint -> ("yes", "nickname") | ("no", None) |
+ # ("abstain", None) | ("ignore", None)
+ # fname
+ # dirty, uid, gid
+ def __init__(self, fname, uid=None, gid=None):
+ self.fname = fname
+ self.uid = uid
+ self.gid = gig
+ if not self._loadFromCache():
+ self._load(fname)
- lcnickname = nickname.lower()
+ def _load(self, fname):
+ pat = re.compile(r'(yes|no|abstain|ignore)\s+(\S+)\s+([a-fA-F0-9 ]+)')
+ f = open(fname, 'r')
try:
- if self.cache[lcnickname] != ID:
- raise MismatchedID()
- return 1
- except KeyError:
- return 0
-
- def containsServer(self, server):
- """Check the identity key contained in a server descriptor. Return
- true if the server is known, false if the server unknown, and
- raise MismatchedID if the server is known but its ID is
- incorrect."""
- nickname = server.getNickname()
- ID = getIDFingerprint(server)
- return self.containsID(nickname, ID)
-
- def insertID(self, nickname, ID):
- """Record the server named 'nickname' as having an identity key
- with fingerprint 'ID'. If the server already haves a different
- ID, raise MismatchedID."""
- if self.cache is None: self.load()
-
- lcnickname = nickname.lower()
- self.dirty = 1
- old = self.cache.get(lcnickname)
- if old and old != ID:
- raise MismatchedID()
- self.cache[lcnickname] = ID
-
- def insertServer(self, server):
- """Record the identity key of ServerInfo 'server'. If another
- server with the same nickname and a different identity key is
- already known, raise MismatchedID."""
- nickname = server.getNickname()
- ID = getIDFingerprint(server)
- self.insertID(nickname, ID)
-
- def flush(self):
- """If any entries in the cache are new, write the cache to disk."""
- if self.dirty:
- self.save()
+ status = {}
+ lineof = {}
+ byName = {}
+ lineno = 0
+ for line in open(fname, 'r').readlines():
+ lineno += 1
+ line = line.strip()
+ if not line or line[0] == '#': continue
+ m = pat.match(line)
+ if not m:
+ LOG.warn("Skipping ill-formed line %s in %s",lineno,fname)
+ continue
+ vote, nickname, fingerprint = m.groups()
+ try:
+ mixminion.Config._parseNickname(nickname)
+ except mixminion.Config.ConfigError, e:
+ LOG.warn("Skipping bad nickname '%s', on line %s of %s: %s",
+ nickname, lineno, fname, e)
+ continue
+ try:
+ ident = binascii.a2b_hex(fingerprint.replace(" ", ""))
+ if len(ident) != mixminion.Crypto.DIGEST_LEN:
+ raise TypeError("Wrong length for digest")
+ except TypeError, e:
+ LOG.warn("Invalid fingerprint on line %s of %s: %s", lineno,
+ fname, e)
+ continue
+ if status.has_key(ident):
+ LOG.warn("Ignoring duplicate entry for fingprint on line %s (first appeared on line %s)", lineno, lineof[ident])
+ continue
+ lineof[ident] = lineno
+ if vote == 'yes':
+ status[ident] = (vote, nickname)
+ if byName.has_key(nickname.lower()):
+ LOG.warn("Ignoring second yes-vote for a nickname %r",
+ nickname)
+ continue
+ byName[nickname] = ident
+ else:
+ status[ident] = (vote, None)
+ self.status = status
+ self.dirty = 1
+ finally:
+ f.close()
- def load(self):
- """Re-read the cache from disk."""
- if not os.path.exists(self.location):
- LOG.info("No ID cache; will create")
- self.cache = {}
+ def appendUnknownServers(self, lst):
+ # list of [(nickname, fingerprint) ...]
+ if not lst:
return
+ f = open(fname, 'a+')
try:
- obj = readPickled(self.location)
- # Pass pickling error
- except OSError, e:
- raise MixError("Cache exists, but cannot read cache: %s" % e)
- if len(obj) != 2:
- raise MixFatalError("Corrupt ID cache")
-
- typecode, data = obj
- if typecode != 'V0':
- raise MixFatalError("Unrecognized version on ID cache.")
+ f.seek(-1, 2)
+ nl = (f.read(1) == '\n')
+ if not nl: f.write("\n")
+ for name, fp in lst:
+ f.write("# Added %s\n#abstain %s %s\n"%(date, name, fp))
+ finally:
+ f.close()
- self.cache = data
+ def _loadFromCache(self):
+ # raise OSError or return false on can't/shouldn't load.
+ cacheFname = self.fname + ".cache"
+ try:
+ cache_mtime = os.stat(cacheFname)[stat.ST_MTIME]
+ file_mtime = os.stat(self.fname)[stat.ST_MTIME]
+ except OSError:
+ return 0
+ if file_mtime >= cache_mtime:
+ return 0
+ try:
+ p = readPickled(cacheFname)
+ except (OSError, cPickle.UnpicklingError), _:
+ return 0
+ if type(p) != types.TupleType or p[0] != 'VoteCache-0':
+ return 0
+ self.status = p[1]
+ self.dirty = 0
+ return 1
- def save(self):
- """Write the cache to disk."""
- if self.cache is None:
- return
- writePickled(self.location,
- ("V0", self.cache),
- 0640)
- if self.postSave:
- self.postSave()
+ def saveCache(self):
+ cacheFname = self.fname + ".cache"
+ writePickled(cacheFname, ("VoteCache-0", self.status), 0640)
+ if self.uid is not None and self.gid is not None:
+ _set_uid_gid_mode(cacheFname, self.uid, self.gid, 0640)
+ _set_uid_gid_mode(self.name, self.uid, self.gid, 0640)
self.dirty = 0
-def getIDFingerprint(server):
- """Given a ServerInfo, return the fingerprint of its identity key.
+ def getServerStatus(self, server):
+ # status + 'unknown' + 'mismatch'
+ ident = server.getIdentityDigest()
+ try:
+ vote, nickname = self.status[ident]
+ except KeyError:
+ return "unknown"
- We compute fingerprints by taking the ASN.1 encoding of the key,
- then taking the SHA1 hash of the encoding."""
- ident = server.getIdentity()
- return mixminion.Crypto.sha1(
- mixminion.Crypto.pk_encode_public_key(ident))
+ if vote == 'yes' and nickname != server.getNickname():
+ return "mismatch"
+
+ return vote
def _set_uid_gid_mode(fn, uid, gid, mode):
"""Change the permissions on the file named 'fname', so that fname
Index: ServerInbox.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/directory/ServerInbox.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -d -r1.12 -r1.13
--- ServerInbox.py 28 Nov 2003 04:14:04 -0000 1.12
+++ ServerInbox.py 3 May 2005 03:26:50 -0000 1.13
@@ -16,28 +16,17 @@
formatBase64, readPickled, tryUnlink, writePickled
from mixminion.ServerInfo import ServerInfo
-from mixminion.directory.Directory import getIDFingerprint, MismatchedID
-from mixminion.directory.ServerList import _writeServer, _readServer
-
class ServerInbox:
"""A ServerInbox holds server descriptors received from the outside
world that are not yet ready to be included in the directory.
"""
## Fields:
- # newQueue: IncomingQueue object to hold descriptors for previously
- # unknown servers.
- # updateQueue: IncomingQueue object to hold descriptors for currently
- # known servers.
- def __init__(self, base, idCache):
- """Initialize a ServerInbox to store its files in 'base', and
- check server descriptors against the IDCache 'idCache'."""
- self.newQueue = IncomingQueue(os.path.join(base, "new"),
- os.path.join(base, "reject"))
- self.updateQueue = IncomingQueue(os.path.join(base, "updates"),
- os.path.join(base, "reject"))
- self.idCache = idCache
+ def __init__(self, store, voteFile):
+ """DOCDOC"""
+ self.store = store
+ self.voteFile = voteFile
- def receiveServer(self, text, source):
+ def receiveServer(self, text, source, now=None):
"""Process a new server descriptor and store it for later action.
(To be run by the CGI user.)
@@ -50,214 +39,59 @@
source -- a (human readable) string describing the source
of the descriptor, used in error messages.
- """
+ """
+ if now is None:
+ now = time.time()
+
try:
- server = ServerInfo(string=text,assumeValid=0)
+ server = ServerInfo(string=text,assumeValid=0,_keepContents=1)
except MixError, e:
LOG.warn("Rejected invalid server from %s: %s", source,e)
raise UIError("Server descriptor was not valid: %s"%e)
- nickname = server.getNickname()
-
- try:
- known = self.idCache.containsServer(server)
- except MismatchedID:
- LOG.warn("Rejected server with mismatched identity from %s",
- source)
- self.updateQueue.queueRejectedServer(text,server)
+ status = self.voteFile.getServerStatus(server)
+ if status == "mismatch":#XXXX missing case? (I found an elif here)
+ LOG.warn("Rejected server with mismatched identity for %s from %s",
+ nickname, source)
+ self.store.addServer(server)
raise UIError(("I already know a server named "
- "%s with a different key.")%nickname)
+ "%s with a different key.")%server.getNickname())
- if not known:
+ if server.isExpiredAt(time.time()):
+ LOG.warn("Rejecting expired descriptor from %s", source)
+ raise UIError("That descriptor is already expired; your clock"
+ " is probably skewed.")
+
+ if status in ("yes", "no", "abstain"):
+ LOG.warn("Received update for already-seen server %r from %s (vote=%s)",
+ server.getNickname(), source, status)
+ self.store.addServer(server)
+ return 1
+ else:
+ assert status == "unknown"
LOG.info("Received previously unknown server %s from %s",
nickname, source)
- self.newQueue.queueIncomingServer(text,server)
+ self.store.addServer(server)
raise ServerQueuedException(
"Server queued pending manual checking")
- else:
- LOG.info("Received update for server %s from %s",
- nickname, source)
- self.updateQueue.queueIncomingServer(text,server)
- return 1
-
- def _doAccept(self, serverList, q, incoming, reject, knownOnly):
- """Helper function: move servers from an IncomingQueue into
- a ServerList. (To be run by the directory user.)
-
- serverList -- an instance of ServerList
- q -- an instance of IncomingQueue
- incoming -- a list of [filename, serverinfo, descriptor text,
- fingerprint] for servers to insert.
- reject -- a list of [filename, serverinfo, desc text, fprint]
- for servers to reject.
- knownOnly -- boolean: accept only servers with previously
- known identity keys?
- """
- accepted = []
- for fname, server, text, fp in incoming:
- try:
- serverList.importServerInfo(text,server=server,
- knownOnly=knownOnly)
- accepted.append(fname)
- except MixError, e:
- LOG.warn("ServerList refused to include server %s: %s",
- fname, e)
- reject.append((fname,server,text,fp))
-
- for fname, server, text, fp in reject:
- self.updateQueue.queueRejectedServer(text,server)
-
- fnames = accepted + [fn for fn,_,_,_ in reject]
- q.delPendingServers(fnames)
-
- def acceptUpdates(self, serverList):
- """Move updates for existing servers into the directory. (To
- be run by the directory user.)"""
- incoming = self.updateQueue.readPendingServers()
- self._doAccept(serverList, self.updateQueue, incoming, [],
- knownOnly=1)
-
- def acceptNewServer(self, serverList, nickname):
- """Move the descriptors for a new server with a given nickname
- into the directory. (To be run by a the directory user.)
-
- If the nickname is of the format name:FINGERPRINT, then
- only insert servers with the nickname/fingerprint pair.
- """
- if ':' in nickname:
- nickname, fingerprint = nickname.split(":")
- else:
- fingerprint = None
-
- lcnickname = nickname.lower()
- incoming = self.newQueue.readPendingServers()
- # Do we have any pending servers of the desired name?
- incoming = [ (fname,server,text,fp)
- for fname,server,text,fp in incoming
- if server.getNickname().lower() == lcnickname ]
- if not incoming:
- raise UIError("No incoming servers named %s"%nickname)
-
- if not fingerprint:
- fps = [fp for f,s,t,fp in incoming]
- for f in fps:
- if f != fps[0]:
- raise UIError("Multiple KeyIDs for servers named %s"%
- nickname)
- reject = []
- else:
- reject = [ (f,s,t,fp) for f,s,t,fp in incoming
- if fp != fingerprint ]
- incoming = [ (f,s,t,fp) for f,s,t,fp in incoming
- if fp == fingerprint ]
- if not incoming:
- raise UIError("No servers named %s with matching KeyID"%
- nickname)
- if reject:
- LOG.warn("Rejecting %s servers named %s with unmatched KeyIDs",
- len(reject), nickname)
-
- try:
- serverList._lock()
- serverList.learnServerID(incoming[0][1])
- self._doAccept(serverList, self.newQueue, incoming, reject,
- knownOnly=1)
- finally:
- serverList._unlock()
-
- def listNewPendingServers(self, f):
- """Print a list of new servers waiting admin attention to the file
- f."""
- incoming = self.newQueue.readPendingServers()
- # lcnickname->fp->servers
- servers = {}
- # lcnickname->nicknames
- nicknames = {}
-
- for fname,s,t,fp in incoming:
- nickname = s.getNickname()
- lcnickname = nickname.lower()
- nicknames.setdefault(lcnickname, []).append(nickname)
- servers.setdefault(lcnickname, {}).setdefault(fp, []).append(s)
-
- sorted = nicknames.keys()
- sorted.sort()
- if not sorted:
- print >>f, "No incoming descriptors"
- return
- maxlen = max([len(n) for n in sorted])
- format = " %"+str(maxlen)+"s:%s [%s descriptors]"
- for lcnickname in sorted:
- nickname = nicknames[lcnickname][0]
- ss = servers[lcnickname]
- if len(ss) > 1:
- print >>f, ("***** MULTIPLE KEYIDS FOR %s:"%nickname)
- for fp, s in ss.items():
- print >>f, (format%(nickname,fp,len(s)))
-
-class IncomingQueue:
- """Implementation helper: holds incoming server descriptors as
- separate files in a directory."""
- def __init__(self, incomingDir, rejectDir):
- """Create an IncomingQueue to hold incoming servers in incomingDir
- and rejected servers in rejectDir."""
- self.incomingDir = incomingDir
- self.rejectDir = rejectDir
- if not os.path.exists(incomingDir):
- raise MixFatalError("Incoming directory doesn't exist")
- if not os.path.exists(rejectDir):
- raise MixFatalError("Reject directory doesn't exist")
-
- def queueIncomingServer(self, contents, server):
- """Write a server into the incoming directory.
-
- contents -- the text of the server descriptor.
- server -- the parsed server descriptor.
- """
- nickname = server.getNickname()
- _writeServer(self.incomingDir, contents, nickname, 0644)
-
- def queueRejectedServer(self, contents, server):
- """Write a server into the rejected directory.
-
- contents -- the text of the server descriptor.
- server -- the parsed server descriptor.
- """
- nickname = server.getNickname()
- _writeServer(self.rejectDir, contents, nickname, 0644)
-
- def newServersPending(self, newServ):
- """Return true iff there is a new server waiting in the incoming
- directory."""
- return len(os.listdir(self.incomingDir)) > 0
- def readPendingServers(self):
- """Scan all of the servers waiting in the incoming directory. If
- any are bad, remove them. Return a list of
- (filename, ServerInfo, server descriptor, ID Fingerprint)
- tuples for all the servers in the directory.
- """
- res = []
- for fname in os.listdir(self.incomingDir):
- path = os.path.join(self.incomingDir,fname)
+ def moveEntriesToStore(self, intoStore):
+ keys = self.store.listKeys()
+ unknown = {}
+ for k in keys:
try:
- text, server = _readServer(path)
- except MixError, e:
- os.unlink(path)
- LOG.warn(
- "Removed a bad server descriptor %s from incoming dir: %s",
- fname, e)
- continue
- fp = formatBase64(getIDFingerprint(server))
- res.append((fname, server, text, fp))
- return res
-
- def delPendingServers(self, fnames):
- """Remove a list of pending servers with filename 'filename' from
- the incoming directory."""
- for fname in fnames:
- if not tryUnlink(os.path.join(self.incomingDir, fname)):
- LOG.warn("delPendingServers: no such server %s"%fname)
+ s = self.store.loadServer(k, keepContents=1, assumeValid=0)
+ except (OSError, mixminion.Config.ConfigError), _:
+ self.store.delServer(s)
+ else:
+ status = self.voteFile.getServerStatus(s)
+ if status not in ("ignore", "mismatch"):
+ intoStore.addServer(s)
+ if status == 'unknown':
+ unknown[(s.getNickname(), getIdentityFingerprint())]=1
+ self.store.delServer(k)
+ if unknown:
+ self.voteFile.appendUnknownServers(unknown.keys())
class ServerQueuedException(Exception):
"""Exception: raised when an incoming server is received for a previously
Index: ServerList.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/directory/ServerList.py,v
retrieving revision 1.56
retrieving revision 1.57
diff -u -d -r1.56 -r1.57
--- ServerList.py 27 Jul 2004 03:13:02 -0000 1.56
+++ ServerList.py 3 May 2005 03:26:50 -0000 1.57
@@ -19,9 +19,11 @@
import os
import time
import threading
-import mixminion
+import xreadlines
+import mixminion
import mixminion.Config
+import mixminion.Filestore
import mixminion.directory.Directory
from mixminion.Crypto import pk_decode_public_key, pk_encode_public_key, \
@@ -35,477 +37,282 @@
from mixminion.ServerInfo import ServerDirectory, ServerInfo, \
_getDirectoryDigestImpl
-class ServerList:
- """A ServerList holds a set of server descriptors for use in generating
- directories. It checks new descriptors for consistency with old ones
- as they are inserted. It will reject any server if:
- -- it is expired (Valid-Until in the past)
- -- it is superseded (For all time it is valid, a more-recently-
- published descriptor is also valid.)
- -- it is inconsistent (We already know a descriptor for this
- nickname, with a different identity key.)
- [FFFF This check will become stricter in the future.]
+"""
+Redesign notes:
- This implementation isn't terribly optimized, but there's no need to
- optimize it until we have far more descriptors to worry about.
- """
- ##Fields:
- # baseDir: Base directory of this list
- # serverDir: Directory where we store active descriptors.
- # rejectDir: Directory where we store invalid descriptors.
- # archiveDir: Directory where we store old descriptors
- # servers: Map from filename within <serverDir> to ServerInfo objects.
- # serversByNickname: A map from lowercased server nickname to
- # lists of filenames within <serverDir>
- # idCache: an instance of Directory.IDCache
- ##Layout:
- # basedir
- # server-ids/
- # nickname-dateinserted
- # (Pickled: ("V0", (nickname, encoded public key)))
- # incoming/new/
- # nickname-dateinserted.N ...
- # incoming/updates/
- # nickname-dateinserted.N ...
- # servers/
- # nickname-dateinserted.N ...
- # archive/
- # nickname-dateinserted.N ...
- # reject/
- # nickname-dateinserted.N ...
- # directory
- # dirArchive/
- # dir-dategenerated.N ...
- # identity
- # .lock
+ We need to store a pile of descriptors. They aren't always ones we
+ believe in. They can be:
+ - expired (archive these)
+ - superseded (archive these)
+ - reliable or not
+ - trusted or not
- def __init__(self, baseDir, config, idCache=None):
- """Initialize a ServerList to store servers under baseDir/servers,
- creating directories as needed.
- """
- self.baseDir = baseDir
- self.config = config
- if idCache is None:
- idCache = mixminion.directory.Directory.IDCache(
- os.path.join(baseDir, "xx_idcache"))
- self.idCache = idCache
+ Our workflow looks like this:
+ 1. On node publish: serverdesc stored into an inbox by DirCGI.
+ Minimal validation performed: check for another server with same nick,
+ different identity; check for expired/far-future.
+ 2. Regularly: copy servers from inbox into main store, revalidating them.
+ - Move bogus servers into reject (Bogus==can't validate; have validated
+ different ID with that nick; etc.).
+ - Move dead/superseded servers into archive.
+ - Make current-raw-servers.gz
+ [ENTRY]
+ 3. Regularly: Pull other directories' current-raw-servers.gz and find
+ out if there are any new servers. [ENTRY]
+ 4. Regularly: if there are any servers neither trusted nor distrusted,
+ email the administrator. [ENTRY]
+ 5. Daily:
+ - Generate a vote directory. [ENTRY]
+ - Download all other vote directories. [ENTRY]
+ - Generate consensus directory [ENTRY]
+ - Pull down sigs from other consensus directories; attach them
+ to consensus directory. [ENTRY]
+LATER:
+ - Incorporate information from pinger. Make sure probationary
+ servers get pinged.
- self.serverIDDir = os.path.join(self.baseDir, "server-ids")
- self.serverDir = os.path.join(self.baseDir, "servers")
- self.rejectDir = os.path.join(self.baseDir, "reject")
- self.archiveDir = os.path.join(self.baseDir, "archive")
- self.dirArchiveDir = os.path.join(self.baseDir, "dirArchive")
- self.lockfile = Lockfile(os.path.join(self.baseDir, ".lock"))
- self.rlock = threading.RLock()
- self.servers = {}
- self.serversByNickname = {}
- createPrivateDir(self.serverIDDir)
- createPrivateDir(self.serverDir)
- createPrivateDir(self.rejectDir)
- createPrivateDir(self.archiveDir)
- createPrivateDir(self.dirArchiveDir)
- self.rescan()
+"""
- def isServerKnown(self, server):
- """Return true iff the current server descriptor is known. Raises
- MixError if we have a server descriptor with this name, but
- a different key."""
- try:
- self._lock()
- try:
- return self.idCache.containsServer(server)
- except mixminion.directory.Directory.MismatchedID:
- raise UIError(("Already know a server named "
- "%r with different identity key.")
- % server.getNickname())
- finally:
- self._unlock()
+class DescriptorStatus:
+ def __init__(self, digest, published, validAfter, validUntil, nickname,
+ identityDigest):
+ self._digest = digest
+ self._published = published
+ self._validAfter = validAfter
+ self._validUntil = validUntil
+ self._nickname = nickname
+ self._identityDigest = identityDigest
- def rebuildIDCache(self):
- for fn in os.listdir(self.serverIDDir):
- fname = os.path.join(self.serverIDDir, fn)
- tp,val = readPickled(fname)
- if tp != "V0":
- LOG.warn("Weird file version %s on %s",tp,fname)
- continue
- nickname, ident = val
- ID = mixminion.Crypto.sha1(ident)
- self.idCache.insertID(nickname, ID)
+ def isSupersededBy(self, others):
+ valid = IntervalSet([(self._validAfter, self._validUntil)])
+ for o in others:
+ if (o._published > self._published and
+ o._identityDigest == self._identityDigest):
+ valid -= o.getIntervalSet()
+ return valid.isEmpty()
- def learnServerID(self, server):
- """Mark the ID for a server descriptor as the canonical
- identity key associated with that server's nickname."""
- try:
- self._lock()
- ident = server.getIdentity()
- nickname = server.getNickname()
- try:
- if self.idCache.containsServer(server):
- LOG.warn("Server %s already known", nickname)
- except mixminion.directory.MismatchedID:
- raise MixFatalError("Mismatched ID for server %s" % nickname)
+class ServerStore:
+ KEY_LENGTH=29
+ def __init__(self, location, dbLocation, insertOnly=0):
+ self._loc = location
+ self._dbLoc = dbLocation
+ if not insertOnly:
+ self.clean()
+ self._statusDB = mixminion.Filestore.WritethroughDict(
+ self._dbLoc, "server cache")
+ else:
+ self._statusDB = None
+ createPrivateDir(location)
- LOG.info("Learning identity for new server %s", nickname)
- self.idCache.insertServer(server)
- writePickled(os.path.join(self.serverIDDir,
- nickname+"-"+formatFnameTime()),
- ("V0", (nickname, pk_encode_public_key(ident))))
- self.idCache.save()
- finally:
- self._unlock()
+ def close(self):
+ self._statusDB.close()
- def importServerInfo(self, contents, knownOnly=0, server=None):
- """Insert a ServerInfo into the list. If the server is expired, or
- superseded, or inconsistent, raise a MixError.
+ def sync(self):
+ self._statusDB.sync()
- contents -- a string containing the descriptor, or the name of a
- file containing the descriptor (possibly gzip'd)
- knownOnly -- if true, raise MixError is we don't already have
- a descriptor with this nickname.
- server -- If provided, a parsed ServerInfo corresponding to
- 'contents'.
- """
- # Raises ConfigError, MixError,
+ def hasServer(self, server):
+ key = self._getKey(server.getDigest())
+ if self._statusDB is None:
+ return os.path.exists(os.path.join(self._loc,key))
+ else:
+ return self._statusDB.has_key(key)
- if not server:
- contents, server = _readServer(contents)
+ def addServer(self, server, contents=None):
+ # returns key
+ if contents is None:
+ assert server._originalContents
+ contents = server._originalContents
+ key = self._getKey(server.getDigest())
+ f = AtomicFile(os.path.join(self._loc,key))
try:
- self._lock()
+ f.write(contents)
+ f.close()
+ except:
+ f.discard()
+ raise
- nickname = server.getNickname()
- lcnickname = nickname.lower()
+ if self._statusDB is not None:
+ self._updateCache(key, server)
+ return key
- known = self.isServerKnown(server)
- if knownOnly and not known:
- raise UIError("Unknown server %s: use import-new."%nickname)
+ def delServer(self, key):
+ if self._statusDB is not None:
+ try:
+ del self._statusDB[key]
+ except KeyError:
+ pass
+ try:
+ os.unlink(os.path.join(self._loc, key))
+ except OSError:
+ pass
- # Is the server already invalid?
- if server.isExpiredAt(time.time()):
- raise UIError("Descriptor has already expired")
+ def rescan(self):
+ self._statusDB.close()
+ os.path.unlink(self._dbLoc)
+ self.clean()
+ self._statusDB = mixminion.Filestore.WritethroughDict(
+ self._dbLoc, "server cache")
+ for key in os.listdir(self._loc):
+ fn = os.path.join(self._loc, key)
+ try:
+ #XXXX digest-cache
+ server = ServerInfo(fname=fn)
+ except (OSError, MixError, ConfigError), e:
+ LOG.warn("Deleting invalid server %s: %s", key, e)
+ os.unlink(fn)
+ server = None
+ if server is None: continue
- # Is there already a server with the same nickname?
- if self.serversByNickname.has_key(lcnickname):
- # Okay -- make sure we don't have this same descriptor.
- for fn in self.serversByNickname[lcnickname]:
- oldServer = self.servers[fn]
- if oldServer['Server']['Digest'] == \
- server['Server']['Digest']:
- raise UIError("Server descriptor already inserted.")
- # Okay -- make sure that this server isn't superseded.
- if server.isSupersededBy(
- [ self.servers[fn] for fn in
- self.serversByNickname[lcnickname]]):
- raise UIError("Server descriptor is superseded")
+ k2 = self._getKey(server.getDigest())
+ if k2 != key:
+ LOG.info("Renaming server in %s to correct file %s",key,k2)
+ os.rename(fn, os.path.join(self._loc, k2))
+ key = k2
+ self._updateCache(key, server)
- if not known:
- # Is the identity new to us?
- self.learnServerID(server)
+ self.flush()
- newFile = _writeServer(self.serverDir, contents, nickname)
+ def archiveServers(self, archiveLocation, now=None):
+ if now is not None:
+ now = time.time()
- # Now update the internal structure
- self.servers[newFile] = server
- self.serversByNickname.setdefault(lcnickname, []).append(newFile)
- finally:
- self._unlock()
+ archive = {}
+ byIdentity = {}
+ for key, status in self._statusDB.items():
+ if status._validUntil < now:
+ archive[key] = 1
+ continue
+ byIdentity.setdefault(status._identityDigest, []).append(status)
- def expungeServersByNickname(self, nickname):
- """Forcibly remove all servers named <nickname>"""
- try:
- self._lock()
- LOG.info("Removing all servers named %s", nickname)
- lcnickname = nickname.lower()
- if not self.serversByNickname.has_key(lcnickname):
- LOG.info(" (No such servers exist)")
- return
- servers = self.serversByNickname[lcnickname]
- for fn in servers:
- LOG.info(" Removing %s", fn)
- _moveServer(self.serverDir, self.archiveDir, fn)
- del self.servers[fn]
- del self.serversByNickname[lcnickname]
- LOG.info(" (%s servers removed)", len(servers))
- finally:
- self._unlock()
+ for ident, servers in byIdentity.items():
+ for s in servers:
+ if s.isSupersededBy(servers):
+ archive[self._getKey(s._digest)] = 1
- def generateDirectory(self,
- startAt, endAt, extraTime,
- identityKey,
- publicationTime=None,
- badServers=(),
- excludeServers=()):
- """Generate and sign a new directory, to be effective from <startAt>
- through <endAt>. It includes all servers that are valid at
- any time between <startAt> and <endAt>+<extraTime>. The directory
- is signed with <identityKey>.
+ for key in archive.keys():
+ self.moveServer(key,archiveLocation)
- Any servers whose nicknames appear in 'badServers' are marked as
- not recommended; any servers whose nicknames appear in
- 'excludeServers' are left off the directory entirely.
- """
+ def moveServer(self, key, location):
+ os.rename(os.path.join(self._loc, key),
+ os.path.join(location, key))
try:
- self._lock()
- self.clean()
- if publicationTime is None:
- publicationTime = time.time()
- if previousMidnight(startAt) >= previousMidnight(endAt):
- raise MixError("Validity range does not contain a full day.")
+ del self._statusDB[key]
+ except KeyError:
+ pass
- excludeServers = [ nickname.lower() for nickname in excludeServers]
+ def loadServer(self, key, keepContents=0, assumeValid=1):
+ #XXXX digest-cache
+ return ServerInfo(fname=os.path.join(self._loc,key),
+ assumeValid=assumeValid,
+ _keepContents=keepContents)
- # First, sort all servers by nickname.
- includedByNickname = {}
- for fn, s in self.servers.items():
- nickname = s.getNickname().lower()
- if nickname in excludeServers: continue
- includedByNickname.setdefault(nickname, []).append((s, fn))
+    def listKeys(self):
+        """Return a list of the keys of the descriptors in this store.
+
+           When a status database is available, its keys are returned.
+           Otherwise, fall back to the names of the files in self._loc,
+           leaving out any name that ends with ".tmp".
+        """
+        if self._statusDB is not None:
+            return self._statusDB.keys()
+        # NOTE: this must be os.listdir; there is no os.path.listdir, so
+        # the fallback path used to die with an AttributeError.
+        return [ f for f in os.listdir(self._loc)
+                 if not f.endswith(".tmp") ]
- # Second, find all servers that are valid for part of the period,
- # and that aren't superseded for the whole period.
- timeRange = IntervalSet([(previousMidnight(startAt),
- endAt+extraTime)])
+    def getByNickname(self, nickname):
+        """Return a list of the keys of every descriptor whose cached
+           status records exactly <nickname> as its nickname.  (The
+           comparison is a plain '=='; no case-folding is done here.)"""
+        matches = []
+        for key, status in self._statusDB.items():
+            if status._nickname == nickname:
+                matches.append(key)
+        return matches
- for nickname, ss in includedByNickname.items():
- # We prefer the most-recently-published descriptor. If two
- # are published at the same time, we prefer the one that
- # expires last.
- ss = [ (s['Server']['Published'],
- s['Server']['Valid-Until'],
- s, fn) for s,fn in ss]
- ss.sort()
- ss.reverse()
- uncovered = timeRange.copy()
- included = []
- for _, _, s, fn in ss:
- valid = s.getIntervalSet()
- if (uncovered * valid):
- included.append((s, fn))
- uncovered -= valid
- includedByNickname[nickname] = included
+    def getByIdentityDigest(self, digest):
+        """Return a list of the keys of every descriptor whose cached
+           status records <digest> as its identity digest."""
+        # Compare against the <digest> argument.  (This used to refer to an
+        # undefined name, 'identityDigest', and so raised NameError as soon
+        # as the status database held any entry.)
+        return [ key for key,status in self._statusDB.items()
+                 if status._identityDigest == digest ]
- # Now sort the remaining servers by nickname, then by valid-after.
- included = []
- for ss in includedByNickname.values():
- for s,fn in ss:
- nickname = s.getNickname()
- validAfter = s['Server']['Valid-After']
- included.append((nickname, validAfter, fn))
- included.sort()
+    def getByLiveness(self, startAt, endAt):
+        """Return a list of the keys of every descriptor whose cached
+           validity interval overlaps the span between <startAt> and
+           <endAt>: that is, it becomes valid strictly before <endAt> and
+           stops being valid strictly after <startAt>."""
+        live = []
+        for key, status in self._statusDB.items():
+            if endAt <= status._validAfter:
+                # Not yet valid when the span ends.
+                continue
+            if startAt >= status._validUntil:
+                # Already expired when the span begins.
+                continue
+            live.append(key)
+        return live
- # FFFF We should probably not do all of this in RAM, but
- # FFFF what the hey. It will only matter if we have many, many
- # FFFF servers in the system.
- contents = [ ]
- for _, _, fn in included:
- txt = readFile(os.path.join(self.serverDir, fn))
- contents.append(txt)
+ def _updateCache(self, key, server):
+ """Record a DescriptorStatus for <server> in the status database
+ under <key>.  <key> must be the key that _getKey derives from the
+ server's digest."""
+ assert key == self._getKey(server.getDigest())
- goodServers = [n for n,_,_ in included if n not in badServers]
- g = {}
- for n in goodServers: g[n]=1
- goodServers = g.keys()
- goodServers.sort()
- goodServers = ", ".join(goodServers)
+ # All cached fields but the last come from the [Server] section.
+ sec = server['Server']
- clientVersions = self.config['Directory']['ClientVersions']
- serverVersions = self.config['Directory']['ServerVersions']
+ status = DescriptorStatus(sec['Digest'],
+ sec['Published'],
+ sec['Valid-After'],
+ sec['Valid-Until'],
+ sec['Nickname'],
+ server.getKeyDigest())
+ self._statusDB[key] = status
- #FFFF Support for multiple signatures
- header = """\
- [Directory]
- Version: 0.2
- Published: %s
- Valid-After: %s
- Valid-Until: %s
- Recommended-Servers: %s
- [Signature]
- DirectoryIdentity: %s
- DirectoryDigest:
- DirectorySignature:
- [Recommended-Software]
- MixminionClient: %s
- MixminionServer: %s
- """ % (formatTime(publicationTime),
- formatDate(startAt),
- formatDate(endAt),
- goodServers,
- formatBase64(pk_encode_public_key(identityKey)),
- ", ".join(clientVersions),
- ", ".join(serverVersions))
+    def _getKey(self, digest):
+        """Return the store key for a descriptor with digest <digest>:
+           the output of formatBase64 with every "/" replaced by "-" and
+           every "=" removed.  Keys are joined onto self._loc as file
+           names elsewhere in this class.  Asserts that the result is
+           exactly self.KEY_LENGTH characters long."""
+        encoded = formatBase64(digest)
+        key = encoded.replace("/", "-")
+        key = key.replace("=", "")
+        assert len(key) == self.KEY_LENGTH
+        return key
- directory = header+"".join(contents)
- directory = _getDirectoryDigestImpl(directory, identityKey)
+    def clean(self):
+        """Delete stray temporary files from the store's directory: every
+           entry of self._loc whose name is longer than self.KEY_LENGTH
+           and for which stringContains(name, ".tmp") is true."""
+        for name in os.listdir(self._loc):
+            if len(name) <= self.KEY_LENGTH:
+                # Too short to be anything but a plain key.
+                continue
+            if not stringContains(name, ".tmp"):
+                continue
+            os.unlink(os.path.join(self._loc, name))
- # Make sure that the directory checks out
- # FFFF remove this once we are _very_ confident.
- if 1:
- parsed = ServerDirectory(string=directory)
- includedDigests = {}
- for _, _, fn in included:
- includedDigests[self.servers[fn]['Server']['Digest']] = 1
- foundDigests = {}
- for s in parsed.getAllServers():
- foundDigests[s['Server']['Digest']] = 1
- assert foundDigests == includedDigests
+    def _repOK(self):
+        """Consistency check.  Return 1 if the status database agrees with
+           the descriptor files in self._loc, and 0 otherwise.
+
+           The store is cleaned first.  The check then requires that the
+           set of cached keys equal the set of file names, that every file
+           load as a ServerInfo, and that each cached field match the value
+           in the corresponding loaded descriptor.
+        """
+        self.clean()
+        keys = self._statusDB.keys()
+        fnames = os.listdir(self._loc)
+        keys.sort()
+        fnames.sort()
+        # Every cached key needs a file, and every file a cached key.
+        if keys != fnames: return 0
- writeFile(os.path.join(self.baseDir, "directory"),
- directory,
- mode=0644)
+        for f in fnames:
+            status = self._statusDB[f]
+            try:
+                #XXXX digest-cache
+                server = ServerInfo(fname=os.path.join(self._loc, f))
+            except:
+                # Deliberately broad: any failure to load the descriptor
+                # means the representation is not OK.
+                return 0
+            if status._digest != server.getDigest(): return 0
+            if status._published != server['Server']['Published']: return 0
+            if status._validAfter != server['Server']['Valid-After']: return 0
+            if status._validUntil != server['Server']['Valid-Until']: return 0
+            if status._nickname != server['Server']['Nickname']: return 0
+            # _updateCache fills in the identity digest from getKeyDigest(),
+            # so that is what it must be compared with here (this used to
+            # call 'getKeyStatus').
+            if status._identityDigest != server.getKeyDigest(): return 0
- f, _ = openUnique(os.path.join(self.dirArchiveDir,
- "dir-"+formatFnameTime()))
- f.write(directory)
- f.close()
- finally:
- self._unlock()
+        return 1
- def getDirectoryFilename(self):
- """Return the filename of the most recently generated directory"""
- return os.path.join(self.baseDir, "directory")
+class LiveServerList:
+ """Operations on the set of live server descriptors, which are kept
+ in an underlying store object (self.store)."""
+ def __init__(self, store):
+ """Create a LiveServerList that keeps its descriptors in <store>."""
+ self.store = store
- def clean(self, now=None):
- """Remove all expired or superseded servers from the active directory.
- """
- # This algorithm is inefficient: O(N_descs * N_descs_per_nickname).
- # We're just going to ignore that.
+    def clean(self, voteList, archiveLocation, now=None):
+        """Tidy the underlying store: flush it, clean it, have it archive
+           servers into <archiveLocation> (passing <now> along), and then
+           move into <archiveLocation> every descriptor whose identity
+           digest has a status in <voteList> beginning with 'ignore'."""
+        store = self.store
+        store.flush()
+        store.clean()
+        store.archiveServers(archiveLocation, now=now)
+        # Gather the keys to reject before moving anything, so that the
+        # status database is not changed while we walk over it.
+        rejected = []
+        for key, status in store._statusDB.items():
+            verdict = voteList.status[status._identityDigest][0]
+            if verdict == 'ignore':
+                rejected.append(key)
+        for key in rejected:
+            store.moveServer(key, archiveLocation)
+
+    def generateRawServerList(self, voteList, archiveLocation, outFile,
+                              now=None):
+        """Clean this server list, then write the contents of every
+           descriptor that is live at some point in the 32 days starting
+           at <now> to <outFile>, one after another.
+
+           voteList, archiveLocation -- passed on to self.clean().
+           outFile -- an object with a write() method.
+           now -- the reference time; defaults to the current time.
+        """
if now is None:
now = time.time()
+        # Use our own clean(), which takes these arguments; the store's
+        # clean() takes none, so calling it here could only fail.
+        self.clean(voteList, archiveLocation, now=now)
+        # The window is 32 days; 2 of them are extra, for margin-of-error.
+        for k in self.store.getByLiveness(now, now+24*60*60*32):
+            f = open(os.path.join(self.store._loc, k), 'r')
+            try:
+                outFile.write(f.read())
+            finally:
+                # Don't leak the file handle if the read or write fails.
+                f.close()
- try:
- self._lock()
- removed = {} # Map from filename->whyRemoved
- # Find all superseded servers
- for servers in self.serversByNickname.values():
- servers = [ (self.servers[fn]['Server']['Published'],
- fn, self.servers[fn]) for fn in servers ]
- servers.sort()
- fns = [ fn for _, fn, _ in servers]
- servers = [ s for _, _, s in servers ]
- for idx in range(len(servers)):
- if servers[idx].isSupersededBy(servers[idx+1:]):
- removed[fns[idx]] = "superseded"
-
- # Find all expired servers.
- for fn, s in self.servers.items():
- if removed.has_key(fn):
- continue
- if s.isExpiredAt(now-6000):
- # The descriptor is expired.
- removed[fn] = "expired"
-
- # Now, do the actual removing.
- for fn, why in removed.items():
- LOG.info("Removing %s descriptor %s", why, fn)
- _moveServer(self.serverDir, self.archiveDir, fn)
- del self.servers[fn]
-
- self.__buildNicknameMap()
- finally:
- self._unlock()
-
- def rescan(self):
- """Reconstruct this ServerList object's internal state."""
- try:
- self._lock()
- # First, build self.servers
- self.servers = {}
- for filename in os.listdir(self.serverDir):
- path = os.path.join(self.serverDir, filename)
- try:
- self.servers[filename] = ServerInfo(fname=path)
- except ConfigError, e:
- LOG.warn("Somehow, a bad server named %s got in our store",
- filename)
- LOG.warn(" (Error was: %s)", str(e))
- _moveServer(self.serverDir, self.rejectDir, filename)
-
- # Next, rebuild self.serverIDs:
- self.serverIDs = {}
- for filename in os.listdir(self.serverIDDir):
- path = os.path.join(self.serverIDDir, filename)
- t = readPickled(path)
- if t[0] != 'V0':
- LOG.warn("Skipping confusing stored key in file %s",
- filename)
- continue
- nickname, key = t[1]
- key = pk_decode_public_key(key)
- if self.serverIDs.has_key(nickname.lower()):
- LOG.warn("Eeek! Multiple entries for %s", nickname)
- if not pk_same_public_key(self.serverIDs[nickname.lower()],
- key):
- raise MixFatalError(
- "Multiple conflicting entries for %s"%nickname)
- self.serverIDs[nickname.lower()] = key
-
- # (check for consistency)
- for s in self.servers.values():
- lcn = s.getNickname().lower()
- try:
- ident = self.serverIDs[lcn]
- except KeyError:
- raise UIError("No stored key for server %s" %
- s.getNickname())
-
- if not pk_same_public_key(ident, s.getIdentity()):
- raise UIError("Inconsistent stored key for server %s" %
- s.getNickname())
-
- # Then, rebuild self.serversByNickname
- self.__buildNicknameMap()
- finally:
- self._unlock()
-
- def __buildNicknameMap(self):
- """Helper method. Regenerate self.serversByNickname from
- self.servers
-
- Caller must hold lock."""
- self.serversByNickname = {}
- for fn, server in self.servers.items():
- nickname = server.getNickname()
- self.serversByNickname.setdefault(nickname.lower(), []).append(fn)
-
- def _lock(self):
- self.rlock.acquire()
- self.lockfile.acquire(blocking=1)
-
- def _unlock(self):
- self.lockfile.release()
- self.rlock.release()
-
-def _moveServer(directory1, directory2, fname):
- """Move a server contained in directory1/fname into directory2, chosing
- a new name for it as needed, and avoiding duplicates."""
- fname1 = os.path.join(directory1, fname)
- if "-" in fname:
- nn = fname[:fname.find("-")]
- else:
- nn = "UNKNOWN"
- s = readPossiblyGzippedFile(fname1)
- _writeServer(directory2, s, nn)
- os.unlink(fname1)
+    def addServersFromInbox(self, inbox):
+        """Have <inbox> move its entries into this server list, by calling
+           inbox.moveEntriesToStore(self)."""
+        # Use the <inbox> argument.  This object has no 'inbox' attribute
+        # (the constructor only sets 'store'), so 'self.inbox' could only
+        # raise AttributeError.
+        inbox.moveEntriesToStore(self)
-def _writeServer(directory, contents, nickname, mode=0600):
- """Write a server descriptor in 'contents' into a new file in
- 'directory'. The file will have permissions 'mode', and a name
- of the form nickname-YYYYMMDDHHMMSS.n
- """
- newFile = nickname+"-"+formatFnameTime()
- f, newFile = openUnique(os.path.join(directory, newFile), 'w', mode)
- newFile = os.path.split(newFile)[1]
- f.write(contents)
- f.close()
- return newFile
+    def _addOneFromRawDirLines(self, lines):
+        """Helper: parse one server descriptor from <lines>, a list of
+           strings that are joined together to form its text, and add it
+           to the store unless the store reports that it already has it.
+           Whatever the ServerInfo constructor raises for a bad descriptor
+           is passed on to the caller."""
+        s = "".join(lines)
+        #XXXX digest-cache
+        # The text must be given as 'string' (not positionally), and the
+        # keep-contents flag is spelled '_keepContents', as in the other
+        # ServerInfo calls in this change.
+        si = ServerInfo(string=s, assumeValid=0, _keepContents=1)
+        if not self.store.hasServer(si):
+            self.store.addServer(si)
-def _readServer(contents):
- """Read a ServerInfo from the string 'contents', which is either a
- server descriptor or the name of a file holding a descriptor.
- Raise MixError on failure."""
- if stringContains(contents, "[Server]"):
- pass
- else:
- contents = readPossiblyGzippedFile(contents)
+    def addServersFromRawDirectoryFile(self, file):
+        """Read concatenated server descriptors from <file> and add each
+           one through _addOneFromRawDirLines.  A line consisting of
+           exactly "[Server]" starts a new descriptor; the lines gathered
+           up to that point are handed over as the previous one.
+
+           <file> is passed to mixminion.Common.iterFileLines.
+        """
+        curLines = []
+        for line in mixminion.Common.iterFileLines(file):
+            if line == '[Server]\n' and curLines:
+                # A new descriptor begins: hand over the finished one.
+                self._addOneFromRawDirLines(curLines)
+                del curLines[:]
+            # Gather the line.  (Without this nothing was ever collected,
+            # so no descriptor was ever added.)
+            curLines.append(line)
+        if curLines:
+            # Don't forget the final descriptor in the file.
+            self._addOneFromRawDirLines(curLines)
- # May raise ConfigError, MixError
- return contents, ServerInfo(string=contents, assumeValid=0)