new file: src.dev/CHANGELOG.md

new file:   src.dev/api.py
	new file:   src.dev/config.yaml
	new file:   src.dev/cron.py
	new file:   src.dev/log_config.py
	new file:   src.dev/moj_licznik.py
	new file:   src.dev/requirements.txt
	new file:   src.dev/run.py
	new file:   src.dev/run.sh
	deleted:    .idea/workspace.xml
	modified:   repository.yaml
	modified:   src.dev/CHANGELOG.md
This commit is contained in:
TC
2023-11-01 15:12:22 +01:00
parent 4b65573ed0
commit 1c76b80248
10 changed files with 1077 additions and 0 deletions

88
.idea/workspace.xml generated Normal file
View File

@@ -0,0 +1,88 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AutoImportSettings">
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="8727e2a1-35f4-4c9e-8dd5-0334d3dc5b3b" name="Changes" comment="">
<change beforePath="$PROJECT_DIR$/src/CHANGELOG.md" beforeDir="false" afterPath="$PROJECT_DIR$/src/CHANGELOG.md" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/api.py" beforeDir="false" afterPath="$PROJECT_DIR$/src/api.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/config.yaml" beforeDir="false" afterPath="$PROJECT_DIR$/src/config.yaml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/cron.py" beforeDir="false" afterPath="$PROJECT_DIR$/src/cron.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/src/moj_licznik.py" beforeDir="false" afterPath="$PROJECT_DIR$/src/moj_licznik.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/run.sh" beforeDir="false" afterPath="$PROJECT_DIR$/src/run.sh" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="MarkdownSettingsMigration">
<option name="stateVersion" value="1" />
</component>
<component name="ProjectColorInfo"><![CDATA[{
"associatedIndex": 8
}]]></component>
<component name="ProjectId" id="2XJbibVs5CCeRSUO1z20ufRUETI" />
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
<option name="showVisibilityIcons" value="true" />
</component>
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"RunOnceActivity.OpenProjectViewOnStart": "true",
"RunOnceActivity.ShowReadmeOnStart": "true",
"git-widget-placeholder": "wytworca",
"last_opened_file_path": "D:/github/tcich/ha-addon-energa-meter",
"project.structure.last.edited": "Project",
"project.structure.proportion": "0.15",
"project.structure.side.proportion": "0.2",
"settings.editor.selected.configurable": "configurable.group.tools"
}
}]]></component>
<component name="RunManager">
<configuration name="run" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="ha-addon-energa-meter" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="SDK_NAME" value="Python 3.8" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/src" />
<option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/src/run.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<recent_temporary>
<list>
<item itemvalue="Python.run" />
</list>
</recent_temporary>
</component>
<component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="8727e2a1-35f4-4c9e-8dd5-0334d3dc5b3b" name="Changes" comment="" />
<created>1698350257905</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1698350257905</updated>
</task>
<servers />
</component>
</project>

10
src.dev/CHANGELOG.md Normal file
View File

@@ -0,0 +1,10 @@
## v0.1.3 [2023-10-31]
- Dodano obsługę liczników wytwórcy (Uwaga: zmiana struktury JSON)
## v0.1.2 [2023-10-23]
- Dodano obsługę trybu serwisowego aplikacji Mój Licznik
- Poprawiono logowanie błędów
- W przypadku problemów z logowaniem liczniki w aplikacji pozostają aktywne
## v0.1.1 [2023-10-22]
- Dodano obsługę błędnego logowania
## v0.1.0 [2023-10-21]
- Wersja beta

237
src.dev/api.py Normal file
View File

@@ -0,0 +1,237 @@
from peewee import SqliteDatabase
from flask import Flask, jsonify, request, redirect, url_for, abort
from waitress import serve
#from datetime
import datetime
import time, os, logging
from moj_licznik import PPETable, MeterTable, CounterTable, MainChartTable
import urllib.parse
logger = logging.getLogger("energaMeter.api")
path = os.path.dirname(os.path.abspath(__file__))
db_file = 'database.sqlite'
db = SqliteDatabase(os.path.join(path, db_file))
app = Flask(__name__)
@app.route('/', methods=['GET'])
def root():
query = PPETable.select() #.where(PPETable.is_active == True)
result_ppes = list(query)
ppes = []
for p in result_ppes:
meters_query = MeterTable.select().where(MeterTable.ppe_id == p.id)
meter_result = meters_query.execute()
meters = []
for meter in meter_result:
countners_query = CounterTable.select().where(CounterTable.meter_id == meter.id)
countners_result = countners_query.execute()
countners = []
for countner in countners_result:
countner = {
'tariff': countner.tariff,
'measurement_date': countner.measurement_date,
'meter_value': countner.meter_value
}
countners.append(countner)
meter = {
'meter_type': meter.meter_type,
'meter_type_url': urllib.parse.quote_plus(meter.meter_type),
'last_update_date': meter.last_update_date,
'first_date': meter.first_date,
'countners': countners
}
meters.append(meter)
ppe = {
'name': p.name,
'id': p.id,
'type': p.type,
'isActive': p.is_active,
'meters': meters
}
ppes.append(ppe)
logger.debug("API: GET /")
return jsonify({'ppes': ppes})
@app.route('/meters', methods=['GET'])
@app.route('/meters/', methods=['GET'])
def meters():
query = PPETable.select() #.where(PPETable.is_active == True)
result_ppes = list(query)
meters = []
for p in result_ppes:
meter = {
'name': p.name,
'id': p.id,
'ppe': p.ppe,
'number_of_zones': '',#p.number_of_zones,
'tariffCode': p.tariffCode,
# 'first_date': p.first_date,
'last_update_date': p.last_update_date,
# 'measurement_date': p.measurement_date,
}
for i in range(1, p.number_of_zones + 1):
zone_key = f'zone{i}'
daily_chart_key = f'zone{i}_daily_chart_sum'
zone_value = getattr(p, zone_key)
daily_chart_value = getattr(p, daily_chart_key)
# Zamień None na zero podczas obliczania sumy
zone_value = float(zone_value) if zone_value is not None else 0
daily_chart_value = float(daily_chart_value) if daily_chart_value is not None else 0
meter[zone_key] = {
'meter': zone_value,
'daily_chart': daily_chart_value,
'sum': zone_value + daily_chart_value
}
meters.append(meter)
logger.debug("GET /meters")
return jsonify({'meters': meters})
@app.route('/<int:ppe_id>', methods=['GET'])
@app.route('/<int:ppe_id>/', methods=['GET'])
def get_ppe(ppe_id):
meters_query = MeterTable.select().where(MeterTable.ppe_id == ppe_id)
meter_result = meters_query.execute()
if not meter_result:
abort(404)
meters = []
for meter in meter_result:
countners_query = CounterTable.select().where(CounterTable.meter_id == meter.id)
countners_result = countners_query.execute()
countners = []
for countner in countners_result:
countner = {
'tariff': countner.tariff,
'measurement_date': countner.measurement_date,
'meter_value': countner.meter_value
}
countners.append(countner)
meter = {
'meter_type': meter.meter_type,
'meter_type_url': urllib.parse.quote_plus(meter.meter_type),
'last_update_date': meter.last_update_date,
'first_date': meter.first_date,
'countners': countners
}
meters.append(meter)
logger.debug(f"API: GET /{ppe_id}")
return jsonify({'meters': meters})
@app.route('/<int:ppe_id>/<meter_type_url>', methods=['GET'])
@app.route('/<int:ppe_id>/<meter_type_url>/', methods=['GET'])
def get_meter_type(ppe_id, meter_type_url):
meter_type = urllib.parse.unquote(meter_type_url)
meters_query = MeterTable.select().where((MeterTable.ppe_id == str(ppe_id)) & (MeterTable.meter_type == meter_type))
#meters_query = MeterTable.select().where(MeterTable.ppe_id == str(ppe_id) & MeterTable.meter_type == meter_type)
meter_result = meters_query.execute()
if not meter_result:
abort(404)
meter = meter_result[0]
countners_query = CounterTable.select().where(CounterTable.meter_id == meter.id)
countners_result = countners_query.execute()
countners = []
for countner in countners_result:
countner = {
'tariff': countner.tariff,
'measurement_date': countner.measurement_date,
'meter_value': countner.meter_value
}
countners.append(countner)
meter = {
'meter_type': meter.meter_type,
'meter_type_url': urllib.parse.quote_plus(meter.meter_type),
'last_update_date': meter.last_update_date,
'first_date': meter.first_date,
'countners': countners
}
logger.debug(f"API: GET /{ppe_id}/{meter_type_url}")
return jsonify({'meter': meter})
@app.route('/<int:ppe_id>/<meter_type_url>/<tariff>', methods=['GET'])
@app.route('/<int:ppe_id>/<meter_type_url>/<tariff>/', methods=['GET'])
def get_countners(ppe_id, meter_type_url, tariff):
meter_type = urllib.parse.unquote(meter_type_url)
meters_query = MeterTable.select().where((MeterTable.ppe_id == str(ppe_id)) & (MeterTable.meter_type == meter_type))
meter_result = meters_query.execute()
if meter_result.count < 1:
abort(404)
meter = meter_result[0]
countners_query = CounterTable.select().where((CounterTable.meter_id == meter.id) & (CounterTable.tariff == tariff))
countners_result = countners_query.execute()
countner = countners_result[0]
countner = {
'tariff': countner.tariff,
'measurement_date': countner.measurement_date,
'meter_value': countner.meter_value
}
logger.debug(f"API: GET /{ppe_id}/{meter_type_url}/{countner}")
return jsonify({'countner': countner})
@app.route('/charts', methods=['GET'])
@app.route('/charts/', methods=['GET'])
def charts():
current_time = datetime.datetime.now()
current_time_unix = time.mktime(current_time.timetuple())
start_time = current_time - datetime.timedelta(days=1)
start_time_unix = time.mktime(start_time.timetuple())
start_date = request.args.get('start_date', start_time_unix*1000)
end_date = request.args.get('end_date', current_time_unix*1000)
mp = request.args.get('mp', None)
meter_type = request.args.get('meter_type', None)
zone = request.args.get('zone', None)
query = MainChartTable.select().where((MainChartTable.tm >= int(start_date)) & (MainChartTable.tm <= int(end_date)))
if mp:
query = query.where(MainChartTable.mp == mp)
if meter_type:
query = query.where(MainChartTable.meter_type == meter_type)
if zone:
query = query.where(MainChartTable.zone == zone)
result_ppes = list(query)
charts = []
for p in result_ppes:
czas = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(p.tm/1000))
chart = {
'mp': p.mp,
'meter_type': p.meter_type,
'meter_type_url': urllib.parse.quote_plus(p.meter_type),
'zone': p.zone,
'time_tm': p.tm,
'time': czas,
'value': p.value
}
charts.append(chart)
end_time = time.time()
logger.debug(f"API: GET /charts - {start_date} - {end_date}")
return jsonify({'charts': charts})

22
src.dev/config.yaml Normal file
View File

@@ -0,0 +1,22 @@
name: "Energa meter"
description: "Energa meter addon"
version: "0.1.3"
slug: "energa_meter"
init: false
options:
energa_username: ""
energa_password: ""
log_level: "INFO"
schema:
energa_username: str
energa_password: password
log_level: str
arch:
- aarch64
- amd64
- armhf
- armv7
- i386
startup: services
ports:
8000/tcp: 8000

0
src.dev/cron.py Normal file
View File

16
src.dev/log_config.py Normal file
View File

@@ -0,0 +1,16 @@
import logging
def configure_logging(l_level, logger_name):
log_level = getattr(logging, l_level.upper(), logging.INFO)
logging.basicConfig(level=log_level, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler = logging.StreamHandler()
console_handler.setLevel(log_level)
# Dodanie nazwy loggera do formatowania
formatter = logging.Formatter('%(asctime)s - {} - %(levelname)s - %(message)s'.format(logger_name))
console_handler.setFormatter(formatter)
#logging.getLogger().addHandler(console_handler)
print("Logowanie zostało skonfigurowane.")

599
src.dev/moj_licznik.py Normal file
View File

@@ -0,0 +1,599 @@
from peewee import SqliteDatabase
from datetime import datetime, timedelta, date
import calendar, requests, re, time, json, os, logging
import http.cookiejar as cookiejar
from requests.exceptions import HTTPError
from bs4 import BeautifulSoup
from enum import Enum
from peewee import AutoField, Model, CharField, IntegerField, DateField, BooleanField, CompositeKey, DecimalField, ForeignKeyField, SQL
import urllib.parse
logger = logging.getLogger("energaMeter")
path = os.path.dirname(os.path.abspath(__file__))
db_file = 'database.sqlite'
db = SqliteDatabase(os.path.join(path, db_file))
class ChartType(Enum):
DAY = "DAY"
MONTH = "MONTH"
YEAR = "YEAR"
class PPETable(Model):
id = CharField(primary_key=True)
ppe = CharField(unique=True)
tariffCode = CharField()
type = CharField()
name = CharField()
last_update_date = DateField(null=True)
is_active = BooleanField(default=True)
class Meta:
database = db
table_name = 'PPE'
constraints = [SQL('UNIQUE (ppe, tariffCode)')]
class MeterTable(Model):
id = AutoField() # Meter point
ppe_id = ForeignKeyField(PPETable, backref='zones')
meter_type = CharField()
last_update_date = DateField(null=True)
first_date = DateField(null=True)
class Meta:
database = db
table_name = 'METER'
constraints = [SQL('UNIQUE (ppe_id, meter_type)')]
class CounterTable(Model):
id = AutoField()
meter_id = ForeignKeyField(MeterTable, backref='meter')
tariff = CharField()
measurement_date = DateField(null=True)
meter_value = DecimalField(max_digits=15, decimal_places=5, null=True)
class Meta:
database = db
table_name = 'COUNTER'
# class CounterTable(Model):
# meter_id = ForeignKeyField(MeterTable, backref='zones')
# measurement_date = DateField(null=True)
# meter_value = DecimalField(max_digits=15, decimal_places=5, null=True)
# class Meta:
# database = db
class ChartTable(Model):
id = IntegerField()
meter_type = CharField()
year = IntegerField()
month = IntegerField(null=True)
day = IntegerField(null=True)
value =CharField()
class Meta:
database = db
table_name = 'CHART_CACHE'
primary_key = CompositeKey('id', 'year', 'month', 'day')
class MainChartTable(Model):
mp = CharField()
meter_type = CharField()
zone = IntegerField()
tm = IntegerField()
value = DecimalField(max_digits=20, decimal_places=16, null=True)
tarAvg = DecimalField(max_digits=20, decimal_places=16, null=True)
est = BooleanField(default=False)
cplt = BooleanField(default=False)
class Meta:
database = db
table_name = 'CHART'
primary_key = CompositeKey('mp', 'zone', 'tm')
def znajdz_typ_odbiorcy(element):
typ_odbiorcy = ''
div_elements = element.find_all('div', recursive=False)
for div_element in div_elements:
typ_span = div_element.find('span', text='Typ')
if typ_span:
typ_odbiorcy = typ_span.next_sibling.strip()
return typ_odbiorcy
typ_odbiorcy = znajdz_typ_odbiorcy(div_element) # Rekurencyjne przeszukiwanie zagnieżdżonych div
return typ_odbiorcy
def findCountners(page):
table = page.find('table')
countner_type_list = ["A-", "A+"]
data_list = []
# Jeśli znaleźliśmy tabelę, możemy przeszukać jej wiersze i komórki
if table:
for row in table.find_all('tr'):
cells = row.find_all('td')
if len(cells) > 1:
# Pobieramy opis z pierwszej komórki
description = cells[0].text.strip()
# Pomijamy, jeśli rodzaj_licznika jest pusty
if not description:
continue
# Usuwamy datę z opisu
description_parts = description.split('\n')
meter_type = description_parts[0][:2].strip()
if meter_type not in countner_type_list:
continue
tariff0 = description_parts[0][2:].strip()
tariff = ''.join(filter(str.isdigit, tariff0))
measurement_date = description_parts[1].strip()
# Pobieramy dane liczbowe z drugiej komórki
data = cells[1].text.strip()
# Usuwamy znaki nowej linii i spacje z danych
data = data.replace('\n', '').replace(' ', '')
data = data.replace(',', '.')
# Dzielimy dane na część całkowitą i część dziesiętną
parts = data.split('.')
if len(parts) == 2:
integer_part = parts[0]
decimal_part = parts[1]
else:
integer_part = parts[0]
decimal_part = '0'
data_dict = {
"meter_type": meter_type,
"tariff": tariff,
"measurement_date": measurement_date,
"meter_value": f"{integer_part}.{decimal_part}"
}
data_list.append(data_dict)
return data_list
class MojLicznik:
session = requests.Session()
session.cookies = cookiejar.LWPCookieJar(filename='cookies.txt')
meter_url = "https://mojlicznik.energa-operator.pl"
def databaseInit(self):
db.create_tables([PPETable], safe=True)
db.create_tables([MeterTable], safe=True)
db.create_tables([CounterTable], safe=True)
db.create_tables([ChartTable], safe=True)
db.create_tables([MainChartTable], safe=True)
def __init__(self):
self.username = None
self.password = None
self.loginStatus = False
self.data = [] # Change self.data to a list
self.ppes = []
self.databaseInit()
def login(self, _username, _password):
self.username = _username
self.password = _password
login_url = f"{self.meter_url}/dp/UserLogin.do"
self.loginStatus = False
try:
logger.debug("Pobieram formularz logowania.")
response = self.session.get(login_url)
response.raise_for_status()
if response.url == 'https://mojlicznik.energa-operator.pl/maintenance.html':
logger.critical("Trwają prace serwisowe w Mój Licznik. Logowanie nie jest możliwe, spróbuj później.")
return
except HTTPError as e:
logger.error(f"Wystąpił błąd HTTP: {e}")
soup = BeautifulSoup(response.text, 'html.parser')
csrf_token = soup.find('input', {'name': '_antixsrf'})['value']
login_data = {
'j_username': self.username,
'j_password': self.password,
'selectedForm': '1',
'save': 'save',
'clientOS': 'web',
'_antixsrf': csrf_token
}
try:
response = self.session.post(login_url, data=login_data)
response.raise_for_status()
except HTTPError as e:
logger.error(f"Wystąpił błąd HTTP: {e}")
soup = BeautifulSoup(response.text, 'html.parser')
login_error_text = 'Użytkownik lub hasło niepoprawne'
login_error = soup.find('div', text=login_error_text)
if login_error:
logger.critical(login_error_text)
return
else:
self.loginStatus = True
logger.info(f"Zalogowano")
body = soup.find('body')
type_value = znajdz_typ_odbiorcy(body)
logger.debug(f"Typ umowy: {type_value}.")
select_elements = soup.find_all('script', type='text/javascript')
meter_isd = []
for el in select_elements:
pattern = r"id:\s+(\d+),[\s\S]*?ppe:\s+'([\d\s]+)',[\s\S]*?tariffCode:\s+'([^']+)',[\s\S]*?name:\s+'([^']+)'"
matches = re.search(pattern, el.text)
if matches:
id_value = matches.group(1)
ppe_value = matches.group(2)
tariffCode_value = matches.group(3)
name_value = matches.group(4)
meter_isd.append(id_value)
retrieved_record = PPETable.get_or_none(id=id_value)
if retrieved_record:
logger.info(f"Licznik {id_value} istnieje w systemie.")
if not retrieved_record.is_active:
retrieved_record.is_active = True
retrieved_record.save()
else:
logger.info(f"Licznik {id_value} nie istnieje w systemie, zostanie dodany.")
data = PPETable.create(
id=id_value,
ppe=ppe_value,
tariffCode=tariffCode_value,
type=type_value,
name=name_value
)
update_query = PPETable.update(is_active=0).where(PPETable.id.not_in(meter_isd))
update_query.execute()
def logout(self):
logout_url = f"{self.meter_url}/dp/MainLogout.go"
try:
response = self.session.get(logout_url)
response.raise_for_status()
self.loginStatus = False
logger.info(f"Wylogowano.")
except HTTPError as e:
logger.error(f"Wystąpił błąd HTTP: {e}")
def update_countners(self):
query = PPETable.select().where(PPETable.is_active == True)
result_ppes = query.execute()
for p in result_ppes:
meter_url = f"{self.meter_url}/dp/UserData.do?mpc={p.id}&ppe={p.ppe}"
try:
response = self.session.get(meter_url)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
countners_dict = findCountners(soup)
for c in countners_dict:
mn, mu = MeterTable.get_or_create(ppe_id=p.id, meter_type=c['meter_type'])
mn.last_update_date = datetime.now()
mn.save()
cn, cu = CounterTable.get_or_create(
meter_id = mn.id,
tariff=c['tariff']
)
cn.meter_value = c['meter_value']
cn.measurement_date = c['measurement_date']
cn.save()
logger.info(f"Zapisano stan licznika {c['meter_type']} taryfa {c['tariff']} z dnia: {c['measurement_date']} : {c['meter_value']}")
except HTTPError as e:
logger.error(f"Wystąpił błąd HTTP: {e}")
def update_first_date(self):
ppes_query = PPETable.select().where(PPETable.is_active == True)
result_ppes = ppes_query.execute()
for p in result_ppes:
meters_query = MeterTable.select().where((MeterTable.ppe_id == p.id) & (MeterTable.first_date.is_null(True)))
meters_result = meters_query.execute()
for meter in meters_result:
meter_type = meter.meter_type
print(f"Szukam najstarsze dane historyczne licznika {p.name} (PPE: {p.ppe}, {p.id}) typ: {meter_type}")
meter_point = p.id
max_years_back = 5
start_date = datetime.now()
last_chart_year = None
for n in range(max_years_back + 1):
first_day_of_year = datetime(start_date.year-n, 1, 1)
data_json = self.download_chart(ChartType.YEAR, first_day_of_year, meter_point, meter_type)
if data_json:
data = json.loads(data_json)
if data and data.get("mainChart") and len(data["mainChart"]) > 0:
last_chart_year = first_day_of_year.year
last_chart_month = None
max_month = 12
for n in range(max_month, 0, -1):
first_day_of_month = datetime(last_chart_year, n, 1)
data_json = self.download_chart(ChartType.MONTH, first_day_of_month, meter_point, meter_type)
if data_json:
data = json.loads(data_json)
if data and data.get("mainChart") and len(data["mainChart"]) > 0:
last_chart_month = n
last_chart_day = None
max_day = 31
first_day_of_day = datetime(last_chart_year, last_chart_month, 1)
_, max_day = calendar.monthrange(first_day_of_day.year, first_day_of_day.month)
for n in range(max_day, 0, -1):
first_day_of_day = datetime(last_chart_year, last_chart_month, n)
data_json = self.download_chart(ChartType.DAY, first_day_of_day, meter_point, meter_type)
if data_json:
data = json.loads(data_json)
if data and data.get("mainChart") and len(data["mainChart"]) > 0:
last_chart_day = n
first_date = datetime(last_chart_year, last_chart_month, last_chart_day).date()
print(f"Najstarsze dane historyczne dla licznika {p.name} (PPE: {p.ppe}, {p.id}) typ: {meter_type}: {first_date}")
meter.first_date = first_date
meter.save()
def save_main_charts(self, mp, vals, meter_type):
for val in vals:
#try:
z = val["zones"]
if z[0]:
# MainChartTable.get_or_create(tm = val["tm"], zone = 1, value = z[0], tarAvg=val["tarAvg"], est=val["est"], cplt=val["cplt"])
try:
existing_record = MainChartTable.get((MainChartTable.meter_type == meter_type) & (MainChartTable.mp == mp) & (MainChartTable.tm == val["tm"]) & (MainChartTable.zone == 1))
except MainChartTable.DoesNotExist:
# Jeśli rekord nie istnieje, utwórz nowy
MainChartTable.create(
mp=mp,
meter_type=meter_type,
tm=val["tm"],
zone=1,
value=z[0],
tarAvg=val["tarAvg"],
est=val["est"],
cplt=val["cplt"]
)
if z[1]:
try:
existing_record = MainChartTable.get((MainChartTable.meter_type == meter_type) & (MainChartTable.mp == mp) & (MainChartTable.tm == val["tm"]) & (MainChartTable.zone == 2))
except MainChartTable.DoesNotExist:
# Jeśli rekord nie istnieje, utwórz nowy
MainChartTable.create(
mp=mp,
meter_type=meter_type,
tm=val["tm"],
zone=2,
value=z[1],
tarAvg=val["tarAvg"],
est=val["est"],
cplt=val["cplt"]
)
if z[2]:
try:
existing_record = MainChartTable.get((MainChartTable.meter_type == meter_type) & (MainChartTable.mp == mp) & (MainChartTable.tm == val["tm"]) & (MainChartTable.zone == 1))
except MainChartTable.DoesNotExist:
# Jeśli rekord nie istnieje, utwórz nowy
MainChartTable.create(
mp=mp,
meter_type=meter_type,
tm=val["tm"],
zone=3,
value=z[2],
tarAvg=val["tarAvg"],
est=val["est"],
cplt=val["cplt"]
)
#except:
# pass
return None
def download_chart(self, type, date, meter_point, meter_type, update_mode=False):
if type == ChartType.DAY:
chart_type = "DAY"
first_day = datetime(date.year, date.month, date.day)
tsm_date = int(time.mktime(first_day.timetuple()) * 1000)
if type == ChartType.MONTH:
chart_type = "MONTH"
first_day = datetime(date.year, date.month, 1)
tsm_date = int(time.mktime(first_day.timetuple()) * 1000)
if type == ChartType.YEAR:
chart_type = "YEAR"
first_day = datetime(date.year, 1, 1)
tsm_date = int(time.mktime(first_day.timetuple()) * 1000)
# meter_type = 'A+'
chart_url = f"{self.meter_url}/dp/resources/chart?mainChartDate={tsm_date}&type={chart_type}&meterPoint={meter_point}&mo={urllib.parse.quote_plus(meter_type)}"
try:
response = self.session.get(chart_url)
data = json.loads(response.text)
response.raise_for_status()
if data["response"]:
id = data["response"]["meterPoint"]
mainChartDate = data["response"]["mainChartDate"]
mainChart = data["response"]["mainChart"]
if type == ChartType.DAY:
self.save_main_charts(meter_point, mainChart, meter_type)
date = int(mainChartDate) / 1000
month = None
day = None
dt = datetime.fromtimestamp(date)
year = dt.year
if type == ChartType.MONTH:
month = dt.month
if type == ChartType.DAY:
month = dt.month
day = dt.day
json_dump = json.dumps(data["response"], ensure_ascii=False)
if update_mode:
try:
chart_record = ChartTable.get(id=id,year=year, month=month, day=day)
chart_record.value = json_dump
chart_record.save()
except ChartTable.DoesNotExist:
chart_record = ChartTable.create(id=id, meter_type=meter_type, value=json_dump, year=year, month=month, day=day)
else:
try:
ChartTable.create(id=id, meter_type=meter_type, value=json_dump, year=year, month=month, day=day)
except:
pass
return json_dump
return None
except HTTPError as e:
logger.error(f"Wystąpił błąd HTTP: {e}")
def download_charts(self, full_mode=False):
query = PPETable.select().where(PPETable.is_active == True)
result_ppes = query.execute()
for p in result_ppes:
meters_query = MeterTable.select().where((MeterTable.ppe_id == p.id)) # // & (MeterTable.first_date.is_null(True)))
meters_result = meters_query.execute()
for meter in meters_result:
meter_type = meter.meter_type
logger.info(f"Pobieram dane historyczne dla {p.name} ({p.id}) typ: {meter_type}")
current_date = meter.first_date
if not full_mode:
current_date = meter.last_update_date - timedelta(days=1)
while current_date <= date.today():
try:
record = ChartTable.get(id=p.id, meter_type=meter_type, year=current_date.year, month=current_date.month, day=current_date.day)
# Jeśli rekord o określonych wartościach klucza głównego istnieje, zostanie pobrany.
logger.debug(f"Posiadam dane historyczne dla {p.name} ({p.id}) typ: {meter_type} na dzień: {current_date}")
except ChartTable.DoesNotExist:
self.download_chart(ChartType.DAY, current_date, p.id, meter_type)
logger.debug(f"Pobieram dane historyczne dla {p.name} ({p.id}) typ: {meter_type} na dzień: {current_date}")
current_date += timedelta(days=1)
def update_last_days(self):
today = datetime.today().date()
query = PPETable.select().where(PPETable.is_active == True)
result_ppes = query.execute()
for p in result_ppes:
meters_query = MeterTable.select().where((MeterTable.ppe_id == p.id) & (MeterTable.first_date.is_null(True)))
meters_result = meters_query.execute()
for meter in meters_result:
meter_type = meter.meter_type
logger.info(f"Aktualizacja danych bieżących dla {p.name} ({p.id}) typ: {meter_type}")
if not p.last_update_date:
p.last_update_date = today - timedelta(days=5)
p.save()
last_update_date = p.last_update_date - timedelta(days=1)
while last_update_date <= today:
logger.debug(f"Aktualizacja danych dla {p.name} ({p.id}) typ: {meter_type} na dzień: {last_update_date}")
self.download_chart(ChartType.DAY, last_update_date, p.id, meter_type, True)
p.last_update_date = last_update_date
p.save()
last_update_date += timedelta(days=1)
def get_current_meters(self, add_daily_char_data=False):
query = PPETable.select().where(PPETable.is_active == True)
result_ppes = query.execute()
for p in result_ppes:
if add_daily_char_data:
query = ChartTable.select().where((ChartTable.id == p.id) & (ChartTable.year == p.measurement_date.year) & (ChartTable.month == p.measurement_date.month) & (ChartTable.day == p.measurement_date.day))
query_count = query.count()
if (query_count > 0):
query_first = query.first()
value_json = json.loads(query_first.value)
print(query_first.value)
zones = value_json.get("zones", [])
if zones:
zone1_data = zones[0]
zone1_main_chart = zone1_data.get("mainChart", [])
# def set_daily_zones(self):
# query = PPETable.select().where(PPETable.is_active == True)
# result_ppes = query.execute()
# for p in result_ppes:
# query = ChartTable.select().where(
# (ChartTable.id == p.id) &
# ((ChartTable.year > p.measurement_date.year) |
# ((ChartTable.year == p.measurement_date.year) &
# (ChartTable.month > p.measurement_date.month)) |
# ((ChartTable.year == p.measurement_date.year) &
# (ChartTable.month == p.measurement_date.month) &
# (ChartTable.day >= p.measurement_date.day))
# ))
# zones_sums = {f"zone{i+1}_daily_chart_sum": 0.0 for i in range(3)}
# for chart_entry in query:
# value_json = json.loads(chart_entry.value)
# main_chart = value_json.get("mainChart", [])
# for entry in main_chart:
# zones = entry.get("zones", [])
# for i, value in enumerate(zones):
# if value is not None:
# zones_sums[f"zone{i+1}_daily_chart_sum"] += value
# for key, value in zones_sums.items():
# setattr(p, key, value)
# p.save()
def print_summary_zones(self):
query = PPETable.select().where(PPETable.is_active == True)
result_ppes = query.execute()
for p in result_ppes:
zon1 = (p.zone1 if p.zone1 is not None else 0 ) + (p.zone1_daily_chart_sum if p.zone1_daily_chart_sum is not None else 0)
zon2 = (p.zone2 if p.zone2 is not None else 0 ) + (p.zone2_daily_chart_sum if p.zone2_daily_chart_sum is not None else 0)
zon3 = (p.zone3 if p.zone3 is not None else 0 ) + (p.zone3_daily_chart_sum if p.zone3_daily_chart_sum is not None else 0)
print(f"{p.name} : {round(zon1, 5)} "
f"{round(zon2,5)} "
f"{round(zon3,5)}")
def get_current_meters_list(self):
query = PPETable.select().where(PPETable.is_active == True)
return query.execute()
def get_current_meter_value(self, meter_id, zone):
if zone == "zone1":
pPETable = PPETable.get(PPETable.id == meter_id)
return pPETable.zone1
if zone == "zone2":
pPETable = PPETable.get(PPETable.id == meter_id)
return pPETable.zone2
if zone == "zone3":
pPETable = PPETable.get(PPETable.id == meter_id)
return pPETable.zone3
return None

BIN
src.dev/requirements.txt Normal file

Binary file not shown.

96
src.dev/run.py Normal file
View File

@@ -0,0 +1,96 @@
import threading
import time, os, datetime
from api import app
from waitress import serve
import logging, configparser
from pathlib import Path
from moj_licznik import MojLicznik
from log_config import configure_logging
startup_task_completed = threading.Event()
def http_server():
serve(app, host="0.0.0.0", port=8000, threads=8)
# Jednorazowe zadanie przy starcie
def startup_task():
mojLicznik = MojLicznik()
logger.info("Rozpoczynam logowanie do Mój licznik.")
logger.debug(f"Logowanie użytkownika {username}.")
mojLicznik.login(username, password)
if mojLicznik.loginStatus:
logger.info(f"Aktualizacja liczników...")
mojLicznik.update_countners()
logger.info(f"Wyszukiwanie najstarszych danych...")
mojLicznik.update_first_date()
logger.info(f"Pobieranie danych...")
mojLicznik.download_charts(full_mode=True)
mojLicznik.update_last_days()
#mojLicznik.set_daily_zones()
logger.debug(f"Wylogowanie użytkownika.")
mojLicznik.logout()
startup_task_completed.set()
# Moduł 3: Cykliczne zadanie
def periodic_task():
startup_task_completed.wait()
while True:
try:
waiting_seconds = 600
logger.info(f"Oczekianie...")
logger.debug(f"Czekam {waiting_seconds} sekund.")
time.sleep(waiting_seconds)
mojLicznik = MojLicznik()
logger.info(f"Update...{datetime.datetime.now()}")
logger.info(f"Logowanie...")
mojLicznik.login(username, password)
if mojLicznik.loginStatus:
logger.info(f"Aktualizacja danych bieżących...")
mojLicznik.update_countners()
mojLicznik.update_last_days()
mojLicznik.download_charts(full_mode=False)
# mojLicznik.set_daily_zones()
mojLicznik.logout()
except:
logger.error("PT001: Błąd aktualizacji danych...")
# Uruchomienie wątków dla każdego modułu
if __name__ == "__main__":
plik = Path('config.ini')
username = None
password = None
log_level = None
if plik.is_file():
config = configparser.ConfigParser()
config.read("config.ini")
username = config.get("Credentials", "username")
password = config.get("Credentials", "password")
log_level = config.get("Logger", "log_level")
else:
username = os.getenv("USERNAME")
password = os.getenv("PASSWORD")
log_level = os.getenv("LOGLEVEL")
logger_name = "energaMeter"
configure_logging(log_level, logger_name)
logger = logging.getLogger(logger_name)
peewee_logger = logging.getLogger('peewee')
peewee_logger.setLevel(logging.ERROR) # Ustaw poziom na ERROR lub inny poziom, który jest wyższy niż ustawiony w configure_logging
logger.info("Inicjalizacja OK.")
http_server_thread = threading.Thread(target=http_server)
startup_task_thread = threading.Thread(target=startup_task)
periodic_task_thread = threading.Thread(target=periodic_task)
http_server_thread.start()
startup_task_thread.start()
periodic_task_thread.start()
http_server_thread.join()
startup_task_thread.join()
periodic_task_thread.join()

9
src.dev/run.sh Normal file
View File

@@ -0,0 +1,9 @@
#!/usr/bin/with-contenv bashio
export USERNAME=$(bashio::config 'energa_username')
export PASSWORD=$(bashio::config 'energa_password')
export LOGLEVEL=$(bashio::config 'log_level')
bashio::log.info "Uruchamiam API"
python run.py