Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion cpp/include/kvikio/stream.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2023-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once
Expand Down Expand Up @@ -84,4 +84,21 @@ class StreamFuture {
~StreamFuture() noexcept;
};

/**
 * @brief Registers a CUDA stream with the cuFile subsystem.
 *
 * @param stream CUDA stream on which the async I/O operations are queued
 * @param flags Specifies when the I/O parameters become valid (submission time or execution time)
 * and which I/O parameters are page-aligned. For details, refer to
 * https://docs.nvidia.com/gpudirect-storage/api-reference-guide/index.html#cufilestreamregister
 */
void stream_register(CUstream stream, unsigned flags);

/**
 * @brief Deregisters a CUDA stream from the cuFile subsystem.
 *
 * @param stream CUDA stream on which the async I/O operations are queued
 */
void stream_deregister(CUstream stream);

} // namespace kvikio
9 changes: 8 additions & 1 deletion cpp/src/stream.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down Expand Up @@ -92,4 +92,11 @@ StreamFuture::~StreamFuture() noexcept
}
}

void stream_register(CUstream stream, unsigned flags)
{
  // Forward the request to the cuFile API wrapper.
  // NOTE(review): whatever StreamRegister returns is discarded here -- confirm whether the
  // status should be checked (and turned into an exception) like other cuFile calls.
  auto&& cufile_api = cuFileAPI::instance();
  cufile_api.StreamRegister(stream, flags);
}

void stream_deregister(CUstream stream)
{
  // Forward the request to the cuFile API wrapper.
  // NOTE(review): whatever StreamDeregister returns is discarded here -- confirm whether the
  // status should be checked (and turned into an exception) like other cuFile calls.
  auto&& cufile_api = cuFileAPI::instance();
  cufile_api.StreamDeregister(stream);
}

} // namespace kvikio
5 changes: 4 additions & 1 deletion python/kvikio/kvikio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2021-2026, NVIDIA CORPORATION. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# If libkvikio was installed as a wheel, we must request it to load the library symbols.
Expand All @@ -17,6 +17,7 @@
from kvikio.cufile import CuFile, clear_page_cache, get_page_cache_info
from kvikio.mmap import Mmap
from kvikio.remote_file import RemoteEndpointType, RemoteFile, is_remote_file_available
from kvikio.stream import stream_deregister, stream_register

__all__ = [
"__git_commit__",
Expand All @@ -28,4 +29,6 @@
"is_remote_file_available",
"RemoteEndpointType",
"RemoteFile",
"stream_register",
"stream_deregister",
]
4 changes: 2 additions & 2 deletions python/kvikio/kvikio/_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# =============================================================================
# cmake-format: off
# SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION.
# SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: Apache-2.0
# cmake-format: on
# =============================================================================

# Set the list of Cython files to build, one .so per file
set(cython_modules arr.pyx buffer.pyx defaults.pyx cufile_driver.pyx file_handle.pyx future.pyx
mmap.pyx
mmap.pyx stream.pyx
)

if(KvikIO_REMOTE_SUPPORT)
Expand Down
27 changes: 27 additions & 0 deletions python/kvikio/kvikio/_lib/stream.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# distutils: language = c++
# cython: language_level=3


cdef extern from "cuda.h":
ctypedef void* CUstream


cdef extern from "<kvikio/stream.hpp>" nogil:
void cpp_stream_register "kvikio::stream_register"(CUstream stream, unsigned flags) except +
void cpp_stream_deregister "kvikio::stream_deregister"(CUstream stream) except +


def stream_register(stream: int, flags: int) -> None:
    """Register a CUDA stream with the cuFile subsystem.

    Parameters
    ----------
    stream: int
        Raw CUDA stream handle, given as its integer address value.
    flags: int
        Flags forwarded unchanged to ``kvikio::stream_register``.
    """
    # Convert the Python integer to its numeric value *before* reinterpreting it as a
    # pointer. A direct ``<CUstream>stream`` cast on a Python object yields the address
    # of the Python object itself, not the stream handle it stores.
    cdef CUstream cpp_stream = <CUstream><size_t>stream
    cdef unsigned int cpp_flags = flags
    # The C++ call does not touch Python objects, so the GIL can be released.
    with nogil:
        cpp_stream_register(cpp_stream, cpp_flags)


def stream_deregister(stream: int) -> None:
    """Deregister a CUDA stream from the cuFile subsystem.

    Parameters
    ----------
    stream: int
        Raw CUDA stream handle, given as its integer address value.
    """
    # Convert the Python integer to its numeric value *before* reinterpreting it as a
    # pointer. A direct ``<CUstream>stream`` cast on a Python object yields the address
    # of the Python object itself, not the stream handle it stores.
    cdef CUstream cpp_stream = <CUstream><size_t>stream
    # The C++ call does not touch Python objects, so the GIL can be released.
    with nogil:
        cpp_stream_deregister(cpp_stream)
18 changes: 9 additions & 9 deletions python/kvikio/kvikio/cufile.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import io
Expand Down Expand Up @@ -286,7 +286,7 @@ def write(
def raw_read_async(
self,
buf,
stream,
raw_stream: int,
size: Optional[int] = None,
file_offset: int = 0,
dev_offset: int = 0,
Expand All @@ -300,8 +300,8 @@ def raw_read_async(
----------
buf: buffer-like or array-like
Device buffer to read into.
stream: cuda.Stream
CUDA stream to perform the read operation asynchronously.
raw_stream: int
Raw CUDA stream to perform the read operation asynchronously.
size: int, optional
Size in bytes to read.
file_offset: int, optional
Expand All @@ -316,12 +316,12 @@ def raw_read_async(
`IOFutureStream.check_bytes_done()`, which will synchronize the associated
stream and return the number of bytes read.
"""
return self._handle.read_async(buf, size, file_offset, dev_offset, stream)
return self._handle.read_async(buf, size, file_offset, dev_offset, raw_stream)

def raw_write_async(
self,
buf,
stream,
raw_stream: int,
size: Optional[int] = None,
file_offset: int = 0,
dev_offset: int = 0,
Expand All @@ -335,8 +335,8 @@ def raw_write_async(
----------
buf: buffer-like or array-like
Device buffer to write to.
stream: cuda.Stream
CUDA stream to perform the write operation asynchronously.
raw_stream: int
Raw CUDA stream to perform the write operation asynchronously.
size: int, optional
Size in bytes to write.
file_offset: int, optional
Expand All @@ -351,7 +351,7 @@ def raw_write_async(
`IOFutureStream.check_bytes_done()`, which will synchronize the associated
stream and return the number of bytes written.
"""
return self._handle.write_async(buf, size, file_offset, dev_offset, stream)
return self._handle.write_async(buf, size, file_offset, dev_offset, raw_stream)

def raw_read(
self,
Expand Down
30 changes: 30 additions & 0 deletions python/kvikio/kvikio/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from kvikio._lib import stream as stream_module # type: ignore


def stream_register(raw_stream: int, flags: int) -> None:
    """Register a CUDA stream with the cuFile subsystem.

    Parameters
    ----------
    raw_stream: int
        Raw CUDA stream which queues the async I/O operations.
    flags: int
        Specifies when the I/O parameters become valid (submission time or execution
        time) and which I/O parameters are page-aligned. For details, refer to
        https://docs.nvidia.com/gpudirect-storage/api-reference-guide/index.html#cufilestreamregister
    """
    # Thin wrapper: delegate to the compiled extension module.
    register = stream_module.stream_register
    register(raw_stream, flags)


def stream_deregister(raw_stream: int) -> None:
    """Deregister a CUDA stream from the cuFile subsystem.

    Parameters
    ----------
    raw_stream: int
        Raw CUDA stream which queues the async I/O operations.
    """
    # Thin wrapper: delegate to the compiled extension module.
    deregister = stream_module.stream_deregister
    deregister(raw_stream)
27 changes: 26 additions & 1 deletion python/kvikio/tests/test_async_io.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import os

import cupy
import pytest
import utils

import kvikio
import kvikio.defaults
Expand Down Expand Up @@ -60,3 +61,27 @@ def test_read_write(tmp_path, size):
)
cupy.testing.assert_array_equal(a, b)
cupy.testing.assert_array_equal(a, c)


@pytest.mark.cufile
def test_stream_register_deregister(tmp_path):
    """Test the Python API for cuFile stream register and deregister.

    For every flag combination the stream is registered, used for one asynchronous
    write/read round trip, and then deregistered again.
    """
    filename = tmp_path / "test-file"
    stream = cupy.cuda.Stream()
    flags_list = [0x00, 0x01, 0x02, 0x04, 0x08, 0x0F]

    # The reference data does not depend on the flags, so build it only once.
    ref = utils.arange_page_aligned(1024 * 1024)

    for flags in flags_list:
        kvikio.stream_register(stream.ptr, flags)
        try:
            with kvikio.CuFile(filename, "w") as f:
                future_stream = f.raw_write_async(ref, stream.ptr)
                assert future_stream.check_bytes_done() == ref.nbytes

            dev_buf = utils.empty_page_aligned(ref.shape)
            with kvikio.CuFile(filename, "r") as f:
                future_stream = f.raw_read_async(dev_buf, stream.ptr)
                assert future_stream.check_bytes_done() == ref.nbytes
                cupy.testing.assert_array_equal(dev_buf, ref)
        finally:
            # Always deregister, so a failed assertion cannot leave the stream
            # registered.
            kvikio.stream_deregister(stream.ptr)
67 changes: 66 additions & 1 deletion python/kvikio/tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import math
import socket
from typing import Any

import cupy
import numpy.typing as npt


def localhost() -> str:
Expand All @@ -13,3 +18,63 @@ def find_free_port(host: str = localhost()) -> int:
s.bind((host, 0))
_, port = s.getsockname()
return port


def empty_page_aligned(
    shape: int | tuple[int, ...],
    dtype: npt.DTypeLike = cupy.float64,
    page_size: int = 4096,
) -> cupy.ndarray:
    """Allocate an uninitialized page-aligned CuPy array.

    Parameters
    ----------
    shape: int or tuple of ints
        Shape of the array. Can be an integer for 1D arrays or a tuple of integers
        for multi-dimensional arrays.
    dtype: npt.DTypeLike
        Data type of the array elements. Defaults to ``cupy.float64``.
    page_size: int
        Page size in bytes. Must be positive. Defaults to 4096 (4KB).

    Returns
    -------
    An uninitialized CuPy array whose data pointer is a multiple of ``page_size``.

    Raises
    ------
    ValueError
        If ``page_size`` is not positive.
    """
    if page_size < 1:
        raise ValueError(f"page_size must be a positive integer, got {page_size}")

    resolved_dtype: cupy.dtype[Any] = cupy.dtype(dtype)
    if isinstance(shape, int):
        shape = (shape,)

    size = math.prod(shape) * resolved_dtype.itemsize

    # Over-allocate by up to one page so that an aligned window of ``size`` bytes
    # always fits inside the backing allocation.
    backing_mem = cupy.cuda.Memory(size + page_size - 1)

    # Number of bytes to skip to reach the next multiple of ``page_size``. Modular
    # arithmetic (instead of a bit mask) stays correct when ``page_size`` is not a
    # power of two.
    offset = -backing_mem.ptr % page_size
    aligned_memptr = cupy.cuda.MemoryPointer(backing_mem, offset)

    return cupy.ndarray(shape, dtype=resolved_dtype, memptr=aligned_memptr)


def arange_page_aligned(
    stop: int, dtype: npt.DTypeLike = cupy.float64, page_size: int = 4096
) -> cupy.ndarray:
    """Create a page-aligned CuPy array holding the values ``0, 1, ..., stop - 1``.

    Parameters
    ----------
    stop: int
        Number of elements; ``stop`` itself is not included in the values.
    dtype: npt.DTypeLike
        Data type of the array elements. Defaults to ``cupy.float64``.
    page_size: int
        Page size in bytes. Defaults to 4096 (4KB).

    Returns
    -------
    A CuPy array with values ``[0, 1, ..., stop-1]`` and page-aligned underlying memory.
    """
    # Allocate the aligned destination first, then fill it from a regular
    # ``cupy.arange`` result.
    result = empty_page_aligned(stop, dtype=dtype, page_size=page_size)
    cupy.copyto(result, cupy.arange(stop, dtype=dtype))
    return result