From 1a3466a969fab70b65d4c77664e966ef6bafe02c Mon Sep 17 00:00:00 2001 From: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> Date: Tue, 17 Mar 2026 19:01:03 +0530 Subject: [PATCH 1/7] utils.py Signed-off-by: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> --- kale/common/utils.py | 58 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/kale/common/utils.py b/kale/common/utils.py index d28f2513e..44ee340fe 100644 --- a/kale/common/utils.py +++ b/kale/common/utils.py @@ -40,12 +40,32 @@ def main_source_lives_in_cwd(): def random_string(size=5, chars=string.ascii_lowercase + string.digits): - """Generate random string.""" + """ + Generate a random string of specified length. + + Args: + size (int): Length of the generated string. Defaults to 5. + chars (str): Characters to choose from. Defaults to lowercase letters and digits. + + Returns: + str: Randomly generated string. + + Note: + Uses Python's random module and is not cryptographically secure. + """ return "".join(random.choice(chars) for _ in range(size)) def abs_working_dir(path): - """Get absolute path to parent dir.""" + """ + Get the absolute path of a directory, or the parent directory if a file path is provided. + + Args: + path (str): File or directory path. + + Returns: + str: Absolute path to the directory. + """ abs_path = os.path.abspath(path) if os.path.isfile(path): abs_path = os.path.dirname(abs_path) @@ -92,7 +112,15 @@ def onerror(function, path, excinfo): def remove_ansi_color_sequences(text): - """Remove ANSI color sequences from text.""" + """ + Remove ANSI escape sequences (color codes) from text. + + Args: + text (str): Input string possibly containing ANSI codes. + + Returns: + str: Cleaned string without ANSI sequences. 
+ """ ansi_color_escape = re.compile(r"\x1B\[[0-9;]*m") return ansi_color_escape.sub("", text) @@ -182,17 +210,31 @@ def clean_dir(path: str): def shorten_long_string(obj: Any, chars: int = 75): - """Shorten the string representation of the input object.""" + """ + Shorten the string representation of an object by keeping the beginning and end. + + Args: + obj (Any): Input object to be shortened. + chars (int): Number of characters to keep at both ends. + + Returns: + str: Shortened string with ellipsis in the middle. + """ str_input = str(obj) return str_input[:chars] + " ..... " + str_input[len(str_input) - chars :] def dedent(text: str): - """Remove longest common prefix consisting of whitespaces. + """ + Remove the longest common leading whitespace from each line in a multiline string. + (This helps normalize indentation.) - Args: - text: Multiline string - """ + Args: + text (str): Multiline string. + + Returns: + str: Dedented string with normalized indentation. + """ matches = re.findall(r"(?m)^\s+", text) if len(matches) < len(text.splitlines()): return text From 1e47bb53a83c41e77e8c36459bb8d717e9a07376 Mon Sep 17 00:00:00 2001 From: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> Date: Thu, 19 Mar 2026 15:50:34 +0530 Subject: [PATCH 2/7] Update pipeline.py Signed-off-by: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> --- kale/pipeline.py | 76 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 63 insertions(+), 13 deletions(-) diff --git a/kale/pipeline.py b/kale/pipeline.py index a327b9650..3e8df9a90 100644 --- a/kale/pipeline.py +++ b/kale/pipeline.py @@ -12,6 +12,32 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +Pipeline Module + +This module defines the core structure and execution logic of a Kale pipeline. 
+ +Overview: +- A Pipeline is represented as a directed acyclic graph (DAG) of Steps +- Each Step represents a unit of execution +- Dependencies between steps define execution order + +Key Features: +- Uses networkx.DiGraph for DAG representation +- Supports topological sorting for execution +- Handles pipeline parameters and their propagation +- Provides utilities for dependency tracking and graph traversal + +Execution Flow: +1. Steps are added to the pipeline +2. Dependencies are defined between steps +3. The pipeline is executed in topological order +4. Each step receives pipeline parameters and executes accordingly + +This module acts as the central orchestration layer connecting +steps, configuration, and execution logic. +""" + from collections.abc import Iterable import copy import logging @@ -190,13 +216,22 @@ def _set_marshal_path(self): class Pipeline(nx.DiGraph): - """A Pipeline that can be converted into a KFP pipeline. - - This class is used to define a pipeline, its steps and all its - configurations. It extends nx.DiGraph to exploit some graph-related - algorithms but provides helper functions to work with Step objects - instead of standard networkx "nodes". This makes it simpler to access - the steps of the pipeline and their attributes. + """Represents a Kale pipeline as a Directed Acyclic Graph (DAG). + + The Pipeline class extends networkx.DiGraph to model execution + dependencies between Step objects. Each node corresponds to a Step, + and edges represent execution order. 
+ + Responsibilities: + - Manage step addition and dependency linking + - Maintain pipeline parameters + - Provide execution order via topological sorting + - Offer utilities to inspect pipeline structure + + Attributes: + config (PipelineConfig): Pipeline configuration metadata + pipeline_parameters (dict): Mapping of parameter names to PipelineParam + processor: Optional processor used during execution """ def __init__(self, config: PipelineConfig, *args, **kwargs): @@ -212,7 +247,16 @@ def run(self): step.run(self.pipeline_parameters) def add_step(self, step: Step): - """Add a new Step to the pipeline.""" + """Add a Step to the pipeline. + + Each step must have a unique name. The step is stored as a node + in the underlying graph. + + Args: + step (Step): The step to add. + + Raises: + RuntimeError: If the object is not a Step or name already exists.""" if not isinstance(step, Step): raise RuntimeError("Not of type Step.") if step.name in self.steps_names: @@ -220,7 +264,13 @@ def add_step(self, step: Step): self.add_node(step.name, step=step) def add_dependency(self, parent: Step, child: Step): - """Link two Steps in the pipeline.""" + """Define an execution dependency between two steps. + + The child step will only execute after the parent step completes. + + Args: + parent (Step): The upstream step. + child (Step): The downstream step.""" self.add_edge(parent.name, child.name) def get_step(self, name: str) -> Step: @@ -282,12 +332,12 @@ def _steps_iterable(self, step_names: Iterable[str]) -> Iterable[Step]: yield self.get_step(name) def get_leaf_steps(self): - """Get the list of leaf steps of the pipeline. + """Retrieve all leaf steps in the pipeline. - A step is considered a leaf when its in-degree is > 0 and its - out-degree is 0. + A leaf step has no outgoing dependencies (i.e., no children). - Returns (list): A list of leaf Steps. + Returns: + list[Step]: List of leaf steps. 
""" return [x for x in self.steps if self.out_degree(x.name) == 0] From e2991b98aee6fb09be1a1ddaf4f98dd69b2a91fa Mon Sep 17 00:00:00 2001 From: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> Date: Thu, 19 Mar 2026 16:28:30 +0530 Subject: [PATCH 3/7] Expand docstrings in step.py Signed-off-by: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> --- kale/step.py | 52 +++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/kale/step.py b/kale/step.py index b3a55af59..b685ec767 100644 --- a/kale/step.py +++ b/kale/step.py @@ -55,7 +55,23 @@ class StepConfig(Config): class Step: - """Class used to store information about a Step of the pipeline.""" + """Represents a single execution unit within a Kale pipeline. + + A Step encapsulates: + - The source code or callable to execute + - Input and output data + - Associated artifacts and pipeline parameters + - Execution configuration (via StepConfig) + + Steps are connected through dependencies to form a pipeline DAG + and are executed either locally or within a Kubeflow pipeline. + + Attributes: + source (list[str] | Callable): Code or function executed by the step + ins (list[Any]): Input variables for the step + outs (list[Any]): Output variables produced by the step + artifacts (list[Artifact]): Artifacts associated with the step + parameters (dict[str, PipelineParam]): Pipeline parameters consumed""" def __init__( self, source: list[str] | Callable, ins: list[Any] = None, outs: list[Any] = None, **kwargs @@ -76,7 +92,10 @@ def __init__( self.fns_free_variables = {} def __call__(self, *args, **kwargs): - """Handler for when the @step decorated function is called.""" + """Invoke the step execution handler. + + This method is triggered when the step is called like a function. 
+ It delegates execution to the configured execution handler.""" return execution_handler(self, *args, **kwargs) def add_artifact(self, artifact_name, artifact_type, is_input): @@ -105,7 +124,17 @@ def add_artifact(self, artifact_name, artifact_type, is_input): self.artifacts.append(new_artifact) def run(self, pipeline_parameters_values: dict[str, PipelineParam]): - """Run the step locally.""" + """Execute the step locally. + + This method: + - Selects the relevant pipeline parameters for this step + - Uses a Marshaller to execute the step's source + - Handles input/output serialization + - Links generated artifacts after execution + + Args: + pipeline_parameters_values (dict[str, PipelineParam]): + Dictionary of all pipeline parameters and their values.""" log.info("%s Running step '%s'... %s", "-" * 10, self.name, "-" * 10) # select just the pipeline parameters consumed by this step _params = {k: pipeline_parameters_values[k] for k in self.parameters} @@ -193,6 +222,23 @@ def kfp_outputs(self) -> list[Artifact]: def __default_execution_handler(step: Step, *args, **kwargs): + """ + Default execution handler for a Step. + + This handler is used when no pipeline execution context is set. + It attempts to execute the step's source as a plain Python function. + + If the step source is not callable (e.g., generated from a Notebook), + local execution is not supported and a RuntimeError is raised. + + Args: + step (Step): The step to execute. + *args: Positional arguments passed to the step function. + **kwargs: Keyword arguments passed to the step function. + + Raises: + RuntimeError: If the step source is not callable. 
+ """ log.info("No Pipeline registration handler is set.") if not callable(step.source): raise RuntimeError( From 481e7d9b86fb37501c98ebd17e458c53c907a149 Mon Sep 17 00:00:00 2001 From: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> Date: Thu, 19 Mar 2026 16:50:44 +0530 Subject: [PATCH 4/7] Move closing docstring quotes onto their own line in step.py Signed-off-by: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> --- kale/step.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/kale/step.py b/kale/step.py index b685ec767..1ad746a78 100644 --- a/kale/step.py +++ b/kale/step.py @@ -71,7 +71,8 @@ class Step: ins (list[Any]): Input variables for the step outs (list[Any]): Output variables produced by the step artifacts (list[Artifact]): Artifacts associated with the step - parameters (dict[str, PipelineParam]): Pipeline parameters consumed""" + parameters (dict[str, PipelineParam]): Pipeline parameters consumed + """ def __init__( self, source: list[str] | Callable, ins: list[Any] = None, outs: list[Any] = None, **kwargs @@ -134,7 +135,9 @@ def run(self, pipeline_parameters_values: dict[str, PipelineParam]): Args: pipeline_parameters_values (dict[str, PipelineParam]): - Dictionary of all pipeline parameters and their values.""" + Dictionary of all pipeline parameters and their values. + + """ log.info("%s Running step '%s'... 
%s", "-" * 10, self.name, "-" * 10) # select just the pipeline parameters consumed by this step _params = {k: pipeline_parameters_values[k] for k in self.parameters} From 69085792ae661329e85cbed7968785f6ebdab256 Mon Sep 17 00:00:00 2001 From: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> Date: Fri, 20 Mar 2026 22:16:12 +0530 Subject: [PATCH 5/7] Refactor ANSI sequence removal function Signed-off-by: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> --- kale/common/utils.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/kale/common/utils.py b/kale/common/utils.py index 44ee340fe..3a3e33dd9 100644 --- a/kale/common/utils.py +++ b/kale/common/utils.py @@ -88,6 +88,7 @@ def onerror(function, path, excinfo): raise e log.info("Removing path `%s'", path) + log.debug("Attempting to remove path: %s", path) try: if os.path.isfile(path) or os.path.islink(path): @@ -103,6 +104,7 @@ def onerror(function, path, excinfo): # the exception handler handle it (i.e., check ignore_missing etc.) raise OSError(errno.ENOENT, "No such file or directory", path) except OSError as e: + log.debug("Error while removing path: %s", path) if silent: log.debug("Path `%s' does not exist, skipping removing it", path) return @@ -111,19 +113,23 @@ def onerror(function, path, excinfo): raise -def remove_ansi_color_sequences(text): - """ - Remove ANSI escape sequences (color codes) from text. - - Args: - text (str): Input string possibly containing ANSI codes. +def remove_ansi_sequences(text: str) -> str: + """ + Remove ANSI escape sequences from text. - Returns: - str: Cleaned string without ANSI sequences. - """ - ansi_color_escape = re.compile(r"\x1B\[[0-9;]*m") - return ansi_color_escape.sub("", text) + Args: + text (str): Input string possibly containing ANSI codes. + + Returns: + str: Cleaned string without ANSI sequences. 
+ """ + if not isinstance(text, str): + return text + ansi_escape = re.compile( + r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])" + ) + return ansi_escape.sub("", text) def comment_magic_commands(code): """Comment the magic commands in a code block.""" @@ -221,7 +227,9 @@ def shorten_long_string(obj: Any, chars: int = 75): str: Shortened string with ellipsis in the middle. """ str_input = str(obj) - return str_input[:chars] + " ..... " + str_input[len(str_input) - chars :] + if len(str_input) <= chars * 2: + return str_input + return str_input[:chars] + " ..... " + str_input[-chars:] def dedent(text: str): From c95f750ec1e80ff4143570648d712ca728a4adae Mon Sep 17 00:00:00 2001 From: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> Date: Fri, 20 Mar 2026 22:18:10 +0530 Subject: [PATCH 6/7] chore(utils): touch copyright header line only This commit changes only line 1 of kale/common/utils.py (the copyright comment). The visible text of that line is identical before and after, so this appears to be a whitespace- or encoding-only change (please confirm). The broader ANSI escape-sequence regex, the guard for non-string inputs, and the extra debug logging in rm_r were introduced in PATCH 5/7, not in this commit. Signed-off-by: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> --- kale/common/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kale/common/utils.py b/kale/common/utils.py index 3a3e33dd9..3df748107 100644 --- a/kale/common/utils.py +++ b/kale/common/utils.py @@ -1,4 +1,4 @@ -# Copyright 2026 The Kubeflow Authors. +# Copyright 2026 The Kubeflow Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From 654cd650e85ca9ca1e0d31cf803ad56ce559aaf5 Mon Sep 17 00:00:00 2001 From: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> Date: Mon, 23 Mar 2026 21:30:24 +0530 Subject: [PATCH 7/7] Remove docstring changes to keep PR focused on ANSI handling Collapses the expanded docstrings added in PATCH 1/7 and PATCH 5/7 of this series back to one-line summaries. Note that this is not a pure revert: it also reduces docstrings that predate this series (rm_r, graceful_exit, compute_pip_index_urls, compute_trusted_hosts) to a single line, and the one-line summaries for abs_working_dir, shorten_long_string and dedent differ from their original wording. 
Signed-off-by: KAIBALYA MOHANTY <168870673+Kaibalya-Mohanty@users.noreply.github.com> --- kale/common/utils.py | 119 ++++--------------------------------------- 1 file changed, 9 insertions(+), 110 deletions(-) diff --git a/kale/common/utils.py b/kale/common/utils.py index 3df748107..19992892f 100644 --- a/kale/common/utils.py +++ b/kale/common/utils.py @@ -40,32 +40,12 @@ def main_source_lives_in_cwd(): def random_string(size=5, chars=string.ascii_lowercase + string.digits): - """ - Generate a random string of specified length. - - Args: - size (int): Length of the generated string. Defaults to 5. - chars (str): Characters to choose from. Defaults to lowercase letters and digits. - - Returns: - str: Randomly generated string. - - Note: - Uses Python's random module and is not cryptographically secure. - """ + """Generate random string.""" return "".join(random.choice(chars) for _ in range(size)) def abs_working_dir(path): - """ - Get the absolute path of a directory, or the parent directory if a file path is provided. - - Args: - path (str): File or directory path. - - Returns: - str: Absolute path to the directory. - """ + """Get absolute working directory.""" abs_path = os.path.abspath(path) if os.path.isfile(path): abs_path = os.path.dirname(abs_path) @@ -73,12 +53,7 @@ def abs_working_dir(path): def rm_r(path, ignore_missing=True, silent=False): - """Remove a file or directory. - - Similar to rm -r. If the path does not exist and ignore_missing is False, - OSError is raised, otherwise it is ignored. - If silent is True, nothing is raised. - """ + """Remove a file or directory.""" def onerror(function, path, excinfo): # Function to handle ENOENT in shutil.rmtree() @@ -114,15 +89,7 @@ def onerror(function, path, excinfo): def remove_ansi_sequences(text: str) -> str: - """ - Remove ANSI escape sequences from text. - - Args: - text (str): Input string possibly containing ANSI codes. - - Returns: - str: Cleaned string without ANSI sequences. 
- """ + """Remove ANSI escape sequences from text.""" if not isinstance(text, str): return text @@ -162,21 +129,7 @@ def is_ipython() -> bool: def graceful_exit(exit_code): - """Exit the program gracefully. - - Running the function `sys.exit()` raises a special exception `SystemExit` - that is not caught by the python REPL, making it exit the program. - IPython's REPL, instead, does catch `SystemExit`. It displays the message - and then goes back to the REPL. - - Code that could either run in an IPython kernel (because the Kale pipeline - was produced from a notebook) or in a standard Python process, needs to - handle the exit process seamlessly, regardless of where it's running. - - In case the code is running inside an IPython kernel, this function raises - a `KaleGracefulExit` exception. This exception is expected to ke captured - inside the `kale.common.jputils.capture_streams` function. - """ + """Exit the program gracefully.""" if is_ipython(): from kale.common.jputils import KaleGracefulExit @@ -216,16 +169,7 @@ def clean_dir(path: str): def shorten_long_string(obj: Any, chars: int = 75): - """ - Shorten the string representation of an object by keeping the beginning and end. - - Args: - obj (Any): Input object to be shortened. - chars (int): Number of characters to keep at both ends. - - Returns: - str: Shortened string with ellipsis in the middle. - """ + """Shorten long string.""" str_input = str(obj) if len(str_input) <= chars * 2: return str_input @@ -233,16 +177,7 @@ def shorten_long_string(obj: Any, chars: int = 75): def dedent(text: str): - """ - Remove the longest common leading whitespace from each line in a multiline string. - (This helps normalize indentation.) - - Args: - text (str): Multiline string. - - Returns: - str: Dedented string with normalized indentation. 
- """ + """Dedent text.""" matches = re.findall(r"(?m)^\s+", text) if len(matches) < len(text.splitlines()): return text @@ -250,33 +185,7 @@ def dedent(text: str): def compute_pip_index_urls() -> list[str]: - """Compute the list of pip simple index URLs for generated KFP components. - - Using a local PyPI index is useful when itering on local - developement with an unpublished version of Kale. - - Precedence: - 1. If `KALE_PIP_INDEX_URLS` is set, split its comma-separated value and - return that list (order preserved). - 2. Else, if `KALE_DEV_MODE` is truthy (`1`, `true`, `yes`, or `on`), - return a list with the devpi simple URL (`KALE_DEVPI_SIMPLE_URL`) or - its default value. - 3. Otherwise, return the production default: - ["https://pypi.org/simple"]. - - Environment variables: - KALE_PIP_INDEX_URLS: - Comma-separated list of PEP 503 “simple” index URLs. Highest - priority. - KALE_DEV_MODE: - Boolean-like flag enabling dev mode (interprets 1/true/yes/on). - KALE_DEVPI_SIMPLE_URL: - Devpi “simple” index URL used when dev mode is enabled. - - Returns: - list[str]: Index URLs suitable for the `pip_index_urls` parameter in - `@kfp_dsl.component`. - """ + """Compute pip index URLs.""" pypi_prod_url = "https://pypi.org/simple" urls: list[str] @@ -311,17 +220,7 @@ def compute_pip_index_urls() -> list[str]: def compute_trusted_hosts() -> list[str]: - """Compute the list of trusted hosts for use with pip. - - If user set a comma separated list of hosts using the - system property KALE_PIP_TRUSTED_HOSTS, then these hosts - will be used as trusted hosts, otherwise the list will be - empty. - - - :return: The list of trusted hosts configured by the user - :rtype: list[str] - """ + """Compute trusted hosts.""" pip_trusted_hosts = os.getenv("KALE_PIP_TRUSTED_HOSTS") trusted_hosts: list[str] = [] if pip_trusted_hosts: