nonsensebot/app/modules/evaluation.py
2024-09-18 21:22:22 +01:00

295 lines
11 KiB
Python

"""
Module that provides code-evaluation commands.
This entire module is locked to NioBot.owner.
"""
import logging
import shlex
import tempfile
import textwrap
from urllib.parse import urlparse
import aiohttp
import niobot
import asyncio
import time
import pprint
import traceback
import io
import functools
class EvalModule(niobot.Module):
def __init__(self, bot: niobot.NioBot):
if not bot.owner_id:
raise RuntimeError(
"No owner ID set in niobot. Refusing to load for security reasons."
)
super().__init__(bot)
async def owner_check(self, ctx: niobot.Context) -> bool:
"""Checks if the current user is the owner, and if not, gives them a very informative message."""
if not self.bot.is_owner(ctx.message.sender):
await ctx.respond(
"\N{CROSS MARK} Only the owner of this bot can run evaluation commands. Nice try, though!"
)
return False
return True
@staticmethod
def undress_codeblock(code: str) -> str:
"""Removes any code block syntax from the given string."""
code = code.strip()
lines = code.splitlines(False)
if len(lines[0]) == 2 or lines[0] in (
"py",
"python",
"python3",
"sh",
"shell",
"bash",
):
# likely a codeblock language identifier
lines = lines[1:]
return "\n".join(lines)
@niobot.command("eval")
@niobot.is_owner()
async def python_eval(self, ctx: niobot.Context, code: str):
"""
Evaluates python code.
All code is automatically wrapped in an async function, so you can do top-level awaits.
You must return a value for it to be printed, or manually print() it.
The following special variables are available:
* `ctx` - The current context.
* `loop` - The current event loop.
* `stdout` - A StringIO object that you can write to print to stdout.
* `stderr` - A StringIO object that you can write to print to stderr.
* `_print` - The builtin print function.
* `print` - A partial of the builtin print function that prints to the stdout string IO
"""
if not await self.owner_check(ctx):
return
code = self.undress_codeblock(code)
stdout = io.StringIO()
stderr = io.StringIO()
g = {
**globals().copy(),
**locals().copy(),
"bot": ctx.bot,
"ctx": ctx,
"loop": asyncio.get_event_loop(),
"stdout": stdout,
"stderr": stderr,
"_print": print,
"print": functools.partial(print, file=stdout),
"niobot": niobot,
"pprint": pprint.pprint,
}
code = code.replace("\u00a0", " ") # 00A0 is  
code = textwrap.indent(code, " ")
code = f"async def __eval():\n{code}"
msg = await ctx.respond(f"Evaluating:\n```py\n{code}\n```")
e = await self.client.add_reaction(ctx.room, ctx.message, "\N{HAMMER}")
# noinspection PyBroadException
try:
start = time.time() * 1000
runner = await niobot.run_blocking(exec, code, g)
end_compile = time.time() * 1000
result = await g["__eval"]()
end_exec = time.time() * 1000
total_time = end_exec - start
time_str = "%.2fms (compile: %.2fms, exec: %.2fms)" % (
total_time,
end_compile - start,
end_exec - end_compile,
)
lines = ["Time: " + time_str + "\n"]
if result is not None:
if isinstance(result, (list, dict, tuple, set, int, float)):
result = pprint.pformat(
result,
indent=4,
width=80,
underscore_numbers=True,
)
else:
result = repr(result)
lines += ["Result:\n", "```", str(result), "```\n"]
else:
lines += ["No result.\n"]
if stdout.getvalue():
lines.append("Stdout:\n```\n" + stdout.getvalue() + "```")
if stderr.getvalue():
lines.append("Stderr:\n```\n" + stderr.getvalue() + "```")
await ctx.client.add_reaction(
ctx.room, ctx.message, "\N{WHITE HEAVY CHECK MARK}"
)
await msg.edit("\n".join(lines))
except Exception:
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{CROSS MARK}")
await msg.edit(f"Error:\n```py\n{traceback.format_exc()}```")
@niobot.command("shell")
@niobot.is_owner()
async def shell(self, ctx: niobot.Context, command: str):
"""Runs a shell command in a subprocess. Does not output live."""
if command.startswith("sh\n"):
command = command[3:]
if command.startswith("$ "):
command = command[2:]
if not await self.owner_check(ctx):
return
msg = await ctx.respond(f"Running command: `{command}`")
cmd, args = command.split(" ", 1)
await self.client.add_reaction(ctx.room, ctx.message, "\N{HAMMER}")
# noinspection PyBroadException
try:
with tempfile.TemporaryDirectory() as tmpdir:
proc = await asyncio.create_subprocess_exec(
cmd,
*shlex.split(args),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.DEVNULL,
cwd=tmpdir,
)
await proc.wait()
await ctx.client.add_reaction(
ctx.room, ctx.message, "\N{WHITE HEAVY CHECK MARK}"
)
lines = [f"Input:\n```sh\n$ {cmd} {args}\n```\n"]
stdout = (
(await proc.stdout.read())
.decode()
.replace("```", "`\u200b`\u200b`")
)
stderr = (
(await proc.stderr.read())
.decode()
.replace("```", "`\u200b`\u200b`")
)
if stdout:
lines.append("Stdout:\n```\n" + stdout + "```\n")
if stderr:
lines.append("Stderr:\n```\n" + stderr + "```\n")
await msg.edit("\n".join(lines))
except Exception:
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{CROSS MARK}")
await msg.edit(f"Error:\n```py\n{traceback.format_exc()}```")
@niobot.command("thumbnail")
@niobot.is_owner()
async def thumbnail(
self, ctx: niobot.Context, url: str, width: int = 320, height: int = 240
):
"""Get the thumbnail for a URL.
URL can be an mxc, in which case C2S will be used to get a thumbnail.
Supplying a HTTP URL will use niobot thumbnailing."""
if url.startswith("mxc://"):
parsed = urlparse(url)
media_id = parsed.path.lstrip("/")
thumbnail = await ctx.bot.thumbnail(
parsed.netloc, media_id, width=width, height=height
)
if not isinstance(thumbnail, niobot.ThumbnailResponse):
return await ctx.respond("Failed to thumbnail: %r" % thumbnail)
fn = thumbnail.filename or "thumbnail.webp"
if isinstance(thumbnail.body, bytes):
body = io.BytesIO(thumbnail.body)
else:
body = thumbnail.body
file = await niobot.ImageAttachment.from_file(body, fn)
return await ctx.respond("Downloaded from %r" % url, file=file)
async with aiohttp.ClientSession(
headers={"User-Agent": niobot.__user_agent__}
) as client:
async with client.get(url) as response:
if response.status != 200:
await ctx.respond("Error: %d" % response.status)
return
data = await response.read()
data = io.BytesIO(data)
thumb = await niobot.run_blocking(
niobot.ImageAttachment.thumbnailify_image,
data,
)
thumb.save("file.webp", "webp")
attachment = await niobot.ImageAttachment.from_file(
"file.webp", generate_blurhash=True
)
await ctx.respond("thumbnail.webp", file=attachment)
@niobot.command()
@niobot.is_owner()
async def runas(self, ctx: niobot.Context, user: str, command: str, *args):
"""Run a command as another user."""
if args is not None and args[0] in ["%null%", None] or args is None:
args = tuple()
if args:
args = map(str, args)
event = ctx.event
event.sender = user
event.body = ctx.invoking_prefix + command
if args:
event.body += " " + " ".join(args)
try:
result = await self.bot.process_message(ctx.room, event)
await ctx.respond(f"Result of runas: `{result!r}`")
except Exception as e:
await ctx.respond(f"Error in runas process: `{e!r}`")
@niobot.command()
@niobot.is_owner()
async def rooms(self, ctx: niobot.Context, operation: str, room: str = None):
"""room management"""
match operation.casefold():
case "join":
result = await self.bot.join(room)
return await ctx.respond(repr(result))
case "leave":
if room.startswith("#"):
resolved = await self.bot.room_resolve_alias(room)
if not isinstance(resolved, niobot.RoomResolveAliasResponse):
return await ctx.respond(
"Failed to resolve room ID for: %r", room
)
room = resolved.room_id
response = await self.bot.room_leave(room)
return await ctx.respond(repr(response))
case _:
return await ctx.respond("Unknown operation.")
@niobot.command()
@niobot.is_owner()
async def botban(self, ctx: niobot.Context, target: str):
"""Bans someone from the bot."""
key = self.bot.redis_key("botban", target)
if await self.bot.redis.get(key) is not None:
return await ctx.respond("User/room is already banned.")
await self.bot.redis.set(key, 1)
return await ctx.respond("ok")
@niobot.command()
@niobot.is_owner()
async def botbanlist(self, ctx: niobot.Context):
return await ctx.respond("idk, keys are hashed lol")
@niobot.command()
@niobot.is_owner()
async def botunban(self, ctx: niobot.Context, target: str):
"""Unbans someone from the bot."""
key = self.bot.redis_key("botban", target)
if await self.bot.redis.get(key) is None:
return await ctx.respond("User/room is not banned.")
await self.bot.redis.delete(key)
return await ctx.respond("ok")