diff --git a/kale/common/utils.py b/kale/common/utils.py index d28f2513e..3a3e33dd9 100644 --- a/kale/common/utils.py +++ b/kale/common/utils.py @@ -40,12 +40,32 @@ def main_source_lives_in_cwd(): def random_string(size=5, chars=string.ascii_lowercase + string.digits): - """Generate random string.""" + """ + Generate a random string of specified length. + + Args: + size (int): Length of the generated string. Defaults to 5. + chars (str): Characters to choose from. Defaults to lowercase letters and digits. + + Returns: + str: Randomly generated string. + + Note: + Uses Python's random module and is not cryptographically secure. + """ return "".join(random.choice(chars) for _ in range(size)) def abs_working_dir(path): - """Get absolute path to parent dir.""" + """ + Get the absolute path of a directory, or the parent directory if a file path is provided. + + Args: + path (str): File or directory path. + + Returns: + str: Absolute path to the directory. + """ abs_path = os.path.abspath(path) if os.path.isfile(path): abs_path = os.path.dirname(abs_path) @@ -68,6 +88,7 @@ def onerror(function, path, excinfo): raise e log.info("Removing path `%s'", path) + log.debug("Attempting to remove path: %s", path) try: if os.path.isfile(path) or os.path.islink(path): @@ -83,6 +104,7 @@ def onerror(function, path, excinfo): # the exception handler handle it (i.e., check ignore_missing etc.) raise OSError(errno.ENOENT, "No such file or directory", path) except OSError as e: + log.debug("Error while removing path: %s", path) if silent: log.debug("Path `%s' does not exist, skipping removing it", path) return @@ -91,11 +113,23 @@ def onerror(function, path, excinfo): raise -def remove_ansi_color_sequences(text): - """Remove ANSI color sequences from text.""" - ansi_color_escape = re.compile(r"\x1B\[[0-9;]*m") - return ansi_color_escape.sub("", text) +def remove_ansi_sequences(text: str) -> str: + """ + Remove ANSI escape sequences from text. + + Args: + text (str): Input string possibly containing ANSI codes. + + Returns: + str: Cleaned string without ANSI sequences. + """ + if not isinstance(text, str): + return text + ansi_escape = re.compile( + r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])" + ) + return ansi_escape.sub("", text) def comment_magic_commands(code): """Comment the magic commands in a code block.""" @@ -182,17 +216,33 @@ def clean_dir(path: str): def shorten_long_string(obj: Any, chars: int = 75): - """Shorten the string representation of the input object.""" + """ + Shorten the string representation of an object by keeping the beginning and end. + + Args: + obj (Any): Input object to be shortened. + chars (int): Number of characters to keep at both ends. + + Returns: + str: Shortened string with ellipsis in the middle. + """ str_input = str(obj) - return str_input[:chars] + " ..... " + str_input[len(str_input) - chars :] + if len(str_input) <= chars * 2: + return str_input + return str_input[:chars] + " ..... " + str_input[-chars:] def dedent(text: str): - """Remove longest common prefix consisting of whitespaces. + """ + Remove the longest common leading whitespace from each line in a multiline string. + (This helps normalize indentation.) - Args: - text: Multiline string - """ + Args: + text (str): Multiline string. + + Returns: + str: Dedented string with normalized indentation. + """ matches = re.findall(r"(?m)^\s+", text) if len(matches) < len(text.splitlines()): return text diff --git a/kale/pipeline.py b/kale/pipeline.py index a327b9650..3e8df9a90 100644 --- a/kale/pipeline.py +++ b/kale/pipeline.py @@ -12,6 +12,32 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +Pipeline Module + +This module defines the core structure and execution logic of a Kale pipeline. + +Overview: +- A Pipeline is represented as a directed acyclic graph (DAG) of Steps +- Each Step represents a unit of execution +- Dependencies between steps define execution order + +Key Features: +- Uses networkx.DiGraph for DAG representation +- Supports topological sorting for execution +- Handles pipeline parameters and their propagation +- Provides utilities for dependency tracking and graph traversal + +Execution Flow: +1. Steps are added to the pipeline +2. Dependencies are defined between steps +3. The pipeline is executed in topological order +4. Each step receives pipeline parameters and executes accordingly + +This module acts as the central orchestration layer connecting +steps, configuration, and execution logic. +""" + from collections.abc import Iterable import copy import logging @@ -190,13 +216,22 @@ def _set_marshal_path(self): class Pipeline(nx.DiGraph): - """A Pipeline that can be converted into a KFP pipeline. - - This class is used to define a pipeline, its steps and all its - configurations. It extends nx.DiGraph to exploit some graph-related - algorithms but provides helper functions to work with Step objects - instead of standard networkx "nodes". This makes it simpler to access - the steps of the pipeline and their attributes. + """Represents a Kale pipeline as a Directed Acyclic Graph (DAG). + + The Pipeline class extends networkx.DiGraph to model execution + dependencies between Step objects. Each node corresponds to a Step, + and edges represent execution order. + + Responsibilities: + - Manage step addition and dependency linking + - Maintain pipeline parameters + - Provide execution order via topological sorting + - Offer utilities to inspect pipeline structure + + Attributes: + config (PipelineConfig): Pipeline configuration metadata + pipeline_parameters (dict): Mapping of parameter names to PipelineParam + processor: Optional processor used during execution """ def __init__(self, config: PipelineConfig, *args, **kwargs): @@ -212,7 +247,16 @@ def run(self): step.run(self.pipeline_parameters) def add_step(self, step: Step): - """Add a new Step to the pipeline.""" + """Add a Step to the pipeline. + + Each step must have a unique name. The step is stored as a node + in the underlying graph. + + Args: + step (Step): The step to add. + + Raises: + RuntimeError: If the object is not a Step or name already exists.""" if not isinstance(step, Step): raise RuntimeError("Not of type Step.") if step.name in self.steps_names: @@ -220,7 +264,13 @@ def add_step(self, step: Step): self.add_node(step.name, step=step) def add_dependency(self, parent: Step, child: Step): - """Link two Steps in the pipeline.""" + """Define an execution dependency between two steps. + + The child step will only execute after the parent step completes. + + Args: + parent (Step): The upstream step. + child (Step): The downstream step.""" self.add_edge(parent.name, child.name) def get_step(self, name: str) -> Step: @@ -282,12 +332,12 @@ def _steps_iterable(self, step_names: Iterable[str]) -> Iterable[Step]: yield self.get_step(name) def get_leaf_steps(self): - """Get the list of leaf steps of the pipeline. + """Retrieve all leaf steps in the pipeline. - A step is considered a leaf when its in-degree is > 0 and its - out-degree is 0. + A leaf step has no outgoing dependencies (i.e., no children). - Returns (list): A list of leaf Steps. + Returns: + list[Step]: List of leaf steps. """ return [x for x in self.steps if self.out_degree(x.name) == 0] diff --git a/kale/step.py b/kale/step.py index b3a55af59..1ad746a78 100644 --- a/kale/step.py +++ b/kale/step.py @@ -55,7 +55,24 @@ class StepConfig(Config): class Step: - """Class used to store information about a Step of the pipeline.""" + """Represents a single execution unit within a Kale pipeline. + + A Step encapsulates: + - The source code or callable to execute + - Input and output data + - Associated artifacts and pipeline parameters + - Execution configuration (via StepConfig) + + Steps are connected through dependencies to form a pipeline DAG + and are executed either locally or within a Kubeflow pipeline. + + Attributes: + source (list[str] | Callable): Code or function executed by the step + ins (list[Any]): Input variables for the step + outs (list[Any]): Output variables produced by the step + artifacts (list[Artifact]): Artifacts associated with the step + parameters (dict[str, PipelineParam]): Pipeline parameters consumed + """ def __init__( self, source: list[str] | Callable, ins: list[Any] = None, outs: list[Any] = None, **kwargs @@ -76,7 +93,10 @@ def __init__( self.fns_free_variables = {} def __call__(self, *args, **kwargs): - """Handler for when the @step decorated function is called.""" + """Invoke the step execution handler. + + This method is triggered when the step is called like a function. + It delegates execution to the configured execution handler.""" return execution_handler(self, *args, **kwargs) def add_artifact(self, artifact_name, artifact_type, is_input): @@ -105,7 +125,19 @@ def add_artifact(self, artifact_name, artifact_type, is_input): self.artifacts.append(new_artifact) def run(self, pipeline_parameters_values: dict[str, PipelineParam]): - """Run the step locally.""" + """Execute the step locally. + + This method: + - Selects the relevant pipeline parameters for this step + - Uses a Marshaller to execute the step's source + - Handles input/output serialization + - Links generated artifacts after execution + + Args: + pipeline_parameters_values (dict[str, PipelineParam]): + Dictionary of all pipeline parameters and their values. + + """ log.info("%s Running step '%s'... %s", "-" * 10, self.name, "-" * 10) # select just the pipeline parameters consumed by this step _params = {k: pipeline_parameters_values[k] for k in self.parameters} @@ -193,6 +225,23 @@ def kfp_outputs(self) -> list[Artifact]: def __default_execution_handler(step: Step, *args, **kwargs): + """ + Default execution handler for a Step. + + This handler is used when no pipeline execution context is set. + It attempts to execute the step's source as a plain Python function. + + If the step source is not callable (e.g., generated from a Notebook), + local execution is not supported and a RuntimeError is raised. + + Args: + step (Step): The step to execute. + *args: Positional arguments passed to the step function. + **kwargs: Keyword arguments passed to the step function. + + Raises: + RuntimeError: If the step source is not callable. + """ log.info("No Pipeline registration handler is set.") if not callable(step.source): raise RuntimeError(