[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] Move all the pure-server modules into a new package, "m...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv11907/lib/mixminion/server
Added Files:
HashLog.py MMTPServer.py Modules.py PacketHandler.py Queue.py
ServerMain.py __init__.py
Log Message:
Move all the pure-server modules into a new package, 'mixminion.server.'
We still need to split 'Config' and 'ServerInfo' into their client and server
components.
--- NEW FILE: HashLog.py ---
# Copyright 2002 Nick Mathewson. See LICENSE for licensing information.
# $Id: HashLog.py,v 1.1 2002/12/11 06:58:55 nickm Exp $
"""mixminion.server.HashLog
Persistent memory for the hashed secrets we've seen. Used by
PacketHandler to prevent replay attacks."""
import os
import anydbm, dumbdbm
from mixminion.Common import MixFatalError, LOG, createPrivateDir
from mixminion.Packet import DIGEST_LEN
__all__ = [ 'HashLog' ]
# FFFF Mechanism to force a different default db module.
# FFFF two-copy journaling to protect against catastrophic failure that
# FFFF underlying DB code can't handle.
# Flags to pass to os.open when opening the journal file.
#   O_SYNC   (where the platform defines it) asks for synchronous writes.
#   O_BINARY (where the platform defines it) disables newline translation;
#            the journal is a run of raw digests, so its bytes must be
#            written exactly as given.
_JOURNAL_OPEN_FLAGS = (os.O_WRONLY | os.O_CREAT |
                       getattr(os, 'O_SYNC', 0) |
                       getattr(os, 'O_BINARY', 0))
class HashLog:
"""A HashLog is a file containing a list of message digests that we've
already processed.
Each HashLog corresponds to a single public key (whose hash is the
log's keyid). A HashLog must persist for as long as the key does.
It is not necessary to sync the HashLog to the disk every time
a new message is seen; instead, we must only ensure that every
_retransmitted_ message is first inserted into the hashlog and
synced. (One way to implement this is to process messages from
'state A' into 'state B', marking them in the hashlog as we go,
and syncing the hashlog before any message is sent from 'B' to
the network. On a restart, we reinsert all messages waiting in 'B'
into the log.)
HashLogs are implemented using Python's anydbm interface. This defaults
to using Berkeley DB, GDBM, or --if you have none of these-- a flat
text file.
The base HashLog implementation assumes an 8-bit-clean database that
maps strings to strings."""
##
# Internally, we also keep a flat 'journal' file to which we append
# values that we've seen but not yet written to the database. This way
# we can survive crashes between 'logHash' and 'sync'.
#
# Fields:
# log: an anydbm instance.
# journalFileName: the name of our journal file
# journalFile: a file object for our journal file
# journal: a dictionary, used to cache values currently in the
# journal file.
def __init__(self, filename, keyid):
"""Create a new HashLog to store data in 'filename' for the key
'keyid'."""
parent = os.path.split(filename)[0]
createPrivateDir(parent)
self.log = anydbm.open(filename, 'c')
LOG.debug("Opening database %s for packet digests", filename)
if isinstance(self.log, dumbdbm._Database):
LOG.warn("Warning: logging packet digests to a flat file.")
try:
if self.log["KEYID"] != keyid:
raise MixFatalError("Log KEYID does not match current KEYID")
except KeyError:
self.log["KEYID"] = keyid
self.journalFileName = filename+"_jrnl"
self.journal = {}
if os.path.exists(self.journalFileName):
f = open(self.journalFileName, 'r')
# FFFF deal with really big journals?
j = f.read()
for i in xrange(0, len(j), DIGEST_LEN):
self.journal[j[i:i+DIGEST_LEN]] = 1
f.close()
self.journalFile = os.open(self.journalFileName,
_JOURNAL_OPEN_FLAGS|os.O_APPEND, 0600)
def seenHash(self, hash):
"""Return true iff 'hash' has been logged before."""
try:
if self.journal.get(hash,0):
return 1
_ = self.log[hash]
return 1
except KeyError:
return 0
def logHash(self, hash):
"""Insert 'hash' into the database."""
assert len(hash) == DIGEST_LEN
self.journal[hash] = 1
os.write(self.journalFile, hash)
def sync(self):
"""Flushes changes to this log to the filesystem."""
for hash in self.journal.keys():
self.log[hash] = "1"
if hasattr(self.log, "sync"):
self.log.sync()
os.close(self.journalFile)
self.journalFile = os.open(self.journalFileName,
_JOURNAL_OPEN_FLAGS|os.O_TRUNC, 0600)
self.journal = {}
def close(self):
"""Closes this log."""
self.sync()
self.log.close()
os.close(self.journalFile)
--- NEW FILE: MMTPServer.py ---
# Copyright 2002 Nick Mathewson. See LICENSE for licensing information.
# $Id: MMTPServer.py,v 1.1 2002/12/11 06:58:55 nickm Exp $
"""mixminion.server.MMTPServer
This package implements the Mixminion Transfer Protocol as described
in the Mixminion specification. It uses a select loop to provide
a nonblocking implementation of *both* the client and the server sides
of the protocol.
If you just want to send messages into the system, use MMTPClient.
FFFF As yet unsupported are: session resumption, key renegotiation,
FFFF and timeouts."""
# NOTE FOR THE CURIOUS: The 'asyncore' module in the standard library
# is another general select/poll wrapper... so why are we using our
# own? Basically, because asyncore has IMO a couple of mismatches
# with our design, the largest of which is that it has the 'server
# loop' periodically query the connections for their status,
# whereas we have the connections inform the server of their status
# whenever they change. This latter approach turns out to be far
# easier to use with TLS.
import errno
import socket
import select
import re
import time
from types import StringType
import mixminion._minionlib as _ml
from mixminion.Common import MixError, MixFatalError, LOG, stringContains
from mixminion.Crypto import sha1
from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN
__all__ = [ 'AsyncServer', 'ListenConnection', 'MMTPServerConnection',
'MMTPClientConnection' ]
# Module-level shorthands for the shared logger's methods, one per level.
trace = LOG.trace
info = LOG.info
# 'debug' must map to LOG.debug: binding it to LOG.info reported every
# debug message in this module at info level.
debug = LOG.debug
warn = LOG.warn
error = LOG.error
class AsyncServer:
"""AsyncServer is the core of a general-purpose asynchronous
select-based server loop. AsyncServer maintains two lists of
Connection objects that are waiting for reads and writes
(respectively), and waits for their underlying sockets to be
available for the desired operations.
"""
## Fields:
# writers: map from fd to 'Connection' objects that are interested
# in write events.
# readers: map from fd to 'Connection' objects that are interested
# in read events.
def __init__(self):
"""Create a new AsyncServer with no readers or writers."""
self.writers = {}
self.readers = {}
def process(self, timeout):
"""If any relevant file descriptors become available within
'timeout' seconds, call the appropriate methods on their
connections and return immediately after. Otherwise, wait
'timeout' seconds and return.
If we receive an unblocked signal, return immediately.
"""
## trace("%s readers (%s), %s writers (%s)" % (len(self.readers),
## readers,
## len(self.writers),
## writers))
readfds = self.readers.keys()
writefds = self.writers.keys()
try:
readfds,writefds,exfds = select.select(readfds,writefds,[],timeout)
except select.error, e:
if e[0] == errno.EINTR:
return
else:
raise e
for fd in readfds:
trace("Select got a read on fd %s",fd)
self.readers[fd].handleRead()
for fd in writefds:
trace("Select got a write on fd %s", fd)
self.writers[fd].handleWrite()
for fd in exfds:
trace("Select got an exception on fd %s", fd)
if self.readers.has_key(fd): del self.readers[fd]
if self.writers.has_key(fd): del self.writers[fd]
def hasReader(self, reader):
"""Return true iff 'reader' is a reader on this server."""
fd = reader.fileno()
return self.readers.get(fd, None) is reader
def hasWriter(self, writer):
"""Return true iff 'writer' is a writer on this server."""
fd = writer.fileno()
return self.writers.get(fd, None) is writer
def registerReader(self, reader):
"""Register a connection as a reader. The connection's 'handleRead'
method will be called whenever data is available for reading."""
fd = reader.fileno()
self.readers[fd] = reader
if self.writers.has_key(fd):
del self.writers[fd]
def registerWriter(self, writer):
"""Register a connection as a writer. The connection's 'handleWrite'
method will be called whenever the buffer is free for writing.
"""
fd = writer.fileno()
self.writers[fd] = writer
if self.readers.has_key(fd):
del self.readers[fd]
def registerBoth(self, connection):
"""Register a connection as a reader and a writer. The
connection's 'handleRead' and 'handleWrite' methods will be
called as appropriate.
"""
fd = connection.fileno()
self.readers[fd] = self.writers[fd] = connection
def unregister(self, connection):
"""Removes a connection from this server."""
fd = connection.fileno()
w = self.writers.has_key(fd)
r = self.readers.has_key(fd)
if r: del self.readers[fd]
if w: del self.writers[fd]
class Connection:
    """Abstract superclass for asynchronous channels.  Every method here is
       a do-nothing placeholder that returns None."""
    def handleRead(self):
        """Called when there is data to read."""
        return None

    def handleWrite(self):
        """Called when there is data to write."""
        return None

    def register(self, server):
        """Called to register this connection with the AsyncServer
           'server'."""
        return None

    def fileno(self):
        """Returns an integer file descriptor for this connection, or
           returns an object that can return such a descriptor."""
        return None
class ListenConnection(Connection):
"""A ListenConnection listens on a given port/ip combination, and calls
a 'connectionFactory' method whenever a new connection is made to that
port."""
## Fields:
# ip: IP to listen on.
# port: port to listen on.
# sock: socket to bind.
# connectionFactory: a function that takes as input a socket from a
# newly received connection, and returns a Connection object to
# register with the async server.
def __init__(self, ip, port, backlog, connectionFactory):
"""Create a new ListenConnection"""
self.ip = ip
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setblocking(0)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((self.ip, self.port))
self.sock.listen(backlog)
self.connectionFactory = connectionFactory
info("Listening at %s on port %s (fd %s)", ip, port,self.sock.fileno())
def register(self, server):
server.registerReader(self)
self.server = server
def handleRead(self):
con, addr = self.sock.accept()
debug("Accepted connection from %s (fd %s)", addr, con.fileno())
rw = self.connectionFactory(con)
rw.register(self.server)
def shutdown(self):
debug("Closing listener connection (fd %s)", self.sock.fileno())
self.server.unregister(self)
del self.server
self.sock.close()
info("Server connection closed")
def fileno(self):
return self.sock.fileno()
class SimpleTLSConnection(Connection):
    """SimpleTLSConnection is an abstract superclass for asynchronous TLS
       connections.  Conceptually, a SimpleTLSConnection is in one of 5 states:
           1. Negotiating a new connection (server side)
           2. Negotiating a new connection (client side)
           3. Reading
           4. Writing
           5. Shutting down.
       Reads proceed until either a given number of bytes have been received,
       or until a provided terminator has been found.  Writes proceed until
       a buffer is exhausted.

       After leaving states 1,2,3,or 4, the connection's "finished" method
       is called.  After leaving state 5, the connection's "shutdownFinished"
       method is called.
    """
    # Fields:
    #    lastActivity: the last time when we had a read or a write.
    #    address: Human readable IP address of our peer.  For debugging.
    #    fd: fileno for the underlying socket __con.
    #
    #    __sock: the underlying socket object; closed when we shut down.
    #    __con: an underlying TLS object
    #    __state: a callback to use whenever we get a read or a write. May
    #           throw _ml.TLSWantRead or _ml.TLSWantWrite.  See __acceptFn,
    #           __connectFn, __shutdownFn, __readFn, __writeFn.
    #    __server: an AsyncServer.
    #    __inbuf: A list of strings that we've read since the last expectRead.
    #    __inbuflen: The total length of all the strings in __inbuf
    #    __expectReadLen: None, or the number of bytes to read before
    #           the current read succeeds.
    #    __terminator: None, or a string which will terminate the current read.
    #    __outbuf: None, or the remainder of the string we're currently
    #           writing.
    def __init__(self, sock, tls, serverMode):
        """Create a new SimpleTLSConnection.

           sock -- The socket underlying 'tls'.
           tls -- An underlying TLS connection.
           serverMode -- If true, we start with a server-side negotiation.
                         otherwise, we start with a client-side negotiation.
        """
        self.__sock = sock
        # NOTE(review): getpeername() is called right away; for a client
        # socket whose nonblocking connect has not finished yet this may
        # raise socket.error -- confirm against callers.
        self.address = "%s:%s" % sock.getpeername()
        self.__con = tls
        self.fd = self.__con.fileno()

        if serverMode:
            self.__state = self.__acceptFn
        else:
            self.__state = self.__connectFn

    def isShutdown(self):
        """Returns true iff this connection is finished shutting down"""
        return self.__state is None

    def register(self, server):
        """Attach this connection to 'server': as a reader if we are about
           to accept, as a writer if we are about to connect."""
        self.__server = server
        if self.__state == self.__acceptFn:
            server.registerReader(self)
        else:
            assert self.__state == self.__connectFn
            server.registerWriter(self)

    def expectRead(self, bytes=None, terminator=None):
        """Begin reading from the underlying TLS connection.

           After the read is finished, this object's finished method
           is invoked.  A call to 'getInput' will retrieve the contents
           of the input buffer since the last call to 'expectRead'.

           If 'terminator' is not provided, we try to read exactly
           'bytes' bytes.  If terminator is provided, we read until we
           encounter the terminator, but give up after 'bytes' bytes.
        """
        self.__inbuf = []
        self.__inbuflen = 0
        self.__expectReadLen = bytes
        self.__terminator = terminator

        self.__state = self.__readFn
        self.__server.registerReader(self)

    def beginWrite(self, str):
        """Begin writing a string to the underlying connection.  When the
           string is completely written, this object's "finished" method
           will be called.
        """
        self.__outbuf = str
        self.__state = self.__writeFn
        self.__server.registerWriter(self)

    def __acceptFn(self):
        """Hook to implement server-side handshake."""
        self.__con.accept() #may throw want*
        self.__server.unregister(self)
        self.finished()

    def __connectFn(self):
        """Hook to implement client-side handshake."""
        self.__con.connect() #may throw want*
        self.__server.unregister(self)
        self.finished()

    def __shutdownFn(self):
        """Hook to implement shutdown."""

        # This is a bit subtle.  The underlying 'shutdown' method
        # needs to be retried till the other guy sends an 'ack'
        # back... but we don't want to keep retrying indefinitely, or
        # else we can deadlock on a connection from ourself to
        # ourself.
        if self.__con.shutdown() == 1: #may throw want*
            trace("Got a 1 on shutdown (fd %s)", self.fd)
            self.__server.unregister(self)
            self.__state = None
            self.__sock.close()
            self.shutdownFinished()
            return

        # If we don't get any response on shutdown, stop blocking; the other
        # side may be hostile, confused, or deadlocking.
        trace("Got a 0 on shutdown (fd %s)", self.fd)
        # ???? Is 'wantread' always correct?
        # ???? Rather than waiting for a read, should we use a timer or
        # ????       something?
        raise _ml.TLSWantRead()

    def __readFn(self):
        """Hook to implement read.

           Pulls everything currently available from the TLS layer into
           __inbuf, then decides whether the current read is over: too much
           data (protocol violation), terminator seen, or the expected
           number of bytes reached.  'finished' is invoked at most once per
           call."""
        while 1:
            r = self.__con.read(1024) #may throw want*
            if r == 0:
                trace("read returned 0 (fd %s)", self.fd)
                self.shutdown(err=0)
                return
            else:
                assert isinstance(r, StringType)
                trace("read got %s bytes (fd %s)", len(r), self.fd)
                self.__inbuf.append(r)
                self.__inbuflen += len(r)
                if not self.__con.pending():
                    break

        # When hunting for a terminator, collapse the chunks so that a
        # terminator straddling two reads is still found in __inbuf[0].
        if self.__terminator and len(self.__inbuf) > 1:
            self.__inbuf = ["".join(self.__inbuf)]

        if self.__expectReadLen and self.__inbuflen > self.__expectReadLen:
            warn("Protocol violation: too much data. Closing connection to %s",
                 self.address)
            self.shutdown(err=1, retriable=0)
            return

        if self.__terminator and stringContains(self.__inbuf[0],
                                                self.__terminator):
            trace("read found terminator (fd %s)", self.fd)
            self.__server.unregister(self)
            self.finished()
            # This read is complete.  Without returning here, a read that
            # found its terminator *and* hit the byte limit exactly would
            # fall through and fire 'finished' a second time, advancing the
            # subclass's state machine one step too far.
            return

        if self.__expectReadLen and (self.__inbuflen == self.__expectReadLen):
            trace("read got enough (fd %s)", self.fd)
            self.__server.unregister(self)
            self.finished()

    def __writeFn(self):
        """Hook to implement write.  Calls 'finished' once the whole output
           buffer has been written."""
        out = self.__outbuf
        while len(out):
            r = self.__con.write(out) # may throw
            assert r > 0
            out = out[r:]
            # Record progress after every successful write, so that if a
            # later write raises want*, the retry resumes from the unsent
            # remainder rather than from the start of the buffer.
            self.__outbuf = out

        self.__outbuf = out
        if len(out) == 0:
            self.finished()

    def handleRead(self):
        """Invoked by the AsyncServer when our socket is readable."""
        self.__handleAll()

    def handleWrite(self):
        """Invoked by the AsyncServer when our socket is writable."""
        self.__handleAll()

    def __handleAll(self):
        """Underlying implementation of TLS connection: traverses as
           many states as possible until some operation blocks on
           reading or writing, or until the current __state becomes
           None.
        """
        self.lastActivity = time.time()

        try:
            # We have a while loop here so that, upon entering a new
            # state, we immediately see if we can go anywhere with it
            # without blocking.
            while self.__state is not None:
                self.__state()
        except _ml.TLSWantWrite:
            self.__server.registerWriter(self)
        except _ml.TLSWantRead:
            self.__server.registerReader(self)
        except _ml.TLSClosed:
            warn("Unexpectedly closed connection to %s", self.address)
            self.handleFail(retriable=1)
            self.__sock.close()
            self.__server.unregister(self)
        except _ml.TLSError:
            if self.__state != self.__shutdownFn:
                warn("Unexpected error: closing connection to %s",
                     self.address)
                self.shutdown(err=1, retriable=1)
            else:
                warn("Error while shutting down: closing connection to %s",
                     self.address)
                self.__server.unregister(self)
        else:
            # We are in no state at all.
            self.__server.unregister(self)

    def finished(self):
        """Called whenever a connect, accept, read, or write is finished."""
        pass

    def shutdownFinished(self):
        """Called when this connection is successfully shut down."""
        pass

    def shutdown(self, err=0, retriable=0):
        """Begin a shutdown on this connection.  If 'err' is true, first
           report the failure through handleFail(retriable)."""
        if err:
            self.handleFail(retriable)
        self.__state = self.__shutdownFn

    def fileno(self):
        """Return the file descriptor of the underlying TLS connection."""
        return self.fd

    def getInput(self):
        """Returns the current contents of the input buffer."""
        return "".join(self.__inbuf)

    def getPeerPK(self):
        """Return the public key from the peer's certificate, as given by
           the underlying TLS object."""
        return self.__con.get_peer_cert_pk()

    def handleFail(self, retriable=0):
        """Called when we shutdown with an error."""
        pass
#----------------------------------------------------------------------
# Implementation for MMTP.
# The protocol string to send.  Both sides send this line; the client
# requires the server's reply to equal it exactly.
PROTOCOL_STRING = "MMTP 0.1\r\n"
# The protocol specification to expect.  Group 1 captures the text after
# "MMTP " up to the CRLF; the server splits it on commas to get the list of
# protocol versions offered.
PROTOCOL_RE = re.compile("MMTP ([^\s\r\n]+)\r\n")
# Control line for sending a message.
SEND_CONTROL = "SEND\r\n"
# Control line for sending padding.
JUNK_CONTROL = "JUNK\r\n"
# Control line for acknowledging a message
RECEIVED_CONTROL = "RECEIVED\r\n"
# Lengths of the control lines above.
SEND_CONTROL_LEN = len(SEND_CONTROL)
RECEIVED_CONTROL_LEN = len(RECEIVED_CONTROL)
# Total length of one record sent to the server: control line, then the
# message, then a digest.
SEND_RECORD_LEN = len(SEND_CONTROL) + MESSAGE_LEN + DIGEST_LEN
# Total length of one acknowledgement: control line, then a digest.
RECEIVED_RECORD_LEN = RECEIVED_CONTROL_LEN + DIGEST_LEN
class MMTPServerConnection(SimpleTLSConnection):
    '''An asynchronous implementation of the receiving side of an MMTP
       connection.'''
    ## Fields:
    # messageConsumer: a function to call with all received messages.
    # finished: callback when we're done with a read or write; see
    #     SimpleTLSConnection.
    def __init__(self, sock, tls, consumer):
        """Create an MMTP connection to receive messages sent along a given
           socket.  When valid packets are received, pass them to the
           function 'consumer'."""
        SimpleTLSConnection.__init__(self, sock, tls, 1)
        self.messageConsumer = consumer
        self.finished = self.__setupFinished

    def __setupFinished(self):
        """Called once we're done accepting.  Begins reading the protocol
           string.
        """
        self.finished = self.__receivedProtocol
        self.expectRead(1024, '\n')

    def __receivedProtocol(self):
        """Called once we're done reading the protocol string.  Either
           rejects, or sends our response.
        """
        trace("done w/ client sendproto (fd %s)", self.fd)
        inp = self.getInput()
        m = PROTOCOL_RE.match(inp)

        if not m:
            warn("Bad protocol list. Closing connection to %s", self.address)
            self.shutdown(err=1)
            # Stop here: 'm' is None, so going on to m.group() would raise
            # AttributeError instead of shutting the connection down.
            return
        protocols = m.group(1).split(",")
        if "0.1" not in protocols:
            warn("Unsupported protocol list. Closing connection to %s",
                 self.address)
            self.shutdown(err=1)
            return

        trace("protocol ok (fd %s)", self.fd)
        self.finished = self.__sentProtocol
        self.beginWrite(PROTOCOL_STRING)

    def __sentProtocol(self):
        """Called once we're done sending our protocol response.  Begins
           reading a packet from the line.
        """
        trace("done w/ server sendproto (fd %s)", self.fd)
        self.finished = self.__receivedMessage
        self.expectRead(SEND_RECORD_LEN)

    def __receivedMessage(self):
        """Called once we've read a message from the line.  Checks the
           digest, and either rejects or begins sending an ACK.

           Records that start with SEND_CONTROL are handed to
           messageConsumer; records that start with JUNK_CONTROL are
           padding and are acknowledged but not delivered."""
        data = self.getInput()
        # JUNK_CONTROL and SEND_CONTROL have the same length, so the same
        # slice extracts the body for either kind of record.
        msg = data[SEND_CONTROL_LEN:-DIGEST_LEN]
        digest = data[-DIGEST_LEN:]

        if data.startswith(JUNK_CONTROL):
            expectedDigest = sha1(msg+"JUNK")
            replyDigest = sha1(msg+"RECEIVED JUNK")
            isJunk = 1
        elif data.startswith(SEND_CONTROL):
            expectedDigest = sha1(msg+"SEND")
            replyDigest = sha1(msg+"RECEIVED")
            isJunk = 0
        else:
            warn("Unrecognized command from %s. Closing connection.",
                 self.address)
            self.shutdown(err=1)
            return
        if expectedDigest != digest:
            warn("Invalid checksum from %s. Closing connection",
                 self.address)
            self.shutdown(err=1)
            return

        debug("%s packet received from %s; Checksum valid.",
              data[:4], self.address)
        self.finished = self.__sentAck
        self.beginWrite(RECEIVED_CONTROL+replyDigest)
        # Padding carries no message: only real SEND records go to the
        # consumer.
        if not isJunk:
            self.messageConsumer(msg)

    def __sentAck(self):
        """Called once we're done sending an ACK.  Begins reading a new
           message."""
        debug("Send ACK for message from %s (fd %s)", self.address, self.fd)
        #FFFF Rehandshake
        self.finished = self.__receivedMessage
        self.expectRead(SEND_RECORD_LEN)
#----------------------------------------------------------------------
# FFFF We need to note retriable situations better.
class MMTPClientConnection(SimpleTLSConnection):
    """Asynchronous implementation of the sending ("client") side of a
       mixminion connection."""
    ## Fields:
    # ip, port, keyID, messageList, handleList, sendCallback, failCallback:
    #   As described in the docstring for __init__ below.
    # expectedDigest: the digest we expect in the ACK for the message
    #   currently being sent (set by beginNextMessage).
    def __init__(self, context, ip, port, keyID, messageList, handleList,
                 sentCallback=None, failCallback=None):
        """Create a connection to send messages to an MMTP server.
           context -- The TLS context; its sock() method wraps our socket.
           ip -- The IP of the destination server.
           port -- The port to connect to.
           keyID -- None, or the expected SHA1 hash of the server's public key
           messageList -- a list of message payloads to send
           handleList -- a list of objects corresponding to the messages in
              messageList.  Used for callback.
           sentCallback -- None, or a function of (msg, handle) to be called
              whenever a message is successfully sent.
           failCallback -- None, or a function of (msg, handle, retriable)
              to be called when messages can't be sent."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(0)
        self.keyID = keyID
        self.ip = ip
        try:
            sock.connect((ip, port))
        except socket.error:
            # This will always raise an error, since we're nonblocking.  That's
            # okay.
            pass
        tls = context.sock(sock)

        SimpleTLSConnection.__init__(self, sock, tls, 0)
        self.messageList = messageList
        self.handleList = handleList
        self.finished = self.__setupFinished
        self.sentCallback = sentCallback
        self.failCallback = failCallback

        debug("Opening client connection (fd %s)", self.fd)

    def __setupFinished(self):
        """Called when we're done with the client side negotiations.
           Begins sending the protocol string, unless the server's key does
           not match the one we expected.
        """
        keyID = sha1(self.getPeerPK().encode_key(public=1))
        if self.keyID is not None:
            if keyID != self.keyID:
                warn("Got unexpected Key ID from %s", self.address)
                # This may work again in a couple of hours
                self.shutdown(err=1,retriable=1)
                # Must stop here: falling through to beginWrite would
                # replace the shutdown state and carry on talking to a
                # server that holds the wrong key.
                return
            else:
                debug("KeyID from %s is valid", self.address)

        self.beginWrite(PROTOCOL_STRING)
        self.finished = self.__sentProtocol

    def __sentProtocol(self):
        """Called when we're done sending the protocol string.  Begins
           reading the server's response.
        """
        self.expectRead(1024, '\n')
        self.finished = self.__receivedProtocol

    def __receivedProtocol(self):
        """Called when we're done receiving the protocol string.  Begins
           sending a packet, or exits if we're done sending.
        """
        inp = self.getInput()
        if inp != PROTOCOL_STRING:
            warn("Invalid protocol. Closing connection to %s", self.address)
            # This isn't retriable; we don't talk to servers we don't
            # understand.
            self.shutdown(err=1,retriable=0)
            return

        self.beginNextMessage()

    def beginNextMessage(self):
        """Start writing a message to the connection, or begin shutting
           down if no messages are left."""
        if not self.messageList:
            self.shutdown(0)
            return
        msg = self.messageList[0]
        self.expectedDigest = sha1(msg+"RECEIVED")
        msg = SEND_CONTROL+msg+sha1(msg+"SEND")
        assert len(msg) == SEND_RECORD_LEN

        self.beginWrite(msg)
        self.finished = self.__sentMessage

    def __sentMessage(self):
        """Called when we're done sending a message.  Begins reading the
           server's ACK."""
        debug("Message delivered to %s (fd %s)", self.address, self.fd)
        self.finished = self.__receivedAck
        self.expectRead(RECEIVED_RECORD_LEN)

    def __receivedAck(self):
        """Called when we're done reading the ACK.  If the ACK is bad,
           closes the connection.  If the ACK is correct, removes the
           just-sent message from the connection's internal queue, and
           calls sentCallback with the sent message.

           If there are more messages to send, begins sending the next.
           Otherwise, begins shutting down.
        """
        trace("received ack (fd %s)", self.fd)
        # FFFF Rehandshake
        inp = self.getInput()
        if inp != (RECEIVED_CONTROL+self.expectedDigest):
            # We only get bad ACKs if an adversary somehow subverts TLS's
            # checksumming.  That's not fixable.
            self.shutdown(err=1,retriable=0)
            return

        debug("Received valid ACK for message from %s", self.address)
        justSent = self.messageList[0]
        justSentHandle = self.handleList[0]
        del self.messageList[0]
        del self.handleList[0]
        if self.sentCallback is not None:
            self.sentCallback(justSent, justSentHandle)

        self.beginNextMessage()

    def handleFail(self, retriable=0):
        """Invoked when a message is not deliverable.  Reports every
           message still queued to failCallback, if one was given.

           'retriable' defaults to 0, matching the signature of
           SimpleTLSConnection.handleFail."""
        if self.failCallback is not None:
            for msg, handle in zip(self.messageList, self.handleList):
                self.failCallback(msg,handle,retriable)
# Backlog that MMTPAsyncServer hands to ListenConnection, which passes it to
# listen() on the incoming MMTP socket.
LISTEN_BACKLOG = 10 # ???? Is something else more reasonable?
class MMTPAsyncServer(AsyncServer):
    """Convenience subclass tying together AsyncServer,
       MMTPServerConnection, and MMTPClientConnection.  It owns the
       listening socket, offers a method for starting outgoing deliveries,
       and exposes overridable callbacks for received, sent, and
       undeliverable messages."""
    def __init__(self, config, tls):
        """Start listening on the IP and Port given in the 'Incoming/MMTP'
           section of 'config'.  'tls' is the TLS context used to wrap
           every socket."""
        AsyncServer.__init__(self)

        self.context = tls
        # FFFF Don't always listen; don't always retransmit!
        # FFFF Support listening on specific IPs
        listener = ListenConnection(config['Incoming/MMTP']['IP'],
                                    config['Incoming/MMTP']['Port'],
                                    LISTEN_BACKLOG,
                                    self._newMMTPConnection)
        self.listener = listener
        #self.config = config
        listener.register(self)

    def _newMMTPConnection(self, sock):
        """Helper: invoked by the listener with each newly accepted socket.
           Builds a server connection around it, registers that connection
           with this server, and returns it."""
        # FFFF Check whether incoming IP is allowed!
        wrapped = self.context.sock(sock, serverMode=1)
        sock.setblocking(0)
        connection = MMTPServerConnection(sock, wrapped,
                                          self.onMessageReceived)
        connection.register(self)
        return connection

    def stopListening(self):
        """Shut down the listening socket."""
        self.listener.shutdown()

    def sendMessages(self, ip, port, keyID, messages, handles):
        """Begin sending a set of messages to a given server."""
        # ???? Can we remove these asserts yet?
        for pair in zip(messages, handles):
            assert len(pair[0]) == MESSAGE_LEN
            assert len(pair[1]) < 32

        client = MMTPClientConnection(self.context,
                                      ip, port, keyID, messages, handles,
                                      self.onMessageSent,
                                      self.onMessageUndeliverable)
        client.register(self)

    def onMessageReceived(self, msg):
        """Callback for each message received; does nothing by default."""
        pass

    def onMessageUndeliverable(self, msg, handle, retriable):
        """Callback for each message that could not be sent; does nothing
           by default."""
        pass

    def onMessageSent(self, msg, handle):
        """Callback for each message sent successfully; does nothing by
           default."""
        pass
--- NEW FILE: Modules.py ---
# Copyright 2002 Nick Mathewson. See LICENSE for licensing information.
# $Id: Modules.py,v 1.1 2002/12/11 06:58:55 nickm Exp $
"""mixminion.server.Modules
Code to support pluggable exit module functionality; implementation
for built-in modules.
"""
# FFFF We may, someday, want to support non-exit modules here.
# FFFF Maybe we should refactor MMTP delivery here too.
__all__ = [ 'ModuleManager', 'DeliveryModule',
'DELIVER_OK', 'DELIVER_FAIL_RETRY', 'DELIVER_FAIL_NORETRY'
]
import os
import re
import sys
import smtplib
import socket
import base64
import mixminion.Config
import mixminion.Packet
import mixminion.BuildMessage
from mixminion.Config import ConfigError, _parseBoolean, _parseCommand
from mixminion.Common import LOG, createPrivateDir, MixError, isSMTPMailbox, \
isPrintingAscii
import mixminion.server.Queue
# Return values for processMessage
# The message was successfully delivered.
DELIVER_OK = 1
# The message wasn't delivered, but might be deliverable later.
DELIVER_FAIL_RETRY = 2
# The message wasn't delivered and shouldn't be tried again.
DELIVER_FAIL_NORETRY = 3
class DeliveryModule:
    """Abstract base class for delivery modules.  Concrete modules are
       expected to override the methods below.

       What a delivery module must provide:
           * A constructor that takes no arguments.
           * If configurable: a description of its options, validation of
             its configuration, and the ability to configure itself.
           * If advertisable: a server info block.
           * Its own name.
           * The exit types it handles.
           * And, naturally, a way to deliver a message."""
    # FFFF DeliveryModules need to know about the AsyncServer object in
    # FFFF case they support asynchronous delivery.

    def __init__(self):
        "The no-argument constructor demanded by the Module protocol."

    def getConfigSyntax(self):
        """Return a mapping of section name to section syntax, in the form
           described in Config.py."""
        raise NotImplementedError("getConfigSyntax")

    def validateConfig(self, sections, entries, lines, contents):
        """See mixminion.Config.validate.  The default accepts
           everything."""
        return None

    def configure(self, config, manager):
        """Set this object up from the Config object 'config' and, where
           needed, register it with the module manager 'manager'."""
        raise NotImplementedError("configure")

    def getServerInfoBlock(self):
        """Return the block to be included in a server descriptor."""
        raise NotImplementedError("getServerInfoBlock")

    def getName(self):
        """Return this module's name.  The name may end up inside directory
           paths, so it should stay free of unusual characters."""
        raise NotImplementedError("getName")

    def getExitTypes(self):
        """Return a sequence holding the numeric exit types this module is
           able to handle."""
        raise NotImplementedError("getExitTypes")

    def createDeliveryQueue(self, queueDir):
        """Return a DeliveryQueue object that delivers messages through this
           module.  By default that is a SimpleModuleDeliveryQueue, which is
           adequate but does not batch messages bound for the same
           destination.

           The queue's 'address' component is a tuple that modules must
           accept: (exitType, address, tag).  A 'tag' of None means the
           message has been decrypted; a 'tag' of 'err' means the message
           is corrupt.  Any other tag means the message is a reply or an
           encrypted forward message.
        """
        queue = SimpleModuleDeliveryQueue(self, queueDir)
        return queue

    def processMessage(self, message, tag, exitType, exitInfo):
        """Attempt to deliver 'message', which carries the given exitType
           and exitInfo.  'tag' has the meaning given in
           createDeliveryQueue.  The result must be one of:
            DELIVER_OK (the message was delivered),
            DELIVER_FAIL_RETRY (not delivered, but a later attempt might
              succeed), or
            DELIVER_FAIL_NORETRY (not delivered; don't try again).

           (Only your delivery queue calls this method; a module with a
            nonstandard delivery queue need not implement it.)"""
        raise NotImplementedError("processMessage")
class ImmediateDeliveryQueue:
"""Helper class usable as delivery queue for modules that don't
actually want a queue. Such modules should have very speedy
processMessage() methods, and should never have deliery fail."""
##Fields:
# module: the underlying DeliveryModule object.
def __init__(self, module):
self.module = module
def queueDeliveryMessage(self, (exitType, address, tag), message):
"""Instead of queueing our message, pass it directly to the underlying
DeliveryModule."""
try:
res = self.module.processMessage(message, tag, exitType, address)
if res == DELIVER_OK:
return
elif res == DELIVER_FAIL_RETRY:
LOG.error("Unable to retry delivery for message")
else:
LOG.error("Unable to deliver message")
except:
LOG.error_exc(sys.exc_info(),
"Exception delivering message")
def sendReadyMessages(self):
# We do nothing here; we already delivered the messages
pass
class SimpleModuleDeliveryQueue(mixminion.server.Queue.DeliveryQueue):
    """Default delivery queue for modules that have no interest in
       batching messages headed for similar addresses."""
    ## Fields:
    # module: the underlying module.
    def __init__(self, module, directory):
        """Create a queue stored under 'directory' that delivers through
           'module'."""
        mixminion.server.Queue.DeliveryQueue.__init__(self, directory)
        self.module = module

    def _deliverMessages(self, msgList):
        """Deliver each (handle, addr, message, n_retries) entry of
           'msgList' through the module's processMessage, and report the
           outcome of each with deliverySucceeded or deliveryFailed.  An
           exception during delivery is logged and counted as a
           non-retriable failure."""
        for entry in msgList:
            handle, addr, message, n_retries = entry
            try:
                exitType, address, tag = addr
                outcome = self.module.processMessage(message, tag,
                                                     exitType, address)
                if outcome == DELIVER_OK:
                    self.deliverySucceeded(handle)
                elif outcome == DELIVER_FAIL_RETRY:
                    self.deliveryFailed(handle, 1)
                else:
                    LOG.error("Unable to deliver message")
                    self.deliveryFailed(handle, 0)
            except:
                LOG.error_exc(sys.exc_info(),
                              "Exception delivering message")
                self.deliveryFailed(handle, 0)
class ModuleManager:
    """A ModuleManager knows about all of the server modules in the system.

       A module may be in one of three states: unloaded, registered, or
       enabled.  An unloaded module is just a class in a python module.
       A registered module has been loaded, configured, and listed with
       the ModuleManager, but will not receive messages until it is
       enabled.

       Because modules need to tell the ServerConfig object about their
       configuration options, initializing the ModuleManager is usually done
       through ServerConfig.  See ServerConfig.getModuleManager().

       To send messages, call 'queueMessage' for each message to send, then
       call 'sendReadyMessages'.
       """
    ##
    # Fields
    #    syntax: extensions to the syntax configuration in Config.py
    #    modules: a list of DeliveryModule objects
    #    enabled: a set of enabled DeliveryModule objects (a dict mapping
    #       module name to 1)
    #    nameToModule: Map from module name to module
    #    typeToModule: a map from delivery type to enabled deliverymodule.
    #    path: search path for python modules.
    #    queueRoot: directory where all the queues go.
    #    queues: a map from module name to queue (Queue objects must support
    #            queueDeliveryMessage, sendReadyMessages and cleanQueue.)

    def __init__(self):
        "Create a new ModuleManager with the built-in modules registered."
        self.syntax = {}
        self.modules = []
        self.enabled = {}

        self.nameToModule = {}
        self.typeToModule = {}
        self.path = []
        self.queueRoot = None
        self.queues = {}

        self.registerModule(MBoxModule())
        self.registerModule(DropModule())
        self.registerModule(MixmasterSMTPModule())

    def _setQueueRoot(self, queueRoot):
        """Sets a directory under which all modules' queue directories
           should go."""
        self.queueRoot = queueRoot

    def getConfigSyntax(self):
        """Returns a dict to extend the syntax configuration in a Config
           object. Should be called after all modules are registered."""
        return self.syntax

    def registerModule(self, module):
        """Inform this ModuleManager about a delivery module.  This method
           updates the syntax options, but does not enable the module.

           Raises ConfigError if the module wants to define a configuration
           section that an already-registered module defines; in that case
           the manager is left unchanged."""
        LOG.info("Loading module %s", module.getName())
        syn = module.getConfigSyntax()
        # Check for conflicts *before* recording anything, so a rejected
        # module is not left half-registered in self.modules.
        for sec in syn.keys():
            if self.syntax.has_key(sec):
                raise ConfigError("Multiple modules want to define [%s]"% sec)
        self.modules.append(module)
        self.syntax.update(syn)
        self.nameToModule[module.getName()] = module

    def setPath(self, path):
        """Sets the search path for Python modules, given as a
           colon-separated string.  An empty or None path clears it."""
        if path:
            self.path = path.split(":")
        else:
            self.path = []

    def loadExtModule(self, className):
        """Load and register a module from a python file.  Takes a classname
           of the format module.Class or package.module.Class.  Raises
           MixError if the module can't be loaded."""
        ids = className.split(".")
        pyPkg = ".".join(ids[:-1])
        pyClassName = ids[-1]
        if not pyPkg or not pyClassName:
            # Without this check, __import__("") raises ValueError, which
            # breaks the "raises MixError" contract.
            raise MixError("Bad module class name %r: expected module.Class"
                           % className)
        orig_path = sys.path[:]
        LOG.info("Loading module %s", className)
        try:
            # Search our own module path first, but only for this import.
            sys.path[0:0] = self.path
            try:
                m = __import__(pyPkg, {}, {}, [pyClassName])
            except ImportError:
                e = sys.exc_info()[1]
                raise MixError("%s while importing %s" %(str(e),className))
        finally:
            sys.path = orig_path
        try:
            pyClass = getattr(m, pyClassName)
        except AttributeError:
            raise MixError("No class %s in module %s" %(pyClassName,pyPkg))
        try:
            self.registerModule(pyClass())
        except Exception:
            # Log the underlying problem: the MixError below doesn't say
            # what actually went wrong.
            LOG.error_exc(sys.exc_info(),
                          "Error initializing module %s" %className)
            raise MixError("Error initializing module %s" %className)

    def validate(self, sections, entries, lines, contents):
        """Ask every registered module to validate its part of the
           configuration.  (Arguments are as in ServerConfig.)"""
        for m in self.modules:
            m.validateConfig(sections, entries, lines, contents)

    def configure(self, config):
        """Set up the queue root beneath the server's Homedir, then let
           every registered module configure itself from 'config'."""
        self._setQueueRoot(os.path.join(config['Server']['Homedir'],
                                        'work', 'queues', 'deliver'))
        createPrivateDir(self.queueRoot)
        for m in self.modules:
            m.configure(config, self)

    def enableModule(self, module):
        """Sets up the module manager to deliver all messages whose exitTypes
            are returned by <module>.getExitTypes() to the module."""
        for t in module.getExitTypes():
            if (self.typeToModule.has_key(t) and
                self.typeToModule[t].getName() != module.getName()):
                LOG.warn("More than one module is enabled for type %x"%t)
            # The most recently enabled module wins.
            self.typeToModule[t] = module

        LOG.info("Module %s: enabled for types %s",
                 module.getName(),
                 map(hex, module.getExitTypes()))

        queueDir = os.path.join(self.queueRoot, module.getName())
        queue = module.createDeliveryQueue(queueDir)
        self.queues[module.getName()] = queue
        self.enabled[module.getName()] = 1

    def cleanQueues(self):
        """Remove trash messages from all internal queues."""
        for queue in self.queues.values():
            queue.cleanQueue()

    def disableModule(self, module):
        """Unmaps all the types for a module object, and forgets its queue
           and its enabled status."""
        LOG.info("Disabling module %s", module.getName())
        for t in module.getExitTypes():
            if (self.typeToModule.has_key(t) and
                self.typeToModule[t].getName() == module.getName()):
                del self.typeToModule[t]
        if self.queues.has_key(module.getName()):
            del self.queues[module.getName()]
        if self.enabled.has_key(module.getName()):
            del self.enabled[module.getName()]

    def queueMessage(self, message, tag, exitType, address):
        """Queue a message for delivery by whichever module is enabled for
           exitType.  Messages with no enabled module are logged and
           discarded."""
        # FFFF Support non-exit messages.
        mod = self.typeToModule.get(exitType, None)
        if mod is None:
            LOG.error("Unable to handle message with unknown type %s",
                      exitType)
            return
        queue = self.queues[mod.getName()]
        LOG.debug("Delivering message %r (type %04x) via module %s",
                  message[:8], exitType, mod.getName())
        try:
            payload = mixminion.BuildMessage.decodePayload(message, tag)
        except MixError:
            # Undecodable: pass the raw message along, tagged as corrupt.
            queue.queueDeliveryMessage((exitType, address, 'err'), message)
            return
        if payload is None:
            # encrypted message
            queue.queueDeliveryMessage((exitType, address, tag), message)
        else:
            # forward message
            queue.queueDeliveryMessage((exitType, address, None), payload)

    def sendReadyMessages(self):
        """Tell every module's queue to send whatever it has ready."""
        for queue in self.queues.values():
            queue.sendReadyMessages()

    def getServerInfoBlocks(self):
        """Return a list of the server-info blocks of all enabled modules."""
        return [ m.getServerInfoBlock() for m in self.modules
                 if self.enabled.get(m.getName(),0) ]
#----------------------------------------------------------------------
class DropModule(DeliveryModule):
    """Null-object pattern: a delivery module that discards every message
       handed to it.  It handles mixminion.Packet.DROP_TYPE messages."""

    def getName(self):
        """Return this module's name."""
        return "DROP"

    def getExitTypes(self):
        """Return the list of exit types this module handles."""
        return [ mixminion.Packet.DROP_TYPE ]

    def getConfigSyntax(self):
        """This module adds no configuration sections."""
        return { }

    def getServerInfoBlock(self):
        """This module advertises nothing in the server info."""
        return ""

    def configure(self, config, manager):
        """Enable this module unconditionally; 'config' is not consulted."""
        manager.enableModule(self)

    def createDeliveryQueue(self, directory):
        """Return a pass-through queue; 'directory' is not used."""
        return ImmediateDeliveryQueue(self)

    def processMessage(self, message, tag, exitType, exitInfo):
        """Discard the message and report success."""
        LOG.debug("Dropping padding message")
        return DELIVER_OK
#----------------------------------------------------------------------
class MBoxModule(DeliveryModule):
    """Implementation for MBOX delivery: sends messages, via SMTP, to
       addresses from a local file.  The file must have the format
          addr: smtpaddr
          addr: smtpaddr
           ...

       ('=' is accepted in place of ':'; blank lines and lines starting
       with '#' are ignored.)

       When we receive a message sent to 'addr', we deliver it to smtpaddr.
       """
    ##
    # Fields:
    #   addresses: a map from address to SMTP address
    #   server: the name of our SMTP server
    #   addressFile: the location of our address file
    #   returnAddress: the address we use in our 'From' line
    #   contact: the contact address we mention in our boilerplate
    #   nickname: our server nickname; for use in our boilerplate
    #   addr: our IP address, or "<Unknown IP>": for use in our boilerplate.
    def __init__(self):
        DeliveryModule.__init__(self)
        self.addresses = {}

    def getConfigSyntax(self):
        """Return the syntax of the [Delivery/MBOX] configuration section."""
        # FFFF There should be some way to say that fields are required
        # FFFF if the module is enabled.
        return { "Delivery/MBOX" :
                 { 'Enabled' : ('REQUIRE',  _parseBoolean, "no"),
                   'AddressFile' : ('ALLOW', None, None),
                   'ReturnAddress' : ('ALLOW', None, None),
                   'RemoveContact' : ('ALLOW', None, None),
                   'SMTPServer' : ('ALLOW', None, 'localhost') }
                 }

    def validateConfig(self, sections, entries, lines, contents):
        """Check the [Delivery/MBOX] section.  If the module is enabled,
           raise ConfigError when a needed field is missing or the address
           file does not exist; warn when an address looks malformed."""
        sec = sections['Delivery/MBOX']
        if not sec.get('Enabled'):
            return
        for field in ['AddressFile', 'ReturnAddress', 'RemoveContact',
                      'SMTPServer']:
            if not sec.get(field):
                raise ConfigError("Missing field %s in [Delivery/MBOX]"%field)
        if not os.path.exists(sec['AddressFile']):
            raise ConfigError("Address file %s seems not to exist."%
                              sec['AddressFile'])
        for field in ['ReturnAddress', 'RemoveContact']:
            if not isSMTPMailbox(sec[field]):
                LOG.warn("Value of %s (%s) doesn't look like an email address",
                         field, sec[field])

    def configure(self, config, moduleManager):
        """Read our settings and address file from 'config', then enable
           ourselves with moduleManager -- or disable ourselves if the
           configuration says we aren't enabled.  Raises ConfigError on a
           malformed address file."""
        if not config['Delivery/MBOX'].get("Enabled", 0):
            moduleManager.disableModule(self)
            return

        sec = config['Delivery/MBOX']
        self.server = sec['SMTPServer']
        self.addressFile = sec['AddressFile']
        self.returnAddress = sec['ReturnAddress']
        self.contact = sec['RemoveContact']
        # validate should have caught these.
        assert (self.server and self.addressFile and self.returnAddress
                and self.contact)

        self.nickname = config['Server']['Nickname']
        if not self.nickname:
            self.nickname = socket.gethostname()
        self.addr = config['Incoming/MMTP'].get('IP', "<Unknown IP>")

        # Parse the address file.
        self.addresses = {}
        f = open(self.addressFile)
        address_line_re = re.compile(r'\s*([^\s:=]+)\s*[:=]\s*(\S+)')
        try:
            lineno = 0
            while 1:
                line = f.readline()
                if not line:
                    break
                line = line.strip()
                lineno += 1
                if line == '' or line[0] == '#':
                    continue
                m = address_line_re.match(line)
                if not m:
                    raise ConfigError("Bad address on line %s of %s"%(
                        lineno,self.addressFile))
                self.addresses[m.group(1)] = m.group(2)
                LOG.trace("Mapping MBOX address %s -> %s", m.group(1),
                          m.group(2))
        finally:
            f.close()

        moduleManager.enableModule(self)

    def getServerInfoBlock(self):
        """Return the block we contribute to the server info."""
        return "[Delivery/MBOX]\nVersion: 0.1\n"

    def getName(self):
        """Return this module's name."""
        return "MBOX"

    def getExitTypes(self):
        """Return the list of exit types this module handles."""
        return [ mixminion.Packet.MBOX_TYPE ]

    def processMessage(self, message, tag, exitType, address):
        """Deliver 'message' by SMTP to the mailbox that the MBOX user in
           'address' maps to.  Returns DELIVER_FAIL_NORETRY for unknown
           users and for corrupt messages; otherwise returns whatever
           sendSMTPMessage returns."""
        # Determine that message's address;
        assert exitType == mixminion.Packet.MBOX_TYPE
        LOG.trace("Received MBOX message")
        info = mixminion.Packet.parseMBOXInfo(address)
        try:
            address = self.addresses[info.user]
        except KeyError:
            LOG.error("Unknown MBOX user %r", info.user)
            return DELIVER_FAIL_NORETRY

        # Escape the message if it isn't plaintext ascii
        msg = _escapeMessageForEmail(message, tag)
        if msg is None:
            # The message was corrupt (tag 'err').  Without this check we'd
            # mail out the literal text "None".
            LOG.error("Dropping corrupt MBOX message")
            return DELIVER_FAIL_NORETRY

        # Generate the boilerplate (FFFF Make this configurable)
        fields = { 'user': address,
                   'return': self.returnAddress,
                   'nickname': self.nickname,
                   'addr': self.addr,
                   'contact': self.contact,
                   'msg': msg }
        # NOTE: the empty line after 'Subject:' is required; it is what
        # separates the mail headers from the body.
        msg = """\
To: %(user)s
From: %(return)s
Subject: Anonymous Mixminion message

THIS IS AN ANONYMOUS MESSAGE. The mixminion server '%(nickname)s' at
%(addr)s has been configured to deliver messages to your address.
If you do not want to receive messages in the future, contact %(contact)s
and you will be removed.
%(msg)s""" % fields

        # Deliver the message
        return sendSMTPMessage(self.server, [address], self.returnAddress, msg)
#----------------------------------------------------------------------
class SMTPModule(DeliveryModule):
    """Stand-in for the real exit node implementation, which is yet to
       come.  For now, use MixmasterSMTPModule."""

    def __init__(self):
        DeliveryModule.__init__(self)

    def getName(self):
        """Return this module's name."""
        return "SMTP"

    def getExitTypes(self):
        """Return the list of exit types this module handles."""
        return [ mixminion.Packet.SMTP_TYPE ]

    def getServerInfoBlock(self):
        """Return the block we contribute to the server info."""
        return "[Delivery/SMTP]\nVersion: 0.1\n"
class MixmasterSMTPModule(SMTPModule):
    """Implements SMTP by relaying messages via Mixmaster nodes.  This
       is kind of unreliable and kludgey, but it does allow us to
       test mixminion by using Mixmaster nodes as exits."""
    # FFFF Mixmaster has tons of options.  Maybe we should use 'em...
    # FFFF ... or maybe we should deliberately ignore them, since
    # FFFF this is only a temporary workaround until enough people
    # FFFF are running SMTP exit nodes
    ## Fields:
    # server: The path (usually a single server) to use for outgoing messages.
    #    Multiple servers should be separated by commas.
    # subject: The subject line we use for outgoing messages
    # command: The Mixmaster binary.
    # options: Options to pass to the Mixmaster binary when queueing messages
    # tmpQueue: An auxiliary Queue used to hold files so we can pass them to
    #    Mixmaster.  (This should go away; we should use stdin instead.)

    def __init__(self):
        SMTPModule.__init__(self)

    def getConfigSyntax(self):
        """Return the syntax of the [Delivery/SMTP-Via-Mixmaster]
           configuration section."""
        return { "Delivery/SMTP-Via-Mixmaster" :
                 { 'Enabled' : ('REQUIRE', _parseBoolean, "no"),
                   'MixCommand' : ('REQUIRE', _parseCommand, None),
                   'Server' : ('REQUIRE', None, None),
                   'SubjectLine' : ('ALLOW', None,
                                    'Type-III Anonymous Message'),
                   }
                 }

    def validateConfig(self, sections, entries, lines, contents):
        # Currently, we accept any configuration options that the config allows
        pass

    def configure(self, config, manager):
        """Read our settings from 'config', then enable ourselves with
           'manager' -- or disable ourselves if we aren't enabled."""
        sec = config['Delivery/SMTP-Via-Mixmaster']
        if not sec.get("Enabled", 0):
            manager.disableModule(self)
            return
        cmd = sec['MixCommand']
        self.server = sec['Server']
        self.subject = sec['SubjectLine']
        # cmd[0] is used as the binary; cmd[1] as its sequence of options.
        self.command = cmd[0]
        self.options = tuple(cmd[1]) + ("-l", self.server,
                                        "-s", self.subject)
        manager.enableModule(self)

    def getName(self):
        """Return this module's name."""
        return "SMTP_MIX2"

    def createDeliveryQueue(self, queueDir):
        """Create our temporary file queue (beside queueDir) and return
           the delivery queue that lives in queueDir."""
        # We create a temporary queue so we can hold files there for a little
        # while before passing their names to mixmaster.
        self.tmpQueue = mixminion.server.Queue.Queue(queueDir+"_tmp", 1, 1)
        self.tmpQueue.removeAll()
        return _MixmasterSMTPModuleDeliveryQueue(self, queueDir)

    def processMessage(self, message, tag, exitType, smtpAddress):
        """Insert a message into the Mixmaster queue.  Returns
           DELIVER_FAIL_NORETRY for a corrupt message, and DELIVER_OK once
           the Mixmaster binary has been run on the message."""
        assert exitType == mixminion.Packet.SMTP_TYPE
        # parseSMTPInfo will raise a parse error if the mailbox is invalid.
        info = mixminion.Packet.parseSMTPInfo(smtpAddress)

        msg = _escapeMessageForEmail(message, tag)
        if msg is None:
            # Corrupt message (tag 'err'): there is nothing to hand to
            # Mixmaster, and retrying won't help.
            LOG.error("Dropping corrupt SMTP message")
            return DELIVER_FAIL_NORETRY
        handle = self.tmpQueue.queueMessage(msg)
        try:
            cmd = self.command
            opts = self.options + ("-t", info.email,
                                   self.tmpQueue.getMessagePath(handle))
            code = os.spawnl(os.P_WAIT, cmd, cmd, *opts)
            LOG.debug("Queued Mixmaster message: exit code %s", code)
        finally:
            # Never leave the plaintext temporary file behind, even if we
            # couldn't run Mixmaster.
            self.tmpQueue.removeMessage(handle)
        return DELIVER_OK

    def flushMixmasterPool(self):
        """Send all pending messages from the Mixmaster queue.  This
           should be called after invocations of processMessage."""
        cmd = self.command
        LOG.debug("Flushing Mixmaster pool")
        # Don't wait for Mixmaster to finish.
        os.spawnl(os.P_NOWAIT, cmd, cmd, "-S")
class _MixmasterSMTPModuleDeliveryQueue(SimpleModuleDeliveryQueue):
    """Delivery queue for MixmasterSMTPModule.  Behaves like
       SimpleModuleDeliveryQueue, but flushes the Mixmaster pool once a
       batch of messages has been handed to Mixmaster."""
    def _deliverMessages(self, msgList):
        """Deliver msgList as the parent class does, then ask the module
           to flush its Mixmaster pool."""
        SimpleModuleDeliveryQueue._deliverMessages(self, msgList)
        self.module.flushMixmasterPool()
#----------------------------------------------------------------------
def sendSMTPMessage(server, toList, fromAddr, message):
    """Send a single SMTP message.  The message will be delivered to
       toList, and seem to originate from fromAddr.  We use 'server' as an
       MTA.

       Returns DELIVER_OK on success, or DELIVER_FAIL_RETRY if the MTA
       couldn't be reached or didn't accept the message."""
    # FFFF This implementation can stall badly if we don't have a fast
    # FFFF local MTA.
    LOG.trace("Sending message via SMTP host %s to %s", server, toList)
    # Network trouble is transient just as an SMTP refusal is, so both map
    # to DELIVER_FAIL_RETRY.  (This relies on the 'socket' and 'sys'
    # modules, which this file already uses elsewhere.)
    try:
        con = smtplib.SMTP(server)
    except (smtplib.SMTPException, socket.error):
        LOG.warn("Unsuccessful smtp: "+str(sys.exc_info()[1]))
        return DELIVER_FAIL_RETRY
    try:
        try:
            con.sendmail(fromAddr, toList, message)
            res = DELIVER_OK
        except (smtplib.SMTPException, socket.error):
            LOG.warn("Unsuccessful smtp: "+str(sys.exc_info()[1]))
            res = DELIVER_FAIL_RETRY
    finally:
        # Always release the connection, whatever happened above.
        try:
            con.quit()
        except (smtplib.SMTPException, socket.error):
            # The server may already have hung up on us; that's fine.
            pass
        con.close()
    return res
#----------------------------------------------------------------------
def _escapeMessageForEmail(msg, tag):
    """Helper function: prepare a message for inclusion in an email.  The
       message is escaped (via _escapeMessage) if it is not plaintext
       ascii, and wrapped in standard BEGINS/ENDS boilerplate.  Encrypted
       messages additionally get a disclaimer and their decoding handle.

          msg -- A (possibly decoded) message
          tag -- One of: a 20-byte decoding tag [if the message is encrypted
                            or a reply]
                         None [if the message is in plaintext]
                         'err' [if the message was invalid.]

       Returns None on an invalid message."""
    escaped = _escapeMessage(msg, tag, text=1)
    if escaped is None:
        return None
    code, body, handle = escaped

    disclaimer = ""
    if code == 'ENC':
        disclaimer = (
            "This message is not in plaintext. It's either 1) a reply;"
            " 2) a forward\n"
            "message encrypted to you; or 3) junk.\n\n")

    handleLine = ""
    if handle is not None:
        handleLine = "Decoding handle: "+handle+"\n"

    return "".join([disclaimer,
                    "============ ANONYMOUS MESSAGE BEGINS\n",
                    handleLine,
                    body,
                    "============ ANONYMOUS MESSAGE ENDS\n"])
def _escapeMessage(message, tag, text=0):
"""Helper: given a decoded message (and possibly its tag), determine
whether the message is a text plaintext message (code='TXT'), a
binary plaintext message (code 'BIN'), or an encrypted message/reply
(code='ENC'). If requested, non-TXT messages are base-64 encoded.
Returns: (code, message, tag (for ENC) or None (for BIN, TXT).
Returns None if the message is invalid.
message -- A (possibly decoded) message
tag -- One of: a 20-byte decoding tag [if the message is encrypted
or a reply]
None [if the message is in plaintext]
'err' [if the message was invalid.]
text -- flag: if true, non-TXT messages must be base64-encoded.
"""
if tag == 'err':
return None
elif tag is not None:
code = "ENC"
else:
assert tag is None
if isPrintingAscii(message, allowISO=1):
code = "TXT"
else:
code = "BIN"
if text and (code != "TXT") :
message = base64.encodestring(message)
if text and tag:
tag = base64.encodestring(tag).strip()
return code, message, tag
--- NEW FILE: PacketHandler.py ---
# Copyright 2002 Nick Mathewson. See LICENSE for licensing information.
# $Id: PacketHandler.py,v 1.1 2002/12/11 06:58:55 nickm Exp $
"""mixminion.PacketHandler: Code to process mixminion packets on a server"""
import mixminion.Crypto as Crypto
import mixminion.Packet as Packet
import mixminion.Common as Common
__all__ = [ 'PacketHandler', 'ContentError' ]
class ContentError(Common.MixError):
    """Exception raised when a packet is malformatted or unacceptable."""
class PacketHandler:
    """Class to handle processing packets.  Given an incoming packet,
       it removes one layer of encryption, does all necessary integrity
       checks, swaps headers if necessary, re-pads, and decides whether
       to drop the message, relay the message, or send the message to
       an exit handler."""
    ## Fields:
    # privatekey: sequence of RSA private keys that we accept
    # hashlog: sequence of HashLog objects corresponding to the keys.
    def __init__(self, privatekey, hashlog):
        """Constructs a new packet handler, given a private key object for
           header encryption, and a hashlog object to prevent replays.

           A sequence of private keys may be provided, if you'd like the
           server to accept messages encrypted with any of them.  Beware,
           though: PK decryption is expensive.  Also, a hashlog must be
           provided for each private key.
        """
        try:
            # Check whether we have a key or a sequence of keys.
            _ = privatekey[0]
            assert len(hashlog) == len(privatekey)

            self.privatekey = privatekey
            self.hashlog = hashlog
        except TypeError:
            # privatekey is not subscriptable; we must have only one key
            # (and therefore only one hashlog).
            self.privatekey = (privatekey, )
            self.hashlog = (hashlog, )

    def syncLogs(self):
        """Sync all this PacketHandler's hashlogs."""
        for h in self.hashlog:
            h.sync()

    def close(self):
        """Close all this PacketHandler's hashlogs."""
        for h in self.hashlog:
            h.close()

    def processMessage(self, msg):
        """Given a 32K mixminion message, processes it completely.

           Returns one of:
                    None [if the message should be dropped.]
                    ("EXIT",
                       (exit_type, exit_info, application_key,
                        tag, payload)) [if this is the exit node]
                    ("QUEUE", (ipv4info, message_out))
                        [if this is a forwarding node]

           May raise CryptoError, ParseError, or ContentError if the packet
           is malformatted, misencrypted, unparseable, repeated, or otherwise
           unhandleable.

           WARNING: This implementation does nothing to prevent timing
           attacks: dropped messages, messages with bad digests, replayed
           messages, and exit messages are all processed faster than
           forwarded messages.  You must prevent timing attacks elsewhere."""

        # Break into headers and payload
        msg = Packet.parseMessage(msg)
        header1 = Packet.parseHeader(msg.header1)

        # Try to decrypt the first subheader.  Try each private key in
        # order.  Only fail if all private keys fail.
        subh = None
        e = None
        # NOTE: after a successful 'break', 'hashlog' stays bound to the
        # log that goes with the key that worked; the replay check below
        # relies on that.
        for pk, hashlog in zip(self.privatekey, self.hashlog):
            try:
                subh = Crypto.pk_decrypt(header1[0], pk)
                break
            except Crypto.CryptoError, err:
                e = err
        if not subh:
            # Nobody managed to get us the first subheader.  Raise the
            # most-recently-received error.
            raise e

        subh = Packet.parseSubheader(subh) #may raise ParseError

        # Check the version: can we read it?
        if subh.major != Packet.MAJOR_NO or subh.minor != Packet.MINOR_NO:
            raise ContentError("Invalid protocol version")

        # Check the digest of all of header1 but the first subheader.
        if subh.digest != Crypto.sha1(header1[1:]):
            raise ContentError("Invalid digest")

        # Get ready to generate message keys.
        keys = Crypto.Keyset(subh.secret)

        # Replay prevention: refuse any message whose replay hash we have
        # already logged; otherwise log it now.
        replayhash = keys.get(Crypto.REPLAY_PREVENTION_MODE, 20)
        if hashlog.seenHash(replayhash):
            raise ContentError("Duplicate message detected.")
        else:
            hashlog.logHash(replayhash)

        # If we're meant to drop, drop now.  (This happens only after the
        # replay hash has been logged.)
        rt = subh.routingtype
        if rt == Packet.DROP_TYPE:
            return None

        # Prepare the key to decrypt the header in counter mode.  We'll be
        # using this more than once.
        header_sec_key = Crypto.aes_key(keys.get(Crypto.HEADER_SECRET_MODE))

        # If the subheader says that we have extra blocks of routing info,
        # decrypt and parse them now.
        if subh.isExtended():
            nExtra = subh.getNExtraBlocks()
            if (rt < Packet.MIN_EXIT_TYPE) or (nExtra > 15):
                # None of the native methods allow multiple blocks; no
                # size can be longer than the number of bytes in the rest
                # of the header.
                raise ContentError("Impossibly long routing info length")

            extra = Crypto.ctr_crypt(header1[1:1+nExtra], header_sec_key)
            subh.appendExtraBlocks(extra)
            remainingHeader = header1[1+nExtra:]
        else:
            nExtra = 0
            remainingHeader = header1[1:]

        # Decrypt the payload.
        payload = Crypto.lioness_decrypt(msg.payload,
                              keys.getLionessKeys(Crypto.PAYLOAD_ENCRYPT_MODE))

        # If we're an exit node, there's no need to process the headers
        # further.
        if rt >= Packet.MIN_EXIT_TYPE:
            return ("EXIT",
                    (rt, subh.getExitAddress(),
                     keys.get(Crypto.APPLICATION_KEY_MODE),
                     subh.getTag(),
                     payload))

        # If we're not an exit node, make sure that we recognize our
        # routing type.
        if rt not in (Packet.SWAP_FWD_TYPE, Packet.FWD_TYPE):
            raise ContentError("Unrecognized Mixminion routing type")

        # Pad the rest of header 1 back out to HEADER_LEN, using bytes
        # generated from this message's keys.
        remainingHeader = remainingHeader +\
                          Crypto.prng(keys.get(Crypto.PRNG_MODE),
                                      Packet.HEADER_LEN-len(remainingHeader))

        # Decrypt the rest of header 1, encrypting the padding.
        # (NOTE(review): nExtra*128 presumably skips the counter-mode
        # stream already consumed by the extra blocks -- confirm against
        # Crypto.ctr_crypt.)
        header1 = Crypto.ctr_crypt(remainingHeader, header_sec_key, nExtra*128)

        # Decrypt header 2.
        header2 = Crypto.lioness_decrypt(msg.header2,
                           keys.getLionessKeys(Crypto.HEADER_ENCRYPT_MODE))

        # If we're the swap node, (1) decrypt the payload with a hash of
        # header2... (2) decrypt header2 with a hash of the payload...
        # (3) and swap the headers.
        if rt == Packet.SWAP_FWD_TYPE:
            hkey = Crypto.lioness_keys_from_header(header2)
            payload = Crypto.lioness_decrypt(payload, hkey)

            hkey = Crypto.lioness_keys_from_payload(payload)
            header2 = Crypto.lioness_decrypt(header2, hkey)

            header1, header2 = header2, header1

        # Build the address object for the next hop
        address = Packet.parseIPV4Info(subh.routinginfo)

        # Construct the message for the next hop.
        msg = Packet.Message(header1, header2, payload).pack()

        return ("QUEUE", (address, msg))
--- NEW FILE: Queue.py ---
# Copyright 2002 Nick Mathewson. See LICENSE for licensing information.
# $Id: Queue.py,v 1.1 2002/12/11 06:58:55 nickm Exp $
"""mixminion.server.Queue
Facility for fairly secure, directory-based, unordered queues.
"""
import os
import base64
import time
import stat
import cPickle
from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
createPrivateDir
from mixminion.Crypto import AESCounterPRNG
__all__ = [ 'Queue', 'DeliveryQueue', 'TimedMixQueue', 'CottrellMixQueue',
'BinomialCottrellMixQueue' ]
# Flags to pass to open(2) for creating a new file, and dying if it already
# exists.
_NEW_MESSAGE_FLAGS = os.O_WRONLY+os.O_CREAT+os.O_EXCL
# On windows or mac, binary != text.  (Where the os module has no O_BINARY,
# this adds 0.)
_NEW_MESSAGE_FLAGS += getattr(os, 'O_BINARY', 0)
# Any inp_* files older than INPUT_TIMEOUT seconds old are assumed to be
# trash.
INPUT_TIMEOUT = 6000
# If we've been cleaning for more than CLEAN_TIMEOUT seconds, assume the
# old clean is dead.
CLEAN_TIMEOUT = 120
class Queue:
    """A Queue is an unordered collection of files with secure insert, move,
       and delete operations.

       Implementation: a queue is a directory of 'messages'.  Each
       filename in the directory has a name in one of the following
       formats:
             rmv_HANDLE  (A message waiting to be deleted)
             msg_HANDLE  (A message waiting in the queue.)
             inp_HANDLE  (An incomplete message being created.)
       (Where HANDLE is a randomly chosen 12-character selection from the
       characters 'A-Za-z0-9+-'.  [Collision probability is negligible.])
       """
       # How negligible?  A back-of-the-envelope approximation: The chance
       # of a collision reaches .1% when you have 3e9 messages in a single
       # queue.  If Alice somehow manages to accumulate a 96 gigabyte
       # backlog, we'll have bigger problems than name collision... such
       # as the fact that most Unices behave badly when confronted with
       # 3 billion files in the same directory... or the fact that,
       # at today's processor speeds, it will take Alice 3 or 4
       # CPU-years to clear her backlog.

    # Fields:   rng--a random number generator for creating new messages
    #                and getting a random slice of the queue.
    #           dir--the location of the queue.
    #           n_entries: the number of complete messages in the queue.
    #                 <0 if we haven't counted yet.
    def __init__(self, location, create=0, scrub=0):
        """Creates a queue object for a given directory, 'location'.  If
           'create' is true, creates the directory if necessary.  If 'scrub'
           is true, removes any incomplete or invalidated messages from the
           Queue.

           Raises MixFatalError if 'location' exists but is not a
           directory."""

        secureDelete([]) # Make sure secureDelete is configured. HACK!

        self.rng = AESCounterPRNG()
        self.dir = location
        # Count messages on first time through.  This must be set before we
        # scrub: cleaning may rename files, and the renaming helper reads
        # n_entries.
        self.n_entries = -1

        if not os.path.isabs(location):
            LOG.warn("Queue path %s isn't absolute.", location)

        if os.path.exists(location) and not os.path.isdir(location):
            raise MixFatalError("%s is not a directory" % location)

        createPrivateDir(location, nocreate=(not create))

        if scrub:
            self.cleanQueue()

    def queueMessage(self, contents):
        """Creates a new message in the queue whose contents are 'contents',
           and returns a handle to that message."""
        f, handle = self.openNewMessage()
        f.write(contents)
        self.finishMessage(f, handle)
        return handle

    def queueObject(self, object):
        """Queue an object using cPickle, and return a handle to that
           object."""
        f, handle = self.openNewMessage()
        cPickle.dump(object, f, 1)
        self.finishMessage(f, handle)
        return handle

    def count(self, recount=0):
        """Returns the number of complete messages in the queue.  The count
           is cached; pass a true 'recount' to force a fresh look at the
           directory."""
        if self.n_entries >= 0 and not recount:
            return self.n_entries
        else:
            res = 0
            for fn in os.listdir(self.dir):
                if fn.startswith("msg_"):
                    res += 1
            self.n_entries = res
            return res

    def pickRandom(self, count=None):
        """Returns a list of 'count' handles to messages in this queue.
           The messages are chosen randomly, and returned in a random order.

           If there are fewer than 'count' messages in the queue, all the
           messages will be returned."""
        handles = [ fn[4:] for fn in os.listdir(self.dir)
                           if fn.startswith("msg_") ]

        return self.rng.shuffle(handles, count)

    def getAllMessages(self):
        """Returns handles for all messages currently in the queue.
           Note: this ordering is not guaranteed to be random"""
        return [fn[4:] for fn in os.listdir(self.dir) if fn.startswith("msg_")]

    def removeMessage(self, handle):
        """Given a handle, removes the corresponding message from the queue.
           (The file is only marked for removal; cleanQueue deletes it.)"""
        self.__changeState(handle, "msg", "rmv")

    def removeAll(self):
        """Removes all messages, complete or incomplete, from this queue."""
        for m in os.listdir(self.dir):
            if m[:4] in ('inp_', 'msg_'):
                self.__changeState(m[4:], m[:3], "rmv")
        self.n_entries = 0
        self.cleanQueue()

    def moveMessage(self, handle, queue):
        """Given a handle and a queue, moves the corresponding message from
           this queue to the queue provided.  Returns a new handle for
           the message in the destination queue."""
        # Since we're switching handle, we don't want to just rename;
        # We really want to copy and delete the old file.
        newHandle = queue.queueMessage(self.messageContents(handle))
        self.removeMessage(handle)
        return newHandle

    def getMessagePath(self, handle):
        """Given a handle for an existing message, return the name of the
           file that contains that message."""
        return os.path.join(self.dir, "msg_"+handle)

    def openMessage(self, handle):
        """Given a handle for an existing message, returns a file object
           open to read that message."""
        return open(os.path.join(self.dir, "msg_"+handle), 'rb')

    def messageContents(self, handle):
        """Given a message handle, returns the contents of the corresponding
           message."""
        f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
        try:
            return f.read()
        finally:
            f.close()

    def getObject(self, handle):
        """Given a message handle, read and unpickle the contents of the
           corresponding message."""
        # NOTE: unpickling is only safe on data we wrote ourselves; queue
        # directories must never be writable by anybody else.
        f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
        try:
            return cPickle.load(f)
        finally:
            f.close()

    def openNewMessage(self):
        """Returns (file, handle) tuple to create a new message.  Once
           you're done writing, you must call finishMessage to
           commit your changes, or abortMessage to reject them."""
        handle = self.__newHandle()
        fname = os.path.join(self.dir, "inp_"+handle)
        # Mode 0600: readable and writable by the owner only.
        fd = os.open(fname, _NEW_MESSAGE_FLAGS, stat.S_IRUSR|stat.S_IWUSR)
        return os.fdopen(fd, 'wb'), handle

    def finishMessage(self, f, handle):
        """Given a file and a corresponding handle, closes the file and
           commits the corresponding message."""
        f.close()
        self.__changeState(handle, "inp", "msg")

    def abortMessage(self, f, handle):
        """Given a file and a corresponding handle, closes the file and
           rejects the corresponding message."""
        f.close()
        self.__changeState(handle, "inp", "rmv")

    def cleanQueue(self):
        """Removes all timed-out or trash messages from the queue.

           Returns 1 if a clean is already in progress; otherwise
           returns 0.
        """
        now = time.time()
        cleanFile = os.path.join(self.dir,".cleaning")

        cleaning = 1
        while cleaning:
            try:
                # Try to get the .cleaning lock file.  If we can create it,
                # we're the only cleaner around.
                fd = os.open(cleanFile, os.O_WRONLY+os.O_CREAT+os.O_EXCL,
                             stat.S_IRUSR|stat.S_IWUSR)
                os.write(fd, str(now))
                os.close(fd)
                cleaning = 0
            except OSError:
                try:
                    # If we can't create the file, see if it's too old.  If it
                    # is too old, delete it and try again.  If it isn't, there
                    # may be a live clean in progress.
                    s = os.stat(cleanFile)
                    if now - s[stat.ST_MTIME] > CLEAN_TIMEOUT:
                        os.unlink(cleanFile)
                    else:
                        return 1
                except OSError:
                    # If the 'stat' or 'unlink' calls above fail, then
                    # .cleaning must not exist, or must not be readable
                    # by us.
                    if os.path.exists(cleanFile):
                        # In the latter case, bail out.
                        return 1

        rmv = []
        allowedTime = int(time.time()) - INPUT_TIMEOUT
        for m in os.listdir(self.dir):
            if m.startswith("rmv_"):
                rmv.append(os.path.join(self.dir, m))
            elif m.startswith("inp_"):
                # Look at the file inside the queue directory, not at a
                # file of the same name in the current working directory.
                try:
                    s = os.stat(os.path.join(self.dir, m))
                    expired = s[stat.ST_MTIME] < allowedTime
                except OSError:
                    # The writer finished or aborted the message since we
                    # listed the directory; leave it alone.
                    expired = 0
                if expired:
                    self.__changeState(m[4:], "inp", "rmv")
                    # The file now lives under its rmv_ name; that's the
                    # path we need to delete.
                    rmv.append(os.path.join(self.dir, "rmv_"+m[4:]))
        _secureDelete_bg(rmv, cleanFile)
        return 0

    def __changeState(self, handle, s1, s2):
        """Helper method: changes the state of message 'handle' from 's1'
           to 's2', and changes the internal count."""
        os.rename(os.path.join(self.dir, s1+"_"+handle),
                  os.path.join(self.dir, s2+"_"+handle))
        if self.n_entries < 0:
            # We haven't counted yet; there is nothing to keep up to date.
            return
        if s1 == 'msg' and s2 != 'msg':
            self.n_entries -= 1
        elif s1 != 'msg' and s2 == 'msg':
            self.n_entries += 1

    def __newHandle(self):
        """Helper method: creates a new random handle."""
        # 9 random bytes encode to exactly 12 base64 characters; '/' is
        # replaced because it can't appear in a filename.
        junk = self.rng.getBytes(9)
        return base64.encodestring(junk).strip().replace("/","-")
class DeliveryQueue(Queue):
    """A DeliveryQueue implements a queue that greedily sends messages
       to outgoing streams that occasionally fail.  Messages in a
       DeliveryQueue are no longer unstructured text, but rather
       tuples of: (n_retries, addressing info, msg).

       This class is abstract. Implementors of this class should
       subclass it to add a _deliverMessages method.  Multiple
       invocations of this method may be active at a given time.  Upon
       success or failure, this method should cause deliverySucceeded
       or deliveryFailed to be called as appropriate.

       Users of this class will probably only want to call the
       queueDeliveryMessage and sendReadyMessages methods.

       This class caches information about the directory state; it
       won't play nice if multiple instances are looking at the same
       directory.
    """
    ###
    # Fields:
    #    sendable -- A list of handles for all messages
    #           that we're not currently sending.
    #    pending -- Dict from handle->1, for all messages that we're
    #           currently sending.

    def __init__(self, location):
        """Create (if necessary) and scrub the queue directory at
           'location', then load the list of sendable messages."""
        Queue.__init__(self, location, create=1, scrub=1)
        self._rescan()

    def _rescan(self):
        """Rebuild the internal state of this queue from the underlying
           directory: nothing is pending, everything on disk is sendable."""
        self.pending = {}
        self.sendable = self.getAllMessages()

    def queueMessage(self, msg):
        """Always raises MixError: raw messages can't be queued here; use
           queueDeliveryMessage instead."""
        raise MixError("Tried to call DeliveryQueue.queueMessage.")

    def queueDeliveryMessage(self, addr, msg, retry=0):
        """Schedule a message for delivery, and return its handle.
             addr -- An object to indicate the message's destination
             msg -- the message itself
             retry -- how many times so far have we tried to send?"""
        payload = (retry, addr, msg)
        handle = self.queueObject(payload)
        self.sendable.append(handle)
        return handle

    def get(self,handle):
        """Returns a (n_retries, addr, msg) payload for a given
           message handle."""
        return self.getObject(handle)

    def sendReadyMessages(self):
        """Sends all messages which are not already being sent."""
        # Take ownership of the current sendable list and start a new one.
        ready, self.sendable = self.sendable, []
        batch = []
        for handle in ready:
            n_retries, addr, msg = self.getObject(handle)
            batch.append((handle, addr, msg, n_retries))
            self.pending[handle] = 1
        if not batch:
            return
        self._deliverMessages(batch)

    def _deliverMessages(self, msgList):
        """Abstract method; Invoked with a list of
           (handle, addr, message, n_retries) tuples every time we have
           a batch of messages to send.

           For every handle in the list, deliverySucceeded or
           deliveryFailed should eventually be called, or the message
           will sit in the queue indefinitely, without being retried."""
        # We could implement this as a single _deliverMessage(h,addr,m,n)
        # method, but that wouldn't allow implementations to batch
        # messages being sent to the same address.
        raise NotImplementedError("_deliverMessages")

    def deliverySucceeded(self, handle):
        """Removes a message from the outgoing queue.  This method
           should be invoked after the corresponding message has been
           successfully delivered.
        """
        self.removeMessage(handle)
        del self.pending[handle]

    def deliveryFailed(self, handle, retriable=0):
        """Removes a message from the outgoing queue, or requeues it
           for delivery at a later time.  This method should be
           invoked after an attempt to deliver the corresponding
           message has failed.

             retriable -- true if the delivery should be attempted again."""
        del self.pending[handle]
        if retriable:
            # Queue the new one before removing the old one, for
            # crash-proofness
            n_retries, addr, msg = self.getObject(handle)
            # FFFF This test makes us never retry past the 10th attempt.
            # FFFF That's wrong; we should be smarter.
            if n_retries <= 10:
                self.queueDeliveryMessage(addr, msg, n_retries+1)
        self.removeMessage(handle)
class TimedMixQueue(Queue):
    """A TimedMixQueue holds a group of files, and returns some of them
       as requested, according to a mixing algorithm that sends a batch
       of messages every N seconds."""
    # FFFF : interval is unused.
    ## Fields:
    #   interval: scanning interval, in seconds.

    def __init__(self, location, interval=600):
        """Create a TimedMixQueue that sends its entire batch of messages
           every 'interval' seconds."""
        Queue.__init__(self, location, create=1, scrub=1)
        self.interval = interval

    def getBatch(self):
        """Return handles for all messages that the pool is currently
           ready to send in the next batch."""
        batch = self.pickRandom()
        return batch

    def getInterval(self):
        """Return the scanning interval, in seconds."""
        return self.interval
class CottrellMixQueue(TimedMixQueue):
    """A CottrellMixQueue holds a group of files, and returns some of them
       as requested, according to the Cottrell (timed dynamic-pool) mixing
       algorithm from Mixmaster."""
    # FFFF : interval is unused.
    ## Fields:
    #   interval: scanning interval, in seconds.
    #   minPool: Minimum number of messages to keep in pool.
    #   minSend: Minimum number of messages above minPool before we consider
    #        sending.
    #   sendRate: Largest fraction of the pool to send at a time.

    def __init__(self, location, interval=600, minPool=6, minSend=1,
                 sendRate=.7):
        """Create a new queue that yields a batch of message every 'interval'
           seconds, always keeps <minPool> messages in the pool, never sends
           unless it has <minPool>+<minSend> messages, and never sends more
           than <sendRate> * the current pool size.

           If 'minSend'==1, this is a real Cottrell (type-II) mix pool.
           Otherwise, this is a generic 'timed dynamic-pool' mix pool.  (Note
           that there is still a matter of some controversy whether it ever
           makes sense to set minSend != 1.)
           """
        # Note that there was a bit of confusion here: earlier versions
        # implemented an algorithm called "mixmaster" that wasn't actually the
        # mixmaster algorithm.  I picked up the other algorithm from an early
        # draft of Roger, Paul, and Andrei's 'Batching Taxonomy' paper (since
        # corrected); they seem to have gotten it from Anja Jerichow's
        # Phd. thesis ("Generalisation and Security Improvement of
        # Mix-mediated Anonymous Communication") of 2000.
        #
        # *THIS* is the algorithm that the current 'Batching Taxonomy' paper
        # says that Cottrell says is the real thing.
        TimedMixQueue.__init__(self, location, interval)
        self.minPool = minPool
        self.minSend = minSend
        self.sendRate = sendRate

    def _getBatchSize(self):
        "Helper method: returns the number of messages to send."
        pool = self.count()
        # Below the threshold, nothing leaves the pool.
        if pool < (self.minPool + self.minSend):
            return 0
        # Never dip below minPool, and never exceed sendRate of the pool
        # (but send at least one message once we're over the threshold).
        surplus = pool - self.minPool
        rateLimit = max(1, int(pool * self.sendRate))
        return min(surplus, rateLimit)

    def getBatch(self):
        "Returns a list of handles for the next batch of messages to send."
        n = self._getBatchSize()
        if not n:
            return []
        return self.pickRandom(n)
class BinomialCottrellMixQueue(CottrellMixQueue):
    """Same algorithm as CottrellMixQueue, but instead of sending N messages
       from the pool of size P, sends each message with probability N/P."""

    def getBatch(self):
        """Return a shuffled list of handles, each handle in the pool
           having been selected independently with probability N/P."""
        n = self._getBatchSize()
        if n == 0:
            return []
        msgProbability = n / float(self.count())
        chosen = []
        for h in self.getAllMessages():
            # One random draw per message, in listing order.
            if self.rng.getFloat() < msgProbability:
                chosen.append(h)
        return self.rng.shuffle(chosen)
def _secureDelete_bg(files, cleanFile):
    """Helper method: securely delete 'files' in a forked child process,
       removing 'cleanFile' once the deletion is done.

       In the parent process, returns the pid of the child immediately.
       The child process never returns from this function; it exits with
       status 0 when it has finished."""
    pid = os.fork()
    if pid:
        # Parent: let the child do the work.
        return pid

    # Now we're in the child process.
    try:
        secureDelete(files, blocking=1)
    except OSError:
        # This is sometimes thrown when shred finishes before waitpid.
        pass

    # Best effort: the lock file may already be gone.
    try:
        os.unlink(cleanFile)
    except OSError:
        pass

    os._exit(0)
    return None # Never reached.
--- NEW FILE: ServerMain.py ---
# Copyright 2002 Nick Mathewson. See LICENSE for licensing information.
# $Id: ServerMain.py,v 1.1 2002/12/11 06:58:55 nickm Exp $
"""mixminion.server.ServerMain
The main loop and related functionality for a Mixminion server.
See the "MixminionServer" class for more information about how it
all works. """
#FFFF We need support for encrypting private keys.
import os
import getopt
import sys
import time
import bisect
import mixminion._minionlib
import mixminion.Crypto
from mixminion.ServerInfo import ServerKeyset, ServerInfo, \
generateServerDescriptorAndKeys
from mixminion.Common import LOG, MixFatalError, MixError, secureDelete, \
createPrivateDir, previousMidnight, ceilDiv, formatDate, formatTime
import mixminion.server.Queue
import mixminion.server.MMTPServer
import mixminion.server.Modules
import mixminion.server.HashLog
import mixminion.server.PacketHandler
class ServerKeyring:
    """A ServerKeyring remembers current and future keys, descriptors, and
       hash logs for a mixminion server.

       FFFF We need a way to generate keys as needed, not just a month's
       FFFF worth of keys up front.
       """
    ## Fields:
    # config: the configuration object this keyring was built from
    # homeDir: server home directory
    # keyDir: server key directory
    # hashDir: directory holding the hash logs
    # keySloppiness: fudge-factor: how forgiving are we about key liveness?
    # keyIntervals: list of (start, end, keyset Name)
    # liveKey: (start, end, keyset name) for the current key, or None.
    # nextKeyRotation: time_t when the current key expires.
    # keyRange: tuple of (firstKey, lastKey) to represent which key names
    #      have keys on disk.

    ## Directory layout:
    #    MINION_HOME/work/queues/incoming/ [Queue of received,unprocessed pkts]
    #                            mix/ [Mix pool]
    #                            outgoing/ [Messages for mmtp delivery]
    #                            deliver/mbox/ []
    #                     tls/dhparam [Diffie-Hellman parameters]
    #                     hashlogs/hash_1*  [HashLogs of packet hashes
    #                              hash_2*    corresponding to key sets]
    #                                ...
    #                log [Messages from the server]
    #                keys/identity.key [Long-lived identity PK]
    #                     key_1/ServerDesc [Server descriptor]
    #                           mix.key [packet key]
    #                           mmtp.key [mmtp key]
    #                           mmtp.cert [mmtp key x509 cert]
    #                     key_2/...
    #                conf/miniond.conf [configuration file]
    #                       ....

    # FFFF Support to put keys/queues in separate directories.

    def __init__(self, config):
        "Create a ServerKeyring from a config object"
        self.configure(config)

    def configure(self, config):
        "Set up a ServerKeyring from a config object"
        self.config = config
        self.homeDir = config['Server']['Homedir']
        self.keyDir = os.path.join(self.homeDir, 'keys')
        self.hashDir = os.path.join(self.homeDir, 'work', 'hashlogs')
        self.keySloppiness = config['Server']['PublicKeySloppiness'][2]
        self.checkKeys()

    def checkKeys(self):
        """Internal method: read information about all this server's
           currently-prepared keys from disk.

           Sets keyIntervals and keyRange, warns about overlaps and gaps
           in the key schedule, and recomputes liveKey/nextKeyRotation."""
        self.keyIntervals = []
        firstKey = sys.maxint
        lastKey = 0

        LOG.debug("Scanning server keystore at %s", self.keyDir)

        if not os.path.exists(self.keyDir):
            LOG.info("Creating server keystore at %s", self.keyDir)
            createPrivateDir(self.keyDir)

        # Iterate over the entries in HOME/keys
        for dirname in os.listdir(self.keyDir):
            # Skip any that aren't directories named "key_INT"
            if not os.path.isdir(os.path.join(self.keyDir,dirname)):
                continue
            if not dirname.startswith('key_'):
                LOG.warn("Unexpected directory %s under %s",
                         dirname, self.keyDir)
                continue
            keysetname = dirname[4:]
            try:
                setNum = int(keysetname)
                # keep track of the first and last used key number
                if setNum < firstKey: firstKey = setNum
                if setNum > lastKey: lastKey = setNum
            except ValueError:
                LOG.warn("Unexpected directory %s under %s",
                         dirname, self.keyDir)
                continue

            # Find the server descriptor...
            d = os.path.join(self.keyDir, dirname)
            si = os.path.join(d, "ServerDesc")
            if os.path.exists(si):
                inf = ServerInfo(fname=si, assumeValid=1)
                # And find out when it's valid.
                t1 = inf['Server']['Valid-After']
                t2 = inf['Server']['Valid-Until']
                self.keyIntervals.append( (t1, t2, keysetname) )
                LOG.debug("Found key %s (valid from %s to %s)",
                          dirname, formatDate(t1), formatDate(t2))
            else:
                LOG.warn("No server descriptor found for key %s"%dirname)

        # Now, sort the key intervals by starting time.
        self.keyIntervals.sort()
        self.keyRange = (firstKey, lastKey)

        # Now we try to see whether we have more or less than 1 key in effect
        # for a given time.
        for idx in xrange(len(self.keyIntervals)-1):
            end = self.keyIntervals[idx][1]
            start = self.keyIntervals[idx+1][0]
            if start < end:
                LOG.warn("Multiple keys for %s.  That's unsupported.",
                         formatDate(end))
            elif start > end:
                LOG.warn("Gap in key schedule: no key from %s to %s",
                         formatDate(end), formatDate(start))

        self.nextKeyRotation = 0 # Make sure that now > nextKeyRotation before
                                 # we call _getLiveKey()
        self._getLiveKey()       # Set up liveKey, nextKeyRotation.

    def getIdentityKey(self):
        """Return this server's identity key.  Generate one if it doesn't
           exist."""
        password = None # FFFF Use this, somehow.
        fn = os.path.join(self.keyDir, "identity.key")
        bits = self.config['Server']['IdentityKeyBits']
        if os.path.exists(fn):
            key = mixminion.Crypto.pk_PEM_load(fn, password)
            keylen = key.get_modulus_bytes()*8
            if keylen != bits:
                LOG.warn(
                    "Stored identity key has %s bits, but you asked for %s.",
                    keylen, bits)
        else:
            LOG.info("Generating identity key. (This may take a while.)")
            key = mixminion.Crypto.pk_generate(bits)
            mixminion.Crypto.pk_PEM_save(key, fn, password)
            LOG.info("Generated %s-bit identity key.", bits)

        return key

    def removeIdentityKey(self):
        """Remove this server's identity key, and the Diffie-Hellman
           parameters file if there is one."""
        fn = os.path.join(self.keyDir, "identity.key")
        if not os.path.exists(fn):
            LOG.info("No identity key to remove.")
        else:
            # Give the operator a last chance to interrupt.
            LOG.warn("Removing identity key in 10 seconds")
            time.sleep(10)
            LOG.warn("Removing identity key")
            secureDelete([fn], blocking=1)

        dhfile = os.path.join(self.homeDir, 'work', 'tls', 'dhparam')
        # Test the path we just computed, not the literal string 'dhfile'.
        if os.path.exists(dhfile):
            LOG.info("Removing diffie-helman parameters file")
            secureDelete([dhfile], blocking=1)

    def createKeys(self, num=1, startAt=None):
        """Generate 'num' public keys for this server. If startAt is provided,
           make the first key become valid at 'startAt'.  Otherwise, make the
           first key become valid right after the last key we currently have
           expires.  If we have no keys now, make the first key start now."""
        # FFFF Use this.
        #password = None

        if startAt is None:
            if self.keyIntervals:
                startAt = self.keyIntervals[-1][1]+60
            else:
                startAt = time.time()+60

        startAt = previousMidnight(startAt)

        firstKey, lastKey = self.keyRange

        for _ in xrange(num):
            # Pick an unused key number: 1 if there are no keys at all,
            # otherwise fill in below the first key before growing past
            # the last one.
            if firstKey == sys.maxint:
                keynum = firstKey = lastKey = 1
            elif firstKey > 1:
                firstKey -= 1
                keynum = firstKey
            else:
                lastKey += 1
                keynum = lastKey

            keyname = "%04d" % keynum

            nextStart = startAt + self.config['Server']['PublicKeyLifetime'][2]

            LOG.info("Generating key %s to run from %s through %s (GMT)",
                     keyname, formatDate(startAt),
                     formatDate(nextStart-3600))
            generateServerDescriptorAndKeys(config=self.config,
                                            identityKey=self.getIdentityKey(),
                                            keyname=keyname,
                                            keydir=self.keyDir,
                                            hashdir=self.hashDir,
                                            validAt=startAt)
            startAt = nextStart

        self.checkKeys()

    def removeDeadKeys(self, now=None):
        """Remove all keys that have expired.

           now -- the time to treat as the present; defaults to the
              current time.  A key is removed when its valid-until time
              is more than keySloppiness before 'now'."""
        self.checkKeys()

        if now is None:
            now = time.time()
            expiryStr = " expired"
        else:
            expiryStr = ""

        cutoff = now - self.keySloppiness
        # Select the expired intervals first, and derive each directory
        # from its own interval, so that the key we log is always the key
        # we delete.
        expired = [ (va, vu, name)
                    for va, vu, name in self.keyIntervals if vu < cutoff ]

        for va, vu, name in expired:
            dirname = os.path.join(self.keyDir, "key_"+name)
            LOG.info("Removing%s key %s (valid from %s through %s)",
                     expiryStr, name, formatDate(va), formatDate(vu-3600))
            files = [ os.path.join(dirname,f)
                      for f in os.listdir(dirname) ]
            secureDelete(files, blocking=1)
            os.rmdir(dirname)

        self.checkKeys()

    def _getLiveKey(self, when=None):
        """Find the first key that is valid at 'when' (by default, now).
           Return (Valid-after, valid-until, name), or None if there are
           no keys at all.

           When called without 'when', the result is cached in liveKey
           and nextKeyRotation is set to that key's expiration time."""
        if not self.keyIntervals:
            self.liveKey = None
            self.nextKeyRotation = 0
            return None

        w = when
        if when is None:
            when = time.time()
            # The cached key is still good until the next rotation.
            if when < self.nextKeyRotation:
                return self.liveKey

        # keyIntervals is sorted by start time; take the last interval
        # that starts at or before 'when'.
        idx = bisect.bisect(self.keyIntervals, (when, None, None))-1
        k = self.keyIntervals[idx]
        if w is None:
            self.liveKey = k
            self.nextKeyRotation = k[1]

        return k

    def getNextKeyRotation(self):
        """Return the expiration time of the current key"""
        return self.nextKeyRotation

    def getServerKeyset(self):
        """Return a ServerKeyset object for the currently live key."""
        # FFFF Support passwords on keys
        _, _, name = self._getLiveKey()
        keyset = ServerKeyset(self.keyDir, name, self.hashDir)
        keyset.load()
        return keyset

    def getDHFile(self):
        """Return the filename for the diffie-helman parameters for the
           server.  Creates the file if it doesn't yet exist."""
        dhdir = os.path.join(self.homeDir, 'work', 'tls')
        createPrivateDir(dhdir)
        dhfile = os.path.join(dhdir, 'dhparam')
        if not os.path.exists(dhfile):
            LOG.info("Generating Diffie-Helman parameters for TLS...")
            mixminion._minionlib.generate_dh_parameters(dhfile, verbose=0)
            LOG.info("...done")
        else:
            LOG.debug("Using existing Diffie-Helman parameter from %s",
                      dhfile)

        return dhfile

    def getTLSContext(self):
        """Create and return a TLS context from the currently live key."""
        keys = self.getServerKeyset()
        return mixminion._minionlib.TLSContext_new(keys.getCertFileName(),
                                                   keys.getMMTPKey(),
                                                   self.getDHFile())

    def getPacketHandler(self):
        """Create and return a PacketHandler from the currently live key."""
        keys = self.getServerKeyset()
        packetKey = keys.getPacketKey()
        hashlog = mixminion.server.HashLog.HashLog(keys.getHashLogFileName(),
                                                   keys.getMMTPKeyID())
        return mixminion.server.PacketHandler.PacketHandler(packetKey,
                                                            hashlog)
class IncomingQueue(mixminion.server.Queue.DeliveryQueue):
"""A DeliveryQueue to accept messages from incoming MMTP connections,
process them with a packet handler, and send them into a mix pool."""
def __init__(self, location, packetHandler):
"""Create an IncomingQueue that stores its messages in <location>
and processes them through <packetHandler>."""
mixminion.server.Queue.DeliveryQueue.__init__(self, location)
self.packetHandler = packetHandler
self.mixPool = None
def connectQueues(self, mixPool):
"""Sets the target mix queue"""
self.mixPool = mixPool
def queueMessage(self, msg):
"""Add a message for delivery"""
LOG.trace("Inserted message %r into incoming queue", msg[:8])
self.queueDeliveryMessage(None, msg)
def _deliverMessages(self, msgList):
"Implementation of abstract method from DeliveryQueue."
ph = self.packetHandler
for handle, _, message, n_retries in msgList:
try:
res = ph.processMessage(message)
if res is None:
# Drop padding before it gets to the mix.
LOG.debug("Padding message %r dropped",
message[:8])
else:
LOG.debug("Processed message %r; inserting into pool",
message[:8])
self.mixPool.queueObject(res)
self.deliverySucceeded(handle)
except mixminion.Crypto.CryptoError, e:
LOG.warn("Invalid PK or misencrypted packet header: %s",
e)
self.deliveryFailed(handle)
except mixminion.Packet.ParseError, e:
LOG.warn("Malformed message dropped: %s", e)
self.deliveryFailed(handle)
except mixminion.server.PacketHandler.ContentError, e:
LOG.warn("Discarding bad packet: %s", e)
self.deliveryFailed(handle)
class MixPool:
    """Wraps a mixminion.server.Queue.*MixQueue to send messages to an exit
       queue and a delivery queue."""

    def __init__(self, queue):
        """Create a new MixPool to wrap a given *MixQueue."""
        self.queue = queue
        # Both targets are filled in later, by connectQueues.
        self.outgoingQueue = None
        self.moduleManager = None

    def queueObject(self, obj):
        """Insert an object into the queue."""
        self.queue.queueObject(obj)

    def count(self):
        "Return the number of messages in the queue"
        return self.queue.count()

    def connectQueues(self, outgoing, manager):
        """Sets the queue for outgoing mixminion packets, and the
           module manager for deliverable messages."""
        self.outgoingQueue = outgoing
        self.moduleManager = manager

    def mix(self):
        """Get a batch of messages, and queue them for delivery as
           appropriate."""
        batch = self.queue.getBatch()
        LOG.debug("Mixing %s messages out of %s",
                  len(batch), self.queue.count())
        for h in batch:
            tp, info = self.queue.getObject(h)
            if tp != 'EXIT':
                # Anything that isn't an exit message must be relayed.
                assert tp == 'QUEUE'
                ipv4, msg = info
                LOG.debug("  (sending message %r to MMTP server)",
                          msg[:8])
                self.outgoingQueue.queueDeliveryMessage(ipv4, msg)
            else:
                rt, ri, app_key, tag, payload = info
                LOG.debug("  (sending message %r to exit modules)",
                          payload[:8])
                self.moduleManager.queueMessage(payload, tag, rt, ri)
            # The message has been handed on; take it out of the pool.
            self.queue.removeMessage(h)
class OutgoingQueue(mixminion.server.Queue.DeliveryQueue):
    """DeliveryQueue to send messages via outgoing MMTP connections."""

    def __init__(self, location):
        """Create a new OutgoingQueue that stores its messages in a given
           location."""
        mixminion.server.Queue.DeliveryQueue.__init__(self, location)
        self.server = None

    def connectQueues(self, server):
        """Set the MMTPServer that this OutgoingQueue informs of its
           deliverable messages."""
        self.server = server

    def _deliverMessages(self, msgList):
        """Implementation of abstract method from DeliveryQueue: group the
           messages by destination and hand each group to the server."""
        # Map from addr -> [ (handle, msg) ... ]
        byAddr = {}
        for handle, addr, message, n_retries in msgList:
            byAddr.setdefault(addr, []).append( (handle, message) )
        for addr, pairs in byAddr.items():
            handles = [ h for h, m in pairs ]
            messages = [ m for h, m in pairs ]
            self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
                                     messages, handles)
class _MMTPServer(mixminion.server.MMTPServer.MMTPAsyncServer):
    """Implementation of mixminion.server.MMTPServer that knows about
       delivery queues."""

    def __init__(self, config, tls):
        """Create the server from a configuration and a TLS context."""
        base = mixminion.server.MMTPServer.MMTPAsyncServer
        base.__init__(self, config, tls)

    def connectQueues(self, incoming, outgoing):
        """Set the queue that receives incoming messages, and the queue
           that is told about the fate of outgoing ones."""
        self.incomingQueue = incoming
        self.outgoingQueue = outgoing

    def onMessageReceived(self, msg):
        """Pass a received message to the incoming queue."""
        self.incomingQueue.queueMessage(msg)

    def onMessageSent(self, msg, handle):
        """Report a successful delivery to the outgoing queue."""
        self.outgoingQueue.deliverySucceeded(handle)

    def onMessageUndeliverable(self, msg, handle, retriable):
        """Report a failed delivery to the outgoing queue."""
        self.outgoingQueue.deliveryFailed(handle, retriable)
class MixminionServer:
    """Wraps and drives all the queues, and the async net server.  Handles
       all timed events."""
    ## Fields:
    # config: The ServerConfig object for this server
    # keyring: The ServerKeyring
    #
    # mmtpServer: Instance of _MMTPServer.  Receives and transmits packets
    #    from the network.  Places the packets it receives in
    #    self.incomingQueue.
    # incomingQueue: Instance of IncomingQueue.  Holds received packets
    #    before they are decoded.  Decodes packets with PacketHandler,
    #    and places them in mixPool.
    # packetHandler: Instance of PacketHandler.  Used by incomingQueue to
    #    decrypt, check, and re-pad received packets.
    # mixPool: Instance of MixPool.  Holds processed messages, and
    #    periodically decides which ones to deliver, according to some
    #    batching algorithm.
    # moduleManager: Instance of ModuleManager.  Maps routing types to
    #    outgoing queues, and processes non-MMTP exit messages.
    # outgoingQueue: Holds messages waiting to be sent via MMTP.

    def __init__(self, config):
        """Create a new server from a ServerConfig."""
        LOG.debug("Initializing server")
        self.config = config
        self.keyring = ServerKeyring(config)
        if self.keyring._getLiveKey() is None:
            LOG.info("Generating a month's worth of keys.")
            LOG.info("(Don't count on this feature in future versions.)")
            # We might not be able to do this, if we password-encrypt keys
            keylife = config['Server']['PublicKeyLifetime'][2]
            secondsPerMonth = 30*24*60*60
            self.keyring.createKeys(ceilDiv(secondsPerMonth, keylife))

        LOG.trace("Initializing packet handler")
        self.packetHandler = self.keyring.getPacketHandler()

        LOG.trace("Initializing TLS context")
        tlsContext = self.keyring.getTLSContext()

        LOG.trace("Initializing MMTP server")
        self.mmtpServer = _MMTPServer(config, tlsContext)

        # FFFF Modulemanager should know about async so it can patch in if it
        # FFFF needs to.
        LOG.trace("Initializing delivery module")
        self.moduleManager = config.getModuleManager()
        self.moduleManager.configure(config)

        # All three queues live side by side under HOME/work/queues.
        homeDir = config['Server']['Homedir']
        queueDir = os.path.join(homeDir, 'work', 'queues')
        incomingDir = os.path.join(queueDir, "incoming")
        mixDir = os.path.join(queueDir, "mix")
        outgoingDir = os.path.join(queueDir, "outgoing")

        LOG.trace("Initializing incoming queue")
        self.incomingQueue = IncomingQueue(incomingDir, self.packetHandler)
        LOG.trace("Found %d pending messages in incoming queue",
                  self.incomingQueue.count())

        # FFFF The choice of mix algorithm should be configurable
        LOG.trace("Initializing Mix pool")
        mixQueue = mixminion.server.Queue.TimedMixQueue(mixDir, 60)
        self.mixPool = MixPool(mixQueue)
        LOG.trace("Found %d pending messages in Mix pool",
                  self.mixPool.count())

        LOG.trace("Initializing outgoing queue")
        self.outgoingQueue = OutgoingQueue(outgoingDir)
        LOG.trace("Found %d pending messages in outgoing queue",
                  self.outgoingQueue.count())

        LOG.trace("Connecting queues")
        self.incomingQueue.connectQueues(mixPool=self.mixPool)
        self.mixPool.connectQueues(outgoing=self.outgoingQueue,
                                   manager=self.moduleManager)
        self.outgoingQueue.connectQueues(server=self.mmtpServer)
        self.mmtpServer.connectQueues(incoming=self.incomingQueue,
                                      outgoing=self.outgoingQueue)

    def run(self):
        """Run the server; don't return unless we hit an exception."""
        # FFFF Use heapq to schedule events? [I don't think so; there are only
        # FFFF   two events, after all!]
        MIX_INTERVAL = 20 # FFFF Configurable!
        SHRED_INTERVAL = 6000 # FFFF Configurable shred interval

        now = time.time()
        nextMix = now + MIX_INTERVAL
        nextShred = now + SHRED_INTERVAL
        #FFFF Unused
        #nextRotate = self.keyring.getNextKeyRotation()
        while 1:
            LOG.trace("Next mix at %s", formatTime(nextMix,1))
            while time.time() < nextMix:
                # Handle pending network events
                self.mmtpServer.process(1)
                # Process any new messages that have come in, placing them
                # into the mix pool.
                self.incomingQueue.sendReadyMessages()

            # Before we mix, we need to log the hashes to avoid replays.
            # FFFF We need to recover on server failure.
            self.packetHandler.syncLogs()

            LOG.trace("Mix interval elapsed")
            # Choose a set of outgoing messages; put them in outgoingqueue and
            # modulemanager
            self.mixPool.mix()
            # Send outgoing messages
            self.outgoingQueue.sendReadyMessages()
            # Send exit messages
            self.moduleManager.sendReadyMessages()

            # Choose next mix interval
            now = time.time()
            nextMix = now + MIX_INTERVAL

            if now <= nextShred:
                continue

            LOG.trace("Expunging deleted messages from queues")
            self.incomingQueue.cleanQueue()
            self.mixPool.queue.cleanQueue()
            self.outgoingQueue.cleanQueue()
            self.moduleManager.cleanQueues()
            nextShred = now + SHRED_INTERVAL

    def close(self):
        """Release all resources; close all files."""
        self.packetHandler.close()
#----------------------------------------------------------------------
def usageAndExit(cmd):
    """Write a usage message for the subcommand 'cmd' to stderr, then
       exit with status 0."""
    message = "Usage: %s %s [-h] [-f configfile]" % (sys.argv[0], cmd)
    sys.stderr.write(message + "\n")
    sys.exit(0)
def configFromServerArgs(cmd, args):
    """Parse the command-line arguments 'args' of the subcommand 'cmd'
       (-h/--help, -f/--config FILE), and return the configuration read
       from the chosen file.  Prints usage and exits if help is requested
       or if any positional arguments are left over."""
    options, leftover = getopt.getopt(args, "hf:", ["help", "config="])
    if leftover:
        usageAndExit(cmd)
    configFile = "/etc/mixminiond.conf"
    for opt, value in options:
        if opt in ('-h', '--help'):
            usageAndExit(cmd)
        elif opt in ('-f', '--config'):
            configFile = value

    return readConfigFile(configFile)
def readConfigFile(configFile):
try:
return mixminion.Config.ServerConfig(fname=configFile)
except (IOError, OSError), e:
print >>sys.stderr, "Error reading configuration file %r:"%configFile
print >>sys.stderr, " ", str(e)
sys.exit(1)
except mixminion.Config.ConfigError, e:
print >>sys.stderr, "Error in configuration file %r"%configFile
print >>sys.stderr, str(e)
sys.exit(1)
return None #suppress pychecker warning
#----------------------------------------------------------------------
def runServer(cmd, args):
    """Entry point for the server subcommand: read the configuration,
       set up logging, the shred command and crypto, then build and run a
       MixminionServer until it is interrupted or fails.

       Exits with status 1 if the server can't be configured, and with
       status 0 once a running server has been shut down."""
    config = configFromServerArgs(cmd, args)

    # Phase 1: configuration.  Any failure here is fatal.
    try:
        mixminion.Common.LOG.configure(config)
        LOG.debug("Configuring server")
        mixminion.Common.configureShredCommand(config)
        mixminion.Crypto.init_crypto(config)
        server = MixminionServer(config)
    except:
        excInfo = sys.exc_info()
        LOG.fatal_exc(excInfo,"Exception while configuring server")
        print >>sys.stderr, "Shutting down because of exception"
        #XXXX print stack trace as well as logging?
        sys.exit(1)

    # Phase 2: the main loop.  Whatever ends it, we shut down cleanly.
    LOG.info("Starting server")
    try:
        server.run()
    except KeyboardInterrupt:
        pass
    except:
        excInfo = sys.exc_info()
        LOG.fatal_exc(excInfo,"Exception while running server")
        #XXXX print stack trace as well as logging?

    LOG.info("Server shutting down")
    server.close()
    LOG.info("Server is shut down")
    sys.exit(0)
#----------------------------------------------------------------------
def runKeygen(cmd, args):
    """Entry point for the key-generation subcommand.

       Options: -h/--help; -f/--config FILE (configuration file);
       -n/--keys N (number of keys to generate, 1 by default).  Prints a
       progress line to stderr after each key."""
    options, args = getopt.getopt(args, "hf:n:",
                                  ["help", "config=", "keys="])
    # FFFF password-encrypted keys
    # FFFF Ability to fill gaps
    # FFFF Ability to generate keys with particular start/end intervals
    nKeys = 1
    showUsage = 0
    configFile = '/etc/miniond.conf'
    for opt, val in options:
        if opt in ('-h', '--help'):
            showUsage = 1
        elif opt in ('-f', '--config'):
            configFile = val
        elif opt in ('-n', '--keys'):
            try:
                nKeys = int(val)
            except ValueError:
                print >>sys.stderr,("%s requires an integer" %opt)
                showUsage = 1
    if showUsage:
        print >>sys.stderr, "Usage: %s [-h] [-f configfile] [-n nKeys]"%cmd
        sys.exit(1)

    config = readConfigFile(configFile)

    LOG.setMinSeverity("INFO")
    mixminion.Crypto.init_crypto(config)
    keyring = ServerKeyring(config)
    print >>sys.stderr, "Creating %s keys..." % nKeys
    # Generate the keys one at a time so progress can be reported.
    done = 0
    while done < nKeys:
        keyring.createKeys(1)
        done += 1
        print >> sys.stderr, ".... (%s/%s done)" % (done,nKeys)
#----------------------------------------------------------------------
def removeKeys(cmd, args):
# FFFF Resist removing keys that have been published.
# FFFF Generate 'suicide note' for removing identity key.
options, args = getopt.getopt(args, "hf:", ["help", "config=",
"remove-identity"])
if args:
print >>sys.stderr, "%s takes no arguments"%cmd
usage = 1
args = options = ()
usage = 0
removeIdentity = 0
configFile = '/etc/miniond.conf'
for opt,val in options:
if opt in ('-h', '--help'):
usage=1
elif opt in ('-f', '--config'):
configFile = val
elif opt == '--remove-identity':
removeIdentity = 1
if usage:
print >>sys.stderr, \
"Usage: %s [-h|--help] [-f configfile] [--remove-identity]"%cmd
sys.exit(1)
config = readConfigFile(configFile)
mixminion.Common.configureShredCommand(config)
LOG.setMinSeverity("INFO")
keyring = ServerKeyring(config)
keyring.checkKeys()
# This is impossibly far in the future.
keyring.removeDeadKeys(now=(1L << 36))
if removeIdentity:
keyring.removeIdentityKey()
LOG.info("Done removing keys")
--- NEW FILE: __init__.py ---
# Copyright 2002 Nick Mathewson. See LICENSE for licensing information.
# $Id: __init__.py,v 1.1 2002/12/11 06:58:55 nickm Exp $
"""mixminion.server
Server-only code for type III anonymous remailers.
"""
__all__ = [ ]
#'MMTPServer', 'Queue', 'HashLog', 'PacketHandler', 'Modules',
# 'ServerMain' ]
## import mixminion.server.MMTPServer as MMTPServer
## import mixminion.server.PacketHandler as PacketHandler
## import mixminion.server.HashLog as HashLog
## import mixminion.server.Modules as Modules
## import mixminion.server.Queue as Queue
## import mixminion.server.ServerMain as ServerMain