Skip to content
Open
1 change: 1 addition & 0 deletions rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ weaver_forge = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0
weaver_resolved_schema = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.21.2"}
weaver_resolver = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.21.2"}
weaver_semconv = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.21.2"}
# NOTE(review): the "oid" feature does not appear to be used by this
# implementation — confirm and consider dropping it; SHA-1 here is for
# compatibility/non-cryptographic hashing only (see linked discussion #2827).
sha1 = { version = "0.10", features = ["oid"] }
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid making sha1 a default dependency here, or gate just the sha1() function behind a feature? otap-df-query-engine is pulled into the normal df_engine build through core-nodes/transform-processor, so this makes SHA-1 part of the default engine dependency graph. Also, the workspace sha1 dependency enables oid, which does not look used by this implementation.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for reference, the concerns on sha-1 - #2827

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gated sha1 behind an sha1-hash cargo feature. The module, UDF registration, parser, and tests are all behind #[cfg(feature = "sha1-hash")]

Copy link
Copy Markdown
Contributor

@lquerel lquerel May 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SzymonIwaniuk @lalitb That could be done in a future PR but I’m slightly concerned by the growing number of SHA-1 related features. While the current usages are valid (e.g. WebSocket protocol compatibility or non-cryptographic hash functions in OPL), multiplying fine-grained features could make the build matrix and dependency story harder to reason about over time.

I’d recommend converging toward a single high-level compatibility feature (e.g. sha1-compat) controlling all SHA-1 usage globally, instead of component-specific flags. Internally, all SHA-1 usage should go through a shared utility module with explicit documentation clarifying that it is used for protocol compatibility/non-security purposes only.

Please open a GH issue to track this if not integrated in this PR. Thanks.

Copy link
Copy Markdown
Author

@SzymonIwaniuk SzymonIwaniuk May 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lquerel @lalitb I agree with the consolidation of SHA-1 related feature flags. I think this would be better addressed in a follow-up issue rather than in this PR, as it involves changes across multiple components. Feel free to assign it to me once the issue is created after this PR. @lalitb I'd like to know what you think about this approach, would that work for you?

xxhash-rust = { version = "0.8", features = ["xxh3"] }
zip = "=8.6.0"
byte-unit = { version = "5.2.0", features = ["serde"] }
Expand Down
2 changes: 2 additions & 0 deletions rust/otap-dataflow/crates/query-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ datafusion = { workspace = true, features = [
] }
futures-core = { workspace = true }
memchr = { workspace = true }
# NOTE(review): sha1 is used for protocol-compat / non-security hashing only;
# the review thread gates its usage behind a `sha1-hash` cargo feature —
# confirm this entry is marked `optional = true` in the final manifest.
sha1 = { workspace = true }
xxhash-rust = { workspace = true }
parking_lot = { workspace = true }
smallvec = { workspace = true }
thiserror = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions rust/otap-dataflow/crates/query-engine/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ pub(crate) const LTRIM_FUNC_NAME: &str = "ltrim";
pub(crate) const REGEXP_SUBSTR_FUNC_NAME: &str = "regexp_substr";
pub(crate) const RTRIM_FUNC_NAME: &str = "rtrim";
// Hash/digest function names; each is registered as a 1-argument external
// function in parser.rs. sha*/md5/xxh128 results are typically hex-encoded
// via encode(..., "hex") in queries, while fnv/murmur3/xxh3 yield Int64.
pub(crate) const SHA256_FUNC_NAME: &str = "sha256";
pub(crate) const MD5_FUNC_NAME: &str = "md5";
pub(crate) const FNV_FUNC_NAME: &str = "fnv";
pub(crate) const MURMUR3_FUNC_NAME: &str = "murmur3";
pub(crate) const SHA1_FUNC_NAME: &str = "sha1";
pub(crate) const SHA512_FUNC_NAME: &str = "sha512";
pub(crate) const XXH3_FUNC_NAME: &str = "xxh3";
pub(crate) const XXH128_FUNC_NAME: &str = "xxh128";
// UUID generators — registered with zero parameters in parser.rs.
pub(crate) const UUID_FUNC_NAME: &str = "uuid";
pub(crate) const UUIDV7_FUNC_NAME: &str = "uuidv7";
pub(crate) const LOWER_CASE_FUNC_NAME: &str = "lower_case";
Expand Down
16 changes: 12 additions & 4 deletions rust/otap-dataflow/crates/query-engine/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use data_engine_expressions::{
use data_engine_parser_abstractions::ParserOptions;

use crate::consts::{
ENCODE_FUNC_NAME, FORMAT_DATETIME_FUNC_NAME, LOWER_CASE_FUNC_NAME, LTRIM_FUNC_NAME,
REGEXP_SUBSTR_FUNC_NAME, RTRIM_FUNC_NAME, SHA256_FUNC_NAME, UPPER_CASE_FUNC_NAME,
UUID_FUNC_NAME, UUIDV7_FUNC_NAME,
};
ENCODE_FUNC_NAME, FNV_FUNC_NAME, FORMAT_DATETIME_FUNC_NAME, LOWER_CASE_FUNC_NAME,
LTRIM_FUNC_NAME, MD5_FUNC_NAME, MURMUR3_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, RTRIM_FUNC_NAME,
SHA1_FUNC_NAME, SHA256_FUNC_NAME, SHA512_FUNC_NAME, UPPER_CASE_FUNC_NAME, UUID_FUNC_NAME,
UUIDV7_FUNC_NAME, XXH3_FUNC_NAME, XXH128_FUNC_NAME,
};

/// Create parser options that can be used when parsing an expression that will be executed with
/// this query engine
Expand All @@ -36,6 +37,13 @@ pub fn default_parser_options() -> ParserOptions {
//
.with_external_function(FORMAT_DATETIME_FUNC_NAME, param_placeholders(2), None)
.with_external_function(SHA256_FUNC_NAME, param_placeholders(1), None)
.with_external_function(MD5_FUNC_NAME, param_placeholders(1), None)
.with_external_function(FNV_FUNC_NAME, param_placeholders(1), None)
.with_external_function(MURMUR3_FUNC_NAME, param_placeholders(1), None)
.with_external_function(SHA1_FUNC_NAME, param_placeholders(1), None)
.with_external_function(SHA512_FUNC_NAME, param_placeholders(1), None)
.with_external_function(XXH3_FUNC_NAME, param_placeholders(1), None)
.with_external_function(XXH128_FUNC_NAME, param_placeholders(1), None)
.with_external_function(ENCODE_FUNC_NAME, param_placeholders(2), None)
.with_external_function(UUID_FUNC_NAME, param_placeholders(0), None)
.with_external_function(UUIDV7_FUNC_NAME, param_placeholders(0), None)
Expand Down
300 changes: 300 additions & 0 deletions rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4805,6 +4805,306 @@ mod test {
test_update_attr_to_hash_function_call_result_all_supported_types::<KqlParser>().await
}

async fn test_update_attr_to_md5_function_call_result<P: Parser>() {
let logs_data = to_logs_data(vec![
LogRecord::build()
.attributes(vec![
KeyValue::new("str_attr", AnyValue::new_string("y")),
KeyValue::new("binary_attr", AnyValue::new_bytes(b"418")),
])
.finish(),
]);
let query = r#"logs | extend
attributes["str_attr"] = encode(md5(attributes["str_attr"]), "hex"),
attributes["binary_attr"] = encode(md5(attributes["binary_attr"]), "hex")
"#;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to the md5() return type: DataFusion md5() already returns a hex string, so should these be md5(attributes["..."]) directly instead of wrapping with encode(..., "hex")?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is correct - but will need to add change from this comment for this test to pass

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, I changed the return type to ExprLogicalType::String with Some(DataType::Utf8) and removed the encode() wrapper from the test, and also updated the expected hash values accordingly.

let pipeline_expr = P::parse_with_options(query, default_parser_options())
.unwrap()
.pipeline;
let mut pipeline = Pipeline::new(pipeline_expr);
let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data));
let result = pipeline.execute(input).await.unwrap();
let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else {
panic!("invalid signal type");
};
let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0];
assert_eq!(
log_0.attributes,
vec![
KeyValue::new(
"str_attr",
AnyValue::new_string("a6105c0a611b41b08f1209506350279e")
),
KeyValue::new(
"binary_attr",
AnyValue::new_string("0ffe9bcd5a3d234d4e99e9a1fb9a5d2c")
)
]
);
}

// Runs the shared md5() assignment test under the OPL parser dialect.
#[tokio::test]
async fn test_update_attr_to_md5_function_call_result_opl_parser() {
    test_update_attr_to_md5_function_call_result::<OplParser>().await
}

// Runs the shared md5() assignment test under the KQL parser dialect.
#[tokio::test]
async fn test_update_attr_to_md5_function_call_result_kql_parser() {
    test_update_attr_to_md5_function_call_result::<KqlParser>().await
}

/// Verifies that `fnv()` rewrites a string attribute to its FNV-1a 64-bit
/// hash, surfaced directly as an Int64 attribute value.
async fn test_update_attr_to_fnv_hash_function_call_result<P: Parser>() {
    // One log record with a single string attribute to hash.
    let record = LogRecord::build()
        .attributes(vec![KeyValue::new(
            "str_attr",
            AnyValue::new_string("hello"),
        )])
        .finish();
    let logs_data = to_logs_data(vec![record]);

    // fnv returns an Int64 directly - no encode() wrapper needed
    let query = r#"logs | extend attributes["str_attr"] = fnv(attributes["str_attr"])"#;
    let parsed = P::parse_with_options(query, default_parser_options()).unwrap();
    let mut pipeline = Pipeline::new(parsed.pipeline);

    let output = pipeline
        .execute(otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)))
        .await
        .unwrap();
    let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&output) else {
        panic!("invalid signal type");
    };

    let first_record = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0];
    assert_eq!(
        first_record.attributes,
        vec![KeyValue::new(
            "str_attr",
            // FNV-1a 64-bit of "hello" interpreted as i64
            AnyValue::new_int(-6615550055289275125_i64)
        )]
    );
}

// Runs the shared fnv() hash test under the OPL parser dialect.
#[tokio::test]
async fn test_update_attr_to_fnv_hash_function_call_result_opl_parser() {
    test_update_attr_to_fnv_hash_function_call_result::<OplParser>().await
}

// Runs the shared fnv() hash test under the KQL parser dialect.
#[tokio::test]
async fn test_update_attr_to_fnv_hash_function_call_result_kql_parser() {
    test_update_attr_to_fnv_hash_function_call_result::<KqlParser>().await
}

/// Verifies that `murmur3()` rewrites a string attribute to its MurmurHash3
/// 32-bit value (seed 0), surfaced directly as an Int64 attribute value.
async fn test_update_attr_to_murmur3_hash_function_call_result<P: Parser>() {
    // One log record with a single string attribute to hash.
    let record = LogRecord::build()
        .attributes(vec![KeyValue::new(
            "str_attr",
            AnyValue::new_string("hello"),
        )])
        .finish();
    let logs_data = to_logs_data(vec![record]);

    // murmur3 returns an Int64 directly - no encode() wrapper needed
    let query = r#"logs | extend attributes["str_attr"] = murmur3(attributes["str_attr"])"#;
    let parsed = P::parse_with_options(query, default_parser_options()).unwrap();
    let mut pipeline = Pipeline::new(parsed.pipeline);

    let output = pipeline
        .execute(otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)))
        .await
        .unwrap();
    let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&output) else {
        panic!("invalid signal type");
    };

    let first_record = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0];
    assert_eq!(
        first_record.attributes,
        vec![KeyValue::new(
            "str_attr",
            // MurmurHash3 32-bit of "hello" with seed=0
            AnyValue::new_int(613_153_351_i64)
        )]
    );
}

// Runs the shared murmur3() hash test under the OPL parser dialect.
#[tokio::test]
async fn test_update_attr_to_murmur3_hash_function_call_result_opl_parser() {
    test_update_attr_to_murmur3_hash_function_call_result::<OplParser>().await
}

// Runs the shared murmur3() hash test under the KQL parser dialect.
#[tokio::test]
async fn test_update_attr_to_murmur3_hash_function_call_result_kql_parser() {
    test_update_attr_to_murmur3_hash_function_call_result::<KqlParser>().await
}

/// Verifies that `sha1()` produces the SHA-1 digest of a string attribute;
/// the binary digest is hex-encoded via encode(..., "hex") for comparison.
async fn test_update_attr_to_sha1_hash_function_call_result<P: Parser>() {
    // One log record with a single string attribute to hash.
    let record = LogRecord::build()
        .attributes(vec![KeyValue::new(
            "str_attr",
            AnyValue::new_string("hello"),
        )])
        .finish();
    let logs_data = to_logs_data(vec![record]);

    let query =
        r#"logs | extend attributes["str_attr"] = encode(sha1(attributes["str_attr"]), "hex")"#;
    let parsed = P::parse_with_options(query, default_parser_options()).unwrap();
    let mut pipeline = Pipeline::new(parsed.pipeline);

    let output = pipeline
        .execute(otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)))
        .await
        .unwrap();
    let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&output) else {
        panic!("invalid signal type");
    };

    let first_record = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0];
    // SHA-1("hello") as lowercase hex.
    assert_eq!(
        first_record.attributes,
        vec![KeyValue::new(
            "str_attr",
            AnyValue::new_string("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d")
        )]
    );
}

// Runs the shared sha1() hash test under the OPL parser dialect.
#[tokio::test]
async fn test_update_attr_to_sha1_hash_function_call_result_opl_parser() {
    test_update_attr_to_sha1_hash_function_call_result::<OplParser>().await
}

// Runs the shared sha1() hash test under the KQL parser dialect.
#[tokio::test]
async fn test_update_attr_to_sha1_hash_function_call_result_kql_parser() {
    test_update_attr_to_sha1_hash_function_call_result::<KqlParser>().await
}

/// Verifies that `sha512()` produces the SHA-512 digest of a string attribute;
/// the binary digest is hex-encoded via encode(..., "hex") for comparison.
async fn test_update_attr_to_sha512_hash_function_call_result<P: Parser>() {
    // One log record with a single string attribute to hash.
    let record = LogRecord::build()
        .attributes(vec![KeyValue::new(
            "str_attr",
            AnyValue::new_string("hello"),
        )])
        .finish();
    let logs_data = to_logs_data(vec![record]);

    let query = r#"logs | extend attributes["str_attr"] = encode(sha512(attributes["str_attr"]), "hex")"#;
    let parsed = P::parse_with_options(query, default_parser_options()).unwrap();
    let mut pipeline = Pipeline::new(parsed.pipeline);

    let output = pipeline
        .execute(otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)))
        .await
        .unwrap();
    let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&output) else {
        panic!("invalid signal type");
    };

    let first_record = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0];
    // SHA-512("hello") as lowercase hex.
    assert_eq!(
        first_record.attributes,
        vec![KeyValue::new(
            "str_attr",
            AnyValue::new_string(
                "9b71d224bd62f3785d96d46ad3ea3d73319bfbc2890caadae2dff72519673ca72323c3d99ba5c11d7c7acc6e14b8c5da0c4663475c2e5c3adef46f73bcdec043"
            )
        )]
    );
}

// Runs the shared sha512() hash test under the OPL parser dialect.
#[tokio::test]
async fn test_update_attr_to_sha512_hash_function_call_result_opl_parser() {
    test_update_attr_to_sha512_hash_function_call_result::<OplParser>().await
}

// Runs the shared sha512() hash test under the KQL parser dialect.
#[tokio::test]
async fn test_update_attr_to_sha512_hash_function_call_result_kql_parser() {
    test_update_attr_to_sha512_hash_function_call_result::<KqlParser>().await
}

/// Verifies that `xxh3()` rewrites a string attribute to its XXH3 64-bit
/// hash, surfaced directly as an Int64 attribute value.
async fn test_update_attr_to_xxh3_hash_function_call_result<P: Parser>() {
    // One log record with a single string attribute to hash.
    let record = LogRecord::build()
        .attributes(vec![KeyValue::new(
            "str_attr",
            AnyValue::new_string("hello"),
        )])
        .finish();
    let logs_data = to_logs_data(vec![record]);

    // xxh3 yields an Int64 directly, so no encode(..., "hex") wrapper is used.
    let query = r#"logs | extend attributes["str_attr"] = xxh3(attributes["str_attr"])"#;
    let parsed = P::parse_with_options(query, default_parser_options()).unwrap();
    let mut pipeline = Pipeline::new(parsed.pipeline);

    let output = pipeline
        .execute(otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)))
        .await
        .unwrap();
    let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&output) else {
        panic!("invalid signal type");
    };

    let first_record = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0];
    // XXH3 64-bit of "hello" interpreted as i64 — value taken from the
    // original test; assumed to match xxhash-rust's xxh3_64 output.
    assert_eq!(
        first_record.attributes,
        vec![KeyValue::new(
            "str_attr",
            AnyValue::new_int(-7685981735718036227_i64)
        )]
    );
}

// Runs the shared xxh3() hash test under the OPL parser dialect.
#[tokio::test]
async fn test_update_attr_to_xxh3_hash_function_call_result_opl_parser() {
    test_update_attr_to_xxh3_hash_function_call_result::<OplParser>().await
}

// Runs the shared xxh3() hash test under the KQL parser dialect.
#[tokio::test]
async fn test_update_attr_to_xxh3_hash_function_call_result_kql_parser() {
    test_update_attr_to_xxh3_hash_function_call_result::<KqlParser>().await
}

/// Verifies that `xxh128()` produces the XXH128 digest of a string attribute;
/// the binary digest is hex-encoded via encode(..., "hex") for comparison.
async fn test_update_attr_to_xxh128_hash_function_call_result<P: Parser>() {
    // One log record with a single string attribute to hash.
    let record = LogRecord::build()
        .attributes(vec![KeyValue::new(
            "str_attr",
            AnyValue::new_string("hello"),
        )])
        .finish();
    let logs_data = to_logs_data(vec![record]);

    let query = r#"logs | extend attributes["str_attr"] = encode(xxh128(attributes["str_attr"]), "hex")"#;
    let parsed = P::parse_with_options(query, default_parser_options()).unwrap();
    let mut pipeline = Pipeline::new(parsed.pipeline);

    let output = pipeline
        .execute(otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)))
        .await
        .unwrap();
    let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&output) else {
        panic!("invalid signal type");
    };

    let first_record = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0];
    // XXH128("hello") as hex — value taken from the original test; assumed to
    // match xxhash-rust's xxh3_128 output.
    assert_eq!(
        first_record.attributes,
        vec![KeyValue::new(
            "str_attr",
            AnyValue::new_string("b5e9c1ad071b3e7fc779cfaa5e523818")
        )]
    );
}

// Runs the shared xxh128() hash test under the OPL parser dialect.
#[tokio::test]
async fn test_update_attr_to_xxh128_hash_function_call_result_opl_parser() {
    test_update_attr_to_xxh128_hash_function_call_result::<OplParser>().await
}

// Runs the shared xxh128() hash test under the KQL parser dialect.
#[tokio::test]
async fn test_update_attr_to_xxh128_hash_function_call_result_kql_parser() {
    test_update_attr_to_xxh128_hash_function_call_result::<KqlParser>().await
}

async fn test_update_attr_to_substring_function_call_result<P: Parser>() {
let logs_data = to_logs_data(vec![
LogRecord::build()
Expand Down
Loading
Loading