diff --git a/.gitignore b/.gitignore
index cd7526e..4356796 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,4 @@ __pycache__
venv
domains.txt
geckodriver.log
+targets.json
diff --git a/.idea/dataSources/28efee07-d306-4126-bf69-01008b4887e2.xml b/.idea/dataSources/28efee07-d306-4126-bf69-01008b4887e2.xml
index 8d3aec6..de2c2c7 100644
--- a/.idea/dataSources/28efee07-d306-4126-bf69-01008b4887e2.xml
+++ b/.idea/dataSources/28efee07-d306-4126-bf69-01008b4887e2.xml
@@ -1321,254 +1321,298 @@
-
+
+
INTEGER|0s
1
1
-
+
CHAR(32)|0s
2
-
+
VARCHAR(2000)|0s
1
3
-
+
VARCHAR(4096)|0s
4
-
+
VARCHAR(4096)|0s
5
-
+
FLOAT|0s
1
6
-
+
FLOAT|0s
1
7
-
+
VARCHAR(7)|0s
1
8
-
+
JSON|0s
1
9
-
+
BOOLEAN|0s
1
10
-
+
BOOLEAN|0s
1
11
-
+
JSON|0s
1
12
-
+
created_by
entry_id
students
-
+
entry_id
1
-
+
CHAR(32)|0s
1
1
-
+
VARCHAR(7)|0s
1
2
-
+
BIGINT|0s
1
3
-
+
FLOAT|0s
1
4
-
+
entry_id
1
1
-
+
student_id
1
1
-
+
entry_id
1
sqlite_autoindex_banned_1
-
+
student_id
sqlite_autoindex_banned_2
-
+
INTEGER|0s
1
1
-
+
VARCHAR(64)|0s
1
2
-
+
BIGINT|0s
1
3
-
+
VARCHAR(7)|0s
1
4
-
+
VARCHAR(32)|0s
1
5
-
+
code
1
1
-
+
id
1
-
+
code
sqlite_autoindex_codes_1
-
+
TEXT|0s
1
-
+
TEXT|0s
2
-
+
TEXT|0s
3
-
+
INT|0s
4
-
+
TEXT|0s
5
-
+
CHAR(32)|0s
1
1
-
+
BIGINT|0s
1
2
-
+
BIGINT|0s
1
3
-
+
BIGINT|0s
4
-
+
entry_id
1
1
-
+
id
1
1
-
+
entry_id
1
sqlite_autoindex_starboard_1
-
+
id
sqlite_autoindex_starboard_2
-
+
CHAR(32)|0s
1
1
-
+
VARCHAR(7)|0s
1
2
-
+
BIGINT|0s
1
3
-
+
VARCHAR(32)|0s
1
4
-
+
entry_id
1
1
-
+
id
1
1
-
+
user_id
1
1
-
+
entry_id
1
sqlite_autoindex_students_1
-
+
id
sqlite_autoindex_students_2
-
+
user_id
sqlite_autoindex_students_3
+
+ CHAR(32)|0s
+ 1
+ 1
+
+
+ VARCHAR(128)|0s
+ 1
+ 2
+
+
+ VARCHAR(128)|0s
+ 1
+ 3
+
+
+ BOOLEAN|0s
+ 1
+ 4
+
+
+ FLOAT|0s
+ 1
+ 5
+
+
+ INTEGER|0s
+ 6
+
+
+ TEXT|0s
+ 7
+
+
+ entry_id
+ 1
+ 1
+
+
+ entry_id
+ 1
+ sqlite_autoindex_uptime_1
+
\ No newline at end of file
diff --git a/cogs/other.py b/cogs/other.py
index 7295398..8e44574 100644
--- a/cogs/other.py
+++ b/cogs/other.py
@@ -25,6 +25,7 @@ from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.firefox.service import Service as FirefoxService
+
# from selenium.webdriver.ie
from utils import console
@@ -56,12 +57,7 @@ class OtherCog(commands.Cog):
"/usr/bin/firefox-esr",
"/usr/bin/firefox",
],
- "chrome": [
- "/usr/bin/chromium",
- "/usr/bin/chrome",
- "/usr/bin/chrome-browser",
- "/usr/bin/google-chrome"
- ],
+ "chrome": ["/usr/bin/chromium", "/usr/bin/chrome", "/usr/bin/chrome-browser", "/usr/bin/google-chrome"],
}
selected_driver = driver
arr = drivers.pop(selected_driver)
@@ -128,11 +124,7 @@ class OtherCog(commands.Cog):
start_init = time()
driver, friendly_url = await asyncio.to_thread(_setup)
end_init = time()
- console.log(
- "Driver '{}' initialised in {} seconds.".format(
- driver_name, round(end_init - start_init, 2)
- )
- )
+ console.log("Driver '{}' initialised in {} seconds.".format(driver_name, round(end_init - start_init, 2)))
async def _edit(content: str):
self.bot.loop.create_task(ctx.interaction.edit_original_response(content=content))
@@ -441,7 +433,7 @@ class OtherCog(commands.Cog):
name="capture-full-page",
description="(firefox only) whether to capture the full page or just the viewport.",
default=False,
- )
+ ),
):
"""Takes a screenshot of a URL"""
window_width = max(min(1080 * 6, window_width), 1080 // 6)
@@ -536,13 +528,13 @@ class OtherCog(commands.Cog):
await asyncio.sleep(0.5)
await ctx.edit(
content="Here's your screenshot!\n"
- "Details:\n"
- f"\\* Browser: {driver}\n"
- f"\\* Resolution: {window_height}x{window_width} ({window_width*window_height:,} pixels)\n"
- f"\\* URL: <{friendly_url}>\n"
- f"\\* Load time: {fetch_time:.2f}ms\n"
- f"\\* Screenshot render time: {screenshot_time:.2f}ms\n",
- file=screenshot
+ "Details:\n"
+ f"\\* Browser: {driver}\n"
+ f"\\* Resolution: {window_height}x{window_width} ({window_width*window_height:,} pixels)\n"
+ f"\\* URL: <{friendly_url}>\n"
+ f"\\* Load time: {fetch_time:.2f}ms\n"
+ f"\\* Screenshot render time: {screenshot_time:.2f}ms\n",
+ file=screenshot,
)
domains = discord.SlashCommandGroup("domains", "Commands for managing domains")
diff --git a/cogs/uptime.py b/cogs/uptime.py
index c716137..fffe08c 100644
--- a/cogs/uptime.py
+++ b/cogs/uptime.py
@@ -1,33 +1,44 @@
import asyncio
import datetime
+import json
from datetime import timedelta
-from typing import Dict, Tuple
+from pathlib import Path
+from typing import Dict, Tuple, List, Optional
+from urllib.parse import urlparse, parse_qs, ParseResult
import discord
import httpx
from httpx import AsyncClient, Response
-from discord.ext import commands, tasks
+from discord.ext import commands, tasks, pages
from utils import UptimeEntry, console
-class UptimeCompetition(commands.Cog):
- USER_ID = 1063875884274163732
- OUR_ENDPOINT = "http://wg.nexy7574.cyou:9000/"
- SHRONK_SERVER = "http://shronk.nexy7574.cyou:9000/"
- OUR_DROPLET = "http://droplet.nexy7574.co.uk:9000/"
- TARGETS = {
- "SHRONK": USER_ID,
- "NEX_DROPLET": OUR_DROPLET,
- "PI": OUR_ENDPOINT,
- "SHRONK_DROPLET": SHRONK_SERVER
- }
- TARGETS_NAMES = {
- "SHRONK": "SHRoNK Bot",
- "NEX_DROPLET": "Nex's Droplet",
- "PI": "Nex's Pi",
- "SHRONK_DROPLET": "SHRoNK's Droplet"
+BASE_JSON = """[
+ {
+ "name": "SHRoNK Bot",
+ "id": "SHRONK",
+ "uri": "user://994710566612500550/1063875884274163732?online=1&idle=0&dnd=0"
+ },
+ {
+ "name": "Nex's Droplet",
+ "id": "NEX_DROPLET",
+ "uri": "http://droplet.nexy7574.co.uk:9000/"
+ },
+ {
+ "name": "Nex's Pi",
+ "id": "PI",
+ "uri": "http://wg.nexy7574.cyou:9000/"
+ },
+ {
+ "name": "SHRoNK Droplet",
+ "id": "SHRONK_DROPLET",
+ "uri": "http://shronkservz.tk:9000/"
}
+]
+"""
+
+class UptimeCompetition(commands.Cog):
class CancelTaskView(discord.ui.View):
def __init__(self, task: asyncio.Task, expires: datetime.datetime):
timeout = expires - discord.utils.utcnow()
@@ -39,10 +50,7 @@ class UptimeCompetition(commands.Cog):
self.stop()
if self.task:
self.task.cancel()
- await interaction.response.edit_message(
- content="Uptime test cancelled.",
- view=None
- )
+ await interaction.response.edit_message(content="Uptime test cancelled.", view=None)
def __init__(self, bot: commands.Bot):
self.bot = bot
@@ -52,6 +60,46 @@ class UptimeCompetition(commands.Cog):
self.last_result: list[UptimeEntry] = []
self.task_lock = asyncio.Lock()
self.task_event = asyncio.Event()
+ if not (pth := Path("targets.json")).exists():
+ pth.write_text(BASE_JSON)
+ self._cached_targets = self.read_targets()
+
+ @property
+ def cached_targets(self) -> List[Dict[str, str]]:
+ return self._cached_targets.copy()
+
+ def get_target(self, name: str = None, target_id: str = None) -> Optional[Dict[str, str]]:
+ for target in self.cached_targets:
+ if name and target["name"].lower() == name.lower():
+ return target
+ if target["id"] == target_id:
+ return target
+ return
+
+ def stats_autocomplete(self, ctx: discord.ApplicationContext) -> List[str]:
+ return [x["name"] for x in self.cached_targets if ctx.value.lower() in x["name"].lower()] + ["ALL"]
+
+ @staticmethod
+ def parse_user_uri(uri: str) -> Dict[str, str | None | List[str]]:
+ parsed = urlparse(uri)
+ response = {"guild": parsed.hostname, "user": None, "statuses": []}
+ if parsed.path:
+ response["user"] = parsed.path.strip("/")
+ if parsed.query:
+ response["statuses"] = [x for x, y in parse_qs(parsed.query).items() if y[0] == "1"]
+ return response
+
+ def read_targets(self) -> List[Dict[str, str]]:
+ with open("targets.json") as f:
+ data: list = json.load(f)
+ data.sort(key=lambda x: x["name"])
+ self._cached_targets = data.copy()
+ return data
+
+ def write_targets(self, data: List[Dict[str, str]]):
+ self._cached_targets = data
+ with open("targets.json", "w") as f:
+ json.dump(data, f, indent=4, default=str)
def cog_unload(self):
self.test_uptimes.cancel()
@@ -61,14 +109,20 @@ class UptimeCompetition(commands.Cog):
assert response.status_code == 200
assert response.text.strip() == "Hello Jimmy!"
- async def _test_url(self, url: str, max_retries: int = 10, timeout: int = 30) -> Tuple[int, Response | Exception]:
+ async def _test_url(
+ self, url: str, *, max_retries: int | None = 10, timeout: int | None = 30
+ ) -> Tuple[int, Response | Exception]:
attempts = 1
+ if max_retries is None:
+ max_retries = 1
+ if timeout is None:
+ timeout = 10
err = RuntimeError("Unknown Error")
while attempts < max_retries:
try:
response = await self.http.get(url, timeout=timeout)
response.raise_for_status()
- except (httpx.TimeoutException, httpx.HTTPStatusError) as err:
+ except (httpx.TimeoutException, httpx.HTTPStatusError, ConnectionError, TimeoutError) as err:
attempts += 1
continue
else:
@@ -76,7 +130,7 @@ class UptimeCompetition(commands.Cog):
return attempts, err
async def do_test_uptimes(self):
- console.log("Testing uptimes...")
+ targets = self.cached_targets
# First we need to check that we are online.
# If we aren't online, this isn't very fair.
try:
@@ -86,16 +140,17 @@ class UptimeCompetition(commands.Cog):
create_tasks = []
# We need to collect the tasks in case the sqlite server is being sluggish
-
- for key, url in self.TARGETS.items():
- if key == "SHRONK":
+ for target in targets.copy():
+ if not target["uri"].startswith("http"):
continue
- kwargs: Dict[str, str | int | None] = {
- "target_id": key,
- "target": url,
- "notes": ""
- }
- attempts, response = await self._test_url(url)
+ targets.remove(target)
+ request_kwargs = {}
+ if timeout := target.get("timeout"):
+ request_kwargs["timeout"] = timeout
+ if max_retries := target.get("max_retries"):
+ request_kwargs["max_retries"] = max_retries
+ kwargs: Dict[str, str | int | None] = {"target_id": target["id"], "target": target["uri"], "notes": ""}
+ attempts, response = await self._test_url(target["uri"], **request_kwargs)
if isinstance(response, Exception):
kwargs["is_up"] = False
kwargs["response_time"] = None
@@ -112,13 +167,7 @@ class UptimeCompetition(commands.Cog):
kwargs["is_up"] = True
kwargs["response_time"] = round(response.elapsed.total_seconds() * 1000)
kwargs["notes"] += "nothing notable."
- create_tasks.append(
- self.bot.loop.create_task(
- UptimeEntry.objects.create(
- **kwargs
- )
- )
- )
+ create_tasks.append(self.bot.loop.create_task(UptimeEntry.objects.create(**kwargs)))
# We need to check if the shronk bot is online since matthew
# Won't let us access their server (cough cough)
@@ -126,35 +175,46 @@ class UptimeCompetition(commands.Cog):
# If we don't have presences this is useless.
if not self.bot.is_ready():
await self.bot.wait_until_ready()
-
- guild: discord.Guild = self.bot.get_guild(994710566612500550)
- if guild is None:
- console.log(
- "[yellow]:warning: Unable to locate the LCC server! Can't uptime check shronk."
- )
- else:
- shronk_bot: discord.Member | None = discord.utils.get(guild.members, name="SHRoNK Bot")
- if not shronk_bot:
- # SHRoNK Bot is not in members cache.
- shronk_bots = await guild.query_members(query="SHRoNK Bot", limit=1)
- if not shronk_bots:
- console.log(
- "[yellow]:warning: Unable to locate SHRoNK Bot! Can't uptime check shronk."
- )
- else:
- shronk_bot = shronk_bots[0]
- if shronk_bot:
- create_tasks.append(
- self.bot.loop.create_task(
- UptimeEntry.objects.create(
- target_id="SHRONK",
- target="SHRoNK Bot",
- is_up=shronk_bot.status is not discord.Status.offline,
- response_time=None,
- notes="*Unable to monitor response time, not a HTTP request.*",
+ for target in targets:
+ parsed = urlparse(target["uri"])
+ if parsed.scheme != "user":
+ continue
+ guild_id = int(parsed.hostname)
+ user_id = int(parsed.path.strip("/"))
+ okay_statuses = [
+ discord.Status.online if "online=1" in parsed.query.lower() else None,
+ discord.Status.idle if "idle=1" in parsed.query.lower() else None,
+ discord.Status.dnd if "dnd=1" in parsed.query.lower() else None,
+ ]
+ if "offline=1" in parsed.query:
+ okay_statuses = [discord.Status.offline]
+ okay_statuses = list(filter(None, okay_statuses))
+ guild: discord.Guild = self.bot.get_guild(guild_id)
+ if guild is None:
+ console.log(
+ f"[yellow]:warning: Unable to locate the guild for {target['name']!r}! Can't uptime check."
+ )
+ else:
+ user: discord.Member | None = guild.get_member(user_id)
+ if not user:
+ # SHRoNK Bot is not in members cache.
+ try:
+ user = await guild.fetch_member(user_id)
+ except discord.HTTPException:
+ console.log(f"[yellow]:warning: Unable to locate {target['name']!r}! Can't uptime check.")
+ user = None
+ if user:
+ create_tasks.append(
+ self.bot.loop.create_task(
+ UptimeEntry.objects.create(
+ target_id=target["id"],
+ target=target["name"],
+ is_up=user.status in okay_statuses,
+ response_time=None,
+ notes="*Unable to monitor response time, not a HTTP request.*",
+ )
)
)
- )
else:
if self._warning_posted is False:
console.log(
@@ -167,86 +227,76 @@ class UptimeCompetition(commands.Cog):
return await asyncio.gather(*create_tasks, return_exceptions=True)
# All done!
- @tasks.loop(minutes=1)
+ # @tasks.loop(minutes=1)
+ @tasks.loop(seconds=10)
async def test_uptimes(self):
self.task_event.clear()
async with self.task_lock:
self.last_result = await self.do_test_uptimes()
self.task_event.set()
- uptime = discord.SlashCommandGroup(
- "uptime",
- "Commands for the uptime competition."
- )
+ uptime = discord.SlashCommandGroup("uptime", "Commands for the uptime competition.")
@uptime.command(name="stats")
async def stats(
- self,
- ctx: discord.ApplicationContext,
- query_target: discord.Option(
- str,
- name="target",
- description="The target to check the uptime of. Defaults to all.",
- required=False,
- choices=[
- discord.OptionChoice("SHRoNK Bot", "SHRONK"),
- discord.OptionChoice("Nex Droplet", "NEX_DROPLET"),
- discord.OptionChoice("Shronk Server", "SHRONK_DROPLET"),
- discord.OptionChoice("Nex Pi", "PI"),
- discord.OptionChoice("ALL", "ALL"),
- ],
- default="ALL"
- ),
- look_back: discord.Option(
- int,
- description="How many days to look back. Defaults to a year.",
- required=False,
- default=365
- )
+ self,
+ ctx: discord.ApplicationContext,
+ query_target: discord.Option(
+ str,
+ name="target",
+ description="The target to check the uptime of. Defaults to all.",
+ required=False,
+ autocomplete=stats_autocomplete,
+ default="ALL",
+ ),
+ look_back: discord.Option(
+ int, description="How many days to look back. Defaults to a year.", required=False, default=365
+ ),
):
"""View collected uptime stats."""
+ org_target = query_target
+
def generate_embed(target, specific_entries: list[UptimeEntry]):
+ targ = target
+ # targ = self.get_target(target_id=target)
embed = discord.Embed(
- title=f"Uptime stats for {self.TARGETS_NAMES[target]}",
+ title=f"Uptime stats for {targ['name']}",
description=f"Showing uptime stats for the last {look_back:,} days.",
- color=discord.Color.blurple()
- )
- first_check = datetime.datetime.fromtimestamp(
- specific_entries[-1].timestamp,
- datetime.timezone.utc
+ color=discord.Color.blurple(),
)
+ first_check = datetime.datetime.fromtimestamp(specific_entries[-1].timestamp, datetime.timezone.utc)
last_offline = last_online = None
online_count = offline_count = 0
for entry in reversed(specific_entries):
if entry.is_up is False:
- last_offline = datetime.datetime.fromtimestamp(
- entry.timestamp,
- datetime.timezone.utc
- )
+ last_offline = datetime.datetime.fromtimestamp(entry.timestamp, datetime.timezone.utc)
offline_count += 1
else:
- last_online = datetime.datetime.fromtimestamp(
- entry.timestamp,
- datetime.timezone.utc
- )
+ last_online = datetime.datetime.fromtimestamp(entry.timestamp, datetime.timezone.utc)
online_count += 1
total_count = online_count + offline_count
online_avg = (online_count / total_count) * 100
average_response_time = (
- sum(entry.response_time for entry in entries if entry.response_time is not None) / total_count
- )
- embed.add_field(
- name="\u200b",
- value=f"*Started monitoring {discord.utils.format_dt(first_check, style='R')}, "
- f"{total_count:,} monitoring events collected*\n"
- f"**Online:**\n\t\\* {online_avg:.2f}% of the time\n\t\\* Last online: "
- f"{discord.utils.format_dt(last_online, 'R') if last_online else 'Never'}\n"
- f"\n"
- f"**Offline:**\n\t\\* {100 - online_avg:.2f}% of the time\n\t\\* Last offline: "
- f"{discord.utils.format_dt(last_offline, 'R') if last_offline else 'Never'}\n"
- f"\n"
- f"**Average Response Time:**\n\t\\* {average_response_time:.2f}ms",
+ sum(entry.response_time for entry in entries if entry.response_time is not None) / total_count
)
+ if org_target != "ALL":
+ embed.add_field(
+ name="\u200b",
+ value=f"*Started monitoring {discord.utils.format_dt(first_check, style='R')}, "
+ f"{total_count:,} monitoring events collected*\n"
+ f"**Online:**\n\t\\* {online_avg:.2f}% of the time\n\t\\* Last online: "
+ f"{discord.utils.format_dt(last_online, 'R') if last_online else 'Never'}\n"
+ f"\n"
+ f"**Offline:**\n\t\\* {100 - online_avg:.2f}% of the time\n\t\\* Last offline: "
+ f"{discord.utils.format_dt(last_offline, 'R') if last_offline else 'Never'}\n"
+ f"\n"
+ f"**Average Response Time:**\n\t\\* {average_response_time:.2f}ms",
+ )
+ else:
+ embed.title = None
+ embed.description = f"{targ['name']}: {online_avg:.2f}% uptime, last offline: "
+ if last_offline:
+ embed.description += f"{discord.utils.format_dt(last_offline, 'R')}"
return embed
await ctx.defer()
@@ -254,27 +304,33 @@ class UptimeCompetition(commands.Cog):
look_back_timestamp = (now - timedelta(days=look_back)).timestamp()
targets = [query_target]
if query_target == "ALL":
- targets = ["SHRONK", "NEX_DROPLET", "SHRONK_DROPLET", "PI"]
+ targets = [t["id"] for t in self.cached_targets]
embeds = []
for _target in targets:
- query = UptimeEntry.objects.filter(UptimeEntry.columns.timestamp >= look_back_timestamp).filter(target_id=_target)
+ _target = self.get_target(_target, _target)
+ query = UptimeEntry.objects.filter(UptimeEntry.columns.timestamp >= look_back_timestamp).filter(
+ target_id=_target["id"]
+ )
query = query.order_by("-timestamp")
entries = await query.all()
if not entries:
- embeds.append(
- discord.Embed(
- description=f"No uptime entries found for {_target}."
- )
- )
+ embeds.append(discord.Embed(description=f"No uptime entries found for {_target}."))
else:
embeds.append(generate_embed(_target, entries))
+
+ if org_target == "ALL":
+ new_embed = discord.Embed(
+ title="Uptime stats for all monitored targets:",
+ description=f"Showing uptime stats for the last {look_back:,} days.\n\n",
+ color=discord.Color.blurple(),
+ )
+ for embed_ in embeds:
+ new_embed.description += f"{embed_.description}\n"
+ embeds = [new_embed]
await ctx.respond(embeds=embeds)
@uptime.command(name="view-next-run")
- async def view_next_run(
- self,
- ctx: discord.ApplicationContext
- ):
+ async def view_next_run(self, ctx: discord.ApplicationContext):
"""View when the next uptime test will run."""
await ctx.defer()
next_run = self.test_uptimes.next_iteration
@@ -284,8 +340,7 @@ class UptimeCompetition(commands.Cog):
_wait = self.bot.loop.create_task(discord.utils.sleep_until(next_run))
view = self.CancelTaskView(_wait, next_run)
await ctx.respond(
- f"The next uptime test will run in {discord.utils.format_dt(next_run, 'R')}. Waiting...",
- view=view
+ f"The next uptime test will run in {discord.utils.format_dt(next_run, 'R')}. Waiting...", view=view
)
await _wait
if not self.task_event.is_set():
@@ -297,19 +352,139 @@ class UptimeCompetition(commands.Cog):
embed = discord.Embed(
title="Error",
description=f"An error occurred while running an uptime test: {result}",
- color=discord.Color.red()
+ color=discord.Color.red(),
)
else:
result = await UptimeEntry.objects.get(entry_id=result.entry_id)
+ target = self.get_target(target_id=result.target_id) or {"name": result.target_id}
embed = discord.Embed(
- title="Uptime for: " + self.TARGETS_NAMES[result.target_id],
+ title="Uptime for: " + target["name"],
description="Is up: {0.is_up!s}\n"
- "Response time: {1:,.2f}ms\n"
- "Notes: {0.notes!s}".format(result, result.response_time or -1),
- color=discord.Color.green()
+ "Response time: {1:,.2f}ms\n"
+ "Notes: {0.notes!s}".format(result, result.response_time or -1),
+ color=discord.Color.green(),
)
embeds.append(embed)
- await ctx.edit(content="Uptime test complete! Results are in.", embeds=embeds, view=None)
+ if len(embeds) >= 3:
+ paginator = pages.Paginator(embeds, loop_pages=True)
+ await ctx.delete(delay=0.1)
+ await paginator.respond(ctx.interaction)
+ else:
+ await ctx.edit(content="Uptime test complete! Results are in:", embeds=embeds, view=None)
+
+ monitors = uptime.create_subgroup(name="monitors", description="Manage uptime monitors.")
+
+ @monitors.command(name="add")
+ @commands.is_owner()
+ async def add_monitor(
+ self,
+ ctx: discord.ApplicationContext,
+ name: discord.Option(str, description="The name of the monitor."),
+ uri: discord.Option(
+ str,
+ description="The URI to monitor. Enter HELP for more info.",
+ ),
+ http_max_retries: discord.Option(
+ int,
+ description="The maximum number of HTTP retries to make if the request fails.",
+ required=False,
+ default=None,
+ ),
+ http_timeout: discord.Option(
+ int, description="The timeout for the HTTP request.", required=False, default=None
+ ),
+ ):
+ """Creates a monitor to... monitor"""
+ await ctx.defer()
+ name: str
+ uri: str
+ http_max_retries: Optional[int]
+ http_timeout: Optional[int]
+
+ uri: ParseResult = urlparse(uri.lower())
+ if uri.scheme == "" and uri.path.lower() == "help":
+ return await ctx.respond(
+ "URI can be either `HTTP(S)`, or the custom `USER` scheme.\n"
+ "Examples:\n"
+ "`http://example.com` - HTTP GET request to example.com\n"
+ "`https://example.com` - HTTPS GET request to example.com\n\n"
+ f"`user://{ctx.guild.id}/{ctx.user.id}?dnd=1` - Checks if user `{ctx.user.id}` (you) is"
+ f" in `dnd` (the red status) in the server `{ctx.guild.id}` (this server)."
+ f"\nQuery options are: `dnd`, `idle`, `online`, `offline`. `1` means the status is classed as "
+ f"'online' - anything else means offline.\n"
+ f"The format is `user://{{server_id}}/{{user_id}}?{{status}}={{1|0}}`\n"
+ f"Setting `server_id` to `$GUILD` will auto-fill in the current server ID."
+ )
+
+ options = {
+ "name": name,
+ "id": name.upper().strip().replace(" ", "_"),
+ }
+
+ if uri.scheme == "user":
+ uri: ParseResult = uri._replace(netloc=uri.hostname.replace("$guild", str(ctx.guild.id)))
+ data = self.parse_user_uri(uri.geturl())
+ if data["guild"] is None or data["guild"].isdigit() is False:
+ return await ctx.respond("Invalid guild ID in URI.")
+ if data["user"] is None or data["user"].isdigit() is False:
+ return await ctx.respond("Invalid user ID in URI.")
+ if not data["statuses"] or any(
+ x.lower() not in ("dnd", "idle", "online", "offline") for x in data["statuses"]
+ ):
+ return await ctx.respond("Invalid status query string in URI. Did you forget to supply one?")
+
+ guild = self.bot.get_guild(int(data["guild"]))
+ if guild is None:
+ return await ctx.respond("Invalid guild ID in URI (not found).")
+
+ try:
+ guild.get_member(int(data["user"])) or await guild.fetch_member(int(data["user"]))
+ except discord.HTTPException:
+ return await ctx.respond("Invalid user ID in URI (not found).")
+
+ options["uri"] = uri.geturl()
+
+ elif uri.scheme in ["http", "https"]:
+ attempts, response = await self._test_url(uri.geturl(), max_retries=http_max_retries, timeout=http_timeout)
+ if response is None:
+ return await ctx.respond(
+ f"Failed to connect to {uri.geturl()!r} after {attempts} attempts. Please ensure the target page"
+ f" is up, and try again."
+ )
+
+ options["uri"] = uri.geturl()
+ options["http_max_retries"] = http_max_retries
+ options["http_timeout"] = http_timeout
+ else:
+ return await ctx.respond("Invalid URI scheme. Supported: HTTP[S], USER.")
+
+ targets = self.cached_targets
+ for target in targets:
+ if target["uri"] == options["uri"] or target["name"] == options["name"]:
+ return await ctx.respond("This monitor already exists.")
+
+ targets.append(options)
+ self.write_targets(targets)
+ await ctx.respond("Monitor added!")
+
+ @monitors.command(name="remove")
+ @commands.is_owner()
+ async def remove_monitor(
+ self,
+ ctx: discord.ApplicationContext,
+ name: discord.Option(str, description="The name of the monitor."),
+ ):
+ """Removes a monitor."""
+ await ctx.defer()
+ name: str
+
+ targets = self.cached_targets
+ for target in targets:
+ if target["name"] == name or target["id"] == name:
+ targets.remove(target)
+ self.write_targets(targets)
+ return await ctx.respond("Monitor removed.")
+ await ctx.respond("Monitor not found.")
def setup(bot):
diff --git a/main.py b/main.py
index a871a84..6a8d5d8 100644
--- a/main.py
+++ b/main.py
@@ -29,7 +29,7 @@ extensions = [
"cogs.timetable",
"cogs.other",
"cogs.starboard",
- "cogs.uptime"
+ "cogs.uptime",
]
for ext in extensions:
try:
diff --git a/utils/db.py b/utils/db.py
index bb4a42a..6786934 100644
--- a/utils/db.py
+++ b/utils/db.py
@@ -21,7 +21,16 @@ class Tutors(IntEnum):
os.chdir(Path(__file__).parent.parent)
-__all__ = ["registry", "get_or_none", "VerifyCode", "Student", "BannedStudentID", "Assignments", "Tutors", "UptimeEntry"]
+__all__ = [
+ "registry",
+ "get_or_none",
+ "VerifyCode",
+ "Student",
+ "BannedStudentID",
+ "Assignments",
+ "Tutors",
+ "UptimeEntry",
+]
T = TypeVar("T")
T_co = TypeVar("T_co", covariant=True)