Allow admins to use even more commands

This commit is contained in:
nex 2023-01-09 14:25:44 +00:00
parent ddcaa7a880
commit ae1bed5f66
4 changed files with 71 additions and 33 deletions

View file

@ -72,12 +72,13 @@ class OtherCog(commands.Cog):
options.binary_location = "/usr/bin/firefox"
service = FirefoxService("/usr/bin/geckodriver")
driver = webdriver.Firefox(service=service, options=options)
friendly_url = textwrap.shorten(website, 100)
await ctx.edit(content="Loading website...")
await ctx.edit(content=f"Screenshotting {friendly_url}... (49%)")
await asyncio.to_thread(driver.get, website)
await ctx.edit(content=f"Waiting {render_time:,} seconds to render...")
await ctx.edit(content=f"Screenshotting {friendly_url}... (66%)")
await asyncio.sleep(render_time)
await ctx.edit(content="Taking screenshot...")
await ctx.edit(content="Screenshotting {friendly_url}... (83%)")
domain = re.sub(r"https?://", "", website)
data = await asyncio.to_thread(driver.get_screenshot_as_png)
_io = io.BytesIO()
@ -309,7 +310,7 @@ class OtherCog(commands.Cog):
async def ip(self, ctx: discord.ApplicationContext, detailed: bool = False, secure: bool = True):
"""Gets current IP"""
if not await self.bot.is_owner(ctx.user):
return await ctx.respond("Internal IP: 0.0.0.0\n" "External IP: 0.0.0.0")
return await ctx.respond("Internal IP: 0.0.0.0\nExternal IP: 0.0.0.0")
await ctx.defer(ephemeral=secure)
ips = await self.get_interface_ip_addresses()
@ -351,7 +352,7 @@ class OtherCog(commands.Cog):
ctx: discord.ApplicationContext,
url: str,
browser: discord.Option(str, description="Browser to use", choices=["chrome", "firefox"], default="chrome"),
render_timeout: int = 10,
render_timeout: int = 5,
):
"""Takes a screenshot of a URL"""
await ctx.defer()
@ -363,37 +364,61 @@ class OtherCog(commands.Cog):
url = "http://" + url
url = urlparse(url)
friendly_url = textwrap.shorten(url.geturl(), 100)
await ctx.edit(
content=f"Preparing to screenshot {textwrap.shorten(url.geturl(), 100)}... (checking local filters)"
content=f"Preparing to screenshot {friendly_url}... (0%)"
)
async def blacklist_check() -> bool:
async with aiofiles.open("domains.txt") as blacklist:
for line in await blacklist.readlines():
if not line.strip():
continue
if re.match(line.strip(), url.netloc):
return await ctx.edit(content="That domain is blacklisted.")
return False
# return await ctx.edit(content="That domain is blacklisted.")
return True
await ctx.edit(
content=f"Preparing to screenshot {textwrap.shorten(url.geturl(), 100)}... (checking DNS filters)"
)
async def dns_check() -> Optional[bool]:
try:
for response in await asyncio.to_thread(dns.resolver.resolve, url.hostname, "A"):
if response.address == "0.0.0.0":
return await ctx.edit(content="That domain is filtered.")
except dns.resolver.NXDOMAIN:
return await ctx.edit(content="That domain does not exist.")
except dns.resolver.NoAnswer:
return await ctx.edit(content="DNS resolver did not respond.")
return False
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
return
else:
return True
await ctx.edit(content=f"Preparing to screenshot {textwrap.shorten(url.geturl(), 100)}... (Filters OK)")
done, pending = await asyncio.wait(
[
asyncio.create_task(blacklist_check(), name="local"),
asyncio.create_task(dns_check(), name="dns"),
],
return_when=asyncio.FIRST_COMPLETED,
)
done = done.pop()
if done.result() is not True:
return await ctx.edit(
content="That domain is blacklisted, doesn't exist, or there was no answer from the DNS server."
)
await ctx.edit(content=f"Preparing to screenshot {friendly_url}... (16%)")
okay = await pending.pop()
if okay is not True:
return await ctx.edit(
content="That domain is blacklisted, doesn't exist, or there was no answer from the DNS server."
)
await ctx.edit(content=f"Screenshotting {textwrap.shorten(url.geturl(), 100)}... (33%)")
try:
screenshot = await self.screenshot_website(ctx, url.geturl(), browser, render_timeout)
except Exception as e:
console.print_exception()
return await ctx.edit(content=f"Error: {e}")
else:
await ctx.edit(content=f"Screenshotting {friendly_url}... (99%)")
await asyncio.sleep(0.5)
await ctx.edit(content="Here's your screenshot!", file=screenshot)
domains = discord.SlashCommandGroup("domains", "Commands for managing domains")

View file

@ -1,7 +1,7 @@
import discord
import orm
from discord.ext import commands
from utils import VerifyCode, Student, VerifyView, get_or_none, console
from utils import VerifyCode, Student, VerifyView, get_or_none, console, owner_or_admin
class VerifyCog(commands.Cog):
@ -38,11 +38,12 @@ class VerifyCog(commands.Cog):
@commands.command(name="de-verify")
@commands.guild_only()
@owner_or_admin()
async def verification_del(self, ctx: commands.Context, *, user: discord.Member):
"""Removes a user's verification status"""
if not await self.bot.is_owner(ctx.author):
if not ctx.author.guild_permissions.administrator:
return await ctx.reply(":x: Permission denied.")
# if not await self.bot.is_owner(ctx.author):
# if not ctx.author.guild_permissions.administrator:
# return await ctx.reply(":x: Permission denied.")
await ctx.trigger_typing()
for code in await VerifyCode.objects.all(bind=user.id):
await code.delete()
@ -59,11 +60,9 @@ class VerifyCog(commands.Cog):
@commands.command(name="verify")
@commands.guild_only()
@owner_or_admin()
async def verification_force(self, ctx: commands.Context, user: discord.Member, _id: str, name: str):
"""Manually verifies someone"""
if not await self.bot.is_owner(ctx.author):
if not ctx.author.guild_permissions.administrator:
return await ctx.reply(":x: Permission denied.")
existing = await Student.objects.create(id=_id, user_id=user.id, name=name)
role = discord.utils.find(lambda r: r.name.lower() == "verified", ctx.guild.roles)
if role and role < ctx.guild.me.top_role:
@ -96,7 +95,7 @@ class VerifyCog(commands.Cog):
)
@commands.command(name="rebind")
@commands.is_owner()
@owner_or_admin()
async def rebind_code(self, ctx: commands.Context, b_number: str, *, user: discord.Member):
# noinspection GrazieInspection
"""Changes which account a B number is bound to"""

View file

@ -30,7 +30,11 @@ extensions = [
"cogs.starboard",
]
for ext in extensions:
try:
bot.load_extension(ext)
except discord.ExtensionFailed as e:
console.log(f"[red]Failed to load extension {ext}: {e}")
else:
console.log(f"Loaded extension [green]{ext}")
bot.loop.run_until_complete(registry.create_all())

View file

@ -1,6 +1,8 @@
from typing import Optional
from urllib.parse import urlparse
from discord.ext import commands
from ._email import *
from .db import *
from .console import *
@ -46,3 +48,11 @@ def hyperlink(url: str, *, text: str = None, max_length: int = None) -> str:
if len(fmt) > max_length:
return url
return fmt
def owner_or_admin():
async def predicate(ctx: commands.Context):
if ctx.author.guild_permissions.administrator or await ctx.bot.is_owner(ctx.author):
return True
raise commands.MissingPermissions(["administrator"])
return commands.check(predicate)