diff --git a/cpp/include/kvikio/stream.hpp b/cpp/include/kvikio/stream.hpp index 12bef04342..25dcec29dc 100644 --- a/cpp/include/kvikio/stream.hpp +++ b/cpp/include/kvikio/stream.hpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2023-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ #pragma once @@ -84,4 +84,21 @@ class StreamFuture { ~StreamFuture() noexcept; }; +/** + * @brief Registers the CUDA stream to the cuFile subsystem. + * + * @param stream CUDA stream which queues the async I/O operations + * @param flags Specifies when the I/O parameters become valid (submission time or execution time) + * and what I/O parameters are page-aligned. For details, refer to + * https://docs.nvidia.com/gpudirect-storage/api-reference-guide/index.html#cufilestreamregister + */ +void stream_register(CUstream stream, unsigned flags); + +/** + * @brief Deregisters the CUDA stream from the cuFile subsystem. + * + * @param stream CUDA stream which queues the async I/O operations + */ +void stream_deregister(CUstream stream); + } // namespace kvikio diff --git a/cpp/src/stream.cpp b/cpp/src/stream.cpp index 71b7f544f4..9aa33fbe04 100644 --- a/cpp/src/stream.cpp +++ b/cpp/src/stream.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. * SPDX-License-Identifier: Apache-2.0 */ @@ -92,4 +92,11 @@ StreamFuture::~StreamFuture() noexcept } } +void stream_register(CUstream stream, unsigned flags) +{ + cuFileAPI::instance().StreamRegister(stream, flags); +} + +void stream_deregister(CUstream stream) { cuFileAPI::instance().StreamDeregister(stream); } + } // namespace kvikio diff --git a/python/kvikio/kvikio/__init__.py b/python/kvikio/kvikio/__init__.py index 9765101216..ac13c2ce74 100644 --- a/python/kvikio/kvikio/__init__.py +++ b/python/kvikio/kvikio/__init__.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2021-2026, NVIDIA CORPORATION. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # If libkvikio was installed as a wheel, we must request it to load the library symbols. @@ -17,6 +17,7 @@ from kvikio.cufile import CuFile, clear_page_cache, get_page_cache_info from kvikio.mmap import Mmap from kvikio.remote_file import RemoteEndpointType, RemoteFile, is_remote_file_available +from kvikio.stream import stream_deregister, stream_register __all__ = [ "__git_commit__", @@ -28,4 +29,6 @@ "is_remote_file_available", "RemoteEndpointType", "RemoteFile", + "stream_register", + "stream_deregister", ] diff --git a/python/kvikio/kvikio/_lib/CMakeLists.txt b/python/kvikio/kvikio/_lib/CMakeLists.txt index fe640ecfd7..d98cf5b047 100644 --- a/python/kvikio/kvikio/_lib/CMakeLists.txt +++ b/python/kvikio/kvikio/_lib/CMakeLists.txt @@ -1,13 +1,13 @@ # ============================================================================= # cmake-format: off -# SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: Apache-2.0 # cmake-format: on # ============================================================================= # Set the list of Cython files to build, one .so per file set(cython_modules arr.pyx buffer.pyx defaults.pyx cufile_driver.pyx file_handle.pyx future.pyx - mmap.pyx + mmap.pyx stream.pyx ) if(KvikIO_REMOTE_SUPPORT) diff --git a/python/kvikio/kvikio/_lib/stream.pyx b/python/kvikio/kvikio/_lib/stream.pyx new file mode 100644 index 0000000000..f932b363ad --- /dev/null +++ b/python/kvikio/kvikio/_lib/stream.pyx @@ -0,0 +1,27 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +# distutils: language = c++ +# cython: language_level=3 + + +cdef extern from "cuda.h": + ctypedef void* CUstream + + +cdef extern from "" nogil: + void cpp_stream_register "kvikio::stream_register"(CUstream stream, unsigned flags) except + + void cpp_stream_deregister "kvikio::stream_deregister"(CUstream stream) except + + + +def stream_register(stream: int, flags: int) -> None: + cdef CUstream cpp_stream = stream + cdef unsigned int cpp_flags = flags + with nogil: + cpp_stream_register(cpp_stream, cpp_flags) + + +def stream_deregister(stream: int) -> None: + cdef CUstream cpp_stream = stream + with nogil: + cpp_stream_deregister(cpp_stream) diff --git a/python/kvikio/kvikio/cufile.py b/python/kvikio/kvikio/cufile.py index 0aa16695c9..f1be71132b 100644 --- a/python/kvikio/kvikio/cufile.py +++ b/python/kvikio/kvikio/cufile.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION. All rights reserved. # SPDX-License-Identifier: Apache-2.0 import io @@ -286,7 +286,7 @@ def write( def raw_read_async( self, buf, - stream, + raw_stream: int, size: Optional[int] = None, file_offset: int = 0, dev_offset: int = 0, @@ -300,8 +300,8 @@ def raw_read_async( ---------- buf: buffer-like or array-like Device buffer to read into. - stream: cuda.Stream - CUDA stream to perform the read operation asynchronously. + raw_stream: int + Raw CUDA stream to perform the read operation asynchronously. size: int, optional Size in bytes to read. file_offset: int, optional @@ -316,12 +316,12 @@ def raw_read_async( `IOFutureStream.check_bytes_done()`, which will synchronize the associated stream and return the number of bytes read. """ - return self._handle.read_async(buf, size, file_offset, dev_offset, stream) + return self._handle.read_async(buf, size, file_offset, dev_offset, raw_stream) def raw_write_async( self, buf, - stream, + raw_stream: int, size: Optional[int] = None, file_offset: int = 0, dev_offset: int = 0, @@ -335,8 +335,8 @@ def raw_write_async( ---------- buf: buffer-like or array-like Device buffer to write to. - stream: cuda.Stream - CUDA stream to perform the write operation asynchronously. + raw_stream: int + Raw CUDA stream to perform the write operation asynchronously. size: int, optional Size in bytes to write. file_offset: int, optional @@ -351,7 +351,7 @@ def raw_write_async( `IOFutureStream.check_bytes_done()`, which will synchronize the associated stream and return the number of bytes written. """ - return self._handle.write_async(buf, size, file_offset, dev_offset, stream) + return self._handle.write_async(buf, size, file_offset, dev_offset, raw_stream) def raw_read( self, diff --git a/python/kvikio/kvikio/stream.py b/python/kvikio/kvikio/stream.py new file mode 100644 index 0000000000..5a02d238a3 --- /dev/null +++ b/python/kvikio/kvikio/stream.py @@ -0,0 +1,30 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from kvikio._lib import stream as stream_module # type: ignore + + +def stream_register(raw_stream: int, flags: int) -> None: + """Registers the CUDA stream to the cuFile subsystem. + + Parameters + ---------- + raw_stream: int + Raw CUDA stream which queues the async I/O operations + flags: int + Specifies when the I/O parameters become valid (submission time or execution + time) and what I/O parameters are page-aligned. For details, refer to + https://docs.nvidia.com/gpudirect-storage/api-reference-guide/index.html#cufilestreamregister + """ + stream_module.stream_register(raw_stream, flags) + + +def stream_deregister(raw_stream: int) -> None: + """Deregisters the CUDA stream from the cuFile subsystem. + + Parameters + ---------- + raw_stream: int + Raw CUDA stream which queues the async I/O operations + """ + stream_module.stream_deregister(raw_stream) diff --git a/python/kvikio/tests/test_async_io.py b/python/kvikio/tests/test_async_io.py index 388a2019f6..1410f32e77 100644 --- a/python/kvikio/tests/test_async_io.py +++ b/python/kvikio/tests/test_async_io.py @@ -1,10 +1,11 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. All rights reserved. # SPDX-License-Identifier: Apache-2.0 import os import cupy import pytest +import utils import kvikio import kvikio.defaults @@ -60,3 +61,27 @@ def test_read_write(tmp_path, size): ) cupy.testing.assert_array_equal(a, b) cupy.testing.assert_array_equal(a, c) + + +@pytest.mark.cufile +def test_stream_register_deregister(tmp_path): + """Test the Python API for cuFile stream register and deregister""" + filename = tmp_path / "test-file" + stream = cupy.cuda.Stream() + flags_list = [0x00, 0x01, 0x02, 0x04, 0x08, 0x0F] + + for flags in flags_list: + kvikio.stream_register(stream.ptr, flags) + + ref = utils.arange_page_aligned(1024 * 1024) + with kvikio.CuFile(filename, "w") as f: + future_stream = f.raw_write_async(ref, stream.ptr) + assert future_stream.check_bytes_done() == ref.nbytes + + dev_buf = utils.empty_page_aligned(ref.shape) + with kvikio.CuFile(filename, "r") as f: + future_stream = f.raw_read_async(dev_buf, stream.ptr) + assert future_stream.check_bytes_done() == ref.nbytes + assert cupy.array_equal(dev_buf, ref) + + kvikio.stream_deregister(stream.ptr) diff --git a/python/kvikio/tests/utils.py b/python/kvikio/tests/utils.py index 1b1368a421..d60ff6f0cd 100644 --- a/python/kvikio/tests/utils.py +++ b/python/kvikio/tests/utils.py @@ -1,7 +1,12 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +import math import socket +from typing import Any + +import cupy +import numpy.typing as npt def localhost() -> str: @@ -13,3 +18,63 @@ def find_free_port(host: str = localhost()) -> int: s.bind((host, 0)) _, port = s.getsockname() return port + + +def empty_page_aligned( + shape: int | tuple[int, ...], + dtype: npt.DTypeLike = cupy.float64, + page_size: int = 4096, +) -> cupy.ndarray: + """Allocate an uninitialized page-aligned CuPy array. + + Parameters + ---------- + shape: int or tuple of ints + Shape of the array. Can be an integer for 1D arrays or a tuple of integers + for multi-dimensional arrays. + dtype: npt.DTypeLike + Data type of the array elements. Defaults to ``cupy.float64``. + page_size: int + Page size in bytes. Defaults to 4096 (4KB). + + Returns + ------- + An uninitialized CuPy array with page-aligned underlying memory. + """ + resolved_dtype: cupy.dtype[Any] = cupy.dtype(dtype) + if isinstance(shape, int): + shape = (shape,) + + size = math.prod(shape) * resolved_dtype.itemsize + + # Over-allocate for alignment + backing_mem = cupy.cuda.Memory(size + page_size - 1) + aligned_ptr = (backing_mem.ptr + page_size - 1) & ~(page_size - 1) + aligned_memptr = cupy.cuda.MemoryPointer(backing_mem, aligned_ptr - backing_mem.ptr) + + return cupy.ndarray(shape, dtype=dtype, memptr=aligned_memptr) + + +def arange_page_aligned( + stop: int, dtype: npt.DTypeLike = cupy.float64, page_size: int = 4096 +) -> cupy.ndarray: + """Create a page-aligned CuPy array with incremental values from 0 to ``stop - 1``. + + Parameters + ---------- + stop: int + Number of elements. The array will contain values from 0 to ``stop - 1`` + (exclusive upper bound). + dtype: npt.DTypeLike + Data type of the array elements. Defaults to ``cupy.float64``. + page_size: int + Page size in bytes. Defaults to 4096 (4KB). + + Returns + ------- + A CuPy array with values ``[0, 1, ..., stop-1]`` and page-aligned underlying memory. + """ + arr_aligned = empty_page_aligned(stop, dtype=dtype, page_size=page_size) + arr_unaligned = cupy.arange(stop, dtype=dtype) + cupy.copyto(arr_aligned, arr_unaligned) + return arr_aligned