college-bot-v1/cogs/other.py

1636 lines
67 KiB
Python
Raw Normal View History

2022-12-01 13:34:26 +00:00
import asyncio
2023-06-03 14:43:31 +01:00
import glob
2022-12-01 12:22:11 +00:00
import io
2023-04-27 11:13:25 +01:00
import json
2022-12-01 12:22:11 +00:00
import os
2023-05-11 18:58:02 +01:00
import subprocess
2023-01-03 13:56:23 +00:00
import random
import re
2023-03-14 12:39:57 +00:00
import tempfile
2023-01-03 15:17:09 +00:00
import textwrap
2023-04-28 21:31:00 +01:00
import gzip
2023-02-06 11:30:47 +00:00
from datetime import timedelta
2023-04-29 02:25:19 +01:00
from functools import partial
2023-03-16 21:45:01 +00:00
from io import BytesIO
2023-01-03 15:17:09 +00:00
2023-01-03 15:12:09 +00:00
import dns.resolver
2023-03-20 14:39:22 +00:00
import httpx
2023-01-23 16:57:31 +00:00
from dns import asyncresolver
2023-01-03 15:17:09 +00:00
import aiofiles
2023-03-16 22:45:04 +00:00
import pyttsx3
2023-03-16 23:04:41 +00:00
from time import time, time_ns, sleep
2023-01-03 13:56:23 +00:00
from typing import Literal
2022-12-29 17:41:41 +00:00
from typing import Tuple, Optional, Dict
2023-01-03 13:56:23 +00:00
from pathlib import Path
2023-01-03 14:43:49 +00:00
from urllib.parse import urlparse
2023-05-05 10:35:17 +01:00
from PIL import Image
import pytesseract
2022-11-13 23:16:47 +00:00
import aiohttp
2023-01-03 13:56:23 +00:00
import discord
import psutil
2023-04-28 21:31:00 +01:00
from discord.ext import commands, pages
2022-12-29 17:41:41 +00:00
from rich.tree import Tree
2023-01-03 13:56:23 +00:00
from selenium import webdriver
2023-01-16 15:51:03 +00:00
from selenium.common.exceptions import WebDriverException
2023-01-03 13:56:23 +00:00
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.firefox.service import Service as FirefoxService
2023-05-05 20:08:47 +01:00
from utils import console, Timer
2022-11-13 23:16:47 +00:00
2023-04-28 21:31:00 +01:00
# Optional proxy settings: config.py may define `proxy`, `proxies`, both or neither.
try:
    from config import proxy
except ImportError:
    proxy = None

try:
    from config import proxies
except ImportError:
    # No explicit list: reuse the single proxy twice, or leave the list empty.
    proxies = [proxy] * 2 if proxy else []
2023-03-20 14:48:23 +00:00
# Collect the IDs of the installed TTS voices once at import time; the engine
# itself is only needed for this lookup and is discarded afterwards.
_tts_engine = pyttsx3.init()
# noinspection PyTypeChecker
VOICES = [voice.id for voice in _tts_engine.getProperty("voices")]
del _tts_engine
2022-11-13 23:16:47 +00:00
2023-04-28 21:31:00 +01:00
def format_autocomplete(ctx: discord.AutocompleteContext):
    """Autocomplete callback for the ``format`` option.

    Returns the cached format names for the currently typed URL that contain
    the user's partial input (case-insensitive). When the URL is not cached
    yet, a valid http(s) URL is queued for the cog's background worker and an
    empty list is returned.
    """
    url = ctx.options.get("url", os.urandom(6).hex())
    cog: "OtherCog" = ctx.bot.cogs["OtherCog"]  # type: ignore

    if url in cog._fmt_cache:
        return [
            entry["format"]
            for entry in cog._fmt_cache[url].values()
            if ctx.value.lower() in entry["format"].lower()
        ]

    try:
        parsed = urlparse(url, allow_fragments=True)
    except ValueError:
        return []

    if parsed.netloc and parsed.scheme in ("http", "https"):
        cog._fmt_queue.put_nowait(url)
    return []
2023-04-28 21:31:00 +01:00
# noinspection DuplicatedCode
2022-11-13 23:16:47 +00:00
class OtherCog(commands.Cog):
def __init__(self, bot):
    """Store the bot and start the background format-cache worker."""
    self.bot = bot
    # Format cache (keyed by URL) and the queue of URLs waiting to be cached.
    self._fmt_cache = {}
    self._fmt_queue = asyncio.Queue()
    # Acquired around screenshot_website() calls by the screenshot command.
    self.lock = asyncio.Lock()
    self.http = httpx.AsyncClient()
    self._worker_task = bot.loop.create_task(self.cache_population_job())
def cog_unload(self):
    """Stop the background cache worker and release the HTTP client."""
    self._worker_task.cancel()
    # The httpx client owns a connection pool that is only released by
    # aclose(). cog_unload is synchronous, so the close is scheduled on the
    # bot's loop instead of being awaited here.
    self.bot.loop.create_task(self.http.aclose())
async def cache_population_job(self):
    """Background worker: pre-populate the format cache for queued URLs.

    Runs until cancelled. A failure while listing formats for one URL is
    logged and skipped so that the worker keeps serving later requests, and
    every dequeued item is always marked as done.
    """
    import logging

    log = logging.getLogger(__name__)
    while True:
        url = await self._fmt_queue.get()
        try:
            if url not in self._fmt_cache:
                await self.list_formats(url, use_proxy=1)
        except Exception:
            # CancelledError is a BaseException, so cancellation still
            # propagates and stops the loop.
            log.exception("Failed to pre-cache formats for %r", url)
        finally:
            self._fmt_queue.task_done()
async def list_formats(self, url: str, *, use_proxy: int = 0) -> dict:
    """Return the formats yt-dlp reports for ``url``, keyed by format ID.

    Results are cached per URL in ``self._fmt_cache``. An empty dict is
    returned (and not cached) when yt-dlp raises ``DownloadError``.

    :param url: the media URL to inspect.
    :param use_proxy: accepted for callers, but not read by this method.
    :returns: ``{format_id: {"id", "ext", "protocol", "acodec", "vcodec",
        "resolution", "filesize", "format"}}``.
    """
    if url in self._fmt_cache:
        return self._fmt_cache[url]

    import yt_dlp

    class NullLogger:
        """Logger stub that discards everything yt-dlp reports."""

        def debug(self, *args, **kwargs):
            pass

        def info(self, *args, **kwargs):
            pass

        def warning(self, *args, **kwargs):
            pass

        def error(self, *args, **kwargs):
            pass

    with tempfile.TemporaryDirectory(prefix="jimmy-ytdl", suffix="-info") as tempdir:
        with yt_dlp.YoutubeDL(
            {
                "windowsfilenames": True,
                "restrictfilenames": True,
                "noplaylist": True,
                "nocheckcertificate": True,
                "no_color": True,
                "noprogress": True,
                "logger": NullLogger(),
                "paths": {"home": tempdir, "temp": tempdir},
            }
        ) as downloader:
            try:
                # extract_info blocks on network I/O, so keep it off the event loop.
                info = await self.bot.loop.run_in_executor(
                    None,
                    partial(downloader.extract_info, url, download=False),
                )
            except yt_dlp.utils.DownloadError:
                return {}
            info = downloader.sanitize_info(info)

    # Some results (e.g. playlists) carry no "formats" entry; treat that as
    # "no formats" instead of raising KeyError.
    available = (info or {}).get("formats") or ()
    new = {
        fmt["format_id"]: {
            "id": fmt["format_id"],
            "ext": fmt["ext"],
            "protocol": fmt["protocol"],
            "acodec": fmt.get("acodec", "?"),
            "vcodec": fmt.get("vcodec", "?"),
            "resolution": fmt.get("resolution", "?x?"),
            "filesize": fmt.get("filesize", float("inf")),
            "format": fmt.get("format", "?"),
        }
        for fmt in available
    }
    self._fmt_cache[url] = new
    return new
2022-11-14 17:20:31 +00:00
class AbortScreenshotTask(discord.ui.View):
    """View with a single "Abort" button that cancels the given task."""

    def __init__(self, task: asyncio.Task):
        super().__init__()
        self.task = task

    @discord.ui.button(label="Abort", style=discord.ButtonStyle.red)
    async def abort(self, button: discord.ui.Button, interaction: discord.Interaction):
        acknowledgement: discord.Interaction = await interaction.response.send_message(
            "Aborting...", ephemeral=True
        )

        # Cancel the task and wait until the cancellation has been processed.
        self.task.cancel()
        try:
            await self.task
        except asyncio.CancelledError:
            pass

        self.disable_all_items()
        button.label = "[ aborted ]"
        await acknowledgement.edit_original_response(content="Aborted screenshot task.", view=self)
        self.stop()
2023-01-03 15:20:50 +00:00
async def screenshot_website(
    self,
    ctx: discord.ApplicationContext,
    website: str,
    driver: Literal["chrome", "firefox"],
    render_time: int = 10,
    load_timeout: int = 30,
    window_height: int = 1920,
    window_width: int = 1080,
    full_screenshot: bool = False,
) -> Tuple[discord.File, str, int, int]:
    """Load ``website`` in a headless browser and capture it as a PNG.

    Progress is reported by editing the original interaction response.

    :param ctx: the invoking context (used for progress edits and logging).
    :param website: the URL to load.
    :param driver: preferred browser; the other browser is used when no
        binary for the preferred one exists.
    :param render_time: seconds to wait after the page load before capturing.
    :param load_timeout: page-load timeout in seconds, passed to selenium.
    :param window_height: passed as the FIRST argument of ``set_window_size``.
    :param window_width: passed as the SECOND argument of ``set_window_size``.
        NOTE(review): selenium documents that signature as (width, height),
        so these two names look swapped - confirm before renaming anything.
    :param full_screenshot: capture the whole page (only honoured on firefox).
    :returns: ``(file, browser name, page load ms, screenshot ms)``.
    :raises RuntimeError: when no browser binary can be found.
    """

    async def _blocking(*args):
        # Run a blocking selenium call in the default executor.
        return await self.bot.loop.run_in_executor(None, *args)

    def find_driver():
        """Return (browser name, binary path), preferring the requested browser."""
        binaries = {
            "firefox": [
                "/usr/bin/firefox-esr",
                "/usr/bin/firefox",
            ],
            "chrome": ["/usr/bin/chromium", "/usr/bin/chrome", "/usr/bin/chrome-browser", "/usr/bin/google-chrome"],
        }
        search_order = [driver] + [name for name in binaries if name != driver]
        for name in search_order:
            for binary in binaries[name]:
                resolved = Path(binary).resolve()
                if resolved.exists():
                    return name, resolved
        raise RuntimeError("No browser binary.")

    driver_name, driver_path = find_driver()
    console.log(
        "Using driver '{}' with binary '{}' to screenshot '{}', as requested by {}.".format(
            driver_name, driver_path, website, ctx.user
        )
    )

    def _setup():
        """Start the headless browser (blocking; run in a worker thread)."""
        if driver_name == "chrome":
            options = ChromeOptions()
            options.add_argument("--headless")
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")
            options.add_argument("--disable-gpu")
            options.add_argument("--disable-extensions")
            options.add_argument("--incognito")
            options.binary_location = str(driver_path)
            service = ChromeService("/usr/bin/chromedriver")
            instance = webdriver.Chrome(service=service, options=options)
        else:
            options = FirefoxOptions()
            options.add_argument("--headless")
            options.add_argument("--private-window")
            options.add_argument("--safe-mode")
            options.add_argument("--new-instance")
            options.binary_location = str(driver_path)
            service = FirefoxService("/usr/bin/geckodriver")
            instance = webdriver.Firefox(service=service, options=options)
        try:
            instance.set_window_size(window_height, window_width)
        except Exception:
            # Do not leave a browser process behind when sizing fails.
            instance.quit()
            raise
        return instance, textwrap.shorten(website, 100)

    # Browser start-up is slow and blocking, so it runs in a thread.
    start_init = time()
    browser, friendly_url = await asyncio.to_thread(_setup)
    end_init = time()
    console.log("Driver '{}' initialised in {} seconds.".format(driver_name, round(end_init - start_init, 2)))

    def _edit(content: str):
        # Fire-and-forget progress update.
        self.bot.loop.create_task(ctx.interaction.edit_original_response(content=content))

    try:
        expires = round(time() + load_timeout)
        _edit(content=f"Screenshotting <{friendly_url}>... (49%, loading webpage, aborts <t:{expires}:R>)")
        await _blocking(browser.set_page_load_timeout, load_timeout)
        start = time()
        await _blocking(browser.get, website)
        end = time()
        get_time = round((end - start) * 1000)

        render_time_expires = round(time() + render_time)
        _edit(content=f"Screenshotting <{friendly_url}>... (66%, stopping render <t:{render_time_expires}:R>)")
        await asyncio.sleep(render_time)

        _edit(content=f"Screenshotting <{friendly_url}>... (83%, saving screenshot)")
        domain = re.sub(r"https?://", "", website)
        screenshot_method = browser.get_screenshot_as_png
        if full_screenshot and driver_name == "firefox":
            screenshot_method = browser.get_full_page_screenshot_as_png

        start = time()
        data = await _blocking(screenshot_method)
        _io = io.BytesIO(data)
        end = time()
        screenshot_time = round((end - start) * 1000)
        return discord.File(_io, f"{domain}.png"), driver_name, get_time, screenshot_time
    finally:
        # Always shut the browser down - also on errors, timeouts and
        # cancellation - and keep the blocking quit() off the event loop.
        await _blocking(browser.quit)
2023-01-03 13:56:23 +00:00
2022-12-29 17:41:41 +00:00
@staticmethod
async def get_interface_ip_addresses() -> Dict[str, list[Dict[str, str | bool | int]]]:
    """Map each network interface to its addresses that have a broadcast address.

    Every entry holds the address, netmask and broadcast address plus the
    interface's ``isup`` and ``speed`` values from psutil. Interfaces whose
    addresses all lack a broadcast address map to an empty list.
    """
    addresses = await asyncio.to_thread(psutil.net_if_addrs)
    stats = await asyncio.to_thread(psutil.net_if_stats)
    return {
        name: [
            {
                "ip": entry.address,
                "netmask": entry.netmask,
                "broadcast": entry.broadcast,
                "up": stats[name].isup,
                "speed": stats[name].speed,
            }
            for entry in entries
            if entry.broadcast is not None
        ]
        for name, entries in addresses.items()
    }
async def analyse_text(self, text: str) -> Optional[Tuple[float, float, float, float]]:
    """Score ``text`` for sentiment.

    :returns: ``(positive, neutral, negative, compound)``, or ``None`` when
        the sentiment analysis module cannot be imported.
    """

    def _score():
        try:
            from utils.sentiment_analysis import intensity_analyser
        except ImportError:
            return None

        polarity = intensity_analyser.polarity_scores(text)
        return polarity["pos"], polarity["neu"], polarity["neg"], polarity["compound"]

    # The scoring is blocking, so it runs in the executor while the bot's
    # training lock is held.
    async with self.bot.training_lock:
        outcome = await self.bot.loop.run_in_executor(None, _score)
    return outcome
2022-11-14 17:20:31 +00:00
@staticmethod
async def get_xkcd(session: aiohttp.ClientSession, n: int) -> dict | None:
    """Fetch the JSON metadata of XKCD comic ``n``; ``None`` unless the status is 200."""
    endpoint = "https://xkcd.com/{!s}/info.0.json".format(n)
    async with session.get(endpoint) as response:
        if response.status != 200:
            return None
        return await response.json()
@staticmethod
async def random_xkcd_number(session: aiohttp.ClientSession) -> int:
    """Ask xkcd for a random comic number.

    The number is read from the ``Location`` header of the redirect issued by
    the random-comic endpoint. A number between 100 and 999 is picked locally
    when no usable redirect is returned.
    """
    # aiohttp follows redirects by default, which would hide the 302 and its
    # Location header; the redirect itself is what carries the comic number.
    async with session.get("https://c.xkcd.com/random/comic", allow_redirects=False) as response:
        if response.status == 302:
            try:
                return int(response.headers["location"].split("/")[-2])
            except (KeyError, IndexError, ValueError):
                # Unexpected or missing Location header: fall through to the local pick.
                pass
    return random.randint(100, 999)
@staticmethod
async def random_xkcd(session: aiohttp.ClientSession) -> dict | None:
    """Fetch the metadata of a random XKCD.

    Convenience wrapper: picks a number with random_xkcd_number and loads it
    with get_xkcd.
    """
    picked = await OtherCog.random_xkcd_number(session)
    comic = await OtherCog.get_xkcd(session, picked)
    return comic
@staticmethod
def get_xkcd_embed(data: dict) -> discord.Embed:
    """Build the embed for one comic from its xkcd JSON metadata.

    Reads the ``safe_title``, ``alt``, ``num`` and ``img`` keys of ``data``.
    """
    result = discord.Embed(
        title=data["safe_title"],
        description=data["alt"],
        color=discord.Colour.embed_background(),
    )
    footer_text = "XKCD #{!s}".format(data["num"])
    result.set_footer(text=footer_text)
    result.set_image(url=data["img"])
    return result
@staticmethod
async def generate_xkcd(n: int = None) -> discord.Embed:
    """Return the embed for comic ``n``, or for a random comic when ``n`` is None.

    A red "failed to load" embed is returned when the comic cannot be
    fetched, including when the random lookup itself fails.
    """
    async with aiohttp.ClientSession() as session:
        if n is None:
            data = await OtherCog.random_xkcd(session)
            # The random fetch may fail too; only read the number when it succeeded.
            if data is not None:
                n = data["num"]
        else:
            data = await OtherCog.get_xkcd(session, n)

    if data is None:
        if n is None:
            footer = "Attempted to retrieve a random XKCD"
        else:
            footer = "Attempted to retrieve XKCD #{!s}".format(n)
        return discord.Embed(
            title="Failed to load XKCD :(", description="Try again later.", color=discord.Colour.red()
        ).set_footer(text=footer)
    return OtherCog.get_xkcd_embed(data)
class XKCDGalleryView(discord.ui.View):
    """Previous / Random / Next buttons for browsing XKCD comics.

    ``n`` is the number of the comic currently shown. It may be ``None`` when
    the number is unknown, in which case Previous/Next show a random comic.
    """

    def __init__(self, n: int):
        super().__init__(timeout=300, disable_on_timeout=True)
        self.n = n

    def __rich_repr__(self):
        yield "n", self.n
        yield "message", self.message

    async def _show(self, interaction: discord.Interaction, n):
        """Display comic ``n`` (a random one when None) and remember its number."""
        await interaction.response.defer()
        if n is None:
            # Resolve the number first so that Previous/Next stay relative
            # to the comic that is actually displayed.
            async with aiohttp.ClientSession() as session:
                n = await OtherCog.random_xkcd_number(session)
        self.n = n
        await interaction.edit_original_response(embed=await OtherCog.generate_xkcd(n))

    @discord.ui.button(label="Previous", style=discord.ButtonStyle.blurple)
    async def previous_comic(self, _, interaction: discord.Interaction):
        await self._show(interaction, None if self.n is None else self.n - 1)

    @discord.ui.button(label="Random", style=discord.ButtonStyle.blurple)
    async def random_comic(self, _, interaction: discord.Interaction):
        await self._show(interaction, None)

    @discord.ui.button(label="Next", style=discord.ButtonStyle.blurple)
    async def next_comic(self, _, interaction: discord.Interaction):
        await self._show(interaction, None if self.n is None else self.n + 1)
@commands.slash_command()
async def xkcd(self, ctx: discord.ApplicationContext, *, number: int = None):
    """Shows an XKCD comic"""
    if number is None:
        # Pick the random number up front so the gallery view knows which
        # comic is displayed; its Previous/Next buttons do arithmetic on it.
        async with aiohttp.ClientSession() as session:
            number = await self.random_xkcd_number(session)
    embed = await self.generate_xkcd(number)
    view = self.XKCDGalleryView(number)
    return await ctx.respond(embed=embed, view=view)
2022-11-13 23:16:47 +00:00
@commands.slash_command()
async def sentiment(self, ctx: discord.ApplicationContext, *, text: str):
    """Attempts to detect a text's tone"""
    await ctx.defer()
    if not text:
        return await ctx.respond("You need to provide some text to analyse.")
    result = await self.analyse_text(text)
    if result is None:
        return await ctx.edit(content="Failed to load sentiment analysis module.")

    # analyse_text returns (positive, neutral, negative, compound); unpack by
    # name so each field is labelled with the matching score.
    positive, neutral, negative, compound = result
    embed = discord.Embed(title="Sentiment Analysis", color=discord.Colour.embed_background())
    for label, score in (
        ("Positive", positive),
        ("Neutral", neutral),
        ("Negative", negative),
        ("Compound", compound),
    ):
        embed.add_field(name=label, value="{:.2%}".format(score))
    return await ctx.edit(content=None, embed=embed)
@commands.message_command(name="Detect Sentiment")
async def message_sentiment(self, ctx: discord.ApplicationContext, message: discord.Message):
    # Context-menu variant of the sentiment command: analyses the cleaned
    # content of the selected message and links the result back to it.
    await ctx.defer()
    text = str(message.clean_content)
    if not text:
        return await ctx.respond("You need to provide some text to analyse.")
    await ctx.respond("Analyzing (this may take some time)...")
    result = await self.analyse_text(text)
    if result is None:
        return await ctx.edit(content="Failed to load sentiment analysis module.")

    # analyse_text returns (positive, neutral, negative, compound); unpack by
    # name so each field is labelled with the matching score.
    positive, neutral, negative, compound = result
    embed = discord.Embed(title="Sentiment Analysis", color=discord.Colour.embed_background())
    for label, score in (
        ("Positive", positive),
        ("Neutral", neutral),
        ("Negative", negative),
        ("Compound", compound),
    ):
        embed.add_field(name=label, value="{:.2%}".format(score))
    embed.url = message.jump_url
    return await ctx.edit(content=None, embed=embed)
2022-12-01 12:22:11 +00:00
# Slash-command group; its subcommands are registered with @corrupt_file.command.
corrupt_file = discord.SlashCommandGroup(name="corrupt-file", description="Corrupts files.")
@corrupt_file.command(name="generate")
async def generate_corrupt_file(self, ctx: discord.ApplicationContext, file_name: str, size_in_megabytes: float):
    """Generates a "corrupted" file."""
    # ctx.guild is None outside of guilds; use the same 8 MiB cap the "ruin"
    # subcommand applies in that case.
    limit_bytes = ctx.guild.filesize_limit if ctx.guild is not None else 8388608
    limit_mb = round(limit_bytes / 1024 / 1024)
    if size_in_megabytes > limit_mb:
        return await ctx.respond(
            f"File size must be less than {limit_mb} MB.\n"
            "Want to corrupt larger files? see https://github.com/EEKIM10/cli-utils#installing-the-right-way"
            " (and then run `ruin <file>`)."
        )
    await ctx.defer()
    size = max(min(int(size_in_megabytes * 1024 * 1024), limit_bytes), 1)
    # 1 KiB is held back from the requested size. Clamp to at least one byte:
    # os.urandom() rejects the negative count that small sizes would produce.
    file = io.BytesIO(os.urandom(max(size - 1024, 1)))
    return await ctx.respond(file=discord.File(file, file_name))
2022-12-01 13:30:04 +00:00
@staticmethod
def do_file_corruption(file: io.BytesIO, passes: int, bound_start: int, bound_end: int):
for _ in range(passes):
file.seek(random.randint(bound_start, bound_end))
file.write(os.urandom(random.randint(128, 2048)))
file.seek(0)
2022-12-01 13:32:57 +00:00
return file
2022-12-01 13:30:04 +00:00
2022-12-01 12:22:11 +00:00
@corrupt_file.command(name="ruin")
async def ruin_corrupt_file(
    self,
    ctx: discord.ApplicationContext,
    file: discord.Attachment,
    passes: int = 10,
    metadata_safety_boundary: float = 5,
):
    """Takes a file and corrupts parts of it"""
    await ctx.defer()
    attachment = file
    if attachment.size > 8388608:
        return await ctx.respond(
            "File is too large. Max size 8mb.\n"
            "Want to corrupt larger files? see https://github.com/EEKIM10/cli-utils#installing-the-right-way"
            " (and then run `ruin <file>`)."
        )

    # The boundary is the percentage of the file protected at EACH end.
    # Above 50% the start bound would pass the end bound and below 0% it
    # would be negative; both make the corruption step raise.
    boundary_percent = min(max(metadata_safety_boundary, 0), 50)
    bound_pct = attachment.size * (0.01 * boundary_percent)
    bound_start = round(bound_pct)
    bound_end = round(attachment.size - bound_pct)

    await ctx.respond("Downloading file...")
    buffer = io.BytesIO(await attachment.read())

    await ctx.edit(content="Corrupting file...")
    # NOTE(review): `passes` is user-controlled and unbounded - consider capping it.
    buffer = await asyncio.to_thread(self.do_file_corruption, buffer, passes, bound_start, bound_end)
    buffer.seek(0)

    await ctx.edit(content="Uploading file...")
    await ctx.edit(content="Here's your corrupted file!", file=discord.File(buffer, attachment.filename))
2022-12-01 12:22:11 +00:00
2022-12-29 17:41:41 +00:00
@commands.command(name="kys", aliases=["kill"])
@commands.is_owner()
async def end_your_life(self, ctx: commands.Context):
    # Owner-only shutdown: acknowledge first, then close the bot.
    await ctx.send(":( okay")
    await self.bot.close()
2022-12-29 17:41:41 +00:00
@commands.slash_command()
async def ip(self, ctx: discord.ApplicationContext, detailed: bool = False, secure: bool = True):
    """Gets current IP"""
    # Anyone but the owner gets a placeholder answer.
    if not await self.bot.is_owner(ctx.user):
        return await ctx.respond("Internal IP: 0.0.0.0\nExternal IP: 0.0.0.0")

    await ctx.defer(ephemeral=secure)
    ips = await self.get_interface_ip_addresses()

    root = Tree("IP Addresses")
    internal = root.add("Internal")
    external = root.add("External")
    interfaces = internal.add("Interfaces")

    for name, entries in ips.items():
        branch = interfaces.add(name)
        for entry in entries:
            is_up = entry["up"]
            colour = "green" if is_up else "red"
            label = f"[{colour}]" + entry["ip"]
            if not detailed:
                # The compact view carries the state in the label itself.
                label += " (up)" if is_up else " (down)"
            node = branch.add(label)
            if detailed:
                node.add(f"IF Up: {'yes' if is_up else 'no'}")
                node.add(f"Netmask: {entry['netmask']}")
                node.add(f"Broadcast: {entry['broadcast']}")

    async with aiohttp.ClientSession() as session:
        try:
            async with session.get("https://api.ipify.org") as resp:
                external.add(await resp.text())
        except aiohttp.ClientError as e:
            external.add(f" [red]Error: {e}")

    # Render the tree to text and send it in code-block pages.
    with console.capture() as capture:
        console.print(root)
    rendered = capture.get()

    paginator = commands.Paginator(prefix="```", suffix="```")
    for row in rendered.splitlines():
        paginator.add_line(row)
    for page in paginator.pages:
        await ctx.respond(page, ephemeral=secure)
2022-11-13 23:16:47 +00:00
2023-01-23 16:57:31 +00:00
@commands.slash_command()
async def dig(
    self,
    ctx: discord.ApplicationContext,
    domain: str,
    _type: discord.Option(
        str,
        name="type",
        default="A",
        choices=[
            "A",
            "AAAA",
            "ANY",
            "AXFR",
            "CNAME",
            "HINFO",
            "LOC",
            "MX",
            "NS",
            "PTR",
            "SOA",
            "SRV",
            "TXT",
        ],
    ),
):
    """Looks up a domain name"""
    await ctx.defer()
    if re.search(r"\s+", domain):
        return await ctx.respond("Domain name cannot contain spaces.")

    try:
        answer = await asyncresolver.resolve(domain, _type.upper())
    except Exception as e:
        return await ctx.respond(f"Error: {e}")

    # One branch per returned record.
    tree = Tree(f"DNS Lookup for {domain}")
    for record in answer:
        branch = tree.add(f"{record.rdtype.name} Record")
        branch.add(f"Name: {answer.name}")
        branch.add(f"Value: {record.to_text()}")

    with console.capture() as capture:
        console.print(tree)
    rendered = capture.get()

    paginator = commands.Paginator(prefix="```", suffix="```")
    for row in rendered.splitlines():
        paginator.add_line(row)
    paginator.add_line(empty=True)
    paginator.add_line(f"Exit code: {0}")
    paginator.add_line(f"DNS Server used: {answer.nameserver}")
    for page in paginator.pages:
        await ctx.respond(page)
2023-01-29 19:17:44 +00:00
@commands.slash_command()
async def traceroute(
    self,
    ctx: discord.ApplicationContext,
    url: str,
    port: discord.Option(int, description="Port to use", default=None),
    ping_type: discord.Option(
        str,
        name="ping-type",
        description="Type of ping to use. See `traceroute --help`",
        choices=["icmp", "tcp", "udp", "udplite", "dccp", "default"],
        default="default",
    ),
    use_ip_version: discord.Option(
        str, name="ip-version", description="IP version to use.", choices=["ipv4", "ipv6"], default="ipv4"
    ),
    max_ttl: discord.Option(int, name="ttl", description="Max number of hops", default=30),
):
    """Performs a traceroute request."""
    await ctx.defer()
    if re.search(r"\s+", url):
        return await ctx.respond("URL cannot contain spaces.")
    if url.startswith("-"):
        # The host is passed straight to traceroute (possibly under sudo);
        # a leading dash would be parsed as a command-line option.
        return await ctx.respond("URL cannot start with a dash.")

    flags = {
        "ping_type": {
            "icmp": "-I",
            "tcp": "-T",
            "udp": "-U",
            "udplite": "-UL",
            "dccp": "-D",
        },
        "use_ip_version": {"ipv4": "-4", "ipv6": "-6"},
    }
    if ping_type != "default":
        # Only a non-default probe type is run through sudo.
        args = ["sudo", "-E", "-n", "traceroute", flags["ping_type"][ping_type]]
    else:
        args = ["traceroute"]
    args += [flags["use_ip_version"][use_ip_version], "-m", str(max_ttl)]
    if port is not None:
        args += ["-p", str(port)]
    args.append(url)

    # Hide the sudo prefix in the echoed command.
    shown = args[3:] if args[0] == "sudo" else args
    paginator = commands.Paginator()
    paginator.add_line(f"Running command: {' '.join(shown)}")
    paginator.add_line(empty=True)
    try:
        start = time_ns()
        process = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # communicate() drains both pipes while waiting for exit. Calling
        # wait() first can deadlock once the child fills a pipe buffer.
        stdout, stderr = await process.communicate()
        end = time_ns()
        time_taken_in_ms = (end - start) / 1000000
        for stream in (stdout, stderr):
            for line in (stream or b"").splitlines():
                paginator.add_line(line.decode(errors="replace"))
        paginator.add_line(empty=True)
        paginator.add_line(f"Exit code: {process.returncode}")
        paginator.add_line(f"Time taken: {time_taken_in_ms:,.1f}ms")
    except Exception as e:
        paginator.add_line(f"Error: {e}")
    for page in paginator.pages:
        await ctx.respond(page)
2023-01-03 13:56:23 +00:00
@commands.slash_command()
@commands.max_concurrency(1, commands.BucketType.user)
@commands.cooldown(1, 30, commands.BucketType.user)
async def screenshot(
    self,
    ctx: discord.ApplicationContext,
    url: str,
    browser: discord.Option(str, description="Browser to use", choices=["chrome", "firefox"], default="chrome"),
    render_timeout: discord.Option(int, name="render-timeout", description="Timeout for rendering", default=3),
    load_timeout: discord.Option(int, name="load-timeout", description="Timeout for page load", default=60),
    window_height: discord.Option(
        int, name="window-height", description="the height of the window in pixels", default=1920
    ),
    window_width: discord.Option(
        int, name="window-width", description="the width of the window in pixels", default=1080
    ),
    capture_whole_page: discord.Option(
        bool,
        name="capture-full-page",
        description="(firefox only) whether to capture the full page or just the viewport.",
        default=False,
    ),
):
    """Takes a screenshot of a URL"""
    if capture_whole_page and browser != "firefox":
        return await ctx.respond("The capture-full-page option is only available for firefox.")

    # Keep the window between one sixth and six times the default size.
    window_width = max(min(1080 * 6, window_width), 1080 // 6)
    window_height = max(min(1920 * 6, window_height), 1920 // 6)
    await ctx.defer()

    url = urlparse(url)
    if not url.scheme:
        # "example.com/page" parses as a bare path: split it into host and
        # path and default to http.
        if "/" in url.path:
            hostname, path = url.path.split("/", 1)
        else:
            hostname = url.path
            path = ""
        url = url._replace(scheme="http", netloc=hostname, path=path)

    friendly_url = textwrap.shorten(url.geturl(), 100)
    await ctx.edit(content=f"Preparing to screenshot <{friendly_url}>... (0%, checking filters)")

    # Both checks return True when the URL passes and a human-readable
    # reason string when it is rejected.
    async def blacklist_check() -> bool | str:
        async with aiofiles.open("./assets/domains.txt") as blacklist:
            for ln in await blacklist.readlines():
                if not ln.strip():
                    continue
                # Each non-empty line is used as a regular expression.
                if re.match(ln.strip(), url.netloc):
                    return "Local blacklist"
        return True

    async def dns_check() -> Optional[bool | str]:
        try:
            # noinspection PyTypeChecker
            for response in await asyncio.to_thread(dns.resolver.resolve, url.hostname, "A"):
                if response.address == "0.0.0.0":
                    return "DNS blacklist"
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.LifetimeTimeout, AttributeError):
            return "Invalid domain or DNS error"
        return True

    done, pending = await asyncio.wait(
        [
            asyncio.create_task(blacklist_check(), name="local"),
            asyncio.create_task(dns_check(), name="dns"),
        ],
        return_when=asyncio.FIRST_COMPLETED,
    )
    done_tasks = done
    try:
        done = done_tasks.pop()
    except KeyError:
        return await ctx.respond("Something went wrong. Try again?\n")

    result = await done
    # A rejection is a non-empty string, which is truthy - compare against
    # True explicitly, otherwise no URL is ever filtered out.
    if result is not True:
        for leftover in pending:
            leftover.cancel()
        return await ctx.edit(
            content="That domain is blacklisted, doesn't exist, or there was no answer from the DNS server."
            f" ({result!r})"
        )

    await asyncio.sleep(1)
    await ctx.edit(content=f"Preparing to screenshot <{friendly_url}>... (16%, checking filters)")
    # The second check is either still pending or finished together with the first.
    okay = await (pending or done_tasks).pop()
    if okay is not True:
        return await ctx.edit(
            content="That domain is blacklisted, doesn't exist, or there was no answer from the DNS server."
            f" ({okay!r})"
        )

    await asyncio.sleep(1)
    await ctx.edit(content=f"Screenshotting {textwrap.shorten(url.geturl(), 100)}... (33%, initializing browser)")
    try:
        # Serialise browser sessions behind the cog-wide lock.
        async with self.lock:
            screenshot, driver, fetch_time, screenshot_time = await self.screenshot_website(
                ctx,
                url.geturl(),
                browser,
                render_timeout,
                load_timeout,
                window_height,
                window_width,
                capture_whole_page,
            )
    except TimeoutError:
        return await ctx.edit(content="Rendering screenshot timed out. Try using a smaller resolution.")
    except WebDriverException as e:
        paginator = commands.Paginator(prefix="```", suffix="```")
        paginator.add_line("WebDriver Error (did you pass extreme or invalid command options?)")
        paginator.add_line("Traceback:", empty=True)
        # msg may be None; fall back to the exception text.
        for line in (e.msg or str(e)).splitlines():
            paginator.add_line(line)
        for page in paginator.pages:
            await ctx.respond(page)
    except Exception as e:
        console.print_exception()
        return await ctx.edit(content=f"Failed: {e}", delete_after=30)
    else:
        await ctx.edit(content=f"Screenshotting <{friendly_url}>... (99%, uploading image)")
        await asyncio.sleep(0.5)
        await ctx.edit(
            content="Here's your screenshot!\n"
            "Details:\n"
            f"\\* Browser: {driver}\n"
            f"\\* Resolution: {window_height}x{window_width} ({window_width*window_height:,} pixels)\n"
            f"\\* URL: <{friendly_url}>\n"
            f"\\* Load time: {fetch_time:.2f}ms\n"
            f"\\* Screenshot render time: {screenshot_time:.2f}ms\n"
            f"\\* Total time: {(fetch_time + screenshot_time):.2f}ms\n" +
            (
                '* Probability of being scat or something else horrifying: 100%'
                if ctx.user.id == 1019233057519177778 else ''
            ),
            file=screenshot,
        )
2023-01-03 13:56:23 +00:00
2023-01-03 14:43:49 +00:00
# Slash-command group for editing the domain blacklist in ./assets/domains.txt.
domains = discord.SlashCommandGroup(name="domains", description="Commands for managing domains")
@domains.command(name="add")
async def add_domain(self, ctx: discord.ApplicationContext, domain: str):
    """Adds a domain to the blacklist"""
    await ctx.defer()
    allowed = await self.bot.is_owner(ctx.user)
    if not allowed:
        return await ctx.respond("You are not allowed to do that.")

    # Entries are stored lower-cased, one per line.
    entry = domain.lower() + "\n"
    async with aiofiles.open("./assets/domains.txt", "a") as blacklist:
        await blacklist.write(entry)
    await ctx.respond("Added domain to blacklist.")
@domains.command(name="remove")
async def remove_domain(self, ctx: discord.ApplicationContext, domain: str):
    """Removes a domain from the blacklist"""
    await ctx.defer()
    allowed = await self.bot.is_owner(ctx.user)
    if not allowed:
        return await ctx.respond("You are not allowed to do that.")

    async with aiofiles.open("./assets/domains.txt") as blacklist:
        lines = await blacklist.readlines()

    # Rewrite the file without the lines that match the lower-cased domain.
    target = domain.lower()
    kept = [line for line in lines if line.strip() != target]
    async with aiofiles.open("./assets/domains.txt", "w") as blacklist:
        await blacklist.write("".join(kept))
    await ctx.respond("Removed domain from blacklist.")
2023-04-29 02:25:19 +01:00
@commands.slash_command(name="yt-dl-beta")
@commands.max_concurrency(1, commands.BucketType.user)
async def yt_dl_2(
    self,
    ctx: discord.ApplicationContext,
    url: discord.Option(
        description="The URL to download.",
        type=str
    ),
    list_formats: bool = False,
    _format: discord.Option(
        name="format",
        description="The format to download.",
        type=str,
        autocomplete=format_autocomplete,
        default=""
    ) = "",
    extract_audio: bool = False,
    upload_log: bool = False,
):
    """Downloads a video using youtube-dl

    NOTE(review): the first docstring line appears to double as the slash
    command description, so it is kept unchanged - confirm before editing it.

    :param ctx: the invoking application context.
    :param url: the URL handed to yt-dlp.
    :param list_formats: when true, only show a paginated list of the
        available formats instead of downloading anything.
    :param _format: a format ID, or the human-readable format name returned
        by ``self.list_formats`` (resolved back to its ID below).
    :param extract_audio: when true, run the FFmpeg audio extraction
        post-processor on the download.
    :param upload_log: when true, attach yt-dlp's captured stdout/stderr.
    """

    def human_size(size, units):
        # Scale `size` down by 1024 per unit step, never running off the end
        # of `units`, and render it as e.g. "12.5MB".
        index = 0
        while size > 1024 and index < len(units) - 1:
            size /= 1024
            index += 1
        return f"{round(size, 2)}{units[index]}"

    await ctx.defer()
    # Compression is currently switched off; the code path is kept for when
    # it is re-enabled.
    compress_if_possible = False
    formats = await self.list_formats(url)

    if list_formats:
        command_mention = self.bot.get_application_command("yt-dl-beta").mention
        embeds = []
        for fmt, info in formats.items():
            raw_size = info.get("filesize", 0.1) or 0.1
            if raw_size == float("inf"):
                # An infinite size is displayed as zero bytes.
                size_text = "0B"
            else:
                size_text = human_size(
                    raw_size,
                    ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
                )
            embeds.append(
                discord.Embed(
                    title=fmt,
                    # Argument order must line up with the placeholders:
                    # {1} = file size, {2} = video codec, {3} = audio codec.
                    description="- Encoding: {3} + {2}\n"
                                "- Extension: `.{0[ext]}`\n"
                                "- Resolution: {0[resolution]}\n"
                                "- Filesize: {1}\n"
                                "- Protocol: {0[protocol]}\n".format(
                        info,
                        size_text,
                        info.get("vcodec", 'N/A'),
                        info.get("acodec", 'N/A'),
                    ),
                    colour=discord.Colour.blurple()
                ).add_field(
                    name="Download:",
                    # The option is registered as `format`, so that is what
                    # the copy-paste hint has to say.
                    value="{} url:{} format:{}".format(
                        command_mention,
                        url,
                        fmt
                    )
                )
            )
        if not embeds:
            # A paginator cannot be built from zero pages.
            return await ctx.respond("No formats found.")
        _paginator = pages.Paginator(embeds, loop_pages=True)
        return await _paginator.respond(ctx.interaction)

    if _format:
        # Users may pass the descriptive format name; map it back to the
        # format ID that yt-dlp understands. Unknown values pass through.
        for fmt, info in formats.items():
            if info.get("format") == _format:
                _format = fmt
                break

    # `ctx.guild` is None in DMs; fall back to a 25MiB limit there.
    upload_limit = ctx.guild.filesize_limit if ctx.guild else 25 * 1024 * 1024
    max_size_mb = upload_limit / 1024 / 1024
    if max_size_mb == 8.0:
        # The reported 8MB default is deliberately treated as 25MB.
        max_size_mb = 25.0
    # Leave 256KiB of headroom for the rest of the request.
    bytes_remaining = (max_size_mb - 0.256) * 1024 * 1024
    import yt_dlp

    with tempfile.TemporaryDirectory(prefix="jimmy-ytdl-") as tempdir_str:
        tempdir = Path(tempdir_str).resolve()
        stdout = tempdir / "stdout.txt"
        stderr = tempdir / "stderr.txt"

        class Logger:
            """Redirects yt-dlp's log output into the two log files."""

            def __init__(self):
                self.stdout = open(stdout, "w+")
                self.stderr = open(stderr, "w+")

            def close(self):
                self.stdout.close()
                self.stderr.close()

            def debug(self, msg: str):
                if msg.startswith("[debug]"):
                    return
                self.info(msg)

            def info(self, msg: str):
                self.stdout.write(msg + "\n")
                self.stdout.flush()

            def warning(self, msg: str):
                self.stderr.write(msg + "\n")
                self.stderr.flush()

            def error(self, msg: str):
                self.stderr.write(msg + "\n")
                self.stderr.flush()

        logger = Logger()
        paths = {
            target: str(tempdir)
            for target in (
                "home",
                "temp",
            )
        }

        args = {
            "windowsfilenames": True,
            "restrictfilenames": True,
            "noplaylist": True,
            "nocheckcertificate": True,
            "no_color": True,
            "noprogress": True,
            "logger": logger,
            "format": _format or None,
            "paths": paths,
            # Prefixing with the user ID lets the results be globbed below.
            "outtmpl": f"{ctx.user.id}-%(title).50s.%(ext)s",
            "trim_file_name": 128,
            "extract_audio": extract_audio,
            "format_sort": ["codec:h264", "ext"]
        }
        if extract_audio:
            args["postprocessors"] = [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredquality": "48",
                    "preferredcodec": "vorbis"
                }
            ]
            args["format"] = args["format"] or f"(ba/b)[filesize<={max_size_mb}M]"
        if args["format"] is None:
            args["format"] = f"(bv*+ba/bv/ba/b)[filesize<={max_size_mb}M]"

        await ctx.respond(
            embed=discord.Embed(title="Downloading...", colour=discord.Colour.blurple())
        )
        try:
            with yt_dlp.YoutubeDL(args) as downloader:
                # yt-dlp is blocking, so keep it off the event loop.
                await self.bot.loop.run_in_executor(None, partial(downloader.download, [url]))
        except yt_dlp.utils.DownloadError as e:
            return await ctx.edit(
                embed=discord.Embed(
                    title="Error",
                    description=f"Download failed:\n```\n{e}\n```",
                    colour=discord.Colour.red()
                ),
                delete_after=30
            )
        finally:
            # `args` keeps a reference to the logger, so `del logger` alone
            # would never close the files; close them explicitly.
            logger.close()

        embed = discord.Embed(
            title="Downloaded!",
            description="",
            colour=discord.Colour.green()
        )
        files = []
        if upload_log:
            if out_size := stdout.stat().st_size:
                files.append(discord.File(stdout, "stdout.txt"))
                bytes_remaining -= out_size
            if err_size := stderr.stat().st_size:
                files.append(discord.File(stderr, "stderr.txt"))
                bytes_remaining -= err_size

        def too_large(size):
            return size / 1024 / 1024 >= max_size_mb or size >= bytes_remaining

        # Snapshot the listing first: compression creates new files that
        # would also match this pattern.
        for file in list(tempdir.glob(f"{ctx.user.id}-*")):
            st = file.stat().st_size
            if st == 0:
                embed.description += f"\N{warning sign}\ufe0f {file.name} is empty.\n"
                continue

            compress_note = ""
            if too_large(st) and compress_if_possible and file.suffix in (
                ".mp4",
                ".mkv",
                ".mov",
                '.aac',
                '.opus',
                '.webm'
            ):
                await ctx.edit(
                    embed=discord.Embed(
                        title="Compressing...",
                        description="File name: `%s`\nThis will take a long time." % file.name,
                        colour=discord.Colour.blurple()
                    )
                )
                target = file.with_name(file.name + '.compressed' + file.suffix)
                ffmpeg_command = [
                    "ffmpeg",
                    "-hide_banner",
                    "-i",
                    str(file),
                    "-crf",
                    "30",
                    "-preset",
                    "slow",
                    str(target)
                ]
                try:
                    await self.bot.loop.run_in_executor(
                        None,
                        partial(
                            subprocess.run,
                            ffmpeg_command,
                            check=True
                        )
                    )
                except (subprocess.CalledProcessError, OSError):
                    # OSError covers a missing ffmpeg binary.
                    compress_note = ", compressing failed"
                else:
                    file = target
                    st = file.stat().st_size
                    compress_note = ", compressed fine."

            # Re-check, since a successful compression may now fit.
            if too_large(st):
                embed.description += "\N{warning sign}\ufe0f {} is too large to upload ({}" \
                                     ", max is {}MB{}).\n".format(
                    file.name,
                    human_size(st, ("B", "KB", "MB", "GB", "TB")),
                    max_size_mb,
                    compress_note
                )
                continue

            files.append(discord.File(file, file.name))
            bytes_remaining -= st

        if not files:
            embed.description += "No files to upload. Directory list:\n%s" % (
                "\n".join(r'\* ' + f.name for f in tempdir.iterdir())
            )
            return await ctx.edit(embed=embed)

        # Show a transient "uploading" line without keeping it in the embed.
        _desc = embed.description
        embed.description += f"Uploading {len(files)} file(s)..."
        await ctx.edit(embed=embed)
        await ctx.channel.trigger_typing()
        embed.description = _desc
        start = time()
        await ctx.edit(embed=embed, files=files)
        end = time()
        if (end - start) < 10:
            # Sending a message clears the typing indicator started above.
            await ctx.respond("*clearing typing*", delete_after=0.01)

        async def bgtask():
            # Drop the status embed after two minutes, leaving just the files.
            await asyncio.sleep(120.0)
            try:
                await ctx.edit(embed=None)
            except discord.NotFound:
                pass

        self.bot.loop.create_task(bgtask())
2023-05-12 16:39:34 +01:00
2023-03-16 21:45:01 +00:00
@commands.slash_command(name="text-to-mp3")
@commands.cooldown(5, 600, commands.BucketType.user)
async def text_to_mp3(
    self,
    ctx: discord.ApplicationContext,
    speed: discord.Option(
        int,
        "The speed of the voice. Default is 150.",
        required=False,
        default=150
    ),
    voice: discord.Option(
        str,
        "The voice to use. Some may cause timeout.",
        autocomplete=discord.utils.basic_autocomplete(VOICES),
        default="default"
    )
):
    """Converts text to MP3. 5 uses per 10 minutes.

    Opens a modal asking for the text (or ``url:<address>`` pointing at a
    ``text/plain`` document), renders it with pyttsx3 in a worker thread and
    uploads the resulting file.

    :param ctx: the invoking application context.
    :param speed: speech rate, clamped to the 50-300 range.
    :param voice: a voice name; must be one of ``VOICES``.
    """
    if voice not in VOICES:
        return await ctx.respond("Invalid voice.")

    speed = min(300, max(50, speed))
    _self = self
    _bot = self.bot
    # `ctx.guild` is None in DMs; fall back to a 25MiB upload limit there.
    size_limit = ctx.guild.filesize_limit if ctx.guild else 25 * 1024 * 1024

    class TextModal(discord.ui.Modal):
        def __init__(self):
            super().__init__(
                discord.ui.InputText(
                    label="Text",
                    placeholder="Enter text to read",
                    min_length=1,
                    max_length=4000,
                    style=discord.InputTextStyle.long
                ),
                title="Convert text to an MP3"
            )

        async def callback(self, interaction: discord.Interaction):
            def _convert(text: str) -> Tuple[BytesIO, int]:
                # Blocking: runs in an executor thread. Returns the audio
                # data and its size in bytes.
                target_fn = str(
                    Path(tempfile.gettempdir()) / f"jimmy-tts-{ctx.user.id}-{ctx.interaction.id}.mp3"
                )
                engine = pyttsx3.init()
                engine.setProperty("voice", voice)
                engine.setProperty("rate", speed)
                engine.save_to_file(text, target_fn)
                engine.runAndWait()
                try:
                    # Poll once a second until the file exists and its size
                    # has been identical for three consecutive checks.
                    last_3_sizes = [-3, -2, -1]
                    no_exists = 0
                    while True:
                        if not os.path.exists(target_fn):
                            # Raise explicitly: an `assert` is stripped under
                            # -O, which would turn this into an endless loop.
                            if no_exists >= 300:
                                raise FileNotFoundError("File does not exist for 5 minutes.")
                            no_exists += 1
                        else:
                            current_size = os.stat(target_fn).st_size
                            if all(current_size == seen for seen in last_3_sizes):
                                break
                            last_3_sizes.pop(0)
                            last_3_sizes.append(current_size)
                        sleep(1)

                    with open(target_fn, "rb") as f:
                        x = f.read()
                finally:
                    # Never leave the temporary file behind, even on failure.
                    if os.path.exists(target_fn):
                        os.remove(target_fn)
                return BytesIO(x), len(x)

            await interaction.response.defer()
            text_pre = self.children[0].value
            if text_pre.startswith("url:"):
                # NOTE(review): this fetches a user-supplied URL from the
                # bot's host; consider restricting the allowed targets.
                _url = text_pre[4:].strip()
                _msg = await interaction.followup.send("Downloading text...")
                try:
                    response = await _self.http.get(
                        _url,
                        headers={"User-Agent": "Mozilla/5.0"},
                        follow_redirects=True
                    )
                    if response.status_code != 200:
                        await _msg.edit(content=f"Failed to download text. Status code: {response.status_code}")
                        return

                    ct = response.headers.get("Content-Type", "application/octet-stream")
                    if not ct.startswith("text/plain"):
                        await _msg.edit(content=f"Failed to download text. Content-Type is {ct!r}, not text/plain")
                        return
                    text_pre = response.text
                except (ConnectionError, httpx.HTTPError, httpx.NetworkError) as e:
                    await _msg.edit(content="Failed to download text. " + str(e))
                    return
            else:
                _msg = await interaction.followup.send("Converting text to MP3... (0 seconds elapsed)")

            start_time = time()

            async def assurance_task():
                # Periodically refresh the status so the user can see the
                # conversion is still running.
                while True:
                    await asyncio.sleep(5.5)
                    await _msg.edit(
                        content=f"Converting text to MP3... ({time() - start_time:.1f} seconds elapsed)"
                    )

            task = _bot.loop.create_task(assurance_task())
            try:
                try:
                    mp3, size = await asyncio.wait_for(
                        _bot.loop.run_in_executor(None, _convert, text_pre),
                        timeout=600
                    )
                finally:
                    # Stop the progress updates on every exit path, and
                    # before the handlers below write their final message.
                    task.cancel()
            except asyncio.TimeoutError:
                await _msg.edit(content="Failed to convert text to MP3 - Timeout. Try shorter/less complex text.")
                return
            except Exception as e:
                await _msg.edit(content="failed. " + str(e))
                raise

            if size >= size_limit - 1500:
                await _msg.edit(
                    content=f"MP3 is too large ({size / 1024 / 1024}Mb vs "
                            f"{size_limit / 1024 / 1024}Mb)"
                )
                return

            # Build a short file name from the leading words that fit into
            # 28 characters, skipping any word that would overflow.
            fn = ""
            for word in text_pre.split():
                if len(fn) >= 28:
                    break
                if len(fn) + len(word) + 1 > 28:
                    continue
                fn += word + "-"
            # Drop the trailing dash; fall back to a fixed name if no word fit.
            fn = fn[:-1][:28] or "text-to-speech"
            await _msg.edit(
                content="Here's your MP3!",
                file=discord.File(mp3, filename=fn + ".mp3")
            )

    await ctx.send_modal(TextModal())
2023-03-27 23:16:28 +01:00
@commands.slash_command()
@commands.cooldown(5, 10, commands.BucketType.user)
@commands.max_concurrency(1, commands.BucketType.user)
async def quote(self, ctx: discord.ApplicationContext):
    """Generates a random quote

    Fetches an image from inspirobot and posts it with buttons to generate
    another quote, regenerate the current one, or delete the message.

    :param ctx: the invoking application context.
    """
    emoji = discord.PartialEmoji(name='loading', animated=True, id=1101463077586735174)

    async def get_quote() -> str | discord.File:
        # Returns the image as a file, the bare image URL if only the image
        # download failed, or an error message if the API call itself failed.
        try:
            response = await self.http.get("https://inspirobot.me/api?generate=true")
        except (ConnectionError, httpx.HTTPError, httpx.NetworkError) as e:
            return "Failed to get quote. " + str(e)
        if response.status_code != 200:
            return f"Failed to get quote. Status code: {response.status_code}"

        url = response.text
        try:
            response = await self.http.get(url)
        except (ConnectionError, httpx.HTTPError, httpx.NetworkError):
            return url
        if response.status_code != 200:
            return url
        x = io.BytesIO(response.content)
        x.seek(0)
        return discord.File(x, filename="quote.jpg")

    class GenerateNewView(discord.ui.View):
        def __init__(self):
            super().__init__(
                timeout=300,
                disable_on_timeout=True
            )

        async def __aenter__(self):
            # Disable the buttons while a request is in flight.
            self.disable_all_items()
            if self.message:
                await self.message.edit(view=self)
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            self.enable_all_items()
            if self.message:
                await self.message.edit(view=self)
            # Must not return a truthy value: that tells Python to swallow
            # whatever exception was raised inside the `async with` body.
            return None

        async def interaction_check(self, interaction: discord.Interaction) -> bool:
            # Only the invoking user, in the original channel, may press.
            return interaction.user == ctx.user and interaction.channel == ctx.channel

        @discord.ui.button(
            label="New Quote",
            style=discord.ButtonStyle.green,
            emoji=discord.PartialEmoji.from_str("\U000023ed\U0000fe0f")
        )
        async def new_quote(self, _, interaction: discord.Interaction):
            await interaction.response.defer(invisible=True)
            async with self:
                followup = await interaction.followup.send(f"{emoji} Generating quote")
                new_result = await get_quote()
                if isinstance(new_result, discord.File):
                    return await followup.edit(content=None, file=new_result, view=GenerateNewView())
                return await followup.edit(content=new_result, view=GenerateNewView())

        @discord.ui.button(
            label="Regenerate",
            style=discord.ButtonStyle.blurple,
            emoji=discord.PartialEmoji.from_str("\U0001f504")
        )
        async def regenerate(self, _, interaction: discord.Interaction):
            await interaction.response.defer(invisible=True)
            async with self:
                message = await interaction.original_response()
                # A star reaction marks the quote as one to keep.
                if "\U00002b50" in [_reaction.emoji for _reaction in message.reactions]:
                    return await interaction.followup.send(
                        "\N{cross mark} Message is starred and cannot be regenerated. You can press "
                        "'New Quote' to generate a new quote instead.",
                        ephemeral=True
                    )
                new_result = await get_quote()
                if isinstance(new_result, discord.File):
                    return await interaction.edit_original_response(file=new_result)
                return await interaction.edit_original_response(content=new_result)

        @discord.ui.button(
            label="Delete",
            style=discord.ButtonStyle.red,
            emoji="\N{wastebasket}\U0000fe0f"
        )
        async def delete(self, _, interaction: discord.Interaction):
            await interaction.response.defer(invisible=True)
            await interaction.delete_original_response()
            self.stop()

    await ctx.defer()
    result = await get_quote()
    if isinstance(result, discord.File):
        return await ctx.respond(file=result, view=GenerateNewView())
    return await ctx.respond(result, view=GenerateNewView())
2023-03-14 12:39:57 +00:00
2023-05-05 10:35:17 +01:00
@commands.slash_command()
@commands.cooldown(1, 30, commands.BucketType.user)
@commands.max_concurrency(1, commands.BucketType.user)
async def ocr(
    self,
    ctx: discord.ApplicationContext,
    attachment: discord.Option(
        discord.SlashCommandOptionType.attachment,
        description="Image to perform OCR on",
    )
):
    """OCRs an image

    Runs tesseract over the attachment and replies with the text: inline
    when short, as a mystb.in paste when longer than 4096 characters, and as
    a text file otherwise. The reply is then edited to include how long each
    step took.

    :param ctx: the invoking application context.
    :param attachment: the uploaded image to read.
    """
    await ctx.defer()
    timings: Dict[str, float] = {}
    attachment: discord.Attachment

    with Timer() as _t:
        data = await attachment.read()
        file = io.BytesIO(data)
        file.seek(0)
    timings["Download attachment"] = _t.total

    try:
        with Timer() as _t:
            img = await self.bot.loop.run_in_executor(None, Image.open, file)
        timings["Parse image"] = _t.total
    except OSError as e:
        # Pillow raises an OSError subclass for data it cannot identify,
        # which is what any non-image attachment triggers.
        return await ctx.respond(f"Failed to read image: `{e}`")

    try:
        with Timer() as _t:
            text = await self.bot.loop.run_in_executor(None, pytesseract.image_to_string, img)
        timings["Perform OCR"] = _t.total
    except pytesseract.TesseractError as e:
        return await ctx.respond(f"Failed to perform OCR: `{e}`")

    if len(text) > 4096:
        with Timer() as _t:
            try:
                response = await self.http.put(
                    "https://api.mystb.in/paste",
                    json={
                        "files": [
                            {
                                "filename": "ocr.txt",
                                "content": text
                            }
                        ],
                    }
                )
                response.raise_for_status()
            except httpx.HTTPError:
                return await ctx.respond("OCR content too large to post.")
            data = response.json()
            # Same Timer usage as every other step in this command.
            with Timer() as _respond_timer:
                embed = discord.Embed(
                    description="View on [mystb.in](%s)" % ("https://mystb.in/" + data["id"]),
                    colour=discord.Colour.dark_theme()
                )
                await ctx.respond(embed=embed)
            timings["Respond (URL)"] = _respond_timer.total
        timings["Upload text to mystbin"] = _t.total
    elif len(text) <= 1500 and text.count("\n") <= 7:
        with Timer() as _t:
            await ctx.respond(embed=discord.Embed(description=text))
        timings["Respond (Text)"] = _t.total
    else:
        with Timer() as _t:
            out_file = io.BytesIO(text.encode("utf-8", "replace"))
            await ctx.respond(file=discord.File(out_file, filename="ocr.txt"))
        timings["Respond (File)"] = _t.total

    if timings:
        text = "Timings:\n" + "\n".join("{}: {:.2f}s".format(k.title(), v) for k, v in timings.items())
        await ctx.edit(
            content=text,
        )
2023-05-05 10:35:17 +01:00
2023-05-23 15:09:09 +01:00
# NOTE: this method used to share its name with the "Convert Image to GIF"
# message command defined further down. A class body keeps only the last
# binding of a name, so this slash command was silently replaced and never
# registered. The old name still refers to the message command, exactly as
# before; only this (previously unreachable) definition is renamed.
@commands.slash_command(name="image-to-gif")
@commands.cooldown(1, 30, commands.BucketType.user)
@commands.max_concurrency(1, commands.BucketType.user)
async def convert_image_to_gif_slash(
    self,
    ctx: discord.ApplicationContext,
    image: discord.Option(
        discord.SlashCommandOptionType.attachment,
        description="Image to convert. PNG/JPEG only.",
    ),
    backup: discord.Option(
        discord.SlashCommandOptionType.boolean,
        description="Sends the GIF to your DM as well so you'll never lose it.",
        default=False
    )
):
    """Converts a static image to a gif, so you can save it

    :param ctx: the invoking application context.
    :param image: the uploaded image to convert.
    :param backup: when true, also send the GIF to the invoking user's DMs.
    """
    await ctx.defer()
    image: discord.Attachment
    with tempfile.TemporaryFile("wb+") as f:
        await image.save(f)
        f.seek(0)
        try:
            img = await self.bot.loop.run_in_executor(None, Image.open, f)
        except OSError as e:
            # Pillow raises an OSError subclass for data it cannot identify.
            return await ctx.respond(f"Failed to read image: `{e}`")
        if (img.format or "").upper() not in ("PNG", "JPEG", "WEBP", "HEIF", "BMP", "TIFF"):
            return await ctx.respond("Image must be PNG, JPEG, WEBP, or HEIF.")

        # The source file must stay open while Pillow re-encodes from it.
        with tempfile.TemporaryFile("wb+") as f2:
            caller = partial(img.save, f2, format="GIF")
            await self.bot.loop.run_in_executor(None, caller)
            f2.seek(0)
            try:
                await ctx.respond(file=discord.File(f2, filename="image.gif"))
            except discord.HTTPException as e:
                if e.code == 40005:
                    return await ctx.respond("Image is too large.")
                return await ctx.respond(f"Failed to upload: `{e}`")

            if backup:
                try:
                    # The first upload consumed the buffer; rewind it so the
                    # DM copy is not empty.
                    f2.seek(0)
                    await ctx.user.send(file=discord.File(f2, filename="image.gif"))
                except discord.Forbidden:
                    return await ctx.respond("Unable to mirror to your DM - am I blocked?", ephemeral=True)
2023-05-23 15:09:09 +01:00
2023-05-25 09:51:16 +01:00
# Message context-menu command: converts the first image attachment of the
# targeted message to a GIF, replies with it, and mirrors it to the invoking
# user's DMs.
@commands.message_command(name="Convert Image to GIF")
async def convert_image_to_gif(self, ctx: discord.ApplicationContext, message: discord.Message):
    await ctx.defer()
    for attachment in message.attachments:
        # `content_type` may be missing, so guard before `startswith`.
        if (attachment.content_type or "").startswith("image/"):
            break
    else:
        return await ctx.respond("No image found.")

    image = attachment
    image: discord.Attachment
    with tempfile.TemporaryFile("wb+") as f:
        await image.save(f)
        f.seek(0)
        try:
            img = await self.bot.loop.run_in_executor(None, Image.open, f)
        except OSError as e:
            # Pillow raises an OSError subclass for data it cannot identify.
            return await ctx.respond(f"Failed to read image: `{e}`")
        if (img.format or "").upper() not in ("PNG", "JPEG", "WEBP", "HEIF", "BMP", "TIFF"):
            return await ctx.respond("Image must be PNG, JPEG, WEBP, or HEIF.")

        # The source file must stay open while Pillow re-encodes from it.
        with tempfile.TemporaryFile("wb+") as f2:
            caller = partial(img.save, f2, format="GIF")
            await self.bot.loop.run_in_executor(None, caller)
            f2.seek(0)
            try:
                await ctx.respond(file=discord.File(f2, filename="image.gif"))
            except discord.HTTPException as e:
                if e.code == 40005:
                    return await ctx.respond("Image is too large.")
                return await ctx.respond(f"Failed to upload: `{e}`")

            try:
                # Rewind: the first upload consumed the buffer.
                f2.seek(0)
                await ctx.user.send(file=discord.File(f2, filename="image.gif"))
            except discord.Forbidden:
                return await ctx.respond("Unable to mirror to your DM - am I blocked?", ephemeral=True)
2023-06-03 14:17:34 +01:00
@commands.slash_command()
@commands.cooldown(1, 180, commands.BucketType.user)
@commands.max_concurrency(1, commands.BucketType.user)
async def sherlock(
    self,
    ctx: discord.ApplicationContext,
    username: str,
    search_nsfw: bool = False,
    use_tor: bool = False
):
    """Sherlocks a username.

    Runs the ``sherlock`` docker image against the username, shows a spinner
    while it works, then posts the console output together with the result
    files it wrote.

    :param ctx: the invoking application context.
    :param username: the username to look up; no whitespace, no leading dash.
    :param search_nsfw: pass ``--nsfw`` to sherlock.
    :param use_tor: pass ``--tor`` to sherlock.
    """
    if re.search(r"\s", username) is not None:
        return await ctx.respond("Username cannot contain spaces.")
    if username.startswith("-"):
        # Otherwise the value would be parsed as a command-line option.
        return await ctx.respond("Username cannot start with a dash.")

    async def background_task():
        chars = ["|", "/", "-", "\\"]
        n = 0
        # Every 2.5 seconds update the embed to show that the command is
        # still running.
        while True:
            await asyncio.sleep(2.5)
            elapsed = time() - start_time
            embed = discord.Embed(
                title="Sherlocking username %s" % chars[n % 4],
                description=f"Elapsed: {elapsed:.0f}s",
                colour=discord.Colour.dark_theme()
            )
            await ctx.edit(
                embed=embed
            )
            n += 1

    await ctx.defer()
    # Output results to a temporary directory mounted into the container.
    with tempfile.TemporaryDirectory() as tempdir:
        command = [
            "docker",
            "run",
            "--rm",
            "-t",
            "-v",
            f"{tempdir}:/opt/sherlock/results",
            "sherlock",
            "--folderoutput", "/opt/sherlock/results",
            "--print-found",
            "--csv"
        ]
        if search_nsfw:
            command.append("--nsfw")
        if use_tor:
            command.append("--tor")
        # Username to search for
        command.append(username)

        start_time = time()
        result = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        await ctx.respond(embed=discord.Embed(title="Starting..."))
        task = asyncio.create_task(background_task())
        try:
            # communicate() also waits for the process to exit.
            stdout, stderr = await result.communicate()
        finally:
            task.cancel()
            # Wait for the spinner to stop; a failed spinner edit must not
            # take the whole command down with it.
            try:
                await task
            except (asyncio.CancelledError, discord.HTTPException):
                pass

        # If it errored, send the error
        if result.returncode != 0:
            # With `-t` the container's output may all arrive on stdout, so
            # fall back to it when stderr is empty.
            error_text = (stderr or stdout).decode(errors="replace")
            return await ctx.edit(
                embed=discord.Embed(
                    title="Error",
                    description=f"```ansi\n{error_text[:4000]}```",
                    colour=discord.Colour.red(),
                )
            )

        # If it didn't error, send the results
        stdout = stdout.decode(errors="replace")
        if len(stdout) > 4000:
            paginator = commands.Paginator("```ansi", max_size=4000)
            for line in stdout.splitlines():
                # A single over-long line would make add_line raise.
                paginator.add_line(line[:3900])
            desc = paginator.pages[0]
            title = "Results (truncated)"
        else:
            desc = f"```ansi\n{stdout}```"
            title = "Results"

        # Discord accepts at most 10 attachments per message. The upload has
        # to happen before the temporary directory is removed.
        files = [discord.File(path) for path in glob.glob(f"{tempdir}/*")[:10]]
        await ctx.edit(
            files=files,
            embed=discord.Embed(
                title=title,
                description=desc,
                colour=discord.Colour.green(),
            ),
        )
2023-01-03 14:43:49 +00:00
2022-11-13 23:16:47 +00:00
def setup(bot):
    """Extension entry point: attach an `OtherCog` instance to *bot*."""
    cog = OtherCog(bot)
    bot.add_cog(cog)