Add certificate compare script

- add compare_certs.py
- refactor common logic into helpers.py
This commit is contained in:
Jan Dittberner 2022-07-03 18:44:17 +02:00 committed by Jan Dittberner
parent 55be7f7d37
commit bcb504c555
4 changed files with 406 additions and 240 deletions

9
.gitignore vendored
View file

@ -1,8 +1,9 @@
*.md
!README.md
*.cer *.cer
*.crt *.crt
*.der *.der
*.pem
*/*.svg
/.idea/ /.idea/
*.md
*.pem
__pycache__/
!README.md
*/*.svg

View file

@ -8,195 +8,17 @@ from anytree import Node, RenderTree
from anytree.exporter import DotExporter from anytree.exporter import DotExporter
from cryptography import x509 from cryptography import x509
from cryptography.exceptions import UnsupportedAlgorithm from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat._oid import (
AuthorityInformationAccessOID, import helpers
ExtendedKeyUsageOID, from helpers import format_key_type, format_oid_link, parse_certificate, table_line
ExtensionOID,
NameOID,
)
from cryptography.hazmat.backends.openssl import rsa
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
def analyze_certificate(cert, include_old) -> (bytes, bytes, x509.Certificate): def edge_type_func(*_):
with open(cert, "rb") as f:
cert_bytes = f.read()
try:
parsed = x509.load_pem_x509_certificate(cert_bytes)
except ValueError:
try:
parsed = x509.load_der_x509_certificate(cert_bytes)
except ValueError:
print(f"skip {cert}", file=sys.stderr)
return None, None, None
if not include_old:
if parsed.not_valid_after < datetime.utcnow():
print(
f"skip {cert} not valid after {parsed.not_valid_after}", file=sys.stderr
)
return None, None, None
return parsed.subject.public_bytes(), parsed.issuer.public_bytes(), parsed
def edge_type_func(node, child):
return "--" return "--"
def get_key_type(public_key):
if isinstance(public_key, rsa.RSAPublicKey):
return f"RSA {public_key.key_size}"
if isinstance(public_key, EllipticCurvePublicKey):
return f"EC {public_key.curve.name}"
if isinstance(public_key, Ed25519PublicKey):
return "ED25519"
return type(public_key)
def node_name_func(node): def node_name_func(node):
try: return f"{node.name}\n{hex(node.data.serial_number)}\n{format_key_type(node.data)}"
key_type = get_key_type(node.data.public_key())
except UnsupportedAlgorithm:
key_type = "unsupported"
return f"{node.name}\n{hex(node.data.serial_number)}\n{key_type}"
def format_key_usage(value: x509.KeyUsage):
usages = []
if value.digital_signature:
usages.append("digital signature")
if value.content_commitment:
usages.append("non repudiation")
if value.key_encipherment:
usages.append("key encipherment")
if value.data_encipherment:
usages.append("data encipherment")
if value.key_agreement:
usages.append("key agreement")
if value.encipher_only:
usages.append("encipher only")
if value.decipher_only:
usages.append("decipher only")
if value.key_cert_sign:
usages.append("key cert sign")
if value.crl_sign:
usages.append("crl sign")
return ", ".join(usages)
def format_ext_key_usage(value: x509.ExtendedKeyUsage):
usages = []
labels = {
ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE: "any",
ExtendedKeyUsageOID.SERVER_AUTH: "server auth",
ExtendedKeyUsageOID.CLIENT_AUTH: "client auth",
ExtendedKeyUsageOID.CODE_SIGNING: "code signing",
ExtendedKeyUsageOID.EMAIL_PROTECTION: "email protection",
ExtendedKeyUsageOID.IPSEC_IKE: "IPSEC IKE",
ExtendedKeyUsageOID.KERBEROS_PKINIT_KDC: "Kerberos PKINIT KDC",
ExtendedKeyUsageOID.OCSP_SIGNING: "OCSP signing",
ExtendedKeyUsageOID.SMARTCARD_LOGON: "SmartCard logon",
ExtendedKeyUsageOID.TIME_STAMPING: "timestamping",
}
for eku in value:
if eku in labels:
usages.append(labels[eku])
else:
usages.append(format_oid_link(eku))
return ", ".join(usages)
def format_crl_distribution_points(value: x509.CRLDistributionPoints):
dps = []
for dp in value:
if dp.full_name is not None:
for item in dp.full_name:
if isinstance(item, x509.UniformResourceIdentifier):
dps.append(item.value)
else:
dps.append(item)
else:
dps.append(f"unsupported {dp}")
return ", ".join(dps)
def format_access_description(ai: x509.AccessDescription):
if ai.access_method == AuthorityInformationAccessOID.OCSP:
prefix = "OCSP:"
elif ai.access_method == AuthorityInformationAccessOID.CA_ISSUERS:
prefix = "CA Issuers:"
else:
prefix = format_oid_link(ai.access_method)
if isinstance(ai.access_location, x509.UniformResourceIdentifier):
return f"{prefix} {ai.access_location.value}"
return f"{prefix} {ai.access_location}"
def format_authinfo_access(value: x509.AuthorityInformationAccess):
access_infos = []
for ai in value:
access_infos.append(format_access_description(ai))
return ", ".join(access_infos)
def format_oid_link(oid: x509.ObjectIdentifier) -> str:
return f"[{oid.dotted_string}](https://oidref.com/{oid.dotted_string})"
def format_policy_information(pi: x509.PolicyInformation):
if pi.policy_identifier == x509.ObjectIdentifier("2.5.29.32.0"):
prefix = "anyPolicy"
else:
prefix = format_oid_link(pi.policy_identifier)
qualifiers = []
if pi.policy_qualifiers is None:
return prefix
for pq in pi.policy_qualifiers:
if isinstance(pq, str):
qualifiers.append(pq)
elif isinstance(pq, x509.UserNotice):
qualifiers.append(pq)
else:
qualifiers.append(f"unknown policy information {pq}")
value = ", ".join(qualifiers)
return f"{prefix}: {value}"
def format_certificate_policies(value: x509.CertificatePolicies):
policies = []
for pi in value:
policies.append(format_policy_information(pi))
return ", ".join(policies)
def format_subject_alternative_names(value: x509.SubjectAlternativeName) -> str:
alt_names = []
for name_type in (
x509.DNSName,
x509.RFC822Name,
x509.DirectoryName,
x509.OtherName,
x509.UniformResourceIdentifier,
):
alt_names.extend(value.get_values_for_type(name_type))
if not alt_names:
raise Exception(f"unhandled {value}")
return ", ".join(alt_names)
def cert_to_markdown(cert: x509.Certificate): def cert_to_markdown(cert: x509.Certificate):
@ -205,11 +27,7 @@ def cert_to_markdown(cert: x509.Certificate):
print("| Subject: |", cert.subject.rfc4514_string(), "|") print("| Subject: |", cert.subject.rfc4514_string(), "|")
print("| Issuer: |", cert.issuer.rfc4514_string(), "|") print("| Issuer: |", cert.issuer.rfc4514_string(), "|")
print(f"| Serial: | {hex(cert.serial_number)} ({cert.serial_number}) |") print(f"| Serial: | {hex(cert.serial_number)} ({cert.serial_number}) |")
try: print(f"| Public Key: | {format_key_type(cert)} |")
pubkey = get_key_type(cert.public_key())
except UnsupportedAlgorithm:
pubkey = "unsupported public key"
print(f"| Public Key: | {pubkey} |")
try: try:
hash_name = cert.signature_hash_algorithm.name hash_name = cert.signature_hash_algorithm.name
except UnsupportedAlgorithm: except UnsupportedAlgorithm:
@ -219,46 +37,9 @@ def cert_to_markdown(cert: x509.Certificate):
print(f"| Not after: | {cert.not_valid_after} |") print(f"| Not after: | {cert.not_valid_after} |")
for ext in cert.extensions: for ext in cert.extensions:
if ext.oid == ExtensionOID.BASIC_CONSTRAINTS: if ext.oid == x509.OID_BASIC_CONSTRAINTS:
if ext.value.path_length is not None: if ext.value.path_length is not None:
print(f"| Path length: | {ext.value.path_length} |") print(f"| Path length: | {ext.value.path_length} |")
elif ext.oid == ExtensionOID.SUBJECT_KEY_IDENTIFIER:
print(f"| Subject Key Id: | {ext.value.digest.hex(':')} |")
elif ext.oid == ExtensionOID.AUTHORITY_KEY_IDENTIFIER:
if ext.value.key_identifier is not None:
print(f"| Authority Key Id: | {ext.value.key_identifier.hex(':')} |")
else:
print(
"| Authority Key Id: |"
f" issuer {ext.value.authority_cert_issuer[0].value.rfc4514_string()},"
f" serial {ext.value.authority_cert_serial_number} |"
)
elif ext.oid == ExtensionOID.KEY_USAGE:
print("| Key Usage: |", format_key_usage(ext.value), "|")
elif ext.oid == ExtensionOID.EXTENDED_KEY_USAGE:
print("| Extended Key Usage: |", format_ext_key_usage(ext.value), "|")
elif ext.oid == ExtensionOID.CRL_DISTRIBUTION_POINTS:
print(
"| CRL Distribution Points: |",
format_crl_distribution_points(ext.value),
"|",
)
elif ext.oid == ExtensionOID.AUTHORITY_INFORMATION_ACCESS:
print(
"| Authority information access: |",
format_authinfo_access(ext.value),
"|",
)
elif ext.oid == ExtensionOID.CERTIFICATE_POLICIES:
print(
"| Certificate policies: |", format_certificate_policies(ext.value), "|"
)
elif ext.oid == ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
print(
"| Subject alternative names: |",
format_subject_alternative_names(ext.value),
"|",
)
elif ext.oid == x509.ObjectIdentifier("1.3.6.1.5.5.7.1.3"): elif ext.oid == x509.ObjectIdentifier("1.3.6.1.5.5.7.1.3"):
print( print(
"| Qualified Certificate Statements: |" "| Qualified Certificate Statements: |"
@ -269,9 +50,9 @@ def cert_to_markdown(cert: x509.Certificate):
"| Private Key usage period | see [RFC-3280](https://tools.ietf.org/html/rfc3280.html) |" "| Private Key usage period | see [RFC-3280](https://tools.ietf.org/html/rfc3280.html) |"
) )
else: else:
print( table_line(
f"| unknown extension {ext.oid.dotted_string} |" helpers.extension_label(ext.oid),
f" {ext.value} try {format_oid_link(ext.oid)} |" [helpers.format_extension(cert.extensions, ext.oid)],
) )
@ -287,21 +68,30 @@ def analyze_certificates(directory, include_old):
certs += glob.glob("*.pem", root_dir=directory) certs += glob.glob("*.pem", root_dir=directory)
for cert in sorted(certs): for cert in sorted(certs):
subject, issuer, attributes = analyze_certificate( certificate = parse_certificate(os.path.join(directory, cert))
os.path.join(directory, cert), include_old
)
if subject is None: if certificate is None:
continue continue
map_key = f"{issuer}-{attributes.serial_number}" if not include_old:
if certificate.not_valid_after < datetime.utcnow():
print(
f"skip {cert} not valid after {certificate.not_valid_after}",
file=sys.stderr,
)
continue
subject = certificate.subject.public_bytes()
issuer = certificate.issuer.public_bytes()
map_key = f"{issuer}-{certificate.serial_number}"
if map_key in nodes: if map_key in nodes:
continue continue
node = Node( node = Node(
attributes.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, certificate.subject.get_attributes_for_oid(x509.OID_COMMON_NAME)[0].value,
data=attributes, data=certificate,
) )
nodes[map_key] = node nodes[map_key] = node

107
compare_certs.py Normal file
View file

@ -0,0 +1,107 @@
"""
Script to compare a set of X.509 certificate files
"""
import argparse
import typing
from cryptography import x509
import helpers
DEFAULT_OIDS = [
x509.OID_BASIC_CONSTRAINTS,
x509.OID_KEY_USAGE,
x509.OID_EXTENDED_KEY_USAGE,
x509.OID_AUTHORITY_KEY_IDENTIFIER,
x509.OID_SUBJECT_KEY_IDENTIFIER,
x509.OID_AUTHORITY_KEY_IDENTIFIER,
x509.OID_AUTHORITY_INFORMATION_ACCESS,
x509.OID_CRL_DISTRIBUTION_POINTS,
x509.OID_CERTIFICATE_POLICIES,
]
def table_line(key, values):
print(f"| {key} |", " | ".join(values), "|")
def format_duration(cert: x509.Certificate) -> str:
return f"{cert.not_valid_before} - {cert.not_valid_after} = {(cert.not_valid_after - cert.not_valid_before).days}d"
def compare_certificates(certificates: typing.List[str]):
parsed = {}
for cert_filename in certificates:
parsed[cert_filename] = helpers.parse_certificate(cert_filename)
print("| |", " | ".join([f"`{name}`" for name in certificates]), "|")
print("|---|" + "---|" * len(certificates))
table_line("Subject", [parsed[fn].subject.rfc4514_string() for fn in certificates])
table_line(
"Issuer",
[
parsed[fn].issuer.rfc4514_string(
attr_name_overrides={
x509.ObjectIdentifier("1.2.840.113549.1.9.1"): "emailAddress"
}
)
for fn in certificates
],
)
table_line(
"Public Key algorithm",
[helpers.format_key_type(parsed[fn]) for fn in certificates],
)
table_line(
"Hash algorithm",
[helpers.format_hash_algorithm(parsed[fn]) for fn in certificates],
)
table_line(
"Validity duration", [format_duration(parsed[fn]) for fn in certificates]
)
extra_oids = set()
for cert in certificates:
for oid in [
ext.oid for ext in parsed[cert].extensions if ext.oid not in DEFAULT_OIDS
]:
extra_oids.add(oid)
for oid in DEFAULT_OIDS:
table_line(
helpers.extension_label(oid),
[
helpers.format_extension(parsed[fn].extensions, oid)
for fn in certificates
],
)
for oid in extra_oids:
table_line(
helpers.extension_label(oid),
[
helpers.format_extension(parsed[fn].extensions, oid)
for fn in certificates
],
)
def main():
parser = argparse.ArgumentParser(
prog="compare certs",
description="compare a certificate to a set of other certificates",
)
parser.add_argument("first", help="first certificates")
parser.add_argument("others", nargs="+", help="other certificates")
args = parser.parse_args()
cert_list = [args.first]
cert_list.extend(args.others)
compare_certificates(cert_list)
if __name__ == "__main__":
main()

268
helpers.py Normal file
View file

@ -0,0 +1,268 @@
import sys
import typing
from cryptography import x509
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl import rsa
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
class WrongExtensionType(Exception):
def __init__(self, oid: x509.ObjectIdentifier):
super(f"wrong object id: {oid.dotted_string}")
def parse_certificate(cert: str) -> typing.Union[x509.Certificate, None]:
with open(cert, "rb") as f:
cert_bytes = f.read()
try:
parsed = x509.load_pem_x509_certificate(cert_bytes)
except ValueError:
try:
parsed = x509.load_der_x509_certificate(cert_bytes)
except ValueError:
print(f"skip {cert}", file=sys.stderr)
return None
return parsed
def format_key_usage(value: x509.KeyUsage):
usages = []
if value.digital_signature:
usages.append("digital signature")
if value.content_commitment:
usages.append("non repudiation")
if value.key_encipherment:
usages.append("key encipherment")
if value.data_encipherment:
usages.append("data encipherment")
if value.key_agreement:
usages.append("key agreement")
if value.encipher_only:
usages.append("encipher only")
if value.decipher_only:
usages.append("decipher only")
if value.key_cert_sign:
usages.append("key cert sign")
if value.crl_sign:
usages.append("crl sign")
return ", ".join(usages)
def format_oid_link(oid: x509.ObjectIdentifier) -> str:
return f"[{oid.dotted_string}](https://oidref.com/{oid.dotted_string})"
def get_key_type(public_key):
if isinstance(public_key, rsa.RSAPublicKey):
return f"RSA {public_key.key_size}"
if isinstance(public_key, EllipticCurvePublicKey):
return f"EC {public_key.curve.name}"
if isinstance(public_key, Ed25519PublicKey):
return "ED25519"
return type(public_key)
def format_key_type(cert: x509.Certificate) -> str:
try:
key_type = get_key_type(cert.public_key())
except UnsupportedAlgorithm:
key_type = "unsupported key type"
return key_type
def format_ext_key_usage(value: x509.ExtendedKeyUsage):
usages = []
labels = {
x509.ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE: "any",
x509.ExtendedKeyUsageOID.SERVER_AUTH: "server auth",
x509.ExtendedKeyUsageOID.CLIENT_AUTH: "client auth",
x509.ExtendedKeyUsageOID.CODE_SIGNING: "code signing",
x509.ExtendedKeyUsageOID.EMAIL_PROTECTION: "email protection",
x509.ExtendedKeyUsageOID.IPSEC_IKE: "IPSEC IKE",
x509.ExtendedKeyUsageOID.KERBEROS_PKINIT_KDC: "Kerberos PKINIT KDC",
x509.ExtendedKeyUsageOID.OCSP_SIGNING: "OCSP signing",
x509.ExtendedKeyUsageOID.SMARTCARD_LOGON: "SmartCard logon",
x509.ExtendedKeyUsageOID.TIME_STAMPING: "timestamping",
}
for eku in value:
if eku in labels:
usages.append(labels[eku])
else:
usages.append(format_oid_link(eku))
return ", ".join(usages)
def format_crl_distribution_points(value: x509.CRLDistributionPoints):
dps = []
for dp in value:
if dp.full_name is not None:
for item in dp.full_name:
if isinstance(item, x509.UniformResourceIdentifier):
dps.append(item.value)
else:
dps.append(item)
else:
dps.append(f"unsupported {dp}")
return ", ".join(dps)
def format_access_description(ai: x509.AccessDescription):
if ai.access_method == x509.AuthorityInformationAccessOID.OCSP:
prefix = "OCSP:"
elif ai.access_method == x509.AuthorityInformationAccessOID.CA_ISSUERS:
prefix = "CA Issuers:"
else:
prefix = format_oid_link(ai.access_method)
if isinstance(ai.access_location, x509.UniformResourceIdentifier):
return f"{prefix} {ai.access_location.value}"
return f"{prefix} {ai.access_location}"
def format_authority_information_access(value: x509.AuthorityInformationAccess):
access_infos = []
for ai in value:
access_infos.append(format_access_description(ai))
return ", ".join(access_infos)
def format_policy_information(pi: x509.PolicyInformation):
if pi.policy_identifier == x509.ObjectIdentifier("2.5.29.32.0"):
prefix = "anyPolicy"
else:
prefix = format_oid_link(pi.policy_identifier)
qualifiers = []
if pi.policy_qualifiers is None:
return prefix
for pq in pi.policy_qualifiers:
if isinstance(pq, str):
qualifiers.append(pq)
elif isinstance(pq, x509.UserNotice):
qualifiers.append(pq)
else:
qualifiers.append(f"unknown policy information {pq}")
value = ", ".join(qualifiers)
return f"{prefix}: {value}"
def format_certificate_policies(value: x509.CertificatePolicies):
policies = []
for pi in value:
policies.append(format_policy_information(pi))
return ", ".join(policies)
def format_subject_alternative_names(value: x509.SubjectAlternativeName) -> str:
alt_names = []
for name_type in (
x509.DNSName,
x509.RFC822Name,
x509.DirectoryName,
x509.OtherName,
x509.UniformResourceIdentifier,
):
alt_names.extend(value.get_values_for_type(name_type))
if not alt_names:
raise Exception(f"unhandled {value}")
return ", ".join(alt_names)
def format_hash_algorithm(cert: x509.Certificate) -> str:
try:
return cert.signature_hash_algorithm.name
except UnsupportedAlgorithm:
return f"unsupported hash algorithm {format_oid_link(cert.signature_algorithm_oid)}"
def extension_label(oid: x509.ObjectIdentifier) -> str:
labels = {
x509.OID_BASIC_CONSTRAINTS: "Basic constraints",
x509.OID_KEY_USAGE: "Key usage",
x509.OID_EXTENDED_KEY_USAGE: "Extended key usage",
x509.OID_AUTHORITY_KEY_IDENTIFIER: "Authority key identifier",
x509.OID_SUBJECT_KEY_IDENTIFIER: "Subject key identifier",
x509.OID_AUTHORITY_INFORMATION_ACCESS: "Authority information access",
x509.OID_CRL_DISTRIBUTION_POINTS: "CRL distribution points",
x509.OID_CERTIFICATE_POLICIES: "Certificate policies",
}
if oid in labels:
return labels[oid]
return format_oid_link(oid)
def format_basic_constraints(bc: x509.ExtensionType) -> str:
if isinstance(bc, x509.BasicConstraints):
parts = []
if bc.path_length is not None:
parts.append(f"path length: {bc.path_length}")
parts.append(f"CA: {bc.ca}")
return ", ".join(parts)
return f"unexpected extension {bc.oid}"
def format_subject_key_identifier(value: x509.ExtensionType) -> str:
if not isinstance(value, x509.SubjectKeyIdentifier):
raise WrongExtensionType(value.oid)
parts = []
if value.key_identifier:
parts.append(f"key identifier: {value.key_identifier.hex(':')}")
if value.digest:
parts.append(f"digest: {value.digest.hex(':')}")
return ", ".join(parts)
def format_authority_key_identifier(value: x509.ExtensionType) -> str:
if not isinstance(value, x509.AuthorityKeyIdentifier):
raise WrongExtensionType(value.oid)
if value.key_identifier is not None:
return f"identifier: {value.key_identifier.hex(':')}"
return (
f" issuer {value.authority_cert_issuer[0].value.rfc4514_string()},"
f" serial {value.authority_cert_serial_number}"
)
def format_extension(extensions: x509.Extensions, oid: x509.ObjectIdentifier) -> str:
try:
ext = extensions.get_extension_for_oid(oid)
except x509.ExtensionNotFound:
return "-"
formatters = {
x509.OID_BASIC_CONSTRAINTS: format_basic_constraints,
x509.OID_KEY_USAGE: format_key_usage,
x509.OID_EXTENDED_KEY_USAGE: format_ext_key_usage,
x509.OID_AUTHORITY_KEY_IDENTIFIER: format_authority_key_identifier,
x509.OID_SUBJECT_KEY_IDENTIFIER: format_subject_key_identifier,
x509.OID_AUTHORITY_INFORMATION_ACCESS: format_authority_information_access,
x509.OID_CRL_DISTRIBUTION_POINTS: format_crl_distribution_points,
x509.OID_CERTIFICATE_POLICIES: format_certificate_policies,
x509.OID_SUBJECT_ALTERNATIVE_NAME: format_subject_alternative_names,
}
if oid in formatters:
return formatters[oid](ext.value) + (" (critical)" if ext.critical else "")
return str(ext.value)
def table_line(key, values):
print(f"| {key} |", " | ".join(values), "|")