Skip to content

Commit

Permalink
Merge pull request #37 from andrewm4894/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
andrewm4894 authored Jun 21, 2023
2 parents dba91e6 + 96666e4 commit 842fa60
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 15 deletions.
2 changes: 1 addition & 1 deletion airflow_anomaly_detection/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

__version__ = "0.0.30"
__version__ = "0.0.32"

def get_provider_info():
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ preprocess_n_lags: 2 # number of lags to create for each metric.
score_max_n: 1 # max number of records to score.
score_max_n_days_ago: 7 # max number of days to score.
score_metric_last_updated_hours_ago_max: 24 # max number of hours ago the metric was last updated to include in scoring, otherwise ignore.
score_fail_on_no_model: True # whether to fail the scoring dag if no model is found.
alert_smooth_n: 3 # number of records to smooth over when smoothing anomaly score prior to alerting.
alert_status_threshold: 0.9 # threshold for the smoothed anomaly score for alerting on.
alert_max_n: 72 # max number of records to alert on.
Expand All @@ -37,3 +38,4 @@ alert_metric_last_updated_hours_ago_max: 24 # max number of hours ago the metric
alert_metric_name_n_observations_min: 14 # min number of observations a metric must have to be considered for alerting.
alert_airflow_fail_on_alert: False # whether to fail the alerting dag if an alert is triggered.
airflow_log_scores: False # whether to log metrics scores to the airflow logs.
debug_alert_always: False # whether to always alert on a metric, regardless of the score.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ select
*,
-- generate the alert status flag
case
-- if debug_alert_always is true, then always generate an alert
when lower('{{ params.debug_alert_always }}') = 'true' then 1
-- alert if the smoothed probability of anomaly is greater than or equal to {{ params.alert_status_threshold }}
when prob_anomaly_smooth >= {{ params.alert_status_threshold }} then 1
else 0
end as alert_status
Expand Down Expand Up @@ -82,13 +85,11 @@ metrics_alert_window_flagged as
select
metric_name,
-- generate a flag indicating whether the metric has an alert in the last {{ params.alert_window_last_n }} steps
max(alert_status) as has_alert_in_window_last_n,
max(if(metric_recency_rank <= {{ params.alert_window_last_n }},alert_status,0)) as has_alert_in_window_last_n,
-- get the number of observations for each metric
sum(1) as metric_name_n_observations
from
metrics_alert_flagged
where
metric_recency_rank <= {{ params.alert_window_last_n }}
metrics_alert_flagged
group by 1
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from airflow.models.baseoperator import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.exceptions import AirflowException

import pickle
import tempfile
Expand Down Expand Up @@ -59,15 +60,23 @@ def execute(self, context: Any):
# drop columns that are not needed for scoring
X = df_X[[col for col in df_X.columns if col.startswith('x_')]].values

# load model from GCS
storage_client = storage.Client(credentials=gcp_credentials)
bucket = storage_client.get_bucket(gcs_model_bucket)
model_name = f'{metric_name}.pkl'
blob = bucket.blob(f'models/{model_name}')
with tempfile.NamedTemporaryFile() as temp:
blob.download_to_filename(temp.name)
with open(temp.name, 'rb') as f:
model = pickle.load(f)
try:
# load model from GCS
storage_client = storage.Client(credentials=gcp_credentials)
bucket = storage_client.get_bucket(gcs_model_bucket)
model_name = f'{metric_name}.pkl'
blob = bucket.blob(f'models/{model_name}')
with tempfile.NamedTemporaryFile() as temp:
blob.download_to_filename(temp.name)
with open(temp.name, 'rb') as f:
model = pickle.load(f)
except Exception as e:
self.log(f"An error occurred: {e}")
if context['params'].get('airflow_fail_on_model_load_error', True):
raise AirflowException(f"An error occurred: {e}")
else:
self.log.info(f"Skipping metric_name {metric_name}")
continue

# score
scores = model.predict_proba(X)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "airflow-provider-anomaly-detection"
version = "0.0.30"
version = "0.0.32"
authors = [
{ name="andrewm4894", email="[email protected]" },
]
Expand Down

0 comments on commit 842fa60

Please sign in to comment.