diff --git a/setup.cfg b/setup.cfg
index e6f2845..02cfbd2 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,6 @@
 [metadata]
 name = sas-airflow-provider
-version = 0.0.11
+version = 0.0.12
 author = SAS
 author_email = andrew.shakinovsky@sas.com
 description = Enables execution of Studio Flows and Jobs from Airflow
diff --git a/src/sas_airflow_provider/operators/sas_studio.py b/src/sas_airflow_provider/operators/sas_studio.py
index 3823b82..ccda2a8 100644
--- a/src/sas_airflow_provider/operators/sas_studio.py
+++ b/src/sas_airflow_provider/operators/sas_studio.py
@@ -24,7 +24,7 @@
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from sas_airflow_provider.hooks.sas import SasHook
-from sas_airflow_provider.util.util import dump_logs, create_or_connect_to_session, end_compute_session
+from sas_airflow_provider.util.util import dump_logs, stream_log, create_or_connect_to_session, end_compute_session
 
 # main API URI for Code Gen
 URI_BASE = "/studioDevelopment/code"
@@ -145,7 +145,8 @@ def __init__(
         self.on_success_callback=[on_success]
         self.on_failure_callback=[on_failure]
         self.on_retry_callback=[on_retry]
-
+
+
     def execute(self, context):
         if self.path_type not in ['compute', 'content', 'raw']:
             raise AirflowFailException("Path type is invalid. Valid values are 'compute', 'content' or 'raw'")
@@ -199,19 +200,14 @@ def execute(self, context):
         except Exception as e:
             raise AirflowException(f"SASStudioOperator error: {str(e)}")
 
-
-        # Kick off the JES job.
+        # Kick off the JES job and wait for its final state.
+        # _run_job_and_wait polls for new SAS log lines and
+        # streams them into the DAG log while the job runs.
         job, success = self._run_job_and_wait(jr, 10)
-        job_state = job["state"]
-
-        # display logs if needed
-        if self.exec_log is True:
-            # Safeguard if we are unable to retreive the log. We will NOT throw any exceptions
-            try:
-                dump_logs(self.connection, job)
-            except Exception as e:
-                self.log.info("Unable to retrieve log. Maybe the log is too large.")
-
+        job_state = "unknown"
+        if "state" in job:
+            job_state = job["state"]
+
         # set output variables
         if success and self.output_macro_var_prefix and self.compute_session_id:
             try:
@@ -219,7 +215,6 @@
             except Exception as e:
                 raise AirflowException(f"SASStudioOperator error: {str(e)}")
 
-
         # raise exception in Airflow if SAS Studio Flow ended execution with "failed" "canceled" or "timed out" state
         # support retry for 'failed' (typically there is an ERROR in the log) and 'timed out'
         # do NOT support retry for 'canceled' (typically the SAS Job called ABORT ABEND)
@@ -336,6 +331,7 @@ def _run_job_and_wait(self, job_request: dict, poll_interval: int) -> (dict, boo
         state = "unknown"
         countUnknownState = 0
         log_location = None
+        num_log_lines = 0
         while state in ["pending", "running"] or (state == "unknown" and ((countUnknownState*poll_interval) <= self.unknown_state_timeout)):
             time.sleep(poll_interval)
 
@@ -348,12 +344,16 @@ def _run_job_and_wait(self, job_request: dict, poll_interval: int) -> (dict, boo
                 else:
                     countUnknownState = 0
                     job = response.json()
-                    state = job["state"]
-                    if state == "running" and log_location == None:
-                        # Print the log location to the DAG-log, in case the user needs access to the SAS-log while it is running.
-                        if "logLocation" in job:
-                            log_location=job["logLocation"];
-                            self.log.info(f"While the job is running, the SAS-log formated as JSON can be found at URI: {log_location}?limit=9999999")
+                    if "state" in job:
+                        state = job["state"]
+                    else:
+                        self.log.info(f'Not able to determine state from {uri}. Will set state=unknown and continue checking...')
+                        state = "unknown"
+
+                    # Get the latest new log lines.
+                    if self.exec_log and state != "unknown":
+                        num_log_lines = stream_log(self.connection, job, num_log_lines)
+
             except Exception as e:
                 countUnknownState = countUnknownState + 1
                 self.log.info(f'HTTP Call failed with error "{e}". Will set state=unknown and continue checking...')
@@ -363,6 +363,10 @@
             # Raise AirflowFailException as we don't know if the job is still running
             raise AirflowFailException(f'Unable to retrieve state of job after trying {countUnknownState} times. Will mark task as failed. Please check the SAS-log.')
 
+        # Be sure to get the latest new log lines after the job has finished.
+        if self.exec_log:
+            num_log_lines = stream_log(self.connection, job, num_log_lines)
+
         self.log.info("Job request has completed execution with the status: " + str(state))
         success = True
         if state in ['failed', 'canceled', 'timed out']:
diff --git a/src/sas_airflow_provider/util/util.py b/src/sas_airflow_provider/util/util.py
index 0f7165a..072c9eb 100644
--- a/src/sas_airflow_provider/util/util.py
+++ b/src/sas_airflow_provider/util/util.py
@@ -19,6 +19,7 @@
 import json
 import requests
 import os
+import logging
 
 
 def get_folder_file_contents(session, path: str) -> str:
@@ -106,6 +107,41 @@ def get_uri(links, rel):
     return link["uri"]
 
 
+def stream_log(session, job, start, limit=99999) -> int:
+    current_line = start
+
+    log_uri = get_uri(job["links"], "log")
+    if not log_uri:
+        logging.getLogger(name=None).warning("Warning: failed to retrieve log URI from links")
+    else:
+        try:
+            # Note: if it is a files link (which it will be once the job has finished), it does not support
+            # the 'start' parameter, so we filter ourselves and ignore the 'limit' parameter in that case.
+            is_files_link = log_uri.startswith("/files/")
+
+            r = session.get(f"{log_uri}/content?start={start}&limit={limit}")
+            if r.ok:
+                # Parse the JSON log format and print each line
+                log_contents = r.text
+                jcontents = json.loads(log_contents)
+                lines = 0
+                for line in jcontents["items"]:
+                    if (is_files_link and lines >= start) or not is_files_link:
+                        t = line["type"]
+                        if t != "title":
+                            logging.getLogger(name=None).info(f'{line["line"]}')
+                            current_line = current_line + 1
+
+                    lines = lines + 1
+            else:
+                logging.getLogger(name=None).warning(f"Failed to retrieve part of the log from URI: {log_uri}/content")
+        except Exception as e:
+            logging.getLogger(name=None).warning("Unable to retrieve parts of the log.")
+
+    return current_line
+
+
+
 def dump_logs(session, job):
     """
     Get the log from the job object
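
A minimal sketch of how the incremental log streaming added above is intended to be used. It is illustrative only: poll_and_stream, job_uri and the bare session object are hypothetical stand-ins for the operator's hook connection and JES job URI; only stream_log and its returned line cursor come from util.py in this patch.

import time
from sas_airflow_provider.util.util import stream_log

def poll_and_stream(session, job_uri, poll_interval=10):
    # Cursor of SAS log lines already written to the Airflow task log.
    num_log_lines = 0
    state = "running"
    while state in ("pending", "running"):
        time.sleep(poll_interval)
        job = session.get(job_uri).json()
        state = job.get("state", "unknown")
        if state != "unknown":
            # stream_log emits only the lines past the cursor and returns the new cursor.
            num_log_lines = stream_log(session, job, num_log_lines)
    # Flush anything produced between the last poll and completion.
    stream_log(session, job, num_log_lines)
    return state

The operator performs the same loop inside _run_job_and_wait, so with exec_log=True the SAS log appears in the task log as the job runs instead of being dumped once at the end.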