diff --git a/airflow_anomaly_detection/__init__.py b/airflow_anomaly_detection/__init__.py
index 07e8644..10cd5e2 100644
--- a/airflow_anomaly_detection/__init__.py
+++ b/airflow_anomaly_detection/__init__.py
@@ -1,5 +1,5 @@
-__version__ = "0.0.25"
+__version__ = "0.0.30"
 
 
 def get_provider_info():
     return {
diff --git a/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml b/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml
index e952b01..adb4d20 100644
--- a/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml
+++ b/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml
@@ -36,4 +36,4 @@ alert_subject_emoji: '🔥' # emoji to use in alert emails.
 alert_metric_last_updated_hours_ago_max: 24 # max number of hours ago the metric was last updated to include in alerting, otherwise ignore.
 alert_metric_name_n_observations_min: 14 # min number of observations a metric must have to be considered for alerting.
 alert_airflow_fail_on_alert: False # whether to fail the alerting dag if an alert is triggered.
-log_scores: False # whether to log metrics scores to the airflow logs.
+airflow_log_scores: False # whether to log metrics scores to the airflow logs.
diff --git a/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/sql/preprocess.sql b/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/sql/preprocess.sql
index 89d366e..9acfdb0 100644
--- a/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/sql/preprocess.sql
+++ b/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/sql/preprocess.sql
@@ -7,6 +7,7 @@ as features for training and scoring.
 The output needs to be a table with the following columns:
 - metric_timestamp
 - metric_name
+- metric_value
 - x_... (features)
 */
 
@@ -33,6 +34,7 @@ metric_batch_preprocessed_data as
 select
   metric_timestamp,
   metric_name,
+  metric_value,
   metric_recency_rank,
   -- calculate the number of hours since the metric was last updated
   timestamp_diff(current_timestamp(), metric_timestamp_max, hour) as metric_last_updated_hours_ago,
@@ -47,6 +49,7 @@ from
 select
   metric_timestamp,
   metric_name,
+  metric_value,
   -- take difference between the metric value and the lagged metric value
   {% for lag_n in range(params.preprocess_n_lags + 1) %}
   x_metric_value_lag{{ lag_n }} - x_metric_value_lag{{ lag_n + 1 }} as x_metric_value_lag{{ lag_n }}_diff,
diff --git a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py
index 6faa1c3..965aefb 100644
--- a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py
+++ b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py
@@ -54,7 +54,7 @@ def execute(self, context: Any):
         for metric_name in metrics_distinct:
 
             # filter for metric_name
-            df_X = df_score[df_score['metric_name'] == metric_name]
+            df_X = df_score[df_score['metric_name'] == metric_name].reset_index()
 
             # drop columns that are not needed for scoring
             X = df_X[[col for col in df_X.columns if col.startswith('x_')]].values
@@ -76,12 +76,14 @@
             df_scores_tmp = pd.DataFrame(scores, columns=['prob_normal','prob_anomaly'])
             df_scores_tmp['metric_name'] = metric_name
             df_scores_tmp['metric_timestamp'] = df_X['metric_timestamp'].values
-
-            if context['params'].get('log_scores', False):
-                self.log.info(df_scores_tmp.transpose().to_string())
-
+
+            if context['params'].get('airflow_log_scores', False):
+                self.log.info(
+                    pd.concat([df_X, df_scores_tmp[['prob_normal','prob_anomaly']]],axis=1).transpose().to_string()
+                )
+
             # append to df_scores
-            df_scores = df_scores.append(df_scores_tmp)
+            df_scores = pd.concat([df_scores, df_scores_tmp])
 
             # check if table exists and if not create it and partition by metric_timestamp
             if not bigquery_hook.table_exists(
diff --git a/pyproject.toml b/pyproject.toml
index 6f4abaf..c5e1831 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
 
 [project]
 name = "airflow-provider-anomaly-detection"
-version = "0.0.25"
+version = "0.0.30"
 authors = [
   { name="andrewm4894", email="andrewm4894@gmail.com" },
 ]