Skip to content

Commit

Permalink
Implement concat_json to join JSON strings given by strings column (#…
Browse files Browse the repository at this point in the history
…2457)

* add concat_jsons

* Fix compile error

Signed-off-by: Nghia Truong <[email protected]>

* Optimize stream sync

Signed-off-by: Nghia Truong <[email protected]>

* Fix interface

Signed-off-by: Nghia Truong <[email protected]>

* Add JNI binding

Signed-off-by: Nghia Truong <[email protected]>

* Change delimiter Java type

Signed-off-by: Nghia Truong <[email protected]>

* Fix null mask

Signed-off-by: Nghia Truong <[email protected]>

* Return `is_valid` column

Signed-off-by: Nghia Truong <[email protected]>

* Separate input and output validity

Signed-off-by: Nghia Truong <[email protected]>

* Make changes in cudf's Java code

* Change `delimiter` type from `byte` to `char`, and rewrite docs

Signed-off-by: Nghia Truong <[email protected]>

* Restore source file

Signed-off-by: Nghia Truong <[email protected]>

* Rename file

Signed-off-by: Nghia Truong <[email protected]>

* Add new source file

Signed-off-by: Nghia Truong <[email protected]>

* Add file to cmake

Signed-off-by: Nghia Truong <[email protected]>

* Optimize implementation

Signed-off-by: Nghia Truong <[email protected]>

* Fix start character

Signed-off-by: Nghia Truong <[email protected]>

* Check for white space characters that are not just space character

Signed-off-by: Nghia Truong <[email protected]>

* Check for delimiter if the character is acceptable

Signed-off-by: Nghia Truong <[email protected]>

* Change `not_whitespace`

Signed-off-by: Nghia Truong <[email protected]>

* Optimize searching for delimiter in just one kernel call

Signed-off-by: Nghia Truong <[email protected]>

* Use existence map instead of histogram

Signed-off-by: Nghia Truong <[email protected]>

* Remove utf8 processing code

Signed-off-by: Nghia Truong <[email protected]>

* Fix `num_values`

Signed-off-by: Nghia Truong <[email protected]>

* Search only for 128 characters

Signed-off-by: Nghia Truong <[email protected]>

* Fix `is_null_or_empty`

Signed-off-by: Nghia Truong <[email protected]>

* Change back to use `cub::DeviceHistogram::HistogramEven`

* Implement `JSONUtils.makeStructs`

Signed-off-by: Nghia Truong <[email protected]>

* Rename variables and update docs

Signed-off-by: Nghia Truong <[email protected]>

* Misc

Signed-off-by: Nghia Truong <[email protected]>

* Add stream sync and extract code into separate functions for profiling

Signed-off-by: Nghia Truong <[email protected]>

* Revert "Add stream sync and extract code into separate functions for profiling"

This reverts commit a048801.

* Reorganize code

Signed-off-by: Nghia Truong <[email protected]>

* Revert "Reorganize code"

This reverts commit f10e73a.

* Misc

* Use one warp per row to improve performance

Signed-off-by: Nghia Truong <[email protected]>

* Optimize write

Signed-off-by: Nghia Truong <[email protected]>

* Revert "Optimize write"

This reverts commit 9af88ca.

* Reorganize code

Signed-off-by: Nghia Truong <[email protected]>

---------

Signed-off-by: Nghia Truong <[email protected]>
Co-authored-by: Karthikeyan Natarajan <[email protected]>
  • Loading branch information
ttnghia and karthikeyann authored Oct 10, 2024
1 parent 811103d commit 10fbcff
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/main/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ add_library(
src/from_json_to_raw_map.cu
src/get_json_object.cu
src/histogram.cu
src/json_utils.cu
src/murmur_hash.cu
src/parse_uri.cu
src/regex_rewrite_utils.cu
Expand Down
46 changes: 45 additions & 1 deletion src/main/cpp/src/JSONUtilsJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/

#include "cudf_jni_apis.hpp"
#include "from_json.hpp"
#include "get_json_object.hpp"
#include "json_utils.hpp"

#include <cudf/strings/strings_column_view.hpp>

Expand Down Expand Up @@ -154,4 +154,48 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_extractRawMap
}
CATCH_STD(env, 0);
}

/**
 * JNI entry point that joins all rows of a strings column into one buffer.
 *
 * `j_input` is the native address of a `cudf::column_view` holding the JSON strings. The call is
 * forwarded to `spark_rapids_jni::concat_json`, and the three results are flattened into a
 * 5-element `jlongArray` so that the Java side can take ownership of the native objects:
 *   [0] address (host memory) of the `cudf::column` returned as the first tuple element
 *   [1] address (device memory) of the concatenated character data
 *   [2] size, in bytes, of the concatenated character data
 *   [3] address (host memory) of the `rmm::device_buffer` object that owns [1]
 *   [4] the delimiter character used to join the rows
 *
 * Returns 0 (a null array) when `j_input` is null or when a C++ exception was translated into a
 * Java exception by `CATCH_STD`.
 */
JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_concatenateJsonStrings(
  JNIEnv* env, jclass, jlong j_input)
{
  JNI_NULL_CHECK(env, j_input, "j_input is null", 0);

  try {
    cudf::jni::auto_set_device(env);

    auto const& json_rows = *reinterpret_cast<cudf::column_view const*>(j_input);
    auto [row_flags, joined_buffer, delim] =
      spark_rapids_jni::concat_json(cudf::strings_column_view{json_rows});

    // Query the buffer while it is still owned by the unique_ptr; ownership is handed over to
    // the caller only in slot [3] below.
    auto const data_address = reinterpret_cast<jlong>(joined_buffer->data());
    auto const data_length  = static_cast<jlong>(joined_buffer->size());

    auto handles = cudf::jni::native_jlongArray(env, 5);
    handles[0]   = reinterpret_cast<jlong>(row_flags.release());
    handles[1]   = data_address;
    handles[2]   = data_length;
    handles[3]   = reinterpret_cast<jlong>(joined_buffer.release());
    handles[4]   = static_cast<jlong>(delim);
    return handles.get_jArray();
  }
  CATCH_STD(env, 0);
}

/**
 * JNI entry point that assembles a STRUCT column from child columns and a row-level null flag
 * column.
 *
 * `j_children` is a Java array of native `cudf::column_view` addresses and `j_is_null` is the
 * native address of a `cudf::column_view`. Both are forwarded to
 * `spark_rapids_jni::make_structs`; the released result pointer is returned as a `jlong`.
 *
 * Returns 0 when either argument is null or when a C++ exception was translated into a Java
 * exception by `CATCH_STD`.
 */
JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_makeStructs(
  JNIEnv* env, jclass, jlongArray j_children, jlong j_is_null)
{
  JNI_NULL_CHECK(env, j_children, "j_children is null", 0);
  JNI_NULL_CHECK(env, j_is_null, "j_is_null is null", 0);

  try {
    cudf::jni::auto_set_device(env);

    auto const child_views =
      cudf::jni::native_jpointerArray<cudf::column_view>{env, j_children}.get_dereferenced();
    auto const* null_flags = reinterpret_cast<cudf::column_view const*>(j_is_null);

    auto structs_column = spark_rapids_jni::make_structs(child_views, *null_flags);
    return cudf::jni::ptr_as_jlong(structs_column.release());
  }
  CATCH_STD(env, 0);
}
}
275 changes: 275 additions & 0 deletions src/main/cpp/src/json_utils.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/column/column_device_view.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/valid_if.cuh>
#include <cudf/strings/detail/combine.hpp>
#include <cudf/strings/string_view.cuh>
#include <cudf/strings/strings_column_view.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <cub/device/device_histogram.cuh>
#include <thrust/find.h>
#include <thrust/functional.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/transform.h>
#include <thrust/tuple.h>
#include <thrust/uninitialized_fill.h>

namespace spark_rapids_jni {

namespace detail {

namespace {

/**
 * @brief Check whether a character is something other than space, carriage return, line feed
 * or horizontal tab.
 *
 * @param ch The character to test
 * @return `false` for `' '`, `'\r'`, `'\n'` and `'\t'`; `true` for every other value
 */
constexpr bool not_whitespace(cudf::char_utf8 ch)
{
  switch (ch) {
    case ' ':
    case '\r':
    case '\n':
    case '\t': return false;
    default: return true;
  }
}

/**
 * @brief Check whether a character may be used as the delimiter between joined JSON rows.
 *
 * The rejected characters mirror the list in `json_reader_options.set_delimiter`: the JSON
 * structural characters, both quote characters, the backslash, and the space, tab and carriage
 * return characters. Note that line feed is not in the rejected list.
 *
 * @param c The candidate character
 * @return `true` if `c` is not one of the rejected characters
 */
constexpr bool can_be_delimiter(char c)
{
  bool const is_structural =
    c == '{' || c == '[' || c == '}' || c == ']' || c == ',' || c == ':';
  bool const is_quote_or_escape = c == '"' || c == '\'' || c == '\\';
  bool const is_blank           = c == ' ' || c == '\t' || c == '\r';
  return !(is_structural || is_quote_or_escape || is_blank);
}

} // namespace

/**
 * @brief Join all rows of a strings column holding JSON objects into a single character buffer.
 *
 * The function:
 *  1. Classifies every row: a row is considered valid input only if its first non-whitespace
 *     character is `{`. Null rows, empty/whitespace-only rows, rows containing just the `null`
 *     literal, and rows starting with any other character are masked out.
 *  2. Picks a delimiter: the first character in the range [0, 127] that has a zero count in the
 *     histogram of the input characters and that is accepted by `can_be_delimiter`.
 *  3. Joins the rows with that delimiter through `cudf::strings::detail::join_strings`, passing
 *     `{}` as the replacement for masked-out rows.
 *
 * @param input The strings column to join
 * @param stream CUDA stream on which kernels are launched and memory is allocated
 * @param mr Memory resource used for the returned column and buffer (temporaries use the
 *        current device resource)
 * @return A tuple of: a BOOL8 column that is `true` for rows that are null or empty
 *         (whitespace-only), the buffer holding the joined characters, and the delimiter
 * @throws std::logic_error if no suitable delimiter character can be found
 */
std::tuple<std::unique_ptr<cudf::column>, std::unique_ptr<rmm::device_buffer>, char> concat_json(
  cudf::strings_column_view const& input,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr)
{
  auto const d_input_ptr = cudf::column_device_view::create(input.parent(), stream);
  auto const default_mr  = rmm::mr::get_current_device_resource();

  // Per-row flag: `false` for rows that are null, equal to the `null` string literal, empty, or
  // that do not start with `{`. This is used for masking out the input when doing string
  // concatenation.
  rmm::device_uvector<bool> is_valid_input(input.size(), stream, default_mr);

  // Per-row flag: `true` for rows that are either null or empty (whitespace-only).
  // This will be returned to the caller.
  rmm::device_uvector<bool> is_null_or_empty(input.size(), stream, mr);

  thrust::for_each(
    rmm::exec_policy_nosync(stream),
    thrust::make_counting_iterator(0L),
    thrust::make_counting_iterator(input.size() * static_cast<int64_t>(cudf::detail::warp_size)),
    [input  = *d_input_ptr,
     output = thrust::make_zip_iterator(thrust::make_tuple(
       is_valid_input.begin(), is_null_or_empty.begin()))] __device__(int64_t tidx) {
      // `warp_size` consecutive indices are assigned to each row and only the first of them does
      // the work ("one warp per row"), to minimize thread divergence.
      if ((tidx % cudf::detail::warp_size) != 0) { return; }
      auto const idx = tidx / cudf::detail::warp_size;

      if (input.is_null(idx)) {
        output[idx] = thrust::make_tuple(false, true);
        return;
      }

      auto const d_str = input.element<cudf::string_view>(idx);
      auto const size  = d_str.size_bytes();
      int i            = 0;
      char ch          = '\0';

      // Skip the very first whitespace characters.
      for (; i < size; ++i) {
        ch = d_str[i];
        if (not_whitespace(ch)) { break; }
      }

      // Capture the first non-whitespace character (if any) BEFORE probing for the `null`
      // literal: the probe below advances `i` and overwrites `ch`, and the start-character test
      // must look at the beginning of the row, not at whatever follows a `null` prefix.
      // Otherwise a row such as `null{` would wrongly be accepted as a valid JSON object.
      auto const not_eol    = i < size;
      auto const first_char = ch;  // only meaningful when `not_eol` is true

      if (i + 3 < size &&
          (d_str[i] == 'n' && d_str[i + 1] == 'u' && d_str[i + 2] == 'l' && d_str[i + 3] == 'l')) {
        i += 4;

        // Skip the very last whitespace characters.
        bool is_null_literal{true};
        for (; i < size; ++i) {
          ch = d_str[i];
          if (not_whitespace(ch)) {
            is_null_literal = false;
            break;
          }
        }

        // The current row contains only `null` string literal and not any other non-whitespace
        // characters. Such rows need to be masked out as null when doing concatenation.
        if (is_null_literal) {
          output[idx] = thrust::make_tuple(false, false);
          return;
        }
      }

      // If the current row is not null or empty, it should start with `{`. Otherwise, we need to
      // replace it by a null. This is necessary for libcudf's JSON reader to work.
      // Note that if we want to support ARRAY schema, we need to check for `[` instead.
      auto constexpr start_character = '{';
      if (not_eol && first_char != start_character) {
        output[idx] = thrust::make_tuple(false, false);
        return;
      }

      // Here the row either starts with `{` (valid, not empty) or contains only whitespace
      // (invalid for concatenation, reported as empty).
      output[idx] = thrust::make_tuple(not_eol, !not_eol);
    });

  // Build a histogram of all input characters so that a character that never occurs can be
  // selected as the delimiter.
  // NOTE(review): per the CUB documentation `num_levels` is the number of bin boundaries (so
  // there are `num_levels - 1` bins) and `upper_level` is exclusive. With the values below the
  // character 127 appears not to be counted, and its histogram slot keeps the zero from the
  // fill. Confirm whether 127 can therefore be selected even when it is present in the input.
  auto constexpr num_levels  = 256;
  auto constexpr lower_level = std::numeric_limits<char>::min();
  auto constexpr upper_level = std::numeric_limits<char>::max();
  auto const num_chars       = input.chars_size(stream);

  rmm::device_uvector<uint32_t> histogram(num_levels, stream, default_mr);
  thrust::uninitialized_fill(
    rmm::exec_policy_nosync(stream), histogram.begin(), histogram.end(), 0);

  // First call only queries the required temporary storage size; the second call does the work.
  size_t temp_storage_bytes = 0;
  cub::DeviceHistogram::HistogramEven(nullptr,
                                      temp_storage_bytes,
                                      input.chars_begin(stream),
                                      histogram.begin(),
                                      num_levels,
                                      lower_level,
                                      upper_level,
                                      num_chars,
                                      stream.value());
  rmm::device_buffer d_temp(temp_storage_bytes, stream);
  cub::DeviceHistogram::HistogramEven(d_temp.data(),
                                      temp_storage_bytes,
                                      input.chars_begin(stream),
                                      histogram.begin(),
                                      num_levels,
                                      lower_level,
                                      upper_level,
                                      num_chars,
                                      stream.value());

  auto const it             = thrust::make_counting_iterator(0);
  auto const zero_level_idx = -lower_level;  // the bin storing count for character `\0`
  auto const zero_level_it  = it + zero_level_idx;
  auto const end            = it + num_levels;

  // Find the first non-negative character that does not occur in the input and that is allowed
  // to be a delimiter.
  auto const first_zero_count_pos =
    thrust::find_if(rmm::exec_policy_nosync(stream),
                    zero_level_it,  // ignore the negative characters
                    end,
                    [zero_level_idx, counts = histogram.begin()] __device__(auto idx) -> bool {
                      auto const count = counts[idx];
                      if (count > 0) { return false; }
                      auto const first_non_existing_char = static_cast<char>(idx - zero_level_idx);
                      return can_be_delimiter(first_non_existing_char);
                    });

  // This should never happen since the input should never cover the entire char range.
  if (first_zero_count_pos == end) {
    throw std::logic_error(
      "Cannot find any character suitable as delimiter during joining json strings.");
  }
  auto const delimiter = static_cast<char>(thrust::distance(zero_level_it, first_zero_count_pos));

  auto [null_mask, null_count] = cudf::detail::valid_if(
    is_valid_input.begin(), is_valid_input.end(), thrust::identity{}, stream, default_mr);
  // If the null count doesn't change, that means we do not have any rows containing `null` string
  // literal, empty rows, or rows not starting with `{`. In such cases, just use the input column
  // for concatenation.
  // NOTE(review): the view below is built with offset 0 on top of `input.offsets()`; this looks
  // like it assumes `input` is not a sliced column - confirm against callers.
  auto const input_applied_null =
    null_count == input.null_count()
      ? cudf::column_view{}
      : cudf::column_view{cudf::data_type{cudf::type_id::STRING},
                          input.size(),
                          input.chars_begin(stream),
                          reinterpret_cast<cudf::bitmask_type const*>(null_mask.data()),
                          null_count,
                          0,
                          std::vector<cudf::column_view>{input.offsets()}};

  // Join all rows using the chosen delimiter; `{}` is passed as the replacement for null rows.
  auto concat_strings = cudf::strings::detail::join_strings(
    null_count == input.null_count() ? input : cudf::strings_column_view{input_applied_null},
    cudf::string_scalar(std::string(1, delimiter), true, stream, default_mr),
    cudf::string_scalar("{}", true, stream, default_mr),
    stream,
    mr);

  return {std::make_unique<cudf::column>(std::move(is_null_or_empty), rmm::device_buffer{}, 0),
          std::move(concat_strings->release().data),
          delimiter};
}

/**
 * @brief Create a STRUCT column from the given children and a boolean column marking null rows.
 *
 * The children are deep-copied into the new column (it is constructed from a `column_view`).
 * The null mask of the result is computed from `is_null`: a row is valid iff the corresponding
 * `is_null` value is `false`.
 *
 * @param children Views of the child columns; all must have the same number of rows
 * @param is_null BOOL8 column with one value per row; `true` marks a null struct row
 * @param stream CUDA stream on which kernels are launched and memory is allocated
 * @param mr Memory resource used for the null mask and the returned column
 * @return The new STRUCT column, or `nullptr` if `children` is empty
 * @throws cudf::logic_error if the children have different row counts, or if `is_null` is not
 *         a BOOL8 column with the same row count as the children
 */
std::unique_ptr<cudf::column> make_structs(std::vector<cudf::column_view> const& children,
                                           cudf::column_view const& is_null,
                                           rmm::cuda_stream_view stream,
                                           rmm::device_async_resource_ref mr)
{
  if (children.size() == 0) { return nullptr; }

  auto const row_count = children.front().size();
  for (auto const& col : children) {
    CUDF_EXPECTS(col.size() == row_count, "All columns must have the same number of rows.");
  }

  // `is_null` is read through `begin<bool>()` and drives the size of the null mask, so it must
  // be a boolean column with exactly one value per output row. Without these checks a
  // mismatched column would yield a null mask that does not fit `row_count`.
  CUDF_EXPECTS(is_null.type().id() == cudf::type_id::BOOL8,
               "The is_null column must be of BOOL8 type.");
  CUDF_EXPECTS(is_null.size() == row_count,
               "The is_null column must have the same number of rows as the children.");

  // A row is valid when its `is_null` flag is false, hence the logical negation.
  auto const [null_mask, null_count] = cudf::detail::valid_if(
    is_null.begin<bool>(), is_null.end<bool>(), thrust::logical_not{}, stream, mr);

  // Assemble a non-owning STRUCT view over the children and the freshly computed null mask,
  // then materialize it into an owning column.
  auto const structs =
    cudf::column_view(cudf::data_type{cudf::type_id::STRUCT},
                      row_count,
                      nullptr,
                      reinterpret_cast<cudf::bitmask_type const*>(null_mask.data()),
                      null_count,
                      0,
                      children);
  return std::make_unique<cudf::column>(structs, stream, mr);
}

} // namespace detail

/**
 * @brief Public entry point for joining the JSON strings of a strings column.
 *
 * Opens a profiling range for the call via `CUDF_FUNC_RANGE` and delegates all the work to
 * `detail::concat_json`, returning its result unchanged.
 */
std::tuple<std::unique_ptr<cudf::column>, std::unique_ptr<rmm::device_buffer>, char> concat_json(
  cudf::strings_column_view const& input,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr)
{
  CUDF_FUNC_RANGE();
  auto joined = detail::concat_json(input, stream, mr);
  return joined;
}

/**
 * @brief Public entry point for building a STRUCT column from child columns and a null-flag
 * column.
 *
 * Opens a profiling range for the call via `CUDF_FUNC_RANGE` and delegates all the work to
 * `detail::make_structs`, returning its result unchanged.
 */
std::unique_ptr<cudf::column> make_structs(std::vector<cudf::column_view> const& children,
                                           cudf::column_view const& is_null,
                                           rmm::cuda_stream_view stream,
                                           rmm::device_async_resource_ref mr)
{
  CUDF_FUNC_RANGE();
  auto structs_column = detail::make_structs(children, is_null, stream, mr);
  return structs_column;
}

} // namespace spark_rapids_jni
12 changes: 12 additions & 0 deletions src/main/cpp/src/from_json.hpp → src/main/cpp/src/json_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cudf/utilities/default_stream.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/resource_ref.hpp>

#include <memory>
Expand All @@ -31,4 +32,15 @@ std::unique_ptr<cudf::column> from_json_to_raw_map(
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
 * @brief Join all rows of a strings column holding JSON strings into a single character buffer.
 *
 * Rows are joined using a delimiter character that is chosen among the characters that do not
 * occur in the input.
 *
 * @param input The strings column to join
 * @param stream CUDA stream on which kernels are launched and memory is allocated
 * @param mr Memory resource used for the returned column and buffer
 * @return A tuple of: a boolean column flagging the input rows that are null or empty, the
 *         buffer holding the joined characters, and the delimiter character that was used
 */
std::tuple<std::unique_ptr<cudf::column>, std::unique_ptr<rmm::device_buffer>, char> concat_json(
cudf::strings_column_view const& input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
 * @brief Create a STRUCT column from the given child columns and a boolean column marking the
 * rows that are null.
 *
 * @param input Views of the child columns; all must have the same number of rows
 * @param is_null Boolean column in which `true` marks a null row of the output
 * @param stream CUDA stream on which kernels are launched and memory is allocated
 * @param mr Memory resource used for the returned column
 * @return The new STRUCT column, or `nullptr` if `input` is empty
 */
std::unique_ptr<cudf::column> make_structs(
std::vector<cudf::column_view> const& input,
cudf::column_view const& is_null,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

} // namespace spark_rapids_jni
Loading

0 comments on commit 10fbcff

Please sign in to comment.