[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [stem/master] All client usage failed with ValueErrors



commit 5d29a263130c47ec399dcdb5ef9994792a854cff
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date:   Wed Jul 22 18:11:41 2020 -0700

    All client usage failed with ValueErrors
    
    When our Relay class sends a message it first drains its socket of unread
    data...
    
      async def _msg(self, cell):
        await self._orport.recv(timeout = 0)
        await self._orport.send(cell.pack(self.link_protocol))
        response = await self._orport.recv(timeout = 1)
        yield stem.client.cell.Cell.pop(response, self.link_protocol)[0]
    
    This in turn called asyncio.wait_for() with a timeout value of zero which
    returns immediately, leaving our socket undrained.
    
    Our following recv() is then polluted with unexpected data. For instance,
    this is caused anything that uses create_circuit() (such as descriptor
    downloads) to fail with confusing exceptions such as...
    
      ======================================================================
      ERROR: test_downloading_via_orport
      ----------------------------------------------------------------------
      Traceback (most recent call last):
        File "/home/atagar/Desktop/stem/test/require.py", line 60, in wrapped
          return func(self, *args, **kwargs)
        File "/home/atagar/Desktop/stem/test/require.py", line 75, in wrapped
          return func(self, *args, **kwargs)
        File "/home/atagar/Desktop/stem/test/require.py", line 75, in wrapped
          return func(self, *args, **kwargs)
        File "/home/atagar/Desktop/stem/test/integ/descriptor/remote.py", line 27, in test_downloading_via_orport
          fall_back_to_authority = False,
        File "/home/atagar/Desktop/stem/stem/util/__init__.py", line 363, in _run_async_method
          return future.result()
        File "/home/atagar/Python-3.7.0/Lib/concurrent/futures/_base.py", line 432, in result
          return self.__get_result()
        File "/home/atagar/Python-3.7.0/Lib/concurrent/futures/_base.py", line 384, in __get_result
          raise self._exception
        File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 469, in run
          return [desc async for desc in self._run(suppress)]
        File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 469, in <listcomp>
          return [desc async for desc in self._run(suppress)]
        File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 482, in _run
          raise self.error
        File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 549, in _download_descriptors
          response = await asyncio.wait_for(self._download_from(endpoint), time_remaining)
        File "/home/atagar/Python-3.7.0/Lib/asyncio/tasks.py", line 384, in wait_for
          return await fut
        File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 588, in _download_from
          async with await relay.create_circuit() as circ:
        File "/home/atagar/Desktop/stem/stem/client/__init__.py", line 270, in create_circuit
          async for cell in self._msg(create_fast_cell):
        File "/home/atagar/Desktop/stem/stem/client/__init__.py", line 226, in _msg
          yield stem.client.cell.Cell.pop(response, self.link_protocol)[0]
        File "/home/atagar/Desktop/stem/stem/client/cell.py", line 182, in pop
          cls = Cell.by_value(command)
        File "/home/atagar/Desktop/stem/stem/client/cell.py", line 139, in by_value
          raise ValueError("'%s' isn't a valid cell value" % value)
      ValueError: '65' isn't a valid cell value
    
    This also reverts our Relay's '_orport_lock' back to a threaded RLock because
    asyncio locks are not reentrant, causing methods such as directory() (which
    call _send()) to deadlock upon themselves. We might drop this lock entirely in
    the future (thread safety should be moot now that the stem.client module is
    fully asynchronous).
---
 stem/client/__init__.py        | 18 +++++++++---------
 stem/socket.py                 |  7 +++----
 test/unit/descriptor/remote.py |  2 +-
 3 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/stem/client/__init__.py b/stem/client/__init__.py
index 8c8da923..877d60e2 100644
--- a/stem/client/__init__.py
+++ b/stem/client/__init__.py
@@ -25,8 +25,8 @@ a wrapper for :class:`~stem.socket.RelaySocket`, much the same way as
     +- close - closes this circuit
 """
 
-import asyncio
 import hashlib
+import threading
 
 import stem
 import stem.client.cell
@@ -71,7 +71,7 @@ class Relay(object):
     self.link_protocol = LinkProtocol(link_protocol)
     self._orport = orport
     self._orport_buffer = b''  # unread bytes
-    self._orport_lock = asyncio.Lock()
+    self._orport_lock = threading.RLock()
     self._circuits = {}  # type: Dict[int, stem.client.Circuit]
 
   @staticmethod
@@ -162,7 +162,7 @@ class Relay(object):
     :returns: next :class:`~stem.client.cell.Cell`
     """
 
-    async with self._orport_lock:
+    with self._orport_lock:
       # cells begin with [circ_id][cell_type][...]
 
       circ_id_size = self.link_protocol.circ_id_size.size
@@ -253,7 +253,7 @@ class Relay(object):
     :func:`~stem.socket.BaseSocket.close` method.
     """
 
-    async with self._orport_lock:
+    with self._orport_lock:
       return await self._orport.close()
 
   async def create_circuit(self) -> 'stem.client.Circuit':
@@ -261,7 +261,7 @@ class Relay(object):
     Establishes a new circuit.
     """
 
-    async with self._orport_lock:
+    with self._orport_lock:
       circ_id = max(self._circuits) + 1 if self._circuits else self.link_protocol.first_circ_id
 
       create_fast_cell = stem.client.cell.CreateFastCell(circ_id)
@@ -286,7 +286,7 @@ class Relay(object):
       return circ
 
   async def __aiter__(self) -> AsyncIterator['stem.client.Circuit']:
-    async with self._orport_lock:
+    with self._orport_lock:
       for circ in self._circuits.values():
         yield circ
 
@@ -338,7 +338,7 @@ class Circuit(object):
     :returns: **str** with the requested descriptor data
     """
 
-    async with self.relay._orport_lock:
+    with self.relay._orport_lock:
       await self._send(RelayCommand.BEGIN_DIR, stream_id = stream_id)
       await self._send(RelayCommand.DATA, request, stream_id = stream_id)
 
@@ -372,7 +372,7 @@ class Circuit(object):
     :param stream_id: specific stream this concerns
     """
 
-    async with self.relay._orport_lock:
+    with self.relay._orport_lock:
       # Encrypt and send the cell. Our digest/key only updates if the cell is
       # successfully sent.
 
@@ -384,7 +384,7 @@ class Circuit(object):
       self.forward_key = forward_key
 
   async def close(self) -> None:
-    async with self.relay._orport_lock:
+    with self.relay._orport_lock:
       await self.relay._orport.send(stem.client.cell.DestroyCell(self.id).pack(self.relay.link_protocol))
       del self.relay._circuits[self.id]
 
diff --git a/stem/socket.py b/stem/socket.py
index a19a8f25..af52fffa 100644
--- a/stem/socket.py
+++ b/stem/socket.py
@@ -362,13 +362,12 @@ class RelaySocket(BaseSocket):
     """
 
     async def wrapped_recv(reader: asyncio.StreamReader) -> Optional[bytes]:
-      read_coroutine = reader.read(1024)
       if timeout is None:
-        return await read_coroutine
+        return await reader.read(1024)
       else:
         try:
-          return await asyncio.wait_for(read_coroutine, timeout)
-        except (asyncio.TimeoutError, ssl.SSLError, ssl.SSLWantReadError):
+          return await asyncio.wait_for(reader.read(1024), max(timeout, 0.0001))
+        except asyncio.TimeoutError:
           return None
 
     return await self._recv(wrapped_recv)
diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py
index 6b9ca3ad..7447fd16 100644
--- a/test/unit/descriptor/remote.py
+++ b/test/unit/descriptor/remote.py
@@ -98,7 +98,7 @@ class TestDescriptorDownloader(unittest.TestCase):
     self.assertEqual('moria1', desc.nickname)
     self.assertEqual('128.31.0.34', desc.address)
     self.assertEqual('9695DFC35FFEB861329B9F1AB04C46397020CE31', desc.fingerprint)
-    self.assertEqual(TEST_DESCRIPTOR, desc.get_bytes())
+    self.assertEqual(TEST_DESCRIPTOR.rstrip(), desc.get_bytes())
 
     reply.stop()
 

_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits