diff --git a/.gitignore b/.gitignore index 83ef3a1..fe78c55 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,9 @@ -*.md -!README.md *.cer *.crt *.der +/.idea/ +*.md *.pem +__pycache__/ +!README.md */*.svg -/.idea/ diff --git a/analyze_certs.py b/analyze_certs.py index 06ee7e5..9d3cd65 100644 --- a/analyze_certs.py +++ b/analyze_certs.py @@ -8,195 +8,17 @@ from anytree import Node, RenderTree from anytree.exporter import DotExporter from cryptography import x509 from cryptography.exceptions import UnsupportedAlgorithm -from cryptography.hazmat._oid import ( - AuthorityInformationAccessOID, - ExtendedKeyUsageOID, - ExtensionOID, - NameOID, -) -from cryptography.hazmat.backends.openssl import rsa -from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey -from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey +import helpers +from helpers import format_key_type, format_oid_link, parse_certificate, table_line -def analyze_certificate(cert, include_old) -> (bytes, bytes, x509.Certificate): - with open(cert, "rb") as f: - cert_bytes = f.read() - try: - parsed = x509.load_pem_x509_certificate(cert_bytes) - except ValueError: - try: - parsed = x509.load_der_x509_certificate(cert_bytes) - except ValueError: - print(f"skip {cert}", file=sys.stderr) - return None, None, None - - if not include_old: - if parsed.not_valid_after < datetime.utcnow(): - print( - f"skip {cert} not valid after {parsed.not_valid_after}", file=sys.stderr - ) - return None, None, None - - return parsed.subject.public_bytes(), parsed.issuer.public_bytes(), parsed - - -def edge_type_func(node, child): +def edge_type_func(*_): return "--" -def get_key_type(public_key): - if isinstance(public_key, rsa.RSAPublicKey): - return f"RSA {public_key.key_size}" - if isinstance(public_key, EllipticCurvePublicKey): - return f"EC {public_key.curve.name}" - if isinstance(public_key, Ed25519PublicKey): - return "ED25519" - return type(public_key) - - def node_name_func(node): - try: - key_type = get_key_type(node.data.public_key()) - except UnsupportedAlgorithm: - key_type = "unsupported" - return f"{node.name}\n{hex(node.data.serial_number)}\n{key_type}" - - -def format_key_usage(value: x509.KeyUsage): - usages = [] - if value.digital_signature: - usages.append("digital signature") - if value.content_commitment: - usages.append("non repudiation") - if value.key_encipherment: - usages.append("key encipherment") - if value.data_encipherment: - usages.append("data encipherment") - if value.key_agreement: - usages.append("key agreement") - if value.encipher_only: - usages.append("encipher only") - if value.decipher_only: - usages.append("decipher only") - if value.key_cert_sign: - usages.append("key cert sign") - if value.crl_sign: - usages.append("crl sign") - return ", ".join(usages) - - -def format_ext_key_usage(value: x509.ExtendedKeyUsage): - usages = [] - - labels = { - ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE: "any", - ExtendedKeyUsageOID.SERVER_AUTH: "server auth", - ExtendedKeyUsageOID.CLIENT_AUTH: "client auth", - ExtendedKeyUsageOID.CODE_SIGNING: "code signing", - ExtendedKeyUsageOID.EMAIL_PROTECTION: "email protection", - ExtendedKeyUsageOID.IPSEC_IKE: "IPSEC IKE", - ExtendedKeyUsageOID.KERBEROS_PKINIT_KDC: "Kerberos PKINIT KDC", - ExtendedKeyUsageOID.OCSP_SIGNING: "OCSP signing", - ExtendedKeyUsageOID.SMARTCARD_LOGON: "SmartCard logon", - ExtendedKeyUsageOID.TIME_STAMPING: "timestamping", - } - - for eku in value: - if eku in labels: - usages.append(labels[eku]) - else: - usages.append(format_oid_link(eku)) - - return ", ".join(usages) - - -def format_crl_distribution_points(value: x509.CRLDistributionPoints): - dps = [] - for dp in value: - if dp.full_name is not None: - for item in dp.full_name: - if isinstance(item, x509.UniformResourceIdentifier): - dps.append(item.value) - else: - dps.append(item) - else: - dps.append(f"unsupported {dp}") - return ", ".join(dps) - - -def format_access_description(ai: x509.AccessDescription): - if ai.access_method == AuthorityInformationAccessOID.OCSP: - prefix = "OCSP:" - elif ai.access_method == AuthorityInformationAccessOID.CA_ISSUERS: - prefix = "CA Issuers:" - else: - prefix = format_oid_link(ai.access_method) - - if isinstance(ai.access_location, x509.UniformResourceIdentifier): - return f"{prefix} {ai.access_location.value}" - - return f"{prefix} {ai.access_location}" - - -def format_authinfo_access(value: x509.AuthorityInformationAccess): - access_infos = [] - for ai in value: - access_infos.append(format_access_description(ai)) - return ", ".join(access_infos) - - -def format_oid_link(oid: x509.ObjectIdentifier) -> str: - return f"[{oid.dotted_string}](https://oidref.com/{oid.dotted_string})" - - -def format_policy_information(pi: x509.PolicyInformation): - if pi.policy_identifier == x509.ObjectIdentifier("2.5.29.32.0"): - prefix = "anyPolicy" - else: - prefix = format_oid_link(pi.policy_identifier) - qualifiers = [] - - if pi.policy_qualifiers is None: - return prefix - - for pq in pi.policy_qualifiers: - if isinstance(pq, str): - qualifiers.append(pq) - elif isinstance(pq, x509.UserNotice): - qualifiers.append(pq) - else: - qualifiers.append(f"unknown policy information {pq}") - - value = ", ".join(qualifiers) - return f"{prefix}: {value}" - - -def format_certificate_policies(value: x509.CertificatePolicies): - policies = [] - for pi in value: - policies.append(format_policy_information(pi)) - - return ", ".join(policies) - - -def format_subject_alternative_names(value: x509.SubjectAlternativeName) -> str: - alt_names = [] - - for name_type in ( - x509.DNSName, - x509.RFC822Name, - x509.DirectoryName, - x509.OtherName, - x509.UniformResourceIdentifier, - ): - alt_names.extend(value.get_values_for_type(name_type)) - - if not alt_names: - raise Exception(f"unhandled {value}") - - return ", ".join(alt_names) + return f"{node.name}\n{hex(node.data.serial_number)}\n{format_key_type(node.data)}" def cert_to_markdown(cert: x509.Certificate): @@ -205,11 +27,7 @@ def cert_to_markdown(cert: x509.Certificate): print("| Subject: |", cert.subject.rfc4514_string(), "|") print("| Issuer: |", cert.issuer.rfc4514_string(), "|") print(f"| Serial: | {hex(cert.serial_number)} ({cert.serial_number}) |") - try: - pubkey = get_key_type(cert.public_key()) - except UnsupportedAlgorithm: - pubkey = "unsupported public key" - print(f"| Public Key: | {pubkey} |") + print(f"| Public Key: | {format_key_type(cert)} |") try: hash_name = cert.signature_hash_algorithm.name except UnsupportedAlgorithm: @@ -219,46 +37,9 @@ def cert_to_markdown(cert: x509.Certificate): print(f"| Not after: | {cert.not_valid_after} |") for ext in cert.extensions: - if ext.oid == ExtensionOID.BASIC_CONSTRAINTS: + if ext.oid == x509.OID_BASIC_CONSTRAINTS: if ext.value.path_length is not None: print(f"| Path length: | {ext.value.path_length} |") - elif ext.oid == ExtensionOID.SUBJECT_KEY_IDENTIFIER: - print(f"| Subject Key Id: | {ext.value.digest.hex(':')} |") - elif ext.oid == ExtensionOID.AUTHORITY_KEY_IDENTIFIER: - if ext.value.key_identifier is not None: - print(f"| Authority Key Id: | {ext.value.key_identifier.hex(':')} |") - else: - print( - "| Authority Key Id: |" - f" issuer {ext.value.authority_cert_issuer[0].value.rfc4514_string()}," - f" serial {ext.value.authority_cert_serial_number} |" - ) - elif ext.oid == ExtensionOID.KEY_USAGE: - print("| Key Usage: |", format_key_usage(ext.value), "|") - elif ext.oid == ExtensionOID.EXTENDED_KEY_USAGE: - print("| Extended Key Usage: |", format_ext_key_usage(ext.value), "|") - elif ext.oid == ExtensionOID.CRL_DISTRIBUTION_POINTS: - print( - "| CRL Distribution Points: |", - format_crl_distribution_points(ext.value), - "|", - ) - elif ext.oid == ExtensionOID.AUTHORITY_INFORMATION_ACCESS: - print( - "| Authority information access: |", - format_authinfo_access(ext.value), - "|", - ) - elif ext.oid == ExtensionOID.CERTIFICATE_POLICIES: - print( - "| Certificate policies: |", format_certificate_policies(ext.value), "|" - ) - elif ext.oid == ExtensionOID.SUBJECT_ALTERNATIVE_NAME: - print( - "| Subject alternative names: |", - format_subject_alternative_names(ext.value), - "|", - ) elif ext.oid == x509.ObjectIdentifier("1.3.6.1.5.5.7.1.3"): print( "| Qualified Certificate Statements: |" @@ -269,9 +50,9 @@ def cert_to_markdown(cert: x509.Certificate): "| Private Key usage period | see [RFC-3280](https://tools.ietf.org/html/rfc3280.html) |" ) else: - print( - f"| unknown extension {ext.oid.dotted_string} |" - f" {ext.value} try {format_oid_link(ext.oid)} |" + table_line( + helpers.extension_label(ext.oid), + [helpers.format_extension(cert.extensions, ext.oid)], ) @@ -287,21 +68,30 @@ def analyze_certificates(directory, include_old): certs += glob.glob("*.pem", root_dir=directory) for cert in sorted(certs): - subject, issuer, attributes = analyze_certificate( - os.path.join(directory, cert), include_old - ) + certificate = parse_certificate(os.path.join(directory, cert)) - if subject is None: + if certificate is None: continue - map_key = f"{issuer}-{attributes.serial_number}" + if not include_old: + if certificate.not_valid_after < datetime.utcnow(): + print( + f"skip {cert} not valid after {certificate.not_valid_after}", + file=sys.stderr, + ) + continue + + subject = certificate.subject.public_bytes() + issuer = certificate.issuer.public_bytes() + + map_key = f"{issuer}-{certificate.serial_number}" if map_key in nodes: continue node = Node( - attributes.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, - data=attributes, + certificate.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value, + data=certificate, ) nodes[map_key] = node diff --git a/compare_certs.py b/compare_certs.py new file mode 100644 index 0000000..649572c --- /dev/null +++ b/compare_certs.py @@ -0,0 +1,107 @@ +""" +Script to compare a set of X.509 certificate files +""" +import argparse +import typing + +from cryptography import x509 + +import helpers + +DEFAULT_OIDS = [ + x509.OID_BASIC_CONSTRAINTS, + x509.OID_KEY_USAGE, + x509.OID_EXTENDED_KEY_USAGE, + x509.OID_AUTHORITY_KEY_IDENTIFIER, + x509.OID_SUBJECT_KEY_IDENTIFIER, + x509.OID_AUTHORITY_KEY_IDENTIFIER, + x509.OID_AUTHORITY_INFORMATION_ACCESS, + x509.OID_CRL_DISTRIBUTION_POINTS, + x509.OID_CERTIFICATE_POLICIES, +] + + +def table_line(key, values): + print(f"| {key} |", " | ".join(values), "|") + + +def format_duration(cert: x509.Certificate) -> str: + return f"{cert.not_valid_before} - {cert.not_valid_after} = {(cert.not_valid_after - cert.not_valid_before).days}d" + + +def compare_certificates(certificates: typing.List[str]): + parsed = {} + + for cert_filename in certificates: + parsed[cert_filename] = helpers.parse_certificate(cert_filename) + + print("| |", " | ".join([f"`{name}`" for name in certificates]), "|") + print("|---|" + "---|" * len(certificates)) + table_line("Subject", [parsed[fn].subject.rfc4514_string() for fn in certificates]) + table_line( + "Issuer", + [ + parsed[fn].issuer.rfc4514_string( + attr_name_overrides={ + x509.ObjectIdentifier("1.2.840.113549.1.9.1"): "emailAddress" + } + ) + for fn in certificates + ], + ) + table_line( + "Public Key algorithm", + [helpers.format_key_type(parsed[fn]) for fn in certificates], + ) + table_line( + "Hash algorithm", + [helpers.format_hash_algorithm(parsed[fn]) for fn in certificates], + ) + table_line( + "Validity duration", [format_duration(parsed[fn]) for fn in certificates] + ) + + extra_oids = set() + for cert in certificates: + for oid in [ + ext.oid for ext in parsed[cert].extensions if ext.oid not in DEFAULT_OIDS + ]: + extra_oids.add(oid) + + for oid in DEFAULT_OIDS: + table_line( + helpers.extension_label(oid), + [ + helpers.format_extension(parsed[fn].extensions, oid) + for fn in certificates + ], + ) + + for oid in extra_oids: + table_line( + helpers.extension_label(oid), + [ + helpers.format_extension(parsed[fn].extensions, oid) + for fn in certificates + ], + ) + + +def main(): + parser = argparse.ArgumentParser( + prog="compare certs", + description="compare a certificate to a set of other certificates", + ) + parser.add_argument("first", help="first certificates") + parser.add_argument("others", nargs="+", help="other certificates") + + args = parser.parse_args() + + cert_list = [args.first] + cert_list.extend(args.others) + + compare_certificates(cert_list) + + +if __name__ == "__main__": + main() diff --git a/helpers.py b/helpers.py new file mode 100644 index 0000000..a44546e --- /dev/null +++ b/helpers.py @@ -0,0 +1,268 @@ +import sys +import typing + +from cryptography import x509 +from cryptography.exceptions import UnsupportedAlgorithm +from cryptography.hazmat.backends.openssl import rsa +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey + + +class WrongExtensionType(Exception): + def __init__(self, oid: x509.ObjectIdentifier): + super(f"wrong object id: {oid.dotted_string}") + + +def parse_certificate(cert: str) -> typing.Union[x509.Certificate, None]: + with open(cert, "rb") as f: + cert_bytes = f.read() + + try: + parsed = x509.load_pem_x509_certificate(cert_bytes) + except ValueError: + try: + parsed = x509.load_der_x509_certificate(cert_bytes) + except ValueError: + print(f"skip {cert}", file=sys.stderr) + return None + + return parsed + + +def format_key_usage(value: x509.KeyUsage): + usages = [] + if value.digital_signature: + usages.append("digital signature") + if value.content_commitment: + usages.append("non repudiation") + if value.key_encipherment: + usages.append("key encipherment") + if value.data_encipherment: + usages.append("data encipherment") + if value.key_agreement: + usages.append("key agreement") + if value.encipher_only: + usages.append("encipher only") + if value.decipher_only: + usages.append("decipher only") + if value.key_cert_sign: + usages.append("key cert sign") + if value.crl_sign: + usages.append("crl sign") + return ", ".join(usages) + + +def format_oid_link(oid: x509.ObjectIdentifier) -> str: + return f"[{oid.dotted_string}](https://oidref.com/{oid.dotted_string})" + + +def get_key_type(public_key): + if isinstance(public_key, rsa.RSAPublicKey): + return f"RSA {public_key.key_size}" + if isinstance(public_key, EllipticCurvePublicKey): + return f"EC {public_key.curve.name}" + if isinstance(public_key, Ed25519PublicKey): + return "ED25519" + return type(public_key) + + +def format_key_type(cert: x509.Certificate) -> str: + try: + key_type = get_key_type(cert.public_key()) + except UnsupportedAlgorithm: + key_type = "unsupported key type" + return key_type + + +def format_ext_key_usage(value: x509.ExtendedKeyUsage): + usages = [] + + labels = { + x509.ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE: "any", + x509.ExtendedKeyUsageOID.SERVER_AUTH: "server auth", + x509.ExtendedKeyUsageOID.CLIENT_AUTH: "client auth", + x509.ExtendedKeyUsageOID.CODE_SIGNING: "code signing", + x509.ExtendedKeyUsageOID.EMAIL_PROTECTION: "email protection", + x509.ExtendedKeyUsageOID.IPSEC_IKE: "IPSEC IKE", + x509.ExtendedKeyUsageOID.KERBEROS_PKINIT_KDC: "Kerberos PKINIT KDC", + x509.ExtendedKeyUsageOID.OCSP_SIGNING: "OCSP signing", + x509.ExtendedKeyUsageOID.SMARTCARD_LOGON: "SmartCard logon", + x509.ExtendedKeyUsageOID.TIME_STAMPING: "timestamping", + } + + for eku in value: + if eku in labels: + usages.append(labels[eku]) + else: + usages.append(format_oid_link(eku)) + + return ", ".join(usages) + + +def format_crl_distribution_points(value: x509.CRLDistributionPoints): + dps = [] + for dp in value: + if dp.full_name is not None: + for item in dp.full_name: + if isinstance(item, x509.UniformResourceIdentifier): + dps.append(item.value) + else: + dps.append(item) + else: + dps.append(f"unsupported {dp}") + return ", ".join(dps) + + +def format_access_description(ai: x509.AccessDescription): + if ai.access_method == x509.AuthorityInformationAccessOID.OCSP: + prefix = "OCSP:" + elif ai.access_method == x509.AuthorityInformationAccessOID.CA_ISSUERS: + prefix = "CA Issuers:" + else: + prefix = format_oid_link(ai.access_method) + + if isinstance(ai.access_location, x509.UniformResourceIdentifier): + return f"{prefix} {ai.access_location.value}" + + return f"{prefix} {ai.access_location}" + + +def format_authority_information_access(value: x509.AuthorityInformationAccess): + access_infos = [] + for ai in value: + access_infos.append(format_access_description(ai)) + return ", ".join(access_infos) + + +def format_policy_information(pi: x509.PolicyInformation): + if pi.policy_identifier == x509.ObjectIdentifier("2.5.29.32.0"): + prefix = "anyPolicy" + else: + prefix = format_oid_link(pi.policy_identifier) + qualifiers = [] + + if pi.policy_qualifiers is None: + return prefix + + for pq in pi.policy_qualifiers: + if isinstance(pq, str): + qualifiers.append(pq) + elif isinstance(pq, x509.UserNotice): + qualifiers.append(pq) + else: + qualifiers.append(f"unknown policy information {pq}") + + value = ", ".join(qualifiers) + return f"{prefix}: {value}" + + +def format_certificate_policies(value: x509.CertificatePolicies): + policies = [] + for pi in value: + policies.append(format_policy_information(pi)) + + return ", ".join(policies) + + +def format_subject_alternative_names(value: x509.SubjectAlternativeName) -> str: + alt_names = [] + + for name_type in ( + x509.DNSName, + x509.RFC822Name, + x509.DirectoryName, + x509.OtherName, + x509.UniformResourceIdentifier, + ): + alt_names.extend(value.get_values_for_type(name_type)) + + if not alt_names: + raise Exception(f"unhandled {value}") + + return ", ".join(alt_names) + + +def format_hash_algorithm(cert: x509.Certificate) -> str: + try: + return cert.signature_hash_algorithm.name + except UnsupportedAlgorithm: + return f"unsupported hash algorithm {format_oid_link(cert.signature_algorithm_oid)}" + + +def extension_label(oid: x509.ObjectIdentifier) -> str: + labels = { + x509.OID_BASIC_CONSTRAINTS: "Basic constraints", + x509.OID_KEY_USAGE: "Key usage", + x509.OID_EXTENDED_KEY_USAGE: "Extended key usage", + x509.OID_AUTHORITY_KEY_IDENTIFIER: "Authority key identifier", + x509.OID_SUBJECT_KEY_IDENTIFIER: "Subject key identifier", + x509.OID_AUTHORITY_INFORMATION_ACCESS: "Authority information access", + x509.OID_CRL_DISTRIBUTION_POINTS: "CRL distribution points", + x509.OID_CERTIFICATE_POLICIES: "Certificate policies", + } + if oid in labels: + return labels[oid] + return format_oid_link(oid) + + +def format_basic_constraints(bc: x509.ExtensionType) -> str: + if isinstance(bc, x509.BasicConstraints): + parts = [] + if bc.path_length is not None: + parts.append(f"path length: {bc.path_length}") + parts.append(f"CA: {bc.ca}") + return ", ".join(parts) + return f"unexpected extension {bc.oid}" + + +def format_subject_key_identifier(value: x509.ExtensionType) -> str: + if not isinstance(value, x509.SubjectKeyIdentifier): + raise WrongExtensionType(value.oid) + + parts = [] + if value.key_identifier: + parts.append(f"key identifier: {value.key_identifier.hex(':')}") + if value.digest: + parts.append(f"digest: {value.digest.hex(':')}") + return ", ".join(parts) + + +def format_authority_key_identifier(value: x509.ExtensionType) -> str: + if not isinstance(value, x509.AuthorityKeyIdentifier): + raise WrongExtensionType(value.oid) + + if value.key_identifier is not None: + return f"identifier: {value.key_identifier.hex(':')}" + + return ( + f" issuer {value.authority_cert_issuer[0].value.rfc4514_string()}," + f" serial {value.authority_cert_serial_number}" + ) + + +def format_extension(extensions: x509.Extensions, oid: x509.ObjectIdentifier) -> str: + try: + ext = extensions.get_extension_for_oid(oid) + except x509.ExtensionNotFound: + return "-" + + formatters = { + x509.OID_BASIC_CONSTRAINTS: format_basic_constraints, + x509.OID_KEY_USAGE: format_key_usage, + x509.OID_EXTENDED_KEY_USAGE: format_ext_key_usage, + x509.OID_AUTHORITY_KEY_IDENTIFIER: format_authority_key_identifier, + x509.OID_SUBJECT_KEY_IDENTIFIER: format_subject_key_identifier, + x509.OID_AUTHORITY_INFORMATION_ACCESS: format_authority_information_access, + x509.OID_CRL_DISTRIBUTION_POINTS: format_crl_distribution_points, + x509.OID_CERTIFICATE_POLICIES: format_certificate_policies, + x509.OID_SUBJECT_ALTERNATIVE_NAME: format_subject_alternative_names, + } + + if oid in formatters: + return formatters[oid](ext.value) + (" (critical)" if ext.critical else "") + + return str(ext.value) + + +def table_line(key, values): + print(f"| {key} |", " | ".join(values), "|")