[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Resolve numerous TODO items, including path selection, ...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv25930/lib/mixminion/server
Modified Files:
MMTPServer.py Modules.py PacketHandler.py ServerMain.py
Log Message:
Resolve numerous TODO items, including path selection, unit tests,
bugfixes, and UI issues.
ClientDirectory:
- Add configurable timeouts for directory retrieval
- Make ClientDirectories, when available, help map keyids to nicer,
more readily displayable strings.
- Debug getNicknameByKeyID
- Debug compressFormatMap
- Make syntax for exit addresses more flexible
- Only warn once for each unrecognized exit type.
- Add 'getAvgLength' to PathElement
- Minimize ~ elements to 1 hop
- Add a kludge to allow paths of the form ~N to work as forward
paths.
ClientMain:
- Deal with function renaming
- Add (partial) support for sending messages for user-side
fragmentation
- ***ALWAYS*** scramble packet order before sending them.
- Rename Messages to Packets throughout as appropriate.
- Bugfix: only de-queue messages when delivery is successful.
- Refactor flushQueue to use existing support functions.
- Rename list-servers options again: --cascade and
--cascade-features are probably more readable than --cascade=1
and --cascade=2.
ClientUtils:
- Rename LazyEncryptedPickle to LazyEncryptedStore; begin refactoring
- Use displayServer() as appropriate
Config:
- Make _formatEntry use new unparsing functionality.
- Add options for user-configurable timeout.
MMTPClient:
- Rename 'message' to 'packet' as appropriate.
- Use new displayServer() function to pretty-print server names
instead of IPs whenever possible.
- Change error-printing options to handle timeouts and ssl version
errors better.
Main:
- Add stubbed code to handle invocation as 'mixminiond': defer for
now.
NetUtils:
- Add strings to TimeoutErrors
Packet:
- Rename 'message' to 'packet' as appropriate.
- Improve display of reply blocks.
ServerInfo:
- Add new displayServer() function to pretty-print servers, looking
up their actual nicknames wherever possible. Servers can't take
full advantage of this yet, since they don't download directories,
but they can still call displayServer without harm.
test:
- Rename 'message' to 'packet' as appropriate
- Adjust ServerInfo tests so they no longer try to make a server with 'frag'
but no actual delivery module, since this isn't allowed any more.
- Add tests for getFeature()
- Add tests for featureMap-related functions
- Make tests for DNS lookup less time-dependent
- Tests for new literal exit type behavior.
MMTPServer:
- Rename 'message' to 'packet' as appropriate
- Don't alias the LOG.* functions any more. Apparently, this was
allowing a long-standing bug where all DEBUG messages in
MMTPServer were getting tagged as INFO messages.
- (Partial) Use displayServer() as appropriate.
Modules:
- Don't allow Delivery/Fragmented unless some other kind of delivery
method is supported.
PacketHandler, ServerMain:
- s/message/packet/ as appropriate
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.56
retrieving revision 1.57
diff -u -d -r1.56 -r1.57
--- MMTPServer.py 10 Nov 2003 04:12:20 -0000 1.56
+++ MMTPServer.py 19 Nov 2003 09:48:10 -0000 1.57
@@ -7,7 +7,7 @@
a nonblocking implementation of *both* the client and the server sides
of the protocol.
- If you just want to send messages into the system, use MMTPClient.
+ If you just want to send packets into the system, use MMTPClient.
"""
# NOTE FOR THE CURIOUS: The 'asyncore' module in the standard library
@@ -30,21 +30,16 @@
from mixminion.Common import MixError, MixFatalError, MixProtocolError, \
LOG, stringContains, MessageQueue, QueueEmpty
from mixminion.Crypto import sha1, getCommonPRNG
-from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN, IPV4Info, MMTPHostInfo
+from mixminion.Packet import PACKET_LEN, DIGEST_LEN, IPV4Info, MMTPHostInfo
from mixminion.MMTPClient import PeerCertificateCache
from mixminion.NetUtils import IN_PROGRESS_ERRNOS, getProtocolSupport, AF_INET, AF_INET6
import mixminion.server.EventStats as EventStats
from mixminion.Filestore import CorruptedFile
+from mixminion.ServerInfo import displayServer
__all__ = [ 'AsyncServer', 'ListenConnection', 'MMTPServerConnection',
'MMTPClientConnection' ]
-trace = LOG.trace
-info = LOG.info
-debug = LOG.info
-warn = LOG.warn
-error = LOG.error
-
class AsyncServer:
"""AsyncServer is the core of a general-purpose asynchronous
select-based server loop. AsyncServer maintains two lists of
@@ -204,7 +199,8 @@
self.ip, self.port, e))
self.sock.listen(backlog)
self.connectionFactory = connectionFactory
- info("Listening at %s on port %s (fd %s)", ip, port,self.sock.fileno())
+ LOG.info("Listening at %s on port %s (fd %s)",
+ ip, port, self.sock.fileno())
def register(self, server):
server.registerReader(self)
@@ -212,16 +208,16 @@
def handleRead(self):
con, addr = self.sock.accept()
- debug("Accepted connection from %s (fd %s)", addr, con.fileno())
+ LOG.debug("Accepted connection from %s (fd %s)", addr, con.fileno())
rw = self.connectionFactory(con)
rw.register(self.server)
def shutdown(self):
- debug("Closing listener connection (fd %s)", self.sock.fileno())
+ LOG.debug("Closing listener connection (fd %s)", self.sock.fileno())
self.server.unregister(self)
del self.server
self.sock.close()
- info("Server connection closed")
+ LOG.info("Server connection closed")
def fileno(self):
return self.sock.fileno()
@@ -271,6 +267,7 @@
tls -- An underlying TLS connection.
serverMode -- If true, we start with a server-side negotatiation.
otherwise, we start with a client-side negotatiation.
+ DOCDOC address
"""
self.__sock = sock
self.__con = tls
@@ -372,17 +369,17 @@
while 1:
done = self.__con.shutdown() # may throw want*
if not done and self.__awaitingShutdown:
- error("Shutdown returned zero twice from %s -- bailing",
- self.address)
+ LOG.error("Shutdown returned zero twice from %s -- bailing",
+ self.address)
done = 1
if done:
- debug("Got a completed shutdown from %s", self.address)
+ LOG.debug("Got a completed shutdown from %s", self.address)
self.__sock.close()
self.__state = None
self.shutdownFinished()
return
else:
- trace("Shutdown returned zero -- entering read mode")
+ LOG.trace("Shutdown returned zero -- entering read mode")
self.__awaitingShutdown = 1
#DODOC is this right?
if 1:
@@ -404,14 +401,15 @@
r = self.__con.read(1024) #may throw want*
if r == 0:
if self.__awaitingShutdown:
- debug("read returned 0: shutdown complete (fd %s)",self.fd)
+ LOG.debug("read returned 0: shutdown complete (fd %s)",
+ self.fd)
else:
- debug("read returned 0: shutting down (fd %s)", self.fd)
+ LOG.debug("read returned 0: shutting down (fd %s)",self.fd)
self.shutdown(err=0)
return
else:
assert isinstance(r, StringType)
- trace("read got %s bytes (fd %s)", len(r), self.fd)
+ LOG.trace("read got %s bytes (fd %s)", len(r), self.fd)
self.__inbuf.append(r)
self.__inbuflen += len(r)
if not self.__con.pending():
@@ -421,19 +419,19 @@
self.__inbuf = ["".join(self.__inbuf)]
if self.__expectReadLen and self.__inbuflen > self.__expectReadLen:
- warn("Protocol violation: too much data. Closing connection to %s",
- self.address)
+ LOG.warn("Protocol violation: too much data. Closing connection to %s",
+ self.address)
self.shutdown(err=1, retriable=0)
return
if self.__terminator and stringContains(self.__inbuf[0],
self.__terminator):
- trace("read found terminator (fd %s)", self.fd)
+ LOG.trace("read found terminator (fd %s)", self.fd)
self.__server.unregister(self)
self.finished()
if self.__expectReadLen and (self.__inbuflen == self.__expectReadLen):
- trace("read got enough (fd %s)", self.fd)
+ LOG.trace("read got enough (fd %s)", self.fd)
self.__server.unregister(self)
self.finished()
@@ -465,7 +463,7 @@
def tryTimeout(self, cutoff):
if self.lastActivity <= cutoff:
- warn("Connection to %s timed out", self.address)
+ LOG.warn("Connection to %s timed out", self.address)
# ???? I'm not sure this is right. Instead of just killing
# ???? the socket, should we shut down the SSL too?
self.__sock.close()
@@ -500,9 +498,9 @@
self.__server.registerReader(self)
except _ml.TLSClosed:
if self.__connecting:
- warn("Couldn't connect to server %s", self.address)
+ LOG.warn("Couldn't connect to server %s", self.address)
else:
- warn("Unexpectedly closed connection to %s", self.address)
+ LOG.warn("Unexpectedly closed connection to %s", self.address)
self.__sock.close()
if not self.__failed:
self.__failed = 1
@@ -510,13 +508,16 @@
self.remove()
except _ml.TLSError, e:
if self.__state != self.__shutdownFn and (not self.__awaitingShutdown):
- warn("Unexpected TLS error: %s. Closing connection to %s.",
- e, self.address)
+ e = str(e)
+ if e == 'wrong version number':
+ e = 'wrong version number (or failed handshake)'
+ LOG.warn("Unexpected TLS error: %s. Closing connection to %s.",
+ e, self.address)
self.shutdown(err=1, retriable=1)
self.__handleAll() # Try another round of the loop.
else:
- warn("Error while shutting down: closing connection to %s",
- self.address)
+ LOG.warn("Error while shutting down: closing connection to %s",
+ self.address)
self.__sock.close()
if not self.__failed:
self.__failed = 1
@@ -591,24 +592,24 @@
PROTOCOL_STRING = "MMTP 0.3\r\n"
# The protocol specification to expect.
PROTOCOL_RE = re.compile("MMTP ([^\s\r\n]+)\r\n")
-# Control line for sending a message.
+# Control line for sending a packet
SEND_CONTROL = "SEND\r\n"
# Control line for sending padding.
JUNK_CONTROL = "JUNK\r\n"
-# Control line for acknowledging a message
+# Control line for acknowledging a packet
RECEIVED_CONTROL = "RECEIVED\r\n"
-# Control line for refusing a message
+# Control line for refusing a packet
REJECTED_CONTROL = "REJECTED\r\n"
SEND_CONTROL_LEN = len(SEND_CONTROL)
RECEIVED_CONTROL_LEN = len(RECEIVED_CONTROL)
-SEND_RECORD_LEN = len(SEND_CONTROL) + MESSAGE_LEN + DIGEST_LEN
+SEND_RECORD_LEN = len(SEND_CONTROL) + PACKET_LEN + DIGEST_LEN
RECEIVED_RECORD_LEN = RECEIVED_CONTROL_LEN + DIGEST_LEN
class MMTPServerConnection(SimpleTLSConnection):
'''An asynchronous implementation of the receiving side of an MMTP
connection.'''
## Fields:
- # messageConsumer: a function to call with all received messages.
+ # packetConsumer: a function to call with all received packets.
# finished: callback when we're done with a read or write; see
# SimpleTLSConnection.
# protocol: The MMTP protocol version we're currently using, or None
@@ -621,7 +622,7 @@
# rejectPackets: a flag: should we reject incoming packets?
PROTOCOL_VERSIONS = [ '0.3' ]
def __init__(self, sock, tls, consumer, rejectPackets=0):
- """Create an MMTP connection to receive messages sent along a given
+ """Create an MMTP connection to receive packets sent along a given
socket. When valid packets are received, pass them to the
function 'consumer'. If rejectPackets is true, then instead of
accepting packets, we refuse them instead--for example, if the
@@ -631,7 +632,7 @@
EventStats.log.receivedConnection() #FFFF addr
- self.messageConsumer = consumer
+ self.packetConsumer = consumer
self.junkCallback = lambda : None
self.rejectCallback = lambda : None
self.finished = self.__setupFinished
@@ -649,26 +650,26 @@
"""Called once we're done reading the protocol string. Either
rejects, or sends our response.
"""
- trace("done w/ client sendproto to %s", self.address)
+ LOG.trace("done w/ client sendproto to %s", self.address)
inp = self.pullInput()
m = PROTOCOL_RE.match(inp)
if not m:
- warn("Bad protocol list: %r. Closing connection to %s", inp,
- self.address)
+ LOG.warn("Bad protocol list: %r. Closing connection to %s", inp,
+ self.address)
self.shutdown(err=1)
return
protocols = m.group(1).split(",")
for p in self.PROTOCOL_VERSIONS:
if p in protocols:
- trace("Using protocol %s with %s", p, self.address)
+ LOG.trace("Using protocol %s with %s", p, self.address)
self.protocol = p
self.finished = self.__sentProtocol
self.beginWrite("MMTP %s\r\n"% p)
return
- warn("Unsupported protocol list. Closing connection to %s",
- self.address)
+ LOG.warn("Unsupported protocol list. Closing connection to %s",
+ self.address)
self.shutdown(err=1)
return
@@ -676,47 +677,48 @@
"""Called once we're done sending our protocol response. Begins
reading a packet from the line.
"""
- trace("done w/ server sendproto to %s", self.address)
- self.finished = self.__receivedMessage
+ LOG.trace("done w/ server sendproto to %s", self.address)
+ self.finished = self.__receivedPacket
self.expectRead(SEND_RECORD_LEN)
- def __receivedMessage(self):
- """Called once we've read a message from the line. Checks the
+ def __receivedPacket(self):
+ """Called once we've read a packet from the line. Checks the
digest, and either rejects or begins sending an ACK."""
data = self.pullInput()
- msg = data[SEND_CONTROL_LEN:-DIGEST_LEN]
+ pkt = data[SEND_CONTROL_LEN:-DIGEST_LEN]
digest = data[-DIGEST_LEN:]
if data.startswith(JUNK_CONTROL):
- expectedDigest = sha1(msg+"JUNK")
- replyDigest = sha1(msg+"RECEIVED JUNK")
+ expectedDigest = sha1(pkt+"JUNK")
+ replyDigest = sha1(pkt+"RECEIVED JUNK")
replyControl = RECEIVED_CONTROL
isJunk = 1
elif data.startswith(SEND_CONTROL):
- expectedDigest = sha1(msg+"SEND")
+ expectedDigest = sha1(pkt+"SEND")
if self.rejectPackets:
- replyDigest = sha1(msg+"REJECTED")
+ replyDigest = sha1(pkt+"REJECTED")
replyControl = REJECTED_CONTROL
else:
- replyDigest = sha1(msg+"RECEIVED")
+ replyDigest = sha1(pkt+"RECEIVED")
replyControl = RECEIVED_CONTROL
isJunk = 0
else:
- warn("Unrecognized command (%r) from %s. Closing connection.",
- data[:4], self.address)
+ LOG.warn("Unrecognized command (%r) from %s. Closing connection.",
+ data[:4], self.address)
self.shutdown(err=1)
return
if expectedDigest != digest:
- warn("Invalid checksum from %s. Closing connection",
+ LOG.warn("Invalid checksum from %s. Closing connection",
self.address)
self.shutdown(err=1)
return
else:
if isJunk:
- debug("Link padding received from %s; Checksum valid.",
- self.address)
+ LOG.debug("Link padding received from %s; Checksum valid.",
+ self.address)
else:
- debug("Packet received from %s; Checksum valid.", self.address)
+ LOG.debug("Packet received from %s; Checksum valid.",
+ self.address)
self.finished = self.__sentAck
self.beginWrite(replyControl+replyDigest)
if isJunk:
@@ -724,17 +726,17 @@
elif self.rejectPackets:
self.rejectCallback()
else:
- self.messageConsumer(msg)
+ self.packetConsumer(pkt)
def __sentAck(self):
"""Called once we're done sending an ACK. Begins reading a new
- message."""
- debug("Send ACK for message from %s", self.address)
- self.finished = self.__receivedMessage
+ packet."""
+ LOG.debug("Send ACK for packet from %s", self.address)
+ self.finished = self.__receivedPacket
self.expectRead(SEND_RECORD_LEN)
def remove(self):
- self.messageConsumer = None
+ self.packetConsumer = None
self.finished = None
self.junkCallback = None
self.rejectCallback = None
@@ -778,16 +780,16 @@
"""Asynchronous implementation of the sending ("client") side of a
mixminion connection."""
## Fields:
- # ip, port, keyID, messageList, finishedCallback, certCache:
+ # ip, port, keyID, packetList, finishedCallback, certCache:
# As described in the docstring for __init__ below. We remove entries
- # from the front of messageList/handleList as we begin sending them.
- # _curMessage, _curHandle: Correspond to the message that we are
- # currently trying to deliver. If _curHandle is None, _curMessage
+ # from the front of packetList/handleList as we begin sending them.
+ # _curPacekt, _curHandle: Correspond to the packet that we are
+ # currently trying to deliver. If _curHandle is None, _curPacket
# is a control string. If _curHandle is a DeliverableMessage,
- # _curMessage is the corresponding 32KB string.
+ # _curPacket is the corresponding 32KB string.
# junk: A list of 32KB padding chunks that we're going to send. We
# pregenerate these to avoid timing attacks. They correspond to
- # the 'JUNK' entries in messageList.
+ # the 'JUNK' entries in pacektList.
# isJunk: flag. Is the current chunk padding?
# expectedDigest: The digest we expect to receive in response to the
# current chunk.
@@ -795,19 +797,19 @@
# if negotiation hasn't completed.
# PROTOCOL_VERSIONS: (static) a list of protocol versions we allow,
# in the order we offer them.
- # active: Are we currently able to send messages to a server? Boolean.
+ # active: Are we currently able to send packets to the server? Boolean.
PROTOCOL_VERSIONS = [ '0.3' ]
- def __init__(self, context, ip, port, keyID, messageList,
+ def __init__(self, context, ip, port, keyID, packetList,
finishedCallback=None, certCache=None):
- """Create a connection to send messages to an MMTP server.
+ """Create a connection to send packets to an MMTP server.
Raises socket.error if the connection fails.
ip -- The IP of the destination server.
port -- The port to connect to.
keyID -- None, or the expected SHA1 hash of the server's public key
- messageList -- a list of message payloads and control strings.
- Message payloads must implement the DeliverableMessage
+ packetList -- a list of packets and control strings.
+ Packets must implement the DeliverableMessage
interface above; allowable control strings are either
string "JUNK", which sends 32KB of padding; or the control
string "RENEGOTIATE" which renegotiates the connection key.
@@ -818,10 +820,10 @@
"""
# Generate junk before connecting to avoid timing attacks
self.junk = []
- self.messageList = []
+ self.packetList = []
self.active = 1
- self.addMessages(messageList)
+ self.addPackets(packetList)
if certCache is None:
certCache = PeerCertificateCache()
@@ -849,32 +851,32 @@
self.finished = self.__setupFinished
self.finishedCallback = finishedCallback
self.protocol = None
- self._curMessage = self._curHandle = None
+ self._curPacket = self._curHandle = None
EventStats.log.attemptedConnect() #FFFF addr
- debug("Opening client connection to %s", self.address)
+ LOG.debug("Opening client connection to %s", self.address)
def isActive(self):
- """Return true iff messages added to this connection via addMessages
+ """Return true iff packets added to this connection via addPackets
will be delivered. isActive() will return false if, for example,
the connection is currently shutting down."""
return self.active
- def addMessages(self, messages):
- """Given a list of messages and control strings, as given to
+ def addPackets(self, packets):
+ """Given a list of packets and control strings, as given to
MMTPServer.__init__, cause this connection to deliver that new
- set of messages after it's done with those it's currently
+ set of packets after it's done with those it's currently
sending.
"""
assert self.active
- for m in messages:
- if m == "JUNK":
- self.junk.append(getCommonPRNG().getBytes(MESSAGE_LEN))
- elif m == 'RENEGOTIATE':
+ for pkt in packets:
+ if pkt == "JUNK":
+ self.junk.append(getCommonPRNG().getBytes(PACKET_LEN))
+ elif pkt == 'RENEGOTIATE':
pass
else:
EventStats.log.attemptedRelay() #FFFF addr
- self.messageList.extend(messages)
+ self.packetList.extend(packets)
def getAddr(self):
"""Return an (ip,port,keyID) tuple for this connection"""
@@ -888,12 +890,12 @@
self.certCache.check(self.getTLSConnection(), self.keyID,
self.address)
except MixProtocolError, e:
- warn("Certificate error: %s. Shutting down connection to %s",
- e, self.address)
+ LOG.warn("Certificate error: %s. Shutting down connection to %s",
+ e, self.address)
self.shutdown(err=1,retriable=1)
return
else:
- debug("KeyID is valid from %s", self.address)
+ LOG.debug("KeyID is valid from %s", self.address)
EventStats.log.successfulConnect()
@@ -915,88 +917,88 @@
for p in self.PROTOCOL_VERSIONS:
if inp == 'MMTP %s\r\n'%p:
- trace("Speaking MMTP version %s with %s", p, self.address)
+ LOG.trace("Speaking MMTP version %s with %s", p, self.address)
self.protocol = inp
- self.beginNextMessage()
+ self.beginNextPacket()
return
- warn("Invalid protocol. Closing connection to %s", self.address)
+ LOG.warn("Invalid protocol. Closing connection to %s", self.address)
# This isn't retriable; we don't talk to servers we don't
# understand.
self.shutdown(err=1,retriable=0)
return
- def beginNextMessage(self):
- """Start writing a message to the connection."""
- self._getNextMessage()
- if not self._curMessage:
+ def beginNextPacket(self):
+ """Start writing a packet to the connection."""
+ self._getNextPacket()
+ if not self._curPacket:
self.shutdown(0)
return
- msg = self._curMessage
- if msg == 'RENEGOTIATE':
- self.finished = self.beginNextMessage
+ pkt = self._curPacket
+ if pkt == 'RENEGOTIATE':
+ self.finished = self.beginNextPacket
self.startRenegotiate()
return
- elif msg == 'JUNK':
- msg = self.junk[0]
+ elif pkt == 'JUNK':
+ pkt = self.junk[0]
del self.junk[0]
- self.expectedDigest = sha1(msg+"RECEIVED JUNK")
- self.rejectDigest = sha1(msg+"REJECTED")
- msg = JUNK_CONTROL+msg+sha1(msg+"JUNK")
+ self.expectedDigest = sha1(pkt+"RECEIVED JUNK")
+ self.rejectDigest = sha1(pkt+"REJECTED")
+ pkt = JUNK_CONTROL+pkt+sha1(pkt+"JUNK")
self.isJunk = 1
else:
- self.expectedDigest = sha1(msg+"RECEIVED")
- self.rejectDigest = sha1(msg+"REJECTED")
- msg = SEND_CONTROL+msg+sha1(msg+"SEND")
+ self.expectedDigest = sha1(pkt+"RECEIVED")
+ self.rejectDigest = sha1(pkt+"REJECTED")
+ pkt = SEND_CONTROL+pkt+sha1(pkt+"SEND")
self.isJunk = 0
- assert len(msg) == SEND_RECORD_LEN
- self.beginWrite(msg)
- self.finished = self.__sentMessage
+ assert len(pkt) == SEND_RECORD_LEN
+ self.beginWrite(pkt)
+ self.finished = self.__sentPacket
- def _getNextMessage(self):
- """Helper function: pull the next _curHandle, _curMessage pair from
- self.messageList."""
- while self.messageList:
- m = self.messageList[0]
- del self.messageList[0]
+ def _getNextPacket(self):
+ """Helper function: pull the next _curHandle, _curPacket pair from
+ self.packetList."""
+ while self.packetList:
+ m = self.packetList[0]
+ del self.packetList[0]
if hasattr(m, 'getContents'):
self._curHandle = m
try:
- self._curMessage = m.getContents()
+ self._curPacket = m.getContents()
except CorruptedFile:
pass
return
else:
self._curHandle = None
- self._curMessage = m
+ self._curPacket = m
return
- self._curHandle = self._curMessage = None
+ self._curHandle = self._curPacket = None
- def __sentMessage(self):
- """Called when we're done sending a message. Begins reading the
+ def __sentPacket(self):
+ """Called when we're done sending a packet. Begins reading the
server's ACK."""
- debug("Message delivered to %s", self.address)
+ LOG.debug("Packet delivered to %s", self.address)
self.finished = self.__receivedAck
self.expectRead(RECEIVED_RECORD_LEN)
def __receivedAck(self):
"""Called when we're done reading the ACK. If the ACK is bad,
closes the connection. If the ACK is correct, removes the
- just-sent message from the connection's internal queue, and
- calls sentCallback with the sent message.
+ just-sent packet from the connection's internal queue, and
+ calls sentCallback with the sent packet.
- If there are more messages to send, begins sending the next.
+ If there are more packets to send, begins sending the next.
Otherwise, begins shutting down.
"""
- trace("received ack from %s", self.address)
+ LOG.trace("received ack from %s", self.address)
inp = self.pullInput()
rejected = 0
if inp == REJECTED_CONTROL+self.rejectDigest:
- debug("Message rejected from %s", self.address)
+ LOG.debug("Packet rejected from %s", self.address)
rejected = 1
elif inp != (RECEIVED_CONTROL+self.expectedDigest):
# We only get bad ACKs if an adversary somehow subverts TLS's
@@ -1004,7 +1006,7 @@
self.shutdown(err=1,retriable=0)
return
else:
- debug("Received valid ACK for message from %s", self.address)
+ LOG.debug("Received valid ACK for packet sent to %s", self.address)
if not self.isJunk:
if not rejected:
@@ -1014,9 +1016,9 @@
self._curHandle.failed(retriable=1)
EventStats.log.failedRelay() #FFFF addr
- self._curMessage = self._curHandle = None
+ self._curPacket = self._curHandle = None
- self.beginNextMessage()
+ self.beginNextPacket()
def handleFail(self, retriable):
"""Invoked when we shutdown with an error."""
@@ -1029,14 +1031,14 @@
if self._curHandle is not None:
self._curHandle.failed(retriable)
statFn()
- for msg in self.messageList:
+ for pkt in self.packetList:
try:
- msg.failed(retriable)
+ pkt.failed(retriable)
statFn()
except AttributeError:
pass
- self.messageList = []
- self._curMessage = self._curHandle = None
+ self.packetList = []
+ self._curPacket = self._curHandle = None
def shutdown(self, err=0, retriable=0):
self.active = 0
@@ -1057,7 +1059,7 @@
class MMTPAsyncServer(AsyncServer):
"""A helper class to invoke AsyncServer, MMTPServerConnection, and
MMTPClientConnection, with a function to add new connections, and
- callbacks for message success and failure."""
+ callbacks for packet success and failure."""
##
# serverContext: a TLSContext object to use for newly received connections.
# clientContext: a TLSContext object to use for initiated connections.
@@ -1131,7 +1133,7 @@
# FFFF Check whether incoming IP is allowed!
tls = self.serverContext.sock(sock, serverMode=1)
sock.setblocking(0)
- con = MMTPServerConnection(sock, tls, self.onMessageReceived)
+ con = MMTPServerConnection(sock, tls, self.onPacketReceived)
con.register(self)
return con
@@ -1139,10 +1141,10 @@
for listener in self.listeners:
listener.shutdown()
- def sendMessagesByRouting(self, routing, deliverable):
+ def sendPacketsByRouting(self, routing, deliverable):
"""DOCDOC"""
if isinstance(routing, IPV4Info):
- self.sendMessages(AF_INET, routing.ip, routing.port,
+ self.sendPackets(AF_INET, routing.ip, routing.port,
routing.keyinfo, deliverable)
else:
assert isinstance(routing, MMTPHostInfo)
@@ -1155,27 +1157,27 @@
except AttributeError:
pass
else:
- self.queueSendableMessages(family, addr,
+ self.queueSendablePackets(family, addr,
routing.port, routing.keyinfo,
deliverable)
self.dnsCache.lookup(routing.hostname, lookupDone)
- def queueSendableMessages(self, family, addr, port, keyID, deliverable):
+ def queueSendablePackets(self, family, addr, port, keyID, deliverable):
"""DOCDOC"""
self.msgQueue.put((family,addr,port,keyID,deliverable))
- def sendQueuedMessages(self):
+ def sendQueuedPackets(self):
"""DOCDOC"""
while 1:
try:
family,addr,port,keyID,deliverable=self.msgQueue.get(block=0)
except QueueEmpty:
return
- self.sendMessages(family,addr,port,keyID,deliverable)
+ self.sendPackets(family,addr,port,keyID,deliverable)
- def sendMessages(self, family, ip, port, keyID, deliverable):
- """Begin sending a set of messages to a given server.
+ def sendPackets(self, family, ip, port, keyID, deliverable):
+ """Begin sending a set of packets to a given server.
deliverable is a list of objects obeying the DeliverableMessage
interface.
@@ -1187,11 +1189,11 @@
pass
else:
# No exception: There is an existing connection. But is that
- # connection currently sending messages?
+ # connection currently sending packets?
if con.isActive():
- LOG.debug("Queueing %s messages on open connection to %s",
+ LOG.debug("Queueing %s packets on open connection to %s",
len(deliverable), con.address)
- con.addMessages(deliverable)
+ con.addPackets(deliverable)
return
try:
@@ -1219,18 +1221,18 @@
self.clientConByAddr[addr] = con
def __clientFinished(self, addr):
- """Called when a client connection runs out of messages to send."""
+ """Called when a client connection runs out of packets to send."""
try:
del self.clientConByAddr[addr]
except KeyError:
- warn("Didn't find client connection to %s in address map",
- addr)
+ LOG.warn("Didn't find client connection to %s in address map",
+ addr)
- def onMessageReceived(self, msg):
- """Abstract function. Called when we get a message"""
+ def onPacketReceived(self, pkt):
+ """Abstract function. Called when we get a packet"""
pass
def process(self, timeout):
"""DOCDOC overrides"""
- self.sendQueuedMessages()
+ self.sendQueuedPackets()
AsyncServer.process(self, timeout)
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.59
retrieving revision 1.60
diff -u -d -r1.59 -r1.60
--- Modules.py 7 Nov 2003 07:03:28 -0000 1.59
+++ Modules.py 19 Nov 2003 09:48:10 -0000 1.60
@@ -549,6 +549,15 @@
'MaximumSize' : ('REQUIRE', "size", None),
'MaximumInterval' : ('ALLOW', "interval", "2 days" )
} }
+
+ def validateConfig(self, config, lines, contents):
+ frag = config.get('Delivery/Fragmented', {}).get("Enabled")
+ mbox = config.get('Delivery/MBOX', {}).get("Enabled")
+ smtp = config.get('Delivery/SMTP', {}).get("Enabled")
+ smtpmm = config.get('Delivery/SMTP-Via-Mixmaster', {}).get("Enabled")
+ if frag and not (mbox or smtp or smtpmm):
+ raise ConfigError("You've specified Fragmented delivery, but no actual delivery method. This doesn't make much sense.")
+
def getRetrySchedule(self):
return [ ]
def configure(self, config, manager):
Index: PacketHandler.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/PacketHandler.py,v
retrieving revision 1.30
retrieving revision 1.31
diff -u -d -r1.30 -r1.31
--- PacketHandler.py 10 Nov 2003 04:12:20 -0000 1.30
+++ PacketHandler.py 19 Nov 2003 09:48:10 -0000 1.31
@@ -26,7 +26,7 @@
"""Class to handle processing packets. Given an incoming packet,
it removes one layer of encryption, does all necessary integrity
checks, swaps headers if necessary, re-pads, and decides whether
- to drop the message, relay the message, or send the message to
+ to drop the packet, relay the packet, or send the packet to
an exit handler."""
## Fields:
# privatekeys: a list of 2-tuples of
@@ -90,11 +90,11 @@
finally:
self.lock.release()
- def processMessage(self, msg):
- """Given a 32K mixminion message, processes it completely.
+ def processPacket(self, msg):
+ """Given a 32K mixminion packet, processes it completely.
Return one of:
- None [if the message should be dropped.]
+ None [if the packet should be dropped.]
a DeliveryPacket object
a RelayedPacket object
@@ -103,9 +103,9 @@
unhandleable.
WARNING: This implementation does nothing to prevent timing
- attacks: dropped messages, messages with bad digests, replayed
- messages, and exit messages are all processed faster than
- forwarded messages. You must prevent timing attacks elsewhere."""
+ attacks: dropped packets, packets with bad digests, replayed
+ packets, and exit packets are all processed faster than
+ forwarded packets. You must prevent timing attacks elsewhere."""
# Break into headers and payload
pkt = Packet.parsePacket(msg)
@@ -148,13 +148,13 @@
if subh.digest != Crypto.sha1(header1):
raise ContentError("Invalid digest")
- # Get ready to generate message keys.
+ # Get ready to generate packet keys.
keys = Crypto.Keyset(subh.secret)
# Replay prevention
replayhash = keys.get(Crypto.REPLAY_PREVENTION_MODE, Crypto.DIGEST_LEN)
if hashlog.seenHash(replayhash):
- raise ContentError("Duplicate message detected.")
+ raise ContentError("Duplicate packet detected.")
else:
hashlog.logHash(replayhash)
@@ -235,14 +235,14 @@
# Build the address object for the next hop
address = Packet.parseRelayInfoByType(rt, subh.routinginfo)
- # Construct the message for the next hop.
+ # Construct the packet for the next hop.
pkt = Packet.Packet(header1, header2, payload).pack()
return RelayedPacket(address, pkt)
class RelayedPacket:
"""A packet that is to be relayed to another server; returned by
- returned by PacketHandler.processMessage."""
+ returned by PacketHandler.processPacket."""
## Fields:
# address -- an instance of IPV4Info
# msg -- a 32K packet.
@@ -269,7 +269,7 @@
class DeliveryPacket:
"""A packet that is to be delivered via some exit module; returned by
- PacketHandler.processMessage"""
+ PacketHandler.processPacket"""
##Fields:
# exitType -- a 2-byte integer indicating which exit module to use.
# address -- a string encoding the address to deliver to.
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.100
retrieving revision 1.101
diff -u -d -r1.100 -r1.101
--- ServerMain.py 10 Nov 2003 04:12:20 -0000 1.100
+++ ServerMain.py 19 Nov 2003 09:48:10 -0000 1.101
@@ -13,7 +13,7 @@
## Directory layout:
# MINION_HOME/work/queues/incoming/ [Queue of received,unprocessed pkts]
# mix/ [Mix pool]
-# outgoing/ [Messages for mmtp delivery]
+# outgoing/ [Packets for mmtp delivery]
# deliver/*/ [Messages for delivery via modules]
# tls/dhparam [Diffie-Hellman parameters]
# hashlogs/hash_1* [HashLogs of packet hashes
@@ -123,14 +123,14 @@
class IncomingQueue(mixminion.Filestore.StringStore):
"""A Queue to accept packets from incoming MMTP connections,
and hold them until they can be processed. As packets arrive, and
- are stored to disk, we notify a message queue so that another thread
+ are stored to disk, we notify a MessageQueue so that another thread
can read them."""
## Fields:
# packetHandler -- an instance of PacketHandler.
# mixPool -- an instance of MixPool
# processingThread -- an instance of ProcessingThread
def __init__(self, location, packetHandler):
- """Create an IncomingQueue that stores its messages in <location>
+ """Create an IncomingQueue that stores its packets in <location>
and processes them through <packetHandler>."""
mixminion.Filestore.StringStore.__init__(self, location, create=1)
self.packetHandler = packetHandler
@@ -143,27 +143,31 @@
for h in self.getAllMessages():
assert h is not None
self.processingThread.addJob(
- lambda self=self, h=h: self.__deliverMessage(h))
+ lambda self=self, h=h: self.__deliverPacket(h))
- def queueMessage(self, msg):
- """Add a message for delivery"""
- h = mixminion.Filestore.StringStore.queueMessage(self, msg)
- LOG.trace("Inserting message IN:%s into incoming queue", h)
+ def queuePacket(self, pkt):
+ """Add a packet for delivery"""
+ h = mixminion.Filestore.StringStore.queueMessage(self, pkt)
+ LOG.trace("Inserting packet IN:%s into incoming queue", h)
assert h is not None
self.processingThread.addJob(
- lambda self=self, h=h: self.__deliverMessage(h))
+ lambda self=self, h=h: self.__deliverPacket(h))
- def __deliverMessage(self, handle):
- """Process a single message with a given handle, and insert it into
+ def queueMessage(self, m):
+ # Never call this directly.
+ assert 0
+
+ def __deliverPacket(self, handle):
+ """Process a single packet with a given handle, and insert it into
the Mix pool. This function is called from within the processing
thread."""
ph = self.packetHandler
- message = self.messageContents(handle)
+ packet = self.messageContents(handle)
try:
- res = ph.processMessage(message)
+ res = ph.processPacket(packet)
if res is None:
# Drop padding before it gets to the mix.
- LOG.debug("Padding message IN:%s dropped", handle)
+ LOG.debug("Padding packet IN:%s dropped", handle)
self.removeMessage(handle)
else:
if res.isDelivery():
@@ -171,14 +175,14 @@
self.mixPool.queueObject(res)
self.removeMessage(handle)
- LOG.debug("Processed message IN:%s; inserting into mix pool",
+ LOG.debug("Processed packet IN:%s; inserting into mix pool",
handle)
except mixminion.Crypto.CryptoError, e:
- LOG.warn("Invalid PK or misencrypted header in message IN:%s: %s",
+ LOG.warn("Invalid PK or misencrypted header in packet IN:%s: %s",
handle, e)
self.removeMessage(handle)
except mixminion.Packet.ParseError, e:
- LOG.warn("Malformed message IN:%s dropped: %s", handle, e)
+ LOG.warn("Malformed packet IN:%s dropped: %s", handle, e)
self.removeMessage(handle)
except mixminion.server.PacketHandler.ContentError, e:
LOG.warn("Discarding bad packet IN:%s: %s", handle, e)
@@ -189,7 +193,7 @@
self.removeMessage(handle)
class MixPool:
- """Wraps a mixminion.server.ServerQueue.*MixPool to send messages
+ """Wraps a mixminion.server.ServerQueue.*MixPool to send packets
to an exit queue and a delivery queue. The files in the
MixPool are instances of RelayedPacket or DeliveryPacket from
PacketHandler.
@@ -238,23 +242,23 @@
return self.queue.queueObject(obj)
def count(self):
- "Return the number of messages in the pool"
+ "Return the number of packets in the pool"
return self.queue.count()
def connectQueues(self, outgoing, manager):
"""Sets the queue for outgoing mixminion packets, and the
- module manager for deliverable messages."""
+ module manager for deliverable packets."""
self.outgoingQueue = outgoing
self.moduleManager = manager
def mix(self):
- """Get a batch of messages, and queue them for delivery as
+ """Get a batch of packets, and queue them for delivery as
appropriate."""
if self.queue.count() == 0:
- LOG.trace("No messages in the mix pool")
+ LOG.trace("No packets in the mix pool")
return
handles = self.queue.getBatch()
- LOG.debug("%s messages in the mix pool; delivering %s.",
+ LOG.debug("%s packets in the mix pool; delivering %s.",
self.queue.count(), len(handles))
for h in handles:
@@ -265,16 +269,16 @@
if packet.isDelivery():
h2 = self.moduleManager.queueDecodedMessage(packet)
if h2:
- LOG.debug(" (sending message MIX:%s to exit modules as MOD:%s)"
+ LOG.debug(" (sending packet MIX:%s to exit modules as MOD:%s)"
, h, h2)
else:
- LOG.debug(" (exit modules received message MIX:%s without queueing.)", h)
+ LOG.debug(" (exit modules received packet MIX:%s without queueing.)", h)
else:
address = packet.getAddress()
h2 = self.outgoingQueue.queueDeliveryMessage(packet, address)
- LOG.debug(" (sending message MIX:%s to MMTP server as OUT:%s)"
+ LOG.debug(" (sending packet MIX:%s to MMTP server as OUT:%s)"
, h, h2)
- # In any case, we're through with this message now.
+ # In any case, we're through with this packet now.
self.queue.removeMessage(h)
def getNextMixTime(self, now):
@@ -283,7 +287,7 @@
return now + self.queue.getInterval()
class OutgoingQueue(mixminion.server.ServerQueue.DeliveryQueue):
- """DeliveryQueue to send messages via outgoing MMTP connections. All
+ """DeliveryQueue to send packets via outgoing MMTP connections. All
methods on this class are called from the main thread. The underlying
objects in this queue are instances of RelayedPacket.
@@ -295,7 +299,7 @@
# incomingQueue -- pointer to IncomingQueue object to be used for
# self->self communication.
def __init__(self, location, (ip,port,keyid)):
- """Create a new OutgoingQueue that stores its messages in a given
+ """Create a new OutgoingQueue that stores its packets in a given
location."""
mixminion.server.ServerQueue.DeliveryQueue.__init__(self, location)
self.server = None
@@ -309,7 +313,7 @@
def connectQueues(self, server, incoming):
"""Set the MMTPServer and IncomingQueue that this
- OutgoingQueue informs of its deliverable messages."""
+ OutgoingQueue informs of its deliverable packets."""
self.server = server
self.incomingQueue = incoming
@@ -317,7 +321,7 @@
def _deliverMessages(self, msgList):
"Implementation of abstract method from DeliveryQueue."
# Map from addr -> [ (handle, msg) ... ]
- msgs = {}
+ pkts = {}
for pending in msgList:
try:
addr = pending.getAddress()
@@ -325,24 +329,24 @@
addr = pending.getMessage().getAddress()
except mixminion.Filestore.CorruptedFile:
continue
- msgs.setdefault(addr, []).append(pending)
- for routing, messages in msgs.items():
+ pkts.setdefault(addr, []).append(pending)
+ for routing, packets in pkts.items():
if self.keyID == routing.keyinfo:
- for pending in messages:
- LOG.trace("Delivering message OUT:%s to myself.",
+ for pending in packets:
+ LOG.trace("Delivering packet OUT:%s to myself.",
pending.getHandle())
- self.incomingQueue.queueMessage(
+ self.incomingQueue.queuePacket(
pending.getMessage().getPacket())
pending.succeeded()
continue
deliverable = [
mixminion.server.MMTPServer.DeliverablePacket(pending)
- for pending in messages ]
- LOG.trace("Delivering messages OUT:[%s] to %s",
- " ".join([p.getHandle() for p in messages]),
+ for pending in packets ]
+ LOG.trace("Delivering packets OUT:[%s] to %s",
+ " ".join([p.getHandle() for p in packets]),
routing)
- self.server.sendMessagesByRouting(routing, deliverable)
+ self.server.sendPacketsByRouting(routing, deliverable)
class _MMTPServer(mixminion.server.MMTPServer.MMTPAsyncServer):
"""Implementation of mixminion.server.MMTPServer that knows about
@@ -351,8 +355,8 @@
All methods in this class are run from the main thread.
"""
## Fields:
- # incomingQueue -- a Queue to hold messages we receive
- # outgoingQueue -- a DeliveryQueue to hold messages to be sent.
+ # incomingQueue -- a Queue to hold packets we receive
+ # outgoingQueue -- a DeliveryQueue to hold packets to be sent.
def __init__(self, config, servercontext):
mixminion.server.MMTPServer.MMTPAsyncServer.__init__(
self, config, servercontext)
@@ -361,8 +365,8 @@
self.incomingQueue = incoming
self.outgoingQueue = outgoing
- def onMessageReceived(self, msg):
- self.incomingQueue.queueMessage(msg)
+ def onPacketReceived(self, pkt):
+ self.incomingQueue.queuePacket(pkt)
# FFFF Replace with server.
EventStats.log.receivedPacket()
@@ -600,12 +604,12 @@
# and places them in mixPool.
# packetHandler: Instance of PacketHandler. Used by incomingQueue to
# decrypt, check, and re-pad received packets.
- # mixPool: Instance of MixPool. Holds processed messages, and
+ # mixPool: Instance of MixPool. Holds processed packets, and
# periodically decides which ones to deliver, according to some
# batching algorithm.
# moduleManager: Instance of ModuleManager. Map routing types to
# outgoing queues, and processes non-MMTP exit messages.
- # outgoingQueue: Holds messages waiting to be send via MMTP.
+ # outgoingQueue: Holds packets waiting to be sent via MMTP.
# cleaningThread: Thread used to remove packets in the background
# processingThread: Thread to handle CPU-intensive activity without
# slowing down network interactivity.
@@ -688,14 +692,14 @@
incomingDir = os.path.join(queueDir, "incoming")
LOG.debug("Initializing incoming queue")
self.incomingQueue = IncomingQueue(incomingDir, self.packetHandler)
- LOG.debug("Found %d pending messages in incoming queue",
+ LOG.debug("Found %d pending packets in incoming queue",
self.incomingQueue.count())
mixDir = os.path.join(queueDir, "mix")
LOG.trace("Initializing Mix pool")
self.mixPool = MixPool(config, mixDir)
- LOG.debug("Found %d pending messages in Mix pool",
+ LOG.debug("Found %d pending packets in Mix pool",
self.mixPool.count())
outgoingDir = os.path.join(queueDir, "outgoing")
@@ -703,7 +707,7 @@
self.outgoingQueue = OutgoingQueue(outgoingDir,
(publishedIP, publishedPort, publishedKeyID))
self.outgoingQueue.configure(config)
- LOG.debug("Found %d pending messages in outgoing queue",
+ LOG.debug("Found %d pending packets in outgoing queue",
self.outgoingQueue.count())
self.cleaningThread = CleaningThread()
@@ -866,7 +870,7 @@
def doMix(self):
"""Called when the server's mix is about to fire. Picks some
- messages to send, and sends them to the appropriate queues.
+ packets to send, and sends them to the appropriate queues.
"""
now = time.time()
@@ -878,13 +882,13 @@
self.packetHandler.syncLogs()
LOG.trace("Mix interval elapsed")
- # Choose a set of outgoing messages; put them in
+ # Choose a set of outgoing packets; put them in
# outgoingqueue and modulemanager
self.mixPool.mix()
finally:
self.mixPool.unlock()
- # Send outgoing messages
+ # Send outgoing packets
self.outgoingQueue.sendReadyMessages()
# Send exit messages
self.moduleManager.sendReadyMessages()