From 020687e2fdd49ecc38957fff15eace026e8be756 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Mon, 28 Jul 2025 03:59:42 +0000 Subject: [PATCH 01/18] UCP: Added config option for protocol variants --- src/ucp/core/ucp_context.c | 8 ++++++++ src/ucp/core/ucp_context.h | 2 ++ 2 files changed, 10 insertions(+) diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index aae4878627a..a2dad24a4bc 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -504,6 +504,14 @@ static ucs_config_field_t ucp_context_config_table[] = { "directory.", ucs_offsetof(ucp_context_config_t, proto_info_dir), UCS_CONFIG_TYPE_STRING}, + {"PROTO_VARIANTS", "y", + "Enable multiple variants of UCP protocols, meaning that a single protocol\n" + "may have multiple variants (optimized for latency or bandwidth) for the same\n" + "operation. The value is interpreted as follows:\n" + " 'y' : Enable multiple variants\n" + " 'n' : Disable multiple variants\n", + ucs_offsetof(ucp_context_config_t, proto_variants_enable), UCS_CONFIG_TYPE_BOOL}, + {"REG_NONBLOCK_MEM_TYPES", "", "Perform only non-blocking memory registration for these memory types.\n" "Non-blocking registration means that the page registration may be\n" diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h index f4c1e365d42..e1474ff62d5 100644 --- a/src/ucp/core/ucp_context.h +++ b/src/ucp/core/ucp_context.h @@ -184,6 +184,8 @@ typedef struct ucp_context_config { char *select_distance_md; /** Directory to write protocol selection information */ char *proto_info_dir; + /** Enable multiple variants of UCP protocols */ + int proto_variants_enable; /** Memory types that perform non-blocking registration by default */ uint64_t reg_nb_mem_types; /** Prefer native RMA transports for RMA/AMO protocols */ From bc5fcdd46094810b07fe5fec2337afb785fad01b Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Mon, 28 Jul 2025 09:51:58 +0000 Subject: [PATCH 02/18] UCP: Added protocol variant enum --- 
src/ucp/proto/proto_common.h | 19 +++++++++++++++++++ src/ucp/proto/proto_common.inl | 22 ++++++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/src/ucp/proto/proto_common.h b/src/ucp/proto/proto_common.h index cd421a13852..428e16b6047 100644 --- a/src/ucp/proto/proto_common.h +++ b/src/ucp/proto/proto_common.h @@ -196,6 +196,25 @@ typedef struct { } ucp_proto_common_lane_priv_t; +/** + * Protocol selection variants macro, used to iterate over all variants. + * The second argument is the message size for the variant, which is used to + * calculate the score for the variant. The third argument is the name of the + * variant, which is used to print the variant name. + */ +#define UCP_FOREACH_PROTO_VARIANT(_macro) \ + _macro(UCP_PROTO_VARIANT_LAT, UCS_KBYTE, "lat") \ + _macro(UCP_PROTO_VARIANT_BW, UCS_GBYTE, "bw") + +/** + * Protocol selection variant enum + */ +#define UCP_PROTO_VARIANT_ENUMIFY(ID, MSG_SIZE, NAME) ID, +typedef enum { + UCP_FOREACH_PROTO_VARIANT(UCP_PROTO_VARIANT_ENUMIFY) +} ucp_proto_variant_t; + + /** * Called the first time the protocol starts sending a request, and only once * per request. 
diff --git a/src/ucp/proto/proto_common.inl b/src/ucp/proto/proto_common.inl index c544a5c8be0..b5697275c05 100644 --- a/src/ucp/proto/proto_common.inl +++ b/src/ucp/proto/proto_common.inl @@ -407,4 +407,26 @@ ucp_proto_common_get_dev_index(const ucp_proto_init_params_t *params, return params->worker->context->tl_rscs[rsc_index].dev_index; } +static UCS_F_ALWAYS_INLINE size_t +ucp_proto_common_get_variant_msg_size(ucp_proto_variant_t variant) +{ +#define UCP_PROTO_VARIANT_IT(ID, MSG_SIZE, _) case ID: return MSG_SIZE; + switch (variant) { + UCP_FOREACH_PROTO_VARIANT(UCP_PROTO_VARIANT_IT) + default: ucs_assert_always(0); + } +#undef UCP_PROTO_VARIANT_IT +} + +static UCS_F_ALWAYS_INLINE const char* +ucp_proto_common_get_variant_name(ucp_proto_variant_t variant) +{ +#define UCP_PROTO_VARIANT_IT(ID, _, NAME) case ID: return NAME; + switch (variant) { + UCP_FOREACH_PROTO_VARIANT(UCP_PROTO_VARIANT_IT) + default: ucs_assert_always(0); + } +#undef UCP_PROTO_VARIANT_IT +} + #endif From cae63ff08cd883354d6e1c087e101f5fa0648d1f Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Mon, 28 Jul 2025 11:35:15 +0000 Subject: [PATCH 03/18] UCP: Sorting by score, not BW --- src/ucp/proto/proto_multi.c | 58 ++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/src/ucp/proto/proto_multi.c b/src/ucp/proto/proto_multi.c index 66e96c1a505..5bae284129c 100644 --- a/src/ucp/proto/proto_multi.c +++ b/src/ucp/proto/proto_multi.c @@ -56,17 +56,25 @@ ucp_proto_multi_get_avail_bw(const ucp_proto_init_params_t *params, return lane_perf->bandwidth * ratio; } +static UCS_F_ALWAYS_INLINE double +ucp_proto_multi_get_score(double bw, const ucp_proto_common_tl_perf_t *lane_perf, + ucp_proto_variant_t variant) +{ + size_t msg_size = ucp_proto_common_get_variant_msg_size(variant); + return 1.0 / ((msg_size / bw) + lane_perf->latency); +} + static ucp_lane_index_t -ucp_proto_multi_find_max_avail_bw_lane(const ucp_proto_init_params_t *params, - const 
ucp_lane_index_t *lanes, - const ucp_proto_common_tl_perf_t *lanes_perf, - const ucp_proto_lane_selection_t *selection, - ucp_lane_map_t index_map) +ucp_proto_multi_find_max_score_lane(const ucp_proto_init_params_t *params, + const ucp_lane_index_t *lanes, + const ucp_proto_common_tl_perf_t *lanes_perf, + const ucp_proto_lane_selection_t *selection, + ucp_lane_map_t index_map, + ucp_proto_variant_t variant) { - /* Initial value is 1Bps, so we don't consider lanes with lower available - * bandwidth. */ - double max_avail_bw = 1.0; + double max_score = 0.0; ucp_lane_index_t max_index = UCP_NULL_LANE; + double score; double avail_bw; const ucp_proto_common_tl_perf_t *lane_perf; ucp_lane_index_t lane, index; @@ -77,9 +85,10 @@ ucp_proto_multi_find_max_avail_bw_lane(const ucp_proto_init_params_t *params, lane_perf = &lanes_perf[lane]; avail_bw = ucp_proto_multi_get_avail_bw(params, lane, lane_perf, selection); - if (avail_bw > max_avail_bw) { - max_avail_bw = avail_bw; - max_index = index; + score = ucp_proto_multi_get_score(avail_bw, lane_perf, variant); + if (score > max_score) { + max_score = score; + max_index = index; } } @@ -102,13 +111,14 @@ ucp_proto_select_add_lane(ucp_proto_lane_selection_t *selection, } static void -ucp_proto_multi_select_bw_lanes(const ucp_proto_init_params_t *params, - const ucp_lane_index_t *lanes, - ucp_lane_index_t num_lanes, - ucp_lane_index_t max_lanes, - const ucp_proto_common_tl_perf_t *lanes_perf, - int fixed_first_lane, - ucp_proto_lane_selection_t *selection) +ucp_proto_multi_select_lanes(const ucp_proto_init_params_t *params, + const ucp_lane_index_t *lanes, + ucp_lane_index_t num_lanes, + ucp_lane_index_t max_lanes, + const ucp_proto_common_tl_perf_t *lanes_perf, + int fixed_first_lane, + ucp_proto_variant_t variant, + ucp_proto_lane_selection_t *selection) { ucp_lane_index_t i, lane_index; ucp_lane_map_t index_map; @@ -125,9 +135,9 @@ ucp_proto_multi_select_bw_lanes(const ucp_proto_init_params_t *params, for (i = 
fixed_first_lane? 1 : 0; i < ucs_min(max_lanes, num_lanes); ++i) { /* Greedy algorithm: find the best option at every step */ - lane_index = ucp_proto_multi_find_max_avail_bw_lane(params, lanes, - lanes_perf, selection, - index_map); + lane_index = ucp_proto_multi_find_max_score_lane(params, lanes, + lanes_perf, selection, + index_map, variant); if (lane_index == UCP_NULL_LANE) { break; } @@ -242,9 +252,9 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, } num_lanes = num_fast_lanes; - ucp_proto_multi_select_bw_lanes(¶ms->super.super, lanes, num_lanes, - params->max_lanes, lanes_perf, - fixed_first_lane, &selection); + ucp_proto_multi_select_lanes(¶ms->super.super, lanes, num_lanes, + params->max_lanes, lanes_perf, + fixed_first_lane, UCP_PROTO_VARIANT_BW, &selection); ucs_trace("selected %u lanes for %s", selection.num_lanes, ucp_proto_id_field(params->super.super.proto_id, name)); From 1a340c3f7d47ac7cbf43f89ccd7acb3e853e6e94 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Thu, 27 Feb 2025 18:00:23 +0000 Subject: [PATCH 04/18] UCP/PROTO: Added node to tl_perf struct --- src/ucp/proto/proto_common.c | 15 +++++++-------- src/ucp/proto/proto_common.h | 6 ++++-- src/ucp/proto/proto_init.c | 8 +++----- src/ucp/proto/proto_init.h | 1 - src/ucp/proto/proto_multi.c | 22 +++++++++------------- src/ucp/proto/proto_single.c | 10 ++++------ 6 files changed, 27 insertions(+), 35 deletions(-) diff --git a/src/ucp/proto/proto_common.c b/src/ucp/proto/proto_common.c index 4adb8f3b98a..22d3a511936 100644 --- a/src/ucp/proto/proto_common.c +++ b/src/ucp/proto/proto_common.c @@ -235,7 +235,7 @@ ucp_proto_common_get_frag_size(const ucp_proto_common_init_params_t *params, /* Update 'perf' with the distance */ static void ucp_proto_common_update_lane_perf_by_distance( - ucp_proto_common_tl_perf_t *perf, ucp_proto_perf_node_t *perf_node, + ucp_proto_common_tl_perf_t *perf, const ucs_sys_dev_distance_t *distance, const char *perf_name, const char 
*perf_fmt, ...) { @@ -261,7 +261,7 @@ static void ucp_proto_common_update_lane_perf_by_distance( sys_perf_node = ucp_proto_perf_node_new_data(perf_name, "%s", perf_node_desc); ucp_proto_perf_node_add_data(sys_perf_node, "", distance_func); - ucp_proto_perf_node_own_child(perf_node, &sys_perf_node); + ucp_proto_perf_node_own_child(perf->node, &sys_perf_node); } void ucp_proto_common_lane_perf_node(ucp_context_h context, @@ -317,6 +317,7 @@ static void ucp_proto_common_tl_perf_reset(ucp_proto_common_tl_perf_t *tl_perf) tl_perf->sys_latency = 0; tl_perf->min_length = 0; tl_perf->max_frag = SIZE_MAX; + tl_perf->node = NULL; } static void ucp_proto_common_perf_attr_set_mem_type( @@ -337,8 +338,7 @@ static void ucp_proto_common_perf_attr_set_mem_type( ucs_status_t ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, ucp_lane_index_t lane, - ucp_proto_common_tl_perf_t *tl_perf, - ucp_proto_perf_node_t **perf_node_p) + ucp_proto_common_tl_perf_t *tl_perf) { ucp_worker_h worker = params->super.worker; ucp_context_h context = worker->context; @@ -356,7 +356,6 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, if (lane == UCP_NULL_LANE) { ucp_proto_common_tl_perf_reset(tl_perf); - *perf_node_p = NULL; return UCS_OK; } @@ -423,7 +422,7 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, ucp_proto_common_get_lane_distance(¶ms->super, lane, sys_dev, &distance); ucp_proto_common_update_lane_perf_by_distance( - tl_perf, perf_node, &distance, "local system", "%s %s", + tl_perf, &distance, "local system", "%s %s", ucs_topo_sys_device_get_name(sys_dev), ucs_topo_sys_device_bdf_name(sys_dev, bdf_name, sizeof(bdf_name))); @@ -437,7 +436,7 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, rkey_config = &worker->rkey_config[params->super.rkey_cfg_index]; distance = rkey_config->lanes_distance[lane]; ucp_proto_common_update_lane_perf_by_distance( - tl_perf, perf_node, &distance, 
"remote system", "sys-dev %d %s", + tl_perf, &distance, "remote system", "sys-dev %d %s", rkey_config->key.sys_dev, ucs_memory_type_names[rkey_config->key.mem_type]); } @@ -460,7 +459,7 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, tl_perf->send_post_overhead); ucp_proto_perf_node_add_scalar(perf_node, "recv", tl_perf->recv_overhead); - *perf_node_p = perf_node; + tl_perf->node = perf_node; return UCS_OK; err_deref_perf_node: diff --git a/src/ucp/proto/proto_common.h b/src/ucp/proto/proto_common.h index 428e16b6047..5c7cc7132ae 100644 --- a/src/ucp/proto/proto_common.h +++ b/src/ucp/proto/proto_common.h @@ -176,6 +176,9 @@ typedef struct { /* Maximum single message length */ size_t max_frag; + + /* Performance selection tree node */ + ucp_proto_perf_node_t *node; } ucp_proto_common_tl_perf_t; @@ -298,8 +301,7 @@ void ucp_proto_common_lane_perf_node(ucp_context_h context, ucs_status_t ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, ucp_lane_index_t lane, - ucp_proto_common_tl_perf_t *perf, - ucp_proto_perf_node_t **perf_node_p); + ucp_proto_common_tl_perf_t *perf); typedef int (*ucp_proto_common_filter_lane_cb_t)( diff --git a/src/ucp/proto/proto_init.c b/src/ucp/proto/proto_init.c index 724d99fcff2..bce2884f868 100644 --- a/src/ucp/proto/proto_init.c +++ b/src/ucp/proto/proto_init.c @@ -129,7 +129,6 @@ ucp_proto_init_skip_recv_overhead(const ucp_proto_common_init_params_t *params, static ucs_status_t ucp_proto_init_add_tl_perf(const ucp_proto_common_init_params_t *params, const ucp_proto_common_tl_perf_t *tl_perf, - ucp_proto_perf_node_t *const tl_perf_node, size_t range_start, size_t range_end, ucp_proto_perf_t *perf) { @@ -185,7 +184,7 @@ ucp_proto_init_add_tl_perf(const ucp_proto_common_init_params_t *params, return ucp_proto_perf_add_funcs(perf, range_start, range_end, perf_factors, ucp_proto_perf_node_new_data("transport", ""), - tl_perf_node); + tl_perf->node); } /** @@ -503,7 +502,6 @@ 
ucp_proto_common_check_mem_access(const ucp_proto_common_init_params_t *params) ucs_status_t ucp_proto_init_perf(const ucp_proto_common_init_params_t *params, const ucp_proto_common_tl_perf_t *tl_perf, - ucp_proto_perf_node_t *const tl_perf_node, ucp_md_map_t reg_md_map, const char *perf_name, ucp_proto_perf_t **perf_p) { @@ -532,8 +530,8 @@ ucs_status_t ucp_proto_init_perf(const ucp_proto_common_init_params_t *params, return status; } - status = ucp_proto_init_add_tl_perf(params, tl_perf, tl_perf_node, - range_start, range_end, perf); + status = ucp_proto_init_add_tl_perf(params, tl_perf, range_start, range_end, + perf); if (status != UCS_OK) { goto err_cleanup_perf; } diff --git a/src/ucp/proto/proto_init.h b/src/ucp/proto/proto_init.h index 4bff6472b01..8ff0c1516b2 100644 --- a/src/ucp/proto/proto_init.h +++ b/src/ucp/proto/proto_init.h @@ -68,7 +68,6 @@ ucp_proto_init_add_buffer_copy_time(ucp_worker_h worker, const char *title, ucs_status_t ucp_proto_init_perf(const ucp_proto_common_init_params_t *params, const ucp_proto_common_tl_perf_t *tl_perf, - ucp_proto_perf_node_t *const tl_perf_node, ucp_md_map_t reg_md_map, const char *perf_name, ucp_proto_perf_t **perf_p); diff --git a/src/ucp/proto/proto_multi.c b/src/ucp/proto/proto_multi.c index 5bae284129c..812ccecb618 100644 --- a/src/ucp/proto/proto_multi.c +++ b/src/ucp/proto/proto_multi.c @@ -158,14 +158,12 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, { ucp_context_h context = params->super.super.worker->context; const double max_bw_ratio = context->config.ext.multi_lane_max_ratio; - ucp_proto_perf_node_t *lanes_perf_nodes[UCP_PROTO_MAX_LANES]; ucp_proto_common_tl_perf_t lanes_perf[UCP_PROTO_MAX_LANES]; ucp_proto_common_tl_perf_t *lane_perf, perf; ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; double max_bandwidth, max_frag_ratio, min_bandwidth; ucp_lane_index_t i, lane, num_lanes, num_fast_lanes; ucp_proto_multi_lane_priv_t *lpriv; - ucp_proto_perf_node_t *perf_node; size_t 
max_frag, min_length, min_end_offset, min_chunk; ucp_proto_lane_selection_t selection; ucp_md_map_t reg_md_map; @@ -213,8 +211,7 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, lane = lanes[i]; lane_perf = &lanes_perf[lane]; - status = ucp_proto_common_get_lane_perf(¶ms->super, lane, lane_perf, - &lanes_perf_nodes[lane]); + status = ucp_proto_common_get_lane_perf(¶ms->super, lane, lane_perf); if (status != UCS_OK) { return status; } @@ -389,27 +386,26 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, } ucs_assert(mpriv->num_lanes == ucs_popcount(selection.lane_map)); - /* After this block, 'perf_node' and 'lane_perf_nodes[]' have extra ref */ if (mpriv->num_lanes == 1) { - perf_node = lanes_perf_nodes[ucs_ffs64(selection.lane_map)]; - ucp_proto_perf_node_ref(perf_node); + perf.node = lanes_perf[ucs_ffs64(selection.lane_map)].node; + ucp_proto_perf_node_ref(perf.node); } else { - perf_node = ucp_proto_perf_node_new_data("multi", "%u lanes", + perf.node = ucp_proto_perf_node_new_data("multi", "%u lanes", mpriv->num_lanes); ucs_for_each_bit(lane, selection.lane_map) { ucs_assert(lane < UCP_MAX_LANES); - ucp_proto_perf_node_add_child(perf_node, lanes_perf_nodes[lane]); + ucp_proto_perf_node_add_child(perf.node, lanes_perf[lane].node); } } - status = ucp_proto_init_perf(¶ms->super, &perf, perf_node, reg_md_map, - perf_name, perf_p); + status = ucp_proto_init_perf(¶ms->super, &perf, reg_md_map, perf_name, + perf_p); /* Deref unused nodes */ for (i = 0; i < num_lanes; ++i) { - ucp_proto_perf_node_deref(&lanes_perf_nodes[lanes[i]]); + ucp_proto_perf_node_deref(&lanes_perf[lanes[i]].node); } - ucp_proto_perf_node_deref(&perf_node); + ucp_proto_perf_node_deref(&perf.node); return status; } diff --git a/src/ucp/proto/proto_single.c b/src/ucp/proto/proto_single.c index 2441cc08475..866d62771c0 100644 --- a/src/ucp/proto/proto_single.c +++ b/src/ucp/proto/proto_single.c @@ -24,7 +24,6 @@ ucs_status_t 
ucp_proto_single_init(const ucp_proto_single_init_params_t *params, { const char *proto_name = ucp_proto_id_field(params->super.super.proto_id, name); - ucp_proto_perf_node_t *tl_perf_node; ucp_proto_common_tl_perf_t tl_perf; ucp_lane_index_t num_lanes; ucp_md_map_t reg_md_map; @@ -57,15 +56,14 @@ ucs_status_t ucp_proto_single_init(const ucp_proto_single_init_params_t *params, ucp_proto_common_lane_priv_init(¶ms->super, reg_md_map, lane, &spriv->super); - status = ucp_proto_common_get_lane_perf(¶ms->super, lane, &tl_perf, - &tl_perf_node); + status = ucp_proto_common_get_lane_perf(¶ms->super, lane, &tl_perf); if (status != UCS_OK) { return status; } - status = ucp_proto_init_perf(¶ms->super, &tl_perf, tl_perf_node, - reg_md_map, proto_name, perf_p); - ucp_proto_perf_node_deref(&tl_perf_node); + status = ucp_proto_init_perf(¶ms->super, &tl_perf, reg_md_map, + proto_name, perf_p); + ucp_proto_perf_node_deref(&tl_perf.node); return status; } From 9af95c5198c4cd1bb94e2c687a57338f81eb5d39 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Fri, 28 Feb 2025 05:43:44 +0000 Subject: [PATCH 05/18] UCP/PROTO: Fixed node issue --- src/ucp/proto/proto_common.c | 26 ++++++++++++-------------- src/ucp/proto/proto_multi.c | 2 +- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/ucp/proto/proto_common.c b/src/ucp/proto/proto_common.c index 22d3a511936..1b1252cf6fa 100644 --- a/src/ucp/proto/proto_common.c +++ b/src/ucp/proto/proto_common.c @@ -345,7 +345,7 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, ucp_rsc_index_t rsc_index = ucp_proto_common_get_rsc_index(¶ms->super, lane); ucp_worker_iface_t *wiface = ucp_worker_iface(worker, rsc_index); - ucp_proto_perf_node_t *perf_node, *lane_perf_node; + ucp_proto_perf_node_t *lane_perf_node; const ucp_rkey_config_t *rkey_config; ucs_sys_dev_distance_t distance; size_t tl_min_frag, tl_max_frag; @@ -369,9 +369,9 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t 
*params, return UCS_ERR_INVALID_PARAM; } - perf_node = ucp_proto_perf_node_new_data("lane", "%u ppn %u eps", - context->config.est_num_ppn, - context->config.est_num_eps); + tl_perf->node = ucp_proto_perf_node_new_data("lane", "%u ppn %u eps", + context->config.est_num_ppn, + context->config.est_num_eps); perf_attr.field_mask = UCT_PERF_ATTR_FIELD_OPERATION | UCT_PERF_ATTR_FIELD_SEND_PRE_OVERHEAD | @@ -405,7 +405,7 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, ucp_proto_common_lane_perf_node(context, rsc_index, &perf_attr, &lane_perf_node); - ucp_proto_perf_node_own_child(perf_node, &lane_perf_node); + ucp_proto_perf_node_own_child(tl_perf->node, &lane_perf_node); /* If reg_mem_info type is not unknown we assume the protocol is going to * send that mem type in a zero copy fashion. So, need to consider the @@ -450,20 +450,18 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, params->hdr_size); ucs_assert(tl_perf->sys_latency >= 0); - ucp_proto_perf_node_add_bandwidth(perf_node, "bw", tl_perf->bandwidth); - ucp_proto_perf_node_add_scalar(perf_node, "lat", tl_perf->latency); - ucp_proto_perf_node_add_scalar(perf_node, "sys-lat", tl_perf->sys_latency); - ucp_proto_perf_node_add_scalar(perf_node, "send-pre", + ucp_proto_perf_node_add_bandwidth(tl_perf->node, "bw", tl_perf->bandwidth); + ucp_proto_perf_node_add_scalar(tl_perf->node, "lat", tl_perf->latency); + ucp_proto_perf_node_add_scalar(tl_perf->node, "sys-lat", tl_perf->sys_latency); + ucp_proto_perf_node_add_scalar(tl_perf->node, "send-pre", tl_perf->send_pre_overhead); - ucp_proto_perf_node_add_scalar(perf_node, "send-post", + ucp_proto_perf_node_add_scalar(tl_perf->node, "send-post", tl_perf->send_post_overhead); - ucp_proto_perf_node_add_scalar(perf_node, "recv", tl_perf->recv_overhead); - - tl_perf->node = perf_node; + ucp_proto_perf_node_add_scalar(tl_perf->node, "recv", tl_perf->recv_overhead); return UCS_OK; err_deref_perf_node: - 
ucp_proto_perf_node_deref(&perf_node); + ucp_proto_perf_node_deref(&tl_perf->node); return status; } diff --git a/src/ucp/proto/proto_multi.c b/src/ucp/proto/proto_multi.c index 812ccecb618..3238cf2cda2 100644 --- a/src/ucp/proto/proto_multi.c +++ b/src/ucp/proto/proto_multi.c @@ -238,7 +238,7 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, if ((lane_perf->bandwidth * max_bw_ratio) < max_bandwidth) { /* Bandwidth on this lane is too low compared to the fastest available lane, so it's not worth using it */ - ucp_proto_perf_node_deref(&lanes_perf_nodes[lane]); + ucp_proto_perf_node_deref(&lanes_perf[lane].node); ucs_trace("drop " UCP_PROTO_LANE_FMT, UCP_PROTO_LANE_ARG(¶ms->super.super, lane, lane_perf)); } else { From c090af1752f195844a229eaf36788a644b375d7e Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Tue, 29 Jul 2025 05:13:48 +0000 Subject: [PATCH 06/18] UCP/PROTO: Selection perf aggregation --- src/ucp/proto/proto_common.h | 9 ++- src/ucp/proto/proto_multi.c | 139 ++++++++++++++++++++++------------- 2 files changed, 93 insertions(+), 55 deletions(-) diff --git a/src/ucp/proto/proto_common.h b/src/ucp/proto/proto_common.h index 5c7cc7132ae..7d23822bd88 100644 --- a/src/ucp/proto/proto_common.h +++ b/src/ucp/proto/proto_common.h @@ -183,10 +183,11 @@ typedef struct { typedef struct { - ucp_lane_map_t lane_map; - ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; - ucp_lane_index_t num_lanes; - uint8_t dev_count[UCP_MAX_RESOURCES]; + ucp_lane_map_t lane_map; + ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; + ucp_lane_index_t num_lanes; + uint8_t dev_count[UCP_MAX_RESOURCES]; + ucp_proto_common_tl_perf_t perf; } ucp_proto_lane_selection_t; diff --git a/src/ucp/proto/proto_multi.c b/src/ucp/proto/proto_multi.c index 3238cf2cda2..c2b1459dcc8 100644 --- a/src/ucp/proto/proto_multi.c +++ b/src/ucp/proto/proto_multi.c @@ -110,6 +110,84 @@ ucp_proto_select_add_lane(ucp_proto_lane_selection_t *selection, selection->dev_count[dev_index]++; } 
+static void +ucp_proto_select_aggregate_perf(ucp_proto_lane_selection_t *selection, + const ucp_proto_common_tl_perf_t *lanes_perf) +{ + ucp_proto_common_tl_perf_t *perf = &selection->perf; + const ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t lane; + + /* TODO: Adjust performance estimation based on the actual resource usage, + * i.e. split the overall iface BW between all selected paths. */ + + ucs_for_each_bit(lane, selection->lane_map) { + lane_perf = &lanes_perf[lane]; + + /* Update aggregated performance metric */ + perf->bandwidth += lane_perf->bandwidth; + perf->send_pre_overhead += lane_perf->send_pre_overhead; + perf->send_post_overhead += lane_perf->send_post_overhead; + perf->recv_overhead += lane_perf->recv_overhead; + perf->latency = ucs_max(perf->latency, lane_perf->latency); + perf->sys_latency = ucs_max(perf->sys_latency, + lane_perf->sys_latency); + } + + if (selection->num_lanes == 1) { + lane_perf = &lanes_perf[selection->lanes[0]]; + ucp_proto_perf_node_ref(lane_perf->node); + selection->perf.node = lane_perf->node; + } else { + selection->perf.node = ucp_proto_perf_node_new_data("multi", "%u lanes", + selection->num_lanes); + ucs_for_each_bit(lane, selection->lane_map) { + lane_perf = &lanes_perf[lane]; + ucp_proto_perf_node_add_child(selection->perf.node, lane_perf->node); + } + } +} + +static void +ucp_proto_select_trace_lanes(const ucp_proto_init_params_t *params, + const ucp_proto_lane_selection_t *selection, + const ucp_proto_common_tl_perf_t *lanes_perf, + const char *desc) +{ + ucp_context_h UCS_V_UNUSED context = params->worker->context; + const ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t i, lane; + ucp_rsc_index_t rsc_index; + + if (!ucs_log_is_enabled(UCS_LOG_LEVEL_DEBUG)) { + return; + } + + ucs_debug("%s=%u, protocol=%s", desc, selection->num_lanes, + ucp_proto_id_field(params->proto_id, name)); + ucs_log_indent(1); + + for (i = 0; i < selection->num_lanes; ++i) { + lane = selection->lanes[i]; + lane_perf 
= &lanes_perf[lane]; + rsc_index = ucp_proto_common_get_rsc_index(params, lane); + + ucs_debug("lane[%d] " UCT_TL_RESOURCE_DESC_FMT " bw " + UCP_PROTO_PERF_FUNC_BW_FMT UCP_PROTO_TIME_FMT(latency) + UCP_PROTO_TIME_FMT(send_pre_overhead) + UCP_PROTO_TIME_FMT(send_post_overhead) + UCP_PROTO_TIME_FMT(recv_overhead), lane, + UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_index].tl_rsc), + (lane_perf->bandwidth / UCS_MBYTE), + UCP_PROTO_TIME_ARG(lane_perf->latency), + UCP_PROTO_TIME_ARG(lane_perf->send_pre_overhead), + UCP_PROTO_TIME_ARG(lane_perf->send_post_overhead), + UCP_PROTO_TIME_ARG(lane_perf->recv_overhead)); + } + + ucs_log_indent(-1); +} + static void ucp_proto_multi_select_lanes(const ucp_proto_init_params_t *params, const ucp_lane_index_t *lanes, @@ -145,10 +223,6 @@ ucp_proto_multi_select_lanes(const ucp_proto_init_params_t *params, ucp_proto_select_add_lane(selection, params, lanes[lane_index]); index_map &= ~UCS_BIT(lane_index); } - - /* TODO: Aggregate performance: - * Split full iface bandwidth between selected paths, according to the total - * path ratio */ } ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, @@ -159,7 +233,7 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, ucp_context_h context = params->super.super.worker->context; const double max_bw_ratio = context->config.ext.multi_lane_max_ratio; ucp_proto_common_tl_perf_t lanes_perf[UCP_PROTO_MAX_LANES]; - ucp_proto_common_tl_perf_t *lane_perf, perf; + ucp_proto_common_tl_perf_t *lane_perf; ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; double max_bandwidth, max_frag_ratio, min_bandwidth; ucp_lane_index_t i, lane, num_lanes, num_fast_lanes; @@ -221,12 +295,6 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, } /* Select the lanes to use, and calculate their aggregate performance */ - perf.bandwidth = 0; - perf.send_pre_overhead = 0; - perf.send_post_overhead = 0; - perf.recv_overhead = 0; - perf.latency = 0; 
- perf.sys_latency = 0; max_frag_ratio = 0; min_bandwidth = DBL_MAX; @@ -253,20 +321,13 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, params->max_lanes, lanes_perf, fixed_first_lane, UCP_PROTO_VARIANT_BW, &selection); - ucs_trace("selected %u lanes for %s", selection.num_lanes, - ucp_proto_id_field(params->super.super.proto_id, name)); - ucs_log_indent(1); + ucp_proto_select_aggregate_perf(&selection, lanes_perf); + ucp_proto_select_trace_lanes(¶ms->super.super, &selection, lanes_perf, + ucp_proto_common_get_variant_name(UCP_PROTO_VARIANT_BW)); for (i = 0; i < selection.num_lanes; ++i) { lane = selection.lanes[i]; lane_perf = &lanes_perf[lane]; - ucs_trace(UCP_PROTO_LANE_FMT UCP_PROTO_TIME_FMT(send_pre_overhead) - UCP_PROTO_TIME_FMT(send_post_overhead) - UCP_PROTO_TIME_FMT(recv_overhead), - UCP_PROTO_LANE_ARG(¶ms->super.super, lane, lane_perf), - UCP_PROTO_TIME_ARG(lane_perf->send_pre_overhead), - UCP_PROTO_TIME_ARG(lane_perf->send_post_overhead), - UCP_PROTO_TIME_ARG(lane_perf->recv_overhead)); /* Calculate maximal bandwidth-to-fragment-size ratio, which is used to adjust fragment sizes so they are proportional to bandwidth ratio and @@ -275,19 +336,8 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, lane_perf->bandwidth / lane_perf->max_frag); min_bandwidth = ucs_min(min_bandwidth, lane_perf->bandwidth); - - /* Update aggregated performance metric */ - perf.bandwidth += lane_perf->bandwidth; - perf.send_pre_overhead += lane_perf->send_pre_overhead; - perf.send_post_overhead += lane_perf->send_post_overhead; - perf.recv_overhead += lane_perf->recv_overhead; - perf.latency = ucs_max(perf.latency, lane_perf->latency); - perf.sys_latency = ucs_max(perf.sys_latency, - lane_perf->sys_latency); } - ucs_log_indent(-1); - /* Initialize multi-lane private data and relative weights */ reg_md_map = ucp_proto_common_reg_md_map(¶ms->super, selection.lane_map); @@ -297,8 +347,6 @@ ucs_status_t 
ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, mpriv->min_frag = 0; mpriv->max_frag_sum = 0; mpriv->align_thresh = 1; - perf.max_frag = 0; - perf.min_length = 0; weight_sum = 0; min_end_offset = 0; @@ -328,11 +376,11 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, params->min_chunk / min_bandwidth)); max_frag = ucs_max(max_frag, min_chunk); lpriv->max_frag = max_frag; - perf.max_frag += max_frag; + selection.perf.max_frag += max_frag; /* Calculate lane weight as a fixed-point fraction */ lpriv->weight = ucs_proto_multi_calc_weight(lane_perf->bandwidth, - perf.bandwidth); + selection.perf.bandwidth); ucs_assert(lpriv->weight > 0); ucs_assert(lpriv->weight <= UCP_PROTO_MULTI_WEIGHT_MAX); @@ -372,7 +420,7 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, lpriv->weight; ucs_assert(ucp_proto_multi_scaled_length(lpriv->weight, min_length) >= lane_perf->min_length); - perf.min_length = ucs_max(perf.min_length, min_length); + selection.perf.min_length = ucs_max(selection.perf.min_length, min_length); weight_sum += lpriv->weight; min_end_offset += min_chunk; @@ -386,26 +434,15 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, } ucs_assert(mpriv->num_lanes == ucs_popcount(selection.lane_map)); - if (mpriv->num_lanes == 1) { - perf.node = lanes_perf[ucs_ffs64(selection.lane_map)].node; - ucp_proto_perf_node_ref(perf.node); - } else { - perf.node = ucp_proto_perf_node_new_data("multi", "%u lanes", - mpriv->num_lanes); - ucs_for_each_bit(lane, selection.lane_map) { - ucs_assert(lane < UCP_MAX_LANES); - ucp_proto_perf_node_add_child(perf.node, lanes_perf[lane].node); - } - } + status = ucp_proto_init_perf(¶ms->super, &selection.perf, reg_md_map, + perf_name, perf_p); - status = ucp_proto_init_perf(¶ms->super, &perf, reg_md_map, perf_name, - perf_p); + ucp_proto_perf_node_deref(&selection.perf.node); /* Deref unused nodes */ for (i = 0; i < num_lanes; ++i) { 
ucp_proto_perf_node_deref(&lanes_perf[lanes[i]].node); } - ucp_proto_perf_node_deref(&perf.node); return status; } From 44cf07d54aacaefbd6a4b802dfbee5399e5cc799 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Tue, 29 Jul 2025 10:30:42 +0000 Subject: [PATCH 07/18] UCP/PROTO: Moved common logic to proto_common --- src/ucp/proto/proto_common.c | 193 +++++++++++++++++++++++++++++++++++ src/ucp/proto/proto_common.h | 55 ++++++---- src/ucp/proto/proto_multi.c | 177 +------------------------------- 3 files changed, 234 insertions(+), 191 deletions(-) diff --git a/src/ucp/proto/proto_common.c b/src/ucp/proto/proto_common.c index 1b1252cf6fa..5e8fdb68419 100644 --- a/src/ucp/proto/proto_common.c +++ b/src/ucp/proto/proto_common.c @@ -926,3 +926,196 @@ void ucp_proto_fatal_invalid_stage(ucp_request_t *req, const char *func_name) req->send.proto_config->proto->name, req->send.proto_stage, func_name); } + +static UCS_F_ALWAYS_INLINE double +ucp_proto_select_get_avail_bw(const ucp_proto_init_params_t *params, + ucp_lane_index_t lane, + const ucp_proto_common_tl_perf_t *lane_perf, + const ucp_proto_lane_selection_t *selection) +{ + /* Minimal path ratio */ + static const double MIN_RATIO = 0.01; + ucp_context_h context = params->worker->context; + double multi_path_ratio = context->config.ext.multi_path_ratio; + ucp_rsc_index_t dev_index = ucp_proto_common_get_dev_index(params, lane); + uint8_t path_index = selection->dev_count[dev_index]; + double ratio; + + if (UCS_CONFIG_DBL_IS_AUTO(multi_path_ratio)) { + ratio = ucs_min(1.0 - (lane_perf->path_ratio * path_index), + lane_perf->path_ratio); + } else { + ratio = 1.0 - (multi_path_ratio * path_index); + } + + if (ratio < MIN_RATIO) { + /* The iface BW is entirely consumed by the selected paths. But we still + * need to assign some minimal BW to these extra paths in order to + * select them. We divide min ratio of the iface BW by path_index so + * that each additional path on the same device has lower bandwidth. 
*/ + ratio = MIN_RATIO / path_index; + } + + ucs_trace("ratio=%0.3f path_index=%u avail_bw=" UCP_PROTO_PERF_FUNC_BW_FMT + " " UCP_PROTO_LANE_FMT, ratio, path_index, + (lane_perf->bandwidth * ratio) / UCS_MBYTE, + UCP_PROTO_LANE_ARG(params, lane, lane_perf)); + + return lane_perf->bandwidth * ratio; +} + +static UCS_F_ALWAYS_INLINE double +ucp_proto_select_get_score(double bw, const ucp_proto_common_tl_perf_t *lane_perf, + ucp_proto_variant_t variant) +{ + size_t msg_size = ucp_proto_common_get_variant_msg_size(variant); + return 1.0 / ((msg_size / bw) + lane_perf->latency); +} + +static ucp_lane_index_t +ucp_proto_select_find_max_score_lane(const ucp_proto_init_params_t *params, + const ucp_lane_index_t *lanes, + const ucp_proto_common_tl_perf_t *lanes_perf, + const ucp_proto_lane_selection_t *selection, + ucp_lane_map_t index_map) +{ + double max_score = 0.0; + ucp_lane_index_t max_index = UCP_NULL_LANE; + double score; + double avail_bw; + const ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t lane, index; + + ucs_assert(index_map != 0); + ucs_for_each_bit(index, index_map) { + lane = lanes[index]; + lane_perf = &lanes_perf[lane]; + avail_bw = ucp_proto_select_get_avail_bw(params, lane, lane_perf, + selection); + score = ucp_proto_select_get_score(avail_bw, lane_perf, + selection->variant); + if (score > max_score) { + max_score = score; + max_index = index; + } + } + + return max_index; +} + +static UCS_F_ALWAYS_INLINE void +ucp_proto_select_add_lane(ucp_proto_lane_selection_t *selection, + const ucp_proto_init_params_t *params, + ucp_lane_index_t lane) +{ + ucp_rsc_index_t dev_index = ucp_proto_common_get_dev_index(params, lane); + + ucs_assertv(selection->num_lanes < UCP_PROTO_MAX_LANES, + "selection num_lanes=%u max_lanes=%u", selection->num_lanes, + UCP_PROTO_MAX_LANES); + selection->lanes[selection->num_lanes++] = lane; + selection->lane_map |= UCS_BIT(lane); + selection->dev_count[dev_index]++; +} + +void +ucp_proto_select_lanes(const 
ucp_proto_init_params_t *params, + const ucp_lane_index_t *lanes, + ucp_lane_index_t num_lanes, ucp_lane_index_t max_lanes, + const ucp_proto_common_tl_perf_t *lanes_perf, + int fixed_first_lane, ucp_proto_variant_t variant, + ucp_proto_lane_selection_t *selection) +{ + ucp_lane_index_t i, lane_index; + ucp_lane_map_t index_map; + + memset(selection, 0, sizeof(*selection)); + selection->variant = variant; + + /* Select all available indexes */ + index_map = UCS_MASK(num_lanes); + + if (fixed_first_lane) { + ucp_proto_select_add_lane(selection, params, lanes[0]); + index_map &= ~UCS_BIT(0); + } + + for (i = fixed_first_lane? 1 : 0; i < ucs_min(max_lanes, num_lanes); ++i) { + /* Greedy algorithm: find the best option at every step */ + lane_index = ucp_proto_select_find_max_score_lane(params, lanes, + lanes_perf, selection, + index_map); + if (lane_index == UCP_NULL_LANE) { + break; + } + + ucp_proto_select_add_lane(selection, params, lanes[lane_index]); + index_map &= ~UCS_BIT(lane_index); + } +} + +void ucp_proto_select_trace(const ucp_proto_init_params_t *params, + const ucp_proto_lane_selection_t *selection, + const ucp_proto_common_tl_perf_t *lanes_perf, + const char *desc, ucs_log_level_t level) +{ + ucp_context_h UCS_V_UNUSED context = params->worker->context; + const ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t i, lane; + ucp_rsc_index_t rsc_index; + + if (!ucs_log_is_enabled(level)) { + return; + } + + ucs_log(level, "%s(%s)=%u, protocol=%s", desc, + ucp_proto_common_get_variant_name(selection->variant), + selection->num_lanes, ucp_proto_id_field(params->proto_id, name)); + ucs_log_indent(1); + + for (i = 0; i < selection->num_lanes; ++i) { + lane = selection->lanes[i]; + lane_perf = &lanes_perf[lane]; + rsc_index = ucp_proto_common_get_rsc_index(params, lane); + + ucs_log(level, "lane[%d] " UCT_TL_RESOURCE_DESC_FMT " bw " + UCP_PROTO_PERF_FUNC_BW_FMT UCP_PROTO_TIME_FMT(latency) + UCP_PROTO_TIME_FMT(send_pre_overhead) + 
UCP_PROTO_TIME_FMT(send_post_overhead) + UCP_PROTO_TIME_FMT(recv_overhead), lane, + UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_index].tl_rsc), + (lane_perf->bandwidth / UCS_MBYTE), + UCP_PROTO_TIME_ARG(lane_perf->latency), + UCP_PROTO_TIME_ARG(lane_perf->send_pre_overhead), + UCP_PROTO_TIME_ARG(lane_perf->send_post_overhead), + UCP_PROTO_TIME_ARG(lane_perf->recv_overhead)); + } + + ucs_log_indent(-1); +} + +size_t +ucp_proto_select_distinct(ucp_proto_lane_selection_t *selection, size_t count) +{ + size_t distinct = 1; + size_t i, j; + int unique; + ucs_assertv_always(count > 0 && count <= UCP_PROTO_VARIANT_LAST, + "count=%zu", count); + + for (i = 1; i < count; ++i) { + unique = 1; + for (j = 0; j < distinct; ++j) { + if (selection[i].lane_map == selection[j].lane_map) { + unique = 0; + break; + } + } + + /* Count every unique element; when i == distinct this is a harmless + * self-assignment. Gating on (i != distinct) would skip the increment + * for an in-place unique element, undercounting and later overwriting + * live entries. */ + if (unique) { + selection[distinct++] = selection[i]; + } + } + + return distinct; +} diff --git a/src/ucp/proto/proto_common.h b/src/ucp/proto/proto_common.h index 7d23822bd88..e1706a2dc48 100644 --- a/src/ucp/proto/proto_common.h +++ b/src/ucp/proto/proto_common.h @@ -182,24 +182,6 @@ typedef struct { } ucp_proto_common_tl_perf_t; -typedef struct { - ucp_lane_map_t lane_map; - ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; - ucp_lane_index_t num_lanes; - uint8_t dev_count[UCP_MAX_RESOURCES]; - ucp_proto_common_tl_perf_t perf; -} ucp_proto_lane_selection_t; - - -/* Private data per lane */ -typedef struct { - ucp_lane_index_t lane; /* Lane index in the endpoint */ - ucp_rsc_index_t md_index; /* Index of UCT memory handle (for zero copy) */ - ucp_md_index_t rkey_index; /* Remote key index (for remote access) */ - uint8_t max_iov; /* Maximal number of IOVs on this lane */ -} ucp_proto_common_lane_priv_t; - - /** * Protocol selection variants macro, used to iterate over all variants. 
* The second argument is the message size for the variant, which is used to @@ -216,9 +198,29 @@ typedef struct { #define UCP_PROTO_VARIANT_ENUMIFY(ID, MSG_SIZE, NAME) ID, typedef enum { UCP_FOREACH_PROTO_VARIANT(UCP_PROTO_VARIANT_ENUMIFY) + UCP_PROTO_VARIANT_LAST } ucp_proto_variant_t; +typedef struct { + ucp_lane_map_t lane_map; + ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; + ucp_lane_index_t num_lanes; + uint8_t dev_count[UCP_MAX_RESOURCES]; + ucp_proto_common_tl_perf_t perf; + ucp_proto_variant_t variant; +} ucp_proto_lane_selection_t; + + +/* Private data per lane */ +typedef struct { + ucp_lane_index_t lane; /* Lane index in the endpoint */ + ucp_rsc_index_t md_index; /* Index of UCT memory handle (for zero copy) */ + ucp_md_index_t rkey_index; /* Remote key index (for remote access) */ + uint8_t max_iov; /* Maximal number of IOVs on this lane */ +} ucp_proto_common_lane_priv_t; + + /** * Called the first time the protocol starts sending a request, and only once * per request. @@ -383,4 +385,21 @@ void ucp_proto_reset_fatal_not_implemented(ucp_request_t *req); void ucp_proto_fatal_invalid_stage(ucp_request_t *req, const char *func_name); +void +ucp_proto_select_lanes(const ucp_proto_init_params_t *params, + const ucp_lane_index_t *lanes, ucp_lane_index_t num_lanes, + ucp_lane_index_t max_lanes, + const ucp_proto_common_tl_perf_t *lanes_perf, + int fixed_first_lane, ucp_proto_variant_t variant, + ucp_proto_lane_selection_t *selection); + +void +ucp_proto_select_trace(const ucp_proto_init_params_t *params, + const ucp_proto_lane_selection_t *selection, + const ucp_proto_common_tl_perf_t *lanes_perf, + const char *desc, ucs_log_level_t level); + +size_t +ucp_proto_select_distinct(ucp_proto_lane_selection_t *selection, size_t count); + #endif diff --git a/src/ucp/proto/proto_multi.c b/src/ucp/proto/proto_multi.c index c2b1459dcc8..399599042a4 100644 --- a/src/ucp/proto/proto_multi.c +++ b/src/ucp/proto/proto_multi.c @@ -20,99 +20,9 @@ #include -static 
UCS_F_ALWAYS_INLINE double -ucp_proto_multi_get_avail_bw(const ucp_proto_init_params_t *params, - ucp_lane_index_t lane, - const ucp_proto_common_tl_perf_t *lane_perf, - const ucp_proto_lane_selection_t *selection) -{ - /* Minimal path ratio */ - static const double MIN_RATIO = 0.01; - ucp_context_h context = params->worker->context; - double multi_path_ratio = context->config.ext.multi_path_ratio; - ucp_rsc_index_t dev_index = ucp_proto_common_get_dev_index(params, lane); - uint8_t path_index = selection->dev_count[dev_index]; - double ratio; - - if (UCS_CONFIG_DBL_IS_AUTO(multi_path_ratio)) { - ratio = ucs_min(1.0 - (lane_perf->path_ratio * path_index), - lane_perf->path_ratio); - } else { - ratio = 1.0 - (multi_path_ratio * path_index); - } - - if (ratio < MIN_RATIO) { - /* The iface BW is entirely consumed by the selected paths. But we still - * need to assign some minimal BW to the these extra paths in order to - * select them. We divide min ratio of the iface BW by path_index so - * that each additional path on the same device has lower bandwidth. 
*/ - ratio = MIN_RATIO / path_index; - } - - ucs_trace("ratio=%0.3f path_index=%u avail_bw=" UCP_PROTO_PERF_FUNC_BW_FMT - " " UCP_PROTO_LANE_FMT, ratio, path_index, - (lane_perf->bandwidth * ratio) / UCS_MBYTE, - UCP_PROTO_LANE_ARG(params, lane, lane_perf)); - return lane_perf->bandwidth * ratio; -} - -static UCS_F_ALWAYS_INLINE double -ucp_proto_multi_get_score(double bw, const ucp_proto_common_tl_perf_t *lane_perf, - ucp_proto_variant_t variant) -{ - size_t msg_size = ucp_proto_common_get_variant_msg_size(variant); - return 1.0 / ((msg_size / bw) + lane_perf->latency); -} - -static ucp_lane_index_t -ucp_proto_multi_find_max_score_lane(const ucp_proto_init_params_t *params, - const ucp_lane_index_t *lanes, - const ucp_proto_common_tl_perf_t *lanes_perf, - const ucp_proto_lane_selection_t *selection, - ucp_lane_map_t index_map, - ucp_proto_variant_t variant) -{ - double max_score = 0.0; - ucp_lane_index_t max_index = UCP_NULL_LANE; - double score; - double avail_bw; - const ucp_proto_common_tl_perf_t *lane_perf; - ucp_lane_index_t lane, index; - - ucs_assert(index_map != 0); - ucs_for_each_bit(index, index_map) { - lane = lanes[index]; - lane_perf = &lanes_perf[lane]; - avail_bw = ucp_proto_multi_get_avail_bw(params, lane, lane_perf, - selection); - score = ucp_proto_multi_get_score(avail_bw, lane_perf, variant); - if (score > max_score) { - max_score = score; - max_index = index; - } - } - - return max_index; -} - -static UCS_F_ALWAYS_INLINE void -ucp_proto_select_add_lane(ucp_proto_lane_selection_t *selection, - const ucp_proto_init_params_t *params, - ucp_lane_index_t lane) -{ - ucp_rsc_index_t dev_index = ucp_proto_common_get_dev_index(params, lane); - - ucs_assertv(selection->num_lanes < UCP_PROTO_MAX_LANES, - "selection num_lanes=%u max_lanes=%u", selection->num_lanes, - UCP_PROTO_MAX_LANES); - selection->lanes[selection->num_lanes++] = lane; - selection->lane_map |= UCS_BIT(lane); - selection->dev_count[dev_index]++; -} - static void 
-ucp_proto_select_aggregate_perf(ucp_proto_lane_selection_t *selection, - const ucp_proto_common_tl_perf_t *lanes_perf) +ucp_proto_multi_aggregate_perf(ucp_proto_lane_selection_t *selection, + const ucp_proto_common_tl_perf_t *lanes_perf) { ucp_proto_common_tl_perf_t *perf = &selection->perf; const ucp_proto_common_tl_perf_t *lane_perf; @@ -148,83 +58,6 @@ ucp_proto_select_aggregate_perf(ucp_proto_lane_selection_t *selection, } } -static void -ucp_proto_select_trace_lanes(const ucp_proto_init_params_t *params, - const ucp_proto_lane_selection_t *selection, - const ucp_proto_common_tl_perf_t *lanes_perf, - const char *desc) -{ - ucp_context_h UCS_V_UNUSED context = params->worker->context; - const ucp_proto_common_tl_perf_t *lane_perf; - ucp_lane_index_t i, lane; - ucp_rsc_index_t rsc_index; - - if (!ucs_log_is_enabled(UCS_LOG_LEVEL_DEBUG)) { - return; - } - - ucs_debug("%s=%u, protocol=%s", desc, selection->num_lanes, - ucp_proto_id_field(params->proto_id, name)); - ucs_log_indent(1); - - for (i = 0; i < selection->num_lanes; ++i) { - lane = selection->lanes[i]; - lane_perf = &lanes_perf[lane]; - rsc_index = ucp_proto_common_get_rsc_index(params, lane); - - ucs_debug("lane[%d] " UCT_TL_RESOURCE_DESC_FMT " bw " - UCP_PROTO_PERF_FUNC_BW_FMT UCP_PROTO_TIME_FMT(latency) - UCP_PROTO_TIME_FMT(send_pre_overhead) - UCP_PROTO_TIME_FMT(send_post_overhead) - UCP_PROTO_TIME_FMT(recv_overhead), lane, - UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_index].tl_rsc), - (lane_perf->bandwidth / UCS_MBYTE), - UCP_PROTO_TIME_ARG(lane_perf->latency), - UCP_PROTO_TIME_ARG(lane_perf->send_pre_overhead), - UCP_PROTO_TIME_ARG(lane_perf->send_post_overhead), - UCP_PROTO_TIME_ARG(lane_perf->recv_overhead)); - } - - ucs_log_indent(-1); -} - -static void -ucp_proto_multi_select_lanes(const ucp_proto_init_params_t *params, - const ucp_lane_index_t *lanes, - ucp_lane_index_t num_lanes, - ucp_lane_index_t max_lanes, - const ucp_proto_common_tl_perf_t *lanes_perf, - int fixed_first_lane, - 
ucp_proto_variant_t variant, - ucp_proto_lane_selection_t *selection) -{ - ucp_lane_index_t i, lane_index; - ucp_lane_map_t index_map; - - memset(selection, 0, sizeof(*selection)); - - /* Select all available indexes */ - index_map = UCS_MASK(num_lanes); - - if (fixed_first_lane) { - ucp_proto_select_add_lane(selection, params, lanes[0]); - index_map &= ~UCS_BIT(0); - } - - for (i = fixed_first_lane? 1 : 0; i < ucs_min(max_lanes, num_lanes); ++i) { - /* Greedy algorithm: find the best option at every step */ - lane_index = ucp_proto_multi_find_max_score_lane(params, lanes, - lanes_perf, selection, - index_map, variant); - if (lane_index == UCP_NULL_LANE) { - break; - } - - ucp_proto_select_add_lane(selection, params, lanes[lane_index]); - index_map &= ~UCS_BIT(lane_index); - } -} - ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, const char *perf_name, ucp_proto_perf_t **perf_p, @@ -317,13 +150,11 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, } num_lanes = num_fast_lanes; - ucp_proto_multi_select_lanes(¶ms->super.super, lanes, num_lanes, + ucp_proto_select_lanes(¶ms->super.super, lanes, num_lanes, params->max_lanes, lanes_perf, fixed_first_lane, UCP_PROTO_VARIANT_BW, &selection); - ucp_proto_select_aggregate_perf(&selection, lanes_perf); - ucp_proto_select_trace_lanes(¶ms->super.super, &selection, lanes_perf, - ucp_proto_common_get_variant_name(UCP_PROTO_VARIANT_BW)); + ucp_proto_multi_aggregate_perf(&selection, lanes_perf); for (i = 0; i < selection.num_lanes; ++i) { lane = selection.lanes[i]; From e0a8fa41e96c116df63ebeb8548fb474ae89a8b5 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Tue, 29 Jul 2025 10:41:45 +0000 Subject: [PATCH 08/18] UCP/PROTO: Split ucp_proto_multi_init_perf from ucp_proto_multi_init --- src/ucp/proto/proto_multi.c | 226 +++++++++++++++++++----------------- 1 file changed, 119 insertions(+), 107 deletions(-) diff --git a/src/ucp/proto/proto_multi.c 
b/src/ucp/proto/proto_multi.c index 399599042a4..5d20fbc2a1b 100644 --- a/src/ucp/proto/proto_multi.c +++ b/src/ucp/proto/proto_multi.c @@ -58,106 +58,24 @@ ucp_proto_multi_aggregate_perf(ucp_proto_lane_selection_t *selection, } } -ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, - const char *perf_name, - ucp_proto_perf_t **perf_p, - ucp_proto_multi_priv_t *mpriv) +static ucs_status_t +ucp_proto_multi_init_perf(const ucp_proto_multi_init_params_t *params, + ucp_proto_lane_selection_t *selection, + const ucp_proto_common_tl_perf_t *lanes_perf, + const char *perf_name, ucp_proto_perf_t **perf_p, + ucp_proto_multi_priv_t *mpriv) { - ucp_context_h context = params->super.super.worker->context; - const double max_bw_ratio = context->config.ext.multi_lane_max_ratio; - ucp_proto_common_tl_perf_t lanes_perf[UCP_PROTO_MAX_LANES]; - ucp_proto_common_tl_perf_t *lane_perf; - ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; - double max_bandwidth, max_frag_ratio, min_bandwidth; - ucp_lane_index_t i, lane, num_lanes, num_fast_lanes; + double max_frag_ratio = 0; + double min_bandwidth = DBL_MAX; + ucp_lane_index_t i, lane; ucp_proto_multi_lane_priv_t *lpriv; + const ucp_proto_common_tl_perf_t *lane_perf; size_t max_frag, min_length, min_end_offset, min_chunk; - ucp_proto_lane_selection_t selection; ucp_md_map_t reg_md_map; uint32_t weight_sum; - ucs_status_t status; - int fixed_first_lane; - ucs_assert(params->max_lanes <= UCP_PROTO_MAX_LANES); - - if ((ucp_proto_select_op_flags(params->super.super.select_param) & - UCP_PROTO_SELECT_OP_FLAG_RESUME) && - !(params->super.flags & UCP_PROTO_COMMON_INIT_FLAG_RESUME)) { - return UCS_ERR_UNSUPPORTED; - } - - if (!ucp_proto_common_init_check_err_handling(¶ms->super) || - (params->max_lanes == 0)) { - return UCS_ERR_UNSUPPORTED; - } - - if (!ucp_proto_common_check_memtype_copy(¶ms->super)) { - return UCS_ERR_UNSUPPORTED; - } - - /* Find first lane */ - num_lanes = ucp_proto_common_find_lanes( - ¶ms->super.super, 
params->super.flags, params->first.lane_type, - params->first.tl_cap_flags, 1, 0, ucp_proto_common_filter_min_frag, - lanes); - if (num_lanes == 0) { - ucs_trace("no lanes for %s", - ucp_proto_id_field(params->super.super.proto_id, name)); - return UCS_ERR_NO_ELEM; - } - - /* Find rest of the lanes */ - num_lanes += ucp_proto_common_find_lanes( - ¶ms->super.super, params->super.flags, params->middle.lane_type, - params->middle.tl_cap_flags, UCP_PROTO_MAX_LANES - 1, - UCS_BIT(lanes[0]), ucp_proto_common_filter_min_frag, lanes + 1); - - /* Get bandwidth of all lanes and max_bandwidth */ - max_bandwidth = 0; - for (i = 0; i < num_lanes; ++i) { - lane = lanes[i]; - lane_perf = &lanes_perf[lane]; - - status = ucp_proto_common_get_lane_perf(¶ms->super, lane, lane_perf); - if (status != UCS_OK) { - return status; - } - - /* Calculate maximal bandwidth of all lanes, to skip slow lanes */ - max_bandwidth = ucs_max(max_bandwidth, lane_perf->bandwidth); - } - - /* Select the lanes to use, and calculate their aggregate performance */ - max_frag_ratio = 0; - min_bandwidth = DBL_MAX; - - /* Filter out slow lanes */ - fixed_first_lane = params->first.lane_type != params->middle.lane_type; - for (i = fixed_first_lane ? 
1 : 0, num_fast_lanes = i; i < num_lanes; ++i) { - lane = lanes[i]; - lane_perf = &lanes_perf[lane]; - if ((lane_perf->bandwidth * max_bw_ratio) < max_bandwidth) { - /* Bandwidth on this lane is too low compared to the fastest - available lane, so it's not worth using it */ - ucp_proto_perf_node_deref(&lanes_perf[lane].node); - ucs_trace("drop " UCP_PROTO_LANE_FMT, - UCP_PROTO_LANE_ARG(¶ms->super.super, lane, lane_perf)); - } else { - lanes[num_fast_lanes++] = lane; - ucs_trace("avail " UCP_PROTO_LANE_FMT, - UCP_PROTO_LANE_ARG(¶ms->super.super, lane, lane_perf)); - } - } - - num_lanes = num_fast_lanes; - ucp_proto_select_lanes(¶ms->super.super, lanes, num_lanes, - params->max_lanes, lanes_perf, - fixed_first_lane, UCP_PROTO_VARIANT_BW, &selection); - - ucp_proto_multi_aggregate_perf(&selection, lanes_perf); - - for (i = 0; i < selection.num_lanes; ++i) { - lane = selection.lanes[i]; + for (i = 0; i < selection->num_lanes; ++i) { + lane = selection->lanes[i]; lane_perf = &lanes_perf[lane]; /* Calculate maximal bandwidth-to-fragment-size ratio, which is used to @@ -165,15 +83,14 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, also do not exceed maximal supported size */ max_frag_ratio = ucs_max(max_frag_ratio, lane_perf->bandwidth / lane_perf->max_frag); - - min_bandwidth = ucs_min(min_bandwidth, lane_perf->bandwidth); + min_bandwidth = ucs_min(min_bandwidth, lane_perf->bandwidth); } /* Initialize multi-lane private data and relative weights */ reg_md_map = ucp_proto_common_reg_md_map(¶ms->super, - selection.lane_map); + selection->lane_map); mpriv->reg_md_map = reg_md_map | params->initial_reg_md_map; - mpriv->lane_map = selection.lane_map; + mpriv->lane_map = selection->lane_map; mpriv->num_lanes = 0; mpriv->min_frag = 0; mpriv->max_frag_sum = 0; @@ -181,7 +98,7 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, weight_sum = 0; min_end_offset = 0; - ucs_for_each_bit(lane, selection.lane_map) { + 
ucs_for_each_bit(lane, selection->lane_map) { ucs_assert(lane < UCP_MAX_LANES); lpriv = &mpriv->lanes[mpriv->num_lanes++]; @@ -205,13 +122,14 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, ucs_max(UCP_MIN_BCOPY, lane_perf->bandwidth * params->min_chunk / min_bandwidth)); - max_frag = ucs_max(max_frag, min_chunk); - lpriv->max_frag = max_frag; - selection.perf.max_frag += max_frag; + + max_frag = ucs_max(max_frag, min_chunk); + lpriv->max_frag = max_frag; + selection->perf.max_frag += max_frag; /* Calculate lane weight as a fixed-point fraction */ lpriv->weight = ucs_proto_multi_calc_weight(lane_perf->bandwidth, - selection.perf.bandwidth); + selection->perf.bandwidth); ucs_assert(lpriv->weight > 0); ucs_assert(lpriv->weight <= UCP_PROTO_MULTI_WEIGHT_MAX); @@ -251,7 +169,8 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, lpriv->weight; ucs_assert(ucp_proto_multi_scaled_length(lpriv->weight, min_length) >= lane_perf->min_length); - selection.perf.min_length = ucs_max(selection.perf.min_length, min_length); + selection->perf.min_length = ucs_max(selection->perf.min_length, + min_length); weight_sum += lpriv->weight; min_end_offset += min_chunk; @@ -263,11 +182,104 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, lpriv->opt_align = ucp_proto_multi_get_lane_opt_align(params, lane); mpriv->align_thresh = ucs_max(mpriv->align_thresh, lpriv->opt_align); } - ucs_assert(mpriv->num_lanes == ucs_popcount(selection.lane_map)); + ucs_assert(mpriv->num_lanes == ucs_popcount(selection->lane_map)); + + return ucp_proto_init_perf(¶ms->super, &selection->perf, reg_md_map, + perf_name, perf_p); +} - status = ucp_proto_init_perf(¶ms->super, &selection.perf, reg_md_map, - perf_name, perf_p); +ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, + const char *perf_name, + ucp_proto_perf_t **perf_p, + ucp_proto_multi_priv_t *mpriv) +{ + ucp_context_h context = 
params->super.super.worker->context; + const double max_bw_ratio = context->config.ext.multi_lane_max_ratio; + ucp_proto_common_tl_perf_t lanes_perf[UCP_PROTO_MAX_LANES]; + ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; + double max_bandwidth; + ucp_lane_index_t i, lane, num_lanes, num_fast_lanes; + ucp_proto_lane_selection_t selection; + ucs_status_t status; + int fixed_first_lane; + + ucs_assert(params->max_lanes <= UCP_PROTO_MAX_LANES); + + if ((ucp_proto_select_op_flags(params->super.super.select_param) & + UCP_PROTO_SELECT_OP_FLAG_RESUME) && + !(params->super.flags & UCP_PROTO_COMMON_INIT_FLAG_RESUME)) { + return UCS_ERR_UNSUPPORTED; + } + + if (!ucp_proto_common_init_check_err_handling(¶ms->super) || + (params->max_lanes == 0)) { + return UCS_ERR_UNSUPPORTED; + } + + if (!ucp_proto_common_check_memtype_copy(¶ms->super)) { + return UCS_ERR_UNSUPPORTED; + } + + /* Find first lane */ + num_lanes = ucp_proto_common_find_lanes( + ¶ms->super.super, params->super.flags, params->first.lane_type, + params->first.tl_cap_flags, 1, 0, ucp_proto_common_filter_min_frag, + lanes); + if (num_lanes == 0) { + ucs_trace("no lanes for %s", + ucp_proto_id_field(params->super.super.proto_id, name)); + return UCS_ERR_NO_ELEM; + } + + /* Find rest of the lanes */ + num_lanes += ucp_proto_common_find_lanes( + ¶ms->super.super, params->super.flags, params->middle.lane_type, + params->middle.tl_cap_flags, UCP_PROTO_MAX_LANES - 1, + UCS_BIT(lanes[0]), ucp_proto_common_filter_min_frag, lanes + 1); + + /* Get bandwidth of all lanes and max_bandwidth */ + max_bandwidth = 0; + for (i = 0; i < num_lanes; ++i) { + lane = lanes[i]; + lane_perf = &lanes_perf[lane]; + + status = ucp_proto_common_get_lane_perf(¶ms->super, lane, lane_perf); + if (status != UCS_OK) { + return status; + } + + /* Calculate maximal bandwidth of all lanes, to skip slow lanes */ + max_bandwidth = ucs_max(max_bandwidth, lane_perf->bandwidth); + } + + /* Filter out slow lanes */ + 
fixed_first_lane = params->first.lane_type != params->middle.lane_type; + for (i = fixed_first_lane ? 1 : 0, num_fast_lanes = i; i < num_lanes; ++i) { + lane = lanes[i]; + lane_perf = &lanes_perf[lane]; + if ((lane_perf->bandwidth * max_bw_ratio) < max_bandwidth) { + /* Bandwidth on this lane is too low compared to the fastest + available lane, so it's not worth using it */ + ucp_proto_perf_node_deref(&lanes_perf[lane].node); + ucs_trace("drop " UCP_PROTO_LANE_FMT, + UCP_PROTO_LANE_ARG(¶ms->super.super, lane, lane_perf)); + } else { + lanes[num_fast_lanes++] = lane; + ucs_trace("avail " UCP_PROTO_LANE_FMT, + UCP_PROTO_LANE_ARG(¶ms->super.super, lane, lane_perf)); + } + } + + num_lanes = num_fast_lanes; + ucp_proto_select_lanes(¶ms->super.super, lanes, num_lanes, + params->max_lanes, lanes_perf, + fixed_first_lane, UCP_PROTO_VARIANT_BW, &selection); + + ucp_proto_multi_aggregate_perf(&selection, lanes_perf); + status = ucp_proto_multi_init_perf(params, &selection, lanes_perf, + perf_name, perf_p, mpriv); ucp_proto_perf_node_deref(&selection.perf.node); /* Deref unused nodes */ From 2d3196db4c4fdebaa55f6ccc9a9dfc85c8dbc456 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Tue, 29 Jul 2025 13:59:30 +0000 Subject: [PATCH 09/18] UCP/PROTO: Fixed build --- src/ucp/proto/proto_common.h | 4 ++-- src/ucp/proto/proto_common.inl | 12 ++++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/ucp/proto/proto_common.h b/src/ucp/proto/proto_common.h index e1706a2dc48..37a3b1db9c4 100644 --- a/src/ucp/proto/proto_common.h +++ b/src/ucp/proto/proto_common.h @@ -189,8 +189,8 @@ typedef struct { * variant, which is used to print the variant name. 
*/ #define UCP_FOREACH_PROTO_VARIANT(_macro) \ - _macro(UCP_PROTO_VARIANT_LAT, UCS_KBYTE, "lat") \ - _macro(UCP_PROTO_VARIANT_BW, UCS_GBYTE, "bw") + _macro(UCP_PROTO_VARIANT_BW, UCS_GBYTE, "bw") \ + _macro(UCP_PROTO_VARIANT_LAT, UCS_KBYTE, "lat") /** * Protocol selection variant enum diff --git a/src/ucp/proto/proto_common.inl b/src/ucp/proto/proto_common.inl index b5697275c05..1f7048f0ab5 100644 --- a/src/ucp/proto/proto_common.inl +++ b/src/ucp/proto/proto_common.inl @@ -413,9 +413,10 @@ ucp_proto_common_get_variant_msg_size(ucp_proto_variant_t variant) #define UCP_PROTO_VARIANT_IT(ID, MSG_SIZE, _) case ID: return MSG_SIZE; switch (variant) { UCP_FOREACH_PROTO_VARIANT(UCP_PROTO_VARIANT_IT) - default: ucs_assert_always(0); + default: ucs_fatal("unexpected variant %d", variant); } #undef UCP_PROTO_VARIANT_IT + return 0; } static UCS_F_ALWAYS_INLINE const char* @@ -424,9 +425,16 @@ ucp_proto_common_get_variant_name(ucp_proto_variant_t variant) #define UCP_PROTO_VARIANT_IT(ID, _, NAME) case ID: return NAME; switch (variant) { UCP_FOREACH_PROTO_VARIANT(UCP_PROTO_VARIANT_IT) - default: ucs_assert_always(0); + default: ucs_fatal("unexpected variant %d", variant); } #undef UCP_PROTO_VARIANT_IT + return NULL; +} + +static UCS_F_ALWAYS_INLINE size_t +ucp_proto_common_get_variants_count(ucp_context_h context) +{ + return context->config.ext.proto_variants_enable ? 
UCP_PROTO_VARIANT_LAST : 1; } #endif From 197e36a8beed5b0237339b84a0f9543a373d6184 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Mon, 4 Aug 2025 11:30:20 +0000 Subject: [PATCH 10/18] UCP/PROTO: Memory pool in worker for lane selection --- src/ucp/core/ucp_worker.c | 19 +++++++++++++++++++ src/ucp/core/ucp_worker.h | 1 + 2 files changed, 20 insertions(+) diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 66dd96d4263..6fa20d83a1a 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -130,6 +130,14 @@ static ucs_mpool_ops_t ucp_rkey_mpool_ops = { .obj_str = NULL }; +static ucs_mpool_ops_t ucp_proto_select_mpool_ops = { + .chunk_alloc = ucs_mpool_chunk_malloc, + .chunk_release = ucs_mpool_chunk_free, + .obj_init = NULL, + .obj_cleanup = NULL, + .obj_str = NULL +}; + #define ucp_worker_discard_uct_ep_hash_key(_uct_ep) \ kh_int64_hash_func((uintptr_t)(_uct_ep)) @@ -2006,6 +2014,16 @@ static ucs_status_t ucp_worker_init_mpools(ucp_worker_h worker) worker->flags |= UCP_WORKER_FLAG_AM_MPOOL_INITIALIZED; } + ucs_mpool_params_reset(&mp_params); + mp_params.elem_size = sizeof(ucp_proto_lane_select_t); + mp_params.elems_per_chunk = 8; + mp_params.ops = &ucp_proto_select_mpool_ops; + mp_params.name = "ucp_proto_lane_select"; + status = ucs_mpool_init(&mp_params, &worker->proto_select_mp); + if (status != UCS_OK) { + goto err_reg_mp_cleanup; + } + return UCS_OK; err_reg_mp_cleanup: @@ -2043,6 +2061,7 @@ static void ucp_worker_destroy_mpools(ucp_worker_h worker) } ucs_mpool_cleanup(&worker->req_mp, !(worker->flags & UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK)); + ucs_mpool_cleanup(&worker->proto_select_mp, 1); } static unsigned ucp_worker_ep_config_free_cb(void *arg) diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index e0f4e2b9c31..919bc716051 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -318,6 +318,7 @@ typedef struct ucp_worker { ucp_worker_cm_t *cms; /* Array of CMs, one for each component 
*/ ucs_mpool_set_t am_mps; /* Memory pool set for AM receives */ ucs_mpool_t reg_mp; /* Registered memory pool */ + ucs_mpool_t proto_select_mp; /* Protocol selection memory pool */ ucp_worker_mpool_hash_t mpool_hash; /* Hash table of memory pools */ ucs_queue_head_t rkey_ptr_reqs; /* Queue of submitted RKEY PTR requests that * are in-progress */ From 5c4e7caeb946e2f493dac432e1d214ab83236ef7 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Mon, 4 Aug 2025 11:36:54 +0000 Subject: [PATCH 11/18] UCP/PROTO: API for multi variants --- src/ucp/proto/proto_common.c | 245 ++++++++++++++++++++++++++++------- src/ucp/proto/proto_common.h | 45 ++++--- src/ucp/proto/proto_multi.c | 153 ++++++---------------- src/ucp/proto/proto_multi.h | 11 +- src/ucp/rndv/proto_rndv.c | 9 +- src/ucp/rndv/proto_rndv.h | 2 +- src/ucp/rndv/rndv_get.c | 31 ++++- src/ucp/rndv/rndv_put.c | 169 ++++++++++++++---------- 8 files changed, 395 insertions(+), 270 deletions(-) diff --git a/src/ucp/proto/proto_common.c b/src/ucp/proto/proto_common.c index 5e8fdb68419..948a2928128 100644 --- a/src/ucp/proto/proto_common.c +++ b/src/ucp/proto/proto_common.c @@ -928,10 +928,10 @@ void ucp_proto_fatal_invalid_stage(ucp_request_t *req, const char *func_name) } static UCS_F_ALWAYS_INLINE double -ucp_proto_select_get_avail_bw(const ucp_proto_init_params_t *params, - ucp_lane_index_t lane, - const ucp_proto_common_tl_perf_t *lane_perf, - const ucp_proto_lane_selection_t *selection) +ucp_proto_lane_select_avail_bw(const ucp_proto_init_params_t *params, + ucp_lane_index_t lane, + const ucp_proto_common_tl_perf_t *lane_perf, + const ucp_proto_lane_selection_t *selection) { /* Minimal path ratio */ static const double MIN_RATIO = 0.01; @@ -965,19 +965,19 @@ ucp_proto_select_get_avail_bw(const ucp_proto_init_params_t *params, } static UCS_F_ALWAYS_INLINE double -ucp_proto_select_get_score(double bw, const ucp_proto_common_tl_perf_t *lane_perf, - ucp_proto_variant_t variant) +ucp_proto_lane_select_score(double bw, + 
const ucp_proto_common_tl_perf_t *lane_perf, + ucp_proto_variant_t variant) { size_t msg_size = ucp_proto_common_get_variant_msg_size(variant); return 1.0 / ((msg_size / bw) + lane_perf->latency); } static ucp_lane_index_t -ucp_proto_select_find_max_score_lane(const ucp_proto_init_params_t *params, - const ucp_lane_index_t *lanes, - const ucp_proto_common_tl_perf_t *lanes_perf, - const ucp_proto_lane_selection_t *selection, - ucp_lane_map_t index_map) +ucp_proto_lane_select_find(const ucp_proto_init_params_t *params, + const ucp_proto_lane_select_t *select, + const ucp_proto_lane_selection_t *selection, + ucp_lane_map_t index_map) { double max_score = 0.0; ucp_lane_index_t max_index = UCP_NULL_LANE; @@ -988,12 +988,12 @@ ucp_proto_select_find_max_score_lane(const ucp_proto_init_params_t *params, ucs_assert(index_map != 0); ucs_for_each_bit(index, index_map) { - lane = lanes[index]; - lane_perf = &lanes_perf[lane]; - avail_bw = ucp_proto_select_get_avail_bw(params, lane, lane_perf, - selection); - score = ucp_proto_select_get_score(avail_bw, lane_perf, - selection->variant); + lane = select->lanes[index]; + lane_perf = &select->lanes_perf[lane]; + avail_bw = ucp_proto_lane_select_avail_bw(params, lane, lane_perf, + selection); + score = ucp_proto_lane_select_score(avail_bw, lane_perf, + selection->variant); if (score > max_score) { max_score = score; max_index = index; @@ -1004,7 +1004,7 @@ ucp_proto_select_find_max_score_lane(const ucp_proto_init_params_t *params, } static UCS_F_ALWAYS_INLINE void -ucp_proto_select_add_lane(ucp_proto_lane_selection_t *selection, +ucp_proto_lane_select_add(ucp_proto_lane_selection_t *selection, const ucp_proto_init_params_t *params, ucp_lane_index_t lane) { @@ -1018,13 +1018,12 @@ ucp_proto_select_add_lane(ucp_proto_lane_selection_t *selection, selection->dev_count[dev_index]++; } -void -ucp_proto_select_lanes(const ucp_proto_init_params_t *params, - const ucp_lane_index_t *lanes, - ucp_lane_index_t num_lanes, ucp_lane_index_t 
max_lanes, - const ucp_proto_common_tl_perf_t *lanes_perf, - int fixed_first_lane, ucp_proto_variant_t variant, - ucp_proto_lane_selection_t *selection) +static void +ucp_proto_lane_select(const ucp_proto_init_params_t *params, + const ucp_proto_lane_select_req_t *req, + const ucp_proto_lane_select_t *select, + ucp_proto_variant_t variant, + ucp_proto_lane_selection_t *selection) { ucp_lane_index_t i, lane_index; ucp_lane_map_t index_map; @@ -1033,30 +1032,31 @@ ucp_proto_select_lanes(const ucp_proto_init_params_t *params, selection->variant = variant; /* Select all available indexes */ - index_map = UCS_MASK(num_lanes); + index_map = UCS_MASK(select->num_lanes); - if (fixed_first_lane) { - ucp_proto_select_add_lane(selection, params, lanes[0]); + if (req->fixed_first_lane) { + ucp_proto_lane_select_add(selection, params, select->lanes[0]); index_map &= ~UCS_BIT(0); } - for (i = fixed_first_lane? 1 : 0; i < ucs_min(max_lanes, num_lanes); ++i) { + i = req->fixed_first_lane ? 1 : 0; + for (; i < ucs_min(req->max_lanes, select->num_lanes); ++i) { /* Greedy algorithm: find the best option at every step */ - lane_index = ucp_proto_select_find_max_score_lane(params, lanes, - lanes_perf, selection, - index_map); + lane_index = ucp_proto_lane_select_find(params, select, selection, + index_map); if (lane_index == UCP_NULL_LANE) { break; } - ucp_proto_select_add_lane(selection, params, lanes[lane_index]); + ucp_proto_lane_select_add(selection, params, select->lanes[lane_index]); index_map &= ~UCS_BIT(lane_index); } } -void ucp_proto_select_trace(const ucp_proto_init_params_t *params, +static void +ucp_proto_lane_select_trace(const ucp_proto_init_params_t *params, + const ucp_proto_lane_select_t *select, const ucp_proto_lane_selection_t *selection, - const ucp_proto_common_tl_perf_t *lanes_perf, const char *desc, ucs_log_level_t level) { ucp_context_h UCS_V_UNUSED context = params->worker->context; @@ -1075,17 +1075,19 @@ void ucp_proto_select_trace(const 
ucp_proto_init_params_t *params, for (i = 0; i < selection->num_lanes; ++i) { lane = selection->lanes[i]; - lane_perf = &lanes_perf[lane]; + lane_perf = &select->lanes_perf[lane]; rsc_index = ucp_proto_common_get_rsc_index(params, lane); ucs_log(level, "lane[%d] " UCT_TL_RESOURCE_DESC_FMT " bw " UCP_PROTO_PERF_FUNC_BW_FMT UCP_PROTO_TIME_FMT(latency) + UCP_PROTO_TIME_FMT(sys_latency) UCP_PROTO_TIME_FMT(send_pre_overhead) UCP_PROTO_TIME_FMT(send_post_overhead) UCP_PROTO_TIME_FMT(recv_overhead), lane, UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_index].tl_rsc), (lane_perf->bandwidth / UCS_MBYTE), UCP_PROTO_TIME_ARG(lane_perf->latency), + UCP_PROTO_TIME_ARG(lane_perf->sys_latency), UCP_PROTO_TIME_ARG(lane_perf->send_pre_overhead), UCP_PROTO_TIME_ARG(lane_perf->send_post_overhead), UCP_PROTO_TIME_ARG(lane_perf->recv_overhead)); @@ -1094,28 +1096,175 @@ void ucp_proto_select_trace(const ucp_proto_init_params_t *params, ucs_log_indent(-1); } -size_t -ucp_proto_select_distinct(ucp_proto_lane_selection_t *selection, size_t count) +static void +ucp_proto_lane_select_distinct(ucp_proto_lane_select_t *select) { - size_t distinct = 1; - size_t i, j; + ucp_lane_index_t i, j; int unique; - ucs_assertv_always(count > 0 && count <= UCP_PROTO_VARIANT_LAST, - "count=%zu", count); - for (i = 1; i < count; ++i) { + select->num_selections = 1; + for (i = 1; i < UCP_PROTO_VARIANT_LAST; ++i) { unique = 1; - for (j = 0; j < distinct; ++j) { - if (selection[i].lane_map == selection[j].lane_map) { + for (j = 0; j < select->num_selections; ++j) { + if (select->selections[i].lane_map == select->selections[j].lane_map) { unique = 0; break; } } - if (unique && (i != distinct)) { - selection[distinct++] = selection[i]; + if (unique) { + if (i != select->num_selections) { + select->selections[select->num_selections] = select->selections[i]; + } + + ++select->num_selections; + } + } +} + +static void +ucp_proto_lane_select_aggregate(const ucp_proto_lane_select_t *select, + 
ucp_proto_lane_selection_t *selection) +{ + ucp_proto_common_tl_perf_t *perf = &selection->perf; + const ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t lane; + + /* TODO: Adjust performance estimation based on the actual resource usage, + * i.e. split the overall iface BW between all selected paths. */ + + ucs_for_each_bit(lane, selection->lane_map) { + lane_perf = &select->lanes_perf[lane]; + + /* Update aggregated performance metric */ + perf->bandwidth += lane_perf->bandwidth; + perf->send_pre_overhead += lane_perf->send_pre_overhead; + perf->send_post_overhead += lane_perf->send_post_overhead; + perf->recv_overhead += lane_perf->recv_overhead; + perf->latency = ucs_max(perf->latency, lane_perf->latency); + perf->sys_latency = ucs_max(perf->sys_latency, + lane_perf->sys_latency); + } + + if (selection->num_lanes == 1) { + lane_perf = &select->lanes_perf[selection->lanes[0]]; + ucp_proto_perf_node_ref(lane_perf->node); + selection->perf.node = lane_perf->node; + } else { + selection->perf.node = ucp_proto_perf_node_new_data("multi", "%u lanes", + selection->num_lanes); + ucs_for_each_bit(lane, selection->lane_map) { + lane_perf = &select->lanes_perf[lane]; + ucp_proto_perf_node_add_child(selection->perf.node, lane_perf->node); } } +} + +ucs_status_t +ucp_proto_lane_select_init(const ucp_proto_common_init_params_t *params, + const ucp_proto_lane_select_req_t *req, + ucp_proto_lane_select_t **select_p) +{ + ucp_worker_h worker = params->super.worker; + ucp_context_h context = worker->context; + const double max_bw_ratio = context->config.ext.multi_lane_max_ratio; + double max_bandwidth = 0; + ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t i, lane; + ucp_proto_lane_select_t *select; + ucp_proto_lane_selection_t *selection; + ucs_status_t status; + ucp_proto_variant_t variant; + + select = ucs_mpool_get(&worker->proto_select_mp); + if (select == NULL) { + return UCS_ERR_NO_MEMORY; + } + + memset(select, 0, sizeof(*select)); + + /* Get bandwidth 
of all lanes and max_bandwidth */ + for (i = 0; i < req->num_lanes; ++i) { + lane = req->lanes[i]; + lane_perf = &select->lanes_perf[lane]; + + status = ucp_proto_common_get_lane_perf(params, lane, lane_perf); + if (status != UCS_OK) { + ucs_mpool_put(select); + return status; + } + + /* Calculate maximal bandwidth of all lanes, to skip slow lanes */ + max_bandwidth = ucs_max(max_bandwidth, lane_perf->bandwidth); + } + + /* Add fast lanes to the selection */ + for (i = 0; i < req->num_lanes; ++i) { + lane = req->lanes[i]; + lane_perf = &select->lanes_perf[lane]; + + if ((req->fixed_first_lane && (i == 0)) || + ((lane_perf->bandwidth * max_bw_ratio) >= max_bandwidth)) { + select->lanes[select->num_lanes++] = lane; + ucs_trace("avail " UCP_PROTO_LANE_FMT, + UCP_PROTO_LANE_ARG(¶ms->super, lane, lane_perf)); + } else { + /* Bandwidth on this lane is too low compared to the fastest + available lane, so it's not worth using it */ + ucp_proto_perf_node_deref(&lane_perf->node); + ucs_trace("drop " UCP_PROTO_LANE_FMT, + UCP_PROTO_LANE_ARG(¶ms->super, lane, lane_perf)); + } + } + + /* Initialize all protocol variants */ + for (variant = 0; variant < UCP_PROTO_VARIANT_LAST; ++variant) { + selection = &select->selections[variant]; + ucp_proto_lane_select(¶ms->super, req, select, variant, selection); + ucp_proto_lane_select_trace(¶ms->super, select, selection, + "found variant", UCS_LOG_LEVEL_TRACE); + } + + /* Select distinct protocol variants */ + ucp_proto_lane_select_distinct(select); + ucs_assert(select->num_selections > 0); + /* TODO: remove */ + select->num_selections = 1; + + /* Initialize selected distinct protocol variants */ + for (i = 0; i < select->num_selections; ++i) { + selection = &select->selections[i]; + + /* Append variant name only if there are multiple variants */ + snprintf(selection->name, sizeof(selection->name), "%s%s", req->perf_name, + (select->num_selections > 1 ? 
+ ucp_proto_common_get_variant_name(selection->variant) : + "")); + + ucp_proto_lane_select_aggregate(select, selection); + ucp_proto_lane_select_trace(¶ms->super, select, selection, + "selected variant", UCS_LOG_LEVEL_DEBUG); + } + + *select_p = select; + return UCS_OK; +} + +void ucp_proto_lane_select_destroy(ucp_proto_lane_select_t *select) +{ + ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t i, lane; + + /* Deref unused nodes */ + for (i = 0; i < select->num_lanes; ++i) { + lane = select->lanes[i]; + lane_perf = &select->lanes_perf[lane]; + ucp_proto_perf_node_deref(&lane_perf->node); + } + + for (i = 0; i < select->num_selections; ++i) { + ucp_proto_perf_node_deref(&select->selections[i].perf.node); + } - return distinct; + ucs_mpool_put(select); } diff --git a/src/ucp/proto/proto_common.h b/src/ucp/proto/proto_common.h index 37a3b1db9c4..40309edb5fc 100644 --- a/src/ucp/proto/proto_common.h +++ b/src/ucp/proto/proto_common.h @@ -189,8 +189,8 @@ typedef struct { * variant, which is used to print the variant name. 
*/ #define UCP_FOREACH_PROTO_VARIANT(_macro) \ - _macro(UCP_PROTO_VARIANT_BW, UCS_GBYTE, "bw") \ - _macro(UCP_PROTO_VARIANT_LAT, UCS_KBYTE, "lat") + _macro(UCP_PROTO_VARIANT_BW, UCS_GBYTE, "(bw)") \ + _macro(UCP_PROTO_VARIANT_LAT, UCS_KBYTE, "(lat)") /** * Protocol selection variant enum @@ -209,9 +209,28 @@ typedef struct { uint8_t dev_count[UCP_MAX_RESOURCES]; ucp_proto_common_tl_perf_t perf; ucp_proto_variant_t variant; + char name[UCP_PROTO_DESC_STR_MAX]; } ucp_proto_lane_selection_t; +typedef struct { + const char *perf_name; + ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; + ucp_lane_index_t num_lanes; + ucp_lane_index_t max_lanes; + int fixed_first_lane; +} ucp_proto_lane_select_req_t; + + +typedef struct { + ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; + ucp_lane_index_t num_lanes; + ucp_proto_common_tl_perf_t lanes_perf[UCP_PROTO_MAX_LANES]; + ucp_proto_lane_selection_t selections[UCP_PROTO_VARIANT_LAST]; + ucp_lane_index_t num_selections; +} ucp_proto_lane_select_t; + + /* Private data per lane */ typedef struct { ucp_lane_index_t lane; /* Lane index in the endpoint */ @@ -385,21 +404,11 @@ void ucp_proto_reset_fatal_not_implemented(ucp_request_t *req); void ucp_proto_fatal_invalid_stage(ucp_request_t *req, const char *func_name); -void -ucp_proto_select_lanes(const ucp_proto_init_params_t *params, - const ucp_lane_index_t *lanes, ucp_lane_index_t num_lanes, - ucp_lane_index_t max_lanes, - const ucp_proto_common_tl_perf_t *lanes_perf, - int fixed_first_lane, ucp_proto_variant_t variant, - ucp_proto_lane_selection_t *selection); - -void -ucp_proto_select_trace(const ucp_proto_init_params_t *params, - const ucp_proto_lane_selection_t *selection, - const ucp_proto_common_tl_perf_t *lanes_perf, - const char *desc, ucs_log_level_t level); - -size_t -ucp_proto_select_distinct(ucp_proto_lane_selection_t *selection, size_t count); +ucs_status_t +ucp_proto_lane_select_init(const ucp_proto_common_init_params_t *params, + const ucp_proto_lane_select_req_t *req, + 
ucp_proto_lane_select_t **select_p); + +void ucp_proto_lane_select_destroy(ucp_proto_lane_select_t *select); #endif diff --git a/src/ucp/proto/proto_multi.c b/src/ucp/proto/proto_multi.c index 5d20fbc2a1b..7b204d1d571 100644 --- a/src/ucp/proto/proto_multi.c +++ b/src/ucp/proto/proto_multi.c @@ -20,49 +20,11 @@ #include -static void -ucp_proto_multi_aggregate_perf(ucp_proto_lane_selection_t *selection, - const ucp_proto_common_tl_perf_t *lanes_perf) -{ - ucp_proto_common_tl_perf_t *perf = &selection->perf; - const ucp_proto_common_tl_perf_t *lane_perf; - ucp_lane_index_t lane; - - /* TODO: Adjust performance estimation based on the actual resource usage, - * i.e. split the overall iface BW between all selected paths. */ - - ucs_for_each_bit(lane, selection->lane_map) { - lane_perf = &lanes_perf[lane]; - - /* Update aggregated performance metric */ - perf->bandwidth += lane_perf->bandwidth; - perf->send_pre_overhead += lane_perf->send_pre_overhead; - perf->send_post_overhead += lane_perf->send_post_overhead; - perf->recv_overhead += lane_perf->recv_overhead; - perf->latency = ucs_max(perf->latency, lane_perf->latency); - perf->sys_latency = ucs_max(perf->sys_latency, - lane_perf->sys_latency); - } - - if (selection->num_lanes == 1) { - lane_perf = &lanes_perf[selection->lanes[0]]; - ucp_proto_perf_node_ref(lane_perf->node); - selection->perf.node = lane_perf->node; - } else { - selection->perf.node = ucp_proto_perf_node_new_data("multi", "%u lanes", - selection->num_lanes); - ucs_for_each_bit(lane, selection->lane_map) { - lane_perf = &lanes_perf[lane]; - ucp_proto_perf_node_add_child(selection->perf.node, lane_perf->node); - } - } -} - -static ucs_status_t +ucs_status_t ucp_proto_multi_init_perf(const ucp_proto_multi_init_params_t *params, + const ucp_proto_lane_select_t *select, ucp_proto_lane_selection_t *selection, - const ucp_proto_common_tl_perf_t *lanes_perf, - const char *perf_name, ucp_proto_perf_t **perf_p, + ucp_proto_perf_t **perf_p, 
ucp_proto_multi_priv_t *mpriv) { double max_frag_ratio = 0; @@ -76,7 +38,7 @@ ucp_proto_multi_init_perf(const ucp_proto_multi_init_params_t *params, for (i = 0; i < selection->num_lanes; ++i) { lane = selection->lanes[i]; - lane_perf = &lanes_perf[lane]; + lane_perf = &select->lanes_perf[lane]; /* Calculate maximal bandwidth-to-fragment-size ratio, which is used to adjust fragment sizes so they are proportional to bandwidth ratio and @@ -102,7 +64,7 @@ ucp_proto_multi_init_perf(const ucp_proto_multi_init_params_t *params, ucs_assert(lane < UCP_MAX_LANES); lpriv = &mpriv->lanes[mpriv->num_lanes++]; - lane_perf = &lanes_perf[lane]; + lane_perf = &select->lanes_perf[lane]; ucp_proto_common_lane_priv_init(¶ms->super, mpriv->reg_md_map, lane, &lpriv->super); @@ -185,24 +147,14 @@ ucp_proto_multi_init_perf(const ucp_proto_multi_init_params_t *params, ucs_assert(mpriv->num_lanes == ucs_popcount(selection->lane_map)); return ucp_proto_init_perf(¶ms->super, &selection->perf, reg_md_map, - perf_name, perf_p); + selection->name, perf_p); } -ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, - const char *perf_name, - ucp_proto_perf_t **perf_p, - ucp_proto_multi_priv_t *mpriv) +ucs_status_t +ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, + const char *perf_name, ucp_proto_lane_select_t **select_p) { - ucp_context_h context = params->super.super.worker->context; - const double max_bw_ratio = context->config.ext.multi_lane_max_ratio; - ucp_proto_common_tl_perf_t lanes_perf[UCP_PROTO_MAX_LANES]; - ucp_proto_common_tl_perf_t *lane_perf; - ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; - double max_bandwidth; - ucp_lane_index_t i, lane, num_lanes, num_fast_lanes; - ucp_proto_lane_selection_t selection; - ucs_status_t status; - int fixed_first_lane; + ucp_proto_lane_select_req_t req; ucs_assert(params->max_lanes <= UCP_PROTO_MAX_LANES); @@ -222,72 +174,29 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, } 
/* Find first lane */ - num_lanes = ucp_proto_common_find_lanes( + req.num_lanes = ucp_proto_common_find_lanes( ¶ms->super.super, params->super.flags, params->first.lane_type, params->first.tl_cap_flags, 1, 0, ucp_proto_common_filter_min_frag, - lanes); - if (num_lanes == 0) { + req.lanes); + if (req.num_lanes == 0) { ucs_trace("no lanes for %s", ucp_proto_id_field(params->super.super.proto_id, name)); return UCS_ERR_NO_ELEM; } /* Find rest of the lanes */ - num_lanes += ucp_proto_common_find_lanes( + req.num_lanes += ucp_proto_common_find_lanes( ¶ms->super.super, params->super.flags, params->middle.lane_type, params->middle.tl_cap_flags, UCP_PROTO_MAX_LANES - 1, - UCS_BIT(lanes[0]), ucp_proto_common_filter_min_frag, lanes + 1); + UCS_BIT(req.lanes[0]), ucp_proto_common_filter_min_frag, + req.lanes + 1); - /* Get bandwidth of all lanes and max_bandwidth */ - max_bandwidth = 0; - for (i = 0; i < num_lanes; ++i) { - lane = lanes[i]; - lane_perf = &lanes_perf[lane]; + /* Initialize lane selection */ + req.perf_name = perf_name; + req.max_lanes = params->max_lanes; + req.fixed_first_lane = params->first.lane_type != params->middle.lane_type; - status = ucp_proto_common_get_lane_perf(¶ms->super, lane, lane_perf); - if (status != UCS_OK) { - return status; - } - - /* Calculate maximal bandwidth of all lanes, to skip slow lanes */ - max_bandwidth = ucs_max(max_bandwidth, lane_perf->bandwidth); - } - - /* Filter out slow lanes */ - fixed_first_lane = params->first.lane_type != params->middle.lane_type; - for (i = fixed_first_lane ? 
1 : 0, num_fast_lanes = i; i < num_lanes; ++i) { - lane = lanes[i]; - lane_perf = &lanes_perf[lane]; - if ((lane_perf->bandwidth * max_bw_ratio) < max_bandwidth) { - /* Bandwidth on this lane is too low compared to the fastest - available lane, so it's not worth using it */ - ucp_proto_perf_node_deref(&lanes_perf[lane].node); - ucs_trace("drop " UCP_PROTO_LANE_FMT, - UCP_PROTO_LANE_ARG(¶ms->super.super, lane, lane_perf)); - } else { - lanes[num_fast_lanes++] = lane; - ucs_trace("avail " UCP_PROTO_LANE_FMT, - UCP_PROTO_LANE_ARG(¶ms->super.super, lane, lane_perf)); - } - } - - num_lanes = num_fast_lanes; - ucp_proto_select_lanes(¶ms->super.super, lanes, num_lanes, - params->max_lanes, lanes_perf, - fixed_first_lane, UCP_PROTO_VARIANT_BW, &selection); - - ucp_proto_multi_aggregate_perf(&selection, lanes_perf); - - status = ucp_proto_multi_init_perf(params, &selection, lanes_perf, - perf_name, perf_p, mpriv); - ucp_proto_perf_node_deref(&selection.perf.node); - - /* Deref unused nodes */ - for (i = 0; i < num_lanes; ++i) { - ucp_proto_perf_node_deref(&lanes_perf[lanes[i]].node); - } - - return status; + return ucp_proto_lane_select_init(¶ms->super, &req, select_p); } size_t ucp_proto_multi_priv_size(const ucp_proto_multi_priv_t *mpriv) @@ -301,18 +210,30 @@ void ucp_proto_multi_probe(const ucp_proto_multi_init_params_t *params) { const char *proto_name = ucp_proto_id_field(params->super.super.proto_id, name); + ucp_proto_lane_select_t *select; ucp_proto_multi_priv_t mpriv; + ucp_lane_index_t i; ucp_proto_perf_t *perf; ucs_status_t status; - status = ucp_proto_multi_init(params, proto_name, &perf, &mpriv); + status = ucp_proto_multi_init(params, proto_name, &select); if (status != UCS_OK) { return; } - ucp_proto_select_add_proto(¶ms->super.super, params->super.cfg_thresh, - params->super.cfg_priority, perf, &mpriv, - ucp_proto_multi_priv_size(&mpriv)); + for (i = 0; i < select->num_selections; ++i) { + status = ucp_proto_multi_init_perf(params, select, 
&select->selections[i], + &perf, &mpriv); + if (status != UCS_OK) { + continue; + } + + ucp_proto_select_add_proto(¶ms->super.super, params->super.cfg_thresh, + params->super.cfg_priority, perf, &mpriv, + ucp_proto_multi_priv_size(&mpriv)); + } + + ucp_proto_lane_select_destroy(select); } static const ucp_ep_config_key_lane_t * diff --git a/src/ucp/proto/proto_multi.h b/src/ucp/proto/proto_multi.h index f7d2d862d18..73744c25989 100644 --- a/src/ucp/proto/proto_multi.h +++ b/src/ucp/proto/proto_multi.h @@ -164,8 +164,15 @@ typedef ucs_status_t (*ucp_proto_multi_lane_send_func_t)(ucp_request_t *req, ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, const char *perf_name, - ucp_proto_perf_t **perf_p, - ucp_proto_multi_priv_t *mpriv); + ucp_proto_lane_select_t **select_p); + + +ucs_status_t +ucp_proto_multi_init_perf(const ucp_proto_multi_init_params_t *params, + const ucp_proto_lane_select_t *select, + ucp_proto_lane_selection_t *selection, + ucp_proto_perf_t **perf_p, + ucp_proto_multi_priv_t *mpriv); size_t ucp_proto_multi_priv_size(const ucp_proto_multi_priv_t *mpriv); diff --git a/src/ucp/rndv/proto_rndv.c b/src/ucp/rndv/proto_rndv.c index cfaf62c637c..789028599a2 100644 --- a/src/ucp/rndv/proto_rndv.c +++ b/src/ucp/rndv/proto_rndv.c @@ -616,22 +616,17 @@ ucp_proto_rndv_ack_init(const ucp_proto_common_init_params_t *init_params, ucs_status_t ucp_proto_rndv_bulk_init(const ucp_proto_multi_init_params_t *init_params, - const char *op_name, const char *ack_name, + const char *ack_name, ucp_proto_perf_t *bulk_perf, ucp_proto_perf_t **perf_p, ucp_proto_rndv_bulk_priv_t *rpriv) { ucp_context_t *context = init_params->super.super.worker->context; size_t rndv_align_thresh = context->config.ext.rndv_align_thresh; ucp_proto_multi_priv_t *mpriv = &rpriv->mpriv; - ucp_proto_perf_t *bulk_perf, *ack_perf; + ucp_proto_perf_t *ack_perf; const char *proto_name; ucs_status_t status; - status = ucp_proto_multi_init(init_params, op_name, &bulk_perf, 
mpriv); - if (status != UCS_OK) { - return status; - } - /* Adjust align split threshold by user configuration */ mpriv->align_thresh = ucs_max(rndv_align_thresh, mpriv->align_thresh + mpriv->min_frag); diff --git a/src/ucp/rndv/proto_rndv.h b/src/ucp/rndv/proto_rndv.h index 8299b740491..39fb0e3e9ba 100644 --- a/src/ucp/rndv/proto_rndv.h +++ b/src/ucp/rndv/proto_rndv.h @@ -177,7 +177,7 @@ ucp_proto_rndv_ack_init(const ucp_proto_common_init_params_t *init_params, ucs_status_t ucp_proto_rndv_bulk_init(const ucp_proto_multi_init_params_t *init_params, - const char *name, const char *ack_name, + const char *ack_name, ucp_proto_perf_t *bulk_perf, ucp_proto_perf_t **perf_p, ucp_proto_rndv_bulk_priv_t *rpriv); diff --git a/src/ucp/rndv/rndv_get.c b/src/ucp/rndv/rndv_get.c index 9a8fb8c28f4..a4527bd3e58 100644 --- a/src/ucp/rndv/rndv_get.c +++ b/src/ucp/rndv/rndv_get.c @@ -57,7 +57,9 @@ ucp_proto_rndv_get_common_probe(const ucp_proto_init_params_t *init_params, cap.get.opt_zcopy_align), }; ucp_proto_rndv_bulk_priv_t rpriv; - ucp_proto_perf_t *perf; + ucp_proto_lane_select_t *select; + ucp_lane_index_t i; + ucp_proto_perf_t *bulk_perf, *perf; ucs_status_t status; if ((init_params->select_param->dt_class != UCP_DATATYPE_CONTIG) || @@ -66,16 +68,31 @@ ucp_proto_rndv_get_common_probe(const ucp_proto_init_params_t *init_params, return; } - status = ucp_proto_rndv_bulk_init(¶ms, UCP_PROTO_RNDV_GET_DESC, - UCP_PROTO_RNDV_ATS_NAME, &perf, &rpriv); + status = ucp_proto_multi_init(¶ms, UCP_PROTO_RNDV_GET_DESC, &select); if (status != UCS_OK) { return; } - ucp_proto_select_add_proto(¶ms.super.super, params.super.cfg_thresh, - params.super.cfg_priority, perf, &rpriv, - UCP_PROTO_MULTI_EXTENDED_PRIV_SIZE(&rpriv, - mpriv)); + for (i = 0; i < select->num_selections; ++i) { + status = ucp_proto_multi_init_perf(¶ms, select, &select->selections[i], + &bulk_perf, &rpriv.mpriv); + if (status != UCS_OK) { + continue; + } + + status = ucp_proto_rndv_bulk_init(¶ms, UCP_PROTO_RNDV_ATS_NAME, + 
bulk_perf, &perf, &rpriv); + if (status != UCS_OK) { + continue; + } + + ucp_proto_select_add_proto(¶ms.super.super, params.super.cfg_thresh, + params.super.cfg_priority, perf, &rpriv, + UCP_PROTO_MULTI_EXTENDED_PRIV_SIZE(&rpriv, + mpriv)); + } + + ucp_proto_lane_select_destroy(select); } static UCS_F_ALWAYS_INLINE ucs_status_t diff --git a/src/ucp/rndv/rndv_put.c b/src/ucp/rndv/rndv_put.c index 28948386ffb..a7cc015674a 100644 --- a/src/ucp/rndv/rndv_put.c +++ b/src/ucp/rndv/rndv_put.c @@ -230,6 +230,89 @@ ucp_proto_rndv_put_common_request_init(ucp_request_t *req) return ucp_proto_rndv_bulk_request_init(req, &rpriv->bulk); } +static void +ucp_proto_rndv_put_add_proto(const ucp_proto_multi_init_params_t *params, + uct_completion_callback_t comp_cb, + uint8_t stat_counter, ucp_proto_perf_t *perf, + ucp_proto_rndv_put_priv_t *rpriv) +{ + const size_t atp_size = sizeof(ucp_rndv_ack_hdr_t); + ucp_context_h context = params->super.super.worker->context; + const uct_iface_attr_t *iface_attr; + ucp_lane_index_t lane_idx, lane; + int send_atp, use_fence; + unsigned atp_map; + + send_atp = !ucp_proto_rndv_init_params_is_ppln_frag(¶ms->super.super); + + /* Check which lanes support sending ATP */ + atp_map = 0; + for (lane_idx = 0; lane_idx < rpriv->bulk.mpriv.num_lanes; ++lane_idx) { + lane = rpriv->bulk.mpriv.lanes[lane_idx].super.lane; + iface_attr = ucp_proto_common_get_iface_attr(¶ms->super.super, lane); + if (((iface_attr->cap.flags & UCT_IFACE_FLAG_AM_SHORT) && + (iface_attr->cap.am.max_short >= atp_size)) || + ((iface_attr->cap.flags & UCT_IFACE_FLAG_AM_BCOPY) && + (iface_attr->cap.am.max_bcopy >= atp_size))) { + atp_map |= UCS_BIT(lane); + } + } + + /* Use fence only if all lanes support sending ATP and flush is not forced + */ + use_fence = send_atp && !context->config.ext.rndv_put_force_flush && + (rpriv->bulk.mpriv.lane_map == atp_map); + + /* All lanes can send ATP - invalidate am_lane, to use mpriv->lanes. 
+ * Otherwise, would need to flush all lanes and send ATP on: + * - All lanes supporting ATP send. This ensures that data is flushed + * remotely (i.e. resides in the target buffer), which may not be the case + * with IB transports. An alternative would be to pass + * UCT_FLUSH_FLAG_REMOTE to uct_ep_flush, but using this flag increases + * UCP worker address size. + * TODO: Consider calling UCT ep flush with remote flag when/if address + * size is not an issue anymore. + * - Control lane if none of the lanes support sending ATP + */ + if (use_fence) { + /* Send fence followed by ATP on all lanes */ + rpriv->bulk.super.lane = UCP_NULL_LANE; + rpriv->put_comp_cb = comp_cb; + rpriv->atp_comp_cb = NULL; + rpriv->stage_after_put = UCP_PROTO_RNDV_PUT_STAGE_FENCED_ATP; + rpriv->flush_map = 0; + rpriv->atp_map = rpriv->bulk.mpriv.lane_map; + } else { + /* Flush all lanes and send ATP on all supporting lanes (or control lane + * otherwise) */ + if (send_atp) { + rpriv->put_comp_cb = + ucp_proto_rndv_put_common_flush_completion_send_atp; + rpriv->atp_comp_cb = comp_cb; + rpriv->atp_map = (atp_map == 0) ? 
+ UCS_BIT(rpriv->bulk.super.lane) : atp_map; + } else { + rpriv->put_comp_cb = comp_cb; + rpriv->atp_comp_cb = NULL; + rpriv->atp_map = 0; + } + rpriv->stage_after_put = UCP_PROTO_RNDV_PUT_STAGE_FLUSH; + rpriv->flush_map = rpriv->bulk.mpriv.lane_map; + ucs_assert(rpriv->flush_map != 0); + } + + if (send_atp) { + ucs_assert(rpriv->atp_map != 0); + } + rpriv->atp_num_lanes = ucs_popcount(rpriv->atp_map); + rpriv->stat_counter = stat_counter; + + ucp_proto_select_add_proto(¶ms->super.super, params->super.cfg_thresh, + params->super.cfg_priority, perf, rpriv, + UCP_PROTO_MULTI_EXTENDED_PRIV_SIZE(rpriv, + bulk.mpriv)); +} + static void ucp_proto_rndv_put_common_probe(const ucp_proto_init_params_t *init_params, uint64_t rndv_modes, size_t max_length, @@ -239,7 +322,6 @@ ucp_proto_rndv_put_common_probe(const ucp_proto_init_params_t *init_params, int support_ppln, uint8_t stat_counter, const ucp_memory_info_t *reg_mem_info) { - const size_t atp_size = sizeof(ucp_rndv_ack_hdr_t); ucp_context_t *context = init_params->worker->context; ucp_proto_multi_init_params_t params = { .super.super = *init_params, @@ -273,13 +355,11 @@ ucp_proto_rndv_put_common_probe(const ucp_proto_init_params_t *init_params, .opt_align_offs = ucs_offsetof(uct_iface_attr_t, cap.put.opt_zcopy_align), }; - const uct_iface_attr_t *iface_attr; - ucp_lane_index_t lane_idx, lane; ucp_proto_rndv_put_priv_t rpriv; - int send_atp, use_fence; - ucp_proto_perf_t *perf; + ucp_proto_lane_select_t *select; + ucp_lane_index_t i; + ucp_proto_perf_t *bulk_perf, *perf; ucs_status_t status; - unsigned atp_map; if ((init_params->select_param->dt_class != UCP_DATATYPE_CONTIG) || !ucp_proto_rndv_op_check(init_params, UCP_OP_ID_RNDV_SEND, @@ -288,81 +368,28 @@ ucp_proto_rndv_put_common_probe(const ucp_proto_init_params_t *init_params, return; } - status = ucp_proto_rndv_bulk_init(¶ms, UCP_PROTO_RNDV_PUT_DESC, - UCP_PROTO_RNDV_ATP_NAME, &perf, - &rpriv.bulk); + status = ucp_proto_multi_init(¶ms, UCP_PROTO_RNDV_PUT_DESC, 
&select); if (status != UCS_OK) { return; } - send_atp = !ucp_proto_rndv_init_params_is_ppln_frag(init_params); - - /* Check which lanes support sending ATP */ - atp_map = 0; - for (lane_idx = 0; lane_idx < rpriv.bulk.mpriv.num_lanes; ++lane_idx) { - lane = rpriv.bulk.mpriv.lanes[lane_idx].super.lane; - iface_attr = ucp_proto_common_get_iface_attr(init_params, lane); - if (((iface_attr->cap.flags & UCT_IFACE_FLAG_AM_SHORT) && - (iface_attr->cap.am.max_short >= atp_size)) || - ((iface_attr->cap.flags & UCT_IFACE_FLAG_AM_BCOPY) && - (iface_attr->cap.am.max_bcopy >= atp_size))) { - atp_map |= UCS_BIT(lane); + for (i = 0; i < select->num_selections; ++i) { + status = ucp_proto_multi_init_perf(¶ms, select, &select->selections[i], + &bulk_perf, &rpriv.bulk.mpriv); + if (status != UCS_OK) { + continue; } - } - - /* Use fence only if all lanes support sending ATP and flush is not forced - */ - use_fence = send_atp && !context->config.ext.rndv_put_force_flush && - (rpriv.bulk.mpriv.lane_map == atp_map); - /* All lanes can send ATP - invalidate am_lane, to use mpriv->lanes. - * Otherwise, would need to flush all lanes and send ATP on: - * - All lanes supporting ATP send. This ensures that data is flushed - * remotely (i.e. resides in the target buffer), which may not be the case - * with IB transports. An alternative would be to pass - * UCT_FLUSH_FLAG_REMOTE to uct_ep_flush, but using this flag increases - * UCP worker address size. - * TODO: Consider calling UCT ep flush with remote flag when/if address - * size is not an issue anymore. 
- * - Control lane if none of the lanes support sending ATP - */ - if (use_fence) { - /* Send fence followed by ATP on all lanes */ - rpriv.bulk.super.lane = UCP_NULL_LANE; - rpriv.put_comp_cb = comp_cb; - rpriv.atp_comp_cb = NULL; - rpriv.stage_after_put = UCP_PROTO_RNDV_PUT_STAGE_FENCED_ATP; - rpriv.flush_map = 0; - rpriv.atp_map = rpriv.bulk.mpriv.lane_map; - } else { - /* Flush all lanes and send ATP on all supporting lanes (or control lane - * otherwise) */ - if (send_atp) { - rpriv.put_comp_cb = - ucp_proto_rndv_put_common_flush_completion_send_atp; - rpriv.atp_comp_cb = comp_cb; - rpriv.atp_map = (atp_map == 0) ? - UCS_BIT(rpriv.bulk.super.lane) : atp_map; - } else { - rpriv.put_comp_cb = comp_cb; - rpriv.atp_comp_cb = NULL; - rpriv.atp_map = 0; + status = ucp_proto_rndv_bulk_init(¶ms, UCP_PROTO_RNDV_ATP_NAME, + bulk_perf, &perf, &rpriv.bulk); + if (status != UCS_OK) { + continue; } - rpriv.stage_after_put = UCP_PROTO_RNDV_PUT_STAGE_FLUSH; - rpriv.flush_map = rpriv.bulk.mpriv.lane_map; - ucs_assert(rpriv.flush_map != 0); - } - if (send_atp) { - ucs_assert(rpriv.atp_map != 0); + ucp_proto_rndv_put_add_proto(¶ms, comp_cb, stat_counter, perf, &rpriv); } - rpriv.atp_num_lanes = ucs_popcount(rpriv.atp_map); - rpriv.stat_counter = stat_counter; - ucp_proto_select_add_proto(¶ms.super.super, params.super.cfg_thresh, - params.super.cfg_priority, perf, &rpriv, - UCP_PROTO_MULTI_EXTENDED_PRIV_SIZE(&rpriv, - bulk.mpriv)); + ucp_proto_lane_select_destroy(select); } static const char * From da419b4bc69ebf75876bb16980701799c40a39a4 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Mon, 4 Aug 2025 13:35:55 +0000 Subject: [PATCH 12/18] UCP/PROTO: Temporarily disabled variants by default --- src/ucp/core/ucp_context.c | 2 +- src/ucp/proto/proto_common.c | 23 +++++++++++++---------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index a2dad24a4bc..c882096416c 100644 --- a/src/ucp/core/ucp_context.c +++ 
b/src/ucp/core/ucp_context.c @@ -504,7 +504,7 @@ static ucs_config_field_t ucp_context_config_table[] = { "directory.", ucs_offsetof(ucp_context_config_t, proto_info_dir), UCS_CONFIG_TYPE_STRING}, - {"PROTO_VARIANTS", "y", + {"PROTO_VARIANTS", "n", "Enable multiple variants of UCP protocols, meaning that a single protocol\n" "may have multiple variants (optimized for latency or bandwidth) for the same\n" "operation. The value is interpreted as follows:\n" diff --git a/src/ucp/proto/proto_common.c b/src/ucp/proto/proto_common.c index 948a2928128..c827687be97 100644 --- a/src/ucp/proto/proto_common.c +++ b/src/ucp/proto/proto_common.c @@ -1099,13 +1099,13 @@ ucp_proto_lane_select_trace(const ucp_proto_init_params_t *params, static void ucp_proto_lane_select_distinct(ucp_proto_lane_select_t *select) { + ucp_lane_index_t num_distinct = 1; ucp_lane_index_t i, j; int unique; - select->num_selections = 1; - for (i = 1; i < UCP_PROTO_VARIANT_LAST; ++i) { + for (i = num_distinct; i < select->num_selections; ++i) { unique = 1; - for (j = 0; j < select->num_selections; ++j) { + for (j = 0; j < num_distinct; ++j) { if (select->selections[i].lane_map == select->selections[j].lane_map) { unique = 0; break; @@ -1113,13 +1113,15 @@ ucp_proto_lane_select_distinct(ucp_proto_lane_select_t *select) } if (unique) { - if (i != select->num_selections) { - select->selections[select->num_selections] = select->selections[i]; + if (i != num_distinct) { + select->selections[num_distinct] = select->selections[i]; } - ++select->num_selections; + ++num_distinct; } } + + select->num_selections = num_distinct; } static void @@ -1174,7 +1176,7 @@ ucp_proto_lane_select_init(const ucp_proto_common_init_params_t *params, ucp_proto_lane_select_t *select; ucp_proto_lane_selection_t *selection; ucs_status_t status; - ucp_proto_variant_t variant; + ucp_proto_variant_t variant, max_variant; select = ucs_mpool_get(&worker->proto_select_mp); if (select == NULL) { @@ -1218,18 +1220,19 @@ 
ucp_proto_lane_select_init(const ucp_proto_common_init_params_t *params, } /* Initialize all protocol variants */ - for (variant = 0; variant < UCP_PROTO_VARIANT_LAST; ++variant) { + max_variant = context->config.ext.proto_variants_enable ? + UCP_PROTO_VARIANT_LAST : 1; + for (variant = 0; variant < max_variant; ++variant) { selection = &select->selections[variant]; ucp_proto_lane_select(¶ms->super, req, select, variant, selection); ucp_proto_lane_select_trace(¶ms->super, select, selection, "found variant", UCS_LOG_LEVEL_TRACE); + ++select->num_selections; } /* Select distinct protocol variants */ ucp_proto_lane_select_distinct(select); ucs_assert(select->num_selections > 0); - /* TODO: remove */ - select->num_selections = 1; /* Initialize selected distinct protocol variants */ for (i = 0; i < select->num_selections; ++i) { From d342665595debcfc1848bc2d1651f8333596a2d5 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Mon, 4 Aug 2025 14:10:38 +0000 Subject: [PATCH 13/18] UCP/PROTO: Allocate on the heap when mp is not created yet --- src/ucp/proto/proto_common.c | 13 +++++++++++-- src/ucp/proto/proto_common.h | 1 + 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/ucp/proto/proto_common.c b/src/ucp/proto/proto_common.c index c827687be97..29db0536c27 100644 --- a/src/ucp/proto/proto_common.c +++ b/src/ucp/proto/proto_common.c @@ -1178,12 +1178,17 @@ ucp_proto_lane_select_init(const ucp_proto_common_init_params_t *params, ucs_status_t status; ucp_proto_variant_t variant, max_variant; - select = ucs_mpool_get(&worker->proto_select_mp); + if (worker->proto_select_mp.data != NULL) { + select = ucs_mpool_get(&worker->proto_select_mp); + } else { + select = ucs_malloc(sizeof(*select), "ucp_proto_lane_select_t"); + } if (select == NULL) { return UCS_ERR_NO_MEMORY; } memset(select, 0, sizeof(*select)); + select->mp_alloc = worker->proto_select_mp.data != NULL; /* Get bandwidth of all lanes and max_bandwidth */ for (i = 0; i < req->num_lanes; ++i) { @@ 
-1269,5 +1274,9 @@ void ucp_proto_lane_select_destroy(ucp_proto_lane_select_t *select) ucp_proto_perf_node_deref(&select->selections[i].perf.node); } - ucs_mpool_put(select); + if (select->mp_alloc) { + ucs_mpool_put(select); + } else { + ucs_free(select); + } } diff --git a/src/ucp/proto/proto_common.h b/src/ucp/proto/proto_common.h index 40309edb5fc..95936adb038 100644 --- a/src/ucp/proto/proto_common.h +++ b/src/ucp/proto/proto_common.h @@ -228,6 +228,7 @@ typedef struct { ucp_proto_common_tl_perf_t lanes_perf[UCP_PROTO_MAX_LANES]; ucp_proto_lane_selection_t selections[UCP_PROTO_VARIANT_LAST]; ucp_lane_index_t num_selections; + int mp_alloc; } ucp_proto_lane_select_t; From bab801e656032c41d7a1c96809f5b7961b35c1b9 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Mon, 4 Aug 2025 14:33:32 +0000 Subject: [PATCH 14/18] UCP/PROTO: Added unit test --- test/gtest/ucp/test_ucp_proto_mock.cc | 33 +++++++++++++++++++++------ 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/test/gtest/ucp/test_ucp_proto_mock.cc b/test/gtest/ucp/test_ucp_proto_mock.cc index f70eee451ca..16f700bb11f 100644 --- a/test/gtest/ucp/test_ucp_proto_mock.cc +++ b/test/gtest/ucp/test_ucp_proto_mock.cc @@ -534,19 +534,38 @@ class test_ucp_proto_mock_rcx : public test_ucp_proto_mock { }; UCS_TEST_P(test_ucp_proto_mock_rcx, memtype_copy_enable, - "IB_NUM_PATHS?=1", "MAX_RNDV_LANES=1", - "MEMTYPE_COPY_ENABLE=n") + "IB_NUM_PATHS?=1", "MAX_RNDV_LANES=1", "MEMTYPE_COPY_ENABLE=n", + "PROTO_VARIANTS=n") { ucp_proto_select_key_t key = any_key(); key.param.op_id_flags = UCP_OP_ID_AM_SEND; key.param.op_attr = 0; check_ep_config(sender(), { - {0, 0, "rendezvous no data fetch", ""}, - {1, 64, "rendezvous zero-copy fenced write to remote", - "rc_mlx5/mock_0:1"}, - {21992, INF, "rendezvous zero-copy read from remote", - "rc_mlx5/mock_0:1"}, + {0, 0, "rendezvous no data fetch", ""}, + {1, 64, "rendezvous zero-copy fenced write to remote", + "rc_mlx5/mock_0:1"}, + {65, INF, "rendezvous zero-copy read 
from remote", + "rc_mlx5/mock_0:1"}, + }, key); +} + +UCS_TEST_P(test_ucp_proto_mock_rcx, proto_variants_enable, + "IB_NUM_PATHS?=1", "MAX_RNDV_LANES=1", "MEMTYPE_COPY_ENABLE=n", + "PROTO_VARIANTS=y") +{ + ucp_proto_select_key_t key = any_key(); + key.param.op_id_flags = UCP_OP_ID_AM_SEND; + key.param.op_attr = 0; + + check_ep_config(sender(), { + {0, 0, "rendezvous no data fetch", ""}, + {1, 64, "rendezvous zero-copy fenced write to remote", + "rc_mlx5/mock_1:1"}, + {65, 16800, "rendezvous zero-copy read from remote", + "rc_mlx5/mock_1:1"}, + {16801, INF, "rendezvous zero-copy read from remote", + "rc_mlx5/mock_0:1"}, }, key); } From b17ccaee54425bb6e306c580b21dba7cb82390d3 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Tue, 5 Aug 2025 05:24:38 +0000 Subject: [PATCH 15/18] UCP/PROTO: Fixed coverity warning --- src/ucp/proto/proto_multi.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ucp/proto/proto_multi.c b/src/ucp/proto/proto_multi.c index 7b204d1d571..2f0a18ef5bc 100644 --- a/src/ucp/proto/proto_multi.c +++ b/src/ucp/proto/proto_multi.c @@ -201,6 +201,7 @@ ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, size_t ucp_proto_multi_priv_size(const ucp_proto_multi_priv_t *mpriv) { + ucs_assert_always(mpriv->num_lanes <= UCP_MAX_LANES); return ucs_offsetof(ucp_proto_multi_priv_t, lanes) + (mpriv->num_lanes * ucs_field_sizeof(ucp_proto_multi_priv_t, lanes[0])); From cd938c180627bb437071d7aabf4d2ff09f74c580 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Tue, 5 Aug 2025 07:51:15 +0000 Subject: [PATCH 16/18] UCP/PROTO: Remove slow lanes per selection --- src/ucp/proto/proto_common.c | 82 +++++++++++++++++++++++------------- 1 file changed, 52 insertions(+), 30 deletions(-) diff --git a/src/ucp/proto/proto_common.c b/src/ucp/proto/proto_common.c index 29db0536c27..ac80f0972c3 100644 --- a/src/ucp/proto/proto_common.c +++ b/src/ucp/proto/proto_common.c @@ -1004,8 +1004,8 @@ ucp_proto_lane_select_find(const ucp_proto_init_params_t *params, 
} static UCS_F_ALWAYS_INLINE void -ucp_proto_lane_select_add(ucp_proto_lane_selection_t *selection, - const ucp_proto_init_params_t *params, +ucp_proto_lane_select_add(const ucp_proto_init_params_t *params, + ucp_proto_lane_selection_t *selection, ucp_lane_index_t lane) { ucp_rsc_index_t dev_index = ucp_proto_common_get_dev_index(params, lane); @@ -1035,7 +1035,7 @@ ucp_proto_lane_select(const ucp_proto_init_params_t *params, index_map = UCS_MASK(select->num_lanes); if (req->fixed_first_lane) { - ucp_proto_lane_select_add(selection, params, select->lanes[0]); + ucp_proto_lane_select_add(params, selection, select->lanes[0]); index_map &= ~UCS_BIT(0); } @@ -1048,11 +1048,54 @@ ucp_proto_lane_select(const ucp_proto_init_params_t *params, break; } - ucp_proto_lane_select_add(selection, params, select->lanes[lane_index]); + ucp_proto_lane_select_add(params, selection, select->lanes[lane_index]); index_map &= ~UCS_BIT(lane_index); } } +static void +ucp_proto_lane_select_fast_lanes(const ucp_proto_init_params_t *params, + const ucp_proto_lane_select_req_t *req, + const ucp_proto_lane_select_t *select, + ucp_proto_lane_selection_t *selection) +{ + ucp_context_h context = params->worker->context; + const double max_bw_ratio = context->config.ext.multi_lane_max_ratio; + double max_bandwidth = 0; + ucp_proto_lane_selection_t fast = {0}; + + const ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t i, lane; + + /* Get bandwidth of all lanes and max_bandwidth */ + for (i = 0; i < selection->num_lanes; ++i) { + lane = selection->lanes[i]; + lane_perf = &select->lanes_perf[lane]; + + /* Calculate maximal BW of all selected lanes, to skip slow lanes */ + max_bandwidth = ucs_max(max_bandwidth, lane_perf->bandwidth); + } + + /* Add fast lanes to the selection */ + for (i = 0; i < selection->num_lanes; ++i) { + lane = selection->lanes[i]; + lane_perf = &select->lanes_perf[lane]; + + if ((req->fixed_first_lane && (i == 0)) || + ((lane_perf->bandwidth * max_bw_ratio) >= 
max_bandwidth)) { + ucp_proto_lane_select_add(params, &fast, lane); + ucs_trace("avail " UCP_PROTO_LANE_FMT, + UCP_PROTO_LANE_ARG(params, lane, lane_perf)); + } else { + ucs_trace("drop " UCP_PROTO_LANE_FMT, + UCP_PROTO_LANE_ARG(params, lane, lane_perf)); + } + } + + fast.variant = selection->variant; + *selection = fast; +} + static void ucp_proto_lane_select_trace(const ucp_proto_init_params_t *params, const ucp_proto_lane_select_t *select, @@ -1167,10 +1210,8 @@ ucp_proto_lane_select_init(const ucp_proto_common_init_params_t *params, const ucp_proto_lane_select_req_t *req, ucp_proto_lane_select_t **select_p) { - ucp_worker_h worker = params->super.worker; - ucp_context_h context = worker->context; - const double max_bw_ratio = context->config.ext.multi_lane_max_ratio; - double max_bandwidth = 0; + ucp_worker_h worker = params->super.worker; + ucp_context_h context = worker->context; ucp_proto_common_tl_perf_t *lane_perf; ucp_lane_index_t i, lane; ucp_proto_lane_select_t *select; @@ -1190,7 +1231,7 @@ ucp_proto_lane_select_init(const ucp_proto_common_init_params_t *params, memset(select, 0, sizeof(*select)); select->mp_alloc = worker->proto_select_mp.data != NULL; - /* Get bandwidth of all lanes and max_bandwidth */ + /* Get bandwidth of all lanes */ for (i = 0; i < req->num_lanes; ++i) { lane = req->lanes[i]; lane_perf = &select->lanes_perf[lane]; @@ -1201,27 +1242,7 @@ ucp_proto_lane_select_init(const ucp_proto_common_init_params_t *params, return status; } - /* Calculate maximal bandwidth of all lanes, to skip slow lanes */ - max_bandwidth = ucs_max(max_bandwidth, lane_perf->bandwidth); - } - - /* Add fast lanes to the selection */ - for (i = 0; i < req->num_lanes; ++i) { - lane = req->lanes[i]; - lane_perf = &select->lanes_perf[lane]; - - if ((req->fixed_first_lane && (i == 0)) || - ((lane_perf->bandwidth * max_bw_ratio) >= max_bandwidth)) { - select->lanes[select->num_lanes++] = lane; - ucs_trace("avail " UCP_PROTO_LANE_FMT, - UCP_PROTO_LANE_ARG(¶ms->super, 
lane, lane_perf)); - } else { - /* Bandwidth on this lane is too low compared to the fastest - available lane, so it's not worth using it */ - ucp_proto_perf_node_deref(&lane_perf->node); - ucs_trace("drop " UCP_PROTO_LANE_FMT, - UCP_PROTO_LANE_ARG(¶ms->super, lane, lane_perf)); - } + select->lanes[select->num_lanes++] = lane; } /* Initialize all protocol variants */ @@ -1230,6 +1251,7 @@ ucp_proto_lane_select_init(const ucp_proto_common_init_params_t *params, for (variant = 0; variant < max_variant; ++variant) { selection = &select->selections[variant]; ucp_proto_lane_select(¶ms->super, req, select, variant, selection); + ucp_proto_lane_select_fast_lanes(¶ms->super, req, select, selection); ucp_proto_lane_select_trace(¶ms->super, select, selection, "found variant", UCS_LOG_LEVEL_TRACE); ++select->num_selections; From ab717f30d2a74177d5f93cfd710ca030db26d769 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Tue, 12 Aug 2025 14:04:57 +0000 Subject: [PATCH 17/18] UCP/PROTO/GTEST: Use CX7 only for IB device mocking --- test/gtest/ucp/test_ucp_proto_mock.cc | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/test/gtest/ucp/test_ucp_proto_mock.cc b/test/gtest/ucp/test_ucp_proto_mock.cc index 16f700bb11f..e5dc3be47ff 100644 --- a/test/gtest/ucp/test_ucp_proto_mock.cc +++ b/test/gtest/ucp/test_ucp_proto_mock.cc @@ -39,12 +39,17 @@ class mock_iface { void add_mock_iface( const std::string &dev_name = "mock", iface_attr_func_t cb = [](uct_iface_attr_t &iface_attr) {}, - perf_attr_func_t perf_cb = cx7_perf_mock) + perf_attr_func_t perf_cb = [](uct_perf_attr_t& perf_attr) {}) { m_iface_attrs_funcs[dev_name] = cb; m_perf_attrs_funcs[dev_name] = perf_cb; } + void add_mock_cx7_iface(const std::string &dev_name, iface_attr_func_t cb) + { + add_mock_iface(dev_name, cb, cx7_perf_mock); + } + void mock_transport(const std::string &tl_name) { uct_component_h component; @@ -163,8 +168,7 @@ class mock_iface { static void 
cx7_perf_mock(uct_perf_attr_t& perf_attr) { - perf_attr.path_bandwidth.shared = 0.95 * perf_attr.bandwidth.shared; - perf_attr.path_bandwidth.dedicated = 0; + perf_attr.path_bandwidth.shared = 0.95 * perf_attr.bandwidth.shared; } /* We have to use singleton to mock C functions */ @@ -515,7 +519,7 @@ class test_ucp_proto_mock_rcx : public test_ucp_proto_mock { virtual void init() override { /* Device with higher BW and latency */ - add_mock_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) { + add_mock_cx7_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) { iface_attr.cap.am.max_short = 2000; iface_attr.bandwidth.shared = 28e9; iface_attr.latency.c = 600e-9; @@ -523,7 +527,7 @@ class test_ucp_proto_mock_rcx : public test_ucp_proto_mock { iface_attr.cap.get.max_zcopy = 16384; }); /* Device with smaller BW but lower latency */ - add_mock_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) { + add_mock_cx7_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) { iface_attr.cap.am.max_short = 208; iface_attr.bandwidth.shared = 24e9; iface_attr.latency.c = 500e-9; @@ -661,7 +665,7 @@ class test_ucp_proto_mock_rcx2 : public test_ucp_proto_mock { virtual void init() override { /* Device with high BW and lower latency */ - add_mock_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) { + add_mock_cx7_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) { iface_attr.cap.am.max_short = 208; iface_attr.bandwidth.shared = 28e9; iface_attr.latency.c = 500e-9; @@ -669,7 +673,7 @@ class test_ucp_proto_mock_rcx2 : public test_ucp_proto_mock { iface_attr.cap.get.max_zcopy = 16384; }); /* Device with lower BW and higher latency */ - add_mock_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) { + add_mock_cx7_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) { iface_attr.cap.am.max_short = 2000; iface_attr.bandwidth.shared = 24e9; iface_attr.latency.c = 600e-9; @@ -709,7 +713,7 @@ class test_ucp_proto_mock_rcx3 : public test_ucp_proto_mock { { /* Device with high BW and lower latency, but 0 
get_zcopy. * This use case is similar to cuda_ipc when NVLink is not available. */ - add_mock_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) { + add_mock_cx7_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) { iface_attr.cap.am.max_short = 208; iface_attr.bandwidth.shared = 28e9; iface_attr.latency.c = 500e-9; @@ -717,7 +721,7 @@ class test_ucp_proto_mock_rcx3 : public test_ucp_proto_mock { iface_attr.cap.get.max_zcopy = 0; }); /* Device with lower BW and higher latency */ - add_mock_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) { + add_mock_cx7_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) { iface_attr.cap.am.max_short = 2000; iface_attr.bandwidth.shared = 24e9; iface_attr.latency.c = 600e-9; From be3fd55dab45874c766af5029750cdb7a4721a66 Mon Sep 17 00:00:00 2001 From: Ilia Yastrebov Date: Thu, 14 Aug 2025 15:17:35 +0000 Subject: [PATCH 18/18] UCP/PROTO/GTEST: Latency variant: don't select lanes with higher latency --- src/ucp/proto/proto_common.c | 46 +++++++++++++------- test/gtest/ucp/test_ucp_proto_mock.cc | 60 +++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 16 deletions(-) diff --git a/src/ucp/proto/proto_common.c b/src/ucp/proto/proto_common.c index ac80f0972c3..9d753b0f621 100644 --- a/src/ucp/proto/proto_common.c +++ b/src/ucp/proto/proto_common.c @@ -673,7 +673,6 @@ ucp_proto_common_find_lanes(const ucp_proto_init_params_t *params, } } - ucs_trace("%s: added as lane %d", lane_desc, lane); lanes[num_lanes++] = lane; } @@ -930,7 +929,7 @@ void ucp_proto_fatal_invalid_stage(ucp_request_t *req, const char *func_name) static UCS_F_ALWAYS_INLINE double ucp_proto_lane_select_avail_bw(const ucp_proto_init_params_t *params, ucp_lane_index_t lane, - const ucp_proto_common_tl_perf_t *lane_perf, + const ucp_proto_lane_select_t *select, const ucp_proto_lane_selection_t *selection) { /* Minimal path ratio */ @@ -939,8 +938,10 @@ ucp_proto_lane_select_avail_bw(const ucp_proto_init_params_t *params, double multi_path_ratio = 
context->config.ext.multi_path_ratio; ucp_rsc_index_t dev_index = ucp_proto_common_get_dev_index(params, lane); uint8_t path_index = selection->dev_count[dev_index]; + const ucp_proto_common_tl_perf_t *lane_perf; double ratio; + lane_perf = &select->lanes_perf[lane]; if (UCS_CONFIG_DBL_IS_AUTO(multi_path_ratio)) { ratio = ucs_min(1.0 - (lane_perf->path_ratio * path_index), lane_perf->path_ratio); @@ -965,12 +966,29 @@ ucp_proto_lane_select_avail_bw(const ucp_proto_init_params_t *params, } static UCS_F_ALWAYS_INLINE double -ucp_proto_lane_select_score(double bw, - const ucp_proto_common_tl_perf_t *lane_perf, - ucp_proto_variant_t variant) +ucp_proto_lane_select_score(double bw, ucp_lane_index_t lane, + const ucp_proto_lane_select_t *select, + const ucp_proto_lane_selection_t *selection) { - size_t msg_size = ucp_proto_common_get_variant_msg_size(variant); - return 1.0 / ((msg_size / bw) + lane_perf->latency); + size_t msg_size = ucp_proto_common_get_variant_msg_size(selection->variant); + const ucp_proto_common_tl_perf_t *lane_perf, *best_lane_perf; + double latency, best_latency; + + lane_perf = &select->lanes_perf[lane]; + latency = lane_perf->latency + lane_perf->sys_latency; + + /* For latency variant, do not select lanes with higher latency than the + * best lane found so far */ + if ((selection->variant == UCP_PROTO_VARIANT_LAT) && + (selection->num_lanes > 0)) { + best_lane_perf = &select->lanes_perf[selection->lanes[0]]; + best_latency = best_lane_perf->latency + best_lane_perf->sys_latency; + if (ucp_score_cmp(latency, best_latency) > 0) { + return -1.0; + } + } + + return 1.0 / ((msg_size / bw) + latency); } static ucp_lane_index_t @@ -983,17 +1001,13 @@ ucp_proto_lane_select_find(const ucp_proto_init_params_t *params, ucp_lane_index_t max_index = UCP_NULL_LANE; double score; double avail_bw; - const ucp_proto_common_tl_perf_t *lane_perf; ucp_lane_index_t lane, index; ucs_assert(index_map != 0); ucs_for_each_bit(index, index_map) { - lane = 
select->lanes[index]; - lane_perf = &select->lanes_perf[lane]; - avail_bw = ucp_proto_lane_select_avail_bw(params, lane, lane_perf, - selection); - score = ucp_proto_lane_select_score(avail_bw, lane_perf, - selection->variant); + lane = select->lanes[index]; + avail_bw = ucp_proto_lane_select_avail_bw(params, lane, select, selection); + score = ucp_proto_lane_select_score(avail_bw, lane, select, selection); if (score > max_score) { max_score = score; max_index = index; @@ -1084,8 +1098,6 @@ ucp_proto_lane_select_fast_lanes(const ucp_proto_init_params_t *params, if ((req->fixed_first_lane && (i == 0)) || ((lane_perf->bandwidth * max_bw_ratio) >= max_bandwidth)) { ucp_proto_lane_select_add(params, &fast, lane); - ucs_trace("avail " UCP_PROTO_LANE_FMT, - UCP_PROTO_LANE_ARG(params, lane, lane_perf)); } else { ucs_trace("drop " UCP_PROTO_LANE_FMT, UCP_PROTO_LANE_ARG(params, lane, lane_perf)); @@ -1243,6 +1255,8 @@ ucp_proto_lane_select_init(const ucp_proto_common_init_params_t *params, } select->lanes[select->num_lanes++] = lane; + ucs_trace("avail " UCP_PROTO_LANE_FMT, + UCP_PROTO_LANE_ARG(¶ms->super, lane, lane_perf)); } /* Initialize all protocol variants */ diff --git a/test/gtest/ucp/test_ucp_proto_mock.cc b/test/gtest/ucp/test_ucp_proto_mock.cc index e5dc3be47ff..11117e9a939 100644 --- a/test/gtest/ucp/test_ucp_proto_mock.cc +++ b/test/gtest/ucp/test_ucp_proto_mock.cc @@ -749,6 +749,66 @@ UCS_TEST_P(test_ucp_proto_mock_rcx3, single_lane_no_zcopy, UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_proto_mock_rcx3, rcx, "rc_x") +class test_ucp_proto_mock_variants : public test_ucp_proto_mock { +public: + test_ucp_proto_mock_variants() + { + mock_transport("rc_mlx5"); + } + + virtual void init() override + { + /* Mock for "cuda_ipc" device. 
*/ + add_mock_iface("nvlink", [](uct_iface_attr_t &iface_attr) { + iface_attr.cap.am.max_short = 0; + iface_attr.bandwidth.shared = 100e9; + iface_attr.latency.c = 2000e-9; + iface_attr.latency.m = 1e-9; + }, [](uct_perf_attr_t &perf_attr) { + perf_attr.path_bandwidth.shared = 0.5 * perf_attr.bandwidth.shared; + }); + /* IB device attached to GPU */ + add_mock_cx7_iface("fast:1", [](uct_iface_attr_t &iface_attr) { + iface_attr.cap.am.max_short = 2000; + iface_attr.bandwidth.shared = 45e9; + iface_attr.latency.c = 600e-9; + iface_attr.latency.m = 1e-9; + }); + /* IB device not attached to GPU, therefore both BW and latency are + * limited by distance */ + add_mock_cx7_iface("slow:1", [](uct_iface_attr_t &iface_attr) { + iface_attr.cap.am.max_short = 2000; + iface_attr.bandwidth.shared = 25e9; + iface_attr.latency.c = 900e-9; + iface_attr.latency.m = 1e-9; + }); + test_ucp_proto_mock::init(); + } +}; + +UCS_TEST_P(test_ucp_proto_mock_variants, latency_variant, + "IB_NUM_PATHS?=2", "MAX_RNDV_LANES=2", "RNDV_THRESH=0", "PROTO_VARIANTS=y") +{ + ucp_proto_select_key_t key = any_key(); + key.param.op_id_flags = UCP_OP_ID_AM_SEND; + key.param.op_attr = 0; + + /* + * Variant for latency must not include slow lane, because it adds + * latency to the path. + */ + check_ep_config(sender(), { + {1, 129, "rendezvous fragmented copy-in copy-out", + "rc_mlx5/fast:1/path0"}, + {130, 229091, "rendezvous zero-copy read from remote", + "rc_mlx5/fast:1 50% on path0 and 50% on path1"}, + {229092, INF, "rendezvous zero-copy read from remote", + "rc_mlx5/nvlink 50% on path0 and 50% on path1"}, + }, key); +} + +UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_proto_mock_variants, rcx, "rc_x") + class test_ucp_proto_mock_cma : public test_ucp_proto_mock { public: test_ucp_proto_mock_cma()