
Commit 37ee992

ZiyaZa authored and cloud-fan committed
[SPARK-53535][SQL] Fix missing structs always being assumed as nulls
### What changes were proposed in this pull request?

Currently, if all fields of a struct mentioned in the read schema are missing in a Parquet file, the reader populates the struct with nulls. This PR modifies the scan behavior so that if the struct exists in the Parquet schema but none of the fields from the read schema are present, we instead pick an arbitrary field from the Parquet file to read and use it to populate NULLs (as well as outer NULLs and array sizes if the struct is nested in another nested type).

This is done by changing the schema requested by the readers: we add an additional field to the requested schema when clipping the Parquet file schema according to the Spark schema. This means the readers actually read and return more data than requested, which can cause problems. It is only a problem for the `VectorizedParquetRecordReader`, since the other read code path via parquet-mr already has an `UnsafeProjection` in `ParquetFileFormat` that outputs only the requested schema fields. To ensure `VectorizedParquetRecordReader` returns only the Spark-requested fields, we create the `ColumnarBatch` with vectors that match the requested schema (we drop the additional fields by recursively matching `sparkSchema` against `sparkRequestedSchema` and ensuring structs have the same length in both). The `ParquetColumnVector`s are then responsible for allocating dummy vectors that hold the extra data temporarily while reading, but these are not exposed to the outside.

The heuristic to pick the arbitrary leaf field is as follows. We try to minimize the number of arrays or maps (repeated fields) on the path to a leaf column, because the more repeated fields there are, the more data we are likely to read. At the same repetition level, we consider the type of each column and pick the cheapest one to read (struct nesting does not affect the decision here). We use the byte size of the column type as the cost (a rough sketch of this heuristic appears after this commit message):

- BOOLEAN: 1 byte
- INT32, FLOAT: 4 bytes
- INT64, DOUBLE: 8 bytes
- INT96: 12 bytes
- BINARY, FIXED_LEN_BYTE_ARRAY, default case for future types: 32 bytes (high cost due to variable/large size)

### Why are the changes needed?

This is a bug fix: depending on the requested fields, we were incorrectly treating non-null struct values as missing from the file and returning nulls for them.

### Does this PR introduce _any_ user-facing change?

Yes. We previously assumed a struct to be null if all the fields we were trying to read from a Parquet file were missing from that file, even if the file contained other fields whose definition levels could have been used. See the example from the Jira ticket below:

```python
df_a = sql('SELECT 1 as id, named_struct("a", 1) AS s')
path = "/tmp/missing_col_test"
df_a.write.format("parquet").save(path)

df_b = sql('SELECT 2 as id, named_struct("b", 3) AS s')
spark.read.format("parquet").schema(df_b.schema).load(path).show()
```

This used to return:

```
+---+----+
| id|   s|
+---+----+
|  1|NULL|
+---+----+
```

It now returns:

```
+---+------+
| id|     s|
+---+------+
|  1|{NULL}|
+---+------+
```

### How was this patch tested?

Added new unit tests; also updated an old test to expect the new behavior.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52557 from ZiyaZa/missing_struct.

Authored-by: Ziya Mukhtarov <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
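To make the cost heuristic above concrete, here is a rough, self-contained sketch in Python (illustrative only: the `Leaf` structure, its fields, and the candidate list are hypothetical and not part of the actual Scala/Java implementation in this commit):

```python
from dataclasses import dataclass

# Approximate byte cost per Parquet primitive type, mirroring the table in the
# commit message above.
TYPE_COST = {
    "BOOLEAN": 1,
    "INT32": 4, "FLOAT": 4,
    "INT64": 8, "DOUBLE": 8,
    "INT96": 12,
}
DEFAULT_COST = 32  # BINARY, FIXED_LEN_BYTE_ARRAY, and any future types

@dataclass
class Leaf:
    path: str                # dotted path to the leaf column in the file schema
    repeated_ancestors: int  # number of arrays/maps on the path to this leaf
    primitive_type: str      # Parquet primitive type name

def pick_cheapest_leaf(leaves: list[Leaf]) -> Leaf:
    # Prefer fewer repeated (array/map) ancestors first, then the cheaper primitive type.
    return min(
        leaves,
        key=lambda leaf: (leaf.repeated_ancestors,
                          TYPE_COST.get(leaf.primitive_type, DEFAULT_COST)))

# Example: a flat INT32 leaf beats a BINARY leaf nested inside an array.
candidates = [
    Leaf("s.arr.element.name", repeated_ancestors=1, primitive_type="BINARY"),
    Leaf("s.x", repeated_ancestors=0, primitive_type="INT32"),
]
assert pick_cheapest_leaf(candidates).path == "s.x"
```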
1 parent 1f21a8b commit 37ee992

File tree

15 files changed: +898 −121 lines

docs/sql-migration-guide.md

Lines changed: 4 additions & 0 deletions
```diff
@@ -22,6 +22,10 @@ license: |
 * Table of contents
 {:toc}
 
+## Upgrading from Spark SQL 4.0 to 4.1
+
+- Since Spark 4.1, the Parquet reader no longer assumes all struct values to be null, if all the requested fields are missing in the parquet file. The new default behavior is to read an additional struct field that is present in the file to determine nullness. To restore the previous behavior, set `spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing` to `true`.
+
 ## Upgrading from Spark SQL 3.5 to 4.0
 
 - Since Spark 4.0, `spark.sql.ansi.enabled` is on by default. To restore the previous behavior, set `spark.sql.ansi.enabled` to `false` or `SPARK_ANSI_SQL_MODE` to `false`.
```
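For reference, a minimal PySpark sketch of how the legacy flag from this migration note could be applied (the session setup and the path are assumed, reusing the hypothetical path from the commit message example):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Restore the pre-4.1 behavior: a struct is read as NULL whenever all of its
# requested fields are absent from the Parquet file.
spark.conf.set("spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing", "true")

# With the flag enabled, the commit-message example would again show NULL for `s`
# instead of {NULL}.
spark.read.format("parquet") \
    .schema("id INT, s STRUCT<b: INT>") \
    .load("/tmp/missing_col_test") \
    .show()
```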

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -1534,6 +1534,18 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val LEGACY_PARQUET_RETURN_NULL_STRUCT_IF_ALL_FIELDS_MISSING =
+    buildConf("spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing")
+      .internal()
+      .doc("When true, if all requested fields of a struct are missing in a parquet file, assume " +
+        "the struct is always null, even if other fields are present. The default behavior is " +
+        "to fetch and read an arbitrary non-requested field present in the file to determine " +
+        "struct nullness. If enabled, schema pruning may cause non-null structs to be read as " +
+        "null.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val PARQUET_RECORD_FILTER_ENABLED = buildConf("spark.sql.parquet.recordLevelFilter.enabled")
     .doc("If true, enables Parquet's native record-level filtering using the pushed down " +
       "filters. " +
```

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java

Lines changed: 18 additions & 30 deletions
```diff
@@ -21,14 +21,10 @@
 import java.util.List;
 import java.util.Set;
 
-import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.network.util.JavaUtils;
-import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
-import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.ArrayType;
 import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.catalyst.types.DataTypeUtils;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.MapType;
 import org.apache.spark.sql.types.StructType;
@@ -69,16 +65,9 @@ final class ParquetColumnVector {
       ParquetColumn column,
       WritableColumnVector vector,
       int capacity,
-      MemoryMode memoryMode,
       Set<ParquetColumn> missingColumns,
       boolean isTopLevel,
       Object defaultValue) {
-    DataType sparkType = column.sparkType();
-    if (!DataTypeUtils.sameType(sparkType, vector.dataType())) {
-      throw new IllegalArgumentException("Spark type: " + sparkType +
-        " doesn't match the type: " + vector.dataType() + " in column vector");
-    }
-
     this.column = column;
     this.vector = vector;
     this.children = new ArrayList<>();
@@ -111,35 +100,41 @@ final class ParquetColumnVector {
 
     if (column.variantFileType().isDefined()) {
       ParquetColumn fileContentCol = column.variantFileType().get();
-      WritableColumnVector fileContent = memoryMode == MemoryMode.OFF_HEAP
-        ? new OffHeapColumnVector(capacity, fileContentCol.sparkType())
-        : new OnHeapColumnVector(capacity, fileContentCol.sparkType());
-      ParquetColumnVector contentVector = new ParquetColumnVector(fileContentCol,
-        fileContent, capacity, memoryMode, missingColumns, false, null);
+      WritableColumnVector fileContent = vector.reserveNewColumn(
+        capacity, fileContentCol.sparkType());
+      ParquetColumnVector contentVector = new ParquetColumnVector(fileContentCol, fileContent,
+        capacity, missingColumns, /* isTopLevel= */ false, /* defaultValue= */ null);
       children.add(contentVector);
       variantSchema = SparkShreddingUtils.buildVariantSchema(fileContentCol.sparkType());
       fieldsToExtract = SparkShreddingUtils.getFieldsToExtract(column.sparkType(), variantSchema);
       repetitionLevels = contentVector.repetitionLevels;
       definitionLevels = contentVector.definitionLevels;
     } else if (isPrimitive) {
       if (column.repetitionLevel() > 0) {
-        repetitionLevels = allocateLevelsVector(capacity, memoryMode);
+        repetitionLevels = vector.reserveNewColumn(capacity, DataTypes.IntegerType);
       }
       // We don't need to create and store definition levels if the column is top-level.
       if (!isTopLevel) {
-        definitionLevels = allocateLevelsVector(capacity, memoryMode);
+        definitionLevels = vector.reserveNewColumn(capacity, DataTypes.IntegerType);
       }
     } else {
-      JavaUtils.checkArgument(column.children().size() == vector.getNumChildren(),
-        "The number of column children is different from the number of vector children");
+      // If a child is not present in the allocated vectors, it means we don't care about this
+      // child's data, we just want to read its levels to help assemble some parent struct. So we
+      // create a dummy vector below to hold the child's data. There can only be one such child.
+      JavaUtils.checkArgument(column.children().size() == vector.getNumChildren() ||
+          column.children().size() == vector.getNumChildren() + 1,
+        "The number of column children is not equal to the number of vector children or that + 1");
       boolean allChildrenAreMissing = true;
 
       for (int i = 0; i < column.children().size(); i++) {
-        ParquetColumnVector childCv = new ParquetColumnVector(column.children().apply(i),
-          vector.getChild(i), capacity, memoryMode, missingColumns, false, null);
+        ParquetColumn childColumn = column.children().apply(i);
+        WritableColumnVector childVector = i < vector.getNumChildren()
+          ? vector.getChild(i)
+          : vector.reserveNewColumn(capacity, childColumn.sparkType());
+        ParquetColumnVector childCv = new ParquetColumnVector(childColumn, childVector, capacity,
+          missingColumns, /* isTopLevel= */ false, /* defaultValue= */ null);
         children.add(childCv);
 
-
         // Only use levels from non-missing child, this can happen if only some but not all
         // fields of a struct are missing.
         if (!childCv.vector.isAllNull()) {
@@ -375,13 +370,6 @@ private void assembleStruct() {
     vector.addElementsAppended(rowId);
   }
 
-  private static WritableColumnVector allocateLevelsVector(int capacity, MemoryMode memoryMode) {
-    return switch (memoryMode) {
-      case ON_HEAP -> new OnHeapColumnVector(capacity, DataTypes.IntegerType);
-      case OFF_HEAP -> new OffHeapColumnVector(capacity, DataTypes.IntegerType);
-    };
-  }
-
   /**
    * For a collection (i.e., array or map) element at index 'idx', returns the starting index of
    * the next collection after it.
```

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java

Lines changed: 22 additions & 18 deletions
```diff
@@ -87,6 +87,8 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
 
   protected ParquetRowGroupReader reader;
 
+  protected Configuration configuration;
+
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException {
@@ -99,7 +101,7 @@ public void initialize(
       Option<HadoopInputFile> inputFile,
       Option<SeekableInputStream> inputStream,
       Option<ParquetMetadata> fileFooter) throws IOException, InterruptedException {
-    Configuration configuration = taskAttemptContext.getConfiguration();
+    this.configuration = taskAttemptContext.getConfiguration();
     FileSplit split = (FileSplit) inputSplit;
     this.file = split.getPath();
     ParquetReadOptions options = HadoopReadOptions
@@ -164,22 +166,22 @@ public void initialize(
    * configurations.
    */
   protected void initialize(String path, List<String> columns) throws IOException {
-    Configuration config = new Configuration();
-    config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
-    config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
-    config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
-    config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
-    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
+    this.configuration = new Configuration();
+    this.configuration.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
+    this.configuration.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+    this.configuration.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+    this.configuration.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    this.configuration.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
 
     this.file = new Path(path);
-    long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
+    long length = this.file.getFileSystem(configuration).getFileStatus(this.file).getLen();
 
     ParquetReadOptions options = HadoopReadOptions
-      .builder(config, file)
+      .builder(configuration, file)
       .withRange(0, length)
       .build();
     ParquetFileReader fileReader = ParquetFileReader.open(
-      HadoopInputFile.fromPath(file, config), options);
+      HadoopInputFile.fromPath(file, configuration), options);
     this.reader = new ParquetRowGroupReaderImpl(fileReader);
     this.fileSchema = fileReader.getFooter().getFileMetaData().getSchema();
 
@@ -201,9 +203,10 @@ protected void initialize(String path, List<String> columns) throws IOException
       }
     }
     fileReader.setRequestedSchema(requestedSchema);
-    this.parquetColumn = new ParquetToSparkSchemaConverter(config)
+    this.parquetColumn = new ParquetToSparkSchemaConverter(configuration)
       .convertParquetColumn(requestedSchema, Option.empty());
     this.sparkSchema = (StructType) parquetColumn.sparkType();
+    this.sparkRequestedSchema = this.sparkSchema;
    this.totalRowCount = fileReader.getFilteredRecordCount();
   }
 
@@ -216,15 +219,16 @@ protected void initialize(
     this.reader = rowGroupReader;
     this.fileSchema = fileSchema;
     this.requestedSchema = requestedSchema;
-    Configuration config = new Configuration();
-    config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
-    config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
-    config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
-    config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
-    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
-    this.parquetColumn = new ParquetToSparkSchemaConverter(config)
+    this.configuration = new Configuration();
+    this.configuration.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
+    this.configuration.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+    this.configuration.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+    this.configuration.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    this.configuration.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
+    this.parquetColumn = new ParquetToSparkSchemaConverter(configuration)
      .convertParquetColumn(requestedSchema, Option.empty());
     this.sparkSchema = (StructType) parquetColumn.sparkType();
+    this.sparkRequestedSchema = this.sparkSchema;
     this.totalRowCount = totalRowCount;
   }
 
```

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java

Lines changed: 65 additions & 4 deletions
```diff
@@ -48,10 +48,10 @@
 import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf$;
+import org.apache.spark.sql.types.*;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
 
 /**
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -265,7 +265,15 @@ private void initBatch(
       MemoryMode memMode,
       StructType partitionColumns,
       InternalRow partitionValues) {
-    StructType batchSchema = new StructType(sparkSchema.fields());
+    boolean returnNullStructIfAllFieldsMissing = configuration.getBoolean(
+      SQLConf$.MODULE$.LEGACY_PARQUET_RETURN_NULL_STRUCT_IF_ALL_FIELDS_MISSING().key(),
+      (boolean) SQLConf$.MODULE$.LEGACY_PARQUET_RETURN_NULL_STRUCT_IF_ALL_FIELDS_MISSING()
+        .defaultValue().get());
+    StructType batchSchema = returnNullStructIfAllFieldsMissing
+      ? new StructType(sparkSchema.fields())
+      // Truncate to match requested schema to make sure extra struct field that we read for
+      // nullability is not included in columnarBatch and exposed outside.
+      : (StructType) truncateType(sparkSchema, sparkRequestedSchema);
 
     int constantColumnLength = 0;
     if (partitionColumns != null) {
@@ -287,7 +295,8 @@ private void initBatch(
         defaultValue = ResolveDefaultColumns.existenceDefaultValues(sparkRequestedSchema)[i];
       }
       columnVectors[i] = new ParquetColumnVector(parquetColumn.children().apply(i),
-        (WritableColumnVector) vectors[i], capacity, memMode, missingColumns, true, defaultValue);
+        (WritableColumnVector) vectors[i], capacity, missingColumns, /* isTopLevel= */ true,
+        defaultValue);
     }
 
     if (partitionColumns != null) {
@@ -309,6 +318,58 @@ public void initBatch(StructType partitionColumns, InternalRow partitionValues)
     initBatch(MEMORY_MODE, partitionColumns, partitionValues);
   }
 
+  /**
+   * Keeps the hierarchy and fields of readType, recursively truncating struct fields from the end
+   * of the fields list to match the same number of fields in requestedType. This is used to get rid
+   * of the extra fields that are added to the structs when the fields we wanted to read initially
+   * were missing in the file schema. So this returns a type that we would be reading if everything
+   * was present in the file, matching Spark's expected schema.
+   *
+   * <p> Example: <pre>{@code
+   * readType: array<struct<a:int,b:long,c:int>>
+   * requestedType: array<struct<a:int,b:long>>
+   * returns: array<struct<a:int,b:long>>
+   * }</pre>
+   * We cannot return requestedType here because there might be slight differences, like nullability
+   * of fields or the type precision (smallint/int).
+   */
+  @VisibleForTesting
+  static DataType truncateType(DataType readType, DataType requestedType) {
+    if (requestedType instanceof UserDefinedType<?> requestedUDT) {
+      requestedType = requestedUDT.sqlType();
+    }
+
+    if (readType instanceof StructType readStruct &&
+        requestedType instanceof StructType requestedStruct) {
+      StructType result = new StructType();
+      for (int i = 0; i < requestedStruct.fields().length; i++) {
+        StructField readField = readStruct.fields()[i];
+        StructField requestedField = requestedStruct.fields()[i];
+        DataType truncatedType = truncateType(readField.dataType(), requestedField.dataType());
+        result = result.add(readField.copy(
+          readField.name(), truncatedType, readField.nullable(), readField.metadata()));
+      }
+      return result;
+    }
+
+    if (readType instanceof ArrayType readArray &&
+        requestedType instanceof ArrayType requestedArray) {
+      DataType truncatedElementType = truncateType(
+        readArray.elementType(), requestedArray.elementType());
+      return readArray.copy(truncatedElementType, readArray.containsNull());
+    }
+
+    if (readType instanceof MapType readMap && requestedType instanceof MapType requestedMap) {
+      DataType truncatedKeyType = truncateType(readMap.keyType(), requestedMap.keyType());
+      DataType truncatedValueType = truncateType(readMap.valueType(), requestedMap.valueType());
+      return readMap.copy(truncatedKeyType, truncatedValueType, readMap.valueContainsNull());
+    }
+
+    assert !ParquetSchemaConverter.isComplexType(readType);
+    assert !ParquetSchemaConverter.isComplexType(requestedType);
+    return readType;
+  }
+
   /**
    * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
    * This object is reused. Calling this enables the vectorized reader. This should be called
```
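As a hedged illustration of the idea behind `truncateType` above, here is a rough PySpark-typed sketch (not the Java implementation itself; it skips the UserDefinedType unwrapping and the assertions):

```python
from pyspark.sql.types import ArrayType, DataType, MapType, StructField, StructType

def truncate_type(read_type: DataType, requested_type: DataType) -> DataType:
    # Struct: keep only as many leading fields as the requested struct has, recursing
    # into each kept field; zip() drops the extra trailing read-only fields, mirroring
    # the Java loop over the requested struct's fields. Names, nullability and metadata
    # come from the read type.
    if isinstance(read_type, StructType) and isinstance(requested_type, StructType):
        kept = [
            StructField(
                read_field.name,
                truncate_type(read_field.dataType, req_field.dataType),
                read_field.nullable,
                read_field.metadata)
            for read_field, req_field in zip(read_type.fields, requested_type.fields)
        ]
        return StructType(kept)
    # Array / map: recurse into the element / key and value types.
    if isinstance(read_type, ArrayType) and isinstance(requested_type, ArrayType):
        return ArrayType(
            truncate_type(read_type.elementType, requested_type.elementType),
            read_type.containsNull)
    if isinstance(read_type, MapType) and isinstance(requested_type, MapType):
        return MapType(
            truncate_type(read_type.keyType, requested_type.keyType),
            truncate_type(read_type.valueType, requested_type.valueType),
            read_type.valueContainsNull)
    # Primitive leaf: keep the read type (it may differ slightly from the requested
    # one, e.g. in precision), just like the Java version returns readType.
    return read_type

# Example from the Javadoc above:
#   read:      array<struct<a:int, b:bigint, c:int>>
#   requested: array<struct<a:int, b:bigint>>
#   result:    array<struct<a:int, b:bigint>>
```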

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -633,7 +633,7 @@ protected void reserveInternal(int newCapacity) {
   }
 
   @Override
-  protected OffHeapColumnVector reserveNewColumn(int capacity, DataType type) {
+  public OffHeapColumnVector reserveNewColumn(int capacity, DataType type) {
     return new OffHeapColumnVector(capacity, type);
   }
 }
```

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -646,7 +646,7 @@ protected void reserveInternal(int newCapacity) {
   }
 
   @Override
-  protected OnHeapColumnVector reserveNewColumn(int capacity, DataType type) {
+  public OnHeapColumnVector reserveNewColumn(int capacity, DataType type) {
     return new OnHeapColumnVector(capacity, type);
   }
 }
```

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -944,7 +944,7 @@ public final boolean isAllNull() {
   /**
    * Reserve a new column.
    */
-  protected abstract WritableColumnVector reserveNewColumn(int capacity, DataType type);
+  public abstract WritableColumnVector reserveNewColumn(int capacity, DataType type);
 
   protected boolean isArray() {
     return type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType ||
```
