Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the number of keys to calculate column sizes and page starts in PQ reader to 1B #17059

Open
wants to merge 13 commits into
base: branch-24.12
Choose a base branch
from
Open
89 changes: 61 additions & 28 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <thrust/unique.h>

#include <bitset>
#include <limits>
#include <numeric>

namespace cudf::io::parquet::detail {
Expand Down Expand Up @@ -1592,36 +1593,68 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num
auto const d_cols_info = cudf::detail::make_device_uvector_async(
h_cols_info, _stream, cudf::get_current_device_resource_ref());

auto const num_keys = _input_columns.size() * max_depth * subpass.pages.size();
// size iterator. indexes pages by sorted order
rmm::device_uvector<size_type> size_input{num_keys, _stream};
thrust::transform(
rmm::exec_policy(_stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(num_keys),
size_input.begin(),
get_page_nesting_size{
d_cols_info.data(), max_depth, subpass.pages.size(), subpass.pages.device_begin()});
auto const reduction_keys =
cudf::detail::make_counting_transform_iterator(0, get_reduction_key{subpass.pages.size()});
// Vector to store page sizes for each column at each depth
cudf::detail::hostdevice_vector<size_t> sizes{_input_columns.size() * max_depth, _stream};

// find the size of each column
thrust::reduce_by_key(rmm::exec_policy(_stream),
reduction_keys,
reduction_keys + num_keys,
size_input.cbegin(),
thrust::make_discard_iterator(),
sizes.d_begin());

// for nested hierarchies, compute per-page start offset
thrust::exclusive_scan_by_key(
rmm::exec_policy(_stream),
reduction_keys,
reduction_keys + num_keys,
size_input.cbegin(),
start_offset_output_iterator{
subpass.pages.device_begin(), 0, d_cols_info.data(), max_depth, subpass.pages.size()});
// Total number of keys to process
auto const num_keys = _input_columns.size() * max_depth * subpass.pages.size();

// Maximum 1 billion keys processed per iteration
auto constexpr max_keys_per_iter =
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved
static_cast<size_t>(std::numeric_limits<size_type>::max() / 2);

// Number of keys per column
auto const num_keys_per_col = max_depth * subpass.pages.size();

// The largest multiple of `num_keys_per_col` that is <= `num_keys`
auto const num_keys_per_iter =
num_keys <= max_keys_per_iter
? num_keys
: num_keys_per_col * std::max<size_t>(1, max_keys_per_iter / num_keys_per_col);

// Size iterator. Indexes pages by sorted order
rmm::device_uvector<size_type> size_input{num_keys_per_iter, _stream};

// To keep track of the starting key of an iteration
size_t key_start = 0;
// Loop until all keys are processed
while (key_start < num_keys) {
// Number of keys processed in this iteration
auto const num_keys_this_iter = std::min<size_t>(num_keys_per_iter, num_keys - key_start);
thrust::transform(
rmm::exec_policy_nosync(_stream),
thrust::make_counting_iterator<size_t>(key_start),
thrust::make_counting_iterator<size_t>(key_start + num_keys_this_iter),
size_input.begin(),
get_page_nesting_size{
d_cols_info.data(), max_depth, subpass.pages.size(), subpass.pages.device_begin()});

// Manually create an int64_t-compatible counting_transform_iterator starting at `key_start`
// to avoid implicit casting to size_type.
auto const reduction_keys = thrust::make_transform_iterator(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The smoking gun. cudf::detail::make_counting_transform_iterator implicitly typecasts key_start to size_type causing all sorts of problems.

thrust::make_counting_iterator<size_t>(key_start), get_reduction_key{subpass.pages.size()});

// Find the size of each column
thrust::reduce_by_key(rmm::exec_policy_nosync(_stream),
reduction_keys,
reduction_keys + num_keys_this_iter,
size_input.cbegin(),
thrust::make_discard_iterator(),
sizes.d_begin() + (key_start / subpass.pages.size()));

// For nested hierarchies, compute per-page start offset
thrust::exclusive_scan_by_key(rmm::exec_policy_nosync(_stream),
reduction_keys,
reduction_keys + num_keys_this_iter,
size_input.cbegin(),
start_offset_output_iterator{subpass.pages.device_begin(),
key_start,
d_cols_info.data(),
max_depth,
subpass.pages.size()});
// Increment the key_start
key_start += num_keys_this_iter;
}

sizes.device_to_host_sync(_stream);
for (size_type idx = 0; idx < static_cast<size_type>(_input_columns.size()); idx++) {
Expand Down
35 changes: 35 additions & 0 deletions cpp/tests/io/parquet_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2724,3 +2724,38 @@ TYPED_TEST(ParquetReaderPredicatePushdownTest, FilterTyped)
EXPECT_EQ(result_table.num_columns(), expected->num_columns());
CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result_table);
}

TEST_F(ParquetReaderTest, ListsWideTable)
{
  // Regression test: enough list columns that the reader's column-size / page-start
  // computation in allocate_columns needs slightly over 2B keys, exercising the
  // chunked (bounded keys-per-iteration) code path instead of a single pass.
  auto constexpr num_rows = 2;
  auto constexpr num_cols = 26'755;  // for slightly over 2B keys

  // One non-nullable and one nullable list<list<int32>> column to reuse across the table
  auto list_list       = make_parquet_list_list_col<int32_t>(0, num_rows, 1, 1, false);
  auto list_list_nulls = make_parquet_list_list_col<int32_t>(0, num_rows, 1, 1, true);

  // Alternate between nullable and non-nullable columns across the table
  std::vector<cudf::column_view> cols(num_cols);
  bool with_nulls = false;
  std::generate_n(cols.begin(), num_cols, [&]() {
    auto const view = with_nulls ? list_list_nulls->view() : list_list->view();
    with_nulls      = not with_nulls;
    return view;
  });

  cudf::table_view expected(cols);

  // Use a host buffer for faster I/O
  std::vector<char> buffer;
  auto const out_opts =
    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&buffer}, expected).build();
  cudf::io::write_parquet(out_opts);

  // Round-trip: reading the file back must reproduce the original table exactly
  cudf::io::parquet_reader_options default_in_opts =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info(buffer.data(), buffer.size()));
  auto const [result, _] = cudf::io::read_parquet(default_in_opts);

  CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result->view());
}
Loading