Skip to content

Conversation

@yaooqinn
Copy link
Member

What changes were proposed in this pull request?

This PR adds Native KQUEUE via JNI support for transport, such as shuffle, file, and rpc procedures

Why are the changes needed?

Feature parity between Linux and MacOS/BSD platforms

Does this PR introduce any user-facing change?

Yes, a new option for io.mode

How was this patch tested?

new unit tests

Was this patch authored or co-authored using generative AI tooling?

no

@github-actions github-actions bot added the CORE label Oct 23, 2025
return switch (mode) {
case NIO -> new NioEventLoopGroup(numThreads, threadFactory);
case EPOLL -> new EpollEventLoopGroup(numThreads, threadFactory);
case KQUEUE -> new KQueueEventLoopGroup(numThreads, threadFactory);
Copy link
Contributor

@LuciferYang LuciferYang Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a deprecated API usage.
Of course, we can fix it uniformly in a separate pr

@sarutak
Copy link
Member

sarutak commented Oct 23, 2025

ShuffleNettyKQueueSuite doesn't finish on my Mac.

[info] ShuffleNettyKQueueSuite:
[info] - groupByKey without compression (804 milliseconds)
[info] *** Test still running after 6 minutes, 53 seconds: suite name: ShuffleNettyKQueueSuite, test name: shuffle non-zero block size. 
[info] *** Test still running after 11 minutes, 53 seconds: suite name: ShuffleNettyKQueueSuite, test name: shuffle non-zero block size. 
[info] *** Test still running after 16 minutes, 53 seconds: suite name: ShuffleNettyKQueueSuite, test name: shuffle non-zero block size. 

@yaooqinn Does this test pass on your Mac?

@dongjoon-hyun
Copy link
Member

dongjoon-hyun commented Oct 23, 2025

+1, LGTM.

Thank you, @yaooqinn , @LuciferYang , @sarutak . Merged to master for Apache Spark 4.1.0-preview3.

I manually verified on my Mac. It works.

$ sw_vers
ProductName:		macOS
ProductVersion:		26.0.1
BuildVersion:		25A362

$ build/sbt "core/testOnly *.ShuffleNetty*Suite"
...
[info] ShuffleNettyEpollSuite:
[info] - groupByKey without compression [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - shuffle non-zero block size [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - shuffle serializer [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - zero sized blocks [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - zero sized blocks without kryo [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - shuffle on mutable pairs [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - sorting on mutable pairs [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - cogroup using mutable pairs [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - subtract mutable pairs [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - sort with Java non serializable class - Kryo [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - sort with Java non serializable class - Java [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - shuffle with different compression settings (SPARK-3426) [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - [SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - cannot find its local shuffle file if no execution of the stage and rerun shuffle [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - metrics for shuffle without aggregation [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - metrics for shuffle with aggregation [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - multiple simultaneous attempts for one task (SPARK-8029) [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - SPARK-34541: shuffle can be removed [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - SPARK-36206: shuffle checksum detect disk corruption [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] - SPARK-39771: warn when shuffle block number is too large [disabled on Mac OS X with EPOLL] !!! IGNORED !!!
[info] ShuffleNettyNioSuite:
[info] - groupByKey without compression (1 second, 253 milliseconds)
[info] - shuffle non-zero block size (2 seconds, 705 milliseconds)
[info] - shuffle serializer (2 seconds, 321 milliseconds)
[info] - zero sized blocks (3 seconds, 414 milliseconds)
[info] - zero sized blocks without kryo (3 seconds, 377 milliseconds)
[info] - shuffle on mutable pairs (2 seconds, 205 milliseconds)
[info] - sorting on mutable pairs (2 seconds, 205 milliseconds)
[info] - cogroup using mutable pairs (2 seconds, 117 milliseconds)
[info] - subtract mutable pairs (2 seconds, 24 milliseconds)
[info] - sort with Java non serializable class - Kryo (2 seconds, 298 milliseconds)
[info] - sort with Java non serializable class - Java (1 second, 618 milliseconds)
[info] - shuffle with different compression settings (SPARK-3426) (170 milliseconds)
[info] - [SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file (276 milliseconds)
[info] - cannot find its local shuffle file if no execution of the stage and rerun shuffle (38 milliseconds)
[info] - metrics for shuffle without aggregation (132 milliseconds)
[info] - metrics for shuffle with aggregation (200 milliseconds)
[info] - multiple simultaneous attempts for one task (SPARK-8029) (31 milliseconds)
[info] - SPARK-34541: shuffle can be removed (65 milliseconds)
[info] - SPARK-36206: shuffle checksum detect disk corruption (3 seconds, 345 milliseconds)
[info] - SPARK-39771: warn when shuffle block number is too large (75 milliseconds)
[info] ShuffleNettyKQueueSuite:
[info] - groupByKey without compression (344 milliseconds)
[info] - shuffle non-zero block size (2 seconds, 198 milliseconds)
[info] - shuffle serializer (2 seconds, 269 milliseconds)
[info] - zero sized blocks (3 seconds, 131 milliseconds)
[info] - zero sized blocks without kryo (3 seconds, 415 milliseconds)
[info] - shuffle on mutable pairs (2 seconds, 382 milliseconds)
[info] - sorting on mutable pairs (2 seconds, 350 milliseconds)
[info] - cogroup using mutable pairs (2 seconds, 159 milliseconds)
[info] - subtract mutable pairs (2 seconds, 257 milliseconds)
[info] - sort with Java non serializable class - Kryo (2 seconds, 289 milliseconds)
[info] - sort with Java non serializable class - Java (1 second, 710 milliseconds)
[info] - shuffle with different compression settings (SPARK-3426) (130 milliseconds)
[info] - [SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file (283 milliseconds)
[info] - cannot find its local shuffle file if no execution of the stage and rerun shuffle (39 milliseconds)
[info] - metrics for shuffle without aggregation (81 milliseconds)
[info] - metrics for shuffle with aggregation (133 milliseconds)
[info] - multiple simultaneous attempts for one task (SPARK-8029) (25 milliseconds)
[info] - SPARK-34541: shuffle can be removed (36 milliseconds)
[info] - SPARK-36206: shuffle checksum detect disk corruption (3 seconds, 415 milliseconds)
[info] - SPARK-39771: warn when shuffle block number is too large (31 milliseconds)
[info] Run completed in 59 seconds, 880 milliseconds.
[info] Total number of tests run: 40
[info] Suites: completed 3, aborted 0
[info] Tests: succeeded 40, failed 0, canceled 0, ignored 20, pending 0
[info] All tests passed.
[success] Total time: 92 s (0:01:32.0), completed Oct 23, 2025, 10:35:14 AM

@sarutak
Copy link
Member

sarutak commented Oct 23, 2025

I confirmed this test passes on my clean environment.
Sorry for confusing.

@dongjoon-hyun
Copy link
Member

No problem at all, @sarutak . Thank you for letting us know. 😄

@yaooqinn
Copy link
Member Author

Thank you @LuciferYang @sarutak @dongjoon-hyun

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants