cacert-testdata-generator/main.py

217 lines
7 KiB
Python

import configparser
import logging
import pathlib
import secrets
import sys
from hashlib import md5, sha1
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from faker import Faker
from sqlalchemy import MetaData, Table, select, create_engine, insert, func, update
def build_user_unique_id(email: str) -> str:
    """Return an MD5 hex digest of ``email`` salted with eight random bytes.

    The random salt means two calls with the same address yield different
    identifiers.
    """
    salted_email = email.encode() + secrets.token_bytes(8)
    digest = md5(salted_email)
    return digest.hexdigest()
def build_random_password():
    """Return the SHA-1 hex digest of sixteen freshly generated random bytes."""
    random_bytes = secrets.token_bytes(16)
    digest = sha1(random_bytes)
    return digest.hexdigest()
class User:
    """A test account identified by its email address plus a set of role flags.

    ``id``, ``first_name`` and ``last_name`` start out empty (``None`` / ``""``)
    and are expected to be assigned after construction.
    """

    def __init__(
        self,
        email: str,
        is_org_admin: bool = False,
        is_assured: bool = False,
        is_assurer: bool = False,
        is_admin: bool = False,
    ):
        # Values that are only known later start out empty.
        self.id = None
        self.first_name = ""
        self.last_name = ""

        # Identity and role flags supplied by the caller.
        self.email = email
        self.is_admin = is_admin
        self.is_assurer = is_assurer
        self.is_assured = is_assured
        self.is_org_admin = is_org_admin

    def __repr__(self):
        return f"User<'{self.email}', {self.id}>"

    def get_name(self):
        """Return the full name for assured users, a fixed placeholder otherwise."""
        if not self.is_assured:
            return "CAcert WoT user"
        return f"{self.first_name} {self.last_name}"
# Fixture accounts keyed by a short role name. Each entry carries every flag
# of the entry before it plus one more. The insertion order is kept as-is
# because the mapping is iterated when the users are created.
TEST_USERS = {
    "wot": User(email="wot-user@example.org"),
    "assured": User(email="assured-user@example.org", is_assured=True),
    "assurer": User(
        email="assurer-user@example.org",
        is_assurer=True,
        is_assured=True,
    ),
    "org-admin": User(
        email="org-user@example.org",
        is_org_admin=True,
        is_assurer=True,
        is_assured=True,
    ),
    "admin": User(
        email="admin-user@example.org",
        is_admin=True,
        is_org_admin=True,
        is_assurer=True,
        is_assured=True,
    ),
}
class DataGenerator:
    """Create test users and email certificate signing requests in a database.

    The tables used by the generator are reflected from the database behind
    ``engine`` when the instance is constructed.
    """

    def __init__(self, engine, csr_dir, crt_dir):
        """Reflect the required tables and set up a seeded Faker instance.

        Args:
            engine: SQLAlchemy engine used for reflection and all statements.
            csr_dir: Base directory below which CSR files are written.
            crt_dir: Certificate directory; stored on the instance but not
                used by any method of this class.
        """
        self.engine = engine
        self.csr_dir = csr_dir
        self.crt_dir = crt_dir
        self.metadata = MetaData()
        self.users_table = self._reflect_table("users")
        self.emailcerts_table = self._reflect_table("emailcerts")
        self.orgemailcerts_table = self._reflect_table("orgemailcerts")
        self.domaincerts_table = self._reflect_table("domaincerts")
        self.orgdomaincerts_table = self._reflect_table("orgdomaincerts")
        self.fake = Faker()
        # A fixed seed keeps the generated names and dates reproducible.
        Faker.seed(0)

    def _reflect_table(self, name):
        """Return the table ``name`` reflected from the database."""
        return Table(name, self.metadata, autoload_with=self.engine)

    def ensure_users(self):
        """Make sure every entry of ``TEST_USERS`` has a row in ``users``.

        Users whose email address is not found are inserted with generated
        names, a random password hash and a random unique id; each insert is
        committed immediately. For users that already exist the id and names
        are read back from the database. In both cases the ``User`` objects
        in ``TEST_USERS`` are updated in place.
        """
        with self.engine.connect() as conn:
            for user in TEST_USERS.values():
                lookup = select(self.users_table).where(
                    self.users_table.c.email == user.email
                )
                db_user = conn.execute(lookup).one_or_none()
                if db_user is not None:
                    user.id = db_user.id
                    user.first_name = db_user.fname
                    user.last_name = db_user.lname
                    continue

                user.first_name = self.fake.first_name()
                user.last_name = self.fake.last_name()
                stmt = insert(self.users_table).values(
                    email=user.email,
                    fname=user.first_name,
                    lname=user.last_name,
                    password=build_random_password(),
                    orgadmin=user.is_org_admin,
                    admin=user.is_admin,
                    assurer=user.is_assurer,
                    adadmin=False,
                    locked=False,
                    dob=self.fake.passport_dob(),
                    uniqueID=build_user_unique_id(user.email),
                    otphash="",
                    otppin=0,
                    verified=True,
                    created=func.now(),
                    modified=func.now(),
                )
                result = conn.execute(stmt)
                user.id = result.inserted_primary_key.id
                conn.commit()
        logging.info("created test users %s", TEST_USERS)

    def generate_csrs(self):
        """Create one email CSR per test user plus extra digest variants.

        Every user gets a request with the default digest; the ``assurer``
        user additionally gets requests recorded with md5, sha1 and sha512.
        """
        for user_key in ("wot", "assured", "assurer", "admin", "org-admin"):
            self.create_email_csr(user_key)
        for md in ("md5", "sha1", "sha512"):
            self.create_email_csr("assurer", md=md)

    def create_email_csr(self, user_key, md="sha256", root_cert=2):
        """Insert an ``emailcerts`` row for a test user and write its CSR file.

        Args:
            user_key: Key of the user in ``TEST_USERS``.
            md: Value stored in the ``md`` column of the new row. The CSR
                file itself is always signed with SHA-256.
            root_cert: Value stored in the ``rootcert`` column.

        Raises:
            KeyError: if ``user_key`` is not a key of ``TEST_USERS``.
            RuntimeError: if the user has no database id yet, i.e.
                ``ensure_users()`` has not been run.
        """
        user = TEST_USERS[user_key]
        if user.id is None:
            # Without this check the row would be written with memid=None.
            raise RuntimeError(
                f"user {user_key!r} has no database id; call ensure_users() first"
            )

        display_name = user.get_name()
        subject = x509.Name(
            [
                x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, display_name),
                x509.NameAttribute(x509.oid.NameOID.EMAIL_ADDRESS, user.email),
            ]
        )

        # The value of the keytype column is set to "MS" as this is the only
        # supportable key type: "NS" is for SPKAC type signing requests and
        # uses MD5, "VI" uses unsupported Microsoft CSR attributes.
        with self.engine.connect() as conn:
            stmt = insert(self.emailcerts_table).values(
                memid=user.id,
                CN=user.email,
                subject=f"/CN={display_name}/emailAddress={user.email}",
                keytype="MS",
                created=func.now(),
                modified=func.now(),
                rootcert=root_cert,
                md=md,
                disablelogin=1,
            )
            result = conn.execute(stmt)
            cert_id = result.inserted_primary_key.id
            logging.debug("created emailcerts entry with id %d", cert_id)

            # The file name contains the row id, so the CSR can only be
            # written after the insert; the row is then updated with its path.
            csr_filename = self.generate_csr("client", subject, cert_id)
            stmt = (
                update(self.emailcerts_table)
                .where(self.emailcerts_table.c.id == cert_id)
                .values(csr_name=csr_filename, modified=func.now())
            )
            conn.execute(stmt)
            conn.commit()

    def generate_csr(self, csr_type: str, subject: x509.Name, id: int) -> str:
        """Write a PEM encoded CSR for ``subject`` and return its path.

        A new 2048 bit RSA key is generated for every request and is not
        stored anywhere. The file is written to
        ``<csr_dir>/<csr_type>/<first three characters of id>/<csr_type>-<id>.csr``;
        missing directories are created.

        Args:
            csr_type: Sub directory and file name prefix, e.g. ``"client"``.
            subject: Subject name to put into the request.
            id: Numeric id used in the directory and file name. The name
                shadows the builtin but is kept so keyword callers keep
                working.

        Returns:
            The path of the written file as a string.
        """
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        csr = (
            x509.CertificateSigningRequestBuilder()
            .subject_name(subject)
            .sign(private_key, hashes.SHA256())
        )
        p = pathlib.Path(self.csr_dir) / csr_type / str(id)[:3] / f"{csr_type}-{id}.csr"
        p.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
        p.write_bytes(csr.public_bytes(serialization.Encoding.PEM))
        return str(p)
def main(config_file: str):
    """Generate the test data described by the given configuration file.

    The configuration must provide the option ``dsn`` in section ``[db]`` and
    the options ``csr_dir`` and ``crt_dir`` in section ``[files]``.

    Args:
        config_file: Path of the INI style configuration file.

    Raises:
        FileNotFoundError: if ``config_file`` does not exist or is unreadable.
    """
    cp = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open and returns the
    # list of files it did parse. Checking that list reports a missing file
    # directly instead of as a confusing NoSectionError further down.
    if not cp.read(config_file):
        raise FileNotFoundError(f"cannot read configuration file {config_file!r}")

    dsn = cp.get("db", "dsn")
    engine = create_engine(dsn, echo=False)
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)

    data_generator = DataGenerator(
        engine, cp.get("files", "csr_dir"), cp.get("files", "crt_dir")
    )
    data_generator.ensure_users()
    data_generator.generate_csrs()
if __name__ == "__main__":
    # An optional first command line argument selects the configuration
    # file; without it the script falls back to "config.ini" as before.
    main(sys.argv[1] if len(sys.argv) > 1 else "config.ini")