Skip to content

Commit

Permalink
Initial changes to support cufile stream I/O. (#259)
Browse files Browse the repository at this point in the history
Authors:
  - https://github.com/tell-rebanta
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: #259
  • Loading branch information
tell-rebanta authored Aug 22, 2023
1 parent 55829bf commit 35424d2
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 39 deletions.
10 changes: 10 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ else()
set(cuFile_BATCH_API_FOUND TRUE)
endif()
message(STATUS "Found cuFile's Batch API: ${cuFile_BATCH_API_FOUND}")
string(FIND "${CUFILE_H_STR}" "cuFileGetVersion" cuFileGetVersion_location)
if(cuFileGetVersion_location EQUAL "-1")
set(cuFile_STREAM_API_FOUND FALSE)
else()
set(cuFile_STREAM_API_FOUND TRUE)
endif()
message(STATUS "Found cuFile's Stream API: ${cuFile_STREAM_API_FOUND}")
endif()

# library targets
Expand All @@ -91,6 +98,9 @@ if(cuFile_FOUND)
if(cuFile_BATCH_API_FOUND)
target_compile_definitions(kvikio INTERFACE KVIKIO_CUFILE_BATCH_API_FOUND)
endif()
if(cuFile_STREAM_API_FOUND)
target_compile_definitions(kvikio INTERFACE KVIKIO_CUFILE_STREAM_API_FOUND)
endif()
endif()
target_link_libraries(kvikio INTERFACE ${CMAKE_DL_LIBS})
target_compile_features(kvikio INTERFACE cxx_std_17)
Expand Down
116 changes: 77 additions & 39 deletions cpp/examples/basic_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <kvikio/buffer.hpp>
#include <kvikio/defaults.hpp>
#include <kvikio/driver.hpp>
#include <kvikio/error.hpp>
#include <kvikio/file_handle.hpp>

using namespace std;
Expand All @@ -34,11 +35,13 @@ void check(bool condition)
}
}

constexpr int NELEM = 1000; // Number of elements used throughout the test
constexpr int SIZE = NELEM * sizeof(int); // Size of the memory allocations (in bytes)
constexpr int NELEM = 1024; // Number of elements used throughout the test
constexpr int SIZE = NELEM * sizeof(int); // Size of the memory allocations (in bytes)
constexpr int LARGE_SIZE = 8 * SIZE; // LARGE SIZE to test partial submit (in bytes)

int main()
{
std::size_t io_size = SIZE;
check(cudaSetDevice(0) == cudaSuccess);

cout << "KvikIO defaults: " << endl;
Expand Down Expand Up @@ -152,44 +155,79 @@ int main()
constexpr int batchsize = SIZE / num_ops_in_batch;
kvikio::DriverProperties props;
check(num_ops_in_batch < props.get_max_batch_io_size());

// We open the file as usual.
kvikio::FileHandle f("/tmp/test-file", "r");

// Then we create a batch
auto batch = kvikio::BatchHandle(num_ops_in_batch);

// And submit 4 operations each with its own offset
std::vector<kvikio::BatchOp> ops;
for (int i = 0; i < num_ops_in_batch; ++i) {
ops.push_back(kvikio::BatchOp{.file_handle = f,
.devPtr_base = b_dev,
.file_offset = i * batchsize,
.devPtr_offset = i * batchsize,
.size = batchsize,
.opcode = CUFILE_READ});
}
batch.submit(ops);

// Finally, we wait on all 4 operations to be finished and check the result
auto statuses = batch.status(num_ops_in_batch, num_ops_in_batch);
check(statuses.size() == num_ops_in_batch);
size_t total_read = 0;
for (auto status : statuses) {
check(status.status == CUFILE_COMPLETE);
check(status.ret == batchsize);
total_read += status.ret;
}
check(cudaMemcpy(b, b_dev, SIZE, cudaMemcpyDeviceToHost) == cudaSuccess);
for (int i = 0; i < NELEM; ++i) {
check(a[i] == b[i]);
{
// We open the file as usual.
kvikio::FileHandle f("/tmp/test-file", "r");

// Then we create a batch
auto batch = kvikio::BatchHandle(num_ops_in_batch);

// And submit 4 operations each with its own offset
std::vector<kvikio::BatchOp> ops;
for (int i = 0; i < num_ops_in_batch; ++i) {
ops.push_back(kvikio::BatchOp{.file_handle = f,
.devPtr_base = b_dev,
.file_offset = i * batchsize,
.devPtr_offset = i * batchsize,
.size = batchsize,
.opcode = CUFILE_READ});
}
batch.submit(ops);

// Finally, we wait on all 4 operations to be finished and check the result
auto statuses = batch.status(num_ops_in_batch, num_ops_in_batch);
check(statuses.size() == num_ops_in_batch);
size_t total_read = 0;
for (auto status : statuses) {
check(status.status == CUFILE_COMPLETE);
check(status.ret == batchsize);
total_read += status.ret;
}
check(cudaMemcpy(b, b_dev, SIZE, cudaMemcpyDeviceToHost) == cudaSuccess);
for (int i = 0; i < NELEM; ++i) {
check(a[i] == b[i]);
}
cout << "Batch read using 4 operations: " << total_read << endl;

batch.submit(ops);
batch.cancel();
statuses = batch.status(num_ops_in_batch, num_ops_in_batch);
check(statuses.empty());
cout << "Batch canceling of all 4 operations" << endl;
}
cout << "Batch read using 4 operations: " << total_read << endl;
}

batch.submit(ops);
batch.cancel();
statuses = batch.status(num_ops_in_batch, num_ops_in_batch);
check(statuses.empty());
cout << "Batch canceling of all 4 operations" << endl;
cout << "stream : " << kvikio::is_stream_available() << endl;
if (kvikio::is_stream_available()) {
{
cout << "Performing stream I/O using file handle" << endl;
off_t f_off = 0, d_off = 0;
ssize_t bytes_done;
CUstream stream;
check(cudaStreamCreate(&stream) == cudaSuccess);
kvikio::FileHandle f_handle("/data/test-file", "w+", kvikio::FileHandle::m644, false);
check(cudaMemcpy(a_dev, a, SIZE, cudaMemcpyHostToDevice) == cudaSuccess);

/*
* For stream based I/Os, buffer registration is not mandatory. However,
* it gives a better performance.
*/

kvikio::buffer_register(a_dev, SIZE);
f_handle.write_async(a_dev, &io_size, &f_off, &d_off, &bytes_done, stream);
check(cudaStreamSynchronize(stream) == cudaSuccess);
check(bytes_done == SIZE);
cout << "File stream Write : " << bytes_done << endl;
kvikio::buffer_deregister(a_dev);

/* Read */
bytes_done = 0;
kvikio::buffer_register(c_dev, SIZE);
f_handle.read_async(c_dev, &io_size, &f_off, &d_off, &bytes_done, stream);
check(cudaStreamSynchronize(stream) == cudaSuccess);
check(bytes_done == SIZE);
cout << "File stream Read : " << bytes_done << endl;
kvikio::buffer_deregister(c_dev);
}
}
}
81 changes: 81 additions & 0 deletions cpp/include/kvikio/file_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,87 @@ class FileHandle {
return parallel_io(op, devPtr_base, size, file_offset, task_size, devPtr_offset);
}

/**
* @brief Reads specified bytes from the file into the device memory.
*
* This API reads size bytes asynchronously from the file into device memory writing
* to a specified offset using GDS functionality. The API works correctly for unaligned
* offset and data sizes, although the performance is not on-par with aligned read.
* This is an asynchronous call and will be executed in sequence for the specified stream.
*
* @note For the `devPtr_offset`, if data will be read starting exactly from the
* `devPtr_base` that is registered with `buffer_register`, `devPtr_offset` should
* be set to 0. To read starting from an offset in the registered buffer range,
* the relative offset should be specified in the `devPtr_offset`, and the
* `devPtr_base` must remain set to the base address that was used in the
* `buffer_register` call.
*
* @param devPtr_base Base address of buffer in device memory. For registered buffers,
* `devPtr_base` must remain set to the base address used in the `buffer_register` call.
* @param size Size in bytes to read.
* @param file_offset Offset in the file to read from.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to read into.
* This parameter should be used only with registered buffers.
* @param bytes_read number of bytes that were successfully read.
* @param stream associated stream for this I/O.
*/
inline void read_async(void* devPtr_base,
std::size_t* size,
off_t* file_offset,
off_t* devPtr_offset,
ssize_t* bytes_read,
CUstream stream)
{
#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
CUFILE_TRY(cuFileAPI::instance().ReadAsync(
_handle, devPtr_base, size, file_offset, devPtr_offset, bytes_read, stream));
return;
#else
throw CUfileException("KvikIO not compiled with stream support.");
#endif
}

/**
* @brief Writes specified bytes from the device memory into the file.
*
* This API writes asynchronously the data from the GPU memory to the file at a specified offset
* and size bytes by using GDS functionality. The API works correctly for unaligned
* offset and data sizes, although the performance is not on-par with aligned writes.
* This is an asynchronous call and will be executed in sequence for the specified stream.
*
* @note GDS functionality modified the standard file system metadata in SysMem.
* However, GDS functionality does not take any special responsibility for writing
* that metadata back to permanent storage. The data is not guaranteed to be present
* after a system crash unless the application uses an explicit `fsync(2)` call. If the
* file is opened with an `O_SYNC` flag, the metadata will be written to the disk before
* the call is complete.
* Refer to the note in read for more information about `devPtr_offset`.
*
* @param devPtr_base Base address of buffer in device memory. For registered buffers,
* `devPtr_base` must remain set to the base address used in the `buffer_register` call.
* @param size Size in bytes to write.
* @param file_offset Offset in the file to write at.
* @param devPtr_offset Offset relative to the `devPtr_base` pointer to write from.
* This parameter should be used only with registered buffers.
* @param bytes_written number of bytes that were successfully written.
* @param stream associated stream for this I/O.
*/
inline void write_async(void* devPtr_base,
std::size_t* size,
off_t* file_offset,
off_t* devPtr_offset,
ssize_t* bytes_written,
CUstream stream)
{
#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
CUFILE_TRY(cuFileAPI::instance().WriteAsync(
_handle, devPtr_base, size, file_offset, devPtr_offset, bytes_written, stream));
return;
#else
throw CUfileException("KvikIO not compiled with stream support.");
#endif
}

/**
* @brief Returns `true` if the compatibility mode has been enabled for this file.
*
Expand Down
42 changes: 42 additions & 0 deletions cpp/include/kvikio/shim/cufile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ class cuFileAPI {
#endif
bool batch_available = false;

#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
decltype(cuFileGetVersion)* GetVersion{nullptr};
decltype(cuFileReadAsync)* ReadAsync{nullptr};
decltype(cuFileWriteAsync)* WriteAsync{nullptr};
decltype(cuFileStreamRegister)* StreamRegister{nullptr};
decltype(cuFileStreamDeregister)* StreamDeregister{nullptr};
#endif
bool stream_available = false;

private:
cuFileAPI()
{
Expand Down Expand Up @@ -103,6 +112,21 @@ class cuFileAPI {
}
#endif

#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
get_symbol(GetVersion, lib, KVIKIO_STRINGIFY(cuFileGetVersion));
get_symbol(ReadAsync, lib, KVIKIO_STRINGIFY(cuFileReadAsync));
get_symbol(WriteAsync, lib, KVIKIO_STRINGIFY(cuFileWriteAsync));
get_symbol(StreamRegister, lib, KVIKIO_STRINGIFY(cuFileStreamRegister));
get_symbol(StreamDeregister, lib, KVIKIO_STRINGIFY(cuFileStreamDeregister));
try {
void* s{};
get_symbol(s, lib, "cuFileGetVersion");
stream_available = true;
} catch (const std::runtime_error&) {
stream_available = false;
}
#endif

// cuFile is supposed to open and close the driver automatically but because of a bug in
// CUDA 11.8, it sometimes segfault. See <https://github.com/rapidsai/kvikio/issues/159>.
CUfileError_t const error = DriverOpen();
Expand Down Expand Up @@ -186,4 +210,22 @@ inline bool is_batch_available()
constexpr bool is_batch_available() { return false; }
#endif

/**
* @brief Check if cuFile's stream API is available
*
* @return The boolean answer
*/
#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
inline bool is_stream_available()
{
try {
return cuFileAPI::instance().stream_available;
} catch (const std::runtime_error&) {
return false;
}
}
#else
constexpr bool is_stream_available() { return false; }
#endif

} // namespace kvikio

0 comments on commit 35424d2

Please sign in to comment.