From e8bfc13b699d7d06f8ed407bcfec9b9372d53328 Mon Sep 17 00:00:00 2001 From: Ryan McGinty Date: Tue, 11 Jun 2024 13:23:49 -0700 Subject: [PATCH 1/3] WIP --- .../handlers/demand/context_manager.py | 16 ++++++++++++++-- .../handlers/demand/model.py | 3 +++ .../handlers/demand/scaffolding.py | 19 ++++++++++++++++++- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py b/src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py index ea08708..3203638 100644 --- a/src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py +++ b/src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py @@ -99,6 +99,7 @@ class DemandExecutionContextManager: demand_execution: DemandExecution scratch_vol_configuration: BatchEFSConfiguration shared_vol_configuration: BatchEFSConfiguration + tmp_vol_configuration: Optional[BatchEFSConfiguration] = None env_base: EnvBase = field(default_factory=EnvBase.from_env) def __post_init__(self): @@ -136,6 +137,8 @@ def container_tmp_path(self) -> Path: Returns: Path: container path for tmp volume """ + if self.tmp_vol_configuration: + return self.tmp_vol_configuration.mount_point_config.mount_point return self.scratch_vol_configuration.mount_point_config.as_mounted_path("tmp") @property @@ -193,10 +196,13 @@ def efs_mount_points(self) -> List[MountPointConfiguration]: Returns: List[MountPointConfiguration]: list of mount point configurations """ - return [ + mpcs = [ self.scratch_vol_configuration.mount_point_config, self.shared_vol_configuration.mount_point_config, ] + if self.tmp_vol_configuration: + mpcs.append(self.tmp_vol_configuration.mount_point_config) + return mpcs @property def batch_job_builder(self) -> BatchJobBuilder: @@ -259,12 +265,14 @@ def from_demand_execution(cls, demand_execution: DemandExecution, env_base: EnvB access_point_name=EFS_SHARED_ACCESS_POINT_NAME, read_only=True, ) + tmp_vol_configuration = None logger.info(f"Using following efs configuration: {vol_configuration}") return DemandExecutionContextManager( demand_execution=demand_execution, scratch_vol_configuration=vol_configuration, shared_vol_configuration=shared_vol_configuration, + tmp_vol_configuration=tmp_vol_configuration, env_base=env_base, ) @@ -310,6 +318,7 @@ def update_demand_execution_parameter_inputs( demand_execution = demand_execution.copy() execution_params = demand_execution.execution_parameters + # TODO: we should allow for the ability to specify the local path for the input updated_params = { param.name: Resolvable( local=(container_shared_path / sha256_hexdigest(param.remote_value)).as_posix(), @@ -403,16 +412,19 @@ def get_batch_efs_configuration( def generate_batch_job_builder( demand_execution: DemandExecution, + env_base: EnvBase, working_path: EFSPath, tmp_path: EFSPath, scratch_mount_point: MountPointConfiguration, shared_mount_point: MountPointConfiguration, - env_base: EnvBase, + tmp_mount_point: Optional[MountPointConfiguration] = None, ) -> BatchJobBuilder: logger.info(f"Constructing BatchJobBuilder instance") demand_execution = demand_execution.copy() efs_mount_points = [scratch_mount_point, shared_mount_point] + if tmp_mount_point: + efs_mount_points.append(tmp_mount_point) logger.info(f"Resolving local paths of working dir = {working_path} and tmp dir = {tmp_path}") container_working_path = get_local_path(working_path, mount_points=efs_mount_points) container_tmp_path = get_local_path(tmp_path, mount_points=efs_mount_points) diff --git 
a/src/aibs_informatics_aws_lambda/handlers/demand/model.py b/src/aibs_informatics_aws_lambda/handlers/demand/model.py index e88096a..e55d7e8 100644 --- a/src/aibs_informatics_aws_lambda/handlers/demand/model.py +++ b/src/aibs_informatics_aws_lambda/handlers/demand/model.py @@ -33,6 +33,9 @@ class DemandFileSystemConfigurations(SchemaModel): scratch: FileSystemConfiguration = custom_field( mm_field=FileSystemConfiguration.as_mm_field(), default_factory=FileSystemConfiguration ) + tmp: Optional[FileSystemConfiguration] = custom_field( + mm_field=FileSystemConfiguration.as_mm_field(), default=None + ) @dataclass diff --git a/src/aibs_informatics_aws_lambda/handlers/demand/scaffolding.py b/src/aibs_informatics_aws_lambda/handlers/demand/scaffolding.py index 06ec316..f7f6faa 100644 --- a/src/aibs_informatics_aws_lambda/handlers/demand/scaffolding.py +++ b/src/aibs_informatics_aws_lambda/handlers/demand/scaffolding.py @@ -8,6 +8,8 @@ EFS_SCRATCH_PATH, EFS_SHARED_ACCESS_POINT_NAME, EFS_SHARED_PATH, + EFS_TMP_ACCESS_POINT_NAME, + EFS_TMP_PATH, ) from aibs_informatics_aws_utils.efs import MountPointConfiguration from aibs_informatics_core.env import EnvBase @@ -56,13 +58,28 @@ def handle(self, request: PrepareDemandScaffoldingRequest) -> PrepareDemandScaff read_only=True, ) + if request.file_system_configurations.tmp is not None: + tmp_vol_configuration = construct_batch_efs_configuration( + env_base=self.env_base, + file_system=request.file_system_configurations.tmp.file_system, + access_point=request.file_system_configurations.tmp.access_point + if request.file_system_configurations.tmp.access_point + else EFS_TMP_ACCESS_POINT_NAME, + container_path=request.file_system_configurations.tmp.container_path + if request.file_system_configurations.tmp.container_path + else f"/opt/efs{EFS_TMP_PATH}", + read_only=False, + ) + else: + tmp_vol_configuration = None + context_manager = DemandExecutionContextManager( demand_execution=request.demand_execution, scratch_vol_configuration=scratch_vol_configuration, shared_vol_configuration=shared_vol_configuration, + tmp_vol_configuration=tmp_vol_configuration, env_base=self.env_base, ) - batch_job_builder = context_manager.batch_job_builder self.setup_file_system(context_manager) From 573b33cf2ee479bddc9afffb02f45a749dc0847c Mon Sep 17 00:00:00 2001 From: Ryan McGinty Date: Fri, 14 Jun 2024 11:22:19 -0700 Subject: [PATCH 2/3] update context manager w/ new tmp vol and config options --- .../handlers/data_sync/file_system.py | 3 +- .../handlers/demand/context_manager.py | 131 ++++++---- .../handlers/demand/model.py | 26 +- .../handlers/demand/test_context_manager.py | 228 +++++++++++++++++- 4 files changed, 339 insertions(+), 49 deletions(-) diff --git a/src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py b/src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py index 748d08f..4b24b0d 100644 --- a/src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py +++ b/src/aibs_informatics_aws_lambda/handlers/data_sync/file_system.py @@ -1,10 +1,11 @@ import logging from datetime import timedelta +from pathlib import Path from typing import List, TypeVar from aibs_informatics_aws_utils.data_sync.file_system import BaseFileSystem, Node, get_file_system from aibs_informatics_aws_utils.efs import detect_mount_points, get_local_path -from aibs_informatics_core.models.aws.efs import EFSPath, Path +from aibs_informatics_core.models.aws.efs import EFSPath from aibs_informatics_core.models.aws.s3 import S3URI from 
aibs_informatics_core.utils.file_operations import get_path_size_bytes, remove_path diff --git a/src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py b/src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py index 3203638..056097b 100644 --- a/src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py +++ b/src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py @@ -1,6 +1,6 @@ import logging -import os import re +import sys from copy import deepcopy from dataclasses import dataclass, field from pathlib import Path @@ -35,6 +35,11 @@ from aibs_informatics_core.utils.os_operations import write_env_file from aibs_informatics_core.utils.units import BYTES_PER_GIBIBYTE +from aibs_informatics_aws_lambda.handlers.demand.model import ( + ContextManagerConfiguration, + EnvFileWriteMode, +) + if TYPE_CHECKING: # pragma: no cover from mypy_boto3_batch.type_defs import ( EFSVolumeConfigurationTypeDef, @@ -100,13 +105,14 @@ class DemandExecutionContextManager: scratch_vol_configuration: BatchEFSConfiguration shared_vol_configuration: BatchEFSConfiguration tmp_vol_configuration: Optional[BatchEFSConfiguration] = None + configuration: ContextManagerConfiguration = field(default_factory=ContextManagerConfiguration) env_base: EnvBase = field(default_factory=EnvBase.from_env) def __post_init__(self): self._batch_job_builder = None self.demand_execution = update_demand_execution_parameter_inputs( - self.demand_execution, self.container_shared_path + self.demand_execution, self.container_shared_path, self.configuration.isolate_inputs ) self.demand_execution = update_demand_execution_parameter_outputs( self.demand_execution, self.container_working_path @@ -213,7 +219,11 @@ def batch_job_builder(self) -> BatchJobBuilder: tmp_path=self.efs_tmp_path, scratch_mount_point=self.scratch_vol_configuration.mount_point_config, shared_mount_point=self.shared_vol_configuration.mount_point_config, + tmp_mount_point=self.tmp_vol_configuration.mount_point_config + if self.tmp_vol_configuration + else None, env_base=self.env_base, + env_file_write_mode=self.configuration.env_file_write_mode, ) return self._batch_job_builder @@ -234,6 +244,8 @@ def pre_execution_data_sync_requests(self) -> List[PrepareBatchDataSyncRequest]: retain_source_data=True, require_lock=True, batch_size_bytes_limit=75 * BYTES_PER_GIBIBYTE, # 75 GiB max + size_only=self.configuration.size_only, + force=self.configuration.force, ) for param in self.demand_execution.execution_parameters.downloadable_job_param_inputs ] @@ -247,12 +259,19 @@ def post_execution_data_sync_requests(self) -> List[PrepareBatchDataSyncRequest] retain_source_data=False, require_lock=False, batch_size_bytes_limit=75 * BYTES_PER_GIBIBYTE, # 75 GiB max + size_only=self.configuration.size_only, + force=self.configuration.force, ) for param in self.demand_execution.execution_parameters.uploadable_job_param_outputs ] @classmethod - def from_demand_execution(cls, demand_execution: DemandExecution, env_base: EnvBase): + def from_demand_execution( + cls, + demand_execution: DemandExecution, + env_base: EnvBase, + configuration: Optional[ContextManagerConfiguration] = None, + ): vol_configuration = get_batch_efs_configuration( env_base=env_base, container_path=f"/opt/efs{EFS_SCRATCH_PATH}", @@ -273,6 +292,7 @@ def from_demand_execution(cls, demand_execution: DemandExecution, env_base: EnvB scratch_vol_configuration=vol_configuration, shared_vol_configuration=shared_vol_configuration, tmp_vol_configuration=tmp_vol_configuration, + 
configuration=configuration or ContextManagerConfiguration(), env_base=env_base, ) @@ -283,7 +303,7 @@ def from_demand_execution(cls, demand_execution: DemandExecution, env_base: EnvB def update_demand_execution_parameter_inputs( - demand_execution: DemandExecution, container_shared_path: Path + demand_execution: DemandExecution, container_shared_path: Path, isolate_inputs: bool = False ) -> DemandExecution: """Modifies demand execution input destinations with the location of the volume configuration @@ -311,6 +331,7 @@ def update_demand_execution_parameter_inputs( Args: demand_execution (DemandExecution): Demand execution object to modify (copied) vol_configuration (BatchEFSConfiguration): volume configuration + isolate_inputs (bool): flag to determine if inputs should be isolated Returns: DemandExecution: a demand execution with modified execution parameter inputs @@ -319,13 +340,16 @@ def update_demand_execution_parameter_inputs( demand_execution = demand_execution.copy() execution_params = demand_execution.execution_parameters # TODO: we should allow for the ability to specify the local path for the input - updated_params = { - param.name: Resolvable( - local=(container_shared_path / sha256_hexdigest(param.remote_value)).as_posix(), - remote=param.remote_value, - ) - for param in execution_params.downloadable_job_param_inputs - } + updated_params = {} + for param in execution_params.downloadable_job_param_inputs: + if isolate_inputs: + local = container_shared_path / demand_execution.execution_id / param.value + else: + local = container_shared_path / sha256_hexdigest(param.remote_value) + + new_resolvable = Resolvable(local=local.as_posix(), remote=param.remote_value) + updated_params[param.name] = new_resolvable + execution_params.update_params(**updated_params) return demand_execution @@ -418,12 +442,13 @@ def generate_batch_job_builder( scratch_mount_point: MountPointConfiguration, shared_mount_point: MountPointConfiguration, tmp_mount_point: Optional[MountPointConfiguration] = None, + env_file_write_mode: EnvFileWriteMode = EnvFileWriteMode.ALWAYS, ) -> BatchJobBuilder: logger.info(f"Constructing BatchJobBuilder instance") demand_execution = demand_execution.copy() efs_mount_points = [scratch_mount_point, shared_mount_point] - if tmp_mount_point: + if tmp_mount_point is not None: efs_mount_points.append(tmp_mount_point) logger.info(f"Resolving local paths of working dir = {working_path} and tmp dir = {tmp_path}") container_working_path = get_local_path(working_path, mount_points=efs_mount_points) @@ -485,42 +510,64 @@ def generate_batch_job_builder( # If the local environment file is not None, then the file is writable from this local machine # We will now write a portion of environment variables to files that can be written. - if local_environment_file is not None: - # Steps for writing environment variables to file: - # 1. Identify all environment variables that are not referenced in the command - # if not referenced, then add to environment file. - # 2. Write environment file - # 3. 
Add environment file to command
-        ENVIRONMENT_FILE_VAR = "ENVIRONMENT_FILE"
-
-        # Step 1:, split environment variables based on reference are referenced in the command
-        writable_environment = environment.copy()
-        required_environment: Dict[str, str] = {}
-        for arg in command + [_ for c in pre_commands for _ in c]:
-            for match in re.findall(r"\$\{?([\w]+)\}?", arg):
-                if match in writable_environment:
-                    required_environment[match] = writable_environment.pop(match)
-
-        # Add the environment file variable to the required environment variables
-        environment = required_environment.copy()
-        environment[ENVIRONMENT_FILE_VAR] = container_environment_file.as_posix()
-
-        # Step 2: write to the environment file
-        local_environment_file.parent.mkdir(parents=True, exist_ok=True)
-        write_env_file(writable_environment, local_environment_file)
-
-        # Finally, add the environment file to the command
-        pre_commands.append(f". ${{{ENVIRONMENT_FILE_VAR}}}".split(" "))
-    else:
+    if local_environment_file is None or env_file_write_mode == EnvFileWriteMode.NEVER:
         # If the environment file cannot be written to, then the environment variables are
         # passed directly to the container. This is a fallback option and will fail if the
         # environment variables are too long.
+        if local_environment_file is None:
+            reason = f"Could not write environment variables to file {efs_environment_file_uri}."
+        else:
+            reason = "Environment file write mode set to NEVER."
+
         logger.warning(
-            f"Could not write environment variables to file {efs_environment_file_uri}."
-            "Environment variables will be passed directly to the container. "
+            f"{reason} Environment variables will be passed directly to the container. "
            "THIS MAY CAUSE THE CONTAINER TO FAIL IF THE ENVIRONMENT VARIABLES "
            "ARE LONGER THAN 8192 CHARACTERS!!!"
        )
+
+    else:
+        if env_file_write_mode == EnvFileWriteMode.IF_REQUIRED:
+            env_size = sum([sys.getsizeof(k) + sys.getsizeof(v) for k, v in environment.items()])
+
+            if env_size > 8192 * 0.9:
+                logger.info(
+                    f"Environment variables are too large to pass directly to container (> 90% of 8192). "
+                    f"Writing environment variables to file {efs_environment_file_uri}."
+                )
+                confirm_write = True
+            else:
+                confirm_write = False
+        elif env_file_write_mode == EnvFileWriteMode.ALWAYS:
+            logger.info(f"Writing environment variables to file {efs_environment_file_uri}.")
+            confirm_write = True
+
+        if confirm_write:
+            # Steps for writing environment variables to file:
+            # 1. Identify all environment variables that are not referenced in the command
+            #    if not referenced, then add to environment file.
+            # 2. Write environment file
+            # 3. Add environment file to command
+            ENVIRONMENT_FILE_VAR = "_ENVIRONMENT_FILE"
+
+            # Step 1: split environment variables based on whether they are referenced in the command
+            writable_environment = environment.copy()
+            required_environment: Dict[str, str] = {}
+            for arg in command + [_ for c in pre_commands for _ in c]:
+                for match in re.findall(r"\$\{?([\w]+)\}?", arg):
+                    if match in writable_environment:
+                        required_environment[match] = writable_environment.pop(match)
+
+            # Add the environment file variable to the required environment variables
+            environment = required_environment.copy()
+            environment[ENVIRONMENT_FILE_VAR] = container_environment_file.as_posix()
+
+            # Step 2: write to the environment file
+            local_environment_file.parent.mkdir(parents=True, exist_ok=True)
+            write_env_file(writable_environment, local_environment_file)
+
+            # Finally, add the environment file to the command
+            pre_commands.append(f". 
${{{ENVIRONMENT_FILE_VAR}}}".split(" ")) + # ------------------------------------------------------------------ command_string = " && ".join([" ".join(_) for _ in pre_commands + [command]]) @@ -529,6 +576,8 @@ def generate_batch_job_builder( BatchEFSConfiguration(scratch_mount_point, read_only=False), BatchEFSConfiguration(shared_mount_point, read_only=True), ] + if tmp_mount_point: + vol_configurations.append(BatchEFSConfiguration(tmp_mount_point, read_only=False)) logger.info(f"Constructing BatchJobBuilder instance...") return BatchJobBuilder( image=demand_execution.execution_image, diff --git a/src/aibs_informatics_aws_lambda/handlers/demand/model.py b/src/aibs_informatics_aws_lambda/handlers/demand/model.py index e55d7e8..97aa53a 100644 --- a/src/aibs_informatics_aws_lambda/handlers/demand/model.py +++ b/src/aibs_informatics_aws_lambda/handlers/demand/model.py @@ -1,7 +1,8 @@ from dataclasses import dataclass -from typing import Any, Dict, List, Optional +from enum import Enum +from typing import Any, Dict, List, Literal, Optional -from aibs_informatics_core.models.base import SchemaModel, custom_field +from aibs_informatics_core.models.base import EnumField, SchemaModel, custom_field from aibs_informatics_core.models.data_sync import DataSyncRequest from aibs_informatics_core.models.demand_execution import DemandExecution @@ -38,6 +39,23 @@ class DemandFileSystemConfigurations(SchemaModel): ) +class EnvFileWriteMode(str, Enum): + NEVER = "never" + ALWAYS = "always" + IF_REQUIRED = "IF_REQUIRED" + + +@dataclass +class ContextManagerConfiguration(SchemaModel): + isolate_inputs: bool = custom_field(default=False) + env_file_write_mode: EnvFileWriteMode = custom_field( + mm_field=EnumField(EnvFileWriteMode), default=EnvFileWriteMode.IF_REQUIRED + ) + # data sync configurations + force: bool = custom_field(default=False) + size_only: bool = custom_field(default=True) + + @dataclass class PrepareDemandScaffoldingRequest(SchemaModel): demand_execution: DemandExecution = custom_field(mm_field=DemandExecution.as_mm_field()) @@ -45,6 +63,10 @@ class PrepareDemandScaffoldingRequest(SchemaModel): mm_field=DemandFileSystemConfigurations.as_mm_field(), default_factory=DemandFileSystemConfigurations, ) + context_manager_configuration: ContextManagerConfiguration = custom_field( + mm_field=ContextManagerConfiguration.as_mm_field(), + default_factory=ContextManagerConfiguration, + ) @dataclass diff --git a/test/aibs_informatics_aws_lambda/handlers/demand/test_context_manager.py b/test/aibs_informatics_aws_lambda/handlers/demand/test_context_manager.py index dab799d..89c6e1f 100644 --- a/test/aibs_informatics_aws_lambda/handlers/demand/test_context_manager.py +++ b/test/aibs_informatics_aws_lambda/handlers/demand/test_context_manager.py @@ -41,6 +41,10 @@ get_batch_job_queue_name, update_demand_execution_parameter_inputs, ) +from aibs_informatics_aws_lambda.handlers.demand.model import ( + ContextManagerConfiguration, + EnvFileWriteMode, +) ENV_BASE = EnvBase("dev-marmotdev") DEMAND_ID = UniqueID.create() @@ -159,6 +163,41 @@ def test__update_demand_execution_parameter_inputs__works( assert job_inputs[0].value.startswith(efs_mount_point_config.mount_point.as_posix()) +def test__update_demand_execution_parameter_inputs__isolates_inputs( + get_or_create_file_system, create_access_point +): + fs_id = get_or_create_file_system("fs") + ap_id = create_access_point(fs_id, "ap", "/opt/efs") + + efs_mount_point_config = MountPointConfiguration.build( + "/mnt/efs", file_system=fs_id, access_point=ap_id + ) + + 
demand_execution = get_any_demand_execution(
+        execution_parameters=DemandExecutionParameters(
+            command=["cmd"],
+            inputs=["X"],
+            params={
+                "X": {
+                    "local": "X",
+                    "remote": S3_URI / "in",
+                }
+            },
+            output_s3_prefix=S3_URI,
+        )
+    )
+
+    demand_execution = update_demand_execution_parameter_inputs(
+        demand_execution, efs_mount_point_config.mount_point, isolate_inputs=True
+    )
+
+    job_inputs = demand_execution.execution_parameters.job_param_inputs
+
+    assert len(job_inputs) == 1
+    assert job_inputs[0].value.startswith(efs_mount_point_config.mount_point.as_posix())
+    assert job_inputs[0].value.endswith(f"/{DEMAND_ID}/X")
+
+
 def test__BatchEFSConfiguration__build__works(get_or_create_file_system, create_access_point):
     fs_id = get_or_create_file_system("fs")
     ap_id = create_access_point(fs_id, "access_point", "/opt/efs")
@@ -300,9 +339,17 @@ def setUpEFS(self):
             self.gwo_file_system_id,
             tags={self.env_base.ENV_BASE_KEY: self.env_base},
         )
+        # HACK: on macOS, /tmp is a symlink to /private/tmp. This causes problems in the
+        # resolution and mapping of mounted paths to EFS paths, because we use
+        # `pathlib.Path.resolve` to get the real path, and that method resolves the
+        # symlink to the real path. That is not what we want here: we want to keep the
+        # symlink in the path, but `pathlib.Path` has no other method that normalizes
+        # the path without resolving symlinks.
+        # The solution here is to use tmp -> tmpdir. This is a hack, and it is not a
+        # problem on the Linux-based machines used in production.
         self.access_point_id__tmp = self.create_access_point(
             EFS_TMP_ACCESS_POINT_NAME,
-            f"{EFS_TMP_PATH}",
+            f"{EFS_TMP_PATH}dir",
             self.gwo_file_system_id,
             tags={self.env_base.ENV_BASE_KEY: self.env_base},
         )
@@ -356,6 +403,50 @@ def create_access_point(
         )
         return response["AccessPointId"]
+
+    def test__init__with_tmp_vol_configuration(self):
+        demand_execution = get_any_demand_execution(
+            execution_id=(demand_id := uuid_str("123")),
+            execution_parameters=DemandExecutionParameters(
+                command=["cmd"],
+            ),
+        )
+
+        vol_configuration = get_batch_efs_configuration(
+            env_base=self.env_base,
+            container_path=f"/opt/efs{EFS_SCRATCH_PATH}",
+            access_point_name=EFS_SCRATCH_ACCESS_POINT_NAME,
+            read_only=False,
+        )
+        shared_vol_configuration = get_batch_efs_configuration(
+            env_base=self.env_base,
+            container_path=f"/opt/efs{EFS_SHARED_PATH}",
+            access_point_name=EFS_SHARED_ACCESS_POINT_NAME,
+            read_only=True,
+        )
+        tmp_vol_configuration = get_batch_efs_configuration(
+            env_base=self.env_base,
+            container_path=f"/opt/efs{EFS_TMP_PATH}dir",
+            access_point_name=EFS_TMP_ACCESS_POINT_NAME,
+            read_only=False,
+        )
+
+        decm = DemandExecutionContextManager(
+            demand_execution=demand_execution,
+            scratch_vol_configuration=vol_configuration,
+            shared_vol_configuration=shared_vol_configuration,
+            tmp_vol_configuration=tmp_vol_configuration,
+            configuration=ContextManagerConfiguration(),
+            env_base=self.env_base,
+        )
+
+        self.assertEqual(len(decm.efs_mount_points), 3)
+        self.assertEqual(decm.container_shared_path, Path("/opt/efs/shared"))
+        self.assertEqual(decm.container_working_path, Path(f"/opt/efs/scratch/{demand_id}"))
+        self.assertEqual(decm.container_tmp_path, Path("/opt/efs/tmpdir"))
+
+        bjb = decm.batch_job_builder
+        self.assertEqual(len(bjb.mount_points), 3)
+
     def test__container_and_efs_path_properties__are_as_expected(self):
         demand_execution = get_any_demand_execution(
             execution_id=(demand_id := uuid_str("123")),
         )
self.assertEqual(decm.efs_tmp_path, EFSPath(f"{self.gwo_file_system_id}:/scratch/tmp")) - def test__batch_job_builder(self): + def test__batch_job_builder__always_write_mode(self): demand_execution = get_any_demand_execution( execution_id=uuid_str("123"), execution_parameters=DemandExecutionParameters( @@ -382,7 +473,11 @@ def test__batch_job_builder(self): params={"A": "a", "B": "b", "C": "c"}, ), ) - decm = DemandExecutionContextManager.from_demand_execution(demand_execution, self.env_base) + decm = DemandExecutionContextManager.from_demand_execution( + demand_execution, + self.env_base, + configuration=ContextManagerConfiguration(env_file_write_mode=EnvFileWriteMode.ALWAYS), + ) actual = decm.batch_job_builder self.assertEqual(actual.image, demand_execution.execution_image) self.assertStringPattern("dev-marmotdev-custom-[a-f0-9]{64}", actual.job_definition_name) @@ -398,16 +493,139 @@ def test__batch_job_builder(self): assert actual.environment == { "ENV_BASE": "dev-marmotdev", "AWS_REGION": "us-west-2", - "ENVIRONMENT_FILE": f"/opt/efs/scratch/{demand_execution.execution_id}/.demand.env", + "_ENVIRONMENT_FILE": f"/opt/efs/scratch/{demand_execution.execution_id}/.demand.env", + "WORKING_DIR": f"/opt/efs/scratch/{demand_execution.execution_id}", + "TMPDIR": "/opt/efs/scratch/tmp", + "A": "a", + "B": "b", + } + assert actual.command == [ + "/bin/bash", + "-c", + "mkdir -p ${WORKING_DIR} && mkdir -p ${TMPDIR} && cd ${WORKING_DIR} && . ${_ENVIRONMENT_FILE} && cmd ${ENV_BASE} ${A}/${B}", + ] + + def test__batch_job_builder__never_write_mode(self): + demand_execution = get_any_demand_execution( + execution_id=uuid_str("123"), + execution_parameters=DemandExecutionParameters( + command=["cmd", "${ENV_BASE}", "${A}/${B}"], + params={"A": "a", "B": "b", "C": "c"}, + ), + ) + decm = DemandExecutionContextManager.from_demand_execution( + demand_execution, + self.env_base, + configuration=ContextManagerConfiguration(env_file_write_mode=EnvFileWriteMode.NEVER), + ) + actual = decm.batch_job_builder + self.assertEqual(actual.image, demand_execution.execution_image) + self.assertStringPattern("dev-marmotdev-custom-[a-f0-9]{64}", actual.job_definition_name) + + # Assert environment and environment file are as expected + env_file = ( + self.root_mount_point / "scratch" / demand_execution.execution_id / ".demand.env" + ) + assert not env_file.exists() + assert actual.environment == { + "ENV_BASE": "dev-marmotdev", + "AWS_REGION": "us-west-2", + "WORKING_DIR": f"/opt/efs/scratch/{demand_execution.execution_id}", + "TMPDIR": "/opt/efs/scratch/tmp", + "EXECUTION_ID": demand_execution.execution_id, + "A": "a", + "B": "b", + "C": "c", + } + assert actual.command == [ + "/bin/bash", + "-c", + "mkdir -p ${WORKING_DIR} && mkdir -p ${TMPDIR} && cd ${WORKING_DIR} && cmd ${ENV_BASE} ${A}/${B}", + ] + + def test__batch_job_builder__conditional_write_mode__not_required(self): + demand_execution = get_any_demand_execution( + execution_id=uuid_str("123"), + execution_parameters=DemandExecutionParameters( + command=["cmd", "${ENV_BASE}", "${A}/${B}"], + params={"A": "a", "B": "b", "C": "c"}, + ), + ) + decm = DemandExecutionContextManager.from_demand_execution( + demand_execution, + self.env_base, + configuration=ContextManagerConfiguration( + env_file_write_mode=EnvFileWriteMode.IF_REQUIRED + ), + ) + actual = decm.batch_job_builder + self.assertEqual(actual.image, demand_execution.execution_image) + self.assertStringPattern("dev-marmotdev-custom-[a-f0-9]{64}", actual.job_definition_name) + + # Assert environment and 
environment file are as expected + env_file = ( + self.root_mount_point / "scratch" / demand_execution.execution_id / ".demand.env" + ) + assert not env_file.exists() + assert actual.environment == { + "ENV_BASE": "dev-marmotdev", + "AWS_REGION": "us-west-2", "WORKING_DIR": f"/opt/efs/scratch/{demand_execution.execution_id}", "TMPDIR": "/opt/efs/scratch/tmp", + "EXECUTION_ID": demand_execution.execution_id, "A": "a", "B": "b", + "C": "c", + } + assert actual.command == [ + "/bin/bash", + "-c", + "mkdir -p ${WORKING_DIR} && mkdir -p ${TMPDIR} && cd ${WORKING_DIR} && cmd ${ENV_BASE} ${A}/${B}", + ] + + def test__batch_job_builder__conditional_write_mode__required(self): + demand_execution = get_any_demand_execution( + execution_id=uuid_str("123"), + execution_parameters=DemandExecutionParameters( + command=["cmd", "${ENV_BASE}"], + params={ + # This should be greater than 8192 + f"VAR_{i}": f"VAL_{i}" + for i in range(1000, 2000) + }, + ), + ) + decm = DemandExecutionContextManager.from_demand_execution( + demand_execution, + self.env_base, + configuration=ContextManagerConfiguration( + env_file_write_mode=EnvFileWriteMode.IF_REQUIRED + ), + ) + actual = decm.batch_job_builder + self.assertEqual(actual.image, demand_execution.execution_image) + self.assertStringPattern("dev-marmotdev-custom-[a-f0-9]{64}", actual.job_definition_name) + + # Assert environment and environment file are as expected + env_file = ( + self.root_mount_point / "scratch" / demand_execution.execution_id / ".demand.env" + ) + assert env_file.exists() + assert env_file.read_text() == ( + f'export EXECUTION_ID="{demand_execution.execution_id}"\n' + + "\n".join([f'export VAR_{i}="VAL_{i}"' for i in range(1000, 2000)]) + ) + assert actual.environment == { + "ENV_BASE": "dev-marmotdev", + "AWS_REGION": "us-west-2", + "_ENVIRONMENT_FILE": f"/opt/efs/scratch/{demand_execution.execution_id}/.demand.env", + "WORKING_DIR": f"/opt/efs/scratch/{demand_execution.execution_id}", + "TMPDIR": "/opt/efs/scratch/tmp", } assert actual.command == [ "/bin/bash", "-c", - "mkdir -p ${WORKING_DIR} && mkdir -p ${TMPDIR} && cd ${WORKING_DIR} && . ${ENVIRONMENT_FILE} && cmd ${ENV_BASE} ${A}/${B}", + "mkdir -p ${WORKING_DIR} && mkdir -p ${TMPDIR} && cd ${WORKING_DIR} && . 
${_ENVIRONMENT_FILE} && cmd ${ENV_BASE}", ] def test__pre_execution_data_sync_requests__no_inputs_generate_empty_list(self): From 4b28bfc466231a7b0f68288fa2f40cf542c65a57 Mon Sep 17 00:00:00 2001 From: Ryan McGinty Date: Fri, 14 Jun 2024 15:27:53 -0700 Subject: [PATCH 3/3] final fixes --- .../handlers/demand/context_manager.py | 13 +++++++++---- .../handlers/demand/model.py | 6 +++--- .../handlers/demand/test_context_manager.py | 11 ++++++++--- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py b/src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py index 056097b..6509372 100644 --- a/src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py +++ b/src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py @@ -112,7 +112,10 @@ def __post_init__(self): self._batch_job_builder = None self.demand_execution = update_demand_execution_parameter_inputs( - self.demand_execution, self.container_shared_path, self.configuration.isolate_inputs + self.demand_execution, + self.container_shared_path, + self.container_working_path, + self.configuration.isolate_inputs, ) self.demand_execution = update_demand_execution_parameter_outputs( self.demand_execution, self.container_working_path @@ -303,7 +306,10 @@ def from_demand_execution( def update_demand_execution_parameter_inputs( - demand_execution: DemandExecution, container_shared_path: Path, isolate_inputs: bool = False + demand_execution: DemandExecution, + container_shared_path: Path, + container_scratch_path: Path, + isolate_inputs: bool = False, ) -> DemandExecution: """Modifies demand execution input destinations with the location of the volume configuration @@ -339,11 +345,10 @@ def update_demand_execution_parameter_inputs( demand_execution = demand_execution.copy() execution_params = demand_execution.execution_parameters - # TODO: we should allow for the ability to specify the local path for the input updated_params = {} for param in execution_params.downloadable_job_param_inputs: if isolate_inputs: - local = container_shared_path / demand_execution.execution_id / param.value + local = container_scratch_path / demand_execution.execution_id / param.value else: local = container_shared_path / sha256_hexdigest(param.remote_value) diff --git a/src/aibs_informatics_aws_lambda/handlers/demand/model.py b/src/aibs_informatics_aws_lambda/handlers/demand/model.py index 97aa53a..f4f12c2 100644 --- a/src/aibs_informatics_aws_lambda/handlers/demand/model.py +++ b/src/aibs_informatics_aws_lambda/handlers/demand/model.py @@ -40,8 +40,8 @@ class DemandFileSystemConfigurations(SchemaModel): class EnvFileWriteMode(str, Enum): - NEVER = "never" - ALWAYS = "always" + NEVER = "NEVER" + ALWAYS = "ALWAYS" IF_REQUIRED = "IF_REQUIRED" @@ -49,7 +49,7 @@ class EnvFileWriteMode(str, Enum): class ContextManagerConfiguration(SchemaModel): isolate_inputs: bool = custom_field(default=False) env_file_write_mode: EnvFileWriteMode = custom_field( - mm_field=EnumField(EnvFileWriteMode), default=EnvFileWriteMode.IF_REQUIRED + mm_field=EnumField(EnvFileWriteMode), default=EnvFileWriteMode.ALWAYS ) # data sync configurations force: bool = custom_field(default=False) diff --git a/test/aibs_informatics_aws_lambda/handlers/demand/test_context_manager.py b/test/aibs_informatics_aws_lambda/handlers/demand/test_context_manager.py index 89c6e1f..f91febf 100644 --- a/test/aibs_informatics_aws_lambda/handlers/demand/test_context_manager.py +++ 
b/test/aibs_informatics_aws_lambda/handlers/demand/test_context_manager.py @@ -154,7 +154,9 @@ def test__update_demand_execution_parameter_inputs__works( ) demand_execution = update_demand_execution_parameter_inputs( - demand_execution, efs_mount_point_config.mount_point + demand_execution=demand_execution, + container_shared_path=efs_mount_point_config.mount_point, + container_scratch_path=efs_mount_point_config.mount_point, ) job_inputs = demand_execution.execution_parameters.job_param_inputs @@ -188,13 +190,16 @@ def test__update_demand_execution_parameter_inputs__isolates_inputs( ) demand_execution = update_demand_execution_parameter_inputs( - demand_execution, efs_mount_point_config.mount_point, isolate_inputs=True + demand_execution, + container_shared_path=efs_mount_point_config.mount_point, + container_scratch_path=Path("/opt/tmp/"), + isolate_inputs=True, ) job_inputs = demand_execution.execution_parameters.job_param_inputs assert len(job_inputs) == 1 - assert job_inputs[0].value.startswith(efs_mount_point_config.mount_point.as_posix()) + assert job_inputs[0].value.startswith("/opt/tmp") assert job_inputs[0].value.endswith(f"/{DEMAND_ID}/X")
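
For context on how the new knobs introduced by this series fit together, here is a minimal sketch of a scaffolding request that opts into the tmp volume and the new context-manager options. The file-system id is a placeholder, and the keyword arguments to FileSystemConfiguration / DemandFileSystemConfigurations are assumptions inferred from the attribute names used in scaffolding.py; shared and scratch are assumed to fall back to their defaults.

    from aibs_informatics_aws_lambda.handlers.demand.model import (
        ContextManagerConfiguration,
        DemandFileSystemConfigurations,
        EnvFileWriteMode,
        FileSystemConfiguration,
        PrepareDemandScaffoldingRequest,
    )

    # `demand_execution` is assumed to be an existing DemandExecution instance.
    request = PrepareDemandScaffoldingRequest(
        demand_execution=demand_execution,
        file_system_configurations=DemandFileSystemConfigurations(
            tmp=FileSystemConfiguration(
                file_system="fs-0123456789abcdef0",  # placeholder file system id
                access_point=None,    # handler falls back to EFS_TMP_ACCESS_POINT_NAME
                container_path=None,  # handler falls back to f"/opt/efs{EFS_TMP_PATH}"
            ),
        ),
        context_manager_configuration=ContextManagerConfiguration(
            env_file_write_mode=EnvFileWriteMode.ALWAYS,
        ),
    )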
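
The core of the env-file logic in generate_batch_job_builder is the split between variables the command references (these must stay inline so the shell can expand them before the env file is sourced) and everything else (which can be moved into the env file). A standalone sketch of that split, using the same regex as the patch; the helper name is illustrative:

    import re
    from typing import Dict, List, Tuple

    def split_environment(
        environment: Dict[str, str], args: List[str]
    ) -> Tuple[Dict[str, str], Dict[str, str]]:
        """Return (required, writable): variables referenced in args stay inline;
        the rest can be written to an env file and sourced at runtime."""
        writable = environment.copy()
        required: Dict[str, str] = {}
        for arg in args:
            # Matches both $NAME and ${NAME} style references.
            for match in re.findall(r"\$\{?([\w]+)\}?", arg):
                if match in writable:
                    required[match] = writable.pop(match)
        return required, writable

    required, writable = split_environment(
        {"ENV_BASE": "dev-example", "BIG_BLOB": "x" * 5000},
        ["cmd", "${ENV_BASE}"],
    )
    assert required == {"ENV_BASE": "dev-example"}
    assert "BIG_BLOB" in writable  # would be sourced via ". ${_ENVIRONMENT_FILE}"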
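
Finally, a usage sketch for the new configuration surface on the context manager, following the from_demand_execution signature as of patch 3. The EnvBase value is illustrative and the DemandExecution is assumed to already exist:

    from aibs_informatics_core.env import EnvBase

    from aibs_informatics_aws_lambda.handlers.demand.context_manager import (
        DemandExecutionContextManager,
    )
    from aibs_informatics_aws_lambda.handlers.demand.model import (
        ContextManagerConfiguration,
        EnvFileWriteMode,
    )

    # `demand_execution`: an existing DemandExecution (construction omitted here).
    config = ContextManagerConfiguration(
        isolate_inputs=True,  # inputs land under <container_scratch_path>/<execution_id>/
        env_file_write_mode=EnvFileWriteMode.IF_REQUIRED,  # write .demand.env only near the 8192-char limit
        force=False,      # forwarded to the pre/post-execution PrepareBatchDataSyncRequests
        size_only=True,   # forwarded to the pre/post-execution PrepareBatchDataSyncRequests
    )
    decm = DemandExecutionContextManager.from_demand_execution(
        demand_execution,
        EnvBase("dev-example"),
        configuration=config,
    )
    batch_job_builder = decm.batch_job_builder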