nonsensebot/app/modules/drill.py

134 lines
5.2 KiB
Python

import time
from email.policy import default
from functools import partial
import httpx
import niobot
import ipaddress
import dns.asyncresolver
import dns.asyncquery
import dns.nameserver
import dns.resolver
import dns.message
from dns.quic._common import UnexpectedEOF
from urllib.parse import urlparse
class DNSCommand(niobot.Module):
@niobot.command(name="drill", aliases=["dig", "nslookup", "resolve"])
async def dns_lookup(
self,
ctx: niobot.Context,
hostname: str,
query_type: str = "A",
dns_server: str = "internal",
):
"""Performs a DNS lookup and returns the result for you"""
port = 53
query_type = query_type.upper()
supported = ("A", "AAAA", "MX", "CNAME", "TXT", "PTR")
if query_type not in supported:
return await ctx.respond(
"Sorry, currently only %s queries are supported."
% (", ".join((f"`{x}`" for x in supported)))
)
query = dns.message.make_query(hostname, query_type)
resolver = dns.asyncresolver.get_default_resolver()
if dns_server == "internal":
resolver_call = dns.asyncquery.udp_with_fallback
where = resolver.nameservers[0]
elif dns_server.startswith(("quic", "https", "tls", "udp", "tcp")):
parsed_resolver = urlparse(dns_server)
if not parsed_resolver.scheme:
return await ctx.respond("Invalid URL (no scheme detected)")
elif not parsed_resolver.hostname:
return await ctx.respond("Invalid URL (no hostname)")
try:
ip = ipaddress.ip_address(parsed_resolver.hostname)
if ip.is_private and not self.bot.is_owner(ctx.message.sender):
return await ctx.respond(
"You do not have permission to use internal IPs."
)
except ValueError:
pass # likely not an IP address
match parsed_resolver.scheme:
case "https":
resolver_call = dns.asyncquery.https
default_port = 443
case "quic":
resolver_call = dns.asyncquery.quic
default_port = 443
case "tls":
resolver_call = dns.asyncquery.tls
default_port = 853
case "udp":
resolver_call = dns.asyncquery.udp
default_port = 53
case "tcp":
resolver_call = dns.asyncquery.tcp
default_port = 53
case _:
resolver_call = default_port = None
if resolver_call is None:
return await ctx.respond(
"Unrecognised scheme %r" % parsed_resolver.scheme
)
where = parsed_resolver.hostname
port = parsed_resolver.port or default_port
else:
try:
ip = ipaddress.ip_address(dns_server)
if ip.is_private and not self.bot.is_owner(ctx.message.sender):
return await ctx.respond(
"You do not have permission to use internal IPs."
)
except ValueError:
# try and resolve it as a hostname
try:
msg: dns.resolver.Answer = await dns.asyncresolver.resolve(
dns_server,
"A",
)
except dns.exception.DNSException as e:
return await ctx.respond(
"Failed to resolve %r to an IP to query: `%s`" % (dns_server, e)
)
ip = ipaddress.ip_address(msg.response.answer[0].name.to_text(True))
if ip.is_private and not self.bot.is_owner(ctx.message.sender):
return await ctx.respond(
"You do not have permission to use internal IPs."
)
resolver_call = dns.asyncquery.udp_with_fallback
where = str(ip)
m = await ctx.respond("Running query...")
try:
start_ts = time.perf_counter()
response: (
dns.message.Message | tuple[dns.message.Message, bool]
) = await resolver_call(q=query, where=where, port=port, timeout=60)
end_ts = time.perf_counter()
if isinstance(response, dns.message.Message):
response = (response, False)
except (dns.exception.DNSException, UnexpectedEOF, httpx.HTTPError, ValueError) as err:
return await m.edit(
"There was an error while querying the DNS server:\n```\n%s\n```" % err
)
else:
response, _ = response
lines = [
"Answer for `%s`, using DNS server %r (took %.2fms):"
% (hostname, where, (end_ts - start_ts) / 1000),
"",
]
for n, answer in enumerate(response.answer, start=1):
for item in answer.items.keys():
lines.append(f"* `{item.to_text()}`")
return await m.edit(content="\n".join(lines))