sprint1: base func
This commit is contained in:
0
src/__init__.py
Normal file
0
src/__init__.py
Normal file
19
src/app.py
Normal file
19
src/app.py
Normal file
@@ -0,0 +1,19 @@
|
||||
from src.device import FakeInverter
|
||||
from src.monitor import Monitor
|
||||
|
||||
|
||||
def run_test_monitor(config: dict):
|
||||
monitor = Monitor(session_config=config)
|
||||
device_conf = {
|
||||
"name": "myinv",
|
||||
"model": "12345",
|
||||
"location": "my_home",
|
||||
"port": "COM",
|
||||
}
|
||||
device = FakeInverter(**device_conf)
|
||||
monitor.init_device(device)
|
||||
monitor.demo_get_device_data_from_file()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pass
|
5
src/config.py
Normal file
5
src/config.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from dotenv import dotenv_values
|
||||
|
||||
|
||||
prod_config = dotenv_values("src/.env_prod")
|
||||
test_config = dotenv_values("nts_ntu/.env_test")
|
110
src/device.py
Normal file
110
src/device.py
Normal file
@@ -0,0 +1,110 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from collections import namedtuple
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
|
||||
class Device(ABC):
|
||||
"""Абстрактный базовый класс прибора
|
||||
Описывает необходимые к реализации атрибуты/методы, для подключения прибора
|
||||
к системе мониторинга и сбора данных
|
||||
|
||||
Атрибуты обязятельные к определению в приборе
|
||||
---------------------------------------------------------------------------
|
||||
...
|
||||
|
||||
Методы обязательные к определению в приборе
|
||||
---------------------------------------------------------------------------
|
||||
def connect(self) -> None
|
||||
реализация подключения к прибору
|
||||
|
||||
def get_data(self) -> dict
|
||||
Получение текущих показаний прибора. Функция должна вернуть
|
||||
словарь определенной структуры(см. соответствующий пункт документации)
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def connect(self):
|
||||
"""Реализуется физическое подключение к прибору, для сбора данных"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def get_data(self) -> dict:
|
||||
"""Реализует получение данных от прибора возвращает словарь.
|
||||
Считанные данные привести к dict описанной ниже структуры:
|
||||
dict_structure = {
|
||||
"measurement": "<name>",
|
||||
"tags": {"<tag_name>": "<tag_value>"},
|
||||
"fields": {"<field_name>": <field_value>},
|
||||
"time": <measurement_time>
|
||||
}
|
||||
Допустимые типы данных в описанном словаре:
|
||||
<name>: str
|
||||
<tag_name>: str
|
||||
<tag_value>: str
|
||||
<field_name>: str
|
||||
<field_value>: str | int | float | bool
|
||||
<measurement_time>: datetime
|
||||
|
||||
Функция должна вернуть созданный словарь
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class Inverter(Device):
|
||||
def __init__(self, **kwargs) -> None:
|
||||
self.name: str = kwargs.get("name", "fakeinvertor")
|
||||
self.model: str = kwargs.get("model", "fakemodel")
|
||||
self.location: str = kwargs.get("name", "fakelocation")
|
||||
self._raw_data: str
|
||||
self.port: str = kwargs.get("name", "fakeCOMport")
|
||||
self.__dict__.update(kwargs)
|
||||
|
||||
def connect(self):
|
||||
pass
|
||||
|
||||
def get_data(self, current_time: datetime = datetime.now()) -> dict:
|
||||
val_template = namedtuple(
|
||||
"InvertorData",
|
||||
[
|
||||
"VVV", # O/P Voltage
|
||||
"QQQ", # O/P load percent (Digital) 0, - 0%
|
||||
"SS_S", # Battery voltage 12,24,48
|
||||
"BBB", # Battery capacity (as O/P load percent)
|
||||
"TT_T", # Heat Sink Temperature (0-99.9)
|
||||
"MMM", # Utility Power Voltage (0-250VACоооо)
|
||||
"RR_R", # Output Power Frequency (40.0-70.0) Hz
|
||||
"DDD", # DC BUS Voltage (0V)
|
||||
"PPP", # O/P load Percent (Analog) (0-100%)u
|
||||
"command_bits",
|
||||
],
|
||||
)
|
||||
raw = self._raw_data.strip().lstrip("(").rstrip(")").split()
|
||||
print(raw)
|
||||
|
||||
values = val_template(*map(float, raw[:-1]), raw[-1])
|
||||
answer: dict = {
|
||||
"measurement": self.name,
|
||||
"tags": {"model": self.model, "location": self.location},
|
||||
"fields": values._asdict(),
|
||||
"time": current_time,
|
||||
}
|
||||
return answer
|
||||
|
||||
|
||||
class FakeInverter(Inverter):
|
||||
def __init__(self, **kwargs) -> None:
|
||||
super().__init__(**kwargs)
|
||||
self.cur_datetime: datetime = datetime.now() - timedelta(hours=23)
|
||||
self.delta: timedelta = timedelta(minutes=10)
|
||||
|
||||
def __getattribute__(self, __name: str) -> Any:
|
||||
if __name == "cur_datetime":
|
||||
self.__dict__["cur_datetime"] += self.delta
|
||||
return super().__getattribute__(__name)
|
||||
|
||||
def set_fake_answer(self, line: str) -> None:
|
||||
self._raw_data = line
|
||||
|
||||
def get_data(self, current_time: datetime = datetime.now()) -> dict:
|
||||
return super().get_data(self.cur_datetime)
|
46
src/monitor.py
Normal file
46
src/monitor.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import time
|
||||
from typing import List
|
||||
|
||||
from src.device import Device
|
||||
from src.session import InfDBSession
|
||||
|
||||
|
||||
class Monitor:
|
||||
def __init__(
|
||||
self,
|
||||
session_config: dict,
|
||||
pooling_delay: int | float = 0.1,
|
||||
) -> None:
|
||||
"""
|
||||
description
|
||||
"""
|
||||
self.device_list: List[Device] = []
|
||||
self.session_config: dict = session_config
|
||||
self.pooling_delay: int | float = pooling_delay
|
||||
|
||||
def init_device(self, device: Device) -> None:
|
||||
self.device_list.append(device)
|
||||
|
||||
def _get_device_data(self, device: Device) -> dict:
|
||||
answer = device.get_data()
|
||||
return answer
|
||||
|
||||
def __send_devices_data_to_db(self) -> None:
|
||||
session = InfDBSession(config=self.session_config)
|
||||
for device in self.device_list:
|
||||
data: dict = self._get_device_data(device)
|
||||
session.add(data)
|
||||
session.commit()
|
||||
|
||||
def mainloop(self) -> None:
|
||||
while True:
|
||||
self.__send_devices_data_to_db()
|
||||
time.sleep(self.pooling_delay)
|
||||
|
||||
def demo_get_device_data_from_file(self):
|
||||
with open("telemetry.txt", mode="r") as f:
|
||||
for line in f.readlines():
|
||||
line = line.strip()
|
||||
if line:
|
||||
self.device_list[0].set_fake_answer(line)
|
||||
self.__send_devices_data_to_db()
|
33
src/session.py
Normal file
33
src/session.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import influxdb_client as infdb
|
||||
from influxdb_client import Point
|
||||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||||
|
||||
|
||||
class InfDBSession:
|
||||
def __init__(self, config: dict) -> None:
|
||||
self.token: str
|
||||
self.url: str
|
||||
self.org: str
|
||||
self.bucket: str
|
||||
self.__dict__.update(config)
|
||||
self.__points_queue: list = []
|
||||
|
||||
self.__client = infdb.InfluxDBClient(
|
||||
url=self.url,
|
||||
token=self.token,
|
||||
org=self.org,
|
||||
)
|
||||
self.__write_api = self.__client.write_api(write_options=SYNCHRONOUS)
|
||||
|
||||
def add(self, point: dict) -> None:
|
||||
print(point)
|
||||
self.__points_queue.append(Point.from_dict(point))
|
||||
print(len(self.__points_queue))
|
||||
|
||||
def commit(self) -> None:
|
||||
while self.__points_queue:
|
||||
self.__write_api.write(
|
||||
org=self.org,
|
||||
bucket=self.bucket,
|
||||
record=self.__points_queue.pop(0),
|
||||
)
|
Reference in New Issue
Block a user