[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] As-yet-untested code to timeout old connections. Also m...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv6251/lib/mixminion/server

Modified Files:
	MMTPServer.py ServerConfig.py ServerMain.py 
Log Message:
As-yet-untested code to timeout old connections. Also move main event loop to a cleaner structure.

Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- MMTPServer.py	16 Dec 2002 02:40:11 -0000	1.4
+++ MMTPServer.py	21 Dec 2002 01:54:23 -0000	1.5
@@ -152,6 +152,10 @@
         """Returns an integer file descriptor for this connection, or returns
            an object that can return such a descriptor."""
         pass
+    def tryTimeout(self, cutoff):
+        """If this connection has seen no activity since 'cutoff', and it
+           is subject to aging, shut it down."""
+        pass
 
 class ListenConnection(Connection):
     """A ListenConnection listens on a given port/ip combination, and calls
@@ -239,6 +243,7 @@
         self.__sock = sock            
         self.__con = tls
         self.fd = self.__con.fileno()
+        self.lastActivity = time.time()
 
         if serverMode:
             self.__state = self.__acceptFn
@@ -375,6 +380,18 @@
         if len(out) == 0:
             self.finished()
 
+    def tryTimeout(self, cutoff):
+        """Kill this connection if it has seen no activity since 'cutoff'."""
+        if self.lastActivity <= cutoff:
+            warn("Socket %s to %s timed out", self.fd, self.address)
+            # ???? I'm not sure this is right.  Instead of just killing the
+            # ????   socket, should we shut down the SSL too?  Also, should
+            # ????   hooks see a timeout as a separate kind of error?
+            self.__server.unregister(self)
+            self.__state = None
+            self.__sock.close()
+            self.handleFail(1)
+
     def handleRead(self):
         self.__handleAll()
 
@@ -694,6 +711,8 @@
     """A helper class to invoke AsyncServer, MMTPServerConnection, and
        MMTPClientConnection, with a function to add new connections, and
        callbacks for message success and failure."""
+    ##
+    # _timeout: the interval after which we drop open inactive connections.
     def __init__(self, config, tls):
         AsyncServer.__init__(self)
 
@@ -706,6 +725,12 @@
                                          self._newMMTPConnection)
         #self.config = config
         self.listener.register(self)
+        self._timeout = config['Server']['Timeout'][2]
+
+    def getNextTimeoutTime(self, now):
+        """Return the time at which we next purge connections, if we have
+           last done so at time 'now'."""
+        return now + self._timeout
 
     def _newMMTPConnection(self, sock):
         """helper method.  Creates and registers a new server connection when
@@ -742,3 +767,14 @@
     def onMessageSent(self, msg, handle):
         pass
 
+    def tryTimeout(self, now):
+        """Ask each registered reader and writer (once per fd) to time out
+           if it has seen no activity since 'now' minus self._timeout."""
+        cutoff = now - self._timeout
+        filenos = {}
+        for fd, r in self.readers.items():
+            r.tryTimeout(cutoff)
+            filenos[fd] = 1
+        for fd, w in self.writers.items():
+            if filenos.has_key(fd): continue
+            w.tryTimeout(cutoff)

Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- ServerConfig.py	20 Dec 2002 23:52:07 -0000	1.4
+++ ServerConfig.py	21 Dec 2002 01:54:23 -0000	1.5
@@ -166,6 +166,7 @@
                      'MixInterval' : ('ALLOW', C._parseInterval, "30 min"),
                      'MixPoolRate' : ('ALLOW', _parseFraction, "60%"),
                      'MixPoolMinSize' : ('ALLOW', C._parseInt, "5"),
+		     'Timeout' : ('ALLOW', C._parseInterval, "5 min"),
                      },
         'DirectoryServers' : { 'ServerURL' : ('ALLOW*', None, None),
                                'Publish' : ('ALLOW', C._parseBoolean, "no"),
@@ -175,13 +176,13 @@
         'Incoming/MMTP' : { 'Enabled' : ('REQUIRE', C._parseBoolean, "no"),
                             'IP' : ('ALLOW', C._parseIP, "0.0.0.0"),
                             'Port' : ('ALLOW', C._parseInt, "48099"),
-                          'Allow' : ('ALLOW*', C._parseAddressSet_allow, None),
-                          'Deny' : ('ALLOW*', C._parseAddressSet_deny, None) },
+			  'Allow' : ('ALLOW*', C._parseAddressSet_allow, None),
+                          'Deny' : ('ALLOW*', C._parseAddressSet_deny, None) 
+			 },
         'Outgoing/MMTP' : { 'Enabled' : ('REQUIRE', C._parseBoolean, "no"),
                           'Allow' : ('ALLOW*', C._parseAddressSet_allow, None),
                           'Deny' : ('ALLOW*', C._parseAddressSet_deny, None) },
         # FFFF Missing: Queue-Size / Queue config options
-        # FFFF         timeout options
         # FFFF         listen timeout??
         # FFFF         Retry options
         # FFFF         pool options

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- ServerMain.py	21 Dec 2002 00:18:49 -0000	1.9
+++ ServerMain.py	21 Dec 2002 01:54:23 -0000	1.10
@@ -25,6 +25,7 @@
 import mixminion.server.ServerConfig
 import mixminion.server.ServerKeys
 
+from bisect import insort
 from mixminion.Common import LOG, LogStream, MixError, MixFatalError, ceilDiv,\
      createPrivateDir, formatBase64, formatTime, waitForChildren
 
@@ -280,9 +281,6 @@
 
     def run(self):
         """Run the server; don't return unless we hit an exception."""
-        # FFFF Use heapq to schedule events? [I don't think so; there are only
-        # FFFF   two events, after all!]  [But the intervals may be very
-        # FFFF   different...!]
 
         f = open(self.pidFile, 'wt')
         f.write("%s\n" % os.getpid())
@@ -290,14 +288,23 @@
 
         self.cleanQueues()
 
+        # List of (eventTime, eventName) tuples.  Current names are:
+        #  'MIX', 'SHRED', and 'TIMEOUT'.  Kept in sorted order.
+        scheduledEvents = []
         now = time.time()
+        scheduledEvents.append( (now + 6000, "SHRED") )#FFFF make configurable
+        scheduledEvents.append( (self.mmtpServer.getNextTimeoutTime(now),
+                                 "TIMEOUT") )
         nextMix = self.mixPool.getNextMixTime(now)
-        nextShred = now + 6000
+        scheduledEvents.append( (nextMix, "MIX") )
+        LOG.trace("Next mix at %s", formatTime(nextMix,1))
+        scheduledEvents.sort()
+
         #FFFF Unused
         #nextRotate = self.keyring.getNextKeyRotation()
         while 1:
-            LOG.trace("Next mix at %s", formatTime(nextMix,1))
-            timeLeft = 1
+            nextEventTime = scheduledEvents[0][0]
+            timeLeft = nextEventTime - time.time()
             while timeLeft > 0:
                 # Handle pending network events
                 self.mmtpServer.process(timeLeft)
@@ -307,29 +314,41 @@
                 # Prevent child processes from turning into zombies.
                 waitForChildren(1)
                 # Calculate remaining time.
-                timeLeft = nextMix - time.time()
+                now = time.time()
+                timeLeft = nextEventTime - now
 
-            # Before we mix, we need to log the hashes to avoid replays.
-            # FFFF We need to recover on server failure.
-            self.packetHandler.syncLogs()
+            event = scheduledEvents[0][1]
+            del scheduledEvents[0]
 
-            LOG.trace("Mix interval elapsed")
-            # Choose a set of outgoing messages; put them in outgoingqueue and
-            # modulemanger
-            self.mixPool.mix()
-            # Send outgoing messages
-            self.outgoingQueue.sendReadyMessages()
-            # Send exit messages
-            self.moduleManager.sendReadyMessages()
+            if event == 'TIMEOUT':
+                LOG.info("Timing out.")
+                self.mmtpServer.tryTimeout(now)
+                insort(scheduledEvents,
+                       (self.mmtpServer.getNextTimeoutTime(now), "TIMEOUT"))
+            elif event == 'SHRED':
+                self.cleanQueues()
+                insort(scheduledEvents,
+                       (now + 6000, "SHRED"))
+            elif event == 'MIX':
+                # Before we mix, we need to log the hashes to avoid replays.
+                # FFFF We need to recover on server failure.
+                self.packetHandler.syncLogs()
 
-            # Choose next mix interval
-            now = time.time()
-            nextMix = self.mixPool.getNextMixTime(now)
+                LOG.trace("Mix interval elapsed")
+                # Choose a set of outgoing messages; put them in
+                # outgoingqueue and modulemanager
+                self.mixPool.mix()
+                # Send outgoing messages
+                self.outgoingQueue.sendReadyMessages()
+                # Send exit messages
+                self.moduleManager.sendReadyMessages()
 
-            if now > nextShred:
-                # FFFF Configurable shred interval
-                self.cleanQueues()
-                nextShred = now + 6000
+                # Choose next mix interval
+                nextMix = self.mixPool.getNextMixTime(now)
+                insort(scheduledEvents, (nextMix, "MIX"))
+                LOG.trace("Next mix at %s", formatTime(nextMix,1))
+            else:
+                assert event in ("MIX", "SHRED", "TIMEOUT")
 
     def cleanQueues(self):
         """Remove all deleted messages from queues"""