nonsensebot/app/modules/drill.py

134 lines
5.1 KiB
Python
Raw Normal View History

2024-09-18 16:29:55 +01:00
import time
2024-09-18 18:09:58 +01:00
from email.policy import default
2024-09-18 16:29:55 +01:00
from functools import partial
import httpx
import niobot
import ipaddress
import dns.asyncresolver
import dns.asyncquery
import dns.nameserver
import dns.resolver
import dns.message
from urllib.parse import urlparse
class DNSCommand(niobot.Module):
2024-09-18 18:06:31 +01:00
@niobot.command(name="drill", aliases=["dig", "nslookup", "resolve"])
2024-09-18 16:29:55 +01:00
async def dns_lookup(
2024-09-18 18:06:31 +01:00
self,
ctx: niobot.Context,
hostname: str,
query_type: str = "A",
dns_server: str = "internal",
2024-09-18 16:29:55 +01:00
):
"""Performs a DNS lookup and returns the result for you"""
2024-09-18 18:09:58 +01:00
port = 53
2024-09-18 16:29:55 +01:00
query_type = query_type.upper()
2024-09-18 18:06:31 +01:00
supported = ("A", "AAAA", "MX", "CNAME", "TXT", "PTR")
if query_type not in supported:
return await ctx.respond(
"Sorry, currently only %s queries are supported."
% (", ".join((f"`{x}`" for x in supported)))
)
2024-09-18 16:29:55 +01:00
2024-09-18 18:06:31 +01:00
query = dns.message.make_query(hostname, query_type)
2024-09-18 16:29:55 +01:00
resolver = dns.asyncresolver.get_default_resolver()
if dns_server == "internal":
resolver_call = dns.asyncquery.udp_with_fallback
where = resolver.nameservers[0]
elif dns_server.startswith(("quic", "https", "tls", "udp", "tcp")):
parsed_resolver = urlparse(dns_server)
if not parsed_resolver.scheme:
return await ctx.respond("Invalid URL (no scheme detected)")
elif not parsed_resolver.hostname:
return await ctx.respond("Invalid URL (no hostname)")
try:
ip = ipaddress.ip_address(parsed_resolver.hostname)
if ip.is_private and not self.bot.is_owner(ctx.message.sender):
2024-09-18 18:06:31 +01:00
return await ctx.respond(
"You do not have permission to use internal IPs."
)
2024-09-18 16:29:55 +01:00
except ValueError:
pass # likely not an IP address
match parsed_resolver.scheme:
case "https":
resolver_call = dns.asyncquery.https
2024-09-18 18:09:58 +01:00
default_port = 443
2024-09-18 16:29:55 +01:00
case "quic":
resolver_call = dns.asyncquery.quic
2024-09-18 18:09:58 +01:00
default_port = 443
2024-09-18 16:29:55 +01:00
case "tls":
resolver_call = dns.asyncquery.tls
2024-09-18 18:09:58 +01:00
default_port = 853
2024-09-18 16:29:55 +01:00
case "udp":
resolver_call = dns.asyncquery.udp
2024-09-18 18:09:58 +01:00
default_port = 53
2024-09-18 16:29:55 +01:00
case "tcp":
resolver_call = dns.asyncquery.tcp
2024-09-18 18:09:58 +01:00
default_port = 53
2024-09-18 16:29:55 +01:00
case _:
2024-09-18 18:09:58 +01:00
resolver_call = default_port = None
2024-09-18 16:29:55 +01:00
if resolver_call is None:
2024-09-18 18:06:31 +01:00
return await ctx.respond(
"Unrecognised scheme %r" % parsed_resolver.scheme
)
2024-09-18 18:09:58 +01:00
where = parsed_resolver.hostname
port = parsed_resolver.port or default_port
2024-09-18 16:29:55 +01:00
else:
try:
ip = ipaddress.ip_address(dns_server)
if ip.is_private and not self.bot.is_owner(ctx.message.sender):
2024-09-18 18:06:31 +01:00
return await ctx.respond(
"You do not have permission to use internal IPs."
)
2024-09-18 16:29:55 +01:00
except ValueError:
# try and resolve it as a hostname
try:
msg: dns.resolver.Answer = await dns.asyncresolver.resolve(
dns_server,
"A",
)
except dns.exception.DNSException as e:
2024-09-18 18:06:31 +01:00
return await ctx.respond(
"Failed to resolve %r to an IP to query: `%s`" % (dns_server, e)
)
2024-09-18 16:29:55 +01:00
ip = ipaddress.ip_address(msg.response.answer[0].name.to_text(True))
if ip.is_private and not self.bot.is_owner(ctx.message.sender):
2024-09-18 18:06:31 +01:00
return await ctx.respond(
"You do not have permission to use internal IPs."
)
2024-09-18 16:29:55 +01:00
resolver_call = dns.asyncquery.udp_with_fallback
where = str(ip)
m = await ctx.respond("Running query...")
try:
start_ts = time.perf_counter()
2024-09-18 18:06:31 +01:00
response: (
dns.message.Message | tuple[dns.message.Message, bool]
2024-09-18 18:09:58 +01:00
) = await resolver_call(q=query, where=where, port=port, timeout=60)
2024-09-18 16:29:55 +01:00
end_ts = time.perf_counter()
if isinstance(response, dns.message.Message):
response = (response, False)
except dns.exception.DNSException as err:
return await m.edit(
"There was an error while querying the DNS server:\n```\n%s\n```" % err
)
else:
response, _ = response
lines = [
2024-09-18 18:06:31 +01:00
"Answer for `%s`, using DNS server %r (took %.2fms):"
% (hostname, where, (end_ts - start_ts) / 1000),
"",
2024-09-18 16:29:55 +01:00
]
for n, answer in enumerate(response.answer, start=1):
2024-09-18 18:06:31 +01:00
for item in answer.items.keys():
lines.append(f"* `{item.to_text()}`")
2024-09-18 16:29:55 +01:00
return await m.edit(content="\n".join(lines))