[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Resolve a potential race; drop memory on input buffers ...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv8037/lib/mixminion/server

Modified Files:
	MMTPServer.py 
Log Message:
Resolve a potential race; drop memory on input buffers more aggressively.



Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.34
retrieving revision 1.35
diff -u -d -r1.34 -r1.35
--- MMTPServer.py	3 Jun 2003 17:28:12 -0000	1.34
+++ MMTPServer.py	5 Jun 2003 01:52:52 -0000	1.35
@@ -262,6 +262,8 @@
         self.lastActivity = time.time()
         self.__serverMode = serverMode
         self.__failed = 0
+        self.__inbuf = []
+        self.__inbuflen = 0
 
         if serverMode:
             self.__connecting = 0
@@ -298,7 +300,7 @@
            'bytes' bytes.  If terminator is provided, we read until we
            encounter the terminator, but give up after 'bytes' bytes.
         """
-        self.__inbuf = []
+        del self.__inbuf[:]
         self.__inbuflen = 0
         self.__expectReadLen = bytes
         self.__terminator = terminator
@@ -507,6 +509,13 @@
         """Returns the current contents of the input buffer."""
         return "".join(self.__inbuf)
 
+    def pullInput(self):
+        """DOCDOC"""
+        inp = "".join(self.__inbuf)
+        del self.__inbuf[:]
+        self.__inbuflen = 0
+        return inp
+
     def getTLSConnection(self):
         return self.__con
 
@@ -586,7 +595,7 @@
            rejects, or sends our response.
         """
         trace("done w/ client sendproto to %s", self.address)
-        inp = self.getInput()
+        inp = self.pullInput()
         m = PROTOCOL_RE.match(inp)
 
         if not m:
@@ -619,7 +628,7 @@
     def __receivedMessage(self):
         """Called once we've read a message from the line.  Checks the
            digest, and either rejects or begins sending an ACK."""
-        data = self.getInput()
+        data = self.pullInput()
         msg = data[SEND_CONTROL_LEN:-DIGEST_LEN]
         digest = data[-DIGEST_LEN:]
 
@@ -699,6 +708,8 @@
     # _curMessage, _curHandle: Correspond to the message and handle
     #     that we are currently trying to deliver.
     # DOCDOC other callbacks
+    # DOCDOC active
+
     PROTOCOL_VERSIONS = [ '0.3' ]
     def __init__(self, context, ip, port, keyID, messageList, handleList,
                  sentCallback=None, failCallback=None, finishedCallback=None,
@@ -727,6 +738,7 @@
         self.junk = []
         self.messageList = []
         self.handleList = []
+        self.active = 1
 
         self.addMessages(messageList, handleList)
 
@@ -756,14 +768,19 @@
         self.finishedCallback = finishedCallback
         self.protocol = None
         self._curMessage = self._curHandle = None
-
+        
         debug("Opening client connection to %s", self.address)
 
+    def isActive(self):
+        """DOCDOC"""
+        return self.active
+
     def addMessages(self, messages, handles):
         """Given a list of messages and handles, as given to
            MMTPServer.__init__, cause this connection to deliver that new
            set of messages after it's done with those it's currently sending.
         """
+        assert self.active
         assert len(messages) == len(handles)
         for m,h in zip(messages, handles):
             if m in ("JUNK", "RENEGOTIATE"):
@@ -807,7 +824,7 @@
         """Called when we're done receiving the protocol string.  Begins
            sending a packet, or exits if we're done sending.
         """
-        inp = self.getInput()
+        inp = self.pullInput()
 
         for p in self.PROTOCOL_VERSIONS:
             if inp == 'MMTP %s\r\n'%p:
@@ -872,7 +889,7 @@
           Otherwise, begins shutting down.
        """
        trace("received ack from %s", self.address)
-       inp = self.getInput()
+       inp = self.pullInput()
        rejected = 0
        if inp == REJECTED_CONTROL+self.rejectDigest:
            debug("Message rejected from %s", self.address)
@@ -908,7 +925,15 @@
         self._messageList = self.handleList = []
         self._curMessage = self._curHandle = None
 
+    def shutdown(self, err=0, retriable=0):
+        #DOCDOC
+
+        self.active = 0
+        SimpleTLSConnection.shutdown(self, err=err, retriable=retriable)
+
     def remove(self):
+        """DOCDOC"""
+        self.active = 0
         if self.finishedCallback is not None:
             self.finishedCallback()
         self.finishedCallback = None
@@ -997,11 +1022,12 @@
         try:
             # Is there an existing connection open to the right server?
             con = self.clientConByAddr[(ip,port,keyID)]
-            LOG.debug("Queueing %s messages on open connection to %s",
-                      len(messages), con.address)
-            #XXXX004 check for possible race here!
-            con.addMessages(messages, handles)
-            return
+            # If so, is that connection currently sending messages?
+            if con.isActive():
+                LOG.debug("Queueing %s messages on open connection to %s",
+                          len(messages), con.address)
+                con.addMessages(messages, handles)
+                return
         except KeyError:
             pass