Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ set(SOURCES
"src/shim/cufile.cpp"
"src/shim/utils.cpp"
"src/stream.cpp"
"src/threadpool_wrapper.cpp"
"src/utils.cpp"
)

Expand Down
76 changes: 73 additions & 3 deletions cpp/include/kvikio/bounce_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
#pragma once

#include <mutex>
#include <stack>
#include <vector>

#include <kvikio/defaults.hpp>
#include <kvikio/shim/utils.hpp>

namespace kvikio {

Expand All @@ -33,7 +35,7 @@ class AllocRetain {
// Stack of free allocations
std::stack<void*> _free_allocs{};
// The size of each allocation in `_free_allocs`
std::size_t _size{defaults::bounce_buffer_size()};
std::size_t _size{}; // Decouple this class from the defaults singleton.

public:
/**
Expand All @@ -57,7 +59,7 @@ class AllocRetain {
std::size_t size() noexcept;
};

AllocRetain() = default;
AllocRetain();

// Notice, we do not clear the allocations at destruction, thus the allocations leak
// at exit. We do this because `AllocRetain::instance()` stores the allocations in a
Expand Down Expand Up @@ -102,4 +104,72 @@ class AllocRetain {
AllocRetain& operator=(AllocRetain&& o) = delete;
};

// Forward declaration: Block::make_view() returns a BlockView, which is defined after Block.
class BlockView;

/**
 * @brief A byte buffer described by a start pointer and a size in bytes.
 *
 * The type is move-only: copy construction and copy assignment are deleted. The member
 * functions are only declared here; their definitions are not visible in this header.
 *
 * NOTE(review): the destructor is defaulted, so destroying a Block does not by itself release
 * `_buffer`. Presumably the owner is expected to call `deallocate()` explicitly — confirm.
 *
 * NOTE(review): the defaulted move operations copy `_buffer` and `_bytes` member-wise and do
 * not reset the moved-from object, which therefore still holds the same pointer. Confirm that
 * nothing calls `deallocate()` through both objects.
 */
class Block {
private:
// Start of the buffer; nullptr for a default-constructed Block.
std::byte* _buffer{nullptr};
// Size associated with `_buffer`, in bytes; 0 for a default-constructed Block.
std::size_t _bytes{0u};

public:
Block() = default;
~Block() = default;
// Copying is disabled.
Block(Block const&) = delete;
Block& operator=(Block const&) = delete;
// Moving is a member-wise copy of the pointer and the size (see NOTE above).
Block(Block&&) = default;
Block& operator=(Block&&) = default;

// Presumably acquires a buffer of `bytes` bytes and records it in this object — the kind of
// memory (pageable, pinned, ...) is decided in the definition; TODO confirm.
void allocate(std::size_t bytes);
// Presumably releases the buffer obtained by allocate() — TODO confirm.
void deallocate();

// Creates a BlockView from a start offset and a length, both in bytes. Presumably the view
// refers to `[_buffer + start_byte_idx, _buffer + start_byte_idx + bytes)`; whether the range
// is bounds-checked is not visible here — TODO confirm.
BlockView make_view(std::size_t start_byte_idx, std::size_t bytes);
// Accessors; presumably return `_bytes` and `_buffer` respectively — TODO confirm.
std::size_t size() const noexcept;
std::byte* data() const noexcept;
};

/**
 * @brief A copyable (pointer, size-in-bytes) pair referring to a range of bytes.
 *
 * The class declares no destructor and all copy/move operations are defaulted, so copying a
 * BlockView copies only the pointer and the size; the bytes themselves are not duplicated.
 * NOTE(review): the view presumably does not own the memory, so it must not outlive the
 * buffer it points into — confirm against the code that creates views.
 */
class BlockView {
private:
// Start of the viewed range.
std::byte* _buffer{nullptr};
// Length of the viewed range, in bytes.
std::size_t _bytes{0u};

public:
// Builds a view from a start pointer and a length in bytes (definition not visible here).
BlockView(std::byte* buffer, std::size_t bytes);
BlockView(BlockView const&) = default;
BlockView& operator=(BlockView const&) = default;
BlockView(BlockView&&) = default;
BlockView& operator=(BlockView&&) = default;

// Accessors; presumably return `_bytes` and `_buffer` respectively — TODO confirm.
std::size_t size() const noexcept;
std::byte* data() const noexcept;
};

/**
 * @brief Hands out BlockView objects through `get()`.
 *
 * Objects cannot be created, copied or moved by user code: the constructor is private and all
 * copy/move operations are deleted, so access goes through `instance()`.
 * NOTE(review): whether `instance()` returns one object per process or one per thread is
 * decided in its definition, which is not visible here — confirm (the name
 * `initialize_per_thread` suggests per-thread state).
 */
class BounceBuffer {
private:
BounceBuffer() = default;

// Default requested block size: 1024 * 1024 * 16 bytes (16 MiB).
std::size_t _requested_bytes_per_block{1024u * 1024u * 16u};
// Default number of blocks: 4.
std::size_t _num_blocks{4u};

// Static member: a single Block shared by every BounceBuffer object. Presumably filled by
// preinitialize_for_pool() — TODO confirm.
inline static Block block_pool;
// Views held by this object; presumably slices of `block_pool` — TODO confirm.
std::vector<BlockView> _blockviews_pool;

// Per-object Block and the views held alongside it; presumably set up by
// initialize_per_thread() — TODO confirm.
Block _block;
std::vector<BlockView> _blockviews;

public:
// Returns the BounceBuffer object to use (see the class NOTE about its scope).
static BounceBuffer& instance();

// Static set-up taking a thread count, a requested block size in bytes and a block count.
// Presumably sizes `block_pool` for `num_threads` worker threads — TODO confirm.
static void preinitialize_for_pool(unsigned int num_threads,
std::size_t requested_bytes_per_block,
std::size_t num_blocks);
// Per-object set-up taking a requested block size in bytes and a block count.
// NOTE(review): presumably meant to be called once on each worker thread — confirm.
void initialize_per_thread(std::size_t requested_bytes_per_block, std::size_t num_blocks);

// Returns a BlockView by value. Which view is returned on successive calls (e.g. some
// rotation over the held views) is decided in the definition — TODO confirm.
BlockView get();

BounceBuffer(BounceBuffer const&) = delete;
BounceBuffer& operator=(BounceBuffer const&) = delete;
BounceBuffer(BounceBuffer&&) = delete;
BounceBuffer& operator=(BounceBuffer&&) = delete;
};

} // namespace kvikio
9 changes: 6 additions & 3 deletions cpp/include/kvikio/defaults.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ std::vector<int> getenv_or(std::string_view env_var_name, std::vector<int> defau
*/
class defaults {
private:
BS_thread_pool _thread_pool{get_num_threads_from_env()};
std::unique_ptr<BS_thread_pool> _thread_pool;
CompatMode _compat_mode;
std::size_t _task_size;
std::size_t _gds_threshold;
std::size_t _bounce_buffer_size;
std::size_t _task_group_size;
std::size_t _http_max_attempts;
long _http_timeout;
std::vector<int> _http_status_codes;

static unsigned int get_num_threads_from_env();
std::function<void()> _worker_thread_init_func;

defaults();

Expand Down Expand Up @@ -236,6 +236,9 @@ class defaults {
*/
static void set_bounce_buffer_size(std::size_t nbytes);

[[nodiscard]] static std::size_t task_group_size();
static void set_task_group_size(std::size_t task_group_size);

/**
* @brief Get the maximum number of attempts per remote IO read.
*
Expand Down
5 changes: 5 additions & 0 deletions cpp/include/kvikio/nvtx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ class NvtxManager {
NvtxManager() = default;
};

/**
 * @brief Payload and color of an NVTX annotation, bundled into one value.
 *
 * A default-constructed object has payload 0 and the color returned by
 * `NvtxManager::default_color()`.
 */
struct NvtxData {
// Numeric payload attached to the annotation.
std::uint64_t nvtx_payload{0ull};
// Color of the annotation.
nvtx_color_type nvtx_color{NvtxManager::default_color()};
};

/**
* @brief Convenience macro for generating an NVTX range in the `libkvikio` domain
* from the lifetime of a function.
Expand Down
111 changes: 80 additions & 31 deletions cpp/include/kvikio/parallel_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,38 +67,26 @@ auto make_copyable_lambda(F op)
*
* @return An `NvtxData` whose payload is the call index and whose color is the NVTX color
* selected for that index.
*/
inline const std::pair<const nvtx_color_type&, std::uint64_t> get_next_color_and_call_idx() noexcept
inline NvtxData const get_nvtx_data() noexcept
{
static std::atomic_uint64_t call_counter{1ull};
auto call_idx = call_counter.fetch_add(1ull, std::memory_order_relaxed);
auto& nvtx_color = NvtxManager::get_color_by_index(call_idx);
return {nvtx_color, call_idx};
return {call_idx, nvtx_color};
}

/**
* @brief Submit the task callable to the underlying thread pool.
*
* Both the callable and arguments shall satisfy copy-constructible.
*/
template <typename F, typename T>
std::future<std::size_t> submit_task(F op,
T buf,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset,
std::uint64_t nvtx_payload = 0ull,
nvtx_color_type nvtx_color = NvtxManager::default_color())
template <typename F, typename... Args>
std::future<std::size_t> submit_task(F op, NvtxData nvtx_data, Args... args)
{
static_assert(std::is_invocable_r_v<std::size_t,
decltype(op),
decltype(buf),
decltype(size),
decltype(file_offset),
decltype(devPtr_offset)>);

static_assert(std::is_invocable_r_v<std::size_t, F, Args...>);
return defaults::thread_pool().submit_task([=] {
KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
return op(buf, size, file_offset, devPtr_offset);
KVIKIO_NVTX_SCOPED_RANGE("task group", nvtx_data.nvtx_payload, nvtx_data.nvtx_color);
return op(args...);
});
}

Expand All @@ -110,15 +98,12 @@ std::future<std::size_t> submit_task(F op,
* @return A future to be used later to check if the operation has finished its execution.
*/
template <typename F>
std::future<std::size_t> submit_move_only_task(
F op_move_only,
std::uint64_t nvtx_payload = 0ull,
nvtx_color_type nvtx_color = NvtxManager::default_color())
std::future<std::size_t> submit_move_only_task(F op_move_only, NvtxData nvtx_data)
{
static_assert(std::is_invocable_r_v<std::size_t, F>);
auto op_copyable = make_copyable_lambda(std::move(op_move_only));
return defaults::thread_pool().submit_task([=] {
KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
KVIKIO_NVTX_SCOPED_RANGE("task group", nvtx_data.nvtx_payload, nvtx_data.nvtx_color);
return op_copyable();
});
}
Expand All @@ -139,13 +124,12 @@ std::future<std::size_t> submit_move_only_task(
*/
template <typename F, typename T>
std::future<std::size_t> parallel_io(F op,
NvtxData nvtx_data,
T buf,
std::size_t size,
std::size_t file_offset,
std::size_t task_size,
std::size_t devPtr_offset,
std::uint64_t call_idx = 0,
nvtx_color_type nvtx_color = NvtxManager::default_color())
std::size_t devPtr_offset)
{
KVIKIO_EXPECT(task_size > 0, "`task_size` must be positive", std::invalid_argument);
static_assert(std::is_invocable_r_v<std::size_t,
Expand All @@ -157,16 +141,15 @@ std::future<std::size_t> parallel_io(F op,

// Single-task guard
if (task_size >= size || page_size >= size) {
return detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color);
return detail::submit_task(op, nvtx_data, buf, size, file_offset, devPtr_offset);
}

std::vector<std::future<std::size_t>> tasks;
tasks.reserve(size / task_size);

// 1) Submit all tasks but the last one. These are all `task_size` sized tasks.
while (size > task_size) {
tasks.push_back(
detail::submit_task(op, buf, task_size, file_offset, devPtr_offset, call_idx, nvtx_color));
tasks.push_back(detail::submit_task(op, nvtx_data, buf, task_size, file_offset, devPtr_offset));
file_offset += task_size;
devPtr_offset += task_size;
size -= task_size;
Expand All @@ -181,7 +164,73 @@ std::future<std::size_t> parallel_io(F op,
}
return ret;
};
return detail::submit_move_only_task(std::move(last_task), call_idx, nvtx_color);
return detail::submit_move_only_task(std::move(last_task), nvtx_data);
}

/**
 * @brief Submit an I/O operation to the thread pool as a sequence of task groups.
 *
 * The request is cut into tasks of `task_size` bytes, and `defaults::task_group_size()`
 * consecutive tasks form one task group. Every full task group is submitted as a single
 * thread-pool task that invokes `op` once per task, back to back. The remaining bytes (at most
 * one group's worth) are handled by a final task group, which also waits for all previously
 * submitted groups and adds up their byte counts.
 *
 * `op` is invoked as `op(buf, nbytes, file_offset, devPtr_offset, flag)` and its return value
 * is summed as a byte count. `flag` is `true` only for the last task of each group (and for the
 * single task when the request is not split). NOTE(review): the flag presumably asks `op` to
 * synchronize its CUDA stream — confirm against the callable that is passed in.
 *
 * If `task_size >= size` or `page_size >= size`, the whole request is submitted as one task.
 *
 * @tparam F Type of the callable.
 * @tparam T Type of the buffer handle forwarded to `op`.
 * @param op Callable performing one task.
 * @param nvtx_data NVTX payload and color forwarded to the task-submission helpers.
 * @param buf Buffer handle forwarded unchanged to every `op` invocation.
 * @param size Total number of bytes to process.
 * @param file_offset File offset of the first task.
 * @param task_size Number of bytes per task; must be positive.
 * @param devPtr_offset Buffer offset of the first task.
 * @return A future yielding the sum of the byte counts returned by every `op` invocation.
 *
 * @exception std::invalid_argument (raised through `KVIKIO_EXPECT`) if `task_size` is zero, or
 * if the request is split and `defaults::task_group_size()` is zero.
 */
template <typename F, typename T>
std::future<std::size_t> parallel_io_for_task_group(F op,
                                                    NvtxData nvtx_data,
                                                    T buf,
                                                    std::size_t size,
                                                    std::size_t file_offset,
                                                    std::size_t task_size,
                                                    std::size_t devPtr_offset)
{
  KVIKIO_EXPECT(task_size > 0, "`task_size` must be positive", std::invalid_argument);

  // Single-task guard
  if (task_size >= size || page_size >= size) {
    return detail::submit_task(op, nvtx_data, buf, size, file_offset, devPtr_offset, true);
  }

  auto const task_group_size = defaults::task_group_size();
  // A zero group size would make `task_group_bytes` zero: division by zero in the `reserve`
  // below and a submit loop that never terminates.
  KVIKIO_EXPECT(
    task_group_size > 0, "`task_group_size` must be positive", std::invalid_argument);

  // The stride between groups must be derived from the same `task_size` that the tasks inside
  // a group use; otherwise consecutive groups would overlap or leave gaps.
  auto const task_group_bytes = task_size * task_group_size;
  std::vector<std::future<std::size_t>> task_groups;
  task_groups.reserve((size + task_group_bytes - 1) / task_group_bytes);

  // 1) Submit the full task groups. Each one runs `task_group_size` tasks of `task_size` bytes.
  while (size > task_group_bytes) {
    auto task_group = [=]() -> std::size_t {
      auto current_file_offset{file_offset};
      auto current_devPtr_offset{devPtr_offset};
      std::size_t ret{0};
      for (std::size_t idx = 0; idx < task_group_size; ++idx) {
        // Only the last task of the group is flagged.
        bool const current_sync_stream = (idx + 1 == task_group_size);
        // Report what `op` actually transferred rather than the nominal group size.
        ret += op(buf, task_size, current_file_offset, current_devPtr_offset, current_sync_stream);
        current_file_offset += task_size;
        current_devPtr_offset += task_size;
      }
      return ret;
    };

    task_groups.push_back(detail::submit_task(task_group, nvtx_data));
    file_offset += task_group_bytes;
    devPtr_offset += task_group_bytes;
    size -= task_group_bytes;
  }

  // 2) Submit the last task group for the remainder. It takes ownership of the futures of the
  //    groups submitted above, which makes the lambda move-only.
  auto last_task_group = [=, task_groups = std::move(task_groups)]() mutable -> std::size_t {
    auto current_file_offset{file_offset};
    auto current_devPtr_offset{devPtr_offset};
    std::size_t ret{0};

    // All tasks but the last are `task_size` bytes and unflagged.
    while (size > task_size) {
      ret += op(buf, task_size, current_file_offset, current_devPtr_offset, false);
      current_file_offset += task_size;
      current_devPtr_offset += task_size;
      size -= task_size;
    }

    // The last task covers whatever is left and is flagged.
    ret += op(buf, size, current_file_offset, current_devPtr_offset, true);

    // Wait for the earlier groups and add their byte counts.
    for (auto& task_group : task_groups) {
      ret += task_group.get();
    }
    return ret;
  };

  return detail::submit_move_only_task(std::move(last_task_group), nvtx_data);
}

} // namespace kvikio
24 changes: 12 additions & 12 deletions cpp/include/kvikio/posix_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,31 +133,29 @@ std::size_t posix_device_io(int fd,
void const* devPtr_base,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset)
std::size_t devPtr_offset,
CUstream stream)
{
auto alloc = AllocRetain::instance().get();
auto alloc = BounceBuffer::instance().get();
CUdeviceptr devPtr = convert_void2deviceptr(devPtr_base) + devPtr_offset;
off_t cur_file_offset = convert_size2off(file_offset);
off_t byte_remaining = convert_size2off(size);
off_t const chunk_size2 = convert_size2off(alloc.size());

// Get a stream for the current CUDA context and thread
CUstream stream = StreamsByThread::get();

while (byte_remaining > 0) {
off_t const nbytes_requested = std::min(chunk_size2, byte_remaining);
ssize_t nbytes_got = nbytes_requested;
if constexpr (Operation == IOOperationType::READ) {
nbytes_got = posix_host_io<IOOperationType::READ, PartialIO::YES>(
fd, alloc.get(), nbytes_requested, cur_file_offset);
CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.get(), nbytes_got, stream));
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
fd, alloc.data(), nbytes_requested, cur_file_offset);
CUDA_DRIVER_TRY(
cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.data(), nbytes_got, stream));
} else { // Is a write operation
CUDA_DRIVER_TRY(
cudaAPI::instance().MemcpyDtoHAsync(alloc.get(), devPtr, nbytes_requested, stream));
cudaAPI::instance().MemcpyDtoHAsync(alloc.data(), devPtr, nbytes_requested, stream));
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
posix_host_io<IOOperationType::WRITE, PartialIO::NO>(
fd, alloc.get(), nbytes_requested, cur_file_offset);
fd, alloc.data(), nbytes_requested, cur_file_offset);
}
cur_file_offset += nbytes_got;
devPtr += nbytes_got;
Expand Down Expand Up @@ -227,7 +225,8 @@ std::size_t posix_device_read(int fd,
void const* devPtr_base,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset);
std::size_t devPtr_offset,
CUstream stream);

/**
* @brief Write device memory to disk using POSIX
Expand All @@ -246,6 +245,7 @@ std::size_t posix_device_write(int fd,
void const* devPtr_base,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset);
std::size_t devPtr_offset,
CUstream stream);

} // namespace kvikio::detail
2 changes: 2 additions & 0 deletions cpp/include/kvikio/shim/cuda.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class cudaAPI {
decltype(cuStreamSynchronize)* StreamSynchronize{nullptr};
decltype(cuStreamCreate)* StreamCreate{nullptr};
decltype(cuStreamDestroy)* StreamDestroy{nullptr};
decltype(cuDeviceGetCount)* DeviceGetCount{nullptr};
decltype(cuDevicePrimaryCtxGetState)* DevicePrimaryCtxGetState{nullptr};

private:
cudaAPI();
Expand Down
Loading