diff --git a/.changes/unreleased/Fixes-20241217-163126.yaml b/.changes/unreleased/Fixes-20241217-163126.yaml new file mode 100644 index 000000000..8149567a5 --- /dev/null +++ b/.changes/unreleased/Fixes-20241217-163126.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Fix the snapshot merge query for several adapters in new_record mode. +time: 2024-12-17T16:31:26.970526-05:00 +custom: + Author: peterallenwebb + Issue: "385" diff --git a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py index c50f0ff94..310057635 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py +++ b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py @@ -2,7 +2,10 @@ from dbt.tests.util import check_relations_equal, run_dbt +# Snapshot source data for the tests in this file _seed_new_record_mode = """ +BEGIN + create table {database}.{schema}.seed ( id INTEGER, first_name VARCHAR(50), @@ -88,6 +91,8 @@ md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id, 'False' as dbt_is_deleted from {database}.{schema}.seed; + +END; """ _snapshot_actual_sql = """ @@ -119,6 +124,8 @@ _invalidate_sql = """ +BEGIN + -- update records 11 - 21. Change email and updated_at field update {schema}.seed set updated_at = updated_at + interval '1 hour', @@ -131,6 +138,7 @@ dbt_valid_to = updated_at + interval '1 hour' where id >= 10 and id <= 20; +END; """ _update_sql = """ @@ -169,8 +177,14 @@ where id >= 10 and id <= 20; """ +# SQL to delete a record from the snapshot source data _delete_sql = """ -delete from {schema}.seed where id = 1 +delete from {database}.{schema}.seed where id = 1 +""" + +# If the deletion worked correctly, this should return two rows, with one of them representing the deletion. +_delete_check_sql = """ +select dbt_valid_to, dbt_scd_id, dbt_is_deleted from {schema}.snapshot_actual where id = 1 """ @@ -222,4 +236,31 @@ def test_snapshot_new_record_mode( results = run_dbt(["snapshot"]) assert len(results) == 1 - # TODO: Further validate results. + check_result = project.run_sql(_delete_check_sql, fetch="all") + valid_to = 0 + scd_id = 1 + is_deleted = 2 + assert len(check_result) == 2 + assert ( + sum( + [ + 1 + for c in check_result + if c[valid_to] is None and c[scd_id] is not None and c[is_deleted] == "True" + ] + ) + == 1 + ) + assert ( + sum( + [ + 1 + for c in check_result + if c[valid_to] is not None + and c[scd_id] is not None + and c[is_deleted] == "False" + ] + ) + == 1 + ) + assert check_result[0][scd_id] != check_result[1][scd_id] diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 33492cc95..e91d66113 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -40,7 +40,9 @@ {% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%} {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} - + {% if strategy.hard_deletes == 'new_record' %} + {% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %} + {% endif %} with snapshot_query as ( {{ source_sql }} @@ -169,12 +171,13 @@ {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }}, - snapshotted_data.{{ columns.dbt_scd_id }}, + {{ new_scd_id }} as {{ columns.dbt_scd_id }}, 'True' as {{ columns.dbt_is_deleted }} from snapshotted_data left join deletes_source_data as source_data on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} - where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + ) {%- endif %}