""" This module is an old mockup of an AI chatbot system using ollama and a set of servers. It is not actively used or maintained anymore. """ import asyncio import json import logging from pathlib import Path from contextlib import asynccontextmanager import httpx import typing from ollama import AsyncClient import niobot if typing.TYPE_CHECKING: from ..main import TortoiseIntegratedBot @asynccontextmanager async def ollama_client(url: str) -> AsyncClient: client = AsyncClient(url) async with client._client: yield client class AIModule(niobot.Module): bot: "TortoiseIntegratedBot" log = logging.getLogger(__name__) async def find_server(self, gpu_only: bool = True) -> dict[str, str | bool] | None: for name, cfg in self.bot.cfg["ollama"].items(): url = cfg["url"] gpu = cfg["gpu"] if gpu_only and not gpu: self.log.info("Skipping %r, need GPU.", name) continue async with ollama_client(url) as client: try: self.log.info("Trying %r", name) await client.ps() except (httpx.HTTPError, ConnectionError): self.log.warning("%r is offline, trying next.", name) continue else: self.log.info("%r is online.") return {"name": name, **cfg} self.log.warning("No suitable ollama server is online.") @staticmethod def read_users(): p = Path("./store/users.json") if not p.exists(): return {} return json.loads(p.read_text()) @staticmethod def write_users(users: dict[str, str]): p = Path("./store/users.json") with open(p, "w") as _fd: json.dump(users, _fd) @niobot.command("whitelist.add") @niobot.is_owner() async def whitelist_add(self, ctx: niobot.Context, user_id: str, model: str = "llama3:latest"): """[Owner] Adds a user to the whitelist.""" users = self.read_users() users[user_id] = model self.write_users(users) await ctx.respond(f"Added {user_id} to the whitelist.") @niobot.command("whitelist.list") @niobot.is_owner() async def whitelist_list(self, ctx: niobot.Context): """[Owner] Lists all users in the whitelist.""" users = self.read_users() if not users: await ctx.respond("No users in the whitelist.") return await ctx.respond("\n".join(f"{k}: {v}" for k, v in users.items())) @niobot.command("whitelist.remove") @niobot.is_owner() async def whitelist_remove(self, ctx: niobot.Context, user_id: str): """[Owner] Removes a user from the whitelist.""" users = self.read_users() if user_id not in users: await ctx.respond(f"{user_id} not in the whitelist.") return del users[user_id] self.write_users(users) await ctx.respond(f"Removed {user_id} from the whitelist.") @niobot.command("ollama.set-model") @niobot.from_homeserver("nexy7574.co.uk", "nicroxio.co.uk", "shronk.net") async def set_model(self, ctx: niobot.Context, model: str): """Sets the model you want to use.""" users = self.read_users() if ctx.message.sender not in users: await ctx.respond("You must be whitelisted first.") return users[ctx.message.sender] = model self.write_users(users) await ctx.respond(f"Set model to {model}. Don't forget to pull it with `h!ollama.pull`.") @niobot.command("ollama.pull") @niobot.from_homeserver("nexy7574.co.uk", "nicroxio.co.uk", "shronk.net") async def pull_model(self, ctx: niobot.Context): """Pulls the model you set.""" users = self.read_users() if ctx.message.sender not in users: await ctx.respond("You need to set a model first. See: `h!help ollama.set-model`") return model = users[ctx.message.sender] server = await self.find_server(gpu_only=False) if not server: await ctx.respond("No servers available.") return msg = await ctx.respond(f"Pulling {model} on {server['name']!r}...") async with ollama_client(server["url"]) as client: await client.pull(model) await msg.edit(f"Pulled model {model}.") @niobot.command("ollama.chat", greedy=True) @niobot.from_homeserver("nexy7574.co.uk", "nicroxio.co.uk", "shronk.net") async def chat(self, ctx: niobot.Context): """Chat with the model.""" if "--gpu" in ctx.args: ctx.args.remove("--gpu") gpu_only = True else: gpu_only = False try: message = " ".join(ctx.args) users = self.read_users() if ctx.message.sender not in users: await ctx.respond("You need to set a model first. See: `h!help ollama.set-model`") return model = users[ctx.message.sender] res = await ctx.respond("Finding server...") server = await self.find_server(gpu_only=gpu_only) if not server: await res.edit(content="No servers available.") return async with ollama_client(server["url"]) as client: await res.edit(content=f"Generating response...") try: response = await client.chat(model, [{"role": "user", "content": message}]) except httpx.HTTPError as e: response = {"message": {"content": f"Error: {e}"}} await res.edit(content=response["message"]["content"]) except Exception as e: logging.exception(e) await ctx.respond(content="An error occurred.") @niobot.command("ollama.status", aliases=["ollama.ping"]) @niobot.from_homeserver("nexy7574.co.uk", "nicroxio.co.uk", "shronk.net") async def status(self, ctx: niobot.Context, gpu_only: bool = False): """Checks which servers are online.""" lines: dict[str, dict[str, str | None | bool]] = {} for name, cfg in self.bot.cfg["ollama"].items(): lines[name] = { "url": cfg["url"], "gpu": cfg["gpu"], "online": None } emojis = { True: "\N{white heavy check mark}", False: "\N{cross mark}", None: "\N{hourglass with flowing sand}" } def get_lines(): ln = [] for _n, _d in lines.items(): if gpu_only and _d["gpu"] is False: continue ln.append(f"* **{_n}**: {emojis[_d['online']]}") return "\n".join(ln) response = await ctx.respond(get_lines()) async def ping_task(target_url, target_name): async with ollama_client(target_url) as client: try: self.log.info("[status] Checking %s (%r)", target_name, target_url) await client.ps() except (httpx.HTTPError, ConnectionError): lines[target_name]["online"] = False else: lines[target_name]["online"] = True await response.edit(content=get_lines()) tasks = [] for name, cfg in self.bot.cfg["ollama"].items(): url = cfg["url"] gpu = cfg["gpu"] if gpu_only and not gpu: continue t = asyncio.create_task(ping_task(url, name)) tasks.append(t) await asyncio.gather(*tasks, return_exceptions=True)