
Commit e2a5aa1

Merge pull request #224 from databrickslabs/issue_223
Adding multiple col support for auto_cdc api
2 parents 4141ee7 + cca97e2 commit e2a5aa1

File tree: 5 files changed (+39, −5 lines)

- CHANGELOG.md
- docs/content/faq/execution.md
- docs/content/releases/_index.md
- src/dataflow_pipeline.py
- tests/test_dataflow_pipeline.py


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@
 - Fixed issue Silver Data Quality not working [PR](https://github.com/databrickslabs/dlt-meta/issues/156)
 - Fixed issue Removed DPM flag check inside dataflowpipeline [PR](https://github.com/databrickslabs/dlt-meta/issues/177)
 - Fixed issue Updated dlt-meta demos into Delta Live Tables Notebook github [PR](https://github.com/databrickslabs/dlt-meta/issues/158)
+- Fixed issue Adding multiple col support for auto_cdc api [PR](https://github.com/databrickslabs/dlt-meta/pull/224)
 
 
 ## [v.0.0.9]

docs/content/faq/execution.md

Lines changed: 1 addition & 1 deletion
@@ -114,7 +114,7 @@ When you launch Lakeflow Declarative Pipeline it will read silver onboarding and
 "keys":[
 "customer_id"
 ],
-"sequence_by":"dmsTimestamp",
+"sequence_by":"dmsTimestamp,enqueueTimestamp,sequenceId",
 "scd_type":"2",
 "apply_as_deletes":"Op = 'D'",
 "except_column_list":[

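With this change, `sequence_by` in the onboarding file can carry a comma-separated list of columns: the first column is the primary ordering key and the later ones act as tie-breakers, because the pipeline splits the value and wraps the columns in a single struct (see the src/dataflow_pipeline.py change below). A minimal sketch of how the value above is parsed; the variable names are illustrative:

```python
# "sequence_by" value from the silver onboarding snippet above
sequence_by = "dmsTimestamp,enqueueTimestamp,sequenceId"

# The pipeline splits on commas and strips whitespace, so spaces after
# the commas are also accepted.
sequence_cols = [col.strip() for col in sequence_by.split(",")]
assert sequence_cols == ["dmsTimestamp", "enqueueTimestamp", "sequenceId"]
```
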
docs/content/releases/_index.md

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@ draft: false
 - Fixed issue Changed dbfs.create to mkdirs for CLI: [PR](https://github.com/databrickslabs/dlt-meta/pull/53)
 - Fixed issue DLT-META CLI should use pypi lib instead of whl : [PR](https://github.com/databrickslabs/dlt-meta/pull/79)
 - Fixed issue Onboarding with multiple partition columns errors out: [PR](https://github.com/databrickslabs/dlt-meta/pull/134)
+- Fixed issue Adding multiple col support for auto_cdc api [PR](https://github.com/databrickslabs/dlt-meta/pull/224)
 
 # v0.0.8
 ## Enhancements

src/dataflow_pipeline.py

Lines changed: 20 additions & 4 deletions
@@ -5,7 +5,7 @@
 import ast
 import dlt
 from pyspark.sql import DataFrame
-from pyspark.sql.functions import expr
+from pyspark.sql.functions import expr, struct
 from pyspark.sql.types import StructType, StructField
 from src.dataflow_spec import BronzeDataflowSpec, SilverDataflowSpec, DataflowSpecUtils
 from src.pipeline_writers import AppendFlowWriter, DLTSinkWriter
@@ -630,11 +630,18 @@ def cdc_apply_changes(self):
         target_table = (
             f"{target_cl_name}{target_db_name}.{target_table_name}"
         )
+
+        # Handle comma-separated sequence columns using struct
+        sequence_by = cdc_apply_changes.sequence_by
+        if ',' in sequence_by:
+            sequence_cols = [col.strip() for col in sequence_by.split(',')]
+            sequence_by = struct(*sequence_cols)  # Use struct() from pyspark.sql.functions
+
         dlt.create_auto_cdc_flow(
             target=target_table,
             source=self.view_name,
             keys=cdc_apply_changes.keys,
-            sequence_by=cdc_apply_changes.sequence_by,
+            sequence_by=sequence_by,
             where=cdc_apply_changes.where,
             ignore_null_updates=cdc_apply_changes.ignore_null_updates,
             apply_as_deletes=apply_as_deletes,
@@ -673,8 +680,17 @@ def modify_schema_for_cdc_changes(self, cdc_apply_changes):
             for field in struct_schema.fields:
                 if field.name not in cdc_apply_changes.except_column_list:
                     modified_schema.add(field)
-                if field.name == cdc_apply_changes.sequence_by:
-                    sequenced_by_data_type = field.dataType
+                # For SCD Type 2, get data type of first sequence column
+                sequence_by = cdc_apply_changes.sequence_by.strip()
+                if ',' not in sequence_by:
+                    # Single column sequence
+                    if field.name == sequence_by:
+                        sequenced_by_data_type = field.dataType
+                else:
+                    # Multiple column sequence - use first column's type
+                    first_sequence_col = sequence_by.split(',')[0].strip()
+                    if field.name == first_sequence_col:
+                        sequenced_by_data_type = field.dataType
             struct_schema = modified_schema
         else:
             raise Exception(f"Schema is None for {self.dataflowSpec} for cdc_apply_changes! ")
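The struct() wrapper works because Spark compares struct values field by field, in declaration order, so ordering by a struct of several columns uses the first column as the primary key and the rest as tie-breakers. A small self-contained illustration of that ordering, with made-up column names and a local SparkSession (not code from this PR):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import struct

spark = SparkSession.builder.master("local[1]").getOrCreate()

rows = [
    ("k1", 10, 1, 100),  # same event_ts as the next row ...
    ("k1", 10, 2, 101),  # ... so enqueue_ts decides which one is newer
    ("k1", 9, 7, 99),
]
df = spark.createDataFrame(rows, ["id", "event_ts", "enqueue_ts", "seq_id"])

# Struct comparison is field by field, so this sorts by event_ts,
# then enqueue_ts, then seq_id -- the ordering the auto CDC flow gets
# when sequence_by is a struct over several columns.
df.orderBy(struct("event_ts", "enqueue_ts", "seq_id").desc()).show()
# Rows come back as (10, 2, 101), (10, 1, 100), (9, 7, 99): the tie on
# event_ts is broken by enqueue_ts.
```
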

tests/test_dataflow_pipeline.py

Lines changed: 16 additions & 0 deletions
@@ -1363,6 +1363,22 @@ def test_write_bronze_cdc_apply_changes(self, mock_cdc_apply_changes):
         pipeline.write_bronze()
         assert mock_cdc_apply_changes.called
 
+    @patch.object(DataflowPipeline, 'cdc_apply_changes', return_value=None)
+    def test_write_bronze_cdc_apply_changes_multiple_sequence(self, mock_cdc_apply_changes):
+        """Test write_bronze with CDC apply changes using multiple sequence columns."""
+        bronze_dataflow_spec = BronzeDataflowSpec(**self.bronze_dataflow_spec_map)
+        bronze_dataflow_spec.cdcApplyChanges = json.dumps({
+            "keys": ["id"],
+            "sequence_by": "event_timestamp, enqueue_timestamp, sequence_id",
+            "scd_type": "1",
+            "apply_as_deletes": "operation = 'DELETE'",
+            "except_column_list": ["operation", "event_timestamp", "enqueue_timestamp", "sequence_id", "_rescued_data"]
+        })
+        view_name = f"{bronze_dataflow_spec.targetDetails['table']}_inputview"
+        pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, view_name, None)
+        pipeline.write_bronze()
+        assert mock_cdc_apply_changes.called
+
     @patch('pyspark.sql.SparkSession.readStream')
     def test_get_silver_schema_uc_enabled(self, mock_read_stream):
         """Test get_silver_schema with Unity Catalog enabled."""

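The new test drives write_bronze with cdc_apply_changes mocked out, so it verifies the spec wiring rather than the struct construction itself. A narrower standalone check of the comma handling could look like the sketch below; it re-implements the split-and-wrap step locally (resolve_sequence_by is illustrative, not a function in this repository):

```python
import pytest
from pyspark.sql import Column, SparkSession
from pyspark.sql.functions import struct


def resolve_sequence_by(sequence_by: str):
    """Mirror the split-and-wrap behavior added in cdc_apply_changes."""
    if "," in sequence_by:
        return struct(*[col.strip() for col in sequence_by.split(",")])
    return sequence_by


@pytest.fixture(scope="module")
def spark():
    # struct() builds a Column through the JVM, so a session must be active
    session = SparkSession.builder.master("local[1]").getOrCreate()
    yield session
    session.stop()


def test_multi_column_sequence_by_becomes_struct(spark):
    resolved = resolve_sequence_by("event_timestamp, enqueue_timestamp, sequence_id")
    assert isinstance(resolved, Column)


def test_single_column_sequence_by_stays_a_string():
    assert resolve_sequence_by("event_timestamp") == "event_timestamp"
```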