"""Matrix-style ``/preview_url`` endpoint backed by OpenGraph tags.

The service fetches the requested page, extracts its ``og:*`` ``<meta>`` tags,
downloads any media those tags point at, and re-uploads that media to the
homeserver's media repository so the returned preview carries the upload's
``content_uri`` instead of the remote URL.
"""
import logging
import os
from pathlib import Path
from typing import Annotated

import fastapi
import httpx
from bs4 import BeautifulSoup
from fastapi import Header, HTTPException, Query, Request
from fastapi.responses import JSONResponse
from rich.logging import RichHandler

logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler(markup=True)],
)

app = fastapi.FastAPI()

# Canned error responses, shaped like Matrix errors (errcode + error).
MISSING_TOKEN = JSONResponse(
    {"errcode": "M_MISSING_TOKEN", "error": "Missing access token"},
    401,
)
# Not referenced anywhere in this file.
INVALID_TOKEN = JSONResponse(
    {"errcode": "M_INVALID_TOKEN", "error": "Invalid access token"},
    401,
)

# OpenGraph property names (without the "og:" prefix) that are copied into the
# preview. Anything not listed here is ignored.
VALID_OG_TAGS = [
    "title",
    "type",
    "image",
    "url",
    "audio",
    "description",
    "determiner",
    "locale",
    "locale:alternative",
    "site_name",
    "image:url",
    "image:secure_url",
    "image:type",
    "image:width",
    "image:height",
    "image:alt",
    "video",
    "video:url",
    "video:secure_url",
    "video:type",
    "video:width",
    "video:height",
    "video:alt",
    "video:actor",
    "video:actor:role",
    "video:director",
    "video:writer",
    "video:duration",
    "video:release_date",
    "video:tag",
    # The comma after "video:series" was missing, which silently fused it with
    # "audio:url" into one bogus entry and dropped both tags.
    "video:series",
    "audio:url",
    "audio:secure_url",
    "audio:type",
    "music:duration",
    "music:album",
    "music:album:disc",
    "music:album:track",
    "music:musician",
    "music:song",
    "music:song:disc",
    "music:song:track",
    "music:release_date",
    "music:creator",
    "article:published_time",
    "article:modified_time",
    "article:expiration_time",
    "article:author",
    "article:section",
    "article:tag",
    "book:author",
    "book:tag",
    "book:isbn",
    "book:release_date",
    "profile:first_name",
    "profile:last_name",
    "profile:username",
    "profile:gender",
]

# Tags whose value is a media URL that gets downloaded and re-uploaded.
URL_OG_TAGS = [
    "video",
    "video:url",
    "video:secure_url",
    "image",
    "image:url",
    "image:secure_url",
    "audio",
    "audio:url",
    "audio:secure_url",
]

# Browser-like User-Agent sent on every outbound fetch.
_USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0"

# httpx.InvalidURL does not derive from httpx.HTTPError, so it has to be named
# explicitly wherever a malformed URL should be treated as a fetch failure.
_FETCH_ERRORS = (httpx.HTTPError, httpx.InvalidURL)


def upload_media(
    domain: str,
    access_token: str,
    file: bytes,
    filename: str,
    content_type: str,
) -> str | None:
    """Upload *file* to the media repository at *domain*.

    Args:
        domain: Base URL of the homeserver, e.g. ``https://example.org``.
        access_token: Token sent as ``Authorization: Bearer <token>``.
        file: Raw bytes of the media.
        filename: Name passed as the ``filename`` query parameter; omitted
            from the request when empty.
        content_type: MIME type sent in the ``Content-Type`` header; falls back
            to ``application/octet-stream`` when empty.

    Returns:
        The ``content_uri`` from the server's response, or ``None`` when the
        request fails, the status is not 200, or the response body does not
        contain a ``content_uri``.
    """
    logging.info(
        "Creating media at %r called %r with the content type %r and %d bytes",
        domain,
        filename,
        content_type,
        len(file),
    )
    try:
        response = httpx.post(
            f"{domain}/_matrix/media/r0/upload",
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": content_type or "application/octet-stream",
            },
            # The upload endpoint takes the raw bytes as the request body.
            # Passing ``files=`` would multipart-encode the body while the
            # header above still claims the media's own type, so the stored
            # file would contain the multipart framing.
            content=file,
            params={"filename": filename} if filename else None,
        )
    except httpx.HTTPError as e:
        # Best effort: a failed upload must not take down the whole preview.
        logging.warning("Failed to upload media: %r", e)
        return None

    if response.status_code != 200:
        logging.warning("Failed to upload media: HTTP %s", response.status_code)
        logging.debug("Response: %r", response.text)
        return None

    try:
        mxc_url = response.json()["content_uri"]
    except (ValueError, KeyError, TypeError):
        logging.warning("Failed to upload media: response had no content_uri")
        logging.debug("Response: %r", response.text)
        return None

    logging.info("Media uploaded successfully")
    return mxc_url


@app.get("/preview_url")
def preview_url(
    req: Request,
    url: Annotated[str, Query(..., description="URL to preview")],
    access_token_qs: str | None = Query(
        None,
        alias="access_token",
        description="Access token to use for the request.",
    ),
    access_token_header: str | None = Header(
        None,
        alias="Authorization",
        description="Access token to use for the request.",
    ),
):
    """Return the OpenGraph tags of *url*, with media re-hosted on the homeserver.

    The access token is taken from the ``access_token`` query parameter, or
    failing that from an ``Authorization: Bearer ...`` header. The token is only
    forwarded to the media upload; it is not validated here.

    The homeserver base URL comes from the ``PREVIEW_HOMESERVER`` environment
    variable, defaulting to ``https://<request hostname>``.

    Returns:
        ``MISSING_TOKEN`` when no token was supplied; ``{}`` when the page is
        not ``text/html``; otherwise a dict mapping tag names (without the
        ``og:`` prefix) to their content. Media tags that could not be fetched
        are removed; media tags that were fetched but could not be uploaded
        keep their original value.

    Raises:
        HTTPException: 500 when the page cannot be fetched, or when no
            homeserver can be determined.
    """
    if access_token_qs is not None:
        access_token = access_token_qs
    elif access_token_header and access_token_header.startswith("Bearer "):
        access_token = access_token_header.removeprefix("Bearer ")
    else:
        return MISSING_TOKEN

    # Only build the hostname-based default when it is actually needed, so a
    # request without a usable hostname does not break a configured deployment.
    domain = os.environ.get("PREVIEW_HOMESERVER")
    if domain is None:
        if req.url.hostname is None:
            raise HTTPException(500, "Unable to determine the homeserver to upload to")
        domain = f"https://{req.url.hostname}"

    fetch_headers = {"User-Agent": _USER_AGENT}

    # NOTE(review): this fetches a caller-supplied URL with redirects enabled
    # and no destination filtering, so it can be pointed at internal services.
    # Consider restricting the allowed hosts/addresses.
    try:
        response = httpx.get(
            url,
            headers=fetch_headers,
            timeout=60,
            follow_redirects=True,
        )
    except _FETCH_ERRORS as e:
        raise HTTPException(500, f"Failed to fetch URL: {e!r}") from e

    if "text/html" not in response.headers.get("content-type", ""):
        return {}

    soup = BeautifulSoup(response.text, "html.parser")

    og_tags = {}
    for tag in soup.find_all("meta"):
        prop = tag.get("property", "")
        if not prop.startswith("og:"):
            continue
        tag_name = prop[3:]
        if tag_name in VALID_OG_TAGS:
            og_tags[tag_name] = tag.get("content")

    # Iterates the constant list rather than og_tags, so removing entries from
    # og_tags inside the loop is safe.
    for tag_name in URL_OG_TAGS:
        if tag_name not in og_tags:
            continue

        raw_url = og_tags[tag_name]
        if not raw_url:
            # <meta property="og:image"> without a content attribute.
            logging.warning("OG tag %r has no content, dropping it", tag_name)
            og_tags.pop(tag_name, None)
            continue

        try:
            # Resolve relative references against the final (post-redirect)
            # page URL; absolute URLs pass through unchanged.
            media_url = response.url.join(raw_url)
            response_media = httpx.get(
                media_url,
                headers=fetch_headers,
                timeout=60,
                follow_redirects=True,
            )
        except _FETCH_ERRORS:
            logging.exception("Failed to fetch url for OG tags @ %r", raw_url)
            og_tags.pop(tag_name, None)
            continue

        if not 200 <= response_media.status_code < 300:
            logging.warning(
                "Failed to fetch media: %r - HTTP %s",
                raw_url,
                response_media.status_code,
            )
            og_tags.pop(tag_name, None)
            continue

        upload_response = upload_media(
            domain,
            access_token,
            response_media.content,
            Path(media_url.path).name,
            response_media.headers.get("content-type", ""),
        )
        if upload_response:
            og_tags[tag_name] = upload_response
            logging.info("Uploaded media: %r", raw_url)
        else:
            # The tag keeps its original remote URL in this case.
            logging.warning("Failed to upload media: %r (no returned mxc)", raw_url)

    return og_tags


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=2226)