[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Expanded descriptor compression support
commit 72700087b94f2889b5b364738a1178c324862ba5
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Wed Mar 28 11:57:10 2018 -0700
Expanded descriptor compression support
We supported plaintext and gzip when downloading descriptors, but recently tor
added lzma and zstd support as well...
https://gitweb.torproject.org/torspec.git/commit/?id=1cb56af
In practice I'm having difficulty finding an example of this working in the
live tor network...
https://trac.torproject.org/projects/tor/ticket/25667
That said, gonna go ahead and push this since even setting lzma/zstd aside it's
a nice step forward since it allows callers to specify compression headers.
---
docs/change_log.rst | 1 +
stem/descriptor/remote.py | 159 ++++++++++++++++++++++++++++++----------
stem/version.py | 2 +
test/integ/descriptor/remote.py | 20 +++++
test/unit/descriptor/remote.py | 50 ++++++++++---
5 files changed, 182 insertions(+), 50 deletions(-)
diff --git a/docs/change_log.rst b/docs/change_log.rst
index bb42c982..a67aae2d 100644
--- a/docs/change_log.rst
+++ b/docs/change_log.rst
@@ -55,6 +55,7 @@ The following are only available within Stem's `git repository
* **Descriptors**
* `Fallback directory v2 support <https://lists.torproject.org/pipermail/tor-dev/2017-December/012721.html>`_, which adds *nickname* and *extrainfo*
+ * Added zstd and lzma compression support (:spec:`1cb56af`)
* Reduced maximum descriptors fetched by the remote module to match tor's new limit (:trac:`24743`)
* Consensus **shared_randomness_*_reveal_count** attributes undocumented, and unavailable if retrieved before their corresponding shared_randomness_*_value attribute (:trac:`25046`)
* Allow 'proto' line to have blank values (:spec:`a8455f4`)
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index 9181dbcf..ab61d3a7 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -80,6 +80,21 @@ content. For example...
Maximum number of microdescriptors that can be requested at a time by their
hashes.
+
+.. data:: Compression (enum)
+
+ Compression when downloading descriptors.
+
+ .. versionadded:: 1.7.0
+
+ =============== ===========
+ Compression Description
+ =============== ===========
+ **PLAINTEXT** Uncompressed data.
+ **GZIP** `GZip compression <https://www.gnu.org/software/gzip/>`_.
+ **ZSTD** `Zstandard compression <https://www.zstd.net>`_.
+ **LZMA** `LZMA compression <https://en.wikipedia.org/wiki/LZMA>`_.
+ =============== ===========
"""
import io
@@ -91,6 +106,13 @@ import threading
import time
import zlib
+import stem.descriptor
+import stem.prereq
+import stem.util.enum
+
+from stem import Flag
+from stem.util import _hash_attr, connection, log, str_tools, tor_tools
+
try:
# added in python 2.7
from collections import OrderedDict
@@ -103,11 +125,24 @@ try:
except ImportError:
import urllib2 as urllib
-import stem.descriptor
-import stem.prereq
+try:
+ # added in python 3.3
+ import lzma
+ LZMA_SUPPORTED = True
+except ImportError:
+ LZMA_SUPPORTED = False
-from stem import Flag
-from stem.util import _hash_attr, connection, log, str_tools, tor_tools
+ZSTD_SUPPORTED = False
+
+Compression = stem.util.enum.Enum(
+ ('PLAINTEXT', 'identity'),
+ ('GZIP', 'gzip'), # can also be 'deflate'
+ ('ZSTD', 'x-zstd'),
+ ('LZMA', 'x-tor-lzma'),
+)
+
+ZSTD_UNAVAILABLE_MSG = 'ZSTD is not yet supported'
+LZMA_UNAVAILABLE_MSG = 'LZMA compression was requested but requires the lzma module, which was added in python 3.3'
# Tor has a limited number of descriptors we can fetch explicitly by their
# fingerprint or hashes due to a limit on the url length by squid proxies.
@@ -224,7 +259,7 @@ class Query(object):
from stem.descriptor.remote import Query
query = Query(
- '/tor/server/all.z',
+ '/tor/server/all',
block = True,
timeout = 30,
)
@@ -243,7 +278,7 @@ class Query(object):
print('Current relays:')
- for desc in Query('/tor/server/all.z', 'server-descriptor 1.0'):
+ for desc in Query('/tor/server/all', 'server-descriptor 1.0'):
print(desc.fingerprint)
In either case exceptions are available via our 'error' attribute.
@@ -256,28 +291,37 @@ class Query(object):
=============================================== ===========
Resource Description
=============================================== ===========
- /tor/server/all.z all present server descriptors
- /tor/server/fp/<fp1>+<fp2>+<fp3>.z server descriptors with the given fingerprints
- /tor/extra/all.z all present extrainfo descriptors
- /tor/extra/fp/<fp1>+<fp2>+<fp3>.z extrainfo descriptors with the given fingerprints
- /tor/micro/d/<hash1>-<hash2>.z microdescriptors with the given hashes
- /tor/status-vote/current/consensus.z present consensus
- /tor/status-vote/current/consensus-microdesc.z present microdescriptor consensus
- /tor/keys/all.z key certificates for the authorities
- /tor/keys/fp/<v3ident1>+<v3ident2>.z key certificates for specific authorities
+ /tor/server/all all present server descriptors
+ /tor/server/fp/<fp1>+<fp2>+<fp3> server descriptors with the given fingerprints
+ /tor/extra/all all present extrainfo descriptors
+ /tor/extra/fp/<fp1>+<fp2>+<fp3> extrainfo descriptors with the given fingerprints
+ /tor/micro/d/<hash1>-<hash2> microdescriptors with the given hashes
+ /tor/status-vote/current/consensus present consensus
+ /tor/status-vote/current/consensus-microdesc present microdescriptor consensus
+ /tor/keys/all key certificates for the authorities
+ /tor/keys/fp/<v3ident1>+<v3ident2> key certificates for specific authorities
=============================================== ===========
- The '.z' suffix can be excluded to get a plaintext rather than compressed
- response. Compression is handled transparently, so this shouldn't matter to
- the caller.
+ **LZMA** compression requires the `lzma module
+ <https://docs.python.org/3/library/lzma.html>`_ which was added in Python
+ 3.3.
+
+ For legacy reasons if our resource has a '.z' suffix then our **compression**
+ argument is overwritten with Compression.GZIP.
+
+ .. versionchanged:: 1.7.0
+ Added the compression argument.
- :var str resource: resource being fetched, such as '/tor/server/all.z'
+ :var str resource: resource being fetched, such as '/tor/server/all'
:var str descriptor_type: type of descriptors being fetched (for options see
:func:`~stem.descriptor.__init__.parse_file`), this is guessed from the
resource if **None**
:var list endpoints: (address, dirport) tuples of the authority or mirror
we're querying, this uses authorities if undefined
+ :var list compression: list of :data:`stem.descriptor.remote.Compression`
+ we're willing to accept; when none are mutually supported, downloads fall
+ back to Compression.PLAINTEXT
:var int retries: number of times to attempt the request if downloading it
fails
:var bool fall_back_to_authority: when retrying request issues the last
@@ -305,17 +349,37 @@ class Query(object):
the same as running **query.run(True)** (default is **False**)
"""
- def __init__(self, resource, descriptor_type = None, endpoints = None, retries = 2, fall_back_to_authority = False, timeout = None, start = True, block = False, validate = False, document_handler = stem.descriptor.DocumentHandler.ENTRIES, **kwargs):
+ def __init__(self, resource, descriptor_type = None, endpoints = None, compression = None, retries = 2, fall_back_to_authority = False, timeout = None, start = True, block = False, validate = False, document_handler = stem.descriptor.DocumentHandler.ENTRIES, **kwargs):
if not resource.startswith('/'):
raise ValueError("Resources should start with a '/': %s" % resource)
- self.resource = resource
+ if resource.endswith('.z'):
+ compression = [Compression.GZIP]
+ resource = resource[:-2]
+ elif compression is None:
+ compression = [Compression.PLAINTEXT]
+ else:
+ if isinstance(compression, str):
+ compression = [compression] # caller provided only a single option
+
+ if Compression.LZMA in compression and not LZMA_SUPPORTED:
+ log.log_once('stem.descriptor.remote.lzma_unavailable', log.INFO, LZMA_UNAVAILABLE_MSG)
+ compression.remove(Compression.LZMA)
+
+ if Compression.ZSTD in compression and not ZSTD_SUPPORTED:
+ log.log_once('stem.descriptor.remote.zstd_unavailable', log.INFO, ZSTD_UNAVAILABLE_MSG)
+ compression.remove(Compression.ZSTD)
+
+ if not compression:
+ compression = [Compression.PLAINTEXT]
if descriptor_type:
self.descriptor_type = descriptor_type
else:
self.descriptor_type = _guess_descriptor_type(resource)
+ self.resource = resource
+ self.compression = compression
self.endpoints = endpoints if endpoints else []
self.retries = retries
self.fall_back_to_authority = fall_back_to_authority
@@ -352,7 +416,7 @@ class Query(object):
self._downloader_thread = threading.Thread(
name = 'Descriptor Query',
target = self._download_descriptors,
- args = (self.retries,)
+ args = (self.compression, self.retries,)
)
self._downloader_thread.setDaemon(True)
@@ -435,26 +499,41 @@ class Query(object):
if use_authority or not self.endpoints:
directories = get_authorities().values()
- picked = random.choice(directories)
+ picked = random.choice(list(directories))
address, dirport = picked.address, picked.dir_port
else:
address, dirport = random.choice(self.endpoints)
return 'http://%s:%i/%s' % (address, dirport, self.resource.lstrip('/'))
- def _download_descriptors(self, retries):
+ def _download_descriptors(self, compression, retries):
try:
use_authority = retries == 0 and self.fall_back_to_authority
self.download_url = self._pick_url(use_authority)
-
self.start_time = time.time()
- response = urllib.urlopen(self.download_url, timeout = self.timeout).read()
- if self.download_url.endswith('.z'):
- response = zlib.decompress(response)
+ response = urllib.urlopen(
+ urllib.Request(
+ self.download_url,
+ headers = {'Accept-Encoding': ', '.join(compression)},
+ ),
+ timeout = self.timeout,
+ )
+
+ data = response.read()
+ encoding = response.info().getheader('Content-Encoding')
- self.content = response.strip()
+ if encoding in (Compression.GZIP, 'deflate'):
+ # The '32' is for automatic header detection...
+ # https://stackoverflow.com/questions/3122145/zlib-error-error-3-while-decompressing-incorrect-header-check/22310760#22310760
+ data = zlib.decompress(data, zlib.MAX_WBITS | 32)
+ elif encoding == Compression.ZSTD:
+ pass # TODO: implement
+ elif encoding == Compression.LZMA and LZMA_SUPPORTED:
+ data = lzma.decompress(data)
+
+ self.content = data.strip()
self.runtime = time.time() - self.start_time
log.trace("Descriptors retrieved from '%s' in %0.2fs" % (self.download_url, self.runtime))
except:
@@ -462,7 +541,7 @@ class Query(object):
if retries > 0:
log.debug("Unable to download descriptors from '%s' (%i retries remaining): %s" % (self.download_url, retries, exc))
- return self._download_descriptors(retries - 1)
+ return self._download_descriptors(compression, retries - 1)
else:
log.debug("Unable to download descriptors from '%s': %s" % (self.download_url, exc))
self.error = exc
@@ -539,7 +618,7 @@ class DescriptorDownloader(object):
fingerprints (this is due to a limit on the url length by squid proxies).
"""
- resource = '/tor/server/all.z'
+ resource = '/tor/server/all'
if isinstance(fingerprints, str):
fingerprints = [fingerprints]
@@ -548,7 +627,7 @@ class DescriptorDownloader(object):
if len(fingerprints) > MAX_FINGERPRINTS:
raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)
- resource = '/tor/server/fp/%s.z' % '+'.join(fingerprints)
+ resource = '/tor/server/fp/%s' % '+'.join(fingerprints)
return self.query(resource, **query_args)
@@ -569,7 +648,7 @@ class DescriptorDownloader(object):
fingerprints (this is due to a limit on the url length by squid proxies).
"""
- resource = '/tor/extra/all.z'
+ resource = '/tor/extra/all'
if isinstance(fingerprints, str):
fingerprints = [fingerprints]
@@ -578,7 +657,7 @@ class DescriptorDownloader(object):
if len(fingerprints) > MAX_FINGERPRINTS:
raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)
- resource = '/tor/extra/fp/%s.z' % '+'.join(fingerprints)
+ resource = '/tor/extra/fp/%s' % '+'.join(fingerprints)
return self.query(resource, **query_args)
@@ -613,7 +692,7 @@ class DescriptorDownloader(object):
if len(hashes) > MAX_MICRODESCRIPTOR_HASHES:
raise ValueError('Unable to request more than %i microdescriptors at a time by their hashes' % MAX_MICRODESCRIPTOR_HASHES)
- return self.query('/tor/micro/d/%s.z' % '-'.join(hashes), **query_args)
+ return self.query('/tor/micro/d/%s' % '-'.join(hashes), **query_args)
def get_consensus(self, authority_v3ident = None, microdescriptor = False, **query_args):
"""
@@ -643,7 +722,7 @@ class DescriptorDownloader(object):
if authority_v3ident:
resource += '/%s' % authority_v3ident
- consensus_query = self.query(resource + '.z', **query_args)
+ consensus_query = self.query(resource, **query_args)
# if we're performing validation then check that it's signed by the
# authority key certificates
@@ -672,7 +751,7 @@ class DescriptorDownloader(object):
if 'endpoint' not in query_args:
query_args['endpoints'] = [(authority.address, authority.dir_port)]
- return self.query(resource + '.z', **query_args)
+ return self.query(resource, **query_args)
def get_key_certificates(self, authority_v3idents = None, **query_args):
"""
@@ -694,7 +773,7 @@ class DescriptorDownloader(object):
squid proxies).
"""
- resource = '/tor/keys/all.z'
+ resource = '/tor/keys/all'
if isinstance(authority_v3idents, str):
authority_v3idents = [authority_v3idents]
@@ -703,7 +782,7 @@ class DescriptorDownloader(object):
if len(authority_v3idents) > MAX_FINGERPRINTS:
raise ValueError('Unable to request more than %i key certificates at a time by their identity fingerprints' % MAX_FINGERPRINTS)
- resource = '/tor/keys/fp/%s.z' % '+'.join(authority_v3idents)
+ resource = '/tor/keys/fp/%s' % '+'.join(authority_v3idents)
return self.query(resource, **query_args)
@@ -711,7 +790,7 @@ class DescriptorDownloader(object):
"""
Issues a request for the given resource.
- :param str resource: resource being fetched, such as '/tor/server/all.z'
+ :param str resource: resource being fetched, such as '/tor/server/all'
:param query_args: additional arguments for the
:class:`~stem.descriptor.remote.Query` constructor
diff --git a/stem/version.py b/stem/version.py
index 9de2f1a5..9036effb 100644
--- a/stem/version.py
+++ b/stem/version.py
@@ -35,6 +35,7 @@ easily parsed and compared, for instance...
Requirement Description
===================================== ===========
**AUTH_SAFECOOKIE** SAFECOOKIE authentication method
+ **DESCRIPTOR_COMPRESSION** `Expanded compression support for ZSTD and LZMA <https://gitweb.torproject.org/torspec.git/commit/?id=1cb56afdc1e55e303e3e6b69e90d983ee217d93f>`_
**DROPGUARDS** DROPGUARDS requests
**EVENT_AUTHDIR_NEWDESCS** AUTHDIR_NEWDESC events
**EVENT_BUILDTIMEOUT_SET** BUILDTIMEOUT_SET events
@@ -353,6 +354,7 @@ safecookie_req.greater_than(Version('0.2.3.13'))
Requirement = stem.util.enum.Enum(
('AUTH_SAFECOOKIE', safecookie_req),
+ ('DESCRIPTOR_COMPRESSION', Version('0.3.1.1-alpha')),
('DROPGUARDS', Version('0.2.5.1-alpha')),
('EVENT_AUTHDIR_NEWDESCS', Version('0.1.1.10-alpha')),
('EVENT_BUILDTIMEOUT_SET', Version('0.2.2.7-alpha')),
diff --git a/test/integ/descriptor/remote.py b/test/integ/descriptor/remote.py
index 3123ef06..4e10eac2 100644
--- a/test/integ/descriptor/remote.py
+++ b/test/integ/descriptor/remote.py
@@ -16,6 +16,26 @@ import test.require
class TestDescriptorDownloader(unittest.TestCase):
@test.require.only_run_once
@test.require.online
+ def test_compression(self):
+ """
+ Issue a request for a plaintext descriptor.
+ """
+
+ moria1 = stem.descriptor.remote.get_authorities()['moria1']
+
+ descriptors = list(stem.descriptor.remote.Query(
+ '/tor/server/fp/%s' % moria1.fingerprint,
+ 'server-descriptor 1.0',
+ endpoints = [(moria1.address, moria1.dir_port)],
+ timeout = 30,
+ validate = True,
+ ).run())
+
+ self.assertEqual(1, len(descriptors))
+ self.assertEqual('moria1', descriptors[0].nickname)
+
+ @test.require.only_run_once
+ @test.require.online
def test_shorthand_aliases(self):
"""
Quick sanity test that we can call our shorthand aliases for getting
diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py
index ac150d5c..3661a54b 100644
--- a/test/unit/descriptor/remote.py
+++ b/test/unit/descriptor/remote.py
@@ -11,6 +11,8 @@ import stem.descriptor.remote
import stem.prereq
import stem.util.conf
+from stem.descriptor.remote import Compression
+
try:
# added in python 2.7
from collections import OrderedDict
@@ -29,6 +31,8 @@ except ImportError:
URL_OPEN = 'urllib.request.urlopen' if stem.prereq.is_python_3() else 'urllib2.urlopen'
+TEST_RESOURCE = '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31'
+
# Output from requesting moria1's descriptor from itself...
# % curl http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31
@@ -108,6 +112,33 @@ FALLBACK_ENTRY = b"""\
class TestDescriptorDownloader(unittest.TestCase):
+ def test_gzip_url_override(self):
+ query = stem.descriptor.remote.Query(TEST_RESOURCE, start = False)
+ self.assertEqual([Compression.PLAINTEXT], query.compression)
+ self.assertEqual(TEST_RESOURCE, query.resource)
+
+ query = stem.descriptor.remote.Query(TEST_RESOURCE + '.z', compression = Compression.PLAINTEXT, start = False)
+ self.assertEqual([Compression.GZIP], query.compression)
+ self.assertEqual(TEST_RESOURCE, query.resource)
+
+ def test_zstd_support_check(self):
+ with patch('stem.descriptor.remote.ZSTD_SUPPORTED', True):
+ query = stem.descriptor.remote.Query(TEST_RESOURCE, compression = Compression.ZSTD, start = False)
+ self.assertEqual([Compression.ZSTD], query.compression)
+
+ with patch('stem.descriptor.remote.ZSTD_SUPPORTED', False):
+ query = stem.descriptor.remote.Query(TEST_RESOURCE, compression = Compression.ZSTD, start = False)
+ self.assertEqual([Compression.PLAINTEXT], query.compression)
+
+ def test_lzma_support_check(self):
+ with patch('stem.descriptor.remote.LZMA_SUPPORTED', True):
+ query = stem.descriptor.remote.Query(TEST_RESOURCE, compression = Compression.LZMA, start = False)
+ self.assertEqual([Compression.LZMA], query.compression)
+
+ with patch('stem.descriptor.remote.LZMA_SUPPORTED', False):
+ query = stem.descriptor.remote.Query(TEST_RESOURCE, compression = Compression.LZMA, start = False)
+ self.assertEqual([Compression.PLAINTEXT], query.compression)
+
@patch(URL_OPEN)
def test_query_download(self, urlopen_mock):
"""
@@ -117,13 +148,14 @@ class TestDescriptorDownloader(unittest.TestCase):
urlopen_mock.return_value = io.BytesIO(TEST_DESCRIPTOR)
query = stem.descriptor.remote.Query(
- '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
+ TEST_RESOURCE,
'server-descriptor 1.0',
endpoints = [('128.31.0.39', 9131)],
+ compression = Compression.PLAINTEXT,
validate = True,
)
- expeced_url = 'http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31'
+ expeced_url = 'http://128.31.0.39:9131' + TEST_RESOURCE
self.assertEqual(expeced_url, query._pick_url())
descriptors = list(query)
@@ -135,7 +167,7 @@ class TestDescriptorDownloader(unittest.TestCase):
self.assertEqual('9695DFC35FFEB861329B9F1AB04C46397020CE31', desc.fingerprint)
self.assertEqual(TEST_DESCRIPTOR.strip(), desc.get_bytes())
- urlopen_mock.assert_called_once_with(expeced_url, timeout = None)
+ self.assertEqual(1, urlopen_mock.call_count)
@patch(URL_OPEN)
def test_query_with_malformed_content(self, urlopen_mock):
@@ -147,9 +179,10 @@ class TestDescriptorDownloader(unittest.TestCase):
urlopen_mock.return_value = io.BytesIO(descriptor_content)
query = stem.descriptor.remote.Query(
- '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
+ TEST_RESOURCE,
'server-descriptor 1.0',
endpoints = [('128.31.0.39', 9131)],
+ compression = Compression.PLAINTEXT,
validate = True,
)
@@ -171,7 +204,7 @@ class TestDescriptorDownloader(unittest.TestCase):
urlopen_mock.side_effect = socket.timeout('connection timed out')
query = stem.descriptor.remote.Query(
- '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
+ TEST_RESOURCE,
'server-descriptor 1.0',
endpoints = [('128.31.0.39', 9131)],
fall_back_to_authority = False,
@@ -180,10 +213,6 @@ class TestDescriptorDownloader(unittest.TestCase):
)
self.assertRaises(socket.timeout, query.run)
- urlopen_mock.assert_called_with(
- 'http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
- timeout = 5,
- )
self.assertEqual(3, urlopen_mock.call_count)
@patch(URL_OPEN)
@@ -191,9 +220,10 @@ class TestDescriptorDownloader(unittest.TestCase):
urlopen_mock.return_value = io.BytesIO(TEST_DESCRIPTOR)
query = stem.descriptor.remote.Query(
- '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
+ TEST_RESOURCE,
'server-descriptor 1.0',
endpoints = [('128.31.0.39', 9131)],
+ compression = Compression.PLAINTEXT,
validate = True,
)
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits