# drop-in-url-previews/server.py
#
# A drop-in Matrix URL-preview service: scrapes OpenGraph/Twitter metadata,
# mirrors referenced media into a homeserver, and caches results in SQLite.
import contextlib
import io
import json
import logging
import os
import sqlite3
import textwrap
import time
import uuid
from pathlib import Path
from typing import Annotated

import appdirs
import fastapi
import httpx
from bs4 import BeautifulSoup
from fastapi import Query, Header, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from rich.logging import RichHandler
2024-02-10 02:00:55 +00:00
@contextlib.asynccontextmanager
async def startup(_):
if not CACHE_DIR.exists():
CACHE_DIR.mkdir(parents=True)
with sqlite3.connect(CACHE_FILE) as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS cache (
uuid TEXT PRIMARY KEY,
url TEXT NOT NULL,
ts INTEGER NOT NULL,
metadata TEXT NOT NULL
)
"""
)
yield
2024-02-10 01:52:20 +00:00
logging.basicConfig(
level=logging.getLevelName(os.environ.get("LOG_LEVEL", "INFO").upper()),
format="%(message)s",
datefmt="[%X]",
handlers=[RichHandler(markup=True)]
)
2024-02-09 22:50:10 +00:00
app = fastapi.FastAPI(
root_path=os.environ.get("PREVIEW_ROOT_PATH", ""),
2024-02-10 02:00:55 +00:00
lifespan=startup
2024-02-09 22:50:10 +00:00
)
2024-02-09 23:04:24 +00:00
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["GET", "OPTIONS"],
allow_headers=["*", "Authorization"],
)
2024-02-09 22:37:12 +00:00
# Pre-built Matrix-style auth error responses (spec error codes, HTTP 401).
# NOTE(review): these Response objects are shared across requests; that is
# fine as long as nothing ever mutates them (e.g. their headers).
MISSING_TOKEN = JSONResponse(
    {"errcode": "M_MISSING_TOKEN", "error": "Missing access token"},
    401
)
INVALID_TOKEN = JSONResponse(
    {"errcode": "M_INVALID_TOKEN", "error": "Invalid access token"},
    401
)
# Whitelist of OpenGraph <meta property="..."> names that preview_url()
# will copy into its response. Anything not listed here (or in
# TWITTER_MAPPING) is ignored.
# Fix: the original list was missing the comma after "og:video:series",
# so Python's implicit string concatenation silently fused it with
# "og:audio:url" into one bogus entry, dropping BOTH tags from the whitelist.
VALID_OG_TAGS = [
    "og:title",
    "og:type",
    "og:image",
    "og:url",
    "og:audio",
    "og:description",
    "og:determiner",
    "og:locale",
    # The OpenGraph spec names this og:locale:alternate; the historical
    # "alternative" spelling is kept for backward compatibility.
    "og:locale:alternate",
    "og:locale:alternative",
    "og:site_name",
    "og:image:url",
    "og:image:secure_url",
    "og:image:type",
    "og:image:width",
    "og:image:height",
    "og:image:alt",
    "og:video",
    "og:video:url",
    "og:video:secure_url",
    "og:video:type",
    "og:video:width",
    "og:video:height",
    "og:video:alt",
    "og:video:actor",
    "og:video:actor:role",
    "og:video:director",
    "og:video:writer",
    "og:video:duration",
    "og:video:release_date",
    "og:video:tag",
    "og:video:series",
    "og:audio:url",
    "og:audio:secure_url",
    "og:audio:type",
    "og:music:duration",
    "og:music:album",
    "og:music:album:disc",
    "og:music:album:track",
    "og:music:musician",
    "og:music:song",
    "og:music:song:disc",
    "og:music:song:track",
    "og:music:release_date",
    "og:music:creator",
    "og:article:published_time",
    "og:article:modified_time",
    "og:article:expiration_time",
    "og:article:author",
    "og:article:section",
    "og:article:tag",
    "og:book:author",
    "og:book:tag",
    "og:book:isbn",
    "og:book:release_date",
    "og:profile:first_name",
    "og:profile:last_name",
    "og:profile:username",
    "og:profile:gender",
]
# OG properties whose values are URLs pointing at media. preview_url()
# downloads each of these and re-uploads it to the homeserver, rewriting the
# tag to an mxc:// URI. Built as every <medium> x ("", ":url", ":secure_url")
# combination, in the same order the original literal listed them.
URL_OG_TAGS = [
    f"og:{medium}{suffix}"
    for medium in ("video", "image", "audio")
    for suffix in ("", ":url", ":secure_url")
]
# Translation of twitter:* <meta> properties to their OpenGraph names
# (without the "og:" prefix — preview_url() adds that in its final
# normalisation pass). NOTE(review): both twitter:site and twitter:creator
# map to site_name, so whichever the page lists last wins.
TWITTER_MAPPING = {
    "twitter:site": "site_name",
    "twitter:creator": "site_name",
    "twitter:image": "image",
    "twitter:title": "title",
    "twitter:image:width": "image:width",
    "twitter:image:height": "image:height",
}
# Pick the on-disk cache location: a bind-mountable fixed path when running
# in the docker image (detected via cwd == /app), otherwise the per-user
# cache directory that appdirs resolves for this platform.
if Path.cwd() == Path("/app"):
    logging.info("Look to be running in a docker container. Cache will be stored in /app/cache.")
    CACHE_DIR = Path("/app/cache")
else:
    CACHE_DIR = Path(appdirs.user_cache_dir("matrix-url-preview"))
CACHE_DIR.mkdir(parents=True, exist_ok=True)
CACHE_FILE = CACHE_DIR / "db.sqlite3"
# Pre-create the file so the first sqlite3.connect() opens an existing path.
CACHE_FILE.touch(exist_ok=True)
logging.debug("Cache file: %r", CACHE_FILE)
def upload_media(domain: str, access_token: str, file: io.BytesIO, filename: str, content_type: str):
    """Upload a downloaded media file to the homeserver's media repository.

    :param domain: Base URL of the homeserver (e.g. ``https://matrix.example``).
    :param access_token: Matrix access token used to authorise the upload.
    :param file: In-memory buffer holding the media bytes.
    :param filename: File name reported to the media repository.
    :param content_type: MIME type sent as the upload's Content-Type.
    :return: The ``mxc://`` content URI on success, ``None`` on failure.
    """
    # getvalue() returns the whole buffer regardless of the stream position,
    # so the seek(0) calls the original did around it were redundant.
    payload = file.getvalue()
    logging.info(
        "Creating media at %r called %r with the content type %r and %d bytes",
        domain,
        filename,
        content_type,
        len(payload)
    )
    # noinspection PyTypeChecker
    response = httpx.post(
        f"{domain}/_matrix/media/r0/upload",
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": content_type
        },
        # `content=` is httpx's parameter for a raw byte body; passing bytes
        # via `data=` (meant for form-encoded bodies) is deprecated.
        content=payload,
        params={
            "filename": filename
        },
        # Match the 60s timeout used by the other httpx calls in this module
        # rather than relying on httpx's much shorter default.
        timeout=60,
    )
    if response.status_code == 200:
        logging.info("Media uploaded successfully")
        mxc_url = response.json()["content_uri"]
        return mxc_url
    else:
        logging.warning("Failed to upload media: HTTP %s", response.status_code)
        logging.debug("Response: %r", response.text)
        return None
@app.get("/preview_url")
def preview_url(
    req: Request,
    url: Annotated[str, Query(..., description="URL to preview")],
    ts: int = Query(None, description="The preferred point in time to return a preview for."),
    access_token_qs: str | None = Query(None, alias="access_token", description="Access token to use for the request."),
    access_token_header: str | None = Header(
        None,
        alias="Authorization",
        description="Access token to use for the request."
    ),
):
    """Matrix-compatible URL preview endpoint.

    Fetches ``url``, extracts OpenGraph/Twitter ``<meta>`` tags, mirrors any
    referenced media into the homeserver's media repository (rewriting the
    tag values to ``mxc://`` URIs), caches the result in SQLite, and returns
    the tags as JSON. Returns a 401-style body when no access token is given.
    """
    # Authentication: a query-string token takes precedence over the
    # Authorization header; anything else is rejected.
    if access_token_qs is not None:
        access_token = access_token_qs
    elif access_token_header and access_token_header.startswith("Bearer "):
        access_token = access_token_header.split("Bearer ")[1]
    else:
        return MISSING_TOKEN

    # Cache lookup: prefer an entry within an hour of the requested ts,
    # otherwise fall back to the newest entry if it is within 3 hours.
    with sqlite3.connect(CACHE_FILE) as conn:
        cursor = conn.cursor()
        cursor.execute(
            # ORDER BY ts so results[-1] below really is the newest row;
            # without it SQLite returns rows in unspecified order.
            "SELECT metadata,ts FROM cache WHERE url = ? ORDER BY ts",
            (url,)
        )
        results = cursor.fetchall()
        if results:
            for result in results:
                # find the one with the closest timestamp
                metadata, _ts = result
                if ts is None or abs(ts - _ts) < 3600:
                    logging.debug("Optimal cache hit for %r", url)
                    return json.loads(metadata)
            # No close matches, take the latest one; return it unless it is
            # more than 3 hours away from the requested timestamp.
            metadata, _ts = results[-1]
            if ts is None or abs(ts - _ts) < 10800:
                logging.debug("Cache hit for %r", url)
                return json.loads(metadata)

    # Homeserver to upload media to; falls back to the host this request hit.
    domain = os.environ.get("PREVIEW_HOMESERVER", "https://" + req.url.hostname)
    try:
        response = httpx.get(
            url,
            headers={
                "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0"
            },
            timeout=60,
            follow_redirects=True
        )
    except httpx.HTTPError as e:
        raise HTTPException(500, f"Failed to fetch URL: {e!r}")
    # Only HTML documents can carry OG tags; anything else previews as empty.
    if "text/html" not in response.headers.get("content-type", ""):
        return {}

    # Collect whitelisted og:/twitter: meta properties from the page.
    soup = BeautifulSoup(response.text, "html.parser")
    og_tags = {}
    for tag in soup.find_all("meta"):
        logging.debug("Found meta tag: %r", tag)
        if tag.get("property", "").startswith(("og:", "twitter:")):
            logging.debug(
                "Tag %r is an OG/Twitter tag, with property: %r",
                tag.get("property", "N/A"),
                textwrap.shorten(tag.get("content", "N/A"), 100),
            )
            tag_name = tag.get("property")
            if tag_name in (*VALID_OG_TAGS, *TWITTER_MAPPING.keys()):
                og_tags[tag_name] = tag.get("content")

    # Translate twitter:* properties to their OG names. Iterate over a
    # snapshot of the keys: this loop pops and inserts entries, and mutating
    # a dict while iterating its live key view raises RuntimeError.
    for tag in list(og_tags):
        if tag.startswith("twitter:"):
            if tag in TWITTER_MAPPING:
                og_tags[TWITTER_MAPPING[tag]] = og_tags.pop(tag)
                logging.debug("Mapped twitter tag %r to og tag %r", tag, TWITTER_MAPPING[tag])
            else:
                logging.warning("Unrecognized Twitter tag: %r", tag)
                og_tags.pop(tag, None)

    # Mirror every media URL into the homeserver so clients never hit the
    # remote site directly; failed fetches drop the tag from the preview.
    for tag_name in URL_OG_TAGS:
        if tag_name in og_tags:
            logging.debug("Retrieving tag %r to see if it needs uploading to Matrix", tag_name)
            _url = og_tags[tag_name]
            logging.debug("%r = %r", tag_name, _url)
            try:
                # noinspection PyArgumentList
                with httpx.stream(
                    url=_url,
                    method="GET",
                    headers={
                        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0"
                    },
                    timeout=60,
                    follow_redirects=True
                ) as response_media:
                    if response_media.status_code not in range(200, 300):
                        logging.warning("Failed to fetch media: %r - HTTP %s", _url, response_media.status_code)
                        og_tags.pop(tag_name, None)
                    elif not response_media.headers.get("content-type", "").startswith(("image/", "video/", "audio/")):
                        logging.warning("Failed to fetch media: %r - not a media type", _url)
                        og_tags.pop(tag_name, None)
                    else:
                        logging.info(
                            "Downloading {:,} bytes of media: {!r}".format(
                                int(response_media.headers.get("content-length", 0)),
                                _url
                            )
                        )
                        _file = io.BytesIO()
                        _file.write(response_media.read())
                        _file.seek(0)
                        # Sanity check: warn (but continue) if fewer bytes
                        # arrived than Content-Length promised.
                        if "content-length" in response_media.headers:
                            _file.seek(0, os.SEEK_END)
                            if int(response_media.headers["content-length"]) != _file.tell():
                                logging.warning(
                                    "Possibly failed to fetch media: {!r} - incomplete ({:,} downloaded, {:,} needed)"
                                    .format(
                                        _url,
                                        _file.tell(),
                                        int(response_media.headers["content-length"])
                                    )
                                )
                            _file.seek(0)
                        upload_response = upload_media(
                            domain,
                            access_token,
                            _file,
                            Path(httpx.URL(_url).path).name,
                            response_media.headers.get("content-type", "")
                        )
                        if upload_response:
                            # Keep the remote URL under original:<name> so
                            # clients can still see where the media came from.
                            og_tags["original:" + tag_name.replace("og:", "")] = og_tags[tag_name]
                            og_tags[tag_name] = upload_response
                            if tag_name in ["og:image", "og:image:url", "og:image:secure_url"]:
                                _file.seek(0)
                                og_tags["matrix:image:size"] = len(_file.getvalue())
                            logging.info("Uploaded media: %r" % _url)
                        else:
                            logging.warning("Failed to upload media: %r (no returned mxc)", _url)
            except httpx.HTTPError as e:
                logging.exception("Failed to fetch url for OG tags @ %r: %r", _url, e, exc_info=True)
                og_tags.pop(tag_name, None)

    # Any key the twitter-mapping step left without a prefix (e.g. "image")
    # gets the "og:" prefix here. Iterates a copy because it mutates the dict.
    for key in og_tags.copy().keys():
        if not key.startswith(("original:", "og:", "matrix:")):
            value = og_tags.pop(key, None)
            og_tags["og:" + key] = value

    # Persist the freshly scraped tags for later requests.
    with sqlite3.connect(CACHE_FILE) as conn:
        conn.execute(
            "INSERT INTO cache (uuid, url, ts, metadata) VALUES (?, ?, ?, ?)",
            (str(uuid.uuid4()), url, round(time.time()), json.dumps(og_tags))
        )
    return JSONResponse(
        og_tags,
        200,
        headers={
            "Cache-Control": "public, max-age=86400"
        }
    )
if __name__ == "__main__":
    # Standalone entry point: serve the app with uvicorn.
    import uvicorn

    host = os.getenv("PREVIEW_HOST", "0.0.0.0")
    port = int(os.getenv("PREVIEW_PORT", 2226))
    # If you want to enable reverse-proxy support, you must set the $FORWARDED_ALLOW_IPS environment variable.
    # See: https://www.uvicorn.org/settings/#http
    uvicorn.run(app, host=host, port=port)