This project provides a small local demo of a warehouse import pipeline:
- Fetches batches from a REST API
- Converts records to Apache Arrow tables
- Appends into an Iceberg table stored in MinIO (S3-compatible)
- Create and activate a virtual environment:

  ```bash
  python -m venv .venv
  source .venv/bin/activate
  ```

- Install dependencies:

  ```bash
  pip install -e ".[dev]"
  ```

- Copy `env.example` to `.env` and adjust if needed.
- Start MinIO (and Temporal if you want workflow mode):

  ```bash
  docker compose up -d
  ```
- Run the import locally:

  ```bash
  warehouse-pipeline run
  ```

- Start the worker:

  ```bash
  python -m warehouse_pipeline.temporal.worker
  ```

- Submit a workflow run:

  ```bash
  warehouse-pipeline temporal-run
  ```

- Open the Temporal UI: http://localhost:8233
- MinIO console: http://localhost:9001
- Default credentials: `MINIO_ACCESS_KEY` / `MINIO_SECRET_KEY` (both default to `minioadmin`)
- Bucket: `warehouse`
- Iceberg paths:
  - Data files: `iceberg/local/posts/data/`
  - Metadata: `iceberg/local/posts/metadata/`
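One way to point pyiceberg at this layout is a catalog entry in `.pyiceberg.yaml`. This is a sketch, not the project's actual configuration: the catalog name, SQLite filename, and warehouse path are assumptions; the endpoint and credentials mirror the docker-compose defaults above.

```yaml
# Hypothetical .pyiceberg.yaml for the local stack described above.
catalog:
  local:
    type: sql
    uri: sqlite:///iceberg_catalog.db        # local SQLite catalog (filename assumed)
    warehouse: s3://warehouse/iceberg        # "warehouse" bucket in MinIO
    s3.endpoint: http://localhost:9000       # MinIO S3 API (console is on 9001)
    s3.access-key-id: minioadmin
    s3.secret-access-key: minioadmin
```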
```mermaid
flowchart LR
    RestAPI[REST_API] --> Fetch[Fetch_batches]
    Fetch --> Arrow[Arrow_tables]
    Arrow --> Iceberg[Iceberg_table]
    Iceberg --> MinIO[MinIO_S3]
```
```mermaid
flowchart LR
    subgraph client [CLI_and_Worker]
        CLI[CLI_run]
        Worker[Temporal_worker]
    end
    subgraph temporal [Temporal]
        Server[Temporal_server]
        UI[Temporal_UI]
    end
    subgraph storage [Storage]
        Catalog[SQLite_catalog]
        Iceberg[Iceberg_table_format]
        Minio[MinIO_S3]
    end
    Source[REST_API]
    Arrow[Arrow_tables]
    CLI -->|local_run| Source
    CLI -->|local_run| Arrow
    Arrow -->|data_files| Iceberg
    CLI -->|metadata_ops| Catalog
    Iceberg -->|data_files| Minio
    CLI -->|temporal_run| Server
    Worker -->|polls_task_queue| Server
    Worker -->|activities| Source
    Worker -->|activities| Arrow
    Worker -->|metadata_ops| Catalog
    UI --> Server
```
- Local-first setup: MinIO provides S3-compatible storage and SQLite is used for the Iceberg catalog, so the whole stack runs locally.
- Explicit schemas: Arrow and Iceberg schemas are defined up front to make data types deterministic and to keep the demo stable.
- Batch pagination: The REST source pulls pages using `_page` and `_limit` parameters to keep memory usage predictable.
- Async + Temporal: Async fetch is used for efficient I/O, and Temporal workflows split fetch/append into activities for retries and orchestration.
- Config via env: All runtime settings are driven by environment variables to keep the code portable across local and CI runs.
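The pagination decision above can be sketched as a small generator. The function and parameter names here are illustrative, not the project's API, and the fetcher is injected so the sketch stays synchronous; the real source fetches asynchronously over HTTP with `_page`/`_limit` query parameters.

```python
from typing import Callable, Iterator


def paginate(
    fetch_page: Callable[[int, int], list[dict]],
    limit: int = 100,
) -> Iterator[list[dict]]:
    """Yield record batches until the source returns an empty page.

    `fetch_page(page, limit)` stands in for an HTTP GET with `_page` and
    `_limit` query parameters. Only one page is held in memory at a time,
    which is what keeps memory usage predictable.
    """
    page = 1
    while True:
        batch = fetch_page(page, limit)
        if not batch:
            return
        yield batch
        page += 1
```

Because the fetcher is a plain callable, the loop can be exercised with an in-memory fake before wiring it to the real REST source.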
- The default source is `jsonplaceholder.typicode.com/posts`.
- The Iceberg catalog uses a local SQLite file and stores data in MinIO.
- Temporal runs on `localhost:7233` by default (configure via `TEMPORAL_ADDRESS`).
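Resolving the Temporal address from the environment can be as simple as the helper below. The helper name is hypothetical; only the `TEMPORAL_ADDRESS` variable and its default come from the notes above.

```python
import os
from typing import Mapping, Optional


def temporal_address(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the Temporal server address, falling back to the local default.

    Reads TEMPORAL_ADDRESS (the variable named in the notes); the default
    matches the local dev server on localhost:7233.
    """
    source = os.environ if env is None else env
    return source.get("TEMPORAL_ADDRESS", "localhost:7233")
```

A worker or client would then pass this address to `temporalio.client.Client.connect`, though that wiring is not shown here.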