nonsensebot/app/modules/ai.py
2024-09-15 23:03:24 +01:00

213 lines
7.6 KiB
Python

"""
This module is an old mockup of an AI chatbot system using ollama and a set of servers.
It is not actively used or maintained anymore.
"""
import asyncio
import json
import logging
from pathlib import Path
from contextlib import asynccontextmanager
import httpx
import typing
from ollama import AsyncClient
import niobot
if typing.TYPE_CHECKING:
from ..main import NonsenseBot
@asynccontextmanager
async def ollama_client(url: str) -> AsyncClient:
client = AsyncClient(url)
async with client._client:
yield client
class AIModule(niobot.Module):
bot: "NonsenseBot"
log = logging.getLogger(__name__)
async def find_server(self, gpu_only: bool = True) -> dict[str, str | bool] | None:
for name, cfg in self.bot.cfg["ollama"].items():
url = cfg["url"]
gpu = cfg["gpu"]
if gpu_only and not gpu:
self.log.info("Skipping %r, need GPU.", name)
continue
async with ollama_client(url) as client:
try:
self.log.info("Trying %r", name)
await client.ps()
except (httpx.HTTPError, ConnectionError):
self.log.warning("%r is offline, trying next.", name)
continue
else:
self.log.info("%r is online.")
return {"name": name, **cfg}
self.log.warning("No suitable ollama server is online.")
@staticmethod
def read_users():
p = Path("./store/users.json")
if not p.exists():
return {}
return json.loads(p.read_text())
@staticmethod
def write_users(users: dict[str, str]):
p = Path("./store/users.json")
with open(p, "w") as _fd:
json.dump(users, _fd)
@niobot.command("whitelist.add")
@niobot.is_owner()
async def whitelist_add(
self, ctx: niobot.Context, user_id: str, model: str = "llama3:latest"
):
"""[Owner] Adds a user to the whitelist."""
users = self.read_users()
users[user_id] = model
self.write_users(users)
await ctx.respond(f"Added {user_id} to the whitelist.")
@niobot.command("whitelist.list")
@niobot.is_owner()
async def whitelist_list(self, ctx: niobot.Context):
"""[Owner] Lists all users in the whitelist."""
users = self.read_users()
if not users:
await ctx.respond("No users in the whitelist.")
return
await ctx.respond("\n".join(f"{k}: {v}" for k, v in users.items()))
@niobot.command("whitelist.remove")
@niobot.is_owner()
async def whitelist_remove(self, ctx: niobot.Context, user_id: str):
"""[Owner] Removes a user from the whitelist."""
users = self.read_users()
if user_id not in users:
await ctx.respond(f"{user_id} not in the whitelist.")
return
del users[user_id]
self.write_users(users)
await ctx.respond(f"Removed {user_id} from the whitelist.")
@niobot.command("ollama.set-model")
@niobot.from_homeserver("nexy7574.co.uk", "nicroxio.co.uk", "shronk.net")
async def set_model(self, ctx: niobot.Context, model: str):
"""Sets the model you want to use."""
users = self.read_users()
if ctx.message.sender not in users:
await ctx.respond("You must be whitelisted first.")
return
users[ctx.message.sender] = model
self.write_users(users)
await ctx.respond(
f"Set model to {model}. Don't forget to pull it with `h!ollama.pull`."
)
@niobot.command("ollama.pull")
@niobot.from_homeserver("nexy7574.co.uk", "nicroxio.co.uk", "shronk.net")
async def pull_model(self, ctx: niobot.Context):
"""Pulls the model you set."""
users = self.read_users()
if ctx.message.sender not in users:
await ctx.respond(
"You need to set a model first. See: `h!help ollama.set-model`"
)
return
model = users[ctx.message.sender]
server = await self.find_server(gpu_only=False)
if not server:
await ctx.respond("No servers available.")
return
msg = await ctx.respond(f"Pulling {model} on {server['name']!r}...")
async with ollama_client(server["url"]) as client:
await client.pull(model)
await msg.edit(f"Pulled model {model}.")
@niobot.command("ollama.chat", greedy=True)
@niobot.from_homeserver("nexy7574.co.uk", "nicroxio.co.uk", "shronk.net")
async def chat(self, ctx: niobot.Context):
"""Chat with the model."""
if "--gpu" in ctx.args:
ctx.args.remove("--gpu")
gpu_only = True
else:
gpu_only = False
try:
message = " ".join(ctx.args)
users = self.read_users()
if ctx.message.sender not in users:
await ctx.respond(
"You need to set a model first. See: `h!help ollama.set-model`"
)
return
model = users[ctx.message.sender]
res = await ctx.respond("Finding server...")
server = await self.find_server(gpu_only=gpu_only)
if not server:
await res.edit(content="No servers available.")
return
async with ollama_client(server["url"]) as client:
await res.edit(content=f"Generating response...")
try:
response = await client.chat(
model, [{"role": "user", "content": message}]
)
except httpx.HTTPError as e:
response = {"message": {"content": f"Error: {e}"}}
await res.edit(content=response["message"]["content"])
except Exception as e:
logging.exception(e)
await ctx.respond(content="An error occurred.")
@niobot.command("ollama.status", aliases=["ollama.ping"])
@niobot.from_homeserver("nexy7574.co.uk", "nicroxio.co.uk", "shronk.net")
async def status(self, ctx: niobot.Context, gpu_only: bool = False):
"""Checks which servers are online."""
lines: dict[str, dict[str, str | None | bool]] = {}
for name, cfg in self.bot.cfg["ollama"].items():
lines[name] = {"url": cfg["url"], "gpu": cfg["gpu"], "online": None}
emojis = {
True: "\N{WHITE HEAVY CHECK MARK}",
False: "\N{CROSS MARK}",
None: "\N{HOURGLASS WITH FLOWING SAND}",
}
def get_lines():
ln = []
for _n, _d in lines.items():
if gpu_only and _d["gpu"] is False:
continue
ln.append(f"* **{_n}**: {emojis[_d['online']]}")
return "\n".join(ln)
response = await ctx.respond(get_lines())
async def ping_task(target_url, target_name):
async with ollama_client(target_url) as client:
try:
self.log.info("[status] Checking %s (%r)", target_name, target_url)
await client.ps()
except (httpx.HTTPError, ConnectionError):
lines[target_name]["online"] = False
else:
lines[target_name]["online"] = True
await response.edit(content=get_lines())
tasks = []
for name, cfg in self.bot.cfg["ollama"].items():
url = cfg["url"]
gpu = cfg["gpu"]
if gpu_only and not gpu:
continue
t = asyncio.create_task(ping_task(url, name))
tasks.append(t)
await asyncio.gather(*tasks, return_exceptions=True)