[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] First batch of pinger implementation; new scheduler; ne...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv19854/server
Modified Files:
MMTPServer.py ServerConfig.py ServerKeys.py ServerMain.py
Added Files:
Pinger.py
Log Message:
First batch of pinger implementation; new scheduler; new cert rotation policy
Pinger.py:
- Code to send one-hop pings to measure one-node reliability
- Initial code to send two-hop pings (not finished; not used.)
- Code to try sending link padding to servers in order to measure their
uptime.
- Code to store record of ping events, and rotate out old records.
- Code to parse entries from record of ping events.
- Code to configure ping generators.
Still missing: code to analyze ping events; code to report results of
analysis; code to feed status of analysis back into pinger; more tests;
more docs.
MMTPServer.py:
- Support link padding on outgoing connections from servers
- Subclass MMTPClientConnection to tell PingLog about failed and successful
connections.
- Hook pinglogs into MMTPClientConnection.
ServerConfig.py:
- Add an option to turn on pinging
ServerKeys.py, ServerMain.py:
- Decouple link key rotation from packet key rotation; there's no need
to keep link keys alive for a long time, or even save them to disk.
ServerKeys.py:
- Add function to return current descriptor. How did we get on so
long without it? Anyway, the pinger needs it.
ServerMain:
- IncomingQueue sends decoded ping packets to the PingLog.
- OutgoingQueue asks the PingGenerator to add link padding as needed.
- Configure and enable pinger as required.
- Completely re-work scheduler, for three reasons:
- The old scheduler was not very OO, and worked very circuitously.
(I could no longer convince myself it was correct.)
- The old scheduler wasn't very good at handling regular events
that needed to be run in the background, and that couldn't be
rescheduled until they were done running.
- The old scheduler used an always-sorted-list structure to make
lookup-next-event O(1) while making schedule-an-event O(N). But
neither of these is in the critical path, and maintaining the
list in order was hazardous.
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.84
retrieving revision 1.85
diff -u -d -r1.84 -r1.85
--- MMTPServer.py 23 Mar 2004 00:09:24 -0000 1.84
+++ MMTPServer.py 27 Jul 2004 04:33:20 -0000 1.85
@@ -41,8 +41,7 @@
from mixminion.Filestore import CorruptedFile
from mixminion.ThreadUtils import MessageQueue, QueueEmpty
-__all__ = [ 'AsyncServer', 'ListenConnection', 'MMTPServerConnection',
- 'MMTPClientConnection' ]
+__all__ = [ 'AsyncServer', 'ListenConnection', 'MMTPServerConnection' ]
class SelectAsyncServer:
"""AsyncServer is the core of a general-purpose asynchronous
@@ -499,6 +498,32 @@
def isJunk(self):
return 0
+class LinkPadding(mixminion.MMTPClient.DeliverableMessage):
+ """DOCDOC"""
+ def __init__(self):
+ self.contents = getCommonPRNG().getBytes(1<<15)
+ def succeeded(self): pass
+ def failed(self,retriable=0): pass
+ def getContents(self): return self.contents
+ def isJunk(self): return 1
+
+class _ClientCon(MMTPClientConnection):
+ """DOCDOC"""
+ def configurePingLog(self, pingLog, nickname):
+ self._pingLog = pingLog
+ self._nickname = nickname
+ self._wasOnceConnected = 0
+ def onProtocolRead(self):
+ MMTPClientConnection.onProtocolRead(self)
+ if self._isConnected:
+ if self._pingLog:
+ self._pingLog.connected(self._nickname)
+ self._wasOnceConnected = 1
+ def _failPendingPackets(self):
+ if not self._wasOnceConnected and self._pingLog:
+ self._pingLog.connectFailed(self._nickname)
+ MMTPClientConnection._failPendingPackets(self)
+
LISTEN_BACKLOG = 128
class MMTPAsyncServer(AsyncServer):
"""A helper class to invoke AsyncServer, MMTPServerConnection, and
@@ -574,6 +599,7 @@
self.dnsCache = None
self.msgQueue = MessageQueue()
self.pendingPackets = []
+ self.pingLog = None
def connectDNSCache(self, dnsCache):
"""Use the DNSCache object 'DNSCache' to resolve DNS queries for
@@ -581,6 +607,9 @@
"""
self.dnsCache = dnsCache
+ def connectPingLog(self, pingLog):
+ self.pingLog = pingLog
+
def setServerContext(self, servercontext):
"""Change the TLS context used for newly received connections.
Used to rotate keys."""
@@ -723,9 +752,13 @@
# There isn't any connection to the right server. Open one...
addr = (ip, port, keyID)
finished = lambda addr=addr, self=self: self.__clientFinished(addr)
- con = MMTPClientConnection(
+ con = _ClientCon(
family, ip, port, keyID, serverName=serverName,
context=self.clientContext, certCache=self.certificateCache)
+ nickname = mixminion.ServerInfo.getNicknameByKeyID(keyID)
+ if nickname is None:
+ nickname = "<unknown>"
+ con.configurePingLog(self.pingLog, nickname)
#con.allPacketsSent = finished #XXXX007 wrong!
con.onClosed = finished
except (socket.error, MixProtocolError), e:
--- NEW FILE: Pinger.py ---
# Copyright 2004 Nick Mathewson. See LICENSE for licensing information.
# $Id: Pinger.py,v 1.1 2004/07/27 04:33:20 nickm Exp $
"""mixminion.server.Pinger
Built-in network reliability tester (pinger) for Mixminion servers.
Our pinger uses a three-part architecture. First, we periodically
consider adding testing packets to the outgoing batches.
Second, we note the outcome of connection attempts; and the timing
of our sending/receiving test packets.
Third, we use the timing/uptime information in the second part
above to try to infer how reliable the other nodes in the network
are.
"""
import calendar
import os
import re
import struct
import threading
import time
import mixminion.BuildMessage
import mixminion.Crypto
import mixminion.Packet
import mixminion.ServerInfo
import mixminion.server.PacketHandler
import mixminion.server.MMTPServer
from mixminion.Common import ceilDiv, createPrivateDir, formatBase64, \
formatFnameDate, formatTime, LOG, parseFnameDate, previousMidnight, \
secureDelete
class PingLog:
"""DOCDOC
stores record of pinger events
"""
HISTORY_DAYS = 12
HEARTBEAT_INTERVAL = 30*60
def __init__(self, location):
createPrivateDir(location)
self.location = location
self.file = None
self.fname = None
self.lock = threading.RLock()
self.rotate()
def rotate(self,now=None):
self.lock.acquire()
try:
date = formatFnameDate(now)
if self.file is not None:
if self.fname.endswith(date):
# no need to rotate.
return
self.rotated()
self.close()
self.fname = os.path.join(self.location, "events-"+date)
self.file = open(self.fname, 'a')
finally:
self.lock.release()
def close(self):
self.lock.acquire()
try:
if self.file is not None:
self.file.close()
self.file = None
finally:
self.lock.release()
def clean(self, now=None, deleteFn=None):
if now is None:
now = time.time()
self.lock.acquire()
try:
self.rotate(now)
bad = []
cutoff = previousMidnight(now) - 24*60*60*(self.HISTORY_DAYS+1)
for fn in os.listdir(self.location):
if not fn.startswith("events-"):
continue
try:
date = parseFnameDate(fn[7:])
except ValueError:
LOG.warn("Bad events filename %r; removing", fn)
bad.append(os.path.join(self.location, fn))
continue
if date < cutoff:
LOG.debug("Removing expired events file %s", fn)
bad.append(os.path.join(self.location, fn))
if deleteFn:
deleteFn(bad)
else:
secureDelete(bad, blocking=1)
finally:
self.lock.release()
def flush(self):
self.lock.acquire()
try:
if self.file is not None:
self.file.flush()
finally:
self.lock.release()
def _write(self,*msg):
self.lock.acquire()
try:
now = time.time()
m = "%s %s\n" % (formatTime(now)," ".join(msg))
self.file.write(m)
self.file.flush()
# XXXX self.events.append((now, msg))
finally:
self.lock.release()
def startup(self):
self._write("STARTUP")
def shutdown(self):
self._write("SHUTDOWN")
def rotated(self):
self._write("ROTATE")
def connected(self, nickname):
self._write("CONNECTED",nickname)
def connectFailed(self, nickname):
self._write("CONNECT_FAILED",nickname)
def queuedPing(self, hash, path):
self._write("PING",formatBase64(hash),path)
def gotPing(self, hash):
self._write("GOT_PING",formatBase64(hash))
def heartbeat(self):
self._write("STILL_UP")
def _getAllFilenames(self):
fns = [ fn for fn in os.listdir(self.location) if
fn.startswith("events-") ]
fns.sort()
return fns
def processPing(self, packet):#instanceof DeliveryPacket with type==ping
assert packet.getExitType() != mixminion.Packet.PING_TYPE
addr = packet.getAddress()
if len(addr) != mixminion.Crypto.DIGEST_LEN:
LOG.warn("Ignoring malformed ping packet (exitInfo length %s)",
len(packet.address))
return
if addr != mixminion.Crypto.sha1(packet.payload):
LOG.warn("Received ping packet with corrupt payload; ignoring")
return
LOG.debug("Received valid ping packet [%s]",formatBase64(addr))
self.gotPing(addr)
def iteratePingLog(file, func):
_EVENT_ARGS = {
"STARTUP" : 0,
"SHUTDOWN" : 0,
"ROTATE" : 0,
"CONNECTED" : 1,
"CONNECT_FAILED" : 1,
"PING" : 2,
"GOT_PING" : 1,
"STILL_UP" : 0 }
_PAT = re.compile(
r"^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2}) (\w+ ?.*)")
if hasattr(file,"xreadlines"):
readLines = file.xreadlines
else:
readLines = file.readlines
events = []
for line in readLines():
if line.startswith("#"):
return
m = _PAT.match(line)
if not m:
continue
gr = m.groups()
# parse time, event; make sure right # of args.
tm = calendar.timegm(*gr[:6])
event = tuple(gr[6].split())
if _EVENT_ARGS.get(event[0]) != len(event)-1:
continue
func(tm, event)
def readEventLog(file):
result = []
iterateEventLog(file, lambda tm, event: result.append((tm,event)))
return result
class PingStatus:
def __init__(self):
self.serverStatus = {} #"U"/"D",as-of
self.serverUptime = {}
self.serverDowntime = {}
self.pendingPings = {} #hash64->sent,path
self.lastEvent = None
def addEvent(self, tm, event):
eType = event[0]
if eType == 'PING':
self.pendingPings[event[1]] = tm, event[2]
elif eType == 'GOT_PING':
try:
tSent, path = self.pendingPings[event[1]]
except KeyError:
# we didn't send it, or can't remember sending it.
pass
else:
self.pingDone(path, tSent, tm)
elif eType == 'CONNECTED':
try:
s, tLast = self.serverStatus[event[1]]
except KeyError:
self.serverStatus[event[1]] = ('U', tm)
else:
if s == 'D':
try:
self.serverDowntime[event[1]] += tm-tLast
except KeyError:
self.serverDowntime[event[1]] = tm-tLast
serverStatus[event[1]] = ('U', tm)
elif eType == 'CONNECT_FAILED':
try:
s, tLast = self.serverStatus[event[1]]
except KeyError:
self.serverStatus[event[1]] = ('D', tm)
else:
if s == 'U':
try:
self.serverDowntime[event[1]] += tm-tLast
except KeyError:
self.serverDowntime[event[1]] = tm-tLast
serverStatus[event[1]] = ('U', tm)
elif eType == 'SHUTDOWN':
self.diedAt(tm)
elif eType == 'STARTUP':
if self.lastEvent:
self.diedAt(self.lastEvent[0])
self.lastEvent = (tm, event)
def pingDone(self, path, queuedAt, receivedAt):
servers = path.split(",")
if len(servers) == 1:
self.oneHopPingDone(servers[0], queuedAt, receivedAt)
elif len(servers) == 2:
self.twoHopPingDone(servers, queuedAt, receivedAt)
else:
pass # never made now.
def oneHopPingDone(self, nickname, queuedAt, receivedAt):
pass
def diedAt(self, diedAt):
for nickname, (status, tLast) in self.serverStatus.items():
if status == 'U':
m = self.serverUptime
else:
m = self.serverDowntime
try:
m[nickname] += diedAt-tLast
except KeyError:
m[nickname] = diedAt-tLast
self.serverStatus = {}
self.lastEvent = None
def getNetworkStatus(self):
nicknames = {}
for n in self.serverUptime.keys(): nicknames[n]=1
for n in self.serverDowntime.keys(): nicknames[n]=1
class PingGenerator:
"""DOCDOC"""
#XXXX008 add abstract functions.
def __init__(self, config):
self.directory = None
self.pingLog = None
self.outgoingQueue = None
self.myNickname = config['Server']['Nickname']
def connect(self, directory, outgoingQueue, pingLog, keyring):
pass
def getFirstPingTime(self):
return None
def scheduleAllPings(self, now=None):
pass
def sendPings(self, now=None):
pass
def addLinkPadding(self, pkts):
pass
def _sendOnePing(self, path1, path2):
assert path1 and path2
assert path2[-1].getNickname() == self.myNickname
p1 = self.directory.getPath(path1)
p2 = self.directory.getPath(path2)
verbose_path = ",".join([s.getNickname() for s in (p1+p2[:-1])])
payload = mixminion.BuildMessage.buildRandomPayload()
payloadHash = mixminion.Crypto.sha1(payload)
packet = mixminion.BuildMessage.buildForwardPacket(
payload, exitType=mixminion.Packet.PING_TYPE, exitInfo=payloadHash,
path1=p1, path2=p2, suppressTag=1)
addr = p1[0].getMMTPHostInfo()
obj = mixminion.server.PacketHandler.RelayedPacket(addr, packet)
LOG.debug("Pinger queueing ping along path %s [%s]",verbose_path,
formatBase64(payloadHash))
self.pingLog.queuedPing(payloadHash, verbose_path)
self.outgoingQueue.queueDeliveryMessage(obj, addr)
class _PingScheduler:
def __init__(self):
self.nextPingTime = {}#path->when
# PERIOD
# PING_INTERVAL
def connect(self, directory, outgoingQueue, pingLog, keyring):
self.directory = directory
self.outgoingQueue = outgoingQueue
self.pingLog = pingLog
self.keyring = keyring
def scheduleAllPings(self, now=None):
raise NotImplemented()
def _getPeriodStart(self, t):
raise NotImplemented()
def _schedulePing(self,path,now=None):
if now is None: now = time.time()
periodStart = _getPeriodStart(now)
t = periodStart + self._getPerturbation(path, periodStart)
t += self.PING_INTERVAL * ceilDiv(now-t, self.PING_INTERVAL)
if t>periodStart+self.PERIOD:
t = periodStart+self.PERIOD+self._getPerturbation(path,
periodStart+self.PERIOD)
self.nextPingTime[path] = t
LOG.trace("Scheduling %d-hop ping for %s at %s", len(path),
",".join(path), formatTime(t,1))
return t
def _getPerturbation(self, path, periodStart):
sha = mixminion.Crypto.sha1(",".join(path) + "@@" + str(day))
sec = abs(struct.unpack("I", sha[:4])[0]) % self.PING_INTERVAL
return sec
def getFirstPingTime(self):
if self.nextPingTime:
return min(self.nextPingTime.values())
else:
return None
class OneHopPingGenerator(PingGenerator,_PingScheduler):
"""DOCDOC"""
#XXXX008 make this configurable, but not less than 2 hours.
PING_INTERVAL = 2*60*60
PERIOD = 24*60*60
def __init__(self, config):
PingGenerator.__init__(self, config)
_PingScheduler.__init__(self)
def scheduleAllPings(self, now=None):
if now is None: now = time.time()
servers = self.directory.getAllServers()
nicknames = {}
for s in servers:
nicknames[s.getNickname()]=1
for n in nicknames.keys():
self._schedulePing((n,),now)
def _getPeriodStart(self, t):
return previousMidnight(t)
def sendPings(self, now=None):
if now is None: now = time.time()
servers = self.directory.getAllServers()
nicknames = {}
for s in servers:
nicknames[s.getNickname()] = 1
pingable = []
for n in nicknames.keys():
when = self.nextPingTime.get((n,))
if when is None:
# No ping scheduled; server must be new to directory.
self._schedulePing((n,),now)
continue
elif when > now: # Not yet.
continue
else:
# Time for a ping!
pingable.append(n)
myDescriptor = self.keyring.getCurrentDescriptor()
for n in pingable:
self._sendOnePing([n], [myDescriptor])
self._schedulePing((n,), now+60)
class TwoHopPingGenerator:
"""DOCDOC"""
#XXXX008 make this configurable, but not less than 2 hours.
PING_INTERVAL = 7*24*60*60
PERIOD = 7*24*60*60
def __init__(self, config):
PingGenerator.__init__(self, config)
_PingScheduler.__init__(self)
def scheduleAllPings(self, now=None):
if now is None: now = time.time()
servers = self.directory.getAllServers()
nicknames = {}
for s in servers:
nicknames[s.getNickname()]=1
for n1 in nicknames.keys():
for n2 in nicknames.keys():
self._schedulePing((n1,n2),now)
def _getPeriodStart(self, t):
return previousMidnight(t)
def sendPings(self, now=None):
if now is None: now = time.time()
servers = self.directory.getAllServers()
nicknames = {}
for s in servers:
nicknames[s.getNickname()] = 1
pingable = []
for n1 in nicknames.keys():
for n2 in nicknames.keys():
when = self.nextPingTime.get((n1,n2))
if when is None:
# No ping scheduled; server must be new to directory.
self._schedulePing((n1,n2),now)
continue
elif when > now: # Not yet.
continue
else:
# Time for a ping!
pingable.append((n1,n2))
myDescriptor = self.keyring.getCurrentDescriptor()
for n1, n2 in pingable:
self._sendOnePing([n1,n2], [myDescriptor])
self._schedulePing((n1,n2), now+60)
#class GeometricLinkPaddingGenerator
class TestLinkPaddingGenerator(PingGenerator):
"""DOCDOC"""
def __init__(self, config):
PingGenerator.__init__(self,config)
interval = config['Server']['MixInterval'].getSeconds()
if interval < 60*60:
self.prob = interval / float(60*60)
else:
self.prob = 1.0
def connect(self, directory, outgoingQueue, pingLog, keyring):
self.directory = directory
def addLinkPadding(self, pkts):
prng = mixminion.Crypto.getCommonPRNG()
addressSet = {}
for s in self.directory.getAllServers():
addressSet[s.getRoutingInfo()]=1
needPadding = []
for addr in addressSet.keys():
if not pkts.get(addr,None):
if prng.getFloat() < self.prob:
LOG.debug("Pinger adding link-padding to test %s",
mixminion.ServerInfo.displayServerByRouting(addr))
needPadding.append(addr)
for addr in needPadding:
padding = mixminion.server.MMTPServer.LinkPadding()
pkts.setdefault(addr,[]).append(padding)
class CompoundPingGenerator:
def __init__(self, generators):
self.gens = generators[:]
def connect(self, directory, outgoingQueue, pingLog, keyring):
assert directory
assert outgoingQueue
assert pingLog
assert keyring
for g in self.gens:
g.connect(directory, outgoingQueue, pingLog, keyring)
def scheduleAllPings(self, now=None):
for g in self.gens:
g.scheduleAllPings(now)
def getFirstPingTime(self):
times = []
for g in self.gens:
t = g.getFirstPingTime()
if t is not None:
times.append(t)
if times:
return min(times)
else:
return None
def sendPings(self,now=None):
for g in self.gens:
g.sendPings()
return self.getFirstPingTime()
def addLinkPadding(self,pkts):
for g in self.gens:
g.addLinkPadding(pkts)
def getPingGenerator(config):
"""DOCDOC"""
if not config['Pinging'].get('Enabled'):
return CompoundPingGenerator([])
pingers = []
pingers.append(OneHopPingGenerator(config))
pingers.append(TestLinkPaddingGenerator(config))
return CompoundPingGenerator(pingers)
Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.56
retrieving revision 1.57
diff -u -d -r1.56 -r1.57
--- ServerConfig.py 2 May 2004 18:45:16 -0000 1.56
+++ ServerConfig.py 27 Jul 2004 04:33:20 -0000 1.57
@@ -357,6 +357,7 @@
'MaxBandwidth' : ('ALLOW', "size", None),
'MaxBandwidthSpike' : ('ALLOW', "size", None),
},
+ 'Pinging' : { 'Enabled' : ('ALLOW', 'boolean', 'yes') },
'DirectoryServers' : { # '__SECTION__' : ('REQUIRE', None, None),
'ServerURL' : ('ALLOW*', None, None),
'PublishURL' : ('ALLOW*', None, None),
Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.66
retrieving revision 1.67
diff -u -d -r1.66 -r1.67
--- ServerKeys.py 6 Mar 2004 00:04:38 -0000 1.66
+++ ServerKeys.py 27 Jul 2004 04:33:20 -0000 1.67
@@ -33,7 +33,7 @@
from mixminion.Common import AtomicFile, LOG, MixError, MixFatalError, \
ceilDiv, createPrivateDir, checkPrivateFile, englishSequence, \
formatBase64, formatDate, formatTime, previousMidnight, readFile, \
- secureDelete, tryUnlink, UIError, writeFile
+ replaceFile, secureDelete, tryUnlink, UIError, writeFile
from mixminion.Config import ConfigError
#----------------------------------------------------------------------
@@ -54,6 +54,16 @@
#FFFF Make this configurable
DIRECTORY_UPLOAD_URL = "http://mixminion.net/minion-cgi/publish"
+
+# We have our X509 certificate set to expire a bit after public key does,
+# so that slightly-skewed clients don't incorrectly give up while trying to
+# connect to us. (And so that we don't mess up the world while being
+# slightly skewed.)
+CERTIFICATE_EXPIRY_SLOPPINESS = 2*60*60
+
+# DOCDOC
+CERTIFICATE_LIFETIME = 24*60*60
+
#----------------------------------------------------------------------
class ServerKeyring:
"""A ServerKeyring remembers current and future keys, descriptors, and
@@ -86,9 +96,13 @@
self.keyDir = config.getKeyDir()
self.hashDir = os.path.join(config.getWorkDir(), 'hashlogs')
self.dhFile = os.path.join(config.getWorkDir(), 'tls', 'dhparam')
+ self.certFile = os.path.join(config.getWorkDir(), "cert_chain")
self.keyOverlap = config['Server']['PublicKeyOverlap'].getSeconds()
+ self.nickname = config['Server']['Nickname'] #DOCDOC
self.nextUpdate = None
self.currentKeys = None
+ self._tlsContext = None #DOCDOC
+ self._tlsContextExpires = -1 #DOCDOC
self.checkKeys()
def checkKeys(self):
@@ -452,16 +466,44 @@
return self.dhFile
- def _getTLSContext(self, keys=None):
- """Create and return a TLS context from the currently live key."""
- if keys is None:
- keys = self.getServerKeysets()[-1]
- return mixminion._minionlib.TLSContext_new(keys.getCertFileName(),
- keys.getMMTPKey(),
- self._getDHFile())
+ def _newTLSContext(self, now=None):
+ """Create and return a TLS context."""
+ if now is None:
+ now = time.time()
+ mmtpKey = mixminion.Crypto.pk_generate(MMTP_KEY_BYTES*8)
- def updateKeys(self, packetHandler, mmtpServer, statusFile=None,when=None):
- """Update the keys and certificates stored in a PacketHandler and an
+ certStarts = now - CERTIFICATE_EXPIRY_SLOPPINESS
+ expires = now + CERTIFICATE_LIFETIME
+ certEnds = now + CERTIFICATE_LIFETIME + CERTIFICATE_EXPIRY_SLOPPINESS
+
+ tmpName = self.certFile + "_tmp"
+ generateCertChain(tmpName, mmtpKey, self.getIdentityKey(),
+ self.nickname, certStarts, certEnds)
+ replaceFile(tmpName, self.certFile)
+
+ self._tlsContext = (
+ mixminion._minionlib.TLSContext_new(self.certFile,
+ mmtpKey,
+ self._getDHFile()))
+ self._tlsContextExpires = expires
+ return self._tlsContext
+
+ def _getTLSContext(self, force=0, now=None):
+ if now is None:
+ now = time.time()
+ if force or self._tlsContext is None or self._tlsContextExpires < now:
+ return self._newTLSContext(now=now)
+ else:
+ return self._tlsContext
+
+ def updateMMTPServerTLSContext(self,mmtpServer,force=0,now=None):
+ """DOCDOC"""
+ context = self._getTLSContext(force=force,now=now)
+ mmtpServer.setServerContext(context)
+ return self._tlsContextExpires
+
+ def updateKeys(self, packetHandler, statusFile=None,when=None):
+ """Update the keys stored in a PacketHandler,
MMTPServer object, so that they contain the currently correct
keys. Also removes any dead keys.
@@ -475,12 +517,6 @@
LOG.info("Updating keys: %s currently valid (%s); %s expired (%s)",
len(keys), " ".join(keyNames),
len(deadKeys), " ".join(deadKeyNames))
- if mmtpServer is not None:
- LOG.trace("Using TLS cert from %s: good from %s to %s",
- keyNames[-1], formatDate(keys[-1].validAfter),
- formatDate(keys[-1].validUntil))
- context = self._getTLSContext(keys[-1])
- mmtpServer.setServerContext(context)
if packetHandler is not None:
packetKeys = []
hashLogs = []
@@ -547,12 +583,28 @@
def getAddress(self):
"""Return out current ip/port/keyid tuple"""
- keys = self.getServerKeysets()[0]
- desc = keys.getServerDescriptor()
+ desc = self.getCurrentDescriptor()
return (desc['Incoming/MMTP']['IP'],
desc['Incoming/MMTP']['Port'],
desc.getKeyDigest())
+ def getCurrentDescriptor(self, now=None):
+ """DOCDOC"""
+ self._lock.acquire()
+ if now is None:
+ now = time.time()
+ try:
+ keysets = self.getServerKeysets()
+ for k in keysets:
+ va,vu = k.getLiveness()
+ if va <= now <= vu:
+ return k.getServerDescriptor()
+
+ LOG.warn("getCurrentDescriptor: no live keysets??")
+ return self.getServerKeysets()[-1].getServerDescriptor()
+ finally:
+ self._lock.release()
+
def lock(self, blocking=1):
return self._lock.acquire(blocking)
@@ -577,7 +629,6 @@
# keydir: Directory to store this keyset's data.
# hashlogFile: filename of this keyset's hashlog.
# packetKeyFile, mmtpKeyFile: filename of this keyset's short-term keys
- # certFile: filename of this keyset's X509 certificate
# descFile: filename of this keyset's server descriptor.
# publishedFile: filename to store this server's publication time.
#
@@ -599,6 +650,10 @@
self.packetKeyFile = os.path.join(keydir, "mix.key")
self.mmtpKeyFile = os.path.join(keydir, "mmtp.key")
self.certFile = os.path.join(keydir, "mmtp.cert")
+ if os.path.exists(self.mmtpKeyFile):
+ secureDelete(self.mmtpKeyFile)
+ if os.path.exists(self.certFile):
+ secureDelete(self.certFile)
self.descFile = os.path.join(keydir, "ServerDesc")
self.publishedFile = os.path.join(keydir, "published")
self.serverinfo = None
@@ -611,9 +666,6 @@
def delete(self):
"""Remove this keyset from disk."""
files = [self.packetKeyFile,
- self.mmtpKeyFile,
- self.certFile,
- self.certFile+"_tmp",
self.descFile,
self.publishedFile,
self.hashlogFile ]
@@ -625,7 +677,6 @@
def checkKeys(self):
"""Check whether all the required keys exist and are private."""
checkPrivateFile(self.packetKeyFile)
- checkPrivateFile(self.mmtpKeyFile)
def load(self, password=None):
"""Read the short-term keys from disk. Must be called before
@@ -633,24 +684,18 @@
self.checkKeys()
self.packetKey = mixminion.Crypto.pk_PEM_load(self.packetKeyFile,
password)
- self.mmtpKey = mixminion.Crypto.pk_PEM_load(self.mmtpKeyFile,
- password)
def save(self, password=None):
"""Save this set of keys to disk."""
mixminion.Crypto.pk_PEM_save(self.packetKey, self.packetKeyFile,
password)
- mixminion.Crypto.pk_PEM_save(self.mmtpKey, self.mmtpKeyFile,
- password)
def clear(self):
"""Stop holding the keys in memory."""
- self.packetKey = self.mmtpKey = None
+ self.packetKey = None
- def getCertFileName(self): return self.certFile
def getHashLogFileName(self): return self.hashlogFile
def getDescriptorFileName(self): return self.descFile
def getPacketKey(self): return self.packetKey
- def getMMTPKey(self): return self.mmtpKey
def getPacketKeyID(self):
"Return the sha1 hash of the asn1 encoding of the packet public key"
return mixminion.Crypto.sha1(self.packetKey.encode_key(1))
@@ -889,11 +934,6 @@
#----------------------------------------------------------------------
# Functionality to generate keys and server descriptors
-# We have our X509 certificate set to expire a bit after public key does,
-# so that slightly-skewed clients don't incorrectly give up while trying to
-# connect to us.
-CERTIFICATE_EXPIRY_SLOPPINESS = 5*60
-
def generateServerDescriptorAndKeys(config, identityKey, keydir, keyname,
hashdir, validAt=None, now=None,
useServerKeys=0, validUntil=None):
@@ -915,17 +955,14 @@
serverKeys = ServerKeyset(keydir, keyname, hashdir)
serverKeys.load()
packetKey = serverKeys.packetKey
- mmtpKey = serverKeys.mmtpKey # not used
else:
# First, we generate both of our short-term keys...
packetKey = mixminion.Crypto.pk_generate(PACKET_KEY_BYTES*8)
- mmtpKey = mixminion.Crypto.pk_generate(MMTP_KEY_BYTES*8)
# ...and save them to disk, setting up our directory structure while
# we're at it.
serverKeys = ServerKeyset(keydir, keyname, hashdir)
serverKeys.packetKey = packetKey
- serverKeys.mmtpKey = mmtpKey
serverKeys.save()
# FFFF unused
@@ -952,13 +989,6 @@
if not validUntil:
keyLifetime = config['Server']['PublicKeyLifetime'].getSeconds()
validUntil = previousMidnight(validAt + keyLifetime + 30)
- certStarts = validAt - CERTIFICATE_EXPIRY_SLOPPINESS
- certEnds = validUntil + CERTIFICATE_EXPIRY_SLOPPINESS
-
- # Create the X509 certificates in any case, in case one of the parameters
- # has changed.
- generateCertChain(serverKeys.getCertFileName(),
- mmtpKey, identityKey, nickname, certStarts, certEnds)
mmtpProtocolsIn = mixminion.server.MMTPServer.MMTPServerConnection \
.PROTOCOL_VERSIONS[:]
@@ -1249,3 +1279,4 @@
return "Mixminion %s; Python %r on %r" % (
mixminion.__version__, sys.version, uname)
+
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.130
retrieving revision 1.131
diff -u -d -r1.130 -r1.131
--- ServerMain.py 17 May 2004 21:25:27 -0000 1.130
+++ ServerMain.py 27 Jul 2004 04:33:20 -0000 1.131
@@ -21,6 +21,7 @@
# ...
# stats.tmp [Cache of stats from latest period]
# dir/... [Directory dowloaded from directory server.]
+# pinger/log/ [DOCDOC]
#
# QUEUEDIR defaults to ${WORKDIR}/queues
# ${QUEUEDIR}/incoming/ [Queue of received,unprocessed pkts]
@@ -64,6 +65,7 @@
import mixminion.server.MMTPServer
import mixminion.server.Modules
import mixminion.server.PacketHandler
+import mixminion.server.Pinger
import mixminion.server.ServerQueue
import mixminion.server.ServerConfig
import mixminion.server.ServerKeys
@@ -156,6 +158,7 @@
mixminion.Filestore.StringStore.__init__(self, location, create=1)
self.packetHandler = packetHandler
self.mixPool = None
+ self.pingLog = None#DOCDOC
def connectQueues(self, mixPool, processingThread):
"""Sets the target mix queue"""
@@ -166,6 +169,10 @@
self.processingThread.addJob(
lambda self=self, h=h: self.__deliverPacket(h))
+ def setPingLog(self, pingLog):
+ "DOCDOC"
+ self.pingLog = pingLog
+
def queuePacket(self, pkt):
"""Add a packet for delivery"""
h = mixminion.Filestore.StringStore.queueMessage(self, pkt)
@@ -192,7 +199,17 @@
self.removeMessage(handle)
else:
if res.isDelivery():
- res.decode()
+ if res.getExitType() == mixminion.Packet.PING_TYPE:
+ LOG.debug("Ping packet IN:%s decoded", handle)
+ if self.pingLog is not None:
+ self.pingLog.processPing(res)
+ else:
+ LOG.debug("Pinging not enabled; discarding packet")
+ self.removeMessage(handle)
+ return
+ else:
+ #XXXX008 defer decoding to module; don't do it here.
+ res.decode()
self.mixPool.queueObject(res)
self.removeMessage(handle)
@@ -325,6 +342,7 @@
mixminion.server.ServerQueue.PerAddressDeliveryQueue.__init__(self, location)
self.server = None
self.incomingQueue = None
+ self.pingGenerator = None
self.keyID = keyid
def configure(self, config):
@@ -332,12 +350,13 @@
retry = config['Outgoing/MMTP']['Retry']
self.setRetrySchedule(retry)
- def connectQueues(self, server, incoming):
+ def connectQueues(self, server, incoming, pingGenerator):
"""Set the MMTPServer and IncomingQueue that this
OutgoingQueue informs of its deliverable packets."""
self.server = server
self.incomingQueue = incoming
+ self.pingGenerator = pingGenerator#DOCDOC
def _deliverMessages(self, msgList):
"Implementation of abstract method from DeliveryQueue."
@@ -351,6 +370,8 @@
except mixminion.Filestore.CorruptedFile:
continue
pkts.setdefault(addr, []).append(pending)
+
+ deliverable = {}
for routing, packets in pkts.items():
if self.keyID == routing.keyinfo:
for pending in packets:
@@ -361,13 +382,18 @@
pending.succeeded()
continue
- deliverable = [
+ deliverable[routing] = [
mixminion.server.MMTPServer.DeliverablePacket(pending)
for pending in packets ]
LOG.trace("Delivering packets OUT:[%s] to %s",
" ".join([p.getHandle() for p in packets]),
mixminion.ServerInfo.displayServerByRouting(routing))
- self.server.sendPacketsByRouting(routing, deliverable)
+
+ if self.pingGenerator is not None:
+ self.pingGenerator.addLinkPadding(deliverable)
+
+ for routing, packets in deliverable.items():
+ self.server.sendPacketsByRouting(routing, packets)
class _MMTPServer(mixminion.server.MMTPServer.MMTPAsyncServer):
"""Implementation of mixminion.server.MMTPServer that knows about
@@ -519,112 +545,190 @@
signal.signal(signal.SIGTERM, _sigTermHandler)
#----------------------------------------------------------------------
+# Functions to schedule periodic events. Invocation times are approximate,
+# and not accurate to within more than a minute or two.
+
+class ScheduledEvent:
+ """Abstract base class for a scheduleable event."""
+ def getNextTime(self):
+ """Return the next time when this event should be called. Return
+ -1 for 'never' and 'None' for 'currently unknown'.
+ """
+ raise NotImplementedError("getNextTime")
+ def __call__(self):
+ """Invoke this event."""
+ raise NotImplementedError("__call__")
+
+class OneTimeEvent:
+ """An event that will be called exactly once."""
+ def __init__(self, when, func):
+ """Create an event to call func() at the time 'when'."""
+ self.when = when
+ self.func = func
+ def getNextTime(self):
+ return self.when
+ def __call__(self):
+ self.func()
+ self.when = -1
+
+class RecurringEvent:
+ """An event that will be called at regular intervals."""
+ def __init__(self, when, func, repeat):
+ """Create an event to call func() at the time 'when', and every
+ 'repeat' seconds thereafter."""
+ self.when = when
+ self.func = func
+ self.repeat = repeat
+ def getNextTime(self):
+ return self.when
+ def __call__(self):
+ try:
+ self.func()
+ finally:
+ self.when += self.repeat
+
+class RecurringComplexEvent(RecurringEvent):
+ """An event that will be called at irregular intervals."""
+ def __init__(self, when, func):
+ """Create an event to invoke func() at time 'when'. func() must
+ return -1 for 'do not call again', or a time when it should next
+ be called."""
+ RecurringEvent.__init__(self, when, func, None)
+ def __call__(self):
+ self.when = self.func()
+
+class RecurringBackgroundEvent:
+ """An event that will be called at regular intervals, and scheduled
+ as a background job. Does not reschedule the event while it is
+ already in progress."""
+ def __init__(self, when, scheduleJob, func, repeat):
+ """Create an event to invoke 'func' at time 'when' and every
+ 'repeat' seconds thereafter. The function 'scheduleJob' will
+ be invoked with a single callable object in order to run that
+ callable in the background.
+ """
+ self.when = when
+ self.scheduleJob = scheduleJob
+ self.func = func
+ self.repeat = repeat
+ self.running = 0
+ self.lock = threading.Lock()
+ def getNextTime(self):
+ self.lock.acquire()
+ try:
+ if self.running:
+ return None
+ else:
+ return self.when
+ finally:
+ self.lock.release()
+ def __call__(self):
+ self.lock.acquire()
+ try:
+ if self.running:
+ return
+ self.running = 1
+ finally:
+ self.lock.release()
+
+ self.scheduleJob(self._background)
+ def _background(self):
+ """Helper function: this one is actually invoked by the background
+ thread."""
+ self.func()
+ self.lock.acquire()
+ try:
+ now = time.time()
+ while self.when < now:
+ self.when += self.repeat
+ self.running = 0
+ finally:
+ self.lock.release()
+
+class RecurringComplexBackgroundEvent(RecurringBackgroundEvent):
+ """An event to run a job at irregular intervals in the background."""
+ def __init__(self, when, scheduleJob, func):
+ """Create an event to invoke 'func' at time 'when'. func() must
+ return -1 for 'do not call again', or a time when it should next
+ be called.
+
+ The function 'scheduleJob' will be invoked with a single
+ callable object in order to run that callable in the
+ background.
+ """
+ RecurringBackgroundEvent.__init__(self, when, scheduleJob, func, None)
+ def _background(self):
+ next = self.func()
+ self.lock.acquire()
+ try:
+ self.when = next
+ self.running = 0
+ finally:
+ self.lock.release()
class _Scheduler:
- """Mixin class for server. Implements a priority queue of ongoing,
- scheduled tasks with a loose (few seconds) granularity.
- """
- # Fields:
- # scheduledEvents: list of (time, identifying-string, callable)
- # Sorted by time. We could use a heap here instead, but
- # that doesn't turn into a net benefit until we have a hundred
- # events or so.
+ """Base class: used to run a bunch of events periodically."""
+ ##Fields:
+ # scheduledEvents: a list of ScheduledEvent objects.
+ # schedLock: a threading.RLock object to protect the list scheduledEvents
+ # (but not the events themselves).
+ #XXXX008 needs more tests
def __init__(self):
- """Create a new _Scheduler"""
+ """Create a new scheduler."""
self.scheduledEvents = []
self.schedLock = threading.RLock()
-
def firstEventTime(self):
- """Return the time at which the earliest-scheduled event is
- supposed to occur. Returns -1 if no events.
- """
+ """Return the time at which an event will first occur."""
self.schedLock.acquire()
try:
- if self.scheduledEvents:
- return self.scheduledEvents[0][0]
- else:
+ if not self.scheduledEvents:
return -1
+ first = 0
+ for e in self.scheduledEvents:
+ t = e.getNextTime()
+ if t in (-1,None): continue
+ if not first or t < first:
+ first = t
+ return first
finally:
self.schedLock.release()
- def scheduleOnce(self, when, name, cb):
- """Schedule a callback function, 'cb', to be invoked at time 'when.'
- """
- assert type(name) is StringType
- assert type(when) in (IntType, LongType, FloatType)
+ def scheduleEvent(self, event):
+ """Add a ScheduledEvent to this scheduler"""
+ when = event.getNextTime()
+ if when == -1:
+ return
+ self.schedLock.acquire()
try:
- self.schedLock.acquire()
- insort(self.scheduledEvents, (when, name, cb))
+ self.scheduledEvents.append(event)
finally:
- self.schedLock.release()
+ self.schedLock.acquire()
+
+ #XXXX008 -- these are only used for testing.
+ def scheduleOnce(self, when, name, cb):
+ self.scheduleEvent(OneTimeEvent(when,cb))
def scheduleRecurring(self, first, interval, name, cb):
- """Schedule a callback function 'cb' to be invoked at time 'first,'
- and every 'interval' seconds thereafter.
- """
- assert type(name) is StringType
- assert type(first) in (IntType, LongType, FloatType)
- assert type(interval) in (IntType, LongType, FloatType)
- def cbWrapper(cb=cb, interval=interval):
- cb()
- return time.time()+interval
- self.scheduleRecurringComplex(first,name,cbWrapper)
+ self.scheduleEvent(RecurringEvent(first, cb, interval))
def scheduleRecurringComplex(self, first, name, cb):
- """Schedule a callback function 'cb' to be invoked at time 'first,'
- and thereafter at times returned by 'cb'.
-
- (Every time the callback is invoked, if it returns a non-None value,
- the event is rescheduled for the time it returns.)
- """
- assert type(name) is StringType
- assert type(first) in (IntType, LongType, FloatType)
- self.scheduleOnce(first, name, _RecurringEvent(name, cb, self))
+ self.scheduleEvent(RecurringComplexEvent(first, cb))
def processEvents(self, now=None):
- """Run all events that are scheduled to occur before 'now'.
-
- Note: if an event reschedules itself for a time _before_ now,
- it will only be run once per invocation of processEvents.
-
- The right way to run this class is something like:
- while 1:
- interval = time.time() - scheduler.firstEventTime()
- if interval > 0:
- time.sleep(interval)
- # or maybe, select.select(...,...,...,interval)
- scheduler.processEvents()
- """
- if now is None: now = time.time()
+ """Run all events that need to get called at the time 'now'."""
+ if now is None:
+ now = time.time()
self.schedLock.acquire()
try:
- se = self.scheduledEvents
- cbs = []
- while se and se[0][0] <= now:
- cbs.append(se[0][2])
- del se[0]
+ events = [(e.getNextTime(),e) for e in self.scheduledEvents]
+ self.scheduledEvents = [e for t,e in events if t != -1]
+ runnable = [(t,e) for t,e in events
+ if t not in (-1,None) and t <= now]
finally:
self.schedLock.release()
- for cb in cbs:
- cb()
-
-class _RecurringEvent:
- """helper for _Scheduler. Calls a callback, then reschedules it."""
- def __init__(self, name, cb, scheduler):
- self.name = name
- self.cb = cb
- self.scheduler = scheduler
-
- def __call__(self):
- nextTime = self.cb()
- if nextTime is None:
- LOG.warn("Not rescheduling %s", self.name)
- return
- elif nextTime < time.time():
- raise MixFatalError("Tried to schedule event %s in the past! (%s)"
- %(self.name, formatTime(nextTime,1)))
-
- self.scheduler.scheduleOnce(nextTime, self.name, self)
+ runnable.sort()
+ for _,e in runnable:
+ e()
class MixminionServer(_Scheduler):
"""Wraps and drives all the queues, and the async net server. Handles
@@ -654,6 +758,7 @@
# running in the same directory. The filename for this lock is
# stored in self.pidFile.
# pidFile: Filename in which we store the pid of the running server.
+ # pingLog: DOCDOC
def __init__(self, config):
"""Create a new server from a ServerConfig."""
_Scheduler.__init__(self)
@@ -720,8 +825,9 @@
self.mmtpServer = _MMTPServer(config, None)
LOG.debug("Initializing keys")
self.descriptorFile = os.path.join(homeDir, "current-desc")
- self.keyring.updateKeys(self.packetHandler, self.mmtpServer,
+ self.keyring.updateKeys(self.packetHandler,
self.descriptorFile)
+ self.keyring.updateMMTPServerTLSContext(self.mmtpServer)
LOG.debug("Initializing directory client")
self.dirClient = mixminion.ClientDirectory.ClientDirectory(config)
try:
@@ -766,6 +872,22 @@
LOG.debug("Found %d pending packets in outgoing queue",
self.outgoingQueue.count())
+ pingerEnabled = config['Pinging'].get("Enabled")
+ if pingerEnabled:
+ #FFFF Later, enable this stuff anyway, to make R-G-B mixing work.
+ LOG.debug("Initializing ping log")
+ pingerDir = os.path.join(config.getWorkDir(), "pinger")
+ pingerLogDir = os.path.join(pingerDir, "log")
+ self.pingLog = mixminion.server.Pinger.PingLog(pingerLogDir)
+ self.pingLog.startup()
+
+ LOG.debug("Initializing ping generator")
+
+ self.pingGenerator=mixminion.server.Pinger.getPingGenerator(config)
+ else:
+ self.pingLog = None
+ self.pingGenerator = None
+
self.cleaningThread = CleaningThread()
self.processingThread = ProcessingThread()
@@ -777,10 +899,22 @@
self.mixPool.connectQueues(outgoing=self.outgoingQueue,
manager=self.moduleManager)
self.outgoingQueue.connectQueues(server=self.mmtpServer,
- incoming=self.incomingQueue)
+ incoming=self.incomingQueue,
+ pingGenerator=self.pingGenerator)
self.mmtpServer.connectQueues(incoming=self.incomingQueue,
outgoing=self.outgoingQueue)
self.mmtpServer.connectDNSCache(self.dnsCache)
+ if self.pingGenerator is not None:
+ assert self.pingLog
+ self.pingGenerator.connect(directory=self.dirClient,
+ outgoingQueue=self.outgoingQueue,
+ pingLog=self.pingLog,
+ keyring=self.keyring)
+ self.pingGenerator.scheduleAllPings(time.time())
+
+ if self.pingLog is not None:
+ self.incomingQueue.setPingLog(self.pingLog)
+ self.mmtpServer.connectPingLog(self.pingLog)
self.cleaningThread.start()
self.processingThread.start()
@@ -802,7 +936,7 @@
return time.time() + 120
try:
- self.keyring.updateKeys(self.packetHandler, self.mmtpServer,
+ self.keyring.updateKeys(self.packetHandler,
self.descriptorFile)
return self.keyring.getNextKeyRotation()
finally:
@@ -810,49 +944,35 @@
def generateKeys(self):
"""Callback used to schedule key-generation"""
-
- # We generate and publish keys in the processing thread, so we don't
- # slow down the server. We also reschedule from the processing thread,
- # so that we can take the new keyset into account when calculating
- # when keys are next needed.
-
- def c(self=self):
- try:
- self.keyring.lock()
- self.keyring.createKeysAsNeeded()
- self.updateKeys(lock=0)
- if self.config['DirectoryServers'].get('Publish'):
- self.keyring.publishKeys()
- self.scheduleOnce(self.keyring.getNextKeygen(),
- "KEY_GEN",
- self.generateKeys)
- finally:
- self.keyring.unlock()
-
- self.processingThread.addJob(c)
+ try:
+ self.keyring.lock()
+ self.keyring.createKeysAsNeeded()
+ self.updateKeys(lock=0)
+ if self.config['DirectoryServers'].get('Publish'):
+ self.keyring.publishKeys()
+ return self.keyring.getNextKeygen()
+ finally:
+ self.keyring.unlock()
def updateDirectoryClient(self):
- def c(self=self):
- try:
- self.dirClient.update()
- nextUpdate = succeedingMidnight(time.time()+30)
- prng = mixminion.Crypto.getCommonPRNG()
- # Randomly retrieve the directory within an hour after
- # midnight, to avoid hosing the server.
- nextUpdate += prng.getInt(60)*60
- except mixminion.ClientDirectory.GotInvalidDirectoryError, e:
- LOG.warn(str(e))
- LOG.warn(" I'll try again in an hour.")
- nextUpdate = min(succeedingMidnight(time.time()+30),
- time.time()+3600)
- except UIError, e:#XXXX008 This should really be a new exception
- LOG.warn(str(e))
- LOG.warn(" I'll try again in an hour.")
- nextUpdate = min(succeedingMidnight(time.time()+30),
- time.time()+3600)
- self.scheduleOnce(nextUpdate, "UPDATE_DIR_CLIENT",
- self.updateDirectoryClient)
- self.processingThread.addJob(c)
+ try:
+ self.dirClient.update()
+ nextUpdate = succeedingMidnight(time.time()+30)
+ prng = mixminion.Crypto.getCommonPRNG()
+ # Randomly retrieve the directory within an hour after
+ # midnight, to avoid hosing the server.
+ nextUpdate += prng.getInt(60)*60
+ except mixminion.ClientDirectory.GotInvalidDirectoryError, e:
+ LOG.warn(str(e))
+ LOG.warn(" I'll try again in an hour.")
+ nextUpdate = min(succeedingMidnight(time.time()+30),
+ time.time()+3600)
+ except UIError, e:#XXXX008 This should really be a new exception
+ LOG.warn(str(e))
+ LOG.warn(" I'll try again in an hour.")
+ nextUpdate = min(succeedingMidnight(time.time()+30),
+ time.time()+3600)
+ return nextUpdate
def run(self):
"""Run the server; don't return unless we hit an exception."""
@@ -864,43 +984,68 @@
self.cleanQueues()
now = time.time()
- self.scheduleRecurring(now+600, 600, "SHRED", self.cleanQueues)
- self.scheduleRecurring(now+180, 180, "WAIT",
- lambda: waitForChildren(blocking=0))
+ self.scheduleEvent(RecurringEvent(now+600, self.cleanQueues, 600))
+ self.scheduleEvent(RecurringEvent(now+180,
+ lambda: waitForChildren(blocking=0),
+ 180))
if EventStats.log.getNextRotation():
- self.scheduleRecurring(now+300, 300, "ES_SAVE",
- lambda: EventStats.log.save)
def _rotateStats():
EventStats.log.rotate()
return EventStats.log.getNextRotation()
- self.scheduleRecurringComplex(EventStats.log.getNextRotation(),
- "ES_ROTATE",
- _rotateStats)
+ self.scheduleEvent(RecurringEvent(now+300,
+ lambda: EventStats.log.save, 300))
+ self.scheduleEvent(RecurringComplexEvent(
+ EventStats.log.getNextRotation(),
+ _rotateStats))
def _tryTimeout(self=self):
self.mmtpServer.tryTimeout()
self.dnsCache.cleanCache()
return self.mmtpServer.getNextTimeoutTime()
- self.scheduleRecurringComplex(self.mmtpServer.getNextTimeoutTime(now),
- "TIMEOUT",
- _tryTimeout)
+ self.scheduleEvent(RecurringComplexEvent(
+ self.mmtpServer.getNextTimeoutTime(now),
+ _tryTimeout))
- self.scheduleRecurringComplex(self.keyring.getNextKeyRotation(),
- "KEY_ROTATE",
- self.updateKeys)
+ self.scheduleEvent(RecurringComplexEvent(
+ self.keyring.getNextKeyRotation(),
+ self.updateKeys))
- self.scheduleOnce(self.keyring.getNextKeygen(),
- "KEY_GEN",
- self.generateKeys)
+ self.scheduleEvent(RecurringBackgroundEvent(
+ self.keyring._tlsContextExpires,
+ self.processingThread.addJob,
+ lambda self=self: self.keyring.updateMMTPServerTLSContext(
+ self.mmtpServer,
+ force=1),
+ mixminion.server.ServerKeys.CERTIFICATE_LIFETIME))
+
+ self.scheduleEvent(RecurringComplexBackgroundEvent(
+ self.keyring.getNextKeygen(),
+ self.processingThread.addJob,
+ self.generateKeys))
+
+ if self.pingGenerator is not None:
+ self.scheduleEvent(RecurringComplexBackgroundEvent(
+ self.pingGenerator.getFirstPingTime(),
+ self.processingThread.addJob,
+ self.pingGenerator.sendPings))
+ if self.pingLog is not None:
+ self.scheduleEvent(RecurringEvent(
+ now+self.pingLog.HEARTBEAT_INTERVAL,
+ self.pingLog.heartbeat,
+ self.pingLog.HEARTBEAT_INTERVAL))
# Makes next update get scheduled.
- self.updateDirectoryClient()
+ nextUpdate = self.updateDirectoryClient()
+ self.scheduleEvent(RecurringComplexBackgroundEvent(
+ nextUpdate,
+ self.processingThread.addJob,
+ self.updateDirectoryClient))
nextMix = self.mixPool.getNextMixTime(now)
LOG.debug("First mix at %s", formatTime(nextMix,1))
- self.scheduleRecurringComplex(self.mixPool.getNextMixTime(now),
- "MIX", self.doMix)
+ self.scheduleEvent(RecurringComplexEvent(
+ self.mixPool.getNextMixTime(now), self.doMix))
LOG.info("Entering main loop: Mixminion %s", mixminion.__version__)
@@ -910,15 +1055,16 @@
if self.config['Server'].get("Daemon",1):
closeUnusedFDs()
+ SCHEDULE_INTERVAL = 60
+ TICK_INTERVAL = self.mmtpServer.TICK_INTERVAL
while 1:
- nextEventTime = self.firstEventTime()
now = time.time()
- timeLeft = nextEventTime - now
- tickInterval = self.mmtpServer.TICK_INTERVAL
- nextTick = now+tickInterval
+ nextEvent = now + SCHEDULE_INTERVAL
+ timeLeft = SCHEDULE_INTERVAL
+ nextTick = now+TICK_INTERVAL
while timeLeft > 0:
# Handle pending network events
- self.mmtpServer.process(tickInterval)
+ self.mmtpServer.process(TICK_INTERVAL)
# Check for signals
if STOPPING:
LOG.info("Caught SIGTERM; shutting down.")
@@ -938,8 +1084,8 @@
now = time.time()
if now > nextTick:
self.mmtpServer.tick()
- nextTick = now+tickInterval
- timeLeft = nextEventTime - now
+ nextTick = now+TICK_INTERVAL
+ timeLeft = nextEvent - now
# An event has fired.
self.processEvents()
@@ -999,9 +1145,13 @@
self.mixPool.queue.cleanQueue(df)
self.outgoingQueue.cleanQueue(df)
self.moduleManager.cleanQueues(df)
+ if self.pingLog:
+ self.pingLog.clean()
def close(self):
"""Release all resources; close all files."""
+ if self.pingLog is not None:
+ self.pingLog.shutdown()
self.cleaningThread.shutdown()
self.processingThread.shutdown()
self.moduleManager.shutdown()