Add auto responder for trump shit

This commit is contained in:
Nexus 2024-04-14 23:12:02 +01:00
parent b0f2f3f806
commit 41e5984664
Signed by: nex
GPG key ID: 0FA334385D0B689F
2 changed files with 150 additions and 19 deletions

120
src/cogs/auto_responder.py Normal file
View file

@ -0,0 +1,120 @@
import asyncio
import pathlib
import subprocess
import discord
import re
import tempfile
from urllib.parse import urlparse
import logging
from discord.ext import commands
from .ffmeta import FFMeta
class AutoResponder(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot
self.log = logging.getLogger("jimmy.cogs.auto_responder")
self.transcode_lock = asyncio.Lock()
@staticmethod
def extract_links(text: str, *domains: str) -> list[str]:
"""
Extracts all links from a given text.
:param text: The raw text to extract links from.
:param domains: A list of domains to filter for.
:return: A list of found links
"""
split = text.split()
links = []
for word in split:
url = urlparse(word)
if not url.netloc:
continue
if domains and url.netloc not in domains:
continue
links.append(url.geturl())
return links
async def _transcode_hevc_to_h264(self, uri: str | pathlib.Path) -> tuple[discord.File, pathlib.Path] | None:
"""
Transcodes the given URL or file to H264 from HEVC, trying to preserve quality and file size.
:param uri: The URI to transcode
:return: A transcoded file
"""
self.log.info("Waiting for transcode lock to release")
async with self.transcode_lock:
cog: FFMeta = self.bot.get_cog("FFMeta")
if not cog:
raise RuntimeError("FFMeta cog not loaded")
if not isinstance(uri, str):
uri = str(uri)
self.log.info("Probing %s", uri)
info = await cog._run_ffprobe(uri, True)
if not info:
raise ValueError("No info found for %r" % uri)
streams = info.get("streams", [])
for stream in streams:
if stream.get("codec_name") == "hevc":
self.log.info("Found HEVC stream: %r", stream)
break
else:
self.log.info("No HEVC streams found in %s", uri)
return
with tempfile.NamedTemporaryFile(suffix=".mp4") as tmp:
tmp_path = pathlib.Path(tmp.name)
self.log.debug("Transcoding %r to %r", uri, tmp_path)
args = [
"-i", str(uri),
"-c:v", "libx264",
"-crf", "25",
"-c:a", "libopus",
"-b:a", "64k",
"-preset", "slower"
"-y"
]
process = await asyncio.create_subprocess_exec(
"ffmpeg",
*args,
str(tmp_path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
self.log.info("finished transcode with return code %d", process.returncode)
self.log.debug("stdout: %r", stdout.decode)
self.log.debug("stderr: %r", stderr.decode)
if process.returncode != 0:
raise subprocess.SubprocessError(
process.returncode,
" ".join(args),
stdout,
stderr,
)
return discord.File(tmp_path), tmp_path
@commands.Cog.listener("on_message")
async def auto_responder(self, message: discord.Message):
if message.author == self.bot.user:
return
# Check for HEVC truth social links and convert into h264
links = self.extract_links(message.content, "static-assets-1.truthsocial.com")
if links:
for link in links:
self.log.info("Found link to transcode: %r", link)
try:
file, _p = await self._transcode_hevc_to_h264(link)
if file:
if _p.stat().st_size <= 24.5 * 1024 * 1024:
await message.reply(file=file)
except Exception as e:
self.log.error("Failed to transcode %r: %r", link, e)
def setup(bot):
bot.add_cog(AutoResponder(bot))

View file

@ -2,6 +2,7 @@ import asyncio
import io import io
import json import json
import logging import logging
import pathlib
import tempfile import tempfile
import typing import typing
from pathlib import Path from pathlib import Path
@ -29,6 +30,31 @@ class FFMeta(commands.Cog):
img_dst.seek(0) img_dst.seek(0)
return img_dst return img_dst
async def _run_ffprobe(self, uri: str | pathlib.Path, as_json: bool = False) -> dict | str:
"""
Runs ffprobe on the given target (either file path or URL) and returns the result
:param uri: the URI to run ffprobe on
:return: The result
"""
_bin = "ffprobe"
cmd = ["-hide_banner", "-v", "quiet", "-print_format", "json", "-show_streams", "-show_format", "-i", str(uri)]
if not as_json:
cmd = ["-hide_banner", "-i", str(uri)]
process = await asyncio.create_subprocess_exec(
"ffprobe",
"-v", "quiet",
"-print_format", "json" if as_json else "default",
"-show_format",
"-i",
str(uri),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if as_json:
return json.loads(stdout.decode())
return stdout.decode(errors="replace")
@commands.slash_command() @commands.slash_command()
async def ffprobe(self, ctx: discord.ApplicationContext, url: str = None, attachment: discord.Attachment = None): async def ffprobe(self, ctx: discord.ApplicationContext, url: str = None, attachment: discord.Attachment = None):
"""Runs ffprobe on a given URL or attachment""" """Runs ffprobe on a given URL or attachment"""
@ -39,26 +65,11 @@ class FFMeta(commands.Cog):
await ctx.defer() await ctx.defer()
process = await asyncio.create_subprocess_exec( stdout = await self._run_ffprobe(url)
"ffprobe",
"-hide_banner",
url,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
stdout = stdout.decode("utf-8", "replace")
stderr = stderr.decode("utf-8", "replace")
paginator = commands.Paginator(prefix="```", suffix="```") paginator = commands.Paginator()
for line in stdout.splitlines(): for line in stdout.splitlines():
if stderr: paginator.add_line(line[:2000])
paginator.add_line(f"[OUT] {line}"[:2000])
else:
paginator.add_line(line[:2000])
for line in stderr.splitlines():
paginator.add_line(f"[ERR] {line}"[:2000])
for page in paginator.pages: for page in paginator.pages:
await ctx.respond(page) await ctx.respond(page)
@ -195,7 +206,7 @@ class FFMeta(commands.Cog):
process = await asyncio.create_subprocess_exec( process = await asyncio.create_subprocess_exec(
"ffmpeg", "ffmpeg",
"-hide_banner", "-hide_banner",
"-loglevel", "-v",
"warning", "warning",
"-stats", "-stats",
"-i", "-i",