Respect environment variables for httpx

This commit is contained in:
Nexus 2024-02-23 23:11:16 +00:00
parent 771cfadc23
commit a282dea03a
Signed by: nex
GPG key ID: 0FA334385D0B689F

196
server.py
View file

@@ -52,6 +52,7 @@ app = fastapi.FastAPI(
lifespan=startup lifespan=startup
) )
lock = Lock() lock = Lock()
# noinspection PyTypeChecker
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=["*"], allow_origins=["*"],
@@ -159,7 +160,14 @@ CACHE_FILE.touch(exist_ok=True)
logging.debug("Cache file: %r", CACHE_FILE) logging.debug("Cache file: %r", CACHE_FILE)
def upload_media(domain: str, access_token: str, file: io.BytesIO, filename: str, content_type: str): def upload_media(
client: httpx.Client,
domain: str,
access_token: str,
file: io.BytesIO,
filename: str,
content_type: str
):
file.seek(0) file.seek(0)
logging.info( logging.info(
"Creating media at %r called %r with the content type %r and %d bytes", "Creating media at %r called %r with the content type %r and %d bytes",
@@ -170,7 +178,7 @@ def upload_media(domain: str, access_token: str, file: io.BytesIO, filename: str
) )
# noinspection PyTypeChecker # noinspection PyTypeChecker
response = httpx.post( response = client.post(
"%s/_matrix/media/r0/upload" % domain, "%s/_matrix/media/r0/upload" % domain,
headers={ headers={
"Authorization": f"Bearer {access_token}", "Authorization": f"Bearer {access_token}",
@@ -243,15 +251,16 @@ def preview_url(
domain = os.environ.get("PREVIEW_HOMESERVER", "https://" + req.url.hostname) domain = os.environ.get("PREVIEW_HOMESERVER", "https://" + req.url.hostname)
with lock: with lock:
try: with httpx.Client(
with httpx.Client( headers={
headers={ # "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0"
# "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0" "User-Agent": "TwitterBot/1.0"
"User-Agent": "TwitterBot/1.0" },
}, timeout=60,
timeout=60, follow_redirects=False,
follow_redirects=False trust_env=True # for HTTP[S]/ALL_PROXY environment variables.
) as client: ) as client:
try:
response = client.get( response = client.get(
url, url,
) )
@@ -261,92 +270,95 @@ def preview_url(
response = client.send(response.next_request) response = client.send(response.next_request)
if response.status_code not in range(200, 400): if response.status_code not in range(200, 400):
response.raise_for_status() response.raise_for_status()
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
if e.response.status_code in (204, 400, 401, 403, 405, 429, 410): if e.response.status_code in (204, 400, 401, 403, 405, 429, 410):
return JSONResponse({}, e.response.status_code, {"Cache-Control": "no-store"}) return JSONResponse({}, e.response.status_code, {"Cache-Control": "no-store"})
raise HTTPException( raise HTTPException(
e.response.status_code, e.response.status_code,
f"Failed to fetch {e.response.url} - HTTP {e.response.status_code}: {e.response.text}" f"Failed to fetch {e.response.url} - HTTP {e.response.status_code}: {e.response.text}"
)
if "text/html" not in response.headers.get("content-type", ""):
return {}
soup = BeautifulSoup(response.text, "html.parser")
og_tags = {}
for tag in soup.find_all("meta"):
logging.debug("Found meta tag: %r", tag)
if tag.get("property", "").startswith(("og:", "twitter:")):
logging.debug(
"Tag %r is an OG/Twitter tag, with property: %r",
tag.get("property", "N/A"),
textwrap.shorten(tag.get("content", "N/A"), 100),
) )
tag_name = tag.get("property")
if tag_name in (*VALID_OG_TAGS, *TWITTER_MAPPING.keys()):
og_tags[tag_name] = tag.get("content")
for tag in og_tags.copy().keys(): if "text/html" not in response.headers.get("content-type", ""):
if tag.startswith("twitter:"): return {}
if tag in TWITTER_MAPPING:
og_tags[TWITTER_MAPPING[tag]] = og_tags.pop(tag)
logging.debug("Mapped twitter tag %r to og tag %r", tag, TWITTER_MAPPING[tag])
else:
logging.warning("Unrecognized Twitter tag: %r", tag)
og_tags.pop(tag, None)
for tag_name in URL_OG_TAGS: soup = BeautifulSoup(response.text, "html.parser")
if tag_name in og_tags: og_tags = {}
logging.debug("Retrieving tag %r to see if it needs uploading to Matrix", tag_name)
_url = og_tags[tag_name] for tag in soup.find_all("meta"):
logging.debug("%r = %r", tag_name, _url) logging.debug("Found meta tag: %r", tag)
try: if tag.get("property", "").startswith(("og:", "twitter:")):
# noinspection PyArgumentList logging.debug(
with httpx.stream( "Tag %r is an OG/Twitter tag, with property: %r",
url=_url, tag.get("property", "N/A"),
method="GET", textwrap.shorten(tag.get("content", "N/A"), 100),
headers={ )
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:122.0) Gecko/20100101 Firefox/122.0" tag_name = tag.get("property")
}, if tag_name in (*VALID_OG_TAGS, *TWITTER_MAPPING.keys()):
timeout=60, og_tags[tag_name] = tag.get("content")
follow_redirects=True
) as response_media: for tag in og_tags.copy().keys():
if response_media.status_code not in range(200, 300): if tag.startswith("twitter:"):
logging.warning("Failed to fetch media: %r - HTTP %s", _url, response_media.status_code) if tag in TWITTER_MAPPING:
og_tags.pop(tag_name, None) og_tags[TWITTER_MAPPING[tag]] = og_tags.pop(tag)
elif not response_media.headers.get("content-type", "").startswith(("image/", "video/", "audio/")): logging.debug("Mapped twitter tag %r to og tag %r", tag, TWITTER_MAPPING[tag])
logging.warning("Failed to fetch media: %r - not a media type", _url) else:
og_tags.pop(tag_name, None) logging.warning("Unrecognized Twitter tag: %r", tag)
else: og_tags.pop(tag, None)
logging.info(
"Downloading {:,} bytes of media: {!r}".format( for tag_name in URL_OG_TAGS:
int(response_media.headers.get("content-length", 0)), if tag_name in og_tags:
_url logging.debug("Retrieving tag %r to see if it needs uploading to Matrix", tag_name)
_url = og_tags[tag_name]
logging.debug("%r = %r", tag_name, _url)
try:
# noinspection PyArgumentList
with client.stream(
url=_url,
method="GET",
follow_redirects=True
) as response_media:
if response_media.status_code not in range(200, 300):
logging.warning(
"Failed to fetch media: %r - HTTP %s",
_url,
response_media.status_code
) )
) og_tags.pop(tag_name, None)
_file = io.BytesIO() elif not response_media.headers.get("content-type", "").startswith(
_file.write(response_media.read()) ("image/", "video/", "audio/")
_file.seek(0) ):
upload_response = upload_media( logging.warning("Failed to fetch media: %r - not a media type", _url)
domain, og_tags.pop(tag_name, None)
access_token,
_file,
Path(httpx.URL(_url).path).name,
response_media.headers.get("content-type", "")
)
if upload_response:
og_tags["original:" + tag_name.replace("og:", "")] = og_tags[tag_name]
og_tags[tag_name] = upload_response
if tag_name in ["og:image", "og:image:url", "og:image:secure_url"]:
_file.seek(0)
og_tags["matrix:image:size"] = len(_file.getvalue())
logging.info("Uploaded media: %r, set %r to %r", _url, tag_name, upload_response)
else: else:
logging.warning("Failed to upload media: %r (no returned mxc)", _url) logging.info(
except httpx.HTTPError as e: "Downloading {:,} bytes of media: {!r}".format(
logging.exception("Failed to fetch url for OG tags @ %r: %r", _url, e, exc_info=True) int(response_media.headers.get("content-length", 0)),
og_tags.pop(tag_name, None) _url
)
)
_file = io.BytesIO()
_file.write(response_media.read())
_file.seek(0)
upload_response = upload_media(
client,
domain,
access_token,
_file,
Path(httpx.URL(_url).path).name,
response_media.headers.get("content-type", "")
)
if upload_response:
og_tags["original:" + tag_name.replace("og:", "")] = og_tags[tag_name]
og_tags[tag_name] = upload_response
if tag_name in ["og:image", "og:image:url", "og:image:secure_url"]:
_file.seek(0)
og_tags["matrix:image:size"] = len(_file.getvalue())
logging.info("Uploaded media: %r, set %r to %r", _url, tag_name, upload_response)
else:
logging.warning("Failed to upload media: %r (no returned mxc)", _url)
except httpx.HTTPError as e:
logging.exception("Failed to fetch url for OG tags @ %r: %r", _url, e, exc_info=True)
og_tags.pop(tag_name, None)
for key in og_tags.copy().keys(): for key in og_tags.copy().keys():
if not key.startswith(("original:", "og:", "matrix:")): if not key.startswith(("original:", "og:", "matrix:")):