diff --git a/metaflow/current.py b/metaflow/current.py
index e3a9ff16882..d16b4454ed8 100644
--- a/metaflow/current.py
+++ b/metaflow/current.py
@@ -1,1 +1,6 @@
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from metaflow import FlowSpec
+
 class Current(object):
@@ -13,6 +18,7 @@ def __init__(self):
         self._is_running = False
 
     def _set_env(self,
+                 flow=None,
                  flow_name=None,
                  run_id=None,
                  step_name=None,
@@ -23,6 +29,7 @@ def _set_env(self,
                  username=None,
                  is_running=True):
 
+        self._flow = flow
         self._flow_name = flow_name
         self._run_id = run_id
         self._step_name = step_name
@@ -47,6 +54,10 @@ def get(self, key, default=None):
     def is_running_flow(self):
         return self._is_running
 
+    @property
+    def flow(self) -> "FlowSpec":
+        return self._flow
+
     @property
     def flow_name(self):
         return self._flow_name
diff --git a/metaflow/plugins/kfp/kfp.py b/metaflow/plugins/kfp/kfp.py
index 67510f84ff1..ca78f099049 100644
--- a/metaflow/plugins/kfp/kfp.py
+++ b/metaflow/plugins/kfp/kfp.py
@@ -421,7 +421,6 @@ def build_kfp_component(node: DAGNode, task_id: str) -> KfpComponent:
                     deco
                     for deco in node.decorators
                     if isinstance(deco, EnvironmentDecorator)
-                    and "kubernetes_vars" in deco.attributes
                 ),
                 None,  # default
             ),
@@ -605,7 +604,7 @@ def _set_container_resources(
             step_name=kfp_component.name,
             size=resource_requirements["volume"],
             workflow_uid=workflow_uid,
-            mode=dsl.VOLUME_MODE_RWO,
+            mode=mode,
         )
         container_op.add_pvolumes({volume_dir: volume})
 
@@ -660,11 +659,13 @@ def _create_volume(
     workflow_uid: str,
     mode: str,
 ) -> PipelineVolume:
-    volume_name = sanitize_k8s_name(step_name)
+    volume_name = (
+        sanitize_k8s_name(step_name) if mode == "ReadWriteMany" else "{{pod.name}}"
+    )
     attribute_outputs = {"size": "{.status.capacity.storage}"}
     requested_resources = V1ResourceRequirements(requests={"storage": size})
     pvc_spec = V1PersistentVolumeClaimSpec(
-        access_modes=mode, resources=requested_resources
+        access_modes=[mode], resources=requested_resources
     )
     owner_reference = V1OwnerReference(
         api_version="argoproj.io/v1alpha1",
@@ -959,7 +960,7 @@ def build_kfp_dag(
                 envs = kfp_component.environment_decorator.attributes[
                     "kubernetes_vars"
                 ]
-                for env in envs:
+                for env in envs or []:
                     container_op.container.add_env_variable(env)
 
             if kfp_component.total_retries and kfp_component.total_retries > 0:
@@ -1144,7 +1145,7 @@ def create_shared_volumes(
                 step_name=f"{kfp_component.name}-shared",
                 size=resources["volume"],
                 workflow_uid=workflow_uid_op.output,
-                mode=dsl.VOLUME_MODE_RWO,
+                mode=resources["volume_mode"],
             )
         }
     return shared_volumes
diff --git a/metaflow/plugins/kfp/tests/flows/resources_flow.py b/metaflow/plugins/kfp/tests/flows/resources_flow.py
index bcdf2c0d4b4..6086c080f20 100644
--- a/metaflow/plugins/kfp/tests/flows/resources_flow.py
+++ b/metaflow/plugins/kfp/tests/flows/resources_flow.py
@@ -94,9 +94,7 @@ def get_env_vars(env_resources: Dict[str, str]) -> List[V1EnvVar]:
 
 class ResourcesFlow(FlowSpec):
     @resources(
-        local_storage="242",
-        cpu="0.6",
-        memory="1G",
+        local_storage="242", cpu="0.6", memory="1G",
     )
     @environment(  # pylint: disable=E1102
         vars={"MY_ENV": "value"}, kubernetes_vars=kubernetes_vars
@@ -132,14 +130,42 @@ def start(self):
         assert os.environ.get("AI_EXPERIMENT_NAME") == "metaflow_test"
 
         self.items = [1, 2]
-        self.next(self.split_step, foreach="items")
+        self.next(self.foreach_step, foreach="items")
 
-    @resources(volume="11G", volume_mode="ReadWriteMany")
+    @environment(vars={"MY_ENV": "value"})  # pylint: disable=E1102
+    @resources(volume="11G")
     @step
-    def split_step(self):
+    def foreach_step(self):
+        # test simple environment var
+        assert os.environ.get("MY_ENV") == "value"
+
         output = subprocess.check_output(
             "df -h | grep /opt/metaflow_volume", shell=True
         )
         assert "11G" in str(output)
+        self.next(self.join_step)
+
+    @resources(volume="12G")
+    @step
+    def join_step(self, inputs):
+        output = subprocess.check_output(
+            "df -h | grep /opt/metaflow_volume", shell=True
+        )
+        assert "12G" in str(output)
+        self.next(self.split_step)
+
+    @step
+    def split_step(self):
+        self.items = [1, 2]
+        self.next(self.shared_volume_foreach_step, foreach="items")
+
+    @resources(volume="13G", volume_mode="ReadWriteMany")
+    @step
+    def shared_volume_foreach_step(self):
+        output = subprocess.check_output(
+            "df -h | grep /opt/metaflow_volume", shell=True
+        )
+        assert "13G" in str(output)
+
         file_path = "/opt/metaflow_volume/test.txt"
         message = "hello world!"
@@ -159,15 +185,10 @@ def split_step(self):
         print("read_lines", read_lines)
         assert message == read_lines[0]
 
-        self.next(self.join_step)
+        self.next(self.shared_volume_join_step)
 
-    @resources(volume="12G")
     @step
-    def join_step(self, inputs):
-        output = subprocess.check_output(
-            "df -h | grep /opt/metaflow_volume", shell=True
-        )
-        assert "12G" in str(output)
+    def shared_volume_join_step(self, inputs):
         self.next(self.end)
 
     @step
diff --git a/metaflow/task.py b/metaflow/task.py
index 389aa7f8a30..65596eef5f2 100644
--- a/metaflow/task.py
+++ b/metaflow/task.py
@@ -332,7 +332,8 @@ def run_step(self,
         self._init_foreach(step_name, join_type, inputs, split_index)
 
         # 4. initialize the current singleton
-        current._set_env(flow_name=self.flow.name,
+        current._set_env(flow=self.flow,
+                         flow_name=self.flow.name,
                          run_id=run_id,
                          step_name=step_name,
                          task_id=task_id,