Skip to content

Commit

Permalink
fcoll/vulcan: add support for GPU aggregation buffers
Browse files Browse the repository at this point in the history
If the user user input buffers are GPU device memory, use also GPU device memory for the aggregation step. This will allow the data transfer to occur between GPU buffers, and hence take advantage of the much higher GPU-GPU interconnects (e.g. XGMI, NVLINK, etc.).

The downside of this approach is that we cannot call directly into the fbtl ipwritev routine, but have to go through the common_ompio_file_iwrite_pregen routine, which performs the necessary segmenting and staging through the host memory.

Signed-off-by: Edgar Gabriel <[email protected]>
  • Loading branch information
edgargabriel committed Jul 18, 2024
1 parent a6d5b36 commit 8b24867
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 26 deletions.
2 changes: 2 additions & 0 deletions ompi/mca/common/ompio/common_ompio.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ OMPI_DECLSPEC int mca_common_ompio_file_write_at (ompio_file_t *fh, OMPI_MPI_OFF
OMPI_DECLSPEC int mca_common_ompio_file_iwrite (ompio_file_t *fh, const void *buf, size_t count,
struct ompi_datatype_t *datatype, ompi_request_t **request);

OMPI_DECLSPEC int mca_common_ompio_file_iwrite_pregen (ompio_file_t *fh, ompi_request_t *request);

OMPI_DECLSPEC int mca_common_ompio_file_iwrite_at (ompio_file_t *fh, OMPI_MPI_OFFSET_TYPE offset,
const void *buf, size_t count, struct ompi_datatype_t *datatype,
ompi_request_t **request);
Expand Down
69 changes: 68 additions & 1 deletion ompi/mca/common/ompio/common_ompio_file_write.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* Copyright (c) 2008-2019 University of Houston. All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2022-2023 Advanced Micro Devices, Inc. All rights reserved.
* Copyright (c) 2022-2024 Advanced Micro Devices, Inc. All rights reserved.
* Copyright (c) 2024 Triad National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
Expand Down Expand Up @@ -329,6 +329,7 @@ static void mca_common_ompio_post_next_write_subreq(struct mca_ompio_request_t *
decoded_iov.iov_base = req->req_tbuf;
decoded_iov.iov_len = req->req_size;
opal_convertor_pack (&req->req_convertor, &decoded_iov, &iov_count, &pos);

mca_common_ompio_build_io_array (req->req_fview, index, req->req_num_subreqs,
bytes_per_cycle, pos,
iov_count, &decoded_iov,
Expand Down Expand Up @@ -472,6 +473,72 @@ int mca_common_ompio_file_iwrite (ompio_file_t *fh,
return ret;
}

/*
** This routine is invoked from the fcoll component.
** It is only used if the temporary buffer is a gpu buffer,
** and the fbtl supports the ipwritev operation.
**
** The io-array has already been generated in fcoll/xxx/file_write_all,
** and we use the pre-computed offsets to created a pseudo fview.
** The position of the file pointer is updated in the fcoll
** component, not here.
*/

int mca_common_ompio_file_iwrite_pregen (ompio_file_t *fh,
ompi_request_t *request)
{
uint32_t i;
size_t max_data;
size_t pipeline_buf_size;
mca_ompio_request_t *ompio_req = (mca_ompio_request_t *) request;

if (NULL == fh->f_fbtl->fbtl_ipwritev) {
return MPI_ERR_INTERN;
}

max_data = fh->f_io_array[0].length;
pipeline_buf_size = OMPIO_MCA_GET(fh, pipeline_buffer_size);

mca_common_ompio_register_progress ();

OMPIO_PREPARE_BUF (fh, fh->f_io_array[0].memory_address, max_data, MPI_BYTE,
ompio_req->req_tbuf, &ompio_req->req_convertor, max_data,
pipeline_buf_size, NULL, i);

ompio_req->req_num_subreqs = ceil((double)max_data/pipeline_buf_size);
ompio_req->req_size = pipeline_buf_size;
ompio_req->req_max_data = max_data;
ompio_req->req_post_next_subreq = mca_common_ompio_post_next_write_subreq;
ompio_req->req_fh = fh;
ompio_req->req_ompi.req_status.MPI_ERROR = MPI_SUCCESS;

ompio_req->req_fview = (struct ompio_fview_t *) calloc(1, sizeof(struct ompio_fview_t));
if (NULL == ompio_req->req_fview) {
opal_output(1, "common_ompio: error allocating memory\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}

ompio_req->req_fview->f_decoded_iov = (struct iovec*) malloc ( fh->f_num_of_io_entries *
sizeof(struct iovec));
if (NULL == ompio_req->req_fview->f_decoded_iov) {
opal_output(1, "common_ompio_file_iwrite_pregen: could not allocate memory\n");
return OMPI_ERR_OUT_OF_RESOURCE;
}

ompio_req->req_fview->f_iov_count = fh->f_num_of_io_entries;
for (i=0; i < ompio_req->req_fview->f_iov_count; i++) {
ompio_req->req_fview->f_decoded_iov[i].iov_base = fh->f_io_array[i].offset;
ompio_req->req_fview->f_decoded_iov[i].iov_len = fh->f_io_array[i].length ;
}

fh->f_num_of_io_entries = 0;
free (fh->f_io_array);
fh->f_io_array = NULL;

mca_common_ompio_post_next_write_subreq(ompio_req, 0);
return OMPI_SUCCESS;
}

int mca_common_ompio_file_iwrite_at (ompio_file_t *fh,
OMPI_MPI_OFFSET_TYPE offset,
const void *buf,
Expand Down
1 change: 1 addition & 0 deletions ompi/mca/fcoll/vulcan/fcoll_vulcan.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ extern int mca_fcoll_vulcan_priority;
extern int mca_fcoll_vulcan_num_groups;
extern int mca_fcoll_vulcan_write_chunksize;
extern int mca_fcoll_vulcan_async_io;
extern int mca_fcoll_vulcan_use_accelerator_buffers;

OMPI_DECLSPEC extern mca_fcoll_base_component_3_0_0_t mca_fcoll_vulcan_component;

Expand Down
82 changes: 62 additions & 20 deletions ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Copyright (c) 2023 Jeffrey M. Squyres. All rights reserved.
* Copyright (c) 2024 Triad National Security, LLC. All rights
* reserved.
* Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand All @@ -30,10 +31,12 @@
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
#include "ompi/mca/common/ompio/common_ompio.h"
#include "ompi/mca/common/ompio/common_ompio_buffer.h"
#include "ompi/mca/io/io.h"
#include "ompi/mca/common/ompio/common_ompio_request.h"
#include "math.h"
#include "ompi/mca/pml/pml.h"
#include "opal/mca/accelerator/accelerator.h"
#include <unistd.h>

#define DEBUG_ON 0
Expand Down Expand Up @@ -88,13 +91,12 @@ typedef struct mca_io_ompio_aggregator_data {
_aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; } \
}



static int shuffle_init ( int index, int cycles, int aggregator, int rank,
mca_io_ompio_aggregator_data *data,
ompi_request_t **reqs );
static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data,
int write_chunksize, int write_synchType, ompi_request_t **request);
int write_chunksize, int write_synchType, ompi_request_t **request,
bool is_accelerator_buffer);
int mca_fcoll_vulcan_break_file_view ( struct iovec *decoded_iov, int iov_count,
struct iovec *local_iov_array, int local_count,
struct iovec ***broken_decoded_iovs, int **broken_iov_counts,
Expand Down Expand Up @@ -155,6 +157,8 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh,

ompi_count_array_t fview_count_desc;
ompi_disp_array_t displs_desc;
int is_gpu, is_managed;
bool use_accelerator_buffer = false;

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0;
Expand All @@ -180,6 +184,11 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh,
goto exit;
}

mca_common_ompio_check_gpu_buf (fh, buf, &is_gpu, &is_managed);
if (is_gpu && !is_managed &&
fh->f_get_mca_parameter_value ("use_accelerator_buffers", strlen("use_accelerator_buffers"))) {
use_accelerator_buffer = true;
}
/* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what
the user requested */
bytes_per_cycle =bytes_per_cycle/2;
Expand Down Expand Up @@ -529,13 +538,31 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh,
goto exit;
}


aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle);
aggr_data[i]->prev_global_buf = (char *) malloc (bytes_per_cycle);
if (NULL == aggr_data[i]->global_buf || NULL == aggr_data[i]->prev_global_buf){
opal_output(1, "OUT OF MEMORY");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
if (use_accelerator_buffer) {
opal_output_verbose(10, ompi_fcoll_base_framework.framework_output,
"Allocating GPU device buffer for aggregation\n");
ret = opal_accelerator.mem_alloc(MCA_ACCELERATOR_NO_DEVICE_ID, (void**)&aggr_data[i]->global_buf,
bytes_per_cycle);
if (OPAL_SUCCESS != ret) {
opal_output(1, "Could not allocate accelerator memory");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
ret = opal_accelerator.mem_alloc(MCA_ACCELERATOR_NO_DEVICE_ID, (void**)&aggr_data[i]->prev_global_buf,
bytes_per_cycle);
if (OPAL_SUCCESS != ret) {
opal_output(1, "Could not allocate accelerator memory");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
} else {
aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle);
aggr_data[i]->prev_global_buf = (char *) malloc (bytes_per_cycle);
if (NULL == aggr_data[i]->global_buf || NULL == aggr_data[i]->prev_global_buf){
opal_output(1, "OUT OF MEMORY");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
}

aggr_data[i]->recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group *
Expand Down Expand Up @@ -605,7 +632,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh,
start_write_time = MPI_Wtime();
#endif
ret = write_init (fh, fh->f_aggr_list[aggr_index], aggr_data[aggr_index],
write_chunksize, write_synch_type, &req_iwrite);
write_chunksize, write_synch_type, &req_iwrite, use_accelerator_buffer);
if (OMPI_SUCCESS != ret){
goto exit;
}
Expand Down Expand Up @@ -645,7 +672,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh,
start_write_time = MPI_Wtime();
#endif
ret = write_init (fh, fh->f_aggr_list[aggr_index], aggr_data[aggr_index],
write_chunksize, write_synch_type, &req_iwrite);
write_chunksize, write_synch_type, &req_iwrite, use_accelerator_buffer);
if (OMPI_SUCCESS != ret){
goto exit;
}
Expand Down Expand Up @@ -704,8 +731,13 @@ exit :

free (aggr_data[i]->disp_index);
free (aggr_data[i]->max_disp_index);
free (aggr_data[i]->global_buf);
free (aggr_data[i]->prev_global_buf);
if (use_accelerator_buffer) {
opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->global_buf);
opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->prev_global_buf);
} else {
free (aggr_data[i]->global_buf);
free (aggr_data[i]->prev_global_buf);
}
for(l=0;l<aggr_data[i]->procs_per_group;l++){
free (aggr_data[i]->blocklen_per_process[l]);
free (aggr_data[i]->displs_per_process[l]);
Expand Down Expand Up @@ -749,7 +781,8 @@ static int write_init (ompio_file_t *fh,
mca_io_ompio_aggregator_data *aggr_data,
int write_chunksize,
int write_synchType,
ompi_request_t **request )
ompi_request_t **request,
bool is_accelerator_buffer)
{
int ret = OMPI_SUCCESS;
ssize_t ret_temp = 0;
Expand All @@ -770,11 +803,20 @@ static int write_init (ompio_file_t *fh,
write_chunksize);

if (1 == write_synchType) {
ret = fh->f_fbtl->fbtl_ipwritev(fh, (ompi_request_t *) ompio_req);
if(0 > ret) {
opal_output (1, "vulcan_write_all: fbtl_ipwritev failed\n");
ompio_req->req_ompi.req_status.MPI_ERROR = ret;
ompio_req->req_ompi.req_status._ucount = 0;
if (is_accelerator_buffer) {
ret = mca_common_ompio_file_iwrite_pregen(fh, (ompi_request_t *) ompio_req);
if(0 > ret) {
opal_output (1, "vulcan_write_all: mca_common_ompio_iwrite_pregen failed\n");
ompio_req->req_ompi.req_status.MPI_ERROR = ret;
ompio_req->req_ompi.req_status._ucount = 0;
}
} else {
ret = fh->f_fbtl->fbtl_ipwritev(fh, (ompi_request_t *) ompio_req);
if(0 > ret) {
opal_output (1, "vulcan_write_all: fbtl_ipwritev failed\n");
ompio_req->req_ompi.req_status.MPI_ERROR = ret;
ompio_req->req_ompi.req_status._ucount = 0;
}
}
}
else {
Expand Down
5 changes: 4 additions & 1 deletion ompi/mca/io/ompio/io_ompio.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* Copyright (c) 2012-2013 Inria. All rights reserved.
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved.
* Copyright (c) 2022-2024 Advanced Micro Devices, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -556,6 +556,9 @@ int mca_io_ompio_get_mca_parameter_value ( char *mca_parameter_name, int name_le
else if ( !strncmp ( mca_parameter_name, "coll_timing_info", name_length )) {
return mca_io_ompio_coll_timing_info;
}
else if ( !strncmp (mca_parameter_name, "use_accelerator_buffers", name_length)) {
return mca_io_ompio_use_accelerator_buffers;
}
else {
opal_output (1, "Error in mca_io_ompio_get_mca_parameter_value: unknown parameter name");
}
Expand Down
4 changes: 2 additions & 2 deletions ompi/mca/io/ompio/io_ompio.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* Copyright (c) 2015-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
* Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved.
* Copyright (c) 2022-2024 Advanced Micro Devices, Inc. All rights reserved.
* Copyright (c) 2024 Triad National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
Expand Down Expand Up @@ -57,7 +57,7 @@ extern int mca_io_ompio_max_aggregators_ratio;
extern int mca_io_ompio_aggregators_cutoff_threshold;
extern int mca_io_ompio_overwrite_amode;
extern int mca_io_ompio_verbose_info_parsing;

extern int mca_io_ompio_use_accelerator_buffers;
OMPI_DECLSPEC extern int mca_io_ompio_coll_timing_info;

#define QUEUESIZE 2048
Expand Down
12 changes: 10 additions & 2 deletions ompi/mca/io/ompio/io_ompio_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
* Copyright (c) 2018 DataDirect Networks. All rights reserved.
* Copyright (c) 2022-2023 Advanced Micro Devices, Inc. All rights reserved.
* Copyright (c) 2022-2024 Advanced Micro Devices, Inc. All rights reserved.
* Copyright (c) 2024 Triad National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
Expand Down Expand Up @@ -49,7 +49,7 @@ int mca_io_ompio_max_aggregators_ratio=8;
int mca_io_ompio_aggregators_cutoff_threshold=3;
int mca_io_ompio_overwrite_amode = 1;
int mca_io_ompio_verbose_info_parsing = 0;

int mca_io_ompio_use_accelerator_buffers = 1;
int mca_io_ompio_grouping_option=5;

/*
Expand Down Expand Up @@ -263,6 +263,14 @@ static int register_component(void)
MCA_BASE_VAR_SCOPE_READONLY,
&mca_io_ompio_verbose_info_parsing);

mca_io_ompio_use_accelerator_buffers = 1;
(void) mca_base_component_var_register(&mca_io_ompio_component.io_version,
"use_accelerator_buffers", "Allow using accelerator buffers"
"for data aggregation in collective I/O if input buffer is device memory",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY, &mca_io_ompio_use_accelerator_buffers);

return OMPI_SUCCESS;
}

Expand Down

0 comments on commit 8b24867

Please sign in to comment.