Skip to content

Latest commit

 

History

History
1098 lines (718 loc) · 56.5 KB

spark-sql-streaming-StreamExecution.adoc

File metadata and controls

1098 lines (718 loc) · 56.5 KB

StreamExecution — Execution Environment of Streaming Dataset

StreamExecution is the execution environment of a single continuous query (aka streaming Dataset) that is executed every trigger and in the end adds the results to a sink.

Note
Continuous query, streaming query, continuous Dataset, streaming Dataset are synonyms, and StreamExecution uses analyzed logical plan internally to refer to it.
Note
StreamExecution corresponds to a single streaming query with one or more streaming sources and exactly one streaming sink.

StreamExecution is created exclusively when DataStreamWriter is started.

scala> spark.version
res0: String = 2.3.0-SNAPSHOT

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val q = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.minutes)).
  start
scala> :type q
org.apache.spark.sql.streaming.StreamingQuery

// Pull out StreamExecution off StreamingQueryWrapper
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper}
val se = q.asInstanceOf[StreamingQueryWrapper].streamingQuery
scala> :type se
org.apache.spark.sql.execution.streaming.StreamExecution
StreamExecution creating instance
Figure 1. Creating Instance of StreamExecution
Note
DataStreamWriter describes how the results of executing batches of a streaming query are written to a streaming sink.

StreamExecution starts a thread of execution that runs the streaming query continuously and concurrently (and polls for new records in the streaming data sources to create a batch every trigger).

StreamExecution start
Figure 2. StreamExecution’s Starting Streaming Query (on Execution Thread)

StreamExecution can be in three states:

  • INITIALIZED when the instance was created.

  • ACTIVE when batches are pulled from the sources.

  • TERMINATED when executing streaming batches has been terminated due to an error, all batches were successfully processed or StreamExecution has been stopped.

StreamExecution is a ProgressReporter and reports status of the streaming query (i.e. when it starts, progresses and terminates) by posting StreamingQueryListener events.

StreamExecution tracks streaming data sources in uniqueSources internal registry.

StreamExecution uniqueSources
Figure 3. StreamExecution’s uniqueSources Registry of Streaming Data Sources

StreamExecution collects durationMs for the execution units of streaming batches.

StreamExecution durationMs
Figure 4. StreamExecution’s durationMs
scala> :type q
org.apache.spark.sql.streaming.StreamingQuery

scala> println(q.lastProgress)
{
  "id" : "03fc78fc-fe19-408c-a1ae-812d0e28fcee",
  "runId" : "8c247071-afba-40e5-aad2-0e6f45f22488",
  "name" : null,
  "timestamp" : "2017-08-14T20:30:00.004Z",
  "batchId" : 1,
  "numInputRows" : 432,
  "inputRowsPerSecond" : 0.9993568953312452,
  "processedRowsPerSecond" : 1380.1916932907347,
  "durationMs" : {
    "addBatch" : 237,
    "getBatch" : 26,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 313,
    "walCommit" : 45
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
    "startOffset" : 0,
    "endOffset" : 432,
    "numInputRows" : 432,
    "inputRowsPerSecond" : 0.9993568953312452,
    "processedRowsPerSecond" : 1380.1916932907347
  } ],
  "sink" : {
    "description" : "ConsoleSink[numRows=20, truncate=true]"
  }
}

StreamExecution uses OffsetSeqLog and BatchCommitLog metadata logs for write-ahead log (to record offsets to be processed) and that have already been processed and committed to a streaming sink, respectively.

Tip
Monitor offsets and commits metadata logs to know the progress of a streaming query.

StreamExecution delays polling for new data for 10 milliseconds (when no data was available to process in a batch). Use spark.sql.streaming.pollingDelay Spark property to control the delay.

Table 1. StreamExecution’s Internal Registries and Counters (in alphabetical order)
Name Description

availableOffsets

StreamProgress of the streaming sources with their available and unprocessed offsets.

Note
availableOffsets is a part of ProgressReporter Contract.
Note
StreamProgress is an enhanced immutable.Map from Scala with streaming sources as keys and their Offsets as values.

Set when (in order):

  1. StreamExecution resumes and populates the start offsets with the latest offsets from the offset log that may have already been processed (and committed to the batch commit log so they are used as the current committed offsets)

  2. StreamExecution constructs the next streaming batch (and gets offsets from the sources)

Note

You can see availableOffsets in the DEBUG message in the logs when StreamExecution resumes and populates the start offsets.

DEBUG Resuming at batch [currentBatchId] with committed offsets [committedOffsets] and available offsets [availableOffsets]

Used when:

Note
availableOffsets works in tandem with committedOffsets registry.

awaitBatchLock

Java’s fair reentrant mutual exclusion java.util.concurrent.locks.ReentrantLock (that favors granting access to the longest-waiting thread under contention).

batchCommitLog

BatchCommitLog with commits metadata checkpoint directory for completed streaming batches (with a single file per batch with a file name being the batch id).

Note
Metadata log or metadata checkpoint are synonyms and are often used interchangeably.

Used when StreamExecution runs streaming batches (and records a batch that had data for processing and has finished successfully) and populates the start offsets (by looking up what has been committed the last time the streaming query ran).

Note
StreamExecution discards offsets from the batch commit log when the current batch id is above spark.sql.streaming.minBatchesToRetain Spark property (which defaults to 100).

committedOffsets

StreamProgress of the streaming sources and the committed offsets (i.e. processed already).

Note
committedOffsets is a part of ProgressReporter Contract.

currentBatchId

Current batch number

id

Unique identifier of the streaming query

Set as the id of streamMetadata when StreamExecution is created.

Note
id can get fetched from checkpoint metadata if available and thus recovered when a query is resumed (i.e. restarted after a failure or a planned stop).

initializationLatch

lastExecution

Last IncrementalExecution

logicalPlan

Lazily-generated logical plan (i.e. LogicalPlan) of the streaming Dataset

Note
logicalPlan is a part of ProgressReporter Contract.

Initialized right after StreamExecution starts running streaming batches (which is when stream execution thread is started).

Used mainly when StreamExecution replaces StreamingExecutionRelations in a logical query plan with relations with new data that has arrived since the last batch.


While initializing, logicalPlan transforms the analyzed logical plan so that every StreamingRelation is replaced with a StreamingExecutionRelation. logicalPlan creates a StreamingExecutionRelation with source created using a metadata path as /sources/[nextSourceId] under the checkpoint directory.

Note
nextSourceId is the unique identifier of every StreamingRelation in analyzed logical plan starting from 0.
Note
logicalPlan uses DataSource.createSource factory method to create a streaming Source that assumes StreamSourceProvider or FileFormat as the implementations of the streaming data sources for reading.

While initializing, logicalPlan also initializes sources and uniqueSources registries.

microBatchThread

Thread of execution to run a streaming query concurrently with the name as stream execution thread for [prettyIdString] (that uses prettyIdString for logging purposes).

When started, microBatchThread sets the so-called call site and runs streaming batches.

Note
microBatchThread is Java’s java.util.Thread.
Tip

Use Java’s jconsole or jstack to monitor the streaming threads.

$ jstack <driver-pid> | grep -e "stream execution thread"
"stream execution thread for kafka-topic1 [id =...

newData

Registry of the streaming sources (in logical query plan) that have new data available in the current batch. The new data is a streaming DataFrame.

Note
newData is a part of ProgressReporter Contract.

noNewData

Flag whether there are any new offsets available for processing or not.

Turned on (i.e. enabled) when constructing the next streaming batch when no new offsets are available.

offsetLog

OffsetSeqLog with offsets metadata checkpoint directory for write-ahead log to record offsets in as ready for processing.

Note
Metadata log or metadata checkpoint are synonyms and are often used interchangeably.

Used when StreamExecution populates the start offsets and constructs the next streaming batch (first to store the current batch’s offsets in a write-ahead log and retrieve the previous batch’s offsets right afterwards).

Note
StreamExecution discards offsets from the offset metadata log when the current batch id is above spark.sql.streaming.minBatchesToRetain Spark property (which defaults to 100).

offsetSeqMetadata

Note
offsetSeqMetadata is a part of ProgressReporter Contract.
  • Initialized with 0 for batchWatermarkMs and batchTimestampMs when StreamExecution is created.

  • Updated with 0 for batchWatermarkMs and batchTimestampMs and SparkSession with spark.sql.adaptive.enabled disabled when StreamExecution runs streaming batches.

  • Used in…​FIXME

  • Copied with batchTimestampMs updated with the current time (in milliseconds) when StreamExecution constructs the next streaming batch.

pollingDelayMs

Time delay before polling new data again when no data was available

Set to spark.sql.streaming.pollingDelay Spark property.

Used when StreamExecution has started running streaming batches (and no data was available to process in a trigger).

prettyIdString

Pretty-identified string for identification in logs (with name if defined).

queryName [id = xyz, runId = abc]

[id = xyz, runId = abc]

resolvedCheckpointRoot

Qualified path of the checkpoint directory (as defined using checkpointRoot when StreamExecution is created).

Note

checkpointRoot is defined using checkpointLocation option or spark.sql.streaming.checkpointLocation Spark property with queryName option.

checkpointLocation and queryName options are defined when StreamingQueryManager creates a streaming query.

Used when creating the path to the checkpoint directory and when StreamExecution finishes running streaming batches.

Used for logicalPlan (while transforming analyzedPlan and planning StreamingRelation logical operators to corresponding StreamingExecutionRelation physical operators with the streaming data sources created passing in the path to sources directory to store checkpointing metadata).

Note

You can see resolvedCheckpointRoot in the INFO message when StreamExecution is started.

INFO StreamExecution: Starting [id] with [resolvedCheckpointRoot] to store the query checkpoint.

Internally, resolvedCheckpointRoot creates a Hadoop org.apache.hadoop.fs.Path for checkpointRoot and makes it qualified.

Note
resolvedCheckpointRoot uses SparkSession to access SessionState for a Hadoop configuration.

runId

Current run id

sources

All streaming Sources in logical query plan (that are the sources from StreamingExecutionRelation).

startLatch

Java’s java.util.concurrent.CountDownLatch with count 1.

Used when StreamExecution is started to get notified when StreamExecution has started running streaming batches.

state

Java’s java.util.concurrent.atomic.AtomicReference for the three different states a streaming query execution can be:

  • INITIALIZING (default)

  • ACTIVE (after the first execution of runBatches)

  • TERMINATED

streamDeathCause

StreamingQueryException

streamMetadata

StreamMetadata from the metadata file from checkpoint directory. If the metadata file is not available it is created (with a new random id).

triggerExecutor

  • ProcessingTimeExecutor for ProcessingTime

  • OneTimeExecutor for OneTimeTrigger (aka Once trigger)

Used when StreamExecution starts running streaming batches.

Note
StreamExecution reports a IllegalStateException when TriggerExecutor is different from the two built-in implementations: OneTimeExecutor or ProcessingTimeExecutor.

uniqueSources

Unique streaming data sources in a streaming Dataset (after being collected as StreamingExecutionRelation from the corresponding logical query plan).

Note
StreamingExecutionRelation is a leaf logical operator (i.e. LogicalPlan) that represents a streaming data source (and corresponds to a single StreamingRelation in analyzed logical query plan of a streaming Dataset).

Used when StreamExecution:

Tip

Enable INFO or DEBUG logging levels for org.apache.spark.sql.execution.streaming.StreamExecution to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG

Refer to Logging.

stop Method

Caution
FIXME

stopSources Internal Method

stopSources(): Unit
Caution
FIXME

Running Single Streaming Batch — runBatch Internal Method

runBatch(sparkSessionToRunBatch: SparkSession): Unit

runBatch performs the following steps (aka phases):

Note
runBatch is used exclusively when StreamExecution runs streaming batches.

getBatch Phase — Requesting New (and Hence Unprocessed) Data From Streaming Sources

Internally, runBatch first requests the streaming sources for unprocessed data (and stores them as DataFrames in newData internal registry).

In getBatch time-tracking section, runBatch goes over the available offsets per source and processes the offsets that have not been committed yet.

runBatch then requests every source for the data (as DataFrame with the new records).

Note
runBatch requests the streaming sources for new DataFrames sequentially, source by source.
StreamExecution runBatch getBatch
Figure 5. StreamExecution’s Running Single Streaming Batch (getBatch Phase)

You should see the following DEBUG message in the logs:

DEBUG StreamExecution: Retrieving data from [source]: [current] -> [available]

You should then see the following DEBUG message in the logs:

DEBUG StreamExecution: getBatch took [timeTaken] ms

withNewSources Phase — Replacing StreamingExecutionRelations (in Logical Plan) With Relations With New Data or Empty LocalRelation

StreamExecution runBatch withNewSources
Figure 6. StreamExecution’s Running Single Streaming Batch (withNewSources Phase)

In withNewSources phase, runBatch transforms logical query plan and replaces every StreamingExecutionRelation logical operator with the logical plan of the DataFrame with the input data in a batch for the corresponding streaming source.

Note
StreamingExecutionRelation logical operator is used to represent a streaming source in the logical query plan of a streaming Dataset.

runBatch finds the corresponding DataFrame (with the input data) per streaming source in newData internal registry. If found, runBatch takes the logical plan of the DataFrame. If not, runBatch creates a LocalRelation logical relation (for the output schema).

Note
newData internal registry contains entries for streaming sources that have new data available in the current batch.

While replacing StreamingExecutionRelation operators, runBatch records the output schema of the streaming source (from StreamingExecutionRelation) and the DataFrame with the new data (in replacements temporary internal buffer).

runBatch makes sure that the output schema of the streaming source with a new data in the batch has not changed. If the output schema has changed, runBatch reports…​FIXME

triggerLogicalPlan Phase — Transforming Catalyst Expressions

runBatch transforms Catalyst expressions in withNewSources new logical plan (using replacements temporary internal buffer).

  • Catalyst Attribute is replaced with one if recorded in replacements internal buffer (that corresponds to the attribute in the DataFrame with the new input data in the batch)

  • CurrentTimestamp and CurrentDate Catalyst expressions are replaced with CurrentBatchTimestamp expression (with batchTimestampMs from OffsetSeqMetadata).

Note

CurrentTimestamp Catalyst expression corresponds to current_timestamp function.

Find more about current_timestamp function in Mastering Apache Spark 2 gitbook.

Note

CurrentDate Catalyst expression corresponds to current_date function.

Find more about current_date function in Mastering Apache Spark 2 gitbook.

queryPlanning Phase — Creating IncrementalExecution for Current Streaming Batch

StreamExecution runBatch queryPlanning
Figure 7. StreamExecution’s Query Planning (queryPlanning Phase)

In queryPlanning time-tracking section, runBatch creates a new IncrementalExecution with the following:

The new IncrementalExecution is recorded in lastExecution property.

Before leaving queryPlanning section, runBatch forces preparation of the physical plan for execution (i.e. requesting IncrementalExecution for executedPlan).

Note
executedPlan is a physical plan (i.e. SparkPlan) ready for execution with state optimization rules applied.

nextBatch Phase — Creating Dataset (with IncrementalExecution for New Data)

StreamExecution runBatch nextBatch
Figure 8. StreamExecution Creates DataFrame with New Data

runBatch creates a DataFrame with the new IncrementalExecution (as QueryExecution) and its analyzed output schema.

Note
The new DataFrame represents the result of a streaming query.

addBatch Phase — Adding Current Streaming Batch to Sink

StreamExecution runBatch addBatch
Figure 9. StreamExecution Creates DataFrame with New Data

In addBatch time-tracking section, runBatch requests the one and only streaming Sink to add the results of a streaming query (as the DataFrame created in nextBatch Phase).

Note
runBatch uses Sink.addBatch method to request the Sink to add the results.
Note
runBatch uses SQLExecution.withNewExecutionId to execute and track all the Spark actions (under one execution id) that Sink can use when requested to add the results.
Note
The new DataFrame will only be executed in Sink.addBatch.
Note
SQLExecution.withNewExecutionId posts a SparkListenerSQLExecutionStart event before executing Sink.addBatch and a SparkListenerSQLExecutionEnd event right afterwards.
Tip

Register SparkListener to get notified about the SQL execution events.

You can find more information on SparkListener in Mastering Apache Spark 2 gitbook.

awaitBatchLock Phase — Waking Up Threads Waiting For Stream to Progress

In awaitBatchLock code block (it is not a time-tracking section), runBatch acquires a lock on awaitBatchLock, wakes up all waiting threads on awaitBatchLockCondition and immediatelly releases awaitBatchLock lock.

Note
awaitBatchLockCondition is used mainly when StreamExecution processAllAvailable (and also when awaitOffset, but that seems mainly for testing).

Running Streaming Batches — runBatches Internal Method

runBatches(): Unit

runBatches runs streaming batches of data (that are datasets from every streaming source).

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

val out = spark.
  readStream.
  text("server-logs").
  writeStream.
  format("console").
  queryName("debug").
  trigger(Trigger.ProcessingTime(10.seconds))
scala> val debugStream = out.start
INFO StreamExecution: Starting debug [id = 8b57b0bd-fc4a-42eb-81a3-777d7ba5e370, runId = 920b227e-6d02-4a03-a271-c62120258cea]. Use file:///private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-274f9ae1-1238-4088-b4a1-5128fc520c1f to store the query checkpoint.
debugStream: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@58a5b69c

// Enable the log level to see the INFO and DEBUG messages
// log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG

17/06/18 21:21:07 INFO StreamExecution: Starting new streaming query.
17/06/18 21:21:07 DEBUG StreamExecution: getOffset took 5 ms
17/06/18 21:21:07 DEBUG StreamExecution: Stream running from {} to {}
17/06/18 21:21:07 DEBUG StreamExecution: triggerExecution took 9 ms
17/06/18 21:21:07 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())
17/06/18 21:21:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "8b57b0bd-fc4a-42eb-81a3-777d7ba5e370",
  "runId" : "920b227e-6d02-4a03-a271-c62120258cea",
  "name" : "debug",
  "timestamp" : "2017-06-18T19:21:07.693Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 5,
    "triggerExecution" : 9
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/Users/jacek/dev/oss/spark/server-logs]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@2460208a"
  }
}
17/06/18 21:21:10 DEBUG StreamExecution: Starting Trigger Calculation
17/06/18 21:21:10 DEBUG StreamExecution: getOffset took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: triggerExecution took 3 ms
17/06/18 21:21:10 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map())

Internally, runBatches assigns the group id (to all the Spark jobs started by this thread) as runId (with the group description to display in web UI as getBatchDescriptionString and interruptOnCancel flag enabled).

Note

runBatches uses SparkSession to access SparkContext and assign the group id.

You can find the details on SparkContext.setJobGroup method in the Mastering Apache Spark 2 gitbook.

runBatches sets a local property sql.streaming.queryId as id.

runBatches registers a metric source when spark.sql.streaming.metricsEnabled property is enabled (which is disabled by default).

Caution
FIXME Metrics

runBatches notifies StreamingQueryListeners that a streaming query has been started (by posting a QueryStartedEvent with id, runId and name).

StreamingQueryListener onQueryStarted
Figure 10. StreamingQueryListener Notified about Query’s Start (onQueryStarted)

runBatches unblocks the main starting thread (by decrementing the count of startLatch that goes to 0 and lets the starting thread continue).

Caution
FIXME A picture with two parallel lanes for the starting thread and daemon one for the query.

runBatches updates the status message to Initializing sources followed by initialization of the logical plan (of the streaming Dataset).

runBatches disables adaptive query execution (using spark.sql.adaptive.enabled property which is disabled by default) as it could change the number of shuffle partitions.

runBatches initializes offsetSeqMetadata internal variable.

runBatches sets state to ACTIVE (only when the current state is INITIALIZING that prevents from repeating the initialization)

Note
runBatches does the work only when first started (i.e. when state is INITIALIZING).

runBatches decrements the count of initializationLatch.

Caution
FIXME initializationLatch so what?

runBatches requests TriggerExecutor to start executing batches (aka triggers) by executing a batch runner.

Once TriggerExecutor has finished executing batches, runBatches updates the status message to Stopped.

Note
TriggerExecutor finishes executing batches when batch runner returns whether the streaming query is stopped or not (which is when the internal state is not TERMINATED).
Caution
FIXME Describe catch block for exception handling
Caution
FIXME Describe finally block for query termination
Note
runBatches is used exclusively when StreamExecution starts the execution thread for a streaming query (i.e. the thread that runs the micro-batches of this stream).

TriggerExecutor’s Batch Runner

Batch Runner (aka batchRunner) is an executable block executed by TriggerExecutor in runBatches.

As long as the query is not stopped (i.e. state is not TERMINATED), batchRunner executes the streaming batch for the trigger.

In triggerExecution time-tracking section, runBatches branches off per currentBatchId.

Table 2. Current Batch Execution per currentBatchId
currentBatchId < 0 currentBatchId >= 0
  1. populateStartOffsets

  2. Setting Job Description as getBatchDescriptionString

DEBUG Stream running from [committedOffsets] to [availableOffsets]

1. Constructing the next streaming batch

If there is data available in the sources, batchRunner marks currentStatus with isDataAvailable enabled.

Note

You can check out the status of a streaming query using status method.

scala> spark.streams.active(0).status
res1: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Waiting for next trigger",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}

batchRunner then updates the status message to Processing new data and runs the current streaming batch.

StreamExecution runBatches
Figure 11. StreamExecution’s Running Batches (on Execution Thread)

After triggerExecution section has finished, batchRunner finishes the streaming batch for the trigger (and collects query execution statistics).

When there was data available in the sources, batchRunner updates committed offsets (by adding the current batch id to BatchCommitLog and adding availableOffsets to committedOffsets).

You should see the following DEBUG message in the logs:

DEBUG batch $currentBatchId committed

batchRunner increments the current batch id and sets the job description for all the following Spark jobs to include the new batch id.

When no data was available in the sources to process, batchRunner does the following:

  1. Marks currentStatus with isDataAvailable disabled

  2. Updates the status message to Waiting for data to arrive

  3. Sleeps the current thread for pollingDelayMs milliseconds.

batchRunner updates the status message to Waiting for next trigger and returns whether the query is currently active or not (so TriggerExecutor can decide whether to finish executing the batches or not)

Populating Start Offsets — populateStartOffsets Internal Method

populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit
Note
The batch id could not be available in metadata log if a streaming query started with a new metadata log or no batch was committed before.

With the latest committed batch id with the metadata (from OffsetSeqLog) populateStartOffsets sets current batch id to the latest committed batch id, and availableOffsets to its offsets (considering them unprocessed yet).

Note
populateStartOffsets may re-execute the latest committed batch.

If the latest batch id is greater than 0, populateStartOffsets requests OffsetSeqLog for the second latest batch with its metadata (or reports a IllegalStateException if not found). populateStartOffsets sets committed offsets to the second latest committed offsets.

populateStartOffsets updates the offset metadata.

Caution
FIXME Why is the update needed?

(only when the latest batch in OffsetSeqLog is also the latest batch in BatchCommitLog) With the latest processed batch id with the metadata (from BatchCommitLog), populateStartOffsets sets current batch id as the next after the latest processed batch. populateStartOffsets sets committed offsets to availableOffsets.

Caution
FIXME Describe what happens with availableOffsets.

populateStartOffsets constructs the next streaming batch.

Caution
FIXME Describe the WARN message when latestCommittedBatchId < latestBatchId - 1.
WARN Batch completion log latest batch id is [latestCommittedBatchId], which is not trailing batchid [latestBatchId] by one

You should see the following DEBUG message in the logs:

DEBUG Resuming at batch [currentBatchId] with committed offsets [committedOffsets] and available offsets [availableOffsets]
Caution
FIXME Include an example of Resuming at batch

When the latest committed batch id with the metadata could not be found in BatchCommitLog, populateStartOffsets prints out the following INFO message to the logs:

INFO no commit log present
Caution
FIXME Include an example of the case when no commit log present.

When the latest committed batch id with the metadata could not be found in OffsetSeqLog, it is assumed that the streaming query is started for the first time. You should see the following INFO message in the logs:

INFO StreamExecution: Starting new streaming query.

populateStartOffsets sets current batch id to 0 and constructs the next streaming batch.

Note
populateStartOffsets is used exclusively when TriggerExecutor executes a batch runner for the first time (i.e. current batch id is negative).

getBatchDescriptionString Internal Method

getBatchDescriptionString: String
Caution
FIXME

toDebugString Internal Method

toDebugString(includeLogicalPlan: Boolean): String

toDebugString…​FIXME

Note
toDebugString is used exclusively when StreamExecution runs streaming batches (and a streaming query terminated with exception).

Starting Streaming Query (on Execution Thread) — start Method

start(): Unit

When called, start prints the following INFO message to the logs:

INFO Starting [id]. Use [resolvedCheckpointRoot] to store the query checkpoint.

start then sets microBatchThread as a daemon thread and starts it.

Note
start uses Java’s java.lang.Thread.start to run the streaming query on a separate execution thread.
Note
When started, a streaming query runs in its own execution thread on JVM.

In the end, start waits until startLatch has counted down to zero (which is right after StreamExecution has started running streaming batches so there is some pause in the main thread’s execution to wait till the streaming query execution thread starts).

Note
start is used exclusively when StreamingQueryManager is requested to start a streaming query.

Creating StreamExecution Instance

StreamExecution takes the following when created:

  • SparkSession

  • Query name

  • Path to the checkpoint directory (aka metadata directory)

  • Analyzed logical plan (i.e. LogicalPlan)

  • Streaming sink

  • Trigger

  • Clock

  • Output mode (that is only used when creating IncrementalExecution for a streaming batch in query planning)

  • Flag where to delete the checkpoint on stop

StreamExecution initializes the internal registries and counters.

Creating Path to Checkpoint Directory — checkpointFile Internal Method

checkpointFile(name: String): String

checkpointFile gives the path of a directory with name in checkpoint directory.

Note
checkpointFile uses Hadoop’s org.apache.hadoop.fs.Path.
Note
checkpointFile is used for streamMetadata, OffsetSeqLog, BatchCommitLog, and lastExecution (for runBatch).

Constructing Next Streaming Batch — constructNextBatch Internal Method

constructNextBatch(): Unit

constructNextBatch is made up of the following three parts:

  1. Firstly, checking if there is new data available by requesting new offsets from every streaming source

  2. There is some data to process (and so where the next batch is constructed)

  3. No data is available

Note

constructNextBatch is used when StreamExecution:

Checking Whether New Data Is Available (by Requesting New Offsets from Sources)

constructNextBatch starts by checking whether or not a new data is available in any of the streaming sources (in the logical query plan).

constructNextBatch acquires awaitBatchLock and gets the latest offset from every streaming data source.

Note
constructNextBatch checks out the latest offset in every streaming data source sequentially, i.e. one data source at a time.
StreamExecution constructNextBatch
Figure 12. StreamExecution’s Getting Offsets From Streaming Sources
Note
constructNextBatch uses the Source contract to get the latest offset (using Source.getOffset method).

constructNextBatch updates the status message to Getting offsets from [source] for every streaming data source.

In getOffset time-tracking section, constructNextBatch gets the offsets.

constructNextBatch prints out the following DEBUG message to the logs:

DEBUG StreamExecution: getOffset took [time] ms

constructNextBatch adds the streaming sources that have the available offsets to availableOffsets.

If there is no data available (i.e. no offsets unprocessed in any of the streaming data sources), constructNextBatch turns noNewData flag on.

In the end (of this checking-data block), constructNextBatch releases awaitBatchLock

New Data Available

When new data is available, constructNextBatch updates the event time watermark (tracked using offsetSeqMetadata) if it finds one in the last IncrementalExecution.

If lastExecution is available (which may not when constructNextBatch is executed the very first time), constructNextBatch takes the executed physical plan (i.e. SparkPlan) and collects all EventTimeWatermarkExec physical operators with the count of eventTimeStats greater than 0.

Note
The executed physical plan is available as executedPlan property of IncrementalExecution (which is a custom QueryExecution).

You should see the following DEBUG message in the logs:

DEBUG StreamExecution: Observed event time stats: [eventTimeStats]

constructNextBatch calculates the difference between the maximum value of eventTimeStats and delayMs for every EventTimeWatermarkExec physical operator.

Note
The maximum value of eventTimeStats is the youngest time, i.e. the time the closest to the current time.

constructNextBatch then takes the first difference (if available at all) and uses it as a possible new event time watermark.

If the event time watermark candidate is greater than the current watermark (i.e. later time-wise), constructNextBatch prints out the following INFO message to the logs:

INFO StreamExecution: Updating eventTime watermark to: [newWatermarkMs] ms

constructNextBatch creates a new OffsetSeqMetadata with the new event time watermark and the current time.

Otherwise, if the eventTime watermark candidate is not greater than the current watermark, constructNextBatch simply prints out the following DEBUG message to the logs:

DEBUG StreamExecution: Event time didn't move: [newWatermarkMs] <= [batchWatermarkMs]

constructNextBatch creates a new OffsetSeqMetadata with just the current time.

Note
Although constructNextBatch collects all the EventTimeWatermarkExec physical operators in the executed physical plan of lastExecution, only the first matters if available.
Note
A physical plan can have as many EventTimeWatermarkExec physical operators as withWatermark operator was used to create a streaming query.
Note

Streaming watermark can be changed between a streaming query’s restarts (and be different between what is checkpointed and the current version of the query).

FIXME True? Example?

constructNextBatch then adds the offsets to metadata log.

constructNextBatch updates the status message to Writing offsets to log.

Note

While writing the offsets to the metadata log, constructNextBatch uses the following internal registries:

constructNextBatch reports a AssertionError when writing to the metadata log has failed.

Concurrent update to the log. Multiple streaming jobs detected for [currentBatchId]
Tip

Use StreamingQuery.lastProgress to access walCommit duration.

scala> :type sq
org.apache.spark.sql.streaming.StreamingQuery
sq.lastProgress.durationMs.get("walCommit")
Tip

Enable INFO logging level for org.apache.spark.sql.execution.streaming.StreamExecution logger to be notified about walCommit duration.

17/08/11 09:04:17 INFO StreamExecution: Streaming query made progress: {
  "id" : "ec8f8228-90f6-4e1f-8ad2-80222affed63",
  "runId" : "f605c134-cfb0-4378-88c1-159d8a7c232e",
  "name" : "rates-to-console",
  "timestamp" : "2017-08-11T07:04:17.373Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 38,
    "getBatch" : 1,
    "getOffset" : 0,
    "queryPlanning" : 1,
    "triggerExecution" : 62,
    "walCommit" : 19          // <-- walCommit
  },

constructNextBatch commits the offsets for the batch (only when current batch id is not 0, i.e. when the query has just been started and constructNextBatch is called the first time).

constructNextBatch takes the previously-committed batch (from OffsetSeqLog), extracts the stored offsets per source.

Note
constructNextBatch uses OffsetSeq.toStreamProgress and sources registry to extract the offsets per source.

constructNextBatch requests every streaming source to commit the offsets

Note
constructNextBatch uses the Source contract to commit the offsets (using Source.commit method).

constructNextBatch reports a IllegalStateException when current batch id is 0.

batch [currentBatchId] doesn't exist

In the end, constructNextBatch purges OffsetSeqLog and BatchCommitLog when current batch id is above spark.sql.streaming.minBatchesToRetain Spark property.

No New Data Available

If there is no new data available, constructNextBatch acquires a lock on awaitBatchLock, wakes up all waiting threads that are waiting for the stream to progress (using awaitBatchLockCondition), followed by releasing the lock on awaitBatchLock.

Posting StreamingQueryListener Event — postEvent Method

postEvent(event: StreamingQueryListener.Event): Unit
Note
postEvent is a part of ProgressReporter Contract.

postEvent simply requests the StreamingQueryManager to post the input event (to the StreamingQueryListenerBus in the current SparkSession).

Note
postEvent uses SparkSession to access the current StreamingQueryManager.
Note

postEvent is used when:

Checking Whether Data Is Available in Streaming Sources — dataAvailable Internal Method

dataAvailable: Boolean

dataAvailable finds the streaming sources in availableOffsets for which the offsets committed (as recorded in committedOffsets) are different or do not exist at all.

If there are any differences in the number of sources or their committed offsets, dataAvailable is enabled (i.e. true).

Note
dataAvailable is used when StreamExecution runs streaming batches and constructs the next streaming batch.

Waiting Until No Data Available in Sources or Query Has Been Terminated — processAllAvailable Method

processAllAvailable(): Unit
Note
processAllAvailable is a part of StreamingQuery Contract.

processAllAvailable reports streamDeathCause exception if defined (and returns).

Note
streamDeathCause is defined exclusively when StreamExecution runs streaming batches (and terminated with an exception).

processAllAvailable returns when isActive flag is turned off (which is when StreamExecution is in TERMINATED state).

processAllAvailable acquires a lock on awaitBatchLock and turns noNewData flag off.

processAllAvailable keeps waiting 10 seconds for awaitBatchLockCondition until noNewData flag is turned on or StreamExecution is no longer active.

Note
noNewData flag is turned on exclusively when StreamExecution constructs the next streaming batch (and finds that no data is available).

In the end, processAllAvailable releases awaitBatchLock lock.