Skip to content

Commit

Permalink
async: fall back to blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk committed Aug 30, 2023
1 parent 63d76a9 commit 8c3d3e8
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 46 deletions.
70 changes: 34 additions & 36 deletions cpp/examples/basic_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ int main()
cout << "Parallel POSIX read (" << kvikio::defaults::thread_pool_nthreads()
<< " threads): " << read << endl;
}

if (kvikio::is_batch_and_stream_available() && !kvikio::defaults::compat_mode()) {
// Here we use the batch API to read "/tmp/test-file" into `b_dev` by
// submitting 4 batch operations.
Expand Down Expand Up @@ -195,41 +194,40 @@ int main()
check(statuses.empty());
cout << "Batch canceling of all 4 operations" << endl;
}

{
cout << "Performing async I/O using file handle" << endl;
off_t f_off{0};
off_t d_off{0};
// Notice, we have to allocate the `bytes_done_p` argument on the heap and set it to 0.
ssize_t* bytes_done_p{};
check(cudaHostAlloc((void**)&bytes_done_p, SIZE, cudaHostAllocDefault) == cudaSuccess);
*bytes_done_p = 0;

// Let's create a new stream and submit a sync write
CUstream stream{};
check(cudaStreamCreate(&stream) == cudaSuccess);
kvikio::FileHandle f_handle("/data/test-file", "w+", kvikio::FileHandle::m644, false);
check(cudaMemcpyAsync(a_dev, a, SIZE, cudaMemcpyHostToDevice, stream) == cudaSuccess);
f_handle.write_async(a_dev, &io_size, &f_off, &d_off, bytes_done_p, stream);

// After synchronizing `stream`, we can read the number of bytes written
check(cudaStreamSynchronize(stream) == cudaSuccess);
// Note, `*bytes_done_p` might be negative, which indicate an IO error thus we
// use `CUFILE_CHECK_STREAM_IO` to check for errors.
CUFILE_CHECK_STREAM_IO(bytes_done_p);
check(*bytes_done_p == SIZE);
cout << "File async write : " << *bytes_done_p << endl;

/* Read */
*bytes_done_p = 0;
f_handle.read_async(c_dev, &io_size, &f_off, &d_off, bytes_done_p, stream);
check(cudaStreamSynchronize(stream) == cudaSuccess);
CUFILE_CHECK_STREAM_IO(bytes_done_p);
check(*bytes_done_p == SIZE);
cout << "File async read : " << *bytes_done_p << endl;
check(cudaFreeHost((void*)bytes_done_p) == cudaSuccess);
}
} else {
cout << "The batch and stream API isn't available, requires CUDA 12.2+" << endl;
cout << "The batch API isn't available, requires CUDA 12.2+" << endl;
}
{
cout << "Performing async I/O using file handle" << endl;
off_t f_off{0};
off_t d_off{0};
// Notice, we have to allocate the `bytes_done_p` argument on the heap and set it to 0.
ssize_t* bytes_done_p{};
check(cudaHostAlloc((void**)&bytes_done_p, SIZE, cudaHostAllocDefault) == cudaSuccess);
*bytes_done_p = 0;

// Let's create a new stream and submit a sync write
CUstream stream{};
check(cudaStreamCreate(&stream) == cudaSuccess);
kvikio::FileHandle f_handle("/tmp/test-file", "w+");
check(cudaMemcpyAsync(a_dev, a, SIZE, cudaMemcpyHostToDevice, stream) == cudaSuccess);
f_handle.write_async(a_dev, &io_size, &f_off, &d_off, bytes_done_p, stream);

// After synchronizing `stream`, we can read the number of bytes written
check(cudaStreamSynchronize(stream) == cudaSuccess);
// Note, `*bytes_done_p` might be negative, which indicate an IO error thus we
// use `CUFILE_CHECK_STREAM_IO` to check for errors.
CUFILE_CHECK_STREAM_IO(bytes_done_p);
check(*bytes_done_p == SIZE);
cout << "File async write : " << *bytes_done_p << endl;

/* Read */
*bytes_done_p = 0;
f_handle.read_async(c_dev, &io_size, &f_off, &d_off, bytes_done_p, stream);
check(cudaStreamSynchronize(stream) == cudaSuccess);
CUFILE_CHECK_STREAM_IO(bytes_done_p);
check(*bytes_done_p == SIZE);
cout << "File async read : " << *bytes_done_p << endl;
check(cudaFreeHost((void*)bytes_done_p) == cudaSuccess);
}
}
33 changes: 23 additions & 10 deletions cpp/include/kvikio/file_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <cstddef>
Expand Down Expand Up @@ -502,6 +503,9 @@ class FileHandle {
* This is an asynchronous version of `.read()`, which will be executed in sequence
* for the specified stream.
*
* When running CUDA v12.1 or older, this function falls back to use `.read()` after
* `stream` has been synchronized.
*
* The arguments have the same meaning as in `.read()` but some of them are deferred. That is,
* the values of `size`, `file_offset` and `devPtr_offset` will not be evaluated until execution
* time. Notice, this behavior can be changed using cuFile's cuFileStreamRegister API.
Expand Down Expand Up @@ -535,12 +539,15 @@ class FileHandle {
CUstream stream)
{
#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
CUFILE_TRY(cuFileAPI::instance().ReadAsync(
_handle, devPtr_base, size, file_offset, devPtr_offset, bytes_read, stream));
return;
#else
throw CUfileException("cuFile's stream API isn't available, please build with CUDA v12.2+.");
if (kvikio::is_batch_and_stream_available() && !kvikio::defaults::compat_mode()) {
CUFILE_TRY(cuFileAPI::instance().ReadAsync(
_handle, devPtr_base, size, file_offset, devPtr_offset, bytes_read, stream));
return;
}
#endif

CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
*bytes_read = static_cast<ssize_t>(read(devPtr_base, *size, *file_offset, *devPtr_offset));
}

/**
Expand All @@ -549,6 +556,9 @@ class FileHandle {
* This is an asynchronous version of `.write()`, which will be executed in sequence
* for the specified stream.
*
* When running CUDA v12.1 or older, this function falls back to use `.read()` after
* `stream` has been synchronized.
*
* The arguments have the same meaning as in `.write()` but some of them are deferred. That is,
* the values of `size`, `file_offset` and `devPtr_offset` will not be evaluated until execution
* time. Notice, this behavior can be changed using cuFile's cuFileStreamRegister API.
Expand Down Expand Up @@ -581,12 +591,15 @@ class FileHandle {
CUstream stream)
{
#ifdef KVIKIO_CUFILE_STREAM_API_FOUND
CUFILE_TRY(cuFileAPI::instance().WriteAsync(
_handle, devPtr_base, size, file_offset, devPtr_offset, bytes_written, stream));
return;
#else
throw CUfileException("cuFile's stream API isn't available, please build with CUDA v12.2+.");
if (kvikio::is_batch_and_stream_available() && !kvikio::defaults::compat_mode()) {
CUFILE_TRY(cuFileAPI::instance().WriteAsync(
_handle, devPtr_base, size, file_offset, devPtr_offset, bytes_written, stream));
return;
}
#endif

CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
*bytes_written = static_cast<ssize_t>(write(devPtr_base, *size, *file_offset, *devPtr_offset));
}

/**
Expand Down
2 changes: 2 additions & 0 deletions cpp/include/kvikio/shim/cuda.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class cudaAPI {
decltype(cuDeviceGet)* DeviceGet{nullptr};
decltype(cuDevicePrimaryCtxRetain)* DevicePrimaryCtxRetain{nullptr};
decltype(cuDevicePrimaryCtxRelease)* DevicePrimaryCtxRelease{nullptr};
decltype(cuStreamSynchronize)* StreamSynchronize{nullptr};

private:
cudaAPI()
Expand All @@ -70,6 +71,7 @@ class cudaAPI {
get_symbol(DeviceGet, lib, KVIKIO_STRINGIFY(cuDeviceGet));
get_symbol(DevicePrimaryCtxRetain, lib, KVIKIO_STRINGIFY(cuDevicePrimaryCtxRetain));
get_symbol(DevicePrimaryCtxRelease, lib, KVIKIO_STRINGIFY(cuDevicePrimaryCtxRelease));
get_symbol(StreamSynchronize, lib, KVIKIO_STRINGIFY(cuStreamSynchronize));
}

public:
Expand Down

0 comments on commit 8c3d3e8

Please sign in to comment.