Add more tests and general cleanup.
Signed-off-by: db <[email protected]>
nvdbaranec committed Jul 14, 2023
1 parent 96f1e0c commit 23881e1
Showing 6 changed files with 220 additions and 107 deletions.
52 changes: 34 additions & 18 deletions src/main/cpp/src/BloomFilterJni.cpp
@@ -23,33 +23,46 @@

extern "C" {

JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_BloomFilter_build(
JNIEnv* env, jclass, jlong bloom_filter, jlong bloom_filter_bytes, jint bloom_filter_bits, jlong cv, jint num_hashes)
JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_BloomFilter_put(JNIEnv* env,
jclass,
jlong bloom_filter,
jlong bloom_filter_bytes,
jlong bloom_filter_bits,
jlong cv,
jint num_hashes)
{
try {
cudf::jni::auto_set_device(env);

cudf::column_view input_column{*reinterpret_cast<cudf::column_view const*>(cv)};
spark_rapids_jni::bloom_filter_build({reinterpret_cast<cudf::bitmask_type*>(bloom_filter), static_cast<std::size_t>(bloom_filter_bytes / 4)},
bloom_filter_bits,
input_column,
num_hashes);
spark_rapids_jni::bloom_filter_put({reinterpret_cast<cudf::bitmask_type*>(bloom_filter),
static_cast<std::size_t>(bloom_filter_bytes / 4)},
bloom_filter_bits,
input_column,
num_hashes);
return 0;
}
CATCH_STD(env, 0);
}

JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_BloomFilter_probe(
JNIEnv* env, jclass, jlong cv, jlong bloom_filter, jlong bloom_filter_bytes, jint bloom_filter_bits, jint num_hashes)
JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_BloomFilter_probe(JNIEnv* env,
jclass,
jlong cv,
jlong bloom_filter,
jlong bloom_filter_bytes,
jlong bloom_filter_bits,
jint num_hashes)
{
try {
cudf::jni::auto_set_device(env);

cudf::column_view input_column{*reinterpret_cast<cudf::column_view const*>(cv)};
return cudf::jni::release_as_jlong(spark_rapids_jni::bloom_filter_probe(input_column,
{reinterpret_cast<cudf::bitmask_type const*>(bloom_filter), static_cast<std::size_t>(bloom_filter_bytes / 4)},
bloom_filter_bits,
num_hashes));
return cudf::jni::release_as_jlong(spark_rapids_jni::bloom_filter_probe(
input_column,
{reinterpret_cast<cudf::bitmask_type const*>(bloom_filter),
static_cast<std::size_t>(bloom_filter_bytes / 4)},
bloom_filter_bits,
num_hashes));
}
CATCH_STD(env, 0);
}
@@ -59,13 +72,17 @@ JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_BloomFilter_merge(
{
try {
cudf::jni::auto_set_device(env);

cudf::jni::native_jpointerArray<cudf::bitmask_type> jbuffers{env, bloom_filters};
std::vector<cudf::device_span<cudf::bitmask_type const>> cbloom_filters(jbuffers.size());
std::transform(jbuffers.begin(), jbuffers.end(), cbloom_filters.begin(), [bloom_filter_bytes](cudf::bitmask_type const* buf){
return cudf::device_span<cudf::bitmask_type const>{buf, static_cast<std::size_t>(bloom_filter_bytes / 4)};
});

std::transform(jbuffers.begin(),
jbuffers.end(),
cbloom_filters.begin(),
[bloom_filter_bytes](cudf::bitmask_type const* buf) {
return cudf::device_span<cudf::bitmask_type const>{
buf, static_cast<std::size_t>(bloom_filter_bytes / 4)};
});

auto merged = spark_rapids_jni::bitmask_bitwise_or(cbloom_filters);

cudf::jni::native_jlongArray result(env, 2);
@@ -75,5 +92,4 @@ JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_BloomFilter_merge(
}
CATCH_STD(env, 0);
}

}
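Each JNI entry point above turns a raw device pointer plus a byte count into a span of 32-bit bloom filter words by dividing by 4. A minimal sketch of that conversion, written with sizeof so the 4-byte-word assumption is explicit (as_filter_span is a hypothetical helper, not part of this commit):

#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>
#include <cstdint>

// Hypothetical helper: view a raw device allocation of num_bytes bytes as a span of
// bloom filter words. The "/ 4" in the JNI code above assumes cudf::bitmask_type is a
// 32-bit word; spelling it as sizeof(cudf::bitmask_type) makes that assumption explicit.
cudf::device_span<cudf::bitmask_type> as_filter_span(void* bloom_filter, int64_t num_bytes)
{
  static_assert(sizeof(cudf::bitmask_type) == 4, "bloom filter words are expected to be 32 bits");
  return {reinterpret_cast<cudf::bitmask_type*>(bloom_filter),
          static_cast<std::size_t>(num_bytes) / sizeof(cudf::bitmask_type)};
}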
40 changes: 21 additions & 19 deletions src/main/cpp/src/bloom_filter.cu
@@ -41,10 +41,10 @@ __device__ inline std::pair<cudf::size_type, cudf::bitmask_type> gpu_get_hash_ma
return {word_index, (1 << bit_index)};
}

__global__ void gpu_bloom_filter_build(cudf::bitmask_type* const bloom_filter,
cudf::size_type bloom_filter_bits,
cudf::device_span<int64_t const> input,
cudf::size_type num_hashes)
__global__ void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter,
cudf::size_type bloom_filter_bits,
cudf::device_span<int64_t const> input,
cudf::size_type num_hashes)
{
int const tid = threadIdx.x + blockIdx.x * blockDim.x;
if (tid >= input.size()) { return; }
@@ -70,7 +70,7 @@ struct bloom_probe_functor {
__device__ bool operator()(int64_t input)
{
// https://github.com/apache/spark/blob/7bfbeb62cb1dc58d81243d22888faa688bad8064/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java#L110
// this code could be combined with the very similar code in gpu_bloom_filter_build. i've
// this code could be combined with the very similar code in gpu_bloom_filter_put. i've
// left it this way since the expectation is that we will early out fairly often, whereas
// in the build case we never early out so doing the additional if() return check is pointless.
bloom_hash_type const h1 = MurmurHash3_32<int64_t>(0)(input);
@@ -88,36 +88,36 @@

} // anonymous namespace
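Both the put kernel and the probe functor follow the double-hashing scheme of the linked Spark BloomFilterImpl: two 32-bit Murmur hashes h1 and h2 are combined as h1 + i*h2 for each of the num_hashes probes, and each combined hash picks one word and one bit within it. A host-side sketch of the probe loop under that scheme (an illustration only, not the CUDA kernel; the sign handling mirrors the Spark code and is an assumption here):

#include <cstdint>
#include <vector>

// Host-side illustration of double hashing against a filter of num_bits bits whose bits
// are packed into 32-bit words.
bool might_contain(std::vector<uint32_t> const& words,
                   int64_t num_bits,
                   uint32_t h1,    // first 32-bit hash of the value
                   uint32_t h2,    // second 32-bit hash of the value
                   int num_hashes)
{
  for (int i = 1; i <= num_hashes; ++i) {
    int32_t combined = static_cast<int32_t>(h1 + static_cast<uint32_t>(i) * h2);
    if (combined < 0) { combined = ~combined; }  // keep the bit index non-negative
    int64_t const bit   = combined % num_bits;
    uint32_t const word = words[bit / 32];       // word holding this bit
    if ((word & (1u << (bit % 32))) == 0) { return false; }  // early out on the first miss
  }
  return true;
}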

std::unique_ptr<rmm::device_buffer> bloom_filter_create(cudf::size_type bloom_filter_bits,
std::unique_ptr<rmm::device_buffer> bloom_filter_create(int64_t bloom_filter_bits,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
std::unique_ptr<rmm::device_buffer> out = std::make_unique<rmm::device_buffer>(
cudf::num_bitmask_words(bloom_filter_bits) * sizeof(cudf::bitmask_type), stream, mr);
cudaMemsetAsync(out->data(), 0, out->size() * sizeof(cudf::bitmask_type), stream);
cudaMemsetAsync(out->data(), 0, out->size(), stream);
return out;
}

void bloom_filter_build(cudf::device_span<cudf::bitmask_type> bloom_filter,
cudf::size_type bloom_filter_bits,
cudf::column_view const& input,
cudf::size_type num_hashes,
rmm::cuda_stream_view stream)
void bloom_filter_put(cudf::device_span<cudf::bitmask_type> bloom_filter,
int64_t bloom_filter_bits,
cudf::column_view const& input,
cudf::size_type num_hashes,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(input.type() == cudf::data_type{cudf::type_id::INT64} && !input.nullable(),
"bloom filter input expects a non-nullable column of int64s");
CUDF_EXPECTS(bloom_filter_bits > 0, "Invalid empty bloom filter size specified");

constexpr int block_size = 256;
auto grid = cudf::detail::grid_1d{input.size(), block_size, 1};
gpu_bloom_filter_build<<<grid.num_blocks, block_size, 0, stream.value()>>>(
gpu_bloom_filter_put<<<grid.num_blocks, block_size, 0, stream.value()>>>(
bloom_filter.data(), bloom_filter_bits, input, num_hashes);
}

std::unique_ptr<cudf::column> bloom_filter_probe(
cudf::column_view const& input,
cudf::device_span<cudf::bitmask_type const> bloom_filter,
cudf::size_type bloom_filter_bits,
int64_t bloom_filter_bits,
cudf::size_type num_hashes,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
@@ -128,11 +128,13 @@ std::unique_ptr<cudf::column> bloom_filter_probe(

auto out = cudf::make_fixed_width_column(
cudf::data_type{cudf::type_id::BOOL8}, input.size(), cudf::mask_state::UNALLOCATED, stream, mr);
thrust::transform(rmm::exec_policy(stream),
input.begin<int64_t>(),
input.end<int64_t>(),
out->mutable_view().begin<bool>(),
bloom_probe_functor{bloom_filter.data(), bloom_filter_bits, num_hashes});
thrust::transform(
rmm::exec_policy(stream),
input.begin<int64_t>(),
input.end<int64_t>(),
out->mutable_view().begin<bool>(),
bloom_probe_functor{
bloom_filter.data(), static_cast<cudf::size_type>(bloom_filter_bits), num_hashes});
return out;
}

16 changes: 9 additions & 7 deletions src/main/cpp/src/bloom_filter.hpp
@@ -32,13 +32,15 @@ namespace spark_rapids_jni {
*
*/
std::unique_ptr<rmm::device_buffer> bloom_filter_create(
cudf::size_type bloom_filter_bits,
int64_t bloom_filter_bits,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Builds a bloom filter by hashing input int64_t values with MurmurHash3_32 (the hash used by the probe path in bloom_filter.cu).
*
* Can be called multiple times on the same bloom_filter buffer.
*
* @param[in,out] bloom_filter The bloom filter to be constructed. The function expects that the
* buffer has already been initialized to 0.
* @param bloom_filter_bits Size of the bloom filter in bits.
@@ -47,11 +49,11 @@ std::unique_ptr<rmm::device_buffer> bloom_filter_create(
* @param stream CUDA stream used for device memory operations and kernel launches.
*
*/
void bloom_filter_build(cudf::device_span<cudf::bitmask_type> bloom_filter,
cudf::size_type bloom_filter_bits,
cudf::column_view const& input,
cudf::size_type num_hashes,
rmm::cuda_stream_view stream = cudf::get_default_stream());
void bloom_filter_put(cudf::device_span<cudf::bitmask_type> bloom_filter,
int64_t bloom_filter_bits,
cudf::column_view const& input,
cudf::size_type num_hashes,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Probe a bloom filter with an input column of int64_t values.
@@ -69,7 +71,7 @@ void bloom_filter_build(cudf::device_span<cudf::bitmask_type> bloom_filter,
std::unique_ptr<cudf::column> bloom_filter_probe(
cudf::column_view const& input,
cudf::device_span<cudf::bitmask_type const> bloom_filter,
cudf::size_type bloom_filter_bits,
int64_t bloom_filter_bits,
cudf::size_type num_hashes,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
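Taken together with the tests below, the intended call pattern is: create a zeroed filter, view it as a span of words, put one or more int64 columns into it, then probe. A minimal usage sketch modeled on the tests (bloom_filter_to_span is used as it appears there; the size and hash count are illustrative):

#include "bloom_filter.hpp"
#include <cudf_test/column_wrapper.hpp>
#include <rmm/cuda_stream_view.hpp>

void bloom_filter_round_trip(rmm::cuda_stream_view stream)
{
  constexpr int64_t bloom_filter_bits  = 4096;  // illustrative size
  constexpr cudf::size_type num_hashes = 3;     // matches the tests

  // create a zero-initialized filter and view it as a span of bitmask words
  auto buffer = spark_rapids_jni::bloom_filter_create(bloom_filter_bits, stream);
  auto filter = spark_rapids_jni::bloom_filter_to_span(*buffer);

  // insert a non-nullable INT64 column of keys; put can be repeated on the same buffer
  cudf::test::fixed_width_column_wrapper<int64_t> keys{20, 80, 100, 99, 47, -9};
  spark_rapids_jni::bloom_filter_put(filter, bloom_filter_bits, keys, num_hashes, stream);

  // probe with another column; the result is a BOOL8 column of "possibly present" flags
  cudf::test::fixed_width_column_wrapper<int64_t> probe{20, 80, 100, 1, 2, 3};
  auto result =
    spark_rapids_jni::bloom_filter_probe(probe, filter, bloom_filter_bits, num_hashes, stream);
}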
8 changes: 4 additions & 4 deletions src/main/cpp/tests/bloom_filter.cu
@@ -61,7 +61,7 @@ TEST_F(BloomFilterTest, BuildAndProbe)
static_cast<cudf::bitmask_type*>(_bloom_filter->data()),
_bloom_filter->size() / sizeof(cudf::bitmask_type)};

spark_rapids_jni::bloom_filter_build(bloom_filter, bloom_filter_bits, input, 3, stream);
spark_rapids_jni::bloom_filter_put(bloom_filter, bloom_filter_bits, input, 3, stream);

// probe
cudf::test::fixed_width_column_wrapper<int64_t> probe{
@@ -83,19 +83,19 @@ TEST_F(BloomFilterTest, ProbeMerged)
cudf::test::fixed_width_column_wrapper<int64_t> col_a{20, 80, 100, 99, 47, -9, 234000000};
auto _bloom_filter_a = spark_rapids_jni::bloom_filter_create(bloom_filter_bits, stream);
auto bloom_filter_a = spark_rapids_jni::bloom_filter_to_span(*_bloom_filter_a);
spark_rapids_jni::bloom_filter_build(bloom_filter_a, bloom_filter_bits, col_a, 3, stream);
spark_rapids_jni::bloom_filter_put(bloom_filter_a, bloom_filter_bits, col_a, 3, stream);

// column b
cudf::test::fixed_width_column_wrapper<int64_t> col_b{100, 200, 300, 400};
auto _bloom_filter_b = spark_rapids_jni::bloom_filter_create(bloom_filter_bits, stream);
auto bloom_filter_b = spark_rapids_jni::bloom_filter_to_span(*_bloom_filter_b);
spark_rapids_jni::bloom_filter_build(bloom_filter_b, bloom_filter_bits, col_b, 3, stream);
spark_rapids_jni::bloom_filter_put(bloom_filter_b, bloom_filter_bits, col_b, 3, stream);

// column c
cudf::test::fixed_width_column_wrapper<int64_t> col_c{-100, -200, -300, -400};
auto _bloom_filter_c = spark_rapids_jni::bloom_filter_create(bloom_filter_bits, stream);
auto bloom_filter_c = spark_rapids_jni::bloom_filter_to_span(*_bloom_filter_c);
spark_rapids_jni::bloom_filter_build(bloom_filter_c, bloom_filter_bits, col_c, 3, stream);
spark_rapids_jni::bloom_filter_put(bloom_filter_c, bloom_filter_bits, col_c, 3, stream);

// merged bloom filter
auto _bloom_filter_merged =

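The ProbeMerged test is cut off above just before the merge, but the merge path it exercises is visible in BloomFilterJni.cpp: gather each filter's word span into a vector and OR them together with bitmask_bitwise_or. A sketch of that step, assuming (as the JNI code does) that the filters share the same bloom_filter_bits and that the returned buffer can be treated as a new filter:

#include "bloom_filter.hpp"   // assumed to declare bitmask_bitwise_or
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>
#include <vector>

// Sketch of the merge step, mirroring the JNI merge entry point above.
auto merge_three(cudf::device_span<cudf::bitmask_type const> a,
                 cudf::device_span<cudf::bitmask_type const> b,
                 cudf::device_span<cudf::bitmask_type const> c)
{
  std::vector<cudf::device_span<cudf::bitmask_type const>> filters{a, b, c};
  return spark_rapids_jni::bitmask_bitwise_or(filters);  // bitwise OR of all filter words
}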