Add dataset level #49

Merged · 11 commits · Sep 12, 2024
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "dq-suite-amsterdam"
version = "0.8.0"
version = "0.9.0"
authors = [
{ name="Arthur Kordes", email="[email protected]" },
{ name="Aysegul Cayir Aydar", email="[email protected]" },
73 changes: 49 additions & 24 deletions scripts/data_quality_tables.sql
@@ -1,20 +1,38 @@
-- Databricks notebook source
-- MAGIC %md
-- MAGIC schema and tables for Dataquality great expectations
-- MAGIC This script creates the schema and tables for dq-suite-amsterdam

-- COMMAND ----------

CREATE WIDGET TEXT catalog DEFAULT "dpxx_dev"

-- COMMAND ----------

create schema if not exists ${catalog}.data_quality

-- COMMAND ----------

CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.regel (
  `regelId` STRING,
  `regelNaam` STRING,
  `regelParameters` STRING,
  `bronTabelId` STRING)
CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.brondataset (
  bronDatasetId STRING,
  medaillonLaag STRING)
USING delta
COMMENT 'Created by the file upload UI'
COMMENT 'Deployed by dq-suite-amsterdam'
TBLPROPERTIES (
  'delta.columnMapping.mode' = 'name',
  'delta.enableDeletionVectors' = 'true',
  'delta.feature.columnMapping' = 'supported',
  'delta.feature.deletionVectors' = 'supported',
  'delta.minReaderVersion' = '3',
  'delta.minWriterVersion' = '7')

-- COMMAND ----------

CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.brontabel (
  bronTabelId STRING,
  tabelNaam STRING,
  uniekeSleutel STRING)
USING delta
COMMENT 'Deployed by dq-suite-amsterdam'
TBLPROPERTIES (
  'delta.columnMapping.mode' = 'name',
  'delta.enableDeletionVectors' = 'true',
@@ -26,12 +44,29 @@ TBLPROPERTIES (
-- COMMAND ----------

CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.bronattribuut (
  name STRING,
  `bronAttribuutId` STRING,
  `bronTabelId` STRING,
  `attribuutNaam` STRING)
  bronAttribuutId STRING,
  bronTabelId STRING,
  attribuutNaam STRING)
USING delta
COMMENT 'Created by the file upload UI'
COMMENT 'Deployed by dq-suite-amsterdam'
TBLPROPERTIES (
  'delta.columnMapping.mode' = 'name',
  'delta.enableDeletionVectors' = 'true',
  'delta.feature.columnMapping' = 'supported',
  'delta.feature.deletionVectors' = 'supported',
  'delta.minReaderVersion' = '3',
  'delta.minWriterVersion' = '7')

-- COMMAND ----------

CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.regel (
  regelId STRING,
  regelNaam STRING,
  regelParameters STRING,
  bronTabelId STRING,
  attribuut STRING)
USING delta
COMMENT 'Deployed by dq-suite-amsterdam'
TBLPROPERTIES (
  'delta.columnMapping.mode' = 'name',
  'delta.enableDeletionVectors' = 'true',
@@ -49,7 +84,7 @@ CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.validatie (
  dqDatum TIMESTAMP,
  dqResultaat STRING)
USING delta
COMMENT 'Created by the file upload UI'
COMMENT 'Deployed by dq-suite-amsterdam'
TBLPROPERTIES (
  'delta.checkpoint.writeStatsAsJson' = 'false',
  'delta.checkpoint.writeStatsAsStruct' = 'true',
@@ -67,21 +102,11 @@ CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.afwijking (
  afwijkendeAttribuutWaarde STRING,
  dqDatum TIMESTAMP)
USING delta
COMMENT 'Created by the file upload UI'
COMMENT 'Deployed by dq-suite-amsterdam'
TBLPROPERTIES (
  'delta.checkpoint.writeStatsAsJson' = 'false',
  'delta.checkpoint.writeStatsAsStruct' = 'true',
  'delta.enableDeletionVectors' = 'true',
  'delta.feature.deletionVectors' = 'supported',
  'delta.minReaderVersion' = '3',
  'delta.minWriterVersion' = '7')

-- COMMAND ----------

CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.brontabel (
  bronTabelId STRING,
  uniekeSleutel STRING)
USING delta
TBLPROPERTIES (
  'delta.minReaderVersion' = '1',
  'delta.minWriterVersion' = '2')
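
A quick way to sanity-check the deployment after running this notebook — a minimal sketch, assuming an active Databricks SparkSession; `dpxx_dev` is just the widget's placeholder default from the script, not a real catalog:

```python
# Minimal sketch: list the data_quality tables after running the notebook.
# Assumes an active Databricks SparkSession; "dpxx_dev" is the placeholder
# default of the 'catalog' widget above, not a real catalog name.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
catalog = "dpxx_dev"

spark.sql(f"SHOW TABLES IN {catalog}.data_quality").show()

# The new brondataset table should now exist with the two columns deployed above
spark.table(f"{catalog}.data_quality.brondataset").printSchema()
```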
40 changes: 38 additions & 2 deletions src/dq_suite/common.py
@@ -72,19 +72,55 @@ def __getitem__(self, key) -> str | RulesList | None:
        raise KeyError(key)


@dataclass()
class DatasetDict:
    """
    Groups the name and the medallion layer of the dataset to which the
    rules apply.
    """

    name: str
    layer: str

    def __post_init__(self):
        if not isinstance(self.name, str):
            raise TypeError("'name' should be of type str")

        if not isinstance(self.layer, str):
            raise TypeError("'layer' should be of type str")

    def __getitem__(self, key) -> str | RulesList | None:
        if key == "name":
            return self.name
        elif key == "layer":
            return self.layer
        raise KeyError(key)


RulesDictList = List[RulesDict]  # a list of dictionaries containing DQ rules


@dataclass()
class DataQualityRulesDict:
    """
    Groups a list of Table-objects together with the definition of the dataset
    these tables are a part of.
    """

    dataset: DatasetDict
    tables: RulesDictList

    def __post_init__(self):
        if not isinstance(self.dataset, dict):
            raise TypeError("'dataset' should be DatasetDict")

        if not isinstance(self.tables, list):
            raise TypeError("'tables' should be RulesDictList")

    def __getitem__(self, key) -> RulesDictList | None:
        if key == "tables":
    def __getitem__(self, key) -> str | RulesDictList | None:
        if key == "dataset":
            return self.dataset
        elif key == "tables":
            return self.tables
        raise KeyError(key)

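
For reference, a small sketch of how the new dataset level can be exercised. All values are made up; note that `__post_init__` as written validates `dataset` with `isinstance(..., dict)`, so a plain dict (e.g. as parsed from the JSON rules file) is what passes the check:

```python
from dq_suite.common import DataQualityRulesDict, DatasetDict

# Hypothetical dataset values, purely for illustration
dataset = DatasetDict(name="gebieden", layer="silver")
print(dataset["name"], dataset["layer"])  # dict-style access via __getitem__

rules = DataQualityRulesDict(
    dataset={"name": "gebieden", "layer": "silver"},  # plain dict passes the check
    tables=[],  # RulesDictList; empty here for brevity
)
print(rules["dataset"]["name"])  # -> "gebieden"
```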
92 changes: 74 additions & 18 deletions src/dq_suite/output_transformations.py
@@ -11,8 +11,9 @@
    write_to_unity_catalog,
    merge_df_with_unity_table,
)
from .schemas.bronattribuut import SCHEMA as BRONATTRIBUUT_SCHEMA
from .schemas.brondataset import SCHEMA as BRONDATASET_SCHEMA
from .schemas.brontabel import SCHEMA as BRONTABEL_SCHEMA
from .schemas.bronattribuut import SCHEMA as BRONATTRIBUUT_SCHEMA
from .schemas.regel import SCHEMA as REGEL_SCHEMA
from .schemas.validatie import SCHEMA as VALIDATIE_SCHEMA
from .schemas.pre_validatie import SCHEMA as PRE_VALIDATIE_SCHEMA
@@ -199,6 +200,43 @@ def extract_dq_afwijking_data(
    pass


def create_brondataset(
    dq_rules_dict: DataQualityRulesDict,
    catalog_name: str,
    spark_session: SparkSession,
) -> None:
    """
    Function takes the dataset name and layer from the provided
    Data Quality rules to create a DataFrame containing this metadata.

    :param dq_rules_dict:
    :param catalog_name:
    :param spark_session:
    """
    name = dq_rules_dict["dataset"]["name"]
    layer = dq_rules_dict["dataset"]["layer"]
    extracted_data = [{"bronDatasetId": name, "medaillonLaag": layer}]

    df_brondataset = list_of_dicts_to_df(
        list_of_dicts=extracted_data,
        spark_session=spark_session,
        schema=BRONDATASET_SCHEMA,
    )
    merge_dict = {
        "bronDatasetId": "brondataset_df.bronDatasetId",
        "medaillonLaag": "brondataset_df.medaillonLaag",
    }
    merge_df_with_unity_table(
        df=df_brondataset,
        catalog_name=catalog_name,
        table_name="brondataset",
        table_merge_id="bronDatasetId",
        df_merge_id="bronDatasetId",
        merge_dict=merge_dict,
        spark_session=spark_session,
    )


def create_brontabel(
    dq_rules_dict: DataQualityRulesDict,
    catalog_name: str,
@@ -213,11 +251,15 @@ def create_brontabel(
    :param spark_session:
    """
    extracted_data = []
    dataset_name = dq_rules_dict["dataset"]["name"]
    for param in dq_rules_dict["tables"]:
        name = param["table_name"]
        table_name = param["table_name"]
        tabel_id = f"{dataset_name}_{table_name}"
        unique_identifier = param["unique_identifier"]
        extracted_data.append(
            {"bronTabelId": name, "uniekeSleutel": unique_identifier}
            {"bronTabelId": tabel_id,
             "tabelNaam": table_name,
             "uniekeSleutel": unique_identifier}
        )

    df_brontabel = list_of_dicts_to_df(
@@ -227,6 +269,7 @@
    )
    merge_dict = {
        "bronTabelId": "brontabel_df.bronTabelId",
        "tabelNaam": "brontabel_df.tabelNaam",
        "uniekeSleutel": "brontabel_df.uniekeSleutel",
    }
    merge_df_with_unity_table(
@@ -254,24 +297,26 @@ def create_bronattribute(
    :param spark_session:
    """
    extracted_data = []
    dataset_name = dq_rules_dict["dataset"]["name"]
    used_ids = set()  # To keep track of used IDs
    for param in dq_rules_dict["tables"]:
        bron_tabel = param["table_name"]
        table_name = param["table_name"]
        tabel_id = f"{dataset_name}_{table_name}"
        for rule in param["rules"]:
            parameters = rule.get("parameters", [])
            for parameter in parameters:
                if isinstance(parameter, dict) and "column" in parameter:
                    attribute_name = parameter["column"]
                    # Create a unique ID
                    unique_id = f"{bron_tabel}_{attribute_name}"
                    unique_id = f"{tabel_id}_{attribute_name}"
                    # Check if the ID is already used
                    if unique_id not in used_ids:
                        used_ids.add(unique_id)
                        extracted_data.append(
                            {
                                "bronAttribuutId": unique_id,
                                "attribuutNaam": attribute_name,
                                "bronTabelId": bron_tabel,
                                "bronTabelId": tabel_id,
                            }
                        )

@@ -311,18 +356,23 @@ def create_dq_regel(
    :param spark_session:
    """
    extracted_data = []
    for param in dq_rules_dict["tables"]:
        bron_tabel = param["table_name"]
        for rule in param["rules"]:
    dataset_name = dq_rules_dict["dataset"]["name"]
    for table in dq_rules_dict["tables"]:
        table_name = table["table_name"]
        tabel_id = f"{dataset_name}_{table_name}"
        for rule in table["rules"]:
            rule_name = rule["rule_name"]
            parameters = rule.get("parameters", [])
            extracted_data.append(
                {
                    "regelNaam": rule_name,
                    "regelParameters": parameters,
                    "bronTabelId": bron_tabel
                }
            )
            for param_set in parameters:
                column = param_set.get("column")
                extracted_data.append(
                    {
                        "regelNaam": rule_name,
                        "regelParameters": parameters,
                        "bronTabelId": tabel_id,
                        "attribuut": column
                    }
                )

    df_regel = list_of_dicts_to_df(
        list_of_dicts=extracted_data,
@@ -331,13 +381,14 @@
    )
    df_regel_with_id_ordered = construct_regel_id(
        df=df_regel,
        output_columns_list=['regelId','regelNaam','regelParameters','bronTabelId']
        output_columns_list=['regelId','regelNaam','regelParameters','bronTabelId','attribuut']
    )
    merge_dict = {
        "regelId": "regel_df.regelId",
        "regelNaam": "regel_df.regelNaam",
        "regelParameters": "regel_df.regelParameters",
        "bronTabelId": "regel_df.bronTabelId"
        "bronTabelId": "regel_df.bronTabelId",
        "attribuut": "regel_df.attribuut"
    }
    merge_df_with_unity_table(
        df=df_regel_with_id_ordered,
@@ -354,6 +405,11 @@ def write_non_validation_tables(
    dq_rules_dict: DataQualityRulesDict,
    validation_settings_obj: ValidationSettings,
) -> None:
    create_brondataset(
        dq_rules_dict=dq_rules_dict,
        catalog_name=validation_settings_obj.catalog_name,
        spark_session=validation_settings_obj.spark_session,
    )
    create_brontabel(
        dq_rules_dict=dq_rules_dict,
        catalog_name=validation_settings_obj.catalog_name,
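
Taken together, these changes compose table IDs as `f"{dataset_name}_{table_name}"` and make `create_dq_regel` emit one row per column parameter rather than one per rule. An illustration of that extraction loop with a made-up rules dict (not part of the package):

```python
# Illustration only: mimics the extraction loop in create_dq_regel above
# with made-up dataset, table, and rule values.
dq_rules_dict = {
    "dataset": {"name": "gebieden", "layer": "silver"},
    "tables": [
        {
            "table_name": "buurten",
            "unique_identifier": "id",
            "rules": [
                {
                    "rule_name": "ExpectColumnValuesToNotBeNull",
                    "parameters": [{"column": "naam"}, {"column": "code"}],
                }
            ],
        }
    ],
}

dataset_name = dq_rules_dict["dataset"]["name"]
for table in dq_rules_dict["tables"]:
    tabel_id = f"{dataset_name}_{table['table_name']}"  # -> "gebieden_buurten"
    for rule in table["rules"]:
        for param_set in rule.get("parameters", []):
            # One regel row per column: attribuut = "naam", then "code"
            print(tabel_id, rule["rule_name"], param_set.get("column"))
```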
5 changes: 5 additions & 0 deletions src/dq_suite/schemas/brondataset.py
@@ -0,0 +1,5 @@
from pyspark.sql.types import StructType

SCHEMA = (
StructType().add("bronDatasetId", "string").add("medaillonLaag", "string")
)
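
A quick sketch of the new schema in use — assumes an active SparkSession, and the row values are made up:

```python
from pyspark.sql import SparkSession
from dq_suite.schemas.brondataset import SCHEMA

spark = SparkSession.builder.getOrCreate()
rows = [{"bronDatasetId": "gebieden", "medaillonLaag": "silver"}]  # made-up values
spark.createDataFrame(rows, schema=SCHEMA).show()
```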
7 changes: 5 additions & 2 deletions src/dq_suite/schemas/brontabel.py
@@ -1,5 +1,8 @@
from pyspark.sql.types import StructType

SCHEMA = (
StructType().add("bronTabelId", "string").add("uniekeSleutel", "string")
)
StructType()
.add("bronTabelId", "string")
.add("tabelNaam", "string")
.add("uniekeSleutel", "string")
)
1 change: 1 addition & 0 deletions src/dq_suite/schemas/regel.py
@@ -5,4 +5,5 @@
    .add("regelNaam", "string")
    .add("regelParameters", "string")
    .add("bronTabelId", "string")
    .add("attribuut", "string")
)