testcase_ceoec/services/load_all_urls.py
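"""Load search-query analytics for every URL of a host from the Yandex Webmaster
API and store them as ``Url`` and ``Metrics`` rows.

The script pages through the ``query-analytics/list`` endpoint, drops statistics
older than the last date already stored, and writes the new rows through the
project's DB helpers.
"""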
import asyncio
import time
from datetime import datetime
from typing import Any

import requests

import config
from api.actions.metrics_url import _add_new_metrics, _get_metrics_last_date
from api.actions.urls import _add_new_urls
from db.models import Metrics, Url
from db.session import async_session
ACCESS_TOKEN = f"{config.ACCESS_TOKEN}"
USER_ID = f"{config.USER_ID}"
HOST_ID = f"{config.HOST_ID}"
date_format = "%Y-%m-%d"
# Формируем URL для запроса мониторинга поисковых запросов
URL = f"https://api.webmaster.yandex.net/v4/user/{USER_ID}/hosts/{HOST_ID}/query-analytics/list"


def clean_dataframe(data: dict[str, Any], from_date: datetime):
    """Keep only the statistics entries that are strictly newer than ``from_date``."""
    new_data: dict[str, Any] = {"text_indicator_to_statistics": []}
    for query in data["text_indicator_to_statistics"]:
        tmp = {"text_indicator": query["text_indicator"], "statistics": []}
        for el in query["statistics"]:
            if datetime.strptime(el["date"], date_format) > from_date:
                tmp["statistics"].append(el)
        if len(tmp["statistics"]) != 0:
            new_data["text_indicator_to_statistics"].append(tmp)
    return new_data


async def add_data(data):
    """Convert the API payload into Url and Metrics rows and store them."""
    for query in data["text_indicator_to_statistics"]:
        query_name = query["text_indicator"]["value"]
        new_url = [Url(url=query_name)]
        metrics = []
        date = query["statistics"][0]["date"]
        data_add = {
            "date": date,
            "ctr": 0,
            "position": 0,
            "impression": 0,
            "demand": 0,
            "clicks": 0,
        }
        for el in query["statistics"]:
            # The API returns one entry per (date, field) pair; flush the
            # accumulated values into a Metrics row whenever the date changes.
            if date != el["date"]:
                metrics.append(
                    Metrics(
                        url=query_name,
                        date=datetime.strptime(date, date_format),
                        ctr=data_add["ctr"],
                        position=data_add["position"],
                        impression=data_add["impression"],
                        demand=data_add["demand"],
                        clicks=data_add["clicks"],
                    )
                )
                date = el["date"]
                data_add = {
                    "date": date,
                    "ctr": 0,
                    "position": 0,
                    "impression": 0,
                    "demand": 0,
                    "clicks": 0,
                }
            field = el["field"]
            if field == "IMPRESSIONS":
                data_add["impression"] = el["value"]
            elif field == "CLICKS":
                data_add["clicks"] = el["value"]
            elif field == "DEMAND":
                data_add["demand"] = el["value"]
            elif field == "CTR":
                data_add["ctr"] = el["value"]
            elif field == "POSITION":
                data_add["position"] = el["value"]
        # Flush the final date's accumulated values; the loop above only
        # appends a row when the date changes, so the last date would be lost.
        metrics.append(
            Metrics(
                url=query_name,
                date=datetime.strptime(date, date_format),
                ctr=data_add["ctr"],
                position=data_add["position"],
                impression=data_add["impression"],
                demand=data_add["demand"],
                clicks=data_add["clicks"],
            )
        )
        await _add_new_urls(new_url, async_session)
        await _add_new_metrics(metrics, async_session)


async def get_data_by_page(page, last_update_date):
    """Fetch one page of query analytics, filter out old rows, and store the rest."""
    body = {
        "offset": page,
        "limit": 500,
        "device_type_indicator": "ALL",
        "text_indicator": "URL",
        "region_ids": [],
        "filters": {},
    }
    # Note: requests.post is blocking, so each page is fetched synchronously
    # inside the event loop.
    response = requests.post(
        URL,
        json=body,
        headers={
            "Authorization": f"OAuth {ACCESS_TOKEN}",
            "Content-Type": "application/json; charset=UTF-8",
        },
    )
    print(response.text[:100])
    data = response.json()
    if last_update_date is not None:
        data = clean_dataframe(data, last_update_date)
    await add_data(data)


async def get_all_data():
    """Download all pages of query analytics and store rows newer than the last update."""
    last_update_date = await _get_metrics_last_date(async_session)
    body = {
        "offset": 0,
        "limit": 500,
        "device_type_indicator": "ALL",
        "text_indicator": "URL",
        "region_ids": [],
        "filters": {},
    }
    response = requests.post(
        URL,
        json=body,
        headers={
            "Authorization": f"OAuth {ACCESS_TOKEN}",
            "Content-Type": "application/json; charset=UTF-8",
        },
    )
    print(response.text[:100])
    data = response.json()
    print(response.text, flush=True)
    count = data["count"]
    if last_update_date is not None:
        data = clean_dataframe(data, last_update_date)
    await add_data(data)
    # The first page (offset 0) was handled above; fetch the remaining pages
    # in steps of 500 rows.
    for offset in range(500, count, 500):
        await get_data_by_page(offset, last_update_date)
        print(f"[INFO] PAGE {offset} DONE!")
if __name__ == "__main__":
# --- Elapsed time: 380-430 seconds ---
# Metrics 503 919 rows
# Url 38 763 rows
start_time = time.perf_counter()
asyncio.run(get_all_data())
print(f" --- Elapsed time: {time.perf_counter() - start_time:.1f} seconds --- ")