Skip to content
utils.py 1.41 KiB
Newer Older
nimrod's avatar
nimrod committed
# pylint: disable=import-error
import os
nimrod's avatar
nimrod committed
import socket
nimrod's avatar
nimrod committed
import boto3
nimrod's avatar
nimrod committed
import requests
nimrod's avatar
nimrod committed


TOPIC_ARN = os.getenv("TOPIC_ARN")


def publish(message):
    """Publish an SNS message."""
    client = boto3.client("sns")
    client.publish(TopicArn=TOPIC_ARN, Message=message)
nimrod's avatar
nimrod committed


def check_url(url, method="GET", valid_codes=(200)):
nimrod's avatar
nimrod committed
    """Checks validaty of a URL.
nimrod's avatar
nimrod committed

    Allows specifying the HTTP method and a list of valid codes."""
    try:
        response = requests.request(method, url, allow_redirects=False)
        return response.status_code in valid_codes
nimrod's avatar
nimrod committed
    except Exception as e:  # pylint: disable=broad-except,invalid-name
        print(str(e))
nimrod's avatar
nimrod committed
        return False
nimrod's avatar
nimrod committed


def check_urls(checks):
    """Check a list of URLs."""
    for check in checks:
        if check_url(
            check["url"], check.get("method", "GET"), check.get("codes", [200])
        ):
            print(f"{check['url']} is OK.")
        else:
            message = f"Failed check for {check['url']}."
            print(message)
            publish(message)
nimrod's avatar
nimrod committed


def check_ssh(host, port=22):
    """Check that an SSH server is available on that host and port."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((host, port))
        msg = sock.recv(1024)
        return msg.startswith(b"SSH-2.0-OpenSSH")
    except Exception as e:  # pylint: disable=broad-except,invalid-name
        print(str(e))
        return False