First init

This commit is contained in:
TC
2023-10-22 01:55:11 +02:00
commit 81b4805c0b
26 changed files with 1266 additions and 0 deletions

2
src/CHANGELOG.md Normal file
View File

@@ -0,0 +1,2 @@
## v0.1.0 [2023-10-21]
- Wersja beta

17
src/Dockerfile Normal file
View File

@@ -0,0 +1,17 @@
ARG BUILD_FROM
FROM $BUILD_FROM
RUN apk add --no-cache python3
RUN apk add --update py3-pip
# Copy data for add-on
COPY run.sh /
COPY requirements.txt /
COPY main.py /
COPY api.py /
COPY cron.py /
# COPY database.sqlite /
COPY moj_licznik.py /
RUN chmod a+x /run.sh
RUN pip install -r requirements.txt
CMD [ "/run.sh" ]

104
src/README.md Normal file
View File

@@ -0,0 +1,104 @@
[![ha_badge](https://img.shields.io/badge/Home%20Assistant-Add%20On-blue.svg)](https://www.home-assistant.io/)
# [Energa meter](https://github.com/tcich/ha-addon-energa-meter) Home Assistant add-on
[aarch64-shield]: https://img.shields.io/badge/aarch64-yes-green.svg
[amd64-shield]: https://img.shields.io/badge/amd64-yes-green.svg
[armv6-shield]: https://img.shields.io/badge/armv6-yes-green.svg
[armv7-shield]: https://img.shields.io/badge/armv7-yes-green.svg
[i386-shield]: https://img.shields.io/badge/i386-yes-green.svg
![aarch64-shield]
![amd64-shield]
![armv6-shield]
![armv7-shield]
![i386-shield]
**Podoba Ci się?** [Postaw kawę.](https://buycoffee.to/tcich)
## O dodatku
To jest dodatek dla [Home Assistant](https://www.home-assistant.io/). Instalacja dodatku [Energa meter](https://github.com/tcich/ha-addon-energa-meter) umożliwia cykliczne pobieranie danych z aplikacji [Mój Licznik - Energa](https://mojlicznik.energa-operator.pl) udostępnianej klientom Operatora energetycznego Energa
## Instalacja
1) Dodaj repozytorium do repozytoriów dodatków swojego HA za pomocą poniższego przycisku
[![Open your Home Assistant instance and show the add add-on repository dialog with a specific repository URL pre-filled.](https://my.home-assistant.io/badges/supervisor_add_addon_repository.svg)](https://my.home-assistant.io/redirect/supervisor_add_addon_repository/?repository_url=https%3A%2F%2Fgithub.com%2Ftcich%2Fha-addon-energa-meter)
Lub zainstaluj manualnie z Ustawienia -> Dodatki -> Sklep z dodatkami -> ⁞ (Menu) -> Repozytoria -> Wpisz `https://github.com/tcich/hassio-mojlicznik` -> Dodaj. Następnie w ⁞ (Menu) -> Sprawdź aktualizacje (może być konieczne przeładowanie strony)
2) Odszukaj dodatek na liście dodatków w sklepie z dodatkami i zainstaluj go.
3) W zakładce konfiguracja uzupełnij nazwę użytkownika oraz hasło do aplikacji Mój Licznik, jeżeli potrzebujesz to zmień udostępniany port dla API
4) Przejdź do zakładki informacje i uruchom dodatek (pierwsze uruchomienie może trwać kilkanaście minut), jeżeli w logu pojawi się informacja *INFO: Czekam...* oznacza to, że pierwsze inicjalne pobieranie danych zostało ukończone.
## Konfiguracja sensorów
1) Ustal ID Twoich liczników, w tym celu przejdź do adresu Twojego HA na porcie 8000 lub innym jeźeli zmieniłeś go w konfiguracji, np. http://192.168.1.10:8000 wyświetli się w formacie json lista dostępnych liczników, możesz również odszukać ID w logu: *Licznik 12335379 istnieje w systemie*
2) W pliku configuration.yaml w HA dodaj następującą konfigurację np.:
```
sensor:
- platform: rest
resource: http://localhost:8000/meters/12335379
name: "Energia aktualna T1"
unique_id: 12335379_sumz1
unit_of_measurement: "kWh"
value_template: "{{ value_json.meter.zone1.meter | round(2) }}"
- platform: rest
resource: http://localhost:8000/meters/12335379
name: "Dzienny odczyt licznika"
unique_id: 12335379_meterz1
unit_of_measurement: "kWh"
value_template: "{{ value_json.meter.zone1.sum | round(2) }}"
- platform: rest
resource: http://localhost:8000/meters/12335379
name: "Energia aktualna T2"
unique_id: 12335379_sumz2
unit_of_measurement: "kWh"
value_template: "{{ value_json.meter.zone2.meter | round(2) }}"
- platform: rest
resource: http://localhost:8000/meters/12335379
name: "Dzienny odczyt licznika"
unique_id: 12335379_meterz2
unit_of_measurement: "kWh"
value_template: "{{ value_json.meter.zone2.sum | round(2) }}"
```
# Opis konfiguracji
| element konfiguracji | Opis |
|-------------------|-------------------|
| resource: http://localhost:8000/meters/12335379 | Adres API z danymi konkretnego licznika, podajemy nazwę instancji dockera (**Nazwa hosta** z okna dodatku) lub localhost|
| name: "Energia aktualna" | Nazwa sensora, wpisz dowolną|
| unique_id | Unikalny ID sensora, nie mogą być w systemie dwa sensory z tym samym ID|
| unit_of_measurement: "kWh" | Jednostka miary, nie zmieniaj chyba, że wiesz co robisz|
| value_template: "{{ value_json.meter.zone2.meter \| round(2) }}" | Zaokrąglony do dwóch miejsc po przecinku stan sensora|
# Opis konfiguracji cd
| value_template | Opis |
|-------------------|-------------------|
| value_json.meter.zone1.sum | Suma licznika oraz dziennego zużycia dla tartfy1 (dostępne są: zone1, zone2, zone3)|
| value_json.meter.zone2.meter | Stan licznika dziennego dla taryfy1 (dostępne są: zone1, zone2, zone3)|
## API dla wykresów, np. Grafana
Aby pobrać dane z API w formacie JSON należy użyć adresu http://home_assistant:8000/charts/12729?start_date=1695332400129&end_date=1697924583285
gdzie:
* 12729 - jest to ID licznika
* start_date - początek okresu w milisekundach wg. standardu EPOCH (timestamp)
* end_date - koniec okresu w milisekundach wg. standardu EPOCH (timestamp)
## Jak dodać wykres do Grafana
[Instrukcja konfiguracji Grafana](https://github.com/tcich/ha-addon-energa-meter/blob/main/README.md#jak-doda%C4%87-wykres-do-grafana)
## Znane problemy
Czasami w aplikacji Mój Licznik włącza się captha (jeżeli masz dużo danych historycznych lub wielokrotnie instalujesz dodatek)
## Uwagi
Dostęp do aktualnej wersji API nie jest zabezpieczony tokenem
Każde przeinstalowanie dodatku pobiera ponownie dane z aplikacji Mój Licznik
**Podoba Ci się?** [Postaw kawę.](https://buycoffee.to/tcich)

203
src/api.py Normal file
View File

@@ -0,0 +1,203 @@
from peewee import SqliteDatabase
from flask import Flask, jsonify, request, redirect, url_for
from waitress import serve
import time
from datetime import datetime
from moj_licznik import PPETable, MainChartTable
DEBUG = False
app = Flask(__name__)
db = SqliteDatabase('database.sqlite')
@app.route('/', methods=['GET'])
def root_redirect():
query = PPETable.select().where(PPETable.is_active == True)
result_ppes = list(query)
meters = []
for p in result_ppes:
meter = {
'name': p.name,
'id': p.id
}
meters.append(meter)
if DEBUG:
print("API: GET /")
return jsonify({'meters': meters})
@app.route('/meters', methods=['GET'])
def meters():
query = PPETable.select().where(PPETable.is_active == True)
result_ppes = list(query)
meters = []
for p in result_ppes:
meter = {
'name': p.name,
'id': p.id,
'ppe': p.ppe,
'number_of_zones': p.number_of_zones,
'tariffCode': p.tariffCode,
'first_date': p.first_date,
'last_update_date': p.last_update_date,
'measurement_date': p.measurement_date,
}
for i in range(1, p.number_of_zones + 1):
zone_key = f'zone{i}'
daily_chart_key = f'zone{i}_daily_chart_sum'
zone_value = getattr(p, zone_key)
daily_chart_value = getattr(p, daily_chart_key)
# Zamień None na zero podczas obliczania sumy
zone_value = float(zone_value) if zone_value is not None else 0
daily_chart_value = float(daily_chart_value) if daily_chart_value is not None else 0
meter[zone_key] = {
'meter': zone_value,
'daily_chart': daily_chart_value,
'sum': zone_value + daily_chart_value
}
meters.append(meter)
if DEBUG:
print("API: GET /")
return jsonify({'meters': meters})
@app.route('/meters/<int:meter_id>', methods=['GET'])
def get_meter(meter_id):
query = PPETable.select().where((PPETable.is_active == True) & (PPETable.id == meter_id))
result_ppes = list(query)
if result_ppes:
p = result_ppes[0] # There should be only one matching record
meter = {
'name': p.name,
'id': p.id,
'ppe': p.ppe,
'number_of_zones': p.number_of_zones,
'tariffCode': p.tariffCode,
'first_date': p.first_date,
'last_update_date': p.last_update_date,
'measurement_date': p.measurement_date
}
for i in range(1, p.number_of_zones + 1):
zone_key = f'zone{i}'
daily_chart_key = f'zone{i}_daily_chart_sum'
zone_value = getattr(p, zone_key)
daily_chart_value = getattr(p, daily_chart_key)
# Zamień None na zero podczas obliczania sumy
zone_value = float(zone_value) if zone_value is not None else 0
daily_chart_value = float(daily_chart_value) if daily_chart_value is not None else 0
meter[zone_key] = {
'meter': zone_value,
'daily_chart': daily_chart_value,
'sum': zone_value + daily_chart_value
}
print(f"API: GET /meters/{meter_id}")
return jsonify({'meter': meter})
else:
return jsonify({'error': 'Meter not found'}, 404)
# @app.route('/meters/<int:meter_id>', methods=['GET'])
# def get_meter(meter_id):
# query = PPETable.select().where((PPETable.is_active == True) & (PPETable.id == meter_id))
# result_ppes = list(query)
# if result_ppes:
# p = result_ppes[0] # There should be only one matching record
# meter = {
# 'name': p.name,
# 'id': p.id,
# 'ppe': p.ppe,
# 'number_of_zones': p.number_of_zones,
# 'tariffCode': p.tariffCode,
# 'first_date': p.first_date,
# 'last_update_date': p.last_update_date,
# 'measurement_date': p.measurement_date
# }
# for i in range(1, p.number_of_zones + 1):
# zone_key = f'zone{i}'
# daily_chart_key = f'zone{i}_daily_chart_sum'
# meter[zone_key] = getattr(p, zone_key)
# meter[daily_chart_key] = getattr(p, daily_chart_key)
# print(f"API: GET /meters/{meter_id}")
# return jsonify({'meter': meter})
# else:
# return jsonify({'error': 'Meter not found'}, 404)
@app.route('/charts/<mp>', methods=['GET'])
def charts(mp):
start_time = time.time()
current_time = time.localtime()
start_date = request.args.get('start_date', (time.mktime(current_time) - 864000))
end_date = request.args.get('end_date', (time.mktime(current_time)))
query = MainChartTable.select().where((MainChartTable.mp == mp) & (MainChartTable.tm >= start_date) & (MainChartTable.tm <= end_date))
result_ppes = list(query)
charts = []
for p in result_ppes:
czas = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(p.tm/1000))
chart = {
'mp': p.mp,
'zone': p.zone,
'tm': p.tm,
'czas': czas,
'value': p.value
}
charts.append(chart)
end_time = time.time()
if DEBUG:
print(f"API: GET / - {start_date} - {end_date}")
return jsonify({'charts': charts})
@app.route('/<mp>/<zone>', methods=['GET'])
def charts_zone(mp, zone):
start_time = time.time()
current_time = time.localtime()
start_date = request.args.get('start_date', (time.mktime(current_time) - 864000))
end_date = request.args.get('end_date', (time.mktime(current_time)))
query = MainChartTable.select().where((MainChartTable.mp == mp) & (MainChartTable.tm >= start_date) & (MainChartTable.tm <= end_date) & (MainChartTable.zone == zone))
result_ppes = list(query)
charts = []
for p in result_ppes:
czas = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(p.tm/1000))
chart = {
'mp': p.mp,
'zone': p.zone,
'tm': p.tm,
'czas': czas,
'value': p.value
}
charts.append(chart)
end_time = time.time()
if DEBUG:
print(f"API: GET / - {start_date} - {end_date}")
return jsonify({'charts': charts})
if __name__ == "__main__":
serve(app, host="0.0.0.0", port=8000, threads=8)

20
src/config.yaml Normal file
View File

@@ -0,0 +1,20 @@
name: "Energa meter"
description: "Energa meter addon"
version: "0.1.0"
slug: "energa_meter"
init: false
options:
energa_username: ""
energa_password: ""
schema:
energa_username: str
energa_password: password
arch:
- aarch64
- amd64
- armhf
- armv7
- i386
startup: services
ports:
8000/tcp: 8000

32
src/cron.py Normal file
View File

@@ -0,0 +1,32 @@
import configparser, time, datetime, os
from moj_licznik import MojLicznik
from pathlib import Path
def main():
plik = Path('config.ini')
username = None
password = None
if plik.is_file():
print(f"Pobieram parametry z config.ini.")
config = configparser.ConfigParser()
config.read("config.ini")
username = config.get("Credentials", "username")
password = config.get("Credentials", "password")
else:
username = os.getenv("USERNAME")
password = os.getenv("PASSWORD")
try:
mojLicznik = MojLicznik()
print(f"Update...{datetime.datetime.now()}")
print(f"Logowanie...")
mojLicznik.login(username, password)
print(f"Aktualizacja danych bieżących...")
mojLicznik.uppdate_measurments()
mojLicznik.update_last_days()
mojLicznik.set_daily_zones()
mojLicznik.logout()
except:
print("Błąd aktualizacji danych...")
if __name__ == "__main__":
main()

BIN
src/icon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

BIN
src/logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

34
src/main.py Normal file
View File

@@ -0,0 +1,34 @@
import configparser, time, datetime, os
from moj_licznik import MojLicznik
from pathlib import Path
def main():
plik = Path('config.ini')
username = None
password = None
if plik.is_file():
print(f"Pobieram parametry z config.ini.")
config = configparser.ConfigParser()
config.read("config.ini")
username = config.get("Credentials", "username")
password = config.get("Credentials", "password")
else:
username = os.getenv("USERNAME")
password = os.getenv("PASSWORD")
print(f"Inicjacja...")
mojLicznik = MojLicznik()
print(f"Logowanie...", username)
mojLicznik.login(username, password)
print(f"Aktualizacja liczników...")
mojLicznik.uppdate_measurments()
print(f"Wyszukiwanie najstarszych danych...")
mojLicznik.update_first_date()
print(f"Pobieranie danych...")
mojLicznik.download_charts(True)
mojLicznik.update_last_days()
mojLicznik.set_daily_zones()
mojLicznik.logout()
if __name__ == "__main__":
main()

502
src/moj_licznik.py Normal file
View File

@@ -0,0 +1,502 @@
from peewee import SqliteDatabase
from datetime import datetime, timedelta, date
import calendar, requests, re, time, json
import http.cookiejar as cookiejar
from requests.exceptions import HTTPError
from bs4 import BeautifulSoup
from enum import Enum
from peewee import Model, CharField, IntegerField, DateField, BooleanField, CompositeKey, DecimalField
db = SqliteDatabase('database.sqlite')
class ChartType(Enum):
DAY = "DAY"
MONTH = "MONTH"
YEAR = "YEAR"
class PPETable(Model):
ppe = CharField()
tariffCode = CharField()
name = CharField()
zone1 = DecimalField(max_digits=15, decimal_places=5, null=True)
zone2 = DecimalField(max_digits=15, decimal_places=5, null=True)
zone3 = DecimalField(max_digits=15, decimal_places=5, null=True)
zone1_daily_chart_sum = DecimalField(max_digits=10, decimal_places=5, null=True)
zone2_daily_chart_sum = DecimalField(max_digits=10, decimal_places=5, null=True)
zone3_daily_chart_sum = DecimalField(max_digits=10, decimal_places=5, null=True)
number_of_zones = IntegerField(default=0)
is_active = BooleanField(default=True)
measurement_date = DateField(null=True)
first_date = DateField(null=True)
last_update_date = DateField(null=True)
class Meta:
database = db
class ChartTable(Model):
id = IntegerField()
year = IntegerField()
month = IntegerField(null=True)
day = IntegerField(null=True)
value =CharField()
class Meta:
database = db
primary_key = CompositeKey('id', 'year', 'month', 'day')
class MainChartTable(Model):
mp = CharField()
zone = IntegerField()
tm = IntegerField()
value = DecimalField(max_digits=20, decimal_places=16, null=True)
tarAvg = DecimalField(max_digits=20, decimal_places=16, null=True)
est = BooleanField(default=False)
cplt = BooleanField(default=False)
class Meta:
database = db
primary_key = CompositeKey('mp', 'zone', 'tm')
class MojLicznik:
session = requests.Session()
session.cookies = cookiejar.LWPCookieJar(filename='cookies.txt')
meter_url = "https://mojlicznik.energa-operator.pl"
def databaseInit(self):
db.create_tables([ChartTable], safe=True)
db.create_tables([PPETable], safe=True)
db.create_tables([MainChartTable], safe=True)
def __init__(self):
self.username = None
self.password = None
self.loginStatus = False
self.data = [] # Change self.data to a list
self.ppes = []
self.databaseInit()
def login(self, _username, _password):
self.username = _username
self.password = _password
login_url = f"{self.meter_url}/dp/UserLogin.do"
try:
response = self.session.get(login_url)
response.raise_for_status()
print(f"Logowanie rozpoczęte.")
except HTTPError as e:
print(f"Wystąpił błąd HTTP: {e}")
soup = BeautifulSoup(response.text, 'html.parser')
csrf_token = soup.find('input', {'name': '_antixsrf'})['value']
login_data = {
'j_username': self.username,
'j_password': self.password,
'selectedForm': '1',
'save': 'save',
'clientOS': 'web',
'_antixsrf': csrf_token
}
try:
response = self.session.post(login_url, data=login_data)
response.raise_for_status()
self.loginStatus = True
print(f"Zalogowano")
except HTTPError as e:
print(f"Wystąpił błąd HTTP: {e}")
soup = BeautifulSoup(response.text, 'html.parser')
select_elements = soup.find_all('script', type='text/javascript')
meter_isd = []
for el in select_elements:
pattern = r"id:\s+(\d+),[\s\S]*?ppe:\s+'([\d\s]+)',[\s\S]*?tariffCode:\s+'([^']+)',[\s\S]*?name:\s+'([^']+)'"
matches = re.search(pattern, el.text)
if matches:
id_value = matches.group(1)
ppe_value = matches.group(2)
tariffCode_value = matches.group(3)
name_value = matches.group(4)
meter_isd.append(id_value)
retrieved_record = PPETable.get_or_none(id=id_value)
if retrieved_record:
print(f"Licznik {id_value} istnieje w systemie.")
if not retrieved_record.is_active:
retrieved_record.is_active = True
retrieved_record.save()
else:
print(f"Licznik {id_value} nie istnieje w systemie.")
data = PPETable.create(
id=id_value,
ppe=ppe_value,
tariffCode=tariffCode_value,
name=name_value
)
update_query = PPETable.update(is_active=0).where(PPETable.id.not_in(meter_isd))
update_query.execute()
def logout(self):
logout_url = f"{self.meter_url}/dp/MainLogout.go"
try:
response = self.session.get(logout_url)
response.raise_for_status()
self.loginStatus = False
print(f"Wylogowano.")
except HTTPError as e:
print(f"Wystąpił błąd HTTP: {e}")
def uppdate_measurments(self):
query = PPETable.select().where(PPETable.is_active == True)
result_ppes = query.execute()
for p in result_ppes:
meter_url = f"{self.meter_url}/dp/UserData.do?mpc={p.id}&ppe={p.ppe}"
try:
response = self.session.get(meter_url)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
td_elements = soup.find_all('td', class_='last')
date_divs = soup.find_all("div", style="font-size: 10px")
for div in date_divs:
p.measurement_date = datetime.strptime(div.text.strip(), "%Y-%m-%d %H:%M").date()
i = 0
for td in td_elements:
text = td.get_text()
cleaned_text = re.sub(r'[^\d,]', '', text)
cleaned_number_str = cleaned_text.lstrip('0').replace(',', '.')
i = i + 1
if i == 1:
p.zone1 = float(cleaned_number_str)
p.number_of_zones = 1
elif i == 2:
p.zone2 = float(cleaned_number_str)
p.number_of_zones = 2
elif i == 3:
p.zone3 = float(cleaned_number_str)
p.number_of_zones = 1
p.last_update_date = datetime.now()
p.save()
print(f"Zapisano stan licznika {p.name} na dzień: {p.measurement_date}")
except HTTPError as e:
print(f"Wystąpił błąd HTTP: {e}")
def update_first_date(self):
query = PPETable.select().where(PPETable.first_date.is_null(True) & (PPETable.is_active == True))
result_ppes = query.execute()
for p in result_ppes:
print(f"Szukam najstarsze dane historyczne licznika {p.name}")
meter_point = p.id
max_years_back = 5
start_date = datetime.now()
last_chart_year = None
for n in range(max_years_back + 1):
first_day_of_year = datetime(start_date.year-n, 1, 1)
data_json = self.download_chart(ChartType.YEAR, first_day_of_year, meter_point)
if data_json:
data = json.loads(data_json)
if data and data.get("mainChart") and len(data["mainChart"]) > 0:
last_chart_year = first_day_of_year.year
last_chart_month = None
max_month = 12
for n in range(max_month, 0, -1):
first_day_of_month = datetime(last_chart_year, n, 1)
data_json = self.download_chart(ChartType.MONTH, first_day_of_month, meter_point)
if data_json:
data = json.loads(data_json)
if data and data.get("mainChart") and len(data["mainChart"]) > 0:
last_chart_month = n
last_chart_day = None
max_day = 31
first_day_of_day = datetime(last_chart_year, last_chart_month, 1)
_, max_day = calendar.monthrange(first_day_of_day.year, first_day_of_day.month)
for n in range(max_day, 0, -1):
first_day_of_day = datetime(last_chart_year, last_chart_month, n)
data_json = self.download_chart(ChartType.DAY, first_day_of_day, meter_point)
if data_json:
data = json.loads(data_json)
if data and data.get("mainChart") and len(data["mainChart"]) > 0:
last_chart_day = n
first_date = datetime(last_chart_year, last_chart_month, last_chart_day).date()
print(f"Najstarsze dane historyczne dla licznika {p.name}: {first_date}")
p.first_date = first_date
p.save()
def save_main_charts(self, mp, vals):
for val in vals:
#try:
z = val["zones"]
# {"tm": "1690412400000", "tarAvg": 0.3899153269199055, "zones": [null, 0.232, null], "est": false, "cplt": true},
if z[0]:
# MainChartTable.get_or_create(tm = val["tm"], zone = 1, value = z[0], tarAvg=val["tarAvg"], est=val["est"], cplt=val["cplt"])
try:
existing_record = MainChartTable.get((MainChartTable.mp == mp) & (MainChartTable.tm == val["tm"]) & (MainChartTable.zone == 1))
except MainChartTable.DoesNotExist:
# Jeśli rekord nie istnieje, utwórz nowy
MainChartTable.create(
mp=mp,
tm=val["tm"],
zone=1,
value=z[0],
tarAvg=val["tarAvg"],
est=val["est"],
cplt=val["cplt"]
)
if z[1]:
try:
existing_record = MainChartTable.get((MainChartTable.mp == mp) & (MainChartTable.tm == val["tm"]) & (MainChartTable.zone == 2))
except MainChartTable.DoesNotExist:
# Jeśli rekord nie istnieje, utwórz nowy
MainChartTable.create(
mp=mp,
tm=val["tm"],
zone=2,
value=z[1],
tarAvg=val["tarAvg"],
est=val["est"],
cplt=val["cplt"]
)
if z[2]:
try:
existing_record = MainChartTable.get((MainChartTable.mp == mp) & (MainChartTable.tm == val["tm"]) & (MainChartTable.zone == 1))
except MainChartTable.DoesNotExist:
# Jeśli rekord nie istnieje, utwórz nowy
MainChartTable.create(
mp=mp,
tm=val["tm"],
zone=3,
value=z[2],
tarAvg=val["tarAvg"],
est=val["est"],
cplt=val["cplt"]
)
#except:
# pass
return None
def download_chart(self, type, date, meter_point, update_mode=False):
if type == ChartType.DAY:
chart_type = "DAY"
first_day = datetime(date.year, date.month, date.day)
tsm_date = int(time.mktime(first_day.timetuple()) * 1000)
if type == ChartType.MONTH:
chart_type = "MONTH"
first_day = datetime(date.year, date.month, 1)
tsm_date = int(time.mktime(first_day.timetuple()) * 1000)
if type == ChartType.YEAR:
chart_type = "YEAR"
first_day = datetime(date.year, 1, 1)
tsm_date = int(time.mktime(first_day.timetuple()) * 1000)
chart_url = f"{self.meter_url}/dp/resources/chart?mainChartDate={tsm_date}&type={chart_type}&meterPoint={meter_point}&mo=A%2B"
try:
response = self.session.get(chart_url)
data = json.loads(response.text)
response.raise_for_status()
if data["response"]:
id = data["response"]["meterPoint"]
mainChartDate = data["response"]["mainChartDate"]
mainChart = data["response"]["mainChart"]
if type == ChartType.DAY:
self.save_main_charts(meter_point, mainChart)
date = int(mainChartDate) / 1000
month = None
day = None
dt = datetime.fromtimestamp(date)
year = dt.year
if type == ChartType.MONTH:
month = dt.month
if type == ChartType.DAY:
month = dt.month
day = dt.day
json_dump = json.dumps(data["response"], ensure_ascii=False)
if update_mode:
try:
chart_record = ChartTable.get(id=id,year=year, month=month, day=day)
chart_record.value = json_dump
chart_record.save()
except ChartTable.DoesNotExist:
chart_record = ChartTable.create(id=id, value=json_dump, year=year, month=month, day=day)
else:
try:
ChartTable.create(id=id, value=json_dump, year=year, month=month, day=day)
except:
pass
return json_dump
return None
except HTTPError as e:
print(f"Wystąpił błąd HTTP: {e}")
def download_charts(self, full_mode=False):
query = PPETable.select().where(PPETable.is_active == True)
result_ppes = query.execute()
for p in result_ppes:
current_date = p.first_date
if not full_mode:
current_date = p.measurement_date - timedelta(days=1)
while current_date <= date.today():
try:
record = ChartTable.get(id=p.id, year=current_date.year, month=current_date.month, day=current_date.day)
# Jeśli rekord o określonych wartościach klucza głównego istnieje, zostanie pobrany.
print(f"Posiadam dane historyczne dla {p.name} na dzień: {current_date}")
except ChartTable.DoesNotExist:
self.download_chart(ChartType.DAY, current_date, p.id)
print(f"Pobieram dane historyczne dla {p.name} na dzień: {current_date}")
current_date += timedelta(days=1)
def update_last_days(self):
today = datetime.today().date()
query = PPETable.select().where(PPETable.is_active == True)
result_ppes = query.execute()
for p in result_ppes:
if not p.last_update_date:
p.last_update_date = today - timedelta(days=5)
p.save()
last_update_date = p.last_update_date - timedelta(days=1)
while last_update_date <= today:
print(f"Aktualizacja danych dla {p.name} na dzień: {last_update_date}")
self.download_chart(ChartType.DAY, last_update_date, p.id, True)
p.last_update_date = last_update_date
p.save()
last_update_date += timedelta(days=1)
def get_current_meters(self, add_daily_char_data=False):
query = PPETable.select().where(PPETable.is_active == True)
result_ppes = query.execute()
for p in result_ppes:
if add_daily_char_data:
query = ChartTable.select().where((ChartTable.id == p.id) & (ChartTable.year == p.measurement_date.year) & (ChartTable.month == p.measurement_date.month) & (ChartTable.day == p.measurement_date.day))
query_count = query.count()
if (query_count > 0):
query_first = query.first()
value_json = json.loads(query_first.value)
print(query_first.value)
zones = value_json.get("zones", [])
if zones:
zone1_data = zones[0]
zone1_main_chart = zone1_data.get("mainChart", [])
#print(zone1_data)
#else:
#print(f"{p.name} ({p.measurement_date}) : {p.zone1}, {p.zone2}, {p.zone3}")
#else:
#print(f"{p.name} ({p.measurement_date}) : {p.zone1}, {p.zone2}, {p.zone3}")
def set_daily_zones(self):
query = PPETable.select().where(PPETable.is_active == True)
result_ppes = query.execute()
for p in result_ppes:
query = ChartTable.select().where(
(ChartTable.id == p.id) &
((ChartTable.year > p.measurement_date.year) |
((ChartTable.year == p.measurement_date.year) &
(ChartTable.month > p.measurement_date.month)) |
((ChartTable.year == p.measurement_date.year) &
(ChartTable.month == p.measurement_date.month) &
(ChartTable.day >= p.measurement_date.day))
))
zones_sums = {f"zone{i+1}_daily_chart_sum": 0.0 for i in range(3)}
for chart_entry in query:
value_json = json.loads(chart_entry.value)
main_chart = value_json.get("mainChart", [])
for entry in main_chart:
zones = entry.get("zones", [])
for i, value in enumerate(zones):
if value is not None:
zones_sums[f"zone{i+1}_daily_chart_sum"] += value
for key, value in zones_sums.items():
setattr(p, key, value)
p.save()
# def set_daily_zones(self):
# query = PPETable.select().where(PPETable.is_active == True)
# result_ppes = query.execute()
# for p in result_ppes:
# query = ChartTable.select().where((ChartTable.id == p.id) & (ChartTable.year >= p.measurement_date.year) & (ChartTable.month >= p.measurement_date.month) & (ChartTable.day >= p.measurement_date.day))
# query_count = query.count()
# if (query_count > 0):
# query_first = query.first()
# value_json = json.loads(query_first.value)
# main_chart = value_json.get("mainChart", [])
# # Inicjalizacja słownika do przechowywania sum dla każdej sekcji zones
# zones_sums = {f"zone{i+1}": 0.0 for i in range(len(main_chart[0].get("zones", [])))}
# for entry in main_chart:
# zones = entry.get("zones", [])
# for i, value in enumerate(zones):
# if value is not None:
# zones_sums[f"zone{i+1}"] += value
# if (zones_sums["zone1"] > 0):
# p.zone1_daily_chart_sum = zones_sums["zone1"]
# else:
# p.zone1_daily_chart_sum = None
# if (zones_sums["zone2"] > 0):
# p.zone2_daily_chart_sum = zones_sums["zone2"]
# else:
# p.zone2_daily_chart_sum = None
# if (zones_sums["zone3"]):
# p.zone3_daily_chart_sum = zones_sums["zone3"]
# else:
# p.zone3_daily_chart_sum = None
# p.save()
def print_summary_zones(self):
query = PPETable.select().where(PPETable.is_active == True)
result_ppes = query.execute()
for p in result_ppes:
zon1 = (p.zone1 if p.zone1 is not None else 0 ) + (p.zone1_daily_chart_sum if p.zone1_daily_chart_sum is not None else 0)
zon2 = (p.zone2 if p.zone2 is not None else 0 ) + (p.zone2_daily_chart_sum if p.zone2_daily_chart_sum is not None else 0)
zon3 = (p.zone3 if p.zone3 is not None else 0 ) + (p.zone3_daily_chart_sum if p.zone3_daily_chart_sum is not None else 0)
print(f"{p.name} : {round(zon1, 5)} "
f"{round(zon2,5)} "
f"{round(zon3,5)}")
def get_current_meters_list(self):
query = PPETable.select().where(PPETable.is_active == True)
return query.execute()
def get_current_meter_value(self, meter_id, zone):
if zone == "zone1":
pPETable = PPETable.get(PPETable.id == meter_id)
return pPETable.zone1
if zone == "zone2":
pPETable = PPETable.get(PPETable.id == meter_id)
return pPETable.zone2
if zone == "zone3":
pPETable = PPETable.get(PPETable.id == meter_id)
return pPETable.zone3
return None

BIN
src/requirements.txt Normal file

Binary file not shown.

17
src/run.sh Normal file
View File

@@ -0,0 +1,17 @@
#!/usr/bin/with-contenv bashio
export USERNAME=$(bashio::config 'energa_username')
export PASSWORD=$(bashio::config 'energa_password')
bashio::log.info "Uruchamiam API"
python api.py &
bashio::log.info "Uruchamiam MAIN"
python main.py
bashio::log.info "Uruchamiam CRON"
while true; do
python cron.py
bashio::log.info "Czekam..."
sleep 1800
done