[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Use Synchronous for Query
commit 69be99b4aaa0ebdb038266103a7c9e748e38ef3b
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Wed Jun 24 17:47:23 2020 -0700
Use Synchronous for Query
Time to use our mixin in practice. Good news is that it works and *greatly*
deduplicates our code, but it's not all sunshine and ponies...
* Query users now must call close(). This is a significant hassle in terms of
usability, and must be fixed prior to release. However, it'll require some
API adustments.
* Mypy's type checks assume that Synchronous users are calling Coroutines,
causing false positives when objects are used in a synchronous fashion.
---
stem/descriptor/remote.py | 351 ++++++++---------------------------------
stem/util/__init__.py | 28 +++-
test/unit/descriptor/remote.py | 61 ++++---
3 files changed, 134 insertions(+), 306 deletions(-)
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index f1ce79db..ad6a02ab 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -92,7 +92,6 @@ import time
import stem
import stem.client
-import stem.control
import stem.descriptor
import stem.descriptor.networkstatus
import stem.directory
@@ -100,8 +99,8 @@ import stem.util.enum
import stem.util.tor_tools
from stem.descriptor import Compression
-from stem.util import log, str_tools
-from typing import Any, AsyncIterator, Dict, Iterator, List, Optional, Sequence, Tuple, Union
+from stem.util import Synchronous, log, str_tools
+from typing import Any, AsyncIterator, Dict, List, Optional, Sequence, Tuple, Union
# Tor has a limited number of descriptors we can fetch explicitly by their
# fingerprint or hashes due to a limit on the url length by squid proxies.
@@ -227,7 +226,7 @@ def get_detached_signatures(**query_args: Any) -> 'stem.descriptor.remote.Query'
return get_instance().get_detached_signatures(**query_args)
-class AsyncQuery(object):
+class Query(Synchronous):
"""
Asynchronous request for descriptor content from a directory authority or
mirror. These can either be made through the
@@ -235,18 +234,18 @@ class AsyncQuery(object):
advanced usage.
To block on the response and get results either call
- :func:`~stem.descriptor.remote.AsyncQuery.run` or iterate over the Query. The
- :func:`~stem.descriptor.remote.AsyncQuery.run` method pass along any errors
- that arise...
+ :func:`~stem.descriptor.remote.Query.run` or iterate over the Query. The
+ :func:`~stem.descriptor.remote.Query.run` method pass along any errors that
+ arise...
::
- from stem.descriptor.remote import AsyncQuery
+ from stem.descriptor.remote import Query
print('Current relays:')
try:
- for desc in await AsyncQuery('/tor/server/all', 'server-descriptor 1.0').run():
+ for desc in await Query('/tor/server/all', 'server-descriptor 1.0').run():
print(desc.fingerprint)
except Exception as exc:
print('Unable to retrieve the server descriptors: %s' % exc)
@@ -257,7 +256,7 @@ class AsyncQuery(object):
print('Current relays:')
- async for desc in AsyncQuery('/tor/server/all', 'server-descriptor 1.0'):
+ async for desc in Query('/tor/server/all', 'server-descriptor 1.0'):
print(desc.fingerprint)
In either case exceptions are available via our 'error' attribute.
@@ -290,6 +289,39 @@ class AsyncQuery(object):
For legacy reasons if our resource has a '.z' suffix then our **compression**
argument is overwritten with Compression.GZIP.
+ .. versionchanged:: 1.7.0
+ Added support for downloading from ORPorts.
+
+ .. versionchanged:: 1.7.0
+ Added the compression argument.
+
+ .. versionchanged:: 1.7.0
+ Added the reply_headers attribute.
+
+ The class this provides changed between Python versions. In python2
+ this was called httplib.HTTPMessage, whereas in python3 the class was
+ renamed to http.client.HTTPMessage.
+
+ .. versionchanged:: 1.7.0
+ Avoid downloading from tor26. This directory authority throttles its
+ DirPort to such an extent that requests either time out or take on the
+ order of minutes.
+
+ .. versionchanged:: 1.7.0
+ Avoid downloading from Bifroest. This is the bridge authority so it
+ doesn't vote in the consensus, and apparently times out frequently.
+
+ .. versionchanged:: 1.8.0
+ Serge has replaced Bifroest as our bridge authority. Avoiding descriptor
+ downloads from it instead.
+
+ .. versionchanged:: 1.8.0
+ Defaulting to gzip compression rather than plaintext downloads.
+
+ .. versionchanged:: 1.8.0
+ Using :class:`~stem.descriptor.__init__.Compression` for our compression
+ argument.
+
:var str resource: resource being fetched, such as '/tor/server/all'
:var str descriptor_type: type of descriptors being fetched (for options see
:func:`~stem.descriptor.__init__.parse_file`), this is guessed from the
@@ -327,9 +359,15 @@ class AsyncQuery(object):
:var float timeout: duration before we'll time out our request
:var str download_url: last url used to download the descriptor, this is
unset until we've actually made a download attempt
+
+ :param start: start making the request when constructed (default is **True**)
+ :param block: only return after the request has been completed, this is
+ the same as running **query.run(True)** (default is **False**)
"""
- def __init__(self, resource: str, descriptor_type: Optional[str] = None, endpoints: Optional[Sequence[stem.Endpoint]] = None, compression: Union[stem.descriptor._Compression, Sequence[stem.descriptor._Compression]] = (Compression.GZIP,), retries: int = 2, fall_back_to_authority: bool = False, timeout: Optional[float] = None, validate: bool = False, document_handler: stem.descriptor.DocumentHandler = stem.descriptor.DocumentHandler.ENTRIES, **kwargs: Any) -> None:
+ def __init__(self, resource: str, descriptor_type: Optional[str] = None, endpoints: Optional[Sequence[stem.Endpoint]] = None, compression: Union[stem.descriptor._Compression, Sequence[stem.descriptor._Compression]] = (Compression.GZIP,), retries: int = 2, fall_back_to_authority: bool = False, timeout: Optional[float] = None, start: bool = True, block: bool = False, validate: bool = False, document_handler: stem.descriptor.DocumentHandler = stem.descriptor.DocumentHandler.ENTRIES, **kwargs: Any) -> None:
+ super(Query, self).__init__()
+
if not resource.startswith('/'):
raise ValueError("Resources should start with a '/': %s" % resource)
@@ -388,6 +426,12 @@ class AsyncQuery(object):
self._downloader_task = None # type: Optional[asyncio.Task]
self._downloader_lock = threading.RLock()
+ if start:
+ self.start()
+
+ if block:
+ self.run(True)
+
async def start(self) -> None:
"""
Starts downloading the scriptors if we haven't started already.
@@ -398,12 +442,14 @@ class AsyncQuery(object):
loop = asyncio.get_running_loop()
self._downloader_task = loop.create_task(self._download_descriptors(self.retries, self.timeout))
- async def run(self, suppress: bool = False) -> List['stem.descriptor.Descriptor']:
+ async def run(self, suppress: bool = False, close: bool = True) -> List['stem.descriptor.Descriptor']:
"""
Blocks until our request is complete then provides the descriptors. If we
haven't yet started our request then this does so.
:param suppress: avoids raising exceptions if **True**
+ :param close: terminates the resources backing this query if **True**,
+ further method calls will raise a RuntimeError
:returns: list for the requested :class:`~stem.descriptor.__init__.Descriptor` instances
@@ -416,7 +462,15 @@ class AsyncQuery(object):
* :class:`~stem.DownloadFailed` if our request fails
"""
- return [desc async for desc in self._run(suppress)]
+ # TODO: We should replace our 'close' argument with a new API design prior
+ # to release. Self-destructing this object by default for synchronous users
+ # is quite a step backward, but is acceptable as we iterate on this.
+
+ try:
+ return [desc async for desc in self._run(suppress)]
+ finally:
+ if close:
+ self._loop.call_soon_threadsafe(self._loop.stop)
async def _run(self, suppress: bool) -> AsyncIterator[stem.descriptor.Descriptor]:
with self._downloader_lock:
@@ -544,271 +598,6 @@ class AsyncQuery(object):
raise ValueError("BUG: endpoints can only be ORPorts or DirPorts, '%s' was a %s" % (endpoint, type(endpoint).__name__))
-class Query(stem.util.AsyncClassWrapper):
- """
- Asynchronous request for descriptor content from a directory authority or
- mirror. These can either be made through the
- :class:`~stem.descriptor.remote.DescriptorDownloader` or directly for more
- advanced usage.
-
- To block on the response and get results either call
- :func:`~stem.descriptor.remote.Query.run` or iterate over the Query. The
- :func:`~stem.descriptor.remote.Query.run` method pass along any errors that
- arise...
-
- ::
-
- from stem.descriptor.remote import Query
-
- print('Current relays:')
-
- try:
- for desc in Query('/tor/server/all', 'server-descriptor 1.0').run():
- print(desc.fingerprint)
- except Exception as exc:
- print('Unable to retrieve the server descriptors: %s' % exc)
-
- ... while iterating fails silently...
-
- ::
-
- print('Current relays:')
-
- for desc in Query('/tor/server/all', 'server-descriptor 1.0'):
- print(desc.fingerprint)
-
- In either case exceptions are available via our 'error' attribute.
-
- Tor provides quite a few different descriptor resources via its directory
- protocol (see section 4.2 and later of the `dir-spec
- <https://gitweb.torproject.org/torspec.git/tree/dir-spec.txt>`_).
- Commonly useful ones include...
-
- =============================================== ===========
- Resource Description
- =============================================== ===========
- /tor/server/all all present server descriptors
- /tor/server/fp/<fp1>+<fp2>+<fp3> server descriptors with the given fingerprints
- /tor/extra/all all present extrainfo descriptors
- /tor/extra/fp/<fp1>+<fp2>+<fp3> extrainfo descriptors with the given fingerprints
- /tor/micro/d/<hash1>-<hash2> microdescriptors with the given hashes
- /tor/status-vote/current/consensus present consensus
- /tor/status-vote/current/consensus-microdesc present microdescriptor consensus
- /tor/status-vote/next/bandwidth bandwidth authority heuristics for the next consenus
- /tor/status-vote/next/consensus-signatures detached signature, used for making the next consenus
- /tor/keys/all key certificates for the authorities
- /tor/keys/fp/<v3ident1>+<v3ident2> key certificates for specific authorities
- =============================================== ===========
-
- **ZSTD** compression requires `zstandard
- <https://pypi.org/project/zstandard/>`_, and **LZMA** requires the `lzma
- module <https://docs.python.org/3/library/lzma.html>`_.
-
- For legacy reasons if our resource has a '.z' suffix then our **compression**
- argument is overwritten with Compression.GZIP.
-
- .. versionchanged:: 1.7.0
- Added support for downloading from ORPorts.
-
- .. versionchanged:: 1.7.0
- Added the compression argument.
-
- .. versionchanged:: 1.7.0
- Added the reply_headers attribute.
-
- The class this provides changed between Python versions. In python2
- this was called httplib.HTTPMessage, whereas in python3 the class was
- renamed to http.client.HTTPMessage.
-
- .. versionchanged:: 1.7.0
- Avoid downloading from tor26. This directory authority throttles its
- DirPort to such an extent that requests either time out or take on the
- order of minutes.
-
- .. versionchanged:: 1.7.0
- Avoid downloading from Bifroest. This is the bridge authority so it
- doesn't vote in the consensus, and apparently times out frequently.
-
- .. versionchanged:: 1.8.0
- Serge has replaced Bifroest as our bridge authority. Avoiding descriptor
- downloads from it instead.
-
- .. versionchanged:: 1.8.0
- Defaulting to gzip compression rather than plaintext downloads.
-
- .. versionchanged:: 1.8.0
- Using :class:`~stem.descriptor.__init__.Compression` for our compression
- argument.
-
- :var str resource: resource being fetched, such as '/tor/server/all'
- :var str descriptor_type: type of descriptors being fetched (for options see
- :func:`~stem.descriptor.__init__.parse_file`), this is guessed from the
- resource if **None**
-
- :var list endpoints: :class:`~stem.DirPort` or :class:`~stem.ORPort` of the
- authority or mirror we're querying, this uses authorities if undefined
- :var list compression: list of :data:`stem.descriptor.Compression`
- we're willing to accept, when none are mutually supported downloads fall
- back to Compression.PLAINTEXT
- :var int retries: number of times to attempt the request if downloading it
- fails
- :var bool fall_back_to_authority: when retrying request issues the last
- request to a directory authority if **True**
-
- :var str content: downloaded descriptor content
- :var Exception error: exception if a problem occured
- :var bool is_done: flag that indicates if our request has finished
-
- :var float start_time: unix timestamp when we first started running
- :var http.client.HTTPMessage reply_headers: headers provided in the response,
- **None** if we haven't yet made our request
- :var float runtime: time our query took, this is **None** if it's not yet
- finished
-
- :var bool validate: checks the validity of the descriptor's content if
- **True**, skips these checks otherwise
- :var stem.descriptor.__init__.DocumentHandler document_handler: method in
- which to parse a :class:`~stem.descriptor.networkstatus.NetworkStatusDocument`
- :var dict kwargs: additional arguments for the descriptor constructor
-
- Following are only applicable when downloading from a
- :class:`~stem.DirPort`...
-
- :var float timeout: duration before we'll time out our request
- :var str download_url: last url used to download the descriptor, this is
- unset until we've actually made a download attempt
-
- :param start: start making the request when constructed (default is **True**)
- :param block: only return after the request has been completed, this is
- the same as running **query.run(True)** (default is **False**)
- """
-
- def __init__(self, resource: str, descriptor_type: Optional[str] = None, endpoints: Optional[Sequence[stem.Endpoint]] = None, compression: Union[stem.descriptor._Compression, Sequence[stem.descriptor._Compression]] = (Compression.GZIP,), retries: int = 2, fall_back_to_authority: bool = False, timeout: Optional[float] = None, start: bool = True, block: bool = False, validate: bool = False, document_handler: stem.descriptor.DocumentHandler = stem.descriptor.DocumentHandler.ENTRIES, **kwargs: Any) -> None:
- self._loop = asyncio.new_event_loop()
- self._loop_thread = threading.Thread(target = self._loop.run_forever, name = 'query asyncio')
- self._loop_thread.setDaemon(True)
- self._loop_thread.start()
-
- self._wrapped_instance: AsyncQuery = self._init_async_class( # type: ignore
- AsyncQuery,
- resource,
- descriptor_type,
- endpoints,
- compression,
- retries,
- fall_back_to_authority,
- timeout,
- validate,
- document_handler,
- **kwargs,
- )
-
- if start:
- self.start()
-
- if block:
- self.run(True)
-
- def start(self) -> None:
- """
- Starts downloading the scriptors if we haven't started already.
- """
-
- self._execute_async_method('start')
-
- def run(self, suppress = False) -> List['stem.descriptor.Descriptor']:
- """
- Blocks until our request is complete then provides the descriptors. If we
- haven't yet started our request then this does so.
-
- :param suppress: avoids raising exceptions if **True**
-
- :returns: list for the requested :class:`~stem.descriptor.__init__.Descriptor` instances
-
- :raises:
- Using the iterator can fail with the following if **suppress** is
- **False**...
-
- * **ValueError** if the descriptor contents is malformed
- * :class:`~stem.DownloadTimeout` if our request timed out
- * :class:`~stem.DownloadFailed` if our request fails
- """
-
- return self._execute_async_method('run', suppress)
-
- def __iter__(self) -> Iterator[stem.descriptor.Descriptor]:
- for desc in self._execute_async_generator_method('__aiter__'):
- yield desc
-
- @property
- def descriptor_type(self) -> str:
- return self._wrapped_instance.descriptor_type
-
- @property
- def endpoints(self) -> List[Union[stem.ORPort, stem.DirPort]]:
- return self._wrapped_instance.endpoints
-
- @property
- def resource(self) -> str:
- return self._wrapped_instance.resource
-
- @property
- def compression(self) -> List[stem.descriptor._Compression]:
- return self._wrapped_instance.compression
-
- @property
- def retries(self) -> int:
- return self._wrapped_instance.retries
-
- @property
- def fall_back_to_authority(self) -> bool:
- return self._wrapped_instance.fall_back_to_authority
-
- @property
- def content(self) -> Optional[bytes]:
- return self._wrapped_instance.content
-
- @property
- def error(self) -> Optional[BaseException]:
- return self._wrapped_instance.error
-
- @property
- def is_done(self) -> bool:
- return self._wrapped_instance.is_done
-
- @property
- def download_url(self) -> Optional[str]:
- return self._wrapped_instance.download_url
-
- @property
- def start_time(self) -> Optional[float]:
- return self._wrapped_instance.start_time
-
- @property
- def timeout(self) -> Optional[float]:
- return self._wrapped_instance.timeout
-
- @property
- def runtime(self) -> Optional[float]:
- return self._wrapped_instance.runtime
-
- @property
- def validate(self) -> bool:
- return self._wrapped_instance.validate
-
- @property
- def document_handler(self) -> stem.descriptor.DocumentHandler:
- return self._wrapped_instance.document_handler
-
- @property
- def reply_headers(self) -> Optional[Dict[str, str]]:
- return self._wrapped_instance.reply_headers
-
- @property
- def kwargs(self) -> Dict[str, Any]:
- return self._wrapped_instance.kwargs
-
-
class DescriptorDownloader(object):
"""
Configurable class that issues :class:`~stem.descriptor.remote.Query`
@@ -848,7 +637,7 @@ class DescriptorDownloader(object):
directories = [auth for auth in stem.directory.Authority.from_cache().values() if auth.nickname not in DIR_PORT_BLACKLIST]
new_endpoints = set([stem.DirPort(directory.address, directory.dir_port) for directory in directories])
- consensus = list(self.get_consensus(document_handler = stem.descriptor.DocumentHandler.DOCUMENT).run())[0]
+ consensus = list(self.get_consensus(document_handler = stem.descriptor.DocumentHandler.DOCUMENT).run())[0] # type: ignore
for desc in consensus.routers.values():
if stem.Flag.V2DIR in desc.flags and desc.dir_port:
@@ -858,7 +647,7 @@ class DescriptorDownloader(object):
self._endpoints = list(new_endpoints)
- return consensus # type: ignore
+ return consensus
def their_server_descriptor(self, **query_args: Any) -> 'stem.descriptor.remote.Query':
"""
@@ -1013,7 +802,7 @@ class DescriptorDownloader(object):
# authority key certificates
if consensus_query.validate and consensus_query.document_handler == stem.descriptor.DocumentHandler.DOCUMENT:
- consensus = list(consensus_query.run())[0]
+ consensus = list(consensus_query.run())[0] # type: ignore
key_certs = self.get_key_certificates(**query_args).run()
try:
diff --git a/stem/util/__init__.py b/stem/util/__init__.py
index 72239273..e8ef361e 100644
--- a/stem/util/__init__.py
+++ b/stem/util/__init__.py
@@ -12,7 +12,7 @@ import inspect
import threading
from concurrent.futures import Future
-from typing import Any, AsyncIterator, Iterator, Type, Union
+from typing import Any, AsyncIterator, Callable, Iterator, Type, Union
__all__ = [
'conf',
@@ -116,7 +116,7 @@ def _pubkey_bytes(key: Union['cryptography.hazmat.primitives.asymmetric.ed25519.
raise ValueError('Key must be a string or cryptographic public/private key (was %s)' % type(key).__name__)
-def _hash_attr(obj: Any, *attributes: str, **kwargs: Any):
+def _hash_attr(obj: Any, *attributes: str, **kwargs: Any) -> int:
"""
Provide a hash value for the given set of attributes.
@@ -124,6 +124,8 @@ def _hash_attr(obj: Any, *attributes: str, **kwargs: Any):
:param attributes: attribute names to take into account
:param cache: persists hash in a '_cached_hash' object attribute
:param parent: include parent's hash value
+
+ :returns: **int** object hash
"""
is_cached = kwargs.get('cache', False)
@@ -174,11 +176,11 @@ class Synchronous(object):
finished to clean up underlying resources.
"""
- def __init__(self):
+ def __init__(self) -> None:
self._loop = asyncio.new_event_loop()
self._loop_lock = threading.RLock()
self._loop_thread = threading.Thread(
- name = '%s asyncio' % self.__class__.__name__,
+ name = '%s asyncio' % type(self).__name__,
target = self._loop.run_forever,
daemon = True,
)
@@ -188,7 +190,7 @@ class Synchronous(object):
# overwrite asynchronous class methods with instance methods that can be
# called from either context
- def wrap(func, *args, **kwargs):
+ def wrap(func: Callable, *args: Any, **kwargs: Any) -> Any:
if Synchronous.is_asyncio_context():
return func(*args, **kwargs)
else:
@@ -204,7 +206,7 @@ class Synchronous(object):
if inspect.iscoroutinefunction(func):
setattr(self, method_name, functools.partial(wrap, func))
- def close(self):
+ def close(self) -> None:
"""
Terminate resources that permits this from being callable from synchronous
contexts. Once called any further synchronous invocations will fail with a
@@ -219,7 +221,7 @@ class Synchronous(object):
self._is_closed = True
@staticmethod
- def is_asyncio_context():
+ def is_asyncio_context() -> bool:
"""
Check if running within a synchronous or asynchronous context.
@@ -232,6 +234,18 @@ class Synchronous(object):
except RuntimeError:
return False
+ def __iter__(self) -> Iterator:
+ async def convert_async_generator(generator: AsyncIterator) -> Iterator:
+ return iter([d async for d in generator])
+
+ iter_func = getattr(self, '__aiter__')
+
+ if iter_func:
+ with self._loop_lock:
+ return asyncio.run_coroutine_threadsafe(convert_async_generator(iter_func()), self._loop).result()
+ else:
+ raise TypeError("'%s' object is not iterable" % type(self).__name__)
+
class AsyncClassWrapper:
_loop: asyncio.AbstractEventLoop
diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py
index 797bc8a3..1fd2aaf9 100644
--- a/test/unit/descriptor/remote.py
+++ b/test/unit/descriptor/remote.py
@@ -70,7 +70,7 @@ def mock_download(descriptor, encoding = 'identity', response_code_header = None
data = response_code_header + stem.util.str_tools._to_bytes(HEADER % encoding) + b'\r\n\r\n' + descriptor
- return patch('stem.descriptor.remote.AsyncQuery._download_from', Mock(side_effect = coro_func_returning_value(data)))
+ return patch('stem.descriptor.remote.Query._download_from', Mock(side_effect = coro_func_returning_value(data)))
class TestDescriptorDownloader(unittest.TestCase):
@@ -100,6 +100,8 @@ class TestDescriptorDownloader(unittest.TestCase):
self.assertEqual('9695DFC35FFEB861329B9F1AB04C46397020CE31', desc.fingerprint)
self.assertEqual(TEST_DESCRIPTOR, desc.get_bytes())
+ reply.close()
+
def test_response_header_code(self):
"""
When successful Tor provides a '200 OK' status, but we should accept other 2xx
@@ -133,7 +135,7 @@ class TestDescriptorDownloader(unittest.TestCase):
def test_reply_header_data(self):
query = stem.descriptor.remote.get_server_descriptors('9695DFC35FFEB861329B9F1AB04C46397020CE31', start = False)
self.assertEqual(None, query.reply_headers) # initially we don't have a reply
- query.run()
+ query.run(close = False)
self.assertEqual('Fri, 13 Apr 2018 16:35:50 GMT', query.reply_headers.get('Date'))
self.assertEqual('application/octet-stream', query.reply_headers.get('Content-Type'))
@@ -148,11 +150,13 @@ class TestDescriptorDownloader(unittest.TestCase):
descriptors = list(query)
self.assertEqual(1, len(descriptors))
self.assertEqual('moria1', descriptors[0].nickname)
+ query.close()
def test_gzip_url_override(self):
query = stem.descriptor.remote.Query(TEST_RESOURCE + '.z', compression = Compression.PLAINTEXT, start = False)
self.assertEqual([stem.descriptor.Compression.GZIP], query.compression)
self.assertEqual(TEST_RESOURCE, query.resource)
+ query.close()
@mock_download(read_resource('compressed_identity'), encoding = 'identity')
def test_compression_plaintext(self):
@@ -160,12 +164,15 @@ class TestDescriptorDownloader(unittest.TestCase):
Download a plaintext descriptor.
"""
- descriptors = list(stem.descriptor.remote.get_server_descriptors(
+ query = stem.descriptor.remote.get_server_descriptors(
'9695DFC35FFEB861329B9F1AB04C46397020CE31',
compression = Compression.PLAINTEXT,
validate = True,
skip_crypto_validation = not test.require.CRYPTOGRAPHY_AVAILABLE,
- ))
+ )
+
+ descriptors = list(query)
+ query.close()
self.assertEqual(1, len(descriptors))
self.assertEqual('moria1', descriptors[0].nickname)
@@ -176,12 +183,15 @@ class TestDescriptorDownloader(unittest.TestCase):
Download a gip compressed descriptor.
"""
- descriptors = list(stem.descriptor.remote.get_server_descriptors(
+ query = stem.descriptor.remote.get_server_descriptors(
'9695DFC35FFEB861329B9F1AB04C46397020CE31',
compression = Compression.GZIP,
validate = True,
skip_crypto_validation = not test.require.CRYPTOGRAPHY_AVAILABLE,
- ))
+ )
+
+ descriptors = list(query)
+ query.close()
self.assertEqual(1, len(descriptors))
self.assertEqual('moria1', descriptors[0].nickname)
@@ -195,11 +205,14 @@ class TestDescriptorDownloader(unittest.TestCase):
if not Compression.ZSTD.available:
self.skipTest('(requires zstd module)')
- descriptors = list(stem.descriptor.remote.get_server_descriptors(
+ query = stem.descriptor.remote.get_server_descriptors(
'9695DFC35FFEB861329B9F1AB04C46397020CE31',
compression = Compression.ZSTD,
validate = True,
- ))
+ )
+
+ descriptors = list(query)
+ query.close()
self.assertEqual(1, len(descriptors))
self.assertEqual('moria1', descriptors[0].nickname)
@@ -213,11 +226,14 @@ class TestDescriptorDownloader(unittest.TestCase):
if not Compression.LZMA.available:
self.skipTest('(requires lzma module)')
- descriptors = list(stem.descriptor.remote.get_server_descriptors(
+ query = stem.descriptor.remote.get_server_descriptors(
'9695DFC35FFEB861329B9F1AB04C46397020CE31',
compression = Compression.LZMA,
validate = True,
- ))
+ )
+
+ descriptors = list(query)
+ query.close()
self.assertEqual(1, len(descriptors))
self.assertEqual('moria1', descriptors[0].nickname)
@@ -228,16 +244,21 @@ class TestDescriptorDownloader(unittest.TestCase):
Surface level exercising of each getter method for downloading descriptors.
"""
+ queries = []
+
downloader = stem.descriptor.remote.get_instance()
- downloader.get_server_descriptors()
- downloader.get_extrainfo_descriptors()
- downloader.get_microdescriptors('test-hash')
- downloader.get_consensus()
- downloader.get_vote(stem.directory.Authority.from_cache()['moria1'])
- downloader.get_key_certificates()
- downloader.get_bandwidth_file()
- downloader.get_detached_signatures()
+ queries.append(downloader.get_server_descriptors())
+ queries.append(downloader.get_extrainfo_descriptors())
+ queries.append(downloader.get_microdescriptors('test-hash'))
+ queries.append(downloader.get_consensus())
+ queries.append(downloader.get_vote(stem.directory.Authority.from_cache()['moria1']))
+ queries.append(downloader.get_key_certificates())
+ queries.append(downloader.get_bandwidth_file())
+ queries.append(downloader.get_detached_signatures())
+
+ for query in queries:
+ query.close()
@mock_download(b'some malformed stuff')
def test_malformed_content(self):
@@ -264,6 +285,8 @@ class TestDescriptorDownloader(unittest.TestCase):
self.assertRaises(ValueError, query.run)
+ query.close()
+
def test_query_with_invalid_endpoints(self):
invalid_endpoints = {
'hello': "'h' is a str.",
@@ -292,3 +315,5 @@ class TestDescriptorDownloader(unittest.TestCase):
self.assertEqual(1, len(list(query)))
self.assertEqual(1, len(list(query)))
self.assertEqual(1, len(list(query)))
+
+ query.close()
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits