-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsensor.py
64 lines (52 loc) · 1.79 KB
/
sensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import requests
import re
from datetime import timedelta
from homeassistant.util import Throttle
from homeassistant.components.sensor import (
SensorEntity,
SensorDeviceClass,
SensorStateClass
)
from homeassistant.const import UnitOfEnergy # Ensure using updated constants
# Define the minimum time interval between updates
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
def scrape_solar_data(url):
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
# Use regular expression to find the solar energy data
match = re.search(r"Solar Generated Today: (\d+\.\d+) kWh", response.text)
if match:
return float(match.group(1))
except Exception as e:
return None
class APSystemsSensor(SensorEntity):
def __init__(self, ip_address):
self._state = None
self._ip_address = ip_address
self._name = "APSystems Power"
@property
def name(self):
return self._name
@property
def unique_id(self):
return f"apsystems_power_{self._ip_address}"
@property
def device_class(self):
return SensorDeviceClass.ENERGY
@property
def state_class(self):
return SensorStateClass.TOTAL_INCREASING
@property
def unit_of_measurement(self):
return UnitOfEnergy.KILO_WATT_HOUR
@property
def state(self):
return self._state
@Throttle(MIN_TIME_BETWEEN_UPDATES)
async def async_update(self):
url = f"http://{self._ip_address}/index.php/realtimedata/power_graph"
self._state = await self.hass.async_add_executor_job(scrape_solar_data, url)
async def async_setup_entry(hass, entry, async_add_entities):
ip_address = entry.data["ip_address"]
async_add_entities([APSystemsSensor(ip_address)], True)