You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

433 lines
15 KiB
Python

import configparser
import logging
import pathlib
import random
import secrets
import sys
from hashlib import md5, sha1
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from faker import Faker
from sqlalchemy import MetaData, Table, select, create_engine, insert, func, update
# Subdivision codes of the German federal states, taken from ISO 3166-2:DE
# (without the "DE-" prefix).
GERMAN_STATE_CODES = [
    "BW", "BY", "BE", "BB",
    "HB", "HH", "HE", "MV",
    "NI", "NW", "RP", "SL",
    "SN", "ST", "SH", "TH",
]

# Value written to the ``keytype`` column of certificate request rows.
# NOTE(review): "VI" presumably stands for "vendor independent" -- confirm.
CSR_TYPE_VENDOR_INDEPENDENT = "VI"
def build_user_unique_id(email: str) -> str:
    """Return a random hexadecimal identifier derived from *email*.

    The identifier is the MD5 hex digest (32 characters) of the encoded
    email address followed by 8 random bytes, so two calls with the same
    address yield different values.
    """
    digest = md5()
    digest.update(email.encode())
    # Random salt makes the identifier unpredictable from the address alone.
    digest.update(secrets.token_bytes(8))
    return digest.hexdigest()
def build_random_password():
    """Return the SHA-1 hex digest (40 characters) of 16 random bytes."""
    entropy = secrets.token_bytes(16)
    return sha1(entropy).hexdigest()
class User:
    """In-memory description of a test user account.

    The e-mail address and role flags are given at construction time. The
    ``id``, ``first_name`` and ``last_name`` attributes start empty and are
    meant to be filled in later by whoever stores or loads the user.
    """

    def __init__(
        self,
        email: str,
        is_org_admin: bool = False,
        is_assured: bool = False,
        is_assurer: bool = False,
        is_admin: bool = False,
    ):
        """Store the address and role flags; leave id and names unset."""
        self.email = email

        # Role flags.
        self.is_org_admin = is_org_admin
        self.is_assured = is_assured
        self.is_assurer = is_assurer
        self.is_admin = is_admin

        # Not known until the user has been stored or looked up.
        self.id = None
        self.first_name = ""
        self.last_name = ""

    def __repr__(self):
        """Return a short debug representation with e-mail and id."""
        return f"User<'{self.email}', {self.id}>"

    def get_name(self):
        """Return the name to use for this user.

        Only assured users get their real first and last name; everybody
        else gets the fixed placeholder name.
        """
        if not self.is_assured:
            return "CAcert WoT user"
        return f"{self.first_name} {self.last_name}"
class Organization:
    """In-memory description of a test organization.

    Only the contact address is given at construction time; every other
    attribute starts empty and is meant to be filled in later.
    """

    def __init__(self, contact: str):
        """Store the contact address and initialise all other fields empty."""
        self.contact = contact
        self.id = None
        # Descriptive fields, all empty until populated.
        self.name = ""
        self.locality = ""
        self.state = ""
        self.country = ""
        self.comments = ""
        self.ou_name = ""
        self.domain = ""

    def __repr__(self):
        """Return a debug representation with name, contact, domain and id."""
        label = f"'{self.name}' <{self.contact}>"
        return f"Organization<{label}, '{self.domain}', {self.id}>"
# Test users by short key, ordered from least to most privileged. The
# insertion order is kept as is because the users are processed in
# dictionary order.
TEST_USERS = {
    "wot": User(email="wot-user@example.org"),
    "assured": User(email="assured-user@example.org", is_assured=True),
    "assurer": User(
        email="assurer-user@example.org",
        is_assured=True,
        is_assurer=True,
    ),
    "org-admin": User(
        email="org-user@example.org",
        is_org_admin=True,
        is_assured=True,
        is_assurer=True,
    ),
    "admin": User(
        email="admin-user@example.org",
        is_org_admin=True,
        is_assured=True,
        is_assurer=True,
        is_admin=True,
    ),
}
# Test organizations by short key. Each value pairs the organization with
# the TEST_USERS key of the user acting as its administrator.
TEST_ORGANIZATIONS = {
    "evil-corp": (
        Organization(contact="contact@evilcorp.example.org"),
        "org-admin",
    ),
}
class DataGenerator:
    """Create test users, organizations and certificate signing requests.

    Table definitions are reflected from the database behind ``engine``.
    Fake values (names, companies, domains, ...) come from a Faker instance
    that is seeded with a fixed value.
    """

    def __init__(self, engine, csr_dir, crt_dir):
        """Reflect the required tables and prepare the fake data source.

        Args:
            engine: SQLAlchemy engine used for reflection and all statements.
            csr_dir: base directory below which CSR files are written.
            crt_dir: stored on the instance; not used by the methods of
                this class.
        """
        self.engine = engine
        self.csr_dir = csr_dir
        self.crt_dir = crt_dir
        self.metadata = MetaData()
        self.users_table = self._reflect("users")
        self.organizations_table = self._reflect("orginfo")
        self.organizations_assignments_table = self._reflect("org")
        self.organizations_domain_table = self._reflect("orgdomains")
        self.emailcerts_table = self._reflect("emailcerts")
        self.orgemailcerts_table = self._reflect("orgemailcerts")
        self.domaincerts_table = self._reflect("domaincerts")
        self.orgdomaincerts_table = self._reflect("orgdomaincerts")
        self.fake = Faker()
        # Fixed seed so the generated fake values are reproducible.
        Faker.seed(42)

    def _reflect(self, table_name):
        """Return the reflected definition of the table named *table_name*."""
        return Table(table_name, self.metadata, autoload_with=self.engine)

    def populate_users(self):
        """Make sure every user in TEST_USERS exists in the users table.

        A user is looked up by e-mail address. Missing users are inserted
        with generated names, a random password and a random unique id;
        for existing users the id and names are copied from the database
        row. In both cases the ``User`` object ends up with ``id``,
        ``first_name`` and ``last_name`` set.
        """
        with self.engine.connect() as conn:
            for user in TEST_USERS.values():
                db_user = conn.execute(
                    select(self.users_table).where(
                        self.users_table.c.email == user.email
                    )
                ).one_or_none()
                if db_user is not None:
                    user.id = db_user.id
                    user.first_name = db_user.fname
                    user.last_name = db_user.lname
                    continue

                user.first_name = self.fake.first_name()
                user.last_name = self.fake.last_name()
                result = conn.execute(
                    insert(self.users_table).values(
                        email=user.email,
                        fname=user.first_name,
                        lname=user.last_name,
                        password=build_random_password(),
                        orgadmin=user.is_org_admin,
                        admin=user.is_admin,
                        assurer=user.is_assurer,
                        adadmin=False,
                        locked=False,
                        dob=self.fake.passport_dob(),
                        uniqueID=build_user_unique_id(user.email),
                        otphash="",
                        otppin=0,
                        verified=True,
                        created=func.now(),
                        modified=func.now(),
                    )
                )
                user.id = result.inserted_primary_key.id
                conn.commit()
        logging.info("created test users %s", TEST_USERS)

    def populate_organizations(self):
        """Make sure every organization in TEST_ORGANIZATIONS exists.

        For each organization this ensures, in order, the organization
        row itself, the assignment of its administrator and a domain.
        Missing rows are created with fake data; existing rows are read
        back into the ``Organization`` object.
        """
        with self.engine.connect() as conn:
            for org, admin_key in TEST_ORGANIZATIONS.values():
                self._ensure_organization(conn, org)
                self._ensure_org_admin(conn, org, TEST_USERS[admin_key])
                self._ensure_org_domain(conn, org)
        logging.info("created organizations %s", TEST_ORGANIZATIONS)

    def _ensure_organization(self, conn, org):
        """Load the orginfo row matching ``org.contact`` or create it."""
        db_orginfo = conn.execute(
            select(self.organizations_table).where(
                self.organizations_table.c.contact == org.contact
            )
        ).one_or_none()
        if db_orginfo is not None:
            org.id = db_orginfo.id
            org.name = db_orginfo.O
            org.locality = db_orginfo.L
            org.country = db_orginfo.C
            org.state = db_orginfo.ST
            org.comments = db_orginfo.comments
            return

        org.name = self.fake.company()
        org.locality = self.fake.city()
        org.country = self.fake.country_code()
        # Drawn from the seeded Faker generator rather than the unseeded
        # global ``random`` module, so the state is reproducible like the
        # other fake values.
        org.state = self.fake.random_element(elements=GERMAN_STATE_CODES)
        org.comments = self.fake.catch_phrase()
        result = conn.execute(
            insert(self.organizations_table).values(
                contact=org.contact,
                O=org.name,
                L=org.locality,
                ST=org.state,
                C=org.country,
                comments=org.comments,
                creator_id=0,
                created=func.now(),
            )
        )
        org.id = result.inserted_primary_key.id
        conn.commit()

    def _ensure_org_admin(self, conn, org, admin):
        """Load the assignment of *admin* to *org* or create it."""
        assignments = self.organizations_assignments_table
        db_org = conn.execute(
            select(assignments)
            .where(assignments.c.orgid == org.id)
            .where(assignments.c.memid == admin.id)
        ).one_or_none()
        if db_org is None:
            org.ou_name = self.fake.text(max_nb_chars=40)
            conn.execute(
                insert(assignments).values(
                    orgid=org.id,
                    memid=admin.id,
                    OU=org.ou_name,
                    masteracc=True,
                    comments=self.fake.sentence(),
                    creator_id=0,
                    created=func.now(),
                )
            )
            conn.commit()
        else:
            org.ou_name = db_org.OU
        # Logged for both the created and the already existing case.
        logging.info("created organizational unit %s", org.ou_name)

    def _ensure_org_domain(self, conn, org):
        """Load the domain registered for *org* or create a fake one."""
        domains = self.organizations_domain_table
        db_org_domain = conn.execute(
            select(domains).where(domains.c.orgid == org.id)
        ).one_or_none()
        if db_org_domain is not None:
            org.domain = db_org_domain.domain
            return

        org.domain = self.fake.domain_name()
        conn.execute(insert(domains).values(orgid=org.id, domain=org.domain))
        conn.commit()

    def generate_csrs(self):
        """Create the fixed set of personal and organization test requests.

        The order of the calls determines the order in which rows are
        inserted.
        """
        self.create_email_csr("wot")
        self.create_email_csr("assured")
        self.create_email_csr("assurer")
        self.create_email_csr("admin")
        self.create_email_csr("org-admin")
        self.create_email_csr("assurer", md="md5")
        self.create_email_csr("assurer", md="sha1")
        self.create_email_csr("assurer", md="sha512")
        self.create_codesign_csr("assured")
        self.create_codesign_csr("admin")
        self.create_codesign_csr("assured", "sha512")
        self.create_org_email_csr("evil-corp")
        self.create_org_email_csr("evil-corp", "sha512")
        self.create_org_codesign_csr("evil-corp")
        self.create_org_codesign_csr("evil-corp", "sha512")

    def create_email_csr(self, user_key, md="sha256", root_cert=2):
        """Create a personal client certificate request without code signing."""
        self._create_personal_client_cert_request(user_key, md, root_cert, False)

    def create_codesign_csr(self, user_key, md="sha256", root_cert=2):
        """Create a personal client certificate request with code signing."""
        self._create_personal_client_cert_request(user_key, md, root_cert, True)

    def create_org_email_csr(self, org_key, md="sha256", root_cert=2):
        """Create an organization client request without code signing."""
        self._create_organization_client_cert_request(org_key, md, root_cert, False)

    def create_org_codesign_csr(self, org_key, md="sha256", root_cert=2):
        """Create an organization client request with code signing."""
        self._create_organization_client_cert_request(org_key, md, root_cert, True)

    def _generate_csr(self, csr_type: str, subject: x509.Name, row_id: int) -> str:
        """Write a new PEM encoded CSR for *subject* and return its path.

        A fresh 2048 bit RSA key is generated for every request and is not
        stored anywhere. The CSR itself is always signed with SHA-256; the
        ``md`` value chosen by the callers is only recorded in the
        database row.

        The file is written to
        ``<csr_dir>/<csr_type>/<first three characters of row_id>/<csr_type>-<row_id>.csr``;
        missing directories are created.
        """
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        csr = (
            x509.CertificateSigningRequestBuilder()
            .subject_name(subject)
            .sign(private_key, hashes.SHA256())
        )
        target_dir = pathlib.Path(self.csr_dir) / csr_type / str(row_id)[:3]
        target_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
        csr_path = target_dir / f"{csr_type}-{row_id}.csr"
        csr_path.write_bytes(csr.public_bytes(serialization.Encoding.PEM))
        return str(csr_path)

    def _store_request_with_csr(self, table, csr_type, subject, values):
        """Insert a request row, write its CSR file and record the file name.

        The row is inserted first because the generated primary key is part
        of the CSR file name. Insert and update are committed together, so
        a failure while writing the CSR leaves no committed row behind.

        Args:
            table: reflected certificate request table to insert into.
            csr_type: directory and file name prefix passed to
                ``_generate_csr``.
            subject: X.509 subject name for the CSR.
            values: column values for the new row.
        """
        with self.engine.connect() as conn:
            result = conn.execute(insert(table).values(**values))
            cert_id = result.inserted_primary_key.id
            logging.debug("created %s entry with id %d", table.name, cert_id)
            csr_filename = self._generate_csr(csr_type, subject, cert_id)
            conn.execute(
                update(table)
                .where(table.c.id == cert_id)
                .values(csr_name=csr_filename, modified=func.now())
            )
            conn.commit()

    def _create_personal_client_cert_request(self, user_key, md, root_cert, code_sign):
        """Create an emailcerts row and CSR file for a user in TEST_USERS.

        Args:
            user_key: key of the user in TEST_USERS.
            md: value stored in the ``md`` column of the row.
            root_cert: value stored in the ``rootcert`` column.
            code_sign: value stored in the ``codesign`` column.
        """
        user = TEST_USERS[user_key]
        name_oid = x509.oid.NameOID
        subject = x509.Name(
            [
                x509.NameAttribute(name_oid.COMMON_NAME, user.get_name()),
                x509.NameAttribute(name_oid.EMAIL_ADDRESS, user.email),
            ]
        )
        # The keytype column is set to "VI" because it is one of the
        # supportable key types. "NS" is for SPKAC type signing requests and
        # uses MD5, "MS" seems to be related to Microsoft browsers. "VI"
        # might stand for "vendor independent".
        self._store_request_with_csr(
            self.emailcerts_table,
            "client",
            subject,
            {
                "memid": user.id,
                "CN": user.email,
                "subject": f"/CN={user.get_name()}/emailAddress={user.email}",
                "keytype": CSR_TYPE_VENDOR_INDEPENDENT,
                "created": func.now(),
                "modified": func.now(),
                "rootcert": root_cert,
                "md": md,
                "disablelogin": True,
                "codesign": code_sign,
            },
        )

    def _create_organization_client_cert_request(
        self, org_key, md, root_cert, code_sign
    ):
        """Create an orgemailcerts row and CSR file for a test organization.

        The subject string stored in the row additionally carries a fake
        e-mail address in the organization's domain; the X.509 subject of
        the CSR file does not contain it.

        Args:
            org_key: key of the organization in TEST_ORGANIZATIONS.
            md: value stored in the ``md`` column of the row.
            root_cert: value stored in the ``rootcert`` column.
            code_sign: value stored in the ``codesign`` column.
        """
        org, user_key = TEST_ORGANIZATIONS[org_key]
        user = TEST_USERS[user_key]
        name_oid = x509.oid.NameOID
        subject = x509.Name(
            [
                x509.NameAttribute(name_oid.ORGANIZATION_NAME, org.name),
                x509.NameAttribute(name_oid.ORGANIZATIONAL_UNIT_NAME, org.ou_name),
                x509.NameAttribute(name_oid.COUNTRY_NAME, org.country),
                x509.NameAttribute(name_oid.LOCALITY_NAME, org.locality),
                x509.NameAttribute(name_oid.STATE_OR_PROVINCE_NAME, org.state),
            ]
        )
        contact_email = self.fake.email(domain=org.domain)
        subject_text = (
            f"/O={org.name}/OU={org.ou_name}/C={org.country}"
            f"/ST={org.state}/L={org.locality}/emailAddress={contact_email}"
        )
        self._store_request_with_csr(
            self.orgemailcerts_table,
            "orgclient",
            subject,
            {
                "orgid": org.id,
                "CN": org.domain,
                "subject": subject_text,
                "keytype": CSR_TYPE_VENDOR_INDEPENDENT,
                "created": func.now(),
                "modified": func.now(),
                "rootcert": root_cert,
                "md": md,
                "codesign": code_sign,
                "ou": org.ou_name,
                "orgadminid": user.id,
            },
        )
def main(config_file: str):
    """Generate the complete set of test data described by *config_file*.

    The INI file has to provide ``dsn`` in section ``db`` as well as
    ``csr_dir`` and ``crt_dir`` in section ``files``. Users are populated
    first, then organizations, then the certificate signing requests.

    Raises:
        FileNotFoundError: if *config_file* could not be read.
    """
    cp = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open and returns
    # the list of files it actually parsed. Fail early with a clear message
    # instead of a confusing NoSectionError further down.
    if not cp.read(config_file):
        raise FileNotFoundError(
            f"configuration file {config_file!r} could not be read"
        )
    dsn = cp.get("db", "dsn")
    engine = create_engine(dsn, echo=False)
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    data_generator = DataGenerator(
        engine, cp.get("files", "csr_dir"), cp.get("files", "crt_dir")
    )
    data_generator.populate_users()
    data_generator.populate_organizations()
    data_generator.generate_csrs()
if __name__ == "__main__":
    # An optional first command line argument names the configuration file;
    # without it the script falls back to "config.ini" in the working
    # directory.
    main(sys.argv[1] if len(sys.argv) > 1 else "config.ini")