Skip to content

Commit c40d791

Browse files
authored
Chore: Split migration script implementations into DDL and DML (#5307)
1 parent 61a0b11 commit c40d791

File tree

99 files changed

+555
-125
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

99 files changed

+555
-125
lines changed

sqlmesh/core/context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,7 @@ def state_sync(self) -> StateSync:
587587
self._state_sync = self._new_state_sync()
588588

589589
if self._state_sync.get_versions(validate=False).schema_version == 0:
590+
self.console.log_status_update("Initializing new project state...")
590591
self._state_sync.migrate(default_catalog=self.default_catalog)
591592
self._state_sync.get_versions()
592593
self._state_sync = CachingStateSync(self._state_sync) # type: ignore

sqlmesh/core/state_sync/db/environment.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,11 +285,13 @@ def get_environment_statements(self, environment: str) -> t.List[EnvironmentStat
285285
return []
286286

287287
def _environment_from_row(self, row: t.Tuple[str, ...]) -> Environment:
288-
return Environment(**{field: row[i] for i, field in enumerate(Environment.all_fields())})
288+
return Environment(
289+
**{field: row[i] for i, field in enumerate(sorted(Environment.all_fields()))}
290+
)
289291

290292
def _environment_summmary_from_row(self, row: t.Tuple[str, ...]) -> EnvironmentSummary:
291293
return EnvironmentSummary(
292-
**{field: row[i] for i, field in enumerate(EnvironmentSummary.all_fields())}
294+
**{field: row[i] for i, field in enumerate(sorted(EnvironmentSummary.all_fields()))}
293295
)
294296

295297
def _environments_query(
@@ -298,7 +300,7 @@ def _environments_query(
298300
lock_for_update: bool = False,
299301
required_fields: t.Optional[t.List[str]] = None,
300302
) -> exp.Select:
301-
query_fields = required_fields if required_fields else Environment.all_fields()
303+
query_fields = required_fields if required_fields else sorted(Environment.all_fields())
302304
query = (
303305
exp.select(*(exp.to_identifier(field) for field in query_fields))
304306
.from_(self.environments_table)
@@ -328,7 +330,7 @@ def _fetch_environment_summaries(
328330
self.engine_adapter,
329331
self._environments_query(
330332
where=where,
331-
required_fields=list(EnvironmentSummary.all_fields()),
333+
required_fields=sorted(EnvironmentSummary.all_fields()),
332334
),
333335
)
334336
]

sqlmesh/core/state_sync/db/migrator.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,14 @@ def _apply_migrations(
173173

174174
snapshot_count_before = self.snapshot_state.count() if versions.schema_version else None
175175

176+
state_table_exist = any(self.engine_adapter.table_exists(t) for t in self._state_tables)
177+
176178
for migration in migrations:
177179
logger.info(f"Applying migration {migration}")
178-
migration.migrate(state_sync, default_catalog=default_catalog)
180+
migration.migrate_schemas(state_sync, default_catalog=default_catalog)
181+
if state_table_exist:
182+
# No need to run DML for the initial migration since all tables are empty
183+
migration.migrate_rows(state_sync, default_catalog=default_catalog)
179184

180185
snapshot_count_after = self.snapshot_state.count()
181186

sqlmesh/migrations/v0001_init.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from sqlmesh.utils.migration import index_text_type
1010

1111

12-
def migrate(state_sync, **kwargs): # type: ignore
12+
def migrate_schemas(state_sync, **kwargs): # type: ignore
1313
engine_adapter = state_sync.engine_adapter
1414
schema = state_sync.schema
1515
snapshots_table = "_snapshots"
@@ -58,3 +58,7 @@ def migrate(state_sync, **kwargs): # type: ignore
5858
"sqlglot_version": exp.DataType.build("text"),
5959
},
6060
)
61+
62+
63+
def migrate_rows(state_sync, **kwargs): # type: ignore
64+
pass
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
"""Remove identify=True kwarg for rendering sql"""
22

33

4-
def migrate(state_sync, **kwargs): # type: ignore
4+
def migrate_schemas(state_sync, **kwargs): # type: ignore
5+
pass
6+
7+
8+
def migrate_rows(state_sync, **kwargs): # type: ignore
59
pass

sqlmesh/migrations/v0003_move_batch_size.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
from sqlglot import exp
66

77

8-
def migrate(state_sync, **kwargs): # type: ignore
8+
def migrate_schemas(state_sync, **kwargs): # type: ignore
9+
pass
10+
11+
12+
def migrate_rows(state_sync, **kwargs): # type: ignore
913
snapshots_table = "_snapshots"
1014
if state_sync.schema:
1115
snapshots_table = f"{state_sync.schema}.{snapshots_table}"

sqlmesh/migrations/v0004_environmnent_add_finalized_at.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from sqlglot import exp
44

55

6-
def migrate(state_sync, **kwargs): # type: ignore
6+
def migrate_schemas(state_sync, **kwargs): # type: ignore
77
engine_adapter = state_sync.engine_adapter
88
environments_table = "_environments"
99
if state_sync.schema:
@@ -21,3 +21,7 @@ def migrate(state_sync, **kwargs): # type: ignore
2121
)
2222

2323
engine_adapter.execute(alter_table_exp)
24+
25+
26+
def migrate_rows(state_sync, **kwargs): # type: ignore
27+
pass

sqlmesh/migrations/v0005_create_seed_table.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from sqlmesh.utils.migration import index_text_type
66

77

8-
def migrate(state_sync, **kwargs): # type: ignore
8+
def migrate_schemas(state_sync, **kwargs): # type: ignore
99
engine_adapter = state_sync.engine_adapter
1010
seeds_table = "_seeds"
1111
if state_sync.schema:
@@ -22,3 +22,7 @@ def migrate(state_sync, **kwargs): # type: ignore
2222
},
2323
primary_key=("name", "identifier"),
2424
)
25+
26+
27+
def migrate_rows(state_sync, **kwargs): # type: ignore
28+
pass
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
"""Seed hashes moved from to_string to to_json for performance."""
22

33

4-
def migrate(state_sync, **kwargs): # type: ignore
4+
def migrate_schemas(state_sync, **kwargs): # type: ignore
5+
pass
6+
7+
8+
def migrate_rows(state_sync, **kwargs): # type: ignore
59
pass

sqlmesh/migrations/v0007_env_table_info_to_kind.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ def _hash(data): # type: ignore
1212
return str(zlib.crc32(";".join("" if d is None else d for d in data).encode("utf-8")))
1313

1414

15-
def migrate(state_sync, **kwargs): # type: ignore
15+
def migrate_schemas(state_sync, **kwargs): # type: ignore
16+
pass
17+
18+
19+
def migrate_rows(state_sync, **kwargs): # type: ignore
1620
import pandas as pd
1721

1822
engine_adapter = state_sync.engine_adapter

0 commit comments

Comments
 (0)