-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathEastron_device.py
93 lines (69 loc) · 2.43 KB
/
Eastron_device.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from copy import copy
import dbus
from functools import partial
from pymodbus.client import *
from pymodbus.register_read_message import ReadHoldingRegistersResponse, ReadInputRegistersResponse
import logging
import os
import time
import traceback
import device
from settingsdevice import SettingsDevice
from vedbus import VeDbusService
import __main__
from register import Reg
from utils import *
log = logging.getLogger()
class ModbusDevice(device.ModbusDevice):
min_timeout = 0.1
def read_data_regs(self, regs, d):
now = time.time()
if all(now - r.time < r.max_age for r in regs):
return
start = regs[0].base
count = regs[-1].base + regs[-1].count - start
rr = self.modbus.read_input_registers(start, count, unit=self.unit)
latency = time.time() - now
if rr.isError():
log.error('Error reading registers %#04x-%#04x: %s',
start, start + count - 1, rr)
raise Exception(rr)
for reg in regs:
base = reg.base - start
end = base + reg.count
if now - reg.time > reg.max_age:
if reg.decode(rr.registers[base:end]):
d[reg.name] = copy(reg) if reg.isvalid() else None
reg.time = now
return latency
class CustomName(device.CustomName):
pass
class EnergyMeter(ModbusDevice):
allowed_roles = ['grid', 'pvinverter', 'genset', 'acload']
default_role = 'grid'
default_instance = 40
nr_phases = None
def position_setting_changed(self, service, path, value):
self.dbus['/Position'] = value['Value']
def init_device_settings(self, dbus):
super().init_device_settings(dbus)
self.pos_item = None
if self.role == 'pvinverter':
self.pos_item = self.settings.addSetting(
self.settings_path + '/Position', 0, 0, 2,
callback=self.position_setting_changed)
def device_init_late(self):
super().device_init_late()
if self.pos_item is not None:
self.dbus.add_path('/Position', self.pos_item.get_value(),
writeable=True,
onchangecallback=self.position_changed)
def position_changed(self, path, val):
if not 0 <= val <= 2:
return False
self.pos_item.set_value(val)
return True
__all__ = [
'EnergyMeter',
'ModbusDevice',
]