[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Refactor incoming queues for better separation of permi...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/directory
In directory moria.mit.edu:/tmp/cvs-serv8806/lib/mixminion/directory
Modified Files:
ServerList.py
Added Files:
DirCGI.py
Log Message:
Refactor incoming queues for better separation of permissions; add publication
--- NEW FILE: DirCGI.py ---
# Copyright 2003 Nick Mathewson. See LICENSE for licensing information.
# $Id: DirCGI.py,v 1.1 2003/05/23 22:49:30 nickm Exp $
"""mixminion.directory.DirCGI
Backend for directory-publish CGI.
"""
__all__ = [ ]
DIRECTORY_BASE = None
IDCACHE_FILE = None
QUEUE_BASE = None
def addServer(text, base):
DIRECTORY_
#XXXX write more.
Index: ServerList.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/directory/ServerList.py,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -d -r1.16 -r1.17
--- ServerList.py 23 May 2003 07:54:11 -0000 1.16
+++ ServerList.py 23 May 2003 22:49:30 -0000 1.17
@@ -70,8 +70,10 @@
# server-ids/
# nickname-dateinserted
# (Pickled: ("V0", (nickname, encoded public key)))
- # incoming/
- # nickname-dateinserted.N ...
+ # incoming/new/
+ # nickname-dateinserted.N ...
+ # incoming/updates/
+ # nickname-dateinserted.N ...
# servers/
# nickname-dateinserted.N ...
# archive/
@@ -83,17 +85,20 @@
# dir-dategenerated.N ...
# identity
# .lock
- def __init__(self, baseDir):
+ #
+ # idCacheFile:
+ def __init__(self, baseDir, idCacheFile=None):
"""Initialize a ServerList to store servers under baseDir/servers,
creating directories as needed.
"""
self.baseDir = baseDir
self.serverIDDir = os.path.join(self.baseDir, "server-ids")
self.serverDir = os.path.join(self.baseDir, "servers")
- self.incomingDir = os.path.join(self.baseDir, "incoming")
self.rejectDir = os.path.join(self.baseDir, "reject")
self.archiveDir = os.path.join(self.baseDir, "archive")
self.dirArchiveDir = os.path.join(self.baseDir, "dirArchive")
+ self.idCacheFile = idCacheFile or os.path.join(self.baseDir,
+ "id_cache")
self.lockfile = Lockfile(os.path.join(self.baseDir, ".lock"))
self.rlock = threading.RLock()
self.servers = {}
@@ -139,6 +144,7 @@
("V0", (nickname, pk_encode_public_key(ident))))
self.serverIDs[nickname.lower()] = ident
+ self._writeIDCache()
finally:
self._unlock()
@@ -394,6 +400,8 @@
# Then, rebuild self.serversByNickname
self.__buildNicknameMap()
+
+ self._writeIDCache()
finally:
self._unlock()
@@ -415,40 +423,52 @@
self.lockfile.release()
self.rlock.release()
-class ServerQueue:
+ def _writeIDCache(self):
+ writePickled(self.idCacheFile, ("V0", self.serverIDs), 0644)
+
+class IncomingQueue:
"""DOCDOC"""
- def __init__(self, incomingDir):
+ def __init__(self, incomingDir, rejectDir):
"""DOCDOC"""
self.incomingDir = incomingDir
- createPrivateDir(self.incomingDir)
+ self.rejectDir = rejectDir
+ if not os.path.exists(incomingDir):
+ raise MixFatalError("Incoming directory doesn't exist")
+ if not os.path.exists(rejectDir):
+ raise MixFatalError("Reject directory doesn't exist")
def queueIncomingServer(self, contents, server):
"""DOCDOC"""
nickname = server.getNickname()
_writeServer(nickname, contents, self.incomingDir)
- def serversPending(self):
+ def queueRejectedServer(self, contents, server):
+ nickname = server.getNickname()
+ _writeServer(nickname, contents, self.rejectDir)
+
+ def newServersPending(self, newServ):
"""DOCDOC"""
return len(os.listdir(self.incomingDir)) > 0
def readPendingServers(self):
+ d = self.incomingDir
res = []
- for fname in os.listdir(self.incomingDir):
- path = os.path.join(self.incomingDir,fname)
+ for fname in os.listdir(d):
+ path = os.path.join(d,fname)
try:
text, server = _readServer(path)
except (ConfigError, MixError),e:
os.unlink(path)
LOG.warn("Removed a bad server descriptor %s from incoming dir: %s",
fname, e)
-
+ continue
res.append((fname, server, text, getIDFingerprint(server)))
return res
def delPendingServers(self, fnames):
for fname in fnames:
try:
- os.path.unlink(os.path.join(self.incomingDir, fname))
+ os.path.unlink(os.path.join(d, fname))
except OSError:
LOG.warn("delPendingServers: no such server %s"%fname)
@@ -456,114 +476,145 @@
"""DOCDOC"""
pass
-def receiveServer(serverList, incomingQueue, text, source):
- """DOCDOC
+class ServerIntake:
+ def __init__(self, base, idCacheFile):
+ self.newQueue = IncomingQueue(os.path.join(base, "new"),
+ os.path.join(base, "reject"))
+ self.updateQueue = IncomingQueue(os.path.join(base, "updates"),
+ os.path.join(base, "reject"))
+ self.idCacheFile = idCacheFile
+ self.idCache = None
- Returns true on OK; raises UIError on failure; raises ServerQueued on
- wait-for-admin."""
- try:
- text, server = _readServer(text)
- except MixError, e:
- LOG.warn("Rejected invalid server from %s: %s", source,e)
- raise UIError("Server descriptor was not valid: %s"%e)
+ def getIDCache(self):
+ if self.idCache is None:
+ o = readPickled(self.idCacheFile)
+ if o[0] != 'V0':
+ raise MixFatalError("Malformatted ID cache")
+ self.idCache = o[1]
+ return self.idCache
- try:
- known = serverList.isServerKnown(server)
- except MixError, e:
- LOG.warn("Rejected server with mismatched identity from %s", source)
- raise
+ def receiveServer(self, text, source):
+ """DOCDOC
- if not known:
- LOG.info("Received previously unknown server from %s", source)
- incomingQueue.queueIncomingServer(text,server)
- raise ServerQueuedException()
+ Returns true on OK; raises UIError on failure; raises
+ ServerQueued on wait-for-admin.
+ """
+ try:
+ text, server = _readServer(text)
+ except MixError, e:
+ LOG.warn("Rejected invalid server from %s: %s", source,e)
+ raise UIError("Server descriptor was not valid: %s"%e)
- serverList.importServerInfo(text, knownOnly=1, server=server)
+ nickname = server.getNickname()
+ lcnickname = nickname.lower()
+ identity = server.getIdentity()
+ oldIdentity = self.getIDCache().get(lcnickname)
+ if oldIdentity is None:
+ LOG.info("Received previously unknown server %s from %s",
+ nickname, source)
+ self.newQueue.queueIncomingServer(text,server)
+ return
-def acceptServer(serverList, incomingQueue, nickname):
- if ':' in nickname:
- nickname, fingerprint = nickname.split(":")
- else:
- fingerprint = None
-
- lcnickname = nickname.lower()
- incoming = incomingQueue.readPendingServers()
- # Do we have any pending servers of the desired name?
- incoming = [ (fname,server,text,fp)
- for fname,server,text,fp in incoming
- if server.getNickname().lower() == lcnickname ]
- if not incoming:
- raise UIError("No incoming servers named %s"%nickname)
+ if pk_same_public_key(oldIdentity, identity):
+ LOG.info("Received update for server %s"%server)
+ self.updateQueue.queueIncomingServer(text,server)
+ return
- if not fingerprint:
- fps = [fp for f,s,t,fp in incoming]
- for f in fps:
- if f != fps[0]:
- raise UIError("Multiple KeyIDs for servers names %s"%nickname)
- reject = []
- else:
- reject = [ (f,s,t,fp) for f,s,t,fp in incoming
- if fp != fingerprint ]
- incoming = [ (f,s,t,fp) for f,s,t,fp in incoming
- if fp == fingerprint ]
- if not incoming:
- raise UIError("No servers named %s with matching KeyID"%nickname)
- if reject:
- LOG.warn("Rejecting %s server(s) named %s with unmatching KeyIDs",
- len(reject), nickname)
+ LOG.warn("Rejected server with mismatched identity from %s", source)
+ self.updateQueue.queueRejectedServer(text,server)
+ raise UIError("I already know a server named %s with a different key.",
+ nickname)
- try:
- serverList._lock()
- serverList.learnServerID(incoming[0][1])
+ def _doAccept(self, serverList, q, incoming, reject, knownOnly):
accepted = []
- for fname,server,text,fp in incoming:
+ for fname, server, text, fp in incoming:
try:
- serverList.importServerInfo(text,server=server)
+ serverList.importServerInfo(text,server=server,
+ knownOnly=knownOnly)
accepted.append(fname)
- except MixError, e:
+ except (MixError, config), e:
LOG.warn("ServerList refused to include server %s: %s",
fname, e)
reject.append((fname,server,text,fp))
- if len(accepted):
- LOG.warn("Key ID learned; no servers actually imported")
+ for fname, server, text, fp in reject:
+ self.updateQueue.queueRejectedServer(text,server)
+
+ fnames = accepted + [fn for fn,_,_,_ in reject]
+ q.delPendingServers(fnames)
+
+ def acceptUpdates(self, serverList):
+ incoming = self.updateQueue.readPendingServers()
+ self._doAccept(serverList, self.updateQueue, incoming, [],
+ knownOnly=1)
+
+ def acceptNewServers(self, serverList, nickname):
+ if ':' in nickname:
+ nickname, fingerprint = nickname.split(":")
else:
- LOG.info("%s server descriptor(s) imported.", len(accepted))
+ fingerprint = None
- LOG.info("Moving %s descriptors to rejected status"%len(reject))
- for fname,_,text,_ in reject:
- _writeServer(serverList.rejectDir, text, "BAD-"+nickname)
+ lcnickname = nickname.lower()
+ incoming = self.newQueue.readPendingServers()
+ # Do we have any pending servers of the desired name?
+ incoming = [ (fname,server,text,fp)
+ for fname,server,text,fp in incoming
+ if server.getNickname().lower() == lcnickname ]
+ if not incoming:
+ raise UIError("No incoming servers named %s"%nickname)
- incomingQueue.delPendingServers([fname for fname,_,text,_ in reject])
- incomingQueue.delPendingServers(accepted)
- finally:
- serverList._unlock()
+ if not fingerprint:
+ fps = [fp for f,s,t,fp in incoming]
+ for f in fps:
+ if f != fps[0]:
+ raise UIError("Multiple KeyIDs for servers named %s"%
+ nickname)
+ reject = []
+ else:
+ reject = [ (f,s,t,fp) for f,s,t,fp in incoming
+ if fp != fingerprint ]
+ incoming = [ (f,s,t,fp) for f,s,t,fp in incoming
+ if fp == fingerprint ]
+ if not incoming:
+ raise UIError("No servers named %s with matching KeyID"%
+ nickname)
+ if reject:
+ LOG.warn("Rejecting %s servers named %s with unmatched KeyIDs",
+ len(reject), nickname)
+
+ try:
+ serverList._lock()
+ serverList.learnServerID(incoming[0][1])
+ self._doAccept(serverList, self.newQueue, incmoing, reject,
+ knownOnly=1)
+ finally:
+ serverList._unlock()
-def listPendingServers(f, incomingQueue):
- """DOCDOC"""
- incoming = incomingQueue.readPendingServers()
- # lcnickname->fp->servers
- servers = {}
- # lcnickname->nicknames
- nicknames = {}
+ def listNewPendingServers(self, f):
+ """DOCDOC"""
+ incoming = self.newQueue.readPendingServers()
+ # lcnickname->fp->servers
+ servers = {}
+ # lcnickname->nicknames
+ nicknames = {}
- for f,s,t,fp in incoming:
- nickname = s.getNickname()
- lcnickname = nickname.lower()
- nicknames.setdefault(lcnickname, []).append(nickname)
- servers.setdefault(lcnickname, {}).setdefault(fp, []).append(s)
+ for f,s,t,fp in incoming:
+ nickname = s.getNickname()
+ lcnickname = nickname.lower()
+ nicknames.setdefault(lcnickname, []).append(nickname)
+ servers.setdefault(lcnickname, {}).setdefault(fp, []).append(s)
- sorted = nicknames.keys()
- sorted.sort()
- maxlen = max([len(n) for n in sorted])
- format = " %"+str(maxlen)+"s:%s [%s descriptors]"
- for lcnickname in sorted:
- nickname = nicknames[lcnickname][0]
- ss = servers[lcnickname]
- if len(ss) > 1:
- print >>f, ("***** MULTIPLE KEYIDS FOR %s:"%nickname)
- for fp, s in ss:
- print >>f, (format%(nickname,fp,len(s)))
+ sorted = nicknames.keys()
+ sorted.sort()
+ maxlen = max([len(n) for n in sorted])
+ format = " %"+str(maxlen)+"s:%s [%s descriptors]"
+ for lcnickname in sorted:
+ nickname = nicknames[lcnickname][0]
+ ss = servers[lcnickname]
+ if len(ss) > 1:
+ print >>f, ("***** MULTIPLE KEYIDS FOR %s:"%nickname)
+ for fp, s in ss:
+ print >>f, (format%(nickname,fp,len(s)))
def _writeServer(directory, contents, nickname):
newFile = nickname+"-"+formatFnameTime()