college-bot-v1/cogs/voice.py
2023-05-02 18:08:09 +01:00

195 lines
6.9 KiB
Python

import io
import shutil
import asyncio
import discord
import yt_dlp
import tempfile
from datetime import datetime
from pathlib import Path
from discord.ext import commands
class TransparentQueue(asyncio.Queue):
def __init__(self, maxsize: int = 0) -> None:
super().__init__(maxsize)
self._internal_queue = []
async def put(self, item):
await super().put(item)
self._internal_queue.append(item)
async def job_done(self):
await super().job_done()
self._internal_queue.pop(0)
class YTDLSource(discord.PCMVolumeTransformer):
def __init__(self, source: discord.AudioSource, *, data: dict, volume: float = 0.5):
super().__init__(source, volume)
self.data = data
self.title = data.get("title")
self.url = data.get("url")
@property
def duration(self):
return self.data.get("duration")
@classmethod
async def from_url(cls, ytdl: yt_dlp.YoutubeDL, url, *, loop=None, stream=False):
ffmpeg_options = {"options": "-vn -b:a 44.1k"}
loop = loop or asyncio.get_event_loop()
data = await loop.run_in_executor(
None, lambda: ytdl.extract_info(url, download=not stream)
)
if "entries" in data:
if not data["entries"]:
# Empty playlist
return None
# Takes the first item from a playlist
data = data["entries"][0]
filename = data["url"] if stream else ytdl.prepare_filename(data)
return cls(discord.FFmpegPCMAudio(filename, **ffmpeg_options), data=data)
class VoiceCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.ytdl_options = {
"format": "bestaudio/best",
"outtmpl": "%(title)s.%(ext)s",
"restrictfilenames": True,
"noplaylist": True,
"quiet": True,
"no_warnings": True,
"ignoreerrors": True,
"logtostderr": False,
"default_search": "auto",
}
self.cache = Path(tempfile.mkdtemp("jimmy-voice-cache-")).resolve()
self.yt_dl = yt_dlp.YoutubeDL(self.ytdl_options)
# self.queue = TransparentQueue(100)
# self._queue_task = self.bot.loop.create_task(self.queue_task())
async def queue_task(self):
if not self.bot.is_ready():
await self.bot.wait_until_ready()
while True:
ctx, player, author, inserted_at = await self.queue.get()
ctx.guild.voice_client.play(player, after=self.after_player(ctx))
embed = discord.Embed(
description=f"Now playing: [{player.title}]({player.url}), as requested by {author.mention}.",
color=discord.Color.green(),
)
try:
await ctx.respond(
embed=embed
)
except discord.HTTPException:
try:
await ctx.send(embed=embed)
except discord.HTTPException:
pass
await self.queue.job_done()
def cog_unload(self):
shutil.rmtree(self.cache)
def after_player(self, ctx: discord.ApplicationContext):
def after(e):
if e:
self.bot.loop.create_task(
ctx.respond(
f"An error occurred while playing the audio: {e}"
)
)
return after
async def unblock(self, func, *args, **kwargs):
return await self.bot.loop.run_in_executor(None, func, *args, **kwargs)
@commands.slash_command(name="play")
async def play(self, ctx: discord.ApplicationContext, url: str, volume: float = 50):
"""Streams a URL using yt-dl"""
if not ctx.user.voice:
await ctx.respond("You are not connected to a voice channel.")
return
if ctx.voice_client.is_playing():
await ctx.respond("Already playing audio. Use %s first." % self.bot.get_application_command("stop").mention)
return
player = await YTDLSource.from_url(self.yt_dl, url, loop=self.bot.loop, stream=True)
if not player:
await ctx.respond("Could not extract any audio from the given URL.")
ctx.guild.voice_client.play(player, after=self.after_player(ctx))
ctx.guild.voice_client.source.volume = min(100.0, max(1.0, volume / 100))
embed = discord.Embed(
description=f"Playing [{player.title}]({player.url})",
)
await ctx.respond(embed=embed)
@commands.slash_command(name="volume")
async def volume(self, ctx: discord.ApplicationContext, volume: float):
"""Changes the player's volume"""
if not 0 < volume < 101:
await ctx.respond("Volume must be between 1 and 100.")
return
ctx.guild.voice_client.source.volume = volume / 100
await ctx.respond(f"Changed volume to {volume}%")
@commands.slash_command(name="stop")
async def stop(self, ctx: discord.ApplicationContext):
"""Stops and disconnects the bot from voice"""
if not ctx.guild.voice_client:
await ctx.respond("Not connected to a voice channel.")
return
if ctx.voice_client:
if ctx.voice_client.is_playing():
# members = ctx.voice_client.channel.members
# bots = [m for m in members if m.bot]
# if len(bots) == len(members):
# pass
# else:
# humans = len(members) - len(bots)
# if humans > 1:
#
ctx.voice_client.stop()
await ctx.voice_client.disconnect(force=True)
await ctx.respond("Disconnected from voice channel.")
else:
await ctx.respond("Not connected to a voice channel.")
@commands.command(name="dump-metadata")
async def dump_metadata(self, ctx: commands.Context, *, url: str):
"""Dumps JSON YT-DLP metadata to a file"""
async with ctx.channel.typing():
file = io.StringIO()
data = await self.unblock(self.yt_dl.extract_info, url, download=False)
data = await self.unblock(self.yt_dl.sanitize_info, data)
json.dump(data, file, indent=4)
file.seek(0)
return await ctx.respond(file=discord.File(file, filename="metadata.json"))
async def cog_before_invoke(self, ctx: discord.ApplicationContext):
await ctx.defer()
if not self.cache.exists():
self.cache.mkdir()
if not ctx.guild.voice_client:
if ctx.user.voice:
await ctx.user.voice.channel.connect()
else:
await ctx.respond("You are not connected to a voice channel.")
raise commands.CommandError("User not connected to a voice channel.")
def setup(bot):
bot.add_cog(VoiceCog(bot))