diff --git a/dq_rules_example.json b/dq_rules_example.json index 32ef507..1350aef 100644 --- a/dq_rules_example.json +++ b/dq_rules_example.json @@ -9,23 +9,19 @@ "table_name": "well", "rules": [ { - "rule_name": "expect_column_values_to_be_between", - "parameters": [ - { + "rule_name": "ExpectColumnValuesToBeBetween", + "parameters": { "column": "latitude", "min_value": 6, "max_value": 10000 } - ] }, { - "rule_name": "expect_column_distinct_values_to_equal_set", - "parameters": [ - { + "rule_name": "ExpectColumnDistinctValuesToEqualSet", + "parameters": { "column": "latitude", "value_set": [1, 2] } - ] } ] }, @@ -34,12 +30,10 @@ "table_name": "container", "rules": [ { - "rule_name": "expect_column_values_to_not_be_null", - "parameters": [ - { + "rule_name": "ExpectColumnValuesToNotBeNull", + "parameters": { "column": "containertype" } - ] } ] }, @@ -62,44 +56,39 @@ "table_name": "containertype", "rules": [ { - "rule_name": "expect_table_row_count_to_be_between", - "parameters": [ - { + "rule_name": "ExpectTableRowCountToBeBetween", + "parameters": { "min_value": 1, "max_value": 1000 } - ] }, { - "rule_name": "expect_column_values_to_not_be_null", - "parameters": [ - { + "rule_name": "ExpectColumnValuesToNotBeNull", + "parameters": { "column": "weight", "row_condition": "col(\\\"volume\\\") < 5" - }, - { + } + }, + { + "rule_name": "ExpectColumnValuesToNotBeNull", + "parameters": { "column": "volume" } - ] }, { - "rule_name": "expect_column_values_to_be_between", - "parameters": [ - { + "rule_name": "ExpectColumnValuesToBeBetween", + "parameters": { "column": "volume", "min_value": 0, "max_value": 10000 } - ] }, { - "rule_name": "expect_column_values_to_be_of_type", - "parameters": [ - { + "rule_name": "ExpectColumnValuesToBeOfType", + "parameters": { "column": "volume", "type_": "DoubleType" } - ] } ] } diff --git a/pyproject.toml b/pyproject.toml index d63eb12..e072de2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,11 +15,13 @@ description = "Wrapper for Great Expectations to fit the requirements of the Gem readme = "README.md" requires-python = ">=3.10" dependencies = [ - "great_expectations==1.0.3", - "pandas==2.1.4", - "pyspark==3.5.2", - "pyhumps==3.8.0", - "pyyaml==6.0.2", + "great_expectations==1.0.3", + "pandas==2.1.4", + "pyspark==3.5.2", + "pyhumps==3.8.0", + "pyyaml==6.0.2", + "delta-spark~=3.2.0", + "validators==0.34.0", ] @@ -32,7 +34,6 @@ dev = [ 'pylint ~= 2.16', 'autoflake ~= 2.0.1', 'coverage ~= 7.6.1', - 'delta-spark ~= 3.2.0', ] [tool.isort] diff --git a/src/dq_suite/common.py b/src/dq_suite/common.py index 012319d..7f4cf11 100644 --- a/src/dq_suite/common.py +++ b/src/dq_suite/common.py @@ -1,12 +1,9 @@ from dataclasses import dataclass from typing import Any, Dict, List, Literal -import yaml +from delta.tables import * from great_expectations import ExpectationSuite, get_context -from great_expectations.data_context import ( - AbstractDataContext, - EphemeralDataContext, -) +from great_expectations.data_context import AbstractDataContext from great_expectations.data_context.types.base import ( DataContextConfig, InMemoryStoreBackendDefaults, @@ -15,7 +12,6 @@ from pyspark.sql import DataFrame, SparkSession from pyspark.sql.functions import col from pyspark.sql.types import StructType -from delta.tables import * @dataclass() @@ -26,19 +22,17 @@ class Rule: """ rule_name: str # Name of the GX expectation - parameters: List[Dict[str, Any]] # Collection of parameters required for + parameters: Dict[str, Any] # Collection of parameters required for # evaluating 
the expectation def __post_init__(self): if not isinstance(self.rule_name, str): raise TypeError("'rule_name' should be of type str") - if not isinstance(self.parameters, list): - raise TypeError( - "'parameters' should be of type List[Dict[str, Any]]" - ) + if not isinstance(self.parameters, dict): + raise TypeError("'parameters' should be of type Dict[str, Any]") - def __getitem__(self, key) -> str | List[Dict[str, Any]] | None: + def __getitem__(self, key) -> str | Dict[str, Any] | None: if key == "rule_name": return self.rule_name elif key == "parameters": @@ -98,7 +92,7 @@ def __post_init__(self): if not isinstance(self.layer, str): raise TypeError("'layer' should be of type str") - def __getitem__(self, key) -> str | RulesList | None: + def __getitem__(self, key) -> str | None: if key == "name": return self.name elif key == "layer": @@ -126,7 +120,7 @@ def __post_init__(self): if not isinstance(self.tables, list): raise TypeError("'tables' should be RulesDictList") - def __getitem__(self, key) -> str | RulesDictList | None: + def __getitem__(self, key) -> DatasetDict | RulesDictList | None: if key == "dataset": return self.dataset elif key == "tables": @@ -203,23 +197,21 @@ def merge_df_with_unity_table( full_table_name = get_full_table_name( catalog_name=catalog_name, table_name=table_name ) - df_alias = f'{table_name}_df' - regelTabel = DeltaTable.forName(spark_session, full_table_name) - regelTabel.alias(table_name) \ - .merge( - df.alias(df_alias), - f'{table_name}.{table_merge_id} = {df_alias}.{df_merge_id}' - ) \ - .whenMatchedUpdate(set = merge_dict) \ - .whenNotMatchedInsert(values = merge_dict) \ - .execute() + df_alias = f"{table_name}_df" + regel_tabel = DeltaTable.forName(spark_session, full_table_name) + regel_tabel.alias(table_name).merge( + df.alias(df_alias), + f"{table_name}.{table_merge_id} = {df_alias}.{df_merge_id}", + ).whenMatchedUpdate(set=merge_dict).whenNotMatchedInsert( + values=merge_dict + ).execute() def get_data_context() -> AbstractDataContext: # pragma: no cover - part of GX return get_context( project_config=DataContextConfig( store_backend_defaults=InMemoryStoreBackendDefaults(), - analytics_enabled=False + analytics_enabled=False, ) ) diff --git a/src/dq_suite/df_checker.py b/src/dq_suite/df_checker.py index d6c9fdc..7a66455 100644 --- a/src/dq_suite/df_checker.py +++ b/src/dq_suite/df_checker.py @@ -1,7 +1,6 @@ from typing import List import great_expectations -import humps from great_expectations import Checkpoint, ValidationDefinition from great_expectations.checkpoint.actions import CheckpointAction from great_expectations.checkpoint.checkpoint import CheckpointResult @@ -146,25 +145,14 @@ def create_and_configure_expectations( for validation_rule in validation_rules_list: # Get the name of expectation as defined by GX gx_expectation_name = validation_rule["rule_name"] + gx_expectation_parameters: dict = validation_rule["parameters"] # Get the actual expectation as defined by GX gx_expectation = getattr( great_expectations.expectations.core, - humps.pascalize(gx_expectation_name), + gx_expectation_name, ) - # Issue 50 - # TODO: drop pascalization, and require this as input check - # when ingesting JSON? Could be done via humps.is_pascalcase() - - for validation_parameter_dict in validation_rule["parameters"]: - kwargs = {} - # Issue 51 - # TODO/check: is this loop really necessary? 
Intuitively, I added - # the same expectation for each column - I didn't consider using - # the same expectation with different parameters - for par_name, par_value in validation_parameter_dict.items(): - kwargs[par_name] = par_value - suite.add_expectation(gx_expectation(**kwargs)) + suite.add_expectation(gx_expectation(**gx_expectation_parameters)) def validate( @@ -215,12 +203,11 @@ def run( table_name=validation_settings_obj.table_name, ) if rules_dict is None: - print( + raise ValueError( f"No validations found for table_name " f"'{validation_settings_obj.table_name}' in JSON file at '" f"{json_path}'." ) - return # 2) perform the validation on the dataframe checkpoint_result = validate( diff --git a/src/dq_suite/input_helpers.py b/src/dq_suite/input_helpers.py index 6385a99..c076530 100644 --- a/src/dq_suite/input_helpers.py +++ b/src/dq_suite/input_helpers.py @@ -1,7 +1,9 @@ import json from typing import Any, Dict +import humps import requests +import validators from pyspark.sql import SparkSession from pyspark.sql.functions import col @@ -70,7 +72,7 @@ def fetch_schema_from_github( schema_dict = {} for table in dq_rules_dict["tables"]: if "validate_table_schema_url" in table: - url = table["validate_table_schema_url"] # TODO: validate URL + url = table["validate_table_schema_url"] r = requests.get(url) schema = json.loads(r.text) schema_dict[table["table_name"]] = schema @@ -123,7 +125,7 @@ def generate_dq_rules_from_schema( rule_type = column_type.capitalize() + "Type" rule = Rule( rule_name="expect_column_values_to_be_of_type", - parameters=[{"column": column, "type_": rule_type}], + parameters={"column": column, "type_": rule_type}, ) table["rules"].append(rule) @@ -184,7 +186,7 @@ def read_data_quality_rules_from_json(file_path: str) -> str: return dq_rules_json_string -def validate_and_load_data_quality_rules( +def load_data_quality_rules_from_json_string( dq_rules_json_string: str, ) -> Any | None: """ @@ -224,8 +226,10 @@ def data_quality_rules_json_string_to_dict( :param json_string: A JSON string with all DQ configuration. :return: rule_json: A dictionary with all DQ configuration. """ - dq_rules_dict: DataQualityRulesDict = validate_and_load_data_quality_rules( - dq_rules_json_string=json_string + dq_rules_dict: DataQualityRulesDict = ( + load_data_quality_rules_from_json_string( + dq_rules_json_string=json_string + ) ) for table in dq_rules_dict["tables"]: @@ -245,7 +249,136 @@ def get_data_quality_rules_dict(file_path: str) -> DataQualityRulesDict: dq_rules_json_string = read_data_quality_rules_from_json( file_path=file_path ) - data_quality_rules_dict = validate_and_load_data_quality_rules( + data_quality_rules_dict = load_data_quality_rules_from_json_string( dq_rules_json_string=dq_rules_json_string ) + validate_data_quality_rules_dict( + data_quality_rules_dict=data_quality_rules_dict + ) + return data_quality_rules_dict + + +def validate_data_quality_rules_dict( + data_quality_rules_dict: Any | None, +) -> None: + """ + Validates the format of the input JSON file containing all GX validations + for a particular dataset. 
+ """ + # data_quality_rules_dict should (obviously) be a dict + if not isinstance(data_quality_rules_dict, dict): + raise TypeError("'data_quality_rules_dict' should be of type 'dict'") + + validate_dataset(data_quality_rules_dict=data_quality_rules_dict) + validate_tables(data_quality_rules_dict=data_quality_rules_dict) + + for rules_dict in data_quality_rules_dict["tables"]: + validate_rules_dict(rules_dict=rules_dict) + + if len(rules_dict["rules"]) == 0: + validate_table_schema(rules_dict=rules_dict) + else: + for rule in rules_dict["rules"]: + validate_rule(rule=rule) + + +def validate_dataset(data_quality_rules_dict: dict) -> None: + if "dataset" not in data_quality_rules_dict: + raise KeyError("No 'dataset' key found in data_quality_rules_dict") + + if not isinstance(data_quality_rules_dict["dataset"], dict): + raise TypeError("'dataset' should be of type 'dict'") + + if "name" not in data_quality_rules_dict["dataset"]: + raise KeyError( + "No 'name' key found in data_quality_rules_dict['dataset']" + ) + if "layer" not in data_quality_rules_dict["dataset"]: + raise KeyError( + "No 'layer' key found in data_quality_rules_dict['dataset']" + ) + + if not isinstance(data_quality_rules_dict["dataset"]["name"], str): + raise TypeError("Dataset 'name' should be of type 'str'") + if not isinstance(data_quality_rules_dict["dataset"]["layer"], str): + raise TypeError("Dataset 'layer' should be of type 'str'") + + +def validate_tables(data_quality_rules_dict: Any) -> None: + if "tables" not in data_quality_rules_dict: + raise KeyError("No 'tables' key found in data_quality_rules_dict") + + if not isinstance(data_quality_rules_dict["tables"], list): + raise TypeError("'tables' should be of type 'list'") + + +def validate_rules_dict(rules_dict: dict) -> None: + # All RulesDict objects in 'tables' should... + + # ... be a dict + if not isinstance(rules_dict, dict): + raise TypeError(f"{rules_dict} should be of type 'dict'") + + # ... contain 'unique_identifier', 'table_name' and 'rules' keys + if "unique_identifier" not in rules_dict: + raise KeyError(f"No 'unique_identifier' key found in {rules_dict}") + if "table_name" not in rules_dict: + raise KeyError(f"No 'table_name' key found in {rules_dict}") + if "rules" not in rules_dict: + raise KeyError(f"No 'rules' key found in {rules_dict}") + + if not isinstance(rules_dict["rules"], list): + raise TypeError(f"In {rules_dict}, 'rules' should be of type 'list'") + + +def validate_table_schema(rules_dict: dict) -> None: + if "validate_table_schema" not in rules_dict: + raise KeyError(f"No 'validate_table_schema' key found in {rules_dict}") + if "validate_table_schema_url" not in rules_dict: + raise KeyError( + f"No 'validate_table_schema_url' key found in {rules_dict}" + ) + if not validators.url(rules_dict["validate_table_schema_url"]): + raise ValueError( + f"The url specified in {rules_dict['validate_table_schema_url']} " + f"is invalid" + ) + + +def validate_rule(rule: dict) -> None: + # All Rule objects should... + + # ... be a dict + if not isinstance(rule, dict): + raise TypeError(f"{rule} should be of type 'dict'") + + # ... contain 'rule_name' and 'parameters' as keys + if "rule_name" not in rule: + raise KeyError(f"No 'rule_name' key found in {rule}") + if "parameters" not in rule: + raise KeyError(f"No 'parameters' key found in {rule}") + + # ... contain string-typed expectation names... + if not isinstance(rule["rule_name"], str): + raise TypeError(f"In {rule}, 'rule_name' should be of type 'str'") + # ... 
as defined in GX (which switched to Pascal case in v1.0) + if not humps.is_pascalcase(rule["rule_name"]): + raise ValueError( + f"The expectation name" + f" '{rule['rule_name']}' " + f"should " + f"be written in Pascal case, " + f"e.g. 'WrittenLikeThis' instead of " + f"'written_like_this' " + f"(hint: " + f"'{humps.pascalize(rule['rule_name'])}')" + ) + + # 'parameters' should NOT be a list (as used in previous + # versions), but a dict. The consequence of this is that the + # same expectation should be repeated multiple times, with a + # single dict of parameters each - decreasing the complexity + # of the dataclass, but adding 'repeated' expectations + if not isinstance(rule["parameters"], dict): + raise TypeError(f"In {rule}, 'parameters' should be of type 'dict'") diff --git a/src/dq_suite/output_transformations.py b/src/dq_suite/output_transformations.py index 02bc2ed..f6c1a1e 100644 --- a/src/dq_suite/output_transformations.py +++ b/src/dq_suite/output_transformations.py @@ -10,17 +10,17 @@ DataQualityRulesDict, ValidationSettings, is_empty_dataframe, - write_to_unity_catalog, merge_df_with_unity_table, + write_to_unity_catalog, ) +from .schemas.afwijking import SCHEMA as AFWIJKING_SCHEMA +from .schemas.bronattribuut import SCHEMA as BRONATTRIBUUT_SCHEMA from .schemas.brondataset import SCHEMA as BRONDATASET_SCHEMA from .schemas.brontabel import SCHEMA as BRONTABEL_SCHEMA -from .schemas.bronattribuut import SCHEMA as BRONATTRIBUUT_SCHEMA +from .schemas.pre_afwijking import SCHEMA as PRE_AFWIJKING_SCHEMA +from .schemas.pre_validatie import SCHEMA as PRE_VALIDATIE_SCHEMA from .schemas.regel import SCHEMA as REGEL_SCHEMA from .schemas.validatie import SCHEMA as VALIDATIE_SCHEMA -from .schemas.pre_validatie import SCHEMA as PRE_VALIDATIE_SCHEMA -from .schemas.afwijking import SCHEMA as AFWIJKING_SCHEMA -from .schemas.pre_afwijking import SCHEMA as PRE_AFWIJKING_SCHEMA def create_empty_dataframe( @@ -45,9 +45,12 @@ def construct_regel_id( df: DataFrame, output_columns_list: list[str], ) -> DataFrame: - df_with_id = df.withColumn("regelId", xxhash64(col("regelNaam"), col("regelParameters"), col("bronTabelId"))) + df_with_id = df.withColumn( + "regelId", + xxhash64(col("regelNaam"), col("regelParameters"), col("bronTabelId")), + ) return df_with_id.select(*output_columns_list) - + def create_parameter_list_from_results(result: dict) -> list[dict]: parameters = result["kwargs"] @@ -62,11 +65,15 @@ def get_target_attr_for_rule(result: dict) -> str: return result["kwargs"].get("column_list") -def get_unique_deviating_values(deviating_attribute_value: list[str]) -> set[str]: +def get_unique_deviating_values( + deviating_attribute_value: list[str], +) -> set[str]: unique_deviating_values = set() for waarde in deviating_attribute_value: if isinstance(waarde, dict): - waarde = tuple(waarde.items()) #transform because a dict cannot be added to a set + waarde = tuple( + waarde.items() + ) # transform because a dict cannot be added to a set unique_deviating_values.add(waarde) return unique_deviating_values @@ -79,8 +86,9 @@ def filter_df_based_on_deviating_values( if value is None: return df.filter(col(attribute).isNull()) elif isinstance(attribute, list): - # In case of compound keys, "attribute" is a list and "value" is a dict like tuple. - # The indeces will match, and we take [1] for value, because the "key" is stored in [0]. + # In case of compound keys, "attribute" is a list and "value" is a dict + # like tuple. 
The indeces will match, and we take [1] for value, + # because the "key" is stored in [0]. number_of_attrs = len(attribute) for i in range(number_of_attrs): df = df.filter(col(attribute[i]) == value[i][1]) @@ -94,18 +102,19 @@ def get_grouped_ids_per_deviating_value( unique_identifier: list[str], ) -> list[str]: ids = ( - filtered_df.select(unique_identifier) - .rdd.flatMap(lambda x: x) - .collect() + filtered_df.select(unique_identifier).rdd.flatMap(lambda x: x).collect() ) number_of_unique_ids = len(unique_identifier) - return [ids[x:x+number_of_unique_ids] for x in range(0, len(ids), number_of_unique_ids)] + return [ + ids[x : x + number_of_unique_ids] + for x in range(0, len(ids), number_of_unique_ids) + ] def extract_dq_validatie_data( table_name: str, dataset_name: str, - dq_result: dict, + dq_result: CheckpointDescriptionDict, catalog_name: str, spark_session: SparkSession, ) -> None: @@ -113,17 +122,20 @@ def extract_dq_validatie_data( [insert explanation here] :param table_name: Name of the tables - :param dq_result: # TODO: add dataclass? + :param dataset_name: + :param dq_result: :param catalog_name: :param spark_session: """ tabel_id = f"{dataset_name}_{table_name}" + + # "validation_results" is typed List[Dict[str, Any]] in GX dq_result = dq_result["validation_results"] # run_time = dq_result["meta"]["run_id"].run_time run_time = datetime.datetime(1900, 1, 1) # TODO: fix, find run_time in new GX API - + extracted_data = [] for validation_result in dq_result: for expectation_result in validation_result["expectations"]: @@ -135,8 +147,10 @@ def extract_dq_validatie_data( ) aantal_valide_records = element_count - unexpected_count expectation_type = expectation_result["expectation_type"] - parameter_list = create_parameter_list_from_results(result=expectation_result) - attribute = expectation_result["kwargs"].get("column") + parameter_list = create_parameter_list_from_results( + result=expectation_result + ) + expectation_result["kwargs"].get("column") output = expectation_result["success"] output_text = "success" if output else "failure" @@ -159,7 +173,13 @@ def extract_dq_validatie_data( ) df_validatie_with_id_ordered = construct_regel_id( df=df_validatie, - output_columns_list=['regelId','aantalValideRecords','aantalReferentieRecords','dqDatum','dqResultaat'] + output_columns_list=[ + "regelId", + "aantalValideRecords", + "aantalReferentieRecords", + "dqDatum", + "dqResultaat", + ], ) if not is_empty_dataframe(df=df_validatie_with_id_ordered): write_to_unity_catalog( @@ -176,7 +196,7 @@ def extract_dq_validatie_data( def extract_dq_afwijking_data( table_name: str, dataset_name: str, - dq_result: dict, # TODO: add dataclass? 
+ dq_result: CheckpointDescriptionDict, df: DataFrame, unique_identifier: str, catalog_name: str, @@ -186,6 +206,7 @@ def extract_dq_afwijking_data( [insert explanation here] :param table_name: Name of the table + :param dataset_name: :param dq_result: :param df: A DataFrame containing the invalid (deviated) result :param unique_identifier: @@ -193,37 +214,40 @@ def extract_dq_afwijking_data( :param spark_session: """ tabel_id = f"{dataset_name}_{table_name}" + + # "validation_results" is typed List[Dict[str, Any]] in GX dq_result = dq_result["validation_results"] # run_time = dq_result["meta"]["run_id"].run_time run_time = datetime.datetime(1900, 1, 1) # TODO: fix, find run_time in new GX API - + extracted_data = [] - if not isinstance(unique_identifier, list): unique_identifier = [unique_identifier] + if not isinstance(unique_identifier, list): + unique_identifier = [unique_identifier] for validation_result in dq_result: for expectation_result in validation_result["expectations"]: expectation_type = expectation_result["expectation_type"] - parameter_list = create_parameter_list_from_results(result=expectation_result) + parameter_list = create_parameter_list_from_results( + result=expectation_result + ) attribute = get_target_attr_for_rule(result=expectation_result) deviating_attribute_value = expectation_result["result"].get( - "partial_unexpected_list", [] + "partial_unexpected_list", [] ) unique_deviating_values = get_unique_deviating_values( deviating_attribute_value ) for value in unique_deviating_values: filtered_df = filter_df_based_on_deviating_values( - value=value, - attribute=attribute, - df=df + value=value, attribute=attribute, df=df ) grouped_ids = get_grouped_ids_per_deviating_value( - filtered_df=filtered_df, - unique_identifier=unique_identifier + filtered_df=filtered_df, unique_identifier=unique_identifier ) - if isinstance(attribute, list): value = str(value) + if isinstance(attribute, list): + value = str(value) extracted_data.append( { "identifierVeldWaarde": grouped_ids, @@ -242,7 +266,12 @@ def extract_dq_afwijking_data( ) df_afwijking_with_id_ordered = construct_regel_id( df=df_afwijking, - output_columns_list=['regelId','identifierVeldWaarde','afwijkendeAttribuutWaarde','dqDatum'] + output_columns_list=[ + "regelId", + "identifierVeldWaarde", + "afwijkendeAttribuutWaarde", + "dqDatum", + ], ) if not is_empty_dataframe(df=df_afwijking): write_to_unity_catalog( @@ -313,9 +342,11 @@ def create_brontabel( tabel_id = f"{dataset_name}_{table_name}" unique_identifier = param["unique_identifier"] extracted_data.append( - {"bronTabelId": tabel_id, - "tabelNaam": table_name, - "uniekeSleutel": unique_identifier} + { + "bronTabelId": tabel_id, + "tabelNaam": table_name, + "uniekeSleutel": unique_identifier, + } ) df_brontabel = list_of_dicts_to_df( @@ -394,7 +425,7 @@ def create_bronattribute( df_merge_id="bronAttribuutId", merge_dict=merge_dict, spark_session=spark_session, - ) + ) def create_dq_regel( @@ -426,7 +457,7 @@ def create_dq_regel( "regelNaam": rule_name, "regelParameters": parameters, "bronTabelId": tabel_id, - "attribuut": column + "attribuut": column, } ) @@ -437,14 +468,20 @@ def create_dq_regel( ) df_regel_with_id_ordered = construct_regel_id( df=df_regel, - output_columns_list=['regelId','regelNaam','regelParameters','bronTabelId','attribuut'] + output_columns_list=[ + "regelId", + "regelNaam", + "regelParameters", + "bronTabelId", + "attribuut", + ], ) merge_dict = { "regelId": "regel_df.regelId", "regelNaam": "regel_df.regelNaam", "regelParameters": 
"regel_df.regelParameters", "bronTabelId": "regel_df.bronTabelId", - "attribuut": "regel_df.attribuut" + "attribuut": "regel_df.attribuut", } merge_df_with_unity_table( df=df_regel_with_id_ordered, @@ -454,7 +491,7 @@ def create_dq_regel( df_merge_id="regelId", merge_dict=merge_dict, spark_session=spark_session, - ) + ) def write_non_validation_tables( diff --git a/src/dq_suite/schemas/brontabel.py b/src/dq_suite/schemas/brontabel.py index 02e80cc..1ca55a6 100644 --- a/src/dq_suite/schemas/brontabel.py +++ b/src/dq_suite/schemas/brontabel.py @@ -5,4 +5,4 @@ .add("bronTabelId", "string") .add("tabelNaam", "string") .add("uniekeSleutel", "string") -) \ No newline at end of file +) diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..d6ee825 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,3 @@ +from pathlib import Path + +TEST_DATA_FOLDER = Path(__file__).parent / "test_data" diff --git a/tests/test_common.py b/tests/test_common.py index 49b224c..351f5d2 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -16,14 +16,14 @@ class TestRule: expected_rule_name = "the_rule" - expected_parameters = [{"q": 42}] + expected_parameters = {"q": 42} rule_obj = Rule( rule_name=expected_rule_name, parameters=expected_parameters ) def test_initialisation_with_wrong_typed_rule_name_raises_type_error(self): with pytest.raises(TypeError): - assert Rule(rule_name=123, parameters=[{}]) + assert Rule(rule_name=123, parameters={}) def test_initialisation_with_wrong_typed_parameters_raises_type_error(self): with pytest.raises(TypeError): @@ -42,7 +42,7 @@ def test_get_value_from_rule_by_non_existing_key_raises_key_error(self): class TestRulesDict: - rule_obj = Rule(rule_name="the_rule", parameters=[{"q": 42}]) + rule_obj = Rule(rule_name="the_rule", parameters={"q": 42}) expected_unique_identifier = "id" expected_table_name = "the_table" expected_rules_list = [rule_obj] @@ -99,7 +99,9 @@ def test_get_value_from_rule_dict_by_non_existing_key_raises_key_error( class TestDatasetDict: expected_dataset_name = "the_dataset" expected_layer_name = "brons" - dataset_obj = DatasetDict(name=expected_dataset_name, layer=expected_layer_name) + dataset_obj = DatasetDict( + name=expected_dataset_name, layer=expected_layer_name + ) def test_initialisation_with_wrong_typed_name_raises_type_error(self): with pytest.raises(TypeError): @@ -122,7 +124,7 @@ def test_get_value_from_dataset_by_non_existing_key_raises_key_error(self): class TestDataQualityRulesDict: - rule_obj = Rule(rule_name="the_rule", parameters=[{"q": 42}]) + rule_obj = Rule(rule_name="the_rule", parameters={"q": 42}) expected_unique_identifier = "id" expected_table_name = "the_table" expected_rules_list = [rule_obj] @@ -135,13 +137,11 @@ class TestDataQualityRulesDict: expected_dataset_name = "the_dataset" expected_layer_name = "brons" dataset_obj = DatasetDict( - name=expected_dataset_name, - layer=expected_layer_name + name=expected_dataset_name, layer=expected_layer_name ) dataset_obj = {"name": "the_dataset", "layer": "brons"} data_quality_rules_dict = DataQualityRulesDict( - dataset=dataset_obj, - tables=expected_rules_dict_obj_list + dataset=dataset_obj, tables=expected_rules_dict_obj_list ) def test_initialisation_with_wrong_typed_dataset_raises_type_error(self): @@ -153,17 +153,16 @@ def test_initialisation_with_wrong_typed_dataset_raises_type_error(self): unique_identifier="id", table_name="the_table", rules_list=[ - Rule(rule_name="the_rule", parameters=[{"q": 42}]) + Rule(rule_name="the_rule", parameters={"q": 
42}) ], ) - ] + ], ) def test_initialisation_with_wrong_typed_tables_raises_type_error(self): with pytest.raises(TypeError): assert DataQualityRulesDict( - dataset={"name": "the_dataset", "layer": "brons"}, - tables=123 + dataset={"name": "the_dataset", "layer": "brons"}, tables=123 ) def test_get_value_from_data_quality_rules_dict_by_existing_key(self): diff --git a/tests/test_data/dq_rules.json b/tests/test_data/dq_rules.json index 2fa9a03..ec8f214 100644 --- a/tests/test_data/dq_rules.json +++ b/tests/test_data/dq_rules.json @@ -1,17 +1,19 @@ { + "dataset": { + "name": "the_dataset", + "layer": "the_layer" + }, "tables": [ { "unique_identifier": "id", "table_name": "the_table", "rules": [ { - "rule_name": "expect_column_distinct_values_to_equal_set", - "parameters": [ - { + "rule_name": "ExpectColumnDistinctValuesToEqualSet", + "parameters": { "column": "the_column", "value_set": [1, 2, 3] } - ] } ] } diff --git a/tests/test_input_helpers.py b/tests/test_input_helpers.py index 5634a3b..430c541 100644 --- a/tests/test_input_helpers.py +++ b/tests/test_input_helpers.py @@ -1,22 +1,283 @@ import pytest +from tests import TEST_DATA_FOLDER -from src.dq_suite.input_helpers import read_data_quality_rules_from_json +from src.dq_suite.input_helpers import ( + load_data_quality_rules_from_json_string, + read_data_quality_rules_from_json, + validate_data_quality_rules_dict, + validate_dataset, + validate_rule, + validate_rules_dict, + validate_table_schema, + validate_tables, +) -class TestReadDataQualityRulesFromJson: - dummy_file_path = "nonexistent_file_path" - real_file_path = "tests/test_data/dq_rules.json" +@pytest.fixture +def real_file_path(): + return f"{TEST_DATA_FOLDER}/dq_rules.json" + + +@pytest.fixture +def data_quality_rules_json_string(real_file_path): + return read_data_quality_rules_from_json(file_path=real_file_path) + + +@pytest.fixture +def data_quality_rules_dict(data_quality_rules_json_string): + return load_data_quality_rules_from_json_string( + dq_rules_json_string=data_quality_rules_json_string + ) + + +@pytest.fixture +def rules_dict(data_quality_rules_dict): + return data_quality_rules_dict["tables"][0] + +@pytest.mark.usefixtures("real_file_path") +class TestReadDataQualityRulesFromJson: def test_read_data_quality_rules_from_json_raises_file_not_found_error( self, ): with pytest.raises(FileNotFoundError): - read_data_quality_rules_from_json(file_path=self.dummy_file_path) + read_data_quality_rules_from_json(file_path="nonexistent_file_path") + + def test_read_data_quality_rules_from_json_returns_json_string( + self, real_file_path + ): + data_quality_rules_json_string = read_data_quality_rules_from_json( + file_path=real_file_path + ) + assert isinstance(data_quality_rules_json_string, str) + + +@pytest.mark.usefixtures("data_quality_rules_json_string") +class TestLoadDataQualityRulesFromJsonString: + # TODO: implement tests for all failure paths (and raise errors in + # read_data_quality_rules_from_json) + + def test_load_data_quality_rules_from_json_string( + self, data_quality_rules_json_string + ): + data_quality_rules_dict = load_data_quality_rules_from_json_string( + dq_rules_json_string=data_quality_rules_json_string + ) + assert isinstance(data_quality_rules_dict, dict) + + +@pytest.mark.usefixtures("data_quality_rules_dict") +class TestValidateDataQualityRulesDict: + def test_validate_data_quality_rules_dict_raises_type_error( + self, + ): + with pytest.raises(TypeError): + validate_data_quality_rules_dict( + data_quality_rules_dict="wrong_type" + ) + + def 
test_validate_data_quality_rules_dict(self, data_quality_rules_dict): + validate_data_quality_rules_dict( + data_quality_rules_dict=data_quality_rules_dict + ) + + +@pytest.mark.usefixtures("data_quality_rules_dict") +class TestValidateDataSet: + def test_validate_dataset_without_dataset_key_raises_key_error( + self, + ): + with pytest.raises(KeyError): + validate_dataset( + data_quality_rules_dict={"some_other_thing": "with_some_value"} + ) + + def test_validate_dataset_without_dict_typed_value_raises_type_error( + self, + ): + with pytest.raises(TypeError): + validate_dataset( + data_quality_rules_dict={"dataset": "with_some_value"} + ) + + def test_validate_dataset_without_name_key_raises_key_error( + self, + ): + with pytest.raises(KeyError): + validate_dataset(data_quality_rules_dict={"dataset": dict()}) + + def test_validate_dataset_without_layer_key_raises_key_error( + self, + ): + with pytest.raises(KeyError): + validate_dataset(data_quality_rules_dict={"dataset": {"name": 123}}) + + def test_validate_dataset_without_str_typed_name_raises_type_error( + self, + ): + with pytest.raises(TypeError): + validate_dataset( + data_quality_rules_dict={"dataset": {"name": 123, "layer": 456}} + ) + + def test_validate_dataset_without_str_typed_layer_raises_type_error( + self, + ): + with pytest.raises(TypeError): + validate_dataset( + data_quality_rules_dict={ + "dataset": {"name": "the_dataset_name", "layer": 456} + } + ) + + def test_validate_dataset_works_as_expected(self, data_quality_rules_dict): + validate_dataset(data_quality_rules_dict=data_quality_rules_dict) + + +@pytest.mark.usefixtures("data_quality_rules_dict") +class TestValidateTables: + def test_validate_tables_without_tables_key_raises_key_error( + self, + ): + with pytest.raises(KeyError): + validate_tables( + data_quality_rules_dict={"some_other_thing": "with_some_value"} + ) + + def test_validate_tables_without_list_typed_value_raises_type_error( + self, + ): + with pytest.raises(TypeError): + validate_tables( + data_quality_rules_dict={"tables": "with_some_value"} + ) + + def test_validate_tables_works_as_expected(self, data_quality_rules_dict): + validate_tables(data_quality_rules_dict=data_quality_rules_dict) - @pytest.mark.skip() # TODO: fix self.real_file_path - def test_read_data_quality_rules_from_json_returns_json_string(self): - dq_rules_json_string = read_data_quality_rules_from_json( - file_path=self.real_file_path + +@pytest.mark.usefixtures("rules_dict") +class TestValidateRulesDict: + def test_validate_rules_dict_without_dict_typed_value_raises_type_error( + self, + ): + with pytest.raises(TypeError): + validate_rules_dict(rules_dict="not_a_dict") + + def test_validate_rules_dict_without_unique_identifier_key_raises_key_error( + self, + ): + with pytest.raises(KeyError): + validate_rules_dict(rules_dict=dict()) + + def test_validate_rules_dict_without_table_name_key_raises_key_error( + self, + ): + with pytest.raises(KeyError): + validate_rules_dict(rules_dict={"unique_identifier": 123}) + + def test_validate_rules_dict_without_rules_key_raises_key_error( + self, + ): + with pytest.raises(KeyError): + validate_rules_dict( + rules_dict={"unique_identifier": 123, "table_name": 456} + ) + + def test_validate_rules_dict_without_list_typed_rules_raises_type_error( + self, + ): + with pytest.raises(TypeError): + validate_rules_dict( + rules_dict={ + "unique_identifier": 123, + "table_name": 456, + "rules": 789, + } + ) + + def test_validate_rules_dict_works_as_expected(self, rules_dict): + 
validate_rules_dict(rules_dict=rules_dict) + + +class TestValidateTableSchema: + def test_validate_table_schema_without_validate_table_schema_key_raises_key_error( + self, + ): + with pytest.raises(KeyError): + validate_table_schema(rules_dict=dict()) + + def test_validate_table_schema_without_validate_table_schema_url_key_raises_key_error( + self, + ): + with pytest.raises(KeyError): + validate_table_schema( + rules_dict={"validate_table_schema": "some_table_name"} + ) + + def test_validate_table_schema_with_invalid_url_raises_value_error(self): + with pytest.raises(ValueError): + validate_table_schema( + rules_dict={ + "validate_table_schema": "some_table_name", + "validate_table_schema_url": "some_invalid_url", + } + ) + + def test_validate_table_schema_works_as_expected( + self, + ): + validate_table_schema( + rules_dict={ + "validate_table_schema": "some_table_name", + "validate_table_schema_url": "https://www.someurl.nl", + } + ) + + +class TestValidateRule: + def test_validate_rule_without_dict_typed_rule_raises_type_error( + self, + ): + with pytest.raises(TypeError): + validate_rule(rule="not_a_dict") + + def test_validate_rule_without_rule_name_key_raises_key_error( + self, + ): + with pytest.raises(KeyError): + validate_rule(rule=dict()) + + def test_validate_rule_without_parameters_key_raises_key_error( + self, + ): + with pytest.raises(KeyError): + validate_rule(rule={"rule_name": 123}) + + def test_validate_rule_without_string_typed_rule_name_raises_type_error( + self, + ): + with pytest.raises(TypeError): + validate_rule(rule={"rule_name": 123, "parameters": 456}) + + def test_validate_rule_without_pascal_case_rule_name_raises_value_error( + self, + ): + with pytest.raises(ValueError): + validate_rule(rule={"rule_name": "the_rule", "parameters": 456}) + + def test_validate_rule_without_dict_typed_parameters_raises_type_error( + self, + ): + with pytest.raises(TypeError): + validate_rule(rule={"rule_name": "TheRule", "parameters": 456}) + + def test_validate_rule_works_as_expected( + self, + ): + validate_rule( + rule={ + "rule_name": "TheRule", + "parameters": {"some_key": "some_value"}, + } ) - assert type(dq_rules_json_string) is str - assert "tables" in dq_rules_json_string
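
A minimal sketch (illustrative only, not part of the patch) of how a rule entry in the reshaped JSON format is consumed, assuming great_expectations==1.0.3 and pyhumps as pinned in pyproject.toml; it mirrors the getattr pattern in create_and_configure_expectations and the Pascal-case check in validate_rule, with sample values taken from dq_rules_example.json:

import great_expectations.expectations.core as gx_core
import humps

# One rule entry in the new format: a Pascal-case GX expectation class name plus
# a single flat dict of parameters (previously a list of parameter dicts).
rule = {
    "rule_name": "ExpectColumnValuesToBeBetween",
    "parameters": {"column": "latitude", "min_value": 6, "max_value": 10000},
}

# validate_rule rejects snake_case names and hints at the Pascal-case form.
if not humps.is_pascalcase(rule["rule_name"]):
    raise ValueError(
        f"Use Pascal case, e.g. '{humps.pascalize(rule['rule_name'])}'"
    )

# create_and_configure_expectations resolves the expectation class by name and
# unpacks the parameters dict as keyword arguments.
expectation_cls = getattr(gx_core, rule["rule_name"])
expectation = expectation_cls(**rule["parameters"])
# suite.add_expectation(expectation)  # done on the ExpectationSuite in df_checker.py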