From 09c361971965a63acf62c226eb1a0e979b0c77c2 Mon Sep 17 00:00:00 2001 From: Alex <116374290+aaalexlit@users.noreply.github.com> Date: Thu, 14 Dec 2023 22:31:53 +0100 Subject: [PATCH] Refactor sequential_checks.py Reduce code duplication by making the test preset a parameter Note: There's no need to explicitly specify SequentialTaskRunner; the code is written in a way that will make it run sequentially. --- .../sequential_checks.py | 87 ++++++++----------- 1 file changed, 35 insertions(+), 52 deletions(-) diff --git a/module5/prefect_sequential_checks/sequential_checks.py b/module5/prefect_sequential_checks/sequential_checks.py index 86c4085..4c739ac 100644 --- a/module5/prefect_sequential_checks/sequential_checks.py +++ b/module5/prefect_sequential_checks/sequential_checks.py @@ -1,64 +1,47 @@ import os -import pandas as pd -from datetime import datetime, timedelta +import pandas as pd +from evidently.test_preset import (DataDriftTestPreset, DataQualityTestPreset, + DataStabilityTestPreset) +from evidently.test_preset.test_preset import TestPreset +from evidently.test_suite import TestSuite from sklearn import datasets from prefect import flow, task -from prefect.task_runners import SequentialTaskRunner - -from evidently.test_suite import TestSuite -from evidently.test_preset import DataDriftTestPreset, DataQualityTestPreset, DataStabilityTestPreset dir_path = "reports" + @task(name="LOAD_DATA", retries=3, retry_delay_seconds=15) def load_bank_data(): - bank_marketing = datasets.fetch_openml(name='bank-marketing', as_frame='auto') - bank_marketing_data = bank_marketing.frame - reference_data = bank_marketing_data[5000:7000] - prod_simulation_data = bank_marketing_data[7000:] - batch_size = 2000 - return reference_data, prod_simulation_data[:batch_size] - -@task(name="CHECK_QUALITY", retries=3, retry_delay_seconds=15) -def run_data_quality_test_suite(reference: pd.DataFrame, current: pd.DataFrame, ): - data_quality_suite = TestSuite(tests=[DataQualityTestPreset()]) - data_quality_suite.run(reference_data=reference, current_data=current) - if not data_quality_suite.as_dict()['summary']['all_passed']: - try: - os.mkdir(dir_path) - except OSError: - print("Creation of the directory {} failed".format(dir_path)) - data_quality_suite.save_html(os.path.join(dir_path, "data_quality_suite.html")) - -@task(name="CHECK_STABILITY", retries=3, retry_delay_seconds=15) -def run_data_stability_test_suite(reference: pd.DataFrame, current: pd.DataFrame): - data_stability_suite = TestSuite(tests=[DataQualityTestPreset()]) - data_stability_suite.run(reference_data=reference, current_data=current) - if not data_stability_suite.as_dict()['summary']['all_passed']: - try: - os.mkdir(dir_path) - except OSError: - print("Creation of the directory {} failed".format(dir_path)) - data_stability_suite.save_html(os.path.join(dir_path, "data_stability_suite.html")) - -@task(name="CHECK_DRIFT", retries=3, retry_delay_seconds=15) -def run_data_drift_test_suite(reference: pd.DataFrame, current: pd.DataFrame): - data_drift_suite = TestSuite(tests=[DataDriftTestPreset()]) - data_drift_suite.run(reference_data=reference, current_data=current) - if not data_drift_suite.as_dict()['summary']['all_passed']: - try: - os.mkdir(dir_path) - except OSError: - print("Creation of the directory {} failed".format(dir_path)) - data_drift_suite.save_html(os.path.join(dir_path, "data_drift_suite.html")) - -@flow(task_runner=SequentialTaskRunner) + bank_marketing = datasets.fetch_openml(name="bank-marketing", as_frame="auto") + bank_marketing_data = bank_marketing.frame + reference_data = bank_marketing_data[5000:7000] + prod_simulation_data = bank_marketing_data[7000:] + batch_size = 2000 + return reference_data, prod_simulation_data[:batch_size] + + +@task(task_run_name="Run_{test_preset.__name__}", retries=3, retry_delay_seconds=15) +def run_test_suite( + reference: pd.DataFrame, current: pd.DataFrame, test_preset: TestPreset +): + test_suite = TestSuite(tests=[test_preset()]) + test_suite.run(reference_data=reference, current_data=current) + if not test_suite.as_dict()["summary"]["all_passed"]: + try: + os.mkdir(dir_path) + except OSError: + print(f"Creation of the directory {dir_path} failed") + test_suite.save_html(os.path.join(dir_path, f"{test_preset.__name__[:-10]}Report.html")) + + +@flow() def checks_flow(): - reference, current = load_bank_data() - run_data_quality_test_suite(reference, current) - run_data_stability_test_suite(reference, current) - run_data_drift_test_suite(reference, current) + reference, current = load_bank_data() + run_test_suite(reference, current, DataDriftTestPreset) + run_test_suite(reference, current, DataQualityTestPreset) + run_test_suite(reference, current, DataStabilityTestPreset) + -checks_flow() \ No newline at end of file +checks_flow()