From c8e67a63f60d5ed08f18f5adf8f013487d2e0dfb Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 8 Feb 2024 13:49:42 +0100 Subject: [PATCH] remove durableStateTable in dao --- .../postgres/PostgresDurableStateDao.scala | 15 ++++++--------- .../sqlserver/SqlServerDurableStateDao.scala | 6 +++--- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 214fb1de..efcaf7b0 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -107,18 +107,15 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv } } - protected def durableStateTable(entityType: String, slice: Int): String = - settings.getDurableStateTableWithSchema(entityType, slice) - protected def selectStateSql(slice: Int, entityType: String): String = { - val stateTable = durableStateTable(entityType, slice) + val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) sql""" SELECT revision, state_ser_id, state_ser_manifest, state_payload, db_timestamp FROM $stateTable WHERE persistence_id = ?""" } protected def selectBucketsSql(entityType: String, minSlice: Int, maxSlice: Int): String = { - val stateTable = durableStateTable(entityType, minSlice) + val stateTable = settings.getDurableStateTableWithSchema(entityType, minSlice) sql""" SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) AS count FROM $stateTable @@ -136,7 +133,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv slice: Int, entityType: String, additionalBindings: immutable.IndexedSeq[EvaluatedAdditionalColumnBindings]): String = { - val stateTable = durableStateTable(entityType, slice) + val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) val additionalCols = additionalInsertColumns(additionalBindings) val additionalParams = additionalInsertParameters(additionalBindings) sql""" @@ -183,7 +180,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv additionalBindings: immutable.IndexedSeq[EvaluatedAdditionalColumnBindings], currentTimestamp: String = "CURRENT_TIMESTAMP"): String = { - val stateTable = durableStateTable(entityType, slice) + val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) val timestamp = if (settings.dbTimestampMonotonicIncreasing) @@ -223,7 +220,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv } protected def hardDeleteStateSql(entityType: String, slice: Int): String = { - val stateTable = durableStateTable(entityType, slice) + val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) sql"DELETE from $stateTable WHERE persistence_id = ?" } @@ -267,7 +264,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv minSlice: Int, maxSlice: Int): String = { - val stateTable = durableStateTable(entityType, minSlice) + val stateTable = settings.getDurableStateTableWithSchema(entityType, minSlice) def maxDbTimestampParamCondition = if (maxDbTimestampParam) s"AND db_timestamp < ?" else "" diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala index 1fd58512..71ac6108 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala @@ -48,7 +48,7 @@ private[r2dbc] class SqlServerDurableStateDao(executorProvider: R2dbcExecutorPro slice: Int, entityType: String, additionalBindings: immutable.IndexedSeq[EvaluatedAdditionalColumnBindings]): String = { - val stateTable = durableStateTable(entityType, slice) + val stateTable = settings.getDurableStateTableWithSchema(entityType, slice) val additionalCols = additionalInsertColumns(additionalBindings) val additionalParams = additionalInsertParameters(additionalBindings) sql""" @@ -69,7 +69,7 @@ private[r2dbc] class SqlServerDurableStateDao(executorProvider: R2dbcExecutorPro super.updateStateSql(slice, entityType, updateTags, additionalBindings, currentTimestamp = "?") override def selectBucketsSql(entityType: String, minSlice: Int, maxSlice: Int): String = { - val stateTable = durableStateTable(entityType, minSlice) + val stateTable = settings.getDurableStateTableWithSchema(entityType, minSlice) val subQuery = s""" @@ -126,7 +126,7 @@ private[r2dbc] class SqlServerDurableStateDao(executorProvider: R2dbcExecutorPro s"AND db_timestamp < DATEADD(ms, -${behindCurrentTime.toMillis}, @now)" else "" - val stateTable = durableStateTable(entityType, minSlice) + val stateTable = settings.getDurableStateTableWithSchema(entityType, minSlice) def maxDbTimestampParamCondition = if (maxDbTimestampParam) s"AND db_timestamp < @until" else ""