dns-zones/update-zones.py

315 lines
8.3 KiB
Python
Executable file

#!/usr/bin/env python3
# This script is used to update DNS zones that have changed in git
import argparse
import os
import tempfile
from email.message import EmailMessage, MIMEPart
from subprocess import DEVNULL, CalledProcessError, run
from sys import stderr
# Default for --reference-branch: the local branch that has the latest
# applied zone files.
REFERENCE_BRANCH = "provisioned"
# Default for --target-branch: the branch to get the latest zone files from.
DEFAULT_BRANCH = "origin/main"
# Default sender and default recipient address of the audit mail.
DNS_ADMINS = "critical-admin@cacert.org"
def git_changed_files(reference_branch, target_branch):
    """
    List the files that differ between the reference and the target branch.

    Runs ``git fetch`` and then ``git diff --name-only`` for the range
    ``reference_branch..target_branch``.

    Returns the changed file names as a list of strings. An empty list is
    returned when the diff is empty or when either git command exits with a
    non-zero status; in the latter case the return code is printed to stderr.
    """
    revision_range = f"{reference_branch}..{target_branch}"
    diff_command = ["git", "diff", "--name-only", revision_range]

    try:
        run(["git", "fetch"], check=True)
    except CalledProcessError as fetch_error:
        print("git fetch failed", fetch_error.returncode, file=stderr)
        return []

    try:
        completed = run(diff_command, check=True, capture_output=True, text=True)
    except CalledProcessError as diff_error:
        print("git diff returned", diff_error.returncode, file=stderr)
        return []

    names = completed.stdout.strip()
    return names.splitlines() if names else []
def pdns_managed_zones(secondary_only=False):
    """
    Return the zone names printed by ``pdnsutil list-all-zones``.

    When ``secondary_only`` is true, ``slave`` is appended to the command to
    restrict the listing to secondary zones.

    If pdnsutil exits with a non-zero status, its return code and stderr are
    printed to stderr and an empty list is returned.
    """
    kind_filter = ["slave"] if secondary_only else []
    try:
        listing = run(
            ["pdnsutil", "list-all-zones", *kind_filter],
            check=True,
            capture_output=True,
            text=True,
        )
    except CalledProcessError as error:
        print(
            "could not get list of zones from pdnsutil",
            error.returncode,
            error.stderr,
            file=stderr,
        )
        return []
    return listing.stdout.strip().splitlines()
def calculate_changed_zones(files, zones):
    """
    Return the sorted, de-duplicated file names that are also known zones.

    ``files`` are the changed file names, ``zones`` the supported zone names;
    a file counts as a changed zone when its name equals a zone name.

    >>> calculate_changed_zones(["abc.edu", "vatican.mil"], ["abc.edu", "nsa.gov"])
    ['abc.edu']
    >>> calculate_changed_zones(["x.y", "a.b", "b.c"], ["a.b", "x.y"])
    ['a.b', 'x.y']
    """
    known_zones = set(zones)
    matching = {name for name in files if name in known_zones}
    return sorted(matching)
def remove_secondary_zones(changed_zones):
    """
    Drop the zones for which the current server is a secondary DNS server.

    The secondary zones are obtained from ``pdns_managed_zones`` with
    ``secondary_only=True``. The remaining zone names are returned as a
    sorted list without duplicates.
    """
    secondaries = set(pdns_managed_zones(secondary_only=True))
    return sorted({zone for zone in changed_zones if zone not in secondaries})
def generate_diff(zone, reference_branch, target_branch):
    """
    Return the git diff of one zone file between two branches.

    Runs ``git diff reference_branch..target_branch -- zone`` and returns its
    standard output with surrounding whitespace stripped.

    Raises CalledProcessError if git exits with a non-zero status.
    """
    command = ["git", "diff", f"{reference_branch}..{target_branch}", "--", zone]
    completed = run(command, check=True, capture_output=True, text=True)
    return completed.stdout.strip()
def get_zone_data(zone, branch):
    """
    Get the zone data for the zone from target branch excluding SOA record.

    The file content is read with ``git show <branch>:<zone>``. Every line
    that contains the text ``SOA`` is dropped and the remaining lines are
    returned joined by newlines.

    Raises CalledProcessError if git exits with a non-zero status.
    """
    result = run(
        ["git", "show", f"{branch}:{zone}"], check=True, capture_output=True, text=True
    )
    # NOTE(review): this is a plain substring test, so any line that merely
    # contains "SOA" (not only the SOA record itself) is removed as well.
    return "\n".join(
        line for line in result.stdout.strip().splitlines() if "SOA" not in line
    )
def list_zone(zone):
    """
    Return the zone content currently held by PowerDNS, split at SOA lines.

    Runs ``pdnsutil list-zone <zone>`` and sorts each output line into one of
    two groups depending on whether it contains the text ``SOA``.

    Returns a tuple ``(soa, non_soa)`` of newline-joined strings.

    Raises CalledProcessError if pdnsutil exits with a non-zero status.
    """
    listing = run(
        ["pdnsutil", "list-zone", zone], check=True, capture_output=True, text=True
    )
    soa_lines = []
    other_lines = []
    for record in listing.stdout.strip().splitlines():
        bucket = soa_lines if "SOA" in record else other_lines
        bucket.append(record)
    return "\n".join(soa_lines), "\n".join(other_lines)
def load_zone(zone, zonedata):
    """
    Load the given zone data into PowerDNS via ``pdnsutil load-zone``.

    The data is written to a temporary file (UTF-8, file name suffix is the
    zone name) which is passed to pdnsutil. The stripped standard output of
    pdnsutil is printed.

    The temporary file is removed in every case, including when writing the
    data or running pdnsutil fails.

    Raises CalledProcessError if pdnsutil exits with a non-zero status.
    """
    zonefile = tempfile.NamedTemporaryFile(
        mode="w", encoding="utf-8", suffix=zone, delete=False
    )
    try:
        # Leaving the with block closes (and flushes) the file before
        # pdnsutil reads it by name.
        with zonefile:
            print(zonedata, file=zonefile)
        p = run(
            ["pdnsutil", "load-zone", zone, zonefile.name],
            check=True,
            capture_output=True,
            text=True,
        )
        print(p.stdout.strip())
    finally:
        # delete=False means nothing else cleans this file up.
        os.unlink(zonefile.name)
def check_zone(zone):
    """
    Run ``pdnsutil check-zone`` for the zone and print its stripped output.

    Raises CalledProcessError if pdnsutil exits with a non-zero status.
    """
    command = ["pdnsutil", "check-zone", zone]
    completed = run(command, check=True, capture_output=True, text=True)
    print(completed.stdout.strip())
def increase_serial(zone):
    """
    Run ``pdnsutil increase-serial`` for the zone and print its stripped output.

    Raises CalledProcessError if pdnsutil exits with a non-zero status.
    """
    command = ["pdnsutil", "increase-serial", zone]
    completed = run(command, check=True, capture_output=True, text=True)
    print(completed.stdout.strip())
def rectify_zone(zone):
    """
    Run ``pdnsutil rectify-zone`` for the zone and print its stripped output.

    Raises CalledProcessError if pdnsutil exits with a non-zero status.
    """
    command = ["pdnsutil", "rectify-zone", zone]
    completed = run(command, check=True, capture_output=True, text=True)
    print(completed.stdout.strip())
def send_audit_mail(diffs, audit_email_address, audit_sender_address, changelog):
    """
    Build the audit mail for the given zone diffs and hand it to sendmail.

    ``diffs`` is a sequence of ``(zone, diff)`` pairs. The mail body lists the
    zone names followed by ``changelog``; each diff is added as an attachment
    named ``<zone>.diff``. The finished message is piped to
    ``/usr/lib/sendmail -t -oi``.

    Raises CalledProcessError if sendmail exits with a non-zero status.
    """
    zone_list = "".join(f"\n - {zone}" for zone, _ in diffs)
    body = (
        "A DNS change has been applied from git.\n"
        "The following zones have been changed:\n"
        f"{zone_list}"
        "\nThis is the change log for the applied commits:\n"
        f"{changelog}\n"
    )

    mail = EmailMessage()
    mail["Subject"] = "DNS changes applied"
    mail["To"] = audit_email_address
    mail["From"] = audit_sender_address
    mail.set_content(body)
    for zone, diff in diffs:
        mail.add_attachment(diff, filename=f"{zone}.diff")

    run(["/usr/lib/sendmail", "-t", "-oi"], input=mail.as_bytes(), check=True)
def get_changelog(reference_branch, target_branch):
    """
    Return the output of ``git log`` for ``reference_branch..target_branch``.

    Surrounding whitespace is stripped from the output.

    Raises CalledProcessError if git exits with a non-zero status.
    """
    command = ["git", "log", f"{reference_branch}..{target_branch}"]
    completed = run(command, check=True, capture_output=True, text=True)
    return completed.stdout.strip()
def update_reference_branch(reference_branch, target_branch):
    """
    Update the local git reference branch to track the target branch.

    The branch is moved with a single ``git branch --force`` call instead of
    deleting and re-creating it, so a failure cannot leave the repository
    without the reference branch.

    Raises CalledProcessError if git exits with a non-zero status.
    """
    run(
        ["git", "branch", "--force", reference_branch, target_branch],
        check=True,
    )
def main(reference_branch, target_branch, audit_email_address, audit_sender_address):
    """
    Apply the zone files that changed in git to PowerDNS and send an audit mail.

    Steps:

    1. Determine the files changed between ``reference_branch`` and
       ``target_branch`` and keep those that are zones known to PowerDNS and
       not secondary zones.
    2. For each such zone load the new data (keeping the SOA line PowerDNS
       currently has), then check the zone, increase its serial and rectify
       it. If one of these commands fails, the error is printed to stderr and
       the previous zone content is loaded again.
    3. Move the reference branch to the target branch and send the audit mail
       to ``audit_email_address`` from ``audit_sender_address``.

    Only zones that were applied successfully are listed in the audit mail.

    Raises CalledProcessError if a git/pdnsutil/sendmail call outside the
    per-zone apply step fails, or if reverting a zone fails.
    """
    changed_files = git_changed_files(
        reference_branch=reference_branch, target_branch=target_branch
    )
    if not changed_files:
        print("no changed files from git")
        return

    # Only ask PowerDNS for its zones once we know there is something to do.
    zones_in_pdns = pdns_managed_zones()
    if not zones_in_pdns:
        print("no zones known to powerdns")
        return

    changed_zones = calculate_changed_zones(changed_files, zones_in_pdns)
    changed_zones = remove_secondary_zones(changed_zones)
    if not changed_zones:
        print("no zones changed")
        update_reference_branch(reference_branch, target_branch)
        return

    diffs = []
    for zone in changed_zones:
        diff = generate_diff(
            zone=zone,
            reference_branch=reference_branch,
            target_branch=target_branch,
        )
        soa, old_zonedata = list_zone(zone)
        # The SOA line comes from PowerDNS, not from git.
        zonedata = get_zone_data(zone=zone, branch=target_branch)
        zonedata += f"\n{soa}"
        try:
            load_zone(zone, zonedata)
            check_zone(zone)
            increase_serial(zone)
            rectify_zone(zone)
        except CalledProcessError as pe:
            print(
                "process {} failed with return code {}:\n{}".format(
                    " ".join(pe.cmd), pe.returncode, pe.stderr
                ),
                file=stderr,
            )
            print(f"reverting zone {zone}", file=stderr)
            load_zone(zone, f"{old_zonedata}\n{soa}")
        else:
            # Record the diff only after the zone was really applied, so the
            # audit mail does not report reverted zones as changed.
            diffs.append((zone, diff))

    changelog = get_changelog(reference_branch, target_branch)
    update_reference_branch(reference_branch, target_branch)
    send_audit_mail(diffs, audit_email_address, audit_sender_address, changelog)
if __name__ == "__main__":
    # Command line options as (short flag, long flag, default, help text).
    cli_options = (
        (
            "-b",
            "--target-branch",
            DEFAULT_BRANCH,
            "branch to get the latest zone files from",
        ),
        (
            "-r",
            "--reference-branch",
            REFERENCE_BRANCH,
            "local branch that has the latest applied zone files",
        ),
        (
            "-f",
            "--audit-sender-address",
            DNS_ADMINS,
            "email address which is used as sender of audit mail",
        ),
        (
            "-t",
            "--audit-email-address",
            DNS_ADMINS,
            "email address which is used as recipient of audit mail",
        ),
    )

    parser = argparse.ArgumentParser()
    for short_flag, long_flag, default_value, help_text in cli_options:
        parser.add_argument(
            short_flag, long_flag, default=default_value, help=help_text
        )
    options = parser.parse_args()

    main(
        reference_branch=options.reference_branch,
        target_branch=options.target_branch,
        audit_email_address=options.audit_email_address,
        audit_sender_address=options.audit_sender_address,
    )