diff --git a/docker-compose.yml b/docker-compose.yml index cc57d5d..c1400ab 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,10 +9,20 @@ services: - ./jimmy.log:/app/jimmy.log - /dev/dri:/dev/dri - jimmy-data:/app/data + - ./ipc:/tmp/college-bot-ipc extra_hosts: - host.docker.internal:host-gateway depends_on: - redis # you can remove this if you remove the ollama and starboard cogs. + jimmy-webhook-server: + container_name: jimmy-webhook-server + image: git.i-am.nexus/nex/college-bot:latest + restart: unless-stopped + volumes: + - ./ipc:/tmp/college-bot-ipc + depends_on: + - jimmy + command: ["python", "server.py"] ollama: image: ollama/ollama:latest container_name: ollama diff --git a/requirements.txt b/requirements.txt index eb92f63..8fc8697 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,3 +20,6 @@ beautifulsoup4~=4.12 lxml~=5.1 matplotlib~=3.8 python-magic~=0.4 +aiofiles~=23.2 +fastapi~=0.111 +uvicorn[standard]~=0.30 diff --git a/src/main.py b/src/main.py index b41c45c..39ed58b 100644 --- a/src/main.py +++ b/src/main.py @@ -1,7 +1,10 @@ import asyncio import datetime import glob +import json import logging +import os +import redis import random import shutil import sys @@ -11,6 +14,7 @@ import typing from logging import FileHandler from threading import Event, Thread +import aiofiles import discord import httpx from discord.ext import commands @@ -111,6 +115,101 @@ class Client(commands.Bot): super().__init__(*args, **kwargs) self.web: typing.Optional[asyncio.Task] = None self.uptime_thread = None + + async def _webhook_reader(self): + await self.wait_until_ready() + fifo = CONFIG["jimmy"].get("fifo") + if not fifo: + return + if not CONFIG["redis"]: + return + + db = redis.asyncio.Redis(**CONFIG["redis"]) + + while True: + while not os.path.exists(fifo): + await asyncio.sleep(1) + try: + async with aiofiles.open(fifo, "a") as fifo_fd: + async for line in fifo_fd: + log.debug("Webhook reader got this line: %r", line) + try: + data = json.loads(line) + token = data.get("token") + assert token, "No token in JSON" + except (json.JSONDecodeError, AssertionError) as e: + log.error("Failed to decode JSON from fifo: %r", line, exc_info=e) + await fifo_fd.write( + json.dumps( + { + "status": 400, + "detail": "what the fuck did you give me" + } + ) + ) + else: + if data.get("op") == "ping": + await fifo_fd.write( + json.dumps( + { + "token": token, + "status": 200, + } + ) + ) + elif data.get("id"): + existing = await db.get( + "truth-%s" % data["id"] + ) + if existing: + data_notk = data.copy() + data_notk.pop("token") + existing = json.loads(existing) + if existing == data_notk: + await fifo_fd.write( + json.dumps( + { + "token": token, + "status": 204, + } + ) + ) + else: + await fifo_fd.write( + json.dumps( + { + "token": token, + "status": 409, + "detail": existing + } + ) + ) + else: + await db.set( + "truth-%s" % data["id"], + json.dumps(data) + ) + await fifo_fd.write( + json.dumps( + { + "token": token, + "status": 201, + } + ) + ) + else: + await fifo_fd.write( + json.dumps( + { + "token": token, + "status": 400, + "detail": "Unknown operation" + } + ) + ) + except Exception as err: + log.error("Webhook reader failed", exc_info=err) + continue async def on_connect(self): if not any(self.walk_application_commands()): diff --git a/src/server.py b/src/server.py new file mode 100644 index 0000000..86da020 --- /dev/null +++ b/src/server.py @@ -0,0 +1,117 @@ +import json +import os +import secrets +import asyncio +import aiofiles +import typing +import time +import uuid +from contextlib import asynccontextmanager +from fastapi import FastAPI, Depends, HTTPException +from fastapi.responses import JSONResponse +from fastapi.security import HTTPBasic, HTTPBasicCredentials +from pydantic import BaseModel, Field, ValidationError + +FIFO = "/tmp/college-bot-ipc" +JSON: typing.Union[ + str, int, float, bool, None, typing.Dict[str, "JSON"], typing.List["JSON"] +] + + +class TruthPayload(BaseModel): + id: str + content: str + author: typing.Literal["trump", "tate"] = Field(regex=r"^(trump|tate)$") + timestamp: float = Field(default_factory=time.time, ge=0) + extra: typing.Optional[JSON] = None + + +class UpstreamPayload(BaseModel): + id: uuid.UUID + status: int = 200 + detail: typing.Optional[JSON] = None + + +@asynccontextmanager +async def lifetime(_app): + if os.path.exists(FIFO): + os.remove(FIFO) + os.mkfifo(FIFO) + try: + yield + finally: + os.remove(FIFO) + +security = HTTPBasic(realm="Jimmy") +USERNAME = os.getenv("WEB_USERNAME", os.urandom(32).hex()) +PASSWORD = os.getenv("WEB_PASSWORD", os.urandom(32).hex()) + + +def check_credentials(credentials: HTTPBasicCredentials = Depends(security)): + err = HTTPException(status_code=401, detail="Unauthorized") + if credentials.username != USERNAME: + raise err + if not secrets.compare_digest(credentials.password, PASSWORD): + raise err + return credentials + + +app = FastAPI( + title="Jimmy v3 API", + version="3.0.0", + dependencies=[Depends(check_credentials)], + root_path=os.getenv("WEB_ROOT_PATH") +) + + +async def get_upstream_response(fd, token: str) -> UpstreamPayload: + t = 0 + async for line in fd: + if t >= 5: + break + t += 1 + try: + upstream_response: UpstreamPayload = UpstreamPayload.model_validate_json(line) + if upstream_response.id.hex == token: + return upstream_response + except ValidationError: + continue + raise RuntimeError("Timeout while waiting for response from upstream") + + +@app.post("/api/truths", response_model=UpstreamPayload) +async def post_truth(payload: TruthPayload, response: JSONResponse): + token = uuid.uuid4().hex + data = payload.model_dump() + data["token"] = token + async with aiofiles.open(FIFO, "w") as fifo: + await fifo.write(payload.json()) + try: + upstream_response = await asyncio.wait_for(get_upstream_response(fifo, token), timeout=5) + except (asyncio.TimeoutError, RuntimeError): + raise HTTPException( + 500, + detail="Timeout while waiting for response from upstream" + ) + if upstream_response.id.hex == token: + response.status_code = upstream_response.status + return upstream_response + + +@app.get("/api/health") +async def health(response: JSONResponse): + payload = { + "token": uuid.uuid4().hex, + "op": "ping" + } + async with aiofiles.open(FIFO, "w") as fifo: + fifo.write(json.dumps(payload)) + try: + upstream_response = await asyncio.wait_for(get_upstream_response(fifo, payload["token"]), timeout=5) + except (asyncio.TimeoutError, RuntimeError): + raise HTTPException( + 500, + detail="Timeout while waiting for response from upstream" + ) + response.status_code = upstream_response.status + return upstream_response