[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [stem/master] Stop using a thread for processing asynchronous events



commit ac9311c275b1b81904f1d0f72efff919ee590330
Author: Illia Volochii <illia.volochii@xxxxxxxxx>
Date:   Sun Apr 12 19:20:48 2020 +0300

    Stop using a thread for processing asynchronous events
---
 stem/control.py                 | 34 ++++++++++------------------------
 test/unit/control/controller.py |  2 +-
 2 files changed, 11 insertions(+), 25 deletions(-)

diff --git a/stem/control.py b/stem/control.py
index 2d33bd54..a3b7d253 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -560,12 +560,7 @@ class BaseController(object):
     self._reply_queue = asyncio.Queue()  # type: asyncio.Queue[Union[stem.response.ControlMessage, stem.ControllerError]]
     self._event_queue = asyncio.Queue()  # type: asyncio.Queue[stem.response.ControlMessage]
 
-    # thread to continually pull from the control socket
-    self._reader_thread = None  # type: Optional[threading.Thread]
-
-    # thread to pull from the _event_queue and call handle_event
-    self._event_notice = threading.Event()
-    self._event_thread = None  # type: Optional[threading.Thread]
+    self._event_notice = asyncio.Event()
 
     # saves our socket's prior _connect() and _close() methods so they can be
     # called along with ours
@@ -582,7 +577,7 @@ class BaseController(object):
     self._state_change_threads = []  # type: List[threading.Thread] # threads we've spawned to notify of state changes
 
     if self._socket.is_alive():
-      self._launch_threads()
+      self._create_loop_tasks()
 
     if is_authenticated:
       self._post_authentication()
@@ -837,7 +832,7 @@ class BaseController(object):
     pass
 
   async def _connect(self) -> None:
-    self._launch_threads()
+    self._create_loop_tasks()
     self._notify_status_listeners(State.INIT)
     await self._socket_connect()
     self._is_authenticated = False
@@ -852,10 +847,6 @@ class BaseController(object):
 
     # joins on our threads if it's safe to do so
 
-    for t in (self._reader_thread, self._event_thread):
-      if t and t.is_alive() and threading.current_thread() != t:
-        t.join()
-
     self._notify_status_listeners(State.CLOSED)
 
     await self._socket_close()
@@ -909,22 +900,15 @@ class BaseController(object):
           else:
             listener(self, state, change_timestamp)
 
-  def _launch_threads(self) -> None:
+  def _create_loop_tasks(self) -> None:
     """
     Initializes daemon threads. Threads can't be reused so we need to recreate
     them if we're restarted.
     """
 
     self._asyncio_loop.create_task(self._reader_loop())
+    self._asyncio_loop.create_task(self._event_loop())
 
-    # In theory concurrent calls could result in multiple start() calls on a
-    # single thread, which would cause an unexpected exception. Best be safe.
-
-    with self._socket._get_send_lock():
-      if not self._event_thread or not self._event_thread.is_alive():
-        self._event_thread = threading.Thread(target = self._event_loop, name = 'Event notifier')
-        self._event_thread.setDaemon(True)
-        self._event_thread.start()
 
   async def _reader_loop(self) -> None:
     """
@@ -955,7 +939,7 @@ class BaseController(object):
 
         await self._reply_queue.put(exc)
 
-  def _event_loop(self) -> None:
+  async def _event_loop(self) -> None:
     """
     Continually pulls messages from the _event_queue and sends them to our
     handle_event callback. This is done via its own thread so subclasses with a
@@ -982,8 +966,10 @@ class BaseController(object):
         if not self.is_alive():
           break
 
-        self._event_notice.wait(0.05)
-        self._event_notice.clear()
+        try:
+          await asyncio.wait_for(self._event_notice.wait(), timeout=0.05)
+        except asyncio.TimeoutError:
+          self._event_notice.clear()
 
 
 class Controller(BaseController):
diff --git a/test/unit/control/controller.py b/test/unit/control/controller.py
index 02ed2774..c0a07e2a 100644
--- a/test/unit/control/controller.py
+++ b/test/unit/control/controller.py
@@ -707,7 +707,7 @@ class TestControl(unittest.TestCase):
     with patch('time.time', Mock(return_value = TEST_TIMESTAMP)):
       with patch('stem.control.Controller.is_alive') as is_alive_mock:
         is_alive_mock.return_value = True
-        self.controller._launch_threads()
+        self.controller._create_loop_tasks()
 
         try:
           # Converting an event back into an uncast ControlMessage, then feeding it



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits