[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Fixing deadlock in BaseController
commit d5162f4e369f5d2226f0e5262f2b63025f9a68ec
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Sat Feb 18 18:15:07 2012 -0800
Fixing deadlock in BaseController
Found two concurrency bugs which were causing deadlock issues, and adding a
test that's more likely to trigger connect() and close() concurrency issues.
The issues were...
* The recv() method calls close if the socket is still flagged as being
alive. Unfortunately this can cause deadlock if the closing thread joins
on the recv thread.
* For some reason using a Condition rather than an Event caused the event
loop to sometimes miss the notice that caused the event thread to close,
causing its join() call to get stuck.
---
stem/control.py | 15 +++++----------
stem/socket.py | 31 ++++++++++++++++++++++---------
test/integ/control/base_controller.py | 25 +++++++++++++------------
3 files changed, 40 insertions(+), 31 deletions(-)
diff --git a/stem/control.py b/stem/control.py
index d5596f6..55c6e7a 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -99,7 +99,7 @@ class BaseController:
self._reader_thread = None
# thread to pull from the _event_queue and call handle_event
- self._event_cond = threading.Condition()
+ self._event_notice = threading.Event()
self._event_thread = None
# saves our socket's prior _connect() and _close() methods so they can be
@@ -291,9 +291,7 @@ class BaseController:
# awake from recv() raising a closure exception. Wake up the event thread
# too so it can end.
- self._event_cond.acquire()
- self._event_cond.notifyAll()
- self._event_cond.release()
+ self._event_notice.set()
# joins on our threads if it's safe to do so
@@ -379,10 +377,8 @@ class BaseController:
if control_message.content()[-1][0] == "650":
# asynchronous message, adds to the event queue and wakes up its handler
- self._event_cond.acquire()
self._event_queue.put(control_message)
- self._event_cond.notifyAll()
- self._event_cond.release()
+ self._event_notice.set()
else:
# response to a msg() call
self._reply_queue.put(control_message)
@@ -407,7 +403,6 @@ class BaseController:
event_message = self._event_queue.get_nowait()
self._handle_event(event_message)
except Queue.Empty:
- self._event_cond.acquire()
- self._event_cond.wait()
- self._event_cond.release()
+ self._event_notice.wait()
+ self._event_notice.clear()
diff --git a/stem/socket.py b/stem/socket.py
index 166c348..afea436 100644
--- a/stem/socket.py
+++ b/stem/socket.py
@@ -89,6 +89,9 @@ class ControlSocket:
self._socket, self._socket_file = None, None
self._is_alive = False
+ # indicates that we're in the midst of calling close()
+ self._handling_close = False
+
# Tracks sending and receiving separately. This should be safe, and doing
# so prevents deadlock where we block writes because we're waiting to read
# a message that isn't coming.
@@ -135,15 +138,22 @@ class ControlSocket:
complete message
"""
- try:
- with self._recv_lock:
- if not self.is_alive(): raise SocketClosed()
- return recv_message(self._socket_file)
- except SocketClosed, exc:
- # if recv_message raises a SocketClosed then we should properly shut
- # everything down
- if self.is_alive(): self.close()
- raise exc
+ with self._recv_lock:
+ try:
+ # makes a temporary reference to the _socket_file because connect()
+ # and close() may set or unset it
+
+ socket_file = self._socket_file
+
+ if not socket_file: raise SocketClosed()
+ return recv_message(socket_file)
+ except SocketClosed, exc:
+ # If recv_message raises a SocketClosed then we should properly shut
+ # everything down. However, if this was caused *from* a close call
+ # and it's joining on our thread then this would risk deadlock.
+
+ if self.is_alive() and not self._handling_close: self.close()
+ raise exc
def is_alive(self):
"""
@@ -197,6 +207,7 @@ class ControlSocket:
# is causing our is_alive() state to change.
is_change = self.is_alive()
+ self._handling_close = True
if self._socket:
# if we haven't yet established a connection then this raises an error
@@ -223,6 +234,8 @@ class ControlSocket:
if is_change:
self._close()
+
+ self._handling_close = False
def _get_send_lock(self):
"""
diff --git a/test/integ/control/base_controller.py b/test/integ/control/base_controller.py
index 2648496..4c6260d 100644
--- a/test/integ/control/base_controller.py
+++ b/test/integ/control/base_controller.py
@@ -38,8 +38,6 @@ class TestBaseController(unittest.TestCase):
Basic sanity check for the from_port constructor.
"""
- self.skipTest("work in progress")
-
if test.runner.Torrc.PORT in test.runner.get_runner().get_options():
controller = stem.control.BaseController.from_port(control_port = test.runner.CONTROL_PORT)
self.assertTrue(isinstance(controller, stem.control.BaseController))
@@ -51,21 +49,30 @@ class TestBaseController(unittest.TestCase):
Basic sanity check for the from_socket_file constructor.
"""
- self.skipTest("work in progress")
-
if test.runner.Torrc.SOCKET in test.runner.get_runner().get_options():
controller = stem.control.BaseController.from_socket_file(test.runner.CONTROL_SOCKET_PATH)
self.assertTrue(isinstance(controller, stem.control.BaseController))
else:
self.assertRaises(stem.socket.SocketError, stem.control.BaseController.from_socket_file, test.runner.CONTROL_SOCKET_PATH)
+ def test_connect_repeatedly(self):
+ """
+ Connects and closes the socket repeatedly. This is a simple attempt to
+ trigger concurrency issues.
+ """
+
+ with test.runner.get_runner().get_tor_socket() as control_socket:
+ controller = stem.control.BaseController(control_socket)
+
+ for i in xrange(250):
+ controller.connect()
+ controller.close()
+
def test_msg(self):
"""
Tests a basic query with the msg() method.
"""
- self.skipTest("work in progress")
-
runner = test.runner.get_runner()
with runner.get_tor_socket() as control_socket:
controller = stem.control.BaseController(control_socket)
@@ -79,8 +86,6 @@ class TestBaseController(unittest.TestCase):
Tests the msg() method against an invalid controller command.
"""
- self.skipTest("work in progress")
-
with test.runner.get_runner().get_tor_socket() as control_socket:
controller = stem.control.BaseController(control_socket)
response = controller.msg("invalid")
@@ -91,8 +96,6 @@ class TestBaseController(unittest.TestCase):
Tests the msg() method against a non-existant GETINFO option.
"""
- self.skipTest("work in progress")
-
with test.runner.get_runner().get_tor_socket() as control_socket:
controller = stem.control.BaseController(control_socket)
response = controller.msg("GETINFO blarg")
@@ -104,8 +107,6 @@ class TestBaseController(unittest.TestCase):
remove_status_listener() methods.
"""
- self.skipTest("work in progress")
-
state_observer = StateObserver()
with test.runner.get_runner().get_tor_socket(False) as control_socket:
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits