This repository has been archived on 2024-06-12. You can view files and clone it, but cannot push or open issues or pull requests.
ipserv/ipserv.py
2024-04-19 11:13:35 +01:00

99 lines
2.9 KiB
Python

import requests
import json
import logging
import time
import aiohttp
from fastapi import FastAPI, Header, Request, Query, HTTPException
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
logging.basicConfig(level=logging.INFO)
@asynccontextmanager
async def lifespan(_app: FastAPI):
async with aiohttp.ClientSession(
"https://ip.shronk.tech",
connector=aiohttp.TCPConnector(limit=2048),
raise_for_status=True
) as client:
_app.state.session = client
yield
app = FastAPI(lifespan=lifespan)
app.state.cache = {}
async def make_request(ip: str, headers: dict[str, str]) -> dict | HTTPException:
try:
async with app.state.session.get(f"/lookup?ip={ip}") as response:
data = await response.json()
except json.JSONDecodeError as e:
logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True)
return HTTPException(500, "Failed to parse upstream response.")
except Exception as e:
logging.error("Failed to get data for %s: %s", ip, e, exc_info=True)
return HTTPException(500, "Failed to get upstream data.")
return data
@app.get("/")
async def ip(
request: Request,
X_Forwarded_For: str = Header(None),
User_Agent: str = Header("Mozilla/5.0"),
lookup: str = Query(None),
):
if lookup:
ip = lookup
elif X_Forwarded_For:
ip = X_Forwarded_For
else:
ip = request.client.host
if ip in app.state.cache:
data, timestamp = app.state.cache[ip]
if time.time() - timestamp < 3600:
logging.info("cache hit for %s", ip)
return JSONResponse(data)
logging.info("cache expired for %s", ip)
logging.info("looking up IP info for %s", ip)
result = await make_request(
ip,
{"User-Agent": User_Agent}
)
if isinstance(result, HTTPException):
return result
logging.info("%s -> %r", ip, data)
data["ip"] = ip
data.pop("legalese", None)
data.pop("source", None)
data.pop("brexitRequired", None)
app.state.cache[ip] = [data, time.time()]
return JSONResponse(data)
@app.get("/lookup")
async def lookup(ip: str = Query(...), User_Agent: str = Header("Mozilla/5.0")):
if ip in app.state.cache:
data, timestamp = app.state.cache[ip]
if time.time() - timestamp < 3600:
logging.info("cache hit for %s", ip)
return JSONResponse(data)
logging.info("cache expired for %s", ip)
logging.info("looking up IP info for %s", ip)
result = await make_request(
ip,
{"User-Agent": User_Agent}
)
if isinstance(result, HTTPException):
return result
logging.info("%s -> %r", ip, data)
data["ip"] = ip
data.pop("legalese", None)
data.pop("source", None)
data.pop("brexitRequired", None)
app.state.cache[ip] = [data, time.time()]
return JSONResponse(data)