Compare commits

...

2 Commits

@ -1,6 +1,7 @@
import configparser import configparser
import logging import logging
import pathlib import pathlib
import random
import secrets import secrets
import sys import sys
from hashlib import md5, sha1 from hashlib import md5, sha1
@ -11,6 +12,28 @@ from cryptography.hazmat.primitives.asymmetric import rsa
from faker import Faker from faker import Faker
from sqlalchemy import MetaData, Table, select, create_engine, insert, func, update from sqlalchemy import MetaData, Table, select, create_engine, insert, func, update
# german state codes from ISO-3166-2:DE
GERMAN_STATE_CODES = [
"BW",
"BY",
"BE",
"BB",
"HB",
"HH",
"HE",
"MV",
"NI",
"NW",
"RP",
"SL",
"SN",
"ST",
"SH",
"TH",
]
CSR_TYPE_VENDOR_INDEPENDENT = "VI"
def build_user_unique_id(email: str) -> str: def build_user_unique_id(email: str) -> str:
return md5(email.encode() + secrets.token_bytes(8)).hexdigest() return md5(email.encode() + secrets.token_bytes(8)).hexdigest()
@ -47,6 +70,24 @@ class User:
return "CAcert WoT user" return "CAcert WoT user"
class Organization:
def __init__(self, contact: str):
self.contact = contact
self.id = None
self.name = ""
self.locality = ""
self.state = ""
self.country = ""
self.comments = ""
self.ou_name = ""
self.domain = ""
def __repr__(self):
return (
f"Organization<'{self.name}' <{self.contact}>, '{self.domain}', {self.id}>"
)
TEST_USERS = { TEST_USERS = {
"wot": User("wot-user@example.org"), "wot": User("wot-user@example.org"),
"assured": User("assured-user@example.org", is_assured=True), "assured": User("assured-user@example.org", is_assured=True),
@ -66,6 +107,10 @@ TEST_USERS = {
), ),
} }
TEST_ORGANIZATIONS = {
"evil-corp": (Organization("contact@evilcorp.example.org"), "org-admin"),
}
class DataGenerator: class DataGenerator:
def __init__(self, engine, csr_dir, crt_dir): def __init__(self, engine, csr_dir, crt_dir):
@ -74,6 +119,15 @@ class DataGenerator:
self.crt_dir = crt_dir self.crt_dir = crt_dir
self.metadata = MetaData() self.metadata = MetaData()
self.users_table = Table("users", self.metadata, autoload_with=self.engine) self.users_table = Table("users", self.metadata, autoload_with=self.engine)
self.organizations_table = Table(
"orginfo", self.metadata, autoload_with=self.engine
)
self.organizations_assignments_table = Table(
"org", self.metadata, autoload_with=self.engine
)
self.organizations_domain_table = Table(
"orgdomains", self.metadata, autoload_with=self.engine
)
self.emailcerts_table = Table( self.emailcerts_table = Table(
"emailcerts", self.metadata, autoload_with=self.engine "emailcerts", self.metadata, autoload_with=self.engine
) )
@ -87,9 +141,9 @@ class DataGenerator:
"orgdomaincerts", self.metadata, autoload_with=self.engine "orgdomaincerts", self.metadata, autoload_with=self.engine
) )
self.fake = Faker() self.fake = Faker()
Faker.seed(0) Faker.seed(42)
def ensure_users(self): def populate_users(self):
with self.engine.connect() as conn: with self.engine.connect() as conn:
for key, user in TEST_USERS.items(): for key, user in TEST_USERS.items():
stmt = select(self.users_table).where( stmt = select(self.users_table).where(
@ -129,6 +183,93 @@ class DataGenerator:
user.last_name = db_user.lname user.last_name = db_user.lname
logging.info("created test users %s", TEST_USERS) logging.info("created test users %s", TEST_USERS)
def populate_organizations(self):
with self.engine.connect() as conn:
for key, org_item in TEST_ORGANIZATIONS.items():
org, admin = org_item
db_orginfo = conn.execute(
select(self.organizations_table).where(
self.organizations_table.c.contact == org.contact
)
).one_or_none()
if not db_orginfo:
org.name = self.fake.company()
org.locality = self.fake.city()
org.country = self.fake.country_code()
org.state = random.choice(GERMAN_STATE_CODES)
org.comments = self.fake.catch_phrase()
result = conn.execute(
insert(self.organizations_table).values(
contact=org.contact,
O=org.name,
L=org.locality,
ST=org.state,
C=org.country,
comments=org.comments,
creator_id=0,
created=func.now(),
)
)
org.id = result.inserted_primary_key.id
conn.commit()
else:
org.id = db_orginfo.id
org.name = db_orginfo.O
org.locality = db_orginfo.L
org.country = db_orginfo.C
org.state = db_orginfo.ST
org.comments = db_orginfo.comments
db_org = conn.execute(
select(self.organizations_assignments_table)
.where(self.organizations_assignments_table.c.orgid == org.id)
.where(
self.organizations_assignments_table.c.memid
== TEST_USERS[admin].id,
)
).one_or_none()
if not db_org:
org.ou_name = self.fake.text(max_nb_chars=40)
conn.execute(
insert(self.organizations_assignments_table).values(
orgid=org.id,
memid=TEST_USERS[admin].id,
OU=org.ou_name,
masteracc=True,
comments=self.fake.sentence(),
creator_id=0,
created=func.now(),
)
)
conn.commit()
else:
org.ou_name = db_org.OU
logging.info("created organizational unit %s", org.ou_name)
db_org_domain = conn.execute(
select(self.organizations_domain_table).where(
self.organizations_domain_table.c.orgid == org.id
)
).one_or_none()
if not db_org_domain:
org.domain = self.fake.domain_name()
conn.execute(
insert(self.organizations_domain_table).values(
orgid=org.id,
domain=org.domain,
)
)
conn.commit()
else:
org.domain = db_org_domain.domain
logging.info("created organizations %s", TEST_ORGANIZATIONS)
def generate_csrs(self): def generate_csrs(self):
self.create_email_csr("wot") self.create_email_csr("wot")
self.create_email_csr("assured") self.create_email_csr("assured")
@ -138,8 +279,48 @@ class DataGenerator:
self.create_email_csr("assurer", md="md5") self.create_email_csr("assurer", md="md5")
self.create_email_csr("assurer", md="sha1") self.create_email_csr("assurer", md="sha1")
self.create_email_csr("assurer", md="sha512") self.create_email_csr("assurer", md="sha512")
self.create_codesign_csr("assured")
self.create_codesign_csr("admin")
self.create_codesign_csr("assured", "sha512")
self.create_org_email_csr("evil-corp")
self.create_org_email_csr("evil-corp", "sha512")
self.create_org_codesign_csr("evil-corp")
self.create_org_codesign_csr("evil-corp", "sha512")
def create_email_csr(self, user_key, md="sha256", root_cert=2): def create_email_csr(self, user_key, md="sha256", root_cert=2):
self._create_personal_client_cert_request(user_key, md, root_cert, False)
def create_codesign_csr(self, user_key, md="sha256", root_cert=2):
self._create_personal_client_cert_request(user_key, md, root_cert, True)
def create_org_email_csr(self, org_key, md="sha256", root_cert=2):
self._create_organization_client_cert_request(org_key, md, root_cert, False)
def create_org_codesign_csr(self, org_key, md="sha256", root_cert=2):
self._create_organization_client_cert_request(org_key, md, root_cert, True)
def _generate_csr(self, csr_type: str, subject: x509.Name, row_id: int) -> str:
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(subject)
.sign(private_key, hashes.SHA256())
)
p = (
pathlib.Path(self.csr_dir)
/ csr_type
/ str(row_id)[:3]
/ f"{csr_type}-{row_id}.csr"
)
p.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
p.write_bytes(csr.public_bytes(serialization.Encoding.PEM))
return str(p)
def _create_personal_client_cert_request(self, user_key, md, root_cert, code_sign):
user = TEST_USERS[user_key] user = TEST_USERS[user_key]
subject = x509.Name( subject = x509.Name(
@ -149,50 +330,83 @@ class DataGenerator:
] ]
) )
# The value of the keytype column is set to "MS" as this is the only supportable key type, "NS" is for # The value of the keytype column is set to "VI" as this is the one of the supportable key type, "NS" is for
# SPKAC type signing requests and uses MD5, "VI" uses unsupported Microsoft CSR attributes # SPKAC type signing requests and uses MD5, "MS" seems to be related to Microsoft browsers. "VI" might stand
# for "vendor independent".
with self.engine.connect() as conn: with self.engine.connect() as conn:
stmt = insert(self.emailcerts_table).values( result = conn.execute(
memid=user.id, insert(self.emailcerts_table).values(
CN=user.email, memid=user.id,
subject=f"/CN={user.get_name()}/emailAddress={user.email}", CN=user.email,
keytype="MS", subject=f"/CN={user.get_name()}/emailAddress={user.email}",
created=func.now(), keytype=CSR_TYPE_VENDOR_INDEPENDENT,
modified=func.now(), created=func.now(),
rootcert=root_cert, modified=func.now(),
md=md, rootcert=root_cert,
disablelogin=1, md=md,
disablelogin=True,
codesign=code_sign,
)
) )
result = conn.execute(stmt)
cert_id = result.inserted_primary_key.id cert_id = result.inserted_primary_key.id
logging.debug("created emailcerts entry with id %d", cert_id) logging.debug("created emailcerts entry with id %d", cert_id)
csr_filename = self.generate_csr("client", subject, cert_id) csr_filename = self._generate_csr("client", subject, cert_id)
stmt = ( conn.execute(
update(self.emailcerts_table) update(self.emailcerts_table)
.where(self.emailcerts_table.c.id == cert_id) .where(self.emailcerts_table.c.id == cert_id)
.values(csr_name=csr_filename, modified=func.now()) .values(csr_name=csr_filename, modified=func.now())
) )
conn.execute(stmt)
conn.commit() conn.commit()
def generate_csr(self, csr_type: str, subject: x509.Name, id: int) -> str: def _create_organization_client_cert_request(
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) self, org_key, md, root_cert, code_sign
):
org, user_key = TEST_ORGANIZATIONS[org_key]
user = TEST_USERS[user_key]
csr = ( subject = x509.Name(
x509.CertificateSigningRequestBuilder() [
.subject_name(subject) x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, org.name),
.sign(private_key, hashes.SHA256()) x509.NameAttribute(
x509.oid.NameOID.ORGANIZATIONAL_UNIT_NAME, org.ou_name
),
x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, org.country),
x509.NameAttribute(x509.oid.NameOID.LOCALITY_NAME, org.locality),
x509.NameAttribute(x509.oid.NameOID.STATE_OR_PROVINCE_NAME, org.state),
]
) )
p = pathlib.Path(self.csr_dir) / csr_type / str(id)[:3] / f"{csr_type}-{id}.csr" with self.engine.connect() as conn:
p.parent.mkdir(mode=0o755, parents=True, exist_ok=True) result = conn.execute(
insert(self.orgemailcerts_table).values(
orgid=org.id,
CN=org.domain,
subject=f"/O={org.name}/OU={org.ou_name}/C={org.country}/ST={org.state}/L={org.locality}/emailAddress={self.fake.email(domain=org.domain)}",
keytype=CSR_TYPE_VENDOR_INDEPENDENT,
created=func.now(),
modified=func.now(),
rootcert=root_cert,
md=md,
codesign=code_sign,
ou=org.ou_name,
orgadminid=user.id,
)
)
cert_id = result.inserted_primary_key.id
logging.debug("created orgemailcerts entry with id %d", cert_id)
p.write_bytes(csr.public_bytes(serialization.Encoding.PEM)) csr_filename = self._generate_csr("orgclient", subject, cert_id)
return str(p) conn.execute(
update(self.orgemailcerts_table)
.where(self.orgemailcerts_table.c.id == cert_id)
.values(csr_name=csr_filename, modified=func.now())
)
conn.commit()
def main(config_file: str): def main(config_file: str):
@ -209,7 +423,8 @@ def main(config_file: str):
engine, cp.get("files", "csr_dir"), cp.get("files", "crt_dir") engine, cp.get("files", "csr_dir"), cp.get("files", "crt_dir")
) )
data_generator.ensure_users() data_generator.populate_users()
data_generator.populate_organizations()
data_generator.generate_csrs() data_generator.generate_csrs()

Loading…
Cancel
Save