
Commit 1e63dcc

gaborgsomogyi authored and HyukjinKwon committed
[SPARK-33102][SQL] Use stringToSeq on SQL list typed parameters
### What changes were proposed in this pull request?

While implementing the JDBC provider disable functionality, it was pointed out [here](apache#29964 (comment)) that `Utils.stringToSeq` must be used when handling String list typed SQL parameters. In this PR I've fixed the problematic parameters.

### Why are the changes needed?

`Utils.stringToSeq` must be used when handling String list typed SQL parameters.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes apache#29989 from gaborgsomogyi/SPARK-33102.

Authored-by: Gabor Somogyi <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
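For readers unfamiliar with the helper, here is a minimal sketch of the behavior at stake (`stringToSeqSketch` and the `org.example.*` names are ours, not Spark's): `Utils.stringToSeq` splits on commas, trims each entry, and drops empties, whereas a bare `split(",")` keeps surrounding whitespace, so a class-name lookup against the resulting list can silently fail.

    object StringToSeqDemo {
      // Mirrors what org.apache.spark.util.Utils.stringToSeq does:
      // split on commas, trim each entry, drop empty entries.
      def stringToSeqSketch(str: String): Seq[String] =
        str.split(",").map(_.trim).filter(_.nonEmpty).toSeq

      def main(args: Array[String]): Unit = {
        val conf = "org.example.SourceA, org.example.SourceB"
        // Naive split keeps the leading space: " org.example.SourceB".
        val naive = conf.split(",").toSeq
        println(naive.contains("org.example.SourceB"))               // false
        println(stringToSeqSketch(conf).contains("org.example.SourceB")) // true
      }
    }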
1 parent 018811f commit 1e63dcc

2 files changed: +5 −3 lines changed


Diff for: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

+2 −2

@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relat
 import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
-import org.apache.spark.util.Clock
+import org.apache.spark.util.{Clock, Utils}

 class MicroBatchExecution(
     sparkSession: SparkSession,
@@ -76,7 +76,7 @@ class MicroBatchExecution(
     // transformation is responsible for replacing attributes with their final values.

     val disabledSources =
-      sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",")
+      Utils.stringToSeq(sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders)

     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {

Diff for: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala

+3 −1

@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils

 /**
  * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -366,7 +367,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       startQuery(sink, extraOptions)
     } else {
       val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
-      val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
+      val disabledSources =
+        Utils.stringToSeq(df.sparkSession.sqlContext.conf.disabledV2StreamingWriters)
       val useV1Source = disabledSources.contains(cls.getCanonicalName) ||
         // file source v2 does not support streaming yet.
         classOf[FileDataSourceV2].isAssignableFrom(cls)
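A hedged usage sketch of why the change matters in practice: the `org.example.*` provider names below are hypothetical, and `spark.sql.streaming.disabledV2Writers` is, to the best of our knowledge, the config key behind `conf.disabledV2StreamingWriters`. With `stringToSeq` in place, a space after the comma no longer prevents the `contains(cls.getCanonicalName)` check from matching.

    import org.apache.spark.sql.SparkSession

    object DisabledV2WritersDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("disabled-v2-writers-demo")
          .getOrCreate()

        // The space after the comma is now trimmed before DataStreamWriter
        // compares entries against the provider's canonical class name.
        spark.conf.set(
          "spark.sql.streaming.disabledV2Writers",
          "org.example.MyWriteProvider, org.example.OtherWriteProvider")

        spark.stop()
      }
    }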
