diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index fdc212a7..b8ece0e0 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -433,9 +433,12 @@ import org.slf4j.Logger def disableBacktrackingWhenFarBehindCurrentWallClockTime: Boolean = { val aheadOfInitial = - initialOffset == TimestampOffset.Zero || state.latestBacktracking.timestamp.isAfter(initialOffset.timestamp) + initialOffset == TimestampOffset.Zero || + state.latestBacktracking.timestamp.compareTo(initialOffset.timestamp) >= 0 + val previousTimestamp = if (state.previous == TimestampOffset.Zero) state.latest.timestamp else state.previous.timestamp + aheadOfInitial && previousTimestamp.isBefore(clock.instant().minus(firstBacktrackingQueryWindow)) } diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala index 0962f436..ad944179 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala @@ -300,7 +300,7 @@ class EventsBySliceBacktrackingSpec result1.cancel() } - "still make initial backtracking until ahead of start offset" in { + "still make initial backtracking until caught up to start offset, then skip backtracking" in { pendingIfMoreThanOneDataPartition() val entityType = nextEntityType() @@ -317,7 +317,11 @@ class EventsBySliceBacktrackingSpec writeEvent(slice1, pid1, 2, startTime.plusMillis(3), "e1-2") writeEvent(slice2, pid2, 2, startTime.plusMillis(4), "e2-2") - (3 to 10).foreach { n => + // will start query at next event + val startOffset = TimestampOffset(startTime.plusSeconds(23).plusMillis(1), Map.empty) + + // go past switch-to-backtracking trigger of 3 * buffer size (of 10) + (3 to 30).foreach { n => writeEvent(slice1, pid1, n, startTime.plusSeconds(20 + n).plusMillis(1), s"e1-$n") writeEvent(slice2, pid2, n, startTime.plusSeconds(20 + n).plusMillis(2), s"e2-$n") } @@ -339,15 +343,16 @@ class EventsBySliceBacktrackingSpec env.offset } - val result1 = startQuery(TimestampOffset(startTime.plusSeconds(20), Map.empty)) + val result1 = startQuery(startOffset) // from backtracking expect(result1.expectNext(), pid1, 1, None) expect(result1.expectNext(), pid2, 1, None) expect(result1.expectNext(), pid1, 2, None) expect(result1.expectNext(), pid2, 2, None) + expect(result1.expectNext(), pid1, 3, None) // start offset // from normal - (3 to 10).foreach { n => + (3 to 30).foreach { n => expect(result1.expectNext(), pid1, n, Some(s"e1-$n")) expect(result1.expectNext(), pid2, n, Some(s"e2-$n")) }