[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Breath asynchrony into the `BaseController`
commit 21ee7d288bdc1eacb7a9be805f5fcfcc28da8fc5
Author: Illia Volochii <illia.volochii@xxxxxxxxx>
Date: Sun Apr 12 19:42:40 2020 +0300
Breath asynchrony into the `BaseController`
---
stem/control.py | 25 +++++++++----------------
1 file changed, 9 insertions(+), 16 deletions(-)
diff --git a/stem/control.py b/stem/control.py
index a3b7d253..d592c4af 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -547,10 +547,6 @@ class BaseController(object):
self._asyncio_loop = asyncio.get_event_loop()
- self._asyncio_thread = threading.Thread(target = self._asyncio_loop.run_forever, name = 'asyncio')
- self._asyncio_thread.setDaemon(True)
- self._asyncio_thread.start()
-
self._msg_lock = threading.RLock()
self._status_listeners = [] # type: List[Tuple[Callable[[stem.control.BaseController, stem.control.State, float], None], bool]] # tuples of the form (callback, spawn_thread)
@@ -582,7 +578,7 @@ class BaseController(object):
if is_authenticated:
self._post_authentication()
- def msg(self, message: str) -> stem.response.ControlMessage:
+ async def msg(self, message: str) -> stem.response.ControlMessage:
"""
Sends a message to our control socket and provides back its reply.
@@ -648,7 +644,7 @@ class BaseController(object):
try:
self._asyncio_loop.create_task(self._socket.send(message))
- response = asyncio.run_coroutine_threadsafe(self._reply_queue.get(), self._asyncio_loop).result()
+ response = await self._reply_queue.get()
# If the message we received back had an exception then re-raise it to the
# caller. Otherwise return the response.
@@ -711,7 +707,7 @@ class BaseController(object):
return self._is_authenticated if self.is_alive() else False
- def connect(self) -> None:
+ async def connect(self) -> None:
"""
Reconnects our control socket. This is a pass-through for our socket's
:func:`~stem.socket.ControlSocket.connect` method.
@@ -719,15 +715,15 @@ class BaseController(object):
:raises: :class:`stem.SocketError` if unable to make a socket
"""
- asyncio.run_coroutine_threadsafe(self._socket.connect(), self._asyncio_loop).result()
+ await self._socket.connect()
- def close(self) -> None:
+ async def close(self) -> None:
"""
Closes our socket connection. This is a pass-through for our socket's
:func:`~stem.socket.BaseSocket.close` method.
"""
- asyncio.run_coroutine_threadsafe(self._socket.close(), self._asyncio_loop).result()
+ await self._socket.close()
# Join on any outstanding state change listeners. Closing is a state change
# of its own, so if we have any listeners it's quite likely there's some
@@ -740,9 +736,6 @@ class BaseController(object):
if t.is_alive() and threading.current_thread() != t:
t.join()
- self._asyncio_loop.call_soon_threadsafe(self._asyncio_loop.stop)
- self._asyncio_thread.join()
-
def get_socket(self) -> stem.socket.ControlSocket:
"""
Provides the socket used to speak with the tor process. Communicating with
@@ -815,11 +808,11 @@ class BaseController(object):
self._status_listeners = new_listeners
return is_changed
- def __enter__(self) -> 'stem.control.BaseController':
+ async def __aenter__(self) -> 'stem.control.BaseController':
return self
- def __exit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> None:
- self.close()
+ await def __aexit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> None:
+ await self.close()
def _handle_event(self, event_message: stem.response.ControlMessage) -> None:
"""
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits