[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [stem/master] Implement ed25519 certificate encoding.

commit c723cb5c3eba172b414b61c8e9560ddc89b39303
Author: George Kadianakis <desnacked@xxxxxxxxxx>
Date:   Thu Oct 10 20:43:19 2019 +0300

    Implement ed25519 certificate encoding.
    I had trouble adapting the certificate.py code to do encoding, since it seems
    like the Ed25519CertificateV1 class has been made with parsing in mind, but in
    this case I will need to provide its raw attributes (keys, etc.)  and have it
    encode them into an actual certificate.
    I made my own class that is meant for encoding but we should likely merge these
    into one!
 stem/descriptor/certificate.py | 150 ++++++++++++++++++++++++++++++++++-------
 1 file changed, 126 insertions(+), 24 deletions(-)

diff --git a/stem/descriptor/certificate.py b/stem/descriptor/certificate.py
index 6ba92ee7..ac6d455f 100644
--- a/stem/descriptor/certificate.py
+++ b/stem/descriptor/certificate.py
@@ -78,19 +78,19 @@ import stem.prereq
 import stem.descriptor.server_descriptor
 import stem.util.enum
 import stem.util.str_tools
+import stem.util
+from cryptography.hazmat.primitives import serialization
 ED25519_ROUTER_SIGNATURE_PREFIX = b'Tor router descriptor signature v1'
 CertType = stem.util.enum.UppercaseEnum(
-  'AUTH',
 ExtensionType = stem.util.enum.Enum(('HAS_SIGNING_KEY', 4),)
 ExtensionFlag = stem.util.enum.UppercaseEnum('AFFECTS_VALIDATION', 'UNKNOWN')
@@ -157,6 +157,7 @@ class Ed25519Certificate(object):
     def _parse(descriptor, entries):
       value, block_type, block_contents = entries[keyword][0]
+      # XXX ATAGAR This ValueError never actually surfaces if it triggers...
       if not block_contents or block_type != 'ED25519 CERT':
         raise ValueError("'%s' should be followed by a ED25519 CERT block, but was a %s" % (keyword, block_type))
@@ -188,26 +189,16 @@ class Ed25519CertificateV1(Ed25519Certificate):
       raise ValueError('Ed25519 certificate was %i bytes, but should be at least %i' % (len(decoded), ED25519_HEADER_LENGTH + ED25519_SIGNATURE_LENGTH))
     cert_type = stem.util.str_tools._to_int(decoded[1:2])
+    try:
+      self.type = CertType.keys()[cert_type]
+    except IndexError:
+      raise ValueError('Certificate has wrong cert type')
-    if cert_type in (0, 1, 2, 3):
+    # Catch some invalid cert types
+    if self.type in ('RESERVED_0', 'RESERVED_1', 'RESERVED_2', 'RESERVED_3'):
       raise ValueError('Ed25519 certificate cannot have a type of %i. This is reserved to avoid conflicts with tor CERTS cells.' % cert_type)
-    elif cert_type == 4:
-      self.type = CertType.SIGNING
-    elif cert_type == 5:
-      self.type = CertType.LINK_CERT
-    elif cert_type == 6:
-      self.type = CertType.AUTH
-    elif cert_type == 7:
+    elif self.type == ('RESERVED_RSA'):
       raise ValueError('Ed25519 certificate cannot have a type of 7. This is reserved for RSA identity cross-certification.')
-    elif cert_type == 8:
-      # see rend-spec-v3.txt appendix E for these definitions
-      self.type = CertType.HS_V3_DESC_SIGNING
-    elif cert_type == 9:
-      self.type = CertType.HS_V3_INTRO_AUTH
-    elif cert_type == 0x0B:
-      self.type = CertType.HS_V3_INTRO_ENCRYPT
-    else:
-      raise ValueError('Ed25519 certificate type %i is unrecognized' % cert_type)
     # expiration time is in hours since epoch
@@ -333,3 +324,114 @@ class Ed25519CertificateV1(Ed25519Certificate):
       verify_key.verify(signature_bytes, descriptor_sha256_digest)
     except InvalidSignature:
       raise ValueError('Descriptor Ed25519 certificate signature invalid (Signature was forged or corrupt)')
+class MyED25519Certificate(object):
+  """
+  This class represents an ed25519 certificate and it's made for encoding it into a string.
+  We should merge this class with the one above.
+  """
+  def __init__(self, cert_type, expiration_date,
+               cert_key_type, certified_pub_key,
+               signing_priv_key, include_signing_key,
+               version=1):
+    """
+    :var int version
+    :var int cert_type
+    :var CertType cert_type
+    :var datetime expiration_date
+    :var int cert_key_type
+    :var ED25519PublicKey certified_pub_key
+    :var ED25519PrivateKey signing_priv_key
+    :var bool include_signing_key
+    """
+    self.version = version
+    self.cert_type = cert_type
+    self.expiration_date = expiration_date
+    self.cert_key_type = cert_key_type
+    self.certified_pub_key = certified_pub_key
+    self.signing_priv_key = signing_priv_key
+    self.signing_pub_key = signing_priv_key.public_key()
+    self.include_signing_key = include_signing_key
+    # XXX validate params
+  def _get_certificate_signature(self, msg_body):
+    return self.signing_priv_key.sign(msg_body)
+  def _get_cert_extensions_bytes(self):
+    """
+    Build the cert extensions part of the certificate
+    """
+    n_extensions = 0
+    # If we need to include the signing key, let's create the extension body
+    #         ExtLength [2 bytes]
+    #         ExtType   [1 byte]
+    #         ExtFlags  [1 byte]
+    #         ExtData   [ExtLength bytes]
+    if self.include_signing_key:
+      n_extensions += 1
+      signing_pubkey_bytes = self.signing_pub_key.public_bytes(encoding=serialization.Encoding.Raw,
+                                                               format=serialization.PublicFormat.Raw)
+      ext_length = len(signing_pubkey_bytes)
+      ext_type = 4
+      ext_flags = 0
+      ext_data = signing_pubkey_bytes
+    # Now build the actual byte representation of any extensions
+    ext_obj = bytes()
+    ext_obj += n_extensions.to_bytes(1, 'big')
+    if self.include_signing_key:
+      ext_obj += ext_length.to_bytes(2, 'big')
+      ext_obj += ext_type.to_bytes(1, 'big')
+      ext_obj += ext_flags.to_bytes(1, 'big')
+      ext_obj += ext_data
+    return ext_obj
+  def encode(self):
+    """Return a bytes representation of this certificate."""
+    obj = bytes()
+    # Encode VERSION
+    obj += self.version.to_bytes(1, 'big')
+    # Encode CERT_TYPE
+    try:
+      cert_type_int = CertType.index_of(self.cert_type)
+    except ValueError:
+      raise ValueError("Bad cert type %s" % self.cert_type)
+    obj += cert_type_int.to_bytes(1, 'big')
+    expiration_seconds_since_epoch = stem.util.datetime_to_unix(self.expiration_date)
+    expiration_hours_since_epoch = int(expiration_seconds_since_epoch) // 3600
+    obj += expiration_hours_since_epoch.to_bytes(4, 'big')
+    # Encode CERT_KEY_TYPE
+    obj += self.cert_key_type.to_bytes(1, 'big')
+    # Encode CERTIFIED_KEY
+    certified_pub_key_bytes = self.certified_pub_key.public_bytes(encoding=serialization.Encoding.Raw,
+                                                          format=serialization.PublicFormat.Raw)
+    assert(len(certified_pub_key_bytes) == 32)
+    obj += certified_pub_key_bytes
+    obj += self._get_cert_extensions_bytes()
+    # Do the signature on the body we have so far
+    obj += self._get_certificate_signature(obj)
+    return obj
+  def encode_for_descriptor(self):
+    cert_bytes = self.encode()
+    cert_b64 = base64.b64encode(cert_bytes)
+    cert_b64 = b'\n'.join(stem.util.str_tools._split_by_length(cert_b64, 64))
+    return b'-----BEGIN ED25519 CERT-----\n%s\n-----END ED25519 CERT-----' % cert_b64

tor-commits mailing list