Skip to content

Commit

Permalink
Add check that auto-genned params do not have already set artifact na…
Browse files Browse the repository at this point in the history
…mes (#544)

Noticed that when adding a param to a script and then setting an
artifact as the value of that script input can result in both an
artifact and a parameter added to the template.

# Reproducible example (main)

```python
from hera.workflows import DAG, Artifact, Workflow, script


@script(outputs=Artifact(name="result", path="/tmp/result"))
def produce():
    import pickle

    result = "foo testing"
    with open("/tmp/result", "wb") as f:
        pickle.dump(result, f)


@script(inputs=Artifact(name="i", path="/tmp/i"))
def consume(i):
    import pickle

    with open("/tmp/i", "rb") as f:
        i = pickle.load(f)
    print(i)


with Workflow(generate_name="fv-test-", entrypoint="d") as w:
    with DAG(name="d"):
        p = produce()
        c = consume(arguments=Artifact(name="i", from_="{{tasks.produce.outputs.artifacts.result}}"))
        p >> c

print(w.to_yaml())
```

Above results in:
```yaml
...
      image: python:3.7
      source: "import os\nimport sys\nsys.path.append(os.getcwd())\nimport pickle\n\
        \nresult = \"foo testing\"\nwith open(\"/tmp/result\", \"wb\") as f:\n   \
        \ pickle.dump(result, f)\n"
  - inputs:
      artifacts:
      - name: i
        path: /tmp/i
      parameters:
      - name: i
    name: consume
    script:
      command:
...
```

# Post-fix (this branch, isolated to problem section)

```yaml
...
        \nresult = \"foo testing\"\nwith open(\"/tmp/result\", \"wb\") as f:\n   \
        \ pickle.dump(result, f)\n"
  - inputs:
      artifacts:
      - name: i
        path: /tmp/i
    name: consume
    script:
      command:
...
```

---------

Signed-off-by: Flaviu Vadan <[email protected]>
Signed-off-by: Sambhav Kothari <[email protected]>
Co-authored-by: Sambhav Kothari <[email protected]>
  • Loading branch information
flaviuvadan and sambhav authored Apr 2, 2023
1 parent fe6ef10 commit ef328d2
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 2 deletions.
86 changes: 86 additions & 0 deletions docs/examples/workflows/script_auto_infer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Script Auto Infer






=== "Hera"

```python linenums="1"
from hera.workflows import DAG, Artifact, Workflow, script


@script(outputs=Artifact(name="result", path="/tmp/result"))
def produce():
import pickle

result = "foo testing"
with open("/tmp/result", "wb") as f:
pickle.dump(result, f)


@script(inputs=Artifact(name="i", path="/tmp/i"))
def consume(i):
import pickle

with open("/tmp/i", "rb") as f:
i = pickle.load(f)
print(i)


with Workflow(generate_name="fv-test-", entrypoint="d") as w:
with DAG(name="d"):
p = produce()
c = consume(arguments=Artifact(name="i", from_="{{tasks.produce.outputs.artifacts.result}}"))
p >> c
```

=== "YAML"

```yaml linenums="1"
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: fv-test-
spec:
entrypoint: d
templates:
- dag:
tasks:
- name: produce
template: produce
- arguments:
artifacts:
- from: '{{tasks.produce.outputs.artifacts.result}}'
name: i
depends: produce
name: consume
template: consume
name: d
- name: produce
outputs:
artifacts:
- name: result
path: /tmp/result
script:
command:
- python
image: python:3.7
source: "import os\nimport sys\nsys.path.append(os.getcwd())\nimport pickle\n\
\nresult = \"foo testing\"\nwith open(\"/tmp/result\", \"wb\") as f:\n \
\ pickle.dump(result, f)\n"
- inputs:
artifacts:
- name: i
path: /tmp/i
name: consume
script:
command:
- python
image: python:3.7
source: "import os\nimport sys\nsys.path.append(os.getcwd())\nimport json\n\n\
import pickle\n\nwith open(\"/tmp/i\", \"rb\") as f:\n i = pickle.load(f)\n\
print(i)\n"
```

43 changes: 43 additions & 0 deletions examples/workflows/script-auto-infer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: fv-test-
spec:
entrypoint: d
templates:
- dag:
tasks:
- name: produce
template: produce
- arguments:
artifacts:
- from: '{{tasks.produce.outputs.artifacts.result}}'
name: i
depends: produce
name: consume
template: consume
name: d
- name: produce
outputs:
artifacts:
- name: result
path: /tmp/result
script:
command:
- python
image: python:3.7
source: "import os\nimport sys\nsys.path.append(os.getcwd())\nimport pickle\n\
\nresult = \"foo testing\"\nwith open(\"/tmp/result\", \"wb\") as f:\n \
\ pickle.dump(result, f)\n"
- inputs:
artifacts:
- name: i
path: /tmp/i
name: consume
script:
command:
- python
image: python:3.7
source: "import os\nimport sys\nsys.path.append(os.getcwd())\nimport json\n\n\
import pickle\n\nwith open(\"/tmp/i\", \"rb\") as f:\n i = pickle.load(f)\n\
print(i)\n"
26 changes: 26 additions & 0 deletions examples/workflows/script_auto_infer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from hera.workflows import DAG, Artifact, Workflow, script


@script(outputs=Artifact(name="result", path="/tmp/result"))
def produce():
import pickle

result = "foo testing"
with open("/tmp/result", "wb") as f:
pickle.dump(result, f)


@script(inputs=Artifact(name="i", path="/tmp/i"))
def consume(i):
import pickle

with open("/tmp/i", "rb") as f:
i = pickle.load(f)
print(i)


with Workflow(generate_name="fv-test-", entrypoint="d") as w:
with DAG(name="d"):
p = produce()
c = consume(arguments=Artifact(name="i", from_="{{tasks.produce.outputs.artifacts.result}}"))
p >> c
5 changes: 4 additions & 1 deletion src/hera/workflows/_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,11 @@ def __call__(self, *args, **kwargs) -> Optional[SubNodeMixin]:
# these are the already set parameters. If a users has already set a parameter argument, then Hera
# uses the user-provided value rather than the inferred value
arguments = self.arguments if isinstance(self.arguments, list) else [self.arguments] # type: ignore
arguments = list(filter(lambda x: x is not None, arguments))
parameters = [arg for arg in arguments if isinstance(arg, ModelParameter) or isinstance(arg, Parameter)]
parameter_names = {p.name for p in parameters}
artifacts = [arg for arg in arguments if isinstance(arg, ModelArtifact) or isinstance(arg, Artifact)]
artifact_names = {a.name for a in artifacts}
if "source" in kwargs and "with_param" in kwargs:
# Argo uses the `inputs` field to indicate the expected parameters of a specific template whereas the
# `arguments` are used to indicate exactly what _values_ are assigned to the set inputs. Here,
Expand All @@ -447,7 +450,7 @@ def __call__(self, *args, **kwargs) -> Optional[SubNodeMixin]:
new_parameters = _get_params_from_source(kwargs["source"])
if new_parameters is not None:
for p in new_parameters:
if p.name not in parameter_names:
if p.name not in parameter_names and p.name not in artifact_names:
arguments.append(p)
elif "source" in kwargs and "with_items" in kwargs:
# similarly to the above, we can infer the arguments to create based on the content of `with_items`.
Expand Down
3 changes: 2 additions & 1 deletion src/hera/workflows/script.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ def _build_inputs(self) -> Optional[ModelInputs]:
inputs = ModelInputs(parameters=func_parameters)

already_set_params = {p.name for p in inputs.parameters or []}
already_set_artifacts = {p.name for p in inputs.artifacts or []}
for param in func_parameters:
if param.name not in already_set_params:
if param.name not in already_set_params and param.name not in already_set_artifacts:
inputs.parameters = [param] if inputs.parameters is None else inputs.parameters + [param]
return inputs

Expand Down

0 comments on commit ef328d2

Please sign in to comment.