From e778a99af62b9fbef40876356353a33d4104c720 Mon Sep 17 00:00:00 2001 From: Ovidiu Mara Date: Tue, 7 Oct 2025 19:00:57 +0200 Subject: [PATCH 1/6] Initialize objects with zero Signed-off-by: Ovidiu Mara --- .ci/jenkins/lib/build-matrix.yaml | 1 - src/core/nixl_listener.cpp | 8 +- src/infra/nixl_descriptors.cpp | 20 +++-- src/plugins/ucx/ucx_backend.cpp | 1 + src/utils/stream/metadata_stream.cpp | 38 ++++---- src/utils/ucx/gpu_xfer_req_h.cpp | 44 ++++----- src/utils/ucx/ucx_utils.cpp | 128 +++++++++++++-------------- 7 files changed, 121 insertions(+), 119 deletions(-) diff --git a/.ci/jenkins/lib/build-matrix.yaml b/.ci/jenkins/lib/build-matrix.yaml index daed884c7..304ac34ce 100644 --- a/.ci/jenkins/lib/build-matrix.yaml +++ b/.ci/jenkins/lib/build-matrix.yaml @@ -48,7 +48,6 @@ env: NIXL_INSTALL_DIR: /opt/nixl TEST_TIMEOUT: 30 NPROC: "16" - UCX_TLS: "^shm" steps: - name: Build diff --git a/src/core/nixl_listener.cpp b/src/core/nixl_listener.cpp index 0dc5e76f3..c117ae149 100644 --- a/src/core/nixl_listener.cpp +++ b/src/core/nixl_listener.cpp @@ -35,10 +35,12 @@ namespace { static const std::string invalid_label = "invalid"; -int connectToIP(std::string ip_addr, int port) { +int +connectToIP(std::string ip_addr, int port) { - struct sockaddr_in listenerAddr; - listenerAddr.sin_port = htons(port); + struct sockaddr_in listenerAddr {}; + + listenerAddr.sin_port = htons(port); listenerAddr.sin_family = AF_INET; if (inet_pton(AF_INET, ip_addr.c_str(), &listenerAddr.sin_addr) <= 0) { diff --git a/src/infra/nixl_descriptors.cpp b/src/infra/nixl_descriptors.cpp index ecb005389..7ad979952 100644 --- a/src/infra/nixl_descriptors.cpp +++ b/src/infra/nixl_descriptors.cpp @@ -112,19 +112,23 @@ nixlBlobDesc::nixlBlobDesc(const nixlBasicDesc &desc, } nixlBlobDesc::nixlBlobDesc(const nixl_blob_t &blob) { + if (blob.size() < sizeof(nixlBasicDesc)) { + NIXL_ERROR << "Blob size is less than the size of nixlBasicDesc"; + addr = 0; + len = 0; + devId = 0; + metaInfo.resize(0); + return; + } size_t meta_size = blob.size() - sizeof(nixlBasicDesc); if (meta_size > 0) { metaInfo.resize(meta_size); blob.copy(reinterpret_cast(this), sizeof(nixlBasicDesc)); - blob.copy(reinterpret_cast(&metaInfo[0]), - meta_size, sizeof(nixlBasicDesc)); + blob.copy(reinterpret_cast(&metaInfo[0]), meta_size, sizeof(nixlBasicDesc)); } else if (meta_size == 0) { - blob.copy(reinterpret_cast(this), sizeof(nixlBasicDesc)); - } else { // Error - addr = 0; - len = 0; - devId = 0; - metaInfo.resize(0); + blob.copy(reinterpret_cast(this), sizeof(nixlBasicDesc)); + } else { + NIXL_ASSERT(false) << "Negative meta size"; } } diff --git a/src/plugins/ucx/ucx_backend.cpp b/src/plugins/ucx/ucx_backend.cpp index 37bac433a..3d9722ad9 100644 --- a/src/plugins/ucx/ucx_backend.cpp +++ b/src/plugins/ucx/ucx_backend.cpp @@ -1811,6 +1811,7 @@ nixl_status_t nixlUcxEngine::genNotif(const std::string &remote_agent, const std switch(ret) { case NIXL_IN_PROG: /* do not track the request */ + // TODO: why are we releasing a request that is still in progress? getWorker(getWorkerId())->reqRelease(req); case NIXL_SUCCESS: break; diff --git a/src/utils/stream/metadata_stream.cpp b/src/utils/stream/metadata_stream.cpp index 5bbd73ff7..a38d72252 100644 --- a/src/utils/stream/metadata_stream.cpp +++ b/src/utils/stream/metadata_stream.cpp @@ -132,21 +132,21 @@ std::string nixlMDStreamListener::recvFromClient() { return recvData; } -void nixlMDStreamListener::recvFromClients(int clientSocket) { - char buffer[RECV_BUFFER_SIZE]; - int bytes_read; - - while ((bytes_read = recv(clientSocket, buffer, - sizeof(buffer), 0)) > 0) { - buffer[bytes_read] = '\0'; - // Return ack - std::string ack = "Message received"; - send(clientSocket, ack.c_str(), ack.size(), 0); - std::string recv_message(buffer); - NIXL_DEBUG << "Message Received" << recv_message; - } - close(clientSocket); - NIXL_DEBUG << "Client Disconnected"; +void +nixlMDStreamListener::recvFromClients(int clientSocket) { + char buffer[RECV_BUFFER_SIZE + 1]; + int bytes_read; + + while ((bytes_read = recv(clientSocket, buffer, sizeof(buffer), 0)) > 0) { + buffer[bytes_read] = '\0'; + // Return ack + std::string ack = "Message received"; + send(clientSocket, ack.c_str(), ack.size(), 0); + std::string recv_message(buffer); + NIXL_DEBUG << "Message Received" << recv_message; + } + close(clientSocket); + NIXL_DEBUG << "Client Disconnected"; } void nixlMDStreamListener::startListenerForClient() { @@ -169,12 +169,14 @@ nixlMDStreamClient::~nixlMDStreamClient() { closeStream(); } -bool nixlMDStreamClient::setupClient() { +bool +nixlMDStreamClient::setupClient() { setupStream(); - struct sockaddr_in listenerAddr; + struct sockaddr_in listenerAddr {}; + listenerAddr.sin_family = AF_INET; - listenerAddr.sin_port = htons(port); + listenerAddr.sin_port = htons(port); if (inet_pton(AF_INET, listenerAddress.c_str(), &listenerAddr.sin_addr) <= 0) { diff --git a/src/utils/ucx/gpu_xfer_req_h.cpp b/src/utils/ucx/gpu_xfer_req_h.cpp index a75974f09..25ccfe2b0 100644 --- a/src/utils/ucx/gpu_xfer_req_h.cpp +++ b/src/utils/ucx/gpu_xfer_req_h.cpp @@ -54,31 +54,33 @@ createGpuXferReq(const nixlUcxEp &ep, ucp_elements.reserve(local_mems.size()); for (size_t i = 0; i < local_mems.size(); i++) { - ucp_device_mem_list_elem_t ucp_elem; - ucp_elem.field_mask = UCP_DEVICE_MEM_LIST_ELEM_FIELD_MEMH | - UCP_DEVICE_MEM_LIST_ELEM_FIELD_RKEY | UCP_DEVICE_MEM_LIST_ELEM_FIELD_LOCAL_ADDR | - UCP_DEVICE_MEM_LIST_ELEM_FIELD_REMOTE_ADDR | UCP_DEVICE_MEM_LIST_ELEM_FIELD_LENGTH; - ucp_elem.memh = local_mems[i].getMemh(); - ucp_elem.rkey = remote_rkeys[i]->get(); - ucp_elem.local_addr = local_mems[i].getBase(); - ucp_elem.remote_addr = remote_addrs[i]; - ucp_elem.length = local_mems[i].getSize(); + ucp_device_mem_list_elem_t ucp_elem = { + .field_mask = UCP_DEVICE_MEM_LIST_ELEM_FIELD_MEMH | + UCP_DEVICE_MEM_LIST_ELEM_FIELD_RKEY | UCP_DEVICE_MEM_LIST_ELEM_FIELD_LOCAL_ADDR | + UCP_DEVICE_MEM_LIST_ELEM_FIELD_REMOTE_ADDR | UCP_DEVICE_MEM_LIST_ELEM_FIELD_LENGTH, + .memh = local_mems[i].getMemh(), + .rkey = remote_rkeys[i]->get(), + .local_addr = local_mems[i].getBase(), + .remote_addr = remote_addrs[i], + .length = local_mems[i].getSize(), + }; ucp_elements.push_back(ucp_elem); } - ucp_device_mem_list_params_t params; - params.field_mask = UCP_DEVICE_MEM_LIST_PARAMS_FIELD_ELEMENTS | +ucp_device_mem_list_params_t params = { + .field_mask = UCP_DEVICE_MEM_LIST_PARAMS_FIELD_ELEMENTS | UCP_DEVICE_MEM_LIST_PARAMS_FIELD_ELEMENT_SIZE | - UCP_DEVICE_MEM_LIST_PARAMS_FIELD_NUM_ELEMENTS; - params.elements = ucp_elements.data(); - params.element_size = sizeof(ucp_device_mem_list_elem_t); - params.num_elements = ucp_elements.size(); - - ucp_device_mem_list_handle_h ucx_handle; - ucs_status_t ucs_status = ucp_device_mem_list_create(ep.getEp(), ¶ms, &ucx_handle); - if (ucs_status != UCS_OK) { - throw std::runtime_error(std::string("Failed to create device memory list: ") + - ucs_status_string(ucs_status)); + UCP_DEVICE_MEM_LIST_PARAMS_FIELD_NUM_ELEMENTS, + .elements = ucp_elements.data(), + .element_size = sizeof(ucp_device_mem_list_elem_t), + .num_elements = ucp_elements.size(), +}; + +ucp_device_mem_list_handle_h ucx_handle; +ucs_status_t ucs_status = ucp_device_mem_list_create(ep.getEp(), ¶ms, &ucx_handle); +if (ucs_status != UCS_OK) { + throw std::runtime_error(std::string("Failed to create device memory list: ") + + ucs_status_string(ucs_status)); } NIXL_DEBUG << "Created device memory list handle with " << local_mems.size() << " elements"; diff --git a/src/utils/ucx/ucx_utils.cpp b/src/utils/ucx/ucx_utils.cpp index 1bbac53c8..50b1b7fc9 100644 --- a/src/utils/ucx/ucx_utils.cpp +++ b/src/utils/ucx/ucx_utils.cpp @@ -155,13 +155,13 @@ void nixlUcxEp::setState(nixl_ucx_ep_state_t new_state) nixl_status_t nixlUcxEp::closeImpl(ucp_ep_close_flags_t flags) { - ucs_status_ptr_t request = nullptr; + ucs_status_ptr_t request = nullptr; ucp_request_param_t req_param = { .op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS, - .flags = flags + .flags = flags, }; - switch(state) { + switch (state) { case NIXL_UCX_EP_STATE_NULL: case NIXL_UCX_EP_STATE_DISCONNECTED: // The EP has not been connected, or already disconnected. @@ -192,21 +192,19 @@ nixlUcxEp::closeImpl(ucp_ep_close_flags_t flags) std::terminate(); } -nixlUcxEp::nixlUcxEp(ucp_worker_h worker, void* addr, - ucp_err_handling_mode_t err_handling_mode) -{ - ucp_ep_params_t ep_params; - nixl_status_t status; - - ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | - UCP_EP_PARAM_FIELD_ERR_HANDLER | - UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE; - ep_params.err_mode = err_handling_mode; - ep_params.err_handler.cb = err_cb_wrapper; - ep_params.err_handler.arg = reinterpret_cast(this); - ep_params.address = reinterpret_cast(addr); - - status = ucx_status_to_nixl(ucp_ep_create(worker, &ep_params, &eph)); +nixlUcxEp::nixlUcxEp(ucp_worker_h worker, void *addr, ucp_err_handling_mode_t err_handling_mode) { + ucp_ep_params_t ep_params = { + .field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | UCP_EP_PARAM_FIELD_ERR_HANDLER | + UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE, + .address = reinterpret_cast(addr), + .err_mode = err_handling_mode, + .err_handler = + { + .cb = err_cb_wrapper, + .arg = reinterpret_cast(this), + }, + }; + nixl_status_t status = ucx_status_to_nixl(ucp_ep_create(worker, &ep_params, &eph)); if (status == NIXL_SUCCESS) setState(NIXL_UCX_EP_STATE_CONNECTED); else @@ -246,10 +244,10 @@ nixl_status_t nixlUcxEp::sendAm(unsigned msg_id, return status; } - ucp_request_param_t param = {0}; - - param.op_attr_mask |= UCP_OP_ATTR_FIELD_FLAGS; - param.flags = flags; + ucp_request_param_t param = { + .op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS, + .flags = flags, + }; ucs_status_ptr_t request = ucp_am_send_nbx(eph, msg_id, hdr, hdr_len, buffer, len, ¶m); if (UCS_PTR_IS_PTR(request)) { @@ -345,29 +343,27 @@ nixl_status_t nixlUcxEp::estimateCost(size_t size, return NIXL_SUCCESS; } -nixl_status_t nixlUcxEp::flushEp(nixlUcxReq &req) -{ - ucp_request_param_t param; - ucs_status_ptr_t request; - - param.op_attr_mask = 0; - request = ucp_ep_flush_nbx(eph, ¶m); +nixl_status_t +nixlUcxEp::flushEp(nixlUcxReq &req) { + ucp_request_param_t param{}; + ucs_status_ptr_t request = ucp_ep_flush_nbx(eph, ¶m); if (UCS_PTR_IS_PTR(request)) { - req = (void*)request; + req = (void *)request; return NIXL_IN_PROG; } return ucx_status_to_nixl(UCS_PTR_STATUS(request)); } -bool nixlUcxMtLevelIsSupported(const nixl_ucx_mt_t mt_type) noexcept -{ - ucp_lib_attr_t attr; - attr.field_mask = UCP_LIB_ATTR_FIELD_MAX_THREAD_LEVEL; +bool +nixlUcxMtLevelIsSupported(const nixl_ucx_mt_t mt_type) noexcept { + ucp_lib_attr_t attr = { + .field_mask = UCP_LIB_ATTR_FIELD_MAX_THREAD_LEVEL, + }; ucp_lib_query(&attr); - switch(mt_type) { + switch (mt_type) { case nixl_ucx_mt_t::SINGLE: return attr.max_thread_level >= UCS_THREAD_MODE_SERIALIZED; case nixl_ucx_mt_t::CTX: @@ -384,9 +380,8 @@ nixlUcxContext::nixlUcxContext(std::vector devs, nixlUcxContext::req_cb_t fini_cb, bool prog_thread, unsigned long num_workers, - nixl_thread_sync_t sync_mode) -{ - ucp_params_t ucp_params; + nixl_thread_sync_t sync_mode) { + ucp_params_t ucp_params{}; // With strict synchronization model nixlAgent serializes access to backends, with more // permissive models backends need to account for concurrent access and ensure their internal @@ -478,11 +473,8 @@ namespace std::terminate(); } - struct nixlUcpWorkerParams - : ucp_worker_params_t - { - explicit nixlUcpWorkerParams(const nixl_ucx_mt_t t) - { + struct nixlUcpWorkerParams : ucp_worker_params_t { + explicit nixlUcpWorkerParams(const nixl_ucx_mt_t t) : ucp_worker_params_t{} { field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE; thread_mode = toUcsThreadModeChecked(t); } @@ -509,11 +501,12 @@ nixlUcxWorker::nixlUcxWorker(const nixlUcxContext &ctx, ucp_err_handling_mode_t : worker(createUcpWorker(ctx), &ucp_worker_destroy), err_handling_mode_(err_handling_mode) {} -std::string nixlUcxWorker::epAddr() -{ - ucp_worker_attr_t wattr; +std::string +nixlUcxWorker::epAddr() { + ucp_worker_attr_t wattr = { + .field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS, + }; - wattr.field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS; const ucs_status_t status = ucp_worker_query(worker.get(), &wattr); if (UCS_OK != status) { throw std::runtime_error(std::string("Unable to query UCX worker address: ") + @@ -560,12 +553,12 @@ int nixlUcxContext::memReg(void *addr, size_t size, nixlUcxMem &mem, nixl_mem_t } if (nixl_mem_type == nixl_mem_t::VRAM_SEG) { - ucp_mem_attr_t attr; - attr.field_mask = UCP_MEM_ATTR_FIELD_MEM_TYPE; + ucp_mem_attr_t attr = { + .field_mask = UCP_MEM_ATTR_FIELD_MEM_TYPE, + }; status = ucp_mem_query(mem.memh, &attr); if (status != UCS_OK) { - NIXL_ERROR << absl::StrFormat("Failed to ucp_mem_query: %s", - ucs_status_string(status)); + NIXL_ERROR << absl::StrFormat("Failed to ucp_mem_query: %s", ucs_status_string(status)); ucp_mem_unmap(ctx, mem.memh); return -1; } @@ -607,12 +600,12 @@ constexpr std::string_view ucxGpuDeviceApiUnsupported{ #endif - size_t nixlUcxContext::getGpuSignalSize() const { #ifdef HAVE_UCX_GPU_DEVICE_API - ucp_context_attr_t attr; - attr.field_mask = UCP_ATTR_FIELD_DEVICE_COUNTER_SIZE; + ucp_context_attr_t attr = { + .field_mask = UCP_ATTR_FIELD_DEVICE_COUNTER_SIZE, + }; ucs_status_t query_status = ucp_context_query(ctx, &attr); if (query_status != UCS_OK) { @@ -631,17 +624,15 @@ nixlUcxContext::getGpuSignalSize() const { * Active message handling * =========================================== */ -int nixlUcxWorker::regAmCallback(unsigned msg_id, ucp_am_recv_callback_t cb, void* arg) -{ - ucp_am_handler_param_t params = {0}; - - params.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID | - UCP_AM_HANDLER_PARAM_FIELD_CB | - UCP_AM_HANDLER_PARAM_FIELD_ARG; - - params.id = msg_id; - params.cb = cb; - params.arg = arg; +int +nixlUcxWorker::regAmCallback(unsigned msg_id, ucp_am_recv_callback_t cb, void *arg) { + ucp_am_handler_param_t params = { + .field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID | UCP_AM_HANDLER_PARAM_FIELD_CB | + UCP_AM_HANDLER_PARAM_FIELD_ARG, + .id = msg_id, + .cb = cb, + .arg = arg, + }; const ucs_status_t status = ucp_worker_set_am_recv_handler(worker.get(), ¶ms); @@ -706,9 +697,10 @@ nixlUcxWorker::prepGpuSignal([[maybe_unused]] const nixlUcxMem &mem, throw std::invalid_argument("Signal pointer cannot be null"); } - ucp_device_counter_params_t params; - params.field_mask = UCP_DEVICE_COUNTER_PARAMS_FIELD_MEMH; - params.memh = mem.memh; + ucp_device_counter_params_t params = { + .field_mask = UCP_DEVICE_COUNTER_PARAMS_FIELD_MEMH, + .memh = mem.memh, + }; ucs_status_t status = ucp_device_counter_init(worker.get(), ¶ms, signal); From e019d4cefa7773be099378d4fa4840f04828fa0d Mon Sep 17 00:00:00 2001 From: Ovidiu Mara Date: Tue, 7 Oct 2025 19:24:42 +0200 Subject: [PATCH 2/6] Remove inheritance Signed-off-by: Ovidiu Mara --- src/utils/ucx/ucx_utils.cpp | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/utils/ucx/ucx_utils.cpp b/src/utils/ucx/ucx_utils.cpp index 50b1b7fc9..8240fc9ca 100644 --- a/src/utils/ucx/ucx_utils.cpp +++ b/src/utils/ucx/ucx_utils.cpp @@ -473,21 +473,20 @@ namespace std::terminate(); } - struct nixlUcpWorkerParams : ucp_worker_params_t { - explicit nixlUcpWorkerParams(const nixl_ucx_mt_t t) : ucp_worker_params_t{} { - field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE; - thread_mode = toUcsThreadModeChecked(t); - } - }; - - static_assert(sizeof(nixlUcpWorkerParams) == sizeof(ucp_worker_params_t)); + static ucp_worker_params_t toUcpWorkerParams(const nixl_ucx_mt_t t) + { + return ucp_worker_params_t{ + .field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE, + .thread_mode = toUcsThreadModeChecked(t), + }; + } } // namespace ucp_worker * nixlUcxWorker::createUcpWorker(const nixlUcxContext &ctx) { ucp_worker* worker = nullptr; - const nixlUcpWorkerParams params(ctx.mt_type); + const ucp_worker_params_t params = toUcpWorkerParams(ctx.mt_type); const ucs_status_t status = ucp_worker_create(ctx.ctx, ¶ms, &worker); if(status != UCS_OK) { throw std::runtime_error(std::string("Failed to create UCX worker: ") + From be8f1075408c9f028ef5498325bd9fed4e9b519b Mon Sep 17 00:00:00 2001 From: Ovidiu Mara Date: Wed, 8 Oct 2025 09:54:37 +0200 Subject: [PATCH 3/6] Stress test (TODO: revert) Signed-off-by: Ovidiu Mara --- .ci/jenkins/lib/test-matrix.yaml | 2 +- .gitlab/test_cpp.sh | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/.ci/jenkins/lib/test-matrix.yaml b/.ci/jenkins/lib/test-matrix.yaml index df5382ccb..942c4195a 100644 --- a/.ci/jenkins/lib/test-matrix.yaml +++ b/.ci/jenkins/lib/test-matrix.yaml @@ -101,7 +101,7 @@ steps: - name: Test CPP parallel: false run: | - timeout ${TEST_TIMEOUT}m docker exec -w ${WORKSPACE} "${JOB_BASE_NAME}-${BUILD_ID}-${axis_index}" /bin/bash -c ".gitlab/test_cpp.sh ${INSTALL_DIR}" + for i in $(seq 1 30); do timeout ${TEST_TIMEOUT}m docker exec -w ${WORKSPACE} "${JOB_BASE_NAME}-${BUILD_ID}-${axis_index}" /bin/bash -c ".gitlab/test_cpp.sh ${INSTALL_DIR}" ; done onfail: | docker rm -f "${JOB_BASE_NAME}-${BUILD_ID}-${axis_index}" docker image rm -f "${JOB_BASE_NAME}-${BUILD_ID}-${axis_index}" diff --git a/.gitlab/test_cpp.sh b/.gitlab/test_cpp.sh index 0753268b5..b2445bea2 100755 --- a/.gitlab/test_cpp.sh +++ b/.gitlab/test_cpp.sh @@ -59,11 +59,13 @@ sleep 5 echo "==== Running C++ tests ====" cd ${INSTALL_DIR} -./bin/desc_example -./bin/agent_example -./bin/nixl_example -./bin/nixl_etcd_example -./bin/ucx_backend_test +for i in $(seq 1 10); do + ./bin/desc_example + ./bin/agent_example + ./bin/nixl_example + ./bin/nixl_etcd_example + ./bin/ucx_backend_test +done # Skip UCX_MO backend test on GPU worker, fails VRAM transfers if ! $HAS_GPU ; then ./bin/ucx_mo_backend_test From 2e55617d77a6603a1561486bc2b41ca21b6cc079 Mon Sep 17 00:00:00 2001 From: Ovidiu Mara Date: Wed, 8 Oct 2025 10:49:10 +0200 Subject: [PATCH 4/6] Revert "Remove inheritance" This reverts commit e019d4cefa7773be099378d4fa4840f04828fa0d. --- src/utils/ucx/ucx_utils.cpp | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/utils/ucx/ucx_utils.cpp b/src/utils/ucx/ucx_utils.cpp index 8240fc9ca..50b1b7fc9 100644 --- a/src/utils/ucx/ucx_utils.cpp +++ b/src/utils/ucx/ucx_utils.cpp @@ -473,20 +473,21 @@ namespace std::terminate(); } - static ucp_worker_params_t toUcpWorkerParams(const nixl_ucx_mt_t t) - { - return ucp_worker_params_t{ - .field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE, - .thread_mode = toUcsThreadModeChecked(t), - }; - } + struct nixlUcpWorkerParams : ucp_worker_params_t { + explicit nixlUcpWorkerParams(const nixl_ucx_mt_t t) : ucp_worker_params_t{} { + field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE; + thread_mode = toUcsThreadModeChecked(t); + } + }; + + static_assert(sizeof(nixlUcpWorkerParams) == sizeof(ucp_worker_params_t)); } // namespace ucp_worker * nixlUcxWorker::createUcpWorker(const nixlUcxContext &ctx) { ucp_worker* worker = nullptr; - const ucp_worker_params_t params = toUcpWorkerParams(ctx.mt_type); + const nixlUcpWorkerParams params(ctx.mt_type); const ucs_status_t status = ucp_worker_create(ctx.ctx, ¶ms, &worker); if(status != UCS_OK) { throw std::runtime_error(std::string("Failed to create UCX worker: ") + From 0021537da853ed5fcc9da16356a66c938cb5262a Mon Sep 17 00:00:00 2001 From: Ovidiu Mara Date: Wed, 8 Oct 2025 10:49:26 +0200 Subject: [PATCH 5/6] Revert "Initialize objects with zero" This reverts commit e778a99af62b9fbef40876356353a33d4104c720. --- .ci/jenkins/lib/build-matrix.yaml | 1 + src/core/nixl_listener.cpp | 8 +- src/infra/nixl_descriptors.cpp | 20 ++--- src/plugins/ucx/ucx_backend.cpp | 1 - src/utils/stream/metadata_stream.cpp | 38 ++++---- src/utils/ucx/gpu_xfer_req_h.cpp | 44 +++++---- src/utils/ucx/ucx_utils.cpp | 128 ++++++++++++++------------- 7 files changed, 119 insertions(+), 121 deletions(-) diff --git a/.ci/jenkins/lib/build-matrix.yaml b/.ci/jenkins/lib/build-matrix.yaml index 304ac34ce..daed884c7 100644 --- a/.ci/jenkins/lib/build-matrix.yaml +++ b/.ci/jenkins/lib/build-matrix.yaml @@ -48,6 +48,7 @@ env: NIXL_INSTALL_DIR: /opt/nixl TEST_TIMEOUT: 30 NPROC: "16" + UCX_TLS: "^shm" steps: - name: Build diff --git a/src/core/nixl_listener.cpp b/src/core/nixl_listener.cpp index c117ae149..0dc5e76f3 100644 --- a/src/core/nixl_listener.cpp +++ b/src/core/nixl_listener.cpp @@ -35,12 +35,10 @@ namespace { static const std::string invalid_label = "invalid"; -int -connectToIP(std::string ip_addr, int port) { +int connectToIP(std::string ip_addr, int port) { - struct sockaddr_in listenerAddr {}; - - listenerAddr.sin_port = htons(port); + struct sockaddr_in listenerAddr; + listenerAddr.sin_port = htons(port); listenerAddr.sin_family = AF_INET; if (inet_pton(AF_INET, ip_addr.c_str(), &listenerAddr.sin_addr) <= 0) { diff --git a/src/infra/nixl_descriptors.cpp b/src/infra/nixl_descriptors.cpp index 7ad979952..ecb005389 100644 --- a/src/infra/nixl_descriptors.cpp +++ b/src/infra/nixl_descriptors.cpp @@ -112,23 +112,19 @@ nixlBlobDesc::nixlBlobDesc(const nixlBasicDesc &desc, } nixlBlobDesc::nixlBlobDesc(const nixl_blob_t &blob) { - if (blob.size() < sizeof(nixlBasicDesc)) { - NIXL_ERROR << "Blob size is less than the size of nixlBasicDesc"; - addr = 0; - len = 0; - devId = 0; - metaInfo.resize(0); - return; - } size_t meta_size = blob.size() - sizeof(nixlBasicDesc); if (meta_size > 0) { metaInfo.resize(meta_size); blob.copy(reinterpret_cast(this), sizeof(nixlBasicDesc)); - blob.copy(reinterpret_cast(&metaInfo[0]), meta_size, sizeof(nixlBasicDesc)); + blob.copy(reinterpret_cast(&metaInfo[0]), + meta_size, sizeof(nixlBasicDesc)); } else if (meta_size == 0) { - blob.copy(reinterpret_cast(this), sizeof(nixlBasicDesc)); - } else { - NIXL_ASSERT(false) << "Negative meta size"; + blob.copy(reinterpret_cast(this), sizeof(nixlBasicDesc)); + } else { // Error + addr = 0; + len = 0; + devId = 0; + metaInfo.resize(0); } } diff --git a/src/plugins/ucx/ucx_backend.cpp b/src/plugins/ucx/ucx_backend.cpp index 3d9722ad9..37bac433a 100644 --- a/src/plugins/ucx/ucx_backend.cpp +++ b/src/plugins/ucx/ucx_backend.cpp @@ -1811,7 +1811,6 @@ nixl_status_t nixlUcxEngine::genNotif(const std::string &remote_agent, const std switch(ret) { case NIXL_IN_PROG: /* do not track the request */ - // TODO: why are we releasing a request that is still in progress? getWorker(getWorkerId())->reqRelease(req); case NIXL_SUCCESS: break; diff --git a/src/utils/stream/metadata_stream.cpp b/src/utils/stream/metadata_stream.cpp index a38d72252..5bbd73ff7 100644 --- a/src/utils/stream/metadata_stream.cpp +++ b/src/utils/stream/metadata_stream.cpp @@ -132,21 +132,21 @@ std::string nixlMDStreamListener::recvFromClient() { return recvData; } -void -nixlMDStreamListener::recvFromClients(int clientSocket) { - char buffer[RECV_BUFFER_SIZE + 1]; - int bytes_read; - - while ((bytes_read = recv(clientSocket, buffer, sizeof(buffer), 0)) > 0) { - buffer[bytes_read] = '\0'; - // Return ack - std::string ack = "Message received"; - send(clientSocket, ack.c_str(), ack.size(), 0); - std::string recv_message(buffer); - NIXL_DEBUG << "Message Received" << recv_message; - } - close(clientSocket); - NIXL_DEBUG << "Client Disconnected"; +void nixlMDStreamListener::recvFromClients(int clientSocket) { + char buffer[RECV_BUFFER_SIZE]; + int bytes_read; + + while ((bytes_read = recv(clientSocket, buffer, + sizeof(buffer), 0)) > 0) { + buffer[bytes_read] = '\0'; + // Return ack + std::string ack = "Message received"; + send(clientSocket, ack.c_str(), ack.size(), 0); + std::string recv_message(buffer); + NIXL_DEBUG << "Message Received" << recv_message; + } + close(clientSocket); + NIXL_DEBUG << "Client Disconnected"; } void nixlMDStreamListener::startListenerForClient() { @@ -169,14 +169,12 @@ nixlMDStreamClient::~nixlMDStreamClient() { closeStream(); } -bool -nixlMDStreamClient::setupClient() { +bool nixlMDStreamClient::setupClient() { setupStream(); - struct sockaddr_in listenerAddr {}; - + struct sockaddr_in listenerAddr; listenerAddr.sin_family = AF_INET; - listenerAddr.sin_port = htons(port); + listenerAddr.sin_port = htons(port); if (inet_pton(AF_INET, listenerAddress.c_str(), &listenerAddr.sin_addr) <= 0) { diff --git a/src/utils/ucx/gpu_xfer_req_h.cpp b/src/utils/ucx/gpu_xfer_req_h.cpp index 25ccfe2b0..a75974f09 100644 --- a/src/utils/ucx/gpu_xfer_req_h.cpp +++ b/src/utils/ucx/gpu_xfer_req_h.cpp @@ -54,33 +54,31 @@ createGpuXferReq(const nixlUcxEp &ep, ucp_elements.reserve(local_mems.size()); for (size_t i = 0; i < local_mems.size(); i++) { - ucp_device_mem_list_elem_t ucp_elem = { - .field_mask = UCP_DEVICE_MEM_LIST_ELEM_FIELD_MEMH | - UCP_DEVICE_MEM_LIST_ELEM_FIELD_RKEY | UCP_DEVICE_MEM_LIST_ELEM_FIELD_LOCAL_ADDR | - UCP_DEVICE_MEM_LIST_ELEM_FIELD_REMOTE_ADDR | UCP_DEVICE_MEM_LIST_ELEM_FIELD_LENGTH, - .memh = local_mems[i].getMemh(), - .rkey = remote_rkeys[i]->get(), - .local_addr = local_mems[i].getBase(), - .remote_addr = remote_addrs[i], - .length = local_mems[i].getSize(), - }; + ucp_device_mem_list_elem_t ucp_elem; + ucp_elem.field_mask = UCP_DEVICE_MEM_LIST_ELEM_FIELD_MEMH | + UCP_DEVICE_MEM_LIST_ELEM_FIELD_RKEY | UCP_DEVICE_MEM_LIST_ELEM_FIELD_LOCAL_ADDR | + UCP_DEVICE_MEM_LIST_ELEM_FIELD_REMOTE_ADDR | UCP_DEVICE_MEM_LIST_ELEM_FIELD_LENGTH; + ucp_elem.memh = local_mems[i].getMemh(); + ucp_elem.rkey = remote_rkeys[i]->get(); + ucp_elem.local_addr = local_mems[i].getBase(); + ucp_elem.remote_addr = remote_addrs[i]; + ucp_elem.length = local_mems[i].getSize(); ucp_elements.push_back(ucp_elem); } -ucp_device_mem_list_params_t params = { - .field_mask = UCP_DEVICE_MEM_LIST_PARAMS_FIELD_ELEMENTS | + ucp_device_mem_list_params_t params; + params.field_mask = UCP_DEVICE_MEM_LIST_PARAMS_FIELD_ELEMENTS | UCP_DEVICE_MEM_LIST_PARAMS_FIELD_ELEMENT_SIZE | - UCP_DEVICE_MEM_LIST_PARAMS_FIELD_NUM_ELEMENTS, - .elements = ucp_elements.data(), - .element_size = sizeof(ucp_device_mem_list_elem_t), - .num_elements = ucp_elements.size(), -}; - -ucp_device_mem_list_handle_h ucx_handle; -ucs_status_t ucs_status = ucp_device_mem_list_create(ep.getEp(), ¶ms, &ucx_handle); -if (ucs_status != UCS_OK) { - throw std::runtime_error(std::string("Failed to create device memory list: ") + - ucs_status_string(ucs_status)); + UCP_DEVICE_MEM_LIST_PARAMS_FIELD_NUM_ELEMENTS; + params.elements = ucp_elements.data(); + params.element_size = sizeof(ucp_device_mem_list_elem_t); + params.num_elements = ucp_elements.size(); + + ucp_device_mem_list_handle_h ucx_handle; + ucs_status_t ucs_status = ucp_device_mem_list_create(ep.getEp(), ¶ms, &ucx_handle); + if (ucs_status != UCS_OK) { + throw std::runtime_error(std::string("Failed to create device memory list: ") + + ucs_status_string(ucs_status)); } NIXL_DEBUG << "Created device memory list handle with " << local_mems.size() << " elements"; diff --git a/src/utils/ucx/ucx_utils.cpp b/src/utils/ucx/ucx_utils.cpp index 50b1b7fc9..1bbac53c8 100644 --- a/src/utils/ucx/ucx_utils.cpp +++ b/src/utils/ucx/ucx_utils.cpp @@ -155,13 +155,13 @@ void nixlUcxEp::setState(nixl_ucx_ep_state_t new_state) nixl_status_t nixlUcxEp::closeImpl(ucp_ep_close_flags_t flags) { - ucs_status_ptr_t request = nullptr; + ucs_status_ptr_t request = nullptr; ucp_request_param_t req_param = { .op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS, - .flags = flags, + .flags = flags }; - switch (state) { + switch(state) { case NIXL_UCX_EP_STATE_NULL: case NIXL_UCX_EP_STATE_DISCONNECTED: // The EP has not been connected, or already disconnected. @@ -192,19 +192,21 @@ nixlUcxEp::closeImpl(ucp_ep_close_flags_t flags) std::terminate(); } -nixlUcxEp::nixlUcxEp(ucp_worker_h worker, void *addr, ucp_err_handling_mode_t err_handling_mode) { - ucp_ep_params_t ep_params = { - .field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | UCP_EP_PARAM_FIELD_ERR_HANDLER | - UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE, - .address = reinterpret_cast(addr), - .err_mode = err_handling_mode, - .err_handler = - { - .cb = err_cb_wrapper, - .arg = reinterpret_cast(this), - }, - }; - nixl_status_t status = ucx_status_to_nixl(ucp_ep_create(worker, &ep_params, &eph)); +nixlUcxEp::nixlUcxEp(ucp_worker_h worker, void* addr, + ucp_err_handling_mode_t err_handling_mode) +{ + ucp_ep_params_t ep_params; + nixl_status_t status; + + ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS | + UCP_EP_PARAM_FIELD_ERR_HANDLER | + UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE; + ep_params.err_mode = err_handling_mode; + ep_params.err_handler.cb = err_cb_wrapper; + ep_params.err_handler.arg = reinterpret_cast(this); + ep_params.address = reinterpret_cast(addr); + + status = ucx_status_to_nixl(ucp_ep_create(worker, &ep_params, &eph)); if (status == NIXL_SUCCESS) setState(NIXL_UCX_EP_STATE_CONNECTED); else @@ -244,10 +246,10 @@ nixl_status_t nixlUcxEp::sendAm(unsigned msg_id, return status; } - ucp_request_param_t param = { - .op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS, - .flags = flags, - }; + ucp_request_param_t param = {0}; + + param.op_attr_mask |= UCP_OP_ATTR_FIELD_FLAGS; + param.flags = flags; ucs_status_ptr_t request = ucp_am_send_nbx(eph, msg_id, hdr, hdr_len, buffer, len, ¶m); if (UCS_PTR_IS_PTR(request)) { @@ -343,27 +345,29 @@ nixl_status_t nixlUcxEp::estimateCost(size_t size, return NIXL_SUCCESS; } -nixl_status_t -nixlUcxEp::flushEp(nixlUcxReq &req) { - ucp_request_param_t param{}; - ucs_status_ptr_t request = ucp_ep_flush_nbx(eph, ¶m); +nixl_status_t nixlUcxEp::flushEp(nixlUcxReq &req) +{ + ucp_request_param_t param; + ucs_status_ptr_t request; + + param.op_attr_mask = 0; + request = ucp_ep_flush_nbx(eph, ¶m); if (UCS_PTR_IS_PTR(request)) { - req = (void *)request; + req = (void*)request; return NIXL_IN_PROG; } return ucx_status_to_nixl(UCS_PTR_STATUS(request)); } -bool -nixlUcxMtLevelIsSupported(const nixl_ucx_mt_t mt_type) noexcept { - ucp_lib_attr_t attr = { - .field_mask = UCP_LIB_ATTR_FIELD_MAX_THREAD_LEVEL, - }; +bool nixlUcxMtLevelIsSupported(const nixl_ucx_mt_t mt_type) noexcept +{ + ucp_lib_attr_t attr; + attr.field_mask = UCP_LIB_ATTR_FIELD_MAX_THREAD_LEVEL; ucp_lib_query(&attr); - switch (mt_type) { + switch(mt_type) { case nixl_ucx_mt_t::SINGLE: return attr.max_thread_level >= UCS_THREAD_MODE_SERIALIZED; case nixl_ucx_mt_t::CTX: @@ -380,8 +384,9 @@ nixlUcxContext::nixlUcxContext(std::vector devs, nixlUcxContext::req_cb_t fini_cb, bool prog_thread, unsigned long num_workers, - nixl_thread_sync_t sync_mode) { - ucp_params_t ucp_params{}; + nixl_thread_sync_t sync_mode) +{ + ucp_params_t ucp_params; // With strict synchronization model nixlAgent serializes access to backends, with more // permissive models backends need to account for concurrent access and ensure their internal @@ -473,8 +478,11 @@ namespace std::terminate(); } - struct nixlUcpWorkerParams : ucp_worker_params_t { - explicit nixlUcpWorkerParams(const nixl_ucx_mt_t t) : ucp_worker_params_t{} { + struct nixlUcpWorkerParams + : ucp_worker_params_t + { + explicit nixlUcpWorkerParams(const nixl_ucx_mt_t t) + { field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE; thread_mode = toUcsThreadModeChecked(t); } @@ -501,12 +509,11 @@ nixlUcxWorker::nixlUcxWorker(const nixlUcxContext &ctx, ucp_err_handling_mode_t : worker(createUcpWorker(ctx), &ucp_worker_destroy), err_handling_mode_(err_handling_mode) {} -std::string -nixlUcxWorker::epAddr() { - ucp_worker_attr_t wattr = { - .field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS, - }; +std::string nixlUcxWorker::epAddr() +{ + ucp_worker_attr_t wattr; + wattr.field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS; const ucs_status_t status = ucp_worker_query(worker.get(), &wattr); if (UCS_OK != status) { throw std::runtime_error(std::string("Unable to query UCX worker address: ") + @@ -553,12 +560,12 @@ int nixlUcxContext::memReg(void *addr, size_t size, nixlUcxMem &mem, nixl_mem_t } if (nixl_mem_type == nixl_mem_t::VRAM_SEG) { - ucp_mem_attr_t attr = { - .field_mask = UCP_MEM_ATTR_FIELD_MEM_TYPE, - }; + ucp_mem_attr_t attr; + attr.field_mask = UCP_MEM_ATTR_FIELD_MEM_TYPE; status = ucp_mem_query(mem.memh, &attr); if (status != UCS_OK) { - NIXL_ERROR << absl::StrFormat("Failed to ucp_mem_query: %s", ucs_status_string(status)); + NIXL_ERROR << absl::StrFormat("Failed to ucp_mem_query: %s", + ucs_status_string(status)); ucp_mem_unmap(ctx, mem.memh); return -1; } @@ -600,12 +607,12 @@ constexpr std::string_view ucxGpuDeviceApiUnsupported{ #endif + size_t nixlUcxContext::getGpuSignalSize() const { #ifdef HAVE_UCX_GPU_DEVICE_API - ucp_context_attr_t attr = { - .field_mask = UCP_ATTR_FIELD_DEVICE_COUNTER_SIZE, - }; + ucp_context_attr_t attr; + attr.field_mask = UCP_ATTR_FIELD_DEVICE_COUNTER_SIZE; ucs_status_t query_status = ucp_context_query(ctx, &attr); if (query_status != UCS_OK) { @@ -624,15 +631,17 @@ nixlUcxContext::getGpuSignalSize() const { * Active message handling * =========================================== */ -int -nixlUcxWorker::regAmCallback(unsigned msg_id, ucp_am_recv_callback_t cb, void *arg) { - ucp_am_handler_param_t params = { - .field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID | UCP_AM_HANDLER_PARAM_FIELD_CB | - UCP_AM_HANDLER_PARAM_FIELD_ARG, - .id = msg_id, - .cb = cb, - .arg = arg, - }; +int nixlUcxWorker::regAmCallback(unsigned msg_id, ucp_am_recv_callback_t cb, void* arg) +{ + ucp_am_handler_param_t params = {0}; + + params.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID | + UCP_AM_HANDLER_PARAM_FIELD_CB | + UCP_AM_HANDLER_PARAM_FIELD_ARG; + + params.id = msg_id; + params.cb = cb; + params.arg = arg; const ucs_status_t status = ucp_worker_set_am_recv_handler(worker.get(), ¶ms); @@ -697,10 +706,9 @@ nixlUcxWorker::prepGpuSignal([[maybe_unused]] const nixlUcxMem &mem, throw std::invalid_argument("Signal pointer cannot be null"); } - ucp_device_counter_params_t params = { - .field_mask = UCP_DEVICE_COUNTER_PARAMS_FIELD_MEMH, - .memh = mem.memh, - }; + ucp_device_counter_params_t params; + params.field_mask = UCP_DEVICE_COUNTER_PARAMS_FIELD_MEMH; + params.memh = mem.memh; ucs_status_t status = ucp_device_counter_init(worker.get(), ¶ms, signal); From fee6e44aa9b650e49a4d46493adb843270a28360 Mon Sep 17 00:00:00 2001 From: Ovidiu Mara Date: Wed, 8 Oct 2025 10:53:10 +0200 Subject: [PATCH 6/6] Increase iterations Signed-off-by: Ovidiu Mara --- .ci/jenkins/lib/build-matrix.yaml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.ci/jenkins/lib/build-matrix.yaml b/.ci/jenkins/lib/build-matrix.yaml index daed884c7..700d1f73e 100644 --- a/.ci/jenkins/lib/build-matrix.yaml +++ b/.ci/jenkins/lib/build-matrix.yaml @@ -46,9 +46,8 @@ matrix: env: NIXL_INSTALL_DIR: /opt/nixl - TEST_TIMEOUT: 30 + TEST_TIMEOUT: 1200 NPROC: "16" - UCX_TLS: "^shm" steps: - name: Build @@ -64,7 +63,7 @@ steps: parallel: false timeout: "${TEST_TIMEOUT}" run: | - .gitlab/test_cpp.sh ${NIXL_INSTALL_DIR} + for i in $(seq 1 30); do .gitlab/test_cpp.sh ${NIXL_INSTALL_DIR} ; done - name: Test Python parallel: false