use aiohttp

This commit is contained in:
Nexus 2024-04-19 10:59:19 +01:00
parent 68afe5214d
commit d335768c1b
Signed by: nex
GPG key ID: 0FA334385D0B689F

View file

@ -2,16 +2,30 @@ import requests
import json
import logging
import time
import aiohttp
from fastapi import FastAPI, Header, Request, Query, HTTPException
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
logging.basicConfig(level=logging.INFO)
@asynccontextmanager()
async def lifespan(_app: FastAPI):
async with aiohttp.ClientSession(
"https://ip.shronk.tech",
connector=aiohttp.TCPConnector(limit=2048)
) as client:
_app.state.session = client
yield
app = FastAPI()
logging.basicConfig(level=logging.INFO)
cache = {}
app.state.cache = {}
@app.get("/")
def ip(
async def ip(
request: Request,
X_Forwarded_For: str = Header(None),
User_Agent: str = Header("Mozilla/5.0"),
@ -26,8 +40,8 @@ def ip(
else:
ip = request.client.host
if ip in cache:
data, timestamp = cache[ip]
if ip in app.state.cache:
data, timestamp = app.state.cache[ip]
if time.time() - timestamp < 3600:
logging.info("cache hit for %s", ip)
return JSONResponse(data)
@ -35,27 +49,12 @@ def ip(
logging.info("looking up IP info for %s", ip)
try:
response = requests.get(
"https://ip.shronk.tech/lookup?ip=" + ip,
headers={
"User-Agent": User_Agent,
"Accept": Accept,
"Accept-Language": Accept_Language,
"Dnt": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Sec-Gpc": "1",
"Upgrade-Insecure-Requests": "1"
}
)
response.raise_for_status()
async with app.state.session.get(f"/lookup?ip={ip}") as response:
response.raise_for_status()
data = await response.json()
except Exception as e:
logging.error("Failed to get data for %s: %s", ip, e, exc_info=True)
raise HTTPException(500, "Failed to get upstream data.")
try:
data = response.json()
except json.JSONDecodeError as e:
logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True)
raise HTTPException(500, "Failed to parse upstream response.")
@ -65,5 +64,5 @@ def ip(
data.pop("source", None)
data.pop("brexitRequired", None)
if response.status_code == 200:
cache[ip] = [data, time.time()]
app.state.cache[ip] = [data, time.time()]
return JSONResponse(data, response.status_code)