[tor-commits] [stem/master] Fixing deadlock in BaseController



commit d5162f4e369f5d2226f0e5262f2b63025f9a68ec
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date:   Sat Feb 18 18:15:07 2012 -0800

    Fixing deadlock in BaseController
    
    Found two concurrency bugs that were causing deadlocks, and added a
    test that's more likely to trigger connect() and close() concurrency
    issues.
    
    The issues were...
      * The recv() method calls close() if the socket is still flagged as
        being alive. Unfortunately this can deadlock if the closing thread
        joins on the recv thread.
    
      * Using a Condition rather than an Event caused the event loop to
        sometimes miss the notification telling the event thread to shut
        down, leaving the join() call on that thread stuck. This is most
        likely the classic lost-wakeup race: notifyAll() only wakes threads
        that are already waiting, while an Event's set() flag persists until
        cleared (see the sketch below).
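    
    As a standalone illustration of that lost-wakeup race (none of this is
    stem code; it only assumes CPython's threading module): a notifyAll()
    issued before anyone is waiting is simply dropped, while an Event's
    set() flag is remembered until clear().
    
      import threading
      import time
      
      cond = threading.Condition()
      event = threading.Event()
      
      # Signal before anyone waits: the notification is lost, the flag is kept.
      with cond:
        cond.notifyAll()
      event.set()
      
      start = time.time()
      with cond:
        cond.wait(timeout = 1.0)    # blocks the full second; the notice is gone
      print("Condition waited %.1fs" % (time.time() - start))
      
      start = time.time()
      event.wait(timeout = 1.0)     # returns immediately; the flag is still set
      print("Event waited %.1fs" % (time.time() - start))
    
    Hence the switch: a set() that lands while the event loop is busy
    draining the queue still wakes the loop's next wait().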
---
 stem/control.py                       |   15 +++++----------
 stem/socket.py                        |   31 ++++++++++++++++++++++---------
 test/integ/control/base_controller.py |   25 +++++++++++++------------
 3 files changed, 40 insertions(+), 31 deletions(-)

diff --git a/stem/control.py b/stem/control.py
index d5596f6..55c6e7a 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -99,7 +99,7 @@ class BaseController:
     self._reader_thread = None
     
     # thread to pull from the _event_queue and call handle_event
-    self._event_cond = threading.Condition()
+    self._event_notice = threading.Event()
     self._event_thread = None
     
     # saves our socket's prior _connect() and _close() methods so they can be
@@ -291,9 +291,7 @@ class BaseController:
     # awake from recv() raising a closure exception. Wake up the event thread
     # too so it can end.
     
-    self._event_cond.acquire()
-    self._event_cond.notifyAll()
-    self._event_cond.release()
+    self._event_notice.set()
     
     # joins on our threads if it's safe to do so
     
@@ -379,10 +377,8 @@ class BaseController:
         
         if control_message.content()[-1][0] == "650":
           # asynchronous message, adds to the event queue and wakes up its handler
-          self._event_cond.acquire()
           self._event_queue.put(control_message)
-          self._event_cond.notifyAll()
-          self._event_cond.release()
+          self._event_notice.set()
         else:
           # response to a msg() call
           self._reply_queue.put(control_message)
@@ -407,7 +403,6 @@ class BaseController:
         event_message = self._event_queue.get_nowait()
         self._handle_event(event_message)
       except Queue.Empty:
-        self._event_cond.acquire()
-        self._event_cond.wait()
-        self._event_cond.release()
+        self._event_notice.wait()
+        self._event_notice.clear()
 
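The control.py side of the change reduces the event plumbing to a queue that a
worker thread drains, sleeping on an Event between batches. Below is a
condensed, standalone sketch of that pattern; producer(), event_loop(), and
shutdown are simplified stand-ins for the reader thread, _event_loop(), and
close(), not the stem classes themselves.

    import threading
    
    try:
      import Queue as queue_mod    # Python 2, which stem targeted at the time
    except ImportError:
      import queue as queue_mod    # Python 3
    
    event_queue = queue_mod.Queue()
    event_notice = threading.Event()
    shutdown = threading.Event()
    
    def producer(message):
      # mirrors the reader thread: enqueue the message, then wake the worker
      event_queue.put(message)
      event_notice.set()
    
    def event_loop():
      # mirrors _event_loop(): drain the queue, then sleep until woken
      while not shutdown.is_set():
        try:
          message = event_queue.get_nowait()
          print("handling %s" % message)
        except queue_mod.Empty:
          event_notice.wait(timeout = 0.1)
          event_notice.clear()
    
    worker = threading.Thread(target = event_loop)
    worker.start()
    
    for i in range(3):
      producer("event %i" % i)
    
    shutdown.set()
    event_notice.set()    # wake the worker so it can notice the shutdown flag
    worker.join()

The final wakeup mirrors the close()-path hunk above ("Wake up the event
thread too so it can end"), which is now a single _event_notice.set() call.
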
diff --git a/stem/socket.py b/stem/socket.py
index 166c348..afea436 100644
--- a/stem/socket.py
+++ b/stem/socket.py
@@ -89,6 +89,9 @@ class ControlSocket:
     self._socket, self._socket_file = None, None
     self._is_alive = False
     
+    # indicates that we're in the midst of calling close()
+    self._handling_close = False
+    
     # Tracks sending and receiving separately. This should be safe, and doing
     # so prevents deadlock where we block writes because we're waiting to read
     # a message that isn't coming.
@@ -135,15 +138,22 @@ class ControlSocket:
         complete message
     """
     
-    try:
-      with self._recv_lock:
-        if not self.is_alive(): raise SocketClosed()
-        return recv_message(self._socket_file)
-    except SocketClosed, exc:
-      # if recv_message raises a SocketClosed then we should properly shut
-      # everything down
-      if self.is_alive(): self.close()
-      raise exc
+    with self._recv_lock:
+      try:
+        # makes a temporary reference to the _socket_file because connect()
+        # and close() may set or unset it
+        
+        socket_file = self._socket_file
+        
+        if not socket_file: raise SocketClosed()
+        return recv_message(socket_file)
+      except SocketClosed, exc:
+        # If recv_message raises a SocketClosed then we should properly shut
+        # everything down. However, if this was caused *from* a close call
+        # and it's joining on our thread then this would risk deadlock.
+        
+        if self.is_alive() and not self._handling_close: self.close()
+        raise exc
   
   def is_alive(self):
     """
@@ -197,6 +207,7 @@ class ControlSocket:
       # is causing our is_alive() state to change.
       
       is_change = self.is_alive()
+      self._handling_close = True
       
       if self._socket:
         # if we haven't yet established a connection then this raises an error
@@ -223,6 +234,8 @@ class ControlSocket:
       
       if is_change:
         self._close()
+      
+      self._handling_close = False
   
   def _get_send_lock(self):
     """
diff --git a/test/integ/control/base_controller.py b/test/integ/control/base_controller.py
index 2648496..4c6260d 100644
--- a/test/integ/control/base_controller.py
+++ b/test/integ/control/base_controller.py
@@ -38,8 +38,6 @@ class TestBaseController(unittest.TestCase):
     Basic sanity check for the from_port constructor.
     """
     
-    self.skipTest("work in progress")
-    
     if test.runner.Torrc.PORT in test.runner.get_runner().get_options():
       controller = stem.control.BaseController.from_port(control_port = test.runner.CONTROL_PORT)
       self.assertTrue(isinstance(controller, stem.control.BaseController))
@@ -51,21 +49,30 @@ class TestBaseController(unittest.TestCase):
     Basic sanity check for the from_socket_file constructor.
     """
     
-    self.skipTest("work in progress")
-    
     if test.runner.Torrc.SOCKET in test.runner.get_runner().get_options():
       controller = stem.control.BaseController.from_socket_file(test.runner.CONTROL_SOCKET_PATH)
       self.assertTrue(isinstance(controller, stem.control.BaseController))
     else:
       self.assertRaises(stem.socket.SocketError, stem.control.BaseController.from_socket_file, test.runner.CONTROL_SOCKET_PATH)
   
+  def test_connect_repeatedly(self):
+    """
+    Connects and closes the socket repeatedly. This is a simple attempt to
+    trigger concurrency issues.
+    """
+    
+    with test.runner.get_runner().get_tor_socket() as control_socket:
+      controller = stem.control.BaseController(control_socket)
+      
+      for i in xrange(250):
+        controller.connect()
+        controller.close()
+  
   def test_msg(self):
     """
     Tests a basic query with the msg() method.
     """
     
-    self.skipTest("work in progress")
-    
     runner = test.runner.get_runner()
     with runner.get_tor_socket() as control_socket:
       controller = stem.control.BaseController(control_socket)
@@ -79,8 +86,6 @@ class TestBaseController(unittest.TestCase):
     Tests the msg() method against an invalid controller command.
     """
     
-    self.skipTest("work in progress")
-    
     with test.runner.get_runner().get_tor_socket() as control_socket:
       controller = stem.control.BaseController(control_socket)
       response = controller.msg("invalid")
@@ -91,8 +96,6 @@ class TestBaseController(unittest.TestCase):
     Tests the msg() method against a non-existent GETINFO option.
     """
     
-    self.skipTest("work in progress")
-    
     with test.runner.get_runner().get_tor_socket() as control_socket:
       controller = stem.control.BaseController(control_socket)
       response = controller.msg("GETINFO blarg")
@@ -104,8 +107,6 @@ class TestBaseController(unittest.TestCase):
     remove_status_listener() methods.
     """
     
-    self.skipTest("work in progress")
-    
     state_observer = StateObserver()
     
     with test.runner.get_runner().get_tor_socket(False) as control_socket:
