testcase_ceoec/services/load_all_urls.py

162 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

import asyncio
import time
from datetime import datetime
from typing import Any
import config
import requests
from api.actions.metrics_url import _add_new_metrics, _get_metrics_last_date
from api.actions.urls import _add_new_urls
from db.models import Metrics, Url
from db.session import async_session
# Yandex.Webmaster API credentials and host identifiers, taken from the
# project-level config module (coerced to str via f-string formatting).
ACCESS_TOKEN = f"{config.ACCESS_TOKEN}"
USER_ID = f"{config.USER_ID}"
HOST_ID = f"{config.HOST_ID}"
# Date format used by the API's "date" fields, e.g. "2024-01-31".
date_format = "%Y-%m-%d"
# Endpoint for the search-query analytics listing (query monitoring).
URL = f"https://api.webmaster.yandex.net/v4/user/{USER_ID}/hosts/{HOST_ID}/query-analytics/list"
def clean_dataframe(
    data: dict[str, Any],
    from_date: datetime,
    date_fmt: str = "%Y-%m-%d",
) -> dict[str, Any]:
    """Filter an API payload, keeping only statistics strictly newer than *from_date*.

    Args:
        data: Webmaster API response containing ``text_indicator_to_statistics``,
            a list of queries, each with a ``statistics`` list of dated entries.
        from_date: cut-off; entries dated on or before it are discarded.
        date_fmt: format of each entry's ``date`` string (defaults to the
            API's ``YYYY-MM-DD``, same as the module-level ``date_format``).

    Returns:
        A new payload of the same shape. Queries whose statistics are all
        filtered out are dropped entirely.
    """
    kept_queries = []
    for query in data["text_indicator_to_statistics"]:
        fresh = [
            el
            for el in query["statistics"]
            if datetime.strptime(el["date"], date_fmt) > from_date
        ]
        # Drop queries with nothing newer than the cut-off.
        if fresh:
            kept_queries.append(
                {"text_indicator": query["text_indicator"], "statistics": fresh}
            )
    return {"text_indicator_to_statistics": kept_queries}
# Maps API field names onto the Metrics column they fill.
_FIELD_TO_COLUMN = {
    "IMPRESSIONS": "impression",
    "CLICKS": "clicks",
    "DEMAND": "demand",
    "CTR": "ctr",
    "POSITION": "position",
}


def _build_metrics(url: str, date_str: str, acc: dict[str, Any]) -> Metrics:
    """Turn one accumulated (url, date) group of field values into a Metrics row."""
    return Metrics(
        url=url,
        date=datetime.strptime(date_str, date_format),
        ctr=acc["ctr"],
        position=acc["position"],
        impression=acc["impression"],
        demand=acc["demand"],
        clicks=acc["clicks"],
    )


async def add_data(data):
    """Persist an API payload: one Url row per query, one Metrics row per date.

    Each query's ``statistics`` list holds one entry per (date, field) pair
    and is assumed to be grouped by date; consecutive entries sharing a date
    are accumulated into a single Metrics row.

    BUGFIX: the original flushed a group only when the date changed, so the
    final date group of every query was silently dropped; it is now flushed
    after the loop as well.
    """
    for query in data["text_indicator_to_statistics"]:
        query_name = query["text_indicator"]["value"]
        stats = query["statistics"]
        if not stats:
            # clean_dataframe never emits empty lists, but raw API data might;
            # the original code raised IndexError here.
            continue
        metrics: list[Metrics] = []
        current_date = stats[0]["date"]
        # Accumulator for the current date group; missing fields stay 0.
        acc = {"ctr": 0, "position": 0, "impression": 0, "demand": 0, "clicks": 0}
        for el in stats:
            if el["date"] != current_date:
                # Date changed: flush the finished group and start a new one.
                metrics.append(_build_metrics(query_name, current_date, acc))
                current_date = el["date"]
                acc = dict.fromkeys(acc, 0)
            column = _FIELD_TO_COLUMN.get(el["field"])
            if column is not None:
                acc[column] = el["value"]
        # Flush the last group (dropped by the original implementation).
        metrics.append(_build_metrics(query_name, current_date, acc))
        await _add_new_urls([Url(url=query_name)], async_session)
        await _add_new_metrics(metrics, async_session)
async def get_data_by_page(page, last_update_date):
    """Fetch one 500-row page of query analytics and store it in the DB.

    Args:
        page: row offset into the full result set (multiple of 500).
        last_update_date: newest date already stored, or ``None`` to keep
            everything from the response.

    NOTE(review): ``requests`` is synchronous and blocks the event loop for
    the duration of the HTTP call; acceptable for this single-task script,
    but an async HTTP client would be needed for concurrent use.
    """
    body = {
        "offset": page,
        "limit": 500,
        "device_type_indicator": "ALL",
        "text_indicator": "URL",
        "region_ids": [],
        "filters": {},
    }
    response = requests.post(
        URL,
        json=body,
        headers={
            "Authorization": f"OAuth {ACCESS_TOKEN}",
            "Content-Type": "application/json; charset=UTF-8",
        },
        # The original call had no timeout and could hang forever.
        timeout=60,
    )
    # Fail loudly on HTTP errors instead of trying to parse an error body.
    response.raise_for_status()
    print(response.text[:100])
    data = response.json()
    if last_update_date is not None:
        data = clean_dataframe(data, last_update_date)
    await add_data(data)
async def get_all_data():
    """Download the full query-analytics report page by page and persist it.

    The first request also yields ``count`` (total number of rows), which
    drives the pagination loop in 500-row steps. When the database already
    holds metrics, only statistics newer than the last stored date are kept.
    """
    last_update_date = await _get_metrics_last_date(async_session)
    body = {
        "offset": 0,
        "limit": 500,
        "device_type_indicator": "ALL",
        "text_indicator": "URL",
        "region_ids": [],
        "filters": {},
    }
    response = requests.post(
        URL,
        json=body,
        headers={
            "Authorization": f"OAuth {ACCESS_TOKEN}",
            "Content-Type": "application/json; charset=UTF-8",
        },
        # The original call had no timeout and could hang forever.
        timeout=60,
    )
    # Fail loudly on HTTP errors instead of trying to parse an error body.
    response.raise_for_status()
    # Debug peek at the payload (the original also dumped the full body twice).
    print(response.text[:100], flush=True)
    data = response.json()
    count = data["count"]
    if last_update_date is not None:
        data = clean_dataframe(data, last_update_date)
    await add_data(data)
    for offset in range(500, count, 500):
        await get_data_by_page(offset, last_update_date)
        # Log after the page is processed (the original printed before fetching).
        print(f"[INFO] PAGE{offset} DONE!")
if __name__ == "__main__":
    # Historical runtime: ~380-430 s for ~503,919 Metrics rows / ~38,763 Url rows.
    started = time.perf_counter()
    asyncio.run(get_all_data())
    elapsed = time.perf_counter() - started
    print(f" --- Elapsed time: {elapsed:.1f} seconds --- ")