Add accounts system
All checks were successful
Build and Publish / build_and_publish (push) Successful in 59s

This commit is contained in:
Nexus 2024-07-11 18:44:15 +01:00
parent 2a158b44db
commit 5a7d8b5b62
Signed by: nex
GPG key ID: 0FA334385D0B689F
4 changed files with 116 additions and 10 deletions

View file

@ -1,8 +1,6 @@
FROM python:3.11-slim
#RUN apt-get update
#RUN apt-get install -y gcc
WORKDIR /app
COPY requirements.txt /app
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ /app
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80"]
CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn_worker.UvicornWorker", "-b", "0.0.0.0:80", "--forwarded-allow-ips", "*"]

View file

@ -3,3 +3,6 @@ tortoise-orm[asyncpg,asyncmy,asyncodbc,accel]~=0.21
uvicorn[standard]~=0.30
jinja2~=3.1
requests~=2.32
argon2-cffi~=23.1
gunicorn~=22.0
uvicorn-worker~=0.2

View file

@ -1,3 +1,4 @@
import datetime
import secrets
import uuid
from tortoise.models import Model
@ -12,9 +13,34 @@ class Redirect(Model):
)
destination = fields.CharField(max_length=8192)
created_at = fields.DatetimeField(auto_now_add=True)
expires = fields.DatetimeField(null=True)
expires = fields.DatetimeField(
null=True,
default=lambda: datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=180)
)
max_visits = fields.IntField(null=True)
visits = fields.IntField(default=0)
account = fields.ForeignKeyField("models.Account", related_name="redirects", null=True)
class Account(Model):
uuid = fields.UUIDField(primary_key=True, default=uuid.uuid4)
username = fields.CharField(min_length=3, max_length=255, unique=True)
password = fields.TextField()
created_at = fields.DatetimeField(auto_now_add=True)
token = fields.TextField(default=secrets.token_urlsafe)
access_history: fields.ReverseRelation["AccessHistory"]
redirects: fields.ReverseRelation["Redirect"]
class AccessHistory(Model):
uuid = fields.UUIDField(primary_key=True, default=uuid.uuid4)
account = fields.ForeignKeyField("models.Account", related_name="access_history")
ip_address = fields.TextField()
user_agent = fields.TextField()
timestamp = fields.DatetimeField(auto_now_add=True)
RedirectPydantic = pydantic_model_creator(Redirect)
AccountPydantic = pydantic_model_creator(Account, exclude=("password", "token"))
AccessHistoryPydantic = pydantic_model_creator(AccessHistory)

View file

@ -3,10 +3,14 @@ import os
import secrets
import typing
import uuid
import collections
from argon2 import PasswordHasher
from argon2.exceptions import Argon2Error
import db
import requests
from fastapi import FastAPI, Request, HTTPException, status, Form
from fastapi import FastAPI, Request, HTTPException, status, Form, Depends
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.staticfiles import StaticFiles
from fastapi.responses import RedirectResponse, JSONResponse
from fastapi.templating import Jinja2Templates
@ -52,8 +56,27 @@ async def lifespan(_app: FastAPI):
yield
async def has_account(req: Request, credentials: HTTPBasicCredentials = Depends(HTTPBasic())) -> db.Account:
account = await db.Account.get_or_none(username=credentials.username)
if account is None:
raise HTTPException(status_code=401, detail="Invalid credentials", headers={"WWW-Authenticate": "Basic"})
ph = PasswordHasher()
try:
ph.verify(account.password, credentials.password)
except Argon2Error:
if not secrets.compare_digest(account.token, credentials.password):
raise HTTPException(status_code=401, detail="Invalid credentials", headers={"WWW-Authenticate": "Basic"})
await db.AccessHistory.create(
account=account,
ip_address=req.client.host or "0.0.0.0",
user_agent=req.headers.get("User-Agent", "Unknown"),
)
return account
app = FastAPI(lifespan=lifespan, contact=get_contact_details())
templates = Jinja2Templates(directory="templates")
pin_cache = collections.deque(maxlen=100)
@app.get("/r/{key}")
@ -109,6 +132,7 @@ async def create_redirect(
max_visits: typing.Optional[int] = Form(None),
slug_length: int = Form(2048, gt=0, lte=16389),
slug_type: str = Form("urlsafe", regex="^(urlsafe|hex|words|uuid)$"),
account: typing.Annotated[db.Account, Depends(has_account)] = None
):
parsed = urlparse(destination)
if not parsed.scheme or not parsed.netloc:
@ -142,23 +166,78 @@ async def create_redirect(
slug = ":".join(uuids)
case _:
raise HTTPException(status_code=400, detail="Invalid slug type")
slug = slug[:slug_length]
while await db.Redirect.get_or_none(slug=slug) is not None:
slug = secrets.token_urlsafe(slug_length // 2)
redirect = await db.Redirect.create(
destination=destination, expires=expires, max_visits=max_visits, slug=slug
destination=destination, expires=expires, max_visits=max_visits, slug=slug, account=account
)
response.status_code = 201
return await db.RedirectPydantic.from_tortoise_orm(redirect)
# @app.delete("/api/delete/{slug}", response_model=db.RedirectPydantic)
async def delete_redirect(slug: str, token: str):
redirect = await db.Redirect.get_or_none(slug=slug)
@app.delete("/api/delete/{slug}", response_model=db.RedirectPydantic)
async def delete_redirect(slug: str, account: typing.Annotated[db.Account, Depends(has_account)]):
redirect = await db.Redirect.get_or_none(slug=slug, account=account)
if redirect is None:
raise HTTPException(status_code=404, detail="Not Found")
resp = await db.RedirectPydantic.from_tortoise_orm(redirect)
await redirect.delete()
return await db.RedirectPydantic.from_tortoise_orm(redirect)
return resp
@app.get("/api/account", response_model=db.AccountPydantic)
async def get_account(account: typing.Annotated[db.Account, Depends(has_account)]):
return await db.AccountPydantic.from_tortoise_orm(account)
@app.get("/api/account/audit", response_model=list[db.AccessHistoryPydantic])
async def get_account_access_logs(account: typing.Annotated[db.Account, Depends(has_account)]):
logs = await db.AccessHistory.filter(account=account)
response = []
for log in logs:
response.append(await db.AccessHistoryPydantic.from_tortoise_orm(log))
return response
@app.post("/api/account", response_model=db.AccountPydantic)
async def create_account(
resp: JSONResponse,
username: str = Form(...),
password: str = Form(...),
code: int = None
):
existing = await db.Account.get_or_none(username=username)
if existing is not None:
raise HTTPException(status_code=409, detail="Account already exists")
if code is not None:
if code not in pin_cache:
raise HTTPException(status_code=403, detail="Invalid code")
pin_cache.remove(code)
acc = await db.Account.create(
username=username,
password=PasswordHasher().hash(password)
)
resp.headers["X-Access-Token"] = acc.token
return await db.AccountPydantic.from_tortoise_orm(acc)
else:
pin = secrets.randbelow(1000000)
pin_cache.append(pin)
print(f"\n\n\tPin to create account %r: %d\n\n" % (username, pin), flush=True)
return JSONResponse(
content={
"detail": "Pin sent to console. Check your logs, and re-call with ?pin=<pin> to create account."
}
)
@app.delete("/api/account", response_model=db.AccountPydantic)
async def delete_account(account: typing.Annotated[db.Account, Depends(has_account)]):
resp = await db.AccountPydantic.from_tortoise_orm(account)
await account.delete()
return resp
app.mount("/", StaticFiles(directory="static", html=True), name="static")