
[minion-cvs] Move all the pure-server modules into a new package, "mixminion.server"



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv11907/lib/mixminion/server

Added Files:
	HashLog.py MMTPServer.py Modules.py PacketHandler.py Queue.py 
	ServerMain.py __init__.py 
Log Message:
Move all the pure-server modules into a new package, 'mixminion.server.'

We still need to split 'Config' and 'ServerInfo' into their client and server
components.


--- NEW FILE: HashLog.py ---
# Copyright 2002 Nick Mathewson.  See LICENSE for licensing information.
# $Id: HashLog.py,v 1.1 2002/12/11 06:58:55 nickm Exp $

"""mixminion.HashLog

   Persistant memory for the hashed secrets we've seen.  Used by
   PacketHandler to prevent replay attacks."""

import os
import anydbm, dumbdbm
from mixminion.Common import MixFatalError, LOG, createPrivateDir
from mixminion.Packet import DIGEST_LEN

__all__ = [ 'HashLog' ]

# FFFF Mechanism to force a different default db module.

# FFFF two-copy journaling to protect against catastrophic failure that
# FFFF underlying DB code can't handle.

# flags to pass to os.open when opening the journal file.
_JOURNAL_OPEN_FLAGS = os.O_WRONLY|os.O_CREAT|getattr(os,'O_SYNC',0)
class HashLog:
    """A HashLog is a file containing a list of message digests that we've
       already processed.

       Each HashLog corresponds to a single public key (whose hash is the
       log's keyid).  A HashLog must persist for as long as the key does.

       It is not necessary to sync the HashLog to the disk every time
       a new message is seen; instead, we must only ensure that every
       _retransmitted_ message is first inserted into the hashlog and
       synced.  (One way to implement this is to process messages from
       'state A' into 'state B', marking them in the hashlog as we go,
       and syncing the hashlog before any message is sent from 'B' to
       the network.  On a restart, we reinsert all messages waiting in 'B'
       into the log.)

       HashLogs are implemented using Python's anydbm interface.  This defaults
       to using Berkeley DB, GDBM, or --if you have none of these-- a flat
       text file.

       The base HashLog implementation assumes an 8-bit-clean database that
       maps strings to strings."""
    ##
    # Internally, we also keep a flat 'journal' file to which we append
    # values that we've seen but not yet written to the database.  This way
    # we can survive crashes between 'logHash' and 'sync'.
    #
    # Fields:
    #   log: an anydbm instance.
    #   journalFileName: the name of our journal file
    #   journalFile: a file object for our journal file
    #   journal: a dictionary, used to cache values currently in the
    #       journal file.
    def __init__(self, filename, keyid):
        """Create a new HashLog to store data in 'filename' for the key
           'keyid'."""
        parent = os.path.split(filename)[0]
	createPrivateDir(parent)
        self.log = anydbm.open(filename, 'c')
	LOG.debug("Opening database %s for packet digests", filename)
        if isinstance(self.log, dumbdbm._Database):
            LOG.warn("Warning: logging packet digests to a flat file.")
        try:
            if self.log["KEYID"] != keyid:
                raise MixFatalError("Log KEYID does not match current KEYID")
        except KeyError:
            self.log["KEYID"] = keyid

	self.journalFileName = filename+"_jrnl"
	self.journal = {}
	if os.path.exists(self.journalFileName):
	    f = open(self.journalFileName, 'r')
	    # FFFF deal with really big journals?
	    j = f.read()
	    for i in xrange(0, len(j), DIGEST_LEN):
		self.journal[j[i:i+DIGEST_LEN]] = 1
	    f.close()

	self.journalFile = os.open(self.journalFileName, 
		    _JOURNAL_OPEN_FLAGS|os.O_APPEND, 0600)

    def seenHash(self, hash):
        """Return true iff 'hash' has been logged before."""
        try:
	    if self.journal.get(hash,0):
		return 1
            _ = self.log[hash]
            return 1
        except KeyError:
            return 0

    def logHash(self, hash):
        """Insert 'hash' into the database."""
	assert len(hash) == DIGEST_LEN
	self.journal[hash] = 1
	os.write(self.journalFile, hash)

    def sync(self):
        """Flushes changes to this log to the filesystem."""
	for hash in self.journal.keys():
	    self.log[hash] = "1"
        if hasattr(self.log, "sync"):
            self.log.sync()
	os.close(self.journalFile)
	self.journalFile = os.open(self.journalFileName,
		   _JOURNAL_OPEN_FLAGS|os.O_TRUNC, 0600)
	self.journal = {}

    def close(self):
        """Closes this log."""
        self.sync()
        self.log.close()
	os.close(self.journalFile)
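
# Illustrative usage sketch (not part of the original commit): the
# journal/sync protocol described in the HashLog docstring above.  The
# filename and keyid below are made up for illustration.
def _exampleHashLogUsage():
    h = HashLog("/tmp/example-hashlog", "\x00"*20)
    digest = "X"*DIGEST_LEN
    if not h.seenHash(digest):
        h.logHash(digest)       # appended to the journal, not yet in the DB
    h.sync()                    # journal entries are now in the database
    assert h.seenHash(digest)
    h.close()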
	


--- NEW FILE: MMTPServer.py ---
# Copyright 2002 Nick Mathewson.  See LICENSE for licensing information.
# $Id: MMTPServer.py,v 1.1 2002/12/11 06:58:55 nickm Exp $
"""mixminion.MMTPServer

   This package implements the Mixminion Transfer Protocol as described
   in the Mixminion specification.  It uses a select loop to provide
   a nonblocking implementation of *both* the client and the server sides
   of the protocol.

   If you just want to send messages into the system, use MMTPClient.

   FFFF As yet unsupported are: Session resumption, key renegotiation,
   FFFF: Also unsupported: timeouts."""

# NOTE FOR THE CURIOUS: The 'asyncore' module in the standard library
#    is another general select/poll wrapper... so why are we using our
#    own?  Basically, because asyncore has IMO a couple of mismatches
#    with our design, the largest of which is that it has the 'server
#    loop' periodically query the connections for their status,
#    whereas we have the connections inform the server of their status
#    whenever they change.  This latter approach turns out to be far
#    easier to use with TLS.

import errno
import socket
import select
import re
import time
from types import StringType

import mixminion._minionlib as _ml
from mixminion.Common import MixError, MixFatalError, LOG, stringContains
from mixminion.Crypto import sha1
from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN

__all__ = [ 'AsyncServer', 'ListenConnection', 'MMTPServerConnection',
            'MMTPClientConnection' ]

trace = LOG.trace
info = LOG.info
debug = LOG.debug
warn = LOG.warn
error = LOG.error

class AsyncServer:
    """AsyncServer is the core of a general-purpose asynchronous
       select-based server loop.  AsyncServer maintains two lists of
       Connection objects that are waiting for reads and writes
       (respectively), and waits for their underlying sockets to be
       available for the desired operations.
       """
    ## Fields:
    # writers: map from fd to 'Connection' objects that are interested
    #      in write events.
    # readers: map from fd to 'Connection' objects that are interested
    #      in read events.
    def __init__(self):
        """Create a new AsyncServer with no readers or writers."""
        self.writers = {}
        self.readers = {}

    def process(self, timeout):
        """If any relevant file descriptors become available within
           'timeout' seconds, call the appropriate methods on their
           connections and return immediately after. Otherwise, wait
           'timeout' seconds and return.

           If we receive an unblocked signal, return immediately.
           """

##         trace("%s readers (%s), %s writers (%s)" % (len(self.readers),
## 						    readers,
## 						    len(self.writers),
## 						    writers))
        
        readfds = self.readers.keys()
        writefds = self.writers.keys()
        try:
            readfds,writefds,exfds = select.select(readfds,writefds,[],timeout)
        except select.error, e:
            if e[0] == errno.EINTR:
                return
            else:
                raise e

        for fd in readfds:
            trace("Select got a read on fd %s",fd)
            self.readers[fd].handleRead()
        for fd in writefds:
            trace("Select got a write on fd %s", fd)
            self.writers[fd].handleWrite()
        for fd in exfds:
            trace("Select got an exception on fd %s", fd)
            if self.readers.has_key(fd): del self.readers[fd]
            if self.writers.has_key(fd): del self.writers[fd]

    def hasReader(self, reader):
	"""Return true iff 'reader' is a reader on this server."""
	fd = reader.fileno()
	return self.readers.get(fd, None) is reader

    def hasWriter(self, writer):
	"""Return true iff 'writer' is a writer on this server."""
	fd = writer.fileno()
	return self.writers.get(fd, None) is writer

    def registerReader(self, reader):
        """Register a connection as a reader.  The connection's 'handleRead'
           method will be called whenever data is available for reading."""
        fd = reader.fileno()
        self.readers[fd] = reader
        if self.writers.has_key(fd):
            del self.writers[fd]

    def registerWriter(self, writer):
        """Register a connection as a writer.  The connection's 'handleWrite'
           method will be called whenever the buffer is free for writing.
        """
        fd = writer.fileno()
        self.writers[fd] = writer
        if self.readers.has_key(fd):
            del self.readers[fd]

    def registerBoth(self, connection):
        """Register a connection as a reader and a writer.  The
           connection's 'handleRead' and 'handleWrite' methods will be
           called as appropriate.
        """ 
        fd = connection.fileno()
        self.readers[fd] = self.writers[fd] = connection

    def unregister(self, connection):
        """Removes a connection from this server."""
        fd = connection.fileno()
        w = self.writers.has_key(fd)
        r = self.readers.has_key(fd)
        if r: del self.readers[fd]
        if w: del self.writers[fd]

class Connection:
    "A connection is an abstract superclass for asynchronous channels"
    def handleRead(self):
        """Invoked when there is data to read.""" 
        pass
    def handleWrite(self):
        """Invoked when there is data to write."""
        pass
    def register(self, server):
        """Invoked to register this connection with an AsyncServer."""
        pass
    def fileno(self):
        """Returns an integer file descriptor for this connection, or returns
           an object that can return such a descriptor."""
        pass
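
# Illustrative sketch (not part of the original commit): how a caller can
# drive an AsyncServer once a listener is registered.  The address, port,
# and connection factory are hypothetical; the real entry point is
# MMTPAsyncServer, below.
def _exampleEventLoop(connectionFactory):
    server = AsyncServer()
    listener = ListenConnection("127.0.0.1", 48099, LISTEN_BACKLOG,
                                connectionFactory)
    listener.register(server)
    while 1:
        server.process(30)   # dispatch any pending reads/writes; 30s timeout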

class ListenConnection(Connection):
    """A ListenConnection listens on a given port/ip combination, and calls
       a 'connectionFactory' method whenever a new connection is made to that
       port."""
    ## Fields:
    # ip: IP to listen on.
    # port: port to listen on.
    # sock: socket to bind.
    # connectionFactory: a function that takes as input a socket from a
    #    newly received connection, and returns a Connection object to
    #    register with the async server.
    def __init__(self, ip, port, backlog, connectionFactory):
        """Create a new ListenConnection"""
        self.ip = ip
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setblocking(0)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((self.ip, self.port))
        self.sock.listen(backlog)
        self.connectionFactory = connectionFactory
        info("Listening at %s on port %s (fd %s)", ip, port,self.sock.fileno())

    def register(self, server):
        server.registerReader(self)
        self.server = server

    def handleRead(self):
        con, addr = self.sock.accept()
        debug("Accepted connection from %s (fd %s)", addr, con.fileno())
        rw = self.connectionFactory(con)
        rw.register(self.server)

    def shutdown(self):
        debug("Closing listener connection (fd %s)", self.sock.fileno())
        self.server.unregister(self)
        del self.server
        self.sock.close()
        info("Server connection closed")

    def fileno(self):
        return self.sock.fileno()

class SimpleTLSConnection(Connection):
    """SimpleTLSConnection is an abstract superclass for asynchronous TLS
       connections.  Conceptually, a SimpleTLSConnection is in one of 5 states:
           1. Negotiating a new connection (server side)
           2. Negotiating a new connection (client side)
           3. Reading
           4. Writing
           5. Shutting down.
       Reads proceed until either a given number of bytes have been received,
       or until a provided terminator has been found.  Writes proceed until
       the buffer is exhausted.

       After leaving states 1, 2, 3, or 4, the connection's "finished" method
       is called.  After leaving state 5, the connection's "shutdownFinished"
       method is called.
    """
    # Fields:
    #    lastActivity: the last time when we had a read or a write.
    #    address: Human readable IP address of our peer.  For debugging.
    #    fd: fileno for the underlying socket __con.
    #
    #    __con: an underlying TLS object
    #    __state: a callback to use whenever we get a read or a write. May
    #           throw _ml.TLSWantRead or _ml.TLSWantWrite.  See __acceptFn,
    #           __connectFn, __shutdownFn, __readFn, __writeFn.
    #    __server: an AsyncServer.
    #    __inbuf: A list of strings that we've read since the last expectRead. 
    #    __inbuflen: The total length of all the strings in __inbuf
    #    __expectReadLen: None, or the number of bytes to read before
    #           the current read succeeds.
    #    __terminator: None, or a string which will terminate the current read.
    #    __outbuf: None, or the remainder of the string we're currently
    #           writing.
    def __init__(self, sock, tls, serverMode):
        """Create a new SimpleTLSConnection.

           tls -- An underlying TLS connection.
           serverMode -- If true, we start with a server-side negotiation;
                         otherwise, we start with a client-side negotiation.
        """
        self.__sock = sock
	self.address = "%s:%s" % sock.getpeername()
        self.__con = tls
	self.fd = self.__con.fileno()

        if serverMode:
            self.__state = self.__acceptFn
        else:
            self.__state = self.__connectFn

    def isShutdown(self):
        """Returns true iff this connection is finished shutting down"""
        return self.__state is None

    def register(self, server):
        self.__server = server
        if self.__state == self.__acceptFn:
            server.registerReader(self)
        else:
            assert self.__state == self.__connectFn
            server.registerWriter(self)
        
    def expectRead(self, bytes=None, terminator=None):
        """Begin reading from the underlying TLS connection.

           After the read is finished, this object's finished method
           is invoked.  A call to 'getInput' will retrieve the contents
           of the input buffer since the last call to 'expectRead'.

           If 'terminator' is not provided, we try to read exactly
           'bytes' bytes.  If terminator is provided, we read until we
           encounter the terminator, but give up after 'bytes' bytes.
        """
        self.__inbuf = []
        self.__inbuflen = 0
        self.__expectReadLen = bytes
        self.__terminator = terminator

        self.__state = self.__readFn
        self.__server.registerReader(self)

    def beginWrite(self, str):
        """Begin writing a string to the underlying connection.  When the  
           string is completely written, this object's "finished" method
           will be called. 
        """
        self.__outbuf = str
        self.__state = self.__writeFn
        self.__server.registerWriter(self)

    def __acceptFn(self):
        """Hook to implement server-side handshake."""
        self.__con.accept() #may throw want*
        self.__server.unregister(self)
        self.finished()

    def __connectFn(self):
        """Hook to implement client-side handshake."""
        self.__con.connect() #may throw want*
        self.__server.unregister(self)
        self.finished()

    def __shutdownFn(self):
        """Hook to implement shutdown."""

	# This is a bit subtle.  The underlying 'shutdown' method
	# needs to be retried till the other guy sends an 'ack'
	# back... but we don't want to keep retrying indefinitely, or
	# else we can deadlock on a connection from ourself to
	# ourself.
	if self.__con.shutdown() == 1: #may throw want*
	    trace("Got a 1 on shutdown (fd %s)", self.fd)
	    self.__server.unregister(self)
	    self.__state = None
	    self.__sock.close()
	    self.shutdownFinished()
	    return

	# If we don't get any response on shutdown, stop blocking; the other
	# side may be hostile, confused, or deadlocking.
	trace("Got a 0 on shutdown (fd %s)", self.fd)
	# ???? Is 'wantread' always correct?
	# ???? Rather than waiting for a read, should we use a timer or 
	# ????       something?
	raise _ml.TLSWantRead()

    def __readFn(self):
        """Hook to implement read"""
        while 1:
            r = self.__con.read(1024) #may throw want*
            if r == 0:
                trace("read returned 0 (fd %s)", self.fd)
                self.shutdown(err=0)
                return
            else:
                assert isinstance(r, StringType)
                trace("read got %s bytes (fd %s)", len(r), self.fd)
                self.__inbuf.append(r)
                self.__inbuflen += len(r)
                if not self.__con.pending():
                    break

        if self.__terminator and len(self.__inbuf) > 1:
            self.__inbuf = ["".join(self.__inbuf)]

        if self.__expectReadLen and self.__inbuflen > self.__expectReadLen:
            warn("Protocol violation: too much data. Closing connection to %s",
		 self.address)
            self.shutdown(err=1, retriable=0)
            return
         
        if self.__terminator and stringContains(self.__inbuf[0], 
						self.__terminator):
            trace("read found terminator (fd %s)", self.fd)
            self.__server.unregister(self)
            self.finished()

        if self.__expectReadLen and (self.__inbuflen == self.__expectReadLen):
            trace("read got enough (fd %s)", self.fd)
            self.__server.unregister(self)
            self.finished()

    def __writeFn(self):
        """Hook to implement write"""
        out = self.__outbuf
        while len(out):
            r = self.__con.write(out) # may throw want*
            assert r > 0
            out = out[r:]
            # Record our progress as we go, so that a blocked write doesn't
            # make us resend bytes we've already written.
            self.__outbuf = out

        if len(out) == 0:
            self.finished()
        
    def handleRead(self):
        self.__handleAll()

    def handleWrite(self):
        self.__handleAll()

    def __handleAll(self):
        """Underlying implementation of TLS connection: traverses as
           many states as possible until some operation blocks on
           reading or writing, or until the current __state becomes
           None.
        """
        self.lastActivity = time.time()
        
        try:
            # We have a while loop here so that, upon entering a new
            # state, we immediately see if we can go anywhere with it
            # without blocking.
	    while self.__state is not None:
                self.__state()
        except _ml.TLSWantWrite:
            self.__server.registerWriter(self)
        except _ml.TLSWantRead:
            self.__server.registerReader(self)
        except _ml.TLSClosed:
            warn("Unexpectedly closed connection to %s", self.address)
	    self.handleFail(retriable=1)
            self.__sock.close()
            self.__server.unregister(self) 
        except _ml.TLSError:
            if self.__state != self.__shutdownFn:
                warn("Unexpected error: closing connection to %s", 
		     self.address)
                self.shutdown(err=1, retriable=1)
            else:
                warn("Error while shutting down: closing connection to %s",
		     self.address)
                self.__server.unregister(self)
        else:
            # We are in no state at all.
            self.__server.unregister(self)
              
    def finished(self):
        """Called whenever a connect, accept, read, or write is finished."""
        pass

    def shutdownFinished(self):
        """Called when this connection is successfully shut down."""
        pass

    def shutdown(self, err=0, retriable=0):
        """Begin a shutdown on this connection"""
	if err:
	    self.handleFail(retriable)
        self.__state = self.__shutdownFn
        
    def fileno(self):
        return self.fd

    def getInput(self):
        """Returns the current contents of the input buffer."""
        return "".join(self.__inbuf)

    def getPeerPK(self):
        return self.__con.get_peer_cert_pk()

    def handleFail(self, retriable=0):
	"""Called when we shutdown with an error."""
	pass
    
#----------------------------------------------------------------------
# Implementation for MMTP.

# The protocol string to send.
PROTOCOL_STRING      = "MMTP 0.1\r\n"
# The protocol specification to expect.
PROTOCOL_RE          = re.compile("MMTP ([^\s\r\n]+)\r\n")
# Control line for sending a message.
SEND_CONTROL         = "SEND\r\n"
# Control line for sending padding.
JUNK_CONTROL         = "JUNK\r\n"
# Control line for acknowledging a message
RECEIVED_CONTROL     = "RECEIVED\r\n"
SEND_CONTROL_LEN     = len(SEND_CONTROL)
RECEIVED_CONTROL_LEN = len(RECEIVED_CONTROL)
SEND_RECORD_LEN      = len(SEND_CONTROL) + MESSAGE_LEN + DIGEST_LEN
RECEIVED_RECORD_LEN  = RECEIVED_CONTROL_LEN + DIGEST_LEN
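
# Illustrative helper (not part of the original commit): how a SEND record
# is framed on the wire, per the constants above.  The real code builds this
# inline in MMTPClientConnection.beginNextMessage.
def _exampleSendRecord(message):
    assert len(message) == MESSAGE_LEN
    record = SEND_CONTROL + message + sha1(message + "SEND")
    assert len(record) == SEND_RECORD_LEN
    return record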

class MMTPServerConnection(SimpleTLSConnection):
    '''An asynchronous implementation of the receiving side of an MMTP
       connection.'''
    ## Fields:
    # messageConsumer: a function to call with all received messages.
    # finished: callback when we're done with a read or write; see
    #     SimpleTLSConnection.
    def __init__(self, sock, tls, consumer):
	"""Create an MMTP connection to receive messages sent along a given
	   socket.  When valid packets are received, pass them to the 
	   function 'consumer'."""
        SimpleTLSConnection.__init__(self, sock, tls, 1)
        self.messageConsumer = consumer
        self.finished = self.__setupFinished

    def __setupFinished(self):
        """Called once we're done accepting.  Begins reading the protocol
           string.
        """
        self.finished = self.__receivedProtocol
        self.expectRead(1024, '\n')

    def __receivedProtocol(self):
        """Called once we're done reading the protocol string.  Either
           rejects, or sends our response.
        """
        trace("done w/ client sendproto (fd %s)", self.fd)
        inp = self.getInput()
        m = PROTOCOL_RE.match(inp)

        if not m:
            warn("Bad protocol list.  Closing connection to %s", self.address)
            self.shutdown(err=1)
            return
        protocols = m.group(1).split(",")
        if "0.1" not in protocols:
            warn("Unsupported protocol list.  Closing connection to %s",
                 self.address)
            self.shutdown(err=1)
            return
        else:
            trace("protocol ok (fd %s)", self.fd)
            self.finished = self.__sentProtocol
            self.beginWrite(PROTOCOL_STRING)

    def __sentProtocol(self):
        """Called once we're done sending our protocol response.  Begins
           reading a packet from the line.
        """
        trace("done w/ server sendproto (fd %s)", self.fd)
        self.finished = self.__receivedMessage
        self.expectRead(SEND_RECORD_LEN)

    def __receivedMessage(self):
        """Called once we've read a message from the line.  Checks the
           digest, and either rejects or begins sending an ACK."""
        data = self.getInput()
        msg = data[SEND_CONTROL_LEN:-DIGEST_LEN]
        digest = data[-DIGEST_LEN:]

        if data.startswith(JUNK_CONTROL):
            expectedDigest = sha1(msg+"JUNK")
            replyDigest = sha1(msg+"RECEIVED JUNK")
        elif data.startswith(SEND_CONTROL):
            expectedDigest = sha1(msg+"SEND")
            replyDigest = sha1(msg+"RECEIVED")
        else:
            warn("Unrecognized command from %s.  Closing connection.",
		 self.address)
            self.shutdown(err=1)
            return
        if expectedDigest != digest:
            warn("Invalid checksum from %s. Closing connection",
		 self.address)
            self.shutdown(err=1)
            return
        else:
            debug("%s packet received from %s; Checksum valid.",
		  data[:4], self.address)
            self.finished = self.__sentAck
            self.beginWrite(RECEIVED_CONTROL+replyDigest)
            self.messageConsumer(msg)

    def __sentAck(self):
        """Called once we're done sending an ACK.  Begins reading a new
           message."""
        debug("Send ACK for message from %s (fd %s)", self.address, self.fd)
        #FFFF Rehandshake
        self.finished = self.__receivedMessage
        self.expectRead(SEND_RECORD_LEN)

#----------------------------------------------------------------------

# FFFF We need to note retriable situations better.
class MMTPClientConnection(SimpleTLSConnection):
    """Asynchronious implementation of the sending ("client") side of a
       mixminion connection."""
    ## Fields:
    # ip, port, keyID, messageList, handleList, sentCallback, failCallback:
    #   As described in the docstring for __init__ below.
    def __init__(self, context, ip, port, keyID, messageList, handleList,
                 sentCallback=None, failCallback=None):
	"""Create a connection to send messages to an MMTP server.  
	   ip -- The IP of the destination server.
	   port -- The port to connect to.
	   keyID -- None, or the expected SHA1 hash of the server's public key
	   messageList -- a list of message payloads to send
	   handleList -- a list of objects corresponding to the messages in
	      messageList.  Used for callback.
           sentCallback -- None, or a function of (msg, handle) to be called
              whenever a message is successfully sent.
           failCallback -- None, or a function of (msg, handle, retriable)
              to be called when messages can't be sent."""
	
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(0)
        self.keyID = keyID
        self.ip = ip
        try:
            sock.connect((ip, port))
        except socket.error:
            # This will always raise an error, since we're nonblocking.  That's
            # okay.
            pass
        tls = context.sock(sock)

        SimpleTLSConnection.__init__(self, sock, tls, 0)
        self.messageList = messageList
	self.handleList = handleList
        self.finished = self.__setupFinished
        self.sentCallback = sentCallback
	self.failCallback = failCallback

        debug("Opening client connection (fd %s)", self.fd)

    def __setupFinished(self):
        """Called when we're done with the client side negotations.
           Begins sending the protocol string.
        """
        keyID = sha1(self.getPeerPK().encode_key(public=1))
        if self.keyID is not None:
            if keyID != self.keyID:
                warn("Got unexpected Key ID from %s", self.address)
                # This may work again in a couple of hours, so the failure
                # is retriable.
                self.shutdown(err=1,retriable=1)
                return
            else:
                debug("KeyID from %s is valid", self.address)

        self.beginWrite(PROTOCOL_STRING)
        self.finished = self.__sentProtocol

    def __sentProtocol(self):    
        """Called when we're done sending the protocol string.  Begins
           reading the server's response.
        """
        self.expectRead(1024, '\n')
        self.finished = self.__receivedProtocol

    def __receivedProtocol(self):
        """Called when we're done receiving the protocol string.  Begins
           sending a packet, or exits if we're done sending.
        """
        inp = self.getInput()
        if inp != PROTOCOL_STRING:
            warn("Invalid protocol.  Closing connection to %s", self.address)
	    # This isn't retriable; we don't talk to servers we don't
	    # understand.
            self.shutdown(err=1,retriable=0)
            return

        self.beginNextMessage()

    def beginNextMessage(self):
        """Start writing a message to the connection."""
        if not self.messageList:
            self.shutdown(0)
            return
        msg = self.messageList[0]
        self.expectedDigest = sha1(msg+"RECEIVED")
        msg = SEND_CONTROL+msg+sha1(msg+"SEND")
	assert len(msg) == SEND_RECORD_LEN
	
        self.beginWrite(msg)
        self.finished = self.__sentMessage

    def __sentMessage(self):
        """Called when we're done sending a message.  Begins reading the
           server's ACK."""

        debug("Message delivered to %s (fd %s)", self.address, self.fd)
        self.finished = self.__receivedAck
        self.expectRead(RECEIVED_RECORD_LEN)

    def __receivedAck(self):
        """Called when we're done reading the ACK.  If the ACK is bad,
           closes the connection.  If the ACK is correct, removes the
           just-sent message from the connection's internal queue, and
           calls sentCallback with the sent message.

           If there are more messages to send, begins sending the next.
           Otherwise, begins shutting down.
        """
        trace("received ack (fd %s)", self.fd)
        # FFFF Rehandshake
        inp = self.getInput()
        if inp != (RECEIVED_CONTROL+self.expectedDigest):
            # We only get bad ACKs if an adversary somehow subverts TLS's
            # checksumming.  That's not fixable.
            self.shutdown(err=1,retriable=0)
            return

        debug("Received valid ACK for message from %s", self.address)
        justSent = self.messageList[0]
        justSentHandle = self.handleList[0]
        del self.messageList[0]
        del self.handleList[0]
        if self.sentCallback is not None:
            self.sentCallback(justSent, justSentHandle)

        self.beginNextMessage()

    def handleFail(self, retriable):
	"""Invoked when a message is not deliverable."""
	if self.failCallback is not None:
	    for msg, handle in zip(self.messageList, self.handleList):
		self.failCallback(msg,handle,retriable)

LISTEN_BACKLOG = 10 # ???? Is something else more reasonable?
class MMTPAsyncServer(AsyncServer):
    """A helper class to invoke AsyncServer, MMTPServerConnection, and
       MMTPClientConnection, with a function to add new connections, and
       callbacks for message success and failure."""
    def __init__(self, config, tls):
	AsyncServer.__init__(self)

        self.context = tls
	# FFFF Don't always listen; don't always retransmit!
	# FFFF Support listening on specific IPs
        self.listener = ListenConnection(config['Incoming/MMTP']['IP'],
                                         config['Incoming/MMTP']['Port'],
					 LISTEN_BACKLOG,
                                         self._newMMTPConnection)
	#self.config = config
        self.listener.register(self)

    def _newMMTPConnection(self, sock):
	"""helper method.  Creates and registers a new server connection when
	   the listener socket gets a hit."""
        # FFFF Check whether incoming IP is allowed!
        tls = self.context.sock(sock, serverMode=1)
        sock.setblocking(0)
        con = MMTPServerConnection(sock, tls, self.onMessageReceived)
        con.register(self)
	return con
    
    def stopListening(self):
	self.listener.shutdown()

    def sendMessages(self, ip, port, keyID, messages, handles):
	"""Begin sending a set of messages to a given server."""
	# ???? Can we remove these asserts yet?
	for m,h in zip(messages, handles):
	    assert len(m) == MESSAGE_LEN
	    assert len(h) < 32
	    
        con = MMTPClientConnection(self.context,
				   ip, port, keyID, messages, handles,
                                   self.onMessageSent, 
				   self.onMessageUndeliverable)
        con.register(self)

    def onMessageReceived(self, msg):
        pass

    def onMessageUndeliverable(self, msg, handle, retriable):
	pass

    def onMessageSent(self, msg, handle):
        pass
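
# Illustrative sketch (not part of the original commit): a subclass that
# wires the callback hooks above to application logic.  The log calls below
# stand in for real queue bookkeeping (see ServerMain).
class _ExampleMMTPServer(MMTPAsyncServer):
    def onMessageReceived(self, msg):
        debug("Received a %s-byte message", len(msg))
    def onMessageSent(self, msg, handle):
        debug("Delivered message with handle %r", handle)
    def onMessageUndeliverable(self, msg, handle, retriable):
        debug("Couldn't deliver message %r (retriable=%s)", handle, retriable)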


--- NEW FILE: Modules.py ---
# Copyright 2002 Nick Mathewson.  See LICENSE for licensing information.
# $Id: Modules.py,v 1.1 2002/12/11 06:58:55 nickm Exp $

"""mixminion.server.Modules

   Code to support pluggable exit module functionality; implementation
   for built-in modules.
   """
# FFFF We may, someday, want to support non-exit modules here.
# FFFF Maybe we should refactor MMTP delivery here too.

__all__ = [ 'ModuleManager', 'DeliveryModule',
	    'DELIVER_OK', 'DELIVER_FAIL_RETRY', 'DELIVER_FAIL_NORETRY'
	    ]

import os
import re
import sys
import smtplib
import socket
import base64

import mixminion.Config
import mixminion.Packet
import mixminion.BuildMessage
from mixminion.Config import ConfigError, _parseBoolean, _parseCommand
from mixminion.Common import LOG, createPrivateDir, MixError, isSMTPMailbox, \
     isPrintingAscii

import mixminion.server.Queue

# Return values for processMessage
DELIVER_OK = 1
DELIVER_FAIL_RETRY = 2
DELIVER_FAIL_NORETRY = 3

class DeliveryModule:
    """Abstract base for modules; delivery modules should implement
       the methods in this class.

       A delivery module has the following responsibilities:
           * It must have a 0-argument constructor.
           * If it is configurable, it must be able to specify its options,
             validate its configuration, and configure itself.
           * If it is advertisable, it must provide a server info block.
           * It must know its own name.
	   * It must know which types it handles.
	   * Of course, it needs to know how to deliver a message."""
    # FFFF DeliveryModules need to know about the AsyncServer object in
    # FFFF case they support asynchronous delivery.
    def __init__(self):
	"Zero-argument constructor, as required by Module protocol."
	pass

    def getConfigSyntax(self):
	"""Return a map from section names to section syntax, as described
	   in Config.py"""
        raise NotImplementedError("getConfigSyntax")

    def validateConfig(self, sections, entries, lines, contents):
	"""See mixminion.Config.validate"""
        pass

    def configure(self, config, manager):
	"""Configure this object using a given Config object, and (if
	   required) register it with the module manager."""
        raise NotImplementedError("configure")

    def getServerInfoBlock(self):
	"""Return a block for inclusion in a server descriptor."""
        raise NotImplementedError("getServerInfoBlock")

    def getName(self):
	"""Return the name of this module.  This name may be used to construct
	   directory paths, so it shouldn't contain any funny characters."""
        raise NotImplementedError("getName")

    def getExitTypes(self):
	"""Return a sequence of numeric exit types that this module can
           handle."""
        raise NotImplementedError("getExitTypes")

    def createDeliveryQueue(self, queueDir):
	"""Return a DeliveryQueue object suitable for delivering messages
	   via this module.  The default implementation returns a
	   SimpleModuleDeliveryQueue,  which (though adequate) doesn't
	   batch messages intended for the same destination.

	   For the 'address' component of the delivery queue, modules must
	   accept a tuple of: (exitType, address, tag).  If 'tag' is None,
	   the message has been decrypted; if 'tag' is 'err', the message is
	   corrupt.  Otherwise, the message is either a reply or an encrypted
	   forward message
	   """
        return SimpleModuleDeliveryQueue(self, queueDir)

    def processMessage(self, message, tag, exitType, exitInfo):
	"""Given a message with a given exitType and exitInfo, try to deliver
           it.  'tag' is as decribed in createDeliveryQueue. Return one of:
            DELIVER_OK (if the message was successfully delivered),
	    DELIVER_FAIL_RETRY (if the message wasn't delivered, but might be
              deliverable later), or
	    DELIVER_FAIL_NORETRY (if the message shouldn't be tried later).

	   (This method is only used by your delivery queue; if you use
	    a nonstandard delivery queue, you don't need to implement this."""
        raise NotImplementedError("processMessage")

class ImmediateDeliveryQueue:
    """Helper class usable as delivery queue for modules that don't
       actually want a queue.  Such modules should have very speedy
       processMessage() methods, and should never have delivery fail."""
    ##Fields:
    #  module: the underlying DeliveryModule object.
    def __init__(self, module):
	self.module = module

    def queueDeliveryMessage(self, (exitType, address, tag), message):
	"""Instead of queueing our message, pass it directly to the underlying
	   DeliveryModule."""	
	try:
	    res = self.module.processMessage(message, tag, exitType, address)
	    if res == DELIVER_OK:
		return
	    elif res == DELIVER_FAIL_RETRY:
		LOG.error("Unable to retry delivery for message")
	    else:
		LOG.error("Unable to deliver message")
	except:
	    LOG.error_exc(sys.exc_info(),
			       "Exception delivering message")

    def sendReadyMessages(self):
	# We do nothing here; we already delivered the messages
	pass

class SimpleModuleDeliveryQueue(mixminion.server.Queue.DeliveryQueue):
    """Helper class used as a default delivery queue for modules that
       don't care about batching messages to like addresses."""
    ## Fields: 
    # module: the underlying module.
    def __init__(self, module, directory):
	mixminion.server.Queue.DeliveryQueue.__init__(self, directory)
	self.module = module

    def _deliverMessages(self, msgList):
	for handle, addr, message, n_retries in msgList:	
	    try:
		exitType, address, tag = addr
		result = self.module.processMessage(message,tag,exitType,address)
		if result == DELIVER_OK:
		    self.deliverySucceeded(handle)
		elif result == DELIVER_FAIL_RETRY:
		    self.deliveryFailed(handle, 1)
		else:
		    LOG.error("Unable to deliver message")
		    self.deliveryFailed(handle, 0)
	    except:
		LOG.error_exc(sys.exc_info(),
				   "Exception delivering message")
		self.deliveryFailed(handle, 0)

class ModuleManager:
    """A ModuleManager knows about all of the server modules in the system.

       A module may be in one of three states: unloaded, registered, or
       enabled.  An unloaded module is just a class in a python module.
       A registered module has been loaded, configured, and listed with
       the ModuleManager, but will not receive messages until it is
       enabled.

       Because modules need to tell the ServerConfig object about their
       configuration options, initializing the ModuleManager is usually done
       through ServerConfig.  See ServerConfig.getModuleManager().

       To send messages, call 'queueMessage' for each message to send, then
       call 'sendReadyMessages'.
       """
    ##
    # Fields
    #    syntax: extensions to the syntax configuration in Config.py
    #    modules: a list of DeliveryModule objects
    #    enabled: a set of enabled DeliveryModule objects
    #    nameToModule: Map from module name to module
    #    typeToModule: a map from delivery type to enabled deliverymodule.
    #    path: search path for python modules.
    #    queueRoot: directory where all the queues go.
    #    queues: a map from module name to queue (Queue objects must support
    #            queueMessage and sendReadyMessages as in DeliveryQueue.)

    def __init__(self):
	"Create a new ModuleManager"
        self.syntax = {}
        self.modules = []
        self.enabled = {}

	self.nameToModule = {}
        self.typeToModule = {}
	self.path = []
	self.queueRoot = None
	self.queues = {}

        self.registerModule(MBoxModule())
        self.registerModule(DropModule())
        self.registerModule(MixmasterSMTPModule())

    def _setQueueRoot(self, queueRoot):
	"""Sets a directory under which all modules' queue directories
	   should go."""
        self.queueRoot = queueRoot

    def getConfigSyntax(self):
	"""Returns a dict to extend the syntax configuration in a Config
	   object. Should be called after all modules are registered."""
        return self.syntax

    def registerModule(self, module):
	"""Inform this ModuleManager about a delivery module.  This method
   	   updates the syntax options, but does not enable the module."""
        LOG.info("Loading module %s", module.getName())
        self.modules.append(module)
        syn = module.getConfigSyntax()
        for sec, rules in syn.items():
            if self.syntax.has_key(sec):
                raise ConfigError("Multiple modules want to define [%s]"% sec)
        self.syntax.update(syn)
	self.nameToModule[module.getName()] = module

    def setPath(self, path):
	"""Sets the search path for Python modules"""
	if path:
	    self.path = path.split(":")
	else:
	    self.path = []

    def loadExtModule(self, className):
	"""Load and register a module from a python file.  Takes a classname
           of the format module.Class or package.module.Class.  Raises
	   MixError if the module can't be loaded."""
        ids = className.split(".")
        pyPkg = ".".join(ids[:-1])
        pyClassName = ids[-1]
	orig_path = sys.path[:]
	LOG.info("Loading module %s", className)
        try:
	    sys.path[0:0] = self.path
	    try:
		m = __import__(pyPkg, {}, {}, [pyClassName])
	    except ImportError, e:
		raise MixError("%s while importing %s" %(str(e),className))
        finally:
            sys.path = orig_path
	try:
	    pyClass = getattr(m, pyClassName)
	except AttributeError, e:
	    raise MixError("No class %s in module %s" %(pyClassName,pyPkg))
	try:
	    self.registerModule(pyClass())
	except Exception, e:
	    raise MixError("Error initializing module %s" %className)
	
    def validate(self, sections, entries, lines, contents):
	# (As in ServerConfig)
        for m in self.modules:
            m.validateConfig(sections, entries, lines, contents)

    def configure(self, config):
	self._setQueueRoot(os.path.join(config['Server']['Homedir'],
					'work', 'queues', 'deliver'))
	createPrivateDir(self.queueRoot)
        for m in self.modules:
            m.configure(config, self)

    def enableModule(self, module):
	"""Sets up the module manager to deliver all messages whose exitTypes
            are returned by <module>.getExitTypes() to the module."""
        for t in module.getExitTypes():
            if (self.typeToModule.has_key(t) and
                self.typeToModule[t].getName() != module.getName()):
                LOG.warn("More than one module is enabled for type %x"%t)
            self.typeToModule[t] = module

        LOG.info("Module %s: enabled for types %s",
                      module.getName(),
                      map(hex, module.getExitTypes()))

	queueDir = os.path.join(self.queueRoot, module.getName())
	queue = module.createDeliveryQueue(queueDir)
	self.queues[module.getName()] = queue
        self.enabled[module.getName()] = 1

    def cleanQueues(self):
	"""Remove trash messages from all internal queues."""
	for queue in self.queues.values():
	    queue.cleanQueue()
	
    def disableModule(self, module):
	"""Unmaps all the types for a module object."""
        LOG.info("Disabling module %s", module.getName())
        for t in module.getExitTypes():
            if (self.typeToModule.has_key(t) and
                self.typeToModule[t].getName() == module.getName()):
                del self.typeToModule[t]
	if self.queues.has_key(module.getName()):
	    del self.queues[module.getName()]
        if self.enabled.has_key(module.getName()):
            del self.enabled[module.getName()]

    def queueMessage(self, message, tag, exitType, address):
	"""Queue a message for delivery."""
	# FFFF Support non-exit messages.
        mod = self.typeToModule.get(exitType, None)
        if mod is None:
            LOG.error("Unable to handle message with unknown type %s",
                           exitType)
	    return
	queue = self.queues[mod.getName()]
	LOG.debug("Delivering message %r (type %04x) via module %s",
		       message[:8], exitType, mod.getName())
	try:
	    payload = mixminion.BuildMessage.decodePayload(message, tag)
	except MixError:
	    queue.queueDeliveryMessage((exitType, address, 'err'), message)
	    return
	if payload is None:
            # encrypted message
	    queue.queueDeliveryMessage((exitType, address, tag), message)
	else:
	    # forward message
	    queue.queueDeliveryMessage((exitType, address, None), payload)

    def sendReadyMessages(self):
	for name, queue in self.queues.items():
	    queue.sendReadyMessages()

    def getServerInfoBlocks(self):
        return [ m.getServerInfoBlock() for m in self.modules
                       if self.enabled.get(m.getName(),0) ]
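
# Illustrative sketch (not part of the original commit): the typical
# ModuleManager lifecycle described in the class docstring.  In practice
# this is driven from ServerConfig and ServerMain rather than called
# directly; the arguments here are hypothetical.
def _exampleModuleLifecycle(config, message, tag, exitType, address):
    manager = ModuleManager()      # built-in modules are registered here
    manager.configure(config)      # each module configures/enables itself
    manager.queueMessage(message, tag, exitType, address)
    manager.sendReadyMessages()    # flush every module's delivery queue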

#----------------------------------------------------------------------
class DropModule(DeliveryModule):
    """Null-object pattern: drops all messages it receives."""
    def getConfigSyntax(self):
        return { }
    def getServerInfoBlock(self):
        return ""
    def configure(self, config, manager):
	manager.enableModule(self)
    def getName(self):
        return "DROP"
    def getExitTypes(self):
        return [ mixminion.Packet.DROP_TYPE ]
    def createDeliveryQueue(self, directory):
	return ImmediateDeliveryQueue(self)
    def processMessage(self, message, tag, exitType, exitInfo):
        LOG.debug("Dropping padding message")
        return DELIVER_OK

#----------------------------------------------------------------------
class MBoxModule(DeliveryModule):
    """Implementation for MBOX delivery: sends messages, via SMTP, to
       addresses from a local file.  The file must have the format
          addr: smtpaddr
          addr: smtpaddr
           ...
       
       When we receive a message sent to 'addr', we deliver it to smtpaddr.
       """
    ##
    # Fields:
    #   addresses: a map from address to SMTP address
    #   server: the name of our SMTP server
    #   addressFile: the location of our address file
    #   returnAddress: the address we use in our 'From' line
    #   contact: the contact address we mention in our boilerplate
    #   nickname: our server nickname; for use in our boilerplate
    #   addr: our IP address, or "<Unknown IP>": for use in our boilerplate.
    def __init__(self):
        DeliveryModule.__init__(self)
        self.addresses = {}

    def getConfigSyntax(self):
	# FFFF There should be some way to say that fields are required
	# FFFF if the module is enabled.
        return { "Delivery/MBOX" :
                 { 'Enabled' : ('REQUIRE',  _parseBoolean, "no"),
                   'AddressFile' : ('ALLOW', None, None),
                   'ReturnAddress' : ('ALLOW', None, None),
                   'RemoveContact' : ('ALLOW', None, None),
                   'SMTPServer' : ('ALLOW', None, 'localhost') }
                 }

    def validateConfig(self, sections, entries, lines, contents):
	sec = sections['Delivery/MBOX']
	if not sec.get('Enabled'):
	    return
	for field in ['AddressFile', 'ReturnAddress', 'RemoveContact',
		      'SMTPServer']:
	    if not sec.get(field):
		raise ConfigError("Missing field %s in [Delivery/MBOX]"%field)
        if not os.path.exists(sec['AddressFile']):
            raise ConfigError("Address file %s seems not to exist."%
                              sec['AddressFile'])
	for field in ['ReturnAddress', 'RemoveContact']:
	    if not isSMTPMailbox(sec[field]):
		LOG.warn("Value of %s (%s) doesn't look like an email address",
			 field, sec[field])
	    

    def configure(self, config, moduleManager):
	if not config['Delivery/MBOX'].get("Enabled", 0):
	    moduleManager.disableModule(self)
	    return

	sec = config['Delivery/MBOX']
	self.server = sec['SMTPServer']
	self.addressFile = sec['AddressFile']
	self.returnAddress = sec['ReturnAddress']
	self.contact = sec['RemoveContact']
	# validate should have caught these.
	assert (self.server and self.addressFile and self.returnAddress
		and self.contact)

        self.nickname = config['Server']['Nickname']
        if not self.nickname:
            self.nickname = socket.gethostname()
        self.addr = config['Incoming/MMTP'].get('IP', "<Unknown IP>")

	# Parse the address file.
	self.addresses = {}
        f = open(self.addressFile)
	address_line_re = re.compile(r'\s*([^\s:=]+)\s*[:=]\s*(\S+)')
	try:
	    lineno = 0
	    while 1:
                line = f.readline()
                if not line:
                    break
		line = line.strip()
		lineno += 1
		if line == '' or line[0] == '#':
		    continue
		m = address_line_re.match(line)
		if not m:
		    raise ConfigError("Bad address on line %s of %s"%(
			lineno,self.addressFile))
		self.addresses[m.group(1)] = m.group(2)
		LOG.trace("Mapping MBOX address %s -> %s", m.group(1),
			       m.group(2))
	finally:
	    f.close()

	moduleManager.enableModule(self)

    def getServerInfoBlock(self):
        return """\
                  [Delivery/MBOX]
                  Version: 0.1
               """

    def getName(self):
        return "MBOX"

    def getExitTypes(self):
        return [ mixminion.Packet.MBOX_TYPE ]

    def processMessage(self, message, tag, exitType, address):
	# Determine that message's address;
        assert exitType == mixminion.Packet.MBOX_TYPE
        LOG.trace("Received MBOX message")
        info = mixminion.Packet.parseMBOXInfo(address)
	try:
	    address = self.addresses[info.user]
	except KeyError:
            LOG.error("Unknown MBOX user %r", info.user)
	    return DELIVER_FAIL_NORETRY

	# Escape the message if it isn't plaintext ascii
        msg = _escapeMessageForEmail(message, tag)

	# Generate the boilerplate (FFFF Make this configurable)
        fields = { 'user': address,
                   'return': self.returnAddress,
                   'nickname': self.nickname,
                   'addr': self.addr,
                   'contact': self.contact,
                   'msg': msg }
        msg = """\
To: %(user)s
From: %(return)s
Subject: Anonymous Mixminion message

THIS IS AN ANONYMOUS MESSAGE.  The mixminion server '%(nickname)s' at
%(addr)s has been configured to deliver messages to your address.  
If you do not want to receive messages in the future, contact %(contact)s 
and you will be removed.

%(msg)s""" % fields

        # Deliver the message
        return sendSMTPMessage(self.server, [address], self.returnAddress, msg)

#----------------------------------------------------------------------
class SMTPModule(DeliveryModule):
    """Placeholder for real exit node implementation. 
       For now, use MixmasterSMTPModule"""
    def __init__(self):
        DeliveryModule.__init__(self)
    def getServerInfoBlock(self):
	return "[Delivery/SMTP]\nVersion: 0.1\n"
    def getName(self):
        return "SMTP"
    def getExitTypes(self):
        return [ mixminion.Packet.SMTP_TYPE ]

class MixmasterSMTPModule(SMTPModule):
    """Implements SMTP by relaying messages via Mixmaster nodes.  This
       is kind of unreliable and kludgey, but it does allow us to
       test mixminion by using Mixmaster nodes as exits."""
    # FFFF Mixmaster has tons of options.  Maybe we should use 'em...
    # FFFF ... or maybe we should deliberately ignore them, since
    # FFFF this is only a temporary workaround until enough people 
    # FFFF are running SMTP exit nodes
    ## Fields:
    # server: The path (usually a single server) to use for outgoing messages.
    #    Multiple servers should be separated by commas.
    # subject: The subject line we use for outgoing messages
    # command: The Mixmaster binary.
    # options: Options to pass to the Mixmaster binary when queueing messages
    # tmpQueue: An auxiliary Queue used to hold files so we can pass them to
    #    Mixmaster.  (This should go away; we should use stdin instead.)
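    #
    # Example (illustrative, not part of the original commit): with
    # MixCommand=/usr/bin/mixmaster and Server=mix1,mix2, each queued
    # message is handed off roughly as
    #     /usr/bin/mixmaster -l mix1,mix2 -s 'Type-III Anonymous Message' \
    #         -t user@example.com /path/to/tmp-queue/msgfile
    # followed by a single 'mixmaster -S' to flush the pool.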
    
    def __init__(self):
        SMTPModule.__init__(self)

    def getConfigSyntax(self):
        return { "Delivery/SMTP-Via-Mixmaster" :
                 { 'Enabled' : ('REQUIRE', _parseBoolean, "no"),
                   'MixCommand' : ('REQUIRE', _parseCommand, None),
                   'Server' : ('REQUIRE', None, None),
                   'SubjectLine' : ('ALLOW', None,
                                    'Type-III Anonymous Message'),
                   }
                 }
                   
    def validateConfig(self, sections, entries, lines, contents):
	# Currently, we accept any configuration options that the config allows
        pass

    def configure(self, config, manager):
        sec = config['Delivery/SMTP-Via-Mixmaster']
	if not sec.get("Enabled", 0):
            manager.disableModule(self)
            return
        cmd = sec['MixCommand']
        self.server = sec['Server']
        self.subject = sec['SubjectLine']
        self.command = cmd[0]
        self.options = tuple(cmd[1]) + ("-l", self.server,
					"-s", self.subject)
        manager.enableModule(self)

    def getName(self): 
        return "SMTP_MIX2" 

    def createDeliveryQueue(self, queueDir):
	# We create a temporary queue so we can hold files there for a little
	# while before passing their names to mixmaster.
        self.tmpQueue = mixminion.server.Queue.Queue(queueDir+"_tmp", 1, 1)
        self.tmpQueue.removeAll()
        return _MixmasterSMTPModuleDeliveryQueue(self, queueDir)

    def processMessage(self, message, tag, exitType, smtpAddress):
	"""Insert a message into the Mixmaster queue"""
        assert exitType == mixminion.Packet.SMTP_TYPE
	# parseSMTPInfo will raise a parse error if the mailbox is invalid.
        info = mixminion.Packet.parseSMTPInfo(smtpAddress)

        msg = _escapeMessageForEmail(message, tag)
        handle = self.tmpQueue.queueMessage(msg)

        cmd = self.command
        opts = self.options + ("-t", info.email,
                               self.tmpQueue.getMessagePath(handle))
        code = os.spawnl(os.P_WAIT, cmd, cmd, *opts)
        LOG.debug("Queued Mixmaster message: exit code %s", code)
        self.tmpQueue.removeMessage(handle)
        return DELIVER_OK
                         
    def flushMixmasterPool(self):
	"""Send all pending messages from the Mixmaster queue.  This
	   should be called after invocations of processMessage."""
        cmd = self.command
        LOG.debug("Flushing Mixmaster pool")
        os.spawnl(os.P_NOWAIT, cmd, cmd, "-S")

class _MixmasterSMTPModuleDeliveryQueue(SimpleModuleDeliveryQueue):
    """Delivery queue for _MixmasterSMTPModule.  Same as
       SimpleModuleDeliveryQueue, except that we must call flushMixmasterPool
       after queueing messages for Mixmaster."""
    def _deliverMessages(self, msgList):
        SimpleModuleDeliveryQueue._deliverMessages(self, msgList)
        self.module.flushMixmasterPool()
        
#----------------------------------------------------------------------

def sendSMTPMessage(server, toList, fromAddr, message):
    """Send a single SMTP message.  The message will be delivered to
       toList, and seem to originate from fromAddr.  We use 'server' as an
       MTA."""
    # FFFF This implementation can stall badly if we don't have a fast
    # FFFF local MTA.
    LOG.trace("Sending message via SMTP host %s to %s", server, toList)
    con = smtplib.SMTP(server)
    try:
	con.sendmail(fromAddr, toList, message)
	res = DELIVER_OK
    except smtplib.SMTPException, e:
	LOG.warn("Unsuccessful smtp: "+str(e))
	res = DELIVER_FAIL_RETRY

    con.quit()
    con.close()

    return res

#----------------------------------------------------------------------

def _escapeMessageForEmail(msg, tag):
    """Helper function: Given a message and tag, escape the message if
       it is not plaintext ascii, and wrap it in some standard
       boilerplate.  Add a disclaimer if the message is not ascii.

          msg -- A (possibly decoded) message
	  tag -- One of: a 20-byte decoding tag [if the message is encrypted
                            or a reply]
	                 None [if the message is in plaintext]
                         'err' [if the message was invalid.]

       Returns None on an invalid message."""
    m = _escapeMessage(msg, tag, text=1)
    if m is None:
	return None
    code, msg, tag = m

    if code == 'ENC':
	junk_msg = """\
This message is not in plaintext.  It's either 1) a reply; 2) a forward
message encrypted to you; or 3) junk.\n\n"""
    else:
	junk_msg = ""

    if tag is not None:
	tag = "Decoding handle: "+tag+"\n"
    else:
	tag = ""

    return """\
%s============ ANONYMOUS MESSAGE BEGINS
%s%s============ ANONYMOUS MESSAGE ENDS\n""" %(junk_msg, tag, msg)

def _escapeMessage(message, tag, text=0):
    """Helper: given a decoded message (and possibly its tag), determine
       whether the message is a text plaintext message (code='TXT'), a
       binary plaintext message (code 'BIN'), or an encrypted message/reply
       (code='ENC').  If requested, non-TXT messages are base-64 encoded.
 
       Returns: (code, message, tag), where tag is the decoding tag (for ENC)
       or None (for BIN, TXT).
       Returns None if the message is invalid.

          message -- A (possibly decoded) message
	  tag -- One of: a 20-byte decoding tag [if the message is encrypted
                            or a reply]
	                 None [if the message is in plaintext]
                         'err' [if the message was invalid.]
          text -- flag: if true, non-TXT messages must be base64-encoded.
    """
    if tag == 'err':
	return None
    elif tag is not None:
	code = "ENC"
    else:
	assert tag is None
	if isPrintingAscii(message, allowISO=1):
	    code = "TXT"
	else:
	    code = "BIN"

    if text and (code != "TXT") :
	message = base64.encodestring(message)
    if text and tag:
	tag = base64.encodestring(tag).strip()

    return code, message, tag
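
# Examples of the escaping behavior above (an illustrative sketch; the inputs
# are hypothetical):
#     _escapeMessage("Hello, world", None, text=1)
#         => ("TXT", "Hello, world", None)
#     _escapeMessage("\x00\xff\x00", None, text=1)
#         => ("BIN", <base64 of the message>, None)
#     _escapeMessage(<ciphertext>, <20-byte tag>, text=1)
#         => ("ENC", <base64 of the message>, <base64 of the tag>)
#     _escapeMessage(<anything>, "err")
#         => None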

--- NEW FILE: PacketHandler.py ---
# Copyright 2002 Nick Mathewson.  See LICENSE for licensing information.
# $Id: PacketHandler.py,v 1.1 2002/12/11 06:58:55 nickm Exp $

"""mixminion.PacketHandler: Code to process mixminion packets on a server"""

import mixminion.Crypto as Crypto
import mixminion.Packet as Packet
import mixminion.Common as Common

__all__ = [ 'PacketHandler', 'ContentError' ]

class ContentError(Common.MixError):
    """Exception raised when a packed is malformatted or unacceptable."""
    pass

class PacketHandler:
    """Class to handle processing packets.  Given an incoming packet,
       it removes one layer of encryption, does all necessary integrity
       checks, swaps headers if necessary, re-pads, and decides whether
       to drop the message, relay the message, or send the message to
       an exit handler."""
    ## Fields:
    # privatekey: list of RSA private keys that we accept
    # hashlog: list of HashLog objects corresponding to the keys.
    def __init__(self, privatekey, hashlog):
        """Constructs a new packet handler, given a private key object for
           header encryption, and a hashlog object to prevent replays.

           A sequence of private keys may be provided, if you'd like the
           server to accept messages encrypted with any of them.  Beware,
           though: PK decryption is expensive.  Also, a hashlog must be
           provided for each private key.
        """
        try:
            # Check whether we have a key or a sequence of keys.
            _ = privatekey[0]
            assert len(hashlog) == len(privatekey)
            
            self.privatekey = privatekey
            self.hashlog = hashlog
        except TypeError:
	    # Privatekey is not subscriptable; we must have only one.
            self.privatekey = (privatekey, )
            self.hashlog = (hashlog, )
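
    # Example construction (a sketch; the key and hashlog objects here are
    # hypothetical):
    #     ph = PacketHandler(key, hashlog)                # a single key
    #     ph = PacketHandler([key1, key2], [log1, log2])  # several keys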

    def syncLogs(self):
	"""Sync all this PacketHandler's hashlogs."""
	for h in self.hashlog:
	    h.sync()

    def close(self):
	"""Close all this PacketHandler's hashlogs."""
	for h in self.hashlog:
	    h.close()

    def processMessage(self, msg):
        """Given a 32K mixminion message, processes it completely.

           Returns one of:
                    None [if the message should be dropped.]
                    ("EXIT",
                       (exit_type, exit_info, application_key,
                        tag, payload)) [if this is the exit node]
                    ("QUEUE", (ipv4info, message_out))
                        [if this is a forwarding node]

           May raise CryptoError, ParseError, or ContentError if the packet
           is malformatted, misencrypted, unparseable, repeated, or otherwise
           unhandleable.

           WARNING: This implementation does nothing to prevent timing
           attacks: dropped messages, messages with bad digests, replayed
           messages, and exit messages are all processed faster than
           forwarded messages.  You must prevent timing attacks elsewhere."""
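
        # How a caller typically dispatches on the result (a sketch;
        # 'deliver' and 'relay' are hypothetical helpers):
        #     res = handler.processMessage(pkt)
        #     if res is None:
        #         pass                   # drop it (padding)
        #     elif res[0] == "EXIT":
        #         deliver(*res[1])
        #     elif res[0] == "QUEUE":
        #         relay(*res[1])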

        # Break into headers and payload
        msg = Packet.parseMessage(msg)
        header1 = Packet.parseHeader(msg.header1)

        # Try to decrypt the first subheader.  Try each private key in
        # order.  Only fail if all private keys fail.
        subh = None
        e = None
        for pk, hashlog in zip(self.privatekey, self.hashlog):
            try:
                subh = Crypto.pk_decrypt(header1[0], pk)
                break
            except Crypto.CryptoError, err:
                e = err
        if not subh:
            # Nobody managed to get us the first subheader.  Raise the
            # most-recently-received error.
            raise e

        subh = Packet.parseSubheader(subh) #may raise ParseError

        # Check the version: can we read it?
        if subh.major != Packet.MAJOR_NO or subh.minor != Packet.MINOR_NO:
            raise ContentError("Invalid protocol version")

        # Check the digest of all of header1 but the first subheader.
        if subh.digest != Crypto.sha1(header1[1:]):
            raise ContentError("Invalid digest")

        # Get ready to generate message keys.
        keys = Crypto.Keyset(subh.secret)

        # Replay prevention
        replayhash = keys.get(Crypto.REPLAY_PREVENTION_MODE, 20)
        if hashlog.seenHash(replayhash):
            raise ContentError("Duplicate message detected.")
        else:
            hashlog.logHash(replayhash)

        # If we're meant to drop, drop now.
        rt = subh.routingtype
        if rt == Packet.DROP_TYPE:
            return None

        # Prepare the key to decrypt the header in counter mode.  We'll be
        # using this more than once.
        header_sec_key = Crypto.aes_key(keys.get(Crypto.HEADER_SECRET_MODE))

        # If the subheader says that we have extra blocks of routing info,
        # decrypt and parse them now.
        if subh.isExtended():
            nExtra = subh.getNExtraBlocks() 
            if (rt < Packet.MIN_EXIT_TYPE) or (nExtra > 15):
                # None of the native methods allow multiple blocks; no
                # size can be longer than the number of bytes in the rest
                # of the header.
                raise ContentError("Impossibly long routing info length")

            extra = Crypto.ctr_crypt(header1[1:1+nExtra], header_sec_key)
            subh.appendExtraBlocks(extra)
            remainingHeader = header1[1+nExtra:]
        else:
            nExtra = 0
            remainingHeader = header1[1:]

        # Decrypt the payload.
        payload = Crypto.lioness_decrypt(msg.payload,
                              keys.getLionessKeys(Crypto.PAYLOAD_ENCRYPT_MODE))

        # If we're an exit node, there's no need to process the headers
        # further.
        if rt >= Packet.MIN_EXIT_TYPE:
            return ("EXIT",
                    (rt, subh.getExitAddress(),
                     keys.get(Crypto.APPLICATION_KEY_MODE),
		     subh.getTag(),
                     payload))

        # If we're not an exit node, make sure that we recognize the
        # routing type.
        if rt not in (Packet.SWAP_FWD_TYPE, Packet.FWD_TYPE):
            raise ContentError("Unrecognized Mixminion routing type")

        # Pad the rest of header 1
        remainingHeader = remainingHeader +\
                          Crypto.prng(keys.get(Crypto.PRNG_MODE),
                                      Packet.HEADER_LEN-len(remainingHeader))

        # Decrypt the rest of header 1, encrypting the padding.
        header1 = Crypto.ctr_crypt(remainingHeader, header_sec_key, nExtra*128)

        # Decrypt header 2.
        header2 = Crypto.lioness_decrypt(msg.header2,
                           keys.getLionessKeys(Crypto.HEADER_ENCRYPT_MODE))

        # If we're the swap node, (1) decrypt the payload with a hash of 
	# header2... (2) decrypt header2 with a hash of the payload...
	# (3) and swap the headers.
        if rt == Packet.SWAP_FWD_TYPE:
	    hkey = Crypto.lioness_keys_from_header(header2)
	    payload = Crypto.lioness_decrypt(payload, hkey)

            hkey = Crypto.lioness_keys_from_payload(payload)
            header2 = Crypto.lioness_decrypt(header2, hkey)

            header1, header2 = header2, header1

        # Build the address object for the next hop
        address = Packet.parseIPV4Info(subh.routinginfo)

        # Construct the message for the next hop.
        msg = Packet.Message(header1, header2, payload).pack()

        return ("QUEUE", (address, msg))

--- NEW FILE: Queue.py ---
# Copyright 2002 Nick Mathewson.  See LICENSE for licensing information.
# $Id: Queue.py,v 1.1 2002/12/11 06:58:55 nickm Exp $

"""mixminion.server.Queue

   Facility for fairly secure, directory-based, unordered queues. 
   """

import os
import base64
import time
import stat
import cPickle

from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
     createPrivateDir
from mixminion.Crypto import AESCounterPRNG

__all__ = [ 'Queue', 'DeliveryQueue', 'TimedMixQueue', 'CottrellMixQueue',
	    'BinomialCottrellMixQueue' ]

# Mode to pass to open(2) for creating a new file, and dying if it already
# exists.
_NEW_MESSAGE_FLAGS = os.O_WRONLY+os.O_CREAT+os.O_EXCL
# On windows or mac, binary != text.
_NEW_MESSAGE_FLAGS += getattr(os, 'O_BINARY', 0)

# Any inp_* files older than INPUT_TIMEOUT seconds old are assumed to be
# trash.
INPUT_TIMEOUT = 6000

# If we've been cleaning for more than CLEAN_TIMEOUT seconds, assume the
# old clean is dead.
CLEAN_TIMEOUT = 120

class Queue:
    """A Queue is an unordered collection of files with secure insert, move,
       and delete operations.

       Implementation: a queue is a directory of 'messages'.  Each
       filename in the directory has a name in one of the following
       formats:
             rmv_HANDLE  (A message waiting to be deleted)
             msg_HANDLE  (A message waiting in the queue.)
             inp_HANDLE  (An incomplete message being created.)
       (Where HANDLE is a randomly chosen 12-character selection from the
       characters 'A-Za-z0-9+-'.  [Collision probability is negligible.])
       """
       # How negligible?  A back-of-the-envelope approximation: The chance
       # of a collision reaches .1% when you have 3e9 messages in a single
       # queue.  If Alice somehow manages to accumulate a 96 gigabyte
       # backlog, we'll have bigger problems than name collision... such
       # as the fact that most Unices behave badly when confronted with
       # 3 billion files in the same directory... or the fact that,
       # at today's processor speeds, it will take Alice 3 or 4
       # CPU-years to clear her backlog.
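
    # Example usage (a sketch; the path is hypothetical and the queue is
    # assumed to start out empty):
    #     q = Queue("/var/spool/minion/example-queue", create=1, scrub=1)
    #     h = q.queueMessage("some message contents")
    #     assert q.count() == 1
    #     contents = q.messageContents(h)
    #     q.removeMessage(h)
    #     q.cleanQueue()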

    # Fields:   rng--a random number generator for creating new messages
    #                and getting a random slice of the queue.
    #           dir--the location of the queue.
    #           n_entries: the number of complete messages in the queue.
    #                 <0 if we haven't counted yet.
    def __init__(self, location, create=0, scrub=0):
        """Creates a queue object for a given directory, 'location'.  If
           'create' is true, creates the directory if necessary.  If 'scrub'
           is true, removes any incomplete or invalidated messages from the
           Queue."""

        secureDelete([]) # Make sure secureDelete is configured. HACK!

        self.rng = AESCounterPRNG()
        self.dir = location

        if not os.path.isabs(location):
            LOG.warn("Queue path %s isn't absolute.", location)

        if os.path.exists(location) and not os.path.isdir(location):
            raise MixFatalError("%s is not a directory" % location)

	createPrivateDir(location, nocreate=(not create))

        if scrub:
            self.cleanQueue()

	# Count messages on first time through.
	self.n_entries = -1

    def queueMessage(self, contents):
        """Creates a new message in the queue whose contents are 'contents',
           and returns a handle to that message."""
        f, handle = self.openNewMessage()
        f.write(contents)
        self.finishMessage(f, handle)
        return handle

    def queueObject(self, object):
	"""Queue an object using cPickle, and return a handle to that
	   object."""
        f, handle = self.openNewMessage()
        cPickle.dump(object, f, 1)
        self.finishMessage(f, handle)
        return handle

    def count(self, recount=0):
        """Returns the number of complete messages in the queue."""
	if self.n_entries >= 0 and not recount:
	    return self.n_entries
	else:
	    res = 0
	    for fn in os.listdir(self.dir):
		if fn.startswith("msg_"):
		    res += 1
	    self.n_entries = res
	    return res

    def pickRandom(self, count=None):
        """Returns a list of 'count' handles to messages in this queue.
           The messages are chosen randomly, and returned in a random order.

           If there are fewer than 'count' messages in the queue, all the
           messages will be returned."""
        handles = [ fn[4:] for fn in os.listdir(self.dir)
		           if fn.startswith("msg_") ]

	return self.rng.shuffle(handles, count)

    def getAllMessages(self):
	"""Returns handles for all messages currently in the queue.
           Note: this ordering is not guaranteed to be random"""
        return [fn[4:] for fn in os.listdir(self.dir) if fn.startswith("msg_")]

    def removeMessage(self, handle):
        """Given a handle, removes the corresponding message from the queue."""
        self.__changeState(handle, "msg", "rmv")

    def removeAll(self):
        """Removes all messages from this queue."""
        for m in os.listdir(self.dir):
            if m[:4] in ('inp_', 'msg_'):
                self.__changeState(m[4:], m[:3], "rmv")
	self.n_entries = 0
        self.cleanQueue()

    def moveMessage(self, handle, queue):
        """Given a handle and a queue, moves the corresponding message from
           this queue to the queue provided.  Returns a new handle for
           the message in the destination queue."""
        # Since we're switching handle, we don't want to just rename;
        # We really want to copy and delete the old file.
        newHandle = queue.queueMessage(self.messageContents(handle))
        self.removeMessage(handle)
        return newHandle

    def getMessagePath(self, handle):
        """Given a handle for an existing message, return the name of the
           file that contains that message."""
        return os.path.join(self.dir, "msg_"+handle)

    def openMessage(self, handle):
        """Given a handle for an existing message, returns a file descriptor
           open to read that message."""
        return open(os.path.join(self.dir, "msg_"+handle), 'rb')

    def messageContents(self, handle):
        """Given a message handle, returns the contents of the corresponding
           message."""
        f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
        s = f.read()
        f.close()
        return s

    def getObject(self, handle):
        """Given a message handle, read and unpickle the contents of the
	   corresponding message."""
        f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
        res = cPickle.load(f)
        f.close()
        return res

    def openNewMessage(self):
        """Returns (file, handle) tuple to create a new message.  Once
           you're done writing, you must call finishMessage to
           commit your changes, or abortMessage to reject them."""
        handle = self.__newHandle()
        fname = os.path.join(self.dir, "inp_"+handle)
        fd = os.open(fname, _NEW_MESSAGE_FLAGS, 0600)
        return os.fdopen(fd, 'wb'), handle

    def finishMessage(self, f, handle):
        """Given a file and a corresponding handle, closes the file
           commits the corresponding message."""
        f.close()
        self.__changeState(handle, "inp", "msg")

    def abortMessage(self, f, handle):
        """Given a file and a corresponding handle, closes the file
           rejects the corresponding message."""
        f.close()
        self.__changeState(handle, "inp", "rmv")

    def cleanQueue(self):
        """Removes all timed-out or trash messages from the queue.

           Returns 1 if a clean is already in progress; otherwise
           returns 0.
        """

        now = time.time()
        cleanFile = os.path.join(self.dir,".cleaning")

	cleaning = 1
	while cleaning:
	    try:
		# Try to get the .cleaning lock file.  If we can create it,
		# we're the only cleaner around.
		fd = os.open(cleanFile, os.O_WRONLY+os.O_CREAT+os.O_EXCL, 0600)
		os.write(fd, str(now))
		os.close(fd)
		cleaning = 0
	    except OSError:
		try:
		    # If we can't create the file, see if it's too old.  If it
		    # is too old, delete it and try again.  If it isn't, there
		    # may be a live clean in progress.
		    s = os.stat(cleanFile)
		    if now - s[stat.ST_MTIME] > CLEAN_TIMEOUT:
			os.unlink(cleanFile)
		    else:
			return 1
		except OSError:
		    # If the 'stat' or 'unlink' calls above fail, then 
		    # .cleaning must not exist, or must not be readable
		    # by us.
		    if os.path.exists(cleanFile):
			# In the latter case, bail out.
			return 1

        rmv = []
        allowedTime = int(time.time()) - INPUT_TIMEOUT
        for m in os.listdir(self.dir):
            if m.startswith("rmv_"):
                rmv.append(os.path.join(self.dir, m))
            elif m.startswith("inp_"):
                s = os.stat(os.path.join(self.dir, m))
                if s[stat.ST_MTIME] < allowedTime:
                    self.__changeState(m[4:], "inp", "rmv")
                    rmv.append(os.path.join(self.dir, m))
        _secureDelete_bg(rmv, cleanFile)
        return 0

    def __changeState(self, handle, s1, s2):
        """Helper method: changes the state of message 'handle' from 's1'
           to 's2', and changes the internal count."""
        os.rename(os.path.join(self.dir, s1+"_"+handle),
                  os.path.join(self.dir, s2+"_"+handle))
	if self.n_entries < 0:
	    return
	if s1 == 'msg' and s2 != 'msg':
	    self.n_entries -= 1
	elif s1 != 'msg' and s2 == 'msg':
	    self.n_entries += 1

    def __newHandle(self):
        """Helper method: creates a new random handle."""
        junk = self.rng.getBytes(9)
        return base64.encodestring(junk).strip().replace("/","-")

class DeliveryQueue(Queue):
    """A DeliveryQueue implements a queue that greedily sends messages
       to outgoing streams that occasionally fail.  Messages in a
       DeliveryQueue are no longer unstructured text, but rather
       tuples of: (n_retries, addressing info, msg).

       This class is abstract. Implementors of this class should
       subclass it to add a _deliverMessages method.  Multiple
       invocations of this method may be active at a given time.  Upon
       success or failure, this method should cause deliverySucceeded
       or deliveryFailed to be called as appropriate.

       Users of this class will probably only want to call the
       queueDeliveryMessage and sendReadyMessages methods.

       This class caches information about the directory state; it
       won't play nice if multiple instances are looking at the same
       directory.
    """
    ###
    # Fields:
    #    sendable -- A list of handles for all messages
    #           that we're not currently sending.
    #    pending -- Dict from handle->1, for all messages that we're
    #           currently sending.

    def __init__(self, location):
	Queue.__init__(self, location, create=1, scrub=1)
	self._rescan()

    def _rescan(self):
	"""Rebuild the internal state of this queue from the underlying
           directory."""
	self.pending = {}
	self.sendable = self.getAllMessages()

    def queueMessage(self, msg):
	if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")

    def queueDeliveryMessage(self, addr, msg, retry=0):
	"""Schedule a message for delivery.
	     addr -- An object to indicate the message's destination
	     msg -- the message itself
	     retry -- how many times so far have we tried to send?"""

        handle = self.queueObject( (retry, addr, msg) )
	self.sendable.append(handle)

	return handle

    def get(self,handle):
	"""Returns a (n_retries, addr, msg) payload for a given
	   message handle."""
        return self.getObject(handle)
	
    def sendReadyMessages(self):
	"""Sends all messages which are not already being sent."""

	handles = self.sendable
	messages = []
	self.sendable = []
	for h in handles:
	    retries, addr, msg = self.getObject(h)
	    messages.append((h, addr, msg, retries))
	    self.pending[h] = 1
	if messages:
	    self._deliverMessages(messages)

    def _deliverMessages(self, msgList):
	"""Abstract method; Invoked with a list of
  	   (handle, addr, message, n_retries) tuples every time we have a batch
	   of messages to send.

           For every handle in the list, deliverySucceeded or deliveryFailed
	   should eventually be called, or the message will sit in the queue
	   indefinitely, without being retried."""

        # We could implement this as a single _deliverMessage(h,addr,m,n)
	# method, but that wouldn't allow implementations to batch
	# messages being sent to the same address.

	raise NotImplementedError("_deliverMessages")
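
    # A minimal concrete subclass might look like this (a sketch; 'transmit'
    # is a hypothetical function that returns a true value on success):
    #
    #     class TrivialDeliveryQueue(DeliveryQueue):
    #         def _deliverMessages(self, msgList):
    #             for handle, addr, message, n_retries in msgList:
    #                 if transmit(addr, message):
    #                     self.deliverySucceeded(handle)
    #                 else:
    #                     self.deliveryFailed(handle, retriable=1)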

    def deliverySucceeded(self, handle):
	"""Removes a message from the outgoing queue.  This method
	   should be invoked after the corresponding message has been
	   successfully delivered.
        """
	self.removeMessage(handle)
	del self.pending[handle]

    def deliveryFailed(self, handle, retriable=0):
	"""Removes a message from the outgoing queue, or requeues it
	   for delivery at a later time.  This method should be
	   invoked after an attempt to deliver the corresponding
	   message has failed."""
	del self.pending[handle]
	if retriable:
	    # Queue the new one before removing the old one, for
	    # crash-proofness
	    retries, addr, msg = self.getObject(handle)
	    # FFFF This test makes us never retry past the 10th attempt.
	    # FFFF That's wrong; we should be smarter.
	    if retries <= 10:
		self.queueDeliveryMessage(addr, msg, retries+1)
	self.removeMessage(handle)

class TimedMixQueue(Queue):
    """A TimedMixQueue holds a group of files, and returns some of them
       as requested, according to a mixing algorithm that sends a batch
       of messages every N seconds."""
    # FFFF : interval is unused.
    ## Fields:
    #   interval: scanning interval, in seconds.
    def __init__(self, location, interval=600):
	"""Create a TimedMixQueue that sends its entire batch of messages
	   every 'interval' seconds."""
        Queue.__init__(self, location, create=1, scrub=1)
	self.interval = interval

    def getBatch(self):
	"""Return handles for all messages that the pool is currently ready
	   to send in the next batch"""
	return self.pickRandom()

    def getInterval(self):
	return self.interval

class CottrellMixQueue(TimedMixQueue):
    """A CottrellMixQueue holds a group of files, and returns some of them
       as requested, according the Cottrell (timed dynamic-pool) mixing
       algorithm from Mixmaster."""
    # FFFF : interval is unused.
    ## Fields:
    # interval: scanning interval, in seconds.
    # minPool: Minimum number of messages to keep in pool.
    # minSend: Minimum number of messages above minPool before we consider
    #      sending.
    # sendRate: Largest fraction of the pool to send at a time.
    def __init__(self, location, interval=600, minPool=6, minSend=1,
		 sendRate=.7):
	"""Create a new queue that yields a batch of message every 'interval'
	   seconds, always keeps <minPool> messages in the pool, never sends
	   unless it has <minPool>+<minSend> messages, and never sends more
	   than <sendRate> * the current pool size.

	   If 'minSend'==1, this is a real Cottrell (type-II) mix pool.
	   Otherwise, this is a generic 'timed dynamic-pool' mix pool.  (Note
	   that it is still a matter of some controversy whether it ever
	   makes sense to set minSend != 1.)
	   """

        # Note that there was a bit of confusion here: earlier versions
	# implemented an algorithm called "mixmaster" that wasn't actually the
	# mixmaster algorithm.  I picked up the other algorithm from an early
	# draft of Roger, Paul, and Andrei's 'Batching Taxonomy' paper (since
	# corrected); they seem to have gotten it from Anja Jerichow's
	# Phd. thesis ("Generalisation and Security Improvement of
	# Mix-mediated Anonymous Communication") of 2000.
	#
	# *THIS* is the algorithm that the current 'Batching Taxonomy' paper
	# says that Cottrell says is the real thing.

	TimedMixQueue.__init__(self, location, interval)
	self.minPool = minPool	
	self.minSend = minSend
	self.sendRate = sendRate

    def _getBatchSize(self):
	"Helper method: returns the number of messages to send."
	pool = self.count()
	if pool >= (self.minPool + self.minSend):
	    sendable = pool - self.minPool
	    return min(sendable, max(1, int(pool * self.sendRate)))
	else:
	    return 0
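
    # Worked example with the default parameters (minPool=6, minSend=1,
    # sendRate=.7):
    #     pool of  6 messages -> 0 sent (below minPool+minSend)
    #     pool of 10 messages -> min(10-6, max(1, int(10*.7))) = min(4, 7)  = 4
    #     pool of 30 messages -> min(30-6, max(1, int(30*.7))) = min(24, 21) = 21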

    def getBatch(self):
	"Returns a list of handles for the next batch of messages to send."
	n = self._getBatchSize()
	if n:
	    return self.pickRandom(n)
	else:
	    return []

class BinomialCottrellMixQueue(CottrellMixQueue):
    """Same algorithm as CottrellMixQueue, but instead of sending N messages
       from the pool of size P, sends each message with probability N/P."""
    def getBatch(self):
	n = self._getBatchSize()
	if n == 0:
	    return []
	msgProbability = n / float(self.count())
	return self.rng.shuffle([ h for h in self.getAllMessages()
				    if self.rng.getFloat() < msgProbability ])
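
    # Example (a sketch): if _getBatchSize() would return 4 from a pool of 10
    # messages, each message is sent independently with probability 4/10, so
    # 4 messages are sent per batch on average.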

def _secureDelete_bg(files, cleanFile):
    """Helper method: delete files in another thread, removing 'cleanFile' 
       once we're done."""
      
    pid = os.fork()
    if pid != 0:
        return pid
    # Now we're in the child process.
    try:
        secureDelete(files, blocking=1)
    except OSError:
        # This is sometimes thrown when shred finishes before waitpid.
        pass
    try:
        os.unlink(cleanFile)
    except OSError:
        pass
    os._exit(0)
    return None # Never reached.

--- NEW FILE: ServerMain.py ---
# Copyright 2002 Nick Mathewson.  See LICENSE for licensing information.
# $Id: ServerMain.py,v 1.1 2002/12/11 06:58:55 nickm Exp $

"""mixminion.ServerMain

   The main loop and related functionality for a Mixminion server.

   See the "MixminionServer" class for more information about how it
   all works. """
#FFFF We need support for encrypting private keys.

import os
import getopt
import sys
import time
import bisect

import mixminion._minionlib
import mixminion.Crypto
from mixminion.ServerInfo import ServerKeyset, ServerInfo, \
     generateServerDescriptorAndKeys
from mixminion.Common import LOG, MixFatalError, MixError, secureDelete, \
     createPrivateDir, previousMidnight, ceilDiv, formatDate, formatTime

import mixminion.server.Queue
import mixminion.server.MMTPServer
import mixminion.server.Modules
import mixminion.server.HashLog
import mixminion.server.PacketHandler


class ServerKeyring:
    """A ServerKeyring remembers current and future keys, descriptors, and
       hash logs for a mixminion server.

       FFFF We need a way to generate keys as needed, not just a month's
       FFFF worth of keys up front. 
       """
    ## Fields:
    # homeDir: server home directory
    # keyDir: server key directory
    # keySloppiness: fudge-factor: how forgiving are we about key liveness?
    # keyIntervals: list of (start, end, keyset Name)
    # liveKey: (start, end, keyset name) tuple for the current key.
    # nextKeyRotation: time_t when the current key expires.
    # keyRange: tuple of (firstKey, lastKey) to represent which key names
    #      have keys on disk.

    ## Directory layout:
    #    MINION_HOME/work/queues/incoming/ [Queue of received,unprocessed pkts]
    #                             mix/ [Mix pool]
    #                             outgoing/ [Messages for mmtp delivery]
    #                             deliver/mbox/ []
    #                      tls/dhparam [Diffie-Hellman parameters]
    #                      hashlogs/hash_1*  [HashLogs of packet hashes 
    #                               hash_2*    corresponding to key sets]
    #                                ...  
    #                 log [Messages from the server]
    #                 keys/identity.key [Long-lived identity PK]
    #                      key_1/ServerDesc [Server descriptor]
    #                            mix.key [packet key]
    #                            mmtp.key [mmtp key]
    #                            mmtp.cert [mmtp key x509 cert]
    #                      key_2/...
    #                 conf/miniond.conf [configuration file]
    #                       ....

    # FFFF Support to put keys/queues in separate directories.

    def __init__(self, config):
	"Create a ServerKeyring from a config object"
	self.configure(config)

    def configure(self, config):
	"Set up a SeverKeyring from a config object"
	self.config = config
	self.homeDir = config['Server']['Homedir']
	self.keyDir = os.path.join(self.homeDir, 'keys')
	self.hashDir = os.path.join(self.homeDir, 'work', 'hashlogs')
	self.keySloppiness = config['Server']['PublicKeySloppiness'][2]
	self.checkKeys()

    def checkKeys(self):
	"""Internal method: read information about all this server's
	   currently-prepared keys from disk."""
        self.keyIntervals = []
	firstKey = sys.maxint
	lastKey = 0

	LOG.debug("Scanning server keystore at %s", self.keyDir)

	if not os.path.exists(self.keyDir):
	    LOG.info("Creating server keystore at %s", self.keyDir)
	    createPrivateDir(self.keyDir)

	# Iterate over the entries in HOME/keys
        for dirname in os.listdir(self.keyDir):
	    # Skip any that aren't directories named "key_INT"
	    if not os.path.isdir(os.path.join(self.keyDir,dirname)):
		continue
            if not dirname.startswith('key_'):
		LOG.warn("Unexpected directory %s under %s",
			      dirname, self.keyDir)
                continue
            keysetname = dirname[4:]
	    try:
		setNum = int(keysetname)
		# keep track of the first and last used key numbers
		if setNum < firstKey: firstKey = setNum
		if setNum > lastKey: lastKey = setNum
	    except ValueError:
		LOG.warn("Unexpected directory %s under %s",
			      dirname, self.keyDir)
		continue

	    # Find the server descriptor...
            d = os.path.join(self.keyDir, dirname)
            si = os.path.join(d, "ServerDesc")
            if os.path.exists(si):
                inf = ServerInfo(fname=si, assumeValid=1)
		# And find out when it's valid.
                t1 = inf['Server']['Valid-After']
                t2 = inf['Server']['Valid-Until']
                self.keyIntervals.append( (t1, t2, keysetname) )
		LOG.debug("Found key %s (valid from %s to %s)",
			       dirname, formatDate(t1), formatDate(t2))
	    else:
		LOG.warn("No server descriptor found for key %s"%dirname)

	# Now, sort the key intervals by starting time.
        self.keyIntervals.sort()
	self.keyRange = (firstKey, lastKey)

	# Now we try to see whether we have more or less than 1 key in effect
	# for a given time.
	for idx in xrange(len(self.keyIntervals)-1):
	    end = self.keyIntervals[idx][1]
	    start = self.keyIntervals[idx+1][0]
	    if start < end:
		LOG.warn("Multiple keys for %s.  That's unsupported.",
			      formatDate(end))
	    elif start > end:
		LOG.warn("Gap in key schedule: no key from %s to %s",
			      formatDate(end), formatDate(start))

	self.nextKeyRotation = 0 # Make sure that now > nextKeyRotation before
	                         # we call _getLiveKey()
	self._getLiveKey()       # Set up liveKey, nextKeyRotation.

    def getIdentityKey(self):
	"""Return this server's identity key.  Generate one if it doesn't
	   exist."""
	password = None # FFFF Use this, somehow.
	fn = os.path.join(self.keyDir, "identity.key")
	bits = self.config['Server']['IdentityKeyBits']
	if os.path.exists(fn):
	    key = mixminion.Crypto.pk_PEM_load(fn, password)
	    keylen = key.get_modulus_bytes()*8
	    if keylen != bits:
		LOG.warn(
		    "Stored identity key has %s bits, but you asked for %s.",
		    keylen, bits)
	else:
	    LOG.info("Generating identity key. (This may take a while.)")
	    key = mixminion.Crypto.pk_generate(bits)
	    mixminion.Crypto.pk_PEM_save(key, fn, password)
	    LOG.info("Generated %s-bit identity key.", bits)

	return key

    def removeIdentityKey(self):
        """Remove this server's identity key."""
        fn = os.path.join(self.keyDir, "identity.key")
        if not os.path.exists(fn):
            LOG.info("No identity key to remove.")
        else:
            LOG.warn("Removing identity key in 10 seconds")
            time.sleep(10)
            LOG.warn("Removing identity key")
            secureDelete([fn], blocking=1)

	dhfile = os.path.join(self.homeDir, 'work', 'tls', 'dhparam')
        if os.path.exists(dhfile):
            LOG.info("Removing Diffie-Hellman parameters file")
            secureDelete([dhfile], blocking=1)

    def createKeys(self, num=1, startAt=None):
	"""Generate 'num' public keys for this server. If startAt is provided,
           make the first key become valid at 'startAt'.  Otherwise, make the
	   first key become valid right after the last key we currently have
	   expires.  If we have no keys now, make the first key start now."""
        # FFFF Use this.
	#password = None

	if startAt is None:
	    if self.keyIntervals:
		startAt = self.keyIntervals[-1][1]+60
	    else:
		startAt = time.time()+60

	startAt = previousMidnight(startAt)

	firstKey, lastKey = self.keyRange

	for _ in xrange(num):
	    if firstKey == sys.maxint:
		keynum = firstKey = lastKey = 1
	    elif firstKey > 1:
		firstKey -= 1
		keynum = firstKey
	    else:
		lastKey += 1
		keynum = lastKey

	    keyname = "%04d" % keynum

	    nextStart = startAt + self.config['Server']['PublicKeyLifetime'][2]

	    LOG.info("Generating key %s to run from %s through %s (GMT)",
		     keyname, formatDate(startAt), 
		     formatDate(nextStart-3600))
 	    generateServerDescriptorAndKeys(config=self.config,
					    identityKey=self.getIdentityKey(),
					    keyname=keyname,
					    keydir=self.keyDir,
					    hashdir=self.hashDir,
					    validAt=startAt)
	    startAt = nextStart

        self.checkKeys()

    def removeDeadKeys(self, now=None):
	"""Remove all keys that have expired"""
        self.checkKeys()

        if now is None:
            now = time.time()
            expiryStr = " expired"
        else:
            expiryStr = ""

        cutoff = now - self.keySloppiness
	dirs = [ os.path.join(self.keyDir,"key_"+name)
                  for va, vu, name in self.keyIntervals if vu < cutoff ]

	for dirname, (va, vu, name) in zip(dirs, self.keyIntervals):
            LOG.info("Removing%s key %s (valid from %s through %s)",
                        expiryStr, name, formatDate(va), formatDate(vu-3600))
	    files = [ os.path.join(dirname,f)
                                 for f in os.listdir(dirname) ]
	    secureDelete(files, blocking=1)
	    os.rmdir(dirname)

	self.checkKeys()

    def _getLiveKey(self, when=None):
	"""Find the first key that is now valid.  Return (Valid-after,
	   valid-util, name)."""
        if not self.keyIntervals:
	    self.liveKey = None
	    self.nextKeyRotation = 0
	    return None

	w = when
	if when is None:
	    when = time.time()
	    if when < self.nextKeyRotation:
		return self.liveKey

	idx = bisect.bisect(self.keyIntervals, (when, None, None))-1
	k = self.keyIntervals[idx]
	if w is None:
	    self.liveKey = k
	    self.nextKeyRotation = k[1]

	return k
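
    # Example (a sketch): with keyIntervals [(t0, t1, '0001'), (t1, t2, '0002')]
    # and t0 < when < t1, bisect returns 1, so idx is 0 and key '0001' is
    # selected as the live key.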

    def getNextKeyRotation(self):
	"""Return the expiration time of the current key"""
        return self.nextKeyRotation

    def getServerKeyset(self):
	"""Return a ServerKeyset object for the currently live key."""
	# FFFF Support passwords on keys
	_, _, name = self._getLiveKey()
	keyset = ServerKeyset(self.keyDir, name, self.hashDir)
	keyset.load()
	return keyset

    def getDHFile(self):
	"""Return the filename for the diffie-helman parameters for the
	   server.  Creates the file if it doesn't yet exist."""
	dhdir = os.path.join(self.homeDir, 'work', 'tls')
	createPrivateDir(dhdir)
	dhfile = os.path.join(dhdir, 'dhparam')
        if not os.path.exists(dhfile):
            LOG.info("Generating Diffie-Helman parameters for TLS...")
            mixminion._minionlib.generate_dh_parameters(dhfile, verbose=0)
            LOG.info("...done")
	else:
	    LOG.debug("Using existing Diffie-Helman parameter from %s",
			   dhfile)

        return dhfile

    def getTLSContext(self):
	"""Create and return a TLS context from the currently live key."""
        keys = self.getServerKeyset()
        return mixminion._minionlib.TLSContext_new(keys.getCertFileName(),
						   keys.getMMTPKey(),
						   self.getDHFile())

    def getPacketHandler(self):
	"""Create and return a PacketHandler from the currently live key."""
        keys = self.getServerKeyset()
	packetKey = keys.getPacketKey()
	hashlog = mixminion.server.HashLog.HashLog(keys.getHashLogFileName(),
						 keys.getMMTPKeyID())
        return mixminion.server.PacketHandler.PacketHandler(packetKey,
						     hashlog)

class IncomingQueue(mixminion.server.Queue.DeliveryQueue):
    """A DeliveryQueue to accept messages from incoming MMTP connections,
       process them with a packet handler, and send them into a mix pool."""

    def __init__(self, location, packetHandler):
	"""Create an IncomingQueue that stores its messages in <location>
	   and processes them through <packetHandler>."""
	mixminion.server.Queue.DeliveryQueue.__init__(self, location)
	self.packetHandler = packetHandler
	self.mixPool = None

    def connectQueues(self, mixPool):
	"""Sets the target mix queue"""
	self.mixPool = mixPool

    def queueMessage(self, msg):
	"""Add a message for delivery"""
	LOG.trace("Inserted message %r into incoming queue", msg[:8])
	self.queueDeliveryMessage(None, msg)

    def _deliverMessages(self, msgList):
	"Implementation of abstract method from DeliveryQueue."
	ph = self.packetHandler
	for handle, _, message, n_retries in msgList:
	    try:
		res = ph.processMessage(message)
		if res is None:
		    # Drop padding before it reaches the mix; mark it as
		    # handled so it is removed from the incoming queue.
		    LOG.debug("Padding message %r dropped", message[:8])
		    self.deliverySucceeded(handle)
		else:
		    LOG.debug("Processed message %r; inserting into pool",
				   message[:8])
		    self.mixPool.queueObject(res)
		    self.deliverySucceeded(handle)
	    except mixminion.Crypto.CryptoError, e:
		LOG.warn("Invalid PK or misencrypted packet header: %s",
			      e)
		self.deliveryFailed(handle)
	    except mixminion.Packet.ParseError, e:
		LOG.warn("Malformed message dropped: %s", e)
		self.deliveryFailed(handle)
	    except mixminion.server.PacketHandler.ContentError, e:
		LOG.warn("Discarding bad packet: %s", e)
		self.deliveryFailed(handle)

class MixPool:
    """Wraps a mixminion.server.Queue.*MixQueue to send messages to an exit queue
       and a delivery queue."""
    def __init__(self, queue):
	"""Create a new MixPool to wrap a given *MixQueue."""
	self.queue = queue
	self.outgoingQueue = None
	self.moduleManager = None

    def queueObject(self, obj):
	"""Insert an object into the queue."""
	self.queue.queueObject(obj)

    def count(self):
	"Return the number of messages in the queue"
	return self.queue.count()

    def connectQueues(self, outgoing, manager):
	"""Sets the queue for outgoing mixminion packets, and the
  	   module manager for deliverable messages."""
	self.outgoingQueue = outgoing
	self.moduleManager = manager

    def mix(self):
	"""Get a batch of messages, and queue them for delivery as
	   appropriate."""
	handles = self.queue.getBatch()
	LOG.debug("Mixing %s messages out of %s", 
		       len(handles), self.queue.count())
	for h in handles:
	    tp, info = self.queue.getObject(h)
	    if tp == 'EXIT':
		rt, ri, app_key, tag, payload = info
		LOG.debug("  (sending message %r to exit modules)", 
			       payload[:8])
		self.moduleManager.queueMessage(payload, tag, rt, ri)
	    else:
		assert tp == 'QUEUE'
		ipv4, msg = info
		LOG.debug("  (sending message %r to MMTP server)", 
			       msg[:8])
		self.outgoingQueue.queueDeliveryMessage(ipv4, msg)
	    self.queue.removeMessage(h)

class OutgoingQueue(mixminion.server.Queue.DeliveryQueue):
    """DeliveryQueue to send messages via outgoing MMTP connections."""
    def __init__(self, location):
	"""Create a new OutgoingQueue that stores its messages in a given
 	   location."""
        mixminion.server.Queue.DeliveryQueue.__init__(self, location)
	self.server = None

    def connectQueues(self, server):
	"""Set the MMTPServer that this OutgoingQueue informs of its
	   deliverable messages."""
	self.server = server

    def _deliverMessages(self, msgList):
	"Implementation of abstract method from DeliveryQueue."
	# Map from addr -> [ (handle, msg) ... ]
	msgs = {}
	for handle, addr, message, n_retries in msgList:
	    msgs.setdefault(addr, []).append( (handle, message) )
	for addr, messages in msgs.items():
	    handles, messages = zip(*messages)
	    self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
				     list(messages), list(handles))

class _MMTPServer(mixminion.server.MMTPServer.MMTPAsyncServer):
    """Implementation of mixminion.server.MMTPServer that knows about
       delivery queues."""
    def __init__(self, config, tls):
        mixminion.server.MMTPServer.MMTPAsyncServer.__init__(self, config, tls)

    def connectQueues(self, incoming, outgoing):
        self.incomingQueue = incoming
        self.outgoingQueue = outgoing

    def onMessageReceived(self, msg):
        self.incomingQueue.queueMessage(msg)

    def onMessageSent(self, msg, handle):
        self.outgoingQueue.deliverySucceeded(handle)

    def onMessageUndeliverable(self, msg, handle, retriable):
	self.outgoingQueue.deliveryFailed(handle, retriable)

class MixminionServer:
    """Wraps and drives all the queues, and the async net server.  Handles
       all timed events."""
    ## Fields:
    # config: The ServerConfig object for this server
    # keyring: The ServerKeyring
    #
    # mmtpServer: Instance of mixminion.server.ServerMain._MMTPServer.
    #    Receives and transmits packets from the network.  Places the packets
    #    it receives in self.incomingQueue.
    # incomingQueue: Instance of IncomingQueue.  Holds received packets 
    #    before they are decoded.  Decodes packets with PacketHandler,
    #    and places them in mixPool.
    # packetHandler: Instance of PacketHandler.  Used by incomingQueue to 
    #    decrypt, check, and re-pad received packets.
    # mixPool: Instance of MixPool.  Holds processed messages, and 
    #    periodically decides which ones to deliver, according to some
    #    batching algorithm.
    # moduleManager: Instance of ModuleManager.  Maps routing types to
    #    outgoing queues, and processes non-MMTP exit messages.
    # outgoingQueue: Holds messages waiting to be sent via MMTP.
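    #
    # Message flow, in summary:
    #     network -> mmtpServer -> incomingQueue -> (packetHandler) -> mixPool
    #     mixPool -> outgoingQueue -> mmtpServer -> network   [forwarded packets]
    #     mixPool -> moduleManager -> exit modules            [exit messages]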

    def __init__(self, config):
	"""Create a new server from a ServerConfig."""
	LOG.debug("Initializing server")
	self.config = config
	self.keyring = ServerKeyring(config)
	if self.keyring._getLiveKey() is None:
	    LOG.info("Generating a month's worth of keys.")
	    LOG.info("(Don't count on this feature in future versions.)")
	    # We might not be able to do this, if we password-encrypt keys
	    keylife = config['Server']['PublicKeyLifetime'][2]
	    nKeys = ceilDiv(30*24*60*60, keylife)
	    self.keyring.createKeys(nKeys)

	LOG.trace("Initializing packet handler")
	self.packetHandler = self.keyring.getPacketHandler()
	LOG.trace("Initializing TLS context")
	tlsContext = self.keyring.getTLSContext()
	LOG.trace("Initializing MMTP server")
	self.mmtpServer = _MMTPServer(config, tlsContext)

	# FFFF Modulemanager should know about async so it can patch in if it
	# FFFF needs to.
	LOG.trace("Initializing delivery module")
	self.moduleManager = config.getModuleManager()
	self.moduleManager.configure(config)

	homeDir = config['Server']['Homedir']
	queueDir = os.path.join(homeDir, 'work', 'queues')

	incomingDir = os.path.join(queueDir, "incoming")
	LOG.trace("Initializing incoming queue")
	self.incomingQueue = IncomingQueue(incomingDir, self.packetHandler)
	LOG.trace("Found %d pending messages in incoming queue", 
		       self.incomingQueue.count())

	mixDir = os.path.join(queueDir, "mix")
	# FFFF The choice of mix algorithm should be configurable
	LOG.trace("Initializing Mix pool")
	self.mixPool = MixPool(
	    mixminion.server.Queue.TimedMixQueue(mixDir, 60))
	LOG.trace("Found %d pending messages in Mix pool",
		       self.mixPool.count())

	outgoingDir = os.path.join(queueDir, "outgoing")
	LOG.trace("Initializing outgoing queue")
	self.outgoingQueue = OutgoingQueue(outgoingDir)
	LOG.trace("Found %d pending messages in outgoing queue",
		       self.outgoingQueue.count())

	LOG.trace("Connecting queues")
	self.incomingQueue.connectQueues(mixPool=self.mixPool)
	self.mixPool.connectQueues(outgoing=self.outgoingQueue,
				   manager=self.moduleManager)
	self.outgoingQueue.connectQueues(server=self.mmtpServer)
	self.mmtpServer.connectQueues(incoming=self.incomingQueue,
				      outgoing=self.outgoingQueue)

    def run(self):
	"""Run the server; don't return unless we hit an exception."""
	# FFFF Use heapq to schedule events? [I don't think so; there are only
	# FFFF   two events, after all!]
	now = time.time()
	MIX_INTERVAL = 20  # FFFF Configurable!
	nextMix = now + MIX_INTERVAL
	nextShred = now + 6000
	#FFFF Unused
	#nextRotate = self.keyring.getNextKeyRotation()
	while 1:
	    LOG.trace("Next mix at %s", formatTime(nextMix,1))
	    while time.time() < nextMix:
		# Handle pending network events
		self.mmtpServer.process(1)
		# Process any new messages that have come in, placing them
		# into the mix pool.
		self.incomingQueue.sendReadyMessages()

	    # Before we mix, we need to log the hashes to avoid replays.
	    # FFFF We need to recover on server failure.
	    self.packetHandler.syncLogs()

	    LOG.trace("Mix interval elapsed")
	    # Choose a set of outgoing messages; put them in the outgoing
	    # queue and the module manager.
	    self.mixPool.mix()
	    # Send outgoing messages
	    self.outgoingQueue.sendReadyMessages()
	    # Send exit messages
	    self.moduleManager.sendReadyMessages()

	    # Choose next mix interval
	    now = time.time()
	    nextMix = now + MIX_INTERVAL

	    if now > nextShred:
		# FFFF Configurable shred interval
		LOG.trace("Expunging deleted messages from queues")
		self.incomingQueue.cleanQueue()
		self.mixPool.queue.cleanQueue()
		self.outgoingQueue.cleanQueue()
		self.moduleManager.cleanQueues()
		nextShred = now + 6000

    def close(self):
	"""Release all resources; close all files."""
	self.packetHandler.close()

#----------------------------------------------------------------------
def usageAndExit(cmd):
    executable = sys.argv[0]
    print >>sys.stderr, "Usage: %s %s [-h] [-f configfile]" % (executable, cmd)
    sys.exit(0)

def configFromServerArgs(cmd, args):
    options, args = getopt.getopt(args, "hf:", ["help", "config="])
    if args:
	usageAndExit(cmd)
    configFile = "/etc/mixminiond.conf"
    for o,v in options:
	if o in ('-h', '--help'):
	    usageAndExit(cmd)
	if o in ('-f', '--config'):
	    configFile = v

    return readConfigFile(configFile)

def readConfigFile(configFile):
    try:
	return mixminion.Config.ServerConfig(fname=configFile)
    except (IOError, OSError), e:
	print >>sys.stderr, "Error reading configuration file %r:"%configFile
	print >>sys.stderr, "   ", str(e)
	sys.exit(1)
    except mixminion.Config.ConfigError, e:
	print >>sys.stderr, "Error in configuration file %r"%configFile
	print >>sys.stderr, str(e)
	sys.exit(1)
    return None #suppress pychecker warning

#----------------------------------------------------------------------
def runServer(cmd, args):
    config = configFromServerArgs(cmd, args)
    try:
	mixminion.Common.LOG.configure(config)
	LOG.debug("Configuring server")
	mixminion.Common.configureShredCommand(config)
	mixminion.Crypto.init_crypto(config)

	server = MixminionServer(config)
    except:
	LOG.fatal_exc(sys.exc_info(),"Exception while configuring server")
	print >>sys.stderr, "Shutting down because of exception"
        #XXXX print stack trace as well as logging?
	sys.exit(1)

    LOG.info("Starting server")
    try:
	server.run()
    except KeyboardInterrupt:
	pass
    except:
	LOG.fatal_exc(sys.exc_info(),"Exception while running server")
        #XXXX print stack trace as well as logging?
    LOG.info("Server shutting down")
    server.close()
    LOG.info("Server is shut down")

    sys.exit(0)

#----------------------------------------------------------------------
def runKeygen(cmd, args):
    options, args = getopt.getopt(args, "hf:n:",
                                  ["help", "config=", "keys="])
    # FFFF password-encrypted keys
    # FFFF Ability to fill gaps
    # FFFF Ability to generate keys with particular start/end intervals
    keys=1
    usage=0
    configFile = '/etc/miniond.conf'
    for opt,val in options:
	if opt in ('-h', '--help'):
	    usage=1
	elif opt in ('-f', '--config'):
	    configFile = val
	elif opt in ('-n', '--keys'):
	    try:
		keys = int(val)
	    except ValueError:
		print >>sys.stderr,("%s requires an integer" %opt)
		usage = 1
    if usage:
        print >>sys.stderr, "Usage: %s [-h] [-f configfile] [-n nKeys]"%cmd
        sys.exit(1)

    config = readConfigFile(configFile)

    LOG.setMinSeverity("INFO")
    mixminion.Crypto.init_crypto(config)
    keyring = ServerKeyring(config)
    print >>sys.stderr, "Creating %s keys..." % keys
    for i in xrange(keys):
	keyring.createKeys(1)
	print >> sys.stderr, ".... (%s/%s done)" % (i+1,keys)

#----------------------------------------------------------------------
def removeKeys(cmd, args):
    # FFFF Resist removing keys that have been published.
    # FFFF Generate 'suicide note' for removing identity key.
    options, args = getopt.getopt(args, "hf:", ["help", "config=",
                                                "remove-identity"])
    usage = 0
    removeIdentity = 0
    configFile = '/etc/miniond.conf'
    if args:
        print >>sys.stderr, "%s takes no arguments"%cmd
        usage = 1
        args = options = ()
    for opt,val in options:
	if opt in ('-h', '--help'):
	    usage=1
	elif opt in ('-f', '--config'):
	    configFile = val
	elif opt == '--remove-identity':
            removeIdentity = 1
    if usage:
        print >>sys.stderr, \
              "Usage: %s [-h|--help] [-f configfile] [--remove-identity]"%cmd
        sys.exit(1)

    config = readConfigFile(configFile)
    mixminion.Common.configureShredCommand(config)
    LOG.setMinSeverity("INFO")
    keyring = ServerKeyring(config)
    keyring.checkKeys()
    # This is impossibly far in the future.
    keyring.removeDeadKeys(now=(1L << 36))
    if removeIdentity:
        keyring.removeIdentityKey()
    LOG.info("Done removing keys")

--- NEW FILE: __init__.py ---
# Copyright 2002 Nick Mathewson.  See LICENSE for licensing information.
# $Id: __init__.py,v 1.1 2002/12/11 06:58:55 nickm Exp $

"""mixminion

   Client and server code for type III anonymous remailers.
   """

__all__ = [ ]

#'MMTPServer', 'Queue', 'HashLog', 'PacketHandler', 'Modules',
#	    'ServerMain' ]
## import mixminion.server.MMTPServer as MMTPServer
## import mixminion.server.PacketHandler as PacketHandler
## import mixminion.server.HashLog as HashLog
## import mixminion.server.Modules as Modules
## import mixminion.server.Queue as Queue
## import mixminion.server.ServerMain as ServerMain