Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 66 additions & 34 deletions src/dbt/adapters/fabricspark/livysession.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
import datetime as dt
import json
import re
Expand All @@ -25,12 +26,12 @@

# Annotation only: declares the module-level credentials name; no value is assigned on this line.
livysession_credentials: FabricSparkCredentials

# Relative path of the text file used to persist the Livy session ID between runs.
# Being relative, it is resolved against the process's current working directory.
SESSION_ID_FILEPATH = "session_id.txt"
# NOTE(review): polling intervals, presumably in seconds -- their use is outside this view; confirm.
DEFAULT_POLL_WAIT = 10
DEFAULT_POLL_STATEMENT_WAIT = 5
# NOTE(review): looks like the OAuth scope requested for the access token -- confirm against the token helper.
AZURE_CREDENTIAL_SCOPE = "https://analysis.windows.net/powerbi/api/.default"
# Module-level token slot, initialised to None.
# NOTE(review): the annotation omits None; presumably filled in on first authentication -- confirm.
accessToken: AccessToken = None


def is_token_refresh_necessary(unixTimestamp: int) -> bool:
# Convert to datetime object
dt_object = dt.datetime.fromtimestamp(unixTimestamp)
Expand Down Expand Up @@ -151,42 +152,45 @@ def __exit__(
return True

def create_session(self, data) -> str:
    """Return the ID of a Livy session, reusing a saved one when available.

    The session ID is first loaded from ``SESSION_ID_FILEPATH``. Only when no
    ID is found is a new session requested by POSTing ``data`` (serialized to
    JSON) to ``<connect_url>/sessions``. In both cases the method then waits
    for the session to start and writes the ID back to the session file.

    Args:
        data: JSON-serializable payload sent as the body of the create request.

    Returns:
        The session ID as a string (also stored on ``self.session_id``).

    Raises:
        Exception: if the create request fails or its response body cannot be
            decoded to obtain the session ID.
    """

    def _error_detail(err):
        # ConnectionError and Timeout are usually raised before any HTTP
        # response exists, so ``err.response`` is None. Calling ``.json()`` on
        # it would raise AttributeError and hide the real failure.
        # Compare with ``is None``: a Response is falsy for non-2xx statuses.
        resp = getattr(err, "response", None)
        if resp is None:
            return str(err)
        try:
            return resp.json()
        except ValueError:
            # Error bodies are not guaranteed to be JSON; fall back to raw text.
            return resp.text

    self.load_session_id()
    if not self.session_id:
        # Create sessions
        response = None
        logger.debug("Creating Livy session (this may take a few minutes)")
        try:
            response = requests.post(
                self.connect_url + "/sessions",
                data=json.dumps(data),
                headers=get_headers(self.credential, False),
            )
            if response.status_code == 200:
                logger.debug("Initiated Livy Session...")
            response.raise_for_status()
        except requests.exceptions.ConnectionError as c_err:
            raise Exception("Connection Error :", _error_detail(c_err)) from c_err
        except requests.exceptions.HTTPError as h_err:
            raise Exception("Http Error: ", _error_detail(h_err)) from h_err
        except requests.exceptions.Timeout as t_err:
            raise Exception("Timeout Error: ", _error_detail(t_err)) from t_err
        except requests.exceptions.RequestException as a_err:
            raise Exception("Authorization Error: ", _error_detail(a_err)) from a_err
        except Exception as ex:
            raise Exception(ex) from ex

        if response is None:
            raise Exception("Invalid response from Livy server")
        try:
            self.session_id = str(response.json()["id"])
        except requests.exceptions.JSONDecodeError as json_err:
            raise Exception("Json decode error to get session_id") from json_err

    # Wait for the session to start
    self.wait_for_session_start()

    logger.debug("Livy session created successfully")
    logger.debug(f"Saving Livy session ID '{self.session_id}' to '{SESSION_ID_FILEPATH}' for reuse in future executions.")
    self.save_session_id()

    return self.session_id

def wait_for_session_start(self) -> None:
Expand All @@ -208,6 +212,7 @@ def wait_for_session_start(self) -> None:

def delete_session(self) -> None:

self.delete_session_file()
try:
# delete the session_id
_ = requests.delete(
Expand All @@ -216,13 +221,16 @@ def delete_session(self) -> None:
)
if _.status_code == 200:
logger.debug(f"Closed the livy session: {self.session_id}")

else:
response.raise_for_status()

except Exception as ex:
logger.error(f"Unable to close the livy session {self.session_id}, error: {ex}")

def is_valid_session(self) -> bool:
self.load_session_id()

if self.session_id is None:
logger.error("Session ID is None")
return False
Expand All @@ -235,6 +243,29 @@ def is_valid_session(self) -> bool:
invalid_states = ["dead", "shutting_down", "killed"]
return res["livyInfo"]["currentState"] not in invalid_states

def save_session_id(self) -> None:
    """Write ``self.session_id`` to ``SESSION_ID_FILEPATH``.

    The file is opened in write mode, so any previously stored ID is
    overwritten.
    """
    logger.debug(f"Saving Livy session {self.session_id} to {SESSION_ID_FILEPATH}, in order to reuse it in future execution")
    with open(SESSION_ID_FILEPATH, mode="w") as session_file:
        session_file.write(self.session_id)

def load_session_id(self) -> None:
    """Load the saved session ID from ``SESSION_ID_FILEPATH``.

    Sets ``self.session_id`` to the stripped file contents, or to ``None``
    when the file does not exist or holds only whitespace.
    """
    # Open directly instead of checking os.path.exists() first: the file can
    # disappear between the check and the open.
    try:
        with open(SESSION_ID_FILEPATH, "r") as f:
            stored_id = f.read().strip()
    except FileNotFoundError:
        logger.warning(f"{SESSION_ID_FILEPATH} does not exist. Cannot load session ID.")
        self.session_id = None
        return

    # An empty file would otherwise produce "", which is not None and would
    # therefore pass the ``self.session_id is None`` guard in is_valid_session.
    self.session_id = stored_id or None
    logger.debug(f"Loaded Livy session ID: {self.session_id}")

def delete_session_file(self) -> None:
    """Remove the saved session-ID file; a missing file is only logged."""
    try:
        os.remove(SESSION_ID_FILEPATH)
    except FileNotFoundError:
        logger.debug(f"{SESSION_ID_FILEPATH} not found. Skipping deletion.")
    else:
        # Reached only when the removal succeeded.
        logger.debug(f"Deleted {SESSION_ID_FILEPATH}")

# cursor object - wrapped for livy API
class LivyCursor:
Expand Down Expand Up @@ -529,8 +560,9 @@ def disconnect() -> None:
LivySessionManager.livy_global_session is not None
and LivySessionManager.livy_global_session.is_valid_session()
):
LivySessionManager.livy_global_session.delete_session()
LivySessionManager.livy_global_session.is_new_session_required = True
logger.error(f"Skip delete session")
# LivySessionManager.livy_global_session.delete_session()
# LivySessionManager.livy_global_session.is_new_session_required = True
else:
logger.debug("No session to disconnect")

Expand Down