From 1fcd7304809691fa935a8ec88b27bb4627745df9 Mon Sep 17 00:00:00 2001 From: Luke Robison Date: Fri, 1 Mar 2024 15:18:55 +0000 Subject: [PATCH 1/3] coll/han: reduce coll:han:get_algorithm prints Increase coll:han:get_algorithm verbosity level from 1 to 30, to avoid flooding terminal at any verbosity level. Thirty seems to be used for most of the other han dynamic selection prints. Signed-off-by: Luke Robison --- ompi/mca/coll/han/coll_han_dynamic.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ompi/mca/coll/han/coll_han_dynamic.c b/ompi/mca/coll/han/coll_han_dynamic.c index b6c4ce4b955..eba0cd37e6f 100644 --- a/ompi/mca/coll/han/coll_han_dynamic.c +++ b/ompi/mca/coll/han/coll_han_dynamic.c @@ -354,7 +354,7 @@ get_algorithm(COLLTYPE_T coll_id, } } if ( 0 == rank ) { - opal_output_verbose(1, mca_coll_han_component.han_output, + opal_output_verbose(30, mca_coll_han_component.han_output, "coll:han:get_algorithm %s size:%ld algorithm:%d %s", mca_coll_base_colltype_to_str(coll_id), msg_size, From 34c4a3086ea77286f303a4bbc2f6fa0b3a431a5c Mon Sep 17 00:00:00 2001 From: Luke Robison Date: Mon, 26 Feb 2024 20:31:33 +0000 Subject: [PATCH 2/3] coll/han: require smsc endpoint This will allow HAN collectives to check for and use SMSC methods to direct-map peer memory during on-node communication. Signed-off-by: Luke Robison --- ompi/mca/coll/han/coll_han.h | 17 +++++++++++++++++ ompi/mca/coll/han/coll_han_component.c | 5 +++++ ompi/mca/coll/han/configure.m4 | 18 ++++++++++++++++++ 3 files changed, 40 insertions(+) create mode 100644 ompi/mca/coll/han/configure.m4 diff --git a/ompi/mca/coll/han/coll_han.h b/ompi/mca/coll/han/coll_han.h index fa44a54f2b6..58b6c817dba 100644 --- a/ompi/mca/coll/han/coll_han.h +++ b/ompi/mca/coll/han/coll_han.h @@ -42,6 +42,7 @@ #include "mpi.h" #include "ompi/mca/mca.h" #include "opal/util/output.h" +#include "opal/mca/smsc/smsc.h" #include "ompi/mca/coll/base/coll_base_functions.h" #include "coll_han_trigger.h" #include "ompi/mca/coll/han/coll_han_dynamic.h" @@ -532,4 +533,20 @@ coll_han_utils_gcd(const uint64_t *numerators, const size_t size); int coll_han_utils_create_contiguous_datatype(size_t count, const ompi_datatype_t *oldType, ompi_datatype_t **newType); + +static inline struct mca_smsc_endpoint_t *mca_coll_han_get_smsc_endpoint (struct ompi_proc_t *proc) { + extern opal_mutex_t mca_coll_han_lock; + if (NULL == proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_SMSC]) { + if (NULL == proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_SMSC]) { + OPAL_THREAD_LOCK(&mca_coll_han_lock); + if (NULL == proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_SMSC]) { + proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_SMSC] = mca_smsc->get_endpoint(&proc->super); + } + OPAL_THREAD_UNLOCK(&mca_coll_han_lock); + } + } + + return (struct mca_smsc_endpoint_t *) proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_SMSC]; +} + #endif /* MCA_COLL_HAN_EXPORT_H */ diff --git a/ompi/mca/coll/han/coll_han_component.c b/ompi/mca/coll/han/coll_han_component.c index 13aa5e531be..601f7573d2b 100644 --- a/ompi/mca/coll/han/coll_han_component.c +++ b/ompi/mca/coll/han/coll_han_component.c @@ -49,6 +49,11 @@ ompi_coll_han_components ompi_coll_han_available_components[COMPONENTS_COUNT] = { XHC, "xhc" } }; +/* + * Thread lock for han + */ +opal_mutex_t mca_coll_han_lock = OPAL_MUTEX_STATIC_INIT; + /* * Local functions */ diff --git a/ompi/mca/coll/han/configure.m4 b/ompi/mca/coll/han/configure.m4 new file mode 100644 index 00000000000..e84205eaa13 --- /dev/null +++ b/ompi/mca/coll/han/configure.m4 @@ -0,0 +1,18 @@ +# -*- autoconf -*- +# +# Copyright (c) 2022 Amazon.com, Inc. or its affiliates. All Rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +AC_DEFUN([MCA_ompi_coll_han_CONFIG],[ + AC_CONFIG_FILES([ompi/mca/coll/han/Makefile]) + $1 +])dnl + +AC_DEFUN([MCA_ompi_coll_han_POST_CONFIG], [ + OMPI_REQUIRE_ENDPOINT_TAG([SMSC]) +])dnl From 6af5156b9eafa7891aad247e39716f2f3adf4b14 Mon Sep 17 00:00:00 2001 From: Luke Robison Date: Thu, 25 Jan 2024 17:24:54 +0000 Subject: [PATCH 3/3] coll/han: Add SMSC-based alltoall to HAN Add Alltoall algorithm to coll/han. Each rank on one host is assigned a single partner on a remote host and vice versa. Then the rank collects all the data its partner will need to receive from it's host, and sends it in one large send, and likewise receives it's data in one large recv, then cycles to the next host. This algorithm is only selected when SMSC component has ability to direct-map peer memory, which only exists for XPMEM module. Signed-off-by: Luke Robison --- ompi/mca/coll/han/Makefile.am | 3 +- ompi/mca/coll/han/coll_han.h | 17 ++ ompi/mca/coll/han/coll_han_algorithms.c | 5 + ompi/mca/coll/han/coll_han_algorithms.h | 7 + ompi/mca/coll/han/coll_han_alltoall.c | 389 ++++++++++++++++++++++++ ompi/mca/coll/han/coll_han_component.c | 13 + ompi/mca/coll/han/coll_han_dynamic.c | 127 ++++++++ ompi/mca/coll/han/coll_han_module.c | 6 +- ompi/mca/coll/han/coll_han_subcomms.c | 7 + 9 files changed, 572 insertions(+), 2 deletions(-) create mode 100644 ompi/mca/coll/han/coll_han_alltoall.c diff --git a/ompi/mca/coll/han/Makefile.am b/ompi/mca/coll/han/Makefile.am index e9ca89d055c..95ab470dc66 100644 --- a/ompi/mca/coll/han/Makefile.am +++ b/ompi/mca/coll/han/Makefile.am @@ -2,7 +2,7 @@ # Copyright (c) 2018-2020 The University of Tennessee and The University # of Tennessee Research Foundation. All rights # reserved. -# Copyright (c) 2022 Amazon.com, Inc. or its affiliates. All Rights reserved. +# Copyright (c) 2024 Amazon.com, Inc. or its affiliates. All Rights reserved. # Copyright (c) 2022 BULL S.A.S. All rights reserved. # $COPYRIGHT$ # @@ -15,6 +15,7 @@ sources = \ coll_han.h \ coll_han_trigger.h \ coll_han_algorithms.h \ +coll_han_alltoall.c \ coll_han_dynamic.h \ coll_han_dynamic_file.h \ coll_han_barrier.c \ diff --git a/ompi/mca/coll/han/coll_han.h b/ompi/mca/coll/han/coll_han.h index 58b6c817dba..e6746d47dc4 100644 --- a/ompi/mca/coll/han/coll_han.h +++ b/ompi/mca/coll/han/coll_han.h @@ -198,6 +198,7 @@ typedef struct mca_coll_han_op_module_name_t { mca_coll_han_op_up_low_module_name_t gatherv; mca_coll_han_op_up_low_module_name_t scatter; mca_coll_han_op_up_low_module_name_t scatterv; + mca_coll_han_op_up_low_module_name_t alltoall; } mca_coll_han_op_module_name_t; /** @@ -253,6 +254,13 @@ typedef struct mca_coll_han_component_t { uint32_t han_scatterv_up_module; /* low level module for scatterv */ uint32_t han_scatterv_low_module; + + /* low level module for alltoall */ + uint32_t han_alltoall_low_module; + /* alltoall: parallel stages */ + int32_t han_alltoall_pstages; + + /* name of the modules */ mca_coll_han_op_module_name_t han_op_module_name; /* whether we need reproducible results @@ -288,6 +296,7 @@ typedef struct mca_coll_han_single_collective_fallback_s { union { + mca_coll_base_module_alltoall_fn_t alltoall; mca_coll_base_module_allgather_fn_t allgather; mca_coll_base_module_allgatherv_fn_t allgatherv; mca_coll_base_module_allreduce_fn_t allreduce; @@ -309,6 +318,7 @@ typedef struct mca_coll_han_single_collective_fallback_s */ typedef struct mca_coll_han_collectives_fallback_s { + mca_coll_han_single_collective_fallback_t alltoall; mca_coll_han_single_collective_fallback_t allgather; mca_coll_han_single_collective_fallback_t allgatherv; mca_coll_han_single_collective_fallback_t allreduce; @@ -371,6 +381,9 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t); * Some defines to stick to the naming used in the other components in terms of * fallback routines */ +#define previous_alltoall fallback.alltoall.alltoall +#define previous_alltoall_module fallback.alltoall.module + #define previous_allgather fallback.allgather.allgather #define previous_allgather_module fallback.allgather.module @@ -426,6 +439,7 @@ OBJ_CLASS_DECLARATION(mca_coll_han_module_t); HAN_UNINSTALL_COLL_API(COMM, HANM, allreduce); \ HAN_UNINSTALL_COLL_API(COMM, HANM, allgather); \ HAN_UNINSTALL_COLL_API(COMM, HANM, allgatherv); \ + HAN_UNINSTALL_COLL_API(COMM, HANM, alltoall); \ han_module->enabled = false; /* entire module set to pass-through from now on */ \ } while(0) @@ -486,6 +500,9 @@ mca_coll_han_get_all_coll_modules(struct ompi_communicator_t *comm, mca_coll_han_module_t *han_module); int +mca_coll_han_alltoall_intra_dynamic(ALLTOALL_BASE_ARGS, + mca_coll_base_module_t *module); +int mca_coll_han_allgather_intra_dynamic(ALLGATHER_BASE_ARGS, mca_coll_base_module_t *module); int diff --git a/ompi/mca/coll/han/coll_han_algorithms.c b/ompi/mca/coll/han/coll_han_algorithms.c index 9ebc04588c1..b113bd9ac07 100644 --- a/ompi/mca/coll/han/coll_han_algorithms.c +++ b/ompi/mca/coll/han/coll_han_algorithms.c @@ -1,6 +1,7 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2020-2022 Bull S.A.S. All rights reserved. + * Copyright (c) 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * $COPYRIGHT$ * @@ -77,6 +78,10 @@ mca_coll_han_algorithm_value_t* mca_coll_han_available_algorithms[COLLCOUNT] = {"simple", (fnptr_t)&mca_coll_han_allgather_intra_simple}, // 2-level { 0 } }, + [ALLTOALL] = (mca_coll_han_algorithm_value_t[]){ + {"smsc", (fnptr_t)&mca_coll_han_alltoall_using_smsc}, // 2-level + { 0 } + }, }; int diff --git a/ompi/mca/coll/han/coll_han_algorithms.h b/ompi/mca/coll/han/coll_han_algorithms.h index 8d3bbdf91ec..9889e5b644d 100644 --- a/ompi/mca/coll/han/coll_han_algorithms.h +++ b/ompi/mca/coll/han/coll_han_algorithms.h @@ -1,6 +1,7 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2020-2022 Bull S.A.S. All rights reserved. + * Copyright (c) 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * $COPYRIGHT$ * @@ -208,4 +209,10 @@ mca_coll_han_allgather_intra_simple(const void *sbuf, size_t scount, struct ompi_communicator_t *comm, mca_coll_base_module_t *module); +/* Alltoall */ +int +mca_coll_han_alltoall_using_smsc(ALLTOALL_BASE_ARGS, + mca_coll_base_module_t *module); + + #endif diff --git a/ompi/mca/coll/han/coll_han_alltoall.c b/ompi/mca/coll/han/coll_han_alltoall.c new file mode 100644 index 00000000000..c2f4b58e32b --- /dev/null +++ b/ompi/mca/coll/han/coll_han_alltoall.c @@ -0,0 +1,389 @@ +/* + * Copyright (c) 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +/** + * @file + * + * This file contains the hierarchical implementations of alltoall. + * + * mca_coll_han_alltoall_using_smsc: + * This algorithm relies on SMSC and specifically XPMEM because of + * the need to direct-map the memory. + * + * Each rank on one host is assigned a single + * partner on a remote host and vice versa. Then the rank collects all + * the data its partner will need to receive from its host, and sends it + * in one large send, and likewise receives its data in one large recv, + * then cycles to the next host. + */ + +#include "coll_han.h" +#include "ompi/mca/coll/base/coll_base_functions.h" +#include "ompi/mca/coll/base/coll_tags.h" +#include "ompi/mca/pml/pml.h" +#include "coll_han_trigger.h" +#include "opal/mca/smsc/smsc.h" +#include "opal/mca/rcache/rcache.h" +#include "ompi/mca/osc/base/base.h" + + + +/* Who is the given ranks partner during the exchange? + This function will require rounds comm_size-many rounds, and your partner + will select you in the same round which you select that partner. */ +static inline int ring_partner_no_skip(int rank, int round, int comm_size) { + /* make sure ring_partner is positive: make argument to modulo > 0 with +comm_size.*/ + return (comm_size + round - rank) % comm_size; +} + +/* Who is the given ranks partner during the exchange? + This function will require rounds comm_size-many rounds, and does + self-exchange last. */ +static inline int ring_partner(int rank, int round, int comm_size) { + round = round % comm_size; + if (round == comm_size - 1) { + /* last round: self-exchange */ + return rank; + } + int self_round = (2*rank) % comm_size; + if ( round < self_round ) + return ring_partner_no_skip(rank, round, comm_size); + else + return ring_partner_no_skip(rank, round+1, comm_size); +} + +int mca_coll_han_alltoall_using_smsc( + const void *sbuf, size_t scount, + struct ompi_datatype_t *sdtype, + void* rbuf, size_t rcount, + struct ompi_datatype_t *rdtype, + struct ompi_communicator_t *comm, + mca_coll_base_module_t *module) +{ + + mca_coll_han_module_t *han_module = (mca_coll_han_module_t *)module; + + OPAL_OUTPUT_VERBOSE((90, mca_coll_han_component.han_output, + "Entering mca_coll_han_alltoall_using_smsc\n")); + + if (!mca_smsc || !mca_smsc_base_has_feature(MCA_SMSC_FEATURE_CAN_MAP)) { + /* Assume all hosts take this path together :-\ */ + opal_output_verbose(1, mca_coll_han_component.han_output, "in mca_coll_han_alltoall_using_smsc, " + "but MCA_SMSC_FEATURE_CAN_MAP not available. Disqualifying this alg!\n"); + HAN_UNINSTALL_COLL_API(comm, han_module, alltoall); + return han_module->previous_alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype, + comm, han_module->previous_alltoall_module); + } + + /* Create the subcommunicators */ + if( OMPI_SUCCESS != mca_coll_han_comm_create_new(comm, han_module) ) { + opal_output_verbose(1, mca_coll_han_component.han_output, + "han cannot handle alltoall with this communicator. Fall back on another component\n"); + /* HAN cannot work with this communicator so fallback on all collectives */ + HAN_LOAD_FALLBACK_COLLECTIVES(comm, han_module); + return han_module->previous_alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype, + comm, han_module->previous_alltoall_module); + } + + /* Topo must be initialized to know rank distribution which then is used to + * determine if han can be used */ + mca_coll_han_topo_init(comm, han_module, 2); + if (han_module->are_ppn_imbalanced || !han_module->is_mapbycore){ + opal_output_verbose(1, mca_coll_han_component.han_output, + "han cannot handle alltoall with this communicator (imbalance/!mapbycore). " + "Fall back on another component\n"); + /* Put back the fallback collective support and call it once. All + * future calls will then be automatically redirected. + */ + HAN_UNINSTALL_COLL_API(comm, han_module, alltoall); + return han_module->previous_alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype, + comm, han_module->previous_alltoall_module); + } + + int rc, send_needs_bounce, ii_push_data; + size_t sndsize; + MPI_Aint sextent, rextent, lb; + char *send_bounce; + opal_convertor_t convertor; + size_t packed_size = 0, packed_size_tmp; + int use_isend; + void *gather_buf_in[4]; + int up_rank; + + ompi_communicator_t *low_comm = han_module->sub_comm[INTRA_NODE]; + ompi_communicator_t *up_comm = han_module->sub_comm[INTER_NODE]; + ompi_request_t **inter_recv_reqs; + ompi_request_t **inter_send_reqs; + + rc = ompi_datatype_get_extent( sdtype, &lb, &sextent); + rc = ompi_datatype_get_extent( rdtype, &lb, &rextent); + opal_datatype_type_size( &sdtype->super, &sndsize); + + int w_rank = ompi_comm_rank(comm); + int w_size = ompi_comm_size(comm); + + /* information about sub-communicators */ + int low_rank = ompi_comm_rank(low_comm); + int low_size = ompi_comm_size(low_comm); + int up_size = ompi_comm_size(up_comm); + + int fanout = mca_coll_han_component.han_alltoall_pstages; + if (!fanout) { + fanout = 1; + } + if (fanout > up_size) { fanout = up_size; } + + OBJ_CONSTRUCT( &convertor, opal_convertor_t ); + + + send_needs_bounce = 0; + /* get converter for copying to one of the leader ranks, and get packed size: */ + opal_convertor_copy_and_prepare_for_send(ompi_mpi_local_convertor, &sdtype->super, scount, sbuf, 0, &convertor); + send_needs_bounce |= 0 != opal_convertor_on_device(&convertor); + send_needs_bounce |= opal_convertor_need_buffers(&convertor); + opal_convertor_cleanup(&convertor); + + opal_convertor_copy_and_prepare_for_recv(ompi_mpi_local_convertor, &rdtype->super, rcount, rbuf, 0, &convertor); + send_needs_bounce |= 0 != opal_convertor_on_device(&convertor); + send_needs_bounce |= opal_convertor_need_buffers(&convertor); + opal_convertor_get_packed_size( &convertor, &packed_size ); + opal_convertor_cleanup(&convertor); + + /* + Because push-mode needs extra synchronizations, we'd like to avoid it, + however it might be necessary: + + If application buffer is non-contigious or non-homogenous, then we'll + need to "push" the data so that the packing process properly knows the + memory layout and types. + + If the application buffer is device memory, we'll also need to exchange + in push mode so that the process which has device registrations can + perform the reads. + + In both of these cases, we'll need to use the bounce buffer too. + */ + ii_push_data = send_needs_bounce; + + /* + If we have a fanout > 1, we'll need somewhere to put data for the next + send while the previous send is still in-flight. We'll need a dedicated + bounce buffer for this, but it doesn't mean we have to use "push" mode. + */ + send_needs_bounce |= fanout != 1; + + up_rank = w_rank / low_size; + assert( w_rank % low_size == low_rank ); + int64_t send_bytes_per_fan = low_size * packed_size; + inter_send_reqs = malloc(sizeof(*inter_send_reqs) * fanout); + inter_recv_reqs = malloc(sizeof(*inter_recv_reqs) * up_size ); + char **low_bufs = malloc(low_size * sizeof(*low_bufs)); + void **sbuf_map_ctx = malloc(low_size * sizeof(&sbuf_map_ctx)); + + const int nptrs_gather = 3; + void **gather_buf_out = calloc(low_size*nptrs_gather, sizeof(void*)); + bool send_bounce_is_allocated = false; + + do { +start_allgather: + if ( 0 == send_needs_bounce ) { + send_bounce = (char*)rbuf + up_rank*send_bytes_per_fan; + } else { + if (!send_bounce_is_allocated) { + send_bounce = malloc(send_bytes_per_fan * fanout); + send_bounce_is_allocated = true; + } + } + + if (ii_push_data) { + /* all ranks will push to the other ranks' bounce buffer */ + gather_buf_in[0] = send_bounce; + } else { + /* all ranks will pull from the other ranks' sbuf */ + gather_buf_in[0] = (void*)sbuf; + } + gather_buf_in[1] = *(void**)&send_needs_bounce; + gather_buf_in[2] = *(void**)&ii_push_data; + + rc = low_comm->c_coll->coll_allgather(gather_buf_in, nptrs_gather, MPI_AINT, + gather_buf_out, nptrs_gather, MPI_AINT, low_comm, + low_comm->c_coll->coll_allgather_module); + + if (rc != 0) { + OPAL_OUTPUT_VERBOSE((40, mca_coll_han_component.han_output, + "Allgather failed with %d\n",rc)); + goto cleanup; + } + + for (int jother=0; jother 1 || ii_push_data; + + for (int jother=0; jothermap_peer_region( + smsc_ep, + MCA_RCACHE_FLAGS_PERSIST, + low_bufs[jother], + sextent*w_size*scount, + (void**) &low_bufs[jother] ); + } + } + + for (int jslot=0; jslot < fanout; jslot++) { + inter_send_reqs[jslot] = MPI_REQUEST_NULL; + } + + + /* pre-post all our receives. We will be ready to receive all data regardless of fan-out. + (This is not an in-place algorithm)*/ + int inter_recv_count = 0; + for (int jround=0; jround= 0 ) { + /* we cannot fill for our next send until the previous send using this buffer is completed. */ + ompi_request_wait(&inter_send_reqs[jfan_slot], MPI_STATUS_IGNORE); + if (ii_push_data && jloop < up_size) { + /* barrier here so followers know all leaders have completed + previous isend for this slot, and may begin overwriting bounce slot. */ + low_comm->c_coll->coll_barrier(low_comm, low_comm->c_coll->coll_barrier_module); + } + } + + if (jloop < up_size) { + /* For this upper-comm partner, we must provide our data to all the leaders. */ + + int first_remote_wrank; + first_remote_wrank = up_partner*low_size; + + assert(up_partner >= 0); + assert(up_partner < up_size); + assert(first_remote_wrank <= w_size - low_size ); + + /* pack data into each of the leaders' buffers */ + for (int jlow=0; jlowsuper, scount, + from_addr, + 0, &convertor); + /* iovec: set the destination of the copy */ + iov.iov_base = to_addr; + iov.iov_len = sextent*scount; + + /* pack the data directly into local leader's sendbuf */ + packed_size_tmp = packed_size; + rc = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size_tmp); + opal_convertor_cleanup(&convertor); + + if (1 != rc) { + opal_output_verbose(1, mca_coll_han_component.han_output, + "opal_convert_pack failed with %d\n",rc); + rc = MPI_ERR_TRUNCATE; + goto cleanup; + } + rc = MPI_SUCCESS; + } + + if (ii_push_data) { + /* barrier here so leaders know all followers have filled data, + and can issue send. */ + low_comm->c_coll->coll_barrier(low_comm, low_comm->c_coll->coll_barrier_module); + } + + if (use_isend == 0) { + MCA_PML_CALL(send + (send_bounce, + send_bytes_per_fan, MPI_PACKED, first_remote_wrank+low_rank, + MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD, + comm) ); + } else { + /* send the data to our remote partner */ + MCA_PML_CALL(isend + (&send_bounce[send_bytes_per_fan*jfan_slot], + send_bytes_per_fan, MPI_PACKED, first_remote_wrank+low_rank, + MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD, + comm, &inter_send_reqs[jfan_slot])); + } + } + } + + /* wait for all irecv to complete */ + ompi_request_wait_all(inter_recv_count, inter_recv_reqs, MPI_STATUS_IGNORE); + +cleanup: + for (int jlow=0; jlowunmap_peer_region(sbuf_map_ctx[jlow]); + } + } + OBJ_DESTRUCT(&convertor); + if (send_bounce_is_allocated) free(send_bounce); + free(inter_send_reqs); + free(inter_recv_reqs); + free(sbuf_map_ctx); + free(low_bufs); + free(gather_buf_out); + + OPAL_OUTPUT_VERBOSE((40, mca_coll_han_component.han_output, + "Alltoall Complete with %d\n",rc)); + return rc; + +} \ No newline at end of file diff --git a/ompi/mca/coll/han/coll_han_component.c b/ompi/mca/coll/han/coll_han_component.c index 601f7573d2b..bb11b7d5ab7 100644 --- a/ompi/mca/coll/han/coll_han_component.c +++ b/ompi/mca/coll/han/coll_han_component.c @@ -6,6 +6,7 @@ * Copyright (c) 2020-2022 Bull S.A.S. All rights reserved. * Copyright (c) 2023 Computer Architecture and VLSI Systems (CARV) * Laboratory, ICS Forth. All rights reserved. + * Copyright (c) 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -397,6 +398,18 @@ static int han_register(void) "low level module for scatterv, 0 basic", OPAL_INFO_LVL_9, &cs->han_scatterv_low_module, &cs->han_op_module_name.scatterv.han_op_low_module_name); + cs->han_alltoall_low_module = 0; + (void) mca_coll_han_query_module_from_mca(c, "alltoall_lower_module", + "low level module for alltoall, 0 tuned, 1 sm ", + OPAL_INFO_LVL_9, &cs->han_alltoall_low_module, + &cs->han_op_module_name.alltoall.han_op_low_module_name); + cs->han_alltoall_pstages = 0; + (void) mca_base_component_var_register(c, "alltoall_pstages", + "Parallel Stages for alltoall. Higher numbers require more memory, " + "and performs more communication in parallel. 0 chooses pstages based on message size.", + MCA_BASE_VAR_TYPE_INT32_T, NULL, 0, 0, + OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, + &cs->han_alltoall_pstages); cs->han_reproducible = 0; (void) mca_base_component_var_register(c, "reproducible", diff --git a/ompi/mca/coll/han/coll_han_dynamic.c b/ompi/mca/coll/han/coll_han_dynamic.c index eba0cd37e6f..69f25b40757 100644 --- a/ompi/mca/coll/han/coll_han_dynamic.c +++ b/ompi/mca/coll/han/coll_han_dynamic.c @@ -4,6 +4,7 @@ * Copyright (c) 2021 Triad National Security, LLC. All rights * reserved. * Copyright (c) 2022 IBM Corporation. All rights reserved + * Copyright (c) 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * $COPYRIGHT$ * @@ -40,6 +41,7 @@ bool mca_coll_han_is_coll_dynamic_implemented(COLLTYPE_T coll_id) case ALLGATHER: case ALLGATHERV: case ALLREDUCE: + case ALLTOALL: case BARRIER: case BCAST: case GATHER: @@ -1508,3 +1510,128 @@ mca_coll_han_scatterv_intra_dynamic(const void *sbuf, ompi_count_array_t scounts rbuf, rcount, rdtype, root, comm, sub_module); } + +/* + * alltoall selector: + * On a sub-communicator, checks the stored rules to find the module to use + * On the global communicator, calls the han collective implementation, or + * calls the correct module if fallback mechanism is activated + */ +int +mca_coll_han_alltoall_intra_dynamic(const void *sbuf, size_t scount, + struct ompi_datatype_t *sdtype, + void* rbuf, size_t rcount, + struct ompi_datatype_t *rdtype, + struct ompi_communicator_t *comm, + mca_coll_base_module_t *module) +{ + mca_coll_han_module_t *han_module = (mca_coll_han_module_t*) module; + TOPO_LVL_T topo_lvl = han_module->topologic_level; + mca_coll_base_module_alltoall_fn_t alltoall; + mca_coll_base_module_t *sub_module; + size_t dtype_size; + int rank, verbosity = 0; + + if (!han_module->enabled) { + return han_module->previous_alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm, + han_module->previous_alltoall_module); + } + + /* Compute configuration information for dynamic rules */ + if( MPI_IN_PLACE != rbuf ) { + ompi_datatype_type_size(rdtype, &dtype_size); + dtype_size = dtype_size * rcount; + } else { + ompi_datatype_type_size(sdtype, &dtype_size); + dtype_size = dtype_size * scount; + } + + sub_module = get_module(ALLTOALL, + dtype_size, + comm, + han_module); + + /* First errors are always printed by rank 0 */ + rank = ompi_comm_rank(comm); + if( (0 == rank) && (han_module->dynamic_errors < mca_coll_han_component.max_dynamic_errors) ) { + verbosity = 30; + } + + if(NULL == sub_module) { + /* + * No valid collective module from dynamic rules + * nor from mca parameter + */ + han_module->dynamic_errors++; + opal_output_verbose(verbosity, mca_coll_han_component.han_output, + "coll:han:mca_coll_han_alltoall_intra_dynamic " + "HAN did not find any valid module for collective %d (%s) " + "with topological level %d (%s) on communicator (%s/%s). " + "Please check dynamic file/mca parameters\n", + ALLTOALL, mca_coll_base_colltype_to_str(ALLTOALL), + topo_lvl, mca_coll_han_topo_lvl_to_str(topo_lvl), + ompi_comm_print_cid(comm), comm->c_name); + OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, + "HAN/ALLTOALL: No module found for the sub-communicator. " + "Falling back to another component\n")); + alltoall = han_module->previous_alltoall; + sub_module = han_module->previous_alltoall_module; + } else if (NULL == sub_module->coll_alltoall) { + /* + * No valid collective from dynamic rules + * nor from mca parameter + */ + han_module->dynamic_errors++; + opal_output_verbose(verbosity, mca_coll_han_component.han_output, + "coll:han:mca_coll_han_alltoall_intra_dynamic " + "HAN found valid module for collective %d (%s) " + "with topological level %d (%s) on communicator (%s/%s) " + "but this module cannot handle this collective. " + "Please check dynamic file/mca parameters\n", + ALLTOALL, mca_coll_base_colltype_to_str(ALLTOALL), + topo_lvl, mca_coll_han_topo_lvl_to_str(topo_lvl), + ompi_comm_print_cid(comm), comm->c_name); + OPAL_OUTPUT_VERBOSE((30, mca_coll_han_component.han_output, + "HAN/ALLTOALL: the module found for the sub-" + "communicator cannot handle the ALLTOALL operation. " + "Falling back to another component\n")); + alltoall = han_module->previous_alltoall; + sub_module = han_module->previous_alltoall_module; + } else if (GLOBAL_COMMUNICATOR == topo_lvl && sub_module == module) { + /* + * No fallback mechanism activated for this configuration + * sub_module is valid + * sub_module->coll_alltoall is valid and point to this function + * Call han topological collective algorithm + */ + int algorithm_id = get_algorithm(ALLTOALL, + dtype_size, + comm, + han_module); + alltoall = (mca_coll_base_module_alltoall_fn_t)mca_coll_han_algorithm_id_to_fn(ALLTOALL, algorithm_id); + if (NULL == alltoall) { /* default behaviour */ + alltoall = mca_coll_han_alltoall_using_smsc; + } + } else { + /* + * If we get here: + * sub_module is valid + * sub_module->coll_alltoall is valid + * They points to the collective to use, according to the dynamic rules + * Selector's job is done, call the collective + */ + alltoall = sub_module->coll_alltoall; + } + + /* + * If we get here: + * sub_module is valid + * sub_module->coll_alltoall is valid + * They points to the collective to use, according to the dynamic rules + * Selector's job is done, call the collective + */ + return alltoall(sbuf, scount, sdtype, + rbuf, rcount, rdtype, + comm, + sub_module); +} diff --git a/ompi/mca/coll/han/coll_han_module.c b/ompi/mca/coll/han/coll_han_module.c index 8be8d5a9319..cf2c84e27fd 100644 --- a/ompi/mca/coll/han/coll_han_module.c +++ b/ompi/mca/coll/han/coll_han_module.c @@ -8,6 +8,7 @@ * reserved. * Copyright (c) 2022 IBM Corporation. All rights reserved * Copyright (c) 2024 NVIDIA Corporation. All rights reserved. + * Copyright (c) 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -47,6 +48,7 @@ static int mca_coll_han_module_disable(mca_coll_base_module_t * module, */ static void han_module_clear(mca_coll_han_module_t *han_module) { + CLEAN_PREV_COLL(han_module, alltoall); CLEAN_PREV_COLL(han_module, allgather); CLEAN_PREV_COLL(han_module, allgatherv); CLEAN_PREV_COLL(han_module, allreduce); @@ -232,7 +234,7 @@ mca_coll_han_comm_query(struct ompi_communicator_t * comm, int *priority) han_module->super.coll_module_enable = mca_coll_han_module_enable; han_module->super.coll_module_disable = mca_coll_han_module_disable; - han_module->super.coll_alltoall = NULL; + han_module->super.coll_alltoall = mca_coll_han_alltoall_intra_dynamic; han_module->super.coll_alltoallv = NULL; han_module->super.coll_alltoallw = NULL; han_module->super.coll_exscan = NULL; @@ -294,6 +296,7 @@ mca_coll_han_module_enable(mca_coll_base_module_t * module, { mca_coll_han_module_t * han_module = (mca_coll_han_module_t*) module; + HAN_INSTALL_COLL_API(comm, han_module, alltoall); HAN_INSTALL_COLL_API(comm, han_module, allgather); HAN_INSTALL_COLL_API(comm, han_module, allgatherv); HAN_INSTALL_COLL_API(comm, han_module, allreduce); @@ -321,6 +324,7 @@ mca_coll_han_module_disable(mca_coll_base_module_t * module, { mca_coll_han_module_t * han_module = (mca_coll_han_module_t *) module; + HAN_UNINSTALL_COLL_API(comm, han_module, alltoall); HAN_UNINSTALL_COLL_API(comm, han_module, allgather); HAN_UNINSTALL_COLL_API(comm, han_module, allgatherv); HAN_UNINSTALL_COLL_API(comm, han_module, allreduce); diff --git a/ompi/mca/coll/han/coll_han_subcomms.c b/ompi/mca/coll/han/coll_han_subcomms.c index 92bddb3ba51..71e01a4bba3 100644 --- a/ompi/mca/coll/han/coll_han_subcomms.c +++ b/ompi/mca/coll/han/coll_han_subcomms.c @@ -5,6 +5,7 @@ * Copyright (c) 2020 Bull S.A.S. All rights reserved. * Copyright (c) 2023 Computer Architecture and VLSI Systems (CARV) * Laboratory, ICS Forth. All rights reserved. + * Copyright (c) 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Copyright (c) 2024 NVIDIA Corporation. All rights reserved. * $COPYRIGHT$ @@ -77,6 +78,7 @@ int mca_coll_han_comm_create_new(struct ompi_communicator_t *comm, * Reduce + Bcast may be called by the allreduce implementation * Gather + Bcast may be called by the allgather implementation */ + HAN_SUBCOM_SAVE_COLLECTIVE(fallbacks, comm, han_module, alltoall); HAN_SUBCOM_SAVE_COLLECTIVE(fallbacks, comm, han_module, allgatherv); HAN_SUBCOM_SAVE_COLLECTIVE(fallbacks, comm, han_module, allgather); HAN_SUBCOM_SAVE_COLLECTIVE(fallbacks, comm, han_module, allreduce); @@ -108,6 +110,7 @@ int mca_coll_han_comm_create_new(struct ompi_communicator_t *comm, if( local_procs == 1 ) { han_module->enabled = false; /* entire module set to pass-through from now on */ /* restore saved collectives */ + HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, alltoall); HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, allgatherv); HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, allgather); HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, allreduce); @@ -185,6 +188,7 @@ int mca_coll_han_comm_create_new(struct ompi_communicator_t *comm, han_module->cached_vranks = vranks; /* Restore the saved collectives */ + HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, alltoall); HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, allgatherv); HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, allgather); HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, allreduce); @@ -243,6 +247,7 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm, * Reduce + Bcast may be called by the allreduce implementation * Gather + Bcast may be called by the allgather implementation */ + HAN_SUBCOM_SAVE_COLLECTIVE(fallbacks, comm, han_module, alltoall); HAN_SUBCOM_SAVE_COLLECTIVE(fallbacks, comm, han_module, allgatherv); HAN_SUBCOM_SAVE_COLLECTIVE(fallbacks, comm, han_module, allgather); HAN_SUBCOM_SAVE_COLLECTIVE(fallbacks, comm, han_module, allreduce); @@ -269,6 +274,7 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm, comm->c_coll->coll_allreduce_module); if( local_procs == 1 ) { /* restore saved collectives */ + HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, alltoall); HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, allgatherv); HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, allgather); HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, allreduce); @@ -363,6 +369,7 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm, han_module->cached_vranks = vranks; /* Reset the saved collectives to point back to HAN */ + HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, alltoall); HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, allgatherv); HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, allgather); HAN_SUBCOM_RESTORE_COLLECTIVE(fallbacks, comm, han_module, allreduce);