2024-02-09 22:37:12 +00:00
|
|
|
import os
|
|
|
|
|
|
|
|
import fastapi
|
|
|
|
import httpx
|
|
|
|
import logging
|
|
|
|
from typing import Annotated
|
|
|
|
from fastapi import Query, Header, HTTPException, Request
|
|
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from pathlib import Path
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
from rich.logging import RichHandler
|
2024-02-09 23:04:24 +00:00
|
|
|
from fastapi.middleware.cors import CORSMiddleware
|
2024-02-09 22:37:12 +00:00
|
|
|
|
|
|
|
|
|
|
|
# Route all log output through rich so messages may carry console markup.
logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler(markup=True)],
)

# PREVIEW_ROOT_PATH lets the service be mounted under a sub-path (e.g. behind
# a reverse proxy); it defaults to the server root.
app = fastapi.FastAPI(root_path=os.getenv("PREVIEW_ROOT_PATH", ""))

# Browser clients on any origin may call this read-only API.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# permissive combination -- confirm it is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET", "OPTIONS"],
    allow_headers=["*", "Authorization"],
)
|
2024-02-09 22:37:12 +00:00
|
|
|
|
|
|
|
# Canned Matrix-style error responses (HTTP 401) for authentication failures.

# Sent when the request carries no access token at all.
MISSING_TOKEN = JSONResponse(
    content={"errcode": "M_MISSING_TOKEN", "error": "Missing access token"},
    status_code=401,
)

# Sent when an access token is supplied but rejected.
INVALID_TOKEN = JSONResponse(
    content={"errcode": "M_INVALID_TOKEN", "error": "Invalid access token"},
    status_code=401,
)
|
|
|
|
# Whitelist of Open Graph property names (with the leading "og:" already
# stripped) that the preview endpoint is willing to return.
#
# Every entry ends with a comma on purpose: adjacent string literals without
# a separating comma are silently concatenated by Python, which previously
# merged "video:series" and "audio:url" into one bogus entry.
VALID_OG_TAGS = [
    # Basic metadata
    "title",
    "type",
    "image",
    "url",
    "audio",
    "description",
    "determiner",
    "locale",
    # The Open Graph spec spells this "locale:alternate"; the historical
    # "locale:alternative" spelling is kept so existing behaviour is unchanged.
    "locale:alternate",
    "locale:alternative",
    "site_name",
    # Image
    "image:url",
    "image:secure_url",
    "image:type",
    "image:width",
    "image:height",
    "image:alt",
    # Video
    "video",
    "video:url",
    "video:secure_url",
    "video:type",
    "video:width",
    "video:height",
    "video:alt",
    "video:actor",
    "video:actor:role",
    "video:director",
    "video:writer",
    "video:duration",
    "video:release_date",
    "video:tag",
    "video:series",
    # Audio
    "audio:url",
    "audio:secure_url",
    "audio:type",
    # Music
    "music:duration",
    "music:album",
    "music:album:disc",
    "music:album:track",
    "music:musician",
    "music:song",
    "music:song:disc",
    "music:song:track",
    "music:release_date",
    "music:creator",
    # Article
    "article:published_time",
    "article:modified_time",
    "article:expiration_time",
    "article:author",
    "article:section",
    "article:tag",
    # Book
    "book:author",
    "book:tag",
    "book:isbn",
    "book:release_date",
    # Profile
    "profile:first_name",
    "profile:last_name",
    "profile:username",
    "profile:gender",
]
|
|
|
|
# Open Graph tags whose value is a media URL. For each media kind the bare
# tag, its ":url" form and its ":secure_url" form are listed, in that order.
URL_OG_TAGS = [
    f"{media_kind}{suffix}"
    for media_kind in ("video", "image", "audio")
    for suffix in ("", ":url", ":secure_url")
]
|
|
|
|
|
|
|
|
|
|
|
|
def upload_media(domain: str, access_token: str, file: bytes, filename: str, content_type: str) -> str | None:
    """Upload *file* to the homeserver's media repository.

    Args:
        domain: Base URL of the homeserver, e.g. ``https://example.org``.
        access_token: Access token sent as a ``Bearer`` credential.
        file: Raw bytes of the media to upload.
        filename: Name passed to the server in the ``filename`` query parameter.
        content_type: MIME type of *file*; an empty value falls back to
            ``application/octet-stream``.

    Returns:
        The ``content_uri`` (``mxc://`` URI) reported by the server, or ``None``
        when the request fails, the server does not answer with HTTP 200, or
        the response body carries no ``content_uri``.
    """
    logging.info(
        "Creating media at %r called %r with the content type %r and %d bytes",
        domain,
        filename,
        content_type,
        len(file),
    )

    try:
        response = httpx.post(
            "%s/_matrix/media/r0/upload" % domain,
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": content_type or "application/octet-stream",
            },
            # The upload endpoint takes the media itself as the request body.
            # Sending it as a multipart form (files=...) would store the
            # multipart framing as part of the media.
            content=file,
            params={"filename": filename},
            # Same budget as the media download; httpx's 5 s default is too
            # tight for larger files.
            timeout=60,
        )
    except httpx.HTTPError as e:
        # Uploading is best-effort: report the failure the same way as a
        # non-200 answer instead of propagating it to the caller.
        logging.warning("Failed to upload media: %r", e)
        return None

    if response.status_code != 200:
        logging.warning("Failed to upload media: HTTP %s", response.status_code)
        logging.debug("Response: %r", response.text)
        return None

    try:
        mxc_url = response.json()["content_uri"]
    except (ValueError, KeyError, TypeError):
        # 200 with a body that is not JSON or lacks the expected key.
        logging.warning("Failed to upload media: no content_uri in response")
        logging.debug("Response: %r", response.text)
        return None

    logging.info("Media uploaded successfully")
    return mxc_url
|
|
|
|
|
|
|
|
|
|
|
|
# Browser-like User-Agent sent on every outbound fetch (page and media).
_PREVIEW_USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0"


def _extract_access_token(query_token: str | None, auth_header: str | None) -> str | None:
    """Return the caller's access token, or None when no usable token was sent.

    The ``access_token`` query parameter takes precedence over an
    ``Authorization: Bearer ...`` header. Empty tokens count as missing.
    """
    if query_token:
        return query_token
    if auth_header and auth_header.startswith("Bearer "):
        # removeprefix keeps the whole remainder, even if the token itself
        # happens to contain the text "Bearer ".
        return auth_header.removeprefix("Bearer ") or None
    return None


def _fetch(url) -> httpx.Response:
    """GET *url* with the shared User-Agent, following redirects (60 s timeout)."""
    return httpx.get(
        url,
        headers={"User-Agent": _PREVIEW_USER_AGENT},
        timeout=60,
        follow_redirects=True,
    )


def _parse_og_tags(html: str) -> dict:
    """Collect whitelisted ``og:*`` <meta> properties from *html*.

    Keys are the property names with the ``og:`` prefix removed; when a
    property occurs more than once the last occurrence wins.
    """
    found = {}
    for meta in BeautifulSoup(html, "html.parser").find_all("meta"):
        prop = meta.get("property", "")
        if not prop.startswith("og:"):
            continue
        name = prop.removeprefix("og:")
        if name in VALID_OG_TAGS:
            found[name] = meta.get("content")
    return found


def _mirror_media_tags(og_tags: dict, page_url: httpx.URL, domain: str, access_token: str) -> None:
    """Re-host every media URL in *og_tags* on the homeserver (in place).

    For each tag listed in URL_OG_TAGS the media is downloaded and uploaded via
    upload_media(); on success the tag value becomes the returned MXC URI. Tags
    whose media cannot be downloaded are removed. If only the upload fails the
    original URL is left untouched.
    """
    for tag_name in URL_OG_TAGS:
        if tag_name not in og_tags:
            continue

        raw_url = og_tags[tag_name]
        if not raw_url:
            # <meta> tag without a usable content attribute: nothing to fetch.
            og_tags.pop(tag_name, None)
            continue

        try:
            # Resolve relative references against the final page URL.
            media_url = page_url.join(raw_url)
            response_media = _fetch(media_url)
        except (httpx.HTTPError, httpx.InvalidURL) as e:
            logging.exception("Failed to fetch url for OG tags @ %r: %r", raw_url, e)
            og_tags.pop(tag_name, None)
            continue

        if response_media.status_code not in range(200, 300):
            logging.warning("Failed to fetch media: %r - HTTP %s", raw_url, response_media.status_code)
            og_tags.pop(tag_name, None)
            continue

        try:
            upload_response = upload_media(
                domain,
                access_token,
                response_media.content,
                Path(media_url.path).name,
                response_media.headers.get("content-type", ""),
            )
        except httpx.HTTPError:
            # Mirroring is best-effort; one failed upload must not abort the
            # whole preview.
            logging.exception("Failed to upload media: %r", raw_url)
            continue

        if upload_response:
            og_tags[tag_name] = upload_response
            if tag_name == "image":
                og_tags["matrix:image:size"] = len(response_media.content)
            logging.info("Uploaded media: %r", raw_url)
        else:
            logging.warning("Failed to upload media: %r (no returned mxc)", raw_url)


@app.get("/preview_url")
def preview_url(
    req: Request,
    url: Annotated[str, Query(..., description="URL to preview")],
    access_token_qs: str | None = Query(None, alias="access_token", description="Access token to use for the request."),
    access_token_header: str | None = Header(None, alias="Authorization", description="Access token to use for the request."),
):
    """Build a URL preview from the Open Graph metadata of *url*.

    The page is fetched, its whitelisted ``og:*`` meta tags are collected, and
    any media they point at is re-uploaded to the homeserver so the returned
    values are MXC URIs.

    Args:
        req: Incoming request; its hostname is the homeserver fallback when the
            ``PREVIEW_HOMESERVER`` environment variable is unset.
        url: Page to preview.
        access_token_qs: Access token from the ``access_token`` query parameter.
        access_token_header: Raw ``Authorization`` header (``Bearer <token>``).

    Returns:
        A dict of Open Graph tag names (without the ``og:`` prefix) to values,
        plus ``matrix:image:size`` when an image was uploaded; ``{}`` when the
        page is not HTML; or the ``MISSING_TOKEN`` response when no token was
        supplied.

    Raises:
        HTTPException: 500 when the page cannot be fetched or the homeserver
            cannot be determined.
    """
    # NOTE(review): the token is only checked for presence here; it is never
    # validated before the outbound requests are made.
    access_token = _extract_access_token(access_token_qs, access_token_header)
    if access_token is None:
        return MISSING_TOKEN

    # Build the fallback only when it is needed, so a request without a
    # hostname cannot fail while PREVIEW_HOMESERVER is set.
    domain = os.environ.get("PREVIEW_HOMESERVER")
    if domain is None:
        if req.url.hostname is None:
            raise HTTPException(500, "Unable to determine the homeserver for this request")
        domain = "https://" + req.url.hostname

    # NOTE(security): `url` is caller-controlled and redirects are followed,
    # so this endpoint can be pointed at internal addresses (SSRF). Consider
    # restricting the allowed targets.
    try:
        response = _fetch(url)
    except (httpx.HTTPError, httpx.InvalidURL) as e:
        raise HTTPException(500, f"Failed to fetch URL: {e!r}") from e

    if "text/html" not in response.headers.get("content-type", ""):
        return {}

    og_tags = _parse_og_tags(response.text)
    _mirror_media_tags(og_tags, response.url, domain, access_token)

    # NOTE(review): keys are returned without the "og:" prefix; confirm that
    # this is what the consuming client expects.
    return og_tags
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on every interface.
    # uvicorn is imported lazily so importing this module does not need it.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 2226}
    uvicorn.run(app, **server_options)
|