from dataclasses import dataclass
from datetime import datetime
from urllib.parse import unquote_to_bytes

from cryptography import x509
from cryptography.x509 import SubjectAlternativeName, load_pem_x509_certificate
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import BaseBackend

from cats.models import UserCertificate

UserModel = get_user_model()


@dataclass(frozen=True)
class CertificateInformation:
    """Fields extracted from a client certificate by get_certificate_information."""

    # Certificate serial number as upper-case hex, left-padded with "0" to an
    # even number of digits.
    serial_number: str
    # First common name / organization / organizational unit attribute of the
    # certificate issuer.
    issuer_common_name: str
    issuer_organization: str
    issuer_organizational_unit: str
    # First common name attribute of the certificate subject.
    subject_common_name: str
    # RFC822Name entries of the SubjectAlternativeName extension.
    emails: list[str]
    # Validity period as reported by the certificate's *_utc accessors.
    not_before: datetime
    not_after: datetime


def _first_attribute_value(name, oid):
    """Return the value of the first attribute of ``name`` matching ``oid``.

    The lookup indexes element 0 of the result of ``get_attributes_for_oid``,
    so an IndexError propagates when ``name`` carries no such attribute.
    """
    return name.get_attributes_for_oid(oid)[0].value


def get_certificate_information(encoded_certificate=None):
    """Parse a URL encoded PEM certificate into a CertificateInformation.

    ``encoded_certificate`` is expected to be a URL encoded PEM certificate
    as sent by nginx when using the $ssl_client_escaped_cert variable.

    Returns None when ``encoded_certificate`` is None. Errors raised while
    decoding the PEM data or while looking up the name attributes and the
    SubjectAlternativeName extension are not handled here and propagate to
    the caller.
    """
    if encoded_certificate is None:
        return None

    pem_data = unquote_to_bytes(encoded_certificate)
    certificate = load_pem_x509_certificate(pem_data)

    # Render the serial number as whole bytes: pad to an even digit count.
    serial_number = f"{certificate.serial_number:X}"
    if len(serial_number) % 2 == 1:
        serial_number = "0" + serial_number

    subject_alternative_name = certificate.extensions.get_extension_for_class(
        SubjectAlternativeName
    ).value

    return CertificateInformation(
        serial_number=serial_number,
        issuer_common_name=_first_attribute_value(
            certificate.issuer, x509.OID_COMMON_NAME
        ),
        issuer_organization=_first_attribute_value(
            certificate.issuer, x509.OID_ORGANIZATION_NAME
        ),
        issuer_organizational_unit=_first_attribute_value(
            certificate.issuer, x509.OID_ORGANIZATIONAL_UNIT_NAME
        ),
        subject_common_name=_first_attribute_value(
            certificate.subject, x509.OID_COMMON_NAME
        ),
        emails=subject_alternative_name.get_values_for_type(x509.RFC822Name),
        not_before=certificate.not_valid_before_utc,
        not_after=certificate.not_valid_after_utc,
    )


def get_user_for_certificate(certificate):
    """Return the first user whose email matches one of ``certificate.emails``.

    The addresses are tried in order, each passed through the user manager's
    ``normalize_email`` before the lookup. Returns None when no address
    belongs to an existing user.
    """
    for email in certificate.emails:
        try:
            return UserModel.objects.get(
                email=UserModel.objects.normalize_email(email)
            )
        except UserModel.DoesNotExist:
            continue
    return None


class ClientCertificateBackend(BaseBackend):
    """Authentication backend that identifies users by client certificate."""

    def authenticate(self, request, certificate=None):
        """Return the user belonging to ``certificate``, creating it if needed.

        ``certificate`` is expected to provide the attributes of
        CertificateInformation (``emails``, ``serial_number``,
        ``issuer_common_name`` and ``subject_common_name``), e.g. the result
        of get_certificate_information.

        Returns None when no certificate is given or when it carries no email
        address, because a user can neither be looked up nor created then.
        Otherwise the user matching one of the certificate's email addresses
        is returned; if there is none, a new user is created from the first
        address. The certificate itself is recorded as a UserCertificate.
        """
        if certificate is None or not certificate.emails:
            return None

        user = get_user_for_certificate(certificate)
        if not user:
            # The first address serves as the first positional argument of
            # create_user (presumably the username -- depends on the
            # configured user model) and as the email address.
            primary_email = certificate.emails[0]
            user = UserModel.objects.create_user(
                primary_email,
                email=primary_email,
                is_staff=primary_email in settings.CATS_ADMIN_EMAILS,
            )

        user_certificate, created = UserCertificate.objects.get_or_create(
            serial_number=certificate.serial_number,
            issuer_name=certificate.issuer_common_name,
            defaults={
                "user": user,
                "common_name": certificate.subject_common_name,
            },
        )
        # Attach the user to an already known certificate that has none yet;
        # an existing assignment is left untouched.
        if not created and not user_certificate.user:
            user_certificate.user = user
            user_certificate.save()

        return user

    def get_user(self, user_id):
        """Return the user with primary key ``user_id`` or None if absent."""
        try:
            return UserModel.objects.get(pk=user_id)
        except UserModel.DoesNotExist:
            return None