college-bot-v1/cogs/other.py
2023-03-20 14:49:45 +00:00

954 lines
39 KiB
Python

import asyncio
import io
import os
import random
import re
import tempfile
import textwrap
from datetime import timedelta
from io import BytesIO
import dns.resolver
import httpx
from dns import asyncresolver
import aiofiles
import pyttsx3
from time import time, time_ns, sleep
from typing import Literal
from typing import Tuple, Optional, Dict
from pathlib import Path
from urllib.parse import urlparse
import aiohttp
import discord
import psutil
from discord.ext import commands
from rich.tree import Tree
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.firefox.service import Service as FirefoxService
# from selenium.webdriver.ie
from utils import console
_engine = pyttsx3.init()
# noinspection PyTypeChecker
VOICES = [x.id for x in _engine.getProperty("voices")]
del _engine
# noinspection DuplicatedCode
class OtherCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.lock = asyncio.Lock()
self.http = httpx.AsyncClient()
class AbortScreenshotTask(discord.ui.View):
def __init__(self, task: asyncio.Task):
super().__init__()
self.task = task
@discord.ui.button(label="Abort", style=discord.ButtonStyle.red)
async def abort(self, button: discord.ui.Button, interaction: discord.Interaction):
new: discord.Interaction = await interaction.response.send_message("Aborting...", ephemeral=True)
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
pass
self.disable_all_items()
button.label = "[ aborted ]"
await new.edit_original_response(content="Aborted screenshot task.", view=self)
self.stop()
async def screenshot_website(
self,
ctx: discord.ApplicationContext,
website: str,
driver: Literal["chrome", "firefox"],
render_time: int = 10,
load_timeout: int = 30,
window_height: int = 1920,
window_width: int = 1080,
full_screenshot: bool = False,
) -> Tuple[discord.File, str, int, int]:
async def _blocking(*args):
return await self.bot.loop.run_in_executor(None, *args)
def find_driver():
nonlocal driver, driver_path
drivers = {
"firefox": [
"/usr/bin/firefox-esr",
"/usr/bin/firefox",
],
"chrome": ["/usr/bin/chromium", "/usr/bin/chrome", "/usr/bin/chrome-browser", "/usr/bin/google-chrome"],
}
selected_driver = driver
arr = drivers.pop(selected_driver)
for binary in arr:
b = Path(binary).resolve()
if not b.exists():
continue
driver = selected_driver
driver_path = b
break
else:
for key, value in drivers.items():
for binary in value:
b = Path(binary).resolve()
if not b.exists():
continue
driver = key
driver_path = b
break
else:
continue
break
else:
raise RuntimeError("No browser binary.")
return driver, driver_path
driver, driver_path = find_driver()
console.log(
"Using driver '{}' with binary '{}' to screenshot '{}', as requested by {}.".format(
driver, driver_path, website, ctx.user
)
)
def _setup():
nonlocal driver
if driver == "chrome":
options = ChromeOptions()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("--disable-extensions")
options.add_argument("--incognito")
options.binary_location = str(driver_path)
service = ChromeService("/usr/bin/chromedriver")
driver = webdriver.Chrome(service=service, options=options)
driver.set_window_size(window_height, window_width)
else:
options = FirefoxOptions()
options.add_argument("--headless")
options.add_argument("--private-window")
options.add_argument("--safe-mode")
options.add_argument("--new-instance")
options.binary_location = str(driver_path)
service = FirefoxService("/usr/bin/geckodriver")
driver = webdriver.Firefox(service=service, options=options)
driver.set_window_size(window_height, window_width)
return driver, textwrap.shorten(website, 100)
# Is it overkill to cast this to a thread? yes
# Do I give a flying fuck? kinda
# Why am I doing this? I suspect setup is causing a ~10-second block of the event loop
driver_name = driver
start_init = time()
driver, friendly_url = await asyncio.to_thread(_setup)
end_init = time()
console.log("Driver '{}' initialised in {} seconds.".format(driver_name, round(end_init - start_init, 2)))
def _edit(content: str):
self.bot.loop.create_task(ctx.interaction.edit_original_response(content=content))
expires = round(time() + load_timeout)
_edit(content=f"Screenshotting <{friendly_url}>... (49%, loading webpage, aborts <t:{expires}:R>)")
await _blocking(driver.set_page_load_timeout, load_timeout)
start = time()
await _blocking(driver.get, website)
end = time()
get_time = round((end - start) * 1000)
render_time_expires = round(time() + render_time)
_edit(content=f"Screenshotting <{friendly_url}>... (66%, stopping render <t:{render_time_expires}:R>)")
await asyncio.sleep(render_time)
_edit(content=f"Screenshotting <{friendly_url}>... (83%, saving screenshot)")
domain = re.sub(r"https?://", "", website)
screenshot_method = driver.get_screenshot_as_png
if full_screenshot and driver_name == "firefox":
screenshot_method = driver.get_full_page_screenshot_as_png
start = time()
data = await _blocking(screenshot_method)
_io = io.BytesIO()
# Write the data async because HAHAHAHAHAHAHA
# We'll do it in the existing event loop though because less overhead
await _blocking(_io.write, data)
_io.seek(0)
end = time()
screenshot_time = round((end - start) * 1000)
driver.quit()
return discord.File(_io, f"{domain}.png"), driver_name, get_time, screenshot_time
@staticmethod
async def get_interface_ip_addresses() -> Dict[str, list[Dict[str, str | bool | int]]]:
addresses = await asyncio.to_thread(psutil.net_if_addrs)
stats = await asyncio.to_thread(psutil.net_if_stats)
result = {}
for key in addresses.keys():
result[key] = []
for ip_addr in addresses[key]:
if ip_addr.broadcast is None:
continue
else:
result[key].append(
{
"ip": ip_addr.address,
"netmask": ip_addr.netmask,
"broadcast": ip_addr.broadcast,
"up": stats[key].isup,
"speed": stats[key].speed,
}
)
return result
async def analyse_text(self, text: str) -> Optional[Tuple[float, float, float, float]]:
"""Analyse text for positivity, negativity and neutrality."""
def inner():
try:
from utils.sentiment_analysis import intensity_analyser
except ImportError:
return None
scores = intensity_analyser.polarity_scores(text)
return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
async with self.bot.training_lock:
return await self.bot.loop.run_in_executor(None, inner)
@staticmethod
async def get_xkcd(session: aiohttp.ClientSession, n: int) -> dict | None:
async with session.get("https://xkcd.com/{!s}/info.0.json".format(n)) as response:
if response.status == 200:
data = await response.json()
return data
@staticmethod
async def random_xkcd_number(session: aiohttp.ClientSession) -> int:
async with session.get("https://c.xkcd.com/random/comic") as response:
if response.status != 302:
number = random.randint(100, 999)
else:
number = int(response.headers["location"].split("/")[-2])
return number
@staticmethod
async def random_xkcd(session: aiohttp.ClientSession) -> dict | None:
"""Fetches a random XKCD.
Basically a shorthand for random_xkcd_number and get_xkcd.
"""
number = await OtherCog.random_xkcd_number(session)
return await OtherCog.get_xkcd(session, number)
@staticmethod
def get_xkcd_embed(data: dict) -> discord.Embed:
embed = discord.Embed(
title=data["safe_title"], description=data["alt"], color=discord.Colour.embed_background()
)
embed.set_footer(text="XKCD #{!s}".format(data["num"]))
embed.set_image(url=data["img"])
return embed
@staticmethod
async def generate_xkcd(n: int = None) -> discord.Embed:
async with aiohttp.ClientSession() as session:
if n is None:
data = await OtherCog.random_xkcd(session)
n = data["num"]
else:
data = await OtherCog.get_xkcd(session, n)
if data is None:
return discord.Embed(
title="Failed to load XKCD :(", description="Try again later.", color=discord.Colour.red()
).set_footer(text="Attempted to retrieve XKCD #{!s}".format(n))
return OtherCog.get_xkcd_embed(data)
class XKCDGalleryView(discord.ui.View):
def __init__(self, n: int):
super().__init__(timeout=300, disable_on_timeout=True)
self.n = n
def __rich_repr__(self):
yield "n", self.n
yield "message", self.message
@discord.ui.button(label="Previous", style=discord.ButtonStyle.blurple)
async def previous_comic(self, _, interaction: discord.Interaction):
self.n -= 1
await interaction.response.defer()
await interaction.edit_original_response(embed=await OtherCog.generate_xkcd(self.n))
@discord.ui.button(label="Random", style=discord.ButtonStyle.blurple)
async def random_comic(self, _, interaction: discord.Interaction):
await interaction.response.defer()
await interaction.edit_original_response(embed=await OtherCog.generate_xkcd())
self.n = random.randint(1, 999)
@discord.ui.button(label="Next", style=discord.ButtonStyle.blurple)
async def next_comic(self, _, interaction: discord.Interaction):
self.n += 1
await interaction.response.defer()
await interaction.edit_original_response(embed=await OtherCog.generate_xkcd(self.n))
@commands.slash_command()
async def xkcd(self, ctx: discord.ApplicationContext, *, number: int = None):
"""Shows an XKCD comic"""
embed = await self.generate_xkcd(number)
view = self.XKCDGalleryView(number)
return await ctx.respond(embed=embed, view=view)
@commands.slash_command()
async def sentiment(self, ctx: discord.ApplicationContext, *, text: str):
"""Attempts to detect a text's tone"""
await ctx.defer()
if not text:
return await ctx.respond("You need to provide some text to analyse.")
result = await self.analyse_text(text)
if result is None:
return await ctx.edit(content="Failed to load sentiment analysis module.")
embed = discord.Embed(title="Sentiment Analysis", color=discord.Colour.embed_background())
embed.add_field(name="Positive", value="{:.2%}".format(result[0]))
embed.add_field(name="Neutral", value="{:.2%}".format(result[2]))
embed.add_field(name="Negative", value="{:.2%}".format(result[1]))
embed.add_field(name="Compound", value="{:.2%}".format(result[3]))
return await ctx.edit(content=None, embed=embed)
@commands.message_command(name="Detect Sentiment")
async def message_sentiment(self, ctx: discord.ApplicationContext, message: discord.Message):
await ctx.defer()
text = str(message.clean_content)
if not text:
return await ctx.respond("You need to provide some text to analyse.")
await ctx.respond("Analyzing (this may take some time)...")
result = await self.analyse_text(text)
if result is None:
return await ctx.edit(content="Failed to load sentiment analysis module.")
embed = discord.Embed(title="Sentiment Analysis", color=discord.Colour.embed_background())
embed.add_field(name="Positive", value="{:.2%}".format(result[0]))
embed.add_field(name="Neutral", value="{:.2%}".format(result[2]))
embed.add_field(name="Negative", value="{:.2%}".format(result[1]))
embed.add_field(name="Compound", value="{:.2%}".format(result[3]))
embed.url = message.jump_url
return await ctx.edit(content=None, embed=embed)
corrupt_file = discord.SlashCommandGroup(
name="corrupt-file",
description="Corrupts files.",
)
@corrupt_file.command(name="generate")
async def generate_corrupt_file(self, ctx: discord.ApplicationContext, file_name: str, size_in_megabytes: float):
"""Generates a "corrupted" file."""
limit_mb = round(ctx.guild.filesize_limit / 1024 / 1024)
if size_in_megabytes > limit_mb:
return await ctx.respond(
f"File size must be less than {limit_mb} MB.\n"
"Want to corrupt larger files? see https://github.com/EEKIM10/cli-utils#installing-the-right-way"
" (and then run `ruin <file>`)."
)
await ctx.defer()
size = max(min(int(size_in_megabytes * 1024 * 1024), ctx.guild.filesize_limit), 1)
file = io.BytesIO()
file.write(os.urandom(size - 1024))
file.seek(0)
return await ctx.respond(file=discord.File(file, file_name))
@staticmethod
def do_file_corruption(file: io.BytesIO, passes: int, bound_start: int, bound_end: int):
for _ in range(passes):
file.seek(random.randint(bound_start, bound_end))
file.write(os.urandom(random.randint(128, 2048)))
file.seek(0)
return file
@corrupt_file.command(name="ruin")
async def ruin_corrupt_file(
self,
ctx: discord.ApplicationContext,
file: discord.Attachment,
passes: int = 10,
metadata_safety_boundary: float = 5,
):
"""Takes a file and corrupts parts of it"""
await ctx.defer()
attachment = file
if attachment.size > 8388608:
return await ctx.respond(
"File is too large. Max size 8mb.\n"
"Want to corrupt larger files? see https://github.com/EEKIM10/cli-utils#installing-the-right-way"
" (and then run `ruin <file>`)."
)
bound_pct = attachment.size * (0.01 * metadata_safety_boundary)
bound_start = round(bound_pct)
bound_end = round(attachment.size - bound_pct)
await ctx.respond("Downloading file...")
file = io.BytesIO(await file.read())
file.seek(0)
await ctx.edit(content="Corrupting file...")
file = await asyncio.to_thread(self.do_file_corruption, file, passes, bound_start, bound_end)
file.seek(0)
await ctx.edit(content="Uploading file...")
await ctx.edit(content="Here's your corrupted file!", file=discord.File(file, attachment.filename))
@commands.command(name="kys", aliases=["kill"])
@commands.is_owner()
async def end_your_life(self, ctx: commands.Context):
await ctx.send(":( okay")
await self.bot.close()
@commands.slash_command()
async def ip(self, ctx: discord.ApplicationContext, detailed: bool = False, secure: bool = True):
"""Gets current IP"""
if not await self.bot.is_owner(ctx.user):
return await ctx.respond("Internal IP: 0.0.0.0\nExternal IP: 0.0.0.0")
await ctx.defer(ephemeral=secure)
ips = await self.get_interface_ip_addresses()
root = Tree("IP Addresses")
internal = root.add("Internal")
external = root.add("External")
interfaces = internal.add("Interfaces")
for interface, addresses in ips.items():
interface_tree = interfaces.add(interface)
for address in addresses:
colour = "green" if address["up"] else "red"
ip_tree = interface_tree.add(
f"[{colour}]" + address["ip"] + ((" (up)" if address["up"] else " (down)") if not detailed else "")
)
if detailed:
ip_tree.add(f"IF Up: {'yes' if address['up'] else 'no'}")
ip_tree.add(f"Netmask: {address['netmask']}")
ip_tree.add(f"Broadcast: {address['broadcast']}")
async with aiohttp.ClientSession() as session:
try:
async with session.get("https://api.ipify.org") as resp:
external.add(await resp.text())
except aiohttp.ClientError as e:
external.add(f" [red]Error: {e}")
with console.capture() as capture:
console.print(root)
text = capture.get()
paginator = commands.Paginator(prefix="```", suffix="```")
for line in text.splitlines():
paginator.add_line(line)
for page in paginator.pages:
await ctx.respond(page, ephemeral=secure)
@commands.slash_command()
async def dig(
self,
ctx: discord.ApplicationContext,
domain: str,
_type: discord.Option(
str,
name="type",
default="A",
choices=[
"A",
"AAAA",
"ANY",
"AXFR",
"CNAME",
"HINFO",
"LOC",
"MX",
"NS",
"PTR",
"SOA",
"SRV",
"TXT",
],
),
):
"""Looks up a domain name"""
await ctx.defer()
if re.search(r"\s+", domain):
return await ctx.respond("Domain name cannot contain spaces.")
try:
response = await asyncresolver.resolve(
domain,
_type.upper(),
)
except Exception as e:
return await ctx.respond(f"Error: {e}")
res = response
tree = Tree(f"DNS Lookup for {domain}")
for record in res:
record_tree = tree.add(f"{record.rdtype.name} Record")
record_tree.add(f"Name: {res.name}")
record_tree.add(f"Value: {record.to_text()}")
with console.capture() as capture:
console.print(tree)
text = capture.get()
paginator = commands.Paginator(prefix="```", suffix="```")
for line in text.splitlines():
paginator.add_line(line)
paginator.add_line(empty=True)
paginator.add_line(f"Exit code: {0}")
paginator.add_line(f"DNS Server used: {res.nameserver}")
for page in paginator.pages:
await ctx.respond(page)
@commands.slash_command()
async def traceroute(
self,
ctx: discord.ApplicationContext,
url: str,
port: discord.Option(int, description="Port to use", default=None),
ping_type: discord.Option(
str,
name="ping-type",
description="Type of ping to use. See `traceroute --help`",
choices=["icmp", "tcp", "udp", "udplite", "dccp", "default"],
default="default",
),
use_ip_version: discord.Option(
str, name="ip-version", description="IP version to use.", choices=["ipv4", "ipv6"], default="ipv4"
),
max_ttl: discord.Option(int, name="ttl", description="Max number of hops", default=30),
):
"""Performs a traceroute request."""
await ctx.defer()
if re.search(r"\s+", url):
return await ctx.respond("URL cannot contain spaces.")
args = ["sudo", "-E", "-n", "traceroute"]
flags = {
"ping_type": {
"icmp": "-I",
"tcp": "-T",
"udp": "-U",
"udplite": "-UL",
"dccp": "-D",
},
"use_ip_version": {"ipv4": "-4", "ipv6": "-6"},
}
if ping_type != "default":
args.append(flags["ping_type"][ping_type])
else:
args = args[3:] # removes sudo
args.append(flags["use_ip_version"][use_ip_version])
args.append("-m")
args.append(str(max_ttl))
if port is not None:
args.append("-p")
args.append(str(port))
args.append(url)
paginator = commands.Paginator()
paginator.add_line(f"Running command: {' '.join(args[3 if args[0] == 'sudo' else 0:])}")
paginator.add_line(empty=True)
try:
start = time_ns()
process = await asyncio.create_subprocess_exec(
args[0],
*args[1:],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await process.wait()
stdout, stderr = await process.communicate()
end = time_ns()
time_taken_in_ms = (end - start) / 1000000
if stdout:
for line in stdout.splitlines():
paginator.add_line(line.decode())
if stderr:
for line in stderr.splitlines():
paginator.add_line(line.decode())
paginator.add_line(empty=True)
paginator.add_line(f"Exit code: {process.returncode}")
paginator.add_line(f"Time taken: {time_taken_in_ms:,.1f}ms")
except Exception as e:
paginator.add_line(f"Error: {e}")
for page in paginator.pages:
await ctx.respond(page)
@commands.slash_command()
@commands.max_concurrency(1, commands.BucketType.user)
@commands.cooldown(1, 30, commands.BucketType.user)
async def screenshot(
self,
ctx: discord.ApplicationContext,
url: str,
browser: discord.Option(str, description="Browser to use", choices=["chrome", "firefox"], default="chrome"),
render_timeout: discord.Option(int, name="render-timeout", description="Timeout for rendering", default=3),
load_timeout: discord.Option(int, name="load-timeout", description="Timeout for page load", default=60),
window_height: discord.Option(
int, name="window-height", description="the height of the window in pixels", default=1920
),
window_width: discord.Option(
int, name="window-width", description="the width of the window in pixels", default=1080
),
capture_whole_page: discord.Option(
bool,
name="capture-full-page",
description="(firefox only) whether to capture the full page or just the viewport.",
default=False,
),
):
"""Takes a screenshot of a URL"""
if capture_whole_page and browser != "firefox":
return await ctx.respond("The capture-full-page option is only available for firefox.")
window_width = max(min(1080 * 6, window_width), 1080 // 6)
window_height = max(min(1920 * 6, window_height), 1920 // 6)
await ctx.defer()
if ctx.user.id == 1019233057519177778 and ctx.me.guild_permissions.moderate_members:
if ctx.user.communication_disabled_until is None:
await ctx.user.timeout_for(timedelta(minutes=2), reason="no")
url = urlparse(url)
if not url.scheme:
if "/" in url.path:
hostname, path = url.path.split("/", 1)
else:
hostname = url.path
path = ""
url = url._replace(scheme="http", netloc=hostname, path=path)
friendly_url = textwrap.shorten(url.geturl(), 100)
await ctx.edit(content=f"Preparing to screenshot <{friendly_url}>... (0%, checking filters)")
async def blacklist_check() -> bool | str:
async with aiofiles.open("domains.txt") as blacklist:
for ln in await blacklist.readlines():
if not ln.strip():
continue
if re.match(ln.strip(), url.netloc):
return "Local blacklist"
return True
async def dns_check() -> Optional[bool | str]:
try:
for response in await asyncio.to_thread(dns.resolver.resolve, url.hostname, "A"):
if response.address == "0.0.0.0":
return "DNS blacklist"
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.LifetimeTimeout, AttributeError):
return "Invalid domain or DNS error"
return True
done, pending = await asyncio.wait(
[
asyncio.create_task(blacklist_check(), name="local"),
asyncio.create_task(dns_check(), name="dns"),
],
return_when=asyncio.FIRST_COMPLETED,
)
done_tasks = done
try:
done = done_tasks.pop()
except KeyError:
return await ctx.respond("Something went wrong. Try again?\n")
result = await done
if not result:
return await ctx.edit(
content="That domain is blacklisted, doesn't exist, or there was no answer from the DNS server."
f" ({result!r})"
)
await asyncio.sleep(1)
await ctx.edit(content=f"Preparing to screenshot <{friendly_url}>... (16%, checking filters)")
okay = await (pending or done_tasks).pop()
if not okay:
return await ctx.edit(
content="That domain is blacklisted, doesn't exist, or there was no answer from the DNS server."
f" ({okay!r})"
)
await asyncio.sleep(1)
await ctx.edit(content=f"Screenshotting {textwrap.shorten(url.geturl(), 100)}... (33%, initializing browser)")
try:
async with self.lock:
screenshot, driver, fetch_time, screenshot_time = await self.screenshot_website(
ctx,
url.geturl(),
browser,
render_timeout,
load_timeout,
window_height,
window_width,
capture_whole_page,
)
except TimeoutError:
return await ctx.edit(content="Rendering screenshot timed out. Try using a smaller resolution.")
except WebDriverException as e:
paginator = commands.Paginator(prefix="```", suffix="```")
paginator.add_line("WebDriver Error (did you pass extreme or invalid command options?)")
paginator.add_line("Traceback:", empty=True)
for line in e.msg.splitlines():
paginator.add_line(line)
for page in paginator.pages:
await ctx.respond(page)
except Exception as e:
console.print_exception()
return await ctx.edit(content=f"Failed: {e}", delete_after=30)
else:
await ctx.edit(content=f"Screenshotting <{friendly_url}>... (99%, uploading image)")
await asyncio.sleep(0.5)
await ctx.edit(
content="Here's your screenshot!\n"
"Details:\n"
f"\\* Browser: {driver}\n"
f"\\* Resolution: {window_height}x{window_width} ({window_width*window_height:,} pixels)\n"
f"\\* URL: <{friendly_url}>\n"
f"\\* Load time: {fetch_time:.2f}ms\n"
f"\\* Screenshot render time: {screenshot_time:.2f}ms\n"
f"\\* Total time: {(fetch_time + screenshot_time):.2f}ms\n" +
(
'* Probability of being scat or something else horrifying: 100%'
if ctx.user.id == 1019233057519177778 else ''
),
file=screenshot,
)
domains = discord.SlashCommandGroup("domains", "Commands for managing domains")
@domains.command(name="add")
async def add_domain(self, ctx: discord.ApplicationContext, domain: str):
"""Adds a domain to the blacklist"""
await ctx.defer()
if not await self.bot.is_owner(ctx.user):
return await ctx.respond("You are not allowed to do that.")
async with aiofiles.open("domains.txt", "a") as blacklist:
await blacklist.write(domain.lower() + "\n")
await ctx.respond("Added domain to blacklist.")
@domains.command(name="remove")
async def remove_domain(self, ctx: discord.ApplicationContext, domain: str):
"""Removes a domain from the blacklist"""
await ctx.defer()
if not await self.bot.is_owner(ctx.user):
return await ctx.respond("You are not allowed to do that.")
async with aiofiles.open("domains.txt") as blacklist:
lines = await blacklist.readlines()
async with aiofiles.open("domains.txt", "w") as blacklist:
for line in lines:
if line.strip() != domain.lower():
await blacklist.write(line)
await ctx.respond("Removed domain from blacklist.")
# noinspection PyTypeHints
@commands.slash_command(name="yt-dl")
@commands.max_concurrency(1, commands.BucketType.user)
async def yt_dl(
self,
ctx: discord.ApplicationContext,
url: str,
proxy: str = None,
video_format: str = "",
upload_log: bool = True
):
"""Downloads a video from <URL> using youtube-dl"""
with tempfile.TemporaryDirectory(prefix="jimmy-ytdl-") as tempdir:
video_format = video_format.lower()
OUTPUT_FILE = str(Path(tempdir) / f"{ctx.user.id}.%(ext)s")
MAX_SIZE = ctx.guild.filesize_limit / 1024 ** 2
options = [
"--no-colors",
"--no-playlist",
"--no-check-certificates",
# "--max-filesize", str(MAX_SIZE) + "M",
"--no-warnings",
"--output", OUTPUT_FILE,
]
if proxy:
options.extend(["--proxy", proxy])
if video_format:
options.extend(["--format", video_format])
await ctx.defer()
await ctx.edit(content="Downloading video...")
try:
process = await asyncio.create_subprocess_exec(
"yt-dlp",
url,
*options,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
stdout_log = io.BytesIO(stdout)
stdout_log_file = discord.File(stdout_log, filename="stdout.txt")
stderr_log = io.BytesIO(stderr)
stderr_log_file = discord.File(stderr_log, filename="stderr.txt")
await process.wait()
except FileNotFoundError:
return await ctx.edit(content="Downloader not found.")
if process.returncode != 0:
files = [
stdout_log_file,
stderr_log_file
]
return await ctx.edit(content=f"Download failed:\n```\n{stderr.decode()}\n```", files=files)
await ctx.edit(content="Uploading video...")
files = [
stdout_log_file,
stderr_log_file
] if upload_log else []
for file_name in Path(tempdir).glob(f"{ctx.user.id}.*"):
stat = file_name.stat()
size_mb = stat.st_size / 1024 ** 2
if size_mb > MAX_SIZE - 0.1:
_x = io.BytesIO(
f"File {file_name.name} was too large ({size_mb:,.1f}MB vs {MAX_SIZE:.1f}MB)".encode()
)
files.append(discord.File(_x, filename=file_name.name + ".txt"))
try:
video = discord.File(file_name, filename=file_name.name)
files.append(video)
except FileNotFoundError:
continue
if not files:
return await ctx.edit(content="No files found.")
await ctx.edit(content="Here's your video!", files=files)
@commands.slash_command(name="text-to-mp3")
@commands.cooldown(5, 600, commands.BucketType.user)
async def text_to_mp3(
self,
ctx: discord.ApplicationContext,
speed: discord.Option(
int,
"The speed of the voice. Default is 150.",
required=False,
default=150
),
voice: discord.Option(
str,
"The voice to use. Some may cause timeout.",
autocomplete=discord.utils.basic_autocomplete(VOICES),
default="default"
)
):
"""Converts text to MP3. 5 uses per 10 minutes."""
if voice not in VOICES:
return await ctx.respond("Invalid voice.")
speed = min(300, max(50, speed))
_self = self
_bot = self.bot
class TextModal(discord.ui.Modal):
def __init__(self):
super().__init__(
discord.ui.InputText(
label="Text",
placeholder="Enter text to read",
min_length=1,
max_length=4000,
style=discord.InputTextStyle.long
),
title="Convert text to an MP3"
)
async def callback(self, interaction: discord.Interaction):
def _convert(text: str) -> Tuple[BytesIO, int]:
tmp_dir = tempfile.gettempdir()
target_fn = Path(tmp_dir) / f"jimmy-tts-{ctx.user.id}-{ctx.interaction.id}.mp3"
target_fn = str(target_fn)
engine = pyttsx3.init()
engine.setProperty("voice", voice)
engine.setProperty("rate", speed)
_io = BytesIO()
engine.save_to_file(text, target_fn)
engine.runAndWait()
last_3_sizes = [-3, -2, -1]
no_exists = 0
def should_loop():
if not os.path.exists(target_fn):
nonlocal no_exists
assert no_exists < 300, "File does not exist for 5 minutes."
no_exists += 1
return True
stat = os.stat(target_fn)
for _result in last_3_sizes:
if stat.st_size != _result:
return True
return False
while should_loop():
if os.path.exists(target_fn):
last_3_sizes.pop(0)
last_3_sizes.append(os.stat(target_fn).st_size)
sleep(1)
with open(target_fn, "rb") as f:
x = f.read()
_io.write(x)
os.remove(target_fn)
_io.seek(0)
return _io, len(x)
await interaction.response.defer()
text_pre = self.children[0].value
if text_pre.startswith("url:"):
_url = text_pre[4:].strip()
_msg = await interaction.followup.send("Downloading text...")
try:
response = await _self.http.get(_url, headers={"User-Agent": "Mozilla/5.0"})
if response.status_code != 200:
await _msg.edit(content=f"Failed to download text. Status code: {response.status_code}")
return
ct = response.headers.get("Content-Type", "application/octet-stream")
if not ct.startswith("text/plain"):
await _msg.edit(content=f"Failed to download text. Content-Type is {ct!r}, not text/plain")
return
text_pre = response.text
except (ConnectionError, httpx.HTTPError, httpx.NetworkError) as e:
await _msg.edit(content="Failed to download text. " + str(e))
return
else:
await _msg.edit(content="Text downloaded; Converting to MP3...")
else:
_msg = await interaction.followup.send("Converting text to MP3...")
try:
mp3, size = await _bot.loop.run_in_executor(None, _convert, text_pre)
except (Exception, IOError) as e:
await _msg.edit(content="failed. " + str(e))
raise e
if size >= ctx.guild.filesize_limit - 1500:
await _msg.edit(
content=f"MP3 is too large ({size / 1024 ** 2}Mb vs "
f"{ctx.guild.filesize_limit / 1024 ** 2}Mb)"
)
return
fn = ""
_words = text_pre.split()
while len(fn) < 28:
try:
word = _words.pop(0)
except IndexError:
break
if len(fn) + len(word) + 1 > 28:
continue
fn += word + "-"
fn = fn[:-1]
fn = fn[:28]
await _msg.edit(
content="Here's your MP3!",
file=discord.File(mp3, filename=fn + ".mp3")
)
await ctx.send_modal(TextModal())
def setup(bot):
bot.add_cog(OtherCog(bot))