Skip to content

Commit

Permalink
comments from pr
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastian-alfers committed Jan 24, 2024
1 parent a3b0573 commit 288206b
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ package akka.persistence.r2dbc.internal.codec

import java.time.Instant
import java.time.LocalDateTime
import java.util.TimeZone
import java.time.ZoneId

import io.r2dbc.spi.Row
import io.r2dbc.spi.Statement

import akka.annotation.InternalApi
import akka.persistence.r2dbc.internal.InstantFactory

/**
* INTERNAL API
Expand All @@ -24,7 +23,7 @@ import akka.persistence.r2dbc.internal.InstantFactory
def decode(row: Row, index: Int): Instant

// should we name it just `now()`? The type should not be in the name...
def instantNow(): Instant = InstantFactory.now()
//def instantNow(): Instant = InstantFactory.now()
}

/**
Expand All @@ -43,7 +42,7 @@ import akka.persistence.r2dbc.internal.InstantFactory
case object SqlServerTimestampCodec extends TimestampCodec {

// should this come from config?
private val zone = TimeZone.getTimeZone("UTC").toZoneId
private val zone = ZoneId.of("UTC")

private def toInstant(timestamp: LocalDateTime) =
timestamp.atZone(zone).toInstant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import akka.persistence.r2dbc.internal.InstantFactory

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -110,7 +112,7 @@ private[r2dbc] class SqlServerDurableStateDao(
.bindTimestamp("@fromTimestamp", fromTimestamp)
stmt.bind("@limit", settings.querySettings.bufferSize)
if (behindCurrentTime > Duration.Zero) {
stmt.bindTimestamp("@now", timestampCodec.instantNow())
stmt.bindTimestamp("@now", InstantFactory.now())
}
toTimestamp.foreach(until => stmt.bindTimestamp("@until", until))
stmt
Expand Down Expand Up @@ -154,7 +156,7 @@ private[r2dbc] class SqlServerDurableStateDao(
}

override protected def bindTimestampNow(stmt: Statement, getAndIncIndex: () => Int): Statement =
stmt.bindTimestamp(getAndIncIndex(), timestampCodec.instantNow())
stmt.bindTimestamp(getAndIncIndex(), InstantFactory.now())

override protected def persistenceIdsForEntityTypeAfterSql(table: String): String =
sql"SELECT TOP(@limit) persistence_id from $table WHERE persistence_id LIKE @persistenceIdLike AND persistence_id > @after ORDER BY persistence_id"
Expand Down Expand Up @@ -194,6 +196,6 @@ private[r2dbc] class SqlServerDurableStateDao(
.bind("@persistenceId", after)
.bind("@limit", limit)

override def currentDbTimestamp(): Future[Instant] = Future.successful(timestampCodec.instantNow())
override def currentDbTimestamp(): Future[Instant] = Future.successful(InstantFactory.now())

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import akka.persistence.r2dbc.internal.InstantFactory

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -52,7 +54,7 @@ private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, connectionFact
VALUES (@slice, @entityType, @persistenceId, @seqNr, @writer, @adapterManifest, @eventSerId, @eventSerManifest, @eventPayload, @tags, @metaSerId, @metaSerManifest, @metaSerPayload, @dbTimestamp)"""

override protected def bindTimestampNow(stmt: Statement, getAndIncIndex: () => Int): Statement =
stmt.bindTimestamp(getAndIncIndex(), timestampCodec.instantNow())
stmt.bindTimestamp(getAndIncIndex(), InstantFactory.now())

override def insertDeleteMarkerSql(timestamp: String): String = super.insertDeleteMarkerSql("?")
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package akka.persistence.r2dbc.internal.sqlserver

import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneId
import java.util.Date.UTC

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand Down Expand Up @@ -113,12 +115,9 @@ private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, connectionFactor
def toDbTimestampParamCondition =
if (toDbTimestampParam) "AND db_timestamp <= @until" else ""

// we know this is a LocalDateTime, so the cast should be ok
def localNow: LocalDateTime = timestampCodec.encode(timestampCodec.instantNow()).asInstanceOf[LocalDateTime]

def behindCurrentTimeIntervalCondition =
if (behindCurrentTime > Duration.Zero)
s"AND db_timestamp < DATEADD(ms, -${behindCurrentTime.toMillis}, CAST('$localNow' as datetime2(6)))"
s"AND db_timestamp < DATEADD(ms, -${behindCurrentTime.toMillis}, SYSUTCDATETIME())"
else ""

val selectColumns = {
Expand Down Expand Up @@ -202,6 +201,6 @@ private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, connectionFactor
override protected val allPersistenceIdsSql: String =
sql"SELECT TOP(@limit) persistence_id FROM (SELECT DISTINCT(persistence_id) from $journalTable) as sub ORDER BY persistence_id"

override def currentDbTimestamp(): Future[Instant] = Future.successful(timestampCodec.instantNow())
override def currentDbTimestamp(): Future[Instant] = Future.successful(InstantFactory.now())

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import akka.persistence.r2dbc.internal.InstantFactory

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -205,6 +207,6 @@ private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, connectionFac
ORDER BY db_timestamp, seq_nr
"""

override def currentDbTimestamp(): Future[Instant] = Future.successful(timestampCodec.instantNow())
override def currentDbTimestamp(): Future[Instant] = Future.successful(InstantFactory.now())

}

0 comments on commit 288206b

Please sign in to comment.