From 9c2c22ab4cc3051374ca8a2de8caed4e0f13cbb6 Mon Sep 17 00:00:00 2001 From: Jan Dittberner Date: Sun, 14 Jan 2024 20:11:25 +0100 Subject: [PATCH] Add support for organization client certificate requests --- main.py | 266 ++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 239 insertions(+), 27 deletions(-) diff --git a/main.py b/main.py index f245394..722956a 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,7 @@ import configparser import logging import pathlib +import random import secrets import sys from hashlib import md5, sha1 @@ -11,6 +12,26 @@ from cryptography.hazmat.primitives.asymmetric import rsa from faker import Faker from sqlalchemy import MetaData, Table, select, create_engine, insert, func, update +# german state codes from ISO-3166-2:DE +GERMAN_STATE_CODES = [ + "BW", + "BY", + "BE", + "BB", + "HB", + "HH", + "HE", + "MV", + "NI", + "NW", + "RP", + "SL", + "SN", + "ST", + "SH", + "TH", +] + CSR_TYPE_VENDOR_INDEPENDENT = "VI" @@ -49,6 +70,24 @@ class User: return "CAcert WoT user" +class Organization: + def __init__(self, contact: str): + self.contact = contact + self.id = None + self.name = "" + self.locality = "" + self.state = "" + self.country = "" + self.comments = "" + self.ou_name = "" + self.domain = "" + + def __repr__(self): + return ( + f"Organization<'{self.name}' <{self.contact}>, '{self.domain}', {self.id}>" + ) + + TEST_USERS = { "wot": User("wot-user@example.org"), "assured": User("assured-user@example.org", is_assured=True), @@ -68,6 +107,10 @@ TEST_USERS = { ), } +TEST_ORGANIZATIONS = { + "evil-corp": (Organization("contact@evilcorp.example.org"), "org-admin"), +} + class DataGenerator: def __init__(self, engine, csr_dir, crt_dir): @@ -76,6 +119,15 @@ class DataGenerator: self.crt_dir = crt_dir self.metadata = MetaData() self.users_table = Table("users", self.metadata, autoload_with=self.engine) + self.organizations_table = Table( + "orginfo", self.metadata, autoload_with=self.engine + ) + self.organizations_assignments_table = Table( + "org", self.metadata, autoload_with=self.engine + ) + self.organizations_domain_table = Table( + "orgdomains", self.metadata, autoload_with=self.engine + ) self.emailcerts_table = Table( "emailcerts", self.metadata, autoload_with=self.engine ) @@ -89,9 +141,9 @@ class DataGenerator: "orgdomaincerts", self.metadata, autoload_with=self.engine ) self.fake = Faker() - Faker.seed(0) + Faker.seed(42) - def ensure_users(self): + def populate_users(self): with self.engine.connect() as conn: for key, user in TEST_USERS.items(): stmt = select(self.users_table).where( @@ -131,6 +183,93 @@ class DataGenerator: user.last_name = db_user.lname logging.info("created test users %s", TEST_USERS) + def populate_organizations(self): + with self.engine.connect() as conn: + for key, org_item in TEST_ORGANIZATIONS.items(): + org, admin = org_item + db_orginfo = conn.execute( + select(self.organizations_table).where( + self.organizations_table.c.contact == org.contact + ) + ).one_or_none() + if not db_orginfo: + org.name = self.fake.company() + org.locality = self.fake.city() + org.country = self.fake.country_code() + org.state = random.choice(GERMAN_STATE_CODES) + org.comments = self.fake.catch_phrase() + + result = conn.execute( + insert(self.organizations_table).values( + contact=org.contact, + O=org.name, + L=org.locality, + ST=org.state, + C=org.country, + comments=org.comments, + creator_id=0, + created=func.now(), + ) + ) + + org.id = result.inserted_primary_key.id + + conn.commit() + else: + org.id = db_orginfo.id + org.name = db_orginfo.O + org.locality = db_orginfo.L + org.country = db_orginfo.C + org.state = db_orginfo.ST + org.comments = db_orginfo.comments + + db_org = conn.execute( + select(self.organizations_assignments_table) + .where(self.organizations_assignments_table.c.orgid == org.id) + .where( + self.organizations_assignments_table.c.memid + == TEST_USERS[admin].id, + ) + ).one_or_none() + if not db_org: + org.ou_name = self.fake.text(max_nb_chars=40) + conn.execute( + insert(self.organizations_assignments_table).values( + orgid=org.id, + memid=TEST_USERS[admin].id, + OU=org.ou_name, + masteracc=True, + comments=self.fake.sentence(), + creator_id=0, + created=func.now(), + ) + ) + + conn.commit() + else: + org.ou_name = db_org.OU + logging.info("created organizational unit %s", org.ou_name) + + db_org_domain = conn.execute( + select(self.organizations_domain_table).where( + self.organizations_domain_table.c.orgid == org.id + ) + ).one_or_none() + if not db_org_domain: + org.domain = self.fake.domain_name() + conn.execute( + insert(self.organizations_domain_table).values( + orgid=org.id, + domain=org.domain, + ) + ) + + conn.commit() + else: + org.domain = db_org_domain.domain + + logging.info("created organizations %s", TEST_ORGANIZATIONS) + def generate_csrs(self): self.create_email_csr("wot") self.create_email_csr("assured") @@ -140,8 +279,48 @@ class DataGenerator: self.create_email_csr("assurer", md="md5") self.create_email_csr("assurer", md="sha1") self.create_email_csr("assurer", md="sha512") + self.create_codesign_csr("assured") + self.create_codesign_csr("admin") + self.create_codesign_csr("assured", "sha512") + self.create_org_email_csr("evil-corp") + self.create_org_email_csr("evil-corp", "sha512") + self.create_org_codesign_csr("evil-corp") + self.create_org_codesign_csr("evil-corp", "sha512") def create_email_csr(self, user_key, md="sha256", root_cert=2): + self._create_personal_client_cert_request(user_key, md, root_cert, False) + + def create_codesign_csr(self, user_key, md="sha256", root_cert=2): + self._create_personal_client_cert_request(user_key, md, root_cert, True) + + def create_org_email_csr(self, org_key, md="sha256", root_cert=2): + self._create_organization_client_cert_request(org_key, md, root_cert, False) + + def create_org_codesign_csr(self, org_key, md="sha256", root_cert=2): + self._create_organization_client_cert_request(org_key, md, root_cert, True) + + def _generate_csr(self, csr_type: str, subject: x509.Name, row_id: int) -> str: + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + + csr = ( + x509.CertificateSigningRequestBuilder() + .subject_name(subject) + .sign(private_key, hashes.SHA256()) + ) + + p = ( + pathlib.Path(self.csr_dir) + / csr_type + / str(row_id)[:3] + / f"{csr_type}-{row_id}.csr" + ) + p.parent.mkdir(mode=0o755, parents=True, exist_ok=True) + + p.write_bytes(csr.public_bytes(serialization.Encoding.PEM)) + + return str(p) + + def _create_personal_client_cert_request(self, user_key, md, root_cert, code_sign): user = TEST_USERS[user_key] subject = x509.Name( @@ -155,47 +334,79 @@ class DataGenerator: # SPKAC type signing requests and uses MD5, "MS" seems to be related to Microsoft browsers. "VI" might stand # for "vendor independent". with self.engine.connect() as conn: - stmt = insert(self.emailcerts_table).values( - memid=user.id, - CN=user.email, - subject=f"/CN={user.get_name()}/emailAddress={user.email}", - keytype=CSR_TYPE_VENDOR_INDEPENDENT, - created=func.now(), - modified=func.now(), - rootcert=root_cert, - md=md, - disablelogin=1, + result = conn.execute( + insert(self.emailcerts_table).values( + memid=user.id, + CN=user.email, + subject=f"/CN={user.get_name()}/emailAddress={user.email}", + keytype=CSR_TYPE_VENDOR_INDEPENDENT, + created=func.now(), + modified=func.now(), + rootcert=root_cert, + md=md, + disablelogin=True, + codesign=code_sign, + ) ) - result = conn.execute(stmt) cert_id = result.inserted_primary_key.id logging.debug("created emailcerts entry with id %d", cert_id) - csr_filename = self.generate_csr("client", subject, cert_id) + csr_filename = self._generate_csr("client", subject, cert_id) - stmt = ( + conn.execute( update(self.emailcerts_table) .where(self.emailcerts_table.c.id == cert_id) .values(csr_name=csr_filename, modified=func.now()) ) - conn.execute(stmt) conn.commit() - def generate_csr(self, csr_type: str, subject: x509.Name, id: int) -> str: - private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + def _create_organization_client_cert_request( + self, org_key, md, root_cert, code_sign + ): + org, user_key = TEST_ORGANIZATIONS[org_key] + user = TEST_USERS[user_key] - csr = ( - x509.CertificateSigningRequestBuilder() - .subject_name(subject) - .sign(private_key, hashes.SHA256()) + subject = x509.Name( + [ + x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, org.name), + x509.NameAttribute( + x509.oid.NameOID.ORGANIZATIONAL_UNIT_NAME, org.ou_name + ), + x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, org.country), + x509.NameAttribute(x509.oid.NameOID.LOCALITY_NAME, org.locality), + x509.NameAttribute(x509.oid.NameOID.STATE_OR_PROVINCE_NAME, org.state), + ] ) - p = pathlib.Path(self.csr_dir) / csr_type / str(id)[:3] / f"{csr_type}-{id}.csr" - p.parent.mkdir(mode=0o755, parents=True, exist_ok=True) + with self.engine.connect() as conn: + result = conn.execute( + insert(self.orgemailcerts_table).values( + orgid=org.id, + CN=org.domain, + subject=f"/O={org.name}/OU={org.ou_name}/C={org.country}/ST={org.state}/L={org.locality}/emailAddress={self.fake.email(domain=org.domain)}", + keytype=CSR_TYPE_VENDOR_INDEPENDENT, + created=func.now(), + modified=func.now(), + rootcert=root_cert, + md=md, + codesign=code_sign, + ou=org.ou_name, + orgadminid=user.id, + ) + ) + cert_id = result.inserted_primary_key.id + logging.debug("created orgemailcerts entry with id %d", cert_id) - p.write_bytes(csr.public_bytes(serialization.Encoding.PEM)) + csr_filename = self._generate_csr("orgclient", subject, cert_id) - return str(p) + conn.execute( + update(self.orgemailcerts_table) + .where(self.orgemailcerts_table.c.id == cert_id) + .values(csr_name=csr_filename, modified=func.now()) + ) + + conn.commit() def main(config_file: str): @@ -212,7 +423,8 @@ def main(config_file: str): engine, cp.get("files", "csr_dir"), cp.get("files", "crt_dir") ) - data_generator.ensure_users() + data_generator.populate_users() + data_generator.populate_organizations() data_generator.generate_csrs()