[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [stem/master] Fix message synchronization



commit 8d18e6bb83c02ecb1a98e4cafbde7113f4e55730
Author: Illia Volochii <illia.volochii@xxxxxxxxx>
Date:   Mon Apr 27 18:33:21 2020 +0300

    Fix message synchronization
---
 stem/control.py | 27 ++++++++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git a/stem/control.py b/stem/control.py
index 3eb706e0..d3e074b7 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -553,6 +553,27 @@ def event_description(event: str) -> str:
   return EVENT_DESCRIPTIONS.get(event.lower())
 
 
+class _MsgLock:
+  def __init__(self):
+    self._r_lock = threading.RLock()
+    self._async_lock = asyncio.Lock()
+
+  async def acquire(self):
+    await self._async_lock.acquire()
+    self._r_lock.acquire()
+
+  def release(self):
+    self._r_lock.release()
+    self._async_lock.release()
+
+  async def __aenter__(self):
+    await self.acquire()
+    return self
+
+  async def __aexit__(self, exc_type, exc_val, exc_tb):
+    self.release()
+
+
 class _BaseControllerSocketMixin:
   def is_alive(self) -> bool:
     """
@@ -622,7 +643,7 @@ class BaseController(_BaseControllerSocketMixin):
     self._asyncio_loop = asyncio.get_event_loop()
     self._asyncio_loop_tasks = []
 
-    self._msg_lock = threading.RLock()
+    self._msg_lock = _MsgLock()
 
     self._status_listeners = []  # type: List[Tuple[Callable[[stem.control.BaseController, stem.control.State, float], None], bool]] # tuples of the form (callback, spawn_thread)
     self._status_listeners_lock = threading.RLock()
@@ -669,7 +690,7 @@ class BaseController(_BaseControllerSocketMixin):
       * :class:`stem.SocketClosed` if the socket is shut down
     """
 
-    with self._msg_lock:
+    async with self._msg_lock:
       # If our _reply_queue isn't empty then one of a few things happened...
       #
       # - Our connection was closed and probably re-restablished. This was
@@ -1128,7 +1149,7 @@ class AsyncController(_ControllerClassMethodMixin, BaseController):
       * :class:`stem.connection.AuthenticationFailure` if unable to authenticate
     """
 
-    with self._msg_lock:
+    async with self._msg_lock:
       await self.connect()
       self.clear_cache()
       await self.authenticate(*args, **kwargs)



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits