From d5e12e3ca47ccde2c4552eb978f1ab7e51683bba Mon Sep 17 00:00:00 2001 From: Bas Date: Thu, 14 Nov 2024 15:08:31 +0100 Subject: [PATCH] Issue 36: increase test coverage of validation.py (#68) * Rename to validation.py * Move filter_validation_dict_by_table_name to input_helpers.py, modify/add related tests * Minor change * Formatting * Add placeholder test classes * Move a bunch of GX data_context method calls to ValidationSettings.create_batch_definition * Start implementing TestGetOrAddValidationDefinition, mock create_batch_definition call * Make sure create_batch_definition only initialise_or_update_attributes when necessary * Stop printing of expectation suite * Simplify get_or_add_validation_definition, move parts to ValidationSettings * Add some todos * Remove dataclass decorator * Fix initialisation of ValidationSettings * Add another todo * Refactor more logic into ValidationSettings.create_validation_definition * Formatting * Remove get_or_add_validation_definition and test_get_or_add_validation_definition * Separate into ValidationSettings dataclass and ValidationRunner class * Formatting * Refactor some functions to only use ValidationRunner instance * Separate call to _set_data_context during init * Fix some imports * Add TestValidationRunner * Formatting * Pull action_list into ValidationRunner * Re-add __post_init__ to ValidationSettings * Add post-init checks * Make name derived variables private * Fix post-init checks * Fix broken tests * Added tests for create_action_list * Create empty list upon each call to create_action_list * Make _initialise_or_update_name_parameters private, call upon init * Pull add_expectations_to_suite into ValidationRunner * Pull _get_or_add_checkpoint and run_validation into ValidationRunner * Add docstring and type hint, rename to run * Formatting * Fix broken tests * Remove validation_settings_obj as an argument * Update README.md * Rename check_name to validation_name * Fix broken tests * Add docstring and todo * Add test_get_gx_expectation_object * Simplify code by removing boolean send_..._notification arguments * Formatting * Add todo * Fix broken tests and imports * Add test_rules_dict_without_rules_field_results_in_table_schema_validation * Modify README.md * Add docstrings * Rename to validation_name * Remove test_get_or_add_checkpoint * Formatting * Add TODO + comments * Add docstrings * Update README.md * Modify docstrings, update README.md * Increase version * Update README.md * Fix broken test_extract_table_data_returns_correct_list * Fix some tests in test_common.py * Modify README.md * Add test for _get_or_add_expectation_suite * Add another test for _get_gx_expectation_object * Add test for add_expectations_to_suite * Formatting * Remove test * Add some comments and todos * Make action_list tests more explicit * Remove some 'pragma: no cover' statements * Fix use of MicrosoftTeamsNotificationAction * Minor changes * Minor changes * Modify docstring --------- Co-authored-by: bas --- README.md | 27 +- pyproject.toml | 2 +- src/dq_suite/__init__.py | 9 +- src/dq_suite/common.py | 102 ++++---- src/dq_suite/df_checker.py | 234 ----------------- src/dq_suite/validation.py | 374 +++++++++++++++++++++++++++ src/dq_suite/validation_input.py | 12 +- tests/test_common.py | 115 ++++++-- tests/test_data/dq_rules.json | 7 + tests/test_output_transformations.py | 5 + tests/test_validation.py | 236 +++++++++++++++++ tests/test_validation_input.py | 51 ++++ 12 files changed, 840 insertions(+), 334 deletions(-) delete mode 100644 
src/dq_suite/df_checker.py create mode 100644 src/dq_suite/validation.py create mode 100644 tests/test_validation.py diff --git a/README.md b/README.md index 6d94283..66ed585 100644 --- a/README.md +++ b/README.md @@ -26,28 +26,31 @@ pip install dq-suite-amsterdam 3. Get ready to validate your first table. To do so, define -- `catalog_name` as the name of your catalog -- `table_name` as the name of the table for which a data quality check is required. This name should also occur in the JSON file - `dq_rule_json_path` as a path to a JSON file, formatted in [this](dq_rules_example.json) way - `df` as a Spark dataframe containing the table that needs to be validated (e.g. via `spark.read.csv` or `spark.read.table`) +- `spark` as a SparkSession object (in Databricks notebooks, this is by default called `spark`) +- `catalog_name` as the name of your catalog ('dpxx_dev' or 'dpxx_prd') +- `table_name` as the name of the table for which a data quality check is required. This name should also occur in the JSON file at `dq_rule_json_path` + 4. Finally, perform the validation by running ```python import dq_suite -validation_settings_obj = dq_suite.ValidationSettings(spark_session=spark, - catalog_name=catalog_name, - table_name=table_name, - check_name="name_of_check_goes_here") -dq_suite.run(json_path=dq_rule_json_path, df=df, validation_settings_obj=validation_settings_obj) +dq_suite.validation.run( + json_path=dq_rule_json_path, + df=df, + spark_session=spark, + catalog_name=catalog_name, + table_name=table_name, + validation_name="my_validation_name", +) ``` -Note: Looping over multiple data frames may require a redefinition of the `json_path` and `validation_settings` variables. - -See the documentation of `ValidationSettings` for what other parameters can be passed upon intialisation. +See the documentation of `dq_suite.validation.run` for what other parameters can be passed. -# Known exceptions +# Known exceptions / issues - The functions can run on Databricks using a Personal Compute Cluster or using a Job Cluster. Using a Shared Compute Cluster will result in an error, as it does not have the permissions that Great Expectations requires. @@ -57,7 +60,7 @@ Older versions of DBR will result in errors upon install of the `dq-suite-amster - At time of writing (late Aug 2024), Great Expectations v1.0.0 has just been released, and is not (yet) compatible with Python 3.12. Hence, make sure you are using the correct version of Python as interpreter for your project. -- The run_time is defined separately from Great Expectations in df_checker. We plan on fixing it when Great Expectations has documented how to access it from the RunIdentifier object. +- The `run_time` value is defined separately from Great Expectations in `validation.py`. We plan on fixing this when Great Expectations has documented how to access it from the RunIdentifier object. 
# Updates diff --git a/pyproject.toml b/pyproject.toml index 135499d..29802d1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "dq-suite-amsterdam" -version = "0.11.2" +version = "0.11.3" authors = [ { name="Arthur Kordes", email="a.kordes@amsterdam.nl" }, { name="Aysegul Cayir Aydar", email="a.cayiraydar@amsterdam.nl" }, diff --git a/src/dq_suite/__init__.py b/src/dq_suite/__init__.py index 43e14ba..6f8b2e9 100644 --- a/src/dq_suite/__init__.py +++ b/src/dq_suite/__init__.py @@ -1,7 +1,8 @@ """DQ API.""" +# from .common import ValidationSettings +# from .other import schema_to_json_string +# from .validation import run +# # # Use __all__ to let developers know what is part of the public API. -# __all__ = ["export_schema_to_json_string", "run", "ValidationSettings"] - #from .common import ValidationSettings - #from .df_checker import run - #from .other import schema_to_json_string +# __all__ = ["schema_to_json_string", "run", "ValidationSettings"] diff --git a/src/dq_suite/common.py b/src/dq_suite/common.py index 7f4cf11..d962d8e 100644 --- a/src/dq_suite/common.py +++ b/src/dq_suite/common.py @@ -1,14 +1,7 @@ from dataclasses import dataclass -from typing import Any, Dict, List, Literal +from typing import Literal from delta.tables import * -from great_expectations import ExpectationSuite, get_context -from great_expectations.data_context import AbstractDataContext -from great_expectations.data_context.types.base import ( - DataContextConfig, - InMemoryStoreBackendDefaults, -) -from great_expectations.exceptions import DataContextError from pyspark.sql import DataFrame, SparkSession from pyspark.sql.functions import col from pyspark.sql.types import StructType @@ -114,7 +107,7 @@ class DataQualityRulesDict: tables: RulesDictList def __post_init__(self): - if not isinstance(self.dataset, dict): + if not isinstance(self.dataset, DatasetDict): raise TypeError("'dataset' should be DatasetDict") if not isinstance(self.tables, list): @@ -128,6 +121,7 @@ def __getitem__(self, key) -> DatasetDict | RulesDictList | None: raise KeyError(key) +# TODO: replace by df.isEmpty() def is_empty_dataframe(df: DataFrame) -> bool: return len(df.take(1)) == 0 @@ -207,50 +201,32 @@ def merge_df_with_unity_table( ).execute() -def get_data_context() -> AbstractDataContext: # pragma: no cover - part of GX - return get_context( - project_config=DataContextConfig( - store_backend_defaults=InMemoryStoreBackendDefaults(), - analytics_enabled=False, - ) - ) - - @dataclass() class ValidationSettings: """ + Contains all user input required for running a validation. Typically, + this means catalog, table and validation names and a SparkSession object. 
+ spark_session: SparkSession object catalog_name: name of unity catalog table_name: name of table in unity catalog - check_name: name of data quality check + validation_name: name of data quality check data_context_root_dir: path to write GX data context - default "/dbfs/great_expectations/" - data_context: a data context object - expectation_suite_name: name of the GX expectation suite - checkpoint_name: name of the GX checkpoint - run_name: name of the data quality run - send_slack_notification: indicator to use GX's built-in Slack - notification action - slack_webhook: webhook, recommended to store in key vault - send_ms_teams_notification: indicator to use GX's built-in Microsoft - Teams notification action - ms_teams_webhook: webhook, recommended to store in key vault + slack_webhook: webhook, recommended to store in key vault. If not None, + a Slack notification will be sent + ms_teams_webhook: webhook, recommended to store in key vault. If not None, + an MS Teams notification will be sent notify_on: when to send notifications, can be equal to "all", - "success" or "failure" + "success" or "failure" """ spark_session: SparkSession catalog_name: str table_name: str - check_name: str + validation_name: str data_context_root_dir: str = "/dbfs/great_expectations/" - data_context: AbstractDataContext | None = None - expectation_suite_name: str | None = None - checkpoint_name: str | None = None - run_name: str | None = None - send_slack_notification: bool = False slack_webhook: str | None = None - send_ms_teams_notification: bool = False ms_teams_webhook: str | None = None notify_on: Literal["all", "success", "failure"] = "failure" @@ -261,40 +237,50 @@ def __post_init__(self): raise TypeError("'catalog_name' should be of type str") if not isinstance(self.table_name, str): raise TypeError("'table_name' should be of type str") - if not isinstance(self.check_name, str): - raise TypeError("'check_name' should be of type str") + if not isinstance(self.validation_name, str): + raise TypeError("'validation_name' should be of type str") if not isinstance(self.data_context_root_dir, str): raise TypeError("'data_context_root_dir' should be of type str") + if not isinstance(self.slack_webhook, str): + if self.slack_webhook is not None: + raise TypeError("'slack_webhook' should be of type str") + if not isinstance(self.ms_teams_webhook, str): + if self.ms_teams_webhook is not None: + raise TypeError("'ms_teams_webhook' should be of type str") if self.notify_on not in ["all", "success", "failure"]: raise ValueError( "'notify_on' should be equal to 'all', 'success' or 'failure'" ) + self._initialise_or_update_name_parameters() - def initialise_or_update_attributes(self): # pragma: no cover - complex - # function - self._set_data_context() - - # TODO/check: do we want to allow for custom names via parameters? + def _initialise_or_update_name_parameters(self): + # TODO/check: nearly all names are related to 'validation_name' - do we want + # to allow for custom names via parameters? 
self._set_expectation_suite_name() self._set_checkpoint_name() self._set_run_name() - - # Finally, add/retrieve the suite to/from the data context - try: - self.data_context.suites.get(name=self.expectation_suite_name) - except DataContextError: - self.data_context.suites.add( - suite=ExpectationSuite(name=self.expectation_suite_name) - ) - - def _set_data_context(self): # pragma: no cover - uses part of GX - self.data_context = get_data_context() + self._set_data_source_name() + self._set_validation_definition_name() + self._set_batch_definition_name() def _set_expectation_suite_name(self): - self.expectation_suite_name = f"{self.check_name}_expectation_suite" + self._expectation_suite_name = ( + f"{self.validation_name}_expectation_suite" + ) def _set_checkpoint_name(self): - self.checkpoint_name = f"{self.check_name}_checkpoint" + self._checkpoint_name = f"{self.validation_name}_checkpoint" def _set_run_name(self): - self.run_name = f"%Y%m%d-%H%M%S-{self.check_name}" + self._run_name = f"%Y%m%d-%H%M%S-{self.validation_name}" + + def _set_data_source_name(self): + self._data_source_name = f"spark_data_source_{self.validation_name}" + + def _set_validation_definition_name(self): + self._validation_definition_name = ( + f"{self.validation_name}_validation_definition" + ) + + def _set_batch_definition_name(self): + self._batch_definition_name = f"{self.validation_name}_batch_definition" diff --git a/src/dq_suite/df_checker.py b/src/dq_suite/df_checker.py deleted file mode 100644 index d42b534..0000000 --- a/src/dq_suite/df_checker.py +++ /dev/null @@ -1,234 +0,0 @@ -import datetime -from typing import List - -import great_expectations -from great_expectations import Checkpoint, ValidationDefinition -from great_expectations.checkpoint.actions import CheckpointAction -from great_expectations.checkpoint.checkpoint import CheckpointResult -from great_expectations.exceptions import DataContextError -from pyspark.sql import DataFrame - -from .common import DataQualityRulesDict, Rule, RulesDict, ValidationSettings -from .output_transformations import ( - write_non_validation_tables, - write_validation_table, -) -from .validation_input import get_data_quality_rules_dict - - -def filter_validation_dict_by_table_name( - validation_dict: DataQualityRulesDict, table_name: str -) -> RulesDict | None: - for rules_dict in validation_dict["tables"]: - if rules_dict["table_name"] == table_name: - # Only one RulesDict per table expected, so return the first match - return rules_dict - return None - - -def get_or_add_validation_definition( - validation_settings_obj: ValidationSettings, -) -> ValidationDefinition: - dataframe_datasource = ( - validation_settings_obj.data_context.data_sources.add_or_update_spark( - name=f"spark_datasource_" f"{validation_settings_obj.check_name}" - ) - ) - - df_asset = dataframe_datasource.add_dataframe_asset( - name=validation_settings_obj.check_name - ) - batch_definition = df_asset.add_batch_definition_whole_dataframe( - name=f"{validation_settings_obj.check_name}_batch_definition" - ) - - validation_definition_name = ( - f"{validation_settings_obj.check_name}" f"_validation_definition" - ) - try: - validation_definition = ( - validation_settings_obj.data_context.validation_definitions.get( - name=validation_definition_name - ) - ) - except DataContextError: - validation_definition = ValidationDefinition( - name=validation_definition_name, - data=batch_definition, - suite=validation_settings_obj.data_context.suites.get( - validation_settings_obj.expectation_suite_name - ), - 
) # Note: a validation definition combines data with a suite of - # expectations - validation_definition = ( - validation_settings_obj.data_context.validation_definitions.add( - validation=validation_definition - ) - ) - - return validation_definition - - -def create_action_list( - validation_settings_obj: ValidationSettings, -) -> List[CheckpointAction]: - action_list = list() - - if validation_settings_obj.send_slack_notification & ( - validation_settings_obj.slack_webhook is not None - ): - action_list.append( - great_expectations.checkpoint.SlackNotificationAction( - name="send_slack_notification", - slack_webhook=validation_settings_obj.slack_webhook, - notify_on=validation_settings_obj.notify_on, - renderer={ - "module_name": "great_expectations.render.renderer.slack_renderer", - "class_name": "SlackRenderer", - }, - ) - ) - - if validation_settings_obj.send_ms_teams_notification & ( - validation_settings_obj.ms_teams_webhook is not None - ): - action_list.append( - great_expectations.checkpoint.MicrosoftTeamsNotificationAction( - name="send_ms_teams_notification", - microsoft_teams_webhook=validation_settings_obj.ms_teams_webhook, - notify_on=validation_settings_obj.notify_on, - renderer={ - "module_name": "great_expectations.render.renderer.microsoft_teams_renderer", - "class_name": "MicrosoftTeamsRenderer", - }, - ) - ) - - return action_list - - -def get_or_add_checkpoint( - validation_settings_obj: ValidationSettings, - validation_definition: ValidationDefinition, -) -> Checkpoint: - try: - checkpoint = validation_settings_obj.data_context.checkpoints.get( - name=validation_settings_obj.checkpoint_name - ) - except DataContextError: - action_list = create_action_list( - validation_settings_obj=validation_settings_obj - ) - checkpoint = Checkpoint( - name=validation_settings_obj.checkpoint_name, - validation_definitions=[validation_definition], - actions=action_list, - ) # Note: a checkpoint combines validations with actions - - # Add checkpoint to data context for future use - ( - validation_settings_obj.data_context.checkpoints.add( - checkpoint=checkpoint - ) - ) - return checkpoint - - -def create_and_configure_expectations( - validation_rules_list: List[Rule], - validation_settings_obj: ValidationSettings, -) -> None: - # The suite should exist by now - suite = validation_settings_obj.data_context.suites.get( - name=validation_settings_obj.expectation_suite_name - ) - - for validation_rule in validation_rules_list: - # Get the name of expectation as defined by GX - gx_expectation_name = validation_rule["rule_name"] - gx_expectation_parameters: dict = validation_rule["parameters"] - - # Get the actual expectation as defined by GX - gx_expectation = getattr( - great_expectations.expectations.core, - gx_expectation_name, - ) - suite.add_expectation(gx_expectation(**gx_expectation_parameters)) - - -def validate( - df: DataFrame, - rules_dict: RulesDict, - validation_settings_obj: ValidationSettings, -) -> CheckpointResult: - """ - [explanation goes here] - - :param df: A list of DataFrame instances to process. - :param rules_dict: a RulesDict object containing the - data quality rules to be evaluated. 
- :param validation_settings_obj: [explanation goes here] - """ - # Make sure all attributes are aligned before validating - validation_settings_obj.initialise_or_update_attributes() - - create_and_configure_expectations( - validation_rules_list=rules_dict["rules"], - validation_settings_obj=validation_settings_obj, - ) - - validation_definition = get_or_add_validation_definition( - validation_settings_obj=validation_settings_obj, - ) - print("***Starting validation definition run***") - print(validation_definition.run(batch_parameters={"dataframe": df})) - checkpoint = get_or_add_checkpoint( - validation_settings_obj=validation_settings_obj, - validation_definition=validation_definition, - ) - - batch_params = {"dataframe": df} - return checkpoint.run(batch_parameters=batch_params) - - -def run( - json_path: str, df: DataFrame, validation_settings_obj: ValidationSettings -) -> None: - if not hasattr(df, "table_name"): - df.table_name = validation_settings_obj.table_name - - # 1) extract the data quality rules to be applied... - validation_dict = get_data_quality_rules_dict(file_path=json_path) - rules_dict = filter_validation_dict_by_table_name( - validation_dict=validation_dict, - table_name=validation_settings_obj.table_name, - ) - if rules_dict is None: - raise ValueError( - f"No validations found for table_name " - f"'{validation_settings_obj.table_name}' in JSON file at '" - f"{json_path}'." - ) - - # 2) perform the validation on the dataframe - checkpoint_result = validate( - df=df, - rules_dict=rules_dict, - validation_settings_obj=validation_settings_obj, - ) - validation_output = checkpoint_result.describe_dict() - run_time = datetime.datetime.now() # TODO: get from RunIdentifier object - - # 3) write results to unity catalog - write_non_validation_tables( - dq_rules_dict=validation_dict, - validation_settings_obj=validation_settings_obj, - ) - write_validation_table( - validation_output=validation_output, - validation_settings_obj=validation_settings_obj, - df=df, - dataset_name=validation_dict["dataset"]["name"], - unique_identifier=rules_dict["unique_identifier"], - run_time=run_time, - ) diff --git a/src/dq_suite/validation.py b/src/dq_suite/validation.py new file mode 100644 index 0000000..d2b7cd0 --- /dev/null +++ b/src/dq_suite/validation.py @@ -0,0 +1,374 @@ +import datetime +from typing import Dict, List, Literal + +from great_expectations import ( + Checkpoint, + ExpectationSuite, + ValidationDefinition, + get_context, +) +from great_expectations.checkpoint import ( + MicrosoftTeamsNotificationAction, + SlackNotificationAction, +) +from great_expectations.checkpoint.checkpoint import CheckpointResult +from great_expectations.core.batch_definition import BatchDefinition +from great_expectations.data_context import AbstractDataContext +from great_expectations.data_context.types.base import ( + DataContextConfig, + InMemoryStoreBackendDefaults, +) +from great_expectations.datasource.fluent import SparkDatasource +from great_expectations.datasource.fluent.spark_datasource import DataFrameAsset +from great_expectations.exceptions import DataContextError +from great_expectations.expectations import core as gx_core +from pyspark.sql import DataFrame, SparkSession + +from .common import Rule, RulesDict, ValidationSettings +from .output_transformations import ( + write_non_validation_tables, + write_validation_table, +) +from .validation_input import ( + filter_validation_dict_by_table_name, + get_data_quality_rules_dict, + validate_data_quality_rules_dict, +) + + +class 
ValidationRunner: + def __init__( + self, + validation_settings_obj: ValidationSettings | None = None, + data_context: AbstractDataContext | None = None, + data_source: SparkDatasource | None = None, + dataframe_asset: DataFrameAsset | None = None, + validation_definition: ValidationDefinition | None = None, + batch_definition: BatchDefinition | None = None, + action_list: List | None = None, + ): # TODO: change all variables to private, once all logic has been moved + # inside this class + """ + validation_settings_obj: ValidationSettings instance, from which all + name, webhook and notification parameters are copied + data_context: GX data context object + data_source: GX Spark data source object + dataframe_asset: GX dataframe asset object + validation_definition: GX validation definition object + batch_definition: GX batch definition object + action_list: list of GX checkpoint actions, e.g. notification actions + """ + + if validation_settings_obj is None: + raise ValueError( + "No ValidationSettings instance has been provided." + ) + if not isinstance(validation_settings_obj, ValidationSettings): + raise ValueError( + "'validation_settings_obj' should be of type ValidationSettings." + ) + + # Copy ValidationSettings parameters + self.spark_session = validation_settings_obj.spark_session + self.catalog_name = validation_settings_obj.catalog_name + self.table_name = validation_settings_obj.table_name + self.validation_name = validation_settings_obj.validation_name + self.data_context_root_dir = ( + validation_settings_obj.data_context_root_dir + ) + self.data_source_name = validation_settings_obj._data_source_name + self.expectation_suite_name = ( + validation_settings_obj._expectation_suite_name + ) + self.checkpoint_name = validation_settings_obj._checkpoint_name + self.run_name = validation_settings_obj._run_name + self.validation_definition_name = ( + validation_settings_obj._validation_definition_name + ) + self.batch_definition_name = ( + validation_settings_obj._batch_definition_name + ) + self.slack_webhook = validation_settings_obj.slack_webhook + self.ms_teams_webhook = validation_settings_obj.ms_teams_webhook + self.notify_on = validation_settings_obj.notify_on + + # ValidationRunner-specific parameters + self.data_context = data_context + self.data_source = data_source + self.dataframe_asset = dataframe_asset + self.batch_definition = batch_definition + self.validation_definition = validation_definition + self.action_list = action_list + + self._set_data_context() + + def _set_data_context(self): + self.data_context = get_context( + project_config=DataContextConfig( + store_backend_defaults=InMemoryStoreBackendDefaults(), + analytics_enabled=False, + ) + ) + + def _get_or_add_expectation_suite(self) -> ExpectationSuite: + try: # If expectation_suite_name exists in data_context + suite = self.data_context.suites.get( + name=self.expectation_suite_name + ) + except DataContextError: + self.data_context.suites.add( + suite=ExpectationSuite(name=self.expectation_suite_name) + ) + suite = self.data_context.suites.get( + name=self.expectation_suite_name + ) + return suite + + @staticmethod + def _get_gx_expectation_object(validation_rule: Rule): + """ + From great_expectations.expectations.core, get the relevant class and + instantiate an expectation object with the
user-defined parameters + """ + gx_expectation_name = validation_rule["rule_name"] + gx_expectation_class = getattr(gx_core, gx_expectation_name) + + gx_expectation_parameters: dict = validation_rule["parameters"] + return gx_expectation_class(**gx_expectation_parameters) + + def add_expectations_to_suite(self, validation_rules_list: List[Rule]): + expectation_suite_obj = self._get_or_add_expectation_suite() # Add if + # it does not exist + + for validation_rule in validation_rules_list: + gx_expectation_obj = self._get_gx_expectation_object( + validation_rule=validation_rule + ) + expectation_suite_obj.add_expectation(gx_expectation_obj) + + def create_batch_definition(self): # pragma: no cover - only GX functions + self.data_source = self.data_context.data_sources.add_or_update_spark( + name=self.data_source_name + ) + self.dataframe_asset = self.data_source.add_dataframe_asset( + name=self.validation_name + ) + + self.batch_definition = ( + self.dataframe_asset.add_batch_definition_whole_dataframe( + name=self.batch_definition_name + ) + ) + + def create_validation_definition( + self, + ): # pragma: no cover - only GX functions + """ + Note: a validation definition combines data with a suite of + expectations. Therefore, this function can only be called if a + batch definition and a (populated) expectation suite exist. + """ + try: # If validation_definition_name exists in data_context + validation_definition = ( + self.data_context.validation_definitions.get( + name=self.validation_definition_name + ) + ) + except DataContextError: + validation_definition = ValidationDefinition( + name=self.validation_definition_name, + data=self.batch_definition, + suite=self.data_context.suites.get(self.expectation_suite_name), + ) + validation_definition = ( + self.data_context.validation_definitions.add( + validation=validation_definition + ) + ) + self.validation_definition = validation_definition + + def _add_slack_notification_to_action_list( + self, + ): + self.action_list.append( + SlackNotificationAction( + name="send_slack_notification", + slack_webhook=self.slack_webhook, + notify_on=self.notify_on, + renderer={ + "module_name": "great_expectations.render.renderer.slack_renderer", + "class_name": "SlackRenderer", + }, + ) + ) + + def _add_microsoft_teams_notification_to_action_list( + self, + ): + self.action_list.append( + MicrosoftTeamsNotificationAction( + name="send_ms_teams_notification", + teams_webhook=self.ms_teams_webhook, + notify_on=self.notify_on, + renderer={ + "module_name": "great_expectations.render.renderer.microsoft_teams_renderer", + "class_name": "MicrosoftTeamsRenderer", + }, + ) + ) + + def _create_action_list(self): + self.action_list = list() + + if self.slack_webhook is not None: + self._add_slack_notification_to_action_list() + + if self.ms_teams_webhook is not None: + self._add_microsoft_teams_notification_to_action_list() + + def _get_or_add_checkpoint( + self, + ) -> Checkpoint: # pragma: no cover - only GX functions + try: + checkpoint = self.data_context.checkpoints.get( + name=self.checkpoint_name + ) # If checkpoint_name exists in data_context + except DataContextError: + self._create_action_list() + checkpoint = Checkpoint( + name=self.checkpoint_name, + validation_definitions=[self.validation_definition], + actions=self.action_list, + ) # Note: a checkpoint combines validations with actions + + # Add checkpoint to data context for future use + (self.data_context.checkpoints.add(checkpoint=checkpoint)) + return checkpoint + + def run( + self, 
batch_parameters: Dict[str, DataFrame] ) -> CheckpointResult: # pragma: no cover - only GX functions + checkpoint = self._get_or_add_checkpoint() + return checkpoint.run(batch_parameters=batch_parameters) + + +def validate( + df: DataFrame, + rules_dict: RulesDict, + validation_settings_obj: ValidationSettings, +) -> CheckpointResult: # pragma: no cover - only GX functions + """ + Uses the rules_dict to populate an expectation suite, and applies these + rules to a Spark Dataframe containing the data of interest. Returns the + results of the validation. + + :param df: a Spark DataFrame containing the data to be validated. + :param rules_dict: a RulesDict object containing the + data quality rules to be evaluated. + :param validation_settings_obj: ValidationSettings object, contains all + user input required for running a validation. + """ + validation_runner_obj = ValidationRunner( + validation_settings_obj=validation_settings_obj + ) + + validation_runner_obj.add_expectations_to_suite( + validation_rules_list=rules_dict["rules"] + ) + validation_runner_obj.create_batch_definition() + validation_runner_obj.create_validation_definition() + + print("***Starting validation run***") + return validation_runner_obj.run(batch_parameters={"dataframe": df}) + + +def run( + json_path: str, + df: DataFrame, + spark_session: SparkSession, + catalog_name: str, + table_name: str, + validation_name: str, + data_context_root_dir: str = "/dbfs/great_expectations/", + slack_webhook: str | None = None, + ms_teams_webhook: str | None = None, + notify_on: Literal["all", "success", "failure"] = "failure", +) -> None: # pragma: no cover - only GX functions + """ + Main function for users of dq_suite. + + Runs a validation (specified by the rules in the JSON file located at [ + json_path]) on a dataframe [df], and writes the results to a data_quality + table in [catalog_name]. + + json_path: path to a JSON file containing the data quality rules + df: Spark dataframe containing the data to be validated + spark_session: SparkSession object + catalog_name: name of unity catalog + table_name: name of table in unity catalog + validation_name: name of data quality check + data_context_root_dir: path to write GX data + context - default "/dbfs/great_expectations/" + slack_webhook: webhook, recommended to store in key vault. If not None, + a Slack notification will be sent + ms_teams_webhook: webhook, recommended to store in key vault. If not None, + an MS Teams notification will be sent + notify_on: when to send notifications, can be equal to "all", + "success" or "failure" + """ + validation_settings_obj = ValidationSettings( + spark_session=spark_session, + catalog_name=catalog_name, + table_name=table_name, + validation_name=validation_name, + data_context_root_dir=data_context_root_dir, + slack_webhook=slack_webhook, + ms_teams_webhook=ms_teams_webhook, + notify_on=notify_on, + ) + + if not hasattr(df, "table_name"): + # TODO/check: we can have df.table_name != + # validation_settings_obj.table_name: is this wrong? + df.table_name = validation_settings_obj.table_name + + # 1) extract the data quality rules to be applied... + validation_dict = get_data_quality_rules_dict(file_path=json_path) + validate_data_quality_rules_dict(data_quality_rules_dict=validation_dict) + rules_dict = filter_validation_dict_by_table_name( + validation_dict=validation_dict, + table_name=validation_settings_obj.table_name, + ) + if rules_dict is None: + raise ValueError( + f"No validations found for table_name " + f"'{validation_settings_obj.table_name}' in JSON file at '" + f"{json_path}'." + ) + + # 2) ... perform the validation on the dataframe...
+ checkpoint_result = validate( + df=df, + rules_dict=rules_dict, + validation_settings_obj=validation_settings_obj, + ) + validation_output = checkpoint_result.describe_dict() + run_time = datetime.datetime.now() # TODO: get from RunIdentifier object + + # 3) ... and write results to unity catalog + write_non_validation_tables( + dq_rules_dict=validation_dict, + validation_settings_obj=validation_settings_obj, + ) + write_validation_table( + validation_output=validation_output, + validation_settings_obj=validation_settings_obj, + df=df, + dataset_name=validation_dict["dataset"]["name"], + unique_identifier=rules_dict["unique_identifier"], + run_time=run_time, + ) diff --git a/src/dq_suite/validation_input.py b/src/dq_suite/validation_input.py index abc6f87..7ef5f6c 100644 --- a/src/dq_suite/validation_input.py +++ b/src/dq_suite/validation_input.py @@ -5,7 +5,7 @@ import humps import validators -from .common import DataQualityRulesDict +from .common import DataQualityRulesDict, RulesDict def read_data_quality_rules_from_json(file_path: str) -> str: @@ -157,3 +157,13 @@ def get_data_quality_rules_dict(file_path: str) -> DataQualityRulesDict: ) return data_quality_rules_dict + + +def filter_validation_dict_by_table_name( + validation_dict: DataQualityRulesDict, table_name: str +) -> RulesDict | None: + for rules_dict in validation_dict["tables"]: + if rules_dict["table_name"] == table_name: + # Only one RulesDict per table expected, so return the first match + return rules_dict + return None diff --git a/tests/test_common.py b/tests/test_common.py index 351f5d2..443a371 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -139,10 +139,6 @@ class TestDataQualityRulesDict: dataset_obj = DatasetDict( name=expected_dataset_name, layer=expected_layer_name ) - dataset_obj = {"name": "the_dataset", "layer": "brons"} - data_quality_rules_dict = DataQualityRulesDict( - dataset=dataset_obj, tables=expected_rules_dict_obj_list - ) def test_initialisation_with_wrong_typed_dataset_raises_type_error(self): with pytest.raises(TypeError): @@ -161,21 +157,30 @@ def test_initialisation_with_wrong_typed_dataset_raises_type_error(self): def test_initialisation_with_wrong_typed_tables_raises_type_error(self): with pytest.raises(TypeError): - assert DataQualityRulesDict( - dataset={"name": "the_dataset", "layer": "brons"}, tables=123 - ) + assert DataQualityRulesDict(dataset=self.dataset_obj, tables=123) def test_get_value_from_data_quality_rules_dict_by_existing_key(self): + data_quality_rules_dict = DataQualityRulesDict( + dataset=self.dataset_obj, tables=self.expected_rules_dict_obj_list + ) + assert data_quality_rules_dict["dataset"] == self.dataset_obj assert ( - self.data_quality_rules_dict["tables"] + data_quality_rules_dict["tables"] == self.expected_rules_dict_obj_list ) def test_get_value_from_rule_dict_by_non_existing_key_raises_key_error( self, ): + data_quality_rules_dict = DataQualityRulesDict( + dataset=self.dataset_obj, tables=self.expected_rules_dict_obj_list + ) with pytest.raises(KeyError): - assert self.data_quality_rules_dict["wrong_key"] + assert data_quality_rules_dict["wrong_key"] + + +def test_is_empty_dataframe(): + pass # TODO: implement def test_get_full_table_name(): @@ -191,13 +196,25 @@ def test_get_full_table_name(): ) +def test_enforce_column_order(): + pass # TODO: implement + + +def test_enforce_schema(): + pass # TODO: implement + + +def test_merge_df_with_unity_table(): + pass # TODO: implement + + class TestValidationSettings: spark_session_mock = 
Mock(spec=SparkSession) validation_settings_obj = ValidationSettings( spark_session=spark_session_mock, catalog_name="the_catalog", table_name="the_table", - check_name="the_check", + validation_name="the_validation", ) def test_initialisation_with_wrong_typed_spark_session_raises_type_error( @@ -208,7 +225,7 @@ def test_initialisation_with_wrong_typed_spark_session_raises_type_error( spark_session=123, catalog_name="the_catalog", table_name="the_table", - check_name="the_check", + validation_name="the_validation", ) def test_initialisation_with_wrong_typed_catalog_name_raises_type_error( @@ -219,7 +236,7 @@ def test_initialisation_with_wrong_typed_catalog_name_raises_type_error( spark_session=self.spark_session_mock, catalog_name=123, table_name="the_table", - check_name="the_check", + validation_name="the_validation", ) def test_initialisation_with_wrong_typed_table_name_raises_type_error(self): @@ -228,41 +245,91 @@ def test_initialisation_with_wrong_typed_table_name_raises_type_error(self): spark_session=self.spark_session_mock, catalog_name="the_catalog", table_name=123, - check_name="the_check", + validation_name="the_validation", + ) + + def test_initialisation_with_wrong_typed_validation_name_raises_type_error( + self, + ): + with pytest.raises(TypeError): + assert ValidationSettings( + spark_session=self.spark_session_mock, + catalog_name="the_catalog", + table_name="the_table", + validation_name=123, + ) + + def test_initialisation_with_wrong_typed_data_context_root_dir_raises_type_error( + self, + ): + with pytest.raises(TypeError): + assert ValidationSettings( + spark_session=self.spark_session_mock, + catalog_name="the_catalog", + table_name="the_table", + validation_name="the_validation_name", + data_context_root_dir=123, ) - def test_initialisation_with_wrong_typed_check_name_raises_type_error(self): + def test_initialisation_with_wrong_typed_slack_webhook_raises_type_error( + self, + ): with pytest.raises(TypeError): assert ValidationSettings( spark_session=self.spark_session_mock, catalog_name="the_catalog", table_name="the_table", - check_name=123, + validation_name="the_validation_name", + slack_webhook=123, + ) + + def test_initialisation_with_wrong_typed_ms_teams_webhook_raises_type_error( + self, + ): + with pytest.raises(TypeError): + assert ValidationSettings( + spark_session=self.spark_session_mock, + catalog_name="the_catalog", + table_name="the_table", + validation_name="the_validation_name", + ms_teams_webhook=123, + ) + + def test_initialisation_with_wrong_valued_notify_on_raises_value_error( + self, + ): + with pytest.raises(ValueError): + assert ValidationSettings( + spark_session=self.spark_session_mock, + catalog_name="the_catalog", + table_name="the_table", + validation_name="the_validation_name", + notify_on="haha_this_is_wrong", ) def test_set_expectation_suite_name(self): - assert self.validation_settings_obj.expectation_suite_name is None + assert hasattr(self.validation_settings_obj, "_expectation_suite_name") self.validation_settings_obj._set_expectation_suite_name() assert ( - self.validation_settings_obj.expectation_suite_name - == f"{self.validation_settings_obj.check_name}_expectation_suite" + self.validation_settings_obj._expectation_suite_name + == f"{self.validation_settings_obj.validation_name}_expectation_suite" ) def test_set_checkpoint_name(self): - assert self.validation_settings_obj.checkpoint_name is None + assert hasattr(self.validation_settings_obj, "_checkpoint_name") self.validation_settings_obj._set_checkpoint_name() assert ( - 
self.validation_settings_obj.checkpoint_name - == f"{self.validation_settings_obj.check_name}_checkpoint" + self.validation_settings_obj._checkpoint_name + == f"{self.validation_settings_obj.validation_name}_checkpoint" ) def test_set_run_name(self): - assert self.validation_settings_obj.run_name is None + assert hasattr(self.validation_settings_obj, "_run_name") self.validation_settings_obj._set_run_name() assert ( - self.validation_settings_obj.run_name - == f"%Y%m%d-%H%M%S-{self.validation_settings_obj.check_name}" + self.validation_settings_obj._run_name + == f"%Y%m%d-%H%M%S-{self.validation_settings_obj.validation_name}" ) diff --git a/tests/test_data/dq_rules.json b/tests/test_data/dq_rules.json index d7f2120..69ef815 100644 --- a/tests/test_data/dq_rules.json +++ b/tests/test_data/dq_rules.json @@ -37,6 +37,13 @@ } } ] + }, + { + "unique_identifier": "id", + "table_name": "the_third_table_name", + "validate_table_schema": "the_third_table_name", + "validate_table_schema_url": "https://www.someurl.nl", + "rules": [] } ] } \ No newline at end of file diff --git a/tests/test_output_transformations.py b/tests/test_output_transformations.py index 1a6d8df..4c5609a 100644 --- a/tests/test_output_transformations.py +++ b/tests/test_output_transformations.py @@ -309,6 +309,11 @@ def test_extract_table_data_returns_correct_list( "tabelNaam": "the_other_table", "uniekeSleutel": "other_id", }, + { + "bronTabelId": "the_dataset_the_third_table_name", + "tabelNaam": "the_third_table_name", + "uniekeSleutel": "id", + }, ] assert test_output == expected_result diff --git a/tests/test_validation.py b/tests/test_validation.py new file mode 100644 index 0000000..ff6e2ec --- /dev/null +++ b/tests/test_validation.py @@ -0,0 +1,236 @@ +from unittest.mock import Mock + +import pytest +from great_expectations.checkpoint import ( + MicrosoftTeamsNotificationAction, + SlackNotificationAction, +) +from great_expectations.expectations import ExpectColumnDistinctValuesToEqualSet +from pyspark.sql import SparkSession + +from src.dq_suite.common import Rule, ValidationSettings +from src.dq_suite.validation import ValidationRunner + + +@pytest.fixture +def validation_settings_obj(): + spark_session_mock = Mock(spec=SparkSession) + validation_settings_obj = ValidationSettings( + spark_session=spark_session_mock, + catalog_name="the_catalog", + table_name="the_table", + validation_name="the_validation", + ) + return validation_settings_obj + + +@pytest.fixture +def validation_runner_obj(validation_settings_obj): + return ValidationRunner(validation_settings_obj=validation_settings_obj) + + +@pytest.mark.usefixtures("validation_settings_obj") +@pytest.mark.usefixtures("validation_runner_obj") +class TestValidationRunner: + """ + Note: the following methods are not yet tested, because they consist + entirely of (a sequence of calls to) GX methods/functions: + - create_batch_definition + - create_validation_definition + - _get_or_add_checkpoint + - run + """ + + def test_initialisation_with_none_valued_validation_settings_raises_value_error( + self, + ): + with pytest.raises(ValueError): + assert ValidationRunner(validation_settings_obj=None) + + def test_initialisation_with_wrong_typed_validation_settings_raises_value_error( + self, + ): + with pytest.raises(ValueError): + assert ValidationRunner(validation_settings_obj=123) + + def test_initialisation_works_as_expected(self, validation_settings_obj): + validation_runner_obj = ValidationRunner( + validation_settings_obj=validation_settings_obj + ) + + # After setting 
all attribute values, a data_context should be created + assert validation_runner_obj.data_context is not None + + def test_get_or_add_expectation_suite_works_as_expected( + self, validation_runner_obj + ): + # Initially, no suites exist in the data context + suites_list = list(validation_runner_obj.data_context.suites.all()) + assert len(suites_list) == 0 + + # Create a new validation suite + expected_expectation_suite_name = ( + "previously_undefined_validation_suite" + ) + validation_runner_obj.expectation_suite_name = ( + expected_expectation_suite_name + ) + first_suite = validation_runner_obj._get_or_add_expectation_suite() + + # Now, there should be 1 suite in the data context + suites_list = list(validation_runner_obj.data_context.suites.all()) + assert len(suites_list) == 1 + assert suites_list[0] == first_suite + + # Calling the function again should return the existing validation suite + second_suite = validation_runner_obj._get_or_add_expectation_suite() + suites_list = list(validation_runner_obj.data_context.suites.all()) + assert len(suites_list) == 1 + assert suites_list[0] == first_suite + assert first_suite == second_suite + + def test_get_nonexistent_gx_expectation_object_raises_attribute_error( + self, validation_runner_obj + ): + with pytest.raises(AttributeError): + the_rule = Rule( + rule_name="NonExistentExpectation", + parameters={"column": "the_column", "value_set": [1, 2, 3]}, + ) + + validation_runner_obj._get_gx_expectation_object( + validation_rule=the_rule + ) + + def test_get_gx_expectation_object(self, validation_runner_obj): + the_rule = Rule( + rule_name="ExpectColumnDistinctValuesToEqualSet", + parameters={"column": "the_column", "value_set": [1, 2, 3]}, + ) + the_expectation_object = ( + validation_runner_obj._get_gx_expectation_object( + validation_rule=the_rule + ) + ) + + assert isinstance( + the_expectation_object, ExpectColumnDistinctValuesToEqualSet + ) + assert the_expectation_object.column == the_rule["parameters"]["column"] + assert ( + the_expectation_object.value_set + == the_rule["parameters"]["value_set"] + ) + + def test_add_expectations_to_suite_works_as_expected( + self, validation_runner_obj + ): + validation_rules_list = [ + Rule( + rule_name="ExpectColumnDistinctValuesToEqualSet", + parameters={"column": "the_column", "value_set": [1, 2, 3]}, + ) + ] + validation_runner_obj.add_expectations_to_suite( + validation_rules_list=validation_rules_list + ) + suites_list = list(validation_runner_obj.data_context.suites.all()) + expectations_list = suites_list[0]["expectations"] + assert len(expectations_list) == 1 + assert isinstance( + expectations_list[0], ExpectColumnDistinctValuesToEqualSet + ) + + def test_create_batch_definition(self, validation_runner_obj): + # Initially, no batch definitions exist in the data context + + # validation_runner_obj.create_batch_definition() + + # TODO: mock use of spark for + # data_context.data_sources.add_or_update_spark + pass + + def test_create_validation_definition(self, validation_runner_obj): + # TODO: mock use of batch_definition for ValidationDefinition + pass + + def test_create_action_list_with_slack_webhook(self, validation_runner_obj): + # Initially, there are no actions in the action_list parameter + assert validation_runner_obj.action_list is None + + validation_runner_obj.slack_webhook = "the_slack_webhook" + validation_runner_obj._create_action_list() + + # Now there should be 1 SlackNotificationAction in the action_list + assert isinstance(validation_runner_obj.action_list, list) + 
assert len(validation_runner_obj.action_list) == 1 + assert isinstance( + validation_runner_obj.action_list[0], SlackNotificationAction + ) + + def test_create_action_list_without_slack_webhook( + self, validation_runner_obj + ): + # Initially, there are no actions in the action_list parameter + assert validation_runner_obj.action_list is None + + validation_runner_obj.slack_webhook = None + validation_runner_obj._create_action_list() + + # Now there should still be no actions in the action_list parameter + assert isinstance(validation_runner_obj.action_list, list) + assert len(validation_runner_obj.action_list) == 0 + + def test_create_action_list_with_ms_teams_webhook( + self, validation_runner_obj + ): + # Initially, there are no actions in the action_list parameter + assert validation_runner_obj.action_list is None + + validation_runner_obj.ms_teams_webhook = "the_ms_teams_webhook" + validation_runner_obj._create_action_list() + + # Now there should be 1 MicrosoftTeamsNotificationAction in the action_list + assert isinstance(validation_runner_obj.action_list, list) + assert len(validation_runner_obj.action_list) == 1 + assert isinstance( + validation_runner_obj.action_list[0], + MicrosoftTeamsNotificationAction, + ) + + def test_create_action_list_without_ms_teams_webhook( + self, validation_runner_obj + ): + # Initially, there are no actions in the action_list parameter + assert validation_runner_obj.action_list is None + + validation_runner_obj.ms_teams_webhook = None + validation_runner_obj._create_action_list() + + # Now there should still be no actions in the action_list parameter + assert isinstance(validation_runner_obj.action_list, list) + assert len(validation_runner_obj.action_list) == 0 + + def test_get_or_add_checkpoint(self, validation_runner_obj): + # TODO: mock use of ValidationDefinition for Checkpoint + pass + + +class TestValidate: + """ + Note: this function is not yet tested, because it consists entirely of + (a sequence of calls to) GX methods/functions. + """ + + def test_validate(self): + pass + + +class TestRun: + """ + Note: this function is not yet tested, because it consists entirely of + (a sequence of calls to) GX methods/functions. + """ + + def test_run(self): + pass diff --git a/tests/test_validation_input.py b/tests/test_validation_input.py index 2e13bb8..2503682 100644 --- a/tests/test_validation_input.py +++ b/tests/test_validation_input.py @@ -1,9 +1,12 @@ import json +from unittest.mock import patch import pytest from tests import TEST_DATA_FOLDER +from src.dq_suite import validation_input from src.dq_suite.validation_input import ( + filter_validation_dict_by_table_name, get_data_quality_rules_dict, read_data_quality_rules_from_json, validate_data_quality_rules_dict, @@ -40,6 +43,18 @@ def data_quality_rules_dict(data_quality_rules_json_string): return json.loads(data_quality_rules_json_string) +@pytest.fixture +def validated_data_quality_rules_dict(data_quality_rules_dict): + """ + Note: the normal flow validates the data quality rules dict before + using it - hence this fixture. 
+ """ + validate_data_quality_rules_dict( + data_quality_rules_dict=data_quality_rules_dict + ) + return data_quality_rules_dict + + @pytest.fixture def rules_dict(data_quality_rules_dict): return data_quality_rules_dict["tables"][0] @@ -89,6 +104,18 @@ def test_validate_data_quality_rules_dict_raises_type_error( data_quality_rules_dict="wrong_type" ) + def test_rules_dict_without_rules_field_results_in_table_schema_validation( + self, data_quality_rules_dict + ): + with patch.object( + target=validation_input, + attribute="validate_table_schema", + ) as validate_table_schema_mock: + validate_data_quality_rules_dict( + data_quality_rules_dict=data_quality_rules_dict + ) + validate_table_schema_mock.assert_called_once() + def test_validate_data_quality_rules_dict(self, data_quality_rules_dict): validate_data_quality_rules_dict( data_quality_rules_dict=data_quality_rules_dict @@ -314,3 +341,27 @@ def test_get_data_quality_rules_dict_returns_dict( file_path=real_json_file_path ) assert isinstance(data_quality_rules_dict, dict) + + +@pytest.mark.usefixtures("validated_data_quality_rules_dict") +class TestFilterValidationDictByTableName: + def test_filter_validation_dict_by_table_name_returns_none_for_nonexistent_table( + self, validated_data_quality_rules_dict + ): + the_nonexistent_table_dict = filter_validation_dict_by_table_name( + validation_dict=validated_data_quality_rules_dict, + table_name="the_nonexistent_table", + ) + assert the_nonexistent_table_dict is None + + def test_filter_validation_dict_by_table_name( + self, validated_data_quality_rules_dict + ): + the_table_dict = filter_validation_dict_by_table_name( + validation_dict=validated_data_quality_rules_dict, + table_name="the_table", + ) + assert the_table_dict is not None + assert "unique_identifier" in the_table_dict + assert "table_name" in the_table_dict + assert "rules" in the_table_dict
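For reference, below is a minimal end-to-end usage sketch of the new `dq_suite.validation.run` entry point introduced by this patch, following the updated README. All names and paths are illustrative assumptions (the rules file location, catalog and table names), and `spark` is assumed to be the live SparkSession of a Databricks notebook.

```python
# Minimal usage sketch (illustrative names and paths; assumes a Databricks
# notebook where `spark` is an existing SparkSession).
import dq_suite.validation

# Table to validate; its name must also occur in the JSON rules file.
df = spark.read.table("dpxx_dev.bronze.the_table")

dq_suite.validation.run(
    json_path="/Workspace/dq/dq_rules.json",  # formatted as dq_rules_example.json
    df=df,
    spark_session=spark,
    catalog_name="dpxx_dev",
    table_name="the_table",
    validation_name="my_validation_name",
    slack_webhook=None,     # pass a webhook URL (from a key vault) to enable Slack notifications
    ms_teams_webhook=None,  # idem for MS Teams notifications
    notify_on="failure",    # or "all" / "success"
)
```

Because `slack_webhook` and `ms_teams_webhook` default to `None`, no notification actions are attached to the checkpoint unless a webhook is passed; this replaces the removed `send_slack_notification`/`send_ms_teams_notification` boolean arguments.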