diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 18f95f652b..7cfcb75728 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -153,6 +153,7 @@ set(SOURCES
   "src/shim/cufile.cpp"
   "src/shim/utils.cpp"
   "src/stream.cpp"
+  "src/threadpool_wrapper.cpp"
   "src/utils.cpp"
 )
diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp
index 5a7623a6a4..9f72228d2e 100644
--- a/cpp/include/kvikio/bounce_buffer.hpp
+++ b/cpp/include/kvikio/bounce_buffer.hpp
@@ -15,9 +15,11 @@
  */
 #pragma once
 
+#include <cstddef>
 #include <stack>
+#include <vector>
 
-#include <kvikio/defaults.hpp>
+#include <kvikio/utils.hpp>
 
 namespace kvikio {
 
@@ -33,7 +35,7 @@ class AllocRetain {
   // Stack of free allocations
   std::stack<void*> _free_allocs{};
   // The size of each allocation in `_free_allocs`
-  std::size_t _size{defaults::bounce_buffer_size()};
+  std::size_t _size{};  // Decouple this class from the defaults singleton.
 
  public:
   /**
@@ -57,7 +59,7 @@ class AllocRetain {
     std::size_t size() noexcept;
   };
 
-  AllocRetain() = default;
+  AllocRetain();
 
   // Notice, we do not clear the allocations at destruction thus the allocations leaks
   // at exit. We do this because `AllocRetain::instance()` stores the allocations in a
@@ -102,4 +104,72 @@ class AllocRetain {
   AllocRetain& operator=(AllocRetain&& o) = delete;
 };
 
+class BlockView;
+
+class Block {
+ private:
+  std::byte* _buffer{nullptr};
+  std::size_t _bytes{0u};
+
+ public:
+  Block()  = default;
+  ~Block() = default;
+  Block(Block const&)            = delete;
+  Block& operator=(Block const&) = delete;
+  Block(Block&&)                 = default;
+  Block& operator=(Block&&)      = default;
+
+  void allocate(std::size_t bytes);
+  void deallocate();
+
+  BlockView make_view(std::size_t start_byte_idx, std::size_t bytes);
+  std::size_t size() const noexcept;
+  std::byte* data() const noexcept;
+};
+
+class BlockView {
+ private:
+  std::byte* _buffer{nullptr};
+  std::size_t _bytes{0u};
+
+ public:
+  BlockView(std::byte* buffer, std::size_t bytes);
+  BlockView(BlockView const&)            = default;
+  BlockView& operator=(BlockView const&) = default;
+  BlockView(BlockView&&)                 = default;
+  BlockView& operator=(BlockView&&)      = default;
+
+  std::size_t size() const noexcept;
+  std::byte* data() const noexcept;
+};
+
+class BounceBuffer {
+ private:
+  BounceBuffer() = default;
+
+  std::size_t _requested_bytes_per_block{1024u * 1024u * 16u};
+  std::size_t _num_blocks{4u};
+
+  inline static Block block_pool;
+  std::vector<BlockView> _blockviews_pool;
+
+  Block _block;
+  std::vector<BlockView> _blockviews;
+
+ public:
+  static BounceBuffer& instance();
+
+  static void preinitialize_for_pool(unsigned int num_threads,
+                                     std::size_t requested_bytes_per_block,
+                                     std::size_t num_blocks);
+  void initialize_per_thread(std::size_t requested_bytes_per_block, std::size_t num_blocks);
+
+  BlockView get();
+
+  BounceBuffer(BounceBuffer const&)            = delete;
+  BounceBuffer& operator=(BounceBuffer const&) = delete;
+  BounceBuffer(BounceBuffer&&)                 = delete;
+  BounceBuffer& operator=(BounceBuffer&&)      = delete;
+};
+
 }  // namespace kvikio
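The `Block`/`BlockView`/`BounceBuffer` trio above is the core of the new scheme: one pinned pool allocation is carved into `num_blocks` fixed-size, non-owning views per thread, and `get()` hands the views out round-robin. Below is a minimal sketch of that carving arithmetic; it substitutes plain host memory for `cuMemHostAlloc` so it runs without CUDA, and all sizes are illustrative rather than kvikio defaults.

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Non-owning view, analogous to BlockView.
struct View {
  std::byte* data;
  std::size_t size;
};

int main()
{
  constexpr std::size_t bytes_per_block = 4096;  // per-view size
  constexpr std::size_t num_blocks      = 4;     // views per thread
  constexpr unsigned int num_threads    = 2;     // pool size

  // One allocation for the whole thread pool, as in
  // BounceBuffer::preinitialize_for_pool().
  std::vector<std::byte> pool(bytes_per_block * num_blocks * num_threads);

  // Each worker carves out its own contiguous slice, as in
  // BounceBuffer::initialize_per_thread().
  for (unsigned int tid = 0; tid < num_threads; ++tid) {
    std::size_t const thread_offset = tid * bytes_per_block * num_blocks;
    std::vector<View> views;
    for (std::size_t i = 0; i < num_blocks; ++i) {
      views.push_back({pool.data() + thread_offset + bytes_per_block * i, bytes_per_block});
    }
    std::cout << "thread " << tid << ": " << views.size() << " views of "
              << views[0].size << " bytes\n";
  }
}
```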
diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp
index d1e17f0b85..b895cf8fd5 100644
--- a/cpp/include/kvikio/defaults.hpp
+++ b/cpp/include/kvikio/defaults.hpp
@@ -62,16 +62,16 @@ std::vector<T> getenv_or(std::string_view env_var_name, std::vector<T> default_val)
  */
 class defaults {
  private:
-  BS_thread_pool _thread_pool{get_num_threads_from_env()};
+  std::unique_ptr<BS_thread_pool> _thread_pool;
   CompatMode _compat_mode;
   std::size_t _task_size;
   std::size_t _gds_threshold;
   std::size_t _bounce_buffer_size;
+  std::size_t _task_group_size;
   std::size_t _http_max_attempts;
   long _http_timeout;
   std::vector<int> _http_status_codes;
-
-  static unsigned int get_num_threads_from_env();
+  std::function<void()> _worker_thread_init_func;
 
   defaults();
 
@@ -236,6 +236,9 @@ class defaults {
    */
   static void set_bounce_buffer_size(std::size_t nbytes);
 
+  [[nodiscard]] static std::size_t task_group_size();
+  static void set_task_group_size(std::size_t task_group_size);
+
   /**
    * @brief Get the maximum number of attempts per remote IO read.
    *
diff --git a/cpp/include/kvikio/nvtx.hpp b/cpp/include/kvikio/nvtx.hpp
index d665af33c1..e71b4c81b1 100644
--- a/cpp/include/kvikio/nvtx.hpp
+++ b/cpp/include/kvikio/nvtx.hpp
@@ -123,6 +123,11 @@ class NvtxManager {
   NvtxManager() = default;
 };
 
+struct NvtxData {
+  std::uint64_t nvtx_payload{0ull};
+  nvtx_color_type nvtx_color{NvtxManager::default_color()};
+};
+
 /**
  * @brief Convenience macro for generating an NVTX range in the `libkvikio` domain
  * from the lifetime of a function.
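`NvtxData` replaces the old `(color, call index)` pair, which is why `get_nvtx_data()` in parallel_operation.hpp below returns `{call_idx, nvtx_color}` in the member order declared here: payload first, then color. A small self-contained sketch of that aggregate-initialization detail, using a stand-in `nvtx_color_type` since the real alias comes from NVTX:

```cpp
#include <cstdint>
#include <iostream>

// Stand-in for kvikio's nvtx_color_type (an assumption; the real type is
// provided by the NVTX headers).
using nvtx_color_type = std::uint32_t;

// Mirrors NvtxData: payload first, color second. Aggregate initialization
// follows declaration order, so the swap from {nvtx_color, call_idx} to
// {call_idx, nvtx_color} in the diff is required, not cosmetic.
struct NvtxData {
  std::uint64_t nvtx_payload{0ull};
  nvtx_color_type nvtx_color{0};
};

NvtxData make_nvtx_data(std::uint64_t call_idx, nvtx_color_type color)
{
  return {call_idx, color};  // payload <- call_idx, color <- color
}

int main()
{
  auto d = make_nvtx_data(42, 0xff00ffu);
  std::cout << d.nvtx_payload << " 0x" << std::hex << d.nvtx_color << "\n";
}
```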
diff --git a/cpp/include/kvikio/parallel_operation.hpp b/cpp/include/kvikio/parallel_operation.hpp
index 33b02373c9..4a2535645d 100644
--- a/cpp/include/kvikio/parallel_operation.hpp
+++ b/cpp/include/kvikio/parallel_operation.hpp
@@ -67,12 +67,12 @@ auto make_copyable_lambda(F op)
  *
  * @return A pair of NVTX color and call index.
  */
-inline const std::pair<nvtx_color_type const&, std::uint64_t> get_next_color_and_call_idx() noexcept
+inline NvtxData const get_nvtx_data() noexcept
 {
   static std::atomic_uint64_t call_counter{1ull};
   auto call_idx    = call_counter.fetch_add(1ull, std::memory_order_relaxed);
   auto& nvtx_color = NvtxManager::get_color_by_index(call_idx);
-  return {nvtx_color, call_idx};
+  return {call_idx, nvtx_color};
 }
 
 /**
@@ -80,25 +80,13 @@
  *
  * Both the callable and arguments shall satisfy copy-constructible.
  */
-template <typename F, typename T>
-std::future<std::size_t> submit_task(F op,
-                                     T buf,
-                                     std::size_t size,
-                                     std::size_t file_offset,
-                                     std::size_t devPtr_offset,
-                                     std::uint64_t nvtx_payload = 0ull,
-                                     nvtx_color_type nvtx_color = NvtxManager::default_color())
+template <typename F, typename... Args>
+std::future<std::size_t> submit_task(F op, NvtxData nvtx_data, Args... args)
 {
-  static_assert(std::is_invocable_r_v<std::size_t, F, T, std::size_t, std::size_t, std::size_t>);
-
+  static_assert(std::is_invocable_r_v<std::size_t, F, Args...>);
   return defaults::thread_pool().submit_task([=] {
-    KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
-    return op(buf, size, file_offset, devPtr_offset);
+    KVIKIO_NVTX_SCOPED_RANGE("task group", nvtx_data.nvtx_payload, nvtx_data.nvtx_color);
+    return op(args...);
   });
 }
 
@@ -110,15 +98,12 @@
  * @return A future to be used later to check if the operation has finished its execution.
  */
 template <typename F>
-std::future<std::size_t> submit_move_only_task(
-  F op_move_only,
-  std::uint64_t nvtx_payload = 0ull,
-  nvtx_color_type nvtx_color = NvtxManager::default_color())
+std::future<std::size_t> submit_move_only_task(F op_move_only, NvtxData nvtx_data)
 {
   static_assert(std::is_invocable_r_v<std::size_t, F>);
   auto op_copyable = make_copyable_lambda(std::move(op_move_only));
   return defaults::thread_pool().submit_task([=] {
-    KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
+    KVIKIO_NVTX_SCOPED_RANGE("task group", nvtx_data.nvtx_payload, nvtx_data.nvtx_color);
     return op_copyable();
   });
 }
@@ -139,13 +124,12 @@
  */
 template <typename F, typename T>
 std::future<std::size_t> parallel_io(F op,
+                                     NvtxData nvtx_data,
                                      T buf,
                                      std::size_t size,
                                      std::size_t file_offset,
                                      std::size_t task_size,
-                                     std::size_t devPtr_offset,
-                                     std::uint64_t call_idx = 0,
-                                     nvtx_color_type nvtx_color = NvtxManager::default_color())
+                                     std::size_t devPtr_offset)
 {
   KVIKIO_EXPECT(task_size > 0, "`task_size` must be positive", std::invalid_argument);
   static_assert(std::is_invocable_r_v<std::size_t, F, T, std::size_t, std::size_t, std::size_t>);
@@ -156,7 +140,7 @@ std::future<std::size_t> parallel_io(F op,
 
   // Single-task guard
   if (task_size >= size || page_size >= size) {
-    return detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color);
+    return detail::submit_task(op, nvtx_data, buf, size, file_offset, devPtr_offset);
   }
 
   std::vector<std::future<std::size_t>> tasks;
@@ -165,8 +149,7 @@ std::future<std::size_t> parallel_io(F op,
 
   // 1) Submit all tasks but the last one. These are all `task_size` sized tasks.
   while (size > task_size) {
-    tasks.push_back(
-      detail::submit_task(op, buf, task_size, file_offset, devPtr_offset, call_idx, nvtx_color));
+    tasks.push_back(detail::submit_task(op, nvtx_data, buf, task_size, file_offset, devPtr_offset));
     file_offset += task_size;
     devPtr_offset += task_size;
     size -= task_size;
@@ -181,7 +164,73 @@ std::future<std::size_t> parallel_io(F op,
     }
     return ret;
   };
-  return detail::submit_move_only_task(std::move(last_task), call_idx, nvtx_color);
+  return detail::submit_move_only_task(std::move(last_task), nvtx_data);
+}
+
+template <typename F, typename T>
+std::future<std::size_t> parallel_io_for_task_group(F op,
+                                                    NvtxData nvtx_data,
+                                                    T buf,
+                                                    std::size_t size,
+                                                    std::size_t file_offset,
+                                                    std::size_t task_size,
+                                                    std::size_t devPtr_offset)
+{
+  KVIKIO_EXPECT(task_size > 0, "`task_size` must be positive", std::invalid_argument);
+
+  // Single-task guard
+  if (task_size >= size || page_size >= size) {
+    return detail::submit_task(op, nvtx_data, buf, size, file_offset, devPtr_offset, true);
+  }
+
+  auto const task_group_size  = defaults::task_group_size();
+  auto const task_group_bytes = defaults::task_size() * task_group_size;
+  std::vector<std::future<std::size_t>> task_groups;
+  task_groups.reserve((size + task_group_bytes - 1) / task_group_bytes);
+
+  // 1) Submit all task groups but the last one. Each runs `task_group_size` tasks.
+  while (size > task_group_bytes) {
+    auto task_group = [=]() -> std::size_t {
+      auto current_file_offset{file_offset};
+      auto current_devPtr_offset{devPtr_offset};
+      for (std::size_t idx = 0; idx < task_group_size; ++idx) {
+        // Only the last task in the group synchronizes the stream.
+        bool const current_sync_stream = (idx == task_group_size - 1);
+        op(buf, task_size, current_file_offset, current_devPtr_offset, current_sync_stream);
+        current_file_offset += task_size;
+        current_devPtr_offset += task_size;
+      }
+      return task_group_bytes;
+    };
+
+    task_groups.push_back(detail::submit_task(task_group, nvtx_data));
+    file_offset += task_group_bytes;
+    devPtr_offset += task_group_bytes;
+    size -= task_group_bytes;
+  }
+
+  // 2) Submit the last task group to handle the remainder.
+  auto last_task_group = [=, task_groups = std::move(task_groups)]() mutable -> std::size_t {
+    auto current_file_offset{file_offset};
+    auto current_devPtr_offset{devPtr_offset};
+    std::size_t ret{0};
+
+    while (size > task_size) {
+      ret += op(buf, task_size, current_file_offset, current_devPtr_offset, false);
+      current_file_offset += task_size;
+      current_devPtr_offset += task_size;
+      size -= task_size;
+    }
+
+    ret += op(buf, size, current_file_offset, current_devPtr_offset, true);
+
+    for (auto& task_group : task_groups) {
+      ret += task_group.get();
+    }
+    return ret;
+  };
+
+  return detail::submit_move_only_task(std::move(last_task_group), nvtx_data);
 }
 
 }  // namespace kvikio
diff --git a/cpp/include/kvikio/posix_io.hpp b/cpp/include/kvikio/posix_io.hpp
index 9f023a89b4..6a3e082eb6 100644
--- a/cpp/include/kvikio/posix_io.hpp
+++ b/cpp/include/kvikio/posix_io.hpp
@@ -133,31 +133,29 @@ std::size_t posix_device_io(int fd,
                             void const* devPtr_base,
                             std::size_t size,
                             std::size_t file_offset,
-                            std::size_t devPtr_offset)
+                            std::size_t devPtr_offset,
+                            CUstream stream)
 {
-  auto alloc            = AllocRetain::instance().get();
+  auto alloc            = BounceBuffer::instance().get();
   CUdeviceptr devPtr    = convert_void2deviceptr(devPtr_base) + devPtr_offset;
   off_t cur_file_offset = convert_size2off(file_offset);
   off_t byte_remaining  = convert_size2off(size);
   off_t const chunk_size2 = convert_size2off(alloc.size());
 
-  // Get a stream for the current CUDA context and thread
-  CUstream stream = StreamsByThread::get();
-
   while (byte_remaining > 0) {
     off_t const nbytes_requested = std::min(chunk_size2, byte_remaining);
     ssize_t nbytes_got           = nbytes_requested;
     if constexpr (Operation == IOOperationType::READ) {
       nbytes_got = posix_host_io<IOOperationType::READ, PartialIO::YES>(
-        fd, alloc.get(), nbytes_requested, cur_file_offset);
-      CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.get(), nbytes_got, stream));
-      CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
+        fd, alloc.data(), nbytes_requested, cur_file_offset);
+      CUDA_DRIVER_TRY(
+        cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.data(), nbytes_got, stream));
     } else {  // Is a write operation
       CUDA_DRIVER_TRY(
-        cudaAPI::instance().MemcpyDtoHAsync(alloc.get(), devPtr, nbytes_requested, stream));
+        cudaAPI::instance().MemcpyDtoHAsync(alloc.data(), devPtr, nbytes_requested, stream));
       CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
       posix_host_io<IOOperationType::WRITE, PartialIO::NO>(
-        fd, alloc.get(), nbytes_requested, cur_file_offset);
+        fd, alloc.data(), nbytes_requested, cur_file_offset);
     }
     cur_file_offset += nbytes_got;
     devPtr += nbytes_got;
@@ -227,7 +225,8 @@ std::size_t posix_device_read(int fd,
                               void const* devPtr_base,
                               std::size_t size,
                               std::size_t file_offset,
-                              std::size_t devPtr_offset);
+                              std::size_t devPtr_offset,
+                              CUstream stream);
 
 /**
  * @brief Write device memory to disk using POSIX
@@ -246,6 +245,7 @@ std::size_t posix_device_write(int fd,
                                void const* devPtr_base,
                                std::size_t size,
                                std::size_t file_offset,
-                               std::size_t devPtr_offset);
+                               std::size_t devPtr_offset,
+                               CUstream stream);
 
 }  // namespace kvikio::detail
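A dry run of how `parallel_io_for_task_group()` partitions a transfer may help: full groups of `task_group_size` tasks are submitted first, then one trailing group absorbs the remainder and performs the final stream synchronization. The numbers below are illustrative, not kvikio defaults.

```cpp
#include <cstddef>
#include <iostream>

int main()
{
  std::size_t size                   = 100u << 20;  // 100 MiB transfer
  std::size_t const task_size        = 4u << 20;    // stand-in for defaults::task_size()
  std::size_t const task_group_size  = 4;           // stand-in for defaults::task_group_size()
  std::size_t const task_group_bytes = task_size * task_group_size;

  // Same loop structure as parallel_io_for_task_group().
  std::size_t full_groups = 0;
  while (size > task_group_bytes) {
    ++full_groups;
    size -= task_group_bytes;
  }
  std::size_t const tail_tasks = (size + task_size - 1) / task_size;

  std::cout << full_groups << " full groups of " << task_group_size
            << " tasks, then a tail group of " << tail_tasks
            << " task(s) covering " << size << " bytes\n";
  // 100 MiB with 16 MiB groups -> 6 full groups, tail group covers the last 4 MiB.
}
```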
diff --git a/cpp/include/kvikio/shim/cuda.hpp b/cpp/include/kvikio/shim/cuda.hpp
index 9aaac08827..9698a8457e 100644
--- a/cpp/include/kvikio/shim/cuda.hpp
+++ b/cpp/include/kvikio/shim/cuda.hpp
@@ -48,6 +48,8 @@ class cudaAPI {
   decltype(cuStreamSynchronize)* StreamSynchronize{nullptr};
   decltype(cuStreamCreate)* StreamCreate{nullptr};
   decltype(cuStreamDestroy)* StreamDestroy{nullptr};
+  decltype(cuDeviceGetCount)* DeviceGetCount{nullptr};
+  decltype(cuDevicePrimaryCtxGetState)* DevicePrimaryCtxGetState{nullptr};
 
  private:
   cudaAPI();
diff --git a/cpp/include/kvikio/threadpool_wrapper.hpp b/cpp/include/kvikio/threadpool_wrapper.hpp
index cb4640c047..f6197e733b 100644
--- a/cpp/include/kvikio/threadpool_wrapper.hpp
+++ b/cpp/include/kvikio/threadpool_wrapper.hpp
@@ -20,7 +20,11 @@
 
 #include <BS_thread_pool.hpp>
 
+#include <kvikio/bounce_buffer.hpp>
+#include <kvikio/error.hpp>
 #include <kvikio/nvtx.hpp>
+#include <kvikio/shim/cuda.hpp>
+#include <kvikio/utils.hpp>
 
 namespace kvikio {
 
@@ -33,7 +37,33 @@ class thread_pool_wrapper : public pool_type {
    *
    * @param nthreads The number of threads to use.
    */
-  thread_pool_wrapper(unsigned int nthreads) : pool_type{nthreads, worker_thread_init_func} {}
+  thread_pool_wrapper(unsigned int nthreads,
+                      std::size_t bounce_buffer_size,
+                      std::size_t bounce_buffer_group_size)
+    : pool_type(nthreads, preinitialize(nthreads, bounce_buffer_size, bounce_buffer_group_size))
+  {
+  }
+
+  std::function<void()> preinitialize(unsigned int nthreads,
+                                      std::size_t bounce_buffer_size,
+                                      std::size_t bounce_buffer_group_size)
+  {
+    auto ctx = ensure_valid_current_context();
+    BounceBuffer::preinitialize_for_pool(nthreads, bounce_buffer_size, bounce_buffer_group_size);
+
+    auto worker_thread_init_func = [=] {
+      CUDA_DRIVER_TRY(cudaAPI::instance().CtxPushCurrent(ctx));
+
+      KVIKIO_NVTX_SCOPED_RANGE("worker thread init", 0, NvtxManager::default_color());
+      // Rename the worker thread in the thread pool to improve clarity from nsys-ui.
+      // Note: This NVTX feature is currently not supported by nsys-ui.
+      NvtxManager::rename_current_thread("thread pool");
+
+      BounceBuffer::instance().initialize_per_thread(bounce_buffer_size, bounce_buffer_group_size);
+    };
+
+    return worker_thread_init_func;
+  }
 
   /**
    * @brief Reset the number of threads in the thread pool, and invoke a pre-defined initialization
@@ -41,17 +71,37 @@ class thread_pool_wrapper : public pool_type {
    *
    * @param nthreads The number of threads to use.
    */
-  void reset(unsigned int nthreads) { pool_type::reset(nthreads, worker_thread_init_func); }
-
- private:
-  inline static std::function<void()> worker_thread_init_func{[] {
-    KVIKIO_NVTX_SCOPED_RANGE("worker thread init", 0, NvtxManager::default_color());
-    // Rename the worker thread in the thread pool to improve clarity from nsys-ui.
-    // Note: This NVTX feature is currently not supported by nsys-ui.
-    NvtxManager::rename_current_thread("thread pool");
-  }};
+  void reset(unsigned int nthreads,
+             std::size_t bounce_buffer_size,
+             std::size_t bounce_buffer_group_size)
+  {
+    // Block the calling thread until existing tasks in the thread pool are done.
+    // This avoids a race condition where data still being used by the worker threads
+    // (such as BounceBuffer::block_pool) is modified in preinitialize().
+    pool_type::wait();
+
+    auto worker_thread_init_func =
+      preinitialize(nthreads, bounce_buffer_size, bounce_buffer_group_size);
+
+    pool_type::reset(nthreads, worker_thread_init_func);
+  }
 };
 
 using BS_thread_pool = thread_pool_wrapper<BS::thread_pool>;
 
+namespace this_thread {
+template <typename pool_type>
+bool is_from_pool();
+
+template <>
+bool is_from_pool<BS::thread_pool>();
+
+template <typename pool_type>
+std::optional<std::size_t> index();
+
+template <>
+std::optional<std::size_t> index<BS::thread_pool>();
+
+}  // namespace this_thread
+
 }  // namespace kvikio
diff --git a/cpp/include/kvikio/utils.hpp b/cpp/include/kvikio/utils.hpp
index a324c93292..5e98c282de 100644
--- a/cpp/include/kvikio/utils.hpp
+++ b/cpp/include/kvikio/utils.hpp
@@ -192,4 +192,6 @@ bool is_future_done(T const& future)
   return future.wait_for(std::chrono::seconds(0)) != std::future_status::timeout;
 }
 
+CUcontext ensure_valid_current_context();
+
 }  // namespace kvikio
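The `kvikio::this_thread` helpers declared above are thin wrappers over the `BS::this_thread` facilities. The sketch below exercises the underlying API directly, assuming the BS::thread_pool 4.x header is available, to show what "from pool" and "index" mean in practice:

```cpp
#include <iostream>
#include "BS_thread_pool.hpp"

int main()
{
  BS::thread_pool pool(2);

  auto report = [] {
    // std::nullopt outside the pool; the worker's index inside it. This is
    // what kvikio::this_thread::index<BS::thread_pool>() forwards to.
    auto idx = BS::this_thread::get_index();
    if (idx.has_value()) {
      std::cout << "pool worker #" << *idx << "\n";
    } else {
      std::cout << "not a pool thread\n";
    }
  };

  report();                         // "not a pool thread"
  pool.submit_task(report).wait();  // "pool worker #0" (or #1)
}
```

The per-worker index is what lets `BounceBuffer::initialize_per_thread()` compute a disjoint offset into the shared pinned pool without any locking.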
diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp
index 6a5efea2d6..78c52dea7d 100644
--- a/cpp/src/bounce_buffer.cpp
+++ b/cpp/src/bounce_buffer.cpp
@@ -16,14 +16,19 @@
 
 #include <cstddef>
 #include <stack>
+#include <vector>
 
 #include <kvikio/bounce_buffer.hpp>
 #include <kvikio/defaults.hpp>
 #include <kvikio/error.hpp>
 #include <kvikio/shim/cuda.hpp>
+#include <kvikio/threadpool_wrapper.hpp>
+#include <kvikio/utils.hpp>
 
 namespace kvikio {
 
+AllocRetain::AllocRetain() : _size{defaults::bounce_buffer_size()} {}
+
 AllocRetain::Alloc::Alloc(AllocRetain* manager, void* alloc, std::size_t size)
   : _manager(manager), _alloc{alloc}, _size{size}
 {
@@ -105,4 +110,94 @@ AllocRetain& AllocRetain::instance()
   return _instance;
 }
 
+void Block::allocate(std::size_t bytes)
+{
+  _bytes = bytes;
+  CUDA_DRIVER_TRY(cudaAPI::instance().MemHostAlloc(
+    reinterpret_cast<void**>(&_buffer), _bytes, CU_MEMHOSTALLOC_PORTABLE));
+}
+
+void Block::deallocate()
+{
+  CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(_buffer));
+  _bytes  = 0u;
+  _buffer = nullptr;
+}
+
+BlockView Block::make_view(std::size_t start_byte_idx, std::size_t bytes)
+{
+  KVIKIO_EXPECT(start_byte_idx + bytes <= _bytes, "Block view out of bound.", std::runtime_error);
+  return BlockView(_buffer + start_byte_idx, bytes);
+}
+
+std::size_t Block::size() const noexcept { return _bytes; }
+
+std::byte* Block::data() const noexcept { return _buffer; }
+
+BlockView::BlockView(std::byte* buffer, std::size_t bytes) : _buffer(buffer), _bytes(bytes) {}
+
+std::size_t BlockView::size() const noexcept { return _bytes; }
+
+std::byte* BlockView::data() const noexcept { return _buffer; }
+
+BounceBuffer& BounceBuffer::instance()
+{
+  thread_local BounceBuffer _instance;
+  return _instance;
+}
+
+BlockView BounceBuffer::get()
+{
+  thread_local std::size_t current_idx{0};
+  if (this_thread::is_from_pool<BS::thread_pool>()) {
+    if (current_idx >= _blockviews_pool.size()) { current_idx -= _blockviews_pool.size(); }
+    return _blockviews_pool[current_idx++];
+  } else {
+    if (_blockviews.size() == 0) {
+      initialize_per_thread(defaults::bounce_buffer_size(), defaults::task_group_size());
+    }
+    if (current_idx >= _blockviews.size()) { current_idx -= _blockviews.size(); }
+    return _blockviews[current_idx++];
+  }
+}
+
+void BounceBuffer::preinitialize_for_pool(unsigned int num_threads,
+                                          std::size_t requested_bytes_per_block,
+                                          std::size_t num_blocks)
+{
+  // Round up to a multiple of the page size
+  std::size_t bytes_per_block = (requested_bytes_per_block + page_size - 1) & (~page_size + 1);
+  auto total_bytes            = bytes_per_block * num_blocks * num_threads;
+
+  block_pool.deallocate();
+  block_pool.allocate(total_bytes);
+}
+
+void BounceBuffer::initialize_per_thread(std::size_t requested_bytes_per_block,
+                                         std::size_t num_blocks)
+{
+  _requested_bytes_per_block = requested_bytes_per_block;
+  _num_blocks                = num_blocks;
+
+  // Round up to a multiple of the page size
+  std::size_t bytes_per_block = (_requested_bytes_per_block + page_size - 1) & (~page_size + 1);
+  auto bytes_per_thread       = bytes_per_block * _num_blocks;
+
+  if (this_thread::is_from_pool<BS::thread_pool>()) {
+    _blockviews_pool.clear();
+    std::size_t my_offset = this_thread::index<BS::thread_pool>().value() * bytes_per_thread;
+    for (std::size_t i = 0; i < _num_blocks; ++i) {
+      _blockviews_pool.emplace_back(
+        block_pool.make_view(my_offset + bytes_per_block * i, bytes_per_block));
+    }
+  } else {
+    _blockviews.clear();
+    _block.deallocate();
+    _block.allocate(bytes_per_thread);
+    for (std::size_t i = 0; i < _num_blocks; ++i) {
+      _blockviews.emplace_back(_block.make_view(bytes_per_block * i, bytes_per_block));
+    }
+  }
+}
+
 }  // namespace kvikio
diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp
index 0ae5a99eb0..06cdb8c5a4 100644
--- a/cpp/src/defaults.cpp
+++ b/cpp/src/defaults.cpp
@@ -23,11 +23,13 @@
 
 #include <BS_thread_pool.hpp>
 
+#include <kvikio/bounce_buffer.hpp>
 #include <kvikio/compat_mode.hpp>
 #include <kvikio/defaults.hpp>
 #include <kvikio/error.hpp>
 #include <kvikio/http_status_codes.hpp>
 #include <kvikio/shim/cufile.hpp>
+#include <kvikio/threadpool_wrapper.hpp>
 
 namespace kvikio {
 
 template <>
@@ -82,13 +84,6 @@ std::vector<int> getenv_or(std::string_view env_var_name, std::vector<int> default_val)
   return detail::parse_http_status_codes(env_var_name, int_str);
 }
 
-unsigned int defaults::get_num_threads_from_env()
-{
-  int const ret = getenv_or("KVIKIO_NTHREADS", 1);
-  KVIKIO_EXPECT(ret > 0, "KVIKIO_NTHREADS has to be a positive integer", std::invalid_argument);
-  return ret;
-}
-
 defaults::defaults()
 {
   // Determine the default value of `compat_mode`
@@ -136,6 +131,19 @@ defaults::defaults()
     _http_status_codes =
       getenv_or("KVIKIO_HTTP_STATUS_CODES", std::vector<int>{429, 500, 502, 503, 504});
   }
+
+  {
+    ssize_t const env = getenv_or("TASK_GROUP_SIZE", 1U);
+    KVIKIO_EXPECT(env > 0, "TASK_GROUP_SIZE has to be a positive integer", std::invalid_argument);
+    _task_group_size = env;
+  }
+
+  {
+    int const env = getenv_or("KVIKIO_NTHREADS", 1);
+    KVIKIO_EXPECT(env > 0, "KVIKIO_NTHREADS has to be a positive integer", std::invalid_argument);
+    _thread_pool = std::make_unique<BS_thread_pool>(
+      static_cast<unsigned int>(env), _bounce_buffer_size, _task_group_size);
+  }
 }
 
 defaults* defaults::instance()
@@ -167,7 +175,7 @@ bool defaults::is_compat_mode_preferred(CompatMode compat_mode) noexcept
 
 bool defaults::is_compat_mode_preferred() { return is_compat_mode_preferred(compat_mode()); }
 
-BS_thread_pool& defaults::thread_pool() { return instance()->_thread_pool; }
+BS_thread_pool& defaults::thread_pool() { return *instance()->_thread_pool; }
 
 unsigned int defaults::thread_pool_nthreads() { return thread_pool().get_thread_count(); }
 
@@ -175,7 +183,7 @@ void defaults::set_thread_pool_nthreads(unsigned int nthreads)
 {
   KVIKIO_EXPECT(
     nthreads > 0, "number of threads must be a positive integer", std::invalid_argument);
-  thread_pool().reset(nthreads);
+  thread_pool().reset(nthreads, instance()->bounce_buffer_size(), instance()->task_group_size());
 }
 
 std::size_t defaults::task_size() { return instance()->_task_size; }
@@ -197,6 +205,18 @@ void defaults::set_bounce_buffer_size(std::size_t nbytes)
   KVIKIO_EXPECT(
     nbytes > 0, "size of the bounce buffer must be a positive integer", std::invalid_argument);
   instance()->_bounce_buffer_size = nbytes;
+  thread_pool().reset(instance()->thread_pool_nthreads(), nbytes, instance()->task_group_size());
+}
+
+std::size_t defaults::task_group_size() { return instance()->_task_group_size; }
+
+void defaults::set_task_group_size(std::size_t task_group_size)
+{
+  KVIKIO_EXPECT(
+    task_group_size > 0, "TASK_GROUP_SIZE has to be a positive integer", std::invalid_argument);
+  instance()->_task_group_size = task_group_size;
+  thread_pool().reset(
+    instance()->thread_pool_nthreads(), instance()->bounce_buffer_size(), task_group_size);
 }
 
 std::size_t defaults::http_max_attempts() { return instance()->_http_max_attempts; }
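Both `preinitialize_for_pool()` and `initialize_per_thread()` above rely on the same bit trick to round the requested block size up to a page multiple: for a power-of-two `page_size`, `~page_size + 1` equals `-page_size`, which is the same mask as `~(page_size - 1)`. A standalone check:

```cpp
#include <cstddef>
#include <iostream>

// Rounds x up to the next multiple of page_size; valid only when page_size is
// a power of two, which is why the mask trick works.
std::size_t round_up_to_page(std::size_t x, std::size_t page_size)
{
  return (x + page_size - 1) & (~page_size + 1);
}

int main()
{
  constexpr std::size_t page_size = 4096;
  std::cout << round_up_to_page(1, page_size) << "\n";     // 4096
  std::cout << round_up_to_page(4096, page_size) << "\n";  // 4096
  std::cout << round_up_to_page(4097, page_size) << "\n";  // 8192
}
```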
diff --git a/cpp/src/file_handle.cpp b/cpp/src/file_handle.cpp
index d3b671b17d..a0bb19f39d 100644
--- a/cpp/src/file_handle.cpp
+++ b/cpp/src/file_handle.cpp
@@ -29,6 +29,8 @@
 #include <kvikio/nvtx.hpp>
 #include <kvikio/parallel_operation.hpp>
 #include <kvikio/posix_io.hpp>
+#include <kvikio/stream.hpp>
+#include "kvikio/shim/cuda.hpp"
 
 namespace kvikio {
 
@@ -108,8 +110,11 @@ std::size_t FileHandle::read(void* devPtr_base,
 {
   KVIKIO_NVTX_SCOPED_RANGE("FileHandle::read()", size);
   if (get_compat_mode_manager().is_compat_mode_preferred()) {
-    return detail::posix_device_read(
-      _file_direct_off.fd(), devPtr_base, size, file_offset, devPtr_offset);
+    auto stream = detail::StreamsByThread::get();
+    auto res    = detail::posix_device_read(
+      _file_direct_off.fd(), devPtr_base, size, file_offset, devPtr_offset, stream);
+    CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
+    return res;
   }
   if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); }
 
@@ -132,8 +137,11 @@ std::size_t FileHandle::write(void const* devPtr_base,
   _nbytes = 0;  // Invalidate the computed file size
 
   if (get_compat_mode_manager().is_compat_mode_preferred()) {
-    return detail::posix_device_write(
-      _file_direct_off.fd(), devPtr_base, size, file_offset, devPtr_offset);
+    auto stream = detail::StreamsByThread::get();
+    auto res    = detail::posix_device_write(
+      _file_direct_off.fd(), devPtr_base, size, file_offset, devPtr_offset, stream);
+    CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
+    return res;
   }
   if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); }
 
@@ -154,8 +162,9 @@ std::future<std::size_t> FileHandle::pread(void* buf,
                                            std::size_t gds_threshold,
                                            bool sync_default_stream)
 {
-  auto& [nvtx_color, call_idx] = detail::get_next_color_and_call_idx();
-  KVIKIO_NVTX_SCOPED_RANGE("FileHandle::pread()", size, nvtx_color);
+  auto const nvtx_data = detail::get_nvtx_data();
+  KVIKIO_NVTX_SCOPED_RANGE("FileHandle::pread()", size, nvtx_data.nvtx_color);
+
   if (is_host_memory(buf)) {
     auto op = [this](void* hostPtr_base,
                      std::size_t size,
@@ -166,7 +175,7 @@ std::future<std::size_t> FileHandle::pread(void* buf,
         _file_direct_off.fd(), buf, size, file_offset);
     };
 
-    return parallel_io(op, buf, size, file_offset, task_size, 0, call_idx, nvtx_color);
+    return parallel_io(op, nvtx_data, buf, size, file_offset, task_size, 0);
   }
 
   CUcontext ctx = get_context_from_pointer(buf);
@@ -174,7 +183,10 @@ std::future<std::size_t> FileHandle::pread(void* buf,
   // Shortcut that circumvent the threadpool and use the POSIX backend directly.
   if (size < gds_threshold) {
     PushAndPopContext c(ctx);
-    auto bytes_read = detail::posix_device_read(_file_direct_off.fd(), buf, size, file_offset, 0);
+    auto stream = detail::StreamsByThread::get();
+    auto bytes_read =
+      detail::posix_device_read(_file_direct_off.fd(), buf, size, file_offset, 0, stream);
+    CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
     // Maintain API consistency while making this trivial case synchronous.
     // The result in the future is immediately available after the call.
     return make_ready_future(bytes_read);
@@ -187,16 +199,26 @@ std::future<std::size_t> FileHandle::pread(void* buf,
   }
 
   // Regular case that use the threadpool and run the tasks in parallel
-  auto task = [this, ctx](void* devPtr_base,
-                          std::size_t size,
-                          std::size_t file_offset,
-                          std::size_t devPtr_offset) -> std::size_t {
+  auto task = [this, ctx, nvtx_data](void* devPtr_base,
+                                     std::size_t size,
+                                     std::size_t file_offset,
+                                     std::size_t devPtr_offset,
+                                     bool sync_stream) -> std::size_t {
+    KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_data.nvtx_payload, nvtx_data.nvtx_color);
     PushAndPopContext c(ctx);
-    return read(devPtr_base, size, file_offset, devPtr_offset, /* sync_default_stream = */ false);
+    auto stream = detail::StreamsByThread::get();
+    if (get_compat_mode_manager().is_compat_mode_preferred()) {
+      auto bytes_read = detail::posix_device_read(
+        _file_direct_off.fd(), devPtr_base, size, file_offset, devPtr_offset, stream);
+      if (sync_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); }
+      return bytes_read;
+    } else {
+      return read(devPtr_base, size, file_offset, devPtr_offset, /* sync_default_stream = */ false);
+    }
   };
   auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
-  return parallel_io(
-    task, devPtr_base, size, file_offset, task_size, devPtr_offset, call_idx, nvtx_color);
+  return parallel_io_for_task_group(
+    task, nvtx_data, devPtr_base, size, file_offset, task_size, devPtr_offset);
 }
 
 std::future<std::size_t> FileHandle::pwrite(void const* buf,
@@ -206,8 +228,8 @@ std::future<std::size_t> FileHandle::pwrite(void const* buf,
                                             std::size_t gds_threshold,
                                             bool sync_default_stream)
 {
-  auto& [nvtx_color, call_idx] = detail::get_next_color_and_call_idx();
-  KVIKIO_NVTX_SCOPED_RANGE("FileHandle::pwrite()", size, nvtx_color);
+  auto const nvtx_data = detail::get_nvtx_data();
+  KVIKIO_NVTX_SCOPED_RANGE("FileHandle::pwrite()", size, nvtx_data.nvtx_color);
   if (is_host_memory(buf)) {
     auto op = [this](void const* hostPtr_base,
                      std::size_t size,
@@ -218,7 +240,7 @@ std::future<std::size_t> FileHandle::pwrite(void const* buf,
         _file_direct_off.fd(), buf, size, file_offset);
     };
 
-    return parallel_io(op, buf, size, file_offset, task_size, 0, call_idx, nvtx_color);
+    return parallel_io(op, nvtx_data, buf, size, file_offset, task_size, 0);
   }
 
   CUcontext ctx = get_context_from_pointer(buf);
@@ -226,7 +248,10 @@ std::future<std::size_t> FileHandle::pwrite(void const* buf,
   // Shortcut that circumvent the threadpool and use the POSIX backend directly.
   if (size < gds_threshold) {
     PushAndPopContext c(ctx);
-    auto bytes_write = detail::posix_device_write(_file_direct_off.fd(), buf, size, file_offset, 0);
+    auto stream = detail::StreamsByThread::get();
+    auto bytes_write =
+      detail::posix_device_write(_file_direct_off.fd(), buf, size, file_offset, 0, stream);
+    CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
     // Maintain API consistency while making this trivial case synchronous.
     // The result in the future is immediately available after the call.
     return make_ready_future(bytes_write);
@@ -247,8 +272,7 @@ std::future<std::size_t> FileHandle::pwrite(void const* buf,
     return write(devPtr_base, size, file_offset, devPtr_offset, /* sync_default_stream = */ false);
   };
   auto [devPtr_base, base_size, devPtr_offset] = get_alloc_info(buf, &ctx);
-  return parallel_io(
-    op, devPtr_base, size, file_offset, task_size, devPtr_offset, call_idx, nvtx_color);
+  return parallel_io(op, nvtx_data, devPtr_base, size, file_offset, task_size, devPtr_offset);
 }
 
 void FileHandle::read_async(void* devPtr_base,
diff --git a/cpp/src/posix_io.cpp b/cpp/src/posix_io.cpp
index ed149f5d43..f1440ad328 100644
--- a/cpp/src/posix_io.cpp
+++ b/cpp/src/posix_io.cpp
@@ -59,22 +59,24 @@ std::size_t posix_device_read(int fd,
                               void const* devPtr_base,
                               std::size_t size,
                               std::size_t file_offset,
-                              std::size_t devPtr_offset)
+                              std::size_t devPtr_offset,
+                              CUstream stream)
 {
   KVIKIO_NVTX_SCOPED_RANGE("posix_device_read()", size);
   return detail::posix_device_io<IOOperationType::READ>(
-    fd, devPtr_base, size, file_offset, devPtr_offset);
+    fd, devPtr_base, size, file_offset, devPtr_offset, stream);
 }
 
 std::size_t posix_device_write(int fd,
                                void const* devPtr_base,
                                std::size_t size,
                                std::size_t file_offset,
-                               std::size_t devPtr_offset)
+                               std::size_t devPtr_offset,
+                               CUstream stream)
 {
   KVIKIO_NVTX_SCOPED_RANGE("posix_device_write()", size);
   return detail::posix_device_io<IOOperationType::WRITE>(
-    fd, devPtr_base, size, file_offset, devPtr_offset);
+    fd, devPtr_base, size, file_offset, devPtr_offset, stream);
 }
 
 }  // namespace kvikio::detail
diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp
index 072e56cec1..bafb7e6c2d 100644
--- a/cpp/src/remote_handle.cpp
+++ b/cpp/src/remote_handle.cpp
@@ -394,7 +394,7 @@ std::future<std::size_t> RemoteHandle::pread(void* buf,
                                              std::size_t file_offset,
                                              std::size_t task_size)
 {
-  auto& [nvtx_color, call_idx] = detail::get_next_color_and_call_idx();
+  auto const nvtx_data = detail::get_nvtx_data();
   KVIKIO_NVTX_SCOPED_RANGE("RemoteHandle::pread()", size);
   auto task = [this](void* devPtr_base,
                      std::size_t size,
@@ -402,7 +402,7 @@ std::future<std::size_t> RemoteHandle::pread(void* buf,
                      std::size_t devPtr_offset) -> std::size_t {
     return read(static_cast<char*>(devPtr_base) + devPtr_offset, size, file_offset);
   };
-  return parallel_io(task, buf, size, file_offset, task_size, 0, call_idx, nvtx_color);
+  return parallel_io(task, nvtx_data, buf, size, file_offset, task_size, 0);
 }
 
 }  // namespace kvikio
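The recurring pattern in the compat-mode read path above: H2D copies are enqueued asynchronously and the single `StreamSynchronize` is deferred — in kvikio, to the last task of a task group. The sketch below reproduces the shape with the CUDA runtime API instead of kvikio's driver-API shim (error checking elided). Note the real code avoids overwriting an in-flight bounce buffer by rotating `task_group_size` blocks per thread; this is mimicked here with one pinned slice per chunk.

```cpp
#include <cstdio>
#include <cuda_runtime.h>

int main()
{
  constexpr size_t chunk = 1 << 20;  // bytes per bounce-buffer block
  constexpr int nchunks  = 4;        // tasks per task group

  char* host = nullptr;  // pinned pool, like BounceBuffer's block_pool
  char* dev  = nullptr;
  cudaMallocHost(&host, chunk * nchunks);  // pinned, like Block::allocate()
  cudaMalloc(&dev, chunk * nchunks);

  cudaStream_t stream;
  cudaStreamCreate(&stream);

  for (int i = 0; i < nchunks; ++i) {
    // (posix_host_io would fill host + i * chunk from the file here)
    cudaMemcpyAsync(dev + i * chunk, host + i * chunk, chunk,
                    cudaMemcpyHostToDevice, stream);
    // No per-chunk cudaStreamSynchronize(stream) anymore ...
  }
  cudaStreamSynchronize(stream);  // ... one deferred sync per group instead
  std::printf("copied %zu bytes\n", chunk * static_cast<size_t>(nchunks));

  cudaStreamDestroy(stream);
  cudaFree(dev);
  cudaFreeHost(host);
}
```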
diff --git a/cpp/src/shim/cuda.cpp b/cpp/src/shim/cuda.cpp
index 9e5c05bc05..3886ea20b9 100644
--- a/cpp/src/shim/cuda.cpp
+++ b/cpp/src/shim/cuda.cpp
@@ -29,6 +29,7 @@ cudaAPI::cudaAPI()
   // if a project uses the `_v2` CUDA Driver API or the newest Runtime API, the symbols
   // loaded should also be the `_v2` symbols. Thus, we use KVIKIO_STRINGIFY() to get
   // the name of the symbol through cuda.h.
+  get_symbol(Init, lib, KVIKIO_STRINGIFY(cuInit));
   get_symbol(MemHostAlloc, lib, KVIKIO_STRINGIFY(cuMemHostAlloc));
   get_symbol(MemFreeHost, lib, KVIKIO_STRINGIFY(cuMemFreeHost));
   get_symbol(MemcpyHtoDAsync, lib, KVIKIO_STRINGIFY(cuMemcpyHtoDAsync));
@@ -47,6 +48,8 @@ cudaAPI::cudaAPI()
   get_symbol(StreamSynchronize, lib, KVIKIO_STRINGIFY(cuStreamSynchronize));
   get_symbol(StreamCreate, lib, KVIKIO_STRINGIFY(cuStreamCreate));
   get_symbol(StreamDestroy, lib, KVIKIO_STRINGIFY(cuStreamDestroy));
+  get_symbol(DeviceGetCount, lib, KVIKIO_STRINGIFY(cuDeviceGetCount));
+  get_symbol(DevicePrimaryCtxGetState, lib, KVIKIO_STRINGIFY(cuDevicePrimaryCtxGetState));
 }
 #else
 cudaAPI::cudaAPI() { KVIKIO_FAIL("KvikIO not compiled with CUDA support", std::runtime_error); }
diff --git a/cpp/src/threadpool_wrapper.cpp b/cpp/src/threadpool_wrapper.cpp
new file mode 100644
index 0000000000..e370edac06
--- /dev/null
+++ b/cpp/src/threadpool_wrapper.cpp
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <optional>
+#include <kvikio/threadpool_wrapper.hpp>
+#include "BS_thread_pool.hpp"
+
+namespace kvikio {
+
+namespace this_thread {
+template <>
+bool is_from_pool<BS::thread_pool>()
+{
+  return BS::this_thread::get_pool().has_value();
+}
+
+template <>
+std::optional<std::size_t> index<BS::thread_pool>()
+{
+  return BS::this_thread::get_index();
+}
+
+}  // namespace this_thread
+
+}  // namespace kvikio
diff --git a/cpp/src/utils.cpp b/cpp/src/utils.cpp
index c065f2ab8c..7e96175f81 100644
--- a/cpp/src/utils.cpp
+++ b/cpp/src/utils.cpp
@@ -173,4 +173,37 @@ std::tuple<void*, std::size_t, std::size_t> get_alloc_info(void const* devPtr, CUcontext* ctx)
   return std::make_tuple(reinterpret_cast<void*>(base_ptr), base_size, offset);
 }
 
+CUcontext ensure_valid_current_context()
+{
+  CUDA_DRIVER_TRY(cudaAPI::instance().Init(0));
+
+  // If the context stack is non-empty, return the top.
+  CUcontext ctx{};
+  CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx));
+  if (ctx != nullptr) { return ctx; }
+
+  // Otherwise, search all devices and return the first active primary context.
+  int num_devices{};
+  CUDA_DRIVER_TRY(cudaAPI::instance().DeviceGetCount(&num_devices));
+  for (int device_idx = 0; device_idx < num_devices; ++device_idx) {
+    CUdevice dev{};
+    unsigned int flags{};
+    int active{};
+    CUDA_DRIVER_TRY(cudaAPI::instance().DeviceGet(&dev, device_idx));
+    CUDA_DRIVER_TRY(cudaAPI::instance().DevicePrimaryCtxGetState(dev, &flags, &active));
+    if (active) {
+      CUDA_DRIVER_TRY(cudaAPI::instance().DevicePrimaryCtxRetain(&ctx, dev));
+      CUDA_DRIVER_TRY(cudaAPI::instance().CtxPushCurrent(ctx));
+      return ctx;
+    }
+  }
+
+  // Otherwise, retain the primary context on device 0.
+  CUdevice dev{};
+  CUDA_DRIVER_TRY(cudaAPI::instance().DeviceGet(&dev, 0));
+  CUDA_DRIVER_TRY(cudaAPI::instance().DevicePrimaryCtxRetain(&ctx, dev));
+  CUDA_DRIVER_TRY(cudaAPI::instance().CtxPushCurrent(ctx));
+  return ctx;
+}
+
 }  // namespace kvikio
diff --git a/python/kvikio/kvikio/_lib/defaults.pyx b/python/kvikio/kvikio/_lib/defaults.pyx
index 00f1de4ec1..c7b4856f5c 100644
--- a/python/kvikio/kvikio/_lib/defaults.pyx
+++ b/python/kvikio/kvikio/_lib/defaults.pyx
@@ -37,10 +37,13 @@ cdef extern from "<kvikio/defaults.hpp>" namespace "kvikio" nogil:
     vector[int] cpp_http_status_codes "kvikio::defaults::http_status_codes"() except +
     void cpp_set_http_status_codes \
         "kvikio::defaults::set_http_status_codes"(vector[int] status_codes) except +
-
     long cpp_http_timeout "kvikio::defaults::http_timeout"() except +
     void cpp_set_http_timeout\
         "kvikio::defaults::set_http_timeout"(long timeout_seconds) except +
+    size_t cpp_task_group_size\
+        "kvikio::defaults::task_group_size"() except +
+    void cpp_set_task_group_size\
+        "kvikio::defaults::set_task_group_size"(size_t) except +
 
 
 def is_compat_mode_preferred() -> bool:
@@ -109,3 +112,11 @@ def http_status_codes() -> list[int]:
 
 def set_http_status_codes(status_codes: list[int]) -> None:
     return cpp_set_http_status_codes(status_codes)
+
+
+def task_group_size() -> int:
+    return cpp_task_group_size()
+
+
+def set_task_group_size(task_group_size: int) -> None:
+    cpp_set_task_group_size(task_group_size)
diff --git a/python/kvikio/kvikio/defaults.py b/python/kvikio/kvikio/defaults.py
index 9ee11a256d..7012aed661 100644
--- a/python/kvikio/kvikio/defaults.py
+++ b/python/kvikio/kvikio/defaults.py
@@ -56,6 +56,7 @@ def _property_getter_and_setter(self) -> tuple[dict[str, Any], dict[str, Any]]:
         "http_max_attempts",
         "http_status_codes",
         "http_timeout",
+        "task_group_size",
     ]
 
     property_getters = {}
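Putting the new knob together: `TASK_GROUP_SIZE` is read once at startup, and the setter validates the value and rebuilds the thread pool, which re-carves every worker's bounce-buffer views. A hypothetical usage sketch of the defaults API added in this diff, assuming the kvikio C++ headers and a built libkvikio; the Python property generated above exposes the same pair of calls.

```cpp
#include <iostream>
#include <kvikio/defaults.hpp>

int main()
{
  // 1 unless the TASK_GROUP_SIZE environment variable was set.
  std::cout << kvikio::defaults::task_group_size() << "\n";

  // Triggers thread_pool().reset(...), i.e. a pool rebuild and per-thread
  // bounce-buffer re-initialization.
  kvikio::defaults::set_task_group_size(4);
  std::cout << kvikio::defaults::task_group_size() << "\n";  // 4
}
```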