Skip to content

Commit 7a8de69

Browse files
committed
[SPARK-53656][SS] Fix ambiguity when both SparkSession and SQLContext are defined as implicit variables
1 parent bc92133 commit 7a8de69

File tree

2 files changed

+45
-2
lines changed

2 files changed

+45
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/memory.scala

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import javax.annotation.concurrent.GuardedBy
2424
import scala.collection.mutable.ListBuffer
2525

2626
import org.apache.spark.internal.Logging
27-
import org.apache.spark.sql.{Encoder, SparkSession}
27+
import org.apache.spark.sql.{Encoder, SparkSession, SQLContext}
2828
import org.apache.spark.sql.catalyst.InternalRow
2929
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
3030
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -43,7 +43,7 @@ import org.apache.spark.sql.internal.connector.SimpleTableProvider
4343
import org.apache.spark.sql.types.StructType
4444
import org.apache.spark.sql.util.CaseInsensitiveStringMap
4545

46-
object MemoryStream {
46+
object MemoryStream extends LowPriorityMemoryStreamImplicits {
4747
protected val currentBlockId = new AtomicInteger(0)
4848
protected val memoryStreamId = new AtomicInteger(0)
4949

@@ -54,6 +54,27 @@ object MemoryStream {
5454
new MemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession, Some(numPartitions))
5555
}
5656

57+
/**
 * Lower-priority `apply` overloads for the [[MemoryStream]] companion object.
 *
 * The overloads declared here take an implicit [[SQLContext]], while the ones declared
 * directly on the companion object take an implicit [[SparkSession]]. Keeping the SQLContext
 * variants in this parent trait prevents ambiguity when both a SparkSession and a SQLContext
 * are in scope: the SparkSession-based overloads on the companion object take precedence.
 */
trait LowPriorityMemoryStreamImplicits { this: MemoryStream.type =>

  /**
   * Creates a [[MemoryStream]] backed by the SparkSession of the implicit SQLContext.
   *
   * @tparam A element type of the stream; an [[Encoder]] for it must be available
   * @param sqlContext context whose `sparkSession` is handed to the new stream
   * @return a new stream carrying the next id taken from `memoryStreamId`
   */
  @deprecated("Use MemoryStream.apply with an implicit SparkSession instead of SQLContext", "4.1.0")
  def apply[A: Encoder]()(implicit sqlContext: SQLContext): MemoryStream[A] = {
    // Take the id first so the evaluation order matches a direct constructor call.
    val streamId = memoryStreamId.getAndIncrement()
    new MemoryStream[A](streamId, sqlContext.sparkSession)
  }

  /**
   * Creates a [[MemoryStream]] with an explicit partition count, backed by the SparkSession
   * of the implicit SQLContext.
   *
   * @tparam A element type of the stream; an [[Encoder]] for it must be available
   * @param numPartitions partition count, passed to the stream wrapped in `Some`
   * @param sqlContext context whose `sparkSession` is handed to the new stream
   * @return a new stream carrying the next id taken from `memoryStreamId`
   */
  @deprecated("Use MemoryStream.apply with an implicit SparkSession instead of SQLContext", "4.1.0")
  def apply[A: Encoder](numPartitions: Int)(implicit sqlContext: SQLContext): MemoryStream[A] = {
    // Take the id first so the evaluation order matches a direct constructor call.
    val streamId = memoryStreamId.getAndIncrement()
    new MemoryStream[A](streamId, sqlContext.sparkSession, Some(numPartitions))
  }
}
77+
5778
/**
5879
* A base class for memory stream implementations. Supports adding data and resetting.
5980
*/

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,28 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
343343
intsToDF(expected)(schema))
344344
}
345345

346+
test("LowPriorityMemoryStreamImplicits works with implicit sqlContext") {
  // Declare a local implicit SQLContext for the MemoryStream factory calls below.
  // NOTE: the name `sqlContext` is kept on purpose; it shadows the suite-level `sqlContext`
  // that other helpers in this file reference, so renaming it could alter implicit resolution.
  implicit val sqlContext: SQLContext = spark.sqlContext

  // No-argument factory: MemoryStream[A]() with the implicit sqlContext.
  val intStream = MemoryStream[Int]()
  assert(intStream != null)

  // Partition-count factory: MemoryStream[A](numPartitions) with the implicit sqlContext.
  val stringStream = MemoryStream[String](3)
  assert(stringStream != null)

  // Both streams must accept data and expose a DataFrame with a "value" column.
  intStream.addData(1, 2, 3)
  val intFieldNames = intStream.toDF().schema.fieldNames
  assert(intFieldNames.contains("value"))

  stringStream.addData("a", "b", "c")
  val stringFieldNames = stringStream.toDF().schema.fieldNames
  assert(stringFieldNames.contains("value"))
}
367+
346368
private implicit def intsToDF(seq: Seq[Int])(implicit schema: StructType): DataFrame = {
347369
require(schema.fields.length === 1)
348370
sqlContext.createDataset(seq).toDF(schema.fieldNames.head)

0 commit comments

Comments
 (0)