diff --git a/src/main/cpp/CMakeLists.txt b/src/main/cpp/CMakeLists.txt
index b2f5c339dd..3e7c388057 100644
--- a/src/main/cpp/CMakeLists.txt
+++ b/src/main/cpp/CMakeLists.txt
@@ -148,6 +148,7 @@ add_library(
   spark_rapids_jni SHARED
   src/CastStringJni.cpp
   src/DecimalUtilsJni.cpp
+  src/HashJni.cpp
   src/MapUtilsJni.cpp
   src/NativeParquetJni.cpp
   src/RowConversionJni.cpp
@@ -158,6 +159,7 @@ add_library(
   src/cast_string_to_float.cu
   src/decimal_utils.cu
   src/map_utils.cu
+  src/murmur_hash.cu
   src/row_conversion.cu
   src/zorder.cu
 )
diff --git a/src/main/cpp/src/HashJni.cpp b/src/main/cpp/src/HashJni.cpp
new file mode 100644
index 0000000000..bcf72922d4
--- /dev/null
+++ b/src/main/cpp/src/HashJni.cpp
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cudf_jni_apis.hpp"
+#include "dtype_utils.hpp"
+#include "jni_utils.hpp"
+
+#include "hash.cuh"
+
+extern "C" {
+
+JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_Hash_murmurHash32(
+  JNIEnv* env, jclass, jint seed, jlongArray column_handles)
+{
+  JNI_NULL_CHECK(env, column_handles, "array of column handles is null", 0);
+
+  try {
+    cudf::jni::auto_set_device(env);
+    auto column_views =
+      cudf::jni::native_jpointerArray<cudf::column_view>{env, column_handles}.get_dereferenced();
+    return cudf::jni::release_as_jlong(
+      spark_rapids_jni::murmur_hash3_32(cudf::table_view{column_views}, seed));
+  }
+  CATCH_STD(env, 0);
+}
+}
diff --git a/src/main/cpp/src/hash.cuh b/src/main/cpp/src/hash.cuh
new file mode 100644
index 0000000000..84a204d08f
--- /dev/null
+++ b/src/main/cpp/src/hash.cuh
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+
+#include
+
+#include
+
+namespace spark_rapids_jni {
+
+/**
+ * @brief Converts a cudf decimal128 value to a java bigdecimal value.
+ *
+ * @param key The cudf decimal value
+ *
+ * @returns A 128 bit value containing the converted decimal bits and a length
+ * representing the relevant number of bytes in the value.
+ *
+ */
+__device__ __inline__ std::pair<__int128_t, cudf::size_type> to_java_bigdecimal(
+  numeric::decimal128 key)
+{
+  // java.math.BigDecimal.valueOf(unscaled_value, _scale).unscaledValue().toByteArray()
+  // https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L381
+  __int128_t const val = key.value();
+  constexpr cudf::size_type key_size = sizeof(__int128_t);
+  std::byte const* data = reinterpret_cast<std::byte const*>(&val);
+
+  // Small negative values start with 0xff..., small positive values start with 0x00...
+  bool const is_negative = val < 0;
+  std::byte const zero_value = is_negative ? std::byte{0xff} : std::byte{0x00};
+
+  // If the value can be represented with a shorter than 16-byte integer, the
+  // leading bytes of the little-endian value are truncated and are not hashed.
+  auto const reverse_begin = thrust::reverse_iterator(data + key_size);
+  auto const reverse_end   = thrust::reverse_iterator(data);
+  auto const first_nonzero_byte =
+    thrust::find_if_not(thrust::seq, reverse_begin, reverse_end, [zero_value](std::byte const& v) {
+      return v == zero_value;
+    }).base();
+  // Max handles special case of 0 and -1 which would shorten to 0 length otherwise
+  cudf::size_type length =
+    std::max(1, static_cast<cudf::size_type>(thrust::distance(data, first_nonzero_byte)));
+
+  // Preserve the 2's complement sign bit by adding a byte back on if necessary.
+  // e.g. 0x0000ff would shorten to 0x00ff. The 0x00 byte is retained to
+  // preserve the sign bit, rather than leaving an "f" at the front which would
+  // change the sign bit. However, 0x00007f would shorten to 0x7f. No extra byte
+  // is needed because the leftmost bit matches the sign bit. Similarly for
+  // negative values: 0xffff00 --> 0xff00 and 0xffff80 --> 0x80.
+  if ((length < key_size) && (is_negative ^ bool(data[length - 1] & std::byte{0x80}))) { ++length; }
+
+  // Convert to big endian by reversing the range of nonzero bytes. Only those bytes are hashed.
+  __int128_t big_endian_value = 0;
+  auto big_endian_data = reinterpret_cast<std::byte*>(&big_endian_value);
+  thrust::reverse_copy(thrust::seq, data, data + length, big_endian_data);
+
+  return {big_endian_value, length};
+}
+
+/**
+ * @brief Computes the murmur32 hash value of each row in the input set of columns.
+ *
+ * @param input The table of columns to hash
+ * @param seed Optional seed value to use for the hash function
+ * @param stream CUDA stream used for device memory operations and kernel launches
+ * @param mr Device memory resource used to allocate the returned column's device memory
+ *
+ * @returns A column where each row is the hash of a row from the input.
+ */
+std::unique_ptr<cudf::column> murmur_hash3_32(
+  cudf::table_view const& input,
+  uint32_t seed                       = 0,
+  rmm::cuda_stream_view stream        = cudf::get_default_stream(),
+  rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
+
+}  // namespace spark_rapids_jni
diff --git a/src/main/cpp/src/murmur_hash.cu b/src/main/cpp/src/murmur_hash.cu
new file mode 100644
index 0000000000..fc8f2db8f6
--- /dev/null
+++ b/src/main/cpp/src/murmur_hash.cu
@@ -0,0 +1,397 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "hash.cuh"
+
+#include
+#include
+
+#include
+#include
+
+namespace spark_rapids_jni {
+
+namespace {
+
+using spark_hash_value_type = int32_t;
+
+template <typename Key, CUDF_ENABLE_IF(not cudf::is_nested<Key>())>
+struct SparkMurmurHash3_32 {
+  using result_type = spark_hash_value_type;
+
+  constexpr SparkMurmurHash3_32() = default;
+  constexpr SparkMurmurHash3_32(uint32_t seed) : m_seed(seed) {}
+
+  [[nodiscard]] __device__ inline uint32_t fmix32(uint32_t h) const
+  {
+    h ^= h >> 16;
+    h *= 0x85ebca6b;
+    h ^= h >> 13;
+    h *= 0xc2b2ae35;
+    h ^= h >> 16;
+    return h;
+  }
+
+  [[nodiscard]] __device__ inline uint32_t getblock32(std::byte const* data,
+                                                      cudf::size_type offset) const
+  {
+    // Read a 4-byte value from the data pointer as individual bytes for safe
+    // unaligned access (very likely for string types).
+    auto block = reinterpret_cast<uint8_t const*>(data + offset);
+    return block[0] | (block[1] << 8) | (block[2] << 16) | (block[3] << 24);
+  }
+
+  [[nodiscard]] result_type __device__ inline operator()(Key const& key) const
+  {
+    return compute(key);
+  }
+
+  template <typename T>
+  result_type __device__ inline compute(T const& key) const
+  {
+    return compute_bytes(reinterpret_cast<std::byte const*>(&key), sizeof(T));
+  }
+
+  result_type __device__ inline compute_remaining_bytes(std::byte const* data,
+                                                        cudf::size_type len,
+                                                        cudf::size_type tail_offset,
+                                                        result_type h) const
+  {
+    // Process remaining bytes that do not fill a four-byte chunk using Spark's approach
+    // (does not conform to normal MurmurHash3).
+    for (auto i = tail_offset; i < len; i++) {
+      // We require a two-step cast to get the k1 value from the byte. First,
+      // we must cast to a signed int8_t. Then, the sign bit is preserved when
+      // casting to uint32_t under 2's complement. Java preserves the sign when
+      // casting byte-to-int, but C++ does not.
+      uint32_t k1 = static_cast<uint32_t>(std::to_integer<int8_t>(data[i]));
+      k1 *= c1;
+      k1 = cudf::detail::rotate_bits_left(k1, rot_c1);
+      k1 *= c2;
+      h ^= k1;
+      h = cudf::detail::rotate_bits_left(h, rot_c2);
+      h = h * 5 + c3;
+    }
+    return h;
+  }
+
+  result_type __device__ compute_bytes(std::byte const* data, cudf::size_type const len) const
+  {
+    constexpr cudf::size_type BLOCK_SIZE = 4;
+    cudf::size_type const nblocks     = len / BLOCK_SIZE;
+    cudf::size_type const tail_offset = nblocks * BLOCK_SIZE;
+    result_type h                     = m_seed;
+
+    // Process all four-byte chunks.
+    for (cudf::size_type i = 0; i < nblocks; i++) {
+      uint32_t k1 = getblock32(data, i * BLOCK_SIZE);
+      k1 *= c1;
+      k1 = cudf::detail::rotate_bits_left(k1, rot_c1);
+      k1 *= c2;
+      h ^= k1;
+      h = cudf::detail::rotate_bits_left(h, rot_c2);
+      h = h * 5 + c3;
+    }
+
+    h = compute_remaining_bytes(data, len, tail_offset, h);
+
+    // Finalize hash.
+    h ^= len;
+    h = fmix32(h);
+    return h;
+  }
+
+ private:
+  uint32_t m_seed{cudf::DEFAULT_HASH_SEED};
+  static constexpr uint32_t c1     = 0xcc9e2d51;
+  static constexpr uint32_t c2     = 0x1b873593;
+  static constexpr uint32_t c3     = 0xe6546b64;
+  static constexpr uint32_t rot_c1 = 15;
+  static constexpr uint32_t rot_c2 = 13;
+};
+
+template <>
+spark_hash_value_type __device__ inline SparkMurmurHash3_32<bool>::operator()(bool const& key) const
+{
+  return compute<uint32_t>(key);
+}
+
+template <>
+spark_hash_value_type __device__ inline SparkMurmurHash3_32<int8_t>::operator()(
+  int8_t const& key) const
+{
+  return compute<uint32_t>(key);
+}
+
+template <>
+spark_hash_value_type __device__ inline SparkMurmurHash3_32<uint8_t>::operator()(
+  uint8_t const& key) const
+{
+  return compute<uint32_t>(key);
+}
+
+template <>
+spark_hash_value_type __device__ inline SparkMurmurHash3_32<int16_t>::operator()(
+  int16_t const& key) const
+{
+  return compute<uint32_t>(key);
+}
+
+template <>
+spark_hash_value_type __device__ inline SparkMurmurHash3_32<uint16_t>::operator()(
+  uint16_t const& key) const
+{
+  return compute<uint32_t>(key);
+}
+
+template <>
+spark_hash_value_type __device__ inline SparkMurmurHash3_32<float>::operator()(
+  float const& key) const
+{
+  return compute(cudf::detail::normalize_nans(key));
+}
+
+template <>
+spark_hash_value_type __device__ inline SparkMurmurHash3_32<double>::operator()(
+  double const& key) const
+{
+  return compute(cudf::detail::normalize_nans(key));
+}
+
+template <>
+spark_hash_value_type __device__ inline SparkMurmurHash3_32<cudf::string_view>::operator()(
+  cudf::string_view const& key) const
+{
+  auto const data = reinterpret_cast<std::byte const*>(key.data());
+  auto const len  = key.size_bytes();
+  return compute_bytes(data, len);
+}
+
+template <>
+spark_hash_value_type __device__ inline SparkMurmurHash3_32<numeric::decimal32>::operator()(
+  numeric::decimal32 const& key) const
+{
+  return compute(key.value());
+}
+
+template <>
+spark_hash_value_type __device__ inline SparkMurmurHash3_32<numeric::decimal64>::operator()(
+  numeric::decimal64 const& key) const
+{
+  return compute(key.value());
+}
+
+template <>
+spark_hash_value_type __device__ inline SparkMurmurHash3_32<numeric::decimal128>::operator()(
+  numeric::decimal128 const& key) const
+{
+  auto [java_d, length] = to_java_bigdecimal(key);
+  auto bytes            = reinterpret_cast<std::byte const*>(&java_d);
+  return compute_bytes(bytes, length);
+}
+
+/**
+ * @brief Computes the hash value of a row in the given table.
+ *
+ * This functor uses Spark conventions for Murmur hashing, which differ from
+ * the Murmur implementation used in the rest of libcudf. These differences
+ * include:
+ * - Serially using the output hash as an input seed for the next item
+ * - Ignorance of null values
+ *
+ * The serial use of hashes as seeds means that data of different nested types
+ * can exhibit hash collisions. For example, a row of an integer column
+ * containing a 1 will have the same hash as a lists column of integers
+ * containing a list of [1] and a struct column of a single integer column
+ * containing a struct of {1}.
+ *
+ * As a consequence of ignoring null values, inputs like [1], [1, null], and
+ * [null, 1] have the same hash (an expected hash collision). This kind of
+ * collision can also occur across a table of nullable columns and with nulls
+ * in structs ({1, null} and {null, 1} have the same hash). The seed value (the
+ * previous element's hash value) is returned as the hash if an element is
+ * null.
+ *
+ * For additional differences such as special tail processing and decimal type
+ * handling, refer to the SparkMurmurHash3_32 functor.
+ *
+ * @tparam hash_function Hash functor to use for hashing elements. Must be SparkMurmurHash3_32.
+ * @tparam Nullate A cudf::nullate type describing whether to check for nulls.
+ */
+template
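
For orientation only, here is a hypothetical host-side caller of the new murmur_hash3_32 API declared in hash.cuh; it is not part of the diff. The cudf test column wrapper and the seed value 42 (the seed Spark's Murmur3 hash expression conventionally uses) are assumptions made for this sketch.

#include <cudf/column/column.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf_test/column_wrapper.hpp>

#include "hash.cuh"

// Hash three INT32 rows with the Spark-compatible Murmur3 implementation above.
// The result is one hash value per input row, returned as an INT32 column.
std::unique_ptr<cudf::column> hash_example()
{
  cudf::test::fixed_width_column_wrapper<int32_t> col({1, 2, 3});  // assumed test utility for brevity
  auto const input = cudf::table_view{{col}};                      // single-column table to hash
  return spark_rapids_jni::murmur_hash3_32(input, 42);             // 42: Spark's conventional seed (assumption noted above)
}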