From 75ab142827d257bde0178aefea271a38e1b6eacb Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 8 Feb 2024 13:45:46 +0100 Subject: [PATCH] . --- .../persistence/r2dbc/R2dbcSettings.scala | 38 +++++++++++++------ .../CurrentPersistenceIdsQuerySpec.scala | 4 +- ...urableStateStoreAdditionalColumnSpec.scala | 4 +- .../DurableStateStoreChangeHandlerSpec.scala | 2 +- 4 files changed, 32 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala index 042443d3..892d6c05 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala @@ -238,33 +238,42 @@ final class R2dbcSettings private ( val numberOfDataPartitions: Int) { import R2dbcSettings.NumberOfSlices + private val _journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") + journalTable + /** * The journal table and schema name without data partition suffix. */ - val journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") + journalTable + @deprecated("Use journalTableWithSchema(slice)", "1.2.2") + val journalTableWithSchema: String = _journalTableWithSchema /** * The journal table and schema name with data partition suffix for the given slice. When number-of-partitions is 1 * the table name is without suffix. */ def journalTableWithSchema(slice: Int): String = - resolveTableName(journalTableWithSchema, slice) + resolveTableName(_journalTableWithSchema, slice) + + private val _snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") + snapshotsTable /** * The snapshot table and schema name without data partition suffix. */ - val snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") + snapshotsTable + @deprecated("Use snapshotTableWithSchema(slice)", "1.2.2") + val snapshotsTableWithSchema: String = _snapshotsTableWithSchema /** * The snapshot table and schema name with data partition suffix for the given slice. When number-of-partitions is 1 * the table name is without suffix. */ def snapshotTableWithSchema(slice: Int): String = - resolveTableName(snapshotsTableWithSchema, slice) + resolveTableName(_snapshotsTableWithSchema, slice) + + private val _durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("") + durableStateTable /** * The durable state table and schema name without data partition suffix. */ + @deprecated("Use durableStateTableWithSchema(slice)", "1.2.2") val durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("") + durableStateTable /** @@ -272,7 +281,7 @@ final class R2dbcSettings private ( * is 1 the table name is without suffix. */ def durableStateTableWithSchema(slice: Int): String = - resolveTableName(durableStateTableWithSchema, slice) + resolveTableName(_durableStateTableWithSchema, slice) private def resolveTableName(table: String, slice: Int): String = { if (numberOfDataPartitions == 1) @@ -296,14 +305,20 @@ final class R2dbcSettings private ( /** * INTERNAL API: All durable state tables and their the lower slice */ - @InternalApi private[akka] val allDurableStateTablesWithSchema: Map[String, Int] = - resolveAllTableNames(durableStateTableWithSchema(_)) + @InternalApi private[akka] val allDurableStateTablesWithSchema: Map[String, Int] = { + val defaultTables = resolveAllTableNames(durableStateTableWithSchema(_)) + val entityTypes = _durableStateTableByEntityType.keys + entityTypes.foldLeft(defaultTables) { case (acc, entityType) => + val entityTypeTables = resolveAllTableNames(slice => getDurableStateTableWithSchema(entityType, slice)) + acc ++ entityTypeTables + } + } private def resolveAllTableNames(tableForSlice: Int => String): Map[String, Int] = - (0 until NumberOfSlices).foldLeft(Map.empty[String, Int]) { case (acc, slice) => - val table = tableForSlice(slice) + dataPartitionSliceRanges.foldLeft(Map.empty[String, Int]) { case (acc, sliceRange) => + val table = tableForSlice(sliceRange.min) if (acc.contains(table)) acc - else acc.updated(table, slice) + else acc.updated(table, sliceRange.min) } val numberOfDatabases: Int = _connectionFactorySettings.size @@ -342,8 +357,9 @@ final class R2dbcSettings private ( /** * The durable state table and schema name for the `entityType` without data partition suffix. */ + @deprecated("Use getDurableStateTableWithSchema(entityType, slice)", "1.2.2") def getDurableStateTableWithSchema(entityType: String): String = - durableStateTableByEntityTypeWithSchema.getOrElse(entityType, durableStateTableWithSchema) + durableStateTableByEntityTypeWithSchema.getOrElse(entityType, _durableStateTableWithSchema) /** * The durable state table and schema name for the `entityType` with data partition suffix for the given slice. When diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala index 724f85d1..7a2ee209 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala @@ -73,8 +73,6 @@ class CurrentPersistenceIdsQuerySpec } override protected def beforeAll(): Unit = { - super.beforeAll() - r2dbcSettings.dataPartitionSliceRanges.foreach { sliceRange => val dataPartitionSlice = sliceRange.min Await.result( @@ -85,6 +83,8 @@ class CurrentPersistenceIdsQuerySpec r2dbcExecutor(dataPartitionSlice).updateOne("beforeAll delete")( _.createStatement(s"delete from ${customTable(dataPartitionSlice)}")), 10.seconds) + + super.beforeAll() } val probe = createTestProbe[Done]() diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala index f4533c47..c3d03dd7 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala @@ -85,8 +85,6 @@ class DurableStateStoreAdditionalColumnSpec override def typedSystem: ActorSystem[_] = system override def beforeAll(): Unit = { - super.beforeAll() - r2dbcSettings.dataPartitionSliceRanges.foreach { sliceRange => val dataPartitionSlice = sliceRange.min Await.result( @@ -110,6 +108,8 @@ class DurableStateStoreAdditionalColumnSpec _.createStatement(s"delete from ${customTable(dataPartitionSlice)}")), 10.seconds) } + + super.beforeAll() } private val store = DurableStateStoreRegistry(testKit.system) diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala index d72da074..268d4bfc 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala @@ -117,13 +117,13 @@ class DurableStateStoreChangeHandlerSpec override def typedSystem: ActorSystem[_] = system override def beforeAll(): Unit = { - super.beforeAll() Await.result( r2dbcExecutor.executeDdl("beforeAll create durable_state_test")(_.createStatement(createTableSql)), 20.seconds) Await.result( r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete from $anotherTable")), 10.seconds) + super.beforeAll() } private val store = DurableStateStoreRegistry(testKit.system)