[GLUTEN-11915][VL] Support RowBasedChecksum in ColumnarShuffleWriter (SPARK-51756)#12067
jaylisde wants to merge 1 commit into
Conversation
Do you mean https://issues.apache.org/jira/browse/SPARK-51756?

Thanks @wForget for catching that. The correct reference should be SPARK-54663. I'll update the PR title and description.

SPARK-54663 proposes a row-based checksum, but the current implementation is based on the data file.
Run Gluten Clickhouse CI on x86
Thanks @wForget. Updated to a proper RowBasedChecksum (SPARK-51756) with native per-row XXH64 + order-independent aggregation. The file-based checksum will be a follow-up PR.
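The "per-row XXH64 + order-independent aggregation" mentioned above can be sketched as follows. This is a minimal illustration, not the PR's code: `mix()` is a Murmur3-style stand-in for the real XXH64, `rowHash` hashes a string where the native writer hashes serialized `UnsafeRowFast` bytes, and the way the XOR and SUM components are finally folded together is an assumption.

```java
import java.util.List;

public class RowChecksumSketch {
  // Stand-in 64-bit avalanche mixer (assumption: the real code uses XXH64).
  static long mix(long x) {
    x ^= x >>> 33;
    x *= 0xff51afd7ed558ccdL;
    x ^= x >>> 33;
    x *= 0xc4ceb9fe1a85ec53L;
    x ^= x >>> 33;
    return x;
  }

  // Hash one serialized row (the native writer hashes UnsafeRowFast bytes).
  static long rowHash(String row) {
    long h = 0;
    for (byte b : row.getBytes()) {
      h = mix(h * 31 + b);
    }
    return h;
  }

  // XOR and SUM both commute, so the per-partition checksum is identical
  // for any row order. Folding the two components together at the end is
  // illustrative; the PR only states "per-partition XOR+SUM".
  static long partitionChecksum(List<String> rows) {
    long xor = 0;
    long sum = 0;
    for (String row : rows) {
      long h = rowHash(row);
      xor ^= h;
      sum += h;
    }
    return mix(sum) ^ xor;
  }
}
```

Because both reductions are commutative and associative, a retried task that emits the same rows in a different order produces the same checksum, while a retry that emits different rows (e.g., after round-robin repartitioning) almost certainly does not.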
@marin-ma, could you take a look when you get a chance?
… (SPARK-51756)

Implement order-independent row-based checksum for non-deterministic stage retry detection.
- C++ `computeRowBasedChecksums()`: UnsafeRowFast + XXH64, per-partition XOR+SUM
- JNI: pass config, return checksum array
- Scala: read SQLConf (OR logic), pass to native, use for MapStatus
- Shim: `GlutenMapStatusUtil` for Spark 3.3-4.1 compatibility
- Tests: C++ unit (4/4) + Scala integration (3/3)
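The JNI and Scala steps in the commit message can be pictured as a small fold: the native writer returns one checksum per partition, and the writer side reduces them to the single long that `MapStatus.checksumValue` carries. This sketch is hypothetical; an XOR fold is shown because it is order-independent, but the actual combining rule is not stated in the PR.

```java
public class MapStatusChecksumSketch {
  // Fold the per-partition checksum array returned over JNI into the one
  // long that MapStatus.checksumValue can hold. XOR makes the result
  // independent of partition enumeration order (combining rule assumed).
  static long toMapStatusChecksum(long[] perPartitionChecksums) {
    long acc = 0L;
    for (long c : perPartitionChecksums) {
      acc ^= c;
    }
    return acc;
  }
}
```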
Summary

Spark 4.1 introduced RowBasedChecksum (SPARK-51756) for detecting non-deterministic stage retries. When `spark.sql.shuffle.orderIndependentChecksum.enabled` or `spark.sql.shuffle.checksum.mismatchFullRetry.enabled` is true, the shuffle writer must compute an order-independent per-row checksum and pass it via `MapStatus.checksumValue` to the driver for comparison across task attempts.

Problem: Gluten's `ColumnarShuffleWriter` always returns `checksumValue = 0`, causing the driver to skip non-deterministic retry detection. If a task retry produces different output (e.g., due to round-robin partitioning), downstream consumers may silently read inconsistent data without triggering a full stage retry.

Fix: Implement native C++ row-based checksum computation in `VeloxHashShuffleWriter`. For each row in `doSplit()`, serialize via `UnsafeRowFast` and compute an XXH64 hash. Aggregate per partition using XOR+SUM (order-independent). Return the checksum array via JNI to the Scala layer, which passes the aggregated value to `MapStatus.checksumValue`.

Changes

- `VeloxHashShuffleWriter.cc`: Added `computeRowBasedChecksums()` using UnsafeRowFast + XXH64 with per-partition XOR+SUM aggregation.
- `Options.h`, `ShuffleWriter.h/cc`: Added `rowBasedChecksumEnabled` option and `rowBasedChecksums()` accessor.
- `JniWrapper.cc`: Accept a boolean config param, return the checksum array.
- `GlutenSplitResult.java`, `ShuffleWriterJniWrapper.java`: Added `rowBasedChecksums` field and param.
- `ColumnarShuffleWriter.scala`: Read SQLConf (OR logic), pass to native, use for MapStatus.
- `GlutenMapStatusUtil.scala` (shims/spark33-41): Cross-version MapStatus compatibility.
- `RowBasedChecksumTest.cc`: C++ unit test for order-independence, null handling, determinism.
- `GlutenMapStatusEndToEndSuite.scala`: Integration test with `ansiFallback=false`.

Test

- `GlutenMapStatusEndToEndSuite`: 3/3 pass (propagation, deterministic, data-change detection)

Partially addresses #11915.
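The "OR logic" in the `ColumnarShuffleWriter.scala` change can be sketched as below. The two flag names come from the PR text; reading them from a plain `Map` is a stand-in for SQLConf, so the helper itself is hypothetical.

```java
import java.util.Map;

public class ConfOrLogicSketch {
  // Row-based checksums are enabled when either flag is set (OR logic).
  // Flag names are from the PR description; defaults assumed false.
  static boolean rowBasedChecksumEnabled(Map<String, String> conf) {
    boolean orderIndependent = Boolean.parseBoolean(
        conf.getOrDefault("spark.sql.shuffle.orderIndependentChecksum.enabled", "false"));
    boolean mismatchFullRetry = Boolean.parseBoolean(
        conf.getOrDefault("spark.sql.shuffle.checksum.mismatchFullRetry.enabled", "false"));
    return orderIndependent || mismatchFullRetry;
  }
}
```

The resulting boolean is what the Scala layer would pass through the JNI wrapper to enable the native computation.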
Note
File-based shuffle checksum (a `.checksum` file with ADLER32 for corruption diagnosis) will be addressed in a follow-up PR.