Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp/include/kvikio/defaults.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ std::tuple<std::string_view, T, bool> getenv_or(
*/
class defaults {
private:
BS_thread_pool _thread_pool{get_num_threads_from_env()};
ThreadPool _thread_pool{get_num_threads_from_env()};
CompatMode _compat_mode;
std::size_t _task_size;
std::size_t _gds_threshold;
Expand Down Expand Up @@ -212,7 +212,7 @@ class defaults {
*
* @return The default thread pool instance.
*/
[[nodiscard]] static BS_thread_pool& thread_pool();
[[nodiscard]] static ThreadPool& thread_pool();

/**
* @brief Get the number of threads in the default thread pool.
Expand Down
92 changes: 92 additions & 0 deletions cpp/include/kvikio/detail/function_wrapper.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <functional>
#include <memory>
#include <type_traits>
#include <utility>

namespace kvikio::detail {
/**
 * @brief Type-erased function wrapper that can hold a copyable or move-only callable invocable as
 * `void()`. Unlike std::function, this wrapper is move-only and cannot be copied.
 *
 * The wrapped callable is stored on the heap behind a `std::unique_ptr`, so moving the wrapper
 * only transfers a pointer and never moves or copies the callable itself.
 *
 * @todo Use small buffer optimization to avoid heap allocation for small callables.
 * @note This class will be superseded by C++23's std::move_only_function.
 */
class FunctionWrapper {
 private:
  /**
   * @brief Abstract interface through which the concrete callable is invoked and destroyed.
   */
  struct InnerBase {
    virtual void operator()() = 0;

    virtual ~InnerBase() = default;
  };

  /**
   * @brief Concrete holder that owns a decayed copy of the callable.
   *
   * @tparam F Callable type as deduced by the wrapper's forwarding constructor (may be an lvalue
   * reference type).
   */
  template <typename F>
  struct Inner final : InnerBase {
    using F_decay = std::decay_t<F>;
    // The stored callable is invoked as an lvalue (see operator() below), so that is the
    // expression that has to be well-formed.
    static_assert(std::is_invocable_r_v<void, F_decay&>,
                  "FunctionWrapper requires a callable that can be invoked with no arguments");

    explicit Inner(F&& f) : _f(std::forward<F>(f)) {}

    void operator()() override { std::invoke(_f); }

    ~Inner() override = default;

    F_decay _f;
  };

  std::unique_ptr<InnerBase> _callable;

 public:
  /**
   * @brief Construct a function wrapper from a callable object. The callable must be invocable with
   * no arguments. It can be either copyable or move-only (e.g., a lambda capturing
   * std::unique_ptr).
   *
   * This constructor does not participate in overload resolution when the argument is itself a
   * `FunctionWrapper` (of any cv/ref qualification). Without that constraint, a non-const lvalue
   * or a const rvalue `FunctionWrapper` would select this template instead of the deleted copy
   * constructor and recursively wrap itself without end.
   *
   * @tparam F Callable type.
   * @param f Callable object to wrap. Will be moved or copied into the wrapper.
   */
  template <typename F,
            typename = std::enable_if_t<!std::is_same_v<std::decay_t<F>, FunctionWrapper>>>
  FunctionWrapper(F&& f) : _callable(std::make_unique<Inner<F>>(std::forward<F>(f)))
  {
  }

  /**
   * @brief Default constructor. Creates an empty wrapper with no callable target.
   */
  FunctionWrapper() = default;

  FunctionWrapper(FunctionWrapper&&) noexcept            = default;
  FunctionWrapper& operator=(FunctionWrapper&&) noexcept = default;

  FunctionWrapper(const FunctionWrapper&)            = delete;
  FunctionWrapper& operator=(const FunctionWrapper&) = delete;

  /**
   * @brief Invoke the wrapped callable.
   *
   * @exception std::bad_function_call if the wrapper is empty (default-constructed, moved-from or
   * reset).
   */
  void operator()()
  {
    if (!_callable) { throw std::bad_function_call(); }
    _callable->operator()();
  }

  /**
   * @brief Check whether the wrapper contains a callable target.
   *
   * @return true if the wrapper contains a callable, false if it is empty.
   */
  explicit operator bool() const noexcept { return _callable != nullptr; }

  /**
   * @brief Reset the wrapper to an empty state, destroying the contained callable if any.
   */
  void reset() noexcept { _callable.reset(); }
};

} // namespace kvikio::detail
22 changes: 16 additions & 6 deletions cpp/include/kvikio/detail/parallel_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <kvikio/defaults.hpp>
#include <kvikio/detail/nvtx.hpp>
#include <kvikio/error.hpp>
#include <kvikio/threadpool_wrapper.hpp>
#include <kvikio/utils.hpp>

namespace kvikio {
Expand Down Expand Up @@ -75,6 +76,7 @@ std::future<std::size_t> submit_task(F op,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset,
ThreadPool* thread_pool = &defaults::thread_pool(),
std::uint64_t nvtx_payload = 0ull,
nvtx_color_type nvtx_color = NvtxManager::default_color())
{
Expand All @@ -85,7 +87,7 @@ std::future<std::size_t> submit_task(F op,
decltype(file_offset),
decltype(devPtr_offset)>);

return defaults::thread_pool().submit_task([=] {
return thread_pool->submit_task([=] {
KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
return op(buf, size, file_offset, devPtr_offset);
});
Expand All @@ -101,12 +103,13 @@ std::future<std::size_t> submit_task(F op,
template <typename F>
std::future<std::size_t> submit_move_only_task(
F op_move_only,
ThreadPool* thread_pool = &defaults::thread_pool(),
std::uint64_t nvtx_payload = 0ull,
nvtx_color_type nvtx_color = NvtxManager::default_color())
{
static_assert(std::is_invocable_r_v<std::size_t, F>);
auto op_copyable = make_copyable_lambda(std::move(op_move_only));
return defaults::thread_pool().submit_task([=] {
return thread_pool->submit_task([=] {
KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
return op_copyable();
});
Expand All @@ -124,6 +127,10 @@ std::future<std::size_t> submit_move_only_task(
* @param size Number of bytes to read or write.
* @param file_offset Byte offset to the start of the file.
* @param task_size Size of each task in bytes.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer. This parameter should be used
* only with registered buffers.
* @param thread_pool Thread pool to use for parallel execution. Defaults to the global default
* thread pool.
* @return A future to be used later to check if the operation has finished its execution.
*/
template <typename F, typename T>
Expand All @@ -133,10 +140,12 @@ std::future<std::size_t> parallel_io(F op,
std::size_t file_offset,
std::size_t task_size,
std::size_t devPtr_offset,
ThreadPool* thread_pool = &defaults::thread_pool(),
std::uint64_t call_idx = 0,
nvtx_color_type nvtx_color = NvtxManager::default_color())
{
KVIKIO_EXPECT(task_size > 0, "`task_size` must be positive", std::invalid_argument);
KVIKIO_EXPECT(thread_pool != nullptr, "The thread pool must not be nullptr");
static_assert(std::is_invocable_r_v<std::size_t,
decltype(op),
decltype(buf),
Expand All @@ -146,16 +155,17 @@ std::future<std::size_t> parallel_io(F op,

// Single-task guard
if (task_size >= size || get_page_size() >= size) {
return detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color);
return detail::submit_task(
op, buf, size, file_offset, devPtr_offset, thread_pool, call_idx, nvtx_color);
}

std::vector<std::future<std::size_t>> tasks;
tasks.reserve(size / task_size);

// 1) Submit all tasks but the last one. These are all `task_size` sized tasks.
while (size > task_size) {
tasks.push_back(
detail::submit_task(op, buf, task_size, file_offset, devPtr_offset, call_idx, nvtx_color));
tasks.push_back(detail::submit_task(
op, buf, task_size, file_offset, devPtr_offset, thread_pool, call_idx, nvtx_color));
file_offset += task_size;
devPtr_offset += task_size;
size -= task_size;
Expand All @@ -170,7 +180,7 @@ std::future<std::size_t> parallel_io(F op,
}
return ret;
};
return detail::submit_move_only_task(std::move(last_task), call_idx, nvtx_color);
return detail::submit_move_only_task(std::move(last_task), thread_pool, call_idx, nvtx_color);
}

} // namespace kvikio
11 changes: 9 additions & 2 deletions cpp/include/kvikio/file_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <kvikio/shim/cufile.hpp>
#include <kvikio/shim/cufile_h_wrapper.hpp>
#include <kvikio/stream.hpp>
#include <kvikio/threadpool_wrapper.hpp>
#include <kvikio/utils.hpp>

namespace kvikio {
Expand Down Expand Up @@ -228,6 +229,8 @@ class FileHandle {
* in the null stream. When in KvikIO's compatibility mode or when accessing host memory, the
* operation is always default stream ordered like the rest of the non-async CUDA API. In this
* case, the value of `sync_default_stream` is ignored.
* @param thread_pool Thread pool to use for parallel execution. Defaults to the global default
* thread pool.
* @return Future that on completion returns the number of bytes that were successfully read.
*
* @note The `std::future` object's `wait()` or `get()` should not be called after the lifetime of
Expand All @@ -238,7 +241,8 @@ class FileHandle {
std::size_t file_offset = 0,
std::size_t task_size = defaults::task_size(),
std::size_t gds_threshold = defaults::gds_threshold(),
bool sync_default_stream = true);
bool sync_default_stream = true,
ThreadPool* thread_pool = &defaults::thread_pool());

/**
* @brief Writes specified bytes from device or host memory into the file in parallel.
Expand All @@ -265,6 +269,8 @@ class FileHandle {
* in the null stream. When in KvikIO's compatibility mode or when accessing host memory, the
* operation is always default stream ordered like the rest of the non-async CUDA API. In this
* case, the value of `sync_default_stream` is ignored.
* @param thread_pool Thread pool to use for parallel execution. Defaults to the global default
* thread pool.
* @return Future that on completion returns the number of bytes that were successfully written.
*
* @note The `std::future` object's `wait()` or `get()` should not be called after the lifetime of
Expand All @@ -275,7 +281,8 @@ class FileHandle {
std::size_t file_offset = 0,
std::size_t task_size = defaults::task_size(),
std::size_t gds_threshold = defaults::gds_threshold(),
bool sync_default_stream = true);
bool sync_default_stream = true,
ThreadPool* thread_pool = &defaults::thread_pool());

/**
* @brief Reads specified bytes from the file into the device memory asynchronously.
Expand Down
6 changes: 5 additions & 1 deletion cpp/include/kvikio/mmap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <kvikio/defaults.hpp>
#include <kvikio/file_handle.hpp>
#include <kvikio/threadpool_wrapper.hpp>
#include <optional>

namespace kvikio {
Expand Down Expand Up @@ -162,6 +163,8 @@ class MmapHandle {
* specified, read starts from `offset` to the end of file
* @param offset File offset
* @param task_size Size of each task in bytes
* @param thread_pool Thread pool to use for parallel execution. Defaults to the global default
* thread pool.
* @return Future that on completion returns the number of bytes that were successfully read.
*
* @exception std::out_of_range if the read region specified by `offset` and `size` is
Expand All @@ -174,7 +177,8 @@ class MmapHandle {
std::future<std::size_t> pread(void* buf,
std::optional<std::size_t> size = std::nullopt,
std::size_t offset = 0,
std::size_t task_size = defaults::task_size());
std::size_t task_size = defaults::task_size(),
ThreadPool* thread_pool = &defaults::thread_pool());
};

} // namespace kvikio
6 changes: 5 additions & 1 deletion cpp/include/kvikio/remote_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include <kvikio/defaults.hpp>
#include <kvikio/error.hpp>
#include <kvikio/threadpool_wrapper.hpp>
#include <kvikio/utils.hpp>

struct curl_slist;
Expand Down Expand Up @@ -452,12 +453,15 @@ class RemoteHandle {
* @param size Number of bytes to read.
* @param file_offset File offset in bytes.
* @param task_size Size of each task in bytes.
* @param thread_pool Thread pool to use for parallel execution. Defaults to the global default
* thread pool.
* @return Future that on completion returns the number of bytes read, which is always `size`.
*/
std::future<std::size_t> pread(void* buf,
std::size_t size,
std::size_t file_offset = 0,
std::size_t task_size = defaults::task_size());
std::size_t task_size = defaults::task_size(),
ThreadPool* thread_pool = &defaults::thread_pool());
};

} // namespace kvikio
Loading