"""Populate a database with test users, organizations and certificate requests.

The script reads a DSN and output directories from an INI file, makes sure a
fixed set of test users and organizations exists in the database, and then
creates client certificate request rows together with matching CSR files.
"""

import configparser
import logging
import pathlib
import random
import secrets
import sys
from hashlib import md5, sha1

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from faker import Faker
from sqlalchemy import MetaData, Table, select, create_engine, insert, func, update

# German state codes from ISO-3166-2:DE
GERMAN_STATE_CODES = [
    "BW",
    "BY",
    "BE",
    "BB",
    "HB",
    "HH",
    "HE",
    "MV",
    "NI",
    "NW",
    "RP",
    "SL",
    "SN",
    "ST",
    "SH",
    "TH",
]

# Value written to the keytype column of the certificate tables.  "VI" is the
# only key type this generator supports; "NS" is for SPKAC type signing
# requests and uses MD5, "MS" seems to be related to Microsoft browsers.  "VI"
# might stand for "vendor independent".
CSR_TYPE_VENDOR_INDEPENDENT = "VI"


def build_user_unique_id(email: str) -> str:
    """Return a hex MD5 digest of the email address plus 8 random bytes."""
    return md5(email.encode() + secrets.token_bytes(8)).hexdigest()


def build_random_password() -> str:
    """Return the hex SHA-1 digest of 16 random bytes."""
    return sha1(secrets.token_bytes(16)).hexdigest()


class User:
    """In-memory description of a test user and its role flags."""

    def __init__(
        self,
        email: str,
        is_org_admin: bool = False,
        is_assured: bool = False,
        is_assurer: bool = False,
        is_admin: bool = False,
    ):
        self.email = email
        self.is_org_admin = is_org_admin
        self.is_assured = is_assured
        self.is_assurer = is_assurer
        self.is_admin = is_admin
        # Filled in by DataGenerator.populate_users().
        self.id = None
        self.first_name = ""
        self.last_name = ""

    def __repr__(self):
        return f"User<'{self.email}', {self.id}>"

    def get_name(self):
        """Return the full name for assured users, a generic name otherwise."""
        if self.is_assured:
            return f"{self.first_name} {self.last_name}"
        return "CAcert WoT user"


class Organization:
    """In-memory description of a test organization."""

    def __init__(self, contact: str):
        self.contact = contact
        # Filled in by DataGenerator.populate_organizations().
        self.id = None
        self.name = ""
        self.locality = ""
        self.state = ""
        self.country = ""
        self.comments = ""
        self.ou_name = ""
        self.domain = ""

    def __repr__(self):
        return (
            f"Organization<'{self.name}' <{self.contact}>, '{self.domain}', {self.id}>"
        )


TEST_USERS = {
    "wot": User("wot-user@example.org"),
    "assured": User("assured-user@example.org", is_assured=True),
    "assurer": User("assurer-user@example.org", is_assured=True, is_assurer=True),
    "org-admin": User(
        "org-user@example.org",
        is_assured=True,
        is_assurer=True,
        is_org_admin=True,
    ),
    "admin": User(
        "admin-user@example.org",
        is_assured=True,
        is_assurer=True,
        is_org_admin=True,
        is_admin=True,
    ),
}

# Maps an organization key to (organization, key of its admin in TEST_USERS).
TEST_ORGANIZATIONS = {
    "evil-corp": (Organization("contact@evilcorp.example.org"), "org-admin"),
}


class DataGenerator:
    """Create test users, organizations and certificate signing requests."""

    def __init__(self, engine, csr_dir, crt_dir):
        """Reflect the required tables from the database behind ``engine``.

        :param engine: SQLAlchemy engine used for reflection and all queries
        :param csr_dir: base directory below which CSR files are written
        :param crt_dir: stored on the instance; not used by the methods below
        """
        self.engine = engine
        self.csr_dir = csr_dir
        self.crt_dir = crt_dir
        self.metadata = MetaData()

        def reflect(table_name):
            return Table(table_name, self.metadata, autoload_with=self.engine)

        self.users_table = reflect("users")
        self.organizations_table = reflect("orginfo")
        self.organizations_assignments_table = reflect("org")
        self.organizations_domain_table = reflect("orgdomains")
        self.emailcerts_table = reflect("emailcerts")
        self.orgemailcerts_table = reflect("orgemailcerts")
        self.domaincerts_table = reflect("domaincerts")
        self.orgdomaincerts_table = reflect("orgdomaincerts")

        self.fake = Faker()
        Faker.seed(42)

    def populate_users(self):
        """Ensure every entry of TEST_USERS exists in the users table.

        Missing users are inserted with generated names; for existing users
        the id and names are loaded from the database row.
        """
        with self.engine.connect() as conn:
            for user in TEST_USERS.values():
                db_user = conn.execute(
                    select(self.users_table).where(
                        self.users_table.c.email == user.email
                    )
                ).one_or_none()

                if db_user is not None:
                    user.id = db_user.id
                    user.first_name = db_user.fname
                    user.last_name = db_user.lname
                    continue

                user.first_name = self.fake.first_name()
                user.last_name = self.fake.last_name()
                result = conn.execute(
                    insert(self.users_table).values(
                        email=user.email,
                        fname=user.first_name,
                        lname=user.last_name,
                        password=build_random_password(),
                        orgadmin=user.is_org_admin,
                        admin=user.is_admin,
                        assurer=user.is_assurer,
                        adadmin=False,
                        locked=False,
                        dob=self.fake.passport_dob(),
                        uniqueID=build_user_unique_id(user.email),
                        otphash="",
                        otppin=0,
                        verified=True,
                        created=func.now(),
                        modified=func.now(),
                    )
                )
                user.id = result.inserted_primary_key.id
                conn.commit()

        logging.info("created test users %s", TEST_USERS)

    def populate_organizations(self):
        """Ensure every entry of TEST_ORGANIZATIONS exists in the database.

        For each organization this makes sure there is an orginfo row, an
        assignment of its admin user and a domain row.  Requires
        populate_users() to have run so that the admin user ids are known.
        """
        with self.engine.connect() as conn:
            for org, admin_key in TEST_ORGANIZATIONS.values():
                self._ensure_organization(conn, org)
                self._ensure_admin_assignment(conn, org, TEST_USERS[admin_key])
                logging.info("created organizational unit %s", org.ou_name)
                self._ensure_domain(conn, org)

        logging.info("created organizations %s", TEST_ORGANIZATIONS)

    def _ensure_organization(self, conn, org):
        """Insert the orginfo row for ``org`` or load it if already present."""
        db_orginfo = conn.execute(
            select(self.organizations_table).where(
                self.organizations_table.c.contact == org.contact
            )
        ).one_or_none()

        if db_orginfo is not None:
            org.id = db_orginfo.id
            org.name = db_orginfo.O
            org.locality = db_orginfo.L
            org.country = db_orginfo.C
            org.state = db_orginfo.ST
            org.comments = db_orginfo.comments
            return

        org.name = self.fake.company()
        org.locality = self.fake.city()
        org.country = self.fake.country_code()
        # NOTE(review): this uses the global random module, which
        # Faker.seed() presumably does not seed, so the state may differ
        # between runs.
        org.state = random.choice(GERMAN_STATE_CODES)
        org.comments = self.fake.catch_phrase()
        result = conn.execute(
            insert(self.organizations_table).values(
                contact=org.contact,
                O=org.name,
                L=org.locality,
                ST=org.state,
                C=org.country,
                comments=org.comments,
                creator_id=0,
                created=func.now(),
            )
        )
        org.id = result.inserted_primary_key.id
        conn.commit()

    def _ensure_admin_assignment(self, conn, org, admin_user):
        """Insert or load the row assigning ``admin_user`` to ``org``."""
        assignments = self.organizations_assignments_table
        db_org = conn.execute(
            select(assignments)
            .where(assignments.c.orgid == org.id)
            .where(assignments.c.memid == admin_user.id)
        ).one_or_none()

        if db_org is not None:
            org.ou_name = db_org.OU
            return

        org.ou_name = self.fake.text(max_nb_chars=40)
        conn.execute(
            insert(assignments).values(
                orgid=org.id,
                memid=admin_user.id,
                OU=org.ou_name,
                masteracc=True,
                comments=self.fake.sentence(),
                creator_id=0,
                created=func.now(),
            )
        )
        conn.commit()

    def _ensure_domain(self, conn, org):
        """Insert a generated domain for ``org`` or load the existing one."""
        db_org_domain = conn.execute(
            select(self.organizations_domain_table).where(
                self.organizations_domain_table.c.orgid == org.id
            )
        ).one_or_none()

        if db_org_domain is not None:
            org.domain = db_org_domain.domain
            return

        org.domain = self.fake.domain_name()
        conn.execute(
            insert(self.organizations_domain_table).values(
                orgid=org.id,
                domain=org.domain,
            )
        )
        conn.commit()

    def generate_csrs(self):
        """Create the fixed set of personal and organization client requests."""
        self.create_email_csr("wot")
        self.create_email_csr("assured")
        self.create_email_csr("assurer")
        self.create_email_csr("admin")
        self.create_email_csr("org-admin")
        self.create_email_csr("assurer", md="md5")
        self.create_email_csr("assurer", md="sha1")
        self.create_email_csr("assurer", md="sha512")
        self.create_codesign_csr("assured")
        self.create_codesign_csr("admin")
        self.create_codesign_csr("assured", "sha512")
        self.create_org_email_csr("evil-corp")
        self.create_org_email_csr("evil-corp", "sha512")
        self.create_org_codesign_csr("evil-corp")
        self.create_org_codesign_csr("evil-corp", "sha512")

    def create_email_csr(self, user_key, md="sha256", root_cert=2):
        """Create a personal client request without the code signing flag."""
        self._create_personal_client_cert_request(user_key, md, root_cert, False)

    def create_codesign_csr(self, user_key, md="sha256", root_cert=2):
        """Create a personal client request with the code signing flag."""
        self._create_personal_client_cert_request(user_key, md, root_cert, True)

    def create_org_email_csr(self, org_key, md="sha256", root_cert=2):
        """Create an organization client request without code signing."""
        self._create_organization_client_cert_request(org_key, md, root_cert, False)

    def create_org_codesign_csr(self, org_key, md="sha256", root_cert=2):
        """Create an organization client request with code signing."""
        self._create_organization_client_cert_request(org_key, md, root_cert, True)

    def _generate_csr(self, csr_type: str, subject: x509.Name, row_id: int) -> str:
        """Write a PEM encoded CSR for a fresh RSA key and return its path.

        The file is stored as
        ``<csr_dir>/<csr_type>/<first 3 characters of row_id>/<csr_type>-<row_id>.csr``.
        The private key is not persisted.  The CSR itself is always signed
        with SHA-256; the ``md`` value of the request row is not used here.
        """
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        csr = (
            x509.CertificateSigningRequestBuilder()
            .subject_name(subject)
            .sign(private_key, hashes.SHA256())
        )

        csr_path = (
            pathlib.Path(self.csr_dir)
            / csr_type
            / str(row_id)[:3]
            / f"{csr_type}-{row_id}.csr"
        )
        csr_path.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
        csr_path.write_bytes(csr.public_bytes(serialization.Encoding.PEM))

        return str(csr_path)

    def _store_cert_request(self, table, csr_type, subject, **columns):
        """Insert a request row, write its CSR file and record the file name.

        The row id is needed for the CSR file name, so the row is inserted
        first and its csr_name column is filled in afterwards.  Both
        statements are committed together.
        """
        with self.engine.connect() as conn:
            result = conn.execute(
                insert(table).values(
                    keytype=CSR_TYPE_VENDOR_INDEPENDENT,
                    created=func.now(),
                    modified=func.now(),
                    **columns,
                )
            )
            cert_id = result.inserted_primary_key.id
            logging.debug("created %s entry with id %d", table.name, cert_id)

            csr_filename = self._generate_csr(csr_type, subject, cert_id)
            conn.execute(
                update(table)
                .where(table.c.id == cert_id)
                .values(csr_name=csr_filename, modified=func.now())
            )
            conn.commit()

    def _create_personal_client_cert_request(self, user_key, md, root_cert, code_sign):
        """Create an emailcerts row and CSR file for the given test user."""
        user = TEST_USERS[user_key]
        subject = x509.Name(
            [
                x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, user.get_name()),
                x509.NameAttribute(x509.oid.NameOID.EMAIL_ADDRESS, user.email),
            ]
        )

        self._store_cert_request(
            self.emailcerts_table,
            "client",
            subject,
            memid=user.id,
            CN=user.email,
            subject=f"/CN={user.get_name()}/emailAddress={user.email}",
            rootcert=root_cert,
            md=md,
            disablelogin=True,
            codesign=code_sign,
        )

    def _create_organization_client_cert_request(
        self, org_key, md, root_cert, code_sign
    ):
        """Create an orgemailcerts row and CSR file for the given organization."""
        org, user_key = TEST_ORGANIZATIONS[org_key]
        user = TEST_USERS[user_key]
        subject = x509.Name(
            [
                x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, org.name),
                x509.NameAttribute(
                    x509.oid.NameOID.ORGANIZATIONAL_UNIT_NAME, org.ou_name
                ),
                x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, org.country),
                x509.NameAttribute(x509.oid.NameOID.LOCALITY_NAME, org.locality),
                x509.NameAttribute(x509.oid.NameOID.STATE_OR_PROVINCE_NAME, org.state),
            ]
        )

        self._store_cert_request(
            self.orgemailcerts_table,
            "orgclient",
            subject,
            orgid=org.id,
            CN=org.domain,
            subject=f"/O={org.name}/OU={org.ou_name}/C={org.country}/ST={org.state}/L={org.locality}/emailAddress={self.fake.email(domain=org.domain)}",
            rootcert=root_cert,
            md=md,
            codesign=code_sign,
            ou=org.ou_name,
            orgadminid=user.id,
        )


def main(config_file: str):
    """Generate the test data described by the given INI configuration file.

    The file must provide ``dsn`` in section ``db`` and ``csr_dir`` and
    ``crt_dir`` in section ``files``.

    :raises OSError: if the configuration file cannot be opened
    """
    cp = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open; opening the
    # file explicitly reports a missing or unreadable file right away.
    with open(config_file) as config_fp:
        cp.read_file(config_fp)

    dsn = cp.get("db", "dsn")

    engine = create_engine(dsn, echo=False)

    logging.basicConfig(level=logging.INFO, stream=sys.stdout)

    data_generator = DataGenerator(
        engine, cp.get("files", "csr_dir"), cp.get("files", "crt_dir")
    )
    data_generator.populate_users()
    data_generator.populate_organizations()
    data_generator.generate_csrs()


if __name__ == "__main__":
    # An optional first command line argument overrides the default path.
    main(sys.argv[1] if len(sys.argv) > 1 else "config.ini")