39 lines
1.0 KiB
Python
39 lines
1.0 KiB
Python
from abc import ABC, abstractmethod
|
|
from typing import Generic, TypeVar
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import insert, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from api.model.base import Base
|
|
|
|
# Type variable for the model class a generic repository is parameterised
# with; bound to `Base` (imported from api.model.base), so only `Base`
# subclasses are accepted by type checkers.
ModelType = TypeVar("ModelType", bound=Base)
|
|
|
|
|
|
class AbstractRepository(ABC):
    """Storage-agnostic repository interface.

    Both coroutines are abstract, so the class cannot be instantiated until
    a subclass overrides ``add_one`` and ``find_all``.
    """

    @abstractmethod
    async def add_one(self, data: dict):
        """Persist a single record described by ``data``.

        Raises:
            NotImplementedError: when the base implementation is awaited
                (for example through ``super()``).
        """
        raise NotImplementedError()

    @abstractmethod
    async def find_all(self):
        """Fetch every stored record.

        Raises:
            NotImplementedError: when the base implementation is awaited
                (for example through ``super()``).
        """
        raise NotImplementedError()
|
|
|
|
|
|
class SQLAlchemyRepository(AbstractRepository, Generic[ModelType]):
    """Repository that runs its statements through an async SQLAlchemy session.

    The class is generic over the model type; ``model`` is only annotated
    here, so a concrete subclass has to assign the model class it manages.
    """

    # Model class targeted by the INSERT/SELECT statements below. Declared as
    # an annotation only -- no value is assigned in this class.
    model: type[ModelType]

    def __init__(self, session: AsyncSession):
        """Keep a reference to the session used to execute every statement.

        Args:
            session: Async session on which statements are executed.
        """
        self.session = session

    async def add_one(self, data: dict) -> UUID:
        """Insert one row and return the ``id`` reported by ``RETURNING``.

        Args:
            data: Column values; expanded as keyword arguments into
                ``insert(...).values()``.

        Returns:
            The single scalar produced by ``RETURNING model.id``.

        Note:
            Nothing here commits the session -- presumably the caller owns
            the transaction; confirm against the call sites.
        """
        stmt = insert(self.model).values(**data).returning(self.model.id)
        result = await self.session.execute(stmt)
        # scalar_one() expects exactly one returned row and yields its
        # first column.
        return result.scalar_one()

    async def find_all(self) -> list:
        """Select every row of ``model`` and convert each to its read model.

        Returns:
            A list with the result of calling ``to_read_model()`` on each
            selected model instance (that method is defined on the model,
            outside this file).
        """
        result = await self.session.execute(select(self.model))
        # scalars() yields the first column of each row -- the model
        # instance -- replacing the manual ``row[0]`` indexing.
        return [instance.to_read_model() for instance in result.scalars().all()]
|