diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md index 61ba3f9f9..c37625806 100644 --- a/DEVELOPER_GUIDE.md +++ b/DEVELOPER_GUIDE.md @@ -48,7 +48,7 @@ However, to build the `index management` plugin project, we also use the OpenSea ### Building from the command line -1. `./gradlew build` builds and tests project. +1. `./gradlew build` builds and tests project. 2. `./gradlew run` launches a single node cluster with the index management (and job-scheduler) plugin installed. 3. `./gradlew run -PnumNodes=3` launches a multi-node cluster with the index management (and job-scheduler) plugin installed. 4. `./gradlew integTest` launches a single node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests. diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionProperties.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionProperties.kt index e957b7724..1b75aec11 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionProperties.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionProperties.kt @@ -23,7 +23,8 @@ data class ActionProperties( val snapshotName: String? = null, val rollupId: String? = null, val hasRollupFailed: Boolean? = null, - val shrinkActionProperties: ShrinkActionProperties? = null + val shrinkActionProperties: ShrinkActionProperties? = null, + val transformActionProperties: TransformActionProperties? = null ) : Writeable, ToXContentFragment { override fun writeTo(out: StreamOutput) { @@ -32,6 +33,7 @@ data class ActionProperties( out.writeOptionalString(rollupId) out.writeOptionalBoolean(hasRollupFailed) out.writeOptionalWriteable(shrinkActionProperties) + out.writeOptionalWriteable(transformActionProperties) } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { @@ -40,6 +42,7 @@ data class ActionProperties( if (rollupId != null) builder.field(Properties.ROLLUP_ID.key, rollupId) if (hasRollupFailed != null) builder.field(Properties.HAS_ROLLUP_FAILED.key, hasRollupFailed) if (shrinkActionProperties != null) builder.addObject(ShrinkActionProperties.SHRINK_ACTION_PROPERTIES, shrinkActionProperties, params) + if (transformActionProperties != null) builder.addObject(TransformActionProperties.TRANSFORM_ACTION_PROPERTIES, transformActionProperties, params) return builder } @@ -52,7 +55,8 @@ data class ActionProperties( val rollupId: String? = si.readOptionalString() val hasRollupFailed: Boolean? = si.readOptionalBoolean() val shrinkActionProperties: ShrinkActionProperties? = si.readOptionalWriteable { ShrinkActionProperties.fromStreamInput(it) } - return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties) + val transformActionProperties: TransformActionProperties? = si.readOptionalWriteable { TransformActionProperties.fromStreamInput(it) } + return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties) } fun parse(xcp: XContentParser): ActionProperties { @@ -61,6 +65,7 @@ data class ActionProperties( var rollupId: String? = null var hasRollupFailed: Boolean? = null var shrinkActionProperties: ShrinkActionProperties? = null + var transformActionProperties: TransformActionProperties? = null ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != Token.END_OBJECT) { @@ -75,10 +80,13 @@ data class ActionProperties( ShrinkActionProperties.SHRINK_ACTION_PROPERTIES -> { shrinkActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else ShrinkActionProperties.parse(xcp) } + TransformActionProperties.TRANSFORM_ACTION_PROPERTIES -> { + transformActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else TransformActionProperties.parse(xcp) + } } } - return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties) + return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties) } } diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/TransformActionProperties.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/TransformActionProperties.kt new file mode 100644 index 000000000..70b593750 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/TransformActionProperties.kt @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentFragment +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser + +data class TransformActionProperties( + val transformId: String? +) : Writeable, ToXContentFragment { + + override fun writeTo(out: StreamOutput) { + out.writeOptionalString(transformId) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + if (transformId != null) builder.field(Properties.TRANSFORM_ID.key, transformId) + return builder + } + + companion object { + const val TRANSFORM_ACTION_PROPERTIES = "transform_action_properties" + + fun fromStreamInput(sin: StreamInput): TransformActionProperties { + val transformId: String? = sin.readOptionalString() + return TransformActionProperties(transformId) + } + + fun parse(xcp: XContentParser): TransformActionProperties { + var transformId: String? = null + + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + Properties.TRANSFORM_ID.key -> transformId = xcp.text() + } + } + + return TransformActionProperties(transformId) + } + } + + enum class Properties(val key: String) { + TRANSFORM_ID("transform_id") + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index c93df3b0d..7cc528f9e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -460,7 +460,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin val managedIndexCoordinator = ManagedIndexCoordinator( environment.settings(), - client, clusterService, threadPool, indexManagementIndices, metadataService, templateService, indexMetadataProvider + client, clusterService, threadPool, indexManagementIndices, metadataService, templateService, indexMetadataProvider, xContentRegistry ) val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt index 9db2b0f67..a45cfddae 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser +import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry @@ -49,7 +50,8 @@ class ISMActionsParser private constructor() { RollupActionParser(), RolloverActionParser(), ShrinkActionParser(), - SnapshotActionParser() + SnapshotActionParser(), + TransformActionParser(), ) val customActionExtensionMap = mutableMapOf() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index ad7c460c7..cc765acf2 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -42,6 +42,7 @@ import org.opensearch.common.regex.Regex import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.commons.authuser.User +import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.index.Index import org.opensearch.index.query.QueryBuilders import org.opensearch.indexmanagement.IndexManagementIndices @@ -117,7 +118,8 @@ class ManagedIndexCoordinator( indexManagementIndices: IndexManagementIndices, private val metadataService: MetadataService, private val templateService: ISMTemplateService, - private val indexMetadataProvider: IndexMetadataProvider + private val indexMetadataProvider: IndexMetadataProvider, + private val xContentRegistry: NamedXContentRegistry ) : ClusterStateListener, CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")), LifecycleListener() { @@ -462,7 +464,7 @@ class ManagedIndexCoordinator( return try { val response: SearchResponse = client.suspendUntil { search(searchRequest, it) } - parseFromSearchResponse(response = response, parse = Policy.Companion::parse) + parseFromSearchResponse(response, xContentRegistry, Policy.Companion::parse) } catch (ex: IndexNotFoundException) { emptyList() } catch (ex: ClusterBlockException) { @@ -713,7 +715,7 @@ class ManagedIndexCoordinator( } mRes.forEach { if (it.response.isExists) { - result[it.id] = contentParser(it.response.sourceAsBytesRef).parseWithType( + result[it.id] = contentParser(it.response.sourceAsBytesRef, xContentRegistry).parseWithType( it.response.id, it.response.seqNo, it.response.primaryTerm, ManagedIndexConfig.Companion::parse ) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformAction.kt new file mode 100644 index 000000000..0de34c15b --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformAction.kt @@ -0,0 +1,63 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.AttemptCreateTransformJobStep +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.WaitForTransformCompletionStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Action +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.transform.model.ISMTransform + +class TransformAction( + val ismTransform: ISMTransform, + index: Int +) : Action(name, index) { + + companion object { + const val name = "transform" + const val ISM_TRANSFORM_FIELD = "ism_transform" + } + + private val attemptCreateTransformJobStep = AttemptCreateTransformJobStep(this) + private val waitForTransformCompletionStep = WaitForTransformCompletionStep() + private val steps = listOf(attemptCreateTransformJobStep, waitForTransformCompletionStep) + + @Suppress("ReturnCount") + override fun getStepToExecute(context: StepContext): Step { + // if stepMetaData is null, return first step + val stepMetaData = context.metadata.stepMetaData ?: return attemptCreateTransformJobStep + + // if the current step has completed, return the next step + if (stepMetaData.stepStatus == Step.StepStatus.COMPLETED) { + return when (stepMetaData.name) { + AttemptCreateTransformJobStep.name -> waitForTransformCompletionStep + else -> attemptCreateTransformJobStep + } + } + + return when (stepMetaData.name) { + AttemptCreateTransformJobStep.name -> attemptCreateTransformJobStep + else -> waitForTransformCompletionStep + } + } + + override fun getSteps(): List = steps + + override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) { + builder.startObject(type) + builder.field(ISM_TRANSFORM_FIELD, ismTransform) + builder.endObject() + } + + override fun populateAction(out: StreamOutput) { + ismTransform.writeTo(out) + out.writeInt(actionIndex) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionParser.kt new file mode 100644 index 000000000..9d7eb1360 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionParser.kt @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.indexmanagement.spi.indexstatemanagement.Action +import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser +import org.opensearch.indexmanagement.transform.model.ISMTransform + +class TransformActionParser : ActionParser() { + override fun fromStreamInput(sin: StreamInput): Action { + val ismTransform = ISMTransform(sin) + val index = sin.readInt() + return TransformAction(ismTransform, index) + } + + override fun fromXContent(xcp: XContentParser, index: Int): Action { + var ismTransform: ISMTransform? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + TransformAction.ISM_TRANSFORM_FIELD -> ismTransform = ISMTransform.parse(xcp) + else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in TransformAction.") + } + } + + return TransformAction(ismTransform = requireNotNull(ismTransform) { "TransformAction transform is null." }, index) + } + + override fun getActionType(): String { + return TransformAction.name + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/AttemptCreateTransformJobStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/AttemptCreateTransformJobStep.kt new file mode 100644 index 000000000..806803e3c --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/AttemptCreateTransformJobStep.kt @@ -0,0 +1,127 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step.transform + +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.OpenSearchException +import org.opensearch.action.support.WriteRequest +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.Client +import org.opensearch.index.engine.VersionConflictEngineException +import org.opensearch.indexmanagement.indexstatemanagement.action.TransformAction +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.TransformActionProperties +import org.opensearch.indexmanagement.transform.action.index.IndexTransformAction +import org.opensearch.indexmanagement.transform.action.index.IndexTransformRequest +import org.opensearch.indexmanagement.transform.action.index.IndexTransformResponse +import org.opensearch.indexmanagement.transform.action.start.StartTransformAction +import org.opensearch.indexmanagement.transform.action.start.StartTransformRequest +import org.opensearch.transport.RemoteTransportException + +class AttemptCreateTransformJobStep( + private val action: TransformAction +) : Step(name) { + + private val logger = LogManager.getLogger(javaClass) + private var stepStatus = StepStatus.STARTING + private var info: Map? = null + private var transformId: String? = null + + override suspend fun execute(): Step { + val context = this.context ?: return this + val indexName = context.metadata.index + val managedIndexMetadata = context.metadata + val startedTransformId = managedIndexMetadata.actionMetaData?.actionProperties?.transformActionProperties?.transformId + + // Creating a transform job + val transform = action.ismTransform.toTransform(indexName, context.user) + transformId = transform.id + logger.info("Attempting to create a transform job $transformId for index $indexName") + + val indexTransformRequest = IndexTransformRequest(transform, WriteRequest.RefreshPolicy.IMMEDIATE) + + try { + val response: IndexTransformResponse = context.client.suspendUntil { execute(IndexTransformAction.INSTANCE, indexTransformRequest, it) } + logger.info("Received status ${response.status.status} on trying to create transform job $transformId") + + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getSuccessMessage(transform.id, indexName)) + } catch (e: VersionConflictEngineException) { + val message = getTransformJobAlreadyExistsMessage(transform.id, indexName) + logger.info(message) + if (startedTransformId == null) { + // restart the transform job when this is another execution of the same action in the ISM policy + startTransformJob(transform.id, context) + } else { + // directly mark as complete when this is a retry of this step + stepStatus = StepStatus.COMPLETED + info = mapOf("info" to message) + } + } catch (e: RemoteTransportException) { + processFailure(transform.id, indexName, ExceptionsHelper.unwrapCause(e) as Exception) + } catch (e: OpenSearchException) { + processFailure(transform.id, indexName, e) + } catch (e: Exception) { + processFailure(transform.id, indexName, e) + } + + return this + } + + fun processFailure(transformId: String, indexName: String, e: Exception) { + val message = getFailedMessage(transformId, indexName) + logger.error(message, e) + this.transformId = null + stepStatus = StepStatus.FAILED + info = mapOf("message" to message, "cause" to "${e.message}") + } + + private suspend fun startTransformJob(transformId: String, context: StepContext) { + val indexName = context.metadata.index + val client = context.client + logger.info("Attempting to re-start the transform job $transformId") + try { + val startTransformRequest = StartTransformRequest(transformId) + client.suspendUntil { execute(StartTransformAction.INSTANCE, startTransformRequest, it) } + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getSuccessRestartMessage(transformId, indexName)) + } catch (e: Exception) { + val message = getFailedToStartMessage(transformId, indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + info = mapOf("message" to message) + } + } + + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { + val currentActionMetaData = currentMetadata.actionMetaData + val transformActionProperties = TransformActionProperties(transformId) + return currentMetadata.copy( + actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(transformActionProperties = transformActionProperties)), + stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), + transitionTo = null, + info = info + ) + } + + override fun isIdempotent(): Boolean = true + + companion object { + const val name = "attempt_create_transform" + fun getFailedMessage(transformId: String, index: String) = "Failed to create the transform job [$transformId] [index=$index]" + fun getTransformJobAlreadyExistsMessage(transformId: String, index: String) = + "Transform job [$transformId] already exists, skipping creation [index=$index]" + fun getFailedToStartMessage(transformId: String, index: String) = "Failed to start the transform job [$transformId] [index=$index]" + fun getSuccessMessage(transformId: String, index: String) = "Successfully created the transform job [$transformId] [index=$index]" + fun getSuccessRestartMessage(transformId: String, index: String) = "Successfully restarted the transform job [$transformId] [index=$index]" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/WaitForTransformCompletionStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/WaitForTransformCompletionStep.kt new file mode 100644 index 000000000..78aa41f3d --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/WaitForTransformCompletionStep.kt @@ -0,0 +1,132 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step.transform + +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.indexmanagement.transform.action.explain.ExplainTransformAction +import org.opensearch.indexmanagement.transform.action.explain.ExplainTransformRequest +import org.opensearch.indexmanagement.transform.action.explain.ExplainTransformResponse +import org.opensearch.indexmanagement.transform.model.TransformMetadata +import org.opensearch.transport.RemoteTransportException + +@Suppress("ReturnCount") +class WaitForTransformCompletionStep : Step(name) { + + private val logger = LogManager.getLogger(javaClass) + private var stepStatus = StepStatus.STARTING + private var info: Map? = null + + override suspend fun execute(): Step { + val context = this.context ?: return this + val indexName = context.metadata.index + val managedIndexMetadata = context.metadata + val transformJobId = managedIndexMetadata.actionMetaData?.actionProperties?.transformActionProperties?.transformId + + if (transformJobId == null) { + logger.error("No transform job id passed down.") + stepStatus = StepStatus.FAILED + info = mapOf("message" to getMissingTransformJobMessage(indexName)) + return this + } + + val explainTransformResponse = explainTransformJob(transformJobId, indexName, context) + // if explainTransform call failed, return early + explainTransformResponse ?: return this + + val explainTransform = explainTransformResponse.getIdsToExplain()[transformJobId] + if (explainTransform == null) { + logger.warn("Job $transformJobId is not found, mark step as COMPLETED.") + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getJobNotFoundMessage(transformJobId, indexName)) + return this + } + + if (explainTransform.metadata?.status == null) { + logger.warn("Job $transformJobId has not started yet") + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to getJobProcessingMessage(transformJobId, indexName)) + return this + } + + processTransformMetadataStatus(transformJobId, indexName, explainTransform.metadata) + return this + } + + private suspend fun explainTransformJob(transformJobId: String, indexName: String, context: StepContext): ExplainTransformResponse? { + val explainTransformRequest = ExplainTransformRequest(listOf(transformJobId)) + try { + val response = context.client.suspendUntil { + execute(ExplainTransformAction.INSTANCE, explainTransformRequest, it) + } + logger.info("Received the status for jobs [${response.getIdsToExplain().keys}]") + return response + } catch (e: RemoteTransportException) { + processFailure(transformJobId, indexName, ExceptionsHelper.unwrapCause(e) as Exception) + } catch (e: Exception) { + processFailure(transformJobId, indexName, e) + } + return null + } + + fun processTransformMetadataStatus(transformJobId: String, indexName: String, transformMetadata: TransformMetadata) { + when (transformMetadata.status) { + TransformMetadata.Status.INIT, TransformMetadata.Status.STARTED -> { + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to getJobProcessingMessage(transformJobId, indexName)) + } + TransformMetadata.Status.FAILED -> { + stepStatus = StepStatus.FAILED + info = mapOf("message" to getJobFailedMessage(transformJobId, indexName), "cause" to "${transformMetadata.failureReason}") + } + TransformMetadata.Status.FINISHED -> { + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getJobCompletionMessage(transformJobId, indexName)) + } + TransformMetadata.Status.STOPPED -> { + stepStatus = StepStatus.FAILED + info = mapOf("message" to getJobFailedMessage(transformJobId, indexName), "cause" to JOB_STOPPED_MESSAGE) + } + } + } + + fun processFailure(transformJobId: String, indexName: String, e: Exception) { + stepStatus = StepStatus.FAILED + val message = getFailedMessage(transformJobId, indexName) + logger.error(message, e) + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() + } + + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { + return currentMetadata.copy( + actionMetaData = currentMetadata.actionMetaData, + stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), + transitionTo = null, + info = info + ) + } + + override fun isIdempotent(): Boolean = true + + companion object { + const val name = "wait_for_transform_completion" + const val JOB_STOPPED_MESSAGE = "Transform job was stopped" + fun getFailedMessage(transformJob: String, index: String) = "Failed to get the status of transform job [$transformJob] [index=$index]" + fun getJobProcessingMessage(transformJob: String, index: String) = "Transform job [$transformJob] is still processing [index=$index]" + fun getJobCompletionMessage(transformJob: String, index: String) = "Transform job [$transformJob] completed [index=$index]" + fun getJobFailedMessage(transformJob: String, index: String) = "Transform job [$transformJob] failed [index=$index]" + fun getMissingTransformJobMessage(index: String) = "Transform job was not found [index=$index]" + fun getJobNotFoundMessage(transformJob: String, index: String) = "Transform job [$transformJob] is not found [index=$index]" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt index a6325668f..80f460028 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt @@ -64,10 +64,12 @@ import kotlin.coroutines.suspendCoroutine const val OPENDISTRO_SECURITY_PROTECTED_INDICES_CONF_REQUEST = "_opendistro_security_protected_indices_conf_request" -fun contentParser(bytesReference: BytesReference): XContentParser { +fun contentParser(bytesReference: BytesReference, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY): XContentParser { return XContentHelper.createParser( - NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, bytesReference, XContentType.JSON + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, + bytesReference, + XContentType.JSON ) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransform.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransform.kt new file mode 100644 index 000000000..ecc092ac9 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransform.kt @@ -0,0 +1,208 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform.model + +import org.apache.commons.codec.digest.DigestUtils +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.authuser.User +import org.opensearch.core.common.bytes.BytesReference +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.index.query.AbstractQueryBuilder +import org.opensearch.index.query.MatchAllQueryBuilder +import org.opensearch.index.query.QueryBuilder +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indexmanagement.common.model.dimension.DateHistogram +import org.opensearch.indexmanagement.common.model.dimension.Dimension +import org.opensearch.indexmanagement.common.model.dimension.Histogram +import org.opensearch.indexmanagement.common.model.dimension.Terms +import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule +import org.opensearch.search.aggregations.AggregatorFactories +import java.io.IOException +import java.lang.StringBuilder +import java.time.Instant +import java.time.temporal.ChronoUnit +import kotlin.jvm.Throws + +data class ISMTransform( + val description: String, + val targetIndex: String, + val pageSize: Int, + val dataSelectionQuery: QueryBuilder = MatchAllQueryBuilder(), + val groups: List, + val aggregations: AggregatorFactories.Builder = AggregatorFactories.builder() +) : ToXContentObject, Writeable { + + init { + require(pageSize in Transform.MINIMUM_PAGE_SIZE..Transform.MAXIMUM_PAGE_SIZE) { + "Page size must be between ${Transform.MINIMUM_PAGE_SIZE} and ${Transform.MAXIMUM_PAGE_SIZE}" + } + require(description.isNotEmpty()) { "Description cannot be empty" } + require(targetIndex.isNotEmpty()) { "TargetIndex cannot be empty" } + require(groups.isNotEmpty()) { "Groups cannot be empty" } + aggregations.aggregatorFactories.forEach { + require(Transform.supportedAggregations.contains(it.type)) { + "Unsupported aggregation [${it.type}]" + } + } + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.startObject() + .field(Transform.DESCRIPTION_FIELD, description) + .field(Transform.TARGET_INDEX_FIELD, targetIndex) + .field(Transform.PAGE_SIZE_FIELD, pageSize) + .field(Transform.DATA_SELECTION_QUERY_FIELD, dataSelectionQuery) + .field(Transform.GROUPS_FIELD, groups) + .field(Transform.AGGREGATIONS_FIELD, aggregations) + builder.endObject() + return builder + } + + fun toTransform(sourceIndex: String, user: User? = null): Transform { + val id = sourceIndex + toString() + val currentTime = Instant.now() + return Transform( + id = DigestUtils.sha1Hex(id), + seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + schemaVersion = IndexUtils.DEFAULT_SCHEMA_VERSION, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + metadataId = null, + updatedAt = currentTime, + enabled = true, + enabledAt = currentTime, + description = this.description, + sourceIndex = sourceIndex, + dataSelectionQuery = this.dataSelectionQuery, + targetIndex = this.targetIndex, + pageSize = pageSize, + continuous = false, + groups = this.groups, + aggregations = this.aggregations, + user = user + ) + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + description = sin.readString(), + targetIndex = sin.readString(), + pageSize = sin.readInt(), + dataSelectionQuery = requireNotNull(sin.readOptionalNamedWriteable(QueryBuilder::class.java)) { "Query cannot be null" }, + groups = sin.let { + val dimensionList = mutableListOf() + val size = it.readVInt() + repeat(size) { _ -> + val type = it.readEnum(Dimension.Type::class.java) + dimensionList.add( + when (requireNotNull(type) { "Dimension type cannot be null" }) { + Dimension.Type.DATE_HISTOGRAM -> DateHistogram(sin) + Dimension.Type.TERMS -> Terms(sin) + Dimension.Type.HISTOGRAM -> Histogram(sin) + } + ) + } + dimensionList.toList() + }, + aggregations = requireNotNull(sin.readOptionalWriteable { AggregatorFactories.Builder(it) }) { "Aggregations cannot be null" } + ) + + override fun toString(): String { + val sbd = StringBuilder() + sbd.append(targetIndex) + sbd.append(pageSize) + sbd.append(dataSelectionQuery) + groups.forEach { + sbd.append(it.type) + sbd.append(it.sourceField) + } + sbd.append(aggregations) + + return sbd.toString() + } + + override fun writeTo(out: StreamOutput) { + out.writeString(description) + out.writeString(targetIndex) + out.writeInt(pageSize) + out.writeOptionalNamedWriteable(dataSelectionQuery) + out.writeVInt(groups.size) + for (group in groups) { + out.writeEnum(group.type) + when (group) { + is DateHistogram -> group.writeTo(out) + is Terms -> group.writeTo(out) + is Histogram -> group.writeTo(out) + } + } + out.writeOptionalWriteable(aggregations) + } + + companion object { + @Suppress("ComplexMethod", "LongMethod") + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): ISMTransform { + var description = "" + var targetIndex = "" + var pageSize = 0 + var dataSelectionQuery: QueryBuilder = MatchAllQueryBuilder() + val groups = mutableListOf() + var aggregations: AggregatorFactories.Builder = AggregatorFactories.builder() + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + Transform.DESCRIPTION_FIELD -> description = xcp.text() + Transform.TARGET_INDEX_FIELD -> targetIndex = xcp.text() + Transform.PAGE_SIZE_FIELD -> pageSize = xcp.intValue() + Transform.DATA_SELECTION_QUERY_FIELD -> { + val registry = xcp.xContentRegistry + val source = xcp.mapOrdered() + val xContentBuilder = XContentFactory.jsonBuilder().map(source) + val sourceParser = XContentType.JSON.xContent().createParser( + registry, LoggingDeprecationHandler.INSTANCE, + BytesReference + .bytes(xContentBuilder).streamInput() + ) + dataSelectionQuery = AbstractQueryBuilder.parseInnerQueryBuilder(sourceParser) + } + Transform.GROUPS_FIELD -> { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + groups.add(Dimension.parse(xcp)) + } + } + Transform.AGGREGATIONS_FIELD -> aggregations = AggregatorFactories.parseAggregators(xcp) + else -> throw IllegalArgumentException("Invalid field [$fieldName] found in ISM Transform.") + } + } + + return ISMTransform( + description = description, + targetIndex = targetIndex, + pageSize = pageSize, + dataSelectionQuery = dataSelectionQuery, + groups = groups, + aggregations = aggregations + ) + } + } +} diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 588e886b4..bdcdcea31 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 19 + "schema_version": 20 }, "dynamic": "strict", "properties": { @@ -455,6 +455,83 @@ } } }, + "transform": { + "properties": { + "ism_transform": { + "properties": { + "description": { + "type": "text" + }, + "target_index": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "page_size": { + "type": "long" + }, + "data_selection_query": { + "type": "object", + "enabled": false + }, + "groups": { + "properties": { + "date_histogram": { + "properties": { + "source_field": { + "type": "keyword" + }, + "target_field": { + "type": "keyword" + }, + "fixed_interval": { + "type": "keyword" + }, + "calendar_interval": { + "type": "keyword" + }, + "timezone": { + "type": "keyword" + } + } + }, + "terms": { + "properties": { + "source_field": { + "type": "keyword" + }, + "target_field": { + "type": "keyword" + } + } + }, + "histogram": { + "properties": { + "source_field": { + "type": "keyword" + }, + "target_field": { + "type": "keyword" + }, + "interval": { + "type": "double" + } + } + } + } + }, + "aggregations": { + "type": "object", + "enabled": false + } + } + } + } + }, "shrink": { "properties": { "num_new_shards": { @@ -789,6 +866,10 @@ "shrink_action_properties": { "type": "object", "enabled": false + }, + "transform_action_properties": { + "type": "object", + "enabled": false } } } diff --git a/src/main/resources/mappings/opendistro-ism-history.json b/src/main/resources/mappings/opendistro-ism-history.json index 0e7db6d40..6ce6e1779 100644 --- a/src/main/resources/mappings/opendistro-ism-history.json +++ b/src/main/resources/mappings/opendistro-ism-history.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 6 + "schema_version": 7 }, "dynamic": "strict", "properties": { @@ -115,6 +115,10 @@ "shrink_action_properties": { "type": "object", "enabled": false + }, + "transform_action_properties": { + "type": "object", + "enabled": false } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 1428cfea4..5a55f0ee1 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -28,12 +28,13 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.MediaType import org.opensearch.indexmanagement.rollup.model.Rollup +import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import java.io.IOException import java.nio.file.Files -import java.util.Date import java.time.Duration import java.time.Instant +import java.util.Date import java.util.* import javax.management.MBeanServerInvocationHandler import javax.management.ObjectName @@ -42,8 +43,8 @@ import javax.management.remote.JMXServiceURL abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 19 - val historySchemaVersion = 6 + val configSchemaVersion = 20 + val historySchemaVersion = 7 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as // they do not go through the pending task queue. Ideally this should probably be written in a way to wait for the @@ -230,6 +231,35 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { assertEquals("Request failed", RestStatus.OK, response.restStatus()) } + protected fun updateTransformStartTime(update: Transform, desiredStartTimeMillis: Long? = null) { + // Before updating start time of a job always make sure there are no unassigned shards that could cause the config + // index to move to a new node and negate this forced start + if (isMultiNode) { + waitFor { + try { + client().makeRequest("GET", "_cluster/allocation/explain") + fail("Expected 400 Bad Request when there are no unassigned shards to explain") + } catch (e: ResponseException) { + assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus()) + } + } + } + val intervalSchedule = (update.jobSchedule as IntervalSchedule) + val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis() + val startTimeMillis = desiredStartTimeMillis ?: (Instant.now().toEpochMilli() - millis) + val waitForActiveShards = if (isMultiNode) "all" else "1" + val response = client().makeRequest( + "POST", "${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}/_update/${update.id}?wait_for_active_shards=$waitForActiveShards", + StringEntity( + "{\"doc\":{\"transform\":{\"schedule\":{\"interval\":{\"start_time\":" + + "\"$startTimeMillis\"}}}}}", + ContentType.APPLICATION_JSON + ) + ) + + assertEquals("Request failed", RestStatus.OK, response.restStatus()) + } + override fun preserveIndicesUponCompletion(): Boolean = true companion object { val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 diff --git a/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt index cf5709ccd..b38b7e112 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt @@ -109,9 +109,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() { private object TransformRestTestCaseExt : TransformRestTestCase() { - fun updateTransformStartTimeExt(update: Transform, desiredStartTimeMillis: Long? = null) = - super.updateTransformStartTime(update, desiredStartTimeMillis) - fun createTransformExt( transform: Transform, transformId: String = randomAlphaOfLength(10), @@ -310,9 +307,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() { executeRequest(request, expectedStatus, userClient) } - protected fun updateTransformStartTime(update: Transform, desiredStartTimeMillis: Long? = null) = - TransformRestTestCaseExt.updateTransformStartTimeExt(update, desiredStartTimeMillis) - protected fun createTransform( transform: Transform, transformId: String = randomAlphaOfLength(10), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index 2cf8d1ee7..e0b7b9e1b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -19,6 +19,7 @@ import org.opensearch.client.Request import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.client.RestClient +import org.opensearch.cluster.ClusterModule import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue @@ -60,12 +61,15 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetry import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ValidationResult +import org.opensearch.indexmanagement.transform.model.Transform +import org.opensearch.indexmanagement.transform.model.TransformMetadata import org.opensearch.indexmanagement.util._ID import org.opensearch.indexmanagement.util._PRIMARY_TERM import org.opensearch.indexmanagement.util._SEQ_NO import org.opensearch.indexmanagement.waitFor import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.rest.RestRequest +import org.opensearch.search.SearchModule import org.opensearch.core.rest.RestStatus import org.opensearch.indexmanagement.rollup.randomTermQuery import org.opensearch.test.OpenSearchTestCase @@ -843,6 +847,63 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() return metadata } + protected fun getTransform( + transformId: String, + header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ): Transform { + val response = client().makeRequest("GET", "${IndexManagementPlugin.TRANSFORM_BASE_URI}/$transformId", null, header) + assertEquals("Unable to get transform $transformId", RestStatus.OK, response.restStatus()) + + val parser = createParser(XContentType.JSON.xContent(), response.entity.content) + ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser) + + lateinit var id: String + var primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + var seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO + lateinit var transform: Transform + + while (parser.nextToken() != Token.END_OBJECT) { + parser.nextToken() + + when (parser.currentName()) { + _ID -> id = parser.text() + _SEQ_NO -> seqNo = parser.longValue() + _PRIMARY_TERM -> primaryTerm = parser.longValue() + Transform.TRANSFORM_TYPE -> transform = Transform.parse(parser, id, seqNo, primaryTerm) + } + } + return transform + } + + protected fun getTransformMetadata( + metadataId: String, + header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ): TransformMetadata { + val response = client().makeRequest("GET", "$INDEX_MANAGEMENT_INDEX/_doc/$metadataId", null, header) + assertEquals("Unable to get transform metadata $metadataId", RestStatus.OK, response.restStatus()) + + val parser = createParser(XContentType.JSON.xContent(), response.entity.content) + ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser) + + lateinit var id: String + var primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + var seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO + lateinit var metadata: TransformMetadata + + while (parser.nextToken() != Token.END_OBJECT) { + parser.nextToken() + + when (parser.currentName()) { + _ID -> id = parser.text() + _SEQ_NO -> seqNo = parser.longValue() + _PRIMARY_TERM -> primaryTerm = parser.longValue() + TransformMetadata.TRANSFORM_METADATA_TYPE -> metadata = TransformMetadata.parse(parser, id, seqNo, primaryTerm) + } + } + + return metadata + } + protected fun deleteSnapshot(repository: String, snapshotName: String) { val response = client().makeRequest("DELETE", "_snapshot/$repository/$snapshotName") assertEquals("Unable to delete snapshot", RestStatus.OK, response.restStatus()) @@ -1080,4 +1141,13 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() throw OpenSearchParseException("Failed to parse content to list", e) } } + + override fun xContentRegistry(): NamedXContentRegistry { + return NamedXContentRegistry( + listOf( + ClusterModule.getNamedXWriteables(), + SearchModule(Settings.EMPTY, emptyList()).namedXContents + ).flatten() + ) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt index 7bb838f0a..2ad77d312 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -40,6 +40,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig import org.opensearch.indexmanagement.common.model.notification.Channel +import org.opensearch.indexmanagement.indexstatemanagement.action.TransformAction import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Chime import org.opensearch.indexmanagement.indexstatemanagement.model.destination.CustomWebhook import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination @@ -49,6 +50,7 @@ import org.opensearch.indexmanagement.opensearchapi.string import org.opensearch.indexmanagement.rollup.randomISMRollup import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.transform.randomISMTransform import org.opensearch.jobscheduler.spi.schedule.CronSchedule import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.jobscheduler.spi.schedule.Schedule @@ -200,6 +202,10 @@ fun randomRollupActionConfig(): RollupAction { return RollupAction(ismRollup = randomISMRollup(), index = 0) } +fun randomTransformActionConfig(): TransformAction { + return TransformAction(ismTransform = randomISMTransform(), index = 0) +} + fun randomCloseActionConfig(): CloseAction { return CloseAction(index = 0) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt new file mode 100644 index 000000000..386e47407 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt @@ -0,0 +1,376 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.opensearch.cluster.metadata.DataStream +import org.opensearch.index.query.MatchAllQueryBuilder +import org.opensearch.indexmanagement.common.model.dimension.DateHistogram +import org.opensearch.indexmanagement.common.model.dimension.Terms +import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase +import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate +import org.opensearch.indexmanagement.indexstatemanagement.model.Policy +import org.opensearch.indexmanagement.indexstatemanagement.model.State +import org.opensearch.indexmanagement.indexstatemanagement.model.Transition +import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.AttemptCreateTransformJobStep +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.WaitForTransformCompletionStep +import org.opensearch.indexmanagement.indexstatemanagement.step.transition.AttemptTransitionStep +import org.opensearch.indexmanagement.makeRequest +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry +import org.opensearch.indexmanagement.transform.avgAggregation +import org.opensearch.indexmanagement.transform.maxAggregation +import org.opensearch.indexmanagement.transform.minAggregation +import org.opensearch.indexmanagement.transform.model.ISMTransform +import org.opensearch.indexmanagement.transform.model.Transform +import org.opensearch.indexmanagement.transform.model.TransformMetadata +import org.opensearch.indexmanagement.transform.sumAggregation +import org.opensearch.indexmanagement.transform.valueCountAggregation +import org.opensearch.indexmanagement.waitFor +import org.opensearch.search.aggregations.AggregatorFactories +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.Locale + +class TransformActionIT : IndexStateManagementRestTestCase() { + + private val testPrefix = javaClass.simpleName.lowercase(Locale.ROOT) + + companion object { + const val SOURCE_INDEX_MAPPING = """ + "properties": { + "timestamp": { + "type": "date", + "format": "strict_date_optional_time||epoch_millis" + }, + "category": { + "type": "keyword" + }, + "value": { + "type": "long" + } + } + """ + } + + fun `test transform action`() { + val indexName = "${testPrefix}_index_basic" + val targetIndex = "${testPrefix}_target" + val policyId = "${testPrefix}_policy_basic" + + val ismTransform = prepareISMTransform(targetIndex) + val policy = preparePolicyContainingTransform(indexName, ismTransform, policyId) + createPolicy(policy, policyId) + createIndex(indexName, policyId, mapping = SOURCE_INDEX_MAPPING) + + assertIndexTransformSucceeded(indexName, policyId, ismTransform) + } + + fun `test data stream transform action`() { + val dataStreamName = "${testPrefix}_data_stream" + val targetIndex = "${testPrefix}_target_data_stream" + val policyId = "${testPrefix}_policy_data_stream" + + val ismTransform = prepareISMTransform(targetIndex) + val policy = preparePolicyContainingTransform(dataStreamName, ismTransform, policyId) + createPolicy(policy, policyId) + createDataStream(dataStreamName) + + // assert transform works on backing indices of a data stream + val indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1L) + assertIndexTransformSucceeded(indexName, policyId, ismTransform) + } + + fun `test transform action failure due to wrong source field`() { + val indexName = "${testPrefix}_index_failure" + val targetIndex = "${testPrefix}_target_failure" + val policyId = "${testPrefix}_policy_failure" + + val ismTransform = ISMTransform( + description = "test transform", + targetIndex = targetIndex, + pageSize = 100, + dataSelectionQuery = MatchAllQueryBuilder(), + groups = listOf( + DateHistogram(sourceField = "timestamp", fixedInterval = "1d"), + Terms(sourceField = "wrong_field", targetField = "wrong_field") + ), + aggregations = AggregatorFactories.builder() + .addAggregator(sumAggregation()) + .addAggregator(maxAggregation()) + .addAggregator(minAggregation()) + .addAggregator(avgAggregation()) + .addAggregator(valueCountAggregation()) + ) + val policy = preparePolicyContainingTransform(indexName, ismTransform, policyId) + createPolicy(policy, policyId) + createIndex(indexName, policyId, mapping = SOURCE_INDEX_MAPPING) + + assertIndexTransformFailedInAttemptCreateTransformStep(indexName, policyId, ismTransform) + } + + fun `test transform action failed step got retried`() { + val indexName = "${testPrefix}_index_retry" + val targetIndex = "${testPrefix}_target_retry" + val policyId = "${testPrefix}_policy_retry" + + val ismTransform = ISMTransform( + description = "test transform", + targetIndex = targetIndex, + pageSize = 100, + dataSelectionQuery = MatchAllQueryBuilder(), + groups = listOf( + DateHistogram(sourceField = "timestamp", fixedInterval = "1d"), + Terms(sourceField = "wrong_field", targetField = "wrong_field") + ), + aggregations = AggregatorFactories.builder() + .addAggregator(sumAggregation()) + .addAggregator(maxAggregation()) + .addAggregator(minAggregation()) + .addAggregator(avgAggregation()) + .addAggregator(valueCountAggregation()) + ) + val transform = ismTransform.toTransform(indexName) + val policy = preparePolicyContainingTransform(indexName, ismTransform, policyId, retry = 1) + createPolicy(policy, policyId) + createIndex(indexName, policyId, mapping = SOURCE_INDEX_MAPPING) + + assertIndexTransformFailedInAttemptCreateTransformStep(indexName, policyId, ismTransform) + + // verify the wait for transform completion step will be retried and failed again. + updateManagedIndexConfigStartTime(getExistingManagedIndexConfig(indexName)) + waitFor { + assertEquals( + AttemptCreateTransformJobStep.getFailedMessage(transform.id, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + } + + fun `test policy succeeded when run the same transform job twice`() { + val indexName = "${testPrefix}_index_two_transforms" + val targetIndex = "${testPrefix}_target_two_transforms" + val policyId = "${testPrefix}_policy_two_transforms" + + val ismTransform = prepareISMTransform(targetIndex) + val policy = preparePolicyContainingTransformTwice(indexName, ismTransform, policyId) + createPolicy(policy, policyId) + createIndex(indexName, policyId, mapping = SOURCE_INDEX_MAPPING) + + assertIndexTransformSucceededTwice(indexName, policyId, ismTransform) + } + + // create an ISMTransform that matches SOURCE_INDEX_MAPPING + private fun prepareISMTransform(targetIndex: String): ISMTransform { + return ISMTransform( + description = "test transform", + targetIndex = targetIndex, + pageSize = 100, + dataSelectionQuery = MatchAllQueryBuilder(), + groups = listOf( + DateHistogram(sourceField = "timestamp", fixedInterval = "1d"), + Terms(sourceField = "category", targetField = "category") + ), + aggregations = AggregatorFactories.builder() + .addAggregator(sumAggregation()) + .addAggregator(maxAggregation()) + .addAggregator(minAggregation()) + .addAggregator(avgAggregation()) + .addAggregator(valueCountAggregation()) + ) + } + + private fun preparePolicyContainingTransform(indexName: String, ismTransform: ISMTransform, policyId: String, retry: Long = 0): Policy { + val actionConfig = TransformAction(ismTransform, 0) + actionConfig.configRetry = ActionRetry(retry) + val states = listOf( + State("transform", listOf(actionConfig), listOf()) + ) + return Policy( + id = policyId, + description = "test description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states, + ismTemplate = listOf( + ISMTemplate( + indexPatterns = listOf(indexName), + priority = 100, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS) + ) + ) + ) + } + + private fun preparePolicyContainingTransformTwice( + indexName: String, + ismTransform: ISMTransform, + policyId: String, + retry: Long = 0 + ): Policy { + val actionConfig = TransformAction(ismTransform, 0) + actionConfig.configRetry = ActionRetry(retry) + val states = listOf( + State("transform1", listOf(actionConfig), listOf(Transition(stateName = "transform2", conditions = null))), + State("transform2", listOf(actionConfig), listOf()) + ) + return Policy( + id = policyId, + description = "test description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states, + ismTemplate = listOf( + ISMTemplate( + indexPatterns = listOf(indexName), + priority = 100, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS) + ) + ) + ) + } + + private fun createDataStream(dataStreamName: String) { + // create an index template for data stream + client().makeRequest( + "PUT", + "/_index_template/${dataStreamName}_template", + StringEntity( + "{ " + + "\"index_patterns\": [ \"$dataStreamName\" ], " + + "\"data_stream\": { \"timestamp_field\": { \"name\": \"timestamp\" } }, " + + "\"template\": { \"mappings\": { $SOURCE_INDEX_MAPPING } } }", + ContentType.APPLICATION_JSON + ) + ) + // create data stream + client().makeRequest("PUT", "/_data_stream/$dataStreamName") + } + + private fun assertIndexTransformSucceeded(indexName: String, policyId: String, ismTransform: ISMTransform) { + val transform = ismTransform.toTransform(indexName) + val transformId = transform.id + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Change the start time so that the policy will be initialized. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(policyId, getExplainManagedIndexMetaData(indexName).policyID) } + + // Change the start time so that the transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + AttemptCreateTransformJobStep.getSuccessMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + updateTransformStartTime(transform) + + assertTransformCompleted(transform) + + // Change the start time so that the transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + WaitForTransformCompletionStep.getJobCompletionMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + } + + private fun assertIndexTransformSucceededTwice(indexName: String, policyId: String, ismTransform: ISMTransform) { + val transform = ismTransform.toTransform(indexName) + val transformId = transform.id + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Change the start time so that the policy will be initialized. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(policyId, getExplainManagedIndexMetaData(indexName).policyID) } + + // Change the start time so that the transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + AttemptCreateTransformJobStep.getSuccessMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + assertTransformCompleted(transform) + + // Change the start time so that the transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + WaitForTransformCompletionStep.getJobCompletionMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + // Change the start time so that the transition attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + AttemptTransitionStep.getSuccessMessage(indexName, "transform2"), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + // Change the start time so that the second transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + AttemptCreateTransformJobStep.getSuccessRestartMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + assertTransformCompleted(transform) + + // Change the start time so that the second transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + WaitForTransformCompletionStep.getJobCompletionMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + } + + private fun assertTransformCompleted(transform: Transform) { + updateTransformStartTime(transform) + waitFor(timeout = Instant.ofEpochSecond(60)) { + val transformJob = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) + val transformMetadata = getTransformMetadata(transformJob.metadataId!!) + assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + } + } + + private fun assertIndexTransformFailedInAttemptCreateTransformStep(indexName: String, policyId: String, ismTransform: ISMTransform) { + val transformId = ismTransform.toTransform(indexName).id + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Change the start time so that the policy will be initialized. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(policyId, getExplainManagedIndexMetaData(indexName).policyID) } + + // Change the start time so that the transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + AttemptCreateTransformJobStep.getFailedMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt index f455fca15..bce83125e 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt @@ -22,6 +22,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinat import org.opensearch.indexmanagement.indexstatemanagement.MetadataService import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings import org.opensearch.indexmanagement.indexstatemanagement.migration.ISMTemplateService +import org.opensearch.search.SearchModule import org.opensearch.test.ClusterServiceUtils import org.opensearch.test.OpenSearchTestCase import org.opensearch.threadpool.Scheduler @@ -77,7 +78,7 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() { indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) coordinator = ManagedIndexCoordinator( settings, client, clusterService, threadPool, indexManagementIndices, metadataService, - templateService, indexMetadataProvider + templateService, indexMetadataProvider, NamedXContentRegistry(SearchModule(Settings.EMPTY, emptyList()).namedXContents) ) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateTransformJobStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateTransformJobStepTests.kt new file mode 100644 index 000000000..02945979e --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateTransformJobStepTests.kt @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step + +import org.opensearch.indexmanagement.indexstatemanagement.randomTransformActionConfig +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.AttemptCreateTransformJobStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.TransformActionProperties +import org.opensearch.test.OpenSearchTestCase + +class AttemptCreateTransformJobStepTests : OpenSearchTestCase() { + + private val transformAction = randomTransformActionConfig() + private val indexName = "test" + private val transformId: String = transformAction.ismTransform.toTransform(indexName).id + private val metadata = ManagedIndexMetaData( + indexName, + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + ActionMetaData( + AttemptCreateTransformJobStep.name, 1, 0, false, 0, null, + ActionProperties(transformActionProperties = TransformActionProperties(transformId)) + ), + null, + null, + null + ) + private val step = AttemptCreateTransformJobStep(transformAction) + + fun `test process failure`() { + step.processFailure(transformId, indexName, Exception("dummy-error")) + val updatedManagedIndexMedaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals( + "Step status is not FAILED", + Step.StepStatus.FAILED, + updatedManagedIndexMedaData.stepMetaData?.stepStatus + ) + assertEquals( + "Error message is not expected", + AttemptCreateTransformJobStep.getFailedMessage(transformId, indexName), + updatedManagedIndexMedaData.info?.get("message") + ) + assertNull( + "TransformId in action properties is not cleaned up", + updatedManagedIndexMedaData.actionMetaData?.actionProperties?.transformActionProperties?.transformId + ) + } + + fun `test isIdempotent`() { + assertTrue(step.isIdempotent()) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForTransformCompletionStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForTransformCompletionStepTests.kt new file mode 100644 index 000000000..d536c7449 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForTransformCompletionStepTests.kt @@ -0,0 +1,171 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step + +import com.nhaarman.mockitokotlin2.mock +import kotlinx.coroutines.runBlocking +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.AttemptCreateTransformJobStep +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.WaitForTransformCompletionStep +import org.opensearch.indexmanagement.transform.model.TransformMetadata +import org.opensearch.indexmanagement.transform.model.TransformStats +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.TransformActionProperties +import org.opensearch.indexmanagement.util.NO_ID +import org.opensearch.jobscheduler.spi.utils.LockService +import org.opensearch.script.ScriptService +import org.opensearch.test.OpenSearchTestCase +import java.time.Instant + +class WaitForTransformCompletionStepTests : OpenSearchTestCase() { + + private val clusterService: ClusterService = mock() + private val scriptService: ScriptService = mock() + private val settings: Settings = Settings.EMPTY + private val transformId: String = "dummy-id" + private val indexName: String = "test" + private val metadata = ManagedIndexMetaData( + indexName, + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + ActionMetaData( + AttemptCreateTransformJobStep.name, 1, 0, false, 0, null, + ActionProperties(transformActionProperties = TransformActionProperties(transformId)) + ), + null, + null, + null + ) + private val transformMetadata = TransformMetadata( + id = NO_ID, + transformId = transformId, + lastUpdatedAt = Instant.now(), + status = TransformMetadata.Status.FINISHED, + stats = TransformStats(1, 1, 1, 1, 1) + ) + private val client: Client = mock() + private val step = WaitForTransformCompletionStep() + private val lockService: LockService = LockService(mock(), clusterService) + + fun `test wait for transform when missing transform id`() { + val actionMetadata = metadata.actionMetaData!!.copy(actionProperties = ActionProperties()) + val metadata = metadata.copy(actionMetaData = actionMetadata) + val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) + val step = WaitForTransformCompletionStep() + + runBlocking { + step.preExecute(logger, context).execute() + } + + val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals( + "Missing failure message", + WaitForTransformCompletionStep.getMissingTransformJobMessage(indexName), + updatedManagedIndexMetaData.info?.get("message") + ) + } + + fun `test process transform metadata FAILED status`() { + val transformMetadata = transformMetadata.copy(status = TransformMetadata.Status.FAILED) + step.processTransformMetadataStatus(transformId, indexName, transformMetadata) + + val updateManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updateManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals( + "Missing failure message", + WaitForTransformCompletionStep.getJobFailedMessage(transformId, indexName), + updateManagedIndexMetaData.info?.get("message") + ) + } + + fun `test process transform metadata STOPPED status`() { + val transformMetadata = transformMetadata.copy(status = TransformMetadata.Status.STOPPED) + step.processTransformMetadataStatus(transformId, indexName, transformMetadata) + + val updateManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updateManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals( + "Missing failure message", + WaitForTransformCompletionStep.getJobFailedMessage(transformId, indexName), + updateManagedIndexMetaData.info?.get("message") + ) + assertEquals("Mismatch in cause", WaitForTransformCompletionStep.JOB_STOPPED_MESSAGE, updateManagedIndexMetaData.info?.get("cause")) + } + + fun `test process transform metadata INIT status`() { + val transformMetadata = transformMetadata.copy(status = TransformMetadata.Status.INIT) + step.processTransformMetadataStatus(transformId, indexName, transformMetadata) + + val updateManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals( + "Step status is not CONDITION_NOT_MET", + Step.StepStatus.CONDITION_NOT_MET, + updateManagedIndexMetaData.stepMetaData?.stepStatus + ) + assertEquals( + "Missing processing message", + WaitForTransformCompletionStep.getJobProcessingMessage(transformId, indexName), + updateManagedIndexMetaData.info?.get("message") + ) + } + + fun `test process transform metadata STARTED status`() { + val transformMetadata = transformMetadata.copy(status = TransformMetadata.Status.STARTED) + step.processTransformMetadataStatus(transformId, indexName, transformMetadata) + + val updateManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updateManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals( + "Missing processing message", + WaitForTransformCompletionStep.getJobProcessingMessage(transformId, indexName), + updateManagedIndexMetaData.info?.get("message") + ) + } + + fun `test process transform metadata FINISHED status`() { + val transformMetadata = transformMetadata.copy(status = TransformMetadata.Status.FINISHED) + step.processTransformMetadataStatus(transformId, indexName, transformMetadata) + + val updateManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updateManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals( + "Missing processing message", + WaitForTransformCompletionStep.getJobCompletionMessage(transformId, indexName), + updateManagedIndexMetaData.info?.get("message") + ) + } + + fun `test process failure`() { + step.processFailure(transformId, indexName, Exception("dummy-exception")) + + val updateManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals("Mismatch in cause", "dummy-exception", updateManagedIndexMetaData.info?.get("cause")) + assertEquals( + "Mismatch in message", + WaitForTransformCompletionStep.getFailedMessage(transformId, indexName), + updateManagedIndexMetaData.info?.get("message") + ) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updateManagedIndexMetaData.stepMetaData?.stepStatus) + } + + fun `test isIdempotent`() { + assertTrue(step.isIdempotent()) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TestHelpers.kt index 880f00238..415e44718 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TestHelpers.kt @@ -21,8 +21,10 @@ import org.opensearch.indexmanagement.randomSchedule import org.opensearch.indexmanagement.randomUser import org.opensearch.indexmanagement.rollup.randomAfterKey import org.opensearch.indexmanagement.rollup.randomDimension +import org.opensearch.indexmanagement.rollup.randomTermQuery import org.opensearch.indexmanagement.transform.model.ContinuousTransformStats import org.opensearch.indexmanagement.transform.model.ExplainTransform +import org.opensearch.indexmanagement.transform.model.ISMTransform import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.transform.model.TransformMetadata import org.opensearch.indexmanagement.transform.model.TransformStats @@ -159,6 +161,17 @@ fun randomExplainTransform(): ExplainTransform { return ExplainTransform(metadataID = metadata.id, metadata = metadata) } +fun randomISMTransform(): ISMTransform { + return ISMTransform( + description = OpenSearchRestTestCase.randomAlphaOfLength(10), + targetIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT), + pageSize = OpenSearchRestTestCase.randomIntBetween(1, 10000), + groups = randomGroups(), + dataSelectionQuery = randomTermQuery(), + aggregations = randomAggregationFactories() + ) +} + fun Transform.toJsonString(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): String = this.toXContent(XContentFactory.jsonBuilder(), params).string() fun TransformMetadata.toJsonString(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): String = this.toXContent(XContentFactory.jsonBuilder(), params) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt index c2f6451e9..d407e5fec 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt @@ -7,12 +7,12 @@ package org.opensearch.indexmanagement.transform import org.apache.http.HttpEntity import org.apache.http.HttpHeaders +import org.apache.http.entity.ContentType import org.apache.http.entity.ContentType.APPLICATION_JSON import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader import org.junit.AfterClass import org.opensearch.client.Response -import org.opensearch.client.ResponseException import org.opensearch.client.RestClient import org.opensearch.common.settings.Settings import org.opensearch.core.xcontent.NamedXContentRegistry @@ -30,12 +30,8 @@ import org.opensearch.indexmanagement.transform.model.TransformMetadata import org.opensearch.indexmanagement.util._ID import org.opensearch.indexmanagement.util._PRIMARY_TERM import org.opensearch.indexmanagement.util._SEQ_NO -import org.opensearch.indexmanagement.waitFor -import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.core.rest.RestStatus import org.opensearch.search.SearchModule -import java.time.Duration -import java.time.Instant abstract class TransformRestTestCase : IndexManagementRestTestCase() { @@ -221,36 +217,7 @@ abstract class TransformRestTestCase : IndexManagementRestTestCase() { return continuousStats["documents_behind"] as Map } - protected fun updateTransformStartTime(update: Transform, desiredStartTimeMillis: Long? = null) { - // Before updating start time of a job always make sure there are no unassigned shards that could cause the config - // index to move to a new node and negate this forced start - if (isMultiNode) { - waitFor { - try { - client().makeRequest("GET", "_cluster/allocation/explain") - fail("Expected 400 Bad Request when there are no unassigned shards to explain") - } catch (e: ResponseException) { - assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus()) - } - } - } - val intervalSchedule = (update.jobSchedule as IntervalSchedule) - val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis() - val startTimeMillis = desiredStartTimeMillis ?: Instant.now().toEpochMilli() - millis - val waitForActiveShards = if (isMultiNode) "all" else "1" - val response = client().makeRequest( - "POST", "$INDEX_MANAGEMENT_INDEX/_update/${update.id}?wait_for_active_shards=$waitForActiveShards", - StringEntity( - "{\"doc\":{\"transform\":{\"schedule\":{\"interval\":{\"start_time\":" + - "\"$startTimeMillis\"}}}}}", - APPLICATION_JSON - ) - ) - - assertEquals("Request failed", RestStatus.OK, response.restStatus()) - } - - protected fun Transform.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), APPLICATION_JSON) + protected fun Transform.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), ContentType.APPLICATION_JSON) override fun xContentRegistry(): NamedXContentRegistry { return NamedXContentRegistry(SearchModule(Settings.EMPTY, emptyList()).namedXContents) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransformTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransformTests.kt new file mode 100644 index 000000000..6129f453f --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransformTests.kt @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform.model + +import org.opensearch.indexmanagement.transform.randomISMTransform +import org.opensearch.test.OpenSearchTestCase +import kotlin.test.assertFailsWith + +class ISMTransformTests : OpenSearchTestCase() { + + fun `test ism transform requires non empty description`() { + assertFailsWith(IllegalArgumentException::class, "Requires non empty description") { + randomISMTransform().copy(description = "") + } + } + + fun `test ism transform requires non empty target index`() { + assertFailsWith(IllegalArgumentException::class, "Requires non empty targetIndex") { + randomISMTransform().copy(targetIndex = "") + } + } + + fun `test ism transform requires non empty groups`() { + assertFailsWith(IllegalArgumentException::class, "Requires non empty groups") { + randomISMTransform().copy(groups = listOf()) + } + } + + fun `test ism transform requires page size between 1 and 10K`() { + assertFailsWith(IllegalArgumentException:: class, "Page size cannot be less than 1") { + randomISMTransform().copy(pageSize = -1) + } + + assertFailsWith(IllegalArgumentException:: class, "Page size cannot be less than 1") { + randomISMTransform().copy(pageSize = 0) + } + + assertFailsWith(IllegalArgumentException:: class, "Page size cannot be greater than 10000") { + randomISMTransform().copy(pageSize = 10001) + } + + randomISMTransform().copy(pageSize = 1) + randomISMTransform().copy(pageSize = 500) + randomISMTransform().copy(pageSize = 10000) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/model/WriteableTests.kt index 06c3a6b71..af1053641 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/model/WriteableTests.kt @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.transform.model import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.indexmanagement.transform.buildStreamInputForTransforms +import org.opensearch.indexmanagement.transform.randomISMTransform import org.opensearch.indexmanagement.transform.randomTransform import org.opensearch.indexmanagement.transform.randomTransformMetadata import org.opensearch.test.OpenSearchTestCase @@ -36,4 +37,11 @@ class WriteableTests : OpenSearchTestCase() { @Suppress("DEPRECATION") assertTrue("roles field in transform model is deprecated and should be parsed to empty list.", streamedTransform.roles.isEmpty()) } + + fun `test ism transform as stream`() { + val ismTransform = randomISMTransform() + val out = BytesStreamOutput().also { ismTransform.writeTo(it) } + val streamedISMTransform = ISMTransform(buildStreamInputForTransforms(out)) + assertEquals("Round tripping ISMTransform stream doesn't work", ismTransform, streamedISMTransform) + } } diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 588e886b4..bdcdcea31 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 19 + "schema_version": 20 }, "dynamic": "strict", "properties": { @@ -455,6 +455,83 @@ } } }, + "transform": { + "properties": { + "ism_transform": { + "properties": { + "description": { + "type": "text" + }, + "target_index": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "page_size": { + "type": "long" + }, + "data_selection_query": { + "type": "object", + "enabled": false + }, + "groups": { + "properties": { + "date_histogram": { + "properties": { + "source_field": { + "type": "keyword" + }, + "target_field": { + "type": "keyword" + }, + "fixed_interval": { + "type": "keyword" + }, + "calendar_interval": { + "type": "keyword" + }, + "timezone": { + "type": "keyword" + } + } + }, + "terms": { + "properties": { + "source_field": { + "type": "keyword" + }, + "target_field": { + "type": "keyword" + } + } + }, + "histogram": { + "properties": { + "source_field": { + "type": "keyword" + }, + "target_field": { + "type": "keyword" + }, + "interval": { + "type": "double" + } + } + } + } + }, + "aggregations": { + "type": "object", + "enabled": false + } + } + } + } + }, "shrink": { "properties": { "num_new_shards": { @@ -789,6 +866,10 @@ "shrink_action_properties": { "type": "object", "enabled": false + }, + "transform_action_properties": { + "type": "object", + "enabled": false } } } diff --git a/src/test/resources/mappings/cached-opendistro-ism-history.json b/src/test/resources/mappings/cached-opendistro-ism-history.json index 0e7db6d40..6ce6e1779 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-history.json +++ b/src/test/resources/mappings/cached-opendistro-ism-history.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 6 + "schema_version": 7 }, "dynamic": "strict", "properties": { @@ -115,6 +115,10 @@ "shrink_action_properties": { "type": "object", "enabled": false + }, + "transform_action_properties": { + "type": "object", + "enabled": false } } }