import asyncio import logging import re import discord import io import matplotlib.pyplot as plt from datetime import timedelta, datetime from discord.ext import commands from typing import Callable, Iterable, Annotated from conf import CONFIG class QuoteQuota(commands.Cog): def __init__(self, bot): self.bot = bot self.quotes_channel_id = CONFIG["quote_a"].get("channel_id") self.names = CONFIG["quote_a"].get("names", {}) self.log = logging.getLogger("jimmy.cogs.quote_quota") @property def quotes_channel(self) -> discord.TextChannel | None: if self.quotes_channel_id: c = self.bot.get_channel(self.quotes_channel_id) if c: return c @staticmethod def generate_pie_chart( usernames: list[str], counts: list[int], no_other: bool = False ) -> discord.File: """ Converts the given username and count tuples into a nice pretty pie chart. :param usernames: The usernames :param counts: The number of times the username appears in the chat :param no_other: Disables the "other" grouping :returns: The pie chart image """ def pct(v: int): return f"{v:.1f}% ({round((v / 100) * sum(counts))})" if no_other is False: other = [] # Any authors with less than 5% of the total count will be grouped into "other" for i, author in enumerate(usernames.copy()): if (c := counts[i]) / sum(counts) < 0.05: other.append(c) counts[i] = -1 usernames.remove(author) if other: usernames.append("Other") counts.append(sum(other)) # And now filter out any -1% counts counts = [c for c in counts if c != -1] mapping = {} for i, author in enumerate(usernames): mapping[author] = counts[i] # Sort the authors by count new_mapping = {} for author, count in sorted(mapping.items(), key=lambda x: x[1], reverse=True): new_mapping[author] = count usernames = list(new_mapping.keys()) counts = list(new_mapping.values()) fig, ax = plt.subplots(figsize=(7, 7)) ax.pie( counts, labels=usernames, autopct=pct, startangle=90, radius=1.2, ) fig.subplots_adjust(left=0.1, bottom=0.1, right=0.9, top=0.9, wspace=0.3, hspace=0.4) fio = io.BytesIO() fig.savefig(fio, format='png') fio.seek(0) return discord.File(fio, filename="pie.png") @commands.slash_command() async def quota( self, ctx: discord.ApplicationContext, days: Annotated[ int, discord.Option( int, name="lookback", description="How many days to look back on. Defaults to 7.", default=7, min_value=1, max_value=365 ) ], merge_other: Annotated[ bool, discord.Option( bool, name="merge_other", description="Whether to merge authors with less than 5% of the total count into 'Other'.", default=True ) ] ): """Checks the quote quota for the quotes channel.""" now = discord.utils.utcnow() oldest = now - timedelta(days=days) await ctx.defer() channel = self.quotes_channel or discord.utils.get(ctx.guild.text_channels, name="quotes") if not channel: return await ctx.respond(":x: Cannot find quotes channel.") await ctx.respond("Gathering messages, this may take a moment.") authors = {} filtered_messages = 0 total = 0 async for message in channel.history( limit=None, after=oldest, oldest_first=False ): total += 1 if not message.content: filtered_messages += 1 continue if message.attachments: regex = r".*\s*-\s*@?([\w\s]+)" else: regex = r".+\s+-\s*@?([\w\s]+)" if not (m := re.match(regex, str(message.clean_content))): filtered_messages += 1 continue name = m.group(1) name = name.strip().casefold() if name == "me": name = message.author.name.strip().casefold() if name in self.names: name = self.names[name].title() else: filtered_messages += 1 continue elif name in self.names: name = self.names[name].title() elif name.isdigit(): filtered_messages += 1 continue name = name.title() authors.setdefault(name, 0) authors[name] += 1 if not authors: if total: return await ctx.edit( content="No valid messages found in the last {!s} days. " "Make sure quotes are formatted properly ending with ` - AuthorName`" " (e.g. `\"This is my quote\" - Jimmy`)".format(days) ) else: return await ctx.edit( content="No messages found in the last {!s} days.".format(days) ) file = await asyncio.to_thread( self.generate_pie_chart, list(authors.keys()), list(authors.values()), merge_other ) return await ctx.edit( content="{:,} messages (out of {:,}) were filtered (didn't follow format?)".format( filtered_messages, total ), file=file ) def _metacounter( self, messages: list[discord.Message], filter_func: Callable[[discord.Message], bool], *, now: datetime = None ) -> dict[str, float | int]: now = now or discord.utils.utcnow().replace(minute=0, second=0, microsecond=0) counts = { "hour": 0, "day": 0, "week": 0, "all_time": 0, "per_minute": 0.0, "per_hour": 0.0, "per_day": 0.0, } for message in messages: if filter_func(message): age = now - message.created_at self.log.debug("%r was a truth (%.2f seconds ago).", message.id, age.total_seconds()) counts["all_time"] += 1 if message.created_at > now - timedelta(hours=1): counts["hour"] += 1 if message.created_at > now - timedelta(days=1): counts["day"] += 1 if message.created_at > now - timedelta(days=7): counts["week"] += 1 counts["per_minute"] = counts["hour"] / 60 counts["per_hour"] = counts["day"] / 24 counts["per_day"] = counts["week"] / 7 self.log.info("Total truth counts: %r", counts) return counts async def _process_trump_truths(self, messages: list[discord.Message]) -> dict[str, int]: """ Processes the given messages to count the number of posts by Donald Trump. :param messages: The messages to process :returns: The stats """ def is_truth(msg: discord.Message) -> bool: if msg.author.id == 1101439218334576742: # if msg.created_at.timestamp() <= 1713202855.80234: # # Pre 6:30pm 15th April, embeds were not tagged for detection. # truth = any((_te.type == "rich" for _te in msg.embeds)) # if truth: # self.log.debug("Found untagged rich trump truth embed: %r", msg.id) # return True for __t_e in msg.embeds: if __t_e.type == "rich" and __t_e.colour is not None and __t_e.colour.value == 0x5448EE: self.log.debug("Found tagged rich trump truth embed: %r", msg.id) return True return False return self._metacounter(messages, is_truth) async def _process_tate_truths(self, messages: list[discord.Message]) -> dict[str, int]: """ Processes the given messages to count the number of posts by Andrew Tate. :param messages: The messages to process :returns: The stats """ def is_truth(msg: discord.Message) -> bool: if msg.author.id == 1229496078726860921: # All the tate truths are already tagged. for __t_e in msg.embeds: if __t_e.type == "rich" and __t_e.colour.value == 0x5448EE: self.log.debug("Found tagged rich tate truth embed %r", __t_e) return True return False return self._metacounter(messages, is_truth) async def _process_all_messages(self, channel: discord.TextChannel) -> discord.Embed: """ Processes all the messages in the given channel. :param channel: The channel to process :returns: The stats """ embed = discord.Embed( title="Truth Counts", color=discord.Color.blurple(), timestamp=discord.utils.utcnow() ) messages: list[discord.Message] = await channel.history( limit=None, after=discord.Object(1229487065117233203) ).flatten() trump_stats = await self._process_trump_truths(messages) tate_stats = await self._process_tate_truths(messages) embed.add_field( name="Donald Trump", value=( f"**All time:** {trump_stats['all_time']:,}\n" f"**Last Week:** {trump_stats['week']:,} ({trump_stats['per_day']:.1f}/day)\n" f"**Last Day:** {trump_stats['day']:,} ({trump_stats['per_hour']:.1f}/hour)\n" f"**Last Hour:** {trump_stats['hour']:,} ({trump_stats['per_minute']:.1f}/min)" ), ) embed.add_field( name="Andrew Tate", value=( f"**All time:** {tate_stats['all_time']:,}\n" f"**Last Week:** {tate_stats['week']:,} ({tate_stats['per_day']:.1f}/day)\n" f"**Last Day:** {tate_stats['day']:,} ({tate_stats['per_hour']:.1f}/hour)\n" f"**Last Hour:** {tate_stats['hour']:,} ({tate_stats['per_minute']:.1f}/min)" ) ) return embed @commands.slash_command(name="truths") async def truths_counter(self, ctx: discord.ApplicationContext): """Counts the number of times the word 'truth' has been said in the quotes channel.""" channel = discord.utils.get(ctx.guild.text_channels, name="spam") if not channel: return await ctx.respond(":x: Cannot find spam channel.") await ctx.defer() now = discord.utils.utcnow().replace(minute=0, second=0, microsecond=0) embed = discord.Embed( title="Counting truths, please wait.", description="This may take a minute depending on how insane Trump and Tate are feeling.", color=discord.Color.blurple(), timestamp=now ) await ctx.respond(embed=embed) embed = await self._process_all_messages(channel) await ctx.edit(embed=embed) def setup(bot): bot.add_cog(QuoteQuota(bot))