diff --git a/README.md b/README.md index 828a55d..735d427 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # About dq-suite-amsterdam This repository aims to be an easy-to-use wrapper for the data quality library [Great Expectations](https://github.com/great-expectations/great_expectations) (GX). All that is needed to get started is an in-memory Spark dataframe and a set of data quality rules - specified in a JSON file [of particular formatting](dq_rules_example.json). -By default, none of the validation results are written to Unity Catalog. Alternatively, one could allow for writing to a `data_quality` schema in UC, which one has to create once per catalog via [this notebook](scripts/data_quality_tables.sql). Additionally, users can choose to get notified via Slack or Microsoft Teams. +By default, all validation results are written to a `data_quality` schema in Unity Catalog, which one has to create once per catalog via [this notebook](scripts/data_quality_tables.sql); alternatively, writing to UC can be disabled. Additionally, users can choose to get notified via Slack or Microsoft Teams. 
diff --git a/pyproject.toml b/pyproject.toml index 7496bbd..87ee571 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "dq-suite-amsterdam" -version = "0.11.10" +version = "0.11.11" authors = [ { name="Arthur Kordes", email="a.kordes@amsterdam.nl" }, { name="Aysegul Cayir Aydar", email="a.cayiraydar@amsterdam.nl" }, diff --git a/src/dq_suite/other.py b/src/dq_suite/other.py index 70a62bc..5ec36a6 100644 --- a/src/dq_suite/other.py +++ b/src/dq_suite/other.py @@ -89,7 +89,9 @@ def get_all_table_name_to_column_names_mappings( return list_of_all_table_name_to_column_names_mappings -def export_schema_to_json_string(dataset: str, spark: SparkSession, *table: str) -> str: +def export_schema_to_json_string( + dataset: str, spark: SparkSession, *table: str +) -> str: """ Function exports a schema from Unity Catalog to be used by the Excel input form @@ -104,8 +106,8 @@ def export_schema_to_json_string(dataset: str, spark: SparkSession, *table: str) table_name_list = table else: table_name_list = get_table_name_list_from_unity_catalog( - dataset=dataset, spark=spark - ) + dataset=dataset, spark=spark + ) df_columns_tables = create_dataframe_containing_all_column_names_in_tables( table_name_list=table_name_list, spark=spark diff --git a/src/dq_suite/output_transformations.py b/src/dq_suite/output_transformations.py index a83e134..3e7ed6e 100644 --- a/src/dq_suite/output_transformations.py +++ b/src/dq_suite/output_transformations.py @@ -37,9 +37,11 @@ def convert_param_values_to_float(parameters): """ float_list = ["min_value", "max_value"] for k, v in parameters.items(): - if k in float_list: v = round(float(v), 1) + if k in float_list: + v = round(float(v), 1) parameters[k] = v + def create_empty_dataframe( spark_session: SparkSession, schema: StructType ) -> DataFrame: diff --git a/src/dq_suite/validation.py b/src/dq_suite/validation.py index d689623..15d91d9 100644 --- a/src/dq_suite/validation.py +++ 
b/src/dq_suite/validation.py @@ -1,5 +1,5 @@ import datetime -from typing import Dict, List, Literal +from typing import Dict, List, Literal, Tuple from great_expectations import ( Checkpoint, @@ -131,7 +131,7 @@ def _get_or_add_expectation_suite(self) -> ExpectationSuite: return suite @staticmethod - def _get_gx_expectation_object(validation_rule: Rule): + def _get_gx_expectation_object(validation_rule: Rule, table_name: str): """ From great_expectations.expectations.core, get the relevant class and instantiate an expectation object with the user-defined parameters @@ -140,6 +140,13 @@ def _get_gx_expectation_object(validation_rule: Rule): gx_expectation_class = getattr(gx_core, gx_expectation_name) gx_expectation_parameters: dict = validation_rule["parameters"] + column_name = gx_expectation_parameters.get("column", None) + + gx_expectation_parameters["meta"] = { + "table_name": table_name, + "column_name": column_name, + "expectation_name": gx_expectation_name, + } return gx_expectation_class(**gx_expectation_parameters) def add_expectations_to_suite(self, validation_rules_list: List[Rule]): @@ -148,7 +155,7 @@ def add_expectations_to_suite(self, validation_rules_list: List[Rule]): for validation_rule in validation_rules_list: gx_expectation_obj = self._get_gx_expectation_object( - validation_rule=validation_rule + validation_rule=validation_rule, table_name=self.table_name ) expectation_suite_obj.add_expectation(gx_expectation_obj) @@ -285,7 +292,9 @@ def validate( validation_runner_obj.create_validation_definition() print("***Starting validation run***") - return validation_runner_obj.run_validation(batch_parameters={"dataframe": df}) + return validation_runner_obj.run_validation( + batch_parameters={"dataframe": df} + ) def run_validation( @@ -300,7 +309,9 @@ def run_validation( ms_teams_webhook: str | None = None, notify_on: Literal["all", "success", "failure"] = "failure", write_results_to_unity_catalog: bool = True, -) -> None: # pragma: no cover - only GX 
functions + debug_mode: bool = False, +) -> bool | Tuple[bool, CheckpointResult]: # pragma: no cover - only GX + # functions """ Main function for users of dq_suite. @@ -320,7 +331,9 @@ def run_validation( an MS Teams notification will be sent notify_on: when to send notifications, can be equal to "all", "success" or "failure" - write_results_to_unity_catalog: toggle writing of results to UC + write_results_to_unity_catalog: by default (True) write results to UC + debug_mode: when False (the default), return only a boolean success flag; + when True, return a tuple of (success flag, CheckpointResult) """ validation_settings_obj = ValidationSettings( spark_session=spark_session, @@ -358,11 +371,16 @@ def run_validation( rules_dict=rules_dict, validation_settings_obj=validation_settings_obj, ) - validation_output = checkpoint_result.describe_dict() - run_time = datetime.datetime.now() # TODO: get from RunIdentifier object + + if debug_mode: # Don't write to UC in debug mode + return checkpoint_result.success, checkpoint_result # 3) ... 
and write results to unity catalog if write_results_to_unity_catalog: + validation_output = checkpoint_result.describe_dict() + run_time = ( + datetime.datetime.now() + ) # TODO: get from RunIdentifier object write_non_validation_tables( dq_rules_dict=validation_dict, validation_settings_obj=validation_settings_obj, @@ -375,3 +393,4 @@ def run_validation( unique_identifier=rules_dict["unique_identifier"], run_time=run_time, ) + return checkpoint_result.success diff --git a/tests/test_validation.py b/tests/test_validation.py index ff6e2ec..0fa7e05 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -99,7 +99,7 @@ def test_get_nonexistent_gx_expectation_object_raises_attribute_error( ) validation_runner_obj._get_gx_expectation_object( - validation_rule=the_rule + validation_rule=the_rule, table_name="the_table" ) def test_get_gx_expectation_object(self, validation_runner_obj): @@ -109,7 +109,7 @@ def test_get_gx_expectation_object(self, validation_runner_obj): ) the_expectation_object = ( validation_runner_obj._get_gx_expectation_object( - validation_rule=the_rule + validation_rule=the_rule, table_name="the_table" ) )