import json import logging import time import os import aiohttp import random from fastapi import FastAPI, Header, Request, Query, HTTPException from fastapi.responses import JSONResponse, PlainTextResponse from contextlib import asynccontextmanager logging.basicConfig(level=logging.INFO) class Manager: def __init__(self, api: FastAPI): self.app = api self.waiting = 0 def __enter__(self) -> "Manager": self.waiting += 1 if self.waiting >= 2048: logging.critical("TCP pool full! %d/2048. Requests are now being backlogged.") elif self.waiting > 1024: logging.warning("TCP pool half full! %d/2048.") return self def __exit__(self, *args): self.waiting -= 1 @asynccontextmanager async def lifespan(_app: FastAPI): async with aiohttp.ClientSession( "https://ip.shronk.tech", connector=aiohttp.TCPConnector(limit=2048), raise_for_status=True ) as client: _app.state.session = client yield app = FastAPI(lifespan=lifespan) app.state.cache = {} app.state.meta = Manager(app) @app.middleware("http") async def http_middleware(request: Request, call_next): if app.state.meta.waiting >= 2512: return JSONResponse({"error": os.urandom(512).decode("latin-1", "replace")}, status_code=503) with app.state.meta: return await call_next(request) async def make_request(ip: str, headers: dict[str, str]) -> dict | HTTPException: if ip in app.state.cache: data, timestamp = app.state.cache[ip] if time.time() - timestamp < 3600: logging.info("cache hit for %s", ip) return data logging.info("cache expired for %s", ip) try: async with app.state.session.get(f"/lookup?ip={ip}") as response: data = await response.json() except json.JSONDecodeError as e: logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True) return HTTPException(500, "Failed to parse upstream response.") except Exception as e: logging.error("Failed to get data for %s: %s", ip, e, exc_info=True) return HTTPException(500, "Failed to get upstream data.") return data @app.get("/") async def ip( request: Request, X_Forwarded_For: str = Header(None), User_Agent: str = Header("Mozilla/5.0"), lookup: str = Query(None), ): if lookup: ip = lookup elif X_Forwarded_For: ip = X_Forwarded_For else: ip = request.client.host logging.info("looking up IP info for %s", ip) data = await make_request( ip, {"User-Agent": User_Agent} ) if isinstance(data, HTTPException): raise data data["ip"] = ip data.pop("legalese", None) data.pop("source", None) data.pop("brexitRequired", None) logging.info("%s -> %r", ip, data) app.state.cache[ip] = [data, time.time()] return JSONResponse(data) @app.get("/lookup") async def lookup(ip: str = Query(...), User_Agent: str = Header("Mozilla/5.0")): if ip in app.state.cache: data, timestamp = app.state.cache[ip] if time.time() - timestamp < 3600: logging.info("cache hit for %s", ip) return JSONResponse(data) logging.info("cache expired for %s", ip) logging.info("looking up IP info for %s", ip) data = await make_request( ip, {"User-Agent": User_Agent} ) if isinstance(data, HTTPException): raise data data["ip"] = ip data.pop("legalese", None) data.pop("source", None) data.pop("brexitRequired", None) logging.info("%s -> %r", ip, data) app.state.cache[ip] = [data, time.time()] return JSONResponse(data) @app.get("/imfeelinglucky") async def im_feeling_lucky(req: Request): host = req.client.host if host.count(".") != 3: raise HTTPException(400, "IPv4 only endpoint.") data = await make_request(host, {"User-Agent": "Mozilla/5.0 Nex/19.04.2024"}) if not isinstance(data, dict): raise data data = data.copy() parts = list(map(int, host.split("."))) n = random.randint(1, 1000) if n in range(50, 54): if n > 400: parts[n % 4] += 1 else: parts[n % 4] -= 1 data["ip"] = ".".join(map(str, parts)) return JSONResponse(data) @app.get("/raw") def get_raw(req: Request): return PlainTextResponse(req.client.host) @app.get("/health") async def get_health(): detail = {"issues": []} if app.state.meta.waiting >= 2048: detail["status"] = "critical" detail["issues"].append("(C) Connection pool full.") elif app.state.meta.waiting >= 1024: detail["status"] = "warning" detail["issues"].append("(W) TCP pool half full.") else: detail["status"] = "ok" try: t = time.perf_counter() async with app.state.session.get("/") as response: await response.text() detail["latency"] = time.perf_counter() - t except Exception as e: detail["issues"].append(f"(E) Failed to check upstream: {e}") detail["status"] = "critical" return JSONResponse(detail)