From 73af6fe6f0edaf85b066660ac6b8398bfd827d78 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 2 Feb 2024 15:10:26 +0100 Subject: [PATCH] feat: Data partitions for snapshot and durable state --- .../persistence/r2dbc/R2dbcSettings.scala | 67 +++++++++++++-- .../r2dbc/internal/h2/H2SnapshotDao.scala | 6 +- .../postgres/PostgresDurableStateDao.scala | 84 ++++++++++++------- .../postgres/PostgresSnapshotDao.scala | 60 ++++++++----- .../sqlserver/SqlServerDurableStateDao.scala | 10 ++- .../sqlserver/SqlServerSnapshotDao.scala | 21 +++-- .../scaladsl/R2dbcDurableStateStore.scala | 1 + .../persistence/r2dbc/R2dbcSettingsSpec.scala | 26 ++++-- .../persistence/r2dbc/TestDbLifecycle.scala | 20 +++-- .../CurrentPersistenceIdsQuerySpec.scala | 2 + .../r2dbc/state/DurableStateBySliceSpec.scala | 2 - ...urableStateStoreAdditionalColumnSpec.scala | 2 + 12 files changed, 205 insertions(+), 96 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala index 249a3b37..e4da2dfe 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala @@ -244,26 +244,64 @@ final class R2dbcSettings private ( * The journal table and schema name with data partition suffix for the given slice. When number-of-partitions is 1 * the table name is without suffix. */ - def journalTableWithSchema(slice: Int): String = { + def journalTableWithSchema(slice: Int): String = + resolveTableName(journalTableWithSchema, slice) + + /** + * The snapshot table and schema name without data partition suffix. + */ + val snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") + snapshotsTable + + /** + * The snapshot table and schema name with data partition suffix for the given slice. When number-of-partitions is 1 + * the table name is without suffix. + */ + def snapshotTableWithSchema(slice: Int): String = + resolveTableName(snapshotsTableWithSchema, slice) + + /** + * The durable state table and schema name without data partition suffix. + */ + val durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("") + durableStateTable + + /** + * The durable state table and schema name with data partition suffix for the given slice. When number-of-partitions + * is 1 the table name is without suffix. + */ + def durableStateTableWithSchema(slice: Int): String = + resolveTableName(durableStateTableWithSchema, slice) + + private def resolveTableName(table: String, slice: Int): String = { if (numberOfDataPartitions == 1) - journalTableWithSchema + table else - s"${journalTableWithSchema}_${dataPartition(slice)}" + s"${table}_${dataPartition(slice)}" } - val snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") + snapshotsTable - val durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("") + durableStateTable + /** + * INTERNAL API: All journal tables and their the lower slice + */ + @InternalApi private[akka] val allJournalTablesWithSchema: Map[String, Int] = + resolveAllTableNames(journalTableWithSchema(_)) + + /** + * INTERNAL API: All snapshot tables and their the lower slice + */ + @InternalApi private[akka] val allSnapshotTablesWithSchema: Map[String, Int] = + resolveAllTableNames(snapshotTableWithSchema(_)) /** * INTERNAL API: All journal tables and their the lower slice */ - @InternalApi private[akka] val allJournalTablesWithSchema: Map[String, Int] = { + @InternalApi private[akka] val allDurableStateTablesWithSchema: Map[String, Int] = + resolveAllTableNames(durableStateTableWithSchema(_)) + + private def resolveAllTableNames(tableForSlice: Int => String): Map[String, Int] = (0 until NumberOfSlices).foldLeft(Map.empty[String, Int]) { case (acc, slice) => - val table = journalTableWithSchema(slice) + val table = tableForSlice(slice) if (acc.contains(table)) acc else acc.updated(table, slice) } - } val numberOfDatabases: Int = _connectionFactorySettings.size @@ -298,9 +336,22 @@ final class R2dbcSettings private ( def getDurableStateTable(entityType: String): String = _durableStateTableByEntityType.getOrElse(entityType, durableStateTable) + /** + * The durable state table and schema name for the `entityType` without data partition suffix. + */ def getDurableStateTableWithSchema(entityType: String): String = durableStateTableByEntityTypeWithSchema.getOrElse(entityType, durableStateTableWithSchema) + /** + * The durable state table and schema name for the `entityType` with data partition suffix for the given slice. When + * number-of-partitions is 1 the table name is without suffix. + */ + def getDurableStateTableWithSchema(entityType: String, slice: Int): String = + durableStateTableByEntityTypeWithSchema.get(entityType) match { + case None => durableStateTableWithSchema(slice) + case Some(table) => resolveTableName(table, slice) + } + /** * INTERNAL API */ diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala index cf3c6279..2b7cc91c 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2SnapshotDao.scala @@ -30,18 +30,18 @@ private[r2dbc] final class H2SnapshotDao(settings: R2dbcSettings, executorProvid override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2SnapshotDao]) - override protected def createUpsertSql: String = { + override protected def upsertSql(slice: Int): String = { // db_timestamp and tags columns were added in 1.2.0 if (settings.querySettings.startFromSnapshotEnabled) sql""" - MERGE INTO $snapshotTable + MERGE INTO ${snapshotTable(slice)} (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest, db_timestamp, tags) KEY (persistence_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ else sql""" - MERGE INTO $snapshotTable + MERGE INTO ${snapshotTable(slice)} (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest) KEY (persistence_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 7042a4a7..22be3243 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -16,7 +16,6 @@ import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal import io.r2dbc.spi.Connection -import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.R2dbcDataIntegrityViolationException import io.r2dbc.spi.Row import io.r2dbc.spi.Statement @@ -90,7 +89,6 @@ private[r2dbc] class PostgresDurableStateDao( protected def log: Logger = PostgresDurableStateDao.log private val persistenceExt = Persistence(system) - protected val r2dbcExecutor = executorProvider.executorFor(slice = 0) // FIXME support data partitions // used for change events private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, executorProvider) @@ -109,15 +107,18 @@ private[r2dbc] class PostgresDurableStateDao( } } - protected def selectStateSql(entityType: String): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType) + protected def durableStateTable(entityType: String, slice: Int): String = + settings.getDurableStateTableWithSchema(entityType, slice) + + protected def selectStateSql(slice: Int, entityType: String): String = { + val stateTable = durableStateTable(entityType, slice) sql""" SELECT revision, state_ser_id, state_ser_manifest, state_payload, db_timestamp FROM $stateTable WHERE persistence_id = ?""" } protected def selectBucketsSql(entityType: String, minSlice: Int, maxSlice: Int): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType) + val stateTable = durableStateTable(entityType, minSlice) sql""" SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) AS count FROM $stateTable @@ -132,9 +133,10 @@ private[r2dbc] class PostgresDurableStateDao( s"slice in (${(minSlice to maxSlice).mkString(",")})" protected def insertStateSql( + slice: Int, entityType: String, additionalBindings: immutable.IndexedSeq[EvaluatedAdditionalColumnBindings]): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType) + val stateTable = durableStateTable(entityType, slice) val additionalCols = additionalInsertColumns(additionalBindings) val additionalParams = additionalInsertParameters(additionalBindings) sql""" @@ -175,12 +177,13 @@ private[r2dbc] class PostgresDurableStateDao( } protected def updateStateSql( + slice: Int, entityType: String, updateTags: Boolean, additionalBindings: immutable.IndexedSeq[EvaluatedAdditionalColumnBindings], currentTimestamp: String = "CURRENT_TIMESTAMP"): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType) + val stateTable = durableStateTable(entityType, slice) val timestamp = if (settings.dbTimestampMonotonicIncreasing) @@ -219,8 +222,8 @@ private[r2dbc] class PostgresDurableStateDao( } } - protected def hardDeleteStateSql(entityType: String): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType) + protected def hardDeleteStateSql(entityType: String, slice: Int): String = { + val stateTable = durableStateTable(entityType, slice) sql"DELETE from $stateTable WHERE persistence_id = ?" } @@ -263,7 +266,8 @@ private[r2dbc] class PostgresDurableStateDao( backtracking: Boolean, minSlice: Int, maxSlice: Int): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType) + + val stateTable = durableStateTable(entityType, minSlice) def maxDbTimestampParamCondition = if (maxDbTimestampParam) s"AND db_timestamp < ?" else "" @@ -288,10 +292,12 @@ private[r2dbc] class PostgresDurableStateDao( override def readState(persistenceId: String): Future[Option[SerializedStateRow]] = { val entityType = PersistenceId.extractEntityType(persistenceId) - r2dbcExecutor.selectOne(s"select [$persistenceId]")( + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val executor = executorProvider.executorFor(slice) + executor.selectOne(s"select [$persistenceId]")( connection => connection - .createStatement(selectStateSql(entityType)) + .createStatement(selectStateSql(slice, entityType)) .bind(0, persistenceId), row => SerializedStateRow( @@ -339,6 +345,8 @@ private[r2dbc] class PostgresDurableStateDao( value: Any, changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = { require(state.revision > 0) + val slice = persistenceExt.sliceForPersistenceId(state.persistenceId) + val executor = executorProvider.executorFor(slice) def bindTags(stmt: Statement, i: Int): Statement = { if (state.tags.isEmpty) @@ -377,10 +385,9 @@ private[r2dbc] class PostgresDurableStateDao( } if (state.revision == 1) { - val slice = persistenceExt.sliceForPersistenceId(state.persistenceId) def insertStatement(connection: Connection): Statement = { val stmt = connection - .createStatement(insertStateSql(entityType, additionalBindings)) + .createStatement(insertStateSql(slice, entityType, additionalBindings)) .bind(idx.next(), slice) .bind(idx.next(), entityType) .bind(idx.next(), state.persistenceId) @@ -404,10 +411,10 @@ private[r2dbc] class PostgresDurableStateDao( if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) { val updatedRows = recoverDataIntegrityViolation( - r2dbcExecutor.updateOne(s"insert [${state.persistenceId}]")(insertStatement)) + executor.updateOne(s"insert [${state.persistenceId}]")(insertStatement)) updatedRows.map(_ -> None) } else - r2dbcExecutor.withConnection(s"insert [${state.persistenceId}]") { connection => + executor.withConnection(s"insert [${state.persistenceId}]") { connection => for { updatedRows <- recoverDataIntegrityViolation(R2dbcExecutor.updateOneInTx(insertStatement(connection))) changeEventTimestamp <- writeChangeEventAndCallChangeHandler( @@ -423,7 +430,7 @@ private[r2dbc] class PostgresDurableStateDao( def updateStatement(connection: Connection): Statement = { val stmt = connection - .createStatement(updateStateSql(entityType, updateTags = true, additionalBindings)) + .createStatement(updateStateSql(slice, entityType, updateTags = true, additionalBindings)) .bind(idx.next(), state.revision) .bind(idx.next(), state.serId) .bind(idx.next(), state.serManifest) @@ -455,10 +462,10 @@ private[r2dbc] class PostgresDurableStateDao( } if (!changeHandlers.contains(entityType) && changeEvent.isEmpty) { - val updatedRows = r2dbcExecutor.updateOne(s"update [${state.persistenceId}]")(updateStatement) + val updatedRows = executor.updateOne(s"update [${state.persistenceId}]")(updateStatement) updatedRows.map(_ -> None) } else - r2dbcExecutor.withConnection(s"update [${state.persistenceId}]") { connection => + executor.withConnection(s"update [${state.persistenceId}]") { connection => for { updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection)) changeEventTimestamp <- writeChangeEventAndCallChangeHandler( @@ -512,6 +519,8 @@ private[r2dbc] class PostgresDurableStateDao( hardDeleteState(persistenceId) .map(_ => None)(ExecutionContexts.parasitic) } else { + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val executor = executorProvider.executorFor(slice) val result: Future[(Long, Option[Instant])] = { val entityType = PersistenceId.extractEntityType(persistenceId) def change = @@ -522,7 +531,7 @@ private[r2dbc] class PostgresDurableStateDao( def insertDeleteMarkerStatement(connection: Connection): Statement = { val stmt = connection .createStatement( - insertStateSql(entityType, Vector.empty) + insertStateSql(slice, entityType, Vector.empty) ) // FIXME should the additional columns be cleared (null)? Then they must allow NULL .bind(0, slice) .bind(1, entityType) @@ -542,7 +551,7 @@ private[r2dbc] class PostgresDurableStateDao( s"Insert delete marker with revision 1 failed: durable state for persistence id [$persistenceId] already exists")) } - r2dbcExecutor.withConnection(s"insert delete marker [$persistenceId]") { connection => + executor.withConnection(s"insert delete marker [$persistenceId]") { connection => for { updatedRows <- recoverDataIntegrityViolation( R2dbcExecutor.updateOneInTx(insertDeleteMarkerStatement(connection))) @@ -563,7 +572,7 @@ private[r2dbc] class PostgresDurableStateDao( val stmt = connection .createStatement( - updateStateSql(entityType, updateTags = false, Vector.empty) + updateStateSql(slice, entityType, updateTags = false, Vector.empty) ) // FIXME should the additional columns be cleared (null)? Then they must allow NULL .bind(idx.next(), revision) .bind(idx.next(), 0) @@ -593,7 +602,7 @@ private[r2dbc] class PostgresDurableStateDao( } } - r2dbcExecutor.withConnection(s"delete [$persistenceId]") { connection => + executor.withConnection(s"delete [$persistenceId]") { connection => for { updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection)) changeEventTimestamp <- writeChangeEventAndCallChangeHandler( @@ -622,16 +631,18 @@ private[r2dbc] class PostgresDurableStateDao( private def hardDeleteState(persistenceId: String): Future[Done] = { val entityType = PersistenceId.extractEntityType(persistenceId) + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val executor = executorProvider.executorFor(slice) val changeHandler = changeHandlers.get(entityType) val changeHandlerHint = changeHandler.map(_ => " with change handler").getOrElse("") val result = - r2dbcExecutor.withConnection(s"hard delete [$persistenceId]$changeHandlerHint") { connection => + executor.withConnection(s"hard delete [$persistenceId]$changeHandlerHint") { connection => for { updatedRows <- R2dbcExecutor.updateOneInTx( connection - .createStatement(hardDeleteStateSql(entityType)) + .createStatement(hardDeleteStateSql(entityType, slice)) .bind(0, persistenceId)) _ <- { val change = new DeletedDurableState[Any](persistenceId, 0L, NoOffset, EmptyDbTimestamp.toEpochMilli) @@ -647,7 +658,8 @@ private[r2dbc] class PostgresDurableStateDao( } override def currentDbTimestamp(slice: Int): Future[Instant] = { - r2dbcExecutor + val executor = executorProvider.executorFor(slice) + executor .selectOne("select current db timestamp")( connection => connection.createStatement(currentDbTimestampSql), row => row.getTimestamp("db_timestamp")) @@ -686,7 +698,13 @@ private[r2dbc] class PostgresDurableStateDao( toTimestamp: Option[Instant], behindCurrentTime: FiniteDuration, backtracking: Boolean): Source[SerializedStateRow, NotUsed] = { - val result = r2dbcExecutor.select(s"select stateBySlices [$minSlice - $maxSlice]")( + if (!settings.isSliceRangeWithinSameDataPartition(minSlice, maxSlice)) + throw new IllegalArgumentException( + s"Slice range [$minSlice-$maxSlice] spans over more than one " + + s"of the [${settings.numberOfDataPartitions}] data partitions.") + + val executor = executorProvider.executorFor(minSlice) + val result = executor.select(s"select stateBySlices [$minSlice - $maxSlice]")( connection => { val stmt = connection .createStatement( @@ -755,6 +773,7 @@ private[r2dbc] class PostgresDurableStateDao( } } + // FIXME data partition val customTables = settings.durableStateTableByEntityTypeWithSchema.toVector.sortBy(_._1).map(_._2) val ids = for { fromDefaultTable <- readPersistenceIds(afterId, limit, settings.durableStateTableWithSchema) @@ -783,7 +802,8 @@ private[r2dbc] class PostgresDurableStateDao( afterId: Option[String], limit: Long, table: String): Future[immutable.IndexedSeq[String]] = { - val result = r2dbcExecutor.select(s"select persistenceIds")( + val executor = executorProvider.executorFor(slice = 0) // FIXME + val result = executor.select(s"select persistenceIds")( connection => afterId match { case Some(after) => @@ -812,9 +832,10 @@ private[r2dbc] class PostgresDurableStateDao( } override def persistenceIds(entityType: String, afterId: Option[String], limit: Long): Source[String, NotUsed] = { - val table = settings.getDurableStateTableWithSchema(entityType) + val table = settings.getDurableStateTableWithSchema(entityType) // FIXME val likeStmtPostfix = PersistenceId.DefaultSeparator + "%" - val result = r2dbcExecutor.select(s"select persistenceIds by entity type")( + val executor = executorProvider.executorFor(slice = 0) // FIXME + val result = executor.select(s"select persistenceIds by entity type")( connection => afterId match { case Some(after) => @@ -869,7 +890,8 @@ private[r2dbc] class PostgresDurableStateDao( } } - val result = r2dbcExecutor.select(s"select bucket counts [$minSlice - $maxSlice]")( + val executor = executorProvider.executorFor(minSlice) + val result = executor.select(s"select bucket counts [$minSlice - $maxSlice]")( connection => { val stmt = connection.createStatement(selectBucketsSql(entityType, minSlice, maxSlice)) bindSelectBucketSql(stmt, entityType, fromTimestamp, toTimestamp, limit) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala index 551d5570..f7f6df81 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala @@ -21,6 +21,7 @@ import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts +import akka.persistence.Persistence import akka.persistence.SnapshotSelectionCriteria import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.BySliceQuery.Buckets @@ -59,15 +60,15 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvid protected def log: Logger = PostgresSnapshotDao.log - protected val snapshotTable: String = settings.snapshotsTableWithSchema + protected val persistenceExt: Persistence = Persistence(system) - protected val r2dbcExecutor = executorProvider.executorFor(slice = 0) // FIXME support data partitions + protected def snapshotTable(slice: Int): String = settings.snapshotTableWithSchema(slice) - protected def createUpsertSql: String = { + protected def upsertSql(slice: Int): String = { // db_timestamp and tags columns were added in 1.2.0 if (settings.querySettings.startFromSnapshotEnabled) sql""" - INSERT INTO $snapshotTable + INSERT INTO ${snapshotTable(slice)} (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest, db_timestamp, tags) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (persistence_id) @@ -84,7 +85,7 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvid meta_ser_manifest = excluded.meta_ser_manifest""" else sql""" - INSERT INTO $snapshotTable + INSERT INTO ${snapshotTable(slice)} (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (persistence_id) @@ -99,9 +100,7 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvid meta_ser_manifest = excluded.meta_ser_manifest""" } - private val upsertSql = createUpsertSql - - protected def selectSql(criteria: SnapshotSelectionCriteria): String = { + protected def selectSql(slice: Int, criteria: SnapshotSelectionCriteria): String = { val maxSeqNrCondition = if (criteria.maxSequenceNr != Long.MaxValue) " AND seq_nr <= ?" else "" @@ -122,20 +121,20 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvid if (settings.querySettings.startFromSnapshotEnabled) sql""" SELECT slice, persistence_id, seq_nr, db_timestamp, write_timestamp, snapshot, ser_id, ser_manifest, tags, meta_payload, meta_ser_id, meta_ser_manifest - FROM $snapshotTable + FROM ${snapshotTable(slice)} WHERE persistence_id = ? $maxSeqNrCondition $minSeqNrCondition $maxTimestampCondition $minTimestampCondition LIMIT 1""" else sql""" SELECT slice, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest - FROM $snapshotTable + FROM ${snapshotTable(slice)} WHERE persistence_id = ? $maxSeqNrCondition $minSeqNrCondition $maxTimestampCondition $minTimestampCondition LIMIT 1""" } - private def deleteSql(criteria: SnapshotSelectionCriteria): String = { + private def deleteSql(slice: Int, criteria: SnapshotSelectionCriteria): String = { val maxSeqNrCondition = if (criteria.maxSequenceNr != Long.MaxValue) " AND seq_nr <= ?" else "" @@ -153,7 +152,7 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvid else "" sql""" - DELETE FROM $snapshotTable + DELETE FROM ${snapshotTable(slice)} WHERE persistence_id = ? $maxSeqNrCondition $minSeqNrCondition $maxTimestampCondition $minTimestampCondition""" } @@ -165,7 +164,7 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvid sql""" SELECT slice, persistence_id, seq_nr, db_timestamp, write_timestamp, snapshot, ser_id, ser_manifest, tags, meta_payload, meta_ser_id, meta_ser_manifest - FROM $snapshotTable + FROM ${snapshotTable(minSlice)} WHERE entity_type = ? AND ${sliceCondition(minSlice, maxSlice)} AND db_timestamp >= ? @@ -176,7 +175,7 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvid protected def selectBucketsSql(entityType: String, minSlice: Int, maxSlice: Int): String = { sql""" SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) AS count - FROM $snapshotTable + FROM ${snapshotTable(minSlice)} WHERE entity_type = ? AND ${sliceCondition(minSlice, maxSlice)} AND db_timestamp >= ? AND db_timestamp <= ? @@ -232,11 +231,13 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvid persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SerializedSnapshotRow]] = { val entityType = PersistenceId.extractEntityType(persistenceId) - r2dbcExecutor + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val executor = executorProvider.executorFor(slice) + executor .select(s"select snapshot [$persistenceId], criteria: [$criteria]")( { connection => val statement = connection - .createStatement(selectSql(criteria)) + .createStatement(selectSql(slice, criteria)) .bind(0, persistenceId) var bindIdx = 0 @@ -296,12 +297,14 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvid } def store(serializedRow: SerializedSnapshotRow): Future[Unit] = { - r2dbcExecutor + val slice = persistenceExt.sliceForPersistenceId(serializedRow.persistenceId) + val executor = executorProvider.executorFor(slice) + executor .updateOne(s"upsert snapshot [${serializedRow.persistenceId}], sequence number [${serializedRow.seqNr}]") { connection => val statement = connection - .createStatement(upsertSql) + .createStatement(upsertSql(slice)) bindUpsertSql(statement, serializedRow) @@ -310,9 +313,11 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvid } def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = { - r2dbcExecutor.updateOne(s"delete snapshot [$persistenceId], criteria [$criteria]") { connection => + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val executor = executorProvider.executorFor(slice) + executor.updateOne(s"delete snapshot [$persistenceId], criteria [$criteria]") { connection => val statement = connection - .createStatement(deleteSql(criteria)) + .createStatement(deleteSql(slice, criteria)) .bind(0, persistenceId) var bindIdx = 0 @@ -340,7 +345,8 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvid * This is used from `BySliceQuery`, i.e. only if settings.querySettings.startFromSnapshotEnabled */ override def currentDbTimestamp(slice: Int): Future[Instant] = { - r2dbcExecutor + val executor = executorProvider.executorFor(slice) + executor .selectOne("select current db timestamp")( connection => connection.createStatement(currentDbTimestampSql), row => row.getTimestamp("db_timestamp")) @@ -372,7 +378,13 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvid toTimestamp: Option[Instant], behindCurrentTime: FiniteDuration, backtracking: Boolean): Source[SerializedSnapshotRow, NotUsed] = { - val result = r2dbcExecutor.select(s"select snapshotsBySlices [$minSlice - $maxSlice]")( + if (!settings.isSliceRangeWithinSameDataPartition(minSlice, maxSlice)) + throw new IllegalArgumentException( + s"Slice range [$minSlice-$maxSlice] spans over more than one " + + s"of the [${settings.numberOfDataPartitions}] data partitions.") + + val executor = executorProvider.executorFor(minSlice) + val result = executor.select(s"select snapshotsBySlices [$minSlice - $maxSlice]")( connection => { val stmt = connection.createStatement(snapshotsBySlicesRangeSql(minSlice, maxSlice)) bindSnapshotsBySlicesRangeSql(stmt, entityType, fromTimestamp, settings.querySettings.bufferSize) @@ -425,7 +437,9 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, executorProvid } } - val result = r2dbcExecutor.select(s"select bucket counts [$minSlice - $maxSlice]")( + val executor = executorProvider.executorFor(minSlice) + + val result = executor.select(s"select bucket counts [$minSlice - $maxSlice]")( connection => { val stmt = connection.createStatement(selectBucketsSql(entityType, minSlice, maxSlice)) bindSelectBucketsSql(stmt, entityType, fromTimestamp, toTimestamp, limit) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala index 43da199f..6ef09a4d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala @@ -51,9 +51,10 @@ private[r2dbc] class SqlServerDurableStateDao( override def log: Logger = SqlServerDurableStateDao.log override protected def insertStateSql( + slice: Int, entityType: String, additionalBindings: immutable.IndexedSeq[EvaluatedAdditionalColumnBindings]): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType) + val stateTable = durableStateTable(entityType, slice) val additionalCols = additionalInsertColumns(additionalBindings) val additionalParams = additionalInsertParameters(additionalBindings) sql""" @@ -66,14 +67,15 @@ private[r2dbc] class SqlServerDurableStateDao( * here, the currentTimestamp is another query param. Binding is happening in the overridden method `bindTimestampNow` */ override protected def updateStateSql( + slice: Int, entityType: String, updateTags: Boolean, additionalBindings: immutable.IndexedSeq[EvaluatedAdditionalColumnBindings], currentTimestamp: String): String = - super.updateStateSql(entityType, updateTags, additionalBindings, currentTimestamp = "?") + super.updateStateSql(slice, entityType, updateTags, additionalBindings, currentTimestamp = "?") override def selectBucketsSql(entityType: String, minSlice: Int, maxSlice: Int): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType) + val stateTable = durableStateTable(entityType, minSlice) val subQuery = s""" @@ -130,7 +132,7 @@ private[r2dbc] class SqlServerDurableStateDao( s"AND db_timestamp < DATEADD(ms, -${behindCurrentTime.toMillis}, @now)" else "" - val stateTable = settings.getDurableStateTableWithSchema(entityType) + val stateTable = durableStateTable(entityType, minSlice) def maxDbTimestampParamCondition = if (maxDbTimestampParam) s"AND db_timestamp < @until" else "" diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala index 4c574ef5..155e3a06 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala @@ -47,7 +47,7 @@ private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, executorProvi override def log: Logger = SqlServerSnapshotDao.log - override def selectSql(criteria: SnapshotSelectionCriteria): String = { + override def selectSql(slice: Int, criteria: SnapshotSelectionCriteria): String = { val maxSeqNrCondition = if (criteria.maxSequenceNr != Long.MaxValue) " AND seq_nr <= @maxSeqNr" else "" @@ -67,23 +67,23 @@ private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, executorProvi if (settings.querySettings.startFromSnapshotEnabled) sql""" SELECT TOP(1) slice, persistence_id, seq_nr, db_timestamp, write_timestamp, snapshot, ser_id, ser_manifest, tags, meta_payload, meta_ser_id, meta_ser_manifest - FROM $snapshotTable + FROM ${snapshotTable(slice)} WHERE persistence_id = @persistenceId $maxSeqNrCondition $minSeqNrCondition $maxTimestampCondition $minTimestampCondition """ else sql""" SELECT TOP (1) slice, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest - FROM $snapshotTable + FROM ${snapshotTable(slice)} WHERE persistence_id = @persistenceId $maxSeqNrCondition $minSeqNrCondition $maxTimestampCondition $minTimestampCondition """ } - override protected def createUpsertSql: String = { + override protected def upsertSql(slice: Int): String = { if (settings.querySettings.startFromSnapshotEnabled) sql""" - UPDATE $snapshotTable SET + UPDATE ${snapshotTable(slice)} SET seq_nr = @seqNr, db_timestamp = @dbTimestamp, write_timestamp = @writeTimestamp, @@ -96,13 +96,13 @@ private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, executorProvi meta_ser_manifest = @metaSerManifest where persistence_id = @persistenceId if @@ROWCOUNT = 0 - INSERT INTO $snapshotTable + INSERT INTO ${snapshotTable(slice)} (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest, db_timestamp, tags) VALUES (@slice, @entityType, @persistenceId, @seqNr, @writeTimestamp, @snapshot, @serId, @serManifest, @metaPayload, @metaSerId, @metaSerManifest, @dbTimestamp, @tags) """ else sql""" - UPDATE $snapshotTable SET + UPDATE ${snapshotTable(slice)} SET seq_nr = @seqNr, write_timestamp = @writeTimestamp, snapshot = @snapshot, @@ -114,7 +114,7 @@ private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, executorProvi tags = @tags where persistence_id = @persistenceId if @@ROWCOUNT = 0 - INSERT INTO $snapshotTable + INSERT INTO ${snapshotTable(slice)} (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest, tags) VALUES (@slice, @entityType, @persistenceId, @seqNr, @writeTimestamp, @snapshot, @serId, @serManifest, @metaPayload, @metaSerId, @metaSerManifest, @tags) """ @@ -168,13 +168,12 @@ private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, executorProvi } override protected def selectBucketsSql(entityType: String, minSlice: Int, maxSlice: Int): String = { - val stateTable = settings.getDurableStateTableWithSchema(entityType) // group by column alias (bucket) needs a sub query val subQuery = s""" select TOP(@limit) CAST(DATEDIFF(s,'1970-01-01 00:00:00',db_timestamp) AS BIGINT) / 10 AS bucket - FROM $stateTable + FROM ${snapshotTable(minSlice)} WHERE entity_type = @entityType AND ${sliceCondition(minSlice, maxSlice)} AND db_timestamp >= @fromTimestamp AND db_timestamp <= @toTimestamp @@ -199,7 +198,7 @@ private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, executorProvi override protected def snapshotsBySlicesRangeSql(minSlice: Int, maxSlice: Int): String = sql""" SELECT TOP(@bufferSize) slice, persistence_id, seq_nr, db_timestamp, write_timestamp, snapshot, ser_id, ser_manifest, tags, meta_payload, meta_ser_id, meta_ser_manifest - FROM $snapshotTable + FROM ${snapshotTable(minSlice)} WHERE entity_type = @entityType AND ${sliceCondition(minSlice, maxSlice)} AND db_timestamp >= @fromTimestamp diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index c8473758..1836c8cf 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -377,6 +377,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg } } + // FIXME data partitions val customTables = settings.durableStateTableByEntityTypeWithSchema.toList.sortBy(_._1).map(_._2) val tables = settings.durableStateTableWithSchema :: customTables diff --git a/core/src/test/scala/akka/persistence/r2dbc/R2dbcSettingsSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/R2dbcSettingsSpec.scala index e3171233..32d67cf8 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/R2dbcSettingsSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/R2dbcSettingsSpec.scala @@ -27,8 +27,8 @@ class R2dbcSettingsSpec extends AnyWordSpec with TestSuite with Matchers { .withFallback(ConfigFactory.load("application-postgres.conf")) val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) settings.journalTableWithSchema(0) shouldBe "s1.event_journal" - settings.snapshotsTableWithSchema shouldBe "s1.snapshot" - settings.durableStateTableWithSchema shouldBe "s1.durable_state" + settings.snapshotTableWithSchema(0) shouldBe "s1.snapshot" + settings.durableStateTableWithSchema(0) shouldBe "s1.durable_state" // by default connection is configured with options val connectionFactorySettings = postgresConnectionFactorySettings(config) @@ -42,7 +42,6 @@ class R2dbcSettingsSpec extends AnyWordSpec with TestSuite with Matchers { .parseString("akka.persistence.r2dbc.connection-factory.url=whatever-url") .withFallback(ConfigFactory.load("application-postgres.conf")) - val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) val connectionFactorySettings = postgresConnectionFactorySettings(config) connectionFactorySettings shouldBe a[PostgresConnectionFactorySettings] connectionFactorySettings.urlOption shouldBe defined @@ -52,7 +51,6 @@ class R2dbcSettingsSpec extends AnyWordSpec with TestSuite with Matchers { val config = ConfigFactory .parseString("akka.persistence.r2dbc.connection-factory.ssl.mode=VERIFY_FULL") .withFallback(ConfigFactory.load("application-postgres.conf")) - val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) val connectionFactorySettings = postgresConnectionFactorySettings(config) connectionFactorySettings.sslMode shouldBe "VERIFY_FULL" SSLMode.fromValue(connectionFactorySettings.sslMode) shouldBe SSLMode.VERIFY_FULL @@ -62,7 +60,6 @@ class R2dbcSettingsSpec extends AnyWordSpec with TestSuite with Matchers { val config = ConfigFactory .parseString("akka.persistence.r2dbc.connection-factory.ssl.mode=verify-full") .withFallback(ConfigFactory.load("application-postgres.conf")) - val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) val connectionFactorySettings = postgresConnectionFactorySettings(config) connectionFactorySettings.sslMode shouldBe "verify-full" SSLMode.fromValue(connectionFactorySettings.sslMode) shouldBe SSLMode.VERIFY_FULL @@ -115,6 +112,7 @@ class R2dbcSettingsSpec extends AnyWordSpec with TestSuite with Matchers { """) .withFallback(ConfigFactory.load("application-postgres.conf")) val settings = R2dbcSettings(config.getConfig("akka.persistence.r2dbc")) + settings.journalTableWithSchema(slice = 0) shouldBe "s1.event_journal_0" settings.journalTableWithSchema(slice = 17) shouldBe "s1.event_journal_0" settings.journalTableWithSchema(slice = 256) shouldBe "s1.event_journal_1" @@ -123,6 +121,24 @@ class R2dbcSettingsSpec extends AnyWordSpec with TestSuite with Matchers { settings.journalTableWithSchema(slice = 767) shouldBe "s1.event_journal_2" settings.journalTableWithSchema(slice = 768) shouldBe "s1.event_journal_3" settings.journalTableWithSchema(slice = 1023) shouldBe "s1.event_journal_3" + + settings.snapshotTableWithSchema(slice = 0) shouldBe "s1.snapshot_0" + settings.snapshotTableWithSchema(slice = 17) shouldBe "s1.snapshot_0" + settings.snapshotTableWithSchema(slice = 256) shouldBe "s1.snapshot_1" + settings.snapshotTableWithSchema(slice = 511) shouldBe "s1.snapshot_1" + settings.snapshotTableWithSchema(slice = 512) shouldBe "s1.snapshot_2" + settings.snapshotTableWithSchema(slice = 767) shouldBe "s1.snapshot_2" + settings.snapshotTableWithSchema(slice = 768) shouldBe "s1.snapshot_3" + settings.snapshotTableWithSchema(slice = 1023) shouldBe "s1.snapshot_3" + + settings.durableStateTableWithSchema(slice = 0) shouldBe "s1.durable_state_0" + settings.durableStateTableWithSchema(slice = 17) shouldBe "s1.durable_state_0" + settings.durableStateTableWithSchema(slice = 256) shouldBe "s1.durable_state_1" + settings.durableStateTableWithSchema(slice = 511) shouldBe "s1.durable_state_1" + settings.durableStateTableWithSchema(slice = 512) shouldBe "s1.durable_state_2" + settings.durableStateTableWithSchema(slice = 767) shouldBe "s1.durable_state_2" + settings.durableStateTableWithSchema(slice = 768) shouldBe "s1.durable_state_3" + settings.durableStateTableWithSchema(slice = 1023) shouldBe "s1.durable_state_3" } "verify slice range within same data partition" in { diff --git a/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala b/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala index 429e1680..b0c2991d 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala @@ -56,16 +56,18 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite => r2dbcExecutor(minSlice).updateOne("beforeAll delete")(_.createStatement(s"delete from $table")), 10.seconds) } - Await.result( - r2dbcExecutor.updateOne("beforeAll delete")( - _.createStatement(s"delete from ${r2dbcSettings.snapshotsTableWithSchema}")), - 10.seconds) - Await.result( - r2dbcExecutor.updateOne("beforeAll delete")( - _.createStatement(s"delete from ${r2dbcSettings.durableStateTableWithSchema}")), - 10.seconds) + r2dbcSettings.allSnapshotTablesWithSchema.foreach { case (table, minSlice) => + Await.result( + r2dbcExecutor(minSlice).updateOne("beforeAll delete")(_.createStatement(s"delete from $table")), + 10.seconds) + } + r2dbcSettings.allDurableStateTablesWithSchema.foreach { case (table, minSlice) => + Await.result( + r2dbcExecutor(minSlice).updateOne("beforeAll delete")(_.createStatement(s"delete from $table")), + 10.seconds) + } } catch { - case NonFatal(ex) => throw new RuntimeException(s"Test db cleanup failed", ex) + case NonFatal(ex) => throw new RuntimeException(s"Test db creation failed", ex) } super.beforeAll() } diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala index b299dc29..1f049cc6 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala @@ -103,6 +103,8 @@ class CurrentPersistenceIdsQuerySpec } "Durable State persistenceIds" should { + pendingIfMoreThanOneDataPartition() // FIXME + "retrieve all ids" in { val result = store.currentPersistenceIds().runWith(Sink.seq).futureValue result shouldBe pids.map(_.id) diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateBySliceSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateBySliceSpec.scala index ddef5dcb..1f82bc2d 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateBySliceSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateBySliceSpec.scala @@ -20,7 +20,6 @@ import akka.persistence.query.NoOffset import akka.persistence.query.Offset import akka.persistence.query.TimestampOffset import akka.persistence.query.UpdatedDurableState -import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.TestActors import akka.persistence.r2dbc.TestActors.DurableStatePersister.DeleteWithAck import akka.persistence.r2dbc.TestActors.DurableStatePersister.Persist @@ -63,7 +62,6 @@ class DurableStateBySliceSpec import DurableStateBySliceSpec._ override def typedSystem: ActorSystem[_] = system - private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) private val query = DurableStateStoreRegistry(testKit.system) .durableStateStoreFor[R2dbcDurableStateStore[String]](R2dbcDurableStateStore.Identifier) diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala index b62d6c86..cf9ca557 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala @@ -140,6 +140,8 @@ class DurableStateStoreAdditionalColumnSpec exists(s"persistence_id = '$persistenceId' and col3 is null") "The R2DBC durable state store" should { + pendingIfMoreThanOneDataPartition() // FIXME + "save and retrieve a value in custom table with additional columns" in { val entityType = "CustomEntity" val persistenceId = nextPid(entityType)