Compare commits

...

2 Commits

@ -1,6 +1,7 @@
import configparser
import logging
import pathlib
import random
import secrets
import sys
from hashlib import md5, sha1
@ -11,6 +12,28 @@ from cryptography.hazmat.primitives.asymmetric import rsa
from faker import Faker
from sqlalchemy import MetaData, Table, select, create_engine, insert, func, update
# german state codes from ISO-3166-2:DE
GERMAN_STATE_CODES = [
"BW",
"BY",
"BE",
"BB",
"HB",
"HH",
"HE",
"MV",
"NI",
"NW",
"RP",
"SL",
"SN",
"ST",
"SH",
"TH",
]
CSR_TYPE_VENDOR_INDEPENDENT = "VI"
def build_user_unique_id(email: str) -> str:
return md5(email.encode() + secrets.token_bytes(8)).hexdigest()
@ -47,6 +70,24 @@ class User:
return "CAcert WoT user"
class Organization:
def __init__(self, contact: str):
self.contact = contact
self.id = None
self.name = ""
self.locality = ""
self.state = ""
self.country = ""
self.comments = ""
self.ou_name = ""
self.domain = ""
def __repr__(self):
return (
f"Organization<'{self.name}' <{self.contact}>, '{self.domain}', {self.id}>"
)
TEST_USERS = {
"wot": User("wot-user@example.org"),
"assured": User("assured-user@example.org", is_assured=True),
@ -66,6 +107,10 @@ TEST_USERS = {
),
}
TEST_ORGANIZATIONS = {
"evil-corp": (Organization("contact@evilcorp.example.org"), "org-admin"),
}
class DataGenerator:
def __init__(self, engine, csr_dir, crt_dir):
@ -74,6 +119,15 @@ class DataGenerator:
self.crt_dir = crt_dir
self.metadata = MetaData()
self.users_table = Table("users", self.metadata, autoload_with=self.engine)
self.organizations_table = Table(
"orginfo", self.metadata, autoload_with=self.engine
)
self.organizations_assignments_table = Table(
"org", self.metadata, autoload_with=self.engine
)
self.organizations_domain_table = Table(
"orgdomains", self.metadata, autoload_with=self.engine
)
self.emailcerts_table = Table(
"emailcerts", self.metadata, autoload_with=self.engine
)
@ -87,9 +141,9 @@ class DataGenerator:
"orgdomaincerts", self.metadata, autoload_with=self.engine
)
self.fake = Faker()
Faker.seed(0)
Faker.seed(42)
def ensure_users(self):
def populate_users(self):
with self.engine.connect() as conn:
for key, user in TEST_USERS.items():
stmt = select(self.users_table).where(
@ -129,6 +183,93 @@ class DataGenerator:
user.last_name = db_user.lname
logging.info("created test users %s", TEST_USERS)
def populate_organizations(self):
with self.engine.connect() as conn:
for key, org_item in TEST_ORGANIZATIONS.items():
org, admin = org_item
db_orginfo = conn.execute(
select(self.organizations_table).where(
self.organizations_table.c.contact == org.contact
)
).one_or_none()
if not db_orginfo:
org.name = self.fake.company()
org.locality = self.fake.city()
org.country = self.fake.country_code()
org.state = random.choice(GERMAN_STATE_CODES)
org.comments = self.fake.catch_phrase()
result = conn.execute(
insert(self.organizations_table).values(
contact=org.contact,
O=org.name,
L=org.locality,
ST=org.state,
C=org.country,
comments=org.comments,
creator_id=0,
created=func.now(),
)
)
org.id = result.inserted_primary_key.id
conn.commit()
else:
org.id = db_orginfo.id
org.name = db_orginfo.O
org.locality = db_orginfo.L
org.country = db_orginfo.C
org.state = db_orginfo.ST
org.comments = db_orginfo.comments
db_org = conn.execute(
select(self.organizations_assignments_table)
.where(self.organizations_assignments_table.c.orgid == org.id)
.where(
self.organizations_assignments_table.c.memid
== TEST_USERS[admin].id,
)
).one_or_none()
if not db_org:
org.ou_name = self.fake.text(max_nb_chars=40)
conn.execute(
insert(self.organizations_assignments_table).values(
orgid=org.id,
memid=TEST_USERS[admin].id,
OU=org.ou_name,
masteracc=True,
comments=self.fake.sentence(),
creator_id=0,
created=func.now(),
)
)
conn.commit()
else:
org.ou_name = db_org.OU
logging.info("created organizational unit %s", org.ou_name)
db_org_domain = conn.execute(
select(self.organizations_domain_table).where(
self.organizations_domain_table.c.orgid == org.id
)
).one_or_none()
if not db_org_domain:
org.domain = self.fake.domain_name()
conn.execute(
insert(self.organizations_domain_table).values(
orgid=org.id,
domain=org.domain,
)
)
conn.commit()
else:
org.domain = db_org_domain.domain
logging.info("created organizations %s", TEST_ORGANIZATIONS)
def generate_csrs(self):
self.create_email_csr("wot")
self.create_email_csr("assured")
@ -138,8 +279,48 @@ class DataGenerator:
self.create_email_csr("assurer", md="md5")
self.create_email_csr("assurer", md="sha1")
self.create_email_csr("assurer", md="sha512")
self.create_codesign_csr("assured")
self.create_codesign_csr("admin")
self.create_codesign_csr("assured", "sha512")
self.create_org_email_csr("evil-corp")
self.create_org_email_csr("evil-corp", "sha512")
self.create_org_codesign_csr("evil-corp")
self.create_org_codesign_csr("evil-corp", "sha512")
def create_email_csr(self, user_key, md="sha256", root_cert=2):
self._create_personal_client_cert_request(user_key, md, root_cert, False)
def create_codesign_csr(self, user_key, md="sha256", root_cert=2):
self._create_personal_client_cert_request(user_key, md, root_cert, True)
def create_org_email_csr(self, org_key, md="sha256", root_cert=2):
self._create_organization_client_cert_request(org_key, md, root_cert, False)
def create_org_codesign_csr(self, org_key, md="sha256", root_cert=2):
self._create_organization_client_cert_request(org_key, md, root_cert, True)
def _generate_csr(self, csr_type: str, subject: x509.Name, row_id: int) -> str:
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(subject)
.sign(private_key, hashes.SHA256())
)
p = (
pathlib.Path(self.csr_dir)
/ csr_type
/ str(row_id)[:3]
/ f"{csr_type}-{row_id}.csr"
)
p.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
p.write_bytes(csr.public_bytes(serialization.Encoding.PEM))
return str(p)
def _create_personal_client_cert_request(self, user_key, md, root_cert, code_sign):
user = TEST_USERS[user_key]
subject = x509.Name(
@ -149,50 +330,83 @@ class DataGenerator:
]
)
# The value of the keytype column is set to "MS" as this is the only supportable key type, "NS" is for
# SPKAC type signing requests and uses MD5, "VI" uses unsupported Microsoft CSR attributes
# The value of the keytype column is set to "VI" as this is the one of the supportable key type, "NS" is for
# SPKAC type signing requests and uses MD5, "MS" seems to be related to Microsoft browsers. "VI" might stand
# for "vendor independent".
with self.engine.connect() as conn:
stmt = insert(self.emailcerts_table).values(
memid=user.id,
CN=user.email,
subject=f"/CN={user.get_name()}/emailAddress={user.email}",
keytype="MS",
created=func.now(),
modified=func.now(),
rootcert=root_cert,
md=md,
disablelogin=1,
result = conn.execute(
insert(self.emailcerts_table).values(
memid=user.id,
CN=user.email,
subject=f"/CN={user.get_name()}/emailAddress={user.email}",
keytype=CSR_TYPE_VENDOR_INDEPENDENT,
created=func.now(),
modified=func.now(),
rootcert=root_cert,
md=md,
disablelogin=True,
codesign=code_sign,
)
)
result = conn.execute(stmt)
cert_id = result.inserted_primary_key.id
logging.debug("created emailcerts entry with id %d", cert_id)
csr_filename = self.generate_csr("client", subject, cert_id)
csr_filename = self._generate_csr("client", subject, cert_id)
stmt = (
conn.execute(
update(self.emailcerts_table)
.where(self.emailcerts_table.c.id == cert_id)
.values(csr_name=csr_filename, modified=func.now())
)
conn.execute(stmt)
conn.commit()
def generate_csr(self, csr_type: str, subject: x509.Name, id: int) -> str:
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
def _create_organization_client_cert_request(
self, org_key, md, root_cert, code_sign
):
org, user_key = TEST_ORGANIZATIONS[org_key]
user = TEST_USERS[user_key]
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(subject)
.sign(private_key, hashes.SHA256())
subject = x509.Name(
[
x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, org.name),
x509.NameAttribute(
x509.oid.NameOID.ORGANIZATIONAL_UNIT_NAME, org.ou_name
),
x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, org.country),
x509.NameAttribute(x509.oid.NameOID.LOCALITY_NAME, org.locality),
x509.NameAttribute(x509.oid.NameOID.STATE_OR_PROVINCE_NAME, org.state),
]
)
p = pathlib.Path(self.csr_dir) / csr_type / str(id)[:3] / f"{csr_type}-{id}.csr"
p.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
with self.engine.connect() as conn:
result = conn.execute(
insert(self.orgemailcerts_table).values(
orgid=org.id,
CN=org.domain,
subject=f"/O={org.name}/OU={org.ou_name}/C={org.country}/ST={org.state}/L={org.locality}/emailAddress={self.fake.email(domain=org.domain)}",
keytype=CSR_TYPE_VENDOR_INDEPENDENT,
created=func.now(),
modified=func.now(),
rootcert=root_cert,
md=md,
codesign=code_sign,
ou=org.ou_name,
orgadminid=user.id,
)
)
cert_id = result.inserted_primary_key.id
logging.debug("created orgemailcerts entry with id %d", cert_id)
p.write_bytes(csr.public_bytes(serialization.Encoding.PEM))
csr_filename = self._generate_csr("orgclient", subject, cert_id)
return str(p)
conn.execute(
update(self.orgemailcerts_table)
.where(self.orgemailcerts_table.c.id == cert_id)
.values(csr_name=csr_filename, modified=func.now())
)
conn.commit()
def main(config_file: str):
@ -209,7 +423,8 @@ def main(config_file: str):
engine, cp.get("files", "csr_dir"), cp.get("files", "crt_dir")
)
data_generator.ensure_users()
data_generator.populate_users()
data_generator.populate_organizations()
data_generator.generate_csrs()

Loading…
Cancel
Save