123 lines
4.6 KiB
Python
123 lines
4.6 KiB
Python
import time
|
|
from functools import partial
|
|
|
|
import httpx
|
|
import niobot
|
|
import ipaddress
|
|
import dns.asyncresolver
|
|
import dns.asyncquery
|
|
import dns.nameserver
|
|
import dns.resolver
|
|
import dns.message
|
|
from urllib.parse import urlparse
|
|
|
|
|
|
class DNSCommand(niobot.Module):
|
|
|
|
@niobot.command(
|
|
name="drill",
|
|
aliases=[
|
|
"dig",
|
|
"nslookup",
|
|
"resolve"
|
|
]
|
|
)
|
|
async def dns_lookup(
|
|
self,
|
|
ctx: niobot.Context,
|
|
hostname: str,
|
|
query_type: str = "A",
|
|
dns_server: str = "internal"
|
|
):
|
|
"""Performs a DNS lookup and returns the result for you"""
|
|
query_type = query_type.upper()
|
|
if query_type not in ("A", "AAAA"):
|
|
return await ctx.respond("Sorry, currently only `A` and `AAAA` queries are supported.")
|
|
|
|
query = dns.message.make_query(
|
|
hostname,
|
|
query_type
|
|
)
|
|
|
|
resolver = dns.asyncresolver.get_default_resolver()
|
|
if dns_server == "internal":
|
|
resolver_call = dns.asyncquery.udp_with_fallback
|
|
where = resolver.nameservers[0]
|
|
elif dns_server.startswith(("quic", "https", "tls", "udp", "tcp")):
|
|
parsed_resolver = urlparse(dns_server)
|
|
if not parsed_resolver.scheme:
|
|
return await ctx.respond("Invalid URL (no scheme detected)")
|
|
elif not parsed_resolver.hostname:
|
|
return await ctx.respond("Invalid URL (no hostname)")
|
|
|
|
real_resolver_url = parsed_resolver.geturl()
|
|
try:
|
|
ip = ipaddress.ip_address(parsed_resolver.hostname)
|
|
if ip.is_private and not self.bot.is_owner(ctx.message.sender):
|
|
return await ctx.respond("You do not have permission to use internal IPs.")
|
|
except ValueError:
|
|
pass # likely not an IP address
|
|
|
|
match parsed_resolver.scheme:
|
|
case "https":
|
|
resolver_call = dns.asyncquery.https
|
|
case "quic":
|
|
resolver_call = dns.asyncquery.quic
|
|
case "tls":
|
|
resolver_call = dns.asyncquery.tls
|
|
case "udp":
|
|
resolver_call = dns.asyncquery.udp
|
|
case "tcp":
|
|
resolver_call = dns.asyncquery.tcp
|
|
case _:
|
|
resolver_call = None
|
|
|
|
if resolver_call is None:
|
|
return await ctx.respond("Unrecognised scheme %r" % parsed_resolver.scheme)
|
|
where = real_resolver_url
|
|
else:
|
|
try:
|
|
ip = ipaddress.ip_address(dns_server)
|
|
if ip.is_private and not self.bot.is_owner(ctx.message.sender):
|
|
return await ctx.respond("You do not have permission to use internal IPs.")
|
|
except ValueError:
|
|
# try and resolve it as a hostname
|
|
try:
|
|
msg: dns.resolver.Answer = await dns.asyncresolver.resolve(
|
|
dns_server,
|
|
"A",
|
|
)
|
|
except dns.exception.DNSException as e:
|
|
return await ctx.respond("Failed to resolve %r to an IP to query: `%s`" % (dns_server, e))
|
|
ip = ipaddress.ip_address(msg.response.answer[0].name.to_text(True))
|
|
if ip.is_private and not self.bot.is_owner(ctx.message.sender):
|
|
return await ctx.respond("You do not have permission to use internal IPs.")
|
|
|
|
resolver_call = dns.asyncquery.udp_with_fallback
|
|
where = str(ip)
|
|
|
|
m = await ctx.respond("Running query...")
|
|
|
|
try:
|
|
start_ts = time.perf_counter()
|
|
response: dns.message.Message | tuple[dns.message.Message, bool] = await resolver_call(
|
|
q=query,
|
|
where=where,
|
|
timeout=60
|
|
)
|
|
end_ts = time.perf_counter()
|
|
if isinstance(response, dns.message.Message):
|
|
response = (response, False)
|
|
except dns.exception.DNSException as err:
|
|
return await m.edit(
|
|
"There was an error while querying the DNS server:\n```\n%s\n```" % err
|
|
)
|
|
else:
|
|
response, _ = response
|
|
lines = [
|
|
"Answer for `%s`, using DNS server %r (took %.2fms):" % (hostname, where, (end_ts - start_ts) / 1000),
|
|
""
|
|
]
|
|
for n, answer in enumerate(response.answer, start=1):
|
|
lines.append(f"{n}. `{answer.name.to_text(True)}` (`{answer.to_text()}`)")
|
|
return await m.edit(content="\n".join(lines))
|