diff --git a/kale/common/utils.py b/kale/common/utils.py index d28f2513e..19992892f 100644 --- a/kale/common/utils.py +++ b/kale/common/utils.py @@ -1,4 +1,4 @@ -# Copyright 2026 The Kubeflow Authors. +# Copyright 2026 The Kubeflow Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -45,7 +45,7 @@ def random_string(size=5, chars=string.ascii_lowercase + string.digits): def abs_working_dir(path): - """Get absolute path to parent dir.""" + """Get absolute working directory.""" abs_path = os.path.abspath(path) if os.path.isfile(path): abs_path = os.path.dirname(abs_path) @@ -53,12 +53,7 @@ def abs_working_dir(path): def rm_r(path, ignore_missing=True, silent=False): - """Remove a file or directory. - - Similar to rm -r. If the path does not exist and ignore_missing is False, - OSError is raised, otherwise it is ignored. - If silent is True, nothing is raised. - """ + """Remove a file or directory.""" def onerror(function, path, excinfo): # Function to handle ENOENT in shutil.rmtree() @@ -68,6 +63,7 @@ def onerror(function, path, excinfo): raise e log.info("Removing path `%s'", path) + log.debug("Attempting to remove path: %s", path) try: if os.path.isfile(path) or os.path.islink(path): @@ -83,6 +79,7 @@ def onerror(function, path, excinfo): # the exception handler handle it (i.e., check ignore_missing etc.) 
raise OSError(errno.ENOENT, "No such file or directory", path) except OSError as e: + log.debug("Error while removing path: %s", path) if silent: log.debug("Path `%s' does not exist, skipping removing it", path) return @@ -91,11 +88,15 @@ def onerror(function, path, excinfo): raise -def remove_ansi_color_sequences(text): - """Remove ANSI color sequences from text.""" - ansi_color_escape = re.compile(r"\x1B\[[0-9;]*m") - return ansi_color_escape.sub("", text) +def remove_ansi_sequences(text: str) -> str: + """Remove ANSI escape sequences from text.""" + if not isinstance(text, str): + return text + ansi_escape = re.compile( + r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])" + ) + return ansi_escape.sub("", text) def comment_magic_commands(code): """Comment the magic commands in a code block.""" @@ -128,21 +129,7 @@ def is_ipython() -> bool: def graceful_exit(exit_code): - """Exit the program gracefully. - - Running the function `sys.exit()` raises a special exception `SystemExit` - that is not caught by the python REPL, making it exit the program. - IPython's REPL, instead, does catch `SystemExit`. It displays the message - and then goes back to the REPL. - - Code that could either run in an IPython kernel (because the Kale pipeline - was produced from a notebook) or in a standard Python process, needs to - handle the exit process seamlessly, regardless of where it's running. - - In case the code is running inside an IPython kernel, this function raises - a `KaleGracefulExit` exception. This exception is expected to ke captured - inside the `kale.common.jputils.capture_streams` function. - """ + """Exit the program gracefully.""" if is_ipython(): from kale.common.jputils import KaleGracefulExit @@ -182,17 +169,15 @@ def clean_dir(path: str): def shorten_long_string(obj: Any, chars: int = 75): - """Shorten the string representation of the input object.""" + """Shorten long string.""" str_input = str(obj) - return str_input[:chars] + " ..... 
" + str_input[len(str_input) - chars :] + if len(str_input) <= chars * 2: + return str_input + return str_input[:chars] + " ..... " + str_input[-chars:] def dedent(text: str): - """Remove longest common prefix consisting of whitespaces. - - Args: - text: Multiline string - """ + """Dedent text.""" matches = re.findall(r"(?m)^\s+", text) if len(matches) < len(text.splitlines()): return text @@ -200,33 +185,7 @@ def dedent(text: str): def compute_pip_index_urls() -> list[str]: - """Compute the list of pip simple index URLs for generated KFP components. - - Using a local PyPI index is useful when itering on local - developement with an unpublished version of Kale. - - Precedence: - 1. If `KALE_PIP_INDEX_URLS` is set, split its comma-separated value and - return that list (order preserved). - 2. Else, if `KALE_DEV_MODE` is truthy (`1`, `true`, `yes`, or `on`), - return a list with the devpi simple URL (`KALE_DEVPI_SIMPLE_URL`) or - its default value. - 3. Otherwise, return the production default: - ["https://pypi.org/simple"]. - - Environment variables: - KALE_PIP_INDEX_URLS: - Comma-separated list of PEP 503 “simple” index URLs. Highest - priority. - KALE_DEV_MODE: - Boolean-like flag enabling dev mode (interprets 1/true/yes/on). - KALE_DEVPI_SIMPLE_URL: - Devpi “simple” index URL used when dev mode is enabled. - - Returns: - list[str]: Index URLs suitable for the `pip_index_urls` parameter in - `@kfp_dsl.component`. - """ + """Compute pip index URLs.""" pypi_prod_url = "https://pypi.org/simple" urls: list[str] @@ -261,17 +220,7 @@ def compute_pip_index_urls() -> list[str]: def compute_trusted_hosts() -> list[str]: - """Compute the list of trusted hosts for use with pip. - - If user set a comma separated list of hosts using the - system property KALE_PIP_TRUSTED_HOSTS, then these hosts - will be used as trusted hosts, otherwise the list will be - empty. 
- - - :return: The list of trusted hosts configured by the user - :rtype: list[str] - """ + """Compute trusted hosts.""" pip_trusted_hosts = os.getenv("KALE_PIP_TRUSTED_HOSTS") trusted_hosts: list[str] = [] if pip_trusted_hosts: diff --git a/kale/pipeline.py b/kale/pipeline.py index a327b9650..3e8df9a90 100644 --- a/kale/pipeline.py +++ b/kale/pipeline.py @@ -12,6 +12,32 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +Pipeline Module + +This module defines the core structure and execution logic of a Kale pipeline. + +Overview: +- A Pipeline is represented as a directed acyclic graph (DAG) of Steps +- Each Step represents a unit of execution +- Dependencies between steps define execution order + +Key Features: +- Uses networkx.DiGraph for DAG representation +- Supports topological sorting for execution +- Handles pipeline parameters and their propagation +- Provides utilities for dependency tracking and graph traversal + +Execution Flow: +1. Steps are added to the pipeline +2. Dependencies are defined between steps +3. The pipeline is executed in topological order +4. Each step receives pipeline parameters and executes accordingly + +This module acts as the central orchestration layer connecting +steps, configuration, and execution logic. +""" + from collections.abc import Iterable import copy import logging @@ -190,13 +216,22 @@ def _set_marshal_path(self): class Pipeline(nx.DiGraph): - """A Pipeline that can be converted into a KFP pipeline. - - This class is used to define a pipeline, its steps and all its - configurations. It extends nx.DiGraph to exploit some graph-related - algorithms but provides helper functions to work with Step objects - instead of standard networkx "nodes". This makes it simpler to access - the steps of the pipeline and their attributes. + """Represents a Kale pipeline as a Directed Acyclic Graph (DAG). 
+ + The Pipeline class extends networkx.DiGraph to model execution + dependencies between Step objects. Each node corresponds to a Step, + and edges represent execution order. + + Responsibilities: + - Manage step addition and dependency linking + - Maintain pipeline parameters + - Provide execution order via topological sorting + - Offer utilities to inspect pipeline structure + + Attributes: + config (PipelineConfig): Pipeline configuration metadata + pipeline_parameters (dict): Mapping of parameter names to PipelineParam + processor: Optional processor used during execution """ def __init__(self, config: PipelineConfig, *args, **kwargs): @@ -212,7 +247,17 @@ def run(self): step.run(self.pipeline_parameters) def add_step(self, step: Step): - """Add a new Step to the pipeline.""" + """Add a Step to the pipeline. + + Each step must have a unique name. The step is stored as a node + in the underlying graph. + + Args: + step (Step): The step to add. + + Raises: + RuntimeError: If the object is not a Step or its name already exists. + """ if not isinstance(step, Step): raise RuntimeError("Not of type Step.") if step.name in self.steps_names: @@ -220,7 +265,14 @@ def add_step(self, step: Step): self.add_node(step.name, step=step) def add_dependency(self, parent: Step, child: Step): - """Link two Steps in the pipeline.""" + """Define an execution dependency between two steps. + + The child step will only execute after the parent step completes. + + Args: + parent (Step): The upstream step. + child (Step): The downstream step. + """ self.add_edge(parent.name, child.name) def get_step(self, name: str) -> Step: @@ -282,12 +334,12 @@ def _steps_iterable(self, step_names: Iterable[str]) -> Iterable[Step]: yield self.get_step(name) def get_leaf_steps(self): - """Get the list of leaf steps of the pipeline. + """Retrieve all leaf steps in the pipeline. - A step is considered a leaf when its in-degree is > 0 and its - out-degree is 0.
+ A leaf step has no outgoing dependencies (i.e., no children). - Returns (list): A list of leaf Steps. + Returns: + list[Step]: List of leaf steps. """ return [x for x in self.steps if self.out_degree(x.name) == 0] diff --git a/kale/step.py b/kale/step.py index b3a55af59..1ad746a78 100644 --- a/kale/step.py +++ b/kale/step.py @@ -55,7 +55,24 @@ class StepConfig(Config): class Step: - """Class used to store information about a Step of the pipeline.""" + """Represents a single execution unit within a Kale pipeline. + + A Step encapsulates: + - The source code or callable to execute + - Input and output data + - Associated artifacts and pipeline parameters + - Execution configuration (via StepConfig) + + Steps are connected through dependencies to form a pipeline DAG + and are executed either locally or within a Kubeflow pipeline. + + Attributes: + source (list[str] | Callable): Code or function executed by the step + ins (list[Any]): Input variables for the step + outs (list[Any]): Output variables produced by the step + artifacts (list[Artifact]): Artifacts associated with the step + parameters (dict[str, PipelineParam]): Pipeline parameters consumed + """ def __init__( self, source: list[str] | Callable, ins: list[Any] = None, outs: list[Any] = None, **kwargs @@ -76,7 +93,11 @@ def __init__( self.fns_free_variables = {} def __call__(self, *args, **kwargs): - """Handler for when the @step decorated function is called.""" + """Invoke the step execution handler. + + This method is triggered when the step is called like a function. + It delegates execution to the configured execution handler. + """ return execution_handler(self, *args, **kwargs) def add_artifact(self, artifact_name, artifact_type, is_input): @@ -105,7 +126,19 @@ def add_artifact(self, artifact_name, artifact_type, is_input): self.artifacts.append(new_artifact) def run(self, pipeline_parameters_values: dict[str, PipelineParam]): - """Run the step locally.""" + """Execute the step locally.
+ + This method: + - Selects the relevant pipeline parameters for this step + - Uses a Marshaller to execute the step's source + - Handles input/output serialization + - Links generated artifacts after execution + + Args: + pipeline_parameters_values (dict[str, PipelineParam]): + Dictionary of all pipeline parameters and their values. + + """ log.info("%s Running step '%s'... %s", "-" * 10, self.name, "-" * 10) # select just the pipeline parameters consumed by this step _params = {k: pipeline_parameters_values[k] for k in self.parameters} @@ -193,6 +226,22 @@ def kfp_outputs(self) -> list[Artifact]: def __default_execution_handler(step: Step, *args, **kwargs): + """Default execution handler for a Step. + + This handler is used when no pipeline execution context is set. + It attempts to execute the step's source as a plain Python function. + + If the step source is not callable (e.g., generated from a Notebook), + local execution is not supported and a RuntimeError is raised. + + Args: + step (Step): The step to execute. + *args: Positional arguments passed to the step function. + **kwargs: Keyword arguments passed to the step function. + + Raises: + RuntimeError: If the step source is not callable. + """ log.info("No Pipeline registration handler is set.") if not callable(step.source): raise RuntimeError(