Skip to content

Commit

Permalink
Implementing pagination for _cat/shards and metadata changes
Browse files · Browse the repository at this point in the history
Signed-off-by: Harsh Garg <[email protected]>
  • Loading branch information
Harsh Garg committed Jul 4, 2024
1 parent 270054c commit 671bb92
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 19 deletions.
38 changes: 36 additions & 2 deletions server/src/main/java/org/opensearch/cluster/DiffableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -88,6 +89,16 @@ public static <K, T extends Diffable<T>> MapDiff<K, T, Map<K, T>> diff(
return new JdkMapDiff<>(before, after, keySerializer, DiffableValueSerializer.getWriteOnlyInstance());
}

/**
 * Calculates the diff between two maps of {@link Diffable} objects, optionally
 * recording the iteration order of the upserted entries.
 *
 * @param before        map representing the previous state; must not be null
 * @param after         map representing the current state; must not be null
 * @param keySerializer serializer used to write map keys to the stream
 * @param maintainOrder when true, applying the resulting diff produces a
 *                      {@link LinkedHashMap} so insertion order is retained
 * @return a write-only diff between {@code before} and {@code after}
 */
public static <K, T extends Diffable<T>> MapDiff<K, T, Map<K, T>> diff(
    Map<K, T> before,
    Map<K, T> after,
    KeySerializer<K> keySerializer,
    boolean maintainOrder
) {
    assert before != null && after != null;
    final ValueSerializer<K, T> writeOnlySerializer = DiffableValueSerializer.getWriteOnlyInstance();
    return new JdkMapDiff<>(before, after, keySerializer, writeOnlySerializer, maintainOrder);
}

/**
* Calculates diff between two Maps of non-diffable objects
*/
Expand Down Expand Up @@ -138,7 +149,17 @@ protected JdkMapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerial
}

JdkMapDiff(Map<K, T> before, Map<K, T> after, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) {
super(keySerializer, valueSerializer);
this(before, after, keySerializer, valueSerializer, false);
}

JdkMapDiff(
Map<K, T> before,
Map<K, T> after,
KeySerializer<K> keySerializer,
ValueSerializer<K, T> valueSerializer,
boolean maintainOrder
) {
super(keySerializer, valueSerializer, maintainOrder);
assert after != null && before != null;

for (K key : before.keySet()) {
Expand All @@ -163,7 +184,7 @@ protected JdkMapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerial

@Override
public Map<K, T> apply(Map<K, T> map) {
Map<K, T> builder = new HashMap<>(map);
Map<K, T> builder = maintainOrder ? new LinkedHashMap<>(map) : new HashMap<>(map);

for (K part : deletes) {
builder.remove(part);
Expand Down Expand Up @@ -198,13 +219,24 @@ public abstract static class MapDiff<K, T, M> implements Diff<M> {
protected final Map<K, T> upserts; // additions or full updates
protected final KeySerializer<K> keySerializer;
protected final ValueSerializer<K, T> valueSerializer;
protected final boolean maintainOrder;

/**
 * Creates an empty MapDiff that does not preserve map iteration order.
 * The deletes, diffs and upserts collections are populated by subclasses.
 *
 * @param keySerializer   serializer for map keys
 * @param valueSerializer serializer for map values
 */
protected MapDiff(KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) {
    // Delegate to the order-aware constructor so the collection types are
    // chosen in exactly one place; maintainOrder defaults to false, which
    // selects plain HashMaps just as the previous inline initialization did.
    this(keySerializer, valueSerializer, false);
}

/**
 * Creates an empty MapDiff.
 *
 * @param keySerializer   serializer for map keys
 * @param valueSerializer serializer for map values
 * @param maintainOrder   when true, upserted entries are tracked in a
 *                        {@link LinkedHashMap}, preserving their insertion order
 */
protected MapDiff(KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer, boolean maintainOrder) {
    this.maintainOrder = maintainOrder;
    this.keySerializer = keySerializer;
    this.valueSerializer = valueSerializer;
    this.deletes = new ArrayList<>();
    this.diffs = new HashMap<>();
    // Only the upserts map is order-sensitive here; deletes and diffs keep
    // their original collection types.
    if (maintainOrder) {
        this.upserts = new LinkedHashMap<>();
    } else {
        this.upserts = new HashMap<>();
    }
}

protected MapDiff(
Expand All @@ -219,11 +251,13 @@ protected MapDiff(
this.deletes = deletes;
this.diffs = diffs;
this.upserts = upserts;
this.maintainOrder = false;
}

protected MapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.maintainOrder = false;
deletes = in.readList(keySerializer::readKey);
int diffsCount = in.readVInt();
diffs = diffsCount == 0 ? Collections.emptyMap() : new HashMap<>(diffsCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -307,7 +308,8 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio
this.persistentSettings = persistentSettings;
this.settings = Settings.builder().put(persistentSettings).put(transientSettings).build();
this.hashesOfConsistentSettings = hashesOfConsistentSettings;
this.indices = Collections.unmodifiableMap(indices);
LinkedHashMap<String, IndexMetadata> linkedMap = new LinkedHashMap<>(indices);
this.indices = Collections.unmodifiableMap(linkedMap);
this.customs = Collections.unmodifiableMap(customs);
this.templates = new TemplatesMetadata(templates);
int totalNumberOfShards = 0;
Expand Down Expand Up @@ -1039,7 +1041,7 @@ private static class MetadataDiff implements Diff<Metadata> {
transientSettings = after.transientSettings;
persistentSettings = after.persistentSettings;
hashesOfConsistentSettings = after.hashesOfConsistentSettings.diff(before.hashesOfConsistentSettings);
indices = DiffableUtils.diff(before.indices, after.indices, DiffableUtils.getStringKeySerializer());
indices = DiffableUtils.diff(before.indices, after.indices, DiffableUtils.getStringKeySerializer(), true);
templates = DiffableUtils.diff(
before.templates.getTemplates(),
after.templates.getTemplates(),
Expand Down Expand Up @@ -1183,7 +1185,7 @@ public static class Builder {

public Builder() {
clusterUUID = UNKNOWN_CLUSTER_UUID;
indices = new HashMap<>();
indices = new LinkedHashMap<>();
templates = new HashMap<>();
customs = new HashMap<>();
previousMetadata = null;
Expand All @@ -1198,7 +1200,7 @@ public Builder(Metadata metadata) {
this.persistentSettings = metadata.persistentSettings;
this.hashesOfConsistentSettings = metadata.hashesOfConsistentSettings;
this.version = metadata.version;
this.indices = new HashMap<>(metadata.indices);
this.indices = new LinkedHashMap<>(metadata.indices);
this.templates = new HashMap<>(metadata.templates.getTemplates());
this.customs = new HashMap<>(metadata.customs);
this.previousMetadata = metadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ private void callClusterStateListener(
clusterManagerMetrics.clusterStateListenersHistogram,
(double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - listenerStartTimeNS)),
Optional.of(Tags.create().addTag("Operation", listener.getClass().getSimpleName()))

);
}
} catch (Exception ex) {
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/common/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,21 @@ public class Table {
private List<Cell> currentCells;
private boolean inHeaders = false;
private boolean withTime = false;
public String nextToken = null;
public String paginatedElement = null;
public static final String EPOCH = "epoch";
public static final String TIMESTAMP = "timestamp";

/** Creates a plain, non-paginated table; {@code nextToken} and {@code paginatedElement} remain null. */
public Table() {}

/**
 * Creates a table for a paginated response.
 *
 * @param nextToken        opaque token a client sends back to fetch the next page;
 *                         when null, this behaves exactly like the no-arg constructor
 * @param paginatedElement name of the element being paginated (e.g. indices, shards
 *                         or segments); must be non-null whenever {@code nextToken} is
 */
public Table(@Nullable String nextToken, @Nullable String paginatedElement) {
    if (nextToken == null) {
        // No pagination requested: leave both fields at their null defaults.
        return;
    }
    assert paginatedElement != null : "Element which is getting paginated such as indices, shards or segments should be specified";
    this.nextToken = nextToken;
    this.paginatedElement = paginatedElement;
}

public Table startHeaders() {
inHeaders = true;
currentCells = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ private void createIndices(final ClusterState state) {
for (Map.Entry<Index, List<ShardRouting>> entry : indicesToCreate.entrySet()) {
final Index index = entry.getKey();
final IndexMetadata indexMetadata = state.metadata().index(index);
logger.debug("[{}] creating index", index);
logger.info("[{}] creating index", index);

AllocatedIndex<? extends Shard> indexService = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
}
final TimeValue clusterManagerNodeTimeout = clusterManagerTimeout;
final boolean includeUnloadedSegments = request.paramAsBoolean("include_unloaded_segments", false);

return channel -> {
final ActionListener<Table> listener = ActionListener.notifyOnce(new RestResponseListener<Table>(channel) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.common.Table;
Expand Down Expand Up @@ -66,8 +67,12 @@
import org.opensearch.search.suggest.completion.CompletionStats;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -106,24 +111,126 @@ protected void documentation(StringBuilder sb) {

@Override
public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) {
final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));

String[] indices = new String[0];
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
clusterStateRequest.clusterManagerNodeTimeout(
request.paramAsTime("cluster_manager_timeout", clusterStateRequest.clusterManagerNodeTimeout())
);
parseDeprecatedMasterTimeoutParameter(clusterStateRequest, request, deprecationLogger, getName());
clusterStateRequest.clear().nodes(true).routingTable(true).indices(indices);
if (request.hasParam("nextToken")) {
// ToDo: Add validation on the nextToken passed in the request
// Need to get the metadata as well
request.param("nextToken");
clusterStateRequest.clear().nodes(true).routingTable(true).metadata(true);
} else {
// Only parse the "index" param if the request is not-paginated.
indices = Strings.splitStringByCommaToArray(request.param("index"));
clusterStateRequest.clear().nodes(true).routingTable(true).indices(indices);
}

String[] finalIndices = indices;
return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
@Override
public void processResponse(final ClusterStateResponse clusterStateResponse) {
String nextToken = null;
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.all();
indicesStatsRequest.indices(indices);
final List<ShardRouting> shardRoutingResponseList = new ArrayList<>();
if (request.hasParam("nextToken")) {
final long defaultPageSize = (long) clusterStateResponse.getState().nodes().getDataNodes().size() + 1;
// Get the nextToken
final String nextTokenInRequest = Objects.equals(request.param("nextToken"), "null")
? "null"
: new String(Base64.getDecoder().decode(request.param("nextToken")));
List<String> sortedIndicesList = new ArrayList<String>(clusterStateResponse.getState().metadata().indices().keySet());

// Get the number of shards upto the maxPageSize
long shardCountSoFar = 0L;
List<String> indicesToBeQueried = new ArrayList<String>();

// Since all the shards for last ID would have already been sent in the last response,
// start iterating from the next shard for current page
int newPageStartShardID = "null".equals(nextTokenInRequest)
? 0
: Integer.parseInt(nextTokenInRequest.split("\\$")[0]) + 1;
// Since all the shards corresponding to the last processed index might not have been included in the last page,
// start iterating from the last index number itself
int newPageStartIndexNumber = "null".equals(nextTokenInRequest)
? 0
: Integer.parseInt(nextTokenInRequest.split("\\$")[1]);

int lastProcessedShardNumber = -1;
int lastProcessedIndexNumber = -1;
// ToDo: Handle case when index gets deleted. Select the first index with creationTime just greater than the last
// index's creationTime
int indexNumberInSortedList = newPageStartIndexNumber;
for (; indexNumberInSortedList < sortedIndicesList.size(); indexNumberInSortedList++) {
String index = sortedIndicesList.get(indexNumberInSortedList);
Map<Integer, IndexShardRoutingTable> indexShards = clusterStateResponse.getState()
.getRoutingTable()
.getIndicesRouting()
.get(index)
.getShards();
// If all the shards corresponding to the index were already processed, move to the next Index
if (indexNumberInSortedList == newPageStartIndexNumber && (newPageStartShardID > indexShards.size() - 1)) {
// ToDo: Add validation that the newPageStartShardID should not be greater than the
// newPageStartIndexShards.size()
newPageStartShardID = 0;
continue;
}
int lastProcessedShardNumberForCurrentIndex = -1;
int shardID = (indexNumberInSortedList == newPageStartIndexNumber) ? newPageStartShardID : 0;
for (; shardID < indexShards.size(); shardID++) {
shardCountSoFar += indexShards.get(shardID).shards().size();
if (shardCountSoFar > defaultPageSize) {
break;
}
shardRoutingResponseList.addAll(indexShards.get(shardID).shards());
lastProcessedShardNumberForCurrentIndex = shardID;
}

if (shardCountSoFar > defaultPageSize) {
if (lastProcessedShardNumberForCurrentIndex != -1) {
indicesToBeQueried.add(index);
lastProcessedIndexNumber = indexNumberInSortedList;
lastProcessedShardNumber = lastProcessedShardNumberForCurrentIndex;
}
break;
}
indicesToBeQueried.add(index);
lastProcessedShardNumber = lastProcessedShardNumberForCurrentIndex;
lastProcessedIndexNumber = indexNumberInSortedList;
}
nextToken = indexNumberInSortedList >= sortedIndicesList.size()
? "null"
: Base64.getEncoder()
.encodeToString(
(lastProcessedShardNumber
+ "$"
+ (lastProcessedIndexNumber)
+ "$"
+ clusterStateResponse.getState()
.metadata()
.indices()
.get(sortedIndicesList.get(lastProcessedIndexNumber))
.getCreationDate()).getBytes()
);
indicesStatsRequest.indices(indicesToBeQueried.toArray(new String[0]));
} else {
shardRoutingResponseList.addAll(clusterStateResponse.getState().routingTable().allShards());
indicesStatsRequest.indices(finalIndices);
}

final String finalNextToken = nextToken;
client.admin().indices().stats(indicesStatsRequest, new RestResponseListener<IndicesStatsResponse>(channel) {
@Override
public RestResponse buildResponse(IndicesStatsResponse indicesStatsResponse) throws Exception {
return RestTable.buildResponse(buildTable(request, clusterStateResponse, indicesStatsResponse), channel);
return RestTable.buildResponse(
buildTable(request, clusterStateResponse, indicesStatsResponse, shardRoutingResponseList, finalNextToken),
channel
);
}
});
}
Expand All @@ -132,7 +239,11 @@ public RestResponse buildResponse(IndicesStatsResponse indicesStatsResponse) thr

@Override
protected Table getTableWithHeader(final RestRequest request) {
Table table = new Table();
return getTableWithHeader(request, null);
}

protected Table getTableWithHeader(final RestRequest request, String nextToken) {
Table table = new Table(nextToken, "Shards");
table.startHeaders()
.addCell("index", "default:true;alias:i,idx;desc:index name")
.addCell("shard", "default:true;alias:s,sh;desc:shard name")
Expand Down Expand Up @@ -301,10 +412,15 @@ private static <S, T> Object getOrNull(S stats, Function<S, T> accessor, Functio
}

// package private for testing
Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsResponse stats) {
Table table = getTableWithHeader(request);

for (ShardRouting shard : state.getState().routingTable().allShards()) {
Table buildTable(
RestRequest request,
ClusterStateResponse state,
IndicesStatsResponse stats,
List<ShardRouting> shardRoutingList,
String nextToken
) {
Table table = getTableWithHeader(request, nextToken);
for (ShardRouting shard : shardRoutingList) {
ShardStats shardStats = stats.asMap().get(shard);
CommonStats commonStats = null;
CommitStats commitStats = null;
Expand Down Expand Up @@ -453,7 +569,6 @@ Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsRe

table.endRow();
}

return table;
}
}
Loading

0 comments on commit 671bb92

Please sign in to comment.