Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions docs/source/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,6 @@ by `ucx_info -d` command.
> **IMPORTANT NOTE**
> In some cases restricting the transports can lead to unexpected and undefined behavior:
> * Using *rc_verbs* or *rc_mlx5* also requires *ud_verbs* or *ud_mlx5* transport for bootstrap.
> * Applications using GPU memory must also specify GPU transports for detecting and
> handling non-host memory.

In addition to the built-in transports it's possible to use aliases which specify multiple transports.

Expand Down Expand Up @@ -347,9 +345,6 @@ GPU memory (for example,
and UCX compiled with GPU support. Then you can run the application as usual (for
example, with MPI) and whenever GPU memory is passed to UCX, it either use GPU-direct
for zero copy operations, or copy the data to/from host memory.
> NOTE When specifying UCX_TLS explicitly, must also specify cuda/rocm for GPU memory
> support, otherwise the GPU memory will not be recognized.
> For example: `UCX_TLS=rc,cuda` or `UCX_TLS=dc,rocm`

#### I'm running UCX with GPU memory and getting a segfault, why?

Expand Down
292 changes: 161 additions & 131 deletions src/ucp/core/ucp_context.c

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ KHASH_IMPL(ucp_context_imported_mem_hash, uint64_t, ucs_rcache_t*, 1,


enum {
/* The flag indicates that the resource may be used for auxiliary
* wireup communications only */
UCP_TL_RSC_FLAG_AUX = UCS_BIT(0)
/* The flag indicates that the resource may be used for normal communication */
UCP_TL_RSC_FLAG_COMM = UCS_BIT(0),
/* The flag indicates that the resource may be used for auxiliary wireup */
UCP_TL_RSC_FLAG_AUX = UCS_BIT(1),
/* The flag indicates that the resource may be used for memory copy */
UCP_TL_RSC_FLAG_MEM = UCS_BIT(2),
};

#define UCP_OP_ATTR_INDEX_MASK (UCP_OP_ATTR_FLAG_NO_IMM_CMPL | \
Expand Down Expand Up @@ -228,6 +231,8 @@ struct ucp_config {
ucs_config_names_array_t devices[UCT_DEVICE_TYPE_LAST];
/** Array of transport names to use */
ucs_config_allow_list_t tls;
/** Array of transport names to use for memory copy */
ucs_config_allow_list_t mem_tls;
/** Array of protocol names to use */
ucs_config_allow_list_t protos;
/** Array of memory allocation methods */
Expand Down Expand Up @@ -587,6 +592,7 @@ typedef struct ucp_tl_iface_atomic_flags {

extern ucp_am_handler_t *ucp_am_handlers[];
extern const char *ucp_feature_str[];
extern const char *ucp_tl_rsc_flag_names[];


void ucp_dump_payload(ucp_context_h context, char *buffer, size_t max,
Expand Down
51 changes: 34 additions & 17 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,13 +573,26 @@ ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status)
return status;
}

static int ucp_worker_iface_can_recv_tag(const ucp_worker_iface_t *wiface)
{
const uint64_t cap_flags = UCT_IFACE_FLAG_CB_SYNC |
UCT_IFACE_FLAG_TAG_EAGER_SHORT |
UCT_IFACE_FLAG_TAG_EAGER_BCOPY |
UCT_IFACE_FLAG_TAG_EAGER_ZCOPY |
UCT_IFACE_FLAG_TAG_RNDV_ZCOPY;
ucp_context_h context = wiface->worker->context;

return (wiface->attr.cap.flags & cap_flags) &&
(context->tl_rscs[wiface->rsc_index].flags & UCP_TL_RSC_FLAG_COMM);
}

void ucp_worker_iface_activate(ucp_worker_iface_t *wiface, unsigned uct_flags)
{
ucp_worker_h worker = wiface->worker;

ucs_trace("activate " UCP_WIFACE_FMT " a_count=%u a_ifaces=%u",
ucs_trace("activate " UCP_WIFACE_FMT " a_count %u tag_ifaces %u",
UCP_WIFACE_ARG(wiface), wiface->activate_count,
worker->num_active_ifaces);
worker->num_active_tag_ifaces);

if (wiface->activate_count++ > 0) {
return; /* was already activated */
Expand All @@ -602,7 +615,9 @@ void ucp_worker_iface_activate(ucp_worker_iface_t *wiface, unsigned uct_flags)
ucs_list_add_tail(&worker->arm_ifaces, &wiface->arm_list);
}

++worker->num_active_ifaces;
if (ucp_worker_iface_can_recv_tag(wiface)) {
++worker->num_active_tag_ifaces;
}

uct_iface_progress_enable(wiface->iface,
UCT_PROGRESS_SEND | UCT_PROGRESS_RECV | uct_flags);
Expand Down Expand Up @@ -731,9 +746,9 @@ static void ucp_worker_iface_deactivate(ucp_worker_iface_t *wiface, int force)
{
ucp_worker_h worker = wiface->worker;

ucs_trace("deactivate " UCP_WIFACE_FMT " force=%d a_count=%u a_ifaces=%u",
ucs_trace("deactivate " UCP_WIFACE_FMT " force=%d a_count %u tag_ifaces %u",
UCP_WIFACE_ARG(wiface), force, wiface->activate_count,
worker->num_active_ifaces);
worker->num_active_tag_ifaces);

if (!force) {
ucs_assertv(wiface->activate_count > 0, UCP_WIFACE_FMT,
Expand All @@ -744,7 +759,9 @@ static void ucp_worker_iface_deactivate(ucp_worker_iface_t *wiface, int force)
return;
}

--worker->num_active_ifaces;
if (ucp_worker_iface_can_recv_tag(wiface)) {
--worker->num_active_tag_ifaces;
}
}

/* Avoid progress on the interface to reduce overhead */
Expand Down Expand Up @@ -2485,17 +2502,17 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
return UCS_ERR_NO_MEMORY;
}

worker->context = context;
worker->uuid = ucs_generate_uuid((uintptr_t)worker);
worker->flush_ops_count = 0;
worker->fence_seq = 0;
worker->inprogress = 0;
worker->rkey_config_count = 0;
worker->num_active_ifaces = 0;
worker->num_ifaces = 0;
worker->am_message_id = ucs_generate_uuid(0);
worker->rkey_ptr_cb_id = UCS_CALLBACKQ_ID_NULL;
worker->num_all_eps = 0;
worker->context = context;
worker->uuid = ucs_generate_uuid((uintptr_t)worker);
worker->flush_ops_count = 0;
worker->fence_seq = 0;
worker->inprogress = 0;
worker->rkey_config_count = 0;
worker->num_active_tag_ifaces = 0;
worker->num_ifaces = 0;
worker->am_message_id = ucs_generate_uuid(0);
worker->rkey_ptr_cb_id = UCS_CALLBACKQ_ID_NULL;
worker->num_all_eps = 0;
ucp_worker_keepalive_reset(worker);
ucs_queue_head_init(&worker->rkey_ptr_reqs);
ucs_list_head_init(&worker->arm_ifaces);
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ typedef struct ucp_worker {
ucp_worker_iface_t **ifaces; /* Array of pointers to interfaces,
one for each resource */
unsigned num_ifaces; /* Number of elements in ifaces array */
unsigned num_active_ifaces; /* Number of activated ifaces */
unsigned num_active_tag_ifaces; /* Number of activated ifaces that can receive tag messages */
ucp_tl_bitmap_t scalable_tl_bitmap; /* Map of scalable tl resources */
ucp_worker_cm_t *cms; /* Array of CMs, one for each component */
ucs_mpool_set_t am_mps; /* Memory pool set for AM receives */
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/tag/offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ ucp_tag_offload_iface(ucp_worker_t *worker, ucp_tag_t tag)
khiter_t hash_it;
ucp_tag_t key_tag;

if (worker->num_active_ifaces == 1) {
if (worker->num_active_tag_ifaces == 1) {
ucs_assert(worker->tm.offload.iface != NULL);
return worker->tm.offload.iface;
}
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/tag/offload.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ ucp_tag_offload_unexp(ucp_worker_iface_t *wiface, ucp_tag_t tag, size_t length)
avoid unwanted postings of receive buffers (those, which are expected to
arrive from offload incapable iface) to the HW. */
if (ucs_unlikely((length >= worker->tm.offload.thresh) &&
(worker->num_active_ifaces > 1))) {
(worker->num_active_tag_ifaces > 1))) {
tag_key = worker->context->config.tag_sender_mask & tag;
hash_it = kh_get(ucp_tag_offload_hash, &worker->tm.offload.tag_hash,
tag_key);
Expand Down
49 changes: 32 additions & 17 deletions src/ucp/wireup/select.c
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,15 @@ static UCS_F_NOINLINE ucs_status_t ucp_wireup_select_transport(
ucp_lane_index_t lane;
char tls_info[256];
char uct_info[256];
char flags_str[64];
char *p, *endp;
uct_iface_attr_t *iface_attr;
uct_md_attr_v2_t *md_attr;
const uct_component_attr_t *cmpt_attr;
int is_reachable;
double score;
uint8_t priority;
uint8_t tl_rsc_flags;
ucp_md_index_t md_index;

p = tls_info;
Expand Down Expand Up @@ -494,8 +496,14 @@ static UCS_F_NOINLINE ucs_status_t ucp_wireup_select_transport(
md_attr = &context->tl_mds[md_index].attr;
cmpt_attr = ucp_cmpt_attr_by_md_index(context, md_index);

if ((context->tl_rscs[rsc_index].flags & UCP_TL_RSC_FLAG_AUX) &&
!(criteria->tl_rsc_flags & UCP_TL_RSC_FLAG_AUX)) {
tl_rsc_flags = context->tl_rscs[rsc_index].flags;
if (!(tl_rsc_flags & criteria->tl_rsc_flags)) {
ucs_trace(UCT_TL_RESOURCE_DESC_FMT
" : disabled because it doesn't support %s",
UCT_TL_RESOURCE_DESC_ARG(resource),
ucs_flags_str(flags_str, sizeof(flags_str),
criteria->tl_rsc_flags,
ucp_tl_rsc_flag_names));
continue;
}

Expand Down Expand Up @@ -1083,7 +1091,9 @@ static void ucp_wireup_fill_aux_criteria(ucp_wireup_criteria_t *criteria,
ucp_wireup_fill_peer_err_criteria(criteria, ep_init_flags);
}

static void ucp_wireup_criteria_init(ucp_wireup_criteria_t *criteria)
static void
ucp_wireup_criteria_init(const ucp_wireup_select_params_t *select_params,
ucp_wireup_criteria_t *criteria)
{
criteria->title = "";
criteria->local_md_flags = 0;
Expand All @@ -1093,7 +1103,10 @@ static void ucp_wireup_criteria_init(ucp_wireup_criteria_t *criteria)
criteria->alloc_mem_types = 0;
criteria->is_keepalive = 0;
criteria->calc_score = NULL;
criteria->tl_rsc_flags = 0;
criteria->tl_rsc_flags = (select_params->ep_init_flags &
UCP_EP_INIT_FLAG_MEM_TYPE) ?
UCP_TL_RSC_FLAG_MEM :
UCP_TL_RSC_FLAG_COMM;
ucp_wireup_init_select_flags(&criteria->local_iface_flags, 0, 0);
ucp_wireup_init_select_flags(&criteria->remote_iface_flags, 0, 0);
memset(&criteria->remote_atomic_flags, 0,
Expand Down Expand Up @@ -1154,7 +1167,7 @@ ucp_wireup_add_rma_lanes(const ucp_wireup_select_params_t *select_params,
return UCS_OK;
}

ucp_wireup_criteria_init(&criteria);
ucp_wireup_criteria_init(select_params, &criteria);
if (ep_init_flags & UCP_EP_INIT_FLAG_MEM_TYPE) {
criteria.title = "copy across memory types";
ucp_wireup_init_select_flags(&criteria.local_iface_flags,
Expand Down Expand Up @@ -1221,7 +1234,7 @@ ucp_wireup_add_amo_lanes(const ucp_wireup_select_params_t *select_params,
return UCS_OK;
}

ucp_wireup_criteria_init(&criteria);
ucp_wireup_criteria_init(select_params, &criteria);
criteria.title = "atomic operations on %s memory";
criteria.local_atomic_flags = criteria.remote_atomic_flags;
criteria.calc_score = ucp_wireup_amo_score_func;
Expand Down Expand Up @@ -1464,13 +1477,15 @@ ucp_wireup_add_am_lane(const ucp_wireup_select_params_t *select_params,

/* Select one lane for active messages */
for (;;) {
ucp_wireup_criteria_init(&criteria);
criteria.title = "active messages";
criteria.calc_score = ucp_wireup_am_score_func;
criteria.lane_type = UCP_LANE_TYPE_AM;
criteria.tl_rsc_flags =
(ep_init_flags & UCP_EP_INIT_ALLOW_AM_AUX_TL) ?
UCP_TL_RSC_FLAG_AUX : 0;
ucp_wireup_criteria_init(select_params, &criteria);
criteria.calc_score = ucp_wireup_am_score_func;
criteria.lane_type = UCP_LANE_TYPE_AM;
if (ep_init_flags & UCP_EP_INIT_ALLOW_AM_AUX_TL) {
criteria.title = "auxiliary active messages";
criteria.tl_rsc_flags |= UCP_TL_RSC_FLAG_AUX;
} else {
criteria.title = "active messages";
}
ucp_wireup_init_select_flags(&criteria.local_iface_flags,
UCT_IFACE_FLAG_AM_BCOPY, 0);
ucp_wireup_init_select_flags(&criteria.remote_iface_flags,
Expand Down Expand Up @@ -1905,7 +1920,7 @@ ucp_wireup_add_am_bw_lanes(const ucp_wireup_select_params_t *select_params,
}

/* Select one lane for active messages */
ucp_wireup_criteria_init(&bw_info.criteria);
ucp_wireup_criteria_init(select_params, &bw_info.criteria);
bw_info.criteria.title = "high-bw active messages";
bw_info.criteria.calc_score = ucp_wireup_am_bw_score_func;
bw_info.criteria.lane_type = UCP_LANE_TYPE_AM_BW;
Expand Down Expand Up @@ -2074,7 +2089,7 @@ ucp_wireup_add_rma_bw_lanes(const ucp_wireup_select_params_t *select_params,
return UCS_OK;
}

ucp_wireup_criteria_init(&bw_info.criteria);
ucp_wireup_criteria_init(select_params, &bw_info.criteria);
bw_info.criteria.calc_score = ucp_wireup_rma_bw_score_func;
ucp_wireup_init_select_flags(&bw_info.criteria.local_iface_flags,
UCT_IFACE_FLAG_PENDING, 0);
Expand Down Expand Up @@ -2237,7 +2252,7 @@ ucp_wireup_add_tag_lane(const ucp_wireup_select_params_t *select_params,
return UCS_OK;
}

ucp_wireup_criteria_init(&criteria);
ucp_wireup_criteria_init(select_params, &criteria);
criteria.title = "tag_offload";
criteria.calc_score = ucp_wireup_am_score_func;
criteria.lane_type = UCP_LANE_TYPE_TAG;
Expand Down Expand Up @@ -2401,7 +2416,7 @@ ucp_wireup_add_keepalive_lane(const ucp_wireup_select_params_t *select_params,
tl_bitmap = &select_params->tl_bitmap;
}

ucp_wireup_criteria_init(&criteria);
ucp_wireup_criteria_init(select_params, &criteria);
criteria.title = "keepalive";
criteria.local_md_flags = 0;
criteria.is_keepalive = 1;
Expand Down
5 changes: 4 additions & 1 deletion test/gtest/ucp/test_ucp_sockaddr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3307,7 +3307,10 @@ class test_ucp_sockaddr_iface_activate : public test_ucp_sockaddr {
{
ucp_worker_h worker = e.worker();
for (unsigned i = 0; i < worker->num_ifaces; ++i) {
if (ucp_worker_iface_is_activated(worker->ifaces[i])) {
auto wiface = worker->ifaces[i];
if ((worker->context->tl_rscs[wiface->rsc_index].flags &
UCP_TL_RSC_FLAG_COMM) &&
ucp_worker_iface_is_activated(wiface)) {
return true;
}
}
Expand Down
3 changes: 3 additions & 0 deletions test/gtest/ucp/test_ucp_tag.cc
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,9 @@ class test_ucp_tag_limits : public test_ucp_tag {
}

void init() {
// Disable GPU memory support
modify_config("MEM_TLS", "");

/* TODO: Currently all the tests are for intra-node communication only.
* Find a way to create inter-node endpoint on a single node */
test_ucp_tag::init();
Expand Down
2 changes: 1 addition & 1 deletion test/gtest/ucp/test_ucp_tag_offload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ UCS_TEST_P(test_ucp_tag_offload_multi, recv_from_multi)
// Activate first offload iface. Tag hashing is performed only if there is
// more than one active interface.
activate_offload_hashing(e(0), make_tag(e(0), tag));
int init_hash_size = receiver().worker()->num_active_ifaces > 1;
int init_hash_size = receiver().worker()->num_active_tag_ifaces > 1;
EXPECT_EQ(init_hash_size,
kh_size(&receiver().worker()->tm.offload.tag_hash));

Expand Down
Loading
Loading