diff --git a/src/main.py b/src/main.py index b05f97d..4581a03 100644 --- a/src/main.py +++ b/src/main.py @@ -53,6 +53,16 @@ async def lifespan(_app: FastAPI): generate_schemas=True, add_exception_handlers=True, ): + if not await db.Account.all().count(): + generated_password = secrets.token_hex(16) + _d = await db.Account.create( + username="admin", + password=PasswordHasher().hash(generated_password), + ) + print( + f"\n\n\tGenerated admin account with password: {generated_password} and token {_d.token}\n\n", + flush=True + ) yield @@ -76,7 +86,7 @@ async def has_account(req: Request, credentials: HTTPBasicCredentials = Depends( app = FastAPI(lifespan=lifespan, contact=get_contact_details()) templates = Jinja2Templates(directory="templates") -pin_cache = collections.deque(maxlen=100) +pin_cache = [] @app.get("/r/{key}") @@ -134,6 +144,9 @@ async def create_redirect( slug_type: str = Form("urlsafe", regex="^(urlsafe|hex|words|uuid)$"), account: typing.Annotated[db.Account, Depends(has_account)] = None ): + if os.getenv("ALLOW_ANONYMOUS", "true") is not "true": + if account is None: + raise HTTPException(status_code=401, detail="Unauthorised", headers={"WWW-Authenticate": "Basic"}) parsed = urlparse(destination) if not parsed.scheme or not parsed.netloc: raise HTTPException(status_code=400, detail="Invalid URL") @@ -179,9 +192,11 @@ async def create_redirect( @app.delete("/api/delete/{slug}", response_model=db.RedirectPydantic) async def delete_redirect(slug: str, account: typing.Annotated[db.Account, Depends(has_account)]): - redirect = await db.Redirect.get_or_none(slug=slug, account=account) + redirect = await db.Redirect.get_or_none(slug=slug) if redirect is None: raise HTTPException(status_code=404, detail="Not Found") + elif account.username != "admin" and redirect.account != account: + raise HTTPException(status_code=403, detail="You can only delete your own redirects.") resp = await db.RedirectPydantic.from_tortoise_orm(redirect) await redirect.delete() return resp @@ -204,37 +219,28 @@ async def get_account_access_logs(account: typing.Annotated[db.Account, Depends( @app.post("/api/account", response_model=db.AccountPydantic) async def create_account( resp: JSONResponse, + account: typing.Annotated[db.Account, Depends(has_account)], username: str = Form(...), password: str = Form(...), - code: int = None ): + if account.username != "admin": + raise HTTPException(status_code=403, detail="Only the superuser account can create new users.") existing = await db.Account.get_or_none(username=username) if existing is not None: raise HTTPException(status_code=409, detail="Account already exists") - - if code is not None: - if code not in pin_cache: - raise HTTPException(status_code=403, detail="Invalid code") - pin_cache.remove(code) - acc = await db.Account.create( - username=username, - password=PasswordHasher().hash(password) - ) - resp.headers["X-Access-Token"] = acc.token - return await db.AccountPydantic.from_tortoise_orm(acc) - else: - pin = secrets.randbelow(1000000) - pin_cache.append(pin) - print(f"\n\n\tPin to create account %r: %d\n\n" % (username, pin), flush=True) - return JSONResponse( - content={ - "detail": "Pin sent to console. Check your logs, and re-call with ?pin= to create account." - } - ) + acc = await db.Account.create( + username=username, + password=PasswordHasher().hash(password) + ) + resp.headers["X-Access-Token"] = acc.token + return await db.AccountPydantic.from_tortoise_orm(acc) @app.delete("/api/account", response_model=db.AccountPydantic) -async def delete_account(account: typing.Annotated[db.Account, Depends(has_account)]): +async def delete_account(account: typing.Annotated[db.Account, Depends(has_account)], username: str = None): + username = username or account.username + if account.username not in [username, "admin"]: + raise HTTPException(status_code=403, detail="You can only delete your own account.") resp = await db.AccountPydantic.from_tortoise_orm(account) await account.delete() return resp