[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Synchronous mixin
commit 1cbf3397ccbb77ea7df35109839a522dcf03556b
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Sat Jun 20 16:31:17 2020 -0700
Synchronous mixin
Illia's AsyncClassWrapper does the trick but I think we can make this more
transparent. Lets try a mixin that overwrites asyncio methods dynamically.
Earlier I added a AsyncClassWrapper.__del__() method to clean itself up,
but doing so was a mistake. When our Python interpreter shuts down asyncio
closes its scheduler *before* this method invokes, which makes join() hang
because loop.stop() never runs.
To avoid these deadlocks we need Synchronous (or AsyncClassWrapper) users to
explicitly close the class themself.
---
stem/util/__init__.py | 89 +++++++++++++++++++++++++++++++++++++++++++
test/settings.cfg | 1 +
test/unit/util/synchronous.py | 54 ++++++++++++++++++++++++++
3 files changed, 144 insertions(+)
diff --git a/stem/util/__init__.py b/stem/util/__init__.py
index 25282b99..72239273 100644
--- a/stem/util/__init__.py
+++ b/stem/util/__init__.py
@@ -7,6 +7,8 @@ Utility functions used by the stem library.
import asyncio
import datetime
+import functools
+import inspect
import threading
from concurrent.futures import Future
@@ -144,6 +146,93 @@ def _hash_attr(obj: Any, *attributes: str, **kwargs: Any):
return my_hash
+class Synchronous(object):
+ """
+ Mixin that lets a class be called from both synchronous and asynchronous
+ contexts.
+
+ ::
+
+ class Example(Synchronous):
+ async def hello(self):
+ return 'hello'
+
+ def sync_demo():
+ instance = Example()
+ print('%s from a synchronous context' % instance.hello())
+ instance.close()
+
+ async def async_demo():
+ instance = Example()
+ print('%s from an asynchronous context' % await instance.hello())
+ instance.close()
+
+ sync_demo()
+ asyncio.run(async_demo())
+
+ Users are responsible for calling :func:`~stem.util.Synchronous.close` when
+ finished to clean up underlying resources.
+ """
+
+ def __init__(self):
+ self._loop = asyncio.new_event_loop()
+ self._loop_lock = threading.RLock()
+ self._loop_thread = threading.Thread(
+ name = '%s asyncio' % self.__class__.__name__,
+ target = self._loop.run_forever,
+ daemon = True,
+ )
+
+ self._is_closed = False
+
+ # overwrite asynchronous class methods with instance methods that can be
+ # called from either context
+
+ def wrap(func, *args, **kwargs):
+ if Synchronous.is_asyncio_context():
+ return func(*args, **kwargs)
+ else:
+ with self._loop_lock:
+ if self._is_closed:
+ raise RuntimeError('%s has been closed' % type(self).__name__)
+ elif not self._loop_thread.is_alive():
+ self._loop_thread.start()
+
+ return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), self._loop).result()
+
+ for method_name, func in inspect.getmembers(self, predicate = inspect.ismethod):
+ if inspect.iscoroutinefunction(func):
+ setattr(self, method_name, functools.partial(wrap, func))
+
+ def close(self):
+ """
+ Terminate resources that permits this from being callable from synchronous
+ contexts. Once called any further synchronous invocations will fail with a
+ **RuntimeError**.
+ """
+
+ with self._loop_lock:
+ if self._loop_thread.is_alive():
+ self._loop.call_soon_threadsafe(self._loop.stop)
+ self._loop_thread.join()
+
+ self._is_closed = True
+
+ @staticmethod
+ def is_asyncio_context():
+ """
+ Check if running within a synchronous or asynchronous context.
+
+ :returns: **True** if within an asyncio conext, **False** otherwise
+ """
+
+ try:
+ asyncio.get_running_loop()
+ return True
+ except RuntimeError:
+ return False
+
+
class AsyncClassWrapper:
_loop: asyncio.AbstractEventLoop
_loop_thread: threading.Thread
diff --git a/test/settings.cfg b/test/settings.cfg
index 51109f96..fcef5ec1 100644
--- a/test/settings.cfg
+++ b/test/settings.cfg
@@ -248,6 +248,7 @@ test.unit_tests
|test.unit.util.system.TestSystem
|test.unit.util.term.TestTerminal
|test.unit.util.tor_tools.TestTorTools
+|test.unit.util.synchronous.TestSynchronous
|test.unit.util.__init__.TestBaseUtil
|test.unit.installation.TestInstallation
|test.unit.descriptor.descriptor.TestDescriptor
diff --git a/test/unit/util/synchronous.py b/test/unit/util/synchronous.py
new file mode 100644
index 00000000..26dad98d
--- /dev/null
+++ b/test/unit/util/synchronous.py
@@ -0,0 +1,54 @@
+"""
+Unit tests for the stem.util.Synchronous class.
+"""
+
+import asyncio
+import io
+import unittest
+
+from unittest.mock import patch
+
+from stem.util import Synchronous
+
+EXAMPLE_OUTPUT = """\
+hello from a synchronous context
+hello from an asynchronous context
+"""
+
+
+class Example(Synchronous):
+ async def hello(self):
+ return 'hello'
+
+
+class TestSynchronous(unittest.TestCase):
+ @patch('sys.stdout', new_callable = io.StringIO)
+ def test_example(self, stdout_mock):
+ def sync_demo():
+ instance = Example()
+ print('%s from a synchronous context' % instance.hello())
+ instance.close()
+
+ async def async_demo():
+ instance = Example()
+ print('%s from an asynchronous context' % await instance.hello())
+ instance.close()
+
+ sync_demo()
+ asyncio.run(async_demo())
+
+ self.assertEqual(EXAMPLE_OUTPUT, stdout_mock.getvalue())
+
+ def test_after_close(self):
+ # close a used instance
+
+ instance = Example()
+ self.assertEqual('hello', instance.hello())
+ instance.close()
+ self.assertRaises(RuntimeError, instance.hello)
+
+ # close an unused instance
+
+ instance = Example()
+ instance.close()
+ self.assertRaises(RuntimeError, instance.hello)
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits