Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Threadpool merge scheduler #120869

Open
wants to merge 198 commits into
base: main
Choose a base branch
from

Conversation

albertzaharovits
Copy link
Contributor

@albertzaharovits albertzaharovits commented Jan 26, 2025

This adds a new merge scheduler implementation that uses a (new) dedicated thread pool to run the merges. This way the number of concurrent merges is limited to the number of threads in the pool (i.e. the number of allocated processors to the ES JVM).

It implements dynamic IO throttling (the same target IO rate for all merges, roughly, with caveats) that's adjusted based on the number of currently active (queued + running) merges.
Smaller merges are always preferred to larger ones, irrespective of the index shard that they're coming from.
The implementation also supports the per-shard "max thread count" and "max merge count" settings, the later being used today for indexing throttling.
Note that IO throttling, max merge count, and max thread count work similarly, but not identical, to their siblings in the ConcurrentMergeScheduler.

The per-shard merge statistics are not affected, and the thread-pool statistics should reflect the merge ones (i.e. the completed thread pool stats reflects the total number of merges, across shards, per node).

Copy link
Contributor

@henningandersen henningandersen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Read through the main parts, left some comments, let me know if we need to discuss any of them.

smallestMergeTask = null;
// the merge task is backlogged by the merge scheduler, try to get the next smallest one
// it's then the duty of the said merge scheduler to re-enqueue the backlogged merge task when it can be run
} catch (InterruptedException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we only use interrupt at shutdown time, perhaps we should not loop then but rather exit? Or do we need to run a merge regardless? It seems like at shutdown we may never execute the runnable (might be rejected) so it feels slight inconsistent?

}
}

private void maybeUpdateIORateBytesPerSec(int currentlySubmittedIOThrottledMergeTasks) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method is more or less updateAndGet with a special case for when the calculation results in the same value (since then we do not want to update all threads).

I wonder if it would be clearer to have an updateAndGet method that is a copy of the original updateAndGet, with the if (prev == next) condition returning -1 or Long.MIN_VALUE as a sentinel value?

As it is now, this seems quite custom and thus hard to read vs something that is just an updateAndGet and then the function as separate parts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I've pushed 507896e

}
}

void abortOnGoingMerge() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is slightly confusing. I think this only works if the task did not start yet. earlyAbort or abort would seem more suitable to me.

Can we verify that mergeStartTimeNS is not set yet as an assertion? And maybe set it to ensure run is not called?

And document in javadoc, that we expect one or the other only, never both on the same task.

Copy link
Contributor Author

@albertzaharovits albertzaharovits Mar 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 34ab7f6 (renamed to abort).

if (closed) {
// Do not backlog or execute tasks when closing the merge scheduler, instead abort them.
mergeTask.abortOnGoingMerge();
throw new ElasticsearchException("merge task aborted because scheduler is shutting down");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am somewhat in doubt about throwing here or returning true. I get that if we return true we should not call doMerge in abortOngoingMerge when called from this callsite. I'd probably find it slightly more intuitive, since the exceptional termination here seems like a 3rd return value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've refactored the runNowOrBacklog method to a different schedule method, that indeed has 3 return values: a217d12 .
Aborting and running merge tasks have to do some accounting and cleanup. Backlogging merge tasks doesn't need to do anything.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Distributed Indexing/Engine Anything around managing Lucene and the Translog in an open shard. >feature serverless-linked Added by automation, don't add manually Team:Distributed Indexing Meta label for Distributed Indexing team v9.1.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants