[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] Fix 3 outstanding MMTP issues: cert confusion, self-to-...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv20486/src/minion/lib/mixminion/server
Modified Files:
MMTPServer.py ServerKeys.py ServerMain.py
Log Message:
Fix 3 outstanding MMTP issues: cert confusion, self-to-self connections, avoid multiple connections to same server.
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -d -r1.20 -r1.21
--- MMTPServer.py 20 Feb 2003 16:57:40 -0000 1.20
+++ MMTPServer.py 28 Mar 2003 15:36:23 -0000 1.21
@@ -29,9 +29,11 @@
from types import StringType
import mixminion._minionlib as _ml
-from mixminion.Common import MixError, MixFatalError, LOG, stringContains
+from mixminion.Common import MixError, MixFatalError, MixProtocolError, \
+ LOG, stringContains
from mixminion.Crypto import sha1, getCommonPRNG
from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN
+from mixminion.MMTPClient import PeerCertificateCache
__all__ = [ 'AsyncServer', 'ListenConnection', 'MMTPServerConnection',
'MMTPClientConnection' ]
@@ -457,8 +459,9 @@
warn("Error while shutting down: closing connection to %s",
self.address)
self.__server.unregister(self)
+ self.handleFail(1)
else:
- # We are in no state at all.
+ # We are in no state at all; disconnect
self.__server.unregister(self)
def finished(self):
@@ -469,6 +472,11 @@
"""Called when this connection is successfully shut down."""
pass
+ def shutdownFailed(self):
+ """Called when this connection goes down hard."""
+ #DOCDOC
+ pass
+
def shutdown(self, err=0, retriable=0):
"""Begin a shutdown on this connection"""
if err:
@@ -482,8 +490,8 @@
"""Returns the current contents of the input buffer."""
return "".join(self.__inbuf)
- def getPeerPK(self):
- return self.__con.get_peer_cert_pk()
+ def getTLSConnection(self):
+ return self.__con
def handleFail(self, retriable=0):
"""Called when we shutdown with an error."""
@@ -638,7 +646,8 @@
# in the order we offer them.
PROTOCOL_VERSIONS = [ '0.1', '0.2' ]
def __init__(self, context, ip, port, keyID, messageList, handleList,
- sentCallback=None, failCallback=None):
+ sentCallback=None, failCallback=None, finishedCallback=None,
+ certCache=None):
"""Create a connection to send messages to an MMTP server.
Raises socket.error if the connection fails.
@@ -661,10 +670,15 @@
if m == 'JUNK':
self.junk.append(getCommonPRNG().getBytes(MESSAGE_LEN))
+ if certCache is None:
+ certCache = PeerCertificateCache()
+ self.certCache = certCache
+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
self.keyID = keyID
self.ip = ip
+ self.port = port
try:
sock.connect((ip, port))
except socket.error, e:
@@ -676,31 +690,41 @@
tls = context.sock(sock)
SimpleTLSConnection.__init__(self, sock, tls, 0, "%s:%s"%(ip,port))
- self.messageList = messageList
- self.handleList = handleList
+ self.messageList = messageList[:]
+ self.handleList = handleList[:]
self.finished = self.__setupFinished
self.sentCallback = sentCallback
self.failCallback = failCallback
+ self.finishedCallback = finishedCallback #DOCDOC
self.protocol = None
debug("Opening client connection (fd %s)", self.fd)
+ def addMessages(self, messages, handles):
+ "DOCDOC"
+ for m in messages:
+ if m == "JUNK":
+ self.junk.append(getCommonPRNG().getBytes(MESSAGE_LEN))
+ self.messageList.extend(messages)
+ self.handleList.extend(handles)
+
+ def getAddr(self):
+ "DOCDOC"
+ return self.ip, self.port, self.keyID
+
def __setupFinished(self):
"""Called when we're done with the client side negotations.
Begins sending the protocol string.
"""
- keyID = sha1(self.getPeerPK().encode_key(public=1))
- if self.keyID is not None:
- if self.keyID == NULL_KEYID:
- trace("Ignoring Key ID from %s", self.address)
- elif keyID != self.keyID:
- warn("Got unexpected Key ID from %s; shutting down connection",
- self.address)
- # The keyid may start being good in a while.
- self.shutdown(err=1,retriable=1)
- return
- else:
- debug("KeyID from %s is valid", self.address)
+ try:
+ self.certCache.check(self.getTLSConnection(), self.keyID,
+ self.address)
+ except MixProtocolError, e:
+ warn("%s. Shutting down connection",e)
+ self.shutdown(err=1,retriable=1)
+ return
+ else:
+ debug("KeyID from %s is valid", self.address)
self.beginWrite("MMTP %s\r\n"%(",".join(self.PROTOCOL_VERSIONS)))
self.finished = self.__sentProtocol
@@ -804,6 +828,14 @@
if self.failCallback is not None:
for msg, handle in zip(self.messageList, self.handleList):
self.failCallback(msg,handle,retriable)
+ if self.finishedCallback is not None:
+ self.finishedCallback()
+
+ def shutdownFinished(self):
+ if self.finishedCallback is not None:
+ self.finishedCallback()
+
+
LISTEN_BACKLOG = 128
class MMTPAsyncServer(AsyncServer):
@@ -811,6 +843,7 @@
MMTPClientConnection, with a function to add new connections, and
callbacks for message success and failure."""
##
+ # clientConByAddr
def __init__(self, config, tls):
AsyncServer.__init__(self)
@@ -834,6 +867,7 @@
#self.config = config
self.listener.register(self)
self._timeout = config['Server']['Timeout'][2]
+ self.clientConByAddr = {}
def getNextTimeoutTime(self, now):
"""Return the time at which we next purge connections, if we have
@@ -861,16 +895,35 @@
assert len(h) < 32
try:
+ #DOCDOC
+ con = self.clientConByAddr[(ip,port,keyID)]
+ LOG.debug("Queueing %s messages on open connection to %s:%s",
+ len(messages), ip, port)
+ con.addMessages(messages, handles)
+ return
+ except KeyError:
+ pass
+
+ try:
con = MMTPClientConnection(self.context,
ip, port, keyID, messages, handles,
self.onMessageSent,
self.onMessageUndeliverable)
+ #XXXX004
+ con.finishedCallback = lambda con=con: self.__clientFinished(con)
con.register(self)
+ self.clientConByAddr[con.getAddr()] = con
except socket.error, e:
LOG.error("Unexpected socket error connecting to %s:%s: %s",
ip, port, e)
for m,h in zip(messages, handles):
self.onMessageUndeliverable(m,h,1)
+
+ def __clientFinished(self, con):
+ try:
+ del self.clientConByAddr[con.getAddr()]
+ except KeyError:
+ LOG.warn("Didn't find client connection in address map")
def onMessageReceived(self, msg):
pass
Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -d -r1.14 -r1.15
--- ServerKeys.py 26 Mar 2003 16:34:08 -0000 1.14
+++ ServerKeys.py 28 Mar 2003 15:36:23 -0000 1.15
@@ -7,7 +7,8 @@
"""
#FFFF We need support for encrypting private keys.
-__all__ = [ "ServerKeyring", "generateServerDescriptorAndKeys" ]
+__all__ = [ "ServerKeyring", "generateServerDescriptorAndKeys",
+ "generateCertChain" ]
import bisect
import os
@@ -316,6 +317,13 @@
return mixminion.server.PacketHandler.PacketHandler(packetKey,
hashlog)
+ def getAddress(self):
+ """Return our current ip/port/keyid tuple"""
+ keys = self.getServerKeyset()
+ desc = keys.getServerDescriptor()
+ return (desc['Server']['IP'],
+ desc['Incoming/MMTP']['Port'],
+ desc['Incoming/MMTP']['Key-Digest'])
#----------------------------------------------------------------------
class ServerKeyset:
@@ -502,11 +510,9 @@
config['Server']['PublicKeySloppiness'][2]
if useServerKeys is None:
- # Create the X509 certificate.
- mixminion.Crypto.generate_cert(serverKeys.getCertFileName(),
- mmtpKey,
- "MMTP certificate for %s" %nickname,
- certStarts, certEnds)
+ # Create the X509 certificates
+ generateCertChain(serverKeys.getCertFileName(),
+ mmtpKey, identityKey, nickname, certStarts, certEnds)
mmtpProtocolsIn = mixminion.server.MMTPServer.MMTPServerConnection \
.PROTOCOL_VERSIONS[:]
@@ -517,6 +523,10 @@
mmtpProtocolsIn = ",".join(mmtpProtocolsIn)
mmtpProtocolsOut = ",".join(mmtpProtocolsOut)
+ identityKeyID = formatBase64(
+ mixminion.Crypto.sha1(
+ mixminion.Crypto.pk_encode_public_key(identityKey)))
+
fields = {
"IP": config['Incoming/MMTP'].get('IP', "0.0.0.0"),
"Port": config['Incoming/MMTP'].get('Port', 0),
@@ -528,8 +538,7 @@
"ValidUntil": formatDate(validUntil),
"PacketKey":
formatBase64(mixminion.Crypto.pk_encode_public_key(packetKey)),
- "KeyID":
- formatBase64(serverKeys.getMMTPKeyID()),
+ "KeyID": identityKeyID,
"MMTPProtocolsIn" : mmtpProtocolsIn,
"MMTPProtocolsOut" : mmtpProtocolsOut,
}
@@ -697,3 +706,34 @@
", ".join(ip_set.keys())))
return ip_set.keys()[0]
+
+def generateCertChain(filename, mmtpKey, identityKey, nickname,
+ certStarts, certEnds):
+ "DOCDOC"
+ fname = filename+"_tmp"
+ mixminion.Crypto.generate_cert(fname,
+ mmtpKey, identityKey,
+ "%s<MMTP>" %nickname,
+ nickname,
+ certStarts, certEnds)
+ try:
+ f = open(fname)
+ certText = f.read()
+ finally:
+ f.close()
+ os.unlink(fname)
+ mixminion.Crypto.generate_cert(fname,
+ identityKey, identityKey,
+ nickname, nickname,
+ certStarts, certEnds)
+ try:
+ f = open(fname)
+ identityCertText = f.read()
+ f.close()
+ os.unlink(fname)
+ f = open(filename, 'w')
+ f.write(certText)
+ f.write(identityCertText)
+ finally:
+ f.close()
+
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.46
retrieving revision 1.47
diff -u -d -r1.46 -r1.47
--- ServerMain.py 27 Mar 2003 10:31:00 -0000 1.46
+++ ServerMain.py 28 Mar 2003 15:36:23 -0000 1.47
@@ -207,21 +207,26 @@
"""
## Fields:
# server -- an instance of _MMTPServer
- def __init__(self, location):
+ # addr -- (publishedIP, publishedPort, publishedKeyID)
+ # incomingQueue -- DOCDOC
+ def __init__(self, location, (ip,port,keyid)):
"""Create a new OutgoingQueue that stores its messages in a given
location."""
mixminion.server.ServerQueue.DeliveryQueue.__init__(self, location)
self.server = None
+ self.incomingQueue = None
+ self.addr = (ip,port,keyid)
def configure(self, config):
"""Set up this queue according to a ServerConfig object."""
retry = config['Outgoing/MMTP']['Retry']
self.setRetrySchedule(retry)
- def connectQueues(self, server):
+ def connectQueues(self, server, incoming):
"""Set the MMTPServer that this OutgoingQueue informs of its
deliverable messages."""
self.server = server
+ self.incomingQueue = incoming
def _deliverMessages(self, msgList):
"Implementation of abstract method from DeliveryQueue."
@@ -237,6 +242,16 @@
message = packet.getPacket()
msgs.setdefault(addr, []).append( (handle, message) )
for addr, messages in msgs.items():
+ if self.addr[:2] == (addr.ip, addr.port):
+ if self.addr[2] != addr.keyinfo:
+ LOG.warn("Delivering messages to myself with bad KeyID")
+ for h,m in messages:
+ LOG.trace("Delivering message %s to myself.",
+ formatBase64(m[:8]))
+ self.incomingQueue.queueMessage(m)
+ self.deliverySucceeded(h)
+ continue
+
handles, messages = zip(*messages)
self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
list(messages), list(handles))
@@ -445,6 +460,8 @@
LOG.debug("Initializing MMTP server")
self.mmtpServer = _MMTPServer(config, tlsContext)
+ publishedIP, publishedPort, publishedKeyID = self.keyring.getAddress()
+
# FFFF Modulemanager should know about async so it can patch in if it
# FFFF needs to.
LOG.debug("Initializing delivery module")
@@ -468,7 +485,8 @@
outgoingDir = os.path.join(queueDir, "outgoing")
LOG.debug("Initializing outgoing queue")
- self.outgoingQueue = OutgoingQueue(outgoingDir)
+ self.outgoingQueue = OutgoingQueue(outgoingDir,
+ (publishedIP, publishedPort, publishedKeyID))
self.outgoingQueue.configure(config)
LOG.debug("Found %d pending messages in outgoing queue",
self.outgoingQueue.count())
@@ -481,7 +499,8 @@
processingThread=self.processingThread)
self.mixPool.connectQueues(outgoing=self.outgoingQueue,
manager=self.moduleManager)
- self.outgoingQueue.connectQueues(server=self.mmtpServer)
+ self.outgoingQueue.connectQueues(server=self.mmtpServer,
+ incoming=self.incomingQueue)
self.mmtpServer.connectQueues(incoming=self.incomingQueue,
outgoing=self.outgoingQueue)