nonsensebot/app/modules/ai.py
2024-07-29 00:36:32 +01:00

199 lines
7.2 KiB
Python

import json
import logging
import typing
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from pathlib import Path

import httpx
import niobot
from ollama import AsyncClient, ResponseError

if typing.TYPE_CHECKING:
    from ..main import TortoiseIntegratedBot
@asynccontextmanager
async def ollama_client(url: str) -> AsyncIterator[AsyncClient]:
    """Yield an ollama ``AsyncClient`` bound to *url*, cleaning up on exit.

    Intended for use as ``async with ollama_client(url) as client: ...``.

    Args:
        url: Base URL of the ollama server to talk to.

    Yields:
        The freshly constructed ``AsyncClient``.
    """
    client = AsyncClient(url)
    # The private ``_client`` attribute is entered as an async context manager
    # so that it is closed when the caller's ``async with`` block exits.
    # NOTE(review): presumably this is the underlying httpx client; it is a
    # private attribute, so confirm it still exists when upgrading ollama.
    async with client._client:
        yield client
class AIModule(niobot.Module):
    """Bot commands for chatting with models hosted on ollama servers.

    Servers are read from ``self.bot.cfg["ollama"]``, a mapping of server name
    to a config mapping that provides at least the ``url`` and ``gpu`` keys.
    The per-user whitelist (user ID -> model name) is persisted as JSON in
    ``./store/users.json``.
    """

    # NOTE: the one-line docstrings on the command callbacks below are kept
    # verbatim because the command framework may surface them as help text;
    # implementation notes therefore live in ``#`` comments instead.

    bot: "TortoiseIntegratedBot"
    log = logging.getLogger(__name__)

    # Location of the whitelist file, relative to the working directory.
    USERS_FILE = Path("./store/users.json")

    async def find_server(self, gpu_only: bool = True) -> dict[str, str | bool] | None:
        """Return the first configured server that answers a ``ps`` request.

        Servers are tried in configuration order.

        Args:
            gpu_only: When true, servers whose ``gpu`` setting is falsy are
                skipped without being contacted.

        Returns:
            The server's config mapping with an extra ``"name"`` key, or
            ``None`` when no suitable server responded.
        """
        for name, cfg in self.bot.cfg["ollama"].items():
            url = cfg["url"]
            gpu = cfg["gpu"]
            if gpu_only and not gpu:
                self.log.info("Skipping %r, need GPU.", name)
                continue

            self.log.info("Trying %r", name)
            async with ollama_client(url) as client:
                try:
                    await client.ps()
                except (httpx.HTTPError, ConnectionError):
                    self.log.warning("%r is offline, trying next.", name)
                    continue

            self.log.info("%r is online.", name)
            return {"name": name, **cfg}

        self.log.warning("No suitable ollama server is online.")
        return None

    @staticmethod
    def read_users() -> dict[str, str]:
        """Load the whitelist (user ID -> model name) from disk.

        Returns:
            The stored mapping, or an empty dict when the file does not exist.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        try:
            raw = AIModule.USERS_FILE.read_text(encoding="utf-8")
        except FileNotFoundError:
            return {}
        return json.loads(raw)

    @staticmethod
    def write_users(users: dict[str, str]):
        """Persist the whitelist (user ID -> model name) to disk.

        Args:
            users: The complete mapping to store; it replaces the file content.
        """
        # Serialise first so a failure cannot leave a truncated file behind.
        payload = json.dumps(users)
        AIModule.USERS_FILE.parent.mkdir(parents=True, exist_ok=True)
        AIModule.USERS_FILE.write_text(payload, encoding="utf-8")

    @niobot.command("ping")
    async def ping_command(self, ctx: niobot.Context):
        """Checks the bot is running."""
        # Reply immediately, then amend the reply with the server lookup result.
        reply = await ctx.respond("Pong!")
        server = await self.find_server(gpu_only=False)
        if not server:
            await reply.edit("Pong :(\nNo servers available.")
            return
        await reply.edit(f"Pong!\nSelected server: {server['name']}")

    @niobot.command("whitelist.add")
    @niobot.is_owner()
    async def whitelist_add(self, ctx: niobot.Context, user_id: str, model: str = "llama3:latest"):
        """[Owner] Adds a user to the whitelist."""
        # Adding an existing user overwrites their currently selected model.
        users = self.read_users()
        users[user_id] = model
        self.write_users(users)
        await ctx.respond(f"Added {user_id} to the whitelist.")

    @niobot.command("whitelist.list")
    @niobot.is_owner()
    async def whitelist_list(self, ctx: niobot.Context):
        """[Owner] Lists all users in the whitelist."""
        users = self.read_users()
        if not users:
            await ctx.respond("No users in the whitelist.")
            return
        await ctx.respond("\n".join(f"{k}: {v}" for k, v in users.items()))

    @niobot.command("whitelist.remove")
    @niobot.is_owner()
    async def whitelist_remove(self, ctx: niobot.Context, user_id: str):
        """[Owner] Removes a user from the whitelist."""
        users = self.read_users()
        if user_id not in users:
            await ctx.respond(f"{user_id} not in the whitelist.")
            return
        del users[user_id]
        self.write_users(users)
        await ctx.respond(f"Removed {user_id} from the whitelist.")

    @niobot.command("ollama.set-model")
    async def set_model(self, ctx: niobot.Context, model: str):
        """Sets the model you want to use."""
        # Only users who are already whitelisted may change their model.
        users = self.read_users()
        if ctx.message.sender not in users:
            await ctx.respond("You must be whitelisted first.")
            return
        users[ctx.message.sender] = model
        self.write_users(users)
        await ctx.respond(f"Set model to {model}. Don't forget to pull it with `h!ollama.pull`.")

    @niobot.command("ollama.pull")
    async def pull_model(self, ctx: niobot.Context):
        """Pulls the model you set."""
        users = self.read_users()
        if ctx.message.sender not in users:
            await ctx.respond("You need to set a model first. See: `h!help ollama.set-model`")
            return
        model = users[ctx.message.sender]

        server = await self.find_server(gpu_only=False)
        if not server:
            await ctx.respond("No servers available.")
            return

        msg = await ctx.respond(f"Pulling {model} on {server['name']!r}...")
        try:
            async with ollama_client(server["url"]) as client:
                await client.pull(model)
        except (httpx.HTTPError, ConnectionError, ResponseError) as e:
            # Report the failure instead of leaving the "Pulling..." message stuck.
            self.log.warning("Failed to pull %r on %r: %s", model, server["name"], e)
            await msg.edit(f"Failed to pull {model}: {e}")
            return
        await msg.edit(f"Pulled model {model}.")

    @niobot.command("ollama.chat", greedy=True)
    async def chat(self, ctx: niobot.Context):
        """Chat with the model."""
        # "--gpu" is an optional flag, not part of the prompt: strip its first
        # occurrence and restrict the server search to GPU-backed servers.
        gpu_only = "--gpu" in ctx.args
        if gpu_only:
            ctx.args.remove("--gpu")

        try:
            message = " ".join(ctx.args)
            users = self.read_users()
            if ctx.message.sender not in users:
                await ctx.respond("You need to set a model first. See: `h!help ollama.set-model`")
                return
            model = users[ctx.message.sender]

            res = await ctx.respond("Finding server...")
            server = await self.find_server(gpu_only=gpu_only)
            if not server:
                await res.edit(content="No servers available.")
                return

            async with ollama_client(server["url"]) as client:
                await res.edit(content="Generating response...")
                try:
                    response = await client.chat(model, [{"role": "user", "content": message}])
                except (httpx.HTTPError, ResponseError) as e:
                    # Show transport and server-side errors in place of the answer.
                    response = {"message": {"content": f"Error: {e}"}}
            await res.edit(content=response["message"]["content"])
        except Exception:
            # Last-resort boundary so a failing command still answers the user;
            # the traceback goes to the module logger.
            self.log.exception("Unhandled error in ollama.chat")
            await ctx.respond(content="An error occurred.")

    @niobot.command("ollama.status")
    async def status(self, ctx: niobot.Context, gpu_only: bool = False):
        """Checks which servers are online."""
        emojis = {
            True: "\N{white heavy check mark}",
            False: "\N{cross mark}",
            None: "\N{hourglass with flowing sand}",
        }

        # One filter decides both which servers are listed and which are
        # probed, so a listed server can never stay on the "pending" emoji.
        candidates = {
            name: cfg
            for name, cfg in self.bot.cfg["ollama"].items()
            if cfg["gpu"] or not gpu_only
        }
        if not candidates:
            await ctx.respond("No matching servers are configured.")
            return

        # Server name -> True (online) / False (offline) / None (not probed yet).
        online: dict[str, bool | None] = dict.fromkeys(candidates)

        def get_lines() -> str:
            return "\n".join(f"* **{name}**: {emojis[state]}" for name, state in online.items())

        response = await ctx.respond(get_lines())
        for name, cfg in candidates.items():
            async with ollama_client(cfg["url"]) as client:
                try:
                    await client.ps()
                except (httpx.HTTPError, ConnectionError):
                    online[name] = False
                else:
                    online[name] = True
            # Refresh the message after each probe so progress is visible.
            await response.edit(content=get_lines())