Skip to content

Commit

Permalink
Merge branch 'branch-24.10' of https://github.com/rapidsai/kvikio int…
Browse files Browse the repository at this point in the history
…o s3_support
  • Loading branch information
madsbk committed Sep 3, 2024
2 parents 2bd411e + 8676e8d commit b40a0ad
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 43 deletions.
9 changes: 8 additions & 1 deletion cpp/doxygen/main_page.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,14 @@ KvikIO splits parallel IO operations into multiple tasks. Set the environment va
This setting can also be controlled by `defaults::task_size()` and `defaults::task_size_reset()`.

#### GDS Threshold (KVIKIO_GDS_THRESHOLD)
In order to improve performance of small IO, `.pread()` and `.pwrite()` implement a shortcut that circumvent the threadpool and use the POSIX backend directly. Set the environment variable `KVIKIO_GDS_THRESHOLD` to the minimum size (in bytes) to use GDS. If not set, the default value is 1048576 (1 MiB).
To improve performance of small IO requests, `.pread()` and `.pwrite()` implement a shortcut that circumvents the threadpool and uses the POSIX backend directly. Set the environment variable `KVIKIO_GDS_THRESHOLD` to the minimum size (in bytes) to use GDS. If not set, the default value is 1048576 (1 MiB).

This setting can also be controlled by `defaults::gds_threshold()` and `defaults::gds_threshold_reset()`.

#### Size of the Bounce Buffer (KVIKIO_GDS_THRESHOLD)
KvikIO might have to use intermediate host buffers (one per thread) when copying between files and device memory. Set the environment variable ``KVIKIO_BOUNCE_BUFFER_SIZE`` to the size (in bytes) of these "bounce" buffers. If not set, the default value is 16777216 (16 MiB).

This setting can also be controlled by `defaults::bounce_buffer_size()` and `defaults::bounce_buffer_size_reset()`.

This setting can also be controlled by `defaults::gds_threshold()` and `defaults::gds_threshold_reset()`.

Expand Down
57 changes: 35 additions & 22 deletions cpp/include/kvikio/bounce_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,21 @@ class AllocRetain {
// The size of each allocation in `_free_allocs`
std::size_t _size{defaults::bounce_buffer_size()};

public:
/**
* @brief An host memory allocation
*/
class Alloc {
private:
AllocRetain* _manager;
void* _alloc;
const std::size_t _size;
std::size_t const _size;

public:
Alloc(AllocRetain* manager, void* alloc, std::size_t size)
: _manager(manager), _alloc{alloc}, _size{size}
{
}
Alloc(const Alloc&) = delete;
Alloc(Alloc const&) = delete;
Alloc& operator=(Alloc const&) = delete;
Alloc(Alloc&& o) = delete;
Alloc& operator=(Alloc&& o) = delete;
Expand All @@ -61,46 +60,49 @@ class AllocRetain {
};

AllocRetain() = default;
~AllocRetain() noexcept
{
try {
clear();
} catch (const CUfileException& e) {
std::cerr << "~AllocRetain(): " << e.what() << std::endl;
}
}

// Notice, we do not clear the allocations at destruction thus the allocations leaks
// at exit. We do this because `AllocRetain::instance()` stores the allocations in a
// static stack that are destructed below main, which is not allowed in CUDA:
// <https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#initialization>
~AllocRetain() noexcept = default;

/**
* @brief Free all retained allocations
*
* NB: The `_mutex` must be taken prior to calling this function, if not called from the dtor.
* NB: The `_mutex` must be taken prior to calling this function.
*
* @return The number of bytes cleared
*/
void clear()
std::size_t _clear()
{
std::size_t ret = _free_allocs.size() * _size;
while (!_free_allocs.empty()) {
CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(_free_allocs.top()));
_free_allocs.pop();
}
return ret;
}

/**
* @brief Ensure the size of the retained allocations match `defaults::bounce_buffer_size()`
* @brief Ensure the sizes of the retained allocations match `defaults::bounce_buffer_size()`
*
* NB: `_mutex` must be taken prior to calling this function.
*/
void ensure_alloc_size()
void _ensure_alloc_size()
{
const auto bounce_buffer_size = defaults::bounce_buffer_size();
auto const bounce_buffer_size = defaults::bounce_buffer_size();
if (_size != bounce_buffer_size) {
_clear();
_size = bounce_buffer_size;
clear(); // the desired allocation size has changed.
}
}

public:
[[nodiscard]] Alloc get()
{
const std::lock_guard lock(_mutex);
ensure_alloc_size();
std::lock_guard const lock(_mutex);
_ensure_alloc_size();

// Check if we have an allocation available
if (!_free_allocs.empty()) {
Expand All @@ -118,8 +120,8 @@ class AllocRetain {

void put(void* alloc, std::size_t size)
{
const std::lock_guard lock(_mutex);
ensure_alloc_size();
std::lock_guard const lock(_mutex);
_ensure_alloc_size();

// If the size of `alloc` matches the sizes of the retained allocations,
// it is added to the set of free allocation otherwise it is freed.
Expand All @@ -130,13 +132,24 @@ class AllocRetain {
}
}

/**
* @brief Free all retained allocations
*
* @return The number of bytes cleared
*/
std::size_t clear()
{
std::lock_guard const lock(_mutex);
return _clear();
}

static AllocRetain& instance()
{
static AllocRetain _instance;
return _instance;
}

AllocRetain(const AllocRetain&) = delete;
AllocRetain(AllocRetain const&) = delete;
AllocRetain& operator=(AllocRetain const&) = delete;
AllocRetain(AllocRetain&& o) = delete;
AllocRetain& operator=(AllocRetain&& o) = delete;
Expand Down
2 changes: 1 addition & 1 deletion docs/source/runtime_settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ This setting can also be controlled by :py:func:`kvikio.defaults.gds_threshold`,

Size of the Bounce Buffer ``KVIKIO_BOUNCE_BUFFER_SIZE``
-------------------------------------------------------
KvikIO might have to use an intermediate host buffer when copying between file and device memory. Set the environment variable ``KVIKIO_BOUNCE_BUFFER_SIZE`` to size (in bytes) of this "bounce" buffer. If not set, the default value is 16777216 (16 MiB).
KvikIO might have to use intermediate host buffers (one per thread) when copying between files and device memory. Set the environment variable ``KVIKIO_BOUNCE_BUFFER_SIZE`` to the size (in bytes) of these "bounce" buffers. If not set, the default value is 16777216 (16 MiB).

This setting can also be controlled by :py:func:`kvikio.defaults.bounce_buffer_size`, :py:func:`kvikio.defaults.bounce_buffer_size_reset`, and :py:func:`kvikio.defaults.set_bounce_buffer_size`.
11 changes: 1 addition & 10 deletions python/kvikio/kvikio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

from kvikio._lib import buffer, driver_properties # type: ignore
from kvikio._lib import driver_properties # type: ignore
from kvikio._version import __git_commit__, __version__
from kvikio.cufile import CuFile
from kvikio.remote_file import RemoteFile, S3Context, is_remote_file_available


def memory_register(buf) -> None:
return buffer.memory_register(buf)


def memory_deregister(buf) -> None:
buffer.memory_deregister(buf)


# TODO: Wrap nicely, maybe as a dataclass?
DriverProperties = driver_properties.DriverProperties

Expand Down
10 changes: 9 additions & 1 deletion python/kvikio/kvikio/_lib/buffer.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from kvikio._lib.arr cimport Array


cdef extern from "<kvikio/buffer.hpp>" namespace "kvikio" nogil:
cdef extern from "<kvikio/buffer.hpp>" nogil:
void cpp_memory_register "kvikio::memory_register"(const void* devPtr) except +
void cpp_memory_deregister "kvikio::memory_deregister"(const void* devPtr) except +

Expand All @@ -25,3 +25,11 @@ def memory_deregister(buf) -> None:
buf = Array(buf)
cdef Array arr = buf
cpp_memory_deregister(<void*>arr.ptr)


cdef extern from "<kvikio/bounce_buffer.hpp>" nogil:
size_t cpp_alloc_retain_clear "kvikio::AllocRetain::instance().clear"() except +


def bounce_buffer_free() -> int:
return cpp_alloc_retain_clear()
17 changes: 9 additions & 8 deletions python/kvikio/kvikio/benchmarks/single_node_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from dask.utils import format_bytes, parse_bytes

import kvikio
import kvikio.buffer
import kvikio.defaults
from kvikio.benchmarks.utils import parse_directory, pprint_sys_info

Expand All @@ -38,7 +39,7 @@ def run_cufile(args):
file_path = args.dir / "kvikio-single-file"
data = create_data(args.nbytes)
if args.pre_register_buffer:
kvikio.memory_register(data)
kvikio.buffer.memory_register(data)

# Write
f = kvikio.CuFile(file_path, flags="w")
Expand All @@ -57,7 +58,7 @@ def run_cufile(args):
assert res == args.nbytes, f"IO mismatch, expected {args.nbytes} got {res}"

if args.pre_register_buffer:
kvikio.memory_deregister(data)
kvikio.buffer.memory_deregister(data)

return read_time, write_time

Expand All @@ -73,7 +74,7 @@ def run_cufile_multiple_files_multiple_arrays(args):
arrays = [create_data(chunksize) for _ in range(args.nthreads)]
if args.pre_register_buffer:
for array in arrays:
kvikio.memory_register(array)
kvikio.buffer.memory_register(array)

# Write
files = [kvikio.CuFile(file_path % i, flags="w") for i in range(args.nthreads)]
Expand All @@ -95,7 +96,7 @@ def run_cufile_multiple_files_multiple_arrays(args):

if args.pre_register_buffer:
for array in arrays:
kvikio.memory_deregister(array)
kvikio.buffer.memory_deregister(array)

return read_time, write_time

Expand All @@ -108,7 +109,7 @@ def run_cufile_multiple_files(args):
file_path = str(args.dir / "cufile-p-%03d")
data = create_data(args.nbytes)
if args.pre_register_buffer:
kvikio.memory_register(data)
kvikio.buffer.memory_register(data)

# Write
files = [kvikio.CuFile(file_path % i, flags="w") for i in range(args.nthreads)]
Expand All @@ -133,7 +134,7 @@ def run_cufile_multiple_files(args):
assert res == args.nbytes, f"IO mismatch, expected {args.nbytes} got {res}"

if args.pre_register_buffer:
kvikio.memory_deregister(data)
kvikio.buffer.memory_deregister(data)

return read_time, write_time

Expand All @@ -149,7 +150,7 @@ def run_cufile_multiple_arrays(args):
arrays = [create_data(chunksize) for _ in range(args.nthreads)]
if args.pre_register_buffer:
for array in arrays:
kvikio.memory_register(array)
kvikio.buffer.memory_register(array)

# Write
f = kvikio.CuFile(file_path, flags="w")
Expand All @@ -174,7 +175,7 @@ def run_cufile_multiple_arrays(args):

if args.pre_register_buffer:
for array in arrays:
kvikio.memory_deregister(array)
kvikio.buffer.memory_deregister(array)

return read_time, write_time

Expand Down
41 changes: 41 additions & 0 deletions python/kvikio/kvikio/buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

from kvikio._lib import buffer # type: ignore


def memory_register(buf) -> None:
"""Register a device memory allocation with cuFile.
Warning
-------
This API is intended for usecases where the memory is used as a streaming
buffer that is reused across multiple cuFile IO operations.
Parameters
----------
buf: buffer-like or array-like
Device buffer to register .
"""
return buffer.memory_register(buf)


def memory_deregister(buf) -> None:
"""Deregister an already registered device memory from cuFile.
Parameters
----------
buf: buffer-like or array-like
Device buffer to deregister .
"""
buffer.memory_deregister(buf)


def bounce_buffer_free() -> int:
"""Free the host allocations used as bounce buffers.
Returns
-------
Number of bytes freed.
"""
return buffer.bounce_buffer_free()
20 changes: 20 additions & 0 deletions python/kvikio/tests/test_basic_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest

import kvikio
import kvikio.buffer
import kvikio.defaults

cupy = pytest.importorskip("cupy")
Expand Down Expand Up @@ -273,3 +274,22 @@ def test_different_bounce_buffer_sizes(tmp_path, size, tasksize, buffer_size):
f.write(a)
assert f.read(b) == b.nbytes
cupy.testing.assert_array_equal(a, b)


def test_bounce_buffer_free(tmp_path):
"""Test freeing the bounce buffer allocations"""
filename = tmp_path / "test-file"
kvikio.buffer.bounce_buffer_free()
with kvikio.defaults.set_compat_mode(True), kvikio.defaults.set_num_threads(1):
with kvikio.CuFile(filename, "w") as f:
with kvikio.defaults.set_bounce_buffer_size(1024):
# Notice, since the bounce buffer size is only checked when the buffer
# is used, we populate the bounce buffer in between we clear it.
f.write(cupy.arange(10))
assert kvikio.buffer.bounce_buffer_free() == 1024
assert kvikio.buffer.bounce_buffer_free() == 0
f.write(cupy.arange(10))
with kvikio.defaults.set_bounce_buffer_size(2048):
f.write(cupy.arange(10))
assert kvikio.buffer.bounce_buffer_free() == 2048
assert kvikio.buffer.bounce_buffer_free() == 0

0 comments on commit b40a0ad

Please sign in to comment.