diff --git a/src/ucp/core/ucp_context.c b/src/ucp/core/ucp_context.c index aa66d5ccfe9..c39e8f8d48b 100644 --- a/src/ucp/core/ucp_context.c +++ b/src/ucp/core/ucp_context.c @@ -504,6 +504,14 @@ static ucs_config_field_t ucp_context_config_table[] = { "directory.", ucs_offsetof(ucp_context_config_t, proto_info_dir), UCS_CONFIG_TYPE_STRING}, + {"PROTO_VARIANTS", "n", + "Enable multiple variants of UCP protocols, so that the same operation may\n" + "be served by several variants of a single protocol (e.g. optimized for\n" + "latency or for bandwidth). The value is interpreted as follows:\n" + " 'y' : Enable multiple variants\n" + " 'n' : Disable multiple variants\n", + ucs_offsetof(ucp_context_config_t, proto_variants_enable), UCS_CONFIG_TYPE_BOOL}, + {"REG_NONBLOCK_MEM_TYPES", "", "Perform only non-blocking memory registration for these memory types.\n" "Non-blocking registration means that the page registration may be\n" diff --git a/src/ucp/core/ucp_context.h b/src/ucp/core/ucp_context.h index 7df775955db..05cbaac0d3d 100644 --- a/src/ucp/core/ucp_context.h +++ b/src/ucp/core/ucp_context.h @@ -184,6 +184,8 @@ typedef struct ucp_context_config { char *select_distance_md; /** Directory to write protocol selection information */ char *proto_info_dir; + /** Enable multiple variants of UCP protocols */ + int proto_variants_enable; /** Memory types that perform non-blocking registration by default */ uint64_t reg_nb_mem_types; /** Prefer native RMA transports for RMA/AMO protocols */ diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index 81bb56ec9be..47bb674fe7b 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -130,6 +130,14 @@ static ucs_mpool_ops_t ucp_rkey_mpool_ops = { .obj_str = NULL }; +static ucs_mpool_ops_t ucp_proto_select_mpool_ops = { + .chunk_alloc = ucs_mpool_chunk_malloc, + .chunk_release = ucs_mpool_chunk_free, + .obj_init = NULL, + .obj_cleanup = NULL, + .obj_str = NULL +}; + #define ucp_worker_discard_uct_ep_hash_key(_uct_ep) \ kh_int64_hash_func((uintptr_t)(_uct_ep)) @@ -2006,6 +2014,16 @@ static ucs_status_t ucp_worker_init_mpools(ucp_worker_h worker) worker->flags |= UCP_WORKER_FLAG_AM_MPOOL_INITIALIZED; } + ucs_mpool_params_reset(&mp_params); + mp_params.elem_size = sizeof(ucp_proto_lane_select_t); + mp_params.elems_per_chunk = 8; + mp_params.ops = &ucp_proto_select_mpool_ops; + mp_params.name = "ucp_proto_lane_select"; + status = ucs_mpool_init(&mp_params, &worker->proto_select_mp); + if (status != UCS_OK) { + goto err_reg_mp_cleanup; + } + return UCS_OK; err_reg_mp_cleanup: @@ -2043,6 +2061,7 @@ static void ucp_worker_destroy_mpools(ucp_worker_h worker) } ucs_mpool_cleanup(&worker->req_mp, !(worker->flags & UCP_WORKER_FLAG_IGNORE_REQUEST_LEAK)); + ucs_mpool_cleanup(&worker->proto_select_mp, 1); } static unsigned ucp_worker_ep_config_free_cb(void *arg) diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index 561f0ec8043..f1101866eff 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -339,6 +339,7 @@ typedef struct ucp_worker { ucp_worker_cm_t *cms; /* Array of CMs, one for each component */ ucs_mpool_set_t am_mps; /* Memory pool set for AM receives */ ucs_mpool_t reg_mp; /* Registered memory pool */ + ucs_mpool_t proto_select_mp; /* Protocol selection memory pool */ ucp_worker_mpool_hash_t mpool_hash; /* Hash table of memory pools */ ucs_queue_head_t rkey_ptr_reqs; /* Queue of submitted RKEY PTR requests that * are in-progress */ diff --git a/src/ucp/proto/proto_common.c b/src/ucp/proto/proto_common.c 
index 5704ba33f3a..2bba10bbbd7 100644 --- a/src/ucp/proto/proto_common.c +++ b/src/ucp/proto/proto_common.c @@ -235,7 +235,7 @@ ucp_proto_common_get_frag_size(const ucp_proto_common_init_params_t *params, /* Update 'perf' with the distance */ static void ucp_proto_common_update_lane_perf_by_distance( - ucp_proto_common_tl_perf_t *perf, ucp_proto_perf_node_t *perf_node, + ucp_proto_common_tl_perf_t *perf, const ucs_sys_dev_distance_t *distance, const char *perf_name, const char *perf_fmt, ...) { @@ -261,7 +261,7 @@ static void ucp_proto_common_update_lane_perf_by_distance( sys_perf_node = ucp_proto_perf_node_new_data(perf_name, "%s", perf_node_desc); ucp_proto_perf_node_add_data(sys_perf_node, "", distance_func); - ucp_proto_perf_node_own_child(perf_node, &sys_perf_node); + ucp_proto_perf_node_own_child(perf->node, &sys_perf_node); } void ucp_proto_common_lane_perf_node(ucp_context_h context, @@ -317,6 +317,7 @@ static void ucp_proto_common_tl_perf_reset(ucp_proto_common_tl_perf_t *tl_perf) tl_perf->sys_latency = 0; tl_perf->min_length = 0; tl_perf->max_frag = SIZE_MAX; + tl_perf->node = NULL; } static void ucp_proto_common_perf_attr_set_mem_type( @@ -337,15 +338,14 @@ static void ucp_proto_common_perf_attr_set_mem_type( ucs_status_t ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, ucp_lane_index_t lane, - ucp_proto_common_tl_perf_t *tl_perf, - ucp_proto_perf_node_t **perf_node_p) + ucp_proto_common_tl_perf_t *tl_perf) { ucp_worker_h worker = params->super.worker; ucp_context_h context = worker->context; ucp_rsc_index_t rsc_index = ucp_proto_common_get_rsc_index(&params->super, lane); ucp_worker_iface_t *wiface = ucp_worker_iface(worker, rsc_index); - ucp_proto_perf_node_t *perf_node, *lane_perf_node; + ucp_proto_perf_node_t *lane_perf_node; const ucp_rkey_config_t *rkey_config; ucs_sys_dev_distance_t distance; size_t tl_min_frag, tl_max_frag; @@ -356,7 +356,6 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, if (lane == UCP_NULL_LANE) { ucp_proto_common_tl_perf_reset(tl_perf); - *perf_node_p = NULL; return UCS_OK; } @@ -370,9 +369,9 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, return UCS_ERR_INVALID_PARAM; } - perf_node = ucp_proto_perf_node_new_data("lane", "%u ppn %u eps", - context->config.est_num_ppn, - context->config.est_num_eps); + tl_perf->node = ucp_proto_perf_node_new_data("lane", "%u ppn %u eps", + context->config.est_num_ppn, + context->config.est_num_eps); perf_attr.field_mask = UCT_PERF_ATTR_FIELD_OPERATION | UCT_PERF_ATTR_FIELD_SEND_PRE_OVERHEAD | @@ -406,7 +405,7 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, ucp_proto_common_lane_perf_node(context, rsc_index, &perf_attr, &lane_perf_node); - ucp_proto_perf_node_own_child(perf_node, &lane_perf_node); + ucp_proto_perf_node_own_child(tl_perf->node, &lane_perf_node); /* If reg_mem_info type is not unknown we assume the protocol is going to * send that mem type in a zero copy fashion. 
So, need to consider the @@ -423,7 +422,7 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, ucp_proto_common_get_lane_distance(&params->super, lane, sys_dev, &distance); ucp_proto_common_update_lane_perf_by_distance( - tl_perf, perf_node, &distance, "local system", "%s %s", + tl_perf, &distance, "local system", "%s %s", ucs_topo_sys_device_get_name(sys_dev), ucs_topo_sys_device_bdf_name(sys_dev, bdf_name, sizeof(bdf_name))); @@ -437,7 +436,7 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, rkey_config = &worker->rkey_config[params->super.rkey_cfg_index]; distance = rkey_config->lanes_distance[lane]; ucp_proto_common_update_lane_perf_by_distance( - tl_perf, perf_node, &distance, "remote system", "sys-dev %d %s", + tl_perf, &distance, "remote system", "sys-dev %d %s", rkey_config->key.sys_dev, ucs_memory_type_names[rkey_config->key.mem_type]); } @@ -451,20 +450,18 @@ ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, params->hdr_size); ucs_assert(tl_perf->sys_latency >= 0); - ucp_proto_perf_node_add_bandwidth(perf_node, "bw", tl_perf->bandwidth); - ucp_proto_perf_node_add_scalar(perf_node, "lat", tl_perf->latency); - ucp_proto_perf_node_add_scalar(perf_node, "sys-lat", tl_perf->sys_latency); - ucp_proto_perf_node_add_scalar(perf_node, "send-pre", + ucp_proto_perf_node_add_bandwidth(tl_perf->node, "bw", tl_perf->bandwidth); + ucp_proto_perf_node_add_scalar(tl_perf->node, "lat", tl_perf->latency); + ucp_proto_perf_node_add_scalar(tl_perf->node, "sys-lat", tl_perf->sys_latency); + ucp_proto_perf_node_add_scalar(tl_perf->node, "send-pre", tl_perf->send_pre_overhead); - ucp_proto_perf_node_add_scalar(perf_node, "send-post", + ucp_proto_perf_node_add_scalar(tl_perf->node, "send-post", tl_perf->send_post_overhead); - ucp_proto_perf_node_add_scalar(perf_node, "recv", tl_perf->recv_overhead); - - *perf_node_p = perf_node; + ucp_proto_perf_node_add_scalar(tl_perf->node, "recv", tl_perf->recv_overhead); return UCS_OK; err_deref_perf_node: - ucp_proto_perf_node_deref(&perf_node); + ucp_proto_perf_node_deref(&tl_perf->node); return status; } @@ -686,7 +683,6 @@ ucp_proto_common_find_lanes(const ucp_proto_init_params_t *params, continue; } - ucs_trace("%s: added as lane %d", lane_desc, lane); lanes[num_lanes++] = lane; } @@ -939,3 +935,394 @@ void ucp_proto_fatal_invalid_stage(ucp_request_t *req, const char *func_name) req->send.proto_config->proto->name, req->send.proto_stage, func_name); } + +static UCS_F_ALWAYS_INLINE double +ucp_proto_lane_select_avail_bw(const ucp_proto_init_params_t *params, + ucp_lane_index_t lane, + const ucp_proto_lane_select_t *select, + const ucp_proto_lane_selection_t *selection) +{ + /* Minimal path ratio */ + static const double MIN_RATIO = 0.01; + ucp_context_h context = params->worker->context; + double multi_path_ratio = context->config.ext.multi_path_ratio; + ucp_rsc_index_t dev_index = ucp_proto_common_get_dev_index(params, lane); + uint8_t path_index = selection->dev_count[dev_index]; + const ucp_proto_common_tl_perf_t *lane_perf; + double ratio; + + lane_perf = &select->lanes_perf[lane]; + if (UCS_CONFIG_DBL_IS_AUTO(multi_path_ratio)) { + ratio = ucs_min(1.0 - (lane_perf->path_ratio * path_index), + lane_perf->path_ratio); + } else { + ratio = 1.0 - (multi_path_ratio * path_index); + } + + if (ratio < MIN_RATIO) { + /* The iface BW is entirely consumed by the selected paths. But we still + * need to assign some minimal BW to these extra paths in order to + * select them. 
We divide min ratio of the iface BW by path_index so + * that each additional path on the same device has lower bandwidth. */ + ratio = MIN_RATIO / path_index; + } + + ucs_trace("ratio=%0.3f path_index=%u avail_bw=" UCP_PROTO_PERF_FUNC_BW_FMT + " " UCP_PROTO_LANE_FMT, ratio, path_index, + (lane_perf->bandwidth * ratio) / UCS_MBYTE, + UCP_PROTO_LANE_ARG(params, lane, lane_perf)); + + return lane_perf->bandwidth * ratio; +} + +static UCS_F_ALWAYS_INLINE double +ucp_proto_lane_select_score(double bw, ucp_lane_index_t lane, + const ucp_proto_lane_select_t *select, + const ucp_proto_lane_selection_t *selection) +{ + size_t msg_size = ucp_proto_common_get_variant_msg_size(selection->variant); + const ucp_proto_common_tl_perf_t *lane_perf, *best_lane_perf; + double latency, best_latency; + + lane_perf = &select->lanes_perf[lane]; + latency = lane_perf->latency + lane_perf->sys_latency; + + /* For latency variant, do not select lanes with higher latency than the + * best lane found so far */ + if ((selection->variant == UCP_PROTO_VARIANT_LAT) && + (selection->num_lanes > 0)) { + best_lane_perf = &select->lanes_perf[selection->lanes[0]]; + best_latency = best_lane_perf->latency + best_lane_perf->sys_latency; + if (ucp_score_cmp(latency, best_latency) > 0) { + return -1.0; + } + } + + return 1.0 / ((msg_size / bw) + latency); +} + +static ucp_lane_index_t +ucp_proto_lane_select_find(const ucp_proto_init_params_t *params, + const ucp_proto_lane_select_t *select, + const ucp_proto_lane_selection_t *selection, + ucp_lane_map_t index_map) +{ + double max_score = 0.0; + ucp_lane_index_t max_index = UCP_NULL_LANE; + double score; + double avail_bw; + ucp_lane_index_t lane, index; + + ucs_assert(index_map != 0); + ucs_for_each_bit(index, index_map) { + lane = select->lanes[index]; + avail_bw = ucp_proto_lane_select_avail_bw(params, lane, select, selection); + score = ucp_proto_lane_select_score(avail_bw, lane, select, selection); + if (score > max_score) { + max_score = score; + max_index = index; + } + } + + return max_index; +} + +static UCS_F_ALWAYS_INLINE void +ucp_proto_lane_select_add(const ucp_proto_init_params_t *params, + ucp_proto_lane_selection_t *selection, + ucp_lane_index_t lane) +{ + ucp_rsc_index_t dev_index = ucp_proto_common_get_dev_index(params, lane); + + ucs_assertv(selection->num_lanes < UCP_PROTO_MAX_LANES, + "selection num_lanes=%u max_lanes=%u", selection->num_lanes, + UCP_PROTO_MAX_LANES); + selection->lanes[selection->num_lanes++] = lane; + selection->lane_map |= UCS_BIT(lane); + selection->dev_count[dev_index]++; +} + +static void +ucp_proto_lane_select(const ucp_proto_init_params_t *params, + const ucp_proto_lane_select_req_t *req, + const ucp_proto_lane_select_t *select, + ucp_proto_variant_t variant, + ucp_proto_lane_selection_t *selection) +{ + ucp_lane_index_t i, lane_index; + ucp_lane_map_t index_map; + + memset(selection, 0, sizeof(*selection)); + selection->variant = variant; + + /* Select all available indexes */ + index_map = UCS_MASK(select->num_lanes); + + if (req->fixed_first_lane) { + ucp_proto_lane_select_add(params, selection, select->lanes[0]); + index_map &= ~UCS_BIT(0); + } + + i = req->fixed_first_lane ? 
1 : 0; + for (; i < ucs_min(req->max_lanes, select->num_lanes); ++i) { + /* Greedy algorithm: find the best option at every step */ + lane_index = ucp_proto_lane_select_find(params, select, selection, + index_map); + if (lane_index == UCP_NULL_LANE) { + break; + } + + ucp_proto_lane_select_add(params, selection, select->lanes[lane_index]); + index_map &= ~UCS_BIT(lane_index); + } +} + +static void +ucp_proto_lane_select_fast_lanes(const ucp_proto_init_params_t *params, + const ucp_proto_lane_select_req_t *req, + const ucp_proto_lane_select_t *select, + ucp_proto_lane_selection_t *selection) +{ + ucp_context_h context = params->worker->context; + const double max_bw_ratio = context->config.ext.multi_lane_max_ratio; + double max_bandwidth = 0; + ucp_proto_lane_selection_t fast = {0}; + + const ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t i, lane; + + /* Find the maximal bandwidth among the selected lanes */ + for (i = 0; i < selection->num_lanes; ++i) { + lane = selection->lanes[i]; + lane_perf = &select->lanes_perf[lane]; + + /* Calculate maximal BW of all selected lanes, to skip slow lanes */ + max_bandwidth = ucs_max(max_bandwidth, lane_perf->bandwidth); + } + + /* Add fast lanes to the selection */ + for (i = 0; i < selection->num_lanes; ++i) { + lane = selection->lanes[i]; + lane_perf = &select->lanes_perf[lane]; + + if ((req->fixed_first_lane && (i == 0)) || + ((lane_perf->bandwidth * max_bw_ratio) >= max_bandwidth)) { + ucp_proto_lane_select_add(params, &fast, lane); + } else { + ucs_trace("drop " UCP_PROTO_LANE_FMT, + UCP_PROTO_LANE_ARG(params, lane, lane_perf)); + } + } + + fast.variant = selection->variant; + *selection = fast; +} + +static void +ucp_proto_lane_select_trace(const ucp_proto_init_params_t *params, + const ucp_proto_lane_select_t *select, + const ucp_proto_lane_selection_t *selection, + const char *desc, ucs_log_level_t level) +{ + ucp_context_h UCS_V_UNUSED context = params->worker->context; + const ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t i, lane; + ucp_rsc_index_t rsc_index; + + if (!ucs_log_is_enabled(level)) { + return; + } + + ucs_log(level, "%s(%s)=%u, protocol=%s", desc, + ucp_proto_common_get_variant_name(selection->variant), + selection->num_lanes, ucp_proto_id_field(params->proto_id, name)); + ucs_log_indent(1); + + for (i = 0; i < selection->num_lanes; ++i) { + lane = selection->lanes[i]; + lane_perf = &select->lanes_perf[lane]; + rsc_index = ucp_proto_common_get_rsc_index(params, lane); + + ucs_log(level, "lane[%d] " UCT_TL_RESOURCE_DESC_FMT " bw " + UCP_PROTO_PERF_FUNC_BW_FMT UCP_PROTO_TIME_FMT(latency) + UCP_PROTO_TIME_FMT(sys_latency) + UCP_PROTO_TIME_FMT(send_pre_overhead) + UCP_PROTO_TIME_FMT(send_post_overhead) + UCP_PROTO_TIME_FMT(recv_overhead), lane, + UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_index].tl_rsc), + (lane_perf->bandwidth / UCS_MBYTE), + UCP_PROTO_TIME_ARG(lane_perf->latency), + UCP_PROTO_TIME_ARG(lane_perf->sys_latency), + UCP_PROTO_TIME_ARG(lane_perf->send_pre_overhead), + UCP_PROTO_TIME_ARG(lane_perf->send_post_overhead), + UCP_PROTO_TIME_ARG(lane_perf->recv_overhead)); + } + + ucs_log_indent(-1); +} + +static void +ucp_proto_lane_select_distinct(ucp_proto_lane_select_t *select) +{ + ucp_lane_index_t num_distinct = 1; + ucp_lane_index_t i, j; + int unique; + + for (i = num_distinct; i < select->num_selections; ++i) { + unique = 1; + for (j = 0; j < num_distinct; ++j) { + if (select->selections[i].lane_map == select->selections[j].lane_map) { + unique = 0; + break; + } + } + + if (unique) { + if (i != 
num_distinct) { + select->selections[num_distinct] = select->selections[i]; + } + + ++num_distinct; + } + } + + select->num_selections = num_distinct; +} + +static void +ucp_proto_lane_select_aggregate(const ucp_proto_lane_select_t *select, + ucp_proto_lane_selection_t *selection) +{ + ucp_proto_common_tl_perf_t *perf = &selection->perf; + const ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t lane; + + /* TODO: Adjust performance estimation based on the actual resource usage, + * i.e. split the overall iface BW between all selected paths. */ + + ucs_for_each_bit(lane, selection->lane_map) { + lane_perf = &select->lanes_perf[lane]; + + /* Update aggregated performance metric */ + perf->bandwidth += lane_perf->bandwidth; + perf->send_pre_overhead += lane_perf->send_pre_overhead; + perf->send_post_overhead += lane_perf->send_post_overhead; + perf->recv_overhead += lane_perf->recv_overhead; + perf->latency = ucs_max(perf->latency, lane_perf->latency); + perf->sys_latency = ucs_max(perf->sys_latency, + lane_perf->sys_latency); + } + + if (selection->num_lanes == 1) { + lane_perf = &select->lanes_perf[selection->lanes[0]]; + ucp_proto_perf_node_ref(lane_perf->node); + selection->perf.node = lane_perf->node; + } else { + selection->perf.node = ucp_proto_perf_node_new_data("multi", "%u lanes", + selection->num_lanes); + ucs_for_each_bit(lane, selection->lane_map) { + lane_perf = &select->lanes_perf[lane]; + ucp_proto_perf_node_add_child(selection->perf.node, lane_perf->node); + } + } +} + +ucs_status_t +ucp_proto_lane_select_init(const ucp_proto_common_init_params_t *params, + const ucp_proto_lane_select_req_t *req, + ucp_proto_lane_select_t **select_p) +{ + ucp_worker_h worker = params->super.worker; + ucp_context_h context = worker->context; + ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t i, lane; + ucp_proto_lane_select_t *select; + ucp_proto_lane_selection_t *selection; + ucs_status_t status; + ucp_proto_variant_t variant, max_variant; + + if (worker->proto_select_mp.data != NULL) { + select = ucs_mpool_get(&worker->proto_select_mp); + } else { + select = ucs_malloc(sizeof(*select), "ucp_proto_lane_select_t"); + } + if (select == NULL) { + return UCS_ERR_NO_MEMORY; + } + + memset(select, 0, sizeof(*select)); + select->mp_alloc = worker->proto_select_mp.data != NULL; + + /* Get bandwidth of all lanes */ + for (i = 0; i < req->num_lanes; ++i) { + lane = req->lanes[i]; + lane_perf = &select->lanes_perf[lane]; + + status = ucp_proto_common_get_lane_perf(params, lane, lane_perf); + if (status != UCS_OK) { + /* Release perf nodes of already-processed lanes, and free + * 'select' according to how it was allocated */ + ucp_proto_lane_select_destroy(select); + return status; + } + + select->lanes[select->num_lanes++] = lane; + ucs_trace("avail " UCP_PROTO_LANE_FMT, + UCP_PROTO_LANE_ARG(&params->super, lane, lane_perf)); + } + + /* Initialize all protocol variants */ + max_variant = context->config.ext.proto_variants_enable ? 
+ UCP_PROTO_VARIANT_LAST : 1; + for (variant = 0; variant < max_variant; ++variant) { + selection = &select->selections[variant]; + ucp_proto_lane_select(&params->super, req, select, variant, selection); + ucp_proto_lane_select_fast_lanes(&params->super, req, select, selection); + ucp_proto_lane_select_trace(&params->super, select, selection, + "found variant", UCS_LOG_LEVEL_TRACE); + ++select->num_selections; + } + + /* Select distinct protocol variants */ + ucp_proto_lane_select_distinct(select); + ucs_assert(select->num_selections > 0); + + /* Initialize selected distinct protocol variants */ + for (i = 0; i < select->num_selections; ++i) { + selection = &select->selections[i]; + + /* Append variant name only if there are multiple variants */ + snprintf(selection->name, sizeof(selection->name), "%s%s", req->perf_name, + (select->num_selections > 1 ? + ucp_proto_common_get_variant_name(selection->variant) : + "")); + + ucp_proto_lane_select_aggregate(select, selection); + ucp_proto_lane_select_trace(&params->super, select, selection, + "selected variant", UCS_LOG_LEVEL_DEBUG); + } + + *select_p = select; + return UCS_OK; +} + +void ucp_proto_lane_select_destroy(ucp_proto_lane_select_t *select) +{ + ucp_proto_common_tl_perf_t *lane_perf; + ucp_lane_index_t i, lane; + + /* Deref unused nodes */ + for (i = 0; i < select->num_lanes; ++i) { + lane = select->lanes[i]; + lane_perf = &select->lanes_perf[lane]; + ucp_proto_perf_node_deref(&lane_perf->node); + } + + for (i = 0; i < select->num_selections; ++i) { + ucp_proto_perf_node_deref(&select->selections[i].perf.node); + } + + if (select->mp_alloc) { + ucs_mpool_put(select); + } else { + ucs_free(select); + } +} diff --git a/src/ucp/proto/proto_common.h b/src/ucp/proto/proto_common.h index cd421a13852..95936adb038 100644 --- a/src/ucp/proto/proto_common.h +++ b/src/ucp/proto/proto_common.h @@ -176,15 +176,60 @@ typedef struct { /* Maximum single message length */ size_t max_frag; + + /* Performance selection tree node */ + ucp_proto_perf_node_t *node; } ucp_proto_common_tl_perf_t; +/** + * Protocol selection variants macro, used to iterate over all variants. + * The second argument is the message size used to calculate the variant's + * score; the third argument is the variant name used for printing. 
+ */ +#define UCP_FOREACH_PROTO_VARIANT(_macro) \ + _macro(UCP_PROTO_VARIANT_BW, UCS_GBYTE, "(bw)") \ + _macro(UCP_PROTO_VARIANT_LAT, UCS_KBYTE, "(lat)") + +/** + * Protocol selection variant enum + */ +#define UCP_PROTO_VARIANT_ENUMIFY(ID, MSG_SIZE, NAME) ID, +typedef enum { + UCP_FOREACH_PROTO_VARIANT(UCP_PROTO_VARIANT_ENUMIFY) + UCP_PROTO_VARIANT_LAST +} ucp_proto_variant_t; + + +typedef struct { + ucp_lane_map_t lane_map; + ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; + ucp_lane_index_t num_lanes; + uint8_t dev_count[UCP_MAX_RESOURCES]; + ucp_proto_common_tl_perf_t perf; + ucp_proto_variant_t variant; + char name[UCP_PROTO_DESC_STR_MAX]; +} ucp_proto_lane_selection_t; + + typedef struct { - ucp_lane_map_t lane_map; + const char *perf_name; ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; ucp_lane_index_t num_lanes; - uint8_t dev_count[UCP_MAX_RESOURCES]; -} ucp_proto_lane_selection_t; + ucp_lane_index_t max_lanes; + int fixed_first_lane; +} ucp_proto_lane_select_req_t; + + +typedef struct { + ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; + ucp_lane_index_t num_lanes; + ucp_proto_common_tl_perf_t lanes_perf[UCP_PROTO_MAX_LANES]; + ucp_proto_lane_selection_t selections[UCP_PROTO_VARIANT_LAST]; + ucp_lane_index_t num_selections; + int mp_alloc; +} ucp_proto_lane_select_t; /* Private data per lane */ @@ -279,8 +324,7 @@ void ucp_proto_common_lane_perf_node(ucp_context_h context, ucs_status_t ucp_proto_common_get_lane_perf(const ucp_proto_common_init_params_t *params, ucp_lane_index_t lane, - ucp_proto_common_tl_perf_t *perf, - ucp_proto_perf_node_t **perf_node_p); + ucp_proto_common_tl_perf_t *perf); typedef int (*ucp_proto_common_filter_lane_cb_t)( @@ -361,4 +405,11 @@ void ucp_proto_reset_fatal_not_implemented(ucp_request_t *req); void ucp_proto_fatal_invalid_stage(ucp_request_t *req, const char *func_name); +ucs_status_t +ucp_proto_lane_select_init(const ucp_proto_common_init_params_t *params, + const ucp_proto_lane_select_req_t *req, + ucp_proto_lane_select_t **select_p); + +void ucp_proto_lane_select_destroy(ucp_proto_lane_select_t *select); + #endif diff --git a/src/ucp/proto/proto_common.inl b/src/ucp/proto/proto_common.inl index c544a5c8be0..1f7048f0ab5 100644 --- a/src/ucp/proto/proto_common.inl +++ b/src/ucp/proto/proto_common.inl @@ -407,4 +407,34 @@ ucp_proto_common_get_dev_index(const ucp_proto_init_params_t *params, return params->worker->context->tl_rscs[rsc_index].dev_index; } +static UCS_F_ALWAYS_INLINE size_t +ucp_proto_common_get_variant_msg_size(ucp_proto_variant_t variant) +{ +#define UCP_PROTO_VARIANT_IT(ID, MSG_SIZE, _) case ID: return MSG_SIZE; + switch (variant) { + UCP_FOREACH_PROTO_VARIANT(UCP_PROTO_VARIANT_IT) + default: ucs_fatal("unexpected variant %d", variant); + } +#undef UCP_PROTO_VARIANT_IT + return 0; +} + +static UCS_F_ALWAYS_INLINE const char* +ucp_proto_common_get_variant_name(ucp_proto_variant_t variant) +{ +#define UCP_PROTO_VARIANT_IT(ID, _, NAME) case ID: return NAME; + switch (variant) { + UCP_FOREACH_PROTO_VARIANT(UCP_PROTO_VARIANT_IT) + default: ucs_fatal("unexpected variant %d", variant); + } +#undef UCP_PROTO_VARIANT_IT + return NULL; +} + +static UCS_F_ALWAYS_INLINE size_t +ucp_proto_common_get_variants_count(ucp_context_h context) +{ + return context->config.ext.proto_variants_enable ? 
UCP_PROTO_VARIANT_LAST : 1; +} + #endif diff --git a/src/ucp/proto/proto_init.c b/src/ucp/proto/proto_init.c index 724d99fcff2..bce2884f868 100644 --- a/src/ucp/proto/proto_init.c +++ b/src/ucp/proto/proto_init.c @@ -129,7 +129,6 @@ ucp_proto_init_skip_recv_overhead(const ucp_proto_common_init_params_t *params, static ucs_status_t ucp_proto_init_add_tl_perf(const ucp_proto_common_init_params_t *params, const ucp_proto_common_tl_perf_t *tl_perf, - ucp_proto_perf_node_t *const tl_perf_node, size_t range_start, size_t range_end, ucp_proto_perf_t *perf) { @@ -185,7 +184,7 @@ ucp_proto_init_add_tl_perf(const ucp_proto_common_init_params_t *params, return ucp_proto_perf_add_funcs(perf, range_start, range_end, perf_factors, ucp_proto_perf_node_new_data("transport", ""), - tl_perf_node); + tl_perf->node); } /** @@ -503,7 +502,6 @@ ucp_proto_common_check_mem_access(const ucp_proto_common_init_params_t *params) ucs_status_t ucp_proto_init_perf(const ucp_proto_common_init_params_t *params, const ucp_proto_common_tl_perf_t *tl_perf, - ucp_proto_perf_node_t *const tl_perf_node, ucp_md_map_t reg_md_map, const char *perf_name, ucp_proto_perf_t **perf_p) { @@ -532,8 +530,8 @@ ucs_status_t ucp_proto_init_perf(const ucp_proto_common_init_params_t *params, return status; } - status = ucp_proto_init_add_tl_perf(params, tl_perf, tl_perf_node, - range_start, range_end, perf); + status = ucp_proto_init_add_tl_perf(params, tl_perf, range_start, range_end, + perf); if (status != UCS_OK) { goto err_cleanup_perf; } diff --git a/src/ucp/proto/proto_init.h b/src/ucp/proto/proto_init.h index 4bff6472b01..8ff0c1516b2 100644 --- a/src/ucp/proto/proto_init.h +++ b/src/ucp/proto/proto_init.h @@ -68,7 +68,6 @@ ucp_proto_init_add_buffer_copy_time(ucp_worker_h worker, const char *title, ucs_status_t ucp_proto_init_perf(const ucp_proto_common_init_params_t *params, const ucp_proto_common_tl_perf_t *tl_perf, - ucp_proto_perf_node_t *const tl_perf_node, ucp_md_map_t reg_md_map, const char *perf_name, ucp_proto_perf_t **perf_p); diff --git a/src/ucp/proto/proto_multi.c b/src/ucp/proto/proto_multi.c index 149f770a368..f3d8e6eff35 100644 --- a/src/ucp/proto/proto_multi.c +++ b/src/ucp/proto/proto_multi.c @@ -20,127 +20,6 @@ #include -static UCS_F_ALWAYS_INLINE double -ucp_proto_multi_get_avail_bw(const ucp_proto_init_params_t *params, - ucp_lane_index_t lane, - const ucp_proto_common_tl_perf_t *lane_perf, - const ucp_proto_lane_selection_t *selection) -{ - /* Minimal path ratio */ - static const double MIN_RATIO = 0.01; - ucp_context_h context = params->worker->context; - double multi_path_ratio = context->config.ext.multi_path_ratio; - ucp_rsc_index_t dev_index = ucp_proto_common_get_dev_index(params, lane); - uint8_t path_index = selection->dev_count[dev_index]; - double ratio; - - if (UCS_CONFIG_DBL_IS_AUTO(multi_path_ratio)) { - ratio = ucs_min(1.0 - (lane_perf->path_ratio * path_index), - lane_perf->path_ratio); - } else { - ratio = 1.0 - (multi_path_ratio * path_index); - } - - if (ratio < MIN_RATIO) { - /* The iface BW is entirely consumed by the selected paths. But we still - * need to assign some minimal BW to the these extra paths in order to - * select them. We divide min ratio of the iface BW by path_index so - * that each additional path on the same device has lower bandwidth. 
*/ - ratio = MIN_RATIO / path_index; - } - - ucs_trace("ratio=%0.3f path_index=%u avail_bw=" UCP_PROTO_PERF_FUNC_BW_FMT - " " UCP_PROTO_LANE_FMT, ratio, path_index, - (lane_perf->bandwidth * ratio) / UCS_MBYTE, - UCP_PROTO_LANE_ARG(params, lane, lane_perf)); - return lane_perf->bandwidth * ratio; -} - -static ucp_lane_index_t -ucp_proto_multi_find_max_avail_bw_lane(const ucp_proto_init_params_t *params, - const ucp_lane_index_t *lanes, - const ucp_proto_common_tl_perf_t *lanes_perf, - const ucp_proto_lane_selection_t *selection, - ucp_lane_map_t index_map) -{ - /* Initial value is 1Bps, so we don't consider lanes with lower available - * bandwidth. */ - double max_avail_bw = 1.0; - ucp_lane_index_t max_index = UCP_NULL_LANE; - double avail_bw; - const ucp_proto_common_tl_perf_t *lane_perf; - ucp_lane_index_t lane, index; - - ucs_assert(index_map != 0); - ucs_for_each_bit(index, index_map) { - lane = lanes[index]; - lane_perf = &lanes_perf[lane]; - avail_bw = ucp_proto_multi_get_avail_bw(params, lane, lane_perf, - selection); - if (avail_bw > max_avail_bw) { - max_avail_bw = avail_bw; - max_index = index; - } - } - - return max_index; -} - -static UCS_F_ALWAYS_INLINE void -ucp_proto_select_add_lane(ucp_proto_lane_selection_t *selection, - const ucp_proto_init_params_t *params, - ucp_lane_index_t lane) -{ - ucp_rsc_index_t dev_index = ucp_proto_common_get_dev_index(params, lane); - - ucs_assertv(selection->num_lanes < UCP_PROTO_MAX_LANES, - "selection num_lanes=%u max_lanes=%u", selection->num_lanes, - UCP_PROTO_MAX_LANES); - selection->lanes[selection->num_lanes++] = lane; - selection->lane_map |= UCS_BIT(lane); - selection->dev_count[dev_index]++; -} - -static void -ucp_proto_multi_select_bw_lanes(const ucp_proto_init_params_t *params, - const ucp_lane_index_t *lanes, - ucp_lane_index_t num_lanes, - ucp_lane_index_t max_lanes, - const ucp_proto_common_tl_perf_t *lanes_perf, - int fixed_first_lane, - ucp_proto_lane_selection_t *selection) -{ - ucp_lane_index_t i, lane_index; - ucp_lane_map_t index_map; - - memset(selection, 0, sizeof(*selection)); - - /* Select all available indexes */ - index_map = UCS_MASK(num_lanes); - - if (fixed_first_lane) { - ucp_proto_select_add_lane(selection, params, lanes[0]); - index_map &= ~UCS_BIT(0); - } - - for (i = fixed_first_lane? 
1 : 0; i < ucs_min(max_lanes, num_lanes); ++i) { - /* Greedy algorithm: find the best option at every step */ - lane_index = ucp_proto_multi_find_max_avail_bw_lane(params, lanes, - lanes_perf, selection, - index_map); - if (lane_index == UCP_NULL_LANE) { - break; - } - - ucp_proto_select_add_lane(selection, params, lanes[lane_index]); - index_map &= ~UCS_BIT(lane_index); - } - - /* TODO: Aggregate performance: - * Split full iface bandwidth between selected paths, according to the total - * path ratio */ -} - static ucp_sys_dev_map_t ucp_proto_multi_init_flush_sys_dev_mask(const ucp_rkey_config_key_t *key) { @@ -151,165 +30,51 @@ ucp_proto_multi_init_flush_sys_dev_mask(const ucp_rkey_config_key_t *key) return UCS_BIT(key->sys_dev & ~UCP_SYS_DEVICE_FLUSH_BIT); } -ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, - const char *perf_name, - ucp_proto_perf_t **perf_p, - ucp_proto_multi_priv_t *mpriv) +ucs_status_t +ucp_proto_multi_init_perf(const ucp_proto_multi_init_params_t *params, + const ucp_proto_lane_select_t *select, + ucp_proto_lane_selection_t *selection, + ucp_proto_perf_t **perf_p, + ucp_proto_multi_priv_t *mpriv) { - ucp_context_h context = params->super.super.worker->context; - const double max_bw_ratio = context->config.ext.multi_lane_max_ratio; - ucp_proto_perf_node_t *lanes_perf_nodes[UCP_PROTO_MAX_LANES]; - ucp_proto_common_tl_perf_t lanes_perf[UCP_PROTO_MAX_LANES]; - ucp_proto_common_tl_perf_t *lane_perf, perf; - ucp_lane_index_t lanes[UCP_PROTO_MAX_LANES]; - double max_bandwidth, max_frag_ratio, min_bandwidth; - ucp_lane_index_t i, lane, num_lanes, num_fast_lanes; + double max_frag_ratio = 0; + double min_bandwidth = DBL_MAX; + ucp_lane_index_t i, lane; ucp_proto_multi_lane_priv_t *lpriv; - ucp_proto_perf_node_t *perf_node; + const ucp_proto_common_tl_perf_t *lane_perf; size_t max_frag, min_length, min_end_offset, min_chunk; - ucp_proto_lane_selection_t selection; ucp_md_map_t reg_md_map; uint32_t weight_sum; - ucs_status_t status; - int fixed_first_lane; - - ucs_assert(params->max_lanes <= UCP_PROTO_MAX_LANES); - - if ((ucp_proto_select_op_flags(params->super.super.select_param) & - UCP_PROTO_SELECT_OP_FLAG_RESUME) && - !(params->super.flags & UCP_PROTO_COMMON_INIT_FLAG_RESUME)) { - return UCS_ERR_UNSUPPORTED; - } - - if (!ucp_proto_common_init_check_err_handling(&params->super) || - (params->max_lanes == 0)) { - return UCS_ERR_UNSUPPORTED; - } - - if (!ucp_proto_common_check_memtype_copy(&params->super)) { - return UCS_ERR_UNSUPPORTED; - } - - /* Find first lane */ - num_lanes = ucp_proto_common_find_lanes( - &params->super.super, params->super.flags, params->first.lane_type, - params->first.tl_cap_flags, 1, 0, ucp_proto_common_filter_min_frag, - lanes); - if (num_lanes == 0) { - ucs_trace("no lanes for %s", - ucp_proto_id_field(params->super.super.proto_id, name)); - return UCS_ERR_NO_ELEM; - } - - /* Find rest of the lanes */ - num_lanes += ucp_proto_common_find_lanes( - &params->super.super, params->super.flags, params->middle.lane_type, - params->middle.tl_cap_flags, UCP_PROTO_MAX_LANES - 1, - UCS_BIT(lanes[0]), ucp_proto_common_filter_min_frag, lanes + 1); - - /* Get bandwidth of all lanes and max_bandwidth */ - max_bandwidth = 0; - for (i = 0; i < num_lanes; ++i) { - lane = lanes[i]; - lane_perf = &lanes_perf[lane]; - - status = ucp_proto_common_get_lane_perf(&params->super, lane, lane_perf, - &lanes_perf_nodes[lane]); - if (status != UCS_OK) { - return status; - } - - /* Calculate maximal bandwidth of all lanes, to skip slow lanes */ - max_bandwidth = 
ucs_max(max_bandwidth, lane_perf->bandwidth); - } - - /* Select the lanes to use, and calculate their aggregate performance */ - perf.bandwidth = 0; - perf.send_pre_overhead = 0; - perf.send_post_overhead = 0; - perf.recv_overhead = 0; - perf.latency = 0; - perf.sys_latency = 0; - max_frag_ratio = 0; - min_bandwidth = DBL_MAX; - - /* Filter out slow lanes */ - fixed_first_lane = params->first.lane_type != params->middle.lane_type; - for (i = fixed_first_lane ? 1 : 0, num_fast_lanes = i; i < num_lanes; ++i) { - lane = lanes[i]; - lane_perf = &lanes_perf[lane]; - if ((lane_perf->bandwidth * max_bw_ratio) < max_bandwidth) { - /* Bandwidth on this lane is too low compared to the fastest - available lane, so it's not worth using it */ - ucp_proto_perf_node_deref(&lanes_perf_nodes[lane]); - ucs_trace("drop " UCP_PROTO_LANE_FMT, - UCP_PROTO_LANE_ARG(&params->super.super, lane, lane_perf)); - } else { - lanes[num_fast_lanes++] = lane; - ucs_trace("avail " UCP_PROTO_LANE_FMT, - UCP_PROTO_LANE_ARG(&params->super.super, lane, lane_perf)); - } - } - num_lanes = num_fast_lanes; - ucp_proto_multi_select_bw_lanes(&params->super.super, lanes, num_lanes, - params->max_lanes, lanes_perf, - fixed_first_lane, &selection); - - ucs_trace("selected %u lanes for %s", selection.num_lanes, - ucp_proto_id_field(params->super.super.proto_id, name)); - ucs_log_indent(1); - - for (i = 0; i < selection.num_lanes; ++i) { - lane = selection.lanes[i]; - lane_perf = &lanes_perf[lane]; - ucs_trace(UCP_PROTO_LANE_FMT UCP_PROTO_TIME_FMT(send_pre_overhead) - UCP_PROTO_TIME_FMT(send_post_overhead) - UCP_PROTO_TIME_FMT(recv_overhead), - UCP_PROTO_LANE_ARG(&params->super.super, lane, lane_perf), - UCP_PROTO_TIME_ARG(lane_perf->send_pre_overhead), - UCP_PROTO_TIME_ARG(lane_perf->send_post_overhead), - UCP_PROTO_TIME_ARG(lane_perf->recv_overhead)); + for (i = 0; i < selection->num_lanes; ++i) { + lane = selection->lanes[i]; + lane_perf = &select->lanes_perf[lane]; /* Calculate maximal bandwidth-to-fragment-size ratio, which is used to adjust fragment sizes so they are proportional to bandwidth ratio and also do not exceed maximal supported size */ max_frag_ratio = ucs_max(max_frag_ratio, lane_perf->bandwidth / lane_perf->max_frag); - - min_bandwidth = ucs_min(min_bandwidth, lane_perf->bandwidth); - - /* Update aggregated performance metric */ - perf.bandwidth += lane_perf->bandwidth; - perf.send_pre_overhead += lane_perf->send_pre_overhead; - perf.send_post_overhead += lane_perf->send_post_overhead; - perf.recv_overhead += lane_perf->recv_overhead; - perf.latency = ucs_max(perf.latency, lane_perf->latency); - perf.sys_latency = ucs_max(perf.sys_latency, - lane_perf->sys_latency); + min_bandwidth = ucs_min(min_bandwidth, lane_perf->bandwidth); } - ucs_log_indent(-1); - /* Initialize multi-lane private data and relative weights */ reg_md_map = ucp_proto_common_reg_md_map(&params->super, - selection.lane_map); + selection->lane_map); mpriv->reg_md_map = reg_md_map | params->initial_reg_md_map; - mpriv->lane_map = selection.lane_map; + mpriv->lane_map = selection->lane_map; mpriv->num_lanes = 0; mpriv->min_frag = 0; mpriv->max_frag_sum = 0; mpriv->align_thresh = 1; - perf.max_frag = 0; - perf.min_length = 0; weight_sum = 0; min_end_offset = 0; - ucs_for_each_bit(lane, selection.lane_map) { + ucs_for_each_bit(lane, selection->lane_map) { ucs_assert(lane < UCP_MAX_LANES); lpriv = &mpriv->lanes[mpriv->num_lanes++]; - lane_perf = &lanes_perf[lane]; + lane_perf = &select->lanes_perf[lane]; ucp_proto_common_lane_priv_init(&params->super, mpriv->reg_md_map, lane, 
&lpriv->super); @@ -329,13 +94,14 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, ucs_max(UCP_MIN_BCOPY, lane_perf->bandwidth * params->min_chunk / min_bandwidth)); - max_frag = ucs_max(max_frag, min_chunk); - lpriv->max_frag = max_frag; - perf.max_frag += max_frag; + + max_frag = ucs_max(max_frag, min_chunk); + lpriv->max_frag = max_frag; + selection->perf.max_frag += max_frag; /* Calculate lane weight as a fixed-point fraction */ lpriv->weight = ucs_proto_multi_calc_weight(lane_perf->bandwidth, - perf.bandwidth); + selection->perf.bandwidth); ucs_assert(lpriv->weight > 0); ucs_assert(lpriv->weight <= UCP_PROTO_MULTI_WEIGHT_MAX); @@ -375,7 +141,8 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, lpriv->weight; ucs_assert(ucp_proto_multi_scaled_length(lpriv->weight, min_length) >= lane_perf->min_length); - perf.min_length = ucs_max(perf.min_length, min_length); + selection->perf.min_length = ucs_max(selection->perf.min_length, + min_length); weight_sum += lpriv->weight; min_end_offset += min_chunk; @@ -389,35 +156,64 @@ ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, lpriv->flush_sys_dev_mask = ucp_proto_multi_init_flush_sys_dev_mask( params->super.super.rkey_config_key); } - ucs_assert(mpriv->num_lanes == ucs_popcount(selection.lane_map)); - - /* After this block, 'perf_node' and 'lane_perf_nodes[]' have extra ref */ - if (mpriv->num_lanes == 1) { - perf_node = lanes_perf_nodes[ucs_ffs64(selection.lane_map)]; - ucp_proto_perf_node_ref(perf_node); - } else { - perf_node = ucp_proto_perf_node_new_data("multi", "%u lanes", - mpriv->num_lanes); - ucs_for_each_bit(lane, selection.lane_map) { - ucs_assert(lane < UCP_MAX_LANES); - ucp_proto_perf_node_add_child(perf_node, lanes_perf_nodes[lane]); - } + ucs_assert(mpriv->num_lanes == ucs_popcount(selection->lane_map)); + + return ucp_proto_init_perf(&params->super, &selection->perf, reg_md_map, + selection->name, perf_p); +} + +ucs_status_t +ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, + const char *perf_name, ucp_proto_lane_select_t **select_p) +{ + ucp_proto_lane_select_req_t req; + + ucs_assert(params->max_lanes <= UCP_PROTO_MAX_LANES); + + if ((ucp_proto_select_op_flags(params->super.super.select_param) & + UCP_PROTO_SELECT_OP_FLAG_RESUME) && + !(params->super.flags & UCP_PROTO_COMMON_INIT_FLAG_RESUME)) { + return UCS_ERR_UNSUPPORTED; } - status = ucp_proto_init_perf(&params->super, &perf, perf_node, reg_md_map, - perf_name, perf_p); + if (!ucp_proto_common_init_check_err_handling(&params->super) || + (params->max_lanes == 0)) { + return UCS_ERR_UNSUPPORTED; } - /* Deref unused nodes */ - for (i = 0; i < num_lanes; ++i) { - ucp_proto_perf_node_deref(&lanes_perf_nodes[lanes[i]]); + if (!ucp_proto_common_check_memtype_copy(&params->super)) { + return UCS_ERR_UNSUPPORTED; } - ucp_proto_perf_node_deref(&perf_node); - return status; + /* Find first lane */ + req.num_lanes = ucp_proto_common_find_lanes( + &params->super.super, params->super.flags, params->first.lane_type, + params->first.tl_cap_flags, 1, 0, ucp_proto_common_filter_min_frag, + req.lanes); + if (req.num_lanes == 0) { + ucs_trace("no lanes for %s", + ucp_proto_id_field(params->super.super.proto_id, name)); + return UCS_ERR_NO_ELEM; + } + + /* Find rest of the lanes */ + req.num_lanes += ucp_proto_common_find_lanes( + &params->super.super, params->super.flags, params->middle.lane_type, + params->middle.tl_cap_flags, UCP_PROTO_MAX_LANES - 1, + UCS_BIT(req.lanes[0]), 
ucp_proto_common_filter_min_frag, + req.lanes + 1); + + /* Initialize lane selection */ + req.perf_name = perf_name; + req.max_lanes = params->max_lanes; + req.fixed_first_lane = params->first.lane_type != params->middle.lane_type; + + return ucp_proto_lane_select_init(&params->super, &req, select_p); } size_t ucp_proto_multi_priv_size(const ucp_proto_multi_priv_t *mpriv) { + ucs_assert_always(mpriv->num_lanes <= UCP_MAX_LANES); return ucs_offsetof(ucp_proto_multi_priv_t, lanes) + (mpriv->num_lanes * ucs_field_sizeof(ucp_proto_multi_priv_t, lanes[0])); @@ -427,18 +223,30 @@ void ucp_proto_multi_probe(const ucp_proto_multi_init_params_t *params) { const char *proto_name = ucp_proto_id_field(params->super.super.proto_id, name); + ucp_proto_lane_select_t *select; ucp_proto_multi_priv_t mpriv; + ucp_lane_index_t i; ucp_proto_perf_t *perf; ucs_status_t status; - status = ucp_proto_multi_init(params, proto_name, &perf, &mpriv); + status = ucp_proto_multi_init(params, proto_name, &select); if (status != UCS_OK) { return; } - ucp_proto_select_add_proto(&params->super.super, params->super.cfg_thresh, - params->super.cfg_priority, perf, &mpriv, - ucp_proto_multi_priv_size(&mpriv)); + for (i = 0; i < select->num_selections; ++i) { + status = ucp_proto_multi_init_perf(params, select, &select->selections[i], + &perf, &mpriv); + if (status != UCS_OK) { + continue; + } + + ucp_proto_select_add_proto(&params->super.super, params->super.cfg_thresh, + params->super.cfg_priority, perf, &mpriv, + ucp_proto_multi_priv_size(&mpriv)); + } + + ucp_proto_lane_select_destroy(select); } static const ucp_ep_config_key_lane_t * diff --git a/src/ucp/proto/proto_multi.h b/src/ucp/proto/proto_multi.h index a129fd08bb5..61e09b77069 100644 --- a/src/ucp/proto/proto_multi.h +++ b/src/ucp/proto/proto_multi.h @@ -167,8 +167,15 @@ typedef ucs_status_t (*ucp_proto_multi_lane_send_func_t)(ucp_request_t *req, ucs_status_t ucp_proto_multi_init(const ucp_proto_multi_init_params_t *params, const char *perf_name, - ucp_proto_perf_t **perf_p, - ucp_proto_multi_priv_t *mpriv); + ucp_proto_lane_select_t **select_p); + + +ucs_status_t +ucp_proto_multi_init_perf(const ucp_proto_multi_init_params_t *params, + const ucp_proto_lane_select_t *select, + ucp_proto_lane_selection_t *selection, + ucp_proto_perf_t **perf_p, + ucp_proto_multi_priv_t *mpriv); size_t ucp_proto_multi_priv_size(const ucp_proto_multi_priv_t *mpriv); diff --git a/src/ucp/proto/proto_single.c b/src/ucp/proto/proto_single.c index 2441cc08475..866d62771c0 100644 --- a/src/ucp/proto/proto_single.c +++ b/src/ucp/proto/proto_single.c @@ -24,7 +24,6 @@ ucs_status_t ucp_proto_single_init(const ucp_proto_single_init_params_t *params, { const char *proto_name = ucp_proto_id_field(params->super.super.proto_id, name); - ucp_proto_perf_node_t *tl_perf_node; ucp_proto_common_tl_perf_t tl_perf; ucp_lane_index_t num_lanes; ucp_md_map_t reg_md_map; @@ -57,15 +56,14 @@ ucs_status_t ucp_proto_single_init(const ucp_proto_single_init_params_t *params, ucp_proto_common_lane_priv_init(&params->super, reg_md_map, lane, &spriv->super); - status = ucp_proto_common_get_lane_perf(&params->super, lane, &tl_perf, - &tl_perf_node); + status = ucp_proto_common_get_lane_perf(&params->super, lane, &tl_perf); if (status != UCS_OK) { return status; } - status = ucp_proto_init_perf(&params->super, &tl_perf, tl_perf_node, - reg_md_map, proto_name, perf_p); - ucp_proto_perf_node_deref(&tl_perf_node); + status = ucp_proto_init_perf(&params->super, &tl_perf, reg_md_map, + proto_name, perf_p); + ucp_proto_perf_node_deref(&tl_perf.node); return 
status; } diff --git a/src/ucp/rndv/proto_rndv.c b/src/ucp/rndv/proto_rndv.c index cfaf62c637c..789028599a2 100644 --- a/src/ucp/rndv/proto_rndv.c +++ b/src/ucp/rndv/proto_rndv.c @@ -616,22 +616,17 @@ ucp_proto_rndv_ack_init(const ucp_proto_common_init_params_t *init_params, ucs_status_t ucp_proto_rndv_bulk_init(const ucp_proto_multi_init_params_t *init_params, - const char *op_name, const char *ack_name, + const char *ack_name, ucp_proto_perf_t *bulk_perf, ucp_proto_perf_t **perf_p, ucp_proto_rndv_bulk_priv_t *rpriv) { ucp_context_t *context = init_params->super.super.worker->context; size_t rndv_align_thresh = context->config.ext.rndv_align_thresh; ucp_proto_multi_priv_t *mpriv = &rpriv->mpriv; - ucp_proto_perf_t *bulk_perf, *ack_perf; + ucp_proto_perf_t *ack_perf; const char *proto_name; ucs_status_t status; - status = ucp_proto_multi_init(init_params, op_name, &bulk_perf, mpriv); - if (status != UCS_OK) { - return status; - } - /* Adjust align split threshold by user configuration */ mpriv->align_thresh = ucs_max(rndv_align_thresh, mpriv->align_thresh + mpriv->min_frag); diff --git a/src/ucp/rndv/proto_rndv.h b/src/ucp/rndv/proto_rndv.h index 8299b740491..39fb0e3e9ba 100644 --- a/src/ucp/rndv/proto_rndv.h +++ b/src/ucp/rndv/proto_rndv.h @@ -177,7 +177,7 @@ ucp_proto_rndv_ack_init(const ucp_proto_common_init_params_t *init_params, ucs_status_t ucp_proto_rndv_bulk_init(const ucp_proto_multi_init_params_t *init_params, - const char *name, const char *ack_name, + const char *ack_name, ucp_proto_perf_t *bulk_perf, ucp_proto_perf_t **perf_p, ucp_proto_rndv_bulk_priv_t *rpriv); diff --git a/src/ucp/rndv/rndv_get.c b/src/ucp/rndv/rndv_get.c index 9a8fb8c28f4..a4527bd3e58 100644 --- a/src/ucp/rndv/rndv_get.c +++ b/src/ucp/rndv/rndv_get.c @@ -57,7 +57,9 @@ ucp_proto_rndv_get_common_probe(const ucp_proto_init_params_t *init_params, cap.get.opt_zcopy_align), }; ucp_proto_rndv_bulk_priv_t rpriv; - ucp_proto_perf_t *perf; + ucp_proto_lane_select_t *select; + ucp_lane_index_t i; + ucp_proto_perf_t *bulk_perf, *perf; ucs_status_t status; if ((init_params->select_param->dt_class != UCP_DATATYPE_CONTIG) || @@ -66,16 +68,31 @@ return; } - status = ucp_proto_rndv_bulk_init(&params, UCP_PROTO_RNDV_GET_DESC, - UCP_PROTO_RNDV_ATS_NAME, &perf, &rpriv); + status = ucp_proto_multi_init(&params, UCP_PROTO_RNDV_GET_DESC, &select); if (status != UCS_OK) { return; } - ucp_proto_select_add_proto(&params.super.super, params.super.cfg_thresh, - params.super.cfg_priority, perf, &rpriv, - UCP_PROTO_MULTI_EXTENDED_PRIV_SIZE(&rpriv, - mpriv)); + for (i = 0; i < select->num_selections; ++i) { + status = ucp_proto_multi_init_perf(&params, select, &select->selections[i], + &bulk_perf, &rpriv.mpriv); + if (status != UCS_OK) { + continue; + } + + status = ucp_proto_rndv_bulk_init(&params, UCP_PROTO_RNDV_ATS_NAME, + bulk_perf, &perf, &rpriv); + if (status != UCS_OK) { + continue; + } + + ucp_proto_select_add_proto(&params.super.super, params.super.cfg_thresh, + params.super.cfg_priority, perf, &rpriv, + UCP_PROTO_MULTI_EXTENDED_PRIV_SIZE(&rpriv, + mpriv)); + } + + ucp_proto_lane_select_destroy(select); } static UCS_F_ALWAYS_INLINE ucs_status_t diff --git a/src/ucp/rndv/rndv_put.c b/src/ucp/rndv/rndv_put.c index 28948386ffb..a7cc015674a 100644 --- a/src/ucp/rndv/rndv_put.c +++ b/src/ucp/rndv/rndv_put.c @@ -230,6 +230,89 @@ ucp_proto_rndv_put_common_request_init(ucp_request_t *req) return ucp_proto_rndv_bulk_request_init(req, &rpriv->bulk); } +static void 
+ucp_proto_rndv_put_add_proto(const ucp_proto_multi_init_params_t *params, + uct_completion_callback_t comp_cb, + uint8_t stat_counter, ucp_proto_perf_t *perf, + ucp_proto_rndv_put_priv_t *rpriv) +{ + const size_t atp_size = sizeof(ucp_rndv_ack_hdr_t); + ucp_context_h context = params->super.super.worker->context; + const uct_iface_attr_t *iface_attr; + ucp_lane_index_t lane_idx, lane; + int send_atp, use_fence; + unsigned atp_map; + + send_atp = !ucp_proto_rndv_init_params_is_ppln_frag(&params->super.super); + + /* Check which lanes support sending ATP */ + atp_map = 0; + for (lane_idx = 0; lane_idx < rpriv->bulk.mpriv.num_lanes; ++lane_idx) { + lane = rpriv->bulk.mpriv.lanes[lane_idx].super.lane; + iface_attr = ucp_proto_common_get_iface_attr(&params->super.super, lane); + if (((iface_attr->cap.flags & UCT_IFACE_FLAG_AM_SHORT) && + (iface_attr->cap.am.max_short >= atp_size)) || + ((iface_attr->cap.flags & UCT_IFACE_FLAG_AM_BCOPY) && + (iface_attr->cap.am.max_bcopy >= atp_size))) { + atp_map |= UCS_BIT(lane); + } + } + + /* Use fence only if all lanes support sending ATP and flush is not forced + */ + use_fence = send_atp && !context->config.ext.rndv_put_force_flush && + (rpriv->bulk.mpriv.lane_map == atp_map); + + /* All lanes can send ATP - invalidate am_lane, to use mpriv->lanes. + * Otherwise, would need to flush all lanes and send ATP on: + * - All lanes supporting ATP send. This ensures that data is flushed + * remotely (i.e. resides in the target buffer), which may not be the case + * with IB transports. An alternative would be to pass + * UCT_FLUSH_FLAG_REMOTE to uct_ep_flush, but using this flag increases + * UCP worker address size. + * TODO: Consider calling UCT ep flush with remote flag when/if address + * size is not an issue anymore. + * - Control lane if none of the lanes support sending ATP + */ + if (use_fence) { + /* Send fence followed by ATP on all lanes */ + rpriv->bulk.super.lane = UCP_NULL_LANE; + rpriv->put_comp_cb = comp_cb; + rpriv->atp_comp_cb = NULL; + rpriv->stage_after_put = UCP_PROTO_RNDV_PUT_STAGE_FENCED_ATP; + rpriv->flush_map = 0; + rpriv->atp_map = rpriv->bulk.mpriv.lane_map; + } else { + /* Flush all lanes and send ATP on all supporting lanes (or control lane + * otherwise) */ + if (send_atp) { + rpriv->put_comp_cb = + ucp_proto_rndv_put_common_flush_completion_send_atp; + rpriv->atp_comp_cb = comp_cb; + rpriv->atp_map = (atp_map == 0) ? 
+ UCS_BIT(rpriv->bulk.super.lane) : atp_map; + } else { + rpriv->put_comp_cb = comp_cb; + rpriv->atp_comp_cb = NULL; + rpriv->atp_map = 0; + } + rpriv->stage_after_put = UCP_PROTO_RNDV_PUT_STAGE_FLUSH; + rpriv->flush_map = rpriv->bulk.mpriv.lane_map; + ucs_assert(rpriv->flush_map != 0); + } + + if (send_atp) { + ucs_assert(rpriv->atp_map != 0); + } + rpriv->atp_num_lanes = ucs_popcount(rpriv->atp_map); + rpriv->stat_counter = stat_counter; + + ucp_proto_select_add_proto(&params->super.super, params->super.cfg_thresh, + params->super.cfg_priority, perf, rpriv, + UCP_PROTO_MULTI_EXTENDED_PRIV_SIZE(rpriv, + bulk.mpriv)); +} + static void ucp_proto_rndv_put_common_probe(const ucp_proto_init_params_t *init_params, uint64_t rndv_modes, size_t max_length, @@ -239,7 +322,6 @@ ucp_proto_rndv_put_common_probe(const ucp_proto_init_params_t *init_params, int support_ppln, uint8_t stat_counter, const ucp_memory_info_t *reg_mem_info) { - const size_t atp_size = sizeof(ucp_rndv_ack_hdr_t); ucp_context_t *context = init_params->worker->context; ucp_proto_multi_init_params_t params = { .super.super = *init_params, @@ -273,13 +355,11 @@ ucp_proto_rndv_put_common_probe(const ucp_proto_init_params_t *init_params, .opt_align_offs = ucs_offsetof(uct_iface_attr_t, cap.put.opt_zcopy_align), }; - const uct_iface_attr_t *iface_attr; - ucp_lane_index_t lane_idx, lane; ucp_proto_rndv_put_priv_t rpriv; - int send_atp, use_fence; - ucp_proto_perf_t *perf; + ucp_proto_lane_select_t *select; + ucp_lane_index_t i; + ucp_proto_perf_t *bulk_perf, *perf; ucs_status_t status; - unsigned atp_map; if ((init_params->select_param->dt_class != UCP_DATATYPE_CONTIG) || !ucp_proto_rndv_op_check(init_params, UCP_OP_ID_RNDV_SEND, @@ -288,81 +368,28 @@ return; } - status = ucp_proto_rndv_bulk_init(&params, UCP_PROTO_RNDV_PUT_DESC, - UCP_PROTO_RNDV_ATP_NAME, &perf, - &rpriv.bulk); + status = ucp_proto_multi_init(&params, UCP_PROTO_RNDV_PUT_DESC, &select); if (status != UCS_OK) { return; } - send_atp = !ucp_proto_rndv_init_params_is_ppln_frag(init_params); - - /* Check which lanes support sending ATP */ - atp_map = 0; - for (lane_idx = 0; lane_idx < rpriv.bulk.mpriv.num_lanes; ++lane_idx) { - lane = rpriv.bulk.mpriv.lanes[lane_idx].super.lane; - iface_attr = ucp_proto_common_get_iface_attr(init_params, lane); - if (((iface_attr->cap.flags & UCT_IFACE_FLAG_AM_SHORT) && - (iface_attr->cap.am.max_short >= atp_size)) || - ((iface_attr->cap.flags & UCT_IFACE_FLAG_AM_BCOPY) && - (iface_attr->cap.am.max_bcopy >= atp_size))) { - atp_map |= UCS_BIT(lane); + for (i = 0; i < select->num_selections; ++i) { ..." - } - } - - /* Use fence only if all lanes support sending ATP and flush is not forced - */ - use_fence = send_atp && !context->config.ext.rndv_put_force_flush && - (rpriv.bulk.mpriv.lane_map == atp_map); - /* All lanes can send ATP - invalidate am_lane, to use mpriv->lanes. - * Otherwise, would need to flush all lanes and send ATP on: - * - All lanes supporting ATP send. This ensures that data is flushed - * remotely (i.e. resides in the target buffer), which may not be the case - * with IB transports. An alternative would be to pass - * UCT_FLUSH_FLAG_REMOTE to uct_ep_flush, but using this flag increases - * UCP worker address size. - * TODO: Consider calling UCT ep flush with remote flag when/if address - * size is not an issue anymore. 
- * - Control lane if none of the lanes support sending ATP - */ - if (use_fence) { - /* Send fence followed by ATP on all lanes */ - rpriv.bulk.super.lane = UCP_NULL_LANE; - rpriv.put_comp_cb = comp_cb; - rpriv.atp_comp_cb = NULL; - rpriv.stage_after_put = UCP_PROTO_RNDV_PUT_STAGE_FENCED_ATP; - rpriv.flush_map = 0; - rpriv.atp_map = rpriv.bulk.mpriv.lane_map; - } else { - /* Flush all lanes and send ATP on all supporting lanes (or control lane - * otherwise) */ - if (send_atp) { - rpriv.put_comp_cb = - ucp_proto_rndv_put_common_flush_completion_send_atp; - rpriv.atp_comp_cb = comp_cb; - rpriv.atp_map = (atp_map == 0) ? - UCS_BIT(rpriv.bulk.super.lane) : atp_map; - } else { - rpriv.put_comp_cb = comp_cb; - rpriv.atp_comp_cb = NULL; - rpriv.atp_map = 0; + for (i = 0; i < select->num_selections; ++i) { + status = ucp_proto_multi_init_perf(&params, select, &select->selections[i], + &bulk_perf, &rpriv.bulk.mpriv); + if (status != UCS_OK) { + continue; } - rpriv.stage_after_put = UCP_PROTO_RNDV_PUT_STAGE_FLUSH; - rpriv.flush_map = rpriv.bulk.mpriv.lane_map; - ucs_assert(rpriv.flush_map != 0); - } - if (send_atp) { - ucs_assert(rpriv.atp_map != 0); + status = ucp_proto_rndv_bulk_init(&params, UCP_PROTO_RNDV_ATP_NAME, + bulk_perf, &perf, &rpriv.bulk); + if (status != UCS_OK) { + continue; } - rpriv.atp_num_lanes = ucs_popcount(rpriv.atp_map); - rpriv.stat_counter = stat_counter; + ucp_proto_rndv_put_add_proto(&params, comp_cb, stat_counter, perf, &rpriv); } - ucp_proto_select_add_proto(&params.super.super, params.super.cfg_thresh, - params.super.cfg_priority, perf, &rpriv, - UCP_PROTO_MULTI_EXTENDED_PRIV_SIZE(&rpriv, - bulk.mpriv)); + ucp_proto_lane_select_destroy(select); } static const char * diff --git a/test/gtest/ucp/test_ucp_proto_mock.cc b/test/gtest/ucp/test_ucp_proto_mock.cc index f70eee451ca..11117e9a939 100644 --- a/test/gtest/ucp/test_ucp_proto_mock.cc +++ b/test/gtest/ucp/test_ucp_proto_mock.cc @@ -39,12 +39,17 @@ class mock_iface { void add_mock_iface( const std::string &dev_name = "mock", iface_attr_func_t cb = [](uct_iface_attr_t &iface_attr) {}, - perf_attr_func_t perf_cb = cx7_perf_mock) + perf_attr_func_t perf_cb = [](uct_perf_attr_t& perf_attr) {}) { m_iface_attrs_funcs[dev_name] = cb; m_perf_attrs_funcs[dev_name] = perf_cb; } + void add_mock_cx7_iface(const std::string &dev_name, iface_attr_func_t cb) + { + add_mock_iface(dev_name, cb, cx7_perf_mock); + } + void mock_transport(const std::string &tl_name) { uct_component_h component; @@ -163,8 +168,7 @@ class mock_iface { static void cx7_perf_mock(uct_perf_attr_t& perf_attr) { - perf_attr.path_bandwidth.shared = 0.95 * perf_attr.bandwidth.shared; - perf_attr.path_bandwidth.dedicated = 0; + perf_attr.path_bandwidth.shared = 0.95 * perf_attr.bandwidth.shared; } /* We have to use singleton to mock C functions */ @@ -515,7 +519,7 @@ class test_ucp_proto_mock_rcx : public test_ucp_proto_mock { virtual void init() override { /* Device with higher BW and latency */ - add_mock_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) { + add_mock_cx7_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) { iface_attr.cap.am.max_short = 2000; iface_attr.bandwidth.shared = 28e9; iface_attr.latency.c = 600e-9; @@ -523,7 +527,7 @@ class test_ucp_proto_mock_rcx : public test_ucp_proto_mock { iface_attr.cap.get.max_zcopy = 16384; }); /* Device with smaller BW but lower latency */ - add_mock_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) { + add_mock_cx7_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) { iface_attr.cap.am.max_short = 208; iface_attr.bandwidth.shared = 24e9; iface_attr.latency.c = 500e-9; @@ -534,19 +538,38 @@ }; UCS_TEST_P(test_ucp_proto_mock_rcx, memtype_copy_enable, - 
"IB_NUM_PATHS?=1", "MAX_RNDV_LANES=1", - "MEMTYPE_COPY_ENABLE=n") + "IB_NUM_PATHS?=1", "MAX_RNDV_LANES=1", "MEMTYPE_COPY_ENABLE=n", + "PROTO_VARIANTS=n") { ucp_proto_select_key_t key = any_key(); key.param.op_id_flags = UCP_OP_ID_AM_SEND; key.param.op_attr = 0; check_ep_config(sender(), { - {0, 0, "rendezvous no data fetch", ""}, - {1, 64, "rendezvous zero-copy fenced write to remote", - "rc_mlx5/mock_0:1"}, - {21992, INF, "rendezvous zero-copy read from remote", - "rc_mlx5/mock_0:1"}, + {0, 0, "rendezvous no data fetch", ""}, + {1, 64, "rendezvous zero-copy fenced write to remote", + "rc_mlx5/mock_0:1"}, + {65, INF, "rendezvous zero-copy read from remote", + "rc_mlx5/mock_0:1"}, + }, key); +} + +UCS_TEST_P(test_ucp_proto_mock_rcx, proto_variants_enable, + "IB_NUM_PATHS?=1", "MAX_RNDV_LANES=1", "MEMTYPE_COPY_ENABLE=n", + "PROTO_VARIANTS=y") +{ + ucp_proto_select_key_t key = any_key(); + key.param.op_id_flags = UCP_OP_ID_AM_SEND; + key.param.op_attr = 0; + + check_ep_config(sender(), { + {0, 0, "rendezvous no data fetch", ""}, + {1, 64, "rendezvous zero-copy fenced write to remote", + "rc_mlx5/mock_1:1"}, + {65, 16800, "rendezvous zero-copy read from remote", + "rc_mlx5/mock_1:1"}, + {16801, INF, "rendezvous zero-copy read from remote", + "rc_mlx5/mock_0:1"}, }, key); } @@ -642,7 +665,7 @@ class test_ucp_proto_mock_rcx2 : public test_ucp_proto_mock { virtual void init() override { /* Device with high BW and lower latency */ - add_mock_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) { + add_mock_cx7_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) { iface_attr.cap.am.max_short = 208; iface_attr.bandwidth.shared = 28e9; iface_attr.latency.c = 500e-9; @@ -650,7 +673,7 @@ class test_ucp_proto_mock_rcx2 : public test_ucp_proto_mock { iface_attr.cap.get.max_zcopy = 16384; }); /* Device with lower BW and higher latency */ - add_mock_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) { + add_mock_cx7_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) { iface_attr.cap.am.max_short = 2000; iface_attr.bandwidth.shared = 24e9; iface_attr.latency.c = 600e-9; @@ -690,7 +713,7 @@ class test_ucp_proto_mock_rcx3 : public test_ucp_proto_mock { { /* Device with high BW and lower latency, but 0 get_zcopy. * This use case is similar to cuda_ipc when NVLink is not available. */ - add_mock_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) { + add_mock_cx7_iface("mock_0:1", [](uct_iface_attr_t &iface_attr) { iface_attr.cap.am.max_short = 208; iface_attr.bandwidth.shared = 28e9; iface_attr.latency.c = 500e-9; @@ -698,7 +721,7 @@ class test_ucp_proto_mock_rcx3 : public test_ucp_proto_mock { iface_attr.cap.get.max_zcopy = 0; }); /* Device with lower BW and higher latency */ - add_mock_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) { + add_mock_cx7_iface("mock_1:1", [](uct_iface_attr_t &iface_attr) { iface_attr.cap.am.max_short = 2000; iface_attr.bandwidth.shared = 24e9; iface_attr.latency.c = 600e-9; @@ -726,6 +749,66 @@ UCS_TEST_P(test_ucp_proto_mock_rcx3, single_lane_no_zcopy, UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_proto_mock_rcx3, rcx, "rc_x") +class test_ucp_proto_mock_variants : public test_ucp_proto_mock { +public: + test_ucp_proto_mock_variants() + { + mock_transport("rc_mlx5"); + } + + virtual void init() override + { + /* Mock for "cuda_ipc" device. 
*/ + add_mock_iface("nvlink", [](uct_iface_attr_t &iface_attr) { + iface_attr.cap.am.max_short = 0; + iface_attr.bandwidth.shared = 100e9; + iface_attr.latency.c = 2000e-9; + iface_attr.latency.m = 1e-9; + }, [](uct_perf_attr_t &perf_attr) { + perf_attr.path_bandwidth.shared = 0.5 * perf_attr.bandwidth.shared; + }); + /* IB device attached to GPU */ + add_mock_cx7_iface("fast:1", [](uct_iface_attr_t &iface_attr) { + iface_attr.cap.am.max_short = 2000; + iface_attr.bandwidth.shared = 45e9; + iface_attr.latency.c = 600e-9; + iface_attr.latency.m = 1e-9; + }); + /* IB device not attached to GPU, therefore both BW and latency are + * limited by distance */ + add_mock_cx7_iface("slow:1", [](uct_iface_attr_t &iface_attr) { + iface_attr.cap.am.max_short = 2000; + iface_attr.bandwidth.shared = 25e9; + iface_attr.latency.c = 900e-9; + iface_attr.latency.m = 1e-9; + }); + test_ucp_proto_mock::init(); + } +}; + +UCS_TEST_P(test_ucp_proto_mock_variants, latency_variant, + "IB_NUM_PATHS?=2", "MAX_RNDV_LANES=2", "RNDV_THRESH=0", "PROTO_VARIANTS=y") +{ + ucp_proto_select_key_t key = any_key(); + key.param.op_id_flags = UCP_OP_ID_AM_SEND; + key.param.op_attr = 0; + + /* + * Variant for latency must not include slow lane, because it adds + * latency to the path. + */ + check_ep_config(sender(), { + {1, 129, "rendezvous fragmented copy-in copy-out", + "rc_mlx5/fast:1/path0"}, + {130, 229091, "rendezvous zero-copy read from remote", + "rc_mlx5/fast:1 50% on path0 and 50% on path1"}, + {229092, INF, "rendezvous zero-copy read from remote", + "rc_mlx5/nvlink 50% on path0 and 50% on path1"}, + }, key); +} + +UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_proto_mock_variants, rcx, "rc_x") + class test_ucp_proto_mock_cma : public test_ucp_proto_mock { public: test_ucp_proto_mock_cma()