99 lines
2.9 KiB
Python
99 lines
2.9 KiB
Python
import requests
|
|
import json
|
|
import logging
|
|
import time
|
|
import aiohttp
|
|
from fastapi import FastAPI, Header, Request, Query, HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
from contextlib import asynccontextmanager
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(_app: FastAPI):
|
|
async with aiohttp.ClientSession(
|
|
"https://ip.shronk.tech",
|
|
connector=aiohttp.TCPConnector(limit=2048),
|
|
raise_for_status=True
|
|
) as client:
|
|
_app.state.session = client
|
|
yield
|
|
|
|
|
|
app = FastAPI(lifespan=lifespan)
|
|
app.state.cache = {}
|
|
|
|
|
|
async def make_request(ip: str, headers: dict[str, str]) -> dict | HTTPException:
|
|
try:
|
|
async with app.state.session.get(f"/lookup?ip={ip}") as response:
|
|
data = await response.json()
|
|
except json.JSONDecodeError as e:
|
|
logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True)
|
|
return HTTPException(500, "Failed to parse upstream response.")
|
|
except Exception as e:
|
|
logging.error("Failed to get data for %s: %s", ip, e, exc_info=True)
|
|
return HTTPException(500, "Failed to get upstream data.")
|
|
return data
|
|
|
|
@app.get("/")
|
|
async def ip(
|
|
request: Request,
|
|
X_Forwarded_For: str = Header(None),
|
|
User_Agent: str = Header("Mozilla/5.0"),
|
|
lookup: str = Query(None),
|
|
):
|
|
if lookup:
|
|
ip = lookup
|
|
elif X_Forwarded_For:
|
|
ip = X_Forwarded_For
|
|
else:
|
|
ip = request.client.host
|
|
|
|
if ip in app.state.cache:
|
|
data, timestamp = app.state.cache[ip]
|
|
if time.time() - timestamp < 3600:
|
|
logging.info("cache hit for %s", ip)
|
|
return JSONResponse(data)
|
|
logging.info("cache expired for %s", ip)
|
|
|
|
logging.info("looking up IP info for %s", ip)
|
|
result = await make_request(
|
|
ip,
|
|
{"User-Agent": User_Agent}
|
|
)
|
|
if isinstance(result, HTTPException):
|
|
return result
|
|
data["ip"] = ip
|
|
data.pop("legalese", None)
|
|
data.pop("source", None)
|
|
data.pop("brexitRequired", None)
|
|
logging.info("%s -> %r", ip, data)
|
|
app.state.cache[ip] = [data, time.time()]
|
|
return JSONResponse(data)
|
|
|
|
|
|
@app.get("/lookup")
|
|
async def lookup(ip: str = Query(...), User_Agent: str = Header("Mozilla/5.0")):
|
|
if ip in app.state.cache:
|
|
data, timestamp = app.state.cache[ip]
|
|
if time.time() - timestamp < 3600:
|
|
logging.info("cache hit for %s", ip)
|
|
return JSONResponse(data)
|
|
logging.info("cache expired for %s", ip)
|
|
|
|
logging.info("looking up IP info for %s", ip)
|
|
result = await make_request(
|
|
ip,
|
|
{"User-Agent": User_Agent}
|
|
)
|
|
if isinstance(result, HTTPException):
|
|
return result
|
|
data["ip"] = ip
|
|
data.pop("legalese", None)
|
|
data.pop("source", None)
|
|
data.pop("brexitRequired", None)
|
|
logging.info("%s -> %r", ip, data)
|
|
app.state.cache[ip] = [data, time.time()]
|
|
return JSONResponse(data)
|