[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Several days worth of hacking. Highlights: Key rotatio...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv2846/lib/mixminion/server
Modified Files:
EventStats.py MMTPServer.py Modules.py PacketHandler.py
ServerConfig.py ServerKeys.py ServerMain.py ServerQueue.py
Log Message:
Several days worth of hacking. Highlights: Key rotation, robust queues.
TODO:
- Update status, add time estimates
- Break down directory work
etc/mixminiond.conf:
- Rename PublicKeySloppiness to PublicKeyOverlap
*:
- Whitespace normalization
ClientMain:
- Improve path syntax to include ?, *n, Allow choice-with-replacement
- Use new readPickled functionality from Common
- Add -n argument for flush command
- Add default-path options to ClientConfig
- Be more specific about causes of failure when flushing; be more specific
about # messages flushed.
- Remove --swap-at option: now path syntax is adequate.
Config, ClientMain, Common:
- Change duration from a 3-tuple to an independent class. Now we
can say duration.getSeconds() rather than duration[2], which makes
some stuff more readable.
Common:
- Debug checkPrivateFile
- Add AtomicFile class to help with standard create/rename pattern.
- Add readPickled/writePickled wrappers
MMTPClient:
- Document PeerCertificateCache
Packet:
- Correct documentation on overflow, underflow.
benchmark:
- Improve format of printed sizes
- Improve pk timing; time with bizarre exponent.
- Add Timing for ServerQueues
test:
- Add tests for encodeBase64
- Correct tests for new DeliveryQueue implementation
- Add tests for checkPrivateFile
- Revise tests for _parseInterval in response to new Duration class.
- Add tests for generating new descriptors with existing keys
- Fix test for directory with bad signature: make it fail for the
right reason
- Deal with new validateConfig in Module
- Add test for scheduler.
- Tests for new path selection code
testSupport:
- Module code uses new interface
EventStats:
- Document, clean
MMTPServer:
- Better warning on TLSClosed while connecting.
- Document new functionality
Modules:
- validateConfig function no longer needs 'sections' and 'entries':
make it follow the same interface as other validation fns
- _deliverMessages: use new DeliveryQueue interface
PacketHandler:
- Always take a list of keys, never a single one.
ServerConfig:
- Refactor validateRetrySchedule
- Use new Duration class
- Rename PublicKeySloppiness to PublicKeyOverlap
ServerKeys: ***
- Implement key rotation:
- Notice when to add and remove keys from PacketHandlers, MMTPServer
- Set keys in packethandlers, mmtpserver
- Note that 512-bit DH moduli are kinda silly
- More code and debugging for descriptor regenration
ServerMain:
- Documentation
- Key rotation
- Respond to refactoring in DeliveryQueue
- Use lambdas to wrap EventStats rotation
- Separate reset method
- Remove obsolete commands
ServerQueue: ***
- Refactor DeliveryQueue so that it has a prayer of working: Keep
message delivery state in a separate file, and update separately.
Remember time of queueing for each method, and last attempted
delivery; n_retries is gone. This allows us to change the retry schedule
without putting messages in an inconsistent state.
An earlier version put the state for _all_ queued objects in a
single file: this turned out to be screamingly inefficient.
crypt.c, tls.c:
- Documentation fixes
Index: EventStats.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/EventStats.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- EventStats.py 5 May 2003 00:38:46 -0000 1.3
+++ EventStats.py 17 May 2003 00:08:44 -0000 1.4
@@ -7,15 +7,14 @@
__all__ = [ 'EventLog', 'NilEventLog' ]
-import cPickle
import os
from threading import RLock
from time import time
from mixminion.Common import formatTime, LOG, previousMidnight, floorDiv, \
- createPrivateDir, MixError
+ createPrivateDir, MixError, readPickled, writePickled
-# _EVENTS: a list of all recognized event types.
+# _EVENTS: a list of all recognized event types.
_EVENTS = [ 'ReceivedPacket',
'AttemptedRelay',
'SuccessfulRelay', 'FailedRelay', 'UnretriableRelay',
@@ -25,7 +24,7 @@
class NilEventLog:
"""Null implementation of EventLog interface: ignores all events and
- logs nothing.
+ logs nothing.
"""
def __init__(self):
pass
@@ -33,10 +32,13 @@
"""Flushes this eventlog to disk."""
pass
def rotate(self, now=None):
- """DOCDOC"""
+ """Move the pending events from this EventLog into a
+ summarized text listing, and start a new pool. Requires
+ that it's time to rotate.
+ """
pass
def getNextRotation(self):
- """DOCDOC"""
+ """Return a time after which it's okay to rotate the log."""
return 0
def _log(self, event, arg=None):
"""Notes that an event has occurred.
@@ -79,7 +81,7 @@
module fails unretriably.
"""
self._log("UnretriableDelivery", arg)
-
+
class EventLog(NilEventLog):
"""An EventLog records events, aggregates them according to some time
periods, and logs the totals to disk.
@@ -124,10 +126,7 @@
periodically writes to 'historyFile' every 'interval' seconds."""
NilEventLog.__init__(self)
if os.path.exists(filename):
- # XXXX If this doesn't work, then we should ????004
- f = open(filename, 'rb')
- self.__dict__.update(cPickle.load(f))
- f.close()
+ self.__dict__.update(readPickled(filename))
assert self.count is not None
assert self.lastRotation is not None
assert self.accumulatedTime is not None
@@ -155,7 +154,7 @@
self._save(now)
finally:
self._lock.release()
-
+
def _save(self, now=None):
"""Implements 'save' method. For internal use. Must hold self._lock
to invoke."""
@@ -168,14 +167,10 @@
pass
self.accumulatedTime += int(now-self.lastSave)
self.lastSave = now
- f = open(tmpfile, 'wb')
- cPickle.dump({ 'count' : self.count,
- 'lastRotation' : self.lastRotation,
- 'accumulatedTime' : self.accumulatedTime,
- },
- f, 1)
- f.close()
- os.rename(tmpfile, self.filename)
+ writePickled(self.filename, { 'count' : self.count,
+ 'lastRotation' : self.lastRotation,
+ 'accumulatedTime' : self.accumulatedTime,
+ })
def _log(self, event, arg=None):
try:
@@ -206,11 +201,11 @@
def _rotate(self, now=None):
"""Flush all events since the last rotation to the history file,
and clears the current event log."""
-
+
# Must hold lock
LOG.debug("Flushing statistics log")
if now is None: now = time()
-
+
f = open(self.historyFilename, 'a')
self.dump(f, now)
f.close()
@@ -220,7 +215,7 @@
self.count[e] = {}
self.lastRotation = now
self._save(now)
- self.accumulatedTime = 0
+ self.accumulatedTime = 0
self._setNextRotation(now)
def dump(self, f, now=None):
@@ -257,7 +252,7 @@
self._lock.release()
def _setNextRotation(self, now=None):
- # DOCDOC
+ """Helper function: calculate the time when we next rotate the log."""
# ???? Lock to 24-hour cycle
# This is a little weird. We won't save *until*:
@@ -283,7 +278,9 @@
self.nextRotation = mid + 3600 * floorDiv(rest+55*60, 3600)
def configureLog(config):
- """DOCDOC"""
+ """Given a configuration file, set up the log. May replace the log global
+ variable.
+ """
global log
if config['Server']['LogStats']:
LOG.info("Enabling statistics logging")
@@ -293,9 +290,11 @@
statsfile = os.path.join(homedir, "stats")
workfile = os.path.join(homedir, "work", "stats.tmp")
log = EventLog(
- workfile, statsfile, config['Server']['StatsInterval'][2])
+ workfile, statsfile, config['Server']['StatsInterval'].getSeconds())
+ LOG.info("Statistics logging enabled")
else:
log = NilEventLog()
LOG.info("Statistics logging disabled")
+# Global variable: The currently configured event log.
log = NilEventLog()
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.27
retrieving revision 1.28
diff -u -d -r1.27 -r1.28
--- MMTPServer.py 5 May 2003 02:52:01 -0000 1.27
+++ MMTPServer.py 17 May 2003 00:08:44 -0000 1.28
@@ -444,7 +444,10 @@
except _ml.TLSWantRead:
self.__server.registerReader(self)
except _ml.TLSClosed:
- warn("Unexpectedly closed connection to %s", self.address)
+ if self.__state is self.__connectFn:
+ warn("Couldn't connect to server %s", self.address)
+ else:
+ warn("Unexpectedly closed connection to %s", self.address)
self.handleFail(retriable=1)
self.__sock.close()
self.__server.unregister(self)
@@ -472,7 +475,6 @@
def shutdownFailed(self):
"""Called when this connection goes down hard."""
- #DOCDOc
pass
def shutdown(self, err=0, retriable=0):
@@ -526,19 +528,25 @@
# if negotiation hasn't completed.
# PROTOCOL_VERSIONS: (static) a list of protocol versions we allow,
# in decreasing order of preference.
+ # junkCallback: a no-arguments function called when we receive a
+ # junk packet.
+ # rejectCallback: a no-arguments function called when we reject a packet.
+ # rejectPackets: a flag: should we reject incoming packets?
PROTOCOL_VERSIONS = [ '0.3' ]
def __init__(self, sock, tls, consumer, rejectPackets=0):
"""Create an MMTP connection to receive messages sent along a given
socket. When valid packets are received, pass them to the
- function 'consumer'."""
+ function 'consumer'. If rejectPackets is true, then instead of
+ accepting packets, we refuse them instead--for example, if the
+ because the disk is full."""
SimpleTLSConnection.__init__(self, sock, tls, 1,
"%s:%s"%sock.getpeername())
self.messageConsumer = consumer
- self.junkCallback = lambda : None #DOCDOC
- self.rejectCallback = lambda : None #DOCDOC
+ self.junkCallback = lambda : None
+ self.rejectCallback = lambda : None
self.finished = self.__setupFinished
self.protocol = None
- self.rejectPackets = rejectPackets #DOCDOC
+ self.rejectPackets = rejectPackets
def __setupFinished(self):
"""Called once we're done accepting. Begins reading the protocol
@@ -569,7 +577,7 @@
self.finished = self.__sentProtocol
self.beginWrite("MMTP %s\r\n"% p)
return
-
+
warn("Unsupported protocol list. Closing connection to %s",
self.address)
self.shutdown(err=1)
@@ -599,7 +607,7 @@
expectedDigest = sha1(msg+"SEND")
if self.rejectPackets:
replyDigest = sha1(msg+"REJECTED")
- replyControl = REJECTED_CONTROL
+ replyControl = REJECTED_CONTROL
else:
replyDigest = sha1(msg+"RECEIVED")
replyControl = RECEIVED_CONTROL
@@ -617,7 +625,7 @@
else:
debug("%s packet received from %s; Checksum valid.",
data[:4], self.address)
- self.finished = self.__sentAck
+ self.finished = self.__sentAck
self.beginWrite(replyControl+replyDigest)
if isJunk:
self.junkCallback()
@@ -641,10 +649,13 @@
"""Asynchronious implementation of the sending ("client") side of a
mixminion connection."""
## Fields:
- # ip, port, keyID, messageList, handleList, sendCallback, failCallback:
- # As described in the docstring for __init__ below.
+ # ip, port, keyID, messageList, handleList, sendCallback, failCallback,
+ # finishedCallback, certCache:
+ # As described in the docstring for __init__ below. We remove entries
+ # from the front of messageList/handleList as we begin sending them.
# junk: A list of 32KB padding chunks that we're going to send. We
- # pregenerate these to avoid timing attacks.
+ # pregenerate these to avoid timing attacks. They correspond to
+ # the 'JUNK' entries in messageList.
# isJunk: flag. Is the current chunk padding?
# expectedDigest: The digest we expect to receive in response to the
# current chunk.
@@ -652,31 +663,34 @@
# if negotiation hasn't completed.
# PROTOCOL_VERSIONS: (static) a list of protocol versions we allow,
# in the order we offer them.
+ # _curMessage, _curHandle: Correspond to the message and handle
+ # that we are currently trying to deliver.
PROTOCOL_VERSIONS = [ '0.3' ]
def __init__(self, context, ip, port, keyID, messageList, handleList,
sentCallback=None, failCallback=None, finishedCallback=None,
certCache=None):
"""Create a connection to send messages to an MMTP server.
Raises socket.error if the connection fails.
-
+
ip -- The IP of the destination server.
port -- The port to connect to.
keyID -- None, or the expected SHA1 hash of the server's public key
messageList -- a list of message payloads and control strings.
The control string "JUNK" sends 32KB of padding; the control
string "RENEGOTIATE" renegotiates the connection key.
- handleList -- a list of objects corresponding to the messages in
+ handleList -- a list of objects corresponding to the entries in
messageList. Used for callback.
sentCallback -- None, or a function of (msg, handle) to be called
whenever a message is successfully sent.
failCallback -- None, or a function of (msg, handle, retriable)
to be called when messages can't be sent.
- DOCDOC certcache,finishedCallback
-
- DOCDOC lengths of handles and messsages are equal."""
-
+ finishedCallback -- None, or a function to be called when this
+ connection is closed.
+ certCache -- an instance of PeerCertificateCache to use for
+ checking server certificates.
+ """
# Generate junk before connecting to avoid timing attacks
- self.junk = [] #DOCDOC doc this field.
+ self.junk = []
self.messageList = []
self.handleList = []
@@ -705,14 +719,17 @@
self.finished = self.__setupFinished
self.sentCallback = sentCallback
self.failCallback = failCallback
- self.finishedCallback = finishedCallback #DOCDOC
+ self.finishedCallback = finishedCallback
self.protocol = None
- self._curMessage = self._curHandle = None#DOCDOC
+ self._curMessage = self._curHandle = None
- debug("Opening client connection (fd %s)", self.fd)
+ debug("Opening client connection to %s:%s (fd %s)", ip,port,self.fd)
def addMessages(self, messages, handles):
- "DOCDOC"
+ """Given a list of messages and handles, as given to
+ MMTPServer.__init__, cause this connection to deliver that new
+ set of messages after it's done with those it's currently sending.
+ """
assert len(messages) == len(handles)
for m,h in zip(messages, handles):
if m in ("JUNK", "RENEGOTIATE"):
@@ -724,7 +741,7 @@
self.handleList.extend(handles)
def getAddr(self):
- "DOCDOC"
+ """Return an (ip,port,keyID) tuple for this connection"""
return self.ip, self.port, self.keyID
def __setupFinished(self):
@@ -756,7 +773,7 @@
sending a packet, or exits if we're done sending.
"""
inp = self.getInput()
-
+
for p in self.PROTOCOL_VERSIONS:
if inp == 'MMTP %s\r\n'%p:
trace("Speaking MMTP version %s with %s", p, self.address)
@@ -777,7 +794,7 @@
return
msg = self._curMessage = self.messageList[0]
- handle = self._curHandle = self.handleList[0]
+ self._curHandle = self.handleList[0]
del self.messageList[0]
del self.handleList[0]
if msg == 'RENEGOTIATE':
@@ -788,12 +805,12 @@
msg = self.junk[0]
del self.junk[0]
self.expectedDigest = sha1(msg+"RECEIVED JUNK")
- self.rejectDigest = sha1(msg+"REJECTED")
+ self.rejectDigest = sha1(msg+"REJECTED")
msg = JUNK_CONTROL+msg+sha1(msg+"JUNK")
self.isJunk = 1
else:
self.expectedDigest = sha1(msg+"RECEIVED")
- self.rejectDigest = sha1(msg+"REJECTED")
+ self.rejectDigest = sha1(msg+"REJECTED")
msg = SEND_CONTROL+msg+sha1(msg+"SEND")
self.isJunk = 0
@@ -856,7 +873,7 @@
if self.finishedCallback is not None:
self.finishedCallback()
-
+
def shutdownFinished(self):
if self.finishedCallback is not None:
self.finishedCallback()
@@ -869,9 +886,13 @@
MMTPClientConnection, with a function to add new connections, and
callbacks for message success and failure."""
##
- # clientConByAddr
- # certificateCache
- #DOCDOC fields
+ # context: a TLSContext object to use for newly received connections.
+ # clientConByAddr: A map from 3-tuples returned by MMTPClientConnection.
+ # getAddr, to MMTPClientConnection objects.
+ # certificateCache: A PeerCertificateCache object.
+ # listener: A ListenConnection object.
+ # _timeout: The number of seconds of inactivity to allow on a connection
+ # before formerly shutting it down.
def __init__(self, config, tls):
AsyncServer.__init__(self)
@@ -894,7 +915,7 @@
self._newMMTPConnection)
#self.config = config
self.listener.register(self)
- self._timeout = config['Server']['Timeout'][2]
+ self._timeout = config['Server']['Timeout'].getSeconds()
self.clientConByAddr = {}
self.certificateCache = PeerCertificateCache()
@@ -934,7 +955,7 @@
assert len(h) < 32
try:
- #DOCDOC
+ # Is there an existing connection open to the right server?
con = self.clientConByAddr[(ip,port,keyID)]
LOG.debug("Queueing %s messages on open connection to %s:%s",
len(messages), ip, port)
@@ -944,6 +965,7 @@
pass
try:
+ # There isn't any connection to the right server. Open one...
addr = (ip, port, keyID)
finished = lambda addr=addr, self=self: self.__clientFinished(addr)
con = MMTPClientConnection(self.context,
@@ -953,6 +975,7 @@
finishedCallback=finished,
certCache=self.certificateCache)
con.register(self)
+ # ...and register it in clientConByAddr
assert addr == con.getAddr()
self.clientConByAddr[addr] = con
except socket.error, e:
@@ -962,7 +985,7 @@
self.onMessageUndeliverable(m,h,1)
def __clientFinished(self, addr):
- """DOCDOC"""
+ """Called when a client connection runs out of messages to send."""
try:
del self.clientConByAddr[addr]
except KeyError:
@@ -970,10 +993,15 @@
addr)
def onMessageReceived(self, msg):
+ """Abstract function. Called when we get a message"""
pass
def onMessageUndeliverable(self, msg, handle, retriable):
+ """Abstract function: Called when an attempt to deliver a
+ message fails."""
pass
def onMessageSent(self, msg, handle):
+ """Abstract function: Called when an attempt to deliver a
+ message succeeds."""
pass
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.36
retrieving revision 1.37
diff -u -d -r1.36 -r1.37
--- Modules.py 5 May 2003 00:38:46 -0000 1.36
+++ Modules.py 17 May 2003 00:08:45 -0000 1.37
@@ -70,7 +70,7 @@
in Config.py"""
raise NotImplementedError("getConfigSyntax")
- def validateConfig(self, sections, entries, lines, contents):
+ def validateConfig(self, config, lines, contents):
"""See mixminion.Config.validate"""
pass
@@ -139,7 +139,6 @@
res = self.module.processMessage(packet)
if res == DELIVER_OK:
EventStats.log.successfulDelivery() #XXXX
- return
elif res == DELIVER_FAIL_RETRY:
LOG.error("Unable to retry delivery for message")
EventStats.log.unretriableDelivery() #XXXX
@@ -170,7 +169,7 @@
self.module = module
def _deliverMessages(self, msgList):
- for handle, packet, n_retries in msgList:
+ for handle, packet in msgList:
try:
EventStats.log.attemptedDelivery() #XXXX
result = self.module.processMessage(packet)
@@ -181,6 +180,7 @@
self.deliveryFailed(handle, 1)
EventStats.log.failedDelivery() #XXXX
else:
+ assert result == DELIVER_FAIL_NORETRY
LOG.error("Unable to deliver message")
self.deliveryFailed(handle, 0)
EventStats.log.unretriableDelivery() #XXXX
@@ -260,7 +260,7 @@
# _isConfigured: flag: has this modulemanager's configure method been
# called?
# thread: None, or a DeliveryThread object.
-
+
def __init__(self):
"Create a new ModuleManager"
self.syntax = {}
@@ -346,10 +346,10 @@
except Exception, e:
raise MixError("Error initializing module %s" %className)
- def validate(self, sections, entries, lines, contents):
+ def validate(self, config, lines, contents):
# (As in ServerConfig)
for m in self.modules:
- m.validateConfig(sections, entries, lines, contents)
+ m.validateConfig(config, lines, contents)
def configure(self, config):
self._setQueueRoot(os.path.join(config['Server']['Homedir'],
@@ -399,7 +399,7 @@
exit module, and queue the packet for delivey by that exit module.
"""
exitType = packet.getExitType()
-
+
mod = self.typeToModule.get(exitType, None)
if mod is None:
LOG.error("Unable to handle message with unknown type %s",
@@ -408,9 +408,9 @@
queue = self.queues[mod.getName()]
LOG.debug("Delivering message %r (type %04x) via module %s",
packet.getContents()[:8], exitType, mod.getName())
-
+
queue.queueDeliveryMessage(packet)
-
+
def shutdown(self):
"""Tell the delivery thread (if any) to stop."""
if self.thread is not None:
@@ -632,8 +632,8 @@
'SMTPServer' : ('ALLOW', None, 'localhost') }
}
- def validateConfig(self, sections, entries, lines, contents):
- sec = sections['Delivery/MBOX']
+ def validateConfig(self, config, lines, contents):
+ sec = config['Delivery/MBOX']
if not sec.get('Enabled'):
return
for field in ['AddressFile', 'ReturnAddress', 'RemoveContact',
@@ -648,9 +648,7 @@
LOG.warn("Value of %s (%s) doesn't look like an email address",
field, sec[field])
- mixInterval = sections['Server']['MixInterval'][2]
- mixminion.server.ServerConfig._validateRetrySchedule(
- mixInterval, entries, "Delivery/MBOX")
+ config.validateRetrySchedule("Delivery/MBOX")
def configure(self, config, moduleManager):
if not config['Delivery/MBOX'].get("Enabled", 0):
@@ -790,8 +788,8 @@
}
}
- def validateConfig(self, sections, entries, lines, contents):
- sec = sections['Delivery/SMTP']
+ def validateConfig(self, config, lines, contents):
+ sec = config['Delivery/SMTP']
if not sec.get('Enabled'):
return
for field in 'SMTPServer', 'ReturnAddress':
@@ -804,9 +802,7 @@
LOG.warn("Return address (%s) doesn't look like an email address",
sec['ReturnAddress'])
- mixInterval = sections['Server']['MixInterval'][2]
- mixminion.server.ServerConfig._validateRetrySchedule(
- mixInterval, entries, "Delivery/SMTP")
+ config.validateRetrySchedule("Delivery/SMTP")
def configure(self, config, manager):
sec = config['Delivery/SMTP']
@@ -886,15 +882,13 @@
}
}
- def validateConfig(self, sections, entries, lines, contents):
+ def validateConfig(self, config, lines, contents):
#XXXX write more
- sec = sections['Delivery/SMTP-Via-Mixmaster']
+ sec = config['Delivery/SMTP-Via-Mixmaster']
if not sec.get("Enabled"):
return
- mixInterval = sections['Server']['MixInterval'][2]
- mixminion.server.ServerConfig._validateRetrySchedule(
- mixInterval, entries, "Delivery/SMTP-Via-Mixmaster")
-
+ config.validateRetrySchedule("Delivery/SMTP-Via-Mixmaster")
+
def configure(self, config, manager):
sec = config['Delivery/SMTP-Via-Mixmaster']
if not sec.get("Enabled", 0):
Index: PacketHandler.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/PacketHandler.py,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -d -r1.15 -r1.16
--- PacketHandler.py 5 May 2003 00:38:46 -0000 1.15
+++ PacketHandler.py 17 May 2003 00:08:45 -0000 1.16
@@ -5,6 +5,7 @@
import binascii
import threading
+import types
from mixminion.Common import encodeBase64, formatBase64
import mixminion.Crypto as Crypto
@@ -31,26 +32,27 @@
# privatekeys: a list of 2-tuples of
# (1) a RSA private key that we accept
# (2) a HashLog objects corresponding to the given key
- def __init__(self, privatekey, hashlog):
- """Constructs a new packet handler, given a private key object for
- header encryption, and a hashlog object to prevent replays.
+ def __init__(self, privatekey=(), hashlog=()):
+ """Constructs a new packet handler, given a sequence of
+ private key object for header encryption, and a sequence of
+ corresponding hashlog object to prevent replays.
- A sequence of private keys may be provided, if you'd like the
- server to accept messages encrypted with any of them. Beware,
- though: PK decryption is expensive. Also, a hashlog must be
- provided for each private key.
+ The lists must be equally long. When a new packet is
+ processed, we try each of the private keys in sequence. If
+ the packet is decodeable with one of the keys, we log it in
+ the corresponding entry of the hashlog list.
"""
self.privatekeys = []
self.lock = threading.Lock()
-
- try:
- _ = privatekey[0]
- self.setKeys(privatekey, hashlog)
- except TypeError:
- self.setKeys([privatekey], [hashlog])
+
+ assert type(privatekey) in (types.ListType, types.TupleType)
+
+ self.setKeys(privatekey, hashlog)
def setKeys(self, keys, hashlogs):
- """DOCDOC"""
+ """Change the keys and hashlogs used by this PacketHandler.
+ Arguments are as to PacketHandler.__init__
+ """
self.lock.acquire()
newKeys = {}
try:
@@ -84,7 +86,7 @@
self.lock.acquire()
for _, h in self.privatekeys:
h.close()
- finally:
+ finally:
self.lock.release()
def processMessage(self, msg):
@@ -177,7 +179,7 @@
header1 = header1[overflowLength:]
header1 = subh.underflow + header1
-
+
# Decrypt the payload.
payload = Crypto.lioness_decrypt(msg.payload,
keys.getLionessKeys(Crypto.PAYLOAD_ENCRYPT_MODE))
@@ -217,7 +219,7 @@
msg = Packet.Message(header1, header2, payload).pack()
return RelayedPacket(address, msg)
-
+
class RelayedPacket:
"""A packet that is to be relayed to another server; returned by
returned by PacketHandler.processMessage."""
@@ -373,5 +375,5 @@
else:
assert self.isPlaintext()
tp = 'BIN'
-
+
return Packet.TextEncodedMessage(self.contents, tp, tag)
Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -d -r1.23 -r1.24
--- ServerConfig.py 5 May 2003 00:38:46 -0000 1.23
+++ ServerConfig.py 17 May 2003 00:08:45 -0000 1.24
@@ -55,10 +55,10 @@
LOG.warn("Identity key encryption not yet implemented")
if server['EncryptPrivateKey']:
LOG.warn("Encrypted private keys not yet implemented")
- if server['PublicKeyLifetime'][2] < 24*60*60:
+ if server['PublicKeyLifetime'].getSeconds() < 24*60*60:
raise ConfigError("PublicKeyLifetime must be at least 1 day.")
- if server['PublicKeySloppiness'][2] > 20*60:
- raise ConfigError("PublicKeySloppiness must be <= 20 minutes.")
+ if server['PublicKeyOverlap'].getSeconds() > 6*60*60:
+ raise ConfigError("PublicKeyOverlap must be <= 6 hours")
if _haveEntry(self, 'Server', 'NoDaemon'):
LOG.warn("The NoDaemon option is obsolete. Use Daemon instead.")
@@ -66,7 +66,7 @@
if _haveEntry(self, 'Server', 'Mode'):
LOG.warn("Mode specification is not yet supported.")
- mixInterval = server['MixInterval'][2]
+ mixInterval = server['MixInterval'].getSeconds()
if mixInterval < 30*60:
LOG.warn("Dangerously low MixInterval")
if server['MixAlgorithm'] == 'TimedMixPool':
@@ -94,11 +94,9 @@
if e[0] in ('Allow', 'Deny')]:
LOG.warn("Allow/deny are not yet supported")
- _validateRetrySchedule(mixInterval, self._sectionEntries,
- "Outgoing/MMTP")
+ self.validateRetrySchedule("Outgoing/MMTP")
- self.moduleManager.validate(self._sections, self._sectionEntries,
- lines, contents)
+ self.moduleManager.validate(self, lines, contents)
def __loadModules(self, section, sectionEntries):
"""Callback from the [Server] section of a config file. Parses
@@ -124,20 +122,21 @@
server = self['Server']
if server['LogLevel'] in ('TRACE', 'DEBUG'):
reasons.append("Log is too verbose")
- if server['LogStats'] and server['StatsInterval'][2] < 24*60*60:
+ if server['LogStats'] and server['StatsInterval'].getSeconds() \
+ < 24*60*60:
reasons.append("StatsInterval is too short")
if not server["EncryptIdentityKey"]:
reasons.append("Identity key is not encrypted")
- # ????004 Pkey lifetime, sloppiness?
+ # ????004 Pkey lifetime, sloppiness?
if server["MixAlgorithm"] not in _SECURE_MIX_RULES:
reasons.append("Mix algorithm is not secure")
else:
if server["MixPoolMinSize"] < 5:
reasons.append("MixPoolMinSize is too small")
#MixPoolRate?
- if server["MixInterval"][2] < 30*60:
+ if server["MixInterval"].getSeconds() < 30*60:
reasons.append("Mix interval under 30 minutes")
-
+
# ????004 DIRSERVERS?
# ????004 Incoming/MMTP
@@ -146,40 +145,52 @@
# ????004 Modules?
-def _validateRetrySchedule(mixInterval, entries, sectionname,
- entryname='Retry'):
- """DOCDOC"""
- entry = [e for e in entries.get(sectionname,[]) if e[0] == entryname]
- if not entry:
- return
- assert len(entry) == 1
- sched = entry[0][1]
- total = reduce(operator.add, sched, 0)
+ return reasons
+
+ def validateRetrySchedule(self, sectionName, entryName='Retry'):
+ """Check whether the retry schedule in self[sectionName][entryName]
+ is reasonable. Warn or raise ConfigError if it isn't. Ignore
+ the entry if it isn't there.
+ """
+ entry = self[sectionName].get(entryName,None)
+ if not entry:
+ return
+ mixInterval = self['Server']['MixInterval'].getSeconds()
+ _validateRetrySchedule(mixInterval, entry, sectionName)
+
+def _validateRetrySchedule(mixInterval, schedule, sectionName):
+ """Backend for ServerConfig.validateRetrySchedule -- separated for testing.
+
+ mixInterval -- our batching interval.
+ schedule -- a retry schedule as returned by _parseIntervalList.
+ sectionName -- the name of the retrying subsystem: used for messages.
+ """
+ total = reduce(operator.add, schedule, 0)
# Warn if we try for less than a day.
if total < 24*60*60:
- LOG.warn("Dangerously low retry timeout for %s (<1 day)", sectionname)
-
+ LOG.warn("Dangerously low retry timeout for %s (<1 day)", sectionName)
+
# Warn if we try for more than two weeks.
if total > 2*7*24*60*60:
- LOG.warn("Very high retry timeout for %s (>14 days)", sectionname)
+ LOG.warn("Very high retry timeout for %s (>14 days)", sectionName)
# Warn if any of our intervals are less than the mix interval...
- if min(sched) < mixInterval-2:
- LOG.warn("Rounding retry intervals for %s to the nearest mix interval",
- sectionname)
+ if min(schedule) < mixInterval-2:
+ LOG.warn("Rounding retry intervals for %s to the nearest mix",
+ sectionName)
# ... or less than 5 minutes.
- elif min(sched) < 5*60:
- LOG.warn("Very fast retry intervals for %s (< 5 minutes)", sectionname)
+ elif min(schedule) < 5*60:
+ LOG.warn("Very fast retry intervals for %s (< 5 minutes)", sectionName)
# Warn if we make fewer than 5 attempts.
- if len(sched) < 5:
- LOG.warn("Dangerously low number of retries for %s (<5)", sectionname)
+ if len(schedule) < 5:
+ LOG.warn("Dangerously low number of retries for %s (<5)", sectionName)
# Warn if we make more than 50 attempts.
- if len(sched) > 50:
- LOG.warn("Very high number of retries for %s (>50)", sectionname)
+ if len(schedule) > 50:
+ LOG.warn("Very high number of retries for %s (>50)", sectionName)
#======================================================================
@@ -240,8 +251,8 @@
'IdentityKeyBits': ('ALLOW', C._parseInt, "2048"),
'PublicKeyLifetime' : ('ALLOW', C._parseInterval,
"30 days"),
- 'PublicKeySloppiness': ('ALLOW', C._parseInterval,
- "5 minutes"),
+ 'PublicKeyOverlap': ('ALLOW', C._parseInterval,
+ "5 minutes"),
'EncryptPrivateKey' : ('ALLOW', C._parseBoolean, "no"),
'Mode' : ('REQUIRE', C._parseServerMode, "local"),
'Nickname': ('ALLOW', C._parseNickname, None),
Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -d -r1.20 -r1.21
--- ServerKeys.py 5 May 2003 00:38:46 -0000 1.20
+++ ServerKeys.py 17 May 2003 00:08:45 -0000 1.21
@@ -20,12 +20,14 @@
import mixminion.Crypto
import mixminion.Packet
import mixminion.server.HashLog
-import mixminion.server.PacketHandler
import mixminion.server.MMTPServer
+import mixminion.server.ServerMain
from mixminion.ServerInfo import ServerInfo, PACKET_KEY_BYTES, MMTP_KEY_BYTES,\
signServerInfo
-from mixminion.Common import LOG, MixError, MixFatalError, createPrivateDir, \
+
+from mixminion.Common import AtomicFile, LOG, MixError, MixFatalError, \
+ createPrivateDir, \
checkPrivateFile, formatBase64, formatDate, formatTime, previousMidnight,\
secureDelete
@@ -34,16 +36,17 @@
"""A ServerKeyring remembers current and future keys, descriptors, and
hash logs for a mixminion server.
+ DOCDOC
+
FFFF We need a way to generate keys as needed, not just a month's
FFFF worth of keys up front.
"""
## Fields:
# homeDir: server home directory
# keyDir: server key directory
- # keySloppiness: fudge-factor: how forgiving are we about key liveness?
+ # keyOverlap: How long after a new key begins do we accept the old one?
# keyIntervals: list of (start, end, keyset Name)
- # liveKey: list of (start, end, keyset name for current key.)
- # nextRotation: time_t when this key expires.
+ # nextRotation: time_t when this key expires, DOCDOCDOC not so.
# keyRange: tuple of (firstKey, lastKey) to represent which key names
# have keys on disk.
@@ -78,7 +81,8 @@
self.homeDir = config['Server']['Homedir']
self.keyDir = os.path.join(self.homeDir, 'keys')
self.hashDir = os.path.join(self.homeDir, 'work', 'hashlogs')
- self.keySloppiness = config['Server']['PublicKeySloppiness'][2]
+ self.keyOverlap = config['Server']['PublicKeyOverlap'].getSeconds()
+ self.nextUpdate = None
self.checkKeys()
def checkKeys(self):
@@ -144,10 +148,6 @@
LOG.warn("Gap in key schedule: no key from %s to %s",
formatDate(end), formatDate(start))
- self.nextKeyRotation = 0 # Make sure that now > nextKeyRotation before
- # we call _getLiveKey()
- self._getLiveKey() # Set up liveKey, nextKeyRotation.
-
def getIdentityKey(self):
"""Return this server's identity key. Generate one if it doesn't
exist."""
@@ -216,7 +216,7 @@
keyname = "%04d" % keynum
- nextStart = startAt + self.config['Server']['PublicKeyLifetime'][2]
+ nextStart = startAt + self.config['Server']['PublicKeyLifetime'].getSeconds()
LOG.info("Generating key %s to run from %s through %s (GMT)",
keyname, formatDate(startAt),
@@ -241,7 +241,7 @@
else:
expiryStr = ""
- cutoff = now - self.keySloppiness
+ cutoff = now - self.keyOverlap
for va, vu, name in self.keyIntervals:
if vu >= cutoff:
@@ -259,47 +259,50 @@
self.checkKeys()
- def _getLiveKey(self, when=None):
- """Find the first key that is now valid. Return (Valid-after,
+ def _getLiveKeys(self, now=None):
+ """Find all keys that are now valid. Return list of (Valid-after,
valid-util, name)."""
if not self.keyIntervals:
- self.liveKey = None
- self.nextKeyRotation = 0
- return None
-
- w = when
- if when is None:
- when = time.time()
- if when < self.nextKeyRotation:
- return self.liveKey
+ return []
+ if now is None:
+ now = time.time()
+ if self.nextUpdate and now > self.nextUpdate:
+ self.nextUpdate = None
- idx = bisect.bisect(self.keyIntervals, (when, None, None))-1
- k = self.keyIntervals[idx]
- if w is None:
- self.liveKey = k
- self.nextKeyRotation = k[1]
+ cutoff = now-self.keyOverlap
+ # A key is live if
+ # * it became valid before now, and
+ # * it did not become invalid until keyOverlap seconds ago
- return k
+ return [ k for k in self.keyIntervals
+ if k[0] < now and k[1] > cutoff ]
- def getNextKeyRotation(self):
- """Return the expiration time of the current key"""
- return self.nextKeyRotation
+ def getServerKeysets(self):
+ """Return a ServerKeyset object for the currently live key.
- def getServerKeyset(self):
- """Return a ServerKeyset object for the currently live key."""
+ DOCDOC"""
# FFFF Support passwords on keys
- _, _, name = self._getLiveKey()
- keyset = ServerKeyset(self.keyDir, name, self.hashDir)
- keyset.load()
- return keyset
+ keysets = [ ]
+ for va, vu, name in self._getLiveKeys():
+ ks = ServerKeyset(self.keyDir, name, self.hashDir)
+ ks.validAfter = va
+ ks.validUntil = vu
+ ks.load()
+ keysets.append(ks)
+
+ #XXXX004 there should only be 2.
+ return keysets
def getDHFile(self):
"""Return the filename for the diffie-helman parameters for the
server. Creates the file if it doesn't yet exist."""
+ #XXXX Make me private????004
dhdir = os.path.join(self.homeDir, 'work', 'tls')
createPrivateDir(dhdir)
dhfile = os.path.join(dhdir, 'dhparam')
if not os.path.exists(dhfile):
+ # ???? This is only using 512-bit Diffie-Hellman! That isn't
+ # ???? remotely enough.
LOG.info("Generating Diffie-Helman parameters for TLS...")
mixminion._minionlib.generate_dh_parameters(dhfile, verbose=0)
LOG.info("...done")
@@ -309,25 +312,61 @@
return dhfile
- def getTLSContext(self):
+ def _getTLSContext(self, keys=None):
"""Create and return a TLS context from the currently live key."""
- keys = self.getServerKeyset()
+ if keys is None:
+ keys = self.getServerKeysets()[-1]
return mixminion._minionlib.TLSContext_new(keys.getCertFileName(),
keys.getMMTPKey(),
self.getDHFile())
- def getPacketHandler(self):
- """Create and return a PacketHandler from the currently live key."""
- keys = self.getServerKeyset()
- packetKey = keys.getPacketKey()
- hashlog = mixminion.server.HashLog.HashLog(keys.getHashLogFileName(),
- keys.getMMTPKeyID())
- return mixminion.server.PacketHandler.PacketHandler(packetKey,
- hashlog)
+ def updateKeys(self, packetHandler, mmtpServer, when=None):
+ """DOCDOC: Return next rotation."""
+ self.removeDeadKeys()
+ keys = self.getServerKeysets(when)
+ LOG.info("Updating keys: %s currently valid", len(keys))
+ if mmtpServer is not None:
+ context = self._getTLSContext(keys[-1])
+ mmtpServer.setContext(context)
+ if packetHandler is not None:
+ packetKeys = []
+ hashLogs = []
+
+ for k in keys:
+ packetKeys.append(k.getPacketKey())
+ hashLogs.append(mixminion.server.HashLog.HashLog(
+ k.getHashLogFileName(), k.getPacketKeyID()))
+ packetHandler.setKeys(packetkeys, hashLogs)
+
+ self.getNextKeyRotation(keys)
+
+ def getNextKeyRotation(self, keys=None):
+ if self.nextUpdate is None:
+ if keys is None:
+ keys = self.getServerKeysets()
+ addKeyEvents = []
+ rmKeyEvents = []
+ for k in keys:
+ va, vu = k.getLiveness()
+ rmKeyEvents.append(vu+self.keyOverlap)
+ addKeyEvents.append(vu)
+ add = min(addKeyEvents); rm = min(rmKeyEvents)
+
+ if add < rm:
+ LOG.info("Next event: new key becomes valid at %s",
+ formatTime(add,1))
+ self.nextUpdate = add
+ else:
+ LOG.info("Next event: old key is removed at %s",
+ formatTime(rm,1))
+ self.nextUpdate = rm
+
+
+ return self.nextUpdate
def getAddress(self):
"""Return out current ip/port/keyid tuple"""
- keys = self.getServerKeyset()
+ keys = self.getServerKeysets()[0]
desc = keys.getServerDescriptor()
return (desc['Incoming/MMTP']['IP'],
desc['Incoming/MMTP']['Port'],
@@ -354,16 +393,24 @@
# descFile: filename of this keyset's server descriptor.
#
# packetKey, mmtpKey: This server's actual short-term keys.
+ # DOCDOC serverinfo, validAfter, validUntil
def __init__(self, keyroot, keyname, hashroot):
"""Load a set of keys named "keyname" on a server where all keys
are stored under the directory "keyroot" and hashlogs are stored
under "hashroot". """
+ self.keyroot = keyroot
+ self.keyname = keyname
+ self.hashroot= hashroot
+
keydir = os.path.join(keyroot, "key_"+keyname)
self.hashlogFile = os.path.join(hashroot, "hash_"+keyname)
self.packetKeyFile = os.path.join(keydir, "mix.key")
self.mmtpKeyFile = os.path.join(keydir, "mmtp.key")
self.certFile = os.path.join(keydir, "mmtp.cert")
self.descFile = os.path.join(keydir, "ServerDesc")
+ self.serverinfo = None
+ self.validAfter = None
+ self.validUntil = None
if not os.path.exists(keydir):
createPrivateDir(keydir)
@@ -387,13 +434,33 @@
def getDescriptorFileName(self): return self.descFile
def getPacketKey(self): return self.packetKey
def getMMTPKey(self): return self.mmtpKey
- def getMMTPKeyID(self):
- "Return the sha1 hash of the asn1 encoding of the MMTP public key"
- return mixminion.Crypto.sha1(self.mmtpKey.encode_key(1))
+ def getPacketKeyID(self):
+ "Return the sha1 hash of the asn1 encoding of the packet public key"
+ return mixminion.Crypto.sha1(self.packetKey.encode_key(1))
def getServerDescriptor(self):
- return ServerInfo(fname=self.descFile)
+ if self.serverinfo is None:
+ self.serverinfo = ServerInfo(fname=self.descFile)
+ return self.serverinfo
+ def getLiveness(self):
+ if self.validAfter is None or self.validUntil is None:
+ info = self.getServerDescriptor()
+ self.validAfter = info['Server']['Valid-After']
+ self.validUntil = info['Server']['Valid-Until']
+ return self.validAfter, self.validUntil
+ def regenerateServerDescriptor(self, config, identityKey, validAt=None):
+ """DOCDOC"""
+ self.load()
+ if validAt is None:
+ validAt = self.getLiveness()[0]
+ generateServerDescriptorAndKeys(config, identityKey,
+ self.keyroot, self.keyname, self.hashroot,
+ validAt=validAt, useServerKeys=1)
+ self.serverinfo = self.validAfter = self.validUntil = None
class _WarnWrapper:
+ """Helper for 'checkDescriptorConsistency' to keep its implementation
+ short. Counts the number of times it's invoked, and delegates to
+ LOG.warn if silence is false."""
def __init__(self, silence):
self.silence = silence
self.called = 0
@@ -403,12 +470,12 @@
LOG.warn(*args)
def checkDescriptorConsistency(info, config, log=1):
- """DOCDOC
+ """Given a ServerInfo and a ServerConfig, compare them for consistency.
- Return true iff info may have come from 'config'. If log is true,
- warn as well. Does not check keys.
+ Return true iff info may have come from 'config'. If 'log' is
+ true, warn as well. Does not check keys.
"""
-
+
if log:
warn = _WarnWrapper(0)
else:
@@ -419,7 +486,7 @@
if config_s['Nickname'] and (info_s['Nickname'] != config_s['Nickname']):
warn("Mismatched nicknames: %s in configuration; %s published.",
config_s['Nickname'], info_s['Nickname'])
-
+
idBits = info_s['Identity'].get_modulus_bytes()*8
confIDBits = config_s['IdentityKeyBits']
if idBits != confIDBits:
@@ -438,7 +505,7 @@
warn("Mismatched comments field.")
if (previousMidnight(info_s['Valid-Until']) !=
- previousMidnight(config_s['PublicKeyLifetime'][2] +
+ previousMidnight(config_s['PublicKeyLifetime'].getSeconds() +
info_s['Valid-After'])):
warn("Published lifetime does not match PublicKeyLifetime")
@@ -478,7 +545,7 @@
warn("%s enabled, but not published.", section)
return not warn.called
-
+
#----------------------------------------------------------------------
# Functionality to generate keys and server descriptors
@@ -489,7 +556,7 @@
def generateServerDescriptorAndKeys(config, identityKey, keydir, keyname,
hashdir, validAt=None, now=None,
- useServerKeys=None):
+ useServerKeys=0):
#XXXX reorder args
"""Generate and sign a new server descriptor, and generate all the keys to
go with it.
@@ -501,10 +568,15 @@
hashdir -- The root directory for storing hash logs.
validAt -- The starting time (in seconds) for this key's lifetime.
- DOCDOC useServerKeys,
+ DOCDOC useServerKeys
"""
- if useServerKeys is None:
+ if useServerKeys:
+ serverKeys = ServerKeyset(keydir, keyname, hashdir)
+ serverKeys.load()
+ packetKey = serverKeys.packetKey
+ mmtpKey = serverKeys.mmtpKey # not used
+ else:
# First, we generate both of our short-term keys...
packetKey = mixminion.Crypto.pk_generate(PACKET_KEY_BYTES*8)
mmtpKey = mixminion.Crypto.pk_generate(MMTP_KEY_BYTES*8)
@@ -515,12 +587,6 @@
serverKeys.packetKey = packetKey
serverKeys.mmtpKey = mmtpKey
serverKeys.save()
- else:
- #XXXX drop this once we've tested and added more validation logic.
- LOG.warn("EXPERIMENTAL FEATURE: Regenerating server descriptor from old keys")
- serverKeys = useServerKeys
- packetKey = serverKeys.getPacketKey()
- mmtpKey = serverKeys.getMMTPKey()
# FFFF unused
# allowIncoming = config['Incoming/MMTP'].get('Enabled', 0)
@@ -547,15 +613,14 @@
# Calculate descriptor and X509 certificate lifetimes.
# (Round validAt to previous mignight.)
validAt = mixminion.Common.previousMidnight(validAt+30)
- validUntil = validAt + config['Server']['PublicKeyLifetime'][2]
+ validUntil = validAt + config['Server']['PublicKeyLifetime'].getSeconds()
certStarts = validAt - CERTIFICATE_EXPIRY_SLOPPINESS
- certEnds = validUntil + CERTIFICATE_EXPIRY_SLOPPINESS + \
- config['Server']['PublicKeySloppiness'][2]
+ certEnds = validUntil + CERTIFICATE_EXPIRY_SLOPPINESS
- if useServerKeys is None:
- # Create the X509 certificates
- generateCertChain(serverKeys.getCertFileName(),
- mmtpKey, identityKey, nickname, certStarts, certEnds)
+ # Create the X509 certificates in any case, in case one of the parameters
+ # has changed.
+ generateCertChain(serverKeys.getCertFileName(),
+ mmtpKey, identityKey, nickname, certStarts, certEnds)
mmtpProtocolsIn = mixminion.server.MMTPServer.MMTPServerConnection \
.PROTOCOL_VERSIONS[:]
@@ -581,7 +646,7 @@
"ValidUntil": formatDate(validUntil),
"PacketKey":
formatBase64(mixminion.Crypto.pk_encode_public_key(packetKey)),
- "KeyID": identityKeyID,
+ "KeyID": identityKeyID,
"MMTPProtocolsIn" : mmtpProtocolsIn,
"MMTPProtocolsOut" : mmtpProtocolsOut,
"PacketFormat" : "%s.%s"%(mixminion.Packet.MAJOR_NO,
@@ -664,11 +729,13 @@
info = signServerInfo(info, identityKey)
# Write the desciptor
- f = open(serverKeys.getDescriptorFileName(), 'w')
+ f = AtomicFile(serverKeys.getDescriptorFileName(), 'w')
try:
f.write(info)
- finally:
f.close()
+ except:
+ f.discard()
+ raise
# This is for debugging: we try to parse and validate the descriptor
# we just made.
@@ -755,7 +822,7 @@
def generateCertChain(filename, mmtpKey, identityKey, nickname,
certStarts, certEnds):
- "DOCDOC"
+ """Create a two-certificate chain DOCDOC"""
fname = filename+"_tmp"
mixminion.Crypto.generate_cert(fname,
mmtpKey, identityKey,
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.51
retrieving revision 1.52
diff -u -d -r1.51 -r1.52
--- ServerMain.py 5 May 2003 00:38:46 -0000 1.51
+++ ServerMain.py 17 May 2003 00:08:45 -0000 1.52
@@ -39,7 +39,7 @@
installSIGCHLDHandler, Lockfile, secureDelete, waitForChildren
class IncomingQueue(mixminion.server.ServerQueue.Queue):
- """A DeliveryQueue to accept packets from incoming MMTP connections,
+ """A Queue to accept packets from incoming MMTP connections,
and hold them until they can be processed. As packets arrive, and
are stored to disk, we notify a message queue so that another thread
can read them."""
@@ -126,7 +126,7 @@
queue location."""
server = config['Server']
- interval = server['MixInterval'][2]
+ interval = server['MixInterval'].getSeconds()
if server['MixAlgorithm'] == 'TimedMixPool':
self.queue = mixminion.server.ServerQueue.TimedMixPool(
location=queueDir, interval=interval)
@@ -177,7 +177,7 @@
handles = self.queue.getBatch()
LOG.debug("%s messages in the mix pool; delivering %s.",
self.queue.count(), len(handles))
-
+
for h in handles:
packet = self.queue.getObject(h)
if type(packet) == type(()):
@@ -209,7 +209,8 @@
## Fields:
# server -- an instance of _MMTPServer
# addr -- (publishedIP, publishedPort, publishedKeyID)
- # incomingQueue -- DOCDOC
+ # incomingQueue -- pointer to IncomingQueue object to be used for
+ # self->self communication.
def __init__(self, location, (ip,port,keyid)):
"""Create a new OutgoingQueue that stores its messages in a given
location."""
@@ -224,8 +225,9 @@
self.setRetrySchedule(retry)
def connectQueues(self, server, incoming):
- """Set the MMTPServer that this OutgoingQueue informs of its
- deliverable messages."""
+ """Set the MMTPServer and IncomingQueue that this
+ OutgoingQueue informs of its deliverable messages."""
+
self.server = server
self.incomingQueue = incoming
@@ -233,7 +235,7 @@
"Implementation of abstract method from DeliveryQueue."
# Map from addr -> [ (handle, msg) ... ]
msgs = {}
- for handle, packet, n_retries in msgList:
+ for handle, packet in msgList:
if not isinstance(packet,
mixminion.server.PacketHandler.RelayedPacket):
LOG.warn("Skipping packet in obsolete format")
@@ -290,7 +292,7 @@
EventStats.log.failedRelay() # XXXX replace with addr
else:
EventStats.log.unretriableRelay() # XXXX replace with addr
-
+
#----------------------------------------------------------------------
class CleaningThread(threading.Thread):
"""Thread that handles file deletion. Some methods of secure deletion
@@ -402,23 +404,38 @@
#----------------------------------------------------------------------
class _Scheduler:
- """Mixin class for server. Implements DOCDOC"""
- # DOCDOC
+ """Mixin class for server. Implements a priority queue of ongoing,
+ scheduled tasks with a loose (few seconds) granularity.
+ """
+ # Fields:
+ # scheduledEvents: list of (time, identifying-string, callable)
+ # Sorted by time. We could use a heap here instead, but
+ # that doesn't turn into a net benefit until we have a hundred
+ # events or so.
def __init__(self):
+ """Create a new _Scheduler"""
self.scheduledEvents = []
def firstEventTime(self):
+ """Return the time at which the earliest-scheduled event is
+ supposed to occur. Returns -1 if no events.
+ """
if self.scheduledEvents:
return self.scheduledEvents[0][0]
else:
return -1
def scheduleOnce(self, when, name, cb):
+ """Schedule a callback function, 'cb', to be invoked at time 'when.'
+ """
assert type(name) is StringType
assert type(when) in (IntType, LongType, FloatType)
insort(self.scheduledEvents, (when, name, cb))
def scheduleRecurring(self, first, interval, name, cb):
+ """Schedule a callback function 'cb' to be invoked at time 'first,'
+ and every 'interval' seconds thereafter.
+ """
assert type(name) is StringType
assert type(first) in (IntType, LongType, FloatType)
assert type(interval) in (IntType, LongType, FloatType)
@@ -427,20 +444,42 @@
_RecurringEvent(name, cb, self, next)))
def scheduleRecurringComplex(self, first, name, cb, nextFn):
+ """Schedule a callback function 'cb' to be invoked at time 'first,'
+ and thereafter at times returned by 'nextFn'.
+
+ (nextFn is called immediately after the callback is invoked,
+ every time it is invoked, and should return a time at which.)
+ """
assert type(name) is StringType
assert type(first) in (IntType, LongType, FloatType)
insort(self.scheduledEvents, (first, name,
_RecurringEvent(name, cb, self, nextFn)))
def processEvents(self, now=None):
+ """Run all events that are scheduled to occur before 'now'.
+
+ Note: if an event reschedules itself for a time _before_ now,
+ it will only be run once per invocation of processEvents.
+
+ The right way to run this class is something like:
+ while 1:
+ interval = time.time() - scheduler.firstEventTime()
+ if interval > 0:
+ time.sleep(interval)
+ # or maybe, select.select(...,...,...,interval)
+ scheduler.processEvents()
+ """
if now is None: now = time.time()
se = self.scheduledEvents
+ cbs = []
while se and se[0][0] <= now:
- cb = se[0][2]
+ cbs.append(se[0][2])
del se[0]
+ for cb in cbs:
cb()
class _RecurringEvent:
+ """helper for _Scheduler. Calls a callback, then reschedules it."""
def __init__(self, name, cb, scheduler, nextFn):
self.name = name
self.cb = cb
@@ -499,20 +538,20 @@
#XXXX004 Catch ConfigError for bad serverinfo.
#XXXX004 Check whether config matches serverinfo
self.keyring = mixminion.server.ServerKeys.ServerKeyring(config)
- if self.keyring._getLiveKey() is None:
+ if not self.keyring.getLiveKeys():
LOG.info("Generating a month's worth of keys.")
LOG.info("(Don't count on this feature in future versions.)")
# We might not be able to do this, if we password-encrypt keys
- keylife = config['Server']['PublicKeyLifetime'][2]
+ keylife = config['Server']['PublicKeyLifetime'].getSeconds()
nKeys = ceilDiv(30*24*60*60, keylife)
self.keyring.createKeys(nKeys)
LOG.debug("Initializing packet handler")
- self.packetHandler = self.keyring.getPacketHandler()
- LOG.debug("Initializing TLS context")
- tlsContext = self.keyring.getTLSContext()
+ self.packetHandler = mixminion.server.PacketHandler.PacketHandler()
LOG.debug("Initializing MMTP server")
- self.mmtpServer = _MMTPServer(config, tlsContext)
+ self.mmtpServer = _MMTPServer(config, None)
+ LOG.debug("Initializing keys")
+ self.keyring.updateKeys(self.packetHandler, self.mmtpServer)
publishedIP, publishedPort, publishedKeyID = self.keyring.getAddress()
@@ -562,6 +601,10 @@
self.processingThread.start()
self.moduleManager.startThreading()
+ def updateKeys(self):
+ """DOCDOC"""
+ self.keyring.updateKeys(self.packetHandler, self.mmtpServer)
+
def run(self):
"""Run the server; don't return unless we hit an exception."""
global GOT_HUP
@@ -575,19 +618,23 @@
self.scheduleRecurring(now+600, 600, "SHRED", self.cleanQueues)
self.scheduleRecurring(now+180, 180, "WAIT",
lambda: waitForChildren(blocking=0))
- if EventStats.log.getNextRotation():#XXXX!!!!
+ if EventStats.log.getNextRotation():
self.scheduleRecurring(now+300, 300, "ES_SAVE",
- EventStats.log.save)
+ lambda: EventStats.log.save)
self.scheduleRecurringComplex(EventStats.log.getNextRotation(),
- "ES_ROTATE",
- EventStats.log.rotate,
- EventStats.log.getNextRotation)
+ "ES_ROTATE",
+ lambda: EventStats.log.rotate,
+ lambda: EventStats.log.getNextRotation)
self.scheduleRecurringComplex(self.mmtpServer.getNextTimeoutTime(now),
"TIMEOUT",
self.mmtpServer.tryTimeout,
self.mmtpServer.getNextTimeoutTime)
+ self.scheduleRecurringComplex(self.keyring.getNextKeyRotation(),
+ self.updateKeys,
+ "KEY_ROTATE",
+ self.keyring.getKeyRotation)
nextMix = self.mixPool.getNextMixTime(now)
LOG.debug("First mix at %s", formatTime(nextMix,1))
@@ -616,9 +663,7 @@
return
elif GOT_HUP:
LOG.info("Caught sighup")
- LOG.info("Resetting logs")
- LOG.reset()
- EventStats.log.save()
+ self.reset()
GOT_HUP = 0
# Make sure that our worker threads are still running.
if not (self.cleaningThread.isAlive() and
@@ -626,7 +671,7 @@
self.moduleManager.thread.isAlive()):
LOG.fatal("One of our threads has halted; shutting down.")
return
-
+
# Calculate remaining time until the next event.
now = time.time()
timeLeft = nextEventTime - now
@@ -634,6 +679,14 @@
# An event has fired.
self.processEvents()
+ def doReset(self):
+ LOG.info("Resetting logs")
+ LOG.reset()
+ EventStats.log.save()
+ LOG.info("Checking for key rotation")
+ self.keyring.checkKeys()
+ self.updateKeys()
+
def doMix(self):
now = time.time()
# Before we mix, we need to log the hashes to avoid replays.
@@ -788,7 +841,7 @@
def runServer(cmd, args):
if cmd.endswith(" server"):
print "Obsolete command. Use 'mixminion server-start' instead."
-
+
config = configFromServerArgs(cmd, args)
try:
# Configure the log, but delay disabling stderr until the last
@@ -834,7 +887,7 @@
LOG.fatal_exc(info,"Exception while configuring server")
LOG.fatal("Shutting down because of exception: %s", info[0])
sys.exit(1)
-
+
LOG.info("Starting server: Mixminion %s", mixminion.__version__)
try:
# We keep the console log open as long as possible so we can catch
@@ -898,12 +951,6 @@
assert cmd.endswith("reload-server") or cmd.endswith("server-reload")
reload = 1
- #XXXX004 remove this.
- if cmd.endswith("stop-server"):
- print "Obsolete command. Use 'mixminion server-stop' instead."
- elif cmd.endswith("reload-server"):
- print "Obsolete command. Use 'mixminion server-reload' instead."
-
if usage:
print _SIGNAL_SERVER_USAGE % { 'cmd' : cmd }
return
@@ -911,7 +958,10 @@
_signalServer(config, reload)
def _signalServer(config, reload):
- """DOCDOC"""
+ """Given a configuration file, sends a signal to the corresponding
+ server if it's running. If 'reload', the signal is HUP. Else,
+ the signal is TERM.
+ """
homeDir = config['Server']['Homedir']
pidFile = os.path.join(homeDir, "pid")
if not os.path.exists(pidFile):
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -d -r1.13 -r1.14
--- ServerQueue.py 5 May 2003 00:38:46 -0000 1.13
+++ ServerQueue.py 17 May 2003 00:08:45 -0000 1.14
@@ -12,9 +12,10 @@
import sys
import cPickle
import threading
+import types
from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
- createPrivateDir
+ createPrivateDir, readPickled, writePickled, formatTime
from mixminion.Crypto import getCommonPRNG
__all__ = [ 'Queue', 'DeliveryQueue', 'TimedMixPool', 'CottrellMixPool',
@@ -127,7 +128,7 @@
return res
finally:
self._lock.release()
-
+
def pickRandom(self, count=None):
"""Returns a list of 'count' handles to messages in this queue.
The messages are chosen randomly, and returned in a random order.
@@ -166,7 +167,7 @@
"""Given a handle and a queue, moves the corresponding message from
this queue to the queue provided. Returns a new handle for
the message in the destination queue."""
- # Since we're switching handle, we don't want to just rename;
+ # Since we're switching handles, we don't want to just rename;
# We really want to copy and delete the old file.
try:
self._lock.acquire()
@@ -219,7 +220,7 @@
commit your changes, or abortMessage to reject them."""
file, handle = getCommonPRNG().openNewFile(self.dir, "inp_", 1)
return file, handle
-
+
def finishMessage(self, f, handle):
"""Given a file and a corresponding handle, closes the file
commits the corresponding message."""
@@ -279,7 +280,7 @@
LOG.error("Directory %s contains: %s", self.dir, contents)
self.count(1)
return
-
+
if self.n_entries < 0:
return
if s1 == 'msg' and s2 != 'msg':
@@ -289,27 +290,92 @@
finally:
self._lock.release()
+
+class _DeliveryState:
+ """Helper class: holds the state needed to schedule delivery or
+ eventual abandonmont of a message in a DeliveryQueue."""
+ ## Fields:
+ # queuedTime: time at which the corresponding message was first
+ # inserted into the queue.
+ # lastAttempt: The most recent time at which we attempted to
+ # deliver the message. (None means 'never').
+ def __init__(self, queuedTime=None, lastAttempt=None):
+ """Create a new _DeliveryState for a message received at
+ queuedTime (default now), whose last delivery attempt was
+ at lastAttempt (default never)."""
+ if queuedTime is None:
+ queuedTime = time.time()
+ self.queuedTime = queuedTime
+ self.lastAttempt = lastAttempt
+
+ def __getstate__(self):
+ # For pickling. All future versions of deliverystate will pickle
+ # to a tuple, whose first element will be a version string.
+ return ("V0", self.queuedTime, self.lastAttempt)
+
+ def __setstate__(self, state):
+ # For pickling.
+ if state[0] == "V0":
+ self.queuedTime = state[1]
+ self.lastAttempt = state[2]
+ else:
+ raise MixFatalError("Unrecognized delivery state")
+
+ def getNextAttempt(self, retrySchedule, now=None):
+ """Return the next time when we should try to deliver this message
+ according to the provided retrySchedule. If the time returned
+ is in the past, then immediate delivery is okay. If the time
+ returned is None, this message has expired and should be forgotten.
+ """
+ if not now:
+ now = time.time()
+
+ last = self.lastAttempt
+
+ # If we've never tried to deliver the message, it's ready to
+ # go immediately.
+ if last is None:
+ return now
+
+ # Otherwise, we count from the time the message was first queued,
+ # until we find a scheduled delivery that falls after the last
+ # attempted delivery.
+ #
+ # This scheduled delivery may be in the past. That's okay: it only
+ # means that we've missed a scheduled delivery, and we can try again
+ # immediately.
+ attempt = self.queuedTime
+ for interval in retrySchedule:
+ attempt += interval
+ if attempt > last:
+ return attempt
+
+ # Oops: there are no scheduled deliveries after the last delivery.
+ # Time to drop this message.
+ return None
+
+ def setLastAttempt(self, when):
+ """Update time of the last attempted delivery."""
+ self.lastAttempt = when
+
class DeliveryQueue(Queue):
- """A DeliveryQueue implements a queue that greedily sends messages
- to outgoing streams that occasionally fail. Messages in a
- DeliveryQueue are no longer unstructured text, but rather
- tuples of: (n_retries, None, message, nextAttempt), where
- n_retries is the number of delivery attempts so far,
- the message is an arbitrary pickled object, and nextAttempt is a time
- before which no further delivery should be attempted.
+ """A DeliveryQueue implements a queue that greedily sends messages to
+ outgoing streams that occasionally fail. All underlying messages
+ are pickled objects. Additionally, we store metadata about
+ attempted deliveries in the past, so we know when to schedule the
+ next delivery.
- This class is abstract. Implementors of this class should
- subclass it to add a _deliverMessages method. Multiple
- invocations of this method may be active at a given time. Upon
- success or failure, this method should cause deliverySucceeded
- or deliveryFailed to be called as appropriate.
+ This class is abstract. Implementors of this class should subclass
+ it to add a _deliverMessages method. Multiple invocations of this
+ method may be active at a given time. Upon success or failure, this
+ method should cause deliverySucceeded or deliveryFailed to be called
+ as appropriate.
- Users of this class will probably only want to call the queueMessage,
- sendReadyMessages, and nextMessageReadyAt methods.
+ Users of this class will probably only want to call the
+ queueMessage, sendReadyMessages, and nextMessageReadyAt methods.
- This class caches information about the directory state; it
- won't play nice if multiple instances are looking at the same
- directory.
+ This class caches information about the directory state; it won't
+ play nice if multiple instances are looking at the same directory.
"""
###
# Fields:
@@ -317,17 +383,36 @@
# currently sending.
# pending -- Dict from handle->time_sent, for all messages that we're
# currently sending.
- # retrySchedule: a list of intervals at which delivery of messages
+ # retrySchedule -- a list of intervals at which delivery of messages
# should be reattempted, as described in "setRetrySchedule".
- def __init__(self, location, retrySchedule=None):
+ # deliveryState -- a dict from handle->_DeliveryState object for
+ # all handles.
+ # nextAttempt -- a dict from handle->time-of-next-scheduled-delivery,
+ # for all handles. Not meaningful for handles in 'pending'.
+ # If the time is in the past, delivery can be tried now.
+ # If None, the message may be removable.
+ #
+ # XXXX Refactor as many of these fields as possible into _DeliveryState.
+ #
+ # Files:
+ # meta_* : a pickled _DeliveryState object for each message in the
+ # queue.
+ # rmv_meta_*: a dead metafile, waiting for removal.
+
+ def __init__(self, location, retrySchedule=None, now=None):
+ """Create a new DeliveryQueue object that stores its files in
+ <location>. If retrySchedule is provided, it is interpreted as
+ in setRetrySchedule."""
Queue.__init__(self, location, create=1, scrub=1)
+ self.retrySchedule = None
self._rescan()
- if retrySchedule is None:
- self.retrySchedule = None
+ if retrySchedule is not None:
+ self.setRetrySchedule(retrySchedule, now)
else:
- self.setRetrySchedule(retrySchedule)
+ self.setRetrySchedule([0], now)
+ self._repOk()
- def setRetrySchedule(self, schedule):
+ def setRetrySchedule(self, schedule, now=None):
"""Set the retry schedule for this queue. A retry schedule is
a list of integers, each representing a number of seconds.
For example, a schedule of [ 120, 120, 3600, 3600 ] will
@@ -343,151 +428,307 @@
every 30 minutes, messages will only me retried once every
30 minutes.
"""
- self.retrySchedule = schedule[:]
+ try:
+ self._lock.acquire()
+ self.retrySchedule = schedule[:]
+ self._rebuildNextAttempt(now)
+ finally:
+ self._lock.release()
- def _rescan(self):
- """Rebuild the internal state of this queue from the underlying
- directory."""
+ def _rescan(self, now=None):
+ """Helper: Rebuild the internal state of this queue from the
+ underlying directory. Trashes 'pending' and 'sendable'."""
+ fname = os.path.join(self.dir, "metadata")
try:
self._lock.acquire()
self.pending = {}
+ self.nextAttempt = {}
self.sendable = self.getAllMessages()
+ self._loadState()
+ self._rebuildNextAttempt(now)
+ self._repOk()
+ finally:
+ self._lock.release()
+
+ def _loadState(self):
+ """Read all DeliveryState objects from the disk."""
+ # must hold lock.
+ self.deliveryState = {}
+ for h in self.getAllMessages():
+ fn = os.path.join(self.dir, "meta_"+h)
+ if os.path.exists(fn):
+ self.deliveryState[h] = readPickled(fn)
+ else:
+ LOG.warn("No metadata for file handle %s", h)
+ obj = self.getObject(h)
+ #XXXX005 remove this.
+ if isinstance(obj, types.TupleType) and len(obj) == 4:
+ # This message is in an obsolete format from 0.0.3.
+ # We'd repair it, but packets from 0.0.3 are incompatible
+ # anyway.
+ LOG.info("Removing item %s in obsolete format", h)
+ self.removeMessage(h)
+ continue
+
+ self.deliveryState[h] = _DeliveryState()
+ self._writeState(h)
+
+ for fn in os.listdir(self.dir):
+ if fn.startswith("meta_"):
+ h = fn[5:]
+ if not self.deliveryState.has_key(h):
+ LOG.warn("Metadata for nonexistant handle %s", h)
+ os.unlink(os.path.join(self.dir, fn))
+
+ def _writeState(self, h):
+ """Helper method: writes out the metadata for handle 'h'. If that
+ handle has been removed, removes the metadata.
+ """
+ fn = os.path.join(self.dir, "meta_"+h)
+ ds = self.deliveryState.get(h)
+ if ds is not None:
+ writePickled(fn, self.deliveryState[h])
+ else:
+ try:
+ os.rename(fn, os.path.join(self.dir, "rmv_meta_"+h))
+ except OSError:
+ pass
+
+ def _rebuildNextAttempt(self, now=None):
+ """Helper: Reconstruct self.nextAttempt from self.retrySchedule and
+ self.deliveryState.
+
+ Callers must hold self._lock.
+ """
+ if self.retrySchedule is None:
+ rs = [0]
+ else:
+ rs = self.retrySchedule
+
+ nextAttempt = {}
+ for h, ds in self.deliveryState.items():
+ nextAttempt[h] = ds.getNextAttempt(rs, now)
+ self.nextAttempt = nextAttempt
+ self._repOk()
+
+ def _repOk(self):
+ """Raise an assertion error if the internal state of this object is
+ nonsensical."""
+ #XXXX004 Later in the release cycle, we should call this less. It
+ #XXXX004 adds ~8-9ms on my laptop for ~400 messages
+ try:
+ self._lock.acquire()
+
+ allHandles = self.getAllMessages()
+ knownHandles = self.pending.keys() + self.sendable
+ allHandles.sort()
+ knownHandles.sort()
+ assert allHandles == knownHandles
+ dsHandles = self.deliveryState.keys()
+ naHandles = self.nextAttempt.keys()
+ dsHandles.sort()
+ naHandles.sort()
+ assert allHandles == dsHandles
+ assert allHandles == naHandles
finally:
self._lock.release()
def queueMessage(self, msg):
if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")
- def queueDeliveryMessage(self, msg, retry=0, nextAttempt=0):
+ def queueDeliveryMessage(self, msg, now=None):
"""Schedule a message for delivery.
- msg -- the message. This can be any pickleable object
- retry -- how many times so far have we tried to send?
- nextAttempt -- A time before which no further attempts to
- deliver should be made.
+ msg -- the message. This can be any pickleable object.
"""
+ assert self.retrySchedule is not None
try:
self._lock.acquire()
- handle = self.queueObject( (retry, None, msg, nextAttempt) )
+ handle = self.queueObject(msg)
self.sendable.append(handle)
+ ds = self.deliveryState[handle] = _DeliveryState(now)
+ self.nextAttempt[handle] = \
+ ds.getNextAttempt(self.retrySchedule, now)
+ #self._saveState()
+ self._writeState(handle)
finally:
self._lock.release()
+ self._repOk()
return handle
- def get(self,handle):
- """Returns a (n_retries, msg, nextAttempt) tuple for a given
- message handle."""
+ def _inspect(self,handle):
+ """Returns a (msg, inserted, lastAttempt, nextAttempt) tuple
+ for a given message handle. For testing. """
+ self._repOk()
o = self.getObject(handle)
- return o[0], o[2], o[3]
+ return (o,
+ self.deliveryState[handle].queuedTime,
+ self.deliveryState[handle].lastAttempt,
+ self.nextAttempt[handle])
+
+ def removeExpiredMessages(self, now=None):
+ """Remove every message expired in this queue according to the
+ current schedule. Ordinarily, messages are removed when
+ their last delivery is over. Occasionally, however,
+ changing the schedule while the system is down can make calling
+ this method useful."""
+ try:
+ self._lock.acquire()
+ for h in self.sendable:
+ if self.nextAttempt[h] is None:
+ self.removeMessage(h)
+ finally:
+ self._lock.release()
def sendReadyMessages(self, now=None):
- """Sends all messages which are not already being sent."""
+ """Sends all messages which are not already being sent, and which
+ are scheduled to be sent."""
+ assert self.retrySchedule is not None
if now is None:
now = time.time()
+ LOG.trace("ServerQueue checking for deliverable messages in %s",
+ self.dir)
try:
self._lock.acquire()
handles = self.sendable
messages = []
self.sendable = []
for h in handles:
- retries,msg, nextAttempt = self.get(h)
- if nextAttempt <= now:
- messages.append((h, msg, retries))
+ assert not self.pending.has_key(h)
+ next = self.nextAttempt[h]
+ if next is None:
+ LOG.trace(" [%s] is expired.", h)
+ self.removeMessage(h)
+ elif next <= now:
+ LOG.trace(" [%s] is ready for delivery", h)
+ messages.append( (h, self.getObject(h)) )
self.pending[h] = now
else:
+ LOG.trace(" [%s] is not yet ready for delivery", h)
self.sendable.append(h)
+ for h in self.pending.keys():
+ LOG.trace(" [%s] is pending delivery", h)
finally:
self._lock.release()
+ self._repOk()
if messages:
self._deliverMessages(messages)
+ self._repOk()
def _deliverMessages(self, msgList):
- """Abstract method; Invoked with a list of
- (handle, message, n_retries) tuples every time we have a batch
- of messages to send.
+ """Abstract method; Invoked with a list of (handle, message)
+ tuples every time we have a batch of messages to send.
For every handle in the list, delierySucceeded or deliveryFailed
should eventually be called, or the message will sit in the queue
indefinitely, without being retried."""
- # We could implement this as a single _deliverMessage(h,addr,m,n)
+ # We could implement this as a single _deliverMessage(h,addr)
# method, but that wouldn't allow implementations to batch
# messages being sent to the same address.
raise NotImplementedError("_deliverMessages")
- def deliverySucceeded(self, handle):
- """Removes a message from the outgoing queue. This method
- should be invoked after the corresponding message has been
- successfully delivered.
- """
+ def removeMessage(self, handle):
try:
self._lock.acquire()
+ Queue.removeMessage(self, handle)
+ for d in self.pending, self.deliveryState, self.nextAttempt:
+ try:
+ del d[handle]
+ except KeyError:
+ pass
try:
- self.removeMessage(handle)
- except:
- # This should never happen.
- LOG.error_exc(sys.exc_info(), "Error removing message")
- try:
- del self.pending[handle]
- except KeyError:
- # This should never happen.
- LOG.error_exc(sys.exc_info(),
- "Handle %s was not pending", handle)
+ del self.sendable[self.sendable.index(handle)]
+ except ValueError:
+ pass
+
+ self._writeState(handle)
finally:
self._lock.release()
- def deliveryFailed(self, handle, retriable=0):
+ def removeAll(self):
+ try:
+ self._lock.acquire()
+ Queue.removeAll(self)
+ for m in os.listdir(self.dir):
+ if m[:5] == 'meta_':
+ os.rename(os.path.join(self.dir, m),
+ os.path.join(self.dir, "rmv_"+m))
+ self.deliveryState = {}
+ self.pending = {}
+ self.nextAttempt = {}
+ self.sendable = []
+ self.cleanQueue()
+ finally:
+ self._lock.release()
+
+
+ def deliverySucceeded(self, handle):
+ """Removes a message from the outgoing queue. This method
+ should be invoked after the corresponding message has been
+ successfully delivered.
+ """
+ assert self.retrySchedule is not None
+
+ LOG.trace("ServerQueue got successful delivery for %s from %s",
+ handle, self.dir)
+ self.removeMessage(handle)
+
+ def deliveryFailed(self, handle, retriable=0, now=None):
"""Removes a message from the outgoing queue, or requeues it
for delivery at a later time. This method should be
invoked after the corresponding message has been
- successfully delivered."""
+ unsuccessfully delivered."""
+ assert self.retrySchedule is not None
+ LOG.trace("ServerQueue failed to deliver %s from %s",
+ handle, self.dir)
try:
self._lock.acquire()
try:
lastAttempt = self.pending[handle]
- del self.pending[handle]
except KeyError:
# This should never happen
LOG.error_exc(sys.exc_info(),
"Handle %s was not pending", handle)
lastAttempt = 0
-
+
if retriable:
- # Queue the new one before removing the old one, for
- # crash-proofness. First, fetch the old information...
- retries, msg, schedAttempt = self.get(handle)
+ # If we can retry the message, update the deliveryState
+ # with the most recent attempt, and see if there's another
+ # attempt in the future.
+ try:
+ ds = self.deliveryState[handle]
+ except KeyError:
+ # This should never happen
+ LOG.error_exc(sys.exc_info(),
+ "Handle %s had no state", handle)
+ ds = self.deliveryState[handle] = _DeliveryState(now)
- # Multiple retry intervals may have passed in between the most
- # recent failed delivery attempt (lastAttempt) and the time it
- # was scheduled (schedAttempt). Increment 'retries' and to
- # reflect the number of retry intervals that have passed
- # between first sending the message and nextAttempt.
- if self.retrySchedule and retries < len(self.retrySchedule):
- nextAttempt = schedAttempt
- if nextAttempt == 0:
- nextAttempt = lastAttempt
- # Increment nextAttempt and retries according to the
- # retry schedule, until nextAttempt is after lastAttempt.
- while retries < len(self.retrySchedule):
- nextAttempt += self.retrySchedule[retries]
- retries += 1
- if nextAttempt > lastAttempt:
- break
- # If there are more attempts to be made, queue the message.
- if retries <= len(self.retrySchedule):
- self.queueDeliveryMessage(msg, retries, nextAttempt)
- elif not self.retrySchedule:
- #XXXX005: Make sure this error never occurs.
- LOG.error(
- "ServerQueue.deliveryFailed without retrySchedule")
- retries += 1
- nextAttempt = 0
- if retries < 10:
- self.queueDeliveryMessage(msg, retries, nextAttempt)
+ ds.setLastAttempt(lastAttempt)
+ nextAttempt = ds.getNextAttempt(self.retrySchedule, now)
+ if nextAttempt is not None:
+ LOG.trace(" (We'll %s try again at %s)", handle,
+ formatTime(nextAttempt, 1))
+ # There is another scheduled delivery attempt. Remember
+ # it, mark the message sendable again, and save our state.
+ self.nextAttempt[handle] = nextAttempt
+ self.sendable.append(handle)
+ try:
+ del self.pending[handle]
+ except KeyError:
+ LOG.error("Handle %s was not pending", handle)
- # Now, it's okay to remove the failed message.
+ self._repOk()
+ self._writeState(handle)
+ return
+
+ # Otherwise, fallthrough.
+
+ # If we reach this point, the message is undeliverable.
+ LOG.trace(" (Giving up on %s)", handle)
self.removeMessage(handle)
+ self._repOk()
finally:
self._lock.release()
@@ -584,4 +825,3 @@
class BinomialCottrellMixPool(_BinomialMixin,CottrellMixPool):
"""Same algorithm as CottrellMixPool, but instead of sending N messages
from the pool of size P, sends each message with probability N/P."""
-