Skip to content

Commit

Permalink
Issue/86 return checkpoint result (#89)
Browse files Browse the repository at this point in the history
* Have run_validation return a CheckpointResult

* Move output for UC into conditional

* Explicitly return failed expectations via slack and ms teams

* Test adding some metadata

* Add table name and column name to metadata

* Add table name, column name and expectation name separately to metadata

* Simplify getting column name

* Fix broken tests

* Formatting

* Return boolean by default, include CheckpointResult in debug_mode

* Modify README.md

* Increase version number

* Modify docstring

* Ignore writing to UC in debug mode

---------

Co-authored-by: bas <[email protected]>
  • Loading branch information
SSchotten and bas authored Feb 17, 2025
1 parent 0c6b1f8 commit 040bb52
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# About dq-suite-amsterdam
This repository aims to be an easy-to-use wrapper for the data quality library [Great Expectations](https://github.com/great-expectations/great_expectations) (GX). All that is needed to get started is an in-memory Spark dataframe and a set of data quality rules - specified in a JSON file [of particular formatting](dq_rules_example.json).

By default, none of the validation results are written to Unity Catalog. Alternatively, one could allow for writing to a `data_quality` schema in UC, which one has to create once per catalog via [this notebook](scripts/data_quality_tables.sql). Additionally, users can choose to get notified via Slack or Microsoft Teams.
By default, all validation results are written to a `data_quality` schema in Unity Catalog (UC), which has to be created once per catalog via [this notebook](scripts/data_quality_tables.sql). Alternatively, writing to UC can be disabled by setting `write_results_to_unity_catalog=False`. Additionally, users can choose to get notified via Slack or Microsoft Teams.

<img src="docs/wip_computer.jpg" width="20%" height="auto">

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "dq-suite-amsterdam"
version = "0.11.10"
version = "0.11.11"
authors = [
{ name="Arthur Kordes", email="[email protected]" },
{ name="Aysegul Cayir Aydar", email="[email protected]" },
Expand Down
8 changes: 5 additions & 3 deletions src/dq_suite/other.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ def get_all_table_name_to_column_names_mappings(
return list_of_all_table_name_to_column_names_mappings


def export_schema_to_json_string(dataset: str, spark: SparkSession, *table: str) -> str:
def export_schema_to_json_string(
dataset: str, spark: SparkSession, *table: str
) -> str:
"""
Function exports a schema from Unity Catalog to be used by the Excel
input form
Expand All @@ -104,8 +106,8 @@ def export_schema_to_json_string(dataset: str, spark: SparkSession, *table: str)
table_name_list = table
else:
table_name_list = get_table_name_list_from_unity_catalog(
dataset=dataset, spark=spark
)
dataset=dataset, spark=spark
)

df_columns_tables = create_dataframe_containing_all_column_names_in_tables(
table_name_list=table_name_list, spark=spark
Expand Down
4 changes: 3 additions & 1 deletion src/dq_suite/output_transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ def convert_param_values_to_float(parameters):
"""
float_list = ["min_value", "max_value"]
for k, v in parameters.items():
if k in float_list: v = round(float(v), 1)
if k in float_list:
v = round(float(v), 1)
parameters[k] = v


def create_empty_dataframe(
spark_session: SparkSession, schema: StructType
) -> DataFrame:
Expand Down
35 changes: 27 additions & 8 deletions src/dq_suite/validation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import datetime
from typing import Dict, List, Literal
from typing import Dict, List, Literal, Tuple

from great_expectations import (
Checkpoint,
Expand Down Expand Up @@ -131,7 +131,7 @@ def _get_or_add_expectation_suite(self) -> ExpectationSuite:
return suite

@staticmethod
def _get_gx_expectation_object(validation_rule: Rule):
def _get_gx_expectation_object(validation_rule: Rule, table_name: str):
"""
From great_expectations.expectations.core, get the relevant class and
instantiate an expectation object with the user-defined parameters
Expand All @@ -140,6 +140,13 @@ def _get_gx_expectation_object(validation_rule: Rule):
gx_expectation_class = getattr(gx_core, gx_expectation_name)

gx_expectation_parameters: dict = validation_rule["parameters"]
column_name = gx_expectation_parameters.get("column", None)

gx_expectation_parameters["meta"] = {
"table_name": table_name,
"column_name": column_name,
"expectation_name": gx_expectation_name,
}
return gx_expectation_class(**gx_expectation_parameters)

def add_expectations_to_suite(self, validation_rules_list: List[Rule]):
Expand All @@ -148,7 +155,7 @@ def add_expectations_to_suite(self, validation_rules_list: List[Rule]):

for validation_rule in validation_rules_list:
gx_expectation_obj = self._get_gx_expectation_object(
validation_rule=validation_rule
validation_rule=validation_rule, table_name=self.table_name
)
expectation_suite_obj.add_expectation(gx_expectation_obj)

Expand Down Expand Up @@ -285,7 +292,9 @@ def validate(
validation_runner_obj.create_validation_definition()

print("***Starting validation run***")
return validation_runner_obj.run_validation(batch_parameters={"dataframe": df})
return validation_runner_obj.run_validation(
batch_parameters={"dataframe": df}
)


def run_validation(
Expand All @@ -300,7 +309,9 @@ def run_validation(
ms_teams_webhook: str | None = None,
notify_on: Literal["all", "success", "failure"] = "failure",
write_results_to_unity_catalog: bool = True,
) -> None: # pragma: no cover - only GX functions
debug_mode: bool = False,
) -> bool | Tuple[bool, CheckpointResult]: # pragma: no cover - only GX
# functions
"""
Main function for users of dq_suite.
Expand All @@ -320,7 +331,9 @@ def run_validation(
an MS Teams notification will be sent
notify_on: when to send notifications, can be equal to "all",
"success" or "failure"
write_results_to_unity_catalog: toggle writing of results to UC
write_results_to_unity_catalog: by default (True) write results to UC
debug_mode: if False (default), only a boolean success flag is returned;
if True, a tuple of the boolean flag and the CheckpointResult object is
returned and nothing is written to UC
"""
validation_settings_obj = ValidationSettings(
spark_session=spark_session,
Expand Down Expand Up @@ -358,11 +371,16 @@ def run_validation(
rules_dict=rules_dict,
validation_settings_obj=validation_settings_obj,
)
validation_output = checkpoint_result.describe_dict()
run_time = datetime.datetime.now() # TODO: get from RunIdentifier object

if debug_mode: # Don't write to UC in debug mode
return checkpoint_result.success, checkpoint_result

# 3) ... and write results to unity catalog
if write_results_to_unity_catalog:
validation_output = checkpoint_result.describe_dict()
run_time = (
datetime.datetime.now()
) # TODO: get from RunIdentifier object
write_non_validation_tables(
dq_rules_dict=validation_dict,
validation_settings_obj=validation_settings_obj,
Expand All @@ -375,3 +393,4 @@ def run_validation(
unique_identifier=rules_dict["unique_identifier"],
run_time=run_time,
)
return checkpoint_result.success
4 changes: 2 additions & 2 deletions tests/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def test_get_nonexistent_gx_expectation_object_raises_attribute_error(
)

validation_runner_obj._get_gx_expectation_object(
validation_rule=the_rule
validation_rule=the_rule, table_name="the_table"
)

def test_get_gx_expectation_object(self, validation_runner_obj):
Expand All @@ -109,7 +109,7 @@ def test_get_gx_expectation_object(self, validation_runner_obj):
)
the_expectation_object = (
validation_runner_obj._get_gx_expectation_object(
validation_rule=the_rule
validation_rule=the_rule, table_name="the_table"
)
)

Expand Down

0 comments on commit 040bb52

Please sign in to comment.