diff --git a/custom_components/fordpass/const.py b/custom_components/fordpass/const.py index 5d4b2ef..35320cf 100644 --- a/custom_components/fordpass/const.py +++ b/custom_components/fordpass/const.py @@ -48,10 +48,7 @@ "coolantTemp": {"icon": "mdi:coolant-temperature", "api_key": "engineCoolantTemp" ,"state_class": "measurement", "device_class": "temperature", "measurement": "°C"}, "outsideTemp": {"icon": "mdi:thermometer", "state_class": "measurement", "device_class": "temperature", "api_key": "outsideTemperature", "measurement": "°C"}, "engineOilTemp": {"icon": "mdi:oil-temperature", "state_class": "measurement", "device_class": "temperature", "api_key": "engineOilTemp", "measurement": "°C"}, - # "deepSleepInProgress": { - # "icon": "mdi:power-sleep", - # "name": "Deep Sleep Mode Active", - # }, + "deepSleep": {"icon": "mdi:power-sleep", "name": "Deep Sleep Mode Active", "api_key": "commandPreclusion", "api_class": "states"}, # "firmwareUpgInProgress": { # "icon": "mdi:one-up", # "name": "Firmware Update In Progress", diff --git a/custom_components/fordpass/device_tracker.py b/custom_components/fordpass/device_tracker.py index 959a176..aab76da 100644 --- a/custom_components/fordpass/device_tracker.py +++ b/custom_components/fordpass/device_tracker.py @@ -68,6 +68,8 @@ def extra_state_attributes(self): atts["gpsCoordinateMethod"] = self.coordinator.data["metrics"]["position"]["value"]["gpsCoordinateMethod"] if "gpsDimension" in self.coordinator.data["metrics"]["position"]["value"]: atts["gpsDimension"] = self.coordinator.data["metrics"]["position"]["value"]["gpsDimension"] + atts["compassDirection"] = self.coordinator.data.get("metrics", {}).get("compassDirection", {}).get("value", "Unknown") + return atts @property diff --git a/custom_components/fordpass/fordpass_new.py b/custom_components/fordpass/fordpass_new.py index 0a17e26..cafe84e 100644 --- a/custom_components/fordpass/fordpass_new.py +++ b/custom_components/fordpass/fordpass_new.py @@ -1,754 +1,506 @@ -"""Fordpass API Library""" -import hashlib -import json +"""All vehicle sensors from the accessible by the API""" + import logging -import os -import random -import re -import string -import time -from base64 import urlsafe_b64encode -import requests +from datetime import datetime, timedelta +import json -from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry +from homeassistant.const import ( + UnitOfTemperature, + UnitOfLength +) +from homeassistant.util import dt -_LOGGER = logging.getLogger(__name__) -defaultHeaders = { - "Accept": "*/*", - "Accept-Language": "en-us", - "User-Agent": "FordPass/23 CFNetwork/1408.0.4 Darwin/22.5.0", - "Accept-Encoding": "gzip, deflate, br", -} - -apiHeaders = { - **defaultHeaders, - "Content-Type": "application/json", -} - -region_lookup = { - "UK&Europe": "1E8C7794-FF5F-49BC-9596-A1E0C86C5B19", - "Australia": "5C80A6BB-CF0D-4A30-BDBF-FC804B5C1A98", - "North America & Canada": "71A3AD0A-CF46-4CCF-B473-FC7FE5BC4592", -} - -NEW_API = True - -BASE_URL = "https://usapi.cv.ford.com/api" -GUARD_URL = "https://api.mps.ford.com/api" -SSO_URL = "https://sso.ci.ford.com" -AUTONOMIC_URL = "https://api.autonomic.ai/v1" -AUTONOMIC_ACCOUNT_URL = "https://accounts.autonomic.ai/v1" - -session = requests.Session() - - -class Vehicle: - # Represents a Ford vehicle, with methods for status and issuing commands - - def __init__( - self, username, password, vin, region, save_token=False, config_location="" - ): - self.username = username - self.password = password - self.save_token = save_token - self.region = region_lookup[region] - self.region2 = region - self.vin = vin - self.token = None - self.expires = None - self.expires_at = None - self.refresh_token = None - self.auto_token = None - self.auto_expires_at = None - self.errors = 0 - retry = Retry(connect=3, backoff_factor=0.5) - adapter = HTTPAdapter(max_retries=retry) - session.mount("http://", adapter) - session.mount("https://", adapter) - if config_location == "": - self.token_location = "custom_components/fordpass/fordpass_token.txt" - else: - _LOGGER.debug(config_location) - self.token_location = config_location - - def base64_url_encode(self, data): - """Encode string to base64""" - return urlsafe_b64encode(data).rstrip(b'=') - - def generate_hash(self, code): - """Generate hash for login""" - hashengine = hashlib.sha256() - hashengine.update(code.encode('utf-8')) - return self.base64_url_encode(hashengine.digest()).decode('utf-8') - - def auth_step1(self): - """Obtain data-ibm-login-url""" - _LOGGER.debug("Running Step1") - try: - headers = { - **defaultHeaders, - 'Content-Type': 'application/json', - } - # _LOGGER.debug("Before") - code1 = ''.join(random.choice(string.ascii_lowercase) for i in range(43)) - code_verifier = self.generate_hash(code1) - url1 = f"{SSO_URL}/v1.0/endpoint/default/authorize?redirect_uri=fordapp://userauthorized&response_type=code&scope=openid&max_age=3600&client_id=9fb503e0-715b-47e8-adfd-ad4b7770f73b&code_challenge={code_verifier}&code_challenge_method=S256" - response = session.get( - url1, - headers=headers, - ) - # _LOGGER.debug(response.text) - # _LOGGER.debug(response.status_code) - if response.status_code != 200: - _LOGGER.debug("Incorrect response from URL") - raise Exception("Response from URL was invalid") - - ibm_url = re.findall('data-ibm-login-url="(.*)"\s', response.text)[0] - _LOGGER.debug("Step 1 Complete") - return {"ibm_url": ibm_url, "code1": code1} - except Exception as ex: - _LOGGER.debug("Step 1 Exception") - _LOGGER.debug(ex) - return None - - def auth_step2(self, ibm_url): - """Login using credentials""" - _LOGGER.debug("Running Step2") - try: - next_url = SSO_URL + ibm_url - headers = { - **defaultHeaders, - "Content-Type": "application/x-www-form-urlencoded", - } - data = { - "operation": "verify", - "login-form-type": "password", - "username": self.username, - "password": self.password - - } - response = session.post( - next_url, - headers=headers, - data=data, - allow_redirects=False - ) - - if response.status_code == 302: - next_url = response.headers["Location"] - _LOGGER.debug("Step 2 Complete") - return next_url - return None - except Exception as ex: - _LOGGER.debug("Step 2 Exception") - _LOGGER.debug(ex) - if response.text is not None: - _LOGGER.debug(response.text) - return None +from homeassistant.components.sensor import ( + SensorEntity, + SensorDeviceClass, + SensorStateClass +) - def auth_step3(self, next_url): - """Obtain code and grant_id""" - _LOGGER.debug("Running Step3") - try: - - headers = { - **defaultHeaders, - 'Content-Type': 'application/json', - } - - response = session.get( - next_url, - headers=headers, - allow_redirects=False - ) - - if response.status_code == 302: - next_url = response.headers["Location"] - query = requests.utils.urlparse(next_url).query - params = dict(x.split('=') for x in query.split('&')) - code = params["code"] - grant_id = params["grant_id"] - _LOGGER.debug("Step 3 Complete") - return {"code": code, "grant_id": grant_id} - response.raise_for_status() - return None - except Exception as ex: - _LOGGER.debug("Step 3 Exception") - _LOGGER.debug(ex) - if response.status_code is not None: - _LOGGER.debug(response.status_code) - if response.headers is not None: - _LOGGER.debug(response.headers) - return None +from . import FordPassEntity +from .const import CONF_DISTANCE_UNIT, CONF_PRESSURE_UNIT, DOMAIN, SENSORS, COORDINATOR - def auth_step4(self, codes, code1): - """Obtain access_token""" - _LOGGER.debug("Running Step4") - try: - grant_id = codes["grant_id"] - code = codes["code"] - headers = { - **defaultHeaders, - "Content-Type": "application/x-www-form-urlencoded", - } - - data = { - "client_id": "9fb503e0-715b-47e8-adfd-ad4b7770f73b", - "grant_type": "authorization_code", - "redirect_uri": 'fordapp://userauthorized', - "grant_id": grant_id, - "code": code, - "code_verifier": code1 - } - - response = session.post( - f"{SSO_URL}/oidc/endpoint/default/token", - headers=headers, - data=data - - ) - - if response.status_code == 200: - result = response.json() - if result["access_token"]: - access_token = result["access_token"] - _LOGGER.debug("Step 4 Complete") - return access_token - response.raise_for_status() - return None - except Exception as ex: - _LOGGER.debug("Step 4 exception") - _LOGGER.debug(ex) - if response.text is not None: - _LOGGER.debug(response.text) - return None - def auth_step5(self, access_token): - """Get tokens""" - _LOGGER.debug("Running Step5") - try: - data = {"ciToken": access_token} - headers = {**apiHeaders, "Application-Id": self.region} - response = session.post( - f"{GUARD_URL}/token/v2/cat-with-ci-access-token", - data=json.dumps(data), - headers=headers, - ) - - if response.status_code == 200: - result = response.json() - - self.token = result["access_token"] - self.refresh_token = result["refresh_token"] - self.expires_at = time.time() + result["expires_in"] - auto_token = self.get_auto_token() - self.auto_token = auto_token["access_token"] - self.auto_expires_at = time.time() + result["expires_in"] - if self.save_token: - result["expiry_date"] = time.time() + result["expires_in"] - result["auto_token"] = auto_token["access_token"] - result["auto_refresh"] = auto_token["refresh_token"] - result["auto_expiry"] = time.time() + auto_token["expires_in"] - - self.write_token(result) - session.cookies.clear() - _LOGGER.debug("Step 5 Complete") - return True - response.raise_for_status() - return False - except Exception as ex: - _LOGGER.debug("Step 5 exception") - _LOGGER.debug(ex) - if response.text is not None: - _LOGGER.debug(response.text) - return False - - def auth(self): - """New Authentication System """ - _LOGGER.debug("New System") - _LOGGER.debug(self.errors) - - # Run Step 1 auth - ibm_urls = self.auth_step1() - - if ibm_urls is None: - self.errors += 1 - if self.errors <= 10: - self.auth() - else: - raise Exception("Step 1 has reached error limit") - - # Run Step 2 auth - login_url = self.auth_step2(ibm_urls["ibm_url"]) - - if login_url is None: - self.errors += 1 - if self.errors <= 10: - self.auth() - else: - raise Exception("Step 2 has reached error limit") - - # Run Step 3 auth - codes = self.auth_step3(login_url) - - if codes is None: - self.errors += 1 - if self.errors <= 10: - self.auth() - else: - raise Exception("Step 3 has reached error limit") - - # Run Step 4 auth - access_tokens = self.auth_step4(codes, ibm_urls["code1"]) - - if access_tokens is None: - self.errors += 1 - if self.errors <= 10: - self.auth() - else: - raise Exception("Step 4 has reached error limit") - - # Run Step 5 auth - success = self.auth_step5(access_tokens) - - if success is False: - self.errors += 1 - if self.errors <= 10: - self.auth() - else: - raise Exception("Step 5 has reached error limit") - else: - self.errors = 0 - return True - return False - - def refresh_token_func(self, token): - """Refresh token if still valid""" - data = {"refresh_token": token["refresh_token"]} - headers = {**apiHeaders, "Application-Id": self.region} - - response = session.post( - f"{GUARD_URL}/token/v2/cat-with-refresh-token", - data=json.dumps(data), - headers=headers, - ) - if response.status_code == 200: - result = response.json() - if self.save_token: - result["expiry_date"] = time.time() + result["expires_in"] - self.write_token(result) - self.token = result["access_token"] - self.refresh_token = result["refresh_token"] - self.expires_at = time.time() + result["expires_in"] - if response.status_code == 401: - _LOGGER.debug("401 response stage 2: refresh stage 1 token") - self.auth() - - def __acquire_token(self): - # Fetch and refresh token as needed - # If file exists read in token file and check it's valid - _LOGGER.debug("Fetching token") - if self.save_token: - if os.path.isfile(self.token_location): - data = self.read_token() - self.token = data["access_token"] - self.refresh_token = data["refresh_token"] - self.expires_at = data["expiry_date"] - if "auto_token" in data and "auto_expiry" in data: - self.auto_token = data["auto_token"] - self.auto_expires_at = data["auto_expiry"] - else: - _LOGGER.debug("AUTO token not set in file") - self.auto_token = None - self.auto_expires_at = None - else: - data = {} - data["access_token"] = self.token - data["refresh_token"] = self.refresh_token - data["expiry_date"] = self.expires_at - data["auto_token"] = self.auto_token - data["auto_expiry"] = self.auto_expires_at - else: - data = {} - data["access_token"] = self.token - data["refresh_token"] = self.refresh_token - data["expiry_date"] = self.expires_at - data["auto_token"] = self.auto_token - data["auto_expiry"] = self.auto_expires_at - _LOGGER.debug(self.auto_token) - _LOGGER.debug(self.auto_expires_at) - if self.auto_token is None or self.auto_expires_at is None: - self.auth() - # self.auto_token = data["auto_token"] - # self.auto_expires_at = data["auto_expiry"] - if self.expires_at: - if time.time() >= self.expires_at: - _LOGGER.debug("No token, or has expired, requesting new token") - self.refresh_token_func(data) - # self.auth() - if self.auto_expires_at: - if time.time() >= self.auto_expires_at: - _LOGGER.debug("Autonomic token expired") - self.auth() - if self.token is None: - _LOGGER.debug("Fetching token4") - # No existing token exists so refreshing library - self.auth() - else: - _LOGGER.debug("Token is valid, continuing") - - def write_token(self, token): - """Save token to file for reuse""" - with open(self.token_location, "w", encoding="utf-8") as outfile: - token["expiry_date"] = time.time() + token["expires_in"] - _LOGGER.debug(token) - json.dump(token, outfile) - - def read_token(self): - """Read saved token from file""" - try: - with open(self.token_location, encoding="utf-8") as token_file: - token = json.load(token_file) - return token - except ValueError: - _LOGGER.debug("Fixing malformed token") - self.auth() - with open(self.token_location, encoding="utf-8") as token_file: - token = json.load(token_file) - return token - - def clear_token(self): - """Clear tokens from config directory""" - if os.path.isfile("/tmp/fordpass_token.txt"): - os.remove("/tmp/fordpass_token.txt") - if os.path.isfile("/tmp/token.txt"): - os.remove("/tmp/token.txt") - if os.path.isfile(self.token_location): - os.remove(self.token_location) - - def get_auto_token(self): - """Get token from new autonomic API""" - _LOGGER.debug("Getting Auto Token") - headers = { - "accept": "*/*", - "content-type": "application/x-www-form-urlencoded" - } - - data = { - "subject_token": self.token, - "subject_issuer": "fordpass", - "client_id": "fordpass-prod", - "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", - "subject_token_type": "urn:ietf:params:oauth:token-type:jwt", - - } - - r = session.post( - f"{AUTONOMIC_ACCOUNT_URL}/auth/oidc/token", - data=data, - headers=headers - ) +_LOGGER = logging.getLogger(__name__) - if r.status_code == 200: - result = r.json() - _LOGGER.debug(r.status_code) - _LOGGER.debug(r.text) - self.auto_token = result["access_token"] - return result - return False - - def get_status(self): - """Get status from Autonomics endpoint""" - params = {"lrdt": "01-01-1970 00:00:00"} - - headers = { - **apiHeaders, - "auth-token": self.token, - "Application-Id": self.region, - } - _LOGGER.debug("Status function before auto_token") - _LOGGER.debug(self.auto_token) - _LOGGER.debug(self.vin) - - _LOGGER.debug("Trying new vehicle API endpoint") - headers = { - **apiHeaders, - "authorization": f"Bearer {self.auto_token}", - "Application-Id": self.region, - } - r = session.get( - f"{AUTONOMIC_URL}/telemetry/sources/fordpass/vehicles/{self.vin}", params=params, headers=headers - ) - _LOGGER.debug(r.status_code) - return r - - def status(self): - """Get Vehicle status from API""" - _LOGGER.debug("Getting Vehicle Status") - self.__acquire_token() - - if NEW_API: - r = self.get_status() - _LOGGER.debug("NEW API???") - - if r.status_code == 200: - # _LOGGER.debug(r.text) - result = r.json() - return result - if r.status_code == 401: - self.auth() - response = self.get_status() - if response.status_code == 200: - result = response.json() - return result - if r.status_code == 403: - i = 0 - while i < 3: - _LOGGER.debug(f"Retrying Vehicle endpoint attempt {i}") - response = self.get_status() - if response.status_code == 200: - result = response.json() - return result - i += 1 - response.raise_for_status() - return {} - - def get_messages(self): - """Make call to messages API""" - headers = { - **apiHeaders, - "Auth-Token": self.token, - "Application-Id": self.region, - } - response = session.get(f"{GUARD_URL}/messagecenter/v3/messages?", headers=headers) - return response - - def messages(self): - """Get Vehicle messages from API""" - _LOGGER.debug("Getting Messages") - self.__acquire_token() - response = self.get_messages() - if response.status_code == 200: - result = response.json() - return result["result"]["messages"] - # _LOGGER.debug(result) - # _LOGGER.debug(response.text) - if response.status_code == 401: - self.auth() - response = self.get_messages() - if response.status_code == 200: - result = response.json() - return result["result"]["messages"] - return None - def get_vehicles(self): - """Make call to vehicles API""" - _LOGGER.debug("Getting Vehicles") - if self.region2 == "Australia": - countryheader = "AUS" - elif self.region2 == "North America & Canada": - countryheader = "USA" - elif self.region2 == "UK&Europe": - countryheader = "GBR" +async def async_setup_entry(hass, config_entry, async_add_entities): + """Add the Entities from the config.""" + entry = hass.data[DOMAIN][config_entry.entry_id][COORDINATOR] + sensors = [] + for key, value in SENSORS.items(): + sensor = CarSensor(entry, key, config_entry.options) + api_key = value["api_key"] + api_class = value.get("api_class", None) + string = isinstance(api_key, str) + if string and api_key == "messages" or api_key == "lastRefresh": + sensors.append(sensor) + elif string: + if api_key and api_class and api_key in sensor.coordinator.data.get(api_class, {}): + sensors.append(sensor) + continue + if api_key and api_key in sensor.coordinator.data.get("metrics", {}): + sensors.append(sensor) else: - countryheader = "USA" - headers = { - **apiHeaders, - "Auth-Token": self.token, - "Application-Id": self.region, - "Countrycode": countryheader, - "Locale": "EN-US" - } - - data = { - "dashboardRefreshRequest": "All" - } - response = session.post( - f"{GUARD_URL}/expdashboard/v1/details/", - headers=headers, - data=json.dumps(data) + for key in api_key: + if key and key in sensor.coordinator.data.get("metrics", {}): + sensors.append(sensor) + continue + _LOGGER.debug(hass.config.units) + async_add_entities(sensors, True) + + +class CarSensor( + FordPassEntity, + SensorEntity, +): + def __init__(self, coordinator, sensor, options): + + super().__init__( + device_id="fordpass_" + sensor, + name="fordpass_" + sensor, + coordinator=coordinator ) - return response - - def vehicles(self): - """Get vehicle list from account""" - self.__acquire_token() - - response = self.get_vehicles() - - if response.status_code == 207: - result = response.json() - return result - if response.status_code == 401: - self.auth() - response = self.get_vehicles() - if response.status_code == 207: - result = response.json() - return result - if response.status_code == 403: - i = 0 - while i <= 3: - response = self.get_vehicles() - if response.status_code == 207: - result = response.json() - return result - i += 1 + self.sensor = sensor + self.fordoptions = options + self._attr = {} + self.coordinator = coordinator + self.units = coordinator.hass.config.units + self.data = coordinator.data["metrics"] + self.events = coordinator.data["events"] + self.states = coordinator.data["states"] + self._device_id = "fordpass_" + sensor + # Required for HA 2022.7 + self.coordinator_context = object() + + def get_value(self, ftype): + """Get sensor value and attributes from coordinator data""" + self.data = self.coordinator.data["metrics"] + self.events = self.coordinator.data["events"] + self.states = self.coordinator.data["states"] + self.units = self.coordinator.hass.config.units + if ftype == "state": + if self.sensor == "odometer": + return self.data.get("odometer", {}).get("value") + #return self.data.get("odometer", {}).get("value", {}) + if self.sensor == "fuel": + fuel_level = self.data.get("fuelLevel", {}).get("value", 0) + if fuel_level is not None: + return round(fuel_level) + battery_soc = self.data.get("xevBatteryStateOfCharge", {}).get("value", 0) + if battery_soc is not None: + return round(battery_soc) + return None + if self.sensor == "battery": + return round(self.data.get("batteryStateOfCharge", {}).get("value", 0)) + if self.sensor == "oil": + return round(self.data.get("oilLifeRemaining", {}).get("value", 0)) + if self.sensor == "tirePressure": + return self.data.get("tirePressureSystemStatus", [{}])[0].get("value", "Unsupported") + if self.sensor == "gps": + return self.data.get("position", {}).get("value", "Unsupported") + if self.sensor == "alarm": + return self.data.get("alarmStatus", {}).get("value", "Unsupported") + if self.sensor == "ignitionStatus": + return self.data.get("ignitionStatus", {}).get("value", "Unsupported") + if self.sensor == "firmwareUpgInProgress": + return self.data.get("firmwareUpgradeInProgress", {}).get("value", "Unsupported") + if self.sensor == "deepSleep": + state = self.states.get("commandPreclusion", {}).get("value", {}).get("toState", "Unsupported") + if state == "COMMANDS_PRECLUDED": + return "ENABLED" + elif state == "COMMANDS_PERMITTED": + return "DISABLED" + else: + return state + if self.sensor == "doorStatus": + for value in self.data.get("doorStatus", []): + if value["value"] in ["CLOSED", "Invalid", "UNKNOWN"]: + continue + return "Open" + if self.data.get("hoodStatus", {}).get("value") == "OPEN": + return "Open" + return "Closed" + if self.sensor == "windowPosition": + for window in self.data.get("windowStatus", []): + windowrange = window.get("value", {}).get("doubleRange", {}) + if windowrange.get("lowerBound", 0.0) != 0.0 or windowrange.get("upperBound", 0.0) != 0.0: + return "Open" + return "Closed" + if self.sensor == "lastRefresh": + return dt.as_local(dt.parse_datetime(self.coordinator.data.get("updateTime", 0))) + if self.sensor == "elVeh" and "xevBatteryRange" in self.data: + return round(self.data.get("xevBatteryRange", {}).get("value"), 2) + # SquidBytes: Added elVehCharging + if self.sensor == "elVehCharging": + return self.data.get("xevPlugChargerStatus", {}).get("value", "Unsupported") + if self.sensor == "zoneLighting": + return self.data("zoneLighting", {}).get("zoneStatusData", {}).get("value", "Unsupported") + if self.sensor == "remoteStartStatus": + countdown_timer = self.data.get("remoteStartCountdownTimer", {}).get("value", 0) + return "Active" if countdown_timer > 0 else "Inactive" + if self.sensor == "messages": + messages = self.coordinator.data.get("messages") + return len(messages) if messages is not None else None + if self.sensor == "dieselSystemStatus": + return self.data.get("dieselExhaustFilterStatus", {}).get("value", "Unsupported") + if self.sensor == "exhaustFluidLevel": + return self.data.get("dieselExhaustFluidLevel", {}).get("value", "Unsupported") + if self.sensor == "speed": + return self.data.get("speed", {}).get("value", "Unsupported") + if self.sensor == "indicators": + return sum(1 for indicator in self.data.get("indicators", {}).values() if indicator.get("value")) + if self.sensor == "coolantTemp": + return self.data.get("engineCoolantTemp", {}).get("value", "Unsupported") + if self.sensor == "outsideTemp": + return self.data.get("outsideTemperature", {}).get("value", "Unsupported") + if self.sensor == "engineOilTemp": + return self.data.get("engineOilTemp", {}).get("value", "Unsupported") + return None + if ftype == "measurement": + return SENSORS.get(self.sensor, {}).get("measurement", None) + if ftype == "attribute": + if self.sensor == "odometer": + return self.data.get("odometer", {}) + if self.sensor == "outsideTemp": + ambient_temp = self.data.get("ambientTemp", {}).get("value") + if ambient_temp is not None: + return { "Ambient Temp": ambient_temp} + return None + if self.sensor == "fuel": + if "fuelRange" in self.data: + return {"fuelRange" : self.units.length(self.data.get("fuelRange", {}).get("value", 0),UnitOfLength.KILOMETERS)} + if "xevBatteryRange" in self.data: + return {"batteryRange": self.units.length(self.data.get("xevBatteryRange", {}).get("value", 0),UnitOfLength.KILOMETERS)} + if self.sensor == "battery": + return { + "Battery Voltage": self.data.get("batteryVoltage", {}).get("value", 0) + } + if self.sensor == "oil": + return self.data.get("oilLifeRemaining", {}) + if self.sensor == "tirePressure" and "tirePressure" in self.data: + pressure_unit = self.fordoptions.get(CONF_PRESSURE_UNIT) + if pressure_unit == "PSI": + conversion_factor = 0.1450377377 + decimal_places = 0 + elif pressure_unit == "BAR": + conversion_factor = 0.01 + decimal_places = 2 + elif pressure_unit == "kPa": + conversion_factor = 1 + decimal_places = 0 + else: + conversion_factor = 1 + decimal_places = 0 + tire_pressures = {} + for value in self.data["tirePressure"]: + tire_pressures[value["vehicleWheel"]] = round(float(value["value"]) * conversion_factor, decimal_places) + return tire_pressures + if self.sensor == "gps": + return self.data.get("position", {}) + if self.sensor == "alarm": + return self.data.get("alarmStatus", {}) + if self.sensor == "ignitionStatus": + return self.data.get("ignitionStatus", {}) + if self.sensor == "firmwareUpgInProgress": + return self.data.get("firmwareUpgradeInProgress", {}) + if self.sensor == "deepSleep": + return None + if self.sensor == "doorStatus": + doors = {} + for value in self.data.get(self.sensor, []): + if "vehicleSide" in value: + if value['vehicleDoor'] == "UNSPECIFIED_FRONT": + doors[value['vehicleSide']] = value['value'] + else: + doors[value['vehicleDoor']] = value['value'] + else: + doors[value["vehicleDoor"]] = value['value'] + if "hoodStatus" in self.data: + doors["HOOD"] = self.data["hoodStatus"]["value"] + return doors or None + if self.sensor == "windowPosition": + windows = {} + for window in self.data.get("windowStatus", []): + if window["vehicleWindow"] == "UNSPECIFIED_FRONT": + windows[window["vehicleSide"]] = window + else: + windows[window["vehicleWindow"]] = window + return windows + if self.sensor == "lastRefresh": + return None + if self.sensor == "elVeh": + if "xevBatteryRange" not in self.data: + return None + elecs = {} + if "xevBatteryPerformanceStatus" in self.data: + elecs["Battery Performance Status"] = self.data.get("xevBatteryPerformanceStatus", {}).get("value", "Unsupported") + + if "xevBatteryStateOfCharge" in self.data: + elecs["Battery Charge"] = self.data.get("xevBatteryStateOfCharge", {}).get("value", 0) + + if "xevBatteryActualStateOfCharge" in self.data: + elecs["Battery Actual Charge"] = self.data.get("xevBatteryActualStateOfCharge", {}).get("value", 0) + + if "xevBatteryCapacity" in self.data: + elecs["Maximum Battery Capacity"] = self.data.get("xevBatteryCapacity", {}).get("value", 0) + + if "xevBatteryMaximumRange" in self.data: + elecs["Maximum Battery Range"] = self.units.length(self.data.get("xevBatteryMaximumRange", {}).get("value", 0),UnitOfLength.KILOMETERS) + + if "xevBatteryVoltage" in self.data: + elecs["Battery Voltage"] = float(self.data.get("xevBatteryVoltage", {}).get("value", 0)) + batt_volt = elecs["Battery Voltage"] + + if "xevBatteryIoCurrent" in self.data: + elecs["Battery Amperage"] = float(self.data.get("xevBatteryIoCurrent", {}).get("value", 0)) + batt_amps = elecs["Battery Amperage"] + + if batt_volt != 0 and batt_amps != 0: + elecs["Battery kW"] = round((batt_volt * batt_amps) / 1000, 2) + + if "xevTractionMotorVoltage" in self.data: + elecs["Motor Voltage"] = float(self.data.get("xevTractionMotorVoltage", {}).get("value", 0)) + motor_volt = elecs["Motor Voltage"] + + if "xevTractionMotorCurrent" in self.data: + elecs["Motor Amperage"] = float(self.data.get("xevTractionMotorCurrent", {}).get("value", 0)) + motor_amps = elecs["Motor Amperage"] + + # This will make Motor kW not display if vehicle is not in use. Not sure if that is bad practice + if motor_volt != 0 and motor_amps != 0: + elecs["Motor kW"] = round((motor_volt * motor_amps) / 1000, 2) + + # tripXevBatteryChargeRegenerated should be a previous FordPass feature called "Driving Score". A % based on how much regen vs brake you use + if "tripXevBatteryChargeRegenerated" in self.data: + elecs["Trip Driving Score"] = self.data.get("tripXevBatteryChargeRegenerated", {}).get("value", 0) + + if "tripXevBatteryRangeRegenerated" in self.data: + elecs["Trip Range Regenerated"] = self.units.length(self.data.get("tripXevBatteryRangeRegenerated", {}).get("value", 0),UnitOfLength.KILOMETERS) + + if "customEvents" in self.events: + tripDataStr = self.events.get("customEvents", {}).get("xev-key-off-trip-segment-data", {}).get("oemData", {}).get("trip_data", {}).get("stringArrayValue", []) + for dataStr in tripDataStr: + tripData = json.loads(dataStr) + if "ambient_temperature" in tripData: + elecs["Trip Ambient Temp"] = self.units.temperature(tripData["ambient_temperature"], UnitOfTemperature.CELSIUS) + if "outside_air_ambient_temperature" in tripData: + elecs["Trip Outside Air Ambient Temp"] = self.units.temperature(tripData["outside_air_ambient_temperature"], UnitOfTemperature.CELSIUS) + if "trip_duration" in tripData: + elecs["Trip Duration"] = tripData["trip_duration"] / 3600 + if "cabin_temperature" in tripData: + elecs["Trip Cabin Temp"] = self.units.temperature(tripData["cabin_temperature"], UnitOfTemperature.CELSIUS) + if "energy_consumed" in tripData: + elecs["Trip Energy Consumed"] = round(tripData["energy_consumed"] / 1000, 2) + if "distance_traveled" in tripData: + elecs["Trip Distance Traveled"] = self.units.length(tripData["distance_traveled"], UnitOfLength.KILOMETERS) + if ( + "energy_consumed" in tripData + and tripData["energy_consumed"] is not None + and "distance_traveled" in tripData + and tripData["distance_traveled"] is not None + ): + elecs["Trip Efficiency"] = elecs["Trip Distance Traveled"] / elecs["Trip Energy Consumed"] + return elecs + # SquidBytes: Added elVehCharging + if self.sensor == "elVehCharging": + if "xevPlugChargerStatus" not in self.data: + return None + cs = {} + + if "xevPlugChargerStatus" in self.data: + cs["Plug Status"] = self.data.get("xevPlugChargerStatus", {}).get("value", "Unsupported") + + if "xevChargeStationCommunicationStatus" in self.data: + cs["Charging Station Status"] = self.data.get("xevChargeStationCommunicationStatus", {}).get("value", "Unsupported") + + if "xevBatteryChargeDisplayStatus" in self.data: + cs["Charging Status"] = self.data.get("xevBatteryChargeDisplayStatus", {}).get("value", "Unsupported") + + if "xevChargeStationPowerType" in self.data: + cs["Charging Type"] = self.data.get("xevChargeStationPowerType", {}).get("value", "Unsupported") + + # if "tripXevBatteryDistanceAccumulated" in self.data: + # cs["Distance Accumulated"] = self.units.length(self.data.get("tripXevBatteryDistanceAccumulated", {}).get("value", 0),UnitOfLength.KILOMETERS) + + if "xevBatteryChargerVoltageOutput" in self.data: + cs["Charging Voltage"] = float(self.data.get("xevBatteryChargerVoltageOutput", {}).get("value", 0)) + ch_volt = cs["Charging Voltage"] + + if "xevBatteryChargerCurrentOutput" in self.data: + cs["Charging Amperage"] = float(self.data.get("xevBatteryChargerCurrentOutput", {}).get("value", 0)) + ch_amps = cs["Charging Amperage"] + + # This will make Charging kW not display if vehicle is not charging. Not sure if that is bad practice by having it pop in and out + if ch_volt != 0 and ch_amps != 0: + cs["Charging kW"] = round((ch_volt * ch_amps) / 1000, 2) + + if "xevBatteryTemperature" in self.data: + cs["Battery Temperature"] = self.units.temperature(self.data.get("xevBatteryTemperature", {}).get("value", 0), UnitOfTemperature.CELSIUS) + + if "xevBatteryStateOfCharge" in self.data: + cs["State of Charge"] = self.data.get("xevBatteryStateOfCharge", {}).get("value", 0) + + if "xevBatteryTimeToFullCharge" in self.data: + cs_update_time = dt.parse_datetime(self.data.get("xevBatteryTimeToFullCharge", {}).get("updateTime", 0)) + cs_est_end_time = cs_update_time + timedelta(minutes=self.data.get("xevBatteryTimeToFullCharge", {}).get("value", 0)) + cs["Estimated End Time"] = dt.as_local(cs_est_end_time) + + return cs + + if self.sensor == "zoneLighting": + if "zoneLighting" not in self.data: + return None + if ( + self.data[self.sensor] is not None and self.data[self.sensor]["zoneStatusData"] is not None + ): + zone = {} + if self.data[self.sensor]["zoneStatusData"] is not None: + for key, value in self.data[self.sensor][ + "zoneStatusData" + ].items(): + zone["zone_" + key] = value["value"] + + if ( + self.data[self.sensor]["lightSwitchStatusData"] + is not None + ): + for key, value in self.data[self.sensor][ + "lightSwitchStatusData" + ].items(): + if value is not None: + zone[key] = value["value"] + + if ( + self.data[self.sensor]["zoneLightingFaultStatus"] + is not None + ): + zone["zoneLightingFaultStatus"] = self.data[ + self.sensor + ]["zoneLightingFaultStatus"]["value"] + if ( + self.data[self.sensor][ + "zoneLightingShutDownWarning" + ] + is not None + ): + zone["zoneLightingShutDownWarning"] = self.data[ + self.sensor + ]["zoneLightingShutDownWarning"]["value"] + return zone + return None + if self.sensor == "remoteStartStatus": + return {"Countdown:": self.data.get("remoteStartCountdownTimer", {}).get("value", 0)} + if self.sensor == "messages": + messages = {} + for value in self.coordinator.data.get("messages", []): + messages[value["messageSubject"]] = value["createdDate"] + return messages + if self.sensor == "dieselSystemStatus": + if self.data.get("indicators", {}).get("dieselExhaustOverTemp", {}).get("value") is not None: + return { + "Diesel Exhaust Over Temp": self.data["indicators"]["dieselExhaustOverTemp"]["value"] + } + return None + if self.sensor == "exhaustFluidLevel": + exhaustdata = {} + if self.data.get("dieselExhaustFluidLevelRangeRemaining", {}).get("value") is not None: + exhaustdata["Exhaust Fluid Range"] = self.data["dieselExhaustFluidLevelRangeRemaining"]["value"] + if self.data.get("indicators", {}).get("dieselExhaustFluidLow", {}).get("value") is not None: + exhaustdata["Exhaust Fluid Low"] = self.data["indicators"]["dieselExhaustFluidLow"]["value"] + if self.data.get("indicators", {}).get("dieselExhaustFluidSystemFault", {}).get("value") is not None: + exhaustdata["Exhaust Fluid System Fault"] = self.data["indicators"]["dieselExhaustFluidSystemFault"]["value"] + return exhaustdata or None + if self.sensor == "speed": + attribs = {} + if "acceleratorPedalPosition" in self.data: + attribs["acceleratorPedalPosition"] = self.data["acceleratorPedalPosition"]["value"] + if "brakePedalStatus" in self.data: + attribs["brakePedalStatus"] = self.data["brakePedalStatus"]["value"] + if "brakeTorque" in self.data: + attribs["brakeTorque"] = self.data["brakeTorque"]["value"] + if "engineSpeed" in self.data and "xevBatteryVoltage" not in self.data: + attribs["engineSpeed"] = self.data["engineSpeed"]["value"] + if "gearLeverPosition" in self.data: + attribs["gearLeverPosition"] = self.data["gearLeverPosition"]["value"] + if "parkingBrakeStatus" in self.data: + attribs["parkingBrakeStatus"] = self.data["parkingBrakeStatus"]["value"] + if "torqueAtTransmission" in self.data: + attribs["torqueAtTransmission"] = self.data["torqueAtTransmission"]["value"] + if "tripFuelEconomy" in self.data and "xevBatteryVoltage" not in self.data: + attribs["tripFuelEconomy"] = self.data["tripFuelEconomy"]["value"] + return attribs or None + if self.sensor == "indicators": + alerts = {} + for key, value in self.data.get("indicators", {}).items(): + if value.get("value") is not None: + alerts[key] = value["value"] + return alerts or None return None - def guard_status(self): - """Retrieve guard status from API""" - self.__acquire_token() - params = {"lrdt": "01-01-1970 00:00:00"} - headers = { - **apiHeaders, - "auth-token": self.token, - "Application-Id": self.region, - } - - response = session.get( - f"{GUARD_URL}/guardmode/v1/{self.vin}/session", - params=params, - headers=headers, - ) - return response.json() - - def start(self): - """ - Issue a start command to the engine - """ - return self.__request_and_poll_command("remoteStart") - - def stop(self): - """ - Issue a stop command to the engine - """ - return self.__request_and_poll_command("cancelRemoteStart") - - def lock(self): - """ - Issue a lock command to the doors - """ - return self.__request_and_poll_command("lock") - - def unlock(self): - """ - Issue an unlock command to the doors - """ - return self.__request_and_poll_command("unlock") - - def enable_guard(self): - """ - Enable Guard mode on supported models - """ - self.__acquire_token() - - response = self.__make_request( - "PUT", f"{GUARD_URL}/guardmode/v1/{self.vin}/session", None, None - ) - _LOGGER.debug(response.text) - return response - - def disable_guard(self): - """ - Disable Guard mode on supported models - """ - self.__acquire_token() - response = self.__make_request( - "DELETE", f"{GUARD_URL}/guardmode/v1/{self.vin}/session", None, None - ) - _LOGGER.debug(response.text) - return response - - def request_update(self, vin=""): - """Send request to vehicle for update""" - self.__acquire_token() - if vin: - vinnum = vin - else: - vinnum = self.vin - status = self.__request_and_poll_command("statusRefresh", vinnum) - return status - - def __make_request(self, method, url, data, params): - """ - Make a request to the given URL, passing data/params as needed - """ - - headers = { - **apiHeaders, - "auth-token": self.token, - "Application-Id": self.region, - } - - return getattr(requests, method.lower())( - url, headers=headers, data=data, params=params - ) + @property + def name(self): + """Return Sensor Name""" + return "fordpass_" + self.sensor + + # @property + # def state(self): + # """Return Sensor State""" + # return self.get_value("state") + + @property + def device_id(self): + """Return Sensor Device ID""" + return self.device_id + + @property + def extra_state_attributes(self): + """Return sensor attributes""" + return self.get_value("attribute") + + @property + def native_unit_of_measurement(self): + """Return sensor measurement""" + return self.get_value("measurement") + + @property + def native_value(self): + """Return Native Value""" + return self.get_value("state") + + @property + def icon(self): + """Return sensor icon""" + return SENSORS[self.sensor]["icon"] + + @property + def state_class(self): + """Return sensor state_class for statistics""" + if "state_class" in SENSORS[self.sensor]: + if SENSORS[self.sensor]["state_class"] == "total": + return SensorStateClass.TOTAL + if SENSORS[self.sensor]["state_class"] == "measurement": + return SensorStateClass.MEASUREMENT + if SENSORS[self.sensor]["state_class"] == "total_increasing": + return SensorStateClass.TOTAL_INCREASING + return None + return None - def __request_and_poll_command(self, command, vin=None): - """Send command to the new Command endpoint""" - self.__acquire_token() - headers = { - **apiHeaders, - "Application-Id": self.region, - "authorization": f"Bearer {self.auto_token}" - } - - data = { - "properties": {}, - "tags": {}, - "type": command, - "wakeUp": True - } - if vin is None: - r = session.post( - f"{AUTONOMIC_URL}/command/vehicles/{self.vin}/commands", - data=json.dumps(data), - headers=headers - ) - else: - r = session.post( - f"{AUTONOMIC_URL}/command/vehicles/{self.vin}/commands", - data=json.dumps(data), - headers=headers - ) - - _LOGGER.debug("Testing command") - _LOGGER.debug(r.status_code) - _LOGGER.debug(r.text) - if r.status_code == 201: - # New code to hanble checking states table from vehicle data - response = r.json() - command_id = response["id"] - # current_status = response["currentStatus"] - i = 1 - while i < 14: - # Check status every 10 seconds for 90 seconds until command completes or time expires - status = self.status() - _LOGGER.debug("STATUS") - _LOGGER.debug(status) - - if "states" in status: - _LOGGER.debug("States located") - if f"{command}Command" in status["states"]: - _LOGGER.debug("Found command") - _LOGGER.debug(status["states"][f"{command}Command"]["commandId"]) - if status["states"][f"{command}Command"]["commandId"] == command_id: - _LOGGER.debug("Making progress") - _LOGGER.debug(status["states"][f"{command}Command"]) - if status["states"][f"{command}Command"]["value"]["toState"] == "success": - _LOGGER.debug("Command succeeded") - return True - if status["states"][f"{command}Command"]["value"]["toState"] == "expired": - _LOGGER.debug("Command expired") - return False - i += 1 - _LOGGER.debug("Looping again") - time.sleep(10) - # time.sleep(90) - return False - return False + @property + def device_class(self): + """Return sensor device class for statistics""" + if "device_class" in SENSORS[self.sensor]: + if SENSORS[self.sensor]["device_class"] == "distance": + return SensorDeviceClass.DISTANCE + if SENSORS[self.sensor]["device_class"] == "timestamp": + return SensorDeviceClass.TIMESTAMP + if SENSORS[self.sensor]["device_class"] == "temperature": + return SensorDeviceClass.TEMPERATURE + if SENSORS[self.sensor]["device_class"] == "battery": + return SensorDeviceClass.BATTERY + if SENSORS[self.sensor]["device_class"] == "speed": + return SensorDeviceClass.SPEED + return None diff --git a/custom_components/fordpass/manifest.json b/custom_components/fordpass/manifest.json index acda366..6d7236d 100644 --- a/custom_components/fordpass/manifest.json +++ b/custom_components/fordpass/manifest.json @@ -12,6 +12,6 @@ "loggers": ["custom_components.fordpass"], "requirements": [], "ssdp": [], - "version": "0.1.59", + "version": "0.1.60", "zeroconf": [] } \ No newline at end of file diff --git a/custom_components/fordpass/sensor.py b/custom_components/fordpass/sensor.py index e979cbd..bb2e488 100644 --- a/custom_components/fordpass/sensor.py +++ b/custom_components/fordpass/sensor.py @@ -31,10 +31,14 @@ async def async_setup_entry(hass, config_entry, async_add_entities): for key, value in SENSORS.items(): sensor = CarSensor(entry, key, config_entry.options) api_key = value["api_key"] + api_class = value.get("api_class", None) string = isinstance(api_key, str) if string and api_key == "messages" or api_key == "lastRefresh": sensors.append(sensor) elif string: + if api_key and api_class and api_key in sensor.coordinator.data.get(api_class, {}): + sensors.append(sensor) + continue if api_key and api_key in sensor.coordinator.data.get("metrics", {}): sensors.append(sensor) else: @@ -65,6 +69,7 @@ def __init__(self, coordinator, sensor, options): self.units = coordinator.hass.config.units self.data = coordinator.data["metrics"] self.events = coordinator.data["events"] + self.states = coordinator.data["states"] self._device_id = "fordpass_" + sensor # Required for HA 2022.7 self.coordinator_context = object() @@ -73,6 +78,7 @@ def get_value(self, ftype): """Get sensor value and attributes from coordinator data""" self.data = self.coordinator.data["metrics"] self.events = self.coordinator.data["events"] + self.states = self.coordinator.data["states"] self.units = self.coordinator.hass.config.units if ftype == "state": if self.sensor == "odometer": @@ -145,6 +151,14 @@ def get_value(self, ftype): return self.data.get("outsideTemperature", {}).get("value", "Unsupported") if self.sensor == "engineOilTemp": return self.data.get("engineOilTemp", {}).get("value", "Unsupported") + if self.sensor == "deepSleepInProgress": + state = self.states.get("commandPreclusion", {}).get("value", {}).get("toState", "Unsupported") + if state == "COMMANDS_PRECLUDED": + return "ACTIVE" + elif state == "COMMANDS_PERMITTED": + return "DISABLED" + else: + return state return None if ftype == "measurement": return SENSORS.get(self.sensor, {}).get("measurement", None) diff --git a/info.md b/info.md index b362148..057d580 100644 --- a/info.md +++ b/info.md @@ -1,4 +1,8 @@ ## **Changelog** +### Version 1.60 +- Deepsleep status is now reported again as a sensor +- Compass Direction is now an attribute under the device_tracker entity + ### Version 1.59 - Add support for manual VIN entry (Lincoln cars hopefuly) - Please test this and report any errrors back! - Fix for lastRefresh sensor not returning local time