Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove onetime notification settings when index operation finish #839

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,12 @@ class NotificationActionListener<Request : ActionRequest, Response : ActionRespo
}
}
}
}

// remove one time configuration no matter it is enabled or not
removeOneTimePolicy(config)
// remove one time configuration
val runtimeConfig = lronConfigResponse.lronConfigResponses.firstOrNull() { it.lronConfig.taskId != null }
runtimeConfig?.let {
removeOneTimePolicy(it.lronConfig)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.opensearch.action.admin.indices.open.OpenIndexAction
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
import org.opensearch.client.RestClient
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -198,25 +199,7 @@ class NotificationActionListenerIT : IndexManagementRestTestCase() {

@Suppress("UNCHECKED_CAST")
fun `test notify for reindex`() {
insertSampleData("source-index", 10)
createIndex("reindex-dest", Settings.EMPTY)

val response = client.makeRequest(
"POST", "_reindex?wait_for_completion=false",
StringEntity(
"""
{
"source": {
"index": "source-index"
},
"dest": {
"index": "reindex-dest"
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
val response = performReindex()

Assert.assertTrue(response.restStatus() == RestStatus.OK)

Expand Down Expand Up @@ -323,25 +306,7 @@ class NotificationActionListenerIT : IndexManagementRestTestCase() {

@Suppress("UNCHECKED_CAST")
fun `test notify for reindex with duplicate channel`() {
insertSampleData("source-index", 10)
createIndex("reindex-dest", Settings.EMPTY)

val response = client.makeRequest(
"POST", "_reindex?wait_for_completion=false",
StringEntity(
"""
{
"source": {
"index": "source-index"
},
"dest": {
"index": "reindex-dest"
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
val response = performReindex()

Assert.assertTrue(response.restStatus() == RestStatus.OK)
val taskId = response.asMap()["task"] as String
Expand Down Expand Up @@ -448,4 +413,78 @@ class NotificationActionListenerIT : IndexManagementRestTestCase() {
)
}
}

@Suppress("UNCHECKED_CAST")
fun `test runtime policy been removed when index operation finished`() {
val response = performReindex()

Assert.assertTrue(response.restStatus() == RestStatus.OK)
val taskId = response.asMap()["task"]
Assert.assertNotNull(taskId)

// put runtime policy only for failure
client.makeRequest(
"POST", "_plugins/_im/lron",
StringEntity(
"""
{
"lron_config": {
"task_id": "$taskId",
"lron_condition": {
"failure": true,
"success": false
},
"channels": [
{
"id": "$notificationConfId"
}
]
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)

waitFor(Instant.ofEpochSecond(60)) {
assertEquals(
"Notification index does not have a doc",
1,
(
client.makeRequest("GET", "$notificationIndex/_search?q=msg:reindex")
.asMap() as Map<String, Map<String, Map<String, Any>>>
)["hits"]!!["total"]!!["value"]
)

try {
client.makeRequest("GET", "_plugins/_im/lron/LRON:$taskId")
} catch (e: ResponseException) {
// runtime policy been removed
Assert.assertTrue(e.response.restStatus() == RestStatus.NOT_FOUND)
}
}
}

private fun performReindex(): Response {
insertSampleData("source-index", 10)
createIndex("reindex-dest", Settings.EMPTY)

val response = client.makeRequest(
"POST", "_reindex?wait_for_completion=false",
StringEntity(
"""
{
"source": {
"index": "source-index"
},
"dest": {
"index": "reindex-dest"
}
}
""".trimIndent(),
ContentType.APPLICATION_JSON
)
)
return response
}
}
Loading