Skip to content
45 changes: 27 additions & 18 deletions cpp/src/groupby/hash/output_utils.cu
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <cudf/aggregation.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/null_mask.hpp>
#include <cudf/table/table_view.hpp>
Expand All @@ -30,6 +31,8 @@ namespace {
*
* This functor handles the creation of appropriately typed and sized columns for each
* aggregation, including special handling for SUM_WITH_OVERFLOW which requires a struct column.
* For data types smaller than 4 bytes, the buffer size is adjusted to be a multiple of 4 to
* ensure memory safety when atomic operations use 4-byte CAS loops to emulate smaller atomics.
*/
struct result_column_creator {
size_type output_size;
Expand All @@ -45,29 +48,35 @@ struct result_column_creator {

std::unique_ptr<column> operator()(column_view const& col, aggregation::Kind const& agg) const
{
auto const col_type =
is_dictionary(col.type()) ? dictionary_column_view(col).keys().type() : col.type();
auto const nullable =
agg != aggregation::COUNT_VALID && agg != aggregation::COUNT_ALL && col.has_nulls();

// Special handling for SUM_WITH_OVERFLOW which needs a struct column.
// TODO: Remove adjusted buffer size workaround once https://github.com/NVIDIA/cccl/issues/6430
// is fixed. Use adjusted buffer size for small data types to ensure atomic operation safety.
auto const make_uninitialized_column = [&](data_type d_type, size_type size, mask_state state) {
auto const type_size = cudf::size_of(d_type);
if (type_size < 4) {
auto adjusted_size = cudf::util::round_up_safe(size, static_cast<size_type>(4));
auto buffer = rmm::device_buffer(adjusted_size * type_size, stream, mr);
auto mask = create_null_mask(size, state, stream, mr);
auto const null_count = state_null_count(state, size);
return std::make_unique<column>(
d_type, size, std::move(buffer), std::move(mask), null_count);
}
return make_fixed_width_column(d_type, size, state, stream, mr);
};
if (agg != aggregation::SUM_WITH_OVERFLOW) {
auto const col_type =
is_dictionary(col.type()) ? dictionary_column_view(col).keys().type() : col.type();
auto const mask_flag = nullable ? mask_state::ALL_NULL : mask_state::UNALLOCATED;
return make_fixed_width_column(
cudf::detail::target_type(col_type, agg), output_size, mask_flag, stream, mr);
auto const target_type = cudf::detail::target_type(col_type, agg);
auto const mask_flag = nullable ? mask_state::ALL_NULL : mask_state::UNALLOCATED;
return make_uninitialized_column(target_type, output_size, mask_flag);
}

auto const make_empty_column = [&](type_id type_id, size_type size, mask_state mask_state) {
return make_fixed_width_column(data_type{type_id}, size, mask_state, stream, mr);
};

auto make_children = [&make_empty_column](size_type size) {
auto make_children = [&make_uninitialized_column](size_type size) {
std::vector<std::unique_ptr<column>> children;
// Create sum child column (int64_t) - no null mask needed, struct-level mask handles
// nullability
children.push_back(make_empty_column(type_id::INT64, size, mask_state::UNALLOCATED));
// Create overflow child column (bool) - no null mask needed, only value matters
children.push_back(make_empty_column(type_id::BOOL8, size, mask_state::UNALLOCATED));
children.push_back(
make_uninitialized_column(data_type{type_id::INT64}, size, mask_state::UNALLOCATED));
children.push_back(
make_uninitialized_column(data_type{type_id::BOOL8}, size, mask_state::UNALLOCATED));
return children;
};

Expand Down