[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Resolve a potential race; drop memory on input buffers more aggressively.
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv8037/lib/mixminion/server
Modified Files:
MMTPServer.py
Log Message:
Resolve a potential race; drop memory on input buffers more aggressively.
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.34
retrieving revision 1.35
diff -u -d -r1.34 -r1.35
--- MMTPServer.py 3 Jun 2003 17:28:12 -0000 1.34
+++ MMTPServer.py 5 Jun 2003 01:52:52 -0000 1.35
@@ -262,6 +262,8 @@
self.lastActivity = time.time()
self.__serverMode = serverMode
self.__failed = 0
+ self.__inbuf = []
+ self.__inbuflen = 0
if serverMode:
self.__connecting = 0
@@ -298,7 +300,7 @@
'bytes' bytes. If terminator is provided, we read until we
encounter the terminator, but give up after 'bytes' bytes.
"""
- self.__inbuf = []
+ del self.__inbuf[:]
self.__inbuflen = 0
self.__expectReadLen = bytes
self.__terminator = terminator
@@ -507,6 +509,13 @@
"""Returns the current contents of the input buffer."""
return "".join(self.__inbuf)
+ def pullInput(self):
+ """DOCDOC"""
+ inp = "".join(self.__inbuf)
+ del self.__inbuf[:]
+ self.__inbuflen = 0
+ return inp
+
def getTLSConnection(self):
return self.__con
@@ -586,7 +595,7 @@
rejects, or sends our response.
"""
trace("done w/ client sendproto to %s", self.address)
- inp = self.getInput()
+ inp = self.pullInput()
m = PROTOCOL_RE.match(inp)
if not m:
@@ -619,7 +628,7 @@
def __receivedMessage(self):
"""Called once we've read a message from the line. Checks the
digest, and either rejects or begins sending an ACK."""
- data = self.getInput()
+ data = self.pullInput()
msg = data[SEND_CONTROL_LEN:-DIGEST_LEN]
digest = data[-DIGEST_LEN:]
@@ -699,6 +708,8 @@
# _curMessage, _curHandle: Correspond to the message and handle
# that we are currently trying to deliver.
# DOCDOC other callbacks
+ # DOCDOC active
+
PROTOCOL_VERSIONS = [ '0.3' ]
def __init__(self, context, ip, port, keyID, messageList, handleList,
sentCallback=None, failCallback=None, finishedCallback=None,
@@ -727,6 +738,7 @@
self.junk = []
self.messageList = []
self.handleList = []
+ self.active = 1
self.addMessages(messageList, handleList)
@@ -756,14 +768,19 @@
self.finishedCallback = finishedCallback
self.protocol = None
self._curMessage = self._curHandle = None
-
+
debug("Opening client connection to %s", self.address)
+ def isActive(self):
+ """DOCDOC"""
+ return self.active
+
def addMessages(self, messages, handles):
"""Given a list of messages and handles, as given to
MMTPServer.__init__, cause this connection to deliver that new
set of messages after it's done with those it's currently sending.
"""
+ assert self.active
assert len(messages) == len(handles)
for m,h in zip(messages, handles):
if m in ("JUNK", "RENEGOTIATE"):
@@ -807,7 +824,7 @@
"""Called when we're done receiving the protocol string. Begins
sending a packet, or exits if we're done sending.
"""
- inp = self.getInput()
+ inp = self.pullInput()
for p in self.PROTOCOL_VERSIONS:
if inp == 'MMTP %s\r\n'%p:
@@ -872,7 +889,7 @@
Otherwise, begins shutting down.
"""
trace("received ack from %s", self.address)
- inp = self.getInput()
+ inp = self.pullInput()
rejected = 0
if inp == REJECTED_CONTROL+self.rejectDigest:
debug("Message rejected from %s", self.address)
@@ -908,7 +925,15 @@
self._messageList = self.handleList = []
self._curMessage = self._curHandle = None
+ def shutdown(self, err=0, retriable=0):
+ #DOCDOC
+
+ self.active = 0
+ SimpleTLSConnection.shutdown(self, err=err, retriable=retriable)
+
def remove(self):
+ """DOCDOC"""
+ self.active = 0
if self.finishedCallback is not None:
self.finishedCallback()
self.finishedCallback = None
@@ -997,11 +1022,12 @@
try:
# Is there an existing connection open to the right server?
con = self.clientConByAddr[(ip,port,keyID)]
- LOG.debug("Queueing %s messages on open connection to %s",
- len(messages), con.address)
- #XXXX004 check for possible race here!
- con.addMessages(messages, handles)
- return
+ # If so, is that connection currently sending messages?
+ if con.isActive():
+ LOG.debug("Queueing %s messages on open connection to %s",
+ len(messages), con.address)
+ con.addMessages(messages, handles)
+ return
except KeyError:
pass