[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Inner layer creation and encryption
commit 41f8a42a9e8735513b72d9ba3f6a5c11acf02943
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Mon Nov 11 15:32:55 2019 -0800
Inner layer creation and encryption
InnerLayer creation, encryption, and test. While _get_superencrypted_blob()
provided a great demo, it was limited to just introduction points. Now we'll
support anything the layer does.
---
stem/descriptor/__init__.py | 2 +-
stem/descriptor/hidden_service.py | 41 +++++++++++++++++++++++---
test/unit/descriptor/hidden_service_v3.py | 48 +++++++++++++++++++++++++++++++
3 files changed, 86 insertions(+), 5 deletions(-)
diff --git a/stem/descriptor/__init__.py b/stem/descriptor/__init__.py
index 3a3d1838..862d5aca 100644
--- a/stem/descriptor/__init__.py
+++ b/stem/descriptor/__init__.py
@@ -975,7 +975,7 @@ class Descriptor(object):
:returns: **bytes** for the descriptor's contents
"""
- return self._raw_contents
+ return stem.util.str_tools._to_bytes(self._raw_contents)
def get_unrecognized_lines(self):
"""
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index 141f54b0..fcd29032 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -920,14 +920,14 @@ def _get_middle_descriptor_layer_body(encrypted):
b'%s' % (fake_pub_key_bytes_b64, fake_clients, encrypted)
-def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_counter, blinded_key, subcredential):
+def _get_superencrypted_blob(intro_points, revision_counter, blinded_key, subcredential):
"""
Get the superencrypted blob (which also includes the encrypted blob) that
should be attached to the descriptor
"""
- inner_descriptor_layer = stem.util.str_tools._to_bytes('create2-formats 2\n' + '\n'.join(map(IntroductionPointV3.encode, intro_points)) + '\n')
- inner_ciphertext_b64 = b'encrypted\n' + _encrypt_layer(inner_descriptor_layer, b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key)
+ inner_layer = InnerLayer.create(introduction_points = intro_points)
+ inner_ciphertext_b64 = b'encrypted\n' + inner_layer.encrypt(revision_counter, blinded_key, subcredential)
middle_descriptor_layer = _get_middle_descriptor_layer_body(inner_ciphertext_b64)
@@ -1042,7 +1042,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
# this descriptor object so that we don't have to carry them around
# functions and instead we could use e.g. self.descriptor_signing_public_key
# But because this is a @classmethod this is not possible :/
- superencrypted_blob = _get_superencrypted_blob(intro_points, signing_key, revision_counter_int, blinded_pubkey_bytes, subcredential)
+ superencrypted_blob = _get_superencrypted_blob(intro_points, revision_counter_int, blinded_pubkey_bytes, subcredential)
desc_content = _descriptor_content(attr, exclude, (
('hs-descriptor', '3'),
@@ -1281,6 +1281,24 @@ class InnerLayer(Descriptor):
plaintext = _decrypt_layer(outer_layer.encrypted, b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key)
return InnerLayer(plaintext, validate = True, outer_layer = outer_layer)
+ @classmethod
+ def content(cls, attr = None, exclude = (), sign = False, introduction_points = None):
+ if sign:
+ raise NotImplementedError('Signing of %s not implemented' % cls.__name__)
+
+ if introduction_points:
+ suffix = '\n' + '\n'.join(map(IntroductionPointV3.encode, introduction_points))
+ else:
+ suffix = ''
+
+ return _descriptor_content(attr, exclude, (
+ ('create2-formats', '2'),
+ )) + suffix
+
+ @classmethod
+ def create(cls, attr = None, exclude = (), validate = True, sign = False, introduction_points = None):
+ return cls(cls.content(attr, exclude, sign, introduction_points), validate = validate)
+
def __init__(self, content, validate = False, outer_layer = None):
super(InnerLayer, self).__init__(content, lazy_load = not validate)
self.outer = outer_layer
@@ -1304,6 +1322,21 @@ class InnerLayer(Descriptor):
else:
self._entries = entries
+ def encrypt(self, revision_counter, blinded_key, subcredential):
+ """
+ Encrypts into the content contained within the OuterLayer.
+
+ :param int revision_counter: descriptor revision number
+ :param bytes blinded_key: descriptor signing key
+ :param bytes subcredential: public key hash
+
+ :returns: base64 encoded content of the outer layer's 'encrypted' field
+ """
+
+ if not stem.prereq.is_crypto_available(ed25519 = True):
+ raise ImportError('Hidden service descriptor encryption requires cryptography version 2.6')
+
+ return _encrypt_layer(self.get_bytes(), b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key)
# TODO: drop this alias in stem 2.x
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index 163ea3db..f6173768 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -28,6 +28,12 @@ from test.unit.descriptor import (
)
try:
+ # added in python 2.7
+ from collections import OrderedDict
+except ImportError:
+ from stem.util.ordereddict import OrderedDict
+
+try:
# added in python 3.3
from unittest.mock import patch, Mock
except ImportError:
@@ -284,6 +290,48 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
reparsed = IntroductionPointV3.parse(intro_point.encode())
self.assertEqual(intro_point, reparsed)
+ def test_inner_layer_creation(self):
+ """
+ Internal layer creation.
+ """
+
+ # minimal layer
+
+ self.assertEqual('create2-formats 2', InnerLayer.content())
+ self.assertEqual([2], InnerLayer.create().formats)
+
+ # specify their only mandatory parameter (formats)
+
+ self.assertEqual('create2-formats 1 2 3', InnerLayer.content({'create2-formats': '1 2 3'}))
+ self.assertEqual([1, 2, 3], InnerLayer.create({'create2-formats': '1 2 3'}).formats)
+
+ # include optional parameters
+
+ desc = InnerLayer.create(OrderedDict((
+ ('intro-auth-required', 'ed25519'),
+ ('single-onion-service', ''),
+ )))
+
+ self.assertEqual([2], desc.formats)
+ self.assertEqual(['ed25519'], desc.intro_auth)
+ self.assertEqual(True, desc.is_single_service)
+ self.assertEqual([], desc.introduction_points)
+
+ # include introduction points
+
+ desc = InnerLayer.create(introduction_points = [
+ IntroductionPointV3.create('1.1.1.1', 9001),
+ IntroductionPointV3.create('2.2.2.2', 9001),
+ IntroductionPointV3.create('3.3.3.3', 9001),
+ ])
+
+ self.assertEqual(3, len(desc.introduction_points))
+ self.assertEqual('1.1.1.1', desc.introduction_points[0].link_specifiers[0].address)
+
+ self.assertTrue(InnerLayer.content(introduction_points = [
+ IntroductionPointV3.create('1.1.1.1', 9001),
+ ]).startswith('create2-formats 2\nintroduction-point AQAGAQEBASMp'))
+
@test.require.ed25519_support
def test_encode_decode_descriptor(self):
"""
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits