import json
import logging
import time
import os
import aiohttp
import random
import ipaddress
from fastapi import FastAPI, Header, Request, Query, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from urllib.parse import quote

logging.basicConfig(level=logging.INFO)

# Seconds a cached lookup result stays valid.
CACHE_TTL = 3600

# Keys removed from upstream payloads before they are returned to the client.
_INTERNAL_FIELDS = ("legalese", "source", "brexitRequired")


class Manager:
    """Context manager that counts requests currently being processed.

    ``waiting`` is incremented on entry and decremented on exit; the HTTP
    middleware and the ``/health`` route read it to judge load.
    """

    def __init__(self, api: FastAPI):
        self.app = api
        self.waiting = 0

    def __enter__(self) -> "Manager":
        self.waiting += 1
        # The placeholders need the current count passed explicitly; without
        # an argument logging would emit a literal "%d".
        if self.waiting >= 2048:
            logging.critical(
                "TCP pool full! %d/2048. Requests are now being backlogged.",
                self.waiting,
            )
        elif self.waiting > 1024:
            logging.warning("TCP pool half full! %d/2048.", self.waiting)
        return self

    def __exit__(self, *args):
        self.waiting -= 1


@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Open the shared upstream HTTP session for the application's lifetime.

    The session is stored on ``_app.state.session`` and closed on shutdown.
    """
    async with aiohttp.ClientSession(
        "https://ip.shronk.tech",
        connector=aiohttp.TCPConnector(limit=2048),
        raise_for_status=True,
        timeout=aiohttp.ClientTimeout(5),
    ) as client:
        _app.state.session = client
        yield


app = FastAPI(lifespan=lifespan)
# Maps a looked-up IP string to ``[data, timestamp]``.
app.state.cache = {}
app.state.meta = Manager(app)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "HEAD", "OPTIONS"],
    allow_headers=["*"],
)


def _random_ipv4() -> str:
    """Return a random dotted-quad IPv4 string (used in place of 127.0.0.1)."""
    return ".".join(map(str, (random.randint(0, 255) for _ in range(4))))


def _finalize(ip: str, data: dict) -> JSONResponse:
    """Sanitise ``data``, store it in the cache under ``ip`` and wrap it in a response.

    ``data`` is modified in place: its ``"ip"`` key is overwritten and the
    internal-only keys are removed.
    """
    data["ip"] = ip
    for key in _INTERNAL_FIELDS:
        data.pop(key, None)
    logging.info("%s -> %r", ip, data)
    app.state.cache[ip] = [data, time.time()]
    return JSONResponse(data)


@app.middleware("http")
async def http_middleware(request: Request, call_next):
    """Shed load and reject non-IPv4 clients before running the route.

    Returns a 503 when too many requests are in flight, a 400 when the client
    address is IPv6 or not a valid IP address; otherwise the request is
    processed while being counted by the ``Manager``.
    """
    if app.state.meta.waiting >= 2512:
        # The 503 body is 512 random bytes decoded as latin-1.
        return JSONResponse(
            {"detail": os.urandom(512).decode("latin-1", "replace")},
            status_code=503,
        )
    try:
        _ip = ipaddress.ip_address(request.client.host)
        if isinstance(_ip, ipaddress.IPv6Address):
            return JSONResponse(
                {"detail": "IPv6 is not supported at this time."},
                status_code=400,
            )
    except ValueError:
        return JSONResponse(
            {"detail": "Invalid IP address."},
            status_code=400,
        )
    with app.state.meta:
        return await call_next(request)


async def make_request(ip: str, headers: dict[str, str] | None = None) -> dict | HTTPException:
    """Look up ``ip`` in the cache, falling back to the SHRoNKNet upstream.

    :param ip: Address to look up.
    :param headers: Optional extra headers for the upstream request.
    :returns: The lookup payload, or an ``HTTPException`` *instance* (returned,
        not raised) when the upstream request or its parsing fails.
    """
    if ip in app.state.cache:
        data, timestamp = app.state.cache[ip]
        if time.time() - timestamp < CACHE_TTL:
            logging.info("cache hit for %s", ip)
            return data
        logging.info("cache expired for %s", ip)

    try:
        logging.info("Querying SHRoNKNet: %s", ip)
        # ``ip`` may come straight from a query parameter, so it is passed via
        # ``params`` to be encoded rather than interpolated into the URL.
        async with app.state.session.get(
            "/lookup", params={"ip": ip}, headers=headers
        ) as response:
            data = await response.json()
        data["source"] = "SHRoNKNet"
    except json.JSONDecodeError as e:
        logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True)
        return HTTPException(500, "Failed to parse upstream response.")
    except Exception as e:
        logging.error("Failed to get data for %s: %s", ip, e, exc_info=True)
        return HTTPException(500, "Failed to get upstream data.")
    return data


async def _get_ipinfo_no_token(ip: str):
    """Query the ipinfo.io widget endpoint (no API token) for ``ip``.

    :returns: A normalised dict, or an ``HTTPException`` instance on failure.
    """
    try:
        logging.info("Querying IPInfo WITHOUT TOKEN: %s", ip)
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(5)) as session:
            # Quote the untrusted value so it cannot alter the request path.
            url = "https://ipinfo.io/widget/demo/" + quote(ip, safe="")
            async with session.get(url) as response:
                data = (await response.json())["data"]
        location = data["loc"].split(",")
        return {
            "ip": data["ip"],
            "city": data["city"],
            "country": data["region"],
            "countryCode": data["country"],
            # The ASN string is sliced from index 2 to drop a two-character
            # prefix (presumably "AS") before converting to int.
            "asn": int(data["asn"]["asn"][2:]),
            "isp": data["asn"]["name"],
            "source": "ipinfo",
            "lat": location[0],
            "lon": location[1],
            # A missing hostname should not fail the whole lookup.
            "hostname": data.get("hostname"),
            "timezone": data["timezone"],
            "subnet": data["abuse"]["network"],
            "abuse": data["abuse"]["email"],
        }
    except json.JSONDecodeError as e:
        logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True)
        return HTTPException(500, "Failed to parse upstream response.")
    except Exception as e:
        logging.error("Failed to get data for %s: %s", ip, e, exc_info=True)
        return HTTPException(500, "Failed to get upstream data.")


async def _get_ipinfo_token(ip: str, token: str):
    """Query the ipinfo.io API for ``ip`` using ``token``.

    :returns: A normalised dict, or an ``HTTPException`` instance on failure.
    """
    try:
        logging.info("Querying IPInfo WITH token: %s", ip)
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(5)) as session:
            # Quote the path segment and let aiohttp encode the token.
            url = "https://ipinfo.io/" + quote(ip, safe="")
            async with session.get(url, params={"token": token}) as response:
                data = await response.json()
        location = data["loc"].split(",")
        # "org" is split into its first whitespace-delimited word (the ASN,
        # with a two-character prefix) and the remainder (the ISP name).
        org = data["org"]
        return {
            "ip": data["ip"],
            "city": data["city"],
            "country": data["region"],
            "countryCode": data["country"],
            "asn": int(org.split()[0][2:]),
            "isp": org.split(" ", 1)[1],
            "source": "ipinfo",
            "lat": location[0],
            "lon": location[1],
            # A missing hostname should not fail the whole lookup.
            "hostname": data.get("hostname"),
            "timezone": data["timezone"],
        }
    except json.JSONDecodeError as e:
        logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True)
        return HTTPException(500, "Failed to parse upstream response.")
    except Exception as e:
        logging.error("Failed to get data for %s: %s", ip, e, exc_info=True)
        return HTTPException(500, "Failed to get upstream data.")


async def get_from_ipinfo(ip: str) -> dict | HTTPException:
    """Return cached data for ``ip`` if fresh, otherwise query ipinfo.io.

    Uses the token endpoint when the ``IPINFO_TOKEN`` environment variable is
    set and the widget endpoint otherwise.
    """
    if ip in app.state.cache:
        data, timestamp = app.state.cache[ip]
        if time.time() - timestamp < CACHE_TTL:
            return data

    token = os.getenv("IPINFO_TOKEN")
    if token:
        return await _get_ipinfo_token(ip, token)
    return await _get_ipinfo_no_token(ip)


async def get_ip_information(ip: str) -> dict | HTTPException:
    """Resolve ``ip`` via SHRoNKNet first, then ipinfo.io as a fallback.

    :returns: The first dict obtained from SHRoNKNet; otherwise whatever the
        ipinfo lookup returns (possibly an ``HTTPException`` instance); or
        ``{"ip": ip}`` if the ipinfo lookup itself raises.
    """
    try:
        data = await make_request(ip)
        if isinstance(data, dict):
            return data
    except Exception as e:
        logging.error("Failed to contact shronk net ip: %s", e, exc_info=True)

    try:
        data = await get_from_ipinfo(ip)
    except Exception as e:
        logging.error("Failed to contact ipinfo: %s", e, exc_info=True)
    else:
        return data
    return {"ip": ip}


@app.get("/")
async def ip(
    request: Request,
    X_Forwarded_For: str = Header(None),
    lookup: str = Query(None),
):
    """Return information about the caller's IP, or about ``lookup`` if given.

    The address is taken, in order of preference, from the ``lookup`` query
    parameter, the ``X-Forwarded-For`` header, or the socket peer.
    """
    if lookup:
        ip = lookup
    elif X_Forwarded_For:
        # The header may carry a comma-separated chain of addresses
        # ("client, proxy1, proxy2"); the originating client is the first.
        ip = X_Forwarded_For.split(",")[0].strip()
    else:
        ip = request.client.host

    if ip == "127.0.0.1":
        ip = _random_ipv4()

    logging.info("looking up IP info for %s", ip)
    data = await get_ip_information(ip)
    if isinstance(data, HTTPException):
        raise data
    return _finalize(ip, data)


@app.get("/lookup")
async def lookup(ip: str = Query(...), User_Agent: str = Header("Mozilla/5.0")):
    """Return SHRoNKNet information about ``ip``, served from cache when fresh.

    The caller's ``User-Agent`` header is forwarded to the upstream request.
    """
    if ip in app.state.cache:
        data, timestamp = app.state.cache[ip]
        if time.time() - timestamp < CACHE_TTL:
            logging.info("cache hit for %s", ip)
            return JSONResponse(data)
        logging.info("cache expired for %s", ip)

    if ip == "127.0.0.1":
        ip = _random_ipv4()

    logging.info("looking up IP info for %s", ip)
    data = await make_request(ip, {"User-Agent": User_Agent})
    if isinstance(data, HTTPException):
        raise data
    return _finalize(ip, data)


@app.get("/imfeelinglucky")
async def im_feeling_lucky(req: Request):
    """Return the caller's lookup data, occasionally with one octet altered.

    For a draw of 50-53 out of 1-1000 one octet of the reported IP is
    decremented; otherwise the IP is reported unchanged.
    """
    ip = req.client.host
    if ip == "127.0.0.1":
        ip = _random_ipv4()

    data = await make_request(ip)
    if not isinstance(data, dict):
        raise data
    # Work on a copy so the cached entry is not altered.
    data = data.copy()

    parts = list(map(int, ip.split(".")))
    n = random.randint(1, 1000)
    if n in range(50, 54):
        # The original also had an ``n > 400`` increment branch here, which
        # could never run for n in 50..53, so only the decrement remains.
        # Wrap modulo 256 so an octet of 0 does not become -1.
        index = n % 4
        parts[index] = (parts[index] - 1) % 256
    data["ip"] = ".".join(map(str, parts))
    return JSONResponse(data)


@app.get("/raw")
def get_raw(req: Request):
    """Return the caller's socket address as plain text."""
    return PlainTextResponse(req.client.host)


@app.get("/random")
def get_random():
    """Redirect to ``/lookup`` for a randomly generated IPv4 address."""
    ip = _random_ipv4()
    return RedirectResponse(f"/lookup?ip={ip}")


@app.get("/health")
async def get_health():
    """Report load status and upstream latency.

    The response carries ``status`` ("ok", "warning" or "critical"), a list
    of ``issues`` and, when the upstream probe succeeds, its ``latency`` in
    seconds.
    """
    detail = {"issues": []}
    if app.state.meta.waiting >= 2048:
        detail["status"] = "critical"
        detail["issues"].append("(C) Connection pool full.")
    elif app.state.meta.waiting >= 1024:
        detail["status"] = "warning"
        detail["issues"].append("(W) TCP pool half full.")
    else:
        detail["status"] = "ok"

    try:
        t = time.perf_counter()
        async with app.state.session.get("/") as response:
            await response.text()
        detail["latency"] = time.perf_counter() - t
    except Exception as e:
        detail["issues"].append(f"(E) Failed to check upstream: {e}")
        detail["status"] = "critical"
    return JSONResponse(detail)


# Serve the static files in ./cert under /check.
app.mount("/check", StaticFiles(directory="./cert", html=True), name="shronk-cert")


if __name__ == "__main__":
    import uvicorn

    logging.critical("Running in development mode! Even then, you should use the docker container!")
    app.debug = True
    uvicorn.run("ipserv:app", port=0, reload=True, log_level="info")