drop-in-url-previews/server.py

324 lines
10 KiB
Python
Raw Normal View History

2024-02-10 01:57:03 +00:00
import contextlib
2024-02-09 23:11:07 +00:00
import io
import json
2024-02-09 22:37:12 +00:00
import os
import time
2024-02-10 01:52:20 +00:00
import uuid
2024-02-09 22:37:12 +00:00
import fastapi
import httpx
import logging
2024-02-10 01:52:20 +00:00
import sqlite3
import appdirs
2024-02-09 22:37:12 +00:00
from typing import Annotated
from fastapi import Query, Header, HTTPException, Request
from fastapi.responses import JSONResponse
from pathlib import Path
from bs4 import BeautifulSoup
from rich.logging import RichHandler
2024-02-09 23:04:24 +00:00
from fastapi.middleware.cors import CORSMiddleware
2024-02-09 22:37:12 +00:00
2024-02-10 01:52:20 +00:00
# Route all log records through Rich for colourised output; the LOG_LEVEL
# environment variable (default "INFO", case-insensitive) sets the threshold.
_log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    format="%(message)s",
    datefmt="[%X]",
    level=logging.getLevelName(_log_level),
    handlers=[RichHandler(markup=True)],
)
2024-02-09 22:50:10 +00:00
# PREVIEW_ROOT_PATH lets the service be mounted under a sub-path behind a
# reverse proxy (FastAPI's root_path).
_root_path = os.environ.get("PREVIEW_ROOT_PATH", "")
app = fastapi.FastAPI(root_path=_root_path)

# The preview endpoint is called cross-origin by clients, so allow any origin
# for the read-only GET/OPTIONS methods.
_cors_options = dict(
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET", "OPTIONS"],
    allow_headers=["*", "Authorization"],
)
app.add_middleware(CORSMiddleware, **_cors_options)
2024-02-09 22:37:12 +00:00
# Canned Matrix-spec error responses (HTTP 401) returned by the preview
# endpoint when authentication fails.
MISSING_TOKEN = JSONResponse(
    content={"errcode": "M_MISSING_TOKEN", "error": "Missing access token"},
    status_code=401,
)
INVALID_TOKEN = JSONResponse(
    content={"errcode": "M_INVALID_TOKEN", "error": "Invalid access token"},
    status_code=401,
)
# OpenGraph property names (without the "og:" prefix) accepted from a fetched
# page; any other og:* meta tag is ignored.
VALID_OG_TAGS = [
    "title",
    "type",
    "image",
    "url",
    "audio",
    "description",
    "determiner",
    "locale",
    "locale:alternative",
    "site_name",
    "image:url",
    "image:secure_url",
    "image:type",
    "image:width",
    "image:height",
    "image:alt",
    "video",
    "video:url",
    "video:secure_url",
    "video:type",
    "video:width",
    "video:height",
    "video:alt",
    "video:actor",
    "video:actor:role",
    "video:director",
    "video:writer",
    "video:duration",
    "video:release_date",
    "video:tag",
    # BUG FIX: a missing comma here previously merged the next two entries
    # into the single bogus string "video:seriesaudio:url" via implicit
    # string concatenation, silently rejecting both tags.
    "video:series",
    "audio:url",
    "audio:secure_url",
    "audio:type",
    "music:duration",
    "music:album",
    "music:album:disc",
    "music:album:track",
    "music:musician",
    "music:song",
    "music:song:disc",
    "music:song:track",
    "music:release_date",
    "music:creator",
    "article:published_time",
    "article:modified_time",
    "article:expiration_time",
    "article:author",
    "article:section",
    "article:tag",
    "book:author",
    "book:tag",
    "book:isbn",
    "book:release_date",
    "profile:first_name",
    "profile:last_name",
    "profile:username",
    "profile:gender"
]
# Subset of tags whose values are URLs pointing at media that this service
# downloads and re-hosts on the homeserver (iteration order matters: earlier
# entries are processed first).
URL_OG_TAGS = [
    "video",
    "video:url",
    "video:secure_url",
    "image",
    "image:url",
    "image:secure_url",
    "audio",
    "audio:url",
    "audio:secure_url"
]
2024-02-10 01:52:20 +00:00
# Pick the cache location: /app/cache inside the Docker image (detected by
# the working directory), otherwise the per-user OS cache directory.
if Path.cwd() == Path("/app"):
    logging.info("Look to be running in a docker container. Cache will be stored in /app/cache.")
    CACHE_DIR = Path("/app/cache")
else:
    CACHE_DIR = Path(appdirs.user_cache_dir("matrix-url-preview"))
# BUG FIX: the directory must exist before touch() below — previously a
# fresh install crashed with FileNotFoundError at import time, because the
# directory was only created later in the startup lifespan hook.
CACHE_DIR.mkdir(parents=True, exist_ok=True)
CACHE_FILE = CACHE_DIR / "db.sqlite3"
CACHE_FILE.touch(exist_ok=True)
logging.debug("Cache file: %r", CACHE_FILE)
2024-02-10 01:57:03 +00:00
@contextlib.asynccontextmanager
async def startup():
    """Lifespan context: create the cache directory and sqlite schema, then
    yield for the application's lifetime.

    NOTE(review): this generator is not visibly passed to
    ``FastAPI(lifespan=...)`` anywhere in this file (``app`` is constructed
    without it, earlier in the module) — confirm it is actually wired up,
    otherwise the table is never created by this hook.
    """
    if not CACHE_DIR.exists():
        CACHE_DIR.mkdir(parents=True)
    # sqlite3.connect() as a context manager commits/rolls back a transaction
    # (it does not close the connection); CREATE TABLE IF NOT EXISTS makes
    # this safe to run on every startup.
    with sqlite3.connect(CACHE_FILE) as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS cache (
                uuid TEXT PRIMARY KEY,
                url TEXT NOT NULL,
                ts INTEGER NOT NULL,
                metadata TEXT NOT NULL
            )
            """
        )
    yield
2024-02-10 01:52:20 +00:00
2024-02-09 22:37:12 +00:00
2024-02-09 23:11:07 +00:00
def upload_media(domain: str, access_token: str, file: io.BytesIO, filename: str, content_type: str):
    """Upload a media buffer to the homeserver's content repository.

    POSTs the buffer to ``{domain}/_matrix/media/r0/upload`` (legacy r0
    endpoint — presumably kept for compatibility; v3 is the current spec
    path, confirm before changing).

    :param domain: Base URL of the homeserver, e.g. ``https://example.org``.
    :param access_token: Matrix access token used as a Bearer credential.
    :param file: In-memory buffer holding the media payload.
    :param filename: Filename to associate with the upload.
    :param content_type: MIME type sent as the Content-Type header.
    :return: The ``mxc://`` content URI on success, ``None`` on failure.
    """
    # getvalue() returns the whole buffer regardless of the current stream
    # position, so the seek(0) calls the old code made were redundant.
    payload = file.getvalue()
    logging.info(
        "Creating media at %r called %r with the content type %r and %d bytes",
        domain,
        filename,
        content_type,
        len(payload)
    )
    response = httpx.post(
        "%s/_matrix/media/r0/upload" % domain,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": content_type
        },
        # FIX: `content=` is httpx's parameter for a raw byte body;
        # passing bytes via `data=` (form-data) is deprecated.
        content=payload,
        params={
            "filename": filename
        }
    )
    if response.status_code == 200:
        logging.info("Media uploaded successfully")
        mxc_url = response.json()["content_uri"]
        return mxc_url
    else:
        logging.warning("Failed to upload media: HTTP %s", response.status_code)
        logging.debug("Response: %r", response.text)
        return None
@app.get("/preview_url")
def preview_url(
    req: Request,
    url: Annotated[str, Query(..., description="URL to preview")],
    ts: int = Query(None, description="The preferred point in time to return a preview for."),
    access_token_qs: str | None = Query(None, alias="access_token", description="Access token to use for the request."),
    access_token_header: str | None = Header(
        None,
        alias="Authorization",
        description="Access token to use for the request."
    ),
):
    """Matrix ``/preview_url`` endpoint.

    Fetches *url*, extracts whitelisted OpenGraph tags, mirrors any media
    the tags point at onto the homeserver (rewriting the tag values to
    ``mxc://`` URIs), prefixes the keys with ``og:`` and caches the result
    in sqlite, keyed by URL and insertion time.

    Requires a Matrix access token, either as an ``access_token`` query
    parameter or a ``Bearer`` Authorization header; otherwise a 401
    M_MISSING_TOKEN response is returned.
    """
    # The query-string token takes precedence over the Authorization header.
    if access_token_qs is not None:
        access_token = access_token_qs
    elif access_token_header and access_token_header.startswith("Bearer "):
        access_token = access_token_header.split("Bearer ")[1]
    else:
        return MISSING_TOKEN
    with sqlite3.connect(CACHE_FILE) as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT metadata,ts FROM cache WHERE url = ?",
            (url,)
        )
        results = cursor.fetchall()
        if results:
            # Return the first cached entry within an hour of the requested
            # timestamp (or simply the first entry when no ts was given) —
            # NOTE(review): despite the original "closest timestamp" intent,
            # this is first-match-within-1h, not nearest-match.
            for result in results:
                metadata, _ts = result
                if ts is None or abs(ts - _ts) < 3600:
                    logging.debug("Optimal cache hit for %r", url)
                    return json.loads(metadata)
            # No close matches: fall back to the most recently inserted row,
            # but only if it is within 3 hours of the requested timestamp;
            # otherwise re-fetch below.
            metadata, _ts = results[-1]
            if ts is None or abs(ts - _ts) < 10800:
                logging.debug("Cache hit for %r", url)
                return json.loads(metadata)

    # Homeserver to upload mirrored media to; defaults to the host this
    # request was served on.
    domain = os.environ.get("PREVIEW_HOMESERVER", "https://" + req.url.hostname)
    try:
        response = httpx.get(
            url,
            headers={
                "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0"
            },
            timeout=60,
            follow_redirects=True
        )
    except httpx.HTTPError as e:
        raise HTTPException(500, f"Failed to fetch URL: {e!r}")
    # Only HTML documents can carry OpenGraph meta tags; anything else
    # yields an empty preview.
    if "text/html" not in response.headers.get("content-type", ""):
        return {}
    soup = BeautifulSoup(response.text, "html.parser")
    og_tags = {}
    # Collect whitelisted og:* <meta property=...> tags, keyed without the
    # "og:" prefix (it is re-added at the end).
    for tag in soup.find_all("meta"):
        if tag.get("property", "").startswith("og:"):
            tag_name = tag.get("property")[3:]
            if tag_name in VALID_OG_TAGS:
                og_tags[tag_name] = tag.get("content")
    # Mirror every URL-valued media tag onto the homeserver; tags whose
    # media cannot be fetched are dropped from the preview.
    for tag_name in URL_OG_TAGS:
        if tag_name in og_tags:
            _url = og_tags[tag_name]
            try:
                with httpx.stream(
                    url=_url,
                    method="GET",
                    headers={
                        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0"
                    },
                    timeout=60,
                    follow_redirects=True
                ) as response_media:
                    if response_media.status_code not in range(200, 300):
                        logging.warning("Failed to fetch media: %r - HTTP %s", _url, response_media.status_code)
                        og_tags.pop(tag_name, None)
                    elif not response_media.headers.get("content-type", "").startswith(("image/", "video/", "audio/")):
                        logging.warning("Failed to fetch media: %r - not a media type", _url)
                        og_tags.pop(tag_name, None)
                    else:
                        logging.info(
                            "Downloading {:,} bytes of media: {!r}".format(
                                int(response_media.headers.get("content-length", 0)),
                                _url
                            )
                        )
                        # Buffer the whole body in memory before uploading.
                        _file = io.BytesIO()
                        _file.write(response_media.read())
                        _file.seek(0)
                        # Sanity-check the download against Content-Length;
                        # deliberately best-effort — a mismatch is only
                        # logged, the upload still proceeds (see the
                        # commented-out pop/continue below).
                        if "content-length" in response_media.headers:
                            _file.seek(0, os.SEEK_END)
                            if int(response_media.headers["content-length"]) != _file.tell():
                                logging.warning(
                                    "Possibly failed to fetch media: {!r} - incomplete ({:,} downloaded, {:,} needed)"
                                    .format(
                                        _url,
                                        _file.tell(),
                                        int(response_media.headers["content-length"])
                                    )
                                )
                                # og_tags.pop(tag_name, None)
                                # continue
                            _file.seek(0)
                        upload_response = upload_media(
                            domain,
                            access_token,
                            _file,
                            Path(httpx.URL(_url).path).name,
                            response_media.headers.get("content-type", "")
                        )
                        if upload_response:
                            # Keep the source URL under "original:<tag>" and
                            # replace the tag value with the mxc:// URI.
                            og_tags["original:" + tag_name] = og_tags[tag_name]
                            og_tags[tag_name] = upload_response
                            if tag_name in ["image", "image:url", "image:secure_url"]:
                                _file.seek(0)
                                og_tags["matrix:image:size"] = len(_file.getvalue())
                            logging.info("Uploaded media: %r" % _url)
                        else:
                            logging.warning("Failed to upload media: %r (no returned mxc)", _url)
            except httpx.HTTPError as e:
                logging.exception("Failed to fetch url for OG tags @ %r: %r", _url, e, exc_info=True)
                og_tags.pop(tag_name, None)
    # Re-prefix plain tag names with "og:", leaving the already-namespaced
    # "original:"/"og:"/"matrix:" keys untouched (copy() because we mutate
    # while iterating).
    for key in og_tags.copy().keys():
        if not key.startswith(("original:", "og:", "matrix:")):
            value = og_tags.pop(key, None)
            og_tags["og:" + key] = value
    # Persist the freshly built preview for future cache hits.
    with sqlite3.connect(CACHE_FILE) as conn:
        conn.execute(
            "INSERT INTO cache (uuid, url, ts, metadata) VALUES (?, ?, ?, ?)",
            (str(uuid.uuid4()), url, round(time.time()), json.dumps(og_tags))
        )
    return og_tags
if __name__ == "__main__":
    # Direct entry point: serve on all interfaces, port 2226.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=2226)