Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 21 additions & 72 deletions kale/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2026 The Kubeflow Authors.
# Copyright 2026 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,20 +45,15 @@ def random_string(size=5, chars=string.ascii_lowercase + string.digits):


def abs_working_dir(path):
    """Get the absolute working directory for ``path``.

    Args:
        path: A file or directory path, absolute or relative to the current
            working directory.

    Returns:
        The absolute path of the parent directory when ``path`` is an
        existing regular file; otherwise the absolute form of ``path``
        itself (this includes directories and paths that do not exist).
    """
    abs_path = os.path.abspath(path)
    # Only an existing file is replaced by its containing directory.
    if os.path.isfile(path):
        abs_path = os.path.dirname(abs_path)
    return abs_path


def rm_r(path, ignore_missing=True, silent=False):
"""Remove a file or directory.

Similar to rm -r. If the path does not exist and ignore_missing is False,
OSError is raised, otherwise it is ignored.
If silent is True, nothing is raised.
"""
"""Remove a file or directory."""

def onerror(function, path, excinfo):
# Function to handle ENOENT in shutil.rmtree()
Expand All @@ -68,6 +63,7 @@ def onerror(function, path, excinfo):
raise e

log.info("Removing path `%s'", path)
log.debug("Attempting to remove path: %s", path)

try:
if os.path.isfile(path) or os.path.islink(path):
Expand All @@ -83,6 +79,7 @@ def onerror(function, path, excinfo):
# the exception handler handle it (i.e., check ignore_missing etc.)
raise OSError(errno.ENOENT, "No such file or directory", path)
except OSError as e:
log.debug("Error while removing path: %s", path)
if silent:
log.debug("Path `%s' does not exist, skipping removing it", path)
return
Expand All @@ -91,11 +88,15 @@ def onerror(function, path, excinfo):
raise


def remove_ansi_color_sequences(text):
    """Strip ANSI color escape sequences (``ESC [ ... m``) from ``text``."""
    return re.sub(r"\x1B\[[0-9;]*m", "", text)
def remove_ansi_sequences(text: str) -> str:
    """Strip ANSI escape sequences from ``text``.

    Both two-character escapes (``ESC`` followed by one byte in ``@``-``_``)
    and CSI sequences (``ESC [`` parameters, intermediates, final byte) are
    removed. A value that is not a ``str`` is returned untouched.
    """
    if isinstance(text, str):
        pattern = r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])"
        return re.sub(pattern, "", text)
    return text

def comment_magic_commands(code):
"""Comment the magic commands in a code block."""
Expand Down Expand Up @@ -128,21 +129,7 @@ def is_ipython() -> bool:


def graceful_exit(exit_code):
"""Exit the program gracefully.

Running the function `sys.exit()` raises a special exception `SystemExit`
that is not caught by the python REPL, making it exit the program.
IPython's REPL, instead, does catch `SystemExit`. It displays the message
and then goes back to the REPL.

Code that could either run in an IPython kernel (because the Kale pipeline
was produced from a notebook) or in a standard Python process, needs to
handle the exit process seamlessly, regardless of where it's running.

In case the code is running inside an IPython kernel, this function raises
a `KaleGracefulExit` exception. This exception is expected to be captured
inside the `kale.common.jputils.capture_streams` function.
"""
"""Exit the program gracefully."""
if is_ipython():
from kale.common.jputils import KaleGracefulExit

Expand Down Expand Up @@ -182,51 +169,23 @@ def clean_dir(path: str):


def shorten_long_string(obj: Any, chars: int = 75):
    """Shorten the string representation of the input object.

    Args:
        obj: Any object; it is converted with ``str()``.
        chars: Number of characters to keep from each end of the string.

    Returns:
        ``str(obj)`` unchanged when it is at most ``2 * chars`` characters
        long; otherwise its first ``chars`` and last ``chars`` characters
        joined by ``" ..... "``.
    """
    str_input = str(obj)
    # Short strings are returned as-is: slicing both ends would overlap and
    # duplicate content around the separator.
    if len(str_input) <= chars * 2:
        return str_input
    # Index the tail from the front: ``str_input[-chars:]`` would yield the
    # whole string when ``chars`` is 0.
    tail = str_input[len(str_input) - chars:]
    return str_input[:chars] + " ..... " + tail


def dedent(text: str):
    """Remove the shortest common leading-whitespace prefix from every line.

    Args:
        text: Multiline string.

    Returns:
        ``text`` with as many leading characters stripped from each line as
        the shortest leading-whitespace run. ``text`` is returned unchanged
        when it is empty or when fewer whitespace runs are found than there
        are lines (i.e. at least one line is not indented).
    """
    indents = re.findall(r"(?m)^\s+", text)
    # ``not indents`` guards the empty-string case, where both counts are 0
    # and ``min()`` would otherwise raise ValueError.
    if not indents or len(indents) < len(text.splitlines()):
        return text
    shortest = min(map(len, indents))
    return re.sub(r"(?m)^.{%d}" % shortest, "", text)


def compute_pip_index_urls() -> list[str]:
"""Compute the list of pip simple index URLs for generated KFP components.

Using a local PyPI index is useful when iterating on local
development with an unpublished version of Kale.

Precedence:
1. If `KALE_PIP_INDEX_URLS` is set, split its comma-separated value and
return that list (order preserved).
2. Else, if `KALE_DEV_MODE` is truthy (`1`, `true`, `yes`, or `on`),
return a list with the devpi simple URL (`KALE_DEVPI_SIMPLE_URL`) or
its default value.
3. Otherwise, return the production default:
["https://pypi.org/simple"].

Environment variables:
KALE_PIP_INDEX_URLS:
Comma-separated list of PEP 503 “simple” index URLs. Highest
priority.
KALE_DEV_MODE:
Boolean-like flag enabling dev mode (interprets 1/true/yes/on).
KALE_DEVPI_SIMPLE_URL:
Devpi “simple” index URL used when dev mode is enabled.

Returns:
list[str]: Index URLs suitable for the `pip_index_urls` parameter in
`@kfp_dsl.component`.
"""
"""Compute pip index URLs."""
pypi_prod_url = "https://pypi.org/simple"
urls: list[str]

Expand Down Expand Up @@ -261,17 +220,7 @@ def compute_pip_index_urls() -> list[str]:


def compute_trusted_hosts() -> list[str]:
"""Compute the list of trusted hosts for use with pip.

If the user sets a comma-separated list of hosts in the
KALE_PIP_TRUSTED_HOSTS environment variable, those hosts
are used as trusted hosts; otherwise the list is
empty.


:return: The list of trusted hosts configured by the user
:rtype: list[str]
"""
"""Compute trusted hosts."""
pip_trusted_hosts = os.getenv("KALE_PIP_TRUSTED_HOSTS")
trusted_hosts: list[str] = []
if pip_trusted_hosts:
Expand Down
76 changes: 63 additions & 13 deletions kale/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Pipeline Module

This module defines the core structure and execution logic of a Kale pipeline.

Overview:
- A Pipeline is represented as a directed acyclic graph (DAG) of Steps
- Each Step represents a unit of execution
- Dependencies between steps define execution order

Key Features:
- Uses networkx.DiGraph for DAG representation
- Supports topological sorting for execution
- Handles pipeline parameters and their propagation
- Provides utilities for dependency tracking and graph traversal

Execution Flow:
1. Steps are added to the pipeline
2. Dependencies are defined between steps
3. The pipeline is executed in topological order
4. Each step receives pipeline parameters and executes accordingly

This module acts as the central orchestration layer connecting
steps, configuration, and execution logic.
"""

from collections.abc import Iterable
import copy
import logging
Expand Down Expand Up @@ -190,13 +216,22 @@ def _set_marshal_path(self):


class Pipeline(nx.DiGraph):
"""A Pipeline that can be converted into a KFP pipeline.

This class is used to define a pipeline, its steps and all its
configurations. It extends nx.DiGraph to exploit some graph-related
algorithms but provides helper functions to work with Step objects
instead of standard networkx "nodes". This makes it simpler to access
the steps of the pipeline and their attributes.
"""Represents a Kale pipeline as a Directed Acyclic Graph (DAG).

The Pipeline class extends networkx.DiGraph to model execution
dependencies between Step objects. Each node corresponds to a Step,
and edges represent execution order.

Responsibilities:
- Manage step addition and dependency linking
- Maintain pipeline parameters
- Provide execution order via topological sorting
- Offer utilities to inspect pipeline structure

Attributes:
config (PipelineConfig): Pipeline configuration metadata
pipeline_parameters (dict): Mapping of parameter names to PipelineParam
processor: Optional processor used during execution
"""

def __init__(self, config: PipelineConfig, *args, **kwargs):
Expand All @@ -212,15 +247,30 @@ def run(self):
step.run(self.pipeline_parameters)

def add_step(self, step: Step):
    """Add a Step to the pipeline.

    The step is stored as a node of the underlying graph, keyed by its
    name, with the Step object attached as the node's ``step`` attribute.

    Args:
        step (Step): The step to add.

    Raises:
        RuntimeError: If ``step`` is not a Step instance, or if a step
            with the same name is already present in the pipeline.
    """
    if not isinstance(step, Step):
        raise RuntimeError("Not of type Step.")
    # Node keys are step names, so a duplicate name would silently merge
    # into the existing node instead of creating a new one.
    if step.name in self.steps_names:
        raise RuntimeError(f"Step with name '{step.name}' already exists")
    self.add_node(step.name, step=step)

def add_dependency(self, parent: Step, child: Step):
    """Link two Steps with a directed edge from ``parent`` to ``child``.

    The edge is keyed by the steps' names.

    Args:
        parent (Step): The upstream step.
        child (Step): The downstream step.
    """
    # NOTE(review): networkx creates missing nodes implicitly on add_edge,
    # and such nodes would carry no ``step`` attribute -- confirm callers
    # always register both steps with add_step first.
    self.add_edge(parent.name, child.name)

def get_step(self, name: str) -> Step:
Expand Down Expand Up @@ -282,12 +332,12 @@ def _steps_iterable(self, step_names: Iterable[str]) -> Iterable[Step]:
yield self.get_step(name)

def get_leaf_steps(self):
    """Retrieve all leaf steps in the pipeline.

    A step is a leaf when its out-degree is 0, i.e. it has no children.
    In-degree is not checked, so a step with no edges at all is also
    reported as a leaf.

    Returns:
        list[Step]: List of leaf steps.
    """
    return [x for x in self.steps if self.out_degree(x.name) == 0]

Expand Down
55 changes: 52 additions & 3 deletions kale/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,24 @@ class StepConfig(Config):


class Step:
"""Class used to store information about a Step of the pipeline."""
"""Represents a single execution unit within a Kale pipeline.

A Step encapsulates:
- The source code or callable to execute
- Input and output data
- Associated artifacts and pipeline parameters
- Execution configuration (via StepConfig)

Steps are connected through dependencies to form a pipeline DAG
and are executed either locally or within a Kubeflow pipeline.

Attributes:
source (list[str] | Callable): Code or function executed by the step
ins (list[Any]): Input variables for the step
outs (list[Any]): Output variables produced by the step
artifacts (list[Artifact]): Artifacts associated with the step
parameters (dict[str, PipelineParam]): Pipeline parameters consumed
"""

def __init__(
self, source: list[str] | Callable, ins: list[Any] = None, outs: list[Any] = None, **kwargs
Expand All @@ -76,7 +93,10 @@ def __init__(
self.fns_free_variables = {}

def __call__(self, *args, **kwargs):
    """Handle a call on the Step object, e.g. a @step decorated function.

    The call is delegated to the module-level ``execution_handler`` with
    this step followed by the received arguments.

    Args:
        *args: Positional arguments forwarded to the execution handler.
        **kwargs: Keyword arguments forwarded to the execution handler.

    Returns:
        Whatever the execution handler returns.
    """
    return execution_handler(self, *args, **kwargs)

def add_artifact(self, artifact_name, artifact_type, is_input):
Expand Down Expand Up @@ -105,7 +125,19 @@ def add_artifact(self, artifact_name, artifact_type, is_input):
self.artifacts.append(new_artifact)

def run(self, pipeline_parameters_values: dict[str, PipelineParam]):
"""Run the step locally."""
"""Execute the step locally.

This method:
- Selects the relevant pipeline parameters for this step
- Uses a Marshaller to execute the step's source
- Handles input/output serialization
- Links generated artifacts after execution

Args:
pipeline_parameters_values (dict[str, PipelineParam]):
Dictionary of all pipeline parameters and their values.

"""
log.info("%s Running step '%s'... %s", "-" * 10, self.name, "-" * 10)
# select just the pipeline parameters consumed by this step
_params = {k: pipeline_parameters_values[k] for k in self.parameters}
Expand Down Expand Up @@ -193,6 +225,23 @@ def kfp_outputs(self) -> list[Artifact]:


def __default_execution_handler(step: Step, *args, **kwargs):
"""
Default execution handler for a Step.

This handler is used when no pipeline execution context is set.
It attempts to execute the step's source as a plain Python function.

If the step source is not callable (e.g., generated from a Notebook),
local execution is not supported and a RuntimeError is raised.

Args:
step (Step): The step to execute.
*args: Positional arguments passed to the step function.
**kwargs: Keyword arguments passed to the step function.

Raises:
RuntimeError: If the step source is not callable.
"""
log.info("No Pipeline registration handler is set.")
if not callable(step.source):
raise RuntimeError(
Expand Down