Skip to content

Commit

Permalink
Adding _list/shards API
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <[email protected]>
  • Loading branch information
Harsh Garg committed Sep 26, 2024
1 parent 12dadcf commit 437b87f
Show file tree
Hide file tree
Showing 21 changed files with 951 additions and 30 deletions.
14 changes: 13 additions & 1 deletion server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,9 @@
import org.opensearch.rest.action.ingest.RestGetPipelineAction;
import org.opensearch.rest.action.ingest.RestPutPipelineAction;
import org.opensearch.rest.action.ingest.RestSimulatePipelineAction;
import org.opensearch.rest.action.list.AbstractListAction;
import org.opensearch.rest.action.list.RestListAction;
import org.opensearch.rest.action.list.RestShardsListAction;
import org.opensearch.rest.action.search.RestClearScrollAction;
import org.opensearch.rest.action.search.RestCountAction;
import org.opensearch.rest.action.search.RestCreatePitAction;
Expand Down Expand Up @@ -802,9 +805,14 @@ private ActionFilters setupActionFilters(List<ActionPlugin> actionPlugins) {

public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
List<AbstractCatAction> catActions = new ArrayList<>();
List<AbstractListAction> listActions = new ArrayList<>();
Consumer<RestHandler> registerHandler = handler -> {
if (handler instanceof AbstractCatAction) {
catActions.add((AbstractCatAction) handler);
if (handler instanceof AbstractListAction && ((AbstractListAction) handler).isActionPaginated()) {
listActions.add((AbstractListAction) handler);
} else {
catActions.add((AbstractCatAction) handler);
}
}
restController.registerHandler(handler);
};
Expand Down Expand Up @@ -980,6 +988,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
}
registerHandler.accept(new RestTemplatesAction());

// LIST API
registerHandler.accept(new RestShardsListAction());

// Point in time API
registerHandler.accept(new RestCreatePitAction());
registerHandler.accept(new RestDeletePitAction());
Expand Down Expand Up @@ -1011,6 +1022,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
}
}
registerHandler.accept(new RestCatAction(catActions));
registerHandler.accept(new RestListAction(listActions));
registerHandler.accept(new RestDecommissionAction());
registerHandler.accept(new RestGetDecommissionStateAction());
registerHandler.accept(new RestRemoteStoreStatsAction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.rest.action.admin.cluster.ClusterAdminTask;
import org.opensearch.rest.pagination.PageParams;

import java.io.IOException;
import java.util.Map;
Expand All @@ -27,11 +30,27 @@ public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsReq

private String[] indices;
private TimeValue cancelAfterTimeInterval;
private PageParams pageParams = null;

public CatShardsRequest() {}

public CatShardsRequest(StreamInput in) throws IOException {
super(in);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
indices = in.readStringArray();
cancelAfterTimeInterval = in.readTimeValue();
pageParams = PageParams.readPageParams(in);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeStringArray(indices);
out.writeTimeValue(cancelAfterTimeInterval);
pageParams.writePageParams(out);
}
}

@Override
Expand All @@ -55,6 +74,14 @@ public TimeValue getCancelAfterTimeInterval() {
return this.cancelAfterTimeInterval;
}

public void setPageParams(PageParams pageParams) {
this.pageParams = pageParams;
}

public PageParams getPageParams() {
return pageParams;
}

@Override
public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.Version;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.rest.pagination.PageToken;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* A response of a cat shards request.
Expand All @@ -26,17 +31,29 @@ public class CatShardsResponse extends ActionResponse {
private ClusterStateResponse clusterStateResponse = null;

private IndicesStatsResponse indicesStatsResponse = null;
private List<ShardRouting> responseShards = new ArrayList<>();
private PageToken pageToken = null;

public CatShardsResponse() {}

public CatShardsResponse(StreamInput in) throws IOException {
super(in);
clusterStateResponse = new ClusterStateResponse(in);
indicesStatsResponse = new IndicesStatsResponse(in);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
responseShards = in.readList(ShardRouting::new);
pageToken = PageToken.readPageToken(in);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterStateResponse.writeTo(out);
indicesStatsResponse.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeList(responseShards);
pageToken.writePageToken(out);
}
}

public void setClusterStateResponse(ClusterStateResponse clusterStateResponse) {
Expand All @@ -54,4 +71,20 @@ public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) {
public IndicesStatsResponse getIndicesStatsResponse() {
return this.indicesStatsResponse;
}

public void setResponseShards(List<ShardRouting> responseShards) {
this.responseShards = responseShards;
}

public List<ShardRouting> getResponseShards() {
return this.responseShards;
}

public void setPageToken(PageToken pageToken) {
this.pageToken = pageToken;
}

public PageToken getPageToken() {
return this.pageToken;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.rest.pagination.PageParams;
import org.opensearch.rest.pagination.ShardPaginationStrategy;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import java.util.Objects;

/**
* Perform cat shards action
*
Expand All @@ -44,7 +48,11 @@ public void doExecute(Task parentTask, CatShardsRequest shardsRequest, ActionLis
clusterStateRequest.setShouldCancelOnTimeout(true);
clusterStateRequest.local(shardsRequest.local());
clusterStateRequest.clusterManagerNodeTimeout(shardsRequest.clusterManagerNodeTimeout());
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices());
if (Objects.nonNull(shardsRequest.getPageParams())) {
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices());
} else {
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices()).metadata(true);
}
assert parentTask instanceof CancellableTask;
clusterStateRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());

Expand Down Expand Up @@ -73,11 +81,21 @@ protected void innerOnFailure(Exception e) {
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
ShardPaginationStrategy paginationStrategy = getPaginationStrategy(shardsRequest.getPageParams(), clusterStateResponse);
String[] indices = Objects.isNull(paginationStrategy)
? shardsRequest.getIndices()
: paginationStrategy.getRequestedIndices().toArray(new String[0]);
catShardsResponse.setClusterStateResponse(clusterStateResponse);
catShardsResponse.setResponseShards(
Objects.isNull(paginationStrategy)
? clusterStateResponse.getState().routingTable().allShards()
: paginationStrategy.getRequestedEntities()
);
catShardsResponse.setPageToken(Objects.isNull(paginationStrategy) ? null : paginationStrategy.getResponseToken());
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
indicesStatsRequest.all();
indicesStatsRequest.indices(shardsRequest.getIndices());
indicesStatsRequest.indices(indices);
indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());
try {
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
Expand Down Expand Up @@ -107,4 +125,8 @@ public void onFailure(Exception e) {
}

}

private ShardPaginationStrategy getPaginationStrategy(PageParams pageParams, ClusterStateResponse clusterStateResponse) {
return Objects.isNull(pageParams) ? null : new ShardPaginationStrategy(pageParams, clusterStateResponse.getState());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class IndicesStatsResponse extends BroadcastResponse {

private Map<ShardRouting, ShardStats> shardStatsMap;

IndicesStatsResponse(StreamInput in) throws IOException {
public IndicesStatsResponse(StreamInput in) throws IOException {
super(in);
shards = in.readArray(ShardStats::new, (size) -> new ShardStats[size]);
}
Expand Down
15 changes: 15 additions & 0 deletions server/src/main/java/org/opensearch/common/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.common.time.DateFormatter;
import org.opensearch.core.common.Strings;
import org.opensearch.rest.pagination.PageToken;

import java.time.Instant;
import java.time.ZoneOffset;
Expand All @@ -59,9 +60,19 @@ public class Table {
private List<Cell> currentCells;
private boolean inHeaders = false;
private boolean withTime = false;
/**
* paginatedQueryResponse if null will imply the Table response is not paginated.
*/
private PageToken pageToken;
public static final String EPOCH = "epoch";
public static final String TIMESTAMP = "timestamp";

public Table() {}

public Table(@Nullable PageToken pageToken) {
this.pageToken = pageToken;
}

public Table startHeaders() {
inHeaders = true;
currentCells = new ArrayList<>();
Expand Down Expand Up @@ -230,6 +241,10 @@ public Map<String, String> getAliasMap() {
return headerAliasMap;
}

public PageToken getPageToken() {
return pageToken;
}

/**
* Cell in a table
*
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/rest/RestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ default boolean allowSystemIndexAccessByDefault() {
return false;
}

/**
* Denotes whether the RestHandler will output paginated responses or not.
*/
default boolean isActionPaginated() {
return false;
}

static RestHandler wrapper(RestHandler delegate) {
return new Wrapper(delegate);
}
Expand Down
8 changes: 8 additions & 0 deletions server/src/main/java/org/opensearch/rest/RestRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.http.HttpChannel;
import org.opensearch.http.HttpRequest;
import org.opensearch.rest.pagination.PageParams;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -67,6 +68,9 @@

import static org.opensearch.common.unit.TimeValue.parseTimeValue;
import static org.opensearch.core.common.unit.ByteSizeValue.parseBytesSizeValue;
import static org.opensearch.rest.pagination.PageParams.PARAM_NEXT_TOKEN;
import static org.opensearch.rest.pagination.PageParams.PARAM_SIZE;
import static org.opensearch.rest.pagination.PageParams.PARAM_SORT;

/**
* REST Request
Expand Down Expand Up @@ -591,6 +595,10 @@ public static MediaType parseContentType(List<String> header) {
throw new IllegalArgumentException("empty Content-Type header");
}

public PageParams parsePaginatedQueryParams(String defaultSortOrder, int defaultPageSize) {
return new PageParams(param(PARAM_NEXT_TOKEN), param(PARAM_SORT, defaultSortOrder), paramAsInt(PARAM_SIZE, defaultPageSize));
}

/**
* Thrown if there is an error in the content type header.
*
Expand Down
Loading

0 comments on commit 437b87f

Please sign in to comment.