drop-in-url-previews/server.py

254 lines
8.3 KiB
Python
Raw Normal View History

2024-02-09 23:11:07 +00:00
import io
2024-02-09 22:37:12 +00:00
import os
import fastapi
import httpx
import logging
from typing import Annotated
from fastapi import Query, Header, HTTPException, Request
from fastapi.responses import JSONResponse
from pathlib import Path
from bs4 import BeautifulSoup
from rich.logging import RichHandler
2024-02-09 23:04:24 +00:00
from fastapi.middleware.cors import CORSMiddleware
2024-02-09 22:37:12 +00:00
# Send every log record through Rich's handler; the handler renders the
# timestamp/level itself, so the format string only carries the message.
logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler(markup=True)],
)

# The ASGI application. PREVIEW_ROOT_PATH lets the service be mounted under a
# path prefix; when the variable is unset the root path is the empty string.
app = fastapi.FastAPI(root_path=os.environ.get("PREVIEW_ROOT_PATH", ""))

# Allow cross-origin GET/OPTIONS requests from any origin.
# NOTE(review): a wildcard origin combined with allow_credentials=True is
# discouraged by the CORS documentation - confirm that credentials are needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET", "OPTIONS"],
    allow_headers=["*", "Authorization"],
)
2024-02-09 22:37:12 +00:00
# Pre-built 401 response returned when a request carries no access token.
MISSING_TOKEN = JSONResponse(
    content={"errcode": "M_MISSING_TOKEN", "error": "Missing access token"},
    status_code=401,
)

# Pre-built 401 response for an access token that is present but not valid.
INVALID_TOKEN = JSONResponse(
    content={"errcode": "M_INVALID_TOKEN", "error": "Invalid access token"},
    status_code=401,
)
# Open Graph property names (without the leading "og:") that are accepted
# when scraping a page. Anything not listed here is ignored.
VALID_OG_TAGS = [
    # Basic and optional metadata
    "title",
    "type",
    "image",
    "url",
    "audio",
    "description",
    "determiner",
    "locale",
    "locale:alternate",  # spelling used by the Open Graph protocol
    "locale:alternative",  # kept for backward compatibility
    "site_name",
    # Structured image properties
    "image:url",
    "image:secure_url",
    "image:type",
    "image:width",
    "image:height",
    "image:alt",
    # Structured video properties
    "video",
    "video:url",
    "video:secure_url",
    "video:type",
    "video:width",
    "video:height",
    "video:alt",
    "video:actor",
    "video:actor:role",
    "video:director",
    "video:writer",
    "video:duration",
    "video:release_date",
    "video:tag",
    # A comma was missing after this entry, which silently merged it with
    # "audio:url" into one bogus string and dropped both real tags.
    "video:series",
    # Structured audio properties
    "audio:url",
    "audio:secure_url",
    "audio:type",
    # Music
    "music:duration",
    "music:album",
    "music:album:disc",
    "music:album:track",
    "music:musician",
    "music:song",
    "music:song:disc",
    "music:song:track",
    "music:release_date",
    "music:creator",
    # Article
    "article:published_time",
    "article:modified_time",
    "article:expiration_time",
    "article:author",
    "article:section",
    "article:tag",
    # Book
    "book:author",
    "book:tag",
    "book:isbn",
    "book:release_date",
    # Profile
    "profile:first_name",
    "profile:last_name",
    "profile:username",
    "profile:gender",
]
# Open Graph properties whose value is a media URL. Built as every media kind
# combined with every URL-carrying suffix, kind by kind, which yields:
# video, video:url, video:secure_url, image, image:url, ... audio:secure_url.
URL_OG_TAGS = [
    media_kind + url_suffix
    for media_kind in ("video", "image", "audio")
    for url_suffix in ("", ":url", ":secure_url")
]
2024-02-09 23:11:07 +00:00
def upload_media(
    domain: str,
    access_token: str,
    file: io.BytesIO,
    filename: str,
    content_type: str,
) -> str | None:
    """Upload an in-memory file to a homeserver's media upload endpoint.

    Args:
        domain: Base URL of the homeserver (scheme and host, no trailing path).
        access_token: Token sent as a ``Bearer`` Authorization header.
        file: Buffer holding the bytes to upload. It is left rewound to
            position 0 when this function returns.
        filename: Value of the ``filename`` query parameter.
        content_type: Value of the ``Content-Type`` request header.

    Returns:
        The ``content_uri`` from the JSON response on HTTP 200, or ``None``
        when the server answers with another status or the response body
        does not contain a usable ``content_uri``.

    Raises:
        httpx.HTTPError: If the request itself fails (connection, timeout...).
    """
    # getvalue() returns the whole buffer regardless of the stream position,
    # so a single rewind is enough to leave the buffer ready for the caller.
    payload = file.getvalue()
    file.seek(0)
    logging.info(
        "Creating media at %r called %r with the content type %r and %d bytes",
        domain,
        filename,
        content_type,
        len(payload),
    )
    response = httpx.post(
        "%s/_matrix/media/r0/upload" % domain,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": content_type,
        },
        # Raw bytes belong in `content=`; passing them through `data=` is
        # deprecated by httpx.
        content=payload,
        params={"filename": filename},
        # Same timeout as the other outbound requests in this file.
        timeout=60,
    )
    if response.status_code != 200:
        logging.warning("Failed to upload media: HTTP %s", response.status_code)
        logging.debug("Response: %r", response.text)
        return None

    try:
        mxc_url = response.json()["content_uri"]
    except (ValueError, KeyError, TypeError):
        # A 200 with a non-JSON or unexpected body is treated as a failed upload
        # instead of crashing the caller.
        logging.warning("Failed to upload media: response had no content_uri")
        logging.debug("Response: %r", response.text)
        return None

    logging.info("Media uploaded successfully")
    return mxc_url
@app.get("/preview_url")
def preview_url(
    req: Request,
    url: Annotated[str, Query(..., description="URL to preview")],
    access_token_qs: str | None = Query(None, alias="access_token", description="Access token to use for the request."),
    access_token_header: str | None = Header(None, alias="Authorization", description="Access token to use for the request."),
):
    """Build an Open Graph preview for ``url`` and mirror its media.

    The page is fetched, its ``og:*`` meta tags are collected (restricted to
    ``VALID_OG_TAGS``), and every tag listed in ``URL_OG_TAGS`` is downloaded
    and re-uploaded through ``upload_media``. On a successful upload the tag
    value is replaced by the returned URI and the source URL is preserved
    under ``original:<tag>``.

    Args:
        req: Incoming request; its hostname is the fallback homeserver.
        url: Page to preview.
        access_token_qs: Access token from the ``access_token`` query
            parameter. Takes precedence over the header.
        access_token_header: ``Authorization`` header; only the
            ``Bearer <token>`` form is accepted.

    Returns:
        ``MISSING_TOKEN`` when no token was supplied, ``{}`` when the page is
        not ``text/html``, otherwise a dict whose keys are prefixed with
        ``og:``, ``original:`` or ``matrix:``.

    Raises:
        HTTPException: 500 when the page itself cannot be fetched.
    """
    # NOTE(review): the token is not verified before arbitrary URLs are
    # fetched, there is no filtering of internal hosts, and media downloads
    # are unbounded in size - confirm this is acceptable for the deployment.
    if access_token_qs is not None:
        access_token = access_token_qs
    elif access_token_header and access_token_header.startswith("Bearer "):
        # removeprefix keeps the whole token even if it contains "Bearer ".
        access_token = access_token_header.removeprefix("Bearer ")
    else:
        return MISSING_TOKEN

    # Only derive the fallback from the request when the variable is unset,
    # so a missing request hostname cannot break a configured deployment.
    domain = os.environ.get("PREVIEW_HOMESERVER")
    if not domain:
        domain = f"https://{req.url.hostname}"

    browser_headers = {
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0"
    }

    try:
        response = httpx.get(
            url,
            headers=browser_headers,
            timeout=60,
            follow_redirects=True,
        )
    except (httpx.HTTPError, httpx.InvalidURL) as e:
        # InvalidURL is not an HTTPError subclass, so it is listed explicitly.
        raise HTTPException(500, f"Failed to fetch URL: {e!r}") from e

    if "text/html" not in response.headers.get("content-type", ""):
        return {}

    soup = BeautifulSoup(response.text, "html.parser")
    og_tags = {}
    for tag in soup.find_all("meta"):
        prop = tag.get("property") or ""
        if not prop.startswith("og:"):
            continue
        tag_name = prop.removeprefix("og:")
        content = tag.get("content")
        # A meta tag without a content attribute carries no value; skipping
        # it avoids handing None to the media fetch below.
        if content is None or tag_name not in VALID_OG_TAGS:
            continue
        og_tags[tag_name] = content

    for tag_name in URL_OG_TAGS:
        if tag_name not in og_tags:
            continue
        media_url = og_tags[tag_name]
        try:
            # Resolve relative references against the final page URL
            # (after redirects); absolute URLs pass through unchanged.
            fetch_url = response.url.join(media_url)
            with httpx.stream(
                "GET",
                fetch_url,
                headers=browser_headers,
                timeout=60,
                follow_redirects=True,
            ) as response_media:
                if not 200 <= response_media.status_code < 300:
                    logging.warning(
                        "Failed to fetch media: %r - HTTP %s",
                        media_url,
                        response_media.status_code,
                    )
                    og_tags.pop(tag_name, None)
                    continue

                content_type = response_media.headers.get("content-type", "")
                if not content_type.startswith(("image/", "video/", "audio/")):
                    logging.warning("Failed to fetch media: %r - not a media type", media_url)
                    og_tags.pop(tag_name, None)
                    continue

                # A missing or malformed Content-Length must not abort the
                # preview; it only disables the completeness check.
                try:
                    expected_size = int(response_media.headers["content-length"])
                except (KeyError, ValueError):
                    expected_size = None

                logging.info(
                    "Downloading %s bytes of media: %r",
                    f"{expected_size or 0:,}",
                    media_url,
                )
                payload = response_media.read()

                if expected_size is not None and expected_size != len(payload):
                    # Warn only: the sizes may legitimately differ, for
                    # example when the body was transparently decompressed.
                    logging.warning(
                        "Possibly failed to fetch media: %r - incomplete (%s downloaded, %s needed)",
                        media_url,
                        f"{len(payload):,}",
                        f"{expected_size:,}",
                    )

                mxc_url = upload_media(
                    domain,
                    access_token,
                    io.BytesIO(payload),
                    Path(fetch_url.path).name,
                    content_type,
                )
                if mxc_url:
                    og_tags["original:" + tag_name] = media_url
                    og_tags[tag_name] = mxc_url
                    if tag_name in ("image", "image:url", "image:secure_url"):
                        og_tags["matrix:image:size"] = len(payload)
                    logging.info("Uploaded media: %r", media_url)
                else:
                    # The tag keeps pointing at the original URL.
                    logging.warning("Failed to upload media: %r (no returned mxc)", media_url)
        except (httpx.HTTPError, httpx.InvalidURL) as e:
            logging.exception("Failed to fetch url for OG tags @ %r: %r", media_url, e)
            og_tags.pop(tag_name, None)

    # Re-key the plain tag names with the "og:" prefix; the already
    # namespaced "original:" and "matrix:" entries are left untouched.
    for key in list(og_tags):
        if not key.startswith(("original:", "og:", "matrix:")):
            og_tags["og:" + key] = og_tags.pop(key)

    return og_tags
if __name__ == "__main__":
    # Development entry point: uvicorn is imported only when the module is
    # executed directly, and serves the app on all interfaces, port 2226.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=2226,
    )