From 2a93861b1038710e875aea17fae17d4735a8ddb5 Mon Sep 17 00:00:00 2001 From: ramitg254 Date: Sat, 20 Sep 2025 17:25:20 +0530 Subject: [PATCH 1/6] HIVE-29203:get_aggr_stats_for doesn't aggregate stats when direct sql batch retrieve is enabled --- .../clientpositive/perf/batched_query16.q | 35 ++ .../perf/tpcds30tb/tez/batched_query16.q.out | 342 ++++++++++++++++++ .../hive/metastore/MetaStoreDirectSql.java | 21 +- 3 files changed, 388 insertions(+), 10 deletions(-) create mode 100644 ql/src/test/queries/clientpositive/perf/batched_query16.q create mode 100644 ql/src/test/results/clientpositive/perf/tpcds30tb/tez/batched_query16.q.out diff --git a/ql/src/test/queries/clientpositive/perf/batched_query16.q b/ql/src/test/queries/clientpositive/perf/batched_query16.q new file mode 100644 index 000000000000..14dc24654736 --- /dev/null +++ b/ql/src/test/queries/clientpositive/perf/batched_query16.q @@ -0,0 +1,35 @@ +set hive.mapred.mode=nonstrict; +set hive.auto.convert.anti.join=true; +set hive.metastore.direct.sql.batch.size=1000; +-- start query 1 in stream 0 using template query16.tpl and seed 171719422 +explain +select + count(distinct cs_order_number) as `order count` + ,sum(cs_ext_ship_cost) as `total shipping cost` + ,sum(cs_net_profit) as `total net profit` +from + catalog_sales cs1 + ,date_dim + ,customer_address + ,call_center +where + d_date between '2001-4-01' and + (cast('2001-4-01' as date) + 60 days) +and cs1.cs_ship_date_sk = d_date_sk +and cs1.cs_ship_addr_sk = ca_address_sk +and ca_state = 'NY' +and cs1.cs_call_center_sk = cc_call_center_sk +and cc_county in ('Ziebach County','Levy County','Huron County','Franklin Parish', + 'Daviess County' +) +and exists (select * + from catalog_sales cs2 + where cs1.cs_order_number = cs2.cs_order_number + and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk) +and not exists(select * + from catalog_returns cr1 + where cs1.cs_order_number = cr1.cr_order_number) +order by count(distinct cs_order_number) +limit 100; + +-- end query 1 in stream 0 using template query16.tpl diff --git a/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/batched_query16.q.out b/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/batched_query16.q.out new file mode 100644 index 000000000000..9cd4986a6e30 --- /dev/null +++ b/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/batched_query16.q.out @@ -0,0 +1,342 @@ +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Map 10 (BROADCAST_EDGE), Map 8 (BROADCAST_EDGE), Map 9 (BROADCAST_EDGE) + Map 11 <- Reducer 7 (BROADCAST_EDGE) + Map 12 <- Reducer 6 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 11 (CUSTOM_SIMPLE_EDGE) + Reducer 3 <- Map 12 (CUSTOM_SIMPLE_EDGE), Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 4 <- Reducer 3 (SIMPLE_EDGE) + Reducer 5 <- Reducer 4 (CUSTOM_SIMPLE_EDGE) + Reducer 6 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + Reducer 7 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: cs1 + filterExpr: (cs_ship_addr_sk is not null and cs_ship_date_sk is not null and cs_call_center_sk is not null) (type: boolean) + probeDecodeDetails: cacheKey:HASH_MAP_MAPJOIN_119_container, bigKeyColName:cs_call_center_sk, smallTablePos:1, keyRatio:1.8509578697501366E-10 + Statistics: Num rows: 43220864887 Data size: 11379157992136 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: 
(cs_ship_addr_sk is not null and cs_ship_date_sk is not null and cs_call_center_sk is not null) (type: boolean) + Statistics: Num rows: 42578387146 Data size: 11210006917944 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: cs_ship_date_sk (type: bigint), cs_ship_addr_sk (type: bigint), cs_call_center_sk (type: bigint), cs_warehouse_sk (type: bigint), cs_order_number (type: bigint), cs_ext_ship_cost (type: decimal(7,2)), cs_net_profit (type: decimal(7,2)) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6 + Statistics: Num rows: 42578387146 Data size: 11210006917944 Basic stats: COMPLETE Column stats: COMPLETE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col1 (type: bigint) + 1 _col0 (type: bigint) + outputColumnNames: _col0, _col2, _col3, _col4, _col5, _col6 + input vertices: + 1 Map 8 + Statistics: Num rows: 803365808 Data size: 176672786488 Basic stats: COMPLETE Column stats: COMPLETE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col2 (type: bigint) + 1 _col0 (type: bigint) + outputColumnNames: _col0, _col3, _col4, _col5, _col6 + input vertices: + 1 Map 9 + Statistics: Num rows: 160673164 Data size: 19280779808 Basic stats: COMPLETE Column stats: COMPLETE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: bigint) + 1 _col0 (type: bigint) + outputColumnNames: _col3, _col4, _col5, _col6 + input vertices: + 1 Map 10 + Statistics: Num rows: 17851352 Data size: 2142162360 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col4 (type: bigint) + null sort order: z + sort order: + + Map-reduce partition columns: _col4 (type: bigint) + Statistics: Num rows: 17851352 Data size: 2142162360 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col3 (type: bigint), _col5 (type: decimal(7,2)), _col6 (type: decimal(7,2)) + Select Operator + expressions: _col4 (type: bigint) + outputColumnNames: _col4 + Statistics: Num rows: 17851352 Data size: 142810816 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: min(_col4), max(_col4), bloom_filter(_col4, expectedEntries=1869746) + minReductionHashAggr: 0.99 + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint), _col1 (type: bigint), _col2 (type: binary) + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Map 10 + Map Operator Tree: + TableScan + alias: date_dim + filterExpr: CAST( d_date AS TIMESTAMP) BETWEEN TIMESTAMP'2001-04-01 00:00:00' AND TIMESTAMP'2001-05-31 00:00:00' (type: boolean) + Statistics: Num rows: 73049 Data size: 4675136 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: CAST( d_date AS TIMESTAMP) BETWEEN TIMESTAMP'2001-04-01 00:00:00' AND TIMESTAMP'2001-05-31 00:00:00' (type: boolean) + Statistics: Num rows: 8116 Data size: 519424 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: d_date_sk (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 8116 Data size: 64928 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: bigint) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: bigint) + Statistics: Num rows: 
8116 Data size: 64928 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Map 11 + Map Operator Tree: + TableScan + alias: cs2 + filterExpr: (cs_warehouse_sk is not null and cs_order_number BETWEEN DynamicValue(RS_29_cs1_cs_order_number_min) AND DynamicValue(RS_29_cs1_cs_order_number_max) and in_bloom_filter(cs_order_number, DynamicValue(RS_29_cs1_cs_order_number_bloom_filter))) (type: boolean) + Statistics: Num rows: 43220864887 Data size: 689811596216 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (cs_warehouse_sk is not null and cs_order_number BETWEEN DynamicValue(RS_29_cs1_cs_order_number_min) AND DynamicValue(RS_29_cs1_cs_order_number_max) and in_bloom_filter(cs_order_number, DynamicValue(RS_29_cs1_cs_order_number_bloom_filter))) (type: boolean) + Statistics: Num rows: 43005584639 Data size: 686375690624 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: cs_order_number (type: bigint), cs_warehouse_sk (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 43005584639 Data size: 686375690624 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + keys: _col0 (type: bigint), _col1 (type: bigint) + minReductionHashAggr: 0.45127505 + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 43005584639 Data size: 686375690624 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: bigint) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: bigint) + Statistics: Num rows: 43005584639 Data size: 686375690624 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: bigint) + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Map 12 + Map Operator Tree: + TableScan + alias: cr1 + filterExpr: (cr_order_number BETWEEN DynamicValue(RS_35_cs1_cs_order_number_min) AND DynamicValue(RS_35_cs1_cs_order_number_max) and in_bloom_filter(cr_order_number, DynamicValue(RS_35_cs1_cs_order_number_bloom_filter))) (type: boolean) + Statistics: Num rows: 4320980099 Data size: 34567840792 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (cr_order_number BETWEEN DynamicValue(RS_35_cs1_cs_order_number_min) AND DynamicValue(RS_35_cs1_cs_order_number_max) and in_bloom_filter(cr_order_number, DynamicValue(RS_35_cs1_cs_order_number_bloom_filter))) (type: boolean) + Statistics: Num rows: 4320980099 Data size: 34567840792 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: cr_order_number (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 4320980099 Data size: 34567840792 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + keys: _col0 (type: bigint) + minReductionHashAggr: 0.4 + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 4320980099 Data size: 34567840792 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: bigint) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: bigint) + Statistics: Num rows: 4320980099 Data size: 34567840792 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Map 8 + Map Operator Tree: + TableScan + alias: customer_address + filterExpr: (ca_state = 'NY') (type: boolean) + Statistics: Num rows: 40000000 Data size: 3760000000 Basic stats: COMPLETE Column stats: COMPLETE + Filter 
Operator + predicate: (ca_state = 'NY') (type: boolean) + Statistics: Num rows: 754717 Data size: 70943398 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: ca_address_sk (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 754717 Data size: 6037736 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: bigint) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: bigint) + Statistics: Num rows: 754717 Data size: 6037736 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Map 9 + Map Operator Tree: + TableScan + alias: call_center + filterExpr: (cc_county) IN ('Daviess County', 'Franklin Parish', 'Huron County', 'Levy County', 'Ziebach County') (type: boolean) + Statistics: Num rows: 60 Data size: 6360 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (cc_county) IN ('Daviess County', 'Franklin Parish', 'Huron County', 'Levy County', 'Ziebach County') (type: boolean) + Statistics: Num rows: 12 Data size: 1272 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: cc_call_center_sk (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 12 Data size: 96 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: bigint) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: bigint) + Statistics: Num rows: 12 Data size: 96 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Map Join Operator + condition map: + Left Semi Join 0 to 1 + keys: + 0 KEY.reducesinkkey0 (type: bigint) + 1 KEY.reducesinkkey0 (type: bigint) + outputColumnNames: _col3, _col4, _col5, _col6, _col14 + input vertices: + 1 Map 11 + residual filter predicates: {(_col3 <> _col14)} + Statistics: Num rows: 17851352 Data size: 2142162368 Basic stats: COMPLETE Column stats: COMPLETE + DynamicPartitionHashJoin: true + Select Operator + expressions: _col4 (type: bigint), _col5 (type: decimal(7,2)), _col6 (type: decimal(7,2)) + outputColumnNames: _col4, _col5, _col6 + Statistics: Num rows: 17851352 Data size: 2142162352 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col4 (type: bigint) + null sort order: z + sort order: + + Map-reduce partition columns: _col4 (type: bigint) + Statistics: Num rows: 17851352 Data size: 2142162352 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col5 (type: decimal(7,2)), _col6 (type: decimal(7,2)) + Select Operator + expressions: _col4 (type: bigint) + outputColumnNames: _col4 + Statistics: Num rows: 17851352 Data size: 142810816 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: min(_col4), max(_col4), bloom_filter(_col4, expectedEntries=1869746) + minReductionHashAggr: 0.99 + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint), _col1 (type: bigint), _col2 (type: binary) + Reducer 3 + Execution mode: vectorized, llap + Reduce Operator Tree: + Map Join Operator + condition map: + Anti Join 0 to 1 + keys: + 0 
KEY.reducesinkkey0 (type: bigint) + 1 KEY.reducesinkkey0 (type: bigint) + outputColumnNames: _col4, _col5, _col6 + input vertices: + 1 Map 12 + Statistics: Num rows: 17851352 Data size: 2142162352 Basic stats: COMPLETE Column stats: COMPLETE + DynamicPartitionHashJoin: true + Group By Operator + aggregations: sum(_col5), sum(_col6) + keys: _col4 (type: bigint) + minReductionHashAggr: 0.8952603 + mode: hash + outputColumnNames: _col0, _col2, _col3 + Statistics: Num rows: 8925676 Data size: 2070756832 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: bigint) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: bigint) + Statistics: Num rows: 8925676 Data size: 2070756832 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col2 (type: decimal(17,2)), _col3 (type: decimal(17,2)) + Reducer 4 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: sum(VALUE._col0), sum(VALUE._col1) + keys: KEY._col0 (type: bigint) + mode: partial2 + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 8925676 Data size: 2070756832 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count(_col0), sum(_col1), sum(_col2) + mode: partial2 + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 232 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 232 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint), _col1 (type: decimal(17,2)), _col2 (type: decimal(17,2)) + Reducer 5 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0), sum(VALUE._col1), sum(VALUE._col2) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 232 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 232 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 6 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, 1, expectedEntries=1869746) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint), _col1 (type: bigint), _col2 (type: binary) + Reducer 7 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, 1, expectedEntries=1869746) + mode: final + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint), _col1 (type: bigint), _col2 (type: binary) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index 12c78c347e43..d9e5e755ba96 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -1880,14 +1880,8 @@ private List columnStatisticsObjForPartitions( return Batchable.runBatched(batchSize, colNames, new Batchable() { @Override public List run(final List inputColNames) throws MetaException { - return Batchable.runBatched(batchSize, partNames, new Batchable() { - @Override - public List run(List inputPartNames) throws MetaException { - return columnStatisticsObjForPartitionsBatch(catName, dbName, tableName, inputPartNames, - inputColNames, engine, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner, - enableBitVector, enableKll); - } - }); + return columnStatisticsObjForPartitionsBatch(catName, dbName, tableName, partNames, inputColNames, engine, + areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, enableKll); } }); } @@ -1936,8 +1930,15 @@ private List columnStatisticsObjForPartitionsBatch(String c return aggrStatsUseJava(catName, dbName, tableName, partNames, colNames, engine, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, enableKll); } else { - return aggrStatsUseDB(catName, dbName, tableName, partNames, colNames, engine, areAllPartsFound, - useDensityFunctionForNDVEstimation, ndvTuner); + return Batchable.runBatched(batchSize, partNames, new Batchable() { + @Override + public List run(List inputPartNames) + throws MetaException { + return aggrStatsUseDB(catName, dbName, tableName, inputPartNames, colNames, engine, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner); + } + }); + } } From 4e9e1b9c4323d284e6fd6ad9683762dc0d52ee00 Mon Sep 17 00:00:00 2001 From: ramitg254 Date: Sat, 20 Sep 2025 23:28:07 +0530 Subject: [PATCH 2/6] dropping batched test --- .../clientpositive/perf/batched_query16.q | 35 -- .../perf/tpcds30tb/tez/batched_query16.q.out | 342 ------------------ 2 files changed, 377 deletions(-) delete mode 100644 ql/src/test/queries/clientpositive/perf/batched_query16.q delete mode 100644 ql/src/test/results/clientpositive/perf/tpcds30tb/tez/batched_query16.q.out diff --git a/ql/src/test/queries/clientpositive/perf/batched_query16.q b/ql/src/test/queries/clientpositive/perf/batched_query16.q deleted file mode 100644 index 14dc24654736..000000000000 --- a/ql/src/test/queries/clientpositive/perf/batched_query16.q +++ /dev/null @@ -1,35 +0,0 @@ -set hive.mapred.mode=nonstrict; -set hive.auto.convert.anti.join=true; -set hive.metastore.direct.sql.batch.size=1000; --- start query 1 in stream 0 using template query16.tpl and seed 171719422 -explain -select - count(distinct cs_order_number) as `order count` - ,sum(cs_ext_ship_cost) as `total shipping cost` - ,sum(cs_net_profit) as `total net profit` -from - catalog_sales cs1 - ,date_dim - ,customer_address - ,call_center -where - d_date between '2001-4-01' and - (cast('2001-4-01' as date) + 60 days) -and cs1.cs_ship_date_sk = d_date_sk -and cs1.cs_ship_addr_sk = ca_address_sk -and ca_state = 'NY' -and cs1.cs_call_center_sk = cc_call_center_sk -and cc_county in ('Ziebach County','Levy County','Huron County','Franklin Parish', - 'Daviess County' -) 
-and exists (select * - from catalog_sales cs2 - where cs1.cs_order_number = cs2.cs_order_number - and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk) -and not exists(select * - from catalog_returns cr1 - where cs1.cs_order_number = cr1.cr_order_number) -order by count(distinct cs_order_number) -limit 100; - --- end query 1 in stream 0 using template query16.tpl diff --git a/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/batched_query16.q.out b/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/batched_query16.q.out deleted file mode 100644 index 9cd4986a6e30..000000000000 --- a/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/batched_query16.q.out +++ /dev/null @@ -1,342 +0,0 @@ -STAGE DEPENDENCIES: - Stage-1 is a root stage - Stage-0 depends on stages: Stage-1 - -STAGE PLANS: - Stage: Stage-1 - Tez -#### A masked pattern was here #### - Edges: - Map 1 <- Map 10 (BROADCAST_EDGE), Map 8 (BROADCAST_EDGE), Map 9 (BROADCAST_EDGE) - Map 11 <- Reducer 7 (BROADCAST_EDGE) - Map 12 <- Reducer 6 (BROADCAST_EDGE) - Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 11 (CUSTOM_SIMPLE_EDGE) - Reducer 3 <- Map 12 (CUSTOM_SIMPLE_EDGE), Reducer 2 (CUSTOM_SIMPLE_EDGE) - Reducer 4 <- Reducer 3 (SIMPLE_EDGE) - Reducer 5 <- Reducer 4 (CUSTOM_SIMPLE_EDGE) - Reducer 6 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) - Reducer 7 <- Map 1 (CUSTOM_SIMPLE_EDGE) -#### A masked pattern was here #### - Vertices: - Map 1 - Map Operator Tree: - TableScan - alias: cs1 - filterExpr: (cs_ship_addr_sk is not null and cs_ship_date_sk is not null and cs_call_center_sk is not null) (type: boolean) - probeDecodeDetails: cacheKey:HASH_MAP_MAPJOIN_119_container, bigKeyColName:cs_call_center_sk, smallTablePos:1, keyRatio:1.8509578697501366E-10 - Statistics: Num rows: 43220864887 Data size: 11379157992136 Basic stats: COMPLETE Column stats: COMPLETE - Filter Operator - predicate: (cs_ship_addr_sk is not null and cs_ship_date_sk is not null and cs_call_center_sk is not null) (type: boolean) - Statistics: Num rows: 42578387146 Data size: 11210006917944 Basic stats: COMPLETE Column stats: COMPLETE - Select Operator - expressions: cs_ship_date_sk (type: bigint), cs_ship_addr_sk (type: bigint), cs_call_center_sk (type: bigint), cs_warehouse_sk (type: bigint), cs_order_number (type: bigint), cs_ext_ship_cost (type: decimal(7,2)), cs_net_profit (type: decimal(7,2)) - outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6 - Statistics: Num rows: 42578387146 Data size: 11210006917944 Basic stats: COMPLETE Column stats: COMPLETE - Map Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 _col1 (type: bigint) - 1 _col0 (type: bigint) - outputColumnNames: _col0, _col2, _col3, _col4, _col5, _col6 - input vertices: - 1 Map 8 - Statistics: Num rows: 803365808 Data size: 176672786488 Basic stats: COMPLETE Column stats: COMPLETE - Map Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 _col2 (type: bigint) - 1 _col0 (type: bigint) - outputColumnNames: _col0, _col3, _col4, _col5, _col6 - input vertices: - 1 Map 9 - Statistics: Num rows: 160673164 Data size: 19280779808 Basic stats: COMPLETE Column stats: COMPLETE - Map Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 _col0 (type: bigint) - 1 _col0 (type: bigint) - outputColumnNames: _col3, _col4, _col5, _col6 - input vertices: - 1 Map 10 - Statistics: Num rows: 17851352 Data size: 2142162360 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col4 (type: bigint) - null sort order: z - sort order: + - Map-reduce partition 
columns: _col4 (type: bigint) - Statistics: Num rows: 17851352 Data size: 2142162360 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col3 (type: bigint), _col5 (type: decimal(7,2)), _col6 (type: decimal(7,2)) - Select Operator - expressions: _col4 (type: bigint) - outputColumnNames: _col4 - Statistics: Num rows: 17851352 Data size: 142810816 Basic stats: COMPLETE Column stats: COMPLETE - Group By Operator - aggregations: min(_col4), max(_col4), bloom_filter(_col4, expectedEntries=1869746) - minReductionHashAggr: 0.99 - mode: hash - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - null sort order: - sort order: - Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col0 (type: bigint), _col1 (type: bigint), _col2 (type: binary) - Execution mode: vectorized, llap - LLAP IO: may be used (ACID table) - Map 10 - Map Operator Tree: - TableScan - alias: date_dim - filterExpr: CAST( d_date AS TIMESTAMP) BETWEEN TIMESTAMP'2001-04-01 00:00:00' AND TIMESTAMP'2001-05-31 00:00:00' (type: boolean) - Statistics: Num rows: 73049 Data size: 4675136 Basic stats: COMPLETE Column stats: COMPLETE - Filter Operator - predicate: CAST( d_date AS TIMESTAMP) BETWEEN TIMESTAMP'2001-04-01 00:00:00' AND TIMESTAMP'2001-05-31 00:00:00' (type: boolean) - Statistics: Num rows: 8116 Data size: 519424 Basic stats: COMPLETE Column stats: COMPLETE - Select Operator - expressions: d_date_sk (type: bigint) - outputColumnNames: _col0 - Statistics: Num rows: 8116 Data size: 64928 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: bigint) - null sort order: z - sort order: + - Map-reduce partition columns: _col0 (type: bigint) - Statistics: Num rows: 8116 Data size: 64928 Basic stats: COMPLETE Column stats: COMPLETE - Execution mode: vectorized, llap - LLAP IO: may be used (ACID table) - Map 11 - Map Operator Tree: - TableScan - alias: cs2 - filterExpr: (cs_warehouse_sk is not null and cs_order_number BETWEEN DynamicValue(RS_29_cs1_cs_order_number_min) AND DynamicValue(RS_29_cs1_cs_order_number_max) and in_bloom_filter(cs_order_number, DynamicValue(RS_29_cs1_cs_order_number_bloom_filter))) (type: boolean) - Statistics: Num rows: 43220864887 Data size: 689811596216 Basic stats: COMPLETE Column stats: COMPLETE - Filter Operator - predicate: (cs_warehouse_sk is not null and cs_order_number BETWEEN DynamicValue(RS_29_cs1_cs_order_number_min) AND DynamicValue(RS_29_cs1_cs_order_number_max) and in_bloom_filter(cs_order_number, DynamicValue(RS_29_cs1_cs_order_number_bloom_filter))) (type: boolean) - Statistics: Num rows: 43005584639 Data size: 686375690624 Basic stats: COMPLETE Column stats: COMPLETE - Select Operator - expressions: cs_order_number (type: bigint), cs_warehouse_sk (type: bigint) - outputColumnNames: _col0, _col1 - Statistics: Num rows: 43005584639 Data size: 686375690624 Basic stats: COMPLETE Column stats: COMPLETE - Group By Operator - keys: _col0 (type: bigint), _col1 (type: bigint) - minReductionHashAggr: 0.45127505 - mode: hash - outputColumnNames: _col0, _col1 - Statistics: Num rows: 43005584639 Data size: 686375690624 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: bigint) - null sort order: z - sort order: + - Map-reduce partition columns: _col0 (type: bigint) - Statistics: Num rows: 43005584639 Data size: 686375690624 Basic stats: COMPLETE 
Column stats: COMPLETE - value expressions: _col1 (type: bigint) - Execution mode: vectorized, llap - LLAP IO: may be used (ACID table) - Map 12 - Map Operator Tree: - TableScan - alias: cr1 - filterExpr: (cr_order_number BETWEEN DynamicValue(RS_35_cs1_cs_order_number_min) AND DynamicValue(RS_35_cs1_cs_order_number_max) and in_bloom_filter(cr_order_number, DynamicValue(RS_35_cs1_cs_order_number_bloom_filter))) (type: boolean) - Statistics: Num rows: 4320980099 Data size: 34567840792 Basic stats: COMPLETE Column stats: COMPLETE - Filter Operator - predicate: (cr_order_number BETWEEN DynamicValue(RS_35_cs1_cs_order_number_min) AND DynamicValue(RS_35_cs1_cs_order_number_max) and in_bloom_filter(cr_order_number, DynamicValue(RS_35_cs1_cs_order_number_bloom_filter))) (type: boolean) - Statistics: Num rows: 4320980099 Data size: 34567840792 Basic stats: COMPLETE Column stats: COMPLETE - Select Operator - expressions: cr_order_number (type: bigint) - outputColumnNames: _col0 - Statistics: Num rows: 4320980099 Data size: 34567840792 Basic stats: COMPLETE Column stats: COMPLETE - Group By Operator - keys: _col0 (type: bigint) - minReductionHashAggr: 0.4 - mode: hash - outputColumnNames: _col0 - Statistics: Num rows: 4320980099 Data size: 34567840792 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: bigint) - null sort order: z - sort order: + - Map-reduce partition columns: _col0 (type: bigint) - Statistics: Num rows: 4320980099 Data size: 34567840792 Basic stats: COMPLETE Column stats: COMPLETE - Execution mode: vectorized, llap - LLAP IO: may be used (ACID table) - Map 8 - Map Operator Tree: - TableScan - alias: customer_address - filterExpr: (ca_state = 'NY') (type: boolean) - Statistics: Num rows: 40000000 Data size: 3760000000 Basic stats: COMPLETE Column stats: COMPLETE - Filter Operator - predicate: (ca_state = 'NY') (type: boolean) - Statistics: Num rows: 754717 Data size: 70943398 Basic stats: COMPLETE Column stats: COMPLETE - Select Operator - expressions: ca_address_sk (type: bigint) - outputColumnNames: _col0 - Statistics: Num rows: 754717 Data size: 6037736 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: bigint) - null sort order: z - sort order: + - Map-reduce partition columns: _col0 (type: bigint) - Statistics: Num rows: 754717 Data size: 6037736 Basic stats: COMPLETE Column stats: COMPLETE - Execution mode: vectorized, llap - LLAP IO: may be used (ACID table) - Map 9 - Map Operator Tree: - TableScan - alias: call_center - filterExpr: (cc_county) IN ('Daviess County', 'Franklin Parish', 'Huron County', 'Levy County', 'Ziebach County') (type: boolean) - Statistics: Num rows: 60 Data size: 6360 Basic stats: COMPLETE Column stats: COMPLETE - Filter Operator - predicate: (cc_county) IN ('Daviess County', 'Franklin Parish', 'Huron County', 'Levy County', 'Ziebach County') (type: boolean) - Statistics: Num rows: 12 Data size: 1272 Basic stats: COMPLETE Column stats: COMPLETE - Select Operator - expressions: cc_call_center_sk (type: bigint) - outputColumnNames: _col0 - Statistics: Num rows: 12 Data size: 96 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: bigint) - null sort order: z - sort order: + - Map-reduce partition columns: _col0 (type: bigint) - Statistics: Num rows: 12 Data size: 96 Basic stats: COMPLETE Column stats: COMPLETE - Execution mode: vectorized, llap - LLAP IO: may be used (ACID table) - Reducer 2 - Execution 
mode: llap - Reduce Operator Tree: - Map Join Operator - condition map: - Left Semi Join 0 to 1 - keys: - 0 KEY.reducesinkkey0 (type: bigint) - 1 KEY.reducesinkkey0 (type: bigint) - outputColumnNames: _col3, _col4, _col5, _col6, _col14 - input vertices: - 1 Map 11 - residual filter predicates: {(_col3 <> _col14)} - Statistics: Num rows: 17851352 Data size: 2142162368 Basic stats: COMPLETE Column stats: COMPLETE - DynamicPartitionHashJoin: true - Select Operator - expressions: _col4 (type: bigint), _col5 (type: decimal(7,2)), _col6 (type: decimal(7,2)) - outputColumnNames: _col4, _col5, _col6 - Statistics: Num rows: 17851352 Data size: 2142162352 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col4 (type: bigint) - null sort order: z - sort order: + - Map-reduce partition columns: _col4 (type: bigint) - Statistics: Num rows: 17851352 Data size: 2142162352 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col5 (type: decimal(7,2)), _col6 (type: decimal(7,2)) - Select Operator - expressions: _col4 (type: bigint) - outputColumnNames: _col4 - Statistics: Num rows: 17851352 Data size: 142810816 Basic stats: COMPLETE Column stats: COMPLETE - Group By Operator - aggregations: min(_col4), max(_col4), bloom_filter(_col4, expectedEntries=1869746) - minReductionHashAggr: 0.99 - mode: hash - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - null sort order: - sort order: - Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col0 (type: bigint), _col1 (type: bigint), _col2 (type: binary) - Reducer 3 - Execution mode: vectorized, llap - Reduce Operator Tree: - Map Join Operator - condition map: - Anti Join 0 to 1 - keys: - 0 KEY.reducesinkkey0 (type: bigint) - 1 KEY.reducesinkkey0 (type: bigint) - outputColumnNames: _col4, _col5, _col6 - input vertices: - 1 Map 12 - Statistics: Num rows: 17851352 Data size: 2142162352 Basic stats: COMPLETE Column stats: COMPLETE - DynamicPartitionHashJoin: true - Group By Operator - aggregations: sum(_col5), sum(_col6) - keys: _col4 (type: bigint) - minReductionHashAggr: 0.8952603 - mode: hash - outputColumnNames: _col0, _col2, _col3 - Statistics: Num rows: 8925676 Data size: 2070756832 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: bigint) - null sort order: z - sort order: + - Map-reduce partition columns: _col0 (type: bigint) - Statistics: Num rows: 8925676 Data size: 2070756832 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col2 (type: decimal(17,2)), _col3 (type: decimal(17,2)) - Reducer 4 - Execution mode: vectorized, llap - Reduce Operator Tree: - Group By Operator - aggregations: sum(VALUE._col0), sum(VALUE._col1) - keys: KEY._col0 (type: bigint) - mode: partial2 - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 8925676 Data size: 2070756832 Basic stats: COMPLETE Column stats: COMPLETE - Group By Operator - aggregations: count(_col0), sum(_col1), sum(_col2) - mode: partial2 - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 1 Data size: 232 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - null sort order: - sort order: - Statistics: Num rows: 1 Data size: 232 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col0 (type: bigint), _col1 (type: decimal(17,2)), _col2 (type: decimal(17,2)) - Reducer 5 - Execution 
mode: vectorized, llap - Reduce Operator Tree: - Group By Operator - aggregations: count(VALUE._col0), sum(VALUE._col1), sum(VALUE._col2) - mode: mergepartial - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 1 Data size: 232 Basic stats: COMPLETE Column stats: COMPLETE - File Output Operator - compressed: false - Statistics: Num rows: 1 Data size: 232 Basic stats: COMPLETE Column stats: COMPLETE - table: - input format: org.apache.hadoop.mapred.SequenceFileInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - Reducer 6 - Execution mode: vectorized, llap - Reduce Operator Tree: - Group By Operator - aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, 1, expectedEntries=1869746) - mode: final - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - null sort order: - sort order: - Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col0 (type: bigint), _col1 (type: bigint), _col2 (type: binary) - Reducer 7 - Execution mode: vectorized, llap - Reduce Operator Tree: - Group By Operator - aggregations: min(VALUE._col0), max(VALUE._col1), bloom_filter(VALUE._col2, 1, expectedEntries=1869746) - mode: final - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - null sort order: - sort order: - Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col0 (type: bigint), _col1 (type: bigint), _col2 (type: binary) - - Stage: Stage-0 - Fetch Operator - limit: -1 - Processor Tree: - ListSink - From b56681625ebed7ff057847cdeedd52bd25dee41f Mon Sep 17 00:00:00 2001 From: ramitg254 Date: Wed, 24 Sep 2025 12:28:50 +0530 Subject: [PATCH 3/6] altered behaviour for batches in case kll and bit vector both are disabled --- .../hive/metastore/MetaStoreDirectSql.java | 458 ++++++++++-------- .../metastore/MetastoreDirectSqlUtils.java | 48 ++ 2 files changed, 297 insertions(+), 209 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index d9e5e755ba96..e66429488e0c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -1930,18 +1930,48 @@ private List columnStatisticsObjForPartitionsBatch(String c return aggrStatsUseJava(catName, dbName, tableName, partNames, colNames, engine, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, enableKll); } else { - return Batchable.runBatched(batchSize, partNames, new Batchable() { - @Override - public List run(List inputPartNames) - throws MetaException { - return aggrStatsUseDB(catName, dbName, tableName, inputPartNames, colNames, engine, areAllPartsFound, - useDensityFunctionForNDVEstimation, ndvTuner); - } - }); - + return aggrStatsUseDB(catName, dbName, tableName, partNames, colNames, engine, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner); } } + private Batchable jobsBatching(final String queryText0, final String catName, final 
String dbName, + final String tableName, final List partNames, final String engine, final boolean doTrace) { + return new Batchable() { + @Override + public List run(final List inputColNames) + throws MetaException { + Batchable b2 = new Batchable() { + @Override + public List run(List inputPartNames) + throws MetaException { + String queryText = + String.format(queryText0, makeParams(inputColNames.size()), makeParams(inputPartNames.size())); + long start = doTrace ? System.nanoTime() : 0; + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + try { + Object qResult = executeWithArray(query, + prepareParams(catName, dbName, tableName, inputPartNames, inputColNames, engine), queryText); + long end = doTrace ? System.nanoTime() : 0; + MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0, start, end); + if (qResult == null) { + return Collections.emptyList(); + } + return MetastoreDirectSqlUtils.ensureList(qResult); + } finally { + addQueryAfterUse(query); + } + } + }; + try { + return Batchable.runBatched(batchSize, partNames, b2); + } finally { + addQueryAfterUse(b2); + } + } + }; + } + private List aggrStatsUseJava(String catName, String dbName, String tableName, List partNames, List colNames, String engine, boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector, @@ -1954,9 +1984,22 @@ private List aggrStatsUseJava(String catName, String dbName areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner); } - private List aggrStatsUseDB(String catName, String dbName, - String tableName, List partNames, List colNames, String engine, - boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { + private Map> columnWiseSubList (List list){ + Map> colSubList = new HashMap<>(); + for (Object[] row : list) { + String colName = (String) row[0]; + if (!colSubList.containsKey(colName)) { + colSubList.put(colName, new ArrayList<>()); + } + colSubList.get(colName).add(row); + } + return colSubList; + } + + private List aggrStatsUseDB(String catName, String dbName, String tableName, + List partNames, List colNames, String engine, boolean areAllPartsFound, + boolean useDensityFunctionForNDVEstimation, double ndvTuner) + throws MetaException { // TODO: all the extrapolation logic should be moved out of this class, // only mechanical data retrieval should remain here. String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", " @@ -1977,116 +2020,95 @@ private List aggrStatsUseDB(String catName, String dbName, // And, we also guarantee that the estimation makes sense by comparing it to the // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")") // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")") - + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," - + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," - + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," - + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + "" - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? 
"; + + "sum((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," + + "count((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," + + "sum((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," + + "count((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," + + "sum((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," + + "count((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," + + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + "" + " inner join " + PARTITIONS + " on " + + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + " inner join " + TBLS + " on " + PARTITIONS + + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + + ".\"DB_ID\"" + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + + ".\"TBL_NAME\" = ? "; String queryText = null; - long start = 0; - long end = 0; boolean doTrace = LOG.isDebugEnabled(); ForwardQueryResult fqr = null; // Check if the status of all the columns of all the partitions exists // Extrapolation is not needed. if (areAllPartsFound) { - queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")" - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and \"ENGINE\" = ? " - + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; - start = doTrace ? System.nanoTime() : 0; - try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, colNames, - engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - List list = MetastoreDirectSqlUtils.ensureList(qResult); - List colStats = - new ArrayList(list.size()); - for (Object[] row : list) { - colStats.add(prepareCSObjWithAdjustedNDV(row, 0, - useDensityFunctionForNDVEstimation, ndvTuner)); + queryText = commonPrefix + " and \"COLUMN_NAME\" in (%1$s)" + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)" + + " and \"ENGINE\" = ? " + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; + Batchable b = jobsBatching(queryText, catName, dbName, tableName, partNames, engine, doTrace); + List colStats = new ArrayList(colNames.size()); + try { + List list = Batchable.runBatched(batchSize, colNames, b); + Map> colSubList = columnWiseSubList(list); + for (String colName : colSubList.keySet()) { + colStats.add(CSObjWithAdjustedNDV(colSubList.get(colName), 0, useDensityFunctionForNDVEstimation, ndvTuner)); Deadline.checkTimeout(); } - return colStats; + } finally { + b.closeAllQueries(); } + return colStats; } else { // Extrapolation is needed for some columns. // In this case, at least a column status for a partition is missing. 
// We need to extrapolate this partition based on the other partitions List colStats = new ArrayList(colNames.size()); - queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PART_COL_STATS\".\"PART_ID\") " - + " from " + PART_COL_STATS - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")" - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\", " + PART_COL_STATS + ".\"COLUMN_TYPE\""; - start = doTrace ? System.nanoTime() : 0; + queryText = + "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PART_COL_STATS\".\"PART_ID\") " + " from " + PART_COL_STATS + + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + " inner join " + + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" + " where " + DBS + ".\"CTLG_NAME\" = ? and " + + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " + " and " + PART_COL_STATS + + ".\"COLUMN_NAME\" in (%1$s)" + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)" + " and " + + PART_COL_STATS + ".\"ENGINE\" = ? " + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\", " + + PART_COL_STATS + ".\"COLUMN_TYPE\""; + + Batchable b = jobsBatching(queryText, catName, dbName, tableName, partNames, engine, doTrace); List noExtraColumnNames = new ArrayList(); Map extraColumnNameTypeParts = new HashMap(); - try(QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, colNames, - engine), queryText); - end = doTrace ? 
System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - if (qResult == null) { - return Collections.emptyList(); - } - - List list = MetastoreDirectSqlUtils.ensureList(qResult); - for (Object[] row : list) { - String colName = (String) row[0]; - String colType = (String) row[1]; - // Extrapolation is not needed for this column if - // count(\"PARTITION_NAME\")==partNames.size() - // Or, extrapolation is not possible for this column if - // count(\"PARTITION_NAME\")<2 - Long count = MetastoreDirectSqlUtils.extractSqlLong(row[2]); + try { + List list = Batchable.runBatched(batchSize, colNames, b); + Map> colSubList = columnWiseSubList(list); + for (String colName : colSubList.keySet()) { + List subList = colSubList.get(colName); + String colType = (String) subList.getFirst()[1]; + Object countObj = null; + for (Object[] row : subList) { + countObj = MetastoreDirectSqlUtils.sum(countObj, row[2], 1); + } + Long count = MetastoreDirectSqlUtils.extractSqlLong(countObj); if (count == partNames.size() || count < 2) { noExtraColumnNames.add(colName); } else { - extraColumnNameTypeParts.put(colName, new String[] {colType, String.valueOf(count)}); + extraColumnNameTypeParts.put(colName, new String[]{colType, String.valueOf(count)}); } Deadline.checkTimeout(); } + } finally { + b.closeAllQueries(); } // Extrapolation is not needed for columns noExtraColumnNames List list; if (noExtraColumnNames.size() != 0) { - queryText = commonPrefix + " and \"COLUMN_NAME\" in (" - + makeParams(noExtraColumnNames.size()) + ")" + " and \"PARTITION_NAME\" in (" - + makeParams(partNames.size()) + ")" - + " and \"ENGINE\" = ? " - + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; - start = doTrace ? System.nanoTime() : 0; - - try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, noExtraColumnNames, engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - list = MetastoreDirectSqlUtils.ensureList(qResult); - for (Object[] row : list) { - colStats.add(prepareCSObjWithAdjustedNDV(row, 0, - useDensityFunctionForNDVEstimation, ndvTuner)); + queryText = commonPrefix + " and \"COLUMN_NAME\" in (%1$s)" + " and \"PARTITION_NAME\" in (%2$s)" + + " and \"ENGINE\" = ? " + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; + + b = jobsBatching(queryText, catName, dbName, tableName, partNames, engine, doTrace); + try { + list = Batchable.runBatched(batchSize, noExtraColumnNames, b); + Map> colSubList = columnWiseSubList(list); + for (String colName : colSubList.keySet()) { + colStats.add( + CSObjWithAdjustedNDV(colSubList.get(colName), 0, useDensityFunctionForNDVEstimation, ndvTuner)); Deadline.checkTimeout(); } - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); + } finally { + b.closeAllQueries(); } } // Extrapolation is needed for extraColumnNames. 
@@ -2098,29 +2120,38 @@ private List aggrStatsUseDB(String catName, String dbName, } // get sum for all columns to reduce the number of queries Map> sumMap = new HashMap>(); - queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")" - + " from " + PART_COL_STATS - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size()) + ")" - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\""; - start = doTrace ? System.nanoTime() : 0; - try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { + queryText = + "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")" + + " from " + PART_COL_STATS + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + + PARTITIONS + ".\"PART_ID\"" + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + + ".\"TBL_ID\"" + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" + + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " + + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size()) + ")" + + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" + " and " + + PART_COL_STATS + ".\"ENGINE\" = ? 
" + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\""; + + b = jobsBatching(queryText, catName, dbName, tableName, partNames, engine, doTrace); + try { List extraColumnNames = new ArrayList(); extraColumnNames.addAll(extraColumnNameTypeParts.keySet()); - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, - extraColumnNames, engine), queryText); - if (qResult == null) { - return Collections.emptyList(); + List unmergedList = Batchable.runBatched(batchSize, extraColumnNames, b); + Map> colSubList = columnWiseSubList(unmergedList); + List mergedList = new ArrayList<>(); + for (String colName : colSubList.keySet()) { + Object[] mergedRow = new Object[5]; + List subList = colSubList.get(colName); + mergedRow[0] = colName; + for (Object[] row : subList) { + mergedRow[1] = MetastoreDirectSqlUtils.sum(mergedRow[1], row[1], 1); + mergedRow[2] = MetastoreDirectSqlUtils.sum(mergedRow[2], row[2], 1); + mergedRow[3] = MetastoreDirectSqlUtils.sum(mergedRow[3], row[3], 1); + mergedRow[4] = MetastoreDirectSqlUtils.sum(mergedRow[4], row[4], 1); + } + mergedList.add(mergedRow); } - list = MetastoreDirectSqlUtils.ensureList(qResult); + list = mergedList; // see the indexes for colstats in IExtrapolatePartStatus - Integer[] sumIndex = new Integer[] {6, 10, 11, 15}; + Integer[] sumIndex = new Integer[]{6, 10, 11, 15}; for (Object[] row : list) { Map indexToObject = new HashMap(); for (int ind = 1; ind < row.length; ind++) { @@ -2130,9 +2161,10 @@ private List aggrStatsUseDB(String catName, String dbName, sumMap.put((String) row[0], indexToObject); Deadline.checkTimeout(); } - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); + } finally { + b.closeAllQueries(); } + for (Map.Entry entry : extraColumnNameTypeParts.entrySet()) { Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2]; String colName = entry.getKey(); @@ -2159,6 +2191,37 @@ private List aggrStatsUseDB(String catName, String dbName, if (index == null) { index = IExtrapolatePartStatus.indexMaps.get("default"); } + + //for avg calculation + queryText = "select " + "sum((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," + + "count((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," + + "sum((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," + + "count((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," + + "sum((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," + + "count((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," + + " from " + PART_COL_STATS + "" + " inner join " + PARTITIONS + " on " + PART_COL_STATS + + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + " inner join " + TBLS + " on " + PARTITIONS + + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + + ".\"DB_ID\"" + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + + ".\"TBL_NAME\" = ? " + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)" + " and " + PARTITIONS + + ".\"PART_NAME\" in (%2$s)" + " and " + PART_COL_STATS + ".\"ENGINE\" = ? 
" + + " group by \"COLUMN_NAME\""; + + b = jobsBatching(queryText, catName, dbName, tableName, partNames, engine, doTrace); + Object[] avg = new Object[3]; + try { + list = Batchable.runBatched(batchSize, Arrays.asList(colName), b); + for (int i = 0; i < 6; i += 2) { + Object sum = null, count = null; + for (Object[] batch : list) { + sum = MetastoreDirectSqlUtils.sum(sum, batch[i], 0); + count = MetastoreDirectSqlUtils.sum(count, batch[i + 1], 0); + } + avg[i / 2] = MetastoreDirectSqlUtils.divide(sum, count, 0); + } + } finally { + b.closeAllQueries(); + } for (int colStatIndex : index) { String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex]; // if the aggregation type is sum, we do a scale-up @@ -2174,76 +2237,35 @@ private List aggrStatsUseDB(String catName, String dbName, || IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Max) { // if the aggregation type is min/max, we extrapolate from the // left/right borders - if (!decimal) { - queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " + PART_COL_STATS - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? " - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " order by \"" + colStatName + "\""; - } else { - queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " + PART_COL_STATS - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? " - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " order by cast(\"" + colStatName + "\" as decimal)"; - } - start = doTrace ? System.nanoTime() : 0; - try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - fqr = (ForwardQueryResult) qResult; - Object[] min = (Object[]) (fqr.get(0)); - Object[] max = (Object[]) (fqr.get(fqr.size() - 1)); - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); + String orderByExpr = decimal ? "cast(\"" + colStatName + "\" as decimal)" : "\"" + colStatName + "\""; + + queryText = + "select \"" + colStatName + "\",\"PART_NAME\" from " + PART_COL_STATS + " inner join " + PARTITIONS + + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + " inner join " + + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + " inner join " + DBS + + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" + " where " + DBS + + ".\"CTLG_NAME\" = ? 
and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " + " and " + + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)" + " and " + PARTITIONS + + ".\"PART_NAME\" in (%2$s)" + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " + " order by " + + orderByExpr; + + b = jobsBatching(queryText, catName, dbName, tableName, partNames, engine, doTrace); + try { + list = Batchable.runBatched(batchSize, Arrays.asList(colName), b); + Object[] min = list.getFirst(); + Object[] max = list.getLast(); if (min[0] == null || max[0] == null) { row[2 + colStatIndex] = null; } else { - row[2 + colStatIndex] = extrapolateMethod - .extrapolate(min, max, colStatIndex, indexMap); + row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, max, colStatIndex, indexMap); } + } finally { + b.closeAllQueries(); } } else { - // if the aggregation type is avg, we use the average on the existing ones. - queryText = "select " - + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," - + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," - + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")" - + " from " + PART_COL_STATS + "" - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? " - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " group by \"COLUMN_NAME\""; - start = doTrace ? System.nanoTime() : 0; - try(QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - fqr = (ForwardQueryResult) qResult; - Object[] avg = (Object[]) (fqr.get(0)); - // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE", - // "AVG_DECIMAL" - row[2 + colStatIndex] = avg[colStatIndex - 12]; - end = doTrace ? 
System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - } + // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE", + // "AVG_DECIMAL" + row[2 + colStatIndex] = avg[colStatIndex - 12]; } } colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner)); @@ -2254,6 +2276,54 @@ private List aggrStatsUseDB(String catName, String dbName, } } + private ColumnStatisticsObj CSObjWithAdjustedNDV(List list, int i, + boolean useDensityFunctionForNDVEstimation, double ndvTuner) + throws MetaException { + if (list.isEmpty()) { + return null; + } + ColumnStatisticsData data = new ColumnStatisticsData(); + int j = i; + Object[] row = list.getFirst(); + String colName = (String) row[j++]; + String colType = (String) row[j++]; + ColumnStatisticsObj cso = new ColumnStatisticsObj(colName, colType, data); + Object llow = row[j++], lhigh = row[j++], dlow = row[j++], dhigh = row[j++], declow = row[j++], dechigh = row[j++], + nulls = row[j++], dist = row[j++], avglen = row[j++], maxlen = row[j++], trues = row[j++], falses = row[j++], + sumLong = row[j++], countLong = row[j++], sumDouble = row[j++], countDouble = row[j++], sumDecimal = row[j++], + countDecimal = row[j++], sumDist = row[j++]; + for (int k = 1; k < list.size(); k++) { + j = i + 2; + row = list.get(k); + llow = MetastoreDirectSqlUtils.min(llow, row[j++], 1); + lhigh = MetastoreDirectSqlUtils.max(lhigh, row[j++], 1); + dlow = MetastoreDirectSqlUtils.min(dlow, row[j++], 0); + dhigh = MetastoreDirectSqlUtils.max(dhigh, row[j++], 0); + declow = MetastoreDirectSqlUtils.min(declow, row[j++], 0); + dechigh = MetastoreDirectSqlUtils.max(dechigh, row[j++], 0); + nulls = MetastoreDirectSqlUtils.sum(nulls, row[j++], 1); + dist = MetastoreDirectSqlUtils.max(dist, row[j++], 1); + avglen = MetastoreDirectSqlUtils.max(avglen, row[j++], 0); + maxlen = MetastoreDirectSqlUtils.max(maxlen, row[j++], 1); + trues = MetastoreDirectSqlUtils.sum(trues, row[j++], 1); + falses = MetastoreDirectSqlUtils.sum(falses, row[j++], 1); + sumLong = MetastoreDirectSqlUtils.sum(sumLong, row[j++], 0); + countLong = MetastoreDirectSqlUtils.sum(countLong, row[j++], 0); + sumDouble = MetastoreDirectSqlUtils.sum(sumDouble, row[j++], 0); + countDouble = MetastoreDirectSqlUtils.sum(countDouble, row[j++], 0); + sumDecimal = MetastoreDirectSqlUtils.sum(sumDecimal, row[j++], 0); + countDecimal = MetastoreDirectSqlUtils.sum(countDecimal, row[j++], 0); + sumDist = MetastoreDirectSqlUtils.sum(sumDist, row[j],1); + } + Object avgLong = MetastoreDirectSqlUtils.divide(sumLong, countLong, 0); + Object avgDouble = MetastoreDirectSqlUtils.divide(sumDouble, countDouble, 0); + Object avgDecimal = MetastoreDirectSqlUtils.divide(sumDecimal, countDecimal, 0); + StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, llow, lhigh, dlow, dhigh, declow, dechigh, + nulls, dist, avglen, maxlen, trues, falses, avgLong, avgDouble, avgDecimal, sumDist, + useDensityFunctionForNDVEstimation, ndvTuner); + return cso; + } + private ColumnStatisticsObj prepareCSObj (Object[] row, int i) throws MetaException { ColumnStatisticsData data = new ColumnStatisticsData(); ColumnStatisticsObj cso = new ColumnStatisticsObj((String)row[i++], (String)row[i++], data); @@ -2314,37 +2384,7 @@ public List getPartitionStats( + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)" + " and " + PART_COL_STATS + ".\"ENGINE\" = ? 
" + " order by " + PARTITIONS + ".\"PART_NAME\""; - Batchable b = new Batchable() { - @Override - public List run(final List inputColNames) throws MetaException { - Batchable b2 = new Batchable() { - @Override - public List run(List inputPartNames) throws MetaException { - String queryText = String.format(queryText0, - makeParams(inputColNames.size()), makeParams(inputPartNames.size())); - long start = doTrace ? System.nanoTime() : 0; - Query query = pm.newQuery("javax.jdo.query.SQL", queryText); - try { - Object qResult = executeWithArray(query, prepareParams( - catName, dbName, tableName, inputPartNames, inputColNames, engine), queryText); - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0, start, (doTrace ? System.nanoTime() : 0)); - if (qResult == null) { - return Collections.emptyList(); - } - return MetastoreDirectSqlUtils.ensureList(qResult); - } finally { - addQueryAfterUse(query); - } - } - }; - try { - return Batchable.runBatched(batchSize, partNames, b2); - } finally { - addQueryAfterUse(b2); - } - } - }; - + Batchable b = jobsBatching(queryText0, catName, dbName, tableName, partNames, engine, doTrace); List result = new ArrayList(partNames.size()); String lastPartName = null; int from = 0; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java index 45e89ab40df5..12aba51aaa09 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java @@ -646,4 +646,52 @@ public static void throwMetaOrRuntimeException(Exception e) throws MetaException throw new RuntimeException(e); } } + + public static Object sum(Object a, Object b, int type) + throws MetaException { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + return (type == 0) ? MetastoreDirectSqlUtils.extractSqlDouble(a) + MetastoreDirectSqlUtils.extractSqlDouble(b) + : MetastoreDirectSqlUtils.extractSqlLong(a) + MetastoreDirectSqlUtils.extractSqlLong(b); + } + + public static Object divide(Object a, Object b, int type) + throws MetaException { + if (a == null || b == null || (type == 0 && MetastoreDirectSqlUtils.extractSqlDouble(b) == 0) + || MetastoreDirectSqlUtils.extractSqlDouble(b) == 0) { + return null; + } + return (type == 0) ? MetastoreDirectSqlUtils.extractSqlDouble(a) / MetastoreDirectSqlUtils.extractSqlDouble(b) + : MetastoreDirectSqlUtils.extractSqlLong(a) / MetastoreDirectSqlUtils.extractSqlLong(b); + } + + public static Object min(Object a, Object b, int type) + throws MetaException { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + return (type == 0) ? Math.min(MetastoreDirectSqlUtils.extractSqlDouble(a), + MetastoreDirectSqlUtils.extractSqlDouble(b)) + : Math.min(MetastoreDirectSqlUtils.extractSqlLong(a), MetastoreDirectSqlUtils.extractSqlLong(b)); + } + + public static Object max(Object a, Object b, int type) + throws MetaException { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + return (type == 0) ? 
Math.max(MetastoreDirectSqlUtils.extractSqlDouble(a), + MetastoreDirectSqlUtils.extractSqlDouble(b)) + : Math.max(MetastoreDirectSqlUtils.extractSqlLong(a), MetastoreDirectSqlUtils.extractSqlLong(b)); + } } From fdfa6d5c06c2a5091573554398a87972bd1283c0 Mon Sep 17 00:00:00 2001 From: ramitg254 Date: Wed, 24 Sep 2025 14:09:07 +0530 Subject: [PATCH 4/6] Revert altered behaviour for batches in case kll and bit vector both are disabled --- .../hive/metastore/MetaStoreDirectSql.java | 458 ++++++++---------- .../metastore/MetastoreDirectSqlUtils.java | 48 -- 2 files changed, 209 insertions(+), 297 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index e66429488e0c..d9e5e755ba96 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -1930,46 +1930,16 @@ private List columnStatisticsObjForPartitionsBatch(String c return aggrStatsUseJava(catName, dbName, tableName, partNames, colNames, engine, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, enableKll); } else { - return aggrStatsUseDB(catName, dbName, tableName, partNames, colNames, engine, areAllPartsFound, - useDensityFunctionForNDVEstimation, ndvTuner); - } - } - - private Batchable jobsBatching(final String queryText0, final String catName, final String dbName, - final String tableName, final List partNames, final String engine, final boolean doTrace) { - return new Batchable() { - @Override - public List run(final List inputColNames) - throws MetaException { - Batchable b2 = new Batchable() { - @Override - public List run(List inputPartNames) - throws MetaException { - String queryText = - String.format(queryText0, makeParams(inputColNames.size()), makeParams(inputPartNames.size())); - long start = doTrace ? System.nanoTime() : 0; - Query query = pm.newQuery("javax.jdo.query.SQL", queryText); - try { - Object qResult = executeWithArray(query, - prepareParams(catName, dbName, tableName, inputPartNames, inputColNames, engine), queryText); - long end = doTrace ? 
System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0, start, end); - if (qResult == null) { - return Collections.emptyList(); - } - return MetastoreDirectSqlUtils.ensureList(qResult); - } finally { - addQueryAfterUse(query); - } - } - }; - try { - return Batchable.runBatched(batchSize, partNames, b2); - } finally { - addQueryAfterUse(b2); + return Batchable.runBatched(batchSize, partNames, new Batchable() { + @Override + public List run(List inputPartNames) + throws MetaException { + return aggrStatsUseDB(catName, dbName, tableName, inputPartNames, colNames, engine, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner); } - } - }; + }); + + } } private List aggrStatsUseJava(String catName, String dbName, String tableName, @@ -1984,22 +1954,9 @@ private List aggrStatsUseJava(String catName, String dbName areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner); } - private Map> columnWiseSubList (List list){ - Map> colSubList = new HashMap<>(); - for (Object[] row : list) { - String colName = (String) row[0]; - if (!colSubList.containsKey(colName)) { - colSubList.put(colName, new ArrayList<>()); - } - colSubList.get(colName).add(row); - } - return colSubList; - } - - private List aggrStatsUseDB(String catName, String dbName, String tableName, - List partNames, List colNames, String engine, boolean areAllPartsFound, - boolean useDensityFunctionForNDVEstimation, double ndvTuner) - throws MetaException { + private List aggrStatsUseDB(String catName, String dbName, + String tableName, List partNames, List colNames, String engine, + boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { // TODO: all the extrapolation logic should be moved out of this class, // only mechanical data retrieval should remain here. String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", " @@ -2020,95 +1977,116 @@ private List aggrStatsUseDB(String catName, String dbName, // And, we also guarantee that the estimation makes sense by comparing it to the // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")") // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")") - + "sum((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," - + "count((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," - + "sum((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," - + "count((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," - + "sum((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," - + "count((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," - + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + "" + " inner join " + PARTITIONS + " on " - + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + " inner join " + TBLS + " on " + PARTITIONS - + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS - + ".\"DB_ID\"" + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS - + ".\"TBL_NAME\" = ? 
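// The revert above keeps the single-query aggrStatsUseDB path intact and simply
// slices the partition list in front of it. The shape of that pattern, reduced
// to a standalone sketch (this is not the metastore's actual Batchable, which
// additionally tracks created JDO queries so they can be closed later):
import java.util.ArrayList;
import java.util.List;

abstract class BatchJob<I, R> {
  abstract List<R> run(List<I> chunk) throws Exception;

  static <I, R> List<R> runBatched(int batchSize, List<I> items, BatchJob<I, R> job) throws Exception {
    if (batchSize <= 0 || items.size() <= batchSize) {
      return job.run(items);  // batching disabled, or input already small enough
    }
    List<R> out = new ArrayList<>();
    for (int from = 0; from < items.size(); from += batchSize) {
      out.addAll(job.run(items.subList(from, Math.min(from + batchSize, items.size()))));
    }
    return out;
  }
}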
"; + + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," + + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," + + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," + + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + "" + + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" + + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "; String queryText = null; + long start = 0; + long end = 0; boolean doTrace = LOG.isDebugEnabled(); ForwardQueryResult fqr = null; // Check if the status of all the columns of all the partitions exists // Extrapolation is not needed. if (areAllPartsFound) { - queryText = commonPrefix + " and \"COLUMN_NAME\" in (%1$s)" + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)" - + " and \"ENGINE\" = ? " + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; - Batchable b = jobsBatching(queryText, catName, dbName, tableName, partNames, engine, doTrace); - List colStats = new ArrayList(colNames.size()); - try { - List list = Batchable.runBatched(batchSize, colNames, b); - Map> colSubList = columnWiseSubList(list); - for (String colName : colSubList.keySet()) { - colStats.add(CSObjWithAdjustedNDV(colSubList.get(colName), 0, useDensityFunctionForNDVEstimation, ndvTuner)); + queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")" + + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" + + " and \"ENGINE\" = ? " + + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; + start = doTrace ? System.nanoTime() : 0; + try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { + Object qResult = executeWithArray(query.getInnerQuery(), + prepareParams(catName, dbName, tableName, partNames, colNames, + engine), queryText); + if (qResult == null) { + return Collections.emptyList(); + } + end = doTrace ? System.nanoTime() : 0; + MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); + List list = MetastoreDirectSqlUtils.ensureList(qResult); + List colStats = + new ArrayList(list.size()); + for (Object[] row : list) { + colStats.add(prepareCSObjWithAdjustedNDV(row, 0, + useDensityFunctionForNDVEstimation, ndvTuner)); Deadline.checkTimeout(); } - } finally { - b.closeAllQueries(); + return colStats; } - return colStats; } else { // Extrapolation is needed for some columns. // In this case, at least a column status for a partition is missing. // We need to extrapolate this partition based on the other partitions List colStats = new ArrayList(colNames.size()); - queryText = - "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PART_COL_STATS\".\"PART_ID\") " + " from " + PART_COL_STATS - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + " inner join " - + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" + " where " + DBS + ".\"CTLG_NAME\" = ? and " - + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? 
" + " and " + PART_COL_STATS - + ".\"COLUMN_NAME\" in (%1$s)" + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)" + " and " - + PART_COL_STATS + ".\"ENGINE\" = ? " + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\", " - + PART_COL_STATS + ".\"COLUMN_TYPE\""; - - Batchable b = jobsBatching(queryText, catName, dbName, tableName, partNames, engine, doTrace); + queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PART_COL_STATS\".\"PART_ID\") " + + " from " + PART_COL_STATS + + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" + + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " + + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")" + + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" + + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " + + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\", " + PART_COL_STATS + ".\"COLUMN_TYPE\""; + start = doTrace ? System.nanoTime() : 0; List noExtraColumnNames = new ArrayList(); Map extraColumnNameTypeParts = new HashMap(); - try { - List list = Batchable.runBatched(batchSize, colNames, b); - Map> colSubList = columnWiseSubList(list); - for (String colName : colSubList.keySet()) { - List subList = colSubList.get(colName); - String colType = (String) subList.getFirst()[1]; - Object countObj = null; - for (Object[] row : subList) { - countObj = MetastoreDirectSqlUtils.sum(countObj, row[2], 1); - } - Long count = MetastoreDirectSqlUtils.extractSqlLong(countObj); + try(QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { + Object qResult = executeWithArray(query.getInnerQuery(), + prepareParams(catName, dbName, tableName, partNames, colNames, + engine), queryText); + end = doTrace ? System.nanoTime() : 0; + MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); + if (qResult == null) { + return Collections.emptyList(); + } + + List list = MetastoreDirectSqlUtils.ensureList(qResult); + for (Object[] row : list) { + String colName = (String) row[0]; + String colType = (String) row[1]; + // Extrapolation is not needed for this column if + // count(\"PARTITION_NAME\")==partNames.size() + // Or, extrapolation is not possible for this column if + // count(\"PARTITION_NAME\")<2 + Long count = MetastoreDirectSqlUtils.extractSqlLong(row[2]); if (count == partNames.size() || count < 2) { noExtraColumnNames.add(colName); } else { - extraColumnNameTypeParts.put(colName, new String[]{colType, String.valueOf(count)}); + extraColumnNameTypeParts.put(colName, new String[] {colType, String.valueOf(count)}); } Deadline.checkTimeout(); } - } finally { - b.closeAllQueries(); } // Extrapolation is not needed for columns noExtraColumnNames List list; if (noExtraColumnNames.size() != 0) { - queryText = commonPrefix + " and \"COLUMN_NAME\" in (%1$s)" + " and \"PARTITION_NAME\" in (%2$s)" - + " and \"ENGINE\" = ? 
" + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; - - b = jobsBatching(queryText, catName, dbName, tableName, partNames, engine, doTrace); - try { - list = Batchable.runBatched(batchSize, noExtraColumnNames, b); - Map> colSubList = columnWiseSubList(list); - for (String colName : colSubList.keySet()) { - colStats.add( - CSObjWithAdjustedNDV(colSubList.get(colName), 0, useDensityFunctionForNDVEstimation, ndvTuner)); + queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + + makeParams(noExtraColumnNames.size()) + ")" + " and \"PARTITION_NAME\" in (" + + makeParams(partNames.size()) + ")" + + " and \"ENGINE\" = ? " + + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; + start = doTrace ? System.nanoTime() : 0; + + try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { + Object qResult = executeWithArray(query.getInnerQuery(), + prepareParams(catName, dbName, tableName, partNames, noExtraColumnNames, engine), queryText); + if (qResult == null) { + return Collections.emptyList(); + } + list = MetastoreDirectSqlUtils.ensureList(qResult); + for (Object[] row : list) { + colStats.add(prepareCSObjWithAdjustedNDV(row, 0, + useDensityFunctionForNDVEstimation, ndvTuner)); Deadline.checkTimeout(); } - } finally { - b.closeAllQueries(); + end = doTrace ? System.nanoTime() : 0; + MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); } } // Extrapolation is needed for extraColumnNames. @@ -2120,38 +2098,29 @@ private List aggrStatsUseDB(String catName, String dbName, } // get sum for all columns to reduce the number of queries Map> sumMap = new HashMap>(); - queryText = - "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")" - + " from " + PART_COL_STATS + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " - + PARTITIONS + ".\"PART_ID\"" + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS - + ".\"TBL_ID\"" + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size()) + ")" - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" + " and " - + PART_COL_STATS + ".\"ENGINE\" = ? " + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\""; - - b = jobsBatching(queryText, catName, dbName, tableName, partNames, engine, doTrace); - try { + queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")" + + " from " + PART_COL_STATS + + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" + + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " + + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size()) + ")" + + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" + + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " + + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\""; + start = doTrace ? 
System.nanoTime() : 0; + try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { List extraColumnNames = new ArrayList(); extraColumnNames.addAll(extraColumnNameTypeParts.keySet()); - List unmergedList = Batchable.runBatched(batchSize, extraColumnNames, b); - Map> colSubList = columnWiseSubList(unmergedList); - List mergedList = new ArrayList<>(); - for (String colName : colSubList.keySet()) { - Object[] mergedRow = new Object[5]; - List subList = colSubList.get(colName); - mergedRow[0] = colName; - for (Object[] row : subList) { - mergedRow[1] = MetastoreDirectSqlUtils.sum(mergedRow[1], row[1], 1); - mergedRow[2] = MetastoreDirectSqlUtils.sum(mergedRow[2], row[2], 1); - mergedRow[3] = MetastoreDirectSqlUtils.sum(mergedRow[3], row[3], 1); - mergedRow[4] = MetastoreDirectSqlUtils.sum(mergedRow[4], row[4], 1); - } - mergedList.add(mergedRow); + Object qResult = executeWithArray(query.getInnerQuery(), + prepareParams(catName, dbName, tableName, partNames, + extraColumnNames, engine), queryText); + if (qResult == null) { + return Collections.emptyList(); } - list = mergedList; + list = MetastoreDirectSqlUtils.ensureList(qResult); // see the indexes for colstats in IExtrapolatePartStatus - Integer[] sumIndex = new Integer[]{6, 10, 11, 15}; + Integer[] sumIndex = new Integer[] {6, 10, 11, 15}; for (Object[] row : list) { Map indexToObject = new HashMap(); for (int ind = 1; ind < row.length; ind++) { @@ -2161,10 +2130,9 @@ private List aggrStatsUseDB(String catName, String dbName, sumMap.put((String) row[0], indexToObject); Deadline.checkTimeout(); } - } finally { - b.closeAllQueries(); + end = doTrace ? System.nanoTime() : 0; + MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); } - for (Map.Entry entry : extraColumnNameTypeParts.entrySet()) { Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2]; String colName = entry.getKey(); @@ -2191,37 +2159,6 @@ private List aggrStatsUseDB(String catName, String dbName, if (index == null) { index = IExtrapolatePartStatus.indexMaps.get("default"); } - - //for avg calculation - queryText = "select " + "sum((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," - + "count((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," - + "sum((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," - + "count((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," - + "sum((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," - + "count((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")" - + " from " + PART_COL_STATS + "" + " inner join " + PARTITIONS + " on " + PART_COL_STATS - + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + " inner join " + TBLS + " on " + PARTITIONS - + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS - + ".\"DB_ID\"" + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS - + ".\"TBL_NAME\" = ? " + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)" + " and " + PARTITIONS - + ".\"PART_NAME\" in (%2$s)" + " and " + PART_COL_STATS + ".\"ENGINE\" = ? 
" - + " group by \"COLUMN_NAME\""; - - b = jobsBatching(queryText, catName, dbName, tableName, partNames, engine, doTrace); - Object[] avg = new Object[3]; - try { - list = Batchable.runBatched(batchSize, Arrays.asList(colName), b); - for (int i = 0; i < 6; i += 2) { - Object sum = null, count = null; - for (Object[] batch : list) { - sum = MetastoreDirectSqlUtils.sum(sum, batch[i], 0); - count = MetastoreDirectSqlUtils.sum(count, batch[i + 1], 0); - } - avg[i / 2] = MetastoreDirectSqlUtils.divide(sum, count, 0); - } - } finally { - b.closeAllQueries(); - } for (int colStatIndex : index) { String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex]; // if the aggregation type is sum, we do a scale-up @@ -2237,35 +2174,76 @@ private List aggrStatsUseDB(String catName, String dbName, || IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Max) { // if the aggregation type is min/max, we extrapolate from the // left/right borders - String orderByExpr = decimal ? "cast(\"" + colStatName + "\" as decimal)" : "\"" + colStatName + "\""; - - queryText = - "select \"" + colStatName + "\",\"PART_NAME\" from " + PART_COL_STATS + " inner join " + PARTITIONS - + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + " inner join " - + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + " inner join " + DBS - + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" + " where " + DBS - + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " + " and " - + PART_COL_STATS + ".\"COLUMN_NAME\" in (%1$s)" + " and " + PARTITIONS - + ".\"PART_NAME\" in (%2$s)" + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " + " order by " - + orderByExpr; - - b = jobsBatching(queryText, catName, dbName, tableName, partNames, engine, doTrace); - try { - list = Batchable.runBatched(batchSize, Arrays.asList(colName), b); - Object[] min = list.getFirst(); - Object[] max = list.getLast(); + if (!decimal) { + queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " + PART_COL_STATS + + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" + + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " + + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? " + + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" + + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " + + " order by \"" + colStatName + "\""; + } else { + queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " + PART_COL_STATS + + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" + + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " + + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? " + + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" + + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " + + " order by cast(\"" + colStatName + "\" as decimal)"; + } + start = doTrace ? 
System.nanoTime() : 0; + try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { + Object qResult = executeWithArray(query.getInnerQuery(), + prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText); + if (qResult == null) { + return Collections.emptyList(); + } + fqr = (ForwardQueryResult) qResult; + Object[] min = (Object[]) (fqr.get(0)); + Object[] max = (Object[]) (fqr.get(fqr.size() - 1)); + end = doTrace ? System.nanoTime() : 0; + MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); if (min[0] == null || max[0] == null) { row[2 + colStatIndex] = null; } else { - row[2 + colStatIndex] = extrapolateMethod.extrapolate(min, max, colStatIndex, indexMap); + row[2 + colStatIndex] = extrapolateMethod + .extrapolate(min, max, colStatIndex, indexMap); } - } finally { - b.closeAllQueries(); } } else { - // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE", - // "AVG_DECIMAL" - row[2 + colStatIndex] = avg[colStatIndex - 12]; + // if the aggregation type is avg, we use the average on the existing ones. + queryText = "select " + + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," + + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," + + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")" + + " from " + PART_COL_STATS + "" + + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" + + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" + + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" + + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " + + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? " + + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" + + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " + + " group by \"COLUMN_NAME\""; + start = doTrace ? System.nanoTime() : 0; + try(QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { + Object qResult = executeWithArray(query.getInnerQuery(), + prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText); + if (qResult == null) { + return Collections.emptyList(); + } + fqr = (ForwardQueryResult) qResult; + Object[] avg = (Object[]) (fqr.get(0)); + // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE", + // "AVG_DECIMAL" + row[2 + colStatIndex] = avg[colStatIndex - 12]; + end = doTrace ? 
System.nanoTime() : 0; + MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); + } } } colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner)); @@ -2276,54 +2254,6 @@ private List aggrStatsUseDB(String catName, String dbName, } } - private ColumnStatisticsObj CSObjWithAdjustedNDV(List list, int i, - boolean useDensityFunctionForNDVEstimation, double ndvTuner) - throws MetaException { - if (list.isEmpty()) { - return null; - } - ColumnStatisticsData data = new ColumnStatisticsData(); - int j = i; - Object[] row = list.getFirst(); - String colName = (String) row[j++]; - String colType = (String) row[j++]; - ColumnStatisticsObj cso = new ColumnStatisticsObj(colName, colType, data); - Object llow = row[j++], lhigh = row[j++], dlow = row[j++], dhigh = row[j++], declow = row[j++], dechigh = row[j++], - nulls = row[j++], dist = row[j++], avglen = row[j++], maxlen = row[j++], trues = row[j++], falses = row[j++], - sumLong = row[j++], countLong = row[j++], sumDouble = row[j++], countDouble = row[j++], sumDecimal = row[j++], - countDecimal = row[j++], sumDist = row[j++]; - for (int k = 1; k < list.size(); k++) { - j = i + 2; - row = list.get(k); - llow = MetastoreDirectSqlUtils.min(llow, row[j++], 1); - lhigh = MetastoreDirectSqlUtils.max(lhigh, row[j++], 1); - dlow = MetastoreDirectSqlUtils.min(dlow, row[j++], 0); - dhigh = MetastoreDirectSqlUtils.max(dhigh, row[j++], 0); - declow = MetastoreDirectSqlUtils.min(declow, row[j++], 0); - dechigh = MetastoreDirectSqlUtils.max(dechigh, row[j++], 0); - nulls = MetastoreDirectSqlUtils.sum(nulls, row[j++], 1); - dist = MetastoreDirectSqlUtils.max(dist, row[j++], 1); - avglen = MetastoreDirectSqlUtils.max(avglen, row[j++], 0); - maxlen = MetastoreDirectSqlUtils.max(maxlen, row[j++], 1); - trues = MetastoreDirectSqlUtils.sum(trues, row[j++], 1); - falses = MetastoreDirectSqlUtils.sum(falses, row[j++], 1); - sumLong = MetastoreDirectSqlUtils.sum(sumLong, row[j++], 0); - countLong = MetastoreDirectSqlUtils.sum(countLong, row[j++], 0); - sumDouble = MetastoreDirectSqlUtils.sum(sumDouble, row[j++], 0); - countDouble = MetastoreDirectSqlUtils.sum(countDouble, row[j++], 0); - sumDecimal = MetastoreDirectSqlUtils.sum(sumDecimal, row[j++], 0); - countDecimal = MetastoreDirectSqlUtils.sum(countDecimal, row[j++], 0); - sumDist = MetastoreDirectSqlUtils.sum(sumDist, row[j],1); - } - Object avgLong = MetastoreDirectSqlUtils.divide(sumLong, countLong, 0); - Object avgDouble = MetastoreDirectSqlUtils.divide(sumDouble, countDouble, 0); - Object avgDecimal = MetastoreDirectSqlUtils.divide(sumDecimal, countDecimal, 0); - StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, llow, lhigh, dlow, dhigh, declow, dechigh, - nulls, dist, avglen, maxlen, trues, falses, avgLong, avgDouble, avgDecimal, sumDist, - useDensityFunctionForNDVEstimation, ndvTuner); - return cso; - } - private ColumnStatisticsObj prepareCSObj (Object[] row, int i) throws MetaException { ColumnStatisticsData data = new ColumnStatisticsData(); ColumnStatisticsObj cso = new ColumnStatisticsObj((String)row[i++], (String)row[i++], data); @@ -2384,7 +2314,37 @@ public List getPartitionStats( + " and " + PARTITIONS + ".\"PART_NAME\" in (%2$s)" + " and " + PART_COL_STATS + ".\"ENGINE\" = ? 
" + " order by " + PARTITIONS + ".\"PART_NAME\""; - Batchable b = jobsBatching(queryText0, catName, dbName, tableName, partNames, engine, doTrace); + Batchable b = new Batchable() { + @Override + public List run(final List inputColNames) throws MetaException { + Batchable b2 = new Batchable() { + @Override + public List run(List inputPartNames) throws MetaException { + String queryText = String.format(queryText0, + makeParams(inputColNames.size()), makeParams(inputPartNames.size())); + long start = doTrace ? System.nanoTime() : 0; + Query query = pm.newQuery("javax.jdo.query.SQL", queryText); + try { + Object qResult = executeWithArray(query, prepareParams( + catName, dbName, tableName, inputPartNames, inputColNames, engine), queryText); + MetastoreDirectSqlUtils.timingTrace(doTrace, queryText0, start, (doTrace ? System.nanoTime() : 0)); + if (qResult == null) { + return Collections.emptyList(); + } + return MetastoreDirectSqlUtils.ensureList(qResult); + } finally { + addQueryAfterUse(query); + } + } + }; + try { + return Batchable.runBatched(batchSize, partNames, b2); + } finally { + addQueryAfterUse(b2); + } + } + }; + List result = new ArrayList(partNames.size()); String lastPartName = null; int from = 0; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java index 12aba51aaa09..45e89ab40df5 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetastoreDirectSqlUtils.java @@ -646,52 +646,4 @@ public static void throwMetaOrRuntimeException(Exception e) throws MetaException throw new RuntimeException(e); } } - - public static Object sum(Object a, Object b, int type) - throws MetaException { - if (a == null) { - return b; - } - if (b == null) { - return a; - } - return (type == 0) ? MetastoreDirectSqlUtils.extractSqlDouble(a) + MetastoreDirectSqlUtils.extractSqlDouble(b) - : MetastoreDirectSqlUtils.extractSqlLong(a) + MetastoreDirectSqlUtils.extractSqlLong(b); - } - - public static Object divide(Object a, Object b, int type) - throws MetaException { - if (a == null || b == null || (type == 0 && MetastoreDirectSqlUtils.extractSqlDouble(b) == 0) - || MetastoreDirectSqlUtils.extractSqlDouble(b) == 0) { - return null; - } - return (type == 0) ? MetastoreDirectSqlUtils.extractSqlDouble(a) / MetastoreDirectSqlUtils.extractSqlDouble(b) - : MetastoreDirectSqlUtils.extractSqlLong(a) / MetastoreDirectSqlUtils.extractSqlLong(b); - } - - public static Object min(Object a, Object b, int type) - throws MetaException { - if (a == null) { - return b; - } - if (b == null) { - return a; - } - return (type == 0) ? Math.min(MetastoreDirectSqlUtils.extractSqlDouble(a), - MetastoreDirectSqlUtils.extractSqlDouble(b)) - : Math.min(MetastoreDirectSqlUtils.extractSqlLong(a), MetastoreDirectSqlUtils.extractSqlLong(b)); - } - - public static Object max(Object a, Object b, int type) - throws MetaException { - if (a == null) { - return b; - } - if (b == null) { - return a; - } - return (type == 0) ? 
Math.max(MetastoreDirectSqlUtils.extractSqlDouble(a), - MetastoreDirectSqlUtils.extractSqlDouble(b)) - : Math.max(MetastoreDirectSqlUtils.extractSqlLong(a), MetastoreDirectSqlUtils.extractSqlLong(b)); - } } From f7fea52e91a18b8c95f82e7ecc22b7445f21011f Mon Sep 17 00:00:00 2001 From: ramitg254 Date: Wed, 24 Sep 2025 14:23:40 +0530 Subject: [PATCH 5/6] removal of aggrStatsUseDB --- .../hive/metastore/MetaStoreDirectSql.java | 326 ------------------ 1 file changed, 326 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index d9e5e755ba96..808889298a1d 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -1926,20 +1926,8 @@ private List columnStatisticsObjForPartitionsBatch(String c boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector, boolean enableKll) throws MetaException { - if (enableBitVector || enableKll) { return aggrStatsUseJava(catName, dbName, tableName, partNames, colNames, engine, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, enableKll); - } else { - return Batchable.runBatched(batchSize, partNames, new Batchable() { - @Override - public List run(List inputPartNames) - throws MetaException { - return aggrStatsUseDB(catName, dbName, tableName, inputPartNames, colNames, engine, areAllPartsFound, - useDensityFunctionForNDVEstimation, ndvTuner); - } - }); - - } } private List aggrStatsUseJava(String catName, String dbName, String tableName, @@ -1954,306 +1942,6 @@ private List aggrStatsUseJava(String catName, String dbName areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner); } - private List aggrStatsUseDB(String catName, String dbName, - String tableName, List partNames, List colNames, String engine, - boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { - // TODO: all the extrapolation logic should be moved out of this class, - // only mechanical data retrieval should remain here. - String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", " - + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), " - + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), " - + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), " - + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), " - // The following data is used to compute a partitioned table's NDV based - // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be - // accurately derived from partition NDVs, because the domain of column value two partitions - // can overlap. If there is no overlap then global NDV is just the sum - // of partition NDVs (UpperBound). But if there is some overlay then - // global NDV can be anywhere between sum of partition NDVs (no overlap) - // and same as one of the partition NDV (domain of column value in all other - // partitions is subset of the domain value in one of the partition) - // (LowerBound).But under uniform distribution, we can roughly estimate the global - // NDV by leveraging the min/max values. 
- // And, we also guarantee that the estimation makes sense by comparing it to the - // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")") - // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")") - + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," - + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," - + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")," - + "sum(\"NUM_DISTINCTS\")" + " from " + PART_COL_STATS + "" - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? "; - String queryText = null; - long start = 0; - long end = 0; - - boolean doTrace = LOG.isDebugEnabled(); - ForwardQueryResult fqr = null; - // Check if the status of all the columns of all the partitions exists - // Extrapolation is not needed. - if (areAllPartsFound) { - queryText = commonPrefix + " and \"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")" - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and \"ENGINE\" = ? " - + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; - start = doTrace ? System.nanoTime() : 0; - try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, colNames, - engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - List list = MetastoreDirectSqlUtils.ensureList(qResult); - List colStats = - new ArrayList(list.size()); - for (Object[] row : list) { - colStats.add(prepareCSObjWithAdjustedNDV(row, 0, - useDensityFunctionForNDVEstimation, ndvTuner)); - Deadline.checkTimeout(); - } - return colStats; - } - } else { - // Extrapolation is needed for some columns. - // In this case, at least a column status for a partition is missing. - // We need to extrapolate this partition based on the other partitions - List colStats = new ArrayList(colNames.size()); - queryText = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", count(\"PART_COL_STATS\".\"PART_ID\") " - + " from " + PART_COL_STATS - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(colNames.size()) + ")" - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\", " + PART_COL_STATS + ".\"COLUMN_TYPE\""; - start = doTrace ? 
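// The removed comment block spells out the bounds used when
// useDensityFunctionForNDVEstimation is on: the global NDV can be no larger
// than sum(partition NDVs) and no smaller than max(partition NDVs), with a
// density-based estimate from the overall value range in between. One way to
// express that clamp (a hypothetical helper, not code from this patch):
static long clampNdvEstimate(double low, double high, double avgDensity, long maxPartNdv, long sumPartNdv) {
  long estimate = avgDensity > 0 ? (long) ((high - low) / avgDensity) : maxPartNdv;
  return Math.max(maxPartNdv, Math.min(sumPartNdv, estimate));  // keep within [max, sum]
}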
System.nanoTime() : 0; - List noExtraColumnNames = new ArrayList(); - Map extraColumnNameTypeParts = new HashMap(); - try(QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, colNames, - engine), queryText); - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - if (qResult == null) { - return Collections.emptyList(); - } - - List list = MetastoreDirectSqlUtils.ensureList(qResult); - for (Object[] row : list) { - String colName = (String) row[0]; - String colType = (String) row[1]; - // Extrapolation is not needed for this column if - // count(\"PARTITION_NAME\")==partNames.size() - // Or, extrapolation is not possible for this column if - // count(\"PARTITION_NAME\")<2 - Long count = MetastoreDirectSqlUtils.extractSqlLong(row[2]); - if (count == partNames.size() || count < 2) { - noExtraColumnNames.add(colName); - } else { - extraColumnNameTypeParts.put(colName, new String[] {colType, String.valueOf(count)}); - } - Deadline.checkTimeout(); - } - } - // Extrapolation is not needed for columns noExtraColumnNames - List list; - if (noExtraColumnNames.size() != 0) { - queryText = commonPrefix + " and \"COLUMN_NAME\" in (" - + makeParams(noExtraColumnNames.size()) + ")" + " and \"PARTITION_NAME\" in (" - + makeParams(partNames.size()) + ")" - + " and \"ENGINE\" = ? " - + " group by \"COLUMN_NAME\", \"COLUMN_TYPE\""; - start = doTrace ? System.nanoTime() : 0; - - try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, noExtraColumnNames, engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - list = MetastoreDirectSqlUtils.ensureList(qResult); - for (Object[] row : list) { - colStats.add(prepareCSObjWithAdjustedNDV(row, 0, - useDensityFunctionForNDVEstimation, ndvTuner)); - Deadline.checkTimeout(); - } - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - } - } - // Extrapolation is needed for extraColumnNames. - // give a sequence number for all the partitions - if (extraColumnNameTypeParts.size() != 0) { - Map indexMap = new HashMap(); - for (int index = 0; index < partNames.size(); index++) { - indexMap.put(partNames.get(index), index); - } - // get sum for all columns to reduce the number of queries - Map> sumMap = new HashMap>(); - queryText = "select \"COLUMN_NAME\", sum(\"NUM_NULLS\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), sum(\"NUM_DISTINCTS\")" - + " from " + PART_COL_STATS - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" in (" + makeParams(extraColumnNameTypeParts.size()) + ")" - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " group by " + PART_COL_STATS + ".\"COLUMN_NAME\""; - start = doTrace ? 
System.nanoTime() : 0; - try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - List extraColumnNames = new ArrayList(); - extraColumnNames.addAll(extraColumnNameTypeParts.keySet()); - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, - extraColumnNames, engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - list = MetastoreDirectSqlUtils.ensureList(qResult); - // see the indexes for colstats in IExtrapolatePartStatus - Integer[] sumIndex = new Integer[] {6, 10, 11, 15}; - for (Object[] row : list) { - Map indexToObject = new HashMap(); - for (int ind = 1; ind < row.length; ind++) { - indexToObject.put(sumIndex[ind - 1], row[ind]); - } - // row[0] is the column name - sumMap.put((String) row[0], indexToObject); - Deadline.checkTimeout(); - } - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - } - for (Map.Entry entry : extraColumnNameTypeParts.entrySet()) { - Object[] row = new Object[IExtrapolatePartStatus.colStatNames.length + 2]; - String colName = entry.getKey(); - String colType = entry.getValue()[0]; - Long sumVal = Long.parseLong(entry.getValue()[1]); - // fill in colname - row[0] = colName; - // fill in coltype - row[1] = colType; - // use linear extrapolation. more complicated one can be added in the - // future. - IExtrapolatePartStatus extrapolateMethod = new LinearExtrapolatePartStatus(); - // fill in colstatus - Integer[] index = null; - boolean decimal = false; - if (colType.toLowerCase().startsWith("decimal")) { - index = IExtrapolatePartStatus.indexMaps.get("decimal"); - decimal = true; - } else { - index = IExtrapolatePartStatus.indexMaps.get(colType.toLowerCase()); - } - // if the colType is not the known type, long, double, etc, then get - // all index. - if (index == null) { - index = IExtrapolatePartStatus.indexMaps.get("default"); - } - for (int colStatIndex : index) { - String colStatName = IExtrapolatePartStatus.colStatNames[colStatIndex]; - // if the aggregation type is sum, we do a scale-up - if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Sum) { - Object o = sumMap.get(colName).get(colStatIndex); - if (o == null) { - row[2 + colStatIndex] = null; - } else { - Long val = MetastoreDirectSqlUtils.extractSqlLong(o); - row[2 + colStatIndex] = val / sumVal * (partNames.size()); - } - } else if (IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Min - || IExtrapolatePartStatus.aggrTypes[colStatIndex] == IExtrapolatePartStatus.AggrType.Max) { - // if the aggregation type is min/max, we extrapolate from the - // left/right borders - if (!decimal) { - queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " + PART_COL_STATS - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? " - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? 
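// The Sum branch above extrapolates linearly: a counter summed over only the
// partitions that have statistics is scaled up to all partitions via
// val / sumVal * partNames.size(). Note that the integer division runs first,
// exactly as in the code, so the result is slightly rounded. For example:
long val = 40L;        // e.g. NUM_NULLS summed over partitions with stats
long sumVal = 3L;      // number of partitions that have stats for this column
int totalParts = 12;   // partNames.size()
long extrapolated = val / sumVal * totalParts;  // 40/3 = 13, then 13*12 = 156 (not 160)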
" - + " order by \"" + colStatName + "\""; - } else { - queryText = "select \"" + colStatName + "\",\"PART_NAME\" from " + PART_COL_STATS - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? " - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " order by cast(\"" + colStatName + "\" as decimal)"; - } - start = doTrace ? System.nanoTime() : 0; - try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - fqr = (ForwardQueryResult) qResult; - Object[] min = (Object[]) (fqr.get(0)); - Object[] max = (Object[]) (fqr.get(fqr.size() - 1)); - end = doTrace ? System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - if (min[0] == null || max[0] == null) { - row[2 + colStatIndex] = null; - } else { - row[2 + colStatIndex] = extrapolateMethod - .extrapolate(min, max, colStatIndex, indexMap); - } - } - } else { - // if the aggregation type is avg, we use the average on the existing ones. - queryText = "select " - + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal))," - + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\")," - + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\")" - + " from " + PART_COL_STATS + "" - + " inner join " + PARTITIONS + " on " + PART_COL_STATS + ".\"PART_ID\" = " + PARTITIONS + ".\"PART_ID\"" - + " inner join " + TBLS + " on " + PARTITIONS + ".\"TBL_ID\" = " + TBLS + ".\"TBL_ID\"" - + " inner join " + DBS + " on " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\"" - + " where " + DBS + ".\"CTLG_NAME\" = ? and " + DBS + ".\"NAME\" = ? and " + TBLS + ".\"TBL_NAME\" = ? " - + " and " + PART_COL_STATS + ".\"COLUMN_NAME\" = ? " - + " and " + PARTITIONS + ".\"PART_NAME\" in (" + makeParams(partNames.size()) + ")" - + " and " + PART_COL_STATS + ".\"ENGINE\" = ? " - + " group by \"COLUMN_NAME\""; - start = doTrace ? System.nanoTime() : 0; - try(QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) { - Object qResult = executeWithArray(query.getInnerQuery(), - prepareParams(catName, dbName, tableName, partNames, Arrays.asList(colName), engine), queryText); - if (qResult == null) { - return Collections.emptyList(); - } - fqr = (ForwardQueryResult) qResult; - Object[] avg = (Object[]) (fqr.get(0)); - // colStatIndex=12,13,14 respond to "AVG_LONG", "AVG_DOUBLE", - // "AVG_DECIMAL" - row[2 + colStatIndex] = avg[colStatIndex - 12]; - end = doTrace ? 
System.nanoTime() : 0; - MetastoreDirectSqlUtils.timingTrace(doTrace, queryText, start, end); - } - } - } - colStats.add(prepareCSObjWithAdjustedNDV(row, 0, useDensityFunctionForNDVEstimation, ndvTuner)); - Deadline.checkTimeout(); - } - } - return colStats; - } - } - private ColumnStatisticsObj prepareCSObj (Object[] row, int i) throws MetaException { ColumnStatisticsData data = new ColumnStatisticsData(); ColumnStatisticsObj cso = new ColumnStatisticsObj((String)row[i++], (String)row[i++], data); @@ -2265,20 +1953,6 @@ private ColumnStatisticsObj prepareCSObj (Object[] row, int i) throws MetaExcept return cso; } - private ColumnStatisticsObj prepareCSObjWithAdjustedNDV(Object[] row, int i, - boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException { - ColumnStatisticsData data = new ColumnStatisticsData(); - ColumnStatisticsObj cso = new ColumnStatisticsObj((String) row[i++], (String) row[i++], data); - Object llow = row[i++], lhigh = row[i++], dlow = row[i++], dhigh = row[i++], declow = row[i++], - dechigh = row[i++], nulls = row[i++], dist = row[i++], avglen = row[i++], maxlen = row[i++], - trues = row[i++], falses = row[i++], avgLong = row[i++], avgDouble = row[i++], - avgDecimal = row[i++], sumDist = row[i++]; - StatObjectConverter.fillColumnStatisticsData(cso.getColType(), data, llow, lhigh, dlow, dhigh, - declow, dechigh, nulls, dist, avglen, maxlen, trues, falses, avgLong, avgDouble, - avgDecimal, sumDist, useDensityFunctionForNDVEstimation, ndvTuner); - return cso; - } - private Object[] prepareParams(String catName, String dbName, String tableName, List partNames, List colNames, String engine) throws MetaException { Object[] params = new Object[colNames.size() + partNames.size() + 4]; From 288f14321c646920797c8eb3d4bc6a43ed2d6ed2 Mon Sep 17 00:00:00 2001 From: ramitg254 Date: Wed, 24 Sep 2025 21:04:02 +0530 Subject: [PATCH 6/6] removal of columnStatisticsObjForPartitionsBatch --- .../hadoop/hive/metastore/MetaStoreDirectSql.java | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index 808889298a1d..fb595b02713f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -1880,8 +1880,9 @@ private List columnStatisticsObjForPartitions( return Batchable.runBatched(batchSize, colNames, new Batchable() { @Override public List run(final List inputColNames) throws MetaException { - return columnStatisticsObjForPartitionsBatch(catName, dbName, tableName, partNames, inputColNames, engine, - areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, enableKll); + /** Should be called with the list short enough to not trip up Oracle/etc. */ + return aggrStatsUseJava(catName, dbName, tableName, partNames, inputColNames, engine, areAllPartsFound, + useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, enableKll); } }); } @@ -1920,16 +1921,6 @@ public List getColStatsForAllTablePartitions(String c return colStatsForDB; } - /** Should be called with the list short enough to not trip up Oracle/etc. 
*/ - private List columnStatisticsObjForPartitionsBatch(String catName, String dbName, - String tableName, List partNames, List colNames, String engine, - boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner, - boolean enableBitVector, boolean enableKll) - throws MetaException { - return aggrStatsUseJava(catName, dbName, tableName, partNames, colNames, engine, areAllPartsFound, - useDensityFunctionForNDVEstimation, ndvTuner, enableBitVector, enableKll); - } - private List aggrStatsUseJava(String catName, String dbName, String tableName, List partNames, List colNames, String engine, boolean areAllPartsFound, boolean useDensityFunctionForNDVEstimation, double ndvTuner, boolean enableBitVector,
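After this series, columnStatisticsObjForPartitions hands each batch of column names straight to aggrStatsUseJava via Batchable.runBatched, and the per-batch results are concatenated into a single aggregate list. As a rough illustration of that batch-and-concatenate shape, here is a minimal, self-contained Java sketch; BatchCallback, runBatched, and BatchedRunDemo are hypothetical names invented for this example, not Hive's actual Batchable API.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical, simplified stand-in for a batched-execution helper: splits
    // the input into chunks of at most batchSize, runs the callback per chunk,
    // and concatenates the partial results into one list.
    public class BatchedRunDemo {

      interface BatchCallback<I, R> {
        List<R> run(List<I> batch) throws Exception;
      }

      static <I, R> List<R> runBatched(int batchSize, List<I> input,
          BatchCallback<I, R> callback) throws Exception {
        if (batchSize <= 0 || input.size() <= batchSize) {
          return callback.run(input); // small input: batching is a no-op
        }
        List<R> result = new ArrayList<>(input.size());
        for (int from = 0; from < input.size(); from += batchSize) {
          int to = Math.min(from + batchSize, input.size());
          // Each chunk is processed independently; results are appended,
          // never replaced, so the caller sees one merged list.
          result.addAll(callback.run(input.subList(from, to)));
        }
        return result;
      }

      public static void main(String[] args) throws Exception {
        List<String> colNames = List.of("c1", "c2", "c3", "c4", "c5");
        // With batchSize=2 the callback runs three times (2+2+1 columns).
        List<String> stats = runBatched(2, colNames, batch -> {
          List<String> out = new ArrayList<>();
          for (String col : batch) {
            out.add("aggr-stats(" + col + ")");
          }
          return out;
        });
        System.out.println(stats); // one entry per column, across all batches
      }
    }

When the input list is no longer than the batch size, the callback runs exactly once and the helper degenerates to a plain call, which is why only tables whose column or partition lists exceed the configured batch size exercise the chunk-then-merge path.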