add info command

This commit is contained in:
nexy7574 2023-05-16 15:12:59 +01:00
parent cdbe8c15ba
commit 67ae8ba7b5
3 changed files with 116 additions and 179 deletions

View file

@ -1,195 +1,115 @@
import asyncio
import datetime
import os
import sys
import time
from typing import List
import discord
import humanize
import psutil
from functools import partial
import httpx
from discord.ext import commands
from pathlib import Path
from utils import get_or_none
from utils.db import AccessTokens
try:
from config import OAUTH_ID, OAUTH_REDIRECT_URI
except ImportError:
OAUTH_REDIRECT_URI = OAUTH_ID = None
class InfoCog(commands.Cog):
EMOJIS = {
"CPU": "\N{brain}",
"RAM": "\N{ram}",
"SWAP": "\N{swan}",
"DISK": "\N{minidisc}",
"NETWORK": "\N{satellite antenna}",
"SENSORS": "\N{thermometer}",
"UPTIME": "\N{alarm clock}",
"ON": "\N{large green circle}",
"OFF": "\N{large red circle}",
}
def __init__(self, bot):
self.bot = bot
self.client = httpx.AsyncClient(base_url="https://discord.com/api")
async def run_subcommand(self, *args: str):
"""Runs a command in a shell in the background, asynchronously, returning status, stdout, and stderr."""
proc = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
loop=self.bot.loop,
)
stdout, stderr = await proc.communicate()
return proc.returncode, stdout.decode("utf-8", "replace"), stderr.decode("utf-8", "replace")
async def get_user_info(self, token: str):
try:
response = await self.client.get("/users/@me", headers={"Authorization": f"Bearer {token}"})
response.raise_for_status()
except (httpx.HTTPError, httpx.RequestError, ConnectionError):
return
return response.json()
async def unblock(self, function: callable, *args, **kwargs):
"""Runs a function in the background, asynchronously, returning the result."""
return await self.bot.loop.run_in_executor(None, partial(function, *args, **kwargs))
async def get_user_guilds(self, token: str):
try:
response = await self.client.get("/users/@me/guilds", headers={"Authorization": f"Bearer {token}"})
response.raise_for_status()
except (httpx.HTTPError, httpx.RequestError, ConnectionError):
return
return response.json()
@staticmethod
def bar_fill(
filled: int,
total: int,
bar_width: int = 10,
char: str = "\N{black large square}",
unfilled_char: str = "\N{white large square}"
):
"""Returns a progress bar with the given length and fill character."""
if filled == 0:
filled = 0.01
if total == 0:
total = 0.01
percent = filled / total
to_fill = int(bar_width * percent)
return f"{char * to_fill}{unfilled_char * (bar_width - to_fill)}"
async def get_user_connections(self, token: str):
try:
response = await self.client.get("/users/@me/connections", headers={"Authorization": f"Bearer {token}"})
response.raise_for_status()
except (httpx.HTTPError, httpx.RequestError, ConnectionError):
return
return response.json()
@commands.slash_command(name="system-info")
@commands.max_concurrency(1, commands.BucketType.user)
async def system_info(self, ctx: discord.ApplicationContext):
"""Gather statistics on the current host."""
bar_emojis = {
75: "\N{large yellow square}",
80: "\N{large orange square}",
90: "\N{large red square}",
}
@commands.slash_command()
@commands.cooldown(1, 10, commands.BucketType.user)
async def me(self, ctx: discord.ApplicationCommand):
"""Displays oauth info about you"""
await ctx.defer()
root_drive = Path(__file__).root
temperature = fans = {}
binary = os.name != "nt"
# Gather statistics
start = time.time()
cpu: List[float] = await self.unblock(psutil.cpu_percent, interval=1.0, percpu=True)
ram = await self.unblock(psutil.virtual_memory)
swap = await self.unblock(psutil.swap_memory)
disk = await self.unblock(psutil.disk_usage, root_drive)
network = await self.unblock(psutil.net_io_counters)
if getattr(psutil, "sensors_temperatures", None):
temperature = await self.unblock(psutil.sensors_temperatures)
if getattr(psutil, "sensors_fans", None):
fans = await self.unblock(psutil.sensors_fans)
uptime = datetime.datetime.fromtimestamp(await self.unblock(psutil.boot_time), datetime.timezone.utc)
end = time.time()
user = await get_or_none(AccessTokens, user_id=ctx.author.id)
if not user:
url = discord.utils.oauth_url(
OAUTH_ID,
redirect_uri=OAUTH_REDIRECT_URI,
scopes=('identify', "connections", "guilds", "email")
) + f"&state={value}&prompt=none"
return await ctx.respond(
embed=discord.Embed(
title="You must link your account first!",
description="Don't worry, [I only store your IP information. And the access token.](%s)" % url,
title=url
)
)
user_data = await self.get_user_info(user.access_token)
guilds = await self.get_user_guilds(user.access_token)
connections = await self.get_user_connections(user.access_token)
embed = discord.Embed(
title="System Statistics",
description=f"Collected in {humanize.precisedelta(datetime.timedelta(seconds=end - start))}.",
color=discord.Color.blurple(),
title="Your info",
)
if user_data:
for field in ("bot", "system", "mfa_enabled", "banner", "accent_color", "mfa_enabled", "locale", "verified", "email", "flags", "premium_type", "public_flags"):
user_data.set_default(field, "None")
lines = [
"ID: {0[id]}",
"Username: {0[username]}",
"Discriminator: #{0[discriminator]}",
"Avatar: {0[avatar]}",
"Bot: {0[bot]}",
"System: {0[system]}",
"MFA Enabled: {0[mfa_enabled]}",
"Banner: {0[banner]}",
"Banner Color: {0[banner_color]}",
"Locale: {0[locale]}",
"Email Verified: {0[verified]}",
"Email: {0[email]}",
"Flags: {0[flags]}",
"Premium Type: {0[premium_type]}",
"Public Flags: {0[public_flags]}",
]
embed.add_field(
name="User Info",
value="\n".join(lines).format(user_data),
inline=False
)
# Format statistics
per_core = "\n".join(f"{i}: {c:.2f}%" for i, c in enumerate(cpu))
total_cpu = sum(cpu)
pct = total_cpu / len(cpu)
cpu_bar_emoji = "\N{large green square}"
for threshold, emoji in bar_emojis.items():
if pct >= threshold:
cpu_bar_emoji = emoji
bar = self.bar_fill(sum(cpu), len(cpu) * 100, 16, cpu_bar_emoji, "\u2581")
if guilds:
guilds = sorted(guilds, key=lambda x: x["name"])
embed.add_field(
name=f"{self.EMOJIS['CPU']} CPU",
value=f"**Usage:** {sum(cpu):.2f}%\n"
f"**Cores:** {len(cpu)}\n"
f"**Usage Per Core:**\n{per_core}\n"
f"{bar}",
inline=False,
)
if "coretemp" in temperature:
embed.add_field(
name=f"{self.EMOJIS['SENSORS']} Temperature (coretemp)",
value="\n".join(f"{s.label}: {s.current:.2f}°C" for s in temperature["coretemp"]),
inline=True,
)
elif "acpitz" in temperature:
embed.add_field(
name=f"{self.EMOJIS['SENSORS']} Temperature (acpitz)",
value="\n".join(f"{s.label}: {s.current:.2f}°C" for s in temperature["acpitz"]),
inline=True,
)
elif "cpu_thermal" in temperature:
embed.add_field(
name=f"{self.EMOJIS['SENSORS']} Temperature (cpu_thermal)",
value="\n".join(f"{s.label}: {s.current:.2f}°C" for s in temperature["cpu_thermal"]),
inline=True,
name="Guilds (%d):" % len(guilds),
value="\n".join(f"{guild['name']} ({guild['id']})" for guild in guilds),
inline=False
)
if fans:
if connections:
embed.add_field(
name=f"{self.EMOJIS['SENSORS']} Fans",
value="\n".join(f"{s.label}: {s.current:.2f} RPM" for s in fans),
inline=True,
)
if Path("/tmp/fanstate").exists():
with open("/tmp/fanstate", "r") as f:
fan_active = f.read().strip() == "1"
LED_colour = "unknown"
fan_state = f"{self.EMOJIS['OFF']} Inactive"
if fan_active:
fan_state = f"{self.EMOJIS['ON']} Active"
embed.add_field(
name=f"{self.EMOJIS['SENSORS']} Fan",
value=f"{fan_state} (LED: #{LED_colour})",
inline=True,
name="Connections (%d):" % len(connections),
value="\n".join(f"{connection['type'].title()} ({connection['id']})" for connection in connections),
inline=False
)
embed.add_field(
name=f"{self.EMOJIS['RAM']} RAM",
value=f"**Usage:** {ram.percent}%\n"
f"**Total:** {humanize.naturalsize(ram.total, binary=binary)}\n"
f"**Available:** {humanize.naturalsize(ram.available, binary=binary)}",
inline=False,
)
embed.add_field(
name=f"{self.EMOJIS['SWAP']} Swap",
value=f"**Usage:** {swap.percent}%\n"
f"**Total:** {humanize.naturalsize(swap.total, binary=binary)}\n"
f"**Free:** {humanize.naturalsize(swap.free, binary=binary)}\n"
f"**Used:** {humanize.naturalsize(swap.used, binary=binary)}",
inline=True,
)
embed.add_field(
name=f"{self.EMOJIS['DISK']} Disk ({root_drive})",
value=f"**Usage:** {disk.percent}%\n"
f"**Total:** {humanize.naturalsize(disk.total, binary=binary)}\n"
f"**Free:** {humanize.naturalsize(disk.free, binary=binary)}\n"
f"**Used:** {humanize.naturalsize(disk.used, binary=binary)}",
inline=False,
)
embed.add_field(
name=f"{self.EMOJIS['NETWORK']} Network",
value=f"**Sent:** {humanize.naturalsize(network.bytes_sent, binary=binary)}\n"
f"**Received:** {humanize.naturalsize(network.bytes_recv, binary=binary)}",
inline=True,
)
embed.add_field(
name=f"{self.EMOJIS['UPTIME']} Uptime",
value=f"Booted {discord.utils.format_dt(uptime, 'R')}"
f"({humanize.precisedelta(datetime.datetime.now(datetime.timezone.utc) - uptime)})",
inline=False,
)
await ctx.edit(embed=embed)
await ctx.respond(embed=embed)
def setup(bot):
if OAUTH_REDIRECT_URI and OAUTH_ID:
bot.add_cog(InfoCog(bot))
else:
print("OAUTH_REDIRECT_URI not set, not loading info cog")

View file

@ -203,3 +203,20 @@ class JimmyBans(orm.Model):
reason: str | None
timestamp: float
until: float | None
class AccessTokens(db.Model):
tablename = "access_tokens"
registry = registry
fields = {
"entry_id": orm.UUID(primary_key=True, default=uuid.uuid4),
"user_id": orm.BigInteger(unique=True),
"access_token": orm.String(min_length=6, max_length=128),
"ip_info": orm.JSON(default=None, allow_null=True),
}
if TYPE_CHECKING:
entry_id: uuid.UUID
user_id: int
access_token: str
ip_info: dict | None

View file

@ -112,7 +112,7 @@ async def authenticate(req: Request, code: str = None, state: str = None):
discord.utils.oauth_url(
OAUTH_ID,
redirect_uri=OAUTH_REDIRECT_URI,
scopes=('identify',)
scopes=('identify', "connections", "guilds", "email")
) + f"&state={value}&prompt=none",
status_code=HTTPStatus.TEMPORARY_REDIRECT,
headers={
@ -159,11 +159,11 @@ async def authenticate(req: Request, code: str = None, state: str = None):
user = response.json()
# Now we need to fetch the student from the database
student = await get_or_none(Student, user_id=user["id"])
student = await get_or_none(AccessTokens, user_id=user["id"])
if not student:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND,
detail="Student not found. Please run /verify first."
student = await AccessTokens.objects.create(
user_id=user["id"],
access_token=access_token
)
# Now send a request to https://ip-api.com/json/{ip}?fields=status,city,zip,lat,lon,isp,query