groupByKey Operator — Streaming Aggregation (with Explicit State Logic)

groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]

The groupByKey operator combines rows (of type T) into a KeyValueGroupedDataset, in which the keys (of type K) are generated by the key-generating function func and the values are collections of one or more rows associated with a key.

func takes a row (of type T) and returns the group key (of type K) that the row is associated with.

func: T => K
Note
The type of the input argument of func is the type of rows in the Dataset (i.e. Dataset[T]).

For example, groupByKey could group customer orders by postal code: the "key" would be the postal code of each individual order, and the "value" would be the order itself.
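The postal-code example can be sketched on a small, non-streaming Dataset. This is only an illustration under assumptions: the local SparkSession settings, the order tuples and the postal codes are all made up here and are not part of the original text.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell, getOrCreate returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("groupByKey-orders").getOrCreate()
import spark.implicits._

// Hypothetical orders as (orderId, postalCode, amount) tuples
val orders = Seq(
  (1L, "00-001", 10.0),
  (2L, "00-002", 5.0),
  (3L, "00-001", 7.5)).toDS

// func: ((Long, String, Double)) => String, so K becomes String (the postal code)
val ordersByPostalCode = orders.groupByKey(_._2)

// Count the orders in every group; the result is a Dataset[(String, Long)]
val orderCounts = ordersByPostalCode.count()
orderCounts.show()
```

The key function only decides which group a row belongs to. What happens to each group (here, counting) is chosen afterwards on the KeyValueGroupedDataset.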

The following example shows how to apply the groupByKey operator to a structured stream of timestamped values from different devices.

scala> spark.version
res0: String = 2.3.0-SNAPSHOT

// input stream
import java.sql.Timestamp
val signals = spark.
  readStream.
  format("rate").
  option("rowsPerSecond", 1).
  load.
  withColumn("value", $"value" % 10).  // <-- keep the values in the range 0..9 (just for fun)
  withColumn("deviceId", (rand() * 10).cast("int")). // <-- one of 10 device ids (0..9), picked randomly per row
  as[(Timestamp, Long, Int)] // <-- convert to a "better" type (from "unpleasant" Row)

// stream processing using groupByKey operator
// groupByKey(func: ((Timestamp, Long, Int)) => K): KeyValueGroupedDataset[K, (Timestamp, Long, Int)]
// K becomes Int which is a device id
val deviceId: ((Timestamp, Long, Int)) => Int = { case (_, _, deviceId) => deviceId }
scala> val signalsByDevice = signals.groupByKey(deviceId)
signalsByDevice: org.apache.spark.sql.KeyValueGroupedDataset[Int,(java.sql.Timestamp, Long, Int)] = org.apache.spark.sql.KeyValueGroupedDataset@19d40bc6
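A grouped stream does nothing until an aggregation is applied to it and the query is started. The following is a minimal, self-contained sketch (not taken from the original) that counts signals per device and prints the running counts to the console. The local master, the application name and the 10-second run time are assumptions made for illustration.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

// Assumption: a local session; in spark-shell, getOrCreate returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("groupByKey-streaming").getOrCreate()
import spark.implicits._

// The same kind of input stream as above: (timestamp, value, deviceId)
val signals = spark.
  readStream.
  format("rate").
  option("rowsPerSecond", 1).
  load.
  withColumn("deviceId", (rand() * 10).cast("int")).
  as[(Timestamp, Long, Int)]

// K becomes Int (the device id, i.e. the third tuple element)
val signalsByDevice = signals.groupByKey(_._3)

// Streaming aggregation: number of signals seen so far per device
val countsByDevice = signalsByDevice.count()  // Dataset[(Int, Long)]

// Complete output mode re-emits the whole result table on every trigger
val query = countsByDevice.
  writeStream.
  format("console").
  outputMode("complete").
  start()

// Let the query run for about 10 seconds (an arbitrary choice), then stop it
query.awaitTermination(10000)
query.stop()
```

count keeps the per-key state implicitly. When the state has to be managed by hand, KeyValueGroupedDataset also offers mapGroupsWithState and flatMapGroupsWithState.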

Internally, groupByKey creates a KeyValueGroupedDataset with the following:

  • Encoders for K keys and T rows

  • QueryExecution for AppendColumns unary logical operator with the func function and the analyzed logical plan of the Dataset (that groupByKey is executed on)

  • Grouping attributes
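The AppendColumns operator mentioned above can be observed by printing the logical plan behind a KeyValueGroupedDataset. The sketch below assumes Spark 2.x, where queryExecution is a public field of KeyValueGroupedDataset. The sample Dataset of integers and the local session are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell, getOrCreate returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("groupByKey-internals").getOrCreate()
import spark.implicits._

// Hypothetical input: a handful of integers grouped by parity
val nums = Seq(1, 2, 3, 4).toDS
val byParity = nums.groupByKey(_ % 2)

// The analyzed logical plan that the KeyValueGroupedDataset was created with
val plan = byParity.queryExecution.analyzed

// Print the operator tree with one numbered line per operator
println(plan.numberedTreeString)
```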

Credits

  • The example with customer orders and postal codes is borrowed from Apache Beam’s Using GroupByKey Programming Guide.