diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 658bb1c400..86fa897005 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -494,7 +494,7 @@ def migrate( with self.concurrent_context(): # Only migrate snapshots for which there's an existing data object concurrent_apply_to_snapshots( - snapshots_by_name.values(), + target_snapshots, lambda s: self._migrate_snapshot( s, snapshots_by_name, diff --git a/tests/core/test_snapshot_evaluator.py b/tests/core/test_snapshot_evaluator.py index 1c3d1e6adc..5e7b078787 100644 --- a/tests/core/test_snapshot_evaluator.py +++ b/tests/core/test_snapshot_evaluator.py @@ -3955,6 +3955,63 @@ def test_migrate_snapshot(snapshot: Snapshot, mocker: MockerFixture, adapter_moc ) +def test_migrate_only_processes_target_snapshots( + mocker: MockerFixture, adapter_mock, make_snapshot +): + evaluator = SnapshotEvaluator(adapter_mock) + + target_model = SqlModel( + name="test_schema.target_model", + kind=FullKind(), + query=parse_one("SELECT 1 AS a"), + ) + extra_model = SqlModel( + name="test_schema.extra_model", + kind=FullKind(), + query=parse_one("SELECT 1 AS a"), + ) + + target_snapshot = make_snapshot(target_model) + extra_snapshot = make_snapshot(extra_model) + target_snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + extra_snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + target_snapshots = [target_snapshot] + snapshots = { + target_snapshot.snapshot_id: target_snapshot, + extra_snapshot.snapshot_id: extra_snapshot, + } + + mocker.patch.object( + evaluator, + "_get_data_objects", + return_value={target_snapshot.snapshot_id: mocker.Mock()}, + ) + migrate_mock = mocker.patch.object(evaluator, "_migrate_snapshot") + + def apply_side_effect(snapshot_iterable, fn, *_args, **_kwargs): + for snapshot in snapshot_iterable: + fn(snapshot) + return ([], []) + + apply_mock = mocker.patch( + "sqlmesh.core.snapshot.evaluator.concurrent_apply_to_snapshots", + side_effect=apply_side_effect, + ) + + evaluator.migrate(target_snapshots=target_snapshots, snapshots=snapshots) + + assert apply_mock.call_count == 1 + called_snapshots = list(apply_mock.call_args.args[0]) + assert called_snapshots == target_snapshots + + migrate_mock.assert_called_once() + called_snapshot, snapshots_by_name, *_ = migrate_mock.call_args.args + assert called_snapshot is target_snapshot + assert target_snapshot.name in snapshots_by_name + assert extra_snapshot.name in snapshots_by_name + + def test_migrate_managed(adapter_mock, make_snapshot, mocker: MockerFixture): evaluator = SnapshotEvaluator(adapter_mock)