Commit a5eb912

cj-zhukov and Sergey Zhukov authored
Consolidate udf examples (#18142) (#18493)
## Which issue does this PR close?

- Part of #18142.

## Rationale for this change

This PR consolidates all the `udf` examples into a single example binary. We have agreed on the pattern and can apply it to the remaining examples.

## What changes are included in this PR?

## Are these changes tested?

## Are there any user-facing changes?

---------

Co-authored-by: Sergey Zhukov <[email protected]>
1 parent 1ac6625 commit a5eb912

10 files changed: +183 -49 lines changed

datafusion-examples/README.md

Lines changed: 8 additions & 7 deletions
@@ -46,11 +46,11 @@ cargo run --example dataframe
 
 ## Single Process
 
-- [`advanced_udaf.rs`](examples/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF)
-- [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
-- [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
+- [`examples/udf/advanced_udaf.rs`](examples/udf/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF)
+- [`examples/udf/advanced_udf.rs`](examples/udf/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
+- [`examples/udf/advanced_udwf.rs`](examples/udf/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
 - [`advanced_parquet_index.rs`](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files
-- [`async_udf.rs`](examples/async_udf.rs): Define and invoke an asynchronous User Defined Scalar Function (UDF)
+- [`examples/udf/async_udf.rs`](examples/udf/async_udf.rs): Define and invoke an asynchronous User Defined Scalar Function (UDF)
 - [`analyzer_rule.rs`](examples/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control)
 - [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
 - [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
@@ -83,9 +83,10 @@ cargo run --example dataframe
 - [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files via HTTP
 - [`regexp.rs`](examples/regexp.rs): Examples of using regular expression functions
 - [`remote_catalog.rs`](examples/regexp.rs): Examples of interfacing with a remote catalog (e.g. over a network)
-- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
-- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
-- [`simple_udfw.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
+- [`examples/udf/simple_udaf.rs`](examples/udf/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
+- [`examples/udf/simple_udf.rs`](examples/udf/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
+- [`examples/udf/simple_udtf.rs`](examples/udf/simple_udtf.rs): Define and invoke a User Defined Table Function (UDTF)
+- [`examples/udf/simple_udwf.rs`](examples/udf/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
 - [`sql_analysis.rs`](examples/sql_analysis.rs): Analyse SQL queries with DataFusion structures
 - [`sql_frontend.rs`](examples/sql_frontend.rs): Create LogicalPlans (only) from sql strings
 - [`sql_dialect.rs`](examples/sql_dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser`

datafusion-examples/examples/advanced_udaf.rs renamed to datafusion-examples/examples/udf/advanced_udaf.rs

Lines changed: 3 additions & 2 deletions
@@ -469,8 +469,9 @@ fn create_context() -> Result<SessionContext> {
     Ok(ctx)
 }
 
-#[tokio::main]
-async fn main() -> Result<()> {
+/// In this example we register `GeoMeanUdaf` and `SimplifiedGeoMeanUdaf`
+/// as user defined aggregate functions and invoke them via the DataFrame API and SQL
+pub async fn advanced_udaf() -> Result<()> {
     let ctx = create_context()?;
 
     let geo_mean_udf = AggregateUDF::from(GeoMeanUdaf::new());

datafusion-examples/examples/advanced_udf.rs renamed to datafusion-examples/examples/udf/advanced_udf.rs

Lines changed: 27 additions & 28 deletions
@@ -245,10 +245,35 @@ fn maybe_pow_in_place(base: f64, exp_array: ArrayRef) -> Result<ArrayRef> {
     }
 }
 
+/// create local execution context with an in-memory table:
+///
+/// ```text
+/// +-----+-----+
+/// | a   | b   |
+/// +-----+-----+
+/// | 2.1 | 1.0 |
+/// | 3.1 | 2.0 |
+/// | 4.1 | 3.0 |
+/// | 5.1 | 4.0 |
+/// +-----+-----+
+/// ```
+fn create_context() -> Result<SessionContext> {
+    // define data.
+    let a: ArrayRef = Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1]));
+    let b: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
+    let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;
+
+    // declare a new context. In Spark API, this corresponds to a new SparkSession
+    let ctx = SessionContext::new();
+
+    // declare a table in memory. In Spark API, this corresponds to createDataFrame(...).
+    ctx.register_batch("t", batch)?;
+    Ok(ctx)
+}
+
 /// In this example we register `PowUdf` as a user defined function
 /// and invoke it via the DataFrame API and SQL
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn advanced_udf() -> Result<()> {
     let ctx = create_context()?;
 
     // create the UDF
@@ -295,29 +320,3 @@ async fn main() -> Result<()> {
 
     Ok(())
 }
-
-/// create local execution context with an in-memory table:
-///
-/// ```text
-/// +-----+-----+
-/// | a   | b   |
-/// +-----+-----+
-/// | 2.1 | 1.0 |
-/// | 3.1 | 2.0 |
-/// | 4.1 | 3.0 |
-/// | 5.1 | 4.0 |
-/// +-----+-----+
-/// ```
-fn create_context() -> Result<SessionContext> {
-    // define data.
-    let a: ArrayRef = Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1]));
-    let b: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
-    let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;
-
-    // declare a new context. In Spark API, this corresponds to a new SparkSession
-    let ctx = SessionContext::new();
-
-    // declare a table in memory. In Spark API, this corresponds to createDataFrame(...).
-    ctx.register_batch("t", batch)?;
-    Ok(ctx)
-}

datafusion-examples/examples/advanced_udwf.rs renamed to datafusion-examples/examples/udf/advanced_udwf.rs

Lines changed: 3 additions & 2 deletions
@@ -236,8 +236,9 @@ async fn create_context() -> Result<SessionContext> {
     Ok(ctx)
 }
 
-#[tokio::main]
-async fn main() -> Result<()> {
+/// In this example we register `SmoothItUdf` as user defined window function
+/// and invoke it via the DataFrame API and SQL
+pub async fn advanced_udwf() -> Result<()> {
     let ctx = create_context().await?;
     let smooth_it = WindowUDF::from(SmoothItUdf::new());
     ctx.register_udwf(smooth_it.clone());

datafusion-examples/examples/async_udf.rs renamed to datafusion-examples/examples/udf/async_udf.rs

Lines changed: 3 additions & 2 deletions
@@ -38,8 +38,9 @@ use datafusion::prelude::{SessionConfig, SessionContext};
 use std::any::Any;
 use std::sync::Arc;
 
-#[tokio::main]
-async fn main() -> Result<()> {
+/// In this example we register `AskLLM` as an asynchronous user defined function
+/// and invoke it via the DataFrame API and SQL
+pub async fn async_udf() -> Result<()> {
     // Use a hard coded parallelism level of 4 so the explain plan
     // is consistent across machines.
     let config = SessionConfig::new().with_target_partitions(4);
Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # User-Defined Functions Examples
+//!
+//! These examples demonstrate user-defined functions in DataFusion.
+//!
+//! Each subcommand runs a corresponding example:
+//! - `adv_udaf` — user defined aggregate function example
+//! - `adv_udf` — user defined scalar function example
+//! - `adv_udwf` — user defined window function example
+//! - `async_udf` — asynchronous user defined function example
+//! - `udaf` — simple user defined aggregate function example
+//! - `udf` — simple user defined scalar function example
+//! - `udtf` — simple user defined table function example
+//! - `udwf` — simple user defined window function example
+
+mod advanced_udaf;
+mod advanced_udf;
+mod advanced_udwf;
+mod async_udf;
+mod simple_udaf;
+mod simple_udf;
+mod simple_udtf;
+mod simple_udwf;
+
+use std::str::FromStr;
+
+use datafusion::error::{DataFusionError, Result};
+
+enum ExampleKind {
+    AdvUdaf,
+    AdvUdf,
+    AdvUdwf,
+    AsyncUdf,
+    Udf,
+    Udaf,
+    Udwf,
+    Udtf,
+}
+
+impl AsRef<str> for ExampleKind {
+    fn as_ref(&self) -> &str {
+        match self {
+            Self::AdvUdaf => "adv_udaf",
+            Self::AdvUdf => "adv_udf",
+            Self::AdvUdwf => "adv_udwf",
+            Self::AsyncUdf => "async_udf",
+            Self::Udf => "udf",
+            Self::Udaf => "udaf",
+            Self::Udwf => "udwf",
+            Self::Udtf => "udtf",
+        }
+    }
+}
+
+impl FromStr for ExampleKind {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self> {
+        match s {
+            "adv_udaf" => Ok(Self::AdvUdaf),
+            "adv_udf" => Ok(Self::AdvUdf),
+            "adv_udwf" => Ok(Self::AdvUdwf),
+            "async_udf" => Ok(Self::AsyncUdf),
+            "udaf" => Ok(Self::Udaf),
+            "udf" => Ok(Self::Udf),
+            "udtf" => Ok(Self::Udtf),
+            "udwf" => Ok(Self::Udwf),
+            _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
+        }
+    }
+}
+
+impl ExampleKind {
+    const ALL: [Self; 8] = [
+        Self::AdvUdaf,
+        Self::AdvUdf,
+        Self::AdvUdwf,
+        Self::AsyncUdf,
+        Self::Udaf,
+        Self::Udf,
+        Self::Udtf,
+        Self::Udwf,
+    ];
+
+    const EXAMPLE_NAME: &str = "udf";
+
+    fn variants() -> Vec<&'static str> {
+        Self::ALL.iter().map(|x| x.as_ref()).collect()
+    }
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let usage = format!(
+        "Usage: cargo run --example {} -- [{}]",
+        ExampleKind::EXAMPLE_NAME,
+        ExampleKind::variants().join("|")
+    );
+
+    let arg = std::env::args().nth(1).ok_or_else(|| {
+        eprintln!("{usage}");
+        DataFusionError::Execution("Missing argument".to_string())
+    })?;
+
+    match arg.parse::<ExampleKind>()? {
+        ExampleKind::AdvUdaf => advanced_udaf::advanced_udaf().await?,
+        ExampleKind::AdvUdf => advanced_udf::advanced_udf().await?,
+        ExampleKind::AdvUdwf => advanced_udwf::advanced_udwf().await?,
+        ExampleKind::AsyncUdf => async_udf::async_udf().await?,
+        ExampleKind::Udaf => simple_udaf::simple_udaf().await?,
+        ExampleKind::Udf => simple_udf::simple_udf().await?,
+        ExampleKind::Udtf => simple_udtf::simple_udtf().await?,
+        ExampleKind::Udwf => simple_udwf::simple_udwf().await?,
+    }
+
+    Ok(())
+}
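
Review note: the dispatcher above keeps two hand-written name tables, one in `as_ref` (used to build the usage string) and one in `from_str` (used to parse the argument). If they drift apart, for example through a mistyped name, the usage string would advertise a subcommand that the parser rejects. The following is a minimal, std-only sketch of a round-trip check that would catch this. It is not part of the commit: the helper `round_trip_failures` is hypothetical, and a plain `String` error stands in for `DataFusionError` so the snippet has no external dependencies.

```rust
use std::str::FromStr;

/// Reduced stand-in for the `ExampleKind` enum in the diff above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExampleKind {
    AdvUdaf,
    AdvUdf,
    AdvUdwf,
    AsyncUdf,
    Udaf,
    Udf,
    Udtf,
    Udwf,
}

impl ExampleKind {
    pub const ALL: [Self; 8] = [
        Self::AdvUdaf,
        Self::AdvUdf,
        Self::AdvUdwf,
        Self::AsyncUdf,
        Self::Udaf,
        Self::Udf,
        Self::Udtf,
        Self::Udwf,
    ];
}

impl AsRef<str> for ExampleKind {
    fn as_ref(&self) -> &str {
        match self {
            Self::AdvUdaf => "adv_udaf",
            Self::AdvUdf => "adv_udf",
            Self::AdvUdwf => "adv_udwf",
            Self::AsyncUdf => "async_udf",
            Self::Udaf => "udaf",
            Self::Udf => "udf",
            Self::Udtf => "udtf",
            Self::Udwf => "udwf",
        }
    }
}

impl FromStr for ExampleKind {
    // Assumption: `String` replaces `DataFusionError` to keep the sketch std-only.
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "adv_udaf" => Ok(Self::AdvUdaf),
            "adv_udf" => Ok(Self::AdvUdf),
            "adv_udwf" => Ok(Self::AdvUdwf),
            "async_udf" => Ok(Self::AsyncUdf),
            "udaf" => Ok(Self::Udaf),
            "udf" => Ok(Self::Udf),
            "udtf" => Ok(Self::Udtf),
            "udwf" => Ok(Self::Udwf),
            _ => Err(format!("Unknown example: {s}")),
        }
    }
}

/// Hypothetical helper: returns every variant whose printed name does not
/// parse back to the same variant. An empty result means the tables agree.
pub fn round_trip_failures() -> Vec<ExampleKind> {
    ExampleKind::ALL
        .iter()
        .copied()
        .filter(|kind| {
            let name: &str = kind.as_ref();
            name.parse::<ExampleKind>() != Ok(*kind)
        })
        .collect()
}
```

Asserting that `round_trip_failures()` is empty in a unit test is one way to keep the `Usage: cargo run --example udf -- [...]` message and the accepted arguments in sync as more examples are consolidated under the same pattern.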

datafusion-examples/examples/simple_udaf.rs renamed to datafusion-examples/examples/udf/simple_udaf.rs

Lines changed: 3 additions & 2 deletions
@@ -135,8 +135,9 @@ impl Accumulator for GeometricMean {
     }
 }
 
-#[tokio::main]
-async fn main() -> Result<()> {
+/// In this example we register `GeometricMean`
+/// as user defined aggregate function and invoke it via the DataFrame API and SQL
+pub async fn simple_udaf() -> Result<()> {
     let ctx = create_context()?;
 
     // here is where we define the UDAF. We also declare its signature:

datafusion-examples/examples/simple_udf.rs renamed to datafusion-examples/examples/udf/simple_udf.rs

Lines changed: 1 addition & 2 deletions
@@ -57,8 +57,7 @@ fn create_context() -> Result<SessionContext> {
 }
 
 /// In this example we will declare a single-type, single return type UDF that exponentiates f64, a^b
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn simple_udf() -> Result<()> {
     let ctx = create_context()?;
 
     // First, declare the actual implementation of the calculation

datafusion-examples/examples/simple_udtf.rs renamed to datafusion-examples/examples/udf/simple_udtf.rs

Lines changed: 1 addition & 2 deletions
@@ -42,8 +42,7 @@ use std::sync::Arc;
 // 3. Register the function using [`SessionContext::register_udtf`]
 
 /// This example demonstrates how to register a TableFunction
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn simple_udtf() -> Result<()> {
     // create local execution context
     let ctx = SessionContext::new();
 

datafusion-examples/examples/simple_udwf.rs renamed to datafusion-examples/examples/udf/simple_udwf.rs

Lines changed: 1 addition & 2 deletions
@@ -42,8 +42,7 @@ async fn create_context() -> Result<SessionContext> {
 }
 
 /// In this example we will declare a user defined window function that computes a moving average and then run it using SQL
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn simple_udwf() -> Result<()> {
     let ctx = create_context().await?;
 
     // here is where we define the UDWF. We also declare its signature:
