[HUDI-8680] Enabling partition stats by default #12671

Closed
wants to merge 1 commit into from
@@ -214,6 +214,9 @@ private HoodieMetadataFileSystemView getMetadataView() {
if (metadataView == null || !metadataView.equals(metadata.getMetadataFileSystemView())) {
ValidationUtils.checkState(metadata != null, "Metadata table not initialized");
ValidationUtils.checkState(dataMetaClient != null, "Data table meta client not initialized");
if (metadataView != null) {
metadataView.close();
}
metadataView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata);
}
return metadataView;
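
Note on the hunk above: the cached HoodieMetadataFileSystemView is now closed before a fresh instance is swapped in, so repeated refreshes no longer leak the resources held by the stale view. A minimal, generic sketch of that close-before-replace pattern (hypothetical helper, not a Hudi API):

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;

// Hypothetical holder illustrating the pattern: close the stale instance,
// then rebuild against the latest state and cache the replacement.
class RefreshableHolder<T extends Closeable> {
  private T current;

  synchronized T refresh(Supplier<T> factory) throws IOException {
    if (current != null) {
      current.close();        // release the stale view before replacing it
    }
    current = factory.get();  // e.g. a view built on the latest active timeline
    return current;
  }
}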
@@ -528,16 +531,17 @@ private String generateUniqueInstantTime(String initializationTime) {
}

private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex() throws IOException {
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, getPartitionFileSlicePairs(), dataWriteConfig.getMetadataConfig(), dataMetaClient,
Option.of(new Schema.Parser().parse(dataWriteConfig.getWriteSchema())), Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, getPartitionFileSlicePairs(), dataWriteConfig.getMetadataConfig(),
dataMetaClient, Option.empty(), Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
final int fileGroupCount = dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
}

private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
// Find the columns to index
Lazy<Option<Schema>> tableSchema = Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
final List<String> columnsToIndex = HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(),
dataWriteConfig.getMetadataConfig(), Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient)), true,
dataWriteConfig.getMetadataConfig(), tableSchema, true,
Option.of(dataWriteConfig.getRecordMerger().getRecordType()));

final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
@@ -1672,22 +1676,21 @@ private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, DirectoryIn
}

private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) {
try (HoodieMetadataFileSystemView fsView = getMetadataView()) {
List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = replaceCommitMetadata
.getPartitionToReplaceFileIds()
.keySet().stream()
.flatMap(partition -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f)))
.collect(Collectors.toList());
return readRecordKeysFromBaseFiles(
engineContext,
dataWriteConfig,
partitionBaseFilePairs,
true,
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
dataMetaClient.getBasePath(),
storageConf,
this.getClass().getSimpleName());
}
HoodieMetadataFileSystemView fsView = getMetadataView();
List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = replaceCommitMetadata
.getPartitionToReplaceFileIds()
.keySet().stream()
.flatMap(partition -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f)))
.collect(Collectors.toList());
return readRecordKeysFromBaseFiles(
engineContext,
dataWriteConfig,
partitionBaseFilePairs,
true,
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
dataMetaClient.getBasePath(),
storageConf,
this.getClass().getSimpleName());
}

private HoodieData<HoodieRecord> getRecordIndexAdditionalUpserts(HoodieData<HoodieRecord> updatesFromWriteStatuses, HoodieCommitMetadata commitMetadata) {
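
Reviewer note on getRecordIndexReplacedRecords above: the per-call try-with-resources was dropped because getMetadataView() now hands out a shared, cached view whose lifecycle is owned by the cache (it is closed when refreshed, as in the earlier hunk), so closing the borrowed instance after each lookup would invalidate it for later callers. A generic sketch of that borrow-don't-close contract, using hypothetical names rather than Hudi classes:

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;

// Single owner of a cached Closeable: borrowers use it, only the cache closes it.
final class SharedViewCache<T extends Closeable> implements Closeable {
  private T cached;

  synchronized T borrow(Supplier<T> factory) {
    if (cached == null) {
      cached = factory.get();
    }
    return cached; // callers must NOT close the borrowed instance
  }

  @Override
  public synchronized void close() throws IOException {
    if (cached != null) {
      cached.close(); // released exactly once, by the owner
      cached = null;
    }
  }
}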
@@ -342,7 +342,8 @@ public void testKeepLatestFileVersions() throws Exception {
public void testKeepLatestFileVersionsWithBootstrapFileClean() throws Exception {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false)
.withMetadataIndexPartitionStats(false).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanBootstrapBaseFileEnabled(true)
.withCleanerParallelism(1)
@@ -377,6 +378,7 @@ public void testKeepLatestFileVersionsWithBootstrapFileClean() throws Exception
Map<String, List<Pair<String, Integer>>> c2PartitionToFilesNameLengthMap = new HashMap<>();
c2PartitionToFilesNameLengthMap.put(p0, Arrays.asList(Pair.of(file1P0C0, 101), Pair.of(file2P0C1, 100)));
c2PartitionToFilesNameLengthMap.put(p1, Arrays.asList(Pair.of(file1P1C0, 201), Pair.of(file2P1C1, 200)));
testTable = HoodieMetadataTestTable.of(metaClient, getMetadataWriter(config), Option.of(context));
testTable.doWriteOperation("00000000000003", WriteOperationType.UPSERT, Collections.emptyList(),
c2PartitionToFilesNameLengthMap, false, false);

33 changes: 15 additions & 18 deletions hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -110,7 +110,6 @@
import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;

/**
@@ -1465,26 +1464,20 @@ public static Object wrapValueIntoAvro(Comparable<?> value) {
}
}

public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
return unwrapAvroValueWrapper(avroValueWrapper, false, Option.empty(), Option.empty());
}

/**
* Unwraps Avro value wrapper into Java value.
*
* @param avroValueWrapper A wrapped value with Avro type wrapper.
* @return Java value.
*/
public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper, boolean handleObfuscatedFlow, Option<String> fieldName, Option<GenericRecord> record) {
public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
if (avroValueWrapper == null) {
return null;
}

if (handleObfuscatedFlow) {
Pair<Boolean, String> isValueWrapperObfuscated = getIsValueWrapperObfuscated(record.get(), fieldName.get());
if (isValueWrapperObfuscated.getKey()) {
return unwrapAvroValueWrapper(avroValueWrapper, isValueWrapperObfuscated.getValue());
}
Pair<Boolean, String> isValueWrapperObfuscated = getIsValueWrapperObfuscated(avroValueWrapper);
if (isValueWrapperObfuscated.getKey()) {
return unwrapAvroValueWrapper(avroValueWrapper, isValueWrapperObfuscated.getValue());
}

if (avroValueWrapper instanceof DateWrapper) {
@@ -1525,27 +1518,31 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper, Stri
if (avroValueWrapper == null) {
return null;
} else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) {
return Date.valueOf(LocalDate.ofEpochDay((Integer)((Record) avroValueWrapper).get(0)));
ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
return Date.valueOf(LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0)));
} else if (LocalDateWrapper.class.getSimpleName().equals(wrapperClassName)) {
return LocalDate.ofEpochDay((Integer)((Record) avroValueWrapper).get(0));
ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
return LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0));
} else if (TimestampMicrosWrapper.class.getSimpleName().equals(wrapperClassName)) {
Instant instant = microsToInstant((Long)((Record) avroValueWrapper).get(0));
ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
Instant instant = microsToInstant((Long) ((GenericRecord) avroValueWrapper).get(0));
return Timestamp.from(instant);
} else if (DecimalWrapper.class.getSimpleName().equals(wrapperClassName)) {
Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer) ((Record) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType());
ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer)((GenericRecord) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType());
} else {
throw new UnsupportedOperationException(String.format("Unsupported type of the value (%s)", avroValueWrapper.getClass()));
}
}

private static Pair<Boolean, String> getIsValueWrapperObfuscated(GenericRecord record, String subFieldName) {
Object statsValue = ((GenericRecord) record.get(SCHEMA_FIELD_ID_COLUMN_STATS)).get(subFieldName);
private static Pair<Boolean, String> getIsValueWrapperObfuscated(Object statsValue) {
if (statsValue != null) {
String statsValueSchemaClassName = ((GenericRecord) statsValue).getSchema().getName();
boolean toReturn = statsValueSchemaClassName.equals(DateWrapper.class.getSimpleName())
|| statsValueSchemaClassName.equals(LocalDateWrapper.class.getSimpleName())
|| statsValueSchemaClassName.equals(TimestampMicrosWrapper.class.getSimpleName());
|| statsValueSchemaClassName.equals(TimestampMicrosWrapper.class.getSimpleName())
|| statsValueSchemaClassName.equals(DecimalWrapper.class.getSimpleName());
if (toReturn) {
return Pair.of(true, ((GenericRecord) statsValue).getSchema().getName());
}
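
Note on the HoodieAvroUtils changes above: the wrapper type is now detected from the wrapped GenericRecord's own schema name (with DecimalWrapper added to the set) instead of being looked up through the parent column-stats record, which is what lets the single-argument unwrapAvroValueWrapper serve all callers. A small self-contained sketch of that detection idea, with the wrapper names taken from this diff and the helper name invented for illustration:

import org.apache.avro.generic.GenericRecord;

class WrapperDetectionSketch {
  // Returns the wrapper's schema name when the value is one of the logical-type
  // wrappers that need special unwrapping, or null for the generic path.
  static String detectWrapperType(Object value) {
    if (!(value instanceof GenericRecord)) {
      return null;
    }
    String name = ((GenericRecord) value).getSchema().getName();
    boolean needsSpecialHandling = name.equals("DateWrapper")
        || name.equals("LocalDateWrapper")
        || name.equals("TimestampMicrosWrapper")
        || name.equals("DecimalWrapper");
    return needsSpecialHandling ? name : null;
  }
}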
@@ -890,6 +890,7 @@ public Builder withDropMetadataIndex(String indexName) {
public HoodieMetadataConfig build() {
metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS, getDefaultColStatsEnable(engineType));
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_PARTITION_STATS, metadataConfig.isColumnStatsIndexEnabled());
// fix me: disable when schema on read is enabled.
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
return metadataConfig;
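
This one-line change in HoodieMetadataConfig.Builder.build() is the heart of the PR: when partition stats is not set explicitly, its default now follows whatever column stats resolves to for the engine. A hedged usage sketch (builder methods are the ones used elsewhere in this diff; the import path is the usual Hudi location, and the comments describe the new default rather than an API guarantee):

import org.apache.hudi.common.config.HoodieMetadataConfig;

public class PartitionStatsDefaultSketch {
  public static void main(String[] args) {
    // Column stats explicitly on, partition stats left unset:
    // after this PR the partition stats default mirrors the column stats value.
    HoodieMetadataConfig followsColStats = HoodieMetadataConfig.newBuilder()
        .enable(true)
        .withMetadataIndexColumnStats(true)
        .build();

    // Explicit opt-out still wins over the derived default.
    HoodieMetadataConfig optedOut = HoodieMetadataConfig.newBuilder()
        .enable(true)
        .withMetadataIndexColumnStats(true)
        .withMetadataIndexPartitionStats(false)
        .build();
  }
}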
@@ -2437,7 +2437,7 @@ public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(Hoodi
}
Lazy<Option<Schema>> lazyWriterSchemaOpt = writerSchemaOpt.isPresent() ? Lazy.eagerly(writerSchemaOpt) : Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient));
final List<String> columnsToIndex = getColumnsToIndex(dataTableMetaClient.getTableConfig(), metadataConfig, lazyWriterSchemaOpt,
dataTableMetaClient.getActiveTimeline().filterCompletedInstants().empty(), recordTypeOpt);
dataTableMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().empty(), recordTypeOpt);
if (columnsToIndex.isEmpty()) {
LOG.warn("No columns to index for partition stats index");
return engineContext.emptyHoodieData();
@@ -2535,11 +2535,7 @@ public static HoodieData<HoodieRecord> convertMetadataToPartitionStatRecords(Hoo
if (columnsToIndex.isEmpty()) {
return engineContext.emptyHoodieData();
}
// filter columns with only supported types
final List<String> validColumnsToIndex = columnsToIndex.stream()
.filter(col -> SUPPORTED_META_FIELDS_PARTITION_STATS.contains(col) || validateDataTypeForPartitionStats(col, writerSchemaOpt.get().get()))
.collect(Collectors.toList());
LOG.debug("Indexing following columns for partition stats index: {}", validColumnsToIndex);
LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndex);
// Group by partitionPath and then gather write stats lists,
// where each inner list contains HoodieWriteStat objects that have the same partitionPath.
List<List<HoodieWriteStat>> partitionedWriteStats = new ArrayList<>(allWriteStats.stream()
@@ -2553,8 +2549,8 @@ public static HoodieData<HoodieRecord> convertMetadataToPartitionStatRecords(Hoo
final String partitionName = partitionedWriteStat.get(0).getPartitionPath();
// Step 1: Collect Column Metadata for Each File part of current commit metadata
List<HoodieColumnRangeMetadata<Comparable>> fileColumnMetadata = partitionedWriteStat.stream()
.flatMap(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, validColumnsToIndex, tableSchema).stream())
.collect(toList());
.flatMap(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, columnsToIndex, tableSchema).stream()).collect(toList());

if (shouldScanColStatsForTightBound) {
checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table");
// Collect Column Metadata for Each File part of active file system view of latest snapshot
@@ -2567,7 +2563,7 @@ public static HoodieData<HoodieRecord> convertMetadataToPartitionStatRecords(Hoo
.collect(Collectors.toSet());
// Fetch metadata table COLUMN_STATS partition records for above files
List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata = tableMetadata
.getRecordsByKeyPrefixes(generateKeyPrefixes(validColumnsToIndex, partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)
.getRecordsByKeyPrefixes(generateKeyPrefixes(columnsToIndex, partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)
// schema and properties are ignored in getInsertValue, so simply pass as null
.map(record -> ((HoodieMetadataPayload)record.getData()).getColumnStatMetadata())
.filter(Option::isPresent)
@@ -27,7 +27,6 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.index.expression.HoodieExpressionIndex;
@@ -309,8 +308,8 @@ private static void constructColumnStatsMetadataPayload(HoodieMetadataPayload pa
// AVRO-2377 1.9.2 Modified the type of org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
// This causes Kryo to fail when deserializing a GenericRecord, See HUDI-5484.
// We should avoid using GenericRecord and convert GenericRecord into a serializable type.
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE), true, Option.of(COLUMN_STATS_FIELD_MIN_VALUE), Option.of(record))))
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE), true, Option.of(COLUMN_STATS_FIELD_MAX_VALUE), Option.of(record))))
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
.setValueCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
.setNullCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
.setTotalSize((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
@@ -66,7 +66,6 @@
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayDeque;
@@ -715,7 +714,7 @@ public void testWrapAndUnwrapJavaValues(Comparable value, Class expectedWrapper)
assertEquals(((Timestamp) value).getTime() * 1000L,
((GenericRecord) wrapperValue).get(0));
assertEquals(((Timestamp) value).getTime(),
((Instant) unwrapAvroValueWrapper(wrapperValue)).toEpochMilli());
((Timestamp) unwrapAvroValueWrapper(wrapperValue)).getTime());
} else if (value instanceof Date) {
assertEquals((int) ChronoUnit.DAYS.between(
LocalDate.ofEpochDay(0), ((Date) value).toLocalDate()),
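
Note on the test change above: with the schema-name based detection, the single-argument unwrapAvroValueWrapper now returns java.sql.Timestamp for a TimestampMicrosWrapper (the old assertion compared against java.time.Instant), so callers compare epoch millis via getTime(). A small round-trip sketch under that assumption, with class and package names taken from this diff and an arbitrary example value:

import org.apache.hudi.avro.HoodieAvroUtils;
import java.sql.Timestamp;

public class TimestampUnwrapSketch {
  public static void main(String[] args) {
    Timestamp original = new Timestamp(1_700_000_000_000L);       // arbitrary instant
    Object wrapped = HoodieAvroUtils.wrapValueIntoAvro(original); // TimestampMicrosWrapper record
    Comparable<?> unwrapped = HoodieAvroUtils.unwrapAvroValueWrapper(wrapped);
    // Expected after this PR: the unwrapped value is a java.sql.Timestamp
    // carrying the same millisecond value as the original.
    System.out.println(((Timestamp) unwrapped).getTime() == original.getTime());
  }
}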
@@ -83,7 +83,7 @@ public void testPartitionEnabledByConfigOnly(MetadataPartitionType partitionType
break;
case PARTITION_STATS:
metadataConfigBuilder.enable(true).withMetadataIndexPartitionStats(true).withColumnStatsIndexForColumns("partitionCol");
expectedEnabledPartitions = 3;
expectedEnabledPartitions = 2;
break;
default:
throw new IllegalArgumentException("Unknown partition type: " + partitionType);
@@ -93,10 +93,10 @@

// Verify partition type is enabled due to config
if (partitionType == MetadataPartitionType.EXPRESSION_INDEX || partitionType == MetadataPartitionType.SECONDARY_INDEX) {
assertEquals(2 + 1, enabledPartitions.size(), "EXPRESSION_INDEX should be enabled by SQL, only FILES and SECONDARY_INDEX is enabled in this case.");
assertEquals(2 + 2, enabledPartitions.size(), "EXPRESSION_INDEX should be enabled by SQL, only FILES and SECONDARY_INDEX is enabled in this case.");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES));
} else {
assertEquals(expectedEnabledPartitions + 1, enabledPartitions.size());
assertEquals(expectedEnabledPartitions + 2, enabledPartitions.size());
assertTrue(enabledPartitions.contains(partitionType) || MetadataPartitionType.ALL_PARTITIONS.equals(partitionType));
}
}
@@ -116,7 +116,7 @@ public void testPartitionAvailableByMetaClientOnly() {
List<MetadataPartitionType> enabledPartitions = MetadataPartitionType.getEnabledPartitions(metadataConfig.getProps(), metaClient);

// Verify RECORD_INDEX and FILES is enabled due to availability, and SECONDARY_INDEX by default
assertEquals(4, enabledPartitions.size(), "RECORD_INDEX, SECONDARY_INDEX, FILES, COL_STATS should be available");
assertEquals(5, enabledPartitions.size(), "RECORD_INDEX, SECONDARY_INDEX, FILES, COL_STATS, PARTITION_STATS should be available");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES), "FILES should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.RECORD_INDEX), "RECORD_INDEX should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.SECONDARY_INDEX), "SECONDARY_INDEX should be enabled by default");
@@ -155,8 +155,8 @@ public void testExpressionIndexPartitionEnabled() {

List<MetadataPartitionType> enabledPartitions = MetadataPartitionType.getEnabledPartitions(metadataConfig.getProps(), metaClient);

// Verify EXPRESSION_INDEX and FILES is enabled due to availability, and SECONDARY_INDEX by default
assertEquals(4, enabledPartitions.size(), "EXPRESSION_INDEX, FILES, COL_STATS and SECONDARY_INDEX should be available");
// Verify EXPRESSION_INDEX and FILES is enabled due to availability, and SECONDARY_INDEX, COL_STATS and PARTITION_STATS by default
assertEquals(5, enabledPartitions.size(), "EXPRESSION_INDEX, FILES, COL_STATS and SECONDARY_INDEX should be available");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES), "FILES should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.EXPRESSION_INDEX), "EXPRESSION_INDEX should be enabled by availability");
assertTrue(enabledPartitions.contains(MetadataPartitionType.SECONDARY_INDEX), "SECONDARY_INDEX should be enabled by default");