From fcb71e394ff5bd0f8977399d4e408ebca4ec7f02 Mon Sep 17 00:00:00 2001 From: Philip Marshall Date: Wed, 8 May 2024 15:17:44 -0700 Subject: [PATCH] src: Pass NIC index to transport layer functions --- src/atomic_c.c4 | 25 ++++-- src/data_c.c4 | 99 +++++++++++++++++------- src/shmem_comm.h | 61 +++++++-------- src/shmem_internal.h | 3 +- src/shr_transport.h4 | 11 +-- src/transport_none.h | 27 +++---- src/transport_ofi.c | 73 ++++++++++++------ src/transport_ofi.h | 161 ++++++++++++++++++++------------------- src/transport_portals4.h | 41 +++++----- src/transport_ucx.h | 33 ++++---- 10 files changed, 315 insertions(+), 219 deletions(-) diff --git a/src/atomic_c.c4 b/src/atomic_c.c4 index f8b20dd6c..7ae7ac7c4 100644 --- a/src/atomic_c.c4 +++ b/src/atomic_c.c4 @@ -310,8 +310,11 @@ shmem_swap(long *target, long value, int pe) SHMEM_ERR_CHECK_PE(pe); \ SHMEM_ERR_CHECK_CTX(ctx); \ SHMEM_ERR_CHECK_SYMMETRIC(target, sizeof(TYPE)); \ + \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_atomic(ctx, target, &tmp, sizeof(TYPE), \ - pe, SHM_INTERNAL_SUM, ITYPE); \ + pe, SHM_INTERNAL_SUM, ITYPE, nic_idx); \ } @@ -361,8 +364,10 @@ shmem_swap(long *target, long value, int pe) SHMEM_ERR_CHECK_CTX(ctx); \ SHMEM_ERR_CHECK_SYMMETRIC(target, sizeof(TYPE)); \ \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_atomic(ctx, target, &value, sizeof(TYPE), \ - pe, SHM_INTERNAL_SUM, ITYPE); \ + pe, SHM_INTERNAL_SUM, ITYPE, nic_idx); \ } @@ -432,8 +437,10 @@ shmem_swap(long *target, long value, int pe) SHMEM_ERR_CHECK_CTX(ctx); \ SHMEM_ERR_CHECK_SYMMETRIC(dest, sizeof(TYPE)); \ \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_atomic_set(ctx, (void *) dest, &value, \ - sizeof(TYPE), pe, ITYPE); \ + sizeof(TYPE), pe, ITYPE, nic_idx); \ } @@ -445,8 +452,10 @@ shmem_swap(long *target, long value, int pe) SHMEM_ERR_CHECK_CTX(ctx); \ SHMEM_ERR_CHECK_SYMMETRIC(target, sizeof(TYPE)); \ \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_atomic(ctx, target, &value, sizeof(TYPE), \ - pe, SHM_INTERNAL_BXOR, ITYPE); \ + pe, SHM_INTERNAL_BXOR, ITYPE, nic_idx); \ } @@ -458,8 +467,10 @@ shmem_swap(long *target, long value, int pe) SHMEM_ERR_CHECK_CTX(ctx); \ SHMEM_ERR_CHECK_SYMMETRIC(target, sizeof(TYPE)); \ \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_atomic(ctx, target, &value, sizeof(TYPE), \ - pe, SHM_INTERNAL_BAND, ITYPE); \ + pe, SHM_INTERNAL_BAND, ITYPE, nic_idx); \ } @@ -471,8 +482,10 @@ shmem_swap(long *target, long value, int pe) SHMEM_ERR_CHECK_CTX(ctx); \ SHMEM_ERR_CHECK_SYMMETRIC(target, sizeof(TYPE)); \ \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_atomic(ctx, target, &value, sizeof(TYPE), \ - pe, SHM_INTERNAL_BOR, ITYPE); \ + pe, SHM_INTERNAL_BOR, ITYPE, nic_idx); \ } diff --git a/src/data_c.c4 b/src/data_c.c4 index 0973192cf..f514d7331 100644 --- a/src/data_c.c4 +++ b/src/data_c.c4 @@ -305,8 +305,11 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') SHMEM_ERR_CHECK_PE(pe); \ SHMEM_ERR_CHECK_CTX(ctx); \ SHMEM_ERR_CHECK_SYMMETRIC(addr, sizeof(TYPE)); \ + \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_put_scalar(ctx, addr, &value, sizeof(TYPE), \ - pe); \ + pe, nic_idx); \ } #define SHMEM_DEF_G(STYPE,TYPE) \ @@ -340,10 +343,13 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') SHMEM_ERR_CHECK_OVERLAP(target, source, sizeof(TYPE) * \ nelems, sizeof(TYPE) * nelems, 0, \ (shmem_internal_my_pe == pe)); \ + \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_put_nb(ctx, target, source, \ sizeof(TYPE) * nelems, pe, \ - &completion); \ - shmem_internal_put_wait(ctx, &completion); \ + &completion, nic_idx); \ + shmem_internal_put_wait(ctx, &completion, nic_idx); \ } @@ -361,9 +367,12 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') SHMEM_ERR_CHECK_OVERLAP(target, source, (SIZE) * nelems, \ (SIZE) * nelems, 0, \ (shmem_internal_my_pe == pe)); \ + \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_put_nb(ctx, target, source, (SIZE) * nelems,\ - pe, &completion); \ - shmem_internal_put_wait(ctx, &completion); \ + pe, &completion, nic_idx); \ + shmem_internal_put_wait(ctx, &completion, nic_idx); \ } @@ -379,9 +388,12 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') SHMEM_ERR_CHECK_OVERLAP(target, source, sizeof(TYPE) * \ nelems, sizeof(TYPE) * nelems, 0, \ (shmem_internal_my_pe == pe)); \ + \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_put_nbi(ctx, target, source, \ sizeof(TYPE)*nelems, \ - pe); \ + pe, nic_idx); \ } @@ -398,8 +410,11 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') SHMEM_ERR_CHECK_OVERLAP(target, source, (SIZE) * nelems, \ (SIZE) * nelems, 0, \ (shmem_internal_my_pe == pe)); \ + \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_put_nbi(ctx, target, source, (SIZE)*nelems, \ - pe); \ + pe, nic_idx); \ } @@ -502,9 +517,12 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') sizeof(TYPE) * ((nelems-1) * tst + 1), \ sizeof(TYPE) * ((nelems-1) * sst + 1), 0, \ (shmem_internal_my_pe == pe)); \ + \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ for ( ; nelems > 0 ; --nelems) { \ shmem_internal_put_scalar(ctx, target, source, \ - sizeof(TYPE), pe); \ + sizeof(TYPE), pe, nic_idx); \ target += tst; \ source += sst; \ } \ @@ -527,14 +545,17 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') sizeof(TYPE) * ((nblocks-1) * tst + bsize), \ sizeof(TYPE) * ((nblocks-1) * sst + bsize), \ 0, (shmem_internal_my_pe == pe)); \ + \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ for ( ; nblocks > 0 ; --nblocks) { \ shmem_internal_put_nb(ctx, target, source, \ bsize * sizeof(TYPE), pe, \ - &completion); \ + &completion, nic_idx); \ target += tst; \ source += sst; \ } \ - shmem_internal_put_wait(ctx, &completion); \ + shmem_internal_put_wait(ctx, &completion, nic_idx); \ } #define SHMEM_DEF_IPUT_N(NAME,SIZE) \ @@ -554,9 +575,12 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') (SIZE) * ((nelems-1) * tst + 1), \ (SIZE) * ((nelems-1) * sst + 1), 0, \ (shmem_internal_my_pe == pe)); \ + \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ for ( ; nelems > 0 ; --nelems) { \ shmem_internal_put_scalar(ctx, target, source, (SIZE), \ - pe); \ + pe, nic_idx); \ target = (uint8_t *) target + tst * (SIZE); \ source = (uint8_t *) source + sst * (SIZE); \ } \ @@ -580,14 +604,17 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') (SIZE) * ((nblocks-1) * tst + bsize), \ (SIZE) * ((nblocks-1) * sst + bsize), \ 0, (shmem_internal_my_pe == pe)); \ + \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ for ( ; nblocks > 0 ; --nblocks) { \ shmem_internal_put_nb(ctx, target, source, \ bsize * (SIZE), pe, \ - &completion); \ + &completion, nic_idx); \ target = (uint8_t *) target + tst * (SIZE); \ source = (uint8_t *) source + sst * (SIZE); \ } \ - shmem_internal_put_wait(ctx, &completion); \ + shmem_internal_put_wait(ctx, &completion, nic_idx); \ } #define SHMEM_DEF_IGET(STYPE,TYPE) \ @@ -698,7 +725,7 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') 0, (shmem_internal_my_pe == pe)); \ \ size_t nic_idx = 0; \ - SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ for ( ; nblocks > 0 ; --nblocks) { \ shmem_internal_get(ctx, target, source, \ bsize * (SIZE), pe, nic_idx); \ @@ -724,19 +751,22 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') sizeof(uint64_t), 0, \ (shmem_internal_my_pe == pe)); \ SHMEM_ERR_CHECK_SIG_OP(sig_op); \ + \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_put_nb(ctx, target, source, \ sizeof(TYPE) * nelems, pe, \ - &completion); \ - shmem_internal_put_wait(ctx, &completion); \ + &completion, nic_idx); \ + shmem_internal_put_wait(ctx, &completion, nic_idx); \ shmem_internal_fence(ctx); \ if (sig_op == SHMEM_SIGNAL_ADD) \ shmem_internal_atomic(ctx, sig_addr, &signal, sizeof(uint64_t), \ pe, SHM_INTERNAL_SUM, \ - SHM_INTERNAL_UINT64); \ + SHM_INTERNAL_UINT64, nic_idx); \ else \ shmem_internal_atomic_set(ctx, sig_addr, &signal, \ sizeof(uint64_t), pe, \ - SHM_INTERNAL_UINT64); \ + SHM_INTERNAL_UINT64, nic_idx); \ } @@ -756,18 +786,21 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') sizeof(uint64_t), 0, \ (shmem_internal_my_pe == pe)); \ SHMEM_ERR_CHECK_SIG_OP(sig_op); \ + \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_put_nb(ctx, target, source, (SIZE) * nelems, \ - pe, &completion); \ - shmem_internal_put_wait(ctx, &completion); \ + pe, &completion, nic_idx); \ + shmem_internal_put_wait(ctx, &completion, nic_idx); \ shmem_internal_fence(ctx); \ if (sig_op == SHMEM_SIGNAL_ADD) \ shmem_internal_atomic(ctx, sig_addr, &signal, sizeof(uint64_t), \ pe, SHM_INTERNAL_SUM, \ - SHM_INTERNAL_UINT64); \ + SHM_INTERNAL_UINT64, nic_idx); \ else \ shmem_internal_atomic_set(ctx, sig_addr, &signal, \ sizeof(uint64_t), pe, \ - SHM_INTERNAL_UINT64); \ + SHM_INTERNAL_UINT64, nic_idx); \ } #define SHMEM_DEF_PUT_SIGNAL_NBI(STYPE,TYPE) \ @@ -806,8 +839,12 @@ SHMEM_PROF_DEF_CTX_PUT_N_SIGNAL_NBI(`mem') sizeof(uint64_t), 0, \ (shmem_internal_my_pe == pe)); \ SHMEM_ERR_CHECK_SIG_OP(sig_op); \ + \ + size_t nic_idx = 0; \ + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); \ shmem_internal_put_signal_nbi(ctx, target, source, (SIZE) * nelems, \ - sig_addr, signal, sig_op, pe); \ + sig_addr, signal, sig_op, \ + pe, nic_idx); \ } @@ -914,8 +951,10 @@ shmemx_signal_add(uint64_t *sig_addr, uint64_t signal, int pe) SHMEM_ERR_CHECK_PE(pe); SHMEM_ERR_CHECK_SYMMETRIC(sig_addr, sizeof(uint64_t)); + size_t nic_idx = 0; + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); shmem_internal_atomic(SHMEM_CTX_DEFAULT, sig_addr, &signal, sizeof(uint64_t), - pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64); + pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64, nic_idx); } void SHMEM_FUNCTION_ATTRIBUTES @@ -926,8 +965,10 @@ shmemx_ctx_signal_add(shmem_ctx_t ctx, uint64_t *sig_addr, uint64_t signal, int SHMEM_ERR_CHECK_CTX(ctx); SHMEM_ERR_CHECK_SYMMETRIC(sig_addr, sizeof(uint64_t)); + size_t nic_idx = 0; + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); shmem_internal_atomic(ctx, sig_addr, &signal, sizeof(uint64_t), - pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64); + pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64, nic_idx); } void SHMEM_FUNCTION_ATTRIBUTES @@ -937,8 +978,10 @@ shmemx_signal_set(uint64_t *sig_addr, uint64_t signal, int pe) SHMEM_ERR_CHECK_PE(pe); SHMEM_ERR_CHECK_SYMMETRIC(sig_addr, sizeof(uint64_t)); + size_t nic_idx = 0; + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); shmem_internal_atomic_set(SHMEM_CTX_DEFAULT, (void *) sig_addr, &signal, - sizeof(uint64_t), pe, SHM_INTERNAL_UINT64); + sizeof(uint64_t), pe, SHM_INTERNAL_UINT64, nic_idx); } void SHMEM_FUNCTION_ATTRIBUTES @@ -949,8 +992,10 @@ shmemx_ctx_signal_set(shmem_ctx_t ctx, uint64_t *sig_addr, uint64_t signal, int SHMEM_ERR_CHECK_CTX(ctx); SHMEM_ERR_CHECK_SYMMETRIC(sig_addr, sizeof(uint64_t)); + size_t nic_idx = 0; + SHMEM_GET_TRANSMIT_NIC_IDX(nic_idx); shmem_internal_atomic_set(ctx, (void *) sig_addr, &signal, - sizeof(uint64_t), pe, SHM_INTERNAL_UINT64); + sizeof(uint64_t), pe, SHM_INTERNAL_UINT64, nic_idx); } void SHMEM_FUNCTION_ATTRIBUTES diff --git a/src/shmem_comm.h b/src/shmem_comm.h index 7e06ac5e5..b76a140a8 100644 --- a/src/shmem_comm.h +++ b/src/shmem_comm.h @@ -33,7 +33,7 @@ static inline void shmem_internal_put_nb(shmem_ctx_t ctx, void *target, const void *source, size_t len, int pe, - long *completion) + long *completion, size_t nic_idx) { if (len == 0) return; @@ -41,23 +41,23 @@ shmem_internal_put_nb(shmem_ctx_t ctx, void *target, const void *source, size_t if (shmem_shr_transport_use_write(ctx, target, source, len, pe)) { shmem_shr_transport_put(ctx, target, source, len, pe); } else { - shmem_transport_put_nb((shmem_transport_ctx_t *)ctx, target, source, len, pe, completion); + shmem_transport_put_nb((shmem_transport_ctx_t *)ctx, target, source, len, pe, completion, nic_idx); } } static inline void -shmem_internal_put_wait(shmem_ctx_t ctx, long *completion) +shmem_internal_put_wait(shmem_ctx_t ctx, long *completion, size_t nic_idx) { - shmem_transport_put_wait((shmem_transport_ctx_t *)ctx, completion); + shmem_transport_put_wait((shmem_transport_ctx_t *)ctx, completion, nic_idx); /* on-node is always blocking, so this is a no-op for them */ } static inline void -shmem_internal_put_scalar(shmem_ctx_t ctx, void *target, const void *source, size_t len, int pe) +shmem_internal_put_scalar(shmem_ctx_t ctx, void *target, const void *source, size_t len, int pe, size_t nic_idx) { shmem_internal_assert(len > 0); @@ -65,11 +65,11 @@ shmem_internal_put_scalar(shmem_ctx_t ctx, void *target, const void *source, siz shmem_shr_transport_put_scalar(ctx, target, source, len, pe); } else { #ifndef DISABLE_OFI_INJECT - shmem_transport_put_scalar((shmem_transport_ctx_t *)ctx, target, source, len, pe); + shmem_transport_put_scalar((shmem_transport_ctx_t *)ctx, target, source, len, pe, nic_idx); #else long completion = 0; - shmem_transport_put_nb((shmem_transport_ctx_t *)ctx, target, source, len, pe, &completion); - shmem_internal_put_wait(ctx, &completion); + shmem_transport_put_nb((shmem_transport_ctx_t *)ctx, target, source, len, pe, &completion, nic_idx); + shmem_internal_put_wait(ctx, &completion, nic_idx); #endif } } @@ -77,35 +77,35 @@ shmem_internal_put_scalar(shmem_ctx_t ctx, void *target, const void *source, siz static inline void shmem_internal_put_signal_nbi(shmem_ctx_t ctx, void *target, const void *source, size_t len, - uint64_t *sig_addr, uint64_t signal, int sig_op, int pe) + uint64_t *sig_addr, uint64_t signal, int sig_op, int pe, size_t nic_idx) { if (len == 0) { if (sig_op == SHMEM_SIGNAL_ADD) shmem_transport_atomic((shmem_transport_ctx_t *) ctx, sig_addr, &signal, sizeof(uint64_t), - pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64); + pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64, nic_idx); else shmem_transport_atomic_set((shmem_transport_ctx_t *) ctx, sig_addr, &signal, - sizeof(uint64_t), pe, SHM_INTERNAL_UINT64); + sizeof(uint64_t), pe, SHM_INTERNAL_UINT64, nic_idx); return; } if (shmem_shr_transport_use_write(ctx, target, source, len, pe)) { shmem_shr_transport_put_signal(ctx, target, source, len, sig_addr, signal, sig_op, pe); } else { - shmem_transport_put_signal_nbi((shmem_transport_ctx_t *) ctx, target, source, len, sig_addr, signal, sig_op, pe); + shmem_transport_put_signal_nbi((shmem_transport_ctx_t *) ctx, target, source, len, sig_addr, signal, sig_op, pe, nic_idx); } } static inline void -shmem_internal_put_nbi(shmem_ctx_t ctx, void *target, const void *source, size_t len, int pe) +shmem_internal_put_nbi(shmem_ctx_t ctx, void *target, const void *source, size_t len, int pe, size_t nic_idx) { if (len == 0) return; if (shmem_shr_transport_use_write(ctx, target, source, len, pe)) { shmem_shr_transport_put(ctx, target, source, len, pe); } else { - shmem_transport_put_nbi((shmem_transport_ctx_t *)ctx, target, source, len, pe); + shmem_transport_put_nbi((shmem_transport_ctx_t *)ctx, target, source, len, pe, nic_idx); } } @@ -130,7 +130,7 @@ shmem_internal_get(shmem_ctx_t ctx, void *target, const void *source, size_t len if (shmem_shr_transport_use_read(ctx, target, source, len, pe)) { shmem_shr_transport_get(ctx, target, source, len, pe); } else { - shmem_transport_get((shmem_transport_ctx_t *)ctx, target, source, len, pe); + shmem_transport_get((shmem_transport_ctx_t *)ctx, target, source, len, pe, nic_idx); } } @@ -142,7 +142,7 @@ shmem_internal_get_ct(shmemx_ct_t ct, void *target, const void *source, size_t l { /* TODO: add shortcut for on-node-comms */ shmem_transport_get_ct((shmem_transport_ct_t *) ct, - target, source, len, pe); + target, source, len, pe, nic_idx); } @@ -164,7 +164,7 @@ shmem_internal_swap(shmem_ctx_t ctx, void *target, void *source, void *dest, siz if (shmem_shr_transport_use_atomic(ctx, target, len, pe, datatype)) { shmem_shr_transport_swap(ctx, target, source, dest, len, pe, datatype); } else { - shmem_transport_swap((shmem_transport_ctx_t *)ctx, target, source, dest, len, pe, datatype); + shmem_transport_swap((shmem_transport_ctx_t *)ctx, target, source, dest, len, pe, datatype, nic_idx); } } @@ -197,7 +197,7 @@ shmem_internal_cswap(shmem_ctx_t ctx, void *target, void *source, void *dest, vo shmem_shr_transport_cswap(ctx, target, source, dest, operand, len, pe, datatype); } else { shmem_transport_cswap((shmem_transport_ctx_t *)ctx, target, source, - dest, operand, len, pe, datatype); + dest, operand, len, pe, datatype, nic_idx); } } @@ -230,7 +230,7 @@ shmem_internal_mswap(shmem_ctx_t ctx, void *target, void *source, void *dest, vo shmem_shr_transport_mswap(ctx, target, source, dest, mask, len, pe, datatype); } else { shmem_transport_mswap((shmem_transport_ctx_t *)ctx, target, source, - dest, mask, len, pe, datatype); + dest, mask, len, pe, datatype, nic_idx); } } @@ -238,7 +238,8 @@ shmem_internal_mswap(shmem_ctx_t ctx, void *target, void *source, void *dest, vo static inline void shmem_internal_atomic(shmem_ctx_t ctx, void *target, const void *source, size_t len, - int pe, shm_internal_op_t op, shm_internal_datatype_t datatype) + int pe, shm_internal_op_t op, shm_internal_datatype_t datatype, + size_t nic_idx) { shmem_internal_assert(len > 0); @@ -246,7 +247,7 @@ shmem_internal_atomic(shmem_ctx_t ctx, void *target, const void *source, size_t shmem_shr_transport_atomic(ctx, target, source, len, pe, op, datatype); } else { shmem_transport_atomic((shmem_transport_ctx_t *)ctx, target, source, - len, pe, op, datatype); + len, pe, op, datatype, nic_idx); } } @@ -262,7 +263,7 @@ shmem_internal_atomic_fetch(shmem_ctx_t ctx, void *target, const void *source, s shmem_shr_transport_atomic_fetch(ctx, target, source, len, pe, datatype); } else { shmem_transport_atomic_fetch((shmem_transport_ctx_t *)ctx, target, - source, len, pe, datatype); + source, len, pe, datatype, nic_idx); } } @@ -270,7 +271,7 @@ shmem_internal_atomic_fetch(shmem_ctx_t ctx, void *target, const void *source, s static inline void shmem_internal_atomic_set(shmem_ctx_t ctx, void *target, const void *source, size_t len, - int pe, shm_internal_datatype_t datatype) + int pe, shm_internal_datatype_t datatype, size_t nic_idx) { shmem_internal_assert(len > 0); @@ -278,7 +279,7 @@ shmem_internal_atomic_set(shmem_ctx_t ctx, void *target, const void *source, siz shmem_shr_transport_atomic_set(ctx, target, source, len, pe, datatype); } else { shmem_transport_atomic_set((shmem_transport_ctx_t *)ctx, target, - source, len, pe, datatype); + source, len, pe, datatype, nic_idx); } } @@ -287,7 +288,7 @@ static inline void shmem_internal_atomicv(shmem_ctx_t ctx, void *target, const void *source, size_t len, int pe, shm_internal_op_t op, - shm_internal_datatype_t datatype, long *completion) + shm_internal_datatype_t datatype, long *completion, size_t nic_idx) { shmem_internal_assert(len > 0); @@ -295,7 +296,7 @@ shmem_internal_atomicv(shmem_ctx_t ctx, void *target, const void *source, shmem_shr_transport_atomicv(ctx, target, source, len, pe, op, datatype); } else { shmem_transport_atomicv((shmem_transport_ctx_t *)ctx, target, source, len, - pe, op, datatype, completion); + pe, op, datatype, completion, nic_idx); } } @@ -314,7 +315,7 @@ shmem_internal_fetch_atomic(shmem_ctx_t ctx, void *target, void *source, void *d op, datatype); } else { shmem_transport_fetch_atomic((shmem_transport_ctx_t *)ctx, target, - source, dest, len, pe, op, datatype); + source, dest, len, pe, op, datatype, nic_idx); } } @@ -374,13 +375,13 @@ void shmem_internal_ct_wait(shmemx_ct_t ct, long wait_for) /* Uses internal put for external heap config; otherwise memcpy */ static inline -void shmem_internal_copy_self(void *dest, const void *source, size_t nelems) +void shmem_internal_copy_self(void *dest, const void *source, size_t nelems, size_t nic_idx) { #ifdef USE_FI_HMEM long completion = 0; shmem_internal_put_nb(SHMEM_CTX_DEFAULT, dest, source, nelems, - shmem_internal_my_pe, &completion); - shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion); + shmem_internal_my_pe, &completion, nic_idx); + shmem_internal_put_wait(SHMEM_CTX_DEFAULT, &completion, nic_idx); #else memcpy(dest, source, nelems); #endif diff --git a/src/shmem_internal.h b/src/shmem_internal.h index 59bc7549c..377a6814e 100644 --- a/src/shmem_internal.h +++ b/src/shmem_internal.h @@ -195,8 +195,7 @@ extern hwloc_topology_t shmem_internal_topology; do { \ int rand_int = rand_r(&shmem_internal_rand_seed); \ double normalized = (double)rand_int / (double)RAND_MAX; \ - int range = shmem_transport_ofi_num_nics - 1; \ - idx = (int)(normalized * range); \ + idx = (int)(normalized * shmem_transport_ofi_num_nics); \ } while (0) #else #define SHMEM_GET_TRANSMIT_NIC_IDX(idx) diff --git a/src/shr_transport.h4 b/src/shr_transport.h4 index 9379ef2e5..fd7db7633 100644 --- a/src/shr_transport.h4 +++ b/src/shr_transport.h4 @@ -566,7 +566,8 @@ SHMEM_DEFINE_FOR_AMO(SHMEM_DEF_SUM_OP) static inline void shmem_shr_transport_put_signal(shmem_ctx_t ctx, void *target, const void *source, size_t len, - uint64_t *sig_addr, uint64_t signal, int sig_op, int pe) + uint64_t *sig_addr, uint64_t signal, int sig_op, int pe, + size_t nic_idx) { #if USE_MEMCPY memcpy(target, source, len); @@ -587,10 +588,10 @@ shmem_shr_transport_put_signal(shmem_ctx_t ctx, void *target, #else if (sig_op == SHMEM_SIGNAL_ADD) shmem_transport_atomic((shmem_transport_ctx_t *) ctx, sig_addr, &signal, sizeof(uint64_t), - pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64); + pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64, nic_idx); else shmem_transport_atomic_set((shmem_transport_ctx_t *) ctx, sig_addr, &signal, - sizeof(uint64_t), pe, SHM_INTERNAL_UINT64); + sizeof(uint64_t), pe, SHM_INTERNAL_UINT64, nic_idx); #endif #elif USE_CMA shmem_transport_cma_put(target, source, len, pe, @@ -600,10 +601,10 @@ shmem_shr_transport_put_signal(shmem_ctx_t ctx, void *target, /* Using network atomics as CMA does not support atomic operations */ if (sig_op == SHMEM_SIGNAL_ADD) shmem_transport_atomic((shmem_transport_ctx_t *) ctx, sig_addr, &signal, sizeof(uint64_t), - pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64); + pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64, nic_idx); else shmem_transport_atomic_set((shmem_transport_ctx_t *) ctx, sig_addr, &signal, - sizeof(uint64_t), pe, SHM_INTERNAL_UINT64); + sizeof(uint64_t), pe, SHM_INTERNAL_UINT64, nic_idx); #else RAISE_ERROR_STR("No path to peer"); #endif diff --git a/src/transport_none.h b/src/transport_none.h index f0d517d07..e336a20e9 100644 --- a/src/transport_none.h +++ b/src/transport_none.h @@ -112,7 +112,7 @@ shmem_transport_fence(shmem_transport_ctx_t* ctx) static inline void -shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe) +shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe, size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } @@ -128,14 +128,14 @@ shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void *sou static inline void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - uint64_t *sig_addr, uint64_t signal, int sig_op, int pe) + uint64_t *sig_addr, uint64_t signal, int sig_op, int pe, size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } static inline void -shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) +shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion, size_t nic_idx) { /* No op */ } @@ -143,14 +143,14 @@ shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) static inline void shmem_transport_put_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe) + int pe, size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } static inline void -shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe) +shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe, size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } @@ -166,7 +166,7 @@ shmem_transport_get_wait(shmem_transport_ctx_t* ctx, size_t idx) static inline void shmem_transport_swap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, - size_t len, int pe, shm_internal_datatype_t datatype) + size_t len, int pe, shm_internal_datatype_t datatype, size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } @@ -183,7 +183,7 @@ static inline void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, const void *operand, size_t len, int pe, - shm_internal_datatype_t datatype) + shm_internal_datatype_t datatype, size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } @@ -201,7 +201,7 @@ static inline void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, const void *mask, size_t len, int pe, - shm_internal_datatype_t datatype) + shm_internal_datatype_t datatype, size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } @@ -209,7 +209,7 @@ shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void *sour static inline void shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, shm_internal_op_t op, shm_internal_datatype_t datatype) + int pe, shm_internal_op_t op, shm_internal_datatype_t datatype, size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } @@ -225,7 +225,7 @@ shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const void *so static inline void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, size_t len, - int pe, shm_internal_op_t op, shm_internal_datatype_t datatype) + int pe, shm_internal_op_t op, shm_internal_datatype_t datatype, size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } @@ -241,7 +241,7 @@ shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target, const static inline void shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, shm_internal_datatype_t datatype) + int pe, shm_internal_datatype_t datatype, size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } @@ -249,7 +249,7 @@ shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target, const voi static inline void shmem_transport_atomic_set(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, shm_internal_datatype_t datatype) + int pe, shm_internal_datatype_t datatype, size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } @@ -301,7 +301,8 @@ shmem_transport_put_ct_nb(shmem_transport_ct_t *ct, void *target, const void static inline void shmem_transport_get_ct(shmem_transport_ct_t *ct, void - *target, const void *source, size_t len, int pe) + *target, const void *source, size_t len, int pe, + size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); } diff --git a/src/transport_ofi.c b/src/transport_ofi.c index 26d871ef8..274b29bd2 100644 --- a/src/transport_ofi.c +++ b/src/transport_ofi.c @@ -629,8 +629,9 @@ int bind_enable_ep_resources(shmem_transport_ctx_t *ctx, size_t idx) FI_SELECTIVE_COMPLETION | FI_TRANSMIT | FI_RECV); OFI_CHECK_RETURN_STR(ret, "fi_ep_bind CQ to endpoint failed"); - ret = fi_ep_bind(ctx->ep[idx], &shmem_transport_ofi_avfd->fid, 0); - OFI_CHECK_RETURN_STR(ret, "fi_ep_bind AV to endpoint failed"); + ret = fi_ep_bind(ctx->ep[idx], /*&shmem_transport_ofi_avfd->fid*/ &ctx->av[idx]->fid, 0); /* Currently failing */ + //OFI_CHECK_RETURN_STR(ret, "fi_ep_bind AV to endpoint failed"); + OFI_CHECK_RETURN_MSG(ret, "fi_ep_bind AV to endpoint failed(%s)\n", fi_strerror(errno)); ret = fi_enable(ctx->ep[idx]); OFI_CHECK_RETURN_STR(ret, "fi_enable on endpoint failed"); @@ -1251,11 +1252,12 @@ int publish_av_info(struct fabric_info *info) return ret; } +char * alladdrs = NULL; static inline int populate_av(void) { int i, ret, err = 0; - char *alladdrs = NULL; + //char *alladdrs = NULL; alladdrs = malloc(shmem_internal_num_pes * shmem_transport_ofi_addrlen); if (alladdrs == NULL) { @@ -1282,7 +1284,7 @@ int populate_av(void) return ret; } - free(alladdrs); + //free(alladdrs); return 0; } @@ -1765,6 +1767,9 @@ static int shmem_transport_ofi_ctx_init(shmem_transport_ctx_t *ctx, int id) //info->p_info->rx_attr->caps = FI_RECV; /* to drive progress on the CQ */; ctx->id = id; + ctx->fabric = (struct fid_fabric **) malloc(shmem_transport_ofi_num_nics * sizeof(struct fid_fabric *)); + ctx->domain = (struct fid_domain **) malloc(shmem_transport_ofi_num_nics * sizeof(struct fid_domain *)); + ctx->av = (struct fid_av **) malloc(shmem_transport_ofi_num_nics * sizeof(struct fid_av *)); ctx->ep = (struct fid_ep **) malloc(shmem_transport_ofi_num_nics * sizeof(struct fid_ep *)); ctx->put_cntr = (struct fid_cntr **) malloc(shmem_transport_ofi_num_nics * sizeof(struct fid_cntr *)); ctx->get_cntr = (struct fid_cntr **) malloc(shmem_transport_ofi_num_nics * sizeof(struct fid_cntr *)); @@ -1778,41 +1783,65 @@ static int shmem_transport_ofi_ctx_init(shmem_transport_ctx_t *ctx, int id) ctx->cq = (struct fid_cq **) malloc(shmem_transport_ofi_num_nics * sizeof(struct fid_cq *)); for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) { #ifdef USE_CTX_LOCK - ctx->pending_put_cntr[idx] = 0; - ctx->pending_get_cntr[idx] = 0; + ctx->pending_put_cntr[idx] = 0; + ctx->pending_get_cntr[idx] = 0; #else - shmem_internal_cntr_write(&ctx->pending_put_cntr[idx], 0); - shmem_internal_cntr_write(&ctx->pending_get_cntr[idx], 0); + shmem_internal_cntr_write(&ctx->pending_put_cntr[idx], 0); + shmem_internal_cntr_write(&ctx->pending_get_cntr[idx], 0); #endif - /* FIX */ - //shmem_transport_ofi_eps[idx]->info->ep_attr->tx_ctx_cnt = shmem_transport_ofi_stx_max > 0 ? FI_SHARED_CONTEXT : 0; - //shmem_transport_ofi_eps[idx]->info->caps = FI_RMA | FI_WRITE | FI_READ | FI_ATOMIC | FI_RECV; - //shmem_transport_ofi_eps[idx]->info->tx_attr->op_flags = FI_DELIVERY_COMPLETE; - //shmem_transport_ofi_eps[idx]->info->mode = 0; - //shmem_transport_ofi_eps[idx]->info->tx_attr->mode = 0; - //shmem_transport_ofi_eps[idx]->info->rx_attr->mode = 0; - //shmem_transport_ofi_eps[idx]->info->tx_attr->caps = info->p_info->caps; - //shmem_transport_ofi_eps[idx]->info->rx_attr->caps = FI_RECV; /* to drive progress on the CQ */; + provider_list[idx]->ep_attr->tx_ctx_cnt = shmem_transport_ofi_stx_max > 0 ? FI_SHARED_CONTEXT : 0; + provider_list[idx]->caps = FI_RMA | FI_WRITE | FI_READ | FI_ATOMIC | FI_RECV; + provider_list[idx]->tx_attr->op_flags = FI_DELIVERY_COMPLETE; + provider_list[idx]->mode = 0; + provider_list[idx]->tx_attr->mode = 0; + provider_list[idx]->rx_attr->mode = 0; + provider_list[idx]->tx_attr->caps = provider_list[idx]->caps; + provider_list[idx]->rx_attr->caps = FI_RECV; /* to drive progress on the CQ */; #ifdef USE_CTX_LOCK SHMEM_MUTEX_INIT(ctx->lock); #endif + ret = fi_fabric(provider_list[idx]->fabric_attr, &ctx->fabric[idx], NULL); + OFI_CHECK_RETURN_STR(ret, "fabric initialization failed"); - ret = fi_cntr_open(shmem_transport_ofi_domainfd, &cntr_put_attr, + ret = fi_domain(/*shmem_transport_ofi_fabfd*/ ctx->fabric[idx], provider_list[idx], + &ctx->domain[idx], NULL); + OFI_CHECK_RETURN_STR(ret, "domain initialization failed"); + + struct fi_av_attr av_attr = {0}; +#ifdef USE_AV_MAP + av_attr.type = FI_AV_MAP; +#else + av_attr.type = FI_AV_TABLE; +#endif + ret = fi_av_open(/*shmem_transport_ofi_domainfd*/ ctx->domain[idx], + &av_attr, + /*&shmem_transport_ofi_avfd*/ &ctx->av[idx], + NULL); + OFI_CHECK_RETURN_STR(ret, "AV creation failed"); + + ret = fi_av_insert(/*shmem_transport_ofi_avfd*/ ctx->av[idx], + alladdrs, + shmem_internal_num_pes, + addr_table, + 0, + NULL); + + ret = fi_cntr_open(/*shmem_transport_ofi_domainfd*/ ctx->domain[idx], &cntr_put_attr, &ctx->put_cntr[idx], NULL); OFI_CHECK_RETURN_MSG(ret, "put_cntr creation failed (%s)\n", fi_strerror(errno)); - ret = fi_cntr_open(shmem_transport_ofi_domainfd, &cntr_get_attr, + ret = fi_cntr_open(/*shmem_transport_ofi_domainfd*/ ctx->domain[idx], &cntr_get_attr, &ctx->get_cntr[idx], NULL); OFI_CHECK_RETURN_MSG(ret, "get_cntr creation failed (%s)\n", fi_strerror(errno)); - ret = fi_cq_open(shmem_transport_ofi_domainfd, &cq_attr, &ctx->cq[idx], NULL); + ret = fi_cq_open(/*shmem_transport_ofi_domainfd*/ ctx->domain[idx], &cq_attr, &ctx->cq[idx], NULL); if (ret && errno == FI_EMFILE) { DEBUG_STR("Context creation failed because of open files limit, consider increasing with 'ulimit' command"); } OFI_CHECK_RETURN_MSG(ret, "cq_open failed (%s)\n", fi_strerror(errno)); - ret = fi_endpoint(shmem_transport_ofi_domainfd, - info->p_info, &ctx->ep[idx], NULL); + ret = fi_endpoint(/*shmem_transport_ofi_domainfd*/ ctx->domain[idx], + /*info->p_info*/ provider_list[idx], &ctx->ep[idx], NULL); OFI_CHECK_RETURN_MSG(ret, "ep creation failed (%s)\n", fi_strerror(errno)); } diff --git a/src/transport_ofi.h b/src/transport_ofi.h index dcd288af8..b01df5cd5 100644 --- a/src/transport_ofi.h +++ b/src/transport_ofi.h @@ -264,6 +264,9 @@ struct shmem_transport_ctx_t { shmem_internal_mutex_t lock; #endif long options; + struct fid_fabric** fabric; + struct fid_domain** domain; + struct fid_av** av; struct fid_ep** ep; struct fid_cntr** put_cntr; struct fid_cntr** get_cntr; @@ -455,9 +458,9 @@ void shmem_transport_put_quiet(shmem_transport_ctx_t* ctx) fail = 0; for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) { - success = fi_cntr_read(ctx->put_cntr[idx]); /* FIX */ - fail = fi_cntr_readerr(ctx->put_cntr[idx]); /* FIX */ - cnt = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_cntr[idx]); /* FIX */ + success = fi_cntr_read(ctx->put_cntr[idx]); /* FIXED? */ + fail = fi_cntr_readerr(ctx->put_cntr[idx]); /* FIXED? */ + cnt = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_cntr[idx]); /* FIXED? */ } shmem_transport_probe(); @@ -474,11 +477,11 @@ void shmem_transport_put_quiet(shmem_transport_ctx_t* ctx) poll_count++; } for (size_t idx = 0; idx < shmem_transport_ofi_num_nics; idx++) { - cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_cntr[idx]); /* FIX */ + cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_cntr[idx]); /* FIXED? */ do { cnt = cnt_new; - ssize_t ret = fi_cntr_wait(ctx->put_cntr[idx], cnt, -1); /* FIX */ - cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_cntr[idx]); /* FIX */ + ssize_t ret = fi_cntr_wait(ctx->put_cntr[idx], cnt, -1); /* FIXED? */ + cnt_new = SHMEM_TRANSPORT_OFI_CNTR_READ(&ctx->pending_put_cntr[idx]); /* FIXED? */ OFI_CTX_CHECK_ERROR(ctx, ret); } while (cnt < cnt_new); shmem_internal_assert(cnt == cnt_new); @@ -521,8 +524,7 @@ int shmem_transport_fence(shmem_transport_ctx_t* ctx) * to reclaim resources and indicate that the operation should be retried. If * retry limit (ofi_max_poll) is exceeded, abort. */ static inline -int try_again(shmem_transport_ctx_t *ctx, const int ret, uint64_t *polled) { - +int try_again(shmem_transport_ctx_t *ctx, const int ret, uint64_t *polled, size_t nic_idx) { if (ret) { if (ret == -FI_EAGAIN) { if (ctx->bounce_buffers) { @@ -533,9 +535,9 @@ int try_again(shmem_transport_ctx_t *ctx, const int ret, uint64_t *polled) { else { /* Poke CQ for errors to encourage progress */ struct fi_cq_err_entry e = {0}; - ssize_t ret = fi_cq_readerr(ctx->cq, (void *)&e, 0); + ssize_t ret = fi_cq_readerr(ctx->cq[nic_idx], (void *)&e, 0); /* FIXED? */ if (ret == 1) { - const char *errmsg = fi_cq_strerror(ctx->cq, e.prov_errno, + const char *errmsg = fi_cq_strerror(ctx->cq[nic_idx], e.prov_errno, /* FIXED? */ e.err_data, NULL, 0); RAISE_ERROR_MSG("Error in operation: %s\n", errmsg); } else if (ret && ret != -FI_EAGAIN) { @@ -566,7 +568,7 @@ int try_again(shmem_transport_ctx_t *ctx, const int ret, uint64_t *polled) { static inline void shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const - void *source, size_t len, int pe) + void *source, size_t len, int pe, size_t nic_idx) { int ret = 0; uint64_t dst = (uint64_t) pe; @@ -579,24 +581,24 @@ void shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const shmem_internal_assert(len <= shmem_transport_ofi_max_buffered_send); SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[1]); /* FIX */ + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[nic_idx]); /* FIXED? */ do { - ret = fi_inject_write(ctx->ep[1], /* FIX */ + ret = fi_inject_write(ctx->ep[nic_idx], /* FIXED? */ source, len, GET_DEST(dst), (uint64_t) addr, key); - } while (try_again(ctx, ret, &polled)); + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); } static inline void shmem_transport_ofi_put_large(shmem_transport_ctx_t* ctx, void *target, const void *source, - size_t len, int pe) + size_t len, int pe, size_t nic_idx) { int ret = 0; uint64_t dst = (uint64_t) pe; @@ -618,14 +620,14 @@ void shmem_transport_ofi_put_large(shmem_transport_ctx_t* ctx, void *target, con (size_t) (((uint8_t *) source) + len - frag_source)); polled = 0; - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[1]); /* FIX */ + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[nic_idx]); /* FIXED? */ do { - ret = fi_write(ctx->ep[1], /* FIX */ + ret = fi_write(ctx->ep[nic_idx], /* FIXED? */ frag_source, frag_len, NULL, GET_DEST(dst), frag_target, key, NULL); - } while (try_again(ctx, ret, &polled)); + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ frag_source += frag_len; frag_target += frag_len; @@ -635,7 +637,7 @@ void shmem_transport_ofi_put_large(shmem_transport_ctx_t* ctx, void *target, con static inline void shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, long *completion) + int pe, long *completion, size_t nic_idx) { int ret = 0; uint64_t dst = (uint64_t) pe; @@ -647,12 +649,12 @@ void shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void if (len <= shmem_transport_ofi_max_buffered_send) { - shmem_transport_put_scalar(ctx, target, source, len, pe); + shmem_transport_put_scalar(ctx, target, source, len, pe, nic_idx); } else if (len <= shmem_transport_ofi_bounce_buffer_size && ctx->bounce_buffers) { SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[1]); /* FIX */ + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[nic_idx]); /* FIXED? */ shmem_transport_ofi_get_mr(target, pe, &addr, &key); shmem_transport_ofi_bounce_buffer_t *buff = @@ -672,19 +674,19 @@ void shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void .data = 0 }; do { - ret = fi_writemsg(ctx->ep[1], &msg, FI_COMPLETION | FI_DELIVERY_COMPLETE); /* FIX */ - } while (try_again(ctx, ret, &polled)); + ret = fi_writemsg(ctx->ep[nic_idx], &msg, FI_COMPLETION | FI_DELIVERY_COMPLETE); /* FIXED? */ + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); } else { - shmem_transport_ofi_put_large(ctx, target, source,len, pe); + shmem_transport_ofi_put_large(ctx, target, source,len, pe, nic_idx); (*completion)++; } } static inline void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - uint64_t *sig_addr, uint64_t signal, int sig_op, int pe) + uint64_t *sig_addr, uint64_t signal, int sig_op, int pe, size_t nic_idx) { int ret = 0; uint64_t dst = (uint64_t) pe; @@ -721,8 +723,8 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co }; do { - ret = fi_writemsg(ctx->ep[1], &msg, FI_DELIVERY_COMPLETE | FI_INJECT); /* FIX */ - } while (try_again(ctx, ret, &polled)); + ret = fi_writemsg(ctx->ep[nic_idx], &msg, FI_DELIVERY_COMPLETE | FI_INJECT); /* FIXED? */ + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); } else { @@ -766,11 +768,11 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co msg.rma_iov = &rma_iov; msg.context = frag_source; - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[1]); /* FIX */ + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[nic_idx]); /* FIXED? */ do { - ret = fi_writemsg(ctx->ep[1], &msg, FI_DELIVERY_COMPLETE); /* FIX */ - } while (try_again(ctx, ret, &polled)); + ret = fi_writemsg(ctx->ep[nic_idx], &msg, FI_DELIVERY_COMPLETE); /* FIXED? */ + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ frag_source += frag_len; frag_target += frag_len; @@ -820,14 +822,14 @@ void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, co do { ret = fi_atomicmsg(ctx->ep[1], &msg_signal, flags_signal); /* FIX */ - } while (try_again(ctx, ret, &polled)); + } while (try_again(ctx, ret, &polled, 1)); /* FIX */ SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); } /* compatibility with Portals transport */ static inline -void shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) { +void shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion, size_t nic_idx) { shmem_internal_assert((*completion) >= 0); @@ -839,21 +841,21 @@ void shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) { static inline void shmem_transport_put_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe) + int pe, size_t nic_idx) { if (len <= shmem_transport_ofi_max_buffered_send) { - shmem_transport_put_scalar(ctx, target, source, len, pe); + shmem_transport_put_scalar(ctx, target, source, len, pe, nic_idx); } else { - shmem_transport_ofi_put_large(ctx, target, source, len, pe); + shmem_transport_ofi_put_large(ctx, target, source, len, pe, nic_idx); } } static inline -void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe) +void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe, size_t nic_idx) { int ret = 0; uint64_t dst = (uint64_t) pe; @@ -866,9 +868,9 @@ void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *s SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); if (len <= shmem_transport_ofi_max_msg_size) { - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[1]); /* FIX */ + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[nic_idx]); /* FIXED? */ do { - ret = fi_read(ctx->ep[1], /* FIX */ + ret = fi_read(ctx->ep[nic_idx], /* FIXED? */ target, len, NULL, @@ -876,7 +878,7 @@ void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *s (uint64_t) addr, key, NULL); - } while (try_again(ctx, ret, &polled)); + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ } else { uint8_t *frag_target = (uint8_t *) target; @@ -888,14 +890,14 @@ void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *s (size_t) (((uint8_t *) target) + len - frag_target)); polled = 0; - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[1]); /* FIX */ + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[nic_idx]); /* FIXED? */ do { - ret = fi_read(ctx->ep[1], /* FIX */ + ret = fi_read(ctx->ep[nic_idx], /* FIXED? */ frag_target, frag_len, NULL, GET_DEST(dst), frag_source, key, NULL); - } while (try_again(ctx, ret, &polled)); + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ frag_source += frag_len; frag_target += frag_len; @@ -1000,14 +1002,14 @@ void shmem_transport_cswap_nbi(shmem_transport_ctx_t* ctx, void *target, const 1, FI_INJECT); /* FI_DELIVERY_COMPLETE is not required as it is implied for fetch atomicmsgs */ - } while (try_again(ctx, ret, &polled)); + } while (try_again(ctx, ret, &polled, 1)); /* FIX */ SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); } static inline void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, - const void *operand, size_t len, int pe, int datatype) + const void *operand, size_t len, int pe, int datatype, size_t nic_idx) { #ifdef ENABLE_MR_ENDPOINT /* CXI provider currently does not support fetch atomics with FI_DELIVERY_COMPLETE @@ -1028,10 +1030,10 @@ void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len); SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[1]); /* FIX */ + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[nic_idx]); /* FIXED? */ do { - ret = fi_compare_atomic(ctx->ep[1], /* FIX */ + ret = fi_compare_atomic(ctx->ep[nic_idx], /* FIXED? */ source, 1, NULL, @@ -1045,7 +1047,7 @@ void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void SHMEM_TRANSPORT_DTYPE(datatype), FI_CSWAP, NULL); - } while (try_again(ctx, ret, &polled)); + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); #endif } @@ -1053,7 +1055,7 @@ void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void static inline void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, - const void *mask, size_t len, int pe, int datatype) + const void *mask, size_t len, int pe, int datatype, size_t nic_idx) { int ret = 0; uint64_t dst = (uint64_t) pe; @@ -1067,10 +1069,10 @@ void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len); SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[1]); /* FIX */ + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[nic_idx]); /* FIXED? */ do { - ret = fi_compare_atomic(ctx->ep[1], /* FIX */ + ret = fi_compare_atomic(ctx->ep[nic_idx], /* FIXED? */ source, 1, NULL, @@ -1084,14 +1086,14 @@ void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void SHMEM_TRANSPORT_DTYPE(datatype), FI_MSWAP, NULL); - } while (try_again(ctx, ret, &polled)); + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); } static inline void shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, int op, int datatype) + int pe, int op, int datatype, size_t nic_idx) { int ret = 0; uint64_t dst = (uint64_t) pe; @@ -1104,10 +1106,10 @@ void shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len); SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[1]); /* FIX */ + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[nic_idx]); /* FIXED? */ do { - ret = fi_inject_atomic(ctx->ep[1], /* FIX */ + ret = fi_inject_atomic(ctx->ep[nic_idx], /* FIXED? */ source, 1, GET_DEST(dst), @@ -1115,7 +1117,7 @@ void shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void key, SHMEM_TRANSPORT_DTYPE(datatype), op); - } while (try_again(ctx, ret, &polled)); + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); } @@ -1123,7 +1125,7 @@ void shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void static inline void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t full_len, int pe, int op, int datatype, - long *completion) + long *completion, size_t nic_idx) { int ret = 0; uint64_t dst = (uint64_t) pe; @@ -1137,7 +1139,7 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi shmem_internal_assert(SHMEM_Dtsize[dt] * len == full_len); SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); - ret = fi_atomicvalid(ctx->ep[1], dt, op, /* FIX */ + ret = fi_atomicvalid(ctx->ep[nic_idx], dt, op, /* FIXED? */ &max_atomic_size); max_atomic_size = max_atomic_size * SHMEM_Dtsize[dt]; if (max_atomic_size > shmem_transport_ofi_max_msg_size @@ -1154,10 +1156,10 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi polled = 0; - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[1]); /* FIX */ + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[nic_idx]); /* FIXED? */ do { - ret = fi_inject_atomic(ctx->ep[1], /* FIX */ + ret = fi_inject_atomic(ctx->ep[nic_idx], /* FIXED? */ source, len, GET_DEST(dst), @@ -1165,7 +1167,7 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi key, dt, op); - } while (try_again(ctx, ret, &polled)); + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ } else if (full_len <= MIN(shmem_transport_ofi_bounce_buffer_size, max_atomic_size) && @@ -1175,7 +1177,7 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi create_bounce_buffer(ctx, source, full_len); polled = 0; - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[1]); /* FIX */ + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[nic_idx]); /* FIXED? */ const struct fi_ioc msg_iov = { .addr = buff->data, .count = len }; const struct fi_rma_ioc rma_iov = { .addr = (uint64_t) addr, .count = len, .key = key }; @@ -1192,8 +1194,8 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi .data = 0 }; do { - ret = fi_atomicmsg(ctx->ep[1], &msg, FI_COMPLETION | FI_DELIVERY_COMPLETE); /* FIX */ - } while (try_again(ctx, ret, &polled)); + ret = fi_atomicmsg(ctx->ep[nic_idx], &msg, FI_COMPLETION | FI_DELIVERY_COMPLETE); /* FIXED? */ + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ } else { size_t sent = 0; @@ -1203,9 +1205,9 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi size_t chunksize = MIN((len-sent), (max_atomic_size/SHMEM_Dtsize[dt])); polled = 0; - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[1]); /* FIX */ + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[nic_idx]); /* FIXED? */ do { - ret = fi_atomic(ctx->ep[1], /* FIX */ + ret = fi_atomic(ctx->ep[nic_idx], /* FIXED? */ (void *)((char *)source + (sent*SHMEM_Dtsize[dt])), chunksize, @@ -1217,7 +1219,7 @@ void shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const voi dt, op, NULL); - } while (try_again(ctx, ret, &polled)); + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ sent += chunksize; } @@ -1271,7 +1273,7 @@ void shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target, 1, FI_INJECT); /* FI_DELIVERY_COMPLETE is not required as it's implied for fetch atomicmsgs */ - } while (try_again(ctx, ret, &polled)); + } while (try_again(ctx, ret, &polled, 1)); /* FIX */ SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); } @@ -1279,7 +1281,8 @@ void shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target, static inline void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, - size_t len, int pe, int op, int datatype) + size_t len, int pe, int op, int datatype, + size_t nic_idx) { #ifdef ENABLE_MR_ENDPOINT /* CXI provider currently does not support fetch atomics with FI_DELIVERY_COMPLETE @@ -1300,10 +1303,10 @@ void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len); SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx); - SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[1]); /* FIX */ + SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_get_cntr[nic_idx]); /* FIXED? */ do { - ret = fi_fetch_atomic(ctx->ep[1], /* FIX */ + ret = fi_fetch_atomic(ctx->ep[nic_idx], /* FIXED */ source, 1, NULL, @@ -1315,7 +1318,7 @@ void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, SHMEM_TRANSPORT_DTYPE(datatype), op, NULL); - } while (try_again(ctx, ret, &polled)); + } while (try_again(ctx, ret, &polled, nic_idx)); /* FIXED? */ SHMEM_TRANSPORT_OFI_CTX_UNLOCK(ctx); #endif } @@ -1324,10 +1327,11 @@ void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, static inline void shmem_transport_swap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, - size_t len, int pe, int datatype) + size_t len, int pe, int datatype, + size_t nic_idx) { shmem_transport_fetch_atomic(ctx, target, source, dest, len, pe, - FI_ATOMIC_WRITE, datatype); + FI_ATOMIC_WRITE, datatype, nic_idx); } @@ -1344,17 +1348,17 @@ void shmem_transport_swap_nbi(shmem_transport_ctx_t* ctx, void *target, static inline void shmem_transport_atomic_set(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe, - int datatype) + int datatype, size_t nic_idx) { shmem_transport_atomic(ctx, target, source, len, pe, FI_ATOMIC_WRITE, - datatype); + datatype, nic_idx); } static inline void shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe, - int datatype) + int datatype, size_t nic_idx) { #ifdef ENABLE_MR_ENDPOINT /* CXI provider currently does not support fetch atomics with FI_DELIVERY_COMPLETE @@ -1365,7 +1369,7 @@ void shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target, target, len, pe, FI_SUM, datatype); #else shmem_transport_fetch_atomic(ctx, (void *) source, (const void *) NULL, - target, len, pe, FI_ATOMIC_READ, datatype); + target, len, pe, FI_ATOMIC_READ, datatype, nic_idx); #endif } @@ -1408,7 +1412,8 @@ void shmem_transport_put_ct_nb(shmem_transport_ct_t *ct, void *target, static inline void shmem_transport_get_ct(shmem_transport_ct_t *ct, void *target, - const void *source, size_t len, int pe) + const void *source, size_t len, int pe, + size_t nic_idx) { RAISE_ERROR_STR("OFI transport does not currently support CT operations"); } diff --git a/src/transport_portals4.h b/src/transport_portals4.h index b31f1fb47..d5b426b23 100644 --- a/src/transport_portals4.h +++ b/src/transport_portals4.h @@ -368,7 +368,7 @@ shmem_transport_portals4_drain_eq(void) static inline void -shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe) +shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe, size_t nic_idx) { int ret; ptl_process_t peer; @@ -571,7 +571,7 @@ shmem_transport_portals4_put_nbi_internal(shmem_transport_ctx_t* ctx, void *targ static inline void -shmem_transport_put_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe) +shmem_transport_put_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe, size_t nic_idx) { #ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING shmem_transport_portals4_put_nbi_internal(ctx, target, source, len, pe, @@ -588,7 +588,7 @@ shmem_transport_put_nbi(shmem_transport_ctx_t* ctx, void *target, const void *so static inline void shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, long *completion) + int pe, long *completion, size_t nic_idx) { if (ctx->options & SHMEMX_CTX_BOUNCE_BUFFER) { #ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING @@ -603,7 +603,7 @@ shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void *sou shmem_transport_portals4_heap_pt); #endif } else { - shmem_transport_put_nbi(ctx, target, source, len, pe); + shmem_transport_put_nbi(ctx, target, source, len, pe, nic_idx); } } @@ -624,7 +624,7 @@ shmem_transport_put_ct_nb(shmem_transport_ct_t *ct, void *target, const void *so static inline void -shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) +shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion, size_t nic_idx) { if (ctx->options & SHMEMX_CTX_BOUNCE_BUFFER) { while (*completion > 0) { @@ -668,7 +668,7 @@ shmem_transport_portals4_get_internal(shmem_transport_ctx_t* ctx, void *target, static inline -void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe) +void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe, size_t nic_idx) { #ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING shmem_transport_portals4_get_internal(ctx, target, source, len, pe, @@ -683,7 +683,8 @@ void shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *s static inline void shmem_transport_get_ct(shmem_transport_ct_t *ct, void *target, - const void *source, size_t len, int pe) + const void *source, size_t len, int pe, + size_t nic_idx) { #ifdef ENABLE_REMOTE_VIRTUAL_ADDRESSING shmem_transport_portals4_get_internal((shmem_transport_ctx_t *)SHMEM_CTX_DEFAULT, target, source, len, pe, ct->shr_pt, -1); @@ -718,7 +719,7 @@ shmem_transport_get_wait(shmem_transport_ctx_t* ctx, size_t idx) static inline void shmem_transport_swap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, size_t len, - int pe, ptl_datatype_t datatype) + int pe, ptl_datatype_t datatype, size_t nic_idx) { int ret; ptl_process_t peer; @@ -769,7 +770,7 @@ static inline void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, const void *operand, size_t len, int pe, - ptl_datatype_t datatype) + ptl_datatype_t datatype, size_t nic_idx) { int ret; ptl_process_t peer; @@ -821,7 +822,7 @@ static inline void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, const void *mask, size_t len, int pe, - ptl_datatype_t datatype) + ptl_datatype_t datatype, size_t nic_idx) { int ret; ptl_process_t peer; @@ -860,7 +861,7 @@ shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void *sour static inline void shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, ptl_op_t op, ptl_datatype_t datatype) + int pe, ptl_op_t op, ptl_datatype_t datatype, size_t nic_idx) { int ret; ptl_pt_index_t pt; @@ -1020,7 +1021,7 @@ static inline void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, size_t len, int pe, ptl_op_t op, - ptl_datatype_t datatype) + ptl_datatype_t datatype, size_t nic_idx) { int ret; ptl_pt_index_t pt; @@ -1070,22 +1071,22 @@ shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target, static inline void shmem_transport_atomic_set(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, int datatype) + int pe, int datatype, size_t nic_idx) { shmem_internal_assert(len <= shmem_transport_portals4_max_atomic_size); - shmem_transport_put_scalar(ctx, target, source, len, pe); + shmem_transport_put_scalar(ctx, target, source, len, pe, nic_idx); } static inline void shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, int datatype) + int pe, int datatype, size_t nic_idx) { shmem_internal_assert(len <= shmem_transport_portals4_max_fetch_atomic_size); - shmem_transport_get(ctx, target, source, len, pe); + shmem_transport_get(ctx, target, source, len, pe, nic_idx); } @@ -1102,16 +1103,16 @@ int shmem_transport_atomic_supported(ptl_op_t op, ptl_datatype_t datatype) static inline void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - uint64_t *sig_addr, uint64_t signal, int sig_op, int pe) + uint64_t *sig_addr, uint64_t signal, int sig_op, int pe, size_t nic_idx) { /* FIXME: Need to optimize non-blocking put with signal for Portals. Current implementation below keeps * * the "fence" in between data and signal put */ - shmem_transport_put_nbi(ctx, target, source, len, pe); + shmem_transport_put_nbi(ctx, target, source, len, pe, nic_idx); shmem_transport_fence(ctx); if (sig_op == SHMEM_SIGNAL_ADD) - shmem_transport_atomic(ctx, sig_addr, &signal, sizeof(uint64_t), pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64); + shmem_transport_atomic(ctx, sig_addr, &signal, sizeof(uint64_t), pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64, nic_idx); else - shmem_transport_atomic_set(ctx, sig_addr, &signal, sizeof(uint64_t), pe, SHM_INTERNAL_UINT64); + shmem_transport_atomic_set(ctx, sig_addr, &signal, sizeof(uint64_t), pe, SHM_INTERNAL_UINT64, nic_idx); } static inline diff --git a/src/transport_ucx.h b/src/transport_ucx.h index c74165007..72c96e9a9 100644 --- a/src/transport_ucx.h +++ b/src/transport_ucx.h @@ -230,7 +230,7 @@ shmem_transport_fence(shmem_transport_ctx_t* ctx) static inline void -shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe) +shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe, size_t nic_idx) { ucs_status_t status; ucp_rkey_h rkey; @@ -275,7 +275,7 @@ shmem_transport_put_nb(shmem_transport_ctx_t* ctx, void *target, const void *sou static inline void -shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) +shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion, size_t nic_idx) { while (__atomic_load_n(completion, __ATOMIC_ACQUIRE) > 0) shmem_transport_probe(); @@ -284,7 +284,7 @@ shmem_transport_put_wait(shmem_transport_ctx_t* ctx, long *completion) static inline void shmem_transport_put_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe) + int pe, size_t nic_idx) { ucs_status_t status; ucp_rkey_h rkey; @@ -298,7 +298,7 @@ shmem_transport_put_nbi(shmem_transport_ctx_t* ctx, void *target, const void *so static inline void -shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe) +shmem_transport_get(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, int pe, size_t nic_idx) { ucs_status_ptr_t pstatus; ucp_rkey_h rkey; @@ -324,7 +324,7 @@ shmem_transport_get_wait(shmem_transport_ctx_t* ctx, size_t idx) static inline void shmem_transport_swap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, - size_t len, int pe, shm_internal_datatype_t datatype) + size_t len, int pe, shm_internal_datatype_t datatype, size_t nic_idx) { uint8_t *remote_addr; ucp_rkey_h rkey; @@ -402,7 +402,7 @@ static inline void shmem_transport_cswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, const void *operand, size_t len, int pe, - shm_internal_datatype_t datatype) + shm_internal_datatype_t datatype, size_t nic_idx) { uint8_t *remote_addr; ucp_rkey_h rkey; @@ -484,7 +484,7 @@ shmem_transport_cswap_nbi(shmem_transport_ctx_t* ctx, void *target, const void * static inline void shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, shm_internal_op_t op, shm_internal_datatype_t datatype) + int pe, shm_internal_op_t op, shm_internal_datatype_t datatype, size_t nic_idx) { uint8_t *remote_addr; ucp_rkey_h rkey; @@ -530,7 +530,7 @@ shmem_transport_atomicv(shmem_transport_ctx_t* ctx, void *target, const void *so static inline void shmem_transport_fetch_atomic(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, size_t len, - int pe, shm_internal_op_t op, shm_internal_datatype_t datatype) + int pe, shm_internal_op_t op, shm_internal_datatype_t datatype, size_t nic_idx) { uint8_t *remote_addr; ucp_rkey_h rkey; @@ -613,7 +613,7 @@ shmem_transport_fetch_atomic_nbi(shmem_transport_ctx_t* ctx, void *target, const static inline void shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, shm_internal_datatype_t datatype) + int pe, shm_internal_datatype_t datatype, size_t nic_idx) { uint8_t *remote_addr; ucp_rkey_h rkey; @@ -632,7 +632,7 @@ shmem_transport_atomic_fetch(shmem_transport_ctx_t* ctx, void *target, const voi static inline void shmem_transport_atomic_set(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - int pe, shm_internal_datatype_t datatype) + int pe, shm_internal_datatype_t datatype, size_t nic_idx) { uint8_t *remote_addr; ucp_rkey_h rkey; @@ -675,7 +675,7 @@ static inline void shmem_transport_mswap(shmem_transport_ctx_t* ctx, void *target, const void *source, void *dest, const void *mask, size_t len, int pe, - shm_internal_datatype_t datatype) + shm_internal_datatype_t datatype, size_t nic_idx) { uint8_t *remote_addr; ucp_rkey_h rkey; @@ -718,18 +718,18 @@ void shmem_transport_syncmem(void) static inline void shmem_transport_put_signal_nbi(shmem_transport_ctx_t* ctx, void *target, const void *source, size_t len, - uint64_t *sig_addr, uint64_t signal, int sig_op, int pe) + uint64_t *sig_addr, uint64_t signal, int sig_op, int pe, size_t nic_idx) { - shmem_transport_put_nbi(ctx, target, source, len, pe); + shmem_transport_put_nbi(ctx, target, source, len, pe, nic_idx); shmem_transport_fence(ctx); switch (sig_op) { case SHMEM_SIGNAL_ADD: shmem_transport_atomic(ctx, sig_addr, &signal, sizeof(uint64_t), - pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64); + pe, SHM_INTERNAL_SUM, SHM_INTERNAL_UINT64, nic_idx); break; case SHMEM_SIGNAL_SET: shmem_transport_atomic_set(ctx, sig_addr, &signal, sizeof(uint64_t), - pe, SHM_INTERNAL_UINT64); + pe, SHM_INTERNAL_UINT64, nic_idx); break; default: RAISE_ERROR_MSG("Unsupported operation (%d)\n", sig_op); @@ -779,7 +779,8 @@ shmem_transport_put_ct_nb(shmem_transport_ct_t *ct, void *target, const void static inline void shmem_transport_get_ct(shmem_transport_ct_t *ct, void - *target, const void *source, size_t len, int pe) + *target, const void *source, size_t len, int pe, + size_t nic_idx) { RAISE_ERROR_STR("No path to peer"); }