Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51341][CORE] Cancel time task with suitable way. #50107

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

beliefer
Copy link
Contributor

What changes were proposed in this pull request?

This PR proposes to cancel task with suitable way.

Why are the changes needed?

According to the discussion at
#47956 (comment)
TimerTask.cancel() doesn't work.

Does this PR introduce any user-facing change?

'No'.

How was this patch tested?

GA

Was this patch authored or co-authored using generative AI tooling?

'No'.

@beliefer beliefer changed the title [SPARK-51341][SQL] Cancel time task with suitable way. [SPARK-51341][CORE] Cancel time task with suitable way. Feb 27, 2025
assert(executor.isInstanceOf[ScheduledThreadPoolExecutor])
executor.asInstanceOf[ScheduledThreadPoolExecutor]
}
private val timer = ThreadUtils.newSingleThreadScheduledExecutor(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The origin code is non-daemon thread here.
Please see
5d5b3a5#diff-e9f66d336ded6632366be06dbd70e74afe9595cfc92a962f2c8554f09af53d0fL286

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timer is never shut down. Is there a decent place to do it? otherwise, I think daemon is correct here. Non-daemon threads would stop the JVM from exiting, in theory

Copy link
Contributor Author

@beliefer beliefer Mar 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because all the BarrierTaskContext share the timer, so we can't shut down it.
It seems good to revert it with daemon thread.

Comment on lines +124 to +127
def stop(): Unit = {
timerFuture.cancel(true)
ThreadUtils.shutdown(timer)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the change @beliefer .
Interested in knowing how this change in the ui consoleprogress bar was tested?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honestly, I don't know it too.

@beliefer beliefer changed the title [SPARK-51341][CORE] Cancel time task with suitable way. [WIP][SPARK-51341][CORE] Cancel time task with suitable way. Feb 28, 2025
@beliefer beliefer changed the title [WIP][SPARK-51341][CORE] Cancel time task with suitable way. [SPARK-51341][CORE] Cancel time task with suitable way. Feb 28, 2025
@beliefer
Copy link
Contributor Author

beliefer commented Feb 28, 2025

assert(executor.isInstanceOf[ScheduledThreadPoolExecutor])
executor.asInstanceOf[ScheduledThreadPoolExecutor]
}
private val timer = ThreadUtils.newSingleThreadScheduledExecutor(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timer is never shut down. Is there a decent place to do it? otherwise, I think daemon is correct here. Non-daemon threads would stop the JVM from exiting, in theory

@@ -169,6 +171,7 @@ private[spark] class TaskSchedulerImpl(
protected val executorIdToHost = new HashMap[String, String]

private val abortTimer = ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-abort-timer")
private val abortFutures = new CopyOnWriteArrayList[ScheduledFuture[_]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why CopyOnWriteArrayList in particular? maybe fine but would a simple synchronized list be easier?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. less reads and more writes here.

@@ -959,7 +965,9 @@ private[spark] class TaskSchedulerImpl(
barrierCoordinator.stop()
}
}
starvationFutures.asScala.foreach(_.cancel(false))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My last concern is, do these Futures build up and cause a potential memory leak? I don't think they hold onto large result objects. The Future itself isn't big either. So maybe not an issue. I don't know of a clean way to delete them on completion of individual tasks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good thinking. Let me revert this part. Maybe ThreadUtils.shutdown(starvationTimer) is good enough.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants