Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hard_deletes config and new_record Option for Snapshots #317

Merged
merged 15 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241104-120653.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add new hard_deletes="new_record" mode for snapshots.
time: 2024-11-04T12:06:53.225939-05:00
custom:
Author: peterallenwebb
Issue: "317"
4 changes: 2 additions & 2 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,8 +793,8 @@ def valid_snapshot_target(
columns = self.get_columns_in_relation(relation)
names = set(c.name.lower() for c in columns)
missing = []
# Note: we're not checking dbt_updated_at here because it's not
# always present.
# Note: we're not checking dbt_updated_at or dbt_is_deleted here because they
peterallenwebb marked this conversation as resolved.
Show resolved Hide resolved
# aren't always present.
for column in ("dbt_scd_id", "dbt_valid_from", "dbt_valid_to"):
desired = column_names[column] if column_names else column
if desired not in names:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,29 @@
{% endmacro %}

{% macro get_snapshot_table_column_names() %}
{{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at'}) }}
{{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at', 'dbt_is_deleted': 'dbt_is_deleted'}) }}
peterallenwebb marked this conversation as resolved.
Show resolved Hide resolved
{% endmacro %}

{#
  Resolve how the current snapshot treats records that have disappeared from
  the source. Two config properties are consulted: the `hard_deletes` enum
  and the legacy `invalidate_hard_deletes` flag.

  Returns one of 'invalidate', 'new_record' or 'ignore'; 'ignore' is the
  result when neither property is set.

  Raises a compiler error when both properties are set on the same snapshot,
  or when `hard_deletes` holds a value other than the three listed above.
#}
{% macro get_hard_delete_behavior() %}
    {% set legacy_invalidate = config.get('invalidate_hard_deletes') %}
    {% set requested_mode = config.get('hard_deletes') %}

    {# The legacy flag and the enum are mutually exclusive: setting either one
       to any non-none value (even a falsy one) counts as "set". #}
    {% if not (legacy_invalidate is none or requested_mode is none) %}
        {% do exceptions.raise_compiler_error("You cannot set both the invalidate_hard_deletes and hard_deletes config properties on the same snapshot.") %}
    {% endif %}

    {# Branch order matters: a truthy legacy flag with no enum value must
       resolve to 'invalidate' before the "unset means ignore" branch is tried. #}
    {% if requested_mode == 'invalidate' or legacy_invalidate %}
        {{ return('invalidate') }}
    {% elif requested_mode == 'new_record' %}
        {{ return('new_record') }}
    {% elif requested_mode == 'ignore' or requested_mode is none %}
        {{ return('ignore') }}
    {% else %}
        {% do exceptions.raise_compiler_error("Invalid setting for property hard_deletes.") %}
    {% endif %}
{% endmacro %}

{% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%}
Expand Down Expand Up @@ -82,7 +104,7 @@
from snapshot_query
),

{%- if strategy.invalidate_hard_deletes %}
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}

deletes_source_data as (

Expand All @@ -96,6 +118,9 @@
select
'insert' as dbt_change_type,
source_data.*
{%- if strategy.hard_deletes == 'new_record' -%},
'False' as {{ columns.dbt_is_deleted }}
{%- endif %}

from insertions_source_data as source_data
left outer join snapshotted_data
Expand All @@ -113,6 +138,9 @@
'update' as dbt_change_type,
source_data.*,
snapshotted_data.{{ columns.dbt_scd_id }}
{%- if strategy.hard_deletes == 'new_record' -%},
snapshotted_data.{{ columns.dbt_is_deleted }}
{%- endif %}

from updates_source_data as source_data
join snapshotted_data
Expand All @@ -122,9 +150,8 @@
)
)

{%- if strategy.invalidate_hard_deletes -%}
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
,

deletes as (

select
Expand All @@ -134,7 +161,38 @@
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
{{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
snapshotted_data.{{ columns.dbt_scd_id }}
{%- if strategy.hard_deletes == 'new_record' -%},
peterallenwebb marked this conversation as resolved.
Show resolved Hide resolved
snapshotted_data.{{ columns.dbt_is_deleted }}
{%- endif %}
from snapshotted_data
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
)
{%- endif %}

{%- if strategy.hard_deletes == 'new_record' %}
{% set source_sql_cols = get_column_schema_from_query(source_sql) %}
,
deletion_records as (

select
'insert' as dbt_change_type,
{%- for col in source_sql_cols -%}
snapshotted_data.{{ adapter.quote(col.column) }},
{% endfor -%}
{%- if strategy.unique_key | is_list -%}
{%- for key in strategy.unique_key -%}
snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
{% endfor -%}
{%- else -%}
snapshotted_data.{{ strategy.unique_key }} as dbt_unique_key,
{% endif -%}
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
peterallenwebb marked this conversation as resolved.
Show resolved Hide resolved
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
peterallenwebb marked this conversation as resolved.
Show resolved Hide resolved
snapshotted_data.{{ columns.dbt_scd_id }},
'True' as {{ columns.dbt_is_deleted }}
from snapshotted_data
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
Expand All @@ -145,10 +203,15 @@
select * from insertions
union all
select * from updates
{%- if strategy.invalidate_hard_deletes %}
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
union all
select * from deletes
{%- endif %}
{%- if strategy.hard_deletes == 'new_record' %}
union all
select * from deletion_records
{%- endif %}


{%- endmacro %}

Expand All @@ -165,6 +228,9 @@
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ get_dbt_valid_to_current(strategy, columns) }}
{%- if strategy.hard_deletes == 'new_record' -%},
'False' as {{ columns.dbt_is_deleted }}
{% endif -%}
from (
{{ sql }}
) sbq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@

{{ adapter.valid_snapshot_target(target_relation, columns) }}

{# Raise an exception if the user has selected the new_record mode for
peterallenwebb marked this conversation as resolved.
Show resolved Hide resolved
hard deletes, but there is no dbt_is_deleted column in the target,
which would indicate it was created in a different mode. #}
{% if strategy.hard_deletes == 'new_record' %}
{% set target_cols = adapter.get_columns_in_relation(target_relation) %}
{% set ns = namespace(found_is_deleted_col = false) %}
{% for col in target_cols %}
{% if col.column == columns['dbt_is_deleted'] %}
{% set ns.found_is_deleted_col = true %}
{% endif %}
{% endfor %}
{% if not ns.found_is_deleted_col %}
{% do exceptions.raise_compiler_error('Did not find a dbt_is_deleted column in snapshot target. Changing the snapshot hard_deletes mode is not supported after a snapshot has been created.') %}
{% endif %}
{% endif %}

{% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %}
{% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
{# The model_config parameter is no longer used, but is passed in anyway for compatibility. #}
{% set primary_key = config.get('unique_key') %}
{% set updated_at = config.get('updated_at') %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %}
{% set hard_deletes = get_hard_delete_behavior() %}
{% set invalidate_hard_deletes = hard_deletes == 'invalidate' %}
{% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}

{#/*
Expand All @@ -78,7 +79,8 @@
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
"invalidate_hard_deletes": invalidate_hard_deletes,
"hard_deletes": hard_deletes
}) %}
{% endmacro %}

Expand Down Expand Up @@ -141,7 +143,8 @@
{# The model_config parameter is no longer used, but is passed in anyway for compatibility. #}
{% set check_cols_config = config.get('check_cols') %}
{% set primary_key = config.get('unique_key') %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %}
{% set hard_deletes = get_hard_delete_behavior() %}
{% set invalidate_hard_deletes = hard_deletes == 'invalidate' %}
{% set updated_at = config.get('updated_at') or snapshot_get_time() %}

{% set column_added = false %}
Expand Down Expand Up @@ -175,6 +178,7 @@
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
"invalidate_hard_deletes": invalidate_hard_deletes,
"hard_deletes": hard_deletes
}) %}
{% endmacro %}
Loading