college-bot-v2/src/main.py
nexy7574 b40ee8cce8
All checks were successful
Build and Publish Jimmy.2 / build_and_publish (push) Successful in 6s
well I feel stupid
2024-05-27 17:48:16 +01:00

302 lines
11 KiB
Python

import asyncio
import datetime
import glob
import logging
import random
import shutil
import sys
import time
import traceback
import typing
from logging import FileHandler
from threading import Event, Thread
import discord
import httpx
from discord.ext import commands
from rich.console import Console
from rich.logging import RichHandler
from conf import CONFIG
class KillableThread(Thread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.kill = Event()
class KumaThread(KillableThread):
def __init__(self, url: str, interval: float = 60.0):
super().__init__(target=self.run)
self.daemon = True
self.log = logging.getLogger("jimmy .status")
self.url = url
self.interval = interval
self.kill = Event()
self.retries = 0
self.previous = False
self.was_ready = False
def calculate_backoff(self) -> float:
rnd = random.uniform(0, 1)
retries = min(self.retries, 1000)
t = (2 * 2**retries) + rnd
self.log.debug("Backoff: 2 * (2 ** %d) + %f = %f", retries, rnd, t)
# T can never exceed self.interval
return max(0, min(self.interval, t))
def run(self) -> None:
with httpx.Client(http2=True) as client:
while not self.kill.is_set():
start_time = time.time()
try:
self.retries += 1
url = self.url
if url.endswith("ping="):
url += str(round(bot.latency * 1000, 2))
if bot.is_ready() is False:
if self.was_ready is False:
# Wait rather than attack
time.sleep(1)
continue
url = url.replace("status=up", "status=down")
url = url.replace("msg=OK", "msg=Bot%20not%20ready")
else:
self.was_ready = True
response = client.get(url)
response.raise_for_status()
self.previous = bot.is_ready()
except httpx.HTTPError as error:
self.log.error("Failed to connect to uptime-kuma: %r: %r", url, error, exc_info=error)
timeout = self.calculate_backoff()
self.log.warning("Waiting %d seconds before retrying ping.", timeout)
time.sleep(timeout)
continue
self.retries = 0
end_time = time.time()
timeout = self.interval - (end_time - start_time)
if self.previous is False:
timeout = 2.5
self.kill.wait(timeout)
log = logging.getLogger("jimmy")
CONFIG.setdefault("logging", {})
cols, lns = shutil.get_terminal_size((120, 48))
logging.basicConfig(
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
level=CONFIG["logging"].get("level", "INFO"),
handlers=[
RichHandler(
level=CONFIG["logging"].get("level", "INFO"),
show_time=False,
show_path=False,
markup=True,
console=Console(width=cols, height=lns),
),
FileHandler(filename=CONFIG["logging"].get("file", "jimmy.log"), encoding="utf-8", errors="replace"),
],
)
for logger in CONFIG["logging"].get("suppress", []):
logging.getLogger(logger).setLevel(logging.WARNING)
log.info(f"Suppressed logging for {logger}")
class Client(commands.Bot):
def __init_(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.web: typing.Optional[asyncio.Task] = None
self.uptime_thread = None
async def on_connect(self):
if not any(self.walk_application_commands()):
log.warning("No application commands. using pending.")
cmds = self.pending_application_commands
else:
cmds = list(self.walk_application_commands())
log.debug("Walking the following application commands: %r", cmds)
for command in cmds:
log.debug("Checking %r", command)
try:
if hasattr(command, "options"):
for option in command.options:
if option.input_type is None:
raise TypeError("Input type is None")
_ = command.to_dict()
except Exception as e:
log.error("Failed to validate %r", command, exc_info=e)
self.remove_application_command(command)
else:
log.debug("Loaded command %r.", command)
log.info("Connected to Discord.")
await super().on_connect()
async def start(self, token: str, *, reconnect: bool = True) -> None:
if CONFIG["jimmy"].get("uptime_kuma_url"):
self.uptime_thread = KumaThread(
CONFIG["jimmy"]["uptime_kuma_url"], CONFIG["jimmy"].get("uptime_kuma_interval", 58.0)
)
self.uptime_thread.start()
await super().start(token, reconnect=reconnect)
async def close(self) -> None:
if self.uptime_thread:
self.uptime_thread.kill.set()
await asyncio.get_event_loop().run_in_executor(None, self.uptime_thread.join)
await super().close()
bot = Client(
command_prefix=commands.when_mentioned_or("h!", "H!"),
case_insensitive=True,
strip_after_prefix=True,
debug_guilds=CONFIG["jimmy"].get("debug_guilds"),
intents=discord.Intents.all(),
)
for module in CONFIG["jimmy"].get("modules", ["cogs/*.py"]):
try:
bot.load_extension(module)
except (discord.ExtensionNotFound, ModuleNotFoundError):
log.debug("%r does not exist - trying as glob pattern", module)
for ext in glob.glob(module, recursive=True):
if ext.endswith(("__init__.py", ".pyc")):
continue
ext = ext.replace("/", ".")[:-3] # foo/bar.py -> foo.bar
if ext.split(".")[-1].startswith("__"):
log.info("Skipping loading %s - starts with double underscore." % ext)
try:
bot.load_extension(ext)
except (discord.ExtensionError, ModuleNotFoundError) as e:
log.error(f"Failed to load extension {ext}", exc_info=e)
else:
log.info(f"Loaded extension {ext}")
except discord.ExtensionError as e:
log.error(f"Failed to load extension {module}", exc_info=e)
else:
log.info(f"Loaded extension {module}")
@bot.event
async def on_ready():
log.info(f"Logged in as {bot.user} ({bot.user.id})")
@bot.listen()
async def on_application_command(ctx: discord.ApplicationContext):
log.info(f"Received command [b]{ctx.command}[/] from {ctx.author} in {ctx.guild}")
ctx.start_time = discord.utils.utcnow()
@bot.listen()
async def on_application_command_error(ctx: discord.ApplicationContext, exc: Exception):
log.error(f"Error in {ctx.command} from {ctx.author} in {ctx.guild}", exc_info=exc)
if isinstance(exc, commands.CommandOnCooldown):
expires = discord.utils.utcnow() + datetime.timedelta(seconds=exc.retry_after)
await ctx.respond(f"Command on cooldown. Try again {discord.utils.format_dt(expires, style='R')}.")
elif isinstance(exc, commands.MaxConcurrencyReached):
await ctx.respond("You've reached the maximum number of concurrent uses for this command.")
else:
if await bot.is_owner(ctx.author):
paginator = commands.Paginator(prefix="```py")
for line in traceback.format_exception(type(exc), exc, exc.__traceback__):
paginator.add_line(line[:1990])
for page in paginator.pages:
await ctx.respond(page)
else:
await ctx.respond(f"An error occurred while processing your command. Please try again later.\n" f"{exc}")
@bot.listen()
async def on_application_command_completion(ctx: discord.ApplicationContext):
time_taken = discord.utils.utcnow() - ctx.start_time
log.info(
f"Completed command [b]{ctx.command}[/] from {ctx.author} in "
f"{ctx.guild} in {time_taken.total_seconds():.2f} seconds."
)
@bot.message_command(name="Delete Message")
async def delete_message(ctx: discord.ApplicationContext, message: discord.Message):
await ctx.defer()
if not ctx.channel.permissions_for(ctx.me).manage_messages:
if message.author != bot.user:
return await ctx.respond("I don't have permission to delete messages in this channel.", delete_after=30)
log.info("%s deleted message %s>%s: %r", ctx.author, ctx.channel.name, message.id, message.content)
await message.delete(delay=3)
await ctx.respond(f"\N{white heavy check mark} Deleted message by {message.author.display_name}.")
await ctx.delete(delay=15)
@bot.slash_command(name="about")
async def about_me(ctx: discord.ApplicationContext):
embed = discord.Embed(
title="Jimmy v3",
description="A bot specifically for the LCC discord server(s).",
colour=discord.Colour.green()
)
embed.add_field(
name="Source",
value="[Source code is available under AGPLv3.](https://git.i-am.nexus/nex/college-bot-v2)"
)
user = await ctx.bot.fetch_user(421698654189912064)
appinfo = await ctx.bot.application_info()
author = appinfo.owner
embed.add_field(name="Author", value=f"{user.mention} created me. {author.mention} is running this instance.")
return await ctx.respond(embed=embed)
@bot.check_once
async def check_is_enabled(ctx: commands.Context | discord.ApplicationContext) -> bool:
disabled = CONFIG["jimmy"].get("disabled_commands", [])
if ctx.command.qualified_name in disabled:
raise commands.DisabledCommand(
"%s is disabled via this instance's configuration file." % (
ctx.command.qualified_name
)
)
return True
@bot.check_once
async def add_delays(ctx: commands.Context | discord.ApplicationContext) -> bool:
blocked = CONFIG["jimmy"].get("delay", {})
if str(ctx.author.id) in blocked:
range_start, range_end = blocked[str(ctx.author.id)]
range_start = max(0.01, range_start)
range_end = min(2.89, range_end)
n = random.randint(round(range_start * 100), round(range_end * 100)) / 100
log.warning("Delaying user %s by %.2f seconds.", ctx.author, n)
await asyncio.sleep(n)
log.info("Artificial delay for %s lifted", ctx.author)
else:
log.debug("%s is not in %s", ctx.author.id, blocked.keys())
return True
if not CONFIG["jimmy"].get("token"):
log.critical("No token specified in config.toml. Exiting. (hint: set jimmy.token in config.toml)")
sys.exit(1)
__disabled = CONFIG["jimmy"].get("disabled_commands", [])
__disabled_mode = CONFIG["jimmy"].get("disabled_mode", "disabled").lower()
if __disabled:
if __disabled_mode == "delete":
for __command__name in __disabled:
_cmd = bot.get_application_command(__command__name)
if not _cmd:
log.warning("Unknown command %r - cannot remove.", __command__name)
else:
bot.remove_application_command(_cmd)
log.info("Removed command %s", __command__name)
else:
log.info("Disabled commands are specified, however they're soft-blocked.")
else:
log.info("No commands to disable. Full steam ahead!")
bot.run(CONFIG["jimmy"]["token"])