add some in company and linters

main
Сергей Ванюшкин 2024-04-08 23:55:30 +03:00
parent efdacfa0ce
commit 361378929f
13 changed files with 87 additions and 36 deletions

View File

@ -1,6 +1,6 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.3.0
rev: v4.6.0
hooks:
- id: trailing-whitespace # убирает лишние пробелы
- id: check-added-large-files # проверяет тяжелые файлы на изменения
@ -12,7 +12,7 @@ repos:
# Отсортировывает импорты в проекте
- repo: https://github.com/pycqa/isort
rev: 5.12.0
rev: 5.13.2
hooks:
- id: isort
exclude: __init__.py
@ -20,29 +20,30 @@ repos:
# Обновляет синтаксис Python кода в соответствии с последними версиями
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.1
rev: v3.15.2
hooks:
- id: pyupgrade
args: [ --py310-plus ]
# Форматирует код под PEP8
- repo: https://github.com/pre-commit/mirrors-autopep8
rev: v2.0.1
rev: v2.0.4
hooks:
- id: autopep8
args: [ "-i", "--in-place", "--max-line-length=120" ]
# Сканер стилистических ошибок, нарушающие договоренности PEP8
- repo: https://github.com/PyCQA/flake8
rev: 6.0.0
rev: 7.0.0
hooks:
- id: flake8
additional_dependencies: [flake8-print, pep8-naming, flake8-bugbear]
exclude: __init__.py
args: [ "--ignore=E501,F821", "--max-line-length=120" ]
# Форматирует код под PEP8 c помощью black
- repo: https://github.com/psf/black
rev: 23.1.0
rev: 24.3.0
hooks:
- id: black
language_version: python3.10
@ -50,7 +51,7 @@ repos:
# Проверка статических типов с помощью mypy
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.991
rev: v1.9.0
hooks:
- id: mypy
exclude: 'migrations'

View File

@ -22,7 +22,6 @@ from .routers import init_routers
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
print("init lifespan")
app.dependency_overrides[DBSettings] = get_db_settings
app.dependency_overrides[JwtSettings] = get_jwt_settings
app.dependency_overrides[Settings] = app_settings

View File

@ -1,7 +1,11 @@
from fastapi import FastAPI
from api.presentation.routers import (auth_router, company_router,
healthcheck_router, user_router)
from api.presentation.routers import (
auth_router,
company_router,
healthcheck_router,
user_router,
)
def init_routers(app: FastAPI) -> None:

View File

@ -0,0 +1,12 @@
from api.application.contracts.company.company_request import CompanyByOwnerEmail
from api.application.contracts.company.company_response import CompanyBaseResponse
from api.domain.company.repository import CompanyRepository
class GetCompaniesByOwnerEmail:
def __init__(self, company_repository: CompanyRepository) -> None:
self.company_repository = company_repository
async def execute(self, request: CompanyByOwnerEmail) -> list[CompanyBaseResponse]:
companies = await self.company_repository.get_companies_by_owner_email(filter={"email": request.email})
return [CompanyBaseResponse(name=comp.name.value, email=comp.email.value) for comp in companies]

View File

@ -4,7 +4,7 @@ from uuid import UUID, uuid4
from api.domain import DomainValidationError
from api.domain.entity import DomainEntity
from api.domain.user.model import User, UserId
from api.domain.user.model import User
from api.domain.value_obj import DomainValueObject
@ -16,9 +16,7 @@ class CompanyEmail(DomainValueObject):
pattern = r"^[\w\.-]+@[a-zA-Z\d\.-]+\.[a-zA-Z]{2,}$"
if not re.match(pattern, self.value):
raise DomainValidationError(
"Invalid email format. Email must be in the format 'example@example.com'."
)
raise DomainValidationError("Invalid email format. Email must be in the format 'example@example.com'.")
@dataclass(frozen=True)
@ -29,9 +27,7 @@ class CompanyName(DomainValueObject):
if len(self.value) < 1:
raise DomainValidationError("First name must be at least 1 character long.")
if len(self.value) > 100:
raise DomainValidationError(
"First name must be at most 100 characters long."
)
raise DomainValidationError("First name must be at most 100 characters long.")
if not self.value.isalpha():
raise DomainValidationError("First name must only contain letters.")

View File

@ -5,11 +5,11 @@ from api.domain.user.model import User
class CompanyRepository(Protocol):
async def get_company(self, filter: dict) -> User | None:
async def get_companies_by_owner_email(self, filter: dict) -> list[Company]:
raise NotImplementedError
async def create_company(self, company: Company) -> None:
raise NotImplementedError
async def get_workes_list(self) -> list[User] | None:
async def get_workes_list(self) -> list[User]:
raise NotImplementedError

View File

@ -2,20 +2,20 @@ from api.domain import DomainError
class UserNotFoundError(DomainError):
...
pass
class UserValidationError(DomainError):
...
pass
class UserInvalidCredentialsError(DomainError):
...
pass
class UserAlreadyExistsError(DomainError):
...
pass
class UserIsNotAuthorizedError(DomainError):
...
pass

View File

@ -12,9 +12,7 @@ from api.infrastructure.auth.jwt_settings import JwtSettings
class JoseJwtTokenProcessor(JwtTokenProcessor):
def __init__(
self, jwt_options: JwtSettings, date_time_provider: DateTimeProvider
) -> None:
def __init__(self, jwt_options: JwtSettings, date_time_provider: DateTimeProvider) -> None:
self.jwt_options = jwt_options
self.date_time_provider = date_time_provider
@ -32,9 +30,7 @@ class JoseJwtTokenProcessor(JwtTokenProcessor):
def validate_token(self, token: str) -> UserId | None:
try:
payload = decode(
token, self.jwt_options.secret, [self.jwt_options.algorithm]
)
payload = decode(token, self.jwt_options.secret, [self.jwt_options.algorithm])
return UserId(UUID(payload["sub"]))
except (JWTError, ValueError, KeyError):
return None

View File

@ -2,4 +2,4 @@ from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
...
pass

View File

@ -0,0 +1,45 @@
# from sqlalchemy import text
# from sqlalchemy.ext.asyncio import AsyncSession
#
# from api.domain.company import CompanyRepository, company
# from api.domain.user.model import UserEmail, UserFirstName, UserId
#
#
# class SqlAlchemyUserRepository(UserRepository):
# def __init__(self, session: AsyncSession) -> None:
# self.session = session
#
# async def create_user(self, user: User) -> None:
# stmt = text(
# """INSERT INTO users (id, name, email, hashed_password)
# VALUES(:id, :name, :email, :hashed_password)
# """
# )
# await self.session.execute(
# stmt,
# {
# "id": str(user.id.value),
# "name": user.name.value,
# "email": user.email.value,
# "hashed_password": user.hashed_password,
# },
# )
#
# async def get_user(self, filter: dict) -> User | None:
# stmt = text("""SELECT * FROM users WHERE email = :val""")
# result = await self.session.execute(stmt, {"val": filter["email"]})
#
# result = result.mappings().one_or_none()
#
# if result is None:
# return None
#
# return User(
# id=UserId(result.id),
# name=UserFirstName(result.name),
# email=UserEmail(result.email),
# hashed_password=result.hashed_password,
# )
#
# async def get_users(self) -> list[User]:
# return []

View File

@ -1,6 +1,6 @@
from typing import Annotated
from fastapi import Depends, HTTPException, Request, Response, status
from fastapi import Depends, HTTPException, Request, status
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security import OAuth2
from fastapi.security.utils import get_authorization_scheme_param
@ -13,7 +13,7 @@ from api.infrastructure.dependencies.stub import Stub
class OAuth2PasswordBearerWithCookie(OAuth2):
def __init__(
self,
tokenUrl: str,
tokenUrl: str, # noqa
scheme_name: str | None = None,
scopes: dict[str, str] | None = None,
auto_error: bool = True,

View File

@ -1,7 +1,6 @@
from fastapi import APIRouter, Depends, Request
from api.application.contracts.company.company_response import \
CompanyBaseResponse
from api.application.contracts.company.company_response import CompanyBaseResponse
from api.presentation.auth.fasapi_auth import auth_required
company_router = APIRouter(prefix="/company", tags=["Company"])
@ -13,7 +12,6 @@ company_router = APIRouter(prefix="/company", tags=["Company"])
dependencies=[Depends(auth_required)],
)
async def get_company(request: Request) -> CompanyBaseResponse:
print(request.scope["auth"])
return CompanyBaseResponse(
name="some",
email="some",