Skip to content

Commit

Permalink
Merge pull request #27 from zestysoft/remove-utc-unit
Browse files Browse the repository at this point in the history
Use timestamp device class
  • Loading branch information
zestysoft authored Nov 29, 2024
2 parents 39a49d1 + cb986b9 commit 70741ce
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 57 deletions.
2 changes: 1 addition & 1 deletion custom_components/sensus_analytics/manifest.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"domain": "sensus_analytics",
"name": "Sensus Analytics Integration",
"version": "1.4.3",
"version": "1.4.5",
"documentation": "https://github.com/zestysoft/sensus_analytics_integration",
"dependencies": [],
"codeowners": ["@zestysoft"],
Expand Down
113 changes: 57 additions & 56 deletions custom_components/sensus_analytics/sensor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Sensor platform for the Sensus Analytics Integration."""

from homeassistant.components.sensor import SensorEntity
from homeassistant.components.sensor import SensorDeviceClass, SensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import DeviceInfo
Expand Down Expand Up @@ -34,7 +34,24 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_e
)


class DynamicUnitSensorBase(CoordinatorEntity, SensorEntity):
class UsageConversionMixin:
"""Mixin to provide usage conversion."""

# pylint: disable=too-few-public-methods
def _convert_usage(self, usage):
"""Convert usage based on configuration and native unit."""
if usage is None:
return None
usage_unit = self.coordinator.data.get("usageUnit")
if usage_unit == "CF" and self.coordinator.config_entry.data.get("unit_type") == "G":
try:
return round(float(usage) * CF_TO_GALLON)
except (ValueError, TypeError):
return None
return usage


class DynamicUnitSensorBase(UsageConversionMixin, CoordinatorEntity, SensorEntity):
"""Base class for sensors with dynamic units."""

def __init__(self, coordinator, entry):
Expand All @@ -50,15 +67,6 @@ def __init__(self, coordinator, entry):
model="Water Meter",
)

def _convert_usage(self, usage):
"""Convert usage based on configuration and native unit."""
if usage is None:
return None
usage_unit = self.coordinator.data.get("usageUnit")
if usage_unit == "CF" and self.coordinator.config_entry.data.get("unit_type") == "G":
return round(float(usage) * CF_TO_GALLON)
return usage

def _get_usage_unit(self):
"""Determine the unit of measurement for usage sensors."""
usage_unit = self.coordinator.data.get("usageUnit")
Expand All @@ -72,22 +80,25 @@ def native_unit_of_measurement(self):
return self._get_usage_unit()


class StaticUnitSensorBase(CoordinatorEntity, SensorEntity):
class StaticUnitSensorBase(UsageConversionMixin, CoordinatorEntity, SensorEntity):
"""Base class for sensors with static units."""

def __init__(self, coordinator, entry, unit):
def __init__(self, coordinator, entry, unit=None, device_class=None):
"""Initialize the static unit sensor base."""
super().__init__(coordinator)
self.coordinator = coordinator
self.entry = entry
self._unique_id = f"{DOMAIN}_{entry.entry_id}"
self._attr_native_unit_of_measurement = unit
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, entry.entry_id)},
name=DEFAULT_NAME,
manufacturer="Unknown",
model="Water Meter",
)
if unit:
self._attr_native_unit_of_measurement = unit
if device_class:
self._attr_device_class = device_class


class SensusAnalyticsDailyUsageSensor(DynamicUnitSensorBase):
Expand Down Expand Up @@ -143,8 +154,8 @@ class SensusAnalyticsLastReadSensor(StaticUnitSensorBase):

def __init__(self, coordinator, entry):
"""Initialize the last read sensor."""
super().__init__(coordinator, entry, unit="UTC")
self._attr_name = f"{DEFAULT_NAME} Last Read"
super().__init__(coordinator, entry, unit=None, device_class=SensorDeviceClass.TIMESTAMP)
self._attr_name = f"{DEFAULT_NAME} Last Read (UTC)"
self._attr_unique_id = f"{self._unique_id}_last_read"
self._attr_icon = "mdi:clock-time-nine"

Expand All @@ -154,7 +165,10 @@ def native_value(self):
last_read_ts = self.coordinator.data.get("lastRead")
if last_read_ts:
# Convert milliseconds to seconds for timestamp
return dt_util.utc_from_timestamp(last_read_ts / 1000).strftime("%Y-%m-%d %H:%M:%S")
try:
return dt_util.utc_from_timestamp(last_read_ts / 1000)
except (ValueError, TypeError):
return None
return None


Expand Down Expand Up @@ -228,8 +242,8 @@ class SensusAnalyticsLatestReadTimeSensor(StaticUnitSensorBase):

def __init__(self, coordinator, entry):
"""Initialize the latest read time sensor."""
super().__init__(coordinator, entry, unit="UTC")
self._attr_name = f"{DEFAULT_NAME} Latest Read Time"
super().__init__(coordinator, entry, unit=None, device_class=SensorDeviceClass.TIMESTAMP)
self._attr_name = f"{DEFAULT_NAME} Latest Read Time (UTC)"
self._attr_unique_id = f"{self._unique_id}_latest_read_time"
self._attr_icon = "mdi:clock-time-nine"

Expand All @@ -239,7 +253,10 @@ def native_value(self):
latest_read_time_ts = self.coordinator.data.get("latestReadTime")
if latest_read_time_ts:
# Convert milliseconds to seconds for timestamp
return dt_util.utc_from_timestamp(latest_read_time_ts / 1000).strftime("%Y-%m-%d %H:%M:%S")
try:
return dt_util.utc_from_timestamp(latest_read_time_ts / 1000)
except (ValueError, TypeError):
return None
return None


Expand Down Expand Up @@ -279,15 +296,6 @@ def native_value(self):
usage_gallons = self._convert_usage(usage)
return self._calculate_cost(usage_gallons)

def _convert_usage(self, usage):
"""Convert usage based on the configuration and native unit."""
if usage is None:
return None
usage_unit = self.coordinator.data.get("usageUnit")
if usage_unit == "CF" and self.coordinator.config_entry.data.get("unit_type") == "G":
return round(float(usage) * CF_TO_GALLON)
return usage

def _calculate_cost(self, usage_gallons):
"""Calculate the billing cost based on tiers and service fee."""
tier1_gallons = self.coordinator.config_entry.data.get("tier1_gallons")
Expand All @@ -298,15 +306,16 @@ def _calculate_cost(self, usage_gallons):
service_fee = self.coordinator.config_entry.data.get("service_fee")

cost = service_fee
if usage_gallons <= tier1_gallons:
cost += usage_gallons * tier1_price
elif usage_gallons <= tier1_gallons + tier2_gallons:
cost += tier1_gallons * tier1_price
cost += (usage_gallons - tier1_gallons) * tier2_price
else:
cost += tier1_gallons * tier1_price
cost += tier2_gallons * tier2_price
cost += (usage_gallons - tier1_gallons - tier2_gallons) * tier3_price
if usage_gallons is not None:
if usage_gallons <= tier1_gallons:
cost += usage_gallons * tier1_price
elif usage_gallons <= tier1_gallons + tier2_gallons:
cost += tier1_gallons * tier1_price
cost += (usage_gallons - tier1_gallons) * tier2_price
else:
cost += tier1_gallons * tier1_price
cost += tier2_gallons * tier2_price
cost += (usage_gallons - tier1_gallons - tier2_gallons) * tier3_price

return round(cost, 2)

Expand All @@ -330,15 +339,6 @@ def native_value(self):
usage_gallons = self._convert_usage(usage)
return self._calculate_daily_fee(usage_gallons)

def _convert_usage(self, usage):
"""Convert usage based on the configuration and native unit."""
if usage is None:
return None
usage_unit = self.coordinator.data.get("usageUnit")
if usage_unit == "CF" and self.coordinator.config_entry.data.get("unit_type") == "G":
return round(float(usage) * CF_TO_GALLON)
return usage

def _calculate_daily_fee(self, usage_gallons):
"""Calculate the daily fee based on tiers."""
tier1_gallons = self.coordinator.config_entry.data.get("tier1_gallons")
Expand All @@ -348,14 +348,15 @@ def _calculate_daily_fee(self, usage_gallons):
tier3_price = self.coordinator.config_entry.data.get("tier3_price")

cost = 0
if usage_gallons <= tier1_gallons:
cost += usage_gallons * tier1_price
elif usage_gallons <= tier1_gallons + tier2_gallons:
cost += tier1_gallons * tier1_price
cost += (usage_gallons - tier1_gallons) * tier2_price
else:
cost += tier1_gallons * tier1_price
cost += tier2_gallons * tier2_price
cost += (usage_gallons - tier1_gallons - tier2_gallons) * tier3_price
if usage_gallons is not None:
if usage_gallons <= tier1_gallons:
cost += usage_gallons * tier1_price
elif usage_gallons <= tier1_gallons + tier2_gallons:
cost += tier1_gallons * tier1_price
cost += (usage_gallons - tier1_gallons) * tier2_price
else:
cost += tier1_gallons * tier1_price
cost += tier2_gallons * tier2_price
cost += (usage_gallons - tier1_gallons - tier2_gallons) * tier3_price

return round(cost, 2)

0 comments on commit 70741ce

Please sign in to comment.