drop-in-url-previews/src/server.py

459 lines
17 KiB
Python

import contextlib
import datetime
import fnmatch
import io
import json
import os
import textwrap
import time
import uuid
import fastapi
import httpx
import logging
import hashlib
from threading import Lock
from typing import Annotated
from fastapi import Query, Header, HTTPException, Request
from fastapi.responses import JSONResponse
from pathlib import Path
from bs4 import BeautifulSoup
from fastapi.middleware.cors import CORSMiddleware
import db
@contextlib.asynccontextmanager
async def startup(_):
    """Lifespan hook: ensure the cache tables exist before the app serves.

    The single positional argument is ignored. Nothing is torn down on
    shutdown, so the generator simply yields once the tables are in place.
    """
    cache_models = [db.CachedURLs, db.CachedMedia]
    with db.db:
        logging.info("Creating tables")
        db.db.create_tables(cache_models)
        db.db.commit()
    # Hand control to the application for the rest of its lifetime.
    yield
# Root logger configuration; LOG_LEVEL selects the verbosity (default INFO).
_log_level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
    format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
    datefmt="%d/%m/%Y %H:%M:%S",
    level=logging.getLevelName(_log_level_name),
)

# Unless LOG_DEBUG_TIDY is switched off, pin the HTTP client loggers at INFO
# (presumably so their debug output does not clutter this service's own log).
if os.getenv("LOG_DEBUG_TIDY", "true") in ("1", "yes", "true"):
    for _noisy_logger in ("httpcore.connection", "httpcore.http11", "httpx"):
        logging.getLogger(_noisy_logger).setLevel(logging.INFO)
# Optional outbound proxy handed to the httpx client that fetches previews.
proxy = os.getenv("PREVIEW_PROXY")
if proxy:
    logging.debug("Using proxy: %r", proxy)

# Module-wide lock; preview_url holds it while it fetches and parses a page.
lock = Lock()

# The ASGI application. PREVIEW_ROOT_PATH allows mounting under a sub-path.
app = fastapi.FastAPI(
    lifespan=startup,
    root_path=os.environ.get("PREVIEW_ROOT_PATH", ""),
)

# CORS: any origin may issue GET/OPTIONS requests with any header.
# noinspection PyTypeChecker
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*", "Authorization"],
    allow_methods=["GET", "OPTIONS"],
    allow_credentials=True,
    allow_origins=["*"],
)
# Canned HTTP 401 replies, using Matrix-style ``errcode``/``error`` bodies.
# MISSING_TOKEN: the request carried no usable access token at all.
MISSING_TOKEN = JSONResponse(
    status_code=401,
    content={"errcode": "M_MISSING_TOKEN", "error": "Missing access token"},
)
# INVALID_TOKEN: a token was supplied but the homeserver did not accept it.
INVALID_TOKEN = JSONResponse(
    status_code=401,
    content={"errcode": "M_INVALID_TOKEN", "error": "Invalid access token"},
)
# User-Agent presets, keyed by the short names accepted in PREVIEW_USER_AGENT.
USER_AGENTS = {
    # Social-media crawlers.
    "twitter": "TwitterBot/1.0",
    # Desktop browsers.
    "firefox": (
        "Mozilla/5.0 (X11; Linux x86_64; rv:123.0) "
        "Gecko/20100101 Firefox/123.0"
    ),
    "chrome": (
        "Mozilla/5.0 (X11; Linux x86_64) "
        "AppleWebKit/537.36 (KHTML, Like Gecko) "
        "Chrome/121.9.6167.160 Safari/537.36"
    ),
    # Search-engine crawlers.
    "google": (
        "Mozilla/5.0 AppleWebKit/537.36 "
        "(KHTML, like Gecko; compatible; Googlebot/2.1; "
        "+http://www.google.com/bot.html) "
        "Chrome/121.9.6167.160 Safari/537.36"
    ),
    "bing": (
        "Mozilla/5.0 "
        "(compatible; Bingbot/2.0; +http://www.bing.com/bingbot.htm)"
    ),
    "yahoo": (
        "Mozilla/5.0 "
        "(compatible; Yahoo! Slurp; http://help.yahoo.com/help/us/ysearch/slurp)"
    ),
    "duckduckgo": (
        "DuckDuckBot/1.0; "
        "(+http://duckduckgo.com/duckduckbot.html)"
    ),
    "facebook": (
        "facebookexternalhit/1.1 "
        "(+http://www.facebook.com/externalhit_uatext.php)"
    ),
    # Identifies this service by its own name.
    "honest": (
        "MatrixDropInURLPreviewBot/0.1 "
        "(+https://git.i-am.nexus/nex/drop-in-url-previews)"
    ),
}
# Resolve the outgoing User-Agent once at import time. PREVIEW_USER_AGENT is
# matched case-insensitively against USER_AGENTS and falls back to "google";
# an unknown name aborts start-up with a ValueError listing the valid choices.
os.environ.setdefault("PREVIEW_USER_AGENT", "google")
_agent_key = os.environ["PREVIEW_USER_AGENT"].lower()
if _agent_key in USER_AGENTS:
    USER_AGENT = USER_AGENTS[_agent_key]
    logging.debug("Selecting user agent: %r", USER_AGENT)
else:
    raise ValueError(
        "Invalid user agent: %r\nMust be one of: %s" % (
            os.environ["PREVIEW_USER_AGENT"],
            ", ".join(USER_AGENTS.keys())
        )
    )
# Meta-tag ``property`` values that preview_url copies into its result.
# Anything not listed here (or in TWITTER_MAPPING) is ignored.
VALID_OG_TAGS = [
    # Basic metadata
    "og:title",
    "og:type",
    "og:image",
    "og:url",
    "og:audio",
    "og:description",
    "og:determiner",
    "og:locale",
    # "og:locale:alternate" is the spelling used by the Open Graph protocol;
    # the "alternative" variant is kept so existing behaviour is unchanged.
    "og:locale:alternate",
    "og:locale:alternative",
    "og:site_name",
    # Image
    "og:image:url",
    "og:image:secure_url",
    "og:image:type",
    "og:image:width",
    "og:image:height",
    "og:image:alt",
    # Video
    "og:video",
    "og:video:url",
    "og:video:secure_url",
    "og:video:type",
    "og:video:width",
    "og:video:height",
    "og:video:alt",
    "og:video:actor",
    "og:video:actor:role",
    "og:video:director",
    "og:video:writer",
    "og:video:duration",
    "og:video:release_date",
    "og:video:tag",
    # A missing comma here used to fuse this entry with "og:audio:url",
    # silently dropping both tags from the list.
    "og:video:series",
    # Audio
    "og:audio:url",
    "og:audio:secure_url",
    "og:audio:type",
    # Music
    "og:music:duration",
    "og:music:album",
    "og:music:album:disc",
    "og:music:album:track",
    "og:music:musician",
    "og:music:song",
    "og:music:song:disc",
    "og:music:song:track",
    "og:music:release_date",
    "og:music:creator",
    # Article
    "og:article:published_time",
    "og:article:modified_time",
    "og:article:expiration_time",
    "og:article:author",
    "og:article:section",
    "og:article:tag",
    # Book
    "og:book:author",
    "og:book:tag",
    "og:book:isbn",
    "og:book:release_date",
    # Profile
    "og:profile:first_name",
    "og:profile:last_name",
    "og:profile:username",
    "og:profile:gender",
]
# OG tags whose value is a media URL. preview_url downloads each of these and
# re-uploads it through upload_media. Order: video, image, audio; within each
# kind the bare tag, then ":url", then ":secure_url".
URL_OG_TAGS = [
    f"og:{media_kind}{suffix}"
    for media_kind in ("video", "image", "audio")
    for suffix in ("", ":url", ":secure_url")
]
# Twitter card properties that have a direct OpenGraph counterpart. Every
# entry maps "twitter:<name>" onto "og:<name>" for the same <name>.
TWITTER_MAPPING = {
    f"twitter:{card_field}": f"og:{card_field}"
    for card_field in (
        "image",
        "title",
        "image:width",
        "image:height",
        "image:alt",
        "description",
    )
}
def upload_media(
        client: "httpx.Client",
        domain: str,
        access_token: str,
        file: io.BytesIO,
        filename: str,
        content_type: str
):
    """Upload a media file to the homeserver, de-duplicating by MD5.

    Args:
        client: HTTP client used for the upload request.
        domain: Base URL of the homeserver (scheme + host, no trailing path).
        access_token: Token sent as the ``Bearer`` credential.
        file: In-memory buffer holding the complete media payload.
        filename: Name passed to the homeserver as the ``filename`` parameter.
        content_type: MIME type sent as the request ``Content-Type``.

    Returns:
        The content URI for the media (from the cache, or from the
        ``content_uri`` field of the upload response), or ``None`` when the
        media exceeds ``PREVIEW_MAX_MEDIA_MB`` or the upload did not return
        HTTP 200.
    """
    file.seek(0)
    payload = file.getvalue()
    max_megabytes = int(os.getenv("PREVIEW_MAX_MEDIA_MB", "50"))

    # 1000 hurts me because 1024 feels correct, but `MB` does in fact stand for MegaByte, not MebiByte.
    if len(payload) > max_megabytes * 1000 * 1000:
        logging.warning(
            "Media too large: %.2f Megabytes (max %.2fMB)",
            len(payload) / (1000 * 1000),
            max_megabytes
        )
        # Previously this branch only logged and the oversized payload was
        # uploaded anyway; refuse it so the limit is actually enforced.
        return None

    # The MD5 is only a cache key for already-uploaded media, not a security measure.
    md5 = hashlib.md5(payload).hexdigest()
    value = db.CachedMedia.get_or_none(md5=md5)
    if value:
        logging.info("found cached media for %r - %r", md5, value.mxc_url)
        return value.mxc_url

    logging.info(
        "Creating media at %r called %r with the content type %r and %d bytes",
        domain,
        filename,
        content_type,
        len(payload)
    )
    # noinspection PyTypeChecker
    response = client.post(
        "%s/_matrix/media/r0/upload" % domain,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": content_type
        },
        data=payload,
        params={
            "filename": filename
        }
    )
    if response.status_code == 200:
        logging.info("Media uploaded successfully")
        mxc_url = response.json()["content_uri"]
        logging.debug("Media uploaded: %r", mxc_url)
        # Remember the upload so identical content is not uploaded twice.
        db.CachedMedia.create(mxc_url=mxc_url, md5=md5).save()
        return mxc_url

    logging.warning("Failed to upload media: HTTP %s", response.status_code)
    logging.debug("Response: %r", response.text)
    return None
def __preview_img(url: str, client: httpx.Client, access_token: str) -> dict:
    """Build preview metadata for a URL that points directly at an image.

    The image is streamed into memory and passed to ``upload_media``.

    NOTE(review): the homeserver defaults to ``https://matrix.org`` here when
    ``PREVIEW_HOMESERVER`` is unset, whereas ``preview_url`` derives its
    default from the request hostname - confirm whether that is intended.

    Returns:
        ``{"og:image": <uploaded URI>, "matrix:image:size": <byte count>}``
        on success, or an empty dict if the upload returned nothing.
    """
    buffer = io.BytesIO()
    # noinspection PyArgumentList
    with client.stream("GET", url) as response:
        buffer.writelines(response.iter_bytes())
    buffer.seek(0)

    homeserver = os.environ.get("PREVIEW_HOMESERVER", "https://matrix.org")
    filename = Path(httpx.URL(url).path).name
    # Response headers remain readable after the stream has been closed.
    mime_type = response.headers.get("content-type", "image/jpeg")

    mxc_url = upload_media(client, homeserver, access_token, buffer, filename, mime_type)
    if not mxc_url:
        return {}
    return {
        "og:image": mxc_url,
        "matrix:image:size": len(buffer.getvalue())
    }
@app.get("/preview_url")
def preview_url(
        req: Request,
        res: JSONResponse,
        url: Annotated[str, Query(..., description="URL to preview")],
        ts: int = Query(None, description="The preferred point in time to return a preview for."),
        access_token_qs: str | None = Query(None, alias="access_token", description="Access token to use for the request."),
        access_token_header: str | None = Header(
            None,
            alias="Authorization",
            description="Access token to use for the request."
        ),
):
    """Return OpenGraph-style preview metadata for ``url``.

    The access token (query string or ``Authorization: Bearer`` header) is
    checked against the homeserver's ``whoami`` endpoint. Cached metadata is
    returned when available; otherwise the page is fetched, its ``og:`` /
    ``twitter:`` meta tags are collected, referenced media is re-uploaded to
    the homeserver, and the result is cached.

    Returns:
        A dict of preview tags, a canned 401 response when the token is
        missing or rejected, an empty JSON response mirroring selected
        upstream error statuses, or ``None`` with status 204 when the target
        is neither an image nor HTML.

    Raises:
        HTTPException: when the upstream fetch fails with an unlisted HTTP
            status, on transport errors (502), or on a same-host redirect
            chain longer than the client's redirect limit (502).
    """
    domain = os.environ.get("PREVIEW_HOMESERVER", "https://" + req.url.hostname)

    if ts:
        # ``ts`` is treated as milliseconds; cache rows store whole seconds.
        ts = round(ts / 1000)

    # --- Authentication -----------------------------------------------------
    if access_token_qs is not None:
        access_token = access_token_qs
    elif access_token_header and access_token_header.startswith("Bearer "):
        # Strip only the leading scheme; the rest is the token verbatim.
        access_token = access_token_header.removeprefix("Bearer ")
    else:
        return MISSING_TOKEN

    # Any non-200 answer from whoami is treated as an invalid token.
    response = httpx.get(
        domain + "/_matrix/client/r0/account/whoami",
        headers={"Authorization": f"Bearer {access_token}"}
    )
    if response.status_code != 200:
        return INVALID_TOKEN

    # --- Cache lookup -------------------------------------------------------
    results = db.CachedURLs.select().where(db.CachedURLs.url == url)
    if results:
        for result in results:
            # find the one with the closest timestamp
            metadata = json.loads(result.metadata)
            _ts = result.ts
            created_at = datetime.datetime.fromtimestamp(_ts - 86400)
            if ts is None or created_at <= datetime.datetime.fromtimestamp(ts):
                logging.debug("Optimal cache hit for %r", url)
                res.headers["X-Cache"] = "optimal"
                return metadata
        else:
            # The loop never breaks, so this runs whenever no row matched.
            logging.debug("No optimal cache matches for url %r.", url)
            # No close matches, get the latest one
            last_result = results[-1]
            metadata = json.loads(last_result.metadata)
            _ts = last_result.ts
            created_at = datetime.datetime.fromtimestamp(_ts)
            if (datetime.datetime.now() - created_at).days <= 7:
                logging.debug("Stale cache hit for %r", url)
                res.headers["X-Cache"] = "stale"
                return metadata
            else:
                logging.debug("Stale cache miss for %r", url)
                res.headers["X-Cache"] = "stale-miss"
    else:
        logging.debug("Full cache miss for %r", url)
        res.headers["X-Cache"] = "full-miss"

    # --- Fetch and parse ----------------------------------------------------
    # The module-wide lock serialises preview generation.
    with lock:
        with httpx.Client(
                headers={
                    "User-Agent": USER_AGENT
                },
                timeout=60,
                follow_redirects=False,
                trust_env=True,  # for HTTP[S]/ALL_PROXY environment variables.
                proxy=proxy
        ) as client:
            try:
                response = client.get(
                    url,
                )
                if response.status_code not in range(200, 400):
                    response.raise_for_status()
                # Redirects are followed by hand, and only while they stay on
                # the same host. Each send() starts with an empty history, so
                # the client's own redirect limit never triggers; count hops
                # here, otherwise a redirect cycle would spin forever while
                # holding the lock.
                redirect_hops = 0
                while response.next_request and response.next_request.url.host == response.url.host:
                    redirect_hops += 1
                    if redirect_hops > client.max_redirects:
                        raise HTTPException(502, f"Failed to fetch {url} - too many redirects")
                    response = client.send(response.next_request)
                    if response.status_code not in range(200, 400):
                        response.raise_for_status()
            except httpx.HTTPStatusError as e:
                # Mirror these upstream statuses with an empty, uncacheable body.
                if e.response.status_code in (204, 400, 401, 403, 405, 429, 410):
                    return JSONResponse({}, e.response.status_code, {"Cache-Control": "no-store"})
                raise HTTPException(
                    e.response.status_code,
                    f"Failed to fetch {e.response.url} - HTTP {e.response.status_code}: {e.response.text}"
                )
            except httpx.TransportError as e:
                # TransportError covers NetworkError plus timeouts and
                # protocol errors, which previously surfaced as a bare 500.
                logging.debug(f"Failed to fetch {url}", exc_info=True)
                raise HTTPException(502, f"Failed to fetch {url} - {e}")

            content_type = response.headers.get("content-type", "application/octet-stream")

            # Direct image link: upload the image itself and cache that result.
            if fnmatch.fnmatch(content_type, "image/*"):
                result = __preview_img(url, client, access_token)
                db.CachedURLs.create(
                    url=url,
                    ts=round(time.time()),
                    metadata=json.dumps(result)
                ).save()
                res.headers["Cache-Control"] = "public, max-age=86400"
                return result

            # Anything else that is not HTML has nothing to preview.
            if "text/html" not in content_type:
                res.status_code = 204
                res.media_type = "text/plain"
                res.headers["Cache-Control"] = "no-store"
                return None

            soup = BeautifulSoup(response.text, "html.parser")
            og_tags = {}
            # Hoisted: the set of accepted property names does not change per tag.
            accepted_properties = {*VALID_OG_TAGS, *TWITTER_MAPPING.keys()}

            for tag in soup.find_all("meta"):
                logging.debug("Found meta tag: %r", tag)
                if tag.get("property", "").startswith(("og:", "twitter:")):
                    logging.debug(
                        "Tag %r is an OG/Twitter tag, with property: %r",
                        tag.get("property", "N/A"),
                        textwrap.shorten(tag.get("content", "N/A"), 100),
                    )
                    tag_name = tag.get("property")
                    tag_content = tag.get("content")
                    # A tag without a ``content`` attribute carries no value;
                    # storing None would later be passed on as a media URL.
                    if tag_name in accepted_properties and tag_content is not None:
                        og_tags[tag_name] = tag_content

            # Fold twitter:* tags into their og:* equivalents.
            for tag in og_tags.copy().keys():
                if tag.startswith("twitter:"):
                    if tag in TWITTER_MAPPING:
                        og_tags[TWITTER_MAPPING[tag]] = og_tags.pop(tag)
                        logging.debug("Mapped twitter tag %r to og tag %r", tag, TWITTER_MAPPING[tag])
                    else:
                        logging.warning("Unrecognized Twitter tag: %r", tag)
                        og_tags.pop(tag, None)

            # Download each media URL and swap it for the uploaded copy.
            for tag_name in URL_OG_TAGS:
                if tag_name in og_tags:
                    logging.debug("Retrieving tag %r to see if it needs uploading to Matrix", tag_name)
                    _url = og_tags[tag_name]
                    logging.debug("%r = %r", tag_name, _url)
                    try:
                        # noinspection PyArgumentList
                        with client.stream(
                                url=_url,
                                method="GET",
                                follow_redirects=True
                        ) as response_media:
                            if response_media.status_code not in range(200, 300):
                                logging.warning(
                                    "Failed to fetch media: %r - HTTP %s",
                                    _url,
                                    response_media.status_code
                                )
                                og_tags.pop(tag_name, None)
                            elif not response_media.headers.get("content-type", "").startswith(
                                    ("image/", "video/", "audio/")
                            ):
                                logging.warning("Failed to fetch media: %r - not a media type", _url)
                                og_tags.pop(tag_name, None)
                            else:
                                logging.info(
                                    "Downloading {:,} bytes of media: {!r}".format(
                                        int(response_media.headers.get("content-length", 0)),
                                        _url
                                    )
                                )
                                _file = io.BytesIO()
                                _file.write(response_media.read())
                                _file.seek(0)
                                upload_response = upload_media(
                                    client,
                                    domain,
                                    access_token,
                                    _file,
                                    Path(httpx.URL(_url).path).name,
                                    response_media.headers.get("content-type", "")
                                )
                                if upload_response:
                                    # Keep the source URL under "original:<tag>".
                                    og_tags["original:" + tag_name.replace("og:", "")] = og_tags[tag_name]
                                    og_tags[tag_name] = upload_response
                                    if tag_name in ["og:image", "og:image:url", "og:image:secure_url"]:
                                        _file.seek(0)
                                        og_tags["matrix:image:size"] = len(_file.getvalue())
                                    logging.info("Uploaded media: %r, set %r to %r", _url, tag_name, upload_response)
                                else:
                                    logging.warning("Failed to upload media: %r (no returned mxc)", _url)
                    except httpx.HTTPError as e:
                        logging.exception("Failed to fetch url for OG tags @ %r: %r", _url, e, exc_info=True)
                        og_tags.pop(tag_name, None)

            # Prefix any key outside the three known namespaces with "og:".
            for key in og_tags.copy().keys():
                if not key.startswith(("original:", "og:", "matrix:")):
                    value = og_tags.pop(key, None)
                    og_tags["og:" + key] = value

            db.CachedURLs.create(
                url=url,
                ts=round(time.time()),
                metadata=json.dumps(og_tags)
            ).save()
            res.headers["Cache-Control"] = "public, max-age=86400"
            return og_tags
if __name__ == "__main__":
    import uvicorn

    # Bind address and port come from the environment, with local defaults.
    listen_host = os.getenv("PREVIEW_HOST", "0.0.0.0")
    listen_port = int(os.getenv("PREVIEW_PORT", 2226))

    # If you want to enable reverse-proxy support, you must set the $FORWARDED_ALLOW_IPS environment variable.
    # See: https://www.uvicorn.org/settings/#http
    uvicorn.run(app, host=listen_host, port=listen_port)