From c0b9188be11393ba52f4dd69dd47b6a3fceb0aa7 Mon Sep 17 00:00:00 2001 From: nex Date: Sun, 29 Jan 2023 19:17:44 +0000 Subject: [PATCH] Add traceroute command --- .idea/dataSources.local.xml | 2 +- cogs/other.py | 98 ++++++++++++++++++++++++++++++++++++- 2 files changed, 98 insertions(+), 2 deletions(-) diff --git a/.idea/dataSources.local.xml b/.idea/dataSources.local.xml index 92c980b..4316a9e 100644 --- a/.idea/dataSources.local.xml +++ b/.idea/dataSources.local.xml @@ -1,6 +1,6 @@ - + " diff --git a/cogs/other.py b/cogs/other.py index ec305ea..b8a0605 100644 --- a/cogs/other.py +++ b/cogs/other.py @@ -8,7 +8,7 @@ import textwrap import dns.resolver from dns import asyncresolver import aiofiles -from time import time +from time import time, time_ns from typing import Literal from typing import Tuple, Optional, Dict from pathlib import Path @@ -489,6 +489,102 @@ class OtherCog(commands.Cog): for page in paginator.pages: await ctx.respond(page) + @commands.slash_command() + async def traceroute( + self, + ctx: discord.ApplicationContext, + url: str, + port: discord.Option( + int, + description="Port to use", + default=None + ), + ping_type: discord.Option( + str, + name="ping-type", + description="Type of ping to use. See `traceroute --help`", + choices=["icmp", "tcp", "udp", "udplite", "dccp", "default"], + default="default" + ), + use_ip_version: discord.Option( + str, + name="ip-version", + description="IP version to use.", + choices=["ipv4", "ipv6"], + default="ipv4" + ), + max_ttl: discord.Option( + int, + name="ttl", + description="Max number of hops", + default=30 + ) + ): + """Performs a traceroute request.""" + await ctx.defer() + if re.search(r"\s+", url): + return await ctx.respond("URL cannot contain spaces.") + + args = [ + "sudo", + "-E", + "-n", + "traceroute" + ] + flags = { + "ping_type": { + "icmp": "-I", + "tcp": "-T", + "udp": "-U", + "udplite": "-UL", + "dccp": "-D", + }, + "use_ip_version": { + "ipv4": "-4", + "ipv6": "-6" + } + } + + if ping_type != "default": + args.append(flags["ping_type"][ping_type]) + else: + args = args[3:] # removes sudo + args.append(flags["use_ip_version"][use_ip_version]) + args.append("-m") + args.append(str(max_ttl)) + if port is not None: + args.append("-p") + args.append(str(port)) + args.append(url) + paginator = commands.Paginator() + paginator.add_line(f"Running command: {' '.join(args[3 if args[0] == 'sudo' else 0:])}") + paginator.add_line(empty=True) + try: + start = time_ns() + process = await asyncio.create_subprocess_exec( + args[0], + *args[1:], + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + await process.wait() + stdout, stderr = await process.communicate() + end = time_ns() + time_taken_in_ms = (end - start) / 1000000 + if stdout: + for line in stdout.splitlines(): + paginator.add_line(line.decode()) + if stderr: + for line in stderr.splitlines(): + paginator.add_line(line.decode()) + paginator.add_line(empty=True) + paginator.add_line(f"Exit code: {process.returncode}") + paginator.add_line(f"Time taken: {time_taken_in_ms:,.1f}ms") + except Exception as e: + paginator.add_line(f"Error: {e}") + for page in paginator.pages: + await ctx.respond(page) + @commands.slash_command() @commands.max_concurrency(1, commands.BucketType.user) @commands.cooldown(1, 30, commands.BucketType.user)