This repository was archived on 2024-06-12. You can view files and clone it, but you cannot push or open issues or pull requests.
ipserv/ipserv.py
Nexus bc13c0dcc2
All checks were successful
Build and Publish ipserv / build_and_publish (push) Successful in 1m15s
Disable IPv6
2024-04-23 20:52:34 +01:00

210 lines
6.2 KiB
Python

import json
import logging
import time
import os
import aiohttp
import random
import ipaddress
from fastapi import FastAPI, Header, Request, Query, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
# Configure the root logger to emit INFO and above; the rest of this module
# logs through the module-level `logging.*` functions.
logging.basicConfig(level=logging.INFO)
class Manager:
    """Context manager that counts requests currently being processed.

    Entering increments ``waiting`` and exiting decrements it. When the count
    crosses the thresholds below, a warning/critical record is logged that
    includes the current count.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, api: "FastAPI"):
        """Store the owning application and start with zero in-flight requests.

        Args:
            api: The FastAPI application this counter belongs to.
        """
        # Kept for reference only; nothing in this class reads it.
        self.app = api
        # Number of requests currently inside a `with manager:` block.
        self.waiting = 0

    def __enter__(self) -> "Manager":
        """Register one more in-flight request and log if the pool is busy.

        Returns:
            This manager instance.
        """
        self.waiting += 1
        if self.waiting >= 2048:
            # The count must be passed as an argument; without it the logging
            # module emits the raw "%d" placeholder instead of the number.
            logging.critical(
                "TCP pool full! %d/2048. Requests are now being backlogged.",
                self.waiting,
            )
        elif self.waiting > 1024:
            logging.warning("TCP pool half full! %d/2048.", self.waiting)
        return self

    def __exit__(self, *args):
        """Unregister the request; exceptions are not suppressed."""
        self.waiting -= 1
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Open the shared upstream HTTP session for the lifetime of the app.

    The session is bound to the ``https://ip.shronk.tech`` base URL, is capped
    at 2048 connections, and raises on non-success responses. It is exposed
    as ``_app.state.session`` and closed when the application shuts down.
    """
    pool = aiohttp.TCPConnector(limit=2048)
    session = aiohttp.ClientSession(
        "https://ip.shronk.tech",
        connector=pool,
        raise_for_status=True,
    )
    # Entering the session yields the session itself; leaving closes it.
    async with session:
        _app.state.session = session
        yield
# Application instance; `lifespan` manages the shared upstream HTTP session.
app = FastAPI(lifespan=lifespan)
# In-memory lookup cache: IP string -> [response dict, time.time() when stored].
app.state.cache = {}
# In-flight request counter, read by the HTTP middleware and by /health.
app.state.meta = Manager(app)
# CORS: any origin and any header, restricted to read-only HTTP methods.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "HEAD", "OPTIONS"],
    allow_headers=["*"],
)
@app.middleware("http")
async def http_middleware(request: Request, call_next):
    """Track every request in the in-flight counter and shed load when full.

    While fewer than 2512 requests are in flight, the request is processed
    inside the counter's context so it is counted for its whole duration.
    Otherwise a 503 is returned immediately whose "error" field is 512 random
    bytes decoded as latin-1.
    """
    meta = app.state.meta
    if meta.waiting < 2512:
        with meta:
            return await call_next(request)

    noise = os.urandom(512).decode("latin-1", "replace")
    return JSONResponse({"error": noise}, status_code=503)
async def make_request(ip: str, headers: dict[str, str]) -> dict | HTTPException:
    """Return lookup data for ``ip`` from the cache or from the upstream API.

    A cached entry younger than 3600 seconds is returned as-is (the same
    object that is stored in the cache). Otherwise ``/lookup`` is requested
    from the shared upstream session. This function never writes to the
    cache; callers do that after post-processing the result.

    Args:
        ip: The address to look up. Callers validate it before calling.
        headers: Extra request headers to send upstream (e.g. User-Agent).

    Returns:
        The decoded JSON body on success, or an ``HTTPException`` *instance*
        (status 500) on failure. The exception is returned, not raised, so
        callers must check the type and raise it themselves.
    """
    cached = app.state.cache.get(ip)
    if cached is not None:
        data, timestamp = cached
        if time.time() - timestamp < 3600:
            logging.info("cache hit for %s", ip)
            return data
        logging.info("cache expired for %s", ip)

    try:
        # `params` lets the client library encode the query string, and
        # `headers` forwards the caller-supplied headers, which were
        # previously accepted but silently dropped.
        async with app.state.session.get(
            "/lookup",
            params={"ip": ip},
            headers=headers,
        ) as response:
            data = await response.json()
    except json.JSONDecodeError as e:
        logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True)
        return HTTPException(500, "Failed to parse upstream response.")
    except Exception as e:
        # Boundary catch: any transport/status error becomes a 500 for the caller.
        logging.error("Failed to get data for %s: %s", ip, e, exc_info=True)
        return HTTPException(500, "Failed to get upstream data.")
    return data
@app.get("/")
async def ip(
    request: Request,
    X_Forwarded_For: str = Header(None),
    User_Agent: str = Header("Mozilla/5.0"),
    lookup: str = Query(None),
):
    """Return lookup data for the caller's (or an explicitly requested) IP.

    The address is taken, in order of preference, from the ``lookup`` query
    parameter, the ``X-Forwarded-For`` header, or the connecting peer.

    Args:
        request: The incoming request; used for the peer address.
        X_Forwarded_For: Value of the X-Forwarded-For header, if present.
        User_Agent: Value of the User-Agent header, forwarded to the upstream
            lookup.
        lookup: Optional explicit address to look up instead of the caller's.

    Returns:
        A JSON response with the lookup data, with "ip" set to the address
        and the "legalese", "source" and "brexitRequired" keys removed.

    Raises:
        HTTPException: 400 if the address is invalid or IPv6; 500 if the
            upstream lookup fails.
    """
    if lookup:
        ip = lookup
    elif X_Forwarded_For:
        # The header may carry a comma-separated chain ("client, proxy1, ...");
        # the first entry is the originating client.
        ip = X_Forwarded_For.split(",")[0].strip()
    else:
        ip = request.client.host

    try:
        # The module is imported as `ipaddress`; the previous `ip_address.`
        # prefix was an undefined name and failed on every request.
        parsed = ipaddress.ip_address(ip)
    except ValueError:
        raise HTTPException(
            400, detail="%r does not appear to be a valid IP address" % ip
        ) from None
    if isinstance(parsed, ipaddress.IPv6Address):
        raise HTTPException(400, detail="IPv6 is not supported at this time.")

    # Serve fresh cache entries directly so that a hit does not get re-stored
    # below with a new timestamp (which would keep the entry alive forever).
    cached = app.state.cache.get(ip)
    if cached is not None:
        data, timestamp = cached
        if time.time() - timestamp < 3600:
            logging.info("cache hit for %s", ip)
            return JSONResponse(data)
        logging.info("cache expired for %s", ip)

    logging.info("looking up IP info for %s", ip)
    data = await make_request(ip, {"User-Agent": User_Agent})
    if isinstance(data, HTTPException):
        raise data

    data["ip"] = ip
    for unwanted in ("legalese", "source", "brexitRequired"):
        data.pop(unwanted, None)
    logging.info("%s -> %r", ip, data)
    app.state.cache[ip] = [data, time.time()]
    return JSONResponse(data)
@app.get("/lookup")
async def lookup(ip: str = Query(...), User_Agent: str = Header("Mozilla/5.0")):
    """Return lookup data for the address given in the ``ip`` query parameter.

    Args:
        ip: The IPv4 address to look up (required).
        User_Agent: Value of the User-Agent header, forwarded to the upstream
            lookup.

    Returns:
        A JSON response with the lookup data, with "ip" set to the address
        and the "legalese", "source" and "brexitRequired" keys removed.
        Entries cached less than 3600 seconds ago are served from the cache.

    Raises:
        HTTPException: 400 if the address is invalid or IPv6; 500 if the
            upstream lookup fails.
    """
    cached = app.state.cache.get(ip)
    if cached is not None:
        data, timestamp = cached
        if time.time() - timestamp < 3600:
            logging.info("cache hit for %s", ip)
            return JSONResponse(data)
        logging.info("cache expired for %s", ip)

    try:
        # The module is imported as `ipaddress`; the previous `ip_address.`
        # prefix was an undefined name and failed on every uncached request.
        parsed = ipaddress.ip_address(ip)
    except ValueError:
        raise HTTPException(
            400, detail="%r does not appear to be a valid IP address" % ip
        ) from None
    if isinstance(parsed, ipaddress.IPv6Address):
        raise HTTPException(400, detail="IPv6 is not supported at this time.")

    logging.info("looking up IP info for %s", ip)
    data = await make_request(ip, {"User-Agent": User_Agent})
    if isinstance(data, HTTPException):
        raise data

    data["ip"] = ip
    for unwanted in ("legalese", "source", "brexitRequired"):
        data.pop(unwanted, None)
    logging.info("%s -> %r", ip, data)
    app.state.cache[ip] = [data, time.time()]
    return JSONResponse(data)
@app.get("/imfeelinglucky")
async def im_feeling_lucky(req: Request):
    """Look up the connecting peer's IP, occasionally reporting it slightly wrong.

    For 4 out of every 1000 random draws, the "ip" field of the response is
    replaced by the peer address with one octet decremented by one.

    Args:
        req: The incoming request; the peer address is the one looked up.

    Returns:
        A JSON response built from a copy of the lookup data, so the cached
        entry is never modified.

    Raises:
        HTTPException: 400 if the peer address is invalid or IPv6; 500 if the
            upstream lookup fails.
    """
    host = req.client.host
    try:
        # The module is imported as `ipaddress`; the previous `ip_address.`
        # prefix was an undefined name and failed on every request.
        parsed = ipaddress.ip_address(host)
    except ValueError:
        raise HTTPException(
            400, detail="%r does not appear to be a valid IP address" % host
        ) from None
    if isinstance(parsed, ipaddress.IPv6Address):
        raise HTTPException(400, detail="IPv6 is not supported at this time.")

    data = await make_request(host, {"User-Agent": "Mozilla/5.0 Nex/19.04.2024"})
    if not isinstance(data, dict):
        # make_request hands back an HTTPException instance on failure.
        raise data
    data = data.copy()

    n = random.randint(1, 1000)
    if n in range(50, 54):
        # The former `n > 400` increment branch could never run here (n is
        # 50..53), so only the decrement remains. Wrap modulo 256 so an octet
        # of 0 becomes 255 rather than producing "-1" in the address.
        parts = list(map(int, host.split(".")))
        parts[n % 4] = (parts[n % 4] - 1) % 256
        data["ip"] = ".".join(map(str, parts))
    return JSONResponse(data)
@app.get("/raw")
def get_raw(req: Request):
    """Return the connecting peer's address as a plain-text body."""
    peer_host = req.client.host
    return PlainTextResponse(peer_host)
@app.get("/health")
async def get_health():
    """Report service health: request-pool pressure and upstream reachability.

    Returns:
        A JSON response with "issues" (list of messages), "status"
        ("ok", "warning" or "critical") and, when the upstream probe
        succeeds, "latency" in seconds as measured with ``perf_counter``.
    """
    in_flight = app.state.meta.waiting
    problems = []
    if in_flight >= 2048:
        state = "critical"
        problems.append("(C) Connection pool full.")
    elif in_flight >= 1024:
        state = "warning"
        problems.append("(W) TCP pool half full.")
    else:
        state = "ok"
    report = {"issues": problems, "status": state}

    started = time.perf_counter()
    try:
        async with app.state.session.get("/") as response:
            await response.text()
            report["latency"] = time.perf_counter() - started
    except Exception as e:
        # Any upstream failure overrides the pool-based status.
        problems.append(f"(E) Failed to check upstream: {e}")
        report["status"] = "critical"
    return JSONResponse(report)
# Serve the contents of the local ./cert directory under /check as static
# files (html=True: presumably index.html is served for directory requests —
# confirm against the StaticFiles documentation).
app.mount("/check", StaticFiles(directory="./cert", html=True), name="shronk-cert")