diff --git a/.github/workflows/scheduled.yml b/.github/workflows/scheduled.yml index 17eb81ca190e..a6351e5fc8ef 100644 --- a/.github/workflows/scheduled.yml +++ b/.github/workflows/scheduled.yml @@ -604,6 +604,7 @@ jobs: --seed ${random_seed} \ --duration_sec $DURATION \ --enable_sorted_aggregations=false \ + --enable_streaming_aggregations=false \ --minloglevel=0 \ --stderrthreshold=2 \ --log_dir=/tmp/spark_aggregate_fuzzer_repro/logs \ diff --git a/velox/exec/fuzzer/AggregationFuzzer.cpp b/velox/exec/fuzzer/AggregationFuzzer.cpp index 98854f82d264..a22718133b06 100644 --- a/velox/exec/fuzzer/AggregationFuzzer.cpp +++ b/velox/exec/fuzzer/AggregationFuzzer.cpp @@ -34,6 +34,11 @@ DEFINE_bool( true, "When true, generates plans with aggregations over sorted inputs"); +DEFINE_bool( + enable_streaming_aggregations, + true, + "When true, generates plans with streaming aggregations"); + using facebook::velox::fuzzer::CallableSignature; using facebook::velox::fuzzer::SignatureTemplate; @@ -741,7 +746,7 @@ bool AggregationFuzzer::verifyAggregation( projections, tableScanPlans); - if (!groupingKeys.empty()) { + if (FLAGS_enable_streaming_aggregations && !groupingKeys.empty()) { // Use OrderBy + StreamingAggregation on original input. makeStreamingPlansWithTableScan( groupingKeys, @@ -772,7 +777,7 @@ bool AggregationFuzzer::verifyAggregation( makeAlternativePlansWithValues( groupingKeys, aggregates, masks, flatInput, projections, valuesPlans); - if (!groupingKeys.empty()) { + if (FLAGS_enable_streaming_aggregations && !groupingKeys.empty()) { // Use OrderBy + StreamingAggregation on original input. makeStreamingPlansWithValues( groupingKeys, aggregates, masks, input, projections, valuesPlans); @@ -839,7 +844,7 @@ bool AggregationFuzzer::verifySortedAggregation( std::vector plans; plans.push_back({firstPlan, {}}); - if (!groupingKeys.empty()) { + if (FLAGS_enable_streaming_aggregations && !groupingKeys.empty()) { plans.push_back( {PlanBuilder() .values(input) @@ -869,7 +874,7 @@ bool AggregationFuzzer::verifySortedAggregation( .planNode(), splits}); - if (!groupingKeys.empty()) { + if (FLAGS_enable_streaming_aggregations && !groupingKeys.empty()) { plans.push_back( {PlanBuilder() .tableScan(inputRowType) @@ -1135,7 +1140,7 @@ bool AggregationFuzzer::verifyDistinctAggregation( std::vector plans; plans.push_back({firstPlan, {}}); - if (!groupingKeys.empty()) { + if (FLAGS_enable_streaming_aggregations && !groupingKeys.empty()) { plans.push_back( {PlanBuilder() .values(input) @@ -1167,7 +1172,7 @@ bool AggregationFuzzer::verifyDistinctAggregation( .planNode(), splits}); - if (!groupingKeys.empty()) { + if (FLAGS_enable_streaming_aggregations && !groupingKeys.empty()) { plans.push_back( {PlanBuilder() .tableScan(inputRowType)