auth next part
This commit is contained in:
0
api/infrastructure/auth/__init__.py
Normal file
0
api/infrastructure/auth/__init__.py
Normal file
37
api/infrastructure/auth/jwt_processor.py
Normal file
37
api/infrastructure/auth/jwt_processor.py
Normal file
@@ -0,0 +1,37 @@
|
||||
from datetime import timedelta
|
||||
from uuid import UUID
|
||||
|
||||
from jose import JWTError
|
||||
from jose.jwt import decode, encode
|
||||
|
||||
from api.application.protocols.date_time import DateTimeProvider
|
||||
from api.application.protocols.jwt import JwtTokenProcessor
|
||||
from api.domain.user.model import UserId
|
||||
from api.infrastructure.auth.jwt_settings import JwtSettings
|
||||
|
||||
|
||||
class JoseJwtTokenProcessor(JwtTokenProcessor):
|
||||
def __init__(self, jwt_options: JwtSettings, date_time_provider: DateTimeProvider) -> None:
|
||||
self.jwt_options = jwt_options
|
||||
self.date_time_provider = date_time_provider
|
||||
|
||||
def generate_token(self, user_id: UserId) -> str:
|
||||
issued_at = self.date_time_provider.get_current_time()
|
||||
expiration_time = issued_at + timedelta(hours=self.jwt_options.expires_in)
|
||||
|
||||
claims = {
|
||||
"iat": issued_at,
|
||||
"exp": expiration_time,
|
||||
"sub": str(user_id.value),
|
||||
}
|
||||
|
||||
return encode(claims, self.jwt_options.secret, self.jwt_options.algorithm)
|
||||
|
||||
def validate_token(self, token: str) -> UserId | None:
|
||||
try:
|
||||
payload = decode(token, self.jwt_options.secret, [self.jwt_options.algorithm])
|
||||
|
||||
return UserId(UUID(payload["sub"]))
|
||||
|
||||
except (JWTError, ValueError, KeyError):
|
||||
return None
|
8
api/infrastructure/auth/jwt_settings.py
Normal file
8
api/infrastructure/auth/jwt_settings.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class JwtSettings:
|
||||
secret: str
|
||||
expires_in: int = field(default=2)
|
||||
algorithm: str = field(default="HS256")
|
Reference in New Issue
Block a user