diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index a8897a27e2..96157e28c5 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -153,6 +153,7 @@ set(SOURCES
   "src/file_utils.cpp"
   "src/mmap.cpp"
   "src/detail/env.cpp"
+  "src/detail/io_uring.cpp"
   "src/detail/nvtx.cpp"
   "src/detail/posix_io.cpp"
   "src/shim/cuda.cpp"
@@ -189,7 +190,7 @@ target_include_directories(
 target_link_libraries(
   kvikio
   PUBLIC Threads::Threads BS::thread_pool ${CMAKE_DL_LIBS} $
-  PRIVATE $
+  PRIVATE $ uring
 )
 target_compile_definitions(
diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp
index 190909c2cc..de1b011ec9 100644
--- a/cpp/include/kvikio/defaults.hpp
+++ b/cpp/include/kvikio/defaults.hpp
@@ -121,6 +121,8 @@ class defaults {
   std::vector<int> _http_status_codes;
   bool _auto_direct_io_read;
   bool _auto_direct_io_write;
+  bool _io_uring_enabled;
+  unsigned int _io_uring_queue_depth;
   bool _thread_pool_per_block_device;
 
   static unsigned int get_num_threads_from_env();
@@ -396,6 +398,13 @@ class defaults {
    */
   static void set_auto_direct_io_write(bool flag);
 
+  static bool io_uring_enabled();
+
+  static void set_io_uring_enabled(bool flag);
+
+  static unsigned int io_uring_queue_depth();
+
+  static void set_io_uring_queue_depth(unsigned int io_uring_queue_depth);
+
   /**
    * @brief Check if per-block-device thread pools are enabled.
    *
diff --git a/cpp/include/kvikio/detail/io_uring.hpp b/cpp/include/kvikio/detail/io_uring.hpp
new file mode 100644
index 0000000000..5fe9516432
--- /dev/null
+++ b/cpp/include/kvikio/detail/io_uring.hpp
@@ -0,0 +1,42 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+#pragma once
+
+#include <cstddef>
+#include <liburing.h>
+
+#include <kvikio/shim/cuda.hpp>
+
+namespace kvikio::detail {
+
+class IoUringManager {
+ private:
+  IoUringManager();
+  io_uring _ring{};
+  unsigned int _queue_depth{};
+  std::size_t _task_size{};
+
+ public:
+  static IoUringManager& get();
+  ~IoUringManager() noexcept;
+
+  IoUringManager(IoUringManager const&)            = delete;
+  IoUringManager& operator=(IoUringManager const&) = delete;
+  IoUringManager(IoUringManager&&)                 = delete;
+  IoUringManager& operator=(IoUringManager&&)      = delete;
+
+  io_uring* ring() noexcept;
+  unsigned int queue_depth() noexcept;
+  std::size_t task_size() noexcept;
+};
+
+bool is_io_uring_supported() noexcept;
+
+std::size_t io_uring_read_host(int fd, void* buf, std::size_t size, std::size_t file_offset);
+
+std::size_t io_uring_read_device(
+  int fd, void* buf, std::size_t size, std::size_t file_offset, CUstream stream);
+
+}  // namespace kvikio::detail
diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp
index 841e7314d3..2900c41853 100644
--- a/cpp/src/defaults.cpp
+++ b/cpp/src/defaults.cpp
@@ -142,6 +142,11 @@ defaults::defaults()
     _auto_direct_io_write = getenv_or("KVIKIO_AUTO_DIRECT_IO_WRITE", true);
   }
 
+  {
+    _io_uring_enabled     = getenv_or("KVIKIO_IO_URING_ENABLED", false);
+    _io_uring_queue_depth = getenv_or("KVIKIO_IO_URING_QUEUE_DEPTH", 32U);
+  }
+
   // Determine the default value of `thread_pool_per_block_device`
   {
     _thread_pool_per_block_device = getenv_or("KVIKIO_THREAD_POOL_PER_BLOCK_DEVICE", false);
@@ -244,6 +249,17 @@ bool defaults::auto_direct_io_write() { return instance()->_auto_direct_io_writ
 
 void defaults::set_auto_direct_io_write(bool flag) { instance()->_auto_direct_io_write = flag; }
 
+bool defaults::io_uring_enabled() { return instance()->_io_uring_enabled; }
+
+void defaults::set_io_uring_enabled(bool flag) { instance()->_io_uring_enabled = flag; }
+
+unsigned int defaults::io_uring_queue_depth() { return instance()->_io_uring_queue_depth; }
+
+void defaults::set_io_uring_queue_depth(unsigned int io_uring_queue_depth)
+{
+  instance()->_io_uring_queue_depth = io_uring_queue_depth;
+}
+
 bool defaults::thread_pool_per_block_device() { return instance()->_thread_pool_per_block_device; }
 
 void defaults::set_thread_pool_per_block_device(bool flag)
diff --git a/cpp/src/detail/io_uring.cpp b/cpp/src/detail/io_uring.cpp
new file mode 100644
index 0000000000..4ca81beb5b
--- /dev/null
+++ b/cpp/src/detail/io_uring.cpp
@@ -0,0 +1,311 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+// TODO: Refinements needed:
+// - Bounce buffer size is made the same as the task size
+// - Ring in a singleton
+// - Wait/submit error handling
+
+#include <algorithm>
+#include <cstring>
+#include <sstream>
+#include <stack>
+#include <stdexcept>
+#include <vector>
+
+#include <liburing.h>
+
+#include <kvikio/defaults.hpp>
+#include <kvikio/detail/io_uring.hpp>
+#include <kvikio/error.hpp>
+#include <kvikio/shim/cuda.hpp>
+#include <kvikio/utils.hpp>
+
+#define IO_URING_CHECK(err_code)                                        \
+  do {                                                                  \
+    kvikio::detail::check_io_uring_call(__LINE__, __FILE__, err_code);  \
+  } while (0)
+
+namespace kvikio::detail {
+namespace {
+inline void check_io_uring_call(int line_number, char const* filename, int err_code)
+{
+  // Success
+  if (err_code == 0) {
+    return;
+  } else if (err_code < 0) {
+    // On failure, io_uring API returns -errno
+    auto* msg = strerror(-err_code);
+
+    std::stringstream ss;
+    ss << "Linux io_uring error (" << -err_code << ": " << msg << ") at: " << filename << ":"
+       << line_number;
+
+    throw std::runtime_error(ss.str());
+  }
+}
+}  // namespace
+
+IoUringManager::IoUringManager()
+  : _queue_depth{defaults::io_uring_queue_depth()}, _task_size{defaults::task_size()}
+{
+  IO_URING_CHECK(io_uring_queue_init(_queue_depth, &_ring, 0));
+}
+
+IoUringManager& IoUringManager::get()
+{
+  static IoUringManager inst;
+  return inst;
+}
+
+IoUringManager::~IoUringManager() noexcept
+{
+  // Does not have a return value
+  io_uring_queue_exit(&_ring);
+}
+
+io_uring* IoUringManager::ring() noexcept { return &_ring; }
+
+unsigned int IoUringManager::queue_depth() noexcept { return _queue_depth; }
+
+std::size_t IoUringManager::task_size() noexcept { return _task_size; }
+
+bool is_io_uring_supported() noexcept
+{
+  try {
+    [[maybe_unused]] auto& inst = IoUringManager::get();
+  } catch (...) {
+    return false;
+  }
+  return true;
+}
+
+std::size_t io_uring_read_host(int fd, void* buf, std::size_t size, std::size_t file_offset)
+{
+  auto* ring       = IoUringManager::get().ring();
+  auto queue_depth = IoUringManager::get().queue_depth();
+  auto task_size   = IoUringManager::get().task_size();
+
+  std::size_t inflight_queues{0};
+  std::size_t bytes_submitted{0};
+  std::size_t bytes_completed{0};
+  std::size_t current_offset{file_offset};
+  bool has_error{false};
+  int first_neg_err_code{0};
+
+  while (!has_error && bytes_completed < size) {
+    while (inflight_queues < queue_depth && bytes_submitted < size) {
+      auto* sqe = io_uring_get_sqe(ring);
+
+      // Queue is full. Need to consume some CQEs.
+      if (sqe == nullptr) { break; }
+
+      auto current_task_size = std::min(task_size, size - bytes_submitted);
+
+      io_uring_prep_read(
+        sqe, fd, static_cast<char*>(buf) + bytes_submitted, current_task_size, current_offset);
+      // Ask the kernel to execute the SQE operation asynchronously
+      sqe->flags |= IOSQE_ASYNC;
+
+      bytes_submitted += current_task_size;
+      current_offset += current_task_size;
+      ++inflight_queues;
+    }
+
+    auto num_sqes_submitted = io_uring_submit(ring);
+    if (num_sqes_submitted < 0) { IO_URING_CHECK(num_sqes_submitted); }
+
+    // Wait for one completion event
+    struct io_uring_cqe* cqe{};
+    IO_URING_CHECK(io_uring_wait_cqe(ring, &cqe));
+
+    // Process all completion events at this point
+    unsigned int head{0};
+    unsigned int num_consumed{0};
+    io_uring_for_each_cqe(ring, head, cqe)
+    {
+      if (cqe->res < 0) {
+        if (!has_error) {
+          has_error          = true;
+          first_neg_err_code = cqe->res;
+        }
+        // Don't break here. Finish processing this batch of CQEs
+      } else {
+        bytes_completed += cqe->res;
+      }
+
+      --inflight_queues;
+      ++num_consumed;
+    }
+
+    // Mark completion events as consumed
+    io_uring_cq_advance(ring, num_consumed);
+  }
+
+  // Drain the ring if async I/O encounters any error
+  if (has_error) {
+    while (inflight_queues > 0) {
+      struct io_uring_cqe* cqe{};
+      IO_URING_CHECK(io_uring_wait_cqe(ring, &cqe));
+
+      unsigned int head{0};
+      unsigned int num_consumed{0};
+      io_uring_for_each_cqe(ring, head, cqe)
+      {
+        --inflight_queues;
+        ++num_consumed;
+      }
+
+      io_uring_cq_advance(ring, num_consumed);
+    }
+
+    IO_URING_CHECK(first_neg_err_code);
+  }
+
+  KVIKIO_EXPECT(bytes_completed == bytes_submitted,
+                "Loss of data: submission and completion mismatch.");
+
+  return bytes_completed;
+}
+
+struct IoUringTaskCtx {
+  void* bounce_buffer{};
+  void* src{};
+  void* dst{};
+  std::size_t size{};
+};
+
+std::stack<IoUringTaskCtx*>& task_ctx_pool()
+{
+  static auto task_ctx_objs = []() {
+    std::vector<IoUringTaskCtx> result(IoUringManager::get().queue_depth());
+    for (auto&& task_ctx : result) {
+      void* buffer{};
+      CUDA_DRIVER_TRY(
+        cudaAPI::instance().MemHostAlloc(&buffer, defaults::task_size(), CU_MEMHOSTALLOC_PORTABLE));
+      task_ctx.bounce_buffer = buffer;
+    }
+    return result;
+  }();
+
+  static auto task_ctx_pool = [&]() {
+    std::stack<IoUringTaskCtx*> result;
+    for (auto&& task_ctx : task_ctx_objs) {
+      result.push(&task_ctx);
+    }
+    return result;
+  }();
+  return task_ctx_pool;
+}
+
+std::size_t io_uring_read_device(
+  int fd, void* buf, std::size_t size, std::size_t file_offset, CUstream stream)
+{
+  auto* ring       = IoUringManager::get().ring();
+  auto queue_depth = IoUringManager::get().queue_depth();
+  auto task_size   = IoUringManager::get().task_size();
+
+  std::size_t inflight_queues{0};
+  std::size_t bytes_submitted{0};
+  std::size_t bytes_completed{0};
+  std::size_t current_offset{file_offset};
+  bool has_error{false};
+  int first_neg_err_code{0};
+
+  while (!has_error && bytes_completed < size) {
+    // Only submit new operations if no error has occurred
+    while (inflight_queues < queue_depth && bytes_submitted < size) {
+      auto* sqe = io_uring_get_sqe(ring);
+
+      // Queue is full. Need to consume some CQEs.
+      if (sqe == nullptr) { break; }
+
+      auto current_task_size = std::min(task_size, size - bytes_submitted);
+
+      KVIKIO_EXPECT(!task_ctx_pool().empty(), "Task context pool is empty unexpectedly.");
+      auto* task_ctx = task_ctx_pool().top();
+      task_ctx_pool().pop();
+      task_ctx->src  = task_ctx->bounce_buffer;
+      task_ctx->dst  = static_cast<char*>(buf) + bytes_submitted;
+      task_ctx->size = current_task_size;
+
+      io_uring_prep_read(sqe, fd, task_ctx->bounce_buffer, current_task_size, current_offset);
+      // Ask the kernel to execute the SQE operation asynchronously
+      sqe->flags |= IOSQE_ASYNC;
+
+      io_uring_sqe_set_data(sqe, task_ctx);
+
+      bytes_submitted += current_task_size;
+      current_offset += current_task_size;
+      ++inflight_queues;
+    }
+
+    if (inflight_queues == 0) { break; }
+
+    auto num_sqes_submitted = io_uring_submit(ring);
+    if (num_sqes_submitted < 0) { IO_URING_CHECK(num_sqes_submitted); }
+
+    // Wait for one completion event
+    struct io_uring_cqe* cqe{};
+    IO_URING_CHECK(io_uring_wait_cqe(ring, &cqe));
+
+    // Process all available completion events
+    unsigned int head{0};
+    unsigned int num_consumed{0};
+    io_uring_for_each_cqe(ring, head, cqe)
+    {
+      auto* task_ctx = static_cast<IoUringTaskCtx*>(io_uring_cqe_get_data(cqe));
+
+      if (cqe->res < 0) {
+        if (!has_error) {
+          has_error          = true;
+          first_neg_err_code = cqe->res;
+        }
+        // Don't break here. Finish processing this batch of CQEs
+      } else {
+        bytes_completed += cqe->res;
+        CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(
+          convert_void2deviceptr(task_ctx->dst), task_ctx->src, task_ctx->size, stream));
+        CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
+      }
+      task_ctx->src  = nullptr;
+      task_ctx->dst  = nullptr;
+      task_ctx->size = 0;
+      task_ctx_pool().push(task_ctx);
+      --inflight_queues;
+      ++num_consumed;
+    }
+
+    // Mark completion events as consumed
+    io_uring_cq_advance(ring, num_consumed);
+  }
+
+  // Drain the ring if async I/O encounters any error
+  if (has_error) {
+    while (inflight_queues > 0) {
+      struct io_uring_cqe* cqe{};
+      IO_URING_CHECK(io_uring_wait_cqe(ring, &cqe));
+
+      unsigned int head{0};
+      unsigned int num_consumed{0};
+      io_uring_for_each_cqe(ring, head, cqe)
+      {
+        auto* task_ctx = static_cast<IoUringTaskCtx*>(io_uring_cqe_get_data(cqe));
+        task_ctx->src  = nullptr;
+        task_ctx->dst  = nullptr;
+        task_ctx->size = 0;
+        task_ctx_pool().push(task_ctx);
+        --inflight_queues;
+        ++num_consumed;
+      }
+
+      io_uring_cq_advance(ring, num_consumed);
+    }
+
+    IO_URING_CHECK(first_neg_err_code);
+  }
+
+  std::stringstream ss;
+  ss << "Loss of data: submission (" << bytes_submitted << ") and completion (" << bytes_completed
+     << ") mismatch.";
+  KVIKIO_EXPECT(bytes_completed == bytes_submitted, ss.str());
+
+  return bytes_completed;
+}
+
+}  // namespace kvikio::detail
diff --git a/cpp/src/file_handle.cpp b/cpp/src/file_handle.cpp
index b978b49dc3..ab044effd3 100644
--- a/cpp/src/file_handle.cpp
+++ b/cpp/src/file_handle.cpp
@@ -16,6 +16,7 @@
 #include
 #include
+#include <kvikio/detail/io_uring.hpp>
 #include
 #include
 #include
@@ -233,21 +234,46 @@ std::future<std::size_t> FileHandle::pread(void* buf,
   auto& [nvtx_color, call_idx] = detail::get_next_color_and_call_idx();
   KVIKIO_NVTX_FUNC_RANGE(size, nvtx_color);
   if (is_host_memory(buf)) {
-    auto op = [this](void* hostPtr_base,
-                     std::size_t size,
-                     std::size_t file_offset,
-                     std::size_t hostPtr_offset) -> std::size_t {
-      char* buf = static_cast<char*>(hostPtr_base) + hostPtr_offset;
-      return detail::posix_host_read(
-        _file_direct_off.fd(), buf, size, file_offset, _file_direct_on.fd());
-    };
-
-    return parallel_io(
-      op, buf, size, file_offset, task_size, 0, actual_thread_pool, call_idx, nvtx_color);
+    if (defaults::io_uring_enabled()) {
+      auto op = [this](void* host_ptr,
+                       std::size_t size,
+                       std::size_t file_offset,
+                       std::size_t host_ptr_offset) -> std::size_t {
+        return detail::io_uring_read_host(_file_direct_off.fd(), host_ptr, size, file_offset);
+      };
+      return detail::submit_task(
+        op, buf, size, file_offset, 0, actual_thread_pool, call_idx, nvtx_color);
+    } else {
+      auto op = [this](void* hostPtr_base,
+                       std::size_t size,
+                       std::size_t file_offset,
+                       std::size_t hostPtr_offset) -> std::size_t {
+        char* buf = static_cast<char*>(hostPtr_base) + hostPtr_offset;
+        return detail::posix_host_read(
+          _file_direct_off.fd(), buf, size, file_offset, _file_direct_on.fd());
+      };
+
+      return parallel_io(
+        op, buf, size, file_offset, task_size, 0, actual_thread_pool, call_idx, nvtx_color);
+    }
   }
 
   CUcontext ctx = get_context_from_pointer(buf);
+  if (defaults::io_uring_enabled()) {
+    auto op = [this, ctx](void* dev_ptr,
+                          std::size_t size,
+                          std::size_t file_offset,
+                          std::size_t dev_ptr_offset) -> std::size_t {
+      PushAndPopContext c(ctx);
+      CUstream stream = detail::StreamsByThread::get();
+      return detail::io_uring_read_device(
+        _file_direct_off.fd(), dev_ptr, size, file_offset, stream);
+    };
+    return detail::submit_task(
+      op, buf, size, file_offset, 0, actual_thread_pool, call_idx, nvtx_color);
+  }
+
   // Shortcut that circumvent the threadpool and use the POSIX backend directly.
   if (size < gds_threshold) {
     PushAndPopContext c(ctx);
diff --git a/python/kvikio/tests/test_basic_io.py b/python/kvikio/tests/test_basic_io.py
index 5be7b5e982..7ba183036f 100644
--- a/python/kvikio/tests/test_basic_io.py
+++ b/python/kvikio/tests/test_basic_io.py
@@ -91,12 +91,14 @@ def test_incorrect_open_mode_error(tmp_path, xp):
     a.tofile(filename)
     os.sync()
 
+    err_msg_psync = "Operation not permitted"
+    err_msg_io_uring = "Bad file descriptor"
     with kvikio.CuFile(filename, "r") as f:
-        with pytest.raises(RuntimeError, match="Operation not permitted"):
+        with pytest.raises(RuntimeError, match=f"{err_msg_psync}|{err_msg_io_uring}"):
            f.write(xp.arange(10))
 
     with kvikio.CuFile(filename, "w") as f:
-        with pytest.raises(RuntimeError, match="Operation not permitted"):
+        with pytest.raises(RuntimeError, match=f"{err_msg_psync}|{err_msg_io_uring}"):
            f.read(xp.arange(10))
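
Usage note (not part of the diff): a minimal C++ sketch of how the new io_uring read path introduced above could be exercised. The `defaults::set_io_uring_*` setters, `FileHandle`, and `pread()` come from this change and the existing KvikIO API; the file path, buffer size, and queue depth value are illustrative assumptions only.

```cpp
#include <cstdlib>
#include <vector>

#include <kvikio/defaults.hpp>
#include <kvikio/file_handle.hpp>

int main()
{
  // Equivalent to setting KVIKIO_IO_URING_ENABLED / KVIKIO_IO_URING_QUEUE_DEPTH
  // in the environment before KvikIO is first used.
  kvikio::defaults::set_io_uring_enabled(true);
  kvikio::defaults::set_io_uring_queue_depth(64);  // illustrative value

  std::vector<char> host_buf(1 << 20);
  kvikio::FileHandle fh{"/tmp/input.bin", "r"};  // hypothetical input file
  // With io_uring enabled, pread() dispatches host-memory reads to detail::io_uring_read_host().
  auto nbytes = fh.pread(host_buf.data(), host_buf.size()).get();
  return nbytes == host_buf.size() ? EXIT_SUCCESS : EXIT_FAILURE;
}
```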