[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Untested and likely buggy code to limit bandwidth usage...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv1148/lib/mixminion/server
Modified Files:
MMTPServer.py ServerConfig.py ServerMain.py
Log Message:
Untested and likely buggy code to limit bandwidth usage and number of concurrent outgoing connections
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.74
retrieving revision 1.75
diff -u -d -r1.74 -r1.75
--- MMTPServer.py 2 Feb 2004 07:05:50 -0000 1.74
+++ MMTPServer.py 6 Feb 2004 23:14:28 -0000 1.75
@@ -23,6 +23,7 @@
import socket
import select
import re
+import sys
import threading
import time
from types import StringType
@@ -31,7 +32,7 @@
import mixminion.TLSConnection
import mixminion._minionlib as _ml
from mixminion.Common import MixError, MixFatalError, MixProtocolError, \
- LOG, stringContains
+ LOG, stringContains, floorDiv
from mixminion.Crypto import sha1, getCommonPRNG
from mixminion.Packet import PACKET_LEN, DIGEST_LEN, IPV4Info, MMTPHostInfo
from mixminion.MMTPClient import PeerCertificateCache, MMTPClientConnection
@@ -54,11 +55,15 @@
# self.connections: a map from fd to Connection objects.
# self.state: a map from fd to the latest wantRead,wantWrite tuples
# returned by the connection objects' process or getStatus methods.
+
+ TICK_INTERVAL = 1.0 #DOCDOC
+
def __init__(self):
"""Create a new AsyncServer with no readers or writers."""
self._timeout = None
self.connections = {}
self.state = {}
+ self.bandwidthPerTick = self.bucket = None #DOCDOC
def process(self,timeout):
"""If any relevant file descriptors become available within
@@ -81,6 +86,10 @@
time.sleep(timeout)
return
+ if self.bucket is not None and self.bucket <= 0:
+ time.sleep(timeout)
+ return
+
try:
readfds,writefds,exfds = select.select(readfds,writefds,exfds,
timeout)
@@ -92,12 +101,24 @@
writefds += exfds
+ active = []
+
for fd, c in self.connections.items():
r = fd in readfds
w = fd in writefds
if not (r or w):
continue
- wr, ww, isopen = c.process(r,w)
+ active.append((c,r,w,fd))
+
+ if not active: return
+ if self.bucket is None:
+ cap = None
+ else:
+ cap = floorDiv(self.bucket, len(active))
+ for c,r,w,fd in active:
+ wr, ww, isopen, nbytes = c.process(r,w,0,cap)
+ if cap is not None:
+ self.bucket -= nbytes
if not isopen:
del self.connections[fd]
del self.state[fd]
@@ -133,6 +154,10 @@
if con.tryTimeout(cutoff):
self.remove(con,fd)
+ def tick(self):
+ """DOCDOC"""
+ self.bucket = self.bandwidthPerTick
+
class PollAsyncServer(SelectAsyncServer):
"""Subclass of SelectAsyncServer that uses 'poll' where available. This
is more efficient, but less universal."""
@@ -146,8 +171,11 @@
(1,1): select.POLLIN+select.POLLOUT+select.POLLERR,
(1,2): select.POLLIN+select.POLLOUT+select.POLLERR }
def process(self,timeout):
+ if self.bucket is not None and self.bucket <= 0:
+ time.sleep(timeout)
+ return
try:
- # (watch out: poll takes a timeout in msec, but select takes a
+ # (watch out: poll takes a timeout in msec, but select takes a
# timeout in sec.)
events = self.poll.poll(timeout*1000)
except select.error, e:
@@ -155,10 +183,19 @@
return
else:
raise e
+ if not events:
+ return
+ if self.bucket is None:
+ cap = None
+ else:
+ cap = floorDiv(self.bucket,len(events))
for fd, mask in events:
c = self.connections[fd]
- wr,ww,isopen = c.process(mask&select.POLLIN, mask&select.POLLOUT,
- mask&(select.POLLERR|select.POLLHUP))
+ wr,ww,isopen,n = c.process(mask&select.POLLIN, mask&select.POLLOUT,
+ mask&(select.POLLERR|select.POLLHUP),
+ cap)
+ if cap is not None:
+ self.bucket -= n
if not isopen:
self.poll.unregister(fd)
del self.connections[fd]
@@ -184,13 +221,13 @@
class Connection:
"A connection is an abstract superclass for asynchronous channels"
- def process(self, r, w, x):
+ def process(self, r, w, x, cap):
"""Invoked when there is data to read or write. Must return a 3-tuple
of (wantRead, wantWrite, isOpen)."""
- return 0,0,0
+ return 0,0,0,0#DOCDOC
def getStatus(self):
"""Returns the same 3-tuple as process."""
- return 0,0,0
+ return 0,0,0#DOCDOC
def fileno(self):
"""Returns an integer file descriptor for this connection, or returns
an object that can return such a descriptor."""
@@ -199,6 +236,12 @@
"""If this connection has seen no activity since 'cutoff', and it
is subject to aging, shut it down."""
pass
+ def resetMaxBandwidth(self, n):
+ """DOCDOC"""
+ pass
+ def getBytesTransferred(self):
+ """DOCDOC"""
+ return 0
class ListenConnection(Connection):
"""A ListenConnection listens on a given port/ip combination, and calls
@@ -229,12 +272,12 @@
LOG.info("Listening at %s on port %s (fd %s)",
ip, port, self.sock.fileno())
- def process(self, r, w, x):
+ def process(self, r, w, x, cap):
#XXXX007 do something with x
con, addr = self.sock.accept()
LOG.debug("Accepted connection from %s", addr)
self.connectionFactory(con)
- return self.isOpen,0,self.isOpen
+ return self.isOpen,0,self.isOpen,0
def getStatus(self):
return self.isOpen,0,self.isOpen
@@ -248,6 +291,9 @@
def fileno(self):
return self.sock.fileno()
+ def resetMaxBandwidth(self, n):
+ pass
+
class MMTPServerConnection(mixminion.TLSConnection.TLSConnection):
"""A TLSConnection that implements the server side of MMTP."""
##
@@ -264,7 +310,7 @@
addr,port = sock.getpeername()
serverName = mixminion.ServerInfo.displayServerByAddress(addr,port)
serverName += " (fd %s)"%sock.fileno()
-
+
mixminion.TLSConnection.TLSConnection.__init__(
self, tls, sock, serverName)
EventStats.log.receivedConnection()
@@ -427,6 +473,7 @@
# msgQueue: An instance of MessageQueue to receive notification from DNS
# DNS threads. See _queueSendablePackets for more information.
# _lock: protects only serverContext.
+ # maxClientConnections: DOCDOC
def __init__(self, config, servercontext):
AsyncServer.__init__(self)
@@ -434,8 +481,10 @@
self.serverContext = servercontext
self.clientContext = _ml.TLSContext_new()
self._lock = threading.Lock()
+ self.maxClientConnections = config['Outgoing/MMTP'].get(
+ 'MaxConnections', 16)
- # FFFF Don't always listen; don't always retransmit!
+ # Don't always listen; don't always retransmit!
# FFFF Support listening on multiple IPs
ip4_supported, ip6_supported = getProtocolSupport()
@@ -473,6 +522,7 @@
self.certificateCache = PeerCertificateCache()
self.dnsCache = None
self.msgQueue = MessageQueue()
+ self.pendingPackets = [] #DOCDOC
def connectDNSCache(self, dnsCache):
"""Use the DNSCache object 'DNSCache' to resolve DNS queries for
@@ -578,6 +628,12 @@
This function should only be called from the main thread.
"""
+ while len(self.clientConByAddr) < self.maxClientConnections and self.pendingPackets:
+ args = self.pendingPackets[0]
+ del self.pendingPackets[0]
+ LOG.debug("Sending %s delayed packets...",len(args[4]))
+ self._sendPackets(*args)
+
while 1:
try:
family,addr,port,keyID,deliverable,serverName = \
@@ -607,6 +663,12 @@
con.addPacket(d)
return
+ if len(self.clientConByAddr) >= self.maxClientConnections:
+ LOG.debug("We already have %s open client connections; delaying %s packets for %s",
+ len(self.clientConByAddr), len(deliverable), serverName)
+ self.pendingPackets.append((family,ip,port,keyID,deliverable,serverName))
+ return
+
try:
# There isn't any connection to the right server. Open one...
addr = (ip, port, keyID)
@@ -614,7 +676,8 @@
con = MMTPClientConnection(
family, ip, port, keyID, serverName=serverName,
context=self.clientContext, certCache=self.certificateCache)
- con.allPacketsSent = finished
+ #con.allPacketsSent = finished #XXXX007 wrong!
+ con.onClosed = finished
except (socket.error, MixProtocolError), e:
LOG.error("Unexpected socket error connecting to %s: %s",
serverName, e)
@@ -635,7 +698,8 @@
self.clientConByAddr[addr] = con
def __clientFinished(self, addr):
- """Called when a client connection runs out of packets to send."""
+ """Called when a client connection runs out of packets to send,
+ or halts."""
try:
del self.clientConByAddr[addr]
except KeyError:
Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.49
retrieving revision 1.50
diff -u -d -r1.49 -r1.50
--- ServerConfig.py 27 Jan 2004 05:30:23 -0000 1.49
+++ ServerConfig.py 6 Feb 2004 23:14:28 -0000 1.50
@@ -9,6 +9,7 @@
import operator
import os
+import sys
import mixminion.Config
import mixminion.server.Modules
@@ -91,10 +92,13 @@
LOG.warn("Allow/deny are not yet supported")
if not self['Outgoing/MMTP'].get('Enabled'):
- LOG.warn("Disabling incoming MMTP is not yet supported.")
+ LOG.warn("Disabling outgoing MMTP is not yet supported.")
if [e for e in self._sectionEntries['Outgoing/MMTP']
if e[0] in ('Allow', 'Deny')]:
LOG.warn("Allow/deny are not yet supported")
+ mc = self['Outgoing/MMTP'].get('MaxConnections')
+ if mc is not None and mc < 1:
+ raise ConfigError("MaxConnections must be at least 1.")
self.validateRetrySchedule("Outgoing/MMTP")
@@ -363,6 +367,7 @@
'Outgoing/MMTP' : { 'Enabled' : ('REQUIRE', "boolean", "no"),
'Retry' : ('ALLOW', "intervalList",
"every 1 hour for 1 day, 7 hours for 5 days"),
+ 'MaxConnections' : ('ALLOW', 'int', '16'),
'Allow' : ('ALLOW*', "addressSet_allow", None),
'Deny' : ('ALLOW*', "addressSet_deny", None) },
# FFFF Missing: Queue-Size / Queue config options
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.116
retrieving revision 1.117
diff -u -d -r1.116 -r1.117
--- ServerMain.py 2 Feb 2004 07:05:50 -0000 1.116
+++ ServerMain.py 6 Feb 2004 23:14:28 -0000 1.117
@@ -907,9 +907,11 @@
nextEventTime = self.firstEventTime()
now = time.time()
timeLeft = nextEventTime - now
+ tickInterval = self.mmtpServer.TICK_INTERVAL
+ nexttick = now+tickInterval
while timeLeft > 0:
# Handle pending network events
- self.mmtpServer.process(2)
+ self.mmtpServer.process(tickInterval)
# Check for signals
if STOPPING:
LOG.info("Caught SIGTERM; shutting down.")
@@ -927,6 +929,9 @@
# Calculate remaining time until the next event.
now = time.time()
+ if now > nexttick:
+ self.mmtpServer.tick()
+ nexttick = now+tickInterval
timeLeft = nextEventTime - now
# An event has fired.