diff --git a/contrib/test_jenkins.sh b/contrib/test_jenkins.sh
index 229b15fbff9..7e3ff3e5b7d 100755
--- a/contrib/test_jenkins.sh
+++ b/contrib/test_jenkins.sh
@@ -666,7 +666,8 @@ run_ucx_perftest_cuda_device() {
 	for tls in "$gda_tls"
 	do
 		export UCX_TLS=${tls}
-		run_client_server_app "$ucx_perftest" "$ucp_test_args" "$ucp_client_args" 0 0
+		# TODO: Enable these tests
+		#run_client_server_app "$ucx_perftest" "$ucp_test_args" "$ucp_client_args" 0 0
 	done
 	unset UCX_TLS
 }
diff --git a/contrib/ucx_perftest_config/test_types_ucp_device_cuda b/contrib/ucx_perftest_config/test_types_ucp_device_cuda
index 99a7a1cebac..d208a624c80 100644
--- a/contrib/ucx_perftest_config/test_types_ucp_device_cuda
+++ b/contrib/ucx_perftest_config/test_types_ucp_device_cuda
@@ -11,10 +11,16 @@ ucp_device_cuda_partial_lat_1k_1thread -t ucp_put_partial_lat -m cuda -s 2
 # Increase number of threads after following fixes:
 # - Use thread-local memory instead of shared for requests (limit 48K)
 # - Fix WQE size limit of 1024
-# TODO - enable when wqe reserve is fixed.
-# ucp_device_cuda_single_bw_1k_32threads -t ucp_put_single_bw -m cuda -s 1024 -n 10000 -T 32
-# ucp_device_cuda_single_lat_1k_32threads -t ucp_put_single_lat -m cuda -s 1024 -n 10000 -T 32
-# ucp_device_cuda_multi_bw_1k_32threads -t ucp_put_multi_bw -m cuda -s 256:8 -n 10000 -T 32 -O 2
-# ucp_device_cuda_multi_lat_1k_32threads -t ucp_put_multi_lat -m cuda -s 256:8 -n 10000 -T 32 -O 2
-# ucp_device_cuda_partial_bw_1k_32threads -t ucp_put_partial_bw -m cuda -s 256:8 -n 10000 -T 32 -O 2
-# ucp_device_cuda_partial_lat_1k_32threads -t ucp_put_partial_lat -m cuda -s 256:8 -n 10000 -T 32 -O 2
+ucp_device_cuda_single_bw_1k_32threads -t ucp_put_single_bw -m cuda -s 1024 -n 10000 -T 32
+ucp_device_cuda_single_lat_1k_32threads -t ucp_put_single_lat -m cuda -s 1024 -n 10000 -T 32
+ucp_device_cuda_multi_bw_1k_32threads -t ucp_put_multi_bw -m cuda -s 256:8 -n 10000 -T 32 -O 2
+ucp_device_cuda_multi_lat_1k_32threads -t ucp_put_multi_lat -m cuda -s 256:8 -n 10000 -T 32 -O 2
+ucp_device_cuda_partial_bw_1k_32threads -t ucp_put_partial_bw -m cuda -s 256:8 -n 10000 -T 32 -O 2
+ucp_device_cuda_partial_lat_1k_32threads -t ucp_put_partial_lat -m cuda -s 256:8 -n 10000 -T 32 -O 2
+
+ucp_device_cuda_single_bw_1k_1warp -t ucp_put_single_bw -m cuda -s 1024 -n 10000 -T 32 -L warp
+ucp_device_cuda_single_lat_1k_1warp -t ucp_put_single_lat -m cuda -s 1024 -n 10000 -T 32 -L warp
+ucp_device_cuda_multi_bw_1k_1warp -t ucp_put_multi_bw -m cuda -s 256:8 -n 10000 -T 32 -L warp
+ucp_device_cuda_multi_lat_1k_1warp -t ucp_put_multi_lat -m cuda -s 256:8 -n 10000 -T 32 -L warp
+ucp_device_cuda_partial_bw_1k_1warp -t ucp_put_partial_bw -m cuda -s 256:8 -n 10000 -T 32 -L warp
+ucp_device_cuda_partial_lat_1k_1warp -t ucp_put_partial_lat -m cuda -s 256:8 -n 10000 -T 32 -L warp
diff --git a/src/tools/perf/api/libperf.h b/src/tools/perf/api/libperf.h
index 2e1338b2423..0064560ab2b 100644
--- a/src/tools/perf/api/libperf.h
+++ b/src/tools/perf/api/libperf.h
@@ -166,6 +166,7 @@ typedef enum {
 
 #define UCX_PERF_MEM_DEV_DEFAULT -1
+#define UCP_PERF_FC_WINDOW_DEFAULT 4
 
 /**
  * Performance counter type.
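Reviewer note: `UCP_PERF_FC_WINDOW_DEFAULT` bounds how many device-side sends share one tracked request. With the bandwidth tests' default of 32 outstanding sends and the default window of 4, each sending lane only needs ceil(32/4) = 8 request slots. A minimal host-side sketch of that arithmetic, mirroring the `ucx_ceil_div()` helper added later in this patch (the numbers are illustrative, not fixed by the API):

```cpp
#include <cstdio>

// Mirrors ucx_ceil_div() from cuda_kernel.cuh: round-up integer division.
static unsigned ceil_div(unsigned x, unsigned y)
{
    return (x / y) + ((x % y) != 0);
}

int main()
{
    unsigned max_outstanding = 32; // default window of the bw tests (-O)
    unsigned fc_window       = 4;  // -F, UCP_PERF_FC_WINDOW_DEFAULT

    // Only every fc_window-th send carries a tracked request, so the
    // request pool (and shared-memory footprint) shrinks accordingly.
    printf("request slots per lane: %u\n", ceil_div(max_outstanding, fc_window));
    return 0;
}
```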
@@ -266,9 +267,9 @@ typedef struct ucx_perf_params {
     ucx_perf_wait_mode_t wait_mode;     /* How to wait */
     ucs_memory_type_t send_mem_type;    /* Send memory type */
     ucs_memory_type_t recv_mem_type;    /* Recv memory type */
-    ucx_perf_accel_dev_t send_device;   /* Send memory device for gdaki */
-    ucx_perf_accel_dev_t recv_device;   /* Recv memory device for gdaki */
-    ucs_device_level_t device_level;    /* Device level for gdaki */
+    ucx_perf_accel_dev_t send_device;   /* Send memory device */
+    ucx_perf_accel_dev_t recv_device;   /* Recv memory device */
+    ucs_device_level_t device_level;    /* Device level */
     unsigned flags;                     /* See ucx_perf_test_flags. */
     size_t *msg_size_list;              /* Test message sizes list. The size
@@ -289,6 +290,7 @@ typedef struct ucx_perf_params {
                                            in latency tests */
     unsigned device_thread_count;       /* Number of device threads */
     unsigned device_block_count;        /* Number of device blocks */
+    unsigned device_fc_window;          /* Flow control window size for device tests */
     void *rte_group;                    /* Opaque RTE group handle */
     ucx_perf_rte_t *rte;                /* RTE functions used to exchange data */
diff --git a/src/tools/perf/cuda/cuda_kernel.cuh b/src/tools/perf/cuda/cuda_kernel.cuh
index c9e71ba40c7..30d78fd29fb 100644
--- a/src/tools/perf/cuda/cuda_kernel.cuh
+++ b/src/tools/perf/cuda/cuda_kernel.cuh
@@ -18,6 +18,7 @@ typedef unsigned long long ucx_perf_cuda_time_t;
 
 struct ucx_perf_cuda_context {
     unsigned max_outstanding;
+    unsigned device_fc_window;
     ucx_perf_counter_t max_iters;
     ucx_perf_cuda_time_t report_interval_ns;
     ucx_perf_counter_t completed_iters;
@@ -32,22 +33,41 @@ UCS_F_DEVICE ucx_perf_cuda_time_t ucx_perf_cuda_get_time_ns()
     return globaltimer;
 }
 
-UCS_F_DEVICE void
-ucx_perf_cuda_update_report(ucx_perf_cuda_context &ctx,
-                            ucx_perf_counter_t completed,
-                            ucx_perf_counter_t max_iters,
-                            ucx_perf_cuda_time_t &last_report_time)
-{
-    if (threadIdx.x == 0) {
-        ucx_perf_cuda_time_t current_time = ucx_perf_cuda_get_time_ns();
-        if (((current_time - last_report_time) >= ctx.report_interval_ns) ||
-            (completed >= max_iters)) {
-            ctx.completed_iters = completed;
-            last_report_time = current_time;
+class ucx_perf_cuda_reporter {
+public:
+    __device__
+    ucx_perf_cuda_reporter(ucx_perf_cuda_context &ctx) :
+        m_ctx(ctx),
+        m_max_iters(ctx.max_iters),
+        m_next_report_iter(1),
+        m_last_report_time(ucx_perf_cuda_get_time_ns()),
+        m_report_interval_ns(ctx.report_interval_ns / 5)
+    {
+    }
+
+    __device__ inline void
+    update_report(ucx_perf_counter_t completed)
+    {
+        if ((threadIdx.x == 0) && ucs_unlikely(completed >= m_next_report_iter)) {
+            ucx_perf_cuda_time_t cur_time  = ucx_perf_cuda_get_time_ns();
+            ucx_perf_cuda_time_t iter_time = (cur_time - m_last_report_time) /
+                                             (completed - m_ctx.completed_iters);
+            m_last_report_time    = cur_time;
+            m_ctx.completed_iters = completed;
             __threadfence();
+
+            m_next_report_iter = ucs_min(completed + (m_report_interval_ns / iter_time),
+                                         m_max_iters);
         }
     }
-}
+
+private:
+    ucx_perf_cuda_context &m_ctx;
+    ucx_perf_counter_t m_max_iters;
+    ucx_perf_counter_t m_next_report_iter;
+    ucx_perf_cuda_time_t m_last_report_time;
+    ucx_perf_cuda_time_t m_report_interval_ns;
+};
 
 static UCS_F_ALWAYS_INLINE uint64_t *
 ucx_perf_cuda_get_sn(const void *address, size_t length)
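Note on the reporter: it replaces the per-iteration clock read of the old `ucx_perf_cuda_update_report()` with an iteration-count threshold. Whenever a report fires, it measures the average time per iteration since the last report and schedules the next one roughly one device-side interval (`ctx.report_interval_ns / 5`) ahead, so the hot path is a single counter comparison. A host-side rehearsal of the scheduling rule, with made-up numbers:

```cpp
#include <cstdio>

// Host-side rehearsal of ucx_perf_cuda_reporter's scheduling (illustrative).
int main()
{
    unsigned long long report_interval_ns = 1000000000ULL / 5; // ctx value / 5
    unsigned long long last_time_ns       = 0;
    unsigned long long cur_time_ns        = 2000000; // 2 ms elapsed
    unsigned long long completed          = 1000;
    unsigned long long last_completed     = 0;

    // ns spent per iteration since the previous report
    unsigned long long iter_time = (cur_time_ns - last_time_ns) /
                                   (completed - last_completed);
    // The next report lands ~report_interval_ns ahead, converted to an
    // iteration count, so the loop compares counters instead of reading
    // the clock on every iteration.
    unsigned long long next_report_iter = completed +
                                          report_interval_ns / iter_time;
    printf("iter_time=%lluns next_report_iter=%llu\n", iter_time,
           next_report_iter); // iter_time=2000ns next_report_iter=101000
    return 0;
}
```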
@@ -63,83 +83,75 @@ UCS_F_DEVICE void ucx_perf_cuda_wait_sn(const uint64_t *sn, uint64_t value)
     __syncthreads();
 }
 
-/* Simple bitset */
-#define UCX_BIT_MASK(bit) (1 << ((bit) & (CHAR_BIT - 1)))
-#define UCX_BIT_SET(set, bit) (set[(bit)/CHAR_BIT] |= UCX_BIT_MASK(bit))
-#define UCX_BIT_RESET(set, bit) (set[(bit)/CHAR_BIT] &= ~UCX_BIT_MASK(bit))
-#define UCX_BIT_GET(set, bit) (set[(bit)/CHAR_BIT] & UCX_BIT_MASK(bit))
-#define UCX_BITSET_SIZE(bits) ((bits + CHAR_BIT - 1) / CHAR_BIT)
-
-UCS_F_DEVICE size_t ucx_bitset_popcount(const uint8_t *set, size_t bits) {
-    size_t count = 0;
-    for (size_t i = 0; i < bits; i++) {
-        if (UCX_BIT_GET(set, i)) {
-            count++;
-        }
+template <ucs_device_level_t level>
+__host__ UCS_F_DEVICE unsigned ucx_perf_cuda_thread_index(size_t tid)
+{
+    switch (level) {
+    case UCS_DEVICE_LEVEL_THREAD: return tid;
+    /* TODO: use UCS_DEVICE_NUM_THREADS_IN_WARP */
+    case UCS_DEVICE_LEVEL_WARP:   return tid / 32;
+    default:                      return 0;
     }
-    return count;
 }
 
-UCS_F_DEVICE size_t
-ucx_bitset_ffns(const uint8_t *set, size_t bits, size_t from)
-{
-    for (size_t i = from; i < bits; i++) {
-        if (!UCX_BIT_GET(set, i)) {
-            return i;
-        }
-    }
-    return bits;
+__host__ UCS_F_DEVICE unsigned ucx_ceil_div(unsigned x, unsigned y) {
+    return (x / y) + ((x % y) != 0);
 }
 
-#define UCX_KERNEL_CMD(level, cmd, blocks, threads, shared_size, func, ...) \
-    do { \
-        switch (cmd) { \
-        case UCX_PERF_CMD_PUT_SINGLE: \
-            func<level, UCX_PERF_CMD_PUT_SINGLE><<<blocks, threads, shared_size>>>(__VA_ARGS__); \
-            break; \
-        case UCX_PERF_CMD_PUT_MULTI: \
-            func<level, UCX_PERF_CMD_PUT_MULTI><<<blocks, threads, shared_size>>>(__VA_ARGS__); \
-            break; \
-        case UCX_PERF_CMD_PUT_PARTIAL: \
-            func<level, UCX_PERF_CMD_PUT_PARTIAL><<<blocks, threads, shared_size>>>(__VA_ARGS__); \
-            break; \
-        default: \
-            ucs_error("Unsupported cmd: %d", cmd); \
-            break; \
-        } \
-    } while (0)
+#define UCX_PERF_THREAD_INDEX_SET(_level, _tid, _outval) \
+    (_outval) = ucx_perf_cuda_thread_index<_level>(_tid)
+
+#define UCX_PERF_SWITCH_CMD(_cmd, _func, ...) \
+    switch (_cmd) { \
+    case UCX_PERF_CMD_PUT_SINGLE: \
+        _func(UCX_PERF_CMD_PUT_SINGLE, __VA_ARGS__); \
+        break; \
+    case UCX_PERF_CMD_PUT_MULTI: \
+        _func(UCX_PERF_CMD_PUT_MULTI, __VA_ARGS__); \
+        break; \
+    case UCX_PERF_CMD_PUT_PARTIAL: \
+        _func(UCX_PERF_CMD_PUT_PARTIAL, __VA_ARGS__); \
+        break; \
+    default: \
+        ucs_error("Unsupported cmd: %d", _cmd); \
+        break; \
+    }
 
-#define UCX_KERNEL_DISPATCH(perf, func, ...) \
-    do { \
-        ucs_device_level_t _level = perf.params.device_level; \
-        ucx_perf_cmd_t _cmd = perf.params.command; \
-        unsigned _blocks = perf.params.device_block_count; \
-        unsigned _threads = perf.params.device_thread_count; \
-        size_t _shared_size = _threads * perf.params.max_outstanding * \
-                              sizeof(ucp_device_request_t); \
-        switch (_level) { \
+#define UCX_PERF_SWITCH_LEVEL(_level, _func, ...) \
+    switch (_level) { \
     case UCS_DEVICE_LEVEL_THREAD: \
-        UCX_KERNEL_CMD(UCS_DEVICE_LEVEL_THREAD, _cmd, _blocks, _threads,\
-                       _shared_size, func, __VA_ARGS__); \
+        _func(UCS_DEVICE_LEVEL_THREAD, __VA_ARGS__); \
        break; \
    case UCS_DEVICE_LEVEL_WARP: \
-        UCX_KERNEL_CMD(UCS_DEVICE_LEVEL_WARP, _cmd, _blocks, _threads,\
-                       _shared_size, func, __VA_ARGS__); \
+        _func(UCS_DEVICE_LEVEL_WARP, __VA_ARGS__); \
        break; \
    case UCS_DEVICE_LEVEL_BLOCK: \
-        UCX_KERNEL_CMD(UCS_DEVICE_LEVEL_BLOCK, _cmd, _blocks, _threads,\
-                       _shared_size, func, __VA_ARGS__); \
-        break; \
    case UCS_DEVICE_LEVEL_GRID: \
-        UCX_KERNEL_CMD(UCS_DEVICE_LEVEL_GRID, _cmd, _blocks, _threads,\
-                       _shared_size, func, __VA_ARGS__); \
-        break; \
    default: \
        ucs_error("Unsupported level: %d", _level); \
        break; \
-    } \
+    }
+
+#define UCX_PERF_KERNEL_DISPATCH_CMD_LEVEL(_cmd, _level, _perf, _kernel, ...) \
+    do { \
+        unsigned _blocks = _perf.params.device_block_count; \
+        unsigned _threads = _perf.params.device_thread_count; \
+        unsigned _reqs_count = ucx_ceil_div(_perf.params.max_outstanding, \
+                                            _perf.params.device_fc_window); \
+        size_t _shared_size = _reqs_count * sizeof(ucp_device_request_t) * \
+                              ucx_perf_cuda_thread_index<_level>(_threads); \
+        _kernel<_level, _cmd><<<_blocks, _threads, _shared_size>>>(__VA_ARGS__); \
+    } while (0)
+
+#define UCX_PERF_KERNEL_DISPATCH_CMD(_level, _perf, _kernel, ...) \
+    UCX_PERF_SWITCH_CMD(_perf.params.command, UCX_PERF_KERNEL_DISPATCH_CMD_LEVEL, \
+                        _level, _perf, _kernel, __VA_ARGS__);
+
+#define UCX_PERF_KERNEL_DISPATCH(_perf, _kernel, ...) \
+    UCX_PERF_SWITCH_LEVEL(_perf.params.device_level, UCX_PERF_KERNEL_DISPATCH_CMD, \
+                          _perf, _kernel, __VA_ARGS__);
+
+
 class ucx_perf_cuda_test_runner {
 public:
     ucx_perf_cuda_test_runner(ucx_perf_context_t &perf) : m_perf(perf)
@@ -147,11 +159,12 @@ public:
         init_ctx();
 
         m_cpu_ctx->max_outstanding = perf.params.max_outstanding;
+        m_cpu_ctx->device_fc_window = perf.params.device_fc_window;
         m_cpu_ctx->max_iters = perf.max_iter;
         m_cpu_ctx->completed_iters = 0;
         m_cpu_ctx->report_interval_ns = (perf.report_interval == ULONG_MAX) ?
                                         ULONG_MAX :
-                                        ucs_time_to_nsec(perf.report_interval) / 100;
+                                        ucs_time_to_nsec(perf.report_interval);
         m_cpu_ctx->status = UCS_ERR_NOT_IMPLEMENTED;
     }
 
@@ -166,11 +179,16 @@ public:
         ucx_perf_counter_t last_completed = 0;
         ucx_perf_counter_t completed = m_cpu_ctx->completed_iters;
         unsigned thread_count = m_perf.params.device_thread_count;
+        ucs_device_level_t level = m_perf.params.device_level;
+        unsigned msgs_per_iter;
+
+        UCX_PERF_SWITCH_LEVEL(level, UCX_PERF_THREAD_INDEX_SET, thread_count,
+                              msgs_per_iter);
+
         while (true) {
             ucx_perf_counter_t delta = completed - last_completed;
             if (delta > 0) {
                 // TODO: calculate latency percentile on kernel
-                ucx_perf_update(&m_perf, delta, delta * thread_count, msg_length);
+                ucx_perf_update(&m_perf, delta, delta * msgs_per_iter, msg_length);
             } else if (completed >= m_perf.max_iter) {
                 break;
             }
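Note on the dispatch macros: dynamic shared memory is now sized as one request array per "lane", where `ucx_perf_cuda_thread_index<level>(threads)` yields the lane count (every thread at thread level, `threads / 32` at warp level) and `ucx_ceil_div(max_outstanding, fc_window)` yields the slots per lane. A standalone sketch of the footprint calculation; the warp size of 32 and the request size used here are assumptions for illustration only:

```cpp
#include <cstdio>

enum level_t { LEVEL_THREAD, LEVEL_WARP };

// Mirrors ucx_perf_cuda_thread_index: maps a thread id (or thread count)
// to a lane index (or lane count); other levels map to lane 0.
static unsigned lane_count(level_t level, unsigned threads)
{
    return (level == LEVEL_WARP) ? threads / 32 : threads;
}

int main()
{
    unsigned threads = 32, reqs_count = 8;
    unsigned req_size = 64; // sizeof(ucp_device_request_t) stand-in

    // At thread level every thread owns a request array; at warp level
    // the 32 threads of a warp share one, shrinking shared memory 32x.
    printf("thread level: %u bytes\n",
           lane_count(LEVEL_THREAD, threads) * reqs_count * req_size);
    printf("warp level:   %u bytes\n",
           lane_count(LEVEL_WARP, threads) * reqs_count * req_size);
    return 0;
}
```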
diff --git a/src/tools/perf/cuda/ucp_cuda_kernel.cu b/src/tools/perf/cuda/ucp_cuda_kernel.cu
index b7401bb3b53..8a96efe17ea 100644
--- a/src/tools/perf/cuda/ucp_cuda_kernel.cu
+++ b/src/tools/perf/cuda/ucp_cuda_kernel.cu
@@ -19,61 +19,91 @@
 
 class ucp_perf_cuda_request_manager {
 public:
+    using size_type = uint8_t;
+
     __device__
-    ucp_perf_cuda_request_manager(size_t size, ucp_device_request_t *requests)
-        : m_size(size), m_requests(&requests[size * threadIdx.x])
+    ucp_perf_cuda_request_manager(size_type size, size_type fc_window, ucp_device_request_t *requests)
+        : m_size(size),
+          m_fc_window(fc_window),
+          m_reqs_count(ucx_ceil_div(size, fc_window)),
+          m_pending_count(0),
+          m_requests(requests),
+          m_pending_map(0)
     {
         assert(m_size <= CAPACITY);
-        for (size_t i = 0; i < m_size; ++i) {
-            UCX_BIT_RESET(m_pending, i);
+        for (size_type i = 0; i < m_reqs_count; ++i) {
+            m_pending[i] = 0;
         }
     }
 
-    template <ucs_device_level_t level>
-    __device__ ucs_status_t progress(size_t max_completed)
+    template <bool fc, bool reuse>
+    __device__ inline ucs_status_t progress_one(size_type &index)
     {
-        ucs_status_t status = UCS_OK;
-        size_t completed = 0;
-
-        for (size_t i = 0; i < m_size; i++) {
-            if (UCX_BIT_GET(m_pending, i)) {
-                status = ucp_device_progress_req(&m_requests[i]);
-                if (status == UCS_INPROGRESS) {
-                    continue;
-                }
-                UCX_BIT_RESET(m_pending, i);
-                if (status != UCS_OK) {
-                    break;
-                }
-                if (++completed >= max_completed) {
-                    break;
-                }
+        for (size_type i = 0; i < m_reqs_count; i++) {
+            if (!reuse && !UCS_BIT_GET(m_pending_map, i)) {
+                continue;
+            }
+            ucs_status_t status = ucp_device_progress_req(&m_requests[i]);
+            if (status == UCS_INPROGRESS) {
+                continue;
+            }
+            index = i;
+            if constexpr (fc || !reuse) {
+                m_pending_count -= (m_pending[index] - reuse);
+                m_pending[index] = reuse;
+                m_pending_map &= ~UCS_BIT(index);
             }
+            return status;
         }
-
-        return status;
+        return UCS_INPROGRESS;
     }
 
-    __device__ ucp_device_request_t &get_request()
+    template <bool fc>
+    __device__ inline ucs_status_t get_request(ucp_device_request_t *&req,
+                                               ucp_device_flags_t &flags)
     {
-        assert(get_pending_count() < m_size);
-        size_t index = ucx_bitset_ffns(m_pending, m_size, 0);
-        UCX_BIT_SET(m_pending, index);
-        return m_requests[index];
+        size_type index;
+        if (m_pending_count == m_size) {
+            ucs_status_t status;
+            do {
+                status = progress_one<fc, true>(index);
+            } while (status == UCS_INPROGRESS);
+
+            if (ucs_unlikely(status != UCS_OK)) {
+                ucs_device_error("progress failed: %d", status);
+                return status;
+            }
+        } else {
+            index = __ffs(~m_pending_map) - 1;
+            ++m_pending[index];
+            ++m_pending_count;
+        }
+
+        if (fc && (m_pending_count < m_size) && (m_pending[index] < m_fc_window)) {
+            req = nullptr;
+            flags = static_cast<ucp_device_flags_t>(0);
+        } else {
+            req = &m_requests[index];
+            m_pending_map |= UCS_BIT(index);
+        }
+        return UCS_OK;
     }
 
-    __device__ size_t get_pending_count() const
+    __device__ inline size_type get_pending_count() const
     {
-        return ucx_bitset_popcount(m_pending, m_size);
+        return m_pending_count;
     }
 
 private:
-    /* TODO: make it runtime configurable / alloc on host */
-    static const size_t CAPACITY = 128;
+    static const size_type CAPACITY = 32;
 
-    size_t m_size;
+    const size_type m_size;
+    const size_type m_fc_window;
+    const size_type m_reqs_count;
+    size_type m_pending_count;
     ucp_device_request_t *m_requests;
-    uint8_t m_pending[UCX_BITSET_SIZE(CAPACITY)];
+    uint32_t m_pending_map;
+    uint8_t m_pending[CAPACITY];
 };
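Note on the rewritten manager: `m_pending[i]` counts the sends charged against slot `i` and `m_pending_map` marks slots with a tracked request in flight; `get_request()` only hands out a real request once a slot has absorbed `m_fc_window` sends or the manager is saturated. A condensed host-side model of that decision (not the device class itself, just its accounting rule):

```cpp
#include <cstdint>
#include <cstdio>

// Condensed model of ucp_perf_cuda_request_manager::get_request<fc>():
// returns true when the caller must attach a tracked request.
struct fc_model {
    uint8_t window;      // m_fc_window: sends per tracked request
    uint8_t size;        // m_size: max outstanding sends
    uint8_t pending_cnt; // m_pending_count: sends charged to slots
    uint8_t slot_sends;  // m_pending[index] for the chosen slot

    bool need_tracked_request()
    {
        ++slot_sends;
        ++pending_cnt;
        // Track the send if the slot's window is exhausted or the manager
        // is at capacity; otherwise it goes out untracked (req = nullptr).
        return (pending_cnt == size) || (slot_sends >= window);
    }
};

int main()
{
    fc_model m = {4, 32, 0, 0};
    for (int i = 1; i <= 4; ++i) {
        printf("send %d tracked: %d\n", i, m.need_tracked_request());
    }
    return 0; // with window 4, only the 4th send carries a request
}
```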
@@ -85,7 +115,6 @@ struct ucp_perf_cuda_params {
     size_t *lengths;
     uint64_t *counter_send;
     uint64_t *counter_recv;
-    ucp_device_flags_t flags;
 };
 
 class ucp_perf_cuda_params_handler {
@@ -117,14 +146,14 @@ private:
         ucp_device_mem_list_elem_t elems[count];
 
         for (size_t i = 0; i < count; ++i) {
-            elems[i].field_mask = UCP_DEVICE_MEM_LIST_ELEM_FIELD_MEMH |
-                                  UCP_DEVICE_MEM_LIST_ELEM_FIELD_RKEY |
-                                  UCP_DEVICE_MEM_LIST_ELEM_FIELD_LOCAL_ADDR |
-                                  UCP_DEVICE_MEM_LIST_ELEM_FIELD_REMOTE_ADDR |
-                                  UCP_DEVICE_MEM_LIST_ELEM_FIELD_LENGTH;
-            elems[i].memh = perf.ucp.send_memh;
-            elems[i].rkey = perf.ucp.rkey;
-            elems[i].local_addr = UCS_PTR_BYTE_OFFSET(perf.send_buffer, offset);
+            elems[i].field_mask  = UCP_DEVICE_MEM_LIST_ELEM_FIELD_MEMH |
+                                   UCP_DEVICE_MEM_LIST_ELEM_FIELD_RKEY |
+                                   UCP_DEVICE_MEM_LIST_ELEM_FIELD_LOCAL_ADDR |
+                                   UCP_DEVICE_MEM_LIST_ELEM_FIELD_REMOTE_ADDR |
+                                   UCP_DEVICE_MEM_LIST_ELEM_FIELD_LENGTH;
+            elems[i].memh        = perf.ucp.send_memh;
+            elems[i].rkey        = perf.ucp.rkey;
+            elems[i].local_addr  = UCS_PTR_BYTE_OFFSET(perf.send_buffer, offset);
             elems[i].remote_addr = perf.ucp.remote_addr + offset;
             elems[i].length = (i == count - 1) ? ONESIDED_SIGNAL_SIZE :
                               perf.params.msg_size_list[i];
@@ -149,8 +178,7 @@ private:
     void init_elements(const ucx_perf_context_t &perf)
     {
         /* +1 for the counter */
-        size_t count  = perf.params.msg_size_cnt + 1;
-        size_t offset = 0;
+        size_t count = perf.params.msg_size_cnt + 1;
 
         std::vector<unsigned> indices(count);
         std::vector<size_t> local_offsets(count, 0);
@@ -161,31 +189,31 @@ private:
             indices[i] = i;
             lengths[i] = (i == count - 1) ? ONESIDED_SIGNAL_SIZE :
                          perf.params.msg_size_list[i];
-            offset += lengths[i];
         }
 
-        device_clone(&m_params.indices, indices.data(), count);
-        device_clone(&m_params.local_offsets, local_offsets.data(), count);
-        device_clone(&m_params.remote_offsets, remote_offsets.data(), count);
-        device_clone(&m_params.lengths, lengths.data(), count);
+        m_params.indices        = device_vector(indices);
+        m_params.local_offsets  = device_vector(local_offsets);
+        m_params.remote_offsets = device_vector(remote_offsets);
+        m_params.lengths        = device_vector(lengths);
     }
 
     void init_counters(const ucx_perf_context_t &perf)
     {
-        m_params.length = ucx_perf_get_message_size(&perf.params);
-        m_params.counter_send = ucx_perf_cuda_get_sn(perf.send_buffer,
-                                                     m_params.length);
-        m_params.counter_recv = ucx_perf_cuda_get_sn(perf.recv_buffer,
-                                                     m_params.length);
-        m_params.flags = UCP_DEVICE_FLAG_NODELAY;
+        m_params.length       = ucx_perf_get_message_size(&perf.params);
+        m_params.counter_send = ucx_perf_cuda_get_sn(perf.send_buffer,
+                                                     m_params.length);
+        m_params.counter_recv = ucx_perf_cuda_get_sn(perf.recv_buffer,
+                                                     m_params.length);
     }
 
     template <typename T>
-    void device_clone(T **dst, const T *src, size_t count)
+    T* device_vector(const std::vector<T> &src)
    {
-        CUDA_CALL(, UCS_LOG_LEVEL_FATAL, cudaMalloc, dst, count * sizeof(T));
-        CUDA_CALL_ERR(cudaMemcpy, *dst, src, count * sizeof(T),
-                      cudaMemcpyHostToDevice);
+        size_t size = src.size() * sizeof(T);
+        T *dst;
+
+        CUDA_CALL(, UCS_LOG_LEVEL_FATAL, cudaMalloc, &dst, size);
+        CUDA_CALL_ERR(cudaMemcpy, dst, src.data(), size, cudaMemcpyHostToDevice);
+        return dst;
    }
 
     ucp_perf_cuda_params m_params;
@@ -193,8 +221,9 @@ private:
 
 template <ucx_perf_cmd_t cmd>
 UCS_F_DEVICE ucs_status_t
-ucp_perf_cuda_send_nbx(ucp_perf_cuda_params &params, ucx_perf_counter_t idx,
-                       ucp_device_request_t &req)
+ucp_perf_cuda_send_async(const ucp_perf_cuda_params &params,
+                         ucx_perf_counter_t idx, ucp_device_request_t *req,
+                         ucp_device_flags_t flags = UCP_DEVICE_FLAG_NODELAY)
 {
     switch (cmd) {
     case UCX_PERF_CMD_PUT_SINGLE:
@@ -202,19 +231,21 @@ ucp_perf_cuda_send_async(const ucp_perf_cuda_params &params,
         *params.counter_send = idx + 1;
         return ucp_device_put_single(params.mem_list, params.indices[0], 0, 0,
-                                     params.length +
-                                     ONESIDED_SIGNAL_SIZE,
-                                     0, params.flags, &req);
+                                     params.length + ONESIDED_SIGNAL_SIZE,
+                                     0, flags, req);
     case UCX_PERF_CMD_PUT_MULTI:
-        return ucp_device_put_multi(params.mem_list, 1, 0, params.flags,
-                                    &req);
+        return ucp_device_put_multi(params.mem_list, 1, 0, flags, req);
     case UCX_PERF_CMD_PUT_PARTIAL: {
         unsigned counter_index = params.mem_list->mem_list_length - 1;
-        return ucp_device_put_multi_partial(
-                params.mem_list, params.indices, counter_index,
-                params.local_offsets, params.remote_offsets, params.lengths,
-                counter_index, 1, 0, 0, params.flags, &req);
-    }
+        return ucp_device_put_multi_partial(params.mem_list,
+                                            params.indices,
+                                            counter_index,
+                                            params.local_offsets,
+                                            params.remote_offsets,
+                                            params.lengths,
+                                            counter_index, 1, 0, 0,
+                                            flags, req);
+    }
     }
 
     return UCS_ERR_INVALID_PARAM;
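Note: `ucp_perf_cuda_send_async()` now takes the request pointer and flags from its caller. When the flow-control window is still open, the request manager supplies `req == nullptr` with zero flags, i.e. a fire-and-forget send, and `ucp_perf_cuda_send_sync()` below returns early for that case. A sketch of the caller-side pattern; `do_send()` is a placeholder for the `ucp_device_put_*` calls, and the flag value is illustrative:

```cpp
#include <cstddef>
#include <cstdio>

// Placeholder for ucp_device_put_*: a null req means no completion tracking.
static int do_send(void *req, unsigned flags)
{
    printf("send: %s (flags=%u)\n", req ? "tracked" : "fire-and-forget", flags);
    return 0;
}

// Caller-side pattern used by the bw iteration: the request manager decides
// per send whether a completion must be tracked.
static int send_iter(bool window_open, void *slot)
{
    void    *req   = window_open ? NULL : slot;
    unsigned flags = window_open ? 0 : 1; // 1 stands in for NODELAY
    return do_send(req, flags);
}

int main()
{
    char slot;
    send_iter(true, &slot);  // window open: untracked send
    send_iter(false, &slot); // window exhausted: tracked send
    return 0;
}
```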
@@ -223,76 +254,118 @@ ucp_perf_cuda_send_nbx(ucp_perf_cuda_params &params, ucx_perf_counter_t idx,
 
 template <ucx_perf_cmd_t cmd>
 UCS_F_DEVICE ucs_status_t
 ucp_perf_cuda_send_sync(ucp_perf_cuda_params &params, ucx_perf_counter_t idx,
-                        ucp_device_request_t &req)
+                        ucp_device_request_t *req)
 {
-    ucs_status_t status = ucp_perf_cuda_send_nbx<cmd>(params, idx, req);
+    ucs_status_t status = ucp_perf_cuda_send_async<cmd>(
+            params, idx, req, UCP_DEVICE_FLAG_NODELAY);
     if (UCS_STATUS_IS_ERR(status)) {
         return status;
     }
 
+    if (nullptr == req) {
+        return UCS_OK;
+    }
+
     do {
-        status = ucp_device_progress_req(&req);
+        status = ucp_device_progress_req(req);
     } while (status == UCS_INPROGRESS);
 
     return status;
 }
 
-template <ucs_device_level_t level, ucx_perf_cmd_t cmd>
-__global__ void
-ucp_perf_cuda_put_multi_bw_kernel(ucx_perf_cuda_context &ctx,
-                                  ucp_perf_cuda_params params)
+template <ucx_perf_cmd_t cmd, bool fc>
+UCS_F_DEVICE ucs_status_t
+ucp_perf_cuda_put_bw_iter(const ucp_perf_cuda_params &params,
+                          ucp_perf_cuda_request_manager &req_mgr,
+                          ucx_perf_counter_t idx)
 {
-    // TODO: use thread-local memory once we support it
-    extern __shared__ ucp_device_request_t requests[];
-    ucx_perf_cuda_time_t last_report_time = ucx_perf_cuda_get_time_ns();
-    ucx_perf_counter_t max_iters = ctx.max_iters;
-    ucs_status_t status = UCS_OK;
-    ucp_perf_cuda_request_manager request_mgr(ctx.max_outstanding, requests);
+    ucp_device_flags_t flags = UCP_DEVICE_FLAG_NODELAY;
+    ucp_device_request_t *req;
 
-    for (ucx_perf_counter_t idx = 0; idx < max_iters; idx++) {
-        while (request_mgr.get_pending_count() >= ctx.max_outstanding) {
-            status = request_mgr.progress<level>(1);
-            if (UCS_STATUS_IS_ERR(status)) {
-                ucs_device_error("progress failed: %d", status);
-                goto out;
-            }
-        }
+    ucs_status_t status = req_mgr.get_request<fc>(req, flags);
+    if (ucs_unlikely(status != UCS_OK)) {
+        return status;
+    }
 
-        ucp_device_request_t &req = request_mgr.get_request();
-        status = ucp_perf_cuda_send_nbx<cmd>(params, idx, req);
-        if (UCS_STATUS_IS_ERR(status)) {
+    return ucp_perf_cuda_send_async<cmd>(params, idx, req, flags);
+}
+
+template <ucx_perf_cmd_t cmd, bool fc>
+UCS_F_DEVICE ucs_status_t
+ucp_perf_cuda_put_bw_kernel_impl(ucx_perf_cuda_context &ctx,
+                                 const ucp_perf_cuda_params &params,
+                                 ucp_perf_cuda_request_manager &req_mgr)
+{
+    ucx_perf_counter_t max_iters = ctx.max_iters;
+    ucx_perf_cuda_reporter reporter(ctx);
+    ucs_status_t status;
+
+    for (ucx_perf_counter_t idx = 0; idx < (max_iters - 1); idx++) {
+        status = ucp_perf_cuda_put_bw_iter<cmd, fc>(params, req_mgr, idx);
+        if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
             ucs_device_error("send failed: %d", status);
-            goto out;
+            return status;
         }
 
-        ucx_perf_cuda_update_report(ctx, idx + 1, max_iters, last_report_time);
+        reporter.update_report(idx + 1);
         __syncthreads();
     }
 
-    while (request_mgr.get_pending_count() > 0) {
-        status = request_mgr.progress<level>(max_iters);
+    /* Last iteration */
+    status = ucp_perf_cuda_put_bw_iter<cmd, fc>(params, req_mgr,
+                                                max_iters);
+    if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
+        ucs_device_error("final send failed: %d", status);
+        return status;
+    }
+
+    while (req_mgr.get_pending_count() > 0) {
+        uint8_t index;
+        status = req_mgr.progress_one<fc, false>(index);
         if (UCS_STATUS_IS_ERR(status)) {
             ucs_device_error("final progress failed: %d", status);
-            goto out;
+            return status;
         }
     }
 
-out:
-    ctx.status = status;
+    reporter.update_report(max_iters);
+    return UCS_OK;
 }
 
 template <ucs_device_level_t level, ucx_perf_cmd_t cmd>
 __global__ void
-ucp_perf_cuda_put_multi_latency_kernel(ucx_perf_cuda_context &ctx,
-                                       ucp_perf_cuda_params params,
-                                       bool is_sender)
+ucp_perf_cuda_put_bw_kernel(ucx_perf_cuda_context &ctx,
+                            ucp_perf_cuda_params params)
 {
-    // TODO: use thread-local memory once we support it
-    extern __shared__ ucp_device_request_t requests[];
-    ucp_device_request_t &req = requests[threadIdx.x];
-    ucx_perf_cuda_time_t last_report_time = ucx_perf_cuda_get_time_ns();
-    ucx_perf_counter_t max_iters = ctx.max_iters;
-    ucs_status_t status = UCS_OK;
+    extern __shared__ ucp_device_request_t shared_requests[];
+    unsigned thread_index = ucx_perf_cuda_thread_index<level>(threadIdx.x);
+    unsigned reqs_count = ucx_ceil_div(ctx.max_outstanding,
+                                       ctx.device_fc_window);
+    ucp_device_request_t *reqs = &shared_requests[reqs_count * thread_index];
+
+    ucp_perf_cuda_request_manager req_mgr(ctx.max_outstanding,
+                                          ctx.device_fc_window, reqs);
+
+    if (ctx.device_fc_window > 1) {
+        ctx.status = ucp_perf_cuda_put_bw_kernel_impl<cmd, true>(
+                ctx, params, req_mgr);
+    } else {
+        ctx.status = ucp_perf_cuda_put_bw_kernel_impl<cmd, false>(
+                ctx, params, req_mgr);
+    }
+}
+
+template <ucs_device_level_t level, ucx_perf_cmd_t cmd>
+__global__ void
+ucp_perf_cuda_put_latency_kernel(ucx_perf_cuda_context &ctx,
+                                 ucp_perf_cuda_params params, bool is_sender)
+{
+    extern __shared__ ucp_device_request_t shared_requests[];
+    ucx_perf_counter_t max_iters = ctx.max_iters;
+    ucs_status_t status = UCS_OK;
+    unsigned thread_index = ucx_perf_cuda_thread_index<level>(threadIdx.x);
+    ucp_device_request_t *req = &shared_requests[thread_index];
+    ucx_perf_cuda_reporter reporter(ctx);
 
     for (ucx_perf_counter_t idx = 0; idx < max_iters; idx++) {
         if (is_sender) {
@@ -311,16 +384,15 @@ ucp_perf_cuda_put_multi_latency_kernel(ucx_perf_cuda_context &ctx,
             }
         }
 
-        ucx_perf_cuda_update_report(ctx, idx + 1, max_iters, last_report_time);
-        __syncthreads();
+        reporter.update_report(idx + 1);
     }
 
     ctx.status = status;
 }
 
 __global__ void
-ucp_perf_cuda_wait_multi_bw_kernel(ucx_perf_cuda_context &ctx,
-                                   ucp_perf_cuda_params params)
+ucp_perf_cuda_wait_bw_kernel(ucx_perf_cuda_context &ctx,
+                             ucp_perf_cuda_params params)
 {
     // TODO: we can use ucp_device_counter_read, but it adds latency
     volatile uint64_t *sn = params.counter_recv;
@@ -350,8 +422,9 @@ public:
         ucp_perf_barrier(&m_perf);
         ucx_perf_test_start_clock(&m_perf);
 
-        UCX_KERNEL_DISPATCH(m_perf, ucp_perf_cuda_put_multi_latency_kernel,
-                            *m_gpu_ctx, params_handler.get_params(), my_index);
+        UCX_PERF_KERNEL_DISPATCH(m_perf, ucp_perf_cuda_put_latency_kernel,
+                                 *m_gpu_ctx, params_handler.get_params(),
+                                 my_index);
         CUDA_CALL_RET(UCS_ERR_NO_DEVICE, cudaGetLastError);
 
         wait_for_kernel();
@@ -372,12 +445,12 @@ public:
         ucx_perf_test_start_clock(&m_perf);
 
         if (my_index == 1) {
-            UCX_KERNEL_DISPATCH(m_perf, ucp_perf_cuda_put_multi_bw_kernel,
-                                *m_gpu_ctx, params_handler.get_params());
+            UCX_PERF_KERNEL_DISPATCH(m_perf, ucp_perf_cuda_put_bw_kernel,
+                                     *m_gpu_ctx, params_handler.get_params());
             CUDA_CALL_RET(UCS_ERR_NO_DEVICE, cudaGetLastError);
             wait_for_kernel();
         } else if (my_index == 0) {
-            ucp_perf_cuda_wait_multi_bw_kernel<<<1, 1>>>(
+            ucp_perf_cuda_wait_bw_kernel<<<1, 1>>>(
                     *m_gpu_ctx, params_handler.get_params());
         }
diff --git a/src/tools/perf/perftest.c b/src/tools/perf/perftest.c
index 7da9fea02ed..94893a76918 100644
--- a/src/tools/perf/perftest.c
+++ b/src/tools/perf/perftest.c
@@ -90,13 +90,13 @@ test_type_t tests[] = {
      "put multi bandwidth", "overhead", 32},
 
     {"ucp_put_multi_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_PUT_MULTI,
      UCX_PERF_TEST_TYPE_PINGPONG,
-     "put multi latency", "latency", 32},
+     "put multi latency", "latency", 1},
 
     {"ucp_put_partial_bw", UCX_PERF_API_UCP, UCX_PERF_CMD_PUT_PARTIAL,
      UCX_PERF_TEST_TYPE_STREAM_UNI,
      "put partial bandwidth", "overhead", 32},
 
     {"ucp_put_partial_lat", UCX_PERF_API_UCP, UCX_PERF_CMD_PUT_PARTIAL,
      UCX_PERF_TEST_TYPE_PINGPONG,
-     "put partial latency", "latency", 32},
+     "put partial latency", "latency", 1},
 
     {"ucp_get", UCX_PERF_API_UCP, UCX_PERF_CMD_GET,
      UCX_PERF_TEST_TYPE_STREAM_UNI,
      "get latency / bandwidth / message rate", "latency", 1},
@@ -223,6 +223,7 @@ ucs_status_t init_test_params(perftest_params_t *params)
     params->super.ucp.am_hdr_size     = 0;
     params->super.device_thread_count = 1;
     params->super.device_block_count  = 1;
+    params->super.device_fc_window    = UCP_PERF_FC_WINDOW_DEFAULT;
     params->super.ucp.is_daemon_mode  = 0;
     params->super.ucp.dmn_local_addr  = empty_addr;
     params->super.ucp.dmn_remote_addr = empty_addr;
diff --git a/src/tools/perf/perftest.h b/src/tools/perf/perftest.h
index d61b44108fa..c8838efd3dd 100644
--- a/src/tools/perf/perftest.h
+++ b/src/tools/perf/perftest.h
@@ -19,7 +19,7 @@
 #endif
 
 #define TL_RESOURCE_NAME_NONE "<none>"
-#define TEST_PARAMS_ARGS      "t:n:s:W:O:w:D:i:H:oSCIqM:r:E:T:d:x:A:BUem:a:R:lyzL:"
+#define TEST_PARAMS_ARGS      "t:n:s:W:O:w:D:i:H:oSCIqM:r:E:T:d:x:A:BUem:a:R:lyzL:F:"
 #define TEST_ID_UNDEFINED     -1
 #define DEFAULT_DAEMON_PORT   1338
diff --git a/src/tools/perf/perftest_params.c b/src/tools/perf/perftest_params.c
index e24a0be5507..9098cdaed2f 100644
--- a/src/tools/perf/perftest_params.c
+++ b/src/tools/perf/perftest_params.c
@@ -68,7 +68,7 @@ static void usage(const struct perftest_context *ctx, const char *program)
     }
     printf("\n");
     printf("     -a <device_type>[,<device_id>]\n");
-    printf("     Accelerator device type and device id to use for running the test.\n");
+    printf("        Accelerator device type and device id to use for running the test.\n");
     printf("        device id is optional, it corresponds to the index of\n");
     printf("        the device in the list of available devices\n");
     printf("     -L <level>  device cooperation level for gdaki (thread)\n");
@@ -76,6 +76,10 @@ static void usage(const struct perftest_context *ctx, const char *program)
     printf("        warp  - warp level\n");
     printf("        block - block level\n");
     printf("        grid  - grid level\n");
+    printf("     -F <window>  flow control window size for device tests (%u).\n",
+           ctx->params.super.device_fc_window);
+    printf("        This option defines the number of sends after which a single\n");
+    printf("        flow control request is issued.\n");
     printf("     -s <size>  list of scatter-gather sizes for single message (%zu)\n",
            ctx->params.super.msg_size_list[0]);
     printf("        for example: \"-s 16,48,8192,8192,14\"\n");
@@ -721,6 +725,9 @@ ucs_status_t parse_test_params(perftest_params_t *params, char opt,
                          &params->super.recv_device);
     case 'L':
         return parse_device_level(opt_arg, &params->super.device_level);
+    case 'F':
+        return parse_int(opt_arg, &params->super.device_fc_window,
+                         "device flow control window size", 1, INT_MAX);
     case 'y':
         params->super.flags |= UCX_PERF_TEST_FLAG_AM_RECV_COPY;
         return UCS_OK;
diff --git a/test/gtest/common/test_perf.cc b/test/gtest/common/test_perf.cc
index 93d3171311d..7afb5a02715 100644
--- a/test/gtest/common/test_perf.cc
+++ b/test/gtest/common/test_perf.cc
@@ -219,6 +219,7 @@ void test_perf::test_params_init(const test_spec &test,
     params.device_thread_count = 1;
     params.device_block_count  = 1;
     params.device_level        = UCS_DEVICE_LEVEL_THREAD;
+    params.device_fc_window    = UCP_PERF_FC_WINDOW_DEFAULT;
     params.percentile_rank     = 50.0;
     memset(params.uct.md_name, 0, sizeof(params.uct.md_name));
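Closing note: the new `-F` value is parsed with perftest's `parse_int()` and bounded to [1, INT_MAX]; `-F 1` reproduces the old behavior of tracking every send. An illustrative stand-in for the range check (`parse_window()` is hypothetical, not the tool's helper):

```cpp
#include <climits>
#include <cstdio>
#include <cstdlib>

// Illustrative stand-in for perftest's parse_int() range handling.
static int parse_window(const char *arg, int *out)
{
    long v = strtol(arg, NULL, 10);
    if ((v < 1) || (v > INT_MAX)) {
        fprintf(stderr, "device flow control window out of range: %s\n", arg);
        return -1;
    }
    *out = (int)v;
    return 0;
}

int main()
{
    int w;
    if (parse_window("8", &w) == 0) {
        printf("-F %d accepted; -F 1 disables untracked sends\n", w);
    }
    return 0;
}
```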