Skip to content
Snippets Groups Projects
Commit a50746ca authored by nimrod's avatar nimrod
Browse files

Major refactor.

- Check base class, all checks derive from it and have a common calling
convention.
- Add retries.
- Add a DNS zone transfer check. Turns out if fails on ns1. WIP.
- A wrapper around checking web sites. Just provide a list of URLs.
parent 8ad9776a
No related branches found
No related tags found
No related merge requests found
Pipeline #1378 passed
from functools import lru_cache
import dns.resolver # pylint: disable=import-error
import utils
import dns.query # pylint: disable=import-error
import dns.zone # pylint: disable=import-error
from utils import Check
DOMAIN = "shore.co.il"
SUBDOMAINS = [DOMAIN, f"www.{DOMAIN}"]
class CheckDNS(Check): # pylint: disable=abstract-method
_domain = "shore.co.il"
_subdomains = [_domain, f"www.{_domain}"]
def get_resolvers():
@lru_cache(3)
def _get_resolvers(self):
"""Return a list of resolvers for each nameserver in the DNS zone."""
default_resolver = dns.resolver.Resolver()
resolvers = []
for ns in default_resolver.query( # pylint: disable=invalid-name
DOMAIN, "NS"
self._domain, "NS"
):
for address in default_resolver.query(ns.to_text()):
resolver = dns.resolver.Resolver(configure=False)
......@@ -19,74 +23,88 @@ def get_resolvers():
resolvers.append(resolver)
return resolvers
RESOLVERS = get_resolvers()
def cross_query(qname, rdtype="A"):
def _cross_query(self, qname, rdtype="A"):
"""Return all of the answers from all nameservers."""
answers = set()
for resolver in RESOLVERS:
for resolver in self._resolvers:
for address in resolver.query(qname, rdtype):
answers.add(address)
return answers
def __init__(self):
self._resolvers = self._get_resolvers()
def validate_soa():
class CheckSOA(CheckDNS):
def _check(self):
"""Validate the SOA record."""
soas = cross_query(DOMAIN, "SOA")
soas = self._cross_query(self._domain, "SOA")
if len(soas) > 1:
return [False, "SOA records don't match."]
return False, "SOA records don't match."
try:
record = soas.pop()
record.mname.to_text()
except Exception as e: # pylint: disable=broad-except,invalid-name
print(str(e))
return [False, "SOA record is invalid."]
return False, str(e)
return [True, "SOA record validated."]
return True, "SOA record validated."
def validate_mx():
class CheckMX(CheckDNS):
def _check(self):
"""Validate the MX record."""
mxs = cross_query(DOMAIN, "MX")
mxs = self._cross_query(self._domain, "MX")
if len(mxs) > 1:
return [False, "MX records don't match."]
return False, "MX records don't match."
try:
record = mxs.pop()
ips = cross_query(record.exchange.to_text())
ips = self._cross_query(record.exchange.to_text())
if len(ips) > 1:
return [False, "MX records don't match."]
return False, "MX records don't match."
except Exception as e: # pylint: disable=broad-except,invalid-name
print(str(e))
return [False, "MX record is invalid."]
return [False, str(e)]
return [True, "MX record validated."]
return True, "MX record validated."
def validate_subdomains():
class CheckSubDomains(CheckDNS):
def _check(self):
"""Validate important subdomains."""
for domain in SUBDOMAINS:
for domain in self._subdomains:
try:
ips = cross_query(domain)
ips = self._cross_query(domain)
if len(ips) > 1:
return [True, f"Domain {domain} records don't match."]
except Exception as e: # pylint: disable=broad-except,invalid-name
print(str(e))
return [False, "Failed to validate domain {d}."]
return [True, "Subdomains validated."]
return [False, str(e)]
return True, "Subdomains validated."
class CheckTransfer(CheckDNS):
def _check(self):
"""Validate zone transfer, to check the TCP transport."""
zones = []
for resolver in self._resolvers:
try:
zone = dns.zone.from_xfr(
dns.query.xfr(resolver.nameservers[0], self._domain)
)
zones.append(zone)
except Exception as e: # pylint: disable=broad-except,invalid-name
return False, str(e)
return True, "Zone transfer validated."
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
for check in [validate_soa, validate_mx, validate_subdomains]:
success, message = check.__call__()
print(message)
if not success:
utils.publish(message)
CheckSOA().run()
CheckMX().run()
CheckSubDomains().run()
# CheckTransfer().run() # AXFR is blocked on ns1, need to figure out why.
if __name__ == "__main__":
......
from utils import check_urls
from utils import website_handler
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
checks = [
URLS = [
{"url": "https://autoconfig.shore.co.il/mail/config-v1.1.xml"},
]
check_urls(checks)
handler = website_handler(URLS)
if __name__ == "__main__":
......
from utils import check_urls
from utils import website_handler
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
checks = [
URLS = [
{"url": "http://git.shore.co.il/", "codes": [301, 302]},
{"url": "https://git.shore.co.il/", "codes": [301, 302]},
{"url": "https://git.shore.co.il/explore/"},
]
check_urls(checks)
handler = website_handler(URLS)
if __name__ == "__main__":
......
from imaplib import IMAP4_SSL
from utils import publish
from utils import Check
def check_imap():
class CheckIMAP(Check):
def _check(self):
"""Check the IMAP port."""
try:
imap = IMAP4_SSL("imap.shore.co.il", 993)
......@@ -19,10 +20,7 @@ def check_imap():
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
success, message = check_imap()
print(message)
if not success:
publish(message)
CheckIMAP().run()
if __name__ == "__main__":
......
from utils import check_urls
from utils import website_handler
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
checks = [
URLS = [
{"url": "http://kodi.shore.co.il/", "codes": [301, 302]},
{"url": "https://kodi.shore.co.il/", "codes": [401]},
]
check_urls(checks)
handler = website_handler(URLS)
if __name__ == "__main__":
......
from utils import check_urls
from utils import website_handler
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
checks = [
URLS = [
{"url": "https://mta-sts.shore.co.il/.well-known/mta-sts.txt"},
]
check_urls(checks)
handler = website_handler(URLS)
if __name__ == "__main__":
......
from utils import check_urls
from utils import website_handler
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
checks = [
URLS = [
{"url": "http://myip.shore.co.il/"},
{"url": "https://myip.shore.co.il/"},
]
check_urls(checks)
handler = website_handler(URLS)
if __name__ == "__main__":
......
from utils import check_urls
from utils import website_handler
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
checks = [
URLS = [
{"url": "http://nextcloud.shore.co.il/", "codes": [301, 302]},
{"url": "https://nextcloud.shore.co.il/", "codes": [301, 302]},
{"url": "https://nextcloud.shore.co.il/login"},
]
check_urls(checks)
handler = website_handler(URLS)
if __name__ == "__main__":
......
from utils import check_urls
from utils import website_handler
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
checks = [
URLS = [
{"url": "http://notify.shore.co.il/", "codes": [301, 302]},
{"url": "https://notify.shore.co.il/ping"},
]
check_urls(checks)
handler = website_handler(URLS)
if __name__ == "__main__":
......
from utils import check_urls
from utils import website_handler
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
checks = [
URLS = [
{"url": "http://registry.shore.co.il/", "codes": [301, 302]},
{"url": "https://registry.shore.co.il/"},
{"url": "https://registry.shore.co.il/v2/_catalog"},
]
check_urls(checks)
handler = website_handler(URLS)
if __name__ == "__main__":
......
from smtplib import SMTP
from utils import publish
from utils import Check
def check_smtp(port):
class CheckSMTP(Check):
def __init__(self, port):
self._port = port
def _check(self):
"""Check the SMTP port."""
try:
smtp = SMTP("smtp.shore.co.il", port)
smtp = SMTP("smtp.shore.co.il", self._port)
ehlo = smtp.ehlo()
if ehlo[0] != 250 or "LOGIN" in ehlo[1].decode().split():
return [False, f"First EHLO on port {port} failed."]
return [False, f"First EHLO on port {self._port} failed."]
if smtp.starttls() != (220, b"TLS go ahead"):
return [False, f"STARTTLS on port {port} failed."]
return [False, f"STARTTLS on port {self._port} failed."]
ehlo = smtp.ehlo()
if ehlo[0] != 250 or "LOGIN" not in ehlo[1].decode().split():
return [False, f"Second EHLO on port {port} failed."]
return [False, f"Second EHLO on port {self._port} failed."]
smtp.close()
except Exception as e: # pylint: disable=broad-except,invalid-name
print(str(e))
return [False, f"SMTP failure on port {port}."]
return [True, f"SMTP on port {port} is OK."]
return [False, f"SMTP failure on port {self._port}."]
return [True, f"SMTP on port {self._port} is OK."]
def handler(event, context): # pylint: disable=unused-argument
......@@ -28,10 +32,7 @@ def handler(event, context): # pylint: disable=unused-argument
# raise the cost. For now assume that if the submission port is working,
# the smtp port is working too (same gateway, same host, same processes).
for port in [587]:
success, message = check_smtp(port)
print(message)
if not success:
publish(message)
CheckSMTP(port).run()
if __name__ == "__main__":
......
import socket
from utils import publish
from utils import Check
def check_ssh(host, port=22):
class CheckSSH(Check):
def __init__(self, hostname, port=22):
self._hostname = hostname
self._port = port
def _check(self):
"""Check that an SSH server is available on that host and port."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((host, port))
sock.connect((self._hostname, self._port))
msg = sock.recv(1024)
sock.close()
return msg.startswith(b"SSH-2.0-OpenSSH")
if msg.startswith(b"SSH-2.0-OpenSSH"):
return True, f"SSH on {self._hostname}:{self._port} is OK."
return False, f"SSH on {self._hostname}:{self._port} failed."
except Exception as e: # pylint: disable=broad-except,invalid-name
print(str(e))
return False
return False, str(e)
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
for host in ["ns1.shore.co.il", "ns4.shore.co.il"]:
if check_ssh(host):
print(f"SSH on {host} is OK.")
else:
message = f"SSH on {host} failed."
print(message)
publish(message)
CheckSSH(host).run()
if __name__ == "__main__":
......
from utils import check_urls
from utils import website_handler
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
checks = [
URLS = [
{"url": "http://transmission.shore.co.il/", "codes": [301, 302]},
{"url": "https://transmission.shore.co.il/", "codes": [401]},
]
check_urls(checks)
handler = website_handler(URLS)
if __name__ == "__main__":
......
......@@ -3,35 +3,65 @@ import boto3 # pylint: disable=import-error
import requests # pylint: disable=import-error
TOPIC_ARN = os.getenv("TOPIC_ARN")
class Check:
_topic_arn = os.getenv("TOPIC_ARN")
_retries = int(os.getenv("RETRIES", "2"))
def run(self):
"""Run the check function with retries. Alert if failed."""
for _ in range(self._retries):
success, message = self._check()
print(message)
if success:
break
else:
self.publish(message)
return success
def _check(self):
"""The actual check."""
raise NotImplementedError
def publish(message):
def publish(self, message):
"""Publish an SNS message."""
if self._topic_arn is None:
print(f"Publish: {message}")
else:
client = boto3.client("sns")
client.publish(TopicArn=TOPIC_ARN, Message=message)
client.publish(TopicArn=self._topic_arn, Message=message)
class CheckURL(Check):
_method = "GET"
_valid_codes = (200,)
def check_url(url, method="GET", valid_codes=(200)):
"""Checks validaty of a URL.
def __init__(self, url, method=None, codes=None):
"""Initialize the check object."""
self._url = url
if method is not None:
self._method = method
if codes is not None:
self._valid_codes = codes
Allows specifying the HTTP method and a list of valid codes."""
def _check(self):
"""Checks validaty of a URL."""
try:
response = requests.request(method, url, allow_redirects=False)
return response.status_code in valid_codes
response = requests.request(
self._method, self._url, allow_redirects=False
)
return (
response.status_code in self._valid_codes,
f"{self._url} is OK.",
)
except Exception as e: # pylint: disable=broad-except,invalid-name
print(str(e))
return False
return False, str(e)
def check_urls(checks):
"""Check a list of URLs."""
for check in checks:
if check_url(
check["url"], check.get("method", "GET"), check.get("codes", [200])
):
print(f"{check['url']} is OK.")
else:
message = f"Failed check for {check['url']}."
print(message)
publish(message)
def website_handler(urls): # pylint: disable=unused-argument
"""Return a Lambda event handler for checking web sites."""
def _handler(event, context): # pylint: disable=unused-argument
for url in urls:
CheckURL(url["url"], url.get("method"), url.get("codes")).run()
return _handler
from utils import check_urls
from utils import website_handler
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
checks = [
URLS = [
{"url": "http://vouch.shore.co.il/", "codes": [301, 302]},
{"url": "https://vouch.shore.co.il/validate", "codes": [401]},
]
check_urls(checks)
handler = website_handler(URLS)
if __name__ == "__main__":
......
from utils import check_urls
from utils import website_handler
def handler(event, context): # pylint: disable=unused-argument
"""Lambda event handler."""
checks = [
URLS = [
{"url": "http://shore.co.il/", "codes": [301, 302]},
{"url": "http://www.shore.co.il/", "codes": [301, 302]},
{"url": "https://shore.co.il/", "codes": [301, 302]},
{"url": "https://www.shore.co.il/blog"},
]
check_urls(checks)
handler = website_handler(URLS)
if __name__ == "__main__":
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment