#!/usr/bin/env python3 # This script is used to update DNS zones that have changed in git import argparse import os import tempfile from email.message import EmailMessage, MIMEPart from subprocess import DEVNULL, CalledProcessError, run from sys import stderr REFERENCE_BRANCH = "provisioned" DEFAULT_BRANCH = "origin/main" DNS_ADMINS = "critical-admin@cacert.org" def git_changed_files(reference_branch, target_branch): try: run(["git", "fetch"], check=True) except CalledProcessError as e: print("git fetch failed", e.returncode, file=stderr) return [] try: git_diff = run( [ "git", "diff", "--name-only", f"{reference_branch}..{target_branch}", ], check=True, capture_output=True, text=True, ) output = git_diff.stdout.strip() except CalledProcessError as e: print("git diff returned", e.returncode, file=stderr) return [] if not output: return [] return output.strip().splitlines() def pdns_managed_zones(secondary_only=False): command = ["pdnsutil", "list-all-zones"] if secondary_only: command += ["slave"] try: all_zones = run(command, check=True, capture_output=True, text=True) zones = all_zones.stdout.strip().splitlines() except CalledProcessError as e: print( "could not get list of zones from pdnsutil", e.returncode, e.stderr, file=stderr, ) return [] return zones def calculate_changed_zones(files, zones): """ Calculate the list of changed Zones from a list of file names and a list of supported zones. >>> calculate_changed_zones(["abc.edu", "vatican.mil"], ["abc.edu", "nsa.gov"]) ['abc.edu'] >>> calculate_changed_zones(["x.y", "a.b", "b.c"], ["a.b", "x.y"]) ['a.b', 'x.y'] """ return sorted(set(files).intersection(zones)) def remove_secondary_zones(changed_zones): """ Remove DNS zones from the given set where the current server is a secondary DNS server. """ secondary_zones = pdns_managed_zones(secondary_only=True) return sorted(set(changed_zones).difference(secondary_zones)) def generate_diff(zone, reference_branch, target_branch): diffresult = run( ["git", "diff", f"{reference_branch}..{target_branch}", "--", zone], check=True, capture_output=True, text=True, ) return diffresult.stdout.strip() def get_zone_data(zone, branch): """ Get the zone data for the zone from target branch excluding SOA record. """ result = run( ["git", "show", f"{branch}:{zone}"], check=True, capture_output=True, text=True ) lines = result.stdout.strip().splitlines() soa = "\n".join([l for l in lines if "SOA" in l]) non_soa = "\n".join([l for l in lines if not "SOA" in l]) return non_soa def list_zone(zone): result = run( ["pdnsutil", "list-zone", zone], check=True, capture_output=True, text=True ) lines = result.stdout.strip().splitlines() soa = "\n".join([l for l in lines if "SOA" in l]) non_soa = "\n".join([l for l in lines if not "SOA" in l]) return soa, non_soa def load_zone(zone, zonedata): with tempfile.NamedTemporaryFile( mode="w", encoding="utf-8", suffix=zone, delete=False ) as zonefile: print(zonedata, file=zonefile) zonefile.close() try: p = run( ["pdnsutil", "load-zone", zone, zonefile.name], check=True, capture_output=True, text=True, ) print(p.stdout.strip()) finally: os.unlink(zonefile.name) def check_zone(zone): p = run( ["pdnsutil", "check-zone", zone], check=True, capture_output=True, text=True ) print(p.stdout.strip()) def increase_serial(zone): p = run( ["pdnsutil", "increase-serial", zone], check=True, capture_output=True, text=True, ) print(p.stdout.strip()) def rectify_zone(zone): p = run( ["pdnsutil", "rectify-zone", zone], check=True, capture_output=True, text=True ) print(p.stdout.strip()) def send_audit_mail(diffs, audit_email_address, audit_sender_address, changelog): message = EmailMessage() message["Subject"] = "DNS changes applied" message["To"] = audit_email_address message["From"] = audit_sender_address body = """A DNS change has been applied from git. The following zones have been changed: """ for zone, _ in diffs: body += f"\n - {zone}" body += f""" This is the change log for the applied commits: {changelog} """ message.set_content(body) for zone, diff in diffs: message.add_attachment(diff, filename=f"{zone}.diff") run(["/usr/lib/sendmail", "-t", "-oi"], input=message.as_bytes(), check=True) def get_changelog(reference_branch, target_branch): r = run( ["git", "log", f"{reference_branch}..{target_branch}"], check=True, capture_output=True, text=True, ) return r.stdout.strip() def update_reference_branch(reference_branch, target_branch): """ Update the local git reference branch to track the target branch. """ run( ["git", "branch", "-D", reference_branch], check=True, stdout=DEVNULL, stderr=DEVNULL, ) run(["git", "branch", reference_branch, target_branch], check=True) def main(reference_branch, target_branch, audit_email_address, audit_sender_address): changed_files = git_changed_files( reference_branch=reference_branch, target_branch=target_branch ) zones_in_pdns = pdns_managed_zones() if not changed_files: print("no changed files from git") return if not zones_in_pdns: print("no zones known to powerdns") return changed_zones = calculate_changed_zones(changed_files, zones_in_pdns) changed_zones = remove_secondary_zones(changed_zones) if not changed_zones: print("no zones changed") update_reference_branch(reference_branch, target_branch) return diffs = [] for zone in changed_zones: diffs.append( ( zone, generate_diff( zone=zone, reference_branch=reference_branch, target_branch=target_branch, ), ) ) soa, old_zonedata = list_zone(zone) zonedata = get_zone_data(zone=zone, branch=target_branch) zonedata += f"\n{soa}" try: load_zone(zone, zonedata) check_zone(zone) increase_serial(zone) rectify_zone(zone) except CalledProcessError as pe: print( "process {} failed with return code {}:\n{}".format( " ".join(pe.cmd), pe.returncode, pe.stderr ), file=stderr, ) print(f"reverting zone {zone}", file=stderr) load_zone(zone, f"{old_zonedata}\n{soa}") changelog = get_changelog(reference_branch, target_branch) update_reference_branch(reference_branch, target_branch) send_audit_mail(diffs, audit_email_address, audit_sender_address, changelog) if __name__ == "__main__": argparser = argparse.ArgumentParser() argparser.add_argument( "-b", "--target-branch", default=DEFAULT_BRANCH, help="branch to get the latest zone files from", ) argparser.add_argument( "-r", "--reference-branch", default=REFERENCE_BRANCH, help="local branch that has the latest applied zone files", ) argparser.add_argument( "-f", "--audit-sender-address", default=DNS_ADMINS, help="email address which is used as sender of audit mail", ) argparser.add_argument( "-t", "--audit-email-address", default=DNS_ADMINS, help="email address which is used as recipient of audit mail", ) args = argparser.parse_args() main( target_branch=args.target_branch, reference_branch=args.reference_branch, audit_email_address=args.audit_email_address, audit_sender_address=args.audit_sender_address, )