Skip to content

Commit

Permalink
remove durableStateTable in dao
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Feb 9, 2024
1 parent 83d4957 commit c8e67a6
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,15 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv
}
}

protected def durableStateTable(entityType: String, slice: Int): String =
settings.getDurableStateTableWithSchema(entityType, slice)

protected def selectStateSql(slice: Int, entityType: String): String = {
val stateTable = durableStateTable(entityType, slice)
val stateTable = settings.getDurableStateTableWithSchema(entityType, slice)
sql"""
SELECT revision, state_ser_id, state_ser_manifest, state_payload, db_timestamp
FROM $stateTable WHERE persistence_id = ?"""
}

protected def selectBucketsSql(entityType: String, minSlice: Int, maxSlice: Int): String = {
val stateTable = durableStateTable(entityType, minSlice)
val stateTable = settings.getDurableStateTableWithSchema(entityType, minSlice)
sql"""
SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) AS count
FROM $stateTable
Expand All @@ -136,7 +133,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv
slice: Int,
entityType: String,
additionalBindings: immutable.IndexedSeq[EvaluatedAdditionalColumnBindings]): String = {
val stateTable = durableStateTable(entityType, slice)
val stateTable = settings.getDurableStateTableWithSchema(entityType, slice)
val additionalCols = additionalInsertColumns(additionalBindings)
val additionalParams = additionalInsertParameters(additionalBindings)
sql"""
Expand Down Expand Up @@ -183,7 +180,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv
additionalBindings: immutable.IndexedSeq[EvaluatedAdditionalColumnBindings],
currentTimestamp: String = "CURRENT_TIMESTAMP"): String = {

val stateTable = durableStateTable(entityType, slice)
val stateTable = settings.getDurableStateTableWithSchema(entityType, slice)

val timestamp =
if (settings.dbTimestampMonotonicIncreasing)
Expand Down Expand Up @@ -223,7 +220,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv
}

protected def hardDeleteStateSql(entityType: String, slice: Int): String = {
val stateTable = durableStateTable(entityType, slice)
val stateTable = settings.getDurableStateTableWithSchema(entityType, slice)
sql"DELETE from $stateTable WHERE persistence_id = ?"
}

Expand Down Expand Up @@ -267,7 +264,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv
minSlice: Int,
maxSlice: Int): String = {

val stateTable = durableStateTable(entityType, minSlice)
val stateTable = settings.getDurableStateTableWithSchema(entityType, minSlice)

def maxDbTimestampParamCondition =
if (maxDbTimestampParam) s"AND db_timestamp < ?" else ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private[r2dbc] class SqlServerDurableStateDao(executorProvider: R2dbcExecutorPro
slice: Int,
entityType: String,
additionalBindings: immutable.IndexedSeq[EvaluatedAdditionalColumnBindings]): String = {
val stateTable = durableStateTable(entityType, slice)
val stateTable = settings.getDurableStateTableWithSchema(entityType, slice)
val additionalCols = additionalInsertColumns(additionalBindings)
val additionalParams = additionalInsertParameters(additionalBindings)
sql"""
Expand All @@ -69,7 +69,7 @@ private[r2dbc] class SqlServerDurableStateDao(executorProvider: R2dbcExecutorPro
super.updateStateSql(slice, entityType, updateTags, additionalBindings, currentTimestamp = "?")

override def selectBucketsSql(entityType: String, minSlice: Int, maxSlice: Int): String = {
val stateTable = durableStateTable(entityType, minSlice)
val stateTable = settings.getDurableStateTableWithSchema(entityType, minSlice)

val subQuery =
s"""
Expand Down Expand Up @@ -126,7 +126,7 @@ private[r2dbc] class SqlServerDurableStateDao(executorProvider: R2dbcExecutorPro
s"AND db_timestamp < DATEADD(ms, -${behindCurrentTime.toMillis}, @now)"
else ""

val stateTable = durableStateTable(entityType, minSlice)
val stateTable = settings.getDurableStateTableWithSchema(entityType, minSlice)

def maxDbTimestampParamCondition =
if (maxDbTimestampParam) s"AND db_timestamp < @until" else ""
Expand Down

0 comments on commit c8e67a6

Please sign in to comment.