Welcome to Week 2 of the Data Engineering Zoomcamp! This week, we’ll dive into workflow orchestration using Kestra.
Kestra is an open-source, event-driven orchestration platform that simplifies building both scheduled and event-driven workflows. By adopting Infrastructure as Code practices for data and process orchestration, Kestra enables you to build reliable workflows with just a few lines of YAML.
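To give a sense of what that looks like, here is a minimal, complete flow that just logs a message (the id, namespace, and message are arbitrary examples, and the exact task type name can vary slightly between Kestra versions):

id: hello_world
namespace: zoomcamp

tasks:
  - id: log_message
    type: io.kestra.plugin.core.log.Log   # task type name may differ in older Kestra versions
    message: Hello from Kestra!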
Note
You can find all videos for this week in this YouTube Playlist.
In this section, you’ll learn the foundations of workflow orchestration, its importance, and how Kestra fits into the orchestration landscape.
This week, we'll build ETL pipelines for Yellow and Green Taxi data from NYC's Taxi and Limousine Commission (TLC). You will:
- Extract data from CSV files.
- Load it into Postgres or Google Cloud (GCS + BigQuery).
- Explore scheduling and backfilling workflows.
The project is organized as follows:
.
├── flows/
│ ├── 01_getting_started_data_pipeline.yaml
│ ├── 02_postgres_taxi.yaml
│ ├── 02_postgres_taxi_scheduled.yaml
│ ├── 03_postgres_dbt.yaml
│ ├── 04_gcp_kv.yaml
│ ├── 05_gcp_setup.yaml
│ ├── 06_gcp_taxi.yaml
│ ├── 06_gcp_taxi_scheduled.yaml
│ └── 07_gcp_dbt.yaml
We'll set up Kestra using Docker Compose, which runs one container for the Kestra server and another for the Postgres database:
cd 02-workflow-orchestration/
docker compose up -d
Once the containers start, you can access the Kestra UI at http://localhost:8080.
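For reference, a stripped-down sketch of what such a Docker Compose setup can look like is shown below; the image tags, credentials, and wiring are illustrative, so rely on the docker-compose.yml that ships with the course repository rather than this sketch:

# Illustrative only -- the real docker-compose.yml in the repo also configures
# Kestra's datasource, internal storage, and basic authentication.
services:
  postgres:
    image: postgres
    environment:
      POSTGRES_DB: kestra
      POSTGRES_USER: kestra
      POSTGRES_PASSWORD: k3str4   # example password

  kestra:
    image: kestra/kestra:latest
    command: server standalone
    ports:
      - "8080:8080"
    depends_on:
      - postgres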
If you prefer to add flows programmatically using Kestra's API, run the following commands:
curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/01_getting_started_data_pipeline.yaml
curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/02_postgres_taxi.yaml
curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/02_postgres_taxi_scheduled.yaml
curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/03_postgres_dbt.yaml
curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/04_gcp_kv.yaml
curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/05_gcp_setup.yaml
curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/06_gcp_taxi.yaml
curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/06_gcp_taxi_scheduled.yaml
curl -X POST http://localhost:8080/api/v1/flows/import -F fileUpload=@flows/07_gcp_dbt.yaml
This introductory flow demonstrates a simple data pipeline that extracts data via an HTTP REST API, transforms it in Python, and then queries it using DuckDB.
graph LR
Extract[Extract Data via HTTP REST API] --> Transform[Transform Data in Python]
Transform --> Query[Query Data with DuckDB]
Add the flow 01_getting_started_data_pipeline.yaml from the UI if you haven't already, and execute it to see the results. Inspect the Gantt and Logs tabs to understand the flow execution.
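The flow file itself contains the real Python code and plugin configuration; the sketch below only illustrates the overall shape, with a placeholder URL (dummyjson.com), simplified Python, and property names that may differ between plugin versions:

id: getting_started_sketch
namespace: zoomcamp

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://dummyjson.com/products            # placeholder REST endpoint

  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      raw.json: "{{ outputs.extract.uri }}"        # mount the downloaded file into the working directory
    outputFiles:
      - products.csv
    script: |
      import csv
      import json

      # Hypothetical transformation: flatten the JSON payload into a CSV file
      with open("raw.json") as f:
          products = json.load(f)["products"]

      with open("products.csv", "w", newline="") as f:
          writer = csv.DictWriter(f, fieldnames=["id", "title", "price"])
          writer.writeheader()
          for p in products:
              writer.writerow({k: p[k] for k in writer.fieldnames})

  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      products.csv: "{{ outputs.transform.outputFiles['products.csv'] }}"
    sql: SELECT COUNT(*) AS products, ROUND(AVG(price), 2) AS avg_price FROM read_csv_auto('products.csv');
    fetchType: FETCH                               # property name may vary by plugin version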
Before we start loading data to GCP, we'll first play with the Yellow and Green Taxi data using a local Postgres database running in a Docker container. To keep things simple, we'll use the same database as the one we set up for Kestra in Docker Compose.
The flow will extract CSV data partitioned by year and month, create tables, load data to the monthly table, and finally merge the data to the final destination table.
graph LR
Start[Select Year & Month] --> SetLabel[Set Labels]
SetLabel --> Extract[Extract CSV Data]
Extract -->|Taxi=Yellow| YellowFinalTable[Create Yellow Final Table]:::yellow
Extract -->|Taxi=Green| GreenFinalTable[Create Green Final Table]:::green
YellowFinalTable --> YellowMonthlyTable[Create Yellow Monthly Table]:::yellow
GreenFinalTable --> GreenMonthlyTable[Create Green Monthly Table]:::green
YellowMonthlyTable --> YellowCopyIn[Load Data to Monthly Table]:::yellow
GreenMonthlyTable --> GreenCopyIn[Load Data to Monthly Table]:::green
YellowCopyIn --> YellowMerge[Merge Yellow Data]:::yellow
GreenCopyIn --> GreenMerge[Merge Green Data]:::green
classDef yellow fill:#FFD700,stroke:#000,stroke-width:1px;
classDef green fill:#32CD32,stroke:#000,stroke-width:1px;
The flow code: 02_postgres_taxi.yaml.
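At the heart of that flow, each monthly CSV is loaded into a staging table with a CopyIn task and then merged into the final table with a SQL MERGE. The sketch below shows that pattern for the green dataset; the connection details, column list, and exact property names are simplified and may differ from the real flow:

  - id: green_copy_in_staging
    type: io.kestra.plugin.jdbc.postgresql.CopyIn
    url: jdbc:postgresql://postgres:5432/kestra    # example connection settings
    username: kestra
    password: k3str4
    format: CSV
    header: true
    table: public.green_tripdata_staging
    from: "{{ outputs.extract.outputFiles['green_tripdata.csv'] }}"   # hypothetical output of an earlier extract task

  - id: green_merge
    type: io.kestra.plugin.jdbc.postgresql.Queries
    url: jdbc:postgresql://postgres:5432/kestra
    username: kestra
    password: k3str4
    sql: |
      MERGE INTO public.green_tripdata AS t
      USING public.green_tripdata_staging AS s
        ON t.unique_row_id = s.unique_row_id
      WHEN NOT MATCHED THEN
        INSERT (unique_row_id, filename, lpep_pickup_datetime)       -- the real flow inserts the full column list
        VALUES (s.unique_row_id, s.filename, s.lpep_pickup_datetime);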
Note
The NYC Taxi and Limousine Commission (TLC) Trip Record Data on the nyc.gov website is currently available only in Parquet format, but that is NOT the dataset we'll use in this course. Instead, we'll use the CSV files available here on GitHub. The Parquet format can be challenging for newcomers, and we want to keep the course as accessible as possible: CSV files can easily be inspected with tools like Excel or Google Sheets, or even a simple text editor.
We can now schedule the same pipeline shown above to run daily at 9 AM UTC. We'll also demonstrate how to backfill the data pipeline to run on historical data.
Note: given the size of the dataset, we'll backfill only the green taxi data for 2019.
The flow code: 02_postgres_taxi_scheduled.yaml.
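In Kestra, such a schedule is declared as a trigger on the flow. A minimal sketch (the cron expression runs daily at 09:00 UTC; the inputs property assumes the flow defines a taxi input, as the course flow does):

triggers:
  - id: green_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 * * *"      # every day at 09:00 UTC
    inputs:
      taxi: green

Backfills can then be started from the Kestra UI on the flow's Triggers tab by selecting a historical date range to execute.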
Now that we have raw data ingested into a local Postgres database, we can use dbt to transform the data into meaningful insights. The flow will sync the dbt models from Git to Kestra and run the dbt build command to build the models.
graph LR
Start[Select dbt command] --> Sync[Sync Namespace Files]
Sync --> DbtBuild[Run dbt CLI]
The flow code: 03_postgres_dbt.yaml.
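A sketch of the two tasks involved is shown below; the repository URL, branch, and dbt project directory are illustrative, plugin property names may vary between versions, and the DbtCLI task also needs a profiles.yml pointing at the Postgres database, which is omitted here:

  - id: sync
    type: io.kestra.plugin.git.SyncNamespaceFiles   # copy the dbt project from Git into Kestra's namespace files
    url: https://github.com/DataTalksClub/data-engineering-zoomcamp
    branch: main
    namespace: "{{ flow.namespace }}"
    gitDirectory: path/to/dbt/project                # hypothetical location of the dbt project in the repo

  - id: dbt_build
    type: io.kestra.plugin.dbt.cli.DbtCLI            # run dbt against the files synced above
    namespaceFiles:
      enabled: true
    commands:
      - dbt build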
Now that you've learned how to build ETL pipelines locally using Postgres, we are ready to move to the cloud. In this section, we'll load the same Yellow and Green Taxi data to Google Cloud Platform (GCP) using:
- Google Cloud Storage (GCS) as a data lake
- BigQuery as a data warehouse.
Before we start loading data to GCP, we need to set up the Google Cloud Platform.
First, adjust the following flow, 04_gcp_kv.yaml, to include your service account, GCP project ID, BigQuery dataset, and GCS bucket name (along with their location) as KV Store values:
- GCP_CREDS
- GCP_PROJECT_ID
- GCP_LOCATION
- GCP_BUCKET_NAME
- GCP_DATASET
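A sketch of how such values can be set from a flow, assuming the io.kestra.plugin.core.kv.Set task available in recent Kestra versions (exact property names may differ by version; the values are placeholders to replace with your own):

  - id: gcp_project_id
    type: io.kestra.plugin.core.kv.Set
    key: GCP_PROJECT_ID
    value: my-project-id        # replace with your GCP project ID

  - id: gcp_location
    type: io.kestra.plugin.core.kv.Set
    key: GCP_LOCATION
    value: europe-west2         # replace with your bucket/dataset location

Other flows can then read these values with the kv() function, for example {{ kv('GCP_PROJECT_ID') }}.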
Warning
The GCP_CREDS service account contains sensitive information. Keep it as secure as your passwords and never commit it to Git.
If you haven't already created the GCS bucket and BigQuery dataset in the first week of the course, you can use this flow to create them: 05_gcp_setup.yaml.
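That setup boils down to two tasks along these lines (a sketch; property names may differ slightly by plugin version, and it assumes both tasks read the KV values set earlier):

  - id: create_gcs_bucket
    type: io.kestra.plugin.gcp.gcs.CreateBucket
    name: "{{ kv('GCP_BUCKET_NAME') }}"
    ifExists: SKIP                                   # don't fail if the bucket already exists
    serviceAccount: "{{ kv('GCP_CREDS') }}"
    projectId: "{{ kv('GCP_PROJECT_ID') }}"

  - id: create_bq_dataset
    type: io.kestra.plugin.gcp.bigquery.CreateDataset
    name: "{{ kv('GCP_DATASET') }}"
    ifExists: SKIP
    serviceAccount: "{{ kv('GCP_CREDS') }}"
    projectId: "{{ kv('GCP_PROJECT_ID') }}"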
graph LR
SetLabel[Set Labels] --> Extract[Extract CSV Data]
Extract --> UploadToGCS[Upload Data to GCS]
UploadToGCS -->|Taxi=Yellow| BQYellowTripdata[Main Yellow Tripdata Table]:::yellow
UploadToGCS -->|Taxi=Green| BQGreenTripdata[Main Green Tripdata Table]:::green
BQYellowTripdata --> BQYellowTableExt[External Table]:::yellow
BQGreenTripdata --> BQGreenTableExt[External Table]:::green
BQYellowTableExt --> BQYellowTableTmp[Monthly Table]:::yellow
BQGreenTableExt --> BQGreenTableTmp[Monthly Table]:::green
BQYellowTableTmp --> BQYellowMerge[Merge to Main Table]:::yellow
BQGreenTableTmp --> BQGreenMerge[Merge to Main Table]:::green
BQYellowMerge --> PurgeFiles[Purge Files]
BQGreenMerge --> PurgeFiles[Purge Files]
classDef yellow fill:#FFD700,stroke:#000,stroke-width:1px;
classDef green fill:#32CD32,stroke:#000,stroke-width:1px;
The flow code: 06_gcp_taxi.yaml.
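Two representative tasks from that flow, sketched with simplified file names and property names (the real flow parameterizes the taxi type, year, and month, and adds the monthly and merge steps shown in the diagram):

  - id: upload_to_gcs
    type: io.kestra.plugin.gcp.gcs.Upload
    from: "{{ outputs.extract.outputFiles['green_tripdata_2019-01.csv'] }}"   # hypothetical output of an earlier extract task
    to: "gs://{{ kv('GCP_BUCKET_NAME') }}/green_tripdata_2019-01.csv"

  - id: bq_green_external_table
    type: io.kestra.plugin.gcp.bigquery.Query
    serviceAccount: "{{ kv('GCP_CREDS') }}"
    projectId: "{{ kv('GCP_PROJECT_ID') }}"
    sql: |
      CREATE OR REPLACE EXTERNAL TABLE `{{ kv('GCP_PROJECT_ID') }}.{{ kv('GCP_DATASET') }}.green_tripdata_2019_01_ext`
      OPTIONS (
        format = 'CSV',
        uris = ['gs://{{ kv('GCP_BUCKET_NAME') }}/green_tripdata_2019-01.csv'],
        skip_leading_rows = 1
      );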
We can now schedule the same pipeline shown above to run daily at 9 AM UTC for the green dataset and at 10 AM UTC for the yellow dataset. You can backfill historical data directly from the Kestra UI.
Since we now process data in a cloud environment with infinitely scalable storage and compute, we can backfill the entire dataset for both the yellow and green taxi data without the risk of running out of resources on our local machine.
The flow code: 06_gcp_taxi_scheduled.yaml.
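The staggered schedule translates into two triggers on the flow, along the lines of this sketch (again assuming a taxi input on the flow):

triggers:
  - id: green_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 * * *"       # green dataset daily at 09:00 UTC
    inputs:
      taxi: green

  - id: yellow_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 10 * * *"      # yellow dataset daily at 10:00 UTC
    inputs:
      taxi: yellow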
Now that we have raw data ingested into BigQuery, we can use dbt to transform that data. The flow will sync the dbt models from Git to Kestra and run the dbt build command to build the models:
graph LR
Start[Select dbt command] --> Sync[Sync Namespace Files]
Sync --> Build[Run dbt Build Command]
The flow code: 07_gcp_dbt.yaml.
- Check Kestra Docs
- Explore our Blueprints library
- Browse over 600 plugins available in Kestra
- Give us a star on GitHub
- Join our Slack community if you have any questions
- Find all the videos in this YouTube Playlist
If you encounter an error similar to:
BigQueryError{reason=invalid, location=null,
message=Error while reading table: kestra-sandbox.zooomcamp.yellow_tripdata_2020_01,
error message: CSV table references column position 17, but line contains only 14 columns.;
line_number: 2103925 byte_offset_to_start_of_line: 194863028
column_index: 17 column_name: "congestion_surcharge" column_type: NUMERIC
File: gs://anna-geller/yellow_tripdata_2020-01.csv}
It means that the CSV file you're trying to load into BigQuery has a mismatch in the number of columns between the external source table (i.e. the file in GCS) and the destination table in BigQuery. This can happen when, due to network or transfer issues, the file is not fully downloaded from GitHub or not correctly uploaded to GCS. The error suggests a schema issue, but that's not the actual cause. Simply rerun the entire execution, including re-downloading the CSV file and re-uploading it to GCS; this should resolve the issue.
Did you take notes? You can share them by creating a PR to this file!
- Notes from Manuel Guerra
- Add your notes above this line