From 997ca3bc25b954da5fbd834665a1035043e9a05d Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 5 Jul 2024 13:43:52 -0400 Subject: [PATCH 01/21] Update DataRequirement with needs_data_local attr. --- python/lib/core/dmod/core/meta_data.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/lib/core/dmod/core/meta_data.py b/python/lib/core/dmod/core/meta_data.py index d67552d26..16a9151aa 100644 --- a/python/lib/core/dmod/core/meta_data.py +++ b/python/lib/core/dmod/core/meta_data.py @@ -1406,8 +1406,14 @@ class DataRequirement(Serializable): """ category: DataCategory domain: DataDomain + # TODO: (later/future) define another type ("FulfillmentDetails" maybe) tracking all three of these; make that + # entire object optional here, but the attributes within it non-optional (though confirm this doesn't break things) fulfilled_access_at: Optional[str] = Field(description="The location at which the fulfilling dataset for this requirement is accessible, if the dataset known.") fulfilled_by: Optional[str] = Field(description="The name of the dataset that will fulfill this, if it is known.") + needs_data_local: Optional[bool] = Field(None, description="Whether this requirement will be fulfilled in a way " + "that requires the data to be copied or extracted " + "locally (or to a local volume) for the worker to use " + "it.") is_input: bool = Field(description="Whether this represents required input data, as opposed to a requirement for storing output data.") size: Optional[int] From 4e3e03f1928f6ad5db4f36d50c7656b7bd08abcd Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 5 Jul 2024 13:48:09 -0400 Subject: [PATCH 02/21] Setting needs_data_local when fulfilled_by is set. Updating usages of DataRequirement so that whenever the fulfilled_by attribute of an instance is set - creation time or otherwise - the new needs_data_local is also set. 
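As a minimal illustrative sketch, the invariant being applied is shown below; the `decide_needs_local` parameter is only a stand-in for whatever the caller actually uses (e.g., `DatasetManagerCollection.would_requirement_need_local_data`, or a hard-coded value, as in the hunks that follow):

    # Sketch only: record needs_data_local before (or along with) fulfilled_by, so a
    # requirement is never marked as fulfilled without that flag also being populated.
    def apply_fulfillment(requirement: DataRequirement, dataset: Dataset, decide_needs_local) -> None:
        requirement.needs_data_local = decide_needs_local(dataset)
        requirement.fulfilled_by = dataset.name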
--- .../dmod/test/scheduler_test_utils.py | 2 +- python/lib/scheduler/dmod/test/test_job.py | 2 ++ .../dmod/dataservice/data_derive_util.py | 12 ++++++--- .../dataservice/dataset_manager_collection.py | 27 +++++++++++++++++++ .../dataservice/dmod/dataservice/service.py | 17 +++++++++--- .../dmod/partitionerservice/service.py | 10 ++++++- 6 files changed, 61 insertions(+), 9 deletions(-) diff --git a/python/lib/scheduler/dmod/test/scheduler_test_utils.py b/python/lib/scheduler/dmod/test/scheduler_test_utils.py index 091bba9eb..84ab31ebe 100644 --- a/python/lib/scheduler/dmod/test/scheduler_test_utils.py +++ b/python/lib/scheduler/dmod/test/scheduler_test_utils.py @@ -86,7 +86,7 @@ def mock_job(model: str = 'nwm', cpus: int = 4, mem: int = 500000, strategy: str data_domain = DataDomain(data_format=DataFormat.NGEN_CSV_OUTPUT, discrete_restrictions=[DiscreteRestriction(variable='id', values=[])]) output_requirement = DataRequirement(domain=data_domain, is_input=False, category=DataCategory.OUTPUT, - fulfilled_by=dataset_name) + fulfilled_by=dataset_name, needs_data_local=False) else: raise(ValueError("Unsupported mock model {}".format(model))) diff --git a/python/lib/scheduler/dmod/test/test_job.py b/python/lib/scheduler/dmod/test/test_job.py index 698f01507..613a242e1 100644 --- a/python/lib/scheduler/dmod/test/test_job.py +++ b/python/lib/scheduler/dmod/test/test_job.py @@ -200,7 +200,9 @@ def test_factory_init_from_deserialized_json_2_b(self): index_val = 0 for req in base_job.data_requirements: + req.fulfilled_access_at = 'imaginary-dataset-{}'.format(index_val) req.fulfilled_by = 'imaginary-dataset-{}'.format(index_val) + req.needs_data_local = False index_val += 1 for f in [req.fulfilled_by for req in base_job.data_requirements]: diff --git a/python/services/dataservice/dmod/dataservice/data_derive_util.py b/python/services/dataservice/dmod/dataservice/data_derive_util.py index 094284b86..437992f33 100644 --- a/python/services/dataservice/dmod/dataservice/data_derive_util.py +++ b/python/services/dataservice/dmod/dataservice/data_derive_util.py @@ -125,11 +125,12 @@ def __init__(self, dataset_manager_collection: DatasetManagerCollection, noah_ow def _apply_dataset_to_requirement(self, dataset: Dataset, requirement: DataRequirement, job: Job): """ - Set ::attribute:`DataRequirement.fulfilled_access_at` and ::attribute:`DataRequirement.fulfilled_by`. + Set attributes that indicate details of how this dataset fulfills this requirement. - Update the provided requirement's ::attribute:`DataRequirement.fulfilled_access_at` and - ::attribute:`DataRequirement.fulfilled_by` attributes to associate the requirement with the provided dataset. - The dataset is assume to have already been determined as satisfactory to fulfill the given requirement. + Update the provided requirement's ::attribute:`DataRequirement.fulfilled_access_at`, + ::attribute:`DataRequirement.fulfilled_by`, and ::attribute:`DataRequirement.needs_data_local` attributes to + associate the requirement with the provided dataset. The dataset is assumed to have already been determined as + satisfactory to fulfill the given requirement. 
Parameters ---------- @@ -152,6 +153,9 @@ def _apply_dataset_to_requirement(self, dataset: Dataset, requirement: DataRequi ################################################################################# requirement.fulfilled_access_at = self._determine_access_location(dataset, job) ################################################################################# + + # Identify datasets that need data locally for job exec, and set needs_data_local to True or False + requirement.needs_data_local = self._managers.would_requirement_need_local_data(dataset) requirement.fulfilled_by = dataset.name def _build_bmi_auto_generator(self, diff --git a/python/services/dataservice/dmod/dataservice/dataset_manager_collection.py b/python/services/dataservice/dmod/dataservice/dataset_manager_collection.py index 6e7c6c35b..782f92627 100644 --- a/python/services/dataservice/dmod/dataservice/dataset_manager_collection.py +++ b/python/services/dataservice/dmod/dataservice/dataset_manager_collection.py @@ -17,6 +17,33 @@ class DatasetManagerCollection: default_factory=dict, init=False ) + @classmethod + def would_requirement_need_local_data(cls, dataset: Dataset) -> bool: + """ + Check whether a ::class:`DataRequirement` fulfilled by this dataset needs local data for job execution. + + Check whether a hypothetical ::class:`DataRequirement` fulfilled by this dataset needs data cached locally for + execution of the requirement's parent job. + + Parameters + ---------- + dataset + The dataset fulfilling a hypothetical data requirement for a job. + + Returns + ------- + bool + Whether a ::class:`DataRequirement` fulfilled by this dataset needs data cached locally for job execution. + """ + # Always require local data when something is archived + if dataset.is_data_archived: + return True + # Also require local data for anything in the object store + elif dataset.dataset_type == DatasetType.OBJECT_STORE: + return True + else: + return False + def __hash__(self) -> int: return id(self) diff --git a/python/services/dataservice/dmod/dataservice/service.py b/python/services/dataservice/dmod/dataservice/service.py index 08a68a5e0..1a2ad4bfa 100644 --- a/python/services/dataservice/dmod/dataservice/service.py +++ b/python/services/dataservice/dmod/dataservice/service.py @@ -790,6 +790,12 @@ def _create_output_datasets(self, job: Job): dataset_type = DatasetType.OBJECT_STORE mgr = self._managers.manager(dataset_type) + # If possible, decide whether the output will need to be locally written + if dataset_type == DatasetType.OBJECT_STORE: + write_output_locally_first = True + else: + write_output_locally_first = None + data_format = job.model_request.output_formats[i] # If we are writing to an object store, lots of CSV files will kill us, so switch to archived variant if dataset_type == DatasetType.OBJECT_STORE and data_format == DataFormat.NGEN_CSV_OUTPUT: @@ -803,7 +809,8 @@ def _create_output_datasets(self, job: Job): discrete_restrictions=[id_restrict])) # TODO: (later) in the future, whether the job is running via Docker needs to be checked # TODO: also, whatever is done here needs to align with what is done within perform_checks_for_job, when - # setting the fulfilled_access_at for the DataRequirement + # setting the fulfilled_access_at for the DataRequirement (and with _determine_access_location functions + # in python/services/dataservice/dmod/dataservice/data_derive_util.py ) is_job_run_in_docker = True if is_job_run_in_docker: output_access_at = dataset.docker_mount @@ -812,7 +819,8 @@ def 
_create_output_datasets(self, job: Job): raise DmodRuntimeError(msg.format(dataset.__class__.__name__, job.job_id)) # Create a data requirement for the job, fulfilled by the new dataset requirement = DataRequirement(domain=dataset.data_domain, is_input=False, category=DataCategory.OUTPUT, - fulfilled_by=dataset.name, fulfilled_access_at=output_access_at) + fulfilled_by=dataset.name, fulfilled_access_at=output_access_at, + needs_data_local=write_output_locally_first) job.data_requirements.append(requirement) async def perform_checks_for_job(self, job: Job) -> bool: @@ -861,13 +869,16 @@ async def perform_checks_for_job(self, job: Job) -> bool: job_ds_user.link_to_dataset(dataset=dataset) # TODO: (later) in the future, whether the job is running via Docker needs to be checked # TODO: also, whatever is done here needs to align with what is done within _create_output_dataset, - # when creating the output data DataRequirement + # when creating the output data DataRequirement, and with _determine_access_location function + # in python/services/dataservice/dmod/dataservice/data_derive_util.py is_job_run_in_docker = True if is_job_run_in_docker: requirement.fulfilled_access_at = dataset.docker_mount else: msg = "Could not determine proper access location for dataset of type {} by non-Docker job {}." raise DmodRuntimeError(msg.format(dataset.__class__.__name__, job.job_id)) + # Identify datasets that need data locally for job exec, and set needs_data_local to True or False + requirement.needs_data_local = DatasetManagerCollection.would_requirement_need_local_data(dataset) requirement.fulfilled_by = dataset.name return True except Exception as e: diff --git a/python/services/partitionerservice/dmod/partitionerservice/service.py b/python/services/partitionerservice/dmod/partitionerservice/service.py index 3d4435997..4dc465c92 100644 --- a/python/services/partitionerservice/dmod/partitionerservice/service.py +++ b/python/services/partitionerservice/dmod/partitionerservice/service.py @@ -327,7 +327,11 @@ async def _find_partition_dataset(self, job: Job) -> DatasetManagementResponse: if response.success: logging.info("Existing partition dataset for {} found: {}".format(job.job_id, response.dataset_name)) for r in reqs: + # TODO: (later) fix this not being set + #r.fulfilled_access_at = r.fulfilled_by = response.dataset_name + # For the moment at least, we shouldn't need to worry about IO speed and copying the config locally + r.needs_data_local = False else: logging.info("No existing partition dataset for {} was found: ".format(job.job_id)) return response @@ -415,8 +419,12 @@ async def _generate_partition_config_dataset(self, job: Job) -> bool: # If good, save the partition dataset data_id as a data requirement for the job. 
data_id_restrict = DiscreteRestriction(variable=StandardDatasetIndex.DATA_ID, values=[part_dataset_data_id]) domain = DataDomain(data_format=DataFormat.NGEN_PARTITION_CONFIG, discrete_restrictions=[data_id_restrict]) + # TODO: (later) fix this not being set properly + #fulfilled_access_at = + fulfilled_access_at = None requirement = DataRequirement(domain=domain, is_input=True, category=DataCategory.CONFIG, - fulfilled_by=part_dataset_name) + fulfilled_by=part_dataset_name, fulfilled_access_at=fulfilled_access_at, + needs_data_local=False) job.data_requirements.append(requirement) else: logging.error("Partition config dataset generation for {} failed".format(job.job_id)) From 36253850cb4cbbc98ea3e22820aec9580aa5ca70 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 5 Jul 2024 14:01:00 -0400 Subject: [PATCH 03/21] Update, document ngen image custom dir structure. Add 2 new directories - /dmod/local_volumes and /dmod/cluster_volumes - to ngen image directory structure, meant for mount points of different types of volumes containing necessary data for the job; also, adding README with some initial documentation on this directory structure. --- docker/main/ngen/Dockerfile | 6 ++- docker/main/ngen/README.md | 100 ++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 2 deletions(-) create mode 100644 docker/main/ngen/README.md diff --git a/docker/main/ngen/Dockerfile b/docker/main/ngen/Dockerfile index b100f6d40..fbc372433 100644 --- a/docker/main/ngen/Dockerfile +++ b/docker/main/ngen/Dockerfile @@ -133,6 +133,8 @@ RUN dnf update -y \ && mkdir -p ${SSHDIR} \ # Create DMOD-specific directory structure \ && mkdir -p /dmod && chown ${USER} /dmod \ + && mkdir -p /dmod/local_volumes && chown ${USER} /dmod/local_volumes \ + && mkdir -p /dmod/cluster_volumes && chown ${USER} /dmod/cluster_volumes \ && mkdir -p /dmod/datasets && chown ${USER} /dmod/datasets \ #&& mkdir -p /dmod/shared_libs && chown ${USER} /dmod/shared_libs \ && mkdir -p /dmod/lib && chown ${USER} /dmod/lib \ @@ -959,9 +961,9 @@ COPY --chown=${USER} --chmod=744 py_funcs.py /dmod/bin/py_funcs ENV HYDRA_PROXY_RETRY_COUNT=5 -# Change permissions for entrypoint and make sure dataset volume mount parent directories exists +# Change permissions for entrypoint and make sure data volume mount and dataset symlink parent directories exists RUN chmod +x ${WORKDIR}/entrypoint.sh \ - && for d in ${DATASET_DIRECTORIES}; do mkdir -p /dmod/datasets/${d}; done \ + && for p in datasets local_volumes cluster_volumes; do for d in ${DATASET_DIRECTORIES}; do mkdir -p /dmod/${p}/${d}; done; done \ && pushd /dmod/bin \ # NOTE use of `ln -sf`. \ && ( ( stat ngen-parallel && ln -sf ngen-parallel ngen ) || ( stat ngen-serial && ln -sf ngen-serial ngen ) ) \ diff --git a/docker/main/ngen/README.md b/docker/main/ngen/README.md new file mode 100644 index 000000000..12ec8b27b --- /dev/null +++ b/docker/main/ngen/README.md @@ -0,0 +1,100 @@ +# About + +TODO:More details about the image. + +TODO:Once implemented, details about available image variants. + +TODO:Once implemented, details about extending the image with additional modules. 
+ +# Image `/dmod/` Directory + +## TL;DR - Directories for Configs + +Several details are important if manually constructing configurations to upload into DMOD datasets: +* Compiled BMI module libraries will be in `/dmod/shared_libs/` +* Data used during job execution (e.g., forcings, configs) will be (or at least appear) under `/dmod/datasets/` + * Paths descend first by data type, then by dataset name + * E.g., `/dmod/datasets/config/vpu-01-bmi-config-dataset-01/NoahOWP_cat-9999.namelist` + * E.g., `/dmod/datasets/forcing/vpu-01-forcings-dataset-01/cat-1000.csv` + +## Directory Structure Details + +Several directories under `/dmod/` in the worker images' file system are important to the operation of job worker containers. + +## Static Directories + +Several important DMOD-specific directories in the worker images are static. They contain things either created or copied in during the image build. These are standard/general things, available in advance, that need to be in fixed, well-defined locations. Examples are the ngen executables, Python packages, and compiled BMI modules. + +### `/dmod/bin/` +* Directory containing custom executables and scripts +* Location for parallel and serial ngen executables, plus the `ngen` symlink that points to one of them (dependent on build config, but parallel by default) +* Path appended to `PATH` in environment + +### `/dmod/bmi_module_data/` +* Directory containing any necessary generic per-BMI-module data files + * `/dmod/bmi_module_data/noah_owp/parameters/` + * Nested directory containing parameter files for Noah-OWP-Modular + * `/dmod/bmi_module_data/lgar-c/data/data/` + * Nested directory for generic LGAR data files + +### `/dmod/shared_libs/` +* Directory containing libraries built during intermediate steps of the image building process that will be needed for worker container execution +* E.g., compiled BMI module shared libraries, like `libcfebmi.so` for CFE +* Path is appended to `LD_LIBRARY_PATH` + +### `/dmod/venv/` +* Directory for Python virtual environment loaded by worker for execution + +## Dynamic Directories + +Other important DMOD-specific directories in the worker images are dynamic. There is a higher-level baseline directory structure that is created by the image build, but the nested contents, which are what is most important to job execution, are put into place when the job worker container is created. Examples of this are configs and forcings.
+ +### `/dmod/datasets/` +* This contains the paths from which jobs should read their necessary data, and which config files should reference +* Contains subdirectories for different dataset types + * `config`, `forcing`, `hydrofabric`, `observation`, `output` (e.g., `/dmod/datasets/forcing/`) +* Each subdirectory may contain further "subdirectories" (really symlinks) containing different data needed for the current job + * E.g., `/dmod/datasets/config/vpu-01-bmi-config-dataset-01/` + * Has data from `vpu-01-bmi-config-dataset-01` dataset +* Data subdirectories are actually symlinks to an analogous mounted path for either a [cluster volume](#dmodcluster_volumes) or [local volume](#dmodlocal_volumes) + * If a dataset can be mounted as a cluster volume and used directly by the job without local copying, the symlink will be to an analogous cluster volume + * e.g., `/dmod/datasets/config/real-cfg-ds-01/ -> /dmod/cluster_volumes/config/real-cfg-ds-01/` + * If data from a dataset needs to be local or preprocessed in some way by the worker before use, it will be prepared in a local volume, and a symlink here will point to that + * e.g., `/dmod/datasets/config/vpu-01-bmi-config-dataset-01/ -> /dmod/local_volumes/config/vpu-01-bmi-config-dataset-01/` + + +### `/dmod/cluster_volumes/` +* First-level subdirectories correspond to DMOD dataset types, as with `/dmod/datasets/` + * e.g., `/dmod/cluster_volumes/config/` +* Second-level subdirectories are mounted Docker cluster volumes that are, in some way or another, synced across all physical nodes of the deployment + * e.g., `/dmod/cluster_volumes/config/vpu-01-bmi-config-dataset-01/` +* Automatic synchronization at the DMOD deployment level + * All workers for a job see exactly the same set of mounted volumes here + * All workers on all physical nodes see the same contents in each mounted volume directory + * Writes done on any worker to a file under a volume subdirectory are seen (essentially) immediately by workers on **all** physical nodes +* The common scenario (and typical case at the time of this writing) for these is DMOD dataset volumes + * A dataset is directly mounted as a Docker volume via some specialized storage driver (e.g., `s3fs`) + * The contents of the dataset can then be interacted with by the worker as if they were a regular file system (even if they aren't) + * The mount directory name matches the DMOD dataset name + * E.g., the `vpu-01-bmi-config-dataset-01` dataset would be at `/dmod/cluster_volumes/config/vpu-01-bmi-config-dataset-01/` + +*** TODO: set up to link or do pre-processing/extraction/etc.
as needed on startup *** +*** TODO: have indication that be that there exists a directory already for the dataset under `/dmod/datasets/` +TODO: have this be for local Docker volumes that are just on individual hosts that we want to keep synced + +### `/dmod/local_volumes/` +* First level subdirectories correspond to DMOD dataset types, as with `/dmod/datasets/` + * e.g., `/dmod/local_volumes/config/` +* Second-level subdirectories are mounted Docker local volumes + * e.g., `/dmod/local_volumes/config/vpu-01-bmi-config-dataset-01/` +* These local volumes are job-wide but host-specific + * All workers for a job see exactly the same set of mounted volumes here + * All workers on the same physical nodes are using the same local volume on that physical host + * Workers on different physical nodes are using different volumes + * This means some coordinated synchronization needs to be performed by a subset of worker +* The use case is for any data that needs to be local on the container/host (generally for performance reasons) to be copied/extracted to a subdirectory here, typically from an analogous subdirectory under `/dmod/cluster_volumes/` + * This at least reduces copying somewhat by allowing workers on same node to share the same local volume + +TODO: make scheduler create these per dataset on the fly at job time and mount them in (and have something to clean them up, but maybe not right away) +TODO: make one worker per physical host extract data when needed from archives in analogous cluster volumes + \ No newline at end of file From 152e8d3f91a489e89b74d61e45fdb409650b7968 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 5 Jul 2024 14:04:07 -0400 Subject: [PATCH 04/21] Update Launcher for using local data volumes. Updating Launcher to prepare services with local volume mounts when some data requirements must be fulfilled by local data on the physical node, and to update the relevant other args for starting worker services so that one worker on each node makes sure data gets prepared in local volumes as needed as part of job startup. --- .../lib/scheduler/dmod/scheduler/scheduler.py | 55 ++++++++++++++++--- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/python/lib/scheduler/dmod/scheduler/scheduler.py b/python/lib/scheduler/dmod/scheduler/scheduler.py index a1a2b2ed2..f0301dbea 100644 --- a/python/lib/scheduler/dmod/scheduler/scheduler.py +++ b/python/lib/scheduler/dmod/scheduler/scheduler.py @@ -364,7 +364,7 @@ def _ds_names_helper(cls, job: 'Job', worker_index: int, category: DataCategory, return list(dataset_names) # TODO (later): once we get to dynamic/custom images (i.e., for arbitrary BMI modules), make sure this still works - def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]: + def _generate_docker_cmd_args(self, job: 'Job', worker_index: int, primary_workers: Dict[str, int]) -> List[str]: """ Create the Docker "CMD" arguments list to be used to start all services that will perform this job. @@ -379,6 +379,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]: The job to have worker Docker services started, with those services needing "CMD" arguments generated. worker_index : int The particular worker service index in question, which will have a specific set of data requirements. + primary_workers : Dict[str, int] + Mapping of Docker node hostname to the primary job worker index on that node, for all nodes used by the job. 
Returns ------- @@ -411,7 +413,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]: } if isinstance(job.model_request, AbstractNgenRequest): - docker_cmd_arg_map.update(self._generate_nextgen_job_docker_cmd_args(job, worker_index)) + docker_cmd_arg_map.update(self._generate_nextgen_job_docker_cmd_args(job=job, worker_index=worker_index, + primary_workers=primary_workers)) # Finally, convert the args map to a list, with each "flag"/key immediately preceding its value args_as_list = [] @@ -421,7 +424,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]: return args_as_list - def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -> Dict[str, str]: + def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int, + primary_workers: Dict[str, int]) -> Dict[str, str]: """ Prepare the specific Docker CMD arg applicable to Nextgen-based jobs @@ -436,6 +440,8 @@ def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) - The job to have worker Docker services started, with those services needing "CMD" arguments generated. worker_index : int The particular worker service index in question, which will have a specific set of data requirements. + primary_workers : Dict[str, int] + Mapping of Docker node hostname to the primary job worker index on that node, for all nodes used by the job. Returns ------- @@ -456,6 +462,7 @@ def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) - "--hydrofabric-dataset": self._ds_names_helper(job, worker_index, DataCategory.HYDROFABRIC, max_count=1)[0], "--config-dataset": self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1, data_format=DataFormat.NGEN_JOB_COMPOSITE_CONFIG)[0], + "--primary-workers": ",".join(str(v) for _, v in primary_workers.items()) } if job.cpu_count > 1: @@ -714,14 +721,43 @@ def start_job(self, job: 'Job') -> Tuple[bool, tuple]: secrets = [self.get_secret_reference(secret_name) for secret_name in ['object_store_exec_user_name', 'object_store_exec_user_passwd']] + # Decide (somewhat trivially) which worker per-host will be considered "primary" + # In particular, this node will handle processing any data that needs to be local on the host + # Also, always make worker 0 primary for its host + per_node_primary_workers = {job.allocations[0].hostname: 0} + for alloc_index in range(1, num_allocations): + if (hostname := job.allocations[alloc_index].hostname) not in per_node_primary_workers: + per_node_primary_workers[hostname] = alloc_index + for alloc_index in range(num_allocations): alloc = job.allocations[alloc_index] constraints_str = f"node.hostname == {alloc.hostname}" constraints = list(constraints_str.split("/")) - pattern = '{}:/dmod/datasets/{}/{}:rw' - mounts = [pattern.format(r.fulfilled_access_at, r.category.name.lower(), r.fulfilled_by) for r in - job.worker_data_requirements[alloc_index] if r.fulfilled_access_at is not None] + mounts = [] + for req in job.worker_data_requirements[alloc_index]: + # TODO: (later) not sure, but this seems like it may be problematic condition that requires exception + if req.fulfilled_access_at is None: + logging.warning(f"{req.__class__.__name__} fulfilled by '{req.fulfilled_by}' for '{job.job_id!s}' " + f"has no value for `fulfilled_access_at`; skipping this {req.__class__.__name__} " + f"when assembling Docker service mounts.") + continue + cluster_volume = req.fulfilled_access_at + category_subdir = req.category.name.lower() + 
dataset_name = req.fulfilled_by + mounts.append(f"{cluster_volume}:/dmod/cluster_volumes/{category_subdir}/{dataset_name}") + + # Allow this to not have been set (for now at least), but log a warning + if req.needs_data_local is None: + logging.warning(f"{req.__class__.__name__} fulfilled by '{req.fulfilled_by}' for '{job.job_id!s}' " + f"does not indicate explicitly whether the required data needs to be available " + f"locally during job execution.") + # For requirements that need local data, mount a local Docker volume for them + elif req.needs_data_local: + # TODO: (later/future) make sure something has checked to see that space is available on the nodes + local_volume = f"{dataset_name}_local_vol" + mounts.append(f"{local_volume}:/dmod/local_volumes/{category_subdir}/{dataset_name}") + #mounts.append('/local/model_as_a_service/docker_host_volumes/forcing_local:/dmod/datasets/forcing_local:rw') # Introduce a way to inject data access directly via env config, to potentially bypass things for testing bind_mount_from_env = getenv('DMOD_JOB_WORKER_HOST_MOUNT') @@ -748,9 +784,10 @@ def start_job(self, job: 'Job') -> Tuple[bool, tuple]: service_params.capabilities_to_add = ['SYS_ADMIN'] #TODO check for proper service creation, return False if doesn't work - service = self.create_service(serviceParams=service_params, idx=alloc_index, - docker_cmd_args=self._generate_docker_cmd_args(job, alloc_index)) - + cmd_args = self._generate_docker_cmd_args(job=job, worker_index=alloc_index, + primary_workers=per_node_primary_workers) + service = self.create_service(serviceParams=service_params, idx=alloc_index, docker_cmd_args=cmd_args) + service_per_allocation.append(service) logging.info("\n") From 1aabb622ccbf0420d85bc83d80e43caa1e64e685 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 12 Jul 2024 09:33:29 -0400 Subject: [PATCH 05/21] Update ngen-related images to have mc client. Making MinIO CLI client available within ngen worker image and derivatives (e.g., calibration worker), though without a pre-configured alias for connected to the object store service. --- docker/main/ngen/Dockerfile | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/docker/main/ngen/Dockerfile b/docker/main/ngen/Dockerfile index fbc372433..2d9064b59 100644 --- a/docker/main/ngen/Dockerfile +++ b/docker/main/ngen/Dockerfile @@ -945,8 +945,16 @@ COPY --chown=${USER} --from=build_bmi_snow_17 /dmod/ /dmod/ COPY --chown=${USER} --from=build_customizations /dmod/ /dmod/ USER root -# Update path and make sure dataset directory is there -RUN echo "export PATH=${PATH}" >> /etc/profile \ +# TODO: (later) consider something like this in the future (at least optionally) when downloading the mc client below: +# ARG MINIO_CLIENT_RELEASE="RELEASE.2024-07-08T20-59-24Z" +# ... 
+# https://dl.min.io/client/mc/release/linux-amd64/archive/mc.${MINIO_CLIENT_RELEASE} + +# Setup minio client; also update path and make sure dataset directory is there +RUN curl -L -o /dmod/bin/mc https://dl.min.io/client/mc/release/linux-amd64/mc \ + && chmod +x /dmod/bin/mc \ + && mkdir /dmod/.mc \ + && echo "export PATH=${PATH}" >> /etc/profile \ && sed -i "s/PasswordAuthentication yes/#PasswordAuthentication yes/g" /etc/ssh/sshd_config \ && sed -i "s/PermitRootLogin yes/PermitRootLogin no/g" /etc/ssh/sshd_config \ && sed -i "s/#ClientAliveInterval.*/ClientAliveInterval 60/" /etc/ssh/sshd_config \ From 8aff1c24cbdfe0a4edb7c77fd0612dc63ee7b9d5 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 12 Jul 2024 09:37:29 -0400 Subject: [PATCH 06/21] Update worker Python functions to make data local. Adding functionality to py_funcs.py to support making DMOD dataset data local (not just be locally accessible from remote storage). --- docker/main/ngen/py_funcs.py | 200 ++++++++++++++++++++++++++++++++++- 1 file changed, 198 insertions(+), 2 deletions(-) diff --git a/docker/main/ngen/py_funcs.py b/docker/main/ngen/py_funcs.py index 676d7fbc0..82a995682 100644 --- a/docker/main/ngen/py_funcs.py +++ b/docker/main/ngen/py_funcs.py @@ -1,16 +1,23 @@ #!/usr/bin/env python3 import argparse +import json import logging import os import shutil import tarfile +import concurrent.futures +import subprocess from datetime import datetime from enum import Enum from pathlib import Path from subprocess import Popen -from typing import Dict, List, Literal, Optional +from typing import Dict, List, Literal, Optional, Set, Tuple + + +def get_dmod_date_str_pattern() -> str: + return '%Y-%m-%d,%H:%M:%S' class ArchiveStrategy(Enum): @@ -56,6 +63,15 @@ def _subparse_move_to_directory(parent_subparser_container): sub_cmd_parser.add_argument("dest_dir", type=Path, help="Destination directory to which to move the output") +def _parse_for_make_data_local(parent_subparsers_container): + # A parser for the 'tar_and_copy' param itself, underneath the parent 'command' subparsers container + desc = "If a primary worker, copy/extract to make dataset data locally available on physical node" + helper_cmd_parser = parent_subparsers_container.add_parser('make_data_local', description=desc) + helper_cmd_parser.add_argument('worker_index', type=int, help='The index of this particular worker.') + helper_cmd_parser.add_argument('primary_workers', type=lambda s: {int(i) for i in s.split(',')}, + help='Comma-delimited string of primary worker indices.') + + def _parse_for_move_job_output(parent_subparsers_container): # A parser for the 'tar_and_copy' param itself, underneath the parent 'command' subparsers container desc = "Move output data files produced by a job to another location, typically to put them into a DMOD dataset." @@ -92,10 +108,91 @@ def _parse_args() -> argparse.Namespace: _parse_for_tar_and_copy(parent_subparsers_container=subparsers) _parse_for_gather_output(parent_subparsers_container=subparsers) _parse_for_move_job_output(parent_subparsers_container=subparsers) + _parse_for_make_data_local(parent_subparsers_container=subparsers) return parser.parse_args() +def _get_serial_dataset_dict(serialized_ds_file: Path) -> dict: + with serialized_ds_file.open() as s_file: + return json.loads(s_file.read()) + + +def _make_dataset_dir_local(local_data_dir: Path, do_optimized_object_store_copy: bool): + """ + Make the data in corresponding remotely-backed dataset directory local by placing in a local directory. 
+ + Make a local, optimized copy of data from a dataset, where the data is also locally accessible/mounted but actually + stored elsewhere, making it less optimal for use by the worker. + + Function examines the serialized dataset file of the source directory and, if already present (indicating this dest + directory has been set up before), the analogous serialized dataset file in the destination directory. First, if + there is a `dest_dir` version of this file, and if it has the same ``last_updated`` value, the function considers the + `dest_dir` contents to already be in sync with the `src_dir` and simply returns. Second, it examines whether archiving + was used for the entire dataset, and then either extracts or copies data to the local volume as appropriate. + + Note that the function does alter the name of the serialized dataset file on the `dest_dir` side, primarily as an + indication that this is meant as a local copy of data, but not a full-fledged DMOD dataset. It also allows for a + final deliberate step of renaming (or copying with a different name) this file, which ensures the checked + ``last_updated`` value on the `dest_dir` side will not have been updated before a successful sync of the actual data + was completed. + + Parameters + ---------- + local_data_dir + Storage directory, locally available on this worker's host node, in which to copy/extract data. + do_optimized_object_store_copy + Whether to do an optimized copy for object store dataset data using the MinIO client (via a subprocess call). + """ + # TODO: (later) eventually source several details of this this from other part of the code + + dataset_vol_dir = get_cluster_volumes_root_directory().joinpath(local_data_dir.parent).joinpath(local_data_dir.name) + + local_serial_file = local_data_dir.joinpath(".ds_serial_state.json") + dataset_serial_file = dataset_vol_dir.joinpath(f"{dataset_vol_dir.name}_serialized.json") + + # Both should exist + if not dataset_vol_dir.is_dir(): + raise RuntimeError(f"Can't make data local from dataset mount path '{dataset_vol_dir!s}': not a directory") + elif not local_data_dir.is_dir(): + raise RuntimeError(f"Can't make data from '{dataset_vol_dir!s}' local: '{local_data_dir!s}' is not a directory") + # Also, dataset dir should not be empty + elif len([f for f in dataset_vol_dir.glob("*")]) == 0: + raise RuntimeError(f"Can't make data local from '{dataset_vol_dir!s}' local because it is empty") + + serial_ds_dict = _get_serial_dataset_dict(dataset_serial_file) + + # If dest_dir is not brand new and has something in it, check to make sure it isn't already as it needs to be + if local_serial_file.exists(): + prev_ds_dict = _get_serial_dataset_dict(local_serial_file) + current_last_updated = datetime.strptime(serial_ds_dict["last_updated"], get_dmod_date_str_pattern()) + prev_last_updated = datetime.strptime(prev_ds_dict["last_updated"], get_dmod_date_str_pattern()) + if prev_last_updated == current_last_updated: + logging.info(f"'{local_data_dir!s}' already shows most recent 'last_updated'; skipping redundant copy") + return + + # Determine if need to extract + if serial_ds_dict.get("data_archiving", True): + # Identify and extract archive + src_archive_file = [f for f in dataset_vol_dir.glob(f"{dataset_vol_dir.name}_archived*")][0] + archive_file = local_data_dir.joinpath(src_archive_file.name) + shutil.copy2(src_archive_file, archive_file) + shutil.unpack_archive(archive_file, local_data_dir) + archive_file.unlink(missing_ok=True) + # Also manually copy serialized state file (do last) +
shutil.copy2(dataset_serial_file, local_serial_file) + # Need to optimize by using minio client directly here when dealing with OBJECT_STORE dataset, or will take 10x time + # TODO: (later) this is a bit of a hack, though a necessary one; find a way to integrate more elegantly + elif do_optimized_object_store_copy and serial_ds_dict["type"] == "OBJECT_STORE": + subprocess.run(["mc", "cp", "--config-dir", "/dmod/.mc", "-r", f"minio/{local_data_dir.name}/", f"{local_data_dir}/."]) + else: + # Otherwise copy contents + shutil.copy2(dataset_vol_dir, local_data_dir) + # Rename the copied serialized state file in the copy as needed + # But do this last to confirm directory contents are never more up-to-date with last_updated than expected + local_data_dir.joinpath(dataset_serial_file.name).rename(local_serial_file) + + def _move_to_directory(source_dir: Path, dest_dir: Path, archive_name: Optional[str] = None): """ Move source data files from their initial directory to a different directory, potentially combining into an archive. @@ -124,6 +221,14 @@ def _move_to_directory(source_dir: Path, dest_dir: Path, archive_name: Optional[ shutil.move(p, dest_dir) +def _parse_docker_secret(secret_name: str) -> str: + return Path("/run/secrets", secret_name).read_text().strip() + + +def _parse_object_store_secrets() -> Tuple[str, str]: + return _parse_docker_secret('object_store_exec_user_name'), _parse_docker_secret('object_store_exec_user_passwd') + + def gather_output(mpi_host_names: List[str], output_write_dir: Path): """ Using subprocesses, gather output from remote MPI hosts and collect in the analogous directory on this host. @@ -151,6 +256,95 @@ def gather_output(mpi_host_names: List[str], output_write_dir: Path): f"{error_in_bytes.decode()}") +def get_cluster_volumes_root_directory() -> Path: + """ + Get the root directory for cluster volumes (i.e., backed by dataset directly, synced cluster-wide) on this worker. + + Returns + ------- + Path + The root directory for cluster volumes on this worker. + """ + return Path("/dmod/cluster_volumes") + + +def get_local_volumes_root_directory() -> Path: + """ + Get the root directory for local volumes (i.e., local to physical node, share by all node's workers) on this worker. + + Returns + ------- + Path + The root directory for local volumes on this worker. + """ + return Path("/dmod/local_volumes") + + +def make_data_local(worker_index: int, primary_workers: Set[int]): + """ + Make data local for each local volume mount that exists, but only if this worker is a primary. + + Copy or extract data from mounted volumes/directories directly backed by DMOD datasets (i.e., "cluster volumes") to + corresponding directories local to the physical node (i.e. "local volumes"), for any such local directories found to + exist. An important distinction is that a local volume is local to the physical node and not the worker itself, and + thus is shared by all workers on that node. As such, return immediately without performing any actions if this + worker is not considered a "primary" worker, so that only one worker per node manipulates data. + + Function (for a primary worker) iterates through the local volume subdirectory paths to see if any local volumes + were set up when the worker was created. For any that are found, the function ensures data from the corresponding, + dataset-backed cluster volume directory is replicated in the local volume directory. + + Parameters + ---------- + worker_index + This worker's index. 
+ primary_workers + Indices of designated primary workers + + See Also + -------- + _make_dataset_dir_local + """ + if worker_index not in primary_workers: + return + + cluster_vol_dir = get_cluster_volumes_root_directory() + local_vol_dir = get_local_volumes_root_directory() + expected_subdirs = {"config", "forcing", "hydrofabric", "observation", "output"} + + if not cluster_vol_dir.exists(): + raise RuntimeError(f"Can't make data local: cluster volume root '{cluster_vol_dir!s}' does not exist") + if not cluster_vol_dir.is_dir(): + raise RuntimeError(f"Can't make data local: cluster volume root '{cluster_vol_dir!s}' is not a directory") + if not local_vol_dir.exists(): + raise RuntimeError(f"Can't make data local: local volume root '{local_vol_dir!s}' does not exist") + if not local_vol_dir.is_dir(): + raise RuntimeError(f"Can't make data local: local volume root '{local_vol_dir!s}' is not a directory") + + try: + obj_store_access_key, obj_store_secret_key = _parse_object_store_secrets() + mc_ls_result = subprocess.run(["mc", "alias", "--config-dir", "/dmod/.mc", "ls", "minio"]) + if mc_ls_result.returncode != 0: + subprocess.run(["mc", "alias", "--config-dir", "/dmod/.mc", "set", "minio", obj_store_access_key, obj_store_secret_key]) + do_optimized_object_store_copy = True + except Exception as e: + logging.warning(f"Unable to parse secrets for optimized MinIO local data copying: {e!s}") + do_optimized_object_store_copy = False + + # Use some multi-threading here since this is IO heavy + futures = set() + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool: + for type_dir in (td for td in local_vol_dir.glob("*") if td.is_dir()): + if not cluster_vol_dir.joinpath(type_dir.name).is_dir(): + raise RuntimeError(f"Directory '{type_dir!s}' does not have analog in '{cluster_vol_dir!s}'") + if type_dir.name not in expected_subdirs: + logging.warning(f"Found unexpected (but matching) local volume data type subdirectory {type_dir.name}") + for local_ds_dir in (d for d in type_dir.glob("*") if d.is_dir()): + futures.add(pool.submit(_make_dataset_dir_local, local_ds_dir, do_optimized_object_store_copy)) + for future in futures: + future.result() + + def get_date_str() -> str: """ Get the current date and time as a string with format ``%Y-%m-%d,%H:%M:%S`` @@ -159,7 +353,7 @@ def get_date_str() -> str: ------- The current date and time as a string. """ - return datetime.now().strftime('%Y-%m-%d,%H:%M:%S') + return datetime.now().strftime(get_dmod_date_str_pattern()) def move_job_output(output_directory: Path, move_action: str, archiving: ArchiveStrategy = ArchiveStrategy.DYNAMIC, @@ -314,6 +508,8 @@ def main(): gather_output(mpi_host_names=[h for h in mpi_host_to_nproc_map], output_write_dir=args.output_write_dir) elif args.command == 'move_job_output': move_job_output(**(vars(args))) + elif args.command == 'make_data_local': + make_data_local(**(vars(args))) else: raise RuntimeError(f"Command arg '{args.command}' doesn't match a command supported by module's main function") From 13b61a503b798a1cc429ad68b7f420a1d8ce0ce7 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 12 Jul 2024 09:37:47 -0400 Subject: [PATCH 07/21] Update worker entrypoints for local data. Updating main entrypoint scripts for ngen and calibration worker images for local data handling. 
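As a concrete, purely hypothetical example of the resulting flow: for a job with workers 0-3 on one physical node and workers 4-7 on another, the scheduler passes `--primary-workers 0,4`; each worker's entrypoint exports that value as PRIMARY_WORKERS and runs `py_funcs make_data_local ${WORKER_INDEX:-0} ${PRIMARY_WORKERS:-0}`, and only workers 0 and 4 (one per node) actually copy or extract anything into the shared local volumes.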
--- docker/main/ngen/ngen_cal_entrypoint.sh | 8 ++++++++ docker/main/ngen/ngen_entrypoint.sh | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/docker/main/ngen/ngen_cal_entrypoint.sh b/docker/main/ngen/ngen_cal_entrypoint.sh index 2483f9855..1a843f27f 100755 --- a/docker/main/ngen/ngen_cal_entrypoint.sh +++ b/docker/main/ngen/ngen_cal_entrypoint.sh @@ -39,6 +39,10 @@ while [ ${#} -gt 0 ]; do declare -x CALIBRATION_CONFIG_BASENAME="${2:?}" shift ;; + --primary-workers) + declare -x PRIMARY_WORKERS="${2:?}" + shift + ;; esac shift done @@ -101,6 +105,10 @@ start_calibration() { return ${NGEN_RETURN} } +# Run make_data_local Python functions to make necessary data local +# Called for every worker, but Python code will make sure only one worker per node makes a call that has effect +py_funcs make_data_local ${WORKER_INDEX:-0} ${PRIMARY_WORKERS:-0} + # We can allow worker index to not be supplied when executing serially if [ "${WORKER_INDEX:-0}" = "0" ]; then if [ "$(whoami)" = "${MPI_USER:?MPI user not defined}" ]; then diff --git a/docker/main/ngen/ngen_entrypoint.sh b/docker/main/ngen/ngen_entrypoint.sh index 0278fd640..0e43f02af 100755 --- a/docker/main/ngen/ngen_entrypoint.sh +++ b/docker/main/ngen/ngen_entrypoint.sh @@ -35,6 +35,10 @@ while [ ${#} -gt 0 ]; do declare -x WORKER_INDEX="${2:?}" shift ;; + --primary-workers) + declare -x PRIMARY_WORKERS="${2:?}" + shift + ;; esac shift done @@ -60,6 +64,10 @@ if [ ! -e /dmod/datasets/linked_job_output ]; then ln -s ${JOB_OUTPUT_WRITE_DIR} /dmod/datasets/linked_job_output fi +# Run make_data_local Python functions to make necessary data local +# Called for every worker, but Python code will make sure only one worker per node makes a call that has effect +py_funcs make_data_local ${WORKER_INDEX:-0} ${PRIMARY_WORKERS:-0} + # We can allow worker index to not be supplied when executing serially if [ "${WORKER_INDEX:-0}" = "0" ]; then if [ "$(whoami)" = "${MPI_USER:?MPI user not defined}" ]; then From e163b618f16e594a3162615b3298a591459fe02d Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 15 Aug 2024 11:36:06 -0400 Subject: [PATCH 08/21] Fix fast dev update script GUI handling. Fixing script so that GUI services do not get stopped and updated unless that is actually asked for with the available CLI option. --- scripts/prep_fast_dev_update.sh | 34 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/scripts/prep_fast_dev_update.sh b/scripts/prep_fast_dev_update.sh index 79200e47c..c1cc8fb1c 100755 --- a/scripts/prep_fast_dev_update.sh +++ b/scripts/prep_fast_dev_update.sh @@ -139,19 +139,23 @@ while [ ${#} -gt 0 ]; do shift done +# Always stop GUI when GUI update flag is set, regardless of whether set for safe running or to deploy +if [ -n "${DO_GUI:-}" ]; then + if ${CONTROL_SCRIPT} ${GUI_STACK_NAME} check > /dev/null; then + ${CONTROL_SCRIPT} ${GUI_STACK_NAME} stop + STOPPED_GUI_FOR_REBUILD="true" + sleep 1 + fi +fi + # Make sure nothing is running if it doesn't need to be, bailing or stopping it as appropriate if [ -n "${RUN_SAFE:-}" ]; then if ${CONTROL_SCRIPT} ${PRIMARY_STACK_NAME} check > /dev/null; then >&2 echo "Error: option for safe mode active and found primary '${PRIMARY_STACK_NAME}' stack running; exiting." 
exit 1 fi +# If deploying, and stack is running, make sure to stop it elif [ -n "${DO_DEPLOY:-}" ]; then - if ${CONTROL_SCRIPT} ${GUI_STACK_NAME} check > /dev/null; then - ${CONTROL_SCRIPT} ${GUI_STACK_NAME} stop - STOPPED_GUI_FOR_REBUILD="true" - sleep 1 - fi - if ${CONTROL_SCRIPT} ${PRIMARY_STACK_NAME} check > /dev/null; then ${CONTROL_SCRIPT} ${PRIMARY_STACK_NAME} stop echo "Waiting for services to stop ..." @@ -159,15 +163,6 @@ elif [ -n "${DO_DEPLOY:-}" ]; then fi fi -if [ -n "${DO_GUI:-}" ]; then - if [ -z "${STOPPED_GUI_FOR_REBUILD:-}" ]; then - if ${CONTROL_SCRIPT} ${GUI_STACK_NAME} check > /dev/null; then - ${CONTROL_SCRIPT} ${GUI_STACK_NAME} stop - STOPPED_GUI_FOR_REBUILD="true" - fi - fi -fi - # Prepare a Docker volume for the dmod Python packages, removing any existing # Do in background so other things can be done try_clean_volume & @@ -178,16 +173,17 @@ if [ -n "${JUST_REMOVE_VOLUME:-}" ]; then exit fi -# Build updated py-sources image; if requested, build everything, but by default, just build the last image +# Build and push updated py-sources image; if requested, build everything, but by default, just build the last image if [ -n "${DO_FULL_BUILD:-}" ]; then - ${CONTROL_SCRIPT} ${PY_PACKAGES_STACK_NAME} build + ${CONTROL_SCRIPT} ${PY_PACKAGES_STACK_NAME} build push else ${CONTROL_SCRIPT} --build-args "${PY_PACKAGES_LAST_SERVICE_NAME}" ${PY_PACKAGES_STACK_NAME} build + ${CONTROL_SCRIPT} ${PY_PACKAGES_STACK_NAME} push fi -if [ -n "${STOPPED_GUI_FOR_REBUILD:-}${DO_GUI:-}" ]; then +if [ -n "${DO_GUI:-}" ]; then echo "Rebuilding nwm_gui stack app service image" - ${CONTROL_SCRIPT} ${GUI_STACK_NAME} build & + ${CONTROL_SCRIPT} ${GUI_STACK_NAME} build push & _REBUILD_GUI_IMAGES_PID=$! fi From 1f4fa7bc4c05e3d0656d47ab91f6d6d0319ec728 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 15 Aug 2024 11:38:06 -0400 Subject: [PATCH 09/21] Move call to make_data_local in entrypoints. Moving call to this Python function so that it happens before sanity checks (at the entrypoint level) ensuring dataset directories exist, as they won't exist until any data is made local. 
--- docker/main/ngen/ngen_cal_entrypoint.sh | 8 ++++---- docker/main/ngen/ngen_entrypoint.sh | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docker/main/ngen/ngen_cal_entrypoint.sh b/docker/main/ngen/ngen_cal_entrypoint.sh index 1a843f27f..17a9f1c90 100755 --- a/docker/main/ngen/ngen_cal_entrypoint.sh +++ b/docker/main/ngen/ngen_cal_entrypoint.sh @@ -53,6 +53,10 @@ declare -x JOB_OUTPUT_WRITE_DIR="/tmp/job_output" # Get some universally applicable functions and constants source ./funcs.sh +# Run make_data_local Python functions to make necessary data local +# Called for every worker, but Python code will make sure only one worker per node makes a call that has effect +py_funcs make_data_local ${WORKER_INDEX:-0} ${PRIMARY_WORKERS:-0} + ngen_sanity_checks_and_derived_init init_script_mpi_vars init_ngen_executable_paths @@ -105,10 +109,6 @@ start_calibration() { return ${NGEN_RETURN} } -# Run make_data_local Python functions to make necessary data local -# Called for every worker, but Python code will make sure only one worker per node makes a call that has effect -py_funcs make_data_local ${WORKER_INDEX:-0} ${PRIMARY_WORKERS:-0} - # We can allow worker index to not be supplied when executing serially if [ "${WORKER_INDEX:-0}" = "0" ]; then if [ "$(whoami)" = "${MPI_USER:?MPI user not defined}" ]; then diff --git a/docker/main/ngen/ngen_entrypoint.sh b/docker/main/ngen/ngen_entrypoint.sh index 0e43f02af..0fe32c38c 100755 --- a/docker/main/ngen/ngen_entrypoint.sh +++ b/docker/main/ngen/ngen_entrypoint.sh @@ -49,6 +49,10 @@ declare -x JOB_OUTPUT_WRITE_DIR="/tmp/job_output" # Get some universally applicable functions and constants source /ngen/funcs.sh +# Run make_data_local Python functions to make necessary data local +# Called for every worker, but Python code will make sure only one worker per node makes a call that has effect +py_funcs make_data_local ${WORKER_INDEX:-0} ${PRIMARY_WORKERS:-0} + ngen_sanity_checks_and_derived_init init_script_mpi_vars init_ngen_executable_paths @@ -64,10 +68,6 @@ if [ ! -e /dmod/datasets/linked_job_output ]; then ln -s ${JOB_OUTPUT_WRITE_DIR} /dmod/datasets/linked_job_output fi -# Run make_data_local Python functions to make necessary data local -# Called for every worker, but Python code will make sure only one worker per node makes a call that has effect -py_funcs make_data_local ${WORKER_INDEX:-0} ${PRIMARY_WORKERS:-0} - # We can allow worker index to not be supplied when executing serially if [ "${WORKER_INDEX:-0}" = "0" ]; then if [ "$(whoami)" = "${MPI_USER:?MPI user not defined}" ]; then From 72067dab51026d89142586c4de22ef3d624f6b43 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 15 Aug 2024 11:52:01 -0400 Subject: [PATCH 10/21] Fix issues w/ use of separate cluster/local data. 
- Order minio client args properly (config dir must come first) - Cleanup output handling during minio client subprocess - Correct a few logical mistakes with how conditionals should behave - Fix issue with path object creation when copying from cluster volume - Adding some helpful logging messages - Make sure we actually create symlinks --- docker/main/ngen/py_funcs.py | 86 +++++++++++++++++++++++++++++++++--- 1 file changed, 79 insertions(+), 7 deletions(-) diff --git a/docker/main/ngen/py_funcs.py b/docker/main/ngen/py_funcs.py index 82a995682..a8c15c726 100644 --- a/docker/main/ngen/py_funcs.py +++ b/docker/main/ngen/py_funcs.py @@ -16,6 +16,9 @@ from typing import Dict, List, Literal, Optional, Set, Tuple +MINIO_ALIAS_NAME = "minio" + + def get_dmod_date_str_pattern() -> str: return '%Y-%m-%d,%H:%M:%S' @@ -146,7 +149,7 @@ def _make_dataset_dir_local(local_data_dir: Path, do_optimized_object_store_copy """ # TODO: (later) eventually source several details of this this from other part of the code - dataset_vol_dir = get_cluster_volumes_root_directory().joinpath(local_data_dir.parent).joinpath(local_data_dir.name) + dataset_vol_dir = get_cluster_volumes_root_directory().joinpath(local_data_dir.parent.name).joinpath(local_data_dir.name) local_serial_file = local_data_dir.joinpath(".ds_serial_state.json") dataset_serial_file = dataset_vol_dir.joinpath(f"{dataset_vol_dir.name}_serialized.json") @@ -172,7 +175,7 @@ def _make_dataset_dir_local(local_data_dir: Path, do_optimized_object_store_copy return # Determine if need to extract - if serial_ds_dict.get("data_archiving", True): + if serial_ds_dict.get("data_archiving", False): # Identify and extract archive src_archive_file = [f for f in dataset_vol_dir.glob(f"{dataset_vol_dir.name}_archived*")][0] archive_file = local_data_dir.joinpath(src_archive_file.name) @@ -184,7 +187,12 @@ def _make_dataset_dir_local(local_data_dir: Path, do_optimized_object_store_copy # Need to optimize by using minio client directly here when dealing with OBJECT_STORE dataset, or will take 10x time # TODO: (later) this is a bit of a hack, though a necessary one; find a way to integrate more elegantly elif do_optimized_object_store_copy and serial_ds_dict["type"] == "OBJECT_STORE": - subprocess.run(["mc", "cp", "--config-dir", "/dmod/.mc", "-r", f"minio/{local_data_dir.name}/", f"{local_data_dir}/."]) + alias_src_path = f"{MINIO_ALIAS_NAME}/{local_data_dir.name}/" + logging.info(f"Copying data from '{alias_src_path}' to '{local_data_dir}'") + subprocess.run(["mc", "--config-dir", "/dmod/.mc", "cp", "-r", alias_src_path, f"{local_data_dir}/."], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + logging.info(f"Local copying from '{alias_src_path}' complete") else: # Otherwise copy contents shutil.copy2(dataset_vol_dir, local_data_dir) @@ -256,6 +264,18 @@ def gather_output(mpi_host_names: List[str], output_write_dir: Path): f"{error_in_bytes.decode()}") +def get_data_exec_root_directory() -> Path: + """ + Get the root directory path to use for reading and writing dataset data during job execution. + + Returns + ------- + Path + The root directory path to use for reading and writing dataset data during job execution. + """ + return Path("/dmod/datasets") + + def get_cluster_volumes_root_directory() -> Path: """ Get the root directory for cluster volumes (i.e., backed by dataset directly, synced cluster-wide) on this worker. 
@@ -267,6 +287,17 @@ def get_cluster_volumes_root_directory() -> Path: """ return Path("/dmod/cluster_volumes") +def get_expected_data_category_subdirs() -> Set[str]: + """ + Get names of expected subdirectories for dataset categories underneath directories like local or cluster volumes. + + Returns + ------- + Set[str] + Names of expected subdirectories for dataset categories underneath directories like local or cluster volumes. + """ + return {"config", "forcing", "hydrofabric", "observation", "output"} + def get_local_volumes_root_directory() -> Path: """ @@ -280,7 +311,29 @@ def get_local_volumes_root_directory() -> Path: return Path("/dmod/local_volumes") -def make_data_local(worker_index: int, primary_workers: Set[int]): +def link_to_data_exec_root(): + """ + Create symlinks into the data exec root for any dataset subdirectories in, e.g., cluster or local data mounts. + + Function iterates through data mount roots for data (e.g., local volumes, cluster volumes) according to priority of + their use (i.e., local volume data should be used before an analogous cluster volume copy). If nothing for that + dataset category and name exists under the data exec root (from ::function:`get_data_exec_root_directory`), then + a symlink is created. + """ + # Note that order here is important; prioritize local data if it is there + data_symlink_sources = [get_local_volumes_root_directory(), get_cluster_volumes_root_directory()] + for dir in data_symlink_sources: + # At this level, order isn't as important + for category_subdir in get_expected_data_category_subdirs(): + for dataset_dir in [d for d in dir.joinpath(category_subdir).glob('*') if d.is_dir()]: + data_exec_analog = get_data_exec_root_directory().joinpath(category_subdir).joinpath(dataset_dir.name) + if not data_exec_analog.exists(): + logging.info(f"Creating dataset symlink with source '{dataset_dir!s}'") + os.symlink(dataset_dir, data_exec_analog) + logging.info(f"Symlink created at dest '{data_exec_analog!s}'") + + +def make_data_local(worker_index: int, primary_workers: Set[int], **kwargs): """ Make data local for each local volume mount that exists, but only if this worker is a primary. @@ -300,17 +353,23 @@ def make_data_local(worker_index: int, primary_workers: Set[int]): This worker's index. primary_workers Indices of designated primary workers + kwargs + Other ignored keyword args. 
See Also
--------
_make_dataset_dir_local
"""
+
+ # Every worker does need to do this, though
+ link_to_data_exec_root()
+
if worker_index not in primary_workers:
return
cluster_vol_dir = get_cluster_volumes_root_directory()
local_vol_dir = get_local_volumes_root_directory()
- expected_subdirs = {"config", "forcing", "hydrofabric", "observation", "output"}
+ expected_subdirs = get_expected_data_category_subdirs()
if not cluster_vol_dir.exists():
raise RuntimeError(f"Can't make data local: cluster volume root '{cluster_vol_dir!s}' does not exist")
@@ -323,15 +382,28 @@ def make_data_local(worker_index: int, primary_workers: Set[int]):
try:
obj_store_access_key, obj_store_secret_key = _parse_object_store_secrets()
- mc_ls_result = subprocess.run(["mc", "alias", "--config-dir", "/dmod/.mc", "ls", "minio"])
+ logging.info(f"Executing test run of minio client to see if alias '{MINIO_ALIAS_NAME}' already exists")
+ mc_ls_result = subprocess.run(["mc", "--config-dir", "/dmod/.mc", "alias", "ls", MINIO_ALIAS_NAME])
+ logging.debug(f"Return code from alias check was {mc_ls_result.returncode!s}")
if mc_ls_result.returncode != 0:
- subprocess.run(["mc", "alias", "--config-dir", "/dmod/.mc", "set", "minio", obj_store_access_key, obj_store_secret_key])
+ logging.info(f"Creating new minio alias '{MINIO_ALIAS_NAME}'")
+ # TODO: (later) need to set value for obj_store_url better than just hardcoding it
+ obj_store_url = "http://minio-proxy:9000"
+ subprocess.run(["mc", "--config-dir", "/dmod/.mc", "alias", "set", MINIO_ALIAS_NAME, obj_store_url,
+ obj_store_access_key, obj_store_secret_key])
+ logging.info(f"Now rechecking minio client test for '{MINIO_ALIAS_NAME}' alias")
+ mc_ls_result_2 = subprocess.run(["mc", "--config-dir", "/dmod/.mc", "alias", "ls", MINIO_ALIAS_NAME])
+ if mc_ls_result_2.returncode != 0:
+ raise RuntimeError(f"Could not successfully create minio alias '{MINIO_ALIAS_NAME}'")
do_optimized_object_store_copy = True
+ except RuntimeError as e:
+ raise e
except Exception as e:
logging.warning(f"Unable to parse secrets for optimized MinIO local data copying: {e!s}")
do_optimized_object_store_copy = False
# Use some multi-threading here since this is IO heavy
+ logging.info(f"Performing local data copying using multiple threads")
futures = set()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
for type_dir in (td for td in local_vol_dir.glob("*") if td.is_dir()):

From 291953361c75e0aa100a9dae2e8a89ae7b02d40f Mon Sep 17 00:00:00 2001
From: Robert Bartel
Date: Fri, 16 Aug 2024 13:24:02 -0400
Subject: [PATCH 11/21] Fix more issues with py_funcs functions.
- Fixing handling of symlink for output dataset so it points to cluster volume as needed (i.e., so output can actually make it out of the worker)
- Fixing some issues with keyword args coming in from CLI that certain functions weren't set up to disregard properly
- Adding a bit more helpful logging in places
---
docker/main/ngen/py_funcs.py | 47 ++++++++++++++++++++++++++++++----
1 file changed, 42 insertions(+), 5 deletions(-)

diff --git a/docker/main/ngen/py_funcs.py b/docker/main/ngen/py_funcs.py
index a8c15c726..4b7547712 100644
--- a/docker/main/ngen/py_funcs.py
+++ b/docker/main/ngen/py_funcs.py
@@ -221,10 +221,10 @@ def _move_to_directory(source_dir: Path, dest_dir: Path, archive_name: Optional[
raise ValueError(f"{get_date_str()} Can't move job output to non-directory path {dest_dir!s}!")
if archive_name:
- logging.info("Archiving output files to output dataset")
+ logging.info(f"Archiving output files to output dataset directory '{dest_dir!s}'")
tar_and_copy(source=source_dir, dest=dest_dir, archive_name=archive_name)
else:
- logging.info("Moving output file(s) to output dataset")
+ logging.info(f"Moving output file(s) to output dataset directory '{dest_dir!s}'")
for p in source_dir.glob("*"):
shutil.move(p, dest_dir)
@@ -287,6 +287,7 @@ def get_cluster_volumes_root_directory() -> Path:
"""
return Path("/dmod/cluster_volumes")
+
def get_expected_data_category_subdirs() -> Set[str]:
"""
Get names of expected subdirectories for dataset categories underneath directories like local or cluster volumes.
@@ -311,7 +312,7 @@ def get_local_volumes_root_directory() -> Path:
return Path("/dmod/local_volumes")
-def link_to_data_exec_root():
+def link_to_data_exec_root(exceptions: Optional[List[Path]] = None):
"""
Create symlinks into the data exec root for any dataset subdirectories in, e.g., cluster or local data mounts.
@@ -319,7 +320,33 @@ def link_to_data_exec_root():
their use (i.e., local volume data should be used before an analogous cluster volume copy). If nothing for that
dataset category and name exists under the data exec root (from ::function:`get_data_exec_root_directory`), then
a symlink is created.
+
+ Exceptions to the regular prioritization can be provided. By default (or whenever ``exceptions`` is ``None``),
+ all ``output`` category/directory datasets will have their symlinks backed by cluster volumes rather than local volumes.
+ To avoid any such exceptions and strictly follow the general priority rules, an empty list can be explicitly passed.
+
+ Parameters
+ ----------
+ exceptions
+ An optional list of dataset directories that, as exceptions to the general priority rules, should have their
+ symlinks created in the data exec root before anything else.
+ """
+ if exceptions is None:
+ exceptions = [d for d in get_cluster_volumes_root_directory().joinpath('output').glob('*') if d.is_dir()]
+
+ for d in exceptions:
+ data_exec_analog = get_data_exec_root_directory().joinpath(d.parent.name).joinpath(d.name)
+ if data_exec_analog.exists():
+ if data_exec_analog.is_symlink():
+ logging.warning(f"Overwriting previous symlink at '{data_exec_analog!s}' pointing to '{data_exec_analog.readlink()!s}'")
+ else:
+ logging.warning(f"Overwriting previous contents at '{data_exec_analog}' with new symlink")
+ else:
+ logging.info(f"Creating dataset symlink with source '{d!s}'")
+ os.symlink(d, data_exec_analog)
+ logging.info(f"Symlink created at dest '{data_exec_analog!s}'")
+
# Note that order here is important; prioritize local data if it is there
data_symlink_sources = [get_local_volumes_root_directory(), get_cluster_volumes_root_directory()]
for dir in data_symlink_sources:
# At this level, order isn't as important
for category_subdir in get_expected_data_category_subdirs():
for dataset_dir in [d for d in dir.joinpath(category_subdir).glob('*') if d.is_dir()]:
@@ -415,6 +442,7 @@ def make_data_local(worker_index: int, primary_workers: Set[int], **kwargs):
futures.add(pool.submit(_make_dataset_dir_local, local_ds_dir, do_optimized_object_store_copy))
for future in futures:
future.result()
+ logging.info(f"Local data copying complete")
def get_date_str() -> str:
@@ -480,7 +508,9 @@ def move_job_output(output_directory: Path, move_action: str, archiving: Archive
archive_name = None
if move_action == "to_directory":
- _move_to_directory(source_dir=output_directory, dest_dir=kwargs["dest_dir"], archive_name=archive_name)
+ dest_dir = kwargs["dest_dir"]
+ logging.info(f"Moving output from '{output_directory!s}' to '{dest_dir!s}'")
+ _move_to_directory(source_dir=output_directory, dest_dir=dest_dir, archive_name=archive_name)
else:
raise RuntimeError(f"{get_date_str()} Invalid CLI move action {move_action}")
@@ -512,7 +542,7 @@ def process_mpi_hosts_string(hosts_string: str, hosts_sep: str = ",", host_detai
return results
-def tar_and_copy(source: Path, dest: Path, archive_name: str, do_dry_run: bool = False, do_compress: bool = False):
+def tar_and_copy(source: Path, dest: Path, archive_name: str, do_dry_run: bool = False, do_compress: bool = False, **kwargs):
"""
Make a tar archive from the contents of a directory, and place this in a specified destination.
@@ -528,6 +558,8 @@ def tar_and_copy(source: Path, dest: Path, archive_name: str, do_dry_run: bool =
Whether to only perform a dry run to check paths, with no archiving/moving/copying.
do_compress
Whether to compress the created archive with gzip compression.
+ kwargs
+ Other unused keyword args.
Raises
-------
@@ -560,12 +592,17 @@ def tar_and_copy(source: Path, dest: Path, archive_name: str, do_dry_run: bool =
return
tar_mode_args = "w:gz" if do_compress else "w"
+ logging.info(f"Creating archive file '{archive_create_path!s}'")
with tarfile.open(archive_create_path, tar_mode_args) as tar:
for p in source.glob("*"):
+ logging.debug(f"Adding '{p!s}' to archive")
tar.add(p, arcname=p.name)
if archive_create_path != final_archive_path:
+ logging.info(f"Moving archive to final location at '{final_archive_path!s}'")
shutil.move(archive_create_path, final_archive_path)
+ else:
+ logging.info(f"Archive creation complete and at final location")
def main():

From 088b35c32976aa4e63d5eaea989e8259aea3700c Mon Sep 17 00:00:00 2001
From: Robert Bartel
Date: Fri, 16 Aug 2024 13:30:36 -0400
Subject: [PATCH 12/21] Update worker entrypoints for permissions issues.
Adding logic and reordering certain things to make sure that, given that job outputs, etc., are initially written locally, the process that then moves the results to backing dataset storage works properly and does not run into permissions issues.
---
docker/main/ngen/ngen_cal_entrypoint.sh | 55 +++++++++++-----
docker/main/ngen/ngen_entrypoint.sh | 86 +++++++++++++++----------
2 files changed, 89 insertions(+), 52 deletions(-)

diff --git a/docker/main/ngen/ngen_cal_entrypoint.sh b/docker/main/ngen/ngen_cal_entrypoint.sh
index 17a9f1c90..d2b7d5cc9 100755
--- a/docker/main/ngen/ngen_cal_entrypoint.sh
+++ b/docker/main/ngen/ngen_cal_entrypoint.sh
@@ -52,13 +52,15 @@ declare -x JOB_OUTPUT_WRITE_DIR="/tmp/job_output"
# Get some universally applicable functions and constants
source ./funcs.sh
+init_script_mpi_vars
-# Run make_data_local Python functions to make necessary data local
-# Called for every worker, but Python code will make sure only one worker per node makes a call that has effect
-py_funcs make_data_local ${WORKER_INDEX:-0} ${PRIMARY_WORKERS:-0}
+if [ "$(whoami)" != "${MPI_USER:?MPI user not defined}" ]; then
+ # Run make_data_local Python functions to make necessary data local
+ # Called for every worker, but Python code will make sure only one worker per node makes a call that has effect
+ py_funcs make_data_local ${WORKER_INDEX:-0} ${PRIMARY_WORKERS:-0}
+fi
ngen_sanity_checks_and_derived_init
-init_script_mpi_vars
init_ngen_executable_paths
# Move to the output write directory
@@ -70,6 +72,7 @@ cd ${JOB_OUTPUT_WRITE_DIR}
#Needed for routing
if [ ! -e /dmod/datasets/linked_job_output ]; then
ln -s ${JOB_OUTPUT_WRITE_DIR} /dmod/datasets/linked_job_output
+ chown -R ${MPI_USER}:${MPI_USER} /dmod/datasets/linked_job_output
fi
start_calibration() {
@@ -114,25 +117,16 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then
if [ "$(whoami)" = "${MPI_USER:?MPI user not defined}" ]; then
# This will only have an effect when running with multiple MPI nodes, so its safe to have even in serial exec
trap close_remote_workers EXIT
- # Have "main" (potentially only) worker copy config files to output dataset for record keeping
- # TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler
- # TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir
- # Do a dry run first to sanity check directory and fail if needed before backgrounding process
- py_funcs tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?}
- # Then actually run the archive and copy function in the background
- py_funcs tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} &
- _CONFIG_COPY_PROC=$!
- # If there is partitioning, which implies multi-processing job ...
- if [ -n "${PARTITION_DATASET_DIR:-}" ]; then
- # Include partition config dataset too if appropriate
- cp -a ${PARTITION_DATASET_DIR}/. ${OUTPUT_DATASET_DIR:?}
- fi
# Run the same function to execute ngen_cal (it's config will handle whether MPI is used internally)
start_calibration
+ # Remember: all these things are done as the root user on worker 0
else
# Start SSHD on the main worker if have an MPI job
if [ -n "${PARTITION_DATASET_DIR:-}" ]; then
+ # Include partition config dataset too if appropriate, though for simplicity, just copy directly
+ cp -a ${PARTITION_DATASET_DIR}/.
${OUTPUT_DATASET_DIR:?} + echo "$(print_date) Starting SSH daemon on main worker" /usr/sbin/sshd -D & _SSH_D_PID="$!" @@ -140,6 +134,20 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then trap cleanup_sshuser_exit EXIT fi + # Have "main" (potentially only) worker copy config files to output dataset for record keeping + # TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler + # TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir + # Do a dry run first to sanity check directory and fail if needed before backgrounding process + py_funcs tar_and_copy --dry-run --compress config_dataset.tgz ${CONFIG_DATASET_DIR:?Config dataset directory not defined} ${OUTPUT_DATASET_DIR:?} + _R_DRY=${?} + if [ ${_R_DRY} -ne 0 ]; then + echo "$(print_date) Job exec failed due to issue with copying configs to output (code ${_R_DRY})" + exit ${_R_DRY} + fi + # Then actually run the archive and copy function in the background + py_funcs tar_and_copy --compress config_dataset.tgz ${CONFIG_DATASET_DIR:?} ${OUTPUT_DATASET_DIR:?} & + _CONFIG_COPY_PROC=$! + # Make sure we run ngen/ngen-cal as our MPI_USER echo "$(print_date) Running exec script as '${MPI_USER:?}'" # Do this by just re-running this script with the same args, but as the other user @@ -147,6 +155,19 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then _EXEC_STRING="${0} ${@}" su ${MPI_USER:?} --session-command "${_EXEC_STRING}" #time su ${MPI_USER:?} --session-command "${_EXEC_STRING}" + + # TODO: (later) in ngen and ngen-cal entrypoints, add controls for whether this is done base on whether we + # TODO: are writing directly to output dataset dir or some other output write dir; this will be + # TODO: important once netcdf output works + # Then wait at this point (if necessary) for our background config copy to avoid taxing things + echo "$(print_date) Waiting for compression and copying of configuration files to output dataset" + wait ${_CONFIG_COPY_PROC:-} + _R_CONFIG_COPY=${?} + if [ ${_R_CONFIG_COPY} -eq 0 ]; then + echo "$(print_date) Compression/copying of config data to output dataset complete" + else + echo "$(print_date) Copying of config data to output dataset exited with error code: ${_R_CONFIG_COPY}" + fi fi else run_secondary_mpi_ssh_worker_node diff --git a/docker/main/ngen/ngen_entrypoint.sh b/docker/main/ngen/ngen_entrypoint.sh index 0fe32c38c..f6b67613a 100755 --- a/docker/main/ngen/ngen_entrypoint.sh +++ b/docker/main/ngen/ngen_entrypoint.sh @@ -48,13 +48,15 @@ declare -x JOB_OUTPUT_WRITE_DIR="/tmp/job_output" # Get some universally applicable functions and constants source /ngen/funcs.sh +init_script_mpi_vars -# Run make_data_local Python functions to make necessary data local -# Called for every worker, but Python code will make sure only one worker per node makes a call that has effect -py_funcs make_data_local ${WORKER_INDEX:-0} ${PRIMARY_WORKERS:-0} +if [ "$(whoami)" != "${MPI_USER:?MPI user not defined}" ]; then + # Run make_data_local Python functions to make necessary data local + # Called for every worker, but Python code will make sure only one worker per node makes a call that has effect + py_funcs make_data_local ${WORKER_INDEX:-0} ${PRIMARY_WORKERS:-0} +fi ngen_sanity_checks_and_derived_init -init_script_mpi_vars init_ngen_executable_paths # Move to the output write directory @@ -66,59 +68,37 @@ cd ${JOB_OUTPUT_WRITE_DIR} #Needed for routing if [ ! 
-e /dmod/datasets/linked_job_output ]; then
ln -s ${JOB_OUTPUT_WRITE_DIR} /dmod/datasets/linked_job_output
+ chown -R ${MPI_USER}:${MPI_USER} /dmod/datasets/linked_job_output
fi
# We can allow worker index to not be supplied when executing serially
if [ "${WORKER_INDEX:-0}" = "0" ]; then
+ # For the nested/recursive call to this script with the MPI_USER ...
if [ "$(whoami)" = "${MPI_USER:?MPI user not defined}" ]; then
# This will only have an effect when running with multiple MPI nodes, so its safe to have even in serial exec
trap close_remote_workers EXIT
- # Have "main" (potentially only) worker copy config files to output dataset for record keeping
- # TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler
- # TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir
- # Do a dry run first to sanity check directory and fail if needed before backgrounding process
- py_funcs tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?}
- # Then actually run the archive and copy function in the background
- py_funcs tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} &
- _CONFIG_COPY_PROC=$!
+
# If there is partitioning, which implies multi-processing job ...
if [ -n "${PARTITION_DATASET_DIR:-}" ]; then
- # Include partition config dataset too if appropriate, though for simplicity, just copy directly
- cp -a ${PARTITION_DATASET_DIR}/. ${OUTPUT_DATASET_DIR:?}
- # Then run execution
+ # Execute the MPI ngen run
exec_main_worker_ngen_run
# TODO: (later) in ngen and ngen-cal entrypoints, add controls for whether this is done base on whether we
# TODO: are writing directly to output dataset dir or some other output write dir; this will be
# TODO: important once netcdf output works
- # Then gather output from all worker hosts
+ # Once done, gather output from all worker hosts
py_funcs gather_output ${MPI_HOST_STRING:?} ${JOB_OUTPUT_WRITE_DIR:?}
- # Then wait at this point (if necessary) for our background config copy to avoid taxing things
- echo "$(print_date) Waiting for compression and copying of configuration files to output dataset"
- wait ${_CONFIG_COPY_PROC}
- echo "$(print_date) Compression/copying of config data to output dataset complete"
- echo "$(print_date) Copying results to output dataset"
- py_funcs move_job_output --job_id ${JOB_ID:?} ${JOB_OUTPUT_WRITE_DIR} to_directory ${OUTPUT_DATASET_DIR:?}
- echo "$(print_date) Results copied to output dataset"
# Otherwise, we just have a serial job ...
else - # Execute it first exec_serial_ngen_run - - # TODO: (later) in ngen and ngen-cal entrypoints, add controls for whether this is done base on whether we - # TODO: are writing directly to output dataset dir or some other output write dir; this will be - # TODO: important once netcdf output works - echo "$(print_date) Waiting for compression and copying of configuration files to output dataset" - wait ${_CONFIG_COPY_PROC} - echo "$(print_date) Compression/copying of config data to output dataset complete" - - echo "$(print_date) Copying results to output dataset" - py_funcs move_job_output --job_id ${JOB_ID:?} ${JOB_OUTPUT_WRITE_DIR} to_directory ${OUTPUT_DATASET_DIR:?} - echo "$(print_date) Results copied to output dataset" fi + # Remember: all these things are done as the root user on worker 0 else # Start SSHD on the main worker if have an MPI job if [ -n "${PARTITION_DATASET_DIR:-}" ]; then + # Include partition config dataset too if appropriate, though for simplicity, just copy directly + cp -a ${PARTITION_DATASET_DIR}/. ${OUTPUT_DATASET_DIR:?} + echo "$(print_date) Starting SSH daemon on main worker" /usr/sbin/sshd -D & _SSH_D_PID="$!" @@ -126,6 +106,20 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then trap cleanup_sshuser_exit EXIT fi + # Have "main" (potentially only) worker copy config files to output dataset for record keeping + # TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler + # TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir + # Do a dry run first to sanity check directory and fail if needed before backgrounding process + py_funcs tar_and_copy --dry-run --compress config_dataset.tgz ${CONFIG_DATASET_DIR:?Config dataset directory not defined} ${OUTPUT_DATASET_DIR:?} + _R_DRY=${?} + if [ ${_R_DRY} -ne 0 ]; then + echo "$(print_date) Job exec failed due to issue with copying configs to output (code ${_R_DRY})" + exit ${_R_DRY} + fi + # Then actually run the archive and copy function in the background + py_funcs tar_and_copy --compress config_dataset.tgz ${CONFIG_DATASET_DIR:?} ${OUTPUT_DATASET_DIR:?} & + _CONFIG_COPY_PROC=$! 
+ # Make sure we run the model as our MPI_USER echo "$(print_date) Running exec script as '${MPI_USER:?}'" # Do this by just re-running this script with the same args, but as the other user @@ -133,6 +127,28 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then _EXEC_STRING="${0} ${@}" su ${MPI_USER:?} --session-command "${_EXEC_STRING}" #time su ${MPI_USER:?} --session-command "${_EXEC_STRING}" + + # TODO: (later) in ngen and ngen-cal entrypoints, add controls for whether this is done base on whether we + # TODO: are writing directly to output dataset dir or some other output write dir; this will be + # TODO: important once netcdf output works + # Then wait at this point (if necessary) for our background config copy to avoid taxing things + echo "$(print_date) Waiting for compression and copying of configuration files to output dataset" + wait ${_CONFIG_COPY_PROC:-} + _R_CONFIG_COPY=${?} + if [ ${_R_CONFIG_COPY} -eq 0 ]; then + echo "$(print_date) Compression/copying of config data to output dataset complete" + else + echo "$(print_date) Copying of config data to output dataset exited with error code: ${_R_CONFIG_COPY}" + fi + + echo "$(print_date) Copying results to output dataset" + py_funcs move_job_output --job_id ${JOB_ID:?} ${JOB_OUTPUT_WRITE_DIR} to_directory ${OUTPUT_DATASET_DIR:?} + _R_RESULT_COPY=${?} + if [ ${_R_RESULT_COPY} -eq 0 ]; then + echo "$(print_date) Results copied to output dataset" + else + echo "$(print_date) Error copying to output dataset directory '${OUTPUT_DATASET_DIR}' (error code: ${_R_RESULT_COPY})" + fi fi else run_secondary_mpi_ssh_worker_node From d71abfc402b8863742312a2aedfcdf334921d1b3 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Wed, 30 Oct 2024 15:31:40 -0400 Subject: [PATCH 13/21] Bump core version to 0.21.0. --- python/lib/core/dmod/core/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/core/dmod/core/_version.py b/python/lib/core/dmod/core/_version.py index 2f15b8cd3..e45337122 100644 --- a/python/lib/core/dmod/core/_version.py +++ b/python/lib/core/dmod/core/_version.py @@ -1 +1 @@ -__version__ = '0.20.0' +__version__ = '0.21.0' From 15486d85631d8f0897ca53dc83863972740d3e58 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 16 Aug 2024 14:43:38 -0400 Subject: [PATCH 14/21] Update scheduler dep to latest core. --- python/lib/scheduler/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/scheduler/pyproject.toml b/python/lib/scheduler/pyproject.toml index a03b8cadc..61b8eec31 100644 --- a/python/lib/scheduler/pyproject.toml +++ b/python/lib/scheduler/pyproject.toml @@ -17,7 +17,7 @@ dependencies = [ "dmod.communication>=0.22.0", "dmod.modeldata>=0.7.1", "dmod.redis>=0.1.0", - "dmod.core>=0.17.0", + "dmod.core>=0.20.0", "cryptography", "uri", "pyyaml", From 42a09fa835c97da6a51ce50ba5978c0cca0ca675 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 16 Aug 2024 14:44:36 -0400 Subject: [PATCH 15/21] Update dataservice internal deps to latest. Update dependencies on core and scheduler to 0.21.0 and 0.14.0 respectively. 
--- python/services/dataservice/pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/services/dataservice/pyproject.toml b/python/services/dataservice/pyproject.toml index acde4194a..735513d91 100644 --- a/python/services/dataservice/pyproject.toml +++ b/python/services/dataservice/pyproject.toml @@ -9,9 +9,9 @@ authors = [ { name = "Austin Raney", email = "austin.raney@noaa.gov" }, ] dependencies = [ - "dmod.core>=0.19.0", + "dmod.core>=0.21.0", "dmod.communication>=0.21.0", - "dmod.scheduler>=0.12.2", + "dmod.scheduler>=0.14.0", "dmod.modeldata>=0.13.0", "redis", "pydantic[dotenv]>=1.10.8,~=1.10", From 2164ca0d119ddd6ad813321708b04e58e2a6ea7f Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 16 Aug 2024 14:44:50 -0400 Subject: [PATCH 16/21] Bump dataservice version to 0.13.0. --- python/services/dataservice/dmod/dataservice/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/services/dataservice/dmod/dataservice/_version.py b/python/services/dataservice/dmod/dataservice/_version.py index 2c7bffbf8..2d7893e3d 100644 --- a/python/services/dataservice/dmod/dataservice/_version.py +++ b/python/services/dataservice/dmod/dataservice/_version.py @@ -1 +1 @@ -__version__ = '0.12.0' +__version__ = '0.13.0' From 0c4ef3c2434ed94f4aca66bdd7151007be1cd870 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 16 Aug 2024 14:45:28 -0400 Subject: [PATCH 17/21] Update schedulerservice internal deps to latest. Update dependencies on core and scheduler to 0.21.0 and 0.14.0 respectively. --- python/services/schedulerservice/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/services/schedulerservice/pyproject.toml b/python/services/schedulerservice/pyproject.toml index 5a8c16c6d..0013ebdc1 100644 --- a/python/services/schedulerservice/pyproject.toml +++ b/python/services/schedulerservice/pyproject.toml @@ -10,7 +10,7 @@ authors = [ { name = "Austin Raney", email = "austin.raney@noaa.gov" }, ] dependencies = [ - "dmod.core>=0.17.0", + "dmod.core>=0.21.0", "dmod.communication>=0.22.0", "dmod.scheduler>=0.14.0", ] From b59cb7ebf732634d280d7ab5b1560276fc666342 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 16 Aug 2024 14:46:21 -0400 Subject: [PATCH 18/21] Update requestsservice to latest core dep. Updating dependency on core to 0.21.0. --- python/services/requestservice/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/services/requestservice/pyproject.toml b/python/services/requestservice/pyproject.toml index 7e22792eb..edb80e6f1 100644 --- a/python/services/requestservice/pyproject.toml +++ b/python/services/requestservice/pyproject.toml @@ -12,7 +12,7 @@ authors = [ ] dependencies = [ "websockets", - "dmod.core>=0.19.0", + "dmod.core>=0.21.0", "dmod.communication>=0.22.0", "dmod.access>=0.2.0", "dmod.externalrequests>=0.6.0", From 84710c421b0dab7a785571af3169259f56684766 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 16 Aug 2024 14:47:07 -0400 Subject: [PATCH 19/21] Update partitionerservice internal deps to latest. Updating dependencies on core and scheduler to 0.21.0 and 0.14.0 respectively. 
--- python/services/partitionerservice/pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/services/partitionerservice/pyproject.toml b/python/services/partitionerservice/pyproject.toml index 459490b9f..097d0c16e 100644 --- a/python/services/partitionerservice/pyproject.toml +++ b/python/services/partitionerservice/pyproject.toml @@ -9,10 +9,10 @@ authors = [ { name = "Austin Raney", email = "austin.raney@noaa.gov" }, ] dependencies = [ - "dmod.core>=0.1.0", + "dmod.core>=0.21.0", "dmod.communication>=0.7.1", "dmod.modeldata>=0.7.1", - "dmod.scheduler>=0.12.2", + "dmod.scheduler>=0.14.0", "dmod.externalrequests>=0.3.0", ] readme = "README.md" From 46e442ea26b75f408ec2261c5ab9db07462c9dad Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Fri, 16 Aug 2024 14:47:20 -0400 Subject: [PATCH 20/21] Bump partitionerservice version to 0.3.0. --- .../partitionerservice/dmod/partitionerservice/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/services/partitionerservice/dmod/partitionerservice/_version.py b/python/services/partitionerservice/dmod/partitionerservice/_version.py index d93b5b242..0404d8103 100644 --- a/python/services/partitionerservice/dmod/partitionerservice/_version.py +++ b/python/services/partitionerservice/dmod/partitionerservice/_version.py @@ -1 +1 @@ -__version__ = '0.2.3' +__version__ = '0.3.0' From 25dbf8e85f6288c5342b0d25c36fbaa66eca80ad Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 22 Aug 2024 11:50:33 -0400 Subject: [PATCH 21/21] Account for platform in image mc client download. Account for building in environments other than Linux X86_64 when downloading the MinIO client for the ngen worker images. --- docker/main/ngen/Dockerfile | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/docker/main/ngen/Dockerfile b/docker/main/ngen/Dockerfile index 2d9064b59..6fa549f8e 100644 --- a/docker/main/ngen/Dockerfile +++ b/docker/main/ngen/Dockerfile @@ -951,7 +951,20 @@ USER root # https://dl.min.io/client/mc/release/linux-amd64/archive/mc.${MINIO_CLIENT_RELEASE} # Setup minio client; also update path and make sure dataset directory is there -RUN curl -L -o /dmod/bin/mc https://dl.min.io/client/mc/release/linux-amd64/mc \ +RUN OS_NAME=$(uname -s | tr '[:upper:]' '[:lower:]') \ + && PLATFORM=$(uname -m | tr '[:upper:]' '[:lower:]') \ + && case "${PLATFORM:?}" in \ + aarch64) \ + MINIO_HW_NAME=arm64 \ + ;; \ + x86_64) \ + MINIO_HW_NAME=amd64 \ + ;; \ + *) \ + MINIO_HW_NAME=${PLATFORM} \ + ;; \ + esac \ + && curl -L -o /dmod/bin/mc https://dl.min.io/client/mc/release/${OS_NAME}-${MINIO_HW_NAME}/mc \ && chmod +x /dmod/bin/mc \ && mkdir /dmod/.mc \ && echo "export PATH=${PATH}" >> /etc/profile \
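
Note: the platform mapping introduced in the Dockerfile change above can be exercised as a quick standalone check outside the image build. The following is only a sketch: the download URL pattern and the MinIO client come from the Dockerfile above, while accepting `arm64` in addition to `aarch64` (e.g., for non-Linux build hosts) is an assumption.

    #!/bin/sh
    # Sketch: resolve the MinIO client download URL for the current build platform,
    # mirroring the case statement added to the Dockerfile above.
    OS_NAME=$(uname -s | tr '[:upper:]' '[:lower:]')
    PLATFORM=$(uname -m | tr '[:upper:]' '[:lower:]')
    case "${PLATFORM}" in
        aarch64|arm64) MINIO_HW_NAME=arm64 ;;   # assumed: also match arm64 (e.g., macOS hosts)
        x86_64)        MINIO_HW_NAME=amd64 ;;
        *)             MINIO_HW_NAME=${PLATFORM} ;;
    esac
    echo "Would download: https://dl.min.io/client/mc/release/${OS_NAME}-${MINIO_HW_NAME}/mc"
    # Optionally fetch the client and confirm it runs (requires network access):
    # curl -L -o ./mc "https://dl.min.io/client/mc/release/${OS_NAME}-${MINIO_HW_NAME}/mc" \
    #     && chmod +x ./mc && ./mc --version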