[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] Fix 3 outstanding MMTP issues: cert confusion, self-to-...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv20486/src/minion/lib/mixminion/server

Modified Files:
	MMTPServer.py ServerKeys.py ServerMain.py 
Log Message:
Fix 3 outstanding MMTP issues: cert confusion, self-to-self connections, avoid multiple connections to same server.

Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -d -r1.20 -r1.21
--- MMTPServer.py	20 Feb 2003 16:57:40 -0000	1.20
+++ MMTPServer.py	28 Mar 2003 15:36:23 -0000	1.21
@@ -29,9 +29,11 @@
 from types import StringType
 
 import mixminion._minionlib as _ml
-from mixminion.Common import MixError, MixFatalError, LOG, stringContains
+from mixminion.Common import MixError, MixFatalError, MixProtocolError, \
+     LOG, stringContains
 from mixminion.Crypto import sha1, getCommonPRNG
 from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN
+from mixminion.MMTPClient import PeerCertificateCache
 
 __all__ = [ 'AsyncServer', 'ListenConnection', 'MMTPServerConnection',
             'MMTPClientConnection' ]
@@ -457,8 +459,9 @@
                 warn("Error while shutting down: closing connection to %s",
                      self.address)
                 self.__server.unregister(self)
+                self.handleFail(1)
         else:
-            # We are in no state at all.
+            # We are in no state at all; disconnect
             self.__server.unregister(self)
 
     def finished(self):
@@ -469,6 +472,11 @@
         """Called when this connection is successfully shut down."""
         pass
 
+    def shutdownFailed(self):
+        """Called when this connection goes down hard."""
+        #DOCDOC
+        pass
+
     def shutdown(self, err=0, retriable=0):
         """Begin a shutdown on this connection"""
         if err:
@@ -482,8 +490,8 @@
         """Returns the current contents of the input buffer."""
         return "".join(self.__inbuf)
 
-    def getPeerPK(self):
-        return self.__con.get_peer_cert_pk()
+    def getTLSConnection(self):
+        return self.__con
 
     def handleFail(self, retriable=0):
         """Called when we shutdown with an error."""
@@ -638,7 +646,8 @@
     #     in the order we offer them.
     PROTOCOL_VERSIONS = [ '0.1', '0.2' ]
     def __init__(self, context, ip, port, keyID, messageList, handleList,
-                 sentCallback=None, failCallback=None):
+                 sentCallback=None, failCallback=None, finishedCallback=None,
+                 certCache=None):
         """Create a connection to send messages to an MMTP server.
            Raises socket.error if the connection fails.
         
@@ -661,10 +670,15 @@
             if m == 'JUNK':
                 self.junk.append(getCommonPRNG().getBytes(MESSAGE_LEN))
 
+        if certCache is None:
+            certCache = PeerCertificateCache()
+        self.certCache = certCache
+
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.setblocking(0)
         self.keyID = keyID
         self.ip = ip
+        self.port = port
         try:
             sock.connect((ip, port))
         except socket.error, e:
@@ -676,31 +690,41 @@
         tls = context.sock(sock)
 
         SimpleTLSConnection.__init__(self, sock, tls, 0, "%s:%s"%(ip,port))
-        self.messageList = messageList
-        self.handleList = handleList
+        self.messageList = messageList[:]
+        self.handleList = handleList[:]
         self.finished = self.__setupFinished
         self.sentCallback = sentCallback
         self.failCallback = failCallback
+        self.finishedCallback = finishedCallback #DOCDOC
         self.protocol = None
 
         debug("Opening client connection (fd %s)", self.fd)
 
+    def addMessages(self, messages, handles):
+        "DOCDOC"
+        for m in messages:
+            if m == "JUNK":
+                self.junk.append(getCommonPRNG().getBytes(MESSAGE_LEN))
+        self.messageList.extend(messages)
+        self.handleList.extend(handles)
+
+    def getAddr(self):
+        "DOCDOC"
+        return self.ip, self.port, self.keyID
+
     def __setupFinished(self):
         """Called when we're done with the client side negotations.
            Begins sending the protocol string.
         """
-        keyID = sha1(self.getPeerPK().encode_key(public=1))
-        if self.keyID is not None:
-            if self.keyID == NULL_KEYID:
-                trace("Ignoring Key ID from %s", self.address)
-            elif keyID != self.keyID:
-                warn("Got unexpected Key ID from %s; shutting down connection",
-                     self.address)
-                # The keyid may start being good in a while.
-                self.shutdown(err=1,retriable=1)
-                return
-            else:
-                debug("KeyID from %s is valid", self.address)
+        try:
+            self.certCache.check(self.getTLSConnection(), self.keyID,
+                                 self.address)
+        except MixProtocolError, e:
+            warn("%s.  Shutting down connection",e)
+            self.shutdown(err=1,retriable=1)
+            return
+        else:
+            debug("KeyID from %s is valid", self.address)
 
         self.beginWrite("MMTP %s\r\n"%(",".join(self.PROTOCOL_VERSIONS)))
         self.finished = self.__sentProtocol
@@ -804,6 +828,14 @@
         if self.failCallback is not None:
             for msg, handle in zip(self.messageList, self.handleList):
                 self.failCallback(msg,handle,retriable)
+        if self.finishedCallback is not None:
+            self.finishedCallback()
+            
+    def shutdownFinished(self):
+        if self.finishedCallback is not None:
+            self.finishedCallback()
+
+
 
 LISTEN_BACKLOG = 128
 class MMTPAsyncServer(AsyncServer):
@@ -811,6 +843,7 @@
        MMTPClientConnection, with a function to add new connections, and
        callbacks for message success and failure."""
     ##
+    # clientConByAddr
     def __init__(self, config, tls):
         AsyncServer.__init__(self)
 
@@ -834,6 +867,7 @@
         #self.config = config
         self.listener.register(self)
         self._timeout = config['Server']['Timeout'][2]
+        self.clientConByAddr = {}
 
     def getNextTimeoutTime(self, now):
         """Return the time at which we next purge connections, if we have
@@ -861,16 +895,35 @@
             assert len(h) < 32
 
         try:
+            #DOCDOC 
+            con = self.clientConByAddr[(ip,port,keyID)]
+            LOG.debug("Queueing %s messages on open connection to %s:%s",
+                      len(messages), ip, port)
+            con.addMessages(messages, handles)
+            return
+        except KeyError:
+            pass
+
+        try:
             con = MMTPClientConnection(self.context,
                                        ip, port, keyID, messages, handles,
                                        self.onMessageSent,
                                        self.onMessageUndeliverable)
+            #XXXX004
+            con.finishedCallback = lambda con=con: self.__clientFinished(con)
             con.register(self)
+            self.clientConByAddr[con.getAddr()] = con
         except socket.error, e:
             LOG.error("Unexpected socket error connecting to %s:%s: %s",
                       ip, port, e)
             for m,h in zip(messages, handles):
                 self.onMessageUndeliverable(m,h,1)
+
+    def __clientFinished(self, con):
+        try:
+            del self.clientConByAddr[con.getAddr()]
+        except KeyError:
+            LOG.warn("Didn't find client connection in address map")
 
     def onMessageReceived(self, msg):
         pass

Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -d -r1.14 -r1.15
--- ServerKeys.py	26 Mar 2003 16:34:08 -0000	1.14
+++ ServerKeys.py	28 Mar 2003 15:36:23 -0000	1.15
@@ -7,7 +7,8 @@
    """
 #FFFF We need support for encrypting private keys.
 
-__all__ = [ "ServerKeyring", "generateServerDescriptorAndKeys" ]
+__all__ = [ "ServerKeyring", "generateServerDescriptorAndKeys",
+            "generateCertChain" ]
 
 import bisect
 import os
@@ -316,6 +317,13 @@
         return mixminion.server.PacketHandler.PacketHandler(packetKey,
                                                      hashlog)
 
+    def getAddress(self):
+        """Return our current ip/port/keyid tuple"""
+        keys = self.getServerKeyset()
+        desc = keys.getServerDescriptor()
+        return (desc['Server']['IP'],
+                desc['Incoming/MMTP']['Port'],
+                desc['Incoming/MMTP']['Key-Digest'])
 
 #----------------------------------------------------------------------
 class ServerKeyset:
@@ -502,11 +510,9 @@
                config['Server']['PublicKeySloppiness'][2]
 
     if useServerKeys is None:
-        # Create the X509 certificate.
-        mixminion.Crypto.generate_cert(serverKeys.getCertFileName(),
-                                       mmtpKey,
-                                       "MMTP certificate for %s" %nickname,
-                                       certStarts, certEnds)
+        # Create the X509 certificates
+        generateCertChain(serverKeys.getCertFileName(),
+                          mmtpKey, identityKey, nickname, certStarts, certEnds)
 
     mmtpProtocolsIn = mixminion.server.MMTPServer.MMTPServerConnection \
                       .PROTOCOL_VERSIONS[:]
@@ -517,6 +523,10 @@
     mmtpProtocolsIn = ",".join(mmtpProtocolsIn)
     mmtpProtocolsOut = ",".join(mmtpProtocolsOut)
 
+    identityKeyID = formatBase64(
+                      mixminion.Crypto.sha1(
+                          mixminion.Crypto.pk_encode_public_key(identityKey)))
+
     fields = {
         "IP": config['Incoming/MMTP'].get('IP', "0.0.0.0"),
         "Port": config['Incoming/MMTP'].get('Port', 0),
@@ -528,8 +538,7 @@
         "ValidUntil": formatDate(validUntil),
         "PacketKey":
            formatBase64(mixminion.Crypto.pk_encode_public_key(packetKey)),
-        "KeyID":
-           formatBase64(serverKeys.getMMTPKeyID()),
+        "KeyID": identityKeyID,        
         "MMTPProtocolsIn" : mmtpProtocolsIn,
         "MMTPProtocolsOut" : mmtpProtocolsOut,
         }
@@ -697,3 +706,34 @@
                     ", ".join(ip_set.keys())))
 
     return ip_set.keys()[0]
+
+def generateCertChain(filename, mmtpKey, identityKey, nickname,
+                      certStarts, certEnds):
+    "DOCDOC"
+    fname = filename+"_tmp"
+    mixminion.Crypto.generate_cert(fname,
+                                   mmtpKey, identityKey,
+                                   "%s<MMTP>" %nickname,
+                                   nickname,
+                                   certStarts, certEnds)
+    try:
+        f = open(fname)
+        certText = f.read()
+    finally:
+        f.close()
+    os.unlink(fname)
+    mixminion.Crypto.generate_cert(fname,
+                                   identityKey, identityKey,
+                                   nickname, nickname,
+                                   certStarts, certEnds)
+    try:
+        f = open(fname)
+        identityCertText = f.read()
+        f.close()
+        os.unlink(fname)
+        f = open(filename, 'w')
+        f.write(certText)
+        f.write(identityCertText)
+    finally:
+        f.close()
+

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.46
retrieving revision 1.47
diff -u -d -r1.46 -r1.47
--- ServerMain.py	27 Mar 2003 10:31:00 -0000	1.46
+++ ServerMain.py	28 Mar 2003 15:36:23 -0000	1.47
@@ -207,21 +207,26 @@
     """
     ## Fields:
     # server -- an instance of _MMTPServer
-    def __init__(self, location):
+    # addr -- (publishedIP, publishedPort, publishedKeyID)
+    # incomingQueue -- DOCDOC
+    def __init__(self, location, (ip,port,keyid)):
         """Create a new OutgoingQueue that stores its messages in a given
            location."""
         mixminion.server.ServerQueue.DeliveryQueue.__init__(self, location)
         self.server = None
+        self.incomingQueue = None
+        self.addr = (ip,port,keyid)
 
     def configure(self, config):
         """Set up this queue according to a ServerConfig object."""
         retry = config['Outgoing/MMTP']['Retry']
         self.setRetrySchedule(retry)
 
-    def connectQueues(self, server):
+    def connectQueues(self, server, incoming):
         """Set the MMTPServer that this OutgoingQueue informs of its
            deliverable messages."""
         self.server = server
+        self.incomingQueue = incoming
 
     def _deliverMessages(self, msgList):
         "Implementation of abstract method from DeliveryQueue."
@@ -237,6 +242,16 @@
             message = packet.getPacket()
             msgs.setdefault(addr, []).append( (handle, message) )
         for addr, messages in msgs.items():
+            if self.addr[:2] == (addr.ip, addr.port):
+                if self.addr[2] != addr.keyinfo:
+                    LOG.warn("Delivering messages to myself with bad KeyID")
+                for h,m in messages:
+                    LOG.trace("Delivering message %s to myself.",
+                              formatBase64(m[:8]))
+                    self.incomingQueue.queueMessage(m)
+                    self.deliverySucceeded(h)
+                continue
+
             handles, messages = zip(*messages)
             self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
                                      list(messages), list(handles))
@@ -445,6 +460,8 @@
         LOG.debug("Initializing MMTP server")
         self.mmtpServer = _MMTPServer(config, tlsContext)
 
+        publishedIP, publishedPort, publishedKeyID = self.keyring.getAddress()
+
         # FFFF Modulemanager should know about async so it can patch in if it
         # FFFF needs to.
         LOG.debug("Initializing delivery module")
@@ -468,7 +485,8 @@
 
         outgoingDir = os.path.join(queueDir, "outgoing")
         LOG.debug("Initializing outgoing queue")
-        self.outgoingQueue = OutgoingQueue(outgoingDir)
+        self.outgoingQueue = OutgoingQueue(outgoingDir,
+                               (publishedIP, publishedPort, publishedKeyID))
         self.outgoingQueue.configure(config)
         LOG.debug("Found %d pending messages in outgoing queue",
                        self.outgoingQueue.count())
@@ -481,7 +499,8 @@
                                        processingThread=self.processingThread)
         self.mixPool.connectQueues(outgoing=self.outgoingQueue,
                                    manager=self.moduleManager)
-        self.outgoingQueue.connectQueues(server=self.mmtpServer)
+        self.outgoingQueue.connectQueues(server=self.mmtpServer,
+                                         incoming=self.incomingQueue)
         self.mmtpServer.connectQueues(incoming=self.incomingQueue,
                                       outgoing=self.outgoingQueue)