Propagate errors from Parquet reader kernels back to host #14167

Merged: 18 commits, Sep 28, 2023
Changes from 7 commits
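
The PR threads a device-accessible `int32_t* error_code` through the Parquet decode kernels: the kernels store a nonzero code there, and the host reads it back after launch. Below is a minimal host-side sketch of the intended protocol, assuming `rmm::device_scalar`; the `decode_pages_checked` wrapper is hypothetical and the reader's actual call sites are not part of this diff.

```cpp
// Sketch only: shows how a caller might consume the new error_code parameter.
// decode_pages_checked is hypothetical; DecodePageData is declared in
// cpp/src/io/parquet/parquet_gpu.hpp as modified by this PR.
#include <cudf/utilities/error.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_scalar.hpp>

void decode_pages_checked(cudf::detail::hostdevice_vector<gpu::PageInfo>& pages,
                          cudf::detail::hostdevice_vector<gpu::ColumnChunkDesc> const& chunks,
                          size_t num_rows,
                          size_t min_row,
                          int level_type_size,
                          rmm::cuda_stream_view stream)
{
  rmm::device_scalar<int32_t> error_code(0, stream);  // zero means "no error"

  gpu::DecodePageData(
    pages, chunks, num_rows, min_row, level_type_size, error_code.data(), stream);

  // value(stream) copies the scalar back to host, synchronizing the stream first.
  auto const error = error_code.value(stream);
  CUDF_EXPECTS(error == 0, "Parquet page data decode kernel failed");
}
```
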
25 changes: 18 additions & 7 deletions cpp/src/io/parquet/page_data.cu
@@ -430,10 +430,15 @@ static __device__ void gpuOutputGeneric(
* @param chunks List of column chunks
* @param min_row Row index to start reading at
* @param num_rows Maximum number of rows to read
* @param error_code Error code to set if an error is encountered
*/
template <int lvl_buf_size, typename level_t>
__global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
__global__ void __launch_bounds__(decode_block_size)
gpuDecodePageData(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
{
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(16)
@@ -472,7 +477,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(

// skipped_leaf_values will always be 0 for flat hierarchies.
uint32_t skipped_leaf_values = s->page.skipped_leaf_values;
while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
while (s->error == 0 &&
(s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
int target_pos;
int src_pos = s->src_pos;

@@ -596,6 +602,10 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
}
__syncthreads();
}
if (!t and s->error != 0) {
// Thread 0 publishes the block-local error code to the device-scope output.
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.store(s->error, cuda::std::memory_order_relaxed);
}
}

struct mask_tform {
@@ -621,6 +631,7 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
@@ -629,11 +640,11 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page

if (level_type_size == 1) {
gpuDecodePageData<rolling_buf_size, uint8_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodePageData<rolling_buf_size, uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodePageData<rolling_buf_size, uint16_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodePageData<rolling_buf_size, uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

45 changes: 28 additions & 17 deletions cpp/src/io/parquet/page_decode.cuh
@@ -21,6 +21,7 @@

#include <io/utilities/block_utils.cuh>

#include <cuda/atomic>
#include <cuda/std/tuple>

namespace cudf::io::parquet::gpu {
@@ -68,6 +69,12 @@ struct page_state_s {
PageNestingDecodeInfo nesting_decode_cache[max_cacheable_nesting_decode_info];
// points to either nesting_decode_cache above when possible, or to the global source otherwise
PageNestingDecodeInfo* nesting_info;

inline __device__ void set_error_code(int32_t err) volatile
{
cuda::atomic_ref<int32_t, cuda::thread_scope_block> ref{const_cast<int&>(error)};
ref.store(err, cuda::std::memory_order_relaxed);
  }
};

Resolved review thread on set_error_code:

Contributor: Is this atomic necessary? I didn't see any place where anything other than thread 0 (of the block) sets the error code, though that may change in the future. Based on how this is called, I wonder if an atomic OR would be better here, so we can stash multiple error types as individual bits.

Contributor Author (vuule): I made it atomic since we probably don't need to worry about performance when failing, and it seemed like a safe option for future checks as well. About the error code as a mask: Ed is concerned about the limit it would impose on the number of distinct errors. I could be convinced either way; I don't expect the trade-off to be relevant in practice.

Contributor: TBH, the most common error condition is going to be a buffer overrun detected somewhere. We could probably get away without codes at all and have a single error bit; the host code calling the kernel can report which kernel failed. It just comes down to how fine-grained you want the error reporting to be.

Contributor: I could see it either way. It's hard to know which thread failed and why (possibly because some other thread did something wrong), so a set of bits could act as breadcrumbs leading to where things really went wrong. On the other hand, you're a lot more limited in what you can report. I'm fine either way; parallel error reporting is amusing in any case.
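
For illustration, the bitmask alternative discussed in the thread above might look like the sketch below. The PR as written here keeps a single plain integer code; the enum values and helper name are hypothetical, not cudf API.

```cpp
// Sketch of the atomic-OR variant discussed above; not the code this PR merges.
#include <cuda/atomic>

enum error_bits : int32_t {
  DATA_OVERRUN   = 0x1,
  LEVEL_OVERRUN  = 0x2,
  BAD_ENCODING   = 0x4,
  BAD_DICT_WIDTH = 0x8,
};

// fetch_or preserves every reported bit, so errors from different threads
// (or different causes) accumulate instead of the last store winning.
inline __device__ void set_error_bit(int32_t bit, volatile int32_t& error)
{
  cuda::atomic_ref<int32_t, cuda::thread_scope_block> ref{const_cast<int32_t&>(error)};
  ref.fetch_or(bit, cuda::std::memory_order_relaxed);
}
```
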

// buffers only used in the decode kernel. separated from page_state_s to keep
@@ -470,7 +477,7 @@ __device__ void gpuDecodeStream(
int32_t value_count = s->lvl_count[lvl];
int32_t batch_coded_count = 0;

while (value_count < target_count && value_count < num_input_values) {
while (s->error == 0 && value_count < target_count && value_count < num_input_values) {
int batch_len;
if (level_run <= 1) {
// Get a new run symbol from the byte stream
@@ -486,7 +493,10 @@
cur++;
}
}
if (cur > end || level_run <= 1) { s->error = 0x10; }
if (cur > end || level_run <= 1) {
s->set_error_code(0x10);
break;
}
sym_len = (int32_t)(cur - cur_def);
__threadfence_block();
}
@@ -495,7 +505,7 @@
level_run = shuffle(level_run);
cur_def += sym_len;
}
if (s->error) { break; }
if (s->error != 0) { break; }

batch_len = min(num_input_values - value_count, 32);
if (level_run & 1) {
@@ -851,7 +861,7 @@ __device__ void gpuDecodeLevels(page_state_s* s,

constexpr int batch_size = 32;
int cur_leaf_count = target_leaf_count;
while (!s->error && s->nz_count < target_leaf_count &&
while (s->error == 0 && s->nz_count < target_leaf_count &&
s->input_value_count < s->num_input_values) {
if (has_repetition) {
gpuDecodeStream<level_t, rolling_buf_size>(rep, s, cur_leaf_count, t, level_type::REPETITION);
@@ -915,7 +925,7 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s,
}
s->lvl_start[lvl] = cur;

if (cur > end) { s->error = 2; }
if (cur > end) { s->set_error_code(2); }
};

// this is a little redundant. if level_bits == 0, then nothing should be encoded
@@ -940,8 +950,8 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s,
// add back the 4 bytes for the length
len += 4;
} else {
len = 0;
s->error = 2;
len = 0;
s->set_error_code(2);
}
} else if (encoding == Encoding::BIT_PACKED) {
len = (s->page.num_input_values * level_bits + 7) >> 3;
@@ -950,8 +960,8 @@
s->lvl_start[lvl] = cur;
s->abs_lvl_start[lvl] = cur;
} else {
s->error = 3;
len = 0;
len = 0;
s->set_error_code(3);
}

s->abs_lvl_end[lvl] = start + len;
@@ -1093,7 +1103,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
}

if (!t) {
s->error = 0;
s->set_error_code(0);

// IMPORTANT : nested schemas can have 0 rows in a page but still have
// values. The case is:
@@ -1151,7 +1161,7 @@
break;
default: // FIXED_LEN_BYTE_ARRAY:
s->dtype_len = dtype_len_out;
s->error |= (s->dtype_len <= 0);
s->set_error_code(s->dtype_len <= 0);
break;
}
// Special check for downconversions
@@ -1267,7 +1277,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
s->dict_run = 0;
s->dict_val = 0;
s->dict_bits = (cur < end) ? *cur++ : 0;
if (s->dict_bits > 32 || !s->dict_base) { s->error = (10 << 8) | s->dict_bits; }
if (s->dict_bits > 32 || !s->dict_base) { s->set_error_code((10 << 8) | s->dict_bits); }
break;
case Encoding::PLAIN:
s->dict_size = static_cast<int32_t>(end - cur);
@@ -1278,22 +1288,23 @@
// first 4 bytes are length of RLE data
int const len = (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24);
cur += 4;
if (cur + len > end) { s->error = 2; }
if (cur + len > end) { s->set_error_code(2); }
s->dict_run = 0;
} break;
case Encoding::DELTA_BINARY_PACKED:
// nothing to do, just don't error
break;
default:
s->error = 1; // Unsupported encoding
default: {
s->set_error_code(1); // Unsupported encoding
break;
}
}
if (cur > end) { s->error = 1; }
if (cur > end) { s->set_error_code(1); }
s->lvl_end = cur;
s->data_start = cur;
s->data_end = end;
} else {
s->error = 1;
s->set_error_code(1);
}

s->lvl_count[level_type::REPETITION] = 0;
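
The codes stored in the hunks above are bare magic numbers: 0x10 for a level-run overrun in gpuDecodeStream, 2 and 3 for level-section problems in InitLevelSection, 1 for an unsupported page encoding, and (10 << 8) | dict_bits for a bad dictionary width. A possible cleanup, sketched here as an assumption rather than part of this PR, is to give them names:

```cpp
// Hypothetical names for the raw error codes in page_decode.cuh above; the PR
// itself stores the literal values. Shown only to document each call site.
constexpr int32_t LEVEL_RUN_OVERRUN          = 0x10;  // gpuDecodeStream read past 'end'
constexpr int32_t LEVEL_SECTION_ERROR        = 2;     // InitLevelSection: bad length/overrun
constexpr int32_t UNSUPPORTED_LEVEL_ENCODING = 3;     // InitLevelSection: unknown encoding
constexpr int32_t UNSUPPORTED_PAGE_ENCODING  = 1;     // setupLocalPageInfo: unknown encoding

// Dictionary failure packs the offending bit width into the low byte.
__host__ __device__ constexpr int32_t invalid_dict_width(int bits) { return (10 << 8) | bits; }
```
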
25 changes: 18 additions & 7 deletions cpp/src/io/parquet/page_delta_decode.cu
@@ -32,8 +32,12 @@ namespace {
// with V2 page headers; see https://www.mail-archive.com/[email protected]/msg11826.html).
// this kernel only needs 96 threads (3 warps)(for now).
template <typename level_t>
__global__ void __launch_bounds__(96) gpuDecodeDeltaBinary(
PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
__global__ void __launch_bounds__(96)
gpuDecodeDeltaBinary(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
{
using cudf::detail::warp_size;
__shared__ __align__(16) delta_binary_decoder db_state;
@@ -79,7 +83,8 @@ __global__ void __launch_bounds__(96) gpuDecodeDeltaBinary(
// that has a value we need.
if (skipped_leaf_values > 0) { db->skip_values(skipped_leaf_values); }

while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
while (s->error == 0 &&
(s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
uint32_t target_pos;
uint32_t const src_pos = s->src_pos;

@@ -145,6 +150,11 @@
}
__syncthreads();
}

if (!t and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.store(s->error, cuda::std::memory_order_relaxed);
}
}

} // anonymous namespace
@@ -157,6 +167,7 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
@@ -165,11 +176,11 @@
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page

if (level_type_size == 1) {
gpuDecodeDeltaBinary<uint8_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeDeltaBinary<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodeDeltaBinary<uint16_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeDeltaBinary<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

25 changes: 18 additions & 7 deletions cpp/src/io/parquet/page_string_decode.cu
@@ -582,8 +582,12 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
* @tparam level_t Type used to store decoded repetition and definition levels
*/
template <typename level_t>
__global__ void __launch_bounds__(decode_block_size) gpuDecodeStringPageData(
PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
__global__ void __launch_bounds__(decode_block_size)
gpuDecodeStringPageData(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
{
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(4) size_type last_offset;
@@ -617,7 +621,8 @@

// skipped_leaf_values will always be 0 for flat hierarchies.
uint32_t skipped_leaf_values = s->page.skipped_leaf_values;
while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
while (s->error == 0 &&
(s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
int target_pos;
int src_pos = s->src_pos;

@@ -742,6 +747,11 @@

auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);

if (!t and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.store(s->error, cuda::std::memory_order_relaxed);
}
}

} // anonymous namespace
@@ -775,6 +785,7 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pa
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
@@ -783,11 +794,11 @@
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page

if (level_type_size == 1) {
gpuDecodeStringPageData<uint8_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeStringPageData<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodeStringPageData<uint16_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeStringPageData<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

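
All three decode kernels changed above end with the same thread-0 publication block. A small helper could factor that repetition out; the sketch below is an assumption (the helper does not exist in this PR), written against page_state_s as used in these kernels.

```cpp
// Hypothetical helper for the error publication repeated at the end of
// gpuDecodePageData, gpuDecodeDeltaBinary, and gpuDecodeStringPageData.
#include <cuda/atomic>

inline __device__ void publish_error(int t, page_state_s const* s, int32_t* error_code)
{
  // Only thread 0 publishes; s->error is the block-local code set via
  // set_error_code(), error_code is the device-wide output read by the host.
  if (t == 0 && s->error != 0) {
    cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
    ref.store(s->error, cuda::std::memory_order_relaxed);
  }
}
```
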
6 changes: 6 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -566,13 +566,15 @@ void ComputePageStringSizes(cudf::detail::hostdevice_vector<PageInfo>& pages,
* @param[in] num_rows Total number of rows to read
* @param[in] min_row Minimum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use
*/
void DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
rmm::cuda_stream_view stream);

/**
Expand All @@ -586,13 +588,15 @@ void DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
* @param[in] num_rows Total number of rows to read
* @param[in] min_row Minimum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use
*/
void DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
rmm::cuda_stream_view stream);

/**
Expand All @@ -606,13 +610,15 @@ void DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
* @param[in] num_rows Total number of rows to read
* @param[in] min_row Minimum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use, default 0
*/
void DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
rmm::cuda_stream_view stream);
