48 lines
1.5 KiB
Python
48 lines
1.5 KiB
Python
|
from datetime import datetime
|
||
|
|
||
|
from sqlalchemy import and_, desc, func, select
|
||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
|
||
|
from db.models import Metrics, MetricsQuery, Query, Url
|
||
|
|
||
|
###########################################################
|
||
|
# BLOCK FOR INTERACTION WITH DATABASE IN BUSINESS CONTEXT #
|
||
|
###########################################################
|
||
|
|
||
|
|
||
|
class UrlDAL:
|
||
|
"""Data Access Layer for operating user info"""
|
||
|
|
||
|
def __init__(self, db_session: AsyncSession):
|
||
|
self.db_session = db_session
|
||
|
|
||
|
async def add_new_urls(self, add_values: list[Url]):
|
||
|
for new_url in add_values:
|
||
|
query = select(Url).where(Url.url == new_url.url)
|
||
|
res = await self.db_session.execute(query)
|
||
|
if res.scalar_one_or_none() is None:
|
||
|
self.db_session.add(new_url)
|
||
|
else:
|
||
|
# If nessessary we can update data here
|
||
|
pass
|
||
|
|
||
|
await self.db_session.flush()
|
||
|
return
|
||
|
|
||
|
|
||
|
class MetricDAL:
|
||
|
"""Data Access Layer for operating user info"""
|
||
|
|
||
|
def __init__(self, db_session: AsyncSession):
|
||
|
self.db_session = db_session
|
||
|
|
||
|
async def get_last_metrics_date(self) -> datetime | None:
|
||
|
query = (
|
||
|
select(Metrics.date).group_by(Metrics.date).order_by(Metrics.date.desc())
|
||
|
)
|
||
|
result = await self.db_session.execute(query)
|
||
|
if result is None:
|
||
|
return None
|
||
|
last_date = result.scalars().first()
|
||
|
return last_date
|