nonsensebot/app/modules/yd_dl.py
2024-09-18 21:08:01 +01:00

380 lines
15 KiB
Python

"""
What it says on the tin - a command wrapper around yt-dlp.
"""
import datetime
import functools
import math
import textwrap
import time
import typing
import uuid
from typing import Optional
from urllib.parse import urlparse
import httpx
import humanize
import niobot
import logging
import asyncio
import shutil
import subprocess
import tempfile
from pathlib import Path
from yt_dlp import YoutubeDL, DownloadError
def utcnow() -> datetime.datetime:
    """
    Return the current time as a timezone-aware datetime in UTC.

    :return: ``datetime.datetime.now()`` evaluated with ``tz=datetime.timezone.utc``
    """
    return datetime.datetime.now(tz=datetime.timezone.utc)
class YoutubeDLModule(niobot.Module):
    """
    Bot module providing the `ytdl` command, which downloads media with yt-dlp,
    optionally trims it with ffmpeg, and posts the result back to the room.
    """

    # Largest file size (in bytes) the command will try to upload. Used both for the
    # pre-download metadata check and for the post-download check on the real file.
    MAX_UPLOAD_BYTES = 500 * 1024 * 1024

    def __init__(self, client: niobot.NioBot):
        """
        Set up the logger, the named format presets and the default yt-dlp options.

        :param client: The bot instance this module is attached to
        """
        super().__init__(client)
        self.log = logging.getLogger("jimmy.cogs.ytdl")
        # Friendly names accepted by the `download_format` argument, mapped to yt-dlp format selectors.
        # NOTE(review): these selectors filter on `width` although the labels are heights - confirm intended.
        # NOTE(review): the 480p preset uses `width<=500` for the merged variants - confirm intended.
        self.common_formats = {
            "144p": "bv[width<=144]+ba[ext=webm]/bv[width<=144]+ba[ext=m4a]/bv[width<=144]+ba/b[width<=144]",
            "240p": "bv[width<=240]+ba[ext=webm]/bv[width<=240]+ba[ext=m4a]/bv[width<=240]+ba/b[width<=240]",
            "360p": "bv[width<=360]+ba[ext=webm]/bv[width<=360]+ba[ext=m4a]/bv[width<=360]+ba/b[width<=360]",
            "480p": "bv[width<=500]+ba[ext=webm]/bv[width<=500]+ba[ext=m4a]/bv[width<=500]+ba/b[width<=480]",
            "720p": "bv[width<=720]+ba[ext=webm]/bv[width<=720]+ba[ext=m4a]/bv[width<=720]+ba/b[width<=720]",
            "1080p": "bv[width<=1080]+ba[ext=webm]/bv[width<=1080]+ba[ext=m4a]/bv[width<=1080]+ba",
            "1440p": "bv[width<=1440]+ba[ext=webm]/bv[width<=1440]+ba[ext=m4a]/bv[width<=1440]+ba",
            "2160p": "bv[width<=2160]+ba[ext=webm]/bv[width<=2160]+ba[ext=m4a]/bv[width<=2160]+ba",
            "mp3": "ba[filesize<100M]",
            "m4a": "ba[ext=m4a][filesize<100M]",
            "opus": "ba[ext=webm][filesize<100M]",
            "vorbis": "ba[ext=webm][filesize<100M]",
            "ogg": "ba[ext=webm][filesize<100M]",
        }
        self.default_options = {
            "noplaylist": True,
            "nocheckcertificate": True,
            "no_color": True,
            "noprogress": True,
            "logger": self.log,
            "format": "((bv+ba/b)[vcodec!=h265][filesize<100M]/b[filesize<=100M]/b)",
            "outtmpl": "%(title).50s.%(ext)s",
            "format_sort": [
                "vcodec:h264",
                "acodec:aac",
                "vcodec:vp9",
                "acodec:opus",
                "acodec:vorbis",
                "vcodec:vp8",
                "ext",
            ],
            "merge_output_format": "webm/mp4/mov/m4a/oga/ogg/mp3/mka/mkv",
            "source_address": "0.0.0.0",
            "concurrent_fragment_downloads": 4,
            # The Python API takes a byte count here; suffixes such as "600M" are only parsed by the CLI.
            "max_filesize": 600 * 1024 * 1024,
        }

    async def convert_to_m4a(self, file: Path) -> Path:
        """
        Converts a file to m4a format.

        The conversion runs ffmpeg in a worker thread so the event loop is not blocked.

        :param file: The file to convert
        :return: The converted file (the input path with its suffix replaced by `.m4a`)
        :raises RuntimeError: If ffmpeg is not installed, or if it exits with a non-zero status
        """

        def inner() -> Path:
            if not shutil.which("ffmpeg"):
                raise RuntimeError("ffmpeg is not installed.")
            new_file = file.with_suffix(".m4a")
            args = [
                "-vn",
                "-sn",
                "-i",
                str(file),
                "-c:a",
                "aac",
                "-b:a",
                "96k",
                "-movflags",
                "faststart",
                "-y",
                str(new_file),
            ]
            self.log.debug("Running command: ffmpeg %s", " ".join(args))
            process = subprocess.run(
                ["ffmpeg", *args], stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            if process.returncode != 0:
                # ffmpeg output is not guaranteed to be valid UTF-8; never fail while reporting a failure.
                raise RuntimeError(process.stderr.decode(errors="replace"))
            return new_file

        return await asyncio.to_thread(inner)

    @staticmethod
    async def upload_to_0x0(
        name: str, data: typing.IO[bytes], mime_type: str | None = None
    ) -> str:
        """
        Uploads a file to https://0x0.st.

        :param name: The file name to send with the upload
        :param data: A binary file-like object holding the content to upload
        :param mime_type: The MIME type of the content. When omitted, it is sniffed from the
            first 4096 bytes of `data` using python-magic, after which `data` is rewound to 0.
        :return: The path component of the URL in the response body, without the leading slash
        :raises httpx.HTTPStatusError: If the server responds with an error status
        """
        if not mime_type:
            # Imported lazily so python-magic is only needed when sniffing is actually required.
            import magic

            mime_type = await asyncio.to_thread(
                magic.from_buffer, data.read(4096), mime=True
            )
            data.seek(0)
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://0x0.st",
                files={"file": (name, data, mime_type)},
                data={"expires": 12},
                headers={
                    "User-Agent": "CollegeBot (see: https://gist.i-am.nexus/nex/f63fcb9eb389401caf66d1dfc3c7570c)"
                },
            )
            # Raise first so every path either raises or returns a string, as the annotation promises.
            response.raise_for_status()
            # Strip surrounding whitespace so a trailing newline does not end up in the parsed path.
            return urlparse(response.text.strip()).path[1:]

    @niobot.command()
    async def ytdl(
        self,
        ctx: niobot.Context,
        url: str,
        snip: Optional[str] = None,
        download_format: Optional[str] = None,
    ):
        """
        Downloads a video from YouTube or other source

        snip: an optional timestamp to snip the video to, in the format `start-end`. e.g. `1:30-2:00`
        download_format: the format to download the video in. e.g. `1080p`, or a specific ID (e.g. `22`)
        """
        response = await ctx.respond("Preparing...")
        options = self.default_options.copy()
        description = ""
        # Everything is downloaded into a throwaway directory that is removed when the command finishes.
        with tempfile.TemporaryDirectory(prefix="jimmy-ytdl-") as temp_dir:
            temp_dir = Path(temp_dir)
            paths = {target: str(temp_dir) for target in ("home", "temp")}

            chosen_format = self.default_options["format"]
            if download_format:
                # Either one of the named presets, or a raw yt-dlp format selector / format ID.
                chosen_format = self.common_formats.get(download_format, download_format)

            options.setdefault("postprocessors", [])
            options["format"] = chosen_format
            options["paths"] = paths
            filesize = 0

            with YoutubeDL(options) as downloader:
                await response.edit(content="Fetching metadata (step 1/10)")
                try:
                    # noinspection PyTypeChecker
                    extracted_info = await asyncio.to_thread(
                        downloader.extract_info, url, download=False
                    )
                except DownloadError as e:
                    # Metadata lookup is best-effort: fall back to placeholders and still attempt the download.
                    extracted_info = {
                        "title": "error",
                        "thumbnail_url": None,
                        "webpage_url": url,
                        "format": "error",
                        "format_id": "-1",
                        "ext": "wav",
                        "format_note": str(e),
                        "resolution": "1x1",
                        "fps": "1",
                        "vcodec": "error",
                        "acodec": "error",
                        "filesize": 0,
                    }
                    title = "error"
                    description = str(e)
                    likes = views = 0
                    # These are read after the download, so they must exist on this path as well.
                    webpage_url = url
                    vcodec = "error"
                else:
                    title = extracted_info.get("title", url) or url
                    title = textwrap.shorten(title, 100)
                    webpage_url = extracted_info.get("webpage_url", url)
                    chosen_format = (
                        extracted_info.get("format")
                        or chosen_format
                        or str(uuid.uuid4())
                    )
                    chosen_format_id = extracted_info.get("format_id") or str(
                        uuid.uuid4()
                    )
                    final_extension = extracted_info.get("ext") or "mp4"
                    format_note = (
                        extracted_info.get(
                            "format_note", "%s (%s)" % (chosen_format, chosen_format_id)
                        )
                        or ""
                    )
                    resolution = extracted_info.get("resolution") or "1x1"
                    fps = extracted_info.get("fps", 0.0) or 0.0
                    vcodec = extracted_info.get("vcodec") or "h264"
                    acodec = extracted_info.get("acodec") or "aac"
                    # Keys may be present with a None value, so chain with `or` instead of
                    # relying on dict.get() defaults (which only apply to missing keys).
                    filesize = (
                        extracted_info.get("filesize")
                        or extracted_info.get("filesize_approx")
                        or 0
                    )
                    likes = (
                        extracted_info.get("like_count")
                        or extracted_info.get("average_rating")
                        or 0
                    )
                    views = extracted_info.get("view_count") or 0

                    lines = []
                    if chosen_format and chosen_format_id:
                        lines.append(
                            "* Chosen format: `%s` (`%s`)"
                            % (chosen_format, chosen_format_id),
                        )
                    if format_note:
                        lines.append("* Format note: %r" % format_note)
                    if final_extension:
                        lines.append("* File extension: " + final_extension)
                    if resolution:
                        _s = resolution
                        if fps:
                            _s += " @ %s FPS" % fps
                        lines.append("* Resolution: " + _s)
                    if vcodec or acodec:
                        lines.append(
                            "* Codecs: %s+%s" % (vcodec or "N/A", acodec or "N/A")
                        )
                    if filesize:
                        lines.append("* Filesize: %s" % humanize.naturalsize(filesize))
                    if lines:
                        description += "\n"
                        description += "\n".join(lines)

                # Refuse early when the reported size already exceeds what can be uploaded.
                if filesize and filesize >= self.MAX_UPLOAD_BYTES:
                    return await response.edit("Sorry, could not find a format small enough.")

                await response.edit(
                    f"# {title}\n\n{description}\n\nProgress: `0% [..........]`\n\nDownloading (step 2/10)"
                )
                try:
                    await asyncio.to_thread(downloader.download, [url])
                except DownloadError as e:
                    self.log.error(e, exc_info=True)
                    return await response.edit(
                        f"# Error!\n\nDownload failed:\n```\n{e}\n```",
                    )

                # Locate the downloaded file by the extension yt-dlp reported for the chosen format.
                ext = extracted_info.get("ext") or "*"
                try:
                    file: Path = next(temp_dir.glob("*." + ext))
                except StopIteration:
                    found = list(map(str, temp_dir.iterdir()))
                    self.log.warning(
                        "Failed to locate downloaded file. Was supposed to be looking for a file extension of "
                        "%r amongst files %r, however none were found.",
                        ext,
                        found,
                    )
                    return await response.edit(
                        f"# Error\n\nFailed to locate downloaded file. Expected a file with the extension {ext}.\n\n"
                        f"Files: {', '.join(found)}",
                    )

                if snip:
                    # `start-end`; either side may be omitted (a bare `start` has no dash at all).
                    trim_start, _, trim_end = snip.partition("-")
                    trim_start = trim_start or "00:00:00"
                    trim_end = (
                        trim_end
                        or extracted_info.get("duration_string")
                        or "00:30:00"
                    )
                    new_file = temp_dir / ("output" + file.suffix)
                    args = [
                        "-hwaccel",
                        "auto",
                        "-i",
                        str(file),
                        "-ss",
                        trim_start,
                        "-to",
                        trim_end,
                        "-preset",
                        "fast",
                        "-crf",
                        "24",
                        "-deadline",
                        "realtime",
                        "-cpu-used",
                        "5",
                        "-movflags",
                        "faststart",
                        "-b:a",
                        "96k",
                        "-y",
                        "-strict",
                        "2",
                        str(new_file),
                    ]
                    await response.edit(
                        f"# Trimming from {trim_start} to {trim_end}\n\nPlease wait, this may take a couple of minutes."
                    )
                    self.log.debug("Running command: 'ffmpeg %s'", " ".join(args))
                    process = await asyncio.create_subprocess_exec(
                        "ffmpeg",
                        *args,
                        stdout=asyncio.subprocess.PIPE,
                        stderr=asyncio.subprocess.PIPE,
                    )
                    stdout, stderr = await process.communicate()
                    stdout_text = stdout.decode(errors="replace")
                    stderr_text = stderr.decode(errors="replace")
                    self.log.debug("STDOUT:\n%r", stdout_text)
                    self.log.debug("STDERR:\n%r", stderr_text)
                    if process.returncode != 0:
                        # Stop here: the trimmed output cannot be relied on after a failed run.
                        return await response.edit(
                            f"# Trim failed\n\nError:\n```\n{stderr_text}\n```",
                        )
                    file = new_file

                size_bytes = file.stat().st_size
                if size_bytes >= (self.MAX_UPLOAD_BYTES - 256):
                    return await response.edit(
                        f"# Error\n\nFile is too large to upload. Size: {humanize.naturalsize(size_bytes)}",
                    )

                # Rough estimate that assumes an upload speed of 20 megabits per second.
                size_megabits = (size_bytes * 8) / 1024 / 1024
                eta_seconds = size_megabits / 20
                await response.edit(
                    content=f"Uploading (ETA: {humanize.naturaldelta(eta_seconds)})..."
                )
                try:
                    # Codec names can carry profile suffixes (e.g. `av01.0.08M.08`), so match on the prefix.
                    # HEVC / AV1 files are uploaded to 0x0.st and linked instead of being attached directly.
                    if vcodec.lower().startswith(
                        ("hevc", "h265", "hev1", "hvc1", "av1", "av01")
                    ):
                        with file.open("rb") as fb:
                            part = await self.upload_to_0x0(file.name, fb)
                        await ctx.respond("https://embeds.video/0x0/" + part)
                    else:
                        attachment = await niobot.which(file).from_file(file)
                        await response.reply(None, attachment)
                except (
                    ConnectionError,
                    # Base class of both httpx.HTTPStatusError and httpx transport/request errors.
                    httpx.HTTPError,
                ) as e:
                    self.log.error(e, exc_info=True)
                    await response.edit(
                        content=f"# Error\n\nUpload failed:\n```\n{e}\n```"
                    )
                else:
                    await response.edit(
                        content=f"# [Downloaded {title}!]({webpage_url})\n\nViews: {views:,} | Likes: {likes:,}"
                    )