You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

221 lines
7.1 KiB
Python

import configparser
import logging
import pathlib
import secrets
import sys
from hashlib import md5, sha1
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from faker import Faker
from sqlalchemy import MetaData, Table, select, create_engine, insert, func, update
# Key type written to the "keytype" column of newly inserted emailcerts rows
# ("VI" might stand for "vendor independent").
CSR_TYPE_VENDOR_INDEPENDENT = "VI"
def build_user_unique_id(email: str) -> str:
    """Return a 32-character hex identifier derived from ``email``.

    Eight random bytes are appended to the encoded address before it is
    hashed with MD5, so repeated calls with the same address produce
    different identifiers.
    """
    salted_email = email.encode() + secrets.token_bytes(8)
    digest = md5(salted_email)
    return digest.hexdigest()
def build_random_password():
    """Return a random 40-character hex string to store as a password value.

    The result is the SHA-1 hex digest of 16 freshly generated random bytes.
    """
    random_bytes = secrets.token_bytes(16)
    return sha1(random_bytes).hexdigest()
class User:
    """A test account together with the role flags it should carry.

    ``id``, ``first_name`` and ``last_name`` start out empty; they are filled
    in once the account has been created in or loaded from the database.
    """

    def __init__(
        self,
        email: str,
        is_org_admin: bool = False,
        is_assured: bool = False,
        is_assurer: bool = False,
        is_admin: bool = False,
    ):
        # Placeholders until the account is known to the database.
        self.id = None
        self.first_name = ""
        self.last_name = ""

        self.email = email

        # Role flags.
        self.is_admin = is_admin
        self.is_assurer = is_assurer
        self.is_assured = is_assured
        self.is_org_admin = is_org_admin

    def __repr__(self):
        return f"User<'{self.email}', {self.id}>"

    def get_name(self):
        """Return the display name for this user.

        Only assured users are shown with their real name; everybody else
        gets a fixed generic name.
        """
        if not self.is_assured:
            return "CAcert WoT user"
        return f"{self.first_name} {self.last_name}"
# Fixed set of test accounts, keyed by a short role name. Each entry carries
# the flags of the previous one plus one additional role.
TEST_USERS = {
    "wot": User(email="wot-user@example.org"),
    "assured": User(
        email="assured-user@example.org",
        is_assured=True,
    ),
    "assurer": User(
        email="assurer-user@example.org",
        is_assured=True,
        is_assurer=True,
    ),
    "org-admin": User(
        email="org-user@example.org",
        is_assured=True,
        is_assurer=True,
        is_org_admin=True,
    ),
    "admin": User(
        email="admin-user@example.org",
        is_assured=True,
        is_assurer=True,
        is_org_admin=True,
        is_admin=True,
    ),
}
class DataGenerator:
    """Populate the database with test users and email certificate requests.

    Table definitions are reflected from the database behind ``engine``.
    Generated CSR files are written below ``csr_dir``.
    """

    def __init__(self, engine, csr_dir, crt_dir):
        self.engine = engine
        self.csr_dir = csr_dir
        self.crt_dir = crt_dir
        self.metadata = MetaData()
        self.users_table = self._reflect_table("users")
        self.emailcerts_table = self._reflect_table("emailcerts")
        self.orgemailcerts_table = self._reflect_table("orgemailcerts")
        self.domaincerts_table = self._reflect_table("domaincerts")
        self.orgdomaincerts_table = self._reflect_table("orgdomaincerts")
        self.fake = Faker()
        # Seed Faker's shared random generator with a fixed value.
        Faker.seed(0)

    def _reflect_table(self, name):
        """Load the definition of table ``name`` from the database."""
        return Table(name, self.metadata, autoload_with=self.engine)

    def ensure_users(self):
        """Make sure every entry of ``TEST_USERS`` has a row in ``users``.

        Users are looked up by email address. Missing users are inserted with
        generated names, date of birth and password; for existing users the
        stored names are taken over. Either way the ``User`` object ends up
        with ``id``, ``first_name`` and ``last_name`` set.
        """
        users = self.users_table
        with self.engine.connect() as conn:
            for user in TEST_USERS.values():
                lookup = select(users).where(users.c.email == user.email)
                existing = conn.execute(lookup).one_or_none()
                if existing:
                    user.id = existing.id
                    user.first_name = existing.fname
                    user.last_name = existing.lname
                    continue

                user.first_name = self.fake.first_name()
                user.last_name = self.fake.last_name()
                new_row = insert(users).values(
                    email=user.email,
                    fname=user.first_name,
                    lname=user.last_name,
                    password=build_random_password(),
                    orgadmin=user.is_org_admin,
                    admin=user.is_admin,
                    assurer=user.is_assurer,
                    adadmin=False,
                    locked=False,
                    dob=self.fake.passport_dob(),
                    uniqueID=build_user_unique_id(user.email),
                    otphash="",
                    otppin=0,
                    verified=True,
                    created=func.now(),
                    modified=func.now(),
                )
                user.id = conn.execute(new_row).inserted_primary_key.id
                conn.commit()
        logging.info("created test users %s", TEST_USERS)

    def generate_csrs(self):
        """Create the standard set of email certificate requests."""
        # One request per test user with the default digest ...
        for user_key in ("wot", "assured", "assurer", "admin", "org-admin"):
            self.create_email_csr(user_key)
        # ... plus extra requests for the assurer with other digests.
        for digest in ("md5", "sha1", "sha512"):
            self.create_email_csr("assurer", md=digest)

    def create_email_csr(self, user_key, md="sha256", root_cert=2):
        """Insert an ``emailcerts`` row for a test user and write its CSR file.

        Args:
            user_key: Key of the user in ``TEST_USERS``.
            md: Value stored in the ``md`` column of the new row.
            root_cert: Value stored in the ``rootcert`` column of the new row.
        """
        user = TEST_USERS[user_key]
        name_oid = x509.oid.NameOID
        subject = x509.Name(
            [
                x509.NameAttribute(name_oid.COMMON_NAME, user.get_name()),
                x509.NameAttribute(name_oid.EMAIL_ADDRESS, user.email),
            ]
        )

        # The keytype column is set to "VI" because this is one of the
        # supportable key types. "NS" is for SPKAC type signing requests and
        # uses MD5, "MS" seems to be related to Microsoft browsers. "VI" might
        # stand for "vendor independent".
        certs = self.emailcerts_table
        with self.engine.connect() as conn:
            new_row = insert(certs).values(
                memid=user.id,
                CN=user.email,
                subject=f"/CN={user.get_name()}/emailAddress={user.email}",
                keytype=CSR_TYPE_VENDOR_INDEPENDENT,
                created=func.now(),
                modified=func.now(),
                rootcert=root_cert,
                md=md,
                disablelogin=1,
            )
            cert_id = conn.execute(new_row).inserted_primary_key.id
            logging.debug("created emailcerts entry with id %d", cert_id)

            # The file name contains the row id, so the row is created first
            # and updated with the CSR location afterwards.
            csr_filename = self.generate_csr("client", subject, cert_id)
            set_csr_name = (
                update(certs)
                .where(certs.c.id == cert_id)
                .values(csr_name=csr_filename, modified=func.now())
            )
            conn.execute(set_csr_name)
            conn.commit()

    def generate_csr(self, csr_type: str, subject: x509.Name, id: int) -> str:
        """Create a CSR for ``subject`` and store it as a PEM file.

        The request is signed with SHA-256 using a freshly generated 2048 bit
        RSA key. Only the CSR is written; the key is not stored by this method.

        Args:
            csr_type: Name of the sub directory and prefix of the file name.
            subject: Subject name to put into the request.
            id: Number used in the file name; its first three digits name the
                directory the file is placed in.

        Returns:
            The path of the written CSR file as a string.
        """
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        builder = x509.CertificateSigningRequestBuilder().subject_name(subject)
        csr = builder.sign(private_key, hashes.SHA256())

        target_dir = pathlib.Path(self.csr_dir) / csr_type / str(id)[:3]
        csr_path = target_dir / f"{csr_type}-{id}.csr"
        target_dir.mkdir(mode=0o755, parents=True, exist_ok=True)
        csr_path.write_bytes(csr.public_bytes(serialization.Encoding.PEM))
        return str(csr_path)
def main(config_file: str):
    """Create the test users and certificate requests described by a config file.

    The INI file has to provide ``dsn`` in section ``db`` as well as
    ``csr_dir`` and ``crt_dir`` in section ``files``.

    Args:
        config_file: Path of the INI file to read.

    Raises:
        FileNotFoundError: If ``config_file`` does not exist or cannot be read.
    """
    cp = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open and only reports
    # the files it did read via its return value. Without this check a missing
    # file would surface later as a confusing "No section: 'db'" error.
    if not cp.read(config_file):
        raise FileNotFoundError(f"cannot read configuration file: {config_file}")

    dsn = cp.get("db", "dsn")
    engine = create_engine(dsn, echo=False)
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    data_generator = DataGenerator(
        engine, cp.get("files", "csr_dir"), cp.get("files", "crt_dir")
    )
    data_generator.ensure_users()
    data_generator.generate_csrs()
if __name__ == "__main__":
    # An optional first command line argument selects the configuration file;
    # without it the script keeps using "config.ini" in the working directory.
    main(sys.argv[1] if len(sys.argv) > 1 else "config.ini")