Lock the parser to prevent spam and repeated requests

Nexus 2024-02-22 18:24:59 +00:00
parent 9f97c523d6
commit d7422926ad
Signed by: nex
GPG key ID: 0FA334385D0B689F

server.py — 187 changed lines

@@ -11,12 +11,12 @@ import httpx
 import logging
 import sqlite3
 import appdirs
+from threading import Lock
 from typing import Annotated
 from fastapi import Query, Header, HTTPException, Request
 from fastapi.responses import JSONResponse
 from pathlib import Path
 from bs4 import BeautifulSoup
+from rich.logging import RichHandler
 from fastapi.middleware.cors import CORSMiddleware
@@ -50,6 +50,7 @@ app = fastapi.FastAPI(
     root_path=os.environ.get("PREVIEW_ROOT_PATH", ""),
     lifespan=startup
 )
+lock = Lock()
 app.add_middleware(
     CORSMiddleware,
     allow_origins=["*"],
@@ -231,106 +232,106 @@ def preview_url(
         return json.loads(metadata)
     domain = os.environ.get("PREVIEW_HOMESERVER", "https://" + req.url.hostname)
+    with lock:
         try:
             with httpx.Client(
                 headers={
                     # "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0"
                     "User-Agent": "TwitterBot/1.0"
                 },
                 timeout=60,
                 follow_redirects=False
             ) as client:
                 response = client.get(
                     url,
                 )
                 if response.status_code not in range(200, 400):
                     response.raise_for_status()
                 while response.next_request and response.next_request.url.host == response.url.host:
                     response = client.send(response.next_request)
                     if response.status_code not in range(200, 400):
                         response.raise_for_status()
         except httpx.HTTPError as e:
             raise HTTPException(500, f"Failed to fetch URL: {e!r}")
         if "text/html" not in response.headers.get("content-type", ""):
             return {}
         soup = BeautifulSoup(response.text, "html.parser")
         og_tags = {}
         for tag in soup.find_all("meta"):
             logging.debug("Found meta tag: %r", tag)
             if tag.get("property", "").startswith(("og:", "twitter:")):
                 logging.debug(
                     "Tag %r is an OG/Twitter tag, with property: %r",
                     tag.get("property", "N/A"),
                     textwrap.shorten(tag.get("content", "N/A"), 100),
                 )
                 tag_name = tag.get("property")
                 if tag_name in (*VALID_OG_TAGS, *TWITTER_MAPPING.keys()):
                     og_tags[tag_name] = tag.get("content")
         for tag in og_tags.copy().keys():
             if tag.startswith("twitter:"):
                 if tag in TWITTER_MAPPING:
                     og_tags[TWITTER_MAPPING[tag]] = og_tags.pop(tag)
                     logging.debug("Mapped twitter tag %r to og tag %r", tag, TWITTER_MAPPING[tag])
                 else:
                     logging.warning("Unrecognized Twitter tag: %r", tag)
                     og_tags.pop(tag, None)
         for tag_name in URL_OG_TAGS:
             if tag_name in og_tags:
                 logging.debug("Retrieving tag %r to see if it needs uploading to Matrix", tag_name)
                 _url = og_tags[tag_name]
                 logging.debug("%r = %r", tag_name, _url)
                 try:
                     # noinspection PyArgumentList
                     with httpx.stream(
                         url=_url,
                         method="GET",
                         headers={
                             "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0"
                         },
                         timeout=60,
                         follow_redirects=True
                     ) as response_media:
                         if response_media.status_code not in range(200, 300):
                             logging.warning("Failed to fetch media: %r - HTTP %s", _url, response_media.status_code)
                             og_tags.pop(tag_name, None)
                         elif not response_media.headers.get("content-type", "").startswith(("image/", "video/", "audio/")):
                             logging.warning("Failed to fetch media: %r - not a media type", _url)
                             og_tags.pop(tag_name, None)
                         else:
                             logging.info(
                                 "Downloading {:,} bytes of media: {!r}".format(
                                     int(response_media.headers.get("content-length", 0)),
                                     _url
                                 )
                             )
                             _file = io.BytesIO()
                             _file.write(response_media.read())
                             _file.seek(0)
                             upload_response = upload_media(
                                 domain,
                                 access_token,
                                 _file,
                                 Path(httpx.URL(_url).path).name,
                                 response_media.headers.get("content-type", "")
                             )
                             if upload_response:
                                 og_tags["original:" + tag_name.replace("og:", "")] = og_tags[tag_name]
                                 og_tags[tag_name] = upload_response
                                 if tag_name in ["og:image", "og:image:url", "og:image:secure_url"]:
                                     _file.seek(0)
                                     og_tags["matrix:image:size"] = len(_file.getvalue())
                                 logging.info("Uploaded media: %r, set %r to %r", _url, tag_name, upload_response)
                             else:
                                 logging.warning("Failed to upload media: %r (no returned mxc)", _url)
                 except httpx.HTTPError as e:
                     logging.exception("Failed to fetch url for OG tags @ %r: %r", _url, e, exc_info=True)
                     og_tags.pop(tag_name, None)
         for key in og_tags.copy().keys():
             if not key.startswith(("original:", "og:", "matrix:")):