Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topo aware coll comm v2 #12032

Merged
merged 2 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions contrib/check_unnecessary_headers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# Copyright (c) 2004-2005 The Regents of the University of California.
# All rights reserved.
# Copyright (c) 2009 Oak Ridge National Labs. All rights reserved.
# Copyright (c) 2022 Amazon.com, Inc. or its affiliates.
# Copyright (c) Amazon.com, Inc. or its affiliates.
# All Rights reserved.
#
#
Expand Down Expand Up @@ -181,8 +181,8 @@ SEARCH_HEADER[0]="ompi/attribute/attribute.h ATTR_HASH_SIZE OMPI_KEYVAL_PREDEFIN
SEARCH_HEADER[1]="ompi/class/ompi_free_list.h ompi_free_list_item_init_fn_t ompi_free_list_t ompi_free_list_item_t ompi_free_list_init_ex ompi_free_list_init ompi_free_list_init_ex_new ompi_free_list_init_new ompi_free_list_grow ompi_free_list_resize ompi_free_list_pos_t OMPI_FREE_LIST_POS_BEGINNING ompi_free_list_parse OMPI_FREE_LIST_GET OMPI_FREE_LIST_WAIT __ompi_free_list_wait OMPI_FREE_LIST_RETURN"
SEARCH_HEADER[2]="ompi/class/ompi_rb_tree.h ompi_rb_tree_nodecolor_t ompi_rb_tree_node_t ompi_rb_tree_comp_fn_t ompi_rb_tree_t ompi_rb_tree_condition_fn_t ompi_rb_tree_action_fn_t ompi_rb_tree_construct ompi_rb_tree_destruct ompi_rb_tree_init ompi_rb_tree_insert ompi_rb_tree_find_with ompi_rb_tree_find ompi_rb_tree_delete ompi_rb_tree_destroy ompi_rb_tree_traverse ompi_rb_tree_size"
SEARCH_HEADER[3]="ompi/class/ompi_seq_tracker.h ompi_seq_tracker_range_t ompi_seq_tracker_t ompi_seq_tracker_check_duplicate ompi_seq_tracker_insert ompi_seq_tracker_copy"
SEARCH_HEADER[4]="ompi/communicator/communicator.h MPI_Comm MPI_COMM_WORLD ompi_communicator_t OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_world ompi_mpi_comm_self ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dyncomm ompi_mpi_cxx_comm_errhandler_invoke"
SEARCH_HEADER[5]="ompi/datatype/convertor.h OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dync CONVERTOR_DATATYPE_MASK CONVERTOR_SEND_CONVERSION CONVERTOR_RECV CONVERTOR_SEND CONVERTOR_HOMOGENEOUS CONVERTOR_NO_OP CONVERTOR_WITH_CHECKSUM CONVERTOR_TYPE_MASK CONVERTOR_STATE_START CONVERTOR_STATE_COMPLETE CONVERTOR_STATE_ALLOC CONVERTOR_COMPLETED ompi_convertor_t ompi_convertor_master_t dt_stack_t DT_STATIC_STACK_SIZE ompi_convertor_get_checksum ompi_convertor_pack ompi_convertor_unpack ompi_convertor_create ompi_convertor_cleanup ompi_convertor_need_buffers ompi_convertor_get_packed_size ompi_convertor_get_unpacked_size ompi_convertor_get_current_pointer ompi_convertor_prepare_for_send ompi_convertor_copy_and_prepare_for_send ompi_convertor_prepare_for_recv ompi_convertor_copy_and_prepare_for_recv ompi_convertor_raw ompi_convertor_set_position_nocheck ompi_convertor_set_position ompi_convertor_personalize ompi_convertor_clone ompi_convertor_clone_with_position ompi_convertor_dump ompi_ddt_dump_stack ompi_convertor_generic_simple_position MPI_Datatype"
SEARCH_HEADER[4]="ompi/communicator/communicator.h MPI_Comm MPI_COMM_WORLD ompi_communicator_t OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_DISJOINT_SET OMPI_COMM_DISJOINT OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_world ompi_mpi_comm_self ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dyncomm ompi_mpi_cxx_comm_errhandler_invoke"
SEARCH_HEADER[5]="ompi/datatype/convertor.h OMPI_COMM_INTER OMPI_COMM_CART OMPI_COMM_GRAPH OMPI_COMM_NAMEISSET OMPI_COMM_ISFREED OMPI_COMM_INTRINSIC OMPI_COMM_DYNAMIC OMPI_COMM_INVALID OMPI_COMM_DISJOINT_SET OMPI_COMM_DISJOINT OMPI_COMM_PML_ADDED OMPI_COMM_IS_ OMPI_COMM_SET_ OMPI_COMM_ALLGATHER_TAG OMPI_COMM_BARRIER_TAG OMPI_COMM_ALLREDUCE_TAG OMPI_COMM_CID_ OMPI_COMM_BLOCK_ ompi_predefined_communicator_t ompi_mpi_comm_parent ompi_mpi_comm_null ompi_comm_invalid ompi_comm_rank ompi_comm_size ompi_comm_remote_size ompi_comm_get_cid ompi_comm_lookup ompi_comm_peer_lookup ompi_comm_peer_invalid ompi_comm_init ompi_comm_link_function ompi_comm_group ompi_comm_create ompi_topo_create ompi_comm_split ompi_comm_dup ompi_comm_compare ompi_comm_free ompi_comm_allocate ompi_comm_nextcid ompi_comm_finalize ompi_comm_set ompi_comm_get_rprocs ompi_comm_overlapping_groups ompi_comm_determine_first ompi_comm_activate ompi_comm_dump ompi_comm_set_name ompi_comm_reg_init ompi_comm_reg_finalize ompi_comm_num_dync CONVERTOR_DATATYPE_MASK CONVERTOR_SEND_CONVERSION CONVERTOR_RECV CONVERTOR_SEND CONVERTOR_HOMOGENEOUS CONVERTOR_NO_OP CONVERTOR_WITH_CHECKSUM CONVERTOR_TYPE_MASK CONVERTOR_STATE_START CONVERTOR_STATE_COMPLETE CONVERTOR_STATE_ALLOC CONVERTOR_COMPLETED ompi_convertor_t ompi_convertor_master_t dt_stack_t DT_STATIC_STACK_SIZE ompi_convertor_get_checksum ompi_convertor_pack ompi_convertor_unpack ompi_convertor_create ompi_convertor_cleanup ompi_convertor_need_buffers ompi_convertor_get_packed_size ompi_convertor_get_unpacked_size ompi_convertor_get_current_pointer ompi_convertor_prepare_for_send ompi_convertor_copy_and_prepare_for_send ompi_convertor_prepare_for_recv ompi_convertor_copy_and_prepare_for_recv ompi_convertor_raw ompi_convertor_set_position_nocheck ompi_convertor_set_position ompi_convertor_personalize ompi_convertor_clone ompi_convertor_clone_with_position ompi_convertor_dump ompi_ddt_dump_stack ompi_convertor_generic_simple_position MPI_Datatype"
SEARCH_HEADER[6]="ompi/datatype/datatype.h MPI_Datatype DT_MAX_PREDEFINED DT_FLAG_ MAX_DT_COMPONENT_COUNT opal_ddt_count_t dt_type_desc_t ompi_datatype_t ompi_predefined_datatype_t ompi_ddt_init ompi_ddt_finalize ompi_ddt_create_ ompi_ddt_duplicate ompi_ddt_is_predefined ompi_ddt_create_from_packed_description"
SEARCH_HEADER[7]="ompi/datatype/datatype_internal.h DDT_DUMP_STACK DT_ ddt_elem_id_description ddt_elem_desc ddt_elem_desc_t ddt_loop_desc ddt_loop_desc_t ddt_endloop_desc ddt_endloop_desc_t dt_elem_desc CREATE_LOOP_START CREATE_LOOP_END CREATE_ELEM ompi_complex_float_t ompi_complex_double_t ompi_complex_long_double_t ompi_ddt_basicDatatypes BASIC_DDT_FROM_ELEM ompi_ddt_default_convertors_init ompi_ddt_default_convertors_fini SAVE_STACK PUSH_STACK ompi_ddt_safeguard_pointer_debug_breakpoint OMPI_DDT_SAFEGUARD_POINTER GET_FIRST_NON_LOOP UPDATE_INTERNAL_COUNTERS ompi_ddt_print_args"
SEARCH_HEADER[8]="ompi/errhandler/errhandler.h OMPI_ERRHANDLER_LANG_ ompi_errhandler_lang_t OMPI_ERRHANDLER_TYPE_ ompi_errhandler_type_t ompi_errhandler_t ompi_predefined_errhandler_t ompi_mpi_errhandler_null OMPI_ERRHANDLER_CHECK OMPI_ERRHANDLER_RETURN ompi_errhandler_init ompi_errhandler_finalize OMPI_ERRHANDLER_INVOKE ompi_errhandler_invoke ompi_errhandler_request_invoke ompi_errhandler_create ompi_errhandler_is_intrinsic ompi_errhandler_fortran_handler_fn_t OMPI_ERR_INIT_FINALIZE MPI_Errhandler"
Expand Down
43 changes: 35 additions & 8 deletions ompi/communicator/comm_cid.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ struct ompi_comm_cid_context_t {
int remote_leader;
int iter;
/** storage for activate barrier */
int ok;
int max_local_peers;
char *port_string;
bool send_first;
int pml_tag;
Expand Down Expand Up @@ -266,7 +266,7 @@ static ompi_comm_cid_context_t *mca_comm_cid_context_alloc (ompi_communicator_t

context->send_first = send_first;
context->iter = 0;
context->ok = 1;
context->max_local_peers = ompi_group_count_local_peers(newcomm->c_local_group);

return context;
}
Expand Down Expand Up @@ -771,9 +771,33 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
/* Non-blocking version of ompi_comm_activate */
static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request);

static int ompi_comm_activate_complete (ompi_communicator_t **newcomm, ompi_communicator_t *comm)
/**
 * Record on the new communicator whether it is disjoint.
 *
 * The flags are written at most once: when OMPI_COMM_DISJOINT_SET is already
 * present on *context->newcommp, the "disjointness-set-again" help message is
 * shown and the flags are left untouched.  Otherwise OMPI_COMM_DISJOINT is
 * raised when context->max_local_peers equals 1 and cleared for any other
 * value, after which OMPI_COMM_DISJOINT_SET is raised to mark the decision
 * as final.
 *
 * @param context  CID context providing newcommp and max_local_peers.
 */
static inline void ompi_comm_set_disjointness_nb_complete(ompi_comm_cid_context_t *context)
{
    if (!OMPI_COMM_IS_DISJOINT_SET(*context->newcommp)) {
        if (1 != context->max_local_peers) {
            /* some process reported more than itself as a local peer */
            (*context->newcommp)->c_flags &= ~OMPI_COMM_DISJOINT;
        } else {
            (*context->newcommp)->c_flags |= OMPI_COMM_DISJOINT;
        }

        /* remember that the disjointness has now been decided */
        (*context->newcommp)->c_flags |= OMPI_COMM_DISJOINT_SET;
        return;
    }

    /* second attempt to set the flags: report it and change nothing */
    opal_show_help("help-comm.txt", "disjointness-set-again", true);
}

static int ompi_comm_activate_complete (ompi_comm_cid_context_t *context)
{
int ret;
ompi_communicator_t **newcomm = context->newcommp, *comm = context->comm;

/**
* Determine the new communicator's disjointness based on
* context->max_local_peers. It is reduced on the communicator
* before ompi_comm_activate_nb_complete is called.
*/
ompi_comm_set_disjointness_nb_complete(context);

/**
* Check to see if this process is in the new communicator.
Expand Down Expand Up @@ -846,7 +870,7 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c
ompi_comm_cid_context_t *context;
ompi_comm_request_t *request;
ompi_request_t *subreq;
int ret = 0;
int ret = 0, local_peers = -1;

/* the caller should not pass NULL for comm (it may be the same as *newcomm) */
assert (NULL != comm);
Expand Down Expand Up @@ -878,10 +902,13 @@ int ompi_comm_activate_nb (ompi_communicator_t **newcomm, ompi_communicator_t *c
OMPI_COMM_SET_PML_ADDED(*newcomm);
}

/* Step 1: the barrier, after which it is allowed to
* send messages over the new communicator
/**
* Dual-purpose barrier:
* 1. The communicator's disjointness is inferred from max_local_peers.
* 2. After the operation it is allowed to send messages over the new communicator.
*/
ret = context->allreduce_fn (&context->ok, &context->ok, 1, MPI_MIN, context,
bosilca marked this conversation as resolved.
Show resolved Hide resolved
local_peers = context->max_local_peers;
ret = context->allreduce_fn (&local_peers, &context->max_local_peers, 1, MPI_MAX, context,
&subreq);
if (OMPI_SUCCESS != ret) {
bosilca marked this conversation as resolved.
Show resolved Hide resolved
ompi_comm_request_return (request);
Expand Down Expand Up @@ -920,7 +947,7 @@ int ompi_comm_activate (ompi_communicator_t **newcomm, ompi_communicator_t *comm
/*
 * Unpacks the CID context carried in request->context and hands it to
 * ompi_comm_activate_complete(), returning that function's result.
 *
 * NOTE(review): the two return statements below appear to be the removed and
 * the added line of a single diff hunk rendered without +/- markers.  Only the
 * one-argument call matches the ompi_comm_activate_complete(context)
 * definition visible in this listing; the two-argument call matches the
 * superseded (newcomm, comm) signature.  As written, the second return is
 * unreachable -- confirm against the merged file before relying on either.
 */
static int ompi_comm_activate_nb_complete (ompi_comm_request_t *request)
{
/* request->context is stored untyped; cast it back to the CID context */
ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
return ompi_comm_activate_complete (context->newcommp, context->comm);
return ompi_comm_activate_complete (context);
}

/**************************************************************************/
Expand Down
4 changes: 4 additions & 0 deletions ompi/communicator/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t);
#define OMPI_COMM_DYNAMIC 0x00000008
#define OMPI_COMM_ISFREED 0x00000010
#define OMPI_COMM_INVALID 0x00000020
#define OMPI_COMM_DISJOINT_SET 0x00000040
#define OMPI_COMM_DISJOINT 0x00000080
#define OMPI_COMM_CART 0x00000100
#define OMPI_COMM_GRAPH 0x00000200
#define OMPI_COMM_DIST_GRAPH 0x00000400
Expand All @@ -80,6 +82,8 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t);
#define OMPI_COMM_IS_FREED(comm) ((comm)->c_flags & OMPI_COMM_ISFREED)
#define OMPI_COMM_IS_DYNAMIC(comm) ((comm)->c_flags & OMPI_COMM_DYNAMIC)
#define OMPI_COMM_IS_INVALID(comm) ((comm)->c_flags & OMPI_COMM_INVALID)
#define OMPI_COMM_IS_DISJOINT_SET(comm) ((comm)->c_flags & OMPI_COMM_DISJOINT_SET)
#define OMPI_COMM_IS_DISJOINT(comm) ((comm)->c_flags & OMPI_COMM_DISJOINT)
#define OMPI_COMM_IS_PML_ADDED(comm) ((comm)->c_flags & OMPI_COMM_PML_ADDED)
#define OMPI_COMM_IS_EXTRA_RETAIN(comm) ((comm)->c_flags & OMPI_COMM_EXTRA_RETAIN)
#define OMPI_COMM_IS_TOPO(comm) (OMPI_COMM_IS_CART((comm)) || \
Expand Down
3 changes: 3 additions & 0 deletions ompi/communicator/help-comm.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ in a call to MPI_Comm_split_type between peers in the communicator.
[unexpected-split-type]
Detected an unexpected split type in a call to MPI_Comm_split_type.
split_type: %s (%d)
[disjointness-set-again]
Communicator disjointness should only be set once at initialization.
Attempts to modify the state are illegal and shall be ignored.
7 changes: 4 additions & 3 deletions ompi/mca/coll/han/coll_han_subcomms.c
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm,
opal_info_set(&comm_info, "ompi_comm_coll_preference", "tuned,^han");
ompi_comm_split_type(comm, MPI_COMM_TYPE_SHARED, 0,
&comm_info, &(low_comms[0]));
assert(OMPI_COMM_IS_DISJOINT_SET(low_comms[0]) && !OMPI_COMM_IS_DISJOINT(low_comms[0]));

/*
* Get my local rank and the local size
Expand All @@ -296,6 +297,7 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm,
opal_info_set(&comm_info, "ompi_comm_coll_preference", "sm,^han");
ompi_comm_split_type(comm, MPI_COMM_TYPE_SHARED, 0,
&comm_info, &(low_comms[1]));
assert(OMPI_COMM_IS_DISJOINT_SET(low_comms[1]) && !OMPI_COMM_IS_DISJOINT(low_comms[1]));

/*
* Upgrade libnbc module priority to set up up_comms[0] with libnbc module
Expand All @@ -304,15 +306,16 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm,
*/
opal_info_set(&comm_info, "ompi_comm_coll_preference", "libnbc,^han");
ompi_comm_split_with_info(comm, low_rank, w_rank, &comm_info, &(up_comms[0]), false);

up_rank = ompi_comm_rank(up_comms[0]);
assert(OMPI_COMM_IS_DISJOINT_SET(up_comms[0]) && OMPI_COMM_IS_DISJOINT(up_comms[0]));

/*
* Upgrade adapt module priority to set up up_comms[1] with adapt module
* This sub-communicator contains one process per node.
*/
opal_info_set(&comm_info, "ompi_comm_coll_preference", "adapt,^han");
ompi_comm_split_with_info(comm, low_rank, w_rank, &comm_info, &(up_comms[1]), false);
assert(OMPI_COMM_IS_DISJOINT_SET(up_comms[1]) && OMPI_COMM_IS_DISJOINT(up_comms[1]));

/*
* Set my virtual rank number.
Expand Down Expand Up @@ -350,5 +353,3 @@ int mca_coll_han_comm_create(struct ompi_communicator_t *comm,
OBJ_DESTRUCT(&comm_info);
return OMPI_SUCCESS;
}