Skip to content

Commit

Permalink
Interval schedule should take start time from the request, should not… (
Browse files Browse the repository at this point in the history
opensearch-project#1040) (opensearch-project#1047)

* Interval schedule should take start time from the request, should not set it to the current time of request execution.



* Changed the "delayed continuous execution test" to be more expressive about what it should test.



* fixed the NPE if schedule.startTime is NULL



* fixed the NPE if schedule.startTime is NULL



* fixed styling



* - removed null checks from RollUp and Transforms
- fixed comments in the "delayed execution" test



---------


(cherry picked from commit 417d0d9)

Signed-off-by: Oleg Kravchuk <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 36bb819 commit 55e689a
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ data class Rollup(
// TODO: Make startTime public in Job Scheduler so we can just directly check the value
if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
if (schedule is IntervalSchedule) {
schedule = IntervalSchedule(Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0)
schedule = IntervalSchedule(schedule.startTime, schedule.interval, schedule.unit, schedule.delay ?: 0)
}
}
return Rollup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ class RollupRunnerIT : RollupRestTestCase() {
// Tests that a continuous rollup will not be processed until the end of the interval plus delay passes
fun `test delaying continuous execution`() {
val indexName = "test_index_runner_eighth"
val delay: Long = 15000
val delay: Long = 7_500
// Define rollup
var rollup = randomRollup().copy(
id = "$testName-4",
Expand All @@ -663,40 +663,29 @@ class RollupRunnerIT : RollupRestTestCase() {
putDateDocumentInSourceIndex(rollup)

// Create rollup job
rollup = createRollup(rollup = rollup, rollupId = rollup.id)
val jobStartTime = Instant.now()
val rollupNow = rollup.copy(
jobSchedule = IntervalSchedule(jobStartTime, 1, ChronoUnit.MINUTES),
jobEnabledTime = jobStartTime
)
rollup = createRollup(rollup = rollupNow, rollupId = rollupNow.id)

var nextExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli()
val expectedExecutionTime = rollup.jobEnabledTime!!.plusMillis(delay).toEpochMilli()
val delayIsCorrect = ((expectedExecutionTime - nextExecutionTime) > -500) && ((expectedExecutionTime - nextExecutionTime) < 500)
assertTrue("Delay was not correctly applied", delayIsCorrect)
val expectedFirstExecutionTime = rollup.jobSchedule.getNextExecutionTime(null).toEpochMilli()
assertTrue("The first job execution time should be equal [job start time] + [delay].", expectedFirstExecutionTime == jobStartTime.toEpochMilli() + delay)

waitFor {
// Wait until half a second before the intended execution time
assertTrue(Instant.now().toEpochMilli() >= nextExecutionTime - 500)
// Still should not have run at this point
assertFalse("Target rollup index was created before the delay should allow", indexExists(rollup.targetIndex))
}
val rollupMetadata = waitFor {
waitFor() {
assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex))
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertNotNull("Rollup metadata not found", rollupMetadata)
rollupMetadata
}
nextExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli()
val nextExecutionOffset = (nextExecutionTime - Instant.now().toEpochMilli()) - 60000
val nextExecutionIsCorrect = nextExecutionOffset < 5000 && nextExecutionOffset > -5000
assertTrue("Next execution time not updated correctly", nextExecutionIsCorrect)
val nextWindowStartTime: Instant = rollupMetadata.continuous!!.nextWindowStartTime
val nextWindowEndTime: Instant = rollupMetadata.continuous!!.nextWindowEndTime
// Assert that after the window was updated, it falls approximately around 'now'
assertTrue("Rollup window start time is incorrect", nextWindowStartTime.plusMillis(delay).minusMillis(1000) < Instant.now())
assertTrue("Rollup window end time is incorrect", nextWindowEndTime.plusMillis(delay).plusMillis(1000) > Instant.now())

// window length should be 5 seconds
val expectedWindowEnd = nextWindowStartTime.plusMillis(5000)
assertEquals("Rollup window length applied incorrectly", expectedWindowEnd, nextWindowEndTime)

val now = Instant.now().toEpochMilli()
assertTrue("The first job execution must happen after [job start time] + [delay]", now > jobStartTime.toEpochMilli() + delay)

val secondExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli()
assertTrue("The second job execution time should be not earlier than a minute after the first execution.", secondExecutionTime - expectedFirstExecutionTime == 60_000L)
}

fun `test non continuous delay does nothing`() {
Expand Down

0 comments on commit 55e689a

Please sign in to comment.