From 35424d220cbb1382015c102f272fa9c168e071e3 Mon Sep 17 00:00:00 2001 From: tell-rebanta <75395987+tell-rebanta@users.noreply.github.com> Date: Mon, 21 Aug 2023 23:15:43 -0700 Subject: [PATCH] Initial changes to support cufile stream I/O. (#259) Authors: - https://github.com/tell-rebanta - Mads R. B. Kristensen (https://github.com/madsbk) Approvers: - Mads R. B. Kristensen (https://github.com/madsbk) URL: https://github.com/rapidsai/kvikio/pull/259 --- cpp/CMakeLists.txt | 10 +++ cpp/examples/basic_io.cpp | 116 +++++++++++++++++++---------- cpp/include/kvikio/file_handle.hpp | 81 ++++++++++++++++++++ cpp/include/kvikio/shim/cufile.hpp | 42 +++++++++++ 4 files changed, 210 insertions(+), 39 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index ad44b65597..149f21b981 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -71,6 +71,13 @@ else() set(cuFile_BATCH_API_FOUND TRUE) endif() message(STATUS "Found cuFile's Batch API: ${cuFile_BATCH_API_FOUND}") + string(FIND "${CUFILE_H_STR}" "cuFileGetVersion" cuFileGetVersion_location) + if(cuFileGetVersion_location EQUAL "-1") + set(cuFile_STREAM_API_FOUND FALSE) + else() + set(cuFile_STREAM_API_FOUND TRUE) + endif() + message(STATUS "Found cuFile's Stream API: ${cuFile_STREAM_API_FOUND}") endif() # library targets @@ -91,6 +98,9 @@ if(cuFile_FOUND) if(cuFile_BATCH_API_FOUND) target_compile_definitions(kvikio INTERFACE KVIKIO_CUFILE_BATCH_API_FOUND) endif() + if(cuFile_STREAM_API_FOUND) + target_compile_definitions(kvikio INTERFACE KVIKIO_CUFILE_STREAM_API_FOUND) + endif() endif() target_link_libraries(kvikio INTERFACE ${CMAKE_DL_LIBS}) target_compile_features(kvikio INTERFACE cxx_std_17) diff --git a/cpp/examples/basic_io.cpp b/cpp/examples/basic_io.cpp index e1fb550fc6..ec36ce32c6 100644 --- a/cpp/examples/basic_io.cpp +++ b/cpp/examples/basic_io.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include using namespace std; @@ -34,11 +35,13 @@ void check(bool condition) } } -constexpr int NELEM = 1000; // Number of elements used throughout the test -constexpr int SIZE = NELEM * sizeof(int); // Size of the memory allocations (in bytes) +constexpr int NELEM = 1024; // Number of elements used throughout the test +constexpr int SIZE = NELEM * sizeof(int); // Size of the memory allocations (in bytes) +constexpr int LARGE_SIZE = 8 * SIZE; // LARGE SIZE to test partial submit (in bytes) int main() { + std::size_t io_size = SIZE; check(cudaSetDevice(0) == cudaSuccess); cout << "KvikIO defaults: " << endl; @@ -152,44 +155,79 @@ int main() constexpr int batchsize = SIZE / num_ops_in_batch; kvikio::DriverProperties props; check(num_ops_in_batch < props.get_max_batch_io_size()); - - // We open the file as usual. - kvikio::FileHandle f("/tmp/test-file", "r"); - - // Then we create a batch - auto batch = kvikio::BatchHandle(num_ops_in_batch); - - // And submit 4 operations each with its own offset - std::vector ops; - for (int i = 0; i < num_ops_in_batch; ++i) { - ops.push_back(kvikio::BatchOp{.file_handle = f, - .devPtr_base = b_dev, - .file_offset = i * batchsize, - .devPtr_offset = i * batchsize, - .size = batchsize, - .opcode = CUFILE_READ}); - } - batch.submit(ops); - - // Finally, we wait on all 4 operations to be finished and check the result - auto statuses = batch.status(num_ops_in_batch, num_ops_in_batch); - check(statuses.size() == num_ops_in_batch); - size_t total_read = 0; - for (auto status : statuses) { - check(status.status == CUFILE_COMPLETE); - check(status.ret == batchsize); - total_read += status.ret; - } - check(cudaMemcpy(b, b_dev, SIZE, cudaMemcpyDeviceToHost) == cudaSuccess); - for (int i = 0; i < NELEM; ++i) { - check(a[i] == b[i]); + { + // We open the file as usual. + kvikio::FileHandle f("/tmp/test-file", "r"); + + // Then we create a batch + auto batch = kvikio::BatchHandle(num_ops_in_batch); + + // And submit 4 operations each with its own offset + std::vector ops; + for (int i = 0; i < num_ops_in_batch; ++i) { + ops.push_back(kvikio::BatchOp{.file_handle = f, + .devPtr_base = b_dev, + .file_offset = i * batchsize, + .devPtr_offset = i * batchsize, + .size = batchsize, + .opcode = CUFILE_READ}); + } + batch.submit(ops); + + // Finally, we wait on all 4 operations to be finished and check the result + auto statuses = batch.status(num_ops_in_batch, num_ops_in_batch); + check(statuses.size() == num_ops_in_batch); + size_t total_read = 0; + for (auto status : statuses) { + check(status.status == CUFILE_COMPLETE); + check(status.ret == batchsize); + total_read += status.ret; + } + check(cudaMemcpy(b, b_dev, SIZE, cudaMemcpyDeviceToHost) == cudaSuccess); + for (int i = 0; i < NELEM; ++i) { + check(a[i] == b[i]); + } + cout << "Batch read using 4 operations: " << total_read << endl; + + batch.submit(ops); + batch.cancel(); + statuses = batch.status(num_ops_in_batch, num_ops_in_batch); + check(statuses.empty()); + cout << "Batch canceling of all 4 operations" << endl; } - cout << "Batch read using 4 operations: " << total_read << endl; + } - batch.submit(ops); - batch.cancel(); - statuses = batch.status(num_ops_in_batch, num_ops_in_batch); - check(statuses.empty()); - cout << "Batch canceling of all 4 operations" << endl; + cout << "stream : " << kvikio::is_stream_available() << endl; + if (kvikio::is_stream_available()) { + { + cout << "Performing stream I/O using file handle" << endl; + off_t f_off = 0, d_off = 0; + ssize_t bytes_done; + CUstream stream; + check(cudaStreamCreate(&stream) == cudaSuccess); + kvikio::FileHandle f_handle("/data/test-file", "w+", kvikio::FileHandle::m644, false); + check(cudaMemcpy(a_dev, a, SIZE, cudaMemcpyHostToDevice) == cudaSuccess); + + /* + * For stream based I/Os, buffer registration is not mandatory. However, + * it gives a better performance. + */ + + kvikio::buffer_register(a_dev, SIZE); + f_handle.write_async(a_dev, &io_size, &f_off, &d_off, &bytes_done, stream); + check(cudaStreamSynchronize(stream) == cudaSuccess); + check(bytes_done == SIZE); + cout << "File stream Write : " << bytes_done << endl; + kvikio::buffer_deregister(a_dev); + + /* Read */ + bytes_done = 0; + kvikio::buffer_register(c_dev, SIZE); + f_handle.read_async(c_dev, &io_size, &f_off, &d_off, &bytes_done, stream); + check(cudaStreamSynchronize(stream) == cudaSuccess); + check(bytes_done == SIZE); + cout << "File stream Read : " << bytes_done << endl; + kvikio::buffer_deregister(c_dev); + } } } diff --git a/cpp/include/kvikio/file_handle.hpp b/cpp/include/kvikio/file_handle.hpp index b63d12c5e7..e2e61ffac2 100644 --- a/cpp/include/kvikio/file_handle.hpp +++ b/cpp/include/kvikio/file_handle.hpp @@ -496,6 +496,87 @@ class FileHandle { return parallel_io(op, devPtr_base, size, file_offset, task_size, devPtr_offset); } + /** + * @brief Reads specified bytes from the file into the device memory. + * + * This API reads size bytes asynchronously from the file into device memory writing + * to a specified offset using GDS functionality. The API works correctly for unaligned + * offset and data sizes, although the performance is not on-par with aligned read. + * This is an asynchronous call and will be executed in sequence for the specified stream. + * + * @note For the `devPtr_offset`, if data will be read starting exactly from the + * `devPtr_base` that is registered with `buffer_register`, `devPtr_offset` should + * be set to 0. To read starting from an offset in the registered buffer range, + * the relative offset should be specified in the `devPtr_offset`, and the + * `devPtr_base` must remain set to the base address that was used in the + * `buffer_register` call. + * + * @param devPtr_base Base address of buffer in device memory. For registered buffers, + * `devPtr_base` must remain set to the base address used in the `buffer_register` call. + * @param size Size in bytes to read. + * @param file_offset Offset in the file to read from. + * @param devPtr_offset Offset relative to the `devPtr_base` pointer to read into. + * This parameter should be used only with registered buffers. + * @param bytes_read number of bytes that were successfully read. + * @param stream associated stream for this I/O. + */ + inline void read_async(void* devPtr_base, + std::size_t* size, + off_t* file_offset, + off_t* devPtr_offset, + ssize_t* bytes_read, + CUstream stream) + { +#ifdef KVIKIO_CUFILE_STREAM_API_FOUND + CUFILE_TRY(cuFileAPI::instance().ReadAsync( + _handle, devPtr_base, size, file_offset, devPtr_offset, bytes_read, stream)); + return; +#else + throw CUfileException("KvikIO not compiled with stream support."); +#endif + } + + /** + * @brief Writes specified bytes from the device memory into the file. + * + * This API writes asynchronously the data from the GPU memory to the file at a specified offset + * and size bytes by using GDS functionality. The API works correctly for unaligned + * offset and data sizes, although the performance is not on-par with aligned writes. + * This is an asynchronous call and will be executed in sequence for the specified stream. + * + * @note GDS functionality modified the standard file system metadata in SysMem. + * However, GDS functionality does not take any special responsibility for writing + * that metadata back to permanent storage. The data is not guaranteed to be present + * after a system crash unless the application uses an explicit `fsync(2)` call. If the + * file is opened with an `O_SYNC` flag, the metadata will be written to the disk before + * the call is complete. + * Refer to the note in read for more information about `devPtr_offset`. + * + * @param devPtr_base Base address of buffer in device memory. For registered buffers, + * `devPtr_base` must remain set to the base address used in the `buffer_register` call. + * @param size Size in bytes to write. + * @param file_offset Offset in the file to write at. + * @param devPtr_offset Offset relative to the `devPtr_base` pointer to write from. + * This parameter should be used only with registered buffers. + * @param bytes_written number of bytes that were successfully written. + * @param stream associated stream for this I/O. + */ + inline void write_async(void* devPtr_base, + std::size_t* size, + off_t* file_offset, + off_t* devPtr_offset, + ssize_t* bytes_written, + CUstream stream) + { +#ifdef KVIKIO_CUFILE_STREAM_API_FOUND + CUFILE_TRY(cuFileAPI::instance().WriteAsync( + _handle, devPtr_base, size, file_offset, devPtr_offset, bytes_written, stream)); + return; +#else + throw CUfileException("KvikIO not compiled with stream support."); +#endif + } + /** * @brief Returns `true` if the compatibility mode has been enabled for this file. * diff --git a/cpp/include/kvikio/shim/cufile.hpp b/cpp/include/kvikio/shim/cufile.hpp index 4791b39cd4..08851b0994 100644 --- a/cpp/include/kvikio/shim/cufile.hpp +++ b/cpp/include/kvikio/shim/cufile.hpp @@ -56,6 +56,15 @@ class cuFileAPI { #endif bool batch_available = false; +#ifdef KVIKIO_CUFILE_STREAM_API_FOUND + decltype(cuFileGetVersion)* GetVersion{nullptr}; + decltype(cuFileReadAsync)* ReadAsync{nullptr}; + decltype(cuFileWriteAsync)* WriteAsync{nullptr}; + decltype(cuFileStreamRegister)* StreamRegister{nullptr}; + decltype(cuFileStreamDeregister)* StreamDeregister{nullptr}; +#endif + bool stream_available = false; + private: cuFileAPI() { @@ -103,6 +112,21 @@ class cuFileAPI { } #endif +#ifdef KVIKIO_CUFILE_STREAM_API_FOUND + get_symbol(GetVersion, lib, KVIKIO_STRINGIFY(cuFileGetVersion)); + get_symbol(ReadAsync, lib, KVIKIO_STRINGIFY(cuFileReadAsync)); + get_symbol(WriteAsync, lib, KVIKIO_STRINGIFY(cuFileWriteAsync)); + get_symbol(StreamRegister, lib, KVIKIO_STRINGIFY(cuFileStreamRegister)); + get_symbol(StreamDeregister, lib, KVIKIO_STRINGIFY(cuFileStreamDeregister)); + try { + void* s{}; + get_symbol(s, lib, "cuFileGetVersion"); + stream_available = true; + } catch (const std::runtime_error&) { + stream_available = false; + } +#endif + // cuFile is supposed to open and close the driver automatically but because of a bug in // CUDA 11.8, it sometimes segfault. See . CUfileError_t const error = DriverOpen(); @@ -186,4 +210,22 @@ inline bool is_batch_available() constexpr bool is_batch_available() { return false; } #endif +/** + * @brief Check if cuFile's stream API is available + * + * @return The boolean answer + */ +#ifdef KVIKIO_CUFILE_STREAM_API_FOUND +inline bool is_stream_available() +{ + try { + return cuFileAPI::instance().stream_available; + } catch (const std::runtime_error&) { + return false; + } +} +#else +constexpr bool is_stream_available() { return false; } +#endif + } // namespace kvikio