import ipaddress
import time
from email.policy import default
from functools import partial
from urllib.parse import urlparse

import dns.asyncquery
import dns.asyncresolver
import dns.exception
import dns.message
import dns.nameserver
import dns.resolver
import httpx
import niobot
from dns.quic._common import UnexpectedEOF

# Record types the `drill` command is willing to query for.
_SUPPORTED_QUERY_TYPES = ("A", "AAAA", "MX", "CNAME", "TXT", "PTR")

# URL scheme -> (dnspython async query coroutine, default port for that transport).
_TRANSPORTS = {
    "https": (dns.asyncquery.https, 443),
    "quic": (dns.asyncquery.quic, 443),
    "tls": (dns.asyncquery.tls, 853),
    "udp": (dns.asyncquery.udp, 53),
    "tcp": (dns.asyncquery.tcp, 53),
}


class DNSCommand(niobot.Module):
    # Bot module providing the `drill` command (aliases: dig, nslookup, resolve).

    def _private_ip_denied(self, ctx: niobot.Context, ip) -> bool:
        # True when `ip` is a private address and the sender is not the bot owner.
        return ip.is_private and not self.bot.is_owner(ctx.message.sender)

    @niobot.command(name="drill", aliases=["dig", "nslookup", "resolve"])
    async def dns_lookup(
        self,
        ctx: niobot.Context,
        hostname: str,
        query_type: str = "A",
        dns_server: str = "internal",
    ):
        """Performs a DNS lookup and returns the result for you"""
        # Parameters:
        #   hostname:   the name to look up.
        #   query_type: record type (case-insensitive); must be one of
        #               _SUPPORTED_QUERY_TYPES.
        #   dns_server: "internal" (first nameserver of the default resolver),
        #               a `<scheme>://host[:port]` URL whose scheme is a key of
        #               _TRANSPORTS, or a bare IP address / hostname (queried
        #               over UDP with TCP fallback on port 53).
        # Every outcome, including errors, is reported back through
        # ctx.respond() or by editing the "Running query..." message.
        port = 53
        query_type = query_type.upper()
        if query_type not in _SUPPORTED_QUERY_TYPES:
            return await ctx.respond(
                "Sorry, currently only %s queries are supported."
                % ", ".join(f"`{x}`" for x in _SUPPORTED_QUERY_TYPES)
            )

        try:
            query = dns.message.make_query(hostname, query_type)
        except dns.exception.DNSException as e:
            # Malformed names (empty labels, over-long labels, ...) are user
            # errors, so report them instead of letting the command crash.
            return await ctx.respond("Invalid query name %r: `%s`" % (hostname, e))

        if dns_server == "internal":
            resolver = dns.asyncresolver.get_default_resolver()
            resolver_call = dns.asyncquery.udp_with_fallback
            where = resolver.nameservers[0]
        elif "://" in dns_server:
            # Only treat the argument as a URL when it really looks like one;
            # a bare hostname such as "tcp.example.com" belongs in the branch
            # below.
            try:
                parsed_resolver = urlparse(dns_server)
                explicit_port = parsed_resolver.port
            except ValueError as e:
                # urlparse() / .port raise ValueError on malformed authority
                # sections (bad IPv6 brackets, out-of-range port, ...).
                return await ctx.respond("Invalid URL (%s)" % e)
            if not parsed_resolver.scheme:
                return await ctx.respond("Invalid URL (no scheme detected)")
            if not parsed_resolver.hostname:
                return await ctx.respond("Invalid URL (no hostname)")

            try:
                ip = ipaddress.ip_address(parsed_resolver.hostname)
            except ValueError:
                # Likely not an IP address.
                # NOTE(review): a hostname is passed on without being resolved
                # or checked against the private-IP rule; for https, dnspython
                # documents `where` as an IP address or a full URL, so a bare
                # hostname probably fails there - confirm the intended
                # behaviour before changing it.
                ip = None
            if ip is not None and self._private_ip_denied(ctx, ip):
                return await ctx.respond(
                    "You do not have permission to use internal IPs."
                )

            transport = _TRANSPORTS.get(parsed_resolver.scheme)
            if transport is None:
                return await ctx.respond(
                    "Unrecognised scheme %r" % parsed_resolver.scheme
                )
            resolver_call, default_port = transport
            where = parsed_resolver.hostname
            port = explicit_port or default_port
        else:
            try:
                ip = ipaddress.ip_address(dns_server)
            except ValueError:
                # Not an IP literal: try to resolve it as a hostname.
                try:
                    answer: dns.resolver.Answer = await dns.asyncresolver.resolve(
                        dns_server,
                        "A",
                    )
                except dns.exception.DNSException as e:
                    return await ctx.respond(
                        "Failed to resolve %r to an IP to query: `%s`"
                        % (dns_server, e)
                    )
                # Use the address carried by the first A record; the record's
                # owner name is just the hostname again and is not an IP.
                ip = ipaddress.ip_address(answer[0].address)
            if self._private_ip_denied(ctx, ip):
                return await ctx.respond(
                    "You do not have permission to use internal IPs."
                )
            resolver_call = dns.asyncquery.udp_with_fallback
            where = str(ip)

        m = await ctx.respond("Running query...")
        try:
            start_ts = time.perf_counter()
            response: (
                dns.message.Message | tuple[dns.message.Message, bool]
            ) = await resolver_call(q=query, where=where, port=port, timeout=60)
            end_ts = time.perf_counter()
        except (
            dns.exception.DNSException,
            UnexpectedEOF,
            httpx.HTTPError,
            ValueError,
            OSError,
        ) as err:
            return await m.edit(
                "There was an error while querying the DNS server:\n```\n%s\n```"
                % err
            )

        # udp_with_fallback returns a (message, used_tcp) tuple; the other
        # transports return the message directly.
        if not isinstance(response, dns.message.Message):
            response, _ = response

        lines = [
            # perf_counter() measures seconds; scale up to milliseconds.
            "Answer for `%s`, using DNS server %r (took %.2fms):"
            % (hostname, where, (end_ts - start_ts) * 1000),
            "",
        ]
        for rrset in response.answer:
            lines.extend(f"* `{rdata.to_text()}`" for rdata in rrset)
        return await m.edit(content="\n".join(lines))