Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Feb 8, 2024
1 parent 24808e8 commit 75ab142
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 16 deletions.
38 changes: 27 additions & 11 deletions core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,41 +238,50 @@ final class R2dbcSettings private (
val numberOfDataPartitions: Int) {
import R2dbcSettings.NumberOfSlices

private val _journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") + journalTable

/**
* The journal table and schema name without data partition suffix.
*/
val journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") + journalTable
@deprecated("Use journalTableWithSchema(slice)", "1.2.2")
val journalTableWithSchema: String = _journalTableWithSchema

/**
* The journal table and schema name with data partition suffix for the given slice. When number-of-partitions is 1
* the table name is without suffix.
*/
def journalTableWithSchema(slice: Int): String =
resolveTableName(journalTableWithSchema, slice)
resolveTableName(_journalTableWithSchema, slice)

private val _snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") + snapshotsTable

/**
* The snapshot table and schema name without data partition suffix.
*/
val snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") + snapshotsTable
@deprecated("Use snapshotTableWithSchema(slice)", "1.2.2")
val snapshotsTableWithSchema: String = _snapshotsTableWithSchema

/**
* The snapshot table and schema name with data partition suffix for the given slice. When number-of-partitions is 1
* the table name is without suffix.
*/
def snapshotTableWithSchema(slice: Int): String =
resolveTableName(snapshotsTableWithSchema, slice)
resolveTableName(_snapshotsTableWithSchema, slice)

private val _durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("") + durableStateTable

/**
* The durable state table and schema name without data partition suffix.
*/
@deprecated("Use durableStateTableWithSchema(slice)", "1.2.2")
val durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("") + durableStateTable

/**
* The durable state table and schema name with data partition suffix for the given slice. When number-of-partitions
* is 1 the table name is without suffix.
*/
def durableStateTableWithSchema(slice: Int): String =
resolveTableName(durableStateTableWithSchema, slice)
resolveTableName(_durableStateTableWithSchema, slice)

private def resolveTableName(table: String, slice: Int): String = {
if (numberOfDataPartitions == 1)
Expand All @@ -296,14 +305,20 @@ final class R2dbcSettings private (
/**
* INTERNAL API: All durable state tables and their the lower slice
*/
@InternalApi private[akka] val allDurableStateTablesWithSchema: Map[String, Int] =
resolveAllTableNames(durableStateTableWithSchema(_))
@InternalApi private[akka] val allDurableStateTablesWithSchema: Map[String, Int] = {
val defaultTables = resolveAllTableNames(durableStateTableWithSchema(_))
val entityTypes = _durableStateTableByEntityType.keys
entityTypes.foldLeft(defaultTables) { case (acc, entityType) =>
val entityTypeTables = resolveAllTableNames(slice => getDurableStateTableWithSchema(entityType, slice))
acc ++ entityTypeTables
}
}

private def resolveAllTableNames(tableForSlice: Int => String): Map[String, Int] =
(0 until NumberOfSlices).foldLeft(Map.empty[String, Int]) { case (acc, slice) =>
val table = tableForSlice(slice)
dataPartitionSliceRanges.foldLeft(Map.empty[String, Int]) { case (acc, sliceRange) =>
val table = tableForSlice(sliceRange.min)
if (acc.contains(table)) acc
else acc.updated(table, slice)
else acc.updated(table, sliceRange.min)
}

val numberOfDatabases: Int = _connectionFactorySettings.size
Expand Down Expand Up @@ -342,8 +357,9 @@ final class R2dbcSettings private (
/**
* The durable state table and schema name for the `entityType` without data partition suffix.
*/
@deprecated("Use getDurableStateTableWithSchema(entityType, slice)", "1.2.2")
def getDurableStateTableWithSchema(entityType: String): String =
durableStateTableByEntityTypeWithSchema.getOrElse(entityType, durableStateTableWithSchema)
durableStateTableByEntityTypeWithSchema.getOrElse(entityType, _durableStateTableWithSchema)

/**
* The durable state table and schema name for the `entityType` with data partition suffix for the given slice. When
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ class CurrentPersistenceIdsQuerySpec
}

override protected def beforeAll(): Unit = {
super.beforeAll()

r2dbcSettings.dataPartitionSliceRanges.foreach { sliceRange =>
val dataPartitionSlice = sliceRange.min
Await.result(
Expand All @@ -85,6 +83,8 @@ class CurrentPersistenceIdsQuerySpec
r2dbcExecutor(dataPartitionSlice).updateOne("beforeAll delete")(
_.createStatement(s"delete from ${customTable(dataPartitionSlice)}")),
10.seconds)

super.beforeAll()
}

val probe = createTestProbe[Done]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ class DurableStateStoreAdditionalColumnSpec
override def typedSystem: ActorSystem[_] = system

override def beforeAll(): Unit = {
super.beforeAll()

r2dbcSettings.dataPartitionSliceRanges.foreach { sliceRange =>
val dataPartitionSlice = sliceRange.min
Await.result(
Expand All @@ -110,6 +108,8 @@ class DurableStateStoreAdditionalColumnSpec
_.createStatement(s"delete from ${customTable(dataPartitionSlice)}")),
10.seconds)
}

super.beforeAll()
}

private val store = DurableStateStoreRegistry(testKit.system)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,13 @@ class DurableStateStoreChangeHandlerSpec
override def typedSystem: ActorSystem[_] = system

override def beforeAll(): Unit = {
super.beforeAll()
Await.result(
r2dbcExecutor.executeDdl("beforeAll create durable_state_test")(_.createStatement(createTableSql)),
20.seconds)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete from $anotherTable")),
10.seconds)
super.beforeAll()
}

private val store = DurableStateStoreRegistry(testKit.system)
Expand Down

0 comments on commit 75ab142

Please sign in to comment.