college-bot-v1/cogs/sysinfo.py

209 lines
8.1 KiB
Python
Raw Normal View History

2024-02-22 11:41:24 +00:00
import asyncio
import datetime
import os
import time
import socket
from typing import List
import discord
import humanize
import psutil
from functools import partial
from discord.ext import commands
from pathlib import Path
class InfoCog(commands.Cog):
EMOJIS = {
"CPU": "\N{brain}",
"RAM": "\N{ram}",
"SWAP": "\N{swan}",
"DISK": "\N{minidisc}",
"NETWORK": "\N{satellite antenna}",
"SENSORS": "\N{thermometer}",
"UPTIME": "\N{alarm clock}",
"ON": "\N{large green circle}",
"OFF": "\N{large red circle}",
}
def __init__(self, bot):
self.bot = bot
async def run_subcommand(self, *args: str):
"""Runs a command in a shell in the background, asynchronously, returning status, stdout, and stderr."""
proc = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
loop=self.bot.loop,
)
stdout, stderr = await proc.communicate()
return proc.returncode, stdout.decode("utf-8", "replace"), stderr.decode("utf-8", "replace")
async def unblock(self, function: callable, *args, **kwargs):
"""Runs a function in the background, asynchronously, returning the result."""
return await self.bot.loop.run_in_executor(None, partial(function, *args, **kwargs))
@staticmethod
def bar_fill(
filled: int,
total: int,
bar_width: int = 10,
char: str = "\N{black large square}",
unfilled_char: str = "\N{white large square}"
):
"""Returns a progress bar with the given length and fill character."""
if filled == 0:
filled = 0.01
if total == 0:
total = 0.01
percent = filled / total
to_fill = int(bar_width * percent)
return f"{char * to_fill}{unfilled_char * (bar_width - to_fill)}"
@commands.slash_command(name="system-info")
@commands.max_concurrency(1, commands.BucketType.user)
async def system_info(self, ctx: discord.ApplicationContext):
"""Gather statistics on the current host."""
bar_emojis = {
75: "\N{large yellow square}",
80: "\N{large orange square}",
90: "\N{large red square}",
}
await ctx.defer()
root_drive = Path(__file__).root
temperature = fans = {}
binary = os.name != "nt"
# Gather statistics
start = time.time()
cpu: List[float] = await self.unblock(psutil.cpu_percent, interval=1.0, percpu=True)
ram = await self.unblock(psutil.virtual_memory)
swap = await self.unblock(psutil.swap_memory)
disk = await self.unblock(psutil.disk_usage, root_drive)
network = await self.unblock(psutil.net_io_counters)
if getattr(psutil, "sensors_temperatures", None):
temperature = await self.unblock(psutil.sensors_temperatures)
if getattr(psutil, "sensors_fans", None):
fans = await self.unblock(psutil.sensors_fans)
uptime = datetime.datetime.fromtimestamp(await self.unblock(psutil.boot_time), datetime.timezone.utc)
end = time.time()
embed = discord.Embed(
title="System Statistics",
description=f"Collected in {humanize.precisedelta(datetime.timedelta(seconds=end - start))}.",
color=discord.Color.blurple(),
)
# Format statistics
per_core = "\n".join(f"{i}: {c:.2f}%" for i, c in enumerate(cpu))
total_cpu = sum(cpu)
pct = total_cpu / len(cpu)
cpu_bar_emoji = "\N{large green square}"
for threshold, emoji in bar_emojis.items():
if pct >= threshold:
cpu_bar_emoji = emoji
bar = self.bar_fill(sum(cpu), len(cpu) * 100, 16, cpu_bar_emoji, "\u2581")
embed.add_field(
name=f"{self.EMOJIS['CPU']} CPU",
value=f"**Usage:** {sum(cpu):.2f}%\n"
f"**Cores:** {len(cpu)}\n"
f"**Usage Per Core:**\n{per_core}\n"
f"{bar}",
inline=False,
)
if "coretemp" in temperature:
embed.add_field(
name=f"{self.EMOJIS['SENSORS']} Temperature (coretemp)",
value="\n".join(f"{s.label}: {s.current:.2f}°C" for s in temperature["coretemp"]),
inline=True,
)
elif "acpitz" in temperature:
embed.add_field(
name=f"{self.EMOJIS['SENSORS']} Temperature (acpitz)",
value="\n".join(f"{s.label}: {s.current:.2f}°C" for s in temperature["acpitz"]),
inline=True,
)
elif "cpu_thermal" in temperature:
embed.add_field(
name=f"{self.EMOJIS['SENSORS']} Temperature (cpu_thermal)",
value="\n".join(f"{s.label}: {s.current:.2f}°C" for s in temperature["cpu_thermal"]),
inline=True,
)
if fans:
embed.add_field(
name=f"{self.EMOJIS['SENSORS']} Fans",
value="\n".join(f"{s.label}: {s.current:.2f} RPM" for s in fans),
inline=True,
)
if Path("/tmp/fanstate").exists():
def get_state(command: int):
conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.connect("/tmp/fanstate")
conn.sendall(str(command).encode("utf-8"))
content = conn.recv(1024).decode("utf-8").strip()
conn.close()
return content
await ctx.edit(content="Querying fan state...")
fan_active = int((await self.unblock(get_state, 1)).strip()) == 1
await ctx.edit(content="Querying LED colour...")
LED_colour = await self.unblock(get_state, 2)
r, g, b = map(int, LED_colour.split(";"))
LED_colour = f"#{r:02x}{g:02x}{b:02x}"
LED__colour = discord.Colour(int(LED_colour[1:], 16))
embed.colour = LED__colour
fan_state = f"{self.EMOJIS['OFF']} Inactive"
if fan_active:
fan_state = f"{self.EMOJIS['ON']} Active"
embed.add_field(
name=f"{self.EMOJIS['SENSORS']} Fan (PiMoroni Shim)",
value=f"{fan_state} (LED: {LED_colour}; Fan state: {fan_active})",
inline=True,
)
embed.add_field(
name=f"{self.EMOJIS['RAM']} RAM",
value=f"**Usage:** {ram.percent}%\n"
f"**Total:** {humanize.naturalsize(ram.total, binary=binary)}\n"
f"**Available:** {humanize.naturalsize(ram.available, binary=binary)}",
inline=False,
)
embed.add_field(
name=f"{self.EMOJIS['SWAP']} Swap",
value=f"**Usage:** {swap.percent}%\n"
f"**Total:** {humanize.naturalsize(swap.total, binary=binary)}\n"
f"**Free:** {humanize.naturalsize(swap.free, binary=binary)}\n"
f"**Used:** {humanize.naturalsize(swap.used, binary=binary)}",
inline=True,
)
embed.add_field(
name=f"{self.EMOJIS['DISK']} Disk ({root_drive})",
value=f"**Usage:** {disk.percent}%\n"
f"**Total:** {humanize.naturalsize(disk.total, binary=binary)}\n"
f"**Free:** {humanize.naturalsize(disk.free, binary=binary)}\n"
f"**Used:** {humanize.naturalsize(disk.used, binary=binary)}",
inline=False,
)
embed.add_field(
name=f"{self.EMOJIS['NETWORK']} Network",
value=f"**Sent:** {humanize.naturalsize(network.bytes_sent, binary=binary)}\n"
f"**Received:** {humanize.naturalsize(network.bytes_recv, binary=binary)}",
inline=True,
)
embed.add_field(
name=f"{self.EMOJIS['UPTIME']} Uptime",
value=f"Booted {discord.utils.format_dt(uptime, 'R')}"
f"({humanize.precisedelta(datetime.datetime.now(datetime.timezone.utc) - uptime)})",
inline=False,
)
await ctx.edit(content=None, embed=embed)
def setup(bot):
bot.add_cog(InfoCog(bot))