diff --git a/examples/basics/basics/task.py b/examples/basics/basics/task.py index 67992ea9f..9e7e6914e 100644 --- a/examples/basics/basics/task.py +++ b/examples/basics/basics/task.py @@ -34,7 +34,6 @@ from sklearn.linear_model import LogisticRegression from sklearn.model_selection import train_test_split - # %% [markdown] # The use of the :py:func:`flytekit.task` decorator is mandatory for a ``PythonFunctionTask``. # A task is essentially a regular Python function, with the exception that all inputs and outputs must be clearly annotated with their types. @@ -42,9 +41,7 @@ # %% @task -def train_model( - hyperparameters: dict, test_size: float, random_state: int -) -> LogisticRegression: +def train_model(hyperparameters: dict, test_size: float, random_state: int) -> LogisticRegression: """ Parameters: hyperparameters (dict): A dictionary containing the hyperparameters for the model. @@ -58,9 +55,7 @@ def train_model( iris = load_iris() # Splitting the data into train and test sets - X_train, _, y_train, _ = train_test_split( - iris.data, iris.target, test_size=test_size, random_state=random_state - ) + X_train, _, y_train, _ = train_test_split(iris.data, iris.target, test_size=test_size, random_state=random_state) # Creating and training the logistic regression model with the given hyperparameters clf = LogisticRegression(**hyperparameters) @@ -97,12 +92,10 @@ def train_model_wf( """ This workflow invokes the train_model task with the given hyperparameters, test size and random state. """ - return train_model( - hyperparameters=hyperparameters, test_size=test_size, random_state=random_state - ) + return train_model(hyperparameters=hyperparameters, test_size=test_size, random_state=random_state) -# %% +# %% [markdown] # ```{note} # When invoking the `train_model` task, you need to use keyword arguments to specify the values for the corresponding parameters. # ```` @@ -110,16 +103,17 @@ def train_model_wf( # ## Use `partial` to provide default arguments to tasks # # You can use the {py:func}`functools.partial` function to assign default or constant values to the parameters of your tasks. + +# %% import functools @workflow -def train_model_wf_with_partial( - test_size: float = 0.2, random_state: int = 42 -) -> LogisticRegression: +def train_model_wf_with_partial(test_size: float = 0.2, random_state: int = 42) -> LogisticRegression: partial_task = functools.partial(train_model, hyperparameters={"C": 0.1}) return partial_task(test_size=test_size, random_state=random_state) + # %% [markdown] # In this toy example, we're calling the `square` task twice and returning the result. diff --git a/examples/control_flow/control_flow/map_task.py b/examples/control_flow/control_flow/map_task.py index 925242b91..5bbbf5a1a 100644 --- a/examples/control_flow/control_flow/map_task.py +++ b/examples/control_flow/control_flow/map_task.py @@ -55,7 +55,7 @@ def coalesce(b: List[str]) -> str: # To repeat the execution of the `a_mappable_task` across a collection of inputs, use the {py:func}`~flytekit:flytekit.map_task` function from flytekit. # In this example, the input `a` is of type `List[int]`. # The `a_mappable_task` is executed for each element in the list. -# +# # You can utilize the `with_overrides` function to set resources specifically for individual map tasks. # This allows you to customize resource allocations such as memory usage. @@ -129,10 +129,9 @@ def multi_input_task(quantity: int, price: float, shipping: float) -> float: # %% import functools + @workflow -def multiple_workflow( - list_q: List[int] = [1, 2, 3, 4, 5], p: float = 6.0, s: float = 7.0 -) -> List[float]: +def multiple_workflow(list_q: List[int] = [1, 2, 3, 4, 5], p: float = 6.0, s: float = 7.0) -> List[float]: partial_task = functools.partial(multi_input_task, price=p, shipping=s) return map_task(partial_task)(quantity=list_q) @@ -147,9 +146,7 @@ def get_price() -> float: @workflow -def multiple_workflow_with_task_output( - list_q: List[int] = [1, 2, 3, 4, 5], s: float = 6.0 -) -> List[float]: +def multiple_workflow_with_task_output(list_q: List[int] = [1, 2, 3, 4, 5], s: float = 6.0) -> List[float]: p = get_price() partial_task = functools.partial(multi_input_task, price=p, shipping=s) return map_task(partial_task)(quantity=list_q) @@ -160,10 +157,13 @@ def multiple_workflow_with_task_output( # %% @workflow -def multiple_workflow_with_lists(list_q: List[int] = [1, 2, 3, 4, 5], list_p: List[float] = [6.0, 9.0, 8.7, 6.5, 1.2], s: float = 6.0) -> List[float]: +def multiple_workflow_with_lists( + list_q: List[int] = [1, 2, 3, 4, 5], list_p: List[float] = [6.0, 9.0, 8.7, 6.5, 1.2], s: float = 6.0 +) -> List[float]: partial_task = functools.partial(multi_input_task, shipping=s) return map_task(partial_task)(quantity=list_q, price=list_p) + # %% [markdown] # ```{note} # It is important to note that you cannot provide a list as an input to a partial task.