from typing import Annotated from fastapi import Depends, HTTPException, Request, Response, status from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel from fastapi.security import OAuth2 from fastapi.security.utils import get_authorization_scheme_param from api.application.protocols.jwt import JwtTokenProcessor from api.domain.user.error import UserIsNotAuthorizedError from api.infrastructure.dependencies.stub import Stub class OAuth2PasswordBearerWithCookie(OAuth2): def __init__( self, tokenUrl: str, scheme_name: str | None = None, scopes: dict[str, str] | None = None, auto_error: bool = True, ): if not scopes: scopes = {} flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes}) super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error) async def __call__(self, request: Request) -> str | None: authorization: str | None = request.cookies.get("access_token") scheme, param = get_authorization_scheme_param(authorization) if authorization is None or scheme.lower() != "bearer": if self.auto_error: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated", headers={"WWW-Authenticate": "Bearer"}, ) else: return None print(param) return param oauth2_scheme = OAuth2PasswordBearerWithCookie("/auth/login") async def auth_required( request: Request, token: Annotated[ str, Depends(oauth2_scheme), ], jwt_processor: Annotated[JwtTokenProcessor, Depends(Stub(JwtTokenProcessor))], ) -> None: if token is None: raise UserIsNotAuthorizedError("Invalid authorization credentials") if jwt_processor.validate_token(token=token) is None: raise UserIsNotAuthorizedError("authorization credentials is old") request.scope["auth"] = token