Limit the number of keys to calculate column sizes and page starts in PQ reader to 1B #17059

Open
wants to merge 13 commits into base: branch-24.12
75 changes: 47 additions & 28 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -44,6 +44,7 @@
#include <thrust/unique.h>

#include <bitset>
#include <limits>
#include <numeric>

namespace cudf::io::parquet::detail {
@@ -1592,36 +1593,54 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num
auto const d_cols_info = cudf::detail::make_device_uvector_async(
h_cols_info, _stream, cudf::get_current_device_resource_ref());

cudf::detail::hostdevice_vector<size_t> sizes{_input_columns.size() * max_depth, _stream};
auto const num_keys = _input_columns.size() * max_depth * subpass.pages.size();
auto constexpr max_keys_per_iter =
  std::numeric_limits<size_type>::max() / 2;  ///< Maximum 1 billion keys per iteration
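  // If all keys fit under the cap, process them in one iteration; otherwise round
  // the cap down to a multiple of (max_depth * subpass.pages.size()) so that no
  // reduction segment straddles an iteration boundary.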
auto const num_keys_per_iter =
num_keys < max_keys_per_iter
? num_keys
: (max_depth * subpass.pages.size()) *
std::max<size_t>(1, max_keys_per_iter / (max_depth * subpass.pages.size()));
// size iterator. indexes pages by sorted order
rmm::device_uvector<size_type> size_input{num_keys, _stream};
thrust::transform(
rmm::exec_policy(_stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(num_keys),
size_input.begin(),
get_page_nesting_size{
d_cols_info.data(), max_depth, subpass.pages.size(), subpass.pages.device_begin()});
auto const reduction_keys =
cudf::detail::make_counting_transform_iterator(0, get_reduction_key{subpass.pages.size()});
cudf::detail::hostdevice_vector<size_t> sizes{_input_columns.size() * max_depth, _stream};

// find the size of each column
thrust::reduce_by_key(rmm::exec_policy(_stream),
reduction_keys,
reduction_keys + num_keys,
size_input.cbegin(),
thrust::make_discard_iterator(),
sizes.d_begin());

// for nested hierarchies, compute per-page start offset
thrust::exclusive_scan_by_key(
rmm::exec_policy(_stream),
reduction_keys,
reduction_keys + num_keys,
size_input.cbegin(),
start_offset_output_iterator{
subpass.pages.device_begin(), 0, d_cols_info.data(), max_depth, subpass.pages.size()});
rmm::device_uvector<size_type> size_input{num_keys_per_iter, _stream};
// To keep track of the starting key of an iteration
size_t key_start = 0;
// Until all keys are processed
while (key_start < num_keys) {
// Number of keys processed in this iteration
auto const num_keys_this_iter = std::min<size_t>(num_keys_per_iter, num_keys - key_start);
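    // Compute the per-page nesting size for each (column, depth, page) key in this batch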
thrust::transform(
rmm::exec_policy_nosync(_stream),
thrust::make_counting_iterator<size_t>(key_start),
thrust::make_counting_iterator<size_t>(key_start + num_keys_this_iter),
size_input.begin(),
get_page_nesting_size{
d_cols_info.data(), max_depth, subpass.pages.size(), subpass.pages.device_begin()});
auto const reduction_keys = cudf::detail::make_counting_transform_iterator(
key_start, get_reduction_key{subpass.pages.size()});

    // Find the size of each column. num_keys_per_iter is a multiple of
    // subpass.pages.size(), so reduction segments never straddle an iteration
    // boundary and results can be written directly at this iteration's offset.
thrust::reduce_by_key(rmm::exec_policy_nosync(_stream),
reduction_keys,
reduction_keys + num_keys_this_iter,
size_input.cbegin(),
thrust::make_discard_iterator(),
sizes.d_begin() + (key_start / subpass.pages.size()));

// For nested hierarchies, compute per-page start offset
thrust::exclusive_scan_by_key(rmm::exec_policy_nosync(_stream),
reduction_keys,
reduction_keys + num_keys_this_iter,
size_input.cbegin(),
start_offset_output_iterator{subpass.pages.device_begin(),
key_start,
d_cols_info.data(),
max_depth,
subpass.pages.size()});
    // Advance to the first key of the next iteration
key_start += num_keys_this_iter;
}

sizes.device_to_host_sync(_stream);
for (size_type idx = 0; idx < static_cast<size_type>(_input_columns.size()); idx++) {
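For illustration, below is a minimal self-contained sketch of the batching pattern this diff introduces, written against plain Thrust rather than cudf internals; all names here (key_to_segment, keys_per_segment, and so on) are hypothetical stand-ins, and the per-page offset scan from the diff is omitted for brevity. The key idea is that the per-iteration cap is rounded down to a whole number of segments, so no reduce_by_key segment straddles an iteration boundary and each iteration's results can be written at a fixed offset into the output.

#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/reduce.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <limits>

// Maps a flat key index to its reduction segment (one segment per column here,
// standing in for one segment per (column, depth) pair in the real code).
struct key_to_segment {
  std::size_t keys_per_segment;
  __host__ __device__ std::size_t operator()(std::size_t key) const
  {
    return key / keys_per_segment;
  }
};

int main()
{
  std::size_t const num_segments     = 6;       // e.g. input columns * max_depth
  std::size_t const keys_per_segment = 10'000;  // e.g. pages in the subpass
  std::size_t const num_keys         = num_segments * keys_per_segment;

  // Cap one iteration at roughly 1 billion keys, rounded down to a whole
  // number of segments so no segment straddles an iteration boundary.
  std::size_t const max_keys_per_iter = std::numeric_limits<std::int32_t>::max() / 2;
  std::size_t const keys_per_iter =
    num_keys < max_keys_per_iter
      ? num_keys
      : keys_per_segment * std::max<std::size_t>(1, max_keys_per_iter / keys_per_segment);

  thrust::device_vector<std::size_t> sums(num_segments);

  for (std::size_t key_start = 0; key_start < num_keys; key_start += keys_per_iter) {
    std::size_t const n = std::min(keys_per_iter, num_keys - key_start);
    auto keys           = thrust::make_transform_iterator(
      thrust::make_counting_iterator<std::size_t>(key_start), key_to_segment{keys_per_segment});
    // Each key contributes 1, so every segment reduces to keys_per_segment.
    // Because segments align with iteration boundaries, the output offset is exact.
    thrust::reduce_by_key(thrust::device,
                          keys,
                          keys + n,
                          thrust::make_constant_iterator<std::size_t>(1),
                          thrust::make_discard_iterator(),
                          sums.begin() + key_start / keys_per_segment);
  }
  return 0;
}

Capping each iteration near std::numeric_limits<size_type>::max() / 2 (about 1.07 billion keys) keeps index arithmetic comfortably inside the signed 32-bit range that Thrust algorithms use internally, which appears to be the motivation for the 1B limit in the PR title.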