From 0b8525aca63de64ac877ed16e0f8e5582077271f Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 27 Aug 2024 15:01:14 +0200 Subject: [PATCH 1/2] Make the bounce buffer size configurable (#447) `KVIKIO_BOUNCE_BUFFER_SIZE` Authors: - Mads R. B. Kristensen (https://github.com/madsbk) Approvers: - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/kvikio/pull/447 --- cpp/include/kvikio/bounce_buffer.hpp | 130 +++++++++++++++++++++++++ cpp/include/kvikio/defaults.hpp | 59 ++++++++++- cpp/include/kvikio/posix_io.hpp | 82 +--------------- python/kvikio/kvikio/_lib/defaults.pyx | 11 +++ python/kvikio/kvikio/defaults.py | 47 ++++++++- python/kvikio/tests/test_basic_io.py | 17 ++++ python/kvikio/tests/test_defaults.py | 42 +++++++- 7 files changed, 300 insertions(+), 88 deletions(-) create mode 100644 cpp/include/kvikio/bounce_buffer.hpp diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp new file mode 100644 index 0000000000..77913afc5a --- /dev/null +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include + +namespace kvikio { + +/** + * @brief Singleton class to retain host memory allocations + * + * Call `AllocRetain::get` to get an allocation that will be retained when it + * goes out of scope (RAII). The size of all retained allocations are the same. + */ +class AllocRetain { + private: + std::mutex _mutex{}; + // Stack of free allocations + std::stack _free_allocs{}; + // The size of each allocation in `_free_allocs` + std::size_t _size{defaults::bounce_buffer_size()}; + + public: + /** + * @brief An host memory allocation + */ + class Alloc { + private: + AllocRetain* _manager; + void* _alloc; + const std::size_t _size; + + public: + Alloc(AllocRetain* manager, void* alloc, std::size_t size) + : _manager(manager), _alloc{alloc}, _size{size} + { + } + Alloc(const Alloc&) = delete; + Alloc& operator=(Alloc const&) = delete; + Alloc(Alloc&& o) = delete; + Alloc& operator=(Alloc&& o) = delete; + ~Alloc() noexcept { _manager->put(_alloc, _size); } + void* get() noexcept { return _alloc; } + std::size_t size() noexcept { return _size; } + }; + + AllocRetain() = default; + ~AllocRetain() noexcept + { + try { + clear(); + } catch (const CUfileException& e) { + std::cerr << "~AllocRetain(): " << e.what() << std::endl; + } + } + + void clear() + { + while (!_free_allocs.empty()) { + CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(_free_allocs.top())); + _free_allocs.pop(); + } + } + + [[nodiscard]] Alloc get() + { + const std::lock_guard lock(_mutex); + if (_size != defaults::bounce_buffer_size()) { + clear(); // the desired allocation size has changed. + } + + // Check if we have an allocation available + if (!_free_allocs.empty()) { + void* ret = _free_allocs.top(); + _free_allocs.pop(); + return Alloc(this, ret, _size); + } + + // If no available allocation, allocate and register a new one + void* alloc{}; + // Allocate page-locked host memory + CUDA_DRIVER_TRY(cudaAPI::instance().MemHostAlloc(&alloc, _size, CU_MEMHOSTREGISTER_PORTABLE)); + return Alloc(this, alloc, _size); + } + + void put(void* alloc, std::size_t size) + { + const std::lock_guard lock(_mutex); + if (_size != defaults::bounce_buffer_size()) { + clear(); // the desired allocation size has changed. + } + + // If the size of `alloc` matches the sizes of the retained allocations, + // it is added to the set of free allocation otherwise it is freed. + if (size == _size) { + _free_allocs.push(alloc); + } else { + CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(alloc)); + } + } + + static AllocRetain& instance() + { + static AllocRetain _instance; + return _instance; + } + + AllocRetain(const AllocRetain&) = delete; + AllocRetain& operator=(AllocRetain const&) = delete; + AllocRetain(AllocRetain&& o) = delete; + AllocRetain& operator=(AllocRetain&& o) = delete; +}; + +} // namespace kvikio diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index c192763ccd..119197bfc4 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -83,11 +83,14 @@ class defaults { bool _compat_mode; std::size_t _task_size; std::size_t _gds_threshold; + std::size_t _bounce_buffer_size; static unsigned int get_num_threads_from_env() { const int ret = detail::getenv_or("KVIKIO_NTHREADS", 1); - if (ret <= 0) { throw std::invalid_argument("KVIKIO_NTHREADS has to be a positive integer"); } + if (ret <= 0) { + throw std::invalid_argument("KVIKIO_NTHREADS has to be a positive integer greater than zero"); + } return ret; } @@ -107,18 +110,28 @@ class defaults { { const ssize_t env = detail::getenv_or("KVIKIO_TASK_SIZE", 4 * 1024 * 1024); if (env <= 0) { - throw std::invalid_argument("KVIKIO_TASK_SIZE has to be a positive integer"); + throw std::invalid_argument( + "KVIKIO_TASK_SIZE has to be a positive integer greater than zero"); } _task_size = env; } // Determine the default value of `gds_threshold` { const ssize_t env = detail::getenv_or("KVIKIO_GDS_THRESHOLD", 1024 * 1024); - if (env <= 0) { + if (env < 0) { throw std::invalid_argument("KVIKIO_GDS_THRESHOLD has to be a positive integer"); } _gds_threshold = env; } + // Determine the default value of `bounce_buffer_size` + { + const ssize_t env = detail::getenv_or("KVIKIO_BOUNCE_BUFFER_SIZE", 16 * 1024 * 1024); + if (env <= 0) { + throw std::invalid_argument( + "KVIKIO_BOUNCE_BUFFER_SIZE has to be a positive integer greater than zero"); + } + _bounce_buffer_size = env; + } } static defaults* instance() @@ -191,7 +204,13 @@ class defaults { * * @param nthreads The number of threads to use. */ - static void thread_pool_nthreads_reset(unsigned int nthreads) { thread_pool().reset(nthreads); } + static void thread_pool_nthreads_reset(unsigned int nthreads) + { + if (nthreads == 0) { + throw std::invalid_argument("number of threads must be a positive integer greater than zero"); + } + thread_pool().reset(nthreads); + } /** * @brief Get the default task size used for parallel IO operations. @@ -208,7 +227,13 @@ class defaults { * * @param nbytes The default task size in bytes. */ - static void task_size_reset(std::size_t nbytes) { instance()->_task_size = nbytes; } + static void task_size_reset(std::size_t nbytes) + { + if (nbytes == 0) { + throw std::invalid_argument("task size must be a positive integer greater than zero"); + } + instance()->_task_size = nbytes; + } /** * @brief Get the default GDS threshold, which is the minimum size to use GDS (in bytes). @@ -228,6 +253,30 @@ class defaults { * @param nbytes The default GDS threshold size in bytes. */ static void gds_threshold_reset(std::size_t nbytes) { instance()->_gds_threshold = nbytes; } + + /** + * @brief Get the size of the bounce buffer used to stage data in host memory. + * + * Set the value using `kvikio::default::bounce_buffer_size_reset()` or by setting the + * `KVIKIO_BOUNCE_BUFFER_SIZE` environment variable. If not set, the value is 16 MiB. + * + * @return The bounce buffer size in bytes. + */ + [[nodiscard]] static std::size_t bounce_buffer_size() { return instance()->_bounce_buffer_size; } + + /** + * @brief Reset the size of the bounce buffer used to stage data in host memory. + * + * @param nbytes The bounce buffer size in bytes. + */ + static void bounce_buffer_size_reset(std::size_t nbytes) + { + if (nbytes == 0) { + throw std::invalid_argument( + "size of the bounce buffer must be a positive integer greater than zero"); + } + instance()->_bounce_buffer_size = nbytes; + } }; } // namespace kvikio diff --git a/cpp/include/kvikio/posix_io.hpp b/cpp/include/kvikio/posix_io.hpp index cba399a449..9e88a3e265 100644 --- a/cpp/include/kvikio/posix_io.hpp +++ b/cpp/include/kvikio/posix_io.hpp @@ -19,19 +19,15 @@ #include #include #include -#include -#include #include -#include +#include #include #include #include namespace kvikio { -inline constexpr std::size_t posix_bounce_buffer_size = 2 << 23; // 16 MiB - namespace detail { /** @@ -87,80 +83,6 @@ class StreamsByThread { StreamsByThread& operator=(StreamsByThread&& o) = delete; }; -/** - * @brief Singleton class to retain host memory allocations - * - * Call `AllocRetain::get` to get an allocation that will be retained when it - * goes out of scope (RAII). The size of all allocations are `posix_bounce_buffer_size`. - */ -class AllocRetain { - private: - std::stack _free_allocs; - std::mutex _mutex; - - public: - class Alloc { - private: - AllocRetain* _manager; - void* _alloc; - - public: - Alloc(AllocRetain* manager, void* alloc) : _manager(manager), _alloc{alloc} {} - Alloc(const Alloc&) = delete; - Alloc& operator=(Alloc const&) = delete; - Alloc(Alloc&& o) = delete; - Alloc& operator=(Alloc&& o) = delete; - ~Alloc() noexcept { _manager->put(_alloc); } - void* get() noexcept { return _alloc; } - }; - - AllocRetain() = default; - [[nodiscard]] Alloc get() - { - const std::lock_guard lock(_mutex); - // Check if we have an allocation available - if (!_free_allocs.empty()) { - void* ret = _free_allocs.top(); - _free_allocs.pop(); - return Alloc(this, ret); - } - - // If no available allocation, allocate and register a new one - void* alloc{}; - // Allocate page-locked host memory - CUDA_DRIVER_TRY(cudaAPI::instance().MemHostAlloc( - &alloc, posix_bounce_buffer_size, CU_MEMHOSTREGISTER_PORTABLE)); - return Alloc(this, alloc); - } - - void put(void* alloc) - { - const std::lock_guard lock(_mutex); - _free_allocs.push(alloc); - } - - void clear() - { - const std::lock_guard lock(_mutex); - while (!_free_allocs.empty()) { - CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(_free_allocs.top())); - _free_allocs.pop(); - } - } - - static AllocRetain& instance() - { - static AllocRetain _instance; - return _instance; - } - - AllocRetain(const AllocRetain&) = delete; - AllocRetain& operator=(AllocRetain const&) = delete; - AllocRetain(AllocRetain&& o) = delete; - AllocRetain& operator=(AllocRetain&& o) = delete; - ~AllocRetain() noexcept = default; -}; - /** * @brief Read or write host memory to or from disk using POSIX * @@ -230,7 +152,7 @@ std::size_t posix_device_io(int fd, CUdeviceptr devPtr = convert_void2deviceptr(devPtr_base) + devPtr_offset; off_t cur_file_offset = convert_size2off(file_offset); off_t byte_remaining = convert_size2off(size); - const off_t chunk_size2 = convert_size2off(posix_bounce_buffer_size); + const off_t chunk_size2 = convert_size2off(alloc.size()); // Get a stream for the current CUDA context and thread CUstream stream = StreamsByThread::get(); diff --git a/python/kvikio/kvikio/_lib/defaults.pyx b/python/kvikio/kvikio/_lib/defaults.pyx index 6ff1cd5997..f59cad5cb4 100644 --- a/python/kvikio/kvikio/_lib/defaults.pyx +++ b/python/kvikio/kvikio/_lib/defaults.pyx @@ -20,6 +20,9 @@ cdef extern from "" nogil: size_t cpp_gds_threshold "kvikio::defaults::gds_threshold"() except + void cpp_gds_threshold_reset \ "kvikio::defaults::gds_threshold_reset"(size_t nbytes) except + + size_t cpp_bounce_buffer_size "kvikio::defaults::bounce_buffer_size"() except + + void cpp_bounce_buffer_size_reset \ + "kvikio::defaults::bounce_buffer_size_reset"(size_t nbytes) except + def compat_mode() -> bool: @@ -52,3 +55,11 @@ def gds_threshold() -> int: def gds_threshold_reset(nbytes: int) -> None: cpp_gds_threshold_reset(nbytes) + + +def bounce_buffer_size() -> int: + return cpp_bounce_buffer_size() + + +def bounce_buffer_size_reset(nbytes: int) -> None: + cpp_bounce_buffer_size_reset(nbytes) diff --git a/python/kvikio/kvikio/defaults.py b/python/kvikio/kvikio/defaults.py index ce66cc70f4..2aaa2ffab8 100644 --- a/python/kvikio/kvikio/defaults.py +++ b/python/kvikio/kvikio/defaults.py @@ -163,8 +163,8 @@ def gds_threshold() -> int: backend directly. Set the default value using `gds_threshold_reset()` or by setting the - `KVIKIO_TASK_SIZE` environment variable. If not set, the default value - is 1 MiB. + `KVIKIO_GDS_THRESHOLD` environment variable. If not set, the default + value is 1 MiB. Return ------ @@ -200,3 +200,46 @@ def set_gds_threshold(nbytes: int): yield finally: gds_threshold_reset(old_value) + + +def bounce_buffer_size() -> int: + """Get the size of the bounce buffer used to stage data in host memory. + + Set the value using `bounce_buffer_size_reset()` or by setting the + `KVIKIO_BOUNCE_BUFFER_SIZE` environment variable. If not set, the + value is 16 MiB. + + Return + ------ + nbytes : int + The bounce buffer size in bytes. + """ + return kvikio._lib.defaults.bounce_buffer_size() + + +def bounce_buffer_size_reset(nbytes: int) -> None: + """Reset the size of the bounce buffer used to stage data in host memory. + + Parameters + ---------- + nbytes : int + The bounce buffer size in bytes. + """ + kvikio._lib.defaults.bounce_buffer_size_reset(nbytes) + + +@contextlib.contextmanager +def set_bounce_buffer_size(nbytes: int): + """Context for resetting the the size of the bounce buffer. + + Parameters + ---------- + nbytes : int + The bounce buffer size in bytes. + """ + old_value = bounce_buffer_size() + try: + bounce_buffer_size_reset(nbytes) + yield + finally: + bounce_buffer_size_reset(old_value) diff --git a/python/kvikio/tests/test_basic_io.py b/python/kvikio/tests/test_basic_io.py index 5b7d094565..4346af4f0e 100644 --- a/python/kvikio/tests/test_basic_io.py +++ b/python/kvikio/tests/test_basic_io.py @@ -256,3 +256,20 @@ def test_multiple_gpus(tmp_path, xp, gds_threshold): with cupy.cuda.Device(0): assert f.read(a1) == a1.nbytes assert bytes(a0) == bytes(a1) + + +@pytest.mark.parametrize("size", [1, 10, 100, 1000]) +@pytest.mark.parametrize("tasksize", [1, 10, 100, 1000]) +@pytest.mark.parametrize("buffer_size", [1, 10, 100, 1000]) +def test_different_bounce_buffer_sizes(tmp_path, size, tasksize, buffer_size): + """Test different bounce buffer sizes""" + filename = tmp_path / "test-file" + with kvikio.defaults.set_compat_mode(True), kvikio.defaults.set_num_threads(10): + with kvikio.defaults.set_task_size(tasksize): + with kvikio.defaults.set_bounce_buffer_size(buffer_size): + with kvikio.CuFile(filename, "w+") as f: + a = cupy.arange(size) + b = cupy.empty_like(a) + f.write(a) + assert f.read(b) == b.nbytes + cupy.testing.assert_array_equal(a, b) diff --git a/python/kvikio/tests/test_defaults.py b/python/kvikio/tests/test_defaults.py index 27b2d6187a..39892a784d 100644 --- a/python/kvikio/tests/test_defaults.py +++ b/python/kvikio/tests/test_defaults.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. @@ -32,6 +32,11 @@ def test_num_threads(): assert kvikio.defaults.get_num_threads() == 4 assert before == kvikio.defaults.get_num_threads() + with pytest.raises(ValueError, match="positive integer greater than zero"): + kvikio.defaults.num_threads_reset(0) + with pytest.raises(OverflowError, match="negative value"): + kvikio.defaults.num_threads_reset(-1) + def test_task_size(): """Test changing `task_size`""" @@ -42,3 +47,38 @@ def test_task_size(): kvikio.defaults.task_size_reset(4) assert kvikio.defaults.task_size() == 4 assert before == kvikio.defaults.task_size() + + with pytest.raises(ValueError, match="positive integer greater than zero"): + kvikio.defaults.task_size_reset(0) + with pytest.raises(OverflowError, match="negative value"): + kvikio.defaults.task_size_reset(-1) + + +def test_gds_threshold(): + """Test changing `gds_threshold`""" + + before = kvikio.defaults.gds_threshold() + with kvikio.defaults.set_gds_threshold(3): + assert kvikio.defaults.gds_threshold() == 3 + kvikio.defaults.gds_threshold_reset(4) + assert kvikio.defaults.gds_threshold() == 4 + assert before == kvikio.defaults.gds_threshold() + + with pytest.raises(OverflowError, match="negative value"): + kvikio.defaults.gds_threshold_reset(-1) + + +def test_bounce_buffer_size(): + """Test changing `bounce_buffer_size`""" + + before = kvikio.defaults.bounce_buffer_size() + with kvikio.defaults.set_bounce_buffer_size(3): + assert kvikio.defaults.bounce_buffer_size() == 3 + kvikio.defaults.bounce_buffer_size_reset(4) + assert kvikio.defaults.bounce_buffer_size() == 4 + assert before == kvikio.defaults.bounce_buffer_size() + + with pytest.raises(ValueError, match="positive integer greater than zero"): + kvikio.defaults.bounce_buffer_size_reset(0) + with pytest.raises(OverflowError, match="negative value"): + kvikio.defaults.bounce_buffer_size_reset(-1) From 3ce8bcc4da057f3c9fc7855d6297c73a448bc836 Mon Sep 17 00:00:00 2001 From: Kyle Edwards Date: Tue, 27 Aug 2024 12:51:49 -0400 Subject: [PATCH 2/2] Update rapidsai/pre-commit-hooks (#448) This PR updates rapidsai/pre-commit-hooks to the version 0.4.0. Authors: - Kyle Edwards (https://github.com/KyleFromNVIDIA) Approvers: - James Lamb (https://github.com/jameslamb) URL: https://github.com/rapidsai/kvikio/pull/448 --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0d728e0799..0f60770658 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -82,7 +82,7 @@ repos: ^CHANGELOG.md$ ) - repo: https://github.com/rapidsai/pre-commit-hooks - rev: v0.3.1 + rev: v0.4.0 hooks: - id: verify-copyright files: |