Skip to content

Commit

Permalink
DurableStateStoreAdditionalColumnSpec
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Feb 5, 2024
1 parent c6a9207 commit 20d72eb
Showing 1 changed file with 68 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,78 +66,99 @@ class DurableStateStoreAdditionalColumnSpec
with TestData
with LogCapturing {

private val customTable = r2dbcSettings.getDurableStateTableWithSchema("CustomEntity")
private def customTable(slice: Int) = r2dbcSettings.getDurableStateTableWithSchema("CustomEntity", slice)

val (createCustomTable, alterCustomTable) = if (r2dbcSettings.dialectName == "sqlserver") {
val create =
s"IF object_id('$customTable') is null SELECT * INTO $customTable FROM durable_state where persistence_id = ''"
val alter = (col: String, colType: String) => {
s"IF COL_LENGTH('$customTable', '$col') IS NULL ALTER TABLE $customTable ADD $col $colType"
}
(create, alter)
} else {
val create = s"create table if not exists $customTable as select * from durable_state where persistence_id = ''"
val alter = (col: String, colType: String) => s"alter table $customTable add if not exists $col $colType"
(create, alter)
private def createCustomTable(slice: Int): String = {
if (r2dbcSettings.dialectName == "sqlserver")
s"IF object_id('${customTable(slice)}') is null SELECT * INTO ${customTable(slice)} FROM ${r2dbcSettings.durableStateTableWithSchema(slice)} where persistence_id = ''"
else
s"create table if not exists ${customTable(slice)} as select * from ${r2dbcSettings.durableStateTableWithSchema(slice)} where persistence_id = ''"
}

private def alterCustomTable(slice: Int, col: String, colType: String): String = {
if (r2dbcSettings.dialectName == "sqlserver")
s"IF COL_LENGTH('${customTable(slice)}', '$col') IS NULL ALTER TABLE ${customTable(slice)} ADD $col $colType"
else
s"alter table ${customTable(slice)} add if not exists $col $colType"
}

override def typedSystem: ActorSystem[_] = system

override def beforeAll(): Unit = {
super.beforeAll()
Await.result(
r2dbcExecutor.executeDdl("beforeAll create durable_state_test")(_.createStatement(createCustomTable)),
20.seconds)
Await.result(
r2dbcExecutor.executeDdl("beforeAll alter durable_state_test")(
_.createStatement(alterCustomTable("col1", "varchar(256)"))),
20.seconds)
Await.result(
r2dbcExecutor.executeDdl("beforeAll alter durable_state_test")(
_.createStatement(alterCustomTable("col2", "int"))),
20.seconds)
Await.result(
r2dbcExecutor.executeDdl("beforeAll alter durable_state_test")(
_.createStatement(alterCustomTable("col3", "int"))),
20.seconds)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete from $customTable")),
10.seconds)

r2dbcSettings.dataPartitionSliceRanges.foreach { sliceRange =>
val dataPartitionSlice = sliceRange.min
Await.result(
r2dbcExecutor(dataPartitionSlice).executeDdl("beforeAll create durable_state_test")(
_.createStatement(createCustomTable(dataPartitionSlice))),
20.seconds)
Await.result(
r2dbcExecutor(dataPartitionSlice).executeDdl("beforeAll alter durable_state_test")(
_.createStatement(alterCustomTable(dataPartitionSlice, "col1", "varchar(256)"))),
20.seconds)
Await.result(
r2dbcExecutor(dataPartitionSlice).executeDdl("beforeAll alter durable_state_test")(
_.createStatement(alterCustomTable(dataPartitionSlice, "col2", "int"))),
20.seconds)
Await.result(
r2dbcExecutor(dataPartitionSlice).executeDdl("beforeAll alter durable_state_test")(
_.createStatement(alterCustomTable(dataPartitionSlice, "col3", "int"))),
20.seconds)
Await.result(
r2dbcExecutor(dataPartitionSlice).updateOne("beforeAll delete")(
_.createStatement(s"delete from ${customTable(dataPartitionSlice)}")),
10.seconds)
}
}

private val store = DurableStateStoreRegistry(testKit.system)
.durableStateStoreFor[R2dbcDurableStateStore[String]](R2dbcDurableStateStore.Identifier)

private val unusedTag = "n/a"

private def exists(whereCondition: String): Boolean =
r2dbcExecutor
private def exists(slice: Int, whereCondition: String): Boolean =
r2dbcExecutor(slice)
.selectOne("count")(
_.createStatement(s"select count(*) from $customTable where $whereCondition"),
_.createStatement(s"select count(*) from ${customTable(slice)} where $whereCondition"),
row => row.get(0, classOf[lang.Long]).longValue())
.futureValue
.contains(1)

private def existsInCustomTable(persistenceId: String): Boolean =
exists(s"persistence_id = '$persistenceId'")
private def existsInCustomTable(persistenceId: String): Boolean = {
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
exists(slice, s"persistence_id = '$persistenceId'")
}

private def existsMatchingCol1(persistenceId: String, columnValue: String): Boolean =
exists(s"persistence_id = '$persistenceId' and col1 = '$columnValue'")
private def existsMatchingCol1(persistenceId: String, columnValue: String): Boolean = {
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
exists(slice, s"persistence_id = '$persistenceId' and col1 = '$columnValue'")
}

private def existsMatchingCol2(persistenceId: String, columnValue: Int): Boolean =
exists(s"persistence_id = '$persistenceId' and col2 = $columnValue")
private def existsMatchingCol2(persistenceId: String, columnValue: Int): Boolean = {
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
exists(slice, s"persistence_id = '$persistenceId' and col2 = $columnValue")
}

private def existsMatchingCol3(persistenceId: String, columnValue: Int): Boolean =
exists(s"persistence_id = '$persistenceId' and col3 = $columnValue")
private def existsMatchingCol3(persistenceId: String, columnValue: Int): Boolean = {
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
exists(slice, s"persistence_id = '$persistenceId' and col3 = $columnValue")
}

private def existsCol1IsNull(persistenceId: String): Boolean =
exists(s"persistence_id = '$persistenceId' and col1 is null")
private def existsCol1IsNull(persistenceId: String): Boolean = {
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
exists(slice, s"persistence_id = '$persistenceId' and col1 is null")
}

private def existsCol2IsNull(persistenceId: String): Boolean =
exists(s"persistence_id = '$persistenceId' and col2 is null")
private def existsCol2IsNull(persistenceId: String): Boolean = {
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
exists(slice, s"persistence_id = '$persistenceId' and col2 is null")
}

private def existsCol3IsNull(persistenceId: String): Boolean =
exists(s"persistence_id = '$persistenceId' and col3 is null")
private def existsCol3IsNull(persistenceId: String): Boolean = {
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
exists(slice, s"persistence_id = '$persistenceId' and col3 is null")
}

"The R2DBC durable state store" should {

Expand Down

0 comments on commit 20d72eb

Please sign in to comment.