diff --git a/core/src/main/mima-filters/1.3.9.backwards.excludes/query-correlation-id.excludes b/core/src/main/mima-filters/1.3.9.backwards.excludes/query-correlation-id.excludes new file mode 100644 index 00000000..1971b682 --- /dev/null +++ b/core/src/main/mima-filters/1.3.9.backwards.excludes/query-correlation-id.excludes @@ -0,0 +1,5 @@ +# internals +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.rowsBySlices") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.countBuckets") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.rowsBySlices") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.countBuckets") \ No newline at end of file diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index 31840601..a72c3b6f 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -214,7 +214,8 @@ import org.slf4j.Logger fromSeqNr: Option[Long], // for events with same timestamp as `fromTimestamp` toTimestamp: Option[Instant], behindCurrentTime: FiniteDuration, - backtracking: Boolean): Source[SerializedRow, NotUsed] + backtracking: Boolean, + correlationId: Option[String]): Source[SerializedRow, NotUsed] /** * For Durable State we always refresh the bucket counts at the interval. For Event Sourced we know that they don't @@ -227,7 +228,8 @@ import org.slf4j.Logger minSlice: Int, maxSlice: Int, fromTimestamp: Instant, - limit: Int): Future[Seq[Bucket]] + limit: Int, + correlationId: Option[String]): Future[Seq[Bucket]] protected def appendEmptyBucketIfLastIsMissing( buckets: IndexedSeq[Bucket], @@ -268,6 +270,7 @@ import org.slf4j.Logger def currentBySlices( logPrefix: String, + correlationId: Option[String], entityType: String, minSlice: Int, maxSlice: Int, @@ -322,7 +325,8 @@ import org.slf4j.Logger fromSeqNr, toTimestamp = Some(toTimestamp), behindCurrentTime = Duration.Zero, - backtracking = false) + backtracking = false, + correlationId) .filter { row => filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source) } @@ -354,7 +358,7 @@ import org.slf4j.Logger updateState = nextOffset, delayNextQuery = _ => None, nextQuery = state => nextQuery(state, currentTime), - beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _)) + beforeQuery = beforeQuery(logPrefix, correlationId, entityType, minSlice, maxSlice, _)) } } .mapMaterializedValue(_ => NotUsed) @@ -362,6 +366,7 @@ import org.slf4j.Logger def liveBySlices( logPrefix: String, + correlationId: Option[String], entityType: String, minSlice: Int, maxSlice: Int, @@ -555,7 +560,8 @@ import org.slf4j.Logger fromSeqNr, toTimestamp, behindCurrentTime, - backtracking = newState.backtracking) + backtracking = newState.backtracking, + correlationId) .filter { row => filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source) } @@ -589,7 +595,7 @@ import org.slf4j.Logger updateState = nextOffset, delayNextQuery = delayNextQuery, nextQuery = nextQuery, - beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _), + beforeQuery = beforeQuery(logPrefix, correlationId, entityType, minSlice, maxSlice, _), heartbeat = nextHeartbeat) } } @@ -598,6 +604,7 @@ import org.slf4j.Logger private def 
beforeQuery( logPrefix: String, + correlationId: Option[String], entityType: String, minSlice: Int, maxSlice: Int, @@ -624,7 +631,7 @@ import org.slf4j.Logger } val futureState = - dao.countBuckets(entityType, minSlice, maxSlice, fromTimestamp, Buckets.Limit).map { counts => + dao.countBuckets(entityType, minSlice, maxSlice, fromTimestamp, Buckets.Limit, correlationId).map { counts => val newBuckets = state.buckets.add(counts) val newState = state.copy(buckets = newBuckets) if (log.isDebugEnabled) { diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/CorrelationId.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/CorrelationId.scala new file mode 100644 index 00000000..fa077480 --- /dev/null +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/CorrelationId.scala @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2022 - 2025 Lightbend Inc. + */ + +package akka.persistence.r2dbc.internal + +import akka.annotation.InternalApi + +/** + * INTERNAL API + */ +@InternalApi +object CorrelationId { + def toLogText(correlationId: Option[String]): String = correlationId match { + case Some(id) => s", correlation [$id]" + case None => "" + } +} diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/QueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/QueryDao.scala index 38a10783..71d08821 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/QueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/QueryDao.scala @@ -23,19 +23,32 @@ private[r2dbc] trait QueryDao extends BySliceQuery.Dao[SerializedJournalRow] { */ override def countBucketsMayChange: Boolean = false - def timestampOfEvent(persistenceId: String, seqNr: Long): Future[Option[Instant]] + def timestampOfEvent(persistenceId: String, seqNr: Long, correlationId: Option[String]): Future[Option[Instant]] - def latestEventTimestamp(entityType: String, minSlice: Int, maxSlice: Int): Future[Option[Instant]] + def latestEventTimestamp( + entityType: String, + minSlice: Int, + maxSlice: Int, + correlationId: Option[String]): Future[Option[Instant]] - def loadEvent(persistenceId: String, seqNr: Long, includePayload: Boolean): Future[Option[SerializedJournalRow]] + def loadEvent( + persistenceId: String, + seqNr: Long, + includePayload: Boolean, + correlationId: Option[String]): Future[Option[SerializedJournalRow]] - def loadLastEvent(persistenceId: String, toSeqNr: Long, includeDeleted: Boolean): Future[Option[SerializedJournalRow]] + def loadLastEvent( + persistenceId: String, + toSeqNr: Long, + includeDeleted: Boolean, + correlationId: Option[String]): Future[Option[SerializedJournalRow]] def eventsByPersistenceId( persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, - includeDeleted: Boolean): Source[SerializedJournalRow, NotUsed] + includeDeleted: Boolean, + correlationId: Option[String]): Source[SerializedJournalRow, NotUsed] def persistenceIds(entityType: String, afterId: Option[String], limit: Long): Source[String, NotUsed] diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index a8defef0..fb0607c6 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -7,21 +7,18 @@ package akka.persistence.r2dbc.internal.postgres import java.lang import java.time.Instant import java.util - import
scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal - import io.r2dbc.spi.Connection import io.r2dbc.spi.R2dbcDataIntegrityViolationException import io.r2dbc.spi.Row import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory - import akka.Done import akka.NotUsed import akka.actor.typed.ActorSystem @@ -36,6 +33,7 @@ import akka.persistence.r2dbc.internal.AdditionalColumnFactory import akka.persistence.r2dbc.internal.BySliceQuery.Buckets import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket import akka.persistence.r2dbc.internal.ChangeHandlerFactory +import akka.persistence.r2dbc.internal.CorrelationId import akka.persistence.r2dbc.internal.Dialect import akka.persistence.r2dbc.internal.DurableStateDao import akka.persistence.r2dbc.internal.InstantFactory @@ -654,11 +652,12 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv fromSeqNr: Option[Long], toTimestamp: Option[Instant], behindCurrentTime: FiniteDuration, - backtracking: Boolean): Source[SerializedStateRow, NotUsed] = { + backtracking: Boolean, + correlationId: Option[String]): Source[SerializedStateRow, NotUsed] = { if (!settings.isSliceRangeWithinSameDataPartition(minSlice, maxSlice)) throw new IllegalArgumentException( s"Slice range [$minSlice-$maxSlice] spans over more than one " + - s"of the [${settings.numberOfDataPartitions}] data partitions.") + s"of the [${settings.numberOfDataPartitions}] data partitions" + CorrelationId.toLogText(correlationId)) val executor = executorProvider.executorFor(minSlice) val result = executor.select(s"select stateBySlices [$minSlice - $maxSlice]")( @@ -705,8 +704,11 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv tags = Set.empty // tags not fetched in queries (yet) )) - if (log.isDebugEnabled) - result.foreach(rows => log.debug("Read [{}] durable states from slices [{} - {}]", rows.size, minSlice, maxSlice)) + if (log.isDebugEnabled) { + val correlationText = CorrelationId.toLogText(correlationId) + result.foreach(rows => + log.debug("Read [{}] durable states from slices [{} - {}]{}", rows.size, minSlice, maxSlice, correlationText)) + } Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) } @@ -861,7 +863,8 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv minSlice: Int, maxSlice: Int, fromTimestamp: Instant, - limit: Int): Future[Seq[Bucket]] = { + limit: Int, + correlationId: Option[String]): Future[Seq[Bucket]] = { val now = InstantFactory.now() // not important to use database time val toTimestamp = { @@ -886,8 +889,11 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv Bucket(bucketStartEpochSeconds, count) }) - if (log.isDebugEnabled) - result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) + if (log.isDebugEnabled) { + val correlationText = CorrelationId.toLogText(correlationId) + result.foreach(rows => + log.debug("Read [{}] bucket counts from slices [{} - {}]{}", rows.size, minSlice, maxSlice, correlationText)) + } if (toTimestamp == now) result diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala index a35bb408..5ac1b30a 100644 --- 
a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala @@ -5,18 +5,15 @@ package akka.persistence.r2dbc.internal.postgres import java.time.Instant - import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration - import io.r2dbc.spi.Row import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory - import akka.NotUsed import akka.actor.typed.ActorSystem import akka.annotation.InternalApi @@ -25,6 +22,7 @@ import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.BySliceQuery.Buckets import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.BucketDurationSeconds +import akka.persistence.r2dbc.internal.CorrelationId import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow import akka.persistence.r2dbc.internal.QueryDao @@ -287,7 +285,8 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e fromSeqNr: Option[Long], toTimestamp: Option[Instant], behindCurrentTime: FiniteDuration, - backtracking: Boolean): Source[SerializedJournalRow, NotUsed] = { + backtracking: Boolean, + correlationId: Option[String]): Source[SerializedJournalRow, NotUsed] = { if (!settings.isSliceRangeWithinSameDataPartition(minSlice, maxSlice)) throw new IllegalArgumentException( @@ -337,8 +336,15 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e tags = row.getTags("tags"), metadata = readMetadata(row))) - if (log.isDebugEnabled) - result.foreach(rows => log.debug("Read [{}] events from slices [{} - {}]", rows.size, minSlice, maxSlice)) + if (log.isDebugEnabled) { + result.foreach(rows => + log.debug( + "Read [{}] events from slices [{} - {}]{}", + rows.size, + minSlice, + maxSlice, + CorrelationId.toLogText(correlationId))) + } Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) } @@ -361,7 +367,8 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e minSlice: Int, maxSlice: Int, fromTimestamp: Instant, - limit: Int): Future[Seq[Bucket]] = { + limit: Int, + correlationId: Option[String]): Future[Seq[Bucket]] = { val executor = executorProvider.executorFor(minSlice) val now = InstantFactory.now() // not important to use database time @@ -387,7 +394,13 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e }) if (log.isDebugEnabled) - result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) + result.foreach(rows => + log.debug( + "Read [{}] bucket counts from slices [{} - {}]{}", + rows.size, + minSlice, + maxSlice, + CorrelationId.toLogText(correlationId))) if (toTimestamp == now) result @@ -400,7 +413,10 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e */ override def countBucketsMayChange: Boolean = false - override def timestampOfEvent(persistenceId: String, seqNr: Long): Future[Option[Instant]] = { + override def timestampOfEvent( + persistenceId: String, + seqNr: Long, + correlationId: Option[String]): Future[Option[Instant]] = { val slice = persistenceExt.sliceForPersistenceId(persistenceId) val executor = executorProvider.executorFor(slice) 
executor.selectOne("select timestampOfEvent")( @@ -412,7 +428,11 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e row => row.getTimestamp("db_timestamp")) } - override def latestEventTimestamp(entityType: String, minSlice: Int, maxSlice: Int): Future[Option[Instant]] = { + override def latestEventTimestamp( + entityType: String, + minSlice: Int, + maxSlice: Int, + correlationId: Option[String]): Future[Option[Instant]] = { val executor = executorProvider.executorFor(minSlice) val result = executor .selectOne(s"select latest event timestamp for entity type [$entityType] slice range [$minSlice - $maxSlice]")( @@ -428,11 +448,12 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e if (log.isDebugEnabled) result.foreach(timestamp => log.debug( - "Latest event timestamp for entity type [{}] slice range [{} - {}]: [{}]", + "Latest event timestamp for entity type [{}] slice range [{} - {}]: [{}]{}", entityType, minSlice, maxSlice, - timestamp)) + timestamp, + CorrelationId.toLogText(correlationId))) result } @@ -440,7 +461,8 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e override def loadEvent( persistenceId: String, seqNr: Long, - includePayload: Boolean): Future[Option[SerializedJournalRow]] = { + includePayload: Boolean, + correlationId: Option[String]): Future[Option[SerializedJournalRow]] = { val slice = persistenceExt.sliceForPersistenceId(persistenceId) val executor = executorProvider.executorFor(slice) executor.selectOne("select one event")( @@ -475,7 +497,8 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e override def loadLastEvent( persistenceId: String, toSeqNr: Long, - includeDeleted: Boolean): Future[Option[SerializedJournalRow]] = { + includeDeleted: Boolean, + correlationId: Option[String]): Future[Option[SerializedJournalRow]] = { val slice = persistenceExt.sliceForPersistenceId(persistenceId) val executor = executorProvider.executorFor(slice) val selectSql = if (includeDeleted) selectLastEventIncludeDeletedSql(slice) else selectLastEventSql(slice) @@ -512,7 +535,8 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, - includeDeleted: Boolean): Source[SerializedJournalRow, NotUsed] = { + includeDeleted: Boolean, + correlationId: Option[String]): Source[SerializedJournalRow, NotUsed] = { val slice = persistenceExt.sliceForPersistenceId(persistenceId) val executor = executorProvider.executorFor(slice) val result = executor.select(s"select eventsByPersistenceId [$persistenceId]")( @@ -542,7 +566,12 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e }) if (log.isDebugEnabled) - result.foreach(rows => log.debug("Read [{}] events for persistenceId [{}]", rows.size, persistenceId)) + result.foreach(rows => + log.debug( + "Read [{}] events for persistenceId [{}]{}", + rows.size, + persistenceId, + CorrelationId.toLogText(correlationId))) Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala index 83afe8ae..16845789 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala @@ -5,16 
+5,13 @@ package akka.persistence.r2dbc.internal.postgres import java.time.Instant - import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration - import io.r2dbc.spi.Row import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory - import akka.NotUsed import akka.actor.typed.ActorSystem import akka.annotation.InternalApi @@ -23,6 +20,7 @@ import akka.persistence.SnapshotSelectionCriteria import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.BySliceQuery.Buckets import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket +import akka.persistence.r2dbc.internal.CorrelationId import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement @@ -389,11 +387,12 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider fromSeqNr: Option[Long], toTimestamp: Option[Instant], behindCurrentTime: FiniteDuration, - backtracking: Boolean): Source[SerializedSnapshotRow, NotUsed] = { + backtracking: Boolean, + correlationId: Option[String]): Source[SerializedSnapshotRow, NotUsed] = { if (!settings.isSliceRangeWithinSameDataPartition(minSlice, maxSlice)) throw new IllegalArgumentException( s"Slice range [$minSlice-$maxSlice] spans over more than one " + - s"of the [${settings.numberOfDataPartitions}] data partitions.") + s"of the [${settings.numberOfDataPartitions}] data partitions" + CorrelationId.toLogText(correlationId)) val executor = executorProvider.executorFor(minSlice) val result = executor.select(s"select snapshotsBySlices [$minSlice - $maxSlice]")( @@ -403,8 +402,11 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider }, collectSerializedSnapshot(entityType, _)) - if (log.isDebugEnabled) - result.foreach(rows => log.debug("Read [{}] snapshots from slices [{} - {}]", rows.size, minSlice, maxSlice)) + if (log.isDebugEnabled) { + val correlationText = CorrelationId.toLogText(correlationId) + result.foreach(rows => + log.debug("Read [{}] snapshots from slices [{} - {}]{}", rows.size, minSlice, maxSlice, correlationText)) + } Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) } @@ -436,7 +438,8 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider minSlice: Int, maxSlice: Int, fromTimestamp: Instant, - limit: Int): Future[Seq[Bucket]] = { + limit: Int, + correlationId: Option[String]): Future[Seq[Bucket]] = { val now = InstantFactory.now() // not important to use database time val toTimestamp = { @@ -462,8 +465,11 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider Bucket(bucketStartEpochSeconds, count) }) - if (log.isDebugEnabled) - result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) + if (log.isDebugEnabled) { + val correlationText = CorrelationId.toLogText(correlationId) + result.foreach(rows => + log.debug("Read [{}] bucket counts from slices [{} - {}]{}", rows.size, minSlice, maxSlice, correlationText)) + } if (toTimestamp == now) result diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index 509ee714..b53c250a 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ 
b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -26,6 +26,7 @@ import akka.persistence.Persistence import akka.persistence.SerializedEvent import akka.persistence.SnapshotSelectionCriteria import akka.persistence.query.Offset +import akka.persistence.query.QueryCorrelationId import akka.persistence.query.TimestampOffset import akka.persistence.query.TimestampOffset.toTimestampOffset import akka.persistence.query.scaladsl._ @@ -46,6 +47,7 @@ import akka.persistence.query.{ EventEnvelope => ClassicEventEnvelope } import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.BySliceQuery import akka.persistence.r2dbc.internal.ContinuousQuery +import akka.persistence.r2dbc.internal.CorrelationId import akka.persistence.r2dbc.internal.EnvelopeOrigin import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow import akka.persistence.r2dbc.internal.PubSub @@ -283,9 +285,12 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat minSlice: Int, maxSlice: Int, offset: Offset): Source[EventEnvelope[Event], NotUsed] = { + val correlationId = QueryCorrelationId.get() + val correlationIdText = CorrelationId.toLogText(correlationId) bySlice(entityType, minSlice) .currentBySlices( - s"[$entityType] currentEventsBySlices [$minSlice-$maxSlice]: ", + s"[$entityType] currentEventsBySlices [$minSlice-$maxSlice]$correlationIdText: ", + correlationId, entityType, minSlice, maxSlice, @@ -330,16 +335,19 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat minSlice: Int, maxSlice: Int, offset: Offset): Source[EventEnvelope[Event], NotUsed] = { + val correlationId = QueryCorrelationId.get() + val correlationIdText = CorrelationId.toLogText(correlationId) val dbSource = bySlice[Event](entityType, minSlice).liveBySlices( - s"[$entityType] eventsBySlices [$minSlice-$maxSlice]: ", + s"[$entityType] eventsBySlices [$minSlice-$maxSlice]$correlationIdText: ", + correlationId, entityType, minSlice, maxSlice, offset) if (settings.journalPublishEvents) { val pubSubSource = eventsBySlicesPubSubSource[Event](entityType, minSlice, maxSlice) - mergeDbAndPubSubSources(dbSource, pubSubSource) + mergeDbAndPubSubSources(dbSource, pubSubSource, correlationId) } else dbSource } @@ -366,11 +374,13 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat transformSnapshot: Snapshot => Event): Source[EventEnvelope[Event], NotUsed] = { checkStartFromSnapshotEnabled("currentEventsBySlicesStartingFromSnapshots") val timestampOffset = toTimestampOffset(offset) - + val correlationId = QueryCorrelationId.get() + val correlationIdText = CorrelationId.toLogText(correlationId) val snapshotSource = snapshotsBySlice[Snapshot, Event](entityType, minSlice, transformSnapshot) .currentBySlices( - s"[$entityType] currentSnapshotsBySlices [$minSlice-$maxSlice]: ", + s"[$entityType] currentSnapshotsBySlices [$minSlice-$maxSlice]$correlationIdText: ", + correlationId, entityType, minSlice, maxSlice, @@ -397,7 +407,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat snapshotOffsets.size) bySlice(entityType, minSlice).currentBySlices( - s"[$entityType] currentEventsBySlices [$minSlice-$maxSlice]: ", + s"[$entityType] currentEventsBySlices [$minSlice-$maxSlice]$correlationIdText: ", + correlationId, entityType, minSlice, maxSlice, @@ -428,11 +439,13 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat transformSnapshot: Snapshot => Event): 
Source[EventEnvelope[Event], NotUsed] = { checkStartFromSnapshotEnabled("eventsBySlicesStartingFromSnapshots") val timestampOffset = toTimestampOffset(offset) - + val correlationId = QueryCorrelationId.get() + val correlationIdText = CorrelationId.toLogText(correlationId) val snapshotSource = snapshotsBySlice[Snapshot, Event](entityType, minSlice, transformSnapshot) .currentBySlices( - s"[$entityType] snapshotsBySlices [$minSlice-$maxSlice]: ", + s"[$entityType] snapshotsBySlices [$minSlice-$maxSlice]$correlationIdText: ", + correlationId, entityType, minSlice, maxSlice, @@ -454,13 +467,14 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat } log.debug( - "eventsBySlicesStartingFromSnapshots initOffset [{}] with [{}] snapshots", + s"eventsBySlicesStartingFromSnapshots $correlationIdText initOffset [{}] with [{}] snapshots", initOffset, snapshotOffsets.size) val dbSource = bySlice[Event](entityType, minSlice).liveBySlices( - s"[$entityType] eventsBySlices [$minSlice-$maxSlice]: ", + s"[$entityType] eventsBySlices [$minSlice-$maxSlice]$correlationIdText: ", + correlationId, entityType, minSlice, maxSlice, @@ -474,7 +488,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat // know when memory of that Map can be released or the filter function would have to be shared // and thread safe, which is not worth it. val pubSubSource = eventsBySlicesPubSubSource[Event](entityType, minSlice, maxSlice) - mergeDbAndPubSubSources(dbSource, pubSubSource) + mergeDbAndPubSubSources(dbSource, pubSubSource, correlationId) } else dbSource })) @@ -550,13 +564,15 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat private def mergeDbAndPubSubSources[Event, Snapshot]( dbSource: Source[EventEnvelope[Event], NotUsed], - pubSubSource: Source[EventEnvelope[Event], NotUsed]) = { + pubSubSource: Source[EventEnvelope[Event], NotUsed], + correlationId: Option[String]) = { dbSource .mergePrioritized(pubSubSource, leftPriority = 1, rightPriority = 10) .via( skipPubSubTooFarAhead( settings.querySettings.backtrackingEnabled, - JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis))) + JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis), + correlationId)) .via(deduplicate(settings.querySettings.deduplicateCapacity)) } @@ -605,7 +621,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat */ @InternalApi private[akka] def skipPubSubTooFarAhead[Event]( enabled: Boolean, - maxAheadOfBacktracking: JDuration): Flow[EventEnvelope[Event], EventEnvelope[Event], NotUsed] = { + maxAheadOfBacktracking: JDuration, + correlationId: Option[String]): Flow[EventEnvelope[Event], EventEnvelope[Event], NotUsed] = { if (!enabled) Flow[EventEnvelope[Event]] else @@ -624,29 +641,32 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat Nil // always drop heartbeats } else if (EnvelopeOrigin.fromPubSub(env) && latestBacktracking == Instant.EPOCH) { log.trace( - "Dropping pubsub event for persistenceId [{}] seqNr [{}] because no event from backtracking yet.", + "Dropping pubsub event for persistenceId [{}] seqNr [{}] because no event from backtracking yet{}", env.persistenceId, - env.sequenceNr) + env.sequenceNr, + CorrelationId.toLogText(correlationId)) Nil } else if (EnvelopeOrigin.fromPubSub(env) && JDuration .between(latestBacktracking, t.timestamp) .compareTo(maxAheadOfBacktracking) > 0) { // drop from pubsub when too far ahead from backtracking log.debug( - 
"Dropping pubsub event for persistenceId [{}] seqNr [{}] because too far ahead of backtracking.", + "Dropping pubsub event for persistenceId [{}] seqNr [{}] because too far ahead of backtracking{}", env.persistenceId, - env.sequenceNr) + env.sequenceNr, + CorrelationId.toLogText(correlationId)) Nil } else { if (log.isDebugEnabled()) { if (latestBacktracking.isAfter(t.timestamp)) log.debug( "Event from query for persistenceId [{}] seqNr [{}] timestamp [{}]" + - " was before latest timestamp from backtracking or heartbeat [{}].", + " was before latest timestamp from backtracking or heartbeat [{}]{}", env.persistenceId, env.sequenceNr, t.timestamp, - latestBacktracking) + latestBacktracking, + CorrelationId.toLogText(correlationId)) } env :: Nil } @@ -660,17 +680,21 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat override def currentEventsByPersistenceId( persistenceId: String, fromSequenceNr: Long, - toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] = - internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr) + toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] = { + val correlationId = QueryCorrelationId.get() + internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr, correlationId = correlationId) .map(deserializeRow) + } @ApiMayChange override def currentEventsByPersistenceIdTyped[Event]( persistenceId: String, fromSequenceNr: Long, - toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed] = - internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr) + toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed] = { + val correlationId = QueryCorrelationId.get() + internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr, correlationId = correlationId) .map(deserializeBySliceRow[Event]) + } /** * INTERNAL API: Used by both journal replay and currentEventsByPersistenceId @@ -680,7 +704,9 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat fromSequenceNr: Long, toSequenceNr: Long, readHighestSequenceNr: Boolean = true, - includeDeleted: Boolean = false): Source[SerializedJournalRow, NotUsed] = { + includeDeleted: Boolean = false, + correlationId: Option[String] = None): Source[SerializedJournalRow, NotUsed] = { + lazy val correlationLogText = CorrelationId.toLogText(QueryCorrelationId.get()) def updateState(state: ByPersistenceIdState, row: SerializedJournalRow): ByPersistenceIdState = state.copy(rowCount = state.rowCount + 1, latestSeqNr = row.seqNr) @@ -693,22 +719,24 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat if (state.queryCount != 0 && log.isDebugEnabled()) log.debug( - "currentEventsByPersistenceId query [{}] for persistenceId [{}], from [{}] to [{}]. Found [{}] rows in previous query.", + "currentEventsByPersistenceId query [{}] for persistenceId [{}], from [{}] to [{}]. Found [{}] rows in previous query{}", state.queryCount, persistenceId, state.latestSeqNr + 1, highestSeqNr, - state.rowCount) + state.rowCount, + correlationLogText) newState -> Some( queryDao - .eventsByPersistenceId(persistenceId, state.latestSeqNr + 1, highestSeqNr, includeDeleted)) + .eventsByPersistenceId(persistenceId, state.latestSeqNr + 1, highestSeqNr, includeDeleted, correlationId)) } else { log.debug( - "currentEventsByPersistenceId query [{}] for persistenceId [{}] completed. 
Found [{}] rows in previous query.", + "currentEventsByPersistenceId query [{}] for persistenceId [{}] completed. Found [{}] rows in previous query{}", state.queryCount, persistenceId, - state.rowCount) + state.rowCount, + correlationLogText) state -> None } @@ -716,10 +744,11 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat if (log.isDebugEnabled()) log.debug( - "currentEventsByPersistenceId query for persistenceId [{}], from [{}] to [{}].", + "currentEventsByPersistenceId query for persistenceId [{}], from [{}] to [{}].{}", persistenceId, fromSequenceNr, - toSequenceNr) + toSequenceNr, + correlationLogText) val highestSeqNrFut = if (readHighestSequenceNr && toSequenceNr == Long.MaxValue) @@ -746,15 +775,17 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat persistenceId: String, toSequenceNr: Long, includeDeleted: Boolean): Future[Option[SerializedJournalRow]] = { - queryDao.loadLastEvent(persistenceId, toSequenceNr, includeDeleted) + queryDao.loadLastEvent(persistenceId, toSequenceNr, includeDeleted, None) } // EventTimestampQuery override def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = { - val result = queryDao.timestampOfEvent(persistenceId, sequenceNr) + val correlationId = QueryCorrelationId.get() + val result = queryDao.timestampOfEvent(persistenceId, sequenceNr, correlationId) if (log.isDebugEnabled) { + lazy val correlationLogText = CorrelationId.toLogText(correlationId) result.foreach { t => - log.debug("[{}] timestampOf seqNr [{}] is [{}]", persistenceId, sequenceNr, t) + log.debug("[{}] timestampOf seqNr [{}] is [{}]{}", persistenceId, sequenceNr, t, correlationLogText) } } result @@ -762,6 +793,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat // LatestEventTimestampQuery override def latestEventTimestamp(entityType: String, minSlice: Int, maxSlice: Int): Future[Option[Instant]] = { + val correlationId = QueryCorrelationId.get() + lazy val correlationLogText = CorrelationId.toLogText(correlationId) settings.querySettings.cacheLatestEventTimestamp match { case Some(cacheTtl) if cacheTtl > Duration.Zero => val cacheKey = EntitySliceRange(entityType, minSlice, maxSlice) @@ -770,37 +803,40 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat val now = clock.instant() if (expiry.exists(now.isBefore)) { // cache hit and not expired log.debug( - "[{}] latestEventTimestamp for slices [{} - {}] is [{}] (was cached at [{}])", + "[{}] latestEventTimestamp for slices [{} - {}] is [{}] (was cached at [{}]){}", entityType, minSlice, maxSlice, cachedValue.timestamp, - cachedValue.cachedAt) + cachedValue.cachedAt, + correlationLogText) Future.successful(cachedValue.timestamp) } else { // cache miss or expired, fetch and cache - val result = queryDao.latestEventTimestamp(entityType, minSlice, maxSlice) + val result = queryDao.latestEventTimestamp(entityType, minSlice, maxSlice, correlationId) result.foreach { timestamp => log.debug( - "[{}] latestEventTimestamp for slices [{} - {}] is [{}] (caching with TTL [{}])", + "[{}] latestEventTimestamp for slices [{} - {}] is [{}] (caching with TTL [{}]){}", entityType, minSlice, maxSlice, timestamp, - cacheTtl.toCoarsest) + cacheTtl.toCoarsest, + correlationLogText) latestEventTimestampCache.put(cacheKey, CachedTimestamp(timestamp, now)) } result } case _ => // caching disabled - val
result = queryDao.latestEventTimestamp(entityType, minSlice, maxSlice) + val result = queryDao.latestEventTimestamp(entityType, minSlice, maxSlice, correlationId) if (log.isDebugEnabled) { result.foreach { timestamp => log.debug( - "[{}] latestEventTimestamp for slices [{} - {}] is [{}]", + "[{}] latestEventTimestamp for slices [{} - {}] is [{}]{}", entityType, minSlice, maxSlice, - timestamp) + timestamp, + correlationLogText) } } result @@ -812,9 +848,13 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat //LoadEventQuery override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): Future[EventEnvelope[Event]] = { - log.debug("[{}] loadEnvelope seqNr [{}]", persistenceId, sequenceNr) + val correlationId = QueryCorrelationId.get() + if (log.isDebugEnabled()) { + val correlationLogText = CorrelationId.toLogText(correlationId) + log.debug("[{}] loadEnvelope seqNr [{}]{}", persistenceId, sequenceNr, correlationLogText) + } queryDao - .loadEvent(persistenceId, sequenceNr, includePayload = true) + .loadEvent(persistenceId, sequenceNr, includePayload = true, correlationId) .map { case Some(row) => deserializeBySliceRow(row) case None => @@ -841,9 +881,15 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat private def internalEventsByPersistenceId( persistenceId: String, fromSequenceNr: Long, - toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = { - - log.debug("Starting eventsByPersistenceId query for persistenceId [{}], from [{}].", persistenceId, fromSequenceNr) + toSequenceNr: Long, + correlationId: Option[String] = None): Source[SerializedJournalRow, NotUsed] = { + lazy val correlationLogText = CorrelationId.toLogText(QueryCorrelationId.get()) + if (log.isDebugEnabled) + log.debug( + "Starting eventsByPersistenceId query for persistenceId [{}], from [{}]{}", + persistenceId, + fromSequenceNr, + correlationLogText) def nextOffset(state: ByPersistenceIdState, row: SerializedJournalRow): ByPersistenceIdState = state.copy(rowCount = state.rowCount + 1, latestSeqNr = row.seqNr) @@ -854,12 +900,15 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat settings.querySettings.bufferSize, settings.querySettings.refreshInterval) - delay.foreach { d => - log.debug( - "eventsByPersistenceId query [{}] for persistenceId [{}] delay next [{}] ms.", - state.queryCount, - persistenceId, - d.toMillis) + if (log.isDebugEnabled) { + delay.foreach { d => + log.debug( + "eventsByPersistenceId query [{}] for persistenceId [{}] delay next [{}] ms{}", + state.queryCount, + persistenceId, + d.toMillis, + correlationLogText) + } } delay @@ -868,26 +917,35 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat def nextQuery( state: ByPersistenceIdState): (ByPersistenceIdState, Option[Source[SerializedJournalRow, NotUsed]]) = { if (state.latestSeqNr >= toSequenceNr) { - log.debug( - "eventsByPersistenceId query [{}] for persistenceId [{}] completed. Found [{}] rows in previous query.", - state.queryCount, - persistenceId, - state.rowCount) + if (log.isDebugEnabled) { + log.debug( + "eventsByPersistenceId query [{}] for persistenceId [{}] completed. Found [{}] rows in previous query{}", + state.queryCount, + persistenceId, + state.rowCount, + correlationLogText) + } state -> None } else { val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1) - - log.debug( - "eventsByPersistenceId query [{}] for persistenceId [{}], from [{}]. 
Found [{}] rows in previous query.", - newState.queryCount, - persistenceId, - state.latestSeqNr + 1, - state.rowCount) - + if (log.isDebugEnabled) { + log.debug( + "eventsByPersistenceId query [{}] for persistenceId [{}], from [{}]. Found [{}] rows in previous query{}", + newState.queryCount, + persistenceId, + state.latestSeqNr + 1, + state.rowCount, + correlationLogText) + } newState -> Some( queryDao - .eventsByPersistenceId(persistenceId, state.latestSeqNr + 1, toSequenceNr, includeDeleted = false)) + .eventsByPersistenceId( + persistenceId, + state.latestSeqNr + 1, + toSequenceNr, + includeDeleted = false, + correlationId = correlationId)) } } @@ -904,6 +962,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat toSequenceNr: Long, transformSnapshot: Snapshot => Event): Source[EventEnvelope[Event], NotUsed] = { checkStartFromSnapshotEnabled("currentEventsByPersistenceIdStartingFromSnapshot") + val correlationId = QueryCorrelationId.get() Source .futureSource(snapshotDao.load(persistenceId, SnapshotSelectionCriteria.Latest).map { case Some(snapshotRow) => @@ -912,13 +971,26 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat val snapshotEnv = createEnvelopeFromSnapshot(snapshotRow, offset, transformSnapshot) Source .single(snapshotEnv) - .concat(internalCurrentEventsByPersistenceId(persistenceId, snapshotEnv.sequenceNr + 1, toSequenceNr) - .map(deserializeBySliceRow[Event])) + .concat( + internalCurrentEventsByPersistenceId( + persistenceId, + snapshotEnv.sequenceNr + 1, + toSequenceNr, + correlationId = correlationId) + .map(deserializeBySliceRow[Event])) } else - internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr) + internalCurrentEventsByPersistenceId( + persistenceId, + fromSequenceNr, + toSequenceNr, + correlationId = correlationId) .map(deserializeBySliceRow[Event]) case None => - internalCurrentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr) + internalCurrentEventsByPersistenceId( + persistenceId, + fromSequenceNr, + toSequenceNr, + correlationId = correlationId) .map(deserializeBySliceRow[Event]) }) @@ -931,6 +1003,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat toSequenceNr: Long, transformSnapshot: Snapshot => Event): Source[EventEnvelope[Event], NotUsed] = { checkStartFromSnapshotEnabled("eventsByPersistenceIdStartingFromSnapshot") + val correlationId = QueryCorrelationId.get() Source .futureSource(snapshotDao.load(persistenceId, SnapshotSelectionCriteria.Latest).map { case Some(snapshotRow) => @@ -939,13 +1012,18 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat val snapshotEnv = createEnvelopeFromSnapshot(snapshotRow, offset, transformSnapshot) Source .single(snapshotEnv) - .concat(internalEventsByPersistenceId(persistenceId, snapshotEnv.sequenceNr + 1, toSequenceNr) - .map(deserializeBySliceRow[Event])) + .concat( + internalEventsByPersistenceId( + persistenceId, + snapshotEnv.sequenceNr + 1, + toSequenceNr, + correlationId = correlationId) + .map(deserializeBySliceRow[Event])) } else - internalEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr) + internalEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr, correlationId = correlationId) .map(deserializeBySliceRow[Event]) case None => - internalEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr) + internalEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr, correlationId = 
correlationId) .map(deserializeBySliceRow[Event]) }) diff --git a/core/src/main/scala/akka/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala b/core/src/main/scala/akka/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala index 521c0398..e5905658 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala @@ -100,7 +100,7 @@ private[r2dbc] final class R2dbcSnapshotStore(cfg: Config, cfgPath: String) exte val correspondingEvent: Future[Option[JournalDao.SerializedJournalRow]] = if (settings.querySettings.startFromSnapshotEnabled) - queryDao.loadEvent(metadata.persistenceId, metadata.sequenceNr, includePayload = false) + queryDao.loadEvent(metadata.persistenceId, metadata.sequenceNr, includePayload = false, correlationId = None) else Future.successful(None) diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index 4ab7e42b..3dcbbc22 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -309,6 +309,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg offset: Offset): Source[DurableStateChange[A], NotUsed] = bySlice.currentBySlices( s"[$entityType] currentChangesBySlices [$minSlice-$maxSlice]: ", + None, entityType, minSlice, maxSlice, @@ -321,6 +322,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg offset: Offset): Source[DurableStateChange[A], NotUsed] = bySlice.liveBySlices( s"[$entityType] changesBySlices [$minSlice-$maxSlice]: ", + None, entityType, minSlice, maxSlice, diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/BucketCountSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/BucketCountSpec.scala index c473f8a6..ea4a4665 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/BucketCountSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/BucketCountSpec.scala @@ -50,7 +50,7 @@ class BucketCountSpec val buckets = dao - .countBuckets(entityType, 0, persistenceExt.numberOfSlices - 1, startTime, Buckets.Limit) + .countBuckets(entityType, 0, persistenceExt.numberOfSlices - 1, startTime, Buckets.Limit, None) .futureValue withClue(s"startTime $startTime ($bucketStartTime): ") { buckets.size shouldBe 10 @@ -82,7 +82,7 @@ class BucketCountSpec val buckets = dao - .countBuckets(entityType, 0, persistenceExt.numberOfSlices - 1, startTime, limit) + .countBuckets(entityType, 0, persistenceExt.numberOfSlices - 1, startTime, limit, None) .futureValue withClue(s"startTime $startTime ($bucketStartTime): ") { buckets.size shouldBe 11 @@ -109,7 +109,7 @@ class BucketCountSpec val buckets = dao - .countBuckets(entityType, 960, 975, startTime, Buckets.Limit) + .countBuckets(entityType, 960, 975, startTime, Buckets.Limit, None) .futureValue buckets.head.startTime shouldBe 1746024900L diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala index e8ea1afe..07260ae8 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala @@ -226,7 +226,8 @@ class EventsBySlicePubSubSpec .via( 
query.skipPubSubTooFarAhead( enabled = true, - maxAheadOfBacktracking = JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis))) + maxAheadOfBacktracking = JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis), + correlationId = None)) .toMat(TestSink[EventEnvelope[String]]())(Keep.both) .run() out.request(100) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b87e8ef7..197158bd 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,7 +9,7 @@ object Dependencies { val Scala3 = "3.3.7" val Scala2Versions = Seq(Scala213) val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 - val AkkaVersion = System.getProperty("override.akka.version", "2.10.11") + val AkkaVersion = System.getProperty("override.akka.version", "2.10.13") val AkkaVersionInDocs = VersionNumber(AkkaVersion).numbers match { case Seq(major, minor, _*) => s"$major.$minor" } val AkkaPersistenceJdbcVersion = "5.5.2" // only in migration tool tests val AkkaProjectionVersionInDocs = "current"
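Reviewer note, not part of the patch: a minimal sketch of what the new internal helper CorrelationId.toLogText contributes to the debug messages touched above. The object is INTERNAL API and the id value below is an arbitrary example; in this PR the id is obtained with QueryCorrelationId.get() and passed down to the DAO methods.

  import akka.persistence.r2dbc.internal.CorrelationId

  object CorrelationIdLogTextExample extends App {
    // With an id present, the helper returns a suffix that is appended to the existing message.
    println(CorrelationId.toLogText(Some("query-4711"))) // ", correlation [query-4711]"
    // Without an id it returns an empty string, so log output for callers that set no
    // correlation id is unchanged.
    println(CorrelationId.toLogText(None)) // ""
  }

Because the helper only ever appends a suffix, existing log formats and any tooling that parses them keep working when no correlation id is set.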