ProgressReporter

ProgressReporter is the contract that StreamExecution uses to report query progress.

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sampleQuery = spark.
  readStream.
  format("rate").
  load.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

// Using public API
import org.apache.spark.sql.streaming.SourceProgress
scala> sampleQuery.
     |   lastProgress.
     |   sources.
     |   map { case sp: SourceProgress =>
     |     s"source = ${sp.description} => endOffset = ${sp.endOffset}" }.
     |   foreach(println)
source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => endOffset = 663

scala> sampleQuery.lastProgress.sources(0)
res40: org.apache.spark.sql.streaming.SourceProgress =
{
  "description" : "RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8]",
  "startOffset" : 333,
  "endOffset" : 343,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.9998000399920015,
  "processedRowsPerSecond" : 200.0
}

// With a hack
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val offsets = sampleQuery.
  asInstanceOf[StreamingQueryWrapper].
  streamingQuery.
  availableOffsets.
  map { case (source, offset) =>
    s"source = $source => offset = $offset" }
scala> offsets.foreach(println)
source = RateSource[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=8] => offset = 293
Table 1. ProgressReporter’s Internal Registries and Counters (in alphabetical order)
Name Description

currentDurationsMs

Scala’s scala.collection.mutable.HashMap of action names (aka triggerDetailKey) and their cumulative times (in milliseconds).

The action names can be as follows:

  • addBatch

  • getBatch (when StreamExecution runs a streaming batch)

  • getOffset

  • queryPlanning

  • triggerExecution

  • walCommit when writing offsets to log

Starts empty when ProgressReporter sets the state for a new batch. Entries are added or updated when ProgressReporter reports the execution time of an action.

Tip

You can see the current value of currentDurationsMs in progress reports under durationMs.

scala> query.lastProgress.durationMs
res3: java.util.Map[String,Long] = {triggerExecution=60, queryPlanning=1, getBatch=5, getOffset=0, addBatch=30, walCommit=23}

currentStatus

StreamingQueryStatus to track the status of a streaming query.

Available using status method.

currentTriggerEndTimestamp

Timestamp of when the current batch/trigger has ended

currentTriggerStartTimestamp

Timestamp of when the current batch/trigger has started

noDataProgressEventInterval

FIXME

lastNoDataProgressEventTime

FIXME

lastTriggerStartTimestamp

Timestamp of when the last batch/trigger started

progressBuffer

Scala’s scala.collection.mutable.Queue of StreamingQueryProgress

Elements are added and removed when ProgressReporter updates progress.

Used when ProgressReporter does lastProgress or recentProgress.

Creating Execution Statistics — extractExecutionStats Internal Method

extractExecutionStats(hasNewData: Boolean): ExecutionStats
Caution
FIXME

SourceProgress

Caution
FIXME

SinkProgress

Caution
FIXME

ProgressReporter Contract

package org.apache.spark.sql.execution.streaming

trait ProgressReporter {
  // only required methods that have no implementation
  def availableOffsets: StreamProgress
  def committedOffsets: StreamProgress
  def currentBatchId: Long
  def id: UUID
  def logicalPlan: LogicalPlan
  def name: String
  def newData: Map[Source, DataFrame]
  def offsetSeqMetadata: OffsetSeqMetadata
  def postEvent(event: StreamingQueryListener.Event): Unit
  def runId: UUID
  def sink: Sink
  def sources: Seq[Source]
  def triggerClock: Clock
}
Table 2. (Subset of) ProgressReporter Contract (in alphabetical order)
Method Description

availableOffsets

Used when:

committedOffsets

Used when:

currentBatchId

Id of the current batch

id

UUID of…​FIXME

logicalPlan

Logical plan (i.e. LogicalPlan) of a streaming Dataset that…​FIXME

Used when:

name

Name of…​FIXME

newData

Streaming sources with the new data as a DataFrame.

Used when:

offsetSeqMetadata

postEvent

FIXME

runId

UUID of…​FIXME

sink

Streaming sink

sources

Streaming sources

triggerClock

Clock to track the time

status Method

status: StreamingQueryStatus

status gives the current StreamingQueryStatus.

Note
status is used when StreamingQueryWrapper is requested for the current status of a streaming query (that is part of StreamingQuery Contract).

Reporting Streaming Query Progress — updateProgress Internal Method

updateProgress(newProgress: StreamingQueryProgress): Unit

updateProgress records the input newProgress and posts a QueryProgressEvent event.

Figure 1. ProgressReporter’s Reporting Query Progress

updateProgress adds the input newProgress to progressBuffer.

updateProgress then removes the oldest elements from progressBuffer while their number equals or exceeds the value of the spark.sql.streaming.numRecentProgressUpdates property.

updateProgress posts a QueryProgressEvent (with the input newProgress).

updateProgress prints out the following INFO message to the logs:

INFO StreamExecution: Streaming query made progress: [newProgress]
Note
updateProgress synchronizes concurrent access to progressBuffer.
Note
updateProgress is used exclusively when ProgressReporter finishes a trigger.
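The buffering steps described in this section can be sketched in plain Scala without Spark. This is a simplified model, not Spark's code: ProgressBuffer, its type parameter P (standing in for StreamingQueryProgress), the retention parameter (playing the role of spark.sql.streaming.numRecentProgressUpdates) and the postEvent callback are all hypothetical names chosen for illustration.

```scala
import scala.collection.mutable

// Simplified model of the progressBuffer handling described above (not Spark's code).
// P stands in for StreamingQueryProgress; `retention` plays the role of
// spark.sql.streaming.numRecentProgressUpdates; `postEvent` is a hypothetical callback.
class ProgressBuffer[P](retention: Int, postEvent: P => Unit) {
  require(retention >= 1, "retention must be at least 1")

  private val progressBuffer = mutable.Queue.empty[P]

  def updateProgress(newProgress: P): Unit = {
    // Guard the queue against concurrent readers and writers
    progressBuffer.synchronized {
      progressBuffer += newProgress
      // Drop the oldest entries while the size is at or above the limit
      while (progressBuffer.length >= retention) {
        progressBuffer.dequeue()
      }
    }
    postEvent(newProgress)
  }

  // Snapshot of the retained progress updates, oldest first
  def recentProgress: Seq[P] = progressBuffer.synchronized { progressBuffer.toList }

  // Most recent retained progress update, if any
  def lastProgress: Option[P] = progressBuffer.synchronized { progressBuffer.lastOption }
}
```

In this model the event is posted for every update, while the buffer keeps only the most recent entries. Because the size check uses "equals or exceeds", the buffer holds fewer entries than the configured limit after each update.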

Setting State For New Trigger — startTrigger Method

startTrigger(): Unit

When called, startTrigger prints out the following DEBUG message to the logs:

DEBUG StreamExecution: Starting Trigger Calculation

startTrigger sets currentTriggerStartTimestamp using triggerClock.

startTrigger enables the isTriggerActive flag of StreamingQueryStatus (i.e. sets it to true).

startTrigger clears currentDurationsMs.

Note
startTrigger is used exclusively when StreamExecution starts running batches (as part of TriggerExecutor executing a batch runner).
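As a rough illustration of the three state changes listed above, the following plain-Scala sketch models them on a small state holder. TriggerState and its clock parameter are hypothetical names; the real ProgressReporter keeps this state alongside much more and reads the time from triggerClock.

```scala
import scala.collection.mutable

// Hypothetical, simplified state holder (not Spark's ProgressReporter).
// `clock` stands in for triggerClock and returns the current time in milliseconds.
class TriggerState(clock: () => Long) {
  var currentTriggerStartTimestamp: Long = -1L
  var isTriggerActive: Boolean = false
  // action name -> cumulative time in milliseconds
  val currentDurationsMs = mutable.HashMap.empty[String, Long]

  def startTrigger(): Unit = {
    currentTriggerStartTimestamp = clock() // remember when this trigger started
    isTriggerActive = true                 // mark the trigger as active
    currentDurationsMs.clear()             // forget timings from the previous trigger
  }
}
```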

Finishing Trigger (by Updating Progress and Marking Current Status As Trigger Inactive) — finishTrigger Method

finishTrigger(hasNewData: Boolean): Unit

Internally, finishTrigger sets currentTriggerEndTimestamp to the current time (using triggerClock).

finishTrigger extracts the execution statistics (using extractExecutionStats).

finishTrigger calculates the processing time (in seconds) as the difference between the end and start timestamps.

finishTrigger calculates the input time (in seconds) as the difference between the start time of the current and last triggers.

Figure 2. ProgressReporter’s finishTrigger and Timestamps
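The two time calculations can be written down as a small, Spark-free sketch. It assumes the timestamps are in milliseconds, and it assumes that a negative last-trigger start means "no previous trigger", in which case the input time is left undefined. TriggerTimes is a hypothetical helper introduced only for this example.

```scala
// Hypothetical helper (not part of Spark). All timestamps are assumed to be in milliseconds.
final case class TriggerTimes(processingTimeSec: Double, inputTimeSec: Double)

object TriggerTimes {
  def compute(
      lastTriggerStartTimestamp: Long,
      currentTriggerStartTimestamp: Long,
      currentTriggerEndTimestamp: Long): TriggerTimes = {
    // Processing time: how long the current trigger ran
    val processing =
      (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000
    // Input time: gap between the starts of the last and the current trigger.
    // Assumption: a negative last start means there was no previous trigger.
    val input =
      if (lastTriggerStartTimestamp >= 0) {
        (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / 1000
      } else {
        Double.NaN
      }
    TriggerTimes(processing, input)
  }
}
```

For example, a trigger that started 10 seconds after the previous one and ran for 50 milliseconds gives an input time of 10.0 seconds and a processing time of 0.05 seconds.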

finishTrigger prints out the following DEBUG message to the logs:

DEBUG StreamExecution: Execution stats: [executionStats]

finishTrigger creates a SourceProgress (aka source statistics) for every source used.

finishTrigger creates a SinkProgress (aka sink statistics) for the sink.

finishTrigger creates a StreamingQueryProgress.

If there was any data (per the input hasNewData flag), finishTrigger resets lastNoDataProgressEventTime (i.e. sets it to the minimum possible time) and updates query progress.

Otherwise, when no data was available, finishTrigger updates query progress only when at least noDataProgressEventInterval has passed since lastNoDataProgressEventTime, and records the current time as the new lastNoDataProgressEventTime.

In the end, finishTrigger disables isTriggerActive flag of StreamingQueryStatus (i.e. sets it to false).

Note
finishTrigger is used exclusively when StreamExecution runs streaming batches (after TriggerExecutor has finished executing a streaming batch for a trigger).
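The decision whether to report progress for a trigger can be modeled in isolation. The sketch below is an assumption-laden illustration, not Spark's code: it assumes noDataProgressEventInterval is a minimum gap in milliseconds between progress reports for triggers without data, and NoDataThrottle and shouldReport are hypothetical names.

```scala
// Simplified model of the reporting decision only (not Spark's code).
// Field names mirror the registries listed earlier; the class itself is hypothetical.
class NoDataThrottle(noDataProgressEventInterval: Long) {
  private var lastNoDataProgressEventTime: Long = Long.MinValue

  /** Returns true when progress should be reported for this trigger. */
  def shouldReport(hasNewData: Boolean, nowMs: Long): Boolean =
    if (hasNewData) {
      // Data arrived: always report and reset the no-data timer
      lastNoDataProgressEventTime = Long.MinValue
      true
    } else if (nowMs - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
      // No data, but the interval has elapsed since the last no-data report
      lastNoDataProgressEventTime = nowMs
      true
    } else {
      // No data and too soon since the last no-data report: stay quiet
      false
    }
}
```

Under these assumptions an idle query still emits a progress update once per interval, while a query with data reports on every trigger.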

Tracking and Recording Execution Time — reportTimeTaken Method

reportTimeTaken[T](triggerDetailKey: String)(body: => T): T

reportTimeTaken measures the time to execute body and records it in currentDurationsMs.

In the end, reportTimeTaken prints out the following DEBUG message to the logs and returns the result of executing body.

DEBUG StreamExecution: [triggerDetailKey] took [time] ms
Note

reportTimeTaken is used when StreamExecution wants to record the time taken for (as triggerDetailKey in the DEBUG message above):

  • addBatch

  • getBatch

  • getOffset

  • queryPlanning

  • triggerExecution

  • walCommit when writing offsets to log
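The measure-and-accumulate behaviour described in this section can be sketched as follows. This is a simplified stand-in rather than Spark's implementation: DurationTracker and its injectable nowMs clock are hypothetical, and println replaces the DEBUG log message.

```scala
import scala.collection.mutable

// Simplified stand-in for the timing part of ProgressReporter (not Spark's code).
// `nowMs` is a hypothetical injectable clock, which keeps the behaviour deterministic in tests.
class DurationTracker(nowMs: () => Long) {
  // action name (triggerDetailKey) -> cumulative time in milliseconds
  val currentDurationsMs = mutable.HashMap.empty[String, Long]

  def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
    val startTime = nowMs()
    val result = body // evaluate the by-name block exactly once
    val timeTaken = math.max(nowMs() - startTime, 0L)
    // Accumulate, so repeated calls with the same key add up within a trigger
    val previousTime = currentDurationsMs.getOrElse(triggerDetailKey, 0L)
    currentDurationsMs.put(triggerDetailKey, previousTime + timeTaken)
    println(s"$triggerDetailKey took $timeTaken ms") // stands in for the DEBUG message
    result
  }
}
```

The second parameter list takes the block by name, so a call site reads like a built-in control structure, e.g. tracker.reportTimeTaken("getOffset") { ... }, and the block's result is passed through unchanged.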

updateStatusMessage Method

updateStatusMessage(message: String): Unit

updateStatusMessage updates the message in the currentStatus internal registry (a StreamingQueryStatus).