From e5fde6f193c3e4c7e399f58df1467db9c0eaba6a Mon Sep 17 00:00:00 2001 From: Kshitij Tandon Date: Tue, 17 Sep 2024 17:41:00 +0530 Subject: [PATCH] Fixing snapshot bug (#1257) Signed-off-by: Kshitij Tandon Co-authored-by: bowenlan-amzn (cherry picked from commit 809f85ca2fc26c7b1ee1fd3060bd35bf56b46520) --- .../step/snapshot/WaitForSnapshotStep.kt | 53 ++++++++-------- .../step/WaitForSnapshotStepTests.kt | 61 ++++++++++--------- .../snapshotmanagement/TestUtils.kt | 25 ++++++-- 3 files changed, 80 insertions(+), 59 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/snapshot/WaitForSnapshotStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/snapshot/WaitForSnapshotStep.kt index d71de58a6..ef745d412 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/snapshot/WaitForSnapshotStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/snapshot/WaitForSnapshotStep.kt @@ -7,20 +7,19 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.snapshot import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper -import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus -import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest -import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse -import org.opensearch.cluster.SnapshotsInProgress.State +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotAction import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.snapshots.SnapshotInfo +import org.opensearch.snapshots.SnapshotState import org.opensearch.transport.RemoteTransportException class WaitForSnapshotStep(private val action: SnapshotAction) : Step(name) { - private val logger = LogManager.getLogger(javaClass) private var stepStatus = StepStatus.STARTING private var info: Map? = null @@ -34,30 +33,31 @@ class WaitForSnapshotStep(private val action: SnapshotAction) : Step(name) { try { val snapshotName = getSnapshotName(managedIndexMetadata, indexName) ?: return this - val request = SnapshotsStatusRequest() + val newRequest = GetSnapshotsRequest() .snapshots(arrayOf(snapshotName)) .repository(repository) - val response: SnapshotsStatusResponse = context.client.admin().cluster().suspendUntil { snapshotsStatus(request, it) } - val status: SnapshotStatus? = response - .snapshots - .find { snapshotStatus -> - snapshotStatus.snapshot.snapshotId.name == snapshotName && snapshotStatus.snapshot.repository == repository - } + val response: GetSnapshotsResponse = context.client.admin().cluster().suspendUntil { getSnapshots(newRequest, it) } + val status: SnapshotInfo? = + response + .snapshots + .find { snapshotInfo -> + snapshotInfo.snapshotId().name == snapshotName + } if (status != null) { - when (status.state) { - State.INIT, State.STARTED -> { + when (status.state()) { + SnapshotState.IN_PROGRESS -> { stepStatus = StepStatus.CONDITION_NOT_MET - info = mapOf("message" to getSnapshotInProgressMessage(indexName), "state" to status.state.name) + info = mapOf("message" to getSnapshotInProgressMessage(indexName), "state" to status.state().toString()) } - State.SUCCESS -> { + SnapshotState.SUCCESS -> { stepStatus = StepStatus.COMPLETED - info = mapOf("message" to getSuccessMessage(indexName), "state" to status.state.name) + info = mapOf("message" to getSuccessMessage(indexName), "state" to status.state().toString()) } else -> { // State.FAILED, State.ABORTED val message = getFailedExistsMessage(indexName) logger.warn(message) stepStatus = StepStatus.FAILED - info = mapOf("message" to message, "state" to status.state.name) + info = mapOf("message" to message, "state" to status.state().toString()) } } } else { @@ -97,23 +97,26 @@ class WaitForSnapshotStep(private val action: SnapshotAction) : Step(name) { return actionProperties.snapshotName } - override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { - return currentMetadata.copy( - stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), - transitionTo = null, - info = info, - ) - } + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData = currentMetadata.copy( + stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), + transitionTo = null, + info = info, + ) override fun isIdempotent(): Boolean = true companion object { const val name = "wait_for_snapshot" + fun getFailedMessage(index: String) = "Failed to get status of snapshot [index=$index]" + fun getFailedExistsMessage(index: String) = "Snapshot doesn't exist [index=$index]" + fun getFailedActionPropertiesMessage(index: String, actionProperties: ActionProperties?) = "Unable to retrieve [${ActionProperties.Properties.SNAPSHOT_NAME.key}] from ActionProperties=$actionProperties [index=$index]" + fun getSuccessMessage(index: String) = "Successfully created snapshot [index=$index]" + fun getSnapshotInProgressMessage(index: String) = "Snapshot currently in progress [index=$index]" } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForSnapshotStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForSnapshotStepTests.kt index 416512c7a..faed2f2f1 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForSnapshotStepTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForSnapshotStepTests.kt @@ -11,17 +11,17 @@ import com.nhaarman.mockitokotlin2.doReturn import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.whenever import kotlinx.coroutines.runBlocking -import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus -import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse import org.opensearch.client.AdminClient import org.opensearch.client.Client import org.opensearch.client.ClusterAdminClient -import org.opensearch.cluster.SnapshotsInProgress import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.core.action.ActionListener import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotAction import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.WaitForSnapshotStep +import org.opensearch.indexmanagement.snapshotmanagement.mockInProgressSnapshotInfo +import org.opensearch.indexmanagement.snapshotmanagement.mockSnapshotInfo import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties @@ -29,13 +29,12 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.jobscheduler.spi.utils.LockService import org.opensearch.script.ScriptService -import org.opensearch.snapshots.Snapshot -import org.opensearch.snapshots.SnapshotId +import org.opensearch.snapshots.SnapshotInfo +import org.opensearch.snapshots.SnapshotState import org.opensearch.test.OpenSearchTestCase import org.opensearch.transport.RemoteTransportException class WaitForSnapshotStepTests : OpenSearchTestCase() { - private val clusterService: ClusterService = mock() private val scriptService: ScriptService = mock() private val settings: Settings = Settings.EMPTY @@ -71,13 +70,10 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { } fun `test snapshot status states`() { - val snapshotStatus: SnapshotStatus = mock() - val response: SnapshotsStatusResponse = mock() - whenever(response.snapshots).doReturn(listOf(snapshotStatus)) - whenever(snapshotStatus.snapshot).doReturn(Snapshot("repo", SnapshotId("snapshot-name", "some_uuid"))) + val snapshotInfo: SnapshotInfo = mockInProgressSnapshotInfo(snapshot) + val response: GetSnapshotsResponse = mock() + whenever(response.snapshots).doReturn(listOf(snapshotInfo)) val client = getClient(getAdminClient(getClusterAdminClient(response, null))) - - whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.INIT) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) @@ -89,7 +85,8 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { assertEquals("Did not get snapshot in progress message", WaitForSnapshotStep.getSnapshotInProgressMessage("test"), updatedManagedIndexMetaData.info!!["message"]) } - whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.STARTED) + val snapshotInfo2: SnapshotInfo = mockSnapshotInfo(snapshot, SnapshotState.SUCCESS) + whenever(response.snapshots).doReturn(listOf(snapshotInfo2)) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) @@ -97,11 +94,12 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) step.preExecute(logger, context).execute() val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) - assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus) - assertEquals("Did not get snapshot in progress message", WaitForSnapshotStep.getSnapshotInProgressMessage("test"), updatedManagedIndexMetaData.info!!["message"]) + assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get snapshot completed message", WaitForSnapshotStep.getSuccessMessage("test"), updatedManagedIndexMetaData.info!!["message"]) } - whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.SUCCESS) + val snapshotInfo3: SnapshotInfo = mockSnapshotInfo(snapshot, SnapshotState.FAILED) + whenever(response.snapshots).doReturn(listOf(snapshotInfo3)) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) @@ -109,11 +107,12 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) step.preExecute(logger, context).execute() val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) - assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) - assertEquals("Did not get snapshot completed message", WaitForSnapshotStep.getSuccessMessage("test"), updatedManagedIndexMetaData.info!!["message"]) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals("Did not get snapshot failed message", WaitForSnapshotStep.getFailedExistsMessage("test"), updatedManagedIndexMetaData.info!!["message"]) } - whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.ABORTED) + val snapshotInfo4: SnapshotInfo = mockSnapshotInfo(snapshot, SnapshotState.PARTIAL) + whenever(response.snapshots).doReturn(listOf(snapshotInfo4)) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) @@ -125,7 +124,8 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { assertEquals("Did not get snapshot failed message", WaitForSnapshotStep.getFailedExistsMessage("test"), updatedManagedIndexMetaData.info!!["message"]) } - whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.FAILED) + val snapshotInfo5: SnapshotInfo = mockSnapshotInfo(snapshot, SnapshotState.INCOMPATIBLE) + whenever(response.snapshots).doReturn(listOf(snapshotInfo5)) runBlocking { val snapshotAction = SnapshotAction("repo", snapshot, 0) val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null) @@ -139,10 +139,9 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { } fun `test snapshot not in response list`() { - val snapshotStatus: SnapshotStatus = mock() - val response: SnapshotsStatusResponse = mock() - whenever(response.snapshots).doReturn(listOf(snapshotStatus)) - whenever(snapshotStatus.snapshot).doReturn(Snapshot("repo", SnapshotId("snapshot-different-name", "some_uuid"))) + val snapshotInfo: SnapshotInfo = mockSnapshotInfo("snapshot-different-name") + val response: GetSnapshotsResponse = mock() + whenever(response.snapshots).doReturn(listOf(snapshotInfo)) val client = getClient(getAdminClient(getClusterAdminClient(response, null))) runBlocking { @@ -188,18 +187,20 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() { } private fun getClient(adminClient: AdminClient): Client = mock { on { admin() } doReturn adminClient } + private fun getAdminClient(clusterAdminClient: ClusterAdminClient): AdminClient = mock { on { cluster() } doReturn clusterAdminClient } - private fun getClusterAdminClient(snapshotsStatusResponse: SnapshotsStatusResponse?, exception: Exception?): ClusterAdminClient { - assertTrue("Must provide one and only one response or exception", (snapshotsStatusResponse != null).xor(exception != null)) + + private fun getClusterAdminClient(getSnapshotsResponse: GetSnapshotsResponse?, exception: Exception?): ClusterAdminClient { + assertTrue("Must provide one and only one response or exception", (getSnapshotsResponse != null).xor(exception != null)) return mock { doAnswer { invocationOnMock -> - val listener = invocationOnMock.getArgument>(1) - if (snapshotsStatusResponse != null) { - listener.onResponse(snapshotsStatusResponse) + val listener = invocationOnMock.getArgument>(1) + if (getSnapshotsResponse != null) { + listener.onResponse(getSnapshotsResponse) } else { listener.onFailure(exception) } - }.whenever(this.mock).snapshotsStatus(any(), any()) + }.whenever(this.mock).getSnapshots(any(), any()) } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt index c7c366c6c..8392e6bee 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/TestUtils.kt @@ -37,6 +37,7 @@ import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.snapshots.Snapshot import org.opensearch.snapshots.SnapshotId import org.opensearch.snapshots.SnapshotInfo +import org.opensearch.snapshots.SnapshotState import org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength import org.opensearch.test.OpenSearchTestCase.randomBoolean import org.opensearch.test.OpenSearchTestCase.randomIntBetween @@ -169,11 +170,14 @@ fun randomSMState(): SMState = SMState.values()[randomIntBetween(0, SMState.valu fun randomNotificationConfig(): NotificationConfig = NotificationConfig(randomChannel(), randomConditions()) -fun randomConditions(): NotificationConfig.Conditions = NotificationConfig.Conditions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()) +fun randomConditions(): NotificationConfig.Conditions = + NotificationConfig.Conditions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()) -fun ToXContent.toJsonString(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): String = this.toXContent(XContentFactory.jsonBuilder(), params).string() +fun ToXContent.toJsonString(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): String = + this.toXContent(XContentFactory.jsonBuilder(), params).string() -fun ToXContent.toMap(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): Map = this.toXContent(XContentFactory.jsonBuilder(), params).toMap() +fun ToXContent.toMap(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): Map = + this.toXContent(XContentFactory.jsonBuilder(), params).toMap() fun mockIndexResponse(status: RestStatus = RestStatus.OK): IndexResponse { val indexResponse: IndexResponse = mock() @@ -253,6 +257,18 @@ fun mockInProgressSnapshotInfo( return SnapshotInfo(entry) } +fun mockSnapshotInfo( + name: String = randomAlphaOfLength(10), + snapshotState: SnapshotState, +): SnapshotInfo { + return SnapshotInfo( + SnapshotId(name, UUIDs.randomBase64UUID()), + emptyList(), + emptyList(), + snapshotState, + ) +} + fun mockGetSnapshotResponse(num: Int): GetSnapshotsResponse { val getSnapshotsRes: GetSnapshotsResponse = mock() whenever(getSnapshotsRes.snapshots).doReturn(mockSnapshotInfoList(num)) @@ -271,4 +287,5 @@ fun mockSnapshotInfoList(num: Int, namePrefix: String = randomAlphaOfLength(10)) return result.toList() } -fun String.parser(): XContentParser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, this) +fun String.parser(): XContentParser = + XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, this)