44 lines
1.3 KiB
Python
44 lines
1.3 KiB
Python
from collections.abc import AsyncIterable
|
|
from typing import Annotated
|
|
|
|
from fastapi import Depends
|
|
from sqlalchemy.ext.asyncio import (
|
|
AsyncEngine,
|
|
AsyncSession,
|
|
async_sessionmaker,
|
|
create_async_engine,
|
|
)
|
|
|
|
from api.application.abstractions.transaction import TransactionContextManager
|
|
from api.infrastructure.dependencies.stub import Stub
|
|
from api.infrastructure.persistence.transaction import (
|
|
SqlalchemyTransactionContextManager,
|
|
)
|
|
from api.infrastructure.settings import Settings
|
|
|
|
|
|
def get_transaction_context(session: Annotated[AsyncSession, Depends(Stub(AsyncSession))]) -> TransactionContextManager:
|
|
return SqlalchemyTransactionContextManager(session)
|
|
|
|
|
|
def create_engine(
|
|
settings: Annotated[Settings, Depends(Stub(Settings))],
|
|
) -> AsyncEngine:
|
|
return create_async_engine(settings.db.db_url)
|
|
|
|
|
|
def create_session_maker(
|
|
engine: Annotated[AsyncEngine, Depends(Stub(AsyncEngine))],
|
|
) -> async_sessionmaker[AsyncSession]:
|
|
return async_sessionmaker(engine, expire_on_commit=False)
|
|
|
|
|
|
async def new_session(
|
|
session_maker: Annotated[
|
|
async_sessionmaker[AsyncSession],
|
|
Depends(Stub(async_sessionmaker[AsyncSession])),
|
|
],
|
|
) -> AsyncIterable[AsyncSession]:
|
|
async with session_maker() as session:
|
|
yield session
|