""" This module takes misskey links (e.g.) and provides a preview in a reply. """ import niobot import httpx import typing import textwrap from urllib.parse import urlparse if typing.TYPE_CHECKING: from ..main import TortoiseIntegratedBot class MisskeyPreviewModule(niobot.Module): bot: "TortoiseIntegratedBot" @niobot.event("message") async def on_message(self, room: niobot.MatrixRoom, event: niobot.RoomMessage): supported_prefixes = self.bot.cfg.get("misskey_preview", {}) supported_prefixes = supported_prefixes.get("urls", ["https://fedi.transgender.ing/notes"]) if not isinstance(event, niobot.RoomMessageText): return sent = [] async with httpx.AsyncClient() as client: for item in event.body.split(): if not event.body.startswith(tuple(supported_prefixes)): return parsed = urlparse(item) post_id = parsed.path.split("/")[-1] if post_id in sent: continue elif len(sent) >= 5: break resp = await client.post("https://%s/api/notes/show" % parsed.netloc, json={"noteId": post_id}) if resp.status_code != 200: continue data = resp.json() username = data["user"]["username"] if not data["text"]: continue text_body = [] for line in textwrap.shorten(data["text"], width=1000).splitlines(keepends=True): text_body.append("> " + (line or "\u200b")) body = "@%s:\n\n%s" % ( "https://%s/@%s" % (parsed.netloc, username), username, "".join(text_body), ) await self.bot.send_message( room, body, reply_to=event, content_type="html.raw", override={"body": f"@{username}: {data['text']!r}"} ) sent.append(post_id)