diff --git a/docker-compose.yml b/docker-compose.yml index dbb10db..02a512b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,11 +18,14 @@ services: container_name: jimmy-webhook-server image: git.i-am.nexus/nex/college-bot:latest restart: unless-stopped - volumes: - - ./ipc/:/tmp/ipc/ depends_on: - jimmy - command: ["/app/venv/bin/uvicorn", "--host", "0.0.0.0", "--port", "1111", "server:app"] + - redis + environment: + - REDIS=redis://redis:6379 + ports: + - 1111:1111 + command: ["/app/venv/bin/python", "/app/server.py"] ollama: image: ollama/ollama:latest container_name: ollama diff --git a/src/main.py b/src/main.py index 4b0fce6..a243d34 100644 --- a/src/main.py +++ b/src/main.py @@ -112,107 +112,10 @@ for logger in CONFIG["logging"].get("suppress", []): class Client(commands.Bot): def __init_(self, *args, **kwargs): - self.web = None self.uptime_thread = None super().__init__(*args, **kwargs) - async def _webhook_reader(self): - await self.wait_until_ready() - fifo = CONFIG["jimmy"].get("fifo") - if not fifo: - return - if not CONFIG["redis"]: - return - - db = redis.asyncio.Redis(**CONFIG["redis"]) - - while True: - await asyncio.sleep(1) - while not os.path.exists(fifo): - await asyncio.sleep(1) - try: - async with aiofiles.open(fifo, "w") as fifo_fd: - async for line in fifo_fd: - log.debug("Webhook reader got this line: %r", line) - try: - data = json.loads(line) - token = data.get("token") - assert token, "No token in JSON" - except (json.JSONDecodeError, AssertionError) as e: - log.error("Failed to decode JSON from fifo: %r", line, exc_info=e) - await fifo_fd.write( - json.dumps( - { - "status": 400, - "detail": "what the fuck did you give me" - } - ) - ) - else: - if data.get("op") == "ping": - await fifo_fd.write( - json.dumps( - { - "token": token, - "status": 200, - } - ) - ) - elif data.get("id"): - existing = await db.get( - "truth-%s" % data["id"] - ) - if existing: - data_notk = data.copy() - data_notk.pop("token") - existing = json.loads(existing) - if existing == data_notk: - await fifo_fd.write( - json.dumps( - { - "token": token, - "status": 204, - } - ) - ) - else: - await fifo_fd.write( - json.dumps( - { - "token": token, - "status": 409, - "detail": existing - } - ) - ) - else: - await db.set( - "truth-%s" % data["id"], - json.dumps(data) - ) - await fifo_fd.write( - json.dumps( - { - "token": token, - "status": 201, - } - ) - ) - else: - await fifo_fd.write( - json.dumps( - { - "token": token, - "status": 400, - "detail": "Unknown operation" - } - ) - ) - except Exception as err: - log.error("Webhook reader failed", exc_info=err) - continue - async def on_connect(self): if not any(self.walk_application_commands()): log.warning("No application commands. using pending.") @@ -242,19 +145,12 @@ class Client(commands.Bot): CONFIG["jimmy"]["uptime_kuma_url"], CONFIG["jimmy"].get("uptime_kuma_interval", 58.0) ) self.uptime_thread.start() - if CONFIG["jimmy"].get("fifo"): - if getattr(self, "web", None): - self.web.cancel() - self.web = None - self.web = asyncio.create_task(self._webhook_reader()) await super().start(token, reconnect=reconnect) async def close(self) -> None: if getattr(self, "uptime_thread", None): self.uptime_thread.kill.set() await asyncio.get_event_loop().run_in_executor(None, self.uptime_thread.join) - if getattr(self, "web", None): - self.web.cancel() await super().close() diff --git a/src/server.py b/src/server.py index 9be8833..c443940 100644 --- a/src/server.py +++ b/src/server.py @@ -1,18 +1,16 @@ +#!/bin/env python3 import json + +import redis import os import secrets -import asyncio -import aiofiles import typing import time -import uuid -from contextlib import asynccontextmanager from fastapi import FastAPI, Depends, HTTPException -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse, Response from fastapi.security import HTTPBasic, HTTPBasicCredentials from pydantic import BaseModel, Field, ValidationError -FIFO = "/tmp/ipc/ipc.socket" JSON: typing.Union[ str, int, float, bool, None, typing.Dict[str, "JSON"], typing.List["JSON"] ] = typing.Union[ @@ -28,22 +26,6 @@ class TruthPayload(BaseModel): extra: typing.Optional[JSON] = None -class UpstreamPayload(BaseModel): - id: uuid.UUID - status: int = 200 - detail: typing.Optional[JSON] = None - - -@asynccontextmanager -async def lifetime(_app): - if os.path.exists(FIFO): - os.remove(FIFO) - os.mkfifo(FIFO) - try: - yield - finally: - os.remove(FIFO) - security = HTTPBasic(realm="Jimmy") USERNAME = os.getenv("WEB_USERNAME", os.urandom(32).hex()) PASSWORD = os.getenv("WEB_PASSWORD", os.urandom(32).hex()) @@ -58,6 +40,15 @@ def check_credentials(credentials: HTTPBasicCredentials = Depends(security)): return credentials +def get_db() -> redis.Redis: + uri = os.getenv("REDIS_URL", "redis://redis") + conn = redis.Redis.from_url(uri) + try: + yield conn + finally: + conn.close() + + app = FastAPI( title="Jimmy v3 API", version="3.0.0", @@ -66,54 +57,75 @@ app = FastAPI( ) -async def get_upstream_response(fd, token: str) -> UpstreamPayload: - t = 0 - async for line in fd: - if t >= 5: - break - t += 1 - try: - upstream_response: UpstreamPayload = UpstreamPayload.model_validate_json(line) - if upstream_response.id.hex == token: - return upstream_response - except ValidationError: - continue - raise RuntimeError("Timeout while waiting for response from upstream") +@app.get("/api/truths/all") +def get_all_truths(db: redis.Redis = Depends(get_db)): + """Retrieves all stored truths""" + keys = db.keys() + truths = [json.loads(db.get(key)) for key in keys] + return truths -@app.post("/api/truths", response_model=UpstreamPayload) -async def post_truth(payload: TruthPayload, response: JSONResponse): - token = uuid.uuid4().hex +@app.get("/api/truths/{truth_id}") +def get_truth(truth_id: str, db: redis.Redis = Depends(get_db)): + """Retrieves a stored truth""" + data = db.get(truth_id) + if not data: + raise HTTPException(404, detail="%r not found." % id) + return json.loads(data) + + +@app.head("/api/truths/{truth_id}") +def head_truth(truth_id: str, db: redis.Redis = Depends(get_db)): + """Checks that a truth exists""" + data = db.get(truth_id) + if not data: + raise HTTPException(404) + return Response() + + +@app.post("/api/truths", status_code=201) +def post_truth(payload: TruthPayload, response: JSONResponse, db: redis.Redis = Depends(get_db)): + """Stores a new truth""" data = payload.model_dump() - data["token"] = token - async with aiofiles.open(FIFO, "w") as fifo: - await fifo.write(payload.json()) - try: - upstream_response = await asyncio.wait_for(get_upstream_response(fifo, token), timeout=5) - except (asyncio.TimeoutError, RuntimeError): - raise HTTPException( - 500, - detail="Timeout while waiting for response from upstream" - ) - if upstream_response.id.hex == token: - response.status_code = upstream_response.status - return upstream_response + existing = db.get(data["id"]) + if existing: + parsed = json.loads(existing) + if parsed == existing: + return Response(status_code=204) + raise HTTPException(409, detail="%r already exists." % data["id"]) + db.set(data["id"], json.dumps(data)) + response.status_code = 201 + return data + + +@app.put("/api/truths/{truth_id}") +def put_truth(truth_id: str, payload: TruthPayload, db: redis.Redis = Depends(get_db)): + """Replaces a stored truth""" + data = payload.model_dump() + existing = db.get(truth_id) + if not existing: + raise HTTPException(404, detail="%r not found." % truth_id) + db.set(truth_id, json.dumps(data)) + return data + + +@app.delete("/api/truths/{truth_id}", status_code=204) +def delete_truth(truth_id: str, db: redis.Redis = Depends(get_db)): + """Deletes a stored truth""" + if not db.delete(truth_id): + raise HTTPException(404, detail="%r not found." % truth_id) + return Response(status_code=204) @app.get("/api/health") -async def health(response: JSONResponse): - payload = { - "token": uuid.uuid4().hex, - "op": "ping" - } - async with aiofiles.open(FIFO, "w") as fifo: - fifo.write(json.dumps(payload)) - try: - upstream_response = await asyncio.wait_for(get_upstream_response(fifo, payload["token"]), timeout=5) - except (asyncio.TimeoutError, RuntimeError): - raise HTTPException( - 500, - detail="Timeout while waiting for response from upstream" - ) - response.status_code = upstream_response.status - return upstream_response +def health(db: redis.Redis = Depends(get_db)): + try: + db.ping() + except ConnectionError: + raise HTTPException(500, detail="Database connection error") + return {"status": "ok"} + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=1111, forwarded_allow_ips="*")