Skip to content

Latest commit

 

History

History
123 lines (95 loc) · 3.51 KB

spark-sql-streaming-KeyValueGroupedDataset.adoc

File metadata and controls

123 lines (95 loc) · 3.51 KB

KeyValueGroupedDataset — Streaming Aggregation

KeyValueGroupedDataset represents a grouped dataset as a result of groupByKey operator (that aggregates records by a grouping function).

// Dataset[T]
groupByKey(func: T => K): KeyValueGroupedDataset[K, T]

KeyValueGroupedDataset works for batch and streaming aggregations, but shines the most when used for streaming aggregation (with streaming Datasets).

import java.sql.Timestamp
scala> val numGroups = spark.
  readStream.
  format("rate").
  load.
  as[(Timestamp, Long)].
  groupByKey { case (time, value) => value % 2 }
numGroups: org.apache.spark.sql.KeyValueGroupedDataset[Long,(java.sql.Timestamp, Long)] = org.apache.spark.sql.KeyValueGroupedDataset@616c1605

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
numGroups.
  mapGroups { case(group, values) => values.size }.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  start

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+
|value|
+-----+
|    3|
|    2|
+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+
|value|
+-----+
|    5|
|    5|
+-----+

// Eventually...
spark.streams.active.foreach(_.stop)

The most prestigious use case of KeyValueGroupedDataset however is stateful streaming aggregation that allows for accumulating streaming state (by means of GroupState) using mapGroupsWithState and the more advanced flatMapGroupsWithState operators.

Table 1. KeyValueGroupedDataset’s Operators
Operator Description

agg

cogroup

count

flatMapGroups

flatMapGroupsWithState

Creates a Dataset with FlatMapGroupsWithState logical operator

Note
The difference between flatMapGroupsWithState and mapGroupsWithState is the state function that generates zero or more elements (that are in turn the rows in the result Dataset).

keyAs

keys

mapGroups

mapGroupsWithState

Creates a Dataset with FlatMapGroupsWithState logical operator

Note
The difference between mapGroupsWithState and flatMapGroupsWithState is the state function that generates exactly one element (that is in turn the row in the result Dataset).

mapValues

queryExecution

reduceGroups

Creating KeyValueGroupedDataset Instance

KeyValueGroupedDataset takes the following when created:

  • Encoder for keys

  • Encoder for values

  • QueryExecution

  • Data attributes

  • Grouping attributes