@@ -0,0 +1,5 @@
# internals
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.rowsBySlices")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.countBuckets")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.rowsBySlices")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.countBuckets")
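These exclusions come in matched pairs because the affected methods are abstract members of the internal BySliceQuery.Dao trait: removing the old signature is flagged as a DirectMissingMethodProblem for callers compiled against the previous release, while the newly added abstract signature is flagged as a ReversedMissingMethodProblem for existing implementors. A minimal, hypothetical sketch (simplified signatures, not the real trait) of the kind of change both filters cover:

import scala.concurrent.Future

// Hypothetical, simplified trait to illustrate the paired MiMa exclusions above.
trait Dao {
  // The previous abstract signature was removed -> DirectMissingMethodProblem for callers:
  // def countBuckets(entityType: String, limit: Int): Future[Long]

  // A new abstract signature was added -> ReversedMissingMethodProblem for old implementors:
  def countBuckets(entityType: String, limit: Int, correlationId: Option[String]): Future[Long]
}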
@@ -214,7 +214,8 @@ import org.slf4j.Logger
fromSeqNr: Option[Long], // for events with same timestamp as `fromTimestamp`
toTimestamp: Option[Instant],
behindCurrentTime: FiniteDuration,
backtracking: Boolean): Source[SerializedRow, NotUsed]
backtracking: Boolean,
correlationId: Option[String]): Source[SerializedRow, NotUsed]

/**
* For Durable State we always refresh the bucket counts at the interval. For Event Sourced we know that they don't
@@ -227,7 +228,8 @@ import org.slf4j.Logger
minSlice: Int,
maxSlice: Int,
fromTimestamp: Instant,
limit: Int): Future[Seq[Bucket]]
limit: Int,
correlationId: Option[String]): Future[Seq[Bucket]]

protected def appendEmptyBucketIfLastIsMissing(
buckets: IndexedSeq[Bucket],
@@ -268,6 +270,7 @@ import org.slf4j.Logger

def currentBySlices(
logPrefix: String,
correlationId: Option[String],
entityType: String,
minSlice: Int,
maxSlice: Int,
@@ -322,7 +325,8 @@ import org.slf4j.Logger
fromSeqNr,
toTimestamp = Some(toTimestamp),
behindCurrentTime = Duration.Zero,
backtracking = false)
backtracking = false,
correlationId)
.filter { row =>
filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source)
}
@@ -354,14 +358,15 @@ import org.slf4j.Logger
updateState = nextOffset,
delayNextQuery = _ => None,
nextQuery = state => nextQuery(state, currentTime),
beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _))
beforeQuery = beforeQuery(logPrefix, correlationId, entityType, minSlice, maxSlice, _))
}
}
.mapMaterializedValue(_ => NotUsed)
}

def liveBySlices(
logPrefix: String,
correlationId: Option[String],
entityType: String,
minSlice: Int,
maxSlice: Int,
@@ -555,7 +560,8 @@ import org.slf4j.Logger
fromSeqNr,
toTimestamp,
behindCurrentTime,
backtracking = newState.backtracking)
backtracking = newState.backtracking,
correlationId)
.filter { row =>
filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source)
}
@@ -589,7 +595,7 @@ import org.slf4j.Logger
updateState = nextOffset,
delayNextQuery = delayNextQuery,
nextQuery = nextQuery,
beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _),
beforeQuery = beforeQuery(logPrefix, correlationId, entityType, minSlice, maxSlice, _),
heartbeat = nextHeartbeat)
}
}
@@ -598,6 +604,7 @@ import org.slf4j.Logger

private def beforeQuery(
logPrefix: String,
correlationId: Option[String],
entityType: String,
minSlice: Int,
maxSlice: Int,
@@ -624,7 +631,7 @@ import org.slf4j.Logger
}

val futureState =
dao.countBuckets(entityType, minSlice, maxSlice, fromTimestamp, Buckets.Limit).map { counts =>
dao.countBuckets(entityType, minSlice, maxSlice, fromTimestamp, Buckets.Limit, correlationId).map { counts =>
val newBuckets = state.buckets.add(counts)
val newState = state.copy(buckets = newBuckets)
if (log.isDebugEnabled) {
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2022 - 2025 Lightbend Inc. <https://akka.io>
*/

package akka.persistence.r2dbc.internal

import akka.annotation.InternalApi

/**
* INTERNAL API
*/
@InternalApi
object CorrelationId {
def toLogText(correlationId: Option[String]): String = correlationId match {
case Some(id) => s", correlation [$id]"
case None => ""
}
}
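For reference, a quick usage sketch of the helper above (the ids shown are illustrative); the returned suffix is what the DAO log statements and error messages below append:

// Usage sketch for CorrelationId.toLogText; the values are examples only.
CorrelationId.toLogText(Some("abc-123")) // ", correlation [abc-123]"
CorrelationId.toLogText(None)            // ""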
23 changes: 18 additions & 5 deletions core/src/main/scala/akka/persistence/r2dbc/internal/QueryDao.scala
@@ -23,19 +23,32 @@ private[r2dbc] trait QueryDao extends BySliceQuery.Dao[SerializedJournalRow] {
*/
override def countBucketsMayChange: Boolean = false

def timestampOfEvent(persistenceId: String, seqNr: Long): Future[Option[Instant]]
def timestampOfEvent(persistenceId: String, seqNr: Long, correlationId: Option[String]): Future[Option[Instant]]

def latestEventTimestamp(entityType: String, minSlice: Int, maxSlice: Int): Future[Option[Instant]]
def latestEventTimestamp(
entityType: String,
minSlice: Int,
maxSlice: Int,
correlationId: Option[String]): Future[Option[Instant]]

def loadEvent(persistenceId: String, seqNr: Long, includePayload: Boolean): Future[Option[SerializedJournalRow]]
def loadEvent(
persistenceId: String,
seqNr: Long,
includePayload: Boolean,
correlationId: Option[String]): Future[Option[SerializedJournalRow]]

def loadLastEvent(persistenceId: String, toSeqNr: Long, includeDeleted: Boolean): Future[Option[SerializedJournalRow]]
def loadLastEvent(
persistenceId: String,
toSeqNr: Long,
includeDeleted: Boolean,
correlationId: Option[String]): Future[Option[SerializedJournalRow]]

def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long,
includeDeleted: Boolean): Source[SerializedJournalRow, NotUsed]
includeDeleted: Boolean,
correlationId: Option[String]): Source[SerializedJournalRow, NotUsed]

def persistenceIds(entityType: String, afterId: Option[String], limit: Long): Source[String, NotUsed]

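As a hypothetical caller-side sketch (the method and variable names below are illustrative, not part of this PR), the query layer is expected to pass the same optional correlation id into each DAO call so that related statements can be tied together in the logs:

import java.util.UUID
import akka.persistence.r2dbc.internal.QueryDao

// Hypothetical caller sketch; readAllEvents, queryDao and pid are illustrative names.
def readAllEvents(queryDao: QueryDao, pid: String) = {
  val queryId = Some(UUID.randomUUID().toString) // correlation id threaded into the DAO call
  queryDao.eventsByPersistenceId(
    persistenceId = pid,
    fromSequenceNr = 1L,
    toSequenceNr = Long.MaxValue,
    includeDeleted = false,
    correlationId = queryId)
}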
@@ -7,21 +7,18 @@ package akka.persistence.r2dbc.internal.postgres
import java.lang
import java.time.Instant
import java.util

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

import io.r2dbc.spi.Connection
import io.r2dbc.spi.R2dbcDataIntegrityViolationException
import io.r2dbc.spi.Row
import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import akka.Done
import akka.NotUsed
import akka.actor.typed.ActorSystem
@@ -36,6 +33,7 @@ import akka.persistence.r2dbc.internal.AdditionalColumnFactory
import akka.persistence.r2dbc.internal.BySliceQuery.Buckets
import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
import akka.persistence.r2dbc.internal.ChangeHandlerFactory
import akka.persistence.r2dbc.internal.CorrelationId
import akka.persistence.r2dbc.internal.Dialect
import akka.persistence.r2dbc.internal.DurableStateDao
import akka.persistence.r2dbc.internal.InstantFactory
@@ -654,11 +652,12 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv
fromSeqNr: Option[Long],
toTimestamp: Option[Instant],
behindCurrentTime: FiniteDuration,
backtracking: Boolean): Source[SerializedStateRow, NotUsed] = {
backtracking: Boolean,
correlationId: Option[String]): Source[SerializedStateRow, NotUsed] = {
if (!settings.isSliceRangeWithinSameDataPartition(minSlice, maxSlice))
throw new IllegalArgumentException(
s"Slice range [$minSlice-$maxSlice] spans over more than one " +
s"of the [${settings.numberOfDataPartitions}] data partitions.")
s"of the [${settings.numberOfDataPartitions}] data partitions" + CorrelationId.toLogText(correlationId))

val executor = executorProvider.executorFor(minSlice)
val result = executor.select(s"select stateBySlices [$minSlice - $maxSlice]")(
@@ -705,8 +704,11 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv
tags = Set.empty // tags not fetched in queries (yet)
))

if (log.isDebugEnabled)
result.foreach(rows => log.debug("Read [{}] durable states from slices [{} - {}]", rows.size, minSlice, maxSlice))
if (log.isDebugEnabled) {
val correlationText = CorrelationId.toLogText(correlationId)
result.foreach(rows =>
log.debug("Read [{}] durable states from slices [{} - {}]{}", rows.size, minSlice, maxSlice, correlationText))
}

Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed)
}
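With a correlation id present, the debug statement above would render along these lines (illustrative values), using the suffix produced by CorrelationId.toLogText:

Read [100] durable states from slices [0 - 255], correlation [abc-123]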
@@ -861,7 +863,8 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv
minSlice: Int,
maxSlice: Int,
fromTimestamp: Instant,
limit: Int): Future[Seq[Bucket]] = {
limit: Int,
correlationId: Option[String]): Future[Seq[Bucket]] = {

val now = InstantFactory.now() // not important to use database time
val toTimestamp = {
Expand All @@ -886,8 +889,11 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv
Bucket(bucketStartEpochSeconds, count)
})

if (log.isDebugEnabled)
result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice))
if (log.isDebugEnabled) {
val correlationText = CorrelationId.toLogText(correlationId)
result.foreach(rows =>
log.debug("Read [{}] bucket counts from slices [{} - {}]{}", rows.size, minSlice, maxSlice, correlationText))
}

if (toTimestamp == now)
result