college-bot-v1/cogs/events.py

379 lines
15 KiB
Python
Raw Normal View History

2023-11-04 17:18:39 +00:00
import asyncio
2023-04-02 17:02:26 +01:00
import hashlib
2023-05-04 18:03:28 +01:00
import inspect
2023-03-17 11:00:42 +00:00
import io
2023-04-02 17:02:26 +01:00
import json
2023-11-07 11:43:57 +00:00
import logging
2023-04-02 17:02:26 +01:00
import os
2022-12-28 21:26:56 +00:00
import random
2023-03-17 10:41:29 +00:00
import re
2023-03-22 14:56:33 +00:00
import subprocess
2023-11-04 17:18:39 +00:00
import textwrap
2023-05-04 18:03:28 +01:00
import traceback
2023-04-02 17:02:26 +01:00
import warnings
2023-11-04 17:18:39 +00:00
from datetime import datetime, timedelta, timezone
2022-12-31 17:13:40 +00:00
from pathlib import Path
2023-11-04 17:18:39 +00:00
from typing import Any, Dict, Optional, Tuple
2022-10-06 09:20:23 +01:00
import discord
2023-03-17 10:41:29 +00:00
import httpx
2023-11-29 11:53:42 +00:00
import pydantic
2023-11-04 17:18:39 +00:00
from bs4 import BeautifulSoup
from discord.ext import commands, pages, tasks
2023-12-05 21:12:33 +00:00
from config import guilds
2023-11-04 17:18:39 +00:00
2023-04-02 17:02:26 +01:00
try:
from config import dev
except ImportError:
dev = False
2023-02-23 14:30:49 +00:00
try:
from config import OAUTH_REDIRECT_URI
except ImportError:
OAUTH_REDIRECT_URI = None
2023-03-17 10:41:29 +00:00
try:
2023-11-04 17:18:39 +00:00
from config import GITHUB_PASSWORD, GITHUB_USERNAME
2023-03-17 10:41:29 +00:00
except ImportError:
GITHUB_USERNAME = None
GITHUB_PASSWORD = None
2022-10-06 09:20:23 +01:00
2023-04-02 17:02:26 +01:00
try:
from config import SPAM_CHANNEL
except ImportError:
SPAM_CHANNEL = None
2022-10-06 09:20:23 +01:00
# Rightwards arrow followed by U+FE0F (variation selector-16); prefixed to the
# member mention that on_member_join sends.
LTR = "\N{black rightwards arrow}\U0000fe0f"
# Leftwards arrow followed by U+FE0F (variation selector-16); prefixed to the
# member mention that on_member_remove sends.
RTL = "\N{leftwards black arrow}\U0000fe0f"
class MessagePayload(pydantic.BaseModel):
    """Serialisable snapshot of a Discord message that is put on the bot's bridge queue.

    ``event_type`` defaults to ``"create"``; the edit and delete listeners in this
    module build payloads with ``"edit"`` and ``"redact"`` respectively.
    """

    class MessageAttachmentPayload(pydantic.BaseModel):
        """Metadata for one attachment of the message."""

        url: str
        proxy_url: str
        filename: str
        size: int
        # Dimensions are optional and default to None when not supplied.
        width: int | None = None
        height: int | None = None
        content_type: str

    event_type: str = "create"
    message_id: int
    author: str
    is_automated: bool = False
    avatar: str
    content: str
    clean_content: str
    # Filled from ``datetime.timestamp()`` by the listeners in this module.
    at: float
    attachments: list[MessageAttachmentPayload] = []
    # Payload of the message being replied to, when one was available.
    reply_to: Optional["MessagePayload"] = None
2023-03-31 11:55:36 +01:00
async def _dc(client: discord.VoiceClient | None):
    """Stop any playback on *client* and force-disconnect it.

    Does nothing when *client* is ``None``.
    """
    if client is not None:
        if client.is_playing():
            client.stop()
        await client.disconnect(force=True)
2023-03-22 15:37:06 +00:00
2022-10-06 09:20:23 +01:00
class Events(commands.Cog):
def __init__(self, bot):
    """Set up the cog's state and start the status-feed loop.

    :param bot: the bot instance this cog is attached to.
    """
    self.bot = bot
    self.log = logging.getLogger("jimmy.cogs.events")
    self.http = httpx.AsyncClient()
    self.bridge_health = False

    # Keep an existing queue only while it still holds items; otherwise (missing
    # or empty) install a fresh one on the bot.
    queue_has_items = hasattr(bot, "bridge_queue") and not bot.bridge_queue.empty()
    if not queue_has_items:
        bot.bridge_queue = asyncio.Queue()

    self.fetch_discord_atom_feed.start()
2023-04-02 17:02:26 +01:00
def cog_unload(self):
    """Stop the status-feed loop and release the cog's HTTP client.

    The ``httpx.AsyncClient`` created in ``__init__`` owns a connection pool; without
    closing it, every unload/reload of this cog would leave one behind. This hook is
    synchronous, so the asynchronous close is scheduled on the running event loop.
    When no loop is running there is nothing to schedule on and the close is skipped.
    """
    self.fetch_discord_atom_feed.cancel()
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return
    # Keep a reference so the task is not garbage-collected before it has run.
    self._http_close_task = loop.create_task(self.http.aclose())
2022-10-13 08:53:45 +01:00
2022-10-06 09:34:23 +01:00
@commands.Cog.listener("on_raw_reaction_add")
async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent):
    """Delete a bot-authored message when it receives a wastebasket reaction.

    :param payload: the raw reaction event (channel ID, message ID and emoji).
    """
    # Check the emoji before anything else so that unrelated reactions do not
    # cost a message fetch over the API.
    if payload.emoji.name != "\N{wastebasket}\U0000fe0f":
        return

    channel: Optional[discord.TextChannel] = self.bot.get_channel(payload.channel_id)
    if channel is None:
        return

    try:
        message: discord.Message = await channel.fetch_message(payload.message_id)
    except discord.HTTPException:
        # Message could not be fetched; nothing to delete.
        return

    if message.author.bot:
        await message.delete(delay=0.25)
2022-10-06 09:20:23 +01:00
@commands.Cog.listener()
async def on_member_join(self, member: discord.Member):
    """Post the join arrow and the member's mention in the guild's ``general`` channel.

    Only guilds listed in the configured ``guilds`` are handled.
    """
    guild = member.guild
    if guild is None or guild.id not in guilds:
        return

    general: discord.TextChannel = discord.utils.get(guild.text_channels, name="general")
    if not general or not general.can_send():
        return

    await general.send(f"{LTR} {member.mention}")
2022-10-06 09:20:23 +01:00
@commands.Cog.listener()
async def on_member_remove(self, member: discord.Member):
    """Post the leave arrow and the member's mention in the guild's ``general`` channel.

    Only guilds listed in the configured ``guilds`` are handled.
    """
    guild = member.guild
    if guild is None or guild.id not in guilds:
        return

    general: discord.TextChannel = discord.utils.get(guild.text_channels, name="general")
    if not general or not general.can_send():
        return

    await general.send(f"{RTL} {member.mention}")
2022-10-06 09:20:23 +01:00
2023-03-22 14:56:33 +00:00
@commands.Cog.listener()
async def on_voice_state_update(self, member: discord.Member, *_):
    """Disconnect the bot from voice once no non-bot members remain in its channel.

    :param member: the member whose voice state changed; only its guild is used.
    """
    guild = member.guild
    own_state = guild.me.voice
    if own_state is None or own_state.channel is None or guild.voice_client is None:
        # The bot is not connected to voice in this guild.
        return

    if any(not occupant.bot for occupant in own_state.channel.members):
        # At least one human is still listening.
        return

    await _dc(guild.voice_client)
2023-03-22 14:56:33 +00:00
2022-10-09 19:27:02 +01:00
@commands.Cog.listener()
async def on_message(self, message: discord.Message):
    """Handle a guild message: bridge relay, auto-clean channels and keyword replies.

    * In the channel named ``femboy-hole`` the message is converted to a
      :class:`MessagePayload` (with attachments and, when cached, the replied-to
      message) and queued on ``bot.bridge_queue``, unless it was sent by this bot
      or has neither content nor attachments.
    * In ``verify`` / ``timetable`` any message not sent by this bot is deleted after
      one second, provided the bot may manage messages there.
    * Messages containing certain words get an image from ``assets/`` or a rewritten
      copy of the text as a reply.

    Messages outside a guild are ignored.
    """
    if not message.guild:
        return

    if message.channel.name == "femboy-hole":

        def generate_payload(_message: discord.Message) -> MessagePayload:
            """Build the payload for *_message*, recursing into cached reply targets."""
            _payload = MessagePayload(
                message_id=_message.id,
                author=_message.author.display_name,
                is_automated=_message.author.bot or _message.author.system,
                avatar=_message.author.display_avatar.with_static_format("webp").with_size(512).url,
                content=_message.content or "",
                clean_content=str(_message.clean_content or ""),
                at=_message.created_at.timestamp(),
            )
            for attachment in _message.attachments:
                _payload.attachments.append(
                    MessagePayload.MessageAttachmentPayload(
                        url=attachment.url,
                        filename=attachment.filename,
                        proxy_url=attachment.proxy_url,
                        size=attachment.size,
                        width=attachment.width,
                        height=attachment.height,
                        # The library may report no content type, but the payload
                        # field is a required string; fall back to the generic
                        # binary type instead of failing validation.
                        content_type=attachment.content_type or "application/octet-stream",
                    )
                )
            if _message.reference is not None and _message.reference.cached_message:
                try:
                    _payload.reply_to = generate_payload(_message.reference.cached_message)
                except RecursionError:
                    _payload.reply_to = None
                    self.log.warning(
                        "Failed to generate reply payload for message %s", _message.id, exc_info=True
                    )
            return _payload

        payload = generate_payload(message)
        if message.author != self.bot.user and (payload.content or payload.attachments):
            await self.bot.bridge_queue.put(payload.model_dump())

    if message.channel.name in ("verify", "timetable") and message.author != self.bot.user:
        if message.channel.permissions_for(message.guild.me).manage_messages:
            await message.delete(delay=1)

    if not message.content:
        return

    assets = Path.cwd() / "assets"
    words = tuple(map(str.lower, re.split(r"\s+", message.content)))

    # (trigger words, image file) pairs, checked in order; the first pair whose
    # trigger is present AND whose image exists wins.
    keyword_images = (
        (("lupupa",), "lupupa.jpg"),
        (("fedora", "nix", "nixos"), "fedora.jpg"),
        (("carat",), "carat.jpg"),
        (("boris",), "boris.jpg"),
    )
    for triggers, filename in keyword_images:
        file = assets / filename
        if any(word in words for word in triggers) and file.exists():
            await message.reply(file=discord.File(file), delete_after=60)
            break
    else:
        if "twitter" in words or "vxtwitter" in words:
            new_content = re.sub(
                r"(^|\W+)twitter",
                lambda m: f'~~{m.group()}~~ \U0001D54F',
                message.content,
                flags=re.IGNORECASE
            )
            if len(new_content) > 2000:
                # Too long with the strikethrough markup: substitute in place, but
                # keep the separator that preceded the word so neighbouring words
                # are not glued together.
                new_content = re.sub(
                    r"(^|\W+)twitter",
                    lambda m: f"{m.group(1)}\U0001D54F",
                    message.content,
                    flags=re.IGNORECASE
                )
            new_content = new_content.replace("vxtwitter", "fixupx")
            await message.reply(new_content, delete_after=300)
2024-02-21 15:46:24 +00:00
@commands.Cog.listener("on_message_edit")
async def on_message_edit(self, before: discord.Message, after: discord.Message):
    """Queue an ``"edit"`` payload when a human edits the text of a bridged message.

    :param before: the message as it was before the edit.
    :param after: the message after the edit; its content and author go in the payload.

    Edits outside a guild, by bots or system users, outside the ``femboy-hole``
    channel, or that leave the content unchanged are ignored.
    """
    # Same guard as on_message: direct-message channels have no ``name`` to
    # compare against, so bail out before touching it.
    if before.guild is None:
        return
    if before.author.bot or before.author.system:
        return
    if before.channel.name != "femboy-hole" or before.content == after.content:
        return

    _payload = MessagePayload(
        message_id=before.id,
        author=after.author.display_name,
        is_automated=after.author.bot or after.author.system,
        avatar=after.author.display_avatar.with_static_format("webp").with_size(512).url,
        content=after.content or "",
        clean_content=str(after.clean_content or ""),
        # Prefer the edit time; fall back to creation time when it is not set.
        at=(after.edited_at or after.created_at).timestamp(),
        event_type="edit"
    )
    await self.bot.bridge_queue.put(_payload.model_dump())
@commands.Cog.listener("on_message_delete")
async def on_message_delete(self, message: discord.Message):
    """Queue a ``"redact"`` payload when a human's bridged message is deleted.

    :param message: the deleted message.

    Deletions outside a guild, of messages by bots or system users, or outside the
    ``femboy-hole`` channel are ignored.
    """
    # Same guard as on_message: direct-message channels have no ``name`` to
    # compare against, so bail out before touching it.
    if message.guild is None:
        return
    if message.author.bot or message.author.system:
        return
    if message.channel.name != "femboy-hole":
        return

    _payload = MessagePayload(
        message_id=message.id,
        author=message.author.display_name,
        is_automated=message.author.bot or message.author.system,
        avatar=message.author.display_avatar.with_static_format("webp").with_size(512).url,
        content=message.content or "",
        clean_content=str(message.clean_content or ""),
        at=message.created_at.timestamp(),
        event_type="redact"
    )
    await self.bot.bridge_queue.put(_payload.model_dump())
2023-04-02 17:02:26 +01:00
@tasks.loop(minutes=10)
async def fetch_discord_atom_feed(self):
    """Mirror incidents from the discordstatus.com Atom feed into ``SPAM_CHANNEL``.

    Every run downloads the feed, stores it under ``~/.cache/lcc-bot/discord.atom``
    and, for each entry updated after the previous cache file's modification time,
    sends an embed to the channel, or edits the embed already sent for that incident.
    The incident-ID -> message-ID map is kept in ``~/.cache/lcc-bot/history.json``.

    Does nothing when ``SPAM_CHANNEL`` is unset or the channel cannot be sent to.
    Paragraphs or timestamps in an unexpected format are tolerated rather than
    raised, so that one odd entry does not abort the run.
    """
    if not SPAM_CHANNEL:
        return
    if not self.bot.is_ready():
        await self.bot.wait_until_ready()
    channel = self.bot.get_channel(SPAM_CHANNEL)
    if channel is None or not channel.can_send(discord.Embed()):
        warnings.warn("Cannot send to spam channel, disabling feed fetcher")
        return

    headers = {"User-Agent": f"python-httpx/{httpx.__version__} (Like Akregator/5.22.3); syndication"}
    file = Path.home() / ".cache" / "lcc-bot" / "discord.atom"
    if not file.exists():
        file.parent.mkdir(parents=True, exist_ok=True)
        last_modified = discord.utils.utcnow()
        if dev:
            # Rewind to the 1st of the previous month. Going via "the day before
            # the 1st of this month" also works in January, where ``month - 1``
            # would be 0 and raise.
            last_modified = (last_modified.replace(day=1) - timedelta(days=1)).replace(day=1)
    else:
        # Send the first 32 hex characters of the cached file's SHA-256 as a weak
        # ETag so an unchanged feed can be answered with 304.
        _hash = hashlib.sha256()
        with file.open("rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                _hash.update(chunk)
        _hash = _hash.hexdigest()[:32]
        headers["If-None-Match"] = f'W/"{_hash}"'
        last_modified = datetime.fromtimestamp(file.stat().st_mtime, tz=timezone.utc)

    try:
        response = await self.http.get("https://discordstatus.com/history.atom", headers=headers)
    except httpx.HTTPError as e:
        self.log.error("Failed to fetch discord atom feed: %r", e, exc_info=e)
        return
    if response.status_code == 304:
        return
    if response.status_code != 200:
        self.log.error("Failed to fetch discord atom feed: HTTP/%s", response.status_code)
        return
    with file.open("wb") as f:
        f.write(response.content)

    incidents_file = Path.home() / ".cache" / "lcc-bot" / "history.json"
    if not incidents_file.exists():
        incidents_file.parent.mkdir(parents=True, exist_ok=True)
        incidents = {}
    else:
        try:
            with incidents_file.open("r") as f:
                incidents = json.load(f)
        except json.JSONDecodeError:
            # A corrupt map only costs us the ability to edit old messages.
            self.log.warning("Incident history at %s is not valid JSON, starting afresh", incidents_file)
            incidents = {}

    status_colours = {
        "resolved": discord.Color.green(),
        "investigating": discord.Color.dark_orange(),
        "identified": discord.Color.orange(),
        "monitoring": discord.Color.blurple(),
    }

    soup = BeautifulSoup(response.content, "lxml-xml")
    for entry in soup.find_all("entry"):
        published_tag = entry.find("published")
        updated_tag = entry.find("updated") or published_tag
        published = datetime.fromisoformat(published_tag.text)
        updated = datetime.fromisoformat(updated_tag.text)
        if updated <= last_modified:
            continue

        title = entry.title.text
        soup2 = BeautifulSoup(entry.content.text, "html.parser")
        # Swap every <br> for a random marker so line breaks survive get_text().
        sep = os.urandom(16).hex()
        for br in soup2.find_all("br"):
            br.replace_with(sep)

        sections = []
        status = None
        for _tag in soup2.find_all("p"):
            # Expected paragraph layout: "<timestamp><br><status> - <text>...".
            date, marker, _content = _tag.get_text().partition(sep)
            if not marker:
                self.log.warning("Skipping paragraph without a timestamp line in entry %s", entry.id.text)
                continue
            _content = _content.replace(sep, "\n")
            if status is None:
                # The status word is read from the first paragraph's own text
                # (before the "> " quote prefix is added), otherwise it can
                # never match a key of ``status_colours``.
                status = _content.partition(" - ")[0].strip().lower()

            date = re.sub(r"\s{2,}", " ", date).strip()
            heading = date  # shown verbatim if it cannot be parsed below
            for tz_name, offset in (("PDT", -7), ("PST", -8)):
                try:
                    # The feed omits the year; parse with the entry's year so
                    # that "Feb 29" is accepted in leap years.
                    parsed = datetime.strptime(f"{updated.year} {date}", f"%Y %b %d, %H:%M {tz_name}")
                except ValueError:
                    continue
                parsed = parsed.replace(tzinfo=timezone(timedelta(hours=offset)))
                heading = discord.utils.format_dt(parsed)
                break
            sections.append(f"[{heading}]\n> " + "\n> ".join(_content.splitlines()) + "\n\n")
        content = "".join(sections)

        colour = status_colours.get(status, discord.Color.greyple())

        if len(content) > 4096:
            content = f"[open on discordstatus.com (too large to display)]({entry.link['href']})"
        embed = discord.Embed(
            title=title, description=content, color=colour, url=entry.link["href"], timestamp=updated
        )
        embed.set_author(
            name="Discord Status",
            url="https://discordstatus.com/",
            icon_url="https://raw.githubusercontent.com/EEKIM10/LCC-bot/"
            "fe0cb6dd932f9fc2cb0a26433aff8e4cce19279a/assets/discord.png",
        )
        embed.set_footer(
            text="Published: {} | Updated: {}".format(
                published.strftime("%Y-%m-%d %H:%M:%S"),
                updated.strftime("%Y-%m-%d %H:%M:%S"),
            )
        )

        if entry.id.text not in incidents:
            msg = await channel.send(embed=embed)
            incidents[entry.id.text] = msg.id
        else:
            try:
                msg = await channel.fetch_message(incidents[entry.id.text])
                await msg.edit(embed=embed)
            except discord.HTTPException:
                # The earlier message could not be fetched or edited: post anew.
                msg = await channel.send(embed=embed)
                incidents[entry.id.text] = msg.id

    with incidents_file.open("w") as f:
        json.dump(incidents, f, separators=(",", ":"))
2022-10-06 09:20:23 +01:00
def setup(bot):
    """Extension entry point: create the :class:`Events` cog and add it to *bot*."""
    cog = Events(bot)
    bot.add_cog(cog)