5 changes: 4 additions & 1 deletion .github/workflows/onpush.yml
@@ -61,15 +61,18 @@ jobs:
- name: Run Unit Tests
run: python -m coverage run -m pytest tests/ -v

- name: Generate coverage XML
run: python -m coverage xml -o coverage.xml

- name: Publish test coverage
if: startsWith(matrix.os,'ubuntu')
uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: ./coverage.xml
env_vars: OS,PYTHON
fail_ci_if_error: true
flags: unittests
name: codecov-umbrella
path_to_write_report: ./coverage/codecov_report.txt
verbose: true

2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -18,6 +18,8 @@
- Fixed issue Silver Data Quality not working [PR](https://github.com/databrickslabs/dlt-meta/issues/156)
- Fixed issue Removed DPM flag check inside dataflowpipeline [PR](https://github.com/databrickslabs/dlt-meta/issues/177)
- Fixed issue Updated dlt-meta demos into Delta Live Tables Notebook github [PR](https://github.com/databrickslabs/dlt-meta/issues/158)
- Fixed issue Adding multiple col support for auto_cdc api [PR](https://github.com/databrickslabs/dlt-meta/pull/224)
- Fixed issue Added support for custom transformations for Kafka/Delta [PR](https://github.com/databrickslabs/dlt-meta/pull/228)


## [v.0.0.9]
4 changes: 3 additions & 1 deletion README.md
@@ -21,6 +21,8 @@

In practice, a single generic pipeline reads the Dataflowspec and uses it to orchestrate and run the necessary data processing workloads. This approach streamlines the development and management of data pipelines, allowing for a more efficient and scalable data processing workflow

[Lakeflow Declarative Pipelines](https://www.databricks.com/product/data-engineering/lakeflow-declarative-pipelines) and `DLT-META` are designed to complement each other. [Lakeflow Declarative Pipelines](https://www.databricks.com/product/data-engineering/lakeflow-declarative-pipelines) provide a declarative, intent-driven foundation for building and managing data workflows, while DLT-META adds a powerful configuration-driven layer that automates and scales pipeline creation. By combining these approaches, teams can move beyond manual coding to achieve enterprise-level agility, governance, and efficiency, templatizing and automating pipelines for data-driven businesses at any scale.

### Components:

#### Metadata Interface
@@ -45,7 +47,7 @@ In practice, a single generic pipeline reads the Dataflowspec and uses it to orc

![DLT-META Stages](./docs/static/images/dlt-meta_stages.png)

## DLT-META Lakeflow Declarative Pipeline Features support
## DLT-META `Lakeflow Declarative Pipelines` Features support
| Features | DLT-META Support |
| ------------- | ------------- |
| Input data sources | Autoloader, Delta, Eventhub, Kafka, snapshot |
17 changes: 12 additions & 5 deletions docs/content/_index.md
@@ -7,10 +7,14 @@ draft: false


## Project Overview
DLT-META is a metadata-driven framework designed to work with Databricks Lakeflow Declarative Pipelines . This framework enables the automation of bronze and silver data pipelines by leveraging metadata recorded in an onboarding JSON file. This file, known as the Dataflowspec, serves as the data flow specification, detailing the source and target metadata required for the pipelines.
`DLT-META` is a metadata-driven framework designed to work with [Lakeflow Declarative Pipelines](https://www.databricks.com/product/data-engineering/lakeflow-declarative-pipelines). This framework enables the automation of bronze and silver data pipelines by leveraging metadata recorded in an onboarding JSON file. This file, known as the Dataflowspec, serves as the data flow specification, detailing the source and target metadata required for the pipelines.

In practice, a single generic pipeline reads the Dataflowspec and uses it to orchestrate and run the necessary data processing workloads. This approach streamlines the development and management of data pipelines, allowing for a more efficient and scalable data processing workflow

[Lakeflow Declarative Pipelines](https://www.databricks.com/product/data-engineering/lakeflow-declarative-pipelines) and `DLT-META` are designed to complement each other. [Lakeflow Declarative Pipelines](https://www.databricks.com/product/data-engineering/lakeflow-declarative-pipelines) provide a declarative, intent-driven foundation for building and managing data workflows, while DLT-META adds a powerful configuration-driven layer that automates and scales pipeline creation. By combining these approaches, teams can move beyond manual coding to achieve enterprise-level agility, governance, and efficiency, templatizing and automating pipelines for data-driven businesses at any scale.



### DLT-META components:

#### Metadata Interface
@@ -40,7 +44,7 @@ In practice, a single generic pipeline reads the Dataflowspec and uses it to orc
- Option#1: [DLT-META CLI](https://databrickslabs.github.io/dlt-meta/getting_started/dltmeta_cli/#dataflow-dlt-pipeline)
- Option#2: [DLT-META MANUAL](https://databrickslabs.github.io/dlt-meta/getting_started/dltmeta_manual/#dataflow-dlt-pipeline)

## DLT-META DLT Features support
## DLT-META `Lakeflow Declarative Pipelines` Features support
| Features | DLT-META Support |
| ------------- | ------------- |
| Input data sources | Autoloader, Delta, Eventhub, Kafka, snapshot |
@@ -50,11 +54,14 @@ In practice, a single generic pipeline reads the Dataflowspec and uses it to orc
| Quarantine table support | Bronze layer |
| [create_auto_cdc_flow](https://docs.databricks.com/aws/en/dlt-ref/dlt-python-ref-apply-changes) API support | Bronze, Silver layer |
| [create_auto_cdc_from_snapshot_flow](https://docs.databricks.com/aws/en/dlt-ref/dlt-python-ref-apply-changes-from-snapshot) API support | Bronze layer|
| [append_flow](https://docs.databricks.com/aws/en/dlt-ref/dlt-python-ref-append-flow) API support | Bronze layer|
| Liquid cluster support | Bronze, Bronze Quarantine, Silver, Silver Quarantine tables|
| [append_flow](https://docs.databricks.com/en/delta-live-tables/flows.html#use-append-flow-to-write-to-a-streaming-table-from-multiple-source-streams) API support | Bronze layer|
| Liquid cluster support | Bronze, Bronze Quarantine, Silver tables|
| [DLT-META CLI](https://databrickslabs.github.io/dlt-meta/getting_started/dltmeta_cli/) | ```databricks labs dlt-meta onboard```, ```databricks labs dlt-meta deploy``` |
| Bronze and Silver pipeline chaining | Deploy dlt-meta pipeline with ```layer=bronze_silver``` option using default publishing mode |
| [DLT Sinks](https://docs.databricks.com/aws/en/dlt/dlt-sinks) | Supported formats: external ```delta table```, ```kafka```. Bronze, Silver layers |
| [create_sink](https://docs.databricks.com/aws/en/dlt-ref/dlt-python-ref-sink) API support | Supported formats: external ```delta table```, ```kafka```. Bronze, Silver layers |
| [Databricks Asset Bundles](https://docs.databricks.com/aws/en/dev-tools/bundles/) | Supported |
| [DLT-META UI](https://github.com/databrickslabs/dlt-meta/tree/main/lakehouse_app#dlt-meta-lakehouse-app-setup) | Uses Databricks Lakehouse DLT-META App |

## How much does it cost?
DLT-META does not have any **direct cost** associated with it other than the cost to run the Databricks Lakeflow Declarative Pipelines
on your environment. The overall cost will be determined primarily by [Databricks Lakeflow Declarative Pipelines Pricing](https://www.databricks.com/product/pricing/lakeflow-declarative-pipelines)
12 changes: 11 additions & 1 deletion docs/content/faq/execution.md
@@ -114,7 +114,7 @@ When you launch Lakeflow Declarative Pipeline it will read silver onboarding and
"keys":[
"customer_id"
],
"sequence_by":"dmsTimestamp",
"sequence_by":"dmsTimestamp,enqueueTimestamp,sequenceId",
"scd_type":"2",
"apply_as_deletes":"Op = 'D'",
"except_column_list":[
@@ -180,3 +180,13 @@ DLT-META have tag [source_metadata](https://github.com/databrickslabs/dlt-meta/b
- `autoloader_metadata_col_name`: if provided, the `_metadata` column is renamed to this value; otherwise the default is `source_metadata`
- `select_metadata_cols:{key:value}`: used to extract columns from `_metadata`, where the key is the target dataframe column name and the value is the expression used to derive the column from `_metadata` (see the sketch below)
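
As a rough illustration only, a hypothetical `source_metadata` block using just the keys described above might look like the following Python dict; the surrounding onboarding fields, column names, and `_metadata` expressions are assumptions for illustration, not taken from the repo:

```python
# Hypothetical sketch of the source_metadata tag described above.
# Only source_metadata, autoloader_metadata_col_name and select_metadata_cols
# come from the docs; the target column names and expressions are illustrative.
source_metadata_example = {
    "source_metadata": {
        # rename the Autoloader _metadata column (default: source_metadata)
        "autoloader_metadata_col_name": "source_metadata",
        # target dataframe column name -> expression evaluated on _metadata
        "select_metadata_cols": {
            "input_file_name": "_metadata.file_name",
            "input_file_path": "_metadata.file_path",
        },
    }
}
```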

**Q. After upgrading dlt-meta, why do Lakeflow Declarative Pipelines fail with the message “Materializing tables in custom schemas is not supported,” and how can this be fixed?**

This failure happens because the pipeline was created using Legacy Publishing mode, which does not support saving tables with catalog or schema qualifiers (such as catalog.schema.table). As a result, using qualified table names leads to an error:

```
com.databricks.pipelines.common.errors.DLTAnalysisException: Materializing tables in custom schemas is not supported. Please remove the database qualifier from table 'catalog_name.schema_name.table_name'
```

To resolve this, migrate the pipeline to the default (Databricks Publishing Mode) by following Databricks’ guide: [Migrate to the default publishing mode](https://docs.databricks.com/aws/en/dlt/migrate-to-dpm#migrate-to-the-default-publishing-mode).

3 changes: 3 additions & 0 deletions docs/content/releases/_index.md
@@ -22,6 +22,8 @@ draft: false
- Fixed issue Silver Data Quality not working [PR](https://github.com/databrickslabs/dlt-meta/issues/156)
- Fixed issue Removed DPM flag check inside dataflowpipeline [PR](https://github.com/databrickslabs/dlt-meta/issues/177)
- Fixed issue Updated dlt-meta demos into Delta Live Tables Notebook github [PR](https://github.com/databrickslabs/dlt-meta/issues/158)
- Fixed issue Adding multiple col support for auto_cdc api [PR](https://github.com/databrickslabs/dlt-meta/pull/224)
- Fixed issue Added support for custom transformations for Kafka/Delta [PR](https://github.com/databrickslabs/dlt-meta/pull/228)

# v0.0.9
## Enhancements
@@ -44,6 +46,7 @@ draft: false
- Fixed issue DLT-META CLI should use pypi lib instead of whl : [PR](https://github.com/databrickslabs/dlt-meta/pull/79)
- Fixed issue Onboarding with multiple partition columns errors out: [PR](https://github.com/databrickslabs/dlt-meta/pull/134)


# v0.0.8
## Enhancements
- Added dlt append_flow api support: [PR](https://github.com/databrickslabs/dlt-meta/pull/58)
28 changes: 22 additions & 6 deletions src/dataflow_pipeline.py
@@ -5,7 +5,7 @@
import ast
import dlt
from pyspark.sql import DataFrame
from pyspark.sql.functions import expr
from pyspark.sql.functions import expr, struct
from pyspark.sql.types import StructType, StructField
from src.dataflow_spec import BronzeDataflowSpec, SilverDataflowSpec, DataflowSpecUtils
from src.pipeline_writers import AppendFlowWriter, DLTSinkWriter
@@ -315,9 +315,9 @@ def read_bronze(self) -> DataFrame:
if bronze_dataflow_spec.sourceFormat == "cloudFiles":
input_df = pipeline_reader.read_dlt_cloud_files()
elif bronze_dataflow_spec.sourceFormat == "delta" or bronze_dataflow_spec.sourceFormat == "snapshot":
return pipeline_reader.read_dlt_delta()
input_df = pipeline_reader.read_dlt_delta()
elif bronze_dataflow_spec.sourceFormat == "eventhub" or bronze_dataflow_spec.sourceFormat == "kafka":
return pipeline_reader.read_kafka()
input_df = pipeline_reader.read_kafka()
else:
raise Exception(f"{bronze_dataflow_spec.sourceFormat} source format not supported")
return self.apply_custom_transform_fun(input_df)
@@ -630,11 +630,18 @@ def cdc_apply_changes(self):
target_table = (
f"{target_cl_name}{target_db_name}.{target_table_name}"
)

# Handle comma-separated sequence columns using struct
sequence_by = cdc_apply_changes.sequence_by
if ',' in sequence_by:
sequence_cols = [col.strip() for col in sequence_by.split(',')]
sequence_by = struct(*sequence_cols) # Use struct() from pyspark.sql.functions

**Review comment:** How do we guarantee consistent ordering of the sequencing columns? "col1, col2" vs. "col2, col1" are different structs and will have different sorting and uniqueness logic.

**Contributor (author):** Yes, the code preserves the order exactly as provided:

- `split(',')` returns tokens in the original order
- `struct(*sequence_cols)` builds the struct with fields in that same order

So "col1, col2" and "col2, col1" produce different structs, which is expected and will change the sequencing semantics.

**Reviewer:** Sounds good.
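
For reference, a minimal standalone sketch of the behaviour discussed above, assuming PySpark is available; `resolve_sequence_by` is a hypothetical helper named for illustration, not the actual dlt-meta code:

```python
from pyspark.sql.functions import struct

def resolve_sequence_by(sequence_by: str):
    """Return a struct column for comma-separated sequence columns, else the raw column name."""
    if "," in sequence_by:
        # split(",") yields tokens in the order written in the spec, and struct()
        # keeps that field order, so "col1,col2" and "col2,col1" build different
        # structs and therefore different sequencing semantics.
        return struct(*[col.strip() for col in sequence_by.split(",")])
    return sequence_by
```

Because Spark compares struct values field by field in declaration order, the first column in the spec dominates the ordering and later columns only break ties.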


dlt.create_auto_cdc_flow(
target=target_table,
source=self.view_name,
keys=cdc_apply_changes.keys,
sequence_by=cdc_apply_changes.sequence_by,
sequence_by=sequence_by,
where=cdc_apply_changes.where,
ignore_null_updates=cdc_apply_changes.ignore_null_updates,
apply_as_deletes=apply_as_deletes,
@@ -673,8 +680,17 @@ def modify_schema_for_cdc_changes(self, cdc_apply_changes):
for field in struct_schema.fields:
if field.name not in cdc_apply_changes.except_column_list:
modified_schema.add(field)
if field.name == cdc_apply_changes.sequence_by:
sequenced_by_data_type = field.dataType
# For SCD Type 2, get data type of first sequence column
sequence_by = cdc_apply_changes.sequence_by.strip()
if ',' not in sequence_by:
# Single column sequence
if field.name == sequence_by:
sequenced_by_data_type = field.dataType
else:
# Multiple column sequence - use first column's type
first_sequence_col = sequence_by.split(',')[0].strip()
if field.name == first_sequence_col:
sequenced_by_data_type = field.dataType
struct_schema = modified_schema
else:
raise Exception(f"Schema is None for {self.dataflowSpec} for cdc_apply_changes! ")
16 changes: 16 additions & 0 deletions tests/test_dataflow_pipeline.py
@@ -1363,6 +1363,22 @@ def test_write_bronze_cdc_apply_changes(self, mock_cdc_apply_changes):
pipeline.write_bronze()
assert mock_cdc_apply_changes.called

@patch.object(DataflowPipeline, 'cdc_apply_changes', return_value=None)
def test_write_bronze_cdc_apply_changes_multiple_sequence(self, mock_cdc_apply_changes):
"""Test write_bronze with CDC apply changes using multiple sequence columns."""
bronze_dataflow_spec = BronzeDataflowSpec(**self.bronze_dataflow_spec_map)
bronze_dataflow_spec.cdcApplyChanges = json.dumps({
"keys": ["id"],
"sequence_by": "event_timestamp, enqueue_timestamp, sequence_id",
"scd_type": "1",
"apply_as_deletes": "operation = 'DELETE'",
"except_column_list": ["operation", "event_timestamp", "enqueue_timestamp", "sequence_id", "_rescued_data"]
})
view_name = f"{bronze_dataflow_spec.targetDetails['table']}_inputview"
pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, view_name, None)
pipeline.write_bronze()
assert mock_cdc_apply_changes.called

@patch('pyspark.sql.SparkSession.readStream')
def test_get_silver_schema_uc_enabled(self, mock_read_stream):
"""Test get_silver_schema with Unity Catalog enabled."""