Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use timestamp device class #27

Merged
merged 4 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion custom_components/sensus_analytics/manifest.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"domain": "sensus_analytics",
"name": "Sensus Analytics Integration",
"version": "1.4.3",
"version": "1.4.5",
"documentation": "https://github.com/zestysoft/sensus_analytics_integration",
"dependencies": [],
"codeowners": ["@zestysoft"],
Expand Down
113 changes: 57 additions & 56 deletions custom_components/sensus_analytics/sensor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Sensor platform for the Sensus Analytics Integration."""

from homeassistant.components.sensor import SensorEntity
from homeassistant.components.sensor import SensorDeviceClass, SensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import DeviceInfo
Expand Down Expand Up @@ -34,7 +34,24 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_e
)


class DynamicUnitSensorBase(CoordinatorEntity, SensorEntity):
class UsageConversionMixin:
"""Mixin to provide usage conversion."""

# pylint: disable=too-few-public-methods
def _convert_usage(self, usage):
"""Convert usage based on configuration and native unit."""
if usage is None:
return None
usage_unit = self.coordinator.data.get("usageUnit")
if usage_unit == "CF" and self.coordinator.config_entry.data.get("unit_type") == "G":
try:
return round(float(usage) * CF_TO_GALLON)
except (ValueError, TypeError):
return None
return usage


class DynamicUnitSensorBase(UsageConversionMixin, CoordinatorEntity, SensorEntity):
"""Base class for sensors with dynamic units."""

def __init__(self, coordinator, entry):
Expand All @@ -50,15 +67,6 @@ def __init__(self, coordinator, entry):
model="Water Meter",
)

def _convert_usage(self, usage):
"""Convert usage based on configuration and native unit."""
if usage is None:
return None
usage_unit = self.coordinator.data.get("usageUnit")
if usage_unit == "CF" and self.coordinator.config_entry.data.get("unit_type") == "G":
return round(float(usage) * CF_TO_GALLON)
return usage

def _get_usage_unit(self):
"""Determine the unit of measurement for usage sensors."""
usage_unit = self.coordinator.data.get("usageUnit")
Expand All @@ -72,22 +80,25 @@ def native_unit_of_measurement(self):
return self._get_usage_unit()


class StaticUnitSensorBase(CoordinatorEntity, SensorEntity):
class StaticUnitSensorBase(UsageConversionMixin, CoordinatorEntity, SensorEntity):
"""Base class for sensors with static units."""

def __init__(self, coordinator, entry, unit):
def __init__(self, coordinator, entry, unit=None, device_class=None):
"""Initialize the static unit sensor base."""
super().__init__(coordinator)
self.coordinator = coordinator
self.entry = entry
self._unique_id = f"{DOMAIN}_{entry.entry_id}"
self._attr_native_unit_of_measurement = unit
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, entry.entry_id)},
name=DEFAULT_NAME,
manufacturer="Unknown",
model="Water Meter",
)
if unit:
self._attr_native_unit_of_measurement = unit
if device_class:
self._attr_device_class = device_class


class SensusAnalyticsDailyUsageSensor(DynamicUnitSensorBase):
Expand Down Expand Up @@ -143,8 +154,8 @@ class SensusAnalyticsLastReadSensor(StaticUnitSensorBase):

def __init__(self, coordinator, entry):
"""Initialize the last read sensor."""
super().__init__(coordinator, entry, unit="UTC")
self._attr_name = f"{DEFAULT_NAME} Last Read"
super().__init__(coordinator, entry, unit=None, device_class=SensorDeviceClass.TIMESTAMP)
self._attr_name = f"{DEFAULT_NAME} Last Read (UTC)"
self._attr_unique_id = f"{self._unique_id}_last_read"
self._attr_icon = "mdi:clock-time-nine"

Expand All @@ -154,7 +165,10 @@ def native_value(self):
last_read_ts = self.coordinator.data.get("lastRead")
if last_read_ts:
# Convert milliseconds to seconds for timestamp
return dt_util.utc_from_timestamp(last_read_ts / 1000).strftime("%Y-%m-%d %H:%M:%S")
try:
return dt_util.utc_from_timestamp(last_read_ts / 1000)
except (ValueError, TypeError):
return None
return None


Expand Down Expand Up @@ -228,8 +242,8 @@ class SensusAnalyticsLatestReadTimeSensor(StaticUnitSensorBase):

def __init__(self, coordinator, entry):
"""Initialize the latest read time sensor."""
super().__init__(coordinator, entry, unit="UTC")
self._attr_name = f"{DEFAULT_NAME} Latest Read Time"
super().__init__(coordinator, entry, unit=None, device_class=SensorDeviceClass.TIMESTAMP)
self._attr_name = f"{DEFAULT_NAME} Latest Read Time (UTC)"
self._attr_unique_id = f"{self._unique_id}_latest_read_time"
self._attr_icon = "mdi:clock-time-nine"

Expand All @@ -239,7 +253,10 @@ def native_value(self):
latest_read_time_ts = self.coordinator.data.get("latestReadTime")
if latest_read_time_ts:
# Convert milliseconds to seconds for timestamp
return dt_util.utc_from_timestamp(latest_read_time_ts / 1000).strftime("%Y-%m-%d %H:%M:%S")
try:
return dt_util.utc_from_timestamp(latest_read_time_ts / 1000)
except (ValueError, TypeError):
return None
return None


Expand Down Expand Up @@ -279,15 +296,6 @@ def native_value(self):
usage_gallons = self._convert_usage(usage)
return self._calculate_cost(usage_gallons)

def _convert_usage(self, usage):
"""Convert usage based on the configuration and native unit."""
if usage is None:
return None
usage_unit = self.coordinator.data.get("usageUnit")
if usage_unit == "CF" and self.coordinator.config_entry.data.get("unit_type") == "G":
return round(float(usage) * CF_TO_GALLON)
return usage

def _calculate_cost(self, usage_gallons):
"""Calculate the billing cost based on tiers and service fee."""
tier1_gallons = self.coordinator.config_entry.data.get("tier1_gallons")
Expand All @@ -298,15 +306,16 @@ def _calculate_cost(self, usage_gallons):
service_fee = self.coordinator.config_entry.data.get("service_fee")

cost = service_fee
if usage_gallons <= tier1_gallons:
cost += usage_gallons * tier1_price
elif usage_gallons <= tier1_gallons + tier2_gallons:
cost += tier1_gallons * tier1_price
cost += (usage_gallons - tier1_gallons) * tier2_price
else:
cost += tier1_gallons * tier1_price
cost += tier2_gallons * tier2_price
cost += (usage_gallons - tier1_gallons - tier2_gallons) * tier3_price
if usage_gallons is not None:
if usage_gallons <= tier1_gallons:
cost += usage_gallons * tier1_price
elif usage_gallons <= tier1_gallons + tier2_gallons:
cost += tier1_gallons * tier1_price
cost += (usage_gallons - tier1_gallons) * tier2_price
else:
cost += tier1_gallons * tier1_price
cost += tier2_gallons * tier2_price
cost += (usage_gallons - tier1_gallons - tier2_gallons) * tier3_price

return round(cost, 2)

Expand All @@ -330,15 +339,6 @@ def native_value(self):
usage_gallons = self._convert_usage(usage)
return self._calculate_daily_fee(usage_gallons)

def _convert_usage(self, usage):
"""Convert usage based on the configuration and native unit."""
if usage is None:
return None
usage_unit = self.coordinator.data.get("usageUnit")
if usage_unit == "CF" and self.coordinator.config_entry.data.get("unit_type") == "G":
return round(float(usage) * CF_TO_GALLON)
return usage

def _calculate_daily_fee(self, usage_gallons):
"""Calculate the daily fee based on tiers."""
tier1_gallons = self.coordinator.config_entry.data.get("tier1_gallons")
Expand All @@ -348,14 +348,15 @@ def _calculate_daily_fee(self, usage_gallons):
tier3_price = self.coordinator.config_entry.data.get("tier3_price")

cost = 0
if usage_gallons <= tier1_gallons:
cost += usage_gallons * tier1_price
elif usage_gallons <= tier1_gallons + tier2_gallons:
cost += tier1_gallons * tier1_price
cost += (usage_gallons - tier1_gallons) * tier2_price
else:
cost += tier1_gallons * tier1_price
cost += tier2_gallons * tier2_price
cost += (usage_gallons - tier1_gallons - tier2_gallons) * tier3_price
if usage_gallons is not None:
if usage_gallons <= tier1_gallons:
cost += usage_gallons * tier1_price
elif usage_gallons <= tier1_gallons + tier2_gallons:
cost += tier1_gallons * tier1_price
cost += (usage_gallons - tier1_gallons) * tier2_price
else:
cost += tier1_gallons * tier1_price
cost += tier2_gallons * tier2_price
cost += (usage_gallons - tier1_gallons - tier2_gallons) * tier3_price

return round(cost, 2)
Loading