295 lines
11 KiB
Python
295 lines
11 KiB
Python
"""
|
|
Module that provides code-evaluation commands.
|
|
This entire module is locked to NioBot.owner.
|
|
"""
|
|
|
|
import logging
|
|
import shlex
|
|
import tempfile
|
|
import textwrap
|
|
from urllib.parse import urlparse
|
|
|
|
import aiohttp
|
|
import niobot
|
|
import asyncio
|
|
import time
|
|
import pprint
|
|
import traceback
|
|
import io
|
|
import functools
|
|
|
|
|
|
class EvalModule(niobot.Module):
|
|
def __init__(self, bot: niobot.NioBot):
|
|
if not bot.owner_id:
|
|
raise RuntimeError(
|
|
"No owner ID set in niobot. Refusing to load for security reasons."
|
|
)
|
|
super().__init__(bot)
|
|
|
|
async def owner_check(self, ctx: niobot.Context) -> bool:
|
|
"""Checks if the current user is the owner, and if not, gives them a very informative message."""
|
|
if not self.bot.is_owner(ctx.message.sender):
|
|
await ctx.respond(
|
|
"\N{CROSS MARK} Only the owner of this bot can run evaluation commands. Nice try, though!"
|
|
)
|
|
return False
|
|
return True
|
|
|
|
@staticmethod
|
|
def undress_codeblock(code: str) -> str:
|
|
"""Removes any code block syntax from the given string."""
|
|
code = code.strip()
|
|
lines = code.splitlines(False)
|
|
if len(lines[0]) == 2 or lines[0] in (
|
|
"py",
|
|
"python",
|
|
"python3",
|
|
"sh",
|
|
"shell",
|
|
"bash",
|
|
):
|
|
# likely a codeblock language identifier
|
|
lines = lines[1:]
|
|
return "\n".join(lines)
|
|
|
|
@niobot.command("eval")
|
|
@niobot.is_owner()
|
|
async def python_eval(self, ctx: niobot.Context, code: str):
|
|
"""
|
|
Evaluates python code.
|
|
|
|
All code is automatically wrapped in an async function, so you can do top-level awaits.
|
|
You must return a value for it to be printed, or manually print() it.
|
|
|
|
The following special variables are available:
|
|
|
|
* `ctx` - The current context.
|
|
* `loop` - The current event loop.
|
|
* `stdout` - A StringIO object that you can write to print to stdout.
|
|
* `stderr` - A StringIO object that you can write to print to stderr.
|
|
* `_print` - The builtin print function.
|
|
* `print` - A partial of the builtin print function that prints to the stdout string IO
|
|
"""
|
|
if not await self.owner_check(ctx):
|
|
return
|
|
code = self.undress_codeblock(code)
|
|
stdout = io.StringIO()
|
|
stderr = io.StringIO()
|
|
|
|
g = {
|
|
**globals().copy(),
|
|
**locals().copy(),
|
|
"bot": ctx.bot,
|
|
"ctx": ctx,
|
|
"loop": asyncio.get_event_loop(),
|
|
"stdout": stdout,
|
|
"stderr": stderr,
|
|
"_print": print,
|
|
"print": functools.partial(print, file=stdout),
|
|
"niobot": niobot,
|
|
"pprint": pprint.pprint,
|
|
}
|
|
code = code.replace("\u00a0", " ") # 00A0 is
|
|
code = textwrap.indent(code, " ")
|
|
code = f"async def __eval():\n{code}"
|
|
msg = await ctx.respond(f"Evaluating:\n```py\n{code}\n```")
|
|
e = await self.client.add_reaction(ctx.room, ctx.message, "\N{HAMMER}")
|
|
# noinspection PyBroadException
|
|
try:
|
|
start = time.time() * 1000
|
|
runner = await niobot.run_blocking(exec, code, g)
|
|
end_compile = time.time() * 1000
|
|
result = await g["__eval"]()
|
|
end_exec = time.time() * 1000
|
|
total_time = end_exec - start
|
|
time_str = "%.2fms (compile: %.2fms, exec: %.2fms)" % (
|
|
total_time,
|
|
end_compile - start,
|
|
end_exec - end_compile,
|
|
)
|
|
lines = ["Time: " + time_str + "\n"]
|
|
if result is not None:
|
|
if isinstance(result, (list, dict, tuple, set, int, float)):
|
|
result = pprint.pformat(
|
|
result,
|
|
indent=4,
|
|
width=80,
|
|
underscore_numbers=True,
|
|
)
|
|
else:
|
|
result = repr(result)
|
|
lines += ["Result:\n", "```", str(result), "```\n"]
|
|
else:
|
|
lines += ["No result.\n"]
|
|
if stdout.getvalue():
|
|
lines.append("Stdout:\n```\n" + stdout.getvalue() + "```")
|
|
if stderr.getvalue():
|
|
lines.append("Stderr:\n```\n" + stderr.getvalue() + "```")
|
|
await ctx.client.add_reaction(
|
|
ctx.room, ctx.message, "\N{WHITE HEAVY CHECK MARK}"
|
|
)
|
|
await msg.edit("\n".join(lines))
|
|
except Exception:
|
|
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{CROSS MARK}")
|
|
await msg.edit(f"Error:\n```py\n{traceback.format_exc()}```")
|
|
|
|
@niobot.command("shell")
|
|
@niobot.is_owner()
|
|
async def shell(self, ctx: niobot.Context, command: str):
|
|
"""Runs a shell command in a subprocess. Does not output live."""
|
|
if command.startswith("sh\n"):
|
|
command = command[3:]
|
|
if command.startswith("$ "):
|
|
command = command[2:]
|
|
|
|
if not await self.owner_check(ctx):
|
|
return
|
|
|
|
msg = await ctx.respond(f"Running command: `{command}`")
|
|
cmd, args = command.split(" ", 1)
|
|
await self.client.add_reaction(ctx.room, ctx.message, "\N{HAMMER}")
|
|
# noinspection PyBroadException
|
|
try:
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
proc = await asyncio.create_subprocess_exec(
|
|
cmd,
|
|
*shlex.split(args),
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
stdin=asyncio.subprocess.DEVNULL,
|
|
cwd=tmpdir,
|
|
)
|
|
await proc.wait()
|
|
await ctx.client.add_reaction(
|
|
ctx.room, ctx.message, "\N{WHITE HEAVY CHECK MARK}"
|
|
)
|
|
lines = [f"Input:\n```sh\n$ {cmd} {args}\n```\n"]
|
|
stdout = (
|
|
(await proc.stdout.read())
|
|
.decode()
|
|
.replace("```", "`\u200b`\u200b`")
|
|
)
|
|
stderr = (
|
|
(await proc.stderr.read())
|
|
.decode()
|
|
.replace("```", "`\u200b`\u200b`")
|
|
)
|
|
if stdout:
|
|
lines.append("Stdout:\n```\n" + stdout + "```\n")
|
|
if stderr:
|
|
lines.append("Stderr:\n```\n" + stderr + "```\n")
|
|
await msg.edit("\n".join(lines))
|
|
except Exception:
|
|
await ctx.client.add_reaction(ctx.room, ctx.message, "\N{CROSS MARK}")
|
|
await msg.edit(f"Error:\n```py\n{traceback.format_exc()}```")
|
|
|
|
@niobot.command("thumbnail")
|
|
@niobot.is_owner()
|
|
async def thumbnail(
|
|
self, ctx: niobot.Context, url: str, width: int = 320, height: int = 240
|
|
):
|
|
"""Get the thumbnail for a URL.
|
|
|
|
URL can be an mxc, in which case C2S will be used to get a thumbnail.
|
|
Supplying a HTTP URL will use niobot thumbnailing."""
|
|
if url.startswith("mxc://"):
|
|
parsed = urlparse(url)
|
|
media_id = parsed.path.lstrip("/")
|
|
thumbnail = await ctx.bot.thumbnail(
|
|
parsed.netloc, media_id, width=width, height=height
|
|
)
|
|
if not isinstance(thumbnail, niobot.ThumbnailResponse):
|
|
return await ctx.respond("Failed to thumbnail: %r" % thumbnail)
|
|
fn = thumbnail.filename or "thumbnail.webp"
|
|
if isinstance(thumbnail.body, bytes):
|
|
body = io.BytesIO(thumbnail.body)
|
|
else:
|
|
body = thumbnail.body
|
|
file = await niobot.ImageAttachment.from_file(body, fn)
|
|
return await ctx.respond("Downloaded from %r" % url, file=file)
|
|
|
|
async with aiohttp.ClientSession(
|
|
headers={"User-Agent": niobot.__user_agent__}
|
|
) as client:
|
|
async with client.get(url) as response:
|
|
if response.status != 200:
|
|
await ctx.respond("Error: %d" % response.status)
|
|
return
|
|
data = await response.read()
|
|
data = io.BytesIO(data)
|
|
thumb = await niobot.run_blocking(
|
|
niobot.ImageAttachment.thumbnailify_image,
|
|
data,
|
|
)
|
|
thumb.save("file.webp", "webp")
|
|
attachment = await niobot.ImageAttachment.from_file(
|
|
"file.webp", generate_blurhash=True
|
|
)
|
|
await ctx.respond("thumbnail.webp", file=attachment)
|
|
|
|
@niobot.command()
|
|
@niobot.is_owner()
|
|
async def runas(self, ctx: niobot.Context, user: str, command: str, *args):
|
|
"""Run a command as another user."""
|
|
if args is not None and args[0] in ["%null%", None] or args is None:
|
|
args = tuple()
|
|
|
|
if args:
|
|
args = map(str, args)
|
|
event = ctx.event
|
|
event.sender = user
|
|
event.body = ctx.invoking_prefix + command
|
|
if args:
|
|
event.body += " " + " ".join(args)
|
|
try:
|
|
result = await self.bot.process_message(ctx.room, event)
|
|
await ctx.respond(f"Result of runas: `{result!r}`")
|
|
except Exception as e:
|
|
await ctx.respond(f"Error in runas process: `{e!r}`")
|
|
|
|
@niobot.command()
|
|
@niobot.is_owner()
|
|
async def rooms(self, ctx: niobot.Context, operation: str, room: str = None):
|
|
"""room management"""
|
|
match operation.casefold():
|
|
case "join":
|
|
result = await self.bot.join(room)
|
|
return await ctx.respond(repr(result))
|
|
case "leave":
|
|
if room.startswith("#"):
|
|
resolved = await self.bot.room_resolve_alias(room)
|
|
if not isinstance(resolved, niobot.RoomResolveAliasResponse):
|
|
return await ctx.respond(
|
|
"Failed to resolve room ID for: %r", room
|
|
)
|
|
room = resolved.room_id
|
|
response = await self.bot.room_leave(room)
|
|
return await ctx.respond(repr(response))
|
|
case _:
|
|
return await ctx.respond("Unknown operation.")
|
|
|
|
@niobot.command()
|
|
@niobot.is_owner()
|
|
async def botban(self, ctx: niobot.Context, target: str):
|
|
"""Bans someone from the bot."""
|
|
key = self.bot.redis_key("botban", target)
|
|
if await self.bot.redis.get(key) is not None:
|
|
return await ctx.respond("User/room is already banned.")
|
|
await self.bot.redis.set(key, 1)
|
|
return await ctx.respond("ok")
|
|
|
|
@niobot.command()
|
|
@niobot.is_owner()
|
|
async def botbanlist(self, ctx: niobot.Context):
|
|
return await ctx.respond("idk, keys are hashed lol")
|
|
|
|
@niobot.command()
|
|
@niobot.is_owner()
|
|
async def botunban(self, ctx: niobot.Context, target: str):
|
|
"""Unbans someone from the bot."""
|
|
key = self.bot.redis_key("botban", target)
|
|
if await self.bot.redis.get(key) is None:
|
|
return await ctx.respond("User/room is not banned.")
|
|
await self.bot.redis.delete(key)
|
|
return await ctx.respond("ok")
|