Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/otap-dataflow/crates/query-engine/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ pub(crate) const SCOPE_FIELD_NAME: &str = "instrumentation_scope";
pub(crate) const VALUE_FIELD_NAME: &str = "value";

// Names under which external functions are registered with the parser options and later
// matched when resolving an invoked function to its DataFusion implementation.

/// Name of the `encode` function.
pub(crate) const ENCODE_FUNC_NAME: &str = "encode";
/// Name of the `ends_with` function.
pub(crate) const ENDS_WITH_FUNC_NAME: &str = "ends_with";
/// Name of the `regexp_substr` function.
pub(crate) const REGEXP_SUBSTR_FUNC_NAME: &str = "regexp_substr";
/// Name of the `sha256` function.
pub(crate) const SHA256_FUNC_NAME: &str = "sha256";
/// Name of the `starts_with` function.
pub(crate) const STARTS_WITH_FUNC_NAME: &str = "starts_with";
7 changes: 6 additions & 1 deletion rust/otap-dataflow/crates/query-engine/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use data_engine_expressions::{
};
use data_engine_parser_abstractions::ParserOptions;

use crate::consts::{ENCODE_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME};
use crate::consts::{
ENCODE_FUNC_NAME, ENDS_WITH_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME,
STARTS_WITH_FUNC_NAME,
};

/// Create parser options that can be used when parsing an expression that will be executed with
/// this query engine
Expand All @@ -32,6 +35,8 @@ pub fn default_parser_options() -> ParserOptions {
//
.with_external_function(SHA256_FUNC_NAME, param_placeholders(1), None)
.with_external_function(ENCODE_FUNC_NAME, param_placeholders(2), None)
.with_external_function(STARTS_WITH_FUNC_NAME, param_placeholders(2), None)
.with_external_function(ENDS_WITH_FUNC_NAME, param_placeholders(2), None)
.with_external_function(
REGEXP_SUBSTR_FUNC_NAME,
vec![
Expand Down
122 changes: 119 additions & 3 deletions rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use datafusion::common::DFSchema;
use datafusion::functions::core::expr_ext::FieldAccessor;
use datafusion::functions::crypto::sha256;
use datafusion::functions::encoding::encode;
use datafusion::functions::string::{concat, concat_ws, replace};
use datafusion::functions::string::{concat, concat_ws, ends_with, replace, starts_with};
use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::{
BinaryExpr, ColumnarValue, Expr, Operator, ScalarUDF, cast, col, lit,
Expand All @@ -76,7 +76,10 @@ use otap_df_pdata::arrays::{
use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType;
use otap_df_pdata::schema::consts;

use crate::consts::{ENCODE_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME};
use crate::consts::{
ENCODE_FUNC_NAME, ENDS_WITH_FUNC_NAME, REGEXP_SUBSTR_FUNC_NAME, SHA256_FUNC_NAME,
STARTS_WITH_FUNC_NAME,
};
use crate::error::{Error, Result};
use crate::pipeline::expr::join::{join, multi_join};
use crate::pipeline::expr::types::{
Expand Down Expand Up @@ -823,10 +826,12 @@ impl DataFusionFunctionDef {
// upstream in datafusion_functions)
Some(match func_name {
ENCODE_FUNC_NAME => Self::new(encode(), ExprLogicalType::String, false, None),
ENDS_WITH_FUNC_NAME => Self::new(ends_with(), ExprLogicalType::Boolean, true, None),
REGEXP_SUBSTR_FUNC_NAME => {
Self::new(regexp_substr(), ExprLogicalType::String, false, None)
}
SHA256_FUNC_NAME => Self::new(sha256(), ExprLogicalType::Binary, true, None),
STARTS_WITH_FUNC_NAME => Self::new(starts_with(), ExprLogicalType::Boolean, true, None),
_ => return None,
})
}
Expand Down Expand Up @@ -1290,7 +1295,8 @@ impl PhysicalExprEvalResult {
mod test {
use super::*;
use arrow::array::{
BinaryArray, DictionaryArray, Float64Array, Int32Array, Int64Array, StructArray, UInt8Array,
BinaryArray, BooleanArray, DictionaryArray, Float64Array, Int32Array, Int64Array,
StructArray, UInt8Array,
};
use data_engine_expressions::{
BinaryMathematicalScalarExpression, IntegerScalarExpression,
Expand Down Expand Up @@ -3385,6 +3391,116 @@ mod test {
assert_eq!(result_arr.as_ref(), &expected);
}

/// Plans and executes `starts_with(event_name, "ev")` against three log records and checks
/// the resulting boolean column; a null is expected for the record built without an event name.
#[test]
fn test_function_invocation_starts_with() {
    // First argument: the `event_name` field read from the source record.
    let column_arg = InvokeFunctionArgument::Scalar(ScalarExpression::Source(
        SourceScalarExpression::new(
            QueryLocation::new_fake(),
            ValueAccessor::new_with_selectors(vec![ScalarExpression::Static(
                StaticScalarExpression::String(StringScalarExpression::new(
                    QueryLocation::new_fake(),
                    "event_name",
                )),
            )]),
        ),
    ));
    // Second argument: the literal prefix being tested for.
    let prefix_arg = InvokeFunctionArgument::Scalar(ScalarExpression::Static(
        StaticScalarExpression::String(StringScalarExpression::new(
            QueryLocation::new_fake(),
            "ev",
        )),
    ));
    let input_expr = ScalarExpression::InvokeFunction(InvokeFunctionScalarExpression::new(
        QueryLocation::new_fake(),
        None,
        0,
        vec![column_arg, prefix_arg],
    ));

    let functions = [PipelineFunction::new_external("starts_with", vec![], None)];

    // Records: no event name, a name with the prefix, a name without the prefix.
    let records = vec![
        LogRecord::build().finish(),
        LogRecord::build().event_name("event1").finish(),
        LogRecord::build().event_name("other").finish(),
    ];
    let otap_batch = otlp_to_otap(&OtlpProtoMessage::Logs(to_logs_data(records)));

    let session_ctx = Pipeline::create_session_context();
    let mut physical_expr = ExprLogicalPlanner {}
        .plan_scalar_expr(&input_expr, &functions)
        .unwrap()
        .into_physical()
        .unwrap();
    let result_vals = physical_expr
        .execute(&otap_batch, &session_ctx)
        .unwrap()
        .map(|eval| eval.values);

    let Some(ColumnarValue::Array(result_arr)) = &result_vals else {
        panic!("expected arr, got scalar {result_vals:?}")
    };

    let expected: BooleanArray = [None, Some(true), Some(false)].into_iter().collect();

    assert_eq!(result_arr.as_ref(), &expected);
}

/// Plans and executes `ends_with(event_name, "1")` against three log records and checks
/// the resulting boolean column; a null is expected for the record built without an event name.
#[test]
fn test_function_invocation_ends_with() {
    // First argument: the `event_name` field read from the source record.
    let column_arg = InvokeFunctionArgument::Scalar(ScalarExpression::Source(
        SourceScalarExpression::new(
            QueryLocation::new_fake(),
            ValueAccessor::new_with_selectors(vec![ScalarExpression::Static(
                StaticScalarExpression::String(StringScalarExpression::new(
                    QueryLocation::new_fake(),
                    "event_name",
                )),
            )]),
        ),
    ));
    // Second argument: the literal suffix being tested for.
    let suffix_arg = InvokeFunctionArgument::Scalar(ScalarExpression::Static(
        StaticScalarExpression::String(StringScalarExpression::new(
            QueryLocation::new_fake(),
            "1",
        )),
    ));
    let input_expr = ScalarExpression::InvokeFunction(InvokeFunctionScalarExpression::new(
        QueryLocation::new_fake(),
        None,
        0,
        vec![column_arg, suffix_arg],
    ));

    let functions = [PipelineFunction::new_external("ends_with", vec![], None)];

    // Records: no event name, a name with the suffix, a name without the suffix.
    let records = vec![
        LogRecord::build().finish(),
        LogRecord::build().event_name("event1").finish(),
        LogRecord::build().event_name("event2").finish(),
    ];
    let otap_batch = otlp_to_otap(&OtlpProtoMessage::Logs(to_logs_data(records)));

    let session_ctx = Pipeline::create_session_context();
    let mut physical_expr = ExprLogicalPlanner {}
        .plan_scalar_expr(&input_expr, &functions)
        .unwrap()
        .into_physical()
        .unwrap();
    let result_vals = physical_expr
        .execute(&otap_batch, &session_ctx)
        .unwrap()
        .map(|eval| eval.values);

    let Some(ColumnarValue::Array(result_arr)) = &result_vals else {
        panic!("expected arr, got scalar {result_vals:?}")
    };

    let expected: BooleanArray = [None, Some(true), Some(false)].into_iter().collect();

    assert_eq!(result_arr.as_ref(), &expected);
}

// ----- Tests for multi-scope function arguments (MultiJoin) -----

/// Tests concat(severity_text, attributes["k1"]) where args come from Root and Attributes
Expand Down
123 changes: 123 additions & 0 deletions rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2656,6 +2656,129 @@ mod test {
.await;
}

/// Filters logs with `starts_with` on `event_name`, once with the column as the string being
/// searched and once with the column as the prefix, and checks which records are kept.
#[tokio::test]
async fn test_filter_event_name_using_starts_with_opl() {
    let input = vec![
        LogRecord::build().event_name("hello world").finish(),
        LogRecord::build().event_name("hello arrow").finish(),
        LogRecord::build().event_name("world hello").finish(),
        LogRecord::build().finish(),
        LogRecord::build().event_name("hello").finish(),
    ];

    // Each case pairs a query with the indices into `input` of the records expected to remain.
    let cases = [
        (
            r#"logs | where starts_with(event_name, "hello")"#,
            vec![0usize, 1, 4],
        ),
        // column on the right
        (
            r#"logs | where starts_with("hello world", event_name)"#,
            vec![0usize, 4],
        ),
    ];

    for (query, kept_indices) in cases {
        let result = exec_logs_pipeline::<OplParser>(query, to_logs_data(input.clone())).await;
        let expected: Vec<_> = kept_indices.iter().map(|&idx| input[idx].clone()).collect();

        assert_eq!(
            &result.resource_logs[0].scope_logs[0].log_records,
            &expected
        );
    }
}

/// Filters logs with `ends_with` on `event_name`, once with the column as the string being
/// searched and once with the column as the suffix, and checks which records are kept.
#[tokio::test]
async fn test_filter_event_name_using_ends_with_opl() {
    let input = vec![
        LogRecord::build().event_name("hello world").finish(),
        LogRecord::build().event_name("goodbye world").finish(),
        LogRecord::build().event_name("hello arrow").finish(),
        LogRecord::build().finish(),
        LogRecord::build().event_name("world").finish(),
    ];

    // Each case pairs a query with the indices into `input` of the records expected to remain.
    let cases = [
        (
            r#"logs | where ends_with(event_name, "world")"#,
            vec![0usize, 1, 4],
        ),
        // column on the right
        (
            r#"logs | where ends_with("hello world", event_name)"#,
            vec![0usize, 4],
        ),
    ];

    for (query, kept_indices) in cases {
        let result = exec_logs_pipeline::<OplParser>(query, to_logs_data(input.clone())).await;
        let expected: Vec<_> = kept_indices.iter().map(|&idx| input[idx].clone()).collect();

        assert_eq!(
            &result.resource_logs[0].scope_logs[0].log_records,
            &expected
        );
    }
}

/// Filters logs with `starts_with` applied to the `username` attribute and checks that only
/// the records whose value begins with "al" are kept.
#[tokio::test]
async fn test_filter_attrs_using_starts_with_opl() {
    // One record per (username attribute value, event name) pair.
    let log_records: Vec<_> = [("albert", "1"), ("alice", "2"), ("bob", "3")]
        .into_iter()
        .map(|(username, event_name)| {
            LogRecord::build()
                .attributes(vec![KeyValue::new(
                    "username",
                    AnyValue::new_string(username),
                )])
                .event_name(event_name)
                .finish()
        })
        .collect();

    let query = r#"logs | where starts_with(attributes["username"], "al")"#;
    let result =
        exec_logs_pipeline::<OplParser>(query, to_logs_data(log_records.clone())).await;

    let kept = &result.resource_logs[0].scope_logs[0].log_records;
    assert_eq!(kept, &[log_records[0].clone(), log_records[1].clone()]);
}

/// Filters logs with `ends_with` applied to the `filename` attribute and checks that only
/// the records whose value ends in ".pdf" are kept.
#[tokio::test]
async fn test_filter_attrs_using_ends_with_opl() {
    // One record per (filename attribute value, event name) pair.
    let log_records: Vec<_> = [("report.pdf", "1"), ("notes.pdf", "2"), ("README.md", "3")]
        .into_iter()
        .map(|(filename, event_name)| {
            LogRecord::build()
                .attributes(vec![KeyValue::new(
                    "filename",
                    AnyValue::new_string(filename),
                )])
                .event_name(event_name)
                .finish()
        })
        .collect();

    let query = r#"logs | where ends_with(attributes["filename"], ".pdf")"#;
    let result =
        exec_logs_pipeline::<OplParser>(query, to_logs_data(log_records.clone())).await;

    let kept = &result.resource_logs[0].scope_logs[0].log_records;
    assert_eq!(kept, &[log_records[0].clone(), log_records[1].clone()]);
}

async fn test_filter_matches_regex<P: Parser>(q1: &str, q2: &str) {
let log_records = vec![
LogRecord::build()
Expand Down
Loading