[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Untested and likely buggy code to limit bandwidth usage...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv1148/lib/mixminion/server

Modified Files:
	MMTPServer.py ServerConfig.py ServerMain.py 
Log Message:
Untested and likely buggy code to limit bandwidth usage and number of concurrent outgoing connections

Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.74
retrieving revision 1.75
diff -u -d -r1.74 -r1.75
--- MMTPServer.py	2 Feb 2004 07:05:50 -0000	1.74
+++ MMTPServer.py	6 Feb 2004 23:14:28 -0000	1.75
@@ -23,6 +23,7 @@
 import socket
 import select
 import re
+import sys
 import threading
 import time
 from types import StringType
@@ -31,7 +32,7 @@
 import mixminion.TLSConnection
 import mixminion._minionlib as _ml
 from mixminion.Common import MixError, MixFatalError, MixProtocolError, \
-     LOG, stringContains
+     LOG, stringContains, floorDiv
 from mixminion.Crypto import sha1, getCommonPRNG
 from mixminion.Packet import PACKET_LEN, DIGEST_LEN, IPV4Info, MMTPHostInfo
 from mixminion.MMTPClient import PeerCertificateCache, MMTPClientConnection
@@ -54,11 +55,15 @@
     # self.connections: a map from fd to Connection objects.
     # self.state: a map from fd to the latest wantRead,wantWrite tuples
     #    returned by the connection objects' process or getStatus methods.
+
+    TICK_INTERVAL = 1.0 #DOCDOC
+
     def __init__(self):
         """Create a new AsyncServer with no readers or writers."""
         self._timeout = None
         self.connections = {}
         self.state = {}
+        self.bandwidthPerTick = self.bucket = None #DOCDOC
 
     def process(self,timeout):
         """If any relevant file descriptors become available within
@@ -81,6 +86,10 @@
             time.sleep(timeout)
             return
 
+        if self.bucket is not None and self.bucket <= 0:
+            time.sleep(timeout)
+            return
+
         try:
             readfds,writefds,exfds = select.select(readfds,writefds,exfds,
                                                    timeout)
@@ -92,12 +101,24 @@
 
         writefds += exfds
 
+        active = []
+
         for fd, c in self.connections.items():
             r = fd in readfds
             w = fd in writefds
             if not (r or w):
                 continue
-            wr, ww, isopen = c.process(r,w)
+            active.append((c,r,w,fd))
+
+        if not active: return
+        if self.bucket is None:
+            cap = None
+        else:
+            cap = floorDiv(self.bucket, len(active))
+        for c,r,w,fd in active:
+            wr, ww, isopen, nbytes = c.process(r,w,0,cap)
+            if cap is not None:
+                self.bucket -= nbytes
             if not isopen:
                 del self.connections[fd]
                 del self.state[fd]
@@ -133,6 +154,10 @@
             if con.tryTimeout(cutoff):
                 self.remove(con,fd)
 
+    def tick(self):
+        """DOCDOC"""
+        self.bucket = self.bandwidthPerTick
+
 class PollAsyncServer(SelectAsyncServer):
     """Subclass of SelectAsyncServer that uses 'poll' where available.  This
        is more efficient, but less universal."""
@@ -146,8 +171,11 @@
                            (1,1): select.POLLIN+select.POLLOUT+select.POLLERR,
                            (1,2): select.POLLIN+select.POLLOUT+select.POLLERR }
     def process(self,timeout):
+        if self.bucket is not None and self.bucket <= 0:
+            time.sleep(timeout)
+            return
         try:
-            # (watch out: poll takes a timeout in msec, but select takes a 
+            # (watch out: poll takes a timeout in msec, but select takes a
             #  timeout in sec.)
             events = self.poll.poll(timeout*1000)
         except select.error, e:
@@ -155,10 +183,19 @@
                 return
             else:
                 raise e
+        if not events:
+            return
+        if self.bucket is None:
+            cap = None
+        else:
+            cap = floorDiv(self.bucket,len(events))
         for fd, mask in events:
             c = self.connections[fd]
-            wr,ww,isopen = c.process(mask&select.POLLIN, mask&select.POLLOUT,
-                                     mask&(select.POLLERR|select.POLLHUP))
+            wr,ww,isopen,n = c.process(mask&select.POLLIN, mask&select.POLLOUT,
+                                       mask&(select.POLLERR|select.POLLHUP),
+                                       cap)
+            if cap is not None:
+                self.bucket -= n
             if not isopen:
                 self.poll.unregister(fd)
                 del self.connections[fd]
@@ -184,13 +221,13 @@
 
 class Connection:
     "A connection is an abstract superclass for asynchronous channels"
-    def process(self, r, w, x):
+    def process(self, r, w, x, cap):
         """Invoked when there is data to read or write.  Must return a 3-tuple
            of (wantRead, wantWrite, isOpen)."""
-        return 0,0,0
+        return 0,0,0,0#DOCDOC
     def getStatus(self):
         """Returns the same 3-tuple as process."""
-        return 0,0,0
+        return 0,0,0#DOCDOC
     def fileno(self):
         """Returns an integer file descriptor for this connection, or returns
            an object that can return such a descriptor."""
@@ -199,6 +236,12 @@
         """If this connection has seen no activity since 'cutoff', and it
            is subject to aging, shut it down."""
         pass
+    def resetMaxBandwidth(self, n):
+        """DOCDOC"""
+        pass
+    def getBytesTransferred(self):
+        """DOCDOC"""
+        return 0
 
 class ListenConnection(Connection):
     """A ListenConnection listens on a given port/ip combination, and calls
@@ -229,12 +272,12 @@
         LOG.info("Listening at %s on port %s (fd %s)",
                  ip, port, self.sock.fileno())
 
-    def process(self, r, w, x):
+    def process(self, r, w, x, cap):
         #XXXX007 do something with x
         con, addr = self.sock.accept()
         LOG.debug("Accepted connection from %s", addr)
         self.connectionFactory(con)
-        return self.isOpen,0,self.isOpen
+        return self.isOpen,0,self.isOpen,0
 
     def getStatus(self):
         return self.isOpen,0,self.isOpen
@@ -248,6 +291,9 @@
     def fileno(self):
         return self.sock.fileno()
 
+    def resetMaxBandwidth(self, n):
+        pass
+
 class MMTPServerConnection(mixminion.TLSConnection.TLSConnection):
     """A TLSConnection that implements the server side of MMTP."""
     ##
@@ -264,7 +310,7 @@
             addr,port = sock.getpeername()
             serverName = mixminion.ServerInfo.displayServerByAddress(addr,port)
         serverName += " (fd %s)"%sock.fileno()
-        
+
         mixminion.TLSConnection.TLSConnection.__init__(
             self, tls, sock, serverName)
         EventStats.log.receivedConnection()
@@ -427,6 +473,7 @@
     # msgQueue: An instance of MessageQueue to receive notification from DNS
     #     DNS threads.  See _queueSendablePackets for more information.
     # _lock: protects only serverContext.
+    # maxClientConnections: DOCDOC
 
     def __init__(self, config, servercontext):
         AsyncServer.__init__(self)
@@ -434,8 +481,10 @@
         self.serverContext = servercontext
         self.clientContext = _ml.TLSContext_new()
         self._lock = threading.Lock()
+        self.maxClientConnections = config['Outgoing/MMTP'].get(
+            'MaxConnections', 16)
 
-        # FFFF Don't always listen; don't always retransmit!
+        # Don't always listen; don't always retransmit!
         # FFFF Support listening on multiple IPs
 
         ip4_supported, ip6_supported = getProtocolSupport()
@@ -473,6 +522,7 @@
         self.certificateCache = PeerCertificateCache()
         self.dnsCache = None
         self.msgQueue = MessageQueue()
+        self.pendingPackets = [] #DOCDOC
 
     def connectDNSCache(self, dnsCache):
         """Use the DNSCache object 'DNSCache' to resolve DNS queries for
@@ -578,6 +628,12 @@
 
            This function should only be called from the main thread.
         """
+        while len(self.clientConByAddr) < self.maxClientConnections and self.pendingPackets:
+            args = self.pendingPackets[0]
+            del self.pendingPackets[0]
+            LOG.debug("Sending %s delayed packets...",len(args[5]))
+            self._sendPackets(*args)
+
         while 1:
             try:
                 family,addr,port,keyID,deliverable,serverName = \
@@ -607,6 +663,12 @@
                     con.addPacket(d)
                 return
 
+        if len(self.clientConByAddr) >= self.maxClientConnections:
+            LOG.debug("We already have %s open client connections; delaying %s packets for %s",
+                      len(self.clientConByAddr), len(deliverable), serverName)
+            self.pendingPackets.append((family,ip,port,keyid,deliverable,serverName))
+            return
+
         try:
             # There isn't any connection to the right server. Open one...
             addr = (ip, port, keyID)
@@ -614,7 +676,8 @@
             con = MMTPClientConnection(
                 family, ip, port, keyID, serverName=serverName,
                 context=self.clientContext, certCache=self.certificateCache)
-            con.allPacketsSent = finished
+            #con.allPacketsSent = finished #XXXX007 wrong!
+            con.onClosed = finished
         except (socket.error, MixProtocolError), e:
             LOG.error("Unexpected socket error connecting to %s: %s",
                       serverName, e)
@@ -635,7 +698,8 @@
             self.clientConByAddr[addr] = con
 
     def __clientFinished(self, addr):
-        """Called when a client connection runs out of packets to send."""
+        """Called when a client connection runs out of packets to send,
+           or halts."""
         try:
             del self.clientConByAddr[addr]
         except KeyError:

Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.49
retrieving revision 1.50
diff -u -d -r1.49 -r1.50
--- ServerConfig.py	27 Jan 2004 05:30:23 -0000	1.49
+++ ServerConfig.py	6 Feb 2004 23:14:28 -0000	1.50
@@ -9,6 +9,7 @@
 
 import operator
 import os
+import sys
 
 import mixminion.Config
 import mixminion.server.Modules
@@ -91,10 +92,13 @@
             LOG.warn("Allow/deny are not yet supported")
 
         if not self['Outgoing/MMTP'].get('Enabled'):
-            LOG.warn("Disabling incoming MMTP is not yet supported.")
+            LOG.warn("Disabling outgoing MMTP is not yet supported.")
         if [e for e in self._sectionEntries['Outgoing/MMTP']
             if e[0] in ('Allow', 'Deny')]:
             LOG.warn("Allow/deny are not yet supported")
+        mc = self['Outgoing/MMTP'].get('MaxConnections')
+        if mc is not None and mc < 1:
+            raise ConfigError("MaxConnections must be at least 1.")
 
         self.validateRetrySchedule("Outgoing/MMTP")
 
@@ -363,6 +367,7 @@
         'Outgoing/MMTP' : { 'Enabled' : ('REQUIRE', "boolean", "no"),
                             'Retry' : ('ALLOW', "intervalList",
                                  "every 1 hour for 1 day, 7 hours for 5 days"),
+                            'MaxConnections' : ('ALLOW', 'int', '16'),
                           'Allow' : ('ALLOW*', "addressSet_allow", None),
                           'Deny' : ('ALLOW*', "addressSet_deny", None) },
         # FFFF Missing: Queue-Size / Queue config options

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.116
retrieving revision 1.117
diff -u -d -r1.116 -r1.117
--- ServerMain.py	2 Feb 2004 07:05:50 -0000	1.116
+++ ServerMain.py	6 Feb 2004 23:14:28 -0000	1.117
@@ -907,9 +907,11 @@
             nextEventTime = self.firstEventTime()
             now = time.time()
             timeLeft = nextEventTime - now
+            tickInterval = self.mmtpServer.TICK_INTERVAL
+            nexttick = now+tickInterval
             while timeLeft > 0:
                 # Handle pending network events
-                self.mmtpServer.process(2)
+                self.mmtpServer.process(tickInterval)
                 # Check for signals
                 if STOPPING:
                     LOG.info("Caught SIGTERM; shutting down.")
@@ -927,6 +929,9 @@
 
                 # Calculate remaining time until the next event.
                 now = time.time()
+                if now > nexttick:
+                    self.mmtpServer.tick()
+                    nextTick = now+tickInterval
                 timeLeft = nextEventTime - now
 
             # An event has fired.