-
Notifications
You must be signed in to change notification settings - Fork 3.9k
ARROW-11268: [Rust][DataFusion] MemTable::load output partition support #9214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on JIRA? Then could you also rename pull request title in the following format?
See also: |
Codecov Report
@@ Coverage Diff @@
## master #9214 +/- ##
==========================================
- Coverage 81.61% 81.58% -0.03%
==========================================
Files 215 215
Lines 51867 51882 +15
==========================================
Hits 42329 42329
- Misses 9538 9553 +15
Continue to review full report at Codecov.
|
This would also help us in the db-benchmark h2oai/db-benchmark#182 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. I wonder if there is some way to add a test (so that this won't get broken accidentally) for addition of the correct RepartitionExec
feature -- perhaps something like https://github.com/alamb/arrow/blob/main/rust/datafusion/src/logical_plan/builder.rs#L367 where the presence of the repartition operator is checked
|
||
let exec = MemoryExec::try_new(&data, schema.clone(), None)?; | ||
|
||
if let Some(num_partitions) = output_partitions { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Co-authored-by: Andrew Lamb <[email protected]>
I think the feature to be able to repartition an in memory table is useful, as the repartitioning only needs to be applied once, and repartition itself is cheap (at the same node). Doing this when loading data is very useful for in-memory analytics as we can benefit from mutliple cores after loading the data. The speed up from repartitioning is very big (mainly on aggregates), on my (8-core machine): ~5-7x on query 1 and 12 versus a single partition, and a smaller (~30%) difference for query 5 when using 16 partition. q1/q12 also have very high cpu utilization. @jorgecarleitao maybe this is of interest to you, as you mentioned you are looking into multi-threading. I think this would be a "high level" way to get more parallelism, also in the logical plan. I think in some optimizer rules and/or dynamically we can do repartitions, similar to what's described here https://issues.apache.org/jira/browse/ARROW-9464 Benchmarks after repartitioning (16 partitions): PR (16 partitions) ``` Query 12 iteration 0 took 33.9 ms Query 12 iteration 1 took 34.3 ms Query 12 iteration 2 took 36.9 ms Query 12 iteration 3 took 33.6 ms Query 12 iteration 4 took 35.1 ms Query 12 iteration 5 took 38.8 ms Query 12 iteration 6 took 35.8 ms Query 12 iteration 7 took 34.4 ms Query 12 iteration 8 took 34.2 ms Query 12 iteration 9 took 35.3 ms Query 12 avg time: 35.24 ms ``` Master (1 partition): ``` Query 12 iteration 0 took 245.6 ms Query 12 iteration 1 took 246.4 ms Query 12 iteration 2 took 246.1 ms Query 12 iteration 3 took 247.9 ms Query 12 iteration 4 took 246.5 ms Query 12 iteration 5 took 248.2 ms Query 12 iteration 6 took 247.8 ms Query 12 iteration 7 took 246.4 ms Query 12 iteration 8 took 246.6 ms Query 12 iteration 9 took 246.5 ms Query 12 avg time: 246.79 ms ``` PR (16 partitions): ``` Query 1 iteration 0 took 138.6 ms Query 1 iteration 1 took 142.2 ms Query 1 iteration 2 took 125.8 ms Query 1 iteration 3 took 102.4 ms Query 1 iteration 4 took 105.9 ms Query 1 iteration 5 took 107.0 ms Query 1 iteration 6 took 109.3 ms Query 1 iteration 7 took 109.9 ms Query 1 iteration 8 took 108.8 ms Query 1 iteration 9 took 112.0 ms Query 1 avg time: 116.19 ms ``` Master (1 partition): ``` Query 1 iteration 0 took 640.6 ms Query 1 iteration 1 took 640.0 ms Query 1 iteration 2 took 632.9 ms Query 1 iteration 3 took 634.6 ms Query 1 iteration 4 took 630.7 ms Query 1 iteration 5 took 630.7 ms Query 1 iteration 6 took 631.9 ms Query 1 iteration 7 took 635.5 ms Query 1 iteration 8 took 639.0 ms Query 1 iteration 9 took 638.3 ms Query 1 avg time: 635.43 ms ``` PR (16 partitions) ``` Query 5 iteration 0 took 465.8 ms Query 5 iteration 1 took 428.0 ms Query 5 iteration 2 took 435.0 ms Query 5 iteration 3 took 407.3 ms Query 5 iteration 4 took 435.7 ms Query 5 iteration 5 took 437.4 ms Query 5 iteration 6 took 411.2 ms Query 5 iteration 7 took 432.0 ms Query 5 iteration 8 took 436.8 ms Query 5 iteration 9 took 435.6 ms Query 5 avg time: 432.47 ms ``` Master (1 partition) ``` Query 5 iteration 0 took 660.6 ms Query 5 iteration 1 took 634.4 ms Query 5 iteration 2 took 626.4 ms Query 5 iteration 3 took 628.0 ms Query 5 iteration 4 took 635.3 ms Query 5 iteration 5 took 631.1 ms Query 5 iteration 6 took 631.3 ms Query 5 iteration 7 took 639.4 ms Query 5 iteration 8 took 634.3 ms Query 5 iteration 9 took 639.0 ms Query 5 avg time: 635.97 ms ``` Closes #9214 from Dandandan/mem_table_repartition Lead-authored-by: Heres, Daniel <[email protected]> Co-authored-by: Daniël Heres <[email protected]> Signed-off-by: Jorge C. Leitao <[email protected]>
I think the feature to be able to repartition an in memory table is useful, as the repartitioning only needs to be applied once, and repartition itself is cheap (at the same node). Doing this when loading data is very useful for in-memory analytics as we can benefit from mutliple cores after loading the data. The speed up from repartitioning is very big (mainly on aggregates), on my (8-core machine): ~5-7x on query 1 and 12 versus a single partition, and a smaller (~30%) difference for query 5 when using 16 partition. q1/q12 also have very high cpu utilization. @jorgecarleitao maybe this is of interest to you, as you mentioned you are looking into multi-threading. I think this would be a "high level" way to get more parallelism, also in the logical plan. I think in some optimizer rules and/or dynamically we can do repartitions, similar to what's described here https://issues.apache.org/jira/browse/ARROW-9464 Benchmarks after repartitioning (16 partitions): PR (16 partitions) ``` Query 12 iteration 0 took 33.9 ms Query 12 iteration 1 took 34.3 ms Query 12 iteration 2 took 36.9 ms Query 12 iteration 3 took 33.6 ms Query 12 iteration 4 took 35.1 ms Query 12 iteration 5 took 38.8 ms Query 12 iteration 6 took 35.8 ms Query 12 iteration 7 took 34.4 ms Query 12 iteration 8 took 34.2 ms Query 12 iteration 9 took 35.3 ms Query 12 avg time: 35.24 ms ``` Master (1 partition): ``` Query 12 iteration 0 took 245.6 ms Query 12 iteration 1 took 246.4 ms Query 12 iteration 2 took 246.1 ms Query 12 iteration 3 took 247.9 ms Query 12 iteration 4 took 246.5 ms Query 12 iteration 5 took 248.2 ms Query 12 iteration 6 took 247.8 ms Query 12 iteration 7 took 246.4 ms Query 12 iteration 8 took 246.6 ms Query 12 iteration 9 took 246.5 ms Query 12 avg time: 246.79 ms ``` PR (16 partitions): ``` Query 1 iteration 0 took 138.6 ms Query 1 iteration 1 took 142.2 ms Query 1 iteration 2 took 125.8 ms Query 1 iteration 3 took 102.4 ms Query 1 iteration 4 took 105.9 ms Query 1 iteration 5 took 107.0 ms Query 1 iteration 6 took 109.3 ms Query 1 iteration 7 took 109.9 ms Query 1 iteration 8 took 108.8 ms Query 1 iteration 9 took 112.0 ms Query 1 avg time: 116.19 ms ``` Master (1 partition): ``` Query 1 iteration 0 took 640.6 ms Query 1 iteration 1 took 640.0 ms Query 1 iteration 2 took 632.9 ms Query 1 iteration 3 took 634.6 ms Query 1 iteration 4 took 630.7 ms Query 1 iteration 5 took 630.7 ms Query 1 iteration 6 took 631.9 ms Query 1 iteration 7 took 635.5 ms Query 1 iteration 8 took 639.0 ms Query 1 iteration 9 took 638.3 ms Query 1 avg time: 635.43 ms ``` PR (16 partitions) ``` Query 5 iteration 0 took 465.8 ms Query 5 iteration 1 took 428.0 ms Query 5 iteration 2 took 435.0 ms Query 5 iteration 3 took 407.3 ms Query 5 iteration 4 took 435.7 ms Query 5 iteration 5 took 437.4 ms Query 5 iteration 6 took 411.2 ms Query 5 iteration 7 took 432.0 ms Query 5 iteration 8 took 436.8 ms Query 5 iteration 9 took 435.6 ms Query 5 avg time: 432.47 ms ``` Master (1 partition) ``` Query 5 iteration 0 took 660.6 ms Query 5 iteration 1 took 634.4 ms Query 5 iteration 2 took 626.4 ms Query 5 iteration 3 took 628.0 ms Query 5 iteration 4 took 635.3 ms Query 5 iteration 5 took 631.1 ms Query 5 iteration 6 took 631.3 ms Query 5 iteration 7 took 639.4 ms Query 5 iteration 8 took 634.3 ms Query 5 iteration 9 took 639.0 ms Query 5 avg time: 635.97 ms ``` Closes apache#9214 from Dandandan/mem_table_repartition Lead-authored-by: Heres, Daniel <[email protected]> Co-authored-by: Daniël Heres <[email protected]> Signed-off-by: Jorge C. Leitao <[email protected]>
I think the feature to be able to repartition an in memory table is useful, as the repartitioning only needs to be applied once, and repartition itself is cheap (at the same node). Doing this when loading data is very useful for in-memory analytics as we can benefit from mutliple cores after loading the data. The speed up from repartitioning is very big (mainly on aggregates), on my (8-core machine): ~5-7x on query 1 and 12 versus a single partition, and a smaller (~30%) difference for query 5 when using 16 partition. q1/q12 also have very high cpu utilization. @jorgecarleitao maybe this is of interest to you, as you mentioned you are looking into multi-threading. I think this would be a "high level" way to get more parallelism, also in the logical plan. I think in some optimizer rules and/or dynamically we can do repartitions, similar to what's described here https://issues.apache.org/jira/browse/ARROW-9464 Benchmarks after repartitioning (16 partitions): PR (16 partitions) ``` Query 12 iteration 0 took 33.9 ms Query 12 iteration 1 took 34.3 ms Query 12 iteration 2 took 36.9 ms Query 12 iteration 3 took 33.6 ms Query 12 iteration 4 took 35.1 ms Query 12 iteration 5 took 38.8 ms Query 12 iteration 6 took 35.8 ms Query 12 iteration 7 took 34.4 ms Query 12 iteration 8 took 34.2 ms Query 12 iteration 9 took 35.3 ms Query 12 avg time: 35.24 ms ``` Master (1 partition): ``` Query 12 iteration 0 took 245.6 ms Query 12 iteration 1 took 246.4 ms Query 12 iteration 2 took 246.1 ms Query 12 iteration 3 took 247.9 ms Query 12 iteration 4 took 246.5 ms Query 12 iteration 5 took 248.2 ms Query 12 iteration 6 took 247.8 ms Query 12 iteration 7 took 246.4 ms Query 12 iteration 8 took 246.6 ms Query 12 iteration 9 took 246.5 ms Query 12 avg time: 246.79 ms ``` PR (16 partitions): ``` Query 1 iteration 0 took 138.6 ms Query 1 iteration 1 took 142.2 ms Query 1 iteration 2 took 125.8 ms Query 1 iteration 3 took 102.4 ms Query 1 iteration 4 took 105.9 ms Query 1 iteration 5 took 107.0 ms Query 1 iteration 6 took 109.3 ms Query 1 iteration 7 took 109.9 ms Query 1 iteration 8 took 108.8 ms Query 1 iteration 9 took 112.0 ms Query 1 avg time: 116.19 ms ``` Master (1 partition): ``` Query 1 iteration 0 took 640.6 ms Query 1 iteration 1 took 640.0 ms Query 1 iteration 2 took 632.9 ms Query 1 iteration 3 took 634.6 ms Query 1 iteration 4 took 630.7 ms Query 1 iteration 5 took 630.7 ms Query 1 iteration 6 took 631.9 ms Query 1 iteration 7 took 635.5 ms Query 1 iteration 8 took 639.0 ms Query 1 iteration 9 took 638.3 ms Query 1 avg time: 635.43 ms ``` PR (16 partitions) ``` Query 5 iteration 0 took 465.8 ms Query 5 iteration 1 took 428.0 ms Query 5 iteration 2 took 435.0 ms Query 5 iteration 3 took 407.3 ms Query 5 iteration 4 took 435.7 ms Query 5 iteration 5 took 437.4 ms Query 5 iteration 6 took 411.2 ms Query 5 iteration 7 took 432.0 ms Query 5 iteration 8 took 436.8 ms Query 5 iteration 9 took 435.6 ms Query 5 avg time: 432.47 ms ``` Master (1 partition) ``` Query 5 iteration 0 took 660.6 ms Query 5 iteration 1 took 634.4 ms Query 5 iteration 2 took 626.4 ms Query 5 iteration 3 took 628.0 ms Query 5 iteration 4 took 635.3 ms Query 5 iteration 5 took 631.1 ms Query 5 iteration 6 took 631.3 ms Query 5 iteration 7 took 639.4 ms Query 5 iteration 8 took 634.3 ms Query 5 iteration 9 took 639.0 ms Query 5 avg time: 635.97 ms ``` Closes apache#9214 from Dandandan/mem_table_repartition Lead-authored-by: Heres, Daniel <[email protected]> Co-authored-by: Daniël Heres <[email protected]> Signed-off-by: Jorge C. Leitao <[email protected]>
I think the feature to be able to repartition an in memory table is useful, as the repartitioning only needs to be applied once, and repartition itself is cheap (at the same node). Doing this when loading data is very useful for in-memory analytics as we can benefit from mutliple cores after loading the data.
The speed up from repartitioning is very big (mainly on aggregates), on my (8-core machine): ~5-7x on query 1 and 12 versus a single partition, and a smaller (~30%) difference for query 5 when using 16 partition. q1/q12 also have very high cpu utilization.
@jorgecarleitao maybe this is of interest to you, as you mentioned you are looking into multi-threading. I think this would be a "high level" way to get more parallelism, also in the logical plan. I think in some optimizer rules and/or dynamically we can do repartitions, similar to what's described here https://issues.apache.org/jira/browse/ARROW-9464
Benchmarks after repartitioning (16 partitions):
PR (16 partitions)
Master (1 partition):
PR (16 partitions):
Master (1 partition):
PR (16 partitions)
Master (1 partition)