Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide unique id for each rollup job and add debug logs #968

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,19 @@

// Get the doc value field of the dateHistogram.sourceField for the first search hit converted to epoch millis
// If the doc value is null or empty it will be treated the same as empty doc hits
val firstHitTimestampAsString: String? = response.hits.hits.first().field(dateHistogram.sourceField).getValue<String>()
val firstHitTimestampAsString: String = response.hits.hits.first().field(dateHistogram.sourceField).getValue<String>()
?: return StartingTimeResult.NoDocumentsFound
// Parse date and extract epochMillis
val formatter = DateFormatter.forPattern(DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT)
val epochMillis = DateFormatters.from(formatter.parse(firstHitTimestampAsString), formatter.locale()).toInstant().toEpochMilli()
return StartingTimeResult.Success(getRoundedTime(epochMillis, dateHistogram))
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
logger.debug("Error when getting initial start time for rollup [${rollup.id}]: $unwrappedException")
logger.error("Error when getting initial start time for rollup [{}]: {}", rollup.id, unwrappedException)

Check warning on line 207 in src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt#L207

Added line #L207 was not covered by tests
return StartingTimeResult.Failure(unwrappedException)
} catch (e: Exception) {
// TODO: Catching general exceptions for now, can make more granular
logger.debug("Error when getting initial start time for rollup [${rollup.id}]: $e")
logger.error("Error when getting initial start time for rollup [{}]: {}", rollup.id, e)

Check warning on line 211 in src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt#L211

Added line #L211 was not covered by tests
return StartingTimeResult.Failure(e)
}
}
Expand Down Expand Up @@ -261,7 +261,7 @@
}

// This updates the metadata for a continuous rollup after an execution of the composite search and ingestion of rollup data
fun getUpdatedContinuousMetadata(
private fun getUpdatedContinuousMetadata(
rollup: Rollup,
metadata: RollupMetadata,
internalComposite: InternalComposite
Expand Down Expand Up @@ -305,11 +305,11 @@
} else MetadataResult.NoMetadata
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
logger.debug("$errorMessage: $unwrappedException")
logger.error("$errorMessage: $unwrappedException")

Check warning on line 308 in src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt#L308

Added line #L308 was not covered by tests
return MetadataResult.Failure(errorMessage, unwrappedException)
} catch (e: Exception) {
// TODO: Catching general exceptions for now, can make more granular
logger.debug("$errorMessage: $e")
logger.error("$errorMessage: $e")
return MetadataResult.Failure(errorMessage, e)
}
}
Expand Down Expand Up @@ -389,6 +389,7 @@
failureReason = "The create metadata call failed with a ${response.result?.lowercase} result"
}
}
logger.debug("Metadata update successful {}", metadata)
// TODO: Is seqno/prim and id returned for all?
return MetadataResult.Success(
metadata.copy(
Expand All @@ -401,9 +402,11 @@
)
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
logger.error("Metadata update failed ${metadata.rollupID}", unwrappedException)

Check warning on line 405 in src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt#L405

Added line #L405 was not covered by tests
return MetadataResult.Failure(errorMessage, unwrappedException)
} catch (e: Exception) {
// TODO: Catching general exceptions for now, can make more granular
logger.error("Metadata update failed ${metadata.rollupID}", e)
return MetadataResult.Failure(errorMessage, e)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ object RollupRunner :
* */
@Suppress("ReturnCount", "NestedBlockDepth", "ComplexMethod", "LongMethod", "ThrowsCount")
private suspend fun runRollupJob(job: Rollup, context: JobExecutionContext, lock: LockModel) {
logger.debug("Running rollup job [${job.id}]")
var updatableLock = lock
try {
when (val jobValidity = isJobValid(job)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import org.opensearch.indexmanagement.util._SEQ_NO
import org.opensearch.indexmanagement.waitFor
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.core.rest.RestStatus
import org.opensearch.test.OpenSearchTestCase
import java.time.Duration
import java.time.Instant

Expand Down Expand Up @@ -112,7 +111,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {

protected fun createRollup(
rollup: Rollup,
rollupId: String = OpenSearchTestCase.randomAlphaOfLength(10),
rollupId: String,
refresh: Boolean = true,
client: RestClient? = null
): Rollup {
Expand Down Expand Up @@ -150,7 +149,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {

protected fun createRandomRollup(refresh: Boolean = true): Rollup {
val rollup = randomRollup()
val rollupId = createRollup(rollup, refresh = refresh).id
val rollupId = createRollup(rollup, rollupId = rollup.id, refresh = refresh).id
return getRollup(rollupId = rollupId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.opensearch.test.junit.annotations.TestLogging
@TestLogging(value = "level:DEBUG", reason = "Debugging tests")
@Suppress("UNCHECKED_CAST")
class RestDeleteRollupActionIT : RollupRestTestCase() {

@Throws(Exception::class)
fun `test deleting a rollup`() {
val rollup = createRandomRollup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ import org.opensearch.indexmanagement.rollup.action.get.GetRollupsRequest.Compan
import org.opensearch.indexmanagement.rollup.randomRollup
import org.opensearch.core.rest.RestStatus
import org.opensearch.test.junit.annotations.TestLogging
import java.util.Locale

@TestLogging(value = "level:DEBUG", reason = "Debugging tests")
@Suppress("UNCHECKED_CAST")
class RestGetRollupActionIT : RollupRestTestCase() {

private val testName = javaClass.simpleName.lowercase(Locale.ROOT)

@Throws(Exception::class)
fun `test getting a rollup`() {
var rollup = createRollup(randomRollup())
var rollup = createRollup(randomRollup(), rollupId = "$testName-1")
val indexedRollup = getRollup(rollup.id)
// Schema version and last updated time are updated during the creation so we need to update the original too for comparison
// Job schedule interval will have a dynamic start time
Expand Down Expand Up @@ -48,7 +51,7 @@ class RestGetRollupActionIT : RollupRestTestCase() {

@Throws(Exception::class)
fun `test getting all rollups`() {
val rollups = randomList(1, 15) { createRollup(randomRollup()) }
val rollups = randomList(1, 15) { createRandomRollup() }

// Using a larger response size than the default in case leftover rollups prevent the ones created in this test from being returned
val res = client().makeRequest("GET", "$ROLLUP_JOBS_BASE_URI?size=100")
Expand Down Expand Up @@ -91,7 +94,7 @@ class RestGetRollupActionIT : RollupRestTestCase() {
fun `test changing response size when getting rollups`() {
// Ensure at least more rollup jobs than the default (20) exists
val rollupCount = 25
repeat(rollupCount) { createRollup(randomRollup()) }
repeat(rollupCount) { createRandomRollup() }

// The default response size is 20, so even though 25 rollup jobs were made, at most 20 will be returned
var res = client().makeRequest("GET", ROLLUP_JOBS_BASE_URI)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ import org.opensearch.indexmanagement.util._SEQ_NO
import org.opensearch.core.rest.RestStatus
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.junit.annotations.TestLogging
import java.util.Locale

@TestLogging(value = "level:DEBUG", reason = "Debugging tests")
@Suppress("UNCHECKED_CAST")
class RestIndexRollupActionIT : RollupRestTestCase() {

private val testName = javaClass.simpleName.lowercase(Locale.ROOT)

@Throws(Exception::class)
fun `test creating a rollup`() {
val rollup = randomRollup()
Expand Down Expand Up @@ -213,7 +216,7 @@ class RestIndexRollupActionIT : RollupRestTestCase() {
Dimension.Type.TERMS -> (it as Terms).copy(targetField = "some_other_target_field")
}
}
val rollup = createRollup(rollup = randomRollup().copy(dimensions = dimensions))
val rollup = createRollup(rollup = randomRollup().copy(dimensions = dimensions), rollupId = "$testName-1")
client().makeRequest(
"PUT",
"$ROLLUP_JOBS_BASE_URI/${rollup.id}?refresh=true&if_seq_no=${rollup.seqNo}&if_primary_term=${rollup.primaryTerm}",
Expand Down Expand Up @@ -254,7 +257,7 @@ class RestIndexRollupActionIT : RollupRestTestCase() {
}
)
}
val rollup = createRollup(rollup = randomRollup().copy(metrics = metrics))
val rollup = createRollup(rollup = randomRollup().copy(metrics = metrics), rollupId = "$testName-2")
client().makeRequest(
"PUT",
"$ROLLUP_JOBS_BASE_URI/${rollup.id}?refresh=true&if_seq_no=${rollup.seqNo}&if_primary_term=${rollup.primaryTerm}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.core.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale

class RestStartRollupActionIT : RollupRestTestCase() {

private val testName = javaClass.simpleName.lowercase(Locale.ROOT)

@Throws(Exception::class)
fun `test starting a stopped rollup`() {
val rollup = createRollup(randomRollup().copy(enabled = false, jobEnabledTime = null, metadataID = null))
val rollup = createRollup(randomRollup().copy(enabled = false, jobEnabledTime = null, metadataID = null), rollupId = "$testName-1")
assertTrue("Rollup was not disabled", !rollup.enabled)

val response = client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/${rollup.id}/_start")
Expand All @@ -44,7 +47,7 @@ class RestStartRollupActionIT : RollupRestTestCase() {
@Throws(Exception::class)
fun `test starting a started rollup doesnt change enabled time`() {
// First create a non-started rollup
val rollup = createRollup(randomRollup().copy(enabled = false, jobEnabledTime = null, metadataID = null))
val rollup = createRollup(randomRollup().copy(enabled = false, jobEnabledTime = null, metadataID = null), rollupId = "$testName-2")
assertTrue("Rollup was not disabled", !rollup.enabled)

// Enable it to get the job enabled time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.core.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale

class RestStopRollupActionIT : RollupRestTestCase() {

private val testName = javaClass.simpleName.lowercase(Locale.ROOT)

@After
fun clearIndicesAfterEachTest() {
// Flaky could happen if config index not deleted
Expand All @@ -39,7 +42,7 @@ class RestStopRollupActionIT : RollupRestTestCase() {

@Throws(Exception::class)
fun `test stopping a started rollup`() {
val rollup = createRollup(randomRollup().copy(enabled = true, jobEnabledTime = randomInstant(), metadataID = null))
val rollup = createRollup(randomRollup().copy(enabled = true, jobEnabledTime = randomInstant(), metadataID = null), rollupId = "$testName-1")
assertTrue("Rollup was not enabled", rollup.enabled)

val response = client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/${rollup.id}/_stop")
Expand All @@ -53,7 +56,7 @@ class RestStopRollupActionIT : RollupRestTestCase() {

@Throws(Exception::class)
fun `test stopping a stopped rollup`() {
val rollup = createRollup(randomRollup().copy(enabled = true, jobEnabledTime = randomInstant(), metadataID = null))
val rollup = createRollup(randomRollup().copy(enabled = true, jobEnabledTime = randomInstant(), metadataID = null), rollupId = "$testName-2")
assertTrue("Rollup was not enabled", rollup.enabled)

val response = client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/${rollup.id}/_stop")
Expand Down Expand Up @@ -84,7 +87,8 @@ class RestStopRollupActionIT : RollupRestTestCase() {
enabled = true,
jobEnabledTime = Instant.now(),
metadataID = null
)
),
rollupId = "$testName-3"
)
createRollupSourceIndex(rollup)
updateRollupStartTime(rollup)
Expand Down Expand Up @@ -157,7 +161,8 @@ class RestStopRollupActionIT : RollupRestTestCase() {
enabled = true,
jobEnabledTime = Instant.now(),
metadataID = null
)
),
rollupId = "$testName-4"
)

// Force rollup to execute which should fail as we did not create a source index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ import org.opensearch.core.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Collections.emptyMap
import java.util.Locale

class RollupRunnerIT : RollupRestTestCase() {

private val testName = javaClass.simpleName.lowercase(Locale.ROOT)

fun `test metadata is created for rollup job when none exists`() {
val indexName = "test_index_runner_first"

Expand Down Expand Up @@ -148,6 +151,7 @@ class RollupRunnerIT : RollupRestTestCase() {

// Define the rollup job
var rollup = randomRollup().copy(
id = "$testName-1",
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobEnabledTime = Instant.now(),
Expand Down Expand Up @@ -270,6 +274,7 @@ class RollupRunnerIT : RollupRestTestCase() {

// Define rollup
var rollup = randomRollup().copy(
id = "$testName-2",
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobEnabledTime = Instant.now(),
Expand Down Expand Up @@ -328,6 +333,7 @@ class RollupRunnerIT : RollupRestTestCase() {

// Define rollup
var rollup = randomRollup().copy(
id = "$testName-3",
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobEnabledTime = Instant.now(),
Expand Down Expand Up @@ -640,6 +646,7 @@ class RollupRunnerIT : RollupRestTestCase() {
val delay: Long = 15000
// Define rollup
var rollup = randomRollup().copy(
id = "$testName-4",
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobEnabledTime = Instant.now(),
Expand Down
Loading