-
nimrod authored
If the message is an empty string or in the fallback case of using the request data if the message is None, don't send a notification.
nimrod authoredIf the message is an empty string or in the fallback case of using the request data if the message is None, don't send a notification.
app.py 1.95 KiB
"""Anonymous push notification service for Nextcloud."""
# pylint: disable=import-error
import json
import os
import docker
from flask import Flask, request
app = Flask(__name__)
client = docker.from_env()
NAME = os.getenv("NC_NAME", "Nimrod Adar")
CONTAINER_NAME = os.getenv("NC_CONTAINER", "nextcloud_nextcloud_1")
@app.route("/ping")
def ping():
"""Healthcheck."""
return "pong"
@app.route("/send", methods=["GET", "POST"])
def send_message(): # noqa: MC0001
"""Send a notification."""
if request.method == "POST":
# Needs to be called before accessing other request parameters,
# otherwise it will be empty.
data = request.get_data(cache=True, as_text=True)
if "message" in request.form:
message = request.form["message"]
elif request.is_json:
message = request.get_json()["message"]
elif "message" in request.args:
message = request.args["message"]
else:
message = data
else:
message = request.args["message"]
if message is None or message.strip() == "":
raise RuntimeError("No message or message is empty.")
for c in client.containers.list(): # pylint: disable=invalid-name
if c.name == CONTAINER_NAME:
container = c
break
else:
raise RuntimeError(f"Failed to find container {CONTAINER_NAME}.")
users = json.loads(
container.exec_run(
"./occ user:list --output json --info", user="www-data"
).output
)
for (k, v) in users.items(): # pylint: disable=invalid-name
if v["display_name"] == NAME:
uid = k
break
else:
raise RuntimeError(f"Failed to find user {NAME}.")
result = container.exec_run(
f"""./occ notification:generate -- {uid} '{message}'""",
user="www-data",
)
if result.exit_code != 0:
raise RuntimeError(result.output.decode())
return message