Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import io.substrait.proto.NamedStruct;
import io.substrait.proto.ReadRel;
import io.substrait.proto.Type;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

Expand Down Expand Up @@ -111,6 +114,18 @@ private NamedStruct buildNamedStruct() {

Type.Struct.Builder structBuilder = Type.Struct.newBuilder();
for (StructField field : fileSchema.fields()) {
// A column that contains a VariantType cannot be encoded in a Substrait NamedStruct
// because ConverterUtils#getTypeNode has no representation for it. If such a column
// is actually projected from this scan, validation in VeloxBackend rejects the plan and
// the whole stage falls back to vanilla Spark; we never reach this code path. If the
// column is *not* projected, however, it still ends up here because the file schema
// contains every data column of the relation regardless of projection. Skipping the
// field is safe: the native reader looks up requested columns by name, so an entry
// that is not requested is unused, and an entry that is missing from the file schema
// simply means the native reader will not see/validate that column.
if (containsUnsupportedFileSchemaType(field.dataType())) {
continue;
}
structBuilder.addTypes(
ConverterUtils.getTypeNode(field.dataType(), field.nullable()).toProtobuf());
namedStructBuilder.addNames(ConverterUtils.normalizeColName(field.name()));
Expand All @@ -120,6 +135,33 @@ private NamedStruct buildNamedStruct() {
return namedStructBuilder.build();
}

// Reports whether `dt` is, or transitively contains, a Spark type for which
// ConverterUtils#getTypeNode has no Substrait representation. Currently the only such type is
// Spark 4.0's VariantType; it is detected via typeName (rather than by class) so this code stays
// source-compatible with shims that target older Spark versions.
// Package-private so unit tests can exercise it directly.
static boolean containsUnsupportedFileSchemaType(DataType dt) {
  if ("variant".equals(dt.typeName())) {
    // Leaf case: the VariantType itself, matched by name to avoid a hard
    // compile-time dependency on Spark 4.0.
    return true;
  }
  if (dt instanceof ArrayType) {
    // An array is unsupported iff its element type is.
    return containsUnsupportedFileSchemaType(((ArrayType) dt).elementType());
  }
  if (dt instanceof MapType) {
    // A map is unsupported if either side of the entry is.
    MapType mapType = (MapType) dt;
    return containsUnsupportedFileSchemaType(mapType.keyType())
        || containsUnsupportedFileSchemaType(mapType.valueType());
  }
  if (dt instanceof StructType) {
    // A struct is unsupported as soon as any of its fields is.
    for (StructField child : ((StructType) dt).fields()) {
      if (containsUnsupportedFileSchemaType(child.dataType())) {
        return true;
      }
    }
  }
  // All primitive (and otherwise untyped-by-the-cases-above) types are representable.
  return false;
}

@Override
public List<String> preferredLocations() {
return this.preferredLocations;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.substrait.rel

import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import io.substrait.proto.ReadRel
import org.scalatest.funsuite.AnyFunSuiteLike

import java.util.{Collections => JCollections}

/**
 * Tests for `LocalFilesNode.containsUnsupportedFileSchemaType` and, indirectly, for the way
 * `buildNamedStruct()` drops unsupported columns. The helper is package-private
 * (visible-for-testing), so the suite calls it directly and needs no SparkSession.
 *
 * Spark's VariantType only exists from Spark 4.0 onwards, so the leaf case is driven through a
 * stand-in `DataType` whose `typeName` is "variant" — which is exactly what the production code
 * keys on. That keeps this suite compiling against shims built for older Spark releases.
 */
class LocalFilesNodeUnsupportedTypeSuite extends AnyFunSuiteLike {

  // Stand-in for Spark 4.0's VariantType: only the typeName matters for the detection logic.
  private object VariantStandIn extends DataType {
    override def defaultSize: Int = 16
    override def asNullable: DataType = this
    override def typeName: String = "variant"
  }

  private def hasUnsupported(dataType: DataType): Boolean =
    LocalFilesNode.containsUnsupportedFileSchemaType(dataType)

  test("returns true for a bare variant type") {
    assert(hasUnsupported(VariantStandIn))
  }

  test("returns true for a struct that contains a variant field") {
    val withVariantField = new StructType()
      .add(StructField("id", IntegerType, nullable = false))
      .add(StructField("data", VariantStandIn, nullable = true))
    assert(hasUnsupported(withVariantField))
  }

  test("returns true for a deeply nested struct that contains a variant field") {
    val inner = StructType(Seq(StructField("inner", VariantStandIn, nullable = true)))
    val outer = StructType(Seq(StructField("outer", inner, nullable = true)))
    assert(hasUnsupported(outer))
  }

  test("returns true for an array whose element contains variant") {
    assert(hasUnsupported(ArrayType(VariantStandIn, containsNull = true)))
    val structElement = StructType(Seq(StructField("v", VariantStandIn, nullable = true)))
    assert(hasUnsupported(ArrayType(structElement, containsNull = true)))
  }

  test("returns true for a map whose key or value contains variant") {
    assert(hasUnsupported(MapType(StringType, VariantStandIn, valueContainsNull = true)))
    assert(hasUnsupported(MapType(VariantStandIn, IntegerType, valueContainsNull = false)))
  }

  test("returns false for primitive types and supported nested types without variant") {
    val primitives: Seq[DataType] = Seq(
      IntegerType,
      LongType,
      ShortType,
      ByteType,
      FloatType,
      DoubleType,
      BooleanType,
      StringType,
      BinaryType,
      DateType,
      TimestampType,
      DecimalType(10, 2)
    )
    for (dt <- primitives) {
      assert(!hasUnsupported(dt), s"$dt should be supported")
    }

    val nested = StructType(
      Seq(
        StructField("a", IntegerType),
        StructField("b", ArrayType(StringType)),
        StructField("c", MapType(StringType, StructType(Seq(StructField("x", DoubleType))))),
        StructField("d", StructType(Seq(StructField("y", BinaryType))))
      ))
    assert(!hasUnsupported(nested))
    assert(!hasUnsupported(ArrayType(IntegerType, containsNull = true)))
    assert(!hasUnsupported(MapType(StringType, LongType)))
  }

  // The remaining tests exercise buildNamedStruct() end-to-end via toProtobuf(), checking
  // that skipping variant fields keeps the emitted names and types consistent.

  private def newNode(): LocalFilesNode = {
    new LocalFilesNode(
      0,
      JCollections.singletonList("file:///fake/path"),
      JCollections.singletonList(java.lang.Long.valueOf(0L)),
      JCollections.singletonList(java.lang.Long.valueOf(1024L)),
      JCollections.emptyList(),
      JCollections.emptyList(),
      JCollections.singletonList(JCollections.emptyMap()),
      JCollections.singletonList(JCollections.emptyMap()),
      ReadFileFormat.ParquetReadFormat,
      JCollections.emptyList(),
      JCollections.emptyMap(),
      JCollections.singletonList(JCollections.emptyMap())
    )
  }

  test("buildNamedStruct omits variant fields and keeps names in sync with types") {
    SQLConf.withExistingConf(new SQLConf()) {
      val fileSchema = StructType(
        Seq(
          StructField("id", IntegerType, nullable = false),
          StructField("data", VariantStandIn, nullable = true),
          StructField("ts", TimestampType, nullable = true)))

      val node = newNode()
      node.setFileSchema(fileSchema)

      val localFiles = node.toProtobuf().asInstanceOf[ReadRel.LocalFiles]
      val namedStruct = localFiles.getItems(0).getSchema

      assert(namedStruct.getNamesCount === 2, "expected 2 names (variant field skipped)")
      assert(namedStruct.getStruct.getTypesCount === 2, "expected 2 types (in sync with names)")
      assert(namedStruct.getNames(0) === "id")
      assert(namedStruct.getNames(1) === "ts")
    }
  }

  test("buildNamedStruct produces empty struct when all fields are unsupported") {
    SQLConf.withExistingConf(new SQLConf()) {
      val fileSchema = StructType(
        Seq(
          StructField("v1", VariantStandIn, nullable = true),
          StructField("v2", VariantStandIn, nullable = true)))

      val node = newNode()
      node.setFileSchema(fileSchema)

      val localFiles = node.toProtobuf().asInstanceOf[ReadRel.LocalFiles]
      val namedStruct = localFiles.getItems(0).getSchema

      assert(namedStruct.getNamesCount === 0)
      assert(namedStruct.getStruct.getTypesCount === 0)
    }
  }
}
Loading