108 lines
3 KiB
Python
108 lines
3 KiB
Python
# import sys
|
|
import asyncio
|
|
import hashlib
|
|
import sys
|
|
|
|
import niobot
|
|
import tomllib
|
|
import platform
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
import redis.asyncio as redis
|
|
|
|
|
|
# Module-level logger used by the bot class and the event handlers in this file.
log = logging.getLogger(__name__)

# Raise these library loggers to WARNING so their lower-severity records are dropped.
for _logger_name in ("nio.rooms", "nio.crypto.log", "peewee", "nio.responses"):
    logging.getLogger(_logger_name).setLevel(logging.WARNING)
|
|
|
|
|
|
# Parse config.toml (resolved against the current working directory) into a dict.
# tomllib requires the file to be opened in binary mode.
with Path("config.toml").open("rb") as fd:
    config = tomllib.load(fd)

# Ensure ./store exists under the current working directory; its resolved path
# is handed to the bot as ``store_path`` further down.
store = Path.cwd() / "store"
store.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
class NonsenseBot(niobot.NioBot):
    """NioBot subclass with an optional Redis connection and module auto-loading.

    Attributes:
        cfg: Parsed configuration mapping. Not set by ``__init__``; it is
            assigned by the code that constructs the bot.
        redis: Asyncio Redis client. ``None`` until ``start()`` connects, and
            stays ``None`` when no database URI is configured.
    """

    cfg: dict

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``niobot.NioBot`` and initialise ``redis`` to None."""
        super().__init__(*args, **kwargs)
        self.redis: redis.Redis | None = None

    @staticmethod
    def redis_key(*parts: str) -> str:
        """Return a redis key for the given info.

        Each part is encoded and fed into a single MD5 digest, in order; the
        hex digest is returned.

        NOTE(review): parts are hashed back-to-back with no delimiter, so
        ``("ab", "c")`` and ``("a", "bc")`` produce the same key. Left as-is
        because changing it would change every previously generated key.
        """
        h = hashlib.md5()
        for part in parts:
            h.update(part.encode())
        return h.hexdigest()

    async def close(self):
        """Close the Redis connection if one was opened, then the parent client.

        Redis is optional, so ``self.redis`` may still be ``None`` here. The
        parent ``close()`` always runs, even if closing Redis raises.
        """
        try:
            if self.redis is not None:
                await self.redis.aclose()
        finally:
            await super().close()

    async def start(self, **kwargs):
        """Connect to Redis (when configured), mount all modules, then start the bot.

        Args:
            **kwargs: Forwarded unchanged to the parent ``start()``.

        Raises:
            ConnectionError: If a database URI is configured but the server
                does not answer PING with a truthy reply.
        """
        # The [database] section is optional: no section or no "uri" means no Redis.
        url = config.get("database", {}).get("uri")
        if url:
            self.redis = redis.Redis.from_url(url)
            # Explicit check rather than ``assert``: asserts (and the ping
            # inside them) are stripped when Python runs with -O.
            if not await self.redis.ping():
                raise ConnectionError("Redis server did not respond to PING.")

        sys.path.extend(("..", "."))

        for file in (Path(__file__).parent / "./modules").glob("*.py"):
            if file.name.startswith("__"):
                log.warning("Skipping loading %s - dunder file.", file)
                continue
            name = "app.modules." + file.stem
            try:
                log.info("Loading %s...", name)
                # Mount on this instance rather than the module-level ``bot`` global.
                self.mount_module(name)
            except Exception:
                # One broken module must not prevent the others from loading.
                log.error("Failed to load %s!", name, exc_info=True)
            else:
                log.info("Loaded %s.", name)

        await super().start(**kwargs)
|
|
|
|
|
|
# All constructor settings come from the [bot] table of the loaded config.
_bot_settings = config["bot"]

bot = NonsenseBot(
    homeserver=_bot_settings["homeserver"],
    user_id=_bot_settings["user_id"],
    # Use this machine's network name when no device ID is configured.
    device_id=_bot_settings.get("device_id", platform.node()),
    store_path=str(store.resolve()),
    command_prefix=_bot_settings.get("prefix", "h!"),
    # A missing or empty owner_id falls back to the hard-coded default.
    owner_id=_bot_settings.get("owner_id") or "@nex:nexy7574.co.uk",
    auto_read_messages=False,
    config=niobot.AsyncClientConfig(),
)
# Expose the whole parsed configuration on the bot instance.
bot.cfg = config
|
|
|
|
|
|
@bot.on_event("ready")
async def on_ready(_):
    """Handle the "ready" event by logging that the bot has logged in.

    The single positional argument supplied with the event is ignored.
    """
    log.info("Bot has logged in.")
|
|
|
|
|
|
@bot.on_event("command")
async def on_command(ctx: niobot.Context):
    """Handle the "command" event by logging the command and who sent it."""
    invoked = ctx.command
    sender = ctx.message.sender
    log.info("Command %s invoked by %s.", invoked, sender)
|
|
|
|
|
|
@bot.on_event("command_error")
async def on_command_error(ctx: niobot.Context, exc: Exception):
    """Handle the "command_error" event: log the failure and reply with the error.

    Args:
        ctx: Context of the command that failed.
        exc: The raised error. If it carries a truthy ``original`` attribute,
            that wrapped exception is logged and reported instead.
    """
    exc = getattr(exc, "original", exc) or exc
    log.error("Command %s failed.", ctx.command, exc_info=exc)
    # One-element tuple so that an exception which is itself a tuple cannot
    # be unpacked as multiple format arguments.
    text = "\N{WARNING SIGN} Command failed: `%s`" % (exc,)
    # Scrub the access token from the reply. Skip this when the token is
    # unset: str.replace() raises TypeError on None, and an empty needle
    # would insert "REDACTED" between every character of the message.
    token = bot.access_token
    if token:
        text = text.replace(token, "REDACTED")
    await ctx.respond(text)
|
|
|
|
|
|
@bot.on_event("command_complete")
async def on_command_complete(ctx: niobot.Context, _):
    """Handle the "command_complete" event by logging the finished command.

    The second positional argument supplied with the event is ignored.
    """
    finished = ctx.command
    log.info("Command %s completed.", finished)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the start coroutine with the access token from
    # the [bot] table of the config, then run it to completion.
    _start_coro = bot.start(access_token=bot.cfg["bot"]["access_token"])
    asyncio.run(_start_coro)
|