[SPARK-51296][SQL] Support collecting corrupt data in singleVariantColumn mode. #50517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Status: Closed
@@ -3330,7 +3330,7 @@
},
"INVALID_SINGLE_VARIANT_COLUMN" : {
"message" : [
"The `singleVariantColumn` option cannot be used if there is also a user specified schema."
"User specified schema <schema> is invalid when the `singleVariantColumn` option is enabled. The schema must either be a variant field, or a variant field plus a corrupt column field."
],
"sqlState" : "42613"
},
@@ -17,6 +17,11 @@

package org.apache.spark.sql.catalyst

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
* Interface defines the following methods for a data source:
* - register a new option name
@@ -71,4 +76,39 @@ object DataSourceOptions {
// as a single VARIANT type column in the table with the given column name.
// E.g. spark.read.format("<data-source-format>").option("singleVariantColumn", "colName")
val SINGLE_VARIANT_COLUMN = "singleVariantColumn"
// The common option name for all data sources that support the corrupt record column. In case of
// a parsing error, the record is stored as a string in the column with the given name.
// In principle, the behavior of this option is not affected by the parsing mode
// (PERMISSIVE/FAILFAST/DROPMALFORMED). However, the corrupt record is only visible to the user
// in PERMISSIVE mode, because queries fail in FAILFAST mode and the row containing the corrupt
// record is dropped in DROPMALFORMED mode.
val COLUMN_NAME_OF_CORRUPT_RECORD = "columnNameOfCorruptRecord"

// When `singleVariantColumn` is enabled and there is a user-specified schema, the schema must
// either be a variant field, or a variant field plus a corrupt column field.
def validateSingleVariantColumn(
options: CaseInsensitiveMap[String],
userSpecifiedSchema: Option[StructType]): Unit = {
(options.get(SINGLE_VARIANT_COLUMN), userSpecifiedSchema) match {
case (Some(variantColumnName), Some(schema)) =>
var valid = schema.fields.exists { f =>
f.dataType.isInstanceOf[VariantType] && f.name == variantColumnName && f.nullable
}
schema.length match {
case 1 =>
case 2 =>
val corruptRecordColumnName = options.getOrElse(
COLUMN_NAME_OF_CORRUPT_RECORD, SQLConf.get.columnNameOfCorruptRecord)
valid = valid && corruptRecordColumnName != variantColumnName
valid = valid && schema.fields.exists { f =>
f.dataType.isInstanceOf[StringType] && f.name == corruptRecordColumnName && f.nullable
}
case _ => valid = false
}
if (!valid) {
throw QueryCompilationErrors.invalidSingleVariantColumn(schema)
}
case _ =>
}
}
}
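
As a quick illustration of the schema shapes this validator accepts, here is a minimal sketch; the SparkSession setup, file path, and column names are illustrative assumptions rather than part of this change:

```scala
// Sketch only: the three cases validateSingleVariantColumn distinguishes.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val path = "/tmp/events.json" // hypothetical input file, assumed to exist

// Valid: a single nullable variant field named after the option value.
spark.read.option("singleVariantColumn", "v")
  .schema("v variant")
  .json(path)

// Also valid: the variant field plus a nullable string field whose name matches
// columnNameOfCorruptRecord (falls back to spark.sql.columnNameOfCorruptRecord).
spark.read.option("singleVariantColumn", "v")
  .option("columnNameOfCorruptRecord", "bad")
  .schema("v variant, bad string")
  .json(path)

// Invalid: the second field is not the corrupt record column, so analysis fails
// with INVALID_SINGLE_VARIANT_COLUMN.
spark.read.option("singleVariantColumn", "v")
  .schema("v variant, extra int")
  .json(path)
```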
@@ -396,7 +396,7 @@ object CSVOptions extends DataSourceOptions {
val EMPTY_VALUE = newOption("emptyValue")
val LINE_SEP = newOption("lineSep")
val INPUT_BUFFER_SIZE = newOption("inputBufferSize")
val COLUMN_NAME_OF_CORRUPT_RECORD = newOption("columnNameOfCorruptRecord")
val COLUMN_NAME_OF_CORRUPT_RECORD = newOption(DataSourceOptions.COLUMN_NAME_OF_CORRUPT_RECORD)
Review comment (Contributor): Add from_csv and csv file reader test with corrupt column and variant

val NULL_VALUE = newOption("nullValue")
val NAN_VALUE = newOption("nanValue")
val POSITIVE_INF = newOption("positiveInf")
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions.csv
import com.univocity.parsers.csv.CsvParser

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.{DataSourceOptions, InternalRow}
import org.apache.spark.sql.catalyst.csv.{CSVInferSchema, CSVOptions, UnivocityParser}
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.{FailFastMode, FailureSafeParser, PermissiveMode}
@@ -66,11 +66,7 @@ case class CsvToStructsEvaluator(
if (mode != PermissiveMode && mode != FailFastMode) {
throw QueryCompilationErrors.parseModeUnsupportedError("from_csv", mode)
}
if (parsedOptions.singleVariantColumn.isDefined) {
if (nullableSchema.length != 1 || nullableSchema.head.dataType != VariantType) {
throw QueryCompilationErrors.invalidSingleVariantColumn()
}
}
DataSourceOptions.validateSingleVariantColumn(parsedOptions.parameters, Some(nullableSchema))
ExprUtils.verifyColumnNameOfCorruptRecord(
nullableSchema,
parsedOptions.columnNameOfCorruptRecord)
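
A rough usage sketch of what the relaxed check permits for from_csv follows; it assumes an active SparkSession named `spark`, and the column names and input rows are made up for illustration:

```scala
// Sketch only: from_csv in singleVariantColumn mode with a corrupt-record column.
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types.StructType
import spark.implicits._

val schema = StructType.fromDDL("v variant, _corrupt_record string")
val csvOpts = Map("singleVariantColumn" -> "v")

Seq("100,1.1", "hello,true").toDF("value")
  .select(from_csv($"value", schema, csvOpts).alias("parsed"))
  .selectExpr("cast(parsed.v as string)", "parsed._corrupt_record")
  .show(truncate = false)
```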
@@ -91,7 +91,7 @@ class JSONOptions(
val parseMode: ParseMode =
parameters.get(MODE).map(ParseMode.fromString).getOrElse(PermissiveMode)
val columnNameOfCorruptRecord =
parameters.getOrElse(COLUMN_NAME_OF_CORRUPTED_RECORD, defaultColumnNameOfCorruptRecord)
parameters.getOrElse(COLUMN_NAME_OF_CORRUPT_RECORD, defaultColumnNameOfCorruptRecord)

// Whether to ignore column of all null values or empty array/struct during schema inference
val dropFieldIfAllNull = parameters.get(DROP_FIELD_IF_ALL_NULL).map(_.toBoolean).getOrElse(false)
@@ -284,7 +284,7 @@ object JSONOptions extends DataSourceOptions {
val LINE_SEP = newOption("lineSep")
val PRETTY = newOption("pretty")
val INFER_TIMESTAMP = newOption("inferTimestamp")
val COLUMN_NAME_OF_CORRUPTED_RECORD = newOption("columnNameOfCorruptRecord")
val COLUMN_NAME_OF_CORRUPT_RECORD = newOption(DataSourceOptions.COLUMN_NAME_OF_CORRUPT_RECORD)
val TIME_ZONE = newOption("timeZone")
val WRITE_NON_ASCII_CHARACTER_AS_CODEPOINT = newOption("writeNonAsciiCharacterAsCodePoint")
val SINGLE_VARIANT_COLUMN = newOption(DataSourceOptions.SINGLE_VARIANT_COLUMN)
@@ -200,7 +200,7 @@ object XmlOptions extends DataSourceOptions {
val COMPRESSION = newOption("compression")
val MULTI_LINE = newOption("multiLine")
val SAMPLING_RATIO = newOption("samplingRatio")
val COLUMN_NAME_OF_CORRUPT_RECORD = newOption("columnNameOfCorruptRecord")
val COLUMN_NAME_OF_CORRUPT_RECORD = newOption(DataSourceOptions.COLUMN_NAME_OF_CORRUPT_RECORD)
val DATE_FORMAT = newOption("dateFormat")
val TIMESTAMP_FORMAT = newOption("timestampFormat")
val TIMESTAMP_NTZ_FORMAT = newOption("timestampNTZFormat")
@@ -3355,10 +3355,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
"config" -> SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key))
}

def invalidSingleVariantColumn(): Throwable = {
def invalidSingleVariantColumn(schema: DataType): Throwable = {
new AnalysisException(
errorClass = "INVALID_SINGLE_VARIANT_COLUMN",
messageParameters = Map.empty)
messageParameters = Map("schema" -> toSQLType(schema)))
}

def writeWithSaveModeUnsupportedBySourceError(source: String, createMode: String): Throwable = {
@@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.plans.logical.UnresolvedDataSource
import org.apache.spark.sql.catalyst.util.FailureSafeParser
import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, XmlOptions}
import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.csv._
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation}
import org.apache.spark.sql.execution.datasources.json.JsonUtils.checkJsonSchema
@@ -336,12 +335,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession)
override def textFile(paths: String*): Dataset[String] = super.textFile(paths: _*)

/** @inheritdoc */
override protected def validateSingleVariantColumn(): Unit = {
if (extraOptions.get(DataSourceOptions.SINGLE_VARIANT_COLUMN).isDefined &&
userSpecifiedSchema.isDefined) {
throw QueryCompilationErrors.invalidSingleVariantColumn()
}
}
override protected def validateSingleVariantColumn(): Unit =
DataSourceOptions.validateSingleVariantColumn(extraOptions, userSpecifiedSchema)

override protected def validateJsonSchema(): Unit =
userSpecifiedSchema.foreach(checkJsonSchema)
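
To show end to end what the delegated validation allows at the reader level, here is a small sketch; the file contents, path, and expected output are illustrative assumptions:

```scala
// Sketch only: malformed JSON rows land in the corrupt-record column, and the
// variant column is null for those rows. Assumes an active SparkSession `spark`.
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

val jsonPath = "/tmp/variant-demo.json" // hypothetical location
Files.write(Paths.get(jsonPath),
  "{\"a\": 1}\nnot valid json\n".getBytes(StandardCharsets.UTF_8))

spark.read
  .option("singleVariantColumn", "var")
  .schema("var variant, _corrupt_record string")
  .json(jsonPath)
  .selectExpr("to_json(var)", "_corrupt_record")
  .show(truncate = false)
// Expected shape: ({"a":1}, null) and (null, "not valid json")
```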
@@ -789,14 +789,27 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
from_csv(
$"value",
StructType.fromDDL("v variant"),
Map("header" -> header, "singleVariantColumn" -> "true")
Map("header" -> header, "singleVariantColumn" -> "v")
).cast("string")),
Seq(
Row("""{{"_c0":100,"_c1":1.1}}"""),
Row("""{{"_c0":"2000-01-01","_c1":"2000-01-01 01:02:03+00:00"}}"""),
Row("""{{"_c0":null,"_c1":true}}"""),
Row("""{{"_c0":"1e9","_c1":"hello","_c2":"extra"}}"""),
Row("""{{"_c0":"missing"}}""")))
checkAnswer(
df.select(
from_csv(
$"value",
StructType.fromDDL("v variant, _corrupt_record string"),
Map("header" -> header, "singleVariantColumn" -> "v")
).cast("string")),
Seq(
Row("""{{"_c0":100,"_c1":1.1}, null}"""),
Row("""{{"_c0":"2000-01-01","_c1":"2000-01-01 01:02:03+00:00"}, null}"""),
Row("""{{"_c0":null,"_c1":true}, null}"""),
Row("""{{"_c0":"1e9","_c1":"hello","_c2":"extra"}, null}"""),
Row("""{{"_c0":"missing"}, null}""")))
}
}
checkError(
@@ -807,7 +820,20 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
StructType.fromDDL("a variant, b variant"),
Map("singleVariantColumn" -> "true"))).collect()
},
condition = "INVALID_SINGLE_VARIANT_COLUMN")
condition = "INVALID_SINGLE_VARIANT_COLUMN",
parameters = Map("schema" -> "\"STRUCT<a: VARIANT, b: VARIANT>\""))

// In singleVariantColumn mode, from_csv normally treats all inputs as valid. The only exception
// is when the input exceeds the variant size limit (16MiB).
val largeInput = "a" * (16 * 1024 * 1024)
checkAnswer(
Seq(largeInput).toDF("value").select(
from_csv(
$"value",
StructType.fromDDL("v variant, _corrupt_record string"),
Map("singleVariantColumn" -> "v")
).cast("string")),
Seq(Row(s"""{null, $largeInput}""")))
}

test("SPARK-47497: the input of to_csv must be StructType") {
46 changes: 29 additions & 17 deletions sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
@@ -433,23 +433,35 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval
val file = new File(dir, "file.json")
Files.write(file.toPath, "0".getBytes(StandardCharsets.UTF_8))

// Ensure that we get an error when setting the singleVariantColumn JSON option while also
// specifying a schema.
checkError(
exception = intercept[AnalysisException] {
spark.read.format("json").option("singleVariantColumn", "var").schema("var variant")
},
condition = "INVALID_SINGLE_VARIANT_COLUMN",
parameters = Map.empty
)
checkError(
exception = intercept[AnalysisException] {
spark.read.format("json").option("singleVariantColumn", "another_name")
.schema("var variant").json(file.getAbsolutePath).collect()
},
condition = "INVALID_SINGLE_VARIANT_COLUMN",
parameters = Map.empty
)
// These are valid.
for ((schema, options) <- Seq(
("var variant", Map()),
("var variant, _corrupt_record string", Map()),
("_corrupt_record string, var variant", Map()),
("c string, var variant", Map("columnNameOfCorruptRecord" -> "c"))
)) {
spark.read.options(Map("singleVariantColumn" -> "var") ++ options)
.schema(schema).json(file.getAbsolutePath).collect()
}

// These are invalid.
for ((schema, options) <- Seq(
("a variant", Map()),
("var int", Map()),
("var variant, c string", Map()),
("var variant, _corrupt_record int", Map()),
("var variant, _corrupt_record string", Map("columnNameOfCorruptRecord" -> "c")),
("var variant, var string", Map("columnNameOfCorruptRecord" -> "var"))
)) {
checkError(
exception = intercept[AnalysisException] {
spark.read.options(Map("singleVariantColumn" -> "var") ++ options)
.schema(schema).json(file.getAbsolutePath).collect()
},
condition = "INVALID_SINGLE_VARIANT_COLUMN",
parameters = Map("schema" -> s"\"${StructType.fromDDL(schema).sql}\"")
)
}
}
}

@@ -3556,6 +3556,23 @@ abstract class CSVSuite
"""{"field 1":"missing"}""")
}

withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
val options = Map("header" -> "true", "singleVariantColumn" -> "v")
checkAnswer(
spark.read.options(options)
.schema("v variant, _corrupt_record string")
.csv(path.getCanonicalPath)
.selectExpr("cast(v as string)", "_corrupt_record"),
// When `header` is true, a content line whose length does not match the header is
// treated as a corrupt record.
Seq(
Row("""{"field 1":100,"field2":1.1}""", null),
Row("""{"field 1":"2000-01-01","field2":"2000-01-01 01:02:03+00:00"}""", null),
Row("""{"field 1":null,"field2":true}""", null),
Row("""{"field 1":"1e9","field2":"hello"}""", "1e9,hello,extra"),
Row("""{"field 1":"missing"}""", "missing")))
}

checkError(
exception = intercept[SparkException] {
val options = Map("singleVariantColumn" -> "v", "header" -> "true", "mode" -> "failfast")
@@ -4007,6 +4007,7 @@ abstract class JsonSuite
"true",
"""{"a": [], "b": null}""",
"""{"a": 1}""",
"bad json",
Review comment (Contributor): Add from_json test

"[1, 2, 3]"
).mkString("\n").getBytes(StandardCharsets.UTF_8)

@@ -4018,13 +4019,30 @@
spark.read.format("json").option("singleVariantColumn", "var")
.load(file.getAbsolutePath)
.selectExpr("to_json(var)"),
Seq(Row("true"), Row("""{"a":[],"b":null}"""), Row("""{"a":1}"""), Row("[1,2,3]"))
Seq(Row("true"), Row("""{"a":[],"b":null}"""), Row("""{"a":1}"""), Row(null),
Row("[1,2,3]"))
)

checkAnswer(
spark.read.format("json").option("singleVariantColumn", "var")
.schema("var variant, _corrupt_record string")
.load(file.getAbsolutePath)
.selectExpr("to_json(var)", "_corrupt_record"),
Seq(Row("true", null), Row("""{"a":[],"b":null}""", null), Row("""{"a":1}""", null),
Row(null, "bad json"), Row("[1,2,3]", null))
)

checkAnswer(
spark.read.format("json").schema("a variant, b variant")
.load(file.getAbsolutePath).selectExpr("to_json(a)", "to_json(b)"),
Seq(Row(null, null), Row("[]", "null"), Row("1", null), Row(null, null))
Seq(Row(null, null), Row("[]", "null"), Row("1", null), Row(null, null), Row(null, null))
)

checkAnswer(
spark.read.format("json").schema("a variant, b variant, _corrupt_record string")
.load(file.getAbsolutePath).selectExpr("to_json(a)", "to_json(b)", "_corrupt_record"),
Seq(Row(null, null, "true"), Row("[]", "null", null), Row("1", null, null),
Row(null, null, "bad json"), Row(null, null, "[1, 2, 3]"))
)

withTempDir { streamDir =>
Expand All @@ -4038,7 +4056,8 @@ abstract class JsonSuite
stream.processAllAvailable()
checkAnswer(
spark.read.format("parquet").load(streamDir.getCanonicalPath + "/output"),
Seq(Row("true"), Row("""{"a":[],"b":null}"""), Row("""{"a":1}"""), Row("[1,2,3]"))
Seq(Row("true"), Row("""{"a":[],"b":null}"""), Row("""{"a":1}"""), Row(null),
Row("[1,2,3]"))
)
}
}
Expand All @@ -4051,10 +4070,32 @@ abstract class JsonSuite
spark.read.format("json").option("singleVariantColumn", "var")
.load(dir.getAbsolutePath).selectExpr("a", "b", "to_json(var)"),
Seq(Row(1, 2, "true"), Row(1, 2, """{"a":[],"b":null}"""), Row(1, 2, """{"a":1}"""),
Row(1, 2, "[1,2,3]"))
Row(1, 2, null), Row(1, 2, "[1,2,3]"))
)
}
}

test("from_json with variant") {
val df = Seq(
"true",
"""{"a": [], "b": null}""",
"""{"a": 1}""",
"bad json",
"[1, 2, 3]"
).toDF("value")
checkAnswer(
df.selectExpr("cast(from_json(value, 'var variant', " +
"map('singleVariantColumn', 'var')) as string)"),
Seq(Row("{true}"), Row("""{{"a":[],"b":null}}"""), Row("""{{"a":1}}"""), Row("{null}"),
Row("{[1,2,3]}"))
)
checkAnswer(
df.selectExpr("cast(from_json(value, 'var variant, _corrupt_record string', " +
"map('singleVariantColumn', 'var')) as string)"),
Seq(Row("{true, null}"), Row("""{{"a":[],"b":null}, null}"""), Row("""{{"a":1}, null}"""),
Row("{null, bad json}"), Row("{[1,2,3], null}"))
)
}
}

class JsonV1Suite extends JsonSuite {