From 8676e8d5f24d8829ce24bcb00d0504ad8f843fbe Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Mon, 2 Sep 2024 17:22:25 +0200 Subject: [PATCH] Non-static bounce buffer option (#454) closes #451 closes #452 Also some refactor/clean up Authors: - Mads R. B. Kristensen (https://github.com/madsbk) Approvers: - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/kvikio/pull/454 --- cpp/doxygen/main_page.md | 16 ++++- cpp/include/kvikio/bounce_buffer.hpp | 70 +++++++++++++------ docs/source/runtime_settings.rst | 12 ++++ python/kvikio/kvikio/__init__.py | 11 +-- python/kvikio/kvikio/_lib/buffer.pyx | 10 ++- .../kvikio/benchmarks/single_node_io.py | 17 ++--- python/kvikio/kvikio/buffer.py | 41 +++++++++++ python/kvikio/kvikio/defaults.py | 16 ++--- python/kvikio/tests/test_basic_io.py | 20 ++++++ 9 files changed, 164 insertions(+), 49 deletions(-) create mode 100644 python/kvikio/kvikio/buffer.py diff --git a/cpp/doxygen/main_page.md b/cpp/doxygen/main_page.md index 2b404f835e..1a109a9913 100644 --- a/cpp/doxygen/main_page.md +++ b/cpp/doxygen/main_page.md @@ -85,14 +85,28 @@ Set the environment variable `KVIKIO_COMPAT_MODE` to enable/disable compatibilit - when running in Windows Subsystem for Linux (WSL). - when `/run/udev` isn't readable, which typically happens when running inside a docker image not launched with `--volume /run/udev:/run/udev:ro`. +This setting can also be controlled by `defaults::compat_mode()` and `defaults::compat_mode_reset()`. + + #### Thread Pool (KVIKIO_NTHREADS) KvikIO can use multiple threads for IO automatically. Set the environment variable `KVIKIO_NTHREADS` to the number of threads in the thread pool. If not set, the default value is 1. +This setting can also be controlled by `defaults::thread_pool_nthreads()` and `defaults::thread_pool_nthreads_reset()`. + #### Task Size (KVIKIO_TASK_SIZE) KvikIO splits parallel IO operations into multiple tasks. Set the environment variable `KVIKIO_TASK_SIZE` to the maximum task size (in bytes). If not set, the default value is 4194304 (4 MiB). +This setting can also be controlled by `defaults::task_size()` and `defaults::task_size_reset()`. + #### GDS Threshold (KVIKIO_GDS_THRESHOLD) -In order to improve performance of small IO, `.pread()` and `.pwrite()` implement a shortcut that circumvent the threadpool and use the POSIX backend directly. Set the environment variable `KVIKIO_GDS_THRESHOLD` to the minimum size (in bytes) to use GDS. If not set, the default value is 1048576 (1 MiB). +To improve performance of small IO requests, `.pread()` and `.pwrite()` implement a shortcut that circumvents the threadpool and uses the POSIX backend directly. Set the environment variable `KVIKIO_GDS_THRESHOLD` to the minimum size (in bytes) to use GDS. If not set, the default value is 1048576 (1 MiB). + +This setting can also be controlled by `defaults::gds_threshold()` and `defaults::gds_threshold_reset()`. + +#### Size of the Bounce Buffer (KVIKIO_GDS_THRESHOLD) +KvikIO might have to use intermediate host buffers (one per thread) when copying between files and device memory. Set the environment variable ``KVIKIO_BOUNCE_BUFFER_SIZE`` to the size (in bytes) of these "bounce" buffers. If not set, the default value is 16777216 (16 MiB). + +This setting can also be controlled by `defaults::bounce_buffer_size()` and `defaults::bounce_buffer_size_reset()`. ## Example diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 77913afc5a..8160be3c5c 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -36,7 +36,6 @@ class AllocRetain { // The size of each allocation in `_free_allocs` std::size_t _size{defaults::bounce_buffer_size()}; - public: /** * @brief An host memory allocation */ @@ -44,14 +43,14 @@ class AllocRetain { private: AllocRetain* _manager; void* _alloc; - const std::size_t _size; + std::size_t const _size; public: Alloc(AllocRetain* manager, void* alloc, std::size_t size) : _manager(manager), _alloc{alloc}, _size{size} { } - Alloc(const Alloc&) = delete; + Alloc(Alloc const&) = delete; Alloc& operator=(Alloc const&) = delete; Alloc(Alloc&& o) = delete; Alloc& operator=(Alloc&& o) = delete; @@ -61,29 +60,49 @@ class AllocRetain { }; AllocRetain() = default; - ~AllocRetain() noexcept - { - try { - clear(); - } catch (const CUfileException& e) { - std::cerr << "~AllocRetain(): " << e.what() << std::endl; - } - } - void clear() + // Notice, we do not clear the allocations at destruction thus the allocations leaks + // at exit. We do this because `AllocRetain::instance()` stores the allocations in a + // static stack that are destructed below main, which is not allowed in CUDA: + // + ~AllocRetain() noexcept = default; + + /** + * @brief Free all retained allocations + * + * NB: The `_mutex` must be taken prior to calling this function. + * + * @return The number of bytes cleared + */ + std::size_t _clear() { + std::size_t ret = _free_allocs.size() * _size; while (!_free_allocs.empty()) { CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(_free_allocs.top())); _free_allocs.pop(); } + return ret; } - [[nodiscard]] Alloc get() + /** + * @brief Ensure the sizes of the retained allocations match `defaults::bounce_buffer_size()` + * + * NB: `_mutex` must be taken prior to calling this function. + */ + void _ensure_alloc_size() { - const std::lock_guard lock(_mutex); - if (_size != defaults::bounce_buffer_size()) { - clear(); // the desired allocation size has changed. + auto const bounce_buffer_size = defaults::bounce_buffer_size(); + if (_size != bounce_buffer_size) { + _clear(); + _size = bounce_buffer_size; } + } + + public: + [[nodiscard]] Alloc get() + { + std::lock_guard const lock(_mutex); + _ensure_alloc_size(); // Check if we have an allocation available if (!_free_allocs.empty()) { @@ -101,10 +120,8 @@ class AllocRetain { void put(void* alloc, std::size_t size) { - const std::lock_guard lock(_mutex); - if (_size != defaults::bounce_buffer_size()) { - clear(); // the desired allocation size has changed. - } + std::lock_guard const lock(_mutex); + _ensure_alloc_size(); // If the size of `alloc` matches the sizes of the retained allocations, // it is added to the set of free allocation otherwise it is freed. @@ -115,13 +132,24 @@ class AllocRetain { } } + /** + * @brief Free all retained allocations + * + * @return The number of bytes cleared + */ + std::size_t clear() + { + std::lock_guard const lock(_mutex); + return _clear(); + } + static AllocRetain& instance() { static AllocRetain _instance; return _instance; } - AllocRetain(const AllocRetain&) = delete; + AllocRetain(AllocRetain const&) = delete; AllocRetain& operator=(AllocRetain const&) = delete; AllocRetain(AllocRetain&& o) = delete; AllocRetain& operator=(AllocRetain&& o) = delete; diff --git a/docs/source/runtime_settings.rst b/docs/source/runtime_settings.rst index 2d03eb2f87..631ba0c937 100644 --- a/docs/source/runtime_settings.rst +++ b/docs/source/runtime_settings.rst @@ -10,17 +10,29 @@ Set the environment variable ``KVIKIO_COMPAT_MODE`` to enable/disable compatibil * when running in Windows Subsystem for Linux (WSL). * when ``/run/udev`` isn't readable, which typically happens when running inside a docker image not launched with ``--volume /run/udev:/run/udev:ro``. +This setting can also be controlled by :py:func:`kvikio.defaults.compat_mode`, :py:func:`kvikio.defaults.compat_mode_reset`, and :py:func:`kvikio.defaults.set_compat_mode`. + Thread Pool ``KVIKIO_NTHREADS`` ------------------------------- KvikIO can use multiple threads for IO automatically. Set the environment variable ``KVIKIO_NTHREADS`` to the number of threads in the thread pool. If not set, the default value is 1. +This setting can also be controlled by :py:func:`kvikio.defaults.get_num_threads`, :py:func:`kvikio.defaults.num_threads_reset`, and :py:func:`kvikio.defaults.set_num_threads`. Task Size ``KVIKIO_TASK_SIZE`` ------------------------------ KvikIO splits parallel IO operations into multiple tasks. Set the environment variable ``KVIKIO_TASK_SIZE`` to the maximum task size (in bytes). If not set, the default value is 4194304 (4 MiB). +This setting can also be controlled by :py:func:`kvikio.defaults.task_size`, :py:func:`kvikio.defaults.task_size_reset`, and :py:func:`kvikio.defaults.set_task_size`. GDS Threshold ``KVIKIO_GDS_THRESHOLD`` -------------------------------------- In order to improve performance of small IO, ``.pread()`` and ``.pwrite()`` implement a shortcut that circumvent the threadpool and use the POSIX backend directly. Set the environment variable ``KVIKIO_GDS_THRESHOLD`` to the minimum size (in bytes) to use GDS. If not set, the default value is 1048576 (1 MiB). + +This setting can also be controlled by :py:func:`kvikio.defaults.gds_threshold`, :py:func:`kvikio.defaults.gds_threshold_reset`, and :py:func:`kvikio.defaults.set_gds_threshold`. + +Size of the Bounce Buffer ``KVIKIO_BOUNCE_BUFFER_SIZE`` +------------------------------------------------------- +KvikIO might have to use intermediate host buffers (one per thread) when copying between files and device memory. Set the environment variable ``KVIKIO_BOUNCE_BUFFER_SIZE`` to the size (in bytes) of these "bounce" buffers. If not set, the default value is 16777216 (16 MiB). + +This setting can also be controlled by :py:func:`kvikio.defaults.bounce_buffer_size`, :py:func:`kvikio.defaults.bounce_buffer_size_reset`, and :py:func:`kvikio.defaults.set_bounce_buffer_size`. diff --git a/python/kvikio/kvikio/__init__.py b/python/kvikio/kvikio/__init__.py index d31d308916..883ac9e784 100644 --- a/python/kvikio/kvikio/__init__.py +++ b/python/kvikio/kvikio/__init__.py @@ -1,19 +1,10 @@ # Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. -from kvikio._lib import buffer, driver_properties # type: ignore +from kvikio._lib import driver_properties # type: ignore from kvikio._version import __git_commit__, __version__ from kvikio.cufile import CuFile - -def memory_register(buf) -> None: - return buffer.memory_register(buf) - - -def memory_deregister(buf) -> None: - buffer.memory_deregister(buf) - - # TODO: Wrap nicely, maybe as a dataclass? DriverProperties = driver_properties.DriverProperties diff --git a/python/kvikio/kvikio/_lib/buffer.pyx b/python/kvikio/kvikio/_lib/buffer.pyx index 93d8379eef..3b90f09816 100644 --- a/python/kvikio/kvikio/_lib/buffer.pyx +++ b/python/kvikio/kvikio/_lib/buffer.pyx @@ -8,7 +8,7 @@ from kvikio._lib.arr cimport Array -cdef extern from "" namespace "kvikio" nogil: +cdef extern from "" nogil: void cpp_memory_register "kvikio::memory_register"(const void* devPtr) except + void cpp_memory_deregister "kvikio::memory_deregister"(const void* devPtr) except + @@ -25,3 +25,11 @@ def memory_deregister(buf) -> None: buf = Array(buf) cdef Array arr = buf cpp_memory_deregister(arr.ptr) + + +cdef extern from "" nogil: + size_t cpp_alloc_retain_clear "kvikio::AllocRetain::instance().clear"() except + + + +def bounce_buffer_free() -> int: + return cpp_alloc_retain_clear() diff --git a/python/kvikio/kvikio/benchmarks/single_node_io.py b/python/kvikio/kvikio/benchmarks/single_node_io.py index 4d47a80791..bca29ef90d 100644 --- a/python/kvikio/kvikio/benchmarks/single_node_io.py +++ b/python/kvikio/kvikio/benchmarks/single_node_io.py @@ -14,6 +14,7 @@ from dask.utils import format_bytes, parse_bytes import kvikio +import kvikio.buffer import kvikio.defaults from kvikio.benchmarks.utils import parse_directory, pprint_sys_info @@ -38,7 +39,7 @@ def run_cufile(args): file_path = args.dir / "kvikio-single-file" data = create_data(args.nbytes) if args.pre_register_buffer: - kvikio.memory_register(data) + kvikio.buffer.memory_register(data) # Write f = kvikio.CuFile(file_path, flags="w") @@ -57,7 +58,7 @@ def run_cufile(args): assert res == args.nbytes, f"IO mismatch, expected {args.nbytes} got {res}" if args.pre_register_buffer: - kvikio.memory_deregister(data) + kvikio.buffer.memory_deregister(data) return read_time, write_time @@ -73,7 +74,7 @@ def run_cufile_multiple_files_multiple_arrays(args): arrays = [create_data(chunksize) for _ in range(args.nthreads)] if args.pre_register_buffer: for array in arrays: - kvikio.memory_register(array) + kvikio.buffer.memory_register(array) # Write files = [kvikio.CuFile(file_path % i, flags="w") for i in range(args.nthreads)] @@ -95,7 +96,7 @@ def run_cufile_multiple_files_multiple_arrays(args): if args.pre_register_buffer: for array in arrays: - kvikio.memory_deregister(array) + kvikio.buffer.memory_deregister(array) return read_time, write_time @@ -108,7 +109,7 @@ def run_cufile_multiple_files(args): file_path = str(args.dir / "cufile-p-%03d") data = create_data(args.nbytes) if args.pre_register_buffer: - kvikio.memory_register(data) + kvikio.buffer.memory_register(data) # Write files = [kvikio.CuFile(file_path % i, flags="w") for i in range(args.nthreads)] @@ -133,7 +134,7 @@ def run_cufile_multiple_files(args): assert res == args.nbytes, f"IO mismatch, expected {args.nbytes} got {res}" if args.pre_register_buffer: - kvikio.memory_deregister(data) + kvikio.buffer.memory_deregister(data) return read_time, write_time @@ -149,7 +150,7 @@ def run_cufile_multiple_arrays(args): arrays = [create_data(chunksize) for _ in range(args.nthreads)] if args.pre_register_buffer: for array in arrays: - kvikio.memory_register(array) + kvikio.buffer.memory_register(array) # Write f = kvikio.CuFile(file_path, flags="w") @@ -174,7 +175,7 @@ def run_cufile_multiple_arrays(args): if args.pre_register_buffer: for array in arrays: - kvikio.memory_deregister(array) + kvikio.buffer.memory_deregister(array) return read_time, write_time diff --git a/python/kvikio/kvikio/buffer.py b/python/kvikio/kvikio/buffer.py new file mode 100644 index 0000000000..62bbc754b4 --- /dev/null +++ b/python/kvikio/kvikio/buffer.py @@ -0,0 +1,41 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +from kvikio._lib import buffer # type: ignore + + +def memory_register(buf) -> None: + """Register a device memory allocation with cuFile. + + Warning + ------- + This API is intended for usecases where the memory is used as a streaming + buffer that is reused across multiple cuFile IO operations. + + Parameters + ---------- + buf: buffer-like or array-like + Device buffer to register . + """ + return buffer.memory_register(buf) + + +def memory_deregister(buf) -> None: + """Deregister an already registered device memory from cuFile. + + Parameters + ---------- + buf: buffer-like or array-like + Device buffer to deregister . + """ + buffer.memory_deregister(buf) + + +def bounce_buffer_free() -> int: + """Free the host allocations used as bounce buffers. + + Returns + ------- + Number of bytes freed. + """ + return buffer.bounce_buffer_free() diff --git a/python/kvikio/kvikio/defaults.py b/python/kvikio/kvikio/defaults.py index 2aaa2ffab8..a0ff265873 100644 --- a/python/kvikio/kvikio/defaults.py +++ b/python/kvikio/kvikio/defaults.py @@ -23,8 +23,8 @@ def compat_mode() -> bool: - when `/run/udev` isn't readable, which typically happens when running inside a docker image not launched with `--volume /run/udev:/run/udev:ro` - Return - ------ + Returns + ------- bool Whether KvikIO is running in compatibility mode or not. """ @@ -68,8 +68,8 @@ def get_num_threads() -> int: Set the default value using `num_threads_reset()` or by setting the `KVIKIO_NTHREADS` environment variable. If not set, the default value is 1. - Return - ------ + Returns + ------- nthreads: int The number of threads in the current thread pool. """ @@ -119,8 +119,8 @@ def task_size() -> int: the `KVIKIO_TASK_SIZE` environment variable. If not set, the default value is 4 MiB. - Return - ------ + Returns + ------- nbytes: int The default task size in bytes. """ @@ -166,8 +166,8 @@ def gds_threshold() -> int: `KVIKIO_GDS_THRESHOLD` environment variable. If not set, the default value is 1 MiB. - Return - ------ + Returns + ------- nbytes : int The default GDS threshold size in bytes. """ diff --git a/python/kvikio/tests/test_basic_io.py b/python/kvikio/tests/test_basic_io.py index 4346af4f0e..e1e9932e23 100644 --- a/python/kvikio/tests/test_basic_io.py +++ b/python/kvikio/tests/test_basic_io.py @@ -8,6 +8,7 @@ import pytest import kvikio +import kvikio.buffer import kvikio.defaults cupy = pytest.importorskip("cupy") @@ -273,3 +274,22 @@ def test_different_bounce_buffer_sizes(tmp_path, size, tasksize, buffer_size): f.write(a) assert f.read(b) == b.nbytes cupy.testing.assert_array_equal(a, b) + + +def test_bounce_buffer_free(tmp_path): + """Test freeing the bounce buffer allocations""" + filename = tmp_path / "test-file" + kvikio.buffer.bounce_buffer_free() + with kvikio.defaults.set_compat_mode(True), kvikio.defaults.set_num_threads(1): + with kvikio.CuFile(filename, "w") as f: + with kvikio.defaults.set_bounce_buffer_size(1024): + # Notice, since the bounce buffer size is only checked when the buffer + # is used, we populate the bounce buffer in between we clear it. + f.write(cupy.arange(10)) + assert kvikio.buffer.bounce_buffer_free() == 1024 + assert kvikio.buffer.bounce_buffer_free() == 0 + f.write(cupy.arange(10)) + with kvikio.defaults.set_bounce_buffer_size(2048): + f.write(cupy.arange(10)) + assert kvikio.buffer.bounce_buffer_free() == 2048 + assert kvikio.buffer.bounce_buffer_free() == 0