Add traceroute command

This commit is contained in:
nex 2023-01-29 19:17:44 +00:00
parent 883451ee41
commit c0b9188be1
2 changed files with 98 additions and 2 deletions

View file

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="dataSourceStorageLocal" created-in="PY-223.8214.51">
<component name="dataSourceStorageLocal" created-in="PY-223.8617.48">
<data-source name="main" uuid="28efee07-d306-4126-bf69-01008b4887e2">
<database-info product="SQLite" version="3.39.2" jdbc-version="2.1" driver-name="SQLite JDBC" driver-version="3.39.2.0" dbms="SQLITE" exact-version="3.39.2" exact-driver-version="3.39">
<identifier-quote-string>&quot;</identifier-quote-string>

View file

@ -8,7 +8,7 @@ import textwrap
import dns.resolver
from dns import asyncresolver
import aiofiles
from time import time
from time import time, time_ns
from typing import Literal
from typing import Tuple, Optional, Dict
from pathlib import Path
@ -489,6 +489,102 @@ class OtherCog(commands.Cog):
for page in paginator.pages:
await ctx.respond(page)
@commands.slash_command()
async def traceroute(
self,
ctx: discord.ApplicationContext,
url: str,
port: discord.Option(
int,
description="Port to use",
default=None
),
ping_type: discord.Option(
str,
name="ping-type",
description="Type of ping to use. See `traceroute --help`",
choices=["icmp", "tcp", "udp", "udplite", "dccp", "default"],
default="default"
),
use_ip_version: discord.Option(
str,
name="ip-version",
description="IP version to use.",
choices=["ipv4", "ipv6"],
default="ipv4"
),
max_ttl: discord.Option(
int,
name="ttl",
description="Max number of hops",
default=30
)
):
"""Performs a traceroute request."""
await ctx.defer()
if re.search(r"\s+", url):
return await ctx.respond("URL cannot contain spaces.")
args = [
"sudo",
"-E",
"-n",
"traceroute"
]
flags = {
"ping_type": {
"icmp": "-I",
"tcp": "-T",
"udp": "-U",
"udplite": "-UL",
"dccp": "-D",
},
"use_ip_version": {
"ipv4": "-4",
"ipv6": "-6"
}
}
if ping_type != "default":
args.append(flags["ping_type"][ping_type])
else:
args = args[3:] # removes sudo
args.append(flags["use_ip_version"][use_ip_version])
args.append("-m")
args.append(str(max_ttl))
if port is not None:
args.append("-p")
args.append(str(port))
args.append(url)
paginator = commands.Paginator()
paginator.add_line(f"Running command: {' '.join(args[3 if args[0] == 'sudo' else 0:])}")
paginator.add_line(empty=True)
try:
start = time_ns()
process = await asyncio.create_subprocess_exec(
args[0],
*args[1:],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await process.wait()
stdout, stderr = await process.communicate()
end = time_ns()
time_taken_in_ms = (end - start) / 1000000
if stdout:
for line in stdout.splitlines():
paginator.add_line(line.decode())
if stderr:
for line in stderr.splitlines():
paginator.add_line(line.decode())
paginator.add_line(empty=True)
paginator.add_line(f"Exit code: {process.returncode}")
paginator.add_line(f"Time taken: {time_taken_in_ms:,.1f}ms")
except Exception as e:
paginator.add_line(f"Error: {e}")
for page in paginator.pages:
await ctx.respond(page)
@commands.slash_command()
@commands.max_concurrency(1, commands.BucketType.user)
@commands.cooldown(1, 30, commands.BucketType.user)