[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] As-yet-untested code to timeout old connections. Also m...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv6251/lib/mixminion/server
Modified Files:
MMTPServer.py ServerConfig.py ServerMain.py
Log Message:
As-yet-untested code to timeout old connections. Also move main event loop to a cleaner structure.
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- MMTPServer.py 16 Dec 2002 02:40:11 -0000 1.4
+++ MMTPServer.py 21 Dec 2002 01:54:23 -0000 1.5
@@ -152,6 +152,10 @@
"""Returns an integer file descriptor for this connection, or returns
an object that can return such a descriptor."""
pass
+ def tryTimeout(self, cutoff):
+ """If this connection has seen no activity since 'cutoff', and it
+ is subject to aging, shut it down."""
+ pass
class ListenConnection(Connection):
"""A ListenConnection listens on a given port/ip combination, and calls
@@ -239,6 +243,7 @@
self.__sock = sock
self.__con = tls
self.fd = self.__con.fileno()
+ self.lastActivity = time.time()
if serverMode:
self.__state = self.__acceptFn
@@ -375,6 +380,18 @@
if len(out) == 0:
self.finished()
+ def tryTimeout(self, cutoff):
+ if self.lastActivity <= cutoff:
+ warn("Socket %s to %s timed out", self.fd, self.address)
+ # ???? I'm not sure this is right. Instead of just killing
+ # ???? the socket, should we shut down the SSL too?
+ # ???? Also, should we handle timeout as a separate kind of
+ # ???? error from a hooks point of view.
+ self.__server.unregister(self)
+ self.__state = None
+ self.__sock.close()
+ self.handleFail(1)
+
def handleRead(self):
self.__handleAll()
@@ -694,6 +711,8 @@
"""A helper class to invoke AsyncServer, MMTPServerConnection, and
MMTPClientConnection, with a function to add new connections, and
callbacks for message success and failure."""
+ ##
+ # _timeout: the interval after which we drop open inactive connections.
def __init__(self, config, tls):
AsyncServer.__init__(self)
@@ -706,6 +725,12 @@
self._newMMTPConnection)
#self.config = config
self.listener.register(self)
+ self._timeout = config['Server']['Timeout'][2]
+
+ def getNextTimeoutTime(self, now):
+ """Return the time at which we next purge connections, if we have
+ last done so at time 'now'."""
+ return now + self._timeout
def _newMMTPConnection(self, sock):
"""helper method. Creates and registers a new server connection when
@@ -742,3 +767,14 @@
def onMessageSent(self, msg, handle):
pass
+ def tryTimeout(self, now):
+ """Timeout any connection that is too old."""
+ cutoff = now - self._timeout
+ filenos = {}
+ for fd, r in self.readers.items():
+ r.tryTimeout(self._timeout)
+ filenos[fd] = 1
+ for fd, w in self.writers.items():
+ if filenos.has_key(fd): continue
+ w.tryTimeout(self._timeout)
+ filenos[fd] = 0
Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- ServerConfig.py 20 Dec 2002 23:52:07 -0000 1.4
+++ ServerConfig.py 21 Dec 2002 01:54:23 -0000 1.5
@@ -166,6 +166,7 @@
'MixInterval' : ('ALLOW', C._parseInterval, "30 min"),
'MixPoolRate' : ('ALLOW', _parseFraction, "60%"),
'MixPoolMinSize' : ('ALLOW', C._parseInt, "5"),
+ 'Timeout' : ('ALLOW', C._parseInterval, "5 min"),
},
'DirectoryServers' : { 'ServerURL' : ('ALLOW*', None, None),
'Publish' : ('ALLOW', C._parseBoolean, "no"),
@@ -175,13 +176,13 @@
'Incoming/MMTP' : { 'Enabled' : ('REQUIRE', C._parseBoolean, "no"),
'IP' : ('ALLOW', C._parseIP, "0.0.0.0"),
'Port' : ('ALLOW', C._parseInt, "48099"),
- 'Allow' : ('ALLOW*', C._parseAddressSet_allow, None),
- 'Deny' : ('ALLOW*', C._parseAddressSet_deny, None) },
+ 'Allow' : ('ALLOW*', C._parseAddressSet_allow, None),
+ 'Deny' : ('ALLOW*', C._parseAddressSet_deny, None)
+ },
'Outgoing/MMTP' : { 'Enabled' : ('REQUIRE', C._parseBoolean, "no"),
'Allow' : ('ALLOW*', C._parseAddressSet_allow, None),
'Deny' : ('ALLOW*', C._parseAddressSet_deny, None) },
# FFFF Missing: Queue-Size / Queue config options
- # FFFF timeout options
# FFFF listen timeout??
# FFFF Retry options
# FFFF pool options
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- ServerMain.py 21 Dec 2002 00:18:49 -0000 1.9
+++ ServerMain.py 21 Dec 2002 01:54:23 -0000 1.10
@@ -25,6 +25,7 @@
import mixminion.server.ServerConfig
import mixminion.server.ServerKeys
+from bisect import insort
from mixminion.Common import LOG, LogStream, MixError, MixFatalError, ceilDiv,\
createPrivateDir, formatBase64, formatTime, waitForChildren
@@ -280,9 +281,6 @@
def run(self):
"""Run the server; don't return unless we hit an exception."""
- # FFFF Use heapq to schedule events? [I don't think so; there are only
- # FFFF two events, after all!] [But the intervals may be very
- # FFFF different...!]
f = open(self.pidFile, 'wt')
f.write("%s\n" % os.getpid())
@@ -290,14 +288,23 @@
self.cleanQueues()
+ # List of (eventTime, eventName) tuples. Current names are:
+ # 'MIX', 'SHRED', and 'TIMEOUT'. Kept in sorted order.
+ scheduledEvents = []
now = time.time()
+ scheduledEvents.append( (now + 6000, "SHRED") )#FFFF make configurable
+ scheduledEvents.append( (self.mmtpServer.getNextTimeoutTime(now),
+ "TIMEOUT") )
nextMix = self.mixPool.getNextMixTime(now)
- nextShred = now + 6000
+ scheduledEvents.append( (nextMix, "MIX") )
+ LOG.trace("Next mix at %s", formatTime(nextMix,1))
+ scheduledEvents.sort()
+
#FFFF Unused
#nextRotate = self.keyring.getNextKeyRotation()
while 1:
- LOG.trace("Next mix at %s", formatTime(nextMix,1))
- timeLeft = 1
+ nextEventTime = scheduledEvents[0][0]
+ timeLeft = nextEventTime - time.time()
while timeLeft > 0:
# Handle pending network events
self.mmtpServer.process(timeLeft)
@@ -307,29 +314,41 @@
# Prevent child processes from turning into zombies.
waitForChildren(1)
# Calculate remaining time.
- timeLeft = nextMix - time.time()
+ now = time.time()
+ timeLeft = nextEventTime - now
- # Before we mix, we need to log the hashes to avoid replays.
- # FFFF We need to recover on server failure.
- self.packetHandler.syncLogs()
+ event = scheduledEvents[0][1]
+ del scheduledEvents[0]
- LOG.trace("Mix interval elapsed")
- # Choose a set of outgoing messages; put them in outgoingqueue and
- # modulemanger
- self.mixPool.mix()
- # Send outgoing messages
- self.outgoingQueue.sendReadyMessages()
- # Send exit messages
- self.moduleManager.sendReadyMessages()
+ if event == 'TIMEOUT':
+ LOG.info("Timing out.")
+ self.mmtpServer.tryTimeout(now)
+ insort(scheduledEvents,
+ (self.mmtpServer.getNextTimeoutTime(now), "TIMEOUT"))
+ elif event == 'SHRED':
+ self.cleanQueues()
+ insort(scheduledEvents,
+ (now + 6000, "SHRED"))
+ elif event == 'MIX':
+ # Before we mix, we need to log the hashes to avoid replays.
+ # FFFF We need to recover on server failure.
+ self.packetHandler.syncLogs()
- # Choose next mix interval
- now = time.time()
- nextMix = self.mixPool.getNextMixTime(now)
+ LOG.trace("Mix interval elapsed")
+ # Choose a set of outgoing messages; put them in
+ # outgoingqueue and modulemanger
+ self.mixPool.mix()
+ # Send outgoing messages
+ self.outgoingQueue.sendReadyMessages()
+ # Send exit messages
+ self.moduleManager.sendReadyMessages()
- if now > nextShred:
- # FFFF Configurable shred interval
- self.cleanQueues()
- nextShred = now + 6000
+ # Choose next mix interval
+ nextMix = self.mixPool.getNextMixTime(now)
+ insort(scheduledEvents, (nextMix, "MIX"))
+ LOG.trace("Next mix at %s", formatTime(nextMix,1))
+ else:
+ assert event in ("MIX", "SHRED", "TIMEOUT")
def cleanQueues(self):
"""Remove all deleted messages from queues"""