41 lines
1.3 KiB
Python
41 lines
1.3 KiB
Python
from collections.abc import AsyncIterable
|
|
from typing import Annotated
|
|
|
|
from fastapi import Depends
|
|
from sqlalchemy.ext.asyncio import (AsyncEngine, AsyncSession,
|
|
async_sessionmaker, create_async_engine)
|
|
|
|
from api.application.abstractions.transaction import TransactionContextManager
|
|
from api.infrastructure.dependencies.stub import Stub
|
|
from api.infrastructure.persistence.transaction import \
|
|
SqlalchemyTransactionContextManager
|
|
from api.infrastructure.settings import Settings
|
|
|
|
|
|
def get_transaction_context(
    session: Annotated[AsyncSession, Depends(Stub(AsyncSession))],
) -> TransactionContextManager:
    """Build the transaction context manager for the injected session.

    Args:
        session: The ``AsyncSession`` supplied through the
            ``Stub(AsyncSession)`` dependency marker.

    Returns:
        A ``SqlalchemyTransactionContextManager`` constructed from
        ``session``.
    """
    transaction_context = SqlalchemyTransactionContextManager(session)
    return transaction_context
|
|
|
|
|
|
def create_engine(
    settings: Annotated[Settings, Depends(Stub(Settings))],
) -> AsyncEngine:
    """Create the async SQLAlchemy engine from application settings.

    Args:
        settings: The ``Settings`` object supplied through the
            ``Stub(Settings)`` dependency marker; only
            ``settings.db.db_url`` is read here.

    Returns:
        The ``AsyncEngine`` produced by ``create_async_engine`` for that URL.
    """
    database_url = settings.db.db_url
    return create_async_engine(database_url)
|
|
|
|
|
|
def create_session_maker(
    engine: Annotated[AsyncEngine, Depends(Stub(AsyncEngine))],
) -> async_sessionmaker[AsyncSession]:
    """Create the session factory bound to the given engine.

    Args:
        engine: The ``AsyncEngine`` supplied through the
            ``Stub(AsyncEngine)`` dependency marker.

    Returns:
        An ``async_sessionmaker`` bound to ``engine`` and configured with
        ``expire_on_commit=False``.
    """
    session_factory = async_sessionmaker(
        bind=engine,
        expire_on_commit=False,
    )
    return session_factory
|
|
|
|
|
|
async def new_session(
    session_maker: Annotated[
        async_sessionmaker[AsyncSession],
        Depends(Stub(async_sessionmaker[AsyncSession])),
    ],
) -> AsyncIterable[AsyncSession]:
    """Yield one session created by ``session_maker``.

    The session is entered with ``async with``, so its async context
    manager is exited once the generator is resumed or closed after the
    ``yield``.

    Args:
        session_maker: The session factory supplied through the
            ``Stub(async_sessionmaker[AsyncSession])`` dependency marker.

    Yields:
        The session obtained by entering ``session_maker()``.
    """
    session_scope = session_maker()
    async with session_scope as db_session:
        yield db_session
|