Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 80 additions & 12 deletions rust/otap-dataflow/crates/opl/src/parser/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ use std::sync::LazyLock;

use data_engine_expressions::{
AndLogicalExpression, BinaryMathematicalScalarExpression, BooleanScalarExpression,
CaptureTextScalarExpression, CollectionScalarExpression, CombineScalarExpression,
ContainsLogicalExpression, DateTimeScalarExpression, DoubleScalarExpression, DoubleValue,
EqualToLogicalExpression, Expression, GetRecordTypeScalarExpression, GetTypeScalarExpression,
GreaterThanLogicalExpression, GreaterThanOrEqualToLogicalExpression, IntegerScalarExpression,
IntegerValue, InvokeFunctionArgument, InvokeFunctionScalarExpression, JoinTextScalarExpression,
CaptureTextScalarExpression, CoalesceScalarExpression, CollectionScalarExpression,
CombineScalarExpression, ContainsLogicalExpression, DateTimeScalarExpression,
DoubleScalarExpression, DoubleValue, EqualToLogicalExpression, Expression,
GetRecordTypeScalarExpression, GetTypeScalarExpression, GreaterThanLogicalExpression,
GreaterThanOrEqualToLogicalExpression, IntegerScalarExpression, IntegerValue,
InvokeFunctionArgument, InvokeFunctionScalarExpression, JoinTextScalarExpression,
ListScalarExpression, LogicalExpression, MatchesLogicalExpression, MathScalarExpression,
NotLogicalExpression, NullScalarExpression, OrLogicalExpression, QueryLocation,
RegexScalarExpression, ReplaceTextScalarExpression, ScalarExpression, SliceScalarExpression,
Expand Down Expand Up @@ -787,10 +788,8 @@ pub(crate) fn parse_tagged_literal(

/// Parses invocation of function
///
/// Currently we only support two known function `matches` and `contains`, both of which
/// take two arguments and return a boolean value. In the future, our function library
/// may be expanded to include more functions with various signatures and user defined
/// functions, so the implementation of this function will be updated accordingly.
/// Parses built-in scalar and logical functions (`contains`, `matches`, `concat`, `coalesce`,
/// `substring`, and others) as well as user-defined pipeline functions.
fn parse_function_call(
rule: Pair<'_, Rule>,
pipeline_builder: &dyn PipelineBuilder,
Expand Down Expand Up @@ -882,6 +881,21 @@ fn parse_function_call(
),
))
.into()),
"coalesce" => {
if args.len() < 2 {
return Err(ParserError::SyntaxError(
query_location,
format!(
"Function '{fn_name}' expects at least 2 arguments, got {}",
args.len()
),
));
}
Ok(
ScalarExpression::Coalesce(CoalesceScalarExpression::new(query_location, args))
.into(),
)
}
"join" | "concat_ws" => {
if args.is_empty() {
return Err(ParserError::SyntaxError(
Expand Down Expand Up @@ -1035,9 +1049,9 @@ mod test {

use data_engine_expressions::{
AndLogicalExpression, BinaryMathematicalScalarExpression, BooleanScalarExpression,
CaptureTextScalarExpression, CollectionScalarExpression, CombineScalarExpression,
ContainsLogicalExpression, DateTimeScalarExpression, DoubleScalarExpression,
EqualToLogicalExpression, GreaterThanLogicalExpression,
CaptureTextScalarExpression, CoalesceScalarExpression, CollectionScalarExpression,
CombineScalarExpression, ContainsLogicalExpression, DateTimeScalarExpression,
DoubleScalarExpression, EqualToLogicalExpression, GreaterThanLogicalExpression,
GreaterThanOrEqualToLogicalExpression, IntegerScalarExpression, JoinTextScalarExpression,
ListScalarExpression, LogicalExpression, MatchesLogicalExpression, MathScalarExpression,
NotLogicalExpression, NullScalarExpression, OrLogicalExpression, PipelineFunction,
Expand Down Expand Up @@ -2153,6 +2167,51 @@ mod test {
assert_eq!(result, expected);
}

#[test]
fn test_parse_coalesce_function_call() {
let input = "coalesce(attributes[\"x\"], attributes[\"y\"], \"hello\")";
let mut rules = OplPestParser::parse(Rule::member_expression, input).unwrap();
assert_eq!(rules.len(), 1);

let result: ScalarExpression =
parse_member_expression(rules.next().unwrap(), default_pipeline_builder().as_ref())
.unwrap()
.into();

let expected = ScalarExpression::Coalesce(CoalesceScalarExpression::new(
QueryLocation::new_fake(),
vec![
ScalarExpression::Source(SourceScalarExpression::new(
QueryLocation::new_fake(),
ValueAccessor::new_with_selectors(vec![
ScalarExpression::Static(StaticScalarExpression::String(
StringScalarExpression::new(QueryLocation::new_fake(), "attributes"),
)),
ScalarExpression::Static(StaticScalarExpression::String(
StringScalarExpression::new(QueryLocation::new_fake(), "x"),
)),
]),
)),
ScalarExpression::Source(SourceScalarExpression::new(
QueryLocation::new_fake(),
ValueAccessor::new_with_selectors(vec![
ScalarExpression::Static(StaticScalarExpression::String(
StringScalarExpression::new(QueryLocation::new_fake(), "attributes"),
)),
ScalarExpression::Static(StaticScalarExpression::String(
StringScalarExpression::new(QueryLocation::new_fake(), "y"),
)),
]),
)),
ScalarExpression::Static(StaticScalarExpression::String(
StringScalarExpression::new(QueryLocation::new_fake(), "hello"),
)),
],
));

assert_eq!(result, expected);
}

#[test]
fn test_parse_concat_with_delimiter_function_call() {
// "join" is alias for "concat_ws"
Expand Down Expand Up @@ -2339,6 +2398,15 @@ mod test {
);
}

#[test]
fn test_parse_coalesce_function_call_wrong_arity() {
let err = parse_known_func_with_args("coalesce", &["\"only\""]).unwrap_err();
assert_eq!(
err.to_string(),
"Function 'coalesce' expects at least 2 arguments, got 1".to_string(),
);
}

#[test]
fn parse_catches_invalid_tag() {
let input = "test\"hello\"";
Expand Down
111 changes: 105 additions & 6 deletions rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ use arrow::compute::kernels::cmp::eq;
use arrow::datatypes::{DataType, Field, Schema};
use data_engine_expressions::{
BinaryMathematicalScalarExpression, BooleanValue, CaptureTextScalarExpression,
CollectionScalarExpression, CombineScalarExpression, DoubleValue, Expression, IntegerValue,
InvokeFunctionArgument, InvokeFunctionScalarExpression, JoinTextScalarExpression,
MathScalarExpression, PipelineFunction, PipelineFunctionImplementation,
ReplaceTextScalarExpression, ScalarExpression, StaticScalarExpression, StringScalarExpression,
StringValue, TextScalarExpression,
CoalesceScalarExpression, CollectionScalarExpression, CombineScalarExpression, DoubleValue,
Expression, IntegerValue, InvokeFunctionArgument, InvokeFunctionScalarExpression,
JoinTextScalarExpression, MathScalarExpression, PipelineFunction,
PipelineFunctionImplementation, ReplaceTextScalarExpression, ScalarExpression,
StaticScalarExpression, StringScalarExpression, StringValue, TextScalarExpression,
};
use datafusion::common::DFSchema;
use datafusion::functions::core::expr_ext::FieldAccessor;
Expand All @@ -67,7 +67,7 @@ use datafusion::functions::math::log10;
use datafusion::functions::string::{concat, concat_ws, lower, ltrim, replace, rtrim, upper, uuid};
use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::{
BinaryExpr, ColumnarValue, Expr, Operator, ScalarUDF, cast, col, lit,
BinaryExpr, ColumnarValue, Expr, Operator, ScalarUDF, cast, col, lit, when,
};
use datafusion::physical_expr::{PhysicalExprRef, create_physical_expr};
use datafusion::prelude::SessionContext;
Expand Down Expand Up @@ -401,12 +401,69 @@ impl ExprLogicalPlanner {
})
}
ScalarExpression::Text(text) => self.plan_text_expr(text, functions),
ScalarExpression::Coalesce(coalesce_expr) => {
self.plan_coalesce_expr(coalesce_expr, functions)
}
other_expr => Err(Error::NotYetSupportedError {
message: format!("expression not yet supported {other_expr:?}"),
}),
}
}

fn plan_coalesce_expr(
&self,
coalesce_expr: &CoalesceScalarExpression,
functions: &[PipelineFunction],
) -> Result<ScopedLogicalExpr> {
let (df_args, source_scope, _requires_dict_downcast) =
self.plan_function_args(coalesce_expr.get_expressions().iter(), functions)?;

// DataFusion's `coalesce` UDF is simplified to CASE during logical optimization and does not
// support direct physical evaluation. Mirror that rewrite here (same shape as
// `CoalesceFunc::simplify` in Apache DataFusion).
let case_expr = Self::coalesce_args_to_case_expr(df_args)?;

Ok(ScopedLogicalExpr {
logical_expr: case_expr,
expr_type: ExprLogicalType::AnyValue,
source: source_scope,
// Like `concat`, mixed attribute columns (often dictionary-encoded) and literals need
// dictionary downcasting before CASE can build a single array.
requires_dict_downcast: true,
})
}

/// Rewrites `coalesce(e1, …, eN)` to chained `WHEN eK IS NOT NULL THEN eK … ELSE eN`,
/// matching DataFusion's `coalesce` simplification.
fn coalesce_args_to_case_expr(mut args: Vec<Expr>) -> Result<Expr> {
match args.len() {
0 => Err(Error::InvalidPipelineError {
cause: "coalesce requires at least one argument".into(),
query_location: None,
}),
1 => Ok(args.pop().ok_or_else(|| Error::InvalidPipelineError {
cause: "coalesce internal error: empty args after len check".into(),
query_location: None,
})?),
_ => {
let last = args.pop().ok_or_else(|| Error::InvalidPipelineError {
cause: "coalesce internal error: missing last argument".into(),
query_location: None,
})?;
let first = args.first().ok_or_else(|| Error::InvalidPipelineError {
cause: "coalesce internal error: missing first argument".into(),
query_location: None,
})?;
let first = first.clone();
let mut builder = when(first.clone().is_not_null(), first);
for a in args.into_iter().skip(1) {
builder = builder.when(a.clone().is_not_null(), a);
}
builder.otherwise(last).map_err(Error::from)
}
}
}

fn plan_binary_math_expr(
&self,
binary_math_expr: &BinaryMathematicalScalarExpression,
Expand Down Expand Up @@ -3557,6 +3614,48 @@ mod test {
run_scalar_expr_success_test(input_expr, &otap_batch, expected_col);
}

/// `coalesce(attributes["k1"], "hello")` uses the literal when the attribute is absent.
#[test]
fn test_coalesce_attribute_with_string_fallback() {
use data_engine_expressions::CoalesceScalarExpression;

let attr_arg = ScalarExpression::Source(SourceScalarExpression::new(
QueryLocation::new_fake(),
ValueAccessor::new_with_selectors(vec![
ScalarExpression::Static(StaticScalarExpression::String(
StringScalarExpression::new(QueryLocation::new_fake(), ATTRIBUTES_FIELD_NAME),
)),
ScalarExpression::Static(StaticScalarExpression::String(
StringScalarExpression::new(QueryLocation::new_fake(), "k1"),
)),
]),
));

let fallback = ScalarExpression::Static(StaticScalarExpression::String(
StringScalarExpression::new(QueryLocation::new_fake(), "hello"),
));

let input_expr = ScalarExpression::Coalesce(CoalesceScalarExpression::new(
QueryLocation::new_fake(),
vec![attr_arg, fallback],
));

let logs = to_logs_data(vec![
LogRecord::build()
.severity_text("ERROR")
.attributes(vec![KeyValue::new("k1", AnyValue::new_string("from_attr"))])
.finish(),
LogRecord::build()
.severity_text("INFO")
.attributes(vec![KeyValue::new("k1", AnyValue { value: None })])
.finish(),
]);

let otap_batch = otlp_to_otap(&OtlpProtoMessage::Logs(logs));
let expected_col = Arc::new(StringArray::from(vec!["from_attr", "hello"]));
run_scalar_expr_success_test(input_expr, &otap_batch, expected_col);
}

/// Tests concat(attributes["k1"], attributes["k2"]) where both args come from different
/// attribute scopes (different keys from the same payload type). This triggers a multi-join
/// because the data scopes differ (different filtered rows).
Expand Down