"""Pull query-analytics statistics from Yandex Webmaster and store them in the DB.

The script pages through the ``query-analytics/list`` endpoint (500 rows per
page), optionally drops statistics not newer than the last date already stored,
and writes ``Url`` / ``Metrics`` rows through the project's async helpers.
"""

import asyncio
import time
from datetime import datetime
from typing import Any

import requests

import config
from api.actions.metrics_url import _add_new_metrics, _get_metrics_last_date
from api.actions.urls import _add_new_urls
from db.models import Metrics, Url
from db.session import async_session

ACCESS_TOKEN = f"{config.ACCESS_TOKEN}"
USER_ID = f"{config.USER_ID}"
HOST_ID = f"{config.HOST_ID}"

DATE_FORMAT = "%Y-%m-%d"
date_format = DATE_FORMAT  # old lowercase alias kept for backward compatibility
PAGE_SIZE = 500  # rows per API page (API maximum per request)

# Endpoint for the search-query monitoring report.
URL = f"https://api.webmaster.yandex.net/v4/user/{USER_ID}/hosts/{HOST_ID}/query-analytics/list"

# Maps the API's "field" discriminator onto the Metrics column it fills.
_FIELD_TO_METRIC = {
    "IMPRESSIONS": "impression",
    "CLICKS": "clicks",
    "DEMAND": "demand",
    "CTR": "ctr",
    "POSITION": "position",
}


def clean_dataframe(data: dict[str, Any], from_date: datetime) -> dict[str, Any]:
    """Return a copy of *data* keeping only statistics strictly newer than *from_date*.

    Queries whose statistics are entirely older than *from_date* are dropped.
    """
    new_data: dict[str, Any] = {"text_indicator_to_statistics": []}
    for query in data["text_indicator_to_statistics"]:
        fresh = [
            el
            for el in query["statistics"]
            if datetime.strptime(el["date"], DATE_FORMAT) > from_date
        ]
        if fresh:
            new_data["text_indicator_to_statistics"].append(
                {"text_indicator": query["text_indicator"], "statistics": fresh}
            )
    return new_data


def _empty_counters() -> dict[str, Any]:
    """Fresh per-date accumulator for the five metric fields."""
    return {"ctr": 0, "position": 0, "impression": 0, "demand": 0, "clicks": 0}


def _build_metrics(url: str, date_str: str, counters: dict[str, Any]) -> Metrics:
    """Build one Metrics row from the accumulated counters for *date_str*."""
    return Metrics(
        url=url,
        date=datetime.strptime(date_str, DATE_FORMAT),
        **counters,
    )


async def add_data(data: dict[str, Any]) -> None:
    """Persist one API page: a Url row per query and a Metrics row per date.

    The API returns each query's statistics as a flat list where consecutive
    entries share a date; each run of equal dates is folded into one Metrics
    row.  Assumes entries for the same date are adjacent in the list (as the
    API delivers them) — TODO confirm against the API contract.
    """
    for query in data["text_indicator_to_statistics"]:
        query_name = query["text_indicator"]["value"]
        statistics = query["statistics"]
        if not statistics:
            continue  # nothing to store for this query

        new_url = [Url(url=query_name)]
        metrics: list[Metrics] = []

        current_date = statistics[0]["date"]
        counters = _empty_counters()

        for el in statistics:
            if el["date"] != current_date:
                # Date changed: flush the finished group and start a new one.
                metrics.append(_build_metrics(query_name, current_date, counters))
                current_date = el["date"]
                counters = _empty_counters()
            field = el["field"]
            if field in _FIELD_TO_METRIC:
                counters[_FIELD_TO_METRIC[field]] = el["value"]

        # BUG FIX: the original loop only flushed on a date *change*, so the
        # trailing date group — the most recent day of every query — was
        # silently dropped.  Flush it here.
        metrics.append(_build_metrics(query_name, current_date, counters))

        await _add_new_urls(new_url, async_session)
        await _add_new_metrics(metrics, async_session)


def _fetch_page(offset: int) -> dict[str, Any]:
    """POST one page request to the Webmaster API and return the parsed JSON.

    NOTE: ``requests`` is synchronous, so this blocks the event loop for the
    duration of the HTTP call — acceptable for a one-shot batch script.

    Raises:
        requests.HTTPError: on a non-2xx response (auth, quota, server error).
    """
    body = {
        "offset": offset,
        "limit": PAGE_SIZE,
        "device_type_indicator": "ALL",
        "text_indicator": "URL",
        "region_ids": [],
        "filters": {},
    }
    response = requests.post(
        URL,
        json=body,
        headers={
            "Authorization": f"OAuth {ACCESS_TOKEN}",
            "Content-Type": "application/json; charset=UTF-8",
        },
    )
    print(response.text[:100])
    # Fail loudly on auth/quota/server errors instead of a confusing
    # KeyError further down when the error payload lacks expected keys.
    response.raise_for_status()
    return response.json()


async def get_data_by_page(page, last_update_date) -> None:
    """Fetch the page at offset *page*, filter by *last_update_date*, store it."""
    data = _fetch_page(page)
    if last_update_date is not None:
        data = clean_dataframe(data, last_update_date)
    await add_data(data)


async def get_all_data() -> None:
    """Fetch every page of the report and persist all new metrics.

    The first page also carries the total row ``count``, which drives the
    pagination over the remaining offsets.
    """
    last_update_date = await _get_metrics_last_date(async_session)

    data = _fetch_page(0)
    count = data["count"]
    if last_update_date is not None:
        data = clean_dataframe(data, last_update_date)
    await add_data(data)

    for offset in range(PAGE_SIZE, count, PAGE_SIZE):
        await get_data_by_page(offset, last_update_date)
        # Log *after* the page is processed (the original logged before,
        # which misreported progress on a crash mid-page).
        print(f"[INFO] PAGE{offset} DONE!")


if __name__ == "__main__":
    # Reference run: ~380-430 s for ~503,919 Metrics rows / ~38,763 Url rows.
    start_time = time.perf_counter()
    asyncio.run(get_all_data())
    print(f" --- Elapsed time: {time.perf_counter() - start_time:.1f} seconds --- ")