Skip to content

Commit

Permalink
feat: Data partitions for snapshot and durable state
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Feb 2, 2024
1 parent c7dd202 commit 73af6fe
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 96 deletions.
67 changes: 59 additions & 8 deletions core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -244,26 +244,64 @@ final class R2dbcSettings private (
* The journal table and schema name with data partition suffix for the given slice. When number-of-partitions is 1
* the table name is without suffix.
*/
def journalTableWithSchema(slice: Int): String = {
def journalTableWithSchema(slice: Int): String =
resolveTableName(journalTableWithSchema, slice)

/**
* The snapshot table and schema name without data partition suffix.
*/
val snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") + snapshotsTable

/**
* The snapshot table and schema name with data partition suffix for the given slice. When number-of-partitions is 1
* the table name is without suffix.
*/
def snapshotTableWithSchema(slice: Int): String =
resolveTableName(snapshotsTableWithSchema, slice)

/**
* The durable state table and schema name without data partition suffix.
*/
val durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("") + durableStateTable

/**
* The durable state table and schema name with data partition suffix for the given slice. When number-of-partitions
* is 1 the table name is without suffix.
*/
def durableStateTableWithSchema(slice: Int): String =
resolveTableName(durableStateTableWithSchema, slice)

private def resolveTableName(table: String, slice: Int): String = {
if (numberOfDataPartitions == 1)
journalTableWithSchema
table
else
s"${journalTableWithSchema}_${dataPartition(slice)}"
s"${table}_${dataPartition(slice)}"
}

val snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") + snapshotsTable
val durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("") + durableStateTable
/**
* INTERNAL API: All journal tables and their the lower slice
*/
@InternalApi private[akka] val allJournalTablesWithSchema: Map[String, Int] =
resolveAllTableNames(journalTableWithSchema(_))

/**
* INTERNAL API: All snapshot tables and their the lower slice
*/
@InternalApi private[akka] val allSnapshotTablesWithSchema: Map[String, Int] =
resolveAllTableNames(snapshotTableWithSchema(_))

/**
* INTERNAL API: All journal tables and their the lower slice
*/
@InternalApi private[akka] val allJournalTablesWithSchema: Map[String, Int] = {
@InternalApi private[akka] val allDurableStateTablesWithSchema: Map[String, Int] =
resolveAllTableNames(durableStateTableWithSchema(_))

private def resolveAllTableNames(tableForSlice: Int => String): Map[String, Int] =
(0 until NumberOfSlices).foldLeft(Map.empty[String, Int]) { case (acc, slice) =>
val table = journalTableWithSchema(slice)
val table = tableForSlice(slice)
if (acc.contains(table)) acc
else acc.updated(table, slice)
}
}

val numberOfDatabases: Int = _connectionFactorySettings.size

Expand Down Expand Up @@ -298,9 +336,22 @@ final class R2dbcSettings private (
def getDurableStateTable(entityType: String): String =
_durableStateTableByEntityType.getOrElse(entityType, durableStateTable)

/**
* The durable state table and schema name for the `entityType` without data partition suffix.
*/
def getDurableStateTableWithSchema(entityType: String): String =
durableStateTableByEntityTypeWithSchema.getOrElse(entityType, durableStateTableWithSchema)

/**
* The durable state table and schema name for the `entityType` with data partition suffix for the given slice. When
* number-of-partitions is 1 the table name is without suffix.
*/
def getDurableStateTableWithSchema(entityType: String, slice: Int): String =
durableStateTableByEntityTypeWithSchema.get(entityType) match {
case None => durableStateTableWithSchema(slice)
case Some(table) => resolveTableName(table, slice)
}

/**
* INTERNAL API
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ private[r2dbc] final class H2SnapshotDao(settings: R2dbcSettings, executorProvid

override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2SnapshotDao])

override protected def createUpsertSql: String = {
override protected def upsertSql(slice: Int): String = {
// db_timestamp and tags columns were added in 1.2.0
if (settings.querySettings.startFromSnapshotEnabled)
sql"""
MERGE INTO $snapshotTable
MERGE INTO ${snapshotTable(slice)}
(slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest, db_timestamp, tags)
KEY (persistence_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
else
sql"""
MERGE INTO $snapshotTable
MERGE INTO ${snapshotTable(slice)}
(slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest)
KEY (persistence_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
Expand Down
Loading

0 comments on commit 73af6fe

Please sign in to comment.