61 lines
2.0 KiB
Python
61 lines
2.0 KiB
Python
|
from typing import Annotated
|
||
|
|
||
|
from fastapi import Depends, HTTPException, Request, Response, status
|
||
|
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
|
||
|
from fastapi.security import OAuth2
|
||
|
from fastapi.security.utils import get_authorization_scheme_param
|
||
|
|
||
|
from api.application.protocols.jwt import JwtTokenProcessor
|
||
|
from api.domain.user.error import UserIsNotAuthorizedError
|
||
|
from api.infrastructure.dependencies.stub import Stub
|
||
|
|
||
|
|
||
|
class OAuth2PasswordBearerWithCookie(OAuth2):
    """OAuth2 password-flow security scheme that reads its token from a cookie.

    The credentials are taken from the ``access_token`` cookie of the incoming
    request. The cookie value must use the ``bearer`` scheme
    (``"Bearer <token>"``, compared case-insensitively).
    """

    def __init__(
        self,
        tokenUrl: str,
        scheme_name: str | None = None,
        scopes: dict[str, str] | None = None,
        auto_error: bool = True,
    ):
        """Configure the password flow.

        Args:
            tokenUrl: URL placed in the password flow description as the
                token endpoint.
            scheme_name: Optional scheme name forwarded to ``OAuth2``.
            scopes: Optional mapping of scopes for the password flow; a falsy
                value is replaced by an empty dict.
            auto_error: When true, a missing or non-bearer cookie raises an
                ``HTTPException``; when false, ``__call__`` returns ``None``.
        """
        if not scopes:
            scopes = {}
        flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes})
        super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)

    async def __call__(self, request: Request) -> str | None:
        """Extract the bearer token from the ``access_token`` cookie.

        Args:
            request: The incoming request whose cookies are inspected.

        Returns:
            The credentials part of the cookie value, or ``None`` when the
            cookie is absent or not a bearer credential and ``auto_error`` is
            disabled.

        Raises:
            HTTPException: 401 when the cookie is absent or not a bearer
                credential and ``auto_error`` is enabled.
        """
        authorization: str | None = request.cookies.get("access_token")

        scheme, param = get_authorization_scheme_param(authorization)
        if authorization is None or scheme.lower() != "bearer":
            if self.auto_error:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Not authenticated",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            return None

        # The token is a credential: it must never be echoed to stdout or logs.
        return param
|
||
|
|
||
|
|
||
|
# Module-level scheme instance, injected below via ``Depends(oauth2_scheme)``.
oauth2_scheme = OAuth2PasswordBearerWithCookie(tokenUrl="/auth/login")
|
||
|
|
||
|
|
||
|
async def auth_required(
    request: Request,
    token: Annotated[str, Depends(oauth2_scheme)],
    jwt_processor: Annotated[JwtTokenProcessor, Depends(Stub(JwtTokenProcessor))],
) -> None:
    """Dependency that only lets requests with a valid access token through.

    Args:
        request: The incoming request; its ASGI scope receives the token.
        token: Value resolved by ``oauth2_scheme`` for this request.
        jwt_processor: Processor used to validate the token.

    Raises:
        UserIsNotAuthorizedError: When no token was resolved, or when
            ``jwt_processor.validate_token`` returns ``None`` for it.
    """
    if token is None:
        raise UserIsNotAuthorizedError("Invalid authorization credentials")

    # NOTE(review): a ``None`` result presumably means an expired or otherwise
    # invalid token -- confirm against the JwtTokenProcessor implementation.
    validation_result = jwt_processor.validate_token(token=token)
    if validation_result is None:
        raise UserIsNotAuthorizedError("authorization credentials is old")

    # Expose the raw token to downstream code through the request scope.
    request.scope["auth"] = token
|