Skip to content

Commit

Permalink
Merge pull request #12384 from dalcinl/bugfix/ulfm-intercomm
Browse files Browse the repository at this point in the history
ULFM: Support for intercomm [i]agree and [i]shrink
  • Loading branch information
bosilca authored Apr 19, 2024
2 parents bb7ecde + b01e156 commit 0261a03
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 15 deletions.
7 changes: 5 additions & 2 deletions ompi/communicator/comm_cid.c
Original file line number Diff line number Diff line change
Expand Up @@ -1578,14 +1578,17 @@ static int ompi_comm_ft_allreduce_intra_nb(int *inbuf, int *outbuf, int count,
/*
 * Allreduce step for the CID_INTER_FT mode (inter-communicator, fault-tolerant build).
 *
 * There is no dedicated fault-tolerant implementation yet, so every argument
 * is forwarded unchanged to ompi_comm_allreduce_inter_nb().
 *
 * Returns the status of the non-FT call, except that MPI_ERR_REVOKED and
 * MPI_ERR_PROC_FAILED are reported as MPI_ERR_UNSUPPORTED_OPERATION.
 * NOTE(review): presumably because the non-FT path cannot recover from
 * process failures or revocation -- confirm with the ULFM maintainers.
 */
static int ompi_comm_ft_allreduce_inter_nb(int *inbuf, int *outbuf, int count,
                                           struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
                                           ompi_request_t **req) {
    //TODO: CID_INTER_FT needs an implementation, using the non-ft for now...
    int rc = ompi_comm_allreduce_inter_nb(inbuf, outbuf, count, op, cid_context, req);

    /* Fault-related errors are surfaced as "unsupported"; anything else passes through. */
    if (MPI_ERR_REVOKED == rc || MPI_ERR_PROC_FAILED == rc) {
        return MPI_ERR_UNSUPPORTED_OPERATION;
    }
    return rc;
}

/*
 * Allreduce step for the CID_INTRA_PMIX_FT mode (fault-tolerant build).
 *
 * There is no dedicated fault-tolerant implementation yet, so every argument
 * is forwarded unchanged to ompi_comm_allreduce_intra_pmix_nb().
 *
 * Returns the status of the non-FT call, except that MPI_ERR_REVOKED and
 * MPI_ERR_PROC_FAILED are reported as MPI_ERR_UNSUPPORTED_OPERATION.
 * NOTE(review): presumably because the non-FT path cannot recover from
 * process failures or revocation -- confirm with the ULFM maintainers.
 */
static int ompi_comm_ft_allreduce_intra_pmix_nb(int *inbuf, int *outbuf, int count,
                                                struct ompi_op_t *op, ompi_comm_cid_context_t *cid_context,
                                                ompi_request_t **req) {
    //TODO: CID_INTRA_PMIX_FT needs an implementation, using the non-ft for now...
    int rc = ompi_comm_allreduce_intra_pmix_nb(inbuf, outbuf, count, op, cid_context, req);

    /* Fault-related errors are surfaced as "unsupported"; anything else passes through. */
    if (MPI_ERR_REVOKED == rc || MPI_ERR_PROC_FAILED == rc) {
        return MPI_ERR_UNSUPPORTED_OPERATION;
    }
    return rc;
}

#endif /* OPAL_ENABLE_FT_MPI */
33 changes: 22 additions & 11 deletions ompi/communicator/ft/comm_ft.c
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ struct ompi_comm_ishrink_context_t {
ompi_group_t *failed_group;
ompi_group_t *alive_group;
ompi_group_t *alive_rgroup;
int flag;
double start;
};
typedef struct ompi_comm_ishrink_context_t ompi_comm_ishrink_context_t;
Expand All @@ -429,7 +430,6 @@ static int ompi_comm_ishrink_check_activate(ompi_comm_request_t *request);
int ompi_comm_ishrink_internal(ompi_communicator_t* comm, ompi_communicator_t** newcomm, ompi_request_t** req)
{
int rc;
int flag = 1;
#if OPAL_ENABLE_DEBUG
double stop;
#endif
Expand Down Expand Up @@ -479,7 +479,8 @@ int ompi_comm_ishrink_internal(ompi_communicator_t* comm, ompi_communicator_t**
* the value of flag, instead we are only using the globally consistent
* return value.
*/
rc = comm->c_coll->coll_iagree( &flag,
context->flag = 1;
rc = comm->c_coll->coll_iagree( &context->flag,
1,
&ompi_mpi_int.dt,
&ompi_mpi_op_band.op,
Expand Down Expand Up @@ -508,7 +509,7 @@ static int ompi_comm_ishrink_check_agree(ompi_comm_request_t *request) {
ompi_communicator_t *comm = context->comm;
ompi_request_t *subreq[1];
ompi_group_t *comm_group = NULL;
int rc, flag = 1;
int rc;
#if OPAL_ENABLE_DEBUG
double stop;
#endif
Expand All @@ -522,13 +523,17 @@ static int ompi_comm_ishrink_check_agree(ompi_comm_request_t *request) {
rc = request->super.req_status.MPI_ERROR;
if( (OMPI_SUCCESS != rc) && (MPI_ERR_PROC_FAILED != rc) ) {
opal_output(0, "%s:%d Agreement failure: %d\n", __FILE__, __LINE__, rc);
ompi_comm_request_return(request);
OBJ_RELEASE(context->failed_group);
return rc;
}

if( MPI_ERR_PROC_FAILED == rc ) {
/* previous round found more failures, redo */
OBJ_RELEASE(context->failed_group);
request->super.req_status.MPI_ERROR = MPI_SUCCESS;
rc = comm->c_coll->coll_iagree( &flag,
context->flag = 1;
rc = comm->c_coll->coll_iagree( &context->flag,
1,
&ompi_mpi_int.dt,
&ompi_mpi_op_band.op,
Expand Down Expand Up @@ -575,7 +580,6 @@ static int ompi_comm_ishrink_check_agree(ompi_comm_request_t *request) {
}
}
OBJ_RELEASE(context->failed_group);
context->failed_group = NULL;

rc = ompi_comm_set_nb( context->newcomm, /* new comm */
comm, /* old comm */
Expand Down Expand Up @@ -614,15 +618,16 @@ static int ompi_comm_ishrink_check_setrank(ompi_comm_request_t *request) {

/* cleanup temporary groups */
OBJ_RELEASE(context->alive_group);
context->alive_group = NULL;
if( NULL != context->alive_rgroup ) {
OBJ_RELEASE(context->alive_rgroup);
}
context->alive_rgroup = NULL;

/* check errors in prior step */
if( NULL == *context->newcomm ) {
rc = MPI_ERR_INTERN;
rc = request->super.req_status.MPI_ERROR;
if( OMPI_SUCCESS != rc ) {
opal_output_verbose(1, ompi_ftmpi_output_handle,
"%s ompi: comm_ishrink: Construction failed with error %d",
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), rc);
ompi_comm_request_return(request);
OBJ_RELEASE(*context->newcomm);
return rc;
Expand Down Expand Up @@ -719,6 +724,7 @@ static int ompi_comm_ishrink_check_cid(ompi_comm_request_t *request) {
mode,
subreq );
if( OMPI_SUCCESS != rc ) {
ompi_comm_request_return(request);
OBJ_RELEASE(*context->newcomm);
return rc;
}
Expand All @@ -729,18 +735,23 @@ static int ompi_comm_ishrink_check_cid(ompi_comm_request_t *request) {
}

static int ompi_comm_ishrink_check_activate(ompi_comm_request_t *request) {
ompi_comm_ishrink_context_t *context =
(ompi_comm_ishrink_context_t *)request->context;
int rc;
#if OPAL_ENABLE_DEBUG
double stop;
#endif

rc = request->super.req_status.MPI_ERROR;
if( OMPI_SUCCESS != rc ) {
opal_output_verbose(1, ompi_ftmpi_output_handle,
"%s ompi: comm_ishrink: Activation failed with error %d",
OMPI_NAME_PRINT(OMPI_PROC_MY_NAME), rc);
ompi_comm_request_return(request);
OBJ_RELEASE(*context->newcomm);
return rc;
}
#if OPAL_ENABLE_DEBUG
ompi_comm_ishrink_context_t *context =
(ompi_comm_ishrink_context_t *)request->context;
stop = MPI_Wtime();
OPAL_OUTPUT_VERBOSE((10, ompi_ftmpi_output_handle,
"%s ompi: comm_ishrink: COLL SELECT: %g seconds\n",
Expand Down
6 changes: 4 additions & 2 deletions ompi/mca/coll/base/coll_base_agree_noft.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ ompi_coll_base_agree_noft(void *contrib,
struct ompi_communicator_t* comm,
mca_coll_base_module_t *module)
{
return comm->c_coll->coll_allreduce(MPI_IN_PLACE, contrib, dt_count, dt, op,
void *sendbuf = OMPI_COMM_IS_INTER(comm) ? contrib : MPI_IN_PLACE;
return comm->c_coll->coll_allreduce(sendbuf, contrib, dt_count, dt, op,
comm, comm->c_coll->coll_allreduce_module);
}

Expand All @@ -40,6 +41,7 @@ ompi_coll_base_iagree_noft(void *contrib,
ompi_request_t **request,
mca_coll_base_module_t *module)
{
return comm->c_coll->coll_iallreduce(MPI_IN_PLACE, contrib, dt_count, dt, op,
void *sendbuf = OMPI_COMM_IS_INTER(comm) ? contrib : MPI_IN_PLACE;
return comm->c_coll->coll_iallreduce(sendbuf, contrib, dt_count, dt, op,
comm, request, comm->c_coll->coll_iallreduce_module);
}

0 comments on commit 0261a03

Please sign in to comment.