From df32c7e4f1b67b92d3edd9ee440aed1f36f87e91 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 11 Oct 2024 11:11:52 +0800 Subject: [PATCH] Add tests Signed-off-by: Firestarman --- integration_tests/src/main/python/hash_aggregate_test.py | 8 ++++++-- integration_tests/src/main/python/join_test.py | 8 ++++++-- .../com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala | 1 - 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 734b4dfb708..2645d1d5f98 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -1118,7 +1118,8 @@ def test_hash_groupby_typed_imperative_agg_without_gpu_implementation_fallback() @disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114 @pytest.mark.parametrize('data_gen', _init_list, ids=idfn) @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn) -def test_hash_multiple_mode_query(data_gen, conf): +@pytest.mark.parametrize('shuffle_split', [True, False], ids=idfn) +def test_hash_multiple_mode_query(data_gen, conf, shuffle_split): print_params(data_gen) assert_gpu_and_cpu_are_equal_collect( lambda spark: gen_df(spark, data_gen, length=100) @@ -1132,7 +1133,10 @@ def test_hash_multiple_mode_query(data_gen, conf): f.max('a'), f.sumDistinct('b'), f.countDistinct('c') - ), conf=conf) + ), + conf=copy_and_update( + conf, + {'spark.rapids.shuffle.splitRetryRead.enabled': shuffle_split})) @approximate_float diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 703fbe80230..d23d74da524 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -242,10 +242,12 @@ def test_hash_join_ridealong_non_sized(data_gen, join_type, sub_part_enabled): @ignore_order(local=True) @pytest.mark.parametrize('data_gen', basic_nested_gens + [decimal_gen_128bit], ids=idfn) @pytest.mark.parametrize('join_type', all_symmetric_sized_join_types, ids=idfn) +@pytest.mark.parametrize('shuffle_split', [True, False], ids=idfn) @allow_non_gpu(*non_utc_allow) -def test_hash_join_ridealong_symmetric(data_gen, join_type): +def test_hash_join_ridealong_symmetric(data_gen, join_type, shuffle_split): confs = { "spark.rapids.sql.join.useShuffledSymmetricHashJoin": "true", + "spark.rapids.shuffle.splitRetryRead.enabled": shuffle_split, } hash_join_ridealong(data_gen, join_type, confs) @@ -253,10 +255,12 @@ def test_hash_join_ridealong_symmetric(data_gen, join_type): @ignore_order(local=True) @pytest.mark.parametrize('data_gen', basic_nested_gens + [decimal_gen_128bit], ids=idfn) @pytest.mark.parametrize('join_type', all_asymmetric_sized_join_types, ids=idfn) +@pytest.mark.parametrize('shuffle_split', [True, False], ids=idfn) @allow_non_gpu(*non_utc_allow) -def test_hash_join_ridealong_asymmetric(data_gen, join_type): +def test_hash_join_ridealong_asymmetric(data_gen, join_type, shuffle_split): confs = { "spark.rapids.sql.join.useShuffledAsymmetricHashJoin": "true", + "spark.rapids.shuffle.splitRetryRead.enabled": shuffle_split, } hash_join_ridealong(data_gen, join_type, confs) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala index 5b940721bad..4fc1696113a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala @@ -105,7 +105,6 @@ object GpuShuffleCoalesceUtils { reader.prefetchHeadOnHost() } } - println("===> use GpuShuffleCoalesce Reader") reader.asIterator } else { val hostIter = new HostShuffleCoalesceIterator(iter, targetSize, metricsMap)