Skip to content

Commit

Permalink
Merge pull request #36 from andrewm4894/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
andrewm4894 authored Jun 17, 2023
2 parents 4066cc6 + 713a9f0 commit 0283888
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 9 deletions.
2 changes: 1 addition & 1 deletion airflow_anomaly_detection/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

__version__ = "0.0.25"
__version__ = "0.0.30"

def get_provider_info():
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ alert_subject_emoji: '🔥' # emoji to use in alert emails.
alert_metric_last_updated_hours_ago_max: 24 # maximum number of hours since a metric was last updated for it to be included in alerting; metrics updated longer ago are ignored.
alert_metric_name_n_observations_min: 14 # min number of observations a metric must have to be considered for alerting.
alert_airflow_fail_on_alert: False # whether to fail the alerting dag if an alert is triggered.
log_scores: False # whether to log metrics scores to the airflow logs.
airflow_log_scores: False # whether to log metric scores to the Airflow logs.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ as features for training and scoring.
The output needs to be a table with the following columns:
- metric_timestamp
- metric_name
- metric_value
- x_... (features)
*/
Expand All @@ -33,6 +34,7 @@ metric_batch_preprocessed_data as
select
metric_timestamp,
metric_name,
metric_value,
metric_recency_rank,
-- calculate the number of hours since the metric was last updated
timestamp_diff(current_timestamp(), metric_timestamp_max, hour) as metric_last_updated_hours_ago,
Expand All @@ -47,6 +49,7 @@ from
select
metric_timestamp,
metric_name,
metric_value,
-- take difference between the metric value and the lagged metric value
{% for lag_n in range(params.preprocess_n_lags + 1) %}
x_metric_value_lag{{ lag_n }} - x_metric_value_lag{{ lag_n + 1 }} as x_metric_value_lag{{ lag_n }}_diff,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def execute(self, context: Any):
for metric_name in metrics_distinct:

# filter for metric_name
df_X = df_score[df_score['metric_name'] == metric_name]
df_X = df_score[df_score['metric_name'] == metric_name].reset_index()

# drop columns that are not needed for scoring
X = df_X[[col for col in df_X.columns if col.startswith('x_')]].values
Expand All @@ -76,12 +76,14 @@ def execute(self, context: Any):
df_scores_tmp = pd.DataFrame(scores, columns=['prob_normal','prob_anomaly'])
df_scores_tmp['metric_name'] = metric_name
df_scores_tmp['metric_timestamp'] = df_X['metric_timestamp'].values

if context['params'].get('log_scores', False):
self.log.info(df_scores_tmp.transpose().to_string())


if context['params'].get('airflow_log_scores', False):
self.log.info(
pd.concat([df_X, df_scores_tmp[['prob_normal','prob_anomaly']]],axis=1).transpose().to_string()
)

# append to df_scores
df_scores = df_scores.append(df_scores_tmp)
df_scores = pd.concat([df_scores, df_scores_tmp])

# check if table exists and if not create it and partition by metric_timestamp
if not bigquery_hook.table_exists(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "airflow-provider-anomaly-detection"
version = "0.0.25"
version = "0.0.30"
authors = [
{ name="andrewm4894", email="[email protected]" },
]
Expand Down

0 comments on commit 0283888

Please sign in to comment.