This commit is contained in:
nex 2023-01-16 15:51:03 +00:00
parent 947f57d334
commit f48eb699e7

View file

@ -4,10 +4,11 @@ import os
import random
import re
import textwrap
import traceback
import dns.resolver
import aiofiles
from time import sleep as time_sleep
from time import time
from typing import Literal
from typing import Tuple, Optional, Dict
from pathlib import Path
@ -19,10 +20,12 @@ import psutil
from discord.ext import commands
from rich.tree import Tree
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.firefox.service import Service as FirefoxService
# from selenium.webdriver.ie
from utils import console
@ -41,7 +44,7 @@ class OtherCog(commands.Cog):
window_height: int = 1920,
window_width: int = 1080,
full_screenshot: bool = False,
) -> discord.File:
) -> Tuple[discord.File, str, int, int]:
async def _blocking(*args):
return await self.bot.loop.run_in_executor(None, *args)
@ -51,10 +54,8 @@ class OtherCog(commands.Cog):
"firefox": [
"/usr/bin/firefox-esr",
"/usr/bin/firefox",
# '/usr/bin/geckodriver'
],
"chrome": [
# '/usr/bin/chromedriver',
"/usr/bin/chromium",
"/usr/bin/chrome",
"/usr/bin/chrome-browser",
@ -87,7 +88,6 @@ class OtherCog(commands.Cog):
return driver, driver_path
driver, driver_path = find_driver()
original_driver_name = driver
console.log(
"Using driver '{}' with binary '{}' to screenshot '{}', as requested by {}.".format(
driver, driver_path, website, ctx.user
@ -123,31 +123,45 @@ class OtherCog(commands.Cog):
# Is it overkill to cast this to a thread? yes
# Do I give a flying fuck? kinda
# Why am I doing this? I suspect setup is causing a ~10-second block of the event loop
driver_name = driver
start_init = time()
driver, friendly_url = await asyncio.to_thread(_setup)
end_init = time()
console.log(
"Driver '{}' initialised in {} seconds.".format(
driver_name, round(end_init - start_init, 2)
)
)
async def _edit(content: str):
self.bot.loop.create_task(ctx.interaction.edit_original_response(content=content))
await _edit(content=f"Screenshotting {friendly_url}... (49%)")
await _edit(content=f"Screenshotting <{friendly_url}>... (49%)")
await _blocking(driver.set_page_load_timeout, render_time)
start = time()
await _blocking(driver.get, website)
await _edit(content=f"Screenshotting {friendly_url}... (66%)")
end = time()
get_time = round((end - start) * 1000)
await _edit(content=f"Screenshotting <{friendly_url}>... (66%)")
await asyncio.sleep(render_time)
await _edit(content=f"Screenshotting {friendly_url}... (83%)")
await _edit(content=f"Screenshotting <{friendly_url}>... (83%)")
domain = re.sub(r"https?://", "", website)
screenshot_method = driver.get_screenshot_as_png
if full_screenshot and original_driver_name == "firefox":
if full_screenshot and driver_name == "firefox":
screenshot_method = driver.get_full_page_screenshot_as_png
start = time()
data = await _blocking(screenshot_method)
_io = io.BytesIO()
# Write the data async because HAHAHAHAHAHAHA
# We'll do it in the existing event loop though because less overhead
await _blocking(_io.write, data)
_io.seek(0)
end = time()
screenshot_time = round((end - start) * 1000)
driver.quit()
return discord.File(_io, f"{domain}.png")
return discord.File(_io, f"{domain}.png"), driver_name, get_time, screenshot_time
@staticmethod
async def get_interface_ip_addresses() -> Dict[str, list[Dict[str, str | bool | int]]]:
@ -429,10 +443,12 @@ class OtherCog(commands.Cog):
)
):
"""Takes a screenshot of a URL"""
window_width = max(min(1080 * 6, window_width), 1080 // 6)
window_height = max(min(1920 * 6, window_height), 1920 // 6)
await ctx.defer()
if ctx.user.id == 1019233057519177778:
if getattr(self.bot, "ALLOW_MATTHEW", False) is False:
return await ctx.respond("No.")
# if ctx.user.id == 1019233057519177778:
# if getattr(self.bot, "ALLOW_MATTHEW", False) is False:
# return await ctx.respond("No.")
url = urlparse(url)
if not url.scheme:
if "/" in url.path:
@ -444,7 +460,7 @@ class OtherCog(commands.Cog):
friendly_url = textwrap.shorten(url.geturl(), 100)
await ctx.edit(content=f"Preparing to screenshot {friendly_url}... (0%)")
await ctx.edit(content=f"Preparing to screenshot <{friendly_url}>... (0%)")
async def blacklist_check() -> bool | str:
async with aiofiles.open("domains.txt") as blacklist:
@ -486,7 +502,7 @@ class OtherCog(commands.Cog):
)
await asyncio.sleep(1)
await ctx.edit(content=f"Preparing to screenshot {friendly_url}... (16%)")
await ctx.edit(content=f"Preparing to screenshot <{friendly_url}>... (16%)")
okay = await (pending or done_tasks).pop()
if okay is not True:
return await ctx.edit(
@ -497,16 +513,35 @@ class OtherCog(commands.Cog):
await asyncio.sleep(1)
await ctx.edit(content=f"Screenshotting {textwrap.shorten(url.geturl(), 100)}... (33%)")
try:
screenshot = await self.screenshot_website(
screenshot, driver, fetch_time, screenshot_time = await self.screenshot_website(
ctx, url.geturl(), browser, render_timeout, window_height, window_width, capture_whole_page
)
except TimeoutError:
return await ctx.edit(content="Rendering screenshot timed out. Try using a smaller resolution.")
except WebDriverException as e:
paginator = commands.Paginator(prefix="```", suffix="```")
paginator.add_line("WebDriver Error (did you pass extreme or invalid command options?)")
paginator.add_line("Traceback:", empty=True)
for line in e.msg.splitlines():
paginator.add_line(line)
for page in paginator.pages:
await ctx.respond(page)
except Exception as e:
console.print_exception()
return await ctx.edit(content=f"Error: {e}", delete_after=30)
return await ctx.edit(content=f"Failed: {e}", delete_after=30)
else:
await ctx.edit(content=f"Screenshotting {friendly_url}... (99%)")
await ctx.edit(content=f"Screenshotting <{friendly_url}>... (99%)")
await asyncio.sleep(0.5)
await ctx.edit(content="Here's your screenshot!", file=screenshot)
await ctx.edit(
content="Here's your screenshot!\n"
"Details:\n"
f"\\* Browser: {driver}\n"
f"\\* Resolution: {window_height}x{window_width} ({window_width*window_height:,} pixels)\n"
f"\\* URL: <{friendly_url}>\n"
f"\\* Load time: {fetch_time:.2f}ms\n"
f"\\* Screenshot render time: {screenshot_time:.2f}ms\n",
file=screenshot
)
domains = discord.SlashCommandGroup("domains", "Commands for managing domains")
@ -534,28 +569,6 @@ class OtherCog(commands.Cog):
await blacklist.write(line)
await ctx.respond("Removed domain from blacklist.")
@commands.group(name="matthew", invoke_without_command=True)
@commands.is_owner()
async def matthew_group(self, ctx: commands.Context):
"""Commands for managing domains"""
await ctx.send_help(ctx.command)
@matthew_group.command(name="enable")
async def enable_matthew(self, ctx: commands.Context):
"""Enables Matthew"""
if not await self.bot.is_owner(ctx.author):
return await ctx.reply("You are not allowed to do that.")
self.bot.ALLOW_MATTHEW = True
await ctx.reply("Matthew enabled.")
@matthew_group.command(name="disable")
async def disable_matthew(self, ctx: commands.Context):
"""Disables Matthew"""
if not await self.bot.is_owner(ctx.author):
return await ctx.reply("You are not allowed to do that.")
self.bot.ALLOW_MATTHEW = False
await ctx.reply("Matthew disabled.")
def setup(bot):
bot.add_cog(OtherCog(bot))