[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Refactor ServerList into three files



Update of /home/minion/cvsroot/src/minion/lib/mixminion/directory
In directory moria.mit.edu:/tmp/cvs-serv2125/lib/mixminion/directory

Modified Files:
	ServerList.py 
Added Files:
	Directory.py ServerInbox.py 
Log Message:
Refactor ServerList into three files

--- NEW FILE: Directory.py ---
# Copyright 2003 Nick Mathewson.  See LICENSE for licensing information.
# $Id: Directory.py,v 1.1 2003/05/25 21:57:05 nickm Exp $

"""mixminion.directory.Directory

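   Defines IDCache, a pickled on-disk map from lowercased server
   nickname to identity ID; the MismatchedID exception that IDCache
   raises; and getIDFingerprint, which computes such an ID for a server.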

   """

# Names exported by 'from mixminion.directory.Directory import *'.  Every
# entry must be defined in this module, or the star-import fails.
__all__ = [ 'IDCache', 'MismatchedID', 'getIDFingerprint' ]

import os

import mixminion.Config
import mixminion.Crypto

from mixminion.Common import LOG, MixError, MixFatalError, UIError, \
     formatBase64, writePickled, readPickled

class MismatchedID(Exception):
    """Raised by IDCache when a nickname is already stored under an ID
       different from the one supplied."""

class IDCache:
    """DOCDOC"""
    def __init__(self, location):
        self.location = location
        self.dirty = 0
        self.cache = None

    def containsID(self, nickname, ID):
        nickname = nickname.lower()
        try:
            return self.__containsID(nickname, ID)
        except TypeError:
            self.load()
            return self.__containsID(nickname, ID)

    def __containsID(self, lcnickname, ID):
        try:
            if self.cache[lcnickname] != ID:
                raise MismatchedID()
            return 1
        except KeyError:
            return 0

    def containsServer(self, server):
        nickname = server.getNickname()
        ID = getIDFingerprint(server)
        return self.containsID(nickname, ID)

    def insertID(self, nickname, ID):
        nickname = nickname.lower()
        self.dirty = 1
        try:
            self.__insertID(nickname, ID)
        except AttributeError:
            self.load()
            self.__insertID(nickname, ID)

    def __insertID(self, lcnickname, ID):
        old = self.cache.get(lcnickname)
        if old and old != ID:
            raise MismatchedID()
        self.cache[lcnickname] = ID

    def insertServer(self, server):
        key = server.getIdentity()
        nickname = server.getNickname()
        ID = getIDFingerprint(server)
        self.insertID(nickname, ID)

    def flush(self):
        if self.dirty:
            self.save()

    def load(self):
        if not os.path.exists(self.location):
            LOG.info("No ID cache; will create")
            self.cache = {}
            return
        try:
            obj = readPickled(self.location)
            # Pass pickling error
        except OSError, e:
            raise MixError("Cache exists, but cannot read cache: %s" % e)
        if len(obj) != 2:
            raise MixFatalError("Corrupt ID cache")
        
        typecode, data = obj
        if typecode != 'V0':
            raise MixFatalError("Unrecognized version on ID cache.")

        self.cache = data
        
    def save(self):
        writePickled(self.location,
                     ("V0", self.cache),
                     0644)
        self.dirty = 0
            
def getIDFingerprint(server):
    """Return the ID used for 'server': its identity key, encoded with
       pk_encode_public_key, hashed with mixminion.Crypto.sha1, and
       rendered with formatBase64."""
    encodedKey = mixminion.Crypto.pk_encode_public_key(server.getIdentity())
    digest = mixminion.Crypto.sha1(encodedKey)
    return formatBase64(digest)

--- NEW FILE: ServerInbox.py ---
# Copyright 2003 Nick Mathewson.  See LICENSE for licensing information.
# $Id: ServerInbox.py,v 1.1 2003/05/25 21:57:05 nickm Exp $

"""mixminion.directory.ServerInbox

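   Holds server descriptors that have been submitted but not yet
   imported into a ServerList.  IncomingQueue stores pending and
   rejected descriptors in directories on disk; ServerInbox sorts
   incoming descriptors into new servers and updates to known servers.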

   """

# Only ServerInbox is exported by 'from ... import *'.
__all__ = [ 'ServerInbox' ]

import os

import mixminion.directory.Directory

from mixminion.Common import LOG, MixError, MixFatalError, UIError, \
     readPickled, writePickled
from mixminion.Config import ConfigError

from mixminion.directory.Directory import getIDFingerprint
from mixminion.directory.ServerList import _writeServer, _readServer

class ServerInbox:
    def __init__(self, base, idCache):
        self.newQueue = IncomingQueue(os.path.join(base, "new"),
                                      os.path.join(base, "reject"))
        self.updateQueue = IncomingQueue(os.path.join(base, "updates"),
                                         os.path.join(base, "reject"))
        self.idCache = idCache

    def receiveServer(self, text, source):
        """DOCDOC

           Returns true on OK; raises UIError on failure; raises
           ServerQueued on wait-for-admin.
           """
        try:
            text, server = _readServer(text)
        except MixError, e:
            LOG.warn("Rejected invalid server from %s: %s", source,e)
            raise UIError("Server descriptor was not valid: %s"%e)

        nickname = server.getNickname()

        try:
            known = self.idCache.containsServer(nickname)
        except mixminion.directory.Directory.MismatchedID:
            LOG.warn("Rejected server with mismatched identity from %s",
                     source)
            self.updateQueue.queueRejectedServer(text,server)
            raise UIError(("I already know a server named "
                           "%s with a different key.")%nickname)

        if not known:
            LOG.info("Received previously unknown server %s from %s",
                     nickname, source)
            self.newQueue.queueIncomingServer(text,server)
            return
        else:
            LOG.info("Received update for server %s from %s",
                     nickname, source)
            self.updateQueue.queueIncomingServer(text,server)
            return

    def _doAccept(self, serverList, q, incoming, reject, knownOnly):
        accepted = []
        for fname, server, text, fp in incoming: 
            try:
                serverList.importServerInfo(text,server=server,
                                            knownOnly=knownOnly)
                accepted.append(fname)
            except (MixError, config), e:
                LOG.warn("ServerList refused to include server %s: %s",
                         fname, e)
                reject.append((fname,server,text,fp))

        for fname, server, text, fp in reject:
            self.updateQueue.queueRejectedServer(text,server)

        fnames = accepted + [fn for fn,_,_,_ in reject]
        q.delPendingServers(fnames)

    def acceptUpdates(self, serverList):
        incoming = self.updateQueue.readPendingServers()
        self._doAccept(serverList, self.updateQueue, incoming, [],
                       knownOnly=1)

    def acceptNewServers(self, serverList, nickname):
        if ':' in nickname:
            nickname, fingerprint = nickname.split(":")
        else:
            fingerprint = None

        lcnickname = nickname.lower()
        incoming = self.newQueue.readPendingServers()
        # Do we have any pending servers of the desired name?
        incoming = [ (fname,server,text,fp)
                     for fname,server,text,fp in incoming
                     if server.getNickname().lower() == lcnickname ]
        if not incoming:
            raise UIError("No incoming servers named %s"%nickname)

        if not fingerprint:
            fps = [fp for f,s,t,fp in incoming]
            for f in fps:
                if f != fps[0]:
                    raise UIError("Multiple KeyIDs for servers named %s"%
                                  nickname)
            reject = []
        else:
            reject = [ (f,s,t,fp) for f,s,t,fp in incoming
                       if fp != fingerprint ]
            incoming = [ (f,s,t,fp) for f,s,t,fp in incoming
                        if fp == fingerprint ]
            if not incoming:
                raise UIError("No servers named %s with matching KeyID"%
                              nickname)
            if reject:
                LOG.warn("Rejecting %s servers named %s with unmatched KeyIDs",
                         len(reject), nickname)
            
        try:
            serverList._lock()
            serverList.learnServerID(incoming[0][1])
            self._doAccept(serverList, self.newQueue, incoming, reject,
                           knownOnly=1)
        finally:
            serverList._unlock()

    def listNewPendingServers(self, f):
        """DOCDOC"""
        incoming = self.newQueue.readPendingServers()
        # lcnickname->fp->servers
        servers = {}
        # lcnickname->nicknames
        nicknames = {}
    
        for f,s,t,fp in incoming:
            nickname = s.getNickname()
            lcnickname = nickname.lower()
            nicknames.setdefault(lcnickname, []).append(nickname)
            servers.setdefault(lcnickname, {}).setdefault(fp, []).append(s)

        sorted = nicknames.keys()
        sorted.sort()
        maxlen = max([len(n) for n in sorted])
        format = " %"+str(maxlen)+"s:%s [%s descriptors]"
        for lcnickname in sorted:
            nickname = nicknames[lcnickname][0]
            ss = servers[lcnickname]
            if len(ss) > 1:
                print >>f, ("***** MULTIPLE KEYIDS FOR %s:"%nickname)
            for fp, s in ss:
                print >>f, (format%(nickname,fp,len(s)))

class IncomingQueue:
    """DOCDOC"""
    def __init__(self, incomingDir, rejectDir):
        """DOCDOC"""
        self.incomingDir = incomingDir
        self.rejectDir = rejectDir
        if not os.path.exists(incomingDir):
            raise MixFatalError("Incoming directory doesn't exist")
        if not os.path.exists(rejectDir):
            raise MixFatalError("Reject directory doesn't exist")

    def queueIncomingServer(self, contents, server):
        """DOCDOC"""
        nickname = server.getNickname()
        _writeServer(nickname, contents, self.incomingDir)

    def queueRejectedServer(self, contents, server):
        nickname = server.getNickname()
        _writeServer(nickname, contents, self.rejectDir)        

    def newServersPending(self, newServ):
        """DOCDOC"""
        return len(os.listdir(self.incomingDir)) > 0

    def readPendingServers(self):
        res = []
        for fname in os.listdir(self.incomingDir):
            path = os.path.join(self.incomingDir,fname)
            try:
                text, server = _readServer(path)
            except (ConfigError, MixError),e:
                os.unlink(path)
                LOG.warn("Removed a bad server descriptor %s from incoming dir: %s",
                         fname, e)
                continue
            res.append((fname, server, text, getIDFingerprint(server)))
        return res

    def delPendingServers(self, fnames):
        for fname in fnames:
            try:
                os.path.unlink(os.path.join(self.incomingDir, fname))
            except OSError:
                LOG.warn("delPendingServers: no such server %s"%fname)

class ServerQueuedException(Exception):
    """Exception type defined for callers; nothing in this module raises
       it.  NOTE(review): presumably meant to signal that a submitted
       descriptor was queued pending administrator approval -- confirm
       against callers."""

Index: ServerList.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/directory/ServerList.py,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -d -r1.18 -r1.19
--- ServerList.py	25 May 2003 17:07:31 -0000	1.18
+++ ServerList.py	25 May 2003 21:57:05 -0000	1.19
@@ -20,6 +20,9 @@
 import threading
 import mixminion
 
+import mixminion.Config
+import mixminion.directory.Directory
+
 from mixminion.Crypto import pk_decode_public_key, pk_encode_public_key, \
      pk_same_public_key
 from mixminion.Common import IntervalSet, LOG, MixError, MixFatalError, \
@@ -62,7 +65,6 @@
     #  rejectDir: Directory where we store invalid descriptors.
     #  archiveDir: Directory where we store old descriptors
     #  servers: Map from filename within <serverDir> to ServerInfo objects.
-    #  serverIDs: Map from lowercased server nickname to serverID.
     #  serversByNickname: A map from lowercased server nickname to
     #       lists of filenames within <serverDir>
     ##Layout:
@@ -86,19 +88,22 @@
     #     identity
     #     .lock
     #
-    # idCacheFile:
-    def __init__(self, baseDir, idCacheFile=None):
+    # idCache: DOCDOC
+    def __init__(self, baseDir, idCache=None):
         """Initialize a ServerList to store servers under baseDir/servers,
            creating directories as needed.
         """
         self.baseDir = baseDir
+        if idCache is None:
+            idCache = mixminion.directory.Directory.IDCache(
+                os.path.join(baseDir, "xx_idcache"))
+        self.idCache = idCache
+        
         self.serverIDDir = os.path.join(self.baseDir, "server-ids")
         self.serverDir = os.path.join(self.baseDir, "servers")
         self.rejectDir = os.path.join(self.baseDir, "reject")
         self.archiveDir = os.path.join(self.baseDir, "archive")
         self.dirArchiveDir = os.path.join(self.baseDir, "dirArchive")
-        self.idCacheFile = idCacheFile or os.path.join(self.baseDir,
-                                                       "id_cache")
         self.lockfile = Lockfile(os.path.join(self.baseDir, ".lock"))
         self.rlock = threading.RLock()
         self.servers = {}
@@ -117,18 +122,12 @@
            a different key."""
         try:
             self._lock()
-            nickname = server.getNickname()
-            lcnickname = nickname.lower()
             try:
-                oldIdentity = self.serverIDs[lcnickname]
-            except KeyError:
-                return 0
-
-            newIdentity = server.getIdentity()
-            if not pk_same_public_key(newIdentity, oldIdentity):
-                raise UIError("Already know a server named %r with a different identity key." % nickname)
-
-            return 1
+                return self.idCache.containsServer(server)
+            except mixminion.directory.Directory.MismatchedID:
+                raise UIError(("Already know a server named "
+                               "%r with different identity key.")
+                              % server.getNickname())
         finally:
             self._unlock()
 
@@ -136,15 +135,14 @@
         """DOCDOC"""
         try:
             self._lock()
+            ident = server.getIdentity()
             nickname = server.getNickname()
             LOG.info("Learning identity for new server %s", nickname)
-            ident = server.getIdentity()
+            self.idCache.insertServer(server)
             writePickled(os.path.join(self.serverIDDir,
                                       nickname+"-"+formatFnameTime()),
                          ("V0", (nickname, pk_encode_public_key(ident))))
-            
-            self.serverIDs[nickname.lower()] = ident
-            self._writeIDCache()
+            self.idCache.save()
         finally:
             self._unlock()
 
@@ -182,11 +180,13 @@
                 # Okay -- make sure we don't have this same descriptor.
                 for fn in self.serversByNickname[lcnickname]:
                     oldServer = self.servers[fn]
-                    if oldServer['Server']['Digest'] == server['Server']['Digest']:
+                    if oldServer['Server']['Digest'] == \
+                           server['Server']['Digest']:
                         raise UIError("Server descriptor already inserted.")
                 # Okay -- make sure that this server isn't superseded.
                 if server.isSupersededBy(
-                 [ self.servers[fn] for fn in self.serversByNickname[lcnickname]]):
+                    [ self.servers[fn] for fn in
+                                 self.serversByNickname[lcnickname]]):
                     raise UIError("Server descriptor is superseded")
 
             if not known:
@@ -246,8 +246,11 @@
                 included.append((nickname, validAfter, fn))
 
             included.sort()
-            # FFFF We should probably not do all of this in RAM, but what the hey.
-            # FFFF It will only matter if we have many, many servers in the system.
+
+            # FFFF We should probably not do all of this in RAM, but
+            # FFFF what the hey.  It will only matter if we have many, many
+            # FFFF servers in the system.
+
             contents = [ ]
             for _, _, fn in included:
                 f = open(os.path.join(self.serverDir, fn), 'r')
@@ -370,7 +373,7 @@
                 try:
                     self.servers[filename] = ServerInfo(fname=path)
                 except ConfigError, e:
-                    LOG.warn("Somehow, a bad server named %s got into our store",
+                    LOG.warn("Somehow, a bad server named %s got in our store",
                              filename)
                     LOG.warn(" (Error was: %s)", str(e))
                     os.rename(path, os.path.join(self.rejectDir, filename))
@@ -381,7 +384,8 @@
                 path = os.path.join(self.serverIDDir, filename)
                 t = readPickled(path)
                 if t[0] != 'V0':
-                    LOG.warn("Skipping confusing stored key in file %s", filename)
+                    LOG.warn("Skipping confusing stored key in file %s",
+                             filename)
                     continue
                 nickname, key = t[1]
                 self.serverIDs[nickname.lower()] = pk_decode_public_key(key)
@@ -392,7 +396,8 @@
                 try:
                     ident = self.serverIDs[lcn]
                 except KeyError:
-                    raise UIError("No stored key for server %s", s.getNickname())
+                    raise UIError("No stored key for server %s",
+                                  s.getNickname())
 
                 if not pk_same_public_key(ident, s.getIdentity()):
                     raise UIError("Inconsistent stored key for server %s",
@@ -400,8 +405,6 @@
 
             # Then, rebuild self.serversByNickname
             self.__buildNicknameMap()
-
-            self._writeIDCache()
         finally:
             self._unlock()
             
@@ -423,198 +426,7 @@
         self.lockfile.release()
         self.rlock.release()
 
-    def _writeIDCache(self):
-        writePickled(self.idCacheFile, ("V0", self.serverIDs), 0644)
-
-class IncomingQueue:
-    """DOCDOC"""
-    def __init__(self, incomingDir, rejectDir):
-        """DOCDOC"""
-        self.incomingDir = incomingDir
-        self.rejectDir = rejectDir
-        if not os.path.exists(incomingDir):
-            raise MixFatalError("Incoming directory doesn't exist")
-        if not os.path.exists(rejectDir):
-            raise MixFatalError("Reject directory doesn't exist")
-
-    def queueIncomingServer(self, contents, server):
-        """DOCDOC"""
-        nickname = server.getNickname()
-        _writeServer(nickname, contents, self.incomingDir)
 
-    def queueRejectedServer(self, contents, server):
-        nickname = server.getNickname()
-        _writeServer(nickname, contents, self.rejectDir)        
-
-    def newServersPending(self, newServ):
-        """DOCDOC"""
-        return len(os.listdir(self.incomingDir)) > 0
-
-    def readPendingServers(self):
-        res = []
-        for fname in os.listdir(self.incomingDir):
-            path = os.path.join(self.incomingDir,fname)
-            try:
-                text, server = _readServer(path)
-            except (ConfigError, MixError),e:
-                os.unlink(path)
-                LOG.warn("Removed a bad server descriptor %s from incoming dir: %s",
-                         fname, e)
-                continue
-            res.append((fname, server, text, getIDFingerprint(server)))
-        return res
-
-    def delPendingServers(self, fnames):
-        for fname in fnames:
-            try:
-                os.path.unlink(os.path.join(self.incomingDir, fname))
-            except OSError:
-                LOG.warn("delPendingServers: no such server %s"%fname)
-
-class ServerQueuedException(Exception):
-    """DOCDOC"""
-    pass
-
-class ServerIntake:
-    def __init__(self, base, idCacheFile):
-        self.newQueue = IncomingQueue(os.path.join(base, "new"),
-                                      os.path.join(base, "reject"))
-        self.updateQueue = IncomingQueue(os.path.join(base, "updates"),
-                                         os.path.join(base, "reject"))
-        self.idCacheFile = idCacheFile
-        self.idCache = None
-
-    def getIDCache(self):
-        if self.idCache is None:
-            o = readPickled(self.idCacheFile)
-            if o[0] != 'V0':
-                raise MixFatalError("Malformatted ID cache")
-            self.idCache = o[1]
-        return self.idCache
-
-    def receiveServer(self, text, source):
-        """DOCDOC
-
-           Returns true on OK; raises UIError on failure; raises
-           ServerQueued on wait-for-admin.
-           """
-        try:
-            text, server = _readServer(text)
-        except MixError, e:
-            LOG.warn("Rejected invalid server from %s: %s", source,e)
-            raise UIError("Server descriptor was not valid: %s"%e)
-
-        nickname = server.getNickname()
-        lcnickname = nickname.lower()
-        identity = server.getIdentity()
-        oldIdentity = self.getIDCache().get(lcnickname)
-        if oldIdentity is None:
-            LOG.info("Received previously unknown server %s from %s",
-                     nickname, source)
-            self.newQueue.queueIncomingServer(text,server)
-            return
-
-        if pk_same_public_key(oldIdentity, identity):
-            LOG.info("Received update for server %s"%server)
-            self.updateQueue.queueIncomingServer(text,server)
-            return
-
-        LOG.warn("Rejected server with mismatched identity from %s", source)
-        self.updateQueue.queueRejectedServer(text,server)
-        raise UIError("I already know a server named %s with a different key.",
-                      nickname)
-
-    def _doAccept(self, serverList, q, incoming, reject, knownOnly):
-        accepted = []
-        for fname, server, text, fp in incoming: 
-            try:
-                serverList.importServerInfo(text,server=server,
-                                            knownOnly=knownOnly)
-                accepted.append(fname)
-            except (MixError, config), e:
-                LOG.warn("ServerList refused to include server %s: %s",
-                         fname, e)
-                reject.append((fname,server,text,fp))
-
-        for fname, server, text, fp in reject:
-            self.updateQueue.queueRejectedServer(text,server)
-
-        fnames = accepted + [fn for fn,_,_,_ in reject]
-        q.delPendingServers(fnames)
-
-    def acceptUpdates(self, serverList):
-        incoming = self.updateQueue.readPendingServers()
-        self._doAccept(serverList, self.updateQueue, incoming, [],
-                       knownOnly=1)
-
-    def acceptNewServers(self, serverList, nickname):
-        if ':' in nickname:
-            nickname, fingerprint = nickname.split(":")
-        else:
-            fingerprint = None
-
-        lcnickname = nickname.lower()
-        incoming = self.newQueue.readPendingServers()
-        # Do we have any pending servers of the desired name?
-        incoming = [ (fname,server,text,fp)
-                     for fname,server,text,fp in incoming
-                     if server.getNickname().lower() == lcnickname ]
-        if not incoming:
-            raise UIError("No incoming servers named %s"%nickname)
-
-        if not fingerprint:
-            fps = [fp for f,s,t,fp in incoming]
-            for f in fps:
-                if f != fps[0]:
-                    raise UIError("Multiple KeyIDs for servers named %s"%
-                                  nickname)
-            reject = []
-        else:
-            reject = [ (f,s,t,fp) for f,s,t,fp in incoming
-                       if fp != fingerprint ]
-            incoming = [ (f,s,t,fp) for f,s,t,fp in incoming
-                        if fp == fingerprint ]
-            if not incoming:
-                raise UIError("No servers named %s with matching KeyID"%
-                              nickname)
-            if reject:
-                LOG.warn("Rejecting %s servers named %s with unmatched KeyIDs",
-                         len(reject), nickname)
-            
-        try:
-            serverList._lock()
-            serverList.learnServerID(incoming[0][1])
-            self._doAccept(serverList, self.newQueue, incoming, reject,
-                           knownOnly=1)
-        finally:
-            serverList._unlock()
-
-    def listNewPendingServers(self, f):
-        """DOCDOC"""
-        incoming = self.newQueue.readPendingServers()
-        # lcnickname->fp->servers
-        servers = {}
-        # lcnickname->nicknames
-        nicknames = {}
-    
-        for f,s,t,fp in incoming:
-            nickname = s.getNickname()
-            lcnickname = nickname.lower()
-            nicknames.setdefault(lcnickname, []).append(nickname)
-            servers.setdefault(lcnickname, {}).setdefault(fp, []).append(s)
-
-        sorted = nicknames.keys()
-        sorted.sort()
-        maxlen = max([len(n) for n in sorted])
-        format = " %"+str(maxlen)+"s:%s [%s descriptors]"
-        for lcnickname in sorted:
-            nickname = nicknames[lcnickname][0]
-            ss = servers[lcnickname]
-            if len(ss) > 1:
-                print >>f, ("***** MULTIPLE KEYIDS FOR %s:"%nickname)
-            for fp, s in ss:
-                print >>f, (format%(nickname,fp,len(s)))
-        
 def _writeServer(directory, contents, nickname):
     newFile = nickname+"-"+formatFnameTime()
     f, newFile = openUnique(os.path.join(directory, "newFile"))
@@ -632,10 +444,3 @@
     # May raise ConfigError, MixError
     return contents, ServerInfo(string=contents, assumeValid=0)
 
-def getIDFingerprint(server):
-    """DOCDOC"""
-    ident = server.getIdentity()
-    return formatBase64(
-        mixminion.Crypto.sha1(
-             mixminion.Crypto.pk_encode_public_key(ident)))
-