certificate_analysis/analyze_certs.py

365 lines
11 KiB
Python
Raw Normal View History

2022-07-03 14:38:15 +00:00
import argparse
import glob
import os
import sys
from datetime import datetime
from anytree import Node, RenderTree
from anytree.exporter import DotExporter
from cryptography import x509
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat._oid import (
AuthorityInformationAccessOID,
ExtendedKeyUsageOID,
ExtensionOID,
NameOID,
)
from cryptography.hazmat.backends.openssl import rsa
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
def analyze_certificate(
    cert, include_old
) -> tuple[bytes | None, bytes | None, x509.Certificate | None]:
    """Load one certificate file and return its subject, issuer and parsed form.

    The file content is first parsed as PEM; if that fails it is parsed as
    DER. Files that cannot be parsed either way are skipped.

    Args:
        cert: Path of the certificate file to read.
        include_old: When false, certificates whose ``not_valid_after`` is
            earlier than the current UTC time are skipped.

    Returns:
        ``(subject, issuer, certificate)`` where subject and issuer are the
        results of ``public_bytes()`` on the respective names, or
        ``(None, None, None)`` when the file was skipped. Every skip is
        reported on stderr.
    """
    with open(cert, "rb") as f:
        cert_bytes = f.read()
    try:
        parsed = x509.load_pem_x509_certificate(cert_bytes)
    except ValueError:
        try:
            parsed = x509.load_der_x509_certificate(cert_bytes)
        except ValueError:
            print(f"skip {cert}", file=sys.stderr)
            return None, None, None
    # Both sides of the comparison are naive datetimes expressed in UTC.
    if not include_old and parsed.not_valid_after < datetime.utcnow():
        print(
            f"skip {cert} not valid after {parsed.not_valid_after}", file=sys.stderr
        )
        return None, None, None
    return parsed.subject.public_bytes(), parsed.issuer.public_bytes(), parsed
def edge_type_func(node, child):
    """Return the edge operator drawn between a node and its child.

    Both arguments are ignored: every edge is rendered as ``--``.
    """
    edge_operator = "--"
    return edge_operator
def get_key_type(public_key):
    """Return a short textual description of a public key.

    Args:
        public_key: Public key object taken from a certificate.

    Returns:
        ``"RSA <bits>"``, ``"EC <curve name>"`` or ``"ED25519"`` for the
        recognised key types, otherwise the class name of the key object.
    """
    if isinstance(public_key, rsa.RSAPublicKey):
        return f"RSA {public_key.key_size}"
    if isinstance(public_key, EllipticCurvePublicKey):
        return f"EC {public_key.curve.name}"
    if isinstance(public_key, Ed25519PublicKey):
        return "ED25519"
    # Always return a string: the class object itself would be rendered as
    # "<class '...'>", which Markdown renderers treat as an HTML tag.
    return type(public_key).__name__
def node_name_func(node):
    """Build the three-line graph label for a certificate tree node.

    The label consists of the node name, the certificate serial number in
    hexadecimal and the key type; the key type is ``unsupported`` when the
    public key cannot be loaded.
    """
    certificate = node.data
    try:
        key_label = get_key_type(certificate.public_key())
    except UnsupportedAlgorithm:
        key_label = "unsupported"
    serial_hex = hex(certificate.serial_number)
    return f"{node.name}\n{serial_hex}\n{key_label}"
def format_key_usage(value: x509.KeyUsage):
    """Return the asserted key usages as a comma separated string.

    Args:
        value: Key usage extension value of a certificate.

    Returns:
        The labels of all usages that are set, joined with ``", "``; an
        empty string when none is set.
    """
    usages = []
    if value.digital_signature:
        usages.append("digital signature")
    if value.content_commitment:
        usages.append("non repudiation")
    if value.key_encipherment:
        usages.append("key encipherment")
    if value.data_encipherment:
        usages.append("data encipherment")
    if value.key_agreement:
        usages.append("key agreement")
        # encipher_only / decipher_only are only defined together with key
        # agreement; reading them otherwise raises ValueError, so they must
        # not be touched unless key_agreement is set.
        if value.encipher_only:
            usages.append("encipher only")
        if value.decipher_only:
            usages.append("decipher only")
    if value.key_cert_sign:
        usages.append("key cert sign")
    if value.crl_sign:
        usages.append("crl sign")
    return ", ".join(usages)
def format_ext_key_usage(value: x509.ExtendedKeyUsage):
    """Return the extended key usages as a comma separated string.

    Well-known purposes are shown with a readable label; every other
    purpose is rendered as an OID link.
    """
    known_purposes = {
        ExtendedKeyUsageOID.ANY_EXTENDED_KEY_USAGE: "any",
        ExtendedKeyUsageOID.SERVER_AUTH: "server auth",
        ExtendedKeyUsageOID.CLIENT_AUTH: "client auth",
        ExtendedKeyUsageOID.CODE_SIGNING: "code signing",
        ExtendedKeyUsageOID.EMAIL_PROTECTION: "email protection",
        ExtendedKeyUsageOID.IPSEC_IKE: "IPSEC IKE",
        ExtendedKeyUsageOID.KERBEROS_PKINIT_KDC: "Kerberos PKINIT KDC",
        ExtendedKeyUsageOID.OCSP_SIGNING: "OCSP signing",
        ExtendedKeyUsageOID.SMARTCARD_LOGON: "SmartCard logon",
        ExtendedKeyUsageOID.TIME_STAMPING: "timestamping",
    }
    return ", ".join(
        known_purposes[purpose]
        if purpose in known_purposes
        else format_oid_link(purpose)
        for purpose in value
    )
def format_crl_distribution_points(value: x509.CRLDistributionPoints):
    """Return the CRL distribution points as a comma separated string.

    Args:
        value: Iterable of distribution points.

    Returns:
        For each distribution point with a full name, the URI of every URI
        entry and the string form of every other entry. Distribution points
        without a full name are listed as ``unsupported <point>``.
    """
    dps = []
    for dp in value:
        if dp.full_name is None:
            dps.append(f"unsupported {dp}")
            continue
        for item in dp.full_name:
            if isinstance(item, x509.UniformResourceIdentifier):
                dps.append(item.value)
            else:
                # str.join() only accepts strings, so other general name
                # kinds have to be converted explicitly.
                dps.append(str(item))
    return ", ".join(dps)
def format_access_description(ai: x509.AccessDescription):
    """Render one access description as ``<method label> <location>``.

    OCSP and CA issuer access methods get a readable label, any other
    method is shown as an OID link. URI locations are shown by their value,
    other locations by their string form.
    """
    method_labels = {
        AuthorityInformationAccessOID.OCSP: "OCSP:",
        AuthorityInformationAccessOID.CA_ISSUERS: "CA Issuers:",
    }
    method = ai.access_method
    label = method_labels.get(method)
    if label is None:
        label = format_oid_link(method)
    location = ai.access_location
    if isinstance(location, x509.UniformResourceIdentifier):
        location = location.value
    return f"{label} {location}"
def format_authinfo_access(value: x509.AuthorityInformationAccess):
    """Return all access descriptions of *value* as a comma separated string."""
    return ", ".join(format_access_description(entry) for entry in value)
def format_oid_link(oid: x509.ObjectIdentifier) -> str:
    """Return a Markdown link to the oidref.com page of *oid*.

    The dotted string of the OID is used both as link text and as the last
    path component of the URL.
    """
    dotted = oid.dotted_string
    return f"[{dotted}](https://oidref.com/{dotted})"
def format_policy_information(pi: x509.PolicyInformation):
    """Render one certificate policy entry as text.

    Args:
        pi: Policy information entry of a certificate policies extension.

    Returns:
        ``anyPolicy`` for OID 2.5.29.32.0 or an OID link for every other
        policy identifier. When the entry has qualifiers they are appended
        after ``": "`` as a comma separated list.
    """
    if pi.policy_identifier == x509.ObjectIdentifier("2.5.29.32.0"):
        prefix = "anyPolicy"
    else:
        prefix = format_oid_link(pi.policy_identifier)
    if pi.policy_qualifiers is None:
        return prefix
    qualifiers = []
    for pq in pi.policy_qualifiers:
        if isinstance(pq, str):
            qualifiers.append(pq)
        elif isinstance(pq, x509.UserNotice):
            # str.join() needs strings; prefer the notice text and fall
            # back to the object's string form when no text is present.
            if pq.explicit_text is not None:
                qualifiers.append(pq.explicit_text)
            else:
                qualifiers.append(str(pq))
        else:
            qualifiers.append(f"unknown policy information {pq}")
    value = ", ".join(qualifiers)
    return f"{prefix}: {value}"
def format_certificate_policies(value: x509.CertificatePolicies):
    """Return all policy entries of *value* as a comma separated string."""
    return ", ".join(format_policy_information(entry) for entry in value)
def format_subject_alternative_names(value: x509.SubjectAlternativeName) -> str:
    """Return the subject alternative names as a comma separated string.

    Names are grouped by type in the order DNS, e-mail, directory name,
    other name, URI, IP address, registered ID.

    Args:
        value: Subject alternative name extension value.

    Returns:
        The textual form of all names, joined with ``", "``.

    Raises:
        Exception: If the extension contains none of the handled name types.
    """
    alt_names = []
    for name_type in (
        x509.DNSName,
        x509.RFC822Name,
        x509.DirectoryName,
        x509.OtherName,
        x509.UniformResourceIdentifier,
        x509.IPAddress,
        x509.RegisteredID,
    ):
        for entry in value.get_values_for_type(name_type):
            # Not every name type yields plain strings, and str.join()
            # rejects anything else, so convert each entry explicitly.
            if isinstance(entry, x509.Name):
                alt_names.append(entry.rfc4514_string())
            elif isinstance(entry, x509.OtherName):
                alt_names.append(
                    f"{entry.type_id.dotted_string}: {entry.value.hex()}"
                )
            elif isinstance(entry, x509.ObjectIdentifier):
                alt_names.append(entry.dotted_string)
            else:
                alt_names.append(str(entry))
    if not alt_names:
        raise Exception(f"unhandled {value}")
    return ", ".join(alt_names)
def cert_to_markdown(cert: x509.Certificate):
    """Print a two-column Markdown table describing *cert* to stdout.

    The table lists subject, issuer, serial number, public key type,
    signature hash and validity period, followed by one row per certificate
    extension. Extensions without a dedicated formatter are listed as
    unknown together with an OID link.

    Args:
        cert: The parsed certificate to describe.
    """
    print("| Item | Value(s) |")
    print("|------|----------|")
    print("| Subject: |", cert.subject.rfc4514_string(), "|")
    print("| Issuer: |", cert.issuer.rfc4514_string(), "|")
    print(f"| Serial: | {hex(cert.serial_number)} ({cert.serial_number}) |")
    try:
        pubkey = get_key_type(cert.public_key())
    except UnsupportedAlgorithm:
        pubkey = "unsupported public key"
    print(f"| Public Key: | {pubkey} |")
    try:
        hash_algorithm = cert.signature_hash_algorithm
    except UnsupportedAlgorithm:
        hash_algorithm = None
    # Some signature schemes (e.g. Ed25519) have no separate hash algorithm;
    # show the signature algorithm OID in that case, as for unsupported ones.
    if hash_algorithm is None:
        hash_name = format_oid_link(cert.signature_algorithm_oid)
    else:
        hash_name = hash_algorithm.name
    print(f"| Signature hash: | {hash_name} |")
    print(f"| Not before: | {cert.not_valid_before} |")
    print(f"| Not after: | {cert.not_valid_after} |")
    for ext in cert.extensions:
        if ext.oid == ExtensionOID.BASIC_CONSTRAINTS:
            if ext.value.path_length is not None:
                print(f"| Path length: | {ext.value.path_length} |")
        elif ext.oid == ExtensionOID.SUBJECT_KEY_IDENTIFIER:
            print(f"| Subject Key Id: | {ext.value.digest.hex(':')} |")
        elif ext.oid == ExtensionOID.AUTHORITY_KEY_IDENTIFIER:
            if ext.value.key_identifier is not None:
                print(f"| Authority Key Id: | {ext.value.key_identifier.hex(':')} |")
            elif ext.value.authority_cert_issuer:
                print(
                    "| Authority Key Id: |"
                    f" issuer {ext.value.authority_cert_issuer[0].value.rfc4514_string()},"
                    f" serial {ext.value.authority_cert_serial_number} |"
                )
            # An extension with neither key identifier nor issuer carries
            # nothing to show, so no row is printed for it.
        elif ext.oid == ExtensionOID.KEY_USAGE:
            print("| Key Usage: |", format_key_usage(ext.value), "|")
        elif ext.oid == ExtensionOID.EXTENDED_KEY_USAGE:
            print("| Extended Key Usage: |", format_ext_key_usage(ext.value), "|")
        elif ext.oid == ExtensionOID.CRL_DISTRIBUTION_POINTS:
            print(
                "| CRL Distribution Points: |",
                format_crl_distribution_points(ext.value),
                "|",
            )
        elif ext.oid == ExtensionOID.AUTHORITY_INFORMATION_ACCESS:
            print(
                "| Authority information access: |",
                format_authinfo_access(ext.value),
                "|",
            )
        elif ext.oid == ExtensionOID.CERTIFICATE_POLICIES:
            print(
                "| Certificate policies: |", format_certificate_policies(ext.value), "|"
            )
        elif ext.oid == ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
            print(
                "| Subject alternative names: |",
                format_subject_alternative_names(ext.value),
                "|",
            )
        elif ext.oid == x509.ObjectIdentifier("1.3.6.1.5.5.7.1.3"):
            print(
                "| Qualified Certificate Statements: |"
                " see [RFC-3739](https://datatracker.ietf.org/doc/html/rfc3739.html) |"
            )
        elif ext.oid == x509.ObjectIdentifier("2.5.29.16"):
            print(
                "| Private Key usage period |"
                " see [RFC-3280](https://tools.ietf.org/html/rfc3280.html) |"
            )
        else:
            print(
                f"| unknown extension {ext.oid.dotted_string} |"
                f" {ext.value} try {format_oid_link(ext.oid)} |"
            )
def analyze_certificates(directory, include_old):
    """Build certificate hierarchies for a directory and print a Markdown report.

    All ``*.cer``, ``*.crt``, ``*.der`` and ``*.pem`` files in *directory*
    are loaded. Certificates are de-duplicated by issuer and serial number
    and linked to every loaded certificate whose subject equals their
    issuer. For each self-issued certificate (issuer equals subject) an SVG
    picture of its tree is written into *directory* and a Markdown section
    with one table per certificate of that tree is printed to stdout.

    Args:
        directory: Directory containing the certificate files; also the
            destination of the generated SVG files.
        include_old: Passed on to ``analyze_certificate``; when false,
            expired certificates are left out.
    """
    nodes = {}
    issuers = {}
    roots = []
    certs = []
    for pattern in ("*.cer", "*.crt", "*.der", "*.pem"):
        certs.extend(glob.glob(pattern, root_dir=directory))
    for cert in sorted(certs):
        subject, issuer, attributes = analyze_certificate(
            os.path.join(directory, cert), include_old
        )
        if subject is None:
            continue
        map_key = f"{issuer}-{attributes.serial_number}"
        if map_key in nodes:
            continue
        # Not every certificate has a common name; use the full subject
        # string as display name in that case instead of failing.
        common_names = attributes.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        if common_names:
            display_name = common_names[0].value
        else:
            display_name = attributes.subject.rfc4514_string()
        node = Node(display_name, data=attributes)
        nodes[map_key] = node
        if issuer == subject:
            roots.append(node)
        else:
            issuers.setdefault(issuer, []).append(map_key)
    # Attach every certificate to the node whose subject matches its issuer.
    for parent in nodes.values():
        parent_subject = parent.data.subject.public_bytes()
        for map_key in issuers.get(parent_subject, ()):
            nodes[map_key].parent = parent
    for root in roots:
        image_name = os.path.join(
            directory, "{}-{}.svg".format(root.name, root.data.serial_number)
        )
        exp = DotExporter(
            root,
            graph="graph",
            options=["rankdir=LR"],
            edgetypefunc=edge_type_func,
            nodenamefunc=node_name_func,
        )
        exp.to_picture(image_name)
        print(f"# {root.name}")
        print()
        print(f"![hierarchy for {root.name}]({image_name})")
        print()
        for _, _, node in RenderTree(root):
            cert = node.data
            print(f"## {node.name}")
            print()
            cert_to_markdown(cert)
            print()
def main():
    """Parse the command line and run the certificate analysis.

    Usage: ``analyze_certs [--include-old] directory``. Expired certificates
    are skipped unless ``--include-old`` is given.
    """
    parser = argparse.ArgumentParser(prog="analyze_certs")
    parser.add_argument("directory", help="directory of certificate files")
    # An option flag with store_true: absent means False (skip expired
    # certificates), present means True. A positional argument cannot be
    # used as an on/off switch.
    parser.add_argument(
        "--include-old",
        dest="include_old",
        action="store_true",
        help="include old certificates",
    )
    args = parser.parse_args()
    analyze_certificates(args.directory, args.include_old)
if __name__ == "__main__":
    # Run the command line interface only when executed as a script.
    main()