add-quality-check-script #20

Open
jandd wants to merge 6 commits from add-quality-check-script into main
2 changed files with 183 additions and 22 deletions
Showing only changes of commit ef68be8b60 - Show all commits

View file

@ -1,27 +1,188 @@
#!/usr/bin/env python3
from sqlalchemy import create_engine, select, MetaData, Table
from pprint import pprint
import os
import re
import typing
from datetime import datetime, timezone
from typing import NamedTuple
db_user = os.getenv("db_user", default="cacert")
db_password = os.getenv("db_password", default="cacert")
db_host = os.getenv("db_host", "localhost")
db_port = os.getenv("db_port", "3306")
db_name = os.getenv("db_name", "cacert")
from cryptography import x509
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.primitives.asymmetric import rsa, ec
from sqlalchemy import create_engine, select, MetaData, Table
dsn = (
f"mariadb+mariadbconnector://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
)
engine = create_engine(dsn)
metadata_obj = MetaData()
class CheckResult(NamedTuple):
name: str
code: int
for table in ("emailcerts", "domaincerts", "orgemailcerts", "orgdomaincerts"):
certs_table = Table(table, metadata_obj, autoload_with=engine)
def __str__(self):
return f"[{self.code:04d}: {self.name}]"
stmt = select(certs_table)
with engine.connect() as conn:
for row in conn.execute(stmt):
pprint(row)
CODE_OK = CheckResult("OK", 0)
CODE_FILE_MISSING = CheckResult("file missing", 1000)
CODE_UNSUPPORTED_FORMAT = CheckResult("unsupported format", 1001)
CODE_EMPTY = CheckResult("empty", 1002)
CODE_DEPRECATED_SPKAC = CheckResult("deprecated SPKAC", 1003)
CODE_INVALID_SIGNATURE = CheckResult("invalid signature", 1004)
CODE_UNSUPPORTED_SIGNATURE_ALGORITHM = CheckResult("unsupported signature algorithm", 1005)
CODE_PUBLIC_KEY_TOO_WEAK = CheckResult("public key too weak", 1006)
CODE_UNSUPPORTED_PUBLIC_KEY = CheckResult("unsupported public key", 1007)
CODE_CSR_AND_CRT_PUBLIC_KEY_MISMATCH = CheckResult("CSR and CRT public key mismatch", 1008)
CODE_CERTIFICATE_FOR_INVALID_CSR = CheckResult("certificate for invalid CSR", 1009)
CODE_NOT_SIGNED_BY_EXPECTED_CA_CERTIFICATE = CheckResult("not signed by expected CA", 1010)
CODE_CERTIFICATE_IS_EXPIRED = CheckResult("certificate is expired", 1011)
SUPPORTED_SIGNATURE_ALGORITHMS = [
x509.oid.SignatureAlgorithmOID.RSA_WITH_SHA256, x509.oid.SignatureAlgorithmOID.RSA_WITH_SHA384,
x509.oid.SignatureAlgorithmOID.RSA_WITH_SHA512, x509.oid.SignatureAlgorithmOID.ECDSA_WITH_SHA256,
x509.oid.SignatureAlgorithmOID.ECDSA_WITH_SHA384, x509.oid.SignatureAlgorithmOID.ECDSA_WITH_SHA512
]
def check_csr(csr_name: str) -> [CheckResult, typing.Any]:
if not csr_name:
return CODE_EMPTY, None
if not os.path.isfile(csr_name):
return CODE_FILE_MISSING, None
with open(csr_name, "rb") as f:
csr_data = f.read()
if re.search(r"SPKAC = ", csr_data.decode("iso-8859-1")):
return CODE_DEPRECATED_SPKAC, None
try:
csr = x509.load_pem_x509_csr(csr_data)
except Exception as e:
print(e, csr_data)
return CODE_UNSUPPORTED_FORMAT, None
if csr.signature_algorithm_oid not in SUPPORTED_SIGNATURE_ALGORITHMS:
return CODE_UNSUPPORTED_SIGNATURE_ALGORITHM, None
try:
if not csr.is_signature_valid:
return CODE_INVALID_SIGNATURE, None
except Exception as e:
print(e, csr_data)
return CODE_INVALID_SIGNATURE, None
public_key = csr.public_key()
if isinstance(public_key, rsa.RSAPublicKey):
if public_key.key_size < 2048:
return CODE_PUBLIC_KEY_TOO_WEAK, None
elif isinstance(public_key, ec.EllipticCurvePublicKey):
if public_key.key_size < 256:
return CODE_PUBLIC_KEY_TOO_WEAK, None
else:
return CODE_UNSUPPORTED_PUBLIC_KEY, None
return CODE_OK, public_key.public_numbers()
def check_crt(crt_name: str, ca_certificate: x509.Certificate, public_numbers: typing.Any) -> CheckResult:
if not crt_name:
return CODE_EMPTY
if public_numbers is None:
return CODE_CERTIFICATE_FOR_INVALID_CSR
if not os.path.isfile(crt_name):
return CODE_FILE_MISSING
with open(crt_name, "rb") as f:
crt_data = f.read()
try:
crt = x509.load_pem_x509_certificate(crt_data)
except ValueError:
return CODE_UNSUPPORTED_FORMAT
if crt.signature_algorithm_oid not in SUPPORTED_SIGNATURE_ALGORITHMS:
return CODE_UNSUPPORTED_SIGNATURE_ALGORITHM
try:
public_key = crt.public_key()
except UnsupportedAlgorithm:
return CODE_UNSUPPORTED_PUBLIC_KEY
if isinstance(public_key, rsa.RSAPublicKey):
if public_key.key_size < 2048:
return CODE_PUBLIC_KEY_TOO_WEAK
elif isinstance(public_key, ec.EllipticCurvePublicKey):
if public_key.key_size < 256:
return CODE_PUBLIC_KEY_TOO_WEAK
else:
return CODE_UNSUPPORTED_PUBLIC_KEY
if public_key.public_numbers() != public_numbers:
return CODE_CSR_AND_CRT_PUBLIC_KEY_MISMATCH
if crt.not_valid_after_utc < datetime.now(timezone.utc):
return CODE_CERTIFICATE_IS_EXPIRED
try:
crt.verify_directly_issued_by(ca_certificate)
except Exception as e:
print(e, crt.issuer, ca_certificate.subject)
return CODE_NOT_SIGNED_BY_EXPECTED_CA_CERTIFICATE
return CODE_OK
def load_ca_certificates() -> dict[int, x509.Certificate]:
with open("../certs/root_X0F.crt", "rb") as f:
root_cert = x509.load_pem_x509_certificate(f.read())
with open("../certs/CAcert_Class3Root_x14E228.crt", "rb") as f:
class3_cert = x509.load_pem_x509_certificate(f.read())
return {
1: root_cert,
2: class3_cert,
}
def main():
db_user = os.getenv("DB_USER", default="cacert")
db_password = os.getenv("DB_PASSWORD", default="cacert")
db_host = os.getenv("DB_HOST", "localhost")
db_port = os.getenv("DB_PORT", "3306")
db_name = os.getenv("DB_NAME", "cacert")
ca_certificates = load_ca_certificates()
dsn = (
f"mariadb+mariadbconnector://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
)
engine = create_engine(dsn)
metadata_obj = MetaData()
fail_counter = 0
good_counter = 0
for table in ("emailcerts", "domaincerts", "orgemailcerts", "orgdomaincerts"):
certs_table = Table(table, metadata_obj, autoload_with=engine)
stmt = select(certs_table)
with engine.connect() as conn:
for row in conn.execute(stmt):
csr_code, public_numbers = check_csr(row.csr_name)
ca_cert = ca_certificates[row.rootcert]
crt_code = check_crt(row.crt_name, ca_cert, public_numbers)
if csr_code != CODE_OK or crt_code != CODE_OK:
fail_counter += 1
print(f"{fail_counter:03d} {table}: {row.id:06d} -> csr_code: {csr_code}, crt_code: {crt_code}")
else:
good_counter += 1
print(f"Good: {good_counter}, Failed: {fail_counter}")
if __name__ == "__main__":
main()

6
scripts/poetry.lock generated
View file

@ -323,13 +323,13 @@ sqlcipher = ["sqlcipher3_binary"]
[[package]]
name = "typing-extensions"
version = "4.11.0"
version = "4.12.0"
description = "Backported and Experimental Type Hints for Python 3.8+"
optional = false
python-versions = ">=3.8"
files = [
{file = "typing_extensions-4.11.0-py3-none-any.whl", hash = "sha256:c1f94d72897edaf4ce775bb7558d5b79d8126906a14ea5ed1635921406c0387a"},
{file = "typing_extensions-4.11.0.tar.gz", hash = "sha256:83f085bd5ca59c80295fc2a82ab5dac679cbe02b9f33f7d83af68e241bea51b0"},
{file = "typing_extensions-4.12.0-py3-none-any.whl", hash = "sha256:b349c66bea9016ac22978d800cfff206d5f9816951f12a7d0ec5578b0a819594"},
{file = "typing_extensions-4.12.0.tar.gz", hash = "sha256:8cbcdc8606ebcb0d95453ad7dc5065e6237b6aa230a31e81d0f440c30fed5fd8"},
]
[metadata]