Add current.flow field & @resources(volume) fix (#119)
* Add flow field to current, exposing the running Flow

* KFP @Environment fix

* Fix @resources(volume) regression on foreach steps

* Remove unnecessary KFP @Environment(vars) fix

Co-authored-by: Taleb Zeghmi <[email protected]>
talebzeghmi authored Sep 15, 2021
1 parent 8ceb6d3 commit 6afc8f9
Showing 4 changed files with 51 additions and 20 deletions.
8 changes: 8 additions & 0 deletions metaflow/current.py
@@ -1,3 +1,5 @@
from metaflow import FlowSpec


class Current(object):

@@ -13,6 +15,7 @@ def __init__(self):
self._is_running = False

def _set_env(self,
flow=None,
flow_name=None,
run_id=None,
step_name=None,
@@ -23,6 +26,7 @@ def _set_env(self,
username=None,
is_running=True):

self._flow = flow
self._flow_name = flow_name
self._run_id = run_id
self._step_name = step_name
@@ -47,6 +51,10 @@ def get(self, key, default=None):
def is_running_flow(self):
return self._is_running

@property
def flow(self) -> FlowSpec:
return self._flow

@property
def flow_name(self):
return self._flow_name
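The new current.flow property exposes the running FlowSpec instance (populated by task.py further down in this diff). A minimal usage sketch, not part of this commit; the example flow class and its fan_out attribute are made up for illustration:

from metaflow import FlowSpec, current, step


class CurrentFlowExample(FlowSpec):
    fan_out = 4  # hypothetical class attribute a plugin or step might inspect

    @step
    def start(self):
        # current.flow is set before the step body runs, so inside a step it
        # should refer to the same FlowSpec instance as `self`.
        assert current.flow is self
        print("running", current.flow_name, "with fan_out =", current.flow.fan_out)
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    CurrentFlowExample()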
13 changes: 7 additions & 6 deletions metaflow/plugins/kfp/kfp.py
@@ -421,7 +421,6 @@ def build_kfp_component(node: DAGNode, task_id: str) -> KfpComponent:
deco
for deco in node.decorators
if isinstance(deco, EnvironmentDecorator)
and "kubernetes_vars" in deco.attributes
),
None, # default
),
@@ -605,7 +604,7 @@ def _set_container_resources(
step_name=kfp_component.name,
size=resource_requirements["volume"],
workflow_uid=workflow_uid,
mode=dsl.VOLUME_MODE_RWO,
mode=mode,
)
container_op.add_pvolumes({volume_dir: volume})

@@ -660,11 +659,13 @@ def _create_volume(
workflow_uid: str,
mode: str,
) -> PipelineVolume:
volume_name = sanitize_k8s_name(step_name)
volume_name = (
sanitize_k8s_name(step_name) if mode == "ReadWriteMany" else "{{pod.name}}"
)
attribute_outputs = {"size": "{.status.capacity.storage}"}
requested_resources = V1ResourceRequirements(requests={"storage": size})
pvc_spec = V1PersistentVolumeClaimSpec(
access_modes=mode, resources=requested_resources
access_modes=dsl.VOLUME_MODE_RWO, resources=requested_resources
)
owner_reference = V1OwnerReference(
api_version="argoproj.io/v1alpha1",
@@ -959,7 +960,7 @@ def build_kfp_dag(
envs = kfp_component.environment_decorator.attributes[
"kubernetes_vars"
]
for env in envs:
for env in envs if envs else []:
container_op.container.add_env_variable(env)

if kfp_component.total_retries and kfp_component.total_retries > 0:
@@ -1144,7 +1145,7 @@ def create_shared_volumes(
step_name=f"{kfp_component.name}-shared",
size=resources["volume"],
workflow_uid=workflow_uid_op.output,
mode=dsl.VOLUME_MODE_RWO,
mode=resources["volume_mode"],
)
}
return shared_volumes
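Summarizing the _create_volume change (a standalone sketch mirroring the diff's logic, not the plugin's actual code; sanitize_k8s_name below is a simplified stand-in for the KFP helper of the same name): shared ReadWriteMany volumes keep a deterministic, step-derived name so every foreach split mounts the same claim, while per-task volumes are named after the pod so parallel splits no longer collide on a single PVC.

import re


def sanitize_k8s_name(name: str) -> str:
    # simplified stand-in for the KFP helper of the same name
    return re.sub(r"[^a-z0-9-]+", "-", name.lower()).strip("-")


def pvc_name(step_name: str, volume_mode: str) -> str:
    if volume_mode == "ReadWriteMany":
        # one shared claim per step, reused by every foreach split
        return sanitize_k8s_name(step_name)
    # one claim per pod, so each foreach split gets its own volume
    return "{{pod.name}}"


print(pvc_name("foreach_step", "ReadWriteOnce"))                # {{pod.name}}
print(pvc_name("shared_volume_foreach_step", "ReadWriteMany"))  # shared-volume-foreach-step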
47 changes: 34 additions & 13 deletions metaflow/plugins/kfp/tests/flows/resources_flow.py
@@ -94,9 +94,7 @@ def get_env_vars(env_resources: Dict[str, str]) -> List[V1EnvVar]:

class ResourcesFlow(FlowSpec):
@resources(
local_storage="242",
cpu="0.6",
memory="1G",
local_storage="242", cpu="0.6", memory="1G",
)
@environment( # pylint: disable=E1102
vars={"MY_ENV": "value"}, kubernetes_vars=kubernetes_vars
@@ -132,16 +130,44 @@ def start(self):
assert os.environ.get("AI_EXPERIMENT_NAME") == "metaflow_test"

self.items = [1, 2]
self.next(self.split_step, foreach="items")
self.next(self.foreach_step, foreach="items")

@resources(volume="11G", volume_mode="ReadWriteMany")
@environment(vars={"MY_ENV": "value"}) # pylint: disable=E1102
@resources(volume="11G")
@step
def split_step(self):
def foreach_step(self):
# test simple environment var
assert os.environ.get("MY_ENV") == "value"

output = subprocess.check_output(
"df -h | grep /opt/metaflow_volume", shell=True
)
assert "11G" in str(output)

self.next(self.join_step)

@resources(volume="12G")
@step
def join_step(self, inputs):
output = subprocess.check_output(
"df -h | grep /opt/metaflow_volume", shell=True
)
assert "12G" in str(output)
self.next(self.split_step)

@step
def split_step(self):
self.items = [1, 2]
self.next(self.shared_volume_foreach_step, foreach="items")

@resources(volume="13G", volume_mode="ReadWriteMany")
@step
def shared_volume_foreach_step(self):
output = subprocess.check_output(
"df -h | grep /opt/metaflow_volume", shell=True
)
assert "13G" in str(output)

file_path = "/opt/metaflow_volume/test.txt"
message = "hello world!"

@@ -159,15 +185,10 @@ def split_step(self):
print("read_lines", read_lines)
assert message == read_lines[0]

self.next(self.join_step)
self.next(self.shared_volume_join_step)

@resources(volume="12G")
@step
def join_step(self, inputs):
output = subprocess.check_output(
"df -h | grep /opt/metaflow_volume", shell=True
)
assert "12G" in str(output)
def shared_volume_join_step(self, inputs):
self.next(self.end)

@step
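The reworked test flow now exercises both paths: foreach_step and join_step cover per-task volumes on a plain foreach (the regression being fixed), while shared_volume_foreach_step writes to and reads from a ReadWriteMany volume declared with volume_mode. The steps repeat the same df-based size check inline; a helper they could factor out might look like this (a sketch, not part of the diff):

import subprocess


def assert_volume_size(expected_size: str, mount_point: str = "/opt/metaflow_volume") -> None:
    # grep `df -h` for the mount point and check the requested capacity shows up
    output = subprocess.check_output(f"df -h | grep {mount_point}", shell=True)
    assert expected_size in str(output), output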
3 changes: 2 additions & 1 deletion metaflow/task.py
@@ -332,7 +332,8 @@ def run_step(self,
self._init_foreach(step_name, join_type, inputs, split_index)

# 4. initialize the current singleton
current._set_env(flow_name=self.flow.name,
current._set_env(flow=self.flow,
flow_name=self.flow.name,
run_id=run_id,
step_name=step_name,
task_id=task_id,
