college-bot-v2/src/cogs/net.py
nexy7574 bbd74cd1e9
All checks were successful
Build and Publish Jimmy.2 / build_and_publish (push) Successful in 7s
Default to hiding any filtered text
2024-05-01 00:54:55 +01:00

351 lines
12 KiB
Python

import asyncio
import io
import json
import os
import re
import time
import typing
from pathlib import Path
import discord
import httpx
from discord.ext import commands
from dns import asyncresolver
from rich.console import Console
from rich.tree import Tree
from conf import CONFIG
class GetFilteredTextView(discord.ui.View):
    """View with one button that returns text hidden from a command's reply."""

    def __init__(self, text: str):
        # The view is created with a timeout value of 600.
        super().__init__(timeout=600)
        # Text that is served as a file when the button is pressed.
        self.text = text

    @discord.ui.button(label="See filtered data", emoji="\N{INBOX TRAY}")
    async def see_filtered_data(self, _, interaction: discord.Interaction):
        """Send the stored text back as a ``filtered.txt`` attachment."""
        # Deferring ephemerally first; the file is then sent as a follow-up.
        await interaction.response.defer(ephemeral=True)
        payload = io.BytesIO(self.text.encode())
        attachment = discord.File(payload, "filtered.txt")
        await interaction.followup.send(file=attachment)
class NetworkCog(commands.Cog):
    # Slash commands for basic network diagnostics: ping, whois, DNS lookups,
    # traceroute and IP information lookups.
    #
    # NOTE: the docstrings of the slash commands below are deliberately left
    # untouched, because the command framework may surface them to users as
    # the command description. Implementation notes live in comments instead.

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        # The "network" section of the project configuration.
        self.config = CONFIG["network"]

    @commands.slash_command()
    async def ping(self, ctx: discord.ApplicationContext, target: str = None):
        """Get the bot's latency, or the network latency to a target."""
        # Without a target, report the bot's own latency in milliseconds.
        if target is None:
            return await ctx.respond(f"Pong! {round(self.bot.latency * 1000)}ms")

        # Otherwise run the system `ping` binary with a count of five and
        # relay everything it printed.
        await ctx.defer()
        process = await asyncio.create_subprocess_exec(
            "ping",
            "-c",
            "5",
            target,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()

        paginator = commands.Paginator()
        for line in stdout.splitlines():
            paginator.add_line(line.decode("utf-8"))
        for line in stderr.splitlines():
            paginator.add_line("[STDERR] " + line.decode("utf-8"))
        for page in paginator.pages:
            await ctx.respond(page)

    @commands.slash_command()
    async def whois(self, ctx: discord.ApplicationContext, target: str):
        """Get information about a user."""

        async def run_command(with_disclaimer: bool = False):
            # Runs the system `whois` binary and returns
            # (stdout, stderr, return code). "-H" is passed unless the
            # caller asks for the disclaimer to be included.
            args = [] if with_disclaimer else ["-H"]
            process = await asyncio.create_subprocess_exec(
                "whois",
                *args,
                target,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            so, se = await process.communicate()
            return so, se, process.returncode

        def decide(ln: str) -> typing.Optional[bool]:
            # Classifies one output line:
            #   True  -> show the line in the reply.
            #   False -> discard the line.
            #   None  -> keep the line out of the reply, but store it in the
            #            buffer served by the "See filtered data" button.
            # NOTE(review): lines classified False never reach the
            # filtered-data file - confirm that this is intended.
            if ln.startswith(">>> Last update"):
                return
            if "REDACTED" in ln or "Please query the WHOIS server of the owning registrar" in ln or ":" not in ln:
                return False
            else:
                return True

        await ctx.defer()
        paginator = commands.Paginator()
        redacted = io.BytesIO()
        stdout, stderr, status = await run_command()

        # Both streams go through the same filter. Only stderr lines carry the
        # "[STDERR] " marker; previously stdout lines written to the filtered
        # buffer were mislabelled as stderr as well.
        for raw, prefix in ((stdout, ""), (stderr, "[STDERR] ")):
            for line in raw.decode().splitlines():
                line = line.strip()
                if not line:
                    continue
                verdict = decide(line)
                if verdict:
                    paginator.add_line(prefix + line)
                elif verdict is None:
                    redacted.write((prefix + line).encode() + b"\n")

        if not paginator.pages:
            # Nothing survived the filter: rerun with the disclaimer included
            # and hand the raw output back as a file instead.
            stdout, stderr, status = await run_command(with_disclaimer=True)
            if not any((stdout, stderr)):
                return await ctx.respond(f"No output was returned with status code {status}.")
            file = io.BytesIO()
            file.write(stdout)
            if stderr:
                file.write(b"\n----- STDERR -----\n")
                file.write(stderr)
            file.seek(0)
            return await ctx.respond(
                "Seemingly all output was filtered. Returning raw command output.",
                file=discord.File(file, "whois.txt")
            )

        last: discord.Interaction | discord.WebhookMessage | None = None
        for page in paginator.pages:
            last = await ctx.respond(page)
        # Attach the "See filtered data" button to the final page only, and
        # only when something was actually written to the filtered buffer.
        if last and redacted.tell():
            view = GetFilteredTextView(redacted.getvalue().decode())
            await last.edit(view=view)

    @commands.slash_command()
    async def dig(
        self,
        ctx: discord.ApplicationContext,
        domain: str,
        _type: discord.Option(
            str,
            name="type",
            default="A",
            choices=[
                "A",
                "AAAA",
                "ANY",
                "AXFR",
                "CNAME",
                "HINFO",
                "LOC",
                "MX",
                "NS",
                "PTR",
                "SOA",
                "SRV",
                "TXT",
            ],
        ),
    ):
        """Looks up a domain name"""
        await ctx.defer()
        if re.search(r"\s+", domain):
            return await ctx.respond("Domain name cannot contain spaces.")
        try:
            response = await asyncresolver.resolve(
                domain,
                _type.upper(),
            )
        except Exception as e:
            # Any resolver failure is reported to the user as text.
            return await ctx.respond(f"Error: {e}")

        res = response
        # Render the answer as a tree with one branch per returned record.
        tree = Tree(f"DNS Lookup for {domain}")
        for record in res:
            record_tree = tree.add(f"{record.rdtype.name} Record")
            record_tree.add(f"Name: {res.name}")
            record_tree.add(f"Value: {record.to_text()}")

        # Capture the rendered tree as plain text so it can be paginated.
        console = Console()
        with console.capture() as capture:
            console.print(tree)
        text = capture.get()

        paginator = commands.Paginator()
        for line in text.splitlines():
            paginator.add_line(line)
        paginator.add_line(empty=True)
        paginator.add_line(f"Exit code: {0}")
        paginator.add_line(f"DNS Server used: {res.nameserver}")
        for page in paginator.pages:
            await ctx.respond(page)

    @commands.slash_command()
    async def traceroute(
        self,
        ctx: discord.ApplicationContext,
        url: str,
        port: discord.Option(int, description="Port to use", default=None),
        ping_type: discord.Option(
            str,
            name="ping-type",
            description="Type of ping to use. See `traceroute --help`",
            choices=["icmp", "tcp", "udp", "udplite", "dccp", "default"],
            default="default",
        ),
        use_ip_version: discord.Option(
            str, name="ip-version", description="IP version to use.", choices=["ipv4", "ipv6"], default="ipv4"
        ),
        max_ttl: discord.Option(int, name="ttl", description="Max number of hops", default=30),
    ):
        """Performs a traceroute request."""
        await ctx.defer()
        if re.search(r"\s+", url):
            return await ctx.respond("URL cannot contain spaces.")

        args = ["sudo", "-E", "-n", "traceroute"]
        flags = {
            "ping_type": {
                "icmp": "-I",
                "tcp": "-T",
                "udp": "-U",
                "udplite": "-UL",
                "dccp": "-D",
            },
            "use_ip_version": {"ipv4": "-4", "ipv6": "-6"},
        }
        # The sudo prefix is dropped for the default probe type, or when the
        # process already runs as uid 0.
        if ping_type == "default" or os.getuid() == 0:
            args = args[3:]  # removes sudo
        # The probe-type flag is independent of the sudo decision. It used to
        # sit in the `else` of the check above, so it was silently dropped
        # whenever the bot ran as root.
        if ping_type != "default":
            args.append(flags["ping_type"][ping_type])
        args.append(flags["use_ip_version"][use_ip_version])
        args.append("-m")
        args.append(str(max_ttl))
        if port is not None:
            args.append("-p")
            args.append(str(port))
        args.append(url)

        paginator = commands.Paginator()
        # The sudo prefix is left out of the command line echoed to the user.
        paginator.add_line(f"Running command: {' '.join(args[3 if args[0] == 'sudo' else 0:])}")
        paginator.add_line(empty=True)
        try:
            start = time.time_ns()
            process = await asyncio.create_subprocess_exec(
                args[0],
                *args[1:],
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            # communicate() both drains the pipes and waits for exit. Calling
            # wait() first can deadlock when the child fills a pipe buffer.
            stdout, stderr = await process.communicate()
            end = time.time_ns()
            time_taken_in_ms = (end - start) / 1000000
            if stdout:
                for line in stdout.splitlines():
                    paginator.add_line(line.decode())
            if stderr:
                for line in stderr.splitlines():
                    paginator.add_line(line.decode())
            paginator.add_line(empty=True)
            paginator.add_line(f"Exit code: {process.returncode}")
            paginator.add_line(f"Time taken: {time_taken_in_ms:,.1f}ms")
        except Exception as e:
            # Report failures (for example a missing binary) in the reply.
            paginator.add_line(f"Error: {e}")
        for page in paginator.pages:
            await ctx.respond(page)

    @commands.slash_command(name="what-are-matthews-bank-details")
    async def matthew_bank(self, ctx: discord.ApplicationContext):
        """For the 80th time"""
        # The image is looked up relative to the current working directory.
        f = Path.cwd() / "assets" / "sensitive" / "matthew-bank.webp"
        if not f.exists():
            return await ctx.respond("Idk")
        else:
            await ctx.defer()
            await ctx.respond(file=discord.File(f))

    async def _fetch_ip_response(
        self,
        server: str,
        lookup: str,
        client: httpx.AsyncClient
    ) -> tuple[dict, float] | httpx.HTTPError | ConnectionError | json.JSONDecodeError:
        # Queries a single IP information server for `lookup`.
        #
        # Returns (decoded JSON body, elapsed seconds rounded to 2 places) on
        # success. On failure the exception is *returned*, not raised, so the
        # caller can tell the two apart with isinstance().
        try:
            start = time.perf_counter()
            # `params` URL-encodes the value instead of splicing raw user
            # input into the query string.
            response = await client.get(f"https://{server}/lookup", params={"ip": lookup})
            end = time.perf_counter()
            return response.json(), round(end - start, 2)
        except (httpx.HTTPError, ConnectionError, json.JSONDecodeError) as e:
            # Decode errors are returned too, as the annotation promises.
            return e

    @commands.slash_command(name="ip")
    async def get_ip_address(self, ctx: discord.ApplicationContext, lookup: str = None):
        """Fetches IP info from SHRONK IP servers"""
        await ctx.defer()
        async with httpx.AsyncClient(headers={"User-Agent": "Mozilla/5.0 Jimmy/v2"}) as client:
            if not lookup:
                # No address given: ask api.ipify.org and look up whatever
                # address it reports for this host.
                try:
                    response = await client.get("https://api.ipify.org")
                except (httpx.HTTPError, ConnectionError) as e:
                    # Without this the deferred interaction would never be
                    # answered when the request fails.
                    return await ctx.respond(f"Failed to determine the public IP address: {e}")
                lookup = response.text

            servers = self.config.get(
                "ip_servers",
                [
                    "ip.shronk.net",
                    "ip.i-am.nexus",
                    "ip.shronk.nicroxio.co.uk"
                ]
            )
            embed = discord.Embed(
                title="IP lookup information for: %s" % lookup,
            )
            if not servers:
                embed.description = "No IP servers configured in config.toml."
                embed.colour = discord.Color.red()
                return await ctx.respond(embed=embed)

            # Servers are queried one after another; each gets its own field.
            for server in servers:
                try:
                    start = time.perf_counter()
                    # `params` URL-encodes the user-supplied value.
                    response = await client.get(f"https://{server}/lookup", params={"ip": lookup})
                    end = time.perf_counter()
                except (httpx.HTTPError, ConnectionError) as e:
                    embed.add_field(
                        name=server,
                        value=f"An error occurred while fetching the data: {e}",
                    )
                else:
                    try:
                        v = json.dumps(response.json(), indent=4)
                    except (ValueError, json.JSONDecodeError) as e:
                        # Show at most the first 512 characters of the body.
                        t = response.text[:512]
                        # A single f-string: combining an f-string with "%"
                        # formatting breaks if the error text contains "%".
                        embed.add_field(
                            name=server,
                            value=f"An error occurred while parsing the data: {e}\nData: ```\n{t}\n```",
                        )
                    else:
                        embed.add_field(
                            name="%s (%.2fms)" % (server, (end - start) * 1000),
                            value="```json\n%s\n```" % v,
                            inline=False
                        )
            await ctx.respond(embed=embed)
def setup(bot):
    """Create a :class:`NetworkCog` for ``bot`` and add it to the bot."""
    cog = NetworkCog(bot)
    bot.add_cog(cog)