Commit 7fafd29

Add a parquet/pandas example (#1431)

* Add admonition extension for the "note" formatting

Signed-off-by: Elliot Gunton <[email protected]>

1 parent ce684a5 commit 7fafd29
File tree: 5 files changed, +367 −2 lines changed
Lines changed: 175 additions & 0 deletions
@@ -0,0 +1,175 @@
# Parquet Pandas

This example shows how to use the custom serialisation features for third-party libraries like Pandas. It also shows
how we can easily use big file formats like parquet through Hera Artifact annotations.

!!! note
    This example requires `pandas`, `pyarrow` and `hera` itself to run. You will need to provide an image containing
    these dependencies.
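If you do not maintain such an image yet, one option is to pass a suitable image to the decorator. This is a minimal sketch, not part of the committed example; the image tag below is a made-up placeholder:

```python
from hera.workflows import script


# Sketch: `image=` points the script template at an image that already
# bundles pandas, pyarrow and hera. The tag below is a hypothetical placeholder.
@script(constructor="runner", image="ghcr.io/example/hera-pandas:0.1")
def my_pandas_step():
    from pandas import DataFrame

    print(DataFrame({"x": [1, 2, 3]}))
```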
=== "Hera"
14+
15+
```python linenums="1"
16+
import io
17+
from pathlib import Path
18+
from typing import Annotated
19+
20+
from pandas import DataFrame, read_parquet
21+
22+
from hera.workflows import (
23+
DAG,
24+
Artifact,
25+
NoneArchiveStrategy,
26+
Task,
27+
Workflow,
28+
script,
29+
)
30+
31+
32+
@script(constructor="runner")
33+
def create_dataframe() -> Annotated[
34+
DataFrame,
35+
Artifact(
36+
name="dataset",
37+
dumpb=lambda df: df.to_parquet(),
38+
archive=NoneArchiveStrategy(),
39+
),
40+
]:
41+
# Here, we create some dummy data, and return it as a DataFrame.
42+
# The `dumpb` function from the `Artifact` annotation handles
43+
# the conversion to parquet format!
44+
data = {
45+
"age": [23, 19, 43, 65, 72],
46+
"height": [1.63, 1.82, 1.77, 1.59, 1.61],
47+
}
48+
return DataFrame(data)
49+
50+
51+
@script(constructor="runner")
52+
def loadb_dataframe(
53+
dataframe: Annotated[
54+
DataFrame,
55+
Artifact(name="dataset", loadb=lambda bytes_: read_parquet(io.BytesIO(bytes_))),
56+
],
57+
):
58+
# Using a `loadb` function here is possible but not very clear
59+
# as we have to create a buffered reader for `read_parquet`
60+
print(dataframe)
61+
62+
63+
@script(constructor="runner")
64+
def load_dataframe_from_path(
65+
dataframe_path: Annotated[
66+
Path,
67+
Artifact(name="dataset"),
68+
],
69+
):
70+
# Instead, we can read the parquet file from a path, which is easier to understand. Hera
71+
# automatically loads the `Path` of an Artifact to your function argument, so you can
72+
# read the contents of the Artifact file however you want!
73+
dataframe = read_parquet(dataframe_path)
74+
print(dataframe)
75+
76+
77+
with Workflow(generate_name="pandas-example-", entrypoint="d", service_account_name="argo") as w:
78+
with DAG(name="d"):
79+
create_task: Task = create_dataframe()
80+
consume_task_1 = loadb_dataframe(
81+
arguments={
82+
"dataset": create_task.get_artifact("dataset"),
83+
},
84+
)
85+
consume_task_2 = load_dataframe_from_path(
86+
arguments={
87+
"dataset": create_task.get_artifact("dataset"),
88+
},
89+
)
90+
91+
create_task >> [consume_task_1, consume_task_2]
92+
```
93+
94+
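To actually submit the workflow built in the "Hera" tab, a minimal sketch (the host and namespace below are placeholders you must adapt to your cluster; this is not part of the committed example):

```python
from hera.shared import global_config

global_config.host = "https://my-argo-server.example.com"  # placeholder host
global_config.namespace = "argo"  # placeholder namespace

# `w` is the Workflow object constructed in the example above.
w.create()
```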
=== "YAML"
95+
96+
```yaml linenums="1"
97+
apiVersion: argoproj.io/v1alpha1
98+
kind: Workflow
99+
metadata:
100+
generateName: pandas-example-
101+
spec:
102+
entrypoint: d
103+
serviceAccountName: argo
104+
templates:
105+
- name: d
106+
dag:
107+
tasks:
108+
- name: create-dataframe
109+
template: create-dataframe
110+
- name: loadb-dataframe
111+
depends: create-dataframe
112+
template: loadb-dataframe
113+
arguments:
114+
artifacts:
115+
- name: dataset
116+
from: '{{tasks.create-dataframe.outputs.artifacts.dataset}}'
117+
- name: load-dataframe-from-path
118+
depends: create-dataframe
119+
template: load-dataframe-from-path
120+
arguments:
121+
artifacts:
122+
- name: dataset
123+
from: '{{tasks.create-dataframe.outputs.artifacts.dataset}}'
124+
- name: create-dataframe
125+
outputs:
126+
artifacts:
127+
- name: dataset
128+
path: /tmp/hera-outputs/artifacts/dataset
129+
archive:
130+
none: {}
131+
script:
132+
image: python:3.9
133+
source: '{{inputs.parameters}}'
134+
args:
135+
- -m
136+
- hera.workflows.runner
137+
- -e
138+
- examples.workflows.use_cases.parquet_pandas:create_dataframe
139+
command:
140+
- python
141+
env:
142+
- name: hera__outputs_directory
143+
value: /tmp/hera-outputs
144+
- name: loadb-dataframe
145+
inputs:
146+
artifacts:
147+
- name: dataset
148+
path: /tmp/hera-inputs/artifacts/dataset
149+
script:
150+
image: python:3.9
151+
source: '{{inputs.parameters}}'
152+
args:
153+
- -m
154+
- hera.workflows.runner
155+
- -e
156+
- examples.workflows.use_cases.parquet_pandas:loadb_dataframe
157+
command:
158+
- python
159+
- name: load-dataframe-from-path
160+
inputs:
161+
artifacts:
162+
- name: dataset
163+
path: /tmp/hera-inputs/artifacts/dataset
164+
script:
165+
image: python:3.9
166+
source: '{{inputs.parameters}}'
167+
args:
168+
- -m
169+
- hera.workflows.runner
170+
- -e
171+
- examples.workflows.use_cases.parquet_pandas:load_dataframe_from_path
172+
command:
173+
- python
174+
```
175+
Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: pandas-example-
spec:
  entrypoint: d
  serviceAccountName: argo
  templates:
  - name: d
    dag:
      tasks:
      - name: create-dataframe
        template: create-dataframe
      - name: loadb-dataframe
        depends: create-dataframe
        template: loadb-dataframe
        arguments:
          artifacts:
          - name: dataset
            from: '{{tasks.create-dataframe.outputs.artifacts.dataset}}'
      - name: load-dataframe-from-path
        depends: create-dataframe
        template: load-dataframe-from-path
        arguments:
          artifacts:
          - name: dataset
            from: '{{tasks.create-dataframe.outputs.artifacts.dataset}}'
  - name: create-dataframe
    outputs:
      artifacts:
      - name: dataset
        path: /tmp/hera-outputs/artifacts/dataset
        archive:
          none: {}
    script:
      image: python:3.9
      source: '{{inputs.parameters}}'
      args:
      - -m
      - hera.workflows.runner
      - -e
      - examples.workflows.use_cases.parquet_pandas:create_dataframe
      command:
      - python
      env:
      - name: hera__outputs_directory
        value: /tmp/hera-outputs
  - name: loadb-dataframe
    inputs:
      artifacts:
      - name: dataset
        path: /tmp/hera-inputs/artifacts/dataset
    script:
      image: python:3.9
      source: '{{inputs.parameters}}'
      args:
      - -m
      - hera.workflows.runner
      - -e
      - examples.workflows.use_cases.parquet_pandas:loadb_dataframe
      command:
      - python
  - name: load-dataframe-from-path
    inputs:
      artifacts:
      - name: dataset
        path: /tmp/hera-inputs/artifacts/dataset
    script:
      image: python:3.9
      source: '{{inputs.parameters}}'
      args:
      - -m
      - hera.workflows.runner
      - -e
      - examples.workflows.use_cases.parquet_pandas:load_dataframe_from_path
      command:
      - python
Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@
"""This example shows how to use the custom serialisation features for third-party libraries like Pandas. It also shows
how we can easily use big file formats like parquet through Hera Artifact annotations.

!!! note
    This example requires `pandas`, `pyarrow` and `hera` itself to run. You will need to provide an image containing
    these dependencies.
"""

import io
from pathlib import Path
from typing import Annotated

from pandas import DataFrame, read_parquet

from hera.workflows import (
    DAG,
    Artifact,
    NoneArchiveStrategy,
    Task,
    Workflow,
    script,
)


@script(constructor="runner")
def create_dataframe() -> Annotated[
    DataFrame,
    Artifact(
        name="dataset",
        dumpb=lambda df: df.to_parquet(),
        archive=NoneArchiveStrategy(),
    ),
]:
    # Here, we create some dummy data, and return it as a DataFrame.
    # The `dumpb` function from the `Artifact` annotation handles
    # the conversion to parquet format!
    data = {
        "age": [23, 19, 43, 65, 72],
        "height": [1.63, 1.82, 1.77, 1.59, 1.61],
    }
    return DataFrame(data)


@script(constructor="runner")
def loadb_dataframe(
    dataframe: Annotated[
        DataFrame,
        Artifact(name="dataset", loadb=lambda bytes_: read_parquet(io.BytesIO(bytes_))),
    ],
):
    # Using a `loadb` function here is possible but not very clear,
    # as we have to create a buffered reader for `read_parquet`
    print(dataframe)


@script(constructor="runner")
def load_dataframe_from_path(
    dataframe_path: Annotated[
        Path,
        Artifact(name="dataset"),
    ],
):
    # Instead, we can read the parquet file from a path, which is easier to understand. Hera
    # automatically loads the `Path` of an Artifact to your function argument, so you can
    # read the contents of the Artifact file however you want!
    dataframe = read_parquet(dataframe_path)
    print(dataframe)


with Workflow(generate_name="pandas-example-", entrypoint="d", service_account_name="argo") as w:
    with DAG(name="d"):
        create_task: Task = create_dataframe()
        consume_task_1 = loadb_dataframe(
            arguments={
                "dataset": create_task.get_artifact("dataset"),
            },
        )
        consume_task_2 = load_dataframe_from_path(
            arguments={
                "dataset": create_task.get_artifact("dataset"),
            },
        )

        create_task >> [consume_task_1, consume_task_2]
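
As a quick sanity check of the `dumpb`/`loadb` pair used above, this sketch round-trips a DataFrame through Parquet bytes locally, without Argo. It relies on pandas returning bytes from `to_parquet()` when no path is given, which requires `pyarrow`:

```python
import io

from pandas import DataFrame, read_parquet

df = DataFrame({"age": [23, 19, 43], "height": [1.63, 1.82, 1.77]})

# `to_parquet()` with no path returns the serialised bytes, which is
# exactly what the `dumpb=lambda df: df.to_parquet()` annotation relies on.
parquet_bytes = df.to_parquet()
assert isinstance(parquet_bytes, bytes)

# `loadb` reverses the process: wrap the bytes in a buffer for `read_parquet`.
roundtripped = read_parquet(io.BytesIO(parquet_bytes))
assert df.equals(roundtripped)
```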

mkdocs.yml

Lines changed: 1 addition & 0 deletions
@@ -146,6 +146,7 @@ extra:
     provider: mike
 
 markdown_extensions:
+  - admonition
   - smarty
   - attr_list
   - mkdocs-click