diff --git a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/ControlCenterIndices.kt b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/ControlCenterIndices.kt index dbb68a4b2..29b2f65c1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/ControlCenterIndices.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/controlcenter/notification/ControlCenterIndices.kt @@ -5,6 +5,7 @@ package org.opensearch.indexmanagement.controlcenter.notification +import org.opensearch.ExceptionsHelper import org.opensearch.ResourceAlreadyExistsException import org.opensearch.action.ActionListener import org.opensearch.action.admin.indices.create.CreateIndexRequest @@ -31,7 +32,7 @@ class ControlCenterIndices( indexRequest, object : ActionListener { override fun onFailure(e: Exception) { - if (e is ResourceAlreadyExistsException) { + if (ExceptionsHelper.unwrapCause(e) is ResourceAlreadyExistsException) { /* if two request create the control center index at the same time, may raise this exception */ /* but we don't take it as error */ actionListener.onResponse( diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt index 81e8fad03..c77aa3f62 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt @@ -17,9 +17,6 @@ import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.IndexManagementRestTestCase import org.opensearch.indexmanagement.controlcenter.notification.initNodeIdsInRestIT import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig -import org.opensearch.indexmanagement.controlcenter.notification.nodeIdsInRestIT -import org.opensearch.indexmanagement.controlcenter.notification.randomLRONConfig -import org.opensearch.indexmanagement.controlcenter.notification.randomTaskId import org.opensearch.indexmanagement.controlcenter.notification.toJsonString import org.opensearch.indexmanagement.makeRequest import org.opensearch.rest.RestStatus @@ -30,8 +27,6 @@ abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() { setDebugLogLevel() /* init cluster node ids in integ test */ initNodeIdsInRestIT(client()) - /* index a random doc to create .opensearch-control-center index */ - createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))) } @After @@ -74,8 +69,13 @@ abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() { companion object { @AfterClass - @JvmStatic fun clearIndicesAfterClass() { - wipeAllIndices() + @JvmStatic fun removeControlCenterIndex() { + try { + adminClient().makeRequest("DELETE", IndexManagementPlugin.CONTROL_CENTER_INDEX, emptyMap()) + } catch (e: ResponseException) { + /* ignore if the index has not been created */ + assertEquals("Unexpected status", RestStatus.NOT_FOUND, RestStatus.fromCode(e.response.statusLine.statusCode)) + } } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigActionIT.kt index 35ac2ccfc..82e6d9ebd 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestDeleteLRONConfigActionIT.kt @@ -39,6 +39,8 @@ class RestDeleteLRONConfigActionIT : LRONConfigRestTestCase() { } fun `test delete nonexist LRONConfig response`() { + /* index a random doc to create .opensearch-control-center index */ + createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))) val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) val response = client().makeRequest("DELETE", getResourceURI(lronConfig.taskId, lronConfig.actionName)) assertEquals("delete LRONConfig failed", RestStatus.OK, response.restStatus()) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigActionIT.kt index 92f50cb62..f9dbef7d9 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestGetLRONConfigActionIT.kt @@ -39,6 +39,8 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() { } fun `test get nonexist LRONConfig fails`() { + /* index a random doc to create .opensearch-control-center index */ + createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))) try { val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) client().makeRequest("GET", getResourceURI(lronConfig.taskId, lronConfig.actionName)) @@ -51,7 +53,7 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() { fun `test get all LRONConfigs`() { /* LRONConfigRestTestCase index a doc to auto create the index, here we wipe the index before count doc number */ - wipeAllIndices() + removeControlCenterIndex() val lronConfigResponses = randomList(1, 15) { createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))).asMap() } @@ -90,16 +92,11 @@ class RestGetLRONConfigActionIT : LRONConfigRestTestCase() { } fun `test get all LRONConfig if index not exists`() { - try { - wipeAllIndices() - val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI) - assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus()) - val responseBody = response.asMap() - val totalNumber = responseBody["total_number"] - OpenSearchTestCase.assertEquals("wrong LRONConfigs number", 0, totalNumber) - } finally { - /* index a random doc to create .opensearch-control-center index */ - createLRONConfig(randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random()))) - } + removeControlCenterIndex() + val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI) + assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus()) + val responseBody = response.asMap() + val totalNumber = responseBody["total_number"] + OpenSearchTestCase.assertEquals("wrong LRONConfigs number", 0, totalNumber) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigActionIT.kt index a83ff482f..e2d1f0b24 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/RestIndexLRONConfigActionIT.kt @@ -5,6 +5,10 @@ package org.opensearch.indexmanagement.controlcenter.notification.resthandler +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.runBlocking import org.junit.Assert import org.opensearch.client.ResponseException import org.opensearch.common.xcontent.XContentType @@ -21,6 +25,7 @@ import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.opensearchapi.convertToMap import org.opensearch.indexmanagement.util.DRY_RUN import org.opensearch.rest.RestStatus +import java.util.concurrent.Executors @Suppress("UNCHECKED_CAST") class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() { @@ -155,21 +160,21 @@ class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() { } fun `test autocreate index for indexLRONConfig action`() { - wipeAllIndices() + removeControlCenterIndex() val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) var response = createLRONConfig(lronConfig) assertEquals("Create LRONConfig failed", RestStatus.OK, response.restStatus()) - wipeAllIndices() + removeControlCenterIndex() response = client().makeRequest( "PUT", getResourceURI(lronConfig.taskId, lronConfig.actionName), lronConfig.toHttpEntity() ) assertEquals("Create LRONConfig failed", RestStatus.OK, response.restStatus()) - wipeAllIndices() } fun `test mappings after LRONConfig creation`() { + removeControlCenterIndex() val lronConfig = randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) createLRONConfig(lronConfig) @@ -185,4 +190,29 @@ class RestIndexLRONConfigActionIT : LRONConfigRestTestCase() { assertEquals("Mappings are different", expectedMap, mappingsMap) } + + fun `test concurrent indexing requests auto create index and not throw exception`() { + removeControlCenterIndex() + val threadSize = 5 + val lronConfigs = List(threadSize) { randomLRONConfig(taskId = randomTaskId(nodeId = nodeIdsInRestIT.random())) } + val threadPool = Executors.newFixedThreadPool(threadSize) + try { + runBlocking { + val dispatcher = threadPool.asCoroutineDispatcher() + val responses = lronConfigs.map { + async(dispatcher) { + createLRONConfig(it) + } + }.awaitAll() + responses.forEach { assertEquals("Create LRONConfig failed", RestStatus.OK, it.restStatus()) } + } + } finally { + threadPool.shutdown() + } + val response = client().makeRequest("GET", IndexManagementPlugin.LRON_BASE_URI) + assertEquals("get LRONConfigs failed", RestStatus.OK, response.restStatus()) + val responseBody = response.asMap() + val totalNumber = responseBody["total_number"] + assertEquals("wrong LRONConfigs number", threadSize, totalNumber) + } }