From da9d46b6c75e72a07e80a41d022e69c66bec2580 Mon Sep 17 00:00:00 2001 From: Andrew Maguire Date: Sat, 17 Jun 2023 21:45:45 +0100 Subject: [PATCH 1/5] use concat() instead of append() --- airflow_anomaly_detection/__init__.py | 2 +- .../operators/bigquery/metric_batch_score_operator.py | 3 ++- pyproject.toml | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airflow_anomaly_detection/__init__.py b/airflow_anomaly_detection/__init__.py index 07e8644..12f8639 100644 --- a/airflow_anomaly_detection/__init__.py +++ b/airflow_anomaly_detection/__init__.py @@ -1,5 +1,5 @@ -__version__ = "0.0.25" +__version__ = "0.0.26" def get_provider_info(): return { diff --git a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py index 6faa1c3..51ff37e 100644 --- a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py +++ b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py @@ -78,10 +78,11 @@ def execute(self, context: Any): df_scores_tmp['metric_timestamp'] = df_X['metric_timestamp'].values if context['params'].get('log_scores', False): + self.log.info(df_X.transpose().to_string()) self.log.info(df_scores_tmp.transpose().to_string()) # append to df_scores - df_scores = df_scores.append(df_scores_tmp) + df_scores = pd.concat([df_scores, df_scores_tmp]) # check if table exists and if not create it and partition by metric_timestamp if not bigquery_hook.table_exists( diff --git a/pyproject.toml b/pyproject.toml index 6f4abaf..e788c1d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "airflow-provider-anomaly-detection" -version = "0.0.25" +version = "0.0.26" authors = [ { name="andrewm4894", email="andrewm4894@gmail.com" }, ] From 77d217348999d8d80db0f76b049de0431719d9dc Mon Sep 17 00:00:00 2001 From: Andrew Maguire Date: Sat, 17 Jun 2023 21:49:32 +0100 Subject: [PATCH 2/5] naming cleanup --- .../bigquery_anomaly_detection_dag/config/defaults.yaml | 2 +- .../operators/bigquery/metric_batch_score_operator.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml b/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml index e952b01..adb4d20 100644 --- a/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml +++ b/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml @@ -36,4 +36,4 @@ alert_subject_emoji: '🔥' # emoji to use in alert emails. alert_metric_last_updated_hours_ago_max: 24 # max number of hours ago the metric was last updated to include in alerting, otherwise ignore. alert_metric_name_n_observations_min: 14 # min number of observations a metric must have to be considered for alerting. alert_airflow_fail_on_alert: False # whether to fail the alerting dag if an alert is triggered. -log_scores: False # whether to log metrics scores to the airflow logs. +airflow_log_scores: False # whether to log metrics scores to the airflow logs. diff --git a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py index 51ff37e..8b9b3b6 100644 --- a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py +++ b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py @@ -77,7 +77,7 @@ def execute(self, context: Any): df_scores_tmp['metric_name'] = metric_name df_scores_tmp['metric_timestamp'] = df_X['metric_timestamp'].values - if context['params'].get('log_scores', False): + if context['params'].get('airflow_log_scores', False): self.log.info(df_X.transpose().to_string()) self.log.info(df_scores_tmp.transpose().to_string()) From c6ea7379367f569e9b5419483466cb11f62add8e Mon Sep 17 00:00:00 2001 From: Andrew Maguire Date: Sat, 17 Jun 2023 22:22:38 +0100 Subject: [PATCH 3/5] clean up logging of scores --- airflow_anomaly_detection/__init__.py | 2 +- .../bigquery_anomaly_detection_dag/sql/preprocess.sql | 3 +++ .../operators/bigquery/metric_batch_score_operator.py | 11 +++++++---- pyproject.toml | 2 +- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/airflow_anomaly_detection/__init__.py b/airflow_anomaly_detection/__init__.py index 12f8639..b11a412 100644 --- a/airflow_anomaly_detection/__init__.py +++ b/airflow_anomaly_detection/__init__.py @@ -1,5 +1,5 @@ -__version__ = "0.0.26" +__version__ = "0.0.27" def get_provider_info(): return { diff --git a/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/sql/preprocess.sql b/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/sql/preprocess.sql index 89d366e..9acfdb0 100644 --- a/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/sql/preprocess.sql +++ b/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/sql/preprocess.sql @@ -7,6 +7,7 @@ as features for training and scoring. The output needs to be a table with the following columns: - metric_timestamp - metric_name +- metric_value - x_... (features) */ @@ -33,6 +34,7 @@ metric_batch_preprocessed_data as select metric_timestamp, metric_name, + metric_value, metric_recency_rank, -- calculate the number of hours since the metric was last updated timestamp_diff(current_timestamp(), metric_timestamp_max, hour) as metric_last_updated_hours_ago, @@ -47,6 +49,7 @@ from select metric_timestamp, metric_name, + metric_value, -- take difference between the metric value and the lagged metric value {% for lag_n in range(params.preprocess_n_lags + 1) %} x_metric_value_lag{{ lag_n }} - x_metric_value_lag{{ lag_n + 1 }} as x_metric_value_lag{{ lag_n }}_diff, diff --git a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py index 8b9b3b6..510c03c 100644 --- a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py +++ b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py @@ -76,11 +76,14 @@ def execute(self, context: Any): df_scores_tmp = pd.DataFrame(scores, columns=['prob_normal','prob_anomaly']) df_scores_tmp['metric_name'] = metric_name df_scores_tmp['metric_timestamp'] = df_X['metric_timestamp'].values - + if context['params'].get('airflow_log_scores', False): - self.log.info(df_X.transpose().to_string()) - self.log.info(df_scores_tmp.transpose().to_string()) - + self.log.info( + pd.concat( + [df_X.transpose(), df_scores_tmp[['prob_normal','prob_anomaly']].transpose()] + ).to_string() + ) + # append to df_scores df_scores = pd.concat([df_scores, df_scores_tmp]) diff --git a/pyproject.toml b/pyproject.toml index e788c1d..50d4d55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "airflow-provider-anomaly-detection" -version = "0.0.26" +version = "0.0.27" authors = [ { name="andrewm4894", email="andrewm4894@gmail.com" }, ] From c666d96fe87800e346ddf694013711b8bf92411a Mon Sep 17 00:00:00 2001 From: Andrew Maguire Date: Sat, 17 Jun 2023 23:11:32 +0100 Subject: [PATCH 4/5] clean up score logging --- airflow_anomaly_detection/__init__.py | 2 +- .../operators/bigquery/metric_batch_score_operator.py | 5 ++++- pyproject.toml | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/airflow_anomaly_detection/__init__.py b/airflow_anomaly_detection/__init__.py index b11a412..1a84e46 100644 --- a/airflow_anomaly_detection/__init__.py +++ b/airflow_anomaly_detection/__init__.py @@ -1,5 +1,5 @@ -__version__ = "0.0.27" +__version__ = "0.0.28" def get_provider_info(): return { diff --git a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py index 510c03c..a6b9e00 100644 --- a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py +++ b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py @@ -80,7 +80,10 @@ def execute(self, context: Any): if context['params'].get('airflow_log_scores', False): self.log.info( pd.concat( - [df_X.transpose(), df_scores_tmp[['prob_normal','prob_anomaly']].transpose()] + [ + df_X.transpose().rename(columns={"0":"x"}, inplace=True), + df_scores_tmp[['prob_normal','prob_anomaly']].transpose().rename(columns={"0":"x"}, inplace=True) + ] ).to_string() ) diff --git a/pyproject.toml b/pyproject.toml index 50d4d55..43bd53f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "airflow-provider-anomaly-detection" -version = "0.0.27" +version = "0.0.28" authors = [ { name="andrewm4894", email="andrewm4894@gmail.com" }, ] From 713a9f0f2198ce4c29108c6e3d2a703beb25598a Mon Sep 17 00:00:00 2001 From: Andrew Maguire Date: Sat, 17 Jun 2023 23:50:24 +0100 Subject: [PATCH 5/5] clean up --- airflow_anomaly_detection/__init__.py | 2 +- .../operators/bigquery/metric_batch_score_operator.py | 9 ++------- pyproject.toml | 2 +- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/airflow_anomaly_detection/__init__.py b/airflow_anomaly_detection/__init__.py index 1a84e46..10cd5e2 100644 --- a/airflow_anomaly_detection/__init__.py +++ b/airflow_anomaly_detection/__init__.py @@ -1,5 +1,5 @@ -__version__ = "0.0.28" +__version__ = "0.0.30" def get_provider_info(): return { diff --git a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py index a6b9e00..965aefb 100644 --- a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py +++ b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py @@ -54,7 +54,7 @@ def execute(self, context: Any): for metric_name in metrics_distinct: # filter for metric_name - df_X = df_score[df_score['metric_name'] == metric_name] + df_X = df_score[df_score['metric_name'] == metric_name].reset_index() # drop columns that are not needed for scoring X = df_X[[col for col in df_X.columns if col.startswith('x_')]].values @@ -79,12 +79,7 @@ def execute(self, context: Any): if context['params'].get('airflow_log_scores', False): self.log.info( - pd.concat( - [ - df_X.transpose().rename(columns={"0":"x"}, inplace=True), - df_scores_tmp[['prob_normal','prob_anomaly']].transpose().rename(columns={"0":"x"}, inplace=True) - ] - ).to_string() + pd.concat([df_X, df_scores_tmp[['prob_normal','prob_anomaly']]],axis=1).transpose().to_string() ) # append to df_scores diff --git a/pyproject.toml b/pyproject.toml index 43bd53f..c5e1831 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "airflow-provider-anomaly-detection" -version = "0.0.28" +version = "0.0.30" authors = [ { name="andrewm4894", email="andrewm4894@gmail.com" }, ]