nonsensebot/app/modules/evaluation.py

193 lines
7.9 KiB
Python
Raw Normal View History

2024-08-03 01:25:44 +01:00
"""
Module that provides code-evaluation commands.
This entire module is locked to NioBot.owner.
"""
import shlex
import tempfile
import textwrap
from urllib.parse import urlparse
2024-08-03 01:25:44 +01:00
import aiohttp
import niobot
import asyncio
import time
import pprint
import traceback
import io
import functools
class EvalModule(niobot.Module):
def __init__(self, bot: niobot.NioBot):
if not bot.owner_id:
raise RuntimeError("No owner ID set in niobot. Refusing to load for security reasons.")
super().__init__(bot)
async def owner_check(self, ctx: niobot.Context) -> bool:
"""Checks if the current user is the owner, and if not, gives them a very informative message."""
if not self.bot.is_owner(ctx.message.sender):
await ctx.respond(
"\N{cross mark} Only the owner of this bot can run evaluation commands. Nice try, though!"
)
return False
return True
@staticmethod
def undress_codeblock(code: str) -> str:
"""Removes any code block syntax from the given string."""
code = code.strip()
lines = code.splitlines(False)
if len(lines[0]) == 2 or lines[0] in ("py", "python", "python3"): # likely a codeblock language identifier
lines = lines[1:]
return "\n".join(lines)
@niobot.command("eval")
async def python_eval(self, ctx: niobot.Context, code: str):
"""Evaluates python code.
All code is automatically wrapped in an async function, so you can do top-level awaits.
You must return a value for it to be printed, or manually print() it.
The following special variables are available:
* `ctx` - The current context.
* `loop` - The current event loop.
* `stdout` - A StringIO object that you can write to print to stdout.
* `stderr` - A StringIO object that you can write to print to stderr.
* `_print` - The builtin print function.
* `print` - A partial of the builtin print function that prints to the stdout string IO
"""
if not await self.owner_check(ctx):
return
code = self.undress_codeblock(code)
stdout = io.StringIO()
stderr = io.StringIO()
g = {
**globals().copy(),
**locals().copy(),
"ctx": ctx,
"loop": asyncio.get_event_loop(),
"stdout": stdout,
"stderr": stderr,
"_print": print,
"print": functools.partial(print, file=stdout),
"niobot": niobot,
"pprint": pprint.pprint,
}
code = code.replace("\u00A0", " ") # 00A0 is  
code = textwrap.indent(code, " ")
code = f"async def __eval():\n{code}"
msg = await ctx.respond(f"Evaluating:\n```py\n{code}\n```")
e = await self.client.add_reaction(ctx.room, ctx.message, "\N{hammer}")
# noinspection PyBroadException
try:
start = time.time() * 1000
runner = await niobot.run_blocking(exec, code, g)
end_compile = time.time() * 1000
result = await g["__eval"]()
end_exec = time.time() * 1000
total_time = end_exec - start
time_str = "%.2fms (compile: %.2fms, exec: %.2fms)" % (
total_time,
end_compile - start,
end_exec - end_compile,
)
lines = ["Time: " + time_str + "\n"]
if result is not None:
if isinstance(result, (list, dict, tuple, set, int, float)):
result = pprint.pformat(
result,
indent=4,
width=80,
underscore_numbers=True,
)
else:
result = repr(result)
lines += ["Result:\n", "```", str(result), "```\n"]
else:
lines += ["No result.\n"]
if stdout.getvalue():
lines.append("Stdout:\n```\n" + stdout.getvalue() + "```")
if stderr.getvalue():
lines.append("Stderr:\n```\n" + stderr.getvalue() + "```")
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{white heavy check mark}")
await msg.edit("\n".join(lines))
except Exception:
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{cross mark}")
await msg.edit(f"Error:\n```py\n{traceback.format_exc()}```")
@niobot.command("shell")
async def shell(self, ctx: niobot.Context, command: str):
"""Runs a shell command in a subprocess. Does not output live."""
if command.startswith("sh\n"):
command = command[3:]
if command.startswith("$ "):
command = command[2:]
if not await self.owner_check(ctx):
return
msg = await ctx.respond(f"Running command: `{command}`")
cmd, args = command.split(" ", 1)
await self.client.add_reaction(ctx.room, ctx.message, "\N{hammer}")
# noinspection PyBroadException
try:
with tempfile.TemporaryDirectory() as tmpdir:
proc = await asyncio.create_subprocess_exec(
cmd,
*shlex.split(args),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.DEVNULL,
cwd=tmpdir,
)
await proc.wait()
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{white heavy check mark}")
lines = [f"Input:\n```sh\n$ {cmd} {args}\n```\n"]
stdout = (await proc.stdout.read()).decode().replace("```", "`\u200b`\u200b`")
stderr = (await proc.stderr.read()).decode().replace("```", "`\u200b`\u200b`")
if stdout:
lines.append("Stdout:\n```\n" + stdout + "```\n")
if stderr:
lines.append("Stderr:\n```\n" + stderr + "```\n")
await msg.edit("\n".join(lines))
except Exception:
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{cross mark}")
await msg.edit(f"Error:\n```py\n{traceback.format_exc()}```")
@niobot.command("thumbnail", hidden=True)
@niobot.is_owner()
async def thumbnail(self, ctx: niobot.Context, url: str, width: int = 320, height: int = 240):
2024-08-03 01:25:44 +01:00
"""Get the thumbnail for a URL"""
if url.startswith("mxc://"):
parsed = urlparse(url)
media_id = parsed.path.lstrip("/")
thumbnail = await ctx.bot.thumbnail(
parsed.netloc,
media_id,
width=width,
height=height
)
if not isinstance(thumbnail, niobot.ThumbnailResponse):
return await ctx.respond("Failed to thumbnail: %r" % thumbnail)
2024-09-08 01:29:44 +01:00
fn = thumbnail.filename or "thumbnail.webp"
file = niobot.ImageAttachment.from_file(thumbnail.body, fn)
return await ctx.respond("Downloaded from %r" % url, file=file)
2024-08-03 01:25:44 +01:00
async with aiohttp.ClientSession(headers={"User-Agent": niobot.__user_agent__}) as client:
async with client.get(url) as response:
if response.status != 200:
await ctx.respond("Error: %d" % response.status)
return
data = await response.read()
data = io.BytesIO(data)
thumb = await niobot.run_blocking(
niobot.ImageAttachment.thumbnailify_image,
data,
)
thumb.save("file.webp", "webp")
attachment = await niobot.ImageAttachment.from_file("file.webp", generate_blurhash=True)
self.log.info("Generated thumbnail: %r", attachment)
await ctx.respond("thumbnail.webp", file=attachment)