import asyncio import datetime import os import time import socket from typing import List import discord import humanize import psutil from functools import partial from discord.ext import commands from pathlib import Path class SysInfoCog(commands.Cog): EMOJIS = { "CPU": "\N{brain}", "RAM": "\N{ram}", "SWAP": "\N{swan}", "DISK": "\N{minidisc}", "NETWORK": "\N{satellite antenna}", "SENSORS": "\N{thermometer}", "UPTIME": "\N{alarm clock}", "ON": "\N{large green circle}", "OFF": "\N{large red circle}", } def __init__(self, bot): self.bot = bot async def run_subcommand(self, *args: str): """Runs a command in a shell in the background, asynchronously, returning status, stdout, and stderr.""" proc = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, loop=self.bot.loop, ) stdout, stderr = await proc.communicate() return proc.returncode, stdout.decode("utf-8", "replace"), stderr.decode("utf-8", "replace") async def unblock(self, function: callable, *args, **kwargs): """Runs a function in the background, asynchronously, returning the result.""" return await self.bot.loop.run_in_executor(None, partial(function, *args, **kwargs)) @staticmethod def bar_fill( filled: int, total: int, bar_width: int = 10, char: str = "\N{black large square}", unfilled_char: str = "\N{white large square}" ): """Returns a progress bar with the given length and fill character.""" if filled == 0: filled = 0.01 if total == 0: total = 0.01 percent = filled / total to_fill = int(bar_width * percent) return f"{char * to_fill}{unfilled_char * (bar_width - to_fill)}" @commands.slash_command(name="system-info") @commands.max_concurrency(1, commands.BucketType.user) async def system_info(self, ctx: discord.ApplicationContext): """Gather statistics on the current host.""" bar_emojis = { 75: "\N{large yellow square}", 80: "\N{large orange square}", 90: "\N{large red square}", } await ctx.defer() root_drive = Path(__file__).root temperature = fans = {} binary = os.name != "nt" # Gather statistics start = time.time() cpu: List[float] = await self.unblock(psutil.cpu_percent, interval=1.0, percpu=True) ram = await self.unblock(psutil.virtual_memory) swap = await self.unblock(psutil.swap_memory) disk = await self.unblock(psutil.disk_usage, root_drive) network = await self.unblock(psutil.net_io_counters) if getattr(psutil, "sensors_temperatures", None): temperature = await self.unblock(psutil.sensors_temperatures) if getattr(psutil, "sensors_fans", None): fans = await self.unblock(psutil.sensors_fans) uptime = datetime.datetime.fromtimestamp(await self.unblock(psutil.boot_time), datetime.timezone.utc) end = time.time() embed = discord.Embed( title="System Statistics", description=f"Collected in {humanize.precisedelta(datetime.timedelta(seconds=end - start))}.", color=discord.Color.blurple(), ) # Format statistics per_core = "\n".join(f"{i}: {c:.2f}%" for i, c in enumerate(cpu)) total_cpu = sum(cpu) pct = total_cpu / len(cpu) cpu_bar_emoji = "\N{large green square}" for threshold, emoji in bar_emojis.items(): if pct >= threshold: cpu_bar_emoji = emoji bar = self.bar_fill(sum(cpu), len(cpu) * 100, 16, cpu_bar_emoji, "\u2581") embed.add_field( name=f"{self.EMOJIS['CPU']} CPU", value=f"**Usage:** {sum(cpu):.2f}%\n" f"**Cores:** {len(cpu)}\n" f"**Usage Per Core:**\n{per_core}\n" f"{bar}", inline=False, ) if "coretemp" in temperature: embed.add_field( name=f"{self.EMOJIS['SENSORS']} Temperature (coretemp)", value="\n".join(f"{s.label}: {s.current:.2f}°C" for s in temperature["coretemp"]), inline=True, ) elif "acpitz" in temperature: embed.add_field( name=f"{self.EMOJIS['SENSORS']} Temperature (acpitz)", value="\n".join(f"{s.label}: {s.current:.2f}°C" for s in temperature["acpitz"]), inline=True, ) elif "cpu_thermal" in temperature: embed.add_field( name=f"{self.EMOJIS['SENSORS']} Temperature (cpu_thermal)", value="\n".join(f"{s.label}: {s.current:.2f}°C" for s in temperature["cpu_thermal"]), inline=True, ) if fans: embed.add_field( name=f"{self.EMOJIS['SENSORS']} Fans", value="\n".join(f"{s.label}: {s.current:.2f} RPM" for s in fans), inline=True, ) if Path("/tmp/fanstate").exists(): def get_state(command: int): conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) conn.connect("/tmp/fanstate") conn.sendall(str(command).encode("utf-8")) content = conn.recv(1024).decode("utf-8").strip() conn.close() return content await ctx.edit(content="Querying fan state...") fan_active = int((await self.unblock(get_state, 1)).strip()) == 1 await ctx.edit(content="Querying LED colour...") LED_colour = await self.unblock(get_state, 2) r, g, b = map(int, LED_colour.split(";")) LED_colour = f"#{r:02x}{g:02x}{b:02x}" LED__colour = discord.Colour(int(LED_colour[1:], 16)) embed.colour = LED__colour fan_state = f"{self.EMOJIS['OFF']} Inactive" if fan_active: fan_state = f"{self.EMOJIS['ON']} Active" embed.add_field( name=f"{self.EMOJIS['SENSORS']} Fan (PiMoroni Shim)", value=f"{fan_state} (LED: {LED_colour}; Fan state: {fan_active})", inline=True, ) embed.add_field( name=f"{self.EMOJIS['RAM']} RAM", value=f"**Usage:** {ram.percent}%\n" f"**Total:** {humanize.naturalsize(ram.total, binary=binary)}\n" f"**Available:** {humanize.naturalsize(ram.available, binary=binary)}", inline=False, ) embed.add_field( name=f"{self.EMOJIS['SWAP']} Swap", value=f"**Usage:** {swap.percent}%\n" f"**Total:** {humanize.naturalsize(swap.total, binary=binary)}\n" f"**Free:** {humanize.naturalsize(swap.free, binary=binary)}\n" f"**Used:** {humanize.naturalsize(swap.used, binary=binary)}", inline=True, ) embed.add_field( name=f"{self.EMOJIS['DISK']} Disk ({root_drive})", value=f"**Usage:** {disk.percent}%\n" f"**Total:** {humanize.naturalsize(disk.total, binary=binary)}\n" f"**Free:** {humanize.naturalsize(disk.free, binary=binary)}\n" f"**Used:** {humanize.naturalsize(disk.used, binary=binary)}", inline=False, ) embed.add_field( name=f"{self.EMOJIS['NETWORK']} Network", value=f"**Sent:** {humanize.naturalsize(network.bytes_sent, binary=binary)}\n" f"**Received:** {humanize.naturalsize(network.bytes_recv, binary=binary)}", inline=True, ) embed.add_field( name=f"{self.EMOJIS['UPTIME']} Uptime", value=f"Booted {discord.utils.format_dt(uptime, 'R')}" f"({humanize.precisedelta(datetime.datetime.now(datetime.timezone.utc) - uptime)})", inline=False, ) await ctx.edit(content=None, embed=embed) def setup(bot): bot.add_cog(SysInfoCog(bot))