diff --git a/scripts/check_db_certificates.py b/scripts/check_db_certificates.py index 208e354..e5ba23f 100644 --- a/scripts/check_db_certificates.py +++ b/scripts/check_db_certificates.py @@ -1,27 +1,188 @@ #!/usr/bin/env python3 - -from sqlalchemy import create_engine, select, MetaData, Table -from pprint import pprint import os +import re +import typing +from datetime import datetime, timezone +from typing import NamedTuple -db_user = os.getenv("db_user", default="cacert") -db_password = os.getenv("db_password", default="cacert") -db_host = os.getenv("db_host", "localhost") -db_port = os.getenv("db_port", "3306") -db_name = os.getenv("db_name", "cacert") +from cryptography import x509 +from cryptography.exceptions import UnsupportedAlgorithm +from cryptography.hazmat.primitives.asymmetric import rsa, ec +from sqlalchemy import create_engine, select, MetaData, Table -dsn = ( - f"mariadb+mariadbconnector://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}" -) -engine = create_engine(dsn) -metadata_obj = MetaData() +class CheckResult(NamedTuple): + name: str + code: int -for table in ("emailcerts", "domaincerts", "orgemailcerts", "orgdomaincerts"): - certs_table = Table(table, metadata_obj, autoload_with=engine) + def __str__(self): + return f"[{self.code:04d}: {self.name}]" - stmt = select(certs_table) - with engine.connect() as conn: - for row in conn.execute(stmt): - pprint(row) +CODE_OK = CheckResult("OK", 0) +CODE_FILE_MISSING = CheckResult("file missing", 1000) +CODE_UNSUPPORTED_FORMAT = CheckResult("unsupported format", 1001) +CODE_EMPTY = CheckResult("empty", 1002) +CODE_DEPRECATED_SPKAC = CheckResult("deprecated SPKAC", 1003) +CODE_INVALID_SIGNATURE = CheckResult("invalid signature", 1004) +CODE_UNSUPPORTED_SIGNATURE_ALGORITHM = CheckResult("unsupported signature algorithm", 1005) +CODE_PUBLIC_KEY_TOO_WEAK = CheckResult("public key too weak", 1006) +CODE_UNSUPPORTED_PUBLIC_KEY = CheckResult("unsupported public key", 1007) +CODE_CSR_AND_CRT_PUBLIC_KEY_MISMATCH = CheckResult("CSR and CRT public key mismatch", 1008) +CODE_CERTIFICATE_FOR_INVALID_CSR = CheckResult("certificate for invalid CSR", 1009) +CODE_NOT_SIGNED_BY_EXPECTED_CA_CERTIFICATE = CheckResult("not signed by expected CA", 1010) +CODE_CERTIFICATE_IS_EXPIRED = CheckResult("certificate is expired", 1011) + +SUPPORTED_SIGNATURE_ALGORITHMS = [ + x509.oid.SignatureAlgorithmOID.RSA_WITH_SHA256, x509.oid.SignatureAlgorithmOID.RSA_WITH_SHA384, + x509.oid.SignatureAlgorithmOID.RSA_WITH_SHA512, x509.oid.SignatureAlgorithmOID.ECDSA_WITH_SHA256, + x509.oid.SignatureAlgorithmOID.ECDSA_WITH_SHA384, x509.oid.SignatureAlgorithmOID.ECDSA_WITH_SHA512 +] + + +def check_csr(csr_name: str) -> [CheckResult, typing.Any]: + if not csr_name: + return CODE_EMPTY, None + + if not os.path.isfile(csr_name): + return CODE_FILE_MISSING, None + + with open(csr_name, "rb") as f: + csr_data = f.read() + + if re.search(r"SPKAC = ", csr_data.decode("iso-8859-1")): + return CODE_DEPRECATED_SPKAC, None + + try: + csr = x509.load_pem_x509_csr(csr_data) + except Exception as e: + print(e, csr_data) + return CODE_UNSUPPORTED_FORMAT, None + + if csr.signature_algorithm_oid not in SUPPORTED_SIGNATURE_ALGORITHMS: + return CODE_UNSUPPORTED_SIGNATURE_ALGORITHM, None + + try: + if not csr.is_signature_valid: + return CODE_INVALID_SIGNATURE, None + except Exception as e: + print(e, csr_data) + return CODE_INVALID_SIGNATURE, None + + public_key = csr.public_key() + + if isinstance(public_key, rsa.RSAPublicKey): + if public_key.key_size < 2048: + return CODE_PUBLIC_KEY_TOO_WEAK, None + elif isinstance(public_key, ec.EllipticCurvePublicKey): + if public_key.key_size < 256: + return CODE_PUBLIC_KEY_TOO_WEAK, None + else: + return CODE_UNSUPPORTED_PUBLIC_KEY, None + + return CODE_OK, public_key.public_numbers() + + +def check_crt(crt_name: str, ca_certificate: x509.Certificate, public_numbers: typing.Any) -> CheckResult: + if not crt_name: + return CODE_EMPTY + + if public_numbers is None: + return CODE_CERTIFICATE_FOR_INVALID_CSR + + if not os.path.isfile(crt_name): + return CODE_FILE_MISSING + + with open(crt_name, "rb") as f: + crt_data = f.read() + + try: + crt = x509.load_pem_x509_certificate(crt_data) + except ValueError: + return CODE_UNSUPPORTED_FORMAT + + if crt.signature_algorithm_oid not in SUPPORTED_SIGNATURE_ALGORITHMS: + return CODE_UNSUPPORTED_SIGNATURE_ALGORITHM + + try: + public_key = crt.public_key() + except UnsupportedAlgorithm: + return CODE_UNSUPPORTED_PUBLIC_KEY + + if isinstance(public_key, rsa.RSAPublicKey): + if public_key.key_size < 2048: + return CODE_PUBLIC_KEY_TOO_WEAK + elif isinstance(public_key, ec.EllipticCurvePublicKey): + if public_key.key_size < 256: + return CODE_PUBLIC_KEY_TOO_WEAK + else: + return CODE_UNSUPPORTED_PUBLIC_KEY + + if public_key.public_numbers() != public_numbers: + return CODE_CSR_AND_CRT_PUBLIC_KEY_MISMATCH + + if crt.not_valid_after_utc < datetime.now(timezone.utc): + return CODE_CERTIFICATE_IS_EXPIRED + + try: + crt.verify_directly_issued_by(ca_certificate) + except Exception as e: + print(e, crt.issuer, ca_certificate.subject) + return CODE_NOT_SIGNED_BY_EXPECTED_CA_CERTIFICATE + + return CODE_OK + + +def load_ca_certificates() -> dict[int, x509.Certificate]: + with open("../certs/root_X0F.crt", "rb") as f: + root_cert = x509.load_pem_x509_certificate(f.read()) + + with open("../certs/CAcert_Class3Root_x14E228.crt", "rb") as f: + class3_cert = x509.load_pem_x509_certificate(f.read()) + + return { + 1: root_cert, + 2: class3_cert, + } + + +def main(): + db_user = os.getenv("DB_USER", default="cacert") + db_password = os.getenv("DB_PASSWORD", default="cacert") + db_host = os.getenv("DB_HOST", "localhost") + db_port = os.getenv("DB_PORT", "3306") + db_name = os.getenv("DB_NAME", "cacert") + + ca_certificates = load_ca_certificates() + + dsn = ( + f"mariadb+mariadbconnector://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}" + ) + engine = create_engine(dsn) + + metadata_obj = MetaData() + fail_counter = 0 + good_counter = 0 + + for table in ("emailcerts", "domaincerts", "orgemailcerts", "orgdomaincerts"): + certs_table = Table(table, metadata_obj, autoload_with=engine) + + stmt = select(certs_table) + + with engine.connect() as conn: + for row in conn.execute(stmt): + csr_code, public_numbers = check_csr(row.csr_name) + ca_cert = ca_certificates[row.rootcert] + crt_code = check_crt(row.crt_name, ca_cert, public_numbers) + + if csr_code != CODE_OK or crt_code != CODE_OK: + fail_counter += 1 + print(f"{fail_counter:03d} {table}: {row.id:06d} -> csr_code: {csr_code}, crt_code: {crt_code}") + else: + good_counter += 1 + + print(f"Good: {good_counter}, Failed: {fail_counter}") + + +if __name__ == "__main__": + main() diff --git a/scripts/poetry.lock b/scripts/poetry.lock index 4318ddc..ce0d81d 100644 --- a/scripts/poetry.lock +++ b/scripts/poetry.lock @@ -323,13 +323,13 @@ sqlcipher = ["sqlcipher3_binary"] [[package]] name = "typing-extensions" -version = "4.11.0" +version = "4.12.0" description = "Backported and Experimental Type Hints for Python 3.8+" optional = false python-versions = ">=3.8" files = [ - {file = "typing_extensions-4.11.0-py3-none-any.whl", hash = "sha256:c1f94d72897edaf4ce775bb7558d5b79d8126906a14ea5ed1635921406c0387a"}, - {file = "typing_extensions-4.11.0.tar.gz", hash = "sha256:83f085bd5ca59c80295fc2a82ab5dac679cbe02b9f33f7d83af68e241bea51b0"}, + {file = "typing_extensions-4.12.0-py3-none-any.whl", hash = "sha256:b349c66bea9016ac22978d800cfff206d5f9816951f12a7d0ec5578b0a819594"}, + {file = "typing_extensions-4.12.0.tar.gz", hash = "sha256:8cbcdc8606ebcb0d95453ad7dc5065e6237b6aa230a31e81d0f440c30fed5fd8"}, ] [metadata]