"""Anonymous push notification service for Nextcloud.""" # pylint: disable=import-error import json import os import docker from flask import Flask, request app = Flask(__name__) client = docker.from_env() NAME = os.getenv("NC_NAME", "Nimrod Adar") CONTAINER_NAME = os.getenv("NC_CONTAINER", "nextcloud_nextcloud_1") @app.route("/ping") def ping(): """Healthcheck.""" return "pong" @app.route("/send", methods=["GET", "POST"]) def send_message(): # noqa: MC0001 """Send a notification.""" if request.method == "POST": # Needs to be called before accessing other request parameters, # otherwise it will be empty. data = request.get_data(cache=True, as_text=True) if "message" in request.form: message = request.form["message"] elif request.is_json: message = request.get_json()["message"] elif "message" in request.args: message = request.args["message"] else: message = data else: message = request.args["message"] if message is None or message.strip() == "": raise RuntimeError("No message or message is empty.") for c in client.containers.list(): # pylint: disable=invalid-name if c.name == CONTAINER_NAME: container = c break else: raise RuntimeError(f"Failed to find container {CONTAINER_NAME}.") users = json.loads( container.exec_run( "./occ user:list --output json --info", user="www-data" ).output ) for (k, v) in users.items(): # pylint: disable=invalid-name if v["display_name"] == NAME: uid = k break else: raise RuntimeError(f"Failed to find user {NAME}.") result = container.exec_run( f"""./occ notification:generate -- {uid} '{message}'""", user="www-data", ) if result.exit_code != 0: raise RuntimeError(result.output.decode()) return message