diff --git a/CHANGELOG.md b/CHANGELOG.md
index b60ab83..71ec9b8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,7 @@
 - Fixed issue Silver Data Quality not working [PR](https://github.com/databrickslabs/dlt-meta/issues/156)
 - Fixed issue Removed DPM flag check inside dataflowpipeline [PR](https://github.com/databrickslabs/dlt-meta/issues/177)
 - Fixed issue Updated dlt-meta demos into Delta Live Tables Notebook github [PR](https://github.com/databrickslabs/dlt-meta/issues/158)
+- Fixed issue Adding multiple col support for auto_cdc api [PR](https://github.com/databrickslabs/dlt-meta/pull/224)
 
 ## [v.0.0.9]
diff --git a/docs/content/faq/execution.md b/docs/content/faq/execution.md
index ad6ca29..369724f 100644
--- a/docs/content/faq/execution.md
+++ b/docs/content/faq/execution.md
@@ -114,7 +114,7 @@ When you launch Lakeflow Declarative Pipeline it will read silver onboarding and
    "keys":[
       "customer_id"
    ],
-   "sequence_by":"dmsTimestamp",
+   "sequence_by":"dmsTimestamp,enqueueTimestamp,sequenceId",
    "scd_type":"2",
    "apply_as_deletes":"Op = 'D'",
    "except_column_list":[
diff --git a/docs/content/releases/_index.md b/docs/content/releases/_index.md
index dfc2e73..39999e4 100644
--- a/docs/content/releases/_index.md
+++ b/docs/content/releases/_index.md
@@ -43,6 +43,7 @@ draft: false
 - Fixed issue Changed dbfs.create to mkdirs for CLI: [PR](https://github.com/databrickslabs/dlt-meta/pull/53)
 - Fixed issue DLT-META CLI should use pypi lib instead of whl : [PR](https://github.com/databrickslabs/dlt-meta/pull/79)
 - Fixed issue Onboarding with multiple partition columns errors out: [PR](https://github.com/databrickslabs/dlt-meta/pull/134)
+- Fixed issue Adding multiple col support for auto_cdc api [PR](https://github.com/databrickslabs/dlt-meta/pull/224)
 
 # v0.0.8
 ## Enhancements
diff --git a/src/dataflow_pipeline.py b/src/dataflow_pipeline.py
index 06edf6f..568bb24 100644
--- a/src/dataflow_pipeline.py
+++ b/src/dataflow_pipeline.py
@@ -5,7 +5,7 @@
 import ast
 import dlt
 from pyspark.sql import DataFrame
-from pyspark.sql.functions import expr
+from pyspark.sql.functions import expr, struct
 from pyspark.sql.types import StructType, StructField
 from src.dataflow_spec import BronzeDataflowSpec, SilverDataflowSpec, DataflowSpecUtils
 from src.pipeline_writers import AppendFlowWriter, DLTSinkWriter
@@ -630,11 +630,18 @@ def cdc_apply_changes(self):
         target_table = (
             f"{target_cl_name}{target_db_name}.{target_table_name}"
         )
+
+        # Handle comma-separated sequence columns using struct
+        sequence_by = cdc_apply_changes.sequence_by
+        if ',' in sequence_by:
+            sequence_cols = [col.strip() for col in sequence_by.split(',')]
+            sequence_by = struct(*sequence_cols)  # Use struct() from pyspark.sql.functions
+
         dlt.create_auto_cdc_flow(
             target=target_table,
             source=self.view_name,
             keys=cdc_apply_changes.keys,
-            sequence_by=cdc_apply_changes.sequence_by,
+            sequence_by=sequence_by,
             where=cdc_apply_changes.where,
             ignore_null_updates=cdc_apply_changes.ignore_null_updates,
             apply_as_deletes=apply_as_deletes,
@@ -673,8 +680,17 @@ def modify_schema_for_cdc_changes(self, cdc_apply_changes):
             for field in struct_schema.fields:
                 if field.name not in cdc_apply_changes.except_column_list:
                     modified_schema.add(field)
-                if field.name == cdc_apply_changes.sequence_by:
-                    sequenced_by_data_type = field.dataType
+                # For SCD Type 2, get data type of first sequence column
+                sequence_by = cdc_apply_changes.sequence_by.strip()
+                if ',' not in sequence_by:
+                    # Single column sequence
+                    if field.name == sequence_by:
+                        sequenced_by_data_type = field.dataType
+                else:
+                    # Multiple column sequence - use first column's type
+                    first_sequence_col = sequence_by.split(',')[0].strip()
+                    if field.name == first_sequence_col:
+                        sequenced_by_data_type = field.dataType
             struct_schema = modified_schema
         else:
             raise Exception(f"Schema is None for {self.dataflowSpec} for cdc_apply_changes! ")
diff --git a/tests/test_dataflow_pipeline.py b/tests/test_dataflow_pipeline.py
index 17220d5..e4cc1e6 100644
--- a/tests/test_dataflow_pipeline.py
+++ b/tests/test_dataflow_pipeline.py
@@ -1363,6 +1363,22 @@ def test_write_bronze_cdc_apply_changes(self, mock_cdc_apply_changes):
         pipeline.write_bronze()
         assert mock_cdc_apply_changes.called
 
+    @patch.object(DataflowPipeline, 'cdc_apply_changes', return_value=None)
+    def test_write_bronze_cdc_apply_changes_multiple_sequence(self, mock_cdc_apply_changes):
+        """Test write_bronze with CDC apply changes using multiple sequence columns."""
+        bronze_dataflow_spec = BronzeDataflowSpec(**self.bronze_dataflow_spec_map)
+        bronze_dataflow_spec.cdcApplyChanges = json.dumps({
+            "keys": ["id"],
+            "sequence_by": "event_timestamp, enqueue_timestamp, sequence_id",
+            "scd_type": "1",
+            "apply_as_deletes": "operation = 'DELETE'",
+            "except_column_list": ["operation", "event_timestamp", "enqueue_timestamp", "sequence_id", "_rescued_data"]
+        })
+        view_name = f"{bronze_dataflow_spec.targetDetails['table']}_inputview"
+        pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, view_name, None)
+        pipeline.write_bronze()
+        assert mock_cdc_apply_changes.called
+
     @patch('pyspark.sql.SparkSession.readStream')
     def test_get_silver_schema_uc_enabled(self, mock_read_stream):
         """Test get_silver_schema with Unity Catalog enabled."""
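
Note (not part of the patch): a minimal standalone sketch of the sequencing behavior this change introduces. A comma-separated sequence_by string is split, trimmed, and wrapped in struct(); Spark compares struct values field by field from left to right, so the first column acts as the primary ordering key and the remaining columns break ties. The helper name build_sequence_expr is illustrative only and is not part of the dlt-meta API.

from typing import Union

from pyspark.sql.column import Column
from pyspark.sql.functions import struct


def build_sequence_expr(sequence_by: str) -> Union[str, Column]:
    """Return a single column name unchanged, or a struct() over the trimmed
    names when sequence_by is comma-separated; struct field order defines
    ordering precedence (first column first, later columns as tie-breakers)."""
    if ',' not in sequence_by:
        return sequence_by
    return struct(*[col.strip() for col in sequence_by.split(',')])


# Example, using the silver onboarding value shown in docs/content/faq/execution.md:
# build_sequence_expr("dmsTimestamp,enqueueTimestamp,sequenceId")
# -> struct("dmsTimestamp", "enqueueTimestamp", "sequenceId")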