Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ def migrate(
with self.concurrent_context():
# Only migrate snapshots for which there's an existing data object
concurrent_apply_to_snapshots(
snapshots_by_name.values(),
target_snapshots,
lambda s: self._migrate_snapshot(
s,
snapshots_by_name,
Expand Down
57 changes: 57 additions & 0 deletions tests/core/test_snapshot_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3955,6 +3955,63 @@ def test_migrate_snapshot(snapshot: Snapshot, mocker: MockerFixture, adapter_moc
)


def test_migrate_only_processes_target_snapshots(
mocker: MockerFixture, adapter_mock, make_snapshot
):
evaluator = SnapshotEvaluator(adapter_mock)

target_model = SqlModel(
name="test_schema.target_model",
kind=FullKind(),
query=parse_one("SELECT 1 AS a"),
)
extra_model = SqlModel(
name="test_schema.extra_model",
kind=FullKind(),
query=parse_one("SELECT 1 AS a"),
)

target_snapshot = make_snapshot(target_model)
extra_snapshot = make_snapshot(extra_model)
target_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
extra_snapshot.categorize_as(SnapshotChangeCategory.BREAKING)

target_snapshots = [target_snapshot]
snapshots = {
target_snapshot.snapshot_id: target_snapshot,
extra_snapshot.snapshot_id: extra_snapshot,
}

mocker.patch.object(
evaluator,
"_get_data_objects",
return_value={target_snapshot.snapshot_id: mocker.Mock()},
)
migrate_mock = mocker.patch.object(evaluator, "_migrate_snapshot")

def apply_side_effect(snapshot_iterable, fn, *_args, **_kwargs):
for snapshot in snapshot_iterable:
fn(snapshot)
return ([], [])

apply_mock = mocker.patch(
"sqlmesh.core.snapshot.evaluator.concurrent_apply_to_snapshots",
side_effect=apply_side_effect,
)

evaluator.migrate(target_snapshots=target_snapshots, snapshots=snapshots)

assert apply_mock.call_count == 1
called_snapshots = list(apply_mock.call_args.args[0])
assert called_snapshots == target_snapshots

migrate_mock.assert_called_once()
called_snapshot, snapshots_by_name, *_ = migrate_mock.call_args.args
assert called_snapshot is target_snapshot
assert target_snapshot.name in snapshots_by_name
assert extra_snapshot.name in snapshots_by_name


def test_migrate_managed(adapter_mock, make_snapshot, mocker: MockerFixture):
evaluator = SnapshotEvaluator(adapter_mock)

Expand Down