Commit 7029c40

Feat: Support SQLMesh project generation from dlt pipeline (TobikoData#3218)
1 parent 3eb5364 commit 7029c40

14 files changed, +477 -23 lines changed

Diff for: Makefile

+2 -2
@@ -1,10 +1,10 @@
 .PHONY: docs
 
 install-dev:
-	pip3 install -e ".[dev,web,slack]"
+	pip3 install -e ".[dev,web,slack,dlt]"
 
 install-cicd-test:
-	pip3 install -e ".[dev,web,slack,cicdtest]"
+	pip3 install -e ".[dev,web,slack,cicdtest,dlt]"
 
 install-doc:
 	pip3 install -r ./docs/requirements.txt

Diff for: docs/integrations/dlt.md

+87
@@ -0,0 +1,87 @@
+# dlt
+
+SQLMesh enables effortless project generation using data ingested through [dlt](https://github.com/dlt-hub/dlt). This involves creating a baseline project scaffolding, generating incremental models to process the data from the pipeline's tables by inspecting its schema, and configuring the gateway connection using the pipeline's credentials.
+
+## Getting started
+### Reading from a dlt pipeline
+
+To load data from a dlt pipeline into SQLMesh, ensure the dlt pipeline has been run or restored locally. Then execute the sqlmesh `init` command *within the dlt project root directory*, using the `dlt` template option and specifying the pipeline's name with the `dlt-pipeline` option:
+
+```bash
+$ sqlmesh init -t dlt --dlt-pipeline <pipeline-name> dialect
+```
+
+This will create the configuration file and directories found in all SQLMesh projects:
+
+- config.yaml
+    - The file for project configuration. Refer to [configuration](../reference/configuration.md).
+- ./models
+    - SQL and Python models. Refer to [models](../concepts/models/overview.md).
+- ./seeds
+    - Seed files. Refer to [seeds](../concepts/models/seed_models.md).
+- ./audits
+    - Shared audit files. Refer to [auditing](../concepts/audits.md).
+- ./tests
+    - Unit test files. Refer to [testing](../concepts/tests.md).
+- ./macros
+    - Macro files. Refer to [macros](../concepts/macros/overview.md).
+
+SQLMesh will also automatically generate models to ingest data from the pipeline incrementally. Incremental loading is ideal for large datasets where recomputing entire tables is resource-intensive. The generated models use the [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range), but you can customize these model definitions to meet your project's needs.
+
+#### Configuration
+
+SQLMesh will retrieve the data warehouse connection credentials from your dlt project to configure the `config.yaml` file. This configuration can be modified or customized as needed. For more details, refer to the [configuration guide](../guides/configuration.md).
+
+### Example
+
+Generating a SQLMesh project from dlt is straightforward. In this example, we'll use the example `sushi_pipeline.py` from the [sushi-dlt project](https://github.com/TobikoData/sqlmesh/tree/main/examples/sushi_dlt).
+
+First, run the pipeline within the project directory:
+
+```bash
+$ python sushi_pipeline.py
+Pipeline sushi load step completed in 2.09 seconds
+Load package 1728074157.660565 is LOADED and contains no failed jobs
+```
+
+After the pipeline has run, generate a SQLMesh project by executing:
+
+```bash
+$ sqlmesh init -t dlt --dlt-pipeline sushi duckdb
+```
+
+The SQLMesh project is now set up. Run the SQLMesh `plan` command to ingest the dlt pipeline data and populate the SQLMesh tables:
+
+```bash
+$ sqlmesh plan
+New environment `prod` will be created from `prod`
+Summary of differences against `prod`:
+Models:
+└── Added:
+    ├── sushi_dataset_sqlmesh.incremental__dlt_loads
+    ├── sushi_dataset_sqlmesh.incremental_sushi_types
+    └── sushi_dataset_sqlmesh.incremental_waiters
+Models needing backfill (missing dates):
+├── sushi_dataset_sqlmesh.incremental__dlt_loads: 2024-10-03 - 2024-10-03
+├── sushi_dataset_sqlmesh.incremental_sushi_types: 2024-10-03 - 2024-10-03
+└── sushi_dataset_sqlmesh.incremental_waiters: 2024-10-03 - 2024-10-03
+Apply - Backfill Tables [y/n]: y
+Creating physical table ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 3/3 • 0:00:00
+
+All model versions have been created successfully
+
+[1/1] sushi_dataset_sqlmesh.incremental__dlt_loads evaluated in 0.01s
+[1/1] sushi_dataset_sqlmesh.incremental_sushi_types evaluated in 0.00s
+[1/1] sushi_dataset_sqlmesh.incremental_waiters evaluated in 0.01s
+Evaluating models ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 3/3 • 0:00:00
+
+
+All model batches have been executed successfully
+
+Virtually Updating 'prod' ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100.0% • 0:00:00
+
+The target environment has been updated successfully
+```
+
+Once the models are planned and applied, you can continue as with any SQLMesh project, generating and applying [plans](../concepts/overview.md#make-a-plan), running [tests](../concepts/overview.md#tests) or [audits](../concepts/overview.md#audits), and executing models with a [scheduler](../guides/scheduling.md) if desired.
+

Diff for: docs/integrations/overview.md

+1
@@ -5,6 +5,7 @@ SQLMesh supports integrations with the following tools:
 
 * [Airflow](airflow.md)
 * [dbt](dbt.md)
+* [dlt](dlt.md)
 * [GitHub Actions](github.md)
 * [Kestra](https://kestra.io/plugins/plugin-sqlmesh/tasks/cli/io.kestra.plugin.sqlmesh.cli.sqlmeshcli)
 

Diff for: docs/reference/cli.md

+3 -1
@@ -214,7 +214,9 @@ Usage: sqlmesh init [OPTIONS] [SQL_DIALECT]
 
 Options:
   -t, --template TEXT  Project template. Supported values: airflow, dbt,
-                       default, empty.
+                       dlt, default, empty.
+  --dlt-pipeline TEXT  DLT pipeline for which to generate a SQLMesh project.
+                       This option is supported if the template is dlt.
   --help               Show this message and exit.
 ```
 

Diff for: docs/reference/notebook.md

+5 -2
@@ -70,7 +70,7 @@ options:
 
 #### init
 ```
-%init [--template TEMPLATE] path sql_dialect
+%init [--template TEMPLATE] [--dlt-pipeline PIPELINE] path sql_dialect
 
 Creates a SQLMesh project scaffold with a default SQL dialect.
 
@@ -87,7 +87,10 @@ positional arguments:
 options:
   --template TEMPLATE, -t TEMPLATE
                         Project template. Supported values: airflow, dbt,
-                        default, empty.
+                        dlt, default, empty.
+  --dlt-pipeline PIPELINE
+                        DLT pipeline for which to generate a SQLMesh project.
+                        This option is supported if the template is dlt.
 ```
 
 #### plan

Diff for: examples/sushi_dlt/sushi_pipeline.py

+35
@@ -0,0 +1,35 @@
+import typing as t
+import dlt
+
+
+# Example sushi_types table
+@dlt.resource(name="sushi_types", primary_key="id", write_disposition="merge")
+def sushi_types() -> t.Iterator[t.Dict[str, t.Any]]:
+    yield from [
+        {"id": 0, "name": "Tobiko"},
+        {"id": 1, "name": "Sashimi"},
+        {"id": 2, "name": "Maki"},
+        {"id": 3, "name": "Temaki"},
+    ]
+
+
+# Example waiters table
+@dlt.resource(name="waiters", primary_key="id", write_disposition="merge")
+def waiters() -> t.Iterator[t.Dict[str, t.Any]]:
+    yield from [
+        {"id": 0, "name": "Toby"},
+        {"id": 1, "name": "Tyson"},
+        {"id": 2, "name": "Ryan"},
+        {"id": 3, "name": "George"},
+        {"id": 4, "name": "Chris"},
+        {"id": 5, "name": "Max"},
+        {"id": 6, "name": "Vincent"},
+        {"id": 7, "name": "Iaroslav"},
+        {"id": 8, "name": "Emma"},
+        {"id": 9, "name": "Maia"},
+    ]
+
+
+# Run the pipeline
+p = dlt.pipeline(pipeline_name="sushi", destination="duckdb")
+info = p.run([sushi_types(), waiters()])
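
Both resources above declare `write_disposition="merge"` with `primary_key="id"`, so re-running the pipeline is expected to update rows whose `id` already exists instead of appending duplicates. The following is a minimal pure-Python sketch of that upsert semantics only. It makes no claim about how dlt implements merging, and `merge_rows` is a hypothetical helper that does not exist in dlt or SQLMesh.

```python
import typing as t

Row = t.Dict[str, t.Any]


def merge_rows(
    existing: t.List[Row], incoming: t.Iterable[Row], primary_key: str = "id"
) -> t.List[Row]:
    """Hypothetical helper: upsert incoming rows into existing rows by primary key."""
    merged: t.Dict[t.Any, Row] = {row[primary_key]: row for row in existing}
    for row in incoming:
        # A row with a known key replaces the stored row; a new key is appended.
        merged[row[primary_key]] = row
    return list(merged.values())


# First load, then a second load that changes id=1 and adds id=2.
table = [{"id": 0, "name": "Tobiko"}, {"id": 1, "name": "Sashimi"}]
table = merge_rows(table, [{"id": 1, "name": "Nigiri"}, {"id": 2, "name": "Maki"}])
print(table)
```

The second load leaves one row per `id`, which is the property the primary key is meant to guarantee here.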

Diff for: setup.cfg

+3
@@ -98,3 +98,6 @@ ignore_missing_imports = True
 
 [mypy-pydantic_core.*]
 ignore_missing_imports = True
+
+[mypy-dlt.*]
+ignore_missing_imports = True

Diff for: setup.py

+3
@@ -105,6 +105,9 @@
         "dbt": [
             "dbt-core<2",
         ],
+        "dlt": [
+            "dlt",
+        ],
         "gcppostgres": [
             "cloud-sql-python-connector[pg8000]",
         ],

Diff for: sqlmesh/cli/example_project.py

+45 -11
@@ -4,35 +4,46 @@
 
 import click
 from sqlglot import Dialect
+from sqlmesh.integrations.dlt import generate_dlt_models_and_settings
 from sqlmesh.utils.date import yesterday_ds
 
 
 class ProjectTemplate(Enum):
     AIRFLOW = "airflow"
     DBT = "dbt"
+    DLT = "dlt"
     DEFAULT = "default"
     EMPTY = "empty"
 
 
-def _gen_config(dialect: t.Optional[str], template: ProjectTemplate) -> str:
+def _gen_config(
+    dialect: t.Optional[str],
+    settings: t.Optional[str],
+    start: t.Optional[str],
+    template: ProjectTemplate,
+) -> str:
+    connection_settings = (
+        settings
+        or """      type: duckdb
+      database: db.db"""
+    )
+
     default_configs = {
         ProjectTemplate.DEFAULT: f"""gateways:
   local:
     connection:
-      type: duckdb
-      database: db.db
+{connection_settings}
 
 default_gateway: local
 
 model_defaults:
   dialect: {dialect}
-  start: {yesterday_ds()}
+  start: {start or yesterday_ds()}
 """,
         ProjectTemplate.AIRFLOW: f"""gateways:
   local:
     connection:
-      type: duckdb
-      database: db.db
+{connection_settings}
 
 default_gateway: local
 
@@ -55,6 +66,7 @@ def _gen_config(dialect: t.Optional[str], template: ProjectTemplate) -> str:
     }
 
     default_configs[ProjectTemplate.EMPTY] = default_configs[ProjectTemplate.DEFAULT]
+    default_configs[ProjectTemplate.DLT] = default_configs[ProjectTemplate.DEFAULT]
     return default_configs[template]
 
 
@@ -158,6 +170,7 @@ def init_example_project(
     path: t.Union[str, Path],
     dialect: t.Optional[str],
     template: ProjectTemplate = ProjectTemplate.DEFAULT,
+    pipeline: t.Optional[str] = None,
 ) -> None:
     root_path = Path(path)
     config_extension = "py" if template == ProjectTemplate.DBT else "yaml"
@@ -176,12 +189,27 @@ def init_example_project(
             "Default SQL dialect is a required argument for SQLMesh projects"
         )
 
-    _create_config(config_path, dialect, template)
+    models = None
+    settings = None
+    start = None
+    if template == ProjectTemplate.DLT:
+        if pipeline and dialect:
+            models, settings, start = generate_dlt_models_and_settings(pipeline, dialect)
+        else:
+            raise click.ClickException(
+                "DLT pipeline is a required argument to generate a SQLMesh project from DLT"
+            )
+
+    _create_config(config_path, dialect, settings, start, template)
     if template == ProjectTemplate.DBT:
         return
 
     _create_folders([audits_path, macros_path, models_path, seeds_path, tests_path])
 
+    if template == ProjectTemplate.DLT:
+        _create_models(models_path, models)
+        return
+
     if template != ProjectTemplate.EMPTY:
         _create_macros(macros_path)
         _create_audits(audits_path)
@@ -196,11 +224,17 @@ def _create_folders(target_folders: t.Sequence[Path]) -> None:
         (folder_path / ".gitkeep").touch()
 
 
-def _create_config(config_path: Path, dialect: t.Optional[str], template: ProjectTemplate) -> None:
+def _create_config(
+    config_path: Path,
+    dialect: t.Optional[str],
+    settings: t.Optional[str],
+    start: t.Optional[str],
+    template: ProjectTemplate,
+) -> None:
     if dialect:
         Dialect.get_or_raise(dialect)
 
-    project_config = _gen_config(dialect, template)
+    project_config = _gen_config(dialect, settings, start, template)
 
     _write_file(
         config_path,
@@ -216,8 +250,8 @@ def _create_audits(audits_path: Path) -> None:
     _write_file(audits_path / "assert_positive_order_ids.sql", EXAMPLE_AUDIT)
 
 
-def _create_models(models_path: Path) -> None:
-    for model_name, model_def in [
+def _create_models(models_path: Path, models: t.Optional[t.Set[t.Tuple[str, str]]] = None) -> None:
+    for model_name, model_def in models or [
         (EXAMPLE_FULL_MODEL_NAME, EXAMPLE_FULL_MODEL_DEF),
         (EXAMPLE_INCREMENTAL_MODEL_NAME, EXAMPLE_INCREMENTAL_MODEL_DEF),
         (EXAMPLE_SEED_MODEL_NAME, EXAMPLE_SEED_MODEL_DEF),
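
The change to `_gen_config` boils down to two fallbacks: use the dlt-derived connection block when one is supplied, otherwise the DuckDB default, and use the dlt-derived start date when one is supplied, otherwise yesterday. A stdlib-only sketch of that behavior follows. `gen_config_sketch` is a hypothetical stand-in that covers only the default template, the YAML indentation is an assumption, `date.today() - timedelta(days=1)` merely approximates `yesterday_ds()`, and the Postgres values are invented for illustration.

```python
import typing as t
from datetime import date, timedelta

# Assumption: the block is pre-indented so it nests under `connection:`.
DEFAULT_CONNECTION = "      type: duckdb\n      database: db.db"


def gen_config_sketch(
    dialect: str,
    settings: t.Optional[str] = None,
    start: t.Optional[str] = None,
) -> str:
    """Hypothetical, simplified stand-in for `_gen_config` (default template only)."""
    connection_settings = settings or DEFAULT_CONNECTION
    # Approximation of sqlmesh.utils.date.yesterday_ds().
    yesterday = (date.today() - timedelta(days=1)).isoformat()
    return (
        "gateways:\n"
        "  local:\n"
        "    connection:\n"
        f"{connection_settings}\n"
        "\n"
        "default_gateway: local\n"
        "\n"
        "model_defaults:\n"
        f"  dialect: {dialect}\n"
        f"  start: {start or yesterday}\n"
    )


# No dlt input: falls back to the DuckDB connection and yesterday's date.
default_cfg = gen_config_sketch("duckdb")

# dlt-style input: made-up connection settings and an explicit start date.
dlt_cfg = gen_config_sketch(
    "postgres",
    settings="      type: postgres\n      host: localhost",
    start="2024-10-03",
)
print(dlt_cfg)
```

Because `settings` is spliced into the template verbatim, the caller is responsible for indenting it to match the surrounding YAML.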

Diff for: sqlmesh/cli/main.py

+13 -3
@@ -118,20 +118,30 @@ def cli(
     "-t",
     "--template",
     type=str,
-    help="Project template. Supported values: airflow, dbt, default, empty.",
+    help="Project template. Supported values: airflow, dbt, dlt, default, empty.",
+)
+@click.option(
+    "--dlt-pipeline",
+    type=str,
+    help="DLT pipeline for which to generate a SQLMesh project. Use alongside template: dlt",
 )
 @click.pass_context
 @error_handler
 @cli_analytics
 def init(
-    ctx: click.Context, sql_dialect: t.Optional[str] = None, template: t.Optional[str] = None
+    ctx: click.Context,
+    sql_dialect: t.Optional[str] = None,
+    template: t.Optional[str] = None,
+    dlt_pipeline: t.Optional[str] = None,
 ) -> None:
     """Create a new SQLMesh repository."""
     try:
         project_template = ProjectTemplate(template.lower() if template else "default")
     except ValueError:
         raise click.ClickException(f"Invalid project template '{template}'")
-    init_example_project(ctx.obj, dialect=sql_dialect, template=project_template)
+    init_example_project(
+        ctx.obj, dialect=sql_dialect, template=project_template, pipeline=dlt_pipeline
+    )
 
 
 @cli.command("render")
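
Taken together, `init` and `init_example_project` validate the template name case-insensitively and then require a pipeline name (and a dialect) whenever the template is `dlt`. The sketch below condenses those two checks into one stdlib-only function. `resolve_template` is a hypothetical helper, not part of SQLMesh, and it raises `ValueError` where the real code raises `click.ClickException`. The real code also performs its general dialect check earlier, which this sketch folds into the dlt branch.

```python
import typing as t
from enum import Enum


class ProjectTemplate(Enum):
    AIRFLOW = "airflow"
    DBT = "dbt"
    DLT = "dlt"
    DEFAULT = "default"
    EMPTY = "empty"


def resolve_template(
    template: t.Optional[str],
    dlt_pipeline: t.Optional[str],
    dialect: t.Optional[str],
) -> ProjectTemplate:
    """Hypothetical helper mirroring the template checks in the diff above."""
    try:
        project_template = ProjectTemplate(template.lower() if template else "default")
    except ValueError:
        # The CLI raises click.ClickException; ValueError keeps this stdlib-only.
        raise ValueError(f"Invalid project template '{template}'") from None
    if project_template == ProjectTemplate.DLT and not (dlt_pipeline and dialect):
        raise ValueError(
            "DLT pipeline is a required argument to generate a SQLMesh project from DLT"
        )
    return project_template


# Template names are lowercased before lookup, so "DLT" is accepted.
print(resolve_template("DLT", "sushi", "duckdb"))
```

Omitting `--dlt-pipeline` with `-t dlt` fails fast here, which matches the error path added in `init_example_project`.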
