Lobotomise Jimmy

This commit is contained in:
Nexus 2024-02-21 01:13:39 +00:00
parent a16666b689
commit 2e613adaaa
Signed by: nex
GPG key ID: 0FA334385D0B689F
44 changed files with 38 additions and 2806 deletions

View file

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="dataSourceStorageLocal" created-in="PY-232.10227.11">
<component name="dataSourceStorageLocal" created-in="PY-233.13763.11">
<data-source name="main" uuid="28efee07-d306-4126-bf69-01008b4887e2">
<database-info product="SQLite" version="3.39.2" jdbc-version="2.1" driver-name="SQLite JDBC" driver-version="3.39.2.0" dbms="SQLITE" exact-version="3.39.2" exact-driver-version="3.39">
<identifier-quote-string>&quot;</identifier-quote-string>

View file

@ -3,5 +3,5 @@
<component name="Black">
<option name="sdkName" value="Python 3.11 (LCC-bot)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11 (LCC-bot)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (LCC-bot)" project-jdk-type="Python SDK" />
</project>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 309 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 23 KiB

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.8 KiB

Binary file not shown.

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 52 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 9.4 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.9 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.3 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.9 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.5 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 67 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 191 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.6 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 125 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.9 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 77 KiB

Binary file not shown.

View file

@ -110,16 +110,9 @@ class Events(commands.Cog):
if member.guild is None or member.guild.id not in guilds:
return
# student: Optional[Student] = await get_or_none(Student, user_id=member.id)
# if student and student.id:
# role = discord.utils.find(lambda r: r.name.lower() == "verified", member.guild.roles)
# if role and role < member.guild.me.top_role:
# await member.add_roles(role, reason="Verified")
channel: discord.TextChannel = discord.utils.get(member.guild.text_channels, name="general")
if channel and channel.can_send():
await channel.send(
# f"{LTR} {member.mention} (`{member}`, {f'{student.id}' if student else 'pending verification'})"
f"{LTR} {member.mention}"
)
@ -128,77 +121,12 @@ class Events(commands.Cog):
if member.guild is None or member.guild.id not in guilds:
return
# student: Optional[Student] = await get_or_none(Student, user_id=member.id)
channel: discord.TextChannel = discord.utils.get(member.guild.text_channels, name="general")
if channel and channel.can_send():
await channel.send(
# f"{RTL} {member.mention} (`{member}`, {f'{student.id}' if student else 'pending verification'})"
f"{RTL} {member.mention}"
)
async def process_message_for_github_links(self, message: discord.Message):
RAW_URL = "https://github.com/{repo}/raw/{branch}/{path}"
_re = re.match(
r"https://github\.com/(?P<repo>[a-zA-Z0-9-]+/[\w.-]+)/blob/(?P<path>[^#>]+)(\?[^#>]+)?"
r"(#L(?P<start_line>\d+)(([-~:]|(\.\.))L(?P<end_line>\d+))?)",
message.content,
)
if _re:
branch, path = _re.group("path").split("/", 1)
_p = Path(path).suffix
url = RAW_URL.format(repo=_re.group("repo"), branch=branch, path=path)
if all((GITHUB_PASSWORD, GITHUB_USERNAME)):
auth = (GITHUB_USERNAME, GITHUB_PASSWORD)
else:
auth = None
response = await self.http.get(url, follow_redirects=True, auth=auth)
if response.status_code == 200:
ctx = await self.bot.get_context(message)
lines = response.text.splitlines()
if _re.group("start_line"):
start_line = int(_re.group("start_line")) - 1
end_line = int(_re.group("end_line")) if _re.group("end_line") else start_line + 1
lines = lines[start_line:end_line]
paginator = commands.Paginator(prefix="```" + _p[1:], suffix="```", max_size=2000)
for line in lines:
paginator.add_line(line)
_pages = paginator.pages
paginator2 = pages.Paginator(_pages, timeout=300)
# noinspection PyTypeChecker
await paginator2.send(ctx, reference=message.to_reference())
if message.channel.permissions_for(message.guild.me).manage_messages:
await message.edit(suppress=True)
else:
RAW_URL = "https://github.com/{repo}/archive/refs/heads/{branch}.zip"
_full_re = re.finditer(
r"https://github\.com/(?P<repo>[a-zA-Z0-9-]+/[\w.-]+)(/tree/(?P<branch>[^#>]+))?\.(git|zip)",
message.content,
)
for _match in _full_re:
repo = _match.group("repo")
branch = _match.group("branch") or "master"
url = RAW_URL.format(
repo=repo,
branch=branch,
)
if all((GITHUB_PASSWORD, GITHUB_USERNAME)):
auth = (GITHUB_USERNAME, GITHUB_PASSWORD)
else:
auth = None
async with message.channel.typing():
response = await self.http.get(url, follow_redirects=True, auth=auth)
if response.status_code == 200:
content = response.content
if len(content) > message.guild.filesize_limit - 1000:
continue
_io = io.BytesIO(content)
fn = f"{repo.replace('/', '-')}-{branch}.zip"
await message.reply(file=discord.File(_io, filename=fn))
if message.channel.permissions_for(message.guild.me).manage_messages:
await message.edit(suppress=True)
@commands.Cog.listener()
async def on_voice_state_update(self, member: discord.Member, *_):
me_voice = member.guild.me.voice
@ -213,106 +141,6 @@ class Events(commands.Cog):
@commands.Cog.listener()
async def on_message(self, message: discord.Message):
def play_voice(_file):
async def internal():
if message.author.voice is not None and message.author.voice.channel is not None:
voice: discord.VoiceClient | None = None
if message.guild.voice_client is not None:
# noinspection PyUnresolvedReferences
if message.guild.voice_client.is_playing():
return
try:
await _dc(message.guild.voice_client)
except discord.HTTPException:
pass
try:
voice = await message.author.voice.channel.connect(timeout=10, reconnect=False)
except asyncio.TimeoutError:
await message.channel.trigger_typing()
await message.reply(
"I'd play the song but discord's voice servers are shit.", file=discord.File(_file)
)
region = message.author.voice.channel.rtc_region
# noinspection PyUnresolvedReferences
self.log.warning(
"Timed out connecting to voice channel: {0.name} in {0.guild.name} "
"(region {1})".format(
message.author.voice.channel, region.name if region else "auto (unknown)"
)
)
return
if voice.channel != message.author.voice.channel:
await voice.move_to(message.author.voice.channel)
if message.guild.me.voice.self_mute or message.guild.me.voice.mute:
await message.channel.trigger_typing()
await message.reply("Unmute me >:(", file=discord.File(_file))
else:
def after(err):
self.bot.loop.create_task(
_dc(voice),
)
if err is not None:
self.log.error(f"Error playing audio: {err}", exc_info=err)
self.bot.loop.create_task(message.add_reaction("\N{speaker with cancellation stroke}"))
else:
self.bot.loop.create_task(
message.remove_reaction("\N{speaker with three sound waves}", self.bot.user)
)
self.bot.loop.create_task(message.add_reaction("\N{speaker}"))
# noinspection PyTypeChecker
src = discord.FFmpegPCMAudio(str(_file.resolve()), stderr=subprocess.DEVNULL)
src = discord.PCMVolumeTransformer(src, volume=0.5)
voice.play(src, after=after)
if message.channel.permissions_for(message.guild.me).add_reactions:
await message.add_reaction("\N{speaker with three sound waves}")
else:
await message.channel.trigger_typing()
await message.reply(file=discord.File(_file))
return internal
async def send_what():
msg = message.reference.cached_message
if not msg:
try:
msg = await message.channel.fetch_message(message.reference.message_id)
except discord.HTTPException:
return
if msg.content.count(f"{self.bot.user.mention} said ") >= 2:
await message.reply("You really are deaf, aren't you.")
elif not msg.content:
await message.reply(
"Maybe *I* need to get my hearing checked, I have no idea what {} said.".format(msg.author.mention)
)
else:
text = "{0.author.mention} said '{0.content}', you deaf sod.".format(msg)
_content = textwrap.shorten(text, width=2000, placeholder="[...]")
await message.reply(_content, allowed_mentions=discord.AllowedMentions.none())
def get_sloc_count():
root = Path.cwd()
root_files = list(root.glob("**/*.py"))
root_files.append(root / "main.py")
root_files = list(filter(lambda f: "venv" not in f.parents and "venv" not in f.parts, root_files))
lines = 0
for file in root_files:
try:
code = file.read_text()
except (UnicodeDecodeError, IOError, SystemError):
continue
for line in code.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
lines += 1
return lines, len(root_files)
if not message.guild:
return
@ -352,200 +180,10 @@ class Events(commands.Cog):
if message.author != self.bot.user and (payload.content or payload.attachments):
await self.bot.bridge_queue.put(payload.model_dump())
if message.channel.name == "pinboard" and not message.content.startswith(("#", "//", ";", "h!")):
if message.type == discord.MessageType.pins_add:
await message.delete(delay=0.01)
else:
try:
await message.pin(reason="Automatic pinboard pinning")
except discord.HTTPException as e:
return await message.reply(f"Failed to auto-pin: {e}", delete_after=10)
elif message.channel.name in ("verify", "timetable") and message.author != self.bot.user:
if message.channel.name in ("verify", "timetable") and message.author != self.bot.user:
if message.channel.permissions_for(message.guild.me).manage_messages:
await message.delete(delay=1)
else:
assets = Path.cwd() / "assets"
responses: Dict[str | tuple, Dict[str, Any]] = {
r"\bbee(s)*\b": {
"content": "https://ferdi-is.gay/bee",
},
r"it just works": {
"func": play_voice(assets / "it-just-works.ogg"),
"meta": {"check": (assets / "it-just-works.ogg").exists},
},
"count to (3|three)": {
"func": play_voice(assets / "count-to-three.ogg"),
"meta": {"check": (assets / "count-to-three.ogg").exists},
},
r"^linux$": {
"content": lambda: (assets / "copypasta.txt").read_text(),
"meta": {"needs_mention": True, "check": (assets / "copypasta.txt").exists},
},
r"carat": {
"file": discord.File(assets / "carat.jpg"),
"delete_after": None,
"meta": {"check": (assets / "carat.jpg").exists},
},
r"(lupupa|fuck(ed)? the hell out\W*)": {
"file": discord.File(assets / "lupupa.jpg"),
"meta": {"check": (assets / "lupupa.jpg").exists},
},
r"[s5]+(m)+[e3]+[g9]+": {
# "func": send_smeg,
"file": lambda: discord.File(random.choice(list((assets / "smeg").iterdir()))),
"delete_after": 30,
"meta": {"sub": {r"pattern": r"([-_.\s\u200b])+", r"with": ""}, "check": (assets / "smeg").exists},
},
r"(what|huh)(\?|!)*$": {"func": send_what, "meta": {"check": lambda: message.reference is not None}},
("year", "linux", "desktop"): {
"content": lambda: "%s will be the year of the GNU+Linux desktop." % datetime.now().year,
"delete_after": None,
},
r"mine(ing|d)? (diamonds|away)": {
"func": play_voice(assets / "mine-diamonds.ogg"),
"meta": {"check": (assets / "mine-diamonds.ogg").exists},
},
r"v[ei]r[mg]in(\sme(d|m[a]?)ia\W*)?(\W\w*\W*)?$": {
"content": "Get virgin'd",
"file": lambda: discord.File(random.choice(list(Path(assets / "virgin").iterdir()))),
"meta": {"check": (assets / "virgin").exists},
},
r"richard|(dick\W*$)": {
"file": discord.File(assets / "visio.png"),
"meta": {"check": (assets / "visio.png").exists},
},
r"thank(\syou|s)(,)? jimmy": {
"content": "You're welcome, %s!" % message.author.mention,
},
r"(ok )?jimmy (we|i) g[eo]t it": {
"content": "No need to be so rude! Cunt.",
},
r"c(mon|ome on) jimmy": {"content": "IM TRYING"},
r"(bor(r)?is|johnson)": {"file": discord.File(assets / "boris.jpeg")},
r"\W?(s(ource\w)?)?l(ines\s)?o(f\s)?c(ode)?(\W)?$": {
"content": lambda: "I have {:,} lines of source code across {:,} files!".format(*get_sloc_count())
},
r"t(ry\s)?i(t\s)?a(nd\s)?see.*": {"content": "https://tryitands.ee"},
r"disgrace": {"file": discord.File(assets / "disgrace.m4a")},
r"china": {"file": discord.File(assets / "china.m4a")},
r"drones": {"file": discord.File(assets / "drones.m4a")},
r"pork($|\W+)|markets": {"file": discord.File(assets / "pork.m4a")},
r"(wo)?man\sis\sa\s(wo)?man|(trans(\s)?)?gender\W*$": {
"file": discord.File(assets / "common-sense.m4a")
},
r"scrapped(\sit)?|((7\s|seven\s)?different\s)?bins|(meat\s|flying\s)?tax": {
"file": discord.File(assets / "scrapped.m4a")
},
r"peppa|pig": {"file": discord.File(assets / "peppa-pig.m4a")},
r"brush|hair": {"file": discord.File(assets / "hair.m4a")},
r"((cup\s)?of\s)?tea\W*$": {"file": discord.File(assets / "tea.m4a")},
r"wheat|fields": {"file": discord.File(assets / "wheat.m4a")},
r"(\W|^)bus((s)?es)?\W*$": {"file": discord.File(assets / "bus.m4a")},
r"^DoH$": {"content": "DoH: Domain Name Service over Hyper Text Transfer Protocol Secure"},
r"^DoT$": {"content": "DoT: Domain Name Service over Transport Layer Security"},
r"^DoQ$": {
"content": "DoQ: Domain Name Service over Quick User Datagram Protocol Internet Connections"
},
r"^(Do)?DTLS$": {"content": "DoDTLS: Domain Name Service over Datagram Transport Layer Security"},
r"^(arch|nix(os)?)$": {
"content": "https://kvinneby.vendicated.dev/@vee/statuses/01HJPVWA7CPFP14YAQ9DTGX1HK",
"file": discord.File(assets / "fedora.webp")
}
}
# Stop responding to any bots
if message.author.bot is True:
return
# Only respond if the message has content...
if message.content and message.channel.can_send(discord.Embed, discord.File):
for key, data in responses.items():
meta = data.pop("meta", {})
if meta.get("needs_mention"):
if not self.bot.user.mention not in message.mentions:
continue
if meta.get("check"):
try:
okay = meta["check"]()
except (Exception, RuntimeError):
traceback.print_exc()
okay = False
if not okay:
continue
elif meta.get("checks") and isinstance(meta["checks"], list):
for check in meta["checks"]:
try:
okay = check()
except (Exception, RuntimeError):
traceback.print_exc()
okay = False
if not okay:
break
else:
continue
if meta.get("sub") is not None and isinstance(meta["sub"], dict):
content = re.sub(meta["sub"]["pattern"], meta["sub"]["with"], message.content)
else:
content = message.content
if isinstance(key, str):
regex = re.compile(key.casefold(), re.IGNORECASE)
if not regex.search(content):
continue
elif isinstance(key, tuple):
if not all(k in content for k in key):
continue
if "func" in data:
try:
if inspect.iscoroutinefunction(data["func"]) or inspect.iscoroutine(data["func"]):
await data["func"]()
break
else:
data["func"]()
break
except (Exception, RuntimeError):
traceback.print_exc()
continue
else:
for k, v in data.copy().items():
if inspect.iscoroutinefunction(data[k]) or inspect.iscoroutine(data[k]):
data[k] = await v()
elif callable(v):
data[k] = v()
if data.get("file") is not None:
if not isinstance(data["file"], discord.File):
data["file"] = discord.File(data["file"])
data.setdefault("delete_after", 300)
await message.channel.trigger_typing()
await message.reply(**data)
break
await self.process_message_for_github_links(message)
T_EMOJI = "\U0001f3f3\U0000fe0f\U0000200d\U000026a7\U0000fe0f"
G_EMOJI = "\U0001f3f3\U0000fe0f\U0000200d\U0001f308"
N_EMOJI = "\U0001f922"
C_EMOJI = "\U0000271d\U0000fe0f"
reactions = {
r"mpreg|lupupa|\U0001fac3": "\U0001fac3", # mpreg
r"(trans(gender)?($|\W+)|%s)" % T_EMOJI: T_EMOJI, # trans
r"gay|%s" % G_EMOJI: G_EMOJI,
r"(femboy|trans(gender)?($|\W+))": C_EMOJI,
}
if message.channel.permissions_for(message.guild.me).add_reactions:
is_naus = random.randint(1, 100) == 32
for key, value in reactions.items():
if re.search(key, message.content, re.IGNORECASE):
await message.add_reaction(value)
if is_naus:
await message.add_reaction(N_EMOJI)
@tasks.loop(minutes=10)
async def fetch_discord_atom_feed(self):
if not SPAM_CHANNEL:

View file

@ -193,8 +193,6 @@ class Extremism(commands.Cog):
img_bio.seek(0)
decoration_bio.seek(0)
files = [
# discord.File(img_bio, "avatar.png"),
# discord.File(decoration_bio, "decoration.png"),
discord.File(img_bytes, filename="decorated." + ext)
]
await ctx.respond(files=files)

View file

@ -17,7 +17,7 @@ except ImportError:
class InfoCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.client = httpx.AsyncClient(base_url="https://discord.com/api")
self.client = httpx.AsyncClient(base_url="https://discord.com/api/v10")
async def get_user_info(self, token: str):
try:
@ -45,7 +45,7 @@ class InfoCog(commands.Cog):
@commands.slash_command()
@commands.cooldown(1, 10, commands.BucketType.user)
async def me(self, ctx: discord.ApplicationCommand):
async def me(self, ctx: discord.ApplicationContext):
"""Displays oauth info about you"""
await ctx.defer()

View file

@ -1,163 +0,0 @@
from datetime import datetime
from typing import Sized
import discord
from discord.ext import commands
from utils import BannedStudentID, JimmyBans, Student, get_or_none, owner_or_admin
class LimitedList(list):
"""FIFO Limited list"""
def __init__(self, iterable: Sized = None, size: int = 5000):
if iterable:
assert len(iterable) <= size, "Initial iterable too big."
super().__init__(iterable or [])
self._max_size = size
def append(self, __object) -> None:
if len(self) + 1 >= self._max_size:
self.pop(0)
super().append(__object)
def __add__(self, other):
if len(other) > self._max_size:
raise ValueError("Other is too large")
elif len(other) == self._max_size:
self.clear()
super().__add__(other)
else:
for item in other:
self.append(item)
class Mod(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.cache = LimitedList()
@commands.Cog.listener()
async def on_message_delete(self, message: discord.Message):
self.cache.append(message)
@commands.user_command(name="Ban Account's B Number")
@discord.default_permissions(ban_members=True)
async def ban_student_id(self, ctx: discord.ApplicationContext, member: discord.Member):
"""Bans a student ID from registering. Also bans an account associated with it."""
await ctx.defer(ephemeral=True)
student_id = await get_or_none(Student, user_id=member.id)
if student_id is None:
return await ctx.respond("\N{cross mark} Unknown B number (is the user verified yet?)", ephemeral=True)
ban = await get_or_none(BannedStudentID, student_id=student_id.id)
if ban is None:
ban = await BannedStudentID.objects.create(
student_id=student_id.id,
associated_account=member.id,
)
await member.ban(reason=f"Banned ID {ban.student_id} by {ctx.user}")
return await ctx.respond(
f"\N{white heavy check mark} Banned {ban.student_id} (and {member.mention})", ephemeral=True
)
@commands.slash_command(name="unban-student-number")
@discord.default_permissions(ban_members=True)
async def unban_student_id(self, ctx: discord.ApplicationContext, student_id: str):
"""Unbans a student ID and the account associated with it."""
student_id = student_id.upper()
ban = await get_or_none(BannedStudentID, student_id=student_id)
if not ban:
return await ctx.respond("\N{cross mark} That student ID isn't banned.")
await ctx.defer()
user_id = ban.associated_account
await ban.delete()
if not user_id:
return await ctx.respond(f"\N{white heavy check mark} Unbanned {student_id}. No user to unban.")
else:
try:
await ctx.guild.unban(discord.Object(user_id), reason=f"Unbanned by {ctx.user}")
except discord.HTTPException as e:
return await ctx.respond(
f"\N{white heavy check mark} Unbanned {student_id}. Failed to unban {user_id} - HTTP {e.status}."
)
else:
return await ctx.respond(f"\N{white heavy check mark} Unbanned {student_id}. Unbanned {user_id}.")
@commands.slash_command(name="block")
@owner_or_admin()
async def block_user(self, ctx: discord.ApplicationContext, user: discord.Member, reason: str, until: str):
"""Blocks a user from using the bot."""
await ctx.defer()
date = datetime.utcnow()
_time = until
try:
date, _time = until.split(" ")
except ValueError:
pass
else:
try:
date = datetime.strptime(date, "%d/%m/%Y")
except ValueError:
return await ctx.respond("Invalid date format. Use `DD/MM/YYYY`.")
try:
hour, minute = map(int, _time.split(":"))
except ValueError:
return await ctx.respond("\N{cross mark} Invalid time format. Use HH:MM.")
end = date.replace(hour=hour, minute=minute)
# get an entry for the user's ID, and if it doesn't exist, create it. Otherwise, alert the user.
entry = await get_or_none(JimmyBans, user_id=user.id)
if entry is None:
await JimmyBans.objects.create(user_id=user.id, reason=reason, until=end.timestamp())
else:
return await ctx.respond("\N{cross mark} That user is already blocked.")
await ctx.respond(f"\N{white heavy check mark} Blocked {user.mention} until {discord.utils.format_dt(end)}.")
@commands.slash_command(name="unblock")
@owner_or_admin()
async def unblock_user(self, ctx: discord.ApplicationContext, user: discord.Member):
"""Unblocks a user from using the bot."""
await ctx.defer()
entry = await get_or_none(JimmyBans, user_id=user.id)
if entry is None:
return await ctx.respond("\N{cross mark} That user isn't blocked.")
await entry.delete()
await ctx.respond(f"\N{white heavy check mark} Unblocked {user.mention}.")
@commands.command()
async def undelete(self, ctx: commands.Context, user: discord.User, *, query: str = None):
"""Searches through the message cache to see if there's any deleted messages."""
for message in self.cache:
message: discord.Message
if message.author == user:
query_ = query.lower() if query else None
content_ = str(message.clean_content or "").lower()
if query_ is not None and (query_ in content_ or content_ in query_):
break
else:
return await ctx.reply("\N{cross mark} No matches in cache.")
embeds = [
discord.Embed(title="Message found!"),
discord.Embed(
description=message.content,
colour=message.author.colour,
timestamp=message.created_at,
fields=[
discord.EmbedField("Attachment count", str(len(message.attachments)), False),
discord.EmbedField("Location", str(message.channel.mention)),
discord.EmbedField(
"Times",
f"Created: {discord.utils.format_dt(message.created_at, 'R')} | Edited: "
f"{'None' if message.edited_at is None else discord.utils.format_dt(message.edited_at, 'R')}",
),
],
).set_author(name=message.author.display_name, icon_url=message.author.display_avatar.url),
]
await ctx.reply(embeds=embeds)
def setup(bot):
bot.add_cog(Mod(bot))

File diff suppressed because it is too large Load diff

View file

@ -1,656 +0,0 @@
import asyncio
import datetime
import hashlib
import json
import logging
import random
import time
from datetime import timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from urllib.parse import ParseResult, parse_qs, urlparse
import discord
import httpx
from discord.ext import commands, pages, tasks
from httpx import AsyncClient, Response
from utils import UptimeEntry, console
"""
Notice to anyone looking at this code:
Don't
It doesn't look nice
It's not well written
It just works
"""
BASE_JSON = """[
{
"name": "SHRoNK Bot",
"id": "SHRONK",
"uri": "user://994710566612500550/1063875884274163732?online=1&idle=0&dnd=0"
},
{
"name": "Nex's Droplet",
"id": "NEX_DROPLET",
"uri": "http://droplet.nexy7574.co.uk:9000/"
},
{
"name": "Nex's Pi",
"id": "PI",
"uri": "http://wg.nexy7574.co.uk:9000/"
},
{
"name": "SHRoNK Droplet",
"id": "SHRONK_DROPLET",
"uri": "http://shronkservz.tk:9000/"
}
]
"""
class UptimeCompetition(commands.Cog):
class CancelTaskView(discord.ui.View):
def __init__(self, task: asyncio.Task, expires: datetime.datetime):
timeout = expires - discord.utils.utcnow()
super().__init__(timeout=timeout.total_seconds() + 3, disable_on_timeout=True)
self.task = task
@discord.ui.button(label="Cancel", style=discord.ButtonStyle.red)
async def cancel(self, _, interaction: discord.Interaction):
self.stop()
if self.task:
self.task.cancel()
await interaction.response.edit_message(content="Uptime test cancelled.", view=None)
def __init__(self, bot: commands.Bot):
self.bot = bot
self.log = logging.getLogger("jimmy.cogs.uptime")
self.http = AsyncClient(verify=False, http2=True)
self._warning_posted = False
self.test_uptimes.add_exception_type(Exception)
self.test_uptimes.start()
self.last_result: list[UptimeEntry] = []
self.task_lock = asyncio.Lock()
self.task_event = asyncio.Event()
self.path = Path.home() / ".cache" / "lcc-bot" / "targets.json"
if not self.path.exists():
self.path.write_text(BASE_JSON)
self._cached_targets = self.read_targets()
@property
def cached_targets(self) -> List[Dict[str, str]]:
return self._cached_targets.copy()
def get_target(self, name: str = None, target_id: str = None) -> Optional[Dict[str, str]]:
for target in self.cached_targets:
if name and target["name"].lower() == name.lower():
return target
if target["id"] == target_id:
return target
return
def stats_autocomplete(self, ctx: discord.ApplicationContext) -> List[str]:
return [x["name"] for x in self.cached_targets if ctx.value.lower() in x["name"].lower()] + ["ALL"]
@staticmethod
def parse_user_uri(uri: str) -> Dict[str, str | None | List[str]]:
parsed = urlparse(uri)
response = {"guild": parsed.hostname, "user": None, "statuses": []}
if parsed.path:
response["user"] = parsed.path.strip("/")
if parsed.query:
response["statuses"] = [x for x, y in parse_qs(parsed.query).items() if y[0] == "1"]
return response
def read_targets(self) -> List[Dict[str, str]]:
with self.path.open() as f:
data: list = json.load(f)
data.sort(key=lambda x: x["name"])
self._cached_targets = data.copy()
return data
def write_targets(self, data: List[Dict[str, str]]):
self._cached_targets = data
with self.path.open("w") as f:
json.dump(data, f, indent=4, default=str)
def cog_unload(self):
self.test_uptimes.cancel()
@staticmethod
def assert_uptime_server_response(response: Response, expected_hash: str = None):
assert response.status_code in range(200, 400), f"Status code {response.status_code} is not in range 200-400"
if expected_hash:
md5 = hashlib.md5()
md5.update(response.content)
resolved = md5.hexdigest()
assert resolved == expected_hash, f"{resolved}!={expected_hash}"
async def _test_url(
self, url: str, *, max_retries: int | None = 10, timeout: int | None = 30
) -> Tuple[int, Response | Exception]:
attempts = 1
if max_retries is None:
max_retries = 1
if timeout is None:
timeout = 10
err = RuntimeError("Unknown Error")
while attempts < max_retries:
try:
response = await self.http.get(url, timeout=timeout, follow_redirects=True)
response.raise_for_status()
except (httpx.TimeoutException, httpx.HTTPStatusError, ConnectionError, TimeoutError) as err2:
attempts += 1
err = err2
await asyncio.sleep(1)
continue
else:
return attempts, response
return attempts, err
async def do_test_uptimes(self):
targets = self.cached_targets
# First we need to check that we are online.
# If we aren't online, this isn't very fair.
try:
await self.http.get("https://google.co.uk/")
except (httpx.HTTPError, Exception):
return # Offline :pensive:
create_tasks = []
# We need to collect the tasks in case the sqlite server is being sluggish
for target in targets.copy():
if not target["uri"].startswith("http"):
continue
targets.remove(target)
request_kwargs = {}
if timeout := target.get("timeout"):
request_kwargs["timeout"] = timeout
if max_retries := target.get("max_retries"):
request_kwargs["max_retries"] = max_retries
kwargs: Dict[str, str | int | None] = {"target_id": target["id"], "target": target["uri"], "notes": ""}
try:
attempts, response = await self._test_url(target["uri"], **request_kwargs)
except Exception as e:
response = e
attempts = 1
if isinstance(response, Exception):
kwargs["is_up"] = False
kwargs["response_time"] = None
kwargs["notes"] += f"Failed to access page after {attempts:,} attempts: {response}"
else:
if attempts > 1:
kwargs["notes"] += f"After {attempts:,} attempts, "
try:
self.assert_uptime_server_response(response, target.get("http_content_hash"))
except AssertionError as e:
kwargs["is_up"] = False
kwargs["notes"] += "content was invalid: " + str(e)
else:
kwargs["is_up"] = True
kwargs["response_time"] = round(response.elapsed.total_seconds() * 1000)
kwargs["notes"] += "nothing notable."
create_tasks.append(self.bot.loop.create_task(UptimeEntry.objects.create(**kwargs)))
# We need to check if the shronk bot is online since matthew
# Won't let us access their server (cough cough)
if self.bot.intents.presences is True:
# If we don't have presences this is useless.
if not self.bot.is_ready():
await self.bot.wait_until_ready()
for target in targets:
parsed = urlparse(target["uri"])
if parsed.scheme != "user":
continue
guild_id = int(parsed.hostname)
user_id = int(parsed.path.strip("/"))
okay_statuses = [
discord.Status.online if "online=1" in parsed.query.lower() else None,
discord.Status.idle if "idle=1" in parsed.query.lower() else None,
discord.Status.dnd if "dnd=1" in parsed.query.lower() else None,
]
if "offline=1" in parsed.query:
okay_statuses = [discord.Status.offline]
okay_statuses = list(filter(None, okay_statuses))
guild: discord.Guild = self.bot.get_guild(guild_id)
if guild is None:
self.log.warning(
f"[yellow]:warning: Unable to locate the guild for {target['name']!r}! Can't uptime check."
)
else:
user: discord.Member | None = guild.get_member(user_id)
if not user:
# SHRoNK Bot is not in members cache.
try:
user = await guild.fetch_member(user_id)
except discord.HTTPException:
self.log.warning(f"[yellow]Unable to locate {target['name']!r}! Can't uptime check.")
user = None
if user:
create_tasks.append(
self.bot.loop.create_task(
UptimeEntry.objects.create(
target_id=target["id"],
target=target["name"],
is_up=user.status in okay_statuses,
response_time=None,
notes="*Unable to monitor response time, not a HTTP request.*",
)
)
)
else:
if self._warning_posted is False:
self.log.warning(
"[yellow]:warning: Jimmy does not have the presences intent enabled. Uptime monitoring of the"
" shronk bot is disabled."
)
self._warning_posted = True
# Now we have to collect the tasks
return await asyncio.gather(*create_tasks, return_exceptions=True)
# All done!
@tasks.loop(minutes=1)
# @tasks.loop(seconds=30)
async def test_uptimes(self):
self.task_event.clear()
async with self.task_lock:
try:
self.last_result = await self.do_test_uptimes()
except (Exception, ConnectionError):
print()
console.print_exception()
self.task_event.set()
uptime = discord.SlashCommandGroup("uptime", "Commands for the uptime competition.")
@uptime.command(name="stats")
@commands.max_concurrency(1, commands.BucketType.user, wait=True)
@commands.cooldown(1, 10, commands.BucketType.user)
async def stats(
self,
ctx: discord.ApplicationContext,
query_target: discord.Option(
str,
name="target",
description="The target to check the uptime of. Defaults to all.",
required=False,
autocomplete=stats_autocomplete,
default="ALL",
),
look_back: discord.Option(
int, description="How many days to look back. Defaults to a week.", required=False, default=7
),
):
"""View collected uptime stats."""
org_target = query_target
def generate_embed(target, specific_entries: list[UptimeEntry]):
targ = target
embed = discord.Embed(
title=f"Uptime stats for {targ['name']}",
description=f"Showing uptime stats for the last {look_back:,} days.",
color=discord.Color.blurple(),
)
first_check = datetime.datetime.fromtimestamp(specific_entries[-1].timestamp, datetime.timezone.utc)
last_offline = last_online = None
online_count = offline_count = 0
for entry in reversed(specific_entries):
if entry.is_up is False:
last_offline = datetime.datetime.fromtimestamp(entry.timestamp, datetime.timezone.utc)
offline_count += 1
else:
last_online = datetime.datetime.fromtimestamp(entry.timestamp, datetime.timezone.utc)
online_count += 1
total_count = online_count + offline_count
online_avg = (online_count / total_count) * 100
average_response_time = (
sum(entry.response_time for entry in entries if entry.response_time is not None) / total_count
)
if org_target != "ALL":
embed.add_field(
name="\u200b",
value=f"*Started monitoring {discord.utils.format_dt(first_check, style='R')}, "
f"{total_count:,} monitoring events collected*\n"
f"**Online:**\n\t\\* {online_avg:.2f}% of the time ({online_count:,} events)\n\t"
f"\\* Last seen online: "
f"{discord.utils.format_dt(last_online, 'R') if last_online else 'Never'}\n"
f"\n"
f"**Offline:**\n\t\\* {100 - online_avg:.2f}% of the time ({offline_count:,} events)\n\t"
f"\\* Last seen offline: "
f"{discord.utils.format_dt(last_offline, 'R') if last_offline else 'Never'}\n"
f"\n"
f"**Average Response Time:**\n\t\\* {average_response_time:.2f}ms",
)
else:
embed.title = None
embed.description = f"{targ['name']}: {online_avg:.2f}% uptime, last offline: "
if last_offline:
embed.description += f"{discord.utils.format_dt(last_offline, 'R')}"
return embed
await ctx.defer()
now = discord.utils.utcnow()
look_back_timestamp = (now - timedelta(days=look_back)).timestamp()
targets = [query_target]
if query_target == "ALL":
targets = [t["id"] for t in self.cached_targets]
total_query_time = rows = 0
embeds = entries = []
for _target in targets:
_target = self.get_target(_target, _target)
query = UptimeEntry.objects.filter(timestamp__gte=look_back_timestamp).filter(target_id=_target["id"])
query = query.order_by("-timestamp")
start = time.time()
entries = await query.all()
end = time.time()
total_query_time += end - start
if not entries:
embeds.append(discord.Embed(description=f"No uptime entries found for {_target}."))
else:
embeds.append(await asyncio.to_thread(generate_embed, _target, entries))
embeds[-1].set_footer(text=f"Query took {end - start:.2f}s | {len(entries):,} rows")
if org_target == "ALL":
new_embed = discord.Embed(
title="Uptime stats for all monitored targets:",
description=f"Showing uptime stats for the last {look_back:,} days.\n\n",
color=discord.Color.blurple(),
)
for embed_ in embeds:
new_embed.description += f"{embed_.description}\n"
if total_query_time > 60:
minutes, seconds = divmod(total_query_time, 60)
new_embed.set_footer(text=f"Total query time: {minutes:.0f}m {seconds:.2f}s")
else:
new_embed.set_footer(text=f"Total query time: {total_query_time:.2f}s")
new_embed.set_footer(text=f"{new_embed.footer.text} | {len(entries):,} rows")
embeds = [new_embed]
await ctx.respond(embeds=embeds)
@uptime.command(name="speedtest")
@commands.is_owner()
async def stats_speedtest(self, ctx: discord.ApplicationContext):
"""Tests the database's speed"""
await ctx.defer()
tests = {
"all": None,
"lookback_7_day": None,
"lookback_30_day": None,
"lookback_90_day": None,
"lookback_365_day": None,
"first": None,
"random": None,
}
async def run_test(name):
match name:
case "all":
start = time.time()
e = await UptimeEntry.objects.all()
end = time.time()
return (end - start) * 1000, len(e)
case "lookback_7_day":
start = time.time()
e = await UptimeEntry.objects.filter(
UptimeEntry.columns.timestamp >= (discord.utils.utcnow() - timedelta(days=7)).timestamp()
).all()
end = time.time()
return (end - start) * 1000, len(e)
case "lookback_30_day":
start = time.time()
e = await UptimeEntry.objects.filter(
UptimeEntry.columns.timestamp >= (discord.utils.utcnow() - timedelta(days=30)).timestamp()
).all()
end = time.time()
return (end - start) * 1000, len(e)
case "lookback_90_day":
start = time.time()
e = await UptimeEntry.objects.filter(
UptimeEntry.columns.timestamp >= (discord.utils.utcnow() - timedelta(days=90)).timestamp()
).all()
end = time.time()
return (end - start) * 1000, len(e)
case "lookback_365_day":
start = time.time()
e = await UptimeEntry.objects.filter(
UptimeEntry.columns.timestamp >= (discord.utils.utcnow() - timedelta(days=365)).timestamp()
).all()
end = time.time()
return (end - start) * 1000, len(e)
case "first":
start = time.time()
e = await UptimeEntry.objects.first()
end = time.time()
return (end - start) * 1000, 1
case "random":
start = time.time()
e = await UptimeEntry.objects.offset(random.randint(0, 1000)).first()
end = time.time()
return (end - start) * 1000, 1
case _:
raise ValueError(f"Unknown test name: {name}")
def gen_embed(_copy):
embed = discord.Embed(title="\N{HOURGLASS} Speedtest Results", colour=discord.Colour.red())
for _name in _copy.keys():
if _copy[_name] is None:
embed.add_field(name=_name, value="Waiting...")
else:
_time, _row = _copy[_name]
_time /= 1000
rows_per_second = _row / _time
if _time >= 60:
minutes, seconds = divmod(_time, 60)
ts = f"{minutes:.0f}m {seconds:.2f}s"
else:
ts = f"{_time:.2f}s"
embed.add_field(name=_name, value=f"{ts}, {_row:,} rows ({rows_per_second:.2f} rows/s)")
return embed
embed = gen_embed(tests)
ctx = await ctx.respond(embed=embed)
for test_key in tests.keys():
tests[test_key] = await run_test(test_key)
embed = gen_embed(tests)
await ctx.edit(embed=embed)
embed.colour = discord.Colour.green()
await ctx.edit(embed=embed)
@uptime.command(name="view-next-run")
async def view_next_run(self, ctx: discord.ApplicationContext):
"""View when the next uptime test will run."""
await ctx.defer()
next_run = self.test_uptimes.next_iteration
if next_run is None:
return await ctx.respond("The uptime test is not running!")
else:
_wait = self.bot.loop.create_task(discord.utils.sleep_until(next_run))
view = self.CancelTaskView(_wait, next_run)
await ctx.respond(
f"The next uptime test will run in {discord.utils.format_dt(next_run, 'R')}. Waiting...", view=view
)
await _wait
if not self.task_event.is_set():
await ctx.edit(content="Uptime test running! Waiting for results...", view=None)
await self.task_event.wait()
embeds = []
for result in self.last_result:
if isinstance(result, Exception):
embed = discord.Embed(
title="Error",
description=f"An error occurred while running an uptime test: {result}",
color=discord.Color.red(),
)
else:
result = await UptimeEntry.objects.get(entry_id=result.entry_id)
target = self.get_target(target_id=result.target_id) or {"name": result.target_id}
embed = discord.Embed(
title="Uptime for: " + target["name"],
description="Is up: {0.is_up!s}\n"
"Response time: {1:,.2f}ms\n"
"Notes: {0.notes!s}".format(result, result.response_time or -1),
color=discord.Color.green(),
)
embeds.append(embed)
if len(embeds) >= 3:
paginator = pages.Paginator(embeds, loop_pages=True)
await ctx.delete(delay=0.1)
await paginator.respond(ctx.interaction)
else:
await ctx.edit(content="Uptime test complete! Results are in:", embeds=embeds, view=None)
monitors = uptime.create_subgroup(name="monitors", description="Manage uptime monitors.")
@monitors.command(name="add")
@commands.is_owner()
async def add_monitor(
self,
ctx: discord.ApplicationContext,
name: discord.Option(str, description="The name of the monitor."),
uri: discord.Option(
str,
description="The URI to monitor. Enter HELP for more info.",
),
http_max_retries: discord.Option(
int,
description="The maximum number of HTTP retries to make if the request fails.",
required=False,
default=None,
),
http_timeout: discord.Option(
int, description="The timeout for the HTTP request.", required=False, default=None
),
http_md5_content_hash: discord.Option(
str, description="The hash of the content to check for.", required=False, default=None
),
):
"""Creates a monitor to... monitor"""
await ctx.defer()
name: str
uri: str
http_max_retries: Optional[int]
http_timeout: Optional[int]
uri: ParseResult = urlparse(uri.lower())
if uri.scheme == "" and uri.path.lower() == "help":
return await ctx.respond(
"URI can be either `HTTP(S)`, or the custom `USER` scheme.\n"
"Examples:\n"
"`http://example.com` - HTTP GET request to example.com\n"
"`https://example.com` - HTTPS GET request to example.com\n\n"
f"`user://{ctx.guild.id}/{ctx.user.id}?dnd=1` - Checks if user `{ctx.user.id}` (you) is"
f" in `dnd` (the red status) in the server `{ctx.guild.id}` (this server)."
f"\nQuery options are: `dnd`, `idle`, `online`, `offline`. `1` means the status is classed as "
f"'online' - anything else means offline.\n"
f"The format is `user://{{server_id}}/{{user_id}}?{{status}}={{1|0}}`\n"
f"Setting `server_id` to `$GUILD` will auto-fill in the current server ID."
)
options = {
"name": name,
"id": name.upper().strip().replace(" ", "_"),
}
if uri.scheme == "user":
uri: ParseResult = uri._replace(netloc=uri.hostname.replace("$guild", str(ctx.guild.id)))
data = self.parse_user_uri(uri.geturl())
if data["guild"] is None or data["guild"].isdigit() is False:
return await ctx.respond("Invalid guild ID in URI.")
if data["user"] is None or data["user"].isdigit() is False:
return await ctx.respond("Invalid user ID in URI.")
if not data["statuses"] or any(
x.lower() not in ("dnd", "idle", "online", "offline") for x in data["statuses"]
):
return await ctx.respond("Invalid status query string in URI. Did you forget to supply one?")
guild = self.bot.get_guild(int(data["guild"]))
if guild is None:
return await ctx.respond("Invalid guild ID in URI (not found).")
try:
guild.get_member(int(data["user"])) or await guild.fetch_member(int(data["user"]))
except discord.HTTPException:
return await ctx.respond("Invalid user ID in URI (not found).")
options["uri"] = uri.geturl()
elif uri.scheme in ["http", "https"]:
attempts, response = await self._test_url(uri.geturl(), max_retries=http_max_retries, timeout=http_timeout)
if response is None:
return await ctx.respond(
f"Failed to connect to {uri.geturl()!r} after {attempts} attempts. Please ensure the target page"
f" is up, and try again."
)
options["uri"] = uri.geturl()
options["http_max_retries"] = http_max_retries
options["http_timeout"] = http_timeout
if http_md5_content_hash == "GEN":
http_md5_content_hash = hashlib.md5(response.content).hexdigest()
options["http_content_hash"] = http_md5_content_hash
else:
return await ctx.respond("Invalid URI scheme. Supported: HTTP[S], USER.")
targets = self.cached_targets
for target in targets:
if target["uri"] == options["uri"] or target["name"] == options["name"]:
return await ctx.respond("This monitor already exists.")
targets.append(options)
self.write_targets(targets)
await ctx.respond("Monitor added!")
@monitors.command(name="dump")
async def show_monitor(
self, ctx: discord.ApplicationContext, name: discord.Option(str, description="The name of the monitor.")
):
"""Shows a monitor's data."""
await ctx.defer()
name: str
target = self.get_target(name)
if target:
return await ctx.respond(f"```json\n{json.dumps(target, indent=4)}```")
await ctx.respond("Monitor not found.")
@monitors.command(name="remove")
@commands.is_owner()
async def remove_monitor(
self,
ctx: discord.ApplicationContext,
name: discord.Option(str, description="The name of the monitor."),
):
"""Removes a monitor."""
await ctx.defer()
name: str
targets = self.cached_targets
for target in targets:
if target["name"] == name or target["id"] == name:
targets.remove(target)
self.write_targets(targets)
return await ctx.respond("Monitor removed.")
await ctx.respond("Monitor not found.")
@monitors.command(name="reload")
@commands.is_owner()
async def reload_monitors(self, ctx: discord.ApplicationContext):
"""Reloads monitors data file from disk, overriding cache."""
await ctx.defer()
self.read_targets()
await ctx.respond("Monitors reloaded.")
def setup(bot):
bot.add_cog(UptimeCompetition(bot))

34
main.py
View file

@ -1,3 +1,4 @@
import asyncio
import logging
import os
import signal
@ -76,7 +77,11 @@ async def on_application_command_error(ctx: discord.ApplicationContext, error: E
return
if ctx.user.id == 1019233057519177778:
await ctx.respond("Uh oh! I did a fucky wucky >.< I'll make sure to let important peoplez know straight away!!")
await ctx.respond("Something happened!")
await asyncio.sleep(5)
await ctx.edit(
content="Command Error: `%s`" % repr(error)[:1950],
)
else:
text = "Application Command Error: `%r`" % error
await ctx.respond(textwrap.shorten(text, 2000))
@ -117,20 +122,19 @@ async def ping(ctx: discord.ApplicationContext):
gateway = round(ctx.bot.latency * 1000, 2)
return await ctx.respond(f"\N{white heavy check mark} Pong! `{gateway}ms`.")
@bot.check_once
async def check_not_banned(ctx: discord.ApplicationContext | commands.Context):
if await bot.is_owner(ctx.author) or ctx.command.name in ("block", "unblock", "timetable", "verify", "kys"):
return True
user = ctx.author
ban: JimmyBans = await get_or_none(JimmyBans, user_id=user.id)
if ban:
dt = datetime.fromtimestamp(ban.until, timezone.utc)
if dt < discord.utils.utcnow():
await ban.delete()
else:
raise JimmyBanException(dt, ban.reason)
return True
# @bot.check_once
# async def check_not_banned(ctx: discord.ApplicationContext | commands.Context):
# if await bot.is_owner(ctx.author) or ctx.command.name in ("block", "unblock", "timetable", "verify", "kys"):
# return True
# user = ctx.author
# ban: JimmyBans = await get_or_none(JimmyBans, user_id=user.id)
# if ban:
# dt = datetime.fromtimestamp(ban.until, timezone.utc)
# if dt < discord.utils.utcnow():
# await ban.delete()
# else:
# raise JimmyBanException(dt, ban.reason)
# return True
if __name__ == "__main__":