Limit the number of keys to calculate column sizes and page starts in PQ reader to 1B #17059

Open · wants to merge 13 commits into base: branch-24.12
77 changes: 48 additions & 29 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -44,6 +44,7 @@
#include <thrust/unique.h>

#include <bitset>
#include <limits>
#include <numeric>

namespace cudf::io::parquet::detail {
@@ -1592,36 +1593,54 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num
auto const d_cols_info = cudf::detail::make_device_uvector_async(
h_cols_info, _stream, cudf::get_current_device_resource_ref());

auto const num_keys = _input_columns.size() * max_depth * subpass.pages.size();
// size iterator. indexes pages by sorted order
rmm::device_uvector<size_type> size_input{num_keys, _stream};
thrust::transform(
rmm::exec_policy(_stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(num_keys),
size_input.begin(),
get_page_nesting_size{
d_cols_info.data(), max_depth, subpass.pages.size(), subpass.pages.device_begin()});
auto const reduction_keys =
cudf::detail::make_counting_transform_iterator(0, get_reduction_key{subpass.pages.size()});
cudf::detail::hostdevice_vector<size_t> sizes{_input_columns.size() * max_depth, _stream};

// find the size of each column
thrust::reduce_by_key(rmm::exec_policy(_stream),
reduction_keys,
reduction_keys + num_keys,
size_input.cbegin(),
thrust::make_discard_iterator(),
sizes.d_begin());

// for nested hierarchies, compute per-page start offset
thrust::exclusive_scan_by_key(
rmm::exec_policy(_stream),
reduction_keys,
reduction_keys + num_keys,
size_input.cbegin(),
start_offset_output_iterator{
subpass.pages.device_begin(), 0, d_cols_info.data(), max_depth, subpass.pages.size()});
auto const num_keys = _input_columns.size() * max_depth * subpass.pages.size();
auto constexpr max_keys_per_iter = static_cast<size_t>(
std::numeric_limits<size_type>::max() / 2); ///< Maximum of ~1 billion keys per iteration
auto const num_keys_per_iter =
num_keys < max_keys_per_iter
? num_keys
: (max_depth * subpass.pages.size()) *
std::max<size_t>(1, max_keys_per_iter / (max_depth * subpass.pages.size()));
// Size iterator; indexes pages in sorted order
rmm::device_uvector<size_type> size_input{num_keys_per_iter, _stream};
// Tracks the starting key of the current iteration
size_t key_start = 0;
// Until all keys are processed
while (key_start < num_keys) {
// Number of keys processed in this iteration
auto const num_keys_this_iter = std::min<size_t>(num_keys_per_iter, num_keys - key_start);
thrust::transform(
rmm::exec_policy_nosync(_stream),
thrust::make_counting_iterator<size_t>(key_start),
thrust::make_counting_iterator<size_t>(key_start + num_keys_this_iter),
size_input.begin(),
get_page_nesting_size{
d_cols_info.data(), max_depth, subpass.pages.size(), subpass.pages.device_begin()});
auto const reduction_keys = cudf::detail::make_counting_transform_iterator(
key_start, get_reduction_key{subpass.pages.size()});

// Find the size of each column
thrust::reduce_by_key(rmm::exec_policy_nosync(_stream),
reduction_keys,
reduction_keys + num_keys_this_iter,
size_input.cbegin(),
thrust::make_discard_iterator(),
sizes.d_begin() + (key_start / subpass.pages.size()));

// For nested hierarchies, compute per-page start offset
thrust::exclusive_scan_by_key(rmm::exec_policy_nosync(_stream),
reduction_keys,
reduction_keys + num_keys_this_iter,
size_input.cbegin(),
start_offset_output_iterator{subpass.pages.device_begin(),
key_start,
d_cols_info.data(),
max_depth,
subpass.pages.size()});
// Advance to the next batch of keys
key_start += num_keys_this_iter;
}

sizes.device_to_host_sync(_stream);
for (size_type idx = 0; idx < static_cast<size_type>(_input_columns.size()); idx++) {
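Since only the loop body appears in the diff above, here is a minimal, host-only sketch of the batching arithmetic, assuming hypothetical column/depth/page counts (num_columns, max_depth, and num_pages below are invented for illustration and are not taken from the PR). It shows how num_keys_per_iter is rounded down to a whole multiple of max_depth * num_pages keys, i.e. whole input columns, so each reduce_by_key batch starts on a column boundary and the sizes output offset key_start / num_pages stays aligned.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>

int main()
{
  using size_type = int32_t;  // cudf::size_type is a 32-bit signed integer

  // Hypothetical inputs for illustration only: a very wide table of nested columns.
  std::size_t const num_columns = 32'768;
  std::size_t const max_depth   = 3;
  std::size_t const num_pages   = 65'536;

  // Total number of (column, depth, page) keys; exceeds 32-bit range here.
  std::size_t const num_keys = num_columns * max_depth * num_pages;

  // Cap each iteration at ~1 billion keys, rounded down to whole columns so
  // every batch covers complete (column, depth) groups of num_pages keys each.
  auto constexpr max_keys_per_iter =
    static_cast<std::size_t>(std::numeric_limits<size_type>::max() / 2);
  std::size_t const keys_per_column = max_depth * num_pages;
  std::size_t const num_keys_per_iter =
    num_keys < max_keys_per_iter
      ? num_keys
      : keys_per_column * std::max<std::size_t>(1, max_keys_per_iter / keys_per_column);

  for (std::size_t key_start = 0; key_start < num_keys;) {
    auto const num_keys_this_iter = std::min(num_keys_per_iter, num_keys - key_start);
    // The per-batch key count always fits in size_type, so device buffers and
    // counting iterators sized per batch stay within 32-bit range.
    std::size_t const sizes_offset = key_start / num_pages;  // column/depth group offset
    std::cout << "batch: keys [" << key_start << ", " << key_start + num_keys_this_iter
              << "), sizes offset " << sizes_offset << '\n';
    key_start += num_keys_this_iter;
  }
}

With the numbers above this prints seven batches, the first six of roughly 1.07 billion keys each, which is the behavior the chunked loop in the diff is designed to produce.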
35 changes: 35 additions & 0 deletions cpp/tests/io/parquet_reader_test.cpp
@@ -2724,3 +2724,38 @@ TYPED_TEST(ParquetReaderPredicatePushdownTest, FilterTyped)
EXPECT_EQ(result_table.num_columns(), expected->num_columns());
CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result_table);
}

TEST_F(ParquetReaderTest, ListsWideTable)
{
auto constexpr num_rows = 5;
auto constexpr num_cols = 32'768;
auto constexpr seed = 0xceed;

std::mt19937 engine{seed};

auto str_list_nulls = make_parquet_string_list_col(engine, num_rows, 5, 32, true);
auto str_list = make_parquet_string_list_col(engine, num_rows, 5, 32, false);

// Alternate between nullable and non-nullable columns
std::vector<cudf::column_view> cols(num_cols);
bool with_nulls = false;
std::generate_n(cols.begin(), num_cols, [&]() {
auto const view = with_nulls ? str_list_nulls->view() : str_list->view();
with_nulls = not with_nulls;
return view;
});

cudf::table_view expected(cols);

// Use a host buffer for faster I/O
std::vector<char> buffer;
auto const out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&buffer}, expected).build();
cudf::io::write_parquet(out_opts);

cudf::io::parquet_reader_options default_in_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info(buffer.data(), buffer.size()));
auto const [result, _] = cudf::io::read_parquet(default_in_opts);

CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result->view());
}
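As a rough sanity check (not taken from the PR): assuming roughly one data page per column and a nesting depth of about 3 for a list<string> column, both of which are assumptions for illustration, the total key count for this 32,768-column table is on the order of 3 billion, well past the 2^31 - 1 limit of cudf::size_type and therefore past the ~1 billion per-iteration cap introduced above.

// Back-of-the-envelope key-count estimate for this test; the pages-per-column
// and nesting-depth values are assumptions for illustration, not PR values.
#include <cstddef>
#include <iostream>

int main()
{
  std::size_t const num_cols       = 32'768;
  std::size_t const pages_estimate = num_cols;  // assume ~1 data page per column chunk
  std::size_t const depth_estimate = 3;         // assumed nesting depth for list<string>
  std::size_t const num_keys       = num_cols * depth_estimate * pages_estimate;
  std::cout << num_keys << " keys, vs. size_type max of 2147483647\n";  // ~3.2 billion
}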