From 8ba58281fea195cfdba90a48b9c17841b66831ca Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 16 Jan 2021 12:47:34 +0100 Subject: [PATCH 1/7] Mem table repartition --- rust/datafusion/src/datasource/memory.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs index a3d7b0f1ac8..92baf414bd7 100644 --- a/rust/datafusion/src/datasource/memory.rs +++ b/rust/datafusion/src/datasource/memory.rs @@ -19,6 +19,7 @@ //! queried by DataFusion. This allows data to be pre-loaded into memory and then //! repeatedly queried without incurring additional file I/O overhead. +use futures::StreamExt; use log::debug; use std::any::Any; use std::sync::Arc; @@ -26,13 +27,16 @@ use std::sync::Arc; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use crate::datasource::datasource::Statistics; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; use crate::logical_plan::Expr; use crate::physical_plan::common; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::ExecutionPlan; +use crate::{ + datasource::datasource::Statistics, + physical_plan::{repartition::RepartitionExec, Partitioning}, +}; use super::datasource::ColumnStatistics; @@ -126,7 +130,23 @@ impl MemTable { data.push(result); } - MemTable::try_new(schema.clone(), data) + let exec = MemoryExec::try_new(&data, schema.clone(), None)?; + let exec = + RepartitionExec::try_new(Arc::new(exec), Partitioning::RoundRobinBatch(16))?; + + // execute and collect results + let mut output_partitions = vec![]; + for i in 0..exec.output_partitioning().partition_count() { + // execute this *output* partition and collect all batches + let mut stream = exec.execute(i).await?; + let mut batches = vec![]; + while let Some(result) = stream.next().await { + batches.push(result?); + } + output_partitions.push(batches); + } + + 
MemTable::try_new(schema.clone(), output_partitions) } } From afd6528d5572da5280922a8002486a5385a8bb58 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 16 Jan 2021 13:23:29 +0100 Subject: [PATCH 2/7] Pass output partitions as argument --- rust/benchmarks/src/bin/tpch.rs | 12 +++++-- rust/datafusion/src/datasource/memory.rs | 40 +++++++++++++++--------- 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index 539b8d23d08..ac9470b8a49 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -66,6 +66,10 @@ struct BenchmarkOpt { /// Load the data into a MemTable before executing the query #[structopt(short = "m", long = "mem-table")] mem_table: bool, + + /// Number of partitions to use when using MemTable + #[structopt(short = "p", long = "partitions", default_value = "8")] + partitions: usize, } #[derive(Debug, StructOpt)] @@ -134,8 +138,12 @@ async fn benchmark(opt: BenchmarkOpt) -> Result Result { + pub async fn load( + t: &dyn TableProvider, + batch_size: usize, + output_partitions: Option<usize>, + ) -> Result<Self> { let schema = t.schema(); let exec = t.scan(&None, batch_size, &[])?; let partition_count = exec.output_partitioning().partition_count(); @@ -131,22 +135,28 @@ impl MemTable { } let exec = MemoryExec::try_new(&data, schema.clone(), None)?; - let exec = - RepartitionExec::try_new(Arc::new(exec), Partitioning::RoundRobinBatch(16))?; - - // execute and collect results - let mut output_partitions = vec![]; - for i in 0..exec.output_partitioning().partition_count() { - // execute this *output* partition and collect all batches - let mut stream = exec.execute(i).await?; - let mut batches = vec![]; - while let Some(result) = stream.next().await { - batches.push(result?); + + if let Some(num_partitions) = output_partitions { + let exec = RepartitionExec::try_new( + Arc::new(exec), + Partitioning::RoundRobinBatch(num_partitions), + )?; + + // execute and 
collect results + let mut output_partitions = vec![]; + for i in 0..exec.output_partitioning().partition_count() { + // execute this *output* partition and collect all batches + let mut stream = exec.execute(i).await?; + let mut batches = vec![]; + while let Some(result) = stream.next().await { + batches.push(result?); + } + output_partitions.push(batches); } - output_partitions.push(batches); - } - MemTable::try_new(schema.clone(), output_partitions) + return MemTable::try_new(schema.clone(), output_partitions); + } + MemTable::try_new(schema.clone(), data) } } From 40ca825a6d57ba5c070b9ccb470ccff53f7146a8 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 16 Jan 2021 13:38:00 +0100 Subject: [PATCH 3/7] Fix already used short flag --- rust/benchmarks/src/bin/tpch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index ac9470b8a49..b7c017c9b9f 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -68,7 +68,7 @@ struct BenchmarkOpt { mem_table: bool, /// Number of partitions to use when using MemTable - #[structopt(short = "p", long = "partitions", default_value = "8")] + #[structopt(short = "n", long = "partitions", default_value = "8")] partitions: usize, } From 34bd32f4568cfb0f237af7dcd95f92b306d461d2 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 16 Jan 2021 14:18:42 +0100 Subject: [PATCH 4/7] Fix test --- rust/benchmarks/src/bin/tpch.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index b7c017c9b9f..9705df173be 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -1597,6 +1597,7 @@ mod tests { path: PathBuf::from(path.to_string()), file_format: "tbl".to_string(), mem_table: false, + partitions: 16, }; let actual = benchmark(opt).await?; From 9750ead28a6dbe99ca5fafa584d5717ecfba21b9 Mon Sep 17 00:00:00 2001 From: "Heres, 
Daniel" Date: Sat, 16 Jan 2021 16:21:38 +0100 Subject: [PATCH 5/7] Pass partitions in bench --- rust/datafusion/benches/sort_limit_query_sql.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rust/datafusion/benches/sort_limit_query_sql.rs b/rust/datafusion/benches/sort_limit_query_sql.rs index de6a765cb52..290091a0a97 100644 --- a/rust/datafusion/benches/sort_limit_query_sql.rs +++ b/rust/datafusion/benches/sort_limit_query_sql.rs @@ -70,8 +70,11 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> { let ctx_holder: Arc<Mutex<Vec<Arc<Mutex<ExecutionContext>>>>> = Arc::new(Mutex::new(vec![])); + + let partitions = 16; + rt.block_on(async { - let mem_table = MemTable::load(&csv, 16 * 1024).await.unwrap(); + let mem_table = MemTable::load(&csv, 16 * 1024, Some(partitions)).await.unwrap(); // create local execution context let mut ctx = ExecutionContext::new(); From 990d3bae1c3e738436a7ec603de3cb802bef81e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 18 Jan 2021 12:41:49 +0100 Subject: [PATCH 6/7] Adapt description of flag Co-authored-by: Andrew Lamb --- rust/benchmarks/src/bin/tpch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index 9705df173be..16a353a4202 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -67,7 +67,7 @@ struct BenchmarkOpt { #[structopt(short = "m", long = "mem-table")] mem_table: bool, - /// Number of partitions to use when using MemTable + /// Number of partitions to create when using MemTable as input #[structopt(short = "n", long = "partitions", default_value = "8")] partitions: usize, } From 0018c859c44d2daa89487c5ba8684947d762c2de Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 18 Jan 2021 13:38:05 +0100 Subject: [PATCH 7/7] Fmt --- rust/datafusion/benches/sort_limit_query_sql.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/datafusion/benches/sort_limit_query_sql.rs 
b/rust/datafusion/benches/sort_limit_query_sql.rs index 290091a0a97..9834bee0986 100644 --- a/rust/datafusion/benches/sort_limit_query_sql.rs +++ b/rust/datafusion/benches/sort_limit_query_sql.rs @@ -74,7 +74,9 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> { let partitions = 16; rt.block_on(async { - let mem_table = MemTable::load(&csv, 16 * 1024, Some(partitions)).await.unwrap(); + let mem_table = MemTable::load(&csv, 16 * 1024, Some(partitions)) + .await + .unwrap(); // create local execution context let mut ctx = ExecutionContext::new();