Feat!: Add multiple engine project support (#3394)
themisvaltinos authored Dec 3, 2024
1 parent 04feaff commit d7e21f0
Showing 26 changed files with 848 additions and 105 deletions.
3 changes: 3 additions & 0 deletions docs/concepts/models/overview.md
@@ -351,6 +351,9 @@ Learn more about these properties and their default values in the [model configu

NOTE: This can only be set for forward-only models.

### gateway
: Specifies the gateway to use for the execution of this model. When not specified, the default gateway is used.
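
For illustration, a minimal sketch of a model that pins a specific gateway (the model name and the `duckdb` gateway name are hypothetical and assume a matching gateway entry in the project configuration):

```sql
MODEL (
  name analytics.daily_totals, -- hypothetical model name
  kind FULL,
  gateway duckdb -- run this model on the gateway named "duckdb"
);
```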

## Incremental Model Properties

These properties can be specified in an incremental model's `kind` definition.
111 changes: 111 additions & 0 deletions docs/guides/multi_engine.md
@@ -0,0 +1,111 @@
# Multi-Engine guide

Organizations typically connect to a data warehouse through a single engine to ensure data consistency. However, there are cases where the processing capabilities of one engine may be better suited to specific tasks than another.

By decoupling storage from compute and with growing support for open table formats like Apache Iceberg and Hive, different engines can now interact with the same data.

With SQLMesh's new multi-engine feature, users can leverage multiple engine adapters within a single project, offering the flexibility to choose the best engine for each task.

This feature allows you to run each model on a specified engine, provided the data catalog is shared and the engines support read/write operations on it.


## Configuring a project with multiple engines

To configure a SQLMesh project with multiple engines, simply include all required gateway [connections](../reference/configuration.md#connection) in your configuration.

Next, specify the appropriate `gateway` in the `MODEL` DDL for each model. If no gateway is explicitly defined, the default gateway will be used.

The [virtual layer](../concepts/glossary.md#virtual-layer) will be created within the engine corresponding to the default gateway.

### Example

Below is a simple example of setting up a project with connections to both DuckDB and PostgreSQL.

In this setup, the PostgreSQL engine is set as the default, so it will be used to manage views in the virtual layer.

Meanwhile, DuckDB's [attach](https://duckdb.org/docs/sql/statements/attach.html) feature enables read-write access to the PostgreSQL catalog's physical tables.

=== "YAML"

```yaml linenums="1"
gateways:
  duckdb:
    connection:
      type: duckdb
      catalogs:
        main_db:
          type: postgres
          path: 'dbname=main_db user=postgres host=127.0.0.1'
      extensions:
        - name: iceberg
  postgres:
    connection:
      type: postgres
      database: main_db
      user: user
      password: password
      host: 127.0.0.1
      port: 5432
default_gateway: postgres
```

=== "Python"

```python linenums="1"
from sqlmesh.core.config import (
    Config,
    ModelDefaultsConfig,
    GatewayConfig,
    DuckDBConnectionConfig,
    PostgresConnectionConfig
)
from sqlmesh.core.config.connection import DuckDBAttachOptions

config = Config(
    model_defaults=ModelDefaultsConfig(dialect="postgres"),
    gateways={
        "duckdb": GatewayConfig(
            connection=DuckDBConnectionConfig(
                catalogs={
                    "main_db": DuckDBAttachOptions(
                        type="postgres",
                        path="dbname=main_db user=postgres host=127.0.0.1"
                    ),
                },
                extensions=["iceberg"],
            )
        ),
        "postgres": GatewayConfig(
            connection=PostgresConnectionConfig(
                host="127.0.0.1",
                port=5432,
                user="postgres",
                password="password",
                database="main_db",
            )
        ),
    },
    default_gateway="postgres",
)
```

Given this configuration, when a model's `gateway` is set to `duckdb`, it will be materialized in the PostgreSQL `main_db` catalog but evaluated using DuckDB's engine.


```sql linenums="1"
MODEL (
  name orders.order_ship_date,
  kind FULL,
  gateway duckdb,
);

SELECT
  l_orderkey,
  l_shipdate
FROM
  iceberg_scan('data/bucket/lineitem_iceberg', allow_moved_paths = true);
```

In this model, the DuckDB engine is used to scan and load data from an Iceberg table and to create the physical table in the PostgreSQL database, while the PostgreSQL engine is responsible for creating the model's view in the virtual layer.
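
For contrast, here is a hedged sketch of a downstream model that omits `gateway` and therefore runs on the default `postgres` gateway; the model name is hypothetical and it assumes the `orders.order_ship_date` model above as its upstream:

```sql linenums="1"
MODEL (
  name orders.orders_per_ship_date, -- hypothetical model name; no gateway specified
  kind FULL
);

-- Evaluated by the default gateway's engine (PostgreSQL in this configuration)
SELECT
  l_shipdate,
  COUNT(*) AS order_count
FROM orders.order_ship_date
GROUP BY l_shipdate;
```
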
1 change: 1 addition & 0 deletions docs/reference/model_configuration.md
@@ -37,6 +37,7 @@ Configuration options for SQLMesh model properties. Supported by all model kinds
| `session_properties` | A key-value mapping of arbitrary properties specific to the target engine that are applied to the engine session. Specified as key-value pairs (`key = value`). | dict | N |
| `allow_partials` | Whether this model can process partial (incomplete) data intervals | bool | N |
| `enabled` | Whether the model is enabled. This attribute is `true` by default. Setting it to `false` causes SQLMesh to ignore this model when loading the project. | bool | N |
| `gateway` | Specifies the gateway to use for the execution of this model. When not specified, the default gateway is used. | str | N |

### Model defaults

1 change: 1 addition & 0 deletions mkdocs.yml
@@ -15,6 +15,7 @@ nav:
- guides/projects.md
- guides/multi_repo.md
- guides/isolated_systems.md
- guides/multi_engine.md
- Project setup:
- guides/configuration.md
- guides/connections.md
94 changes: 72 additions & 22 deletions sqlmesh/core/context.py
@@ -56,7 +56,11 @@
from sqlmesh.core import constants as c
from sqlmesh.core.analytics import python_api_analytics
from sqlmesh.core.audit import Audit, ModelAudit, StandaloneAudit
from sqlmesh.core.config import CategorizerConfig, Config, load_configs
from sqlmesh.core.config import (
CategorizerConfig,
Config,
load_configs,
)
from sqlmesh.core.config.loader import C
from sqlmesh.core.console import Console, get_console
from sqlmesh.core.context_diff import ContextDiff
@@ -289,7 +293,6 @@ class GenericContext(BaseContext, t.Generic[C]):
"""Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.
Args:
engine_adapter: The default engine adapter to use.
notification_targets: The notification target to use. Defaults to what is defined in config.
paths: The directories containing SQLMesh files.
config: A Config object or the name of a Config object in config.py.
@@ -311,7 +314,6 @@ class GenericContext(BaseContext, t.Generic[C]):

def __init__(
self,
engine_adapter: t.Optional[EngineAdapter] = None,
notification_targets: t.Optional[t.List[NotificationTarget]] = None,
state_sync: t.Optional[StateSync] = None,
paths: t.Union[str | Path, t.Iterable[str | Path]] = "",
@@ -367,19 +369,22 @@ def __init__(
self.environment_ttl = self.config.environment_ttl
self.pinned_environments = Environment.sanitize_names(self.config.pinned_environments)
self.auto_categorize_changes = self.config.plan.auto_categorize_changes
self.selected_gateway = gateway or self.config.default_gateway_name

self._connection_config = self.config.get_connection(self.gateway)
self.concurrent_tasks = concurrent_tasks or self._connection_config.concurrent_tasks
self._engine_adapter = engine_adapter or self._connection_config.create_engine_adapter()

self.console = console or get_console(dialect=self._engine_adapter.dialect)
self._engine_adapters: t.Dict[str, EngineAdapter] = {
self.selected_gateway: self._connection_config.create_engine_adapter()
}

self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None

self.console = console or get_console(dialect=self.engine_adapter.dialect)
self._test_connection_config = self.config.get_test_connection(
self.gateway, self.default_catalog, default_catalog_dialect=self.engine_adapter.DIALECT
)

self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None

self._provided_state_sync: t.Optional[StateSync] = state_sync
self._state_sync: t.Optional[StateSync] = None

@@ -406,24 +411,32 @@ def default_dialect(self) -> t.Optional[str]:

@property
def engine_adapter(self) -> EngineAdapter:
"""Returns an engine adapter."""
return self._engine_adapter
"""Returns the default engine adapter."""
return self._engine_adapters[self.selected_gateway]

@property
def snapshot_evaluator(self) -> SnapshotEvaluator:
if not self._snapshot_evaluator:
if self._snapshot_gateways:
self._create_engine_adapters(set(self._snapshot_gateways.values()))
self._snapshot_evaluator = SnapshotEvaluator(
self.engine_adapter.with_log_level(logging.INFO),
{
gateway: adapter.with_log_level(logging.INFO)
for gateway, adapter in self._engine_adapters.items()
},
ddl_concurrent_tasks=self.concurrent_tasks,
selected_gateway=self.selected_gateway,
)
return self._snapshot_evaluator

def execution_context(
self, deployability_index: t.Optional[DeployabilityIndex] = None
self,
deployability_index: t.Optional[DeployabilityIndex] = None,
engine_adapter: t.Optional[EngineAdapter] = None,
) -> ExecutionContext:
"""Returns an execution context."""
return ExecutionContext(
engine_adapter=self._engine_adapter,
engine_adapter=engine_adapter or self.engine_adapter,
snapshots=self.snapshots,
deployability_index=deployability_index,
default_dialect=self.default_dialect,
@@ -905,7 +918,9 @@ def render(
if model.is_seed:
df = next(
model.render(
context=self.execution_context(),
context=self.execution_context(
engine_adapter=self._get_engine_adapter(model.gateway)
),
start=start,
end=end,
execution_time=execution_time,
@@ -924,7 +939,7 @@ def render(
snapshots=snapshots,
expand=expand,
deployability_index=deployability_index,
engine_adapter=self.engine_adapter,
engine_adapter=self._get_engine_adapter(model.gateway),
**kwargs,
)

@@ -950,7 +965,6 @@ def evaluate(
limit: A limit applied to the model.
"""
snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True)

df = self.snapshot_evaluator.evaluate_and_fetch(
snapshot,
start=start,
@@ -1459,8 +1473,10 @@ def table_diff(
"""
source_alias, target_alias = source, target

adapter = self.engine_adapter
if model_or_snapshot:
model = self.get_model(model_or_snapshot, raise_if_missing=True)
adapter = self._get_engine_adapter(model.gateway)
source_env = self.state_reader.get_environment(source)
target_env = self.state_reader.get_environment(target)

@@ -1495,7 +1511,7 @@ def table_diff(
)

table_diff = TableDiff(
adapter=self._engine_adapter,
adapter=adapter,
source=source,
target=target,
on=on,
@@ -1619,14 +1635,15 @@ def create_test(
}

try:
model_to_test = self.get_model(model, raise_if_missing=True)
test_adapter = self._test_connection_config.create_engine_adapter(
register_comments_override=False
)
generate_test(
model=self.get_model(model, raise_if_missing=True),
model=model_to_test,
input_queries=input_queries,
models=self._models,
engine_adapter=self._engine_adapter,
engine_adapter=self._get_engine_adapter(model_to_test.gateway),
test_engine_adapter=test_adapter,
project_path=self.path,
overwrite=overwrite,
@@ -1840,7 +1857,7 @@ def create_external_models(self, strict: bool = False) -> None:
if self.config_for_node(model) is config
},
),
adapter=self._engine_adapter,
adapter=self.engine_adapter,
state_reader=self.state_reader,
dialect=config.model_defaults.dialect,
gateway=self.gateway,
@@ -1867,7 +1884,7 @@ def print_info(self, skip_connection: bool = False, verbose: bool = False) -> No
self.config.get_state_connection(self.gateway), self.console, "State Connection"
)

self._try_connection("data warehouse", self._engine_adapter.ping)
self._try_connection("data warehouse", self.engine_adapter.ping)
state_connection = self.config.get_state_connection(self.gateway)
if state_connection:
self._try_connection("state backend", state_connection.connection_validator())
@@ -1958,7 +1975,9 @@ def _run_plan_tests(
result, test_output = self._run_tests()
if result.testsRun > 0:
self.console.log_test_results(
result, test_output, self._test_connection_config._engine_adapter.DIALECT
result,
test_output,
self._test_connection_config._engine_adapter.DIALECT,
)
if not result.wasSuccessful():
raise PlanError(
@@ -1988,6 +2007,35 @@ def _model_tables(self) -> t.Dict[str, str]:
for fqn, snapshot in self.snapshots.items()
}

@property
def _snapshot_gateways(self) -> t.Dict[str, str]:
"""Mapping of snapshot name to the gateway if specified in the model."""

return {
fqn: snapshot.model.gateway
for fqn, snapshot in self.snapshots.items()
if snapshot.is_model and snapshot.model.gateway
}

def _create_engine_adapters(self, gateways: t.Optional[t.Set] = None) -> None:
"""Create engine adapters for the gateways, when none provided include all defined in the configs."""

for gateway_name in self.config.gateways:
if gateway_name != self.selected_gateway and (
gateways is None or gateway_name in gateways
):
connection = self.config.get_connection(gateway_name)
adapter = connection.create_engine_adapter()
self.concurrent_tasks = min(self.concurrent_tasks, connection.concurrent_tasks)
self._engine_adapters[gateway_name] = adapter

def _get_engine_adapter(self, gateway: t.Optional[str] = None) -> EngineAdapter:
if gateway:
if adapter := self._engine_adapters.get(gateway):
return adapter
raise SQLMeshError(f"Gateway '{gateway}' not found in the available engine adapters.")
return self.engine_adapter

def _snapshots(
self, models_override: t.Optional[UniqueKeyDict[str, Model]] = None
) -> t.Dict[str, Snapshot]:
@@ -2085,7 +2133,9 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
self._cleanup_environments()
expired_snapshots = self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl)
self.snapshot_evaluator.cleanup(
expired_snapshots, on_complete=self.console.update_cleanup_progress
expired_snapshots,
self._snapshot_gateways,
on_complete=self.console.update_cleanup_progress,
)

self.state_sync.compact_intervals()
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/mixins.py
@@ -299,6 +299,7 @@ def _build_create_table_exp(
select_or_union.set("where", None)

temp_view_name = self._get_temp_table("ctas")

self.create_view(
temp_view_name, select_statement, replace=False, no_schema_binding=False
)