From 9291444a449a6993255dda564914ccd9b605e256 Mon Sep 17 00:00:00 2001 From: Juan Pablo de Castro Date: Wed, 5 Jul 2023 17:27:26 +0200 Subject: [PATCH] Add H388X to routers compatibility list. --- README.md | 108 ++--- custom_components/zte_tracker/manifest.json | 20 +- .../zte_tracker/zteclient/_version.py | 4 +- .../zte_tracker/zteclient/zte_client.py | 422 +++++++++--------- 4 files changed, 279 insertions(+), 275 deletions(-) diff --git a/README.md b/README.md index 9ee961f..b7229f2 100644 --- a/README.md +++ b/README.md @@ -1,53 +1,55 @@ -# ZTE Router Integration for Home Assistant -Component to integrate some ZTE routers as a device trackers in home assistant. - -## Features -- Provides a device_tracker to monitor the connection status of devices in your Wifi and LAN ports. -- Exposes the status of the scanner in "sensor.zte_tracker". -- Exposes the service "zte_tracker.pause" to pause/resume the scanner because when the scanner is running the web-admin-console sessions are cancelled. - -## Compatible routers -| Name | Model | -| ------------- |:-------------: | -| ZTE F6640 | F6640 | -| ZTE H288A | H288A | -| ZTE H169A | H169A | - -This integration could work with more routers. Try one of the above and see if it work with yours. - -## Installation - -To use this integration, place the following snippet in configuration.yaml. - - -``` -# Setup the platform zte_tracker -zte_tracker: - host: 192.168.1.1 - model: F6640 - username: user - password: !secret zte_password - interval_seconds: 60 - consider_home: 180 - poll_time: 60 - new_device_defaults: - track_new_devices: no -``` -Change the following parameters to match your configuration: - -`host`: Your router's local IP address (Usually 192.168.1.1 or 192.168.0.1) - -`username`: Your router's login username (Usually admin) - -`password`: Your router's login password - -`model`: Your router's model. Chose one from the Model column of the [table above](#compatible-routers) - - -For more information about the device_tracker parameters visit the official [Home Assistant Documentation](https://www.home-assistant.io/integrations/device_tracker/) - -## Contributors - -- Thanks to @gselivanof for H288A, H169A models. - -[![hacs_badge](https://img.shields.io/badge/HACS-Custom-41BDF5.svg?style=for-the-badge)](https://github.com/hacs/integration) +# ZTE Router Integration for Home Assistant +Component to integrate some ZTE routers as a device trackers in home assistant. + +## Features +- Provides a device_tracker to monitor the connection status of devices in your Wifi and LAN ports. +- Exposes the status of the scanner in "sensor.zte_tracker". +- Exposes the service "zte_tracker.pause" to pause/resume the scanner because when the scanner is running the web-admin-console sessions are cancelled. + +## Compatible routers +| Name | Model Param | +| ------------- |:-------------: | +| ZTE F6640 | F6640 | +| ZTE H288A | H288A | +| ZTE H169A | H169A | +| ZTE H388X | H388X | + + +This integration could work with more routers. Try one of the above and see if it work with yours. + +## Installation + +To use this integration, place the following snippet in configuration.yaml. + + +``` +# Setup the platform zte_tracker +zte_tracker: + host: 192.168.1.1 + model: F6640 + username: user + password: !secret zte_password + interval_seconds: 60 + consider_home: 180 + poll_time: 60 + new_device_defaults: + track_new_devices: no +``` +Change the following parameters to match your configuration: + +`host`: Your router's local IP address (Usually 192.168.1.1 or 192.168.0.1) + +`username`: Your router's login username (Usually admin) + +`password`: Your router's login password + +`model`: Your router's model. Chose one from the Model column of the [table above](#compatible-routers) + + +For more information about the device_tracker parameters visit the official [Home Assistant Documentation](https://www.home-assistant.io/integrations/device_tracker/) + +## Contributors + +- Thanks to @gselivanof for H288A, H169A models support, @TrinTragula for H388X verification. + +[![hacs_badge](https://img.shields.io/badge/HACS-Custom-41BDF5.svg?style=for-the-badge)](https://github.com/hacs/integration) diff --git a/custom_components/zte_tracker/manifest.json b/custom_components/zte_tracker/manifest.json index 8351839..287bed5 100644 --- a/custom_components/zte_tracker/manifest.json +++ b/custom_components/zte_tracker/manifest.json @@ -1,10 +1,10 @@ -{ - "domain": "zte_tracker", - "name": "ZTE router tracker", - "documentation": "https://raw.githubusercontent.com/juacas/zte_tracker/master/README.md", - "codeowners": ["@juacas"], - "issue_tracker": "https://github.com/juacas/zte_tracker/issues", - "version": "v1.3.1", - "requirements": [], - "iot_class": "local_polling" -} +{ + "domain": "zte_tracker", + "name": "ZTE router tracker", + "documentation": "https://raw.githubusercontent.com/juacas/zte_tracker/master/README.md", + "codeowners": ["@juacas"], + "issue_tracker": "https://github.com/juacas/zte_tracker/issues", + "version": "v1.3.2", + "requirements": [], + "iot_class": "local_polling" +} diff --git a/custom_components/zte_tracker/zteclient/_version.py b/custom_components/zte_tracker/zteclient/_version.py index 7d47ca4..c300312 100644 --- a/custom_components/zte_tracker/zteclient/_version.py +++ b/custom_components/zte_tracker/zteclient/_version.py @@ -1,2 +1,2 @@ -__version__ = '1.2.0' -__releasedate__ = '20230129' +__version__ = '1.2.1' +__releasedate__ = '20230705' diff --git a/custom_components/zte_tracker/zteclient/zte_client.py b/custom_components/zte_tracker/zteclient/zte_client.py index c5ec931..67dd45f 100644 --- a/custom_components/zte_tracker/zteclient/zte_client.py +++ b/custom_components/zte_tracker/zteclient/zte_client.py @@ -1,210 +1,212 @@ -from array import array -import logging -import re -import json -import hashlib -import time -from requests import Session -import xml.etree.ElementTree as ET - -_LOGGER = logging.getLogger(__name__) -_MODELS = { - 'F6640': { - 'wlan_script': 'wlan_client_stat_lua.lua', - 'wlan_id_element': 'OBJ_WLAN_AD_ID', - 'lan_script': 'accessdev_landevs_lua.lua', - 'lan_id_element': 'OBJ_ACCESSDEV_ID', - }, - 'H288A': { - 'wlan_script': 'accessdev_ssiddev_lua.lua', - 'wlan_id_element': 'OBJ_ACCESSDEV_ID', - 'lan_script': 'accessdev_landevs_lua.lua', - 'lan_id_element': 'OBJ_ACCESSDEV_ID'} -} -# Synonym for H288A -_MODELS['H196A'] = _MODELS['H288A'] - -class zteClient: - def __init__(self, host, username, password,model): - """Initialize the client.""" - self.statusmsg = None - self.host = host - self.username = username - self.password = password - self.session = None - self.login_data = None - self.status = 'on' - self.device_info = None - self.guid = int(time.time()*1000) - self.model = model - self.paths = _MODELS[model] - - # REBOOT THE ROUTER - def reboot(self) -> bool: - if not self.login: - return False - # REBOOT REQUEST - _LOGGER.info("Requesting reboot") - try: - raise Exception("Not implemented") - except Exception as e: - _LOGGER.error('Failed to reboot: {0}'.format(e)) - return False - finally: - self.logout() - - # LOGIN PROCEDURE - def login(self) -> bool: - """ - Login procedure using ZTE challenge - :return: true if the login has succeeded - """ - try: - self.session = Session() - self.session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'}) - self.session.headers.update({'DNT': '1'}) - self.session.cookies.set('_TESTCOOKIESUPPORT', '1') - - # Step1: Get session token. - session_token = self.get_session_token() - - # Step2: query for login token. - r = self.session.get('http://{0}/?_type=loginData&_tag=login_token&_={1}'.format(self.host, self.get_guid() ), verify=False) - self.log_request(r) - # parse XML response. - xml_response = ET.fromstring(r.content) - assert xml_response.tag == 'ajax_response_xml_root', 'Unexpected response ' + xml_response.text - login_token = xml_response.text - assert login_token, 'Empty login_token' - - # Step3: Login entry - pass_hash = self.password + login_token - password_param = hashlib.sha256(pass_hash.encode()).hexdigest() - - r = self.session.post("http://{0}/?_type=loginData&_tag=login_entry".format(self.host),verify=False, - data= { "action": "login", "Password": password_param, "Username": self.username, "_sessionTOKEN": session_token }) - self.log_request(r) - self.login_data = r.json() - # if login need refresh make a new request. - if self.login_data['login_need_refresh'] == 1: - _LOGGER.debug("REFRESH") - # r = self.session.get('http://{0}/'.format(self.host), verify=False) - # self.log_request(r) - self.statusmsg = None - - return True - except Exception as e: - self.statusmsg = 'Failed login: {0}'.format(e) - _LOGGER.error(self.statusmsg) - self.login_data = None - self.session.close() - return False - - def get_guid(self): - guid = self.guid - self.guid +=1 - return guid - - def get_session_token(self): - r = self.session.get('http://{0}/?_type=loginData&_tag=login_entry'.format(self.host),verify=False) - self.log_request(r) - self.status = 'on' - device_info = r.json() - assert device_info['lockingTime'] == 0 and device_info['sess_token'], 'Empty sess_token. Device locked?' - session_token = device_info['sess_token'] - return session_token - - ## LOGOUT ## - def logout(self): - try: - if self.login_data is None: - return False - - r = self.session.post('http://{0}?_type=loginData&_tag=logout_entry'.format(self.host), - data={'IF_LogOff':'1'}, verify=False) - self.log_request(r) - - assert r.ok, r - _LOGGER.debug("Logged out") - except Exception as e: - _LOGGER.error('Failed to logout: {0}'.format(e)) - finally: - self.session.close() - self.login_data = None - - def get_devices_response(self): - """ - Get the list of devices - """ - lan_devices = self.get_lan_devices() - wifi_devices = self.get_wifi_devices() - devices = wifi_devices + lan_devices - return devices - - def get_lan_devices(self): - """ - Get the list of devices connected to the LAN ports - :return: list of devices - """ - # GET DEVICES RESPONSE from http://10.0.0.1/?_type=menuData&_tag=accessdev_homepage_lua.lua&InstNum=5&_=1663922344910 - try: - r= self.session.get('http://{0}/?_type=menuView&_tag=localNetStatus&_={1}'.format(self.host, self.get_guid()),verify=False) - lan_request = 'http://{0}/?_type=menuData&_tag={1}&_{2}'.format(self.host, self.paths['lan_script'], self.get_guid()); - r= self.session.get(lan_request, verify=False) - self.log_request(r) - devices = self.parse_devices(r.text, self.paths['lan_id_element'], 'LAN') - self.statusmsg = 'OK' - return devices - except Exception as e: - self.statusmsg = 'Failed to get LAN devices: {0}'.format(e) - _LOGGER.error(self.statusmsg) - return [] - - - def get_wifi_devices(self): - """ - Get the list of devices connected to the wifi - :return: list of devices - """ - # GET DEVICES RESPONSE - try: - r= self.session.get('http://{0}/?_type=menuView&_tag=localNetStatus&_={1}'.format(self.host, self.get_guid()),verify=False) - wlan_request = 'http://{0}/?_type=menuData&_tag={1}&_={2}'.format(self.host, self.paths['wlan_script'],self.get_guid()) - r= self.session.get(wlan_request,verify=False) - self.log_request(r) - devices = self.parse_devices(r.text, self.paths['wlan_id_element'], 'WLAN') - - self.statusmsg = 'OK' - except Exception as e: - self.statusmsg = 'Failed to get Devices: {0} rdev {2}'.format(e, r.content) - _LOGGER.error(self.statusmsg) - return [] - - return devices - - def log_request(self, r): - _LOGGER.debug(r.request.url) - _LOGGER.debug(r.request.headers) - _LOGGER.debug(r.text[0:200]) - - # Parse xml response to get devices - def parse_devices(self, xml_response, node_name='OBJ_WLAN_AD_ID', network_type='WLAN'): - """Parse the xml response and return a list of devices.""" - devices = [] - xml = ET.fromstring(xml_response) - assert xml.tag == 'ajax_response_xml_root', 'Unexpected response ' + xml_response - - for device in xml.findall(f'{node_name}/Instance'): - device_info = {'Active': True, "IconType": None, "NetworkType": network_type } - for i in range(0, int(len(device)/2)): - paramname = device[i*2].text - paramvalue = device[i*2+1].text - if paramname == 'MACAddress': - device_info['MACAddress'] = paramvalue - elif paramname == 'IPAddress': - device_info['IPAddress'] = paramvalue - elif paramname == 'HostName': - device_info['HostName'] = paramvalue - devices.append(device_info) - return devices - +from array import array +import logging +import re +import json +import hashlib +import time +from requests import Session +import xml.etree.ElementTree as ET + +_LOGGER = logging.getLogger(__name__) +_MODELS = { + 'F6640': { + 'wlan_script': 'wlan_client_stat_lua.lua', + 'wlan_id_element': 'OBJ_WLAN_AD_ID', + 'lan_script': 'accessdev_landevs_lua.lua', + 'lan_id_element': 'OBJ_ACCESSDEV_ID', + }, + 'H288A': { + 'wlan_script': 'accessdev_ssiddev_lua.lua', + 'wlan_id_element': 'OBJ_ACCESSDEV_ID', + 'lan_script': 'accessdev_landevs_lua.lua', + 'lan_id_element': 'OBJ_ACCESSDEV_ID'} +} +# Synonym H196A is like H288A +_MODELS['H196A'] = _MODELS['H288A'] +# Synonym H388X is like H288A +_MODELS['H388X'] = _MODELS['H288A'] + +class zteClient: + def __init__(self, host, username, password,model): + """Initialize the client.""" + self.statusmsg = None + self.host = host + self.username = username + self.password = password + self.session = None + self.login_data = None + self.status = 'on' + self.device_info = None + self.guid = int(time.time()*1000) + self.model = model + self.paths = _MODELS[model] + + # REBOOT THE ROUTER + def reboot(self) -> bool: + if not self.login: + return False + # REBOOT REQUEST + _LOGGER.info("Requesting reboot") + try: + raise Exception("Not implemented") + except Exception as e: + _LOGGER.error('Failed to reboot: {0}'.format(e)) + return False + finally: + self.logout() + + # LOGIN PROCEDURE + def login(self) -> bool: + """ + Login procedure using ZTE challenge + :return: true if the login has succeeded + """ + try: + self.session = Session() + self.session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'}) + self.session.headers.update({'DNT': '1'}) + self.session.cookies.set('_TESTCOOKIESUPPORT', '1') + + # Step1: Get session token. + session_token = self.get_session_token() + + # Step2: query for login token. + r = self.session.get('http://{0}/?_type=loginData&_tag=login_token&_={1}'.format(self.host, self.get_guid() ), verify=False) + self.log_request(r) + # parse XML response. + xml_response = ET.fromstring(r.content) + assert xml_response.tag == 'ajax_response_xml_root', 'Unexpected response ' + xml_response.text + login_token = xml_response.text + assert login_token, 'Empty login_token' + + # Step3: Login entry + pass_hash = self.password + login_token + password_param = hashlib.sha256(pass_hash.encode()).hexdigest() + + r = self.session.post("http://{0}/?_type=loginData&_tag=login_entry".format(self.host),verify=False, + data= { "action": "login", "Password": password_param, "Username": self.username, "_sessionTOKEN": session_token }) + self.log_request(r) + self.login_data = r.json() + # if login need refresh make a new request. + if self.login_data['login_need_refresh'] == 1: + _LOGGER.debug("REFRESH") + # r = self.session.get('http://{0}/'.format(self.host), verify=False) + # self.log_request(r) + self.statusmsg = None + + return True + except Exception as e: + self.statusmsg = 'Failed login: {0}'.format(e) + _LOGGER.error(self.statusmsg) + self.login_data = None + self.session.close() + return False + + def get_guid(self): + guid = self.guid + self.guid +=1 + return guid + + def get_session_token(self): + r = self.session.get('http://{0}/?_type=loginData&_tag=login_entry'.format(self.host),verify=False) + self.log_request(r) + self.status = 'on' + device_info = r.json() + assert device_info['lockingTime'] == 0 and device_info['sess_token'], 'Empty sess_token. Device locked?' + session_token = device_info['sess_token'] + return session_token + + ## LOGOUT ## + def logout(self): + try: + if self.login_data is None: + return False + + r = self.session.post('http://{0}?_type=loginData&_tag=logout_entry'.format(self.host), + data={'IF_LogOff':'1'}, verify=False) + self.log_request(r) + + assert r.ok, r + _LOGGER.debug("Logged out") + except Exception as e: + _LOGGER.error('Failed to logout: {0}'.format(e)) + finally: + self.session.close() + self.login_data = None + + def get_devices_response(self): + """ + Get the list of devices + """ + lan_devices = self.get_lan_devices() + wifi_devices = self.get_wifi_devices() + devices = wifi_devices + lan_devices + return devices + + def get_lan_devices(self): + """ + Get the list of devices connected to the LAN ports + :return: list of devices + """ + # GET DEVICES RESPONSE from http://10.0.0.1/?_type=menuData&_tag=accessdev_homepage_lua.lua&InstNum=5&_=1663922344910 + try: + r= self.session.get('http://{0}/?_type=menuView&_tag=localNetStatus&_={1}'.format(self.host, self.get_guid()),verify=False) + lan_request = 'http://{0}/?_type=menuData&_tag={1}&_{2}'.format(self.host, self.paths['lan_script'], self.get_guid()); + r= self.session.get(lan_request, verify=False) + self.log_request(r) + devices = self.parse_devices(r.text, self.paths['lan_id_element'], 'LAN') + self.statusmsg = 'OK' + return devices + except Exception as e: + self.statusmsg = 'Failed to get LAN devices: {0}'.format(e) + _LOGGER.error(self.statusmsg) + return [] + + + def get_wifi_devices(self): + """ + Get the list of devices connected to the wifi + :return: list of devices + """ + # GET DEVICES RESPONSE + try: + r= self.session.get('http://{0}/?_type=menuView&_tag=localNetStatus&_={1}'.format(self.host, self.get_guid()),verify=False) + wlan_request = 'http://{0}/?_type=menuData&_tag={1}&_={2}'.format(self.host, self.paths['wlan_script'],self.get_guid()) + r= self.session.get(wlan_request,verify=False) + self.log_request(r) + devices = self.parse_devices(r.text, self.paths['wlan_id_element'], 'WLAN') + + self.statusmsg = 'OK' + except Exception as e: + self.statusmsg = 'Failed to get Devices: {0} rdev {2}'.format(e, r.content) + _LOGGER.error(self.statusmsg) + return [] + + return devices + + def log_request(self, r): + _LOGGER.debug(r.request.url) + _LOGGER.debug(r.request.headers) + _LOGGER.debug(r.text[0:200]) + + # Parse xml response to get devices + def parse_devices(self, xml_response, node_name='OBJ_WLAN_AD_ID', network_type='WLAN'): + """Parse the xml response and return a list of devices.""" + devices = [] + xml = ET.fromstring(xml_response) + assert xml.tag == 'ajax_response_xml_root', 'Unexpected response ' + xml_response + + for device in xml.findall(f'{node_name}/Instance'): + device_info = {'Active': True, "IconType": None, "NetworkType": network_type } + for i in range(0, int(len(device)/2)): + paramname = device[i*2].text + paramvalue = device[i*2+1].text + if paramname == 'MACAddress': + device_info['MACAddress'] = paramvalue + elif paramname == 'IPAddress': + device_info['IPAddress'] = paramvalue + elif paramname == 'HostName': + device_info['HostName'] = paramvalue + devices.append(device_info) + return devices +