diff --git a/src/plugins/posix/aio_queue.cpp b/src/plugins/posix/aio_queue.cpp deleted file mode 100644 index 460ead307..000000000 --- a/src/plugins/posix/aio_queue.cpp +++ /dev/null @@ -1,151 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "aio_queue.h" -#include "posix_backend.h" -#include -#include "common/nixl_log.h" -#include -#include -#include - -aioQueue::aioQueue(int num_entries, nixl_xfer_op_t operation) - : aiocbs(num_entries), - num_entries(num_entries), - completed(num_entries), - num_completed(0), - num_submitted(0), - operation(operation) { - if (num_entries <= 0) { - throw std::runtime_error("Invalid number of entries for AIO queue"); - } - for (auto& aiocb : aiocbs) { - memset(&aiocb, 0, sizeof(struct aiocb)); - } -} - -aioQueue::~aioQueue() { - // There should not be any in-flight I/Os at destruction time - if (num_submitted > num_completed) { - NIXL_ERROR << "Programming error: Destroying aioQueue with " << (num_submitted - num_completed) << " in-flight I/Os"; - } - - // Cancel any remaining I/Os - for (auto& aiocb : aiocbs) { - if (aiocb.aio_fildes != 0) { - aio_cancel(aiocb.aio_fildes, &aiocb); - } - } -} - -nixl_status_t -aioQueue::submit (const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) { - num_submitted = 0; - // Submit all I/Os at once - for (auto& aiocb : 
aiocbs) { - if (aiocb.aio_fildes == 0 || aiocb.aio_nbytes == 0) continue; - // Check if file descriptor is valid - if (aiocb.aio_fildes < 0) { - NIXL_ERROR << "Invalid file descriptor in AIO request"; - return NIXL_ERR_BACKEND; - } - - int ret; - if (operation == NIXL_READ) { - ret = aio_read(&aiocb); - } else { - ret = aio_write(&aiocb); - } - - if (ret < 0) { - if (errno == EAGAIN) { - // If we hit the kernel limit, cancel all submitted I/Os and return error - NIXL_ERROR << "AIO submit failed: kernel queue full"; - for (auto& cb : aiocbs) { - if (cb.aio_fildes != 0) { - aio_cancel(cb.aio_fildes, &cb); - } - } - return NIXL_ERR_BACKEND; - } - NIXL_PERROR << "AIO submit failed"; - return NIXL_ERR_BACKEND; - } - - num_submitted++; - } - - completed.assign(num_entries, false); - num_completed = 0; - return NIXL_IN_PROG; -} - -nixl_status_t aioQueue::checkCompleted() { - if (num_completed == num_entries) - return NIXL_SUCCESS; - - // Check all submitted I/Os - for (int i = 0; i < num_entries; i++) { - if (completed[i] || aiocbs[i].aio_fildes == 0 || aiocbs[i].aio_nbytes == 0) - continue; // Skip completed I/Os - - int status = aio_error(&aiocbs[i]); - if (status == 0) { // Operation completed - ssize_t ret = aio_return(&aiocbs[i]); - if (ret < 0 || ret != static_cast(aiocbs[i].aio_nbytes)) { - NIXL_PERROR << "AIO operation failed or incomplete"; - return NIXL_ERR_BACKEND; - } - num_completed++; - completed[i] = true; - } else if (status == EINPROGRESS) { - return NIXL_IN_PROG; // At least one operation still in progress - } else { - NIXL_PERROR << "AIO error"; - return NIXL_ERR_BACKEND; - } - } - - return (num_completed == num_submitted) ? 
NIXL_SUCCESS : NIXL_IN_PROG; -} - -nixl_status_t aioQueue::prepIO(int fd, void* buf, size_t len, off_t offset) { - // Find an unused control block - for (auto& aiocb : aiocbs) { - if (aiocb.aio_fildes == 0) { - // Check if file descriptor is valid - if (fd < 0) { - NIXL_ERROR << "Invalid file descriptor provided to prepareIO"; - return NIXL_ERR_BACKEND; - } - - // Check buffer and length - if (!buf || len == 0) { - NIXL_ERROR << "Invalid buffer or length provided to prepareIO"; - return NIXL_ERR_BACKEND; - } - - aiocb.aio_fildes = fd; - aiocb.aio_buf = buf; - aiocb.aio_nbytes = len; - aiocb.aio_offset = offset; - return NIXL_SUCCESS; - } - } - NIXL_ERROR << "No available AIO control blocks"; - return NIXL_ERR_BACKEND; -} diff --git a/src/plugins/posix/aio_queue.h b/src/plugins/posix/aio_queue.h deleted file mode 100644 index cba8e9a0f..000000000 --- a/src/plugins/posix/aio_queue.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#ifndef AIO_QUEUE_H -#define AIO_QUEUE_H - -#include -#include -#include "posix_queue.h" - -// Forward declare Error class -class nixlPosixBackendReqH; - -class aioQueue : public nixlPosixQueue { - private: - std::vector aiocbs; // Array of AIO control blocks - int num_entries; // Total number of entries expected - std::vector completed; // Track completed I/Os - int num_completed; // Number of completed operations - int num_submitted; // Track number of submitted I/Os - nixl_xfer_op_t operation; // Whether this is a read operation - - // Delete copy and move operations - aioQueue(const aioQueue&) = delete; - aioQueue& operator=(const aioQueue&) = delete; - aioQueue(aioQueue&&) = delete; - aioQueue& operator=(aioQueue&&) = delete; - - public: - aioQueue(int num_entries, nixl_xfer_op_t operation); - ~aioQueue(); - nixl_status_t - submit (const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) override; - nixl_status_t checkCompleted() override; - nixl_status_t prepIO(int fd, void* buf, size_t len, off_t offset) override; -}; - -#endif // AIO_QUEUE_H diff --git a/src/plugins/posix/io_queue.cpp b/src/plugins/posix/io_queue.cpp new file mode 100644 index 000000000..15f9e45cc --- /dev/null +++ b/src/plugins/posix/io_queue.cpp @@ -0,0 +1,56 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "io_queue.h" + +#ifdef HAVE_LIBAIO +std::unique_ptr nixlPosixAioIOQueueCreate(uint32_t max_ios); +#endif +#ifdef HAVE_LIBURING +std::unique_ptr nixlPosixIoUringIOQueueCreate(uint32_t max_ios); +#endif +#ifdef HAVE_LINUXAIO +std::unique_ptr nixlPosixLinuxAioIOQueueCreate(uint32_t max_ios); +#endif + +static const struct { + const char *name; + nixlPosixIOQueue::nixlPosixIOQueueCreateFn createFn; +} factories[] = { +#ifdef HAVE_LIBAIO + {"POSIXAIO", nixlPosixAioIOQueueCreate}, +#endif +#ifdef HAVE_LIBURING + {"URING", nixlPosixIoUringIOQueueCreate}, +#endif +#ifdef HAVE_LINUXAIO + {"AIO", nixlPosixLinuxAioIOQueueCreate}, +#endif +}; + +const uint32_t nixlPosixIOQueue::MIN_IOS = 64; +const uint32_t nixlPosixIOQueue::MAX_IOS = 1024 * 64; + +std::unique_ptr +nixlPosixIOQueue::instantiate(std::string_view io_queue_type, uint32_t max_ios) { + for (const auto &factory : factories) { + if (io_queue_type == factory.name) { + return factory.createFn(max_ios); + } + } + return nullptr; +} diff --git a/src/plugins/posix/io_queue.h b/src/plugins/posix/io_queue.h new file mode 100644 index 000000000..4bae4996a --- /dev/null +++ b/src/plugins/posix/io_queue.h @@ -0,0 +1,84 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef POSIX_IO_QUEUE_H +#define POSIX_IO_QUEUE_H + +#include +#include +#include +#include +#include +#include +#include "backend_aux.h" + +typedef void (*nixlPosixIOQueueDoneCb)(void *ctx, uint32_t data_size, int error); + +class nixlPosixIOQueue { +public: + using nixlPosixIOQueueCreateFn = + std::function(uint32_t max_ios)>; + + nixlPosixIOQueue(uint32_t max_ios) : max_ios_(normalizedMaxIOS(max_ios)) {} + + virtual ~nixlPosixIOQueue() {} + + virtual nixl_status_t + enqueue(int fd, + void *buf, + size_t len, + off_t offset, + bool read, + nixlPosixIOQueueDoneCb clb, + void *ctx) = 0; + virtual nixl_status_t + post(void) = 0; + virtual nixl_status_t + poll(void) = 0; + + static std::unique_ptr + instantiate(std::string_view io_queue_type, uint32_t max_ios); + +protected: + static uint32_t + normalizedMaxIOS(uint32_t max_ios) { + uint32_t m = std::max(MIN_IOS, max_ios); + m = std::min(m, MAX_IOS); + return m; + } + + uint32_t max_ios_; + static const uint32_t MIN_IOS; + static const uint32_t MAX_IOS; +}; + +template class nixlPosixIOQueueImpl : public nixlPosixIOQueue { +public: + nixlPosixIOQueueImpl(uint32_t max_ios) : nixlPosixIOQueue(max_ios), ios_(max_ios_) { + for (uint32_t i = 0; i < max_ios_; i++) { + free_ios_.push_back(&ios_[i]); + } + } + +protected: + std::vector ios_; + std::list free_ios_; + std::list ios_to_submit_; +}; + + +#endif // POSIX_IO_QUEUE_H diff --git a/src/plugins/posix/io_uring_io_queue.cpp b/src/plugins/posix/io_uring_io_queue.cpp new file mode 100644 index 000000000..ca50d88ae --- /dev/null +++ b/src/plugins/posix/io_uring_io_queue.cpp @@ -0,0 +1,197 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "io_queue.h" +#include "common/nixl_log.h" +#include +#include + +#define MAX_IO_SUBMIT_BATCH_SIZE 64 +#define MAX_IO_CHECK_COMPLETED_BATCH_SIZE 64 + +struct nixlPosixIoUringIO { +public: + int fd; + void *buf_; + size_t len_; + off_t offset_; + bool read_; + nixlPosixIOQueueDoneCb clb_; + void *ctx_; + struct io_uring_sqe *sqe_; +}; + +class nixlPosixIoUringIOQueue : public nixlPosixIOQueueImpl { +public: + nixlPosixIoUringIOQueue(uint32_t max_ios); + + virtual nixl_status_t + post(void) override; + virtual nixl_status_t + enqueue(int fd, + void *buf, + size_t len, + off_t offset, + bool read, + nixlPosixIOQueueDoneCb clb, + void *ctx) override; + virtual nixl_status_t + poll(void) override; + virtual ~nixlPosixIoUringIOQueue() override; + +protected: + nixlPosixIoUringIO * + getBufInfo(struct iocb *io); + nixl_status_t + doSubmit(void); + nixl_status_t + doCheckCompleted(void); + +private: + struct io_uring uring; // The io_uring instance for async I/O operations +}; + +nixlPosixIoUringIOQueue::nixlPosixIoUringIOQueue(uint32_t max_ios) : nixlPosixIOQueueImpl(max_ios) { + io_uring_params params = {}; + if (io_uring_queue_init_params(max_ios_, &uring, ¶ms) < 0) { + throw std::runtime_error( + absl::StrFormat("Failed to initialize io_uring instance: %s", nixl_strerror(errno))); + } +} + +nixl_status_t +nixlPosixIoUringIOQueue::doSubmit(void) { + if (ios_to_submit_.empty()) { + return NIXL_SUCCESS; // No blocks to submit + } + + int num_ios = std::min(MAX_IO_SUBMIT_BATCH_SIZE, (int)ios_to_submit_.size()); + for (int i = 0; i < 
num_ios; i++) { + nixlPosixIoUringIO *io = ios_to_submit_.front(); + ios_to_submit_.pop_front(); + + struct io_uring_sqe *sqe = io_uring_get_sqe(&uring); + if (!sqe) { + NIXL_ERROR << "Failed to get io_uring submission queue entry"; + return NIXL_ERR_BACKEND; + } + + if (io->read_) { + io_uring_prep_read(sqe, io->fd, io->buf_, io->len_, io->offset_); + } else { + io_uring_prep_write(sqe, io->fd, io->buf_, io->len_, io->offset_); + } + + io_uring_sqe_set_data(sqe, io); + } + + int ret = io_uring_submit(&uring); + if (ret < 0) { + NIXL_ERROR << "io_uring_submit failed: " << nixl_strerror(-ret); + return NIXL_ERR_BACKEND; + } + + return ios_to_submit_.empty() ? NIXL_SUCCESS : NIXL_IN_PROG; +} + +nixl_status_t +nixlPosixIoUringIOQueue::doCheckCompleted(void) { + struct io_uring_cqe *cqe; + unsigned head; + int count = 0; + io_uring_for_each_cqe(&uring, head, cqe) { + int res = cqe->res; + nixlPosixIoUringIO *io = reinterpret_cast(io_uring_cqe_get_data(cqe)); + if (io->clb_) { + io->clb_(io->ctx_, res, 0); + } + free_ios_.push_back(io); + if (res < 0) { + NIXL_ERROR << absl::StrFormat("IO operation failed: %s", nixl_strerror(-res)); + return NIXL_ERR_BACKEND; + } + count++; + if (count == MAX_IO_CHECK_COMPLETED_BATCH_SIZE) { + break; + } + } + + // Mark all seen + io_uring_cq_advance(&uring, count); + + if (free_ios_.size() == max_ios_) { + return NIXL_SUCCESS; // All ios are free now + } + + return NIXL_IN_PROG; // Some ios are in flight, need to check again +} + +nixl_status_t +nixlPosixIoUringIOQueue::enqueue(int fd, + void *buf, + size_t len, + off_t offset, + bool read, + nixlPosixIOQueueDoneCb clb, + void *ctx) { + if (free_ios_.empty()) { + NIXL_ERROR << "No more free blocks available"; + return NIXL_ERR_NOT_ALLOWED; + } + + nixlPosixIoUringIO *io = free_ios_.front(); + free_ios_.pop_front(); + io->fd = fd; + io->buf_ = buf; + io->len_ = len; + io->offset_ = offset; + io->read_ = read; + io->clb_ = clb; + io->ctx_ = ctx; + + ios_to_submit_.push_back(io); + + 
return NIXL_SUCCESS; +} + +nixl_status_t +nixlPosixIoUringIOQueue::post(void) { + nixl_status_t status = doSubmit(); + if (status < 0) { + return status; + } + + return NIXL_IN_PROG; +} + +nixl_status_t +nixlPosixIoUringIOQueue::poll(void) { + nixl_status_t status = post(); + if (status < 0) { + return status; + } + + return doCheckCompleted(); +} + +nixlPosixIoUringIOQueue::~nixlPosixIoUringIOQueue() { + io_uring_queue_exit(&uring); +} + +std::unique_ptr nixlPosixIoUringIOQueueCreate(uint32_t max_ios) { + return std::make_unique(max_ios); +} diff --git a/src/plugins/posix/linux_aio_io_queue.cpp b/src/plugins/posix/linux_aio_io_queue.cpp new file mode 100644 index 000000000..387728cc0 --- /dev/null +++ b/src/plugins/posix/linux_aio_io_queue.cpp @@ -0,0 +1,209 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "io_queue.h" +#include +#include "common/nixl_log.h" +#include +#include + +#define MAX_IO_SUBMIT_BATCH_SIZE 64 +#define MAX_IO_CHECK_COMPLETED_BATCH_SIZE 64 + +struct nixlPosixLinuxAioIO { +public: + nixlPosixIOQueueDoneCb clb_; + void *ctx_; + struct iocb io_; +}; + +class nixlPosixLinuxAioIOQueue : public nixlPosixIOQueueImpl { +public: + nixlPosixLinuxAioIOQueue(uint32_t max_ios); + + virtual nixl_status_t + post(void) override; + virtual nixl_status_t + enqueue(int fd, + void *buf, + size_t len, + off_t offset, + bool read, + nixlPosixIOQueueDoneCb clb, + void *ctx) override; + virtual nixl_status_t + poll(void) override; + virtual ~nixlPosixLinuxAioIOQueue() override; + +protected: + nixlPosixLinuxAioIO * + getBufInfo(struct iocb *io); + nixl_status_t + doSubmit(void); + nixl_status_t + doCheckCompleted(void); + +private: + io_context_t io_ctx_; // I/O context +}; + +nixlPosixLinuxAioIOQueue::nixlPosixLinuxAioIOQueue(uint32_t max_ios) + : nixlPosixIOQueueImpl(max_ios) { + int res = io_queue_init(max_ios_, &io_ctx_); + if (res) { + throw std::runtime_error( + absl::StrFormat("Failed to initialize io_queue: %s", nixl_strerror(errno))); + } +} + +nixl_status_t +nixlPosixLinuxAioIOQueue::enqueue(int fd, + void *buf, + size_t len, + off_t offset, + bool read, + nixlPosixIOQueueDoneCb clb, + void *ctx) { + if (free_ios_.empty()) { + NIXL_ERROR << "No more free blocks available"; + return NIXL_ERR_NOT_ALLOWED; + } + nixlPosixLinuxAioIO *io = free_ios_.front(); + free_ios_.pop_front(); + + if (read) { + io_prep_pread(&io->io_, fd, buf, len, offset); + } else { + io_prep_pwrite(&io->io_, fd, buf, len, offset); + } + io->clb_ = clb; + io->ctx_ = ctx; + io->io_.data = io; + ios_to_submit_.push_back(io); + + return NIXL_SUCCESS; +} + +nixlPosixLinuxAioIOQueue::~nixlPosixLinuxAioIOQueue() { + io_queue_release(io_ctx_); +} + +nixl_status_t +nixlPosixLinuxAioIOQueue::doSubmit(void) { + struct iocb *ios[MAX_IO_SUBMIT_BATCH_SIZE]; + nixlPosixLinuxAioIO 
*to_submit[MAX_IO_SUBMIT_BATCH_SIZE]; + + if (ios_to_submit_.empty()) { + return NIXL_SUCCESS; // No blocks to submit + } + + int num_ios = std::min(MAX_IO_SUBMIT_BATCH_SIZE, (int)ios_to_submit_.size()); + for (int i = 0; i < num_ios; i++) { + nixlPosixLinuxAioIO *io = ios_to_submit_.front(); + ios_to_submit_.pop_front(); + + ios[i] = &io->io_; + to_submit[i] = io; + } + + int ret = io_submit(io_ctx_, num_ios, ios); + if (ret < 0) { + if (ret == -EAGAIN) { + ret = 0; // 0 were submitted, we will try again later + } else { + NIXL_ERROR << "io_submit failed: " << nixl_strerror(-ret); + return NIXL_ERR_BACKEND; + } + } + + for (int i = num_ios - 1; i >= ret; i--) { + // If not submitted, push back to the front of the list + nixlPosixLinuxAioIO *io = to_submit[i]; + ios_to_submit_.push_front(io); + } + + return ios_to_submit_.empty() ? NIXL_SUCCESS : NIXL_IN_PROG; +} + +nixl_status_t +nixlPosixLinuxAioIOQueue::doCheckCompleted(void) { + struct io_event events[MAX_IO_CHECK_COMPLETED_BATCH_SIZE]; + std::list completed_ios; + int rc; + struct timespec timeout = {0, 0}; + + if (free_ios_.size() == max_ios_) { + return NIXL_SUCCESS; // All ios are free, no ios in flight + } + + rc = io_getevents(io_ctx_, 0, MAX_IO_CHECK_COMPLETED_BATCH_SIZE, events, &timeout); + if (rc < 0) { + NIXL_ERROR << "io_getevents error: " << rc; + return NIXL_ERR_BACKEND; + } + + for (int i = 0; i < rc; i++) { + struct iocb *iocb = events[i].obj; + nixlPosixLinuxAioIO *io = (nixlPosixLinuxAioIO *)iocb->data; + + if (events[i].res < 0) { + NIXL_ERROR << "AIO operation failed: " << events[i].res; + return NIXL_ERR_BACKEND; + } + + if (io->clb_) { + io->clb_(io->ctx_, events[i].res, 0); + } + + completed_ios.push_back(io); + } + + if (!completed_ios.empty()) { + free_ios_.splice(free_ios_.end(), completed_ios); + } + + if (free_ios_.size() == max_ios_) { + return NIXL_SUCCESS; // All ios are free now + } + + return NIXL_IN_PROG; // Some blocks are in flight, need to check again +} + +nixl_status_t 
+nixlPosixLinuxAioIOQueue::post(void) { + nixl_status_t status = doSubmit(); + if (status < 0) { + return status; + } + + return NIXL_IN_PROG; +} + +nixl_status_t +nixlPosixLinuxAioIOQueue::poll(void) { + nixl_status_t status = doSubmit(); + if (status < 0) { + return status; + } + + return doCheckCompleted(); +} + +std::unique_ptr +nixlPosixLinuxAioIOQueueCreate(uint32_t max_ios) { + return std::make_unique(max_ios); +} diff --git a/src/plugins/posix/linux_aio_queue.cpp b/src/plugins/posix/linux_aio_queue.cpp deleted file mode 100644 index 0b4cebfa6..000000000 --- a/src/plugins/posix/linux_aio_queue.cpp +++ /dev/null @@ -1,143 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include "linux_aio_queue.h" -#include "posix_backend.h" -#include -#include "common/nixl_log.h" -#include -#include -#include - -linuxAioQueue::linuxAioQueue(int num_entries, nixl_xfer_op_t operation) - : io_ctx(io_context_t()), - ios(num_entries), - num_entries(num_entries), - num_ios_to_submit(0), - completed(num_entries), - num_completed(0), - operation(operation) { - if (num_entries <= 0) { - throw std::runtime_error("Invalid number of entries for AIO queue"); - } - - if (operation != NIXL_READ && operation != NIXL_WRITE) { - throw std::runtime_error("Invalid operation for AIO queue"); - } - - int res = io_queue_init(num_entries, &io_ctx); - if (res) { - throw std::runtime_error("io_queue_init (" + std::to_string(num_entries) + - ") failed with " + std::to_string(res)); - } - - ios_to_submit.assign(num_entries, nullptr); -} - -linuxAioQueue::~linuxAioQueue() { - io_queue_release(io_ctx); -} - -nixl_status_t -linuxAioQueue::submit(const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) { - if (!num_ios_to_submit) { - return NIXL_IN_PROG; - } - - int ret = io_submit(io_ctx, num_ios_to_submit, ios_to_submit.data()); - if (ret != num_ios_to_submit) { - if (ret < 0) { - NIXL_ERROR << absl::StrFormat("linux_aio submit failed: %s", nixl_strerror(-ret)); - } else { - NIXL_ERROR << absl::StrFormat( - "linux_aio submit failed. 
Partial submission: %d/%d", num_ios_to_submit, ret); - } - return NIXL_ERR_BACKEND; - } - - num_completed = 0; - num_ios_to_submit = 0; - return NIXL_IN_PROG; -} - -nixl_status_t -linuxAioQueue::checkCompleted() { - if (num_completed == num_entries) { - return NIXL_SUCCESS; - } - - struct io_event events[32]; - int rc; - struct timespec timeout = {0, 0}; - - rc = io_getevents(io_ctx, 0, 32, events, &timeout); - if (rc < 0) { - NIXL_ERROR << "io_getevents error: " << rc; - return NIXL_ERR_BACKEND; - } - - for (int i = 0; i < rc; i++) { - struct iocb *io = events[i].obj; - size_t idx = (size_t)io->data; - - ios_to_submit[idx] = nullptr; // Mark as completed - - if (events[i].res < 0) { - NIXL_ERROR << "AIO operation failed: " << events[i].res; - return NIXL_ERR_BACKEND; - } - } - - num_completed += rc; - - return (num_completed == num_entries) ? NIXL_SUCCESS : NIXL_IN_PROG; -} - -nixl_status_t -linuxAioQueue::prepIO(int fd, void *buf, size_t len, off_t offset) { - if (num_ios_to_submit == num_entries) { - NIXL_ERROR << "No available IOs"; - return NIXL_ERR_BACKEND; - } - - // Check if file descriptor is valid - if (fd < 0) { - NIXL_ERROR << "Invalid file descriptor provided to prepareIO"; - return NIXL_ERR_BACKEND; - } - - // Check buffer and length - if (!buf || len == 0) { - NIXL_ERROR << "Invalid buffer or length provided to prepareIO"; - return NIXL_ERR_BACKEND; - } - - int idx = num_ios_to_submit; - auto io = &ios[idx]; - - if (operation == NIXL_READ) { - io_prep_pread(io, fd, buf, len, offset); - } else { - io_prep_pwrite(io, fd, buf, len, offset); - } - - ios_to_submit[idx] = io; - io->data = (void *)(uintptr_t)idx; - num_ios_to_submit++; - - return NIXL_SUCCESS; -} diff --git a/src/plugins/posix/linux_aio_queue.h b/src/plugins/posix/linux_aio_queue.h deleted file mode 100644 index 985b5f73e..000000000 --- a/src/plugins/posix/linux_aio_queue.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. 
All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef LINUXAIO_QUEUE_H -#define LINUXAIO_QUEUE_H - -#include -#include -#include "posix_queue.h" - -// Forward declare Error class -class nixlPosixBackendReqH; - -class linuxAioQueue : public nixlPosixQueue { -private: - io_context_t io_ctx; // I/O context - std::vector ios; // Array of I/Os - int num_entries; // Total number of entries expected - std::vector ios_to_submit; // Array of I/Os to submit - int num_ios_to_submit; // Total number of entries to submit - std::vector completed; // Track completed I/Os - int num_completed; // Number of completed operations - nixl_xfer_op_t operation; // Whether this is a read operation - - // Delete copy and move operations - linuxAioQueue(const linuxAioQueue &) = delete; - linuxAioQueue & - operator=(const linuxAioQueue &) = delete; - linuxAioQueue(linuxAioQueue &&) = delete; - linuxAioQueue & - operator=(linuxAioQueue &&) = delete; - -public: - linuxAioQueue(int num_entries, nixl_xfer_op_t operation); - ~linuxAioQueue(); - nixl_status_t - submit(const nixl_meta_dlist_t &, const nixl_meta_dlist_t &) override; - nixl_status_t - checkCompleted() override; - nixl_status_t - prepIO(int fd, void *buf, size_t len, off_t offset) override; -}; - -#endif // LINUXAIO_QUEUE_H diff --git a/src/plugins/posix/meson.build b/src/plugins/posix/meson.build index dff2365cf..a9d127801 100644 --- 
a/src/plugins/posix/meson.build +++ b/src/plugins/posix/meson.build @@ -22,14 +22,16 @@ posix_sources = [ 'posix_backend.cpp', 'posix_backend.h', 'posix_plugin.cpp', - 'queue_factory_impl.cpp', - 'aio_queue.cpp' # Always include AIO source since it's required + 'io_queue.h', + 'io_queue.cpp', + 'linux_aio_io_queue.cpp' # Always include Linux AIO source since it's required ] # If libaio is not found, skip building the POSIX plugin entirely if posix_aio message('Correct libaio found, Building POSIX plugin') compile_defs = ['-DHAVE_LIBAIO'] + posix_sources += ['posix_aio_io_queue.cpp'] plugin_link_args = ['-laio'] plugin_deps += [ aio_dep ] elif rt_dep.found() @@ -44,7 +46,7 @@ endif have_uring = uring_dep.found() if have_uring compile_defs += ['-DHAVE_LIBURING'] - posix_sources += ['uring_queue.cpp'] + posix_sources += ['io_uring_io_queue.cpp'] plugin_deps += [uring_dep] plugin_link_args += ['-luring'] message('liburing found, adding io_uring support') @@ -54,7 +56,7 @@ endif if linux_aio compile_defs += ['-DHAVE_LINUXAIO'] - posix_sources += ['linux_aio_queue.cpp'] + #posix_sources += ['linux_aio_queue.cpp'] plugin_link_args += ['-laio'] message('Linux AIO found, adding Linux AIO support') else diff --git a/src/plugins/posix/posix_aio_io_queue.cpp b/src/plugins/posix/posix_aio_io_queue.cpp new file mode 100644 index 000000000..56b54b863 --- /dev/null +++ b/src/plugins/posix/posix_aio_io_queue.cpp @@ -0,0 +1,190 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "io_queue.h" +#include "common/nixl_log.h" +#include + +#define MAX_IO_SUBMIT_BATCH_SIZE 64 +#define MAX_IO_CHECK_COMPLETED_BATCH_SIZE 64 + +struct nixlPosixAioIO { +public: + nixlPosixIOQueueDoneCb clb_; + void *ctx_; + struct aiocb aio_; + bool read_; +}; + +class nixlPosixAioIOQueue : public nixlPosixIOQueueImpl { +public: + nixlPosixAioIOQueue(uint32_t max_ios) : nixlPosixIOQueueImpl(max_ios) {} + + virtual nixl_status_t + post(void) override; + virtual nixl_status_t + enqueue(int fd, + void *buf, + size_t len, + off_t offset, + bool read, + nixlPosixIOQueueDoneCb clb, + void *ctx) override; + virtual nixl_status_t + poll(void) override; + virtual ~nixlPosixAioIOQueue() override; + +protected: + nixl_status_t + doSubmit(void); + nixl_status_t + doCheckCompleted(void); + + std::list ios_in_flight_; +}; + +nixlPosixAioIOQueue::~nixlPosixAioIOQueue() { + for (auto &io : ios_) { + if (io.aio_.aio_fildes != 0) { + aio_cancel(io.aio_.aio_fildes, &io.aio_); + } + } +} + +nixl_status_t +nixlPosixAioIOQueue::enqueue(int fd, + void *buf, + size_t len, + off_t offset, + bool read, + nixlPosixIOQueueDoneCb clb, + void *ctx) { + if (free_ios_.empty()) { + NIXL_ERROR << "No more free blocks available"; + return NIXL_ERR_NOT_ALLOWED; + } + + nixlPosixAioIO *io = free_ios_.front(); + free_ios_.pop_front(); + + io->clb_ = clb; + io->ctx_ = ctx; + io->read_ = read; + io->aio_.aio_fildes = fd; + io->aio_.aio_buf = buf; + io->aio_.aio_nbytes = len; + io->aio_.aio_offset = offset; + + ios_to_submit_.push_back(io); + + return NIXL_SUCCESS; +} + 
+// Hands queued requests to the POSIX AIO implementation, at most
+// MAX_IO_SUBMIT_BATCH_SIZE per call.
+//
+// Returns NIXL_SUCCESS when nothing is left to submit, NIXL_IN_PROG when requests remain
+// queued for a later call, and NIXL_ERR_BACKEND when aio_read()/aio_write() rejected a
+// request (the rejected request is put back at the head of the submit queue).
+nixl_status_t
+nixlPosixAioIOQueue::doSubmit(void) {
+    if (ios_to_submit_.empty()) {
+        return NIXL_SUCCESS; // No blocks to submit
+    }
+
+    const int num_ios =
+        std::min(MAX_IO_SUBMIT_BATCH_SIZE, static_cast<int>(ios_to_submit_.size()));
+    for (int i = 0; i < num_ios; i++) {
+        nixlPosixAioIO *io = ios_to_submit_.front();
+        ios_to_submit_.pop_front();
+
+        const int ret = io->read_ ? aio_read(&io->aio_) : aio_write(&io->aio_);
+        if (ret < 0) {
+            // aio_read()/aio_write() return -1 and report the cause through errno, so
+            // the return value itself must not be fed to nixl_strerror().
+            const int err = errno;
+            NIXL_ERROR << "aio_submit failed: " << nixl_strerror(err);
+            ios_to_submit_.push_front(io);
+            return NIXL_ERR_BACKEND;
+        }
+
+        ios_in_flight_.push_back(io);
+    }
+
+    return ios_to_submit_.empty() ? NIXL_SUCCESS : NIXL_IN_PROG;
+}
+
+// Reaps completed requests from the head of the in-flight list, at most
+// MAX_IO_CHECK_COMPLETED_BATCH_SIZE per call, invoking each request's callback and
+// returning its entry to the free list.
+//
+// Returns NIXL_SUCCESS when no request is in flight any more, NIXL_IN_PROG when requests
+// are still pending (the scan stops at the first request that is still running), and
+// NIXL_ERR_BACKEND when a request failed or transferred fewer bytes than requested.
+nixl_status_t
+nixlPosixAioIOQueue::doCheckCompleted(void) {
+    if (ios_in_flight_.empty()) {
+        return NIXL_SUCCESS; // No blocks in flight
+    }
+
+    int num_ios =
+        std::min(MAX_IO_CHECK_COMPLETED_BATCH_SIZE, static_cast<int>(ios_in_flight_.size()));
+    auto it = ios_in_flight_.begin();
+    while (num_ios > 0 && it != ios_in_flight_.end()) {
+        nixlPosixAioIO *io = *it;
+
+        const int status = aio_error(&io->aio_);
+        if (status == EINPROGRESS) {
+            return NIXL_IN_PROG;
+        }
+        if (status != 0) {
+            // aio_error() returns the request's errno value directly, or -1 with errno
+            // set when the call itself failed. The request is already in the list, so it
+            // is left in place rather than inserted a second time.
+            const int err = (status < 0) ? errno : status;
+            NIXL_ERROR << "aio_error failed: " << nixl_strerror(err);
+            return NIXL_ERR_BACKEND;
+        }
+
+        const ssize_t ret = aio_return(&io->aio_);
+        if (ret < 0) {
+            const int err = errno;
+            NIXL_ERROR << "aio_return failed: " << nixl_strerror(err);
+            return NIXL_ERR_BACKEND;
+        }
+        if (ret != static_cast<ssize_t>(io->aio_.aio_nbytes)) {
+            // A short transfer is not an errno condition; report the byte counts.
+            NIXL_ERROR << "aio_return failed: short transfer, " << ret << " of "
+                       << io->aio_.aio_nbytes << " bytes";
+            return NIXL_ERR_BACKEND;
+        }
+
+        if (io->clb_) {
+            io->clb_(io->ctx_, ret, 0);
+        }
+
+        // erase() already yields the iterator of the next request; advancing it again
+        // would skip that request and could step past end().
+        it = ios_in_flight_.erase(it);
+        free_ios_.push_back(io);
+        --num_ios;
+    }
+
+    return ios_in_flight_.empty() ? NIXL_SUCCESS : NIXL_IN_PROG;
+}
+
+// Starts submitting the queued requests. Returns the doSubmit() error if one occurred,
+// otherwise NIXL_IN_PROG; completion is observed through poll().
+nixl_status_t
+nixlPosixAioIOQueue::post(void) {
+    const nixl_status_t status = doSubmit();
+    return (status < 0) ? status : NIXL_IN_PROG;
+}
+
+// Submits the next batch of queued requests, then reaps completed ones. Returns the
+// doSubmit() error if one occurred, otherwise the result of doCheckCompleted().
+nixl_status_t
+nixlPosixAioIOQueue::poll(void) {
+    const nixl_status_t status = doSubmit();
+    if (status < 0) {
+        return status;
+    }
+
+    return doCheckCompleted();
+}
+
+// Factory for the POSIX AIO queue with room for max_ios requests.
+// NOTE(review): the template arguments were missing from the reviewed text; the return
+// type is reconstructed from the nixlPosixIOQueue::instantiate() call site - confirm
+// against the declaration in io_queue.h.
+std::unique_ptr<nixlPosixIOQueue>
+nixlPosixAioIOQueueCreate(uint32_t max_ios) {
+    return std::make_unique<nixlPosixAioIOQueue>(max_ios);
+}
diff --git a/src/plugins/posix/posix_backend.cpp b/src/plugins/posix/posix_backend.cpp index a391d7098..839b8ba3f 100644 --- a/src/plugins/posix/posix_backend.cpp +++ b/src/plugins/posix/posix_backend.cpp @@ -23,108 +23,110 @@ #include #include #include "common/nixl_log.h" -#include "queue_factory_impl.h" #include "nixl_types.h" #include "file/file_utils.h" namespace { - bool isValidPrepXferParams(const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - const std::string &local_agent) { - if (remote_agent != local_agent) { - NIXL_ERROR << absl::StrFormat("Error: Remote agent must match the requesting agent (%s). Got %s", - local_agent, remote_agent); - return false; - } - - if (local.getType() != DRAM_SEG) { - NIXL_ERROR << absl::StrFormat("Error: Local memory type must be DRAM_SEG, got %d", local.getType()); - return false; - } - - if (remote.getType() != FILE_SEG) { - NIXL_ERROR << absl::StrFormat("Error: Remote memory type must be FILE_SEG, got %d", remote.getType()); - return false; - } +bool +isValidPrepXferParams(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + const std::string &local_agent) { + if (remote_agent != local_agent) { + NIXL_ERROR << absl::StrFormat( + "Error: Remote agent must match the requesting agent (%s). 
Got %s", + local_agent, + remote_agent); + return false; + } - if (local.descCount() != remote.descCount()) { - NIXL_ERROR << absl::StrFormat("Error: Mismatch in descriptor counts - local: %d, remote: %d", - local.descCount(), remote.descCount()); - return false; - } + if (local.getType() != DRAM_SEG) { + NIXL_ERROR << absl::StrFormat("Error: Local memory type must be DRAM_SEG, got %d", + local.getType()); + return false; + } - return true; + if (remote.getType() != FILE_SEG) { + NIXL_ERROR << absl::StrFormat("Error: Remote memory type must be FILE_SEG, got %d", + remote.getType()); + return false; } - nixlPosixBackendReqH& castPosixHandle(nixlBackendReqH* handle) { - if (!handle) { - throw nixlPosixBackendReqH::exception("received null handle", NIXL_ERR_INVALID_PARAM); - } - return dynamic_cast(*handle); + if (local.descCount() != remote.descCount()) { + NIXL_ERROR << absl::StrFormat( + "Error: Mismatch in descriptor counts - local: %d, remote: %d", + local.descCount(), + remote.descCount()); + return false; } - // Stringify function for queue_t - inline const char* to_string(nixlPosixQueue::queue_t type) { - using queue_t = nixlPosixQueue::queue_t; - switch (type) { - case queue_t::AIO: return "AIO"; - case queue_t::URING: return "URING"; - case queue_t::POSIXAIO: - return "POSIXAIO"; - case queue_t::UNSUPPORTED: return "UNSUPPORTED"; - default: return "UNKNOWN"; - } + return true; +} + +nixlPosixBackendReqH & +castPosixHandle(nixlBackendReqH *handle) { + if (!handle) { + throw nixlPosixBackendReqH::exception("received null handle", NIXL_ERR_INVALID_PARAM); } + return dynamic_cast(*handle); +} - static nixlPosixQueue::queue_t getQueueType(const nixl_b_params_t* custom_params) { - using queue_t = nixlPosixQueue::queue_t; - - // Check for explicit backend request - if (custom_params) { - // First check if AIO is explicitly requested - if (custom_params->count("use_aio") > 0) { - const auto& value = custom_params->at("use_aio"); - if (value == "true" || value == 
"1") { - if (!QueueFactory::isLinuxAioAvailable()) { - NIXL_ERROR << "linux_aio backend requested but not available at runtime"; - return queue_t::UNSUPPORTED; - } - return queue_t::AIO; - } +static const char * +getIoQueueType(const nixl_b_params_t *custom_params) { + // Check for explicit backend request + if (custom_params) { + // First check if AIO is explicitly requested + if (custom_params->count("use_aio") > 0) { + const auto &value = custom_params->at("use_aio"); + if (value == "true" || value == "1") { + return "AIO"; } + } - // Then check if io_uring is explicitly requested - if (custom_params->count("use_uring") > 0) { - const auto& value = custom_params->at("use_uring"); - if (value == "true" || value == "1") { - if (!QueueFactory::isUringAvailable()) { - NIXL_ERROR << "io_uring backend requested but not available at runtime"; - return queue_t::UNSUPPORTED; - } - return queue_t::URING; - } +#ifdef HAVE_LIBURING + // Then check if io_uring is explicitly requested + if (custom_params->count("use_uring") > 0) { + const auto &value = custom_params->at("use_uring"); + if (value == "true" || value == "1") { + return "URING"; } + } +#endif - // Then check if linux_aio is explicitly requested - if (custom_params->count("use_posix_aio") > 0) { - const auto &value = custom_params->at("use_posix_aio"); - if (value == "true" || value == "1") { - return queue_t::POSIXAIO; - } +#ifdef HAVE_LIBAIO + // Then check if linux_aio is explicitly requested + if (custom_params->count("use_posix_aio") > 0) { + const auto &value = custom_params->at("use_posix_aio"); + if (value == "true" || value == "1") { + return "POSIXAIO"; } } +#endif + } - if (QueueFactory::isLinuxAioAvailable()) { - return queue_t::AIO; - } - if (QueueFactory::isUringAvailable()) { - return queue_t::URING; - } - return queue_t::POSIXAIO; + return "AIO"; +} + +// Log completion percentage at regular intervals (every log_percent_step percent) +void +logOnPercentStep(unsigned int completed, unsigned int 
total) { + constexpr unsigned int default_log_percent_step = 10; + static_assert(default_log_percent_step >= 1 && default_log_percent_step <= 100, + "log_percent_step must be in [1, 100]"); + unsigned int log_percent_step = total < 10 ? 1 : default_log_percent_step; + + if (total == 0) { + NIXL_ERROR << "Tried to log completion percentage with total == 0"; + return; + } + // Only log at each percentage step + if (completed % (total / log_percent_step) == 0) { + NIXL_DEBUG << absl::StrFormat("Queue progress: %.1f%% complete", + (completed * 100.0 / total)); } } +} // namespace // ----------------------------------------------------------------------------- // POSIX Backend Request Handle Implementation @@ -133,108 +135,132 @@ namespace { nixlPosixBackendReqH::nixlPosixBackendReqH(const nixl_xfer_op_t &op, const nixl_meta_dlist_t &loc, const nixl_meta_dlist_t &rem, - const nixl_opt_b_args_t* args, - const nixl_b_params_t* params) - : operation(op) - , local(loc) - , remote(rem) - , opt_args(args) - , custom_params_(params) - , queue_depth_(loc.descCount()) - , queue_type_(getQueueType(params)) { - if (queue_type_ == nixlPosixQueue::queue_t::UNSUPPORTED) { - throw exception(absl::StrFormat("Unsupported queue type"), NIXL_ERR_NOT_SUPPORTED); + const nixl_opt_b_args_t *args, + const nixl_b_params_t *params) + : operation(op), + local(loc), + remote(rem), + opt_args(args), + custom_params_(params), + queue_depth_(loc.descCount()) { + + std::string io_queue_type = params->at("io_queue_type"); + if (io_queue_type.empty()) { + throw exception("Unsupported io queue type: no io queue type specified", + NIXL_ERR_NOT_SUPPORTED); } if (local.descCount() == 0 || remote.descCount() == 0) { - throw exception( - absl::StrFormat("Invalid descriptor count - local: %zu, remote: %zu", local.descCount(), remote.descCount()), - NIXL_ERR_INVALID_PARAM); + throw exception(absl::StrFormat("Invalid descriptor count - local: %zu, remote: %zu", + local.descCount(), + remote.descCount()), + 
NIXL_ERR_INVALID_PARAM); } - nixl_status_t status = initQueues(); + nixl_status_t status = initIoQueue(io_queue_type); if (status != NIXL_SUCCESS) { - throw exception(absl::StrFormat("Failed to initialize queues: %s", to_string(queue_type_)), + throw exception(absl::StrFormat("Failed to initialize io queue: %s", io_queue_type), status); } } +void +nixlPosixBackendReqH::ioDone(uint32_t data_size, int error) { + num_confirmed_ios_++; + logOnPercentStep(num_confirmed_ios_, queue_depth_); +} + +void +nixlPosixBackendReqH::ioDoneClb(void *ctx, uint32_t data_size, int error) { + nixlPosixBackendReqH *self = static_cast(ctx); + self->ioDone(data_size, error); +} -nixl_status_t nixlPosixBackendReqH::initQueues() { +nixl_status_t +nixlPosixBackendReqH::initIoQueue(const std::string &io_queue_type) { try { - switch (queue_type_) { - case nixlPosixQueue::queue_t::AIO: - queue = QueueFactory::createLinuxAioQueue(queue_depth_, operation); - break; - case nixlPosixQueue::queue_t::URING: - queue = QueueFactory::createUringQueue(queue_depth_, operation); - break; - case nixlPosixQueue::queue_t::POSIXAIO: - queue = QueueFactory::createPosixAioQueue(queue_depth_, operation); - break; - default: - NIXL_ERROR << absl::StrFormat("Invalid queue type: %s", to_string(queue_type_)); - return NIXL_ERR_INVALID_PARAM; + io_queue_ = nixlPosixIOQueue::instantiate(io_queue_type, queue_depth_); + if (!io_queue_) { + throw exception(absl::StrFormat("Failed to initialize io queue: %s", io_queue_type), + NIXL_ERR_INVALID_PARAM); } + return NIXL_SUCCESS; - } catch (const nixlPosixBackendReqH::exception& e) { - NIXL_ERROR << absl::StrFormat("Failed to initialize queues: %s", e.what()); + } + catch (const nixlPosixBackendReqH::exception &e) { + NIXL_ERROR << absl::StrFormat("Failed to initialize io queue: %s", e.what()); return e.code(); - } catch (const std::exception& e) { - NIXL_ERROR << absl::StrFormat("Failed to initialize queues: %s", e.what()); + } + catch (const std::exception &e) { + 
NIXL_ERROR << absl::StrFormat("Failed to initialize io queue: %s", e.what()); return NIXL_ERR_BACKEND; } } -nixl_status_t nixlPosixBackendReqH::prepXfer() { +nixl_status_t +nixlPosixBackendReqH::prepXfer() { + return NIXL_SUCCESS; +} + +nixl_status_t +nixlPosixBackendReqH::checkXfer() { + if (num_confirmed_ios_ == queue_depth_) { + return NIXL_SUCCESS; + } + + nixl_status_t status = io_queue_->poll(); + if (status < 0) { + return status; + } + + return NIXL_IN_PROG; +} + +nixl_status_t +nixlPosixBackendReqH::postXfer() { + num_confirmed_ios_ = 0; + for (auto [local_it, remote_it] = std::make_pair(local.begin(), remote.begin()); local_it != local.end() && remote_it != remote.end(); ++local_it, ++remote_it) { - nixl_status_t status = queue->prepIO( - remote_it->devId, - reinterpret_cast(local_it->addr), - remote_it->len, - remote_it->addr - ); + nixl_status_t status = io_queue_->enqueue(remote_it->devId, + reinterpret_cast(local_it->addr), + remote_it->len, + remote_it->addr, + operation == NIXL_READ, + ioDoneClb, + this); if (status != NIXL_SUCCESS) { - NIXL_ERROR << "Error preparing I/O operation"; + // Currently we do not support partial submissions, so it's all or nothing + NIXL_ERROR << absl::StrFormat("Error preparing I/O operation: %d", status); return status; } } - return NIXL_SUCCESS; -} - -nixl_status_t nixlPosixBackendReqH::checkXfer() { - return queue->checkCompleted(); -} - -nixl_status_t nixlPosixBackendReqH::postXfer() { - return queue->submit (local, remote); + return io_queue_->post(); } // ----------------------------------------------------------------------------- // POSIX Engine Implementation // ----------------------------------------------------------------------------- -nixlPosixEngine::nixlPosixEngine(const nixlBackendInitParams* init_params) - : nixlBackendEngine(init_params) - , queue_type_(getQueueType(init_params->customParams)) { - if (queue_type_ == nixlPosixQueue::queue_t::UNSUPPORTED) { +nixlPosixEngine::nixlPosixEngine(const 
nixlBackendInitParams *init_params) + : nixlBackendEngine(init_params), + io_queue_type_(getIoQueueType(init_params->customParams)) { + if (!io_queue_type_) { initErr = true; - NIXL_ERROR << absl::StrFormat( - "Failed to initialize POSIX backend - requested queue type not available: %s", - to_string(queue_type_)); + NIXL_ERROR << "Failed to initialize POSIX backend - no supported io queue type found"; return; } - NIXL_INFO << absl::StrFormat("POSIX backend initialized using queue type: %s", - to_string(queue_type_)); + NIXL_INFO << absl::StrFormat("POSIX backend initialized using io queue type: %s", + io_queue_type_); } -nixl_status_t nixlPosixEngine::registerMem(const nixlBlobDesc &mem, - const nixl_mem_t &nixl_mem, - nixlBackendMD* &out) { +nixl_status_t +nixlPosixEngine::registerMem(const nixlBlobDesc &mem, + const nixl_mem_t &nixl_mem, + nixlBackendMD *&out) { auto supported_mems = getSupportedMems(); if (std::find(supported_mems.begin(), supported_mems.end(), nixl_mem) != supported_mems.end()) return NIXL_SUCCESS; @@ -242,16 +268,18 @@ nixl_status_t nixlPosixEngine::registerMem(const nixlBlobDesc &mem, return NIXL_ERR_NOT_SUPPORTED; } -nixl_status_t nixlPosixEngine::deregisterMem(nixlBackendMD *) { +nixl_status_t +nixlPosixEngine::deregisterMem(nixlBackendMD *) { return NIXL_SUCCESS; } -nixl_status_t nixlPosixEngine::prepXfer(const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - nixlBackendReqH* &handle, - const nixl_opt_b_args_t* opt_args) const { +nixl_status_t +nixlPosixEngine::prepXfer(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + nixlBackendReqH *&handle, + const nixl_opt_b_args_t *opt_args) const { if (!isValidPrepXferParams(operation, local, remote, remote_agent, localAgent)) { return NIXL_ERR_INVALID_PARAM; } @@ -259,22 +287,10 @@ nixl_status_t nixlPosixEngine::prepXfer(const 
nixl_xfer_op_t &operation, try { // Create a params map with our backend selection nixl_b_params_t params; - switch (queue_type_) { - case nixlPosixQueue::queue_t::AIO: - params["use_aio"] = "true"; - break; - case nixlPosixQueue::queue_t::URING: - params["use_uring"] = "true"; - break; - case nixlPosixQueue::queue_t::POSIXAIO: - params["use_posix_aio"] = "true"; - break; - default: - NIXL_ERROR << absl::StrFormat("Invalid queue type: %s", to_string(queue_type_)); - return NIXL_ERR_INVALID_PARAM; - } + params["io_queue_type"] = io_queue_type_; - auto posix_handle = std::make_unique(operation, local, remote, opt_args, ¶ms); + auto posix_handle = + std::make_unique(operation, local, remote, opt_args, ¶ms); nixl_status_t status = posix_handle->prepXfer(); if (status != NIXL_SUCCESS) { return status; @@ -282,53 +298,60 @@ nixl_status_t nixlPosixEngine::prepXfer(const nixl_xfer_op_t &operation, handle = posix_handle.release(); return NIXL_SUCCESS; - } catch (const nixlPosixBackendReqH::exception& e) { + } + catch (const nixlPosixBackendReqH::exception &e) { NIXL_ERROR << absl::StrFormat("Error: %s", e.what()); return e.code(); - } catch (const std::exception& e) { + } + catch (const std::exception &e) { NIXL_ERROR << absl::StrFormat("Unexpected error: %s", e.what()); return NIXL_ERR_BACKEND; } } -nixl_status_t nixlPosixEngine::postXfer(const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - nixlBackendReqH* &handle, - const nixl_opt_b_args_t* opt_args) const { +nixl_status_t +nixlPosixEngine::postXfer(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + nixlBackendReqH *&handle, + const nixl_opt_b_args_t *opt_args) const { try { - auto& posix_handle = castPosixHandle(handle); + auto &posix_handle = castPosixHandle(handle); nixl_status_t status = posix_handle.postXfer(); if (status != NIXL_IN_PROG) { 
NIXL_ERROR << "Error in submitting queue"; } return status; - } catch (const nixlPosixBackendReqH::exception& e) { + } + catch (const nixlPosixBackendReqH::exception &e) { NIXL_ERROR << e.what(); return e.code(); } return NIXL_ERR_BACKEND; } -nixl_status_t nixlPosixEngine::checkXfer(nixlBackendReqH* handle) const { +nixl_status_t +nixlPosixEngine::checkXfer(nixlBackendReqH *handle) const { try { - auto& posix_handle = castPosixHandle(handle); + auto &posix_handle = castPosixHandle(handle); return posix_handle.checkXfer(); } - catch (const nixlPosixBackendReqH::exception& e) { + catch (const nixlPosixBackendReqH::exception &e) { NIXL_ERROR << e.what(); return e.code(); } return NIXL_ERR_BACKEND; } -nixl_status_t nixlPosixEngine::releaseReqH(nixlBackendReqH* handle) const { +nixl_status_t +nixlPosixEngine::releaseReqH(nixlBackendReqH *handle) const { try { - auto& posix_handle = castPosixHandle(handle); + auto &posix_handle = castPosixHandle(handle); posix_handle.~nixlPosixBackendReqH(); return NIXL_SUCCESS; - } catch (const nixlPosixBackendReqH::exception& e) { + } + catch (const nixlPosixBackendReqH::exception &e) { NIXL_ERROR << e.what(); return e.code(); } diff --git a/src/plugins/posix/posix_backend.h b/src/plugins/posix/posix_backend.h index b2777297b..0fbf8fc06 100644 --- a/src/plugins/posix/posix_backend.h +++ b/src/plugins/posix/posix_backend.h @@ -23,107 +23,131 @@ #include #include #include "backend/backend_engine.h" -#include "posix_queue.h" +#include "io_queue.h" class nixlPosixBackendReqH : public nixlBackendReqH { private: - const nixl_xfer_op_t &operation; // The transfer operation (read/write) - const nixl_meta_dlist_t &local; // Local memory descriptor list - const nixl_meta_dlist_t &remote; // Remote memory descriptor list - const nixl_opt_b_args_t *opt_args; // Optional backend-specific arguments - const nixl_b_params_t *custom_params_; // Custom backend parameters - const int queue_depth_; // Queue depth for async I/O - std::unique_ptr queue; // 
Async I/O queue instance - const nixlPosixQueue::queue_t queue_type_; // Type of queue used + const nixl_xfer_op_t &operation; // The transfer operation (read/write) + const nixl_meta_dlist_t &local; // Local memory descriptor list + const nixl_meta_dlist_t &remote; // Remote memory descriptor list + const nixl_opt_b_args_t *opt_args; // Optional backend-specific arguments + const nixl_b_params_t *custom_params_; // Custom backend parameters + const int queue_depth_; // Queue depth for async I/O + std::unique_ptr io_queue_; // Async I/O queue instance + int num_confirmed_ios_; // Number of confirmed IOs - nixl_status_t initQueues(); // Initialize async I/O queue + nixl_status_t + initIoQueue(const std::string &io_queue_type); // Initialize async I/O queue + void + ioDone(uint32_t data_size, int error); + static void + ioDoneClb(void *ctx, uint32_t data_size, int error); public: nixlPosixBackendReqH(const nixl_xfer_op_t &operation, const nixl_meta_dlist_t &local, const nixl_meta_dlist_t &remote, - const nixl_opt_b_args_t* opt_args, - const nixl_b_params_t* custom_params); - ~nixlPosixBackendReqH() {}; + const nixl_opt_b_args_t *opt_args, + const nixl_b_params_t *custom_params); + ~nixlPosixBackendReqH(){}; - nixl_status_t postXfer(); - nixl_status_t prepXfer(); - nixl_status_t checkXfer(); + nixl_status_t + postXfer(); + nixl_status_t + prepXfer(); + nixl_status_t + checkXfer(); // Exception classes - class exception: public std::exception { - private: - const nixl_status_t code_; - public: - exception(const std::string& msg, nixl_status_t code) - : std::exception(), code_(code) {} - nixl_status_t code() const noexcept { return code_; } + class exception : public std::exception { + private: + const nixl_status_t code_; + + public: + exception(const std::string &msg, nixl_status_t code) : std::exception(), code_(code) {} + + nixl_status_t + code() const noexcept { + return code_; + } }; }; class nixlPosixEngine : public nixlBackendEngine { private: - const 
nixlPosixQueue::queue_t queue_type_; + const char *io_queue_type_; public: - nixlPosixEngine(const nixlBackendInitParams* init_params); + nixlPosixEngine(const nixlBackendInitParams *init_params); virtual ~nixlPosixEngine() = default; - bool supportsRemote() const override { + bool + supportsRemote() const override { return false; } - bool supportsLocal() const override { + bool + supportsLocal() const override { return true; } - bool supportsNotif() const override { + bool + supportsNotif() const override { return false; } - nixl_mem_list_t getSupportedMems() const override { + nixl_mem_list_t + getSupportedMems() const override { return {FILE_SEG, DRAM_SEG}; } - nixl_status_t registerMem(const nixlBlobDesc &mem, - const nixl_mem_t &nixl_mem, - nixlBackendMD* &out) override; + nixl_status_t + registerMem(const nixlBlobDesc &mem, const nixl_mem_t &nixl_mem, nixlBackendMD *&out) override; - nixl_status_t deregisterMem(nixlBackendMD* meta) override; + nixl_status_t + deregisterMem(nixlBackendMD *meta) override; - nixl_status_t connect(const std::string &remote_agent) override { + nixl_status_t + connect(const std::string &remote_agent) override { return NIXL_SUCCESS; } - nixl_status_t disconnect(const std::string &remote_agent) override { + nixl_status_t + disconnect(const std::string &remote_agent) override { return NIXL_SUCCESS; } - nixl_status_t unloadMD(nixlBackendMD* input) override { + nixl_status_t + unloadMD(nixlBackendMD *input) override { return NIXL_SUCCESS; } - nixl_status_t prepXfer(const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - nixlBackendReqH* &handle, - const nixl_opt_b_args_t* opt_args=nullptr) const override; + nixl_status_t + prepXfer(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + nixlBackendReqH *&handle, + const nixl_opt_b_args_t *opt_args = nullptr) const override; 
- nixl_status_t postXfer(const nixl_xfer_op_t &operation, - const nixl_meta_dlist_t &local, - const nixl_meta_dlist_t &remote, - const std::string &remote_agent, - nixlBackendReqH* &handle, - const nixl_opt_b_args_t* opt_args=nullptr) const override; + nixl_status_t + postXfer(const nixl_xfer_op_t &operation, + const nixl_meta_dlist_t &local, + const nixl_meta_dlist_t &remote, + const std::string &remote_agent, + nixlBackendReqH *&handle, + const nixl_opt_b_args_t *opt_args = nullptr) const override; - nixl_status_t checkXfer(nixlBackendReqH* handle) const override; - nixl_status_t releaseReqH(nixlBackendReqH* handle) const override; + nixl_status_t + checkXfer(nixlBackendReqH *handle) const override; + nixl_status_t + releaseReqH(nixlBackendReqH *handle) const override; nixl_status_t queryMem(const nixl_reg_dlist_t &descs, std::vector &resp) const override; - nixl_status_t loadLocalMD(nixlBackendMD* input, nixlBackendMD* &output) override { + nixl_status_t + loadLocalMD(nixlBackendMD *input, nixlBackendMD *&output) override { output = input; return NIXL_SUCCESS; } diff --git a/src/plugins/posix/posix_queue.h b/src/plugins/posix/posix_queue.h deleted file mode 100644 index d9251897c..000000000 --- a/src/plugins/posix/posix_queue.h +++ /dev/null @@ -1,42 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#ifndef POSIX_QUEUE_H -#define POSIX_QUEUE_H - -#include "nixl_types.h" -#include "backend/backend_aux.h" -#include - -// Abstract base class for async I/O operations -class nixlPosixQueue { - public: - virtual ~nixlPosixQueue() = default; - virtual nixl_status_t - submit (const nixl_meta_dlist_t &local, const nixl_meta_dlist_t &remote) = 0; - virtual nixl_status_t checkCompleted() = 0; - virtual nixl_status_t prepIO(int fd, void* buf, size_t len, off_t offset) = 0; - - enum class queue_t { - AIO, - URING, - POSIXAIO, - UNSUPPORTED, - }; -}; - -#endif // POSIX_QUEUE_H diff --git a/src/plugins/posix/queue_factory_impl.cpp b/src/plugins/posix/queue_factory_impl.cpp deleted file mode 100644 index 6d85f1eb4..000000000 --- a/src/plugins/posix/queue_factory_impl.cpp +++ /dev/null @@ -1,108 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include -#include "queue_factory_impl.h" -#include "posix_queue.h" -#include "posix_backend.h" -#include "aio_queue.h" - -#ifdef HAVE_LIBURING -#include "uring_queue.h" -#endif - -#ifdef HAVE_LINUXAIO -#include "linux_aio_queue.h" -#endif - -// Anonymous namespace for internal template implementations for functions that use the optional liburing -namespace { - struct uringEnabled {}; - struct uringDisabled {}; - -#ifdef HAVE_LIBURING - using uringMode = uringEnabled; -#else - using uringMode = uringDisabled; -#endif - - template - struct funcImpl; - - template - struct funcImpl::value>> { - static std::unique_ptr createUringQueue(int num_entries, nixl_xfer_op_t operation) { - // Initialize io_uring parameters with basic configuration - // Start with basic parameters, no special flags - // We can add optimizations like SQPOLL later - struct io_uring_params params = {}; - return std::make_unique(num_entries, params, operation); - } - - static bool isUringAvailable() { - return true; - } - }; - - template - struct funcImpl::value>> { - static std::unique_ptr createUringQueue(int num_entries, nixl_xfer_op_t operation) { - (void)num_entries; - (void)operation; - throw nixlPosixBackendReqH::exception("Attempting to create io_uring queue when support is not compiled in", - NIXL_ERR_NOT_SUPPORTED); - } - - static bool isUringAvailable() { - return false; - } - }; -} - -// Public functions implementation -std::unique_ptr -QueueFactory::createPosixAioQueue(int num_entries, nixl_xfer_op_t operation) { - return std::make_unique(num_entries, operation); -} - -std::unique_ptr QueueFactory::createUringQueue(int num_entries, nixl_xfer_op_t operation) { - return funcImpl::createUringQueue(num_entries, operation); -} - -std::unique_ptr -QueueFactory::createLinuxAioQueue(int num_entries, nixl_xfer_op_t operation) { -#ifdef HAVE_LINUXAIO - return std::make_unique(num_entries, operation); -#else - throw nixlPosixBackendReqH::exception( - "Attempting to create linux_aio queue 
when support is not compiled in", - NIXL_ERR_NOT_SUPPORTED); -#endif -} - -bool QueueFactory::isUringAvailable() { - return funcImpl::isUringAvailable(); -} - -bool -QueueFactory::isLinuxAioAvailable() { -#ifdef HAVE_LINUXAIO - return true; -#else - return false; -#endif -} diff --git a/src/plugins/posix/queue_factory_impl.h b/src/plugins/posix/queue_factory_impl.h deleted file mode 100644 index c9b667fd6..000000000 --- a/src/plugins/posix/queue_factory_impl.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef QUEUE_FACTORY_IMPL_H -#define QUEUE_FACTORY_IMPL_H - -#include "posix_queue.h" - -namespace QueueFactory { -std::unique_ptr -createPosixAioQueue(int num_entries, nixl_xfer_op_t operation); - -std::unique_ptr -createUringQueue(int num_entries, nixl_xfer_op_t operation); - -std::unique_ptr -createLinuxAioQueue(int num_entries, nixl_xfer_op_t operation); - -bool -isLinuxAioAvailable(); -bool -isUringAvailable(); -}; // namespace QueueFactory - -#endif // QUEUE_FACTORY_IMPL_H diff --git a/src/plugins/posix/uring_queue.cpp b/src/plugins/posix/uring_queue.cpp deleted file mode 100644 index a36a70968..000000000 --- a/src/plugins/posix/uring_queue.cpp +++ /dev/null @@ -1,158 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. 
All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "uring_queue.h" -#include -#include -#include -#include -#include -#include "absl/strings/str_format.h" -#include "absl/strings/str_join.h" -#include "common/nixl_log.h" - -namespace { - // Log completion percentage at regular intervals (every log_percent_step percent) - void logOnPercentStep(unsigned int completed, unsigned int total) { - constexpr unsigned int default_log_percent_step = 10; - static_assert (default_log_percent_step >= 1 && default_log_percent_step <= 100, - "log_percent_step must be in [1, 100]"); - unsigned int log_percent_step = total < 10 ? 
1 : default_log_percent_step; - - if (total == 0) { - NIXL_ERROR << "Tried to log completion percentage with total == 0"; - return; - } - // Only log at each percentage step - if (completed % (total / log_percent_step) == 0) { - NIXL_DEBUG << absl::StrFormat("Queue progress: %.1f%% complete", - (completed * 100.0 / total)); - } - } - - std::string stringifyUringFeatures(unsigned int features) { - static const std::unordered_map feature_map = { - {IORING_FEAT_SQPOLL_NONFIXED, "SQPOLL"}, - {IORING_FEAT_FAST_POLL, "IOPOLL"} - }; - - std::vector enabled; - for (unsigned int bits = features; bits; bits &= (bits - 1)) { // step through each set bit - unsigned int bit = bits & -bits; // isolate lowest set bit - auto it = feature_map.find(bit); - if (it != feature_map.end()) { - enabled.push_back(it->second); - } - } - return enabled.empty() ? "none" : absl::StrJoin(enabled, ", "); - } -} - -nixl_status_t UringQueue::init(int entries, const io_uring_params& params) { - // Initialize with basic setup - need a mutable copy since the API modifies the params - io_uring_params mutable_params = params; - if (io_uring_queue_init_params(entries, &uring, &mutable_params) < 0) { - throw std::runtime_error(absl::StrFormat("Failed to initialize io_uring instance: %s", nixl_strerror(errno))); - } - - // Log the features supported by this io_uring instance - NIXL_INFO << absl::StrFormat("io_uring features: %s", stringifyUringFeatures(mutable_params.features)); - - return NIXL_SUCCESS; -} - -UringQueue::UringQueue(int num_entries, const io_uring_params& params, nixl_xfer_op_t operation) - : num_entries(num_entries) - , num_completed(0) - , prep_op(operation == NIXL_READ ? 
- reinterpret_cast(io_uring_prep_read) : - reinterpret_cast(io_uring_prep_write)) -{ - if (num_entries <= 0) { - throw std::invalid_argument("Invalid number of entries for UringQueue"); - } - - init(num_entries, params); -} - -UringQueue::~UringQueue() { - io_uring_queue_exit(&uring); -} - -nixl_status_t -UringQueue::submit (const nixl_meta_dlist_t &local, const nixl_meta_dlist_t &remote) { - for (auto [local_it, remote_it] = std::make_pair (local.begin(), remote.begin()); - local_it != local.end() && remote_it != remote.end(); - ++local_it, ++remote_it) { - int fd = remote_it->devId; - void *buf = reinterpret_cast (local_it->addr); - size_t len = local_it->len; - off_t offset = remote_it->addr; - - struct io_uring_sqe *sqe = io_uring_get_sqe (&uring); - if (!sqe) { - NIXL_ERROR << "Failed to get io_uring submission queue entry"; - return NIXL_ERR_BACKEND; - } - prep_op (sqe, fd, buf, len, offset); - } - - int ret = io_uring_submit(&uring); - if (ret != num_entries) { - if (ret < 0) { - NIXL_ERROR << absl::StrFormat("io_uring submit failed: %s", nixl_strerror(-ret)); - } else { - NIXL_ERROR << absl::StrFormat("io_uring submit failed. Partial submission: %d/%d", num_entries, ret); - } - return NIXL_ERR_BACKEND; - } - num_completed = 0; - return NIXL_IN_PROG; -} - -nixl_status_t UringQueue::checkCompleted() { - if (num_completed == num_entries) { - return NIXL_SUCCESS; - } - - // Process all available completions - struct io_uring_cqe* cqe; - unsigned head; - unsigned count = 0; - - // Get completion events - io_uring_for_each_cqe(&uring, head, cqe) { - int res = cqe->res; - if (res < 0) { - NIXL_ERROR << absl::StrFormat("IO operation failed: %s", nixl_strerror(-res)); - return NIXL_ERR_BACKEND; - } - count++; - } - - // Mark all seen - io_uring_cq_advance(&uring, count); - num_completed += count; - - logOnPercentStep(num_completed, num_entries); - - return (num_completed == num_entries) ? 
NIXL_SUCCESS : NIXL_IN_PROG; -} - -nixl_status_t UringQueue::prepIO(int fd, void* buf, size_t len, off_t offset) { - return NIXL_SUCCESS; -} diff --git a/src/plugins/posix/uring_queue.h b/src/plugins/posix/uring_queue.h deleted file mode 100644 index 61d66be37..000000000 --- a/src/plugins/posix/uring_queue.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#ifndef URING_QUEUE_H -#define URING_QUEUE_H - -#include -#include "posix_queue.h" -#include - -// Forward declare Error class -class nixlPosixBackendReqH; - -// Type definition for io_uring prep functions -typedef void (*io_uring_prep_func_t)(struct io_uring_sqe*, int, const void*, unsigned int, __u64); - -class UringQueue : public nixlPosixQueue { - private: - struct io_uring uring; // The io_uring instance for async I/O operations - const int num_entries; // Total number of entries expected in this ring - int num_completed; // Number of completed operations so far - io_uring_prep_func_t prep_op; // Pointer to prep function - - // Initialize the queue with the given parameters - nixl_status_t init(int num_entries, const struct io_uring_params& params); - - // Delete copy and move operations to prevent accidental copying of kernel resources - UringQueue(const UringQueue&) = delete; - UringQueue& operator=(const UringQueue&) = delete; - UringQueue(UringQueue&&) = delete; - UringQueue& operator=(UringQueue&&) = delete; - - public: - UringQueue(int num_entries, const struct io_uring_params& params, nixl_xfer_op_t operation); - ~UringQueue(); - nixl_status_t - submit (const nixl_meta_dlist_t &local, const nixl_meta_dlist_t &remote) override; - nixl_status_t checkCompleted() override; - nixl_status_t prepIO(int fd, void* buf, size_t len, off_t offset) override; -}; - -#endif // URING_QUEUE_H