Skip to content

Commit

Permalink
Make the bounce buffer size configurable (#447)
Browse files Browse the repository at this point in the history
`KVIKIO_BOUNCE_BUFFER_SIZE`

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Bradley Dice (https://github.com/bdice)

URL: #447
  • Loading branch information
madsbk authored Aug 27, 2024
1 parent 8a1a1cd commit 0b8525a
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 88 deletions.
130 changes: 130 additions & 0 deletions cpp/include/kvikio/bounce_buffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <mutex>
#include <stack>

#include <kvikio/defaults.hpp>

namespace kvikio {

/**
 * @brief Singleton class to retain host memory allocations
 *
 * Call `AllocRetain::get` to get an allocation that is handed back to the pool of free
 * allocations when it goes out of scope (RAII). All retained allocations have the same size.
 *
 * The retained size tracks `defaults::bounce_buffer_size()`: whenever `get()` or `put()`
 * observes that the configured size differs from the size of the retained allocations, the
 * pool is emptied and the new size is adopted for all subsequent allocations.
 */
class AllocRetain {
 private:
  std::mutex _mutex{};
  // Stack of free allocations
  std::stack<void*> _free_allocs{};
  // The size of each allocation in `_free_allocs`
  std::size_t _size{defaults::bounce_buffer_size()};

  /**
   * @brief Free every retained allocation without taking `_mutex`.
   *
   * The caller must already hold `_mutex` (it is not recursive) or otherwise have exclusive
   * access to this object. May throw if `CUDA_DRIVER_TRY` reports a failed `MemFreeHost`.
   */
  void _clear()
  {
    while (!_free_allocs.empty()) {
      CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(_free_allocs.top()));
      _free_allocs.pop();
    }
  }

  /**
   * @brief Make `_size` agree with `defaults::bounce_buffer_size()`.
   *
   * If the configured size has changed, the retained allocations (all of the old size) are
   * freed and `_size` is updated. Without the update the pool would keep handing out and
   * retaining allocations of the stale size. The caller must hold `_mutex`.
   */
  void _ensure_alloc_size()
  {
    const std::size_t wanted_size = defaults::bounce_buffer_size();
    if (_size != wanted_size) {
      _clear();
      _size = wanted_size;
    }
  }

 public:
  /**
   * @brief A host memory allocation
   *
   * Hands the allocation back to the owning `AllocRetain` on destruction. Neither copyable
   * nor movable.
   */
  class Alloc {
   private:
    AllocRetain* _manager;
    void* _alloc;
    const std::size_t _size;

   public:
    /**
     * @param manager The `AllocRetain` that receives the allocation back on destruction.
     * @param alloc Pointer to the host allocation.
     * @param size Size of `alloc` in bytes.
     */
    Alloc(AllocRetain* manager, void* alloc, std::size_t size)
      : _manager(manager), _alloc{alloc}, _size{size}
    {
    }
    Alloc(const Alloc&)            = delete;
    Alloc& operator=(Alloc const&) = delete;
    Alloc(Alloc&& o)               = delete;
    Alloc& operator=(Alloc&& o)    = delete;
    ~Alloc() noexcept
    {
      // `put()` may free memory through `CUDA_DRIVER_TRY`; an exception escaping a noexcept
      // destructor would terminate the program, so report it the same way
      // `~AllocRetain()` does.
      try {
        _manager->put(_alloc, _size);
      } catch (const CUfileException& e) {
        std::cerr << "~Alloc(): " << e.what() << std::endl;
      }
    }
    /// @return Pointer to the host allocation.
    void* get() noexcept { return _alloc; }
    /// @return Size of the allocation in bytes.
    std::size_t size() noexcept { return _size; }
  };

  AllocRetain() = default;
  ~AllocRetain() noexcept
  {
    // No lock is taken here: nothing else may use the object while it is being destroyed.
    try {
      _clear();
    } catch (const CUfileException& e) {
      std::cerr << "~AllocRetain(): " << e.what() << std::endl;
    }
  }

  /**
   * @brief Free all retained allocations.
   *
   * Takes `_mutex`, so it can be called while other threads use `get()` and `put()`.
   * Allocations currently handed out are not affected.
   */
  void clear()
  {
    const std::lock_guard lock(_mutex);
    _clear();
  }

  /**
   * @brief Get an allocation of `defaults::bounce_buffer_size()` bytes.
   *
   * Reuses a retained allocation when one is available, otherwise allocates a new one with
   * `MemHostAlloc`.
   *
   * @return An `Alloc` that returns the memory to this pool when it is destroyed.
   */
  [[nodiscard]] Alloc get()
  {
    const std::lock_guard lock(_mutex);
    _ensure_alloc_size();  // drops the pool and adopts the new size if it has changed.

    // Check if we have an allocation available
    if (!_free_allocs.empty()) {
      void* ret = _free_allocs.top();
      _free_allocs.pop();
      return Alloc(this, ret, _size);
    }

    // If no available allocation, allocate a new one
    void* alloc{};
    // NOTE(review): `CU_MEMHOSTREGISTER_PORTABLE` is named after cuMemHostRegister; the
    // cuMemHostAlloc counterpart is presumably `CU_MEMHOSTALLOC_PORTABLE` -- confirm both
    // have the same value before renaming.
    CUDA_DRIVER_TRY(cudaAPI::instance().MemHostAlloc(&alloc, _size, CU_MEMHOSTREGISTER_PORTABLE));
    return Alloc(this, alloc, _size);
  }

  /**
   * @brief Hand an allocation back to the pool.
   *
   * @param alloc Pointer previously obtained from `get()`.
   * @param size Size of `alloc` in bytes.
   */
  void put(void* alloc, std::size_t size)
  {
    const std::lock_guard lock(_mutex);
    _ensure_alloc_size();  // drops the pool and adopts the new size if it has changed.

    // If the size of `alloc` matches the size of the retained allocations, it is added to
    // the set of free allocations, otherwise it is freed.
    if (size == _size) {
      _free_allocs.push(alloc);
    } else {
      CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(alloc));
    }
  }

  /// @return The process-wide `AllocRetain` instance.
  static AllocRetain& instance()
  {
    static AllocRetain _instance;
    return _instance;
  }

  AllocRetain(const AllocRetain&)            = delete;
  AllocRetain& operator=(AllocRetain const&) = delete;
  AllocRetain(AllocRetain&& o)               = delete;
  AllocRetain& operator=(AllocRetain&& o)    = delete;
};

} // namespace kvikio
59 changes: 54 additions & 5 deletions cpp/include/kvikio/defaults.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,14 @@ class defaults {
bool _compat_mode;
std::size_t _task_size;
std::size_t _gds_threshold;
std::size_t _bounce_buffer_size;

static unsigned int get_num_threads_from_env()
{
const int ret = detail::getenv_or("KVIKIO_NTHREADS", 1);
if (ret <= 0) { throw std::invalid_argument("KVIKIO_NTHREADS has to be a positive integer"); }
if (ret <= 0) {
throw std::invalid_argument("KVIKIO_NTHREADS has to be a positive integer greater than zero");
}
return ret;
}

Expand All @@ -107,18 +110,28 @@ class defaults {
{
const ssize_t env = detail::getenv_or("KVIKIO_TASK_SIZE", 4 * 1024 * 1024);
if (env <= 0) {
throw std::invalid_argument("KVIKIO_TASK_SIZE has to be a positive integer");
throw std::invalid_argument(
"KVIKIO_TASK_SIZE has to be a positive integer greater than zero");
}
_task_size = env;
}
// Determine the default value of `gds_threshold`
{
const ssize_t env = detail::getenv_or("KVIKIO_GDS_THRESHOLD", 1024 * 1024);
if (env <= 0) {
if (env < 0) {
throw std::invalid_argument("KVIKIO_GDS_THRESHOLD has to be a positive integer");
}
_gds_threshold = env;
}
// Determine the default value of `bounce_buffer_size`
{
const ssize_t env = detail::getenv_or("KVIKIO_BOUNCE_BUFFER_SIZE", 16 * 1024 * 1024);
if (env <= 0) {
throw std::invalid_argument(
"KVIKIO_BOUNCE_BUFFER_SIZE has to be a positive integer greater than zero");
}
_bounce_buffer_size = env;
}
}

static defaults* instance()
Expand Down Expand Up @@ -191,7 +204,13 @@ class defaults {
*
* @param nthreads The number of threads to use.
*/
static void thread_pool_nthreads_reset(unsigned int nthreads) { thread_pool().reset(nthreads); }
static void thread_pool_nthreads_reset(unsigned int nthreads)
{
  // Reject an empty pool before touching the thread pool.
  const bool has_threads = nthreads > 0;
  if (!has_threads) {
    throw std::invalid_argument("number of threads must be a positive integer greater than zero");
  }

  thread_pool().reset(nthreads);
}

/**
* @brief Get the default task size used for parallel IO operations.
Expand All @@ -208,7 +227,13 @@ class defaults {
*
* @param nbytes The default task size in bytes.
*/
static void task_size_reset(std::size_t nbytes) { instance()->_task_size = nbytes; }
static void task_size_reset(std::size_t nbytes)
{
if (nbytes == 0) {
throw std::invalid_argument("task size must be a positive integer greater than zero");
}
instance()->_task_size = nbytes;
}

/**
* @brief Get the default GDS threshold, which is the minimum size to use GDS (in bytes).
Expand All @@ -228,6 +253,30 @@ class defaults {
* @param nbytes The default GDS threshold size in bytes.
*/
// No validation is performed here: any value, including zero, is stored as the new threshold.
static void gds_threshold_reset(std::size_t nbytes) { instance()->_gds_threshold = nbytes; }

/**
 * @brief Get the size of the bounce buffer used to stage data in host memory.
 *
 * Set the value using `kvikio::defaults::bounce_buffer_size_reset()` or by setting the
 * `KVIKIO_BOUNCE_BUFFER_SIZE` environment variable. If not set, the value is 16 MiB.
 *
 * @return The bounce buffer size in bytes.
 */
[[nodiscard]] static std::size_t bounce_buffer_size() { return instance()->_bounce_buffer_size; }

/**
* @brief Reset the size of the bounce buffer used to stage data in host memory.
*
* @param nbytes The bounce buffer size in bytes.
*/
static void bounce_buffer_size_reset(std::size_t nbytes)
{
if (nbytes == 0) {
throw std::invalid_argument(
"size of the bounce buffer must be a positive integer greater than zero");
}
instance()->_bounce_buffer_size = nbytes;
}
};

} // namespace kvikio
82 changes: 2 additions & 80 deletions cpp/include/kvikio/posix_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,15 @@
#include <cstddef>
#include <cstdlib>
#include <map>
#include <mutex>
#include <stack>
#include <thread>

#include <cstring>
#include <kvikio/bounce_buffer.hpp>
#include <kvikio/error.hpp>
#include <kvikio/shim/cuda.hpp>
#include <kvikio/utils.hpp>

namespace kvikio {

inline constexpr std::size_t posix_bounce_buffer_size = 2 << 23; // 16 MiB

namespace detail {

/**
Expand Down Expand Up @@ -87,80 +83,6 @@ class StreamsByThread {
StreamsByThread& operator=(StreamsByThread&& o) = delete;
};

/**
 * @brief Singleton class to retain host memory allocations
 *
 * Call `AllocRetain::get` to obtain an allocation that is pushed back onto the stack of free
 * allocations when the returned `Alloc` goes out of scope (RAII). Every allocation has the
 * size `posix_bounce_buffer_size`.
 */
class AllocRetain {
private:
// Stack of free (currently unused) allocations that `get()` can reuse
std::stack<void*> _free_allocs;
// Guards `_free_allocs`; taken by `get()`, `put()` and `clear()`
std::mutex _mutex;

public:
/**
 * @brief Handle to a single allocation; its destructor hands the pointer back to the manager.
 *
 * Copy and move operations are deleted.
 */
class Alloc {
private:
// The manager that receives `_alloc` back on destruction
AllocRetain* _manager;
// The host allocation itself
void* _alloc;

public:
Alloc(AllocRetain* manager, void* alloc) : _manager(manager), _alloc{alloc} {}
Alloc(const Alloc&) = delete;
Alloc& operator=(Alloc const&) = delete;
Alloc(Alloc&& o) = delete;
Alloc& operator=(Alloc&& o) = delete;
// Returns the allocation to the manager instead of freeing it
~Alloc() noexcept { _manager->put(_alloc); }
// Pointer to the host allocation
void* get() noexcept { return _alloc; }
};

AllocRetain() = default;
/**
 * @brief Get an allocation of `posix_bounce_buffer_size` bytes.
 *
 * Pops a free allocation when one is available, otherwise allocates a new one with
 * `MemHostAlloc`.
 *
 * @return An `Alloc` that puts the memory back into this pool when destroyed.
 */
[[nodiscard]] Alloc get()
{
const std::lock_guard lock(_mutex);
// Check if we have an allocation available
if (!_free_allocs.empty()) {
void* ret = _free_allocs.top();
_free_allocs.pop();
return Alloc(this, ret);
}

// If no available allocation, allocate and register a new one
void* alloc{};
// Allocate page-locked host memory
CUDA_DRIVER_TRY(cudaAPI::instance().MemHostAlloc(
&alloc, posix_bounce_buffer_size, CU_MEMHOSTREGISTER_PORTABLE));
return Alloc(this, alloc);
}

/**
 * @brief Push `alloc` onto the stack of free allocations (under `_mutex`).
 *
 * @param alloc Pointer to hand back; the code does not check where it came from.
 */
void put(void* alloc)
{
const std::lock_guard lock(_mutex);
_free_allocs.push(alloc);
}

/**
 * @brief Free every allocation currently on the free stack with `MemFreeHost` (under `_mutex`).
 */
void clear()
{
const std::lock_guard lock(_mutex);
while (!_free_allocs.empty()) {
CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(_free_allocs.top()));
_free_allocs.pop();
}
}

// Access the function-local static instance
static AllocRetain& instance()
{
static AllocRetain _instance;
return _instance;
}

AllocRetain(const AllocRetain&) = delete;
AllocRetain& operator=(AllocRetain const&) = delete;
AllocRetain(AllocRetain&& o) = delete;
AllocRetain& operator=(AllocRetain&& o) = delete;
// Defaulted: the destructor does not call `clear()`, so allocations still on the free
// stack are not released by it.
~AllocRetain() noexcept = default;
};

/**
* @brief Read or write host memory to or from disk using POSIX
*
Expand Down Expand Up @@ -230,7 +152,7 @@ std::size_t posix_device_io(int fd,
CUdeviceptr devPtr = convert_void2deviceptr(devPtr_base) + devPtr_offset;
off_t cur_file_offset = convert_size2off(file_offset);
off_t byte_remaining = convert_size2off(size);
const off_t chunk_size2 = convert_size2off(posix_bounce_buffer_size);
const off_t chunk_size2 = convert_size2off(alloc.size());

// Get a stream for the current CUDA context and thread
CUstream stream = StreamsByThread::get();
Expand Down
11 changes: 11 additions & 0 deletions python/kvikio/kvikio/_lib/defaults.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ cdef extern from "<kvikio/defaults.hpp>" nogil:
size_t cpp_gds_threshold "kvikio::defaults::gds_threshold"() except +
void cpp_gds_threshold_reset \
"kvikio::defaults::gds_threshold_reset"(size_t nbytes) except +
size_t cpp_bounce_buffer_size "kvikio::defaults::bounce_buffer_size"() except +
void cpp_bounce_buffer_size_reset \
"kvikio::defaults::bounce_buffer_size_reset"(size_t nbytes) except +


def compat_mode() -> bool:
Expand Down Expand Up @@ -52,3 +55,11 @@ def gds_threshold() -> int:

def gds_threshold_reset(nbytes: int) -> None:
    """Forward ``nbytes`` to the C++ ``kvikio::defaults::gds_threshold_reset``."""
    cpp_gds_threshold_reset(nbytes)


def bounce_buffer_size() -> int:
    """Return the value of the C++ ``kvikio::defaults::bounce_buffer_size``."""
    return cpp_bounce_buffer_size()


def bounce_buffer_size_reset(nbytes: int) -> None:
    """Forward ``nbytes`` to the C++ ``kvikio::defaults::bounce_buffer_size_reset``.

    The C++ declaration is marked ``except +``, so a C++ exception raised by
    the call surfaces as a Python exception.
    """
    cpp_bounce_buffer_size_reset(nbytes)
Loading

0 comments on commit 0b8525a

Please sign in to comment.