Skip to content

Commit 79c5d8e

Browse files
committed
Use DataFusion CoalesceFunc::simplify instead of duplicating CASE rewrite.
Add assign.rs integration tests for coalesce via extend (OPL + KQL). Adjust fixtures so MultiJoin row alignment matches explicit-null attrs. Signed-off-by: Parameshwaran Krishnasamy <Parameshwaran.K@ibm.com>
1 parent 6c8f8d3 commit 79c5d8e

2 files changed

Lines changed: 98 additions & 36 deletions

File tree

rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5979,6 +5979,86 @@ mod test {
59795979
test_update_attr_to_upper_case_function_call::<KqlParser>().await
59805980
}
59815981

5982+
/// `extend` with `coalesce` across two attribute keys (see #2905).
5983+
///
5984+
/// The second log uses explicit null `attr1` so the `attr1` attribute batch still has a row
5985+
/// for that log (MultiJoin follows the first attribute argument's row set).
5986+
///
5987+
/// The third log uses explicit null `attr1` and `attr2` so the join still has rows before
5988+
/// the literal default `"foo"`.
5989+
async fn test_update_attr_coalesce_function_call<P: Parser>() {
5990+
let logs_data = to_logs_data(vec![
5991+
LogRecord::build()
5992+
.attributes(vec![
5993+
KeyValue::new("attr1", AnyValue::new_string("X")),
5994+
KeyValue::new("attr2", AnyValue::new_string("Y")),
5995+
])
5996+
.finish(),
5997+
LogRecord::build()
5998+
.attributes(vec![
5999+
KeyValue::new("attr1", AnyValue { value: None }),
6000+
KeyValue::new("attr2", AnyValue::new_string("Z")),
6001+
])
6002+
.finish(),
6003+
LogRecord::build()
6004+
.attributes(vec![
6005+
KeyValue::new("attr1", AnyValue { value: None }),
6006+
KeyValue::new("attr2", AnyValue { value: None }),
6007+
])
6008+
.finish(),
6009+
]);
6010+
6011+
let query = r#"logs | extend attributes["attr3"] = coalesce(attributes["attr1"], attributes["attr2"], "foo")"#;
6012+
let pipeline_expr = P::parse_with_options(query, default_parser_options())
6013+
.unwrap()
6014+
.pipeline;
6015+
let mut pipeline = Pipeline::new(pipeline_expr);
6016+
6017+
let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data));
6018+
let result = pipeline.execute(input).await.unwrap();
6019+
let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else {
6020+
panic!("invalid signal type");
6021+
};
6022+
let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0];
6023+
assert_eq!(
6024+
log_0.attributes,
6025+
vec![
6026+
KeyValue::new("attr1", AnyValue::new_string("X")),
6027+
KeyValue::new("attr2", AnyValue::new_string("Y")),
6028+
KeyValue::new("attr3", AnyValue::new_string("X")),
6029+
]
6030+
);
6031+
let log_1 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[1];
6032+
assert_eq!(
6033+
log_1.attributes,
6034+
vec![
6035+
KeyValue::new("attr1", AnyValue { value: None }),
6036+
KeyValue::new("attr2", AnyValue::new_string("Z")),
6037+
KeyValue::new("attr3", AnyValue::new_string("Z")),
6038+
]
6039+
);
6040+
6041+
let log_2 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[2];
6042+
assert_eq!(
6043+
log_2.attributes,
6044+
vec![
6045+
KeyValue::new("attr1", AnyValue { value: None }),
6046+
KeyValue::new("attr2", AnyValue { value: None }),
6047+
KeyValue::new("attr3", AnyValue::new_string("foo")),
6048+
]
6049+
);
6050+
}
6051+
6052+
#[tokio::test]
6053+
async fn test_update_attr_coalesce_function_call_opl_parser() {
6054+
test_update_attr_coalesce_function_call::<OplParser>().await
6055+
}
6056+
6057+
#[tokio::test]
6058+
async fn test_update_attr_coalesce_function_call_kql_parser() {
6059+
test_update_attr_coalesce_function_call::<KqlParser>().await
6060+
}
6061+
59826062
async fn test_update_attr_to_lower_case_function_call<P: Parser>() {
59836063
let logs_data = to_logs_data(vec![
59846064
LogRecord::build()

rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs

Lines changed: 18 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ use data_engine_expressions::{
5858
StaticScalarExpression, StringScalarExpression, StringValue, TextScalarExpression,
5959
};
6060
use datafusion::common::DFSchema;
61+
use datafusion::functions::core::coalesce::CoalesceFunc;
6162
use datafusion::functions::core::expr_ext::FieldAccessor;
6263
use datafusion::functions::crypto::{md5, sha256, sha512};
6364
use datafusion::functions::datetime::to_char;
@@ -66,8 +67,9 @@ use datafusion::functions::encoding::encode;
6667
use datafusion::functions::math::log10;
6768
use datafusion::functions::string::{concat, concat_ws, lower, ltrim, replace, rtrim, upper, uuid};
6869
use datafusion::logical_expr::expr::ScalarFunction;
70+
use datafusion::logical_expr::simplify::{ExprSimplifyResult, SimplifyContext};
6971
use datafusion::logical_expr::{
70-
BinaryExpr, ColumnarValue, Expr, Operator, ScalarUDF, cast, col, lit, when,
72+
BinaryExpr, ColumnarValue, Expr, Operator, ScalarUDF, ScalarUDFImpl, cast, col, lit,
7173
};
7274
use datafusion::physical_expr::{PhysicalExprRef, create_physical_expr};
7375
use datafusion::prelude::SessionContext;
@@ -426,10 +428,21 @@ impl ExprLogicalPlanner {
426428
let (df_args, source_scope, _requires_dict_downcast) =
427429
self.plan_function_args(coalesce_expr.get_expressions().iter(), functions)?;
428430

429-
// DataFusion's `coalesce` UDF is simplified to CASE during logical optimization and does not
430-
// support direct physical evaluation. Mirror that rewrite here (same shape as
431-
// `CoalesceFunc::simplify` in Apache DataFusion).
432-
let case_expr = Self::coalesce_args_to_case_expr(df_args)?;
431+
// DataFusion's `coalesce` UDF does not support direct physical evaluation; the optimizer
432+
// rewrites it via `CoalesceFunc::simplify`. Reuse that implementation here.
433+
let coalesce_func = CoalesceFunc::new();
434+
let simplify_result = coalesce_func
435+
.simplify(df_args, &SimplifyContext::default())
436+
.map_err(Error::from)?;
437+
let case_expr = match simplify_result {
438+
ExprSimplifyResult::Simplified(expr) => expr,
439+
ExprSimplifyResult::Original(_) => {
440+
return Err(Error::InvalidPipelineError {
441+
cause: "expected coalesce simplify to produce a single expression".into(),
442+
query_location: None,
443+
});
444+
}
445+
};
433446

434447
Ok(ScopedLogicalExpr {
435448
logical_expr: case_expr,
@@ -441,37 +454,6 @@ impl ExprLogicalPlanner {
441454
})
442455
}
443456

444-
/// Rewrites `coalesce(e1, …, eN)` to chained `WHEN eK IS NOT NULL THEN eK … ELSE eN`,
445-
/// matching DataFusion's `coalesce` simplification.
446-
fn coalesce_args_to_case_expr(mut args: Vec<Expr>) -> Result<Expr> {
447-
match args.len() {
448-
0 => Err(Error::InvalidPipelineError {
449-
cause: "coalesce requires at least one argument".into(),
450-
query_location: None,
451-
}),
452-
1 => Ok(args.pop().ok_or_else(|| Error::InvalidPipelineError {
453-
cause: "coalesce internal error: empty args after len check".into(),
454-
query_location: None,
455-
})?),
456-
_ => {
457-
let last = args.pop().ok_or_else(|| Error::InvalidPipelineError {
458-
cause: "coalesce internal error: missing last argument".into(),
459-
query_location: None,
460-
})?;
461-
let first = args.first().ok_or_else(|| Error::InvalidPipelineError {
462-
cause: "coalesce internal error: missing first argument".into(),
463-
query_location: None,
464-
})?;
465-
let first = first.clone();
466-
let mut builder = when(first.clone().is_not_null(), first);
467-
for a in args.into_iter().skip(1) {
468-
builder = builder.when(a.clone().is_not_null(), a);
469-
}
470-
builder.otherwise(last).map_err(Error::from)
471-
}
472-
}
473-
}
474-
475457
fn plan_binary_math_expr(
476458
&self,
477459
binary_math_expr: &BinaryMathematicalScalarExpression,

0 commit comments

Comments
 (0)