college-bot-v1/cogs/other.py

483 lines
20 KiB
Python
Raw Normal View History

2022-12-01 13:34:26 +00:00
import asyncio
2022-12-01 12:22:11 +00:00
import io
import os
2023-01-03 13:56:23 +00:00
import random
import re
2023-01-03 15:17:09 +00:00
import textwrap
2023-01-03 15:12:09 +00:00
import dns.resolver
2023-01-03 15:17:09 +00:00
import aiofiles
2023-01-03 13:56:23 +00:00
from time import sleep as time_sleep
from typing import Literal
2022-12-29 17:41:41 +00:00
from typing import Tuple, Optional, Dict
2023-01-03 13:56:23 +00:00
from pathlib import Path
2023-01-03 14:43:49 +00:00
from urllib.parse import urlparse
2022-11-13 23:16:47 +00:00
import aiohttp
2023-01-03 13:56:23 +00:00
import discord
import psutil
2022-11-13 23:16:47 +00:00
from discord.ext import commands
2022-12-29 17:41:41 +00:00
from rich.tree import Tree
2023-01-03 13:56:23 +00:00
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.firefox.service import Service as FirefoxService
2022-12-29 17:41:41 +00:00
from utils import console
2022-11-13 23:16:47 +00:00
# noinspection DuplicatedCode
2022-11-13 23:16:47 +00:00
class OtherCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
2022-11-14 17:20:31 +00:00
2023-01-03 15:20:50 +00:00
async def screenshot_website(
2023-01-09 14:36:32 +00:00
self, ctx: discord.ApplicationContext, website: str, driver: Literal["chrome", "firefox"], render_time: int = 10
2023-01-03 15:20:50 +00:00
) -> discord.File:
2023-01-03 13:56:23 +00:00
if not Path("/usr/bin/firefox").exists():
2023-01-03 15:20:50 +00:00
driver = "chrome"
2023-01-03 13:56:23 +00:00
if not Path("/usr/bin/geckodriver").exists():
2023-01-03 15:20:50 +00:00
driver = "chrome"
2023-01-03 13:56:23 +00:00
2023-01-03 15:20:50 +00:00
if driver == "chrome":
2023-01-03 13:56:23 +00:00
options = ChromeOptions()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.add_argument("--window-size=1920x1080")
options.add_argument("--disable-extensions")
options.add_argument("--incognito")
for opt in ("chrome", "chromium"):
if Path(f"/usr/bin/{opt}").exists():
options.binary_location = f"/usr/bin/{opt}"
break
else:
options.binary_location = "/usr/bin/chromium"
service = ChromeService("/usr/bin/chromedriver")
driver = webdriver.Chrome(service=service, options=options)
else:
options = FirefoxOptions()
2023-01-03 15:20:50 +00:00
options.add_argument("--headless")
2023-01-03 13:56:23 +00:00
options.add_argument("--private-window")
options.add_argument("--safe-mode")
options.add_argument("--new-instance")
for opt in ("firefox", "firefox-esr"):
if Path(f"/usr/bin/{opt}").exists():
options.binary_location = f"/usr/bin/{opt}"
break
else:
options.binary_location = "/usr/bin/firefox"
service = FirefoxService("/usr/bin/geckodriver")
driver = webdriver.Firefox(service=service, options=options)
2023-01-09 14:25:44 +00:00
friendly_url = textwrap.shorten(website, 100)
2023-01-03 13:56:23 +00:00
2023-01-09 14:36:32 +00:00
async def _edit(content: str):
self.bot.loop.create_task(ctx.interaction.edit_original_response(content=content))
await _edit(content=f"Screenshotting {friendly_url}... (49%)")
2023-01-03 14:32:21 +00:00
await asyncio.to_thread(driver.get, website)
2023-01-09 14:36:32 +00:00
await _edit(content=f"Screenshotting {friendly_url}... (66%)")
2023-01-03 14:29:33 +00:00
await asyncio.sleep(render_time)
2023-01-09 14:36:32 +00:00
await _edit(content=f"Screenshotting {friendly_url}... (83%)")
2023-01-03 13:56:23 +00:00
domain = re.sub(r"https?://", "", website)
2023-01-03 14:33:22 +00:00
data = await asyncio.to_thread(driver.get_screenshot_as_png)
2023-01-03 13:56:23 +00:00
_io = io.BytesIO()
2023-01-03 14:29:33 +00:00
_io.write(data)
2023-01-03 13:56:23 +00:00
_io.seek(0)
driver.quit()
return discord.File(_io, f"{domain}.png")
2022-12-29 17:41:41 +00:00
@staticmethod
async def get_interface_ip_addresses() -> Dict[str, list[Dict[str, str | bool | int]]]:
addresses = await asyncio.to_thread(psutil.net_if_addrs)
stats = await asyncio.to_thread(psutil.net_if_stats)
result = {}
for key in addresses.keys():
result[key] = []
for ip_addr in addresses[key]:
if ip_addr.broadcast is None:
continue
else:
result[key].append(
{
"ip": ip_addr.address,
"netmask": ip_addr.netmask,
"broadcast": ip_addr.broadcast,
"up": stats[key].isup,
2023-01-03 15:20:50 +00:00
"speed": stats[key].speed,
2022-12-29 17:41:41 +00:00
}
)
return result
async def analyse_text(self, text: str) -> Optional[Tuple[float, float, float, float]]:
"""Analyse text for positivity, negativity and neutrality."""
def inner():
try:
from utils.sentiment_analysis import intensity_analyser
except ImportError:
return None
scores = intensity_analyser.polarity_scores(text)
return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
async with self.bot.training_lock:
return await self.bot.loop.run_in_executor(None, inner)
2022-11-14 17:20:31 +00:00
@staticmethod
async def get_xkcd(session: aiohttp.ClientSession, n: int) -> dict | None:
async with session.get("https://xkcd.com/{!s}/info.0.json".format(n)) as response:
if response.status == 200:
2022-11-13 23:16:47 +00:00
data = await response.json()
2022-11-14 17:20:31 +00:00
return data
@staticmethod
async def random_xkcd_number(session: aiohttp.ClientSession) -> int:
async with session.get("https://c.xkcd.com/random/comic") as response:
if response.status != 302:
number = random.randint(100, 999)
else:
number = int(response.headers["location"].split("/")[-2])
2022-11-14 17:20:31 +00:00
return number
@staticmethod
async def random_xkcd(session: aiohttp.ClientSession) -> dict | None:
"""Fetches a random XKCD.
Basically a shorthand for random_xkcd_number and get_xkcd.
"""
number = await OtherCog.random_xkcd_number(session)
return await OtherCog.get_xkcd(session, number)
@staticmethod
def get_xkcd_embed(data: dict) -> discord.Embed:
2022-11-13 23:16:47 +00:00
embed = discord.Embed(
title=data["safe_title"], description=data["alt"], color=discord.Colour.embed_background()
2022-11-13 23:16:47 +00:00
)
embed.set_footer(text="XKCD #{!s}".format(data["num"]))
embed.set_image(url=data["img"])
2022-11-14 17:20:31 +00:00
return embed
@staticmethod
async def generate_xkcd(n: int = None) -> discord.Embed:
async with aiohttp.ClientSession() as session:
if n is None:
data = await OtherCog.random_xkcd(session)
n = data["num"]
2022-11-14 17:20:31 +00:00
else:
data = await OtherCog.get_xkcd(session, n)
if data is None:
return discord.Embed(
title="Failed to load XKCD :(", description="Try again later.", color=discord.Colour.red()
2022-11-14 17:20:31 +00:00
).set_footer(text="Attempted to retrieve XKCD #{!s}".format(n))
return OtherCog.get_xkcd_embed(data)
class XKCDGalleryView(discord.ui.View):
def __init__(self, n: int):
super().__init__(timeout=300, disable_on_timeout=True)
self.n = n
2022-11-16 17:28:47 +00:00
def __rich_repr__(self):
yield "n", self.n
yield "message", self.message
@discord.ui.button(label="Previous", style=discord.ButtonStyle.blurple)
2022-11-14 17:20:31 +00:00
async def previous_comic(self, _, interaction: discord.Interaction):
self.n -= 1
await interaction.response.defer()
await interaction.edit_original_response(embed=await OtherCog.generate_xkcd(self.n))
@discord.ui.button(label="Random", style=discord.ButtonStyle.blurple)
2022-11-14 17:20:31 +00:00
async def random_comic(self, _, interaction: discord.Interaction):
await interaction.response.defer()
await interaction.edit_original_response(embed=await OtherCog.generate_xkcd())
self.n = random.randint(1, 999)
@discord.ui.button(label="Next", style=discord.ButtonStyle.blurple)
2022-11-14 17:20:31 +00:00
async def next_comic(self, _, interaction: discord.Interaction):
self.n += 1
await interaction.response.defer()
await interaction.edit_original_response(embed=await OtherCog.generate_xkcd(self.n))
@commands.slash_command()
async def xkcd(self, ctx: discord.ApplicationContext, *, number: int = None):
"""Shows an XKCD comic"""
embed = await self.generate_xkcd(number)
view = self.XKCDGalleryView(number)
return await ctx.respond(embed=embed, view=view)
2022-11-13 23:16:47 +00:00
@commands.slash_command()
async def sentiment(self, ctx: discord.ApplicationContext, *, text: str):
"""Attempts to detect a text's tone"""
await ctx.defer()
if not text:
return await ctx.respond("You need to provide some text to analyse.")
result = await self.analyse_text(text)
if result is None:
return await ctx.edit(content="Failed to load sentiment analysis module.")
embed = discord.Embed(title="Sentiment Analysis", color=discord.Colour.embed_background())
embed.add_field(name="Positive", value="{:.2%}".format(result[0]))
embed.add_field(name="Neutral", value="{:.2%}".format(result[2]))
embed.add_field(name="Negative", value="{:.2%}".format(result[1]))
embed.add_field(name="Compound", value="{:.2%}".format(result[3]))
return await ctx.edit(content=None, embed=embed)
@commands.message_command(name="Detect Sentiment")
async def message_sentiment(self, ctx: discord.ApplicationContext, message: discord.Message):
await ctx.defer()
text = str(message.clean_content)
if not text:
return await ctx.respond("You need to provide some text to analyse.")
await ctx.respond("Analyzing (this may take some time)...")
result = await self.analyse_text(text)
if result is None:
return await ctx.edit(content="Failed to load sentiment analysis module.")
embed = discord.Embed(title="Sentiment Analysis", color=discord.Colour.embed_background())
embed.add_field(name="Positive", value="{:.2%}".format(result[0]))
embed.add_field(name="Neutral", value="{:.2%}".format(result[2]))
embed.add_field(name="Negative", value="{:.2%}".format(result[1]))
embed.add_field(name="Compound", value="{:.2%}".format(result[3]))
embed.url = message.jump_url
return await ctx.edit(content=None, embed=embed)
2022-12-01 12:22:11 +00:00
corrupt_file = discord.SlashCommandGroup(
name="corrupt-file",
description="Corrupts files.",
)
@corrupt_file.command(name="generate")
async def generate_corrupt_file(self, ctx: discord.ApplicationContext, file_name: str, size_in_megabytes: float):
"""Generates a "corrupted" file."""
2022-12-08 11:33:58 +00:00
limit_mb = round(ctx.guild.filesize_limit / 1024 / 1024)
if size_in_megabytes > limit_mb:
2022-12-28 21:14:14 +00:00
return await ctx.respond(
f"File size must be less than {limit_mb} MB.\n"
"Want to corrupt larger files? see https://github.com/EEKIM10/cli-utils#installing-the-right-way"
" (and then run `ruin <file>`)."
)
2022-12-01 12:22:11 +00:00
await ctx.defer()
2022-12-08 11:33:58 +00:00
size = max(min(int(size_in_megabytes * 1024 * 1024), ctx.guild.filesize_limit), 1)
2022-12-01 12:22:11 +00:00
file = io.BytesIO()
2022-12-08 11:35:15 +00:00
file.write(os.urandom(size - 1024))
2022-12-01 12:22:11 +00:00
file.seek(0)
return await ctx.respond(file=discord.File(file, file_name))
2022-12-01 13:30:04 +00:00
@staticmethod
def do_file_corruption(file: io.BytesIO, passes: int, bound_start: int, bound_end: int):
for _ in range(passes):
file.seek(random.randint(bound_start, bound_end))
file.write(os.urandom(random.randint(128, 2048)))
file.seek(0)
2022-12-01 13:32:57 +00:00
return file
2022-12-01 13:30:04 +00:00
2022-12-01 12:22:11 +00:00
@corrupt_file.command(name="ruin")
2022-12-01 12:42:51 +00:00
async def ruin_corrupt_file(
2022-12-28 21:14:14 +00:00
self,
ctx: discord.ApplicationContext,
file: discord.Attachment,
passes: int = 10,
metadata_safety_boundary: float = 5,
2022-12-01 12:42:51 +00:00
):
2022-12-01 12:22:11 +00:00
"""Takes a file and corrupts parts of it"""
await ctx.defer()
attachment = file
2022-12-01 12:30:48 +00:00
if attachment.size > 8388608:
2022-12-28 21:14:14 +00:00
return await ctx.respond(
"File is too large. Max size 8mb.\n"
"Want to corrupt larger files? see https://github.com/EEKIM10/cli-utils#installing-the-right-way"
" (and then run `ruin <file>`)."
)
2022-12-01 12:42:51 +00:00
bound_pct = attachment.size * (0.01 * metadata_safety_boundary)
bound_start = round(bound_pct)
bound_end = round(attachment.size - bound_pct)
2022-12-01 13:30:04 +00:00
await ctx.respond("Downloading file...")
2022-12-01 12:22:11 +00:00
file = io.BytesIO(await file.read())
file.seek(0)
2022-12-01 13:30:04 +00:00
await ctx.edit(content="Corrupting file...")
2022-12-28 21:14:14 +00:00
file = await asyncio.to_thread(self.do_file_corruption, file, passes, bound_start, bound_end)
2022-12-01 12:26:29 +00:00
file.seek(0)
2022-12-01 13:30:04 +00:00
await ctx.edit(content="Uploading file...")
await ctx.edit(content="Here's your corrupted file!", file=discord.File(file, attachment.filename))
2022-12-01 12:22:11 +00:00
2022-12-29 17:41:41 +00:00
@commands.command(name="kys", aliases=["kill"])
2022-12-28 21:14:14 +00:00
@commands.is_owner()
async def end_your_life(self, ctx: commands.Context):
await ctx.send(":( okay")
await self.bot.close()
2022-12-29 17:41:41 +00:00
@commands.slash_command()
async def ip(self, ctx: discord.ApplicationContext, detailed: bool = False, secure: bool = True):
"""Gets current IP"""
if not await self.bot.is_owner(ctx.user):
2023-01-09 14:25:44 +00:00
return await ctx.respond("Internal IP: 0.0.0.0\nExternal IP: 0.0.0.0")
2022-12-29 17:41:41 +00:00
await ctx.defer(ephemeral=secure)
ips = await self.get_interface_ip_addresses()
root = Tree("IP Addresses")
internal = root.add("Internal")
external = root.add("External")
interfaces = internal.add("Interfaces")
for interface, addresses in ips.items():
interface_tree = interfaces.add(interface)
for address in addresses:
colour = "green" if address["up"] else "red"
ip_tree = interface_tree.add(
f"[{colour}]" + address["ip"] + ((" (up)" if address["up"] else " (down)") if not detailed else "")
)
if detailed:
ip_tree.add(f"IF Up: {'yes' if address['up'] else 'no'}")
ip_tree.add(f"Netmask: {address['netmask']}")
ip_tree.add(f"Broadcast: {address['broadcast']}")
async with aiohttp.ClientSession() as session:
try:
async with session.get("https://api.ipify.org") as resp:
external.add(await resp.text())
except aiohttp.ClientError as e:
external.add(f" [red]Error: {e}")
with console.capture() as capture:
console.print(root)
text = capture.get()
paginator = commands.Paginator(prefix="```", suffix="```")
for line in text.splitlines():
paginator.add_line(line)
for page in paginator.pages:
await ctx.respond(page, ephemeral=secure)
2022-11-13 23:16:47 +00:00
2023-01-03 13:56:23 +00:00
@commands.slash_command()
async def screenshot(
2023-01-03 15:20:50 +00:00
self,
ctx: discord.ApplicationContext,
url: str,
browser: discord.Option(str, description="Browser to use", choices=["chrome", "firefox"], default="chrome"),
2023-01-09 14:25:44 +00:00
render_timeout: int = 5,
2023-01-03 13:56:23 +00:00
):
"""Takes a screenshot of a URL"""
await ctx.defer()
if ctx.user.id == 1019233057519177778:
if getattr(self.bot, "ALLOW_MATTHEW", False) is False:
return await ctx.respond("No.")
2023-01-03 14:20:24 +00:00
if not url.startswith("http"):
2023-01-04 19:56:56 +00:00
url = "http://" + url
2023-01-03 14:20:24 +00:00
2023-01-03 14:43:49 +00:00
url = urlparse(url)
2023-01-09 14:25:44 +00:00
friendly_url = textwrap.shorten(url.geturl(), 100)
2023-01-03 14:43:49 +00:00
2023-01-03 15:20:50 +00:00
await ctx.edit(
2023-01-09 14:25:44 +00:00
content=f"Preparing to screenshot {friendly_url}... (0%)"
2023-01-03 15:20:50 +00:00
)
2023-01-03 14:46:05 +00:00
2023-01-09 14:36:32 +00:00
async def blacklist_check() -> bool | str:
2023-01-09 14:25:44 +00:00
async with aiofiles.open("domains.txt") as blacklist:
for line in await blacklist.readlines():
if not line.strip():
continue
if re.match(line.strip(), url.netloc):
2023-01-09 14:36:32 +00:00
return "Local blacklist"
2023-01-09 14:25:44 +00:00
# return await ctx.edit(content="That domain is blacklisted.")
return True
2023-01-09 14:36:32 +00:00
async def dns_check() -> Optional[bool | str]:
2023-01-09 14:25:44 +00:00
try:
for response in await asyncio.to_thread(dns.resolver.resolve, url.hostname, "A"):
if response.address == "0.0.0.0":
2023-01-09 14:36:32 +00:00
return "DNS blacklist"
2023-01-09 14:25:44 +00:00
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
return
else:
return True
done, pending = await asyncio.wait(
[
asyncio.create_task(blacklist_check(), name="local"),
asyncio.create_task(dns_check(), name="dns"),
],
return_when=asyncio.FIRST_COMPLETED,
2023-01-03 15:20:50 +00:00
)
2023-01-09 14:36:32 +00:00
done = done or pending
2023-01-09 14:25:44 +00:00
done = done.pop()
2023-01-09 14:36:32 +00:00
result = await done
if result is not True:
2023-01-09 14:25:44 +00:00
return await ctx.edit(
content="That domain is blacklisted, doesn't exist, or there was no answer from the DNS server."
2023-01-09 14:36:32 +00:00
f" ({result!r})"
2023-01-09 14:25:44 +00:00
)
2023-01-09 14:36:32 +00:00
await asyncio.sleep(1)
2023-01-09 14:25:44 +00:00
await ctx.edit(content=f"Preparing to screenshot {friendly_url}... (16%)")
okay = await pending.pop()
if okay is not True:
return await ctx.edit(
content="That domain is blacklisted, doesn't exist, or there was no answer from the DNS server."
2023-01-09 14:36:32 +00:00
f" ({result!r})"
2023-01-09 14:25:44 +00:00
)
2023-01-09 14:36:32 +00:00
await asyncio.sleep(1)
2023-01-09 14:25:44 +00:00
await ctx.edit(content=f"Screenshotting {textwrap.shorten(url.geturl(), 100)}... (33%)")
2023-01-03 13:56:23 +00:00
try:
2023-01-03 15:20:50 +00:00
screenshot = await self.screenshot_website(ctx, url.geturl(), browser, render_timeout)
2023-01-03 13:56:23 +00:00
except Exception as e:
2023-01-03 14:32:21 +00:00
console.print_exception()
2023-01-03 13:56:23 +00:00
return await ctx.edit(content=f"Error: {e}")
else:
2023-01-09 14:25:44 +00:00
await ctx.edit(content=f"Screenshotting {friendly_url}... (99%)")
await asyncio.sleep(0.5)
2023-01-03 13:56:23 +00:00
await ctx.edit(content="Here's your screenshot!", file=screenshot)
2023-01-03 14:43:49 +00:00
domains = discord.SlashCommandGroup("domains", "Commands for managing domains")
@domains.command(name="add")
async def add_domain(self, ctx: discord.ApplicationContext, domain: str):
"""Adds a domain to the blacklist"""
await ctx.defer()
if not await self.bot.is_owner(ctx.user):
return await ctx.respond("You are not allowed to do that.")
2023-01-03 15:17:09 +00:00
async with aiofiles.open("domains.txt", "a") as blacklist:
await blacklist.write(domain.lower() + "\n")
2023-01-03 14:43:49 +00:00
await ctx.respond("Added domain to blacklist.")
@domains.command(name="remove")
async def remove_domain(self, ctx: discord.ApplicationContext, domain: str):
"""Removes a domain from the blacklist"""
await ctx.defer()
if not await self.bot.is_owner(ctx.user):
return await ctx.respond("You are not allowed to do that.")
2023-01-03 15:17:09 +00:00
async with aiofiles.open("domains.txt") as blacklist:
lines = await blacklist.readlines()
async with aiofiles.open("domains.txt", "w") as blacklist:
2023-01-03 14:43:49 +00:00
for line in lines:
if line.strip() != domain.lower():
2023-01-03 15:17:09 +00:00
await blacklist.write(line)
2023-01-03 14:43:49 +00:00
await ctx.respond("Removed domain from blacklist.")
2023-01-03 15:20:50 +00:00
@commands.group(name="matthew", invoke_without_command=True)
2023-01-03 15:04:21 +00:00
@commands.is_owner()
2023-01-03 15:20:50 +00:00
async def matthew_group(self, ctx: commands.Context):
2023-01-03 15:04:21 +00:00
"""Commands for managing domains"""
await ctx.send_help(ctx.command)
2023-01-03 14:43:49 +00:00
2023-01-03 15:20:50 +00:00
@matthew_group.command(name="enable")
async def enable_matthew(self, ctx: commands.Context):
2023-01-03 14:43:49 +00:00
"""Enables Matthew"""
2023-01-03 15:21:37 +00:00
if not await self.bot.is_owner(ctx.author):
return await ctx.reply("You are not allowed to do that.")
2023-01-03 14:43:49 +00:00
self.bot.ALLOW_MATTHEW = True
2023-01-03 15:20:50 +00:00
await ctx.reply("Matthew enabled.")
2023-01-03 14:43:49 +00:00
2023-01-03 15:20:50 +00:00
@matthew_group.command(name="disable")
async def disable_matthew(self, ctx: commands.Context):
2023-01-03 14:43:49 +00:00
"""Disables Matthew"""
2023-01-03 15:21:37 +00:00
if not await self.bot.is_owner(ctx.author):
return await ctx.reply("You are not allowed to do that.")
2023-01-03 14:43:49 +00:00
self.bot.ALLOW_MATTHEW = False
2023-01-03 15:20:50 +00:00
await ctx.reply("Matthew disabled.")
2023-01-03 14:43:49 +00:00
2022-11-13 23:16:47 +00:00
def setup(bot):
bot.add_cog(OtherCog(bot))