OffsetSeqLog — HDFSMetadataLog with OffsetSeq Metadata

OffsetSeqLog is an HDFSMetadataLog whose metadata is of type OffsetSeq.

Note
HDFSMetadataLog is a MetadataLog that uses Hadoop HDFS for reliable storage.

OffsetSeqLog is created exclusively as the write-ahead log of offsets in StreamExecution.

OffsetSeqLog uses OffsetSeq for metadata. An OffsetSeq holds an ordered collection of zero or more offsets and optional metadata (as OffsetSeqMetadata, which keeps track of the event-time watermark as set up by a Spark developer and as found in the records).

serialize Method

serialize(offsetSeq: OffsetSeq, out: OutputStream): Unit
Note
serialize is part of the HDFSMetadataLog Contract to write metadata in a serialized format.

serialize first writes the version prefixed with v on a single line (e.g. v1), followed by the optional metadata in JSON format.

Note
The version in Spark 2.2 is 1, and the charset is UTF-8.

serialize then writes out the offsets in JSON format, one per line.

Note
A streaming source with no offset to write in offsetSeq is marked as - (a dash) in the log.
$ ls -tr [checkpoint-directory]/offsets
0 1 2 3 4 5 6

$ cat [checkpoint-directory]/offsets/6
v1
{"batchWatermarkMs":0,"batchTimestampMs":1502872590006,"conf":{"spark.sql.shuffle.partitions":"200","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
51
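
The layout described above (a version line, a metadata line, then one line per offset) can be sketched in a few lines of Scala. This is a minimal illustration and not Spark's implementation: SimpleOffsetSeq, OffsetLogFormatSketch and serializeToString are hypothetical names, offsets and metadata are modelled as JSON strings that have already been rendered, and writing an empty line when the metadata is absent is an assumption of the sketch.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

object OffsetLogFormatSketch {
  // Hypothetical stand-in for OffsetSeq: offsets and metadata are plain JSON strings.
  final case class SimpleOffsetSeq(offsets: Seq[Option[String]], metadata: Option[String])

  // The version the text reports for Spark 2.2.
  val Version = 1

  def serialize(offsetSeq: SimpleOffsetSeq, out: OutputStream): Unit = {
    // First line: the version prefixed with "v".
    out.write(s"v$Version".getBytes(UTF_8))
    // Second line: the metadata JSON (assumption: an empty line when there is none).
    out.write('\n')
    out.write(offsetSeq.metadata.getOrElse("").getBytes(UTF_8))
    // Remaining lines: one offset per source, "-" for a source with no offset.
    offsetSeq.offsets.foreach { offset =>
      out.write('\n')
      out.write(offset.getOrElse("-").getBytes(UTF_8))
    }
  }

  // Convenience helper for inspecting the serialized form as text.
  def serializeToString(offsetSeq: SimpleOffsetSeq): String = {
    val out = new ByteArrayOutputStream()
    serialize(offsetSeq, out)
    new String(out.toByteArray, UTF_8)
  }
}
```

Calling serializeToString with two sources, only the first of which has an offset, yields four lines: the version, the metadata, the first offset, and a dash for the second source.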

deserialize Method

deserialize(in: InputStream): OffsetSeq
Caution
FIXME
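
Until this section is filled in, the following sketch shows one way to read back the layout that serialize is described as writing. It is not Spark's deserialize: OffsetLogReaderSketch and SimpleOffsetSeq are hypothetical names, offsets and metadata are kept as raw JSON strings, and treating a blank second line as "no metadata" is an assumption.

```scala
import java.io.InputStream
import scala.io.Source

object OffsetLogReaderSketch {
  // Hypothetical stand-in for OffsetSeq: offsets and metadata are plain JSON strings.
  final case class SimpleOffsetSeq(offsets: Seq[Option[String]], metadata: Option[String])

  def deserialize(in: InputStream): SimpleOffsetSeq = {
    val lines = Source.fromInputStream(in, "UTF-8").getLines()
    // First line: the version, e.g. "v1".
    require(lines.hasNext, "Incomplete log file: missing version line")
    val version = lines.next()
    require(version.matches("v\\d+"), s"Malformed version line: $version")
    // Second line: metadata JSON (assumption: a blank line means no metadata).
    val metadata = if (lines.hasNext) Some(lines.next()).filter(_.nonEmpty) else None
    // Remaining lines: one offset each; "-" marks a source without an offset.
    val offsets = lines.map(line => if (line == "-") None else Some(line)).toList
    SimpleOffsetSeq(offsets, metadata)
  }
}
```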

Creating OffsetSeqLog Instance

OffsetSeqLog takes the following when created:

  • SparkSession

  • Path of the metadata log directory

OffsetSeq.toStreamProgress Method

toStreamProgress(sources: Seq[Source]): StreamProgress

toStreamProgress creates a StreamProgress with only the sources that have offsets yet to be processed.
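
The filtering that toStreamProgress performs can be illustrated with a small self-contained sketch. The Source, Offset and StreamProgress types below are simplified, hypothetical stand-ins rather than Spark's classes, and the sketch assumes that offsets are matched to sources by position and that a missing offset is represented as None.

```scala
object StreamProgressSketch {
  // Hypothetical, simplified stand-ins for Spark's Source, Offset and StreamProgress.
  final case class Source(name: String)
  final case class Offset(json: String)
  type StreamProgress = Map[Source, Offset]

  def toStreamProgress(sources: Seq[Source], offsets: Seq[Option[Offset]]): StreamProgress = {
    // Assumption: the i-th offset belongs to the i-th source.
    require(sources.size == offsets.size,
      s"Expected ${sources.size} offsets, got ${offsets.size}")
    // Keep only the sources that actually have an offset.
    sources.zip(offsets).collect { case (source, Some(offset)) => source -> offset }.toMap
  }
}
```

With two sources of which only the first has an offset, the resulting map contains a single entry for that first source.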