diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 3d61d4a4e1..37d237e0e3 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -147,6 +147,7 @@ set(SOURCES "src/error.cpp" "src/file_handle.cpp" "src/file_utils.cpp" + "src/mmap.cpp" "src/nvtx.cpp" "src/posix_io.cpp" "src/shim/cuda.cpp" diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt index b7aa73dd2d..fb0b680f17 100644 --- a/cpp/examples/CMakeLists.txt +++ b/cpp/examples/CMakeLists.txt @@ -57,3 +57,23 @@ install( DESTINATION ${TEST_INSTALL_PATH} EXCLUDE_FROM_ALL ) + +# Example: mmap_io_host + +add_executable(MMAP_IO_HOST_EXAMPLE mmap_io_host.cpp) +set_target_properties(MMAP_IO_HOST_EXAMPLE PROPERTIES INSTALL_RPATH "\$ORIGIN/../../../lib") +target_link_libraries(MMAP_IO_HOST_EXAMPLE PRIVATE kvikio::kvikio) + +if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(KVIKIO_CXX_FLAGS "-Wall;-Werror;-Wno-unknown-pragmas") + target_compile_options( + MMAP_IO_HOST_EXAMPLE PRIVATE "$<$:${KVIKIO_CXX_FLAGS}>" + ) +endif() + +install( + TARGETS MMAP_IO_HOST_EXAMPLE + COMPONENT testing + DESTINATION ${TEST_INSTALL_PATH} + EXCLUDE_FROM_ALL +) diff --git a/cpp/examples/mmap_io_host.cpp b/cpp/examples/mmap_io_host.cpp new file mode 100644 index 0000000000..ad8bf42416 --- /dev/null +++ b/cpp/examples/mmap_io_host.cpp @@ -0,0 +1,163 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include "kvikio/defaults.hpp" + +std::string parse_cmd(int argc, char* argv[]) { return (argc > 1) ? argv[1] : "/tmp"; } + +class Timer { + public: + void start() { _start = std::chrono::high_resolution_clock::now(); } + double elapsed_time() + { + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration time_elapsed = end - _start; + auto result = time_elapsed.count(); + return result; + } + + private: + std::chrono::time_point _start; +}; + +class IoHostManager { + public: + IoHostManager(std::string const& test_dir = "/tmp", bool clear_page_cache = false) + : _test_filepath(test_dir + "/test-file"), + _clear_page_cache(clear_page_cache), + _data_size(1024ull * 1024ull * 1024ull), + _num_repetition(10) + { + kvikio::defaults::set_num_threads(8); + std::vector v(_data_size, {}); + kvikio::FileHandle file_handle(_test_filepath, "w"); + auto fut = file_handle.pwrite(v.data(), v.size()); + fut.get(); + } + + void use_standard_io_parallel() + { + std::cout << "Standard I/O\n"; + std::vector v(_data_size, {}); + double ave_init_time{0.0}; + double ave_io_time{0.0}; + + for (std::size_t i = 0; i < _num_repetition; ++i) { + if (_clear_page_cache) { kvikio::clear_page_cache(); } + print_page_cache_info(); + + Timer timer; + timer.start(); + kvikio::FileHandle file_handle(_test_filepath, "r"); + ave_init_time += timer.elapsed_time(); + + timer.start(); + auto fut = file_handle.pread(v.data(), _data_size); + fut.get(); + ave_io_time += timer.elapsed_time(); + } + + std::cout << " Average initialization time: " << ave_init_time / _num_repetition << "\n"; + std::cout << " Average I/O time: " << ave_io_time / _num_repetition << "\n"; + } + + void use_mmap_io_seq() + { + std::cout << "Mmap I/O (sequential)\n"; + std::vector v(_data_size, {}); + double ave_init_time{0.0}; + double ave_io_time{0.0}; + + for (std::size_t i = 0; i < _num_repetition; ++i) { + if 
(_clear_page_cache) { kvikio::clear_page_cache(); } + print_page_cache_info(); + + Timer timer; + timer.start(); + kvikio::MmapHandle mmap_handle(_test_filepath, "r"); + ave_init_time += timer.elapsed_time(); + + timer.start(); + mmap_handle.read(v.data(), _data_size); + ave_io_time += timer.elapsed_time(); + } + + std::cout << " Average initialization time: " << ave_init_time / _num_repetition << "\n"; + std::cout << " Average I/O time: " << ave_io_time / _num_repetition << "\n"; + } + + void use_mmap_io_parallel() + { + std::cout << "Mmap I/O (parallel)\n"; + std::vector v(_data_size, {}); + double ave_init_time{0.0}; + double ave_io_time{0.0}; + + for (std::size_t i = 0; i < _num_repetition; ++i) { + if (_clear_page_cache) { kvikio::clear_page_cache(); } + print_page_cache_info(); + + Timer timer; + timer.start(); + kvikio::MmapHandle mmap_handle(_test_filepath, "r"); + ave_init_time += timer.elapsed_time(); + + timer.start(); + auto fut = mmap_handle.pread(v.data(), _data_size); + fut.get(); + ave_io_time += timer.elapsed_time(); + } + + std::cout << " Average initialization time: " << ave_init_time / _num_repetition << "\n"; + std::cout << " Average I/O time: " << ave_io_time / _num_repetition << "\n"; + } + + private: + void print_page_cache_info() + { + auto const [num_pages_in_page_cache, num_pages] = kvikio::get_page_cache_info(_test_filepath); + std::cout << " Page cache residency ratio: " + << static_cast(num_pages_in_page_cache) / static_cast(num_pages) + << "\n"; + } + std::string _test_filepath; + bool _clear_page_cache; + std::size_t _data_size; + std::size_t _num_repetition; +}; + +int main() +{ + // auto test_dir = parse_cmd(argc, argv); + IoHostManager io_host_manager{"/mnt/nvme", false}; + + io_host_manager.use_mmap_io_seq(); + + io_host_manager.use_mmap_io_parallel(); + + io_host_manager.use_standard_io_parallel(); + + return 0; +} diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index 200f248d79..8780963e9c 100644 
--- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -130,6 +130,7 @@ class defaults { std::size_t _http_max_attempts; long _http_timeout; std::vector _http_status_codes; + std::size_t _mmap_task_size; static unsigned int get_num_threads_from_env(); @@ -367,6 +368,10 @@ class defaults { * @param status_codes The HTTP status codes to retry. */ static void set_http_status_codes(std::vector status_codes); + + [[nodiscard]] static std::size_t mmap_task_size(); + + static void set_mmap_task_size(std::size_t nbytes); }; } // namespace kvikio diff --git a/cpp/include/kvikio/file_utils.hpp b/cpp/include/kvikio/file_utils.hpp index 778608a03b..2f03f0cbdc 100644 --- a/cpp/include/kvikio/file_utils.hpp +++ b/cpp/include/kvikio/file_utils.hpp @@ -180,4 +180,27 @@ std::pair get_page_cache_info(std::string const& file_ * @sa `get_page_cache_info(std::string const&)` overload. */ std::pair get_page_cache_info(int fd); + +/** + * @brief Clear the page cache + * + * @param reclaim_dentries_and_inodes Whether to free reclaimable slab objects which include + * dentries and inodes. + * - If `true`, equivalent to executing `/sbin/sysctl vm.drop_caches=3`; + * - If `false`, equivalent to executing `/sbin/sysctl vm.drop_caches=1`. + * @param clear_dirty_pages Whether to trigger the writeback process to clear the dirty pages. If + * `true`, `sync` will be called prior to cache clearing. + * @return Whether the page cache has been successfully cleared + * + * @note This function creates a child process and executes the cache clearing shell command in the + * following order + * - Execute the command without `sudo` prefix. This is for the superuser and also for specially + * configured systems where unprivileged users cannot execute `/usr/bin/sudo` but can execute + * `/sbin/sysctl`. If this step succeeds, the function returns `true` immediately. + * - Execute the command with `sudo` prefix. 
This is for the general case where selective + * unprivileged users have permission to run `/sbin/sysctl` with `sudo` prefix. + * + * @throws kvikio::GenericSystemError if somehow the child process could not be created. + */ +bool clear_page_cache(bool reclaim_dentries_and_inodes = true, bool clear_dirty_pages = true); } // namespace kvikio diff --git a/cpp/include/kvikio/mmap.hpp b/cpp/include/kvikio/mmap.hpp new file mode 100644 index 0000000000..3f04833ad6 --- /dev/null +++ b/cpp/include/kvikio/mmap.hpp @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include +#include + +#include +#include +#include + +namespace kvikio { + +/** + * @brief + * + */ +class MmapHandle { + private: + void* _buf{}; + std::size_t _initial_size{}; + std::size_t _initial_file_offset{}; + std::size_t _file_size{}; + std::size_t _map_offset{}; + std::size_t _map_size{}; + void* _map_addr{}; + bool _initialized{}; + int _map_protection_flags{}; + int _map_core_flags{}; + FileWrapper _file_wrapper{}; + + /** + * @brief + * + * @param size + * @param file_offset + * @return + */ + std::tuple prepare_read(std::size_t size, + std::size_t file_offset); + + public: + /** + * @brief Construct a new Mmap Handle object + * + */ + MmapHandle() noexcept = default; + + /** + * @brief Construct a new Mmap Handle object + * + * @param file_path + * @param flags + * @param initial_size + * @param initial_file_offset + * @param mode + */ + MmapHandle(std::string const& file_path, + std::string const& flags = "r", + std::optional initial_size = std::nullopt, + std::size_t initial_file_offset = 0, + mode_t mode = FileHandle::m644); + + MmapHandle(MmapHandle const&) = delete; + MmapHandle& operator=(MmapHandle const&) = delete; + MmapHandle(MmapHandle&& o) noexcept; + MmapHandle& operator=(MmapHandle&& o) noexcept; + ~MmapHandle() noexcept; + + /** + * @brief + * + * @return std::size_t + */ + std::size_t initial_size() const noexcept; + + /** + * @brief + * + * @return std::size_t + */ + std::size_t initial_file_offset() const noexcept; + + /** + * @brief Get the file size + * + * @return The number of bytes + */ + [[nodiscard]] std::size_t file_size() const; + + /** + * @brief Get the file size + * + * @return The number of bytes + */ + [[nodiscard]] std::size_t nbytes() const; + + /** + * @brief + * + * @return Boolean answer + */ + [[nodiscard]] bool closed() const noexcept; + + /** + * @brief + * + */ + void close() noexcept; + + /** + * @brief + * + * @param size + * @param file_offset + * @return + */ + std::size_t read(void* 
buf, + std::optional size = std::nullopt, + std::size_t file_offset = 0); + + /** + * @brief + * + * @param size + * @param file_offset + * @param mmap_task_size + * @return + */ + std::future pread(void* buf, + std::optional size = std::nullopt, + std::size_t file_offset = 0, + std::size_t mmap_task_size = defaults::mmap_task_size()); +}; +} // namespace kvikio diff --git a/cpp/include/kvikio/utils.hpp b/cpp/include/kvikio/utils.hpp index 7724fe92a6..cd9c729a45 100644 --- a/cpp/include/kvikio/utils.hpp +++ b/cpp/include/kvikio/utils.hpp @@ -191,4 +191,12 @@ bool is_future_done(T const& future) return future.wait_for(std::chrono::seconds(0)) != std::future_status::timeout; } +[[nodiscard]] std::size_t align_up(std::size_t value, std::size_t alignment) noexcept; + +[[nodiscard]] void* align_up(void* addr, std::size_t alignment) noexcept; + +[[nodiscard]] std::size_t align_down(std::size_t value, std::size_t alignment) noexcept; + +[[nodiscard]] void* align_down(void* addr, std::size_t alignment) noexcept; + } // namespace kvikio diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp index 24a4de0898..156ca92027 100644 --- a/cpp/src/defaults.cpp +++ b/cpp/src/defaults.cpp @@ -146,6 +146,13 @@ defaults::defaults() _http_status_codes = getenv_or("KVIKIO_HTTP_STATUS_CODES", std::vector{429, 500, 502, 503, 504}); } + // Determine the default value of `mmap_task_size` + { + ssize_t const env = getenv_or("KVIKIO_MMAP_TASK_SIZE", 4 * 1024 * 1024); + KVIKIO_EXPECT( + env >= 0, "KVIKIO_MMAP_TASK_SIZE has to be a non-negative integer", std::invalid_argument); + _mmap_task_size = env; + } } defaults* defaults::instance() @@ -236,4 +243,7 @@ void defaults::set_http_timeout(long timeout_seconds) instance()->_http_timeout = timeout_seconds; } +std::size_t defaults::mmap_task_size() { return instance()->_mmap_task_size; } + +void defaults::set_mmap_task_size(std::size_t nbytes) { instance()->_mmap_task_size = nbytes; } } // namespace kvikio diff --git a/cpp/src/file_utils.cpp 
b/cpp/src/file_utils.cpp index 3c7951effe..124edf7b86 100644 --- a/cpp/src/file_utils.cpp +++ b/cpp/src/file_utils.cpp @@ -18,7 +18,11 @@ #include #include #include + +#include #include +#include +#include #include #include #include @@ -209,4 +213,43 @@ std::pair get_page_cache_info(int fd) SYSCALL_CHECK(munmap(addr, file_size)); return {num_pages_in_page_cache, num_pages}; } + +bool clear_page_cache(bool reclaim_dentries_and_inodes, bool clear_dirty_pages) +{ + KVIKIO_NVTX_FUNC_RANGE(); + if (clear_dirty_pages) { sync(); } + std::string param = reclaim_dentries_and_inodes ? "3" : "1"; + + auto exec_cmd = [](std::string_view cmd) -> bool { + // Prevent the output from the command from mixing with the original process' output. + fflush(nullptr); + // popen only handles stdout. Switch stderr and stdout to only capture stderr. + auto const redirected_cmd = + std::string{"( "}.append(cmd).append(" 3>&2 2>&1 1>&3) 2>/dev/null"); + std::unique_ptr pipe(popen(redirected_cmd.c_str(), "r"), pclose); + KVIKIO_EXPECT(pipe != nullptr, "popen() failed", GenericSystemError); + + std::array buffer; + std::string error_out; + while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr) { + error_out += buffer.data(); + } + return error_out.empty(); + }; + + std::array cmds{ + // Special case: + // - Unprivileged users who cannot execute `/usr/bin/sudo` but can execute `/sbin/sysctl`, and + // - Superuser + std::string{"/sbin/sysctl vm.drop_caches=" + param}, + // General case: + // - Unprivileged users who can execute `sudo`, and + // - Superuser + std::string{"sudo /sbin/sysctl vm.drop_caches=" + param}}; + + for (auto const& cmd : cmds) { + if (exec_cmd(cmd)) { return true; } + } + return false; +} } // namespace kvikio diff --git a/cpp/src/mmap.cpp b/cpp/src/mmap.cpp new file mode 100644 index 0000000000..8b6a1bc7c1 --- /dev/null +++ b/cpp/src/mmap.cpp @@ -0,0 +1,300 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include "kvikio/file_utils.hpp" + +namespace kvikio { + +namespace detail { + +/** + * @brief Change an address `p` by a signed difference of `v` + * + * @tparam Integer Signed integer type + * @param p An address + * @param v Change of `p` in bytes + * @return A new address as a result of applying `v` on `p` + * + * @note This function exploits UB in C++. + */ +template +void* pointer_add(void* p, Integer v) +{ + static_assert(std::is_integral_v); + return static_cast(p) + v; +} + +/** + * @brief The distance in bytes between pointer `p1` and `p2` + * + * @param p1 The first pointer + * @param p2 The second pointer + * @return Signed result of (`p1` - `p2`). Both pointers are cast to std::byte* before subtraction. + * + * @note This function exploits UB in C++. 
+ */ +std::ptrdiff_t pointer_diff(void* p1, void* p2) +{ + return static_cast(p1) - static_cast(p2); +} +} // namespace detail + +MmapHandle::MmapHandle(std::string const& file_path, + std::string const& flags, + std::optional initial_size, + std::size_t initial_file_offset, + mode_t mode) + : _initial_file_offset(initial_file_offset), + _initialized{true}, + _map_core_flags{MAP_PRIVATE}, + _file_wrapper(file_path, flags, false /* o_direct */, mode) +{ + KVIKIO_NVTX_FUNC_RANGE(); + + _file_size = get_file_size(_file_wrapper.fd()); + if (_file_size == 0) { return; } + + KVIKIO_EXPECT( + _initial_file_offset < _file_size, "Offset is past the end of file", std::out_of_range); + + // An initial size of std::nullopt is a shorthand for "starting from _initial_file_offset to the + // end of file". + _initial_size = + initial_size.has_value() ? initial_size.value() : (_file_size - _initial_file_offset); + + KVIKIO_EXPECT(_initial_size > 0, "Mapped region should not be zero byte", std::invalid_argument); + KVIKIO_EXPECT(_initial_file_offset + _initial_size <= _file_size, + "Mapped region is past the end of file", + std::out_of_range); + + auto const page_size = get_page_size(); + _map_offset = align_down(_initial_file_offset, page_size); + auto const offset_delta = _initial_file_offset - _map_offset; + _map_size = _initial_size + offset_delta; + + switch (flags[0]) { + case 'r': { + _map_protection_flags = PROT_READ; + break; + } + case 'w': { + KVIKIO_FAIL("File-backed mmap write is not supported yet", std::invalid_argument); + } + default: { + KVIKIO_FAIL("Unknown file open flag", std::invalid_argument); + } + } + + _map_addr = mmap( + nullptr, _map_size, _map_protection_flags, _map_core_flags, _file_wrapper.fd(), _map_offset); + SYSCALL_CHECK(_map_addr, "Cannot create memory mapping", MAP_FAILED); + _buf = detail::pointer_add(_map_addr, offset_delta); +} + +MmapHandle::MmapHandle(MmapHandle&& o) noexcept + : _buf{std::exchange(o._buf, {})}, + 
_initial_size{std::exchange(o._initial_size, {})},
+    _initial_file_offset{std::exchange(o._initial_file_offset, {})},
+    _file_size{std::exchange(o._file_size, {})},
+    _map_offset{std::exchange(o._map_offset, {})},
+    _map_size{std::exchange(o._map_size, {})},
+    _map_addr{std::exchange(o._map_addr, {})},
+    _initialized{std::exchange(o._initialized, {})},
+    _map_protection_flags{std::exchange(o._map_protection_flags, {})},
+    _map_core_flags{std::exchange(o._map_core_flags, {})},
+    _file_wrapper{std::exchange(o._file_wrapper, {})}
+{
+}
+
+/**
+ * @brief Move-assign from `o`: release what this handle currently maps, then take over the
+ * mapping, bookkeeping and file wrapper of `o`, leaving `o` value-initialized.
+ *
+ * @param o Handle to move from.
+ * @return Reference to this handle.
+ */
+MmapHandle& MmapHandle::operator=(MmapHandle&& o) noexcept
+{
+  // Self-move must be a no-op: releasing first would destroy the very state being kept.
+  if (this == &o) { return *this; }
+
+  // Unmap the region this handle currently owns. Overwriting `_map_addr`/`_map_size` without
+  // calling munmap() first would leak the existing mapping.
+  close();
+
+  _buf = std::exchange(o._buf, {});
+  _initial_size = std::exchange(o._initial_size, {});
+  _initial_file_offset = std::exchange(o._initial_file_offset, {});
+  _file_size = std::exchange(o._file_size, {});
+  _map_offset = std::exchange(o._map_offset, {});
+  _map_size = std::exchange(o._map_size, {});
+  _map_addr = std::exchange(o._map_addr, {});
+  _initialized = std::exchange(o._initialized, {});
+  _map_protection_flags = std::exchange(o._map_protection_flags, {});
+  _map_core_flags = std::exchange(o._map_core_flags, {});
+  _file_wrapper = std::exchange(o._file_wrapper, {});
+
+  return *this;
+}
+
+MmapHandle::~MmapHandle() noexcept
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  close();
+}
+
+// |--> file start |<--page_size-->|
+// |
+// (0) |...............|...............|...............|...............|............
+//
+// (1) |<--_initial_file_offset-->|<---------------_initial_size--------------->|
+// |--> _buf
+//
+// (2) |<-_map_offset->|<----------------------_map_size----------------------->|
+// |--> _map_addr
+//
+// (3) |<---------------------file_offset--------------------->|<--size-->|
+// |--> _buf
+// |--> start_addr
+// |--> start_aligned_addr
+
+bool MmapHandle::closed() const noexcept { return !_initialized; }
+
+/**
+ * @brief Release the mapping (if any) and reset the handle to its value-initialized state.
+ *
+ * Calling this on an already closed handle is a no-op. Failures reported by munmap() are
+ * swallowed because this function is `noexcept` and is invoked from the destructor.
+ */
+void MmapHandle::close() noexcept
+{
+  KVIKIO_NVTX_FUNC_RANGE();
+  if (closed()) { return; }
+
+  // A handle constructed on an empty file is initialized but never calls mmap(), so `_map_addr`
+  // stays null. Such a handle must still be reset below; otherwise closed() keeps reporting
+  // false after close().
+  if (_map_addr != nullptr) {
+    try {
+      auto ret = munmap(_map_addr, _map_size);
+      SYSCALL_CHECK(ret);
+    } catch (...) {
+      // Best effort: nothing useful can be done about an unmap failure here.
+    }
+  }
+  _buf = {};
+  _initial_size = {};
+  _initial_file_offset = {};
+  _file_size = {};
+  _map_offset = {};
+  _map_size = {};
+  _map_addr = {};
+  _initialized = {};
+  _map_protection_flags = {};
+  _map_core_flags = {};
+  _file_wrapper = {};
+}
+
+std::size_t MmapHandle::initial_size() const noexcept { return _initial_size; }
+
+std::size_t MmapHandle::initial_file_offset() const noexcept { return _initial_file_offset; }
+
+std::size_t MmapHandle::file_size() const
+{
+  if (closed()) { return 0; }
+  return get_file_size(_file_wrapper.fd());
+}
+
+std::size_t MmapHandle::nbytes() const { return file_size(); }
+
+std::size_t MmapHandle::read(void* buf, std::optional size, std::size_t file_offset)
+{
+  KVIKIO_EXPECT(!closed(), "Cannot read from a closed MmapHandle", std::runtime_error);
+
+  // Argument validation
+  KVIKIO_EXPECT(file_offset < _file_size, "Offset is past the end of file", std::out_of_range);
+  auto actual_size = size.has_value() ?
size.value() : _file_size - file_offset; + KVIKIO_EXPECT(actual_size > 0, "Read size must be greater than 0", std::invalid_argument); + KVIKIO_EXPECT(file_offset >= _initial_file_offset && + file_offset + actual_size <= _initial_file_offset + _initial_size, + "Read is out of bound", + std::out_of_range); + + KVIKIO_NVTX_FUNC_RANGE(); + + auto const is_buf_host_mem = is_host_memory(buf); + CUcontext ctx{}; + if (!is_buf_host_mem) { ctx = get_context_from_pointer(buf); } + + auto const src_buf = detail::pointer_add(_buf, file_offset - _initial_file_offset); + + if (is_buf_host_mem) { + std::memcpy(buf, src_buf, actual_size); + } else { + PushAndPopContext c(ctx); + CUstream stream = detail::StreamsByThread::get(); + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( + convert_void2deviceptr(buf), src_buf, actual_size, stream)); + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); + } + return actual_size; +} + +std::future MmapHandle::pread(void* buf, + std::optional size, + std::size_t file_offset, + std::size_t mmap_task_size) +{ + KVIKIO_EXPECT(!closed(), "Cannot read from a closed MmapHandle", std::runtime_error); + + // Argument validation + KVIKIO_EXPECT(file_offset < _file_size, "Offset is past the end of file", std::out_of_range); + auto actual_size = size.has_value() ? 
size.value() : _file_size - file_offset; + KVIKIO_EXPECT(actual_size > 0, "Read size must be greater than 0", std::invalid_argument); + KVIKIO_EXPECT(file_offset >= _initial_file_offset && + file_offset + actual_size <= _initial_file_offset + _initial_size, + "Read is out of bound", + std::out_of_range); + + auto& [nvtx_color, call_idx] = detail::get_next_color_and_call_idx(); + KVIKIO_NVTX_FUNC_RANGE(actual_size, nvtx_color); + + auto const is_buf_host_mem = is_host_memory(buf); + CUcontext ctx{}; + if (!is_buf_host_mem) { ctx = get_context_from_pointer(buf); } + + auto const src_buf = detail::pointer_add(_buf, file_offset - _initial_file_offset); + std::size_t actual_mmap_task_size = + (mmap_task_size == 0) ? std::max(1, actual_size / defaults::num_threads()) + : mmap_task_size; + + auto op = [global_src_buf = src_buf, is_buf_host_mem = is_buf_host_mem, ctx = ctx]( + void* buf, std::size_t size, std::size_t, std::size_t buf_offset) -> std::size_t { + auto const src_buf = detail::pointer_add(global_src_buf, buf_offset); + auto const dst_buf = detail::pointer_add(buf, buf_offset); + + if (is_buf_host_mem) { + std::memcpy(dst_buf, src_buf, size); + } else { + PushAndPopContext c(ctx); + CUstream stream = detail::StreamsByThread::get(); + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync( + convert_void2deviceptr(dst_buf), src_buf, size, stream)); + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); + } + + return size; + }; + + return parallel_io(op, + buf, + actual_size, + file_offset, + actual_mmap_task_size, + 0 /* global_buf_offset */, + call_idx, + nvtx_color); +} + +} // namespace kvikio diff --git a/cpp/src/utils.cpp b/cpp/src/utils.cpp index cb4a51b890..164829a930 100644 --- a/cpp/src/utils.cpp +++ b/cpp/src/utils.cpp @@ -15,6 +15,7 @@ */ #include +#include #include #include #include @@ -180,4 +181,25 @@ std::tuple get_alloc_info(void const* devPtr, C return std::make_tuple(reinterpret_cast(base_ptr), base_size, offset); } +std::size_t 
align_up(std::size_t value, std::size_t alignment) noexcept +{ + return (value + alignment - 1) & ~(alignment - 1); +} + +void* align_up(void* addr, std::size_t alignment) noexcept +{ + auto res = (reinterpret_cast(addr) + alignment - 1) & ~(alignment - 1); + return reinterpret_cast(res); +} + +std::size_t align_down(std::size_t value, std::size_t alignment) noexcept +{ + return value & ~(alignment - 1); +} + +void* align_down(void* addr, std::size_t alignment) noexcept +{ + auto res = reinterpret_cast(addr) & ~(alignment - 1); + return reinterpret_cast(res); +} } // namespace kvikio diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 47be1d9899..a6b8391928 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -76,6 +76,8 @@ kvikio_add_test(NAME DEFAULTS_TEST SOURCES test_defaults.cpp utils/env.cpp) kvikio_add_test(NAME ERROR_TEST SOURCES test_error.cpp) +kvikio_add_test(NAME MMAP_TEST SOURCES test_mmap.cpp) + kvikio_add_test(NAME REMOTE_HANDLE_TEST SOURCES test_remote_handle.cpp utils/env.cpp) rapids_test_install_relocatable(INSTALL_COMPONENT_SET testing DESTINATION bin/tests/libkvikio) diff --git a/cpp/tests/test_mmap.cpp b/cpp/tests/test_mmap.cpp new file mode 100644 index 0000000000..e8cbb85c0a --- /dev/null +++ b/cpp/tests/test_mmap.cpp @@ -0,0 +1,303 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "utils/utils.hpp" + +using ::testing::HasSubstr; +using ::testing::ThrowsMessage; + +class MmapTest : public testing::Test { + protected: + void SetUp() override + { + kvikio::test::TempDir tmp_dir{false}; + _filepath = tmp_dir.path() / "test.bin"; + std::size_t num_elements = 1024ull * 1024ull; + _host_buf = CreateTempFile(_filepath, num_elements); + _dev_buf = kvikio::test::DevBuffer{_host_buf}; + _page_size = kvikio::get_page_size(); + } + + void TearDown() override {} + + template + std::vector CreateTempFile(std::string const& filepath, std::size_t num_elements) + { + std::vector v(num_elements); + std::iota(v.begin(), v.end(), 0); + kvikio::FileHandle f(filepath, "w"); + auto fut = f.pwrite(v.data(), v.size() * sizeof(T)); + fut.get(); + _file_size = f.nbytes(); + return v; + } + + std::filesystem::path _filepath; + std::size_t _file_size; + std::size_t _page_size; + std::vector _host_buf; + kvikio::test::DevBuffer _dev_buf; + + using value_type = decltype(_host_buf)::value_type; +}; + +TEST_F(MmapTest, invalid_file_open_flag) +{ + // Empty file open flag + EXPECT_THAT( + [=] { + { + [[maybe_unused]] auto mmap_handle = kvikio::MmapHandle(_filepath, ""); + } + }, + ThrowsMessage(HasSubstr("Unknown file open flag"))); + + // Invalid file open flag + EXPECT_THAT( + [=] { + { + [[maybe_unused]] auto mmap_handle = kvikio::MmapHandle(_filepath, "z"); + } + }, + ThrowsMessage(HasSubstr("Unknown file open flag"))); +} + +TEST_F(MmapTest, constructor_invalid_range) +{ + // init_size is too large (by 1 char) + EXPECT_THAT([&] { kvikio::MmapHandle(_filepath, "r", _file_size + 1); }, + ThrowsMessage(HasSubstr("Mapped region is past the end of file"))); + + // init_file_offset is too large (by 1 char) + EXPECT_THAT([=] { kvikio::MmapHandle(_filepath, "r", std::nullopt, _file_size); }, + ThrowsMessage(HasSubstr("Offset is past the end of file"))); + + // 
init_size is 0
+  EXPECT_THAT(
+    [=] { kvikio::MmapHandle(_filepath, "r", 0); },
+    ThrowsMessage(HasSubstr("Mapped region should not be zero byte")));
+}
+
+TEST_F(MmapTest, constructor_valid_range)
+{
+  // init_size is exactly equal to file size
+  EXPECT_NO_THROW({ kvikio::MmapHandle(_filepath, "r", _file_size); });
+
+  // init_file_offset is exactly on the last char
+  EXPECT_NO_THROW({
+    kvikio::MmapHandle mmap_handle(_filepath, "r", std::nullopt, _file_size - 1);
+    EXPECT_EQ(mmap_handle.initial_size(), 1);
+  });
+}
+
+TEST_F(MmapTest, read_invalid_range)
+{
+  std::size_t const initial_size{1024};
+  std::size_t const initial_file_offset{512};
+  std::vector<value_type> out_host_buf(_file_size / sizeof(value_type), {});
+
+  // file_offset is too large
+  EXPECT_THAT(
+    [&] {
+      kvikio::MmapHandle mmap_handle(_filepath, "r", initial_size, initial_file_offset);
+      mmap_handle.read(out_host_buf.data(), initial_size, _file_size);
+    },
+    ThrowsMessage(HasSubstr("Offset is past the end of file")));
+
+  // file_offset is too small
+  EXPECT_THAT(
+    [&] {
+      kvikio::MmapHandle mmap_handle(_filepath, "r", initial_size, initial_file_offset);
+      mmap_handle.read(out_host_buf.data(), initial_size, initial_file_offset - 128);
+    },
+    ThrowsMessage(HasSubstr("Read is out of bound")));
+
+  // size is 0
+  EXPECT_THAT(
+    [&] {
+      kvikio::MmapHandle mmap_handle(_filepath, "r", initial_size, initial_file_offset);
+      mmap_handle.read(out_host_buf.data(), 0, initial_file_offset);
+    },
+    ThrowsMessage(HasSubstr("Read size must be greater than 0")));
+
+  // size is too large
+  EXPECT_THAT(
+    [&] {
+      kvikio::MmapHandle mmap_handle(_filepath, "r", initial_size, initial_file_offset);
+      mmap_handle.read(out_host_buf.data(), initial_size + 128, initial_file_offset);
+    },
+    ThrowsMessage(HasSubstr("Read is out of bound")));
+}
+
+TEST_F(MmapTest, read_seq)
+{
+  auto do_test = [&](std::size_t num_elements_to_skip, std::size_t num_elements_to_read) {
+    kvikio::MmapHandle mmap_handle(_filepath, "r");
+    auto const offset = num_elements_to_skip * sizeof(value_type);
+    auto const expected_read_size = num_elements_to_read * sizeof(value_type);
+
+    // host: every element read must equal the source element located past the skipped prefix
+    {
+      std::vector<value_type> out_host_buf(num_elements_to_read, {});
+      auto const read_size = mmap_handle.read(out_host_buf.data(), expected_read_size, offset);
+      for (std::size_t i = 0; i < num_elements_to_read; ++i) {
+        EXPECT_EQ(_host_buf[num_elements_to_skip + i], out_host_buf[i]);
+      }
+      EXPECT_EQ(read_size, expected_read_size);
+    }
+
+    // device: same check after copying the device buffer back to the host
+    {
+      kvikio::test::DevBuffer<value_type> out_device_buf(num_elements_to_read);
+      auto const read_size = mmap_handle.read(out_device_buf.ptr, expected_read_size, offset);
+      auto out_host_buf = out_device_buf.to_vector();
+      for (std::size_t i = 0; i < num_elements_to_read; ++i) {
+        EXPECT_EQ(_host_buf[num_elements_to_skip + i], out_host_buf[i]);
+      }
+      EXPECT_EQ(read_size, expected_read_size);
+    }
+  };
+
+  for (const auto& num_elements_to_read : {10, 9999}) {
+    for (const auto& num_elements_to_skip : {0, 10, 100, 1000, 9999}) {
+      do_test(num_elements_to_skip, num_elements_to_read);
+    }
+  }
+}
+
+TEST_F(MmapTest, read_parallel)
+{
+  auto do_test =
+    [&](std::size_t num_elements_to_skip, std::size_t num_elements_to_read, std::size_t task_size) {
+      kvikio::MmapHandle mmap_handle(_filepath, "r");
+      auto const offset = num_elements_to_skip * sizeof(value_type);
+      auto const expected_read_size = num_elements_to_read * sizeof(value_type);
+
+      // host: every element read must equal the source element located past the skipped prefix
+      {
+        std::vector<value_type> out_host_buf(num_elements_to_read, {});
+        auto fut = mmap_handle.pread(out_host_buf.data(), expected_read_size, offset, task_size);
+        auto const read_size = fut.get();
+        for (std::size_t i = 0; i < num_elements_to_read; ++i) {
+          EXPECT_EQ(_host_buf[num_elements_to_skip + i], out_host_buf[i]);
+        }
+        EXPECT_EQ(read_size, expected_read_size);
+      }
+
+      // device (NOTE(review): task_size is not forwarded to this pread call -- confirm intent)
+      {
+        kvikio::test::DevBuffer<value_type> out_device_buf(num_elements_to_read);
+        auto fut = mmap_handle.pread(out_device_buf.ptr, expected_read_size, offset);
+        auto const read_size = fut.get();
+        auto out_host_buf = out_device_buf.to_vector();
+        for (std::size_t i = 0; i < num_elements_to_read; ++i) {
+          EXPECT_EQ(_host_buf[num_elements_to_skip + i], out_host_buf[i]);
+        }
+        EXPECT_EQ(read_size, expected_read_size);
+      }
+    };
+
+  std::vector<std::size_t> task_sizes{0, 256, 1024, kvikio::defaults::mmap_task_size()};
+  for (const auto& task_size : task_sizes) {
+    for (const auto& num_elements_to_read : {10, 9999}) {
+      for (const auto& num_elements_to_skip : {0, 10, 100, 1000, 9999}) {
+        do_test(num_elements_to_skip, num_elements_to_read, task_size);
+      }
+    }
+  }
+}
+
+TEST_F(MmapTest, read_with_default_arguments)
+{
+  std::size_t num_elements = _file_size / sizeof(value_type);
+  kvikio::MmapHandle mmap_handle(_filepath, "r");
+
+  // host
+  {
+    std::vector<value_type> out_host_buf(num_elements, {});
+
+    {
+      auto const read_size = mmap_handle.read(out_host_buf.data());
+      for (std::size_t i = 0; i < num_elements; ++i) {
+        EXPECT_EQ(_host_buf[i], out_host_buf[i]);
+      }
+      EXPECT_EQ(read_size, _file_size);
+    }
+
+    {
+      auto fut = mmap_handle.pread(out_host_buf.data());
+      auto const read_size = fut.get();
+      for (std::size_t i = 0; i < num_elements; ++i) {
+        EXPECT_EQ(_host_buf[i], out_host_buf[i]);
+      }
+      EXPECT_EQ(read_size, _file_size);
+    }
+  }
+
+  // device
+  {
+    kvikio::test::DevBuffer<value_type> out_device_buf(num_elements);
+
+    {
+      auto const read_size = mmap_handle.read(out_device_buf.ptr);
+      auto out_host_buf = out_device_buf.to_vector();
+      for (std::size_t i = 0; i < num_elements; ++i) {
+        EXPECT_EQ(_host_buf[i], out_host_buf[i]);
+      }
+      EXPECT_EQ(read_size, _file_size);
+    }
+
+    {
+      auto fut = mmap_handle.pread(out_device_buf.ptr);
+      auto const read_size = fut.get();
+      auto out_host_buf = out_device_buf.to_vector();
+      for (std::size_t i = 0; i < num_elements; ++i) {
+        EXPECT_EQ(_host_buf[i], out_host_buf[i]);
+      }
+      EXPECT_EQ(read_size, _file_size);
+    }
+  }
+}
+
+TEST_F(MmapTest, closed_handle)
+{
+  kvikio::MmapHandle mmap_handle(_filepath, "r");
+  mmap_handle.close();
+
+  EXPECT_TRUE(mmap_handle.closed());
+  EXPECT_EQ(mmap_handle.file_size(), 0);
+
+  std::size_t num_elements = _file_size / sizeof(value_type);
+  std::vector<value_type> out_host_buf(num_elements, {});
+
+  EXPECT_THAT([&] { mmap_handle.read(out_host_buf.data()); },
+              ThrowsMessage(HasSubstr("Cannot read from a closed MmapHandle")));
+
+  EXPECT_THAT([&] { mmap_handle.pread(out_host_buf.data()); },
+              ThrowsMessage(HasSubstr("Cannot read from a closed MmapHandle")));
+}
diff --git a/docs/source/api.rst b/docs/source/api.rst
index 5cba4fd8d3..1e19f12bdc 100644
--- a/docs/source/api.rst
+++ b/docs/source/api.rst
@@ -11,6 +11,10 @@ CuFile
 .. autoclass:: IOFuture
     :members:
 
+.. autofunction:: get_page_cache_info
+
+.. autofunction:: clear_page_cache
+
 CuFile driver
 -------------
 .. currentmodule:: kvikio.cufile_driver
diff --git a/python/kvikio/kvikio/__init__.py b/python/kvikio/kvikio/__init__.py
index a1f3c483f6..9208d4e3ce 100644
--- a/python/kvikio/kvikio/__init__.py
+++ b/python/kvikio/kvikio/__init__.py
@@ -14,14 +14,15 @@
 
 from kvikio._lib.defaults import CompatMode  # noqa: F401
 from kvikio._version import __git_commit__, __version__
-from kvikio.cufile import CuFile, get_page_cache_info
+from kvikio.cufile import CuFile, clear_page_cache, get_page_cache_info
 from kvikio.remote_file import RemoteFile, is_remote_file_available
 
 __all__ = [
     "__git_commit__",
     "__version__",
+    "clear_page_cache",
     "CuFile",
     "get_page_cache_info",
-    "RemoteFile",
     "is_remote_file_available",
+    "RemoteFile",
 ]
diff --git a/python/kvikio/kvikio/_lib/file_handle.pyx b/python/kvikio/kvikio/_lib/file_handle.pyx
index 6ac3cc14d8..c0d71f36a7 100644
--- a/python/kvikio/kvikio/_lib/file_handle.pyx
+++ b/python/kvikio/kvikio/_lib/file_handle.pyx
@@ -185,6 +185,10 @@ cdef extern from "" nogil:
     pair[size_t, size_t] cpp_get_page_cache_info_int \
         "kvikio::get_page_cache_info"(int fd) except +
 
+    bool cpp_clear_page_cache "kvikio::clear_page_cache" \
+        (bool reclaim_dentries_and_inodes, bool clear_dirty_pages) \
+        except +
+
 
 def get_page_cache_info(file: Union[os.PathLike, str, int, io.IOBase]) \
         -> tuple[int, int]:
@@ -202,3 +206,7 @@ def get_page_cache_info(file: Union[os.PathLike, str, int, io.IOBase]) \
     else:
         raise ValueError("The type of `file` must be `os.PathLike`, `str`, `int`, "
                          "or `io.IOBase`")
+
+
+def clear_page_cache(reclaim_dentries_and_inodes: bool, clear_dirty_pages: bool):
+    return cpp_clear_page_cache(reclaim_dentries_and_inodes, clear_dirty_pages)
diff --git a/python/kvikio/kvikio/cufile.py b/python/kvikio/kvikio/cufile.py
index e703c703bc..fd1021d8ca 100644
--- a/python/kvikio/kvikio/cufile.py
+++ b/python/kvikio/kvikio/cufile.py
@@ -458,3 +458,27 @@ def get_page_cache_info(
         and the total number of pages.
     """
     return file_handle.get_page_cache_info(file)
+
+
+def clear_page_cache(
+    reclaim_dentries_and_inodes: bool = True, clear_dirty_pages: bool = True
+) -> bool:
+    """Clear the page cache
+
+    Parameters
+    ----------
+    reclaim_dentries_and_inodes: bool, optional
+        Whether to free reclaimable slab objects which include dentries and inodes.
+
+        - If `True`, equivalent to executing `/sbin/sysctl vm.drop_caches=3`;
+        - If `False`, equivalent to executing `/sbin/sysctl vm.drop_caches=1`.
+    clear_dirty_pages: bool, optional
+        Whether to trigger the writeback process to clear the dirty pages. If `True`,
+        `sync` will be called prior to cache dropping.
+
+    Returns
+    -------
+    bool
+        Whether the page cache has been successfully cleared.
+    """
+    return file_handle.clear_page_cache(reclaim_dentries_and_inodes, clear_dirty_pages)