"""
What it says on the tin - a command wrapper around yt-dlp.
"""

import asyncio
import datetime
import functools
import logging
import math
import shutil
import subprocess
import tempfile
import textwrap
import time
import typing
import uuid
from pathlib import Path
from typing import Optional
from urllib.parse import urlparse

import httpx
import humanize
import niobot
from yt_dlp import YoutubeDL, DownloadError


def utcnow():
    """Return the current time as a timezone-aware datetime in UTC."""
    return datetime.datetime.now(tz=datetime.timezone.utc)


class YoutubeDLModule(niobot.Module):
    """Bot module exposing a `ytdl` command that downloads media with yt-dlp."""

    # Upper bound (in bytes) on what the command will try to upload. Used both
    # for the pre-download estimate and for the post-download size check.
    MAX_UPLOAD_BYTES = 500 * 1024 * 1024

    def __init__(self, client: niobot.NioBot):
        """Set up the logger, the named format presets and the base yt-dlp options."""
        super().__init__(client)
        self.log = logging.getLogger("jimmy.cogs.ytdl")
        # Friendly preset name -> yt-dlp format selector.
        # NOTE(review): the video presets filter on `width` although the "NNNp"
        # labels conventionally describe height, and "480p" uses 500 for the
        # video stream - confirm whether that is intended.
        self.common_formats = {
            "144p": "bv[width<=144]+ba[ext=webm]/bv[width<=144]+ba[ext=m4a]/bv[width<=144]+ba/b[width<=144]",
            "240p": "bv[width<=240]+ba[ext=webm]/bv[width<=240]+ba[ext=m4a]/bv[width<=240]+ba/b[width<=240]",
            "360p": "bv[width<=360]+ba[ext=webm]/bv[width<=360]+ba[ext=m4a]/bv[width<=360]+ba/b[width<=360]",
            # The last alternative previously read "+bab[width<=480]" (missing
            # "/"), which is not a valid selector.
            "480p": "bv[width<=500]+ba[ext=webm]/bv[width<=500]+ba[ext=m4a]/bv[width<=500]+ba/b[width<=480]",
            "720p": "bv[width<=720]+ba[ext=webm]/bv[width<=720]+ba[ext=m4a]/bv[width<=720]+ba/b[width<=720]",
            "1080p": "bv[width<=1080]+ba[ext=webm]/bv[width<=1080]+ba[ext=m4a]/bv[width<=1080]+ba",
            "1440p": "bv[width<=1440]+ba[ext=webm]/bv[width<=1440]+ba[ext=m4a]/bv[width<=1440]+ba",
            "2160p": "bv[width<=2160]+ba[ext=webm]/bv[width<=2160]+ba[ext=m4a]/bv[width<=2160]+ba",
            "mp3": "ba[filesize<100M]",
            "m4a": "ba[ext=m4a][filesize<100M]",
            "opus": "ba[ext=webm][filesize<100M]",
            "vorbis": "ba[ext=webm][filesize<100M]",
            "ogg": "ba[ext=webm][filesize<100M]",
        }
        # Options handed to every YoutubeDL instance; `ytdl` copies this dict
        # and overrides "format" and "paths" per invocation.
        self.default_options = {
            "noplaylist": True,
            "nocheckcertificate": True,
            "no_color": True,
            "noprogress": True,
            "logger": self.log,
            "format": "((bv+ba/b)[vcodec!=h265][filesize<100M]/b[filesize<=100M]/b)",
            "outtmpl": "%(title).50s.%(ext)s",
            "format_sort": [
                "vcodec:h264",
                "acodec:aac",
                "vcodec:vp9",
                "acodec:opus",
                "acodec:vorbis",
                "vcodec:vp8",
                "ext",
            ],
            "merge_output_format": "webm/mp4/mov/m4a/oga/ogg/mp3/mka/mkv",
            "source_address": "0.0.0.0",
            "concurrent_fragment_downloads": 4,
            "max_filesize": "600M",
        }

    async def convert_to_m4a(self, file: Path) -> Path:
        """
        Converts a file to m4a format.

        The conversion runs ffmpeg in a worker thread so the event loop is not blocked.

        :param file: The file to convert
        :return: The converted file
        :raises RuntimeError: if ffmpeg is not on PATH, or exits with a non-zero status
            (the message is ffmpeg's stderr output).
        """

        def inner():
            if not shutil.which("ffmpeg"):
                raise RuntimeError("ffmpeg is not installed.")
            new_file = file.with_suffix(".m4a")
            args = [
                "-vn",
                "-sn",
                "-i",
                str(file),
                "-c:a",
                "aac",
                "-b:a",
                "96k",
                "-movflags",
                "faststart",
                "-y",
                str(new_file),
            ]
            self.log.debug("Running command: ffmpeg %s", " ".join(args))
            process = subprocess.run(
                ["ffmpeg", *args], stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            if process.returncode != 0:
                raise RuntimeError(process.stderr.decode())
            return new_file

        return await asyncio.to_thread(inner)

    @staticmethod
    async def upload_to_0x0(
        name: str, data: typing.IO[bytes], mime_type: str | None = None
    ) -> str:
        """
        Uploads a file object to https://0x0.st.

        :param name: The file name to send with the upload
        :param data: A readable (and seekable, if `mime_type` is omitted) binary stream
        :param mime_type: The MIME type to send; sniffed from the first 4096 bytes
            using `magic` when not given
        :return: The path component of the URL in the response body, without the leading slash
        :raises httpx.HTTPStatusError: if the response status is treated as an error
            by `raise_for_status`
        """
        if not mime_type:
            # Imported lazily so the dependency is only needed when sniffing.
            import magic

            mime_type = await asyncio.to_thread(
                magic.from_buffer, data.read(4096), mime=True
            )
            # Rewind so the upload below sends the stream from the start.
            data.seek(0)
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://0x0.st",
                files={"file": (name, data, mime_type)},
                data={"expires": 12},
                headers={
                    "User-Agent": "CollegeBot (see: https://gist.i-am.nexus/nex/f63fcb9eb389401caf66d1dfc3c7570c)"
                },
            )
        if response.status_code == 200:
            return urlparse(response.text).path[1:]
        # NOTE(review): a non-200 status that raise_for_status does not reject
        # falls through and returns None - confirm whether that can happen.
        response.raise_for_status()

    @niobot.command()
    async def ytdl(
        self,
        ctx: niobot.Context,
        url: str,
        snip: Optional[str] = None,
        download_format: Optional[str] = None,
    ):
        """
        Downloads a video from YouTube or other source

        snip: an optional timestamp to snip the video to, in the format `start-end`. e.g. `1:30-2:00`
        download_format: the format to download the video in. e.g. `1080p`, or a specific ID (e.g. `22`)
        """
        response = await ctx.respond("Preparing...")
        options = self.default_options.copy()
        description = ""

        # Everything that touches the downloaded file must stay inside this
        # block: the directory (and the file) is removed on exit.
        with tempfile.TemporaryDirectory(prefix="jimmy-ytdl-") as temp_dir:
            temp_dir = Path(temp_dir)
            paths = {target: str(temp_dir) for target in ("home", "temp")}

            # A known preset name maps to its selector; anything else is
            # passed to yt-dlp verbatim as a format selector/ID.
            chosen_format = self.default_options["format"]
            if download_format:
                chosen_format = self.common_formats.get(
                    download_format, download_format
                )

            options.setdefault("postprocessors", [])
            options["format"] = chosen_format
            options["paths"] = paths

            filesize = 0
            with YoutubeDL(options) as downloader:
                await response.edit(content="Fetching metadata (step 1/10)")
                try:
                    # noinspection PyTypeChecker
                    extracted_info = await asyncio.to_thread(
                        downloader.extract_info, url, download=False
                    )
                except DownloadError as e:
                    extracted_info = {
                        "title": "error",
                        "thumbnail_url": None,
                        "webpage_url": url,
                        "format": "error",
                        "format_id": "-1",
                        "ext": "wav",
                        "format_note": str(e),
                        "resolution": "1x1",
                        "fps": "1",
                        "vcodec": "error",
                        "acodec": "error",
                        "filesize": 0,
                    }
                    title = "error"
                    description = str(e)
                    likes = views = 0
                    # These are read after the download step; bind them here
                    # too so this path can never hit an unbound local.
                    webpage_url = extracted_info["webpage_url"]
                    vcodec = extracted_info["vcodec"]
                else:
                    title = extracted_info.get("title", url) or url
                    title = textwrap.shorten(title, 100)
                    webpage_url = extracted_info.get("webpage_url", url)

                    chosen_format = (
                        extracted_info.get("format")
                        or chosen_format
                        or str(uuid.uuid4())
                    )
                    chosen_format_id = extracted_info.get("format_id") or str(
                        uuid.uuid4()
                    )
                    final_extension = extracted_info.get("ext") or "mp4"
                    format_note = (
                        extracted_info.get(
                            "format_note", "%s (%s)" % (chosen_format, chosen_format_id)
                        )
                        or ""
                    )
                    resolution = extracted_info.get("resolution") or "1x1"
                    fps = extracted_info.get("fps", 0.0) or 0.0
                    vcodec = extracted_info.get("vcodec") or "h264"
                    acodec = extracted_info.get("acodec") or "aac"
                    # Use `or` rather than nested .get() defaults so a key that
                    # is present but None still falls back to the estimate, and
                    # an unknown size stays 0 instead of a made-up 1 byte.
                    filesize = (
                        extracted_info.get("filesize")
                        or extracted_info.get("filesize_approx")
                        or 0
                    )
                    likes = extracted_info.get(
                        "like_count", extracted_info.get("average_rating", 0)
                    )
                    views = extracted_info.get("view_count", 0)

                    # Build the Markdown bullet list shown under the title.
                    lines = []
                    if chosen_format and chosen_format_id:
                        lines.append(
                            "* Chosen format: `%s` (`%s`)"
                            % (chosen_format, chosen_format_id),
                        )
                    if format_note:
                        lines.append("* Format note: %r" % format_note)
                    if final_extension:
                        lines.append("* File extension: " + final_extension)
                    if resolution:
                        _s = resolution
                        if fps:
                            _s += " @ %s FPS" % fps
                        lines.append("* Resolution: " + _s)
                    if vcodec or acodec:
                        # Bulleted like its siblings; without the "* " prefix
                        # this line is not its own list item.
                        lines.append(
                            "* Codecs: %s+%s" % (vcodec or "N/A", acodec or "N/A")
                        )
                    if filesize:
                        lines.append("* Filesize: %s" % humanize.naturalsize(filesize))
                    if lines:
                        description += "\n"
                        description += "\n".join(lines)

                # Refuse early when the reported size already exceeds the
                # upload limit (same limit as the post-download check below).
                if filesize and filesize >= self.MAX_UPLOAD_BYTES:
                    return await response.edit(
                        "Sorry, could not find a format small enough."
                    )

                await response.edit(
                    f"# {title}\n\n{description}\n\nProgress: `0% [..........]`\n\nDownloading (step 2/10)"
                )
                try:
                    await asyncio.to_thread(
                        functools.partial(downloader.download, [url])
                    )
                except DownloadError as e:
                    self.log.error(e, exc_info=True)
                    return await response.edit(
                        f"# Error!\n\nDownload failed:\n```\n{e}\n```",
                    )

            # Locate the downloaded file by the extension yt-dlp reported.
            ext = extracted_info.get("ext", "*")
            try:
                file: Path = next(temp_dir.glob("*." + ext))
            except StopIteration:
                found = list(map(str, temp_dir.iterdir()))
                self.log.warning(
                    "Failed to locate downloaded file. Was supposed to be looking for a file extension of "
                    "%r amongst files %r, however none were found.",
                    ext,
                    found,
                )
                return await response.edit(
                    f"# Error\n\nFailed to locate downloaded file. Expected a file with the extension {ext}.\n\n"
                    f"Files: {', '.join(found)}",
                )

            if snip:
                # "start-end"; anything that does not split into exactly two
                # parts is treated as a start time only.
                try:
                    trim_start, trim_end = snip.split("-")
                except ValueError:
                    trim_start, trim_end = snip, None
                trim_start = trim_start or "00:00:00"
                trim_end = trim_end or extracted_info.get(
                    "duration_string", "00:30:00"
                )
                new_file = temp_dir / ("output" + file.suffix)
                args = [
                    "-hwaccel",
                    "auto",
                    "-i",
                    str(file),
                    "-ss",
                    trim_start,
                    "-to",
                    trim_end,
                    "-preset",
                    "fast",
                    "-crf",
                    "24",
                    "-deadline",
                    "realtime",
                    "-cpu-used",
                    "5",
                    "-movflags",
                    "faststart",
                    "-b:a",
                    "96k",
                    "-y",
                    "-strict",
                    "2",
                    str(new_file),
                ]
                await response.edit(
                    f"# Trimming from {trim_start} to {trim_end}\n\nPlease wait, this may take a couple of minutes."
                )
                self.log.debug("Running command: 'ffmpeg %s'", " ".join(args))
                process = await asyncio.create_subprocess_exec(
                    "ffmpeg",
                    *args,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
                stdout, stderr = await process.communicate()
                self.log.debug("STDOUT:\n%r", stdout.decode())
                self.log.debug("STDERR:\n%r", stderr.decode())
                if process.returncode != 0:
                    # Stop here: the output file may not exist, so carrying on
                    # would fail on stat() and hide this message.
                    return await response.edit(
                        f"# Trim failed\n\nError:\n```\n{stderr.decode()}\n```",
                    )
                file = new_file

            size_bytes = file.stat().st_size
            if size_bytes >= (self.MAX_UPLOAD_BYTES - 256):
                return await response.edit(
                    f"# Error\n\nFile is too large to upload. Size: {humanize.naturalsize(size_bytes)}",
                )

            # Rough ETA assuming a 20 megabit/s upload rate.
            size_megabits = (size_bytes * 8) / 1024 / 1024
            eta_seconds = size_megabits / 20
            await response.edit(
                content=f"Uploading (ETA: {humanize.naturaldelta(eta_seconds)})..."
            )

            views = views or 0
            likes = likes or 0
            try:
                # NOTE(review): exact-match comparison; yt-dlp codec strings
                # may carry profile suffixes - confirm these values match.
                if vcodec.lower() in [
                    "hevc",
                    "h265",
                    "av1",
                    "av01",
                ]:
                    # These codecs go to 0x0.st and are linked via
                    # embeds.video instead of being attached directly.
                    with file.open("rb") as fb:
                        part = await self.upload_to_0x0(file.name, fb)
                    await ctx.respond("https://embeds.video/0x0/" + part)
                else:
                    attachment = await niobot.which(file).from_file(file)
                    await response.reply(None, attachment)
            except (
                ConnectionError,
                httpx.HTTPStatusError,
            ) as e:
                self.log.error(e, exc_info=True)
                await response.edit(
                    content=f"# Error\n\nUpload failed:\n```\n{e}\n```"
                )
            else:
                await response.edit(
                    content=f"# [Downloaded {title}!]({webpage_url})\n\nViews: {views:,} | Likes: {likes:,}"
                )