nonsensebot/app/modules/fediverse_preview.py

93 lines
3.6 KiB
Python
Raw Normal View History

2024-08-03 01:49:45 +01:00
"""
This module takes fediverse links (e.g. links to posts on fedi.transgender.ing) and provides a preview in a reply.
2024-09-12 20:18:57 +01:00
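By default, this will only work with fedi.transgender.ing, but you can add more URL prefixes by setting the `urls`
list in the `[fediverse_preview]` section of your config. An `ignore` list in the same section can hold user IDs
and room IDs that should never get previews.
This module uses the Mastodon API (`/api/v1/statuses/<id>`); however, it is designed to be used with Misskey.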
2024-08-03 01:49:45 +01:00
"""
2024-09-15 23:03:24 +01:00
2024-08-11 15:25:04 +01:00
import logging
2024-08-03 01:49:45 +01:00
import niobot
import httpx
import typing
import textwrap
from urllib.parse import urlparse
2024-09-15 23:03:24 +01:00
2024-08-03 01:49:45 +01:00
if typing.TYPE_CHECKING:
2024-09-15 23:03:24 +01:00
from ..main import NonsenseBot
2024-08-03 01:49:45 +01:00
class FediversePreviewModule(niobot.Module):
    """Reply to messages containing supported fediverse links with a preview.

    Configuration is read from the ``fediverse_preview`` section of the bot
    config on every message:

    * ``urls``: list of URL prefixes that are treated as fediverse post links
      (defaults to ``["https://fedi.transgender.ing"]``).
    * ``ignore``: list of sender IDs and/or room IDs that never get previews.
    """

    bot: "NonsenseBot"
    log = logging.getLogger(__name__)

    # Upper bound on how many distinct posts are previewed per message.
    _MAX_PREVIEWS = 5

    def _collect_status_urls(self, body: str, supported_prefixes) -> list[str]:
        """Return the status API URLs for the supported links found in *body*.

        The result keeps the order in which links appear in the message,
        contains no duplicates, and holds at most ``_MAX_PREVIEWS`` entries.
        Tokens that are not supported links are skipped rather than aborting
        the scan, so links embedded in ordinary sentences are still found.
        """
        prefixes = tuple(supported_prefixes)
        status_urls: list[str] = []
        for item in body.split():
            if not item.startswith(prefixes):
                continue
            parsed = urlparse(item)
            # The post ID is the last path segment; tolerate a trailing slash.
            post_id = parsed.path.rstrip("/").split("/")[-1]
            if not parsed.netloc or not post_id:
                # Bare instance URL (or a prefix without a host): nothing to fetch.
                continue
            status_url = "https://%s/api/v1/statuses/%s" % (parsed.netloc, post_id)
            if status_url in status_urls:
                self.log.info("Already sent post %s", post_id)
                continue
            if len(status_urls) >= self._MAX_PREVIEWS:
                self.log.info("Already sent 5 posts, stopping")
                break
            status_urls.append(status_url)
        return status_urls

    @niobot.event("message")
    async def on_message(self, room: niobot.MatrixRoom, event: niobot.RoomMessage):
        """Send an HTML preview reply for each supported post link in *event*.

        Non-text events, the bot's own messages, and senders/rooms listed in
        the ``ignore`` config are skipped. A failure to fetch or parse one
        post is logged and does not prevent the remaining posts from being
        previewed.
        """
        config = self.bot.cfg.get("fediverse_preview", {})
        supported_prefixes = config.get("urls", ["https://fedi.transgender.ing"])
        ignore = config.get("ignore", [])

        if not isinstance(event, niobot.RoomMessageText):
            return
        if (
            event.sender in ignore
            or room.room_id in ignore
            or event.sender == self.bot.user_id
        ):
            return

        to_get = self._collect_status_urls(event.body, supported_prefixes)
        if not to_get:
            return

        async with httpx.AsyncClient(
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0"
            }
        ) as client:
            for url in to_get:
                try:
                    resp = await client.get(url)
                except httpx.HTTPError as exc:
                    # Transport-level failure (DNS, timeout, TLS, ...): skip this post only.
                    self.log.error("Failed to fetch %s: %s", url, exc)
                    continue
                if resp.status_code != 200:
                    self.log.error("Got HTTP %d from %s", resp.status_code, resp.url)
                    continue

                try:
                    data = resp.json()
                except ValueError as exc:
                    # A 200 response that is not JSON (e.g. an HTML error page).
                    self.log.warning("Invalid JSON from %s: %s", url, exc)
                    continue
                self.log.info("Got data: %r", data)

                try:
                    username = data["account"]["username"]
                except (KeyError, TypeError) as exc:
                    self.log.warning("Unexpected response shape from %s: %r", url, exc)
                    continue
                if not (text := data.get("content")):
                    self.log.warning("No text for post %s", url)
                    continue

                self.log.info("Detected fediverse post %s: %r", url, text)
                rendered = await self.bot._markdown_to_html(text)
                parsed = urlparse(url)
                text_body = "<blockquote>%s</blockquote>" % rendered
                body = '<a href="%s">@%s:</a><br>%s' % (
                    "https://%s/@%s" % (parsed.netloc, username),
                    username,
                    text_body,
                )
                if data.get("media_attachments"):
                    body += "<p>{:,} attachments</p>".format(
                        len(data["media_attachments"])
                    )
                self.log.info("Sending fediverse post %s", url)
                await self.bot.send_message(
                    room,
                    body,
                    reply_to=event,
                    content_type="html.raw",
                    # Plain-text fallback for clients that do not render HTML.
                    override={"body": f"@{username}: {text!r}"},
                )