Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Threadpool merge scheduler #120869

Open
wants to merge 198 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 180 commits
Commits
Show all changes
198 commits
Select commit Hold shift + click to select a range
bf557d2
ExecutorMergeScheduler
albertzaharovits Jan 16, 2025
a3f87df
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 16, 2025
f5a1a8d
[CI] Auto commit changes from spotless
elasticsearchmachine Jan 16, 2025
f0b72fe
wrap for merge in the executor merge scheduler
albertzaharovits Jan 16, 2025
9b03950
spotless
albertzaharovits Jan 16, 2025
26e4043
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 17, 2025
aba69d0
Fix InternalEngineTests
albertzaharovits Jan 17, 2025
52796b5
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 17, 2025
c0667bf
implemented Throttling
albertzaharovits Jan 18, 2025
2da753f
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 18, 2025
2c8dc7f
[CI] Auto commit changes from spotless
elasticsearchmachine Jan 18, 2025
81cc0f1
Checkstyle
albertzaharovits Jan 18, 2025
f58120f
Fix threadpool size for SnapshotResiliencyTests
albertzaharovits Jan 19, 2025
5ca992d
Spotless
albertzaharovits Jan 19, 2025
3c203cb
Nit
albertzaharovits Jan 19, 2025
6c21654
Implemented max thread setting
albertzaharovits Jan 19, 2025
68079d9
Throttling ?
albertzaharovits Jan 19, 2025
7b68ba9
Checkstyle
albertzaharovits Jan 19, 2025
9e467a1
Indexing throttling !
albertzaharovits Jan 19, 2025
a8f5297
Better throttling logging
albertzaharovits Jan 19, 2025
928fd32
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 20, 2025
3f5b4a8
Don't wrap errors during merging
albertzaharovits Jan 20, 2025
0e714a1
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 20, 2025
0297cce
Merge branch 'main' into threadpool-merge-scheduler
albertzaharovits Jan 21, 2025
2b79809
Refresh config
albertzaharovits Jan 21, 2025
57c2a5c
Nit
albertzaharovits Jan 21, 2025
60a71b8
WIP
albertzaharovits Jan 23, 2025
68db209
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges
albertzaharovits Jan 23, 2025
4099ac5
IO throttling
albertzaharovits Jan 24, 2025
5554bc2
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges
albertzaharovits Jan 24, 2025
17b682f
[CI] Auto commit changes from spotless
elasticsearchmachine Jan 24, 2025
f3506da
this-escape
albertzaharovits Jan 24, 2025
6a3911a
Unregister
albertzaharovits Jan 24, 2025
00070cf
Sort merge schedulers
albertzaharovits Jan 24, 2025
0cf8b0c
Rename to v1
albertzaharovits Jan 24, 2025
f6ed56d
Hmmm, looks OK
albertzaharovits Jan 26, 2025
d1fb28d
Rename WIP
albertzaharovits Jan 26, 2025
d8ca2a9
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 26, 2025
ef0d757
rename in progress
albertzaharovits Jan 26, 2025
35a5ef6
Rename in progress 2
albertzaharovits Jan 26, 2025
33bd59c
Done?
albertzaharovits Jan 26, 2025
715c7e1
FollowingEngineTests.java
albertzaharovits Jan 26, 2025
2b2c790
[CI] Auto commit changes from spotless
elasticsearchmachine Jan 26, 2025
25d9eef
Ooops set IO rate only if merge task supports it
albertzaharovits Jan 26, 2025
27bffb2
remove mergeDone from onFailure
albertzaharovits Jan 27, 2025
573084d
Execute not submit!
albertzaharovits Jan 27, 2025
c87931e
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 27, 2025
3857959
synchronizedSet in ThreadPoolMergeQueue
albertzaharovits Jan 27, 2025
5dcf91b
ConcurrentCollections.newConcurrentSet()
albertzaharovits Jan 27, 2025
c5b05e5
submitMergeTask
albertzaharovits Jan 27, 2025
932f3ce
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 27, 2025
7b52d8e
Ooops
albertzaharovits Jan 27, 2025
a315198
remove explicit unthrottled IO rate
albertzaharovits Jan 27, 2025
cb95685
assert estimated merge bytes is set
albertzaharovits Jan 27, 2025
6a96a02
Remove ThreadLocal<MergeRateLimiter> onGoingMergeRateLimiter
albertzaharovits Jan 27, 2025
60b7b3a
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 27, 2025
d170de8
Ooops
albertzaharovits Jan 28, 2025
4c974af
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 28, 2025
d98f350
Node level setting
albertzaharovits Jan 28, 2025
f3c6d5f
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 28, 2025
7a84fdf
[CI] Auto commit changes from spotless
elasticsearchmachine Jan 28, 2025
cf11da1
Register setting
albertzaharovits Jan 28, 2025
69fc3fc
NOOP
albertzaharovits Jan 28, 2025
67c9a42
revert typo
albertzaharovits Jan 28, 2025
236a6db
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 28, 2025
aac7490
InternalTestCluster random merge scheduler
albertzaharovits Jan 28, 2025
bafc31b
IT
albertzaharovits Jan 28, 2025
9482d24
Do not build the thread pool executor if the merge scheduler is not used
albertzaharovits Jan 28, 2025
b8cdc13
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 28, 2025
da0a9e3
[CI] Auto commit changes from spotless
elasticsearchmachine Jan 28, 2025
04f82ba
Remove estimated merge bytes assertion
albertzaharovits Jan 28, 2025
37b9f81
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 28, 2025
42ee5a9
Remove unused random setting in test cluster
albertzaharovits Jan 29, 2025
5dc5d77
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 29, 2025
22df845
[CI] Auto commit changes from spotless
elasticsearchmachine Jan 29, 2025
08e272a
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 29, 2025
26037ba
Update docs/changelog/120869.yaml
albertzaharovits Jan 29, 2025
59ce648
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 30, 2025
d70ba6c
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 30, 2025
838cea7
Address review comments 1
albertzaharovits Jan 30, 2025
2b2c60d
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 30, 2025
b16307a
EngineThreadPoolMergeScheduler
albertzaharovits Jan 30, 2025
89eb0d7
enable/disable indexing throttling
albertzaharovits Jan 30, 2025
a9f4823
Update comment
albertzaharovits Jan 30, 2025
5e1b508
AtomicLong for merge start time
albertzaharovits Jan 30, 2025
c74d9e8
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 30, 2025
8ad122f
[CI] Auto commit changes from spotless
elasticsearchmachine Jan 30, 2025
6ae7425
no-op onFailure
albertzaharovits Jan 30, 2025
680868e
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Jan 30, 2025
779eace
Checkstyle
albertzaharovits Jan 30, 2025
6304852
make MergeTask a Runnable rather than an AbstractRunnable
albertzaharovits Jan 30, 2025
8d2c382
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Feb 14, 2025
189b1b7
InternalEngineMergeIT
albertzaharovits Feb 14, 2025
436ef52
Assert empty merge queue after internal IT
albertzaharovits Feb 14, 2025
57ad622
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Feb 15, 2025
c3d4001
Random merge queue in RefreshListenersTests
albertzaharovits Feb 15, 2025
b943275
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Feb 15, 2025
4fdd97f
Fix activeIOThrottledMergeTasksCount
albertzaharovits Feb 15, 2025
9f103e1
comment on maybeUpdateTargetMBPerSec
albertzaharovits Feb 16, 2025
32dfaf8
targetIORateBytesPerSec
albertzaharovits Feb 16, 2025
3e4f726
maybeUpdateIORateBytesPerSec
albertzaharovits Feb 16, 2025
091dcba
submittedIOThrottlingMergeTasks
albertzaharovits Feb 16, 2025
a0874ee
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Feb 16, 2025
79f8462
Nits
albertzaharovits Feb 17, 2025
f2b1472
ESIntegTestCase#getShardSegments
albertzaharovits Feb 17, 2025
9cb4f48
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Feb 17, 2025
5f9a293
[CI] Auto commit changes from spotless
elasticsearchmachine Feb 17, 2025
9d2d29d
Closing not quite there....
albertzaharovits Feb 18, 2025
1e78f21
closing WIP2
albertzaharovits Feb 20, 2025
137b8f5
Closeable finally sorted out
albertzaharovits Feb 20, 2025
2d42ded
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Feb 20, 2025
a2a6362
Maybe
albertzaharovits Feb 20, 2025
1348863
Avoid wait/notify
albertzaharovits Feb 20, 2025
cc24fce
[CI] Auto commit changes from spotless
elasticsearchmachine Feb 20, 2025
8bd1d7d
Fix close
albertzaharovits Feb 20, 2025
0431299
Nits
albertzaharovits Feb 20, 2025
2b14346
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Feb 20, 2025
6401d86
[CI] Auto commit changes from spotless
elasticsearchmachine Feb 20, 2025
b09863a
Rename ThreadPoolMergeQueue -> ThreadPoolMergeExecutorService
albertzaharovits Feb 21, 2025
2614b5d
Nit
albertzaharovits Feb 21, 2025
5122b7f
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Feb 21, 2025
88e8754
[CI] Auto commit changes from spotless
elasticsearchmachine Feb 21, 2025
5ae44c5
Fixed close after removing wait/notify
albertzaharovits Feb 21, 2025
b2e65a2
Handle aborted merge tasks at any point
albertzaharovits Feb 23, 2025
feed4ca
synchronized close
albertzaharovits Feb 23, 2025
ee0e3e2
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Feb 23, 2025
b0e2da0
Nits
albertzaharovits Feb 23, 2025
257a13f
[CI] Auto commit changes from spotless
elasticsearchmachine Feb 23, 2025
12944a5
Sometimes merges are disabled
albertzaharovits Feb 23, 2025
795c9b0
[CI] Auto commit changes from spotless
elasticsearchmachine Feb 23, 2025
8bb890c
Rename
albertzaharovits Feb 23, 2025
9b09dba
Run merges when aborted too
albertzaharovits Feb 25, 2025
28d96f2
Nits
albertzaharovits Feb 25, 2025
dcd2af1
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Feb 25, 2025
519c48e
Simple tests
albertzaharovits Feb 25, 2025
c15785f
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Feb 26, 2025
79ccc74
[CI] Auto commit changes from spotless
elasticsearchmachine Feb 26, 2025
822501b
testBackloggedMergeTasksAreAllExecutedExactlyOnce
albertzaharovits Feb 26, 2025
6931a95
[CI] Auto commit changes from spotless
elasticsearchmachine Feb 26, 2025
a76744a
Nit
albertzaharovits Feb 26, 2025
ba4c7ee
WIP
albertzaharovits Feb 27, 2025
f163f36
testBackloggedMergeTasksExecuteInSizeOrder
albertzaharovits Feb 28, 2025
66fa090
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Feb 28, 2025
4eb5707
[CI] Auto commit changes from spotless
elasticsearchmachine Feb 28, 2025
23ae2b4
synchronized (shouldThrottleIncomingMerges)
albertzaharovits Feb 28, 2025
93ea664
test nits
albertzaharovits Feb 28, 2025
e215fae
Nits
albertzaharovits Feb 28, 2025
9729e9c
testIORateAdjustedForNewlySubmittedTasks
albertzaharovits Feb 28, 2025
a4da44a
[CI] Auto commit changes from spotless
elasticsearchmachine Feb 28, 2025
48dce91
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Mar 2, 2025
68c9bcf
testIORateAdjustedForSubmittedTasksWhenExecutionRateIs*
albertzaharovits Mar 2, 2025
58fa5b3
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 2, 2025
9f9fc86
Nit
albertzaharovits Mar 2, 2025
dce963f
testThreadPoolStatsWithBackloggedMergeTasks
albertzaharovits Mar 2, 2025
6da15aa
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 2, 2025
2002d0a
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Mar 3, 2025
30e4860
Fix RetrySearchIntegTests
albertzaharovits Mar 3, 2025
cd39b81
testMergeTasksRunConcurrently
albertzaharovits Mar 3, 2025
9e1215a
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 3, 2025
769e43b
Fix testThreadPoolStatsWithBackloggedMergeTasks
albertzaharovits Mar 3, 2025
aeca6b8
Test nits
albertzaharovits Mar 3, 2025
86e6117
Test method name nit
albertzaharovits Mar 3, 2025
261902e
testIORateAdjustedForRunningTasks
albertzaharovits Mar 4, 2025
7f6ce86
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Mar 4, 2025
155c110
Ooops remove +1 from floor/ceiling
albertzaharovits Mar 4, 2025
d0a4a3f
Test comments
albertzaharovits Mar 4, 2025
31b0867
testTargetIORateChangesWhenSubmittingMergeTasks
albertzaharovits Mar 4, 2025
0003f10
testMergesExecuteInSizeOrder
albertzaharovits Mar 5, 2025
321a332
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Mar 5, 2025
553b08a
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 5, 2025
dc80bef
Test nit
albertzaharovits Mar 5, 2025
34ca0fa
testMergeSourceWithFollowUpMergesRunSequentially
albertzaharovits Mar 5, 2025
fda3f86
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 5, 2025
3586ee1
test nit comments
albertzaharovits Mar 6, 2025
1fa7ba8
testMergesRunConcurrently
albertzaharovits Mar 6, 2025
0b55e35
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 6, 2025
054fda0
Move catch (InterruptedException e) in ThreadPoolMergeExecutorService
albertzaharovits Mar 6, 2025
0959714
Comment about interrupt
albertzaharovits Mar 6, 2025
e75c94c
comment on 0 for estimatedMergeBytes
albertzaharovits Mar 6, 2025
e57f06b
testSchedulerCloseWaitsForRunningMerge
albertzaharovits Mar 6, 2025
910289f
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Mar 6, 2025
669a87e
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Mar 7, 2025
34ab7f6
abortOnGoingMerge -> abort
albertzaharovits Mar 7, 2025
669b349
Nit
albertzaharovits Mar 7, 2025
c2b0cce
nits
albertzaharovits Mar 8, 2025
d03d808
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Mar 8, 2025
1d7a94e
currentlyRunningMergeTasks -> runningMergeTasks
albertzaharovits Mar 9, 2025
d975a1c
Merge branch 'main' into threadpool-merge-scheduler-sort-all-merges-t…
albertzaharovits Mar 9, 2025
c35518a
nit
albertzaharovits Mar 9, 2025
a217d12
ThreadPoolMergeScheduler.Schedule schedule = smallestMergeTask.schedu…
albertzaharovits Mar 9, 2025
1fba8a1
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 9, 2025
7ac3328
currentlySubmittedIOThrottledMergeTasksCount -> ioThrottledMergeTasks…
albertzaharovits Mar 9, 2025
fe7e9eb
Fix ThreadPoolMergeExecutorServiceTests
albertzaharovits Mar 9, 2025
171fc22
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 9, 2025
79e6abf
Enhance ThreadPoolMergeExecutorServiceTests post MergeTask#abort
albertzaharovits Mar 9, 2025
a25940d
Checkstyle
albertzaharovits Mar 9, 2025
507896e
AtomicIORate
albertzaharovits Mar 9, 2025
eb6279e
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/120869.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120869
summary: Threadpool merge scheduler
area: Engine
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,40 @@
*/
package org.elasticsearch.index.engine;

import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.threadpool.ThreadPool;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@ClusterScope(supportsDedicatedMasters = false, numDataNodes = 1, scope = Scope.SUITE)
@ClusterScope(supportsDedicatedMasters = false, numDataNodes = 1, numClientNodes = 0, scope = Scope.TEST)
public class InternalEngineMergeIT extends ESIntegTestCase {

private boolean useThreadPoolMerging;

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
useThreadPoolMerging = randomBoolean();
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
settings.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), useThreadPoolMerging);
return settings.build();
}

public void testMergesHappening() throws Exception {
final int numOfShards = randomIntBetween(1, 5);
// some settings to keep num segments low
Expand Down Expand Up @@ -83,4 +99,60 @@ public void testMergesHappening() throws Exception {
assertThat(count, lessThanOrEqualTo(upperNumberSegments));
}

public void testMergesUseTheMergeThreadPool() throws Exception {
final String indexName = randomIdentifier();
createIndex(indexName, indexSettings(randomIntBetween(1, 3), 0).build());
long id = 0;
final int minMerges = randomIntBetween(1, 5);
long totalDocs = 0;

while (true) {
int docs = randomIntBetween(100, 200);
totalDocs += docs;

BulkRequestBuilder request = client().prepareBulk();
for (int j = 0; j < docs; ++j) {
request.add(
new IndexRequest(indexName).id(Long.toString(id++))
.source(jsonBuilder().startObject().field("l", randomLong()).endObject())
);
}
BulkResponse response = request.get();
assertNoFailures(response);
refresh(indexName);

var mergesResponse = client().admin().indices().prepareStats(indexName).clear().setMerge(true).get();
var primaries = mergesResponse.getIndices().get(indexName).getPrimaries();
if (primaries.merge.getTotal() >= minMerges) {
break;
}
}

forceMerge();
refresh(indexName);

// after a force merge there should only be 1 segment per shard
var shardsWithMultipleSegments = getShardSegments().stream()
.filter(shardSegments -> shardSegments.getSegments().size() > 1)
.toList();
assertTrue("there are shards with multiple segments " + shardsWithMultipleSegments, shardsWithMultipleSegments.isEmpty());

final long expectedTotalDocs = totalDocs;
assertHitCount(prepareSearch(indexName).setQuery(QueryBuilders.matchAllQuery()).setTrackTotalHits(true), expectedTotalDocs);

IndicesStatsResponse indicesStats = client().admin().indices().prepareStats(indexName).setMerge(true).get();
long mergeCount = indicesStats.getIndices().get(indexName).getPrimaries().merge.getTotal();
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));

NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
if (useThreadPoolMerging) {
assertThat(
nodeStats.getThreadPool().stats().stream().filter(s -> ThreadPool.Names.MERGE.equals(s.name())).findAny().get().completed(),
equalTo(mergeCount)
);
} else {
assertTrue(nodeStats.getThreadPool().stats().stream().filter(s -> ThreadPool.Names.MERGE.equals(s.name())).findAny().isEmpty());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ public static final IndexShard newIndexShard(
indexService.getIndexEventListener(),
wrapper,
indexService.getThreadPool(),
indexService.getThreadPoolMergeExecutorService(),
indexService.getBigArrays(),
null,
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
return new EngineConfig(
config.getShardId(),
config.getThreadPool(),
config.getThreadPoolMergeExecutorService(),
indexSettings,
config.getWarmer(),
config.getStore(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
Expand Down Expand Up @@ -624,6 +625,7 @@ public void apply(Settings value, Settings current, Settings previous) {
TDigestExecutionHint.SETTING,
MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT_SETTING,
MergePolicyConfig.DEFAULT_MAX_TIME_BASED_MERGED_SEGMENT_SETTING,
ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING,
TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE,
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.MapperRegistry;
Expand Down Expand Up @@ -470,6 +471,7 @@ public IndexService newIndexService(
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
ScriptService scriptService,
ClusterService clusterService,
Client client,
Expand Down Expand Up @@ -523,6 +525,7 @@ public IndexService newIndexService(
circuitBreakerService,
bigArrays,
threadPool,
threadPoolMergeExecutorService,
scriptService,
clusterService,
client,
Expand Down
10 changes: 10 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.fielddata.FieldDataContext;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
Expand Down Expand Up @@ -154,6 +155,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust

private final AsyncTrimTranslogTask trimTranslogTask;
private final ThreadPool threadPool;
@Nullable
private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
private final BigArrays bigArrays;
private final ScriptService scriptService;
private final ClusterService clusterService;
Expand All @@ -178,6 +181,7 @@ public IndexService(
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
ScriptService scriptService,
ClusterService clusterService,
Client client,
Expand Down Expand Up @@ -261,6 +265,7 @@ public IndexService(
this.indexFoldersDeletionListener = indexFoldersDeletionListener;
this.bigArrays = bigArrays;
this.threadPool = threadPool;
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
this.scriptService = scriptService;
this.clusterService = clusterService;
this.client = client;
Expand Down Expand Up @@ -556,6 +561,7 @@ public synchronized IndexShard createShard(
eventListener,
readerWrapper,
threadPool,
threadPoolMergeExecutorService,
bigArrays,
engineWarmer,
searchOperationListeners,
Expand Down Expand Up @@ -820,6 +826,10 @@ public ThreadPool getThreadPool() {
return threadPool;
}

public @Nullable ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService() {
return threadPoolMergeExecutorService;
}

/**
* The {@link BigArrays} to use for this index.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public final class EngineConfig {
private final MapperService mapperService;
private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier;
private final ThreadPool threadPool;
@Nullable
private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
private final Engine.Warmer warmer;
private final Store store;
private final MergePolicy mergePolicy;
Expand Down Expand Up @@ -150,6 +152,7 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
public EngineConfig(
ShardId shardId,
ThreadPool threadPool,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
IndexSettings indexSettings,
Engine.Warmer warmer,
Store store,
Expand Down Expand Up @@ -179,6 +182,7 @@ public EngineConfig(
this.shardId = shardId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
this.warmer = warmer == null ? (a) -> {} : warmer;
this.store = store;
this.mergePolicy = mergePolicy;
Expand Down Expand Up @@ -287,6 +291,10 @@ public ThreadPool getThreadPool() {
return threadPool;
}

public @Nullable ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService() {
return threadPoolMergeExecutorService;
}

/**
* Returns an {@link org.elasticsearch.index.engine.Engine.Warmer} used to warm new searchers before they are used for searching.
*/
Expand Down
Loading