Give superuser account more power
All checks were successful
Build and Publish / build_and_publish (push) Successful in 29s

This commit is contained in:
Nexus 2024-07-11 18:55:42 +01:00
parent 5a7d8b5b62
commit 930bff7aa1
Signed by: nex
GPG key ID: 0FA334385D0B689F

View file

@ -53,6 +53,16 @@ async def lifespan(_app: FastAPI):
generate_schemas=True, generate_schemas=True,
add_exception_handlers=True, add_exception_handlers=True,
): ):
if not await db.Account.all().count():
generated_password = secrets.token_hex(16)
_d = await db.Account.create(
username="admin",
password=PasswordHasher().hash(generated_password),
)
print(
f"\n\n\tGenerated admin account with password: {generated_password} and token {_d.token}\n\n",
flush=True
)
yield yield
@ -76,7 +86,7 @@ async def has_account(req: Request, credentials: HTTPBasicCredentials = Depends(
app = FastAPI(lifespan=lifespan, contact=get_contact_details()) app = FastAPI(lifespan=lifespan, contact=get_contact_details())
templates = Jinja2Templates(directory="templates") templates = Jinja2Templates(directory="templates")
pin_cache = collections.deque(maxlen=100) pin_cache = []
@app.get("/r/{key}") @app.get("/r/{key}")
@ -134,6 +144,9 @@ async def create_redirect(
slug_type: str = Form("urlsafe", regex="^(urlsafe|hex|words|uuid)$"), slug_type: str = Form("urlsafe", regex="^(urlsafe|hex|words|uuid)$"),
account: typing.Annotated[db.Account, Depends(has_account)] = None account: typing.Annotated[db.Account, Depends(has_account)] = None
): ):
if os.getenv("ALLOW_ANONYMOUS", "true") is not "true":
if account is None:
raise HTTPException(status_code=401, detail="Unauthorised", headers={"WWW-Authenticate": "Basic"})
parsed = urlparse(destination) parsed = urlparse(destination)
if not parsed.scheme or not parsed.netloc: if not parsed.scheme or not parsed.netloc:
raise HTTPException(status_code=400, detail="Invalid URL") raise HTTPException(status_code=400, detail="Invalid URL")
@ -179,9 +192,11 @@ async def create_redirect(
@app.delete("/api/delete/{slug}", response_model=db.RedirectPydantic) @app.delete("/api/delete/{slug}", response_model=db.RedirectPydantic)
async def delete_redirect(slug: str, account: typing.Annotated[db.Account, Depends(has_account)]): async def delete_redirect(slug: str, account: typing.Annotated[db.Account, Depends(has_account)]):
redirect = await db.Redirect.get_or_none(slug=slug, account=account) redirect = await db.Redirect.get_or_none(slug=slug)
if redirect is None: if redirect is None:
raise HTTPException(status_code=404, detail="Not Found") raise HTTPException(status_code=404, detail="Not Found")
elif account.username != "admin" and redirect.account != account:
raise HTTPException(status_code=403, detail="You can only delete your own redirects.")
resp = await db.RedirectPydantic.from_tortoise_orm(redirect) resp = await db.RedirectPydantic.from_tortoise_orm(redirect)
await redirect.delete() await redirect.delete()
return resp return resp
@ -204,37 +219,28 @@ async def get_account_access_logs(account: typing.Annotated[db.Account, Depends(
@app.post("/api/account", response_model=db.AccountPydantic) @app.post("/api/account", response_model=db.AccountPydantic)
async def create_account( async def create_account(
resp: JSONResponse, resp: JSONResponse,
account: typing.Annotated[db.Account, Depends(has_account)],
username: str = Form(...), username: str = Form(...),
password: str = Form(...), password: str = Form(...),
code: int = None
): ):
if account.username != "admin":
raise HTTPException(status_code=403, detail="Only the superuser account can create new users.")
existing = await db.Account.get_or_none(username=username) existing = await db.Account.get_or_none(username=username)
if existing is not None: if existing is not None:
raise HTTPException(status_code=409, detail="Account already exists") raise HTTPException(status_code=409, detail="Account already exists")
acc = await db.Account.create(
if code is not None: username=username,
if code not in pin_cache: password=PasswordHasher().hash(password)
raise HTTPException(status_code=403, detail="Invalid code") )
pin_cache.remove(code) resp.headers["X-Access-Token"] = acc.token
acc = await db.Account.create( return await db.AccountPydantic.from_tortoise_orm(acc)
username=username,
password=PasswordHasher().hash(password)
)
resp.headers["X-Access-Token"] = acc.token
return await db.AccountPydantic.from_tortoise_orm(acc)
else:
pin = secrets.randbelow(1000000)
pin_cache.append(pin)
print(f"\n\n\tPin to create account %r: %d\n\n" % (username, pin), flush=True)
return JSONResponse(
content={
"detail": "Pin sent to console. Check your logs, and re-call with ?pin=<pin> to create account."
}
)
@app.delete("/api/account", response_model=db.AccountPydantic) @app.delete("/api/account", response_model=db.AccountPydantic)
async def delete_account(account: typing.Annotated[db.Account, Depends(has_account)]): async def delete_account(account: typing.Annotated[db.Account, Depends(has_account)], username: str = None):
username = username or account.username
if account.username not in [username, "admin"]:
raise HTTPException(status_code=403, detail="You can only delete your own account.")
resp = await db.AccountPydantic.from_tortoise_orm(account) resp = await db.AccountPydantic.from_tortoise_orm(account)
await account.delete() await account.delete()
return resp return resp