college-bot-v2/src/cogs/ffmeta.py

234 lines
8 KiB
Python
Raw Normal View History

2024-02-06 23:03:49 +00:00
import asyncio
2024-02-07 16:02:32 +00:00
import io
2024-02-07 16:53:29 +00:00
import json
2024-02-06 23:03:49 +00:00
import logging
2024-02-07 16:53:29 +00:00
import tempfile
2024-02-07 16:18:09 +00:00
import typing
2024-02-07 17:04:58 +00:00
from pathlib import Path
2024-02-07 16:18:09 +00:00
2024-02-07 16:02:32 +00:00
import PIL.Image
2024-02-06 23:03:49 +00:00
import discord
2024-02-07 16:02:32 +00:00
import httpx
2024-02-06 23:03:49 +00:00
from discord.ext import commands
2024-02-07 16:53:29 +00:00
from conf import VERSION
2024-02-06 23:03:49 +00:00
class FFMeta(commands.Cog):
    # Cog exposing slash commands that shell out to ffprobe/ffmpeg and use Pillow
    # to inspect or re-encode media supplied as a URL or a Discord attachment.

    # Sent with every download this cog performs.
    _USER_AGENT = f"DiscordBot (Jimmy, v2, {VERSION}, +https://github.com/nexy7574/college-bot-v2)"
    # commands.Paginator.add_line raises RuntimeError when a line cannot fit on one page.
    # With the default max_size of 2000 and ``` fences the limit is 1992 characters,
    # so lines are truncated slightly below that instead of at 2000.
    _MAX_LINE_LENGTH = 1990
    # Largest upload attempted, in bytes (presumably just under Discord's limit - confirm).
    _MAX_UPLOAD_BYTES = 24.75 * 1024 * 1024
    # Fallback duration in seconds: 3 minutes and 15 seconds is the 2023 average.
    _DEFAULT_DURATION = 195

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        self.log = logging.getLogger("jimmy.cogs.ffmeta")

    async def _download(self, url: str) -> typing.Optional[bytes]:
        """Fetch *url* over HTTP(S) and return the response body.

        Returns ``None`` (after logging) when the request fails or the server
        answers with anything other than 200, so callers can report the failure
        instead of leaving a deferred interaction unanswered.
        """
        try:
            async with httpx.AsyncClient(headers={"User-Agent": self._USER_AGENT}) as client:
                response = await client.get(url)
        except (httpx.HTTPError, httpx.InvalidURL) as e:
            self.log.warning("Failed to download %r: %r", url, e)
            return None
        if response.status_code != 200:
            self.log.warning("Failed to download %r: HTTP %d", url, response.status_code)
            return None
        return response.content

    async def _probe_duration(self, path: str) -> float:
        """Return the duration in seconds that ffprobe reports for *path*.

        Falls back to ``_DEFAULT_DURATION`` when ffprobe prints nothing, prints
        invalid JSON, omits the ``format``/``duration`` keys, or reports a value
        that is not a number.
        """
        probe_process = await asyncio.create_subprocess_exec(
            "ffprobe",
            "-v", "quiet",
            "-print_format", "json",
            "-show_format",
            "-i",
            path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await probe_process.communicate()
        try:
            data = json.loads(stdout.decode("utf-8", "replace"))
            return float(data["format"]["duration"])
        except (json.JSONDecodeError, KeyError, TypeError, ValueError):
            return float(self._DEFAULT_DURATION)

    def jpegify_image(self, input_file: io.BytesIO, quality: int = 50, image_format: str = "jpeg") -> io.BytesIO:
        """Re-encode an image with Pillow and return it as an in-memory file.

        :param input_file: The source image data.
        :param quality: Encoder quality; clamped to the range 1-100.
        :param image_format: The Pillow format name to save as.
        :return: A BytesIO holding the encoded image, rewound to the start.
        """
        # Clamp into [1, 100]. (The previous min/max nesting always produced 1.)
        quality = max(1, min(quality, 100))
        img_src = PIL.Image.open(input_file)
        img_dst = io.BytesIO()
        self.log.debug("Saving input file (%r) as %r with quality %r%%", input_file, image_format, quality)
        img_src.save(img_dst, format=image_format, quality=quality)
        img_dst.seek(0)
        return img_dst

    @commands.slash_command()
    async def ffprobe(self, ctx: discord.ApplicationContext, url: str = None, attachment: discord.Attachment = None):
        """Runs ffprobe on a given URL or attachment"""
        if url is None:
            if attachment is None:
                return await ctx.respond("No URL or attachment provided")
            url = attachment.url
        # The value is handed straight to ffprobe, which would otherwise accept local
        # paths, other protocols and option-like arguments from any user.
        if not url.lower().startswith(("http://", "https://")):
            return await ctx.respond("Only http(s) URLs are supported.")
        await ctx.defer()
        process = await asyncio.create_subprocess_exec(
            "ffprobe",
            "-hide_banner",
            url,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        stdout = stdout.decode("utf-8", "replace")
        stderr = stderr.decode("utf-8", "replace")

        paginator = commands.Paginator(prefix="```", suffix="```")
        # Only label stdout lines when there is stderr output to tell them apart from.
        out_prefix = "[OUT] " if stderr else ""
        for line in stdout.splitlines():
            paginator.add_line(f"{out_prefix}{line}"[: self._MAX_LINE_LENGTH])
        for line in stderr.splitlines():
            paginator.add_line(f"[ERR] {line}"[: self._MAX_LINE_LENGTH])

        pages = paginator.pages
        if not pages:
            # The interaction was deferred, so it must always be answered.
            return await ctx.respond("ffprobe produced no output.")
        for page in pages:
            await ctx.respond(page)

    @commands.slash_command()
    async def jpegify(
            self,
            ctx: discord.ApplicationContext,
            url: str = None,
            attachment: discord.Attachment = None,
            quality: typing.Annotated[
                int,
                discord.Option(
                    int,
                    description="The quality of the resulting image from 1%-100%",
                    default=50,
                    min_value=1,
                    max_value=100
                )
            ] = 50,
            image_format: typing.Annotated[
                str,
                discord.Option(
                    str,
                    description="The format of the resulting image",
                    choices=["jpeg", "webp"],
                    default="jpeg"
                )
            ] = "jpeg"
    ):
        """Converts a given URL or attachment to a JPEG"""
        if url is None:
            if attachment is None:
                return await ctx.respond("No URL or attachment provided")
            url = attachment.url
        await ctx.defer()

        content = await self._download(url)
        if content is None:
            return await ctx.respond("Failed to download the file.")

        src = io.BytesIO(content)
        try:
            # Pillow work is blocking, so keep it off the event loop.
            dst = await asyncio.to_thread(self.jpegify_image, src, quality, image_format)
        except Exception as e:
            # Command boundary: Pillow raises a variety of exception types for bad input.
            await ctx.respond(f"Failed to convert image: `{e}`.")
            self.log.error("Failed to convert image %r: %r", url, e)
            return
        else:
            await ctx.respond(file=discord.File(dst, filename=f"jpegified.{image_format}"))

    @commands.slash_command()
    async def opusinate(
            self,
            ctx: discord.ApplicationContext,
            url: str = None,
            attachment: discord.Attachment = None,
            bitrate: typing.Annotated[
                int,
                discord.Option(
                    int,
                    description="The bitrate in kilobits of the resulting audio from 1-512",
                    default=96,
                    min_value=1,
                    max_value=512
                )
            ] = 96,
            mono: typing.Annotated[
                bool,
                discord.Option(
                    bool,
                    description="Whether to convert the audio to mono",
                    default=False
                )
            ] = False
    ):
        """Converts a given URL or attachment to an Opus file"""
        filename = "opusinated.ogg"
        if url is None:
            if attachment is None:
                return await ctx.respond("No URL or attachment provided")
            url = attachment.url
            # Keep the uploaded file's name, swapping only the extension.
            filename = str(Path(attachment.filename).with_suffix(".ogg"))
        await ctx.defer()
        channels = 1 if mono else 2

        content = await self._download(url)
        if content is None:
            return await ctx.respond("Failed to download the file.")

        # ffprobe/ffmpeg read the input by path, so it has to live on disk while they run.
        with tempfile.NamedTemporaryFile() as temp:
            temp.write(content)
            temp.flush()

            duration = await self._probe_duration(temp.name)
            # Rough upper bound on the output size, used to refuse before encoding.
            # NOTE(review): libopus' -b:a is presumably the total stream bitrate, so the
            # channel multiplier likely over-estimates stereo output - confirm before changing.
            max_end_size = ((bitrate * duration * channels) / 8) * 1024
            if max_end_size > self._MAX_UPLOAD_BYTES:
                return await ctx.respond(
                    "The file would be too large to send ({:,.2f} MiB).".format(max_end_size / 1024 / 1024)
                )

            process = await asyncio.create_subprocess_exec(
                "ffmpeg",
                "-hide_banner",
                "-loglevel",
                "info",
                "-stats",
                "-i",
                temp.name,
                "-c:a", "libopus",
                "-b:a", f"{bitrate}k",
                "-vn",
                "-sn",
                "-ac", str(channels),
                "-f", "opus",
                "-y",
                "pipe:1",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await process.communicate()

        stderr = stderr.decode("utf-8", "replace")
        paginator = commands.Paginator(prefix="```", suffix="```")
        seen_lines = set()
        for line in stderr.splitlines():
            stripped = line.strip()
            # Skip lines that were already shown.
            if stripped in seen_lines:
                continue
            seen_lines.add(stripped)
            if stripped.startswith(":"):
                continue
            paginator.add_line(line[: self._MAX_LINE_LENGTH])
        for page in paginator.pages:
            await ctx.respond(page)

        if not stdout:
            # ffmpeg wrote nothing to the pipe; do not upload an empty file.
            return await ctx.respond(
                "ffmpeg did not produce any output (exit code {}).".format(process.returncode)
            )
        file = io.BytesIO(stdout)
        if (fs := len(stdout)) > self._MAX_UPLOAD_BYTES:
            return await ctx.respond("The file is too large to send ({:,.2f} MiB).".format(fs / 1024 / 1024))
        await ctx.respond(file=discord.File(file, filename=filename))
2024-02-07 16:53:29 +00:00
2024-02-06 23:03:49 +00:00
def setup(bot: commands.Bot):
    """Extension entry point: create the FFMeta cog and attach it to *bot*."""
    cog = FFMeta(bot)
    bot.add_cog(cog)