Skip to content

Commit 5f059e9

Browse files
committed
Enabling partition stats by default
1 parent c984043 commit 5f059e9

File tree

14 files changed

+427
-59
lines changed

14 files changed

+427
-59
lines changed

Diff for: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java

+3
Original file line numberDiff line numberDiff line change
@@ -1058,6 +1058,9 @@ private boolean shouldDeleteMetadataPartition(MetadataPartitionType partitionTyp
10581058
case COLUMN_STATS:
10591059
metadataIndexDisabled = !config.isMetadataColumnStatsIndexEnabled();
10601060
break;
1061+
case PARTITION_STATS:
1062+
metadataIndexDisabled = !config.isPartitionStatsIndexEnabled();
1063+
break;
10611064
case BLOOM_FILTERS:
10621065
metadataIndexDisabled = !config.isMetadataBloomFilterIndexEnabled();
10631066
break;

Diff for: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

+29-15
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@
109109
import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
110110
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
111111
import static org.apache.hudi.common.util.ValidationUtils.checkState;
112-
import static org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS;
113112
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;
114113

115114
/**
@@ -1474,16 +1473,14 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
14741473
* @param avroValueWrapper A wrapped value with Avro type wrapper.
14751474
* @return Java value.
14761475
*/
1477-
public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper, boolean handleObfuscatedFlow, Option<String> fieldName, Option<GenericRecord> record) {
1476+
public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper, boolean handleObfuscatedFlow, Option<String> fieldName, Option<GenericRecord> colStatsRecord) {
14781477
if (avroValueWrapper == null) {
14791478
return null;
14801479
}
14811480

1482-
if (handleObfuscatedFlow) {
1483-
Pair<Boolean, String> isValueWrapperObfuscated = getIsValueWrapperObfuscated(record.get(), fieldName.get());
1484-
if (isValueWrapperObfuscated.getKey()) {
1485-
return unwrapAvroValueWrapper(avroValueWrapper, isValueWrapperObfuscated.getValue());
1486-
}
1481+
Pair<Boolean, String> isValueWrapperObfuscated = getIsValueWrapperObfuscated(avroValueWrapper);
1482+
if (isValueWrapperObfuscated.getKey()) {
1483+
return unwrapAvroValueWrapper(avroValueWrapper, isValueWrapperObfuscated.getValue());
14871484
}
14881485

14891486
if (avroValueWrapper instanceof DateWrapper) {
@@ -1524,27 +1521,44 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper, Stri
15241521
if (avroValueWrapper == null) {
15251522
return null;
15261523
} else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) {
1527-
return Date.valueOf(LocalDate.ofEpochDay((Integer)((Record) avroValueWrapper).get(0)));
1524+
if (avroValueWrapper instanceof GenericRecord) {
1525+
return Date.valueOf(LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0)));
1526+
} else {
1527+
return Date.valueOf(LocalDate.ofEpochDay((Integer) ((Record) avroValueWrapper).get(0)));
1528+
}
15281529
} else if (LocalDateWrapper.class.getSimpleName().equals(wrapperClassName)) {
1529-
return LocalDate.ofEpochDay((Integer)((Record) avroValueWrapper).get(0));
1530+
if (avroValueWrapper instanceof GenericRecord) {
1531+
return LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0));
1532+
} else {
1533+
return LocalDate.ofEpochDay((Integer) ((Record) avroValueWrapper).get(0));
1534+
}
15301535
} else if (TimestampMicrosWrapper.class.getSimpleName().equals(wrapperClassName)) {
1531-
Instant instant = microsToInstant((Long)((Record) avroValueWrapper).get(0));
1532-
return Timestamp.from(instant);
1536+
if (avroValueWrapper instanceof GenericRecord) {
1537+
Instant instant = microsToInstant((Long) ((GenericRecord) avroValueWrapper).get(0));
1538+
return Timestamp.from(instant);
1539+
} else {
1540+
Instant instant = microsToInstant((Long) ((Record) avroValueWrapper).get(0));
1541+
return Timestamp.from(instant);
1542+
}
15331543
} else if (DecimalWrapper.class.getSimpleName().equals(wrapperClassName)) {
15341544
Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
1535-
return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer) ((Record) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType());
1545+
if (avroValueWrapper instanceof GenericRecord) {
1546+
return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer)((GenericRecord) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType());
1547+
} else {
1548+
return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer) ((Record) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType());
1549+
}
15361550
} else {
15371551
throw new UnsupportedOperationException(String.format("Unsupported type of the value (%s)", avroValueWrapper.getClass()));
15381552
}
15391553
}
15401554

1541-
private static Pair<Boolean, String> getIsValueWrapperObfuscated(GenericRecord record, String subFieldName) {
1542-
Object statsValue = ((GenericRecord) record.get(SCHEMA_FIELD_ID_COLUMN_STATS)).get(subFieldName);
1555+
private static Pair<Boolean, String> getIsValueWrapperObfuscated(Object statsValue) {
15431556
if (statsValue != null) {
15441557
String statsValueSchemaClassName = ((GenericRecord) statsValue).getSchema().getName();
15451558
boolean toReturn = statsValueSchemaClassName.equals(DateWrapper.class.getSimpleName())
15461559
|| statsValueSchemaClassName.equals(LocalDateWrapper.class.getSimpleName())
1547-
|| statsValueSchemaClassName.equals(TimestampMicrosWrapper.class.getSimpleName());
1560+
|| statsValueSchemaClassName.equals(TimestampMicrosWrapper.class.getSimpleName())
1561+
|| statsValueSchemaClassName.equals(DecimalWrapper.class.getSimpleName());
15481562
if (toReturn) {
15491563
return Pair.of(true, ((GenericRecord) statsValue).getSchema().getName());
15501564
}

Diff for: hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java

+1
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,7 @@ public Builder withSecondaryIndexParallelism(int parallelism) {
763763
public HoodieMetadataConfig build() {
764764
metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
765765
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS, getDefaultColStatsEnable(engineType));
766+
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_PARTITION_STATS, getDefaultColStatsEnable(engineType));
766767
// fix me: disable when schema on read is enabled.
767768
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
768769
return metadataConfig;

Diff for: hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,14 @@ public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRa
6666
.map(e -> HoodieColumnRangeMetadata.create(
6767
relativePartitionPath, e.getColumnName(), e.getMinValue(), e.getMaxValue(),
6868
e.getNullCount(), e.getValueCount(), e.getTotalSize(), e.getTotalUncompressedSize()))
69-
.reduce(HoodieColumnRangeMetadata::merge).orElseThrow(() -> new HoodieException("MergingColumnRanges failed."));
69+
.reduce((a,b) -> {
70+
try {
71+
return HoodieColumnRangeMetadata.merge(a, b);
72+
} catch (ClassCastException cce) {
73+
System.out.println("asdfasd");
74+
throw cce;
75+
}
76+
}).orElseThrow(() -> new HoodieException("MergingColumnRanges failed."));
7077
}
7178

7279
/**

Diff for: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -1803,14 +1803,14 @@ private static Comparable<?> coerceToComparable(Schema schema, Object val) {
18031803

18041804
private static boolean isColumnTypeSupported(Schema schema, Option<HoodieRecordType> recordType) {
18051805
// if record type is set and if its AVRO, MAP, ARRAY, RECORD and ENUM types are unsupported.
1806-
if (recordType.isPresent() && recordType.get() == HoodieRecordType.AVRO) {
1807-
return (schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP
1806+
//if (recordType.isPresent() && recordType.get() == HoodieRecordType.AVRO) {
1807+
return (schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP
18081808
&& schema.getType() != Schema.Type.ENUM);
1809-
}
1809+
//}
18101810
// if record Type is not set or if recordType is SPARK then we cannot support AVRO, MAP, ARRAY, RECORD, ENUM and FIXED and BYTES type as well.
18111811
// HUDI-8585 will add support for BYTES and FIXED
1812-
return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP
1813-
&& schema.getType() != Schema.Type.ENUM && schema.getType() != Schema.Type.BYTES && schema.getType() != Schema.Type.FIXED;
1812+
//return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP
1813+
// && schema.getType() != Schema.Type.ENUM && schema.getType() != Schema.Type.BYTES && schema.getType() != Schema.Type.FIXED;
18141814
}
18151815

18161816
public static Set<String> getInflightMetadataPartitions(HoodieTableConfig tableConfig) {
@@ -2540,6 +2540,8 @@ public static HoodieData<HoodieRecord> convertMetadataToPartitionStatRecords(Hoo
25402540
List<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata = partitionedWriteStat.stream()
25412541
.flatMap(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, validColumnsToIndex, tableSchema).stream())
25422542
.collect(toList());
2543+
2544+
25432545
if (shouldScanColStatsForTightBound) {
25442546
checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table");
25452547
// Collect Column Metadata for Each File part of active file system view of latest snapshot

Diff for: hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,8 @@ private static void constructColumnStatsMetadataPayload(HoodieMetadataPayload pa
302302
// AVRO-2377 1.9.2 Modified the type of org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
303303
// This causes Kryo to fail when deserializing a GenericRecord, See HUDI-5484.
304304
// We should avoid using GenericRecord and convert GenericRecord into a serializable type.
305-
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE), true, Option.of(COLUMN_STATS_FIELD_MIN_VALUE), Option.of(record))))
306-
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE), true, Option.of(COLUMN_STATS_FIELD_MAX_VALUE), Option.of(record))))
305+
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE), true, Option.of(COLUMN_STATS_FIELD_MIN_VALUE), Option.of(columnStatsRecord))))
306+
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE), true, Option.of(COLUMN_STATS_FIELD_MAX_VALUE), Option.of(columnStatsRecord))))
307307
.setValueCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
308308
.setNullCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
309309
.setTotalSize((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))

Diff for: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
229229
// NOTE: It's crucial to maintain appropriate ordering of the columns
230230
// matching table layout: hence, we cherry-pick individual columns
231231
// instead of simply filtering in the ones we're interested in the schema
232-
val (indexSchema, targetIndexedColumns) = composeIndexSchema(sortedTargetColumnsSet.toSeq, indexedColumns, tableSchema)
232+
val (indexSchema, targetIndexedColumns) = composeIndexSchema(sortedTargetColumnsSet.toSeq, indexedColumns.toSeq, tableSchema)
233233

234234
// Here we perform complex transformation which requires us to modify the layout of the rows
235235
// of the dataset, and therefore we rely on low-level RDD API to avoid incurring encoding/decoding
@@ -401,7 +401,7 @@ object ColumnStatsIndexSupport {
401401
/**
402402
* @VisibleForTesting
403403
*/
404-
def composeIndexSchema(targetColumnNames: Seq[String], indexedColumns: Set[String], tableSchema: StructType): (StructType, Seq[String]) = {
404+
def composeIndexSchema(targetColumnNames: Seq[String], indexedColumns: Seq[String], tableSchema: StructType): (StructType, Seq[String]) = {
405405
val fileNameField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME, StringType, nullable = true, Metadata.empty)
406406
val valueCountField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT, LongType, nullable = true, Metadata.empty)
407407

Diff for: hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ public static Dataset<Row> buildColumnStatsTableFor(
241241

242242
StructType indexSchema = ColumnStatsIndexSupport$.MODULE$.composeIndexSchema(
243243
JavaScalaConverters.<String>convertJavaListToScalaSeq(columnNames),
244-
JavaScalaConverters.convertJavaListToScalaList(columnNames).toSet(),
244+
JavaScalaConverters.convertJavaListToScalaList(columnNames),
245245
StructType$.MODULE$.apply(orderedColumnSchemas)
246246
)._1;
247247

Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40}

Diff for: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ class TestDataSkippingUtils extends HoodieSparkClientTestBase with SparkAdapterS
8585
)
8686
)
8787

88-
val (indexSchema: StructType, targetIndexedColumns: Seq[String]) = composeIndexSchema(indexedCols, indexedCols.toSet, sourceTableSchema)
88+
val (indexSchema: StructType, targetIndexedColumns: Seq[String]) = composeIndexSchema(indexedCols, indexedCols, sourceTableSchema)
8989

9090
@ParameterizedTest
9191
@MethodSource(Array(

0 commit comments

Comments
 (0)