Fix IPServer

This commit is contained in:
Nexus 2024-04-28 01:04:24 +01:00
parent 6b29f7d868
commit 3ba2c19aa9
Signed by: nex
GPG key ID: 0FA334385D0B689F
2 changed files with 46 additions and 35 deletions

4
.gitignore vendored
View file

@ -1,4 +1,6 @@
.idea/
.vscode/
.venv/
venv/
venv/
*.pyc
__pycache__/

View file

@ -6,7 +6,7 @@ import aiohttp
import random
import ipaddress
from fastapi import FastAPI, Header, Request, Query, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse
from fastapi.responses import JSONResponse, PlainTextResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
@ -57,12 +57,26 @@ app.add_middleware(
@app.middleware("http")
async def http_middleware(request: Request, call_next):
if app.state.meta.waiting >= 2512:
return JSONResponse({"error": os.urandom(512).decode("latin-1", "replace")}, status_code=503)
return JSONResponse({"detail": os.urandom(512).decode("latin-1", "replace")}, status_code=503)
try:
_ip = ipaddress.ip_address(request.client.host)
if isinstance(_ip, ipaddress.IPv6Address):
return JSONResponse(
{"detail": "IPv6 is not supported at this time."},
status_code=400
)
except ValueError:
return JSONResponse(
{"detail": "Invalid IP address."},
status_code=400
)
with app.state.meta:
return await call_next(request)
async def make_request(ip: str, headers: dict[str, str]) -> dict | HTTPException:
async def make_request(ip: str, headers: dict[str, str] = None) -> dict | HTTPException:
if ip in app.state.cache:
data, timestamp = app.state.cache[ip]
if time.time() - timestamp < 3600:
@ -71,7 +85,7 @@ async def make_request(ip: str, headers: dict[str, str]) -> dict | HTTPException
logging.info("cache expired for %s", ip)
try:
async with app.state.session.get(f"/lookup?ip={ip}") as response:
async with app.state.session.get(f"/lookup?ip={ip}", headers=headers) as response:
data = await response.json()
except json.JSONDecodeError as e:
logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True)
@ -86,7 +100,6 @@ async def make_request(ip: str, headers: dict[str, str]) -> dict | HTTPException
async def ip(
request: Request,
X_Forwarded_For: str = Header(None),
User_Agent: str = Header("Mozilla/5.0"),
lookup: str = Query(None),
):
if lookup:
@ -95,19 +108,12 @@ async def ip(
ip = X_Forwarded_For
else:
ip = request.client.host
try:
_ip = ip_address.ip_address(ip)
if isinstance(_ip, ipaddress.IPv6Address):
raise HTTPException(400, detail="IPv6 is not supported at this time.")
except ValueError:
raise HTTPException(400, detail="%r does not appear to be a valid IP address" % ip)
if ip == "127.0.0.1":
ip = ".".join(map(str, (random.randint(0, 255) for _ in range(4))))
logging.info("looking up IP info for %s", ip)
data = await make_request(
ip,
{"User-Agent": User_Agent}
)
data = await make_request(ip)
if isinstance(data, HTTPException):
raise data
data["ip"] = ip
@ -127,13 +133,9 @@ async def lookup(ip: str = Query(...), User_Agent: str = Header("Mozilla/5.0")):
logging.info("cache hit for %s", ip)
return JSONResponse(data)
logging.info("cache expired for %s", ip)
try:
_ip = ip_address.ip_address(ip)
if isinstance(_ip, ipaddress.IPv6Address):
raise HTTPException(400, detail="IPv6 is not supported at this time.")
except ValueError:
raise HTTPException(400, detail="%r does not appear to be a valid IP address" % ip)
if ip == "127.0.0.1":
ip = ".".join(map(str, (random.randint(0, 255) for _ in range(4))))
logging.info("looking up IP info for %s", ip)
data = await make_request(
@ -153,21 +155,15 @@ async def lookup(ip: str = Query(...), User_Agent: str = Header("Mozilla/5.0")):
@app.get("/imfeelinglucky")
async def im_feeling_lucky(req: Request):
host = req.client.host
try:
_ip = ip_address.ip_address(host)
if isinstance(_ip, ipaddress.IPv6Address):
raise HTTPException(400, detail="IPv6 is not supported at this time.")
except ValueError:
raise HTTPException(400, detail="%r does not appear to be a valid IP address" % host)
data = await make_request(host, {"User-Agent": "Mozilla/5.0 Nex/19.04.2024"})
ip = req.client.host
if ip == "127.0.0.1":
ip = ".".join(map(str, (random.randint(0, 255) for _ in range(4))))
data = await make_request(ip)
if not isinstance(data, dict):
raise data
data = data.copy()
parts = list(map(int, host.split(".")))
parts = list(map(int, ip.split(".")))
n = random.randint(1, 1000)
if n in range(50, 54):
if n > 400:
@ -183,6 +179,12 @@ def get_raw(req: Request):
return PlainTextResponse(req.client.host)
@app.get("/random")
def get_random():
ip = ".".join(map(str, (random.randint(0, 255) for _ in range(4))))
return RedirectResponse(f"/lookup?ip={ip}")
@app.get("/health")
async def get_health():
detail = {"issues": []}
@ -208,3 +210,10 @@ async def get_health():
app.mount("/check", StaticFiles(directory="./cert", html=True), name="shronk-cert")
if __name__ == "__main__":
import uvicorn
logging.critical("Running in development mode! Even then, you should use the docker container!")
app.debug = True
uvicorn.run("ipserv:app", port=0, reload=True, log_level="info")