Skip to content

Commit

Permalink
Move to Python for entrypoint IO optimizations.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertbartel committed Jun 12, 2024
1 parent e38df7a commit b667dd6
Show file tree
Hide file tree
Showing 4 changed files with 315 additions and 96 deletions.
1 change: 1 addition & 0 deletions docker/main/ngen/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,7 @@ USER ${USER}
COPY --chown=${USER} noah_owp_parameters /dmod/bmi_module_data/noah_owp/parameters/
COPY --chown=${USER} ngen_entrypoint.sh ${WORKDIR}/entrypoint.sh
COPY --chown=${USER} funcs.sh ${WORKDIR}
COPY --chown=${USER} --chmod=744 py_funcs.py /dmod/bin/py_funcs

ENV HYDRA_PROXY_RETRY_COUNT=5

Expand Down
91 changes: 0 additions & 91 deletions docker/main/ngen/funcs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -213,94 +213,3 @@ run_secondary_mpi_ssh_worker_node()
sleep 5
done
}

tar_and_copy()
{
    # Create a tar archive of a directory's contents directly inside a
    # destination directory, after sanity-checking all involved paths.
    #
    # Options (when combined, must appear in this order):
    #   --dry-run   : perform only the sanity checks; create no archive
    #   --compress  : gzip the archive (passes -z to tar)
    #
    # Args:
    #   $1 - source directory whose contents are archived
    #   $2 - name of the produced tar archive (created inside $3)
    #   $3 - destination directory in which the archive is created
    #
    # Exits non-zero (with a message to stderr) on any failed sanity check.

    if [ "${1:?No args given to tar_and_copy}" == "--dry-run" ]; then
        local _DRY_RUN="true"
        shift
    fi

    if [ "${1:?No contents directory given to tar_and_copy}" == "--compress" ]; then
        local _TAR_EXTRA_ARGS="-z"
        shift
    fi

    local _CONTENTS_DIR="${1:?No contents directory given to tar_and_copy}"
    local _TAR_FILE="${2:?No archive file given to tar_and_copy}"
    local _DEST_DIR="${3:?No copy destination directory given to tar_and_copy}"

    if [ ! -e "${_CONTENTS_DIR}" ]; then
        >&2 echo "$(print_date) Error: tar_and_copy contents directory '${_CONTENTS_DIR}' does not exist!"
        exit 1
    elif [ ! -d "${_CONTENTS_DIR}" ]; then
        >&2 echo "$(print_date) Error: tar_and_copy contents directory '${_CONTENTS_DIR}' exists but is not a directory!"
        exit 1
    elif [ ! -e "${_DEST_DIR}" ]; then
        >&2 echo "$(print_date) Error: tar_and_copy copy destination directory '${_DEST_DIR}' does not exist!"
        exit 1
    # Bug fix: this branch previously repeated '! -e' (unreachable after the
    # check above); it must test '! -d' to catch a non-directory destination.
    elif [ ! -d "${_DEST_DIR}" ]; then
        >&2 echo "$(print_date) Error: tar_and_copy copy destination directory '${_DEST_DIR}' exists but is not a directory!"
        exit 1
    # Bug fix: the archive is created at ${_DEST_DIR}/${_TAR_FILE}, so the
    # existence check must use that full path (the old check tested
    # ${_TAR_FILE} relative to the current working directory).
    elif [ -e "${_DEST_DIR}/${_TAR_FILE}" ]; then
        >&2 echo "$(print_date) Error: tar_and_copy archive file '${_DEST_DIR}/${_TAR_FILE}' already exists!"
        exit 1
    fi

    if [ "${_DRY_RUN:-}" == "true" ]; then
        return 0
    fi

    # Archive the *contents* of the source dir (note the trailing '.') straight
    # into the destination, avoiding an intermediate file plus a copy.
    tar -c ${_TAR_EXTRA_ARGS:-} -f "${_DEST_DIR}/${_TAR_FILE}" -C "${_CONTENTS_DIR}" .
}

#######################################
# Gather job output from all remote MPI worker hosts into this (primary)
# host's shared output write directory, copying in parallel via scp.
# Globals:
#   JOB_OUTPUT_WRITE_DIR (read) - per-job output directory path; required
#   MPI_HOST_STRING      (read) - comma-separated "host:cores" entries
# Outputs:
#   Progress messages to stdout; error details to stdout before exiting
# Exits:
#   With the failing scp's exit code if any remote copy fails
#######################################
gather_output() {
    echo "$(print_date) Gather from remote worker host ${JOB_OUTPUT_WRITE_DIR:?Job temp output dir not defined} dirs"
    # Iterate entries of MPI_HOST_STRING (each formatted like "hostname:cores")
    for i in $(echo "${MPI_HOST_STRING}" | sed 's/,/ /g'); do
        # Strip the ":cores" suffix to get just the host name
        _HOST_NAME=$(echo "${i}" | awk -F: '{print $1}')
        # Skip this host itself; its output is already local
        if [ "$(hostname)" == "${_HOST_NAME}" ]; then
            continue
        fi
        # Background each copy so all remote hosts transfer in parallel
        scp -q -r ${_HOST_NAME}:${JOB_OUTPUT_WRITE_DIR}/ ${JOB_OUTPUT_WRITE_DIR}/. &
    done
    # Reap every backgrounded scp; abort with its exit code on first failure
    for p in $(jobs -p); do
        wait ${p}
        _R=$?
        if [ ${_R} -ne 0 ]; then
            echo "$(print_date) Error: remote copying of output exited with error ${_R}"
            exit ${_R}
        fi
    done
}

move_output_to_dataset()
{
    # Move job output files into the output dataset directory, then remove the
    # emptied job output directory.
    #
    # If the output directory contains any ".csv" files, all its contents are
    # bundled into a single archive named "job-${JOB_ID}-output.tar" inside the
    # dataset directory (via tar_and_copy); otherwise files are copied over
    # individually with attributes preserved.
    #
    # Args:
    #   $1 - job output directory to move from
    #   $2 - output dataset directory to move to
    #
    # Globals:
    #   JOB_ID (read) - required when archiving CSV output
    #
    # Exits non-zero (with a message to stderr) if either path is not a directory.

    if [ ! -d "${1:?No output directory given for copying to dataset}" ]; then
        >&2 echo "$(print_date) Error: cannot move output from non-directory path '${1}' to output dataset!"
        exit 1
    elif [ ! -d "${2:?No output dataset directory given for copying}" ]; then
        # Bug fix: this message previously printed '${1}' even though the failed
        # check is on the dataset directory '${2}'.
        >&2 echo "$(print_date) Error: cannot move output to non-directory path '${2}' for output dataset!"
        exit 1
    fi

    # Bug fix: the pattern was '.csv' (unescaped dot, unanchored), which also
    # matched names like 'mycsv.log'; escape and anchor so only *.csv files
    # trigger the archiving path.  grep -c also replaces the grep|wc -l pipe.
    if [ "$(ls "${1}" | grep -c '\.csv$')" -gt 0 ]; then
        echo "$(print_date) Archiving and copying output CSVs to output dataset"
        tar_and_copy "${1}" "job-${JOB_ID:?}-output.tar" "${2}"
    else
        echo "$(print_date) Copying output file(s) to output dataset"
        cp -a "${1}/." "${2}/."
    fi
    rm -rf "${1}"
}
10 changes: 5 additions & 5 deletions docker/main/ngen/ngen_entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then
# TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler
# TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir
# Do a dry run first to sanity check directory and fail if needed before backgrounding process
tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?}
py_funcs tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?}
# Then actually run the archive and copy function in the background
tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} &
py_funcs tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} &
_CONFIG_COPY_PROC=$!
# If there is partitioning, which implies multi-processing job ...
if [ -n "${PARTITION_DATASET_DIR:-}" ]; then
Expand All @@ -84,13 +84,13 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then
# TODO: are writing directly to output dataset dir or some other output write dir; this will be
# TODO: important once netcdf output works
# Then gather output from all worker hosts
gather_output
py_funcs gather_output ${MPI_HOST_STRING:?} ${JOB_OUTPUT_WRITE_DIR:?}
# Then wait at this point (if necessary) for our background config copy to avoid taxing things
echo "$(print_date) Waiting for compression and copying of configuration files to output dataset"
wait ${_CONFIG_COPY_PROC}
echo "$(print_date) Compression/copying of config data to output dataset complete"
echo "$(print_date) Copying results to output dataset"
move_output_to_dataset ${JOB_OUTPUT_WRITE_DIR} ${OUTPUT_DATASET_DIR:?}
py_funcs move_job_output --job_id ${JOB_ID:?} ${JOB_OUTPUT_WRITE_DIR} to_directory ${OUTPUT_DATASET_DIR:?}
echo "$(print_date) Results copied to output dataset"
# Otherwise, we just have a serial job ...
else
Expand All @@ -105,7 +105,7 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then
echo "$(print_date) Compression/copying of config data to output dataset complete"

echo "$(print_date) Copying results to output dataset"
move_output_to_dataset ${JOB_OUTPUT_WRITE_DIR} ${OUTPUT_DATASET_DIR:?}
py_funcs move_job_output --job_id ${JOB_ID:?} ${JOB_OUTPUT_WRITE_DIR} to_directory ${OUTPUT_DATASET_DIR:?}
echo "$(print_date) Results copied to output dataset"
fi
else
Expand Down
Loading

0 comments on commit b667dd6

Please sign in to comment.