""" Module that provides code-evaluation commands. This entire module is locked to NioBot.owner. """ import logging import shlex import tempfile import textwrap from urllib.parse import urlparse import aiohttp import niobot import asyncio import time import pprint import traceback import io import functools class EvalModule(niobot.Module): def __init__(self, bot: niobot.NioBot): if not bot.owner_id: raise RuntimeError( "No owner ID set in niobot. Refusing to load for security reasons." ) super().__init__(bot) async def owner_check(self, ctx: niobot.Context) -> bool: """Checks if the current user is the owner, and if not, gives them a very informative message.""" if not self.bot.is_owner(ctx.message.sender): await ctx.respond( "\N{CROSS MARK} Only the owner of this bot can run evaluation commands. Nice try, though!" ) return False return True @staticmethod def undress_codeblock(code: str) -> str: """Removes any code block syntax from the given string.""" code = code.strip() lines = code.splitlines(False) if len(lines[0]) == 2 or lines[0] in ( "py", "python", "python3", "sh", "shell", "bash", ): # likely a codeblock language identifier lines = lines[1:] return "\n".join(lines) @niobot.command("eval") @niobot.is_owner() async def python_eval(self, ctx: niobot.Context, code: str): """ Evaluates python code. All code is automatically wrapped in an async function, so you can do top-level awaits. You must return a value for it to be printed, or manually print() it. The following special variables are available: * `ctx` - The current context. * `loop` - The current event loop. * `stdout` - A StringIO object that you can write to print to stdout. * `stderr` - A StringIO object that you can write to print to stderr. * `_print` - The builtin print function. * `print` - A partial of the builtin print function that prints to the stdout string IO """ if not await self.owner_check(ctx): return code = self.undress_codeblock(code) stdout = io.StringIO() stderr = io.StringIO() g = { **globals().copy(), **locals().copy(), "bot": ctx.bot, "ctx": ctx, "loop": asyncio.get_event_loop(), "stdout": stdout, "stderr": stderr, "_print": print, "print": functools.partial(print, file=stdout), "niobot": niobot, "pprint": pprint.pprint, } code = code.replace("\u00a0", " ") # 00A0 is   code = textwrap.indent(code, " ") code = f"async def __eval():\n{code}" msg = await ctx.respond(f"Evaluating:\n```py\n{code}\n```") e = await self.client.add_reaction(ctx.room, ctx.message, "\N{HAMMER}") # noinspection PyBroadException try: start = time.time() * 1000 runner = await niobot.run_blocking(exec, code, g) end_compile = time.time() * 1000 result = await g["__eval"]() end_exec = time.time() * 1000 total_time = end_exec - start time_str = "%.2fms (compile: %.2fms, exec: %.2fms)" % ( total_time, end_compile - start, end_exec - end_compile, ) lines = ["Time: " + time_str + "\n"] if result is not None: if isinstance(result, (list, dict, tuple, set, int, float)): result = pprint.pformat( result, indent=4, width=80, underscore_numbers=True, ) else: result = repr(result) lines += ["Result:\n", "```", str(result), "```\n"] else: lines += ["No result.\n"] if stdout.getvalue(): lines.append("Stdout:\n```\n" + stdout.getvalue() + "```") if stderr.getvalue(): lines.append("Stderr:\n```\n" + stderr.getvalue() + "```") await ctx.client.add_reaction( ctx.room, ctx.message, "\N{WHITE HEAVY CHECK MARK}" ) await msg.edit("\n".join(lines)) except Exception: await ctx.client.add_reaction(ctx.room, ctx.message, "\N{CROSS MARK}") await msg.edit(f"Error:\n```py\n{traceback.format_exc()}```") @niobot.command("shell") @niobot.is_owner() async def shell(self, ctx: niobot.Context, command: str): """Runs a shell command in a subprocess. Does not output live.""" if command.startswith("sh\n"): command = command[3:] if command.startswith("$ "): command = command[2:] if not await self.owner_check(ctx): return msg = await ctx.respond(f"Running command: `{command}`") cmd, args = command.split(" ", 1) await self.client.add_reaction(ctx.room, ctx.message, "\N{HAMMER}") # noinspection PyBroadException try: with tempfile.TemporaryDirectory() as tmpdir: proc = await asyncio.create_subprocess_exec( cmd, *shlex.split(args), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.DEVNULL, cwd=tmpdir, ) await proc.wait() await ctx.client.add_reaction( ctx.room, ctx.message, "\N{WHITE HEAVY CHECK MARK}" ) lines = [f"Input:\n```sh\n$ {cmd} {args}\n```\n"] stdout = ( (await proc.stdout.read()) .decode() .replace("```", "`\u200b`\u200b`") ) stderr = ( (await proc.stderr.read()) .decode() .replace("```", "`\u200b`\u200b`") ) if stdout: lines.append("Stdout:\n```\n" + stdout + "```\n") if stderr: lines.append("Stderr:\n```\n" + stderr + "```\n") await msg.edit("\n".join(lines)) except Exception: await ctx.client.add_reaction(ctx.room, ctx.message, "\N{CROSS MARK}") await msg.edit(f"Error:\n```py\n{traceback.format_exc()}```") @niobot.command("thumbnail") @niobot.is_owner() async def thumbnail( self, ctx: niobot.Context, url: str, width: int = 320, height: int = 240 ): """Get the thumbnail for a URL. URL can be an mxc, in which case C2S will be used to get a thumbnail. Supplying a HTTP URL will use niobot thumbnailing.""" if url.startswith("mxc://"): parsed = urlparse(url) media_id = parsed.path.lstrip("/") thumbnail = await ctx.bot.thumbnail( parsed.netloc, media_id, width=width, height=height ) if not isinstance(thumbnail, niobot.ThumbnailResponse): return await ctx.respond("Failed to thumbnail: %r" % thumbnail) fn = thumbnail.filename or "thumbnail.webp" if isinstance(thumbnail.body, bytes): body = io.BytesIO(thumbnail.body) else: body = thumbnail.body file = await niobot.ImageAttachment.from_file(body, fn) return await ctx.respond("Downloaded from %r" % url, file=file) async with aiohttp.ClientSession( headers={"User-Agent": niobot.__user_agent__} ) as client: async with client.get(url) as response: if response.status != 200: await ctx.respond("Error: %d" % response.status) return data = await response.read() data = io.BytesIO(data) thumb = await niobot.run_blocking( niobot.ImageAttachment.thumbnailify_image, data, ) thumb.save("file.webp", "webp") attachment = await niobot.ImageAttachment.from_file( "file.webp", generate_blurhash=True ) await ctx.respond("thumbnail.webp", file=attachment) @niobot.command() @niobot.is_owner() async def runas(self, ctx: niobot.Context, user: str, command: str, *args): """Run a command as another user.""" if args is not None and args[0] in ["%null%", None] or args is None: args = tuple() if args: args = map(str, args) event = ctx.event event.sender = user event.body = ctx.invoking_prefix + command if args: event.body += " " + " ".join(args) try: result = await self.bot.process_message(ctx.room, event) await ctx.respond(f"Result of runas: `{result!r}`") except Exception as e: await ctx.respond(f"Error in runas process: `{e!r}`") @niobot.command() @niobot.is_owner() async def rooms(self, ctx: niobot.Context, operation: str, room: str = None): """room management""" match operation.casefold(): case "join": result = await self.bot.join(room) return await ctx.respond(repr(result)) case "leave": if room.startswith("#"): resolved = await self.bot.room_resolve_alias(room) if not isinstance(resolved, niobot.RoomResolveAliasResponse): return await ctx.respond( "Failed to resolve room ID for: %r", room ) room = resolved.room_id response = await self.bot.room_leave(room) return await ctx.respond(repr(response)) case _: return await ctx.respond("Unknown operation.") @niobot.command() @niobot.is_owner() async def botban(self, ctx: niobot.Context, target: str): """Bans someone from the bot.""" key = self.bot.redis_key("botban", target) if await self.bot.redis.get(key) is not None: return await ctx.respond("User/room is already banned.") await self.bot.redis.set(key, 1) return await ctx.respond("ok") @niobot.command() @niobot.is_owner() async def botbanlist(self, ctx: niobot.Context): return await ctx.respond("idk, keys are hashed lol") @niobot.command() @niobot.is_owner() async def botunban(self, ctx: niobot.Context, target: str): """Unbans someone from the bot.""" key = self.bot.redis_key("botban", target) if await self.bot.redis.get(key) is None: return await ctx.respond("User/room is not banned.") await self.bot.redis.delete(key) return await ctx.respond("ok")