Skip to content

Commit

Permalink
fix: Don't call change handler if optimistic lock fails (#484)
Browse files Browse the repository at this point in the history
* the check and throw is outside the transaction
* I don't see the need for rollback, though
  • Loading branch information
patriknw authored Dec 7, 2023
1 parent f79bb14 commit 2a39adc
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection
r2dbcExecutor.withConnection(s"insert [${state.persistenceId}] with change handler") { connection =>
for {
updatedRows <- recoverDataIntegrityViolation(R2dbcExecutor.updateOneInTx(insertStatement(connection)))
_ <- processChange(handler, connection, change)
_ <- if (updatedRows == 1) processChange(handler, connection, change) else FutureDone
} yield updatedRows
}
}
Expand Down Expand Up @@ -412,7 +412,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection
r2dbcExecutor.withConnection(s"update [${state.persistenceId}] with change handler") { connection =>
for {
updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection))
_ <- processChange(handler, connection, change)
_ <- if (updatedRows == 1) processChange(handler, connection, change) else FutureDone
} yield updatedRows
}
}
Expand Down Expand Up @@ -492,7 +492,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection
R2dbcExecutor.updateOneInTx(insertDeleteMarkerStatement(connection)))
_ <- changeHandler match {
case None => FutureDone
case Some(handler) => processChange(handler, connection, change)
case Some(handler) => if (updatedRows == 1) processChange(handler, connection, change) else FutureDone
}
} yield updatedRows
}
Expand Down Expand Up @@ -539,7 +539,7 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection
updatedRows <- R2dbcExecutor.updateOneInTx(updateStatement(connection))
_ <- changeHandler match {
case None => FutureDone
case Some(handler) => processChange(handler, connection, change)
case Some(handler) => if (updatedRows == 1) processChange(handler, connection, change) else FutureDone
}
} yield updatedRows
}
Expand Down Expand Up @@ -575,8 +575,11 @@ private[r2dbc] class PostgresDurableStateDao(settings: R2dbcSettings, connection
_ <- changeHandler match {
case None => FutureDone
case Some(handler) =>
val change = new DeletedDurableState[Any](persistenceId, 0L, NoOffset, EmptyDbTimestamp.toEpochMilli)
processChange(handler, connection, change)
if (updatedRows == 1) {
val change = new DeletedDurableState[Any](persistenceId, 0L, NoOffset, EmptyDbTimestamp.toEpochMilli)
processChange(handler, connection, change)
} else
FutureDone
}
} yield updatedRows
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,15 @@ class DurableStateStoreChangeHandlerSpec
private val unusedTag = "n/a"

private def exists(whereCondition: String): Boolean =
count(whereCondition) >= 1

private def count(whereCondition: String): Long =
r2dbcExecutor
.selectOne("count")(
_.createStatement(s"select count(*) from $anotherTable where $whereCondition"),
row => row.get(0, classOf[java.lang.Long]).longValue())
.futureValue
.contains(1)
.getOrElse(0L)

"The R2DBC durable state store change handler" should {

Expand Down Expand Up @@ -180,6 +183,23 @@ class DurableStateStoreChangeHandlerSpec
exists(s"pid = '$persistenceId' and rev = 2") should be(false)
}

"not be invoked when wrong revision" in {
val entityType = "CustomEntity"
val persistenceId = nextPid(entityType)
val value = "Genuinely Collaborative"
store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
count(s"pid = '$persistenceId'") should be(1L)

store.upsertObject(persistenceId, 1L, value, unusedTag).failed.futureValue
count(s"pid = '$persistenceId'") should be(1L) // not called (or rolled back)

val updatedValue = "Open to Feedback"
store.upsertObject(persistenceId, 2L, updatedValue, unusedTag).futureValue
count(s"pid = '$persistenceId'") should be(2L)
store.upsertObject(persistenceId, 2L, updatedValue, unusedTag).failed.futureValue
count(s"pid = '$persistenceId'") should be(2L) // not called (or rolled back)
}

"support javadsl.ChangeHandler" in {
val entityType = "JavadslCustomEntity"
val persistenceId = nextPid(entityType)
Expand Down

0 comments on commit 2a39adc

Please sign in to comment.