college-bot-v2/src/cogs/quote_quota.py
nex ba57e6ed43
All checks were successful
Build and Publish Jimmy.2 / build_and_publish (push) Successful in 8s
Tweak quote regex
2024-04-25 23:53:13 +01:00

345 lines
13 KiB
Python

import asyncio
import io
import logging
import re
from datetime import datetime, timedelta
from typing import Annotated, Callable, Iterable
import discord
import matplotlib.pyplot as plt
from discord.ext import commands
from conf import CONFIG
class QuoteQuota(commands.Cog):
    """Slash commands that chart quote attribution and count tagged "truth" embeds."""

    # Quote patterns, compiled once rather than on every message. Both capture the
    # word following the final " - " as the attributed name. The attachment variant
    # allows the text before the dash to be empty; the text-only variant requires
    # some text plus whitespace before the dash.
    _QUOTE_WITH_ATTACHMENT = re.compile(r".*\s*-\s*([\w]+)")
    _QUOTE_TEXT_ONLY = re.compile(r".+\s+-\s*([\w]+)")

    # Author IDs whose embeds are counted by the truth commands.
    _TRUMP_AUTHOR_ID = 1101439218334576742
    _TATE_AUTHOR_ID = 1229496078726860921
    # Embed colour that marks an embed as a "truth".
    _TRUTH_EMBED_COLOUR = 0x5448EE
    # Only messages after this snowflake are scanned.
    # NOTE(review): presumably the point from which truth embeds were colour-tagged - confirm.
    _HISTORY_START_ID = 1229487065117233203

    def __init__(self, bot):
        """
        Stores the bot and reads the ``quote_a`` section of the configuration.

        :param bot: The bot instance this cog is attached to
        """
        self.bot = bot
        quote_config = CONFIG["quote_a"]
        self.quotes_channel_id = quote_config.get("channel_id")
        # Mapping used to translate a matched name into a display name.
        self.names = quote_config.get("names", {})
        self.log = logging.getLogger("jimmy.cogs.quote_quota")

    @property
    def quotes_channel(self) -> discord.TextChannel | None:
        """The configured quotes channel, or None if it is not configured or not found by the bot."""
        if not self.quotes_channel_id:
            return None
        return self.bot.get_channel(self.quotes_channel_id) or None

    @staticmethod
    def generate_pie_chart(usernames: list[str], counts: list[int], no_other: bool = False) -> discord.File:
        """
        Converts the given usernames and counts into a pie chart.

        The input lists are not modified.

        :param usernames: The usernames
        :param counts: The number of quotes per username, in the same order as ``usernames``
        :param no_other: Disables the "other" grouping
        :returns: The pie chart image
        """
        # The total is computed once so the 5% threshold is measured against the
        # real total for every author, not a total that shrinks as authors are grouped.
        total = sum(counts)

        tally: dict[str, int] = {}
        other_total = 0
        for author, count in zip(usernames, counts):
            # Any author with less than 5% of the total count is grouped into "Other".
            if not no_other and total > 0 and count / total < 0.05:
                other_total += count
            else:
                tally[author] = tally.get(author, 0) + count
        if other_total:
            # Add to, rather than overwrite, an author that is literally called "Other".
            tally["Other"] = tally.get("Other", 0) + other_total

        # Largest slice first.
        ranked = sorted(tally.items(), key=lambda item: item[1], reverse=True)
        labels = [author for author, _ in ranked]
        sizes = [count for _, count in ranked]

        def pct(share: float) -> str:
            # matplotlib passes the slice's percentage; convert it back to a count for the label.
            return f"{share:.1f}% ({round((share / 100) * total)})"

        fig, ax = plt.subplots(figsize=(7, 7))
        try:
            ax.pie(
                sizes,
                labels=labels,
                autopct=pct,
                startangle=90,
                radius=1.2,
            )
            fig.subplots_adjust(left=0.1, bottom=0.1, right=0.9, top=0.9, wspace=0.3, hspace=0.4)
            buffer = io.BytesIO()
            fig.savefig(buffer, format="png")
        finally:
            # pyplot keeps every figure alive until it is closed; without this each call leaks one.
            plt.close(fig)
        buffer.seek(0)
        return discord.File(buffer, filename="pie.png")

    def _resolve_author(self, message: discord.Message) -> str | None:
        """
        Extracts the attributed author of a quote message.

        :param message: The message to inspect
        :returns: The title-cased author name, or None if the message should be filtered out
        """
        if not message.content:
            return None

        pattern = self._QUOTE_WITH_ATTACHMENT if message.attachments else self._QUOTE_TEXT_ONLY
        match = pattern.match(str(message.clean_content))
        if match is None:
            return None

        name = match.group(1).strip().casefold()
        if name == "me":
            # Self-attributed quotes are only counted when the poster has a configured name.
            name = message.author.name.strip().casefold()
            if name not in self.names:
                return None
            return self.names[name].title()
        if name in self.names:
            return self.names[name].title()
        if name.isdigit():
            # Purely numeric "names" are not treated as authors.
            return None
        return name.title()

    @commands.slash_command()
    async def quota(
        self,
        ctx: discord.ApplicationContext,
        days: Annotated[
            int,
            discord.Option(
                int,
                name="lookback",
                description="How many days to look back on. Defaults to 7.",
                default=7,
                min_value=1,
                max_value=365,
            ),
        ],
        merge_other: Annotated[
            bool,
            discord.Option(
                bool,
                name="merge_other",
                description="Whether to merge authors with less than 5% of the total count into 'Other'.",
                default=True,
            ),
        ],
    ):
        """Checks the quote quota for the quotes channel.

        Scans the quotes channel for the last ``days`` days, tallies quotes per
        attributed author and replies with a pie chart.
        """
        # NOTE(review): the first docstring line is kept verbatim because py-cord appears
        # to use it as the slash command description - confirm before rewording it.
        now = discord.utils.utcnow()
        oldest = now - timedelta(days=days)
        await ctx.defer()

        channel = self.quotes_channel
        if channel is None and ctx.guild is not None:
            # Fall back to a channel named "quotes"; skipped when there is no guild (e.g. DMs).
            channel = discord.utils.get(ctx.guild.text_channels, name="quotes")
        if not channel:
            return await ctx.respond(":x: Cannot find quotes channel.")

        await ctx.respond("Gathering messages, this may take a moment.")

        authors: dict[str, int] = {}
        filtered_messages = 0
        total = 0
        async for message in channel.history(limit=None, after=oldest, oldest_first=False):
            total += 1
            name = self._resolve_author(message)
            if name is None:
                filtered_messages += 1
                continue
            authors[name] = authors.get(name, 0) + 1

        if not authors:
            if total:
                return await ctx.edit(
                    content="No valid messages found in the last {!s} days. "
                    "Make sure quotes are formatted properly ending with ` - AuthorName`"
                    ' (e.g. `"This is my quote" - Jimmy`)'.format(days)
                )
            else:
                return await ctx.edit(content="No messages found in the last {!s} days.".format(days))

        # generate_pie_chart takes the *inverse* flag (no_other), so merge_other is negated.
        # Rendering happens in a worker thread so the event loop is not blocked.
        file = await asyncio.to_thread(
            self.generate_pie_chart, list(authors.keys()), list(authors.values()), not merge_other
        )
        return await ctx.edit(
            content="{:,} messages (out of {:,}) were filtered (didn't follow format?)".format(
                filtered_messages, total
            ),
            file=file,
        )

    def _metacounter(
        self,
        messages: list[discord.Message],
        filter_func: Callable[[discord.Message], bool],
        *,
        now: datetime | None = None,
    ) -> dict[str, float | int | dict[str, int]]:
        """
        Counts the messages accepted by ``filter_func`` over several time windows.

        :param messages: The messages to inspect
        :param filter_func: Predicate deciding whether a message is counted
        :param now: Reference time for the windows. Defaults to the current UTC time
            truncated to the start of the hour.
        :returns: A dictionary with the keys ``hour``, ``day``, ``today``, ``week``,
            ``all_time``, ``per_minute``, ``per_hour``, ``per_day`` and ``hours``
            (a mapping of ``"0"``..``"23"`` to the count of messages created in that hour).
        """
        if now is None:
            # NOTE(review): because the default is truncated to the hour, the "last hour",
            # "last day" and "last week" windows start from the top of the current hour
            # rather than from this instant - confirm this is intended.
            now = discord.utils.utcnow().replace(minute=0, second=0, microsecond=0)

        today = now.date()
        hour_ago = now - timedelta(hours=1)
        day_ago = now - timedelta(days=1)
        week_ago = now - timedelta(days=7)

        counts = {
            "hour": 0,
            "day": 0,
            "today": 0,
            "week": 0,
            "all_time": 0,
            "per_minute": 0.0,
            "per_hour": 0.0,
            "per_day": 0.0,
            "hours": {str(hour): 0 for hour in range(24)},
        }

        for message in messages:
            if not filter_func(message):
                continue
            created = message.created_at
            self.log.debug("%r was a truth (%.2f seconds ago).", message.id, (now - created).total_seconds())
            counts["all_time"] += 1
            if created.date() == today:
                counts["today"] += 1
            if created > hour_ago:
                counts["hour"] += 1
            if created > day_ago:
                counts["day"] += 1
            if created > week_ago:
                counts["week"] += 1
            counts["hours"][str(created.hour)] += 1

        # Average rates derived from the window totals.
        counts["per_minute"] = counts["hour"] / 60
        counts["per_hour"] = counts["day"] / 24
        counts["per_day"] = counts["week"] / 7
        self.log.info("Total truth counts: %r", counts)
        return counts

    def _find_tagged_embed(self, msg: discord.Message) -> discord.Embed | None:
        """
        Finds the first rich embed on the message that carries the truth colour.

        :param msg: The message whose embeds are inspected
        :returns: The matching embed, or None when there is none
        """
        for embed in msg.embeds:
            # getattr tolerates embeds without a colour instead of raising AttributeError.
            if embed.type == "rich" and getattr(embed.colour, "value", None) == self._TRUTH_EMBED_COLOUR:
                return embed
        return None

    async def _process_trump_truths(self, messages: list[discord.Message]):
        """
        Processes the given messages to count the number of posts by Donald Trump.

        :param messages: The messages to process
        :returns: The stats
        """

        def is_truth(msg: discord.Message) -> bool:
            if msg.author.id != self._TRUMP_AUTHOR_ID:
                return False
            if self._find_tagged_embed(msg) is None:
                return False
            self.log.debug("Found tagged rich trump truth embed: %r", msg.id)
            return True

        return self._metacounter(messages, is_truth)

    async def _process_tate_truths(self, messages: list[discord.Message]):
        """
        Processes the given messages to count the number of posts by Andrew Tate.

        :param messages: The messages to process
        :returns: The stats
        """

        def is_truth(msg: discord.Message) -> bool:
            if msg.author.id != self._TATE_AUTHOR_ID:
                return False
            embed = self._find_tagged_embed(msg)
            if embed is None:
                return False
            self.log.debug("Found tagged rich tate truth embed %r", embed)
            return True

        return self._metacounter(messages, is_truth)

    @staticmethod
    def _generate_truth_frequency_graph(hours: dict[str, int]) -> discord.File:
        """
        Generates a bar chart representing the most truthed times.

        :param hours: A dictionary of {"hour": count}
        :return: A discord.File with the rendered graph
        """
        labels = [hour.zfill(2) for hour in hours]
        values = list(hours.values())
        average = sum(values) / len(values) if values else 0.0

        # A dedicated figure avoids drawing onto whatever figure pyplot currently holds.
        fig, ax = plt.subplots()
        try:
            ax.set_title("Truth times (UTC)")
            ax.set_xlabel("Hour")
            ax.set_ylabel("Truths")
            ax.bar(labels, values, color="#5448EE")
            ax.axhline(average, color="red", linestyle="--", label=f"Average: {average:.1f}")
            # Without a legend the label on the average line is never rendered.
            ax.legend()
            buffer = io.BytesIO()
            fig.savefig(buffer, format="png")
        finally:
            plt.close(fig)
        buffer.seek(0)
        return discord.File(buffer, "truths.png")

    @staticmethod
    def _format_truth_stats(stats: dict) -> str:
        """
        Renders one author's stats as the text of an embed field.

        :param stats: A stats dictionary as returned by ``_metacounter``
        :returns: The formatted, multi-line field value
        """
        return (
            f"**All time:** {stats['all_time']:,}\n"
            f"**Today:** {stats['today']:,}\n"
            f"**Last Week:** {stats['week']:,} ({stats['per_day']:.1f}/day)\n"
            f"**Last 24 Hours:** {stats['day']:,} ({stats['per_hour']:.1f}/hour)\n"
            f"**Last Hour:** {stats['hour']:,} ({stats['per_minute']:.1f}/min)"
        )

    async def _process_all_messages(self, channel: discord.TextChannel) -> tuple[discord.Embed, discord.File]:
        """
        Processes all the messages in the given channel.

        :param channel: The channel to process
        :returns: The stats embed and the hourly frequency graph
        """
        embed = discord.Embed(title="Truth Counts", color=discord.Color.blurple(), timestamp=discord.utils.utcnow())
        messages: list[discord.Message] = await channel.history(
            limit=None, after=discord.Object(self._HISTORY_START_ID)
        ).flatten()

        trump_stats = await self._process_trump_truths(messages)
        tate_stats = await self._process_tate_truths(messages)
        embed.add_field(name="Donald Trump", value=self._format_truth_stats(trump_stats))
        embed.add_field(name="Andrew Tate", value=self._format_truth_stats(tate_stats))

        # Combine both authors' per-hour counts for the graph.
        tate_hours = tate_stats["hours"]
        hours = {hour: count + tate_hours.get(hour, 0) for hour, count in trump_stats["hours"].items()}

        graph = await asyncio.to_thread(self._generate_truth_frequency_graph, hours)
        embed.set_image(url="attachment://truths.png")
        return embed, graph

    @commands.slash_command(name="truths")
    async def truths_counter(self, ctx: discord.ApplicationContext):
        """Counts the number of times the word 'truth' has been said in the quotes channel.

        In practice this scans the channel named "spam" and counts colour-tagged
        embeds posted by the two tracked authors.
        """
        # NOTE(review): the first docstring line is kept verbatim because py-cord appears
        # to use it as the slash command description - confirm before rewording it.
        channel = None
        if ctx.guild is not None:
            channel = discord.utils.get(ctx.guild.text_channels, name="spam")
        if not channel:
            return await ctx.respond(":x: Cannot find spam channel.")

        await ctx.defer()
        now = discord.utils.utcnow().replace(minute=0, second=0, microsecond=0)
        placeholder = discord.Embed(
            title="Counting truths, please wait.",
            description="This may take a minute depending on how insane Trump and Tate are feeling.",
            color=discord.Color.blurple(),
            timestamp=now,
        )
        await ctx.respond(embed=placeholder)
        embed, file = await self._process_all_messages(channel)
        await ctx.edit(embed=embed, file=file)
def setup(bot):
    """Extension entry point: builds the QuoteQuota cog and adds it to the bot."""
    cog = QuoteQuota(bot)
    bot.add_cog(cog)