From a1cf41186fdb8449280eec15ed8cbf7ef6af05c3 Mon Sep 17 00:00:00 2001 From: Wenduo Wang Date: Mon, 30 Oct 2023 03:59:52 +0000 Subject: [PATCH] communicator: introduce OMPI_COMM_DISJOINT flag This patch introduces a new communicator flag to indicate if no processes share the same node. This flag is intended for optimization of hierarchy-aware collective operations to select the more efficient transport/algorithm. In this change we introduce a non-blocking allreducestep in communicator activation and set the new flag in the completion callback function. Signed-off-by: Wenduo Wang --- contrib/check_unnecessary_headers.sh | 6 ++-- ompi/communicator/comm_cid.c | 43 ++++++++++++++++++++++------ ompi/communicator/communicator.h | 4 +++ ompi/communicator/help-comm.txt | 3 ++ 4 files changed, 45 insertions(+), 11 deletions(-) diff --git a/contrib/check_unnecessary_headers.sh b/contrib/check_unnecessary_headers.sh index 15edb513c45..bac0e427951 100644 --- a/contrib/check_unnecessary_headers.sh +++ b/contrib/check_unnecessary_headers.sh @@ -11,7 +11,7 @@ # Copyright (c) 2004-2005 The Regents of the University of California. # All rights reserved. # Copyright (c) 2009 Oak Ridge National Labs. All rights reserved. -# Copyright (c) 2022 Amazon.com, Inc. or its affiliates. +# Copyright (c) Amazon.com, Inc. or its affiliates. # All Rights reserved. # # @@ -181,8 +181,8 @@ SEARCH_HEADER[0]="ompi/attribute/attribute.h ATTR_HASH_SIZE OMPI_KEYVAL_PREDEFIN SEARCH_HEADER[1]="ompi/class/ompi_free_list.h ompi_free_list_item_init_fn_t ompi_free_list_t ompi_free_list_item_t ompi_free_list_init_ex ompi_free_list_init ompi_free_list_init_ex_new ompi_free_list_init_new ompi_free_list_grow ompi_free_list_resize ompi_free_list_pos_t OMPI_FREE_LIST_POS_BEGINNING ompi_free_list_parse OMPI_FREE_LIST_GET OMPI_FREE_LIST_WAIT __ompi_free_list_wait OMPI_FREE_LIST_RETURN" SEARCH_HEADER[2]="ompi/class/ompi_rb_tree.h ompi_rb_tree_nodecolor_t ompi_rb_tree_node_t ompi_rb_tree_comp_fn_t ompi_rb_tree_t ompi_rb_tree_condition_fn_t ompi_rb_tree_action_fn_t ompi_rb_tree_construct ompi_rb_tree_destruct ompi_rb_tree_init ompi_rb_tree_insert ompi_rb_tree_find_with ompi_rb_tree_find ompi_rb_tree_delete ompi_rb_tree_destroy ompi_rb_tree_traverse ompi_rb_tree_size" SEARCH_HEADER[3]="ompi/class/ompi_seq_tracker.h ompi_seq_tracker_range_t ompi_seq_tracker_t ompi_seq_tracker_check_duplicate ompi_seq_tracker_insert ompi_seq_tracker_copy" -SEARCH_HEADER[4]="ompi/communicator/communicator.h MPI_Comm MPI_COMM_WORLD ompi_communicator_t OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_world ompi_mpi_comm_self ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dyncomm ompi_mpi_cxx_comm_errhandler_invoke" -SEARCH_HEADER[5]="ompi/datatype/convertor.h OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dync CONVERTOR_DATATYPE_MASK CONVERTOR_SEND_CONVERSION CONVERTOR_RECV CONVERTOR_SEND CONVERTOR_HOMOGENEOUS CONVERTOR_NO_OP CONVERTOR_WITH_CHECKSUM CONVERTOR_TYPE_MASK CONVERTOR_STATE_START CONVERTOR_STATE_COMPLETE CONVERTOR_STATE_ALLOC CONVERTOR_COMPLETED ompi_convertor_t ompi_convertor_master_t dt_stack_t DT_STATIC_STACK_SIZE ompi_convertor_get_checksum ompi_convertor_pack ompi_convertor_unpack ompi_convertor_create ompi_convertor_cleanup ompi_convertor_need_buffers ompi_convertor_get_packed_size ompi_convertor_get_unpacked_size ompi_convertor_get_current_pointer ompi_convertor_prepare_for_send ompi_convertor_copy_and_prepare_for_send ompi_convertor_prepare_for_recv ompi_convertor_copy_and_prepare_for_recv ompi_convertor_raw ompi_convertor_set_position_nocheck ompi_convertor_set_position ompi_convertor_personalize ompi_convertor_clone ompi_convertor_clone_with_position ompi_convertor_dump ompi_ddt_dump_stack ompi_convertor_generic_simple_position MPI_Datatype" +SEARCH_HEADER[4]="ompi/communicator/communicator.h MPI_Comm MPI_COMM_WORLD ompi_communicator_t OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_DISJOINT_SET OMPI_COMM_DISJOINT OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_world ompi_mpi_comm_self ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dyncomm ompi_mpi_cxx_comm_errhandler_invoke" +SEARCH_HEADER[5]="ompi/datatype/convertor.h OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_DISJOINT_SET OMPI_COMM_DISJOINT OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dync CONVERTOR_DATATYPE_MASK CONVERTOR_SEND_CONVERSION CONVERTOR_RECV CONVERTOR_SEND CONVERTOR_HOMOGENEOUS CONVERTOR_NO_OP CONVERTOR_WITH_CHECKSUM CONVERTOR_TYPE_MASK CONVERTOR_STATE_START CONVERTOR_STATE_COMPLETE CONVERTOR_STATE_ALLOC CONVERTOR_COMPLETED ompi_convertor_t ompi_convertor_master_t dt_stack_t DT_STATIC_STACK_SIZE ompi_convertor_get_checksum ompi_convertor_pack ompi_convertor_unpack ompi_convertor_create ompi_convertor_cleanup ompi_convertor_need_buffers ompi_convertor_get_packed_size ompi_convertor_get_unpacked_size ompi_convertor_get_current_pointer ompi_convertor_prepare_for_send ompi_convertor_copy_and_prepare_for_send ompi_convertor_prepare_for_recv ompi_convertor_copy_and_prepare_for_recv ompi_convertor_raw ompi_convertor_set_position_nocheck ompi_convertor_set_position ompi_convertor_personalize ompi_convertor_clone ompi_convertor_clone_with_position ompi_convertor_dump ompi_ddt_dump_stack ompi_convertor_generic_simple_position MPI_Datatype" SEARCH_HEADER[6]="ompi/datatype/datatype.h MPI_Datatype DT_MAX_PREDEFINED DT_FLAG_ MAX_DT_COMPONENT_COUNT opal_ddt_count_t dt_type_desc_t ompi_datatype_t ompi_predefined_datatype_t ompi_ddt_init ompi_ddt_finalize ompi_ddt_create_ ompi_ddt_duplicate ompi_ddt_is_predefined ompi_ddt_create_from_packed_description" SEARCH_HEADER[7]="ompi/datatype/datatype_internal.h DDT_DUMP_STACK DT_ ddt_elem_id_description ddt_elem_desc ddt_elem_desc_t ddt_loop_desc ddt_loop_desc_t ddt_endloop_desc ddt_endloop_desc_t dt_elem_desc CREATE_LOOP_START CREATE_LOOP_END CREATE_ELEM ompi_complex_float_t ompi_complex_double_t ompi_complex_long_double_t ompi_ddt_basicDatatypes BASIC_DDT_FROM_ELEM ompi_ddt_default_convertors_init ompi_ddt_default_convertors_fini SAVE_STACK PUSH_STACK ompi_ddt_safeguard_pointer_debug_breakpoint OMPI_DDT_SAFEGUARD_POINTER GET_FIRST_NON_LOOP UPDATE_INTERNAL_COUNTERS ompi_ddt_print_args" SEARCH_HEADER[8]="ompi/errhandler/errhandler.h OMPI_ERRHANDLER_LANG_ ompi_errhandler_lang_t OMPI_ERRHANDLER_TYPE_ ompi_errhandler_type_t ompi_errhandler_t ompi_predefined_errhandler_t ompi_mpi_errhandler_null OMPI_ERRHANDLER_CHECK OMPI_ERRHANDLER_RETURN ompi_errhandler_init ompi_errhandler_finalize OMPI_ERRHANDLER_INVOKE ompi_errhandler_invoke ompi_errhandler_request_invoke ompi_errhandler_create ompi_errhandler_is_intrinsic ompi_errhandler_fortran_handler_fn_t OMPI_ERR_INIT_FINALIZE MPI_Errhandler" diff --git a/ompi/communicator/comm_cid.c b/ompi/communicator/comm_cid.c index 07970e8354f..4424bdf84e0 100644 --- a/ompi/communicator/comm_cid.c +++ b/ompi/communicator/comm_cid.c @@ -98,7 +98,7 @@ struct ompi_comm_cid_context_t { int remote_leader; int iter; /** storage for activate barrier */ - int ok; + int max_local_peers; char *port_string; bool send_first; int pml_tag; @@ -266,7 +266,7 @@ static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t context->send_first = send_first; context->iter = 0; - context->ok = 1; + context->max_local_peers = ompi_group_count_local_peers(newcomm->c_local_group); return context; } @@ -771,9 +771,33 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request) /* Non-blocking version of ompi_comm_activate */ static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request); -static int ompi_comm_activate_complete (ompi_communicator_t **newcomm, ompi_communicator_t *comm) +/* Callback function to set communicator disjointness flags */ +static inline void ompi_comm_set_disjointness_nb_complete(ompi_comm_cid_context_t *context) +{ + if (OMPI_COMM_IS_DISJOINT_SET(*context->newcommp)) { + opal_show_help("help-comm.txt", "disjointness-set-again", true); + return; + } + + if (1 == context->max_local_peers) { + (*context->newcommp)->c_flags |= OMPI_COMM_DISJOINT; + } else { + (*context->newcommp)->c_flags &= ~OMPI_COMM_DISJOINT; + } + (*context->newcommp)->c_flags |= OMPI_COMM_DISJOINT_SET; +} + +static int ompi_comm_activate_complete (ompi_comm_cid_context_t *context) { int ret; + ompi_communicator_t **newcomm = context->newcommp, *comm = context->comm; + + /** + * Determine the new communicator's disjointness based on + * context->max_local_peers. It is reduced on the communicator + * before ompi_comm_activate_nb_complete is called. + */ + ompi_comm_set_disjointness_nb_complete(context); /** * Check to see if this process is in the new communicator. @@ -846,7 +870,7 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c ompi_comm_cid_context_t *context; ompi_comm_request_t *request; ompi_request_t *subreq; - int ret = 0; + int ret = 0, local_peers = -1; /* the caller should not pass NULL for comm (it may be the same as *newcomm) */ assert (NULL != comm); @@ -878,10 +902,13 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c OMPI_COMM_SET_PML_ADDED(*newcomm); } - /* Step 1: the barrier, after which it is allowed to - * send messages over the new communicator + /** + * Dual-purpose barrier: + * 1. The communicator's disjointness is inferred from max_local_peers. + * 2. After the operation it is allowed to send messages over the new communicator. */ - ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MIN, context, + local_peers = context->max_local_peers; + ret = context->allreduce_fn (&local_peers, &context->max_local_peers, 1, MPI_MAX, context, &subreq); if (OMPI_SUCCESS != ret) { ompi_comm_request_return (request); @@ -920,7 +947,7 @@ int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request) { ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context; - return ompi_comm_activate_complete (context->newcommp, context->comm); + return ompi_comm_activate_complete (context); } /**************************************************************************/ diff --git a/ompi/communicator/communicator.h b/ompi/communicator/communicator.h index 7e41afb8631..06d78a77651 100644 --- a/ompi/communicator/communicator.h +++ b/ompi/communicator/communicator.h @@ -62,6 +62,8 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t); #define OMPI_COMM_DYNAMIC 0x00000008 #define OMPI_COMM_ISFREED 0x00000010 #define OMPI_COMM_INVALID 0x00000020 +#define OMPI_COMM_DISJOINT_SET 0x00000040 +#define OMPI_COMM_DISJOINT 0x00000080 #define OMPI_COMM_CART 0x00000100 #define OMPI_COMM_GRAPH 0x00000200 #define OMPI_COMM_DIST_GRAPH 0x00000400 @@ -80,6 +82,8 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t); #define OMPI_COMM_IS_FREED(comm) ((comm)->c_flags & OMPI_COMM_ISFREED) #define OMPI_COMM_IS_DYNAMIC(comm) ((comm)->c_flags & OMPI_COMM_DYNAMIC) #define OMPI_COMM_IS_INVALID(comm) ((comm)->c_flags & OMPI_COMM_INVALID) +#define OMPI_COMM_IS_DISJOINT_SET(comm) ((comm)->c_flags & OMPI_COMM_DISJOINT_SET) +#define OMPI_COMM_IS_DISJOINT(comm) ((comm)->c_flags & OMPI_COMM_DISJOINT) #define OMPI_COMM_IS_PML_ADDED(comm) ((comm)->c_flags & OMPI_COMM_PML_ADDED) #define OMPI_COMM_IS_EXTRA_RETAIN(comm) ((comm)->c_flags & OMPI_COMM_EXTRA_RETAIN) #define OMPI_COMM_IS_TOPO(comm) (OMPI_COMM_IS_CART((comm)) || \ diff --git a/ompi/communicator/help-comm.txt b/ompi/communicator/help-comm.txt index 90791f58f43..ae63490e237 100644 --- a/ompi/communicator/help-comm.txt +++ b/ompi/communicator/help-comm.txt @@ -34,3 +34,6 @@ in a call to MPI_Comm_split_type between peers in the communicator. [unexpected-split-type] Detected an unexpected split type in a call to MPI_Comm_split_type. split_type: %s (%d) +[disjointness-set-again] +Communicator disjointness should only be set once at initialization. +Attempts to modify the state are illegal and shall be ignored.