college-bot-v1/cogs/starboard.py
2024-05-05 01:12:16 +01:00

242 lines
10 KiB
Python

import asyncio
import io
import logging
import textwrap
import redis
import json
from urllib.parse import urlparse
import discord
import httpx
from discord.ext import commands
class StarBoardCog(commands.Cog):
    """Cog that mirrors starred messages into a guild's ``starboard`` text channel.

    One JSON object per tracked message is stored in Redis, keyed by the original
    message's ID, recording which starboard message mirrors it.
    """

    # The only emoji this cog reacts to.
    STAR_EMOJI = "\N{white medium star}"

    def __init__(self, bot):
        self.bot = bot
        self.log = logging.getLogger("jimmy.starboard")
        # Held while a star reaction is processed so events are handled one at a time.
        self.lock = asyncio.Lock()
        self.redis = redis.asyncio.Redis(decode_responses=True)

    @staticmethod
    async def archive_image(starboard_message: discord.Message):
        """Re-upload the first embed's image as an attachment of the starboard message.

        The image is downloaded from its URL (falling back to the proxy URL when the
        direct request raises) and, if the response is a 200 that fits under the upload
        limit, the message is edited so the first embed points at the uploaded copy.

        Best-effort: when there is no image or the download fails, nothing is changed.

        :param starboard_message: The message posted in the starboard channel.
        """
        if not starboard_message.embeds:
            return
        image = starboard_message.embeds[0].image
        if not (image and image.url):
            return

        # A URL whose path ends in "/" would otherwise give an empty attachment name.
        filename = urlparse(image.url).path.split("/")[-1] or "image"

        async with httpx.AsyncClient(
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.69; Win64; x64) "
                "LCC-Bot-Scraper/0 (https://github.com/nexy7574/LCC-bot)"
            }
        ) as session:
            try:
                response = await session.get(image.url)
            except httpx.HTTPError:
                if not image.proxy_url:
                    return
                try:
                    response = await session.get(image.proxy_url)
                except httpx.HTTPError:
                    # Both sources failed; leave the message untouched.
                    return

        size_limit = starboard_message.guild.filesize_limit
        # A reported 8 MiB limit is treated as 25 MiB (per the original author's note
        # that the real limit is higher than the reported one).
        if size_limit == 8 * 1024 * 1024:
            size_limit = 25 * 1024 * 1024

        if response.status_code != 200 or len(response.content) >= size_limit:
            return

        embed = starboard_message.embeds[0].copy()
        embed.set_image(url="attachment://" + filename)
        embeds = [embed, *starboard_message.embeds[1:]]
        await starboard_message.edit(
            embeds=embeds, file=discord.File(io.BytesIO(response.content), filename=filename)
        )

    async def _ping_check(self):
        """Return ``True`` when Redis answers a ping, ``False`` when it does not.

        A Redis error raised by the ping is reported as ``False`` rather than propagated,
        so callers can treat "offline" as an ordinary condition.
        """
        try:
            return bool(await self.redis.ping())
        except redis.RedisError:
            return False

    @classmethod
    def _star_count(cls, message: discord.Message) -> int:
        """Return the count of the star reaction on *message*, or 0 if it has none."""
        for reaction in message.reactions:
            if str(reaction.emoji) == cls.STAR_EMOJI:
                return reaction.count
        return 0

    def _star_threshold(self, message: discord.Message) -> float:
        """Return the number of stars needed before *message* is first posted.

        The threshold is 10% of the channel's non-bot members when the members intent
        is enabled and the channel exposes ``members``; otherwise 10% of a member count.
        """
        channel = message.channel
        if self.bot.intents.members and hasattr(channel, "members"):
            return sum(1 for member in channel.members if not member.bot) * 0.1
        # NOTE(review): not every channel type exposes ``member_count``; fall back to
        # the guild's count instead of raising AttributeError.
        count = getattr(channel, "member_count", None)
        if count is None:
            count = message.guild.member_count or 0
        return count * 0.1

    async def _build_embeds(self, message: discord.Message) -> list:
        """Return the starboard embed followed by the message's own rich embeds (max 10)."""
        embed = await self.generate_starboard_embed(message)
        rich_embeds = [e for e in message.embeds if e.type == "rich"]
        return [embed, *rich_embeds][:10]

    async def generate_starboard_embed(self, message: discord.Message) -> discord.Embed:
        """Build the embed that represents *message* on the starboard.

        The embed carries the message content, an "Info" field (star count, channel,
        author, jump link, timestamps), optional "In reply to" / "In response to"
        fields, and one field per attachment. The first non-spoiler image attachment
        becomes the embed image.

        :param message: The starred message.
        :returns: The populated embed.
        """
        star_count = self._star_count(message)

        embed = discord.Embed(colour=discord.Colour.gold(), timestamp=message.created_at, description=message.content)
        embed.set_author(
            name=message.author.display_name, url=message.jump_url, icon_url=message.author.display_avatar.url
        )

        # Up to five stars are drawn literally; beyond that a compact "star x N" is used.
        if star_count > 5:
            stars = f"{self.STAR_EMOJI}x{star_count:,}"
        else:
            stars = self.STAR_EMOJI * star_count
        stars = stars or "\N{no entry sign}"

        info = (
            f"Star count: {stars}\n"
            f"Channel: {message.channel.mention}\n"
            f"Author: {message.author.mention}\n"
            f"URL: [jump]({message.jump_url})\n"
            f"Sent: {discord.utils.format_dt(message.created_at, 'R')}"
        )
        if message.edited_at:
            info += "\nLast edited: " + discord.utils.format_dt(message.edited_at, "R")
        embed.add_field(name="Info", value=info, inline=False)

        if message.reference is not None:
            ref = None
            ref_channel = self.bot.get_channel(message.reference.channel_id)
            # The channel may be missing from the cache; treat that like a failed fetch.
            if ref_channel is not None:
                try:
                    ref = await ref_channel.fetch_message(message.reference.message_id)
                except discord.HTTPException:
                    ref = None
            if ref is not None:
                value = f"[Message by {ref.author.display_name}]({ref.jump_url})"
                if ref.content:
                    value += ":\n>>> "
                    # Keep the whole field within the 1024 characters budgeted for it.
                    value += textwrap.shorten(ref.content, 1024 - len(value), placeholder="...")
                embed.add_field(name="In reply to", value=value, inline=False)

        if message.interaction is not None:
            inter: discord.MessageInteraction = message.interaction
            if inter.type == discord.InteractionType.application_command:
                # Command mentions are written </name:id>.
                # NOTE(review): inter.id looks like the interaction's ID rather than the
                # command's, so the mention may render as plain text - confirm.
                cmd = "</{}:{}>".format(inter.name, inter.id)
            else:
                cmd = repr(inter.name)
            embed.add_field(
                name="In response to",
                value=f"Command {cmd} by {inter.user.display_name}",
                inline=False,
            )

        for index, file in enumerate(message.attachments):
            if file.is_spoiler():
                value = f"||[{file.filename}]({file.url})||"
            else:
                value = f"[{file.filename}]({file.url})"
                # content_type can be absent; treat that as "not an image".
                is_image = (file.content_type or "").startswith("image")
                current = embed.image
                if is_image and not (current and current.url):
                    embed.set_image(url=file.url)
            embed.add_field(name=f"Attachment #{index}", value=value, inline=False)

        return embed

    @commands.Cog.listener("on_raw_reaction_add")
    @commands.Cog.listener("on_raw_reaction_remove")
    async def on_star_add(self, payload: discord.RawReactionActionEvent):
        """Create, update or remove a starboard post when a star is added or removed.

        Flow, for star reactions inside a guild while Redis is reachable:

        * an author starring their own message is told off (and the reaction removed
          when the bot may manage messages);
        * a message with no stars left has its starboard post and Redis entry deleted;
        * an untracked message is posted once it reaches the star threshold;
        * a tracked message has its starboard post refreshed, or re-sent if it vanished.
        """
        # Cheap local filters first, so unrelated reactions never touch Redis or the lock.
        if not payload.guild_id or str(payload.emoji) != self.STAR_EMOJI:
            return
        if not await self._ping_check():
            self.log.warning("Redis ping check failed - redis offline?")
            return

        async with self.lock:
            source_channel = self.bot.get_channel(payload.channel_id)
            if source_channel is None:
                return
            try:
                message: discord.Message = await source_channel.fetch_message(payload.message_id)
            except discord.HTTPException:
                # The message is gone or unreadable; nothing to mirror.
                return

            if message.author.id == payload.user_id and payload.event_type == "REACTION_ADD":
                if message.channel.permissions_for(message.guild.me).manage_messages:
                    await message.remove_reaction(payload.emoji, message.author)
                return await message.reply(
                    f"You can't star your own messages you pretentious dick, {message.author.mention}."
                )

            star_count = self._star_count(message)
            key = str(message.id)
            raw_entry = await self.redis.get(key)
            entry = json.loads(raw_entry) if raw_entry else None

            channel = discord.utils.get(message.guild.text_channels, name="starboard")
            if not channel:
                return

            if star_count == 0:
                if entry:
                    starboard_id = entry.get("starboard_message_id")
                    if starboard_id:
                        try:
                            starboard_msg = await channel.fetch_message(starboard_id)
                            await starboard_msg.delete(delay=0.1, reason="Starboard message lost all stars.")
                        except discord.HTTPException:
                            # Already deleted or not deletable; the entry is dropped regardless.
                            self.log.debug("Could not delete starboard message %s", starboard_id)
                    await self.redis.delete(key)
                return

            if not entry:
                # Not tracked yet: only post once the threshold is met. Nothing is stored
                # in Redis until the starboard message actually exists.
                if star_count < self._star_threshold(message):
                    return
                if channel.can_send(discord.Embed()):
                    msg = await channel.send(embeds=await self._build_embeds(message))
                    entry = {
                        "message_id": payload.message_id,
                        "channel_id": payload.channel_id,
                        "guild_id": payload.guild_id,
                        "starboard_channel_id": channel.id,
                        "starboard_message_id": msg.id,
                    }
                    await self.redis.set(key, json.dumps(entry))
                    self.bot.loop.create_task(self.archive_image(msg))
                return

            # Already tracked: refresh the existing starboard post.
            starboard_id = entry.get("starboard_message_id")
            if not (starboard_id and channel.can_send()):
                return
            embeds = await self._build_embeds(message)
            try:
                msg = await channel.fetch_message(starboard_id)
            except discord.NotFound:
                # The starboard post was deleted; send a new one and re-point the entry.
                msg = await channel.send(embeds=embeds)
                entry["starboard_message_id"] = msg.id
                await self.redis.set(key, json.dumps(entry))
                self.bot.loop.create_task(self.archive_image(msg))
            except discord.HTTPException:
                # Transient API failure; the next reaction event will retry.
                pass
            else:
                await msg.edit(embeds=embeds)
                self.bot.loop.create_task(self.archive_image(msg))

    @commands.message_command(name="Starboard Info")
    @discord.guild_only()
    async def get_starboard_info(self, ctx: discord.ApplicationContext, message: discord.Message):
        """Respond with the stored starboard entry for *message* and a preview embed.

        The entry is shown as pretty-printed JSON (``null`` when the message is not
        tracked); if Redis is unreachable a warning object is shown instead.
        """
        if not await self._ping_check():
            self.log.warning("Redis ping check failed - redis offline?")
            data = {"warning": "redis offline - ping failed."}
        else:
            raw_entry = await self.redis.get(str(message.id))
            if raw_entry:
                # The value is stored as a JSON string; decode it so it is not encoded twice.
                try:
                    data = json.loads(raw_entry)
                except ValueError:
                    data = raw_entry
            else:
                data = None
        return await ctx.respond(
            '```json\n%s\n```' % json.dumps(data, indent=4),
            embed=await self.generate_starboard_embed(message)
        )
def setup(bot):
    """Extension entry point: instantiate the starboard cog and attach it to *bot*."""
    cog = StarBoardCog(bot)
    bot.add_cog(cog)