import json import re import typing import httpx from pathlib import Path import niobot if typing.TYPE_CHECKING: from ..main import NonsenseBot class MSCGetter(niobot.Module): bot: "NonsenseBot" def __init__(self, bot): super().__init__(bot) self.latest_msc = None self.msc_cache = Path.cwd() / ".msc-cache" self.msc_cache.mkdir(parents=True, exist_ok=True) async def get_msc_with_cache(self, number: int): if number not in range(1, 10_000): return {"error": "Invalid MSC ID"} file = self.msc_cache / ("%d.json" % number) content = None if file.exists(): try: content = json.loads(file.read_text("utf-8", "replace")) except json.JSONDecodeError: file.unlink() if content: return content async with httpx.AsyncClient() as client: response = await client.get( "https://api.github.com/repos/matrix-org/matrix-spec-proposals/pulls/%d" % number ) if response.status_code != 200: return {"error": "HTTP %d" % response.status_code} content = response.json() file.write_text(json.dumps(content)) return content @niobot.event("message") async def on_message(self, room: niobot.MatrixRoom, message: niobot.RoomMessage): if self.bot.is_old(message): return if message.sender == self.bot.user: return if not isinstance(message, niobot.RoomMessageText): return if "m.in_reply_to" in message.source.get("m.relates_to", []): return if await self.bot.redis.get( self.bot.redis_key(room.room_id, "auto_msc.enabled") ): matches = re.finditer("[MmSsCc]\W?([0-9]{1,4})", message.body) lines = [] for m in matches: no = m.group(1) if no: data = await self.get_msc_with_cache(int(no)) if data.get("error"): continue lines.append(f"[{data['title']}]({data['html_url']})") if lines: return await self.bot.send_message( room, "\n".join((f"* {ln}" for ln in lines)), reply_to=message ) @niobot.command() async def msc(self, ctx: niobot.Context, number: str): """Fetches the given MSC""" if number.startswith("msc"): number = number[3:] elif number.startswith("#"): number = number[1:] if not number.isdigit() or len(number) != 4: return await ctx.respond("Invalid MXC number.") data: dict = await self.get_msc_with_cache(int(number)) if data.get("error"): return await ctx.respond(data["error"]) return await ctx.respond(f"[{data['title']}]({data['html_url']})") @niobot.command("automsc.enable") async def auto_msc_enable(self, ctx: niobot.Context): """Automatically enables MSC linking. Requires a power level of at least 50.""" if (sp := ctx.room.power_levels.users.get(ctx.message.sender, -999)) < 50: return await ctx.respond("You need to have at least a power level of 50 to use this.") key = self.bot.redis_key(ctx.room.room_id, "auto_msc.enabled") exists = await self.bot.redis.get(key) if exists: return await ctx.respond("AutoMSC is already enabled in this room.") await self.bot.redis.set(key, 1) await self.bot.redis.save() return await self.bot.add_reaction( ctx.room, ctx.message, "\N{WHITE HEAVY CHECK MARK}" ) @niobot.command("automsc.disable") async def auto_msc_disable(self, ctx: niobot.Context): """Disables automatic MSC linking. Requires a power level of at least 50.""" if (sp := ctx.room.power_levels.users.get(ctx.message.sender, -999)) < 50: return await ctx.respond("You need to have at least a power level of 50 to use this.") key = self.bot.redis_key(ctx.room.room_id, "auto_msc.enabled") exists = await self.bot.redis.get(key) if exists: await self.bot.redis.delete(key) await self.bot.redis.save() await self.bot.add_reaction( ctx.room, ctx.message, "\N{WHITE HEAVY CHECK MARK}" ) else: return await ctx.respond("AutoMSC is already disabled in this room.")