# NOTE(review): This text is a unified git diff for
# src/dbt/adapters/fabricspark/livysession.py whose line breaks appear to have
# been collapsed: hunk headers (@@ ... @@) and the +/- line markers are inline.
# It is not importable Python as-is. The patch text below these notes is
# unchanged.
#
# What the patch does, as far as the visible hunks show:
#   * adds `import os` and the module constant
#     SESSION_ID_FILEPATH = "session_id.txt";
#   * create_session() now calls self.load_session_id() first and only POSTs
#     to "/sessions" when self.session_id is falsy; the unchanged context
#     lines still call self.wait_for_session_start() afterwards, and the patch
#     then calls self.save_session_id() before returning self.session_id;
#   * delete_session() now calls self.delete_session_file() before issuing the
#     DELETE request;
#   * is_valid_session() now calls self.load_session_id() before checking
#     whether self.session_id is None;
#   * adds save_session_id(), load_session_id() and delete_session_file(),
#     which write, read and remove SESSION_ID_FILEPATH;
#   * disconnect() no longer deletes the session: the calls to
#     delete_session() and the is_new_session_required assignment are
#     commented out and replaced by logger.error(f"Skip delete session").
#
# Review notes (indentation is lost here, so confirm against the real file):
#   1. load_session_id() appears to assign self.session_id = None in its
#      "file does not exist" branch. Because is_valid_session() calls it
#      first, a session id held only in memory would be discarded whenever
#      the file is missing - TODO confirm this is intended.
#   2. load_session_id() stores f.read().strip(); an empty file gives "".
#      create_session() treats "" as "no session" (`if not self.session_id`)
#      but is_valid_session() only tests `is None`.
#   3. The "Saving Livy session ..." debug message is emitted twice per
#      create_session() call: once in create_session() and once inside
#      save_session_id().
#   4. A session id loaded from the file is passed straight to
#      wait_for_session_start() and re-saved; the visible hunks do not show
#      any check that a reused id still refers to a live session - TODO
#      confirm how a stale id is handled.
#   5. disconnect() reports a normal skip at error level, uses an f-string
#      with no placeholders, and keeps the old statements as commented-out
#      code.
#   6. delete_session() calls response.raise_for_status() in its else branch,
#      but the visible lines bind the DELETE result to `_`; `response` is not
#      assigned in the shown hunks - presumably a NameError, verify.
#   7. SESSION_ID_FILEPATH is a relative path, so it is resolved against the
#      process working directory at call time.
#   8. The except handlers call <err>.response.json(); presumably `response`
#      can be None for connection/timeout failures - confirm against the
#      requests documentation.
diff --git a/src/dbt/adapters/fabricspark/livysession.py b/src/dbt/adapters/fabricspark/livysession.py index 3efcfea..733ee01 100644 --- a/src/dbt/adapters/fabricspark/livysession.py +++ b/src/dbt/adapters/fabricspark/livysession.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os import datetime as dt import json import re @@ -25,12 +26,12 @@ livysession_credentials: FabricSparkCredentials +SESSION_ID_FILEPATH = "session_id.txt" DEFAULT_POLL_WAIT = 10 DEFAULT_POLL_STATEMENT_WAIT = 5 AZURE_CREDENTIAL_SCOPE = "https://analysis.windows.net/powerbi/api/.default" accessToken: AccessToken = None - def is_token_refresh_necessary(unixTimestamp: int) -> bool: # Convert to datetime object dt_object = dt.datetime.fromtimestamp(unixTimestamp) @@ -151,42 +152,45 @@ def __exit__( return True def create_session(self, data) -> str: - # Create sessions - response = None - logger.debug("Creating Livy session (this may take a few minutes)") - try: - response = requests.post( - self.connect_url + "/sessions", - data=json.dumps(data), - headers=get_headers(self.credential, False), - ) - if response.status_code == 200: - logger.debug("Initiated Livy Session...") - response.raise_for_status() - except requests.exceptions.ConnectionError as c_err: - raise Exception("Connection Error :", c_err.response.json()) - except requests.exceptions.HTTPError as h_err: - raise Exception("Http Error: ", h_err.response.json()) - except requests.exceptions.Timeout as t_err: - raise Exception("Timeout Error: ", t_err.response.json()) - except requests.exceptions.RequestException as a_err: - raise Exception("Authorization Error: ", a_err.response.json()) - except Exception as ex: - raise Exception(ex) from ex - - if response is None: - raise Exception("Invalid response from Livy server") - - self.session_id = None - try: - self.session_id = str(response.json()["id"]) - except requests.exceptions.JSONDecodeError as json_err: - raise Exception("Json decode error to get session_id") from 
json_err + self.load_session_id() + if not self.session_id: + # Create sessions + response = None + logger.debug("Creating Livy session (this may take a few minutes)") + try: + response = requests.post( + self.connect_url + "/sessions", + data=json.dumps(data), + headers=get_headers(self.credential, False), + ) + if response.status_code == 200: + logger.debug("Initiated Livy Session...") + response.raise_for_status() + except requests.exceptions.ConnectionError as c_err: + raise Exception("Connection Error :", c_err.response.json()) + except requests.exceptions.HTTPError as h_err: + raise Exception("Http Error: ", h_err.response.json()) + except requests.exceptions.Timeout as t_err: + raise Exception("Timeout Error: ", t_err.response.json()) + except requests.exceptions.RequestException as a_err: + raise Exception("Authorization Error: ", a_err.response.json()) + except Exception as ex: + raise Exception(ex) from ex + + if response is None: + raise Exception("Invalid response from Livy server") + try: + self.session_id = str(response.json()["id"]) + except requests.exceptions.JSONDecodeError as json_err: + raise Exception("Json decode error to get session_id") from json_err # Wait for the session to start self.wait_for_session_start() logger.debug("Livy session created successfully") + logger.debug(f"Saving Livy session ID '{self.session_id}' to '{SESSION_ID_FILEPATH}' for reuse in future executions.") + self.save_session_id() + return self.session_id def wait_for_session_start(self) -> None: @@ -208,6 +212,7 @@ def wait_for_session_start(self) -> None: def delete_session(self) -> None: + self.delete_session_file() try: # delete the session_id _ = requests.delete( @@ -216,6 +221,7 @@ def delete_session(self) -> None: ) if _.status_code == 200: logger.debug(f"Closed the livy session: {self.session_id}") + else: response.raise_for_status() @@ -223,6 +229,8 @@ def delete_session(self) -> None: logger.error(f"Unable to close the livy session {self.session_id}, error: 
{ex}") def is_valid_session(self) -> bool: + self.load_session_id() + if self.session_id is None: logger.error("Session ID is None") return False @@ -235,6 +243,29 @@ def is_valid_session(self) -> bool: invalid_states = ["dead", "shutting_down", "killed"] return res["livyInfo"]["currentState"] not in invalid_states + def save_session_id(self) -> None: + """Save the session ID to a text file for future reuse.""" + logger.debug(f"Saving Livy session {self.session_id} to {SESSION_ID_FILEPATH}, in order to reuse it in future execution") + with open(SESSION_ID_FILEPATH, "w") as f: + f.write(self.session_id) + + def load_session_id(self) -> None: + """Load the session ID from the saved file if it exists.""" + if os.path.exists(SESSION_ID_FILEPATH): + with open(SESSION_ID_FILEPATH, "r") as f: + self.session_id = f.read().strip() + logger.debug(f"Loaded Livy session ID: {self.session_id}") + else: + logger.warning(f"{SESSION_ID_FILEPATH} does not exist. Cannot load session ID.") + self.session_id = None + + def delete_session_file(self) -> None: + """Delete the session file if it exists.""" + try: + os.remove(SESSION_ID_FILEPATH) + logger.debug(f"Deleted {SESSION_ID_FILEPATH}") + except FileNotFoundError: + logger.debug(f"{SESSION_ID_FILEPATH} not found. Skipping deletion.") # cursor object - wrapped for livy API class LivyCursor: @@ -529,8 +560,9 @@ def disconnect() -> None: LivySessionManager.livy_global_session is not None and LivySessionManager.livy_global_session.is_valid_session() ): - LivySessionManager.livy_global_session.delete_session() - LivySessionManager.livy_global_session.is_new_session_required = True + logger.error(f"Skip delete session") + # LivySessionManager.livy_global_session.delete_session() + # LivySessionManager.livy_global_session.is_new_session_required = True else: logger.debug("No session to disconnect")