Fan-out with independent Volume for each branch #539
-
Hi, I would like to create a workflow with a fan-out where subsequent tasks share a volume (unique to each branch). The DAG would look something like:
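Something along these lines (a rough sketch; one independent volume per branch, any number of branches):

```
        A
      / | \
    B1  B2  B3  ...
    |   |   |
    C1  C2  C3
  vol1 vol2 vol3
```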
For example, Tasks B1, C1 share Volume 1; Tasks B2, C2 share Volume 2, etc. I know I can define a volume to be shared by two subsequent tasks as follows:

```python
from hera import Task, Workflow, Volume

with Workflow("volumes-pvc-hera-", generate_name=True) as w:
    volumes = [Volume(size="1Gi", mount_path="/mnt/vol")]
    generate = Task(
        "generate",
        image="docker/whalesay:latest",
        command=["sh", "-c"],
        args=[
            "echo generating message in volume; cowsay hello world | tee /mnt/vol/hello_world.txt"
        ],
        volumes=volumes,
    )
    print = Task(
        "print",
        image="alpine:latest",
        command=["sh", "-c"],
        args=[
            "echo getting message from volume; find /mnt/vol; cat /mnt/vol/hello_world.txt"
        ],
        volumes=volumes,
    )
    generate >> print
```

As noted above, each subsequent task needs a reference to the volume. My question is: how do I generalise the example above to the fan-out case? Thanks!
-
I also have a minimal example below:

```python
from hera import Task, Volume, Workflow


def A():
    import json
    import sys

    # this can be anything! e.g. fetch from some API, then in parallel process all entities; chunk database records
    # and process them in parallel, etc.
    json.dump([i for i in range(10)], sys.stdout)


def B(value: int):
    print(f"Received value: {value}!")
    with open(f"/mnt/vol/{value}.txt", "w") as f:
        f.write("Values!")


def C(value: int):
    import pathlib

    # do something with value and files in /mnt/vol
    print("Files in /mnt/vol:")
    print(tuple(pathlib.Path("/mnt/vol").iterdir()))


# assumes you used `hera.set_global_token` and `hera.set_global_host` so that the workflow can be submitted
with Workflow("dynamic-fanout-", generate_name=True) as w:
    volumes = [Volume(size="1Gi", mount_path="/mnt/vol")]
    A_task = Task("a", A)
    B_task = Task("b", B, with_param=A_task.get_result(), volumes=volumes)
    C_task = Task("c", C, with_param=A_task.get_result(), volumes=volumes)
    A_task >> B_task >> C_task
```

When I execute this, the shared volume is the same across all branches, and Task C only starts once all Task Bs have completed. My questions are:
1. How can I make each Task C start as soon as its corresponding Task B finishes, i.e. run each B >> C chain in parallel per branch rather than waiting for all Task Bs?
2. How can each branch get its own independent volume instead of all branches sharing a single one?
-
I think I was able to solve question 1 using a parallel DAG, as follows:

```python
from hera import DAG, Parameter, Task, Volume, Workflow


def A():
    import json
    import sys

    # this can be anything! e.g. fetch from some API, then in parallel process all entities; chunk database records
    # and process them in parallel, etc.
    json.dump([i for i in range(10)], sys.stdout)


def B(value: int):
    import pathlib

    pathlib.Path("/mnt/vol").mkdir(parents=True, exist_ok=True)
    print(f"Received value: {value}!")
    with open(f"/mnt/vol/{value}.txt", "w") as f:
        f.write("Values!")


def C(value: int):
    import pathlib

    # do something with value and files in /mnt/vol
    print("Files in /mnt/vol:")
    print(tuple(pathlib.Path("/mnt/vol").iterdir()))


# assumes you used `hera.set_global_token` and `hera.set_global_host` so that the workflow can be submitted;
# `service` and `sa` are my workflow service and service account name, defined elsewhere
with Workflow("dynamic-fanout-", generate_name=True, service=service, service_account_name=sa) as w:
    with DAG("pipeline", inputs=[Parameter("value")], outputs=[Parameter("value")]) as pipeline:
        volumes = [Volume(size="1Gi", mount_path="/mnt/vol")]
        B_task = Task("b", B, inputs=[pipeline.get_parameter("value")], volumes=volumes)
        C_task = Task("c", C, inputs=[pipeline.get_parameter("value")], volumes=volumes)
        B_task >> C_task
    pipeline.outputs = [C_task.get_result_as("output")]

    A_task = Task("a", A)
    parallel_task = Task("parallel-pipeline", dag=pipeline, with_param=A_task.get_result())
    A_task >> parallel_task
```

Still, the same volume is shared across all branches, so question (2) remains.
-
Hi @eduardohenriquearnold! Thanks for the question :) I think this is actually a limitation of Argo Workflows. When volumes are created dynamically they are set at the workflow level: Argo tells K8S about the PVs/PVCs it needs to create, and those PVs/PVCs, depending on your cluster setup, either create the volumes as soon as the workflow is created or once the pod is created/ready to mount the volume. To my knowledge, there's no way for Argo to know how many tasks there will be in the fan-out, so it does not know what volumes to create.

Now, I think there might be ways around this. I have not experimented with this, so take it with a grain of salt. This idea is also conditioned on your application knowing how many tasks it will fan out over:

- create one volume per branch at the workflow level, since the branch count is known up front;
- in each branch, mount only that branch's volume on its tasks.
I think the above might work? I don't think this is fully possible in Hera at the moment because we had an accidental regression and we cannot set volumes at the workflow level anymore. However, it will for sure be possible in the upcoming Hera V5, where you will get full feature parity with Argo Workflows + Events!
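To make the idea concrete, here is a minimal sketch of what I mean, not something I have tested: it assumes the application knows the branch count up front (`n_branches` is a made-up name), unrolls the fan-out in plain Python instead of using `with_param`, and gives each branch its own `Volume`. Whether those per-branch volumes survive the regression mentioned above is exactly the open question, so treat this as the shape of the workaround rather than working code:

```python
from hera import Task, Volume, Workflow

n_branches = 3  # hypothetical: the application knows this ahead of time

with Workflow("static-fanout-volumes-", generate_name=True) as w:
    for i in range(n_branches):
        # a separate volume per branch, mounted at the same path inside each of that branch's pods
        branch_volume = [Volume(size="1Gi", mount_path="/mnt/vol")]
        b = Task(
            f"b-{i}",
            image="alpine:latest",
            command=["sh", "-c"],
            args=[f"echo branch {i}: writing; echo {i} > /mnt/vol/value.txt"],
            volumes=branch_volume,
        )
        c = Task(
            f"c-{i}",
            image="alpine:latest",
            command=["sh", "-c"],
            args=["echo reading; cat /mnt/vol/value.txt"],
            volumes=branch_volume,
        )
        # B and C of the same branch share that branch's volume; branches stay independent
        b >> c
```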