diff --git a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala index e4da2dfe..042443d3 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala @@ -91,6 +91,9 @@ object R2dbcSettings { numberOfDatabases * (numberOfDataPartitions / numberOfDatabases) == numberOfDataPartitions, s"data-partition.number-of-databases [$numberOfDatabases] must be a whole number divisor of " + s"data-partition.number-of-partitions [$numberOfDataPartitions].") + require( + durableStateChangeHandlerClasses.isEmpty || numberOfDatabases == 1, + "Durable State ChangeHandler not supported with more than one data partition database.") val connectionFactorySettings = if (numberOfDatabases == 1) { @@ -291,7 +294,7 @@ final class R2dbcSettings private ( resolveAllTableNames(snapshotTableWithSchema(_)) /** - * INTERNAL API: All journal tables and their the lower slice + * INTERNAL API: All durable state tables and their the lower slice */ @InternalApi private[akka] val allDurableStateTablesWithSchema: Map[String, Int] = resolveAllTableNames(durableStateTableWithSchema(_)) diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala index b86b5899..d72da074 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala @@ -8,6 +8,7 @@ import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ + import akka.Done import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit @@ -28,6 +29,7 @@ import akka.persistence.state.scaladsl.GetObjectResult import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike + import akka.persistence.r2dbc.internal.codec.IdentityAdapter import akka.persistence.r2dbc.internal.codec.QueryAdapter import akka.persistence.r2dbc.internal.codec.SqlServerQueryAdapter @@ -57,6 +59,9 @@ object DurableStateStoreChangeHandlerSpec { "JavadslCustomEntity" = "$javaDslcustomEntity" } } + # ChangeHandler not supported for number-of-databases > 1. + # Test is pending for that, but must override to be able to "start" test. + akka.persistence.r2dbc.data-partition.number-of-databases = 1 """) .withFallback(testConfig) @@ -138,6 +143,8 @@ class DurableStateStoreChangeHandlerSpec .getOrElse(0L) "The R2DBC durable state store change handler" should { + // ChangeHandler not supported for number-of-databases > 1 + pendingIfMoreThanOneDataPartition() "be invoked for first revision" in { val entityType = "CustomEntity"