diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index b8fa00788a..94b803769d 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -212,6 +212,7 @@ weaver_forge = { git = "https://github.com/open-telemetry/weaver.git", rev = "37 weaver_resolved_schema = { git = "https://github.com/open-telemetry/weaver.git", rev = "37c645a5ebc9e0d2a68f9228205f1d7d6a32ccbc"} weaver_resolver = { git = "https://github.com/open-telemetry/weaver.git", rev = "37c645a5ebc9e0d2a68f9228205f1d7d6a32ccbc"} weaver_semconv = { git = "https://github.com/open-telemetry/weaver.git", rev = "37c645a5ebc9e0d2a68f9228205f1d7d6a32ccbc"} +sha1 = { version = "0.10" } xxhash-rust = { version = "0.8", features = ["xxh3"] } zip = "=8.6.0" byte-unit = { version = "5.2.0", features = ["serde"] } diff --git a/rust/otap-dataflow/crates/query-engine/Cargo.toml b/rust/otap-dataflow/crates/query-engine/Cargo.toml index 0ce25dfaed..6115354644 100644 --- a/rust/otap-dataflow/crates/query-engine/Cargo.toml +++ b/rust/otap-dataflow/crates/query-engine/Cargo.toml @@ -19,6 +19,8 @@ datafusion = { workspace = true, features = [ ] } futures-core = { workspace = true } memchr = { workspace = true } +sha1 = { workspace = true, optional = true } +xxhash-rust = { workspace = true } parking_lot = { workspace = true } smallvec = { workspace = true } thiserror = { workspace = true } @@ -29,6 +31,9 @@ uuid = { workspace = true } otap-df-config = { workspace = true } otap-df-pdata = { workspace = true, features = ["testing"] } +[features] +sha1-hash = ["dep:sha1"] + [dev-dependencies] criterion = { workspace = true, features = ["async_tokio"] } data_engine_kql_parser = { workspace = true } diff --git a/rust/otap-dataflow/crates/query-engine/src/consts.rs b/rust/otap-dataflow/crates/query-engine/src/consts.rs index 86054480ce..216021d569 100644 --- a/rust/otap-dataflow/crates/query-engine/src/consts.rs +++ b/rust/otap-dataflow/crates/query-engine/src/consts.rs @@ -14,6 +14,14 @@ pub(crate) const LTRIM_FUNC_NAME: &str = "ltrim"; pub(crate) const REGEXP_SUBSTR_FUNC_NAME: &str = "regexp_substr"; pub(crate) const RTRIM_FUNC_NAME: &str = "rtrim"; pub(crate) const SHA256_FUNC_NAME: &str = "sha256"; +pub(crate) const MD5_FUNC_NAME: &str = "md5"; +pub(crate) const FNV_FUNC_NAME: &str = "fnv"; +pub(crate) const MURMUR3_FUNC_NAME: &str = "murmur3"; +#[cfg(feature = "sha1-hash")] +pub(crate) const SHA1_FUNC_NAME: &str = "sha1"; +pub(crate) const SHA512_FUNC_NAME: &str = "sha512"; +pub(crate) const XXH3_FUNC_NAME: &str = "xxh3"; +pub(crate) const XXH128_FUNC_NAME: &str = "xxh128"; pub(crate) const UUID_FUNC_NAME: &str = "uuid"; pub(crate) const UUIDV7_FUNC_NAME: &str = "uuidv7"; pub(crate) const LOWER_CASE_FUNC_NAME: &str = "lower_case"; diff --git a/rust/otap-dataflow/crates/query-engine/src/parser.rs b/rust/otap-dataflow/crates/query-engine/src/parser.rs index cc9e0a0735..327c5cdb47 100644 --- a/rust/otap-dataflow/crates/query-engine/src/parser.rs +++ b/rust/otap-dataflow/crates/query-engine/src/parser.rs @@ -10,17 +10,20 @@ use data_engine_expressions::{ }; use data_engine_parser_abstractions::ParserOptions; +#[cfg(feature = "sha1-hash")] +use crate::consts::SHA1_FUNC_NAME; use crate::consts::{ - ENCODE_FUNC_NAME, FORMAT_DATETIME_FUNC_NAME, LOG_FUNC_NAME, LOWER_CASE_FUNC_NAME, - LTRIM_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, RTRIM_FUNC_NAME, SHA256_FUNC_NAME, - UPPER_CASE_FUNC_NAME, UUID_FUNC_NAME, UUIDV7_FUNC_NAME, + ENCODE_FUNC_NAME, FNV_FUNC_NAME, FORMAT_DATETIME_FUNC_NAME, LOG_FUNC_NAME, + LOWER_CASE_FUNC_NAME, 
LTRIM_FUNC_NAME, MD5_FUNC_NAME, MURMUR3_FUNC_NAME, + REGEXP_SUBSTR_FUNC_NAME, RTRIM_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, + UPPER_CASE_FUNC_NAME, UUID_FUNC_NAME, UUIDV7_FUNC_NAME, XXH3_FUNC_NAME, XXH128_FUNC_NAME, }; /// Create parser options that can be used when parsing an expression that will be executed with /// this query engine #[must_use] pub fn default_parser_options() -> ParserOptions { - ParserOptions::new() + let opts = ParserOptions::new() // Add placeholders for scalar UDFs supported by this engine - these are needed because // the invoke function expression in our expression AST references the function by an ID, // adding these will make a named function with some ID available in the parser. Only the @@ -36,6 +39,12 @@ pub fn default_parser_options() -> ParserOptions { // .with_external_function(FORMAT_DATETIME_FUNC_NAME, param_placeholders(2), None) .with_external_function(SHA256_FUNC_NAME, param_placeholders(1), None) + .with_external_function(MD5_FUNC_NAME, param_placeholders(1), None) + .with_external_function(FNV_FUNC_NAME, param_placeholders(1), None) + .with_external_function(MURMUR3_FUNC_NAME, param_placeholders(1), None) + .with_external_function(SHA512_FUNC_NAME, param_placeholders(1), None) + .with_external_function(XXH3_FUNC_NAME, param_placeholders(1), None) + .with_external_function(XXH128_FUNC_NAME, param_placeholders(1), None) .with_external_function(ENCODE_FUNC_NAME, param_placeholders(2), None) .with_external_function(UUID_FUNC_NAME, param_placeholders(0), None) .with_external_function(UUIDV7_FUNC_NAME, param_placeholders(0), None) @@ -105,7 +114,12 @@ pub fn default_parser_options() -> ParserOptions { ), ], None, - ) + ); + + #[cfg(feature = "sha1-hash")] + let opts = opts.with_external_function(SHA1_FUNC_NAME, param_placeholders(1), None); + + opts } fn param_placeholders( diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs index ea6c35124f..db8a45af0f 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs @@ -4805,6 +4805,85 @@ mod test { test_update_attr_to_hash_function_call_result_all_supported_types::().await } + async fn test_update_attr_to_md5_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![ + KeyValue::new("str_attr", AnyValue::new_string("y")), + KeyValue::new("binary_attr", AnyValue::new_bytes(b"418")), + ]) + .finish(), + ]); + let query = r#"logs | extend + attributes["str_attr"] = md5(attributes["str_attr"]), + attributes["binary_attr"] = md5(attributes["binary_attr"]) + "#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![ + KeyValue::new( + "str_attr", + AnyValue::new_string("415290769594460e2e485922904f345d") + ), + KeyValue::new( + "binary_attr", + AnyValue::new_string("d1f255a373a3cef72e03aa9d980c7eca") + ) + ] + ); + } + + #[tokio::test] + async fn test_update_attr_to_md5_function_call_result_opl_parser() { + 
test_update_attr_to_md5_function_call_result::().await + } + #[tokio::test] + async fn test_update_attr_to_md5_function_call_result_kql_parser() { + test_update_attr_to_md5_function_call_result::().await + } + + async fn test_update_attr_to_fnv_hash_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "str_attr", + AnyValue::new_string("hello"), + )]) + .finish(), + ]); + + let query = r#"logs | extend attributes["str_attr"] = fnv(attributes["str_attr"])"#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![KeyValue::new( + "str_attr", + // FNV-1a 64-bit of "hello" interpreted as i64 + AnyValue::new_int(-6615550055289275125_i64) + )] + ); + } + async fn test_update_attr_to_log_function_call_result() { let logs_data = to_logs_data(vec![ LogRecord::build() @@ -4826,7 +4905,6 @@ mod test { let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); let result = pipeline.execute(input).await.unwrap(); - let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { panic!("invalid signal type"); }; @@ -4857,6 +4935,228 @@ mod test { assert_eq!(*log_ratio, 100.0f64.log10()); } + #[tokio::test] + async fn test_update_attr_to_fnv_hash_function_call_result_opl_parser() { + test_update_attr_to_fnv_hash_function_call_result::().await + } + + #[tokio::test] + async fn test_update_attr_to_fnv_hash_function_call_result_kql_parser() { + test_update_attr_to_fnv_hash_function_call_result::().await + } + + async fn test_update_attr_to_murmur3_hash_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "str_attr", + AnyValue::new_string("hello"), + )]) + .finish(), + ]); + + let query = r#"logs | extend attributes["str_attr"] = murmur3(attributes["str_attr"])"#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![KeyValue::new( + "str_attr", + // MurmurHash3 32-bit of "hello" with seed=0 + AnyValue::new_int(613_153_351_i64) + )] + ); + } + + #[tokio::test] + async fn test_update_attr_to_murmur3_hash_function_call_result_opl_parser() { + test_update_attr_to_murmur3_hash_function_call_result::().await + } + + #[tokio::test] + async fn test_update_attr_to_murmur3_hash_function_call_result_kql_parser() { + test_update_attr_to_murmur3_hash_function_call_result::().await + } + + #[cfg(feature = "sha1-hash")] + async fn test_update_attr_to_sha1_hash_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "str_attr", + AnyValue::new_string("hello"), + )]) + .finish(), + ]); + + let query = + r#"logs | extend 
attributes["str_attr"] = encode(sha1(attributes["str_attr"]), "hex")"#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![KeyValue::new( + "str_attr", + AnyValue::new_string("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d") + )] + ); + } + + #[cfg(feature = "sha1-hash")] + #[tokio::test] + async fn test_update_attr_to_sha1_hash_function_call_result_opl_parser() { + test_update_attr_to_sha1_hash_function_call_result::().await + } + + #[cfg(feature = "sha1-hash")] + #[tokio::test] + async fn test_update_attr_to_sha1_hash_function_call_result_kql_parser() { + test_update_attr_to_sha1_hash_function_call_result::().await + } + + async fn test_update_attr_to_sha512_hash_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "str_attr", + AnyValue::new_string("hello"), + )]) + .finish(), + ]); + + let query = r#"logs | extend attributes["str_attr"] = encode(sha512(attributes["str_attr"]), "hex")"#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![KeyValue::new( + "str_attr", + AnyValue::new_string( + "9b71d224bd62f3785d96d46ad3ea3d73319bfbc2890caadae2dff72519673ca72323c3d99ba5c11d7c7acc6e14b8c5da0c4663475c2e5c3adef46f73bcdec043" + ) + )] + ); + } + + #[tokio::test] + async fn test_update_attr_to_sha512_hash_function_call_result_opl_parser() { + test_update_attr_to_sha512_hash_function_call_result::().await + } + + #[tokio::test] + async fn test_update_attr_to_sha512_hash_function_call_result_kql_parser() { + test_update_attr_to_sha512_hash_function_call_result::().await + } + + async fn test_update_attr_to_xxh3_hash_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "str_attr", + AnyValue::new_string("hello"), + )]) + .finish(), + ]); + + let query = r#"logs | extend attributes["str_attr"] = xxh3(attributes["str_attr"])"#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![KeyValue::new( + "str_attr", + AnyValue::new_int(-7685981735718036227_i64) + )] + ); + } + + #[tokio::test] + async fn test_update_attr_to_xxh3_hash_function_call_result_opl_parser() { + test_update_attr_to_xxh3_hash_function_call_result::().await + } 
+ + #[tokio::test] + async fn test_update_attr_to_xxh3_hash_function_call_result_kql_parser() { + test_update_attr_to_xxh3_hash_function_call_result::().await + } + + async fn test_update_attr_to_xxh128_hash_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "str_attr", + AnyValue::new_string("hello"), + )]) + .finish(), + ]); + + let query = r#"logs | extend attributes["str_attr"] = encode(xxh128(attributes["str_attr"]), "hex")"#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![KeyValue::new( + "str_attr", + AnyValue::new_string("b5e9c1ad071b3e7fc779cfaa5e523818") + )] + ); + } + + #[tokio::test] + async fn test_update_attr_to_xxh128_hash_function_call_result_opl_parser() { + test_update_attr_to_xxh128_hash_function_call_result::().await + } + + #[tokio::test] + async fn test_update_attr_to_xxh128_hash_function_call_result_kql_parser() { + test_update_attr_to_xxh128_hash_function_call_result::().await + } + #[tokio::test] async fn test_update_attr_to_log_function_call_result_opl_parser() { test_update_attr_to_log_function_call_result::().await diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs index 3ec8427845..0b54681017 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs @@ -59,7 +59,7 @@ use data_engine_expressions::{ }; use datafusion::common::DFSchema; use datafusion::functions::core::expr_ext::FieldAccessor; -use datafusion::functions::crypto::sha256; +use datafusion::functions::crypto::{md5, sha256, sha512}; use datafusion::functions::datetime::to_char; use datafusion::functions::encoding::encode; @@ -79,17 +79,24 @@ use otap_df_pdata::arrays::{ use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType; use otap_df_pdata::schema::consts; +#[cfg(feature = "sha1-hash")] +use crate::consts::SHA1_FUNC_NAME; use crate::consts::{ - ENCODE_FUNC_NAME, FORMAT_DATETIME_FUNC_NAME, LOG_FUNC_NAME, LOWER_CASE_FUNC_NAME, - LTRIM_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, RTRIM_FUNC_NAME, SHA256_FUNC_NAME, - UPPER_CASE_FUNC_NAME, UUID_FUNC_NAME, UUIDV7_FUNC_NAME, + ENCODE_FUNC_NAME, FNV_FUNC_NAME, FORMAT_DATETIME_FUNC_NAME, LOG_FUNC_NAME, + LOWER_CASE_FUNC_NAME, LTRIM_FUNC_NAME, MD5_FUNC_NAME, MURMUR3_FUNC_NAME, + REGEXP_SUBSTR_FUNC_NAME, RTRIM_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, + UPPER_CASE_FUNC_NAME, UUID_FUNC_NAME, UUIDV7_FUNC_NAME, XXH3_FUNC_NAME, XXH128_FUNC_NAME, }; use crate::error::{Error, Result}; use crate::pipeline::expr::join::{join, multi_join}; use crate::pipeline::expr::types::{ ExprLogicalType, coerce_arithmetic, nested_struct_field_type, root_field_type, }; -use crate::pipeline::functions::{arity_range, regexp_substr, substring, uuidv7}; +#[cfg(feature = "sha1-hash")] +use crate::pipeline::functions::sha1_hash; +use crate::pipeline::functions::{ + arity_range, fnv_hash, murmur3_hash, regexp_substr, substring, uuidv7, xxh3_hash, xxh128_hash, +}; use 
crate::pipeline::planner::{AttributesIdentifier, ColumnAccessor}; use crate::pipeline::project::anyval::{ find_any_value_columns, project_any_value_columns, stitch_partitioned_results, @@ -842,6 +849,14 @@ impl DataFusionFunctionDef { FORMAT_DATETIME_FUNC_NAME => Self::new(to_char(), ExprLogicalType::String, false, None), RTRIM_FUNC_NAME => Self::new(rtrim(), ExprLogicalType::String, true, None), SHA256_FUNC_NAME => Self::new(sha256(), ExprLogicalType::Binary, true, None), + MD5_FUNC_NAME => Self::new(md5(), ExprLogicalType::String, true, Some(DataType::Utf8)), + FNV_FUNC_NAME => Self::new(fnv_hash(), ExprLogicalType::Int64, true, None), + MURMUR3_FUNC_NAME => Self::new(murmur3_hash(), ExprLogicalType::Int64, true, None), + #[cfg(feature = "sha1-hash")] + SHA1_FUNC_NAME => Self::new(sha1_hash(), ExprLogicalType::Binary, true, None), + SHA512_FUNC_NAME => Self::new(sha512(), ExprLogicalType::Binary, true, None), + XXH3_FUNC_NAME => Self::new(xxh3_hash(), ExprLogicalType::Int64, true, None), + XXH128_FUNC_NAME => Self::new(xxh128_hash(), ExprLogicalType::Binary, true, None), UUID_FUNC_NAME => Self::new(uuid(), ExprLogicalType::String, false, None), UUIDV7_FUNC_NAME => Self::new(uuidv7(), ExprLogicalType::String, false, None), UPPER_CASE_FUNC_NAME => Self::new(upper(), ExprLogicalType::String, true, None), diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs index 749f16d4c6..6e4865c991 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs @@ -13,12 +13,24 @@ use datafusion::logical_expr::{self as datafusion_expr, TypeSignature}; use datafusion::logical_expr_common::signature::Arity; mod contains; +mod fnv; pub(crate) mod is_type; +mod murmur3; mod regexp_substr; +#[cfg(feature = "sha1-hash")] +mod sha1; mod substring; mod uuidv7; +mod xxh128; +mod xxh3; make_udf_function!(contains::ExtendedContainsFunc, contains); +make_udf_function!(fnv::FnvHashFunc, fnv_hash); +make_udf_function!(murmur3::Murmur3HashFunc, murmur3_hash); +#[cfg(feature = "sha1-hash")] +make_udf_function!(sha1::Sha1Func, sha1_hash); +make_udf_function!(xxh128::Xxh128Func, xxh128_hash); +make_udf_function!(xxh3::Xxh3Func, xxh3_hash); make_udf_function!(substring::SubstringFunc, substring); make_udf_function!(regexp_substr::RegexpSubstrFunc, regexp_substr); make_udf_function!(uuidv7::UuidV7Func, uuidv7); diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs new file mode 100644 index 0000000000..6980d613de --- /dev/null +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs @@ -0,0 +1,244 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{ + Array, ArrayRef, BinaryArray, Int64Array, LargeBinaryArray, LargeStringArray, StringArray, + StringViewArray, +}; +use arrow::datatypes::DataType; +use datafusion::common::exec_err; +use datafusion::error::Result; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion::scalar::ScalarValue; + +const FNV_OFFSET_BASIS: u64 = 14695981039346656037; +const FNV_PRIME: u64 = 1099511628211; + +fn fnv1a_64(data: &[u8]) -> i64 { + let mut hash = FNV_OFFSET_BASIS; + for byte in data { + hash ^= u64::from(*byte); + hash 
= hash.wrapping_mul(FNV_PRIME);
+    }
+    hash as i64
+}
+
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct FnvHashFunc {
+    signature: Signature,
+}
+
+impl Default for FnvHashFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl FnvHashFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::any(1, Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for FnvHashFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "fnv"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Int64)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let args = args.args;
+        if args.len() != 1 {
+            return exec_err!(
+                "invalid number of args. {} expected 1 arg, found {}",
+                self.name(),
+                args.len()
+            );
+        }
+
+        match &args[0] {
+            ColumnarValue::Scalar(scalar) => {
+                let hash = hash_scalar(scalar)?;
+                Ok(ColumnarValue::Scalar(ScalarValue::Int64(hash)))
+            }
+            ColumnarValue::Array(arr) => {
+                let hashes = hash_array(arr.as_ref())?;
+                Ok(ColumnarValue::Array(Arc::new(hashes) as ArrayRef))
+            }
+        }
+    }
+
+    fn documentation(&self) -> Option<&Documentation> {
+        None
+    }
+}
+
+/// computes the FNV-1a 64-bit hash of a scalar value
+/// returns None if the input is null, or an error if the input type is not supported.
+fn hash_scalar(scalar: &ScalarValue) -> Result<Option<i64>> {
+    match scalar {
+        ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => {
+            Ok(v.as_deref().map(|s| fnv1a_64(s.as_bytes())))
+        }
+        ScalarValue::Utf8View(v) => Ok(v.as_deref().map(|s| fnv1a_64(s.as_bytes()))),
+        ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => Ok(v.as_ref().map(|b| fnv1a_64(b))),
+        other => exec_err!("fnv: unsupported scalar type {:?}", other.data_type()),
+    }
+}
+
+/// computes the FNV-1a 64-bit hash of each element in an array
+/// null elements produce null output values.
+/// supports [DataType::Utf8], [DataType::LargeUtf8], [DataType::Utf8View], [DataType::Binary],
+/// and [DataType::LargeBinary] input arrays.
+fn hash_array(arr: &dyn Array) -> Result<Int64Array> {
+    match arr.data_type() {
+        DataType::Utf8 => {
+            let arr = arr.as_any().downcast_ref::<StringArray>().expect("Utf8");
+            Ok(Int64Array::from_iter(
+                arr.iter().map(|v| v.map(|s| fnv1a_64(s.as_bytes()))),
+            ))
+        }
+        DataType::LargeUtf8 => {
+            let arr = arr
+                .as_any()
+                .downcast_ref::<LargeStringArray>()
+                .expect("LargeUtf8");
+            Ok(Int64Array::from_iter(
+                arr.iter().map(|v| v.map(|s| fnv1a_64(s.as_bytes()))),
+            ))
+        }
+        DataType::Utf8View => {
+            let arr = arr
+                .as_any()
+                .downcast_ref::<StringViewArray>()
+                .expect("Utf8View");
+            Ok(Int64Array::from_iter(
+                arr.iter().map(|v| v.map(|s| fnv1a_64(s.as_bytes()))),
+            ))
+        }
+        DataType::Binary => {
+            let arr = arr.as_any().downcast_ref::<BinaryArray>().expect("Binary");
+            Ok(Int64Array::from_iter(arr.iter().map(|v| v.map(fnv1a_64))))
+        }
+        DataType::LargeBinary => {
+            let arr = arr
+                .as_any()
+                .downcast_ref::<LargeBinaryArray>()
+                .expect("LargeBinary");
+            Ok(Int64Array::from_iter(arr.iter().map(|v| v.map(fnv1a_64))))
+        }
+        other => exec_err!("fnv: unsupported array type {:?}", other),
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use arrow::datatypes::Field;
+    use datafusion::common::config::ConfigOptions;
+    use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs};
+    use datafusion::scalar::ScalarValue;
+    use std::sync::Arc;
+
+    fn invoke(arg: ColumnarValue) -> Result<i64> {
+        let func = FnvHashFunc::new();
+        let result = func.invoke_with_args(ScalarFunctionArgs {
+            args: vec![arg],
+            arg_fields: vec![],
+            number_rows: 1,
+            return_field: Field::new("", DataType::Int64, true).into(),
+            config_options: Arc::new(ConfigOptions::default()),
+        })?;
+        match result {
+            ColumnarValue::Scalar(ScalarValue::Int64(Some(v))) => Ok(v),
+            other => panic!("expected Scalar(Int64(Some(_))), got {:?}", other),
+        }
+    }
+
+    #[test]
+    fn test_fnv_empty_string() {
+        // FNV-1a 64-bit of empty input is the offset basis itself
+        let v = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(
+            Some(String::new()),
+        )))
+        .unwrap();
+        assert_eq!(v, FNV_OFFSET_BASIS as i64);
+    }
+
+    #[test]
+    fn test_fnv_string() {
+        let v = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(Some(
+            "hello".to_string(),
+        ))))
+        .unwrap();
+        assert_eq!(v, -6615550055289275125_i64);
+    }
+
+    #[test]
+    fn test_fnv_binary() {
+        let v = invoke(ColumnarValue::Scalar(ScalarValue::Binary(Some(
+            b"hello".to_vec(),
+        ))))
+        .unwrap();
+        assert_eq!(v, -6615550055289275125_i64);
+    }
+
+    #[test]
+    fn test_fnv_null_returns_none() {
+        let func = FnvHashFunc::new();
+        let result = func
+            .invoke_with_args(ScalarFunctionArgs {
+                args: vec![ColumnarValue::Scalar(ScalarValue::Utf8(None))],
+                arg_fields: vec![],
+                number_rows: 1,
+                return_field: Field::new("", DataType::Int64, true).into(),
+                config_options: Arc::new(ConfigOptions::default()),
+            })
+            .unwrap();
+        match result {
+            ColumnarValue::Scalar(ScalarValue::Int64(None)) => {}
+            other => panic!("expected Scalar(Int64(None)), got {:?}", other),
+        }
+    }
+
+    #[test]
+    fn test_fnv_array() {
+        use arrow::array::StringArray;
+        let arr = StringArray::from(vec![Some("hello"), Some("world"), None]);
+        let func = FnvHashFunc::new();
+        let result = func
+            .invoke_with_args(ScalarFunctionArgs {
+                args: vec![ColumnarValue::Array(Arc::new(arr))],
+                arg_fields: vec![],
+                number_rows: 3,
+                return_field: Field::new("", DataType::Int64, true).into(),
+                config_options: Arc::new(ConfigOptions::default()),
+            })
+            .unwrap();
+        let ColumnarValue::Array(arr) = result else {
+            panic!("expected array");
+        };
+        let arr = arr.as_any().downcast_ref::<Int64Array>().expect("Int64");
+        assert_eq!(arr.value(0),
-6615550055289275125_i64); + assert!(!arr.is_null(0)); + assert!(!arr.is_null(1)); + assert!(arr.is_null(2)); + } +} diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/murmur3.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/murmur3.rs new file mode 100644 index 0000000000..fdd6de2281 --- /dev/null +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/murmur3.rs @@ -0,0 +1,279 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{ + Array, ArrayRef, BinaryArray, Int64Array, LargeBinaryArray, LargeStringArray, StringArray, + StringViewArray, +}; +use arrow::datatypes::DataType; +use datafusion::common::exec_err; +use datafusion::error::Result; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion::scalar::ScalarValue; + +/// MurmurHash3 32-bit (seed=0). Returns the hash as i64 for parity with OTTL. +fn murmur3_32(data: &[u8]) -> i64 { + const C1: u32 = 0xcc9e2d51; + const C2: u32 = 0x1b873593; + + let mut h: u32 = 0; + let chunks = data.chunks_exact(4); + let remainder = chunks.remainder(); + + for chunk in chunks { + let mut k = u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]); + k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2); + h ^= k; + h = h.rotate_left(13).wrapping_mul(5).wrapping_add(0xe6546b64); + } + + let mut k: u32 = 0; + match remainder.len() { + 3 => { + k ^= u32::from(remainder[2]) << 16; + k ^= u32::from(remainder[1]) << 8; + k ^= u32::from(remainder[0]); + k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2); + h ^= k; + } + 2 => { + k ^= u32::from(remainder[1]) << 8; + k ^= u32::from(remainder[0]); + k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2); + h ^= k; + } + 1 => { + k ^= u32::from(remainder[0]); + k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2); + h ^= k; + } + _ => {} + } + + h ^= data.len() as u32; + h ^= h >> 16; + h = h.wrapping_mul(0x85ebca6b); + h ^= h >> 13; + h = h.wrapping_mul(0xc2b2ae35); + h ^= h >> 16; + + i64::from(h) +} + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct Murmur3HashFunc { + signature: Signature, +} + +impl Default for Murmur3HashFunc { + fn default() -> Self { + Self::new() + } +} + +impl Murmur3HashFunc { + pub fn new() -> Self { + Self { + signature: Signature::any(1, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for Murmur3HashFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "murmur3" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Int64) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let args = args.args; + if args.len() != 1 { + return exec_err!( + "invalid number of args. 
{} expected 1 arg, found {}", + self.name(), + args.len() + ); + } + + match &args[0] { + ColumnarValue::Scalar(scalar) => { + let hash = hash_scalar(scalar)?; + Ok(ColumnarValue::Scalar(ScalarValue::Int64(hash))) + } + ColumnarValue::Array(arr) => { + let hashes = hash_array(arr.as_ref())?; + Ok(ColumnarValue::Array(Arc::new(hashes) as ArrayRef)) + } + } + } + + fn documentation(&self) -> Option<&Documentation> { + None + } +} + +fn hash_scalar(scalar: &ScalarValue) -> Result> { + match scalar { + ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => { + Ok(v.as_deref().map(|s| murmur3_32(s.as_bytes()))) + } + ScalarValue::Utf8View(v) => Ok(v.as_deref().map(|s| murmur3_32(s.as_bytes()))), + ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => { + Ok(v.as_ref().map(|b| murmur3_32(b))) + } + other => exec_err!("murmur3: unsupported scalar type {:?}", other.data_type()), + } +} + +fn hash_array(arr: &dyn Array) -> Result { + match arr.data_type() { + DataType::Utf8 => { + let arr = arr.as_any().downcast_ref::().expect("Utf8"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| murmur3_32(s.as_bytes()))), + )) + } + DataType::LargeUtf8 => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeUtf8"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| murmur3_32(s.as_bytes()))), + )) + } + DataType::Utf8View => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("Utf8View"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| murmur3_32(s.as_bytes()))), + )) + } + DataType::Binary => { + let arr = arr.as_any().downcast_ref::().expect("Binary"); + Ok(Int64Array::from_iter(arr.iter().map(|v| v.map(murmur3_32)))) + } + DataType::LargeBinary => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeBinary"); + Ok(Int64Array::from_iter(arr.iter().map(|v| v.map(murmur3_32)))) + } + other => exec_err!("murmur3: unsupported array type {:?}", other), + } +} + +#[cfg(test)] +mod test { + use super::*; + use arrow::datatypes::Field; + use datafusion::common::config::ConfigOptions; + use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs}; + use datafusion::scalar::ScalarValue; + use std::sync::Arc; + + fn invoke(arg: ColumnarValue) -> Result { + let func = Murmur3HashFunc::new(); + let result = func.invoke_with_args(ScalarFunctionArgs { + args: vec![arg], + arg_fields: vec![], + number_rows: 1, + return_field: Field::new("", DataType::Int64, true).into(), + config_options: Arc::new(ConfigOptions::default()), + })?; + match result { + ColumnarValue::Scalar(ScalarValue::Int64(Some(v))) => Ok(v), + other => panic!("expected Scalar(Int64(Some(_))), got {:?}", other), + } + } + + #[test] + fn test_murmur3_empty_string() { + // murmur3 of empty input with seed=0 is 0 + let v = invoke(ColumnarValue::Scalar(ScalarValue::Utf8( + Some(String::new()), + ))) + .unwrap(); + assert_eq!(v, 0); + } + + #[test] + fn test_murmur3_string() { + let v = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "hello".to_string(), + )))) + .unwrap(); + // pre-computed MurmurHash3 32-bit of "hello" with seed=0 + assert_eq!(v, 613_153_351_i64); + } + + #[test] + fn test_murmur3_binary_matches_string() { + let v = invoke(ColumnarValue::Scalar(ScalarValue::Binary(Some( + b"hello".to_vec(), + )))) + .unwrap(); + assert_eq!(v, 613_153_351_i64); + } + + #[test] + fn test_murmur3_null_returns_none() { + let func = Murmur3HashFunc::new(); + let result = func + .invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Utf8(None))], + arg_fields: 
vec![], + number_rows: 1, + return_field: Field::new("", DataType::Int64, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Int64(None)) => {} + other => panic!("expected Scalar(Int64(None)), got {:?}", other), + } + } + + #[test] + fn test_murmur3_array() { + use arrow::array::StringArray; + let arr = StringArray::from(vec![Some("hello"), Some("world"), None]); + let func = Murmur3HashFunc::new(); + let result = func + .invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(arr))], + arg_fields: vec![], + number_rows: 3, + return_field: Field::new("", DataType::Int64, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(); + let ColumnarValue::Array(arr) = result else { + panic!("expected array"); + }; + let arr = arr.as_any().downcast_ref::().expect("Int64"); + assert_eq!(arr.value(0), 613_153_351_i64); + assert!(!arr.is_null(0)); + assert!(!arr.is_null(1)); + assert!(arr.is_null(2)); + } +} diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/sha1.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/sha1.rs new file mode 100644 index 0000000000..ab76c002ba --- /dev/null +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/sha1.rs @@ -0,0 +1,237 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{ + Array, ArrayRef, BinaryArray, BinaryBuilder, LargeBinaryArray, LargeStringArray, StringArray, + StringViewArray, +}; +use arrow::datatypes::DataType; +use datafusion::common::exec_err; +use datafusion::error::Result; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion::scalar::ScalarValue; +use sha1::Digest; + +fn sha1_bytes(data: &[u8]) -> Vec { + sha1::Sha1::digest(data).to_vec() +} + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct Sha1Func { + signature: Signature, +} + +impl Default for Sha1Func { + fn default() -> Self { + Self::new() + } +} + +impl Sha1Func { + pub fn new() -> Self { + Self { + signature: Signature::any(1, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for Sha1Func { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "sha1" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Binary) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let args = args.args; + if args.len() != 1 { + return exec_err!( + "invalid number of args. 
{} expected 1 arg, found {}", + self.name(), + args.len() + ); + } + + match &args[0] { + ColumnarValue::Scalar(scalar) => { + let hash = hash_scalar(scalar)?; + Ok(ColumnarValue::Scalar(ScalarValue::Binary(hash))) + } + ColumnarValue::Array(arr) => { + let hashes = hash_array(arr.as_ref())?; + Ok(ColumnarValue::Array(Arc::new(hashes) as ArrayRef)) + } + } + } + + fn documentation(&self) -> Option<&Documentation> { + None + } +} + +fn hash_scalar(scalar: &ScalarValue) -> Result>> { + match scalar { + ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => { + Ok(v.as_deref().map(|s| sha1_bytes(s.as_bytes()))) + } + ScalarValue::Utf8View(v) => Ok(v.as_deref().map(|s| sha1_bytes(s.as_bytes()))), + ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => { + Ok(v.as_ref().map(|b| sha1_bytes(b))) + } + other => exec_err!("sha1: unsupported scalar type {:?}", other.data_type()), + } +} + +fn hash_array(arr: &dyn Array) -> Result { + let len = arr.len(); + let mut builder = BinaryBuilder::with_capacity(len, len * 20); + + match arr.data_type() { + DataType::Utf8 => { + let arr = arr.as_any().downcast_ref::().expect("Utf8"); + for v in arr.iter() { + match v { + Some(s) => builder.append_value(sha1_bytes(s.as_bytes())), + None => builder.append_null(), + } + } + } + DataType::LargeUtf8 => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeUtf8"); + for v in arr.iter() { + match v { + Some(s) => builder.append_value(sha1_bytes(s.as_bytes())), + None => builder.append_null(), + } + } + } + DataType::Utf8View => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("Utf8View"); + for v in arr.iter() { + match v { + Some(s) => builder.append_value(sha1_bytes(s.as_bytes())), + None => builder.append_null(), + } + } + } + DataType::Binary => { + let arr = arr.as_any().downcast_ref::().expect("Binary"); + for v in arr.iter() { + match v { + Some(b) => builder.append_value(sha1_bytes(b)), + None => builder.append_null(), + } + } + } + DataType::LargeBinary => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeBinary"); + for v in arr.iter() { + match v { + Some(b) => builder.append_value(sha1_bytes(b)), + None => builder.append_null(), + } + } + } + other => return exec_err!("sha1: unsupported array type {:?}", other), + } + + Ok(builder.finish()) +} + +#[cfg(test)] +mod test { + use super::*; + use arrow::datatypes::Field; + use datafusion::common::config::ConfigOptions; + use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs}; + use datafusion::scalar::ScalarValue; + use std::sync::Arc; + + fn invoke(arg: ColumnarValue) -> Result> { + let func = Sha1Func::new(); + let result = func.invoke_with_args(ScalarFunctionArgs { + args: vec![arg], + arg_fields: vec![], + number_rows: 1, + return_field: Field::new("", DataType::Binary, true).into(), + config_options: Arc::new(ConfigOptions::default()), + })?; + match result { + ColumnarValue::Scalar(ScalarValue::Binary(Some(v))) => Ok(v), + other => panic!("expected Scalar(Binary(Some(_))), got {:?}", other), + } + } + + #[test] + fn test_sha1_string() { + let v = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "hello".to_string(), + )))) + .unwrap(); + // sha1("hello") = aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d + assert_eq!( + v, + vec![ + 0xaa, 0xf4, 0xc6, 0x1d, 0xdc, 0xc5, 0xe8, 0xa2, 0xda, 0xbe, 0xde, 0x0f, 0x3b, 0x48, + 0x2c, 0xd9, 0xae, 0xa9, 0x43, 0x4d + ] + ); + } + + #[test] + fn test_sha1_binary_matches_string() { + let v = invoke(ColumnarValue::Scalar(ScalarValue::Binary(Some( + b"hello".to_vec(), + )))) 
+ .unwrap(); + assert_eq!( + v, + vec![ + 0xaa, 0xf4, 0xc6, 0x1d, 0xdc, 0xc5, 0xe8, 0xa2, 0xda, 0xbe, 0xde, 0x0f, 0x3b, 0x48, + 0x2c, 0xd9, 0xae, 0xa9, 0x43, 0x4d + ] + ); + } + + #[test] + fn test_sha1_null_returns_none() { + let func = Sha1Func::new(); + let result = func + .invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Utf8(None))], + arg_fields: vec![], + number_rows: 1, + return_field: Field::new("", DataType::Binary, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Binary(None)) => {} + other => panic!("expected Scalar(Binary(None)), got {:?}", other), + } + } +} diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh128.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh128.rs new file mode 100644 index 0000000000..0df7f3cc48 --- /dev/null +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh128.rs @@ -0,0 +1,221 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{ + Array, ArrayRef, BinaryArray, BinaryBuilder, LargeBinaryArray, LargeStringArray, StringArray, + StringViewArray, +}; +use arrow::datatypes::DataType; +use datafusion::common::exec_err; +use datafusion::error::Result; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion::scalar::ScalarValue; +use xxhash_rust::xxh3::xxh3_128; + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct Xxh128Func { + signature: Signature, +} + +impl Default for Xxh128Func { + fn default() -> Self { + Self::new() + } +} + +impl Xxh128Func { + pub fn new() -> Self { + Self { + signature: Signature::any(1, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for Xxh128Func { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "xxh128" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Binary) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let args = args.args; + if args.len() != 1 { + return exec_err!( + "invalid number of args. 
{} expected 1 arg, found {}", + self.name(), + args.len() + ); + } + + match &args[0] { + ColumnarValue::Scalar(scalar) => { + let hash = hash_scalar(scalar)?; + Ok(ColumnarValue::Scalar(ScalarValue::Binary(hash))) + } + ColumnarValue::Array(arr) => { + let hashes = hash_array(arr.as_ref())?; + Ok(ColumnarValue::Array(Arc::new(hashes) as ArrayRef)) + } + } + } + + fn documentation(&self) -> Option<&Documentation> { + None + } +} + +fn to_bytes(hash: u128) -> Vec { + hash.to_be_bytes().to_vec() +} + +fn hash_scalar(scalar: &ScalarValue) -> Result>> { + match scalar { + ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => { + Ok(v.as_deref().map(|s| to_bytes(xxh3_128(s.as_bytes())))) + } + ScalarValue::Utf8View(v) => Ok(v.as_deref().map(|s| to_bytes(xxh3_128(s.as_bytes())))), + ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => { + Ok(v.as_ref().map(|b| to_bytes(xxh3_128(b)))) + } + other => exec_err!("xxh128: unsupported scalar type {:?}", other.data_type()), + } +} + +fn hash_array(arr: &dyn Array) -> Result { + let len = arr.len(); + let mut builder = BinaryBuilder::with_capacity(len, len * 16); + + match arr.data_type() { + DataType::Utf8 => { + let arr = arr.as_any().downcast_ref::().expect("Utf8"); + for v in arr.iter() { + match v { + Some(s) => builder.append_value(to_bytes(xxh3_128(s.as_bytes()))), + None => builder.append_null(), + } + } + } + DataType::LargeUtf8 => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeUtf8"); + for v in arr.iter() { + match v { + Some(s) => builder.append_value(to_bytes(xxh3_128(s.as_bytes()))), + None => builder.append_null(), + } + } + } + DataType::Utf8View => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("Utf8View"); + for v in arr.iter() { + match v { + Some(s) => builder.append_value(to_bytes(xxh3_128(s.as_bytes()))), + None => builder.append_null(), + } + } + } + DataType::Binary => { + let arr = arr.as_any().downcast_ref::().expect("Binary"); + for v in arr.iter() { + match v { + Some(b) => builder.append_value(to_bytes(xxh3_128(b))), + None => builder.append_null(), + } + } + } + DataType::LargeBinary => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeBinary"); + for v in arr.iter() { + match v { + Some(b) => builder.append_value(to_bytes(xxh3_128(b))), + None => builder.append_null(), + } + } + } + other => return exec_err!("xxh128: unsupported array type {:?}", other), + } + + Ok(builder.finish()) +} + +#[cfg(test)] +mod test { + use super::*; + use arrow::datatypes::Field; + use datafusion::common::config::ConfigOptions; + use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs}; + use datafusion::scalar::ScalarValue; + use std::sync::Arc; + + fn invoke(arg: ColumnarValue) -> Result { + Xxh128Func::new().invoke_with_args(ScalarFunctionArgs { + args: vec![arg], + arg_fields: vec![], + number_rows: 1, + return_field: Field::new("", DataType::Binary, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + } + + #[test] + fn test_xxh128_string() { + let result = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "hello".to_string(), + )))) + .unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Binary(Some(v))) => { + assert_eq!(v, to_bytes(xxh3_128(b"hello"))); + assert_eq!(v.len(), 16); + } + other => panic!("expected Scalar(Binary(Some(_))), got {:?}", other), + } + } + + #[test] + fn test_xxh128_binary_matches_string() { + let result = invoke(ColumnarValue::Scalar(ScalarValue::Binary(Some( + b"hello".to_vec(), + )))) + .unwrap(); + match result { 
+ ColumnarValue::Scalar(ScalarValue::Binary(Some(v))) => { + assert_eq!(v, to_bytes(xxh3_128(b"hello"))); + } + other => panic!("expected Scalar(Binary(Some(_))), got {:?}", other), + } + } + + #[test] + fn test_xxh128_null() { + let result = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(None))).unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Binary(None)) => {} + other => panic!("expected Scalar(Binary(None)), got {:?}", other), + } + } +} diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh3.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh3.rs new file mode 100644 index 0000000000..435a23d432 --- /dev/null +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh3.rs @@ -0,0 +1,196 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{ + Array, ArrayRef, BinaryArray, Int64Array, LargeBinaryArray, LargeStringArray, StringArray, + StringViewArray, +}; +use arrow::datatypes::DataType; +use datafusion::common::exec_err; +use datafusion::error::Result; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion::scalar::ScalarValue; +use xxhash_rust::xxh3::xxh3_64; + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct Xxh3Func { + signature: Signature, +} + +impl Default for Xxh3Func { + fn default() -> Self { + Self::new() + } +} + +impl Xxh3Func { + pub fn new() -> Self { + Self { + signature: Signature::any(1, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for Xxh3Func { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "xxh3" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Int64) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let args = args.args; + if args.len() != 1 { + return exec_err!( + "invalid number of args. 
{} expected 1 arg, found {}", + self.name(), + args.len() + ); + } + + match &args[0] { + ColumnarValue::Scalar(scalar) => { + let hash = hash_scalar(scalar)?; + Ok(ColumnarValue::Scalar(ScalarValue::Int64(hash))) + } + ColumnarValue::Array(arr) => { + let hashes = hash_array(arr.as_ref())?; + Ok(ColumnarValue::Array(Arc::new(hashes) as ArrayRef)) + } + } + } + + fn documentation(&self) -> Option<&Documentation> { + None + } +} + +fn hash_scalar(scalar: &ScalarValue) -> Result> { + match scalar { + ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => { + Ok(v.as_deref().map(|s| xxh3_64(s.as_bytes()) as i64)) + } + ScalarValue::Utf8View(v) => Ok(v.as_deref().map(|s| xxh3_64(s.as_bytes()) as i64)), + ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => { + Ok(v.as_ref().map(|b| xxh3_64(b) as i64)) + } + other => exec_err!("xxh3: unsupported scalar type {:?}", other.data_type()), + } +} + +fn hash_array(arr: &dyn Array) -> Result { + match arr.data_type() { + DataType::Utf8 => { + let arr = arr.as_any().downcast_ref::().expect("Utf8"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| xxh3_64(s.as_bytes()) as i64)), + )) + } + DataType::LargeUtf8 => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeUtf8"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| xxh3_64(s.as_bytes()) as i64)), + )) + } + DataType::Utf8View => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("Utf8View"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| xxh3_64(s.as_bytes()) as i64)), + )) + } + DataType::Binary => { + let arr = arr.as_any().downcast_ref::().expect("Binary"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|b| xxh3_64(b) as i64)), + )) + } + DataType::LargeBinary => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeBinary"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|b| xxh3_64(b) as i64)), + )) + } + other => exec_err!("xxh3: unsupported array type {:?}", other), + } +} + +#[cfg(test)] +mod test { + use super::*; + use arrow::datatypes::Field; + use datafusion::common::config::ConfigOptions; + use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs}; + use datafusion::scalar::ScalarValue; + use std::sync::Arc; + + fn invoke(arg: ColumnarValue) -> Result { + Xxh3Func::new().invoke_with_args(ScalarFunctionArgs { + args: vec![arg], + arg_fields: vec![], + number_rows: 1, + return_field: Field::new("", DataType::Int64, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + } + + #[test] + fn test_xxh3_string() { + let result = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "hello".to_string(), + )))) + .unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Int64(Some(v))) => { + assert_eq!(v, xxh3_64(b"hello") as i64); + } + other => panic!("expected Scalar(Int64(Some(_))), got {:?}", other), + } + } + + #[test] + fn test_xxh3_binary_matches_string() { + let result = invoke(ColumnarValue::Scalar(ScalarValue::Binary(Some( + b"hello".to_vec(), + )))) + .unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Int64(Some(v))) => { + assert_eq!(v, xxh3_64(b"hello") as i64); + } + other => panic!("expected Scalar(Int64(Some(_))), got {:?}", other), + } + } + + #[test] + fn test_xxh3_null() { + let result = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(None))).unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Int64(None)) => {} + other => panic!("expected Scalar(Int64(None)), got {:?}", other), + } + } +}
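
The pre-computed values asserted in the tests above (FNV-1a 64-bit of "hello" and MurmurHash3 32-bit of "hello" with seed 0) can be sanity-checked outside the pipeline. The sketch below is an illustrative, standard-library-only re-implementation mirroring the constants used in fnv.rs and murmur3.rs in this diff; it is a reviewer-side check, not part of the change.

// Standalone sanity check for the reference hash values used in the tests.
// Assumes the same algorithms as the UDFs in this diff: FNV-1a 64-bit and
// MurmurHash3 x86_32 with seed = 0, both returned as i64.
fn fnv1a_64(data: &[u8]) -> i64 {
    let mut hash: u64 = 14695981039346656037; // FNV offset basis
    for byte in data {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(1099511628211); // FNV prime
    }
    hash as i64
}

fn murmur3_32(data: &[u8]) -> i64 {
    const C1: u32 = 0xcc9e2d51;
    const C2: u32 = 0x1b873593;
    let mut h: u32 = 0;
    let chunks = data.chunks_exact(4);
    let rem = chunks.remainder();
    for chunk in chunks {
        let mut k = u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
        k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2);
        h ^= k;
        h = h.rotate_left(13).wrapping_mul(5).wrapping_add(0xe6546b64);
    }
    // tail bytes, little-endian, same handling as the match in murmur3.rs
    let mut k: u32 = 0;
    for (i, b) in rem.iter().enumerate() {
        k ^= u32::from(*b) << (8 * i);
    }
    if !rem.is_empty() {
        k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2);
        h ^= k;
    }
    // finalization mix
    h ^= data.len() as u32;
    h ^= h >> 16;
    h = h.wrapping_mul(0x85ebca6b);
    h ^= h >> 13;
    h = h.wrapping_mul(0xc2b2ae35);
    h ^= h >> 16;
    i64::from(h)
}

fn main() {
    // Same expected values as in assign.rs and the fnv/murmur3 unit tests.
    assert_eq!(fnv1a_64(b"hello"), -6615550055289275125_i64);
    assert_eq!(murmur3_32(b"hello"), 613_153_351_i64);
    println!("reference hash values check out");
}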