Skip to content

Commit

Permalink
Optimize implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Nghia Truong <[email protected]>
  • Loading branch information
ttnghia committed Sep 30, 2024
1 parent c19a3df commit 91769c7
Showing 1 changed file with 61 additions and 48 deletions.
109 changes: 61 additions & 48 deletions src/main/cpp/src/json_utils.cu
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cudf/strings/strings_column_view.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <cub/device/device_histogram.cuh>
Expand All @@ -41,62 +42,63 @@ std::tuple<std::unique_ptr<cudf::column>, std::unique_ptr<rmm::device_buffer>, c
rmm::device_async_resource_ref mr)
{
auto const d_input_ptr = cudf::column_device_view::create(input.parent(), stream);
auto const default_mr = rmm::mr::get_current_device_resource();

rmm::device_uvector<bool> is_valid_input(input.size(), stream);
// Check if the input rows are either null, equal to `null` string literal, or empty.
// This will be used for masking out the input when doing string concatenation.
rmm::device_uvector<bool> is_valid_input(input.size(), stream, default_mr);

// Check if the input rows are either null or empty.
// This will be returned to the caller.
rmm::device_uvector<bool> is_null_or_empty(input.size(), stream, mr);
rmm::device_scalar<char> first_char(stream);

thrust::transform(
rmm::exec_policy(stream),
rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(input.size()),
thrust::make_zip_iterator(thrust::make_tuple(is_valid_input.begin(), is_null_or_empty.begin())),
[input = *d_input_ptr,
first_char = first_char.data()] __device__(cudf::size_type idx) -> thrust::tuple<bool, bool> {
[input = *d_input_ptr] __device__(cudf::size_type idx) -> thrust::tuple<bool, bool> {
if (input.is_null(idx)) { return {false, false}; }

auto const d_str = input.element<cudf::string_view>(idx);
int i = 0;
// Currently, only check for empty character.
constexpr char empty_char{' '};

auto const d_str = input.element<cudf::string_view>(idx);
cudf::size_type i = 0;
for (; i < d_str.size_bytes(); ++i) {
if (d_str[i] != ' ') { break; }
if (d_str[i] != empty_char) { break; }
}

bool is_null_literal{false};
if (i + 4 <= d_str.size_bytes() && d_str[i] == 'n' && d_str[i + 1] == 'u' &&
d_str[i + 2] == 'l' && d_str[i + 3] == 'l') {
if (i + 3 < d_str.size_bytes() &&
(d_str[i] == 'n' && d_str[i + 1] == 'u' && d_str[i + 2] == 'l' && d_str[i + 3] == 'l')) {
is_null_literal = true;
i += 4;
}

for (; i < d_str.size_bytes(); ++i) {
if (d_str[i] != ' ') {
if (d_str[i] != empty_char) {
is_null_literal = false;
break;
}
}

// The current row contains only `null` string literal and not any other non-empty characters.
// Such rows need to be masked out as null when doing concatenation.
if (is_null_literal) { return {false, true}; }

auto const not_empty = i + 1 < d_str.size_bytes();
if (idx == 0) { *first_char = not_empty ? d_str[i] : '\0'; }

return {not_empty, not_empty};
auto const not_eol = i < d_str.size_bytes();
return {not_eol, not_eol};
});
auto [null_mask, null_count] = cudf::detail::valid_if(
is_valid_input.begin(), is_valid_input.end(), thrust::identity{}, stream, mr);

auto constexpr num_levels = 256;
auto constexpr lower_level = std::numeric_limits<char>::min();
auto constexpr upper_level = std::numeric_limits<char>::max();

char h_first_char;
CUDF_CUDA_TRY(cudaMemcpyAsync(
&h_first_char, first_char.data(), sizeof(char), cudaMemcpyDefault, stream.value()));
auto const num_chars = input.chars_size(stream); // stream sync

// TODO: return when num_chars==0
auto const num_chars = input.chars_size(stream);

rmm::device_uvector<uint32_t> d_histogram(num_levels, stream);
thrust::uninitialized_fill(rmm::exec_policy(stream), d_histogram.begin(), d_histogram.end(), 0);
thrust::uninitialized_fill(
rmm::exec_policy_nosync(stream), d_histogram.begin(), d_histogram.end(), 0);

size_t temp_storage_bytes = 0;
cub::DeviceHistogram::HistogramEven(nullptr,
Expand All @@ -120,37 +122,48 @@ std::tuple<std::unique_ptr<cudf::column>, std::unique_ptr<rmm::device_buffer>, c
stream.value());

auto const zero_level = d_histogram.begin() - lower_level;
auto const first_zero_count_pos =
thrust::find(rmm::exec_policy(stream), zero_level + '\n', d_histogram.end(), 0);

// Firstly, search from the `\n` character to the end of the histogram,
// so the delimiter will be `\n` if it doesn't exist in the input.
auto first_zero_count_pos =
thrust::find(rmm::exec_policy_nosync(stream), zero_level + '\n', d_histogram.end(), 0);
if (first_zero_count_pos == d_histogram.end()) {
throw std::logic_error(
"can't find a character suitable as delimiter for combining json strings to json lines with "
"custom delimiter");
// Try again, but search from the beginning of the histogram to the last begin position.
first_zero_count_pos =
thrust::find(rmm::exec_policy_nosync(stream), d_histogram.begin(), zero_level + '\n', 0);

// This should never happen, since we are searching even with the characters starting from `\0`.
if (first_zero_count_pos == d_histogram.end()) {
throw std::logic_error(
"Cannot find any character suitable as delimiter during joining json strings.");
}
}
auto const first_non_existing_char = static_cast<char>(first_zero_count_pos - zero_level);

auto [null_mask, null_count] = cudf::detail::valid_if(
is_valid_input.begin(), is_valid_input.end(), thrust::identity{}, stream, default_mr);
// If the null count doesn't change, that mean we do not have any rows containing `null` string
// literal or empty rows. In such cases, just use the input column for concatenation.
auto const input_applied_null =
null_count == 0
? nullptr
: cudf::purge_nonempty_nulls(
cudf::column_view{cudf::data_type{cudf::type_id::STRING},
input.size(),
input.chars_begin(stream),
reinterpret_cast<cudf::bitmask_type const*>(null_mask.data()),
null_count,
0,
std::vector<cudf::column_view>{input.offsets()}},
stream);

auto const first_non_existing_char = first_zero_count_pos - zero_level;
auto all_done = cudf::strings::detail::join_strings(
null_count == 0 ? input : cudf::strings_column_view{input_applied_null->view()},
cudf::string_scalar(std::string(1, first_non_existing_char), true, stream, mr),
cudf::string_scalar(h_first_char == '[' ? "[]" : "{}", true, stream, mr),
null_count == input.null_count()
? cudf::column_view{}
: cudf::column_view{cudf::data_type{cudf::type_id::STRING},
input.size(),
input.chars_begin(stream),
reinterpret_cast<cudf::bitmask_type const*>(null_mask.data()),
null_count,
0,
std::vector<cudf::column_view>{input.offsets()}};

auto concat_strings = cudf::strings::detail::join_strings(
null_count == input.null_count() ? input : cudf::strings_column_view{input_applied_null},
cudf::string_scalar(std::string(1, first_non_existing_char), true, stream, default_mr),
cudf::string_scalar("{}", true, stream, default_mr),
stream,
mr);

return {std::make_unique<cudf::column>(std::move(is_null_or_empty), rmm::device_buffer{}, 0),
std::move(all_done->release().data),
std::move(concat_strings->release().data),
first_non_existing_char};
}

Expand Down

0 comments on commit 91769c7

Please sign in to comment.