Skip to content

Commit

Permalink
Enabling term version check on local state for all ClusterManager Rea…
Browse files Browse the repository at this point in the history
…d Transport Actions (opensearch-project#14273) (opensearch-project#14869)

* enabling term version check on local state for all admin read actions

Signed-off-by: Rajiv Kumar Vaidyanathan <[email protected]>
Signed-off-by: Daniel (dB.) Doubrovkine <[email protected]>
Co-authored-by: Daniel (dB.) Doubrovkine <[email protected]>
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
2 people authored and kkewwei committed Jul 24, 2024
1 parent e455f62 commit 264384d
Show file tree
Hide file tree
Showing 23 changed files with 382 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))

### Dependencies
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchTransportService;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
import org.opensearch.action.support.clustermanager.term.GetTermVersionRequest;
import org.opensearch.action.support.replication.TransportReplicationActionTests;
import org.opensearch.action.termvectors.MultiTermVectorsAction;
import org.opensearch.action.termvectors.MultiTermVectorsRequest;
Expand Down Expand Up @@ -195,6 +197,7 @@ public void cleanUp() {
}

public void testGetFieldMappings() {

String getFieldMappingsShardAction = GetFieldMappingsAction.NAME + "[index][s]";
interceptTransportActions(getFieldMappingsShardAction);

Expand Down Expand Up @@ -545,13 +548,14 @@ public void testDeleteIndex() {
}

public void testGetMappings() {
interceptTransportActions(GetMappingsAction.NAME);

interceptTransportActions(GetTermVersionAction.NAME, GetMappingsAction.NAME);
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().getMappings(getMappingsRequest).actionGet();

clearInterceptedActions();
assertSameIndices(getMappingsRequest, GetMappingsAction.NAME);

assertActionInvocation(GetTermVersionAction.NAME, GetTermVersionRequest.class);
assertNoActionInvocation(GetMappingsAction.NAME);
}

public void testPutMapping() {
Expand All @@ -565,8 +569,8 @@ public void testPutMapping() {
}

public void testGetSettings() {
interceptTransportActions(GetSettingsAction.NAME);

interceptTransportActions(GetSettingsAction.NAME);
GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().getSettings(getSettingsRequest).actionGet();

Expand Down Expand Up @@ -662,6 +666,21 @@ private static void assertSameIndices(IndicesRequest originalRequest, boolean op
}
}

private static void assertActionInvocation(String action, Class<? extends TransportRequest> requestClass) {
List<TransportRequest> requests = consumeTransportRequests(action);
assertFalse(requests.isEmpty());
for (TransportRequest internalRequest : requests) {
assertTrue(internalRequest.getClass() == requestClass);
}
}

private static void assertNoActionInvocation(String... actions) {
for (String action : actions) {
List<TransportRequest> requests = consumeTransportRequests(action);
assertTrue(requests.isEmpty());
}
}

private static void assertIndicesSubset(List<String> indices, String... actions) {
// indices returned by each bulk shard request need to be a subset of the original indices
for (String action : actions) {
Expand Down Expand Up @@ -781,7 +800,6 @@ public List<TransportInterceptor> getTransportInterceptors(
}

private final Set<String> actions = new HashSet<>();

private final Map<String, List<TransportRequest>> requests = new HashMap<>();

@Override
Expand Down Expand Up @@ -831,6 +849,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro
}
}
requestHandler.messageReceived(request, channel, task);

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.node.IoUsageStats;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
Expand All @@ -29,9 +34,13 @@
import org.opensearch.rest.action.admin.indices.RestGetAliasesAction;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.rest.FakeRestRequest;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.junit.Before;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -62,6 +71,10 @@ public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase {
.put(CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50)
.build();

protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(MockTransportService.TestPlugin.class);
}

@Before
public void init() {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(
Expand All @@ -79,15 +92,34 @@ public void init() {

// Enable admission control
client().admin().cluster().prepareUpdateSettings().setTransientSettings(ENFORCE_ADMISSION_CONTROL).execute().actionGet();
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
clusterManagerNode
);

// Force always fetch from ClusterManager
ClusterService clusterService = internalCluster().clusterService();
GetTermVersionResponse oosTerm = new GetTermVersionResponse(
new ClusterStateTermVersion(
clusterService.state().getClusterName(),
clusterService.state().metadata().clusterUUID(),
clusterService.state().term() - 1,
clusterService.state().version() - 1
)
);
primaryService.addRequestHandlingBehavior(
GetTermVersionAction.NAME,
(handler, request, channel, task) -> channel.sendResponse(oosTerm)
);
}

public void testAdmissionControlEnforced() throws Exception {
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98));

// Write API on ClusterManager
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}"));

// Read API on ClusterManager

GetAliasesRequest aliasesRequest = new GetAliasesRequest();
aliasesRequest.aliases("alias1");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public TransportGetDecommissionStateAction(
threadPool,
actionFilters,
GetDecommissionStateRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,4 +534,9 @@ private ClusterHealthResponse clusterHealth(
pendingTaskTimeInQueue
);
}

@Override
protected boolean localExecuteSupportedByAction() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public TransportGetRepositoriesAction(
threadPool,
actionFilters,
GetRepositoriesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public TransportClusterSearchShardsAction(
threadPool,
actionFilters,
ClusterSearchShardsRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.indicesService = indicesService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public TransportGetWeightedRoutingAction(
threadPool,
actionFilters,
ClusterGetWeightedRoutingRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.weightedRoutingService = weightedRoutingService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public TransportClusterStateAction(
ClusterStateRequest::new,
indexNameExpressionResolver
);
this.localExecuteSupported = true;
}

@Override
Expand Down Expand Up @@ -233,9 +234,4 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi

return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false);
}

@Override
protected boolean localExecuteSupportedByAction() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public TransportGetStoredScriptAction(
threadPool,
actionFilters,
GetStoredScriptRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.scriptService = scriptService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,9 @@ protected void clusterManagerOperation(
logger.trace("done fetching pending tasks from cluster service");
listener.onResponse(new PendingClusterTasksResponse(pendingTasks));
}

@Override
protected boolean localExecuteSupportedByAction() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public TransportGetAliasesAction(
threadPool,
actionFilters,
GetAliasesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.systemIndices = systemIndices;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public TransportIndicesExistsAction(
threadPool,
actionFilters,
IndicesExistsRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public TransportIndicesShardStoresAction(
threadPool,
actionFilters,
IndicesShardStoresRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.listShardStoresInfo = listShardStoresInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetComponentTemplateAction(
threadPool,
actionFilters,
GetComponentTemplateAction.Request::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetComposableIndexTemplateAction(
threadPool,
actionFilters,
GetComposableIndexTemplateAction.Request::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetIndexTemplatesAction(
threadPool,
actionFilters,
GetIndexTemplatesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public GetPipelineTransportAction(
threadPool,
actionFilters,
GetPipelineRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public GetSearchPipelineTransportAction(
threadPool,
actionFilters,
GetSearchPipelineRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Loading

0 comments on commit 264384d

Please sign in to comment.