diff --git a/src/uct/sm/mm/base/mm_ep.c b/src/uct/sm/mm/base/mm_ep.c index 26fd7eb7212..21064985ac9 100644 --- a/src/uct/sm/mm/base/mm_ep.c +++ b/src/uct/sm/mm/base/mm_ep.c @@ -200,6 +200,8 @@ static UCS_CLASS_INIT_FUNC(uct_mm_ep_t, const uct_ep_params_t *params) self->cached_tail = self->fifo_ctl->tail; ucs_arbiter_elem_init(&self->arb_elem); + self->reserved_head = 0; + status = uct_ep_keepalive_init(&self->keepalive, self->fifo_ctl->pid); if (status != UCS_OK) { goto err_free_segs; @@ -230,26 +232,15 @@ UCS_CLASS_DEFINE_NEW_FUNC(uct_mm_ep_t, uct_ep_t, const uct_ep_params_t *); UCS_CLASS_DEFINE_DELETE_FUNC(uct_mm_ep_t, uct_ep_t); -static inline ucs_status_t +static inline void uct_mm_ep_get_remote_elem(uct_mm_ep_t *ep, uint64_t head, uct_mm_fifo_element_t **elem) { uct_mm_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_mm_iface_t); - uint64_t new_head, prev_head; uint64_t elem_index; /* index of the element to write */ elem_index = head & iface->fifo_mask; *elem = UCT_MM_IFACE_GET_FIFO_ELEM(iface, ep->fifo_elems, elem_index); - new_head = (head + 1) & ~UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED; - - /* try to get ownership of the head element */ - prev_head = ucs_atomic_cswap64(ucs_unaligned_ptr(&ep->fifo_ctl->head), head, - new_head); - if (prev_head != head) { - return UCS_ERR_NO_RESOURCE; - } - - return UCS_OK; } static inline void uct_mm_ep_update_cached_tail(uct_mm_ep_t *ep) @@ -288,14 +279,20 @@ static UCS_F_ALWAYS_INLINE ssize_t uct_mm_ep_am_common_send( ucs_status_t status; void *base_address; uint8_t elem_flags; - uint64_t head; + uint64_t head, prev_head; ucs_iov_iter_t iov_iter; void *desc_data; UCT_CHECK_AM_ID(am_id); -retry: - head = ep->fifo_ctl->head; + if (!ep->reserved_head) { + /* Do a atomic add and get reserve a FIFO element */ + ep->reserved_head = ucs_atomic_fadd64(&ep->fifo_ctl->head, 1); + } + + + head = ep->reserved_head; + /* check if there is room in the remote process's receive FIFO to write */ if (!UCT_MM_EP_IS_ABLE_TO_SEND(head, ep->cached_tail, iface->config.fifo_size)) { if (!ucs_arbiter_group_is_empty(&ep->arb_group)) { @@ -315,12 +312,9 @@ static UCS_F_ALWAYS_INLINE ssize_t uct_mm_ep_am_common_send( } } - status = uct_mm_ep_get_remote_elem(ep, head, &elem); - if (status != UCS_OK) { - ucs_assert(status == UCS_ERR_NO_RESOURCE); - ucs_trace_poll("couldn't get an available FIFO element. retrying"); - goto retry; - } + /* reset reserved_head */ + ep->reserved_head = 0; + uct_mm_ep_get_remote_elem(ep, head, &elem); switch (send_op) { case UCT_MM_SEND_AM_SHORT: @@ -382,7 +376,14 @@ static UCS_F_ALWAYS_INLINE ssize_t uct_mm_ep_am_common_send( elem->flags = elem_flags; if (ucs_unlikely(head & UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED)) { - uct_mm_ep_signal_remote(ep); + /* Try to minimize number of ranks that do _fetch_and_and_ in + * the case of heavy contention */ + if ((ep->fifo_ctl->head - head) == 1) { + prev_head = ucs_atomic_fand64(&ep->fifo_ctl->head, ~UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED); + if (prev_head & UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED) { + uct_mm_ep_signal_remote(ep); + } + } } uct_mm_ep_peer_check(ep, flags); @@ -453,8 +454,9 @@ ucs_status_t uct_mm_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n, uct_mm_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_mm_iface_t); uct_mm_ep_t *ep = ucs_derived_of(tl_ep, uct_mm_ep_t); - /* check if resources became available */ - if (uct_mm_ep_has_tx_resources(ep)) { + /* check if we can use the already reserved slot in FIFO */ + if (UCT_MM_EP_IS_ABLE_TO_SEND(ep->reserved_head, ep->cached_tail, + iface->config.fifo_size)) { ucs_assert(ucs_arbiter_group_is_empty(&ep->arb_group)); return UCS_ERR_BUSY; } @@ -475,15 +477,24 @@ ucs_arbiter_cb_result_t uct_mm_ep_process_pending(ucs_arbiter_t *arbiter, void *arg) { uct_mm_ep_t *ep = ucs_container_of(group, uct_mm_ep_t, arb_group); + uct_mm_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_mm_iface_t); unsigned *count = (unsigned*)arg; uct_pending_req_t *req; ucs_status_t status; + uint64_t fifo_head; /* update the local tail with its actual value from the remote peer * making sure that the pending sends would use the real tail value */ uct_mm_ep_update_cached_tail(ep); - if (!uct_mm_ep_has_tx_resources(ep)) { + if (!ep->reserved_head) { + fifo_head = ep->fifo_ctl->head; + } else { + fifo_head = ep->reserved_head; + } + + if (!UCT_MM_EP_IS_ABLE_TO_SEND(fifo_head, ep->cached_tail, + iface->config.fifo_size)) { return UCS_ARBITER_CB_RESULT_RESCHED_GROUP; } diff --git a/src/uct/sm/mm/base/mm_ep.h b/src/uct/sm/mm/base/mm_ep.h index 6c52b55d206..80adf038b12 100644 --- a/src/uct/sm/mm/base/mm_ep.h +++ b/src/uct/sm/mm/base/mm_ep.h @@ -34,6 +34,9 @@ typedef struct uct_mm_ep { it is not always updated with the actual remote tail value */ uint64_t cached_tail; + /* the sender's reserved slot in the receiver's FIFO */ + uint64_t reserved_head; + /* mapped remote memory chunks to which remote descriptors belong to. * (after attaching to them) */ khash_t(uct_mm_remote_seg) remote_segs; diff --git a/src/uct/sm/mm/base/mm_iface.c b/src/uct/sm/mm/base/mm_iface.c index f4718655b64..03daf8baa82 100644 --- a/src/uct/sm/mm/base/mm_iface.c +++ b/src/uct/sm/mm/base/mm_iface.c @@ -434,7 +434,7 @@ uct_mm_iface_event_fd_arm(uct_iface_h tl_iface, unsigned events) { uct_mm_iface_t *iface = ucs_derived_of(tl_iface, uct_mm_iface_t); char dummy[UCT_MM_IFACE_MAX_SIG_EVENTS]; /* pop multiple signals at once */ - uint64_t head, prev_head; + uint64_t head; int ret; if ((events & UCT_EVENT_SEND_COMP) && @@ -461,15 +461,10 @@ uct_mm_iface_event_fd_arm(uct_iface_h tl_iface, unsigned events) if (!(head & UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED)) { /* Try to mark the head index as armed in an atomic way; fail if any sender managed to update the head at the same time */ - prev_head = ucs_atomic_cswap64( - ucs_unaligned_ptr(&iface->recv_fifo_ctl->head), head, - head | UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED); - if (prev_head != head) { + ucs_atomic_or64(&iface->recv_fifo_ctl->head, UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED); + if ((head ^ iface->recv_fifo_ctl->head) & ~UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED) { /* race with sender; need to retry */ - ucs_assert(!(prev_head & UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED)); - ucs_trace("iface %p: cannot arm, head %" PRIu64 - " prev_head %" PRIu64, - iface, head, prev_head); + ucs_trace("iface %p: cannot arm, head %" PRIu64, iface, head); return UCS_ERR_BUSY; } }