Enabling partition stats by default
nsivabalan committed Jan 17, 2025
1 parent c984043 commit e244110
Showing 15 changed files with 433 additions and 65 deletions.
@@ -1058,6 +1058,9 @@ private boolean shouldDeleteMetadataPartition(MetadataPartitionTyp
case COLUMN_STATS:
metadataIndexDisabled = !config.isMetadataColumnStatsIndexEnabled();
break;
case PARTITION_STATS:
metadataIndexDisabled = !config.isPartitionStatsIndexEnabled();
break;
case BLOOM_FILTERS:
metadataIndexDisabled = !config.isMetadataBloomFilterIndexEnabled();
break;
44 changes: 29 additions & 15 deletions hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -109,7 +109,6 @@
import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;

/**
@@ -1474,16 +1473,14 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
* @param avroValueWrapper A wrapped value with Avro type wrapper.
* @return Java value.
*/
public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper, boolean handleObfuscatedFlow, Option<String> fieldName, Option<GenericRecord> record) {
public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper, boolean handleObfuscatedFlow, Option<String> fieldName, Option<GenericRecord> colStatsRecord) {
if (avroValueWrapper == null) {
return null;
}

if (handleObfuscatedFlow) {
Pair<Boolean, String> isValueWrapperObfuscated = getIsValueWrapperObfuscated(record.get(), fieldName.get());
if (isValueWrapperObfuscated.getKey()) {
return unwrapAvroValueWrapper(avroValueWrapper, isValueWrapperObfuscated.getValue());
}
Pair<Boolean, String> isValueWrapperObfuscated = getIsValueWrapperObfuscated(avroValueWrapper);
if (isValueWrapperObfuscated.getKey()) {
return unwrapAvroValueWrapper(avroValueWrapper, isValueWrapperObfuscated.getValue());
}

if (avroValueWrapper instanceof DateWrapper) {
@@ -1524,27 +1521,44 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper, Stri
if (avroValueWrapper == null) {
return null;
} else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) {
return Date.valueOf(LocalDate.ofEpochDay((Integer)((Record) avroValueWrapper).get(0)));
if (avroValueWrapper instanceof GenericRecord) {
return Date.valueOf(LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0)));
} else {
return Date.valueOf(LocalDate.ofEpochDay((Integer) ((Record) avroValueWrapper).get(0)));
}
} else if (LocalDateWrapper.class.getSimpleName().equals(wrapperClassName)) {
return LocalDate.ofEpochDay((Integer)((Record) avroValueWrapper).get(0));
if (avroValueWrapper instanceof GenericRecord) {
return LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0));
} else {
return LocalDate.ofEpochDay((Integer) ((Record) avroValueWrapper).get(0));
}
} else if (TimestampMicrosWrapper.class.getSimpleName().equals(wrapperClassName)) {
Instant instant = microsToInstant((Long)((Record) avroValueWrapper).get(0));
return Timestamp.from(instant);
if (avroValueWrapper instanceof GenericRecord) {
Instant instant = microsToInstant((Long) ((GenericRecord) avroValueWrapper).get(0));
return Timestamp.from(instant);
} else {
Instant instant = microsToInstant((Long) ((Record) avroValueWrapper).get(0));
return Timestamp.from(instant);
}
} else if (DecimalWrapper.class.getSimpleName().equals(wrapperClassName)) {
Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer) ((Record) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType());
if (avroValueWrapper instanceof GenericRecord) {
return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer)((GenericRecord) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType());
} else {
return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer) ((Record) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType());
}
} else {
throw new UnsupportedOperationException(String.format("Unsupported type of the value (%s)", avroValueWrapper.getClass()));
}
}

private static Pair<Boolean, String> getIsValueWrapperObfuscated(GenericRecord record, String subFieldName) {
Object statsValue = ((GenericRecord) record.get(SCHEMA_FIELD_ID_COLUMN_STATS)).get(subFieldName);
private static Pair<Boolean, String> getIsValueWrapperObfuscated(Object statsValue) {
if (statsValue != null) {
String statsValueSchemaClassName = ((GenericRecord) statsValue).getSchema().getName();
boolean toReturn = statsValueSchemaClassName.equals(DateWrapper.class.getSimpleName())
|| statsValueSchemaClassName.equals(LocalDateWrapper.class.getSimpleName())
|| statsValueSchemaClassName.equals(TimestampMicrosWrapper.class.getSimpleName());
|| statsValueSchemaClassName.equals(TimestampMicrosWrapper.class.getSimpleName())
|| statsValueSchemaClassName.equals(DecimalWrapper.class.getSimpleName());
if (toReturn) {
return Pair.of(true, ((GenericRecord) statsValue).getSchema().getName());
}
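The rewritten unwrap path above dispatches on the runtime type, so a plain GenericRecord (for example, one produced by a serialization round-trip) is handled alongside the Avro-generated Record wrappers. A minimal sketch of the GenericRecord path, assuming the usual Avro imports and that the generated DateWrapper class with its SCHEMA$ constant is on the classpath; the value is illustrative:

    // Hedged sketch: a GenericRecord built against DateWrapper's schema unwraps the same
    // way as the generated wrapper record itself.
    GenericRecord dateWrapper = new GenericData.Record(DateWrapper.SCHEMA$);
    dateWrapper.put(0, 18990); // days since epoch, illustrative value
    Comparable<?> unwrapped =
        HoodieAvroUtils.unwrapAvroValueWrapper(dateWrapper, DateWrapper.class.getSimpleName());
    // per the DateWrapper branch above, unwrapped is a java.sql.Date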
@@ -763,6 +763,7 @@ public Builder withSecondaryIndexParallelism(int parallelism) {
public HoodieMetadataConfig build() {
metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS, getDefaultColStatsEnable(engineType));
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_PARTITION_STATS, getDefaultColStatsEnable(engineType));
// fix me: disable when schema on read is enabled.
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
return metadataConfig;
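With this default in place, the partition stats index follows the same engine default as column stats. A minimal sketch of opting out explicitly through the builder; the exact method names are assumptions based on the builder calls used in the test changes further down:

    // Hedged sketch: keep column stats on but disable the now-default partition stats index.
    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
        .enable(true)
        .withMetadataIndexColumnStats(true)
        .withMetadataIndexPartitionStats(false)
        .build();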
@@ -66,7 +66,14 @@ public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> getColumnRa
.map(e -> HoodieColumnRangeMetadata.create(
relativePartitionPath, e.getColumnName(), e.getMinValue(), e.getMaxValue(),
e.getNullCount(), e.getValueCount(), e.getTotalSize(), e.getTotalUncompressedSize()))
.reduce(HoodieColumnRangeMetadata::merge).orElseThrow(() -> new HoodieException("MergingColumnRanges failed."));
.reduce((a, b) -> {
try {
return HoodieColumnRangeMetadata.merge(a, b);
} catch (ClassCastException cce) {
// min/max values of incompatible types for the same column cannot be merged; rethrow for the caller
throw cce;
}
}).orElseThrow(() -> new HoodieException("MergingColumnRanges failed."));
}
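The reduction above folds per-file column ranges into a single partition-level range. A minimal sketch of one merge step, with the argument order mirroring the create(...) call above and purely illustrative values:

    // Hedged sketch: two per-file ranges for column "c1" in the same partition.
    HoodieColumnRangeMetadata<Integer> first = HoodieColumnRangeMetadata.create(
        "2021/11/19", "c1", 0, 100, 0L, 40L, 1024L, 4096L);
    HoodieColumnRangeMetadata<Integer> second = HoodieColumnRangeMetadata.create(
        "2021/11/19", "c1", 50, 959, 0L, 60L, 2048L, 8192L);
    // merge keeps the overall min/max and sums the counters; a ClassCastException here
    // signals min/max values of incompatible types for the same column.
    HoodieColumnRangeMetadata<Integer> merged = HoodieColumnRangeMetadata.merge(first, second);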

/**
@@ -1803,14 +1803,14 @@ private static Comparable<?> coerceToComparable(Schema schema, Object val) {

private static boolean isColumnTypeSupported(Schema schema, Option<HoodieRecordType> recordType) {
// if record type is set and if its AVRO, MAP, ARRAY, RECORD and ENUM types are unsupported.
if (recordType.isPresent() && recordType.get() == HoodieRecordType.AVRO) {
return (schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP
//if (recordType.isPresent() && recordType.get() == HoodieRecordType.AVRO) {
return (schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP
&& schema.getType() != Schema.Type.ENUM);
}
//}
// if record Type is not set or if recordType is SPARK then we cannot support AVRO, MAP, ARRAY, RECORD, ENUM and FIXED and BYTES type as well.
// HUDI-8585 will add support for BYTES and FIXED
return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP
&& schema.getType() != Schema.Type.ENUM && schema.getType() != Schema.Type.BYTES && schema.getType() != Schema.Type.FIXED;
//return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP
// && schema.getType() != Schema.Type.ENUM && schema.getType() != Schema.Type.BYTES && schema.getType() != Schema.Type.FIXED;
}
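With the AVRO-specific branch commented out, a single rule now applies to every record type as this hunk stands: nested and collection schemas are still skipped, while BYTES and FIXED are no longer excluded. A small illustration using only the Avro Schema API, with the outcomes read off the surviving condition above:

    // Hedged sketch: which Avro schema types the remaining check accepts.
    Schema intSchema   = Schema.create(Schema.Type.INT);                              // accepted
    Schema bytesSchema = Schema.create(Schema.Type.BYTES);                            // accepted for all record types as written
    Schema mapSchema   = Schema.createMap(Schema.create(Schema.Type.STRING));         // rejected (MAP)
    Schema enumSchema  = Schema.createEnum("e", null, "ns", Arrays.asList("A", "B")); // rejected (ENUM)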

public static Set<String> getInflightMetadataPartitions(HoodieTableConfig tableConfig) {
@@ -2540,6 +2540,8 @@ public static HoodieData<HoodieRecord> convertMetadataToPartitionStatRecords(Hoo
List<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata = partitionedWriteStat.stream()
.flatMap(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, validColumnsToIndex, tableSchema).stream())
.collect(toList());


if (shouldScanColStatsForTightBound) {
checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table");
// Collect Column Metadata for Each File part of active file system view of latest snapshot
@@ -302,8 +302,8 @@ private static void constructColumnStatsMetadataPayload(HoodieMetadataPayload pa
// AVRO-2377 1.9.2 Modified the type of org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
// This causes Kryo to fail when deserializing a GenericRecord, See HUDI-5484.
// We should avoid using GenericRecord and convert GenericRecord into a serializable type.
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE), true, Option.of(COLUMN_STATS_FIELD_MIN_VALUE), Option.of(record))))
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE), true, Option.of(COLUMN_STATS_FIELD_MAX_VALUE), Option.of(record))))
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE), true, Option.of(COLUMN_STATS_FIELD_MIN_VALUE), Option.of(columnStatsRecord))))
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE), true, Option.of(COLUMN_STATS_FIELD_MAX_VALUE), Option.of(columnStatsRecord))))
.setValueCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
.setNullCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
.setTotalSize((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
@@ -83,7 +83,7 @@ public void testPartitionEnabledByConfigOnly(MetadataPartitionType partitionType
break;
case PARTITION_STATS:
metadataConfigBuilder.enable(true).withMetadataIndexPartitionStats(true).withColumnStatsIndexForColumns("partitionCol");
expectedEnabledPartitions = 3;
expectedEnabledPartitions = 2;
break;
default:
throw new IllegalArgumentException("Unknown partition type: " + partitionType);
@@ -93,10 +93,10 @@ public void testPartitionEnabledByConfigOnly(MetadataPartitionType partitionType

// Verify partition type is enabled due to config
if (partitionType == MetadataPartitionType.EXPRESSION_INDEX || partitionType == MetadataPartitionType.SECONDARY_INDEX) {
assertEquals(2 + 1, enabledPartitions.size(), "EXPRESSION_INDEX should be enabled by SQL, only FILES and SECONDARY_INDEX is enabled in this case.");
assertEquals(2 + 2, enabledPartitions.size(), "EXPRESSION_INDEX should be enabled by SQL; FILES, SECONDARY_INDEX, COL_STATS and PARTITION_STATS are enabled in this case.");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES));
} else {
assertEquals(expectedEnabledPartitions + 1, enabledPartitions.size());
assertEquals(expectedEnabledPartitions + 2, enabledPartitions.size());
assertTrue(enabledPartitions.contains(partitionType) || MetadataPartitionType.ALL_PARTITIONS.equals(partitionType));
}
}
@@ -116,7 +116,7 @@ public void testPartitionAvailableByMetaClientOnly() {
List<MetadataPartitionType> enabledPartitions = MetadataPartitionType.getEnabledPartitions(metadataConfig.getProps(), metaClient);

// Verify RECORD_INDEX and FILES is enabled due to availability, and SECONDARY_INDEX by default
assertEquals(4, enabledPartitions.size(), "RECORD_INDEX, SECONDARY_INDEX, FILES, COL_STATS should be available");
assertEquals(5, enabledPartitions.size(), "RECORD_INDEX, SECONDARY_INDEX, FILES, COL_STATS, PARTITION_STATS should be available");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES), "FILES should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.RECORD_INDEX), "RECORD_INDEX should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.SECONDARY_INDEX), "SECONDARY_INDEX should be enabled by default");
@@ -155,8 +155,8 @@ public void testExpressionIndexPartitionEnabled() {

List<MetadataPartitionType> enabledPartitions = MetadataPartitionType.getEnabledPartitions(metadataConfig.getProps(), metaClient);

// Verify EXPRESSION_INDEX and FILES is enabled due to availability, and SECONDARY_INDEX by default
assertEquals(4, enabledPartitions.size(), "EXPRESSION_INDEX, FILES, COL_STATS and SECONDARY_INDEX should be available");
// Verify EXPRESSION_INDEX and FILES are enabled due to availability, and SECONDARY_INDEX, COL_STATS and PARTITION_STATS by default
assertEquals(5, enabledPartitions.size(), "EXPRESSION_INDEX, FILES, COL_STATS, PARTITION_STATS and SECONDARY_INDEX should be available");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES), "FILES should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.EXPRESSION_INDEX), "EXPRESSION_INDEX should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.SECONDARY_INDEX), "SECONDARY_INDEX should be enabled by default");
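The adjusted counts reflect that a fresh table now enables FILES, SECONDARY_INDEX, COL_STATS and PARTITION_STATS from defaults alone. A minimal sketch of asserting the new default, reusing the helpers from the test above (metaClient comes from the test fixture and is an assumption here):

    // Hedged sketch: with defaults only, partition stats shows up in the enabled set.
    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build();
    List<MetadataPartitionType> enabledPartitions =
        MetadataPartitionType.getEnabledPartitions(metadataConfig.getProps(), metaClient);
    assertTrue(enabledPartitions.contains(MetadataPartitionType.PARTITION_STATS));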
@@ -229,7 +229,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
// NOTE: It's crucial to maintain appropriate ordering of the columns
// matching table layout: hence, we cherry-pick individual columns
// instead of simply filtering in the ones we're interested in the schema
val (indexSchema, targetIndexedColumns) = composeIndexSchema(sortedTargetColumnsSet.toSeq, indexedColumns, tableSchema)
val (indexSchema, targetIndexedColumns) = composeIndexSchema(sortedTargetColumnsSet.toSeq, indexedColumns.toSeq, tableSchema)

// Here we perform complex transformation which requires us to modify the layout of the rows
// of the dataset, and therefore we rely on low-level RDD API to avoid incurring encoding/decoding
Expand Down Expand Up @@ -401,7 +401,7 @@ object ColumnStatsIndexSupport {
/**
* @VisibleForTesting
*/
def composeIndexSchema(targetColumnNames: Seq[String], indexedColumns: Set[String], tableSchema: StructType): (StructType, Seq[String]) = {
def composeIndexSchema(targetColumnNames: Seq[String], indexedColumns: Seq[String], tableSchema: StructType): (StructType, Seq[String]) = {
val fileNameField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME, StringType, nullable = true, Metadata.empty)
val valueCountField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT, LongType, nullable = true, Metadata.empty)

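The second parameter moving from Set[String] to Seq[String] preserves the caller's column ordering, which the comment in the first hunk calls out as crucial for matching the table layout. A short Java-side illustration of the underlying difference (plain collections, not Hudi API):

    // Hedged illustration: hash-based sets do not guarantee iteration order, lists do.
    List<String> ordered = Arrays.asList("c1", "c2", "c3");
    Set<String> asSet = new HashSet<>(ordered);      // iteration order unspecified
    List<String> asList = new ArrayList<>(ordered);  // preserves "c1", "c2", "c3"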
@@ -241,7 +241,7 @@ public static Dataset<Row> buildColumnStatsTableFor(

StructType indexSchema = ColumnStatsIndexSupport$.MODULE$.composeIndexSchema(
JavaScalaConverters.<String>convertJavaListToScalaSeq(columnNames),
JavaScalaConverters.convertJavaListToScalaList(columnNames).toSet(),
JavaScalaConverters.convertJavaListToScalaList(columnNames),
StructType$.MODULE$.apply(orderedColumnSchemas)
)._1;

@@ -0,0 +1 @@
{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40}
@@ -85,7 +85,7 @@ class TestDataSkippingUtils extends HoodieSparkClientTestBase with SparkAdapterS
)
)

val (indexSchema: StructType, targetIndexedColumns: Seq[String]) = composeIndexSchema(indexedCols, indexedCols.toSet, sourceTableSchema)
val (indexSchema: StructType, targetIndexedColumns: Seq[String]) = composeIndexSchema(indexedCols, indexedCols, sourceTableSchema)

@ParameterizedTest
@MethodSource(Array(