Detect and report errors in Parquet header parsing (#14237)
Fixes #13656. Uses the error reporting mechanism introduced in #14167 to surface errors encountered while parsing Parquet page headers.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Bradley Dice (https://github.com/bdice)

URL: #14237
etseidl authored Oct 20, 2023
1 parent 0341bb7 commit e7c6365
Showing 10 changed files with 170 additions and 75 deletions.
77 changes: 77 additions & 0 deletions cpp/src/io/parquet/error.hpp
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_scalar.hpp>

#include <cstdint>
#include <sstream>

namespace cudf::io::parquet {

/**
* @brief Wrapper around a `rmm::device_scalar` for use in reporting errors that occur in
* kernel calls.
*
* The `kernel_error` object is created with a `rmm::cuda_stream_view` which is used throughout
* the object's lifetime.
*/
class kernel_error {
private:
rmm::device_scalar<int32_t> _error_code;

public:
/**
* @brief Construct a new `kernel_error` with an initial value of 0.
*
* Note: the initial value is set asynchronously.
*
* @throws `rmm::bad_alloc` if allocating the device memory for `initial_value` fails.
* @throws `rmm::cuda_error` if copying `initial_value` to device memory fails.
*
* @param stream CUDA stream to use
*/
kernel_error(rmm::cuda_stream_view stream) : _error_code{0, stream} {}

/**
* @brief Return a pointer to the device memory for the error
*/
[[nodiscard]] auto data() { return _error_code.data(); }

/**
* @brief Return the current value of the error
*
* This synchronizes on the stream this object was instantiated with before returning the value.
*/
[[nodiscard]] auto value() const { return _error_code.value(_error_code.stream()); }

/**
* @brief Return a hexadecimal string representation of the current error code
*
* The returned string has "0x" prepended.
*/
[[nodiscard]] std::string str() const
{
std::stringstream sstream;
sstream << std::hex << value();
return "0x" + sstream.str();
}
};

} // namespace cudf::io::parquet
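
For context, here is a minimal usage sketch showing how a `kernel_error` is intended to wrap a decode kernel launch, mirroring the host-side pattern in reader_impl.cpp later in this diff. The kernel (`dummy_decode_kernel`), the free function, and the launch shape are illustrative stand-ins, not part of the commit:

#include "error.hpp"

#include <cudf/utilities/error.hpp>  // CUDF_FAIL
#include <rmm/cuda_stream_view.hpp>

#include <cuda/atomic>

// Hypothetical stand-in for a decode kernel: reports a single error bit.
__global__ void dummy_decode_kernel(int32_t* error_code)
{
  if (threadIdx.x == 0 && blockIdx.x == 0) {
    cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
    ref.fetch_or(1, cuda::std::memory_order_relaxed);
  }
}

void decode_with_error_check(rmm::cuda_stream_view stream)
{
  // Device-side int32_t, asynchronously initialized to 0 on `stream`.
  cudf::io::parquet::kernel_error error_code(stream);

  // Kernels atomically OR their error bits into *error_code.data().
  dummy_decode_kernel<<<1, 32, 0, stream.value()>>>(error_code.data());

  // value() synchronizes on the stream the kernel_error was created with.
  if (error_code.value() != 0) {
    CUDF_FAIL("Parquet data decode failed with code(s) " + error_code.str());
  }
}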
5 changes: 1 addition & 4 deletions cpp/src/io/parquet/page_data.cu
@@ -599,10 +599,7 @@ __global__ void __launch_bounds__(decode_block_size)
}
__syncthreads();
}
if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

struct mask_tform {
5 changes: 1 addition & 4 deletions cpp/src/io/parquet/page_delta_decode.cu
@@ -151,10 +151,7 @@ __global__ void __launch_bounds__(96)
__syncthreads();
}

if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

} // anonymous namespace
58 changes: 29 additions & 29 deletions cpp/src/io/parquet/page_hdr.cu
@@ -16,6 +16,9 @@

#include "parquet_gpu.hpp"
#include <io/utilities/block_utils.cuh>

#include <cudf/detail/utilities/cuda.cuh>

#include <thrust/tuple.h>

#include <rmm/cuda_stream_view.hpp>
@@ -25,23 +28,6 @@ namespace cudf::io::parquet::detail {
// Minimal thrift implementation for parsing page headers
// https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md

static const __device__ __constant__ uint8_t g_list2struct[16] = {0,
1,
2,
ST_FLD_BYTE,
ST_FLD_DOUBLE,
5,
ST_FLD_I16,
7,
ST_FLD_I32,
9,
ST_FLD_I64,
ST_FLD_BINARY,
ST_FLD_STRUCT,
ST_FLD_MAP,
ST_FLD_SET,
ST_FLD_LIST};

struct byte_stream_s {
uint8_t const* cur{};
uint8_t const* end{};
@@ -140,12 +126,13 @@ __device__ void skip_struct_field(byte_stream_s* bs, int field_type)
case ST_FLD_SET: { // NOTE: skipping a list of lists is not handled
auto const c = getb(bs);
int n = c >> 4;
if (n == 0xf) n = get_u32(bs);
field_type = g_list2struct[c & 0xf];
if (field_type == ST_FLD_STRUCT)
if (n == 0xf) { n = get_u32(bs); }
field_type = c & 0xf;
if (field_type == ST_FLD_STRUCT) {
struct_depth += n;
else
} else {
rep_cnt = n;
}
} break;
case ST_FLD_STRUCT: struct_depth++; break;
}
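
For reference, the branch above follows the compact-protocol layout linked at the top of this file: a short-form list/set header packs the element count in the high nibble (0xf meaning the real count follows as a varint, read via get_u32 above) and the element type in the low nibble, which the compact protocol encodes with the same values as struct field types, which is why the former g_list2struct remapping table could be dropped. A standalone sketch of that decoding, with illustrative names (list_header, parse_list_header) that are not part of the commit:

#include <cstdint>

struct list_header {
  uint32_t count;        // element count; 0xf means "a varint with the real count follows"
  uint8_t element_type;  // compact-protocol field-type id of the elements, used directly
};

inline list_header parse_list_header(uint8_t header_byte)
{
  return {static_cast<uint32_t>(header_byte >> 4),   // high nibble: size
          static_cast<uint8_t>(header_byte & 0xf)};  // low nibble: element type
}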
@@ -356,16 +343,20 @@ struct gpuParsePageHeader {
*/
// blockDim {128,1,1}
__global__ void __launch_bounds__(128)
gpuDecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks)
gpuDecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, int32_t* error_code)
{
using cudf::detail::warp_size;
gpuParsePageHeader parse_page_header;
__shared__ byte_stream_s bs_g[4];

int lane_id = threadIdx.x % 32;
int chunk = (blockIdx.x * 4) + (threadIdx.x / 32);
byte_stream_s* const bs = &bs_g[threadIdx.x / 32];
int32_t error[4] = {0};
auto const lane_id = threadIdx.x % warp_size;
auto const warp_id = threadIdx.x / warp_size;
auto const chunk = (blockIdx.x * 4) + warp_id;
auto const bs = &bs_g[warp_id];

if (chunk < num_chunks and lane_id == 0) bs->ck = chunks[chunk];
if (chunk < num_chunks and lane_id == 0) { bs->ck = chunks[chunk]; }
if (lane_id == 0) { error[warp_id] = 0; }
__syncthreads();

if (chunk < num_chunks) {
@@ -376,7 +367,7 @@ __global__ void __launch_bounds__(128)
int32_t num_dict_pages = bs->ck.num_dict_pages;
PageInfo* page_info;

if (!lane_id) {
if (lane_id == 0) {
bs->base = bs->cur = bs->ck.compressed_data;
bs->end = bs->base + bs->ck.compressed_size;
bs->page.chunk_idx = chunk;
@@ -412,6 +403,9 @@
bs->page.lvl_bytes[level_type::DEFINITION] = 0;
bs->page.lvl_bytes[level_type::REPETITION] = 0;
if (parse_page_header(bs) && bs->page.compressed_page_size >= 0) {
if (not is_supported_encoding(bs->page.encoding)) {
error[warp_id] |= static_cast<int32_t>(decode_error::UNSUPPORTED_ENCODING);
}
switch (bs->page_type) {
case PageType::DATA_PAGE:
index_out = num_dict_pages + data_page_count;
@@ -440,20 +434,25 @@
}
bs->page.page_data = const_cast<uint8_t*>(bs->cur);
bs->cur += bs->page.compressed_page_size;
if (bs->cur > bs->end) {
error[warp_id] |= static_cast<int32_t>(decode_error::DATA_STREAM_OVERRUN);
}
bs->page.kernel_mask = kernel_mask_for_page(bs->page, bs->ck);
} else {
bs->cur = bs->end;
}
}
index_out = shuffle(index_out);
if (index_out >= 0 && index_out < max_num_pages && lane_id == 0)
if (index_out >= 0 && index_out < max_num_pages && lane_id == 0) {
page_info[index_out] = bs->page;
}
num_values = shuffle(num_values);
__syncwarp();
}
if (lane_id == 0) {
chunks[chunk].num_data_pages = data_page_count;
chunks[chunk].num_dict_pages = dictionary_page_count;
if (error[warp_id] != 0) { set_error(error[warp_id], error_code); }
}
}
}
@@ -509,11 +508,12 @@ __global__ void __launch_bounds__(128)

void __host__ DecodePageHeaders(ColumnChunkDesc* chunks,
int32_t num_chunks,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
dim3 dim_block(128, 1);
dim3 dim_grid((num_chunks + 3) >> 2, 1); // 1 chunk per warp, 4 warps per block
gpuDecodePageHeaders<<<dim_grid, dim_block, 0, stream.value()>>>(chunks, num_chunks);
gpuDecodePageHeaders<<<dim_grid, dim_block, 0, stream.value()>>>(chunks, num_chunks, error_code);
}

void __host__ BuildStringDictionaryIndex(ColumnChunkDesc* chunks,
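
A hedged sketch of a host-side call to the updated DecodePageHeaders(), showing how the new error_code argument is expected to be fed from a kernel_error and checked afterwards. The wrapper function, its arguments, and the failure message are assumptions for illustration; the real call site in the reader is not shown in this diff:

#include "error.hpp"
#include "parquet_gpu.hpp"

#include <cudf/utilities/error.hpp>  // CUDF_FAIL

namespace cudf::io::parquet::detail {

// `chunks` is assumed to point to device memory already populated with ColumnChunkDesc entries.
void decode_headers_checked(ColumnChunkDesc* chunks,
                            int32_t num_chunks,
                            rmm::cuda_stream_view stream)
{
  kernel_error error_code(stream);  // from error.hpp above

  DecodePageHeaders(chunks, num_chunks, error_code.data(), stream);

  // value() synchronizes on `stream`, so any set_error() from the kernel is visible here.
  if (error_code.value() != 0) {
    CUDF_FAIL("Parquet header parsing failed with code(s) " + error_code.str());
  }
}

}  // namespace cudf::io::parquet::detail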
5 changes: 1 addition & 4 deletions cpp/src/io/parquet/page_string_decode.cu
@@ -745,10 +745,7 @@ __global__ void __launch_bounds__(decode_block_size)
auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);

if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

} // anonymous namespace
32 changes: 31 additions & 1 deletion cpp/src/io/parquet/parquet_gpu.hpp
@@ -31,6 +31,8 @@
#include <rmm/device_scalar.hpp>
#include <rmm/device_uvector.hpp>

#include <cuda/atomic>

#include <cuda_runtime.h>

#include <vector>
@@ -54,6 +56,30 @@ constexpr int rolling_index(int index)
return index % rolling_size;
}

// see setupLocalPageInfo() in page_decode.cuh for supported page encodings
constexpr bool is_supported_encoding(Encoding enc)
{
switch (enc) {
case Encoding::PLAIN:
case Encoding::PLAIN_DICTIONARY:
case Encoding::RLE:
case Encoding::RLE_DICTIONARY:
case Encoding::DELTA_BINARY_PACKED: return true;
default: return false;
}
}

/**
* @brief Atomically OR `error` into `error_code`.
*/
constexpr void set_error(int32_t error, int32_t* error_code)
{
if (error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(error, cuda::std::memory_order_relaxed);
}
}

/**
* @brief Enum for the different types of errors that can occur during decoding.
*
@@ -495,9 +521,13 @@ constexpr bool is_string_col(ColumnChunkDesc const& chunk)
*
* @param[in] chunks List of column chunks
* @param[in] num_chunks Number of column chunks
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use
*/
void DecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, rmm::cuda_stream_view stream);
void DecodePageHeaders(ColumnChunkDesc* chunks,
int32_t num_chunks,
int32_t* error_code,
rmm::cuda_stream_view stream);

/**
* @brief Launches kernel for building the dictionary index for the column
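
Because set_error() accumulates with fetch_or, the reported value is a bitmask that may combine several conditions from different warps and kernels. A small sketch (assuming parquet_gpu.hpp is included; the numeric values of the decode_error enumerators are not shown in this diff) of testing for one condition in an accumulated code:

#include "parquet_gpu.hpp"

namespace cudf::io::parquet::detail {

// Illustrative helper (not part of the commit): true if the accumulated error code
// contains a particular decode_error flag.
constexpr bool has_error(int32_t error_code, decode_error flag)
{
  return (error_code & static_cast<int32_t>(flag)) != 0;
}

}  // namespace cudf::io::parquet::detail

For example, has_error(error_code.value(), decode_error::DATA_STREAM_OVERRUN) would report whether any page overran its compressed data stream.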
11 changes: 5 additions & 6 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -15,6 +15,7 @@
*/

#include "reader_impl.hpp"
#include "error.hpp"

#include <cudf/detail/stream_compaction.hpp>
#include <cudf/detail/transform.hpp>
@@ -163,7 +164,8 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
chunk_nested_valids.host_to_device_async(_stream);
chunk_nested_data.host_to_device_async(_stream);

rmm::device_scalar<int32_t> error_code(0, _stream);
// create this before we fork streams
kernel_error error_code(_stream);

// get the number of streams we need from the pool and tell them to wait on the H2D copies
int const nkernels = std::bitset<32>(kernel_mask).count();
@@ -199,11 +201,8 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
page_nesting.device_to_host_async(_stream);
page_nesting_decode.device_to_host_async(_stream);

auto const decode_error = error_code.value(_stream);
if (decode_error != 0) {
std::stringstream stream;
stream << std::hex << decode_error;
CUDF_FAIL("Parquet data decode failed with code(s) 0x" + stream.str());
if (error_code.value() != 0) {
CUDF_FAIL("Parquet data decode failed with code(s) " + error_code.str());
}

// for list columns, add the final offset to every offset buffer.