From 52992aad46c73623e6648aea973f771a504e0ffa Mon Sep 17 00:00:00 2001 From: Jan Dittberner Date: Sun, 26 May 2024 10:43:09 +0200 Subject: [PATCH] Refactor and improve statistic script --- scripts/check_db_certificates.py | 344 ++++++++++++++++++++----------- 1 file changed, 223 insertions(+), 121 deletions(-) diff --git a/scripts/check_db_certificates.py b/scripts/check_db_certificates.py index e5ba23f..edd3cf7 100644 --- a/scripts/check_db_certificates.py +++ b/scripts/check_db_certificates.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +import logging import os import re import typing @@ -18,6 +19,12 @@ class CheckResult(NamedTuple): def __str__(self): return f"[{self.code:04d}: {self.name}]" + def __lt__(self, other): + return self.code < other.code + + def __gt__(self, other): + return self.code > other.code + CODE_OK = CheckResult("OK", 0) CODE_FILE_MISSING = CheckResult("file missing", 1000) @@ -40,104 +47,11 @@ SUPPORTED_SIGNATURE_ALGORITHMS = [ ] -def check_csr(csr_name: str) -> [CheckResult, typing.Any]: - if not csr_name: - return CODE_EMPTY, None - - if not os.path.isfile(csr_name): - return CODE_FILE_MISSING, None - - with open(csr_name, "rb") as f: - csr_data = f.read() - - if re.search(r"SPKAC = ", csr_data.decode("iso-8859-1")): - return CODE_DEPRECATED_SPKAC, None - - try: - csr = x509.load_pem_x509_csr(csr_data) - except Exception as e: - print(e, csr_data) - return CODE_UNSUPPORTED_FORMAT, None - - if csr.signature_algorithm_oid not in SUPPORTED_SIGNATURE_ALGORITHMS: - return CODE_UNSUPPORTED_SIGNATURE_ALGORITHM, None - - try: - if not csr.is_signature_valid: - return CODE_INVALID_SIGNATURE, None - except Exception as e: - print(e, csr_data) - return CODE_INVALID_SIGNATURE, None - - public_key = csr.public_key() - - if isinstance(public_key, rsa.RSAPublicKey): - if public_key.key_size < 2048: - return CODE_PUBLIC_KEY_TOO_WEAK, None - elif isinstance(public_key, ec.EllipticCurvePublicKey): - if public_key.key_size < 256: - return CODE_PUBLIC_KEY_TOO_WEAK, None - else: - return CODE_UNSUPPORTED_PUBLIC_KEY, None - - return CODE_OK, public_key.public_numbers() - - -def check_crt(crt_name: str, ca_certificate: x509.Certificate, public_numbers: typing.Any) -> CheckResult: - if not crt_name: - return CODE_EMPTY - - if public_numbers is None: - return CODE_CERTIFICATE_FOR_INVALID_CSR - - if not os.path.isfile(crt_name): - return CODE_FILE_MISSING - - with open(crt_name, "rb") as f: - crt_data = f.read() - - try: - crt = x509.load_pem_x509_certificate(crt_data) - except ValueError: - return CODE_UNSUPPORTED_FORMAT - - if crt.signature_algorithm_oid not in SUPPORTED_SIGNATURE_ALGORITHMS: - return CODE_UNSUPPORTED_SIGNATURE_ALGORITHM - - try: - public_key = crt.public_key() - except UnsupportedAlgorithm: - return CODE_UNSUPPORTED_PUBLIC_KEY - - if isinstance(public_key, rsa.RSAPublicKey): - if public_key.key_size < 2048: - return CODE_PUBLIC_KEY_TOO_WEAK - elif isinstance(public_key, ec.EllipticCurvePublicKey): - if public_key.key_size < 256: - return CODE_PUBLIC_KEY_TOO_WEAK - else: - return CODE_UNSUPPORTED_PUBLIC_KEY - - if public_key.public_numbers() != public_numbers: - return CODE_CSR_AND_CRT_PUBLIC_KEY_MISMATCH - - if crt.not_valid_after_utc < datetime.now(timezone.utc): - return CODE_CERTIFICATE_IS_EXPIRED - - try: - crt.verify_directly_issued_by(ca_certificate) - except Exception as e: - print(e, crt.issuer, ca_certificate.subject) - return CODE_NOT_SIGNED_BY_EXPECTED_CA_CERTIFICATE - - return CODE_OK - - -def load_ca_certificates() -> dict[int, x509.Certificate]: - with open("../certs/root_X0F.crt", "rb") as f: +def load_ca_certificates(root_ca_cert, sub_ca_cert) -> dict[int, x509.Certificate]: + with open(root_ca_cert, "rb") as f: root_cert = x509.load_pem_x509_certificate(f.read()) - with open("../certs/CAcert_Class3Root_x14E228.crt", "rb") as f: + with open(sub_ca_cert, "rb") as f: class3_cert = x509.load_pem_x509_certificate(f.read()) return { @@ -146,42 +60,230 @@ def load_ca_certificates() -> dict[int, x509.Certificate]: } +class Counters: + fail = 0 + good = 0 + good_csr = 0 + good_crt = 0 + missing_crt = 0 + expired_crt = 0 + + csr_codes: dict[CheckResult, int] = {} + crt_codes: dict[CheckResult, int] = {} + + def count_fail(self): + self.fail += 1 + + def count_good(self): + self.good += 1 + + def count_good_csr(self): + self.good_csr += 1 + + def count_good_crt(self): + self.good_crt += 1 + + def count_missing_crt(self): + self.missing_crt += 1 + + def count_expired_crt(self): + self.expired_crt += 1 + + def count_csr(self, csr_code: CheckResult): + self.csr_codes.setdefault(csr_code, 0) + self.csr_codes[csr_code] += 1 + + def count_crt(self, crt_code: CheckResult): + self.crt_codes.setdefault(crt_code, 0) + self.crt_codes[crt_code] += 1 + + def __str__(self): + return ( + f"good CSR and certificate: {self.good}\n" + f"good CSR, issue with certificate: {self.good_csr}\n" + f"good certificate, issue with CSR: {self.good_crt}\n" + f"failed CSR and certificate: {self.fail}\n" + f"missing certificate: {self.missing_crt}\n" + f"expired certificate: {self.expired_crt}\n\nCSR results:\n" + ) + "\n".join([f"{code}: {count:d}" for code, count in sorted(self.csr_codes.items())]) + ( + "\n\nCertificate results:\n" + ) + ( + "\n".join([f"{code}: {count:d}" for code, count in sorted(self.crt_codes.items())]) + ) + + +class Analyzer: + def __init__(self, logger: logging.Logger, dsn: str, ca_certificates: dict[int, x509.Certificate]): + self.logger = logger + self.engine = create_engine(dsn) + self.ca_certificates = ca_certificates + self.c = Counters() + + def analyze(self): + metadata_obj = MetaData() + + for table in ("emailcerts", "domaincerts", "orgemailcerts", "orgdomaincerts"): + certs_table = Table(table, metadata_obj, autoload_with=self.engine) + + stmt = select(certs_table) + + with self.engine.connect() as conn: + for row in conn.execute(stmt): + self.analyze_row(table, row) + + def analyze_row(self, table: str, row) -> None: + ca_cert = self.ca_certificates[row.rootcert] + csr_code, public_numbers = self.check_csr(row.csr_name) + self.c.count_csr(csr_code) + crt_code = self.check_crt(row.crt_name, ca_cert, public_numbers) + self.c.count_crt(crt_code) + + if csr_code != CODE_OK and crt_code not in (CODE_OK, CODE_CERTIFICATE_IS_EXPIRED): + self.logger.debug( + "%06d %s: %06d -> csr_code: %s, crt_code: %s", + self.c.fail, table, row.id, csr_code, crt_code + ) + self.c.count_fail() + return + + if csr_code != CODE_OK: + self.c.count_good_crt() + if crt_code == CODE_CERTIFICATE_IS_EXPIRED: + self.c.count_expired_crt() + return + + if crt_code == CODE_EMPTY: + self.c.count_missing_crt() + return + + if csr_code == CODE_OK and crt_code not in (CODE_OK, CODE_CERTIFICATE_IS_EXPIRED): + self.c.count_good_csr() + return + + if crt_code == CODE_CERTIFICATE_IS_EXPIRED: + self.c.count_expired_crt() + + self.c.count_good() + + def check_csr(self, csr_name: str) -> [CheckResult, typing.Any]: + if not csr_name: + return CODE_EMPTY, None + + if not os.path.isfile(csr_name): + return CODE_FILE_MISSING, None + + with open(csr_name, "rb") as f: + csr_data = f.read() + + if re.search(r"SPKAC = ", csr_data.decode("iso-8859-1")): + return CODE_DEPRECATED_SPKAC, None + + try: + csr = x509.load_pem_x509_csr(csr_data) + except Exception as e: + self.logger.debug("unsupported CSR format: %s for\n%s", e, csr_data) + return CODE_UNSUPPORTED_FORMAT, None + + if csr.signature_algorithm_oid not in SUPPORTED_SIGNATURE_ALGORITHMS: + return CODE_UNSUPPORTED_SIGNATURE_ALGORITHM, None + + try: + if not csr.is_signature_valid: + return CODE_INVALID_SIGNATURE, None + except Exception as e: + self.logger.debug("CSR signature check failed: %s for \n%s", e, csr_data) + return CODE_INVALID_SIGNATURE, None + + public_key = csr.public_key() + + if isinstance(public_key, rsa.RSAPublicKey): + if public_key.key_size < 2048: + return CODE_PUBLIC_KEY_TOO_WEAK, None + elif isinstance(public_key, ec.EllipticCurvePublicKey): + if public_key.key_size < 256: + return CODE_PUBLIC_KEY_TOO_WEAK, None + else: + return CODE_UNSUPPORTED_PUBLIC_KEY, None + + return CODE_OK, public_key.public_numbers() + + def check_crt(self, crt_name: str, ca_certificate: x509.Certificate, public_numbers: typing.Any) -> CheckResult: + if not crt_name: + return CODE_EMPTY + + if not os.path.isfile(crt_name): + return CODE_FILE_MISSING + + with open(crt_name, "rb") as f: + crt_data = f.read() + + try: + crt = x509.load_pem_x509_certificate(crt_data) + except ValueError: + return CODE_UNSUPPORTED_FORMAT + + if crt.signature_algorithm_oid not in SUPPORTED_SIGNATURE_ALGORITHMS: + return CODE_UNSUPPORTED_SIGNATURE_ALGORITHM + + try: + public_key = crt.public_key() + except UnsupportedAlgorithm: + return CODE_UNSUPPORTED_PUBLIC_KEY + + if isinstance(public_key, rsa.RSAPublicKey): + if public_key.key_size < 2048: + return CODE_PUBLIC_KEY_TOO_WEAK + elif isinstance(public_key, ec.EllipticCurvePublicKey): + if public_key.key_size < 256: + return CODE_PUBLIC_KEY_TOO_WEAK + else: + return CODE_UNSUPPORTED_PUBLIC_KEY + + if public_numbers and public_key.public_numbers() != public_numbers: + return CODE_CSR_AND_CRT_PUBLIC_KEY_MISMATCH + + if crt.not_valid_after_utc < datetime.now(timezone.utc): + return CODE_CERTIFICATE_IS_EXPIRED + + try: + crt.verify_directly_issued_by(ca_certificate) + except Exception as e: + self.logger.debug("certificate verification failed: %s\n issuer of certificate: %s\n" + " CA certificate: %s", e, crt.issuer, + ca_certificate.subject) + return CODE_NOT_SIGNED_BY_EXPECTED_CA_CERTIFICATE + + return CODE_OK + + def get_statistics(self): + self.logger.info("Statistics:\n%s", self.c) + + def main(): db_user = os.getenv("DB_USER", default="cacert") db_password = os.getenv("DB_PASSWORD", default="cacert") db_host = os.getenv("DB_HOST", "localhost") db_port = os.getenv("DB_PORT", "3306") db_name = os.getenv("DB_NAME", "cacert") + root_ca_cert: str = os.getenv("ROOT_CA_CERTIFICATE", "../www/certs/root_X0F.crt") + sub_ca_cert = os.getenv("SUB_CA_CERTIFICATE", "../www/certs/CAcert_Class3Root_x14E228.crt") + debug = bool(os.getenv("DEBUG", "false")) - ca_certificates = load_ca_certificates() + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s") + logger = logging.getLogger(__name__) - dsn = ( - f"mariadb+mariadbconnector://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}" + if debug: + logger.level = logging.DEBUG + + analyzer = Analyzer( + logger=logger, + dsn=f"mariadb+mariadbconnector://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}", + ca_certificates=load_ca_certificates(root_ca_cert, sub_ca_cert), ) - engine = create_engine(dsn) - metadata_obj = MetaData() - fail_counter = 0 - good_counter = 0 + analyzer.analyze() - for table in ("emailcerts", "domaincerts", "orgemailcerts", "orgdomaincerts"): - certs_table = Table(table, metadata_obj, autoload_with=engine) - - stmt = select(certs_table) - - with engine.connect() as conn: - for row in conn.execute(stmt): - csr_code, public_numbers = check_csr(row.csr_name) - ca_cert = ca_certificates[row.rootcert] - crt_code = check_crt(row.crt_name, ca_cert, public_numbers) - - if csr_code != CODE_OK or crt_code != CODE_OK: - fail_counter += 1 - print(f"{fail_counter:03d} {table}: {row.id:06d} -> csr_code: {csr_code}, crt_code: {crt_code}") - else: - good_counter += 1 - - print(f"Good: {good_counter}, Failed: {fail_counter}") + analyzer.get_statistics() if __name__ == "__main__":