nonsensebot/app/modules/evaluation.py

296 lines
11 KiB
Python
Raw Normal View History

2024-08-03 01:25:44 +01:00
"""
Module that provides code-evaluation commands.
This entire module is locked to NioBot.owner.
"""
2024-09-15 23:03:24 +01:00
2024-09-08 01:37:14 +01:00
import asyncio
import functools
import io
import logging
import os
import pprint
import shlex
import tempfile
import textwrap
import time
import traceback
from urllib.parse import urlparse

import aiohttp
import niobot


class EvalModule(niobot.Module):
def __init__(self, bot: niobot.NioBot):
if not bot.owner_id:
2024-09-15 23:03:24 +01:00
raise RuntimeError(
"No owner ID set in niobot. Refusing to load for security reasons."
)
2024-08-03 01:25:44 +01:00
super().__init__(bot)
async def owner_check(self, ctx: niobot.Context) -> bool:
"""Checks if the current user is the owner, and if not, gives them a very informative message."""
if not self.bot.is_owner(ctx.message.sender):
await ctx.respond(
2024-09-15 23:03:24 +01:00
"\N{CROSS MARK} Only the owner of this bot can run evaluation commands. Nice try, though!"
2024-08-03 01:25:44 +01:00
)
return False
return True
@staticmethod
def undress_codeblock(code: str) -> str:
"""Removes any code block syntax from the given string."""
code = code.strip()
lines = code.splitlines(False)
2024-09-15 23:03:24 +01:00
if len(lines[0]) == 2 or lines[0] in (
"py",
"python",
"python3",
"sh",
"shell",
"bash",
):
2024-09-12 01:45:38 +01:00
# likely a codeblock language identifier
2024-08-03 01:25:44 +01:00
lines = lines[1:]
return "\n".join(lines)
@niobot.command("eval")
2024-09-12 01:45:38 +01:00
@niobot.is_owner()
2024-08-03 01:25:44 +01:00
async def python_eval(self, ctx: niobot.Context, code: str):
2024-09-12 01:45:38 +01:00
"""
Evaluates python code.
2024-08-03 01:25:44 +01:00
All code is automatically wrapped in an async function, so you can do top-level awaits.
You must return a value for it to be printed, or manually print() it.
The following special variables are available:
* `ctx` - The current context.
* `loop` - The current event loop.
* `stdout` - A StringIO object that you can write to print to stdout.
* `stderr` - A StringIO object that you can write to print to stderr.
* `_print` - The builtin print function.
* `print` - A partial of the builtin print function that prints to the stdout string IO
"""
if not await self.owner_check(ctx):
return
code = self.undress_codeblock(code)
stdout = io.StringIO()
stderr = io.StringIO()
g = {
**globals().copy(),
**locals().copy(),
2024-09-12 01:45:38 +01:00
"bot": ctx.bot,
2024-08-03 01:25:44 +01:00
"ctx": ctx,
"loop": asyncio.get_event_loop(),
"stdout": stdout,
"stderr": stderr,
"_print": print,
"print": functools.partial(print, file=stdout),
"niobot": niobot,
"pprint": pprint.pprint,
}
2024-09-15 23:03:24 +01:00
code = code.replace("\u00a0", " ") # 00A0 is  
2024-08-03 01:25:44 +01:00
code = textwrap.indent(code, " ")
code = f"async def __eval():\n{code}"
msg = await ctx.respond(f"Evaluating:\n```py\n{code}\n```")
2024-09-15 23:03:24 +01:00
e = await self.client.add_reaction(ctx.room, ctx.message, "\N{HAMMER}")
2024-08-03 01:25:44 +01:00
# noinspection PyBroadException
try:
start = time.time() * 1000
runner = await niobot.run_blocking(exec, code, g)
end_compile = time.time() * 1000
result = await g["__eval"]()
end_exec = time.time() * 1000
total_time = end_exec - start
time_str = "%.2fms (compile: %.2fms, exec: %.2fms)" % (
total_time,
end_compile - start,
end_exec - end_compile,
)
lines = ["Time: " + time_str + "\n"]
if result is not None:
if isinstance(result, (list, dict, tuple, set, int, float)):
result = pprint.pformat(
result,
indent=4,
width=80,
underscore_numbers=True,
)
else:
result = repr(result)
lines += ["Result:\n", "```", str(result), "```\n"]
else:
lines += ["No result.\n"]
if stdout.getvalue():
lines.append("Stdout:\n```\n" + stdout.getvalue() + "```")
if stderr.getvalue():
lines.append("Stderr:\n```\n" + stderr.getvalue() + "```")
2024-09-15 23:03:24 +01:00
await ctx.client.add_reaction(
ctx.room, ctx.message, "\N{WHITE HEAVY CHECK MARK}"
)
2024-08-03 01:25:44 +01:00
await msg.edit("\n".join(lines))
except Exception:
2024-09-15 23:03:24 +01:00
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{CROSS MARK}")
2024-08-03 01:25:44 +01:00
await msg.edit(f"Error:\n```py\n{traceback.format_exc()}```")
@niobot.command("shell")
2024-09-12 01:45:38 +01:00
@niobot.is_owner()
2024-08-03 01:25:44 +01:00
async def shell(self, ctx: niobot.Context, command: str):
"""Runs a shell command in a subprocess. Does not output live."""
if command.startswith("sh\n"):
command = command[3:]
if command.startswith("$ "):
command = command[2:]
if not await self.owner_check(ctx):
return
msg = await ctx.respond(f"Running command: `{command}`")
cmd, args = command.split(" ", 1)
2024-09-15 23:03:24 +01:00
await self.client.add_reaction(ctx.room, ctx.message, "\N{HAMMER}")
2024-08-03 01:25:44 +01:00
# noinspection PyBroadException
try:
with tempfile.TemporaryDirectory() as tmpdir:
proc = await asyncio.create_subprocess_exec(
cmd,
*shlex.split(args),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.DEVNULL,
cwd=tmpdir,
)
await proc.wait()
2024-09-15 23:03:24 +01:00
await ctx.client.add_reaction(
ctx.room, ctx.message, "\N{WHITE HEAVY CHECK MARK}"
)
2024-08-03 01:25:44 +01:00
lines = [f"Input:\n```sh\n$ {cmd} {args}\n```\n"]
2024-09-15 23:03:24 +01:00
stdout = (
(await proc.stdout.read())
.decode()
.replace("```", "`\u200b`\u200b`")
)
stderr = (
(await proc.stderr.read())
.decode()
.replace("```", "`\u200b`\u200b`")
)
2024-08-03 01:25:44 +01:00
if stdout:
lines.append("Stdout:\n```\n" + stdout + "```\n")
if stderr:
lines.append("Stderr:\n```\n" + stderr + "```\n")
await msg.edit("\n".join(lines))
except Exception:
2024-09-15 23:03:24 +01:00
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{CROSS MARK}")
2024-08-03 01:25:44 +01:00
await msg.edit(f"Error:\n```py\n{traceback.format_exc()}```")
2024-09-08 01:39:16 +01:00
@niobot.command("thumbnail")
2024-08-03 01:25:44 +01:00
@niobot.is_owner()
2024-09-15 23:03:24 +01:00
async def thumbnail(
self, ctx: niobot.Context, url: str, width: int = 320, height: int = 240
):
2024-09-08 01:39:16 +01:00
"""Get the thumbnail for a URL.
URL can be an mxc, in which case C2S will be used to get a thumbnail.
Supplying a HTTP URL will use niobot thumbnailing."""
if url.startswith("mxc://"):
parsed = urlparse(url)
media_id = parsed.path.lstrip("/")
thumbnail = await ctx.bot.thumbnail(
2024-09-15 23:03:24 +01:00
parsed.netloc, media_id, width=width, height=height
)
if not isinstance(thumbnail, niobot.ThumbnailResponse):
return await ctx.respond("Failed to thumbnail: %r" % thumbnail)
2024-09-08 01:29:44 +01:00
fn = thumbnail.filename or "thumbnail.webp"
2024-09-08 01:32:35 +01:00
if isinstance(thumbnail.body, bytes):
body = io.BytesIO(thumbnail.body)
else:
body = thumbnail.body
file = await niobot.ImageAttachment.from_file(body, fn)
return await ctx.respond("Downloaded from %r" % url, file=file)
2024-09-15 23:03:24 +01:00
async with aiohttp.ClientSession(
headers={"User-Agent": niobot.__user_agent__}
) as client:
2024-08-03 01:25:44 +01:00
async with client.get(url) as response:
if response.status != 200:
await ctx.respond("Error: %d" % response.status)
return
data = await response.read()
data = io.BytesIO(data)
thumb = await niobot.run_blocking(
niobot.ImageAttachment.thumbnailify_image,
data,
)
thumb.save("file.webp", "webp")
2024-09-15 23:03:24 +01:00
attachment = await niobot.ImageAttachment.from_file(
"file.webp", generate_blurhash=True
)
2024-08-03 01:25:44 +01:00
await ctx.respond("thumbnail.webp", file=attachment)
2024-09-08 02:24:11 +01:00
@niobot.command()
@niobot.is_owner()
2024-09-12 01:45:38 +01:00
async def runas(self, ctx: niobot.Context, user: str, command: str, *args):
2024-09-08 02:24:11 +01:00
"""Run a command as another user."""
2024-09-18 21:22:22 +01:00
if args is not None and args[0] in ["%null%", None] or args is None:
2024-09-08 02:27:01 +01:00
args = tuple()
2024-09-18 21:22:22 +01:00
2024-09-18 21:21:57 +01:00
if args:
args = map(str, args)
2024-09-08 02:24:11 +01:00
event = ctx.event
event.sender = user
2024-09-08 02:28:27 +01:00
event.body = ctx.invoking_prefix + command
2024-09-08 02:27:01 +01:00
if args:
event.body += " " + " ".join(args)
2024-09-08 02:24:11 +01:00
try:
2024-09-08 02:27:51 +01:00
result = await self.bot.process_message(ctx.room, event)
2024-09-08 02:24:11 +01:00
await ctx.respond(f"Result of runas: `{result!r}`")
except Exception as e:
await ctx.respond(f"Error in runas process: `{e!r}`")
2024-09-14 14:46:18 +01:00
@niobot.command()
@niobot.is_owner()
async def rooms(self, ctx: niobot.Context, operation: str, room: str = None):
"""room management"""
match operation.casefold():
case "join":
result = await self.bot.join(room)
return await ctx.respond(repr(result))
case "leave":
if room.startswith("#"):
resolved = await self.bot.room_resolve_alias(room)
if not isinstance(resolved, niobot.RoomResolveAliasResponse):
2024-09-15 23:03:24 +01:00
return await ctx.respond(
"Failed to resolve room ID for: %r", room
)
2024-09-14 14:46:18 +01:00
room = resolved.room_id
response = await self.bot.room_leave(room)
return await ctx.respond(repr(response))
case _:
return await ctx.respond("Unknown operation.")
2024-09-18 21:03:25 +01:00
@niobot.command()
@niobot.is_owner()
async def botban(self, ctx: niobot.Context, target: str):
"""Bans someone from the bot."""
key = self.bot.redis_key("botban", target)
if await self.bot.redis.get(key) is not None:
return await ctx.respond("User/room is already banned.")
await self.bot.redis.set(key, 1)
return await ctx.respond("ok")
@niobot.command()
@niobot.is_owner()
async def botbanlist(self, ctx: niobot.Context):
return await ctx.respond("idk, keys are hashed lol")
@niobot.command()
@niobot.is_owner()
async def botunban(self, ctx: niobot.Context, target: str):
"""Unbans someone from the bot."""
key = self.bot.redis_key("botban", target)
if await self.bot.redis.get(key) is None:
return await ctx.respond("User/room is not banned.")
await self.bot.redis.delete(key)
return await ctx.respond("ok")