import asyncio
import io
import logging
import re
from datetime import datetime, timedelta
from typing import Annotated, Callable, Iterable

import discord
import matplotlib.pyplot as plt
from discord.ext import commands

from conf import CONFIG

# Authors whose share of the total is below this fraction are merged into "Other".
_OTHER_THRESHOLD = 0.05
# Embed colour value used to recognise a tagged "truth" embed.
_TRUTH_EMBED_COLOUR = 0x5448EE
# Author IDs whose tagged embeds are counted as Trump / Tate truths respectively.
_TRUMP_AUTHOR_ID = 1101439218334576742
_TATE_AUTHOR_ID = 1229496078726860921
# Only messages after this snowflake are scanned when counting truths.
_TRUTH_HISTORY_START_ID = 1229487065117233203
# Quote attribution patterns: messages with attachments may omit the quote text
# and the whitespace before the dash; text-only messages must have both.
_QUOTE_WITH_ATTACHMENT = re.compile(r".*\s*-\s*@?([\w\s]+)")
_QUOTE_TEXT_ONLY = re.compile(r".+\s+-\s*@?([\w\s]+)")


def _find_truth_embed(msg: discord.Message, author_id: int) -> discord.Embed | None:
    """
    Finds the first tagged "truth" embed on a message sent by the given author.

    :param msg: The message to inspect
    :param author_id: The author ID the message must have been sent by
    :returns: The first rich embed carrying the truth colour, or None
    """
    if msg.author.id != author_id:
        return None
    for embed in msg.embeds:
        # getattr() tolerates embeds without a colour (None or an "empty" sentinel),
        # which would otherwise raise AttributeError on `.value`.
        if embed.type == "rich" and getattr(embed.colour, "value", None) == _TRUTH_EMBED_COLOUR:
            return embed
    return None


class QuoteQuota(commands.Cog):
    """Slash commands for quote-attribution pie charts and truth-post statistics."""

    def __init__(self, bot):
        self.bot = bot
        self.quotes_channel_id = CONFIG["quote_a"].get("channel_id")
        # Mapping of casefolded alias -> display name, consulted when attributing quotes.
        self.names = CONFIG["quote_a"].get("names", {})
        self.log = logging.getLogger("jimmy.cogs.quote_quota")

    @property
    def quotes_channel(self) -> discord.TextChannel | None:
        """The configured quotes channel, or None if unset or not resolvable by the bot."""
        if self.quotes_channel_id:
            channel = self.bot.get_channel(self.quotes_channel_id)
            if channel:
                return channel
        return None

    @staticmethod
    def generate_pie_chart(usernames: list[str], counts: list[int], no_other: bool = False) -> discord.File:
        """
        Converts the given usernames and counts into a pie chart.

        The input lists are not modified.

        :param usernames: The usernames
        :param counts: The number of times each username appears, parallel to ``usernames``
        :param no_other: Disables the "Other" grouping
        :returns: The pie chart image as ``pie.png``
        """
        # Compute the total once: the 5% threshold must be measured against the
        # real total, not one that shrinks as entries are merged away.
        total = sum(counts)

        merged: dict[str, int] = {}
        other_total = 0
        for author, count in zip(usernames, counts):
            if not no_other and total and count / total < _OTHER_THRESHOLD:
                other_total += count
            else:
                merged[author] = merged.get(author, 0) + count
        if other_total:
            # Add to (rather than overwrite) an author who is literally called "Other".
            merged["Other"] = merged.get("Other", 0) + other_total

        # Largest slice first; sorted() is stable so ties keep their input order.
        ranked = sorted(merged.items(), key=lambda item: item[1], reverse=True)
        labels = [author for author, _ in ranked]
        sizes = [count for _, count in ranked]

        def pct(v: float) -> str:
            # matplotlib passes the slice's percentage; convert it back to a count.
            return f"{v:.1f}% ({round((v / 100) * total)})"

        fig, ax = plt.subplots(figsize=(7, 7))
        try:
            ax.pie(
                sizes,
                labels=labels,
                autopct=pct,
                startangle=90,
                radius=1.2,
            )
            fig.subplots_adjust(left=0.1, bottom=0.1, right=0.9, top=0.9, wspace=0.3, hspace=0.4)
            fio = io.BytesIO()
            fig.savefig(fio, format="png")
        finally:
            # pyplot keeps every figure alive until it is closed explicitly.
            plt.close(fig)
        fio.seek(0)
        return discord.File(fio, filename="pie.png")

    def _extract_quote_author(self, message: discord.Message) -> str | None:
        """
        Extracts the title-cased author a quote message is attributed to.

        :param message: The message to parse
        :returns: The attributed author's name, or None if the message is not a valid quote
        """
        if not message.content:
            return None

        pattern = _QUOTE_WITH_ATTACHMENT if message.attachments else _QUOTE_TEXT_ONLY
        match = pattern.match(str(message.clean_content))
        if not match:
            return None

        name = match.group(1).strip().casefold()
        if name == "me":
            # Self-attribution only counts when the sender has a configured name.
            name = message.author.name.strip().casefold()
            if name not in self.names:
                return None
            name = self.names[name]
        elif name in self.names:
            name = self.names[name]
        elif name.isdigit():
            return None
        return name.title()

    @commands.slash_command()
    async def quota(
        self,
        ctx: discord.ApplicationContext,
        days: Annotated[
            int,
            discord.Option(
                int,
                name="lookback",
                description="How many days to look back on. Defaults to 7.",
                default=7,
                min_value=1,
                max_value=365,
            ),
        ],
        merge_other: Annotated[
            bool,
            discord.Option(
                bool,
                name="merge_other",
                description="Whether to merge authors with less than 5% of the total count into 'Other'.",
                default=True,
            ),
        ],
    ):
        """Checks the quote quota for the quotes channel."""
        # Tallies quote attributions over the last `days` days and replies with
        # a pie chart plus a count of messages that did not parse as quotes.
        now = discord.utils.utcnow()
        oldest = now - timedelta(days=days)
        await ctx.defer()

        channel = self.quotes_channel
        if channel is None and ctx.guild is not None:
            # ctx.guild is None outside a guild; skip the name lookup in that case.
            channel = discord.utils.get(ctx.guild.text_channels, name="quotes")
        if not channel:
            return await ctx.respond(":x: Cannot find quotes channel.")

        await ctx.respond("Gathering messages, this may take a moment.")

        authors: dict[str, int] = {}
        filtered_messages = 0
        total = 0
        async for message in channel.history(limit=None, after=oldest, oldest_first=False):
            total += 1
            name = self._extract_quote_author(message)
            if name is None:
                filtered_messages += 1
                continue
            authors[name] = authors.get(name, 0) + 1

        if not authors:
            if total:
                return await ctx.edit(
                    content=(
                        f"No valid messages found in the last {days!s} days. "
                        "Make sure quotes are formatted properly ending with ` - AuthorName`"
                        ' (e.g. `"This is my quote" - Jimmy`)'
                    )
                )
            return await ctx.edit(content=f"No messages found in the last {days!s} days.")

        # merge_other=True means "group small authors", which is the opposite of no_other.
        file = await asyncio.to_thread(
            self.generate_pie_chart,
            list(authors.keys()),
            list(authors.values()),
            no_other=not merge_other,
        )
        return await ctx.edit(
            content=f"{filtered_messages:,} messages (out of {total:,}) were filtered (didn't follow format?)",
            file=file,
        )

    def _metacounter(
        self,
        messages: list[discord.Message],
        filter_func: Callable[[discord.Message], bool],
        *,
        now: datetime | None = None,
    ) -> dict[str, float | int | dict[str, int]]:
        """
        Counts the messages accepted by ``filter_func`` across several time windows.

        :param messages: The messages to examine
        :param filter_func: Predicate selecting which messages are counted
        :param now: Reference time; defaults to the current UTC time truncated to the hour
        :returns: Counts for the last hour/day/week, today and all time, the derived
            per-minute/hour/day rates, and a per-hour-of-day histogram under ``"hours"``
        """
        # NOTE(review): the default reference time is truncated to the top of the
        # hour, so the rolling windows are measured from there - confirm intended.
        now = now or discord.utils.utcnow().replace(minute=0, second=0, microsecond=0)
        today = now.date()
        hour_ago = now - timedelta(hours=1)
        day_ago = now - timedelta(days=1)
        week_ago = now - timedelta(days=7)

        counts = {
            "hour": 0,
            "day": 0,
            "today": 0,
            "week": 0,
            "all_time": 0,
            "per_minute": 0.0,
            "per_hour": 0.0,
            "per_day": 0.0,
            "hours": {str(hour): 0 for hour in range(24)},
        }

        for message in messages:
            if not filter_func(message):
                continue
            created = message.created_at
            self.log.debug("%r was a truth (%.2f seconds ago).", message.id, (now - created).total_seconds())
            counts["all_time"] += 1
            if created.date() == today:
                counts["today"] += 1
            if created > hour_ago:
                counts["hour"] += 1
            if created > day_ago:
                counts["day"] += 1
            if created > week_ago:
                counts["week"] += 1
            counts["hours"][str(created.hour)] += 1

        counts["per_minute"] = counts["hour"] / 60
        counts["per_hour"] = counts["day"] / 24
        counts["per_day"] = counts["week"] / 7
        self.log.info("Total truth counts: %r", counts)
        return counts

    async def _process_trump_truths(self, messages: list[discord.Message]):
        """
        Processes the given messages to count the number of posts by Donald Trump.

        :param messages: The messages to process
        :returns: The stats
        """

        def is_truth(msg: discord.Message) -> bool:
            # NOTE: embeds posted before 6:30pm on 15th April (timestamp
            # 1713202855.80234) were not tagged for detection and are not counted.
            if _find_truth_embed(msg, _TRUMP_AUTHOR_ID) is not None:
                self.log.debug("Found tagged rich trump truth embed: %r", msg.id)
                return True
            return False

        return self._metacounter(messages, is_truth)

    async def _process_tate_truths(self, messages: list[discord.Message]):
        """
        Processes the given messages to count the number of posts by Andrew Tate.

        :param messages: The messages to process
        :returns: The stats
        """

        def is_truth(msg: discord.Message) -> bool:
            # All the tate truths are already tagged.
            embed = _find_truth_embed(msg, _TATE_AUTHOR_ID)
            if embed is not None:
                self.log.debug("Found tagged rich tate truth embed %r", embed)
                return True
            return False

        return self._metacounter(messages, is_truth)

    @staticmethod
    def _generate_truth_frequency_graph(hours: dict[str, int]) -> discord.File:
        """
        Generates a bar chart representing the most truthed times.

        :param hours: A dictionary of {"hour": count}
        :return: A discord.File with the rendered graph
        """
        labels = [hour.zfill(2) for hour in hours]
        values = list(hours.values())
        # Guard against an empty mapping instead of dividing by zero.
        average = sum(values) / len(values) if values else 0.0

        # Draw on an explicit figure rather than pyplot's implicit "current" one.
        fig, ax = plt.subplots()
        try:
            ax.set_title("Truth times (UTC)")
            ax.set_xlabel("Hour")
            ax.set_ylabel("Truths")
            ax.bar(labels, values, color="#5448EE")
            ax.axhline(average, color="red", linestyle="--", label=f"Average: {average:.1f}")
            # Without a legend the average line's label is never rendered.
            ax.legend()
            buffer = io.BytesIO()
            fig.savefig(buffer, format="png")
        finally:
            plt.close(fig)
        buffer.seek(0)
        return discord.File(buffer, "truths.png")

    @staticmethod
    def _format_truth_stats(stats: dict) -> str:
        """
        Renders one stats dictionary as the multi-line embed field text.

        :param stats: A stats dictionary as returned by ``_metacounter``
        :returns: The formatted field value
        """
        return (
            f"**All time:** {stats['all_time']:,}\n"
            f"**Today:** {stats['today']:,}\n"
            f"**Last Week:** {stats['week']:,} ({stats['per_day']:.1f}/day)\n"
            f"**Last 24 Hours:** {stats['day']:,} ({stats['per_hour']:.1f}/hour)\n"
            f"**Last Hour:** {stats['hour']:,} ({stats['per_minute']:.1f}/min)"
        )

    async def _process_all_messages(self, channel: discord.TextChannel) -> tuple[discord.Embed, discord.File]:
        """
        Processes all the messages in the given channel.

        :param channel: The channel to process
        :returns: The stats embed and the combined hour-of-day graph
        """
        embed = discord.Embed(title="Truth Counts", color=discord.Color.blurple(), timestamp=discord.utils.utcnow())
        messages: list[discord.Message] = await channel.history(
            limit=None, after=discord.Object(_TRUTH_HISTORY_START_ID)
        ).flatten()

        trump_stats = await self._process_trump_truths(messages)
        tate_stats = await self._process_tate_truths(messages)

        embed.add_field(name="Donald Trump", value=self._format_truth_stats(trump_stats))
        embed.add_field(name="Andrew Tate", value=self._format_truth_stats(tate_stats))

        # Combine both histograms into one for the graph.
        hours = dict(trump_stats["hours"])
        for hour, count in tate_stats["hours"].items():
            hours[hour] = hours.get(hour, 0) + count

        graph = await asyncio.to_thread(self._generate_truth_frequency_graph, hours)
        embed.set_image(url="attachment://truths.png")
        return embed, graph

    @commands.slash_command(name="truths")
    async def truths_counter(self, ctx: discord.ApplicationContext):
        """Counts the number of times the word 'truth' has been said in the quotes channel."""
        # NOTE(review): the description above looks inaccurate - this counts tagged
        # truth embeds in the "spam" channel. Left as-is because it appears to be
        # the registered command description; confirm before changing.
        channel = discord.utils.get(ctx.guild.text_channels, name="spam") if ctx.guild is not None else None
        if not channel:
            return await ctx.respond(":x: Cannot find spam channel.")

        await ctx.defer()
        now = discord.utils.utcnow().replace(minute=0, second=0, microsecond=0)
        embed = discord.Embed(
            title="Counting truths, please wait.",
            description="This may take a minute depending on how insane Trump and Tate are feeling.",
            color=discord.Color.blurple(),
            timestamp=now,
        )
        await ctx.respond(embed=embed)

        embed, file = await self._process_all_messages(channel)
        await ctx.edit(embed=embed, file=file)


def setup(bot):
    """Extension entry point: registers the QuoteQuota cog on the bot."""
    bot.add_cog(QuoteQuota(bot))