Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 36 additions & 25 deletions src/uct/sm/mm/base/mm_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ static UCS_CLASS_INIT_FUNC(uct_mm_ep_t, const uct_ep_params_t *params)
self->cached_tail = self->fifo_ctl->tail;
ucs_arbiter_elem_init(&self->arb_elem);

self->reserved_head = 0;

status = uct_ep_keepalive_init(&self->keepalive, self->fifo_ctl->pid);
if (status != UCS_OK) {
goto err_free_segs;
Expand Down Expand Up @@ -230,26 +232,15 @@ UCS_CLASS_DEFINE_NEW_FUNC(uct_mm_ep_t, uct_ep_t, const uct_ep_params_t *);
UCS_CLASS_DEFINE_DELETE_FUNC(uct_mm_ep_t, uct_ep_t);


static inline ucs_status_t
static inline void
uct_mm_ep_get_remote_elem(uct_mm_ep_t *ep, uint64_t head,
uct_mm_fifo_element_t **elem)
{
uct_mm_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_mm_iface_t);
uint64_t new_head, prev_head;
uint64_t elem_index; /* index of the element to write */

elem_index = head & iface->fifo_mask;
*elem = UCT_MM_IFACE_GET_FIFO_ELEM(iface, ep->fifo_elems, elem_index);
new_head = (head + 1) & ~UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED;

/* try to get ownership of the head element */
prev_head = ucs_atomic_cswap64(ucs_unaligned_ptr(&ep->fifo_ctl->head), head,
new_head);
if (prev_head != head) {
return UCS_ERR_NO_RESOURCE;
}

return UCS_OK;
}

static inline void uct_mm_ep_update_cached_tail(uct_mm_ep_t *ep)
Expand Down Expand Up @@ -288,14 +279,20 @@ static UCS_F_ALWAYS_INLINE ssize_t uct_mm_ep_am_common_send(
ucs_status_t status;
void *base_address;
uint8_t elem_flags;
uint64_t head;
uint64_t head, prev_head;
ucs_iov_iter_t iov_iter;
void *desc_data;

UCT_CHECK_AM_ID(am_id);

retry:
head = ep->fifo_ctl->head;
if (!ep->reserved_head) {
/* Do an atomic fetch-and-add to reserve a FIFO element */
ep->reserved_head = ucs_atomic_fadd64(&ep->fifo_ctl->head, 1);
}


head = ep->reserved_head;

/* check if there is room in the remote process's receive FIFO to write */
if (!UCT_MM_EP_IS_ABLE_TO_SEND(head, ep->cached_tail, iface->config.fifo_size)) {
if (!ucs_arbiter_group_is_empty(&ep->arb_group)) {
Expand All @@ -315,12 +312,9 @@ static UCS_F_ALWAYS_INLINE ssize_t uct_mm_ep_am_common_send(
}
}

status = uct_mm_ep_get_remote_elem(ep, head, &elem);
if (status != UCS_OK) {
ucs_assert(status == UCS_ERR_NO_RESOURCE);
ucs_trace_poll("couldn't get an available FIFO element. retrying");
goto retry;
}
/* reset reserved_head */
ep->reserved_head = 0;
uct_mm_ep_get_remote_elem(ep, head, &elem);

switch (send_op) {
case UCT_MM_SEND_AM_SHORT:
Expand Down Expand Up @@ -382,7 +376,14 @@ static UCS_F_ALWAYS_INLINE ssize_t uct_mm_ep_am_common_send(
elem->flags = elem_flags;

if (ucs_unlikely(head & UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED)) {
uct_mm_ep_signal_remote(ep);
/* Try to minimize the number of ranks that perform the atomic
* fetch-and-AND in the case of heavy contention */
if ((ep->fifo_ctl->head - head) == 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this check really needed given that if (prev_head & UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED) already makes sure only one rank would call uct_mm_ep_signal_remote?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this check really needed given that if (prev_head & UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED) already makes sure only one rank would call uct_mm_ep_signal_remote?

Let's assume multiple writers are simultaneously trying to communicate with a reader whose FIFO is already 'ARMED'.

In this case, as we are using FAA instead of CAS, all writers will see the 'ARM' bit.
Without the '((ep->fifo_ctl->head - head) == 1)' check, all of those writers would perform 'ucs_atomic_fand64(&ep->fifo_ctl->head, ~UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED)', adding unnecessary atomic operations.

This check helps reduce the number of writers that perform the atomic fetch-and-AND.
We could drop it if signaling is infrequent among participants.

Can you comment on how often signaling typically occurs among workers (ranks)?

prev_head = ucs_atomic_fand64(&ep->fifo_ctl->head, ~UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED);
if (prev_head & UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED) {
uct_mm_ep_signal_remote(ep);
}
}
}

uct_mm_ep_peer_check(ep, flags);
Expand Down Expand Up @@ -453,8 +454,9 @@ ucs_status_t uct_mm_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n,
uct_mm_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_mm_iface_t);
uct_mm_ep_t *ep = ucs_derived_of(tl_ep, uct_mm_ep_t);

/* check if resources became available */
if (uct_mm_ep_has_tx_resources(ep)) {
/* check if we can use the already reserved slot in FIFO */
if (UCT_MM_EP_IS_ABLE_TO_SEND(ep->reserved_head, ep->cached_tail,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe uct_mm_ep_has_tx_resources should be updated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try it this way; it seems better.

iface->config.fifo_size)) {
ucs_assert(ucs_arbiter_group_is_empty(&ep->arb_group));
return UCS_ERR_BUSY;
}
Expand All @@ -475,15 +477,24 @@ ucs_arbiter_cb_result_t uct_mm_ep_process_pending(ucs_arbiter_t *arbiter,
void *arg)
{
uct_mm_ep_t *ep = ucs_container_of(group, uct_mm_ep_t, arb_group);
uct_mm_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_mm_iface_t);
unsigned *count = (unsigned*)arg;
uct_pending_req_t *req;
ucs_status_t status;
uint64_t fifo_head;

/* update the local tail with its actual value from the remote peer
* making sure that the pending sends would use the real tail value */
uct_mm_ep_update_cached_tail(ep);

if (!uct_mm_ep_has_tx_resources(ep)) {
if (!ep->reserved_head) {
fifo_head = ep->fifo_ctl->head;
} else {
fifo_head = ep->reserved_head;
}

if (!UCT_MM_EP_IS_ABLE_TO_SEND(fifo_head, ep->cached_tail,
iface->config.fifo_size)) {
return UCS_ARBITER_CB_RESULT_RESCHED_GROUP;
}

Expand Down
3 changes: 3 additions & 0 deletions src/uct/sm/mm/base/mm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ typedef struct uct_mm_ep {
it is not always updated with the actual remote tail value */
uint64_t cached_tail;

/* the sender's reserved slot in the receiver's FIFO */
uint64_t reserved_head;

/* mapped remote memory chunks to which remote descriptors belong to.
* (after attaching to them) */
khash_t(uct_mm_remote_seg) remote_segs;
Expand Down
13 changes: 4 additions & 9 deletions src/uct/sm/mm/base/mm_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ uct_mm_iface_event_fd_arm(uct_iface_h tl_iface, unsigned events)
{
uct_mm_iface_t *iface = ucs_derived_of(tl_iface, uct_mm_iface_t);
char dummy[UCT_MM_IFACE_MAX_SIG_EVENTS]; /* pop multiple signals at once */
uint64_t head, prev_head;
uint64_t head;
int ret;

if ((events & UCT_EVENT_SEND_COMP) &&
Expand All @@ -461,15 +461,10 @@ uct_mm_iface_event_fd_arm(uct_iface_h tl_iface, unsigned events)
if (!(head & UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED)) {
/* Try to mark the head index as armed in an atomic way; fail if any
sender managed to update the head at the same time */
prev_head = ucs_atomic_cswap64(
ucs_unaligned_ptr(&iface->recv_fifo_ctl->head), head,
head | UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED);
if (prev_head != head) {
ucs_atomic_or64(&iface->recv_fifo_ctl->head, UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED);
if ((head ^ iface->recv_fifo_ctl->head) & ~UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED) {
/* race with sender; need to retry */
ucs_assert(!(prev_head & UCT_MM_IFACE_FIFO_HEAD_EVENT_ARMED));
ucs_trace("iface %p: cannot arm, head %" PRIu64
" prev_head %" PRIu64,
iface, head, prev_head);
ucs_trace("iface %p: cannot arm, head %" PRIu64, iface, head);
return UCS_ERR_BUSY;
}
}
Expand Down
Loading