college-bot-v1/cogs/events.py

701 lines
29 KiB
Python
Raw Normal View History

2023-04-02 17:02:26 +01:00
import hashlib
2023-05-04 18:03:28 +01:00
import inspect
2023-03-17 11:00:42 +00:00
import io
2023-04-02 17:02:26 +01:00
import json
import os
2022-12-28 21:26:56 +00:00
import random
2023-03-17 10:41:29 +00:00
import re
2023-03-22 14:56:33 +00:00
import asyncio
2023-02-24 08:40:25 +00:00
import textwrap
2023-03-22 14:56:33 +00:00
import subprocess
2023-05-04 18:03:28 +01:00
import traceback
2023-04-02 17:02:26 +01:00
import warnings
from datetime import datetime, timezone, timedelta
from bs4 import BeautifulSoup
2022-12-31 17:13:40 +00:00
from pathlib import Path
2023-05-04 18:03:28 +01:00
from typing import Optional, Tuple, Dict, Any
2022-10-06 09:20:23 +01:00
import discord
2023-03-17 10:41:29 +00:00
import httpx
2023-04-02 17:02:26 +01:00
from discord.ext import commands, pages, tasks
2022-12-07 13:45:58 +00:00
from utils import Student, get_or_none, console
2023-02-09 10:14:20 +00:00
from config import guilds
2023-05-16 16:59:47 +01:00
from utils.db import AccessTokens
2023-04-02 17:02:26 +01:00
try:
from config import dev
except ImportError:
dev = False
2023-02-23 14:30:49 +00:00
try:
from config import OAUTH_REDIRECT_URI
except ImportError:
OAUTH_REDIRECT_URI = None
2023-03-17 10:41:29 +00:00
try:
from config import GITHUB_USERNAME
from config import GITHUB_PASSWORD
except ImportError:
GITHUB_USERNAME = None
GITHUB_PASSWORD = None
2022-10-06 09:20:23 +01:00
2023-04-02 17:02:26 +01:00
try:
from config import SPAM_CHANNEL
except ImportError:
SPAM_CHANNEL = None
2022-10-06 09:20:23 +01:00
LTR = "\N{black rightwards arrow}\U0000fe0f"
RTL = "\N{leftwards black arrow}\U0000fe0f"
2023-03-31 11:55:36 +01:00
async def _dc(client: discord.VoiceClient | None):
if client is None:
return
2023-03-22 16:51:54 +00:00
if client.is_playing():
client.stop()
2023-03-22 15:37:06 +00:00
try:
2023-03-22 16:51:54 +00:00
await client.disconnect(force=True)
2023-03-22 15:37:06 +00:00
finally:
2023-03-22 16:51:54 +00:00
# client.cleanup()
pass
2023-03-22 15:37:06 +00:00
2022-10-06 09:20:23 +01:00
class Events(commands.Cog):
def __init__(self, bot):
self.bot = bot
2023-03-17 10:41:29 +00:00
self.http = httpx.AsyncClient()
2023-04-02 17:02:26 +01:00
self.fetch_discord_atom_feed.start()
def cog_unload(self):
self.fetch_discord_atom_feed.cancel()
2022-10-13 08:53:45 +01:00
# noinspection DuplicatedCode
async def analyse_text(self, text: str) -> Optional[Tuple[float, float, float, float]]:
"""Analyse text for positivity, negativity and neutrality."""
def inner():
try:
from utils.sentiment_analysis import intensity_analyser
except ImportError:
return None
scores = intensity_analyser.polarity_scores(text)
return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
async with self.bot.training_lock:
return await self.bot.loop.run_in_executor(None, inner)
2022-10-06 09:34:23 +01:00
@commands.Cog.listener("on_raw_reaction_add")
2022-10-06 09:20:23 +01:00
async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent):
channel: Optional[discord.TextChannel] = self.bot.get_channel(payload.channel_id)
2022-10-06 09:31:20 +01:00
if channel is not None:
2022-10-06 09:20:23 +01:00
try:
message: discord.Message = await channel.fetch_message(payload.message_id)
except discord.HTTPException:
return
2023-03-31 11:55:36 +01:00
if payload.emoji.name == "\N{wastebasket}\U0000fe0f":
if message.author.id == self.bot.user.id:
await message.delete(delay=0.25)
2023-03-31 11:59:17 +01:00
elif message.channel.permissions_for(message.guild.me).manage_messages:
2023-03-31 11:55:36 +01:00
reactions = 0
mod_reactions = 0
for reaction in message.reactions:
if reaction.emoji == payload.emoji:
async for member in reaction.users():
if member.id == self.bot.user.id:
continue
if member.guild_permissions.manage_messages:
mod_reactions += 1
reactions += 1
2023-03-31 11:57:45 +01:00
if reactions >= 2 or mod_reactions >= 1:
2023-03-31 11:55:36 +01:00
await message.delete(delay=0.1)
2022-10-06 09:20:23 +01:00
@commands.Cog.listener()
async def on_member_join(self, member: discord.Member):
if member.guild is None or member.guild.id not in guilds:
return
student: Optional[Student] = await get_or_none(Student, user_id=member.id)
if student and student.id:
role = discord.utils.find(lambda r: r.name.lower() == "verified", member.guild.roles)
if role and role < member.guild.me.top_role:
await member.add_roles(role, reason="Verified")
channel: discord.TextChannel = discord.utils.get(member.guild.text_channels, name="general")
if channel and channel.can_send():
await channel.send(
f"{LTR} {member.mention} (`{member}`, {f'{student.id}' if student else 'pending verification'})"
)
2022-10-06 09:20:23 +01:00
@commands.Cog.listener()
async def on_member_remove(self, member: discord.Member):
if member.guild is None or member.guild.id not in guilds:
return
student: Optional[Student] = await get_or_none(Student, user_id=member.id)
channel: discord.TextChannel = discord.utils.get(member.guild.text_channels, name="general")
if channel and channel.can_send():
await channel.send(
f"{RTL} {member.mention} (`{member}`, {f'{student.id}' if student else 'pending verification'})"
)
2022-10-06 09:20:23 +01:00
2023-03-17 10:41:29 +00:00
async def process_message_for_github_links(self, message: discord.Message):
RAW_URL = "https://github.com/{repo}/raw/{branch}/{path}"
_re = re.match(
r"https://github\.com/(?P<repo>[a-zA-Z0-9-]+/[\w.-]+)/blob/(?P<path>[^#>]+)(\?[^#>]+)?"
r"(#L(?P<start_line>\d+)(([-~:]|(\.\.))L(?P<end_line>\d+))?)",
message.content
)
if _re:
branch, path = _re.group("path").split("/", 1)
2023-03-17 10:48:07 +00:00
_p = Path(path).suffix
2023-03-17 10:41:29 +00:00
url = RAW_URL.format(
repo=_re.group("repo"),
branch=branch,
path=path
)
if all((GITHUB_PASSWORD, GITHUB_USERNAME)):
auth = (GITHUB_USERNAME, GITHUB_PASSWORD)
else:
auth = None
response = await self.http.get(url, follow_redirects=True, auth=auth)
if response.status_code == 200:
ctx = await self.bot.get_context(message)
lines = response.text.splitlines()
if _re.group("start_line"):
start_line = int(_re.group("start_line")) - 1
end_line = int(_re.group("end_line")) if _re.group("end_line") else start_line + 1
lines = lines[start_line:end_line]
2023-03-17 10:48:07 +00:00
paginator = commands.Paginator(prefix="```" + _p[1:], suffix="```", max_size=1000)
2023-03-17 10:41:29 +00:00
for line in lines:
paginator.add_line(line)
_pages = paginator.pages
paginator2 = pages.Paginator(_pages, timeout=300)
# noinspection PyTypeChecker
await paginator2.send(ctx, reference=message.to_reference())
if message.channel.permissions_for(message.guild.me).manage_messages:
await message.edit(suppress=True)
2023-03-17 11:00:42 +00:00
else:
RAW_URL = "https://github.com/{repo}/archive/refs/heads/{branch}.zip"
_full_re = re.finditer(
r"https://github\.com/(?P<repo>[a-zA-Z0-9-]+/[\w.-]+)(/tree/(?P<branch>[^#>]+))?\.(git|zip)",
message.content
)
for _match in _full_re:
repo = _match.group("repo")
branch = _match.group("branch") or "master"
url = RAW_URL.format(
repo=repo,
branch=branch,
)
if all((GITHUB_PASSWORD, GITHUB_USERNAME)):
auth = (GITHUB_USERNAME, GITHUB_PASSWORD)
else:
auth = None
async with message.channel.typing():
response = await self.http.get(url, follow_redirects=True, auth=auth)
if response.status_code == 200:
content = response.content
if len(content) > message.guild.filesize_limit - 1000:
continue
_io = io.BytesIO(content)
fn = f"{repo.replace('/', '-')}-{branch}.zip"
await message.reply(file=discord.File(_io, filename=fn))
if message.channel.permissions_for(message.guild.me).manage_messages:
await message.edit(suppress=True)
2023-03-17 10:41:29 +00:00
2023-03-22 14:56:33 +00:00
@commands.Cog.listener()
async def on_voice_state_update(
2023-05-05 00:11:29 +01:00
self,
2023-03-22 14:56:33 +00:00
member: discord.Member,
*_
):
2023-03-22 15:02:46 +00:00
me_voice = member.guild.me.voice
2023-03-22 16:30:24 +00:00
if me_voice is None or me_voice.channel is None or member.guild.voice_client is None:
2023-03-22 14:56:33 +00:00
return
channel = me_voice.channel
2023-05-01 19:40:11 +01:00
members = [m for m in channel.members if not m.bot]
if len(members) == 0:
2023-03-22 14:56:33 +00:00
# We are the only one in the channel
2023-03-22 15:37:06 +00:00
await _dc(member.guild.voice_client)
2023-03-22 14:56:33 +00:00
2022-10-09 19:27:02 +01:00
@commands.Cog.listener()
async def on_message(self, message: discord.Message):
2023-05-05 12:08:59 +01:00
def play_voice(_file):
async def internal():
if message.author.voice is not None and message.author.voice.channel is not None:
voice: discord.VoiceClient | None = None
if message.guild.voice_client is not None:
# noinspection PyUnresolvedReferences
if message.guild.voice_client.is_playing():
return
try:
await _dc(message.guild.voice_client)
except discord.HTTPException:
pass
2023-05-04 18:03:28 +01:00
try:
2023-05-05 12:08:59 +01:00
voice = await message.author.voice.channel.connect(timeout=10, reconnect=False)
except asyncio.TimeoutError:
await message.channel.trigger_typing()
await message.reply(
"I'd play the song but discord's voice servers are shit.",
file=discord.File(_file)
2023-05-04 18:03:28 +01:00
)
2023-05-05 12:08:59 +01:00
region = message.author.voice.channel.rtc_region
# noinspection PyUnresolvedReferences
console.log(
"Timed out connecting to voice channel: {0.name} in {0.guild.name} "
"(region {1})".format(
message.author.voice.channel,
region.name if region else "auto (unknown)"
)
)
return
2023-05-04 18:03:28 +01:00
2023-05-05 12:08:59 +01:00
if voice.channel != message.author.voice.channel:
await voice.move_to(message.author.voice.channel)
if message.guild.me.voice.self_mute or message.guild.me.voice.mute:
await _dc(voice)
await message.channel.trigger_typing()
await message.reply("Unmute me >:(", file=discord.File(_file))
else:
def after(err):
2023-05-05 20:08:47 +01:00
self.bot.loop.create_task(
2023-05-05 12:08:59 +01:00
_dc(voice),
)
if err is not None:
console.log(f"Error playing audio: {err}")
2023-05-05 20:08:47 +01:00
self.bot.loop.create_task(
message.add_reaction("\N{speaker with cancellation stroke}")
)
else:
self.bot.loop.create_task(
message.remove_reaction("\N{speaker with three sound waves}", self.bot.user)
)
self.bot.loop.create_task(
message.add_reaction("\N{speaker}")
)
2023-05-05 12:08:59 +01:00
# noinspection PyTypeChecker
src = discord.FFmpegPCMAudio(str(_file.resolve()), stderr=subprocess.DEVNULL)
src = discord.PCMVolumeTransformer(src, volume=0.5)
voice.play(
src,
after=after
2023-05-04 18:03:28 +01:00
)
2023-05-05 20:08:47 +01:00
if message.channel.permissions_for(message.guild.me).add_reactions:
await message.add_reaction("\N{speaker with three sound waves}")
2023-05-05 12:08:59 +01:00
else:
await message.channel.trigger_typing()
await message.reply(file=discord.File(_file))
return internal
2023-05-04 18:03:28 +01:00
async def send_smeg():
directory = Path.cwd() / "assets" / "smeg"
if directory:
choice = random.choice(list(directory.iterdir()))
_file = discord.File(
choice,
filename="%s.%s" % (os.urandom(32).hex(), choice.suffix)
)
2023-05-06 17:06:44 +01:00
await message.reply(file=_file, delete_after=60)
2023-05-04 18:03:28 +01:00
async def send_what():
msg = message.reference.cached_message
if not msg:
try:
msg = await message.channel.fetch_message(message.reference.message_id)
except discord.HTTPException:
return
if msg.content.count(f"{self.bot.user.mention} said ") >= 2:
await message.reply("You really are deaf, aren't you.")
elif not msg.content:
await message.reply(
"Maybe *I* need to get my hearing checked, I have no idea what {} said.".format(
msg.author.mention
)
)
else:
text = "{0.author.mention} said '{0.content}', you deaf sod.".format(
msg
)
_content = textwrap.shorten(
text, width=2000, placeholder="[...]"
)
await message.reply(_content, allowed_mentions=discord.AllowedMentions.none())
2023-05-16 16:59:47 +01:00
async def send_fuck_you() -> str:
student = await get_or_none(AccessTokens, user_id=message.author.id)
if student.ip_info is None or student.expires >= discord.utils.utcnow().timestamp():
2023-05-04 18:03:28 +01:00
if OAUTH_REDIRECT_URI:
2023-05-16 16:59:47 +01:00
return f"Let me see who you are, and then we'll talk... <{OAUTH_REDIRECT_URI}>"
2023-05-04 18:03:28 +01:00
else:
2023-05-16 16:59:47 +01:00
return "I literally don't even know who you are..."
2023-05-04 18:03:28 +01:00
else:
ip = student.ip_info
is_proxy = ip.get("proxy")
if is_proxy is None:
is_proxy = "?"
else:
is_proxy = "\N{WHITE HEAVY CHECK MARK}" if is_proxy else "\N{CROSS MARK}"
is_hosting = ip.get("hosting")
if is_hosting is None:
is_hosting = "?"
else:
is_hosting = "\N{WHITE HEAVY CHECK MARK}" if is_hosting else "\N{CROSS MARK}"
2023-05-16 16:59:47 +01:00
return (
2023-05-04 18:03:28 +01:00
"Nice argument, however,\n"
"IP: {0[query]}\n"
"ISP: {0[isp]}\n"
"Latitude: {0[lat]}\n"
"Longitude: {0[lon]}\n"
"Proxy server: {1}\n"
"VPS (or other hosting) provider: {2}\n\n"
"\N{smiling face with sunglasses}".format(
ip,
is_proxy,
is_hosting
2023-05-16 16:59:47 +01:00
)
2023-05-04 18:03:28 +01:00
)
2022-10-30 16:05:54 +00:00
if not message.guild:
2023-01-01 20:55:36 +00:00
return
2023-01-25 20:33:15 +00:00
2022-10-09 19:27:02 +01:00
if message.channel.name == "pinboard":
2022-10-10 18:15:48 +01:00
if message.type == discord.MessageType.pins_add:
await message.delete(delay=0.01)
else:
try:
await message.pin(reason="Automatic pinboard pinning")
except discord.HTTPException as e:
return await message.reply(f"Failed to auto-pin: {e}", delete_after=10)
elif message.channel.name in ("verify", "timetable") and message.author != self.bot.user:
if message.channel.permissions_for(message.guild.me).manage_messages:
await message.delete(delay=1)
2022-10-09 19:27:02 +01:00
2022-11-10 15:54:40 +00:00
else:
2023-05-04 18:03:28 +01:00
assets = Path.cwd() / "assets"
responses: Dict[str | tuple, Dict[str, Any]] = {
r"ferdi": {
"content": "https://ferdi-is.gay/",
"delete_after": 15,
},
r"\bbee(s)*\b": {
"content": "https://ferdi-is.gay/bee",
},
r"it just works": {
2023-05-06 21:22:22 +01:00
"func": play_voice(assets / "it-just-works.ogg"),
2023-05-05 12:08:59 +01:00
"meta": {
2023-05-06 21:22:22 +01:00
"check": (assets / "it-just-works.ogg").exists
2023-05-05 12:08:59 +01:00
}
2023-05-04 18:03:28 +01:00
},
r"^linux$": {
"content": lambda: (assets / "copypasta.txt").read_text(),
"meta": {
2023-05-04 23:54:32 +01:00
"needs_mention": True,
"check": (assets / "copypasta.txt").exists
2023-05-04 18:03:28 +01:00
}
},
r"carat": {
"file": discord.File(assets / "carat.jpg"),
2023-05-04 23:54:32 +01:00
"delete_after": None,
"meta": {
"check": (assets / "carat.jpg").exists
}
2023-05-04 18:03:28 +01:00
},
2023-05-05 09:32:57 +01:00
r"(lupupa|fuck(ed)? the hell out\W*)": {
2023-05-04 18:21:03 +01:00
"file": discord.File(assets / "lupupa.jpg"),
2023-05-04 23:54:32 +01:00
"meta": {
"check": (assets / "lupupa.jpg").exists
}
2023-05-04 18:21:03 +01:00
},
2023-05-04 18:03:28 +01:00
r"[s5]+(m)+[e3]+[g9]+": {
"func": send_smeg,
"meta": {
"sub": {
2023-05-05 20:08:47 +01:00
r"pattern": r"([-_.\s\u200b])+",
2023-05-04 18:03:28 +01:00
r"with": ''
2023-05-04 23:54:32 +01:00
},
"check": (assets / "smeg").exists
2023-05-04 18:03:28 +01:00
}
},
2023-05-08 00:57:46 +01:00
r"(what|huh)(\?|!)*$": {
2023-05-04 18:03:28 +01:00
"func": send_what,
"meta": {
"check": lambda: message.reference is not None
}
},
("year", "linux", "desktop"): {
"content": lambda: "%s will be the year of the GNU+Linux desktop." % datetime.now().year,
"delete_after": None
},
r"fuck you(\W)*": {
2023-05-18 09:44:35 +01:00
"content": send_fuck_you,
2023-05-04 18:03:28 +01:00
"meta": {
"check": lambda: message.content.startswith(self.bot.user.mention)
}
2023-05-05 12:08:59 +01:00
},
2023-05-05 20:08:47 +01:00
r"mine(ing|d)? (diamonds|away)": {
2023-05-05 12:08:59 +01:00
"func": play_voice(assets / "mine-diamonds.opus"),
"meta": {
"check": (assets / "mine-diamonds.opus").exists
}
2023-05-11 00:46:42 +01:00
},
r"v[ei]r[mg]in(\sme(d|m[a]?)ia\W*)?(\W\w*\W*)?$": {
2023-05-11 00:50:03 +01:00
"content": "Get virgin'd",
2023-05-11 00:46:42 +01:00
"file": lambda: discord.File(
random.choice(list(Path(assets / 'virgin').iterdir()))
),
"meta": {
"check": (assets / 'virgin').exists
}
2023-05-11 17:41:48 +01:00
},
r"richard|(dick\W*$)": {
"file": discord.File(assets / "visio.png"),
"meta": {
"check": (assets / "visio.png").exists
}
2023-05-16 16:59:47 +01:00
},
r"thank(\syou|s)(,)? jimmy": {
"content": "You're welcome, %s!" % message.author.mention,
},
r"(ok )?jimmy (we|i) g[eo]t it": {
"content": "No need to be so rude! Cunt.",
},
r"c(mon|ome on) jimmy": {
"content": "IM TRYING"
2023-05-16 17:13:15 +01:00
},
2023-05-04 18:03:28 +01:00
}
2023-02-24 08:40:25 +00:00
# Stop responding to any bots
2022-11-15 21:45:39 +00:00
if message.author.bot is True:
2022-11-10 15:54:40 +00:00
return
2023-05-04 18:03:28 +01:00
2023-02-24 08:40:25 +00:00
# Only respond if the message has content...
2023-05-04 18:03:28 +01:00
if message.content and message.channel.can_send(discord.Embed, discord.File):
for key, data in responses.items():
meta = data.pop("meta", {})
if meta.get("needs_mention"):
if not self.bot.user.mention not in message.mentions:
continue
if meta.get("check"):
try:
okay = meta["check"]()
except (Exception, RuntimeError):
traceback.print_exc()
okay = False
if not okay:
continue
elif meta.get("checks") and isinstance(meta["checks"], list):
for check in meta["checks"]:
2023-03-22 16:07:46 +00:00
try:
2023-05-04 18:03:28 +01:00
okay = check()
except (Exception, RuntimeError):
traceback.print_exc()
okay = False
if not okay:
break
2023-03-22 14:56:33 +00:00
else:
2023-05-04 18:03:28 +01:00
continue
if meta.get("sub") is not None and isinstance(meta["sub"], dict):
content = re.sub(
meta["sub"]["pattern"],
meta["sub"]["with"],
message.content
)
else:
content = message.content
if isinstance(key, str):
regex = re.compile(key, re.IGNORECASE)
if not regex.search(content):
continue
elif isinstance(key, tuple):
if not all(k in content for k in key):
continue
if "func" in data:
try:
2023-05-18 09:28:54 +01:00
if inspect.iscoroutinefunction(data["func"]) or inspect.iscoroutine(data["func"]):
2023-05-04 18:03:28 +01:00
await data["func"]()
break
2023-05-01 18:49:23 +01:00
else:
2023-05-04 18:03:28 +01:00
data["func"]()
break
except (Exception, RuntimeError):
traceback.print_exc()
continue
else:
for k, v in data.copy().items():
2023-05-18 09:45:48 +01:00
if inspect.iscoroutinefunction(data[k]) or inspect.iscoroutine(data[k]):
2023-05-18 09:44:35 +01:00
await v()
elif callable(v):
2023-05-04 18:03:28 +01:00
data[k] = v()
2023-05-04 18:21:03 +01:00
data.setdefault("delete_after", 30)
2023-05-18 09:44:35 +01:00
await message.channel.trigger_typing()
2023-05-04 18:03:28 +01:00
await message.reply(**data)
break
2023-05-04 18:03:28 +01:00
await self.process_message_for_github_links(message)
T_EMOJI = "\U0001f3f3\U0000fe0f\U0000200d\U000026a7\U0000fe0f"
G_EMOJI = "\U0001f3f3\U0000fe0f\U0000200d\U0001f308"
N_EMOJI = "\U0001f922"
C_EMOJI = "\U0000271d\U0000fe0f"
reactions = {
r"mpreg|lupupa|\U0001fac3": "\U0001fac3", # mpreg
r"(trans(gender)?($|\W+)|%s)" % T_EMOJI: T_EMOJI, # trans
r"gay|%s" % G_EMOJI: G_EMOJI,
r"(femboy|trans(gender)?($|\W+))": C_EMOJI
}
if message.channel.permissions_for(message.guild.me).add_reactions:
2022-12-28 21:26:56 +00:00
is_naus = random.randint(1, 100) == 32
2023-05-04 18:03:28 +01:00
for key, value in reactions.items():
if re.search(key, message.content, re.IGNORECASE):
await message.add_reaction(value)
2023-02-23 14:42:03 +00:00
2023-05-04 18:03:28 +01:00
if is_naus:
await message.add_reaction(N_EMOJI)
2023-02-23 14:30:49 +00:00
2023-04-02 17:02:26 +01:00
@tasks.loop(minutes=10)
async def fetch_discord_atom_feed(self):
if not SPAM_CHANNEL:
return
if not self.bot.is_ready():
await self.bot.wait_until_ready()
channel = self.bot.get_channel(SPAM_CHANNEL)
if channel is None or not channel.can_send(discord.Embed()):
warnings.warn("Cannot send to spam channel, disabling feed fetcher")
return
headers = {
"User-Agent": f"python-httpx/{httpx.__version__} (Like Akregator/5.22.3); syndication"
}
file = Path.home() / ".cache" / "lcc-bot" / "discord.atom"
if not file.exists():
file.parent.mkdir(parents=True, exist_ok=True)
last_modified = discord.utils.utcnow()
if dev:
last_modified = last_modified.replace(day=1, month=last_modified.month - 1)
else:
# calculate the sha256 hash of the file, returning the first 32 characters
# this is used to check if the file has changed
_hash = hashlib.sha256()
with file.open("rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
_hash.update(chunk)
_hash = _hash.hexdigest()[:32]
headers["If-None-Match"] = f'W/"{_hash}"'
last_modified = datetime.fromtimestamp(file.stat().st_mtime, tz=timezone.utc)
try:
response = await self.http.get("https://discordstatus.com/history.atom", headers=headers)
except httpx.HTTPError as e:
console.log("Failed to fetch discord atom feed:", e)
return
if response.status_code == 304:
return
if response.status_code != 200:
console.log("Failed to fetch discord atom feed:", response.status_code)
return
with file.open("wb") as f:
f.write(response.content)
incidents_file = Path.home() / ".cache" / "lcc-bot" / "history.json"
if not incidents_file.exists():
incidents_file.parent.mkdir(parents=True, exist_ok=True)
incidents = {}
else:
with incidents_file.open("r") as f:
incidents = json.load(f)
soup = BeautifulSoup(response.content, "lxml-xml")
for entry in soup.find_all("entry"):
published_tag = entry.find("published")
updated_tag = entry.find("updated") or published_tag
published = datetime.fromisoformat(published_tag.text)
updated = datetime.fromisoformat(updated_tag.text)
if updated > last_modified:
title = entry.title.text
content = ""
soup2 = BeautifulSoup(entry.content.text, "html.parser")
sep = os.urandom(16).hex()
for br in soup2.find_all("br"):
br.replace_with(sep)
for _tag in soup2.find_all("p"):
text = _tag.get_text()
date, _content = text.split(sep, 1)
_content = _content.replace(sep, "\n")
date = re.sub(r"\s{2,}", " ", date)
try:
date = datetime.strptime(date, "%b %d, %H:%M PDT")
offset = -7
except ValueError:
date = datetime.strptime(date, "%b %d, %H:%M PST")
offset = -8
date = date.replace(year=updated.year, tzinfo=timezone(timedelta(hours=offset)))
content += f"[{discord.utils.format_dt(date)}]\n> "
content += "\n> ".join(_content.splitlines())
content += "\n\n"
_status = {
"Resolved": discord.Color.green(),
"Investigating": discord.Color.dark_orange(),
"Identified": discord.Color.orange(),
"Monitoring": discord.Color.blurple(),
}
colour = _status.get(content.splitlines()[1].split(" - ")[0], discord.Color.greyple())
if len(content) > 4096:
content = f"[open on discordstatus.com (too large to display)]({entry.link['href']})"
embed = discord.Embed(
title=title,
description=content,
color=colour,
url=entry.link["href"],
timestamp=updated
)
embed.set_author(
name="Discord Status",
url="https://discordstatus.com/",
icon_url="https://raw.githubusercontent.com/EEKIM10/LCC-bot/"
"fe0cb6dd932f9fc2cb0a26433aff8e4cce19279a/assets/discord.png"
)
embed.set_footer(
text="Published: {} | Updated: {}".format(
datetime.fromisoformat(entry.find("published").text).strftime("%Y-%m-%d %H:%M:%S"),
updated.strftime("%Y-%m-%d %H:%M:%S")
)
)
if entry.id.text not in incidents:
msg = await channel.send(embed=embed)
incidents[entry.id.text] = msg.id
else:
try:
msg = await channel.fetch_message(incidents[entry.id.text])
await msg.edit(embed=embed)
except discord.HTTPException:
msg = await channel.send(embed=embed)
incidents[entry.id.text] = msg.id
with incidents_file.open("w") as f:
json.dump(incidents, f, separators=(",", ":"))
2022-10-06 09:20:23 +01:00
def setup(bot):
bot.add_cog(Events(bot))