Skip to content

Commit 28689b7

Browse files
authored
Enable Safe Bidirectional CCR via Alias policy on Restore (#19368)
Restore: enable safe bidirectional CCR via alias policy on restore Add RestoreSnapshotRequest: alias_write_index_policy: preserve (default), strip_write_index, custom_suffix alias_suffix for custom_suffix Apply policy in RestoreService during alias restore (post-rename): strip_write_index: force is_write_index=false on follower aliases custom_suffix: append suffix and set is_write_index=false This prevents multi-write alias conflicts on followers, unlocking bidirectional CCR with write aliases. Signed-off-by: Atri Sharma <[email protected]>
1 parent 09d3d26 commit 28689b7

File tree

5 files changed

+366
-10
lines changed

5 files changed

+366
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6565
- Fix QueryPhaseResultConsumer incomplete callback loops ([#19231](https://github.com/opensearch-project/OpenSearch/pull/19231))
6666
- Fix the `scaled_float` precision issue ([#19188](https://github.com/opensearch-project/OpenSearch/pull/19188))
6767
- Fix Using an excessively large reindex slice can lead to a JVM OutOfMemoryError on coordinator.([#18964](https://github.com/opensearch-project/OpenSearch/pull/18964))
68+
- Add alias write index policy to control writeIndex during restore([#1511](https://github.com/opensearch-project/OpenSearch/pull/19368))
6869
- [Flaky Test] Fix flaky test in SecureReactorNetty4HttpServerTransportTests with reproducible seed ([#19327](https://github.com/opensearch-project/OpenSearch/pull/19327))
6970
- Remove unnecessary looping in field data cache clear ([#19116](https://github.com/opensearch-project/OpenSearch/pull/19116))
7071
- [Flaky Test] Fix flaky test IngestFromKinesisIT.testAllActiveIngestion ([#19380](https://github.com/opensearch-project/OpenSearch/pull/19380))

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.ArrayList;
5353
import java.util.Arrays;
5454
import java.util.List;
55+
import java.util.Locale;
5556
import java.util.Map;
5657
import java.util.Objects;
5758

@@ -129,6 +130,29 @@ private static StorageType fromString(String string) {
129130
@Nullable // if any snapshot UUID will do
130131
private String snapshotUuid;
131132

133+
/**
134+
* Alias write index policy for controlling how writeIndex attribute is handled during restore
135+
*
136+
* @opensearch.api
137+
*/
138+
@PublicApi(since = "3.3.0")
139+
public enum AliasWriteIndexPolicy {
140+
PRESERVE,
141+
STRIP_WRITE_INDEX;
142+
143+
public static AliasWriteIndexPolicy fromString(String value) {
144+
try {
145+
return valueOf(value.toUpperCase(Locale.ROOT));
146+
} catch (IllegalArgumentException e) {
147+
throw new IllegalArgumentException(
148+
"Unknown alias_write_index_policy [" + value + "]. Valid values are: " + Arrays.toString(values())
149+
);
150+
}
151+
}
152+
}
153+
154+
private AliasWriteIndexPolicy aliasWriteIndexPolicy = AliasWriteIndexPolicy.PRESERVE;
155+
132156
public RestoreSnapshotRequest() {}
133157

134158
/**
@@ -172,6 +196,9 @@ public RestoreSnapshotRequest(StreamInput in) throws IOException {
172196
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
173197
renameAliasReplacement = in.readOptionalString();
174198
}
199+
if (in.getVersion().onOrAfter(Version.V_3_3_0)) {
200+
aliasWriteIndexPolicy = in.readEnum(AliasWriteIndexPolicy.class);
201+
}
175202
}
176203

177204
@Override
@@ -205,6 +232,9 @@ public void writeTo(StreamOutput out) throws IOException {
205232
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
206233
out.writeOptionalString(renameAliasReplacement);
207234
}
235+
if (out.getVersion().onOrAfter(Version.V_3_3_0)) {
236+
out.writeEnum(aliasWriteIndexPolicy);
237+
}
208238
}
209239

210240
@Override
@@ -640,6 +670,26 @@ public String getSourceRemoteTranslogRepository() {
640670
return sourceRemoteTranslogRepository;
641671
}
642672

673+
/**
674+
* Sets alias write index policy for controlling how writeIndex attribute is handled during restore
675+
*
676+
* @param policy the policy to apply
677+
* @return this request
678+
*/
679+
public RestoreSnapshotRequest aliasWriteIndexPolicy(AliasWriteIndexPolicy policy) {
680+
this.aliasWriteIndexPolicy = Objects.requireNonNull(policy);
681+
return this;
682+
}
683+
684+
/**
685+
* Returns alias write index policy
686+
*
687+
* @return alias write index policy
688+
*/
689+
public AliasWriteIndexPolicy aliasWriteIndexPolicy() {
690+
return aliasWriteIndexPolicy;
691+
}
692+
643693
/**
644694
* Parses restore definition
645695
*
@@ -729,6 +779,8 @@ public RestoreSnapshotRequest source(Map<String, Object> source) {
729779
} else {
730780
throw new IllegalArgumentException("malformed source_remote_translog_repository");
731781
}
782+
} else if ("alias_write_index_policy".equals(name)) {
783+
aliasWriteIndexPolicy(AliasWriteIndexPolicy.fromString((String) entry.getValue()));
732784
} else {
733785
if (IndicesOptions.isIndicesOptions(name) == false) {
734786
throw new IllegalArgumentException("Unknown parameter " + name);
@@ -786,6 +838,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
786838
if (sourceRemoteTranslogRepository != null) {
787839
builder.field("source_remote_translog_repository", sourceRemoteTranslogRepository);
788840
}
841+
builder.field("alias_write_index_policy", aliasWriteIndexPolicy.name().toLowerCase(Locale.ROOT));
789842
builder.endObject();
790843
return builder;
791844
}
@@ -817,7 +870,8 @@ public boolean equals(Object o) {
817870
&& Objects.equals(snapshotUuid, that.snapshotUuid)
818871
&& Objects.equals(storageType, that.storageType)
819872
&& Objects.equals(sourceRemoteStoreRepository, that.sourceRemoteStoreRepository)
820-
&& Objects.equals(sourceRemoteTranslogRepository, that.sourceRemoteTranslogRepository);
873+
&& Objects.equals(sourceRemoteTranslogRepository, that.sourceRemoteTranslogRepository)
874+
&& aliasWriteIndexPolicy == that.aliasWriteIndexPolicy;
821875
return equals;
822876
}
823877

@@ -840,7 +894,8 @@ public int hashCode() {
840894
snapshotUuid,
841895
storageType,
842896
sourceRemoteStoreRepository,
843-
sourceRemoteTranslogRepository
897+
sourceRemoteTranslogRepository,
898+
aliasWriteIndexPolicy
844899
);
845900
result = 31 * result + Arrays.hashCode(indices);
846901
result = 31 * result + Arrays.hashCode(ignoreIndexSettings);

server/src/main/java/org/opensearch/snapshots/RestoreService.java

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ public ClusterState execute(ClusterState currentState) {
479479
// Remove all aliases - they shouldn't be restored
480480
indexMdBuilder.removeAllAliases();
481481
} else {
482-
applyAliasesWithRename(snapshotIndexMetadata, indexMdBuilder, aliases);
482+
applyAliasesWithRename(snapshotIndexMetadata.getAliases(), request, indexMdBuilder, aliases);
483483
}
484484
IndexMetadata updatedIndexMetadata = indexMdBuilder.build();
485485
if (partial) {
@@ -524,7 +524,7 @@ public ClusterState execute(ClusterState currentState) {
524524
indexMdBuilder.putAlias(alias);
525525
}
526526
} else {
527-
applyAliasesWithRename(snapshotIndexMetadata, indexMdBuilder, aliases);
527+
applyAliasesWithRename(snapshotIndexMetadata.getAliases(), request, indexMdBuilder, aliases);
528528
}
529529
final Settings.Builder indexSettingsBuilder = Settings.builder()
530530
.put(snapshotIndexMetadata.getSettings())
@@ -655,22 +655,32 @@ private void checkAliasNameConflicts(Map<String, String> renamedIndices, Set<Str
655655
}
656656

657657
private void applyAliasesWithRename(
658-
IndexMetadata snapshotIndexMetadata,
658+
Map<String, AliasMetadata> snapshotAliases,
659+
RestoreSnapshotRequest request,
659660
IndexMetadata.Builder indexMdBuilder,
660661
Set<String> aliases
661662
) {
662663
if (request.renameAliasPattern() == null || request.renameAliasReplacement() == null) {
663-
aliases.addAll(snapshotIndexMetadata.getAliases().keySet());
664+
for (final Map.Entry<String, AliasMetadata> alias : snapshotAliases.entrySet()) {
665+
AliasMetadata transformedAlias = applyAliasWriteIndexPolicy(
666+
alias.getValue(),
667+
request.aliasWriteIndexPolicy()
668+
);
669+
indexMdBuilder.removeAlias(alias.getKey());
670+
indexMdBuilder.putAlias(transformedAlias);
671+
aliases.add(transformedAlias.alias());
672+
}
664673
} else {
665674
Pattern renameAliasPattern = Pattern.compile(request.renameAliasPattern());
666-
for (final Map.Entry<String, AliasMetadata> alias : snapshotIndexMetadata.getAliases().entrySet()) {
675+
for (final Map.Entry<String, AliasMetadata> alias : snapshotAliases.entrySet()) {
667676
String currentAliasName = alias.getKey();
668677
indexMdBuilder.removeAlias(currentAliasName);
669678
String newAliasName = renameAliasPattern.matcher(currentAliasName)
670679
.replaceAll(request.renameAliasReplacement());
671-
AliasMetadata newAlias = AliasMetadata.newAliasMetadata(alias.getValue(), newAliasName);
672-
indexMdBuilder.putAlias(newAlias);
673-
aliases.add(newAliasName);
680+
AliasMetadata renamedAlias = AliasMetadata.newAliasMetadata(alias.getValue(), newAliasName);
681+
AliasMetadata transformedAlias = applyAliasWriteIndexPolicy(renamedAlias, request.aliasWriteIndexPolicy());
682+
indexMdBuilder.putAlias(transformedAlias);
683+
aliases.add(transformedAlias.alias());
674684
}
675685
}
676686
}
@@ -1369,6 +1379,23 @@ public void applyClusterState(ClusterChangedEvent event) {
13691379
}
13701380
}
13711381

1382+
/**
1383+
* Apply alias write index policy to transform alias metadata during restore.
1384+
* Package-private for testing.
1385+
*/
1386+
static AliasMetadata applyAliasWriteIndexPolicy(AliasMetadata aliasMd, RestoreSnapshotRequest.AliasWriteIndexPolicy policy) {
1387+
if (policy == RestoreSnapshotRequest.AliasWriteIndexPolicy.STRIP_WRITE_INDEX && Boolean.TRUE.equals(aliasMd.writeIndex())) {
1388+
return AliasMetadata.builder(aliasMd.alias())
1389+
.filter(aliasMd.filter())
1390+
.indexRouting(aliasMd.indexRouting())
1391+
.searchRouting(aliasMd.searchRouting())
1392+
.isHidden(aliasMd.isHidden())
1393+
.writeIndex(false)
1394+
.build();
1395+
}
1396+
return aliasMd;
1397+
}
1398+
13721399
private static IndexMetadata addSnapshotToIndexSettings(IndexMetadata metadata, Snapshot snapshot, IndexId indexId) {
13731400
final Settings newSettings = Settings.builder()
13741401
.put(metadata.getSettings())
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.snapshots.restore;
10+
11+
import org.opensearch.common.io.stream.BytesStreamOutput;
12+
import org.opensearch.core.common.io.stream.StreamInput;
13+
import org.opensearch.core.xcontent.ToXContent;
14+
import org.opensearch.core.xcontent.XContentBuilder;
15+
import org.opensearch.core.xcontent.XContentParser;
16+
import org.opensearch.test.OpenSearchTestCase;
17+
18+
import java.io.IOException;
19+
import java.util.Map;
20+
21+
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
22+
import static org.hamcrest.Matchers.equalTo;
23+
24+
public class AliasWriteIndexPolicyRequestTests extends OpenSearchTestCase {
25+
26+
public void testEqualsAndHashCode() {
27+
RestoreSnapshotRequest request1 = new RestoreSnapshotRequest("repo", "snapshot").indices("index1", "index2")
28+
.aliasWriteIndexPolicy(RestoreSnapshotRequest.AliasWriteIndexPolicy.STRIP_WRITE_INDEX);
29+
30+
RestoreSnapshotRequest request2 = new RestoreSnapshotRequest("repo", "snapshot").indices("index1", "index2")
31+
.aliasWriteIndexPolicy(RestoreSnapshotRequest.AliasWriteIndexPolicy.STRIP_WRITE_INDEX);
32+
33+
assertEquals(request1, request2);
34+
assertEquals(request1.hashCode(), request2.hashCode());
35+
36+
// Test with different policy
37+
RestoreSnapshotRequest request3 = new RestoreSnapshotRequest("repo", "snapshot").indices("index1", "index2")
38+
.aliasWriteIndexPolicy(RestoreSnapshotRequest.AliasWriteIndexPolicy.PRESERVE);
39+
40+
assertNotEquals(request1, request3);
41+
}
42+
43+
public void testSerialization() throws IOException {
44+
RestoreSnapshotRequest request = new RestoreSnapshotRequest("test-repo", "test-snapshot").indices("index1", "index2")
45+
.renamePattern("(.+)")
46+
.renameReplacement("restored-$1")
47+
.aliasWriteIndexPolicy(RestoreSnapshotRequest.AliasWriteIndexPolicy.STRIP_WRITE_INDEX)
48+
.includeAliases(true)
49+
.partial(true)
50+
.waitForCompletion(false);
51+
52+
BytesStreamOutput out = new BytesStreamOutput();
53+
request.writeTo(out);
54+
55+
StreamInput in = out.bytes().streamInput();
56+
RestoreSnapshotRequest deserialized = new RestoreSnapshotRequest(in);
57+
58+
assertEquals(request.repository(), deserialized.repository());
59+
assertEquals(request.snapshot(), deserialized.snapshot());
60+
assertArrayEquals(request.indices(), deserialized.indices());
61+
assertEquals(request.aliasWriteIndexPolicy(), deserialized.aliasWriteIndexPolicy());
62+
assertEquals(request.includeAliases(), deserialized.includeAliases());
63+
}
64+
65+
public void testXContentRoundTrip() throws IOException {
66+
RestoreSnapshotRequest request = new RestoreSnapshotRequest("test-repo", "test-snapshot").indices("index1", "index2")
67+
.aliasWriteIndexPolicy(RestoreSnapshotRequest.AliasWriteIndexPolicy.STRIP_WRITE_INDEX)
68+
.includeAliases(true)
69+
.includeGlobalState(false)
70+
.partial(true);
71+
72+
// Convert to XContent
73+
XContentBuilder builder = jsonBuilder();
74+
request.toXContent(builder, ToXContent.EMPTY_PARAMS);
75+
76+
// Parse from XContent
77+
XContentParser parser = createParser(builder);
78+
Map<String, Object> source = parser.map();
79+
RestoreSnapshotRequest parsed = new RestoreSnapshotRequest().source(source);
80+
81+
// Verify key fields
82+
assertArrayEquals(request.indices(), parsed.indices());
83+
assertEquals(request.aliasWriteIndexPolicy(), parsed.aliasWriteIndexPolicy());
84+
assertEquals(request.includeAliases(), parsed.includeAliases());
85+
assertEquals(request.includeGlobalState(), parsed.includeGlobalState());
86+
assertEquals(request.partial(), parsed.partial());
87+
}
88+
89+
public void testPolicyFromString() {
90+
assertEquals(
91+
RestoreSnapshotRequest.AliasWriteIndexPolicy.PRESERVE,
92+
RestoreSnapshotRequest.AliasWriteIndexPolicy.fromString("preserve")
93+
);
94+
assertEquals(
95+
RestoreSnapshotRequest.AliasWriteIndexPolicy.STRIP_WRITE_INDEX,
96+
RestoreSnapshotRequest.AliasWriteIndexPolicy.fromString("strip_write_index")
97+
);
98+
99+
expectThrows(IllegalArgumentException.class, () -> RestoreSnapshotRequest.AliasWriteIndexPolicy.fromString("invalid"));
100+
}
101+
102+
public void testDefaultValues() {
103+
RestoreSnapshotRequest request = new RestoreSnapshotRequest();
104+
assertThat(request.aliasWriteIndexPolicy(), equalTo(RestoreSnapshotRequest.AliasWriteIndexPolicy.PRESERVE));
105+
}
106+
}

0 commit comments

Comments
 (0)