Add support for more DNS types

This commit is contained in:
Nexus 2024-09-18 18:06:31 +01:00
parent 07bb4eb010
commit 820594b089
4 changed files with 46 additions and 38 deletions

View file

@ -46,13 +46,14 @@ def cli(log_level: str, log_file: str | None):
@click.option( @click.option(
"--resume/--no-resume", "--resume/--no-resume",
default=True, default=True,
help="Whether to do a full sync, or resume from previous" help="Whether to do a full sync, or resume from previous",
) )
def run(resume: bool): def run(resume: bool):
"""Runs the bot""" """Runs the bot"""
log.info("Starting bot.") log.info("Starting bot.")
log.debug("Importing instance...") log.debug("Importing instance...")
from .main import bot from .main import bot
log.debug("Import complete.") log.debug("Import complete.")
if resume is False: if resume is False:
@ -70,7 +71,7 @@ def run(resume: bool):
backoff_factor=original_config.backoff_factor, backoff_factor=original_config.backoff_factor,
max_timeout_retry_wait_time=original_config.max_timeout_retry_wait_time, max_timeout_retry_wait_time=original_config.max_timeout_retry_wait_time,
request_timeout=original_config.request_timeout, request_timeout=original_config.request_timeout,
io_chunk_size=original_config.io_chunk_size io_chunk_size=original_config.io_chunk_size,
) )
bot.config = new_config bot.config = new_config
bot.load_store() bot.load_store()

View file

@ -74,7 +74,7 @@ bot = NonsenseBot(
store_path=str(store.resolve()), store_path=str(store.resolve()),
command_prefix=config["bot"].get("prefix", "h!"), command_prefix=config["bot"].get("prefix", "h!"),
owner_id=config["bot"].get("owner_id") or "@nex:nexy7574.co.uk", owner_id=config["bot"].get("owner_id") or "@nex:nexy7574.co.uk",
auto_read_messages=False auto_read_messages=False,
) )
bot.cfg = config bot.cfg = config

View file

@ -13,31 +13,24 @@ from urllib.parse import urlparse
class DNSCommand(niobot.Module): class DNSCommand(niobot.Module):
@niobot.command(name="drill", aliases=["dig", "nslookup", "resolve"])
@niobot.command(
name="drill",
aliases=[
"dig",
"nslookup",
"resolve"
]
)
async def dns_lookup( async def dns_lookup(
self, self,
ctx: niobot.Context, ctx: niobot.Context,
hostname: str, hostname: str,
query_type: str = "A", query_type: str = "A",
dns_server: str = "internal" dns_server: str = "internal",
): ):
"""Performs a DNS lookup and returns the result for you""" """Performs a DNS lookup and returns the result for you"""
query_type = query_type.upper() query_type = query_type.upper()
if query_type not in ("A", "AAAA"): supported = ("A", "AAAA", "MX", "CNAME", "TXT", "PTR")
return await ctx.respond("Sorry, currently only `A` and `AAAA` queries are supported.") if query_type not in supported:
return await ctx.respond(
"Sorry, currently only %s queries are supported."
% (", ".join((f"`{x}`" for x in supported)))
)
query = dns.message.make_query( query = dns.message.make_query(hostname, query_type)
hostname,
query_type
)
resolver = dns.asyncresolver.get_default_resolver() resolver = dns.asyncresolver.get_default_resolver()
if dns_server == "internal": if dns_server == "internal":
@ -54,7 +47,9 @@ class DNSCommand(niobot.Module):
try: try:
ip = ipaddress.ip_address(parsed_resolver.hostname) ip = ipaddress.ip_address(parsed_resolver.hostname)
if ip.is_private and not self.bot.is_owner(ctx.message.sender): if ip.is_private and not self.bot.is_owner(ctx.message.sender):
return await ctx.respond("You do not have permission to use internal IPs.") return await ctx.respond(
"You do not have permission to use internal IPs."
)
except ValueError: except ValueError:
pass # likely not an IP address pass # likely not an IP address
@ -73,13 +68,17 @@ class DNSCommand(niobot.Module):
resolver_call = None resolver_call = None
if resolver_call is None: if resolver_call is None:
return await ctx.respond("Unrecognised scheme %r" % parsed_resolver.scheme) return await ctx.respond(
"Unrecognised scheme %r" % parsed_resolver.scheme
)
where = real_resolver_url where = real_resolver_url
else: else:
try: try:
ip = ipaddress.ip_address(dns_server) ip = ipaddress.ip_address(dns_server)
if ip.is_private and not self.bot.is_owner(ctx.message.sender): if ip.is_private and not self.bot.is_owner(ctx.message.sender):
return await ctx.respond("You do not have permission to use internal IPs.") return await ctx.respond(
"You do not have permission to use internal IPs."
)
except ValueError: except ValueError:
# try and resolve it as a hostname # try and resolve it as a hostname
try: try:
@ -88,10 +87,14 @@ class DNSCommand(niobot.Module):
"A", "A",
) )
except dns.exception.DNSException as e: except dns.exception.DNSException as e:
return await ctx.respond("Failed to resolve %r to an IP to query: `%s`" % (dns_server, e)) return await ctx.respond(
"Failed to resolve %r to an IP to query: `%s`" % (dns_server, e)
)
ip = ipaddress.ip_address(msg.response.answer[0].name.to_text(True)) ip = ipaddress.ip_address(msg.response.answer[0].name.to_text(True))
if ip.is_private and not self.bot.is_owner(ctx.message.sender): if ip.is_private and not self.bot.is_owner(ctx.message.sender):
return await ctx.respond("You do not have permission to use internal IPs.") return await ctx.respond(
"You do not have permission to use internal IPs."
)
resolver_call = dns.asyncquery.udp_with_fallback resolver_call = dns.asyncquery.udp_with_fallback
where = str(ip) where = str(ip)
@ -100,11 +103,9 @@ class DNSCommand(niobot.Module):
try: try:
start_ts = time.perf_counter() start_ts = time.perf_counter()
response: dns.message.Message | tuple[dns.message.Message, bool] = await resolver_call( response: (
q=query, dns.message.Message | tuple[dns.message.Message, bool]
where=where, ) = await resolver_call(q=query, where=where, timeout=60)
timeout=60
)
end_ts = time.perf_counter() end_ts = time.perf_counter()
if isinstance(response, dns.message.Message): if isinstance(response, dns.message.Message):
response = (response, False) response = (response, False)
@ -115,9 +116,11 @@ class DNSCommand(niobot.Module):
else: else:
response, _ = response response, _ = response
lines = [ lines = [
"Answer for `%s`, using DNS server %r (took %.2fms):" % (hostname, where, (end_ts - start_ts) / 1000), "Answer for `%s`, using DNS server %r (took %.2fms):"
"" % (hostname, where, (end_ts - start_ts) / 1000),
"",
] ]
for n, answer in enumerate(response.answer, start=1): for n, answer in enumerate(response.answer, start=1):
lines.append(f"{n}. `{answer.name.to_text(True)}` (`{answer.to_text()}`)") for item in answer.items.keys():
lines.append(f"* `{item.to_text()}`")
return await m.edit(content="\n".join(lines)) return await m.edit(content="\n".join(lines))

View file

@ -92,7 +92,9 @@ class MSCGetter(niobot.Module):
async def auto_msc_enable(self, ctx: niobot.Context): async def auto_msc_enable(self, ctx: niobot.Context):
"""Automatically enables MSC linking. Requires a power level of at least 50.""" """Automatically enables MSC linking. Requires a power level of at least 50."""
if (sp := ctx.room.power_levels.users.get(ctx.message.sender, -999)) < 50: if (sp := ctx.room.power_levels.users.get(ctx.message.sender, -999)) < 50:
return await ctx.respond(f"You need to have at least a power level of 50 to use this (you have {sp}).") return await ctx.respond(
f"You need to have at least a power level of 50 to use this (you have {sp})."
)
key = self.bot.redis_key(ctx.room.room_id, "auto_msc.enabled") key = self.bot.redis_key(ctx.room.room_id, "auto_msc.enabled")
exists = await self.bot.redis.get(key) exists = await self.bot.redis.get(key)
if exists: if exists:
@ -107,7 +109,9 @@ class MSCGetter(niobot.Module):
async def auto_msc_disable(self, ctx: niobot.Context): async def auto_msc_disable(self, ctx: niobot.Context):
"""Disables automatic MSC linking. Requires a power level of at least 50.""" """Disables automatic MSC linking. Requires a power level of at least 50."""
if (sp := ctx.room.power_levels.users.get(ctx.message.sender, -999)) < 50: if (sp := ctx.room.power_levels.users.get(ctx.message.sender, -999)) < 50:
return await ctx.respond(f"You need to have at least a power level of 50 to use this (you have {sp}).") return await ctx.respond(
f"You need to have at least a power level of 50 to use this (you have {sp})."
)
key = self.bot.redis_key(ctx.room.room_id, "auto_msc.enabled") key = self.bot.redis_key(ctx.room.room_id, "auto_msc.enabled")
exists = await self.bot.redis.get(key) exists = await self.bot.redis.get(key)
if exists: if exists: