"""Discord cog providing a ``/screenshot`` command backed by headless Chrome."""
import asyncio
import datetime
import io
import logging
import os
import tempfile
import time
import copy
import typing
from urllib.parse import urlparse

import discord
import selenium.common
from discord.ext import commands
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService

from conf import CONFIG

# Preset names accepted by the ``resolution`` option, mapped to "WIDTHxHEIGHT".
RESOLUTIONS = {
    "8k": "7680x4320",
    "4k": "3840x2160",
    "1440p": "2560x1440",
    "1080p": "1920x1080",
    "720p": "1280x720",
    "480p": "854x480",
    "360p": "640x360",
    "240p": "426x240",
    "144p": "256x144"
}

# Largest window size the command accepts (the "8k" preset).
_MAX_WIDTH = 7680
_MAX_HEIGHT = 4320

# Screenshots larger than this are re-encoded as WebP before being uploaded.
_MAX_FILE_SIZE = 8 * 1024 * 1024

_RES_OPTION = discord.Option(
    name="resolution",
    description="The resolution of the browser, can be WxH, or a preset (e.g. 1080p).",
    autocomplete=discord.utils.basic_autocomplete(tuple(RESOLUTIONS.keys()))
)


class ScreenshotCog(commands.Cog):
    """Cog that screenshots webpages using a headless Chrome instance."""

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        self.log = logging.getLogger("jimmy.cogs.screenshot")

    def chrome_options(self) -> ChromeOptions:
        """Build the Chrome options used for every screenshot session.

        Returns:
            A fresh ``ChromeOptions`` configured for headless, incognito use
            with downloads restricted. The sandbox is disabled when the
            process runs as root.
        """
        _chrome_options = ChromeOptions()
        _chrome_options.add_argument("--headless")
        _chrome_options.add_argument("--disable-extensions")
        _chrome_options.add_argument("--incognito")
        # NOTE(review): a fixed debugging port may clash when two screenshots
        # run concurrently - confirm whether this is required.
        _chrome_options.add_argument("--remote-debugging-port=9222")
        if os.getuid() == 0:
            _chrome_options.add_argument("--no-sandbox")
            _chrome_options.add_argument("--disable-dev-shm-usage")
            _chrome_options.add_argument("--disable-setuid-sandbox")
            self.log.warning("Running as root, disabling chrome sandbox.")
        # NOTE(review): this user agent reads "Mozi11a" and "Chrome/121.9..."
        # rather than the usual "Mozilla" / "121.0"; left untouched in case it
        # is deliberate - confirm.
        _chrome_options.add_argument(
            "--user-agent=Mozi11a/5.0 "
            "(X11; Linux x86_64) AppleWebKit/537.36 (KHTML, Like Gecko) Chrome/121.9.6167.160 Safari/537.36"
        )
        prefs = {
            "download.open_pdf_in_system_reader": False,
            "plugins.always_open_pdf_externally": False,
            "download_restrictions": 3,
        }
        _chrome_options.add_experimental_option(
            "prefs", prefs
        )
        return _chrome_options

    def compress_png(self, input_file: io.BytesIO) -> io.BytesIO:
        """Re-encode an image as WebP, lowering quality until it fits the size limit.

        Args:
            input_file: Readable buffer containing the source image.

        Returns:
            A buffer positioned at offset 0 holding WebP data of at most
            ``_MAX_FILE_SIZE`` bytes.

        Raises:
            RuntimeError: If even the lowest quality attempted is too large.
        """
        with Image.open(input_file) as source:
            img = source.convert("RGB")

        # Same ladder as before: 99, then 85 down to 10 in steps of 15.
        for quality in (99, *range(85, 0, -15)):
            self.log.debug("Compressing image with quality %d%%", quality)
            # Encode straight into memory; no temporary file is needed.
            value = io.BytesIO()
            img.save(value, "webp", quality=quality)
            if value.getbuffer().nbytes <= _MAX_FILE_SIZE:
                self.log.debug("%d%% was sufficient.", quality)
                break
        else:
            raise RuntimeError("Couldn't compress image.")
        value.seek(0)
        return value

    # noinspection PyTypeChecker
    @commands.slash_command()
    async def screenshot(
            self,
            ctx: discord.ApplicationContext,
            url: str,
            load_timeout: int = 10,
            render_timeout: int = None,
            eager: bool = None,
            resolution: typing.Annotated[
                str,
                _RES_OPTION
            ] = "1440p",
            use_proxy: bool = False
    ):
        """Screenshots a webpage."""
        # The docstring above is kept to a single line because the command
        # framework may use it as the user-visible command description.
        #
        # url:            page to capture; "https://" is prepended if no
        #                 http(s) scheme is present.
        # load_timeout:   seconds allowed for the page load.
        # render_timeout: seconds to wait before capturing (defaults to 30).
        # eager:          capture as soon as the page has loaded instead of
        #                 sleeping for render_timeout; defaults to True only
        #                 when render_timeout was not supplied.
        # resolution:     "WIDTHxHEIGHT" or a key of RESOLUTIONS.
        # use_proxy:      route through CONFIG["screenshot"]["proxy"] if set.
        await ctx.defer()
        if eager is None:
            eager = render_timeout is None
        if render_timeout is None:
            render_timeout = 30
        # Match the full scheme so hosts such as "httpbin.org" still get one.
        if not url.lower().startswith(("http://", "https://")):
            url = "https://" + url
        self.log.debug(
            "User %s (%s) is attempting to screenshot %r with load timeout %d, render timeout %d, %s loading, and "
            "a %s resolution.",
            ctx.author,
            ctx.author.id,
            url,
            load_timeout,
            render_timeout,
            "eager" if eager else "lazy",
            resolution
        )
        parsed = urlparse(url)
        await ctx.respond("Initialising...")

        start_init = time.time()

        # Validate the resolution before a browser is launched so that bad
        # input never leaves a Chrome process behind.
        width = height = None
        if resolution:
            resolution = RESOLUTIONS.get(resolution.lower(), resolution.lower())
            try:
                width, height = map(int, resolution.split("x"))
                if width <= 0 or height <= 0:
                    raise ValueError("resolution must be positive")
            except ValueError:
                return await ctx.respond("Invalid resolution. please provide width x height, e.g. 1920x1080")
            if height > _MAX_HEIGHT or width > _MAX_WIDTH:
                return await ctx.respond("Invalid resolution. Max resolution is 7680x4320 (8K).")

        driver: typing.Optional[webdriver.Chrome] = None
        try:
            try:
                options = self.chrome_options()
                if use_proxy is True and (server := CONFIG["screenshot"].get("proxy")):
                    use_proxy = True
                    options.add_argument("--proxy-server=" + server)
                else:
                    use_proxy = False
                service = await asyncio.to_thread(ChromeService)
                driver = await asyncio.to_thread(
                    webdriver.Chrome,
                    service=service,
                    options=options
                )
                driver.set_page_load_timeout(load_timeout)
                if width is not None:
                    driver.set_window_size(width, height)
                if eager:
                    driver.implicitly_wait(render_timeout)
            except Exception as e:
                await ctx.respond("Failed to initialise browser: " + str(e))
                raise
            end_init = time.time()

            await ctx.edit(content=("Loading webpage..." if not eager else "Loading & screenshotting webpage..."))
            start_request = time.time()
            try:
                await asyncio.to_thread(driver.get, url)
            except selenium.common.WebDriverException as e:
                # Prefer the exception type; the substring test is kept as a
                # fallback for timeouts reported only through the message.
                if isinstance(e, selenium.common.TimeoutException) or "TimeoutException" in str(e):
                    return await ctx.respond("Timed out while loading webpage.")
                return await ctx.respond("Failed to load webpage:\n```\n%s\n```" % str(e.msg))
            except Exception as e:
                await ctx.respond("Failed to get the webpage: " + str(e))
                raise
            end_request = time.time()

            if not eager:
                expires = discord.utils.utcnow() + datetime.timedelta(seconds=render_timeout)
                await ctx.edit(content=f"Rendering (expires {discord.utils.format_dt(expires, 'R')})...")
                start_wait = time.time()
                await asyncio.sleep(render_timeout)
                end_wait = time.time()
            else:
                # Equal placeholders so the reported wait time is 0.
                start_wait = end_wait = 1

            await ctx.edit(content="Saving screenshot...")
            start_save = time.time()
            png = await asyncio.to_thread(driver.get_screenshot_as_png)
            file = io.BytesIO(png)
            end_save = time.time()

            if len(png) > _MAX_FILE_SIZE:
                await ctx.edit(content="Compressing screenshot...")
                start_compress = time.time()
                file = await asyncio.to_thread(self.compress_png, file)
                fn = "screenshot.webp"
                end_compress = time.time()
            else:
                fn = "screenshot.png"
                # Equal placeholders so the reported compress time is 0.
                start_compress = end_compress = 1

            await ctx.edit(content="Cleaning up...")
            start_cleanup = time.time()
            # Clear the reference first so the finally block does not quit twice.
            browser, driver = driver, None
            await asyncio.to_thread(browser.quit)
            end_cleanup = time.time()
        finally:
            # Reached with a live driver only on early return or error.
            if driver is not None:
                await asyncio.to_thread(driver.quit)

        screenshot_size_mb = round(len(file.getvalue()) / 1024 / 1024, 2)

        def seconds(start: float, end: float) -> float:
            """Return the elapsed time between two timestamps, rounded to 2 dp."""
            return round(end - start, 2)

        embed = discord.Embed(
            title=f"Screenshot of {parsed.hostname}",
            description=f"Init time: {seconds(start_init, end_init)}s\n"
                        f"Request time: {seconds(start_request, end_request)}s\n"
                        f"Wait time: {seconds(start_wait, end_wait)}s\n"
                        f"Save time: {seconds(start_save, end_save)}s\n"
                        f"Compress time: {seconds(start_compress, end_compress)}s\n"
                        f"Cleanup time: {seconds(start_cleanup, end_cleanup)}s\n"
                        f"Total time: {seconds(start_init, end_cleanup)}s\n"
                        f"Screenshot size: {screenshot_size_mb}MB\n"
                        f"Resolution: {resolution}\n"
                        f"Used proxy: {use_proxy}",
            colour=discord.Colour.dark_theme(),
            timestamp=discord.utils.utcnow()
        )
        embed.set_image(url="attachment://" + fn)
        return await ctx.edit(content=None, embed=embed, file=discord.File(file, filename=fn))


def setup(bot):
    """Extension entry point: register the screenshot cog on ``bot``."""
    bot.add_cog(ScreenshotCog(bot))