add company domain, models, route

main
Сергей Ванюшкин 2024-04-08 14:32:50 +00:00
parent 0f5a98b273
commit efdacfa0ce
12 changed files with 131 additions and 6 deletions

View File

@ -1,9 +1,11 @@
from fastapi import FastAPI
from api.presentation.routers import auth_router, healthcheck_router, user_router
from api.presentation.routers import (auth_router, company_router,
healthcheck_router, user_router)
def init_routers(app: FastAPI) -> None:
app.include_router(user_router)
app.include_router(auth_router)
app.include_router(company_router)
app.include_router(healthcheck_router)

View File

@ -0,0 +1,6 @@
from dataclasses import dataclass
@dataclass(frozen=True)
class CompanyByOwnerEmail:
email: str

View File

@ -0,0 +1,7 @@
from dataclasses import dataclass
@dataclass(frozen=True)
class CompanyBaseResponse:
name: str
email: str

View File

View File

@ -0,0 +1,10 @@
from api.domain import DomainError
class CompanyNotFoundError(DomainError): ...
class CompanyAlreadyExistsError(DomainError): ...
class UserAccessError(DomainError): ...

View File

@ -0,0 +1,57 @@
import re
from dataclasses import dataclass
from uuid import UUID, uuid4
from api.domain import DomainValidationError
from api.domain.entity import DomainEntity
from api.domain.user.model import User, UserId
from api.domain.value_obj import DomainValueObject
@dataclass(frozen=True)
class CompanyEmail(DomainValueObject):
value: str
def __post_init__(self) -> None:
pattern = r"^[\w\.-]+@[a-zA-Z\d\.-]+\.[a-zA-Z]{2,}$"
if not re.match(pattern, self.value):
raise DomainValidationError(
"Invalid email format. Email must be in the format 'example@example.com'."
)
@dataclass(frozen=True)
class CompanyName(DomainValueObject):
value: str
def __post_init__(self) -> None:
if len(self.value) < 1:
raise DomainValidationError("First name must be at least 1 character long.")
if len(self.value) > 100:
raise DomainValidationError(
"First name must be at most 100 characters long."
)
if not self.value.isalpha():
raise DomainValidationError("First name must only contain letters.")
@dataclass(frozen=True)
class CompanyId(DomainValueObject):
value: UUID
@dataclass
class Company(DomainEntity[CompanyId]):
name: CompanyName
email: CompanyEmail
owner: User
@staticmethod
def create(name: str, email: str, owner: User) -> "Company":
return Company(
id=CompanyId(uuid4()),
name=CompanyName(name),
email=CompanyEmail(email),
owner=owner,
)

View File

@ -0,0 +1,15 @@
from typing import Protocol
from api.domain.company.model import Company
from api.domain.user.model import User
class CompanyRepository(Protocol):
async def get_company(self, filter: dict) -> User | None:
raise NotImplementedError
async def create_company(self, company: Company) -> None:
raise NotImplementedError
async def get_workes_list(self) -> list[User] | None:
raise NotImplementedError

View File

@ -6,12 +6,15 @@ from jose.jwt import decode, encode
from api.application.protocols.date_time import DateTimeProvider
from api.application.protocols.jwt import JwtTokenProcessor
from api.domain.user.error import UserInvalidCredentialsError
from api.domain.user.model import UserId
from api.infrastructure.auth.jwt_settings import JwtSettings
class JoseJwtTokenProcessor(JwtTokenProcessor):
def __init__(self, jwt_options: JwtSettings, date_time_provider: DateTimeProvider) -> None:
def __init__(
self, jwt_options: JwtSettings, date_time_provider: DateTimeProvider
) -> None:
self.jwt_options = jwt_options
self.date_time_provider = date_time_provider
@ -29,11 +32,15 @@ class JoseJwtTokenProcessor(JwtTokenProcessor):
def validate_token(self, token: str) -> UserId | None:
try:
payload = decode(token, self.jwt_options.secret, [self.jwt_options.algorithm])
payload = decode(
token, self.jwt_options.secret, [self.jwt_options.algorithm]
)
return UserId(UUID(payload["sub"]))
except (JWTError, ValueError, KeyError):
return None
def refresh_token(self, token: str) -> str:
return ""
user = self.validate_token(token)
if user is None:
raise UserInvalidCredentialsError("invalid token")
return self.generate_token(user)

View File

@ -36,7 +36,6 @@ class OAuth2PasswordBearerWithCookie(OAuth2):
)
else:
return None
print(param)
return param

View File

@ -1,4 +1,5 @@
from .auth import auth_router
from .company import company_router
from .ping import healthcheck_router
from .user import user_router
@ -6,4 +7,5 @@ __all__ = (
"healthcheck_router",
"auth_router",
"user_router",
"company_router",
)

View File

@ -0,0 +1,20 @@
from fastapi import APIRouter, Depends, Request
from api.application.contracts.company.company_response import \
CompanyBaseResponse
from api.presentation.auth.fasapi_auth import auth_required
company_router = APIRouter(prefix="/company", tags=["Company"])
@company_router.get(
"/",
response_model=None,
dependencies=[Depends(auth_required)],
)
async def get_company(request: Request) -> CompanyBaseResponse:
print(request.scope["auth"])
return CompanyBaseResponse(
name="some",
email="some",
)