ETLCase is a high-performance, async-first ETL framework designed to integrate Large Language Models (LLMs) directly into data processing workflows. It supports both high-throughput Batch processing and infinite Continuous streaming, providing real-time observability and AI-driven data analysis.
The framework is divided into three primary layers (the pipeline engine, the LLM gateway, and the real-time UI), built from the following components:
- Pipeline: The central orchestrator. It uses a decorator-based API to define tasks and dependencies. It automatically constructs a Directed Acyclic Graph (DAG) and performs a topological sort to ensure tasks run in the correct order.
- PipelineContext: A thread-safe container passed to every task, holding artifacts, execution history, and the event log.
- TaskInput: A unified interface that allows tasks to iterate over inputs (async for name, data in inputs) regardless of whether the pipeline is in Batch (one-off) or Continuous (queue-based) mode.
- Profiling: Every task is monitored for duration, record count, status, and errors. A rolling event log tracks system-level signals.
- LLMGateway: A unified gateway for AI interactions. It supports:
- Text Completion: Standard prompt/response.
- Vision API: Seamlessly handles image bytes in the prompt.
- Structured Output: Uses Pydantic models to force the LLM to return valid JSON, which is automatically parsed into Python objects.
- Real-time UI: A Streamlit-based dashboard that polls the running engine.
- Live Profiling: Visualizes the status of every task, showing "Records Out" and real-time durations.
- AI Analysis History: Displays findings from @DEF.analysis decorators. It maintains a historical list of results, showing the most recent insights at the top.
- Usage Metrics: Breakdowns of LLM costs and token counts per provider.
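The Structured Output path described above can be sketched with Pydantic. The LLM call itself is simulated here by a hand-written JSON reply so the parsing step stands alone; the Sentiment model is a hypothetical example schema, not part of the framework:

```python
from pydantic import BaseModel

class Sentiment(BaseModel):
    label: str
    confidence: float

# The gateway asks the LLM for JSON matching the schema, then parses it.
# Here we simulate the LLM's raw reply to show only the parsing step.
raw_reply = '{"label": "positive", "confidence": 0.92}'
result = Sentiment.model_validate_json(raw_reply)
print(result.label, result.confidence)
```

Because the schema is enforced at parse time, a malformed reply raises a validation error instead of silently producing bad data.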
The framework includes optimized utilities for common AI-assisted data tasks:
generate_structured_column (in src.engine.utils.llm) generates a new structured column in a Polars DataFrame using an LLM. It is designed for maximum efficiency:
- Automatic Batching: If the DataFrame contains multiple rows, it dynamically creates a wrapper Pydantic model to request a list of objects from the LLM in a single API call.
- Schema Enforcement: Uses Pydantic to ensure the LLM output matches your required structure.
- Integration: Returns a Polars DataFrame with the new column containing structured data (Structs).
A powerful diagnostic tool for comparing two datasets (e.g., current chunk vs. historical data):
- Statistical Analysis: Automatically calculates numerical distributions (mean, std, min, max, etc.) for both datasets.
- Keyword Scanning: Scans text columns for specific error signals like "error", "fail", or "timeout".
- AI Summary: Constructs a comprehensive report of statistical shifts and textual keyword matches, then uses the LLM to explain the differences in human-readable terms.
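The keyword-scanning step can be illustrated in plain Python. The signal list mirrors the one above; the function name and its exact counting behavior are illustrative, not the tool's actual internals:

```python
ERROR_SIGNALS = ("error", "fail", "timeout")

def scan_text_column(values):
    """Count rows in a text column containing each error signal (case-insensitive)."""
    counts = {signal: 0 for signal in ERROR_SIGNALS}
    for value in values:
        lowered = value.lower()
        for signal in ERROR_SIGNALS:
            if signal in lowered:
                counts[signal] += 1
    return counts

hits = scan_text_column(["Timeout while connecting", "OK", "request FAILED"])
print(hits)  # {'error': 0, 'fail': 1, 'timeout': 1}
```

These per-signal counts, alongside the numerical distribution deltas, are what get folded into the report handed to the LLM for the final summary.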
Pipelines are defined in Python files within the pipelines/ directory.
```python
import polars as pl

from src.engine.pipeline import Pipeline
from src.engine.common import ETLStage

# continuous=True enables asyncio.Queue-based streaming
DEF = Pipeline(continuous=False)
```

Use the @DEF.task decorator. Tasks are grouped by ETLStage (Extract, Transform, Load).

```python
@DEF.task(stage=ETLStage.Extract, name="load_data")
async def extract(ctx):
    yield pl.read_csv("data.csv")

@DEF.task(stage=ETLStage.Transform, name="clean_data", dependencies=["load_data"])
async def transform(ctx, inputs):
    async for name, df in inputs:
        # data processing logic
        yield df.filter(pl.col("value") > 0)
```

Analyses run in the background as soon as a task emits data. They do not block the main data flow.
```python
@DEF.analysis(task_name="clean_data", name="Anomaly Detector")
async def check_for_weirdness(ctx, current_output, previous_output):
    if current_output.height > 100:
        return "Warning: Unusually large data chunk detected."
    return None  # Returning None hides the result from the UI
```

The framework is optimized for multi-modal data. You can pipe raw image bytes from an Extract task into a Transform task that uses gateway.complete(images=[...]) to describe or OCR the images, then load the resulting metadata into a database.
Instead of writing complex regex or validation rules, use @DEF.analysis to let an LLM "watch" the data.
- Example: In the money pipeline, an LLM analyzes joined datasets to report on the human impact of failed transactions.
- Filtering: If the LLM finds nothing important, the analysis returns None, keeping the dashboard clutter-free.
Using src.engine.utils.llm.generate_structured_column, you can process entire DataFrames in a single LLM call. The utility automatically batches rows into a structured list prompt, significantly reducing latency and API costs compared to row-by-row processing.
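The automatic batching behind this can be sketched with pydantic.create_model: a list-of-rows wrapper schema is built on the fly so one request covers every row. The model names and the simulated reply below are illustrative, not the utility's actual internals:

```python
from typing import List
from pydantic import BaseModel, create_model

class ProductInfo(BaseModel):
    category: str
    rating: float

# Wrap the per-row schema in a list so a single LLM call covers all rows.
BatchModel = create_model("ProductInfoBatch", rows=(List[ProductInfo], ...))

# Simulate the single JSON reply the LLM would return for a 2-row DataFrame.
reply = '{"rows": [{"category": "toys", "rating": 4.5}, {"category": "books", "rating": 3.0}]}'
batch = BatchModel.model_validate_json(reply)
print([row.category for row in batch.rows])  # ['toys', 'books']
```

The parsed list of structs can then be attached to the DataFrame as the new column, one element per row.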
Set continuous=True to build pipelines that never stop.
- Extract: Polls an API, watches a folder for new files, or listens to a socket.
- Transform/Load: Process data as it arrives.
- Observability: The dashboard updates live as every individual chunk passes through the system.
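The queue-based handoff behind continuous=True can be sketched with stdlib asyncio. This is a simplified stand-in for the engine's streaming mode, not its actual code: the extractor emits a few chunks and a sentinel, while the transformer processes each chunk as it arrives rather than waiting for a complete dataset:

```python
import asyncio

async def extract(queue):
    # Stand-in for polling an API or watching a folder: emit three chunks, then stop.
    for chunk in ([1, 2], [3], [4, 5, 6]):
        await queue.put(chunk)
    await queue.put(None)  # sentinel: no more data

async def transform(queue, results):
    # Process chunks as they arrive instead of waiting for the full dataset.
    while (chunk := await queue.get()) is not None:
        results.append([x * 10 for x in chunk])

async def main():
    queue = asyncio.Queue()
    results = []
    await asyncio.gather(extract(queue), transform(queue, results))
    return results

print(asyncio.run(main()))  # [[10, 20], [30], [40, 50, 60]]
```

In a real continuous pipeline the extract loop never terminates, so no sentinel is sent and downstream tasks simply keep consuming.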
- Start the Dashboard: `uv run streamlit run src/app/dashboard.py`
- Select a Pipeline: Use the sidebar to choose between money, products, or micro.
- Run: Click "Run" to see the engine execute the DAG, track costs, and display AI insights in real-time.