Commit 3d0444b

Add adaptive merge policy to reduce benchmark variance
Signed-off-by: Sriram Ganesh <[email protected]>

Fixed the style check issues
Signed-off-by: Sriram Ganesh <[email protected]>

Fixed javadocs
Signed-off-by: Sriram Ganesh <[email protected]>

Refactored the code
Signed-off-by: Sriram Ganesh <[email protected]>

Changed the api version
Signed-off-by: Sriram Ganesh <[email protected]>

Refactored the code
Signed-off-by: Sriram Ganesh <[email protected]>
1 parent 09d3d26 commit 3d0444b

File tree

10 files changed: +1222 −0 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add all-active ingestion as docrep equivalent in pull-based ingestion ([#19316](https://github.com/opensearch-project/OpenSearch/pull/19316))
 - Adding logic for histogram aggregation using skiplist ([#19130](https://github.com/opensearch-project/OpenSearch/pull/19130))
 - Add skip_list param for date, scaled float and token count fields ([#19142](https://github.com/opensearch-project/OpenSearch/pull/19142))
+- Add adaptive merge policy to reduce benchmark variance by dynamically adjusting segment merge settings based on shard size ([#11163](https://github.com/opensearch-project/OpenSearch/issues/11163))

 ### Changed
 - Refactor `if-else` chains to use `Java 17 pattern matching switch expressions` ([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))
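
Note for readers: judging by the new ADAPTIVE("adaptive") enum value in the IndexSettings diff further down, an index would opt into this policy through the existing index.merge.policy setting. A minimal sketch, assuming that setting key is reused unchanged and using the standard Settings builder; the shard count is illustrative:

import org.opensearch.common.settings.Settings;

// Sketch: opting an index into the adaptive policy at creation time,
// assuming "adaptive" is accepted by the existing index.merge.policy setting.
Settings settings = Settings.builder()
    .put("index.number_of_shards", 1)
    .put("index.merge.policy", "adaptive")
    .build();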

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 2 additions & 0 deletions
@@ -343,6 +343,7 @@
 import org.opensearch.rest.RestHeaderDefinition;
 import org.opensearch.rest.action.RestFieldCapabilitiesAction;
 import org.opensearch.rest.action.RestMainAction;
+import org.opensearch.rest.action.admin.RestSegmentTopologyAction;
 import org.opensearch.rest.action.admin.cluster.RestAddVotingConfigExclusionAction;
 import org.opensearch.rest.action.admin.cluster.RestCancelTasksAction;
 import org.opensearch.rest.action.admin.cluster.RestCleanupRepositoryAction;
@@ -1007,6 +1008,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
         registerHandler.accept(new RestTasksAction(nodesInCluster));
         registerHandler.accept(new RestIndicesAction(responseLimitSettings));
         registerHandler.accept(new RestSegmentsAction(responseLimitSettings));
+        registerHandler.accept(new RestSegmentTopologyAction());
         // Fully qualified to prevent interference with rest.action.count.RestCountAction
         registerHandler.accept(new org.opensearch.rest.action.cat.RestCountAction());
         // Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
server/src/main/java/org/opensearch/index/AdaptiveTieredMergePolicyProvider.java

Lines changed: 263 additions & 0 deletions
@@ -0,0 +1,263 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.TieredMergePolicy;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.store.Store;

/**
 * An adaptive merge policy provider that adjusts merge settings based on shard size
 * to optimize segment topology and reduce benchmark variance.
 *
 * This addresses the issue described in https://github.com/opensearch-project/OpenSearch/issues/11163
 * by providing more intelligent default merge settings that adapt to the actual shard size.
 *
 * @opensearch.api
 */
@PublicApi(since = "3.3.0")
public class AdaptiveTieredMergePolicyProvider implements MergePolicyProvider {

    private final Logger logger;
    private final OpenSearchTieredMergePolicy tieredMergePolicy;
    private final IndexSettings indexSettings;
    private Store store;
    private boolean mergesEnabled;

    // Shard size thresholds that bound each size category
    private static final ByteSizeValue SMALL_SHARD_THRESHOLD = new ByteSizeValue(100, ByteSizeUnit.MB);
    private static final ByteSizeValue MEDIUM_SHARD_THRESHOLD = new ByteSizeValue(1, ByteSizeUnit.GB);
    private static final ByteSizeValue LARGE_SHARD_THRESHOLD = new ByteSizeValue(10, ByteSizeUnit.GB);

    // Adaptive max segment sizes
    private static final ByteSizeValue SMALL_SHARD_MAX_SEGMENT = new ByteSizeValue(50, ByteSizeUnit.MB);
    private static final ByteSizeValue MEDIUM_SHARD_MAX_SEGMENT = new ByteSizeValue(200, ByteSizeUnit.MB);
    private static final ByteSizeValue LARGE_SHARD_MAX_SEGMENT = new ByteSizeValue(1, ByteSizeUnit.GB);
    private static final ByteSizeValue VERY_LARGE_SHARD_MAX_SEGMENT = new ByteSizeValue(2, ByteSizeUnit.GB);

    // Adaptive floor segment sizes
    private static final ByteSizeValue SMALL_SHARD_FLOOR = new ByteSizeValue(10, ByteSizeUnit.MB);
    private static final ByteSizeValue MEDIUM_SHARD_FLOOR = new ByteSizeValue(25, ByteSizeUnit.MB);
    private static final ByteSizeValue LARGE_SHARD_FLOOR = new ByteSizeValue(50, ByteSizeUnit.MB);
    private static final ByteSizeValue VERY_LARGE_SHARD_FLOOR = new ByteSizeValue(100, ByteSizeUnit.MB);

    // Adaptive segments per tier
    private static final double SMALL_SHARD_SEGMENTS_PER_TIER = 5.0;
    private static final double MEDIUM_SHARD_SEGMENTS_PER_TIER = 8.0;
    private static final double LARGE_SHARD_SEGMENTS_PER_TIER = 10.0;
    private static final double VERY_LARGE_SHARD_SEGMENTS_PER_TIER = 12.0;

    public AdaptiveTieredMergePolicyProvider(Logger logger, IndexSettings indexSettings) {
        this.logger = logger;
        this.indexSettings = indexSettings;
        this.store = null; // Will be set later via setStore()
        this.tieredMergePolicy = new OpenSearchTieredMergePolicy();
        this.mergesEnabled = indexSettings.getSettings().getAsBoolean("index.merge.enabled", true);

        if (mergesEnabled == false) {
            logger.warn(
                "[index.merge.enabled] is set to false, this should only be used in tests and can cause serious problems in production environments"
            );
        }

        // Initialize with default settings first; adaptive settings are applied once a store is available
        applyDefaultSettings();
    }

    public AdaptiveTieredMergePolicyProvider(Logger logger, IndexSettings indexSettings, Store store) {
        this.logger = logger;
        this.indexSettings = indexSettings;
        this.store = store;
        this.tieredMergePolicy = new OpenSearchTieredMergePolicy();
        this.mergesEnabled = indexSettings.getSettings().getAsBoolean("index.merge.enabled", true);

        if (mergesEnabled == false) {
            logger.warn(
                "[index.merge.enabled] is set to false, this should only be used in tests and can cause serious problems in production environments"
            );
        }

        // Initialize with adaptive settings
        initializeAdaptiveSettings();
    }

    private void initializeAdaptiveSettings() {
        try {
            // Estimate shard size from store
            long estimatedShardSize = estimateShardSize();
            ShardSizeCategory category = categorizeShardSize(estimatedShardSize);

            // Apply adaptive settings based on shard size category
            applyAdaptiveSettings(category);

            logger.debug(
                "Initialized adaptive merge policy for shard size category: {} (estimated size: {})",
                category,
                new ByteSizeValue(estimatedShardSize)
            );

        } catch (Exception e) {
            logger.warn("Failed to initialize adaptive settings, falling back to defaults: {}", e.getMessage());
            applyDefaultSettings();
        }
    }

    private long estimateShardSize() {
        if (store == null) {
            // Fallback to a reasonable default when store is not available
            return MEDIUM_SHARD_THRESHOLD.getBytes();
        }
        try {
            // Best-effort estimate: use the file count in the store directory as a
            // proxy, assuming roughly 1 MB per file. Cast to long before multiplying
            // to avoid int overflow for directories with many files.
            return (long) store.directory().listAll().length * 1024 * 1024;
        } catch (Exception e) {
            // Fallback to a reasonable default
            return MEDIUM_SHARD_THRESHOLD.getBytes();
        }
    }

    private ShardSizeCategory categorizeShardSize(long sizeBytes) {
        if (sizeBytes < SMALL_SHARD_THRESHOLD.getBytes()) {
            return ShardSizeCategory.SMALL;
        } else if (sizeBytes < MEDIUM_SHARD_THRESHOLD.getBytes()) {
            return ShardSizeCategory.MEDIUM;
        } else if (sizeBytes < LARGE_SHARD_THRESHOLD.getBytes()) {
            return ShardSizeCategory.LARGE;
        } else {
            return ShardSizeCategory.VERY_LARGE;
        }
    }

    private void applyAdaptiveSettings(ShardSizeCategory category) {
        ByteSizeValue maxSegmentSize;
        ByteSizeValue floorSegmentSize;
        double segmentsPerTier;

        switch (category) {
            case SMALL:
                maxSegmentSize = SMALL_SHARD_MAX_SEGMENT;
                floorSegmentSize = SMALL_SHARD_FLOOR;
                segmentsPerTier = SMALL_SHARD_SEGMENTS_PER_TIER;
                break;
            case MEDIUM:
                maxSegmentSize = MEDIUM_SHARD_MAX_SEGMENT;
                floorSegmentSize = MEDIUM_SHARD_FLOOR;
                segmentsPerTier = MEDIUM_SHARD_SEGMENTS_PER_TIER;
                break;
            case LARGE:
                maxSegmentSize = LARGE_SHARD_MAX_SEGMENT;
                floorSegmentSize = LARGE_SHARD_FLOOR;
                segmentsPerTier = LARGE_SHARD_SEGMENTS_PER_TIER;
                break;
            case VERY_LARGE:
                maxSegmentSize = VERY_LARGE_SHARD_MAX_SEGMENT;
                floorSegmentSize = VERY_LARGE_SHARD_FLOOR;
                segmentsPerTier = VERY_LARGE_SHARD_SEGMENTS_PER_TIER;
                break;
            default:
                maxSegmentSize = MEDIUM_SHARD_MAX_SEGMENT;
                floorSegmentSize = MEDIUM_SHARD_FLOOR;
                segmentsPerTier = MEDIUM_SHARD_SEGMENTS_PER_TIER;
        }

        // Apply the adaptive settings
        tieredMergePolicy.setMaxMergedSegmentMB(maxSegmentSize.getMbFrac());
        tieredMergePolicy.setFloorSegmentMB(floorSegmentSize.getMbFrac());
        tieredMergePolicy.setSegmentsPerTier(segmentsPerTier);

        // Keep other settings at reasonable defaults
        tieredMergePolicy.setMaxMergeAtOnce(10);
        tieredMergePolicy.setForceMergeDeletesPctAllowed(10.0);
        tieredMergePolicy.setDeletesPctAllowed(20.0);
        tieredMergePolicy.setNoCFSRatio(TieredMergePolicy.DEFAULT_NO_CFS_RATIO);

        logger.info(
            "Applied adaptive merge settings - max_segment: {}, floor_segment: {}, segments_per_tier: {}",
            maxSegmentSize,
            floorSegmentSize,
            segmentsPerTier
        );
    }

    private void applyDefaultSettings() {
        // Fallback to the original default settings
        tieredMergePolicy.setMaxMergedSegmentMB(5 * 1024); // 5GB
        tieredMergePolicy.setFloorSegmentMB(16); // 16MB
        tieredMergePolicy.setSegmentsPerTier(10.0);
        tieredMergePolicy.setMaxMergeAtOnce(10);
        tieredMergePolicy.setForceMergeDeletesPctAllowed(10.0);
        tieredMergePolicy.setDeletesPctAllowed(20.0);
        tieredMergePolicy.setNoCFSRatio(TieredMergePolicy.DEFAULT_NO_CFS_RATIO);
    }

    /**
     * Sets the store instance and reinitializes adaptive settings
     */
    public void setStore(Store store) {
        this.store = store;
        if (store != null) {
            initializeAdaptiveSettings();
        }
    }

    /**
     * Updates merge settings based on runtime analysis of segment topology
     */
    public void updateSettingsBasedOnAnalysis(
        org.opensearch.index.analysis.SegmentTopologyAnalyzer.MergePolicyRecommendations recommendations
    ) {
        if (recommendations.hasVarianceIssue || recommendations.hasSkewIssue) {
            logger.info("Updating merge settings based on segment topology analysis");

            // Apply recommended settings (sizes are in bytes, setters take MB)
            tieredMergePolicy.setMaxMergedSegmentMB(recommendations.recommendedMaxSegmentSize / (1024 * 1024));
            tieredMergePolicy.setFloorSegmentMB(recommendations.recommendedFloorSegmentSize / (1024 * 1024));

            // Adjust segments per tier based on optimal count, clamped to [5, 20]
            double newSegmentsPerTier = Math.max(5.0, Math.min(20.0, recommendations.optimalSegmentCount * 0.8));
            tieredMergePolicy.setSegmentsPerTier(newSegmentsPerTier);

            logger.info(
                "Updated merge settings - max_segment: {}MB, floor_segment: {}MB, segments_per_tier: {}",
                recommendations.recommendedMaxSegmentSize / (1024 * 1024),
                recommendations.recommendedFloorSegmentSize / (1024 * 1024),
                newSegmentsPerTier
            );
        }
    }

    @Override
    public org.apache.lucene.index.MergePolicy getMergePolicy() {
        return mergesEnabled ? tieredMergePolicy : org.apache.lucene.index.NoMergePolicy.INSTANCE;
    }

    // Getters for testing
    public double getMaxMergedSegmentMB() {
        return tieredMergePolicy.getMaxMergedSegmentMB();
    }

    public double getFloorSegmentMB() {
        return tieredMergePolicy.getFloorSegmentMB();
    }

    public double getSegmentsPerTier() {
        return tieredMergePolicy.getSegmentsPerTier();
    }

    private enum ShardSizeCategory {
        SMALL,
        MEDIUM,
        LARGE,
        VERY_LARGE
    }
}
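
Reviewer note: a minimal sketch of how the fallback-then-adaptive flow above could be exercised in a unit test. This is not a test shipped with the commit; the logger, indexSettings, and store fixtures are assumed, and the expected numbers come straight from applyDefaultSettings():

import static org.junit.Assert.assertEquals;

// Sketch only, assuming logger/indexSettings/store test fixtures exist.
AdaptiveTieredMergePolicyProvider provider = new AdaptiveTieredMergePolicyProvider(logger, indexSettings);

// Without a Store, the constructor falls back to applyDefaultSettings():
// 5 GB max merged segment, 16 MB floor, 10 segments per tier.
assertEquals(5 * 1024, provider.getMaxMergedSegmentMB(), 0.0);
assertEquals(16, provider.getFloorSegmentMB(), 0.0);
assertEquals(10.0, provider.getSegmentsPerTier(), 0.0);

// Attaching a Store re-runs initializeAdaptiveSettings(), so the thresholds
// above (100 MB / 1 GB / 10 GB) then select the size category.
provider.setStore(store);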

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 16 additions & 0 deletions
@@ -101,6 +101,7 @@ public final class IndexSettings {
     public enum IndexMergePolicy {
         TIERED("tiered"),
         LOG_BYTE_SIZE("log_byte_size"),
+        ADAPTIVE("adaptive"),
         DEFAULT_POLICY(IndexSettings.DEFAULT_POLICY);

         private final String value;
@@ -847,6 +848,7 @@ public static IndexMergePolicy fromString(String text) {
     private final MergeSchedulerConfig mergeSchedulerConfig;
     private final TieredMergePolicyProvider tieredMergePolicyProvider;
     private final LogByteSizeMergePolicyProvider logByteSizeMergePolicyProvider;
+    private final AdaptiveTieredMergePolicyProvider adaptiveTieredMergePolicyProvider;
     private final IndexSortConfig indexSortConfig;
     private final IndexScopedSettings scopedSettings;
     private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
@@ -1085,6 +1087,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
         maxRegexLength = scopedSettings.get(MAX_REGEX_LENGTH_SETTING);
         this.tieredMergePolicyProvider = new TieredMergePolicyProvider(logger, this);
         this.logByteSizeMergePolicyProvider = new LogByteSizeMergePolicyProvider(logger, this);
+        this.adaptiveTieredMergePolicyProvider = new AdaptiveTieredMergePolicyProvider(logger, this);
         this.indexSortConfig = new IndexSortConfig(this);
         searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
         defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE);
@@ -1853,6 +1856,9 @@ public MergePolicy getMergePolicy(boolean isTimeSeriesIndex) {
             case LOG_BYTE_SIZE:
                 mergePolicyProvider = logByteSizeMergePolicyProvider;
                 break;
+            case ADAPTIVE:
+                mergePolicyProvider = adaptiveTieredMergePolicyProvider;
+                break;
             case DEFAULT_POLICY:
                 if (isTimeSeriesIndex) {
                     String nodeScopedTimeSeriesIndexPolicy = TIME_SERIES_INDEX_MERGE_POLICY.get(nodeSettings);
@@ -1865,6 +1871,9 @@ public MergePolicy getMergePolicy(boolean isTimeSeriesIndex) {
                         case LOG_BYTE_SIZE:
                             mergePolicyProvider = logByteSizeMergePolicyProvider;
                             break;
+                        case ADAPTIVE:
+                            mergePolicyProvider = adaptiveTieredMergePolicyProvider;
+                            break;
                     }
                 } else {
                     mergePolicyProvider = tieredMergePolicyProvider;
@@ -2165,4 +2174,11 @@ public boolean isDerivedSourceEnabledForTranslog() {
     public boolean isDerivedSourceEnabled() {
         return derivedSourceEnabled;
     }
+
+    /**
+     * Returns the adaptive tiered merge policy provider
+     */
+    public AdaptiveTieredMergePolicyProvider getAdaptiveTieredMergePolicyProvider() {
+        return adaptiveTieredMergePolicyProvider;
+    }
 }
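
Putting the two switch hunks in context, the resolution logic now covers three explicit policies. A condensed sketch of the selection path, written as if inside IndexSettings; it assumes fromString is the enum-level parser the hunk header above suggests, and the switch-expression form is purely illustrative (the commit's getMergePolicy uses switch statements):

// Illustrative only; not the commit's actual code.
IndexSettings.IndexMergePolicy configured = IndexSettings.IndexMergePolicy.fromString("adaptive");
MergePolicyProvider provider = switch (configured) {
    case TIERED -> tieredMergePolicyProvider;
    case LOG_BYTE_SIZE -> logByteSizeMergePolicyProvider;
    case ADAPTIVE -> adaptiveTieredMergePolicyProvider; // new in this commit
    default -> tieredMergePolicyProvider;               // DEFAULT_POLICY handling elided
};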
