Skip to content

Commit 6b0f6bf

Browse files
committed
Addressing feedback from sagar
1 parent f676afd commit 6b0f6bf

File tree

3 files changed

+25
-44
lines changed

3 files changed

+25
-44
lines changed

Diff for: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

+9-22
Original file line numberDiff line numberDiff line change
@@ -1517,32 +1517,19 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper, Stri
15171517
if (avroValueWrapper == null) {
15181518
return null;
15191519
} else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) {
1520-
if (avroValueWrapper instanceof GenericRecord) {
1521-
return Date.valueOf(LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0)));
1522-
} else {
1523-
return Date.valueOf(LocalDate.ofEpochDay((Integer) ((Record) avroValueWrapper).get(0)));
1524-
}
1520+
ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
1521+
return Date.valueOf(LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0)));
15251522
} else if (LocalDateWrapper.class.getSimpleName().equals(wrapperClassName)) {
1526-
if (avroValueWrapper instanceof GenericRecord) {
1527-
return LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0));
1528-
} else {
1529-
return LocalDate.ofEpochDay((Integer) ((Record) avroValueWrapper).get(0));
1530-
}
1523+
ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
1524+
return LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0));
15311525
} else if (TimestampMicrosWrapper.class.getSimpleName().equals(wrapperClassName)) {
1532-
if (avroValueWrapper instanceof GenericRecord) {
1533-
Instant instant = microsToInstant((Long) ((GenericRecord) avroValueWrapper).get(0));
1534-
return Timestamp.from(instant);
1535-
} else {
1536-
Instant instant = microsToInstant((Long) ((Record) avroValueWrapper).get(0));
1537-
return Timestamp.from(instant);
1538-
}
1526+
ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
1527+
Instant instant = microsToInstant((Long) ((GenericRecord) avroValueWrapper).get(0));
1528+
return Timestamp.from(instant);
15391529
} else if (DecimalWrapper.class.getSimpleName().equals(wrapperClassName)) {
15401530
Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
1541-
if (avroValueWrapper instanceof GenericRecord) {
1542-
return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer)((GenericRecord) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType());
1543-
} else {
1544-
return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer) ((Record) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType());
1545-
}
1531+
ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
1532+
return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer)((GenericRecord) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType());
15461533
} else {
15471534
throw new UnsupportedOperationException(String.format("Unsupported type of the value (%s)", avroValueWrapper.getClass()));
15481535
}

Diff for: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala

+7-13
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
6969
.add("c4", TimestampType)
7070
.add("c5", ShortType)
7171
.add("c6", DateType)
72-
.add("c7", StringType)
72+
.add("c7", StringType) // HUDI-8909. To support Byte w/ partition stats index.
7373
.add("c8", ByteType)
7474

7575
@BeforeEach
@@ -131,12 +131,11 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
131131

132132
validateColumnStatsIndex(params.testCase, params.metadataOpts, params.expectedColStatsSourcePath,
133133
shouldValidateColumnStatsManually, params.validationSortColumns)
134-
} else if (params.shouldValidatePartitionSats) {
134+
} else if (params.shouldValidatePartitionStats) {
135135
val shouldValidateColumnStatsManually = (params.testCase.tableType == HoodieTableType.COPY_ON_WRITE ||
136136
params.operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)) && params.shouldValidateManually
137137

138-
validatePartitionStatsIndex(params.testCase, params.metadataOpts, params.expectedColStatsSourcePath,
139-
shouldValidateColumnStatsManually, params.latestCompletedCommit)
138+
validatePartitionStatsIndex(params.testCase, params.metadataOpts, params.expectedColStatsSourcePath)
140139
}
141140
}
142141

@@ -329,9 +328,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
329328

330329
protected def validatePartitionStatsIndex(testCase: ColumnStatsTestCase,
331330
metadataOpts: Map[String, String],
332-
expectedColStatsSourcePath: String,
333-
validatePartitionStatsManually: Boolean,
334-
latestCompletedCommit: String): Unit = {
331+
expectedColStatsSourcePath: String): Unit = {
335332
val metadataConfig = HoodieMetadataConfig.newBuilder()
336333
.fromProperties(toProperties(metadataOpts))
337334
.build()
@@ -361,13 +358,10 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
361358
val colsToDrop = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) {
362359
Seq("fileName")
363360
} else {
364-
Seq("fileName","valueCount")
361+
Seq("fileName","valueCount") // for MOR, value count may not match, since we could have repeated updates across multiple log files.
362+
// So, value count might be larger w/ MOR stats when compared to calculating it manually.
365363
}
366364

367-
//assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema)
368-
// NOTE: We have to drop the `fileName` column as it contains semi-random components
369-
// that we can't control in this test. Nevertheless, since we manually verify composition of the
370-
// ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
371365
assertEquals(asJson(sort(pExpectedColStatsIndexTableDf.drop(colsToDrop: _*), pValidationSortColumns)),
372366
asJson(sort(pTransposedColStatsDF.drop(colsToDrop: _*), pValidationSortColumns)))
373367

@@ -492,7 +486,7 @@ object ColumnStatIndexTestBase {
492486
numPartitions: Integer = 4,
493487
parquetMaxFileSize: Integer = 10 * 1024,
494488
smallFileLimit: Integer = 100 * 1024 * 1024,
495-
shouldValidatePartitionSats : Boolean = false,
489+
shouldValidatePartitionStats : Boolean = false,
496490
validationSortColumns : Seq[String] = Seq("c1_maxValue", "c1_minValue", "c2_maxValue",
497491
"c2_minValue", "c3_maxValue", "c3_minValue", "c5_maxValue", "c5_minValue"))
498492
}

Diff for: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsPruning.scala

+9-9
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase {
7171
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
7272
saveMode = SaveMode.Overwrite,
7373
shouldValidateColStats = false,
74-
shouldValidatePartitionSats = true))
74+
shouldValidatePartitionStats = true))
7575
}
7676

7777
@ParameterizedTest
@@ -101,23 +101,23 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase {
101101
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
102102
saveMode = SaveMode.Overwrite,
103103
shouldValidateColStats = false,
104-
shouldValidatePartitionSats = true))
104+
shouldValidatePartitionStats = true))
105105

106106
doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts,
107107
dataSourcePath = "index/colstats/another-input-table-json",
108108
expectedColStatsSourcePath = "index/colstats/updated-partition-stats-index-table.json",
109109
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
110110
saveMode = SaveMode.Append,
111111
shouldValidateColStats = false,
112-
shouldValidatePartitionSats = true))
112+
shouldValidatePartitionStats = true))
113113

114114
doWriteAndValidateColumnStats(ColumnStatsTestParams(testCase, metadataOpts, commonOpts,
115115
dataSourcePath = "index/colstats/update-input-table-json",
116116
expectedColStatsSourcePath = "index/colstats/updated-partition-stats-2-index-table.json",
117117
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
118118
saveMode = SaveMode.Append,
119119
shouldValidateColStats = false,
120-
shouldValidatePartitionSats = true))
120+
shouldValidatePartitionStats = true))
121121

122122
validateColumnsToIndex(metaClient, Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD,
123123
HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c4","c5","c6","c7","c8"))
@@ -136,7 +136,7 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase {
136136
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
137137
saveMode = SaveMode.Append,
138138
shouldValidateColStats = false,
139-
shouldValidatePartitionSats = true))
139+
shouldValidatePartitionStats = true))
140140

141141
validateColumnsToIndex(metaClient, Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD,
142142
HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c5","c6","c7","c8"))
@@ -156,7 +156,7 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase {
156156
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
157157
saveMode = SaveMode.Append,
158158
shouldValidateColStats = false,
159-
shouldValidatePartitionSats = true))
159+
shouldValidatePartitionStats = true))
160160

161161
validateColumnsToIndex(metaClient, Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD,
162162
HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c5","c7","c8"))
@@ -282,7 +282,7 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase {
282282
parquetMaxFileSize = 100 * 1024 * 1024,
283283
smallFileLimit = 0,
284284
shouldValidateColStats = false,
285-
shouldValidatePartitionSats = true))
285+
shouldValidatePartitionStats = true))
286286

287287
expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) {
288288
"index/colstats/cow-bootstrap2-partition-stats-index-table.json"
@@ -300,7 +300,7 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase {
300300
parquetMaxFileSize = 100 * 1024 * 1024,
301301
smallFileLimit = 0,
302302
shouldValidateColStats = false,
303-
shouldValidatePartitionSats = true))
303+
shouldValidatePartitionStats = true))
304304

305305
validateColumnsToIndex(metaClient, Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD,
306306
HoodieRecord.PARTITION_PATH_METADATA_FIELD, "c1","c2","c3","c4","c5","c6","c7","c8"))
@@ -372,7 +372,7 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase {
372372
parquetMaxFileSize = 100 * 1024 * 1024,
373373
smallFileLimit = 0,
374374
shouldValidateColStats = false,
375-
shouldValidatePartitionSats = true))
375+
shouldValidatePartitionStats = true))
376376

377377
metaClient = HoodieTableMetaClient.reload(metaClient)
378378
assertTrue(metaClient.getActiveTimeline.getRollbackTimeline.countInstants() > 0)

0 commit comments

Comments
 (0)