nonsensebot/app/modules/msc_getter.py

218 lines
8.2 KiB
Python
Raw Permalink Normal View History

2024-09-19 17:40:50 +01:00
from __future__ import annotations
2024-09-15 23:03:24 +01:00
import json
import logging
2024-09-15 23:03:24 +01:00
import re
import typing
2024-09-19 17:41:54 +01:00
from typing import Annotated, TYPE_CHECKING
2024-09-15 23:03:24 +01:00
import httpx
from pathlib import Path
2024-09-15 20:25:42 +01:00
import niobot
2024-09-15 23:03:24 +01:00
if typing.TYPE_CHECKING:
from ..main import NonsenseBot
2024-09-15 20:25:42 +01:00
class MSCGetter(niobot.Module):
    """Looks up Matrix Spec Change (MSC) proposals on GitHub.

    Proposals are fetched from the ``matrix-org/matrix-spec-proposals``
    repository through the GitHub REST API and cached on disk as one JSON
    file per proposal number inside ``.msc-cache`` (relative to the current
    working directory).
    """

    if TYPE_CHECKING:
        bot: "NonsenseBot"
    log = logging.getLogger(__name__)

    # Repository holding the proposals, and the API endpoints used to query it.
    _REPO = "matrix-org/matrix-spec-proposals"
    _ISSUE_URL = "https://api.github.com/repos/matrix-org/matrix-spec-proposals/issues/%d"
    _SEARCH_URL = "https://api.github.com/search/issues"

    # Matches "msc1234", "MSC 1234", "msc#1234", ...; group 1 is the number.
    _MSC_PATTERN = re.compile(r"msc\W?([0-9]{1,4})", re.IGNORECASE)

    # Power level required to toggle automatic MSC linking in a room.
    _REQUIRED_POWER_LEVEL = 50

    def __init__(self, bot):
        super().__init__(bot)
        self.latest_msc = None
        self.msc_cache = Path.cwd() / ".msc-cache"
        self.msc_cache.mkdir(parents=True, exist_ok=True)

    async def get_msc_with_cache(self, number: int) -> dict:
        """Return the GitHub issue object for MSC ``number``.

        The on-disk cache is consulted first; on a miss the issue is
        requested from GitHub and written to the cache.

        :param number: The MSC number; must be within 1..9999.
        :returns: The issue object as a dict, or ``{"error": "<message>"}``
            when the number is out of range or the request failed.
        """
        if number not in range(1, 10_000):
            return {"error": "Invalid MSC ID"}

        file = self.msc_cache / ("%d.json" % number)
        if file.exists():
            try:
                content = json.loads(file.read_text("utf-8", "replace"))
            except json.JSONDecodeError:
                # A corrupt cache entry is dropped and re-fetched below.
                file.unlink(missing_ok=True)
            else:
                if content:
                    return content

        self.log.debug("Requesting MSC: %d", number)
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(self._ISSUE_URL % number)
        except httpx.HTTPError as exc:
            # Network-level failures must not propagate: this method is
            # called from the message event handler.
            self.log.warning("Failed to request MSC %d: %s", number, exc)
            return {
                "error": "Failed to fetch issue from GitHub ({}), "
                "and no cached version was available.".format(type(exc).__name__)
            }

        if response.status_code != 200:
            return {
                "error": "Failed to fetch issue from GitHub (HTTP {!s}), "
                "and no cached version was available.".format(response.status_code)
            }

        content = response.json()
        file.write_text(json.dumps(content), "utf-8")
        return content

    async def search_for_msc(self, query: str) -> list[dict]:
        """Search for MSCs whose title matches ``query``.

        Cached proposals are matched first with a case-insensitive substring
        test on their title. GitHub is queried as well when fewer than ten
        cached proposals matched, or when the query contains ``+fetch``.
        New results from GitHub are added to the cache.

        :param query: The search text, optionally containing ``+fetch``.
        :returns: A list of issue objects without duplicates, cached matches
            first. If GitHub cannot be queried, the cached matches are
            returned when there are any; otherwise the list holds a single
            placeholder entry whose ``title`` describes the error.
        """
        force_fetch = "+fetch" in query
        query = query.replace("+fetch", "").strip()
        needle = query.casefold()

        # Keyed by MSC number so a proposal found both in the cache and on
        # GitHub is only reported once. Dicts preserve insertion order.
        found: dict = {}
        for cached_msc in self.msc_cache.glob("*.json"):
            try:
                data = json.loads(cached_msc.read_text("utf-8", "replace"))
            except (OSError, json.JSONDecodeError):
                continue  # unreadable cache entries are simply not searchable
            if not isinstance(data, dict):
                continue
            title = data.get("title")
            if isinstance(title, str) and needle in title.casefold():
                found.setdefault(data.get("number", cached_msc.stem), data)

        if len(found) >= 10 and not force_fetch:
            return list(found.values())

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    self._SEARCH_URL,
                    params={
                        # Qualifiers are separated by spaces; the HTTP client
                        # takes care of URL-encoding the value.
                        "q": f"{query} is:pr repo:{self._REPO}",
                        "per_page": 10,
                    },
                )
        except httpx.HTTPError as exc:
            self.log.warning("MSC search request failed: %s", exc)
            if found:
                return list(found.values())
            return [
                {
                    "title": "Error querying GitHub (%s)." % type(exc).__name__,
                    "html_url": "https://www.githubstatus.com/",
                }
            ]

        if response.status_code != 200:
            if found:
                # Prefer stale-but-useful cached matches over an error.
                return list(found.values())
            return [
                {
                    "title": "Error querying GitHub (HTTP %s)." % str(response.status_code),
                    "html_url": "https://http.cat/" + str(response.status_code),
                }
            ]

        for pr in response.json().get("items", []):
            number = pr["number"]
            found.setdefault(number, pr)
            file = self.msc_cache / ("%d.json" % number)
            if not file.exists():
                file.write_text(json.dumps(pr), "utf-8")
        return list(found.values())

    @niobot.event("message")
    async def on_message(self, room: niobot.MatrixRoom, message: niobot.RoomMessage):
        # Replies to any MSC mentioned in a text message with links to those
        # MSCs, provided the feature has been enabled for the room.
        if self.bot.is_old(message):
            return
        if message.sender == self.bot.user:
            return
        if not isinstance(message, niobot.RoomMessageText):
            return

        # Ignore replies. The relation lives inside the event's content.
        relates_to = message.source.get("content", {}).get("m.relates_to")
        if isinstance(relates_to, dict) and "m.in_reply_to" in relates_to:
            return

        enabled = await self.bot.redis.get(
            self.bot.redis_key(room.room_id, "auto_msc.enabled")
        )
        if not enabled:
            return

        lines = []
        seen = set()
        for match in self._MSC_PATTERN.finditer(message.body):
            number = int(match.group(1))
            if number in seen:
                continue  # link each mentioned MSC only once
            seen.add(number)
            data = await self.get_msc_with_cache(number)
            if data.get("error"):
                continue
            lines.append(self.pr_to_display(data))

        if lines:
            return await self.bot.send_message(
                room, "\n".join(lines), reply_to=message
            )

    @staticmethod
    def pr_to_display(data: dict) -> str:
        """Render an issue object as a Markdown bullet linking to it."""
        return f"* [{data['title']}]({data['html_url']})"

    @staticmethod
    def _pr_to_html_item(data: dict) -> str:
        """Render an issue object as an HTML list item linking to it."""
        import html

        # Titles and URLs come from GitHub and must not be trusted as markup.
        return '<li><a href="{}" target="_blank">{}</a></li>'.format(
            html.escape(data["html_url"], quote=True),
            html.escape(data["title"]),
        )

    @niobot.command()
    async def msc(
        self,
        ctx: niobot.Context,
        number_or_query
    ):
        """Fetches the given MSC"""
        # "?<text>" searches MSC titles; anything else must be an MSC number,
        # optionally prefixed with "msc" (any case) or "#".
        if number_or_query.startswith("?"):  # search
            query = number_or_query[1:]
            msg = await ctx.respond("Searching for relevant MSCs with query %r..." % query)
            results = await self.search_for_msc(query)
            if not results:
                return await msg.edit("No MSCs matched your query.")

            lines_formatted = "\n".join(self.pr_to_display(pr) for pr in results)
            if len(results) <= 3:
                return await msg.edit(content=lines_formatted)

            # Show the first three results and fold the rest away.
            new_lines = [
                "<ul>",
                *(self._pr_to_html_item(pr) for pr in results[:3]),
                "</ul>",
                "<details>",
                "<summary>And %d more...</summary>" % (len(results) - 3),
                "<ul>",
                *(self._pr_to_html_item(pr) for pr in results[3:]),
                "</ul>",
                "</details>",
            ]
            return await msg.edit(
                content="\n".join(new_lines),
                content_type="html.raw",
                override={
                    "body": lines_formatted
                }
            )

        if number_or_query[:3].lower() == "msc":
            number_or_query = number_or_query[3:]
        elif number_or_query.startswith("#"):
            number_or_query = number_or_query[1:]

        # Require 1-4 ASCII digits so that int() below cannot fail.
        is_number = number_or_query.isascii() and number_or_query.isdigit()
        if not is_number or len(number_or_query) > 4:
            return await ctx.respond("Invalid MSC number.")

        msg = await ctx.respond("Fetching MSC #{:0>4}...".format(number_or_query))
        data: dict = await self.get_msc_with_cache(int(number_or_query))
        if data.get("error"):
            return await msg.edit(data["error"])
        return await msg.edit(self.pr_to_display(data))

    async def _reject_if_unprivileged(self, ctx: niobot.Context):
        """Tell the sender off if their power level is too low.

        :returns: The sent response when the sender was rejected, otherwise
            ``None``.
        """
        # get_user_level falls back to the room's default user level when the
        # sender has no explicit entry.
        sp = ctx.room.power_levels.get_user_level(ctx.message.sender)
        if sp < self._REQUIRED_POWER_LEVEL:
            return await ctx.respond(
                f"You need to have at least a power level of 50 to use this (you have {sp})."
            )
        return None

    @niobot.command("automsc.enable")
    async def auto_msc_enable(self, ctx: niobot.Context):
        """Automatically enables MSC linking. Requires a power level of at least 50."""
        rejection = await self._reject_if_unprivileged(ctx)
        if rejection is not None:
            return rejection

        key = self.bot.redis_key(ctx.room.room_id, "auto_msc.enabled")
        if await self.bot.redis.get(key):
            return await ctx.respond("AutoMSC is already enabled in this room.")

        await self.bot.redis.set(key, 1)
        await self.bot.redis.save()
        return await self.bot.add_reaction(
            ctx.room, ctx.message, "\N{WHITE HEAVY CHECK MARK}"
        )

    @niobot.command("automsc.disable")
    async def auto_msc_disable(self, ctx: niobot.Context):
        """Disables automatic MSC linking. Requires a power level of at least 50."""
        rejection = await self._reject_if_unprivileged(ctx)
        if rejection is not None:
            return rejection

        key = self.bot.redis_key(ctx.room.room_id, "auto_msc.enabled")
        if not await self.bot.redis.get(key):
            return await ctx.respond("AutoMSC is already disabled in this room.")

        await self.bot.redis.delete(key)
        await self.bot.redis.save()
        await self.bot.add_reaction(
            ctx.room, ctx.message, "\N{WHITE HEAVY CHECK MARK}"
        )