[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Cache parsed descriptors within Query
commit 6a7ebd9f12f89aa359d2df32cb47a44707a61008
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Sun Nov 1 15:15:47 2020 -0800
Cache parsed descriptors within Query
Our Query class cached the bytes we download rather than parsed descriptors.
This could be advantageous if a user downloads descriptors without caring
about the results (unlikely), but otherwise it's all downside...
* Slower: The Query class downloads asynchronously so we can parallelize.
By parsing when the results are requested we serialize that part of the
runtime.
* Memory: Caching bytes reduced the upfront memory usage, but multiplies
it upon retrieving the results because we create fresh Descriptor
objects upon each invocation.
* Duplication: Each invocation of our run method re-parsed the descriptors.
For larger documents like the consensus this duplicates a lot of work.
* Complexity: Caching bytes needlessly complicated the run method.
---
stem/descriptor/remote.py | 67 +++++++++++++++++++----------------------------
1 file changed, 27 insertions(+), 40 deletions(-)
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index e8367e8b..136b9d15 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -338,9 +338,8 @@ class Query(Synchronous):
:var bool fall_back_to_authority: when retrying request issues the last
request to a directory authority if **True**
- :var str content: downloaded descriptor content
+ :var list downloaded: downloaded descriptors, **None** if not yet retrieved
:var Exception error: exception if a problem occured
- :var bool is_done: flag that indicates if our request has finished
:var float start_time: unix timestamp when we first started running
:var dict reply_headers: headers provided in the response,
@@ -413,9 +412,8 @@ class Query(Synchronous):
self.retries = retries
self.fall_back_to_authority = fall_back_to_authority
- self.content = None # type: Optional[bytes]
+ self.downloaded = None # type: Optional[List[stem.descriptor.Descriptor]]
self.error = None # type: Optional[BaseException]
- self.is_done = False
self.download_url = None # type: Optional[str]
self.start_time = None # type: Optional[float]
@@ -470,8 +468,14 @@ class Query(Synchronous):
async def _run(self, suppress: bool) -> AsyncIterator[stem.descriptor.Descriptor]:
with self._downloader_lock:
- self.start()
- await self._downloader_task
+ if not self.downloaded and not self.error:
+ if not self._downloader_task:
+ self.start()
+
+ try:
+ self.downloaded = await self._downloader_task
+ except Exception as exc:
+ self.error = exc
if self.error:
if suppress:
@@ -479,30 +483,8 @@ class Query(Synchronous):
raise self.error
else:
- if self.content is None:
- if suppress:
- return
-
- raise ValueError('BUG: _download_descriptors() finished without either results or an error')
-
- try:
- results = stem.descriptor.parse_file(
- io.BytesIO(self.content),
- self.descriptor_type,
- validate = self.validate,
- document_handler = self.document_handler,
- **self.kwargs
- )
-
- for desc in results:
- yield desc
- except ValueError as exc:
- self.error = exc # encountered a parsing error
-
- if suppress:
- return
-
- raise self.error
+ for desc in self.downloaded:
+ yield desc
async def __aiter__(self) -> AsyncIterator[stem.descriptor.Descriptor]:
async for desc in self._run(True):
@@ -526,7 +508,7 @@ class Query(Synchronous):
else:
return random.choice(self.endpoints)
- async def _download_descriptors(self, retries: int, timeout: Optional[float]) -> None:
+ async def _download_descriptors(self, retries: int, timeout: Optional[float]) -> List['stem.descriptor.Descriptor']:
self.start_time = time.time()
retries = self.retries
@@ -545,17 +527,24 @@ class Query(Synchronous):
try:
response = await asyncio.wait_for(self._download_from(endpoint), time_remaining)
- self.content, self.reply_headers = _http_body_and_headers(response)
+ content, self.reply_headers = _http_body_and_headers(response)
- self.is_done = True
self.runtime = time.time() - self.start_time
log.trace('Descriptors retrieved from %s in %0.2fs' % (downloaded_from, self.runtime))
- return
+
+ try:
+ return list(stem.descriptor.parse_file(
+ io.BytesIO(content),
+ self.descriptor_type,
+ validate = self.validate,
+ document_handler = self.document_handler,
+ **self.kwargs
+ ))
+ except ValueError:
+ raise # parsing failed
except asyncio.TimeoutError as exc:
- self.is_done = True
- self.error = stem.DownloadTimeout(downloaded_from, exc, sys.exc_info()[2], self.timeout)
- return
+ raise stem.DownloadTimeout(downloaded_from, exc, sys.exc_info()[2], self.timeout)
except:
exception = sys.exc_info()[1]
retries -= 1
@@ -568,9 +557,7 @@ class Query(Synchronous):
else:
log.debug("Failed to download descriptors from '%s': %s" % (self.download_url, exception))
- self.is_done = True
- self.error = exception
- return
+ raise
async def _download_from(self, endpoint: stem.Endpoint) -> bytes:
http_request = '\r\n'.join((
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits