Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hard_deletes config and new_record Option for Snapshots #317

Merged
merged 15 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241104-120653.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Changelog fragment for the snapshot hard_deletes="new_record" feature.
kind: Features
body: Add new hard_deletes="new_record" mode for snapshots.
time: 2024-11-04T12:06:53.225939-05:00
custom:
  # Author and Issue belong to `custom`; without the nesting they would parse
  # as top-level keys and leave `custom` null.
  Author: peterallenwebb
  Issue: "317"
57 changes: 55 additions & 2 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ class FreshnessResponse(TypedDict):
age: float # age in seconds


class SnapshotStrategy(TypedDict):
    """Typed shape of the snapshot strategy mapping handed to the adapter.

    Every field is an optional string. The adapter-side validation in
    ``assert_valid_snapshot_target_given_strategy`` reads only
    ``hard_deletes``; the remaining keys describe the rest of the mapping.
    """

    unique_key: Optional[str]
    updated_at: Optional[str]
    row_changed: Optional[str]
    scd_id: Optional[str]
    # Compared against "new_record" to decide whether the snapshot target
    # must also carry the dbt_is_deleted column.
    hard_deletes: Optional[str]


class BaseAdapter(metaclass=AdapterMeta):
"""The BaseAdapter provides an abstract base class for adapters.

Expand Down Expand Up @@ -795,8 +803,8 @@ def valid_snapshot_target(
columns = self.get_columns_in_relation(relation)
names = set(c.name.lower() for c in columns)
missing = []
# Note: we're not checking dbt_updated_at or dbt_is_deleted here because they
# aren't always present.
for column in ("dbt_scd_id", "dbt_valid_from", "dbt_valid_to"):
desired = column_names[column] if column_names else column
if desired not in names:
Expand All @@ -805,6 +813,28 @@ def valid_snapshot_target(
if missing:
raise SnapshotTargetNotSnapshotTableError(missing)

@available.parse_none
def assert_valid_snapshot_target_given_strategy(
    self, relation: BaseRelation, column_names: Dict[str, str], strategy: SnapshotStrategy
) -> None:
    """Validate that ``relation`` is a usable snapshot target for ``strategy``.

    Runs the legacy ``valid_snapshot_target`` check first, then applies the
    strategy-specific requirement: when ``hard_deletes`` is ``"new_record"``
    the target must also contain the ``dbt_is_deleted`` column (or its
    configured alias).

    :param relation: the existing snapshot target relation to inspect.
    :param column_names: mapping from default snapshot meta-column names to
        the names actually used; may be empty/None to use the defaults.
    :param strategy: the snapshot strategy mapping; only ``hard_deletes`` is
        read here.
    :raises SnapshotTargetNotSnapshotTableError: if a required column is
        missing from the relation.
    """
    # Assert everything we can with the legacy function.
    self.valid_snapshot_target(relation, column_names)

    # Now do strategy-specific checks.
    # TODO: Make these checks more comprehensive.
    if strategy.get("hard_deletes", None) != "new_record":
        return

    existing = {c.name.lower() for c in self.get_columns_in_relation(relation)}

    missing = []
    for column in ("dbt_is_deleted",):
        # Fall back to the default name when the mapping is absent or has no
        # (or an empty) entry for this column, so an incomplete custom
        # mapping does not surface as a bare KeyError.
        configured = column_names.get(column) if column_names else None
        desired = configured or column
        if desired not in existing:
            missing.append(desired)

    if missing:
        raise SnapshotTargetNotSnapshotTableError(missing)

@available.parse_none
def expand_target_column_types(
self, from_relation: BaseRelation, to_relation: BaseRelation
Expand Down Expand Up @@ -1795,6 +1825,29 @@ def _get_adapter_specific_run_info(cls, config) -> Dict[str, Any]:
"""
return {}

@available.parse_none
@classmethod
def get_hard_deletes_behavior(cls, config):
    """Resolve which hard-delete behavior a snapshot should use.

    Checks the ``hard_deletes`` config enum and the legacy
    ``invalidate_hard_deletes`` config flag. The default is to ignore
    deleted records.

    :param config: object exposing ``get(key, default)`` for snapshot config.
    :returns: one of ``"ignore"``, ``"invalidate"`` or ``"new_record"``.
    :raises DbtValidationError: if both properties are set on the same
        snapshot, or if ``hard_deletes`` holds an unrecognized value.
    """
    invalidate_hard_deletes = config.get("invalidate_hard_deletes", None)
    hard_deletes = config.get("hard_deletes", None)

    # The two settings express the same choice; any explicit value for both
    # (including invalidate_hard_deletes=False) is rejected as ambiguous.
    if invalidate_hard_deletes is not None and hard_deletes is not None:
        raise DbtValidationError(
            "You cannot set both the invalidate_hard_deletes and hard_deletes config properties on the same snapshot."
        )

    if invalidate_hard_deletes or hard_deletes == "invalidate":
        return "invalidate"
    if hard_deletes == "new_record":
        return "new_record"
    if hard_deletes is None or hard_deletes == "ignore":
        return "ignore"

    # Keep the original sentence as a prefix, then say what was rejected and
    # what is accepted so the user can fix the config.
    raise DbtValidationError(
        "Invalid setting for property hard_deletes. "
        f"Got {hard_deletes!r}; expected 'ignore', 'invalidate', or 'new_record'."
    )


COLUMNS_EQUAL_SQL = """
with diff_count as (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
{% endmacro %}

{% macro get_snapshot_table_column_names() %}
    {# Default names of the snapshot meta columns. dbt_is_deleted is included so the
       hard_deletes == 'new_record' code paths can look it up via columns.dbt_is_deleted. #}
    {{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at', 'dbt_is_deleted': 'dbt_is_deleted'}) }}
{% endmacro %}

{% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%}
Expand Down Expand Up @@ -82,7 +82,7 @@
from snapshot_query
),

{%- if strategy.invalidate_hard_deletes %}
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}

deletes_source_data as (

Expand All @@ -96,6 +96,9 @@
select
'insert' as dbt_change_type,
source_data.*
{%- if strategy.hard_deletes == 'new_record' -%}
,'False' as {{ columns.dbt_is_deleted }}
{%- endif %}

from insertions_source_data as source_data
left outer join snapshotted_data
Expand All @@ -113,6 +116,9 @@
'update' as dbt_change_type,
source_data.*,
snapshotted_data.{{ columns.dbt_scd_id }}
{%- if strategy.hard_deletes == 'new_record' -%}
, snapshotted_data.{{ columns.dbt_is_deleted }}
{%- endif %}

from updates_source_data as source_data
join snapshotted_data
Expand All @@ -122,9 +128,8 @@
)
)

{%- if strategy.invalidate_hard_deletes -%}
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
,

deletes as (

select
Expand All @@ -134,7 +139,38 @@
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
{{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
snapshotted_data.{{ columns.dbt_scd_id }}
{%- if strategy.hard_deletes == 'new_record' -%}
, snapshotted_data.{{ columns.dbt_is_deleted }}
{%- endif %}
from snapshotted_data
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
)
{%- endif %}

{%- if strategy.hard_deletes == 'new_record' %}
{% set source_sql_cols = get_column_schema_from_query(source_sql) %}
,
deletion_records as (

select
'insert' as dbt_change_type,
{%- for col in source_sql_cols -%}
snapshotted_data.{{ adapter.quote(col.column) }},
{% endfor -%}
{%- if strategy.unique_key | is_list -%}
{%- for key in strategy.unique_key -%}
snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
{% endfor -%}
{%- else -%}
snapshotted_data.dbt_unique_key as dbt_unique_key,
{% endif -%}
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
peterallenwebb marked this conversation as resolved.
Show resolved Hide resolved
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
peterallenwebb marked this conversation as resolved.
Show resolved Hide resolved
snapshotted_data.{{ columns.dbt_scd_id }},
'True' as {{ columns.dbt_is_deleted }}
from snapshotted_data
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
Expand All @@ -145,10 +181,15 @@
select * from insertions
union all
select * from updates
{%- if strategy.invalidate_hard_deletes %}
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
union all
select * from deletes
{%- endif %}
{%- if strategy.hard_deletes == 'new_record' %}
union all
select * from deletion_records
{%- endif %}


{%- endmacro %}

Expand All @@ -165,6 +206,9 @@
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ get_dbt_valid_to_current(strategy, columns) }}
{%- if strategy.hard_deletes == 'new_record' -%}
, 'False' as {{ columns.dbt_is_deleted }}
{% endif -%}
from (
{{ sql }}
) sbq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

{% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}

{{ adapter.valid_snapshot_target(target_relation, columns) }}
{{ adapter.assert_valid_snapshot_target_given_strategy(target_relation, columns, strategy) }}

{% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %}
{% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
{# The model_config parameter is no longer used, but is passed in anyway for compatibility. #}
{% set primary_key = config.get('unique_key') %}
{% set updated_at = config.get('updated_at') %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %}
{% set hard_deletes = adapter.get_hard_deletes_behavior(config) %}
{% set invalidate_hard_deletes = hard_deletes == 'invalidate' %}
{% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}

{#/*
Expand All @@ -78,7 +79,8 @@
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
"invalidate_hard_deletes": invalidate_hard_deletes,
"hard_deletes": hard_deletes
}) %}
{% endmacro %}

Expand Down Expand Up @@ -141,7 +143,8 @@
{# The model_config parameter is no longer used, but is passed in anyway for compatibility. #}
{% set check_cols_config = config.get('check_cols') %}
{% set primary_key = config.get('unique_key') %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %}
{% set hard_deletes = adapter.get_hard_deletes_behavior(config) %}
{% set invalidate_hard_deletes = hard_deletes == 'invalidate' %}
{% set updated_at = config.get('updated_at') or snapshot_get_time() %}

{% set column_added = false %}
Expand Down Expand Up @@ -175,6 +178,7 @@
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
"invalidate_hard_deletes": invalidate_hard_deletes,
"hard_deletes": hard_deletes
}) %}
{% endmacro %}
Loading