diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java b/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java index 2b6a5b4ee6867..dc157681be6fa 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/GatewayRecoveryTestUtils.java @@ -54,7 +54,7 @@ public static Map prepareRequestMap(String[] indices, ); for (int shardIdNum = 0; shardIdNum < primaryShardCount; shardIdNum++) { final ShardId shardId = new ShardId(index, shardIdNum); - shardIdShardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath)); + shardIdShardAttributesMap.put(shardId, new ShardAttributes(customDataPath)); } } return shardIdShardAttributesMap; diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 42ad0fbdfba68..52f52362e8347 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -94,7 +94,7 @@ protected AsyncShardFetch( this.logger = logger; this.type = type; shardAttributesMap = new HashMap<>(); - shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath)); + shardAttributesMap.put(shardId, new ShardAttributes(customDataPath)); this.action = (Lister, T>) action; this.reroutingKey = "ShardId=[" + shardId.toString() + "]"; this.shardCache = new ShardCache(logger, reroutingKey, type); diff --git a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java index 0266d2fe5ec39..08740ee272dbe 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java @@ -136,7 +136,7 @@ boolean hasAnyNodeFetching() { * @param failedNodes return failedNodes with the nodes where fetch has failed. * @return Map of cache data for every DiscoveryNode. */ - Map getCacheData(DiscoveryNodes nodes, Set failedNodes) { + Map getCacheData(DiscoveryNodes nodes, Set failedNodes) { Map fetchData = new HashMap<>(); for (Iterator> it = getCache().entrySet().iterator(); it.hasNext();) { Map.Entry entry = (Map.Entry) it.next(); diff --git a/server/src/main/java/org/opensearch/gateway/ShardCache.java b/server/src/main/java/org/opensearch/gateway/ShardCache.java index d7c84be2e06e3..3c3bffc79715f 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/ShardCache.java @@ -26,10 +26,11 @@ */ public class ShardCache extends BaseShardCache { - private final Map> cache = new HashMap<>(); + private final Map> cache; public ShardCache(Logger logger, String logKey, String type) { super(logger, logKey, type); + cache = new HashMap<>(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 67f768ccddc2b..efbe614dc716d 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -261,7 +261,6 @@ else if (shardRouting.primary() == primary) { if (batchSize > 0) { ShardEntry shardEntry = new ShardEntry( new ShardAttributes( - currentShard.shardId(), IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(currentShard.index()).getSettings()) ), currentShard @@ -705,7 +704,7 @@ public String toString() { /** * Holds information about a shard to be allocated in a batch. */ - public class ShardEntry { + public static class ShardEntry { private final ShardAttributes shardAttributes; diff --git a/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java b/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java index d0683703df05c..155d787ae8316 100644 --- a/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java +++ b/server/src/main/java/org/opensearch/indices/store/ShardAttributes.java @@ -12,7 +12,6 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.index.shard.ShardId; import java.io.IOException; @@ -22,24 +21,17 @@ * @opensearch.internal */ public class ShardAttributes implements Writeable { - private final ShardId shardId; @Nullable private final String customDataPath; - public ShardAttributes(ShardId shardId, String customDataPath) { - this.shardId = shardId; + public ShardAttributes(String customDataPath) { this.customDataPath = customDataPath; } public ShardAttributes(StreamInput in) throws IOException { - shardId = new ShardId(in); customDataPath = in.readString(); } - public ShardId getShardId() { - return shardId; - } - /** * Returns the custom data path that is used to look up information for this shard. * Returns an empty string if no custom data path is used for this index. @@ -51,12 +43,11 @@ public String getCustomDataPath() { } public void writeTo(StreamOutput out) throws IOException { - shardId.writeTo(out); out.writeString(customDataPath); } @Override public String toString() { - return "ShardAttributes{" + "shardId=" + shardId + ", customDataPath='" + customDataPath + '\'' + '}'; + return "ShardAttributes{" + ", customDataPath='" + customDataPath + '\'' + '}'; } } diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java index 4e5e9c71e1fe4..3502cc8996fa2 100644 --- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java @@ -92,8 +92,8 @@ public void setUp() throws Exception { HashMap shardToCustomDataPath = new HashMap<>(); ShardId shardId0 = new ShardId("index1", "index_uuid1", 0); ShardId shardId1 = new ShardId("index2", "index_uuid2", 0); - shardToCustomDataPath.put(shardId0, new ShardAttributes(shardId0, "")); - shardToCustomDataPath.put(shardId1, new ShardAttributes(shardId1, "")); + shardToCustomDataPath.put(shardId0, new ShardAttributes("")); + shardToCustomDataPath.put(shardId1, new ShardAttributes("")); this.test = new TestFetch(threadPool, shardToCustomDataPath); } } diff --git a/server/src/test/java/org/opensearch/gateway/BatchTestUtil.java b/server/src/test/java/org/opensearch/gateway/BatchTestUtil.java index d009856ef020d..69f0cfeeb2c7d 100644 --- a/server/src/test/java/org/opensearch/gateway/BatchTestUtil.java +++ b/server/src/test/java/org/opensearch/gateway/BatchTestUtil.java @@ -10,12 +10,12 @@ import org.opensearch.core.index.shard.ShardId; -import java.util.HashSet; -import java.util.Set; +import java.util.ArrayList; +import java.util.List; public class BatchTestUtil { - public static Set setUpShards(int numberOfShards) { - Set shards = new HashSet<>(); + public static List setUpShards(int numberOfShards) { + List shards = new ArrayList<>(); for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { ShardId shardId = new ShardId("test", "_na_", shardNumber); shards.add(shardId); diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java index a6417ff999127..a8f8a01c63cce 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java @@ -49,7 +49,7 @@ public class PrimaryShardBatchAllocatorTests extends OpenSearchAllocationTestCase { private final ShardId shardId = new ShardId("test", "_na_", 0); - private static Set shardsInBatch; + private static List shardsInBatch; private final DiscoveryNode node1 = newNode("node1"); private final DiscoveryNode node2 = newNode("node2"); private final DiscoveryNode node3 = newNode("node3"); diff --git a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java index 4449d78c2d560..2c769b4e165f6 100644 --- a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java +++ b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java @@ -8,20 +8,30 @@ package org.opensearch.gateway; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard; +import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; import org.opensearch.indices.store.ShardAttributes; -import org.opensearch.test.OpenSearchTestCase; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; -public class ShardBatchCacheTests extends OpenSearchTestCase { - private final ShardId shardId = new ShardId("test", "_na_", 0); - +public class ShardBatchCacheTests extends OpenSearchAllocationTestCase { + private static final String BATCH_ID = "b1"; + private final DiscoveryNode node1 = newNode("node1"); + private final DiscoveryNode node2 = newNode("node2"); + private final DiscoveryNode node3 = newNode("node3"); private Map batchInfo = new HashMap<>(); - private ShardBatchCache< - TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch, - TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard> shardCache; + private ShardBatchCache shardCache; + private List shardsInBatch = new ArrayList<>(); public void setupShardBatchCache(String batchId) { Map shardAttributesMap = new HashMap<>(); @@ -31,21 +41,130 @@ public void setupShardBatchCache(String batchId) { "batch_shards_started", shardAttributesMap, "BatchID=[" + batchId + "]", - TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard.class, - TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch::new, - TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch::getNodeGatewayStartedShardsBatch, - () -> new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard(null, false, null, null), + NodeGatewayStartedShard.class, + NodeGatewayStartedShardsBatch::new, + NodeGatewayStartedShardsBatch::getNodeGatewayStartedShardsBatch, + () -> new NodeGatewayStartedShard(null, false, null, null), this::removeShard ); } + public void testClearShardCache() { + setupShardBatchCache(BATCH_ID); + ShardId shard = shardsInBatch.iterator().next(); + this.shardCache.initData(node1); + this.shardCache.markAsFetching(List.of(node1.getId()), 1); + this.shardCache.putData(node1, new NodeGatewayStartedShardsBatch(node1, getEmptyPrimaryResponse(shardsInBatch))); + assertTrue( + this.shardCache.getCacheData(DiscoveryNodes.builder().add(node1).build(), null) + .get(node1) + .getNodeGatewayStartedShardsBatch() + .containsKey(shard) + ); + this.shardCache.clearShardCache(shard); + assertFalse( + this.shardCache.getCacheData(DiscoveryNodes.builder().add(node1).build(), null) + .get(node1) + .getNodeGatewayStartedShardsBatch() + .containsKey(shard) + ); + } + + public void testGetCacheData() { + setupShardBatchCache(BATCH_ID); + ShardId shard = shardsInBatch.iterator().next(); + this.shardCache.initData(node1); + this.shardCache.initData(node2); + this.shardCache.markAsFetching(List.of(node1.getId(), node2.getId()), 1); + this.shardCache.putData(node1, new NodeGatewayStartedShardsBatch(node1, getEmptyPrimaryResponse(shardsInBatch))); + assertTrue( + this.shardCache.getCacheData(DiscoveryNodes.builder().add(node1).build(), null) + .get(node1) + .getNodeGatewayStartedShardsBatch() + .containsKey(shard) + ); + assertTrue( + this.shardCache.getCacheData(DiscoveryNodes.builder().add(node2).build(), null) + .get(node2) + .getNodeGatewayStartedShardsBatch() + .isEmpty() + ); + } + + public void testInitCacheData() { + setupShardBatchCache(BATCH_ID); + this.shardCache.initData(node1); + this.shardCache.initData(node2); + assertEquals(2, shardCache.getCache().size()); + } + + public void testPutData() { + setupShardBatchCache(BATCH_ID); + ShardId shard = shardsInBatch.iterator().next(); + this.shardCache.initData(node1); + this.shardCache.initData(node2); + this.shardCache.markAsFetching(List.of(node1.getId(), node2.getId()), 1); + this.shardCache.putData(node1, new NodeGatewayStartedShardsBatch(node1, getActualPrimaryResponse(shardsInBatch))); + this.shardCache.putData(node2, new NodeGatewayStartedShardsBatch(node1, getEmptyPrimaryResponse(shardsInBatch))); + + Map fetchData = shardCache.getCacheData( + DiscoveryNodes.builder().add(node1).add(node2).build(), + null + ); + assertEquals(2, fetchData.size()); + assertEquals(10, fetchData.get(node1).getNodeGatewayStartedShardsBatch().size()); + assertEquals("alloc-1", fetchData.get(node1).getNodeGatewayStartedShardsBatch().get(shard).allocationId()); + + assertEquals(10, fetchData.get(node2).getNodeGatewayStartedShardsBatch().size()); + assertTrue(fetchData.get(node2).getNodeGatewayStartedShardsBatch().get(shard).isEmpty()); + } + + public void testFilterFailedShards() { + // ToDo + } + + private Map getEmptyPrimaryResponse(List shards) { + Map shardData = new HashMap<>(); + for (ShardId shard : shards) { + shardData.put(shard, new NodeGatewayStartedShard(null, false, null, null)); + } + return shardData; + } + + private Map getActualPrimaryResponse(List shards) { + int allocationId = 1; + Map shardData = new HashMap<>(); + for (ShardId shard : shards) { + shardData.put(shard, new NodeGatewayStartedShard("alloc-" + allocationId++, false, null, null)); + } + return shardData; + } + public void removeShard(ShardId shardId) { batchInfo.remove(shardId); } private void fillShards(Map shardAttributesMap) { - BatchTestUtil.setUpShards(10); - // ToDo + shardsInBatch = BatchTestUtil.setUpShards(10); + for (ShardId shardId : shardsInBatch) { + ShardAttributes attr = new ShardAttributes(""); + shardAttributesMap.put(shardId, attr); + batchInfo.put( + shardId, + new ShardsBatchGatewayAllocator.ShardEntry(attr, randomShardRouting(shardId.getIndexName(), shardId.id())) + ); + } } + private ShardRouting randomShardRouting(String index, int shard) { + ShardRoutingState state = randomFrom(ShardRoutingState.values()); + return TestShardRouting.newShardRouting( + index, + shard, + state == ShardRoutingState.UNASSIGNED ? null : "1", + state == ShardRoutingState.RELOCATING ? "2" : null, + state != ShardRoutingState.UNASSIGNED && randomBoolean(), + state + ); + } } diff --git a/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java b/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java index 15b9d06218e93..94834bab1d98b 100644 --- a/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java +++ b/server/src/test/java/org/opensearch/indices/store/ShardAttributesTests.java @@ -28,12 +28,12 @@ public class ShardAttributesTests extends OpenSearchTestCase { String customDataPath = "/path/to/data"; public void testShardAttributesConstructor() { - ShardAttributes attributes = new ShardAttributes(shardId, customDataPath); + ShardAttributes attributes = new ShardAttributes(customDataPath); assertEquals(attributes.getCustomDataPath(), customDataPath); } public void testSerialization() throws IOException { - ShardAttributes attributes1 = new ShardAttributes(shardId, customDataPath); + ShardAttributes attributes1 = new ShardAttributes(customDataPath); ByteArrayOutputStream bytes = new ByteArrayOutputStream(); StreamOutput output = new DataOutputStreamOutput(new DataOutputStream(bytes)); attributes1.writeTo(output);