From 8ccc0f0ce0298e5243981d3c796ff3404c70303e Mon Sep 17 00:00:00 2001
From: Sergey Vanyushkin
Date: Fri, 5 Jul 2024 00:24:49 +0300
Subject: [PATCH] testcase stage2

---
 api/actions/metrics_url.py |  15 ++++
 db/dals.py                 |  47 +++++++++++
 services/load_all_urls.py  | 161 +++++++++++++++++++++++++++++++++++++
 3 files changed, 223 insertions(+)
 create mode 100644 api/actions/metrics_url.py
 create mode 100644 db/dals.py
 create mode 100644 services/load_all_urls.py

diff --git a/api/actions/metrics_url.py b/api/actions/metrics_url.py
new file mode 100644
index 0000000..c8d8af1
--- /dev/null
+++ b/api/actions/metrics_url.py
@@ -0,0 +1,15 @@
+from db.dals import MetricDAL
+
+
+async def _add_new_metrics(urls, session):
+    async with session() as s:
+        order_dal = MetricDAL(s)
+        order_id = await order_dal.add_new_metrics(urls)  # NOTE(review): MetricDAL in this patch defines no add_new_metrics — confirm it exists elsewhere
+        return order_id
+
+
+async def _get_metrics_last_date(session):
+    async with session() as s:
+        order_dal = MetricDAL(s)
+        order_id = await order_dal.get_last_metrics_date()
+        return order_id
diff --git a/db/dals.py b/db/dals.py
new file mode 100644
index 0000000..919a325
--- /dev/null
+++ b/db/dals.py
@@ -0,0 +1,47 @@
+from datetime import datetime
+
+from sqlalchemy import and_, desc, func, select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from db.models import Metrics, MetricsQuery, Query, Url
+
+###########################################################
+# BLOCK FOR INTERACTION WITH DATABASE IN BUSINESS CONTEXT #
+###########################################################
+
+
+class UrlDAL:
+    """Data Access Layer for Url records: inserts URLs that are not yet stored."""
+
+    def __init__(self, db_session: AsyncSession):
+        self.db_session = db_session
+
+    async def add_new_urls(self, add_values: list[Url]):
+        for new_url in add_values:
+            query = select(Url).where(Url.url == new_url.url)
+            res = await self.db_session.execute(query)
+            if res.scalar_one_or_none() is None:
+                self.db_session.add(new_url)
+            else:
+                # If necessary, already-stored URLs could be updated here
+                pass
+
+        await self.db_session.flush()
+        return
+
+
+class MetricDAL:
+    """Data Access Layer for Metrics records."""
+
+    def __init__(self, db_session: AsyncSession):
+        self.db_session = db_session
+
+    async def get_last_metrics_date(self) -> datetime | None:
+        query = (
+            select(Metrics.date).group_by(Metrics.date).order_by(Metrics.date.desc())
+        )
+        result = await self.db_session.execute(query)
+        if result is None:
+            return None
+        last_date = result.scalars().first()
+        return last_date
diff --git a/services/load_all_urls.py b/services/load_all_urls.py
new file mode 100644
index 0000000..c28ef20
--- /dev/null
+++ b/services/load_all_urls.py
@@ -0,0 +1,161 @@
+import asyncio
+import time
+from datetime import datetime
+from typing import Any
+
+import config
+import requests
+
+from api.actions.metrics_url import _add_new_metrics, _get_metrics_last_date
+from api.actions.urls import _add_new_urls
+from db.models import Metrics, Url
+from db.session import async_session
+
+ACCESS_TOKEN = f"{config.ACCESS_TOKEN}"
+USER_ID = f"{config.USER_ID}"
+HOST_ID = f"{config.HOST_ID}"
+
+date_format = "%Y-%m-%d"
+# Build the request URL for the search-query analytics endpoint
+URL = f"https://api.webmaster.yandex.net/v4/user/{USER_ID}/hosts/{HOST_ID}/query-analytics/list"
+
+
+def clean_dataframe(data: dict[str, Any], from_date: datetime):
+    new_data = {}
+    new_data["text_indicator_to_statistics"] = []
+
+    for query in data["text_indicator_to_statistics"]:
+        tmp = {"text_indicator": query["text_indicator"], "statistics": []}
+        for el in query["statistics"]:
+            if datetime.strptime(el["date"], date_format) > from_date:
+                tmp["statistics"].append(el)
+        if len(tmp["statistics"]) != 0:
+            new_data["text_indicator_to_statistics"].append(tmp)
+    return new_data
+
+
+async def add_data(data):
+    for query in data["text_indicator_to_statistics"]:
+        query_name = query["text_indicator"]["value"]
+        new_url = [Url(url=query_name)]
+        metrics = []
+        date = query["statistics"][0]["date"]
+
+        data_add = {
+            "date": date,
+            "ctr": 0,
+            "position": 0,
+            "impression": 0,
+            "demand": 0,
+            "clicks": 0,
+        }
+        for el in query["statistics"]:  # NOTE(review): the final data_add bucket is never appended to metrics after this loop — confirm intended
+
+            if date != el["date"]:
+                metrics.append(
+                    Metrics(
+                        url=query_name,
+                        date=datetime.strptime(date, date_format),
+                        ctr=data_add["ctr"],
+                        position=data_add["position"],
+                        impression=data_add["impression"],
+                        demand=data_add["demand"],
+                        clicks=data_add["clicks"],
+                    )
+                )
+                date = el["date"]
+                data_add = {
+                    "date": date,
+                    "ctr": 0,
+                    "position": 0,
+                    "impression": 0,
+                    "demand": 0,
+                    "clicks": 0,
+                }
+
+            field = el["field"]
+            if field == "IMPRESSIONS":
+                data_add["impression"] = el["value"]
+            elif field == "CLICKS":
+                data_add["clicks"] = el["value"]
+            elif field == "DEMAND":
+                data_add["demand"] = el["value"]
+            elif field == "CTR":
+                data_add["ctr"] = el["value"]
+            elif field == "POSITION":
+                data_add["position"] = el["value"]
+        await _add_new_urls(new_url, async_session)
+        await _add_new_metrics(metrics, async_session)
+
+
+async def get_data_by_page(page, last_update_date):
+    body = {
+        "offset": page,
+        "limit": 500,
+        "device_type_indicator": "ALL",
+        "text_indicator": "URL",
+        "region_ids": [],
+        "filters": {},
+    }
+
+    response = requests.post(
+        URL,
+        json=body,
+        headers={
+            "Authorization": f"OAuth {ACCESS_TOKEN}",
+            "Content-Type": "application/json; charset=UTF-8",
+        },
+    )
+
+    print(response.text[:100])
+    data = response.json()
+
+    if last_update_date is not None:
+        data = clean_dataframe(data, last_update_date)
+
+    await add_data(data)
+
+
+async def get_all_data():
+    last_update_date = await _get_metrics_last_date(async_session)
+
+    body = {
+        "offset": 0,
+        "limit": 500,
+        "device_type_indicator": "ALL",
+        "text_indicator": "URL",
+        "region_ids": [],
+        "filters": {},
+    }
+
+    response = requests.post(
+        URL,
+        json=body,
+        headers={
+            "Authorization": f"OAuth {ACCESS_TOKEN}",
+            "Content-Type": "application/json; charset=UTF-8",
+        },
+    )
+
+    print(response.text[:100])
+    data = response.json()
+    print(response.text, flush=True)  # NOTE(review): dumps the full response body; looks like debug output
+    count = data["count"]  # total result count; used below as the paging bound
+
+    if last_update_date is not None:
+        data = clean_dataframe(data, last_update_date)
+
+    await add_data(data)
+    for offset in range(500, count, 500):
+        print(f"[INFO] PAGE{offset} DONE!")
+        await get_data_by_page(offset, last_update_date)
+
+
+if __name__ == "__main__":
+    # --- Elapsed time: 380-430 seconds ---
+    # Metrics 503 919 rows
+    # Url 38 763 rows
+
+    start_time = time.perf_counter()
+    asyncio.run(get_all_data())
+    print(f" --- Elapsed time: {time.perf_counter() - start_time:.1f} seconds --- ")