Use sendmail instead of SMTP

- remove the SMTP requirement to be able to work with /usr/lib/sendmail
  instead
- use f-strings where appropriate to improve readability
- use text-parameter to subprocess.run to avoid extra decode calls
This commit is contained in:
Jan Dittberner 2022-09-17 09:15:51 +02:00 committed by Gitea
parent d93300732b
commit 424bd7954f

View file

@ -8,7 +8,6 @@ import tempfile
from subprocess import CalledProcessError, run
from sys import stderr
from email.message import EmailMessage, MIMEPart
from smtplib import SMTP
REFERENCE_BRANCH = "provisioned"
DEFAULT_BRANCH = "origin/main"
@ -28,12 +27,13 @@ def git_changed_files(reference_branch, target_branch):
"git",
"diff",
"--name-only",
"{}..{}".format(reference_branch, target_branch),
f"{reference_branch}..{target_branch}",
],
check=True,
capture_output=True,
text=True,
)
output = git_diff.stdout.decode("utf-8")
output = git_diff.stdout.strip()
except CalledProcessError as e:
print("git diff returned", e.returncode, file=stderr)
return []
@ -46,13 +46,15 @@ def git_changed_files(reference_branch, target_branch):
def pdns_managed_zones():
try:
all_zones = run(["pdnsutil", "list-all-zones"], check=True, capture_output=True)
zones = all_zones.stdout.decode("utf-8").strip().splitlines()
all_zones = run(
["pdnsutil", "list-all-zones"], check=True, capture_output=True, text=True
)
zones = all_zones.stdout.strip().splitlines()
except CalledProcessError as e:
print(
"could not get list of zones from pdnsutil",
e.returncode,
e.stderr.decode("utf-8"),
e.stderr,
file=stderr,
)
return []
@ -75,11 +77,12 @@ def calculate_changed_zones(files, zones):
def generate_diff(zone, reference_branch, target_branch):
diffresult = run(
["git", "diff", "{}..{}".format(reference_branch, target_branch), "--", zone],
["git", "diff", f"{reference_branch}..{target_branch}", "--", zone],
check=True,
capture_output=True,
text=True,
)
return diffresult.stdout.decode("utf-8").strip()
return diffresult.stdout.strip()
def get_zone_data(zone, branch):
@ -87,19 +90,21 @@ def get_zone_data(zone, branch):
Get the zone data for the zone from target branch excluding SOA record.
"""
result = run(
["git", "show", "{}:{}".format(branch, zone)], check=True, capture_output=True
["git", "show", f"{branch}:{zone}"], check=True, capture_output=True, text=True
)
lines = result.stdout.decode("utf-8").strip().splitlines()
lines = result.stdout.strip().splitlines()
soa = "\n".join([l for l in lines if "SOA" in l])
non_soa = "\n".join([l for l in lines if not "SOA" in l])
return non_soa
def list_zone(zone):
result = run(["pdnsutil", "list-zone", zone], check=True, capture_output=True)
result = run(
["pdnsutil", "list-zone", zone], check=True, capture_output=True, text=True
)
lines = result.stdout.decode("utf-8").strip().splitlines()
lines = result.stdout.strip().splitlines()
soa = "\n".join([l for l in lines if "SOA" in l])
non_soa = "\n".join([l for l in lines if not "SOA" in l])
return soa, non_soa
@ -116,46 +121,73 @@ def load_zone(zone, zonedata):
["pdnsutil", "load-zone", zone, zonefile.name],
check=True,
capture_output=True,
text=True,
)
print(p.stdout.decode("utf-8").strip())
print(p.stdout.strip())
finally:
os.unlink(zonefile.name)
def check_zone(zone):
p = run(["pdnsutil", "check-zone", zone], check=True, capture_output=True)
print(p.stdout.decode("utf-8").strip())
p = run(
["pdnsutil", "check-zone", zone], check=True, capture_output=True, text=True
)
print(p.stdout.strip())
def increase_serial(zone):
p = run(["pdnsutil", "increase-serial", zone], check=True, capture_output=True)
print(p.stdout.decode("utf-8").strip())
p = run(
["pdnsutil", "increase-serial", zone],
check=True,
capture_output=True,
text=True,
)
print(p.stdout.strip())
def rectify_zone(zone):
p = run(["pdnsutil", "rectify-zone", zone], check=True, capture_output=True)
print(p.stdout.decode("utf-8").strip())
p = run(
["pdnsutil", "rectify-zone", zone], check=True, capture_output=True, text=True
)
print(p.stdout.strip())
def send_audit_mail(diffs, audit_email_address, audit_sender_address):
def send_audit_mail(diffs, audit_email_address, audit_sender_address, changelog):
message = EmailMessage()
message["Subject"] = "DNS changes applied"
message["To"] = audit_email_address
message["From"] = audit_sender_address
body = """The following zones have been changed:
body = """A DNS change has been applied from git.
The following zones have been changed:
"""
for zone, _ in diffs:
body += "\n- {}".format(zone)
body += f"\n - {zone}"
body += f"""
This is the change log for the applied commits:
{changelog}
"""
message.set_content(body)
for zone, diff in diffs:
message.add_attachment(diff, filename="{}.diff".format(zone))
message.add_attachment(diff, filename=f"{zone}.diff")
server = SMTP("localhost")
server.send_message(message)
server.quit()
run(["/usr/lib/sendmail", "-t", "-oi"], input=message.as_bytes(), check=True)
def get_changelog(reference_branch, target_branch):
r = run(
["git", "log", f"{reference_branch}..{target_branch}"],
check=True,
capture_output=True,
text=True,
)
return r.stdout.strip()
def main(reference_branch, target_branch, audit_email_address, audit_sender_address):
@ -195,7 +227,7 @@ def main(reference_branch, target_branch, audit_email_address, audit_sender_addr
soa, old_zonedata = list_zone(zone)
zonedata = get_zone_data(zone=zone, branch=target_branch)
zonedata += "\n{}".format(soa)
zonedata += f"\n{soa}"
try:
load_zone(zone, zonedata)
@ -205,17 +237,19 @@ def main(reference_branch, target_branch, audit_email_address, audit_sender_addr
except CalledProcessError as pe:
print(
"process {} failed with return code {}:\n{}".format(
" ".join(pe.cmd), pe.returncode, pe.stderr.decode("utf-8")
" ".join(pe.cmd), pe.returncode, pe.stderr
),
file=stderr,
)
print("reverting zone {}".format(zone), file=stderr)
load_zone(zone, "{}\n{}".format(old_zonedata, soa))
print(f"reverting zone {zone}", file=stderr)
load_zone(zone, f"{old_zonedata}\n{soa}")
changelog = get_changelog(reference_branch, target_branch)
run(["git", "branch", "-d", reference_branch], check=True)
run(["git", "branch", reference_branch, target_branch], check=True)
send_audit_mail(diffs, audit_email_address, audit_sender_address)
send_audit_mail(diffs, audit_email_address, audit_sender_address, changelog)
if __name__ == "__main__":