diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/EnvelopeOrigin.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/EnvelopeOrigin.scala index 9fc29da8..6d18ee13 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/EnvelopeOrigin.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/EnvelopeOrigin.scala @@ -15,6 +15,7 @@ import akka.persistence.query.typed.EventEnvelope val SourceQuery = "" val SourceBacktracking = "BT" val SourcePubSub = "PS" + val SourceSnapshot = "SN" def fromQuery(env: EventEnvelope[_]): Boolean = env.source == SourceQuery @@ -28,6 +29,9 @@ import akka.persistence.query.typed.EventEnvelope def fromPubSub(env: EventEnvelope[_]): Boolean = env.source == SourcePubSub + def fromSnapshot(env: EventEnvelope[_]): Boolean = + env.source == SourceSnapshot + def isFilteredEvent(env: Any): Boolean = env match { case e: EventEnvelope[_] => e.filtered diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala index 08f85f2c..3f9840b7 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala @@ -66,7 +66,7 @@ import akka.stream.stage.OutHandler } override def onUpstreamFinish(): Unit = { - val primaryHandler = new PrimaryHandler + val primaryHandler = new PrimaryHandler(isAvailable(out)) self.setHandler(out, primaryHandler) subFusingMaterializer.materialize( @@ -80,9 +80,9 @@ import akka.stream.stage.OutHandler } } - class PrimaryHandler extends OutHandler with InHandler { + class PrimaryHandler(pullImmediately: Boolean) extends OutHandler with InHandler { val subSink = new SubSinkInlet[EventEnvelope[Event]]("snapshots") - subSink.pull() + if (pullImmediately) subSink.pull() subSink.setHandler(this) override def onPull(): Unit = { diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index 336746b3..73f703a9 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -162,7 +162,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat row.entityType, row.slice, filtered = false, - source = "", + source = EnvelopeOrigin.SourceSnapshot, tags = row.tags) } diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsByPersistenceIdStartingFromSnapshotSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsByPersistenceIdStartingFromSnapshotSpec.scala index 30751e3a..cced7b5b 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsByPersistenceIdStartingFromSnapshotSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsByPersistenceIdStartingFromSnapshotSpec.scala @@ -5,7 +5,6 @@ package akka.persistence.r2dbc.query import java.time.Instant - import akka.Done import akka.NotUsed import akka.actor.testkit.typed.scaladsl.LogCapturing @@ -20,6 +19,7 @@ import akka.persistence.r2dbc.TestActors.Persister.PersistWithAck import akka.persistence.r2dbc.TestConfig import akka.persistence.r2dbc.TestData import akka.persistence.r2dbc.TestDbLifecycle +import akka.persistence.r2dbc.internal.EnvelopeOrigin import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import akka.persistence.typed.PersistenceId import akka.stream.scaladsl.Source @@ -124,7 +124,9 @@ class EventsByPersistenceIdStartingFromSnapshotSpec .runWith(sinkProbe) .request(21) - result.expectNext().event shouldBe expectedSnapshotEvent(17) + val evt17 = result.expectNext() + evt17.event shouldBe expectedSnapshotEvent(17) + EnvelopeOrigin.fromSnapshot(evt17) shouldBe true for (i <- 18 to 20) { result.expectNext().event shouldBe s"e-$i" }