diff --git a/.idea/college-bot-2.0.iml b/.idea/college-bot-2.0.iml
new file mode 100644
index 0000000..2c80e12
--- /dev/null
+++ b/.idea/college-bot-2.0.iml
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..105ce2d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..d807ba6
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..5a8fd1c
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
new file mode 100644
index 0000000..21f0547
--- /dev/null
+++ b/.idea/workspace.xml
@@ -0,0 +1,102 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {
+ "associatedIndex": 8
+}
+
+
+
+
+
+
+
+
+ {
+ "keyToString": {
+ "ASKED_ADD_EXTERNAL_FILES": "true",
+ "RunOnceActivity.OpenProjectViewOnStart": "true",
+ "RunOnceActivity.ShowReadmeOnStart": "true",
+ "git-widget-placeholder": "master",
+ "node.js.detected.package.eslint": "true",
+ "node.js.detected.package.tslint": "true",
+ "node.js.selected.package.eslint": "(autodetect)",
+ "node.js.selected.package.tslint": "(autodetect)",
+ "nodejs_package_manager_path": "npm",
+ "settings.editor.selected.configurable": "settings.sync",
+ "vue.rearranger.settings.migration": "true"
+ }
+}
+
+
+
+
+
+
+
+
+
+
+
+ 1704132606619
+
+
+ 1704132606619
+
+
+
+
+
+
+
+
+ 1704234449168
+
+
+
+ 1704234449168
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/cogs/net.py b/cogs/net.py
new file mode 100644
index 0000000..e63988b
--- /dev/null
+++ b/cogs/net.py
@@ -0,0 +1,248 @@
+import io
+import os
+import typing
+
+import discord
+from rich.console import Console
+import time
+import re
+from dns import asyncresolver
+from rich.tree import Tree
+import asyncio
+from discord.ext import commands
+
+
+class NetworkCog(commands.Cog):
+    """Network diagnostic slash commands: ping, whois, dig and traceroute."""
+
+    def __init__(self, bot: commands.Bot):
+        self.bot = bot
+
+    @commands.slash_command()
+    async def ping(self, ctx: discord.ApplicationContext, target: str = None):
+        """Get the bot's latency, or the network latency to a target."""
+        if target is None:
+            # No target: report the bot's own websocket latency in milliseconds.
+            return await ctx.respond(f"Pong! {round(self.bot.latency * 1000)}ms")
+        else:
+            await ctx.defer()
+            # create_subprocess_exec (no shell) — `target` cannot be used for shell injection.
+            process = await asyncio.create_subprocess_exec(
+                "ping",
+                "-c",
+                "5",
+                target,
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE,
+            )
+            stdout, stderr = await process.communicate()
+            paginator = commands.Paginator()
+
+            # bytes.splitlines() yields bytes; decode each line before paginating.
+            for line in stdout.splitlines():
+                paginator.add_line(line.decode("utf-8"))
+            for line in stderr.splitlines():
+                paginator.add_line("[STDERR] " + line.decode("utf-8"))
+
+            for page in paginator.pages:
+                await ctx.respond(page)
+
+    @commands.slash_command()
+    async def whois(self, ctx: discord.ApplicationContext, target: str):
+        """Runs a WHOIS lookup on the given target (domain or IP address)."""
+
+        async def run_command(with_disclaimer: bool = False):
+            # -H hides the legal disclaimer; the fallback run includes it so that
+            # *some* output survives the filtering below.
+            args = [] if with_disclaimer else ["-H"]
+            process = await asyncio.create_subprocess_exec(
+                "whois",
+                *args,
+                target,
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE,
+            )
+            so, se = await process.communicate()
+            return so, se, process.returncode
+
+        await ctx.defer()
+        paginator = commands.Paginator()
+        redacted = io.BytesIO()
+        stdout, stderr, status = await run_command()
+
+        def decide(ln: str) -> typing.Optional[bool]:
+            # True  -> show the line; False -> drop it; None -> divert to redacted.txt.
+            # NOTE(review): the None/False routing looks inverted — lines containing
+            # "REDACTED" are dropped while ">>> Last update" lines go to redacted.txt;
+            # preserved as-is, confirm intent before changing.
+            if ln.startswith(">>> Last update"):
+                return
+            if "REDACTED" in ln or "Please query the WHOIS server of the owning registrar" in ln or ":" not in ln:
+                return False
+            else:
+                return True
+
+        for line in stdout.decode().splitlines():
+            line = line.strip()
+            if not line:
+                continue
+            a = decide(line)
+            if a:
+                paginator.add_line(line)
+            elif a is None:
+                # BUGFIX: these lines came from stdout; the original prefixed them
+                # with "[STDERR] ", mislabeling their origin in redacted.txt.
+                redacted.write(line.encode() + b"\n")
+
+        for line in stderr.decode().splitlines():
+            line = line.strip()
+            if not line:
+                continue
+            a = decide(line)
+            if a:
+                paginator.add_line("[STDERR] " + line)
+            elif a is None:
+                redacted.write(b"[STDERR] " + line.encode() + b"\n")
+
+        if not paginator.pages:
+            # Everything got filtered — re-run with the disclaimer and dump raw output.
+            stdout, stderr, status = await run_command(with_disclaimer=True)
+            if not any((stdout, stderr)):
+                return await ctx.respond(f"No output was returned with status code {status}.")
+            file = io.BytesIO()
+            file.write(stdout)
+            if stderr:
+                file.write(b"\n----- STDERR -----\n")
+                file.write(stderr)
+            file.seek(0)
+            return await ctx.respond(
+                "Seemingly all output was filtered. Returning raw command output.",
+                file=discord.File(file, "whois.txt")
+            )
+
+        for page in paginator.pages:
+            await ctx.respond(page)
+        if redacted.getvalue():
+            redacted.seek(0)
+            await ctx.respond(file=discord.File(redacted, "redacted.txt"))
+
+    @commands.slash_command()
+    async def dig(
+        self,
+        ctx: discord.ApplicationContext,
+        domain: str,
+        # Exposed to Discord as "type"; underscored to avoid shadowing the builtin.
+        _type: discord.Option(
+            str,
+            name="type",
+            default="A",
+            choices=[
+                "A",
+                "AAAA",
+                "ANY",
+                "AXFR",
+                "CNAME",
+                "HINFO",
+                "LOC",
+                "MX",
+                "NS",
+                "PTR",
+                "SOA",
+                "SRV",
+                "TXT",
+            ],
+        ),
+    ):
+        """Looks up a domain name"""
+        await ctx.defer()
+        if re.search(r"\s+", domain):
+            return await ctx.respond("Domain name cannot contain spaces.")
+        try:
+            # dnspython's async resolver; raises (NXDOMAIN, Timeout, ...) on failure.
+            response = await asyncresolver.resolve(
+                domain,
+                _type.upper(),
+            )
+        except Exception as e:
+            return await ctx.respond(f"Error: {e}")
+        res = response
+        # Render the answer set as a rich Tree, captured to plain text for Discord.
+        tree = Tree(f"DNS Lookup for {domain}")
+        for record in res:
+            record_tree = tree.add(f"{record.rdtype.name} Record")
+            record_tree.add(f"Name: {res.name}")
+            record_tree.add(f"Value: {record.to_text()}")
+        console = Console()
+        with console.capture() as capture:
+            console.print(tree)
+        text = capture.get()
+        paginator = commands.Paginator(prefix="```", suffix="```")
+        for line in text.splitlines():
+            paginator.add_line(line)
+        paginator.add_line(empty=True)
+        # Hard-coded 0: no subprocess is involved, kept for output symmetry with /traceroute.
+        paginator.add_line(f"Exit code: {0}")
+        paginator.add_line(f"DNS Server used: {res.nameserver}")
+        for page in paginator.pages:
+            await ctx.respond(page)
+
+    @commands.slash_command()
+    async def traceroute(
+        self,
+        ctx: discord.ApplicationContext,
+        url: str,
+        port: discord.Option(int, description="Port to use", default=None),
+        ping_type: discord.Option(
+            str,
+            name="ping-type",
+            description="Type of ping to use. See `traceroute --help`",
+            choices=["icmp", "tcp", "udp", "udplite", "dccp", "default"],
+            default="default",
+        ),
+        use_ip_version: discord.Option(
+            str, name="ip-version", description="IP version to use.", choices=["ipv4", "ipv6"], default="ipv4"
+        ),
+        max_ttl: discord.Option(int, name="ttl", description="Max number of hops", default=30),
+    ):
+        """Performs a traceroute request."""
+        await ctx.defer()
+        if re.search(r"\s+", url):
+            return await ctx.respond("URL cannot contain spaces.")
+
+        # Non-default probe types need raw sockets, hence sudo (unless already root).
+        args = ["sudo", "-E", "-n", "traceroute"]
+        flags = {
+            "ping_type": {
+                "icmp": "-I",
+                "tcp": "-T",
+                "udp": "-U",
+                "udplite": "-UL",
+                "dccp": "-D",
+            },
+            "use_ip_version": {"ipv4": "-4", "ipv6": "-6"},
+        }
+
+        if ping_type == "default" or os.getuid() == 0:
+            args = args[3:]  # removes "sudo -E -n"
+        else:
+            args.append(flags["ping_type"][ping_type])
+        args.append(flags["use_ip_version"][use_ip_version])
+        args.append("-m")
+        args.append(str(max_ttl))
+        if port is not None:
+            args.append("-p")
+            args.append(str(port))
+        args.append(url)
+        paginator = commands.Paginator()
+        paginator.add_line(f"Running command: {' '.join(args[3 if args[0] == 'sudo' else 0:])}")
+        paginator.add_line(empty=True)
+        try:
+            start = time.time_ns()
+            process = await asyncio.create_subprocess_exec(
+                args[0],
+                *args[1:],
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE,
+            )
+            # BUGFIX: the original called `await process.wait()` before communicate();
+            # with PIPE stdout/stderr that can deadlock once the pipe buffer fills.
+            # communicate() drains the pipes AND waits for the process to exit.
+            stdout, stderr = await process.communicate()
+            end = time.time_ns()
+            time_taken_in_ms = (end - start) / 1000000
+            if stdout:
+                for line in stdout.splitlines():
+                    paginator.add_line(line.decode())
+            if stderr:
+                for line in stderr.splitlines():
+                    paginator.add_line(line.decode())
+            paginator.add_line(empty=True)
+            paginator.add_line(f"Exit code: {process.returncode}")
+            paginator.add_line(f"Time taken: {time_taken_in_ms:,.1f}ms")
+        except Exception as e:
+            paginator.add_line(f"Error: {e}")
+        for page in paginator.pages:
+            await ctx.respond(page)
+
+
+def setup(bot):
+    # py-cord extension entry point, invoked by bot.load_extension("cogs.net").
+    bot.add_cog(NetworkCog(bot))
diff --git a/cogs/screenshot.py b/cogs/screenshot.py
new file mode 100644
index 0000000..22d8abf
--- /dev/null
+++ b/cogs/screenshot.py
@@ -0,0 +1,333 @@
+import datetime
+import io
+import logging
+import shutil
+import tempfile
+import time
+import zipfile
+from urllib.parse import urlparse
+from PIL import Image
+
+import discord
+from discord.ext import commands
+import asyncio
+import aiohttp
+from pathlib import Path
+
+from selenium import webdriver
+from selenium.common.exceptions import WebDriverException
+from selenium.webdriver.chrome.options import Options as ChromeOptions
+from selenium.webdriver.chrome.service import Service as ChromeService
+
+
+class ScreenshotCog(commands.Cog):
+    """Screenshots webpages with a self-managed headless Chrome + chromedriver pair."""
+
+    def __init__(self, bot: commands.Bot):
+        self.bot = bot
+
+        # Headless, isolated Chrome profile; downloads are blocked (download_restrictions=3).
+        self.chrome_options = ChromeOptions()
+        self.chrome_options.add_argument("--headless")
+        self.chrome_options.add_argument("--disable-dev-shm-usage")
+        # self.chrome_options.add_argument("--disable-gpu")
+        self.chrome_options.add_argument("--disable-extensions")
+        self.chrome_options.add_argument("--incognito")
+
+        prefs = {
+            # "download.open_pdf_in_system_reader": False,
+            # "download.prompt_for_download": True,
+            # "download.default_directory": "/dev/null",
+            # "plugins.always_open_pdf_externally": False,
+            "download_restrictions": 3,
+        }
+        self.chrome_options.add_experimental_option(
+            "prefs", prefs
+        )
+
+        # <repo>/chrome holds the downloaded chrome-for-testing artifacts.
+        self.dir = Path(__file__).parent.parent / "chrome"
+        self.dir.mkdir(mode=0o775, exist_ok=True)
+
+        # Directory/binary names match the linux64 zip layout published by
+        # googlechromelabs' chrome-for-testing endpoints.
+        self.chrome_dir = self.dir / "chrome-headless-shell-linux64"
+        self.chrome_bin = self.chrome_dir / "chrome-headless-shell"
+        self.chromedriver_dir = self.dir / "chromedriver-linux64"
+        self.chromedriver_bin = self.chromedriver_dir / "chromedriver"
+
+        self.chrome_options.binary_location = str(self.chrome_bin.resolve())
+
+        self.log = logging.getLogger("jimmy.cogs.screenshot")
+
+    def clear_directories(self):
+        """Wipe and recreate the chrome/chromedriver extraction directories."""
+        for directory in (self.chrome_dir, self.chromedriver_dir):
+            shutil.rmtree(directory, ignore_errors=True)
+            directory.mkdir(mode=0o775, exist_ok=True)
+
+    async def download_latest_chrome(self, current: str = None, *, channel: str = "Stable"):
+        """Download and unpack the latest chrome-headless-shell + chromedriver (linux64).
+
+        :param current: currently-installed version; skips the download if already latest.
+        :param channel: release channel key in the chrome-for-testing JSON ("Stable", ...).
+        :raises RuntimeError: if no linux64 build is listed for either artifact.
+        """
+        async with aiohttp.ClientSession(raise_for_status=True) as session:
+            async with session.get(
+                "https://googlechromelabs.github.io/chrome-for-testing/last-known-good-versions-with-downloads.json",
+            ) as response:
+                versions = await response.json()
+                self.log.debug("Got chrome versions: %r", versions)
+                downloads = versions["channels"][channel]["downloads"]
+                version = versions["channels"][channel]["version"]
+                if version == current:
+                    # BUGFIX: log the version string, not the whole versions JSON blob.
+                    self.log.debug(f"Chrome is up to date ({version} == {current})")
+                    return
+
+            self.log.debug("Downloading chrome...")
+            # BUGFIX: the original kept a `filter` object and tested `if not ...` on it —
+            # filter objects are always truthy, so the emptiness check never fired.
+            chrome_download = next(
+                (x for x in downloads["chrome-headless-shell"] if x["platform"] == "linux64"), None
+            )
+            if chrome_download is None:
+                self.log.critical("No chrome zip url found for linux64 in %r.", downloads["chrome-headless-shell"])
+                raise RuntimeError("No chrome zip url found for linux64.")
+            chrome_zip_url = chrome_download["url"]
+            self.log.debug("Chrome zip url: %s", chrome_zip_url)
+
+            self.clear_directories()
+
+            chrome_target = (self.chrome_dir.parent / f"chrome-download-{version}.zip")
+            chromedriver_target = (self.chromedriver_dir.parent / f"chromedriver-download-{version}.zip")
+            if chrome_target.exists():
+                chrome_target.unlink()
+            if chromedriver_target.exists():
+                chromedriver_target.unlink()
+            with chrome_target.open("wb+") as file:
+                async with session.get(chrome_zip_url) as response:
+                    async for data in response.content.iter_any():
+                        self.log.debug("Read %d bytes from chrome zip.", len(data))
+                        file.write(data)
+
+            self.log.debug("Extracting chrome...")
+            # Extraction is blocking; push it off the event loop.
+            with zipfile.ZipFile(chrome_target) as zip_file:
+                await asyncio.get_event_loop().run_in_executor(
+                    None,
+                    zip_file.extractall,
+                    self.chrome_dir.parent
+                )
+            self.log.debug("Finished extracting chrome.")
+
+            self.log.debug("Downloading chromedriver...")
+            # Same always-truthy `filter` bug fixed here.
+            chromedriver_download = next(
+                (x for x in downloads["chromedriver"] if x["platform"] == "linux64"), None
+            )
+            if chromedriver_download is None:
+                self.log.critical("No chromedriver zip url found for linux64 in %r.", downloads["chromedriver"])
+                raise RuntimeError("No chromedriver zip url found for linux64.")
+            chromedriver_zip_url = chromedriver_download["url"]
+            self.log.debug("Chromedriver zip url: %s", chromedriver_zip_url)
+
+            with chromedriver_target.open("wb+") as file:
+                async with session.get(chromedriver_zip_url) as response:
+                    async for data in response.content.iter_any():
+                        self.log.debug("Read %d bytes from chromedriver zip.", len(data))
+                        file.write(data)
+
+            self.log.debug("Extracting chromedriver...")
+            with zipfile.ZipFile(chromedriver_target) as zip_file:
+                await asyncio.get_event_loop().run_in_executor(
+                    None,
+                    zip_file.extractall,
+                    self.chromedriver_dir.parent
+                )
+            self.log.debug("Finished extracting chromedriver.")
+
+            self.log.debug("Making binaries executable.")
+            await asyncio.get_event_loop().run_in_executor(
+                None,
+                self.chrome_bin.chmod,
+                0o775
+            )
+            await asyncio.get_event_loop().run_in_executor(
+                None,
+                self.chromedriver_bin.chmod,
+                0o775
+            )
+            self.log.debug("Finished making binaries executable.")
+
+    async def get_version(self, full: bool = False) -> str:
+        """Return the installed chromedriver version.
+
+        :param full: when True, return the full `chromedriver --version` line;
+            otherwise just the version number (second whitespace-separated token).
+        :raises RuntimeError: if chromedriver exits non-zero.
+        """
+        proc = await asyncio.create_subprocess_exec(
+            str(self.chromedriver_bin),
+            "--version",
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE
+        )
+        stdout, stderr = await proc.communicate()
+        if proc.returncode != 0:
+            raise RuntimeError(f"Error getting chromedriver version: {stderr.decode()}")
+        if full:
+            return stdout.decode().strip()
+        return stdout.decode().strip().split(" ")[1]
+
+    async def is_up_to_date(self, channel: str = "Stable"):
+        """Return True if the installed chromedriver matches the latest `channel` version.
+
+        Missing or broken binaries count as out of date (returns False).
+        """
+        try:
+            current = await self.get_version(False)
+        except (RuntimeError, FileNotFoundError):
+            # Not installed / not runnable -> needs (re)download.
+            return False
+        async with aiohttp.ClientSession(raise_for_status=True) as session:
+            async with session.get(
+                "https://googlechromelabs.github.io/chrome-for-testing/last-known-good-versions-with-downloads.json",
+            ) as response:
+                versions = await response.json()
+                self.log.debug("Got chrome versions: %r", versions)
+                version = versions["channels"][channel]["version"]
+                if version == current:
+                    # BUGFIX: log the version string, not the entire versions JSON.
+                    self.log.debug(f"Chrome is up to date ({version} == {current})")
+                    return True
+        return False
+
+    @commands.command(name="update-chrome")
+    async def update_chrome(self, ctx: commands.Context, channel: str = "Stable"):
+        """Owner utility: force-download the latest chrome/chromedriver for `channel`."""
+        channel = channel.title()
+        if await self.is_up_to_date(channel):
+            await ctx.reply("Chrome is already up to date. Updating anyway.")
+        async with ctx.channel.typing():
+            try:
+                # BUGFIX: `channel` is keyword-only on download_latest_chrome(); the
+                # original passed it positionally, filling the `current` parameter and
+                # silently leaving the channel at "Stable".
+                await self.download_latest_chrome(channel=channel)
+            except RuntimeError as e:
+                return await ctx.reply(f"\N{cross mark} Error downloading chrome: {e}")
+
+        chrome_okay = self.chrome_bin.exists() and self.chrome_bin.is_file()
+        chromedriver_okay = self.chromedriver_bin.exists() and self.chromedriver_bin.is_file()
+
+        try:
+            chromedriver_version = await self.get_version(True)
+        except RuntimeError as e:
+            chromedriver_version = str(e)
+
+        return await ctx.reply(
+            f"\N{white heavy check mark} Done.\n"
+            f"CHANNEL: {channel}\n"
+            f"CHROME OKAY: {chrome_okay}\n"
+            f"CHROMEDRIVER OKAY: {chromedriver_okay}\n"
+            f"CHROMEDRIVER VERSION: {chromedriver_version}"
+        )
+
+    def compress_png(self, input_file: io.BytesIO) -> io.BytesIO:
+        """Re-encode a PNG screenshot as WebP, stepping quality down until <= 24MiB.
+
+        :raises RuntimeError: if even the lowest quality step is still too large.
+        """
+        img = Image.open(input_file)
+        img = img.convert("RGB")
+        with tempfile.NamedTemporaryFile(suffix=".webp") as file:
+            quality = 100
+            while quality > 0:
+                # Pillow's webp encoder treats quality=100 specially; cap at 99.
+                if quality == 100:
+                    quality_r = 99
+                else:
+                    quality_r = quality
+                self.log.debug("Compressing image with quality %d%%", quality_r)
+                img.save(file.name, "webp", quality=quality_r)
+                file.seek(0)
+                value = io.BytesIO(file.read())
+                # 24MiB keeps us under Discord's 25MiB upload ceiling with headroom.
+                if len(value.getvalue()) <= 24 * 1024 * 1024:
+                    self.log.debug("%d%% was sufficient.", quality_r)
+                    break
+                quality -= 15
+            else:
+                # while/else: loop exhausted without break -> nothing fit the limit.
+                raise RuntimeError("Couldn't compress image.")
+        return value
+
+    @commands.slash_command()
+    async def screenshot(
+        self,
+        ctx: discord.ApplicationContext,
+        url: str,
+        load_timeout: int = 10,
+        render_timeout: int = None,
+        eager: bool = None,
+        resolution: str = "1920x1080"
+    ):
+        """Screenshots a webpage."""
+        await ctx.defer()
+
+        if eager is None:
+            eager = render_timeout is None
+        if render_timeout is None:
+            render_timeout = 30 if eager else 10
+        if not url.startswith("http"):
+            url = "https://" + url
+        parsed = urlparse(url)
+
+        # BUGFIX: parse & validate the resolution BEFORE any chromedriver is started.
+        # The original validated after driver creation and early-returned without
+        # driver.quit() (leaking a headless chrome process), and applied
+        # set_window_size() before the 8K bounds check.
+        width = height = None
+        if resolution:
+            try:
+                width, height = map(int, resolution.split("x"))
+            except ValueError:
+                return await ctx.respond("Invalid resolution. please provide width x height, e.g. 1920x1080")
+            if height > 4320 or width > 7680:
+                return await ctx.respond("Invalid resolution. Max resolution is 7680x4320 (8K).")
+
+        await ctx.respond("Initialising...")
+
+        if not all(map(lambda x: x.exists() and x.is_file(), (self.chrome_bin, self.chromedriver_bin))):
+            await ctx.edit(content="Chrome is not installed, downloading. This may take a minute.")
+            await self.download_latest_chrome()
+            await ctx.edit(content="Initialising...")
+        elif not await self.is_up_to_date():
+            await ctx.edit(content="Updating chrome. This may take a minute.")
+            await self.download_latest_chrome()
+            await ctx.edit(content="Initialising...")
+
+        start_init = time.time()
+        # Driver construction is blocking; run it off the event loop.
+        service = await asyncio.to_thread(ChromeService, str(self.chromedriver_bin))
+        driver: webdriver.Chrome = await asyncio.to_thread(
+            webdriver.Chrome,
+            service=service,
+            options=self.chrome_options
+        )
+        driver.set_page_load_timeout(load_timeout)
+        if width is not None:
+            driver.set_window_size(width, height)
+        if eager:
+            driver.implicitly_wait(render_timeout)
+        end_init = time.time()
+
+        await ctx.edit(content=("Loading webpage..." if not eager else "Loading & screenshotting webpage..."))
+        start_request = time.time()
+        await asyncio.to_thread(driver.get, url)
+        end_request = time.time()
+
+        if not eager:
+            now = discord.utils.utcnow()
+            expires = now + datetime.timedelta(seconds=render_timeout)
+            await ctx.edit(content=f"Rendering (expires {discord.utils.format_dt(expires, 'R')})...")
+            start_wait = time.time()
+            await asyncio.sleep(render_timeout)
+            end_wait = time.time()
+        else:
+            # Equal sentinels -> "Wait time: 0s" in the summary embed.
+            start_wait = end_wait = 1
+
+        await ctx.edit(content="Saving screenshot...")
+        start_save = time.time()
+        ss = await asyncio.to_thread(driver.get_screenshot_as_png)
+        file = io.BytesIO()
+        await asyncio.to_thread(file.write, ss)
+        file.seek(0)
+        end_save = time.time()
+
+        if len(await asyncio.to_thread(file.getvalue)) > 24 * 1024 * 1024:
+            start_compress = time.time()
+            file = await asyncio.to_thread(self.compress_png, file)
+            fn = "screenshot.webp"
+            end_compress = time.time()
+        else:
+            fn = "screenshot.png"
+            start_compress = end_compress = 1
+
+        await ctx.edit(content="Cleaning up...")
+        start_cleanup = time.time()
+        await asyncio.to_thread(driver.quit)
+        end_cleanup = time.time()
+
+        screenshot_size_mb = round(len(await asyncio.to_thread(file.getvalue)) / 1024 / 1024, 2)
+
+        def seconds(start: float, end: float) -> float:
+            return round(end - start, 2)
+
+        embed = discord.Embed(
+            title=f"Screenshot of {parsed.hostname}",
+            description=f"Init time: {seconds(start_init, end_init)}s\n"
+                        f"Request time: {seconds(start_request, end_request)}s\n"
+                        f"Wait time: {seconds(start_wait, end_wait)}s\n"
+                        f"Save time: {seconds(start_save, end_save)}s\n"
+                        f"Compress time: {seconds(start_compress, end_compress)}s\n"
+                        f"Cleanup time: {seconds(start_cleanup, end_cleanup)}s\n"
+                        f"Screenshot size: {screenshot_size_mb}MB\n",
+            colour=discord.Colour.dark_theme(),
+            timestamp=discord.utils.utcnow()
+        )
+        embed.set_image(url="attachment://" + fn)
+        return await ctx.edit(content=None, embed=embed, file=discord.File(file, filename=fn))
+
+
+def setup(bot):
+    # py-cord extension entry point, invoked by bot.load_extension("cogs.screenshot").
+    bot.add_cog(ScreenshotCog(bot))
diff --git a/cogs/ytdl.py b/cogs/ytdl.py
new file mode 100644
index 0000000..1e9b796
--- /dev/null
+++ b/cogs/ytdl.py
@@ -0,0 +1,335 @@
+import asyncio
+import functools
+import logging
+import textwrap
+import typing
+
+import discord
+import yt_dlp
+from urllib.parse import urlparse
+from pathlib import Path
+import tempfile
+from discord.ext import commands
+
+
+COOKIES_TXT = Path.cwd() / "cookies.txt"
+
+
+class YTDLCog(commands.Cog):
+    """Downloads media with yt-dlp and re-uploads it to Discord."""
+
+    def __init__(self, bot: commands.Bot) -> None:
+        self.bot = bot
+        self.log = logging.getLogger("jimmy.cogs.ytdl")
+        # Friendly names mapped to yt-dlp format selectors / youtube itags.
+        self.common_formats = {
+            "144p": "17",  # mp4 (h264+aac) v
+            "240p": "133+139",
+            "360p": "18",
+            "480p": "135+139",
+            "720p": "22",
+            "1080p": "137+140",
+            "1440p": "248+251",  # webm (vp9+opus) v
+            "2160p": "313+251",
+            "mp3": "ba[filesize<25M]",
+            "m4a": "ba[ext=m4a][filesize<25M]",
+            "opus": "ba[ext=webm][filesize<25M]",
+            "vorbis": "ba[ext=webm][filesize<25M]",
+            "ogg": "ba[ext=webm][filesize<25M]",
+        }
+        self.default_options = {
+            "noplaylist": True,
+            "nocheckcertificate": True,
+            "no_color": True,
+            "noprogress": True,
+            "logger": self.log,
+            "format": "((bv+ba/b)[vcodec!=h265][vcodec!=av01][filesize<15M]/b[filesize<=15M]/b)",
+            # yt-dlp output template (%-style placeholders, expanded by yt-dlp itself);
+            # the stray f-string prefix on the original was misleading and removed.
+            "outtmpl": "%(title).50s.%(ext)s",
+            "format_sort": [
+                "vcodec:h264",
+                "acodec:aac",
+                "vcodec:vp9",
+                "acodec:opus",
+                "acodec:vorbis",
+                "vcodec:vp8",
+                "ext",
+            ],
+            "merge_output_format": "webm/mp4/mov/m4a/oga/ogg/mp3/mka/mkv",
+            "source_address": "0.0.0.0",
+            "concurrent_fragment_downloads": 4,
+            # Just under Discord's 25MiB upload limit.
+            "max_filesize": (25 * 1024 * 1024) - 256
+        }
+        # Embed accent colour per source site.
+        self.colours = {
+            "youtube.com": 0xff0000,
+            "tiktok.com": 0x25F5EF,
+            "instagram.com": 0xe1306c,
+            "shronk.net": 0xFFF952
+        }
+
+    @commands.slash_command(name="yt-dl")
+    @commands.max_concurrency(1, wait=False)
+    # @commands.bot_has_permissions(send_messages=True, embed_links=True, attach_files=True)
+    async def yt_dl_command(
+        self,
+        ctx: discord.ApplicationContext,
+        url: typing.Annotated[
+            str,
+            discord.Option(
+                str,
+                description="The URL to download from.",
+                required=True
+            )
+        ],
+        user_format: typing.Annotated[
+            typing.Optional[str],
+            discord.Option(
+                str,
+                name="format",
+                description="The name of the format to download. Can also specify resolutions for youtube.",
+                required=False,
+                default=None
+            )
+        ],
+        audio_only: typing.Annotated[
+            bool,
+            discord.Option(
+                bool,
+                name="audio-only",
+                description="Whether to convert result into an m4a file. Overwrites `format` if True.",
+                required=False,
+                default=False,
+            )
+        ],
+        snip: typing.Annotated[
+            typing.Optional[str],
+            discord.Option(
+                str,
+                description="A start and end position to trim. e.g. 00:00:00-00:10:00.",
+                required=False
+            )
+        ]
+    ):
+        """Runs yt-dlp and outputs into discord."""
+        await ctx.defer()
+        options = self.default_options.copy()
+        description = ""
+
+        with tempfile.TemporaryDirectory(prefix="jimmy-ytdl-") as temp_dir:
+            temp_dir = Path(temp_dir)
+            # Confine yt-dlp's home & temp output to the scratch directory.
+            paths = {
+                target: str(temp_dir)
+                for target in (
+                    "home",
+                    "temp",
+                )
+            }
+
+            chosen_format = self.default_options["format"]
+            if user_format:
+                if user_format in self.common_formats:
+                    chosen_format = self.common_formats[user_format]
+                else:
+                    chosen_format = user_format
+
+            if audio_only:
+                # Overwrite format here to be best audio under 25 megabytes.
+                chosen_format = "ba[filesize<20M]"
+                # Also force sorting by the best audio bitrate first.
+                options["format_sort"] = [
+                    "abr",
+                    "br"
+                ]
+                options["postprocessors"] = [
+                    {"key": "FFmpegExtractAudio", "preferredquality": "96", "preferredcodec": "best"}
+                ]
+            options["format"] = chosen_format
+            options["paths"] = paths
+
+            with yt_dlp.YoutubeDL(options) as downloader:
+                await ctx.respond(
+                    embed=discord.Embed().set_footer(text="Downloading (step 1/10)")
+                )
+                try:
+                    # noinspection PyTypeChecker
+                    extracted_info = await asyncio.to_thread(downloader.extract_info, url, download=False)
+                except yt_dlp.utils.DownloadError as e:
+                    # BUGFIX: the original fell through here with `extracted_info`
+                    # unbound and webpage_url=None, later crashing at urlparse(None)
+                    # / NameError. Report the extraction failure and stop instead.
+                    self.log.error(e, exc_info=True)
+                    return await ctx.edit(
+                        embed=discord.Embed(
+                            title="Error",
+                            description=f"Download failed:\n```\n{e}\n```",
+                            colour=discord.Colour.red(),
+                        ),
+                        delete_after=120,
+                    )
+                else:
+                    title = extracted_info.get("title", url)
+                    title = textwrap.shorten(title, 100)
+                    thumbnail_url = extracted_info.get("thumbnail") or None
+                    webpage_url = extracted_info.get("webpage_url") or None
+
+                    chosen_format = extracted_info.get("format")
+                    chosen_format_id = extracted_info.get("format_id")
+                    final_extension = extracted_info.get("ext")
+                    format_note = extracted_info.get("format_note", "%s (%s)" % (chosen_format, chosen_format_id))
+                    resolution = extracted_info.get("resolution")
+                    fps = extracted_info.get("fps")
+                    vcodec = extracted_info.get("vcodec")
+                    acodec = extracted_info.get("acodec")
+                    filesize = extracted_info.get("filesize", extracted_info.get("filesize_approx", 1))
+
+                # Build a bullet-point summary of what will be downloaded.
+                lines = []
+                if chosen_format and chosen_format_id:
+                    lines.append(
+                        "* Chosen format: `%s` (`%s`)" % (chosen_format, chosen_format_id),
+                    )
+                if format_note:
+                    lines.append("* Format note: %r" % format_note)
+                if final_extension:
+                    lines.append("* File extension: " + final_extension)
+                if resolution:
+                    _s = resolution
+                    if fps:
+                        _s += " @ %s FPS" % fps
+                    lines.append("* Resolution: " + _s)
+                if vcodec or acodec:
+                    lines.append("%s+%s" % (vcodec or "N/A", acodec or "N/A"))
+                if filesize:
+                    lines.append("* Filesize: %s" % yt_dlp.utils.format_bytes(filesize))
+
+                if lines:
+                    description += "\n"
+                    description += "\n".join(lines)
+
+                # BUGFIX: guard urlparse — webpage_url may legitimately be None.
+                domain = urlparse(webpage_url).netloc if webpage_url else ""
+                await ctx.edit(
+                    embed=discord.Embed(
+                        title=title,
+                        description=description,
+                        url=webpage_url,
+                        colour=self.colours.get(domain, discord.Colour.og_blurple())
+                    ).set_footer(text="Downloading (step 2/10)").set_thumbnail(url=thumbnail_url)
+                )
+                try:
+                    await asyncio.to_thread(functools.partial(downloader.download, [url]))
+                except yt_dlp.DownloadError as e:
+                    # Use the cog logger (the original logged via the root logger).
+                    self.log.error(e, exc_info=True)
+                    return await ctx.edit(
+                        embed=discord.Embed(
+                            title="Error",
+                            description=f"Download failed:\n```\n{e}\n```",
+                            colour=discord.Colour.red(),
+                            url=webpage_url,
+                        ),
+                        delete_after=120,
+                    )
+                try:
+                    file = next(temp_dir.glob("*." + extracted_info["ext"]))
+                except StopIteration:
+                    return await ctx.edit(
+                        embed=discord.Embed(
+                            title="Error",
+                            description="Failed to locate downloaded video file.\n"
+                                        f"Files: {', '.join(list(map(str, temp_dir.iterdir())))}",
+                            colour=discord.Colour.red(),
+                            url=webpage_url
+                        )
+                    )
+
+                if snip:
+                    try:
+                        trim_start, trim_end = snip.split("-")
+                    except ValueError:
+                        trim_start, trim_end = snip, None
+                    trim_start = trim_start or "00:00:00"
+                    trim_end = trim_end or extracted_info.get("duration_string", "00:30:00")
+                    # BUGFIX: Path.suffix already includes the leading dot; the original
+                    # produced names like "output..mp4".
+                    new_file = temp_dir / ("output" + file.suffix)
+                    args = [
+                        "-hwaccel",
+                        "auto",
+                        "-ss",
+                        trim_start,
+                        "-i",
+                        str(file),
+                        "-to",
+                        trim_end,
+                        "-preset",
+                        "faster",
+                        "-crf",
+                        "28",
+                        "-deadline",
+                        "realtime",
+                        "-cpu-used",
+                        "5",
+                        "-movflags",
+                        "faststart",
+                        "-b:a",
+                        "48k",
+                        "-y",
+                        "-strict",
+                        "2",
+                        str(new_file)
+                    ]
+                    async with ctx.channel.typing():
+                        await ctx.edit(
+                            embed=discord.Embed(
+                                title=f"Trimming from {trim_start} to {trim_end}.",
+                                description="Please wait, this may take a couple of minutes.",
+                                colour=discord.Colour.og_blurple(),
+                                timestamp=discord.utils.utcnow()
+                            )
+                        )
+                        process = await asyncio.create_subprocess_exec(
+                            "ffmpeg",
+                            *args,
+                            stdout=asyncio.subprocess.PIPE,
+                            stderr=asyncio.subprocess.PIPE
+                        )
+                        stdout, stderr = await process.communicate()
+                        if process.returncode != 0:
+                            return await ctx.edit(
+                                embed=discord.Embed(
+                                    title="Error",
+                                    description=f"Trimming failed:\n```\n{stderr.decode()}\n```",
+                                    colour=discord.Colour.red(),
+                                    url=webpage_url
+                                )
+                            )
+                        file = new_file
+
+                stat = file.stat()
+                size_bytes = stat.st_size
+                if size_bytes >= ((25 * 1024 * 1024) - 256):
+                    return await ctx.edit(
+                        embed=discord.Embed(
+                            title="Error",
+                            description=f"File is too large to upload ({round(size_bytes / 1024 / 1024)}MB).",
+                            colour=discord.Colour.red(),
+                            url=webpage_url
+                        )
+                    )
+                size_megabits = (size_bytes * 8) / 1024 / 1024
+                # Rough upload ETA assuming a 20Mbps uplink.
+                eta_seconds = size_megabits / 20
+                upload_file = await asyncio.to_thread(discord.File, file, filename=file.name)
+                await ctx.edit(
+                    embed=discord.Embed(
+                        title="Uploading...",
+                        # BUGFIX: the original embed said just "ETA " — eta_seconds was
+                        # computed but never interpolated.
+                        description=f"ETA: ~{eta_seconds:.0f} seconds",
+                        colour=discord.Colour.og_blurple(),
+                        timestamp=discord.utils.utcnow()
+                    )
+                )
+                try:
+                    await ctx.edit(
+                        file=upload_file,
+                        embed=discord.Embed(
+                            title=f"Downloaded {title}!",
+                            colour=discord.Colour.green(),
+                            timestamp=discord.utils.utcnow(),
+                            url=webpage_url
+                        )
+                    )
+                except discord.HTTPException as e:
+                    self.log.error(e, exc_info=True)
+                    return await ctx.edit(
+                        embed=discord.Embed(
+                            title="Error",
+                            description=f"Upload failed:\n```\n{e}\n```",
+                            colour=discord.Colour.red(),
+                            url=webpage_url
+                        )
+                    )
+
+def setup(bot):
+    # py-cord extension entry point, invoked by bot.load_extension("cogs.ytdl").
+    bot.add_cog(YTDLCog(bot))
diff --git a/config.example.toml b/config.example.toml
new file mode 100644
index 0000000..189353c
--- /dev/null
+++ b/config.example.toml
@@ -0,0 +1,2 @@
+[jimmy]
+token = "foo"
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..4b898ef
--- /dev/null
+++ b/main.py
@@ -0,0 +1,91 @@
+import datetime
+import traceback
+
+import toml
+import discord
+import logging
+from pathlib import Path
+from rich.logging import RichHandler
+from logging import FileHandler
+from discord.ext import commands
+
+
+log = logging.getLogger("jimmy")
+
+# config.toml is mandatory; see config.example.toml for the expected shape.
+try:
+    CONFIG = toml.load('config.toml')
+except FileNotFoundError:
+    log.critical("Unable to locate config.toml.", exc_info=True)
+    raise
+
+# Log to both the console (rich markup) and a rotating-less append file.
+logging.basicConfig(
+    format="%(asctime)s %(levelname)s %(name)s %(message)s",
+    datefmt="%Y-%m-%d %H:%M:%S",
+    level=CONFIG.get("logging", {}).get("level", "INFO"),
+    handlers=[
+        RichHandler(
+            level=CONFIG.get("logging", {}).get("level", "INFO"),
+            show_time=False,
+            show_path=False,
+            markup=True
+        ),
+        FileHandler(
+            filename=CONFIG.get("logging", {}).get("file", "jimmy.log"),
+            mode="a",
+        )
+    ]
+)
+
+bot = commands.Bot(
+    command_prefix=commands.when_mentioned_or("h!", "H!"),
+    case_insensitive=True,
+    strip_after_prefix=True,
+    # Optional list of guild IDs to register slash commands against (faster dev sync).
+    debug_guilds=CONFIG["jimmy"].get("debug_guilds")
+)
+
+bot.load_extension("cogs.ytdl")
+bot.load_extension("cogs.net")
+bot.load_extension("cogs.screenshot")
+
+
+@bot.event
+async def on_ready():
+    # Fired on every gateway (re)connect, not just the first login.
+    log.info(f"Logged in as {bot.user} ({bot.user.id})")
+
+
+@bot.listen()
+async def on_application_command(ctx: discord.ApplicationContext):
+    log.info(f"Received command [b]{ctx.command}[/] from {ctx.author} in {ctx.guild}")
+    # Stashed on the context; read back by on_application_command_completion for timing.
+    ctx.start_time = discord.utils.utcnow()
+
+
+@bot.listen()
+async def on_application_command_error(ctx: discord.ApplicationContext, exc: Exception):
+    """Reports command failures: cooldown/concurrency get friendly messages,
+    the owner gets a full traceback, everyone else a short error summary."""
+    log.error(f"Error in {ctx.command} from {ctx.author} in {ctx.guild}", exc_info=exc)
+    if isinstance(exc, commands.CommandOnCooldown):
+        expires = discord.utils.utcnow() + datetime.timedelta(seconds=exc.retry_after)
+        await ctx.respond(f"Command on cooldown. Try again {discord.utils.format_dt(expires, style='R')}.")
+    elif isinstance(exc, commands.MaxConcurrencyReached):
+        await ctx.respond("You've reached the maximum number of concurrent uses for this command.")
+    else:
+        if await bot.is_owner(ctx.author):
+            paginator = commands.Paginator(prefix="```py")
+            for line in traceback.format_exception(type(exc), exc, exc.__traceback__):
+                # Paginator pages cap at ~2000 chars; clamp individual lines too.
+                paginator.add_line(line[:1990])
+            for page in paginator.pages:
+                await ctx.respond(page)
+        else:
+            # NOTE(review): str(exc) is surfaced to end users — confirm it can't leak
+            # sensitive internals.
+            await ctx.respond(f"An error occurred while processing your command. Please try again later.\n"
+                              f"{exc}")
+
+
+@bot.listen()
+async def on_application_command_completion(ctx: discord.ApplicationContext):
+    # assumes on_application_command already set ctx.start_time — TODO confirm
+    # listener ordering guarantees this attribute exists here.
+    time_taken = discord.utils.utcnow() - ctx.start_time
+    log.info(
+        f"Completed command [b]{ctx.command}[/] from {ctx.author} in "
+        f"{ctx.guild} in {time_taken.total_seconds():.2f} seconds."
+    )
+
+
+bot.run(CONFIG["jimmy"]["token"])
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..85c37a1
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,9 @@
+wheel>=0.42
+setuptools>=69
+yt-dlp @ https://github.com/yt-dlp/yt-dlp/archive/master.tar.gz
+py-cord==2.4.1
+httpx==0.26
+psycopg==3.1.16
+toml==0.10.2
+pillow==10.2
+selenium==4.16