Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# pylint: disable=import-error,invalid-name
import dns.resolver
import utils
DOMAIN = "shore.co.il"
SUBDOMAINS = [DOMAIN, f"www.{DOMAIN}"]
def get_resolvers():
"""Return a list of resolvers for each nameserver in the DNS zone."""
default_resolver = dns.resolver.Resolver()
resolvers = []
for ns in default_resolver.query(DOMAIN, "NS"):
for address in default_resolver.query(ns.to_text()):
resolver = dns.resolver.Resolver(configure=False)
resolver.nameservers = [address.to_text()]
resolvers.append(resolver)
return resolvers
RESOLVERS = get_resolvers()
def cross_query(qname, rdtype="A"):
"""Return all of the answers from all nameservers."""
answers = set()
for r in RESOLVERS:
for a in r.query(qname, rdtype):
answers.add(a)
return answers
def validate_soa():
"""Validate the SOA record."""
soas = cross_query(DOMAIN, "SOA")
if len(soas) > 1:
return [False, "SOA records don't match."]
try:
r = soas.pop()
r.mname.to_text()
except Exception: # pylint: disable=broad-except
return [False, "SOA record is invalid."]
return [True, "SOA record validated."]
def validate_mx():
"""Validate the MX record."""
mxs = cross_query(DOMAIN, "MX")
if len(mxs) > 1:
return [False, "MX records don't match."]
try:
r = mxs.pop()
ips = cross_query(r.exchange.to_text())
if len(ips) > 1:
return [False, "MX records don't match."]
except Exception: # pylint: disable=broad-except
return [False, "MX record is invalid."]
return [True, "MX record validated."]
def validate_subdomains():
"""Validate important subdomains."""
for d in SUBDOMAINS:
try:
ips = cross_query(d)
if len(ips) > 1:
return [True, f"Domain {d} records don't match."]
except Exception: # pylint: disable=broad-except
return [False, "Failed to validate domain {d}."]
return [True, "Subdomains validated."]
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
for c in [validate_soa, validate_mx, validate_subdomains]:
success, message = c.__call__()
print(message)
if not success:
utils.publish(message)
if __name__ == "__main__":
handler("event", "context")