diff --git a/rust/otap-dataflow/crates/query-engine/src/consts.rs b/rust/otap-dataflow/crates/query-engine/src/consts.rs index 1fe1f6aa0d..5939a6700e 100644 --- a/rust/otap-dataflow/crates/query-engine/src/consts.rs +++ b/rust/otap-dataflow/crates/query-engine/src/consts.rs @@ -8,5 +8,7 @@ pub(crate) const SCOPE_FIELD_NAME: &str = "instrumentation_scope"; pub(crate) const VALUE_FIELD_NAME: &str = "value"; pub(crate) const ENCODE_FUNC_NAME: &str = "encode"; +pub(crate) const ENDS_WITH_FUNC_NAME: &str = "ends_with"; pub(crate) const REGEXP_SUBSTR_FUNC_NAME: &str = "regexp_substr"; pub(crate) const SHA256_FUNC_NAME: &str = "sha256"; +pub(crate) const STARTS_WITH_FUNC_NAME: &str = "starts_with"; diff --git a/rust/otap-dataflow/crates/query-engine/src/parser.rs b/rust/otap-dataflow/crates/query-engine/src/parser.rs index 5a6de7d0e9..51d53559d6 100644 --- a/rust/otap-dataflow/crates/query-engine/src/parser.rs +++ b/rust/otap-dataflow/crates/query-engine/src/parser.rs @@ -10,7 +10,10 @@ use data_engine_expressions::{ }; use data_engine_parser_abstractions::ParserOptions; -use crate::consts::{ENCODE_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME}; +use crate::consts::{ + ENCODE_FUNC_NAME, ENDS_WITH_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME, + STARTS_WITH_FUNC_NAME, +}; /// Create parser options that can be used when parsing an expression that will be executed with /// this query engine @@ -32,6 +35,8 @@ pub fn default_parser_options() -> ParserOptions { // .with_external_function(SHA256_FUNC_NAME, param_placeholders(1), None) .with_external_function(ENCODE_FUNC_NAME, param_placeholders(2), None) + .with_external_function(STARTS_WITH_FUNC_NAME, param_placeholders(2), None) + .with_external_function(ENDS_WITH_FUNC_NAME, param_placeholders(2), None) .with_external_function( REGEXP_SUBSTR_FUNC_NAME, vec![ diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs index 3943f95414..51b4ec7561 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs @@ -61,7 +61,7 @@ use datafusion::common::DFSchema; use datafusion::functions::core::expr_ext::FieldAccessor; use datafusion::functions::crypto::sha256; use datafusion::functions::encoding::encode; -use datafusion::functions::string::{concat, concat_ws, replace}; +use datafusion::functions::string::{concat, concat_ws, ends_with, replace, starts_with}; use datafusion::logical_expr::expr::ScalarFunction; use datafusion::logical_expr::{ BinaryExpr, ColumnarValue, Expr, Operator, ScalarUDF, cast, col, lit, @@ -76,7 +76,10 @@ use otap_df_pdata::arrays::{ use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType; use otap_df_pdata::schema::consts; -use crate::consts::{ENCODE_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME}; +use crate::consts::{ + ENCODE_FUNC_NAME, ENDS_WITH_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME, + STARTS_WITH_FUNC_NAME, +}; use crate::error::{Error, Result}; use crate::pipeline::expr::join::{join, multi_join}; use crate::pipeline::expr::types::{ @@ -823,10 +826,12 @@ impl DataFusionFunctionDef { // upstream in datafusion_functions) Some(match func_name { ENCODE_FUNC_NAME => Self::new(encode(), ExprLogicalType::String, false, None), + ENDS_WITH_FUNC_NAME => Self::new(ends_with(), ExprLogicalType::Boolean, true, None), REGEXP_SUBSTR_FUNC_NAME => { Self::new(regexp_substr(), ExprLogicalType::String, false, None) } SHA256_FUNC_NAME => Self::new(sha256(), ExprLogicalType::Binary, true, None), + STARTS_WITH_FUNC_NAME => Self::new(starts_with(), ExprLogicalType::Boolean, true, None), _ => return None, }) } @@ -1290,7 +1295,8 @@ impl PhysicalExprEvalResult { mod test { use super::*; use arrow::array::{ - BinaryArray, DictionaryArray, Float64Array, Int32Array, Int64Array, StructArray, UInt8Array, + BinaryArray, BooleanArray, DictionaryArray, Float64Array, Int32Array, Int64Array, + StructArray, UInt8Array, }; use data_engine_expressions::{ BinaryMathematicalScalarExpression, IntegerScalarExpression, @@ -3385,6 +3391,116 @@ mod test { assert_eq!(result_arr.as_ref(), &expected); } + #[test] + fn test_function_invocation_starts_with() { + let input_expr = ScalarExpression::InvokeFunction(InvokeFunctionScalarExpression::new( + QueryLocation::new_fake(), + None, + 0, + vec![ + InvokeFunctionArgument::Scalar(ScalarExpression::Source( + SourceScalarExpression::new( + QueryLocation::new_fake(), + ValueAccessor::new_with_selectors(vec![ScalarExpression::Static( + StaticScalarExpression::String(StringScalarExpression::new( + QueryLocation::new_fake(), + "event_name", + )), + )]), + ), + )), + InvokeFunctionArgument::Scalar(ScalarExpression::Static( + StaticScalarExpression::String(StringScalarExpression::new( + QueryLocation::new_fake(), + "ev", + )), + )), + ], + )); + + let functions = [PipelineFunction::new_external("starts_with", vec![], None)]; + + let logs = to_logs_data(vec![ + LogRecord::build().finish(), + LogRecord::build().event_name("event1").finish(), + LogRecord::build().event_name("other").finish(), + ]); + + let otap_batch = otlp_to_otap(&OtlpProtoMessage::Logs(logs)); + + let planner = ExprLogicalPlanner {}; + let logical_expr = planner.plan_scalar_expr(&input_expr, &functions).unwrap(); + let mut physical_expr = logical_expr.into_physical().unwrap(); + let session_ctx = Pipeline::create_session_context(); + let result = physical_expr.execute(&otap_batch, &session_ctx).unwrap(); + let result_vals = result.map(|result| result.values); + let result_arr = match &result_vals { + Some(ColumnarValue::Array(arr)) => arr, + otherwise => { + panic!("expected arr, got scalar {otherwise:?}") + } + }; + + let expected = BooleanArray::from_iter([None, Some(true), Some(false)]); + + assert_eq!(result_arr.as_ref(), &expected); + } + + #[test] + fn test_function_invocation_ends_with() { + let input_expr = ScalarExpression::InvokeFunction(InvokeFunctionScalarExpression::new( + QueryLocation::new_fake(), + None, + 0, + vec![ + InvokeFunctionArgument::Scalar(ScalarExpression::Source( + SourceScalarExpression::new( + QueryLocation::new_fake(), + ValueAccessor::new_with_selectors(vec![ScalarExpression::Static( + StaticScalarExpression::String(StringScalarExpression::new( + QueryLocation::new_fake(), + "event_name", + )), + )]), + ), + )), + InvokeFunctionArgument::Scalar(ScalarExpression::Static( + StaticScalarExpression::String(StringScalarExpression::new( + QueryLocation::new_fake(), + "1", + )), + )), + ], + )); + + let functions = [PipelineFunction::new_external("ends_with", vec![], None)]; + + let logs = to_logs_data(vec![ + LogRecord::build().finish(), + LogRecord::build().event_name("event1").finish(), + LogRecord::build().event_name("event2").finish(), + ]); + + let otap_batch = otlp_to_otap(&OtlpProtoMessage::Logs(logs)); + + let planner = ExprLogicalPlanner {}; + let logical_expr = planner.plan_scalar_expr(&input_expr, &functions).unwrap(); + let mut physical_expr = logical_expr.into_physical().unwrap(); + let session_ctx = Pipeline::create_session_context(); + let result = physical_expr.execute(&otap_batch, &session_ctx).unwrap(); + let result_vals = result.map(|result| result.values); + let result_arr = match &result_vals { + Some(ColumnarValue::Array(arr)) => arr, + otherwise => { + panic!("expected arr, got scalar {otherwise:?}") + } + }; + + let expected = BooleanArray::from_iter([None, Some(true), Some(false)]); + + assert_eq!(result_arr.as_ref(), &expected); + } + // ----- Tests for multi-scope function arguments (MultiJoin) ----- /// Tests concat(severity_text, attributes["k1"]) where args come from Root and Attributes diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs index e9cb541824..ab4c5e5a1f 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs @@ -2656,6 +2656,129 @@ mod test { .await; } + #[tokio::test] + async fn test_filter_event_name_using_starts_with_opl() { + let input = vec![ + LogRecord::build().event_name("hello world").finish(), + LogRecord::build().event_name("hello arrow").finish(), + LogRecord::build().event_name("world hello").finish(), + LogRecord::build().finish(), + LogRecord::build().event_name("hello").finish(), + ]; + + let query = r#"logs | where starts_with(event_name, "hello")"#; + let result = exec_logs_pipeline::(query, to_logs_data(input.clone())).await; + + assert_eq!( + &result.resource_logs[0].scope_logs[0].log_records, + &[input[0].clone(), input[1].clone(), input[4].clone()] + ); + + // column on the right + let query = r#"logs | where starts_with("hello world", event_name)"#; + let result = exec_logs_pipeline::(query, to_logs_data(input.clone())).await; + + assert_eq!( + &result.resource_logs[0].scope_logs[0].log_records, + &[input[0].clone(), input[4].clone()] + ); + } + + #[tokio::test] + async fn test_filter_event_name_using_ends_with_opl() { + let input = vec![ + LogRecord::build().event_name("hello world").finish(), + LogRecord::build().event_name("goodbye world").finish(), + LogRecord::build().event_name("hello arrow").finish(), + LogRecord::build().finish(), + LogRecord::build().event_name("world").finish(), + ]; + + let query = r#"logs | where ends_with(event_name, "world")"#; + let result = exec_logs_pipeline::(query, to_logs_data(input.clone())).await; + + assert_eq!( + &result.resource_logs[0].scope_logs[0].log_records, + &[input[0].clone(), input[1].clone(), input[4].clone()] + ); + + // column on the right + let query = r#"logs | where ends_with("hello world", event_name)"#; + let result = exec_logs_pipeline::(query, to_logs_data(input.clone())).await; + + assert_eq!( + &result.resource_logs[0].scope_logs[0].log_records, + &[input[0].clone(), input[4].clone()] + ); + } + + #[tokio::test] + async fn test_filter_attrs_using_starts_with_opl() { + let log_records = vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "username", + AnyValue::new_string("albert"), + )]) + .event_name("1") + .finish(), + LogRecord::build() + .attributes(vec![KeyValue::new( + "username", + AnyValue::new_string("alice"), + )]) + .event_name("2") + .finish(), + LogRecord::build() + .attributes(vec![KeyValue::new("username", AnyValue::new_string("bob"))]) + .event_name("3") + .finish(), + ]; + + let query = r#"logs | where starts_with(attributes["username"], "al")"#; + let result = + exec_logs_pipeline::(query, to_logs_data(log_records.clone())).await; + assert_eq!( + &result.resource_logs[0].scope_logs[0].log_records, + &[log_records[0].clone(), log_records[1].clone()] + ); + } + + #[tokio::test] + async fn test_filter_attrs_using_ends_with_opl() { + let log_records = vec![ + LogRecord::build() + .attributes(vec![KeyValue::new( + "filename", + AnyValue::new_string("report.pdf"), + )]) + .event_name("1") + .finish(), + LogRecord::build() + .attributes(vec![KeyValue::new( + "filename", + AnyValue::new_string("notes.pdf"), + )]) + .event_name("2") + .finish(), + LogRecord::build() + .attributes(vec![KeyValue::new( + "filename", + AnyValue::new_string("README.md"), + )]) + .event_name("3") + .finish(), + ]; + + let query = r#"logs | where ends_with(attributes["filename"], ".pdf")"#; + let result = + exec_logs_pipeline::(query, to_logs_data(log_records.clone())).await; + assert_eq!( + &result.resource_logs[0].scope_logs[0].log_records, + &[log_records[0].clone(), log_records[1].clone()] + ); + } + async fn test_filter_matches_regex(q1: &str, q2: &str) { let log_records = vec![ LogRecord::build()