diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index c26802aa3c2..230834632dd 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -430,10 +430,15 @@ static __device__ void gpuOutputGeneric( * @param chunks List of column chunks * @param min_row Row index to start reading at * @param num_rows Maximum number of rows to read + * @param error_code Error code to set if an error is encountered */ template -__global__ void __launch_bounds__(decode_block_size) gpuDecodePageData( - PageInfo* pages, device_span chunks, size_t min_row, size_t num_rows) +__global__ void __launch_bounds__(decode_block_size) + gpuDecodePageData(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + int32_t* error_code) { __shared__ __align__(16) page_state_s state_g; __shared__ __align__(16) @@ -472,7 +477,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData( // skipped_leaf_values will always be 0 for flat hierarchies. uint32_t skipped_leaf_values = s->page.skipped_leaf_values; - while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { + while (s->error == 0 && + (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { int target_pos; int src_pos = s->src_pos; @@ -596,6 +602,10 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData( } __syncthreads(); } + if (t == 0 and s->error != 0) { + cuda::atomic_ref ref{*error_code}; + ref.fetch_or(s->error, cuda::std::memory_order_relaxed); + } } struct mask_tform { @@ -621,6 +631,7 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, + int32_t* error_code, rmm::cuda_stream_view stream) { CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); @@ -629,11 +640,11 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector& pages, dim3 dim_grid(pages.size(), 1); // 1 threadblock per page if (level_type_size == 1) { - gpuDecodePageData - <<>>(pages.device_ptr(), chunks, min_row, num_rows); + gpuDecodePageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); } else { - gpuDecodePageData - <<>>(pages.device_ptr(), chunks, min_row, num_rows); + gpuDecodePageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); } } diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 5e66885d746..cdc29197eb3 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -21,6 +21,7 @@ #include +#include #include namespace cudf::io::parquet::gpu { @@ -69,6 +70,18 @@ struct page_state_s { PageNestingDecodeInfo nesting_decode_cache[max_cacheable_nesting_decode_info]{}; // points to either nesting_decode_cache above when possible, or to the global source otherwise PageNestingDecodeInfo* nesting_info{}; + + inline __device__ void set_error_code(decode_error err) volatile + { + cuda::atomic_ref ref{const_cast(error)}; + ref.fetch_or(static_cast(err), cuda::std::memory_order_relaxed); + } + + inline __device__ void reset_error_code() volatile + { + cuda::atomic_ref ref{const_cast(error)}; + ref.store(0, cuda::std::memory_order_release); + } }; // buffers only used in the decode kernel. separated from page_state_s to keep @@ -471,7 +484,7 @@ __device__ void gpuDecodeStream( int32_t value_count = s->lvl_count[lvl]; int32_t batch_coded_count = 0; - while (value_count < target_count && value_count < num_input_values) { + while (s->error == 0 && value_count < target_count && value_count < num_input_values) { int batch_len; if (level_run <= 1) { // Get a new run symbol from the byte stream @@ -487,7 +500,14 @@ __device__ void gpuDecodeStream( cur++; } } - if (cur > end || level_run <= 1) { s->error = 0x10; } + if (cur > end) { + s->set_error_code(decode_error::LEVEL_STREAM_OVERRUN); + break; + } + if (level_run <= 1) { + s->set_error_code(decode_error::INVALID_LEVEL_RUN); + break; + } sym_len = (int32_t)(cur - cur_def); __threadfence_block(); } @@ -496,7 +516,7 @@ __device__ void gpuDecodeStream( level_run = shuffle(level_run); cur_def += sym_len; } - if (s->error) { break; } + if (s->error != 0) { break; } batch_len = min(num_input_values - value_count, 32); if (level_run & 1) { @@ -852,7 +872,7 @@ __device__ void gpuDecodeLevels(page_state_s* s, constexpr int batch_size = 32; int cur_leaf_count = target_leaf_count; - while (!s->error && s->nz_count < target_leaf_count && + while (s->error == 0 && s->nz_count < target_leaf_count && s->input_value_count < s->num_input_values) { if (has_repetition) { gpuDecodeStream(rep, s, cur_leaf_count, t, level_type::REPETITION); @@ -916,7 +936,7 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s, } s->lvl_start[lvl] = cur; - if (cur > end) { s->error = 2; } + if (cur > end) { s->set_error_code(decode_error::LEVEL_STREAM_OVERRUN); } }; // this is a little redundant. if level_bits == 0, then nothing should be encoded @@ -941,8 +961,8 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s, // add back the 4 bytes for the length len += 4; } else { - len = 0; - s->error = 2; + len = 0; + s->set_error_code(decode_error::LEVEL_STREAM_OVERRUN); } } else if (encoding == Encoding::BIT_PACKED) { len = (s->page.num_input_values * level_bits + 7) >> 3; @@ -951,8 +971,8 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s, s->lvl_start[lvl] = cur; s->abs_lvl_start[lvl] = cur; } else { - s->error = 3; - len = 0; + len = 0; + s->set_error_code(decode_error::UNSUPPORTED_ENCODING); } s->abs_lvl_end[lvl] = start + len; @@ -1094,7 +1114,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, } if (!t) { - s->error = 0; + s->reset_error_code(); // IMPORTANT : nested schemas can have 0 rows in a page but still have // values. The case is: @@ -1152,7 +1172,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, break; default: // FIXED_LEN_BYTE_ARRAY: s->dtype_len = dtype_len_out; - s->error |= (s->dtype_len <= 0); + if (s->dtype_len <= 0) { s->set_error_code(decode_error::INVALID_DATA_TYPE); } break; } // Special check for downconversions @@ -1268,7 +1288,9 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, s->dict_run = 0; s->dict_val = 0; s->dict_bits = (cur < end) ? *cur++ : 0; - if (s->dict_bits > 32 || !s->dict_base) { s->error = (10 << 8) | s->dict_bits; } + if (s->dict_bits > 32 || !s->dict_base) { + s->set_error_code(decode_error::INVALID_DICT_WIDTH); + } break; case Encoding::PLAIN: s->dict_size = static_cast(end - cur); @@ -1279,22 +1301,23 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, // first 4 bytes are length of RLE data int const len = (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24); cur += 4; - if (cur + len > end) { s->error = 2; } + if (cur + len > end) { s->set_error_code(decode_error::DATA_STREAM_OVERRUN); } s->dict_run = 0; } break; case Encoding::DELTA_BINARY_PACKED: // nothing to do, just don't error break; - default: - s->error = 1; // Unsupported encoding + default: { + s->set_error_code(decode_error::UNSUPPORTED_ENCODING); break; + } } - if (cur > end) { s->error = 1; } + if (cur > end) { s->set_error_code(decode_error::DATA_STREAM_OVERRUN); } s->lvl_end = cur; s->data_start = cur; s->data_end = end; } else { - s->error = 1; + s->set_error_code(decode_error::EMPTY_PAGE); } s->lvl_count[level_type::REPETITION] = 0; diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu index 35f33a761be..2b78dead205 100644 --- a/cpp/src/io/parquet/page_delta_decode.cu +++ b/cpp/src/io/parquet/page_delta_decode.cu @@ -32,8 +32,12 @@ namespace { // with V2 page headers; see https://www.mail-archive.com/dev@parquet.apache.org/msg11826.html). // this kernel only needs 96 threads (3 warps)(for now). template -__global__ void __launch_bounds__(96) gpuDecodeDeltaBinary( - PageInfo* pages, device_span chunks, size_t min_row, size_t num_rows) +__global__ void __launch_bounds__(96) + gpuDecodeDeltaBinary(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + int32_t* error_code) { using cudf::detail::warp_size; __shared__ __align__(16) delta_binary_decoder db_state; @@ -79,7 +83,8 @@ __global__ void __launch_bounds__(96) gpuDecodeDeltaBinary( // that has a value we need. if (skipped_leaf_values > 0) { db->skip_values(skipped_leaf_values); } - while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { + while (s->error == 0 && + (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { uint32_t target_pos; uint32_t const src_pos = s->src_pos; @@ -145,6 +150,11 @@ __global__ void __launch_bounds__(96) gpuDecodeDeltaBinary( } __syncthreads(); } + + if (t == 0 and s->error != 0) { + cuda::atomic_ref ref{*error_code}; + ref.fetch_or(s->error, cuda::std::memory_order_relaxed); + } } } // anonymous namespace @@ -157,6 +167,7 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector& pages size_t num_rows, size_t min_row, int level_type_size, + int32_t* error_code, rmm::cuda_stream_view stream) { CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); @@ -165,11 +176,11 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector& pages dim3 dim_grid(pages.size(), 1); // 1 threadblock per page if (level_type_size == 1) { - gpuDecodeDeltaBinary - <<>>(pages.device_ptr(), chunks, min_row, num_rows); + gpuDecodeDeltaBinary<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); } else { - gpuDecodeDeltaBinary - <<>>(pages.device_ptr(), chunks, min_row, num_rows); + gpuDecodeDeltaBinary<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); } } diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 1ac4c95f713..d79abe4a6d2 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -582,8 +582,12 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz * @tparam level_t Type used to store decoded repetition and definition levels */ template -__global__ void __launch_bounds__(decode_block_size) gpuDecodeStringPageData( - PageInfo* pages, device_span chunks, size_t min_row, size_t num_rows) +__global__ void __launch_bounds__(decode_block_size) + gpuDecodeStringPageData(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + int32_t* error_code) { __shared__ __align__(16) page_state_s state_g; __shared__ __align__(4) size_type last_offset; @@ -617,7 +621,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodeStringPageData( // skipped_leaf_values will always be 0 for flat hierarchies. uint32_t skipped_leaf_values = s->page.skipped_leaf_values; - while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { + while (s->error == 0 && + (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { int target_pos; int src_pos = s->src_pos; @@ -742,6 +747,11 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodeStringPageData( auto const offptr = reinterpret_cast(nesting_info_base[leaf_level_index].data_out); block_excl_sum(offptr, value_count, s->page.str_offset); + + if (t == 0 and s->error != 0) { + cuda::atomic_ref ref{*error_code}; + ref.fetch_or(s->error, cuda::std::memory_order_relaxed); + } } } // anonymous namespace @@ -775,6 +785,7 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector& pa size_t num_rows, size_t min_row, int level_type_size, + int32_t* error_code, rmm::cuda_stream_view stream) { CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); @@ -783,11 +794,11 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector& pa dim3 dim_grid(pages.size(), 1); // 1 threadblock per page if (level_type_size == 1) { - gpuDecodeStringPageData - <<>>(pages.device_ptr(), chunks, min_row, num_rows); + gpuDecodeStringPageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); } else { - gpuDecodeStringPageData - <<>>(pages.device_ptr(), chunks, min_row, num_rows); + gpuDecodeStringPageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); } } diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index a760c2448dc..3c37c0df021 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -54,6 +54,21 @@ constexpr int rolling_index(int index) return index % rolling_size; } +/** + * @brief Enum for the different types of errors that can occur during decoding. + * + * These values are used as bitmasks, so they must be powers of 2. + */ +enum class decode_error : int32_t { + DATA_STREAM_OVERRUN = 0x1, + LEVEL_STREAM_OVERRUN = 0x2, + UNSUPPORTED_ENCODING = 0x4, + INVALID_LEVEL_RUN = 0x8, + INVALID_DATA_TYPE = 0x10, + EMPTY_PAGE = 0x20, + INVALID_DICT_WIDTH = 0x40, +}; + /** * @brief Struct representing an input column in the file. */ @@ -566,6 +581,7 @@ void ComputePageStringSizes(cudf::detail::hostdevice_vector& pages, * @param[in] num_rows Total number of rows to read * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ void DecodePageData(cudf::detail::hostdevice_vector& pages, @@ -573,6 +589,7 @@ void DecodePageData(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, + int32_t* error_code, rmm::cuda_stream_view stream); /** @@ -586,6 +603,7 @@ void DecodePageData(cudf::detail::hostdevice_vector& pages, * @param[in] num_rows Total number of rows to read * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ void DecodeStringPageData(cudf::detail::hostdevice_vector& pages, @@ -593,6 +611,7 @@ void DecodeStringPageData(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, + int32_t* error_code, rmm::cuda_stream_view stream); /** @@ -606,6 +625,7 @@ void DecodeStringPageData(cudf::detail::hostdevice_vector& pages, * @param[in] num_rows Total number of rows to read * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use, default 0 */ void DecodeDeltaBinary(cudf::detail::hostdevice_vector& pages, @@ -613,6 +633,7 @@ void DecodeDeltaBinary(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, + int32_t* error_code, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 8b0a0bd4eb0..6cbe64e227b 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -163,6 +163,8 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) chunk_nested_valids.host_to_device_async(_stream); chunk_nested_data.host_to_device_async(_stream); + rmm::device_scalar error_code(0, _stream); + // get the number of streams we need from the pool and tell them to wait on the H2D copies int const nkernels = std::bitset<32>(kernel_mask).count(); auto streams = cudf::detail::fork_streams(_stream, nkernels); @@ -174,17 +176,20 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) if (has_strings) { auto& stream = streams[s_idx++]; chunk_nested_str_data.host_to_device_async(stream); - gpu::DecodeStringPageData(pages, chunks, num_rows, skip_rows, level_type_size, stream); + gpu::DecodeStringPageData( + pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), stream); } // launch delta binary decoder if ((kernel_mask & gpu::KERNEL_MASK_DELTA_BINARY) != 0) { - gpu::DecodeDeltaBinary(pages, chunks, num_rows, skip_rows, level_type_size, streams[s_idx++]); + gpu::DecodeDeltaBinary( + pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), streams[s_idx++]); } // launch the catch-all page decoder if ((kernel_mask & gpu::KERNEL_MASK_GENERAL) != 0) { - gpu::DecodePageData(pages, chunks, num_rows, skip_rows, level_type_size, streams[s_idx++]); + gpu::DecodePageData( + pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), streams[s_idx++]); } // synchronize the streams @@ -193,7 +198,13 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) pages.device_to_host_async(_stream); page_nesting.device_to_host_async(_stream); page_nesting_decode.device_to_host_async(_stream); - _stream.synchronize(); + + auto const decode_error = error_code.value(_stream); + if (decode_error != 0) { + std::stringstream stream; + stream << std::hex << decode_error; + CUDF_FAIL("Parquet data decode failed with code(s) 0x" + stream.str()); + } // for list columns, add the final offset to every offset buffer. // TODO : make this happen in more efficiently. Maybe use thrust::for_each