|
31 | 31 | OnAdditiveChange,
|
32 | 32 | on_destructive_change_validator,
|
33 | 33 | on_additive_change_validator,
|
| 34 | + TimeColumn, |
34 | 35 | )
|
35 | 36 | from sqlmesh.dbt.basemodel import BaseModelConfig, Materialization, SnapshotStrategy
|
36 | 37 | from sqlmesh.dbt.common import SqlStr, sql_str_validator
|
@@ -85,7 +86,7 @@ class ModelConfig(BaseModelConfig):
|
85 | 86 |
|
86 | 87 | # sqlmesh fields
|
87 | 88 | sql: SqlStr = SqlStr("")
|
88 |
| - time_column: t.Optional[str] = None |
| 89 | + time_column: t.Optional[TimeColumn] = None |
89 | 90 | cron: t.Optional[str] = None
|
90 | 91 | interval_unit: t.Optional[str] = None
|
91 | 92 | batch_concurrency: t.Optional[int] = None
|
@@ -152,6 +153,7 @@ class ModelConfig(BaseModelConfig):
|
152 | 153 | _sql_validator = sql_str_validator
|
153 | 154 | _on_destructive_change_validator = on_destructive_change_validator
|
154 | 155 | _on_additive_change_validator = on_additive_change_validator
|
| 156 | + _time_column_validator = TimeColumn.validator() |
155 | 157 |
|
156 | 158 | @field_validator(
|
157 | 159 | "unique_key",
|
@@ -243,17 +245,6 @@ def snapshot_strategy(self) -> t.Optional[SnapshotStrategy]:
|
243 | 245 | def table_schema(self) -> str:
|
244 | 246 | return self.target_schema or super().table_schema
|
245 | 247 |
|
246 |
| - def _get_overlapping_field_value( |
247 |
| - self, context: DbtContext, dbt_field_name: str, sqlmesh_field_name: str |
248 |
| - ) -> t.Optional[t.Any]: |
249 |
| - dbt_field = self._get_field_value(dbt_field_name) |
250 |
| - sqlmesh_field = getattr(self, sqlmesh_field_name, None) |
251 |
| - if dbt_field is not None and sqlmesh_field is not None: |
252 |
| - get_console().log_warning( |
253 |
| - f"Both '{dbt_field_name}' and '{sqlmesh_field_name}' are set for model '{self.canonical_name(context)}'. '{sqlmesh_field_name}' will be used." |
254 |
| - ) |
255 |
| - return sqlmesh_field if sqlmesh_field is not None else dbt_field |
256 |
| - |
257 | 248 | def model_kind(self, context: DbtContext) -> ModelKind:
|
258 | 249 | """
|
259 | 250 | Get the sqlmesh ModelKind
|
@@ -342,16 +333,18 @@ def model_kind(self, context: DbtContext) -> ModelKind:
|
342 | 333 | f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_RANGE_STRATEGIES)}."
|
343 | 334 | )
|
344 | 335 |
|
345 |
| - if self.time_column and strategy not in {"incremental_by_time_range", "microbatch"}: |
| 336 | + if self.time_column and strategy != "incremental_by_time_range": |
346 | 337 | get_console().log_warning(
|
347 | 338 | f"Using `time_column` on a model with incremental_strategy '{strategy}' has been deprecated. "
|
348 | 339 | f"Please use `incremental_by_time_range` instead in model '{self.canonical_name(context)}'."
|
349 | 340 | )
|
350 | 341 |
|
351 | 342 | if strategy == "microbatch":
|
352 |
| - time_column = self._get_overlapping_field_value( |
353 |
| - context, "event_time", "time_column" |
354 |
| - ) |
| 343 | + if self.time_column: |
| 344 | + raise ConfigError( |
| 345 | + f"{self.canonical_name(context)}: 'time_column' cannot be used with 'microbatch' incremental strategy. Use 'event_time' instead." |
| 346 | + ) |
| 347 | + time_column = self._get_field_value("event_time") |
355 | 348 | if not time_column:
|
356 | 349 | raise ConfigError(
|
357 | 350 | f"{self.canonical_name(context)}: 'event_time' is required for microbatch incremental strategy."
|
|
0 commit comments