[SPARK-51358] [SS] Introduce snapshot upload lag detection through StateStoreCoordinator #50123
base: master
Conversation
stateStoreSnapshotVersions.getOrElse(storeProviderId, SnapshotUploadEvent(-1, 0))
logWarning(
  s"State store falling behind $storeProviderId " +
  s"(current: $snapshotEvent, latest: $latestSnapshot)"
Maybe you can log the number of versions behind/time since last upload?
Also - if this is the log line we have to look for, maybe you can add a prefix that is easy to grep for.
Good idea, added StateStoreCoordinator Snapshot Lag as a prefix for now. Thanks!
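For reference, here is a rough, self-contained sketch of what a warning carrying both the greppable prefix and the lag details suggested above could look like. The `SnapshotUploadEvent` fields and the `logSnapshotLag` helper are simplified stand-ins for illustration, not the PR's actual code.

```scala
// Simplified stand-in for the coordinator's upload event (version + upload time).
case class SnapshotUploadEvent(version: Long, timestampMs: Long)

// Hypothetical helper: emits one greppable line with versions behind and time since upload.
def logSnapshotLag(
    storeProviderId: String,
    lastUpload: SnapshotUploadEvent,
    latestVersion: Long,
    currentTimeMs: Long): Unit = {
  val versionsBehind = latestVersion - lastUpload.version
  val millisSinceUpload = currentTimeMs - lastUpload.timestampMs
  // The fixed "StateStoreCoordinator Snapshot Lag" prefix makes the line easy to grep for.
  println(
    s"StateStoreCoordinator Snapshot Lag - state store falling behind $storeProviderId " +
    s"(versions behind: $versionsBehind, time since last upload: $millisSinceUpload ms)")
}
```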
.start()
inputData.addData(1, 2, 3)
query.processAllAvailable()
inputData.addData(1, 2, 3)
Why are you doing this twice rather than all together?
It's more that I need multiple query.processAllAvailable() calls to commit and progress to a new version, but I also didn't want to run batches with no data. I'll add a comment to make this clearer 👍
Great thanks!
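To illustrate the pattern being discussed, here is a simplified, self-contained sketch (not the suite's actual test) of why the addData/processAllAvailable pair is repeated: each pair commits one micro-batch and advances the state store version, and feeding data each time avoids running an empty batch. The query name and session setup are assumptions for the sketch.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").appName("snapshot-lag-sketch").getOrCreate()
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

val inputData = MemoryStream[Int]
val query = inputData.toDF()
  .groupBy("value").count()          // stateful aggregation, so each batch touches the state store
  .writeStream
  .format("memory")
  .queryName("snapshot_lag_sketch")
  .outputMode("complete")
  .start()

// First micro-batch: commits one state store version.
inputData.addData(1, 2, 3)
query.processAllAvailable()
// Second micro-batch: commits the next version; repeated on purpose so the
// query advances versions without ever processing an empty batch.
inputData.addData(1, 2, 3)
query.processAllAvailable()

query.stop()
spark.stop()
```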
@@ -38,9 +38,14 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.Platform
import org.apache.spark.util.{NonFateSharingCache, Utils}

/** Trait representing the different events reported from RocksDB instance */
trait RocksDBEventListener {
Why RocksDBEventListener? Are we not adding this to HDFS later too?
RocksDB state stores are a bit special because the event is reported from within the state store's RocksDB instance, whereas HDFS-backed stores report straight from the state store itself. This trait would not be needed for the HDFS state stores.
Got it. Can you make the docstring a bit more descriptive?
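For illustration, one way the docstring could be made more descriptive, based on the explanation in this thread. The wording and the callback signature below are assumptions, not the PR's final code.

```scala
/**
 * Callback interface used by a RocksDB state store instance to report lifecycle
 * events (e.g. a successful snapshot upload) back to its provider, which then
 * forwards them to the StateStoreCoordinator via RPC. HDFS-backed state stores
 * report directly from the provider, so they do not need this trait.
 */
trait RocksDBEventListener {
  /** Hypothetical callback: invoked after a snapshot for `version` has been uploaded. */
  def reportSnapshotUploaded(version: Long): Unit
}
```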
What changes were proposed in this pull request?
SPARK-51358
This PR adds detection logic and logging for delays in snapshot uploads across all state store instances. The main snapshot upload reporting is done through RPC calls from RocksDB.scala to the StateStoreCoordinator, so that events do not depend on streaming query progress reports.
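To make the flow above concrete, here is a minimal, self-contained sketch of the idea on the coordinator side, using illustrative names (SnapshotLagTracker, reportSnapshotUploaded, findLaggingStores) rather than Spark's actual RPC endpoints or classes: executors report each uploaded snapshot version, and the driver periodically checks which providers have fallen too far behind.

```scala
import scala.collection.mutable

// Simplified stand-ins for Spark's provider id and upload event types.
case class StateStoreProviderId(operatorId: Long, partitionId: Int, storeName: String)
case class SnapshotUploadEvent(version: Long, timestampMs: Long)

class SnapshotLagTracker(maxVersionsBehind: Long) {
  private val latestUploads = mutable.Map[StateStoreProviderId, SnapshotUploadEvent]()

  // Conceptually invoked via RPC each time an executor finishes uploading a snapshot.
  def reportSnapshotUploaded(id: StateStoreProviderId, event: SnapshotUploadEvent): Unit =
    synchronized {
      if (latestUploads.get(id).forall(_.version < event.version)) {
        latestUploads(id) = event
      }
    }

  // Periodic check on the driver: flag providers whose last uploaded snapshot
  // is too many versions behind the most recently committed version.
  def findLaggingStores(latestCommittedVersion: Long): Seq[StateStoreProviderId] =
    synchronized {
      latestUploads.collect {
        case (id, event) if latestCommittedVersion - event.version > maxVersionsBehind => id
      }.toSeq
    }
}
```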
Why are the changes needed?
This enables observability through dashboards and alerts, helping us understand how frequently snapshot uploads lag in production.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Five new tests are added in StateStoreCoordinatorSuite, covering both join and non-join stateful queries. One of these tests verifies that the snapshot lag check is only performed when changelog checkpointing is enabled.
Was this patch authored or co-authored using generative AI tooling?
No