From 6994598f66c0c04cb20277b9589e8ba582855297 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 2 Oct 2023 11:26:23 -0700 Subject: [PATCH] Provide unique id for each rollup job and add debug logs (#968) * Provide unique id for each rollup job and add debug logs Signed-off-by: bowenlan-amzn * Remove conflict rollup ids Signed-off-by: bowenlan-amzn --------- Signed-off-by: bowenlan-amzn --- .../rollup/RollupMetadataService.kt | 15 +++++++++------ .../indexmanagement/rollup/RollupRunner.kt | 1 + .../indexmanagement/rollup/RollupRestTestCase.kt | 5 ++--- .../resthandler/RestDeleteRollupActionIT.kt | 1 + .../rollup/resthandler/RestGetRollupActionIT.kt | 9 ++++++--- .../rollup/resthandler/RestIndexRollupActionIT.kt | 7 +++++-- .../rollup/resthandler/RestStartRollupActionIT.kt | 7 +++++-- .../rollup/resthandler/RestStopRollupActionIT.kt | 13 +++++++++---- .../rollup/runner/RollupRunnerIT.kt | 7 +++++++ 9 files changed, 45 insertions(+), 20 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt index f6d3e8f50..41eb84a6a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt @@ -196,7 +196,7 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont // Get the doc value field of the dateHistogram.sourceField for the first search hit converted to epoch millis // If the doc value is null or empty it will be treated the same as empty doc hits - val firstHitTimestampAsString: String? = response.hits.hits.first().field(dateHistogram.sourceField).getValue() + val firstHitTimestampAsString: String = response.hits.hits.first().field(dateHistogram.sourceField).getValue() ?: return StartingTimeResult.NoDocumentsFound // Parse date and extract epochMillis val formatter = DateFormatter.forPattern(DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT) @@ -204,11 +204,11 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont return StartingTimeResult.Success(getRoundedTime(epochMillis, dateHistogram)) } catch (e: RemoteTransportException) { val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception - logger.debug("Error when getting initial start time for rollup [${rollup.id}]: $unwrappedException") + logger.error("Error when getting initial start time for rollup [{}]: {}", rollup.id, unwrappedException) return StartingTimeResult.Failure(unwrappedException) } catch (e: Exception) { // TODO: Catching general exceptions for now, can make more granular - logger.debug("Error when getting initial start time for rollup [${rollup.id}]: $e") + logger.error("Error when getting initial start time for rollup [{}]: {}", rollup.id, e) return StartingTimeResult.Failure(e) } } @@ -261,7 +261,7 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont } // This updates the metadata for a continuous rollup after an execution of the composite search and ingestion of rollup data - fun getUpdatedContinuousMetadata( + private fun getUpdatedContinuousMetadata( rollup: Rollup, metadata: RollupMetadata, internalComposite: InternalComposite @@ -305,11 +305,11 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont } else MetadataResult.NoMetadata } catch (e: RemoteTransportException) { val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception - logger.debug("$errorMessage: $unwrappedException") + logger.error("$errorMessage: $unwrappedException") return MetadataResult.Failure(errorMessage, unwrappedException) } catch (e: Exception) { // TODO: Catching general exceptions for now, can make more granular - logger.debug("$errorMessage: $e") + logger.error("$errorMessage: $e") return MetadataResult.Failure(errorMessage, e) } } @@ -389,6 +389,7 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont failureReason = "The create metadata call failed with a ${response.result?.lowercase} result" } } + logger.debug("Metadata update successful {}", metadata) // TODO: Is seqno/prim and id returned for all? return MetadataResult.Success( metadata.copy( @@ -401,9 +402,11 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont ) } catch (e: RemoteTransportException) { val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception + logger.error("Metadata update failed ${metadata.rollupID}", unwrappedException) return MetadataResult.Failure(errorMessage, unwrappedException) } catch (e: Exception) { // TODO: Catching general exceptions for now, can make more granular + logger.error("Metadata update failed ${metadata.rollupID}", e) return MetadataResult.Failure(errorMessage, e) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt index 8f779a5f6..e4725fb28 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt @@ -193,6 +193,7 @@ object RollupRunner : * */ @Suppress("ReturnCount", "NestedBlockDepth", "ComplexMethod", "LongMethod", "ThrowsCount") private suspend fun runRollupJob(job: Rollup, context: JobExecutionContext, lock: LockModel) { + logger.debug("Running rollup job [${job.id}]") var updatableLock = lock try { when (val jobValidity = isJobValid(job)) { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index 9009ba775..90c6aa9b2 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -40,7 +40,6 @@ import org.opensearch.indexmanagement.util._SEQ_NO import org.opensearch.indexmanagement.waitFor import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.core.rest.RestStatus -import org.opensearch.test.OpenSearchTestCase import java.time.Duration import java.time.Instant @@ -112,7 +111,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { protected fun createRollup( rollup: Rollup, - rollupId: String = OpenSearchTestCase.randomAlphaOfLength(10), + rollupId: String, refresh: Boolean = true, client: RestClient? = null ): Rollup { @@ -150,7 +149,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { protected fun createRandomRollup(refresh: Boolean = true): Rollup { val rollup = randomRollup() - val rollupId = createRollup(rollup, refresh = refresh).id + val rollupId = createRollup(rollup, rollupId = rollup.id, refresh = refresh).id return getRollup(rollupId = rollupId) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestDeleteRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestDeleteRollupActionIT.kt index bcd1a5645..7c871781c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestDeleteRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestDeleteRollupActionIT.kt @@ -16,6 +16,7 @@ import org.opensearch.test.junit.annotations.TestLogging @TestLogging(value = "level:DEBUG", reason = "Debugging tests") @Suppress("UNCHECKED_CAST") class RestDeleteRollupActionIT : RollupRestTestCase() { + @Throws(Exception::class) fun `test deleting a rollup`() { val rollup = createRandomRollup() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestGetRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestGetRollupActionIT.kt index ab421d90b..0f2897972 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestGetRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestGetRollupActionIT.kt @@ -13,14 +13,17 @@ import org.opensearch.indexmanagement.rollup.action.get.GetRollupsRequest.Compan import org.opensearch.indexmanagement.rollup.randomRollup import org.opensearch.core.rest.RestStatus import org.opensearch.test.junit.annotations.TestLogging +import java.util.Locale @TestLogging(value = "level:DEBUG", reason = "Debugging tests") @Suppress("UNCHECKED_CAST") class RestGetRollupActionIT : RollupRestTestCase() { + private val testName = javaClass.simpleName.lowercase(Locale.ROOT) + @Throws(Exception::class) fun `test getting a rollup`() { - var rollup = createRollup(randomRollup()) + var rollup = createRollup(randomRollup(), rollupId = "$testName-1") val indexedRollup = getRollup(rollup.id) // Schema version and last updated time are updated during the creation so we need to update the original too for comparison // Job schedule interval will have a dynamic start time @@ -48,7 +51,7 @@ class RestGetRollupActionIT : RollupRestTestCase() { @Throws(Exception::class) fun `test getting all rollups`() { - val rollups = randomList(1, 15) { createRollup(randomRollup()) } + val rollups = randomList(1, 15) { createRandomRollup() } // Using a larger response size than the default in case leftover rollups prevent the ones created in this test from being returned val res = client().makeRequest("GET", "$ROLLUP_JOBS_BASE_URI?size=100") @@ -91,7 +94,7 @@ class RestGetRollupActionIT : RollupRestTestCase() { fun `test changing response size when getting rollups`() { // Ensure at least more rollup jobs than the default (20) exists val rollupCount = 25 - repeat(rollupCount) { createRollup(randomRollup()) } + repeat(rollupCount) { createRandomRollup() } // The default response size is 20, so even though 25 rollup jobs were made, at most 20 will be returned var res = client().makeRequest("GET", ROLLUP_JOBS_BASE_URI) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt index fdfd99f40..49a44b49b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt @@ -31,11 +31,14 @@ import org.opensearch.indexmanagement.util._SEQ_NO import org.opensearch.core.rest.RestStatus import org.opensearch.test.OpenSearchTestCase import org.opensearch.test.junit.annotations.TestLogging +import java.util.Locale @TestLogging(value = "level:DEBUG", reason = "Debugging tests") @Suppress("UNCHECKED_CAST") class RestIndexRollupActionIT : RollupRestTestCase() { + private val testName = javaClass.simpleName.lowercase(Locale.ROOT) + @Throws(Exception::class) fun `test creating a rollup`() { val rollup = randomRollup() @@ -213,7 +216,7 @@ class RestIndexRollupActionIT : RollupRestTestCase() { Dimension.Type.TERMS -> (it as Terms).copy(targetField = "some_other_target_field") } } - val rollup = createRollup(rollup = randomRollup().copy(dimensions = dimensions)) + val rollup = createRollup(rollup = randomRollup().copy(dimensions = dimensions), rollupId = "$testName-1") client().makeRequest( "PUT", "$ROLLUP_JOBS_BASE_URI/${rollup.id}?refresh=true&if_seq_no=${rollup.seqNo}&if_primary_term=${rollup.primaryTerm}", @@ -254,7 +257,7 @@ class RestIndexRollupActionIT : RollupRestTestCase() { } ) } - val rollup = createRollup(rollup = randomRollup().copy(metrics = metrics)) + val rollup = createRollup(rollup = randomRollup().copy(metrics = metrics), rollupId = "$testName-2") client().makeRequest( "PUT", "$ROLLUP_JOBS_BASE_URI/${rollup.id}?refresh=true&if_seq_no=${rollup.seqNo}&if_primary_term=${rollup.primaryTerm}", diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt index 4f2dde8bf..e2654b105 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupActionIT.kt @@ -23,12 +23,15 @@ import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.core.rest.RestStatus import java.time.Instant import java.time.temporal.ChronoUnit +import java.util.Locale class RestStartRollupActionIT : RollupRestTestCase() { + private val testName = javaClass.simpleName.lowercase(Locale.ROOT) + @Throws(Exception::class) fun `test starting a stopped rollup`() { - val rollup = createRollup(randomRollup().copy(enabled = false, jobEnabledTime = null, metadataID = null)) + val rollup = createRollup(randomRollup().copy(enabled = false, jobEnabledTime = null, metadataID = null), rollupId = "$testName-1") assertTrue("Rollup was not disabled", !rollup.enabled) val response = client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/${rollup.id}/_start") @@ -44,7 +47,7 @@ class RestStartRollupActionIT : RollupRestTestCase() { @Throws(Exception::class) fun `test starting a started rollup doesnt change enabled time`() { // First create a non-started rollup - val rollup = createRollup(randomRollup().copy(enabled = false, jobEnabledTime = null, metadataID = null)) + val rollup = createRollup(randomRollup().copy(enabled = false, jobEnabledTime = null, metadataID = null), rollupId = "$testName-2") assertTrue("Rollup was not disabled", !rollup.enabled) // Enable it to get the job enabled time diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt index a4d7a6d18..592889e49 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupActionIT.kt @@ -26,9 +26,12 @@ import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.core.rest.RestStatus import java.time.Instant import java.time.temporal.ChronoUnit +import java.util.Locale class RestStopRollupActionIT : RollupRestTestCase() { + private val testName = javaClass.simpleName.lowercase(Locale.ROOT) + @After fun clearIndicesAfterEachTest() { // Flaky could happen if config index not deleted @@ -39,7 +42,7 @@ class RestStopRollupActionIT : RollupRestTestCase() { @Throws(Exception::class) fun `test stopping a started rollup`() { - val rollup = createRollup(randomRollup().copy(enabled = true, jobEnabledTime = randomInstant(), metadataID = null)) + val rollup = createRollup(randomRollup().copy(enabled = true, jobEnabledTime = randomInstant(), metadataID = null), rollupId = "$testName-1") assertTrue("Rollup was not enabled", rollup.enabled) val response = client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/${rollup.id}/_stop") @@ -53,7 +56,7 @@ class RestStopRollupActionIT : RollupRestTestCase() { @Throws(Exception::class) fun `test stopping a stopped rollup`() { - val rollup = createRollup(randomRollup().copy(enabled = true, jobEnabledTime = randomInstant(), metadataID = null)) + val rollup = createRollup(randomRollup().copy(enabled = true, jobEnabledTime = randomInstant(), metadataID = null), rollupId = "$testName-2") assertTrue("Rollup was not enabled", rollup.enabled) val response = client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/${rollup.id}/_stop") @@ -84,7 +87,8 @@ class RestStopRollupActionIT : RollupRestTestCase() { enabled = true, jobEnabledTime = Instant.now(), metadataID = null - ) + ), + rollupId = "$testName-3" ) createRollupSourceIndex(rollup) updateRollupStartTime(rollup) @@ -157,7 +161,8 @@ class RestStopRollupActionIT : RollupRestTestCase() { enabled = true, jobEnabledTime = Instant.now(), metadataID = null - ) + ), + rollupId = "$testName-4" ) // Force rollup to execute which should fail as we did not create a source index diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index 529f166f4..d379638c8 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -34,9 +34,12 @@ import org.opensearch.core.rest.RestStatus import java.time.Instant import java.time.temporal.ChronoUnit import java.util.Collections.emptyMap +import java.util.Locale class RollupRunnerIT : RollupRestTestCase() { + private val testName = javaClass.simpleName.lowercase(Locale.ROOT) + fun `test metadata is created for rollup job when none exists`() { val indexName = "test_index_runner_first" @@ -148,6 +151,7 @@ class RollupRunnerIT : RollupRestTestCase() { // Define the rollup job var rollup = randomRollup().copy( + id = "$testName-1", enabled = true, jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), jobEnabledTime = Instant.now(), @@ -270,6 +274,7 @@ class RollupRunnerIT : RollupRestTestCase() { // Define rollup var rollup = randomRollup().copy( + id = "$testName-2", enabled = true, jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), jobEnabledTime = Instant.now(), @@ -328,6 +333,7 @@ class RollupRunnerIT : RollupRestTestCase() { // Define rollup var rollup = randomRollup().copy( + id = "$testName-3", enabled = true, jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), jobEnabledTime = Instant.now(), @@ -640,6 +646,7 @@ class RollupRunnerIT : RollupRestTestCase() { val delay: Long = 15000 // Define rollup var rollup = randomRollup().copy( + id = "$testName-4", enabled = true, jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), jobEnabledTime = Instant.now(),