Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add capability to disable source recovery_source for an index #13590

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819))
- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13131](https://github.com/opensearch-project/OpenSearch/pull/13131))
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
- Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590))
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import org.opensearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -45,6 +46,8 @@
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
Expand Down Expand Up @@ -253,4 +256,144 @@ public void testNoRebalanceOnRollingRestart() throws Exception {
);
}
}

public void testFullRollingRestart_withNoRecoveryPayloadAndSource() throws Exception {
internalCluster().startNode();
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.startObject("_source")
.field("enabled")
.value(false)
.field("recovery_source_enabled")
.value(false)
.endObject()
.endObject();
CreateIndexResponse response = prepareCreate("test").setMapping(builder).get();
logger.info("Create index response is : {}", response);

final String healthTimeout = "1m";

for (int i = 0; i < 1000; i++) {
client().prepareIndex("test")
.setId(Long.toString(i))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + i).map())
.execute()
.actionGet();
}

for (int i = 1000; i < 2000; i++) {
client().prepareIndex("test")
.setId(Long.toString(i))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + i).map())
.execute()
.actionGet();
}
// ensuring all docs are committed to file system
flush();

logger.info("--> now start adding nodes");
internalCluster().startNode();
internalCluster().startNode();

// make sure the cluster state is green, and all has been recovered
assertTimeout(
client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setTimeout(healthTimeout)
.setWaitForGreenStatus()
.setWaitForNoRelocatingShards(true)
.setWaitForNodes("3")
);

logger.info("--> add two more nodes");
internalCluster().startNode();
internalCluster().startNode();

// make sure the cluster state is green, and all has been recovered
assertTimeout(
client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setTimeout(healthTimeout)
.setWaitForGreenStatus()
.setWaitForNoRelocatingShards(true)
.setWaitForNodes("5")
);

logger.info("--> refreshing and checking data");
refreshAndWaitForReplication();
for (int i = 0; i < 10; i++) {
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L);
}

// now start shutting nodes down
internalCluster().stopRandomDataNode();
// make sure the cluster state is green, and all has been recovered
assertTimeout(
client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setTimeout(healthTimeout)
.setWaitForGreenStatus()
.setWaitForNoRelocatingShards(true)
.setWaitForNodes("4")
);

internalCluster().stopRandomDataNode();
// make sure the cluster state is green, and all has been recovered
assertTimeout(
client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setTimeout(healthTimeout)
.setWaitForGreenStatus()
.setWaitForNoRelocatingShards(true)
.setWaitForNodes("3")
);

logger.info("--> stopped two nodes, verifying data");
refreshAndWaitForReplication();
for (int i = 0; i < 10; i++) {
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L);
}

// closing the 3rd node
internalCluster().stopRandomDataNode();
// make sure the cluster state is green, and all has been recovered
assertTimeout(
client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setTimeout(healthTimeout)
.setWaitForGreenStatus()
.setWaitForNoRelocatingShards(true)
.setWaitForNodes("2")
);

internalCluster().stopRandomDataNode();

// make sure the cluster state is yellow, and all has been recovered
assertTimeout(
client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setTimeout(healthTimeout)
.setWaitForYellowStatus()
.setWaitForNoRelocatingShards(true)
.setWaitForNodes("1")
);

logger.info("--> one node left, verifying data");
refreshAndWaitForReplication();
for (int i = 0; i < 10; i++) {
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class SourceFieldMapper extends MetadataFieldMapper {

public static final String CONTENT_TYPE = "_source";
private final Function<Map<String, ?>, Map<String, Object>> filter;
private final Function<Map<String, ?>, Map<String, Object>> recoverySourceFilter;

/**
* Default parameters for source fields
Expand Down Expand Up @@ -119,21 +120,75 @@ public static class Builder extends MetadataFieldMapper.Builder {
Collections.emptyList()
);

/**
* A mapping parameter which define whether the recovery_source should be added or not. Default value is true.
* <p>
* Recovery source gets added if source is disabled or there are filters that are applied on _source using
* {@link #includes}/{@link #excludes}, which has the possibility to change the original document provided by
* customer. Recovery source is not a permanent field and gets removed during merges. Refer this merge
* policy: org.opensearch.index.engine.RecoverySourcePruneMergePolicy
* <p>
* The main reason for adding the _recovery_source was to ensure Peer to Peer recovery if segments
* are not flushed to the disk. If you are disabling the recovery source, then ensure that you are calling
* flush operation of Opensearch periodically to ensure that segments are flushed to the disk and if required
* Peer to Peer recovery can happen using segment files rather than replaying traffic by querying Lucene
* snapshot.
*
* <p>
* This is an expert mapping parameter.
*
*/
private final Parameter<Boolean> recoverySourceEnabled = Parameter.boolParam(
"recovery_source_enabled",
false,
m -> toType(m).recoverySourceEnabled,
Defaults.ENABLED
);

/**
* Provides capability to add specific fields in the recovery_source.
* <p>
* Refer {@link #recoverySourceEnabled} for more details
* This is an expert parameter.
*/
private final Parameter<List<String>> recoverySourceIncludes = Parameter.stringArrayParam(
"recovery_source_includes",
false,
m -> Arrays.asList(toType(m).recoverySourceIncludes),
Collections.emptyList()
);

/**
* Provides capability to remove specific fields in the recovery_source.
*
* Refer {@link #recoverySourceEnabled} for more details
* This is an expert parameter.
*/
private final Parameter<List<String>> recoverySourceExcludes = Parameter.stringArrayParam(
"recovery_source_excludes",
false,
m -> Arrays.asList(toType(m).recoverySourceExcludes),
Collections.emptyList()
);
msfroh marked this conversation as resolved.
Show resolved Hide resolved

public Builder() {
super(Defaults.NAME);
}

@Override
protected List<Parameter<?>> getParameters() {
return Arrays.asList(enabled, includes, excludes);
return Arrays.asList(enabled, includes, excludes, recoverySourceEnabled, recoverySourceIncludes, recoverySourceExcludes);
}

@Override
public SourceFieldMapper build(BuilderContext context) {
return new SourceFieldMapper(
enabled.getValue(),
includes.getValue().toArray(new String[0]),
excludes.getValue().toArray(new String[0])
excludes.getValue().toArray(new String[0]),
recoverySourceEnabled.getValue(),
recoverySourceIncludes.getValue().toArray(new String[0]),
recoverySourceExcludes.getValue().toArray(new String[0])
);
}
}
Expand Down Expand Up @@ -173,24 +228,44 @@ public Query termQuery(Object value, QueryShardContext context) {
}

private final boolean enabled;
private final boolean recoverySourceEnabled;
/** indicates whether the source will always exist and be complete, for use by features like the update API */
private final boolean complete;

private final String[] includes;
private final String[] excludes;
private final String[] recoverySourceIncludes;
private final String[] recoverySourceExcludes;

private SourceFieldMapper() {
this(Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY);
this(Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY, Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY);
}

private SourceFieldMapper(boolean enabled, String[] includes, String[] excludes) {
private SourceFieldMapper(
boolean enabled,
String[] includes,
String[] excludes,
boolean recoverySourceEnabled,
String[] recoverySourceIncludes,
String[] recoverySourceExcludes
) {
super(new SourceFieldType(enabled));
this.enabled = enabled;
this.includes = includes;
this.excludes = excludes;
final boolean filtered = CollectionUtils.isEmpty(includes) == false || CollectionUtils.isEmpty(excludes) == false;
this.filter = enabled && filtered ? XContentMapValues.filter(includes, excludes) : null;
this.complete = enabled && CollectionUtils.isEmpty(includes) && CollectionUtils.isEmpty(excludes);

// Set parameters for recovery source
this.recoverySourceEnabled = recoverySourceEnabled;
this.recoverySourceIncludes = recoverySourceIncludes;
this.recoverySourceExcludes = recoverySourceExcludes;
final boolean recoverySourcefiltered = CollectionUtils.isEmpty(recoverySourceIncludes) == false
|| CollectionUtils.isEmpty(recoverySourceExcludes) == false;
this.recoverySourceFilter = this.recoverySourceEnabled && recoverySourcefiltered
? XContentMapValues.filter(recoverySourceIncludes, recoverySourceExcludes)
: null;
}

public boolean enabled() {
Expand All @@ -212,22 +287,40 @@ public void preParse(ParseContext context) throws IOException {
context.doc().add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
}

if (originalSource != null && adaptedSource != originalSource) {
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = originalSource.toBytesRef();
context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
if (recoverySourceEnabled) {
if (originalSource != null && adaptedSource != originalSource) {
final BytesReference adaptedRecoverySource = applyFilters(
originalSource,
contentType,
recoverySourceEnabled,
recoverySourceFilter
);
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = adaptedRecoverySource.toBytesRef();
context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
}
}
}

@Nullable
public BytesReference applyFilters(@Nullable BytesReference originalSource, @Nullable MediaType contentType) throws IOException {
if (enabled && originalSource != null) {
return applyFilters(originalSource, contentType, enabled, filter);
}

@Nullable
private BytesReference applyFilters(
@Nullable BytesReference originalSource,
@Nullable MediaType contentType,
boolean isProvidedSourceEnabled,
@Nullable final Function<Map<String, ?>, Map<String, Object>> filters
) throws IOException {
if (isProvidedSourceEnabled && originalSource != null) {
// Percolate and tv APIs may not set the source and that is ok, because these APIs will not index any data
if (filter != null) {
if (filters != null) {
// we don't update the context source if we filter, we want to keep it as is...
Tuple<? extends MediaType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(originalSource, true, contentType);
Map<String, Object> filteredSource = filter.apply(mapTuple.v2());
Map<String, Object> filteredSource = filters.apply(mapTuple.v2());
BytesStreamOutput bStream = new BytesStreamOutput();
MediaType actualContentType = mapTuple.v1();
XContentBuilder builder = MediaTypeRegistry.contentBuilder(actualContentType, bStream).map(filteredSource);
Expand Down
Loading
Loading