service_man/api/uow/repository.py

38 lines
1.0 KiB
Python

from abc import ABC, abstractmethod
from typing import Generic, TypeVar
from sqlalchemy import insert, select
from sqlalchemy.ext.asyncio import AsyncSession
import api.models as models
# Type variable for the ORM model class a repository is parameterized with;
# bound to ``models.Base`` so only subclasses of that base are accepted.
ModelType = TypeVar("ModelType", bound=models.Base)
class AbstractRepository(ABC):
    """Interface that every repository implementation must satisfy.

    Both operations are abstract coroutines, so this class cannot be
    instantiated until a subclass overrides each of them.
    """

    @abstractmethod
    async def add_one(self, data: dict):
        """Add a single record described by ``data``.

        Concrete repositories decide how the record is stored and what is
        returned; this default body only signals a missing override.
        """
        raise NotImplementedError

    @abstractmethod
    async def find_all(self):
        """Fetch all records.

        Concrete repositories decide what is returned; this default body
        only signals a missing override.
        """
        raise NotImplementedError
class SQLAlchemyRepository(AbstractRepository, Generic[ModelType]):
    """Repository implementation backed by a SQLAlchemy ``AsyncSession``.

    Subclasses assign ``model`` to the mapped class the repository operates
    on. The repository only executes statements on the session it was given;
    it does not commit or roll back, so transaction control stays with the
    caller.
    """

    # Mapped class used to build statements; must be assigned by subclasses.
    model: type[ModelType]

    def __init__(self, session: AsyncSession):
        """Keep the session on which every statement is executed.

        Args:
            session: Async session used by ``add_one`` and ``find_all``.
        """
        self.session = session

    async def add_one(self, data: dict) -> ModelType:
        """Insert one row and return the newly created entity.

        Args:
            data: Column names mapped to the values to insert.

        Returns:
            The inserted row, loaded as an instance of ``self.model``.

        Raises:
            sqlalchemy.exc.NoResultFound / MultipleResultsFound: propagated
                from ``scalar_one()`` if the statement does not yield
                exactly one row.
        """
        # RETURNING is required: a plain INSERT produces a result without
        # rows, so ``scalar_one()`` would raise instead of returning a value.
        # NOTE(review): this relies on the database backend supporting
        # INSERT ... RETURNING — confirm for the deployed dialect.
        stmt = insert(self.model).values(**data).returning(self.model)
        result = await self.session.execute(stmt)
        return result.scalar_one()

    async def find_all(self) -> list[ModelType]:
        """Select every row of ``self.model`` and convert each to its read model.

        Returns:
            One ``to_read_model()`` result per selected entity, in the order
            the rows were returned.
        """
        # NOTE(review): the annotation says ``list[ModelType]`` but the items
        # are whatever ``to_read_model()`` returns — confirm against the model.
        result = await self.session.execute(select(self.model))
        # ``scalars()`` yields the first column of each row, i.e. the entity.
        return [entity.to_read_model() for entity in result.scalars().all()]