Skip to content

Latest commit

 

History

History
296 lines (192 loc) · 14.3 KB

spark-sql-streaming-StreamingQueryManager.adoc

File metadata and controls

296 lines (192 loc) · 14.3 KB

StreamingQueryManager — Streaming Query Management

StreamingQueryManager is the management interface for streaming queries in a single SparkSession.

StreamingQueryManager manages streaming queries and allows for:

StreamingQueryManager is available using SparkSession and streams property.

val spark: SparkSession = ...
val queries = spark.streams

StreamingQueryManager is created when SessionState is created.

StreamingQueryManager
Figure 1. StreamingQueryManager
Tip
Refer to the Mastering Apache Spark 2 gitbook to learn about SessionState.

StreamingQueryManager is used (internally) to create a StreamingQuery (with its StreamExecution).

StreamingQueryManager createQuery
Figure 2. StreamingQueryManager Creates StreamingQuery (and StreamExecution)
Table 1. StreamingQueryManager’s Internal Registries and Counters (in alphabetical order)
Name Description

activeQueries

Registry of StreamingQueryWrapper per id

Used when StreamingQueryManager is requested for active streaming queries, get a streaming query by id, starts a streaming query and is notified that a streaming query has terminated.

lastTerminatedQuery

StreamingQuery that has recently been terminated, i.e. stopped or due to an exception.

null when no streaming query has terminated yet or resetTerminated.

listenerBus

Used to:

stateStoreCoordinator

Getting All Active Streaming Queries — active Method

active: Array[StreamingQuery]

Getting Active Continuous Query By Name — get Method

get(name: String): StreamingQuery

get method returns a StreamingQuery by name.

It may throw an IllegalArgumentException when no StreamingQuery exists for the name.

java.lang.IllegalArgumentException: There is no active query with name hello
  at org.apache.spark.sql.StreamingQueryManager$$anonfun$get$1.apply(StreamingQueryManager.scala:59)
  at org.apache.spark.sql.StreamingQueryManager$$anonfun$get$1.apply(StreamingQueryManager.scala:59)
  at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
  at scala.collection.AbstractMap.getOrElse(Map.scala:59)
  at org.apache.spark.sql.StreamingQueryManager.get(StreamingQueryManager.scala:58)
  ... 49 elided

Registering StreamingQueryListener — addListener Method

addListener(listener: StreamingQueryListener): Unit

addListener requests StreamingQueryListenerBus to add the input listener.

De-Registering StreamingQueryListener — removeListener Method

removeListener(listener: StreamingQueryListener): Unit

removeListener requests StreamingQueryListenerBus to remove the input listener.

Waiting for Any Streaming Query Termination — awaitAnyTermination Method

awaitAnyTermination(): Unit
awaitAnyTermination(timeoutMs: Long): Boolean

awaitAnyTermination acquires a lock on awaitTerminationLock and waits until any streaming query has finished (i.e. lastTerminatedQuery is available) or timeoutMs has expired.

awaitAnyTermination re-throws the StreamingQueryException from lastTerminatedQuery if it reported one.

resetTerminated Method

resetTerminated(): Unit

resetTerminated forgets about the past-terminated query (so that awaitAnyTermination can be used again to wait for a new streaming query termination).

Internally, resetTerminated acquires a lock on awaitTerminationLock and simply resets lastTerminatedQuery (i.e. sets it to null).

Creating StreamingQueryManager Instance

StreamingQueryManager takes the following when created:

  • SparkSession

StreamingQueryManager initializes the internal registries and counters.

Creating StreamingQueryWrapper (Serializable StreamingQuery) with StreamExecution — createQuery Internal Method

createQuery(
  userSpecifiedName: Option[String],
  userSpecifiedCheckpointLocation: Option[String],
  df: DataFrame,
  sink: Sink,
  outputMode: OutputMode,
  useTempCheckpointLocation: Boolean,
  recoverFromCheckpointLocation: Boolean,
  trigger: Trigger,
  triggerClock: Clock): StreamingQueryWrapper

createQuery creates a StreamingQueryWrapper (for a StreamExecution per the input user-defined properties).

Internally, createQuery first finds the name of the checkpoint directory of a query (aka checkpoint location) in the following order:

  1. Exactly the input userSpecifiedCheckpointLocation if defined

  2. spark.sql.streaming.checkpointLocation Spark property if defined for the parent directory with a subdirectory per the optional userSpecifiedName (or a randomly-generated UUID)

  3. (only when useTempCheckpointLocation is enabled) A temporary directory (as specified by java.io.tmpdir JVM property) with a subdirectory with temporary prefix.

Note
userSpecifiedCheckpointLocation can be any path that is acceptable by Hadoop’s Path.

If the directory name for the checkpoint location could not be found, createQuery reports a AnalysisException.

checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...)

createQuery reports a AnalysisException when the input recoverFromCheckpointLocation flag is turned off but there is offsets directory in the checkpoint location.

createQuery makes sure that the logical plan of the structured query is analyzed (i.e. no logical errors have been found).

(only when spark.sql.adaptive.enabled Spark property is turned on) createQuery prints out a WARN message to the logs:

WARN spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.

In the end, createQuery creates a StreamingQueryWrapper with a StreamExecution.

Note

recoverFromCheckpointLocation flag corresponds to recoverFromCheckpointLocation flag that StreamingQueryManager uses to start a streaming query and which is enabled by default (and is in fact the only place where createQuery is used).

  • memory sink has the flag enabled for Complete output mode only

  • foreach sink has the flag always enabled

  • console sink has the flag always disabled

  • all other sinks have the flag always enabled

Note
userSpecifiedName corresponds to queryName option (that can be defined using DataStreamWriter's queryName method) while userSpecifiedCheckpointLocation is checkpointLocation option.
Note
createQuery is used exclusively when StreamingQueryManager starts executing a streaming query.

Starting Streaming Query — startQuery Internal Method

startQuery(
  userSpecifiedName: Option[String],
  userSpecifiedCheckpointLocation: Option[String],
  df: DataFrame,
  sink: Sink,
  outputMode: OutputMode,
  useTempCheckpointLocation: Boolean = false,
  recoverFromCheckpointLocation: Boolean = true,
  trigger: Trigger = ProcessingTime(0),
  triggerClock: Clock = new SystemClock()): StreamingQuery

startQuery starts a streaming query.

Note
trigger defaults to 0 milliseconds (as ProcessingTime(0)).

Internally, startQuery first creates a streaming query, registers it in activeQueries internal registry and starts the query.

In the end, startQuery returns the query (as part of the fluent API so you can chain operators) or reports the exception that was reported when starting the query.

startQuery reports a IllegalArgumentException when there is another query registered under name. startQuery looks it up in activeQueries internal registry.

Cannot start query with name [name] as a query with that name is already active

startQuery reports a IllegalStateException when a query is started again from checkpoint. startQuery looks it up in activeQueries internal registry.

Cannot start query with id [id] as another query with same id is already active.
Perhaps you are attempting to restart a query from checkpoint that is already active.
Note
startQuery is used exclusively when DataStreamWriter is started.

Posting StreamingQueryListener Event to StreamingQueryListenerBus — postListenerEvent Internal Method

postListenerEvent(event: StreamingQueryListener.Event): Unit

postListenerEvent simply posts the input event to StreamingQueryListenerBus.

StreamingQueryManager postListenerEvent
Figure 3. StreamingQueryManager Propagates StreamingQueryListener Events
Note
postListenerEvent is used exclusively when StreamExecution posts a streaming event.

Marking Streaming Query as Terminated (and Deactivating Query in StateStoreCoordinator) — notifyQueryTermination Internal Method

notifyQueryTermination(terminatedQuery: StreamingQuery): Unit

notifyQueryTermination removes the terminatedQuery from activeQueries internal registry (by the query id).

notifyQueryTermination records the terminatedQuery in lastTerminatedQuery internal registry (when no earlier streaming query was recorded or the terminatedQuery terminated due to an exception).

notifyQueryTermination notifies others that are blocked on awaitTerminationLock.

In the end, notifyQueryTermination requests StateStoreCoordinator to deactivate all active runs of the streaming query.

StreamingQueryManager notifyQueryTermination
Figure 4. StreamingQueryManager’s Marking Streaming Query as Terminated
Note
notifyQueryTermination is used exclusively when StreamExecution has finished (running streaming batches) (possibly due to an exception).