import argparse import glob import os import sys from datetime import datetime from anytree import Node, RenderTree from anytree.exporter import DotExporter from cryptography import x509 from cryptography.exceptions import UnsupportedAlgorithm from cryptography.hazmat._oid import ( AuthorityInformationAccessOID, ExtendedKeyUsageOID, ExtensionOID, NameOID, ) from cryptography.hazmat.backends.openssl import rsa from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey def analyze_certificate(cert, include_old) -> (bytes, bytes, x509.Certificate): with open(cert, "rb") as f: cert_bytes = f.read() try: parsed = x509.load_pem_x509_certificate(cert_bytes) except ValueError: try: parsed = x509.load_der_x509_certificate(cert_bytes) except ValueError: print(f"skip {cert}", file=sys.stderr) return None, None, None if not include_old: if parsed.not_valid_after < datetime.utcnow(): print( f"skip {cert} not valid after {parsed.not_valid_after}", file=sys.stderr ) return None, None, None return parsed.subject.public_bytes(), parsed.issuer.public_bytes(), parsed def edge_type_func(node, child): return "--" def get_key_type(public_key): if isinstance(public_key, rsa.RSAPublicKey): return f"RSA {public_key.key_size}" if isinstance(public_key, EllipticCurvePublicKey): return f"EC {public_key.curve.name}" if isinstance(public_key, Ed25519PublicKey): return "ED25519" return type(public_key) def node_name_func(node): try: key_type = get_key_type(node.data.public_key()) except UnsupportedAlgorithm: key_type = "unsupported" return f"{node.name}\n{hex(node.data.serial_number)}\n{key_type}" def format_key_usage(value: x509.KeyUsage): usages = [] if value.digital_signature: usages.append("digital signature") if value.content_commitment: usages.append("non repudiation") if value.key_encipherment: usages.append("key encipherment") if value.data_encipherment: usages.append("data encipherment") if value.key_agreement: usages.append("key agreement") if value.encipher_only: usages.append("encipher only") if value.decipher_only: usages.append("decipher only") if value.key_cert_sign: usages.append("key cert sign") if value.crl_sign: usages.append("crl sign") return ", ".join(usages) def format_ext_key_usage(value: x509.ExtendedKeyUsage): usages = [] labels = { ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE: "any", ExtendedKeyUsageOID.SERVER_AUTH: "server auth", ExtendedKeyUsageOID.CLIENT_AUTH: "client auth", ExtendedKeyUsageOID.CODE_SIGNING: "code signing", ExtendedKeyUsageOID.EMAIL_PROTECTION: "email protection", ExtendedKeyUsageOID.IPSEC_IKE: "IPSEC IKE", ExtendedKeyUsageOID.KERBEROS_PKINIT_KDC: "Kerberos PKINIT KDC", ExtendedKeyUsageOID.OCSP_SIGNING: "OCSP signing", ExtendedKeyUsageOID.SMARTCARD_LOGON: "SmartCard logon", ExtendedKeyUsageOID.TIME_STAMPING: "timestamping", } for eku in value: if eku in labels: usages.append(labels[eku]) else: usages.append(format_oid_link(eku)) return ", ".join(usages) def format_crl_distribution_points(value: x509.CRLDistributionPoints): dps = [] for dp in value: if dp.full_name is not None: for item in dp.full_name: if isinstance(item, x509.UniformResourceIdentifier): dps.append(item.value) else: dps.append(item) else: dps.append(f"unsupported {dp}") return ", ".join(dps) def format_access_description(ai: x509.AccessDescription): if ai.access_method == AuthorityInformationAccessOID.OCSP: prefix = "OCSP:" elif ai.access_method == AuthorityInformationAccessOID.CA_ISSUERS: prefix = "CA Issuers:" else: prefix = format_oid_link(ai.access_method) if isinstance(ai.access_location, x509.UniformResourceIdentifier): return f"{prefix} {ai.access_location.value}" return f"{prefix} {ai.access_location}" def format_authinfo_access(value: x509.AuthorityInformationAccess): access_infos = [] for ai in value: access_infos.append(format_access_description(ai)) return ", ".join(access_infos) def format_oid_link(oid: x509.ObjectIdentifier) -> str: return f"[{oid.dotted_string}](https://oidref.com/{oid.dotted_string})" def format_policy_information(pi: x509.PolicyInformation): if pi.policy_identifier == x509.ObjectIdentifier("2.5.29.32.0"): prefix = "anyPolicy" else: prefix = format_oid_link(pi.policy_identifier) qualifiers = [] if pi.policy_qualifiers is None: return prefix for pq in pi.policy_qualifiers: if isinstance(pq, str): qualifiers.append(pq) elif isinstance(pq, x509.UserNotice): qualifiers.append(pq) else: qualifiers.append(f"unknown policy information {pq}") value = ", ".join(qualifiers) return f"{prefix}: {value}" def format_certificate_policies(value: x509.CertificatePolicies): policies = [] for pi in value: policies.append(format_policy_information(pi)) return ", ".join(policies) def format_subject_alternative_names(value: x509.SubjectAlternativeName) -> str: alt_names = [] for name_type in ( x509.DNSName, x509.RFC822Name, x509.DirectoryName, x509.OtherName, x509.UniformResourceIdentifier, ): alt_names.extend(value.get_values_for_type(name_type)) if not alt_names: raise Exception(f"unhandled {value}") return ", ".join(alt_names) def cert_to_markdown(cert: x509.Certificate): print("| Item | Value(s) |") print("|------|----------|") print("| Subject: |", cert.subject.rfc4514_string(), "|") print("| Issuer: |", cert.issuer.rfc4514_string(), "|") print(f"| Serial: | {hex(cert.serial_number)} ({cert.serial_number}) |") try: pubkey = get_key_type(cert.public_key()) except UnsupportedAlgorithm: pubkey = "unsupported public key" print(f"| Public Key: | {pubkey} |") try: hash_name = cert.signature_hash_algorithm.name except UnsupportedAlgorithm: hash_name = format_oid_link(cert.signature_algorithm_oid) print(f"| Signature hash: | {hash_name} |") print(f"| Not before: | {cert.not_valid_before} |") print(f"| Not after: | {cert.not_valid_after} |") for ext in cert.extensions: if ext.oid == ExtensionOID.BASIC_CONSTRAINTS: if ext.value.path_length is not None: print(f"| Path length: | {ext.value.path_length} |") elif ext.oid == ExtensionOID.SUBJECT_KEY_IDENTIFIER: print(f"| Subject Key Id: | {ext.value.digest.hex(':')} |") elif ext.oid == ExtensionOID.AUTHORITY_KEY_IDENTIFIER: if ext.value.key_identifier is not None: print(f"| Authority Key Id: | {ext.value.key_identifier.hex(':')} |") else: print( "| Authority Key Id: |" f" issuer {ext.value.authority_cert_issuer[0].value.rfc4514_string()}," f" serial {ext.value.authority_cert_serial_number} |" ) elif ext.oid == ExtensionOID.KEY_USAGE: print("| Key Usage: |", format_key_usage(ext.value), "|") elif ext.oid == ExtensionOID.EXTENDED_KEY_USAGE: print("| Extended Key Usage: |", format_ext_key_usage(ext.value), "|") elif ext.oid == ExtensionOID.CRL_DISTRIBUTION_POINTS: print( "| CRL Distribution Points: |", format_crl_distribution_points(ext.value), "|", ) elif ext.oid == ExtensionOID.AUTHORITY_INFORMATION_ACCESS: print( "| Authority information access: |", format_authinfo_access(ext.value), "|", ) elif ext.oid == ExtensionOID.CERTIFICATE_POLICIES: print( "| Certificate policies: |", format_certificate_policies(ext.value), "|" ) elif ext.oid == ExtensionOID.SUBJECT_ALTERNATIVE_NAME: print( "| Subject alternative names: |", format_subject_alternative_names(ext.value), "|", ) elif ext.oid == x509.ObjectIdentifier("1.3.6.1.5.5.7.1.3"): print( "| Qualified Certificate Statements: |" " see [RFC-3739](https://datatracker.ietf.org/doc/html/rfc3739.html) |" ) elif ext.oid == x509.ObjectIdentifier("2.5.29.16"): print( "| Private Key usage period | see [RFC-3280](https://tools.ietf.org/html/rfc3280.html) |" ) else: print( f"| unknown extension {ext.oid.dotted_string} |" f" {ext.value} try {format_oid_link(ext.oid)} |" ) def analyze_certificates(directory, include_old): nodes = {} node_parents = {} issuers = {} roots = [] certs = glob.glob("*.cer", root_dir=directory) certs += glob.glob("*.crt", root_dir=directory) certs += glob.glob("*.der", root_dir=directory) certs += glob.glob("*.pem", root_dir=directory) for cert in sorted(certs): subject, issuer, attributes = analyze_certificate( os.path.join(directory, cert), include_old ) if subject is None: continue map_key = f"{issuer}-{attributes.serial_number}" if map_key in nodes: continue node = Node( attributes.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, data=attributes, ) nodes[map_key] = node if issuer == subject: roots.append(node) else: issuers.setdefault(issuer, []).append(map_key) node_parents[map_key] = issuer for item in nodes: issuer = nodes[item].data.subject.public_bytes() if issuer in issuers: for map_key in issuers[issuer]: nodes[map_key].parent = nodes[item] for root in roots: image_name = os.path.join( directory, "{}-{}.svg".format(root.name, root.data.serial_number) ) exp = DotExporter( root, graph="graph", options=["rankdir=LR"], edgetypefunc=edge_type_func, nodenamefunc=node_name_func, ) exp.to_picture(image_name) print(f"# {root.name}") print() print(f"![hierarchy for {root.name}]({image_name})") print() for _, _, node in RenderTree(root): cert = node.data print(f"## {node.name}") print() cert_to_markdown(cert) print() def main(): parser = argparse.ArgumentParser(prog="analyze_certs") parser.add_argument("directory", help="directory of certificate files") parser.add_argument( "include_old", action="store_false", help="include old certificates" ) args = parser.parse_args() analyze_certificates(args.directory, args.include_old) if __name__ == "__main__": main()