Skip to content

Commit

Permalink
enable term check selectively for transport actions based on code audit
Browse files Browse the repository at this point in the history
Signed-off-by: Rajiv Kumar Vaidyanathan <[email protected]>
  • Loading branch information
rajiv-kv committed Jul 19, 2024
1 parent 73ebd56 commit 86ef756
Show file tree
Hide file tree
Showing 19 changed files with 298 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
import org.opensearch.action.search.SearchTransportService;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse;
import org.opensearch.action.support.clustermanager.term.GetTermVersionRequest;
import org.opensearch.action.support.replication.TransportReplicationActionTests;
import org.opensearch.action.termvectors.MultiTermVectorsAction;
import org.opensearch.action.termvectors.MultiTermVectorsRequest;
Expand All @@ -95,8 +95,6 @@
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -128,7 +126,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -551,14 +548,14 @@ public void testDeleteIndex() {
}

public void testGetMappings() {
interceptTransportActions(GetMappingsAction.NAME);
stubClusterTermResponse(internalCluster().getClusterManagerName());

interceptTransportActions(GetTermVersionAction.NAME, GetMappingsAction.NAME);
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().getMappings(getMappingsRequest).actionGet();

clearInterceptedActions();
assertSameIndices(getMappingsRequest, GetMappingsAction.NAME);

assertActionInvocation(GetTermVersionAction.NAME, GetTermVersionRequest.class);
assertNoActionInvocation(GetTermVersionAction.NAME);
}

public void testPutMapping() {
Expand All @@ -574,7 +571,6 @@ public void testPutMapping() {
public void testGetSettings() {

interceptTransportActions(GetSettingsAction.NAME);
stubClusterTermResponse(internalCluster().getClusterManagerName());
GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().getSettings(getSettingsRequest).actionGet();

Expand Down Expand Up @@ -670,6 +666,21 @@ private static void assertSameIndices(IndicesRequest originalRequest, boolean op
}
}

private static void assertActionInvocation(String action, Class<? extends TransportRequest> requestClass) {
List<TransportRequest> requests = consumeTransportRequests(action);
assertFalse(requests.isEmpty());
for (TransportRequest internalRequest : requests) {
assertTrue(internalRequest.getClass() == requestClass);
}
}

private static void assertNoActionInvocation(String... actions) {
for (String action : actions) {
List<TransportRequest> requests = consumeTransportRequests(action);
assertTrue(requests.isEmpty());
}
}

private static void assertIndicesSubset(List<String> indices, String... actions) {
// indices returned by each bulk shard request need to be a subset of the original indices
for (String action : actions) {
Expand Down Expand Up @@ -789,8 +800,6 @@ public List<TransportInterceptor> getTransportInterceptors(
}

private final Set<String> actions = new HashSet<>();
private final Map<String, TransportRequestHandler> stubHandlers = new ConcurrentHashMap<>();

private final Map<String, List<TransportRequest>> requests = new HashMap<>();

@Override
Expand All @@ -813,11 +822,6 @@ synchronized void interceptTransportActions(String... actions) {

synchronized void clearInterceptedActions() {
actions.clear();
stubHandlers.clear();
}

synchronized void stub(String action, TransportRequestHandler handler) {
stubHandlers.put(action, handler);
}

private class InterceptingRequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> {
Expand All @@ -844,25 +848,9 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro
}
}
}
if (!stubHandlers.containsKey(action)) {
requestHandler.messageReceived(request, channel, task);
} else {
stubHandlers.get(action).messageReceived(request, channel, task);
}
requestHandler.messageReceived(request, channel, task);

}
}
}

private void stubClusterTermResponse(String master) {
PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, master);
pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get().instance.stub(
GetTermVersionAction.NAME,
(request, channel, task) -> channel.sendResponse(
new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1))
)
);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public TransportGetDecommissionStateAction(
threadPool,
actionFilters,
GetDecommissionStateRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public TransportGetRepositoriesAction(
threadPool,
actionFilters,
GetRepositoriesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public TransportClusterSearchShardsAction(
threadPool,
actionFilters,
ClusterSearchShardsRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.indicesService = indicesService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public TransportGetWeightedRoutingAction(
threadPool,
actionFilters,
ClusterGetWeightedRoutingRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.weightedRoutingService = weightedRoutingService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public TransportClusterStateAction(
ClusterStateRequest::new,
indexNameExpressionResolver
);
this.localExecuteSupported = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public TransportGetStoredScriptAction(
threadPool,
actionFilters,
GetStoredScriptRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.scriptService = scriptService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public TransportGetAliasesAction(
threadPool,
actionFilters,
GetAliasesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.systemIndices = systemIndices;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public TransportIndicesExistsAction(
threadPool,
actionFilters,
IndicesExistsRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public TransportIndicesShardStoresAction(
threadPool,
actionFilters,
IndicesShardStoresRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
this.listShardStoresInfo = listShardStoresInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetComponentTemplateAction(
threadPool,
actionFilters,
GetComponentTemplateAction.Request::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetComposableIndexTemplateAction(
threadPool,
actionFilters,
GetComposableIndexTemplateAction.Request::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public TransportGetIndexTemplatesAction(
threadPool,
actionFilters,
GetIndexTemplatesRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public GetPipelineTransportAction(
threadPool,
actionFilters,
GetPipelineRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public GetSearchPipelineTransportAction(
threadPool,
actionFilters,
GetSearchPipelineRequest::new,
indexNameExpressionResolver
indexNameExpressionResolver,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,17 @@ public abstract class TransportClusterManagerNodeReadAction<
Request extends ClusterManagerNodeReadRequest<Request>,
Response extends ActionResponse> extends TransportClusterManagerNodeAction<Request, Response> {

protected boolean localExecuteSupported = false;

protected TransportClusterManagerNodeReadAction(
String actionName,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
boolean localExecuteSupported
) {
this(
actionName,
Expand All @@ -71,6 +74,19 @@ protected TransportClusterManagerNodeReadAction(
request,
indexNameExpressionResolver
);
this.localExecuteSupported = localExecuteSupported;
}

protected TransportClusterManagerNodeReadAction(
String actionName,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver
) {
this(actionName, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver, false);
}

protected TransportClusterManagerNodeReadAction(
Expand Down Expand Up @@ -126,7 +142,7 @@ protected final boolean localExecute(Request request) {
}

protected boolean localExecuteSupportedByAction() {
return true;
return localExecuteSupported;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public TransportClusterInfoAction(
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(actionName, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver);
this.localExecuteSupported = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;

import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -103,19 +104,9 @@ public void tearDown() throws Exception {

public void testIncludeDefaults() {
GetIndexRequest defaultsRequest = new GetIndexRequest().indices(indexName).includeDefaults(true);
getIndexAction.execute(
null,
defaultsRequest,
ActionListener.wrap(
defaultsResponse -> assertNotNull(
"index.refresh_interval should be set as we are including defaults",
defaultsResponse.getSetting(indexName, "index.refresh_interval")
),
exception -> {
throw new AssertionError(exception);
}
)
);
getIndexAction.execute(null, defaultsRequest, ActionListener.wrap(Assert::assertNotNull, exception -> {
throw new AssertionError(exception);
}));
}

public void testDoNotIncludeDefaults() {
Expand Down
Loading

0 comments on commit 86ef756

Please sign in to comment.