diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu
index 8cab68ea721..5138a92ac14 100644
--- a/cpp/src/io/parquet/reader_impl_preprocess.cu
+++ b/cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -44,6 +44,7 @@
 #include
 #include
+#include
 #include

 namespace cudf::io::parquet::detail {
@@ -1592,36 +1593,68 @@ void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num
   auto const d_cols_info = cudf::detail::make_device_uvector_async(
     h_cols_info, _stream, cudf::get_current_device_resource_ref());

-  auto const num_keys = _input_columns.size() * max_depth * subpass.pages.size();
-  // size iterator. indexes pages by sorted order
-  rmm::device_uvector<size_t> size_input{num_keys, _stream};
-  thrust::transform(
-    rmm::exec_policy(_stream),
-    thrust::make_counting_iterator(0),
-    thrust::make_counting_iterator(num_keys),
-    size_input.begin(),
-    get_page_nesting_size{
-      d_cols_info.data(), max_depth, subpass.pages.size(), subpass.pages.device_begin()});
-  auto const reduction_keys =
-    cudf::detail::make_counting_transform_iterator(0, get_reduction_key{subpass.pages.size()});
+  // Vector to store page sizes for each column at each depth
   cudf::detail::hostdevice_vector<size_t> sizes{_input_columns.size() * max_depth, _stream};
-  // find the size of each column
-  thrust::reduce_by_key(rmm::exec_policy(_stream),
-                        reduction_keys,
-                        reduction_keys + num_keys,
-                        size_input.cbegin(),
-                        thrust::make_discard_iterator(),
-                        sizes.d_begin());
-
-  // for nested hierarchies, compute per-page start offset
-  thrust::exclusive_scan_by_key(
-    rmm::exec_policy(_stream),
-    reduction_keys,
-    reduction_keys + num_keys,
-    size_input.cbegin(),
-    start_offset_output_iterator{
-      subpass.pages.device_begin(), 0, d_cols_info.data(), max_depth, subpass.pages.size()});
+  // Total number of keys to process
+  auto const num_keys = _input_columns.size() * max_depth * subpass.pages.size();
+
+  // Maximum 1 billion keys processed per iteration
+  auto constexpr max_keys_per_iter =
+    static_cast<size_t>(std::numeric_limits<size_type>::max() / 2);
+
+  // Number of keys per column
+  auto const num_keys_per_col = max_depth * subpass.pages.size();
+
+  // The largest multiple of `num_keys_per_col` that is <= `num_keys`
+  auto const num_keys_per_iter =
+    num_keys <= max_keys_per_iter
+      ? num_keys
+      : num_keys_per_col * std::max<size_t>(1, max_keys_per_iter / num_keys_per_col);
+
+  // Size iterator. Indexes pages by sorted order
+  rmm::device_uvector<size_t> size_input{num_keys_per_iter, _stream};
+
+  // To keep track of the starting key of an iteration
+  size_t key_start = 0;
+  // Loop until all keys are processed
+  while (key_start < num_keys) {
+    // Number of keys processed in this iteration
+    auto const num_keys_this_iter = std::min(num_keys_per_iter, num_keys - key_start);
+    thrust::transform(
+      rmm::exec_policy_nosync(_stream),
+      thrust::make_counting_iterator(key_start),
+      thrust::make_counting_iterator(key_start + num_keys_this_iter),
+      size_input.begin(),
+      get_page_nesting_size{
+        d_cols_info.data(), max_depth, subpass.pages.size(), subpass.pages.device_begin()});
+
+    // Manually create a counting_transform_iterator compatible with the int64_t `key_start` to
+    // avoid implicit casting to size_type.
+    auto const reduction_keys = thrust::make_transform_iterator(
+      thrust::make_counting_iterator(key_start), get_reduction_key{subpass.pages.size()});
+
+    // Find the size of each column
+    thrust::reduce_by_key(rmm::exec_policy_nosync(_stream),
+                          reduction_keys,
+                          reduction_keys + num_keys_this_iter,
+                          size_input.cbegin(),
+                          thrust::make_discard_iterator(),
+                          sizes.d_begin() + (key_start / subpass.pages.size()));
+
+    // For nested hierarchies, compute per-page start offset
+    thrust::exclusive_scan_by_key(rmm::exec_policy_nosync(_stream),
+                                  reduction_keys,
+                                  reduction_keys + num_keys_this_iter,
+                                  size_input.cbegin(),
+                                  start_offset_output_iterator{subpass.pages.device_begin(),
+                                                               key_start,
+                                                               d_cols_info.data(),
+                                                               max_depth,
+                                                               subpass.pages.size()});
+    // Increment `key_start` by the number of keys processed in this iteration
+    key_start += num_keys_this_iter;
+  }

   sizes.device_to_host_sync(_stream);
   for (size_type idx = 0; idx < static_cast<size_type>(_input_columns.size()); idx++) {
diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp
index 4a5309f3ba7..ab4645c2e25 100644
--- a/cpp/tests/io/parquet_reader_test.cpp
+++ b/cpp/tests/io/parquet_reader_test.cpp
@@ -2724,3 +2724,38 @@ TYPED_TEST(ParquetReaderPredicatePushdownTest, FilterTyped)
   EXPECT_EQ(result_table.num_columns(), expected->num_columns());
   CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result_table);
 }
+
+TEST_F(ParquetReaderTest, ListsWideTable)
+{
+  auto constexpr num_rows = 2;
+  auto constexpr num_cols = 26'755;  // for slightly over 2B keys
+  auto constexpr seed     = 0xceed;
+
+  std::mt19937 engine{seed};
+
+  auto list_list       = make_parquet_list_list_col<int>(0, num_rows, 1, 1, false);
+  auto list_list_nulls = make_parquet_list_list_col<int>(0, num_rows, 1, 1, true);
+
+  // switch between nullable and non-nullable
+  std::vector<cudf::column_view> cols(num_cols);
+  bool with_nulls = false;
+  std::generate_n(cols.begin(), num_cols, [&]() {
+    auto const view = with_nulls ? list_list_nulls->view() : list_list->view();
+    with_nulls      = not with_nulls;
+    return view;
+  });
+
+  cudf::table_view expected(cols);
+
+  // Use a host buffer for faster I/O
+  std::vector<char> buffer;
+  auto const out_opts =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&buffer}, expected).build();
+  cudf::io::write_parquet(out_opts);
+
+  cudf::io::parquet_reader_options default_in_opts =
+    cudf::io::parquet_reader_options::builder(cudf::io::source_info(buffer.data(), buffer.size()));
+  auto const [result, _] = cudf::io::read_parquet(default_in_opts);
+
+  CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result->view());
+}
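
Note on the approach above (illustrative, not part of the patch): the loop in allocate_columns bounds each reduce_by_key / exclusive_scan_by_key call to a batch of keys that is both below the 32-bit indexing limit and a whole multiple of the per-column key count, so no reduction group is split across batches. The stand-alone sketch below reproduces that pattern with made-up names (keys_per_group, num_groups, group_of_key, group_sums) and a deliberately tiny per-iteration cap so the loop actually runs several times; the patch itself caps batches near std::numeric_limits<size_type>::max() / 2.

#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/reduce.h>

#include <algorithm>
#include <cstddef>
#include <cstdio>

// Maps a flat 64-bit key index to its reduction-group id, similar in spirit to
// get_reduction_key in the patch (all names here are hypothetical).
struct group_of_key {
  std::size_t keys_per_group;
  __host__ __device__ std::size_t operator()(std::size_t key) const
  {
    return key / keys_per_group;
  }
};

int main()
{
  std::size_t const keys_per_group = 1024;  // keys sharing one reduction key (pages per column in the patch)
  std::size_t const num_groups     = 4096;  // e.g. number of (column, depth) pairs
  std::size_t const total_keys     = keys_per_group * num_groups;

  // Deliberately tiny cap so this demo takes several iterations; the patch uses
  // std::numeric_limits<size_type>::max() / 2 so Thrust's 32-bit indexing cannot overflow.
  std::size_t const max_keys_per_iter = 1'000'000;

  // Largest multiple of keys_per_group that fits under the cap, so no group straddles a batch.
  auto const keys_per_iter =
    total_keys <= max_keys_per_iter
      ? total_keys
      : keys_per_group * std::max<std::size_t>(1, max_keys_per_iter / keys_per_group);

  thrust::device_vector<std::size_t> group_sums(num_groups, 0);

  for (std::size_t key_start = 0; key_start < total_keys; key_start += keys_per_iter) {
    auto const keys_this_iter = std::min(keys_per_iter, total_keys - key_start);

    // 64-bit counting iterator: the key index itself never passes through a 32-bit type.
    auto const group_ids = thrust::make_transform_iterator(
      thrust::make_counting_iterator(key_start), group_of_key{keys_per_group});
    auto const ones = thrust::make_constant_iterator<std::size_t>(1);

    // Every key contributes 1, so each group sum must end up equal to keys_per_group.
    // The output is offset by the number of groups completed in earlier iterations.
    thrust::reduce_by_key(thrust::device,
                          group_ids,
                          group_ids + keys_this_iter,
                          ones,
                          thrust::make_discard_iterator(),
                          group_sums.begin() + (key_start / keys_per_group));
  }

  std::printf("group 0: %zu, group %zu: %zu (expected %zu each)\n",
              static_cast<std::size_t>(group_sums.front()),
              num_groups - 1,
              static_cast<std::size_t>(group_sums.back()),
              keys_per_group);
  return 0;
}

The output offset (key_start / keys_per_group) is valid only because every batch ends exactly on a group boundary, which is the same invariant num_keys_per_iter enforces in the patch by being a multiple of num_keys_per_col.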