diff --git a/src/cogs/starboard.py b/src/cogs/starboard.py new file mode 100644 index 0000000..8a87ed6 --- /dev/null +++ b/src/cogs/starboard.py @@ -0,0 +1,235 @@ +import asyncio +import io +import textwrap +from typing import Tuple +from urllib.parse import urlparse + +import discord +import httpx +import aiosqlite +from discord.ext import commands + + +class StarBoardCog(commands.Cog): + def __init__(self, bot): + self.bot = bot + self.lock = asyncio.Lock() + self.db = "./data/starboard.db" + + async def open_db(self) -> aiosqlite.Connection: + """Opens a connection to the database""" + conn = await aiosqlite.connect(self.db) + conn.row_factory = aiosqlite.Row + await conn.execute( + """ + CREATE TABLE IF NOT EXISTS messages ( + source_id INTEGER PRIMARY KEY, + channel_id INTEGER NOT NULL, + guild_id INTEGER NOT NULL + starboard_message_id INTEGER NOT NULL UNIQUE, + starboard_channel_id INTEGER NOT NULL + ); + """ + ) + await conn.execute() + return conn + + async def get_starboard_message(self, message_id: int) -> dict | None: + async with await self.open_db() as conn: + async with conn.execute( + "SELECT * FROM messages WHERE source_id = ? OR starboard_message_id = ?", + (message_id, message_id) + ) as cursor: + row = await cursor.fetchone() + if not row: + return + return dict(row) + + @staticmethod + async def archive_image(starboard_message: discord.Message): + async with httpx.AsyncClient( + headers={ + "User-Agent": "Mozilla/5.0 (Windows NT 10.69; Win64; x64) " + "LCC-Bot-Scraper/1 (https://git.nexy7574.co.uk/nex/college-bot-v2)" + } + ) as session: + image = starboard_message.embeds[0].image + if image and image.url: + parsed = urlparse(image.url) + filename = parsed.path.split("/")[-1] + try: + r = await session.get(image.url) + except httpx.HTTPError: + if image.proxy_url: + r = await session.get(image.proxy_url) + else: + return + + FS_LIMIT = max(25 * 1024 * 1024, starboard_message.guild.filesize_limit) + if r.status_code == 200 and len(r.content) < FS_LIMIT: + file = io.BytesIO(r.content) + file.seek(0) + embed = starboard_message.embeds[0].copy() + embed.set_image(url="attachment://" + filename) + embeds = [embed, *starboard_message.embeds[1:]] + await starboard_message.edit(embeds=embeds, file=discord.File(file, filename=filename)) + + async def generate_starboard_embed(self, message: discord.Message) -> discord.Embed: + star_count = [x for x in message.reactions if str(x.emoji) == "\N{white medium star}"] + if not star_count: + star_count = 0 + else: + star_count = star_count[0].count + embed = discord.Embed(colour=discord.Colour.gold(), timestamp=message.created_at, description=message.content) + embed.set_author( + name=message.author.display_name, url=message.jump_url, icon_url=message.author.display_avatar.url + ) + + if star_count > 5: + stars = "\N{white medium star}x{:,}".format(star_count) + else: + stars = "\N{white medium star}" * star_count + stars = stars or "\N{no entry sign}" + + embed.add_field( + name="Info", + value=f"Star count: {stars}\n" + f"Channel: {message.channel.mention}\n" + f"Author: {message.author.mention}\n" + f"URL: [jump]({message.jump_url})\n" + f"Sent: {discord.utils.format_dt(message.created_at, 'R')}", + inline=False, + ) + if message.edited_at: + embed.fields[0].value += "\nLast edited: " + discord.utils.format_dt(message.edited_at, "R") + + if message.reference is not None: + try: + ref: discord.Message = await self.bot.get_channel(message.reference.channel_id).fetch_message( + message.reference.message_id + ) + except discord.HTTPException: + pass + else: + embed.add_field( + name="In reply to", + value=f"[Message by {ref.author.display_name}]({ref.jump_url}):\n>>> ", + inline=False, + ) + field = embed.fields[1] + if not ref.content: + embed.fields[1].value = field.value.replace(":\n>>> ", "") + else: + embed.fields[1].value += textwrap.shorten(ref.content, 1024 - len(field.value), placeholder="...") + if message.interaction is not None: + inter: discord.MessageInteraction = message.interaction + if inter.type == discord.InteractionType.application_command: + cmd = "".format(inter.id, inter.name) + else: + cmd = repr(inter.name) + embed.add_field( + name="In response to", + value=f"Command {cmd} by {inter.user.display_name}", + inline=False, + ) + + for file in message.attachments: + name = f"Attachment #{message.attachments.index(file)}" + spoiler = file.is_spoiler() + if spoiler: + embed.add_field(name=name, value=f"||[{file.filename}]({file.url})||", inline=False) + else: + if file.content_type.startswith("image"): + if embed.image.url is discord.Embed.Empty: + embed.set_image(url=file.url) + embed.add_field(name=name, value=f"[{file.filename}]({file.url})", inline=False) + + # embed.set_footer(text="Starboard threshold for this message was {:.2f}.".format(cap)) + return embed + + @commands.Cog.listener("on_raw_reaction_add") + @commands.Cog.listener("on_raw_reaction_remove") + async def on_star_add(self, payload: discord.RawReactionActionEvent): + if not payload.guild_id: + return + async with self.lock: + if str(payload.emoji) != "\N{white medium star}": + return + message: discord.Message = await self.bot.get_channel(payload.channel_id).fetch_message(payload.message_id) + if message.author.id == payload.user_id and payload.event_type == "REACTION_ADD": + if message.channel.permissions_for(message.guild.me).manage_messages: + await message.remove_reaction(payload.emoji, message.author) + return await message.reply( + f"You can't star your own messages you pretentious dick, {message.author.mention}." + ) + star_count = [x for x in message.reactions if str(x.emoji) == "\N{white medium star}"] + if not star_count: + star_count = 0 + else: + star_count = star_count[0].count + + if star_count == 0: + try: + database: StarBoardMessage = await StarBoardMessage.objects.get(id=payload.message_id) + except orm.NoMatch: + return + else: + channel = discord.utils.get(message.guild.text_channels, name="starboard") + if channel: + try: + message = await channel.fetch_message(database.id) + await message.delete(delay=0.1, reason="Starboard message lost all stars.") + except discord.HTTPException: + pass + finally: + await database.delete() + else: + await database.delete() + + database: Tuple[StarBoardMessage, bool] = await StarBoardMessage.objects.get_or_create( + {"channel": payload.channel_id}, id=payload.message_id + ) + entry, created = database + if created: + # noinspection PyUnresolvedReferences + cap = message.channel + if self.bot.intents.members and hasattr(cap, "members"): + cap = len([x for x in cap.members if not x.bot]) * 0.1 + else: + cap = cap.member_count * 0.1 + if star_count >= cap: + channel = discord.utils.get(message.guild.text_channels, name="starboard") + if channel and channel.can_send(): + embed = await self.generate_starboard_embed(message) + embeds = [embed, *tuple(filter(lambda x: x.type == "rich", message.embeds))][:10] + msg = await channel.send(embeds=embeds) + await entry.update(starboard_message=msg.id) + self.bot.loop.create_task(self.archive_image(msg)) + else: + await entry.delete() + return + else: + channel = discord.utils.get(message.guild.text_channels, name="starboard") + embed = await self.generate_starboard_embed(message) + embeds = [embed, *tuple(filter(lambda x: x.type == "rich", message.embeds))][:10] + if channel and channel.can_send() and entry.starboard_message: + try: + msg = await channel.fetch_message(entry.starboard_message) + except discord.NotFound: + msg = await channel.send(embeds=embeds) + await entry.update(starboard_message=msg.id) + self.bot.loop.create_task(self.archive_image(msg)) + except discord.HTTPException: + pass + else: + await msg.edit(embeds=embeds) + self.bot.loop.create_task(self.archive_image(msg)) + + @commands.message_command(name="Starboard Preview") + @discord.guild_only() + async def get_starboard_info(self, ctx: discord.ApplicationContext, message: discord.Message): + return await ctx.respond(embed=await self.generate_starboard_embed(message)) + + +def setup(bot): + bot.add_cog(StarBoardCog(bot)) \ No newline at end of file