Utilise many backends
Some checks failed
Build and Publish ipserv / build_and_publish (push) Has been cancelled
Some checks failed
Build and Publish ipserv / build_and_publish (push) Has been cancelled
This commit is contained in:
parent
3ba2c19aa9
commit
5a2c2e7ffc
1 changed files with 82 additions and 0 deletions
82
ipserv.py
82
ipserv.py
|
@ -87,6 +87,7 @@ async def make_request(ip: str, headers: dict[str, str] = None) -> dict | HTTPEx
|
||||||
try:
|
try:
|
||||||
async with app.state.session.get(f"/lookup?ip={ip}", headers=headers) as response:
|
async with app.state.session.get(f"/lookup?ip={ip}", headers=headers) as response:
|
||||||
data = await response.json()
|
data = await response.json()
|
||||||
|
data["source"] = "SHRoNKNet"
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True)
|
logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True)
|
||||||
return HTTPException(500, "Failed to parse upstream response.")
|
return HTTPException(500, "Failed to parse upstream response.")
|
||||||
|
@ -96,6 +97,87 @@ async def make_request(ip: str, headers: dict[str, str] = None) -> dict | HTTPEx
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
async def _get_ipinfo_no_token(ip: str):
|
||||||
|
try:
|
||||||
|
async with app.state.session.get("https://ipinfo.io/widget/demo/" + ip) as response:
|
||||||
|
data = (await response.json())["data"]
|
||||||
|
return {
|
||||||
|
"ip": data["ip"],
|
||||||
|
"city": data["city"],
|
||||||
|
"country": data["region"],
|
||||||
|
"countryCode": data["country"],
|
||||||
|
"asn": int(data["asn"]["asn"][2:]),
|
||||||
|
"isp": data["asn"]["name"],
|
||||||
|
"source": "ipinfo",
|
||||||
|
"lat": data["loc"].split(",")[0],
|
||||||
|
"lon": data["loc"].split(",")[1],
|
||||||
|
"hostname": data["hostname"],
|
||||||
|
"timezone": data["timezone"],
|
||||||
|
"subnet": data["abuse"]["network"],
|
||||||
|
"abuse": data["abuse"]["email"]
|
||||||
|
}
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True)
|
||||||
|
return HTTPException(500, "Failed to parse upstream response.")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error("Failed to get data for %s: %s", ip, e, exc_info=True)
|
||||||
|
return HTTPException(500, "Failed to get upstream data.")
|
||||||
|
|
||||||
|
|
||||||
|
async def _get_ipinfo_token(ip: str, token: str):
|
||||||
|
try:
|
||||||
|
async with app.state.session.get("https://ipinfo.io/%s?token=%s" % (ip, token)) as response:
|
||||||
|
data = await response.json()
|
||||||
|
return {
|
||||||
|
"ip": data["ip"],
|
||||||
|
"city": data["city"],
|
||||||
|
"country": data["region"],
|
||||||
|
"countryCode": data["country"],
|
||||||
|
"asn": int(data["org"].split()[0][2:]),
|
||||||
|
"isp": data["org"].split(" ", 1)[1],
|
||||||
|
"source": "ipinfo",
|
||||||
|
"lat": data["loc"].split(",")[0],
|
||||||
|
"lon": data["loc"].split(",")[1],
|
||||||
|
"hostname": data["hostname"],
|
||||||
|
"timezone": data["timezone"],
|
||||||
|
}
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
logging.error("Failed to parse data for %s: %s", ip, e, exc_info=True)
|
||||||
|
return HTTPException(500, "Failed to parse upstream response.")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error("Failed to get data for %s: %s", ip, e, exc_info=True)
|
||||||
|
return HTTPException(500, "Failed to get upstream data.")
|
||||||
|
|
||||||
|
|
||||||
|
async def get_from_ipinfo(ip: str) -> dict | HTTPException:
|
||||||
|
if ip in app.state.cache:
|
||||||
|
data, timestamp = app.state.cache[ip]
|
||||||
|
if time.time() - timestamp < 3600:
|
||||||
|
return data
|
||||||
|
|
||||||
|
token = os.getenv("IPINFO_TOKEN")
|
||||||
|
if token:
|
||||||
|
return await _get_ipinfo_token(ip, token)
|
||||||
|
return await _get_ipinfo_no_token(ip)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_ip_information(ip: str) -> dict | HTTPException:
|
||||||
|
try:
|
||||||
|
data = await make_request(ip)
|
||||||
|
if isinstance(data, dict):
|
||||||
|
return data
|
||||||
|
except Exception as e:
|
||||||
|
logging.error("Failed to contact shronk net ip: %s", e, exc_info=True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = await get_ip_information(ip)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error("Failed to contact ipinfo: %s", e, exc_info=True)
|
||||||
|
else:
|
||||||
|
return data
|
||||||
|
return {"ip": ip}
|
||||||
|
|
||||||
|
|
||||||
@app.get("/")
|
@app.get("/")
|
||||||
async def ip(
|
async def ip(
|
||||||
request: Request,
|
request: Request,
|
||||||
|
|
Reference in a new issue