Add audio boost command

This commit is contained in:
Nexus 2023-05-05 20:08:47 +01:00
parent 983a5cf112
commit 33919a86fc
Signed by: nex
GPG key ID: 0FA334385D0B689F
5 changed files with 158 additions and 35 deletions

View file

@ -260,12 +260,21 @@ class Events(commands.Cog):
else:
def after(err):
asyncio.run_coroutine_threadsafe(
self.bot.loop.create_task(
_dc(voice),
self.bot.loop
)
if err is not None:
console.log(f"Error playing audio: {err}")
self.bot.loop.create_task(
message.add_reaction("\N{speaker with cancellation stroke}")
)
else:
self.bot.loop.create_task(
message.remove_reaction("\N{speaker with three sound waves}", self.bot.user)
)
self.bot.loop.create_task(
message.add_reaction("\N{speaker}")
)
# noinspection PyTypeChecker
src = discord.FFmpegPCMAudio(str(_file.resolve()), stderr=subprocess.DEVNULL)
@ -274,6 +283,8 @@ class Events(commands.Cog):
src,
after=after
)
if message.channel.permissions_for(message.guild.me).add_reactions:
await message.add_reaction("\N{speaker with three sound waves}")
else:
await message.channel.trigger_typing()
await message.reply(file=discord.File(_file))
@ -414,7 +425,7 @@ class Events(commands.Cog):
"func": send_smeg,
"meta": {
"sub": {
r"pattern": r"(-_.\s)+",
r"pattern": r"([-_.\s\u200b])+",
r"with": ''
},
"check": (assets / "smeg").exists
@ -436,7 +447,7 @@ class Events(commands.Cog):
"check": lambda: message.content.startswith(self.bot.user.mention)
}
},
r"mine diamonds": {
r"mine(ing|d)? (diamonds|away)": {
"func": play_voice(assets / "mine-diamonds.opus"),
"meta": {
"check": (assets / "mine-diamonds.opus").exists

View file

@ -35,9 +35,7 @@ from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.firefox.service import Service as FirefoxService
# from selenium.webdriver.ie
from utils import console
from utils import console, Timer
try:
from config import proxy
@ -1536,38 +1534,53 @@ class OtherCog(commands.Cog):
):
"""OCRs an image"""
await ctx.defer()
timings: Dict[str, float] = {}
attachment: discord.Attachment
data = await attachment.read()
file = io.BytesIO(data)
file.seek(0)
img = await self.bot.loop.run_in_executor(None, Image.open, file)
with Timer(timings, "download attachment"):
data = await attachment.read()
file = io.BytesIO(data)
file.seek(0)
with Timer(timings, "Parse image"):
img = await self.bot.loop.run_in_executor(None, Image.open, file)
try:
text = await self.bot.loop.run_in_executor(None, pytesseract.image_to_string, img)
with Timer(timings, "Run OCR"):
text = await self.bot.loop.run_in_executor(None, pytesseract.image_to_string, img)
except pytesseract.TesseractError as e:
return await ctx.respond(f"Failed to perform OCR: `{e}`")
if len(text) > ctx.guild.filesize_limit - 100:
try:
response = await self.http.put(
"https://api.mystb.in/paste",
json={
"files": [
{
"filename": "ocr.txt",
"content": text
}
],
}
)
response.raise_for_status()
except httpx.HTTPError:
return await ctx.respond("OCR content too large to post.")
else:
data = response.json()
return await ctx.respond("https://mystb.in/%s" % data["id"])
if len(text) > 4096:
with Timer(timings, "Upload text to mystbin"):
try:
response = await self.http.put(
"https://api.mystb.in/paste",
json={
"files": [
{
"filename": "ocr.txt",
"content": text
}
],
}
)
response.raise_for_status()
except httpx.HTTPError:
return await ctx.respond("OCR content too large to post.")
else:
data = response.json()
with Timer(timings, "Respond (URL)"):
embed = discord.Embed(
description="View on [mystb.in](%s)" % ("https://mystb.in/" + data["id"]),
colour=discord.Colour.dark_theme()
)
await ctx.respond(embed=embed)
else:
with Timer(timings, "Respond (File)"):
out_file = io.BytesIO(text.encode("utf-8", "replace"))
await ctx.respond(file=discord.File(out_file, filename="ocr.txt"))
out_file = io.BytesIO(text.encode("utf-8", "replace"))
return await ctx.respond(file=discord.File(out_file, filename="ocr.txt"))
await ctx.edit(
content="Timings:\n" + "\n".join("%s: %s" % (k.title(), v) for k, v in timings.items()),
)
def setup(bot):

View file

@ -3,7 +3,10 @@ import io
import json
import shutil
import asyncio
import subprocess
import discord
import httpx
import yt_dlp
import tempfile
from pathlib import Path
@ -136,7 +139,12 @@ class VoiceCog(commands.Cog):
return await self.bot.loop.run_in_executor(None, call)
@commands.slash_command(name="play")
async def stream(self, ctx: discord.ApplicationContext, url: str, volume: float = 100):
async def stream(
self,
ctx: discord.ApplicationContext,
url: str,
volume: float = 100,
):
"""Streams a URL using yt-dl"""
if not ctx.user.voice:
await ctx.respond("You are not connected to a voice channel.")
@ -333,6 +341,73 @@ class VoiceCog(commands.Cog):
await sender("You are not connected to a voice channel.")
raise commands.CommandError("User not connected to a voice channel.")
@commands.slash_command(name="boost-audio")
async def boost_audio(
self,
ctx: discord.ApplicationContext,
file: discord.Attachment,
level: discord.Option(
float,
"A level (in percentage) of volume (e.g. 150 = 150%)",
min_value=0.1,
max_value=999.99
)
):
"""Boosts an audio file's audio level."""
await ctx.defer()
if file.size >= (25 * 1024 * 1024):
return await ctx.respond("File is too large (25MB Max).")
with tempfile.TemporaryDirectory("jimmy-audio-boost-") as temp_dir_raw:
temp_dir = Path(temp_dir_raw).resolve()
_input = temp_dir / file.filename
output = _input.with_name(_input.name + "-processed" + '.'.join(_input.suffixes))
await file.save(_input)
proc: subprocess.CompletedProcess = await self.bot.loop.run_in_executor(
None,
functools.partial(
subprocess.run,
(
"ffmpeg",
"-hide_banner",
"-i",
str(_input),
"-b:a",
"44.1k",
"-af",
"volume=%d" % (level / 100),
str(output),
),
capture_output=True
)
)
if proc.returncode == 0:
if output.stat().st_size >= (25 * 1024 * 1024) + len(output.name):
return await ctx.respond("I'd love to serve you your boosted file, but its too large.")
return await ctx.respond(file=discord.File(output))
else:
data = {
"files": [
{
"content": proc.stderr.decode() or 'empty',
"filename": "stderr.txt"
},
{
"content": proc.stdout.decode() or 'empty',
"filename": "stdout.txt"
}
]
}
response = await httpx.AsyncClient().put("https://api.mystb.in/paste", json=data)
if response.status_code == 201:
data = response.json()
key = "https://mystb.in/" + data["id"]
else:
key = "https://www.youtube.com/watch?v=dgha9S39Y6M&status_code=%d" % response.status_code
await ctx.respond("Failed ([exit code %d](%s))" % (proc.returncode, key))
await ctx.edit(embed=None)
def setup(bot):
bot.add_cog(VoiceCog(bot))

View file

@ -1,7 +1,7 @@
from typing import Optional, TYPE_CHECKING
from urllib.parse import urlparse
import discord
import time
from discord.ext import commands
from ._email import *
@ -30,6 +30,28 @@ class JimmyBanException(discord.CheckFailure):
return f"<JimmyBanException until={self.until!r} reason={self.reason!r}>"
class Timer:
def __init__(self, target: dict = None, name: str = ...):
self.target = target
self.name = name
self._start_time = None
self._end_time = None
@property
def total(self) -> float:
if not self._start_time and not self._end_time:
raise RuntimeError("Timer has not been started or stopped.")
return self._end_time - self._start_time
def __enter__(self):
self._start_time = time.time()
def __exit__(self, exc_type, exc_val, exc_tb):
self._end_time = time.time()
if self.target and self.name is not ...:
self.target[self.name] = self.total
def simple_embed_paginator(
lines: list[str], *, assert_ten: bool = False, empty_is_none: bool = True, **kwargs
) -> Optional[list[discord.Embed]]:

View file

@ -55,6 +55,8 @@ class Bot(commands.Bot):
async def on_error(self, event: str, *args, **kwargs):
e_type, e, tb = sys.exc_info()
if isinstance(e, discord.NotFound) and e.code == 10062: # invalid interaction
return
if isinstance(e, discord.CheckFailure) and 'The global check once functions failed.' in str(e):
return
await super().on_error(event, *args, **kwargs)