diff --git a/.astro-registry.yaml b/.astro-registry.yaml index 44d5729..32887ac 100644 --- a/.astro-registry.yaml +++ b/.astro-registry.yaml @@ -19,8 +19,8 @@ docs_url: https://github.com/andrewm4894/airflow-provider-anomaly-detection/blob # value should be "sample_provider.hooks.sample_hook.SampleHook". operators: - - module: airflow_anomaly_detection.operators.bigquery_metric_batch_ingest_operator.BigQueryMetricBatchIngestOperator - - module: airflow_anomaly_detection.operators.bigquery_metric_batch_train_operator.BigQueryMetricBatchTrainOperator - - module: airflow_anomaly_detection.operators.bigquery_metric_batch_train_operator.BigQueryMetricBatchScoreOperator - - module: airflow_anomaly_detection.operators.bigquery_metric_batch_alert_operator.BigQueryMetricBatchAlertOperator + - module: airflow_anomaly_detection.operators.bigquery.metric_batch_ingest_operator.BigQueryMetricBatchIngestOperator + - module: airflow_anomaly_detection.operators.bigquery.metric_batch_train_operator.BigQueryMetricBatchTrainOperator + - module: airflow_anomaly_detection.operators.bigquery.metric_batch_score_operator.BigQueryMetricBatchScoreOperator + - module: airflow_anomaly_detection.operators.bigquery.metric_batch_alert_operator.BigQueryMetricBatchAlertOperator - module: airflow_anomaly_detection.operators.metric_batch_alert_operator.MetricBatchEmailNotifyOperator diff --git a/airflow_anomaly_detection/__init__.py b/airflow_anomaly_detection/__init__.py index 9d575a0..07e8644 100644 --- a/airflow_anomaly_detection/__init__.py +++ b/airflow_anomaly_detection/__init__.py @@ -1,5 +1,5 @@ -__version__ = "0.0.22" +__version__ = "0.0.25" def get_provider_info(): return { diff --git a/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml b/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml index 2bee205..e952b01 100644 --- 
a/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml +++ b/airflow_anomaly_detection/example_dags/bigquery_anomaly_detection_dag/config/defaults.yaml @@ -36,3 +36,4 @@ alert_subject_emoji: '🔥' # emoji to use in alert emails. alert_metric_last_updated_hours_ago_max: 24 # max number of hours ago the metric was last updated to include in alerting, otherwise ignore. alert_metric_name_n_observations_min: 14 # min number of observations a metric must have to be considered for alerting. alert_airflow_fail_on_alert: False # whether to fail the alerting dag if an alert is triggered. +log_scores: False # whether to log metric scores to the airflow logs. diff --git a/airflow_anomaly_detection/operators/bigquery/metric_batch_ingest_operator.py b/airflow_anomaly_detection/operators/bigquery/metric_batch_ingest_operator.py index ed1dbae..410cbe7 100644 --- a/airflow_anomaly_detection/operators/bigquery/metric_batch_ingest_operator.py +++ b/airflow_anomaly_detection/operators/bigquery/metric_batch_ingest_operator.py @@ -22,6 +22,9 @@ def __init__(self, metric_batch_sql: str, **kwargs) -> None: self.metric_batch_sql = metric_batch_sql def execute(self, context: Any): + """ + Executes `insert_job` to generate metrics. 
+ """ gcp_destination_dataset = context['params'].get('gcp_destination_dataset', 'develop') gcp_ingest_destination_table_name = context['params'].get('gcp_ingest_destination_table_name', 'metrics') diff --git a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py index 5f3d399..6faa1c3 100644 --- a/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py +++ b/airflow_anomaly_detection/operators/bigquery/metric_batch_score_operator.py @@ -50,6 +50,7 @@ def execute(self, context: Any): # create empty dataframe to store scores df_scores = pd.DataFrame() + # process each metric_name for metric_name in metrics_distinct: # filter for metric_name @@ -76,6 +77,9 @@ def execute(self, context: Any): df_scores_tmp['metric_name'] = metric_name df_scores_tmp['metric_timestamp'] = df_X['metric_timestamp'].values + if context['params'].get('log_scores', False): + self.log.info(df_scores_tmp.transpose().to_string()) + # append to df_scores df_scores = df_scores.append(df_scores_tmp) diff --git a/pyproject.toml b/pyproject.toml index 110bfcf..6f4abaf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "airflow-provider-anomaly-detection" -version = "0.0.22" +version = "0.0.25" authors = [ { name="andrewm4894", email="andrewm4894@gmail.com" }, ]