Skip to content

Commit

Permalink
Fix change policy logic
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Oct 1, 2023
1 parent 8b80c9f commit 3a0ed49
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -498,10 +498,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexSettings.ROLLOVER_SKIP,
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
ManagedIndexSettings.ACTION_VALIDATION_ENABLED,
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
ManagedIndexSettings.AUTO_MANAGE,
ManagedIndexSettings.METADATA_SERVICE_STATUS,
ManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL,
ManagedIndexSettings.JITTER,
ManagedIndexSettings.JOB_INTERVAL,
ManagedIndexSettings.SWEEP_PERIOD,
Expand Down Expand Up @@ -538,16 +535,13 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
LegacyOpenDistroManagedIndexSettings.ROLLOVER_ALIAS,
LegacyOpenDistroManagedIndexSettings.ROLLOVER_SKIP,
LegacyOpenDistroManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_ENABLED,
LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL,
LegacyOpenDistroManagedIndexSettings.SWEEP_PERIOD,
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
LegacyOpenDistroManagedIndexSettings.ALLOW_LIST,
LegacyOpenDistroManagedIndexSettings.SNAPSHOT_DENY_LIST,
LegacyOpenDistroManagedIndexSettings.AUTO_MANAGE,
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_STATUS,
LegacyOpenDistroManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL,
LegacyOpenDistroManagedIndexSettings.RESTRICTED_INDEX_PATTERN,
LegacyOpenDistroRollupSettings.ROLLUP_INGEST_BACKOFF_COUNT,
LegacyOpenDistroRollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ class ManagedIndexCoordinator(
private val ismIndices = indexManagementIndices

private var scheduledFullSweep: Scheduler.Cancellable? = null
private var scheduledMoveMetadata: Scheduler.Cancellable? = null
private var scheduledTemplateMigration: Scheduler.Cancellable? = null

@Volatile private var lastFullSweepTimeNano = System.nanoTime()
@Volatile private var indexStateManagementEnabled = INDEX_STATE_MANAGEMENT_ENABLED.get(settings)
Expand Down Expand Up @@ -170,10 +168,6 @@ class ManagedIndexCoordinator(
fun offClusterManager() {
// Cancel background sweep when demoted from being cluster manager
scheduledFullSweep?.cancel()

scheduledMoveMetadata?.cancel()

scheduledTemplateMigration?.cancel()
}

override fun clusterChanged(event: ClusterChangedEvent) {
Expand Down Expand Up @@ -206,8 +200,6 @@ class ManagedIndexCoordinator(

override fun beforeStop() {
scheduledFullSweep?.cancel()

scheduledMoveMetadata?.cancel()
}

private fun enable() {
Expand All @@ -229,8 +221,6 @@ class ManagedIndexCoordinator(
private fun disable() {
scheduledFullSweep?.cancel()
indexStateManagementEnabled = false

scheduledMoveMetadata?.cancel()
}

private suspend fun reenableJobs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,11 @@ object ManagedIndexRunner :
return
}

logger.info("change policy $managedIndexConfig")
logger.info("change policy $managedIndexMetaData")
logger.info("change policy $action")
if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) {
logger.info("Change policy for index ${managedIndexConfig.index}")
initChangePolicy(managedIndexConfig, managedIndexMetaData, action)
return
}
Expand Down Expand Up @@ -464,13 +468,43 @@ object ManagedIndexRunner :
}

private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig) {
val policy: Policy = managedIndexConfig.policy
// at this point we either successfully saved the policy or we failed to get the policy
val updatedManagedIndexMetaData =
// Initializing ManagedIndexMetaData for the first time
getInitializedManagedIndexMetaData(managedIndexConfig, policy)
var policy: Policy = managedIndexConfig.policy
lateinit var metadata: ManagedIndexMetaData

metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy)
if (managedIndexConfig.changePolicy != null) {
val policyID = managedIndexConfig.changePolicy.policyID
val newPolicy = getPolicy(policyID)
if (newPolicy != null) {
policy = newPolicy
val saved = savePolicyToManagedIndexConfig(managedIndexConfig, policy)
if (!saved) {
logger.error("Failed to save policy to ManagedIndexConfig(${managedIndexConfig.index})")
return

Check warning on line 483 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt#L482-L483

Added lines #L482 - L483 were not covered by tests
}
metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy)
} else {
// no policy found for change policy TODO can we check this in change policy API
metadata = ManagedIndexMetaData(
index = managedIndexConfig.index,
indexUuid = managedIndexConfig.indexUuid,
policyID = policyID,
policySeqNo = null,
policyPrimaryTerm = null,
policyCompleted = false,
rolledOver = false,
indexCreationDate = getIndexCreationDate(managedIndexConfig),
transitionTo = null,
stateMetaData = null,
actionMetaData = null,
stepMetaData = null,
policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0),
info = mapOf("message" to "Fail to load policy: $policyID")
)
}
}

updateManagedIndexMetaData(updatedManagedIndexMetaData, create = true)
updateManagedIndexMetaData(metadata, create = true)
}

@Suppress("ReturnCount", "BlockingMethodInNonBlockingContext")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,16 @@ package org.opensearch.indexmanagement.indexstatemanagement.settings

import org.opensearch.common.settings.Setting
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_ISM_ENABLED
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_JOB_INTERVAL
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST_ALL
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST_NONE
import java.util.concurrent.TimeUnit
import java.util.function.Function

@Suppress("UtilityClassWithPublicConstructor")
class LegacyOpenDistroManagedIndexSettings {
companion object {
const val DEFAULT_ISM_ENABLED = true
const val DEFAULT_METADATA_SERVICE_STATUS = 0
const val DEFAULT_METADATA_SERVICE_ENABLED = true
const val DEFAULT_JOB_INTERVAL = 5
private val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList()
val ALLOW_LIST_NONE = emptyList<String>()
val SNAPSHOT_DENY_LIST_NONE = emptyList<String>()

val INDEX_STATE_MANAGEMENT_ENABLED: Setting<Boolean> = Setting.boolSetting(
"opendistro.index_state_management.enabled",
Expand All @@ -30,36 +26,6 @@ class LegacyOpenDistroManagedIndexSettings {
Setting.Property.Deprecated
)

// 0: migration is going on
// 1: migration succeed
// -1: migration failed
val METADATA_SERVICE_STATUS: Setting<Int> = Setting.intSetting(
"opendistro.index_state_management.metadata_migration.status",
DEFAULT_METADATA_SERVICE_STATUS,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

// 0: enabled, use onClusterManager time as ISM template last_updated_time
// -1: migration ended successfully
// -2: migration ended unsuccessfully
// >0: use this setting (epoch millis) as ISM template last_updated_time
val TEMPLATE_MIGRATION_CONTROL: Setting<Long> = Setting.longSetting(
"opendistro.index_state_management.template_migration.control",
ManagedIndexSettings.DEFAULT_TEMPLATE_MIGRATION_TIMESTAMP,
-2L,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val METADATA_SERVICE_ENABLED: Setting<Boolean> = Setting.boolSetting(
"opendistro.index_state_management.metadata_service.enabled",
DEFAULT_METADATA_SERVICE_ENABLED,
Setting.Property.NodeScope,
Setting.Property.Dynamic,
Setting.Property.Deprecated
)

val POLICY_ID: Setting<String> = Setting.simpleString(
"index.opendistro.index_state_management.policy_id",
Setting.Property.IndexScope,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@ package org.opensearch.indexmanagement.indexstatemanagement.settings
import org.opensearch.common.settings.Setting
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser
import java.util.function.Function

@Suppress("UtilityClassWithPublicConstructor")
class ManagedIndexSettings {
companion object {
const val DEFAULT_ISM_ENABLED = true
const val DEFAULT_ACTION_VALIDATION_ENABLED = false
const val DEFAULT_TEMPLATE_MIGRATION_TIMESTAMP = 0L
const val DEFAULT_JOB_INTERVAL = 5
const val DEFAULT_JITTER = 0.6
const val DEFAULT_RESTRICTED_PATTERN = "\\.opendistro_security|\\.kibana.*|\\$INDEX_MANAGEMENT_INDEX"
val ALLOW_LIST_NONE = emptyList<String>()
val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList()
val SNAPSHOT_DENY_LIST_NONE = emptyList<String>()

val INDEX_STATE_MANAGEMENT_ENABLED: Setting<Boolean> = Setting.boolSetting(
Expand All @@ -36,35 +37,6 @@ class ManagedIndexSettings {
Setting.Property.Dynamic
)

// 0: migration is going on
// 1: migration succeed
// -1: migration failed
val METADATA_SERVICE_STATUS: Setting<Int> = Setting.intSetting(
"plugins.index_state_management.metadata_migration.status",
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_STATUS,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

// 0: enabled, use onClusterManager time as ISM template last_updated_time
// -1: migration ended successfully
// -2: migration ended unsuccessfully
// >0: use this setting (epoch millis) as ISM template last_updated_time
val TEMPLATE_MIGRATION_CONTROL: Setting<Long> = Setting.longSetting(
"plugins.index_state_management.template_migration.control",
LegacyOpenDistroManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL,
-2L,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val METADATA_SERVICE_ENABLED: Setting<Boolean> = Setting.boolSetting(
"plugins.index_state_management.metadata_service.enabled",
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_ENABLED,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val POLICY_ID: Setting<String> = Setting.simpleString(
"index.plugins.index_state_management.policy_id",
LegacyOpenDistroManagedIndexSettings.POLICY_ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ class TransportAddPolicyAction @Inject constructor(
bulkReq,
object : ActionListener<BulkResponse> {
override fun onResponse(response: BulkResponse) {
log.info("Successfully cleaned metadata for remove policy indices: {}", indices)
log.debug("Successfully cleaned metadata for remove policy indices: {}", indices)
}

override fun onFailure(e: Exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,12 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction
import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction
import org.opensearch.indexmanagement.indexstatemanagement.model.ChangePolicy
import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.State
import org.opensearch.indexmanagement.indexstatemanagement.model.Transition
import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.opensearchapi.optionalISMTemplateField
import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
Expand Down Expand Up @@ -134,17 +132,6 @@ private fun updateEnabledField(uuid: String, enabled: Boolean, enabledTime: Long
return UpdateRequest(INDEX_MANAGEMENT_INDEX, uuid).doc(builder)
}

fun updateISMTemplateRequest(policyID: String, ismTemplates: List<ISMTemplate>, seqNo: Long, primaryTerm: Long): UpdateRequest {
val builder = XContentFactory.jsonBuilder()
.startObject()
.startObject(Policy.POLICY_TYPE)
.optionalISMTemplateField(Policy.ISM_TEMPLATE, ismTemplates)
.endObject()
.endObject()
return UpdateRequest(INDEX_MANAGEMENT_INDEX, policyID).doc(builder)
.setIfSeqNo(seqNo).setIfPrimaryTerm(primaryTerm)
}

fun updateDisableManagedIndexRequest(uuid: String): UpdateRequest {
return updateEnabledField(uuid, false, null)
}
Expand Down Expand Up @@ -413,17 +400,17 @@ fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMeta
return true
}

if (managedIndexMetaData.actionMetaData?.name != TransitionsAction.name) {
return false
}

// we need this in so that we can change policy before the first transition happens so policy doesn't get completed
// before we have a chance to change policy
if (actionToExecute?.type == TransitionsAction.name) {
return true
}

return true
// TODO actionToExecute is correlate to the actionMetadata?
// actionToExecute is found out by checking the metadata, it can be current unfinished one or the next
// actionMetadata has already been updated, it can be current unfinished one or the next
// In change policy context, we only accept unfinished transition or the new transition
return managedIndexMetaData.actionMetaData?.name == TransitionsAction.name
}

fun ManagedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig: ManagedIndexConfig): Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
LegacyOpenDistroManagedIndexSettings.ROLLOVER_ALIAS,
LegacyOpenDistroManagedIndexSettings.ROLLOVER_SKIP,
LegacyOpenDistroManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_ENABLED,
LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL,
LegacyOpenDistroManagedIndexSettings.SWEEP_PERIOD,
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
Expand Down Expand Up @@ -78,7 +77,6 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
ManagedIndexSettings.POLICY_ID,
ManagedIndexSettings.ROLLOVER_ALIAS,
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
ManagedIndexSettings.JOB_INTERVAL,
ManagedIndexSettings.SWEEP_PERIOD,
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
Expand Down Expand Up @@ -159,7 +157,6 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
.build()

assertEquals(ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED.get(settings), false)
assertEquals(ManagedIndexSettings.METADATA_SERVICE_ENABLED.get(settings), false)
assertEquals(ManagedIndexSettings.JOB_INTERVAL.get(settings), 1)
assertEquals(ManagedIndexSettings.SWEEP_PERIOD.get(settings), TimeValue.timeValueMinutes(6))
assertEquals(ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1))
Expand All @@ -186,7 +183,6 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
assertSettingDeprecationsAndWarnings(
arrayOf(
LegacyOpenDistroManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
LegacyOpenDistroManagedIndexSettings.METADATA_SERVICE_ENABLED,
LegacyOpenDistroManagedIndexSettings.JOB_INTERVAL,
LegacyOpenDistroManagedIndexSettings.SWEEP_PERIOD,
LegacyOpenDistroManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() {
settingSet.add(ManagedIndexSettings.JITTER)
settingSet.add(ManagedIndexSettings.JOB_INTERVAL)
settingSet.add(ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED)
settingSet.add(ManagedIndexSettings.METADATA_SERVICE_STATUS)
settingSet.add(ManagedIndexSettings.TEMPLATE_MIGRATION_CONTROL)
settingSet.add(ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT)
settingSet.add(ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS)
settingSet.add(ManagedIndexSettings.RESTRICTED_INDEX_PATTERN)
Expand Down
Loading

0 comments on commit 3a0ed49

Please sign in to comment.