idk why I ever tried IPC there
All checks were successful
Build and Publish college-bot-v2 / build_and_publish (push) Successful in 13s
All checks were successful
Build and Publish college-bot-v2 / build_and_publish (push) Successful in 13s
This commit is contained in:
parent
babc888422
commit
65acd62d12
3 changed files with 85 additions and 174 deletions
|
@ -18,11 +18,14 @@ services:
|
|||
container_name: jimmy-webhook-server
|
||||
image: git.i-am.nexus/nex/college-bot:latest
|
||||
restart: unless-stopped
|
||||
volumes:
|
||||
- ./ipc/:/tmp/ipc/
|
||||
depends_on:
|
||||
- jimmy
|
||||
command: ["/app/venv/bin/uvicorn", "--host", "0.0.0.0", "--port", "1111", "server:app"]
|
||||
- redis
|
||||
environment:
|
||||
- REDIS=redis://redis:6379
|
||||
ports:
|
||||
- 1111:1111
|
||||
command: ["/app/venv/bin/python", "/app/server.py"]
|
||||
ollama:
|
||||
image: ollama/ollama:latest
|
||||
container_name: ollama
|
||||
|
|
104
src/main.py
104
src/main.py
|
@ -112,107 +112,10 @@ for logger in CONFIG["logging"].get("suppress", []):
|
|||
|
||||
class Client(commands.Bot):
|
||||
def __init_(self, *args, **kwargs):
|
||||
self.web = None
|
||||
self.uptime_thread = None
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
async def _webhook_reader(self):
|
||||
await self.wait_until_ready()
|
||||
fifo = CONFIG["jimmy"].get("fifo")
|
||||
if not fifo:
|
||||
return
|
||||
if not CONFIG["redis"]:
|
||||
return
|
||||
|
||||
db = redis.asyncio.Redis(**CONFIG["redis"])
|
||||
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
while not os.path.exists(fifo):
|
||||
await asyncio.sleep(1)
|
||||
try:
|
||||
async with aiofiles.open(fifo, "w") as fifo_fd:
|
||||
async for line in fifo_fd:
|
||||
log.debug("Webhook reader got this line: %r", line)
|
||||
try:
|
||||
data = json.loads(line)
|
||||
token = data.get("token")
|
||||
assert token, "No token in JSON"
|
||||
except (json.JSONDecodeError, AssertionError) as e:
|
||||
log.error("Failed to decode JSON from fifo: %r", line, exc_info=e)
|
||||
await fifo_fd.write(
|
||||
json.dumps(
|
||||
{
|
||||
"status": 400,
|
||||
"detail": "what the fuck did you give me"
|
||||
}
|
||||
)
|
||||
)
|
||||
else:
|
||||
if data.get("op") == "ping":
|
||||
await fifo_fd.write(
|
||||
json.dumps(
|
||||
{
|
||||
"token": token,
|
||||
"status": 200,
|
||||
}
|
||||
)
|
||||
)
|
||||
elif data.get("id"):
|
||||
existing = await db.get(
|
||||
"truth-%s" % data["id"]
|
||||
)
|
||||
if existing:
|
||||
data_notk = data.copy()
|
||||
data_notk.pop("token")
|
||||
existing = json.loads(existing)
|
||||
if existing == data_notk:
|
||||
await fifo_fd.write(
|
||||
json.dumps(
|
||||
{
|
||||
"token": token,
|
||||
"status": 204,
|
||||
}
|
||||
)
|
||||
)
|
||||
else:
|
||||
await fifo_fd.write(
|
||||
json.dumps(
|
||||
{
|
||||
"token": token,
|
||||
"status": 409,
|
||||
"detail": existing
|
||||
}
|
||||
)
|
||||
)
|
||||
else:
|
||||
await db.set(
|
||||
"truth-%s" % data["id"],
|
||||
json.dumps(data)
|
||||
)
|
||||
await fifo_fd.write(
|
||||
json.dumps(
|
||||
{
|
||||
"token": token,
|
||||
"status": 201,
|
||||
}
|
||||
)
|
||||
)
|
||||
else:
|
||||
await fifo_fd.write(
|
||||
json.dumps(
|
||||
{
|
||||
"token": token,
|
||||
"status": 400,
|
||||
"detail": "Unknown operation"
|
||||
}
|
||||
)
|
||||
)
|
||||
except Exception as err:
|
||||
log.error("Webhook reader failed", exc_info=err)
|
||||
continue
|
||||
|
||||
async def on_connect(self):
|
||||
if not any(self.walk_application_commands()):
|
||||
log.warning("No application commands. using pending.")
|
||||
|
@ -242,19 +145,12 @@ class Client(commands.Bot):
|
|||
CONFIG["jimmy"]["uptime_kuma_url"], CONFIG["jimmy"].get("uptime_kuma_interval", 58.0)
|
||||
)
|
||||
self.uptime_thread.start()
|
||||
if CONFIG["jimmy"].get("fifo"):
|
||||
if getattr(self, "web", None):
|
||||
self.web.cancel()
|
||||
self.web = None
|
||||
self.web = asyncio.create_task(self._webhook_reader())
|
||||
await super().start(token, reconnect=reconnect)
|
||||
|
||||
async def close(self) -> None:
|
||||
if getattr(self, "uptime_thread", None):
|
||||
self.uptime_thread.kill.set()
|
||||
await asyncio.get_event_loop().run_in_executor(None, self.uptime_thread.join)
|
||||
if getattr(self, "web", None):
|
||||
self.web.cancel()
|
||||
await super().close()
|
||||
|
||||
|
||||
|
|
146
src/server.py
146
src/server.py
|
@ -1,18 +1,16 @@
|
|||
#!/bin/env python3
|
||||
import json
|
||||
|
||||
import redis
|
||||
import os
|
||||
import secrets
|
||||
import asyncio
|
||||
import aiofiles
|
||||
import typing
|
||||
import time
|
||||
import uuid
|
||||
from contextlib import asynccontextmanager
|
||||
from fastapi import FastAPI, Depends, HTTPException
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.responses import JSONResponse, Response
|
||||
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
||||
from pydantic import BaseModel, Field, ValidationError
|
||||
|
||||
FIFO = "/tmp/ipc/ipc.socket"
|
||||
JSON: typing.Union[
|
||||
str, int, float, bool, None, typing.Dict[str, "JSON"], typing.List["JSON"]
|
||||
] = typing.Union[
|
||||
|
@ -28,22 +26,6 @@ class TruthPayload(BaseModel):
|
|||
extra: typing.Optional[JSON] = None
|
||||
|
||||
|
||||
class UpstreamPayload(BaseModel):
|
||||
id: uuid.UUID
|
||||
status: int = 200
|
||||
detail: typing.Optional[JSON] = None
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifetime(_app):
|
||||
if os.path.exists(FIFO):
|
||||
os.remove(FIFO)
|
||||
os.mkfifo(FIFO)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
os.remove(FIFO)
|
||||
|
||||
security = HTTPBasic(realm="Jimmy")
|
||||
USERNAME = os.getenv("WEB_USERNAME", os.urandom(32).hex())
|
||||
PASSWORD = os.getenv("WEB_PASSWORD", os.urandom(32).hex())
|
||||
|
@ -58,6 +40,15 @@ def check_credentials(credentials: HTTPBasicCredentials = Depends(security)):
|
|||
return credentials
|
||||
|
||||
|
||||
def get_db() -> redis.Redis:
|
||||
uri = os.getenv("REDIS_URL", "redis://redis")
|
||||
conn = redis.Redis.from_url(uri)
|
||||
try:
|
||||
yield conn
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
title="Jimmy v3 API",
|
||||
version="3.0.0",
|
||||
|
@ -66,54 +57,75 @@ app = FastAPI(
|
|||
)
|
||||
|
||||
|
||||
async def get_upstream_response(fd, token: str) -> UpstreamPayload:
|
||||
t = 0
|
||||
async for line in fd:
|
||||
if t >= 5:
|
||||
break
|
||||
t += 1
|
||||
try:
|
||||
upstream_response: UpstreamPayload = UpstreamPayload.model_validate_json(line)
|
||||
if upstream_response.id.hex == token:
|
||||
return upstream_response
|
||||
except ValidationError:
|
||||
continue
|
||||
raise RuntimeError("Timeout while waiting for response from upstream")
|
||||
@app.get("/api/truths/all")
|
||||
def get_all_truths(db: redis.Redis = Depends(get_db)):
|
||||
"""Retrieves all stored truths"""
|
||||
keys = db.keys()
|
||||
truths = [json.loads(db.get(key)) for key in keys]
|
||||
return truths
|
||||
|
||||
|
||||
@app.post("/api/truths", response_model=UpstreamPayload)
|
||||
async def post_truth(payload: TruthPayload, response: JSONResponse):
|
||||
token = uuid.uuid4().hex
|
||||
@app.get("/api/truths/{truth_id}")
|
||||
def get_truth(truth_id: str, db: redis.Redis = Depends(get_db)):
|
||||
"""Retrieves a stored truth"""
|
||||
data = db.get(truth_id)
|
||||
if not data:
|
||||
raise HTTPException(404, detail="%r not found." % id)
|
||||
return json.loads(data)
|
||||
|
||||
|
||||
@app.head("/api/truths/{truth_id}")
|
||||
def head_truth(truth_id: str, db: redis.Redis = Depends(get_db)):
|
||||
"""Checks that a truth exists"""
|
||||
data = db.get(truth_id)
|
||||
if not data:
|
||||
raise HTTPException(404)
|
||||
return Response()
|
||||
|
||||
|
||||
@app.post("/api/truths", status_code=201)
|
||||
def post_truth(payload: TruthPayload, response: JSONResponse, db: redis.Redis = Depends(get_db)):
|
||||
"""Stores a new truth"""
|
||||
data = payload.model_dump()
|
||||
data["token"] = token
|
||||
async with aiofiles.open(FIFO, "w") as fifo:
|
||||
await fifo.write(payload.json())
|
||||
try:
|
||||
upstream_response = await asyncio.wait_for(get_upstream_response(fifo, token), timeout=5)
|
||||
except (asyncio.TimeoutError, RuntimeError):
|
||||
raise HTTPException(
|
||||
500,
|
||||
detail="Timeout while waiting for response from upstream"
|
||||
)
|
||||
if upstream_response.id.hex == token:
|
||||
response.status_code = upstream_response.status
|
||||
return upstream_response
|
||||
existing = db.get(data["id"])
|
||||
if existing:
|
||||
parsed = json.loads(existing)
|
||||
if parsed == existing:
|
||||
return Response(status_code=204)
|
||||
raise HTTPException(409, detail="%r already exists." % data["id"])
|
||||
db.set(data["id"], json.dumps(data))
|
||||
response.status_code = 201
|
||||
return data
|
||||
|
||||
|
||||
@app.put("/api/truths/{truth_id}")
|
||||
def put_truth(truth_id: str, payload: TruthPayload, db: redis.Redis = Depends(get_db)):
|
||||
"""Replaces a stored truth"""
|
||||
data = payload.model_dump()
|
||||
existing = db.get(truth_id)
|
||||
if not existing:
|
||||
raise HTTPException(404, detail="%r not found." % truth_id)
|
||||
db.set(truth_id, json.dumps(data))
|
||||
return data
|
||||
|
||||
|
||||
@app.delete("/api/truths/{truth_id}", status_code=204)
|
||||
def delete_truth(truth_id: str, db: redis.Redis = Depends(get_db)):
|
||||
"""Deletes a stored truth"""
|
||||
if not db.delete(truth_id):
|
||||
raise HTTPException(404, detail="%r not found." % truth_id)
|
||||
return Response(status_code=204)
|
||||
|
||||
|
||||
@app.get("/api/health")
|
||||
async def health(response: JSONResponse):
|
||||
payload = {
|
||||
"token": uuid.uuid4().hex,
|
||||
"op": "ping"
|
||||
}
|
||||
async with aiofiles.open(FIFO, "w") as fifo:
|
||||
fifo.write(json.dumps(payload))
|
||||
try:
|
||||
upstream_response = await asyncio.wait_for(get_upstream_response(fifo, payload["token"]), timeout=5)
|
||||
except (asyncio.TimeoutError, RuntimeError):
|
||||
raise HTTPException(
|
||||
500,
|
||||
detail="Timeout while waiting for response from upstream"
|
||||
)
|
||||
response.status_code = upstream_response.status
|
||||
return upstream_response
|
||||
def health(db: redis.Redis = Depends(get_db)):
|
||||
try:
|
||||
db.ping()
|
||||
except ConnectionError:
|
||||
raise HTTPException(500, detail="Database connection error")
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=1111, forwarded_allow_ips="*")
|
||||
|
|
Loading…
Reference in a new issue