[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [stem/master] Cache parsed descriptors within Query



commit 6a7ebd9f12f89aa359d2df32cb47a44707a61008
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date:   Sun Nov 1 15:15:47 2020 -0800

    Cache parsed descriptors within Query
    
    Our Query class cached the bytes we download rather than parsed descriptors.
    This could be advantagous if a user downloads descriptors without caring
    about the results (unlikely), but otherwise it's all downside...
    
      * Slower: The Query class downloads asynchronously so we can parallelize.
        By parsing when the results are requested we serialize that part of the
        runtime.
    
      * Memory: Caching bytes reduced the upfront memory usage, but multiplies
        it upon retrieving the results because we create fresh Descriptor
        objects upon each invocation.
    
      * Duplication: Each invocation of our run method re-parsed the descriptors.
        For larger documents like the consensus this duplicates a lot of work.
    
      * Complexity: Caching bytes needlessly complicated the run method.
---
 stem/descriptor/remote.py | 67 +++++++++++++++++++----------------------------
 1 file changed, 27 insertions(+), 40 deletions(-)

diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index e8367e8b..136b9d15 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -338,9 +338,8 @@ class Query(Synchronous):
   :var bool fall_back_to_authority: when retrying request issues the last
     request to a directory authority if **True**
 
-  :var str content: downloaded descriptor content
+  :var list downloaded: downloaded descriptors, **None** if not yet retrieved
   :var Exception error: exception if a problem occured
-  :var bool is_done: flag that indicates if our request has finished
 
   :var float start_time: unix timestamp when we first started running
   :var dict reply_headers: headers provided in the response,
@@ -413,9 +412,8 @@ class Query(Synchronous):
     self.retries = retries
     self.fall_back_to_authority = fall_back_to_authority
 
-    self.content = None  # type: Optional[bytes]
+    self.downloaded = None  # type: Optional[List[stem.descriptor.Descriptor]]
     self.error = None  # type: Optional[BaseException]
-    self.is_done = False
     self.download_url = None  # type: Optional[str]
 
     self.start_time = None  # type: Optional[float]
@@ -470,8 +468,14 @@ class Query(Synchronous):
 
   async def _run(self, suppress: bool) -> AsyncIterator[stem.descriptor.Descriptor]:
     with self._downloader_lock:
-      self.start()
-      await self._downloader_task
+      if not self.downloaded and not self.error:
+        if not self._downloader_task:
+          self.start()
+
+        try:
+          self.downloaded = await self._downloader_task
+        except Exception as exc:
+          self.error = exc
 
       if self.error:
         if suppress:
@@ -479,30 +483,8 @@ class Query(Synchronous):
 
         raise self.error
       else:
-        if self.content is None:
-          if suppress:
-            return
-
-          raise ValueError('BUG: _download_descriptors() finished without either results or an error')
-
-        try:
-          results = stem.descriptor.parse_file(
-            io.BytesIO(self.content),
-            self.descriptor_type,
-            validate = self.validate,
-            document_handler = self.document_handler,
-            **self.kwargs
-          )
-
-          for desc in results:
-            yield desc
-        except ValueError as exc:
-          self.error = exc  # encountered a parsing error
-
-          if suppress:
-            return
-
-          raise self.error
+        for desc in self.downloaded:
+          yield desc
 
   async def __aiter__(self) -> AsyncIterator[stem.descriptor.Descriptor]:
     async for desc in self._run(True):
@@ -526,7 +508,7 @@ class Query(Synchronous):
     else:
       return random.choice(self.endpoints)
 
-  async def _download_descriptors(self, retries: int, timeout: Optional[float]) -> None:
+  async def _download_descriptors(self, retries: int, timeout: Optional[float]) -> List['stem.descriptor.Descriptor']:
     self.start_time = time.time()
 
     retries = self.retries
@@ -545,17 +527,24 @@ class Query(Synchronous):
 
       try:
         response = await asyncio.wait_for(self._download_from(endpoint), time_remaining)
-        self.content, self.reply_headers = _http_body_and_headers(response)
+        content, self.reply_headers = _http_body_and_headers(response)
 
-        self.is_done = True
         self.runtime = time.time() - self.start_time
 
         log.trace('Descriptors retrieved from %s in %0.2fs' % (downloaded_from, self.runtime))
-        return
+
+        try:
+          return list(stem.descriptor.parse_file(
+            io.BytesIO(content),
+            self.descriptor_type,
+            validate = self.validate,
+            document_handler = self.document_handler,
+            **self.kwargs
+          ))
+        except ValueError:
+          raise  # parsing failed
       except asyncio.TimeoutError as exc:
-        self.is_done = True
-        self.error = stem.DownloadTimeout(downloaded_from, exc, sys.exc_info()[2], self.timeout)
-        return
+        raise stem.DownloadTimeout(downloaded_from, exc, sys.exc_info()[2], self.timeout)
       except:
         exception = sys.exc_info()[1]
         retries -= 1
@@ -568,9 +557,7 @@ class Query(Synchronous):
         else:
           log.debug("Failed to download descriptors from '%s': %s" % (self.download_url, exception))
 
-          self.is_done = True
-          self.error = exception
-          return
+          raise
 
   async def _download_from(self, endpoint: stem.Endpoint) -> bytes:
     http_request = '\r\n'.join((



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits