From 5a2c2e7ffc70b91393b2a0f305529a9b54afc6e1 Mon Sep 17 00:00:00 2001 From: nexy7574 Date: Sun, 28 Apr 2024 01:31:20 +0100 Subject: [PATCH] Utilise many backends --- ipserv.py | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/ipserv.py b/ipserv.py index b93a76a..7178571 100644 --- a/ipserv.py +++ b/ipserv.py @@ -87,6 +87,7 @@ async def make_request(ip: str, headers: dict[str, str] = None) -> dict | HTTPEx try: async with app.state.session.get(f"/lookup?ip={ip}", headers=headers) as response: data = await response.json() + data["source"] = "SHRoNKNet" except json.JSONDecodeError as e: logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True) return HTTPException(500, "Failed to parse upstream response.") @@ -96,6 +97,87 @@ async def make_request(ip: str, headers: dict[str, str] = None) -> dict | HTTPEx return data +async def _get_ipinfo_no_token(ip: str): + try: + async with app.state.session.get("https://ipinfo.io/widget/demo/" + ip) as response: + data = (await response.json())["data"] + return { + "ip": data["ip"], + "city": data["city"], + "country": data["region"], + "countryCode": data["country"], + "asn": int(data["asn"]["asn"][2:]), + "isp": data["asn"]["name"], + "source": "ipinfo", + "lat": data["loc"].split(",")[0], + "lon": data["loc"].split(",")[1], + "hostname": data["hostname"], + "timezone": data["timezone"], + "subnet": data["abuse"]["network"], + "abuse": data["abuse"]["email"] + } + except json.JSONDecodeError as e: + logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True) + return HTTPException(500, "Failed to parse upstream response.") + except Exception as e: + logging.error("Failed to get data for %s: %s", ip, e, exc_info=True) + return HTTPException(500, "Failed to get upstream data.") + + +async def _get_ipinfo_token(ip: str, token: str): + try: + async with app.state.session.get("https://ipinfo.io/%s?token=%s" % (ip, token)) as response: + data = await response.json() + return { + "ip": data["ip"], + "city": data["city"], + "country": data["region"], + "countryCode": data["country"], + "asn": int(data["org"].split()[0][2:]), + "isp": data["org"].split(" ", 1)[1], + "source": "ipinfo", + "lat": data["loc"].split(",")[0], + "lon": data["loc"].split(",")[1], + "hostname": data["hostname"], + "timezone": data["timezone"], + } + except json.JSONDecodeError as e: + logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True) + return HTTPException(500, "Failed to parse upstream response.") + except Exception as e: + logging.error("Failed to get data for %s: %s", ip, e, exc_info=True) + return HTTPException(500, "Failed to get upstream data.") + + +async def get_from_ipinfo(ip: str) -> dict | HTTPException: + if ip in app.state.cache: + data, timestamp = app.state.cache[ip] + if time.time() - timestamp < 3600: + return data + + token = os.getenv("IPINFO_TOKEN") + if token: + return await _get_ipinfo_token(ip, token) + return await _get_ipinfo_no_token(ip) + + +async def get_ip_information(ip: str) -> dict | HTTPException: + try: + data = await make_request(ip) + if isinstance(data, dict): + return data + except Exception as e: + logging.error("Failed to contact shronk net ip: %s", e, exc_info=True) + + try: + data = await get_ip_information(ip) + except Exception as e: + logging.error("Failed to contact ipinfo: %s", e, exc_info=True) + else: + return data + return {"ip": ip} + + @app.get("/") async def ip( request: Request,