service_man/api/infrastructure/persistence/repositories/company_repository.py

51 lines
1.6 KiB
Python

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from api.domain.company.model import (Company, CompanyAddress, CompanyEmail,
CompanyId, CompanyName)
from api.domain.company.repository import CompanyRepository
from api.domain.user.model import UserId
class SqlAlchemyCompanyRepository(CompanyRepository):
def __init__(self, session: AsyncSession) -> None:
self.session = session
async def create_company(self, company: Company) -> None:
stmt = text(
"""INSERT INTO company (id, name, email, address, owner_id)
VALUES(:id, :name, :email, :address, :owner_id)
"""
)
await self.session.execute(
stmt,
{
"id": str(company.id.value),
"name": company.name.value,
"email": company.email.value,
"address": company.address.value,
"owner_id": str(company.owner_id.value),
},
)
async def get_companies_by_owner_email(self, filter: dict) -> list[Company]:
stmt = text("""SELECT * FROM companies WHERE email = :val""")
result = await self.session.execute(stmt, {"val": filter["email"]})
result = result.mappings().all()
return [
Company(
id=CompanyId(c.id),
name=CompanyName(c.name),
email=CompanyEmail(c.email),
address=CompanyAddress(c.address),
owner_id=UserId(c.owner_id),
)
for c in result
]
# async def get_users(self) -> list[User]:
# return []