-
Notifications
You must be signed in to change notification settings - Fork 1
/
jobs.py
35 lines (26 loc) · 1.37 KB
/
jobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from dagster import job
from src import ops_no_prio, ops_with_prio
@job
def static_job():
pokemon_list = ops_no_prio.static_extract()
pokemon_list = ops_no_prio.non_prio_transform_1(pokemon_list)
pokemon_list = ops_no_prio.non_prio_transform_2(pokemon_list)
ops_no_prio.non_prio_load(pokemon_list)
# max_concurrent is not really necessary here -- it's used to show the difference of prio vs. non-prio for beefier machines
@job(config={"execution": {"config": {"multiprocess": {"max_concurrent": 4}}}})
def dynamic_job():
def _for_each(pokemon_batch):
pokemon_batch = ops_no_prio.non_prio_transform_1(pokemon_batch)
pokemon_batch = ops_no_prio.non_prio_transform_2(pokemon_batch)
ops_no_prio.non_prio_load(pokemon_batch)
pokemon_list = ops_no_prio.non_prio_dynamic_extract()
pokemon_list.map(_for_each)
# max_concurrent is not really necessary here -- it's used to show the difference of prio vs. non-prio for beefier machines
@job(config={"execution": {"config": {"multiprocess": {"max_concurrent": 4}}}})
def prio_dynamic_job():
def _for_each(pokemon_batch):
pokemon_batch = ops_with_prio.prio_transform_1(pokemon_batch)
pokemon_batch = ops_with_prio.prio_transform_2(pokemon_batch)
ops_with_prio.prio_load(pokemon_batch)
pokemon_list = ops_with_prio.prio_dynamic_extract()
pokemon_list.map(_for_each)