nonsensebot/app/modules/evaluation.py

242 lines
9.7 KiB
Python
Raw Normal View History

2024-08-03 01:25:44 +01:00
"""
Module that provides code-evaluation commands.
This entire module is locked to NioBot.owner.
"""
2024-09-08 01:37:14 +01:00
import logging
2024-08-03 01:25:44 +01:00
import shlex
import tempfile
import textwrap
from urllib.parse import urlparse
2024-08-03 01:25:44 +01:00
import aiohttp
import niobot
import asyncio
import time
import pprint
import traceback
import io
import functools
class EvalModule(niobot.Module):
def __init__(self, bot: niobot.NioBot):
if not bot.owner_id:
raise RuntimeError("No owner ID set in niobot. Refusing to load for security reasons.")
super().__init__(bot)
async def owner_check(self, ctx: niobot.Context) -> bool:
"""Checks if the current user is the owner, and if not, gives them a very informative message."""
if not self.bot.is_owner(ctx.message.sender):
await ctx.respond(
"\N{cross mark} Only the owner of this bot can run evaluation commands. Nice try, though!"
)
return False
return True
@staticmethod
def undress_codeblock(code: str) -> str:
"""Removes any code block syntax from the given string."""
code = code.strip()
lines = code.splitlines(False)
2024-09-12 01:45:38 +01:00
if len(lines[0]) == 2 or lines[0] in ("py", "python", "python3", "sh", "shell", "bash"):
# likely a codeblock language identifier
2024-08-03 01:25:44 +01:00
lines = lines[1:]
return "\n".join(lines)
@niobot.command("eval")
2024-09-12 01:45:38 +01:00
@niobot.is_owner()
2024-08-03 01:25:44 +01:00
async def python_eval(self, ctx: niobot.Context, code: str):
2024-09-12 01:45:38 +01:00
"""
Evaluates python code.
2024-08-03 01:25:44 +01:00
All code is automatically wrapped in an async function, so you can do top-level awaits.
You must return a value for it to be printed, or manually print() it.
The following special variables are available:
* `ctx` - The current context.
* `loop` - The current event loop.
* `stdout` - A StringIO object that you can write to print to stdout.
* `stderr` - A StringIO object that you can write to print to stderr.
* `_print` - The builtin print function.
* `print` - A partial of the builtin print function that prints to the stdout string IO
"""
if not await self.owner_check(ctx):
return
code = self.undress_codeblock(code)
stdout = io.StringIO()
stderr = io.StringIO()
g = {
**globals().copy(),
**locals().copy(),
2024-09-12 01:45:38 +01:00
"bot": ctx.bot,
2024-08-03 01:25:44 +01:00
"ctx": ctx,
"loop": asyncio.get_event_loop(),
"stdout": stdout,
"stderr": stderr,
"_print": print,
"print": functools.partial(print, file=stdout),
"niobot": niobot,
"pprint": pprint.pprint,
}
code = code.replace("\u00A0", " ") # 00A0 is  
code = textwrap.indent(code, " ")
code = f"async def __eval():\n{code}"
msg = await ctx.respond(f"Evaluating:\n```py\n{code}\n```")
e = await self.client.add_reaction(ctx.room, ctx.message, "\N{hammer}")
# noinspection PyBroadException
try:
start = time.time() * 1000
runner = await niobot.run_blocking(exec, code, g)
end_compile = time.time() * 1000
result = await g["__eval"]()
end_exec = time.time() * 1000
total_time = end_exec - start
time_str = "%.2fms (compile: %.2fms, exec: %.2fms)" % (
total_time,
end_compile - start,
end_exec - end_compile,
)
lines = ["Time: " + time_str + "\n"]
if result is not None:
if isinstance(result, (list, dict, tuple, set, int, float)):
result = pprint.pformat(
result,
indent=4,
width=80,
underscore_numbers=True,
)
else:
result = repr(result)
lines += ["Result:\n", "```", str(result), "```\n"]
else:
lines += ["No result.\n"]
if stdout.getvalue():
lines.append("Stdout:\n```\n" + stdout.getvalue() + "```")
if stderr.getvalue():
lines.append("Stderr:\n```\n" + stderr.getvalue() + "```")
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{white heavy check mark}")
await msg.edit("\n".join(lines))
except Exception:
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{cross mark}")
await msg.edit(f"Error:\n```py\n{traceback.format_exc()}```")
@niobot.command("shell")
2024-09-12 01:45:38 +01:00
@niobot.is_owner()
2024-08-03 01:25:44 +01:00
async def shell(self, ctx: niobot.Context, command: str):
"""Runs a shell command in a subprocess. Does not output live."""
if command.startswith("sh\n"):
command = command[3:]
if command.startswith("$ "):
command = command[2:]
if not await self.owner_check(ctx):
return
msg = await ctx.respond(f"Running command: `{command}`")
cmd, args = command.split(" ", 1)
await self.client.add_reaction(ctx.room, ctx.message, "\N{hammer}")
# noinspection PyBroadException
try:
with tempfile.TemporaryDirectory() as tmpdir:
proc = await asyncio.create_subprocess_exec(
cmd,
*shlex.split(args),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.DEVNULL,
cwd=tmpdir,
)
await proc.wait()
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{white heavy check mark}")
lines = [f"Input:\n```sh\n$ {cmd} {args}\n```\n"]
stdout = (await proc.stdout.read()).decode().replace("```", "`\u200b`\u200b`")
stderr = (await proc.stderr.read()).decode().replace("```", "`\u200b`\u200b`")
if stdout:
lines.append("Stdout:\n```\n" + stdout + "```\n")
if stderr:
lines.append("Stderr:\n```\n" + stderr + "```\n")
await msg.edit("\n".join(lines))
except Exception:
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{cross mark}")
await msg.edit(f"Error:\n```py\n{traceback.format_exc()}```")
2024-09-08 01:39:16 +01:00
@niobot.command("thumbnail")
2024-08-03 01:25:44 +01:00
@niobot.is_owner()
async def thumbnail(self, ctx: niobot.Context, url: str, width: int = 320, height: int = 240):
2024-09-08 01:39:16 +01:00
"""Get the thumbnail for a URL.
URL can be an mxc, in which case C2S will be used to get a thumbnail.
Supplying a HTTP URL will use niobot thumbnailing."""
if url.startswith("mxc://"):
parsed = urlparse(url)
media_id = parsed.path.lstrip("/")
thumbnail = await ctx.bot.thumbnail(
parsed.netloc,
media_id,
width=width,
height=height
)
if not isinstance(thumbnail, niobot.ThumbnailResponse):
return await ctx.respond("Failed to thumbnail: %r" % thumbnail)
2024-09-08 01:29:44 +01:00
fn = thumbnail.filename or "thumbnail.webp"
2024-09-08 01:32:35 +01:00
if isinstance(thumbnail.body, bytes):
body = io.BytesIO(thumbnail.body)
else:
body = thumbnail.body
file = await niobot.ImageAttachment.from_file(body, fn)
return await ctx.respond("Downloaded from %r" % url, file=file)
2024-08-03 01:25:44 +01:00
async with aiohttp.ClientSession(headers={"User-Agent": niobot.__user_agent__}) as client:
async with client.get(url) as response:
if response.status != 200:
await ctx.respond("Error: %d" % response.status)
return
data = await response.read()
data = io.BytesIO(data)
thumb = await niobot.run_blocking(
niobot.ImageAttachment.thumbnailify_image,
data,
)
thumb.save("file.webp", "webp")
attachment = await niobot.ImageAttachment.from_file("file.webp", generate_blurhash=True)
await ctx.respond("thumbnail.webp", file=attachment)
2024-09-08 02:24:11 +01:00
@niobot.command()
@niobot.is_owner()
2024-09-12 01:45:38 +01:00
async def runas(self, ctx: niobot.Context, user: str, command: str, *args):
2024-09-08 02:24:11 +01:00
"""Run a command as another user."""
2024-09-12 01:45:38 +01:00
args = args or []
2024-09-08 02:27:01 +01:00
if args and args[0] == "%null%":
args = tuple()
2024-09-08 02:24:11 +01:00
event = ctx.event
event.sender = user
2024-09-08 02:28:27 +01:00
event.body = ctx.invoking_prefix + command
2024-09-08 02:27:01 +01:00
if args:
event.body += " " + " ".join(args)
2024-09-08 02:24:11 +01:00
try:
2024-09-08 02:27:51 +01:00
result = await self.bot.process_message(ctx.room, event)
2024-09-08 02:24:11 +01:00
await ctx.respond(f"Result of runas: `{result!r}`")
except Exception as e:
await ctx.respond(f"Error in runas process: `{e!r}`")
2024-09-14 14:46:18 +01:00
@niobot.command()
@niobot.is_owner()
async def rooms(self, ctx: niobot.Context, operation: str, room: str = None):
"""room management"""
match operation.casefold():
case "join":
result = await self.bot.join(room)
return await ctx.respond(repr(result))
case "leave":
if room.startswith("#"):
resolved = await self.bot.room_resolve_alias(room)
if not isinstance(resolved, niobot.RoomResolveAliasResponse):
return await ctx.respond("Failed to resolve room ID for: %r", room)
room = resolved.room_id
response = await self.bot.room_leave(room)
return await ctx.respond(repr(response))
case _:
return await ctx.respond("Unknown operation.")