[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Untested and likely buggy code to limit bandwidth usage...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv1148/lib/mixminion

Modified Files:
	Common.py MMTPClient.py TLSConnection.py test.py 
Log Message:
Untested and likely buggy code to limit bandwidth usage and number of concurrent outgoing connections

Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.129
retrieving revision 1.130
diff -u -d -r1.129 -r1.130
--- Common.py	27 Jan 2004 05:13:36 -0000	1.129
+++ Common.py	6 Feb 2004 23:14:28 -0000	1.130
@@ -785,7 +785,7 @@
     # We use the '_dst[0]' variable to check whether our DST setting
     # has changed since the last time _logtime was called.
     if lt[8]!=_dst[0]:
-        # If it was, we regenerate the string '_tzadj[0]' to be the 
+        # If it was, we regenerate the string '_tzadj[0]' to be the
         # adjustment to UTC used to get local time.
         _dst[0]=lt[8]
         offset = floorDiv((calendar.timegm(lt)-time.mktime(lt)),60)
@@ -795,11 +795,11 @@
             sign='-'
         h,m=divmod(abs(offset),60)
         _tzadj[0]="%s%02d%02d"%(sign,h,m)
-        
+
     return "%s.%03d %s"%(_strftime("%b %d %H:%M:%S", lt),
                          (t*1000)%1000,
                          _tzadj[0])
-    #return time.strftime("%b %d %H:%M:%S.%%03d %z", time.localtime(t)) %( 
+    #return time.strftime("%b %d %H:%M:%S.%%03d %z", time.localtime(t)) %(
     #    (t*1000)%1000)
 
 class _FileLogHandler:

Index: MMTPClient.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/MMTPClient.py,v
retrieving revision 1.52
retrieving revision 1.53
diff -u -d -r1.52 -r1.53
--- MMTPClient.py	2 Feb 2004 07:05:49 -0000	1.52
+++ MMTPClient.py	6 Feb 2004 23:14:28 -0000	1.53
@@ -16,6 +16,7 @@
 __all__ = [ "MMTPClientConnection", "sendPackets", "DeliverableMessage" ]
 
 import socket
+import sys
 import time
 import mixminion._minionlib as _ml
 import mixminion.NetUtils
@@ -71,6 +72,7 @@
     #   sent to the TLS connection, in the order they should be sent.
     # pendingPackets: a list of DeliverableMessage objects that have been
     #   sent to the TLS connection, but which have not yet been acknowledged.
+    # nPacketsTotal: total number of packets we've ever been asked to send.
     # nPacketsSent: total number of packets sent across the TLS connection
     # nPacketsAcked: total number of acks received from the TLS connection
     # expectedAcks: list of acceptAck,rejectAck tuples for the packets
@@ -121,7 +123,7 @@
         self.packets = []
         self.pendingPackets = []
         self.expectedAcks = []
-        self.nPacketsSent = self.nPacketsAcked = 0
+        self.nPacketsSent = self.nPacketsAcked = self.nPacketsTotal =0
         self._isConnected = 0
         self._isFailed = 0
         self._isAlive = 1 #DOCDOC
@@ -135,6 +137,7 @@
            failure, deliverableMessage.failed will be called."""
         assert hasattr(deliverableMessage, 'getContents')
         self.packets.append(deliverableMessage)
+        self.nPacketsTotal += 1
         # If we're connected, maybe start sending the packet we just added.
         self._updateRWState()
 
@@ -448,7 +451,7 @@
 
         rfds,wfds,xfds=select.select(rfds,wfds,xfds,3)
         now = time.time()
-        wr,ww,isopen=con.process(fd in rfds, fd in wfds, 0)
+        wr,ww,isopen,_=con.process(fd in rfds, fd in wfds, 0)
         if isopen:
             if con.tryTimeout(now-timeout):
                 isopen = 0

Index: TLSConnection.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/TLSConnection.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- TLSConnection.py	2 Feb 2004 07:19:47 -0000	1.8
+++ TLSConnection.py	6 Feb 2004 23:14:28 -0000	1.9
@@ -6,7 +6,7 @@
 """
 
 #XXXX implement renegotiate
-
+import sys
 import time
 
 import mixminion._minionlib as _ml
@@ -15,6 +15,9 @@
 # Number of bytes to try reading at once.
 _READLEN = 1024
 
+class _Closing(Exception):
+    """Raised by connection code so that process() closes the connection."""
+
 class TLSConnection:
     """Common abstract class to implement asynchronous bidirectional
        TLS connections.  This is still not a completely generic TLS
@@ -252,21 +255,21 @@
         self.__stateFn = self.__closedFn
         self.onClosed()
 
-    def __connectFn(self, r, w):
+    def __connectFn(self, r, w, cap):
         """state function: client-side TLS handshaking"""
         self.tls.connect() # might raise TLS*
         self.__setup = 1
         self.onConnected()
         return 1 # We may be ready for the next state.
 
-    def __acceptFn(self, r, w):
+    def __acceptFn(self, r, w, cap):
         """state function: server-side TLS handshaking"""
         self.tls.accept() # might raise TLS*
         self.__setup = 1
         self.onConnected()
         return 1 # We may be ready for the next state.
 
-    def __shutdownFn(self, r, w):
+    def __shutdownFn(self, r, w, cap):
         """state function: TLS shutdonw"""
         while 1:
             if self.__awaitingShutdown:
@@ -296,14 +299,14 @@
             if done:
                 LOG.debug("Got a completed shutdown from %s", self.address)
                 self.shutdownFinished()
-                self.__close()
+                raise _Closing()
                 return 0
             else:
                 LOG.trace("Shutdown returned zero -- entering read mode.")
                 self.__awaitingShutdown = 1
                 self.__bytesReadOnShutdown = 0
 
-    def __closedFn(self,r,w):
+    def __closedFn(self,r,w, cap):
         """state function: called when the connection is closed"""
         self.__sock = None
         return 0
@@ -314,36 +317,36 @@
         LOG.error("Read over 128 bytes of unexpected data from closing "
                   "connection to %s", self.address)
         self.onTLSError()
-        self.__close()
+        raise _Closing()
 
-    def __dataFn(self, r, w):
+    def __dataFn(self, r, w, cap):
         """state function: read or write data as appropriate"""
         if r:
             if self.__writeBlockedOnRead:
-                self.__doWrite()
+                cap = self.__doWrite(cap)
             if self.__reading and not self.__writeBlockedOnRead:
-                self.__doRead()
+                cap = self.__doRead(cap)
         if w:
             if self.__reading and self.__readBlockedOnWrite:
-                self.__doRead()
+                cap = self.__doRead(cap)
             if self.outbuf and not self.__readBlockedOnWrite:
-                self.__doWrite()
+                cap = self.__doWrite(cap)
         return 0
 
-    def __doWrite(self):
+    def __doWrite(self, cap):
         "Helper function: write as much data from self.outbuf as we can."
         self.__writeBlockedOnRead = 0
-        while self.outbuf:
+        while self.outbuf and cap > 0:
             try:
-                n = self.tls.write(self.outbuf[0])
+                n = self.tls.write(self.outbuf[0][:cap])
             except _ml.TLSWantRead:
                 self.__writeBlockedOnRead = 1
                 self.wantWrite = 0
                 self.wantRead = 1
-                return
+                return cap
             except _ml.TLSWantWrite:
                 self.wantWrite = 1
-                return
+                return cap
             else:
                 # We wrote some data: remove it from the buffer.
                 assert n >= 0
@@ -353,62 +356,72 @@
                 else:
                     self.outbuf[0] = self.outbuf[0][n:]
                 self.outbuflen -= n
+                cap -= n
                 self.onWrite(n)
         # There's no more data to write.  We only want write events now if
         # read is blocking on write.
         self.wantWrite = self.__readBlockedOnWrite
         self.doneWriting()
+        return cap
 
-    def __doRead(self):
+    def __doRead(self, cap):
         "Helper function: read as much data as we can."
         self.__readBlockedOnWrite = 0
-        while self.__reading:
+        while self.__reading and cap > 0:
             try:
-                s = self.tls.read(_READLEN)
+                s = self.tls.read(min(_READLEN,cap))
                 if s == 0:
                     # The other side sent us a shutdown; we'll shutdown too.
                     self.receivedShutdown()
                     LOG.trace("read returned 0: shutting down connection to %s"
                               , self.address)
                     self.startShutdown()
-                    return
+                    return cap
                 else:
                     # We got some data; add it to the inbuf.
                     LOG.trace("Read got %s bytes from %s", len(s),self.address)
                     self.inbuf.append(s)
                     self.inbuflen += len(s)
-                    if not self.tls.pending():
+                    cap -= len(s)
+                    if (not self.tls.pending()) and cap > 0:
                         # Only call onRead when we've got all the pending
-                        # data from self.tls.
+                        # data from self.tls and cap is not yet used up.
                         self.onRead()
             except _ml.TLSWantRead:
                 self.wantRead = 1
-                return
+                return cap
             except _ml.TLSWantWrite:
                 self.wantRead = 0
                 self.wantWrite = 1
                 self.__readBlockedOnWrite = 1
-                return
+                return cap
 
-    def process(self, r, w, x):
+    def process(self, r, w, x, maxBytes=None):
         """Given that we've received read/write events as indicated in r/w,
            advance the state of the connection as much as possible.  Return
-           is as in 'getStatus'."""
+           is as in 'getStatus', plus a fourth item: how much
+           self.tls.get_num_bytes_raw() grew during this call.  If maxBytes
+           is given, stop advancing once it has grown by that much."""
         if x and (self.sock is not None):
             self.__close(gotClose=1)
-            return 0,0,0
-        elif not (r or w):
-            return self.wantRead, self.wantWrite, (self.sock is not None)
+            return 0,0,0,0
+        if not (r or w):
+            return self.wantRead, self.wantWrite, (self.sock is not None),0
+
+        bytesAtStart = bytesNow = self.tls.get_num_bytes_raw()
+        if maxBytes is None:
+            bytesCutoff = sys.maxint
+            maxBytes = sys.maxint-bytesNow
+        else:
+            bytesCutoff = bytesNow+maxBytes
         try:
             self.lastActivity = time.time()
-            while self.__stateFn(r, w):
+            while bytesNow < bytesCutoff and self.__stateFn(r, w, maxBytes):
                 # If __stateFn returns 1, then the state has changed, and
                 # we should try __stateFn again.
-                pass
-        except _ml.TLSClosed:
-            # We get this error if the socket unexpectedly closes underneath
-            # the TLS connection.
-            self.__close(gotClose=1)
+                if self.tls is not None:
+                    bytesNow = self.tls.get_num_bytes_raw()
+                    maxBytes = bytesCutoff-bytesNow
         except _ml.TLSWantRead:
             self.wantRead = 1
             self.wantWrite = 0
@@ -418,7 +431,18 @@
                 self.wantWrite = 2
             else:
                 self.wantWrite = 1
+        except _Closing:
+            # A state function asked (via _Closing) to close the connection.
+            if self.tls is not None:
+                bytesNow = self.tls.get_num_bytes_raw()
+            self.__close()
+        except _ml.TLSClosed:
+            # We get this error if the socket unexpectedly closes underneath
+            # the TLS connection.
+            self.__close(gotClose=1)
         except _ml.TLSError, e:
+            if self.tls is not None:
+                bytesNow = self.tls.get_num_bytes_raw()
             if not (self.__awaitingShutdown or self.__stateFn == self.__shutdownFn):
                 e = str(e)
                 if stringContains(e, 'wrong version number'):
@@ -433,10 +457,12 @@
                 self.onTLSError()
                 self.__close()
 
-        return self.wantRead, self.wantWrite, (self.sock is not None)
+        return (self.wantRead, self.wantWrite, (self.sock is not None),
+                bytesNow-bytesAtStart)
 
     def getStatus(self):
         """Return a 3-tuple of wantRead, wantWrite, and isOpen."""
+        # Unlike process(), this does not report a byte count.
         return self.wantRead, self.wantWrite, (self.sock is not None)
 
     #####

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.182
retrieving revision 1.183
diff -u -d -r1.182 -r1.183
--- test.py	6 Feb 2004 18:00:59 -0000	1.182
+++ test.py	6 Feb 2004 23:14:28 -0000	1.183
@@ -7543,7 +7543,7 @@
     tc = loader.loadTestsFromTestCase
 
     if 0:
-        suite.addTest(tc(ServerInfoTests))
+        suite.addTest(tc(MMTPTests))
         return suite
     testClasses = [MiscTests,
                    MinionlibCryptoTests,