diff --git a/server.py b/server.py index 0366228..948308e 100644 --- a/server.py +++ b/server.py @@ -11,12 +11,12 @@ import httpx import logging import sqlite3 import appdirs +from threading import Lock from typing import Annotated from fastapi import Query, Header, HTTPException, Request from fastapi.responses import JSONResponse from pathlib import Path from bs4 import BeautifulSoup -from rich.logging import RichHandler from fastapi.middleware.cors import CORSMiddleware @@ -50,6 +50,7 @@ app = fastapi.FastAPI( root_path=os.environ.get("PREVIEW_ROOT_PATH", ""), lifespan=startup ) +lock = Lock() app.add_middleware( CORSMiddleware, allow_origins=["*"], @@ -231,106 +232,106 @@ def preview_url( return json.loads(metadata) domain = os.environ.get("PREVIEW_HOMESERVER", "https://" + req.url.hostname) - - try: - with httpx.Client( - headers={ - # "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0" - "User-Agent": "TwitterBot/1.0" - }, - timeout=60, - follow_redirects=False - ) as client: - response = client.get( - url, - ) - if response.status_code not in range(200, 400): - response.raise_for_status() - while response.next_request and response.next_request.url.host == response.url.host: - response = client.send(response.next_request) + with lock: + try: + with httpx.Client( + headers={ + # "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0" + "User-Agent": "TwitterBot/1.0" + }, + timeout=60, + follow_redirects=False + ) as client: + response = client.get( + url, + ) if response.status_code not in range(200, 400): response.raise_for_status() - except httpx.HTTPError as e: - raise HTTPException(500, f"Failed to fetch URL: {e!r}") + while response.next_request and response.next_request.url.host == response.url.host: + response = client.send(response.next_request) + if response.status_code not in range(200, 400): + response.raise_for_status() + except httpx.HTTPError as e: + raise HTTPException(500, f"Failed to fetch URL: {e!r}") - if "text/html" not in response.headers.get("content-type", ""): - return {} + if "text/html" not in response.headers.get("content-type", ""): + return {} - soup = BeautifulSoup(response.text, "html.parser") - og_tags = {} + soup = BeautifulSoup(response.text, "html.parser") + og_tags = {} - for tag in soup.find_all("meta"): - logging.debug("Found meta tag: %r", tag) - if tag.get("property", "").startswith(("og:", "twitter:")): - logging.debug( - "Tag %r is an OG/Twitter tag, with property: %r", - tag.get("property", "N/A"), - textwrap.shorten(tag.get("content", "N/A"), 100), - ) - tag_name = tag.get("property") - if tag_name in (*VALID_OG_TAGS, *TWITTER_MAPPING.keys()): - og_tags[tag_name] = tag.get("content") + for tag in soup.find_all("meta"): + logging.debug("Found meta tag: %r", tag) + if tag.get("property", "").startswith(("og:", "twitter:")): + logging.debug( + "Tag %r is an OG/Twitter tag, with property: %r", + tag.get("property", "N/A"), + textwrap.shorten(tag.get("content", "N/A"), 100), + ) + tag_name = tag.get("property") + if tag_name in (*VALID_OG_TAGS, *TWITTER_MAPPING.keys()): + og_tags[tag_name] = tag.get("content") - for tag in og_tags.copy().keys(): - if tag.startswith("twitter:"): - if tag in TWITTER_MAPPING: - og_tags[TWITTER_MAPPING[tag]] = og_tags.pop(tag) - logging.debug("Mapped twitter tag %r to og tag %r", tag, TWITTER_MAPPING[tag]) - else: - logging.warning("Unrecognized Twitter tag: %r", tag) - og_tags.pop(tag, None) + for tag in og_tags.copy().keys(): + if tag.startswith("twitter:"): + if tag in TWITTER_MAPPING: + og_tags[TWITTER_MAPPING[tag]] = og_tags.pop(tag) + logging.debug("Mapped twitter tag %r to og tag %r", tag, TWITTER_MAPPING[tag]) + else: + logging.warning("Unrecognized Twitter tag: %r", tag) + og_tags.pop(tag, None) - for tag_name in URL_OG_TAGS: - if tag_name in og_tags: - logging.debug("Retrieving tag %r to see if it needs uploading to Matrix", tag_name) - _url = og_tags[tag_name] - logging.debug("%r = %r", tag_name, _url) - try: - # noinspection PyArgumentList - with httpx.stream( - url=_url, - method="GET", - headers={ - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0" - }, - timeout=60, - follow_redirects=True - ) as response_media: - if response_media.status_code not in range(200, 300): - logging.warning("Failed to fetch media: %r - HTTP %s", _url, response_media.status_code) - og_tags.pop(tag_name, None) - elif not response_media.headers.get("content-type", "").startswith(("image/", "video/", "audio/")): - logging.warning("Failed to fetch media: %r - not a media type", _url) - og_tags.pop(tag_name, None) - else: - logging.info( - "Downloading {:,} bytes of media: {!r}".format( - int(response_media.headers.get("content-length", 0)), - _url - ) - ) - _file = io.BytesIO() - _file.write(response_media.read()) - _file.seek(0) - upload_response = upload_media( - domain, - access_token, - _file, - Path(httpx.URL(_url).path).name, - response_media.headers.get("content-type", "") - ) - if upload_response: - og_tags["original:" + tag_name.replace("og:", "")] = og_tags[tag_name] - og_tags[tag_name] = upload_response - if tag_name in ["og:image", "og:image:url", "og:image:secure_url"]: - _file.seek(0) - og_tags["matrix:image:size"] = len(_file.getvalue()) - logging.info("Uploaded media: %r, set %r to %r", _url, tag_name, upload_response) + for tag_name in URL_OG_TAGS: + if tag_name in og_tags: + logging.debug("Retrieving tag %r to see if it needs uploading to Matrix", tag_name) + _url = og_tags[tag_name] + logging.debug("%r = %r", tag_name, _url) + try: + # noinspection PyArgumentList + with httpx.stream( + url=_url, + method="GET", + headers={ + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0" + }, + timeout=60, + follow_redirects=True + ) as response_media: + if response_media.status_code not in range(200, 300): + logging.warning("Failed to fetch media: %r - HTTP %s", _url, response_media.status_code) + og_tags.pop(tag_name, None) + elif not response_media.headers.get("content-type", "").startswith(("image/", "video/", "audio/")): + logging.warning("Failed to fetch media: %r - not a media type", _url) + og_tags.pop(tag_name, None) else: - logging.warning("Failed to upload media: %r (no returned mxc)", _url) - except httpx.HTTPError as e: - logging.exception("Failed to fetch url for OG tags @ %r: %r", _url, e, exc_info=True) - og_tags.pop(tag_name, None) + logging.info( + "Downloading {:,} bytes of media: {!r}".format( + int(response_media.headers.get("content-length", 0)), + _url + ) + ) + _file = io.BytesIO() + _file.write(response_media.read()) + _file.seek(0) + upload_response = upload_media( + domain, + access_token, + _file, + Path(httpx.URL(_url).path).name, + response_media.headers.get("content-type", "") + ) + if upload_response: + og_tags["original:" + tag_name.replace("og:", "")] = og_tags[tag_name] + og_tags[tag_name] = upload_response + if tag_name in ["og:image", "og:image:url", "og:image:secure_url"]: + _file.seek(0) + og_tags["matrix:image:size"] = len(_file.getvalue()) + logging.info("Uploaded media: %r, set %r to %r", _url, tag_name, upload_response) + else: + logging.warning("Failed to upload media: %r (no returned mxc)", _url) + except httpx.HTTPError as e: + logging.exception("Failed to fetch url for OG tags @ %r: %r", _url, e, exc_info=True) + og_tags.pop(tag_name, None) for key in og_tags.copy().keys(): if not key.startswith(("original:", "og:", "matrix:")):