From 51385f941c1e4fd3129bc7124b2b637784757de5 Mon Sep 17 00:00:00 2001 From: Anton Nayshtut Date: Mon, 24 Nov 2025 14:00:27 +0200 Subject: [PATCH 1/2] plugins/posix: trivial: posix_backend auto-format Signed-off-by: Anton Nayshtut --- src/plugins/posix/posix_backend.cpp | 343 +++++++++++++++------------- src/plugins/posix/posix_backend.h | 118 ++++++---- 2 files changed, 255 insertions(+), 206 deletions(-) diff --git a/src/plugins/posix/posix_backend.cpp b/src/plugins/posix/posix_backend.cpp index a391d7098..2d72a6384 100644 --- a/src/plugins/posix/posix_backend.cpp +++ b/src/plugins/posix/posix_backend.cpp @@ -28,103 +28,117 @@ #include "file/file_utils.h" namespace { - bool isValidPrepXferParams(const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - const std::string &local_agent) { - if (remote_agent != local_agent) { - NIXL_ERROR << absl::StrFormat("Error: Remote agent must match the requesting agent (%s). Got %s", - local_agent, remote_agent); - return false; - } - - if (local.getType() != DRAM_SEG) { - NIXL_ERROR << absl::StrFormat("Error: Local memory type must be DRAM_SEG, got %d", local.getType()); - return false; - } +bool +isValidPrepXferParams(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + const std::string &local_agent) { + if (remote_agent != local_agent) { + NIXL_ERROR << absl::StrFormat( + "Error: Remote agent must match the requesting agent (%s). 
Got %s", + local_agent, + remote_agent); + return false; + } - if (remote.getType() != FILE_SEG) { - NIXL_ERROR << absl::StrFormat("Error: Remote memory type must be FILE_SEG, got %d", remote.getType()); - return false; - } + if (local.getType() != DRAM_SEG) { + NIXL_ERROR << absl::StrFormat("Error: Local memory type must be DRAM_SEG, got %d", + local.getType()); + return false; + } - if (local.descCount() != remote.descCount()) { - NIXL_ERROR << absl::StrFormat("Error: Mismatch in descriptor counts - local: %d, remote: %d", - local.descCount(), remote.descCount()); - return false; - } + if (remote.getType() != FILE_SEG) { + NIXL_ERROR << absl::StrFormat("Error: Remote memory type must be FILE_SEG, got %d", + remote.getType()); + return false; + } - return true; + if (local.descCount() != remote.descCount()) { + NIXL_ERROR << absl::StrFormat( + "Error: Mismatch in descriptor counts - local: %d, remote: %d", + local.descCount(), + remote.descCount()); + return false; } - nixlPosixBackendReqH& castPosixHandle(nixlBackendReqH* handle) { - if (!handle) { - throw nixlPosixBackendReqH::exception("received null handle", NIXL_ERR_INVALID_PARAM); - } - return dynamic_cast(*handle); + return true; +} + +nixlPosixBackendReqH & +castPosixHandle(nixlBackendReqH *handle) { + if (!handle) { + throw nixlPosixBackendReqH::exception("received null handle", NIXL_ERR_INVALID_PARAM); } + return dynamic_cast(*handle); +} - // Stringify function for queue_t - inline const char* to_string(nixlPosixQueue::queue_t type) { - using queue_t = nixlPosixQueue::queue_t; - switch (type) { - case queue_t::AIO: return "AIO"; - case queue_t::URING: return "URING"; - case queue_t::POSIXAIO: - return "POSIXAIO"; - case queue_t::UNSUPPORTED: return "UNSUPPORTED"; - default: return "UNKNOWN"; - } +// Stringify function for queue_t +inline const char * +to_string(nixlPosixQueue::queue_t type) { + using queue_t = nixlPosixQueue::queue_t; + switch (type) { + case queue_t::AIO: + return "AIO"; + case 
queue_t::URING: + return "URING"; + case queue_t::POSIXAIO: + return "POSIXAIO"; + case queue_t::UNSUPPORTED: + return "UNSUPPORTED"; + default: + return "UNKNOWN"; } +} - static nixlPosixQueue::queue_t getQueueType(const nixl_b_params_t* custom_params) { - using queue_t = nixlPosixQueue::queue_t; - - // Check for explicit backend request - if (custom_params) { - // First check if AIO is explicitly requested - if (custom_params->count("use_aio") > 0) { - const auto& value = custom_params->at("use_aio"); - if (value == "true" || value == "1") { - if (!QueueFactory::isLinuxAioAvailable()) { - NIXL_ERROR << "linux_aio backend requested but not available at runtime"; - return queue_t::UNSUPPORTED; - } - return queue_t::AIO; +static nixlPosixQueue::queue_t +getQueueType(const nixl_b_params_t *custom_params) { + using queue_t = nixlPosixQueue::queue_t; + + // Check for explicit backend request + if (custom_params) { + // First check if AIO is explicitly requested + if (custom_params->count("use_aio") > 0) { + const auto &value = custom_params->at("use_aio"); + if (value == "true" || value == "1") { + if (!QueueFactory::isLinuxAioAvailable()) { + NIXL_ERROR << "linux_aio backend requested but not available at runtime"; + return queue_t::UNSUPPORTED; } + return queue_t::AIO; } + } - // Then check if io_uring is explicitly requested - if (custom_params->count("use_uring") > 0) { - const auto& value = custom_params->at("use_uring"); - if (value == "true" || value == "1") { - if (!QueueFactory::isUringAvailable()) { - NIXL_ERROR << "io_uring backend requested but not available at runtime"; - return queue_t::UNSUPPORTED; - } - return queue_t::URING; + // Then check if io_uring is explicitly requested + if (custom_params->count("use_uring") > 0) { + const auto &value = custom_params->at("use_uring"); + if (value == "true" || value == "1") { + if (!QueueFactory::isUringAvailable()) { + NIXL_ERROR << "io_uring backend requested but not available at runtime"; + return 
queue_t::UNSUPPORTED; } + return queue_t::URING; } + } - // Then check if linux_aio is explicitly requested - if (custom_params->count("use_posix_aio") > 0) { - const auto &value = custom_params->at("use_posix_aio"); - if (value == "true" || value == "1") { - return queue_t::POSIXAIO; - } + // Then check if linux_aio is explicitly requested + if (custom_params->count("use_posix_aio") > 0) { + const auto &value = custom_params->at("use_posix_aio"); + if (value == "true" || value == "1") { + return queue_t::POSIXAIO; } } + } - if (QueueFactory::isLinuxAioAvailable()) { - return queue_t::AIO; - } - if (QueueFactory::isUringAvailable()) { - return queue_t::URING; - } - return queue_t::POSIXAIO; + if (QueueFactory::isLinuxAioAvailable()) { + return queue_t::AIO; + } + if (QueueFactory::isUringAvailable()) { + return queue_t::URING; } + return queue_t::POSIXAIO; } +} // namespace // ----------------------------------------------------------------------------- // POSIX Backend Request Handle Implementation @@ -133,23 +147,24 @@ namespace { nixlPosixBackendReqH::nixlPosixBackendReqH(const nixl_xfer_op_t &op, const nixl_meta_dlist_t &loc, const nixl_meta_dlist_t &rem, - const nixl_opt_b_args_t* args, - const nixl_b_params_t* params) - : operation(op) - , local(loc) - , remote(rem) - , opt_args(args) - , custom_params_(params) - , queue_depth_(loc.descCount()) - , queue_type_(getQueueType(params)) { + const nixl_opt_b_args_t *args, + const nixl_b_params_t *params) + : operation(op), + local(loc), + remote(rem), + opt_args(args), + custom_params_(params), + queue_depth_(loc.descCount()), + queue_type_(getQueueType(params)) { if (queue_type_ == nixlPosixQueue::queue_t::UNSUPPORTED) { throw exception(absl::StrFormat("Unsupported queue type"), NIXL_ERR_NOT_SUPPORTED); } if (local.descCount() == 0 || remote.descCount() == 0) { - throw exception( - absl::StrFormat("Invalid descriptor count - local: %zu, remote: %zu", local.descCount(), remote.descCount()), - 
NIXL_ERR_INVALID_PARAM); + throw exception(absl::StrFormat("Invalid descriptor count - local: %zu, remote: %zu", + local.descCount(), + remote.descCount()), + NIXL_ERR_INVALID_PARAM); } nixl_status_t status = initQueues(); @@ -159,43 +174,44 @@ nixlPosixBackendReqH::nixlPosixBackendReqH(const nixl_xfer_op_t &op, } } - -nixl_status_t nixlPosixBackendReqH::initQueues() { +nixl_status_t +nixlPosixBackendReqH::initQueues() { try { switch (queue_type_) { - case nixlPosixQueue::queue_t::AIO: - queue = QueueFactory::createLinuxAioQueue(queue_depth_, operation); - break; - case nixlPosixQueue::queue_t::URING: - queue = QueueFactory::createUringQueue(queue_depth_, operation); - break; - case nixlPosixQueue::queue_t::POSIXAIO: - queue = QueueFactory::createPosixAioQueue(queue_depth_, operation); - break; - default: - NIXL_ERROR << absl::StrFormat("Invalid queue type: %s", to_string(queue_type_)); - return NIXL_ERR_INVALID_PARAM; + case nixlPosixQueue::queue_t::AIO: + queue = QueueFactory::createLinuxAioQueue(queue_depth_, operation); + break; + case nixlPosixQueue::queue_t::URING: + queue = QueueFactory::createUringQueue(queue_depth_, operation); + break; + case nixlPosixQueue::queue_t::POSIXAIO: + queue = QueueFactory::createPosixAioQueue(queue_depth_, operation); + break; + default: + NIXL_ERROR << absl::StrFormat("Invalid queue type: %s", to_string(queue_type_)); + return NIXL_ERR_INVALID_PARAM; } return NIXL_SUCCESS; - } catch (const nixlPosixBackendReqH::exception& e) { + } + catch (const nixlPosixBackendReqH::exception &e) { NIXL_ERROR << absl::StrFormat("Failed to initialize queues: %s", e.what()); return e.code(); - } catch (const std::exception& e) { + } + catch (const std::exception &e) { NIXL_ERROR << absl::StrFormat("Failed to initialize queues: %s", e.what()); return NIXL_ERR_BACKEND; } } -nixl_status_t nixlPosixBackendReqH::prepXfer() { +nixl_status_t +nixlPosixBackendReqH::prepXfer() { for (auto [local_it, remote_it] = std::make_pair(local.begin(), 
remote.begin()); local_it != local.end() && remote_it != remote.end(); ++local_it, ++remote_it) { - nixl_status_t status = queue->prepIO( - remote_it->devId, - reinterpret_cast(local_it->addr), - remote_it->len, - remote_it->addr - ); + nixl_status_t status = queue->prepIO(remote_it->devId, + reinterpret_cast(local_it->addr), + remote_it->len, + remote_it->addr); if (status != NIXL_SUCCESS) { NIXL_ERROR << "Error preparing I/O operation"; @@ -206,21 +222,23 @@ nixl_status_t nixlPosixBackendReqH::prepXfer() { return NIXL_SUCCESS; } -nixl_status_t nixlPosixBackendReqH::checkXfer() { +nixl_status_t +nixlPosixBackendReqH::checkXfer() { return queue->checkCompleted(); } -nixl_status_t nixlPosixBackendReqH::postXfer() { - return queue->submit (local, remote); +nixl_status_t +nixlPosixBackendReqH::postXfer() { + return queue->submit(local, remote); } // ----------------------------------------------------------------------------- // POSIX Engine Implementation // ----------------------------------------------------------------------------- -nixlPosixEngine::nixlPosixEngine(const nixlBackendInitParams* init_params) - : nixlBackendEngine(init_params) - , queue_type_(getQueueType(init_params->customParams)) { +nixlPosixEngine::nixlPosixEngine(const nixlBackendInitParams *init_params) + : nixlBackendEngine(init_params), + queue_type_(getQueueType(init_params->customParams)) { if (queue_type_ == nixlPosixQueue::queue_t::UNSUPPORTED) { initErr = true; NIXL_ERROR << absl::StrFormat( @@ -232,9 +250,10 @@ nixlPosixEngine::nixlPosixEngine(const nixlBackendInitParams* init_params) to_string(queue_type_)); } -nixl_status_t nixlPosixEngine::registerMem(const nixlBlobDesc &mem, - const nixl_mem_t &nixl_mem, - nixlBackendMD* &out) { +nixl_status_t +nixlPosixEngine::registerMem(const nixlBlobDesc &mem, + const nixl_mem_t &nixl_mem, + nixlBackendMD *&out) { auto supported_mems = getSupportedMems(); if (std::find(supported_mems.begin(), supported_mems.end(), nixl_mem) != 
supported_mems.end()) return NIXL_SUCCESS; @@ -242,16 +261,18 @@ nixl_status_t nixlPosixEngine::registerMem(const nixlBlobDesc &mem, return NIXL_ERR_NOT_SUPPORTED; } -nixl_status_t nixlPosixEngine::deregisterMem(nixlBackendMD *) { +nixl_status_t +nixlPosixEngine::deregisterMem(nixlBackendMD *) { return NIXL_SUCCESS; } -nixl_status_t nixlPosixEngine::prepXfer(const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - nixlBackendReqH* &handle, - const nixl_opt_b_args_t* opt_args) const { +nixl_status_t +nixlPosixEngine::prepXfer(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + nixlBackendReqH *&handle, + const nixl_opt_b_args_t *opt_args) const { if (!isValidPrepXferParams(operation, local, remote, remote_agent, localAgent)) { return NIXL_ERR_INVALID_PARAM; } @@ -260,21 +281,22 @@ nixl_status_t nixlPosixEngine::prepXfer(const nixl_xfer_op_t &operation, // Create a params map with our backend selection nixl_b_params_t params; switch (queue_type_) { - case nixlPosixQueue::queue_t::AIO: - params["use_aio"] = "true"; - break; - case nixlPosixQueue::queue_t::URING: - params["use_uring"] = "true"; - break; - case nixlPosixQueue::queue_t::POSIXAIO: - params["use_posix_aio"] = "true"; - break; - default: - NIXL_ERROR << absl::StrFormat("Invalid queue type: %s", to_string(queue_type_)); - return NIXL_ERR_INVALID_PARAM; + case nixlPosixQueue::queue_t::AIO: + params["use_aio"] = "true"; + break; + case nixlPosixQueue::queue_t::URING: + params["use_uring"] = "true"; + break; + case nixlPosixQueue::queue_t::POSIXAIO: + params["use_posix_aio"] = "true"; + break; + default: + NIXL_ERROR << absl::StrFormat("Invalid queue type: %s", to_string(queue_type_)); + return NIXL_ERR_INVALID_PARAM; } - auto posix_handle = std::make_unique(operation, local, remote, opt_args, &params); + auto posix_handle = + 
std::make_unique(operation, local, remote, opt_args, &params); nixl_status_t status = posix_handle->prepXfer(); if (status != NIXL_SUCCESS) { return status; @@ -282,53 +304,60 @@ nixl_status_t nixlPosixEngine::prepXfer(const nixl_xfer_op_t &operation, handle = posix_handle.release(); return NIXL_SUCCESS; - } catch (const nixlPosixBackendReqH::exception& e) { + } + catch (const nixlPosixBackendReqH::exception &e) { NIXL_ERROR << absl::StrFormat("Error: %s", e.what()); return e.code(); - } catch (const std::exception& e) { + } + catch (const std::exception &e) { NIXL_ERROR << absl::StrFormat("Unexpected error: %s", e.what()); return NIXL_ERR_BACKEND; } } -nixl_status_t nixlPosixEngine::postXfer(const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - nixlBackendReqH* &handle, - const nixl_opt_b_args_t* opt_args) const { +nixl_status_t +nixlPosixEngine::postXfer(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + nixlBackendReqH *&handle, + const nixl_opt_b_args_t *opt_args) const { try { - auto& posix_handle = castPosixHandle(handle); + auto &posix_handle = castPosixHandle(handle); nixl_status_t status = posix_handle.postXfer(); if (status != NIXL_IN_PROG) { NIXL_ERROR << "Error in submitting queue"; } return status; - } catch (const nixlPosixBackendReqH::exception& e) { + } + catch (const nixlPosixBackendReqH::exception &e) { NIXL_ERROR << e.what(); return e.code(); } return NIXL_ERR_BACKEND; } -nixl_status_t nixlPosixEngine::checkXfer(nixlBackendReqH* handle) const { +nixl_status_t +nixlPosixEngine::checkXfer(nixlBackendReqH *handle) const { try { - auto& posix_handle = castPosixHandle(handle); + auto &posix_handle = castPosixHandle(handle); return posix_handle.checkXfer(); } - catch (const nixlPosixBackendReqH::exception& e) { + catch (const nixlPosixBackendReqH::exception &e) { NIXL_ERROR << e.what(); 
return e.code(); } return NIXL_ERR_BACKEND; } -nixl_status_t nixlPosixEngine::releaseReqH(nixlBackendReqH* handle) const { +nixl_status_t +nixlPosixEngine::releaseReqH(nixlBackendReqH *handle) const { try { - auto& posix_handle = castPosixHandle(handle); + auto &posix_handle = castPosixHandle(handle); posix_handle.~nixlPosixBackendReqH(); return NIXL_SUCCESS; - } catch (const nixlPosixBackendReqH::exception& e) { + } + catch (const nixlPosixBackendReqH::exception &e) { NIXL_ERROR << e.what(); return e.code(); } diff --git a/src/plugins/posix/posix_backend.h b/src/plugins/posix/posix_backend.h index b2777297b..222c5008b 100644 --- a/src/plugins/posix/posix_backend.h +++ b/src/plugins/posix/posix_backend.h @@ -27,37 +27,45 @@ class nixlPosixBackendReqH : public nixlBackendReqH { private: - const nixl_xfer_op_t &operation; // The transfer operation (read/write) - const nixl_meta_dlist_t &local; // Local memory descriptor list - const nixl_meta_dlist_t &remote; // Remote memory descriptor list - const nixl_opt_b_args_t *opt_args; // Optional backend-specific arguments - const nixl_b_params_t *custom_params_; // Custom backend parameters - const int queue_depth_; // Queue depth for async I/O - std::unique_ptr queue; // Async I/O queue instance - const nixlPosixQueue::queue_t queue_type_; // Type of queue used + const nixl_xfer_op_t &operation; // The transfer operation (read/write) + const nixl_meta_dlist_t &local; // Local memory descriptor list + const nixl_meta_dlist_t &remote; // Remote memory descriptor list + const nixl_opt_b_args_t *opt_args; // Optional backend-specific arguments + const nixl_b_params_t *custom_params_; // Custom backend parameters + const int queue_depth_; // Queue depth for async I/O + std::unique_ptr queue; // Async I/O queue instance + const nixlPosixQueue::queue_t queue_type_; // Type of queue used - nixl_status_t initQueues(); // Initialize async I/O queue + nixl_status_t + initQueues(); // Initialize async I/O queue public: 
nixlPosixBackendReqH(const nixl_xfer_op_t &operation, const nixl_meta_dlist_t &local, const nixl_meta_dlist_t &remote, - const nixl_opt_b_args_t* opt_args, - const nixl_b_params_t* custom_params); - ~nixlPosixBackendReqH() {}; + const nixl_opt_b_args_t *opt_args, + const nixl_b_params_t *custom_params); + ~nixlPosixBackendReqH(){}; - nixl_status_t postXfer(); - nixl_status_t prepXfer(); - nixl_status_t checkXfer(); + nixl_status_t + postXfer(); + nixl_status_t + prepXfer(); + nixl_status_t + checkXfer(); // Exception classes - class exception: public std::exception { - private: - const nixl_status_t code_; - public: - exception(const std::string& msg, nixl_status_t code) - : std::exception(), code_(code) {} - nixl_status_t code() const noexcept { return code_; } + class exception : public std::exception { + private: + const nixl_status_t code_; + + public: + exception(const std::string &msg, nixl_status_t code) : std::exception(), code_(code) {} + + nixl_status_t + code() const noexcept { + return code_; + } }; }; @@ -66,64 +74,76 @@ class nixlPosixEngine : public nixlBackendEngine { const nixlPosixQueue::queue_t queue_type_; public: - nixlPosixEngine(const nixlBackendInitParams* init_params); + nixlPosixEngine(const nixlBackendInitParams *init_params); virtual ~nixlPosixEngine() = default; - bool supportsRemote() const override { + bool + supportsRemote() const override { return false; } - bool supportsLocal() const override { + bool + supportsLocal() const override { return true; } - bool supportsNotif() const override { + bool + supportsNotif() const override { return false; } - nixl_mem_list_t getSupportedMems() const override { + nixl_mem_list_t + getSupportedMems() const override { return {FILE_SEG, DRAM_SEG}; } - nixl_status_t registerMem(const nixlBlobDesc &mem, - const nixl_mem_t &nixl_mem, - nixlBackendMD* &out) override; + nixl_status_t + registerMem(const nixlBlobDesc &mem, const nixl_mem_t &nixl_mem, nixlBackendMD *&out) override; - nixl_status_t 
deregisterMem(nixlBackendMD* meta) override; + nixl_status_t + deregisterMem(nixlBackendMD *meta) override; - nixl_status_t connect(const std::string &remote_agent) override { + nixl_status_t + connect(const std::string &remote_agent) override { return NIXL_SUCCESS; } - nixl_status_t disconnect(const std::string &remote_agent) override { + nixl_status_t + disconnect(const std::string &remote_agent) override { return NIXL_SUCCESS; } - nixl_status_t unloadMD(nixlBackendMD* input) override { + nixl_status_t + unloadMD(nixlBackendMD *input) override { return NIXL_SUCCESS; } - nixl_status_t prepXfer(const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - nixlBackendReqH* &handle, - const nixl_opt_b_args_t* opt_args=nullptr) const override; + nixl_status_t + prepXfer(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + nixlBackendReqH *&handle, + const nixl_opt_b_args_t *opt_args = nullptr) const override; - nixl_status_t postXfer(const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - nixlBackendReqH* &handle, - const nixl_opt_b_args_t* opt_args=nullptr) const override; + nixl_status_t + postXfer(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + nixlBackendReqH *&handle, + const nixl_opt_b_args_t *opt_args = nullptr) const override; - nixl_status_t checkXfer(nixlBackendReqH* handle) const override; - nixl_status_t releaseReqH(nixlBackendReqH* handle) const override; + nixl_status_t + checkXfer(nixlBackendReqH *handle) const override; + nixl_status_t + releaseReqH(nixlBackendReqH *handle) const override; nixl_status_t queryMem(const nixl_reg_dlist_t &descs, std::vector &resp) const override; - nixl_status_t loadLocalMD(nixlBackendMD* 
input, nixlBackendMD* &output) override { + nixl_status_t + loadLocalMD(nixlBackendMD *input, nixlBackendMD *&output) override { output = input; return NIXL_SUCCESS; } From 5911caad0712ef9e353dbd749d0f8a9fb09b616b Mon Sep 17 00:00:00 2001 From: Anton Nayshtut Date: Wed, 12 Nov 2025 14:47:13 +0200 Subject: [PATCH 2/2] plugins/posix: minimized IO queue abstraction This patch set minimizes the IO queue abstraction, making it as low-level as possible. It now only wraps the lowest-level IO-related API. It works with a set of IOs (writes or reads) while leaving the Xfer-level logic to the upper layer (posix_backend). This patch set allows the posix_backend to implement the common logic, while only offloading the details of the specific file access API to the IO queue abstraction. This patch set prepares the code for the POSIX plugin threading model implementation. With it, the threading will be implemented once, in the posix_backend. Signed-off-by: Anton Nayshtut --- src/plugins/posix/aio_queue.cpp | 151 ---------------- src/plugins/posix/aio_queue.h | 52 ------ src/plugins/posix/io_queue.cpp | 56 ++++++ src/plugins/posix/io_queue.h | 84 +++++++++ src/plugins/posix/io_uring_io_queue.cpp | 197 +++++++++++++++++++++ src/plugins/posix/linux_aio_io_queue.cpp | 209 +++++++++++++++++++++++ src/plugins/posix/linux_aio_queue.cpp | 143 ---------------- src/plugins/posix/linux_aio_queue.h | 58 ------ src/plugins/posix/meson.build | 10 +- src/plugins/posix/posix_aio_io_queue.cpp | 190 +++++++++++++++++++++ src/plugins/posix/posix_backend.cpp | 190 ++++++++++----------- src/plugins/posix/posix_backend.h | 14 +- src/plugins/posix/posix_queue.h | 42 ----- src/plugins/posix/queue_factory_impl.cpp | 108 ------------ src/plugins/posix/queue_factory_impl.h | 39 ----- src/plugins/posix/uring_queue.cpp | 158 ----------------- src/plugins/posix/uring_queue.h | 56 ------ 17 files changed, 843 insertions(+), 914 deletions(-) delete mode 100644 src/plugins/posix/aio_queue.cpp delete mode 100644 
src/plugins/posix/aio_queue.h create mode 100644 src/plugins/posix/io_queue.cpp create mode 100644 src/plugins/posix/io_queue.h create mode 100644 src/plugins/posix/io_uring_io_queue.cpp create mode 100644 src/plugins/posix/linux_aio_io_queue.cpp delete mode 100644 src/plugins/posix/linux_aio_queue.cpp delete mode 100644 src/plugins/posix/linux_aio_queue.h create mode 100644 src/plugins/posix/posix_aio_io_queue.cpp delete mode 100644 src/plugins/posix/posix_queue.h delete mode 100644 src/plugins/posix/queue_factory_impl.cpp delete mode 100644 src/plugins/posix/queue_factory_impl.h delete mode 100644 src/plugins/posix/uring_queue.cpp delete mode 100644 src/plugins/posix/uring_queue.h diff --git a/src/plugins/posix/aio_queue.cpp b/src/plugins/posix/aio_queue.cpp deleted file mode 100644 index 460ead307..000000000 --- a/src/plugins/posix/aio_queue.cpp +++ /dev/null @@ -1,151 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include "aio_queue.h" -#include "posix_backend.h" -#include -#include "common/nixl_log.h" -#include -#include -#include - -aioQueue::aioQueue(int num_entries, nixl_xfer_op_t operation) - : aiocbs(num_entries), - num_entries(num_entries), - completed(num_entries), - num_completed(0), - num_submitted(0), - operation(operation) { - if (num_entries <= 0) { - throw std::runtime_error("Invalid number of entries for AIO queue"); - } - for (auto& aiocb : aiocbs) { - memset(&aiocb, 0, sizeof(struct aiocb)); - } -} - -aioQueue::~aioQueue() { - // There should not be any in-flight I/Os at destruction time - if (num_submitted > num_completed) { - NIXL_ERROR << "Programming error: Destroying aioQueue with " << (num_submitted - num_completed) << " in-flight I/Os"; - } - - // Cancel any remaining I/Os - for (auto& aiocb : aiocbs) { - if (aiocb.aio_fildes != 0) { - aio_cancel(aiocb.aio_fildes, &aiocb); - } - } -} - -nixl_status_t -aioQueue::submit (const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) { - num_submitted = 0; - // Submit all I/Os at once - for (auto& aiocb : aiocbs) { - if (aiocb.aio_fildes == 0 || aiocb.aio_nbytes == 0) continue; - // Check if file descriptor is valid - if (aiocb.aio_fildes < 0) { - NIXL_ERROR << "Invalid file descriptor in AIO request"; - return NIXL_ERR_BACKEND; - } - - int ret; - if (operation == NIXL_READ) { - ret = aio_read(&aiocb); - } else { - ret = aio_write(&aiocb); - } - - if (ret < 0) { - if (errno == EAGAIN) { - // If we hit the kernel limit, cancel all submitted I/Os and return error - NIXL_ERROR << "AIO submit failed: kernel queue full"; - for (auto& cb : aiocbs) { - if (cb.aio_fildes != 0) { - aio_cancel(cb.aio_fildes, &cb); - } - } - return NIXL_ERR_BACKEND; - } - NIXL_PERROR << "AIO submit failed"; - return NIXL_ERR_BACKEND; - } - - num_submitted++; - } - - completed.assign(num_entries, false); - num_completed = 0; - return NIXL_IN_PROG; -} - -nixl_status_t aioQueue::checkCompleted() { - if (num_completed == num_entries) - 
return NIXL_SUCCESS; - - // Check all submitted I/Os - for (int i = 0; i < num_entries; i++) { - if (completed[i] || aiocbs[i].aio_fildes == 0 || aiocbs[i].aio_nbytes == 0) - continue; // Skip completed I/Os - - int status = aio_error(&aiocbs[i]); - if (status == 0) { // Operation completed - ssize_t ret = aio_return(&aiocbs[i]); - if (ret < 0 || ret != static_cast(aiocbs[i].aio_nbytes)) { - NIXL_PERROR << "AIO operation failed or incomplete"; - return NIXL_ERR_BACKEND; - } - num_completed++; - completed[i] = true; - } else if (status == EINPROGRESS) { - return NIXL_IN_PROG; // At least one operation still in progress - } else { - NIXL_PERROR << "AIO error"; - return NIXL_ERR_BACKEND; - } - } - - return (num_completed == num_submitted) ? NIXL_SUCCESS : NIXL_IN_PROG; -} - -nixl_status_t aioQueue::prepIO(int fd, void* buf, size_t len, off_t offset) { - // Find an unused control block - for (auto& aiocb : aiocbs) { - if (aiocb.aio_fildes == 0) { - // Check if file descriptor is valid - if (fd < 0) { - NIXL_ERROR << "Invalid file descriptor provided to prepareIO"; - return NIXL_ERR_BACKEND; - } - - // Check buffer and length - if (!buf || len == 0) { - NIXL_ERROR << "Invalid buffer or length provided to prepareIO"; - return NIXL_ERR_BACKEND; - } - - aiocb.aio_fildes = fd; - aiocb.aio_buf = buf; - aiocb.aio_nbytes = len; - aiocb.aio_offset = offset; - return NIXL_SUCCESS; - } - } - NIXL_ERROR << "No available AIO control blocks"; - return NIXL_ERR_BACKEND; -} diff --git a/src/plugins/posix/aio_queue.h b/src/plugins/posix/aio_queue.h deleted file mode 100644 index cba8e9a0f..000000000 --- a/src/plugins/posix/aio_queue.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef AIO_QUEUE_H -#define AIO_QUEUE_H - -#include -#include -#include "posix_queue.h" - -// Forward declare Error class -class nixlPosixBackendReqH; - -class aioQueue : public nixlPosixQueue { - private: - std::vector aiocbs; // Array of AIO control blocks - int num_entries; // Total number of entries expected - std::vector completed; // Track completed I/Os - int num_completed; // Number of completed operations - int num_submitted; // Track number of submitted I/Os - nixl_xfer_op_t operation; // Whether this is a read operation - - // Delete copy and move operations - aioQueue(const aioQueue&) = delete; - aioQueue& operator=(const aioQueue&) = delete; - aioQueue(aioQueue&&) = delete; - aioQueue& operator=(aioQueue&&) = delete; - - public: - aioQueue(int num_entries, nixl_xfer_op_t operation); - ~aioQueue(); - nixl_status_t - submit (const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) override; - nixl_status_t checkCompleted() override; - nixl_status_t prepIO(int fd, void* buf, size_t len, off_t offset) override; -}; - -#endif // AIO_QUEUE_H diff --git a/src/plugins/posix/io_queue.cpp b/src/plugins/posix/io_queue.cpp new file mode 100644 index 000000000..15f9e45cc --- /dev/null +++ b/src/plugins/posix/io_queue.cpp @@ -0,0 +1,56 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "io_queue.h" + +#ifdef HAVE_LIBAIO +std::unique_ptr nixlPosixAioIOQueueCreate(uint32_t max_ios); +#endif +#ifdef HAVE_LIBURING +std::unique_ptr nixlPosixIoUringIOQueueCreate(uint32_t max_ios); +#endif +#ifdef HAVE_LINUXAIO +std::unique_ptr nixlPosixLinuxAioIOQueueCreate(uint32_t max_ios); +#endif + +static const struct { + const char *name; + nixlPosixIOQueue::nixlPosixIOQueueCreateFn createFn; +} factories[] = { +#ifdef HAVE_LIBAIO + {"POSIXAIO", nixlPosixAioIOQueueCreate}, +#endif +#ifdef HAVE_LIBURING + {"URING", nixlPosixIoUringIOQueueCreate}, +#endif +#ifdef HAVE_LINUXAIO + {"AIO", nixlPosixLinuxAioIOQueueCreate}, +#endif +}; + +const uint32_t nixlPosixIOQueue::MIN_IOS = 64; +const uint32_t nixlPosixIOQueue::MAX_IOS = 1024 * 64; + +std::unique_ptr +nixlPosixIOQueue::instantiate(std::string_view io_queue_type, uint32_t max_ios) { + for (const auto &factory : factories) { + if (io_queue_type == factory.name) { + return factory.createFn(max_ios); + } + } + return nullptr; +} diff --git a/src/plugins/posix/io_queue.h b/src/plugins/posix/io_queue.h new file mode 100644 index 000000000..4bae4996a --- /dev/null +++ b/src/plugins/posix/io_queue.h @@ -0,0 +1,84 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef POSIX_IO_QUEUE_H +#define POSIX_IO_QUEUE_H + +#include +#include +#include +#include +#include +#include +#include "backend_aux.h" + +typedef void (*nixlPosixIOQueueDoneCb)(void *ctx, uint32_t data_size, int error); + +class nixlPosixIOQueue { +public: + using nixlPosixIOQueueCreateFn = + std::function(uint32_t max_ios)>; + + nixlPosixIOQueue(uint32_t max_ios) : max_ios_(normalizedMaxIOS(max_ios)) {} + + virtual ~nixlPosixIOQueue() {} + + virtual nixl_status_t + enqueue(int fd, + void *buf, + size_t len, + off_t offset, + bool read, + nixlPosixIOQueueDoneCb clb, + void *ctx) = 0; + virtual nixl_status_t + post(void) = 0; + virtual nixl_status_t + poll(void) = 0; + + static std::unique_ptr + instantiate(std::string_view io_queue_type, uint32_t max_ios); + +protected: + static uint32_t + normalizedMaxIOS(uint32_t max_ios) { + uint32_t m = std::max(MIN_IOS, max_ios); + m = std::min(m, MAX_IOS); + return m; + } + + uint32_t max_ios_; + static const uint32_t MIN_IOS; + static const uint32_t MAX_IOS; +}; + +template class nixlPosixIOQueueImpl : public nixlPosixIOQueue { +public: + nixlPosixIOQueueImpl(uint32_t max_ios) : nixlPosixIOQueue(max_ios), ios_(max_ios_) { + for (uint32_t i = 0; i < max_ios_; i++) { + free_ios_.push_back(&ios_[i]); + } + } + +protected: + std::vector ios_; + std::list free_ios_; + std::list ios_to_submit_; +}; + + +#endif // POSIX_IO_QUEUE_H diff --git a/src/plugins/posix/io_uring_io_queue.cpp b/src/plugins/posix/io_uring_io_queue.cpp new file mode 100644 index 000000000..ca50d88ae --- /dev/null +++ 
b/src/plugins/posix/io_uring_io_queue.cpp
@@ -0,0 +1,224 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "io_queue.h"
+#include "common/nixl_log.h"
+// NOTE(review): the two system header names were stripped from this copy of the patch;
+// the headers below are the ones the symbols used in this file require - confirm.
+#include <liburing.h>
+#include <absl/strings/str_format.h>
+#include <algorithm>
+#include <stdexcept>
+
+#define MAX_IO_SUBMIT_BATCH_SIZE 64
+#define MAX_IO_CHECK_COMPLETED_BATCH_SIZE 64
+
+// One read/write request; slots are recycled through the base-class free list.
+struct nixlPosixIoUringIO {
+    int fd_;
+    void *buf_;
+    size_t len_;
+    off_t offset_;
+    bool read_;
+    nixlPosixIOQueueDoneCb clb_;
+    void *ctx_;
+};
+
+// nixlPosixIOQueue implementation that drives requests through an io_uring instance.
+class nixlPosixIoUringIOQueue : public nixlPosixIOQueueImpl<nixlPosixIoUringIO> {
+public:
+    nixlPosixIoUringIOQueue(uint32_t max_ios);
+
+    virtual nixl_status_t
+    post(void) override;
+    virtual nixl_status_t
+    enqueue(int fd,
+            void *buf,
+            size_t len,
+            off_t offset,
+            bool read,
+            nixlPosixIOQueueDoneCb clb,
+            void *ctx) override;
+    virtual nixl_status_t
+    poll(void) override;
+    virtual ~nixlPosixIoUringIOQueue() override;
+
+protected:
+    nixl_status_t
+    doSubmit(void);
+    nixl_status_t
+    doCheckCompleted(void);
+
+private:
+    struct io_uring uring; // The io_uring instance for async I/O operations
+};
+
+// Sets up the ring; throws std::runtime_error when the ring cannot be created.
+nixlPosixIoUringIOQueue::nixlPosixIoUringIOQueue(uint32_t max_ios)
+    : nixlPosixIOQueueImpl<nixlPosixIoUringIO>(max_ios) {
+    io_uring_params params = {};
+    // liburing returns -errno instead of setting errno, so decode the return value
+    const int ret = io_uring_queue_init_params(max_ios_, &uring, &params);
+    if (ret < 0) {
+        throw std::runtime_error(
+            absl::StrFormat("Failed to initialize io_uring instance: %s", nixl_strerror(-ret)));
+    }
+}
+
+// Moves up to MAX_IO_SUBMIT_BATCH_SIZE queued requests into the ring and submits them.
+// Returns NIXL_SUCCESS when nothing is left to submit, NIXL_IN_PROG otherwise.
+nixl_status_t
+nixlPosixIoUringIOQueue::doSubmit(void) {
+    if (ios_to_submit_.empty()) {
+        return NIXL_SUCCESS; // No blocks to submit
+    }
+
+    const int num_ios =
+        std::min(MAX_IO_SUBMIT_BATCH_SIZE, static_cast<int>(ios_to_submit_.size()));
+    for (int i = 0; i < num_ios; i++) {
+        // Reserve the SQE before dequeuing: if the submission ring is full the request
+        // has to stay queued for the next call rather than being dropped.
+        struct io_uring_sqe *sqe = io_uring_get_sqe(&uring);
+        if (!sqe) {
+            break;
+        }
+
+        nixlPosixIoUringIO *io = ios_to_submit_.front();
+        ios_to_submit_.pop_front();
+
+        if (io->read_) {
+            io_uring_prep_read(sqe, io->fd_, io->buf_, io->len_, io->offset_);
+        } else {
+            io_uring_prep_write(sqe, io->fd_, io->buf_, io->len_, io->offset_);
+        }
+
+        io_uring_sqe_set_data(sqe, io);
+    }
+
+    const int ret = io_uring_submit(&uring);
+    if (ret < 0) {
+        NIXL_ERROR << "io_uring_submit failed: " << nixl_strerror(-ret);
+        return NIXL_ERR_BACKEND;
+    }
+
+    return ios_to_submit_.empty() ? NIXL_SUCCESS : NIXL_IN_PROG;
+}
+
+// Reaps up to MAX_IO_CHECK_COMPLETED_BATCH_SIZE completions, notifies their callbacks and
+// recycles the slots. Returns NIXL_ERR_BACKEND if any reaped request failed.
+nixl_status_t
+nixlPosixIoUringIOQueue::doCheckCompleted(void) {
+    struct io_uring_cqe *cqe;
+    unsigned head;
+    unsigned count = 0;
+    bool failed = false;
+
+    io_uring_for_each_cqe(&uring, head, cqe) {
+        const int res = cqe->res; // bytes transferred, or -errno on failure
+        auto *io = static_cast<nixlPosixIoUringIO *>(io_uring_cqe_get_data(cqe));
+
+        // Count every visited CQE, failed ones included, so that the advance below
+        // retires exactly the entries whose slots were recycled here.
+        count++;
+
+        if (res < 0) {
+            NIXL_ERROR << absl::StrFormat("IO operation failed: %s", nixl_strerror(-res));
+            failed = true;
+        }
+        if (io->clb_) {
+            io->clb_(io->ctx_, res < 0 ? 0 : static_cast<uint32_t>(res), res < 0 ? -res : 0);
+        }
+        free_ios_.push_back(io);
+
+        if (count == MAX_IO_CHECK_COMPLETED_BATCH_SIZE) {
+            break;
+        }
+    }
+
+    // Mark all seen
+    io_uring_cq_advance(&uring, count);
+
+    if (failed) {
+        return NIXL_ERR_BACKEND;
+    }
+
+    if (free_ios_.size() == max_ios_) {
+        return NIXL_SUCCESS; // All ios are free now
+    }
+
+    return NIXL_IN_PROG; // Some ios are in flight, need to check again
+}
+
+// Claims a free slot for the request and queues it; submission happens in post()/poll().
+nixl_status_t
+nixlPosixIoUringIOQueue::enqueue(int fd,
+                                 void *buf,
+                                 size_t len,
+                                 off_t offset,
+                                 bool read,
+                                 nixlPosixIOQueueDoneCb clb,
+                                 void *ctx) {
+    if (free_ios_.empty()) {
+        NIXL_ERROR << "No more free blocks available";
+        return NIXL_ERR_NOT_ALLOWED;
+    }
+
+    nixlPosixIoUringIO *io = free_ios_.front();
+    free_ios_.pop_front();
+    io->fd_ = fd;
+    io->buf_ = buf;
+    io->len_ = len;
+    io->offset_ = offset;
+    io->read_ = read;
+    io->clb_ = clb;
+    io->ctx_ = ctx;
+
+    ios_to_submit_.push_back(io);
+
+    return NIXL_SUCCESS;
+}
+
+// Submits one batch of queued requests; NIXL_IN_PROG unless the submission failed.
+nixl_status_t
+nixlPosixIoUringIOQueue::post(void) {
+    nixl_status_t status = doSubmit();
+    if (status < 0) {
+        return status;
+    }
+
+    return NIXL_IN_PROG;
+}
+
+// Submits one more batch, then reaps whatever completions are available.
+nixl_status_t
+nixlPosixIoUringIOQueue::poll(void) {
+    nixl_status_t status = doSubmit();
+    if (status < 0) {
+        return status;
+    }
+
+    return doCheckCompleted();
+}
+
+nixlPosixIoUringIOQueue::~nixlPosixIoUringIOQueue() {
+    io_uring_queue_exit(&uring);
+}
+
+std::unique_ptr<nixlPosixIOQueue>
+nixlPosixIoUringIOQueueCreate(uint32_t max_ios) {
+    return std::make_unique<nixlPosixIoUringIOQueue>(max_ios);
+}
diff --git a/src/plugins/posix/linux_aio_io_queue.cpp b/src/plugins/posix/linux_aio_io_queue.cpp
new file mode 100644
index 000000000..387728cc0
--- /dev/null
+++ b/src/plugins/posix/linux_aio_io_queue.cpp
@@ -0,0 +1,220 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "io_queue.h"
+// NOTE(review): the system header names were stripped from this copy of the patch;
+// the headers below are the ones the symbols used in this file require - confirm.
+#include <libaio.h>
+#include "common/nixl_log.h"
+#include <absl/strings/str_format.h>
+#include <algorithm>
+#include <cerrno>
+#include <stdexcept>
+
+#define MAX_IO_SUBMIT_BATCH_SIZE 64
+#define MAX_IO_CHECK_COMPLETED_BATCH_SIZE 64
+
+// One request slot: the kernel iocb plus the completion callback registered for it.
+struct nixlPosixLinuxAioIO {
+    nixlPosixIOQueueDoneCb clb_;
+    void *ctx_;
+    struct iocb io_;
+};
+
+// nixlPosixIOQueue implementation on top of Linux native AIO (libaio).
+class nixlPosixLinuxAioIOQueue : public nixlPosixIOQueueImpl<nixlPosixLinuxAioIO> {
+public:
+    nixlPosixLinuxAioIOQueue(uint32_t max_ios);
+
+    virtual nixl_status_t
+    post(void) override;
+    virtual nixl_status_t
+    enqueue(int fd,
+            void *buf,
+            size_t len,
+            off_t offset,
+            bool read,
+            nixlPosixIOQueueDoneCb clb,
+            void *ctx) override;
+    virtual nixl_status_t
+    poll(void) override;
+    virtual ~nixlPosixLinuxAioIOQueue() override;
+
+protected:
+    nixl_status_t
+    doSubmit(void);
+    nixl_status_t
+    doCheckCompleted(void);
+
+private:
+    io_context_t io_ctx_; // I/O context
+};
+
+// Creates the AIO context; throws std::runtime_error when io_queue_init() fails.
+nixlPosixLinuxAioIOQueue::nixlPosixLinuxAioIOQueue(uint32_t max_ios)
+    : nixlPosixIOQueueImpl<nixlPosixLinuxAioIO>(max_ios) {
+    // libaio returns -errno instead of setting errno, so decode the return value
+    const int res = io_queue_init(max_ios_, &io_ctx_);
+    if (res) {
+        throw std::runtime_error(
+            absl::StrFormat("Failed to initialize io_queue: %s", nixl_strerror(-res)));
+    }
+}
+
+// Claims a free slot for the request and queues it; submission happens in post()/poll().
+nixl_status_t
+nixlPosixLinuxAioIOQueue::enqueue(int fd,
+                                  void *buf,
+                                  size_t len,
+                                  off_t offset,
+                                  bool read,
+                                  nixlPosixIOQueueDoneCb clb,
+                                  void *ctx) {
+    if (free_ios_.empty()) {
+        NIXL_ERROR << "No more free blocks available";
+        return NIXL_ERR_NOT_ALLOWED;
+    }
+    nixlPosixLinuxAioIO *io = free_ios_.front();
+    free_ios_.pop_front();
+
+    if (read) {
+        io_prep_pread(&io->io_, fd, buf, len, offset);
+    } else {
+        io_prep_pwrite(&io->io_, fd, buf, len, offset);
+    }
+    io->clb_ = clb;
+    io->ctx_ = ctx;
+    io->io_.data = io;
+    ios_to_submit_.push_back(io);
+
+    return NIXL_SUCCESS;
+}
+
+nixlPosixLinuxAioIOQueue::~nixlPosixLinuxAioIOQueue() {
+    io_queue_release(io_ctx_);
+}
+
+// Hands up to MAX_IO_SUBMIT_BATCH_SIZE queued requests to io_submit().
+// Returns NIXL_SUCCESS when nothing is left to submit, NIXL_IN_PROG otherwise.
+nixl_status_t
+nixlPosixLinuxAioIOQueue::doSubmit(void) {
+    struct iocb *ios[MAX_IO_SUBMIT_BATCH_SIZE];
+
+    if (ios_to_submit_.empty()) {
+        return NIXL_SUCCESS; // No blocks to submit
+    }
+
+    // Build the batch without dequeuing, so that whatever the kernel does not accept
+    // (partial submission, EAGAIN or a hard error) simply stays queued in order.
+    int num_ios = 0;
+    for (nixlPosixLinuxAioIO *io : ios_to_submit_) {
+        if (num_ios == MAX_IO_SUBMIT_BATCH_SIZE) {
+            break;
+        }
+        ios[num_ios++] = &io->io_;
+    }
+
+    int ret = io_submit(io_ctx_, num_ios, ios);
+    if (ret < 0) {
+        if (ret == -EAGAIN) {
+            ret = 0; // 0 were submitted, we will try again later
+        } else {
+            NIXL_ERROR << "io_submit failed: " << nixl_strerror(-ret);
+            return NIXL_ERR_BACKEND;
+        }
+    }
+
+    // io_submit() accepts a prefix of the batch; drop exactly that prefix from the queue
+    for (int i = 0; i < ret; i++) {
+        ios_to_submit_.pop_front();
+    }
+
+    return ios_to_submit_.empty() ? NIXL_SUCCESS : NIXL_IN_PROG;
+}
+
+// Reaps up to MAX_IO_CHECK_COMPLETED_BATCH_SIZE events without blocking and recycles their
+// slots. Returns NIXL_ERR_BACKEND if any reaped request failed.
+nixl_status_t
+nixlPosixLinuxAioIOQueue::doCheckCompleted(void) {
+    struct io_event events[MAX_IO_CHECK_COMPLETED_BATCH_SIZE];
+    struct timespec timeout = {0, 0};
+    bool failed = false;
+
+    if (free_ios_.size() == max_ios_) {
+        return NIXL_SUCCESS; // All ios are free, no ios in flight
+    }
+
+    const int rc = io_getevents(io_ctx_, 0, MAX_IO_CHECK_COMPLETED_BATCH_SIZE, events, &timeout);
+    if (rc < 0) {
+        NIXL_ERROR << "io_getevents error: " << rc;
+        return NIXL_ERR_BACKEND;
+    }
+
+    for (int i = 0; i < rc; i++) {
+        auto *io = static_cast<nixlPosixLinuxAioIO *>(events[i].obj->data);
+        // io_event::res is declared unsigned in libaio.h but carries -errno on failure;
+        // it has to be compared as a signed value or failures are never detected.
+        const long res = static_cast<long>(events[i].res);
+
+        if (res < 0) {
+            NIXL_ERROR << "AIO operation failed: " << res;
+            failed = true;
+        } else if (io->clb_) {
+            io->clb_(io->ctx_, static_cast<uint32_t>(res), 0);
+        }
+
+        // The event has been consumed from the kernel, so the slot is reusable either way
+        free_ios_.push_back(io);
+    }
+
+    if (failed) {
+        return NIXL_ERR_BACKEND;
+    }
+
+    if (free_ios_.size() == max_ios_) {
+        return NIXL_SUCCESS; // All ios are free now
+    }
+
+    return NIXL_IN_PROG; // Some blocks are in flight, need to check again
+}
+
+// Submits one batch of queued requests; NIXL_IN_PROG unless the submission failed.
+nixl_status_t
+nixlPosixLinuxAioIOQueue::post(void) {
+    nixl_status_t status = doSubmit();
+    if (status < 0) {
+        return status;
+    }
+
+    return NIXL_IN_PROG;
+}
+
+// Submits one more batch, then reaps whatever completions are available.
+nixl_status_t
+nixlPosixLinuxAioIOQueue::poll(void) {
+    nixl_status_t status = doSubmit();
+    if (status < 0) {
+        return status;
+    }
+
+    return doCheckCompleted();
+}
+
+std::unique_ptr<nixlPosixIOQueue>
+nixlPosixLinuxAioIOQueueCreate(uint32_t max_ios) {
+    return std::make_unique<nixlPosixLinuxAioIOQueue>(max_ios);
+}
diff --git a/src/plugins/posix/linux_aio_queue.cpp b/src/plugins/posix/linux_aio_queue.cpp
deleted file mode 100644
index 0b4cebfa6..000000000
--- a/src/plugins/posix/linux_aio_queue.cpp
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
- * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "linux_aio_queue.h" -#include "posix_backend.h" -#include -#include "common/nixl_log.h" -#include -#include -#include - -linuxAioQueue::linuxAioQueue(int num_entries, nixl_xfer_op_t operation) - : io_ctx(io_context_t()), - ios(num_entries), - num_entries(num_entries), - num_ios_to_submit(0), - completed(num_entries), - num_completed(0), - operation(operation) { - if (num_entries <= 0) { - throw std::runtime_error("Invalid number of entries for AIO queue"); - } - - if (operation != NIXL_READ && operation != NIXL_WRITE) { - throw std::runtime_error("Invalid operation for AIO queue"); - } - - int res = io_queue_init(num_entries, &io_ctx); - if (res) { - throw std::runtime_error("io_queue_init (" + std::to_string(num_entries) + - ") failed with " + std::to_string(res)); - } - - ios_to_submit.assign(num_entries, nullptr); -} - -linuxAioQueue::~linuxAioQueue() { - io_queue_release(io_ctx); -} - -nixl_status_t -linuxAioQueue::submit(const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) { - if (!num_ios_to_submit) { - return NIXL_IN_PROG; - } - - int ret = io_submit(io_ctx, num_ios_to_submit, ios_to_submit.data()); - if (ret != num_ios_to_submit) { - if (ret < 0) { - NIXL_ERROR << absl::StrFormat("linux_aio submit failed: %s", nixl_strerror(-ret)); - } else { - NIXL_ERROR << absl::StrFormat( - "linux_aio submit failed. 
Partial submission: %d/%d", num_ios_to_submit, ret); - } - return NIXL_ERR_BACKEND; - } - - num_completed = 0; - num_ios_to_submit = 0; - return NIXL_IN_PROG; -} - -nixl_status_t -linuxAioQueue::checkCompleted() { - if (num_completed == num_entries) { - return NIXL_SUCCESS; - } - - struct io_event events[32]; - int rc; - struct timespec timeout = {0, 0}; - - rc = io_getevents(io_ctx, 0, 32, events, &timeout); - if (rc < 0) { - NIXL_ERROR << "io_getevents error: " << rc; - return NIXL_ERR_BACKEND; - } - - for (int i = 0; i < rc; i++) { - struct iocb *io = events[i].obj; - size_t idx = (size_t)io->data; - - ios_to_submit[idx] = nullptr; // Mark as completed - - if (events[i].res < 0) { - NIXL_ERROR << "AIO operation failed: " << events[i].res; - return NIXL_ERR_BACKEND; - } - } - - num_completed += rc; - - return (num_completed == num_entries) ? NIXL_SUCCESS : NIXL_IN_PROG; -} - -nixl_status_t -linuxAioQueue::prepIO(int fd, void *buf, size_t len, off_t offset) { - if (num_ios_to_submit == num_entries) { - NIXL_ERROR << "No available IOs"; - return NIXL_ERR_BACKEND; - } - - // Check if file descriptor is valid - if (fd < 0) { - NIXL_ERROR << "Invalid file descriptor provided to prepareIO"; - return NIXL_ERR_BACKEND; - } - - // Check buffer and length - if (!buf || len == 0) { - NIXL_ERROR << "Invalid buffer or length provided to prepareIO"; - return NIXL_ERR_BACKEND; - } - - int idx = num_ios_to_submit; - auto io = &ios[idx]; - - if (operation == NIXL_READ) { - io_prep_pread(io, fd, buf, len, offset); - } else { - io_prep_pwrite(io, fd, buf, len, offset); - } - - ios_to_submit[idx] = io; - io->data = (void *)(uintptr_t)idx; - num_ios_to_submit++; - - return NIXL_SUCCESS; -} diff --git a/src/plugins/posix/linux_aio_queue.h b/src/plugins/posix/linux_aio_queue.h deleted file mode 100644 index 985b5f73e..000000000 --- a/src/plugins/posix/linux_aio_queue.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. 
All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef LINUXAIO_QUEUE_H -#define LINUXAIO_QUEUE_H - -#include -#include -#include "posix_queue.h" - -// Forward declare Error class -class nixlPosixBackendReqH; - -class linuxAioQueue : public nixlPosixQueue { -private: - io_context_t io_ctx; // I/O context - std::vector ios; // Array of I/Os - int num_entries; // Total number of entries expected - std::vector ios_to_submit; // Array of I/Os to submit - int num_ios_to_submit; // Total number of entries to submit - std::vector completed; // Track completed I/Os - int num_completed; // Number of completed operations - nixl_xfer_op_t operation; // Whether this is a read operation - - // Delete copy and move operations - linuxAioQueue(const linuxAioQueue &) = delete; - linuxAioQueue & - operator=(const linuxAioQueue &) = delete; - linuxAioQueue(linuxAioQueue &&) = delete; - linuxAioQueue & - operator=(linuxAioQueue &&) = delete; - -public: - linuxAioQueue(int num_entries, nixl_xfer_op_t operation); - ~linuxAioQueue(); - nixl_status_t - submit(const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) override; - nixl_status_t - checkCompleted() override; - nixl_status_t - prepIO(int fd, void *buf, size_t len, off_t offset) override; -}; - -#endif // LINUXAIO_QUEUE_H diff --git a/src/plugins/posix/meson.build b/src/plugins/posix/meson.build index dff2365cf..a9d127801 100644 --- 
a/src/plugins/posix/meson.build +++ b/src/plugins/posix/meson.build @@ -22,14 +22,16 @@ posix_sources = [ 'posix_backend.cpp', 'posix_backend.h', 'posix_plugin.cpp', - 'queue_factory_impl.cpp', - 'aio_queue.cpp' # Always include AIO source since it's required + 'io_queue.h', + 'io_queue.cpp', + 'linux_aio_io_queue.cpp' # Always include Linux AIO source since it's required ] # If libaio is not found, skip building the POSIX plugin entirely if posix_aio message('Correct libaio found, Building POSIX plugin') compile_defs = ['-DHAVE_LIBAIO'] + posix_sources += ['posix_aio_io_queue.cpp'] plugin_link_args = ['-laio'] plugin_deps += [ aio_dep ] elif rt_dep.found() @@ -44,7 +46,7 @@ endif have_uring = uring_dep.found() if have_uring compile_defs += ['-DHAVE_LIBURING'] - posix_sources += ['uring_queue.cpp'] + posix_sources += ['io_uring_io_queue.cpp'] plugin_deps += [uring_dep] plugin_link_args += ['-luring'] message('liburing found, adding io_uring support') @@ -54,7 +56,7 @@ endif if linux_aio compile_defs += ['-DHAVE_LINUXAIO'] - posix_sources += ['linux_aio_queue.cpp'] + #posix_sources += ['linux_aio_queue.cpp'] plugin_link_args += ['-laio'] message('Linux AIO found, adding Linux AIO support') else
diff --git a/src/plugins/posix/posix_aio_io_queue.cpp b/src/plugins/posix/posix_aio_io_queue.cpp
new file mode 100644
index 000000000..56b54b863
--- /dev/null
+++ b/src/plugins/posix/posix_aio_io_queue.cpp
@@ -0,0 +1,204 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "io_queue.h"
+#include "common/nixl_log.h"
+// NOTE(review): the system header name was stripped from this copy of the patch; <aio.h>
+// is what the aio_* calls below require - confirm. <cerrno> is new (errno, EAGAIN).
+#include <aio.h>
+#include <cerrno>
+
+#define MAX_IO_SUBMIT_BATCH_SIZE 64
+#define MAX_IO_CHECK_COMPLETED_BATCH_SIZE 64
+
+// One request slot: the POSIX control block plus the completion callback registered for it.
+struct nixlPosixAioIO {
+    nixlPosixIOQueueDoneCb clb_;
+    void *ctx_;
+    struct aiocb aio_;
+    bool read_;
+};
+
+// nixlPosixIOQueue implementation on top of POSIX AIO (aio_read / aio_write).
+class nixlPosixAioIOQueue : public nixlPosixIOQueueImpl<nixlPosixAioIO> {
+public:
+    nixlPosixAioIOQueue(uint32_t max_ios) : nixlPosixIOQueueImpl<nixlPosixAioIO>(max_ios) {}
+
+    virtual nixl_status_t
+    post(void) override;
+    virtual nixl_status_t
+    enqueue(int fd,
+            void *buf,
+            size_t len,
+            off_t offset,
+            bool read,
+            nixlPosixIOQueueDoneCb clb,
+            void *ctx) override;
+    virtual nixl_status_t
+    poll(void) override;
+    virtual ~nixlPosixAioIOQueue() override;
+
+protected:
+    nixl_status_t
+    doSubmit(void);
+    nixl_status_t
+    doCheckCompleted(void);
+
+    std::list<nixlPosixAioIO *> ios_in_flight_;
+};
+
+// Cancels what is still in flight and waits for requests that cannot be cancelled, because
+// the aiocb storage owned by this object must outlive every outstanding operation.
+nixlPosixAioIOQueue::~nixlPosixAioIOQueue() {
+    for (nixlPosixAioIO *io : ios_in_flight_) {
+        const struct aiocb *const pending[] = {&io->aio_};
+
+        aio_cancel(io->aio_.aio_fildes, &io->aio_); // best effort
+        while (aio_error(&io->aio_) == EINPROGRESS) {
+            aio_suspend(pending, 1, nullptr);
+        }
+        aio_return(&io->aio_); // reap the finished request
+    }
+}
+
+// Claims a free slot for the request and queues it; submission happens in post()/poll().
+nixl_status_t
+nixlPosixAioIOQueue::enqueue(int fd,
+                             void *buf,
+                             size_t len,
+                             off_t offset,
+                             bool read,
+                             nixlPosixIOQueueDoneCb clb,
+                             void *ctx) {
+    if (free_ios_.empty()) {
+        NIXL_ERROR << "No more free blocks available";
+        return NIXL_ERR_NOT_ALLOWED;
+    }
+
+    nixlPosixAioIO *io = free_ios_.front();
+    free_ios_.pop_front();
+
+    io->clb_ = clb;
+    io->ctx_ = ctx;
+    io->read_ = read;
+    io->aio_.aio_fildes = fd;
+    io->aio_.aio_buf = buf;
+    io->aio_.aio_nbytes = len;
+    io->aio_.aio_offset = offset;
+
+    ios_to_submit_.push_back(io);
+
+    return NIXL_SUCCESS;
+}
+
+// Starts up to MAX_IO_SUBMIT_BATCH_SIZE queued requests with aio_read()/aio_write().
+// Returns NIXL_SUCCESS when nothing is left to submit, NIXL_IN_PROG otherwise.
+nixl_status_t
+nixlPosixAioIOQueue::doSubmit(void) {
+    if (ios_to_submit_.empty()) {
+        return NIXL_SUCCESS; // No blocks to submit
+    }
+
+    const int num_ios =
+        std::min(MAX_IO_SUBMIT_BATCH_SIZE, static_cast<int>(ios_to_submit_.size()));
+    for (int i = 0; i < num_ios; i++) {
+        nixlPosixAioIO *io = ios_to_submit_.front();
+
+        // aio_read()/aio_write() return -1 and report the reason through errno
+        if ((io->read_ ? aio_read(&io->aio_) : aio_write(&io->aio_)) < 0) {
+            const int err = errno;
+            if (err == EAGAIN) {
+                break; // out of AIO resources for now; the request stays queued
+            }
+            NIXL_ERROR << "aio_submit failed: " << nixl_strerror(err);
+            return NIXL_ERR_BACKEND;
+        }
+
+        ios_to_submit_.pop_front();
+        ios_in_flight_.push_back(io);
+    }
+
+    return ios_to_submit_.empty() ? NIXL_SUCCESS : NIXL_IN_PROG;
+}
+
+// Checks up to MAX_IO_CHECK_COMPLETED_BATCH_SIZE in-flight requests, reaps the finished ones
+// and recycles their slots. Returns NIXL_ERR_BACKEND if any reaped request failed.
+nixl_status_t
+nixlPosixAioIOQueue::doCheckCompleted(void) {
+    int budget = std::min(MAX_IO_CHECK_COMPLETED_BATCH_SIZE,
+                          static_cast<int>(ios_in_flight_.size()));
+    bool failed = false;
+
+    for (auto it = ios_in_flight_.begin(); it != ios_in_flight_.end() && budget > 0; budget--) {
+        nixlPosixAioIO *io = *it;
+        const int status = aio_error(&io->aio_); // 0, EINPROGRESS or a positive errno value
+        if (status == EINPROGRESS) {
+            ++it; // still running; look at the requests queued behind it
+            continue;
+        }
+
+        // Finished one way or another: reap it and recycle the slot. erase() already
+        // yields the next iterator, so `it` must not be advanced a second time.
+        const ssize_t ret = aio_return(&io->aio_);
+        it = ios_in_flight_.erase(it);
+        free_ios_.push_back(io);
+
+        if (status != 0) {
+            NIXL_ERROR << "aio_error failed: " << nixl_strerror(status);
+            failed = true;
+        } else if (ret != static_cast<ssize_t>(io->aio_.aio_nbytes)) {
+            NIXL_ERROR << "aio_return failed: short transfer, " << ret << " of "
+                       << io->aio_.aio_nbytes << " bytes";
+            failed = true;
+        } else if (io->clb_) {
+            io->clb_(io->ctx_, static_cast<uint32_t>(ret), 0);
+        }
+    }
+
+    if (failed) {
+        return NIXL_ERR_BACKEND;
+    }
+
+    const bool idle = ios_in_flight_.empty() && ios_to_submit_.empty();
+    return idle ? NIXL_SUCCESS : NIXL_IN_PROG;
+}
+
+// Submits one batch of queued requests; NIXL_IN_PROG unless the submission failed.
+nixl_status_t
+nixlPosixAioIOQueue::post(void) {
+    nixl_status_t status = doSubmit();
+    if (status < 0) {
+        return status;
+    }
+    return NIXL_IN_PROG;
+}
+
+// Submits one more batch, then reaps whatever completions are available.
+nixl_status_t
+nixlPosixAioIOQueue::poll(void) {
+    nixl_status_t status = doSubmit();
+    if (status < 0) {
+        return status;
+    }
+
+    return doCheckCompleted();
+}
+
+std::unique_ptr<nixlPosixIOQueue>
+nixlPosixAioIOQueueCreate(uint32_t max_ios) {
+    return std::make_unique<nixlPosixAioIOQueue>(max_ios);
+}
diff --git a/src/plugins/posix/posix_backend.cpp b/src/plugins/posix/posix_backend.cpp index 2d72a6384..839b8ba3f 100644 --- a/src/plugins/posix/posix_backend.cpp +++ b/src/plugins/posix/posix_backend.cpp @@ -23,7 +23,6 @@ #include #include #include "common/nixl_log.h" -#include "queue_factory_impl.h" #include "nixl_types.h" #include "file/file_utils.h" @@ -73,70 +72,59 @@ castPosixHandle(nixlBackendReqH *handle) { return dynamic_cast(*handle); } -// Stringify function for queue_t -inline const char * -to_string(nixlPosixQueue::queue_t type) { - using queue_t = nixlPosixQueue::queue_t; - switch (type) { - case queue_t::AIO: - return "AIO"; - case queue_t::URING: - return "URING"; - case queue_t::POSIXAIO: - return "POSIXAIO"; - case queue_t::UNSUPPORTED: - return "UNSUPPORTED"; - default: - return "UNKNOWN"; - } -} - -static nixlPosixQueue::queue_t -getQueueType(const nixl_b_params_t *custom_params) { - using queue_t = nixlPosixQueue::queue_t; - +static const char * +getIoQueueType(const nixl_b_params_t *custom_params) { // Check for explicit backend request if (custom_params) { // First check if AIO is explicitly requested if (custom_params->count("use_aio") > 0) { const auto &value = custom_params->at("use_aio"); if (value == "true" || value == "1") { - if (!QueueFactory::isLinuxAioAvailable()) { - NIXL_ERROR << "linux_aio backend requested but not available at runtime"; - return queue_t::UNSUPPORTED; - } - return queue_t::AIO; + return "AIO"; } } +#ifdef HAVE_LIBURING // Then check if io_uring is explicitly
requested if (custom_params->count("use_uring") > 0) { const auto &value = custom_params->at("use_uring"); if (value == "true" || value == "1") { - if (!QueueFactory::isUringAvailable()) { - NIXL_ERROR << "io_uring backend requested but not available at runtime"; - return queue_t::UNSUPPORTED; - } - return queue_t::URING; + return "URING"; } } +#endif +#ifdef HAVE_LIBAIO // Then check if linux_aio is explicitly requested if (custom_params->count("use_posix_aio") > 0) { const auto &value = custom_params->at("use_posix_aio"); if (value == "true" || value == "1") { - return queue_t::POSIXAIO; + return "POSIXAIO"; } } +#endif } - if (QueueFactory::isLinuxAioAvailable()) { - return queue_t::AIO; + return "AIO"; +} + +// Log completion percentage at regular intervals (every log_percent_step percent) +void +logOnPercentStep(unsigned int completed, unsigned int total) { + constexpr unsigned int default_log_percent_step = 10; + static_assert(default_log_percent_step >= 1 && default_log_percent_step <= 100, + "log_percent_step must be in [1, 100]"); + unsigned int log_percent_step = total < 10 ? 
1 : default_log_percent_step; + + if (total == 0) { + NIXL_ERROR << "Tried to log completion percentage with total == 0"; + return; } - if (QueueFactory::isUringAvailable()) { - return queue_t::URING; + // Only log at each percentage step + if (completed % (total / log_percent_step) == 0) { + NIXL_DEBUG << absl::StrFormat("Queue progress: %.1f%% complete", + (completed * 100.0 / total)); } - return queue_t::POSIXAIO; } } // namespace @@ -154,10 +142,12 @@ nixlPosixBackendReqH::nixlPosixBackendReqH(const nixl_xfer_op_t &op, remote(rem), opt_args(args), custom_params_(params), - queue_depth_(loc.descCount()), - queue_type_(getQueueType(params)) { - if (queue_type_ == nixlPosixQueue::queue_t::UNSUPPORTED) { - throw exception(absl::StrFormat("Unsupported queue type"), NIXL_ERR_NOT_SUPPORTED); + queue_depth_(loc.descCount()), + num_confirmed_ios_(0) { // checkXfer() reads this counter, so it must start out defined + std::string io_queue_type = params->at("io_queue_type"); + if (io_queue_type.empty()) { + throw exception("Unsupported io queue type: no io queue type specified", + NIXL_ERR_NOT_SUPPORTED); } if (local.descCount() == 0 || remote.descCount() == 0) { @@ -167,69 +157,88 @@ nixlPosixBackendReqH::nixlPosixBackendReqH(const nixl_xfer_op_t &op, NIXL_ERR_INVALID_PARAM); } - nixl_status_t status = initQueues(); + nixl_status_t status = initIoQueue(io_queue_type); if (status != NIXL_SUCCESS) { - throw exception(absl::StrFormat("Failed to initialize queues: %s", to_string(queue_type_)), + throw exception(absl::StrFormat("Failed to initialize io queue: %s", io_queue_type), status); } } +void +nixlPosixBackendReqH::ioDone(uint32_t data_size, int error) { + num_confirmed_ios_++; + logOnPercentStep(num_confirmed_ios_, queue_depth_); +} + +void +nixlPosixBackendReqH::ioDoneClb(void *ctx, uint32_t data_size, int error) { + nixlPosixBackendReqH *self = static_cast(ctx); + self->ioDone(data_size, error); +} + nixl_status_t -nixlPosixBackendReqH::initQueues() { +nixlPosixBackendReqH::initIoQueue(const std::string &io_queue_type) { try { - switch (queue_type_) { -
case nixlPosixQueue::queue_t::AIO: - queue = QueueFactory::createLinuxAioQueue(queue_depth_, operation); - break; - case nixlPosixQueue::queue_t::URING: - queue = QueueFactory::createUringQueue(queue_depth_, operation); - break; - case nixlPosixQueue::queue_t::POSIXAIO: - queue = QueueFactory::createPosixAioQueue(queue_depth_, operation); - break; - default: - NIXL_ERROR << absl::StrFormat("Invalid queue type: %s", to_string(queue_type_)); - return NIXL_ERR_INVALID_PARAM; + io_queue_ = nixlPosixIOQueue::instantiate(io_queue_type, queue_depth_); + if (!io_queue_) { + throw exception(absl::StrFormat("Failed to initialize io queue: %s", io_queue_type), + NIXL_ERR_INVALID_PARAM); } + return NIXL_SUCCESS; } catch (const nixlPosixBackendReqH::exception &e) { - NIXL_ERROR << absl::StrFormat("Failed to initialize queues: %s", e.what()); + NIXL_ERROR << absl::StrFormat("Failed to initialize io queue: %s", e.what()); return e.code(); } catch (const std::exception &e) { - NIXL_ERROR << absl::StrFormat("Failed to initialize queues: %s", e.what()); + NIXL_ERROR << absl::StrFormat("Failed to initialize io queue: %s", e.what()); return NIXL_ERR_BACKEND; } } nixl_status_t nixlPosixBackendReqH::prepXfer() { - for (auto [local_it, remote_it] = std::make_pair(local.begin(), remote.begin()); - local_it != local.end() && remote_it != remote.end(); - ++local_it, ++remote_it) { - nixl_status_t status = queue->prepIO(remote_it->devId, - reinterpret_cast(local_it->addr), - remote_it->len, - remote_it->addr); - - if (status != NIXL_SUCCESS) { - NIXL_ERROR << "Error preparing I/O operation"; - return status; - } - } - return NIXL_SUCCESS; } nixl_status_t nixlPosixBackendReqH::checkXfer() { - return queue->checkCompleted(); + if (num_confirmed_ios_ == queue_depth_) { + return NIXL_SUCCESS; + } + + nixl_status_t status = io_queue_->poll(); + if (status < 0) { + return status; + } + + return NIXL_IN_PROG; } nixl_status_t nixlPosixBackendReqH::postXfer() { - return queue->submit(local, remote); 
+ num_confirmed_ios_ = 0; + + for (auto [local_it, remote_it] = std::make_pair(local.begin(), remote.begin()); + local_it != local.end() && remote_it != remote.end(); + ++local_it, ++remote_it) { + nixl_status_t status = io_queue_->enqueue(remote_it->devId, + reinterpret_cast(local_it->addr), + remote_it->len, + remote_it->addr, + operation == NIXL_READ, + ioDoneClb, + this); + + if (status != NIXL_SUCCESS) { + // Currently we do not support partial submissions, so it's all or nothing + NIXL_ERROR << absl::StrFormat("Error preparing I/O operation: %d", status); + return status; + } + } + + return io_queue_->post(); } // ----------------------------------------------------------------------------- @@ -238,16 +247,14 @@ nixlPosixBackendReqH::postXfer() { nixlPosixEngine::nixlPosixEngine(const nixlBackendInitParams *init_params) : nixlBackendEngine(init_params), - queue_type_(getQueueType(init_params->customParams)) { - if (queue_type_ == nixlPosixQueue::queue_t::UNSUPPORTED) { + io_queue_type_(getIoQueueType(init_params->customParams)) { + if (!io_queue_type_) { initErr = true; - NIXL_ERROR << absl::StrFormat( - "Failed to initialize POSIX backend - requested queue type not available: %s", - to_string(queue_type_)); + NIXL_ERROR << "Failed to initialize POSIX backend - no supported io queue type found"; return; } - NIXL_INFO << absl::StrFormat("POSIX backend initialized using queue type: %s", - to_string(queue_type_)); + NIXL_INFO << absl::StrFormat("POSIX backend initialized using io queue type: %s", + io_queue_type_); } nixl_status_t @@ -280,20 +287,7 @@ nixlPosixEngine::prepXfer(const nixl_xfer_op_t &operation, try { // Create a params map with our backend selection nixl_b_params_t params; - switch (queue_type_) { - case nixlPosixQueue::queue_t::AIO: - params["use_aio"] = "true"; - break; - case nixlPosixQueue::queue_t::URING: - params["use_uring"] = "true"; - break; - case nixlPosixQueue::queue_t::POSIXAIO: - params["use_posix_aio"] = "true"; - break; - default: 
- NIXL_ERROR << absl::StrFormat("Invalid queue type: %s", to_string(queue_type_)); - return NIXL_ERR_INVALID_PARAM; - } + params["io_queue_type"] = io_queue_type_; auto posix_handle = std::make_unique(operation, local, remote, opt_args, &params); diff --git a/src/plugins/posix/posix_backend.h b/src/plugins/posix/posix_backend.h index 222c5008b..0fbf8fc06 100644 --- a/src/plugins/posix/posix_backend.h +++ b/src/plugins/posix/posix_backend.h @@ -23,7 +23,7 @@ #include #include #include "backend/backend_engine.h" -#include "posix_queue.h" +#include "io_queue.h" class nixlPosixBackendReqH : public nixlBackendReqH { private: @@ -33,11 +33,15 @@ class nixlPosixBackendReqH : public nixlBackendReqH { const nixl_opt_b_args_t *opt_args; // Optional backend-specific arguments const nixl_b_params_t *custom_params_; // Custom backend parameters const int queue_depth_; // Queue depth for async I/O - std::unique_ptr queue; // Async I/O queue instance - const nixlPosixQueue::queue_t queue_type_; // Type of queue used + std::unique_ptr io_queue_; // Async I/O queue instance + int num_confirmed_ios_; // Number of confirmed IOs nixl_status_t - initQueues(); // Initialize async I/O queue + initIoQueue(const std::string &io_queue_type); // Initialize async I/O queue + void + ioDone(uint32_t data_size, int error); + static void + ioDoneClb(void *ctx, uint32_t data_size, int error); public: nixlPosixBackendReqH(const nixl_xfer_op_t &operation, @@ -71,7 +75,7 @@ class nixlPosixBackendReqH : public nixlBackendReqH { class nixlPosixEngine : public nixlBackendEngine { private: - const nixlPosixQueue::queue_t queue_type_; + const char *io_queue_type_; public: nixlPosixEngine(const nixlBackendInitParams *init_params); diff --git a/src/plugins/posix/posix_queue.h b/src/plugins/posix/posix_queue.h deleted file mode 100644 index d9251897c..000000000 --- a/src/plugins/posix/posix_queue.h +++ /dev/null @@ -1,42 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES.
All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef POSIX_QUEUE_H -#define POSIX_QUEUE_H - -#include "nixl_types.h" -#include "backend/backend_aux.h" -#include - -// Abstract base class for async I/O operations -class nixlPosixQueue { - public: - virtual ~nixlPosixQueue() = default; - virtual nixl_status_t - submit (const nixl_meta_dlist_t &local, const nixl_meta_dlist_t &remote) = 0; - virtual nixl_status_t checkCompleted() = 0; - virtual nixl_status_t prepIO(int fd, void* buf, size_t len, off_t offset) = 0; - - enum class queue_t { - AIO, - URING, - POSIXAIO, - UNSUPPORTED, - }; -}; - -#endif // POSIX_QUEUE_H diff --git a/src/plugins/posix/queue_factory_impl.cpp b/src/plugins/posix/queue_factory_impl.cpp deleted file mode 100644 index 6d85f1eb4..000000000 --- a/src/plugins/posix/queue_factory_impl.cpp +++ /dev/null @@ -1,108 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include "queue_factory_impl.h" -#include "posix_queue.h" -#include "posix_backend.h" -#include "aio_queue.h" - -#ifdef HAVE_LIBURING -#include "uring_queue.h" -#endif - -#ifdef HAVE_LINUXAIO -#include "linux_aio_queue.h" -#endif - -// Anonymous namespace for internal template implementations for functions that use the optional liburing -namespace { - struct uringEnabled {}; - struct uringDisabled {}; - -#ifdef HAVE_LIBURING - using uringMode = uringEnabled; -#else - using uringMode = uringDisabled; -#endif - - template - struct funcImpl; - - template - struct funcImpl::value>> { - static std::unique_ptr createUringQueue(int num_entries, nixl_xfer_op_t operation) { - // Initialize io_uring parameters with basic configuration - // Start with basic parameters, no special flags - // We can add optimizations like SQPOLL later - struct io_uring_params params = {}; - return std::make_unique(num_entries, params, operation); - } - - static bool isUringAvailable() { - return true; - } - }; - - template - struct funcImpl::value>> { - static std::unique_ptr createUringQueue(int num_entries, nixl_xfer_op_t operation) { - (void)num_entries; - (void)operation; - throw nixlPosixBackendReqH::exception("Attempting to create io_uring queue when support is not compiled in", - NIXL_ERR_NOT_SUPPORTED); - } - - static bool isUringAvailable() { - return false; - } - }; -} - -// Public functions implementation -std::unique_ptr -QueueFactory::createPosixAioQueue(int num_entries, nixl_xfer_op_t operation) { - return std::make_unique(num_entries, 
operation); -} - -std::unique_ptr QueueFactory::createUringQueue(int num_entries, nixl_xfer_op_t operation) { - return funcImpl::createUringQueue(num_entries, operation); -} - -std::unique_ptr -QueueFactory::createLinuxAioQueue(int num_entries, nixl_xfer_op_t operation) { -#ifdef HAVE_LINUXAIO - return std::make_unique(num_entries, operation); -#else - throw nixlPosixBackendReqH::exception( - "Attempting to create linux_aio queue when support is not compiled in", - NIXL_ERR_NOT_SUPPORTED); -#endif -} - -bool QueueFactory::isUringAvailable() { - return funcImpl::isUringAvailable(); -} - -bool -QueueFactory::isLinuxAioAvailable() { -#ifdef HAVE_LINUXAIO - return true; -#else - return false; -#endif -} diff --git a/src/plugins/posix/queue_factory_impl.h b/src/plugins/posix/queue_factory_impl.h deleted file mode 100644 index c9b667fd6..000000000 --- a/src/plugins/posix/queue_factory_impl.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#ifndef QUEUE_FACTORY_IMPL_H -#define QUEUE_FACTORY_IMPL_H - -#include "posix_queue.h" - -namespace QueueFactory { -std::unique_ptr -createPosixAioQueue(int num_entries, nixl_xfer_op_t operation); - -std::unique_ptr -createUringQueue(int num_entries, nixl_xfer_op_t operation); - -std::unique_ptr -createLinuxAioQueue(int num_entries, nixl_xfer_op_t operation); - -bool -isLinuxAioAvailable(); -bool -isUringAvailable(); -}; // namespace QueueFactory - -#endif // QUEUE_FACTORY_IMPL_H diff --git a/src/plugins/posix/uring_queue.cpp b/src/plugins/posix/uring_queue.cpp deleted file mode 100644 index a36a70968..000000000 --- a/src/plugins/posix/uring_queue.cpp +++ /dev/null @@ -1,158 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "uring_queue.h" -#include -#include -#include -#include -#include -#include "absl/strings/str_format.h" -#include "absl/strings/str_join.h" -#include "common/nixl_log.h" - -namespace { - // Log completion percentage at regular intervals (every log_percent_step percent) - void logOnPercentStep(unsigned int completed, unsigned int total) { - constexpr unsigned int default_log_percent_step = 10; - static_assert (default_log_percent_step >= 1 && default_log_percent_step <= 100, - "log_percent_step must be in [1, 100]"); - unsigned int log_percent_step = total < 10 ? 
1 : default_log_percent_step; - - if (total == 0) { - NIXL_ERROR << "Tried to log completion percentage with total == 0"; - return; - } - // Only log at each percentage step - if (completed % (total / log_percent_step) == 0) { - NIXL_DEBUG << absl::StrFormat("Queue progress: %.1f%% complete", - (completed * 100.0 / total)); - } - } - - std::string stringifyUringFeatures(unsigned int features) { - static const std::unordered_map feature_map = { - {IORING_FEAT_SQPOLL_NONFIXED, "SQPOLL"}, - {IORING_FEAT_FAST_POLL, "IOPOLL"} - }; - - std::vector enabled; - for (unsigned int bits = features; bits; bits &= (bits - 1)) { // step through each set bit - unsigned int bit = bits & -bits; // isolate lowest set bit - auto it = feature_map.find(bit); - if (it != feature_map.end()) { - enabled.push_back(it->second); - } - } - return enabled.empty() ? "none" : absl::StrJoin(enabled, ", "); - } -} - -nixl_status_t UringQueue::init(int entries, const io_uring_params& params) { - // Initialize with basic setup - need a mutable copy since the API modifies the params - io_uring_params mutable_params = params; - if (io_uring_queue_init_params(entries, &uring, &mutable_params) < 0) { - throw std::runtime_error(absl::StrFormat("Failed to initialize io_uring instance: %s", nixl_strerror(errno))); - } - - // Log the features supported by this io_uring instance - NIXL_INFO << absl::StrFormat("io_uring features: %s", stringifyUringFeatures(mutable_params.features)); - - return NIXL_SUCCESS; -} - -UringQueue::UringQueue(int num_entries, const io_uring_params& params, nixl_xfer_op_t operation) - : num_entries(num_entries) - , num_completed(0) - , prep_op(operation == NIXL_READ ? 
- reinterpret_cast(io_uring_prep_read) : - reinterpret_cast(io_uring_prep_write)) -{ - if (num_entries <= 0) { - throw std::invalid_argument("Invalid number of entries for UringQueue"); - } - - init(num_entries, params); -} - -UringQueue::~UringQueue() { - io_uring_queue_exit(&uring); -} - -nixl_status_t -UringQueue::submit (const nixl_meta_dlist_t &local, const nixl_meta_dlist_t &remote) { - for (auto [local_it, remote_it] = std::make_pair (local.begin(), remote.begin()); - local_it != local.end() && remote_it != remote.end(); - ++local_it, ++remote_it) { - int fd = remote_it->devId; - void *buf = reinterpret_cast (local_it->addr); - size_t len = local_it->len; - off_t offset = remote_it->addr; - - struct io_uring_sqe *sqe = io_uring_get_sqe (&uring); - if (!sqe) { - NIXL_ERROR << "Failed to get io_uring submission queue entry"; - return NIXL_ERR_BACKEND; - } - prep_op (sqe, fd, buf, len, offset); - } - - int ret = io_uring_submit(&uring); - if (ret != num_entries) { - if (ret < 0) { - NIXL_ERROR << absl::StrFormat("io_uring submit failed: %s", nixl_strerror(-ret)); - } else { - NIXL_ERROR << absl::StrFormat("io_uring submit failed. Partial submission: %d/%d", num_entries, ret); - } - return NIXL_ERR_BACKEND; - } - num_completed = 0; - return NIXL_IN_PROG; -} - -nixl_status_t UringQueue::checkCompleted() { - if (num_completed == num_entries) { - return NIXL_SUCCESS; - } - - // Process all available completions - struct io_uring_cqe* cqe; - unsigned head; - unsigned count = 0; - - // Get completion events - io_uring_for_each_cqe(&uring, head, cqe) { - int res = cqe->res; - if (res < 0) { - NIXL_ERROR << absl::StrFormat("IO operation failed: %s", nixl_strerror(-res)); - return NIXL_ERR_BACKEND; - } - count++; - } - - // Mark all seen - io_uring_cq_advance(&uring, count); - num_completed += count; - - logOnPercentStep(num_completed, num_entries); - - return (num_completed == num_entries) ? 
NIXL_SUCCESS : NIXL_IN_PROG; -} - -nixl_status_t UringQueue::prepIO(int fd, void* buf, size_t len, off_t offset) { - return NIXL_SUCCESS; -} diff --git a/src/plugins/posix/uring_queue.h b/src/plugins/posix/uring_queue.h deleted file mode 100644 index 61d66be37..000000000 --- a/src/plugins/posix/uring_queue.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#ifndef URING_QUEUE_H -#define URING_QUEUE_H - -#include -#include "posix_queue.h" -#include - -// Forward declare Error class -class nixlPosixBackendReqH; - -// Type definition for io_uring prep functions -typedef void (*io_uring_prep_func_t)(struct io_uring_sqe*, int, const void*, unsigned int, __u64); - -class UringQueue : public nixlPosixQueue { - private: - struct io_uring uring; // The io_uring instance for async I/O operations - const int num_entries; // Total number of entries expected in this ring - int num_completed; // Number of completed operations so far - io_uring_prep_func_t prep_op; // Pointer to prep function - - // Initialize the queue with the given parameters - nixl_status_t init(int num_entries, const struct io_uring_params& params); - - // Delete copy and move operations to prevent accidental copying of kernel resources - UringQueue(const UringQueue&) = delete; - UringQueue& operator=(const UringQueue&) = delete; - UringQueue(UringQueue&&) = delete; - UringQueue& operator=(UringQueue&&) = delete; - - public: - UringQueue(int num_entries, const struct io_uring_params& params, nixl_xfer_op_t operation); - ~UringQueue(); - nixl_status_t - submit (const nixl_meta_dlist_t &local, const nixl_meta_dlist_t &remote) override; - nixl_status_t checkCompleted() override; - nixl_status_t prepIO(int fd, void* buf, size_t len, off_t offset) override; -}; - -#endif // URING_QUEUE_H