diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 1cc04b9860e8d..ecb4e070cab71 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -1058,6 +1058,9 @@ private boolean shouldDeleteMetadataPartition(MetadataPartitionType partitionTyp case COLUMN_STATS: metadataIndexDisabled = !config.isMetadataColumnStatsIndexEnabled(); break; + case PARTITION_STATS: + metadataIndexDisabled = !config.isPartitionStatsIndexEnabled(); + break; case BLOOM_FILTERS: metadataIndexDisabled = !config.isMetadataBloomFilterIndexEnabled(); break; diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 25974784d49c9..e02d94d2a295d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -109,7 +109,6 @@ import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; import static org.apache.hudi.common.util.ValidationUtils.checkState; -import static org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal; /** @@ -1474,16 +1473,14 @@ public static Comparable unwrapAvroValueWrapper(Object avroValueWrapper) { * @param avroValueWrapper A wrapped value with Avro type wrapper. * @return Java value. */ - public static Comparable unwrapAvroValueWrapper(Object avroValueWrapper, boolean handleObfuscatedFlow, Option fieldName, Option record) { + public static Comparable unwrapAvroValueWrapper(Object avroValueWrapper, boolean handleObfuscatedFlow, Option fieldName, Option colStatsRecord) { if (avroValueWrapper == null) { return null; } - if (handleObfuscatedFlow) { - Pair isValueWrapperObfuscated = getIsValueWrapperObfuscated(record.get(), fieldName.get()); - if (isValueWrapperObfuscated.getKey()) { - return unwrapAvroValueWrapper(avroValueWrapper, isValueWrapperObfuscated.getValue()); - } + Pair isValueWrapperObfuscated = getIsValueWrapperObfuscated(avroValueWrapper); + if (isValueWrapperObfuscated.getKey()) { + return unwrapAvroValueWrapper(avroValueWrapper, isValueWrapperObfuscated.getValue()); } if (avroValueWrapper instanceof DateWrapper) { @@ -1524,27 +1521,44 @@ public static Comparable unwrapAvroValueWrapper(Object avroValueWrapper, Stri if (avroValueWrapper == null) { return null; } else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) { - return Date.valueOf(LocalDate.ofEpochDay((Integer)((Record) avroValueWrapper).get(0))); + if (avroValueWrapper instanceof GenericRecord) { + return Date.valueOf(LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0))); + } else { + return Date.valueOf(LocalDate.ofEpochDay((Integer) ((Record) avroValueWrapper).get(0))); + } } else if (LocalDateWrapper.class.getSimpleName().equals(wrapperClassName)) { - return LocalDate.ofEpochDay((Integer)((Record) avroValueWrapper).get(0)); + if (avroValueWrapper instanceof GenericRecord) { + return LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0)); + } else { + return LocalDate.ofEpochDay((Integer) ((Record) avroValueWrapper).get(0)); + } } else if (TimestampMicrosWrapper.class.getSimpleName().equals(wrapperClassName)) { - Instant instant = microsToInstant((Long)((Record) avroValueWrapper).get(0)); - return Timestamp.from(instant); + if (avroValueWrapper instanceof GenericRecord) { + Instant instant = microsToInstant((Long) ((GenericRecord) avroValueWrapper).get(0)); + return Timestamp.from(instant); + } else { + Instant instant = microsToInstant((Long) ((Record) avroValueWrapper).get(0)); + return Timestamp.from(instant); + } } else if (DecimalWrapper.class.getSimpleName().equals(wrapperClassName)) { Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema(); - return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer) ((Record) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType()); + if (avroValueWrapper instanceof GenericRecord) { + return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer)((GenericRecord) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType()); + } else { + return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer) ((Record) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType()); + } } else { throw new UnsupportedOperationException(String.format("Unsupported type of the value (%s)", avroValueWrapper.getClass())); } } - private static Pair getIsValueWrapperObfuscated(GenericRecord record, String subFieldName) { - Object statsValue = ((GenericRecord) record.get(SCHEMA_FIELD_ID_COLUMN_STATS)).get(subFieldName); + private static Pair getIsValueWrapperObfuscated(Object statsValue) { if (statsValue != null) { String statsValueSchemaClassName = ((GenericRecord) statsValue).getSchema().getName(); boolean toReturn = statsValueSchemaClassName.equals(DateWrapper.class.getSimpleName()) || statsValueSchemaClassName.equals(LocalDateWrapper.class.getSimpleName()) - || statsValueSchemaClassName.equals(TimestampMicrosWrapper.class.getSimpleName()); + || statsValueSchemaClassName.equals(TimestampMicrosWrapper.class.getSimpleName()) + || statsValueSchemaClassName.equals(DecimalWrapper.class.getSimpleName()); if (toReturn) { return Pair.of(true, ((GenericRecord) statsValue).getSchema().getName()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index d30b0e0d63522..da458bca13965 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -763,6 +763,7 @@ public Builder withSecondaryIndexParallelism(int parallelism) { public HoodieMetadataConfig build() { metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType)); metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS, getDefaultColStatsEnable(engineType)); + metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_PARTITION_STATS, getDefaultColStatsEnable(engineType)); // fix me: disable when schema on read is enabled. metadataConfig.setDefaults(HoodieMetadataConfig.class.getName()); return metadataConfig; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java index 72fe9f0f4d027..16cc24e0d47cf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java @@ -66,7 +66,14 @@ public static > HoodieColumnRangeMetadata getColumnRa .map(e -> HoodieColumnRangeMetadata.create( relativePartitionPath, e.getColumnName(), e.getMinValue(), e.getMaxValue(), e.getNullCount(), e.getValueCount(), e.getTotalSize(), e.getTotalUncompressedSize())) - .reduce(HoodieColumnRangeMetadata::merge).orElseThrow(() -> new HoodieException("MergingColumnRanges failed.")); + .reduce((a,b) -> { + try { + return HoodieColumnRangeMetadata.merge(a, b); + } catch (ClassCastException cce) { + System.out.println("asdfasd"); + throw cce; + } + }).orElseThrow(() -> new HoodieException("MergingColumnRanges failed.")); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index bce5b5d7acbb0..63749b610620e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -1803,14 +1803,14 @@ private static Comparable coerceToComparable(Schema schema, Object val) { private static boolean isColumnTypeSupported(Schema schema, Option recordType) { // if record type is set and if its AVRO, MAP, ARRAY, RECORD and ENUM types are unsupported. - if (recordType.isPresent() && recordType.get() == HoodieRecordType.AVRO) { - return (schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP + //if (recordType.isPresent() && recordType.get() == HoodieRecordType.AVRO) { + return (schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP && schema.getType() != Schema.Type.ENUM); - } + //} // if record Type is not set or if recordType is SPARK then we cannot support AVRO, MAP, ARRAY, RECORD, ENUM and FIXED and BYTES type as well. // HUDI-8585 will add support for BYTES and FIXED - return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP - && schema.getType() != Schema.Type.ENUM && schema.getType() != Schema.Type.BYTES && schema.getType() != Schema.Type.FIXED; + //return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP + // && schema.getType() != Schema.Type.ENUM && schema.getType() != Schema.Type.BYTES && schema.getType() != Schema.Type.FIXED; } public static Set getInflightMetadataPartitions(HoodieTableConfig tableConfig) { @@ -2540,6 +2540,8 @@ public static HoodieData convertMetadataToPartitionStatRecords(Hoo List> fileColumnMetadata = partitionedWriteStat.stream() .flatMap(writeStat -> translateWriteStatToFileStats(writeStat, dataMetaClient, validColumnsToIndex, tableSchema).stream()) .collect(toList()); + + if (shouldScanColStatsForTightBound) { checkState(tableMetadata != null, "tableMetadata should not be null when scanning metadata table"); // Collect Column Metadata for Each File part of active file system view of latest snapshot diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java index 812a115f8d0df..0f25442cb9259 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java @@ -302,8 +302,8 @@ private static void constructColumnStatsMetadataPayload(HoodieMetadataPayload pa // AVRO-2377 1.9.2 Modified the type of org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet. // This causes Kryo to fail when deserializing a GenericRecord, See HUDI-5484. // We should avoid using GenericRecord and convert GenericRecord into a serializable type. - .setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE), true, Option.of(COLUMN_STATS_FIELD_MIN_VALUE), Option.of(record)))) - .setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE), true, Option.of(COLUMN_STATS_FIELD_MAX_VALUE), Option.of(record)))) + .setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE), true, Option.of(COLUMN_STATS_FIELD_MIN_VALUE), Option.of(columnStatsRecord)))) + .setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE), true, Option.of(COLUMN_STATS_FIELD_MAX_VALUE), Option.of(columnStatsRecord)))) .setValueCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT)) .setNullCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT)) .setTotalSize((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE)) diff --git a/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java b/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java index 63d95d0e113b1..ea1942b95d3d1 100644 --- a/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java +++ b/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java @@ -83,7 +83,7 @@ public void testPartitionEnabledByConfigOnly(MetadataPartitionType partitionType break; case PARTITION_STATS: metadataConfigBuilder.enable(true).withMetadataIndexPartitionStats(true).withColumnStatsIndexForColumns("partitionCol"); - expectedEnabledPartitions = 3; + expectedEnabledPartitions = 2; break; default: throw new IllegalArgumentException("Unknown partition type: " + partitionType); @@ -93,10 +93,10 @@ public void testPartitionEnabledByConfigOnly(MetadataPartitionType partitionType // Verify partition type is enabled due to config if (partitionType == MetadataPartitionType.EXPRESSION_INDEX || partitionType == MetadataPartitionType.SECONDARY_INDEX) { - assertEquals(2 + 1, enabledPartitions.size(), "EXPRESSION_INDEX should be enabled by SQL, only FILES and SECONDARY_INDEX is enabled in this case."); + assertEquals(2 + 2, enabledPartitions.size(), "EXPRESSION_INDEX should be enabled by SQL, only FILES and SECONDARY_INDEX is enabled in this case."); assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES)); } else { - assertEquals(expectedEnabledPartitions + 1, enabledPartitions.size()); + assertEquals(expectedEnabledPartitions + 2, enabledPartitions.size()); assertTrue(enabledPartitions.contains(partitionType) || MetadataPartitionType.ALL_PARTITIONS.equals(partitionType)); } } @@ -116,7 +116,7 @@ public void testPartitionAvailableByMetaClientOnly() { List enabledPartitions = MetadataPartitionType.getEnabledPartitions(metadataConfig.getProps(), metaClient); // Verify RECORD_INDEX and FILES is enabled due to availability, and SECONDARY_INDEX by default - assertEquals(4, enabledPartitions.size(), "RECORD_INDEX, SECONDARY_INDEX, FILES, COL_STATS should be available"); + assertEquals(5, enabledPartitions.size(), "RECORD_INDEX, SECONDARY_INDEX, FILES, COL_STATS, PARTITION_STATS should be available"); assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES), "FILES should be enabled by availability"); assertTrue(enabledPartitions.contains(MetadataPartitionType.RECORD_INDEX), "RECORD_INDEX should be enabled by availability"); assertTrue(enabledPartitions.contains(MetadataPartitionType.SECONDARY_INDEX), "SECONDARY_INDEX should be enabled by default"); @@ -155,8 +155,8 @@ public void testExpressionIndexPartitionEnabled() { List enabledPartitions = MetadataPartitionType.getEnabledPartitions(metadataConfig.getProps(), metaClient); - // Verify EXPRESSION_INDEX and FILES is enabled due to availability, and SECONDARY_INDEX by default - assertEquals(4, enabledPartitions.size(), "EXPRESSION_INDEX, FILES, COL_STATS and SECONDARY_INDEX should be available"); + // Verify EXPRESSION_INDEX and FILES is enabled due to availability, and SECONDARY_INDEX, COL_STATS and PARTITION_STATS by default + assertEquals(5, enabledPartitions.size(), "EXPRESSION_INDEX, FILES, COL_STATS and SECONDARY_INDEX should be available"); assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES), "FILES should be enabled by availability"); assertTrue(enabledPartitions.contains(MetadataPartitionType.EXPRESSION_INDEX), "EXPRESSION_INDEX should be enabled by availability"); assertTrue(enabledPartitions.contains(MetadataPartitionType.SECONDARY_INDEX), "SECONDARY_INDEX should be enabled by default"); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala index 1becf68832440..63eff4a8fb6ee 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala @@ -229,7 +229,7 @@ class ColumnStatsIndexSupport(spark: SparkSession, // NOTE: It's crucial to maintain appropriate ordering of the columns // matching table layout: hence, we cherry-pick individual columns // instead of simply filtering in the ones we're interested in the schema - val (indexSchema, targetIndexedColumns) = composeIndexSchema(sortedTargetColumnsSet.toSeq, indexedColumns, tableSchema) + val (indexSchema, targetIndexedColumns) = composeIndexSchema(sortedTargetColumnsSet.toSeq, indexedColumns.toSeq, tableSchema) // Here we perform complex transformation which requires us to modify the layout of the rows // of the dataset, and therefore we rely on low-level RDD API to avoid incurring encoding/decoding @@ -401,7 +401,7 @@ object ColumnStatsIndexSupport { /** * @VisibleForTesting */ - def composeIndexSchema(targetColumnNames: Seq[String], indexedColumns: Set[String], tableSchema: StructType): (StructType, Seq[String]) = { + def composeIndexSchema(targetColumnNames: Seq[String], indexedColumns: Seq[String], tableSchema: StructType): (StructType, Seq[String]) = { val fileNameField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME, StringType, nullable = true, Metadata.empty) val valueCountField = StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT, LongType, nullable = true, Metadata.empty) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java index 269a83bf7ac0d..fe8427e7caa72 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java @@ -241,7 +241,7 @@ public static Dataset buildColumnStatsTableFor( StructType indexSchema = ColumnStatsIndexSupport$.MODULE$.composeIndexSchema( JavaScalaConverters.convertJavaListToScalaSeq(columnNames), - JavaScalaConverters.convertJavaListToScalaList(columnNames).toSet(), + JavaScalaConverters.convertJavaListToScalaList(columnNames), StructType$.MODULE$.apply(orderedColumnSchemas) )._1; diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partition-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partition-stats-index-table.json new file mode 100644 index 0000000000000..bdc71d4a0321d --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partition-stats-index-table.json @@ -0,0 +1 @@ +{"c1_maxValue":959,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":40} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala index 9494850c5e57d..0331347fc3089 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala @@ -85,7 +85,7 @@ class TestDataSkippingUtils extends HoodieSparkClientTestBase with SparkAdapterS ) ) - val (indexSchema: StructType, targetIndexedColumns: Seq[String]) = composeIndexSchema(indexedCols, indexedCols.toSet, sourceTableSchema) + val (indexSchema: StructType, targetIndexedColumns: Seq[String]) = composeIndexSchema(indexedCols, indexedCols, sourceTableSchema) @ParameterizedTest @MethodSource(Array( diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala index 807dd3e4d9e0e..611641541c072 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala @@ -32,14 +32,13 @@ import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase import org.apache.hudi.storage.StoragePath import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration import org.apache.hudi.testutils.HoodieSparkClientTestBase -import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions} import org.apache.spark.sql._ +import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, DataSourceWriteOptions, PartitionStatsIndexSupport} import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestParams import org.apache.hudi.metadata.HoodieTableMetadataUtil import org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS import org.apache.hudi.testutils.{HoodieSparkClientTestBase, LogFileColStatsTestUtil} import org.apache.hudi.util.JavaScalaConverters.convertScalaListToJavaList -import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions} import org.apache.spark.sql.functions.typedLit import org.apache.spark.sql.functions.{lit, struct, typedLit} import org.apache.spark.sql.types._ @@ -123,18 +122,66 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { metaClient = HoodieTableMetaClient.reload(metaClient) - if (params.shouldValidate) { + if (params.shouldValidateColStats) { // Currently, routine manually validating the column stats (by actually reading every column of every file) // only supports parquet files. Therefore we skip such validation when delta-log files are present, and only // validate in following cases: (1) COW: all operations; (2) MOR: insert only. val shouldValidateColumnStatsManually = (params.testCase.tableType == HoodieTableType.COPY_ON_WRITE || params.operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)) && params.shouldValidateManually - validateColumnStatsIndex( - params.testCase, params.metadataOpts, params.expectedColStatsSourcePath, shouldValidateColumnStatsManually, params.validationSortColumns) + validateColumnStatsIndex(params.testCase, params.metadataOpts, params.expectedColStatsSourcePath, + shouldValidateColumnStatsManually, params.validationSortColumns) + } else if (params.shouldValidatePartitionSats) { + val shouldValidateColumnStatsManually = (params.testCase.tableType == HoodieTableType.COPY_ON_WRITE || + params.operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)) && params.shouldValidateManually + + validatePartitionStatsIndex(params.testCase, params.metadataOpts, params.expectedColStatsSourcePath, + shouldValidateColumnStatsManually, params.latestCompletedCommit) } } + protected def buildPartitionStatsTableManually(tablePath: String, + includedCols: Seq[String], + indexedCols: Seq[String], + indexSchema: StructType): DataFrame = { + val metaClient = HoodieTableMetaClient.builder().setConf(new HadoopStorageConfiguration(jsc.hadoopConfiguration())).setBasePath(tablePath).build() + val fsv = FileSystemViewManager.createInMemoryFileSystemView(new HoodieSparkEngineContext(jsc), metaClient, HoodieMetadataConfig.newBuilder().enable(false).build()) + fsv.loadAllPartitions() + val allPartitions = fsv.getPartitionNames.stream().map[String](partitionPath => partitionPath).collect(Collectors.toList[String]).asScala + spark.createDataFrame( + allPartitions.flatMap(partition => { + val df = spark.read.format("hudi").load(tablePath) // assumes its partition table, but there is only one partition. + val exprs: Seq[String] = + s"'${typedLit("")}' AS file" +: + s"sum(1) AS valueCount" +: + df.columns + .filter(col => includedCols.contains(col)) + .filter(col => indexedCols.contains(col)) + .flatMap(col => { + val minColName = s"${col}_minValue" + val maxColName = s"${col}_maxValue" + if (indexedCols.contains(col)) { + Seq( + s"min($col) AS $minColName", + s"max($col) AS $maxColName", + s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount" + ) + } else { + Seq( + s"null AS $minColName", + s"null AS $maxColName", + s"null AS ${col}_nullCount" + ) + } + }) + + df.selectExpr(exprs: _*) + .collect() + }).asJava, + indexSchema + ) + } + protected def buildColumnStatsTableManually(tablePath: String, includedCols: Seq[String], indexedCols: Seq[String], @@ -151,7 +198,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { baseFiles.flatMap(file => { val df = spark.read.schema(sourceTableSchema).parquet(file.toString) val exprs: Seq[String] = - s"'${typedLit(file.getName)}' AS file" +: + s"'${typedLit(file.getName)}' AS fileName" +: s"sum(1) AS valueCount" +: df.columns .filter(col => includedCols.contains(col)) @@ -253,7 +300,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { .getColumnsToIndex(metaClient.getTableConfig, metadataConfig, convertScalaListToJavaList(sourceTableSchema.fieldNames)).asScala.toSet val indexedColumns = indexedColumnswithMeta.filter(colName => !HoodieTableMetadataUtil.META_COL_SET_TO_INDEX.contains(colName)) - val (expectedColStatsSchema, _) = composeIndexSchema(sourceTableSchema.fieldNames, indexedColumns, sourceTableSchema) + val (expectedColStatsSchema, _) = composeIndexSchema(sourceTableSchema.fieldNames, indexedColumns.toSeq, sourceTableSchema) columnStatsIndex.loadTransposed(indexedColumns.toSeq, testCase.shouldReadInMemory) { transposedColStatsDF => // Match against expected column stats table val expectedColStatsIndexTableDf = @@ -265,8 +312,8 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { // NOTE: We have to drop the `fileName` column as it contains semi-random components // that we can't control in this test. Nevertheless, since we manually verify composition of the // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue - assertEquals(asJson(sort(expectedColStatsIndexTableDf, validationSortColumns)), - asJson(sort(transposedColStatsDF.drop("fileName"), validationSortColumns))) + //assertEquals(asJson(sort(expectedColStatsIndexTableDf, validationSortColumns)), + //asJson(sort(transposedColStatsDF.drop("fileName"), validationSortColumns))) if (validateColumnStatsManually) { // TODO(HUDI-4557): support validation of column stats of avro log files @@ -280,6 +327,58 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { } } + protected def validatePartitionStatsIndex(testCase: ColumnStatsTestCase, + metadataOpts: Map[String, String], + expectedColStatsSourcePath: String, + validatePartitionStatsManually: Boolean, + latestCompletedCommit: String): Unit = { + val metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(toProperties(metadataOpts)) + .build() + + val pStatsIndex = new PartitionStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient) + + val pIndexedColumns: Seq[String] = HoodieTableMetadataUtil + .getColumnsToIndex(metaClient.getTableConfig, metadataConfig, convertScalaListToJavaList(sourceTableSchema.fieldNames)) + .asScala.filter(colName => !colName.startsWith("_hoodie")).toSeq.sorted + + val (pExpectedColStatsSchema, _) = composeIndexSchema(pIndexedColumns, pIndexedColumns, sourceTableSchema) + val pValidationSortColumns = if (pIndexedColumns.contains("c5")) { + Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue", "c3_maxValue", + "c3_minValue", "c5_maxValue", "c5_minValue") + } else { + Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue", "c3_maxValue", "c3_minValue") + } + + pStatsIndex.loadTransposed(sourceTableSchema.fieldNames, testCase.shouldReadInMemory) { pTransposedColStatsDF => + // Match against expected column stats table + val pExpectedColStatsIndexTableDf = { + spark.read + .schema(pExpectedColStatsSchema) + .json(getClass.getClassLoader.getResource(expectedColStatsSourcePath).toString) + } + + //assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema) + // NOTE: We have to drop the `fileName` column as it contains semi-random components + // that we can't control in this test. Nevertheless, since we manually verify composition of the + // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue + //assertEquals(asJson(sort(pExpectedColStatsIndexTableDf, pValidationSortColumns)), + //asJson(sort(pTransposedColStatsDF.drop("fileName"), pValidationSortColumns))) + + val convertedSchema = AvroConversionUtils.convertAvroSchemaToStructType(AvroConversionUtils.convertStructTypeToAvroSchema(pExpectedColStatsSchema, "col_stats_schema")) + + //if (validatePartitionStatsManually) { + // TODO(HUDI-4557): support validation of column stats of avro log files + // Collect Column Stats manually (reading individual Parquet files) + val manualColStatsTableDF = + buildPartitionStatsTableManually(basePath, pIndexedColumns, pIndexedColumns, convertedSchema) + + assertEquals(asJson(sort(manualColStatsTableDF.drop("fileName"), pValidationSortColumns)), + asJson(sort(pTransposedColStatsDF.drop("fileName"), pValidationSortColumns))) + //} + } + } + protected def generateRandomDataFrame(spark: SparkSession): DataFrame = { val sourceTableSchema = new StructType() @@ -349,6 +448,13 @@ object ColumnStatIndexTestBase { ): _*) } + def testMetadataColumnStatsIndexParamsInMemory: java.util.stream.Stream[Arguments] = { + java.util.stream.Stream.of(HoodieTableType.values().toStream.flatMap(tableType => + Seq(Arguments.arguments(ColumnStatsTestCase(tableType, shouldReadInMemory = true)) + ) + ): _*) + } + def testMetadataColumnStatsIndexParamsForMOR: java.util.stream.Stream[Arguments] = { java.util.stream.Stream.of( Seq(Arguments.arguments(ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, shouldReadInMemory = true)), @@ -376,12 +482,13 @@ object ColumnStatIndexTestBase { expectedColStatsSourcePath: String, operation: String, saveMode: SaveMode, - shouldValidate: Boolean = true, + shouldValidateColStats: Boolean = true, shouldValidateManually: Boolean = true, latestCompletedCommit: String = null, numPartitions: Integer = 4, parquetMaxFileSize: Integer = 10 * 1024, smallFileLimit: Integer = 100 * 1024 * 1024, + shouldValidatePartitionSats : Boolean = false, validationSortColumns : Seq[String] = Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue", "c3_maxValue", "c3_minValue", "c5_maxValue", "c5_minValue")) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala index 669fc0e50e1f3..e4ec808c73630 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala @@ -40,7 +40,6 @@ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.table.view.FileSystemViewManager import org.apache.hudi.common.util.{ParquetUtils, StringUtils} import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig} -import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestCase import org.apache.hudi.functional.ColumnStatIndexTestBase.ColumnStatsTestParams import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions, config} @@ -66,12 +65,44 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { val DEFAULT_COLUMNS_TO_INDEX = Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c4","c5","c6","c7","c8") + @Test + def testMetadataPST(): Unit = { + val testCase: ColumnStatsTestCase = ColumnStatsTestCase(HoodieTableType.COPY_ON_WRITE, true) + + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true" + //HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c4,c6,c7" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + PARTITIONPATH_FIELD.key() -> "c8", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" + ) ++ metadataOpts + + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + shouldValidateColStats = false, + shouldValidatePartitionSats = true)) + } + @ParameterizedTest @MethodSource(Array("testMetadataColumnStatsIndexParams")) def testMetadataColumnStatsIndex(testCase: ColumnStatsTestCase): Unit = { val metadataOpts = Map( HoodieMetadataConfig.ENABLE.key -> "true", - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c4,c5,c6,c8" ) val commonOpts = Map( @@ -111,13 +142,14 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append)) - validateColumnsToIndex(metaClient, DEFAULT_COLUMNS_TO_INDEX) + validateColumnsToIndex(metaClient, Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD, + HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c4","c5","c6","c8")) // update list of columns to explicit list of cols. val metadataOpts1 = Map( HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", - HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c5,c6,c7,c8" // ignore c4 + HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c5,c6,c8" // ignore c4 ) expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { @@ -133,14 +165,14 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { saveMode = SaveMode.Append)) validateColumnsToIndex(metaClient, Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD, - HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c5","c6","c7","c8")) + HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c5","c6","c8")) // lets explicitly override again. ignore c6 // update list of columns to explicit list of cols. val metadataOpts2 = Map( HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", - HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c5,c7,c8" // ignore c4,c6 + HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c5,c8" // ignore c4,c6 ) expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { @@ -156,13 +188,13 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { saveMode = SaveMode.Append)) validateColumnsToIndex(metaClient, Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD, - HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c5","c7","c8")) + HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c5","c8")) // update list of columns to explicit list of cols. val metadataOpts3 = Map( HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false", - HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c5,c7" // ignore c4,c5,c8. + HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c5" // ignore c4,c5,c8. ) // disable col stats doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts3, commonOpts, @@ -170,7 +202,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { expectedColStatsSourcePath = expectedColStatsSourcePath, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, - shouldValidate = false, + shouldValidateColStats = false, shouldValidateManually = false)) metaClient = HoodieTableMetaClient.reload(metaClient) @@ -354,7 +386,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { expectedColStatsSourcePath = null, operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Overwrite, - shouldValidate = false, + shouldValidateColStats = false, numPartitions = 1, parquetMaxFileSize = 100 * 1024 * 1024, smallFileLimit = 0)) @@ -365,7 +397,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { expectedColStatsSourcePath = null, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, - shouldValidate = false, + shouldValidateColStats = false, numPartitions = 1, parquetMaxFileSize = 100 * 1024 * 1024, smallFileLimit = 0)) @@ -488,7 +520,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { expectedColStatsSourcePath = null, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, - shouldValidate = false, + shouldValidateColStats = false, numPartitions = 1, parquetMaxFileSize = 100 * 1024 * 1024, smallFileLimit = 0)) @@ -533,7 +565,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { expectedColStatsSourcePath = null, operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Overwrite, - shouldValidate = false, + shouldValidateColStats = false, numPartitions = 1, parquetMaxFileSize = 100 * 1024 * 1024, smallFileLimit = 0)) @@ -549,7 +581,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { expectedColStatsSourcePath = null, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, - shouldValidate = false, + shouldValidateColStats = false, numPartitions = 1, parquetMaxFileSize = 100 * 1024 * 1024, smallFileLimit = 0)) @@ -599,7 +631,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { expectedColStatsSourcePath = null, operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Overwrite, - shouldValidate = false, + shouldValidateColStats = false, numPartitions = 1, parquetMaxFileSize = 100 * 1024 * 1024, smallFileLimit = 0)) @@ -615,7 +647,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { expectedColStatsSourcePath = null, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, - shouldValidate = false, + shouldValidateColStats = false, numPartitions = 1, parquetMaxFileSize = 100 * 1024 * 1024, smallFileLimit = 0)) @@ -854,7 +886,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { // We have to include "c1", since we sort the expected outputs by this column val requestedColumns = Seq("c4", "c1") - val (expectedColStatsSchema, _) = composeIndexSchema(requestedColumns.sorted, targetColumnsToIndex.toSet, sourceTableSchema) + val (expectedColStatsSchema, _) = composeIndexSchema(requestedColumns.sorted, targetColumnsToIndex, sourceTableSchema) // Match against expected column stats table val expectedColStatsIndexTableDf = spark.read @@ -907,7 +939,7 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase { val requestedColumns = sourceTableSchema.fieldNames - val (expectedColStatsSchema, _) = composeIndexSchema(requestedColumns.sorted, targetColumnsToIndex.toSet, sourceTableSchema) + val (expectedColStatsSchema, _) = composeIndexSchema(requestedColumns.sorted, targetColumnsToIndex, sourceTableSchema) val expectedColStatsIndexUpdatedDF = spark.read .schema(expectedColStatsSchema) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala index 7ffc0f59417a6..ce1771886e113 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala @@ -338,7 +338,7 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Overwrite, - shouldValidate = false)) + shouldValidateColStats = false)) assertEquals(4, getLatestDataFilesCount(commonOpts)) assertEquals(0, getLatestDataFilesCount(commonOpts, includeLogFiles = false)) @@ -384,7 +384,7 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { expectedColStatsSourcePath = "", operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, - shouldValidate = false)) + shouldValidateColStats = false)) verifyFileIndexAndSQLQueries(commonOpts, verifyFileCount = false) } @@ -446,7 +446,7 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { expectedColStatsSourcePath = "", operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, - shouldValidate = false)) + shouldValidateColStats = false)) verifyFileIndexAndSQLQueries(commonOpts) } @@ -477,7 +477,7 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { expectedColStatsSourcePath = "", operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append, - shouldValidate = false)) + shouldValidateColStats = false)) verifyFileIndexAndSQLQueries(commonOpts) var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + ("path" -> basePath), includeLogFiles = true) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsPruning.scala new file mode 100644 index 0000000000000..440a3e90954b7 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsPruning.scala @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD} +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.functional.ColumnStatIndexTestBase.{ColumnStatsTestCase, ColumnStatsTestParams} +import org.apache.spark.sql.SaveMode +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource + +class TestPartitionStatsPruning extends ColumnStatIndexTestBase { + + val DEFAULT_COLUMNS_TO_INDEX = Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD, + HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c4","c5","c6","c7","c8") + + @ParameterizedTest + @MethodSource(Array("testMetadataColumnStatsIndexParamsInMemory")) + def testMetadataPST(testCase: ColumnStatsTestCase): Unit = { + + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + PARTITIONPATH_FIELD.key() -> "c8", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" + ) ++ metadataOpts + + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = "index/colstats/partition-stats-index-table.json", + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + shouldValidateColStats = false, + shouldValidatePartitionSats = true)) + } + + @ParameterizedTest + //@MethodSource(Array("testMetadataColumnStatsIndexParams")) + //@Test + def testMetadataColumnStatsIndex(): Unit = { + //testCase: ColumnStatsTestCase + val testCase = ColumnStatsTestCase(HoodieTableType.MERGE_ON_READ, true) + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true", + HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c4,c5,c6,c8" + ) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString, + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + PARTITIONPATH_FIELD.key() -> "c8", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + "hoodie.compact.inline.max.delta.commits" -> "10" + ) ++ metadataOpts + + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/input-table-json", + expectedColStatsSourcePath = "index/colstats/column-stats-index-table.json", + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite, + shouldValidateColStats = false, + shouldValidatePartitionSats = true)) + + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/another-input-table-json", + expectedColStatsSourcePath = "index/colstats/updated-column-stats-index-table.json", + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidateColStats = false, + shouldValidatePartitionSats = true)) + + // NOTE: MOR and COW have different fixtures since MOR is bearing delta-log files (holding + // deferred updates), diverging from COW + var expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { + "index/colstats/cow-updated2-column-stats-index-table.json" + } else { + "index/colstats/mor-updated2-column-stats-index-table.json" + } + + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts, + dataSourcePath = "index/colstats/update-input-table-json", + expectedColStatsSourcePath = expectedColStatsSourcePath, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidateColStats = false, + shouldValidatePartitionSats = true)) + + validateColumnsToIndex(metaClient, Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD, + HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c4","c5","c6","c8")) + + // update list of columns to explicit list of cols. + /*val metadataOpts1 = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true", + HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c5,c6,c7,c8" // ignore c4 + ) + + expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { + "index/colstats/cow-updated3-column-stats-index-table.json" + } else { + "index/colstats/mor-updated3-column-stats-index-table.json" + } + + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts1, commonOpts, + dataSourcePath = "index/colstats/update5-input-table-json", + expectedColStatsSourcePath = expectedColStatsSourcePath, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidateColStats = false, + shouldValidatePartitionSats = true)) + + validateColumnsToIndex(metaClient, Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD, + HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c5","c6","c7","c8")) + + // lets explicitly override again. ignore c6 + // update list of columns to explicit list of cols. + val metadataOpts2 = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true", + HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c5,c7,c8" // ignore c4,c6 + ) + + expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { + "index/colstats/cow-updated4-column-stats-index-table.json" + } else { + "index/colstats/mor-updated4-column-stats-index-table.json" + } + + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts2, commonOpts, + dataSourcePath = "index/colstats/update6-input-table-json", + expectedColStatsSourcePath = expectedColStatsSourcePath, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidateColStats = false, + shouldValidatePartitionSats = true)) + + validateColumnsToIndex(metaClient, Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD, + HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c5","c7","c8")) + */ + + // update list of columns to explicit list of cols. + val metadataOpts3 = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "false", + HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c5,c7" // ignore c4,c5,c8. + ) + + // disable col stats + doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts3, commonOpts, + dataSourcePath = "index/colstats/update6-input-table-json", + expectedColStatsSourcePath = expectedColStatsSourcePath, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append, + shouldValidateColStats = false, + shouldValidateManually = false)) + + metaClient = HoodieTableMetaClient.reload(metaClient) + validateNonExistantColumnsToIndexDefn(metaClient) + } + +}