From d856c102608a31457e6d3d724c68d67bee391408 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Mon, 8 Jul 2024 01:53:31 +0100 Subject: [PATCH] Add onion feed to cogs --- src/cogs/onion_feed.py | 100 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 src/cogs/onion_feed.py diff --git a/src/cogs/onion_feed.py b/src/cogs/onion_feed.py new file mode 100644 index 0000000..18c6b08 --- /dev/null +++ b/src/cogs/onion_feed.py @@ -0,0 +1,100 @@ +""" +Scrapes the onion RSS feed once every hour and posts any new articles to the desired channel +""" +import asyncio +import dataclasses +import datetime +import logging +from bs4 import BeautifulSoup +import discord +from discord.ext import commands, tasks +import httpx +from conf import CONFIG +import redis + + +@dataclasses.dataclass +class RSSItem: + title: str + link: str + description: str + pubDate: datetime.datetime + guid: str + thumbnail: str + + +class OnionFeed(commands.Cog): + SOURCE = "https://www.theonion.com/rss" + EPOCH = datetime.datetime(2024, 7, 7, tzinfo=datetime.timezone.utc) + + def __init__(self, bot): + self.bot: commands.Bot = bot + self.log = logging.getLogger("jimmy.cogs.onion_feed") + self.check_onion_feed.start() + self.redis = redis.Redis(**CONFIG["redis"]) + + def cog_unload(self) -> None: + self.check_onion_feed.cancel() + + @staticmethod + def parse_item(item: BeautifulSoup) -> RSSItem: + kwargs = { + "title": item.title.get_text(strip=True).strip(), + "link": item.link.get_text(strip=True).strip(), + "pubDate": datetime.datetime.strptime( + item.pubDate.get_text(strip=True).strip(), "%a, %d %b %Y %H:%M:%S %Z" + ), + "guid": item.guid.get_text(strip=True).strip(), + "description": BeautifulSoup(item.description.get_text()).p.get_text(strip=True).strip()[:-1], + "thumbnail": item.find("media:thumbnail")["url"], + } + return RSSItem(**kwargs) + + def parse_feed(self, text: str) -> list[RSSItem]: + soup = BeautifulSoup(text, "xml") + return [self.parse_item(item) for item in soup.find_all("item")] + + @tasks.loop(hours=1) + async def check_onion_feed(self): + if not self.bot.is_ready(): + await self.bot.wait_until_ready() + + guild = self.bot.get_guild(994710566612500550) + if not guild: + return self.log.error("Nonsense guild not found. Can't do onion feed.") + channel = discord.utils.get(guild.text_channels, name="spam") + if not channel: + return self.log.error("Spam channel not found.") + + async with httpx.AsyncClient() as client: + response = await client.get(self.SOURCE) + if response.status_code != 200: + return self.log.error(f"Failed to fetch onion feed: {response.status_code}") + items: list[RSSItem] = await asyncio.to_thread(self.parse_feed, response.text) + for item in items: + if self.redis.get("onion-" + item.guid): + continue + embed = discord.Embed( + title=item.title, + url=item.link, + description=item.description + f"... [Read More]({item.link})", + color=0x00DF78, + timestamp=item.pubDate, + ) + embed.set_thumbnail(url=item.thumbnail) + try: + msg = await channel.send(embed=embed) + # noinspection PyAsyncCall + self.redis.set("onion-" + item.guid, str(msg.id)) + except discord.HTTPException: + self.log.exception(f"Failed to send onion feed message: {item.title}") + else: + self.log.debug(f"Sent onion feed message: {item.title}") + + @check_onion_feed.before_loop + async def before_check_onion_feed(self): + await self.bot.wait_until_ready() + + +def setup(bot): + bot.add_cog(OnionFeed(bot))