diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 417170d4b744c..7be9fbdd3e9fe 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3330,7 +3330,7 @@
   },
   "INVALID_SINGLE_VARIANT_COLUMN" : {
     "message" : [
-      "The `singleVariantColumn` option cannot be used if there is also a user specified schema."
+      "User specified schema <schema> is invalid when the `singleVariantColumn` option is enabled. The schema must either be a variant field, or a variant field plus a corrupt column field."
     ],
     "sqlState" : "42613"
   },
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DataSourceOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DataSourceOptions.scala
index 41b062e4a375c..aa5e3c1de13f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DataSourceOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DataSourceOptions.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql.catalyst
 
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
 /**
  * Interface defines the following methods for a data source:
  * - register a new option name
@@ -71,4 +76,39 @@
   // as a single VARIANT type column in the table with the given column name.
   // E.g. spark.read.format("").option("singleVariantColumn", "colName")
   val SINGLE_VARIANT_COLUMN = "singleVariantColumn"
+  // The common option name for all data sources that support corrupt records. In case of a
+  // parsing error, the record will be stored as a string in the column with the given name.
+  // Theoretically, the behavior of this option is not affected by the parsing mode
+  // (PERMISSIVE/FAILFAST/DROPMALFORMED). However, the corrupt record is only visible to the user
+  // when in PERMISSIVE mode, because the queries will fail in FAILFAST mode, or the row containing
+  // the corrupt record will be dropped in DROPMALFORMED mode.
+  val COLUMN_NAME_OF_CORRUPT_RECORD = "columnNameOfCorruptRecord"
+
+  // When `singleVariantColumn` is enabled and there is a user-specified schema, the schema must
+  // either be a variant field, or a variant field plus a corrupt column field.
+  def validateSingleVariantColumn(
+      options: CaseInsensitiveMap[String],
+      userSpecifiedSchema: Option[StructType]): Unit = {
+    (options.get(SINGLE_VARIANT_COLUMN), userSpecifiedSchema) match {
+      case (Some(variantColumnName), Some(schema)) =>
+        var valid = schema.fields.exists { f =>
+          f.dataType.isInstanceOf[VariantType] && f.name == variantColumnName && f.nullable
+        }
+        schema.length match {
+          case 1 =>
+          case 2 =>
+            val corruptRecordColumnName = options.getOrElse(
+              COLUMN_NAME_OF_CORRUPT_RECORD, SQLConf.get.columnNameOfCorruptRecord)
+            valid = valid && corruptRecordColumnName != variantColumnName
+            valid = valid && schema.fields.exists { f =>
+              f.dataType.isInstanceOf[StringType] && f.name == corruptRecordColumnName && f.nullable
+            }
+          case _ => valid = false
+        }
+        if (!valid) {
+          throw QueryCompilationErrors.invalidSingleVariantColumn(schema)
+        }
+      case _ =>
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 3dcab7a03f04f..b43f124ed19c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -396,7 +396,7 @@
   val EMPTY_VALUE = newOption("emptyValue")
   val LINE_SEP = newOption("lineSep")
   val INPUT_BUFFER_SIZE = newOption("inputBufferSize")
-  val COLUMN_NAME_OF_CORRUPT_RECORD = newOption("columnNameOfCorruptRecord")
+  val COLUMN_NAME_OF_CORRUPT_RECORD = newOption(DataSourceOptions.COLUMN_NAME_OF_CORRUPT_RECORD)
   val NULL_VALUE = newOption("nullValue")
   val NAN_VALUE = newOption("nanValue")
   val POSITIVE_INF = newOption("positiveInf")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csv/CsvExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csv/CsvExpressionEvalUtils.scala
index 650db3a686be1..4d6c862a4fbce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csv/CsvExpressionEvalUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csv/CsvExpressionEvalUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions.csv
 import com.univocity.parsers.csv.CsvParser
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{DataSourceOptions, InternalRow}
 import org.apache.spark.sql.catalyst.csv.{CSVInferSchema, CSVOptions, UnivocityParser}
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.util.{FailFastMode, FailureSafeParser, PermissiveMode}
@@ -66,11 +66,7 @@
     if (mode != PermissiveMode && mode != FailFastMode) {
       throw QueryCompilationErrors.parseModeUnsupportedError("from_csv", mode)
     }
-    if (parsedOptions.singleVariantColumn.isDefined) {
-      if (nullableSchema.length != 1 || nullableSchema.head.dataType != VariantType) {
-        throw QueryCompilationErrors.invalidSingleVariantColumn()
-      }
-    }
+    DataSourceOptions.validateSingleVariantColumn(parsedOptions.parameters, Some(nullableSchema))
     ExprUtils.verifyColumnNameOfCorruptRecord(
       nullableSchema, parsedOptions.columnNameOfCorruptRecord)
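For context, a rough usage sketch (not part of the diff) of the two schema shapes the shared validation now accepts for `from_csv` in `singleVariantColumn` mode. It assumes an active SparkSession `spark` with `spark.implicits._` imported; the column names `v` and `_corrupt_record` are only illustrative.

import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types.StructType

val df = Seq("1,2").toDF("value")
// Accepted: the schema is exactly the variant field named by the `singleVariantColumn` option.
df.select(from_csv($"value", StructType.fromDDL("v variant"),
  Map("singleVariantColumn" -> "v")))
// Also accepted after this change: the variant field plus a nullable string corrupt-record field.
df.select(from_csv($"value", StructType.fromDDL("v variant, _corrupt_record string"),
  Map("singleVariantColumn" -> "v")))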
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 249c0efc26813..6ba152d309846 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -91,7 +91,7 @@
   val parseMode: ParseMode =
     parameters.get(MODE).map(ParseMode.fromString).getOrElse(PermissiveMode)
   val columnNameOfCorruptRecord =
-    parameters.getOrElse(COLUMN_NAME_OF_CORRUPTED_RECORD, defaultColumnNameOfCorruptRecord)
+    parameters.getOrElse(COLUMN_NAME_OF_CORRUPT_RECORD, defaultColumnNameOfCorruptRecord)
 
   // Whether to ignore column of all null values or empty array/struct during schema inference
   val dropFieldIfAllNull = parameters.get(DROP_FIELD_IF_ALL_NULL).map(_.toBoolean).getOrElse(false)
@@ -284,7 +284,7 @@
   val LINE_SEP = newOption("lineSep")
   val PRETTY = newOption("pretty")
   val INFER_TIMESTAMP = newOption("inferTimestamp")
-  val COLUMN_NAME_OF_CORRUPTED_RECORD = newOption("columnNameOfCorruptRecord")
+  val COLUMN_NAME_OF_CORRUPT_RECORD = newOption(DataSourceOptions.COLUMN_NAME_OF_CORRUPT_RECORD)
   val TIME_ZONE = newOption("timeZone")
   val WRITE_NON_ASCII_CHARACTER_AS_CODEPOINT = newOption("writeNonAsciiCharacterAsCodePoint")
   val SINGLE_VARIANT_COLUMN = newOption(DataSourceOptions.SINGLE_VARIANT_COLUMN)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
index e2c2d9dbc6d63..2fb25478e529b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
@@ -200,7 +200,7 @@
   val COMPRESSION = newOption("compression")
   val MULTI_LINE = newOption("multiLine")
   val SAMPLING_RATIO = newOption("samplingRatio")
-  val COLUMN_NAME_OF_CORRUPT_RECORD = newOption("columnNameOfCorruptRecord")
+  val COLUMN_NAME_OF_CORRUPT_RECORD = newOption(DataSourceOptions.COLUMN_NAME_OF_CORRUPT_RECORD)
   val DATE_FORMAT = newOption("dateFormat")
   val TIMESTAMP_FORMAT = newOption("timestampFormat")
   val TIMESTAMP_NTZ_FORMAT = newOption("timestampNTZFormat")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 4d8b6b3da6db6..b58605ae95420 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3355,10 +3355,10 @@
         "config" -> SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key))
   }
 
-  def invalidSingleVariantColumn(): Throwable = {
+  def invalidSingleVariantColumn(schema: DataType): Throwable = {
     new AnalysisException(
       errorClass = "INVALID_SINGLE_VARIANT_COLUMN",
-      messageParameters = Map.empty)
+      messageParameters = Map("schema" -> toSQLType(schema)))
   }
 
   def writeWithSaveModeUnsupportedBySourceError(source: String, createMode: String): Throwable = {
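As an illustration of the new error surface (again not part of the diff, assuming a SparkSession `spark` and an existing JSON input), a read like the following should now fail with INVALID_SINGLE_VARIANT_COLUMN, and the message reports the rejected schema instead of an empty parameter map:

// Two variant fields are not a valid singleVariantColumn schema; the reported schema should
// look roughly like "STRUCT<a: VARIANT, b: VARIANT>" (formatted via toSQLType).
spark.read
  .option("singleVariantColumn", "a")
  .schema("a variant, b variant")
  .json("/tmp/input.json") // hypothetical path
  .collect()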
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
index 08cc823752263..bc01517e1c6ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameReader.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.plans.logical.UnresolvedDataSource
 import org.apache.spark.sql.catalyst.util.FailureSafeParser
 import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, XmlOptions}
 import org.apache.spark.sql.classic.ClassicConversions._
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation}
 import org.apache.spark.sql.execution.datasources.json.JsonUtils.checkJsonSchema
@@ -336,12 +335,8 @@
   override def textFile(paths: String*): Dataset[String] = super.textFile(paths: _*)
 
   /** @inheritdoc */
-  override protected def validateSingleVariantColumn(): Unit = {
-    if (extraOptions.get(DataSourceOptions.SINGLE_VARIANT_COLUMN).isDefined &&
-      userSpecifiedSchema.isDefined) {
-      throw QueryCompilationErrors.invalidSingleVariantColumn()
-    }
-  }
+  override protected def validateSingleVariantColumn(): Unit =
+    DataSourceOptions.validateSingleVariantColumn(extraOptions, userSpecifiedSchema)
 
   override protected def validateJsonSchema(): Unit = userSpecifiedSchema.foreach(checkJsonSchema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 5b2ae9a20ab7d..f5cca28dc01da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -789,7 +789,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
           from_csv(
            $"value",
            StructType.fromDDL("v variant"),
-           Map("header" -> header, "singleVariantColumn" -> "true")
+           Map("header" -> header, "singleVariantColumn" -> "v")
          ).cast("string")),
        Seq(
          Row("""{{"_c0":100,"_c1":1.1}}"""),
@@ -797,6 +797,19 @@
          Row("""{{"_c0":null,"_c1":true}}"""),
          Row("""{{"_c0":"1e9","_c1":"hello","_c2":"extra"}}"""),
          Row("""{{"_c0":"missing"}}""")))
+      checkAnswer(
+        df.select(
+          from_csv(
+            $"value",
+            StructType.fromDDL("v variant, _corrupt_record string"),
+            Map("header" -> header, "singleVariantColumn" -> "v")
+          ).cast("string")),
+        Seq(
+          Row("""{{"_c0":100,"_c1":1.1}, null}"""),
+          Row("""{{"_c0":"2000-01-01","_c1":"2000-01-01 01:02:03+00:00"}, null}"""),
+          Row("""{{"_c0":null,"_c1":true}, null}"""),
+          Row("""{{"_c0":"1e9","_c1":"hello","_c2":"extra"}, null}"""),
+          Row("""{{"_c0":"missing"}, null}""")))
      }
    }
 
    checkError(
      exception = intercept[AnalysisException] {
        df.select(
          from_csv(
            $"value",
            StructType.fromDDL("a variant, b variant"),
            Map("singleVariantColumn" -> "true"))).collect()
      },
-     condition = "INVALID_SINGLE_VARIANT_COLUMN")
+     condition = "INVALID_SINGLE_VARIANT_COLUMN",
+     parameters = Map("schema" -> "\"STRUCT<a: VARIANT, b: VARIANT>\""))
+
+    // In singleVariantColumn mode, from_csv normally treats all inputs as valid. The only
+    // exception is when the input exceeds the variant size limit (16MiB).
+    val largeInput = "a" * (16 * 1024 * 1024)
+    checkAnswer(
+      Seq(largeInput).toDF("value").select(
+        from_csv(
+          $"value",
+          StructType.fromDDL("v variant, _corrupt_record string"),
+          Map("singleVariantColumn" -> "v")
+        ).cast("string")),
+      Seq(Row(s"""{null, $largeInput}""")))
   }
 
   test("SPARK-47497: the input of to_csv must be StructType") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
index 1999e6502d5da..95cee3b995996 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
@@ -433,23 +433,35 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval
       val file = new File(dir, "file.json")
       Files.write(file.toPath, "0".getBytes(StandardCharsets.UTF_8))
 
-      // Ensure that we get an error when setting the singleVariantColumn JSON option while also
-      // specifying a schema.
-      checkError(
-        exception = intercept[AnalysisException] {
-          spark.read.format("json").option("singleVariantColumn", "var").schema("var variant")
-        },
-        condition = "INVALID_SINGLE_VARIANT_COLUMN",
-        parameters = Map.empty
-      )
-      checkError(
-        exception = intercept[AnalysisException] {
-          spark.read.format("json").option("singleVariantColumn", "another_name")
-            .schema("var variant").json(file.getAbsolutePath).collect()
-        },
-        condition = "INVALID_SINGLE_VARIANT_COLUMN",
-        parameters = Map.empty
-      )
+      // These are valid.
+      for ((schema, options) <- Seq(
+        ("var variant", Map()),
+        ("var variant, _corrupt_record string", Map()),
+        ("_corrupt_record string, var variant", Map()),
+        ("c string, var variant", Map("columnNameOfCorruptRecord" -> "c"))
+      )) {
+        spark.read.options(Map("singleVariantColumn" -> "var") ++ options)
+          .schema(schema).json(file.getAbsolutePath).collect()
+      }
+
+      // These are invalid.
+      for ((schema, options) <- Seq(
+        ("a variant", Map()),
+        ("var int", Map()),
+        ("var variant, c string", Map()),
+        ("var variant, _corrupt_record int", Map()),
+        ("var variant, _corrupt_record string", Map("columnNameOfCorruptRecord" -> "c")),
+        ("var variant, var string", Map("columnNameOfCorruptRecord" -> "var"))
+      )) {
+        checkError(
+          exception = intercept[AnalysisException] {
+            spark.read.options(Map("singleVariantColumn" -> "var") ++ options)
+              .schema(schema).json(file.getAbsolutePath).collect()
+          },
+          condition = "INVALID_SINGLE_VARIANT_COLUMN",
+          parameters = Map("schema" -> s"\"${StructType.fromDDL(schema).sql}\"")
+        )
+      }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 6134603642ddb..5c5efdbf64070 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -3556,6 +3556,23 @@ abstract class CSVSuite
         """{"field 1":"missing"}""")
     }
 
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+      val options = Map("header" -> "true", "singleVariantColumn" -> "v")
+      checkAnswer(
+        spark.read.options(options)
+          .schema("v variant, _corrupt_record string")
+          .csv(path.getCanonicalPath)
+          .selectExpr("cast(v as string)", "_corrupt_record"),
+        // When `header` is true, an inconsistent length between the header and a content line is
+        // treated as a corrupt record.
+        Seq(
+          Row("""{"field 1":100,"field2":1.1}""", null),
+          Row("""{"field 1":"2000-01-01","field2":"2000-01-01 01:02:03+00:00"}""", null),
+          Row("""{"field 1":null,"field2":true}""", null),
+          Row("""{"field 1":"1e9","field2":"hello"}""", "1e9,hello,extra"),
+          Row("""{"field 1":"missing"}""", "missing")))
+    }
+
     checkError(
       exception = intercept[SparkException] {
         val options = Map("singleVariantColumn" -> "v", "header" -> "true", "mode" -> "failfast")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 07fb2dedaa0a0..52ffdf6c6c0b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -4007,6 +4007,7 @@ abstract class JsonSuite
       "true",
       """{"a": [], "b": null}""",
       """{"a": 1}""",
+      "bad json",
       "[1, 2, 3]"
     ).mkString("\n").getBytes(StandardCharsets.UTF_8)
 
@@ -4018,13 +4019,30 @@
       spark.read.format("json").option("singleVariantColumn", "var")
         .load(file.getAbsolutePath)
         .selectExpr("to_json(var)"),
-      Seq(Row("true"), Row("""{"a":[],"b":null}"""), Row("""{"a":1}"""), Row("[1,2,3]"))
+      Seq(Row("true"), Row("""{"a":[],"b":null}"""), Row("""{"a":1}"""), Row(null),
+        Row("[1,2,3]"))
+    )
+
+    checkAnswer(
+      spark.read.format("json").option("singleVariantColumn", "var")
+        .schema("var variant, _corrupt_record string")
+        .load(file.getAbsolutePath)
+        .selectExpr("to_json(var)", "_corrupt_record"),
+      Seq(Row("true", null), Row("""{"a":[],"b":null}""", null), Row("""{"a":1}""", null),
+        Row(null, "bad json"), Row("[1,2,3]", null))
     )
 
     checkAnswer(
       spark.read.format("json").schema("a variant, b variant")
         .load(file.getAbsolutePath).selectExpr("to_json(a)", "to_json(b)"),
-      Seq(Row(null, null), Row("[]", "null"), Row("1", null), Row(null, null))
+      Seq(Row(null, null), Row("[]", "null"), Row("1", null), Row(null, null), Row(null, null))
+    )
+
+    checkAnswer(
+      spark.read.format("json").schema("a variant, b variant, _corrupt_record string")
+        .load(file.getAbsolutePath).selectExpr("to_json(a)", "to_json(b)", "_corrupt_record"),
+      Seq(Row(null, null, "true"), Row("[]", "null", null), Row("1", null, null),
+        Row(null, null, "bad json"), Row(null, null, "[1, 2, 3]"))
     )
 
     withTempDir { streamDir =>
@@ -4038,7 +4056,8 @@
       stream.processAllAvailable()
       checkAnswer(
         spark.read.format("parquet").load(streamDir.getCanonicalPath + "/output"),
-        Seq(Row("true"), Row("""{"a":[],"b":null}"""), Row("""{"a":1}"""), Row("[1,2,3]"))
+        Seq(Row("true"), Row("""{"a":[],"b":null}"""), Row("""{"a":1}"""), Row(null),
+          Row("[1,2,3]"))
       )
     }
   }
@@ -4051,10 +4070,32 @@
       spark.read.format("json").option("singleVariantColumn", "var")
         .load(dir.getAbsolutePath).selectExpr("a", "b", "to_json(var)"),
       Seq(Row(1, 2, "true"), Row(1, 2, """{"a":[],"b":null}"""), Row(1, 2, """{"a":1}"""),
-        Row(1, 2, "[1,2,3]"))
+        Row(1, 2, null), Row(1, 2, "[1,2,3]"))
       )
     }
   }
+
+  test("from_json with variant") {
+    val df = Seq(
+      "true",
+      """{"a": [], "b": null}""",
+      """{"a": 1}""",
+      "bad json",
+      "[1, 2, 3]"
+    ).toDF("value")
+    checkAnswer(
+      df.selectExpr("cast(from_json(value, 'var variant', " +
+        "map('singleVariantColumn', 'var')) as string)"),
+      Seq(Row("{true}"), Row("""{{"a":[],"b":null}}"""), Row("""{{"a":1}}"""), Row("{null}"),
+        Row("{[1,2,3]}"))
+    )
+    checkAnswer(
+      df.selectExpr("cast(from_json(value, 'var variant, _corrupt_record string', " +
+        "map('singleVariantColumn', 'var')) as string)"),
+      Seq(Row("{true, null}"), Row("""{{"a":[],"b":null}, null}"""), Row("""{{"a":1}, null}"""),
+        Row("{null, bad json}"), Row("{[1,2,3], null}"))
+    )
+  }
 }
 
 class JsonV1Suite extends JsonSuite {
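For reference, a minimal end-to-end sketch of the behavior these tests exercise, assuming a build with this patch applied; `spark`, the input path, and the column names are placeholders:

// Single variant column plus a corrupt-record column: rows that fail to parse get a null
// variant and keep their raw text in the corrupt column (PERMISSIVE mode).
val parsed = spark.read
  .option("singleVariantColumn", "var")
  .schema("var variant, _corrupt_record string")
  .json("/tmp/events.json")
parsed.selectExpr("to_json(var)", "_corrupt_record").show(false)

// The corrupt column may also use a custom name via columnNameOfCorruptRecord.
spark.read
  .option("singleVariantColumn", "var")
  .option("columnNameOfCorruptRecord", "bad")
  .schema("bad string, var variant")
  .json("/tmp/events.json")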