315 lines
10 KiB
Python
Executable file
315 lines
10 KiB
Python
Executable file
# This code is licensed under GNU AGPLv3. See $PROJECT/LICENSE for more info.
|
|
# The $PROJECT/static/* content is licensed under CC BY-NC-SA - see $PROJECT/static/LICENSE for info.
|
|
import fastapi
|
|
import os
|
|
import time
|
|
import httpx
|
|
import typing
|
|
import hashlib
|
|
import pydantic
|
|
from pathlib import Path
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import Response, JSONResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
|
|
|
|
class AppIDList(pydantic.BaseModel):
    """Request body for `appinfo`: the app ID(s) the caller wants looked up."""

    # Either one integer ID on its own, or a list of integer IDs.
    app_ids: typing.Union[list[int], int]
|
|
|
|
|
|
# The application object, with its descriptive metadata.
app = fastapi.FastAPI(
    title="How much does The Sims 4 cost?",
    description="A simple API to get the price of The Sims 4 (and technically other games) on Steam. Source @ /server.py",
    version="1.2.0",
    contact={
        "name": "[Matrix] @nex",
        "url": "https://matrix.to/#/@nex:nexy7574.co.uk",
    },
    license_info={
        "name": "GNU AGPLv3",
        "identifier": "AGPL-3.0",
    },
)

# CORS: any origin and any request header, no credentials, and only the
# methods listed here.
app.add_middleware(
    CORSMiddleware,
    allow_methods=["GET", "HEAD", "POST"],
    allow_headers=["*"],
    allow_origins=["*"],
    allow_credentials=False,
)

# Process-wide state shared by the request handlers.
# Client used for the outbound ipinfo.io and Steam requests.
app.state.session = httpx.AsyncClient(http2=False)
# app id -> (app data, unix expiry timestamp)
app.state.cache = dict()
# client host -> {"expires": unix timestamp, "count": requests in the window}
app.state.ratelimit = dict()
# etag -> unix expiry timestamp
app.state.etags = dict()
|
|
|
|
|
|
# Anti-abuse limit: at most _RL_MAX_HITS requests per client host in each
# window of _RL_WINDOW_SECONDS seconds.
_RL_MAX_HITS = 10
_RL_WINDOW_SECONDS = 20


def _consume_ratelimit(host: str, now: float) -> tuple[bool, int, float]:
    """
    Record one request from `host` in `app.state.ratelimit`.

    Returns `(throttled, count, expires)`, where `count` is the number of requests
    counted in the current window (including this one unless throttled) and
    `expires` is the unix timestamp at which the window ends.
    """
    bucket = app.state.ratelimit.get(host)
    if bucket is None or bucket["expires"] < now:
        # No window yet, or the previous one has lapsed: start a fresh window
        # and count this request as its first hit.
        bucket = {"expires": now + _RL_WINDOW_SECONDS, "count": 1}
        app.state.ratelimit[host] = bucket
        return False, 1, bucket["expires"]

    if bucket["count"] >= _RL_MAX_HITS:
        # Every request made while throttled pushes the reset further away.
        bucket["expires"] += 2 * (1.125 ** bucket["count"])
        return True, bucket["count"], bucket["expires"]

    bucket["count"] += 1
    return False, bucket["count"], bucket["expires"]


async def _lookup_country(host: str) -> str | None:
    """
    Ask ipinfo.io for the country code of `host`.

    Returns the lower-cased two-letter code, or None when the request fails or
    the reply is not a plain two-letter code.
    """
    try:
        reply = await app.state.session.get(f"https://ipinfo.io/{host}/country")
    except httpx.HTTPError:
        return None
    code = reply.text.strip().lower()
    if reply.status_code != 200 or len(code) != 2 or not code.isalpha():
        return None
    return code


@app.post(
    "/api/appinfo",
    responses={
        200: {
            "description": "Successfully fetched all of the app info IDs.",
            "headers": {
                "X-Ratelimit-Count": {
                    "schema": {
                        "type": "integer"
                    },
                    "description": "The number of requests you have sent to the API in the last 20 seconds."
                },
                "X-Ratelimit-Remaining": {
                    "schema": {
                        "type": "integer"
                    },
                    "description": "The number of requests you have left in the next 20 seconds."
                },
                "X-Ratelimit-Reset": {
                    "schema": {
                        "type": "number",
                        "format": "float"
                    },
                    "description": "The unix timestamp of when the ratelimit resets."
                },
            }
        },
        304: {
            "description": "The ETag provided was valid and no data was returned.",
        },
        429: {
            "description": "You hit the anti-abuse ratelimit. Check the `Retry-After` header for more information.",
            "headers": {
                "Retry-After": {
                    "schema": {
                        "type": "number",
                        "format": "float"
                    },
                    "description": "The amount of seconds you should wait before retrying."
                }
            }
        },
        500: {
            "description": "There was an upstream error."
        }
    }
)
async def appinfo(
        req: fastapi.Request,
        body: AppIDList,
        cc: str = fastapi.Query(None),
        user_agent: str = fastapi.Header(..., alias="User-Agent"),
        if_none_match: str = fastapi.Header(None, alias="If-None-Match"),
):
    """
    This API endpoint proxies the call to the steam web API, and returns the data in a more usable format.
    This also gets around the CORS issue that prevents this being a serverless solution.

    This endpoint:
    - Caches the data internally for 24 hours, regardless of etag and cache control.
    - Has a ratelimit of 10 requests per 20 seconds, shared across all endpoints.

    If `cc` (country code) is omitted it is looked up from the caller's address; when that lookup fails
    no country code is sent upstream.
    """
    now = time.time()
    host = req.client.host

    throttled, count, rl_expires = _consume_ratelimit(host, now)
    if throttled:
        return Response(
            status_code=429,
            headers={"Retry-After": str(rl_expires - now)}
        )
    # Built once, and from dedicated names, so that nothing further down can
    # clobber the ratelimit values before they are sent.
    rl_headers = {
        "X-Ratelimit-Remaining": str(_RL_MAX_HITS - count),
        "X-Ratelimit-Count": str(count),
        "X-Ratelimit-Reset": str(rl_expires),
        # Seconds until the reset, as opposed to the absolute timestamp above.
        "X-Ratelimit-Reset-After": str(max(0.0, rl_expires - now)),
    }

    if if_none_match:
        # Drop the weak-validator prefix and surrounding quotes so the value
        # compares equal to the bare digests stored in app.state.etags.
        if_none_match = if_none_match.replace("W/", "").strip('"')
    if not cc:
        cc = await _lookup_country(host)

    app_ids = body.app_ids
    if isinstance(app_ids, int):
        app_ids = [app_ids]

    if if_none_match in app.state.etags:
        if app.state.etags[if_none_match] > now:
            return Response(status_code=304)
        del app.state.etags[if_none_match]

    etag = hashlib.md5(
        (",".join(map(str, app_ids)) + (cc or "")).encode(),
        usedforsecurity=False,
    ).hexdigest()

    # NOTE(review): the cache is keyed by app id only, so an entry fetched for one
    # country code (or with the price-only filter below) is served to every later
    # caller of that app id - confirm this is intended.
    values = {}
    missing = []
    for app_id in app_ids:
        values[app_id] = None
        cached = app.state.cache.get(app_id)
        if cached is None:
            missing.append(app_id)
            continue
        data, cached_until = cached
        if cached_until > now:
            values[app_id] = data
        else:
            del app.state.cache[app_id]
            missing.append(app_id)

    # NOTE(review): max-age=806400 is ~9.3 days while entries are kept for 86400s
    # (24h); possibly a typo for 86400 - left unchanged pending confirmation.
    if not missing:
        return JSONResponse(
            values,
            headers={
                "cache-control": "public, max-age=806400",
                "X-Cache": "FULL-HIT",
                **rl_headers,
            },
        )
    cache_hit = "PARTIAL-HIT" if len(missing) < len(app_ids) else "MISS"

    params = {
        "appids": ",".join(map(str, missing))
    }
    if cc:
        params["cc"] = cc
    if len(app_ids) > 1:
        # Requests for several apps are restricted to the price information.
        params["filters"] = "price_overview"

    upstream = await app.state.session.get(
        "https://store.steampowered.com/api/appdetails",
        params=params,
        headers={"User-Agent": user_agent},
    )
    if upstream.status_code == 429:
        return Response(status_code=429)
    if upstream.status_code not in (200, 304):
        raise fastapi.HTTPException(upstream.status_code, upstream.text)
    app.state.etag = upstream.headers.get("etag")
    app.state.last_modified = upstream.headers.get("last-modified")

    payload = upstream.json()
    if not isinstance(payload, dict):
        # e.g. a bare `null` body: nothing to merge, the ids stay None.
        payload = {}
    fetched_at = time.time()
    for raw_id, entry in payload.items():
        if isinstance(entry, dict) and "data" in entry:
            app.state.cache[int(raw_id)] = (entry["data"], fetched_at + 86400)
            values[int(raw_id)] = entry["data"]

    app.state.etags[etag] = fetched_at + 86400
    return JSONResponse(
        values,
        upstream.status_code,
        headers={
            "ETag": etag,
            "cache-control": "public, max-age=806400",
            "X-Cache": cache_hit,
            "X-IP": str(host),
            **rl_headers,
        },
    )
|
|
|
|
|
|
# Response-header documentation shared by the ratelimit responses below.
# The window length quoted here matches the 20-second window the ratelimiter
# uses, and "float" values are declared as type "number" with format "float".
RL_HEADERS = {
    "X-Ratelimit-Throttled": {
        "schema": {
            "type": "boolean"
        },
        "description": "Whether or not you are actively being throttled."
    },
    "X-Ratelimit-Count": {
        "schema": {
            "type": "integer"
        },
        "description": "The number of requests you have sent to the API in the last 20 seconds."
    },
    "X-Ratelimit-Remaining": {
        "schema": {
            "type": "integer"
        },
        "description": "The number of requests you have left in the next 20 seconds."
    },
    "X-Ratelimit-Reset": {
        "schema": {
            "type": "number",
            "format": "float"
        },
        "description": "The unix timestamp of when the ratelimit resets."
    }
}

# Per-status-code response documentation; the 429 entry documents the same
# headers as the 204 entry plus `Retry-After`.
RL_DOC = {
    204: {
        "description": "Successfully checked the ratelimit.",
        "headers": RL_HEADERS
    },
    429: {
        "description": "You hit the anti-abuse ratelimit. Check the `Retry-After` header for more information.",
        "headers": {
            **RL_HEADERS,
            "Retry-After": {
                "schema": {
                    "type": "number",
                    "format": "float"
                },
                "description": "The amount of seconds you should wait before retrying."
            }
        }
    }
}
|
|
|
|
@app.get("/api/ratelimit", status_code=204, responses=RL_DOC)
@app.head("/api/ratelimit", status_code=204, responses=RL_DOC)
async def check_ratelimit(req: fastapi.Request):
    """
    Checks the current ratelimit. See POST /api/appinfo for more info.

    Responds 204 when the caller is not throttled and 429 (with `Retry-After`) when it is.
    Calling this endpoint does not itself count towards the ratelimit.
    """
    # Defaults describe a client with no live ratelimit window.
    headers = {
        "X-Ratelimit-Throttled": "False",
        "X-Ratelimit-Remaining": "10",
        "X-Ratelimit-Count": "0",
        "X-Ratelimit-Reset": "0",
    }
    throttled = False
    now = time.time()
    bucket = app.state.ratelimit.get(req.client.host)
    # A window whose expiry has passed no longer limits the client, so it is
    # reported with the defaults rather than its stale counters.
    if bucket is not None and bucket["expires"] >= now:
        count = bucket["count"]
        expires = bucket["expires"]
        # Throttled once the whole allowance of 10 requests has been used.
        throttled = count >= 10

        headers["X-Ratelimit-Remaining"] = str(10 - count)
        headers["X-Ratelimit-Count"] = str(count)
        headers["X-Ratelimit-Reset"] = str(expires)
        headers["X-Ratelimit-Throttled"] = str(throttled)
        if throttled:
            headers["Retry-After"] = str(expires - now)
    return Response(None, 429 if throttled else 204, headers=headers)
|
|
|
|
|
|
|
|
# Mount the "static" directory of the current working directory at the site
# root. This is registered after the API routes defined above.
app.mount(
    "/",
    StaticFiles(
        directory=Path.cwd().joinpath("static"),
        check_dir=True,
        follow_symlink=True,
        html=True,
    ),
)
|
|
|
|
|
|
if __name__ == "__main__":
    # Imported here so that uvicorn is only needed when this file is run as a
    # script.
    import uvicorn

    print("Starting in", Path.cwd())
    # NOTE(review): forwarded_allow_ips="*" presumably makes uvicorn accept
    # proxy headers from any peer. The ratelimit is keyed on req.client.host,
    # so confirm this is only ever exposed behind a trusted reverse proxy.
    uvicorn.run(app, host="0.0.0.0", port=1037, forwarded_allow_ips="*")
|