From 87525f891a054d419e881a95bb84374c96e10559 Mon Sep 17 00:00:00 2001 From: steve Date: Tue, 14 Nov 2023 20:54:21 +1000 Subject: [PATCH] Added initial code for new auth flow --- custom_components/fordpass/fordpass_new.py | 250 ++++++++++++++++++--- custom_components/fordpass/manifest.json | 2 +- info.md | 4 + 3 files changed, 220 insertions(+), 36 deletions(-) diff --git a/custom_components/fordpass/fordpass_new.py b/custom_components/fordpass/fordpass_new.py index 0a17e26..5eba17c 100644 --- a/custom_components/fordpass/fordpass_new.py +++ b/custom_components/fordpass/fordpass_new.py @@ -17,7 +17,14 @@ defaultHeaders = { "Accept": "*/*", "Accept-Language": "en-us", - "User-Agent": "FordPass/23 CFNetwork/1408.0.4 Darwin/22.5.0", + "User-Agent": "FordPass/26 CFNetwork/1485 Darwin/23.1.0", + "Accept-Encoding": "gzip, deflate, br", +} + +loginHeaders = { + "Accept": "*/*", + "Accept-Language": "en-us", + "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Mobile/15E148 Safari/604.1", "Accept-Encoding": "gzip, deflate, br", } @@ -32,6 +39,19 @@ "North America & Canada": "71A3AD0A-CF46-4CCF-B473-FC7FE5BC4592", } +locale_lookup = { + "UK&Europe": "EN-GB", + "Australia": "EN-AU", + "North America & Canada": "EN-US", +} + +locale_short_lookup = { + "UK&Europe": "GB", + "Australia": "AUS", + "North America & Canada": "USA", +} + + NEW_API = True BASE_URL = "https://usapi.cv.ford.com/api" @@ -39,6 +59,7 @@ SSO_URL = "https://sso.ci.ford.com" AUTONOMIC_URL = "https://api.autonomic.ai/v1" AUTONOMIC_ACCOUNT_URL = "https://accounts.autonomic.ai/v1" +FORD_LOGIN_URL = "https://login.ford.com" session = requests.Session() @@ -53,6 +74,8 @@ def __init__( self.password = password self.save_token = save_token self.region = region_lookup[region] + self.country_code = locale_lookup[region] + self.short_code = locale_short_lookup[region] self.region2 = region self.vin = vin self.token = None @@ -81,6 +104,149 @@ def generate_hash(self, code): hashengine = hashlib.sha256() hashengine.update(code.encode('utf-8')) return self.base64_url_encode(hashengine.digest()).decode('utf-8') + + def auth2_step1(self): + """Auth2 step 1 obtain tokens""" + _LOGGER.debug("Running Step1 new!") + headers = { + **loginHeaders, + } + code1 = ''.join(random.choice(string.ascii_lowercase) for i in range(43)) + code_verifier = self.generate_hash(code1) + step1_session = requests.session() + step1_url = f"{FORD_LOGIN_URL}/4566605f-43a7-400a-946e-89cc9fdb0bd7/B2C_1A_SignInSignUp_en-AU/oauth2/v2.0/authorize?redirect_uri=fordapp://userauthorized&response_type=code&max_age=3600&scope=%2009852200-05fd-41f6-8c21-d36d3497dc64%20openid&client_id=09852200-05fd-41f6-8c21-d36d3497dc64&code_challenge={code_verifier}&code_challenge_method=S256&ui_locales={self.country_code}&language_code={self.country_code}&country_code={self.short_code}&ford_application_id=5C80A6BB-CF0D-4A30-BDBF-FC804B5C1A98" + step1get = step1_session.get( + step1_url, + headers=headers, + ) + + step1get.raise_for_status() + + #_LOGGER.debug(step1_session.text) + pattern = r'var SETTINGS = (\{[^;]*\});' + #_LOGGER.debug(step1get.text) + match = re.search(pattern, step1get.text) + transId = None + csrfToken = None + if match: + settings = match.group(1) + settings_json = json.loads(settings) + _LOGGER.debug(settings_json) + _LOGGER.debug(settings_json["transId"]) + transId = settings_json["transId"] + csrfToken = settings_json["csrf"] + _LOGGER.debug(step1get.status_code) + _LOGGER.debug(step1_session.cookies.get_dict()) + data = { + "request_type": "RESPONSE", + "signInName": self.username, + "password": self.password, + } + urlp = f"{FORD_LOGIN_URL}/4566605f-43a7-400a-946e-89cc9fdb0bd7/B2C_1A_SignInSignUp_en-AU/SelfAsserted?tx={transId}&p=B2C_1A_SignInSignUp_en-AU" + _LOGGER.debug(urlp) + headers = { + **loginHeaders, + "Origin": "https://login.ford.com", + "Referer": step1_url, + "X-Csrf-Token": csrfToken + } + step1post = step1_session.post( + urlp, + headers=headers, + data=data + ) + step1post.raise_for_status() + _LOGGER.debug("checking password") + _LOGGER.debug(step1post.text) + _LOGGER.debug(step1post.status_code) + cookie_dict = step1_session.cookies.get_dict() + _LOGGER.debug(cookie_dict) + + + + + step1pt2 = step1_session.get( + f"{FORD_LOGIN_URL}/4566605f-43a7-400a-946e-89cc9fdb0bd7/B2C_1A_SignInSignUp_en-AU/api/CombinedSigninAndSignup/confirmed?rememberMe=false&csrf_token={csrfToken}", + headers=headers, + allow_redirects=False, + ) + step1pt2.raise_for_status() + + test = step1pt2.headers["Location"] + _LOGGER.debug(test) + + code_new = test.replace("fordapp://userauthorized/?code=","") + + _LOGGER.debug(code_new) + + data = { + "client_id" : "09852200-05fd-41f6-8c21-d36d3497dc64", + "grant_type": "authorization_code", + "code_verifier": code1, + "code": code_new, + "redirect_uri": "fordapp://userauthorized" + + } + + step1pt3 = step1_session.post( + f"{FORD_LOGIN_URL}/4566605f-43a7-400a-946e-89cc9fdb0bd7/B2C_1A_SignInSignUp_en-AU/oauth2/v2.0/token", + headers=headers, + data=data + ) + step1pt3.raise_for_status() + + _LOGGER.debug(step1pt3.status_code) + _LOGGER.debug(step1pt3.text) + + tokens = step1pt3.json() + if tokens: + if self.auth2_step2(tokens): + return tokens + else: + _LOGGER.debug("DAM IT WENT WRONG") + + + + + + + + + + def auth2_step2(self, result): + _LOGGER.debug(result) + + data = {"idpToken": result["access_token"]} + headers = {**apiHeaders, "Application-Id": self.region} + response = session.post( + f"{GUARD_URL}/token/v2/cat-with-b2c-access-token", + data=json.dumps(data), + headers=headers, + ) + response.raise_for_status() + _LOGGER.debug(response.status_code) + _LOGGER.debug(response.text) + result = response.json() + self.token = result["access_token"] + _LOGGER.debug(self.token) + self.refresh_token = result["refresh_token"] + self.expires_at = time.time() + result["expires_in"] + _LOGGER.debug(self.expires_at) + auto_token = self.get_auto_token() + _LOGGER.debug("AUTO 2") + self.auto_token = auto_token["access_token"] + self.auto_expires_at = time.time() + result["expires_in"] + if self.save_token: + result["expiry_date"] = time.time() + result["expires_in"] + result["auto_token"] = auto_token["access_token"] + result["auto_refresh"] = "" #auto_token["refresh_token"] + result["auto_expiry"] = time.time() + auto_token["expires_in"] + + self.write_token(result) + session.cookies.clear() + _LOGGER.debug("Step 5 Complete") + return True + def auth_step1(self): """Obtain data-ibm-login-url""" @@ -93,13 +259,13 @@ def auth_step1(self): # _LOGGER.debug("Before") code1 = ''.join(random.choice(string.ascii_lowercase) for i in range(43)) code_verifier = self.generate_hash(code1) - url1 = f"{SSO_URL}/v1.0/endpoint/default/authorize?redirect_uri=fordapp://userauthorized&response_type=code&scope=openid&max_age=3600&client_id=9fb503e0-715b-47e8-adfd-ad4b7770f73b&code_challenge={code_verifier}&code_challenge_method=S256" + url1 = f"https://login.ford.com/4566605f-43a7-400a-946e-89cc9fdb0bd7/B2C_1A_SignInSignUp_en-AU/oauth2/v2.0/authorize?redirect_uri=fordapp://userauthorized&response_type=code&scope=%2009852200-05fd-41f6-8c21-d36d3497dc64%20openid&max_age=3600&client_id=09852200-05fd-41f6-8c21-d36d3497dc64&code_challenge={code_verifier}&code_challenge_method=S256&ui_locales=en-AU&language_code=en-AU&country_code=AUS&ford_application_id=5C80A6BB-CF0D-4A30-BDBF-FC804B5C1A98" response = session.get( url1, headers=headers, ) - # _LOGGER.debug(response.text) - # _LOGGER.debug(response.status_code) + _LOGGER.debug(response.text) + _LOGGER.debug(response.status_code) if response.status_code != 200: _LOGGER.debug("Incorrect response from URL") raise Exception("Response from URL was invalid") @@ -271,54 +437,56 @@ def auth(self): _LOGGER.debug(self.errors) # Run Step 1 auth - ibm_urls = self.auth_step1() + access_tokens = self.auth2_step1() + # ibm_urls = self.auth_step1() - if ibm_urls is None: - self.errors += 1 - if self.errors <= 10: - self.auth() - else: - raise Exception("Step 1 has reached error limit") - # Run Step 2 auth - login_url = self.auth_step2(ibm_urls["ibm_url"]) + # if ibm_urls is None: + # self.errors += 1 + # if self.errors <= 10: + # self.auth() + # else: + # raise Exception("Step 1 has reached error limit") - if login_url is None: - self.errors += 1 - if self.errors <= 10: - self.auth() - else: - raise Exception("Step 2 has reached error limit") + # # Run Step 2 auth + # login_url = self.auth_step2(ibm_urls["ibm_url"]) - # Run Step 3 auth - codes = self.auth_step3(login_url) + # if login_url is None: + # self.errors += 1 + # if self.errors <= 10: + # self.auth() + # else: + # raise Exception("Step 2 has reached error limit") - if codes is None: - self.errors += 1 - if self.errors <= 10: - self.auth() - else: - raise Exception("Step 3 has reached error limit") + # # Run Step 3 auth + # codes = self.auth_step3(login_url) + + # if codes is None: + # self.errors += 1 + # if self.errors <= 10: + # self.auth() + # else: + # raise Exception("Step 3 has reached error limit") - # Run Step 4 auth - access_tokens = self.auth_step4(codes, ibm_urls["code1"]) + # # Run Step 4 auth + # access_tokens = self.auth_step4(codes, ibm_urls["code1"]) if access_tokens is None: self.errors += 1 if self.errors <= 10: self.auth() else: - raise Exception("Step 4 has reached error limit") + raise Exception("Step 1 has reached error limit") # Run Step 5 auth - success = self.auth_step5(access_tokens) - + #success = self.auth_step5(access_tokens) + success = self.auth2_step2(access_tokens) if success is False: self.errors += 1 if self.errors <= 10: self.auth() else: - raise Exception("Step 5 has reached error limit") + raise Exception("Step 2 has reached error limit") else: self.errors = 0 return True @@ -326,6 +494,7 @@ def auth(self): def refresh_token_func(self, token): """Refresh token if still valid""" + _LOGGER.debug("Refreshing token") data = {"refresh_token": token["refresh_token"]} headers = {**apiHeaders, "Application-Id": self.region} @@ -445,11 +614,14 @@ def get_auto_token(self): } + _LOGGER.debug(data) + r = session.post( f"{AUTONOMIC_ACCOUNT_URL}/auth/oidc/token", data=data, headers=headers ) + _LOGGER.debug(r.text) if r.status_code == 200: result = r.json() @@ -494,8 +666,9 @@ def status(self): _LOGGER.debug("NEW API???") if r.status_code == 200: - # _LOGGER.debug(r.text) + #_LOGGER.debug(r.text) result = r.json() + return result if r.status_code == 401: self.auth() @@ -744,7 +917,14 @@ def __request_and_poll_command(self, command, vin=None): _LOGGER.debug("Command succeeded") return True if status["states"][f"{command}Command"]["value"]["toState"] == "expired": - _LOGGER.debug("Command expired") + _LOGGER.warning(f"Fordpass Command: {status.get('states', {}).get(f'{command}Command', {}).get('message', 'Expired Status')}") + if "statusRefresh" in command: + raise exceptions.HomeAssistantError(f"Fordpass Command: {status.get('states', {}).get(f'{command}Command', {}).get('message', 'Expired Status')}") + return False + if status["states"][f"{command}Command"]["value"]["toState"] == "failed": + _LOGGER.warning(f"Fordpass Command: {status.get('states', {}).get(f'{command}Command', {}).get('message', 'Failed Status')}") + if "statusRefresh" in command: + raise exceptions.HomeAssistantError(f"Fordpass Command: {status.get('states', {}).get(f'{command}Command', {}).get('message', 'Failed Status')}") return False i += 1 _LOGGER.debug("Looping again") diff --git a/custom_components/fordpass/manifest.json b/custom_components/fordpass/manifest.json index 4290edd..ba3c203 100644 --- a/custom_components/fordpass/manifest.json +++ b/custom_components/fordpass/manifest.json @@ -12,6 +12,6 @@ "loggers": ["custom_components.fordpass"], "requirements": [], "ssdp": [], - "version": "0.1.61", + "version": "0.1.63", "zeroconf": [] } \ No newline at end of file diff --git a/info.md b/info.md index 3ec526a..2d30f80 100644 --- a/info.md +++ b/info.md @@ -1,4 +1,8 @@ ## **Changelog** +### Version 1.63 - Beta +- Reworked authentication to use login.ford.com +### Version 1.62 +- Skipped due to emergency release of auth changes ### Version 1.61 - Deepsleep status is now reported again as a sensor - Compass Direction is now an attribute under the device_tracker entity