spark-sql-streaming-DataStreamReader.adoc

File metadata and controls

185 lines (129 loc) · 6.06 KB

DataStreamReader — Loading Data from Streaming Data Source

DataStreamReader is the interface for describing how data is loaded into a streaming Dataset from a streaming data source, i.e. by format, schema and loading options.

Figure 1. DataStreamReader and The Others (diagram showing DataStreamReader, SparkSession and StreamingRelation)

A Spark developer uses DataStreamReader to describe how Spark Structured Streaming loads datasets from a streaming source (which in the end creates a logical plan for a streaming query).

Note
DataStreamReader is the Spark developer-friendly API to create a StreamingRelation logical operator (that represents a streaming source in a logical plan).

You can access DataStreamReader using SparkSession.readStream method.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

val streamReader = spark.readStream
Table 1. DataStreamReader’s Methods

Method   | Description
csv      | Sets csv as the source format
format   | Sets the format of datasets
json     | Sets json as the source format
load     | Loads data from a streaming source to a streaming Dataset
option   | Sets a loading option
options  | Sets one or more loading options
parquet  | Sets parquet as the source format
schema   | Sets the schema of datasets
text     | Sets text as the source format
textFile | Returns Dataset[String] (not DataFrame)

DataStreamReader supports many source formats natively and offers the interface to define custom formats:

Note
DataStreamReader assumes the parquet file format by default, which you can change using the spark.sql.sources.default property.
Note
hive source format is not supported.

After you have described the streaming pipeline to read datasets from an external streaming data source, you eventually trigger the loading using the format-agnostic load operator or a format-specific one (e.g. json, csv).
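For instance, a complete read pipeline with the format-agnostic load operator can be sketched as follows (using the built-in rate source, which generates rows at a fixed rate and needs no external input):

```scala
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

// Describe the source by format and options, then trigger loading with load()
val rates = spark.readStream
  .format("rate")               // built-in testing source
  .option("rowsPerSecond", "5") // rate-source option
  .load()                       // streaming Dataset (rates.isStreaming == true)
```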

Table 2. DataStreamReader’s Internal Properties (in alphabetical order)

Name                | Initial Value                      | Description
extraOptions        | (empty)                            | Collection of key-value configuration options
source              | spark.sql.sources.default property | Source format of datasets in a streaming data source
userSpecifiedSchema | (empty)                            | Optional user-defined schema

Specifying Format — format Method

format(source: String): DataStreamReader

format specifies the source format of datasets in a streaming data source.

Internally, format sets the source internal property.
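A minimal sketch: format only records the source name, and no data is read until load is called.

```scala
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

// Nothing is loaded yet -- format merely sets the source internal property
val reader = spark.readStream.format("rate")
```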

Specifying Schema — schema Method

schema(schema: StructType): DataStreamReader
schema(schemaString: String): DataStreamReader  // (1)
  1. Uses the input DDL-formatted string

schema specifies the schema of the streaming data source.

Internally, schema sets userSpecifiedSchema internal property.
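Both variants can be sketched as follows (the column names are hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
val spark: SparkSession = ...

// Programmatic schema using StructType
val userSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))
val r1 = spark.readStream.schema(userSchema)

// Equivalent DDL-formatted string
val r2 = spark.readStream.schema("id LONG, name STRING")
```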

Specifying Loading Options — option Method

option(key: String, value: String): DataStreamReader
option(key: String, value: Boolean): DataStreamReader
option(key: String, value: Long): DataStreamReader
option(key: String, value: Double): DataStreamReader

option family of methods specifies additional options to a streaming data source.

Values of String, Boolean, Long, and Double types are supported for user convenience; internally, they are all converted to String type.

Internally, option sets extraOptions internal property.
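The typed overloads can be sketched as follows (the option names belong to the built-in rate source):

```scala
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

val reader = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10L)   // Long overload
  .option("numPartitions", 2)     // Int widens to the Long overload
  .option("rampUpTime", "5s")     // String overload
// All values end up as Strings in the extraOptions internal property
```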

Note
You can also set options in bulk using options method. You have to do the type conversion yourself, though.

Specifying Loading Options — options Method

options(options: scala.collection.Map[String, String]): DataStreamReader

options method allows specifying one or many options of the streaming input data source.

Note
You can also set options one by one using option method.
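A sketch of the bulk variant (note that, unlike with option, the values must already be Strings):

```scala
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

val loadingOptions = Map(
  "rowsPerSecond" -> "10",
  "numPartitions" -> "2")

val reader = spark.readStream
  .format("rate")
  .options(loadingOptions)
```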

Loading Data From Streaming Source (to Streaming Dataset) — load Method

load(): DataFrame
load(path: String): DataFrame // (1)
  1. Specifies path option before passing the call to parameterless load()

load loads data from a streaming data source to a streaming dataset.

Internally, load first creates a DataSource (using user-specified schema, the name of the source and options) followed by creating a DataFrame with a StreamingRelation logical operator (for the DataSource).

load makes sure that the name of the source is not hive. Otherwise, load reports an AnalysisException:

Hive data source can only be used with tables, you can not read files of Hive data source directly.
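A sketch of both variants (the input path is hypothetical):

```scala
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

// Parameterless load() for sources that need no path
val rates = spark.readStream.format("rate").load()

// load(path) sets the path option first, then delegates to load()
val lines = spark.readStream
  .format("text")
  .load("/tmp/streaming-input")
```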

Built-in Formats

json(path: String): DataFrame
csv(path: String): DataFrame
parquet(path: String): DataFrame
text(path: String): DataFrame
textFile(path: String): Dataset[String] // (1)
  1. Returns Dataset[String] not DataFrame

DataStreamReader can load streaming datasets from data sources of the following formats:

  • json

  • csv

  • parquet

  • text

Internally, these methods simply call format followed by load(path).
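For example, the following two pipelines are equivalent (the input path is hypothetical):

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
val spark: SparkSession = ...

val viaShortcut = spark.readStream.text("/tmp/in")
val viaFormat   = spark.readStream.format("text").load("/tmp/in")

// textFile returns Dataset[String] rather than a DataFrame
val strings: Dataset[String] = spark.readStream.textFile("/tmp/in")
```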