[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Stop using a thread for processing asynchronous events
commit ac9311c275b1b81904f1d0f72efff919ee590330
Author: Illia Volochii <illia.volochii@xxxxxxxxx>
Date: Sun Apr 12 19:20:48 2020 +0300
Stop using a thread for processing asynchronous events
---
stem/control.py | 34 ++++++++++------------------------
test/unit/control/controller.py | 2 +-
2 files changed, 11 insertions(+), 25 deletions(-)
diff --git a/stem/control.py b/stem/control.py
index 2d33bd54..a3b7d253 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -560,12 +560,7 @@ class BaseController(object):
self._reply_queue = asyncio.Queue() # type: asyncio.Queue[Union[stem.response.ControlMessage, stem.ControllerError]]
self._event_queue = asyncio.Queue() # type: asyncio.Queue[stem.response.ControlMessage]
- # thread to continually pull from the control socket
- self._reader_thread = None # type: Optional[threading.Thread]
-
- # thread to pull from the _event_queue and call handle_event
- self._event_notice = threading.Event()
- self._event_thread = None # type: Optional[threading.Thread]
+ self._event_notice = asyncio.Event()
# saves our socket's prior _connect() and _close() methods so they can be
# called along with ours
@@ -582,7 +577,7 @@ class BaseController(object):
self._state_change_threads = [] # type: List[threading.Thread] # threads we've spawned to notify of state changes
if self._socket.is_alive():
- self._launch_threads()
+ self._create_loop_tasks()
if is_authenticated:
self._post_authentication()
@@ -837,7 +832,7 @@ class BaseController(object):
pass
async def _connect(self) -> None:
- self._launch_threads()
+ self._create_loop_tasks()
self._notify_status_listeners(State.INIT)
await self._socket_connect()
self._is_authenticated = False
@@ -852,10 +847,6 @@ class BaseController(object):
# joins on our threads if it's safe to do so
- for t in (self._reader_thread, self._event_thread):
- if t and t.is_alive() and threading.current_thread() != t:
- t.join()
-
self._notify_status_listeners(State.CLOSED)
await self._socket_close()
@@ -909,22 +900,15 @@ class BaseController(object):
else:
listener(self, state, change_timestamp)
- def _launch_threads(self) -> None:
+ def _create_loop_tasks(self) -> None:
"""
Initializes daemon threads. Threads can't be reused so we need to recreate
them if we're restarted.
"""
self._asyncio_loop.create_task(self._reader_loop())
+ self._asyncio_loop.create_task(self._event_loop())
- # In theory concurrent calls could result in multiple start() calls on a
- # single thread, which would cause an unexpected exception. Best be safe.
-
- with self._socket._get_send_lock():
- if not self._event_thread or not self._event_thread.is_alive():
- self._event_thread = threading.Thread(target = self._event_loop, name = 'Event notifier')
- self._event_thread.setDaemon(True)
- self._event_thread.start()
async def _reader_loop(self) -> None:
"""
@@ -955,7 +939,7 @@ class BaseController(object):
await self._reply_queue.put(exc)
- def _event_loop(self) -> None:
+ async def _event_loop(self) -> None:
"""
Continually pulls messages from the _event_queue and sends them to our
handle_event callback. This is done via its own thread so subclasses with a
@@ -982,8 +966,10 @@ class BaseController(object):
if not self.is_alive():
break
- self._event_notice.wait(0.05)
- self._event_notice.clear()
+ try:
+ await asyncio.wait_for(self._event_notice.wait(), timeout=0.05)
+ except asyncio.TimeoutError:
+ self._event_notice.clear()
class Controller(BaseController):
diff --git a/test/unit/control/controller.py b/test/unit/control/controller.py
index 02ed2774..c0a07e2a 100644
--- a/test/unit/control/controller.py
+++ b/test/unit/control/controller.py
@@ -707,7 +707,7 @@ class TestControl(unittest.TestCase):
with patch('time.time', Mock(return_value = TEST_TIMESTAMP)):
with patch('stem.control.Controller.is_alive') as is_alive_mock:
is_alive_mock.return_value = True
- self.controller._launch_threads()
+ self.controller._create_loop_tasks()
try:
# Converting an event back into an uncast ControlMessage, then feeding it
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits