
Commit f01ef63

erikamov and ohrite authored
Split transform_warehouse to run in different schedules (#4072)
* Split dbt models to run in different schedules to replace transform_warehouse [#3753]
* Upgrade warehouse packages
* Remove extra CTE from Staging NTD Ridership models [#3542]
* Fix datatype error on external_blackcat.all_ntdreports.reportlastmodifieddate, since timestamp fields are no longer converted to UTC epoch
* Fix Metabase sync by removing column names from yml files that no longer exist in the sql code
* Switch to LatestOnlyOperator
* Exclude payments and mart_payments from Metabase sync

Signed-off-by: Doc Ritezel <[email protected]>
Co-authored-by: Doc Ritezel <[email protected]>
1 parent 4ebb225 commit f01ef63
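
Note: the two replacement DAGs added below (dbt_all_dag.py and dbt_payments_dag.py) split the week between the full warehouse run and the payments-only run. A minimal sketch, not part of the commit and assuming the croniter package is installed, that prints the next fire times of each new cron expression to confirm the schedules cover distinct weekdays:

# Sketch only: check that the two new schedules fire on distinct weekdays.
from datetime import datetime, timezone

from croniter import croniter

SCHEDULES = {
    "dbt_all": "0 14 * * 1,4",         # Monday and Thursday, 14:00 UTC
    "dbt_payments": "0 14 * * 2,3,5",  # Tuesday, Wednesday, Friday, 14:00 UTC
}

start = datetime(2025, 7, 6, tzinfo=timezone.utc)  # matches the DAGs' start_date
for dag_id, cron in SCHEDULES.items():
    it = croniter(cron, start)
    next_runs = [it.get_next(datetime).strftime("%a %Y-%m-%d %H:%M") for _ in range(4)]
    print(f"{dag_id}: {next_runs}")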

File tree

40 files changed, +426 -947 lines changed


.github/workflows/deploy-dbt.yml

Lines changed: 50 additions & 23 deletions
@@ -24,6 +24,7 @@ env:
   CALITP_BUCKET__PUBLISH: ${{ github.ref == 'refs/heads/main' && 'calitp-publish' || 'calitp-staging-publish' }}
   GOOGLE_CLOUD_PROJECT: ${{ github.ref == 'refs/heads/main' && 'cal-itp-data-infra' || 'cal-itp-data-infra-staging' }}
   METABASE_DESTINATION_DATABASE: ${{ github.ref == 'refs/heads/main' && 'Data Marts (formerly Warehouse Views)' || '(Internal) Staging Warehouse Views' }}
+  DBT_DOCS_URL: ${{ github.ref == 'refs/heads/main' && 'https://dbt-docs.dds.dot.ca.gov' || 'https://dbt-docs-staging.dds.dot.ca.gov' }}

 jobs:
   compile:
@@ -109,14 +110,15 @@
             warehouse/target/*.json
             warehouse/target/*.msgpack

-  models-changed:
-    name: Detect dbt model changes
+  dbt-changed:
+    name: Detect dbt model and seed changes
     runs-on: ubuntu-latest

-    if: ${{ github.event_name == 'pull_request' }}
-
     outputs:
-      any_changed: ${{ steps.changed-files-warehouse.outputs.any_changed }}
+      has-changed: ${{ steps.changed.outputs.any_changed }}
+      changed: ${{ steps.changed.outputs.all_changed_files }}
+      has-changed-seeds: ${{ steps.changed-seeds.outputs.any_changed }}
+      changed-seeds: ${{ steps.changed-seeds.outputs.all_changed_files }}

     steps:
       - name: Checkout
@@ -125,19 +127,37 @@
           fetch-depth: 0

       - uses: tj-actions/changed-files@v46
-        id: changed-files-warehouse
+        id: changed
         with:
-          files: 'warehouse/models/**/*.sql'
+          path: warehouse
+          separator: "+ "
+          files: |
+            models/**/*.sql
+            seeds/*.csv

-  metabase:
-    name: Sync Metabase
+      - uses: tj-actions/changed-files@v46
+        id: changed-seeds
+        with:
+          path: warehouse
+          separator: "+ "
+          files: seeds/*.csv
+
+      - name: List changed files
+        if: ${{ steps.changed.outputs.any_changed == 'true' }}
+        env:
+          CHANGED_MODELS: ${{ steps.changed.outputs.all_changed_files }}
+        run: |
+          echo "List changed files: $CHANGED_MODELS"
+
+  models_and_metabase:
+    name: Sync Models and Metabase
     runs-on: ubuntu-latest

     needs:
       - compile
-      - models-changed
+      - dbt-changed

-    if: ${{ needs.models-changed.outputs.any_changed == 'true' }}
+    if: ${{ needs.dbt-changed.outputs.has-changed == 'true' || needs.dbt-changed.outputs.has-changed-seeds == 'true' }}

     permissions:
       contents: read
@@ -207,19 +227,22 @@
         working-directory: warehouse
         run: poetry run dbt debug --target ${{ env.DBT_TARGET }}

-      - name: Download latest artifacts from GCS
+      - name: Run seeds
+        if: ${{ needs.dbt-changed.outputs.has-changed-seeds == 'true' }}
+        working-directory: warehouse
+        run: poetry run dbt seed --select "${{ needs.dbt-changed.outputs.changed-seeds }}+" --target ${{ env.DBT_TARGET }}
+
+      - name: List changed models
         working-directory: warehouse
-        if: ${{ env.DBT_TARGET == 'staging' }}
-        run: gsutil cp -r gs://${{ env.DBT_ARTIFACTS_BUCKET }}/latest/ ./target/
+        run: poetry run dbt list --select "${{ needs.dbt-changed.outputs.changed }}+" --target ${{ env.DBT_TARGET }}

-      - name: Run changed models
+      - name: Run dbt against changed files
         working-directory: warehouse
-        if: ${{ env.DBT_TARGET == 'staging' }}
-        run: poetry run dbt run --select state:modified+ --target ${{ env.DBT_TARGET }} --state ./target/latest
+        run: poetry run dbt run --select "${{ needs.dbt-changed.outputs.changed }}+" --target ${{ env.DBT_TARGET }} --full-refresh

       - name: Synchronize Metabase
         working-directory: warehouse
-        run: poetry run dbt-metabase models -v --manifest-path=target/manifest.json --exclude-schemas="*staging, payments" --skip-sources --docs-url="https://dbt-docs.dds.dot.ca.gov" --metabase-url="https://dashboards.calitp.org" --metabase-database="${{ env.METABASE_DESTINATION_DATABASE }}" --metabase-api-key="${{ secrets.METABASE_API_KEY}}"
+        run: poetry run dbt-metabase models -v --manifest-path=target/manifest.json --skip-sources --exclude-schemas=staging,payments,mart_payments --docs-url="${{ env.DBT_DOCS_URL }}" --metabase-url="https://dashboards.calitp.org" --metabase-database="${{ env.METABASE_DESTINATION_DATABASE }}" --metabase-api-key="${{ secrets.METABASE_API_KEY}}"

   upload_dbt_docs:
     name: Upload to dbt-docs site
@@ -282,7 +305,7 @@
           headers: |-
             content-type: application/vnd.msgpack

-  upload_docs:
+  upload_dbt_artifacts:
     name: Upload to dbt artifacts bucket
     needs: [compile]
     runs-on: ubuntu-latest
@@ -383,10 +406,10 @@
     runs-on: ubuntu-latest

     needs:
-      - models-changed
+      - dbt-changed
       - compile

-    if: ${{ github.event_name == 'pull_request' && needs.models-changed.outputs.any_changed == 'true' }}
+    if: ${{ github.event_name == 'pull_request' && needs.dbt-changed.outputs.has-changed == 'true' }}

     permissions:
       contents: read
@@ -450,9 +473,13 @@
         working-directory: warehouse
         run: poetry run dbt deps

-      - name: Download latest artifacts from GCS
+      - name: Create Latest folder
+        working-directory: warehouse/target
+        run: mkdir latest
+
+      - name: Download latest artifacts from production dbt docs
         working-directory: warehouse
-        run: gsutil cp -r gs://${{ env.DBT_ARTIFACTS_BUCKET }}/latest/ ./target/
+        run: gsutil cp gs://calitp-dbt-docs/*.json ./target/latest/

       - name: Create CI report
         working-directory: warehouse
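
Why separator: "+ " plus a trailing "+": the changed-files action joins its output paths with the configured separator, so the composed --select argument selects each changed model path together with its downstream graph. A rough sketch of the string that gets built (file names here are hypothetical, not from this commit):

# Illustration only: mimic how the workflow composes the dbt --select argument.
changed_files = [
    "models/staging/example_model_a.sql",   # hypothetical changed files,
    "models/mart/example_model_b.sql",      # relative to the warehouse/ path
]

# Join with "+ " and append a final "+", as the workflow does.
selector = "+ ".join(changed_files) + "+"
print(f'poetry run dbt run --select "{selector}" --target staging --full-refresh')
# -> dbt runs each changed model plus everything downstream of it.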

airflow/dags/cosmos_nonpayment_dag.py

Lines changed: 0 additions & 38 deletions
This file was deleted.

airflow/dags/cosmos_payment_dag.py

Lines changed: 0 additions & 38 deletions
This file was deleted.

airflow/dags/create_external_tables/ntd_report_validation/external_table_all_ntdreports.yml

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ schema_fields:
   - name: reportstatus
     type: STRING
   - name: reportlastmodifieddate
-    type: INTEGER
+    type: STRING
   - name: ntdreportingstationsandmaintenance_data
     type: RECORD
     mode: REPEATED
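
Context for the type change: per the commit message, BlackCat timestamp fields are no longer converted to UTC epoch integers at ingest, so reportlastmodifieddate now lands as a raw string and parsing moves downstream into the warehouse models. A hypothetical before/after sketch; the actual string format is not shown in this diff and ISO 8601 is assumed here:

# Hypothetical illustration of the schema change for reportlastmodifieddate.
from datetime import datetime, timezone

old_value = 1751810400              # previously ingested: UTC epoch seconds (INTEGER)
new_value = "2025-07-06T14:00:00Z"  # assumed raw source string now ingested (STRING)

as_of_old = datetime.fromtimestamp(old_value, tz=timezone.utc)
as_of_new = datetime.fromisoformat(new_value.replace("Z", "+00:00"))
assert as_of_old == as_of_new  # same instant, different ingest representation
print(as_of_old.isoformat())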

airflow/dags/dbt_all_dag.py

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+import os
+from datetime import datetime
+
+from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig, RenderConfig
+from cosmos.constants import TestBehavior
+
+from airflow import DAG
+from airflow.operators.latest_only import LatestOnlyOperator
+
+DBT_TARGET = os.environ.get("DBT_TARGET")
+
+with DAG(
+    dag_id="dbt_all",
+    tags=["dbt", "all"],
+    # Monday, Thursday at 7am PDT/8am PST (2pm UTC)
+    schedule="0 14 * * 1,4",
+    start_date=datetime(2025, 7, 6),
+    catchup=False,
+):
+    latest_only = LatestOnlyOperator(task_id="latest_only", depends_on_past=False)
+
+    dbt_all = DbtTaskGroup(
+        group_id="dbt_all",
+        project_config=ProjectConfig(
+            dbt_project_path="/home/airflow/gcs/data/warehouse",
+            manifest_path="/home/airflow/gcs/data/warehouse/target/manifest.json",
+            project_name="calitp_warehouse",
+            seeds_relative_path="seeds/",
+        ),
+        profile_config=ProfileConfig(
+            target_name=DBT_TARGET,
+            profile_name="calitp_warehouse",
+            profiles_yml_filepath="/home/airflow/gcs/data/warehouse/profiles.yml",
+        ),
+        render_config=RenderConfig(
+            test_behavior=TestBehavior.AFTER_ALL,
+        ),
+        operator_args={
+            "install_deps": True,
+        },
+        default_args={"retries": 0},
+    )
+
+    latest_only >> dbt_all

airflow/dags/dbt_payments_dag.py

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+import os
+from datetime import datetime
+
+from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig, RenderConfig
+from cosmos.constants import TestBehavior
+
+from airflow import DAG
+from airflow.operators.latest_only import LatestOnlyOperator
+
+DBT_TARGET = os.environ.get("DBT_TARGET")
+
+with DAG(
+    dag_id="dbt_payments",
+    tags=["dbt", "payments"],
+    # Tuesday, Wednesday, Friday at 7am PDT/8am PST (2pm UTC)
+    schedule="0 14 * * 2,3,5",
+    start_date=datetime(2025, 7, 6),
+    catchup=False,
+):
+    latest_only = LatestOnlyOperator(task_id="latest_only", depends_on_past=False)
+
+    dbt_payments = DbtTaskGroup(
+        group_id="dbt_payments",
+        project_config=ProjectConfig(
+            dbt_project_path="/home/airflow/gcs/data/warehouse",
+            manifest_path="/home/airflow/gcs/data/warehouse/target/manifest.json",
+            project_name="calitp_warehouse",
+            seeds_relative_path="seeds/",
+        ),
+        profile_config=ProfileConfig(
+            target_name=DBT_TARGET,
+            profile_name="calitp_warehouse",
+            profiles_yml_filepath="/home/airflow/gcs/data/warehouse/profiles.yml",
+        ),
+        render_config=RenderConfig(
+            select=[
+                "+path:models/staging/payments+",
+                "+path:models/intermediate/payments+",
+                "+path:models/mart/payments+",
+            ],
+            test_behavior=TestBehavior.AFTER_ALL,
+        ),
+        operator_args={
+            "install_deps": True,
+        },
+        default_args={"retries": 0},
+    )
+
+    latest_only >> dbt_payments
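
For reference, the RenderConfig selectors above are ordinary dbt node selection. A sketch, not part of the commit, of how one might preview the same payments subgraph from a local checkout; it assumes dbt is installed and the calitp_warehouse profile is configured:

# Sketch only: list the nodes the payments DAG will render, using the same
# selectors as RenderConfig. "dbt list" unions multiple --select arguments,
# so this prints every payments model plus its upstream and downstream
# dependencies without running anything.
import subprocess

PAYMENTS_SELECTORS = [
    "+path:models/staging/payments+",
    "+path:models/intermediate/payments+",
    "+path:models/mart/payments+",
]

subprocess.run(
    ["dbt", "list", "--select", *PAYMENTS_SELECTORS],
    cwd="warehouse",
    check=True,
)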

airflow/dags/transform_warehouse/METADATA.yml

Lines changed: 0 additions & 18 deletions
This file was deleted.

airflow/dags/transform_warehouse/README.md

Lines changed: 0 additions & 19 deletions
This file was deleted.
