howmuchdoesthesims4cost.lol/server.py

315 lines
10 KiB
Python
Executable file

# This code is licensed under GNU AGPLv3. See $PROJECT/LICENSE for more info.
# The $PROJECT/static/* content is licensed under CC BY-NC-SA - see $PROJECT/static/LICENSE for info.
import fastapi
import os
import time
import httpx
import typing
import hashlib
import pydantic
from pathlib import Path
from fastapi.staticfiles import StaticFiles
from fastapi.responses import Response, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
class AppIDList(pydantic.BaseModel):
app_ids: list[int] | int
app = fastapi.FastAPI(
title="How much does The Sims 4 cost?",
description="A simple API to get the price of The Sims 4 (and technically other games) on Steam. Source @ /server.py",
license_info={
"name": "GNU AGPLv3",
"identifier": "AGPL-3.0"
},
contact={
"name": "[Matrix] @nex",
"url": "https://matrix.to/#/@nex:nexy7574.co.uk"
},
version="1.2.0",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=False,
allow_methods=["GET", "HEAD", "POST"],
allow_headers=["*"]
)
app.state.session = httpx.AsyncClient(http2=False)
app.state.cache = {}
app.state.ratelimit = {}
app.state.etags = {}
@app.post(
"/api/appinfo",
responses={
200: {
"description": "Successfully fetched all of the app info IDs.",
"headers": {
"X-Ratelimit-Count": {
"schema": {
"type": "integer"
},
"description": "The number of requests you have sent to the API in the last 30 seconds."
},
"X-Ratelimit-Remaining": {
"schema": {
"type": "integer"
},
"description": "The number of requests you have left in the next 30 seconds."
},
"X-Ratelimit-Reset": {
"schema": {
"type": "float"
},
"description": "The unix timestamp of when the ratelimit resets."
},
}
},
304: {
"description": "The ETag provided was valid and no data was returned.",
},
429: {
"description": "You hit the anti-abuse ratelimit. Check the `Retry-After` header for more information.",
"headers": {
"Retry-After": {
"schema": {
"type": "float"
},
"description": "The amount of seconds you should wait before retrying."
}
}
},
500: {
"description": "There was an upstream error."
}
}
)
async def appinfo(
req: fastapi.Request,
body: AppIDList,
cc: str = fastapi.Query(None),
user_agent: str = fastapi.Header(..., alias="User-Agent"),
if_none_match: str = fastapi.Header(None, alias="If-None-Match"),
):
"""
This API endpoint proxies the call to the steam web API, and returns the data in a more usable format.
This also gets around the CORS issue that prevents this being a serverless solution.
This endpoint:
- Caches the data internally for 24 hours, regardless of etag and cache control.
- Has a ratelimit of 10 requests per 20 seconds, shared across all endpoints.
"""
if req.client.host in app.state.ratelimit:
expires = app.state.ratelimit[req.client.host]["expires"]
count = app.state.ratelimit[req.client.host]["count"]
rl_remaining = 10 - count
now = time.time()
if expires >= now:
if count >= 10:
app.state.ratelimit[req.client.host]["expires"] += (2 * (1.125 ** count))
return Response(
status_code=429,
headers={"Retry-After": str(app.state.ratelimit[req.client.host]["expires"] - now)}
)
else:
app.state.ratelimit[req.client.host]["count"] += 1
else:
del app.state.ratelimit[req.client.host]
else:
expires = time.time() + 20
app.state.ratelimit[req.client.host] = {
"expires": expires,
"count": 1
}
rl_remaining = 9
count = 1
auto_cc = "N/A"
if if_none_match:
if_none_match = if_none_match.replace("W/", "").strip('"')
if not cc:
_ccr = await app.state.session.get(f"https://ipinfo.io/{req.client.host}/country")
cc = _ccr.text.strip().lower()
auto_cc = cc
app_ids = body.app_ids
if isinstance(app_ids, int):
app_ids = [app_ids]
multi_fetch = len(app_ids) > 1
cache_hit = "MISS"
if if_none_match in app.state.etags:
if app.state.etags[if_none_match] > time.time():
cache_hit = "FULL-HIT"
return Response(status_code=304)
else:
del app.state.etags[if_none_match]
_md5 = hashlib.md5((",".join(map(str, app_ids)) + cc).encode(), usedforsecurity=False).hexdigest()
values = {}
remaining = app_ids.copy()
for app_id in app_ids:
if int(app_id) in app.state.cache:
data, expires = app.state.cache[app_id]
if expires > time.time():
cache_hit = "PARTIAL-HIT"
values[app_id] = data
remaining.pop(remaining.index(app_id))
else:
del app.state.cache[app_id]
values[app_id] = None
else:
values[app_id] = None
if len(remaining) == 0:
return JSONResponse(
values,
headers={
"cache-control": "public, max-age=806400",
"X-Cache": "FULL-HIT",
"X-Ratelimit-Remaining": str(rl_remaining),
"X-Ratelimit-Count": str(count),
"X-Ratelimit-Reset": str(expires),
"X-Ratelimit-Reset-After": str(expires),
},
)
elif len(remaining) == len(app_ids):
cache_hit = "MISS"
params = {
"appids": ",".join(map(str, remaining))
}
if cc is not None:
params["cc"] = cc
if multi_fetch:
params["filters"] = "price_overview"
headers = {
"User-Agent": user_agent,
}
result = original_response = await app.state.session.get(
"https://store.steampowered.com/api/appdetails",
params=params,
headers=headers,
)
if result.status_code == 429:
return Response(status_code=429)
elif result.status_code not in [304, 200]:
raise fastapi.HTTPException(result.status_code, result.text)
else:
app.state.etag = result.headers.get("etag")
app.state.last_modified = result.headers.get("last-modified")
result = result.json()
for _app_id, data in result.items():
if "data" in data:
app.state.cache[int(_app_id)] = (data["data"], time.time() + 86400)
values[int(_app_id)] = data["data"]
app.state.etags[_md5] = time.time() + 86400
return JSONResponse(
values,
original_response.status_code,
headers={
"ETag": _md5,
"cache-control": "public, max-age=806400",
"X-Cache": str(cache_hit),
"X-IP": str(req.client.host),
"X-Ratelimit-Remaining": str(rl_remaining),
"X-Ratelimit-Count": str(count),
"X-Ratelimit-Reset": str(expires),
"X-Ratelimit-Reset-After": str(expires),
},
)
RL_HEADERS = {
"X-Ratelimit-Throttled": {
"schema": {
"type": "boolean"
},
"description": "Whether or not you are actively being throttled."
},
"X-Ratelimit-Count": {
"schema": {
"type": "integer"
},
"description": "The number of requests you have sent to the API in the last 30 seconds."
},
"X-Ratelimit-Remaining": {
"schema": {
"type": "integer"
},
"description": "The number of requests you have left in the next 30 seconds."
},
"X-Ratelimit-Reset": {
"schema": {
"type": "float"
},
"description": "The unix timestamp of when the ratelimit resets."
}
}
RL_DOC = {
204: {
"description": "Successfully checked the ratelimit.",
"headers": RL_HEADERS
},
429: {
"description": "You hit the anti-abuse ratelimit. Check the `Retry-After` header for more information.",
"headers": {
**RL_HEADERS,
"Retry-After": {
"schema": {
"type": "float"
},
"description": "The amount of seconds you should wait before retrying."
}
}
}
}
@app.get("/api/ratelimit", status_code=204, responses=RL_DOC)
@app.head("/api/ratelimit", status_code=204, responses=RL_DOC)
async def check_ratelimit(req: fastapi.Request):
"""
Checks the current ratelimit. See GET /api/appinfo for more info.
"""
headers = {
"X-Ratelimit-Throttled": "False",
"X-Ratelimit-Remaining": "10",
"X-Ratelimit-Count": "0",
"X-Ratelimit-Reset": "0",
}
if req.client.host in app.state.ratelimit:
expires = app.state.ratelimit[req.client.host]["expires"]
count = app.state.ratelimit[req.client.host]["count"]
rl_remaining = 10 - count
headers["X-Ratelimit-Remaining"] = str(rl_remaining)
headers["X-Ratelimit-Count"] = str(count)
headers["X-Ratelimit-Reset"] = str(expires)
headers["X-Ratelimit-Throttled"] = str(rl_remaining >= 10 and expires >= time.time())
if headers["X-Ratelimit-Throttled"] == "True":
headers["Retry-After"] = str(expires - time.time())
return Response(None, 204 if headers["X-Ratelimit-Throttled"] == "False" else 429, headers=headers)
app.mount(
"/",
StaticFiles(directory=Path.cwd() / "static", html=True, check_dir=True, follow_symlink=True),
)
if __name__ == "__main__":
import uvicorn
import os
print("Starting in", Path.cwd())
uvicorn.run(app, host="0.0.0.0", port=1037, forwarded_allow_ips="*")