1
0
Fork 0

Implement update-zones.py to update zones from git

- ignore temporary files and Python bytecode
- add update-zones.py
add-secure1-alias-for-www1
Jan Dittberner 2 years ago committed by Gitea
parent f70a11c863
commit d93300732b

2
.gitignore vendored

@ -0,0 +1,2 @@
.*.swp
__pycache__/

@ -0,0 +1,254 @@
#!/usr/bin/env python3
# This script is used to update DNS zones that have changed in git
import argparse
import os
import tempfile
from subprocess import CalledProcessError, run
from sys import stderr
from email.message import EmailMessage, MIMEPart
from smtplib import SMTP
REFERENCE_BRANCH = "provisioned"
DEFAULT_BRANCH = "origin/main"
DNS_ADMINS = "critical-admin@cacert.org"
def git_changed_files(reference_branch, target_branch):
try:
run(["git", "fetch"], check=True)
except CalledProcessError as e:
print("git fetch failed", e.returncode, file=stderr)
return []
try:
git_diff = run(
[
"git",
"diff",
"--name-only",
"{}..{}".format(reference_branch, target_branch),
],
check=True,
capture_output=True,
)
output = git_diff.stdout.decode("utf-8")
except CalledProcessError as e:
print("git diff returned", e.returncode, file=stderr)
return []
if not output:
return []
return output.strip().splitlines()
def pdns_managed_zones():
try:
all_zones = run(["pdnsutil", "list-all-zones"], check=True, capture_output=True)
zones = all_zones.stdout.decode("utf-8").strip().splitlines()
except CalledProcessError as e:
print(
"could not get list of zones from pdnsutil",
e.returncode,
e.stderr.decode("utf-8"),
file=stderr,
)
return []
return zones
def calculate_changed_zones(files, zones):
"""
Calculate the list of changed Zones from a list of file names and a list of
supported zones.
>>> calculate_changed_zones(["abc.edu", "vatican.mil"], ["abc.edu", "nsa.gov"])
['abc.edu']
>>> calculate_changed_zones(["x.y", "a.b", "b.c"], ["a.b", "x.y"])
['a.b', 'x.y']
"""
return sorted(set(files).intersection(zones))
def generate_diff(zone, reference_branch, target_branch):
diffresult = run(
["git", "diff", "{}..{}".format(reference_branch, target_branch), "--", zone],
check=True,
capture_output=True,
)
return diffresult.stdout.decode("utf-8").strip()
def get_zone_data(zone, branch):
"""
Get the zone data for the zone from target branch excluding SOA record.
"""
result = run(
["git", "show", "{}:{}".format(branch, zone)], check=True, capture_output=True
)
lines = result.stdout.decode("utf-8").strip().splitlines()
soa = "\n".join([l for l in lines if "SOA" in l])
non_soa = "\n".join([l for l in lines if not "SOA" in l])
return non_soa
def list_zone(zone):
result = run(["pdnsutil", "list-zone", zone], check=True, capture_output=True)
lines = result.stdout.decode("utf-8").strip().splitlines()
soa = "\n".join([l for l in lines if "SOA" in l])
non_soa = "\n".join([l for l in lines if not "SOA" in l])
return soa, non_soa
def load_zone(zone, zonedata):
with tempfile.NamedTemporaryFile(
mode="w", encoding="utf-8", suffix=zone, delete=False
) as zonefile:
print(zonedata, file=zonefile)
zonefile.close()
try:
p = run(
["pdnsutil", "load-zone", zone, zonefile.name],
check=True,
capture_output=True,
)
print(p.stdout.decode("utf-8").strip())
finally:
os.unlink(zonefile.name)
def check_zone(zone):
p = run(["pdnsutil", "check-zone", zone], check=True, capture_output=True)
print(p.stdout.decode("utf-8").strip())
def increase_serial(zone):
p = run(["pdnsutil", "increase-serial", zone], check=True, capture_output=True)
print(p.stdout.decode("utf-8").strip())
def rectify_zone(zone):
p = run(["pdnsutil", "rectify-zone", zone], check=True, capture_output=True)
print(p.stdout.decode("utf-8").strip())
def send_audit_mail(diffs, audit_email_address, audit_sender_address):
message = EmailMessage()
message["Subject"] = "DNS changes applied"
message["To"] = audit_email_address
message["From"] = audit_sender_address
body = """The following zones have been changed:
"""
for zone, _ in diffs:
body += "\n- {}".format(zone)
message.set_content(body)
for zone, diff in diffs:
message.add_attachment(diff, filename="{}.diff".format(zone))
server = SMTP("localhost")
server.send_message(message)
server.quit()
def main(reference_branch, target_branch, audit_email_address, audit_sender_address):
changed_files = git_changed_files(
reference_branch=reference_branch, target_branch=target_branch
)
zones_in_pdns = pdns_managed_zones()
if not changed_files:
print("no changed files from git")
return
if not zones_in_pdns:
print("no zones known to powerdns")
return
changed_zones = calculate_changed_zones(changed_files, zones_in_pdns)
if not changed_zones:
print("no zones changed")
return
diffs = []
for zone in changed_zones:
diffs.append(
(
zone,
generate_diff(
zone=zone,
reference_branch=reference_branch,
target_branch=target_branch,
),
)
)
soa, old_zonedata = list_zone(zone)
zonedata = get_zone_data(zone=zone, branch=target_branch)
zonedata += "\n{}".format(soa)
try:
load_zone(zone, zonedata)
check_zone(zone)
increase_serial(zone)
rectify_zone(zone)
except CalledProcessError as pe:
print(
"process {} failed with return code {}:\n{}".format(
" ".join(pe.cmd), pe.returncode, pe.stderr.decode("utf-8")
),
file=stderr,
)
print("reverting zone {}".format(zone), file=stderr)
load_zone(zone, "{}\n{}".format(old_zonedata, soa))
run(["git", "branch", "-d", reference_branch], check=True)
run(["git", "branch", reference_branch, target_branch], check=True)
send_audit_mail(diffs, audit_email_address, audit_sender_address)
if __name__ == "__main__":
argparser = argparse.ArgumentParser()
argparser.add_argument(
"-b",
"--target-branch",
default=DEFAULT_BRANCH,
help="branch to get the latest zone files from",
)
argparser.add_argument(
"-r",
"--reference-branch",
default=REFERENCE_BRANCH,
help="local branch that has the latest applied zone files",
)
argparser.add_argument(
"-f",
"--audit-sender-address",
default=DNS_ADMINS,
help="email address which is used as sender of audit mail",
)
argparser.add_argument(
"-t",
"--audit-email-address",
default=DNS_ADMINS,
help="email address which is used as recipient of audit mail",
)
args = argparser.parse_args()
main(
target_branch=args.target_branch,
reference_branch=args.reference_branch,
audit_email_address=args.audit_email_address,
audit_sender_address=args.audit_sender_address,
)
Loading…
Cancel
Save