From c7189ac86c39488ec02166a23824201519a7766d Mon Sep 17 00:00:00 2001 From: SzymonIwaniuk Date: Wed, 6 May 2026 19:31:22 +0200 Subject: [PATCH 1/7] feat(query-engine): add md5 hash function support --- .../crates/query-engine/src/consts.rs | 1 + .../crates/query-engine/src/parser.rs | 3 +- .../query-engine/src/pipeline/assign.rs | 47 +++++++++++++++++++ .../crates/query-engine/src/pipeline/expr.rs | 5 +- 4 files changed, 53 insertions(+), 3 deletions(-) diff --git a/rust/otap-dataflow/crates/query-engine/src/consts.rs b/rust/otap-dataflow/crates/query-engine/src/consts.rs index 1fe1f6aa0d..742f9046b5 100644 --- a/rust/otap-dataflow/crates/query-engine/src/consts.rs +++ b/rust/otap-dataflow/crates/query-engine/src/consts.rs @@ -10,3 +10,4 @@ pub(crate) const VALUE_FIELD_NAME: &str = "value"; pub(crate) const ENCODE_FUNC_NAME: &str = "encode"; pub(crate) const REGEXP_SUBSTR_FUNC_NAME: &str = "regexp_substr"; pub(crate) const SHA256_FUNC_NAME: &str = "sha256"; +pub(crate) const MD5_FUNC_NAME: &str = "md5"; diff --git a/rust/otap-dataflow/crates/query-engine/src/parser.rs b/rust/otap-dataflow/crates/query-engine/src/parser.rs index 5a6de7d0e9..cbe9c837bd 100644 --- a/rust/otap-dataflow/crates/query-engine/src/parser.rs +++ b/rust/otap-dataflow/crates/query-engine/src/parser.rs @@ -10,7 +10,7 @@ use data_engine_expressions::{ }; use data_engine_parser_abstractions::ParserOptions; -use crate::consts::{ENCODE_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME}; +use crate::consts::{ENCODE_FUNC_NAME, MD5_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME}; /// Create parser options that can be used when parsing an expression that will be executed with /// this query engine @@ -31,6 +31,7 @@ pub fn default_parser_options() -> ParserOptions { // expression tree. // .with_external_function(SHA256_FUNC_NAME, param_placeholders(1), None) + .with_external_function(MD5_FUNC_NAME, param_placeholders(1), None) .with_external_function(ENCODE_FUNC_NAME, param_placeholders(2), None) .with_external_function( REGEXP_SUBSTR_FUNC_NAME, diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs index 156fee8974..84aa6360df 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs @@ -4805,6 +4805,53 @@ mod test { test_update_attr_to_hash_function_call_result_all_supported_types::().await } + async fn test_update_attr_to_md5_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![ + KeyValue::new("str_attr", AnyValue::new_string("y")), + KeyValue::new("binary_attr", AnyValue::new_bytes(b"418")), + ]) + .finish(), + ]); + let query = r#"logs | extend + attributes["str_attr"] = encode(md5(attributes["str_attr"]), "hex"), + attributes["binary_attr"] = encode(md5(attributes["binary_attr"]), "hex") + "#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![ + KeyValue::new( + "str_attr", + AnyValue::new_string("a6105c0a611b41b08f1209506350279e") + ), + KeyValue::new( + "binary_attr", + AnyValue::new_string("0ffe9bcd5a3d234d4e99e9a1fb9a5d2c") + ) + ] + ); + } + + #[tokio::test] + async fn test_update_attr_to_md5_function_call_result_opl_parser() { + test_update_attr_to_md5_function_call_result::().await + } + #[tokio::test] + async fn test_update_attr_to_md5_function_call_result_kql_parser() { + test_update_attr_to_md5_function_call_result::().await + } + async fn test_update_attr_to_substring_function_call_result() { let logs_data = to_logs_data(vec![ LogRecord::build() diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs index d89a084502..b70f8fc8d6 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs @@ -59,7 +59,7 @@ use data_engine_expressions::{ }; use datafusion::common::DFSchema; use datafusion::functions::core::expr_ext::FieldAccessor; -use datafusion::functions::crypto::sha256; +use datafusion::functions::crypto::{md5, sha256}; use datafusion::functions::encoding::encode; use datafusion::functions::string::{concat, concat_ws, replace}; use datafusion::logical_expr::expr::ScalarFunction; @@ -76,7 +76,7 @@ use otap_df_pdata::arrays::{ use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType; use otap_df_pdata::schema::consts; -use crate::consts::{ENCODE_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME}; +use crate::consts::{ENCODE_FUNC_NAME, MD5_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME}; use crate::error::{Error, Result}; use crate::pipeline::expr::join::{join, multi_join}; use crate::pipeline::expr::types::{ @@ -827,6 +827,7 @@ impl DataFusionFunctionDef { Self::new(regexp_substr(), ExprLogicalType::String, false, None) } SHA256_FUNC_NAME => Self::new(sha256(), ExprLogicalType::Binary, true, None), + MD5_FUNC_NAME => Self::new(md5(), ExprLogicalType::Binary, true, None), _ => return None, }) } From 4825ba54d592be99938b5ab430d195862bb671da Mon Sep 17 00:00:00 2001 From: SzymonIwaniuk Date: Wed, 6 May 2026 20:06:21 +0200 Subject: [PATCH 2/7] feat(query-engine): add fnv hash function support --- .../crates/query-engine/src/consts.rs | 1 + .../crates/query-engine/src/parser.rs | 5 +- .../query-engine/src/pipeline/assign.rs | 43 ++++ .../crates/query-engine/src/pipeline/expr.rs | 6 +- .../query-engine/src/pipeline/functions.rs | 2 + .../src/pipeline/functions/fnv.rs | 220 ++++++++++++++++++ 6 files changed, 275 insertions(+), 2 deletions(-) create mode 100644 rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs diff --git a/rust/otap-dataflow/crates/query-engine/src/consts.rs b/rust/otap-dataflow/crates/query-engine/src/consts.rs index 742f9046b5..c07bbdc974 100644 --- a/rust/otap-dataflow/crates/query-engine/src/consts.rs +++ b/rust/otap-dataflow/crates/query-engine/src/consts.rs @@ -11,3 +11,4 @@ pub(crate) const ENCODE_FUNC_NAME: &str = "encode"; pub(crate) const REGEXP_SUBSTR_FUNC_NAME: &str = "regexp_substr"; pub(crate) const SHA256_FUNC_NAME: &str = "sha256"; pub(crate) const MD5_FUNC_NAME: &str = "md5"; +pub(crate) const FNV_FUNC_NAME: &str = "fnv"; diff --git a/rust/otap-dataflow/crates/query-engine/src/parser.rs b/rust/otap-dataflow/crates/query-engine/src/parser.rs index cbe9c837bd..4a8073746b 100644 --- a/rust/otap-dataflow/crates/query-engine/src/parser.rs +++ b/rust/otap-dataflow/crates/query-engine/src/parser.rs @@ -10,7 +10,9 @@ use data_engine_expressions::{ }; use data_engine_parser_abstractions::ParserOptions; -use crate::consts::{ENCODE_FUNC_NAME, MD5_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME}; +use crate::consts::{ + ENCODE_FUNC_NAME, FNV_FUNC_NAME, MD5_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME, +}; /// Create parser options that can be used when parsing an expression that will be executed with /// this query engine @@ -32,6 +34,7 @@ pub fn default_parser_options() -> ParserOptions { // .with_external_function(SHA256_FUNC_NAME, param_placeholders(1), None) .with_external_function(MD5_FUNC_NAME, param_placeholders(1), None) + .with_external_function(FNV_FUNC_NAME, param_placeholders(1), None) .with_external_function(ENCODE_FUNC_NAME, param_placeholders(2), None) .with_external_function( REGEXP_SUBSTR_FUNC_NAME, diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs index 84aa6360df..5439c908a1 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs @@ -4852,6 +4852,49 @@ mod test { test_update_attr_to_md5_function_call_result::().await } + async fn test_update_attr_to_fnv_hash_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "str_attr", + AnyValue::new_string("hello"), + )]) + .finish(), + ]); + + // fnv returns an Int64 directly - no encode() wrapper needed + let query = r#"logs | extend attributes["str_attr"] = fnv(attributes["str_attr"])"#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![KeyValue::new( + "str_attr", + // FNV-1a 64-bit of "hello" interpreted as i64 + AnyValue::new_int(-6615550055289275125_i64) + )] + ); + } + + #[tokio::test] + async fn test_update_attr_to_fnv_hash_function_call_result_opl_parser() { + test_update_attr_to_fnv_hash_function_call_result::().await + } + + #[tokio::test] + async fn test_update_attr_to_fnv_hash_function_call_result_kql_parser() { + test_update_attr_to_fnv_hash_function_call_result::().await + } + async fn test_update_attr_to_substring_function_call_result() { let logs_data = to_logs_data(vec![ LogRecord::build() diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs index b70f8fc8d6..9f7db53b70 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs @@ -45,6 +45,7 @@ use std::ops::Deref; use std::rc::Rc; use std::sync::{Arc, LazyLock}; +use crate::pipeline::functions::fnv_hash; use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt16Array}; use arrow::compute::filter_record_batch; use arrow::compute::kernels::cmp::eq; @@ -76,7 +77,9 @@ use otap_df_pdata::arrays::{ use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType; use otap_df_pdata::schema::consts; -use crate::consts::{ENCODE_FUNC_NAME, MD5_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME}; +use crate::consts::{ + ENCODE_FUNC_NAME, FNV_FUNC_NAME, MD5_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME, +}; use crate::error::{Error, Result}; use crate::pipeline::expr::join::{join, multi_join}; use crate::pipeline::expr::types::{ @@ -828,6 +831,7 @@ impl DataFusionFunctionDef { } SHA256_FUNC_NAME => Self::new(sha256(), ExprLogicalType::Binary, true, None), MD5_FUNC_NAME => Self::new(md5(), ExprLogicalType::Binary, true, None), + FNV_FUNC_NAME => Self::new(fnv_hash(), ExprLogicalType::Int64, true, None), _ => return None, }) } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs index 5e70899b54..4be0f34a3e 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs @@ -13,11 +13,13 @@ use datafusion::logical_expr::{self as datafusion_expr, TypeSignature}; use datafusion::logical_expr_common::signature::Arity; mod contains; +mod fnv; pub(crate) mod is_type; mod regexp_substr; mod substring; make_udf_function!(contains::ExtendedContainsFunc, contains); +make_udf_function!(fnv::FnvHashFunc, fnv_hash); make_udf_function!(substring::SubstringFunc, substring); make_udf_function!(regexp_substr::RegexpSubstrFunc, regexp_substr); diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs new file mode 100644 index 0000000000..d0eaf3ffca --- /dev/null +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs @@ -0,0 +1,220 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, BinaryArray, Int64Array, LargeStringArray, StringArray}; +use arrow::datatypes::DataType; +use datafusion::common::exec_err; +use datafusion::error::Result; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion::scalar::ScalarValue; + +const FNV_OFFSET_BASIS: u64 = 14695981039346656037; +const FNV_PRIME: u64 = 1099511628211; + +fn fnv1a_64(data: &[u8]) -> i64 { + let mut hash = FNV_OFFSET_BASIS; + for byte in data { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(FNV_PRIME); + } + hash as i64 +} + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct FnvHashFunc { + signature: Signature, +} + +impl Default for FnvHashFunc { + fn default() -> Self { + Self::new() + } +} + +impl FnvHashFunc { + pub fn new() -> Self { + Self { + signature: Signature::any(1, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for FnvHashFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "fnv" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Int64) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let args = args.args; + if args.len() != 1 { + return exec_err!( + "invalid number of args. {} expected 1 arg, found {}", + self.name(), + args.len() + ); + } + + match &args[0] { + ColumnarValue::Scalar(scalar) => { + let hash = hash_scalar(scalar)?; + Ok(ColumnarValue::Scalar(ScalarValue::Int64(hash))) + } + ColumnarValue::Array(arr) => { + let hashes = hash_array(arr.as_ref())?; + Ok(ColumnarValue::Array(Arc::new(hashes) as ArrayRef)) + } + } + } + + fn documentation(&self) -> Option<&Documentation> { + None + } +} + +fn hash_scalar(scalar: &ScalarValue) -> Result> { + match scalar { + ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => { + Ok(v.as_deref().map(|s| fnv1a_64(s.as_bytes()))) + } + ScalarValue::Utf8View(v) => Ok(v.as_deref().map(|s| fnv1a_64(s.as_bytes()))), + ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => Ok(v.as_ref().map(|b| fnv1a_64(b))), + other => exec_err!("fnv: unsupported scalar type {:?}", other.data_type()), + } +} + +fn hash_array(arr: &dyn Array) -> Result { + match arr.data_type() { + DataType::Utf8 => { + let arr = arr.as_any().downcast_ref::().expect("Utf8"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| fnv1a_64(s.as_bytes()))), + )) + } + DataType::LargeUtf8 => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeUtf8"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| fnv1a_64(s.as_bytes()))), + )) + } + DataType::Binary => { + let arr = arr.as_any().downcast_ref::().expect("Binary"); + Ok(Int64Array::from_iter(arr.iter().map(|v| v.map(fnv1a_64)))) + } + other => exec_err!("fnv: unsupported array type {:?}", other), + } +} + +#[cfg(test)] +mod test { + use super::*; + use arrow::datatypes::Field; + use datafusion::common::config::ConfigOptions; + use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs}; + use datafusion::scalar::ScalarValue; + use std::sync::Arc; + + fn invoke(arg: ColumnarValue) -> Result { + let func = FnvHashFunc::new(); + let result = func.invoke_with_args(ScalarFunctionArgs { + args: vec![arg], + arg_fields: vec![], + number_rows: 1, + return_field: Field::new("", DataType::Int64, true).into(), + config_options: Arc::new(ConfigOptions::default()), + })?; + match result { + ColumnarValue::Scalar(ScalarValue::Int64(Some(v))) => Ok(v), + other => panic!("expected Scalar(Int64(Some(_))), got {:?}", other), + } + } + + #[test] + fn test_fnv_empty_string() { + // FNV-1a 64-bit of empty input is the offset basis itself + let v = invoke(ColumnarValue::Scalar(ScalarValue::Utf8( + Some(String::new()), + ))) + .unwrap(); + assert_eq!(v, FNV_OFFSET_BASIS as i64); + } + + #[test] + fn test_fnv_string() { + let v = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "hello".to_string(), + )))) + .unwrap(); + assert_eq!(v, -6615550055289275125_i64); + } + + #[test] + fn test_fnv_binary() { + let v = invoke(ColumnarValue::Scalar(ScalarValue::Binary(Some( + b"hello".to_vec(), + )))) + .unwrap(); + assert_eq!(v, -6615550055289275125_i64); + } + + #[test] + fn test_fnv_null_returns_none() { + let func = FnvHashFunc::new(); + let result = func + .invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Utf8(None))], + arg_fields: vec![], + number_rows: 1, + return_field: Field::new("", DataType::Int64, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Int64(None)) => {} + other => panic!("expected Scalar(Int64(None)), got {:?}", other), + } + } + + #[test] + fn test_fnv_array() { + use arrow::array::StringArray; + let arr = StringArray::from(vec![Some("hello"), Some("world"), None]); + let func = FnvHashFunc::new(); + let result = func + .invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(arr))], + arg_fields: vec![], + number_rows: 3, + return_field: Field::new("", DataType::Int64, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(); + let ColumnarValue::Array(arr) = result else { + panic!("expected array"); + }; + let arr = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(arr.value(0), -6615550055289275125_i64); + assert!(!arr.is_null(0)); + assert!(!arr.is_null(1)); + assert!(arr.is_null(2)); + } +} From 582c3a0668b01987e920e81389a4cb5b23827c14 Mon Sep 17 00:00:00 2001 From: SzymonIwaniuk Date: Wed, 6 May 2026 20:19:16 +0200 Subject: [PATCH 3/7] feat(query-engine): add murmur3 hash function support --- .../crates/query-engine/src/consts.rs | 1 + .../crates/query-engine/src/parser.rs | 4 +- .../query-engine/src/pipeline/assign.rs | 43 +++ .../crates/query-engine/src/pipeline/expr.rs | 6 +- .../query-engine/src/pipeline/functions.rs | 2 + .../src/pipeline/functions/murmur3.rs | 260 ++++++++++++++++++ 6 files changed, 313 insertions(+), 3 deletions(-) create mode 100644 rust/otap-dataflow/crates/query-engine/src/pipeline/functions/murmur3.rs diff --git a/rust/otap-dataflow/crates/query-engine/src/consts.rs b/rust/otap-dataflow/crates/query-engine/src/consts.rs index c07bbdc974..496616b7f7 100644 --- a/rust/otap-dataflow/crates/query-engine/src/consts.rs +++ b/rust/otap-dataflow/crates/query-engine/src/consts.rs @@ -12,3 +12,4 @@ pub(crate) const REGEXP_SUBSTR_FUNC_NAME: &str = "regexp_substr"; pub(crate) const SHA256_FUNC_NAME: &str = "sha256"; pub(crate) const MD5_FUNC_NAME: &str = "md5"; pub(crate) const FNV_FUNC_NAME: &str = "fnv"; +pub(crate) const MURMUR3_FUNC_NAME: &str = "murmur3"; diff --git a/rust/otap-dataflow/crates/query-engine/src/parser.rs b/rust/otap-dataflow/crates/query-engine/src/parser.rs index 4a8073746b..8bc848a9fa 100644 --- a/rust/otap-dataflow/crates/query-engine/src/parser.rs +++ b/rust/otap-dataflow/crates/query-engine/src/parser.rs @@ -11,7 +11,8 @@ use data_engine_expressions::{ use data_engine_parser_abstractions::ParserOptions; use crate::consts::{ - ENCODE_FUNC_NAME, FNV_FUNC_NAME, MD5_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME, + ENCODE_FUNC_NAME, FNV_FUNC_NAME, MD5_FUNC_NAME, MURMUR3_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, + SHA256_FUNC_NAME, }; /// Create parser options that can be used when parsing an expression that will be executed with @@ -35,6 +36,7 @@ pub fn default_parser_options() -> ParserOptions { .with_external_function(SHA256_FUNC_NAME, param_placeholders(1), None) .with_external_function(MD5_FUNC_NAME, param_placeholders(1), None) .with_external_function(FNV_FUNC_NAME, param_placeholders(1), None) + .with_external_function(MURMUR3_FUNC_NAME, param_placeholders(1), None) .with_external_function(ENCODE_FUNC_NAME, param_placeholders(2), None) .with_external_function( REGEXP_SUBSTR_FUNC_NAME, diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs index 5439c908a1..356b0579bd 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs @@ -4895,6 +4895,49 @@ mod test { test_update_attr_to_fnv_hash_function_call_result::().await } + async fn test_update_attr_to_murmur3_hash_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "str_attr", + AnyValue::new_string("hello"), + )]) + .finish(), + ]); + + // murmur3 returns an Int64 directly - no encode() wrapper needed + let query = r#"logs | extend attributes["str_attr"] = murmur3(attributes["str_attr"])"#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![KeyValue::new( + "str_attr", + // MurmurHash3 32-bit of "hello" with seed=0 + AnyValue::new_int(613_153_351_i64) + )] + ); + } + + #[tokio::test] + async fn test_update_attr_to_murmur3_hash_function_call_result_opl_parser() { + test_update_attr_to_murmur3_hash_function_call_result::().await + } + + #[tokio::test] + async fn test_update_attr_to_murmur3_hash_function_call_result_kql_parser() { + test_update_attr_to_murmur3_hash_function_call_result::().await + } + async fn test_update_attr_to_substring_function_call_result() { let logs_data = to_logs_data(vec![ LogRecord::build() diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs index 9f7db53b70..e56a94fb67 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs @@ -45,7 +45,7 @@ use std::ops::Deref; use std::rc::Rc; use std::sync::{Arc, LazyLock}; -use crate::pipeline::functions::fnv_hash; +use crate::pipeline::functions::{fnv_hash, murmur3_hash}; use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt16Array}; use arrow::compute::filter_record_batch; use arrow::compute::kernels::cmp::eq; @@ -78,7 +78,8 @@ use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType; use otap_df_pdata::schema::consts; use crate::consts::{ - ENCODE_FUNC_NAME, FNV_FUNC_NAME, MD5_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME, + ENCODE_FUNC_NAME, FNV_FUNC_NAME, MD5_FUNC_NAME, MURMUR3_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, + SHA256_FUNC_NAME, }; use crate::error::{Error, Result}; use crate::pipeline::expr::join::{join, multi_join}; @@ -832,6 +833,7 @@ impl DataFusionFunctionDef { SHA256_FUNC_NAME => Self::new(sha256(), ExprLogicalType::Binary, true, None), MD5_FUNC_NAME => Self::new(md5(), ExprLogicalType::Binary, true, None), FNV_FUNC_NAME => Self::new(fnv_hash(), ExprLogicalType::Int64, true, None), + MURMUR3_FUNC_NAME => Self::new(murmur3_hash(), ExprLogicalType::Int64, true, None), _ => return None, }) } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs index 4be0f34a3e..ac57a19b04 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs @@ -15,11 +15,13 @@ use datafusion::logical_expr_common::signature::Arity; mod contains; mod fnv; pub(crate) mod is_type; +mod murmur3; mod regexp_substr; mod substring; make_udf_function!(contains::ExtendedContainsFunc, contains); make_udf_function!(fnv::FnvHashFunc, fnv_hash); +make_udf_function!(murmur3::Murmur3HashFunc, murmur3_hash); make_udf_function!(substring::SubstringFunc, substring); make_udf_function!(regexp_substr::RegexpSubstrFunc, regexp_substr); diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/murmur3.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/murmur3.rs new file mode 100644 index 0000000000..70bd05249c --- /dev/null +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/murmur3.rs @@ -0,0 +1,260 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, BinaryArray, Int64Array, LargeStringArray, StringArray}; +use arrow::datatypes::DataType; +use datafusion::common::exec_err; +use datafusion::error::Result; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion::scalar::ScalarValue; + +/// MurmurHash3 32-bit (seed=0). Returns the hash as i64 for parity with OTTL. +fn murmur3_32(data: &[u8]) -> i64 { + const C1: u32 = 0xcc9e2d51; + const C2: u32 = 0x1b873593; + + let mut h: u32 = 0; + let chunks = data.chunks_exact(4); + let remainder = chunks.remainder(); + + for chunk in chunks { + let mut k = u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]); + k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2); + h ^= k; + h = h.rotate_left(13).wrapping_mul(5).wrapping_add(0xe6546b64); + } + + let mut k: u32 = 0; + match remainder.len() { + 3 => { + k ^= u32::from(remainder[2]) << 16; + k ^= u32::from(remainder[1]) << 8; + k ^= u32::from(remainder[0]); + k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2); + h ^= k; + } + 2 => { + k ^= u32::from(remainder[1]) << 8; + k ^= u32::from(remainder[0]); + k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2); + h ^= k; + } + 1 => { + k ^= u32::from(remainder[0]); + k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2); + h ^= k; + } + _ => {} + } + + h ^= data.len() as u32; + h ^= h >> 16; + h = h.wrapping_mul(0x85ebca6b); + h ^= h >> 13; + h = h.wrapping_mul(0xc2b2ae35); + h ^= h >> 16; + + i64::from(h) +} + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct Murmur3HashFunc { + signature: Signature, +} + +impl Default for Murmur3HashFunc { + fn default() -> Self { + Self::new() + } +} + +impl Murmur3HashFunc { + pub fn new() -> Self { + Self { + signature: Signature::any(1, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for Murmur3HashFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "murmur3" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Int64) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let args = args.args; + if args.len() != 1 { + return exec_err!( + "invalid number of args. {} expected 1 arg, found {}", + self.name(), + args.len() + ); + } + + match &args[0] { + ColumnarValue::Scalar(scalar) => { + let hash = hash_scalar(scalar)?; + Ok(ColumnarValue::Scalar(ScalarValue::Int64(hash))) + } + ColumnarValue::Array(arr) => { + let hashes = hash_array(arr.as_ref())?; + Ok(ColumnarValue::Array(Arc::new(hashes) as ArrayRef)) + } + } + } + + fn documentation(&self) -> Option<&Documentation> { + None + } +} + +fn hash_scalar(scalar: &ScalarValue) -> Result> { + match scalar { + ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => { + Ok(v.as_deref().map(|s| murmur3_32(s.as_bytes()))) + } + ScalarValue::Utf8View(v) => Ok(v.as_deref().map(|s| murmur3_32(s.as_bytes()))), + ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => { + Ok(v.as_ref().map(|b| murmur3_32(b))) + } + other => exec_err!("murmur3: unsupported scalar type {:?}", other.data_type()), + } +} + +fn hash_array(arr: &dyn Array) -> Result { + match arr.data_type() { + DataType::Utf8 => { + let arr = arr.as_any().downcast_ref::().expect("Utf8"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| murmur3_32(s.as_bytes()))), + )) + } + DataType::LargeUtf8 => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeUtf8"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| murmur3_32(s.as_bytes()))), + )) + } + DataType::Binary => { + let arr = arr.as_any().downcast_ref::().expect("Binary"); + Ok(Int64Array::from_iter(arr.iter().map(|v| v.map(murmur3_32)))) + } + other => exec_err!("murmur3: unsupported array type {:?}", other), + } +} + +#[cfg(test)] +mod test { + use super::*; + use arrow::datatypes::Field; + use datafusion::common::config::ConfigOptions; + use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs}; + use datafusion::scalar::ScalarValue; + use std::sync::Arc; + + fn invoke(arg: ColumnarValue) -> Result { + let func = Murmur3HashFunc::new(); + let result = func.invoke_with_args(ScalarFunctionArgs { + args: vec![arg], + arg_fields: vec![], + number_rows: 1, + return_field: Field::new("", DataType::Int64, true).into(), + config_options: Arc::new(ConfigOptions::default()), + })?; + match result { + ColumnarValue::Scalar(ScalarValue::Int64(Some(v))) => Ok(v), + other => panic!("expected Scalar(Int64(Some(_))), got {:?}", other), + } + } + + #[test] + fn test_murmur3_empty_string() { + // murmur3 of empty input with seed=0 is 0 + let v = invoke(ColumnarValue::Scalar(ScalarValue::Utf8( + Some(String::new()), + ))) + .unwrap(); + assert_eq!(v, 0); + } + + #[test] + fn test_murmur3_string() { + let v = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "hello".to_string(), + )))) + .unwrap(); + // pre-computed MurmurHash3 32-bit of "hello" with seed=0 + assert_eq!(v, 613_153_351_i64); + } + + #[test] + fn test_murmur3_binary_matches_string() { + let v = invoke(ColumnarValue::Scalar(ScalarValue::Binary(Some( + b"hello".to_vec(), + )))) + .unwrap(); + assert_eq!(v, 613_153_351_i64); + } + + #[test] + fn test_murmur3_null_returns_none() { + let func = Murmur3HashFunc::new(); + let result = func + .invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Utf8(None))], + arg_fields: vec![], + number_rows: 1, + return_field: Field::new("", DataType::Int64, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Int64(None)) => {} + other => panic!("expected Scalar(Int64(None)), got {:?}", other), + } + } + + #[test] + fn test_murmur3_array() { + use arrow::array::StringArray; + let arr = StringArray::from(vec![Some("hello"), Some("world"), None]); + let func = Murmur3HashFunc::new(); + let result = func + .invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(arr))], + arg_fields: vec![], + number_rows: 3, + return_field: Field::new("", DataType::Int64, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(); + let ColumnarValue::Array(arr) = result else { + panic!("expected array"); + }; + let arr = arr.as_any().downcast_ref::().expect("Int64"); + assert_eq!(arr.value(0), 613_153_351_i64); + assert!(!arr.is_null(0)); + assert!(!arr.is_null(1)); + assert!(arr.is_null(2)); + } +} From ac1934ccc38a28553e0abefed5e78f95d6b13787 Mon Sep 17 00:00:00 2001 From: SzymonIwaniuk Date: Thu, 7 May 2026 10:05:48 +0200 Subject: [PATCH 4/7] feat(query-engine): add sha1 and sha512 hash function support --- rust/otap-dataflow/Cargo.toml | 1 + .../crates/query-engine/Cargo.toml | 1 + .../crates/query-engine/src/consts.rs | 2 + .../crates/query-engine/src/parser.rs | 4 +- .../query-engine/src/pipeline/assign.rs | 85 +++++++ .../crates/query-engine/src/pipeline/expr.rs | 11 +- .../query-engine/src/pipeline/functions.rs | 2 + .../src/pipeline/functions/fnv.rs | 7 +- .../src/pipeline/functions/sha1.rs | 210 ++++++++++++++++++ 9 files changed, 317 insertions(+), 6 deletions(-) create mode 100644 rust/otap-dataflow/crates/query-engine/src/pipeline/functions/sha1.rs diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index 9d68f51450..d6718429ad 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -211,6 +211,7 @@ weaver_forge = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0 weaver_resolved_schema = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.21.2"} weaver_resolver = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.21.2"} weaver_semconv = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.21.2"} +sha1 = { version = "0.10", features = ["oid"] } xxhash-rust = { version = "0.8", features = ["xxh3"] } zip = "=8.6.0" byte-unit = { version = "5.2.0", features = ["serde"] } diff --git a/rust/otap-dataflow/crates/query-engine/Cargo.toml b/rust/otap-dataflow/crates/query-engine/Cargo.toml index 4987dff46a..9c52c59594 100644 --- a/rust/otap-dataflow/crates/query-engine/Cargo.toml +++ b/rust/otap-dataflow/crates/query-engine/Cargo.toml @@ -17,6 +17,7 @@ datafusion = { workspace = true, features = [ ] } futures-core = { workspace = true } memchr = { workspace = true } +sha1 = { workspace = true } parking_lot = { workspace = true } smallvec = { workspace = true } thiserror = { workspace = true } diff --git a/rust/otap-dataflow/crates/query-engine/src/consts.rs b/rust/otap-dataflow/crates/query-engine/src/consts.rs index 496616b7f7..e0e82dc3c9 100644 --- a/rust/otap-dataflow/crates/query-engine/src/consts.rs +++ b/rust/otap-dataflow/crates/query-engine/src/consts.rs @@ -13,3 +13,5 @@ pub(crate) const SHA256_FUNC_NAME: &str = "sha256"; pub(crate) const MD5_FUNC_NAME: &str = "md5"; pub(crate) const FNV_FUNC_NAME: &str = "fnv"; pub(crate) const MURMUR3_FUNC_NAME: &str = "murmur3"; +pub(crate) const SHA1_FUNC_NAME: &str = "sha1"; +pub(crate) const SHA512_FUNC_NAME: &str = "sha512"; diff --git a/rust/otap-dataflow/crates/query-engine/src/parser.rs b/rust/otap-dataflow/crates/query-engine/src/parser.rs index 8bc848a9fa..7a5661d340 100644 --- a/rust/otap-dataflow/crates/query-engine/src/parser.rs +++ b/rust/otap-dataflow/crates/query-engine/src/parser.rs @@ -12,7 +12,7 @@ use data_engine_parser_abstractions::ParserOptions; use crate::consts::{ ENCODE_FUNC_NAME, FNV_FUNC_NAME, MD5_FUNC_NAME, MURMUR3_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, - SHA256_FUNC_NAME, + SHA1_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, }; /// Create parser options that can be used when parsing an expression that will be executed with @@ -37,6 +37,8 @@ pub fn default_parser_options() -> ParserOptions { .with_external_function(MD5_FUNC_NAME, param_placeholders(1), None) .with_external_function(FNV_FUNC_NAME, param_placeholders(1), None) .with_external_function(MURMUR3_FUNC_NAME, param_placeholders(1), None) + .with_external_function(SHA1_FUNC_NAME, param_placeholders(1), None) + .with_external_function(SHA512_FUNC_NAME, param_placeholders(1), None) .with_external_function(ENCODE_FUNC_NAME, param_placeholders(2), None) .with_external_function( REGEXP_SUBSTR_FUNC_NAME, diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs index 356b0579bd..aa704018e1 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs @@ -4938,6 +4938,91 @@ mod test { test_update_attr_to_murmur3_hash_function_call_result::().await } + async fn test_update_attr_to_sha1_hash_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "str_attr", + AnyValue::new_string("hello"), + )]) + .finish(), + ]); + + let query = + r#"logs | extend attributes["str_attr"] = encode(sha1(attributes["str_attr"]), "hex")"#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![KeyValue::new( + "str_attr", + AnyValue::new_string("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d") + )] + ); + } + + #[tokio::test] + async fn test_update_attr_to_sha1_hash_function_call_result_opl_parser() { + test_update_attr_to_sha1_hash_function_call_result::().await + } + + #[tokio::test] + async fn test_update_attr_to_sha1_hash_function_call_result_kql_parser() { + test_update_attr_to_sha1_hash_function_call_result::().await + } + + async fn test_update_attr_to_sha512_hash_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "str_attr", + AnyValue::new_string("hello"), + )]) + .finish(), + ]); + + let query = r#"logs | extend attributes["str_attr"] = encode(sha512(attributes["str_attr"]), "hex")"#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![KeyValue::new( + "str_attr", + AnyValue::new_string( + "9b71d224bd62f3785d96d46ad3ea3d73319bfbc2890caadae2dff72519673ca72323c3d99ba5c11d7c7acc6e14b8c5da0c4663475c2e5c3adef46f73bcdec043" + ) + )] + ); + } + + #[tokio::test] + async fn test_update_attr_to_sha512_hash_function_call_result_opl_parser() { + test_update_attr_to_sha512_hash_function_call_result::().await + } + + #[tokio::test] + async fn test_update_attr_to_sha512_hash_function_call_result_kql_parser() { + test_update_attr_to_sha512_hash_function_call_result::().await + } + async fn test_update_attr_to_substring_function_call_result() { let logs_data = to_logs_data(vec![ LogRecord::build() diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs index e56a94fb67..48f2b86d47 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs @@ -45,7 +45,6 @@ use std::ops::Deref; use std::rc::Rc; use std::sync::{Arc, LazyLock}; -use crate::pipeline::functions::{fnv_hash, murmur3_hash}; use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt16Array}; use arrow::compute::filter_record_batch; use arrow::compute::kernels::cmp::eq; @@ -60,7 +59,7 @@ use data_engine_expressions::{ }; use datafusion::common::DFSchema; use datafusion::functions::core::expr_ext::FieldAccessor; -use datafusion::functions::crypto::{md5, sha256}; +use datafusion::functions::crypto::{md5, sha256, sha512}; use datafusion::functions::encoding::encode; use datafusion::functions::string::{concat, concat_ws, replace}; use datafusion::logical_expr::expr::ScalarFunction; @@ -79,14 +78,16 @@ use otap_df_pdata::schema::consts; use crate::consts::{ ENCODE_FUNC_NAME, FNV_FUNC_NAME, MD5_FUNC_NAME, MURMUR3_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, - SHA256_FUNC_NAME, + SHA1_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, }; use crate::error::{Error, Result}; use crate::pipeline::expr::join::{join, multi_join}; use crate::pipeline::expr::types::{ ExprLogicalType, coerce_arithmetic, nested_struct_field_type, root_field_type, }; -use crate::pipeline::functions::{arity_range, regexp_substr, substring}; +use crate::pipeline::functions::{ + arity_range, fnv_hash, murmur3_hash, regexp_substr, sha1_hash, substring, +}; use crate::pipeline::planner::{AttributesIdentifier, ColumnAccessor}; use crate::pipeline::project::anyval::{ find_any_value_columns, project_any_value_columns, stitch_partitioned_results, @@ -834,6 +835,8 @@ impl DataFusionFunctionDef { MD5_FUNC_NAME => Self::new(md5(), ExprLogicalType::Binary, true, None), FNV_FUNC_NAME => Self::new(fnv_hash(), ExprLogicalType::Int64, true, None), MURMUR3_FUNC_NAME => Self::new(murmur3_hash(), ExprLogicalType::Int64, true, None), + SHA1_FUNC_NAME => Self::new(sha1_hash(), ExprLogicalType::Binary, true, None), + SHA512_FUNC_NAME => Self::new(sha512(), ExprLogicalType::Binary, true, None), _ => return None, }) } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs index ac57a19b04..f60512c152 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs @@ -17,11 +17,13 @@ mod fnv; pub(crate) mod is_type; mod murmur3; mod regexp_substr; +mod sha1; mod substring; make_udf_function!(contains::ExtendedContainsFunc, contains); make_udf_function!(fnv::FnvHashFunc, fnv_hash); make_udf_function!(murmur3::Murmur3HashFunc, murmur3_hash); +make_udf_function!(sha1::Sha1Func, sha1_hash); make_udf_function!(substring::SubstringFunc, substring); make_udf_function!(regexp_substr::RegexpSubstrFunc, regexp_substr); diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs index d0eaf3ffca..996b68b9e1 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs @@ -88,6 +88,8 @@ impl ScalarUDFImpl for FnvHashFunc { } } +/// computes the FNV-1a 64-bit hash of a scalar value +/// returns None if the input is null, or an error if the input type is not supported. fn hash_scalar(scalar: &ScalarValue) -> Result> { match scalar { ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => { @@ -99,6 +101,9 @@ fn hash_scalar(scalar: &ScalarValue) -> Result> { } } +/// computes the FNV-1a 64-bit hash of each element in an array +/// null elements produce null output values. +/// supports [DataType::Utf8], [DataType::LargeUtf8], and [DataType::Binary] input arrays. fn hash_array(arr: &dyn Array) -> Result { match arr.data_type() { DataType::Utf8 => { @@ -211,7 +216,7 @@ mod test { let ColumnarValue::Array(arr) = result else { panic!("expected array"); }; - let arr = arr.as_any().downcast_ref::().unwrap(); + let arr = arr.as_any().downcast_ref::().expect("Int64"); assert_eq!(arr.value(0), -6615550055289275125_i64); assert!(!arr.is_null(0)); assert!(!arr.is_null(1)); diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/sha1.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/sha1.rs new file mode 100644 index 0000000000..e743c07712 --- /dev/null +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/sha1.rs @@ -0,0 +1,210 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, LargeStringArray, StringArray}; +use arrow::datatypes::DataType; +use datafusion::common::exec_err; +use datafusion::error::Result; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion::scalar::ScalarValue; +use sha1::Digest; + +fn sha1_bytes(data: &[u8]) -> Vec { + sha1::Sha1::digest(data).to_vec() +} + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct Sha1Func { + signature: Signature, +} + +impl Default for Sha1Func { + fn default() -> Self { + Self::new() + } +} + +impl Sha1Func { + pub fn new() -> Self { + Self { + signature: Signature::any(1, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for Sha1Func { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "sha1" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Binary) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let args = args.args; + if args.len() != 1 { + return exec_err!( + "invalid number of args. {} expected 1 arg, found {}", + self.name(), + args.len() + ); + } + + match &args[0] { + ColumnarValue::Scalar(scalar) => { + let hash = hash_scalar(scalar)?; + Ok(ColumnarValue::Scalar(ScalarValue::Binary(hash))) + } + ColumnarValue::Array(arr) => { + let hashes = hash_array(arr.as_ref())?; + Ok(ColumnarValue::Array(Arc::new(hashes) as ArrayRef)) + } + } + } + + fn documentation(&self) -> Option<&Documentation> { + None + } +} + +fn hash_scalar(scalar: &ScalarValue) -> Result>> { + match scalar { + ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => { + Ok(v.as_deref().map(|s| sha1_bytes(s.as_bytes()))) + } + ScalarValue::Utf8View(v) => Ok(v.as_deref().map(|s| sha1_bytes(s.as_bytes()))), + ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => { + Ok(v.as_ref().map(|b| sha1_bytes(b))) + } + other => exec_err!("sha1: unsupported scalar type {:?}", other.data_type()), + } +} + +fn hash_array(arr: &dyn Array) -> Result { + let len = arr.len(); + let mut builder = BinaryBuilder::with_capacity(len, len * 20); + + match arr.data_type() { + DataType::Utf8 => { + let arr = arr.as_any().downcast_ref::().expect("Utf8"); + for v in arr.iter() { + match v { + Some(s) => builder.append_value(sha1_bytes(s.as_bytes())), + None => builder.append_null(), + } + } + } + DataType::LargeUtf8 => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeUtf8"); + for v in arr.iter() { + match v { + Some(s) => builder.append_value(sha1_bytes(s.as_bytes())), + None => builder.append_null(), + } + } + } + DataType::Binary => { + let arr = arr.as_any().downcast_ref::().expect("Binary"); + for v in arr.iter() { + match v { + Some(b) => builder.append_value(sha1_bytes(b)), + None => builder.append_null(), + } + } + } + other => return exec_err!("sha1: unsupported array type {:?}", other), + } + + Ok(builder.finish()) +} + +#[cfg(test)] +mod test { + use super::*; + use arrow::datatypes::Field; + use datafusion::common::config::ConfigOptions; + use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs}; + use datafusion::scalar::ScalarValue; + use std::sync::Arc; + + fn invoke(arg: ColumnarValue) -> Result> { + let func = Sha1Func::new(); + let result = func.invoke_with_args(ScalarFunctionArgs { + args: vec![arg], + arg_fields: vec![], + number_rows: 1, + return_field: Field::new("", DataType::Binary, true).into(), + config_options: Arc::new(ConfigOptions::default()), + })?; + match result { + ColumnarValue::Scalar(ScalarValue::Binary(Some(v))) => Ok(v), + other => panic!("expected Scalar(Binary(Some(_))), got {:?}", other), + } + } + + #[test] + fn test_sha1_string() { + let v = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "hello".to_string(), + )))) + .unwrap(); + // sha1("hello") = aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d + assert_eq!( + v, + vec![ + 0xaa, 0xf4, 0xc6, 0x1d, 0xdc, 0xc5, 0xe8, 0xa2, 0xda, 0xbe, 0xde, 0x0f, 0x3b, 0x48, + 0x2c, 0xd9, 0xae, 0xa9, 0x43, 0x4d + ] + ); + } + + #[test] + fn test_sha1_binary_matches_string() { + let v = invoke(ColumnarValue::Scalar(ScalarValue::Binary(Some( + b"hello".to_vec(), + )))) + .unwrap(); + assert_eq!( + v, + vec![ + 0xaa, 0xf4, 0xc6, 0x1d, 0xdc, 0xc5, 0xe8, 0xa2, 0xda, 0xbe, 0xde, 0x0f, 0x3b, 0x48, + 0x2c, 0xd9, 0xae, 0xa9, 0x43, 0x4d + ] + ); + } + + #[test] + fn test_sha1_null_returns_none() { + let func = Sha1Func::new(); + let result = func + .invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Utf8(None))], + arg_fields: vec![], + number_rows: 1, + return_field: Field::new("", DataType::Binary, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Binary(None)) => {} + other => panic!("expected Scalar(Binary(None)), got {:?}", other), + } + } +} From 3cc8c6f621cc6100f514ab671995473e8baee775 Mon Sep 17 00:00:00 2001 From: SzymonIwaniuk Date: Thu, 7 May 2026 10:22:01 +0200 Subject: [PATCH 5/7] feat(query-engine): add xxh3 hash function --- .../crates/query-engine/Cargo.toml | 1 + .../crates/query-engine/src/consts.rs | 1 + .../crates/query-engine/src/parser.rs | 3 +- .../query-engine/src/pipeline/assign.rs | 41 ++++ .../crates/query-engine/src/pipeline/expr.rs | 5 +- .../query-engine/src/pipeline/functions.rs | 2 + .../src/pipeline/functions/xxh3.rs | 178 ++++++++++++++++++ 7 files changed, 228 insertions(+), 3 deletions(-) create mode 100644 rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh3.rs diff --git a/rust/otap-dataflow/crates/query-engine/Cargo.toml b/rust/otap-dataflow/crates/query-engine/Cargo.toml index 9c52c59594..5b20435926 100644 --- a/rust/otap-dataflow/crates/query-engine/Cargo.toml +++ b/rust/otap-dataflow/crates/query-engine/Cargo.toml @@ -18,6 +18,7 @@ datafusion = { workspace = true, features = [ futures-core = { workspace = true } memchr = { workspace = true } sha1 = { workspace = true } +xxhash-rust = { workspace = true } parking_lot = { workspace = true } smallvec = { workspace = true } thiserror = { workspace = true } diff --git a/rust/otap-dataflow/crates/query-engine/src/consts.rs b/rust/otap-dataflow/crates/query-engine/src/consts.rs index e0e82dc3c9..eb1e1f4e42 100644 --- a/rust/otap-dataflow/crates/query-engine/src/consts.rs +++ b/rust/otap-dataflow/crates/query-engine/src/consts.rs @@ -15,3 +15,4 @@ pub(crate) const FNV_FUNC_NAME: &str = "fnv"; pub(crate) const MURMUR3_FUNC_NAME: &str = "murmur3"; pub(crate) const SHA1_FUNC_NAME: &str = "sha1"; pub(crate) const SHA512_FUNC_NAME: &str = "sha512"; +pub(crate) const XXH3_FUNC_NAME: &str = "xxh3"; diff --git a/rust/otap-dataflow/crates/query-engine/src/parser.rs b/rust/otap-dataflow/crates/query-engine/src/parser.rs index 7a5661d340..5bc3fc5b7b 100644 --- a/rust/otap-dataflow/crates/query-engine/src/parser.rs +++ b/rust/otap-dataflow/crates/query-engine/src/parser.rs @@ -12,7 +12,7 @@ use data_engine_parser_abstractions::ParserOptions; use crate::consts::{ ENCODE_FUNC_NAME, FNV_FUNC_NAME, MD5_FUNC_NAME, MURMUR3_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, - SHA1_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, + SHA1_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, XXH3_FUNC_NAME, }; /// Create parser options that can be used when parsing an expression that will be executed with @@ -39,6 +39,7 @@ pub fn default_parser_options() -> ParserOptions { .with_external_function(MURMUR3_FUNC_NAME, param_placeholders(1), None) .with_external_function(SHA1_FUNC_NAME, param_placeholders(1), None) .with_external_function(SHA512_FUNC_NAME, param_placeholders(1), None) + .with_external_function(XXH3_FUNC_NAME, param_placeholders(1), None) .with_external_function(ENCODE_FUNC_NAME, param_placeholders(2), None) .with_external_function( REGEXP_SUBSTR_FUNC_NAME, diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs index aa704018e1..3b16a36104 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs @@ -5023,6 +5023,47 @@ mod test { test_update_attr_to_sha512_hash_function_call_result::().await } + async fn test_update_attr_to_xxh3_hash_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "str_attr", + AnyValue::new_string("hello"), + )]) + .finish(), + ]); + + let query = r#"logs | extend attributes["str_attr"] = xxh3(attributes["str_attr"])"#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![KeyValue::new( + "str_attr", + AnyValue::new_int(-7685981735718036227_i64) + )] + ); + } + + #[tokio::test] + async fn test_update_attr_to_xxh3_hash_function_call_result_opl_parser() { + test_update_attr_to_xxh3_hash_function_call_result::().await + } + + #[tokio::test] + async fn test_update_attr_to_xxh3_hash_function_call_result_kql_parser() { + test_update_attr_to_xxh3_hash_function_call_result::().await + } + async fn test_update_attr_to_substring_function_call_result() { let logs_data = to_logs_data(vec![ LogRecord::build() diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs index 48f2b86d47..028e05af0d 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs @@ -78,7 +78,7 @@ use otap_df_pdata::schema::consts; use crate::consts::{ ENCODE_FUNC_NAME, FNV_FUNC_NAME, MD5_FUNC_NAME, MURMUR3_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, - SHA1_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, + SHA1_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, XXH3_FUNC_NAME, }; use crate::error::{Error, Result}; use crate::pipeline::expr::join::{join, multi_join}; @@ -86,7 +86,7 @@ use crate::pipeline::expr::types::{ ExprLogicalType, coerce_arithmetic, nested_struct_field_type, root_field_type, }; use crate::pipeline::functions::{ - arity_range, fnv_hash, murmur3_hash, regexp_substr, sha1_hash, substring, + arity_range, fnv_hash, murmur3_hash, regexp_substr, sha1_hash, substring, xxh3_hash, }; use crate::pipeline::planner::{AttributesIdentifier, ColumnAccessor}; use crate::pipeline::project::anyval::{ @@ -837,6 +837,7 @@ impl DataFusionFunctionDef { MURMUR3_FUNC_NAME => Self::new(murmur3_hash(), ExprLogicalType::Int64, true, None), SHA1_FUNC_NAME => Self::new(sha1_hash(), ExprLogicalType::Binary, true, None), SHA512_FUNC_NAME => Self::new(sha512(), ExprLogicalType::Binary, true, None), + XXH3_FUNC_NAME => Self::new(xxh3_hash(), ExprLogicalType::Int64, true, None), _ => return None, }) } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs index f60512c152..8073098e2c 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs @@ -19,11 +19,13 @@ mod murmur3; mod regexp_substr; mod sha1; mod substring; +mod xxh3; make_udf_function!(contains::ExtendedContainsFunc, contains); make_udf_function!(fnv::FnvHashFunc, fnv_hash); make_udf_function!(murmur3::Murmur3HashFunc, murmur3_hash); make_udf_function!(sha1::Sha1Func, sha1_hash); +make_udf_function!(xxh3::Xxh3Func, xxh3_hash); make_udf_function!(substring::SubstringFunc, substring); make_udf_function!(regexp_substr::RegexpSubstrFunc, regexp_substr); diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh3.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh3.rs new file mode 100644 index 0000000000..4d4521bf43 --- /dev/null +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh3.rs @@ -0,0 +1,178 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, Int64Array, LargeStringArray, StringArray}; +use arrow::datatypes::DataType; +use datafusion::common::exec_err; +use datafusion::error::Result; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion::scalar::ScalarValue; +use xxhash_rust::xxh3::xxh3_64; + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct Xxh3Func { + signature: Signature, +} + +impl Default for Xxh3Func { + fn default() -> Self { + Self::new() + } +} + +impl Xxh3Func { + pub fn new() -> Self { + Self { + signature: Signature::any(1, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for Xxh3Func { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "xxh3" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Int64) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let args = args.args; + if args.len() != 1 { + return exec_err!( + "invalid number of args. {} expected 1 arg, found {}", + self.name(), + args.len() + ); + } + + match &args[0] { + ColumnarValue::Scalar(scalar) => { + let hash = hash_scalar(scalar)?; + Ok(ColumnarValue::Scalar(ScalarValue::Int64(hash))) + } + ColumnarValue::Array(arr) => { + let hashes = hash_array(arr.as_ref())?; + Ok(ColumnarValue::Array(Arc::new(hashes) as ArrayRef)) + } + } + } + + fn documentation(&self) -> Option<&Documentation> { + None + } +} + +fn hash_scalar(scalar: &ScalarValue) -> Result> { + match scalar { + ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => { + Ok(v.as_deref().map(|s| xxh3_64(s.as_bytes()) as i64)) + } + ScalarValue::Utf8View(v) => Ok(v.as_deref().map(|s| xxh3_64(s.as_bytes()) as i64)), + ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => { + Ok(v.as_ref().map(|b| xxh3_64(b) as i64)) + } + other => exec_err!("xxh3: unsupported scalar type {:?}", other.data_type()), + } +} + +fn hash_array(arr: &dyn Array) -> Result { + match arr.data_type() { + DataType::Utf8 => { + let arr = arr.as_any().downcast_ref::().expect("Utf8"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| xxh3_64(s.as_bytes()) as i64)), + )) + } + DataType::LargeUtf8 => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeUtf8"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| xxh3_64(s.as_bytes()) as i64)), + )) + } + DataType::Binary => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("Binary"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|b| xxh3_64(b) as i64)), + )) + } + other => exec_err!("xxh3: unsupported array type {:?}", other), + } +} + +#[cfg(test)] +mod test { + use super::*; + use arrow::datatypes::Field; + use datafusion::common::config::ConfigOptions; + use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs}; + use datafusion::scalar::ScalarValue; + use std::sync::Arc; + + fn invoke(arg: ColumnarValue) -> Result { + Xxh3Func::new().invoke_with_args(ScalarFunctionArgs { + args: vec![arg], + arg_fields: vec![], + number_rows: 1, + return_field: Field::new("", DataType::Int64, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + } + + #[test] + fn test_xxh3_string() { + let result = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "hello".to_string(), + )))) + .unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Int64(Some(v))) => { + assert_eq!(v, xxh3_64(b"hello") as i64); + } + other => panic!("expected Scalar(Int64(Some(_))), got {:?}", other), + } + } + + #[test] + fn test_xxh3_binary_matches_string() { + let result = invoke(ColumnarValue::Scalar(ScalarValue::Binary(Some( + b"hello".to_vec(), + )))) + .unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Int64(Some(v))) => { + assert_eq!(v, xxh3_64(b"hello") as i64); + } + other => panic!("expected Scalar(Int64(Some(_))), got {:?}", other), + } + } + + #[test] + fn test_xxh3_null() { + let result = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(None))).unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Int64(None)) => {} + other => panic!("expected Scalar(Int64(None)), got {:?}", other), + } + } +} From ff4d51124291932e86e6a1dd89d84205e7bf1ccf Mon Sep 17 00:00:00 2001 From: SzymonIwaniuk Date: Thu, 7 May 2026 10:29:39 +0200 Subject: [PATCH 6/7] feat(query-engine): add xxh128 hash function --- .../crates/query-engine/src/consts.rs | 1 + .../crates/query-engine/src/parser.rs | 3 +- .../query-engine/src/pipeline/assign.rs | 41 ++++ .../crates/query-engine/src/pipeline/expr.rs | 4 +- .../query-engine/src/pipeline/functions.rs | 2 + .../src/pipeline/functions/xxh128.rs | 194 ++++++++++++++++++ 6 files changed, 243 insertions(+), 2 deletions(-) create mode 100644 rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh128.rs diff --git a/rust/otap-dataflow/crates/query-engine/src/consts.rs b/rust/otap-dataflow/crates/query-engine/src/consts.rs index eb1e1f4e42..1b880403f2 100644 --- a/rust/otap-dataflow/crates/query-engine/src/consts.rs +++ b/rust/otap-dataflow/crates/query-engine/src/consts.rs @@ -16,3 +16,4 @@ pub(crate) const MURMUR3_FUNC_NAME: &str = "murmur3"; pub(crate) const SHA1_FUNC_NAME: &str = "sha1"; pub(crate) const SHA512_FUNC_NAME: &str = "sha512"; pub(crate) const XXH3_FUNC_NAME: &str = "xxh3"; +pub(crate) const XXH128_FUNC_NAME: &str = "xxh128"; diff --git a/rust/otap-dataflow/crates/query-engine/src/parser.rs b/rust/otap-dataflow/crates/query-engine/src/parser.rs index 5bc3fc5b7b..b7ff674ea5 100644 --- a/rust/otap-dataflow/crates/query-engine/src/parser.rs +++ b/rust/otap-dataflow/crates/query-engine/src/parser.rs @@ -12,7 +12,7 @@ use data_engine_parser_abstractions::ParserOptions; use crate::consts::{ ENCODE_FUNC_NAME, FNV_FUNC_NAME, MD5_FUNC_NAME, MURMUR3_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, - SHA1_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, XXH3_FUNC_NAME, + SHA1_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, XXH3_FUNC_NAME, XXH128_FUNC_NAME, }; /// Create parser options that can be used when parsing an expression that will be executed with @@ -40,6 +40,7 @@ pub fn default_parser_options() -> ParserOptions { .with_external_function(SHA1_FUNC_NAME, param_placeholders(1), None) .with_external_function(SHA512_FUNC_NAME, param_placeholders(1), None) .with_external_function(XXH3_FUNC_NAME, param_placeholders(1), None) + .with_external_function(XXH128_FUNC_NAME, param_placeholders(1), None) .with_external_function(ENCODE_FUNC_NAME, param_placeholders(2), None) .with_external_function( REGEXP_SUBSTR_FUNC_NAME, diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs index 3b16a36104..1ca7c009d9 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs @@ -5064,6 +5064,47 @@ mod test { test_update_attr_to_xxh3_hash_function_call_result::().await } + async fn test_update_attr_to_xxh128_hash_function_call_result() { + let logs_data = to_logs_data(vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "str_attr", + AnyValue::new_string("hello"), + )]) + .finish(), + ]); + + let query = r#"logs | extend attributes["str_attr"] = encode(xxh128(attributes["str_attr"]), "hex")"#; + let pipeline_expr = P::parse_with_options(query, default_parser_options()) + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0]; + assert_eq!( + log_0.attributes, + vec![KeyValue::new( + "str_attr", + AnyValue::new_string("b5e9c1ad071b3e7fc779cfaa5e523818") + )] + ); + } + + #[tokio::test] + async fn test_update_attr_to_xxh128_hash_function_call_result_opl_parser() { + test_update_attr_to_xxh128_hash_function_call_result::().await + } + + #[tokio::test] + async fn test_update_attr_to_xxh128_hash_function_call_result_kql_parser() { + test_update_attr_to_xxh128_hash_function_call_result::().await + } + async fn test_update_attr_to_substring_function_call_result() { let logs_data = to_logs_data(vec![ LogRecord::build() diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs index 028e05af0d..dd58a46857 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs @@ -78,7 +78,7 @@ use otap_df_pdata::schema::consts; use crate::consts::{ ENCODE_FUNC_NAME, FNV_FUNC_NAME, MD5_FUNC_NAME, MURMUR3_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, - SHA1_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, XXH3_FUNC_NAME, + SHA1_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, XXH3_FUNC_NAME, XXH128_FUNC_NAME, }; use crate::error::{Error, Result}; use crate::pipeline::expr::join::{join, multi_join}; @@ -87,6 +87,7 @@ use crate::pipeline::expr::types::{ }; use crate::pipeline::functions::{ arity_range, fnv_hash, murmur3_hash, regexp_substr, sha1_hash, substring, xxh3_hash, + xxh128_hash, }; use crate::pipeline::planner::{AttributesIdentifier, ColumnAccessor}; use crate::pipeline::project::anyval::{ @@ -838,6 +839,7 @@ impl DataFusionFunctionDef { SHA1_FUNC_NAME => Self::new(sha1_hash(), ExprLogicalType::Binary, true, None), SHA512_FUNC_NAME => Self::new(sha512(), ExprLogicalType::Binary, true, None), XXH3_FUNC_NAME => Self::new(xxh3_hash(), ExprLogicalType::Int64, true, None), + XXH128_FUNC_NAME => Self::new(xxh128_hash(), ExprLogicalType::Binary, true, None), _ => return None, }) } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs index 8073098e2c..36f527a411 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs @@ -19,12 +19,14 @@ mod murmur3; mod regexp_substr; mod sha1; mod substring; +mod xxh128; mod xxh3; make_udf_function!(contains::ExtendedContainsFunc, contains); make_udf_function!(fnv::FnvHashFunc, fnv_hash); make_udf_function!(murmur3::Murmur3HashFunc, murmur3_hash); make_udf_function!(sha1::Sha1Func, sha1_hash); +make_udf_function!(xxh128::Xxh128Func, xxh128_hash); make_udf_function!(xxh3::Xxh3Func, xxh3_hash); make_udf_function!(substring::SubstringFunc, substring); make_udf_function!(regexp_substr::RegexpSubstrFunc, regexp_substr); diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh128.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh128.rs new file mode 100644 index 0000000000..8111fc11e4 --- /dev/null +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh128.rs @@ -0,0 +1,194 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, LargeStringArray, StringArray}; +use arrow::datatypes::DataType; +use datafusion::common::exec_err; +use datafusion::error::Result; +use datafusion::logical_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion::scalar::ScalarValue; +use xxhash_rust::xxh3::xxh3_128; + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct Xxh128Func { + signature: Signature, +} + +impl Default for Xxh128Func { + fn default() -> Self { + Self::new() + } +} + +impl Xxh128Func { + pub fn new() -> Self { + Self { + signature: Signature::any(1, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for Xxh128Func { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "xxh128" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Binary) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let args = args.args; + if args.len() != 1 { + return exec_err!( + "invalid number of args. {} expected 1 arg, found {}", + self.name(), + args.len() + ); + } + + match &args[0] { + ColumnarValue::Scalar(scalar) => { + let hash = hash_scalar(scalar)?; + Ok(ColumnarValue::Scalar(ScalarValue::Binary(hash))) + } + ColumnarValue::Array(arr) => { + let hashes = hash_array(arr.as_ref())?; + Ok(ColumnarValue::Array(Arc::new(hashes) as ArrayRef)) + } + } + } + + fn documentation(&self) -> Option<&Documentation> { + None + } +} + +fn to_bytes(hash: u128) -> Vec { + hash.to_be_bytes().to_vec() +} + +fn hash_scalar(scalar: &ScalarValue) -> Result>> { + match scalar { + ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) => { + Ok(v.as_deref().map(|s| to_bytes(xxh3_128(s.as_bytes())))) + } + ScalarValue::Utf8View(v) => Ok(v.as_deref().map(|s| to_bytes(xxh3_128(s.as_bytes())))), + ScalarValue::Binary(v) | ScalarValue::LargeBinary(v) => { + Ok(v.as_ref().map(|b| to_bytes(xxh3_128(b)))) + } + other => exec_err!("xxh128: unsupported scalar type {:?}", other.data_type()), + } +} + +fn hash_array(arr: &dyn Array) -> Result { + let len = arr.len(); + let mut builder = BinaryBuilder::with_capacity(len, len * 16); + + match arr.data_type() { + DataType::Utf8 => { + let arr = arr.as_any().downcast_ref::().expect("Utf8"); + for v in arr.iter() { + match v { + Some(s) => builder.append_value(to_bytes(xxh3_128(s.as_bytes()))), + None => builder.append_null(), + } + } + } + DataType::LargeUtf8 => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeUtf8"); + for v in arr.iter() { + match v { + Some(s) => builder.append_value(to_bytes(xxh3_128(s.as_bytes()))), + None => builder.append_null(), + } + } + } + DataType::Binary => { + let arr = arr.as_any().downcast_ref::().expect("Binary"); + for v in arr.iter() { + match v { + Some(b) => builder.append_value(to_bytes(xxh3_128(b))), + None => builder.append_null(), + } + } + } + other => return exec_err!("xxh128: unsupported array type {:?}", other), + } + + Ok(builder.finish()) +} + +#[cfg(test)] +mod test { + use super::*; + use arrow::datatypes::Field; + use datafusion::common::config::ConfigOptions; + use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs}; + use datafusion::scalar::ScalarValue; + use std::sync::Arc; + + fn invoke(arg: ColumnarValue) -> Result { + Xxh128Func::new().invoke_with_args(ScalarFunctionArgs { + args: vec![arg], + arg_fields: vec![], + number_rows: 1, + return_field: Field::new("", DataType::Binary, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + } + + #[test] + fn test_xxh128_string() { + let result = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "hello".to_string(), + )))) + .unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Binary(Some(v))) => { + assert_eq!(v, to_bytes(xxh3_128(b"hello"))); + assert_eq!(v.len(), 16); + } + other => panic!("expected Scalar(Binary(Some(_))), got {:?}", other), + } + } + + #[test] + fn test_xxh128_binary_matches_string() { + let result = invoke(ColumnarValue::Scalar(ScalarValue::Binary(Some( + b"hello".to_vec(), + )))) + .unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Binary(Some(v))) => { + assert_eq!(v, to_bytes(xxh3_128(b"hello"))); + } + other => panic!("expected Scalar(Binary(Some(_))), got {:?}", other), + } + } + + #[test] + fn test_xxh128_null() { + let result = invoke(ColumnarValue::Scalar(ScalarValue::Utf8(None))).unwrap(); + match result { + ColumnarValue::Scalar(ScalarValue::Binary(None)) => {} + other => panic!("expected Scalar(Binary(None)), got {:?}", other), + } + } +} From 0a15f0ffeaefd1313b7a9ee06c98031d6a2ae94c Mon Sep 17 00:00:00 2001 From: SzymonIwaniuk Date: Sat, 9 May 2026 16:32:35 +0200 Subject: [PATCH 7/7] feat(query-engine): address pr review suggestions --- rust/otap-dataflow/Cargo.toml | 2 +- .../crates/query-engine/Cargo.toml | 5 +++- .../crates/query-engine/src/consts.rs | 1 + .../crates/query-engine/src/parser.rs | 16 ++++++---- .../query-engine/src/pipeline/assign.rs | 11 ++++--- .../crates/query-engine/src/pipeline/expr.rs | 13 ++++++--- .../query-engine/src/pipeline/functions.rs | 3 ++ .../src/pipeline/functions/fnv.rs | 21 +++++++++++++- .../src/pipeline/functions/murmur3.rs | 21 +++++++++++++- .../src/pipeline/functions/sha1.rs | 29 ++++++++++++++++++- .../src/pipeline/functions/xxh128.rs | 29 ++++++++++++++++++- .../src/pipeline/functions/xxh3.rs | 25 ++++++++++++++-- 12 files changed, 155 insertions(+), 21 deletions(-) diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index d6718429ad..28743505de 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -211,7 +211,7 @@ weaver_forge = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0 weaver_resolved_schema = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.21.2"} weaver_resolver = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.21.2"} weaver_semconv = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.21.2"} -sha1 = { version = "0.10", features = ["oid"] } +sha1 = { version = "0.10" } xxhash-rust = { version = "0.8", features = ["xxh3"] } zip = "=8.6.0" byte-unit = { version = "5.2.0", features = ["serde"] } diff --git a/rust/otap-dataflow/crates/query-engine/Cargo.toml b/rust/otap-dataflow/crates/query-engine/Cargo.toml index 85a8406b9c..72ba7b1690 100644 --- a/rust/otap-dataflow/crates/query-engine/Cargo.toml +++ b/rust/otap-dataflow/crates/query-engine/Cargo.toml @@ -18,7 +18,7 @@ datafusion = { workspace = true, features = [ ] } futures-core = { workspace = true } memchr = { workspace = true } -sha1 = { workspace = true } +sha1 = { workspace = true, optional = true } xxhash-rust = { workspace = true } parking_lot = { workspace = true } smallvec = { workspace = true } @@ -30,6 +30,9 @@ uuid = { workspace = true } otap-df-config = { workspace = true } otap-df-pdata = { workspace = true, features = ["testing"] } +[features] +sha1-hash = ["dep:sha1"] + [dev-dependencies] criterion = { workspace = true, features = ["async_tokio"] } data_engine_kql_parser = { workspace = true } diff --git a/rust/otap-dataflow/crates/query-engine/src/consts.rs b/rust/otap-dataflow/crates/query-engine/src/consts.rs index b5f36103dd..28ed15102e 100644 --- a/rust/otap-dataflow/crates/query-engine/src/consts.rs +++ b/rust/otap-dataflow/crates/query-engine/src/consts.rs @@ -16,6 +16,7 @@ pub(crate) const SHA256_FUNC_NAME: &str = "sha256"; pub(crate) const MD5_FUNC_NAME: &str = "md5"; pub(crate) const FNV_FUNC_NAME: &str = "fnv"; pub(crate) const MURMUR3_FUNC_NAME: &str = "murmur3"; +#[cfg(feature = "sha1-hash")] pub(crate) const SHA1_FUNC_NAME: &str = "sha1"; pub(crate) const SHA512_FUNC_NAME: &str = "sha512"; pub(crate) const XXH3_FUNC_NAME: &str = "xxh3"; diff --git a/rust/otap-dataflow/crates/query-engine/src/parser.rs b/rust/otap-dataflow/crates/query-engine/src/parser.rs index df8732f1fa..b24c0386c7 100644 --- a/rust/otap-dataflow/crates/query-engine/src/parser.rs +++ b/rust/otap-dataflow/crates/query-engine/src/parser.rs @@ -13,15 +13,17 @@ use data_engine_parser_abstractions::ParserOptions; use crate::consts::{ ENCODE_FUNC_NAME, FNV_FUNC_NAME, FORMAT_DATETIME_FUNC_NAME, LOWER_CASE_FUNC_NAME, LTRIM_FUNC_NAME, MD5_FUNC_NAME, MURMUR3_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, RTRIM_FUNC_NAME, - SHA1_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, UPPER_CASE_FUNC_NAME, UUID_FUNC_NAME, + SHA256_FUNC_NAME, SHA512_FUNC_NAME, UPPER_CASE_FUNC_NAME, UUID_FUNC_NAME, UUIDV7_FUNC_NAME, XXH3_FUNC_NAME, XXH128_FUNC_NAME, - }; +}; +#[cfg(feature = "sha1-hash")] +use crate::consts::SHA1_FUNC_NAME; /// Create parser options that can be used when parsing an expression that will be executed with /// this query engine #[must_use] pub fn default_parser_options() -> ParserOptions { - ParserOptions::new() + let opts = ParserOptions::new() // Add placeholders for scalar UDFs supported by this engine - these are needed because // the invoke function expression in our expression AST references the function by an ID, // adding these will make a named function with some ID available in the parser. Only the @@ -40,7 +42,6 @@ pub fn default_parser_options() -> ParserOptions { .with_external_function(MD5_FUNC_NAME, param_placeholders(1), None) .with_external_function(FNV_FUNC_NAME, param_placeholders(1), None) .with_external_function(MURMUR3_FUNC_NAME, param_placeholders(1), None) - .with_external_function(SHA1_FUNC_NAME, param_placeholders(1), None) .with_external_function(SHA512_FUNC_NAME, param_placeholders(1), None) .with_external_function(XXH3_FUNC_NAME, param_placeholders(1), None) .with_external_function(XXH128_FUNC_NAME, param_placeholders(1), None) @@ -112,7 +113,12 @@ pub fn default_parser_options() -> ParserOptions { ), ], None, - ) + ); + + #[cfg(feature = "sha1-hash")] + let opts = opts.with_external_function(SHA1_FUNC_NAME, param_placeholders(1), None); + + opts } fn param_placeholders( diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs index e7db6950ff..3705426f27 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs @@ -4815,8 +4815,8 @@ mod test { .finish(), ]); let query = r#"logs | extend - attributes["str_attr"] = encode(md5(attributes["str_attr"]), "hex"), - attributes["binary_attr"] = encode(md5(attributes["binary_attr"]), "hex") + attributes["str_attr"] = md5(attributes["str_attr"]), + attributes["binary_attr"] = md5(attributes["binary_attr"]) "#; let pipeline_expr = P::parse_with_options(query, default_parser_options()) .unwrap() @@ -4833,11 +4833,11 @@ mod test { vec![ KeyValue::new( "str_attr", - AnyValue::new_string("a6105c0a611b41b08f1209506350279e") + AnyValue::new_string("415290769594460e2e485922904f345d") ), KeyValue::new( "binary_attr", - AnyValue::new_string("0ffe9bcd5a3d234d4e99e9a1fb9a5d2c") + AnyValue::new_string("d1f255a373a3cef72e03aa9d980c7eca") ) ] ); @@ -4938,6 +4938,7 @@ mod test { test_update_attr_to_murmur3_hash_function_call_result::().await } + #[cfg(feature = "sha1-hash")] async fn test_update_attr_to_sha1_hash_function_call_result() { let logs_data = to_logs_data(vec![ LogRecord::build() @@ -4970,11 +4971,13 @@ mod test { ); } + #[cfg(feature = "sha1-hash")] #[tokio::test] async fn test_update_attr_to_sha1_hash_function_call_result_opl_parser() { test_update_attr_to_sha1_hash_function_call_result::().await } + #[cfg(feature = "sha1-hash")] #[tokio::test] async fn test_update_attr_to_sha1_hash_function_call_result_kql_parser() { test_update_attr_to_sha1_hash_function_call_result::().await diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs index b27424dba6..4e7e6f6079 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs @@ -80,18 +80,22 @@ use otap_df_pdata::schema::consts; use crate::consts::{ ENCODE_FUNC_NAME, FNV_FUNC_NAME, FORMAT_DATETIME_FUNC_NAME, LOWER_CASE_FUNC_NAME, LTRIM_FUNC_NAME, MD5_FUNC_NAME, MURMUR3_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, RTRIM_FUNC_NAME, - SHA1_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, UPPER_CASE_FUNC_NAME, UUID_FUNC_NAME, + SHA256_FUNC_NAME, SHA512_FUNC_NAME, UPPER_CASE_FUNC_NAME, UUID_FUNC_NAME, UUIDV7_FUNC_NAME, XXH3_FUNC_NAME, XXH128_FUNC_NAME, }; +#[cfg(feature = "sha1-hash")] +use crate::consts::SHA1_FUNC_NAME; use crate::error::{Error, Result}; use crate::pipeline::expr::join::{join, multi_join}; use crate::pipeline::expr::types::{ ExprLogicalType, coerce_arithmetic, nested_struct_field_type, root_field_type, }; use crate::pipeline::functions::{ - arity_range, fnv_hash, murmur3_hash, regexp_substr, sha1_hash, substring, xxh3_hash, - xxh128_hash, uuidv7 + arity_range, fnv_hash, murmur3_hash, regexp_substr, substring, xxh3_hash, + xxh128_hash, uuidv7, }; +#[cfg(feature = "sha1-hash")] +use crate::pipeline::functions::sha1_hash; use crate::pipeline::planner::{AttributesIdentifier, ColumnAccessor}; use crate::pipeline::project::anyval::{ find_any_value_columns, project_any_value_columns, stitch_partitioned_results, @@ -843,9 +847,10 @@ impl DataFusionFunctionDef { FORMAT_DATETIME_FUNC_NAME => Self::new(to_char(), ExprLogicalType::String, false, None), RTRIM_FUNC_NAME => Self::new(rtrim(), ExprLogicalType::String, true, None), SHA256_FUNC_NAME => Self::new(sha256(), ExprLogicalType::Binary, true, None), - MD5_FUNC_NAME => Self::new(md5(), ExprLogicalType::Binary, true, None), + MD5_FUNC_NAME => Self::new(md5(), ExprLogicalType::String, true, Some(DataType::Utf8)), FNV_FUNC_NAME => Self::new(fnv_hash(), ExprLogicalType::Int64, true, None), MURMUR3_FUNC_NAME => Self::new(murmur3_hash(), ExprLogicalType::Int64, true, None), + #[cfg(feature = "sha1-hash")] SHA1_FUNC_NAME => Self::new(sha1_hash(), ExprLogicalType::Binary, true, None), SHA512_FUNC_NAME => Self::new(sha512(), ExprLogicalType::Binary, true, None), XXH3_FUNC_NAME => Self::new(xxh3_hash(), ExprLogicalType::Int64, true, None), diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs index eeee80036d..2424346d55 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions.rs @@ -17,14 +17,17 @@ mod fnv; pub(crate) mod is_type; mod murmur3; mod regexp_substr; +#[cfg(feature = "sha1-hash")] mod sha1; mod substring; mod xxh128; mod xxh3; +mod uuidv7; make_udf_function!(contains::ExtendedContainsFunc, contains); make_udf_function!(fnv::FnvHashFunc, fnv_hash); make_udf_function!(murmur3::Murmur3HashFunc, murmur3_hash); +#[cfg(feature = "sha1-hash")] make_udf_function!(sha1::Sha1Func, sha1_hash); make_udf_function!(xxh128::Xxh128Func, xxh128_hash); make_udf_function!(xxh3::Xxh3Func, xxh3_hash); diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs index 996b68b9e1..6980d613de 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/fnv.rs @@ -4,7 +4,10 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, BinaryArray, Int64Array, LargeStringArray, StringArray}; +use arrow::array::{ + Array, ArrayRef, BinaryArray, Int64Array, LargeBinaryArray, LargeStringArray, StringArray, + StringViewArray, +}; use arrow::datatypes::DataType; use datafusion::common::exec_err; use datafusion::error::Result; @@ -121,10 +124,26 @@ fn hash_array(arr: &dyn Array) -> Result { arr.iter().map(|v| v.map(|s| fnv1a_64(s.as_bytes()))), )) } + DataType::Utf8View => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("Utf8View"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| fnv1a_64(s.as_bytes()))), + )) + } DataType::Binary => { let arr = arr.as_any().downcast_ref::().expect("Binary"); Ok(Int64Array::from_iter(arr.iter().map(|v| v.map(fnv1a_64)))) } + DataType::LargeBinary => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeBinary"); + Ok(Int64Array::from_iter(arr.iter().map(|v| v.map(fnv1a_64)))) + } other => exec_err!("fnv: unsupported array type {:?}", other), } } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/murmur3.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/murmur3.rs index 70bd05249c..fdd6de2281 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/murmur3.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/murmur3.rs @@ -4,7 +4,10 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, BinaryArray, Int64Array, LargeStringArray, StringArray}; +use arrow::array::{ + Array, ArrayRef, BinaryArray, Int64Array, LargeBinaryArray, LargeStringArray, StringArray, + StringViewArray, +}; use arrow::datatypes::DataType; use datafusion::common::exec_err; use datafusion::error::Result; @@ -155,10 +158,26 @@ fn hash_array(arr: &dyn Array) -> Result { arr.iter().map(|v| v.map(|s| murmur3_32(s.as_bytes()))), )) } + DataType::Utf8View => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("Utf8View"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| murmur3_32(s.as_bytes()))), + )) + } DataType::Binary => { let arr = arr.as_any().downcast_ref::().expect("Binary"); Ok(Int64Array::from_iter(arr.iter().map(|v| v.map(murmur3_32)))) } + DataType::LargeBinary => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeBinary"); + Ok(Int64Array::from_iter(arr.iter().map(|v| v.map(murmur3_32)))) + } other => exec_err!("murmur3: unsupported array type {:?}", other), } } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/sha1.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/sha1.rs index e743c07712..ab76c002ba 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/sha1.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/sha1.rs @@ -4,7 +4,10 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, LargeStringArray, StringArray}; +use arrow::array::{ + Array, ArrayRef, BinaryArray, BinaryBuilder, LargeBinaryArray, LargeStringArray, StringArray, + StringViewArray, +}; use arrow::datatypes::DataType; use datafusion::common::exec_err; use datafusion::error::Result; @@ -120,6 +123,18 @@ fn hash_array(arr: &dyn Array) -> Result { } } } + DataType::Utf8View => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("Utf8View"); + for v in arr.iter() { + match v { + Some(s) => builder.append_value(sha1_bytes(s.as_bytes())), + None => builder.append_null(), + } + } + } DataType::Binary => { let arr = arr.as_any().downcast_ref::().expect("Binary"); for v in arr.iter() { @@ -129,6 +144,18 @@ fn hash_array(arr: &dyn Array) -> Result { } } } + DataType::LargeBinary => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeBinary"); + for v in arr.iter() { + match v { + Some(b) => builder.append_value(sha1_bytes(b)), + None => builder.append_null(), + } + } + } other => return exec_err!("sha1: unsupported array type {:?}", other), } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh128.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh128.rs index 8111fc11e4..0df7f3cc48 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh128.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh128.rs @@ -4,7 +4,10 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, LargeStringArray, StringArray}; +use arrow::array::{ + Array, ArrayRef, BinaryArray, BinaryBuilder, LargeBinaryArray, LargeStringArray, StringArray, + StringViewArray, +}; use arrow::datatypes::DataType; use datafusion::common::exec_err; use datafusion::error::Result; @@ -120,6 +123,18 @@ fn hash_array(arr: &dyn Array) -> Result { } } } + DataType::Utf8View => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("Utf8View"); + for v in arr.iter() { + match v { + Some(s) => builder.append_value(to_bytes(xxh3_128(s.as_bytes()))), + None => builder.append_null(), + } + } + } DataType::Binary => { let arr = arr.as_any().downcast_ref::().expect("Binary"); for v in arr.iter() { @@ -129,6 +144,18 @@ fn hash_array(arr: &dyn Array) -> Result { } } } + DataType::LargeBinary => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeBinary"); + for v in arr.iter() { + match v { + Some(b) => builder.append_value(to_bytes(xxh3_128(b))), + None => builder.append_null(), + } + } + } other => return exec_err!("xxh128: unsupported array type {:?}", other), } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh3.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh3.rs index 4d4521bf43..e28cd2f419 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh3.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/functions/xxh3.rs @@ -4,7 +4,10 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, Int64Array, LargeStringArray, StringArray}; +use arrow::array::{ + Array, ArrayRef, BinaryArray, Int64Array, LargeBinaryArray, LargeStringArray, StringArray, + StringViewArray, +}; use arrow::datatypes::DataType; use datafusion::common::exec_err; use datafusion::error::Result; @@ -107,15 +110,33 @@ fn hash_array(arr: &dyn Array) -> Result { arr.iter().map(|v| v.map(|s| xxh3_64(s.as_bytes()) as i64)), )) } + DataType::Utf8View => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("Utf8View"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|s| xxh3_64(s.as_bytes()) as i64)), + )) + } DataType::Binary => { let arr = arr .as_any() - .downcast_ref::() + .downcast_ref::() .expect("Binary"); Ok(Int64Array::from_iter( arr.iter().map(|v| v.map(|b| xxh3_64(b) as i64)), )) } + DataType::LargeBinary => { + let arr = arr + .as_any() + .downcast_ref::() + .expect("LargeBinary"); + Ok(Int64Array::from_iter( + arr.iter().map(|v| v.map(|b| xxh3_64(b) as i64)), + )) + } other => exec_err!("xxh3: unsupported array type {:?}", other), } }