From 2040335fa00e39e1264110f12511c4bb7b784e36 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Fri, 5 Dec 2025 16:30:00 -0800 Subject: [PATCH 1/4] [SPARK-54621][SQL] Merge Into Update Set * preserve nested fields if coerceNestedTypes is enabled --- .../catalyst/analysis/AssignmentUtils.scala | 196 +++++++++++++++++- .../connector/MergeIntoTableSuiteBase.scala | 9 +- 2 files changed, 189 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala index 6c7b0626e81e..61706dd9b8de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala @@ -21,7 +21,8 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode.{NONE, RECURSE} -import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Expression, GetStructField, Literal} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, Literal} +import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull import org.apache.spark.sql.catalyst.plans.logical.Assignment import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.CharVarcharUtils @@ -55,6 +56,7 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { * (preserving existing fields). * @param coerceNestedTypes whether to coerce nested types to match the target type * for complex types + * @param missingSourcePaths paths that exist in target but not in source * @return aligned update assignments that match table attributes */ def alignUpdateAssignments( @@ -72,7 +74,8 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { assignments, addError = err => errors += err, colPath = Seq(attr.name), - coerceNestedTypes) + coerceNestedTypes, + fromStar) } if (errors.nonEmpty) { @@ -156,7 +159,8 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { assignments: Seq[Assignment], addError: String => Unit, colPath: Seq[String], - coerceNestedTypes: Boolean = false): Expression = { + coerceNestedTypes: Boolean = false, + updateStar: Boolean = false): Expression = { val (exactAssignments, otherAssignments) = assignments.partition { assignment => assignment.key.semanticEquals(colExpr) @@ -178,11 +182,31 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { } else if (exactAssignments.isEmpty && fieldAssignments.isEmpty) { TableOutputResolver.checkNullability(colExpr, col, conf, colPath) } else if (exactAssignments.nonEmpty) { - val value = exactAssignments.head.value - val coerceMode = if (coerceNestedTypes) RECURSE else NONE - val resolvedValue = TableOutputResolver.resolveUpdate("", value, col, conf, addError, - colPath, coerceMode) - resolvedValue + if (updateStar) { + val value = exactAssignments.head.value + col.dataType match { + case structType: StructType => + // Expand assignments to leaf fields + val structAssignment = + applyNestedFieldAssignments(col, colExpr, value, addError, colPath, + coerceNestedTypes) + + // Wrap with null check for missing source fields + fixNullExpansion(col, value, structType, structAssignment, + colPath, addError) + case _ => + // For non-struct types, resolve directly + val coerceMode = if (coerceNestedTypes) RECURSE else NONE + TableOutputResolver.resolveUpdate("", value, col, conf, addError, colPath, + coerceMode) + } + } else { + val value = exactAssignments.head.value + val coerceMode = if (coerceNestedTypes) RECURSE else NONE + val resolvedValue = TableOutputResolver.resolveUpdate("", value, col, conf, addError, + colPath, coerceMode) + resolvedValue + } } else { applyFieldAssignments(col, colExpr, fieldAssignments, addError, colPath, coerceNestedTypes) } @@ -194,7 +218,7 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { assignments: Seq[Assignment], addError: String => Unit, colPath: Seq[String], - coerceNestedTypes: Boolean): Expression = { + coerceNestedTyptes: Boolean): Expression = { col.dataType match { case structType: StructType => @@ -204,14 +228,71 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { } val updatedFieldExprs = fieldAttrs.zip(fieldExprs).map { case (fieldAttr, fieldExpr) => applyAssignments(fieldAttr, fieldExpr, assignments, addError, colPath :+ fieldAttr.name, - coerceNestedTypes) + coerceNestedTyptes) + } + toNamedStruct(structType, updatedFieldExprs) + + case otherType => + addError( + "Updating nested fields is only supported for StructType but " + + s"'${colPath.quoted}' is of type $otherType") + colExpr + } + } + + private def applyNestedFieldAssignments( + col: Attribute, + colExpr: Expression, + value: Expression, + addError: String => Unit, + colPath: Seq[String], + coerceNestedTyptes: Boolean): Expression = { + + col.dataType match { + case structType: StructType => + val fieldAttrs = DataTypeUtils.toAttributes(structType) + + val updatedFieldExprs = fieldAttrs.zipWithIndex.map { case (fieldAttr, ordinal) => + val fieldPath = colPath :+ fieldAttr.name + val targetFieldExpr = GetStructField(colExpr, ordinal, Some(fieldAttr.name)) + + // Try to find a corresponding field in the source value by name + val sourceFieldValue: Expression = value.dataType match { + case valueStructType: StructType => + valueStructType.fields.find(f => conf.resolver(f.name, fieldAttr.name)) match { + case Some(matchingField) => + // Found matching field in source, extract it + val fieldIndex = valueStructType.fieldIndex(matchingField.name) + GetStructField(value, fieldIndex, Some(matchingField.name)) + case None => + // Field doesn't exist in source, use target's current value with null check + TableOutputResolver.checkNullability(targetFieldExpr, fieldAttr, conf, fieldPath) + } + case _ => + // Value is not a struct, cannot extract field + addError(s"Cannot assign non-struct value to struct field '${fieldPath.quoted}'") + Literal(null, fieldAttr.dataType) + } + + // Recurse or resolve based on field type + fieldAttr.dataType match { + case nestedStructType: StructType => + // Field is a struct, recurse + applyNestedFieldAssignments(fieldAttr, targetFieldExpr, sourceFieldValue, + addError, fieldPath, coerceNestedTyptes) + case _ => + // Field is not a struct, resolve with TableOutputResolver + val coerceMode = if (coerceNestedTyptes) RECURSE else NONE + TableOutputResolver.resolveUpdate("", sourceFieldValue, fieldAttr, conf, addError, + fieldPath, coerceMode) + } } toNamedStruct(structType, updatedFieldExprs) case otherType => addError( "Updating nested fields is only supported for StructType but " + - s"'${colPath.quoted}' is of type $otherType") + s"'${colPath.quoted}' is of type $otherType") colExpr } } @@ -223,6 +304,99 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { CreateNamedStruct(namedStructExprs) } + private def getMissingSourcePaths(targetType: StructType, + sourceType: DataType, + colPath: Seq[String], + addError: String => Unit): Seq[Seq[String]] = { + val nestedTargetPaths = DataTypeUtils.extractLeafFieldPaths(targetType, Seq.empty) + val nestedSourcePaths = sourceType match { + case sourceStructType: StructType => + DataTypeUtils.extractLeafFieldPaths(sourceStructType, Seq.empty) + case _ => + addError(s"Value for struct type: " + + s"${colPath.quoted} must be a struct but was ${sourceType.simpleString}") + Seq() + } + nestedSourcePaths.diff(nestedTargetPaths) + } + + /** + * Creates a null check for a field at the given path within a struct expression. + * Navigates through the struct hierarchy following the path and returns an IsNull check + * for the final field. + * + * @param rootExpr the root expression to navigate from + * @param path the field path to navigate (sequence of field names) + * @return an IsNull expression checking if the field at the path is null + */ + private def createNullCheckForFieldPath( + rootExpr: Expression, + path: Seq[String]): Expression = { + var currentExpr: Expression = rootExpr + path.foreach { fieldName => + currentExpr.dataType match { + case st: StructType => + st.fields.find(f => conf.resolver(f.name, fieldName)) match { + case Some(field) => + val fieldIndex = st.fieldIndex(field.name) + currentExpr = GetStructField(currentExpr, fieldIndex, Some(field.name)) + case None => // No-op, should error later in TableOutputResolver + } + case _ => // Not a struct- no-op, should error later in TableOutputResolver + } + } + IsNull(currentExpr) + } + + /** + * As UPDATE SET * can assign struct fields individually (preserving existing fields), + * this will lead to null expansion, ie, a struct is created where all fields are null. + * Wraps a struct assignment with null checks for the source and missing source fields. + * Return null if all are null. + * + * @param col the target column attribute + * @param value the source value expression + * @param structType the target struct type + * @param structAssignment the struct assignment result to wrap + * @param colPath the column path for error reporting + * @param addError error reporting function + * @return the wrapped expression with null checks + */ + private def fixNullExpansion( + col: Attribute, + value: Expression, + structType: StructType, + structAssignment: Expression, + colPath: Seq[String], + addError: String => Unit): Expression = { + // As StoreAssignmentPolicy.LEGACY is not allowed in DSv2, always add null check for + // non-nullable column + if (!col.nullable) { + AssertNotNull(value) + } else { + // Check if source struct is null + val valueIsNull = IsNull(value) + + // Check if missing source paths (paths in target but not in source) are not null + // These will be null for the case of UPDATE SET * and + val missingSourcePaths = getMissingSourcePaths(structType, value.dataType, colPath, addError) + val condition = if (missingSourcePaths.nonEmpty) { + // Check if all target attributes at missing source paths are null + val missingFieldNullChecks = missingSourcePaths.map { path => + createNullCheckForFieldPath(col, path) + } + // Combine all null checks with AND + val allMissingFieldsNull = missingFieldNullChecks.reduce[Expression]((a, b) => And(a, b)) + And(valueIsNull, allMissingFieldsNull) + } else { + valueIsNull + } + + // Return: If (condition) THEN NULL ELSE structAssignment + If(condition, Literal(null, structAssignment.dataType), structAssignment) + } + } + /** * Checks whether assignments are aligned and compatible with table columns. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index 5d1173b5a1a5..3d71168eaa33 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -3240,9 +3240,8 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Seq( - Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"), + Row(1, Row(10, Row(Seq(1, 2), Map("c" -> "d"), false)), "sales"), Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering"))) - } else { val exception = intercept[org.apache.spark.sql.AnalysisException] { sql(mergeStmt) @@ -5258,8 +5257,8 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Seq( - Row(1, Row(10, Row(20, null)), "sales"), - Row(2, Row(20, Row(30, null)), "engineering"))) + Row(1, Row(10, Row(20, true)), "sales"), + Row(2, Row(20, Row(30, false)), "engineering"))) } else { val exception = intercept[org.apache.spark.sql.AnalysisException] { sql(mergeStmt) @@ -5918,7 +5917,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Seq( - Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"), + Row(1, Row(10, Row(Seq(1, 2), Map("c" -> "d"), false)), "sales"), Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering"))) } else { val exception = intercept[org.apache.spark.sql.AnalysisException] { From 3485d55ddc1ddc98f5b80f664c93b1420369b266 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Thu, 11 Dec 2025 21:54:10 -0800 Subject: [PATCH 2/4] Fix the condition to preserve nulls (source is null AND (target is null OR target has extra fields) --- .../catalyst/analysis/AssignmentUtils.scala | 68 +- .../connector/MergeIntoTableSuiteBase.scala | 628 +++++++++++++++++- 2 files changed, 642 insertions(+), 54 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala index 61706dd9b8de..d245ccd777a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode.{NONE, RECURSE} -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, Literal} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, Literal, Or} import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull import org.apache.spark.sql.catalyst.plans.logical.Assignment import org.apache.spark.sql.catalyst.types.DataTypeUtils @@ -317,42 +317,22 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { s"${colPath.quoted} must be a struct but was ${sourceType.simpleString}") Seq() } - nestedSourcePaths.diff(nestedTargetPaths) + nestedTargetPaths.diff(nestedSourcePaths) } /** - * Creates a null check for a field at the given path within a struct expression. - * Navigates through the struct hierarchy following the path and returns an IsNull check - * for the final field. + * As UPDATE SET * assigns struct fields individually (preserving existing fields), + * this will lead to indiscriminate null expansion, ie, a struct is created where all + * fields are null. Wraps a struct assignment with a condition to return null + * if both conditions are true: * - * @param rootExpr the root expression to navigate from - * @param path the field path to navigate (sequence of field names) - * @return an IsNull expression checking if the field at the path is null - */ - private def createNullCheckForFieldPath( - rootExpr: Expression, - path: Seq[String]): Expression = { - var currentExpr: Expression = rootExpr - path.foreach { fieldName => - currentExpr.dataType match { - case st: StructType => - st.fields.find(f => conf.resolver(f.name, fieldName)) match { - case Some(field) => - val fieldIndex = st.fieldIndex(field.name) - currentExpr = GetStructField(currentExpr, fieldIndex, Some(field.name)) - case None => // No-op, should error later in TableOutputResolver - } - case _ => // Not a struct- no-op, should error later in TableOutputResolver - } - } - IsNull(currentExpr) - } - - /** - * As UPDATE SET * can assign struct fields individually (preserving existing fields), - * this will lead to null expansion, ie, a struct is created where all fields are null. - * Wraps a struct assignment with null checks for the source and missing source fields. - * Return null if all are null. + * - source struct is null + * - target struct is null OR target struct is same as source struct + * + * If the condition is not true, we preserve the original structure. + * This includes cases where the source was a struct of nulls, + * or there were any extra target fields (including null ones), + * both cases retain the assignment to a struct of nulls. * * @param col the target column attribute * @param value the source value expression @@ -374,23 +354,13 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { if (!col.nullable) { AssertNotNull(value) } else { - // Check if source struct is null - val valueIsNull = IsNull(value) - - // Check if missing source paths (paths in target but not in source) are not null - // These will be null for the case of UPDATE SET * and + // Check if there are missing source paths (nested fields in target but not in source) val missingSourcePaths = getMissingSourcePaths(structType, value.dataType, colPath, addError) - val condition = if (missingSourcePaths.nonEmpty) { - // Check if all target attributes at missing source paths are null - val missingFieldNullChecks = missingSourcePaths.map { path => - createNullCheckForFieldPath(col, path) - } - // Combine all null checks with AND - val allMissingFieldsNull = missingFieldNullChecks.reduce[Expression]((a, b) => And(a, b)) - And(valueIsNull, allMissingFieldsNull) - } else { - valueIsNull - } + val missingPathsEmpty = + if (missingSourcePaths.isEmpty) Literal.TrueLiteral else Literal.FalseLiteral + + // Condition: (source struct IS NULL) AND (target struct IS NULL OR missingSourcePaths empty) + val condition = And(IsNull(value), Or(IsNull(col), missingPathsEmpty)) // Return: If (condition) THEN NULL ELSE structAssignment If(condition, Literal(null, structAssignment.dataType), structAssignment) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index 3d71168eaa33..b5faa521000e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -4775,6 +4775,292 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase sql(s"DROP TABLE IF EXISTS $tableNameAsString") } + test("merge with struct of nulls") { + withTempView("source") { + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT, + |dep STRING""".stripMargin, + """{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" } + |{ "pk": 1, "s": { "c1": 2, "c2": "b" }, "dep": "hr" }""" + .stripMargin) + + // Source table matches target table schema + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType) + ))), + StructField("dep", StringType) + )) + + // Source has a struct with null field values (not a null struct) + val data = Seq( + Row(1, Row(null, null), "engineering"), + Row(2, Row(null, null), "finance") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + sql( + s"""MERGE INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin) + // Struct of null values should be preserved, not converted to null struct + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, "a"), "sales"), + Row(1, Row(null, null), "engineering"), + Row(2, Row(null, null), "finance"))) + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + + test("merge with null struct into struct of nulls") { + withTempView("source") { + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT, + |dep STRING""".stripMargin, + """{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" } + |{ "pk": 1, "s": { "c1": null, "c2": null }, "dep": "hr" }""" + .stripMargin) + + // Source table matches target table schema + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType) + ))), + StructField("dep", StringType) + )) + + // Source has a null struct (not a struct of nulls) + val data = Seq( + Row(1, null, "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + sql( + s"""MERGE INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin) + // Null struct should override struct of nulls + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, "a"), "sales"), + Row(1, null, "engineering"))) + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + + test("merge with null struct into struct of nulls with extra target field") { + Seq(true, false).foreach { withSchemaEvolution => + Seq(true, false).foreach { coerceNestedTypes => + withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> + coerceNestedTypes.toString) { + withTempView("source") { + // Target has struct with 3 fields, row 1 has all nulls including extra field c3 + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT, + |dep STRING""".stripMargin, + """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": 10 }, "dep": "sales" } + |{ "pk": 1, "s": { "c1": null, "c2": null, "c3": null }, "dep": "hr" }""" + .stripMargin) + + // Source table has struct with 2 fields (missing c3) + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType) + // missing field c3 + ))), + StructField("dep", StringType) + )) + + // Source has a null struct (not a struct of nulls) + val data = Seq( + Row(1, null, "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin + + if (coerceNestedTypes && withSchemaEvolution) { + sql(mergeStmt) + // Because target has extra field c3, we preserve struct of nulls + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, "a", 10), "sales"), + Row(1, Row(null, null, null), "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(exception.errorClass.get == + "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA") + } + } + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + } + } + + test("merge with struct of nulls with missing source field") { + Seq(true, false).foreach { withSchemaEvolution => + Seq(true, false).foreach { coerceNestedTypes => + withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> + coerceNestedTypes.toString) { + withTempView("source") { + // Target has struct with 3 fields + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT, + |dep STRING""".stripMargin, + """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": 10 }, "dep": "sales" } + |{ "pk": 1, "s": { "c1": 2, "c2": "b", "c3": 20 }, "dep": "hr" }""" + .stripMargin) + + // Source table has struct with 2 fields (missing field c3) + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType) + // missing field c3 + ))), + StructField("dep", StringType) + )) + + // Source has a struct with two null field values (not a null struct) + val data = Seq( + Row(1, Row(null, null), "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin + + if (coerceNestedTypes && withSchemaEvolution) { + sql(mergeStmt) + // Struct of null values should be preserved, not converted to null struct + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, "a", 10), "sales"), + Row(1, Row(null, null, 20), "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(exception.errorClass.get == + "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA") + } + } + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + } + } + + test("merge with struct of nulls with missing source field and null target field") { + Seq(true, false).foreach { withSchemaEvolution => + Seq(true, false).foreach { coerceNestedTypes => + withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> + coerceNestedTypes.toString) { + withTempView("source") { + // Target has struct with 3 fields, but row 1 has null for the extra field c3 + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT, + |dep STRING""".stripMargin, + """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": 10 }, "dep": "sales" } + |{ "pk": 1, "s": { "c1": 2, "c2": "b", "c3": null }, "dep": "hr" }""" + .stripMargin) + + // Source table has struct with 2 fields (missing field c3) + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType) + // missing field c3 + ))), + StructField("dep", StringType) + )) + + // Source has a struct with two null field values (not a null struct) + val data = Seq( + Row(1, Row(null, null), "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin + + if (coerceNestedTypes && withSchemaEvolution) { + sql(mergeStmt) + // Struct of null values should be preserved, not converted to null struct + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, "a", 10), "sales"), + Row(1, Row(null, null, null), "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(exception.errorClass.get == + "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA") + } + } + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + } + } test("merge with null struct - update field") { withTempView("source") { @@ -4916,7 +5202,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase sql(s"SELECT * FROM $tableNameAsString"), Seq( Row(0, Row(1, Row(10, "x")), "sales"), - Row(1, null, "engineering"), + Row(1, Row(null, Row(null, "y")), "engineering"), Row(2, null, "finance"))) } else { val exception = intercept[org.apache.spark.sql.AnalysisException] { @@ -4932,6 +5218,338 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } } + test("merge with null source struct with extra target source field being null") { + Seq(true, false).foreach { withSchemaEvolution => + Seq(true, false).foreach { coerceNestedTypes => + withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> + coerceNestedTypes.toString) { + withTempView("source") { + // Target table has nested struct, row 1 has null for field 'b' (missing in source) + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT>, + |dep STRING""".stripMargin, + """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, "dep": "sales" } + |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": null } }, "dep": "hr" }""" + .stripMargin) + + // Source table has struct with missing nested field 'b' + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType) + // missing field 'b' + ))) + ))), + StructField("dep", StringType) + )) + + val data = Seq( + Row(1, null, "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin + + if (coerceNestedTypes && withSchemaEvolution) { + sql(mergeStmt) + // It's not immediately obvious, but because the target had extra fields + // we preserve them despite them being null (and thus retain the struct of nulls) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, Row(10, "x")), "sales"), + Row(1, Row(null, Row(null, null)), "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(exception.errorClass.get == + "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA") + } + } + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + } + } + + test("merge with null source struct with extra target field in doubly nested struct") { + Seq(true, false).foreach { withSchemaEvolution => + Seq(true, false).foreach { coerceNestedTypes => + withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> + coerceNestedTypes.toString) { + withTempView("source") { + // Target has struct nested in struct, with extra field 'y' in innermost struct + val targetTableSchema = StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType), + StructField("b", StructType(Seq( + StructField("x", IntegerType), + StructField("y", StringType) + ))) + ))) + )) + + val columns = Array( + Column.create("pk", IntegerType, false), + Column.create("s", targetTableSchema), + Column.create("dep", StringType)) + createTable(columns) + + val targetData = Seq( + Row(0, Row(1, Row(10, Row(100, "foo"))), "sales"), + Row(1, Row(2, Row(20, Row(200, null))), "hr") + ) + val targetDataSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", targetTableSchema), + StructField("dep", StringType) + )) + spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetDataSchema) + .writeTo(tableNameAsString).append() + + // Source has struct with missing field 'y' in innermost struct + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType), + StructField("b", StructType(Seq( + StructField("x", IntegerType) + // missing field 'y' + ))) + ))) + ))), + StructField("dep", StringType) + )) + + val data = Seq( + Row(1, null, "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin + + if (coerceNestedTypes && withSchemaEvolution) { + sql(mergeStmt) + // Because the target had extra field 'y' which is null, + // we preserve it and retain the struct of nulls + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, Row(10, Row(100, "foo"))), "sales"), + Row(1, Row(null, Row(null, Row(null, null))), "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(exception.errorClass.get == + "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA") + } + } + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + } + } + + test("merge with null source struct with extra target field in struct inside array") { + Seq(true, false).foreach { withSchemaEvolution => + Seq(true, false).foreach { coerceNestedTypes => + withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> + coerceNestedTypes.toString) { + withTempView("source") { + // Target has struct with array of structs, with extra field 'y' in array element struct + val arrayElementSchema = StructType(Seq( + StructField("x", IntegerType), + StructField("y", StringType) + )) + val targetTableSchema = StructType(Seq( + StructField("c1", IntegerType), + StructField("arr", ArrayType(arrayElementSchema)) + )) + + val columns = Array( + Column.create("pk", IntegerType, false), + Column.create("s", targetTableSchema), + Column.create("dep", StringType)) + createTable(columns) + + val targetData = Seq( + Row(0, Row(1, Seq(Row(100, "foo"), Row(101, "bar"))), "sales"), + Row(1, Row(2, Seq(Row(200, null), Row(201, null))), "hr") + ) + val targetDataSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", targetTableSchema), + StructField("dep", StringType) + )) + spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetDataSchema) + .writeTo(tableNameAsString).append() + + // Source has struct with missing field 'y' in array element struct + val sourceArrayElementSchema = StructType(Seq( + StructField("x", IntegerType) + // missing field 'y' + )) + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("arr", ArrayType(sourceArrayElementSchema)) + ))), + StructField("dep", StringType) + )) + + val data = Seq( + Row(1, null, "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin + + if (coerceNestedTypes && withSchemaEvolution) { + sql(mergeStmt) + // Because the target had extra field 'y' which is within an array, + // it cannot be referenced and so we do not preserve it and allow source null + // to override it. + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, Seq(Row(100, "foo"), Row(101, "bar"))), "sales"), + Row(1, null, "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(exception.errorClass.get == + "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA") + } + } + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + } + } + + test("merge with null source struct with extra null target field in struct containing array") { + Seq(true, false).foreach { withSchemaEvolution => + Seq(true, false).foreach { coerceNestedTypes => + withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> + coerceNestedTypes.toString) { + withTempView("source") { + val arrayElementSchema = StructType(Seq( + StructField("x", IntegerType) + )) + val targetTableSchema = StructType(Seq( + StructField("c1", IntegerType), + StructField("arr", ArrayType(arrayElementSchema)), + StructField("c2", StringType) // extra field at nested struct level + )) + + val columns = Array( + Column.create("pk", IntegerType, false), + Column.create("s", targetTableSchema), + Column.create("dep", StringType)) + createTable(columns) + + val targetData = Seq( + Row(0, Row(1, Seq(Row(100), Row(101)), "foo"), "sales"), + Row(1, Row(2, Seq(Row(200), Row(201)), null), "hr") // c2 is null + ) + val targetDataSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", targetTableSchema), + StructField("dep", StringType) + )) + spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetDataSchema) + .writeTo(tableNameAsString).append() + + // Source has struct missing field 'c2' + val sourceArrayElementSchema = StructType(Seq( + StructField("x", IntegerType) + )) + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("arr", ArrayType(sourceArrayElementSchema)) + // missing field 'c2' + ))), + StructField("dep", StringType) + )) + + val data = Seq( + Row(1, null, "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin + + if (coerceNestedTypes && withSchemaEvolution) { + sql(mergeStmt) + // Because the target had extra field 'c2' which is null, + // we preserve it and retain the struct of nulls + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, Seq(Row(100), Row(101)), "foo"), "sales"), + Row(1, Row(null, null, null), "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) + } + assert(exception.errorClass.get == + "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA") + } + } + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + } + } + test("merge null struct with schema evolution - source with missing and extra nested fields") { Seq(true, false).foreach { withSchemaEvolution => Seq(true, false).foreach { coerceNestedTypes => @@ -4986,7 +5604,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase sql(s"SELECT * FROM $tableNameAsString"), Seq( Row(0, Row(1, Row(10, "x", null)), "sales"), - Row(1, null, "engineering"), + Row(1, Row(null, Row(null, "y", null)), "engineering"), Row(2, null, "finance"))) } else { val exception = intercept[org.apache.spark.sql.AnalysisException] { @@ -5128,7 +5746,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase sql(s"SELECT * FROM $tableNameAsString"), Seq( Row(0, Row(1, Row(10, "x")), "sales"), - Row(1, null, "engineering"), + Row(1, Row(null, Row(null, "y")), "engineering"), Row(2, null, "finance"))) } else { val exception = intercept[org.apache.spark.sql.AnalysisException] { @@ -6266,7 +6884,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase sql(s"SELECT * FROM $tableNameAsString"), Seq( Row(0, Row(1, Row(10, "x")), "sales"), - Row(1, null, "engineering"), + Row(1, Row(null, Row(null, "y")), "engineering"), Row(2, null, "finance"))) } else { val exception = intercept[org.apache.spark.sql.AnalysisException] { @@ -6370,7 +6988,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase sql(s"SELECT * FROM $tableNameAsString"), Seq( Row(0, Row(1, Row(10, "x", null)), "sales"), - Row(1, null, "engineering"), + Row(1, Row(null, Row(null, "y", null)), "engineering"), Row(2, null, "finance"))) } else { val exception = intercept[org.apache.spark.sql.AnalysisException] { From 810f6a5073cdf129e51bd667348d4fff0889468e Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Thu, 11 Dec 2025 22:24:54 -0800 Subject: [PATCH 3/4] Simplification --- .../catalyst/analysis/AssignmentUtils.scala | 46 +++++++++++-------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala index d245ccd777a9..0aadbf266a88 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode.{NONE, RECURSE} -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, Literal, Or} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, Literal} import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull import org.apache.spark.sql.catalyst.plans.logical.Assignment import org.apache.spark.sql.catalyst.types.DataTypeUtils @@ -304,20 +304,27 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { CreateNamedStruct(namedStructExprs) } - private def getMissingSourcePaths(targetType: StructType, - sourceType: DataType, - colPath: Seq[String], - addError: String => Unit): Seq[Seq[String]] = { - val nestedTargetPaths = DataTypeUtils.extractLeafFieldPaths(targetType, Seq.empty) - val nestedSourcePaths = sourceType match { + /** + * Checks if target struct has extra fields compared to source struct, recursively. + */ + private def hasExtraTargetFields(targetType: StructType, sourceType: DataType): Boolean = { + sourceType match { case sourceStructType: StructType => - DataTypeUtils.extractLeafFieldPaths(sourceStructType, Seq.empty) + targetType.fields.exists { targetField => + sourceStructType.fields.find(f => conf.resolver(f.name, targetField.name)) match { + case Some(sourceField) => + // Check nested structs recursively + (targetField.dataType, sourceField.dataType) match { + case (targetNested: StructType, sourceNested) => + hasExtraTargetFields(targetNested, sourceNested) + case _ => false + } + case None => true // target has extra field not in source + } + } case _ => - addError(s"Value for struct type: " + - s"${colPath.quoted} must be a struct but was ${sourceType.simpleString}") - Seq() + false } - nestedTargetPaths.diff(nestedSourcePaths) } /** @@ -354,15 +361,14 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { if (!col.nullable) { AssertNotNull(value) } else { - // Check if there are missing source paths (nested fields in target but not in source) - val missingSourcePaths = getMissingSourcePaths(structType, value.dataType, colPath, addError) - val missingPathsEmpty = - if (missingSourcePaths.isEmpty) Literal.TrueLiteral else Literal.FalseLiteral - - // Condition: (source struct IS NULL) AND (target struct IS NULL OR missingSourcePaths empty) - val condition = And(IsNull(value), Or(IsNull(col), missingPathsEmpty)) + val condition = if (hasExtraTargetFields(structType, value.dataType)) { + // extra target fields: return null iff source struct is null and target struct is null + And(IsNull(value), IsNull(col)) + } else { + // schemas match: return null iff source struct is null + IsNull(value) + } - // Return: If (condition) THEN NULL ELSE structAssignment If(condition, Literal(null, structAssignment.dataType), structAssignment) } } From 0576f789a0353656423b293cb36e4e7176b5dec3 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Fri, 12 Dec 2025 09:27:21 -0800 Subject: [PATCH 4/4] Review comments --- .../sql/catalyst/analysis/AssignmentUtils.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala index 0aadbf266a88..086ce6ddb177 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode.{NONE, RECURSE} import org.apache.spark.sql.catalyst.expressions.{And, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, Literal} @@ -56,7 +57,6 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { * (preserving existing fields). * @param coerceNestedTypes whether to coerce nested types to match the target type * for complex types - * @param missingSourcePaths paths that exist in target but not in source * @return aligned update assignments that match table attributes */ def alignUpdateAssignments( @@ -203,9 +203,8 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { } else { val value = exactAssignments.head.value val coerceMode = if (coerceNestedTypes) RECURSE else NONE - val resolvedValue = TableOutputResolver.resolveUpdate("", value, col, conf, addError, + TableOutputResolver.resolveUpdate("", value, col, conf, addError, colPath, coerceMode) - resolvedValue } } else { applyFieldAssignments(col, colExpr, fieldAssignments, addError, colPath, coerceNestedTypes) @@ -218,7 +217,7 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { assignments: Seq[Assignment], addError: String => Unit, colPath: Seq[String], - coerceNestedTyptes: Boolean): Expression = { + coerceNestedTypes: Boolean): Expression = { col.dataType match { case structType: StructType => @@ -228,7 +227,7 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { } val updatedFieldExprs = fieldAttrs.zip(fieldExprs).map { case (fieldAttr, fieldExpr) => applyAssignments(fieldAttr, fieldExpr, assignments, addError, colPath :+ fieldAttr.name, - coerceNestedTyptes) + coerceNestedTypes) } toNamedStruct(structType, updatedFieldExprs) @@ -323,7 +322,9 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { } } case _ => - false + // Should be caught earlier + throw SparkException.internalError( + s"Source type must be StructType but found: $sourceType") } }