"""Matrix URL preview service.

Exposes a single ``GET /preview_url`` endpoint that fetches a page, extracts
its OpenGraph ``<meta>`` tags, re-uploads any referenced media to a Matrix
homeserver, and caches the resulting metadata in a local SQLite database.
"""
import contextlib
import email.utils
import io
import json
import logging
import os
import sqlite3
import time
import uuid
from pathlib import Path
from typing import Annotated

import appdirs
import fastapi
import httpx
from bs4 import BeautifulSoup
from fastapi import Header, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from rich.logging import RichHandler

logging.basicConfig(
    level=logging.getLevelName(os.environ.get("LOG_LEVEL", "INFO").upper()),
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler(markup=True)]
)

app = fastapi.FastAPI(
    root_path=os.environ.get("PREVIEW_ROOT_PATH", ""),
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET", "OPTIONS"],
    allow_headers=["*", "Authorization"],
)

MISSING_TOKEN = JSONResponse(
    {"errcode": "M_MISSING_TOKEN", "error": "Missing access token"},
    401
)
INVALID_TOKEN = JSONResponse(
    {"errcode": "M_INVALID_TOKEN", "error": "Invalid access token"},
    401
)

# OpenGraph property names (without the "og:" prefix) that are kept.
VALID_OG_TAGS = [
    "title",
    "type",
    "image",
    "url",
    "audio",
    "description",
    "determiner",
    "locale",
    "locale:alternative",
    "site_name",
    "image:url",
    "image:secure_url",
    "image:type",
    "image:width",
    "image:height",
    "image:alt",
    "video",
    "video:url",
    "video:secure_url",
    "video:type",
    "video:width",
    "video:height",
    "video:alt",
    "video:actor",
    "video:actor:role",
    "video:director",
    "video:writer",
    "video:duration",
    "video:release_date",
    "video:tag",
    # The comma after "video:series" matters: without it Python concatenates
    # the two adjacent literals into a single bogus tag name.
    "video:series",
    "audio:url",
    "audio:secure_url",
    "audio:type",
    "music:duration",
    "music:album",
    "music:album:disc",
    "music:album:track",
    "music:musician",
    "music:song",
    "music:song:disc",
    "music:song:track",
    "music:release_date",
    "music:creator",
    "article:published_time",
    "article:modified_time",
    "article:expiration_time",
    "article:author",
    "article:section",
    "article:tag",
    "book:author",
    "book:tag",
    "book:isbn",
    "book:release_date",
    "profile:first_name",
    "profile:last_name",
    "profile:username",
    "profile:gender",
]

# Tags whose value is a media URL that gets downloaded and re-uploaded.
URL_OG_TAGS = [
    "video",
    "video:url",
    "video:secure_url",
    "image",
    "image:url",
    "image:secure_url",
    "audio",
    "audio:url",
    "audio:secure_url",
]

_IMAGE_OG_TAGS = ("image", "image:url", "image:secure_url")
_MEDIA_TYPE_PREFIXES = ("image/", "video/", "audio/")
_PASSTHROUGH_KEY_PREFIXES = ("original:", "og:", "matrix:")
_USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0"

# Cache freshness windows, in the same unit as the stored ``ts`` column
# (seconds since the epoch, derived from the upstream ``Date`` header).
# NOTE(review): Matrix clients may send ``ts`` in milliseconds - confirm the
# unit expected by callers before relying on these windows.
_CLOSE_MATCH_WINDOW = 3600
_STALE_AFTER = 10800

if Path.cwd() == Path("/app"):
    logging.info("Look to be running in a docker container. Cache will be stored in /app/cache.")
    CACHE_DIR = Path("/app/cache")
else:
    CACHE_DIR = Path(appdirs.user_cache_dir("matrix-url-preview"))
CACHE_FILE = CACHE_DIR / "db.sqlite3"
logging.debug("Cache file: %r", CACHE_FILE)


@contextlib.contextmanager
def _open_cache():
    """Yield a connection to the cache DB; commit on success, always close.

    ``with sqlite3.connect(...)`` on its own only manages the transaction and
    leaves the connection open, so the close is done explicitly here.
    """
    conn = sqlite3.connect(CACHE_FILE)
    try:
        with conn:
            yield conn
    finally:
        conn.close()


@app.on_event("startup")
async def startup():
    """Create the cache directory and the ``cache`` table if they are missing."""
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    with _open_cache() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS cache (
                uuid TEXT PRIMARY KEY,
                url TEXT NOT NULL,
                ts INTEGER NOT NULL,
                metadata TEXT NOT NULL
            )
            """
        )


def upload_media(domain: str, access_token: str, file: io.BytesIO, filename: str, content_type: str):
    """Upload the contents of *file* to the media endpoint of *domain*.

    Args:
        domain: Base URL of the homeserver (scheme included).
        access_token: Token sent as a ``Bearer`` authorization header.
        file: In-memory buffer holding the media bytes; rewound to offset 0.
        filename: Value of the ``filename`` query parameter.
        content_type: Value of the ``Content-Type`` request header.

    Returns:
        The ``content_uri`` from the JSON response on HTTP 200, otherwise
        ``None`` (non-200 status, or a body without a usable ``content_uri``).

    Raises:
        httpx.HTTPError: If the request itself fails at the transport level.
    """
    file.seek(0)
    payload = file.getvalue()
    logging.info(
        "Creating media at %r called %r with the content type %r and %d bytes",
        domain,
        filename,
        content_type,
        len(payload)
    )

    response = httpx.post(
        "%s/_matrix/media/r0/upload" % domain,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": content_type
        },
        # Raw bytes belong in ``content=``; ``data=`` is for form fields.
        content=payload,
        params={
            "filename": filename
        }
    )
    if response.status_code != 200:
        logging.warning("Failed to upload media: HTTP %s", response.status_code)
        logging.debug("Response: %r", response.text)
        return None

    try:
        mxc_url = response.json()["content_uri"]
    except (ValueError, KeyError, TypeError):
        logging.warning("Failed to upload media: HTTP %s", response.status_code)
        logging.debug("Response: %r", response.text)
        return None
    logging.info("Media uploaded successfully")
    return mxc_url


def _resolve_access_token(query_token: str | None, auth_header: str | None) -> str | None:
    """Return the token from the query string or ``Bearer`` header, else ``None``."""
    if query_token:
        return query_token
    if auth_header and auth_header.startswith("Bearer "):
        return auth_header.removeprefix("Bearer ") or None
    return None


def _lookup_cache(url: str, ts: int | None) -> dict | None:
    """Return cached metadata for *url* that is fresh enough, else ``None``.

    The entry closest to the reference time wins when it lies within the
    close-match window; otherwise the newest entry is used if it is younger
    than the staleness limit. The reference time is *ts*, or the current time
    when *ts* is not given, so that entries eventually expire.
    """
    with _open_cache() as conn:
        rows = conn.execute(
            "SELECT metadata, ts FROM cache WHERE url = ? ORDER BY ts ASC",
            (url,)
        ).fetchall()
    if not rows:
        return None

    reference = ts if ts is not None else int(time.time())
    closest = min(rows, key=lambda row: abs(reference - row[1]))
    if abs(reference - closest[1]) < _CLOSE_MATCH_WINDOW:
        logging.debug("Optimal cache hit for %r", url)
        chosen = closest
    else:
        # No close matches, fall back to the latest one unless it is stale.
        latest = rows[-1]
        if abs(reference - latest[1]) >= _STALE_AFTER:
            return None
        logging.debug("Cache hit for %r", url)
        chosen = latest

    try:
        metadata = json.loads(chosen[0])
    except ValueError:
        # Unreadable rows are treated as a miss so the URL is re-fetched.
        logging.warning("Discarding unreadable cache entry for %r", url)
        return None
    return metadata if isinstance(metadata, dict) else None


def _store_cache(url: str, fetched_at: int, og_tags: dict) -> None:
    """Insert *og_tags* (JSON-encoded) into the cache for *url*."""
    with _open_cache() as conn:
        conn.execute(
            "INSERT INTO cache (uuid, url, ts, metadata) VALUES (?, ?, ?, ?)",
            (str(uuid.uuid4()), url, fetched_at, json.dumps(og_tags))
        )


def _response_timestamp(response) -> int:
    """Return the response's ``Date`` header as epoch seconds, or now."""
    raw_date = response.headers.get("date")
    if raw_date:
        try:
            return int(email.utils.parsedate_to_datetime(raw_date).timestamp())
        except (TypeError, ValueError):
            logging.debug("Unparseable Date header: %r", raw_date)
    return int(time.time())


def _declared_length(headers) -> int | None:
    """Return the integer ``Content-Length`` from *headers*, if present and valid."""
    raw_length = headers.get("content-length")
    if raw_length is None:
        return None
    try:
        return int(raw_length)
    except ValueError:
        return None


def _extract_og_tags(html: str) -> dict:
    """Collect recognised ``og:*`` meta tags from *html*, keyed without the prefix."""
    soup = BeautifulSoup(html, "html.parser")
    og_tags = {}
    for meta in soup.find_all("meta"):
        prop = meta.get("property", "")
        if not prop.startswith("og:"):
            continue
        tag_name = prop[3:]
        content = meta.get("content")
        # A tag without a content attribute carries no usable value.
        if tag_name in VALID_OG_TAGS and content is not None:
            og_tags[tag_name] = content
    return og_tags


def _rehost_media(og_tags: dict, tag_name: str, domain: str, access_token: str) -> None:
    """Download the media behind ``og_tags[tag_name]`` and re-upload it.

    On a successful upload the tag is replaced by the returned URI and the
    source URL is kept under ``"original:" + tag_name``. If the download
    fails, is not a 2xx response, or is not image/video/audio, the tag is
    removed. If only the upload is refused, the tag is left untouched.
    """
    media_url = og_tags[tag_name]
    try:
        with httpx.stream(
            "GET",
            media_url,
            headers={"User-Agent": _USER_AGENT},
            timeout=60,
            follow_redirects=True
        ) as media:
            content_type = media.headers.get("content-type", "")
            if media.status_code not in range(200, 300):
                logging.warning("Failed to fetch media: %r - HTTP %s", media_url, media.status_code)
                og_tags.pop(tag_name, None)
                return
            if not content_type.startswith(_MEDIA_TYPE_PREFIXES):
                logging.warning("Failed to fetch media: %r - not a media type", media_url)
                og_tags.pop(tag_name, None)
                return

            expected = _declared_length(media.headers)
            logging.info(
                "Downloading {:,} bytes of media: {!r}".format(expected or 0, media_url)
            )
            body = media.read()

        if expected is not None and expected != len(body):
            # Only a warning: the length can legitimately differ, e.g. when
            # the transfer was compressed.
            logging.warning(
                "Possibly failed to fetch media: {!r} - incomplete ({:,} downloaded, {:,} needed)"
                .format(media_url, len(body), expected)
            )

        mxc_url = upload_media(
            domain,
            access_token,
            io.BytesIO(body),
            Path(httpx.URL(media_url).path).name,
            content_type
        )
    except (httpx.HTTPError, httpx.InvalidURL) as e:
        logging.exception("Failed to fetch url for OG tags @ %r: %r", media_url, e, exc_info=True)
        og_tags.pop(tag_name, None)
        return

    if not mxc_url:
        logging.warning("Failed to upload media: %r (no returned mxc)", media_url)
        return

    og_tags["original:" + tag_name] = media_url
    og_tags[tag_name] = mxc_url
    if tag_name in _IMAGE_OG_TAGS:
        og_tags["matrix:image:size"] = len(body)
    logging.info("Uploaded media: %r", media_url)


def _prefix_og_keys(og_tags: dict) -> dict:
    """Return *og_tags* with ``og:`` prepended to every not-yet-namespaced key."""
    return {
        (key if key.startswith(_PASSTHROUGH_KEY_PREFIXES) else "og:" + key): value
        for key, value in og_tags.items()
    }


@app.get("/preview_url")
def preview_url(
        req: Request,
        url: Annotated[str, Query(..., description="URL to preview")],
        ts: int | None = Query(None, description="The preferred point in time to return a preview for."),
        access_token_qs: str | None = Query(None, alias="access_token", description="Access token to use for the request."),
        access_token_header: str | None = Header(
            None,
            alias="Authorization",
            description="Access token to use for the request."
        ),
):
    """Return OpenGraph metadata for *url*, serving from the cache when fresh.

    The access token is taken from the ``access_token`` query parameter or a
    ``Bearer`` ``Authorization`` header and is forwarded when re-uploading
    media; it is not validated here. Without a token the ``M_MISSING_TOKEN``
    response is returned.

    Returns:
        A dict of ``og:*``, ``original:*`` and ``matrix:*`` keys, an empty
        dict when the target is not ``text/html``, or the 401 JSON response.

    Raises:
        HTTPException: 500 if the target URL cannot be fetched.
    """
    access_token = _resolve_access_token(access_token_qs, access_token_header)
    if not access_token:
        return MISSING_TOKEN

    cached = _lookup_cache(url, ts)
    if cached is not None:
        return cached

    # Only fall back to the request's host when no homeserver is configured.
    domain = os.environ.get("PREVIEW_HOMESERVER")
    if not domain:
        domain = "https://%s" % req.url.hostname

    try:
        response = httpx.get(
            url,
            headers={"User-Agent": _USER_AGENT},
            timeout=60,
            follow_redirects=True
        )
    except (httpx.HTTPError, httpx.InvalidURL) as e:
        raise HTTPException(500, f"Failed to fetch URL: {e!r}") from e

    if "text/html" not in response.headers.get("content-type", ""):
        return {}

    og_tags = _extract_og_tags(response.text)
    for tag_name in URL_OG_TAGS:
        if tag_name in og_tags:
            _rehost_media(og_tags, tag_name, domain, access_token)

    og_tags = _prefix_og_keys(og_tags)
    _store_cache(url, _response_timestamp(response), og_tags)
    return og_tags


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=2226)