From 6284454019563ec486d2114bb211ebb7ba27b3c2 Mon Sep 17 00:00:00 2001 From: Andrew Maguire Date: Sat, 17 Jun 2023 21:04:21 +0100 Subject: [PATCH 1/3] add ability to log scores to airflow --- airflow_anomaly_detection/__init__.py | 2 +- .../bigquery_anomaly_detection_dag/config/defaults.yaml | 1 + .../operators/bigquery/metric_batch_ingest_operator.py | 3 +++ .../operators/bigquery/metric_batch_score_operator.py | 4 ++++ pyproject.toml | 2 +- 5 files changed, 10 insertions(+), 2 deletions(-) diff --git a/airflow_anomaly_detection/__init__.py b/airflow_anomaly_detection/__init__.py index 9d575a0..9517191 100644 --- a/airflow_anomaly_detection/__init__.py +++ b/airflow_anomaly_detection/__init__.py @@ -1,5 +1,5 @@ -__version__ = "0.0.22" +__version__ = "0.0.23" def get_provider_info(): return { diff --git a/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml b/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml index 2bee205..e952b01 100644 --- a/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml +++ b/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml @@ -36,3 +36,4 @@ alert_subject_emoji: '🔥' # emoji to use in alert emails. alert_metric_last_updated_hours_ago_max: 24 # max number of hours ago the metric was last updated to include in alerting, otherwise ignore. alert_metric_name_n_observations_min: 14 # min number of observations a metric must have to be considered for alerting. alert_airflow_fail_on_alert: False # whether to fail the alerting dag if an alert is triggered. +log_scores: False # whether to log metrics scores to the airflow logs. 
diff --git a/airflow_anomaly_detection/operators/bigquery/metric_batch_ingest_operator.py b/airflow_anomaly_detection/operators/bigquery/metric_batch_ingest_operator.py index ed1dbae..410cbe7 100644 --- a/airflow_anomaly_detection/operators/bigquery/metric_batch_ingest_operator.py +++ b/airflow_anomaly_detection/operators/bigquery/metric_batch_ingest_operator.py @@ -22,6 +22,9 @@ def __init__(self, metric_batch_sql: str, **kwargs) -> None: self.metric_batch_sql = metric_batch_sql def execute(self, context: Any): + """ + Executes `insert_job` to generate metrics. + """ gcp_destination_dataset = context['params'].get('gcp_destination_dataset', 'develop') gcp_ingest_destination_table_name = context['params'].get('gcp_ingest_destination_table_name', 'metrics') diff --git a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py index 5f3d399..232f352 100644 --- a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py +++ b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py @@ -50,6 +50,7 @@ def execute(self, context: Any): # create empty dataframe to store scores df_scores = pd.DataFrame() + # process each metric_name for metric_name in metrics_distinct: # filter for metric_name @@ -76,6 +77,9 @@ def execute(self, context: Any): df_scores_tmp['metric_name'] = metric_name df_scores_tmp['metric_timestamp'] = df_X['metric_timestamp'].values + if context['params'].get('log_scores', False): + self.log.info(df_scores_tmp) + # append to df_scores df_scores = df_scores.append(df_scores_tmp) diff --git a/pyproject.toml b/pyproject.toml index 110bfcf..feb1bde 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "airflow-provider-anomaly-detection" -version = "0.0.22" +version = "0.0.23" authors = [ { name="andrewm4894", email="andrewm4894@gmail.com" }, ] From 
0533107b495860a3288e4f09fe806dc6c11b7f15 Mon Sep 17 00:00:00 2001 From: Andrew Maguire Date: Sat, 17 Jun 2023 21:17:50 +0100 Subject: [PATCH 2/3] use to_string() --- .astro-registry.yaml | 8 ++++---- airflow_anomaly_detection/__init__.py | 2 +- .../operators/bigquery/metric_batch_score_operator.py | 2 +- pyproject.toml | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.astro-registry.yaml b/.astro-registry.yaml index 44d5729..32887ac 100644 --- a/.astro-registry.yaml +++ b/.astro-registry.yaml @@ -19,8 +19,8 @@ docs_url: https://github.com/andrewm4894/airflow-provider-anomaly-detection/blob # value should be "sample_provider.hooks.sample_hook.SampleHook". operators: - - module: airflow_anomaly_detection.operators.bigquery_metric_batch_ingest_operator.BigQueryMetricBatchIngestOperator - - module: airflow_anomaly_detection.operators.bigquery_metric_batch_train_operator.BigQueryMetricBatchTrainOperator - - module: airflow_anomaly_detection.operators.bigquery_metric_batch_train_operator.BigQueryMetricBatchScoreOperator - - module: airflow_anomaly_detection.operators.bigquery_metric_batch_alert_operator.BigQueryMetricBatchAlertOperator + - module: airflow_anomaly_detection.operators.bigquery.metric_batch_ingest_operator.BigQueryMetricBatchIngestOperator + - module: airflow_anomaly_detection.operators.bigquery.metric_batch_train_operator.BigQueryMetricBatchTrainOperator + - module: airflow_anomaly_detection.operators.bigquery.metric_batch_score_operator.BigQueryMetricBatchScoreOperator + - module: airflow_anomaly_detection.operators.bigquery.metric_batch_alert_operator.BigQueryMetricBatchAlertOperator - module: airflow_anomaly_detection.operators.metric_batch_alert_operator.MetricBatchEmailNotifyOperator diff --git a/airflow_anomaly_detection/__init__.py b/airflow_anomaly_detection/__init__.py index 9517191..b124c8e 100644 --- a/airflow_anomaly_detection/__init__.py +++ b/airflow_anomaly_detection/__init__.py @@ -1,5 +1,5 @@ -__version__ = "0.0.23" 
+__version__ = "0.0.24" def get_provider_info(): return { diff --git a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py index 232f352..dcb8042 100644 --- a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py +++ b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py @@ -78,7 +78,7 @@ def execute(self, context: Any): df_scores_tmp['metric_timestamp'] = df_X['metric_timestamp'].values if context['params'].get('log_scores', False): - self.log.info(df_scores_tmp) + self.log.info(df_scores_tmp.to_string()) # append to df_scores df_scores = df_scores.append(df_scores_tmp) diff --git a/pyproject.toml b/pyproject.toml index feb1bde..2370d53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "airflow-provider-anomaly-detection" -version = "0.0.23" +version = "0.0.24" authors = [ { name="andrewm4894", email="andrewm4894@gmail.com" }, ] From 4cba5b93ad29935934cc7c30648c933ddb453726 Mon Sep 17 00:00:00 2001 From: Andrew Maguire Date: Sat, 17 Jun 2023 21:25:55 +0100 Subject: [PATCH 3/3] transpose scores when logging --- airflow_anomaly_detection/__init__.py | 2 +- .../operators/bigquery/metric_batch_score_operator.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow_anomaly_detection/__init__.py b/airflow_anomaly_detection/__init__.py index b124c8e..07e8644 100644 --- a/airflow_anomaly_detection/__init__.py +++ b/airflow_anomaly_detection/__init__.py @@ -1,5 +1,5 @@ -__version__ = "0.0.24" +__version__ = "0.0.25" def get_provider_info(): return { diff --git a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py index dcb8042..6faa1c3 100644 --- 
a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py +++ b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py @@ -78,7 +78,7 @@ def execute(self, context: Any): df_scores_tmp['metric_timestamp'] = df_X['metric_timestamp'].values if context['params'].get('log_scores', False): - self.log.info(df_scores_tmp.to_string()) + self.log.info(df_scores_tmp.transpose().to_string()) # append to df_scores df_scores = df_scores.append(df_scores_tmp) diff --git a/pyproject.toml b/pyproject.toml index 2370d53..6f4abaf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "airflow-provider-anomaly-detection" -version = "0.0.24" +version = "0.0.25" authors = [ { name="andrewm4894", email="andrewm4894@gmail.com" }, ]