import ipaddress
import time
from functools import partial
from urllib.parse import urlparse

import dns.asyncquery
import dns.asyncresolver
import dns.exception
import dns.message
import dns.nameserver
import dns.resolver
import httpx
import niobot

# URL schemes accepted for an explicit DNS server. Each scheme is also the
# name of the `dns.asyncquery` coroutine that speaks that transport.
_URL_SCHEMES = frozenset({"https", "quic", "tls", "udp", "tcp"})

_NO_PERMISSION = "You do not have permission to use internal IPs."


async def _host_to_ip(host: str) -> ipaddress.IPv4Address | ipaddress.IPv6Address:
    """Return *host* as an IP address object, resolving it via DNS if needed.

    A literal IPv4/IPv6 address is parsed directly. Anything else is looked
    up as an `A` record with the default resolver and the first address in
    the answer is used.

    Raises:
        dns.exception.DNSException: if the hostname cannot be resolved.
    """
    try:
        return ipaddress.ip_address(host)
    except ValueError:
        pass  # not an IP literal; fall through and resolve it as a hostname
    answer: dns.resolver.Answer = await dns.asyncresolver.resolve(host, "A")
    # Use the record's address, not the RRset's owner name (which is just
    # the hostname that was asked for).
    return ipaddress.ip_address(answer[0].address)


class DNSCommand(niobot.Module):
    """Bot module exposing a DNS lookup command."""

    def _may_query(self, ctx: niobot.Context, ip) -> bool:
        """Return True if the sender of *ctx* may send queries to *ip*.

        Private addresses are reserved for the bot owner; every other
        address is open to everyone.
        """
        return not ip.is_private or self.bot.is_owner(ctx.message.sender)

    # The docstring below is left exactly as it was, since the framework may
    # surface it as the command's help text. Details:
    #   hostname   - the name to look up.
    #   query_type - "A" or "AAAA" (case-insensitive).
    #   dns_server - "internal" (first system nameserver), a URL using one of
    #                the schemes in _URL_SCHEMES, or a bare IP / hostname
    #                (queried over UDP with TCP fallback).
    @niobot.command(
        name="drill",
        aliases=[
            "dig",
            "nslookup",
            "resolve"
        ]
    )
    async def dns_lookup(
            self,
            ctx: niobot.Context,
            hostname: str,
            query_type: str = "A",
            dns_server: str = "internal"
    ):
        """Performs a DNS lookup and returns the result for you"""
        query_type = query_type.upper()
        if query_type not in ("A", "AAAA"):
            return await ctx.respond("Sorry, currently only `A` and `AAAA` queries are supported.")

        try:
            query = dns.message.make_query(hostname, query_type)
        except dns.exception.DNSException as e:
            # Malformed names (empty labels, over-long labels, ...) are
            # rejected while building the query.
            return await ctx.respond("Invalid hostname %r: `%s`" % (hostname, e))

        # Extra keyword arguments for the transport call (currently only `port`).
        call_kwargs = {}

        if dns_server == "internal":
            resolver_call = dns.asyncquery.udp_with_fallback
            where = dns.asyncresolver.get_default_resolver().nameservers[0]
        elif "://" in dns_server:
            # Checking for "://" rather than a scheme prefix keeps plain
            # hostnames such as "tcp.example.com" out of this branch.
            try:
                parsed_resolver = urlparse(dns_server)
                server_host = parsed_resolver.hostname
                server_port = parsed_resolver.port
            except ValueError as e:
                # Raised for malformed IPv6 literals and out-of-range ports.
                return await ctx.respond("Invalid URL (%s)" % e)

            if not parsed_resolver.scheme:
                return await ctx.respond("Invalid URL (no scheme detected)")
            if not server_host:
                return await ctx.respond("Invalid URL (no hostname)")
            if parsed_resolver.scheme not in _URL_SCHEMES:
                return await ctx.respond("Unrecognised scheme %r" % parsed_resolver.scheme)

            # Looked up lazily so that a transport missing from the installed
            # dnspython only matters when someone actually asks for it.
            resolver_call = getattr(dns.asyncquery, parsed_resolver.scheme)

            if parsed_resolver.scheme == "https":
                # DNS-over-HTTPS takes the full URL as its destination.
                # NOTE(review): only literal-IP hosts are permission-checked
                # here; a hostname that resolves to a private address is not
                # caught - confirm whether that needs tightening.
                try:
                    ip = ipaddress.ip_address(server_host)
                except ValueError:
                    ip = None  # likely not an IP address
                if ip is not None and not self._may_query(ctx, ip):
                    return await ctx.respond(_NO_PERMISSION)
                where = parsed_resolver.geturl()
            else:
                # The udp/tcp/tls/quic transports need a bare IP address as
                # the destination, not a URL.
                try:
                    ip = await _host_to_ip(server_host)
                except dns.exception.DNSException as e:
                    return await ctx.respond(
                        "Failed to resolve %r to an IP to query: `%s`" % (server_host, e)
                    )
                if not self._may_query(ctx, ip):
                    return await ctx.respond(_NO_PERMISSION)
                where = str(ip)
                if server_port is not None:
                    call_kwargs["port"] = server_port
        else:
            # A bare IP address or hostname.
            try:
                ip = await _host_to_ip(dns_server)
            except dns.exception.DNSException as e:
                return await ctx.respond("Failed to resolve %r to an IP to query: `%s`" % (dns_server, e))
            if not self._may_query(ctx, ip):
                return await ctx.respond(_NO_PERMISSION)
            resolver_call = dns.asyncquery.udp_with_fallback
            where = str(ip)

        m = await ctx.respond("Running query...")
        start_ts = time.perf_counter()
        try:
            result: dns.message.Message | tuple[dns.message.Message, bool] = await resolver_call(
                q=query,
                where=where,
                timeout=60,
                **call_kwargs
            )
        except (dns.exception.DNSException, httpx.HTTPError, OSError) as err:
            # Transport-level failures (refused connection, unreachable host,
            # HTTP transport errors) are reported the same way as DNS errors.
            return await m.edit(
                "There was an error while querying the DNS server:\n```\n%s\n```" % err
            )
        # perf_counter() measures seconds; the message reports milliseconds.
        elapsed_ms = (time.perf_counter() - start_ts) * 1000

        # udp_with_fallback returns (message, used_tcp); the others return
        # the message on its own.
        response = result[0] if isinstance(result, tuple) else result

        lines = [
            "Answer for `%s`, using DNS server %r (took %.2fms):" % (hostname, where, elapsed_ms),
            ""
        ]
        for n, answer in enumerate(response.answer, start=1):
            lines.append(f"{n}. `{answer.name.to_text(True)}` (`{answer.to_text()}`)")
        return await m.edit(content="\n".join(lines))