Skip to content

Commit

Permalink
Add hard_deletes config and new_record Option for Snapshots (#317)
Browse files Browse the repository at this point in the history
  • Loading branch information
peterallenwebb authored Nov 22, 2024
1 parent d56ca34 commit 159b234
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 12 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241104-120653.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add new hard_deletes="new_record" mode for snapshots.
time: 2024-11-04T12:06:53.225939-05:00
custom:
Author: peterallenwebb
Issue: "317"
225 changes: 225 additions & 0 deletions dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
import pytest

from dbt.tests.util import check_relations_equal, run_dbt

_seed_new_record_mode = """
create table {database}.{schema}.seed (
id INTEGER,
first_name VARCHAR(50),
last_name VARCHAR(50),
email VARCHAR(50),
gender VARCHAR(50),
ip_address VARCHAR(20),
updated_at TIMESTAMP WITHOUT TIME ZONE
);
create table {database}.{schema}.snapshot_expected (
id INTEGER,
first_name VARCHAR(50),
last_name VARCHAR(50),
email VARCHAR(50),
gender VARCHAR(50),
ip_address VARCHAR(20),
-- snapshotting fields
updated_at TIMESTAMP WITHOUT TIME ZONE,
dbt_valid_from TIMESTAMP WITHOUT TIME ZONE,
dbt_valid_to TIMESTAMP WITHOUT TIME ZONE,
dbt_scd_id TEXT,
dbt_updated_at TIMESTAMP WITHOUT TIME ZONE,
dbt_is_deleted TEXT
);
-- seed inserts
-- use the same email for two users to verify that duplicated check_cols values
-- are handled appropriately
insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values
(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'),
(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'),
(3, 'Rachel', 'Moreno', '[email protected]', 'Female', '31.222.249.23', '2016-04-05 02:05:30'),
(4, 'Ralph', 'Turner', '[email protected]', 'Male', '157.83.76.114', '2016-08-08 00:06:51'),
(5, 'Laura', 'Gonzales', '[email protected]', 'Female', '30.54.105.168', '2016-09-01 08:25:38'),
(6, 'Katherine', 'Lopez', '[email protected]', 'Female', '169.138.46.89', '2016-08-30 18:52:11'),
(7, 'Jeremy', 'Hamilton', '[email protected]', 'Male', '231.189.13.133', '2016-07-17 02:09:46'),
(8, 'Heather', 'Rose', '[email protected]', 'Female', '87.165.201.65', '2015-12-29 22:03:56'),
(9, 'Gregory', 'Kelly', '[email protected]', 'Male', '154.209.99.7', '2016-03-24 21:18:16'),
(10, 'Rachel', 'Lopez', '[email protected]', 'Female', '237.165.82.71', '2016-08-20 15:44:49'),
(11, 'Donna', 'Welch', '[email protected]', 'Female', '103.33.110.138', '2016-02-27 01:41:48'),
(12, 'Russell', 'Lawrence', '[email protected]', 'Male', '189.115.73.4', '2016-06-11 03:07:09'),
(13, 'Michelle', 'Montgomery', '[email protected]', 'Female', '243.220.95.82', '2016-06-18 16:27:19'),
(14, 'Walter', 'Castillo', '[email protected]', 'Male', '71.159.238.196', '2016-10-06 01:55:44'),
(15, 'Robin', 'Mills', '[email protected]', 'Female', '172.190.5.50', '2016-10-31 11:41:21'),
(16, 'Raymond', 'Holmes', '[email protected]', 'Male', '148.153.166.95', '2016-10-03 08:16:38'),
(17, 'Gary', 'Bishop', '[email protected]', 'Male', '161.108.182.13', '2016-08-29 19:35:20'),
(18, 'Anna', 'Riley', '[email protected]', 'Female', '253.31.108.22', '2015-12-11 04:34:27'),
(19, 'Sarah', 'Knight', '[email protected]', 'Female', '222.220.3.177', '2016-09-26 00:49:06'),
(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19');
-- populate snapshot table
insert into {database}.{schema}.snapshot_expected (
id,
first_name,
last_name,
email,
gender,
ip_address,
updated_at,
dbt_valid_from,
dbt_valid_to,
dbt_updated_at,
dbt_scd_id,
dbt_is_deleted
)
select
id,
first_name,
last_name,
email,
gender,
ip_address,
updated_at,
-- fields added by snapshotting
updated_at as dbt_valid_from,
null::timestamp as dbt_valid_to,
updated_at as dbt_updated_at,
md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id,
'False' as dbt_is_deleted
from {database}.{schema}.seed;
"""

_snapshot_actual_sql = """
{% snapshot snapshot_actual %}
{{
config(
unique_key='id || ' ~ "'-'" ~ ' || first_name',
)
}}
select * from {{target.database}}.{{target.schema}}.seed
{% endsnapshot %}
"""

_snapshots_yml = """
snapshots:
- name: snapshot_actual
config:
strategy: timestamp
updated_at: updated_at
hard_deletes: new_record
"""

_ref_snapshot_sql = """
select * from {{ ref('snapshot_actual') }}
"""


_invalidate_sql = """
-- update records 11 - 21. Change email and updated_at field
update {schema}.seed set
updated_at = updated_at + interval '1 hour',
email = case when id = 20 then '[email protected]' else 'new_' || email end
where id >= 10 and id <= 20;
-- invalidate records 11 - 21
update {schema}.snapshot_expected set
dbt_valid_to = updated_at + interval '1 hour'
where id >= 10 and id <= 20;
"""

_update_sql = """
-- insert v2 of the 11 - 21 records
insert into {database}.{schema}.snapshot_expected (
id,
first_name,
last_name,
email,
gender,
ip_address,
updated_at,
dbt_valid_from,
dbt_valid_to,
dbt_updated_at,
dbt_scd_id,
dbt_is_deleted
)
select
id,
first_name,
last_name,
email,
gender,
ip_address,
updated_at,
-- fields added by snapshotting
updated_at as dbt_valid_from,
null::timestamp as dbt_valid_to,
updated_at as dbt_updated_at,
md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id,
'False' as dbt_is_deleted
from {database}.{schema}.seed
where id >= 10 and id <= 20;
"""

_delete_sql = """
delete from {schema}.seed where id = 1
"""


class SnapshotNewRecordMode:
@pytest.fixture(scope="class")
def snapshots(self):
return {"snapshot.sql": _snapshot_actual_sql}

@pytest.fixture(scope="class")
def models(self):
return {
"snapshots.yml": _snapshots_yml,
"ref_snapshot.sql": _ref_snapshot_sql,
}

@pytest.fixture(scope="class")
def seed_new_record_mode(self):
return _seed_new_record_mode

@pytest.fixture(scope="class")
def invalidate_sql(self):
return _invalidate_sql

@pytest.fixture(scope="class")
def update_sql(self):
return _update_sql

@pytest.fixture(scope="class")
def delete_sql(self):
return _delete_sql

def test_snapshot_new_record_mode(
self, project, seed_new_record_mode, invalidate_sql, update_sql
):
project.run_sql(seed_new_record_mode)
results = run_dbt(["snapshot"])
assert len(results) == 1

project.run_sql(invalidate_sql)
project.run_sql(update_sql)

results = run_dbt(["snapshot"])
assert len(results) == 1

check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"])

project.run_sql(_delete_sql)

results = run_dbt(["snapshot"])
assert len(results) == 1

# TODO: Further validate results.
57 changes: 55 additions & 2 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ class FreshnessResponse(TypedDict):
age: float # age in seconds


class SnapshotStrategy(TypedDict):
unique_key: Optional[str]
updated_at: Optional[str]
row_changed: Optional[str]
scd_id: Optional[str]
hard_deletes: Optional[str]


class BaseAdapter(metaclass=AdapterMeta):
"""The BaseAdapter provides an abstract base class for adapters.
Expand Down Expand Up @@ -795,8 +803,8 @@ def valid_snapshot_target(
columns = self.get_columns_in_relation(relation)
names = set(c.name.lower() for c in columns)
missing = []
# Note: we're not checking dbt_updated_at here because it's not
# always present.
# Note: we're not checking dbt_updated_at or dbt_is_deleted here because they
# aren't always present.
for column in ("dbt_scd_id", "dbt_valid_from", "dbt_valid_to"):
desired = column_names[column] if column_names else column
if desired not in names:
Expand All @@ -805,6 +813,28 @@ def valid_snapshot_target(
if missing:
raise SnapshotTargetNotSnapshotTableError(missing)

@available.parse_none
def assert_valid_snapshot_target_given_strategy(
self, relation: BaseRelation, column_names: Dict[str, str], strategy: SnapshotStrategy
) -> None:
# Assert everything we can with the legacy function.
self.valid_snapshot_target(relation, column_names)

# Now do strategy-specific checks.
# TODO: Make these checks more comprehensive.
if strategy.get("hard_deletes", None) == "new_record":
columns = self.get_columns_in_relation(relation)
names = set(c.name.lower() for c in columns)
missing = []

for column in ("dbt_is_deleted",):
desired = column_names[column] if column_names else column
if desired not in names:
missing.append(desired)

if missing:
raise SnapshotTargetNotSnapshotTableError(missing)

@available.parse_none
def expand_target_column_types(
self, from_relation: BaseRelation, to_relation: BaseRelation
Expand Down Expand Up @@ -1795,6 +1825,29 @@ def _get_adapter_specific_run_info(cls, config) -> Dict[str, Any]:
"""
return {}

@available.parse_none
@classmethod
def get_hard_deletes_behavior(cls, config):
"""Check the hard_deletes config enum, and the legacy invalidate_hard_deletes
config flag in order to determine which behavior should be used for deleted
records in a snapshot. The default is to ignore them."""
invalidate_hard_deletes = config.get("invalidate_hard_deletes", None)
hard_deletes = config.get("hard_deletes", None)

if invalidate_hard_deletes is not None and hard_deletes is not None:
raise DbtValidationError(
"You cannot set both the invalidate_hard_deletes and hard_deletes config properties on the same snapshot."
)

if invalidate_hard_deletes or hard_deletes == "invalidate":
return "invalidate"
elif hard_deletes == "new_record":
return "new_record"
elif hard_deletes is None or hard_deletes == "ignore":
return "ignore"

raise DbtValidationError("Invalid setting for property hard_deletes.")


COLUMNS_EQUAL_SQL = """
with diff_count as (
Expand Down
Loading

0 comments on commit 159b234

Please sign in to comment.