[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Untested and likely buggy code to limit bandwidth usage...
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv1148/lib/mixminion
Modified Files:
Common.py MMTPClient.py TLSConnection.py test.py
Log Message:
Untested and likely buggy code to limit bandwidth usage and number of concurrent outgoing connections
Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.129
retrieving revision 1.130
diff -u -d -r1.129 -r1.130
--- Common.py 27 Jan 2004 05:13:36 -0000 1.129
+++ Common.py 6 Feb 2004 23:14:28 -0000 1.130
@@ -785,7 +785,7 @@
# We use the '_dst[0]' variable to check whether our DST setting
# has changed since the last time _logtime was called.
if lt[8]!=_dst[0]:
- # If it was, we regenerate the string '_tzadj[0]' to be the
+ # If it was, we regenerate the string '_tzadj[0]' to be the
# adjustment to UTC used to get local time.
_dst[0]=lt[8]
offset = floorDiv((calendar.timegm(lt)-time.mktime(lt)),60)
@@ -795,11 +795,11 @@
sign='-'
h,m=divmod(abs(offset),60)
_tzadj[0]="%s%02d%02d"%(sign,h,m)
-
+
return "%s.%03d %s"%(_strftime("%b %d %H:%M:%S", lt),
(t*1000)%1000,
_tzadj[0])
- #return time.strftime("%b %d %H:%M:%S.%%03d %z", time.localtime(t)) %(
+ #return time.strftime("%b %d %H:%M:%S.%%03d %z", time.localtime(t)) %(
# (t*1000)%1000)
class _FileLogHandler:
Index: MMTPClient.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/MMTPClient.py,v
retrieving revision 1.52
retrieving revision 1.53
diff -u -d -r1.52 -r1.53
--- MMTPClient.py 2 Feb 2004 07:05:49 -0000 1.52
+++ MMTPClient.py 6 Feb 2004 23:14:28 -0000 1.53
@@ -16,6 +16,7 @@
__all__ = [ "MMTPClientConnection", "sendPackets", "DeliverableMessage" ]
import socket
+import sys
import time
import mixminion._minionlib as _ml
import mixminion.NetUtils
@@ -71,6 +72,7 @@
# sent to the TLS connection, in the order they should be sent.
# pendingPackets: a list of DeliverableMessage objects that have been
# sent to the TLS connection, but which have not yet been acknowledged.
+ # nPacketsTotal: total number of packets we've ever been asked to send.
# nPacketsSent: total number of packets sent across the TLS connection
# nPacketsAcked: total number of acks received from the TLS connection
# expectedAcks: list of acceptAck,rejectAck tuples for the packets
@@ -121,7 +123,7 @@
self.packets = []
self.pendingPackets = []
self.expectedAcks = []
- self.nPacketsSent = self.nPacketsAcked = 0
+ self.nPacketsSent = self.nPacketsAcked = self.nPacketsTotal =0
self._isConnected = 0
self._isFailed = 0
self._isAlive = 1 #DOCDOC
@@ -135,6 +137,7 @@
failure, deliverableMessage.failed will be called."""
assert hasattr(deliverableMessage, 'getContents')
self.packets.append(deliverableMessage)
+ self.nPacketsTotal += 1
# If we're connected, maybe start sending the packet we just added.
self._updateRWState()
@@ -448,7 +451,7 @@
rfds,wfds,xfds=select.select(rfds,wfds,xfds,3)
now = time.time()
- wr,ww,isopen=con.process(fd in rfds, fd in wfds, 0)
+ wr,ww,isopen,_=con.process(fd in rfds, fd in wfds, 0)
if isopen:
if con.tryTimeout(now-timeout):
isopen = 0
Index: TLSConnection.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/TLSConnection.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- TLSConnection.py 2 Feb 2004 07:19:47 -0000 1.8
+++ TLSConnection.py 6 Feb 2004 23:14:28 -0000 1.9
@@ -6,7 +6,7 @@
"""
#XXXX implement renegotiate
-
+import sys
import time
import mixminion._minionlib as _ml
@@ -15,6 +15,9 @@
# Number of bytes to try reading at once.
_READLEN = 1024
+class _Closing(Exception):
+ pass
+
class TLSConnection:
"""Common abstract class to implement asynchronous bidirectional
TLS connections. This is still not a completely generic TLS
@@ -252,21 +255,21 @@
self.__stateFn = self.__closedFn
self.onClosed()
- def __connectFn(self, r, w):
+ def __connectFn(self, r, w, cap):
"""state function: client-side TLS handshaking"""
self.tls.connect() # might raise TLS*
self.__setup = 1
self.onConnected()
return 1 # We may be ready for the next state.
- def __acceptFn(self, r, w):
+ def __acceptFn(self, r, w, cap):
"""state function: server-side TLS handshaking"""
self.tls.accept() # might raise TLS*
self.__setup = 1
self.onConnected()
return 1 # We may be ready for the next state.
- def __shutdownFn(self, r, w):
+ def __shutdownFn(self, r, w, cap):
"""state function: TLS shutdonw"""
while 1:
if self.__awaitingShutdown:
@@ -296,14 +299,14 @@
if done:
LOG.debug("Got a completed shutdown from %s", self.address)
self.shutdownFinished()
- self.__close()
+ raise _Closing()
return 0
else:
LOG.trace("Shutdown returned zero -- entering read mode.")
self.__awaitingShutdown = 1
self.__bytesReadOnShutdown = 0
- def __closedFn(self,r,w):
+ def __closedFn(self,r,w, cap):
"""state function: called when the connection is closed"""
self.__sock = None
return 0
@@ -314,36 +317,36 @@
LOG.error("Read over 128 bytes of unexpected data from closing "
"connection to %s", self.address)
self.onTLSError()
- self.__close()
+ raise _Closing()
- def __dataFn(self, r, w):
+ def __dataFn(self, r, w, cap):
"""state function: read or write data as appropriate"""
if r:
if self.__writeBlockedOnRead:
- self.__doWrite()
+ cap = self.__doWrite(cap)
if self.__reading and not self.__writeBlockedOnRead:
- self.__doRead()
+ cap = self.__doRead(cap)
if w:
if self.__reading and self.__readBlockedOnWrite:
- self.__doRead()
+ cap = self.__doRead(cap)
if self.outbuf and not self.__readBlockedOnWrite:
- self.__doWrite()
+ cap = self.__doWrite(cap)
return 0
- def __doWrite(self):
+ def __doWrite(self, cap):
"Helper function: write as much data from self.outbuf as we can."
self.__writeBlockedOnRead = 0
- while self.outbuf:
+ while self.outbuf and cap > 0:
try:
- n = self.tls.write(self.outbuf[0])
+ n = self.tls.write(self.outbuf[0][:cap])
except _ml.TLSWantRead:
self.__writeBlockedOnRead = 1
self.wantWrite = 0
self.wantRead = 1
- return
+ return cap
except _ml.TLSWantWrite:
self.wantWrite = 1
- return
+ return cap
else:
# We wrote some data: remove it from the buffer.
assert n >= 0
@@ -353,62 +356,72 @@
else:
self.outbuf[0] = self.outbuf[0][n:]
self.outbuflen -= n
+ cap -= n
self.onWrite(n)
# There's no more data to write. We only want write events now if
# read is blocking on write.
self.wantWrite = self.__readBlockedOnWrite
self.doneWriting()
+ return cap
- def __doRead(self):
+ def __doRead(self, cap):
"Helper function: read as much data as we can."
self.__readBlockedOnWrite = 0
- while self.__reading:
+ while self.__reading and cap >= 0:
try:
- s = self.tls.read(_READLEN)
+ s = self.tls.read(min(_READLEN,cap))
if s == 0:
# The other side sent us a shutdown; we'll shutdown too.
self.receivedShutdown()
LOG.trace("read returned 0: shutting down connection to %s"
, self.address)
self.startShutdown()
- return
+ return cap
else:
# We got some data; add it to the inbuf.
LOG.trace("Read got %s bytes from %s", len(s),self.address)
self.inbuf.append(s)
self.inbuflen += len(s)
- if not self.tls.pending():
+ cap -= len(s)
+ if (not self.tls.pending()) and cap > 0:
# Only call onRead when we've got all the pending
- # data from self.tls.
+ # data from self.tls. DOCDOC cap
self.onRead()
except _ml.TLSWantRead:
self.wantRead = 1
- return
+ return cap
except _ml.TLSWantWrite:
self.wantRead = 0
self.wantWrite = 1
self.__readBlockedOnWrite = 1
- return
+ return cap
- def process(self, r, w, x):
+ def process(self, r, w, x, maxBytes=None):
"""Given that we've received read/write events as indicated in r/w,
advance the state of the connection as much as possible. Return
- is as in 'getStatus'."""
+ is as in 'getStatus'.
+
+ DOCDOC cap."""
if x and (self.sock is not None):
self.__close(gotClose=1)
- return 0,0,0
- elif not (r or w):
- return self.wantRead, self.wantWrite, (self.sock is not None)
+ return 0,0,0,0
+ if not (r or w):
+ return self.wantRead, self.wantWrite, (self.sock is not None),0
+
+ bytesAtStart = bytesNow = self.tls.get_num_bytes_raw();
+ if maxBytes is None:
+ bytesCutoff = sys.maxint
+ maxBytes = sys.maxint-bytesNow
+ else:
+ bytesCutoff = nr+nw+maxBytes
try:
self.lastActivity = time.time()
- while self.__stateFn(r, w):
+ while bytesNow < bytesCutoff and self.__stateFn(r, w, maxBytes):
# If __stateFn returns 1, then the state has changed, and
# we should try __stateFn again.
- pass
- except _ml.TLSClosed:
- # We get this error if the socket unexpectedly closes underneath
- # the TLS connection.
- self.__close(gotClose=1)
+ if self.tls is not None:
+ bytesNow = self.tls.get_num_bytes_raw()
+ maxBytes = bytesCutoff-bytesNow
except _ml.TLSWantRead:
self.wantRead = 1
self.wantWrite = 0
@@ -418,7 +431,18 @@
self.wantWrite = 2
else:
self.wantWrite = 1
+ except _Closing:
+ #DOCDOC
+ if self.tls is not None:
+ bytesNow = self.tls.get_num_bytes_raw()
+ self.__close()
+ except _ml.TLSClosed:
+ # We get this error if the socket unexpectedly closes underneath
+ # the TLS connection.
+ self.__close(gotClose=1)
except _ml.TLSError, e:
+ if self.tls is not None:
+ bytesNow = self.tls.get_num_bytes_raw()
if not (self.__awaitingShutdown or self.__stateFn == self.__shutdownFn):
e = str(e)
if stringContains(e, 'wrong version number'):
@@ -433,10 +457,12 @@
self.onTLSError()
self.__close()
- return self.wantRead, self.wantWrite, (self.sock is not None)
+ return (self.wantRead, self.wantWrite, (self.sock is not None),
+ bytesNow-bytesAtStart)
def getStatus(self):
"""Return a 3-tuple of wantRead, wantWrite, and isOpen."""
+ #DOCDOC
return self.wantRead, self.wantWrite, (self.sock is not None)
#####
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.182
retrieving revision 1.183
diff -u -d -r1.182 -r1.183
--- test.py 6 Feb 2004 18:00:59 -0000 1.182
+++ test.py 6 Feb 2004 23:14:28 -0000 1.183
@@ -7543,7 +7543,7 @@
tc = loader.loadTestsFromTestCase
if 0:
- suite.addTest(tc(ServerInfoTests))
+ suite.addTest(tc(MMTPTests))
return suite
testClasses = [MiscTests,
MinionlibCryptoTests,