Complete make_strings_column_batch
Signed-off-by: Nghia Truong <[email protected]>
ttnghia committed Oct 9, 2024
1 parent bd51a25 commit e252fb4
Showing 7 changed files with 551 additions and 49 deletions.
1 change: 1 addition & 0 deletions cpp/benchmarks/CMakeLists.txt
@@ -383,6 +383,7 @@ ConfigureNVBench(
string/join_strings.cpp
string/lengths.cpp
string/like.cpp
string/make_strings_column.cu
string/replace_re.cpp
string/reverse.cpp
string/slice.cpp
117 changes: 117 additions & 0 deletions cpp/benchmarks/string/make_strings_column.cu
@@ -0,0 +1,117 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <benchmarks/common/generate_input.hpp>

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>

#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/pair.h>
#include <thrust/tabulate.h>

#include <nvbench/nvbench.cuh>

#include <vector>

constexpr int min_row_width = 0;
constexpr int max_row_width = 50;

static void BM_make_strings_column(nvbench::state& state)
{
auto const num_rows = static_cast<cudf::size_type>(state.get_int64("num_rows"));
auto const has_nulls = static_cast<bool>(state.get_int64("has_nulls"));

data_profile const table_profile =
data_profile_builder()
.distribution(cudf::type_id::STRING, distribution_id::NORMAL, min_row_width, max_row_width)
.null_probability(has_nulls ? std::optional<double>{0.1} : std::nullopt);
auto const data_table =
create_random_table({cudf::type_id::STRING}, row_count{num_rows}, table_profile);

using string_index_pair = thrust::pair<char const*, cudf::size_type>;
auto const stream = cudf::get_default_stream();
auto input = rmm::device_uvector<string_index_pair>(data_table->num_rows(), stream);
auto const d_data_ptr =
cudf::column_device_view::create(data_table->get_column(0).view(), stream);
thrust::tabulate(rmm::exec_policy(stream),
input.begin(),
input.end(),
[data_col = *d_data_ptr] __device__(auto const idx) {
if (data_col.is_null(idx)) { return string_index_pair{nullptr, 0}; }
auto const row = data_col.element<cudf::string_view>(idx);
return string_index_pair{row.data(), row.size_bytes()};
});

state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value()));
state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
[[maybe_unused]] auto const output = cudf::make_strings_column(input, stream);
});
}

static void BM_make_strings_column_batch(nvbench::state& state)
{
auto const num_rows = static_cast<cudf::size_type>(state.get_int64("num_rows"));
auto const has_nulls = static_cast<bool>(state.get_int64("has_nulls"));
auto const batch_size = static_cast<cudf::size_type>(state.get_int64("batch_size"));

data_profile const table_profile =
data_profile_builder()
.distribution(cudf::type_id::STRING, distribution_id::NORMAL, min_row_width, max_row_width)
.null_probability(has_nulls ? std::optional<double>{0.1} : std::nullopt);
auto const data_table = create_random_table(
cycle_dtypes({cudf::type_id::STRING}, batch_size), row_count{num_rows}, table_profile);

using string_index_pair = thrust::pair<char const*, cudf::size_type>;
auto const stream = cudf::get_default_stream();
auto input_data = std::vector<rmm::device_uvector<string_index_pair>>{};
auto input = std::vector<cudf::device_span<string_index_pair const>>{};
input_data.reserve(batch_size);
input.reserve(batch_size);
for (auto i = 0; i < batch_size; ++i) {
auto const d_data_ptr =
cudf::column_device_view::create(data_table->get_column(i).view(), stream);
auto batch_input = rmm::device_uvector<string_index_pair>(data_table->num_rows(), stream);
thrust::tabulate(rmm::exec_policy(stream),
batch_input.begin(),
batch_input.end(),
[data_col = *d_data_ptr] __device__(auto const idx) {
if (data_col.is_null(idx)) { return string_index_pair{nullptr, 0}; }
auto const row = data_col.element<cudf::string_view>(idx);
return string_index_pair{row.data(), row.size_bytes()};
});
input_data.emplace_back(std::move(batch_input));
input.emplace_back(input_data.back());
}

state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value()));
state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
[[maybe_unused]] auto const output = cudf::make_strings_column_batch(input, stream);
});
}

NVBENCH_BENCH(BM_make_strings_column)
.set_name("make_strings_column")
.add_int64_axis("num_rows", {100'000, 1'000'000, 10'000'000, 100'000'000, 200'000'000})
.add_int64_axis("has_nulls", {0, 1});

NVBENCH_BENCH(BM_make_strings_column_batch)
.set_name("make_strings_column_batch")
.add_int64_axis("num_rows", {1'000'000, 10'000'000, 20'000'000})
.add_int64_axis("has_nulls", {0, 1})
.add_int64_axis("batch_size", {10, 50, 100, 200});
19 changes: 19 additions & 0 deletions cpp/include/cudf/column/column_factories.hpp
@@ -378,6 +378,25 @@ std::unique_ptr<column> make_strings_column(
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
* @brief Construct a batch of STRING type columns given an array of device spans of
* pointer/size pairs
*
* This function has input/output expectations similar to the `make_strings_column()` API that
* accepts a single device span of pointer/size pairs. The difference is that this API is designed
* to create many strings columns at once, minimizing the overhead of multiple kernel launches and
* stream synchronizations.
*
* @param input Array of device spans of pointer/size pairs, where each pointer is a device memory
* address or `nullptr` (indicating a null string), and size is the string length (in bytes)
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used for memory allocation of the output columns
* @return Array of constructed strings columns
*/
std::vector<std::unique_ptr<column>> make_strings_column_batch(
std::vector<cudf::device_span<thrust::pair<char const*, size_type> const>> const& input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
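
For illustration, a minimal usage sketch of the new batch API (hypothetical example, not part of this commit): both the pair arrays and the character data they point to must reside in device memory, and the helper name and string values below are made up.

// Hypothetical usage sketch for make_strings_column_batch; not part of this commit.
#include <cudf/column/column_factories.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/device_buffer.hpp>
#include <rmm/device_uvector.hpp>

#include <thrust/pair.h>

#include <cuda_runtime.h>

#include <memory>
#include <string>
#include <vector>

std::vector<std::unique_ptr<cudf::column>> build_two_columns()
{
  using string_index_pair = thrust::pair<char const*, cudf::size_type>;
  auto const stream = cudf::get_default_stream();

  // Device copy of the character data backing both columns.
  std::string const h_chars = "helloworld";
  rmm::device_buffer d_chars(h_chars.data(), h_chars.size(), stream);
  auto const* base = static_cast<char const*>(d_chars.data());

  // Column 0: ["hello", <null>]; Column 1: ["world"]. A null row is a {nullptr, 0} pair.
  std::vector<string_index_pair> const h_col0{{base, 5}, {nullptr, 0}};
  std::vector<string_index_pair> const h_col1{{base + 5, 5}};

  // Copy the pair arrays to device memory.
  auto d_col0 = rmm::device_uvector<string_index_pair>(h_col0.size(), stream);
  auto d_col1 = rmm::device_uvector<string_index_pair>(h_col1.size(), stream);
  cudaMemcpyAsync(d_col0.data(), h_col0.data(), h_col0.size() * sizeof(string_index_pair),
                  cudaMemcpyDefault, stream.value());
  cudaMemcpyAsync(d_col1.data(), h_col1.data(), h_col1.size() * sizeof(string_index_pair),
                  cudaMemcpyDefault, stream.value());

  // Build both strings columns with a single batched call.
  std::vector<cudf::device_span<string_index_pair const>> const input{d_col0, d_col1};
  return cudf::make_strings_column_batch(input, stream);
}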

/**
* @brief Construct a STRING type column given a device span of string_view.
*
68 changes: 68 additions & 0 deletions cpp/include/cudf/detail/valid_if.cuh
@@ -18,6 +18,7 @@

#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>
#include <cudf/utilities/default_stream.hpp>
@@ -188,5 +189,72 @@ CUDF_KERNEL void valid_if_n_kernel(InputIterator1 begin1,
}
}

/**
* @brief Populates a batch of bitmasks by applying a predicate to each element of the
* corresponding input span.
*
* Given a batch of input spans, the state of bit `j` in mask `i` is determined by
* `p(input[i][j])`. If the predicate evaluates to true, the bit is set to `1`; otherwise it is
* set to `0`. The number of valid (set) bits in mask `i` is accumulated into `valid_counts[i]`.
*
* Example Arguments:
* input: [[x0, x1], [y0, y1]]
* p: predicate returning true for valid elements
* masks: [[b00...], [b00...]]
* valid_counts: [0, 0]
*
* Example Results (when only x0 and y1 satisfy `p`):
* masks: [[b10...], [b01...]]
* valid_counts: [1, 1]
*
* @param input Batch of device spans whose elements are evaluated by the predicate
* @param p Predicate: `bit = p(input[mask_idx][bit_idx])`
* @param masks Bitmasks for which bits will be computed and assigned, one per input span
* @param valid_counts Accumulates the total number of valid bits for each mask; must be
* zero-initialized by the caller
*/
template <int32_t block_size, typename ElementType, typename BinaryPredicate>
CUDF_KERNEL void valid_if_batch_kernel(device_span<device_span<ElementType const> const> input,
BinaryPredicate p,
bitmask_type* const* masks,
size_type* valid_counts)
{
for (std::size_t mask_idx = 0; mask_idx < input.size(); ++mask_idx) {
auto const mask_input = input[mask_idx];
auto const mask_num_bits = mask_input.size();
auto const out_mask = masks[mask_idx];

std::size_t block_offset{blockIdx.x * blockDim.x};
size_type warp_valid_count{0};
while (block_offset < mask_num_bits) {
auto const thread_idx = block_offset + threadIdx.x;
auto const thread_active = thread_idx < mask_num_bits;
auto const bit_is_valid = thread_active && p(mask_input[thread_idx]);
auto const warp_validity = __ballot_sync(0xffff'ffffu, bit_is_valid);

if (thread_active && threadIdx.x % warp_size == 0) {
out_mask[word_index(thread_idx)] = warp_validity;
}

warp_valid_count += __popc(warp_validity);
block_offset += blockDim.x * gridDim.x;
}

auto const block_valid_count = single_lane_block_sum_reduce<block_size, 0>(warp_valid_count);
if (threadIdx.x == 0) { atomicAdd(valid_counts + mask_idx, block_valid_count); }
}
}

} // namespace detail
} // namespace cudf
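
For illustration, a hedged host-side sketch of how this kernel might be launched for the string_index_pair use case. The library's actual call site is not part of this diff; the helper name `launch_valid_if_batch`, the 256-thread block size, and the predicate below are assumptions for the sketch, which must be compiled as CUDA code.

// Illustrative launch sketch only; not the library's actual call site.
// The caller is assumed to have prepared, in device memory: one span of
// string_index_pairs per column, one destination bitmask per column, and a
// zero-initialized valid-count array (the kernel accumulates via atomicAdd).
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/valid_if.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <thrust/pair.h>

using string_index_pair = thrust::pair<char const*, cudf::size_type>;

void launch_valid_if_batch(
  cudf::device_span<cudf::device_span<string_index_pair const> const> d_input,
  cudf::bitmask_type* const* d_masks,
  cudf::size_type* d_valid_counts,
  cudf::size_type max_rows,  // row count of the largest column in the batch
  rmm::cuda_stream_view stream)
{
  constexpr int32_t block_size = 256;  // illustrative block size

  // Size the grid for the largest column; the kernel strides over each column's rows.
  cudf::detail::grid_1d const grid{max_rows, block_size};

  // A row is valid when its character pointer is not null.
  auto const pred = [] __device__(string_index_pair const item) {
    return item.first != nullptr;
  };

  cudf::detail::valid_if_batch_kernel<block_size>
    <<<grid.num_blocks, block_size, 0, stream.value()>>>(
      d_input, pred, d_masks, d_valid_counts);
}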
92 changes: 58 additions & 34 deletions cpp/include/cudf/strings/detail/strings_column_factories.cuh
@@ -59,6 +59,61 @@ using string_index_pair = thrust::pair<char const*, size_type>;
*/
constexpr size_type FACTORY_BYTES_PER_ROW_THRESHOLD = 64;

/**
* @brief Gather the characters for a strings column from the given string_index_pair iterator
*
* @tparam IndexPairIterator iterator over type `pair<char const*,size_type>` values
*
* @param offsets The offsets for the output strings column
* @param chars_size The size (in bytes) of the chars data
* @param avg_bytes_per_row The average bytes per row, used to choose between the
* character-parallel and the row-parallel copy paths
* @param begin Iterator to the first string_index_pair
* @param strings_count The number of strings
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned data's device memory
* @return The characters gathered from the input string_index_pair iterator
*/
template <typename IndexPairIterator>
rmm::device_uvector<char> make_chars_column(column_view const& offsets,
int64_t chars_size,
int64_t avg_bytes_per_row,
IndexPairIterator begin,
size_type strings_count,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto const d_offsets = cudf::detail::offsetalator_factory::make_input_iterator(offsets);

// use a character-parallel kernel for long string lengths
if (avg_bytes_per_row > FACTORY_BYTES_PER_ROW_THRESHOLD) {
auto const str_begin = thrust::make_transform_iterator(
begin, cuda::proclaim_return_type<string_view>([] __device__(auto ip) {
return string_view{ip.first, ip.second};
}));

return gather_chars(str_begin,
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(strings_count),
d_offsets,
chars_size,
stream,
mr);
}
// this approach is 2-3x faster for a large number of smaller string lengths
auto chars_data = rmm::device_uvector<char>(chars_size, stream, mr);
auto d_chars = chars_data.data();
auto copy_chars = [d_chars] __device__(auto item) {
string_index_pair const str = thrust::get<0>(item);
int64_t const offset = thrust::get<1>(item);
if (str.first != nullptr) memcpy(d_chars + offset, str.first, str.second);
};
thrust::for_each_n(rmm::exec_policy_nosync(stream),
thrust::make_zip_iterator(thrust::make_tuple(begin, d_offsets)),
strings_count,
copy_chars);
return chars_data;
}

/**
* @brief Create a strings-type column from iterators of pointer/size pairs
*
Expand Down Expand Up @@ -88,8 +143,6 @@ std::unique_ptr<column> make_strings_column(IndexPairIterator begin,
auto offsets_transformer_itr = thrust::make_transform_iterator(begin, offsets_transformer);
auto [offsets_column, bytes] = cudf::strings::detail::make_offsets_child_column(
offsets_transformer_itr, offsets_transformer_itr + strings_count, stream, mr);
auto const d_offsets =
cudf::detail::offsetalator_factory::make_input_iterator(offsets_column->view());

// create null mask
auto validator = [] __device__(string_index_pair const item) { return item.first != nullptr; };
@@ -99,38 +152,9 @@
(null_count > 0) ? std::move(new_nulls.first) : rmm::device_buffer{0, stream, mr};

// build chars column
auto chars_data = [d_offsets, bytes = bytes, begin, strings_count, null_count, stream, mr] {
auto const avg_bytes_per_row = bytes / std::max(strings_count - null_count, 1);
// use a character-parallel kernel for long string lengths
if (avg_bytes_per_row > FACTORY_BYTES_PER_ROW_THRESHOLD) {
auto const str_begin = thrust::make_transform_iterator(
begin, cuda::proclaim_return_type<string_view>([] __device__(auto ip) {
return string_view{ip.first, ip.second};
}));

return gather_chars(str_begin,
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(strings_count),
d_offsets,
bytes,
stream,
mr);
} else {
// this approach is 2-3x faster for a large number of smaller string lengths
auto chars_data = rmm::device_uvector<char>(bytes, stream, mr);
auto d_chars = chars_data.data();
auto copy_chars = [d_chars] __device__(auto item) {
string_index_pair const str = thrust::get<0>(item);
int64_t const offset = thrust::get<1>(item);
if (str.first != nullptr) memcpy(d_chars + offset, str.first, str.second);
};
thrust::for_each_n(rmm::exec_policy(stream),
thrust::make_zip_iterator(thrust::make_tuple(begin, d_offsets)),
strings_count,
copy_chars);
return chars_data;
}
}();
auto const avg_bytes_per_row = bytes / std::max(strings_count - null_count, 1);
auto chars_data = make_chars_column(
offsets_column->view(), bytes, avg_bytes_per_row, begin, strings_count, stream, mr);

return make_strings_column(strings_count,
std::move(offsets_column),