Skip to content

Commit

Permalink
coll/basic: mca_coll_basic_gatherv_intra root posts irecv
Browse files Browse the repository at this point in the history
Match mca_coll_basic_gatherv_inter and post recvs all at once
and wait for completion.

Signed-off-by: Wenduo Wang <[email protected]>
  • Loading branch information
wenduwan committed Feb 4, 2024
1 parent 9d69edb commit 61a3692
Showing 1 changed file with 56 additions and 44 deletions.
100 changes: 56 additions & 44 deletions ompi/mca/coll/basic/coll_basic_gatherv.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,75 +42,87 @@ mca_coll_basic_gatherv_intra(const void *sbuf, int scount,
void *rbuf, const int *rcounts, const int *disps,
struct ompi_datatype_t *rdtype, int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
mca_coll_base_module_t *module)
{
int i, rank, size, err;
int err, i, peer, rank, size;
char *ptmp;
ptrdiff_t lb, extent;
size_t rdsize;

size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);

/* Everyone but root sends data and returns. Don't send anything
for sendcounts of 0 (even though MPI_Gatherv has a guard for 0
counts, this routine is used elsewhere, like the implementation
of allgatherv, so it's possible to get here with a scount of
0) */
if (root == rank) {
/* Root receives from everyone else */
ompi_datatype_type_size(rdtype, &rdsize);
if (OPAL_UNLIKELY(0 == rdsize)) {
/* bozzo case */
return MPI_SUCCESS;
}

if (rank != root) {
size_t sdsize;
ompi_datatype_type_size(sdtype, &sdsize);
if (scount > 0 && sdsize > 0) {
return MCA_PML_CALL(send(sbuf, scount, sdtype, root,
MCA_COLL_BASE_TAG_GATHERV,
MCA_PML_BASE_SEND_STANDARD, comm));
err = ompi_datatype_get_extent(rdtype, &lb, &extent);
if (OMPI_SUCCESS != err) {
return OMPI_ERROR;
}
return MPI_SUCCESS;
}

/* I am the root, loop receiving data. */
if (MPI_IN_PLACE != sbuf && (0 < scount) && (0 < rcounts[rank])) {
/* Directly copy self sbuf to rbuf */
err = ompi_datatype_sndrcv(sbuf, scount, sdtype,
((char *) rbuf) + (extent * disps[rank]), rcounts[rank],
rdtype);
if (MPI_SUCCESS != err) {
return err;
}
}

ompi_datatype_type_size(rdtype, &rdsize);
if (OPAL_UNLIKELY(0 == rdsize)) {
/* bozzo case */
return MPI_SUCCESS;
}
ompi_request_t **reqs;
size_t nrecv = 0, recv_iter = 0;

err = ompi_datatype_get_extent(rdtype, &lb, &extent);
if (OMPI_SUCCESS != err) {
return OMPI_ERROR;
}
for (i = 0; i < size; ++i) {
/* We directly copied the data from self */
if (0 < rcounts[i] && rank != i) {
++nrecv;
}
}

if (0 == nrecv) {
/* Nothing to receive */
return MPI_SUCCESS;
}

for (i = 0; i < size; ++i) {
ptmp = ((char *) rbuf) + (extent * disps[i]);
reqs = ompi_coll_base_comm_get_reqs(module->base_data, nrecv);

if (i == rank) {
/* simple optimization */
if (MPI_IN_PLACE != sbuf && (0 < scount) && (0 < rcounts[i])) {
err = ompi_datatype_sndrcv(sbuf, scount, sdtype,
ptmp, rcounts[i], rdtype);
}
} else {
for (i = 1; i < size; ++i) {
peer = (rank + i) % size;
ptmp = ((char *) rbuf) + (extent * disps[peer]);
/* Only receive if there is something to receive */
if (rcounts[i] > 0) {
err = MCA_PML_CALL(recv(ptmp, rcounts[i], rdtype, i,
MCA_COLL_BASE_TAG_GATHERV,
comm, MPI_STATUS_IGNORE));
if (0 < rcounts[peer]) {
err = MCA_PML_CALL(irecv(ptmp, rcounts[peer], rdtype, peer,
MCA_COLL_BASE_TAG_GATHERV, comm, &reqs[recv_iter++]));
}
}

if (MPI_SUCCESS != err) {
return err;
}
assert(nrecv == recv_iter);

err = ompi_request_wait_all(nrecv, reqs, MPI_STATUSES_IGNORE);
return err;
}

/* All done */
/* Everyone but root sends data and returns. Don't send anything
for sendcounts of 0 (even though MPI_Gatherv has a guard for 0
counts, this routine is used elsewhere, like the implementation
of allgatherv, so it's possible to get here with a scount of
0) */

size_t sdsize;
ompi_datatype_type_size(sdtype, &sdsize);
if (scount > 0 && sdsize > 0) {
return MCA_PML_CALL(send(sbuf, scount, sdtype, root, MCA_COLL_BASE_TAG_GATHERV,
MCA_PML_BASE_SEND_STANDARD, comm));
}
return MPI_SUCCESS;
}


/*
* gatherv_inter
*
Expand Down

0 comments on commit 61a3692

Please sign in to comment.