[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] HSv3 descriptor creation test
commit eca1fe8073c1f43e3f9eab114bcc192f6ca0deac
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Fri Nov 15 13:22:45 2019 -0800
HSv3 descriptor creation test
Now that we've revised HiddenServiceDescriptorV3.content() we can test it
in a similar fashion to inner/outer layers. This provides the same coverage
as test_encode_decode_descriptor() so that test is now redundant.
Fixing a few bugs this spotted along the way and renaming the
'public_key_from_address' functions to specify that they take identity keys
instead (we have so many key types I lost track of what these truly did when I
originally tacked 'em on).
---
stem/descriptor/hidden_service.py | 28 ++++++----
test/unit/descriptor/hidden_service_v3.py | 88 ++++++++++++++-----------------
2 files changed, 57 insertions(+), 59 deletions(-)
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index dd4ab934..cdb8d645 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -957,6 +957,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
blinded_key = stem.descriptor.hsv3_crypto.HSv3PrivateBlindedKey(identity_key, blinding_param = blinding_param)
subcredential = HiddenServiceDescriptorV3._subcredential(identity_key, blinded_key.blinded_pubkey)
+ custom_sig = attr.pop('signature') if (attr and 'signature' in attr) else None
if not outer_layer:
outer_layer = OuterLayer.create(
@@ -983,7 +984,9 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
('superencrypted', b'\n' + outer_layer._encrypt(revision_counter, subcredential, blinded_key.blinded_pubkey)),
), ()) + b'\n'
- if 'signature' not in exclude:
+ if custom_sig:
+ desc_content += b'signature %s' % custom_sig
+ elif 'signature' not in exclude:
sig_content = stem.descriptor.certificate.SIG_PREFIX_HS_V3 + desc_content
desc_content += b'signature %s' % base64.b64encode(signing_key.sign(sig_content)).rstrip(b'=')
@@ -991,7 +994,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
@classmethod
def create(cls, attr = None, exclude = (), validate = True, sign = False, inner_layer = None, outer_layer = None, identity_key = None, signing_key = None, signing_cert = None, revision_counter = None, blinding_param = None):
- return cls(cls.content(attr, exclude, sign, inner_layer, outer_layer, identity_key, signing_key, signing_cert, revision_counter, blinding_param), validate = validate, skip_crypto_validation = not sign)
+ return cls(cls.content(attr, exclude, sign, inner_layer, outer_layer, identity_key, signing_key, signing_cert, revision_counter, blinding_param), validate = validate)
def __init__(self, raw_contents, validate = False):
super(HiddenServiceDescriptorV3, self).__init__(raw_contents, lazy_load = not validate)
@@ -1045,7 +1048,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
if not blinded_key:
raise ValueError('No signing key is present')
- identity_public_key = HiddenServiceDescriptorV3.public_key_from_address(onion_address)
+ identity_public_key = HiddenServiceDescriptorV3.identity_key_from_address(onion_address)
subcredential = HiddenServiceDescriptorV3._subcredential(identity_public_key, blinded_key)
outer_layer = OuterLayer._decrypt(self.superencrypted, self.revision_counter, subcredential, blinded_key)
@@ -1054,11 +1057,12 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
return self._inner_layer
@staticmethod
- def address_from_public_key(pubkey, suffix = True):
+ def address_from_identity_key(key, suffix = True):
"""
- Converts a hidden service public key into its address.
+ Converts a hidden service identity key into its address. This accepts all
+ key formats (private, public, or public bytes).
- :param bytes pubkey: hidden service public key
+ :param Ed25519PublicKey,Ed25519PrivateKey,bytes key: hidden service identity key
:param bool suffix: includes the '.onion' suffix if true, excluded otherwise
:returns: **unicode** hidden service address
@@ -1069,20 +1073,22 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
if not stem.prereq._is_sha3_available():
raise ImportError('Hidden service address conversion requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
+ key = stem.util._pubkey_bytes(key) # normalize key into bytes
+
version = stem.client.datatype.Size.CHAR.pack(3)
- checksum = hashlib.sha3_256(CHECKSUM_CONSTANT + pubkey + version).digest()[:2]
- onion_address = base64.b32encode(pubkey + checksum + version)
+ checksum = hashlib.sha3_256(CHECKSUM_CONSTANT + key + version).digest()[:2]
+ onion_address = base64.b32encode(key + checksum + version)
return stem.util.str_tools._to_unicode(onion_address + b'.onion' if suffix else onion_address).lower()
@staticmethod
- def public_key_from_address(onion_address):
+ def identity_key_from_address(onion_address):
"""
- Converts a hidden service address into its public key.
+ Converts a hidden service address into its public identity key.
:param str onion_address: hidden service address
- :returns: **bytes** for the hidden service's public key
+ :returns: **bytes** for the hidden service's public identity key
:raises:
* **ImportError** if sha3 unsupported
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index 5bfd5cc4..34108188 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -3,7 +3,6 @@ Unit tests for stem.descriptor.hidden_service for version 3.
"""
import base64
-import datetime
import functools
import unittest
@@ -249,14 +248,14 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
expect_invalid_attr(self, {'revision-counter': test_value}, 'revision_counter')
@require_sha3
- def test_address_from_public_key(self):
- self.assertEqual(HS_ADDRESS, HiddenServiceDescriptorV3.address_from_public_key(HS_PUBKEY))
+ def test_address_from_identity_key(self):
+ self.assertEqual(HS_ADDRESS, HiddenServiceDescriptorV3.address_from_identity_key(HS_PUBKEY))
@require_sha3
- def test_public_key_from_address(self):
- self.assertEqual(HS_PUBKEY, HiddenServiceDescriptorV3.public_key_from_address(HS_ADDRESS))
- self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3.public_key_from_address, 'boom')
- self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3.public_key_from_address, '5' * 56)
+ def test_identity_key_from_address(self):
+ self.assertEqual(HS_PUBKEY, HiddenServiceDescriptorV3.identity_key_from_address(HS_ADDRESS))
+ self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3.identity_key_from_address, 'boom')
+ self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3.identity_key_from_address, '5' * 56)
def test_intro_point_parse(self):
"""
@@ -430,60 +429,53 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
self.assertEqual('1.1.1.1', inner_layer.introduction_points[0].link_specifiers[0].address)
@test.require.ed25519_support
- def test_encode_decode_descriptor(self):
+ def test_descriptor_creation(self):
"""
- Encode an HSv3 descriptor and then decode it and make sure you get the intended results.
-
- This test is from the point of view of the onionbalance, so the object that
- this test generates is the data that onionbalance also has available when
- making onion service descriptors.
+ HiddenServiceDescriptorV3 creation.
"""
if SKIP_SLOW_TESTS:
return
- from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
- from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
-
- onion_key = X25519PrivateKey.generate()
- enc_key = X25519PrivateKey.generate()
- auth_key = Ed25519PrivateKey.generate()
- signing_key = Ed25519PrivateKey.generate()
+ # minimal descriptor
- expiration = datetime.datetime.utcnow() + datetime.timedelta(hours = 54)
+ self.assertTrue(HiddenServiceDescriptorV3.content().startswith('hs-descriptor 3\ndescriptor-lifetime 180\n'))
+ self.assertEqual(180, HiddenServiceDescriptorV3.create().lifetime)
- # Build the service
- private_identity_key = Ed25519PrivateKey.from_private_bytes(b'a' * 32)
- public_identity_key = private_identity_key.public_key()
+ # specify the parameters
- onion_address = HiddenServiceDescriptorV3.address_from_public_key(stem.util._pubkey_bytes(public_identity_key))
+ desc = HiddenServiceDescriptorV3.create({
+ 'hs-descriptor': '4',
+ 'descriptor-lifetime': '123',
+ 'descriptor-signing-key-cert': '\n-----BEGIN ED25519 CERT-----\nmalformed block\n-----END ED25519 CERT-----',
+ 'revision-counter': '5',
+ 'superencrypted': '\n-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----',
+ 'signature': 'abcde',
+ }, validate = False)
+
+ self.assertEqual(4, desc.version)
+ self.assertEqual(123, desc.lifetime)
+ self.assertEqual(None, desc.signing_cert) # malformed cert dropped because validation is disabled
+ self.assertEqual(5, desc.revision_counter)
+ self.assertEqual('-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----', desc.superencrypted)
+ self.assertEqual('abcde', desc.signature)
- intro_points = [
- IntroductionPointV3.create('1.1.1.1', 9001, expiration, onion_key, enc_key, auth_key, signing_key),
- IntroductionPointV3.create('2.2.2.2', 9001, expiration, onion_key, enc_key, auth_key, signing_key),
- IntroductionPointV3.create('3.3.3.3', 9001, expiration, onion_key, enc_key, auth_key, signing_key),
- ]
+ # include introduction points
- # TODO: replace with bytes.fromhex() when we drop python 2.x support
+ from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
- blind_param = bytearray.fromhex('677776AE42464CAAB0DF0BF1E68A5FB651A390A6A8243CF4B60EE73A6AC2E4E3')
+ identity_key = Ed25519PrivateKey.generate()
+ onion_address = HiddenServiceDescriptorV3.address_from_identity_key(identity_key)
- # Build the descriptor
- desc_string = HiddenServiceDescriptorV3.content(identity_key = private_identity_key, inner_layer = InnerLayer.create(introduction_points = intro_points), blinding_param = blind_param)
+ desc = HiddenServiceDescriptorV3.create(
+ identity_key = identity_key,
+ inner_layer = InnerLayer.create(introduction_points = [
+ IntroductionPointV3.create('1.1.1.1', 9001),
+ IntroductionPointV3.create('2.2.2.2', 9001),
+ IntroductionPointV3.create('3.3.3.3', 9001),
+ ]),
+ )
- # Parse the descriptor
- desc = HiddenServiceDescriptorV3.from_str(desc_string)
inner_layer = desc.decrypt(onion_address)
-
self.assertEqual(3, len(inner_layer.introduction_points))
-
- for i, intro_point in enumerate(inner_layer.introduction_points):
- original = intro_points[i]
-
- self.assertEqual(original.enc_key_raw, intro_point.enc_key_raw)
- self.assertEqual(original.onion_key_raw, intro_point.onion_key_raw)
- self.assertEqual(original.auth_key_cert.key, intro_point.auth_key_cert.key)
-
- self.assertEqual(intro_point.enc_key_raw, base64.b64encode(stem.util._pubkey_bytes(intro_point.enc_key())))
- self.assertEqual(intro_point.onion_key_raw, base64.b64encode(stem.util._pubkey_bytes(intro_point.onion_key())))
- self.assertEqual(intro_point.auth_key_cert.key, stem.util._pubkey_bytes(intro_point.auth_key()))
+ self.assertEqual('1.1.1.1', inner_layer.introduction_points[0].link_specifiers[0].address)
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits