Skip to content

Add coalesce support in OPL parser and OTAP query-engine planner#2905

Open
Paramesh324 wants to merge 2 commits into
open-telemetry:mainfrom
Paramesh324:issue_2823
Open

Add coalesce support in OPL parser and OTAP query-engine planner#2905
Paramesh324 wants to merge 2 commits into
open-telemetry:mainfrom
Paramesh324:issue_2823

Conversation

@Paramesh324
Copy link
Copy Markdown

Change Summary

Add support for coalesce(...) in OPL / OTAP query-engine by:

  • Parsing coalesce calls in the OPL parser into ScalarExpression::Coalesce(CoalesceScalarExpression).
  • Planning ScalarExpression::Coalesce in the columnar query-engine expression planner.
  • Lowering coalesce arguments to a CASE/WHEN logical expression (DataFusion-compatible behavior) to ensure correct execution in the current planning/execution path.
  • Adding parser and planner/execution tests for valid usage and arity validation.

What issue does this PR close?

How are these changes tested?

  • Added OPL parser tests for:
    • valid coalesce(...) parsing
    • invalid arity (fewer than 2 args)
  • Added query-engine expr planner/execution test for coalesce fallback behavior.
  • Ran formatting and checks:
    • cargo fmt --all
    • cargo test -p otap-df-opl --lib
    • cargo test -p otap-df-query-engine --lib pipeline::expr::test
    • cargo clippy -p otap-df-opl -p otap-df-query-engine --all-targets -- -D warnings

Are there any user-facing changes?

Yes.
Users can now use coalesce(...) in OPL expressions for OTAP query-engine pipelines, e.g.:
coalesce(attributes["x"], attributes["y"], "hello")

@Paramesh324 Paramesh324 requested a review from a team as a code owner May 8, 2026 06:51
@github-actions github-actions Bot added rust Pull requests that update Rust code query-engine Query Engine / Transform related tasks query-engine-columnar Columnar query engine which uses DataFusion to process OTAP Batches opl-parser Work items related to OPL Parser labels May 8, 2026
@Paramesh324
Copy link
Copy Markdown
Author

@cijothomas @albertlockett Requesting to review this. Thanks!

@codecov
Copy link
Copy Markdown

codecov Bot commented May 8, 2026

Codecov Report

❌ Patch coverage is 97.07602% with 5 lines in your changes missing coverage. Please review.
✅ Project coverage is 86.03%. Comparing base (b645a26) to head (79c5d8e).

Additional details and impacted files
@@           Coverage Diff            @@
##             main    #2905    +/-   ##
========================================
  Coverage   86.03%   86.03%            
========================================
  Files         720      720            
  Lines      273264   273435   +171     
========================================
+ Hits       235095   235259   +164     
- Misses      37645    37652     +7     
  Partials      524      524            
Components Coverage Δ
otap-dataflow 87.20% <97.07%> (+<0.01%) ⬆️
query_abstraction 80.61% <ø> (ø)
query_engine 89.57% <ø> (ø)
otel-arrow-go 52.45% <ø> (ø)
quiver 92.25% <ø> (ø)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Copy Markdown
Member

@albertlockett albertlockett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a good start @Paramesh324

Would you mind also adding a test in assign.rs, similar to what we've done for other function calls? For example:

async fn test_update_attr_to_upper_case_function_call<P: Parser>() {
let logs_data = to_logs_data(vec![
LogRecord::build()
.attributes(vec![KeyValue::new(
"attr",
AnyValue::new_string("hello world"),
)])
.finish(),
]);
let query = r#"logs | extend attributes["attr"] = upper_case(attributes["attr"])"#;
let pipeline_expr = P::parse_with_options(query, default_parser_options())
.unwrap()
.pipeline;
let mut pipeline = Pipeline::new(pipeline_expr);
let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data));
let result = pipeline.execute(input).await.unwrap();
let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else {
panic!("invalid signal type");
};
let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0];
assert_eq!(
log_0.attributes,
vec![KeyValue::new("attr", AnyValue::new_string("HELLO WORLD"))]
);
}
#[tokio::test]
async fn test_update_attr_to_upper_case_function_call_opl_parser() {
test_update_attr_to_upper_case_function_call::<OplParser>().await
}
#[tokio::test]
async fn test_update_attr_to_upper_case_function_call_kql_parser() {
test_update_attr_to_upper_case_function_call::<KqlParser>().await
}

Maybe something like:

  async fn test_update_attr_coalesce_function_call<P: Parser>() {
        let logs_data = to_logs_data(vec![
            LogRecord::build()
                .attributes(vec![
                    KeyValue::new("attr1", AnyValue::new_string("X")),
                    KeyValue::new("attr2", AnyValue::new_string("Y")),
                ])
                .finish(),
            LogRecord::build()
                .attributes(vec![KeyValue::new("attr2", AnyValue::new_string("Z"))])
                .finish(),
            LogRecord::build().finish(),
        ]);

        let query = r#"logs | extend attributes["attr3"] = coalesce(attributes["attr1"], attributes["attr2"], "foo")"#;
        let pipeline_expr = P::parse_with_options(query, default_parser_options())
            .unwrap()
            .pipeline;
        let mut pipeline = Pipeline::new(pipeline_expr);

        let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data));
        let result = pipeline.execute(input).await.unwrap();
        let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else {
            panic!("invalid signal type");
        };
        let log_0 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[0];
        assert_eq!(
            log_0.attributes,
            vec![
                KeyValue::new("attr1", AnyValue::new_string("X")),
                KeyValue::new("attr2", AnyValue::new_string("Y")),
                KeyValue::new("attr3", AnyValue::new_string("X")),
            ]
        );
        let log_1 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[1];
        assert_eq!(
            log_1.attributes,
            vec![
                KeyValue::new("attr2", AnyValue::new_string("Z")),
                KeyValue::new("attr3", AnyValue::new_string("Z")),
            ]
        );

        let log_2 = &result_logs_data.resource_logs[0].scope_logs[0].log_records[2];
        assert_eq!(
            log_2.attributes,
            vec![KeyValue::new("attr3", AnyValue::new_string("foo")),]
        );
    }

    #[tokio::test]
    async fn test_update_attr_coalesce_function_call_opl_parser() {
        test_update_attr_coalesce_function_call::<OplParser>().await
    }
    
    #[tokio::test]
    async fn test_update_attr_coalesce_function_call_kql_parser() {
        test_update_attr_coalesce_function_call::<KqlParser>().await
    }

Comment thread rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs Outdated
…≥2 args).

Plan Coalesce in ExprLogicalPlanner by lowering to the same CASE/WHEN shape
DataFusion uses for coalesce, avoiding the coalesce UDF’s physical-path error.
Set requires_dict_downcast for attribute + literal mixes.

Add parser and expr execution tests.

Signed-off-by: Parameshwaran Krishnasamy <Parameshwaran.K@ibm.com>
…rite.

Add assign.rs integration tests for coalesce via extend (OPL + KQL).
Adjust fixtures so MultiJoin row alignment matches explicit-null attrs.

Signed-off-by: Parameshwaran Krishnasamy <Parameshwaran.K@ibm.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

opl-parser Work items related to OPL Parser query-engine Query Engine / Transform related tasks query-engine-columnar Columnar query engine which uses DataFusion to process OTAP Batches rust Pull requests that update Rust code

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

[OPL/OTAP query-engine functions] coalesce

2 participants