From a3b0573c4b036626f3380f39c038e43571d8063c Mon Sep 17 00:00:00 2001 From: sebastian-alfers Date: Mon, 22 Jan 2024 17:58:01 +0100 Subject: [PATCH] comments from pr --- .../r2dbc/internal/codec/TagsCodec.scala | 2 +- .../r2dbc/internal/codec/TimestampCodec.scala | 9 ++----- .../postgres/PostgresDurableStateDao.scala | 8 +++---- .../postgres/PostgresJournalDao.scala | 6 ++--- .../internal/postgres/PostgresQueryDao.scala | 24 +++++++++---------- .../postgres/PostgresSnapshotDao.scala | 6 ++--- .../sqlserver/SqlServerDurableStateDao.scala | 6 ++--- .../sqlserver/SqlServerJournalDao.scala | 3 ++- .../sqlserver/SqlServerQueryDao.scala | 6 +++-- .../sqlserver/SqlServerSnapshotDao.scala | 2 +- .../r2dbc/journal/PersistTagsSpec.scala | 2 +- .../r2dbc/journal/PersistTimestampSpec.scala | 2 +- 12 files changed, 36 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/codec/TagsCodec.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/codec/TagsCodec.scala index db1f4b71..b3e49abb 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/codec/TagsCodec.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/codec/TagsCodec.scala @@ -78,6 +78,6 @@ import akka.annotation.InternalApi } implicit class TagsCodecRichRow(val row: Row)(implicit codec: TagsCodec) extends AnyRef { - def getTags(column: String = "tags"): Set[String] = codec.getTags(row, column) + def getTags(column: String): Set[String] = codec.getTags(row, column) } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/codec/TimestampCodec.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/codec/TimestampCodec.scala index 5bf1f42c..8dc40ab0 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/codec/TimestampCodec.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/codec/TimestampCodec.scala @@ -23,9 +23,8 @@ import akka.persistence.r2dbc.internal.InstantFactory def decode(row: Row, name: String): Instant def decode(row: Row, index: Int): Instant + // should we name it just `now()`? The type should not be in the name... def instantNow(): Instant = InstantFactory.now() - - def now[T](): T } /** @@ -38,8 +37,6 @@ import akka.persistence.r2dbc.internal.InstantFactory override def decode(row: Row, index: Int): Instant = row.get(index, classOf[Instant]) override def encode(timestamp: Instant): Any = timestamp - - override def now[T](): T = instantNow().asInstanceOf[T] } object PostgresTimestampCodec extends PostgresTimestampCodec @@ -55,8 +52,6 @@ import akka.persistence.r2dbc.internal.InstantFactory override def encode(timestamp: Instant): LocalDateTime = LocalDateTime.ofInstant(timestamp, zone) - override def now[T](): T = LocalDateTime.ofInstant(instantNow(), zone).asInstanceOf[T] - override def decode(row: Row, index: Int): Instant = toInstant(row.get(index, classOf[LocalDateTime])) } @@ -68,6 +63,6 @@ import akka.persistence.r2dbc.internal.InstantFactory def bindTimestamp(index: Int, timestamp: Instant): Statement = statement.bind(index, codec.encode(timestamp)) } implicit class TimestampCodecRichRow[T](val row: Row)(implicit codec: TimestampCodec) extends AnyRef { - def getTimestamp(index: String = "db_timestamp"): Instant = codec.decode(row, index) + def getTimestamp(index: String): Instant = codec.decode(row, index) } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 25a882ae..e3be1619 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -309,7 +309,7 @@ private[r2dbc] class PostgresDurableStateDao( SerializedStateRow( persistenceId = persistenceId, revision = row.get[java.lang.Long]("revision", classOf[java.lang.Long]), - dbTimestamp = row.getTimestamp(), + dbTimestamp = row.getTimestamp("db_timestamp"), readDbTimestamp = Instant.EPOCH, // not needed here payload = getPayload(row), serId = row.get[Integer]("state_ser_id", classOf[Integer]), @@ -671,7 +671,7 @@ private[r2dbc] class PostgresDurableStateDao( r2dbcExecutor .selectOne("select current db timestamp")( connection => connection.createStatement(currentDbTimestampSql), - row => row.getTimestamp()) + row => row.getTimestamp("db_timestamp")) .map { case Some(time) => time case None => throw new IllegalStateException(s"Expected one row for: $currentDbTimestampSql") @@ -730,7 +730,7 @@ private[r2dbc] class PostgresDurableStateDao( SerializedStateRow( persistenceId = row.get("persistence_id", classOf[String]), revision = row.get[java.lang.Long]("revision", classOf[java.lang.Long]), - dbTimestamp = row.getTimestamp(), + dbTimestamp = row.getTimestamp("db_timestamp"), readDbTimestamp = row.getTimestamp("read_db_timestamp"), // payload = null => lazy loaded for backtracking (ugly, but not worth changing UpdatedDurableState in Akka) // payload = None => DeletedDurableState (no lazy loading) @@ -743,7 +743,7 @@ private[r2dbc] class PostgresDurableStateDao( SerializedStateRow( persistenceId = row.get("persistence_id", classOf[String]), revision = row.get[java.lang.Long]("revision", classOf[java.lang.Long]), - dbTimestamp = row.getTimestamp(), + dbTimestamp = row.getTimestamp("db_timestamp"), readDbTimestamp = row.getTimestamp("read_db_timestamp"), payload = getPayload(row), serId = row.get[Integer]("state_ser_id", classOf[Integer]), diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index 78f09304..695f942e 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -170,7 +170,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti val result = r2dbcExecutor.updateOneReturning(s"insert [$persistenceId]")( connection => bindInsertStatement(connection.createStatement(insertSql), events.head, useTimestampFromDb, previousSeqNr), - row => row.getTimestamp()) + row => row.getTimestamp("db_timestamp")) if (log.isDebugEnabled()) result.foreach { _ => log.debug("Wrote [{}] events for persistenceId [{}]", 1, persistenceId) @@ -183,7 +183,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti stmt.add() bindInsertStatement(stmt, write, useTimestampFromDb, previousSeqNr) }, - row => row.getTimestamp()) + row => row.getTimestamp("db_timestamp")) if (log.isDebugEnabled()) result.foreach { _ => log.debug("Wrote [{}] events for persistenceId [{}]", totalEvents, persistenceId) @@ -204,7 +204,7 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti else insertEventWithParameterTimestampSql val stmt = bindInsertStatement(connection.createStatement(insertSql), event, useTimestampFromDb, previousSeqNr) - val result = R2dbcExecutor.updateOneReturningInTx(stmt, row => row.getTimestamp()) + val result = R2dbcExecutor.updateOneReturningInTx(stmt, row => row.getTimestamp("db_timestamp")) if (log.isDebugEnabled()) result.foreach { _ => log.debug("Wrote [{}] event for persistenceId [{}]", 1, persistenceId) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala index 0ce856b1..373a1c01 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala @@ -171,15 +171,13 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory r2dbcExecutor .selectOne("select current db timestamp")( connection => connection.createStatement(currentDbTimestampSql), - row => row.getTimestamp()) + row => row.getTimestamp("db_timestamp")) .map { case Some(time) => time case None => throw new IllegalStateException(s"Expected one row for: $currentDbTimestampSql") } } - //protected def tagsFromDb(row: Row, columnName: String): Set[String] = row.getTags(columnName) - protected def bindEventsBySlicesRangeSql( stmt: Statement, entityType: String, @@ -224,13 +222,13 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory entityType, persistenceId = row.get("persistence_id", classOf[String]), seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]), - dbTimestamp = row.getTimestamp(), + dbTimestamp = row.getTimestamp("db_timestamp"), readDbTimestamp = row.getTimestamp("read_db_timestamp"), payload = None, // lazy loaded for backtracking serId = row.get[Integer]("event_ser_id", classOf[Integer]), serManifest = "", writerUuid = "", // not need in this query - tags = row.getTags(), + tags = row.getTags("tags"), metadata = None) else SerializedJournalRow( @@ -238,13 +236,13 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory entityType, persistenceId = row.get("persistence_id", classOf[String]), seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]), - dbTimestamp = row.getTimestamp(), + dbTimestamp = row.getTimestamp("db_timestamp"), readDbTimestamp = row.getTimestamp("read_db_timestamp"), payload = Some(row.getPayload("event_payload")), serId = row.get[Integer]("event_ser_id", classOf[Integer]), serManifest = row.get("event_ser_manifest", classOf[String]), writerUuid = "", // not need in this query - tags = row.getTags(), + tags = row.getTags("tags"), metadata = readMetadata(row))) if (log.isDebugEnabled) @@ -258,7 +256,7 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory entityType: String, fromTimestamp: Instant, toTimestamp: Instant, - limit: Int): _root_.io.r2dbc.spi.Statement = { + limit: Int): Statement = { stmt .bind(0, entityType) .bindTimestamp(1, fromTimestamp) @@ -313,7 +311,7 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory .createStatement(selectTimestampOfEventSql) .bind(0, persistenceId) .bind(1, seqNr), - row => row.getTimestamp()) + row => row.getTimestamp("db_timestamp")) } override def loadEvent( @@ -338,13 +336,13 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory entityType = row.get("entity_type", classOf[String]), persistenceId, seqNr, - dbTimestamp = row.getTimestamp(), + dbTimestamp = row.getTimestamp("db_timestamp"), readDbTimestamp = row.getTimestamp("read_db_timestamp"), payload, serId = row.get[Integer]("event_ser_id", classOf[Integer]), serManifest = row.get("event_ser_manifest", classOf[String]), writerUuid = "", // not need in this query - tags = row.getTags(), + tags = row.getTags("tags"), metadata = readMetadata(row)) }) @@ -364,13 +362,13 @@ private[r2dbc] class PostgresQueryDao(settings: R2dbcSettings, connectionFactory entityType = row.get("entity_type", classOf[String]), persistenceId = row.get("persistence_id", classOf[String]), seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]), - dbTimestamp = row.getTimestamp(), + dbTimestamp = row.getTimestamp("db_timestamp"), readDbTimestamp = row.getTimestamp("read_db_timestamp"), payload = Some(row.getPayload("event_payload")), serId = row.get[Integer]("event_ser_id", classOf[Integer]), serManifest = row.get("event_ser_manifest", classOf[String]), writerUuid = row.get("writer", classOf[String]), - tags = row.getTags(), + tags = row.getTags("tags"), metadata = readMetadata(row))) if (log.isDebugEnabled) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala index 581315fa..c2c684b1 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala @@ -204,7 +204,7 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, connectionFact // db_timestamp and tags columns were added in 1.2.0 val dbTimestamp = if (settings.querySettings.startFromSnapshotEnabled) - row.getTimestamp() match { + row.getTimestamp("db_timestamp") match { case null => Instant.ofEpochMilli(writeTimestamp) case t => t } @@ -212,7 +212,7 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, connectionFact Instant.ofEpochMilli(writeTimestamp) val tags = if (settings.querySettings.startFromSnapshotEnabled) - row.getTags() + row.getTags("tags") else Set.empty[String] @@ -354,7 +354,7 @@ private[r2dbc] class PostgresSnapshotDao(settings: R2dbcSettings, connectionFact r2dbcExecutor .selectOne("select current db timestamp")( connection => connection.createStatement(currentDbTimestampSql), - row => row.getTimestamp()) + row => row.getTimestamp("db_timestamp")) .map { case Some(time) => time case None => throw new IllegalStateException(s"Expected one row for: $currentDbTimestampSql") diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala index 5e1ddd70..fe6e386d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDurableStateDao.scala @@ -110,7 +110,7 @@ private[r2dbc] class SqlServerDurableStateDao( .bindTimestamp("@fromTimestamp", fromTimestamp) stmt.bind("@limit", settings.querySettings.bufferSize) if (behindCurrentTime > Duration.Zero) { - stmt.bind("@now", timestampCodec.now()) + stmt.bindTimestamp("@now", timestampCodec.instantNow()) } toTimestamp.foreach(until => stmt.bindTimestamp("@until", until)) stmt @@ -154,7 +154,7 @@ private[r2dbc] class SqlServerDurableStateDao( } override protected def bindTimestampNow(stmt: Statement, getAndIncIndex: () => Int): Statement = - stmt.bind(getAndIncIndex(), timestampCodec.now()) + stmt.bindTimestamp(getAndIncIndex(), timestampCodec.instantNow()) override protected def persistenceIdsForEntityTypeAfterSql(table: String): String = sql"SELECT TOP(@limit) persistence_id from $table WHERE persistence_id LIKE @persistenceIdLike AND persistence_id > @after ORDER BY persistence_id" @@ -194,6 +194,6 @@ private[r2dbc] class SqlServerDurableStateDao( .bind("@persistenceId", after) .bind("@limit", limit) - override def currentDbTimestamp(): Future[Instant] = Future.successful(timestampCodec.now()) + override def currentDbTimestamp(): Future[Instant] = Future.successful(timestampCodec.instantNow()) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala index 832832bb..30798b68 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerJournalDao.scala @@ -10,6 +10,7 @@ import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Statement @@ -51,7 +52,7 @@ private[r2dbc] class SqlServerJournalDao(settings: R2dbcSettings, connectionFact VALUES (@slice, @entityType, @persistenceId, @seqNr, @writer, @adapterManifest, @eventSerId, @eventSerManifest, @eventPayload, @tags, @metaSerId, @metaSerManifest, @metaSerPayload, @dbTimestamp)""" override protected def bindTimestampNow(stmt: Statement, getAndIncIndex: () => Int): Statement = - stmt.bind(getAndIncIndex(), timestampCodec.now()) + stmt.bindTimestamp(getAndIncIndex(), timestampCodec.instantNow()) override def insertDeleteMarkerSql(timestamp: String): String = super.insertDeleteMarkerSql("?") } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala index 00db7ecd..68a532fd 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala @@ -17,6 +17,7 @@ import akka.annotation.InternalApi import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.codec.TimestampCodec.SqlServerTimestampCodec import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.internal.postgres.PostgresQueryDao import io.r2dbc.spi.ConnectionFactory @@ -94,7 +95,7 @@ private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, connectionFactor entityType: String, fromTimestamp: Instant, toTimestamp: Instant, - limit: Int): _root_.io.r2dbc.spi.Statement = { + limit: Int): Statement = { stmt .bind("@limit", limit) .bind("@entityType", entityType) @@ -112,7 +113,8 @@ private[r2dbc] class SqlServerQueryDao(settings: R2dbcSettings, connectionFactor def toDbTimestampParamCondition = if (toDbTimestampParam) "AND db_timestamp <= @until" else "" - def localNow: LocalDateTime = timestampCodec.now[LocalDateTime]() + // we know this is a LocalDateTime, so the cast should be ok + def localNow: LocalDateTime = timestampCodec.encode(timestampCodec.instantNow()).asInstanceOf[LocalDateTime] def behindCurrentTimeIntervalCondition = if (behindCurrentTime > Duration.Zero) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala index 166c7bbe..8408466e 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala @@ -205,6 +205,6 @@ private[r2dbc] class SqlServerSnapshotDao(settings: R2dbcSettings, connectionFac ORDER BY db_timestamp, seq_nr """ - override def currentDbTimestamp(): Future[Instant] = Future.successful(timestampCodec.now()) + override def currentDbTimestamp(): Future[Instant] = Future.successful(timestampCodec.instantNow()) } diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala index 50984c4c..914ee1c2 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala @@ -62,7 +62,7 @@ class PersistTagsSpec Row( pid = row.get("persistence_id", classOf[String]), seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]), - row.getTags())) + row.getTags("tags"))) .futureValue rows.foreach { case Row(pid, _, tags) => diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala index 29ae0a61..694a10d2 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala @@ -87,7 +87,7 @@ class PersistTimestampSpec Row( pid = row.get("persistence_id", classOf[String]), seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]), - dbTimestamp = row.getTimestamp(), + dbTimestamp = row.getTimestamp("db_timestamp"), event) }) .futureValue