[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] (Flushing patches from my laptop)
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv4734/lib/mixminion/server
Modified Files:
EventStats.py HashLog.py MMTPServer.py Modules.py
PacketHandler.py ServerConfig.py ServerKeys.py ServerMain.py
ServerQueue.py
Log Message:
(Flushing patches from my laptop)
BuildMessage, ClientMain:
- Improve comments and documentation
Common:
- Repair file permissions better
- Don't assume all filesystems have the same block size
MMTPClient, MMTPServer:
- Increment protocol version; drop 0.1 and 0.2 support.
Packet:
- Drop obsolete packet format
test:
- Tests for rejected packets
- Tests for EventStats
EventStats:
- Debug; make rotation explicit
Hashlog:
- Don't die when a half-created hashlog is found.
- Sync on startup
Modules:
- Don't leak zombie mixmaster processes when flushing
PacketHandler:
- Change key rotation semantics from add/remove to set.
ServerConfig:
- Upgrade comments from ???? to ????004
ServerKeys:
- Fix hashlog deletion
ServerMain:
- Refactor scheduler into a separate class.
- Drop umask in all cases, even when not daemon.
ServerQueue:
- Drop some legacy code
Index: EventStats.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/EventStats.py,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- EventStats.py 26 Apr 2003 14:39:59 -0000 1.2
+++ EventStats.py 5 May 2003 00:38:46 -0000 1.3
@@ -13,7 +13,7 @@
from time import time
from mixminion.Common import formatTime, LOG, previousMidnight, floorDiv, \
- createPrivateDir
+ createPrivateDir, MixError
# _EVENTS: a list of all recognized event types.
_EVENTS = [ 'ReceivedPacket',
@@ -32,6 +32,12 @@
def save(self, now=None):
"""Flushes this eventlog to disk."""
pass
+ def rotate(self, now=None):
+ """DOCDOC"""
+ pass
+ def getNextRotation(self):
+ """DOCDOC"""
+ return 0
def _log(self, event, arg=None):
"""Notes that an event has occurred.
event -- the type of event to note
@@ -118,7 +124,7 @@
periodically writes to 'historyFile' every 'interval' seconds."""
NilEventLog.__init__(self)
if os.path.exists(filename):
- # XXXX If this doesn't work, then we should
+ # XXXX If this doesn't work, then we should ????004
f = open(filename, 'rb')
self.__dict__.update(cPickle.load(f))
f.close()
@@ -155,8 +161,6 @@
to invoke."""
LOG.debug("Syncing statistics to disk")
if not now: now = time()
- if now > self.nextRotation:
- self._rotate()
tmpfile = self.filename + "_tmp"
try:
os.unlink(tmpfile)
@@ -176,9 +180,6 @@
def _log(self, event, arg=None):
try:
self._lock.acquire()
- if time() > self.nextRotation:
- self._rotate()
- self._save()
try:
self.count[event][arg] += 1
except KeyError:
@@ -189,31 +190,46 @@
finally:
self._lock.release()
+ def getNextRotation(self):
+ return self.nextRotation
+
+ def rotate(self,now=None):
+ if now is None: now = time()
+ if now < self.nextRotation:
+ raise MixError("Not ready to rotate event stats")
+ try:
+ self._lock.acquire()
+ self._rotate(now)
+ finally:
+ self._lock.release()
+
def _rotate(self, now=None):
"""Flush all events since the last rotation to the history file,
and clears the current event log."""
# Must hold lock
LOG.debug("Flushing statistics log")
- if now is None: now = time() #????
+ if now is None: now = time()
f = open(self.historyFilename, 'a')
- self.dump(f)
+ self.dump(f, now)
f.close()
- self.accumulatedTime = 0
self.count = {}
for e in _EVENTS:
self.count[e] = {}
- self.lastRotation = self.nextRotation
- self._setNextRotation()
+ self.lastRotation = now
+ self._save(now)
+ self.accumulatedTime = 0
+ self._setNextRotation(now)
- def dump(self, f):
+ def dump(self, f, now=None):
"""Write the current data to a file handle 'f'."""
+ if now is None: now = time()
try:
self._lock.acquire()
startTime = self.lastRotation
- endTime = time()
+ endTime = now
print >>f, "========== From %s to %s:" % (formatTime(startTime,1),
formatTime(endTime,1))
for event in _EVENTS:
@@ -240,8 +256,8 @@
finally:
self._lock.release()
-
def _setNextRotation(self, now=None):
+ # DOCDOC
# ???? Lock to 24-hour cycle
# This is a little weird. We won't save *until*:
@@ -253,18 +269,14 @@
# round to the hour, up to 5 minutes down and 55 up.
if not now: now = time()
- secToGo = max(0, self.rotateInterval * 0.75 - self.accumulatedTime)
+ accumulatedTime = self.accumulatedTime + (now - self.lastSave)
+ secToGo = max(0, self.rotateInterval * 0.75 - accumulatedTime)
self.nextRotation = max(self.lastRotation + self.rotateInterval,
now + secToGo)
- if not self.lastRotation: self.lastRotation = now
- if now - self.lastRotation <= self.rotateInterval * 1.2:
- base = self.lastRotation
- else:
- base = now
+ if self.nextRotation < now:
+ self.nextRotation = now
- self.nextRotation = base + self.rotateInterval
-
if (self.rotateInterval % 3600) == 0:
mid = previousMidnight(self.nextRotation)
rest = self.nextRotation - mid
Index: HashLog.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/HashLog.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- HashLog.py 26 Apr 2003 14:39:59 -0000 1.9
+++ HashLog.py 5 May 2003 00:38:46 -0000 1.10
@@ -8,6 +8,7 @@
import binascii
import os
+import stat
import anydbm, dumbdbm
import threading
from mixminion.Common import MixFatalError, LOG, createPrivateDir
@@ -63,9 +64,18 @@
'keyid'."""
parent = os.path.split(filename)[0]
createPrivateDir(parent)
- # XXXX004 catch empty hashlog.
- self.log = anydbm.open(filename, 'c')
+
+ # Catch empty logfiles: these can be created if we exit before
+ # syncing the log for the first time.
+ try:
+ if os.stat(filename)[stat.ST_SIZE] == 0:
+ LOG.warn("Half-created database %s found; cleaning up.")
+ os.unlink(filename)
+ except os.error:
+ pass
+
LOG.debug("Opening database %s for packet digests", filename)
+ self.log = anydbm.open(filename, 'c')
if isinstance(self.log, dumbdbm._Database):
LOG.warn("Warning: logging packet digests to a flat file.")
try:
@@ -73,7 +83,9 @@
raise MixFatalError("Log KEYID does not match current KEYID")
except KeyError:
self.log["KEYID"] = keyid
+ if hasattr(self.log, 'sync'): self.log.sync()
+ # Scan the journal file
self.journalFileName = filename+"_jrnl"
self.journal = {}
if os.path.exists(self.journalFileName):
@@ -85,7 +97,11 @@
self.journalFile = os.open(self.journalFileName,
_JOURNAL_OPEN_FLAGS|os.O_APPEND, 0600)
+
self.__lock = threading.RLock()
+
+ # On startup, we flush everything to disk.
+ self.sync()
def seenHash(self, hash):
"""Return true iff 'hash' has been logged before."""
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.25
retrieving revision 1.26
diff -u -d -r1.25 -r1.26
--- MMTPServer.py 26 Apr 2003 14:39:59 -0000 1.25
+++ MMTPServer.py 5 May 2003 00:38:46 -0000 1.26
@@ -8,9 +8,7 @@
of the protocol.
If you just want to send messages into the system, use MMTPClient.
-
- FFFF As yet unsupported are: Session resumption, key renegotiation,
- FFFF: Also unsupported: timeouts."""
+ """
# NOTE FOR THE CURIOUS: The 'asyncore' module in the standard library
# is another general select/poll wrapper... so why are we using our
@@ -133,10 +131,12 @@
if r: del self.readers[fd]
if w: del self.writers[fd]
- def tryTimeout(self, now):
+ def tryTimeout(self, now=None):
"""Timeout any connection that is too old."""
if self._timeout is None:
return
+ if now is None:
+ now = time.time()
# All connections older than 'cutoff' get purged.
cutoff = now - self._timeout
# Maintain a set of filenos for connections we've checked, so we don't
@@ -501,7 +501,7 @@
# Implementation for MMTP.
# The protocol string to send.
-PROTOCOL_STRING = "MMTP 0.1,0.2\r\n"
+PROTOCOL_STRING = "MMTP 0.3\r\n"
# The protocol specification to expect.
PROTOCOL_RE = re.compile("MMTP ([^\s\r\n]+)\r\n")
# Control line for sending a message.
@@ -528,17 +528,19 @@
# if negotiation hasn't completed.
# PROTOCOL_VERSIONS: (static) a list of protocol versions we allow,
# in decreasing order of preference.
- PROTOCOL_VERSIONS = [ '0.2', '0.1' ]
- def __init__(self, sock, tls, consumer):
+ PROTOCOL_VERSIONS = [ '0.3' ]
+ def __init__(self, sock, tls, consumer, rejectPackets=0):
"""Create an MMTP connection to receive messages sent along a given
socket. When valid packets are received, pass them to the
function 'consumer'."""
SimpleTLSConnection.__init__(self, sock, tls, 1,
"%s:%s"%sock.getpeername())
self.messageConsumer = consumer
- self.junkCallback = lambda : None
+ self.junkCallback = lambda : None #DOCDOC
+ self.rejectCallback = lambda : None #DOCDOC
self.finished = self.__setupFinished
self.protocol = None
+ self.rejectPackets = rejectPackets #DOCDOC
def __setupFinished(self):
"""Called once we're done accepting. Begins reading the protocol
@@ -593,10 +595,16 @@
if data.startswith(JUNK_CONTROL):
expectedDigest = sha1(msg+"JUNK")
replyDigest = sha1(msg+"RECEIVED JUNK")
+ replyControl = RECEIVED_CONTROL
isJunk = 1
elif data.startswith(SEND_CONTROL):
expectedDigest = sha1(msg+"SEND")
- replyDigest = sha1(msg+"RECEIVED")
+ if self.rejectPackets:
+ replyDigest = sha1(msg+"REJECTED")
+ replyControl = REJECTED_CONTROL
+ else:
+ replyDigest = sha1(msg+"RECEIVED")
+ replyControl = RECEIVED_CONTROL
isJunk = 0
else:
warn("Unrecognized command from %s. Closing connection.",
@@ -611,10 +619,12 @@
else:
debug("%s packet received from %s; Checksum valid.",
data[:4], self.address)
- self.finished = self.__sentAck
- self.beginWrite(RECEIVED_CONTROL+replyDigest)
+ self.finished = self.__sentAck
+ self.beginWrite(replyControl+replyDigest)
if isJunk:
self.junkCallback()
+ elif self.rejectPackets:
+ self.rejectCallback()
else:
self.messageConsumer(msg)
@@ -622,7 +632,6 @@
"""Called once we're done sending an ACK. Begins reading a new
message."""
debug("Send ACK for message from %s (fd %s)", self.address, self.fd)
- #FFFF Rehandshake
self.finished = self.__receivedMessage
self.expectRead(SEND_RECORD_LEN)
@@ -630,7 +639,6 @@
NULL_KEYID = "\x00"*20
-# FFFF We need to note retriable situations better.
class MMTPClientConnection(SimpleTLSConnection):
"""Asynchronious implementation of the sending ("client") side of a
mixminion connection."""
@@ -646,7 +654,7 @@
# if negotiation hasn't completed.
# PROTOCOL_VERSIONS: (static) a list of protocol versions we allow,
# in the order we offer them.
- PROTOCOL_VERSIONS = [ '0.1', '0.2' ]
+ PROTOCOL_VERSIONS = [ '0.3' ]
def __init__(self, context, ip, port, keyID, messageList, handleList,
sentCallback=None, failCallback=None, finishedCallback=None,
certCache=None):
@@ -668,7 +676,7 @@
DOCDOC certcache,finishedCallback"""
# Generate junk before connecting to avoid timing attacks
- self.junk = [] #XXXX doc this field.
+ self.junk = [] #DOCDOC doc this field.
for m in messageList:
if m == 'JUNK':
self.junk.append(getCommonPRNG().getBytes(MESSAGE_LEN))
@@ -773,10 +781,6 @@
del self.messageList[0]
msg = self.junk[0]
del self.junk[0]
- if self.protocol == '0.1':
- debug("Won't send junk to a 0.1 server.")
- self.beginNextMessage()
- return
self.expectedDigest = sha1(msg+"RECEIVED JUNK")
self.rejectDigest = sha1(msg+"REJECTED")
msg = JUNK_CONTROL+msg+sha1(msg+"JUNK")
@@ -809,7 +813,6 @@
Otherwise, begins shutting down.
"""
trace("received ack (fd %s)", self.fd)
- # FFFF Rehandshake
inp = self.getInput()
rejected = 0
if inp == REJECTED_CONTROL+self.rejectDigest:
@@ -863,7 +866,7 @@
self.context = tls
# FFFF Don't always listen; don't always retransmit!
- # FFFF Support listening on specific IPs
+ # FFFF Support listening on multiple IPs
if config['Incoming/MMTP'].get('ListenIP',None) is not None:
IP = config['Incoming/MMTP']['ListenIP']
@@ -889,9 +892,11 @@
Used to rotate keys."""
self.context = context
- def getNextTimeoutTime(self, now):
+ def getNextTimeoutTime(self, now=None):
"""Return the time at which we next purge connections, if we have
last done so at time 'now'."""
+ if now is None:
+ now = time.time()
return now + self._timeout
def _newMMTPConnection(self, sock):
@@ -958,4 +963,3 @@
def onMessageSent(self, msg, handle):
pass
-
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.35
retrieving revision 1.36
diff -u -d -r1.35 -r1.36
--- Modules.py 26 Apr 2003 14:39:59 -0000 1.35
+++ Modules.py 5 May 2003 00:38:46 -0000 1.36
@@ -34,7 +34,7 @@
from mixminion.Config import ConfigError, _parseBoolean, _parseCommand, \
_parseIntervalList
from mixminion.Common import LOG, createPrivateDir, MixError, isSMTPMailbox, \
- isPrintingAscii
+ isPrintingAscii, waitForChildren
from mixminion.Packet import ParseError, CompressedDataTooLong
# Return values for processMessage
@@ -225,6 +225,7 @@
LOG.info("Delivery thread shutting down.")
return
self.moduleManager._sendReadyMessages()
+ waitForChildren(blocking=0)
except:
LOG.error_exc(sys.exc_info(),
"Exception in delivery; shutting down thread.")
@@ -945,7 +946,7 @@
should be called after invocations of processMessage."""
cmd = self.command
LOG.debug("Flushing Mixmaster pool")
- os.spawnl(os.P_NOWAIT, cmd, cmd, "-S")
+ os.spawnl(os.P_WAIT, cmd, cmd, "-S")
class _MixmasterSMTPModuleDeliveryQueue(SimpleModuleDeliveryQueue):
"""Delivery queue for _MixmasterSMTPModule. Same as
Index: PacketHandler.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/PacketHandler.py,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -d -r1.14 -r1.15
--- PacketHandler.py 26 Apr 2003 14:39:59 -0000 1.14
+++ PacketHandler.py 5 May 2003 00:38:46 -0000 1.15
@@ -12,6 +12,7 @@
import mixminion.Common as Common
import mixminion.BuildMessage
+from mixminion.ServerInfo import PACKET_KEY_BYTES
from mixminion.Common import MixError, isPrintingAscii
__all__ = [ 'PacketHandler', 'ContentError', 'DeliveryPacket', 'RelayedPacket']
@@ -27,8 +28,9 @@
to drop the message, relay the message, or send the message to
an exit handler."""
## Fields:
- # privatekey: list of RSA private keys that we accept
- # hashlog: list of HashLog objects corresponding to the keys.
+ # privatekeys: a list of 2-tuples of
+ # (1) a RSA private key that we accept
+ # (2) a HashLog objects corresponding to the given key
def __init__(self, privatekey, hashlog):
"""Constructs a new packet handler, given a private key object for
header encryption, and a hashlog object to prevent replays.
@@ -38,57 +40,52 @@
though: PK decryption is expensive. Also, a hashlog must be
provided for each private key.
"""
+ self.privatekeys = []
+ self.lock = threading.Lock()
+
try:
- # Check whether we have a key or a sequence of keys.
_ = privatekey[0]
- assert len(hashlog) == len(privatekey)
-
- self.privatekey = privatekey
- self.hashlog = hashlog
+ self.setKeys(privatekey, hashlog)
except TypeError:
- # Privatekey is not be subscriptable; we must have only one.
- self.privatekey = [privatekey]
- self.hashlog = [hashlog]
-
- self.lock = threading.Lock()
-
- def addKey(self, key, hashlog):
- """DOCDOC"""
- self.lock.acquire()
- self.privatekey.append(key)
- self.hashlog.append(hashlog)
- self.lock.release()
+ self.setKeys([privatekey], [hashlog])
- def removeKey(self, key):
+ def setKeys(self, keys, hashlogs):
"""DOCDOC"""
self.lock.acquire()
+ newKeys = {}
try:
- enc = key.encode_key(1)
- for i in range(len(self.privatekey)):
- k = self.privatekey[i]
- if k.enc(1) == enc:
- del self.privatekey[i]
- hlog = self.hashlog[i]
- del self.hashlog[i]
- hlog.close()
- return
- raise KeyError
+ # Build a set of asn.1-encoded public keys in *new* set.
+ for k in keys:
+ newKeys[k.encode_key(1)] = 1
+ if k.get_modulus_bytes() != PACKET_KEY_BYTES:
+ raise Common.MixFatalError("Incorrect packet key length")
+ # For all old public keys, if they aren't in the new set, close
+ # their hashlogs.
+ for k, h in self.privatekeys:
+ if not newKeys.get(k.encode_key(1)):
+ h.close()
+ # Now, set the keys.
+ self.privatekeys = zip(keys, hashlogs)
finally:
self.lock.release()
-
+
def syncLogs(self):
"""Sync all this PacketHandler's hashlogs."""
- self.lock.acquire()
- for h in self.hashlog:
- h.sync()
- self.lock.release()
+ try:
+ self.lock.acquire()
+ for _, h in self.privatekeys:
+ h.sync()
+ finally:
+ self.lock.release()
def close(self):
"""Close all this PacketHandler's hashlogs."""
- self.lock.acquire()
- for h in self.hashlog:
- h.close()
- self.lock.release()
+ try:
+ self.lock.acquire()
+ for _, h in self.privatekeys:
+ h.close()
+ finally:
+ self.lock.release()
def processMessage(self, msg):
"""Given a 32K mixminion message, processes it completely.
@@ -119,7 +116,7 @@
e = None
self.lock.acquire()
try:
- for pk, hashlog in zip(self.privatekey, self.hashlog):
+ for pk, hashlog in self.privatekeys:
try:
subh = Crypto.pk_decrypt(encSubh, pk)
break
Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -d -r1.22 -r1.23
--- ServerConfig.py 26 Apr 2003 14:39:59 -0000 1.22
+++ ServerConfig.py 5 May 2003 00:38:46 -0000 1.23
@@ -128,7 +128,7 @@
reasons.append("StatsInterval is too short")
if not server["EncryptIdentityKey"]:
reasons.append("Identity key is not encrypted")
- # ???? Pkey lifetime, sloppiness?
+ # ????004 Pkey lifetime, sloppiness?
if server["MixAlgorithm"] not in _SECURE_MIX_RULES:
reasons.append("Mix algorithm is not secure")
else:
@@ -138,17 +138,17 @@
if server["MixInterval"][2] < 30*60:
reasons.append("Mix interval under 30 minutes")
- # ???? DIRSERVERS?
+ # ????004 DIRSERVERS?
- # ???? Incoming/MMTP
+ # ????004 Incoming/MMTP
- # ???? Outgoing/MMTP
+ # ????004 Outgoing/MMTP
- # ???? Modules?
+ # ????004 Modules?
def _validateRetrySchedule(mixInterval, entries, sectionname,
entryname='Retry'):
- #XXXX writeme.
+ """DOCDOC"""
entry = [e for e in entries.get(sectionname,[]) if e[0] == entryname]
if not entry:
return
Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -d -r1.19 -r1.20
--- ServerKeys.py 26 Apr 2003 14:39:59 -0000 1.19
+++ ServerKeys.py 5 May 2003 00:38:46 -0000 1.20
@@ -252,7 +252,7 @@
files = [ os.path.join(dirname,f)
for f in os.listdir(dirname) ]
hashFiles = [ os.path.join(self.hashDir, "hash_"+name) ,
- os.path.join(self.hashDir, "hash_"+name+"jrnl") ]
+ os.path.join(self.hashDir, "hash_"+name+"_jrnl") ]
files += [ f for f in hashFiles if os.path.exists(f) ]
secureDelete(files, blocking=1)
os.rmdir(dirname)
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.50
retrieving revision 1.51
diff -u -d -r1.50 -r1.51
--- ServerMain.py 26 Apr 2003 14:39:59 -0000 1.50
+++ ServerMain.py 5 May 2003 00:38:46 -0000 1.51
@@ -17,6 +17,7 @@
import string
import time
import threading
+from types import *
# We pull this from mixminion.Common, just in case somebody still has
# a copy of the old "mixminion/server/Queue.py" (since renamed to
# ServerQueue.py)
@@ -179,9 +180,9 @@
for h in handles:
packet = self.queue.getObject(h)
- #XXXX004 remove the first case after 0.0.3
if type(packet) == type(()):
- LOG.debug(" (skipping message %s in obsolete format)", h)
+ #XXXX005 remove this case.
+ LOG.error(" (skipping message %s in obsolete format)", h)
elif packet.isDelivery():
LOG.debug(" (sending message %s to exit modules)",
formatBase64(packet.getContents()[:8]))
@@ -274,7 +275,8 @@
def onMessageReceived(self, msg):
self.incomingQueue.queueMessage(msg)
- EventStats.receivedPacket("ReceivedPacket", None) # XXXX Replace with server.
+ # XXXX Replace with server.
+ EventStats.log.receivedPacket()
def onMessageSent(self, msg, handle):
self.outgoingQueue.deliverySucceeded(handle)
@@ -399,7 +401,56 @@
#----------------------------------------------------------------------
-class MixminionServer:
+class _Scheduler:
+ """Mixin class for server. Implements DOCDOC"""
+ # DOCDOC
+ def __init__(self):
+ self.scheduledEvents = []
+
+ def firstEventTime(self):
+ if self.scheduledEvents:
+ return self.scheduledEvents[0][0]
+ else:
+ return -1
+
+ def scheduleOnce(self, when, name, cb):
+ assert type(name) is StringType
+ assert type(when) in (IntType, LongType, FloatType)
+ insort(self.scheduledEvents, (when, name, cb))
+
+ def scheduleRecurring(self, first, interval, name, cb):
+ assert type(name) is StringType
+ assert type(first) in (IntType, LongType, FloatType)
+ assert type(interval) in (IntType, LongType, FloatType)
+ next = lambda t=time.time,i=interval: t()+i
+ insort(self.scheduledEvents, (first, name,
+ _RecurringEvent(name, cb, self, next)))
+
+ def scheduleRecurringComplex(self, first, name, cb, nextFn):
+ assert type(name) is StringType
+ assert type(first) in (IntType, LongType, FloatType)
+ insort(self.scheduledEvents, (first, name,
+ _RecurringEvent(name, cb, self, nextFn)))
+
+ def processEvents(self, now=None):
+ if now is None: now = time.time()
+ se = self.scheduledEvents
+ while se and se[0][0] <= now:
+ cb = se[0][2]
+ del se[0]
+ cb()
+
+class _RecurringEvent:
+ def __init__(self, name, cb, scheduler, nextFn):
+ self.name = name
+ self.cb = cb
+ self.scheduler = scheduler
+ self.nextFn = nextFn
+ def __call__(self):
+ self.cb()
+ self.scheduler.scheduleOnce(self.nextFn(), self.name, self)
+
+class MixminionServer(_Scheduler):
"""Wraps and drives all the queues, and the async net server. Handles
all timed events."""
## Fields:
@@ -428,6 +479,7 @@
# pidFile: Filename in which we store the pid of the running server.
def __init__(self, config):
"""Create a new server from a ServerConfig."""
+ _Scheduler.__init__(self)
LOG.debug("Initializing server")
self.config = config
@@ -519,19 +571,28 @@
self.cleanQueues()
- # List of (eventTime, eventName) tuples. Current names are:
- # 'MIX', 'SHRED', and 'TIMEOUT'. Kept in sorted order.
- scheduledEvents = []
now = time.time()
+ self.scheduleRecurring(now+600, 600, "SHRED", self.cleanQueues)
+ self.scheduleRecurring(now+180, 180, "WAIT",
+ lambda: waitForChildren(blocking=0))
+ if EventStats.log.getNextRotation():#XXXX!!!!
+ self.scheduleRecurring(now+300, 300, "ES_SAVE",
+ EventStats.log.save)
+ self.scheduleRecurringComplex(EventStats.log.getNextRotation(),
+ "ES_ROTATE",
+ EventStats.log.rotate,
+ EventStats.log.getNextRotation)
+
+ self.scheduleRecurringComplex(self.mmtpServer.getNextTimeoutTime(now),
+ "TIMEOUT",
+ self.mmtpServer.tryTimeout,
+ self.mmtpServer.getNextTimeoutTime)
+
- scheduledEvents.append( (now + 600, "SHRED") )#FFFF make configurable
- scheduledEvents.append( (now + 180, "WAIT") )#FFFF make configurable
- scheduledEvents.append( (self.mmtpServer.getNextTimeoutTime(now),
- "TIMEOUT") )
nextMix = self.mixPool.getNextMixTime(now)
- scheduledEvents.append( (nextMix, "MIX") )
LOG.debug("First mix at %s", formatTime(nextMix,1))
- scheduledEvents.sort()
+ self.scheduleOnce(self.mixPool.getNextMixTime(now),
+ "MIX", self.doMix)
LOG.info("Entering main loop: Mixminion %s", mixminion.__version__)
@@ -543,7 +604,7 @@
# FFFF Support for automatic key rotation.
while 1:
- nextEventTime = scheduledEvents[0][0]
+ nextEventTime = self.firstEventTime()
now = time.time()
timeLeft = nextEventTime - now
while timeLeft > 0:
@@ -570,54 +631,35 @@
now = time.time()
timeLeft = nextEventTime - now
- # It's time for an event.
- event = scheduledEvents[0][1]
- del scheduledEvents[0]
+ # An event has fired.
+ self.processEvents()
- if event == 'TIMEOUT':
- LOG.trace("Timing out old connections")
- self.mmtpServer.tryTimeout(now)
- insort(scheduledEvents,
- (self.mmtpServer.getNextTimeoutTime(now), "TIMEOUT"))
- elif event == 'SHRED':
- self.cleanQueues()
- insort(scheduledEvents, (now + 600, "SHRED"))
- elif event == 'WAIT':
- # Every few minutes, we reap zombies. Why, you ask? Isn't
- # catching sigchild enough? Nope -- sometimes buggy delivery
- # software forks, stays along long enough to ignore a child's
- # SIGCHLD, then dies itself. Or something -- in any case, we
- # sure seem to be leaving a bunch of zombie mixmaster processes
- # around. This should fix it.
- waitForChildren(blocking=0)
- insort(scheduledEvents, (now + 180, "WAIT"))
- elif event == 'MIX':
- # Before we mix, we need to log the hashes to avoid replays.
- try:
- # There's a potential threading problem here... in
- # between this sync and the 'mix' below, nobody should
- # insert into the mix pool.
- self.mixPool.lock()
- self.packetHandler.syncLogs()
+ def doMix(self):
+ now = time.time()
+ # Before we mix, we need to log the hashes to avoid replays.
+ try:
+ # There's a potential threading problem here... in
+ # between this sync and the 'mix' below, nobody should
+ # insert into the mix pool.
+ self.mixPool.lock()
+ self.packetHandler.syncLogs()
- LOG.trace("Mix interval elapsed")
- # Choose a set of outgoing messages; put them in
- # outgoingqueue and modulemanager
- self.mixPool.mix()
- finally:
- self.mixPool.unlock()
-
- # Send outgoing messages
- self.outgoingQueue.sendReadyMessages()
- # Send exit messages
- self.moduleManager.sendReadyMessages()
+ LOG.trace("Mix interval elapsed")
+ # Choose a set of outgoing messages; put them in
+ # outgoingqueue and modulemanager
+ self.mixPool.mix()
+ finally:
+ self.mixPool.unlock()
- # Choose next mix interval
- nextMix = self.mixPool.getNextMixTime(now)
- insort(scheduledEvents, (nextMix, "MIX"))
- LOG.trace("Next mix at %s", formatTime(nextMix,1))
- else:
- assert event in ("MIX", "SHRED", "TIMEOUT", "WAIT")
+ # Send outgoing messages
+ self.outgoingQueue.sendReadyMessages()
+ # Send exit messages
+ self.moduleManager.sendReadyMessages()
+
+ # Choose next mix interval
+ nextMix = self.mixPool.getNextMixTime(now)
+ self.scheduleOnce(nextMix, "MIX", self.doMix)
+ LOG.trace("Next mix at %s", formatTime(nextMix,1))
def cleanQueues(self):
"""Remove all deleted messages from queues"""
@@ -769,6 +811,8 @@
LOG.fatal_exc(sys.exc_info(),
"Exception while starting server in the background")
os._exit(0)
+ else:
+ os.umask(0000)
# Configure event log
try:
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -d -r1.12 -r1.13
--- ServerQueue.py 27 Mar 2003 10:31:00 -0000 1.12
+++ ServerQueue.py 5 May 2003 00:38:46 -0000 1.13
@@ -319,14 +319,13 @@
# currently sending.
# retrySchedule: a list of intervals at which delivery of messages
# should be reattempted, as described in "setRetrySchedule".
-
def __init__(self, location, retrySchedule=None):
Queue.__init__(self, location, create=1, scrub=1)
self._rescan()
if retrySchedule is None:
self.retrySchedule = None
else:
- self.retrySchedule = retrySchedule[:]
+ self.setRetrySchedule(retrySchedule)
def setRetrySchedule(self, schedule):
"""Set the retry schedule for this queue. A retry schedule is
@@ -379,8 +378,6 @@
"""Returns a (n_retries, msg, nextAttempt) tuple for a given
message handle."""
o = self.getObject(handle)
- if len(o) == 3:# XXXX004 For legacy queues; delete after 0.0.3
- o = o + (0,)
return o[0], o[2], o[3]
def sendReadyMessages(self, now=None):
@@ -481,7 +478,9 @@
if retries <= len(self.retrySchedule):
self.queueDeliveryMessage(msg, retries, nextAttempt)
elif not self.retrySchedule:
- #LEGACY XXXX004
+ #XXXX005: Make sure this error never occurs.
+ LOG.error(
+ "ServerQueue.deliveryFailed without retrySchedule")
retries += 1
nextAttempt = 0
if retries < 10: