[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Refactor incoming queues for better separation of permi...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/directory
In directory moria.mit.edu:/tmp/cvs-serv8806/lib/mixminion/directory

Modified Files:
	ServerList.py 
Added Files:
	DirCGI.py 
Log Message:
Refactor incoming queues for better separation of permissions; add publication

--- NEW FILE: DirCGI.py ---
# Copyright 2003 Nick Mathewson.  See LICENSE for licensing information.
# $Id: DirCGI.py,v 1.1 2003/05/23 22:49:30 nickm Exp $

"""mixminion.directory.DirCGI

   Backend for directory-publish CGI.
   """

__all__ = [ ]

DIRECTORY_BASE = None
IDCACHE_FILE = None
QUEUE_BASE = None

def addServer(text, base):
    DIRECTORY_
    #XXXX write more.

              

Index: ServerList.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/directory/ServerList.py,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -d -r1.16 -r1.17
--- ServerList.py	23 May 2003 07:54:11 -0000	1.16
+++ ServerList.py	23 May 2003 22:49:30 -0000	1.17
@@ -70,8 +70,10 @@
     #     server-ids/
     #          nickname-dateinserted
     #               (Pickled: ("V0", (nickname, encoded public key)))
-    #     incoming/
-    #          nickname-dateinserted.N ...    
+    #     incoming/new/
+    #          nickname-dateinserted.N ...
+    #     incoming/updates/
+    #          nickname-dateinserted.N ...
     #     servers/
     #          nickname-dateinserted.N ...
     #     archive/
@@ -83,17 +85,20 @@
     #          dir-dategenerated.N ...
     #     identity
     #     .lock
-    def __init__(self, baseDir):
+    #
+    # idCacheFile:
+    def __init__(self, baseDir, idCacheFile=None):
         """Initialize a ServerList to store servers under baseDir/servers,
            creating directories as needed.
         """
         self.baseDir = baseDir
         self.serverIDDir = os.path.join(self.baseDir, "server-ids")
         self.serverDir = os.path.join(self.baseDir, "servers")
-        self.incomingDir = os.path.join(self.baseDir, "incoming")
         self.rejectDir = os.path.join(self.baseDir, "reject")
         self.archiveDir = os.path.join(self.baseDir, "archive")
         self.dirArchiveDir = os.path.join(self.baseDir, "dirArchive")
+        self.idCacheFile = idCacheFile or os.path.join(self.baseDir,
+                                                       "id_cache")
         self.lockfile = Lockfile(os.path.join(self.baseDir, ".lock"))
         self.rlock = threading.RLock()
         self.servers = {}
@@ -139,6 +144,7 @@
                          ("V0", (nickname, pk_encode_public_key(ident))))
             
             self.serverIDs[nickname.lower()] = ident
+            self._writeIDCache()
         finally:
             self._unlock()
 
@@ -394,6 +400,8 @@
 
             # Then, rebuild self.serversByNickname
             self.__buildNicknameMap()
+
+            self._writeIDCache()
         finally:
             self._unlock()
             
@@ -415,40 +423,52 @@
         self.lockfile.release()
         self.rlock.release()
 
-class ServerQueue:
+    def _writeIDCache(self):
+        writePickled(self.idCacheFile, ("V0", self.serverIDs), 0644)
+
+class IncomingQueue:
     """DOCDOC"""
-    def __init__(self, incomingDir):
+    def __init__(self, incomingDir, rejectDir):
         """DOCDOC"""
         self.incomingDir = incomingDir
-        createPrivateDir(self.incomingDir)
+        self.rejectDir = rejectDir
+        if not os.path.exists(incomingDir):
+            raise MixFatalError("Incoming directory doesn't exist")
+        if not os.path.exists(rejectDir):
+            raise MixFatalError("Reject directory doesn't exist")
 
     def queueIncomingServer(self, contents, server):
         """DOCDOC"""
         nickname = server.getNickname()
         _writeServer(nickname, contents, self.incomingDir)
 
-    def serversPending(self):
+    def queueRejectedServer(self, contents, server):
+        nickname = server.getNickname()
+        _writeServer(nickname, contents, self.rejectDir)        
+
+    def newServersPending(self, newServ):
         """DOCDOC"""
         return len(os.listdir(self.incomingDir)) > 0
 
     def readPendingServers(self):
+        d = self.incomingDir
         res = []
-        for fname in os.listdir(self.incomingDir):
-            path = os.path.join(self.incomingDir,fname)
+        for fname in os.listdir(d):
+            path = os.path.join(d,fname)
             try:
                 text, server = _readServer(path)
             except (ConfigError, MixError),e:
                 os.unlink(path)
                 LOG.warn("Removed a bad server descriptor %s from incoming dir: %s",
                          fname, e)
-
+                continue
             res.append((fname, server, text, getIDFingerprint(server)))
         return res
 
     def delPendingServers(self, fnames):
         for fname in fnames:
             try:
-                os.path.unlink(os.path.join(self.incomingDir, fname))
+                os.path.unlink(os.path.join(d, fname))
             except OSError:
                 LOG.warn("delPendingServers: no such server %s"%fname)
 
@@ -456,114 +476,145 @@
     """DOCDOC"""
     pass
 
-def receiveServer(serverList, incomingQueue, text, source):
-    """DOCDOC
+class ServerIntake:
+    def __init__(self, base, idCacheFile):
+        self.newQueue = IncomingQueue(os.path.join(base, "new"),
+                                      os.path.join(base, "reject"))
+        self.updateQueue = IncomingQueue(os.path.join(base, "updates"),
+                                         os.path.join(base, "reject"))
+        self.idCacheFile = idCacheFile
+        self.idCache = None
 
-       Returns true on OK; raises UIError on failure; raises ServerQueued on
-       wait-for-admin."""
-    try:
-        text, server = _readServer(text)
-    except MixError, e:
-        LOG.warn("Rejected invalid server from %s: %s", source,e)
-        raise UIError("Server descriptor was not valid: %s"%e)
+    def getIDCache(self):
+        if self.idCache is None:
+            o = readPickled(self.idCacheFile)
+            if o[0] != 'V0':
+                raise MixFatalError("Malformatted ID cache")
+            self.idCache = o[1]
+        return self.idCache
 
-    try:
-        known = serverList.isServerKnown(server)
-    except MixError, e:
-        LOG.warn("Rejected server with mismatched identity from %s", source)
-        raise
+    def receiveServer(self, text, source):
+        """DOCDOC
 
-    if not known:
-        LOG.info("Received previously unknown server from %s", source)
-        incomingQueue.queueIncomingServer(text,server)
-        raise ServerQueuedException()
+           Returns true on OK; raises UIError on failure; raises
+           ServerQueued on wait-for-admin.
+           """
+        try:
+            text, server = _readServer(text)
+        except MixError, e:
+            LOG.warn("Rejected invalid server from %s: %s", source,e)
+            raise UIError("Server descriptor was not valid: %s"%e)
 
-    serverList.importServerInfo(text, knownOnly=1, server=server)
+        nickname = server.getNickname()
+        lcnickname = nickname.lower()
+        identity = server.getIdentity()
+        oldIdentity = self.getIDCache().get(lcnickname)
+        if oldIdentity is None:
+            LOG.info("Received previously unknown server %s from %s",
+                     nickname, source)
+            self.newQueue.queueIncomingServer(text,server)
+            return
 
-def acceptServer(serverList, incomingQueue, nickname):
-    if ':' in nickname:
-        nickname, fingerprint = nickname.split(":")
-    else:
-        fingerprint = None
-        
-    lcnickname = nickname.lower()
-    incoming = incomingQueue.readPendingServers()
-    # Do we have any pending servers of the desired name?
-    incoming = [ (fname,server,text,fp)
-                 for fname,server,text,fp in incoming
-                 if server.getNickname().lower() == lcnickname ]
-    if not incoming:
-        raise UIError("No incoming servers named %s"%nickname)
+        if pk_same_public_key(oldIdentity, identity):
+            LOG.info("Received update for server %s"%server)
+            self.updateQueue.queueIncomingServer(text,server)
+            return
 
-    if not fingerprint:
-        fps = [fp for f,s,t,fp in incoming]
-        for f in fps:
-            if f != fps[0]:
-                raise UIError("Multiple KeyIDs for servers names %s"%nickname)
-        reject = []
-    else:
-        reject = [ (f,s,t,fp) for f,s,t,fp in incoming
-                   if fp != fingerprint ]
-        incoming = [ (f,s,t,fp) for f,s,t,fp in incoming
-                    if fp == fingerprint ]
-        if not incoming:
-            raise UIError("No servers named %s with matching KeyID"%nickname)
-        if reject:
-            LOG.warn("Rejecting %s server(s) named %s with unmatching KeyIDs",
-                     len(reject), nickname)
+        LOG.warn("Rejected server with mismatched identity from %s", source)
+        self.updateQueue.queueRejectedServer(text,server)
+        raise UIError("I already know a server named %s with a different key.",
+                      nickname)
 
-    try:
-        serverList._lock()
-        serverList.learnServerID(incoming[0][1])
+    def _doAccept(self, serverList, q, incoming, reject, knownOnly):
         accepted = []
-        for fname,server,text,fp in incoming:
+        for fname, server, text, fp in incoming: 
             try:
-                serverList.importServerInfo(text,server=server)
+                serverList.importServerInfo(text,server=server,
+                                            knownOnly=knownOnly)
                 accepted.append(fname)
-            except MixError, e:
+            except (MixError, config), e:
                 LOG.warn("ServerList refused to include server %s: %s",
                          fname, e)
                 reject.append((fname,server,text,fp))
 
-        if len(accepted):
-            LOG.warn("Key ID learned; no servers actually imported")
+        for fname, server, text, fp in reject:
+            self.updateQueue.queueRejectedServer(text,server)
+
+        fnames = accepted + [fn for fn,_,_,_ in reject]
+        q.delPendingServers(fnames)
+
+    def acceptUpdates(self, serverList):
+        incoming = self.updateQueue.readPendingServers()
+        self._doAccept(serverList, self.updateQueue, incoming, [],
+                       knownOnly=1)
+
+    def acceptNewServers(self, serverList, nickname):
+        if ':' in nickname:
+            nickname, fingerprint = nickname.split(":")
         else:
-            LOG.info("%s server descriptor(s) imported.", len(accepted))
+            fingerprint = None
 
-        LOG.info("Moving %s descriptors to rejected status"%len(reject))
-        for fname,_,text,_ in reject:
-            _writeServer(serverList.rejectDir, text, "BAD-"+nickname)
+        lcnickname = nickname.lower()
+        incoming = self.newQueue.readPendingServers()
+        # Do we have any pending servers of the desired name?
+        incoming = [ (fname,server,text,fp)
+                     for fname,server,text,fp in incoming
+                     if server.getNickname().lower() == lcnickname ]
+        if not incoming:
+            raise UIError("No incoming servers named %s"%nickname)
 
-        incomingQueue.delPendingServers([fname for fname,_,text,_ in reject])
-        incomingQueue.delPendingServers(accepted)
-    finally:
-        serverList._unlock()
+        if not fingerprint:
+            fps = [fp for f,s,t,fp in incoming]
+            for f in fps:
+                if f != fps[0]:
+                    raise UIError("Multiple KeyIDs for servers named %s"%
+                                  nickname)
+            reject = []
+        else:
+            reject = [ (f,s,t,fp) for f,s,t,fp in incoming
+                       if fp != fingerprint ]
+            incoming = [ (f,s,t,fp) for f,s,t,fp in incoming
+                        if fp == fingerprint ]
+            if not incoming:
+                raise UIError("No servers named %s with matching KeyID"%
+                              nickname)
+            if reject:
+                LOG.warn("Rejecting %s servers named %s with unmatched KeyIDs",
+                         len(reject), nickname)
+            
+        try:
+            serverList._lock()
+            serverList.learnServerID(incoming[0][1])
+            self._doAccept(serverList, self.newQueue, incmoing, reject,
+                           knownOnly=1)
+        finally:
+            serverList._unlock()
 
-def listPendingServers(f, incomingQueue):
-    """DOCDOC"""
-    incoming = incomingQueue.readPendingServers()
-    # lcnickname->fp->servers
-    servers = {}
-    # lcnickname->nicknames
-    nicknames = {}
+    def listNewPendingServers(self, f):
+        """DOCDOC"""
+        incoming = self.newQueue.readPendingServers()
+        # lcnickname->fp->servers
+        servers = {}
+        # lcnickname->nicknames
+        nicknames = {}
     
-    for f,s,t,fp in incoming:
-        nickname = s.getNickname()
-        lcnickname = nickname.lower()
-        nicknames.setdefault(lcnickname, []).append(nickname)
-        servers.setdefault(lcnickname, {}).setdefault(fp, []).append(s)
+        for f,s,t,fp in incoming:
+            nickname = s.getNickname()
+            lcnickname = nickname.lower()
+            nicknames.setdefault(lcnickname, []).append(nickname)
+            servers.setdefault(lcnickname, {}).setdefault(fp, []).append(s)
 
-    sorted = nicknames.keys()
-    sorted.sort()
-    maxlen = max([len(n) for n in sorted])
-    format = " %"+str(maxlen)+"s:%s [%s descriptors]"
-    for lcnickname in sorted:
-        nickname = nicknames[lcnickname][0]
-        ss = servers[lcnickname]
-        if len(ss) > 1:
-            print >>f, ("***** MULTIPLE KEYIDS FOR %s:"%nickname)
-        for fp, s in ss:
-            print >>f, (format%(nickname,fp,len(s)))
+        sorted = nicknames.keys()
+        sorted.sort()
+        maxlen = max([len(n) for n in sorted])
+        format = " %"+str(maxlen)+"s:%s [%s descriptors]"
+        for lcnickname in sorted:
+            nickname = nicknames[lcnickname][0]
+            ss = servers[lcnickname]
+            if len(ss) > 1:
+                print >>f, ("***** MULTIPLE KEYIDS FOR %s:"%nickname)
+            for fp, s in ss:
+                print >>f, (format%(nickname,fp,len(s)))
         
 def _writeServer(directory, contents, nickname):
     newFile = nickname+"-"+formatFnameTime()