From 62f7f2340db3d27614db8217478a52e5bd0f2803 Mon Sep 17 00:00:00 2001 From: sebastian-alfers Date: Mon, 5 Feb 2024 16:38:31 +0100 Subject: [PATCH 1/2] feat: migration for SQL Server --- build.sbt | 2 +- docs/src/main/paradox/migration.md | 28 +- .../application-sqlserver-example.conf | 45 ++++ .../test/resources/application-sqlserver.conf | 1 + .../src/test/resources/logback-main.xml | 2 +- .../src/test/resources/logback-test.xml | 24 ++ .../r2dbc/migration/MigrationToolSpec.scala | 239 ++++++++++++------ .../r2dbc/migration/MigrationTool.scala | 8 +- .../r2dbc/migration/MigrationToolDao.scala | 80 +++--- .../migration/SqlServerMigrationToolDao.scala | 69 +++++ project/Dependencies.scala | 6 +- 11 files changed, 380 insertions(+), 124 deletions(-) create mode 100644 migration-tests/src/test/resources/application-sqlserver-example.conf create mode 100644 migration-tests/src/test/resources/application-sqlserver.conf create mode 100644 migration-tests/src/test/resources/logback-test.xml create mode 100644 migration/src/main/scala/akka/persistence/r2dbc/migration/SqlServerMigrationToolDao.scala diff --git a/build.sbt b/build.sbt index 56c4a774..54577bdb 100644 --- a/build.sbt +++ b/build.sbt @@ -160,7 +160,7 @@ lazy val docs = project "scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/${Dependencies.AkkaVersionInDocs}/", "javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/${Dependencies.AkkaVersionInDocs}/", "scaladoc.com.typesafe.config.base_url" -> s"https://lightbend.github.io/config/latest/api/", - "sqlserver.version" -> Dependencies.SqlServerVersion), + "sqlserver.version" -> Dependencies.SqlServerR2dbcVersion), ApidocPlugin.autoImport.apidocRootPackage := "akka", apidocRootPackage := "akka", resolvers += Resolver.jcenterRepo, diff --git a/docs/src/main/paradox/migration.md b/docs/src/main/paradox/migration.md index 1fd36b3b..2836ef38 100644 --- a/docs/src/main/paradox/migration.md +++ b/docs/src/main/paradox/migration.md @@ -38,7 +38,8 @@ re-run the full migration. It's recommended that you create the `migration_progress` table before running the migration tool, but if it doesn't exist the tool will try to create the table. -```sql +Postgres: +: ```sql CREATE TABLE IF NOT EXISTS migration_progress( persistence_id VARCHAR(255) NOT NULL, event_seq_nr BIGINT, @@ -47,6 +48,23 @@ CREATE TABLE IF NOT EXISTS migration_progress( PRIMARY KEY(persistence_id) ``` +SQLServer: +: ```sql +IF object_id('migration_progress') is null + CREATE TABLE migration_progress( + persistence_id NVARCHAR(255) NOT NULL, + event_seq_nr BIGINT, + snapshot_seq_nr BIGINT, + state_revision BIGINT, + PRIMARY KEY(persistence_id) +``` + +@@@ warning { .group-sqlserver } + +The SQL Server dialect is marked `experimental` and not yet production ready until various [issues](https://github.com/akka/akka-persistence-r2dbc/issues?q=is%3Aopen+label%3Asqlserver+label%3Abug) with the integration of the `r2dbc-mssql` plugin have been resolved. + +@@@ + ## Running The migration tool can be run as main class `akka.persistence.r2dbc.migration.MigrationTool` provided by the above @@ -60,9 +78,13 @@ Durable State is not migrated by `MigrationTool.migrateAll`, instead you need to ## Configuration -You need to provide configuration for the source persistence plugin and the target Rd2BC plugin in your `application.conf`. An example of such configuration for migration from Akka Persistence JDBC: +You need to provide configuration for the source persistence plugin and the target Rd2BC plugin in your `application.conf`. An example of such configuration for migration from Akka Persistence JDBC: + +Postgres: +: @@snip [application-postgres.conf](/migration-tests/src/test/resources/application-postgres-example.conf) -@@snip [application-postgres.conf](/migration-tests/src/test/resources/application-postgres.conf) +SQLServer: +: @@snip [application-sqlserver.conf](/migration-tests/src/test/resources/application-sqlserver-example.conf) @@@ note diff --git a/migration-tests/src/test/resources/application-sqlserver-example.conf b/migration-tests/src/test/resources/application-sqlserver-example.conf new file mode 100644 index 00000000..ade91a0b --- /dev/null +++ b/migration-tests/src/test/resources/application-sqlserver-example.conf @@ -0,0 +1,45 @@ +akka.persistence.r2dbc.migration { + source { + query-plugin-id = "jdbc-read-journal" + snapshot-plugin-id = "jdbc-snapshot-store" + } +} + +akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.sqlserver} +akka.persistence.r2dbc.connection-factory { + host = "localhost" + port = 1433 + database = "your_db" + user = "your_user" + password = "your_password" +} + +akka-persistence-jdbc { + shared-databases { + default { + profile = "slick.jdbc.SQLServerProfile$" + db { + url = "jdbc:sqlserver://"127.0.0.1":1433;databaseName=master;integratedSecurity=false;" + user = "user" + password = "password" + driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver" + numThreads = 5 + maxConnections = 5 + minConnections = 1 + } + } + } +} + +jdbc-journal { + use-shared-db = "default" +} +jdbc-snapshot-store { + use-shared-db = "default" +} +jdbc-read-journal { + use-shared-db = "default" +} + +# application specific serializers for events and snapshots +# must also be configured and included in classpath diff --git a/migration-tests/src/test/resources/application-sqlserver.conf b/migration-tests/src/test/resources/application-sqlserver.conf new file mode 100644 index 00000000..524cf3da --- /dev/null +++ b/migration-tests/src/test/resources/application-sqlserver.conf @@ -0,0 +1 @@ +akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.sqlserver} \ No newline at end of file diff --git a/migration-tests/src/test/resources/logback-main.xml b/migration-tests/src/test/resources/logback-main.xml index a3eaf119..4d28c4de 100644 --- a/migration-tests/src/test/resources/logback-main.xml +++ b/migration-tests/src/test/resources/logback-main.xml @@ -18,4 +18,4 @@ - + \ No newline at end of file diff --git a/migration-tests/src/test/resources/logback-test.xml b/migration-tests/src/test/resources/logback-test.xml new file mode 100644 index 00000000..57820d28 --- /dev/null +++ b/migration-tests/src/test/resources/logback-test.xml @@ -0,0 +1,24 @@ + + + + + + [%date{ISO8601}] [%level] [%logger] [%X{akkaAddress}] [%marker] [%thread] - %msg%n + + + + + + + + + + + + + + + + + + diff --git a/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala b/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala index 450b3219..c4cceeff 100644 --- a/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala +++ b/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala @@ -21,25 +21,51 @@ import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike import akka.persistence.r2dbc.TestActors.DurableStatePersister +import akka.persistence.r2dbc.migration.MigrationToolSpec.dialect object MigrationToolSpec { - val config: Config = ConfigFactory - .parseString(""" + + private val testConfig = TestConfig.config + + private val dialect = testConfig.getString("akka.persistence.r2dbc.connection-factory.dialect") + private val dbProfile = if (dialect == "sqlserver") { + """ + default { + profile = "slick.jdbc.SQLServerProfile$" + db { + url = "jdbc:sqlserver://"127.0.0.1":1433;databaseName=master;integratedSecurity=false;" + user = "SA" + password = "" + driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver" + numThreads = 5 + maxConnections = 5 + minConnections = 1 + } + } + """ + } else { + """ + default { + profile = "slick.jdbc.PostgresProfile$" + db { + host = "127.0.0.1" + url = "jdbc:postgresql://127.0.0.1:5432/postgres?reWriteBatchedInserts=true" + user = postgres + password = postgres + driver = "org.postgresql.Driver" + numThreads = 20 + maxConnections = 20 + minConnections = 5 + } + } + """ + } + + private val config: Config = ConfigFactory + .parseString(s""" akka-persistence-jdbc { shared-databases { - default { - profile = "slick.jdbc.PostgresProfile$" - db { - host = "127.0.0.1" - url = "jdbc:postgresql://127.0.0.1:5432/postgres?reWriteBatchedInserts=true" - user = postgres - password = postgres - driver = "org.postgresql.Driver" - numThreads = 20 - maxConnections = 20 - minConnections = 5 - } - } + $dbProfile } } @@ -62,7 +88,8 @@ object MigrationToolSpec { akka.persistence.r2dbc.state.assert-single-writer = off """) - .withFallback(TestConfig.config) + .withFallback(testConfig) + } class MigrationToolSpec @@ -83,10 +110,89 @@ class MigrationToolSpec private val migration = new MigrationTool(system) - private val testEnabled: Boolean = { - // don't run this for Yugabyte since it is using akka-persistence-jdbc - system.settings.config.getString("akka.persistence.r2dbc.connection-factory.dialect") == "postgres" - } + // don't run this for Yugabyte since it is using akka-persistence-jdbc + private val postgresTest = dialect == "postgres" + private val sqlServerTest = dialect == "sqlserver" + private val testEnabled = postgresTest || sqlServerTest + + private val createJournalTablePostgres = + """CREATE TABLE IF NOT EXISTS jdbc_event_journal( + | ordering BIGSERIAL, + | persistence_id VARCHAR(255) NOT NULL, + | sequence_number BIGINT NOT NULL, + | deleted BOOLEAN DEFAULT FALSE NOT NULL, + | + | writer VARCHAR(255) NOT NULL, + | write_timestamp BIGINT, + | adapter_manifest VARCHAR(255), + | + | event_ser_id INTEGER NOT NULL, + | event_ser_manifest VARCHAR(255) NOT NULL, + | event_payload BYTEA NOT NULL, + | + | meta_ser_id INTEGER, + | meta_ser_manifest VARCHAR(255), + | meta_payload BYTEA, + | + | PRIMARY KEY(persistence_id, sequence_number) + |)""".stripMargin + + private val createJournalTableSqlServer = + """IF object_id('jdbc_event_journal') is null + |CREATE TABLE "jdbc_event_journal" ( + | "ordering" BIGINT IDENTITY(1,1) NOT NULL, + | "deleted" BIT DEFAULT 0 NOT NULL, + | "persistence_id" NVARCHAR(255) NOT NULL, + | "sequence_number" NUMERIC(10,0) NOT NULL, + | "writer" NVARCHAR(255) NOT NULL, + | "write_timestamp" BIGINT NOT NULL, + | "adapter_manifest" NVARCHAR(MAX) NOT NULL, + | "event_payload" VARBINARY(MAX) NOT NULL, + | "event_ser_id" INTEGER NOT NULL, + | "event_ser_manifest" NVARCHAR(MAX) NOT NULL, + | "meta_payload" VARBINARY(MAX), + | "meta_ser_id" INTEGER, + | "meta_ser_manifest" NVARCHAR(MAX) + | PRIMARY KEY ("persistence_id", "sequence_number") + |)""".stripMargin + + private val createSnapshotTablePostgres = + """CREATE TABLE IF NOT EXISTS jdbc_snapshot ( + | persistence_id VARCHAR(255) NOT NULL, + | sequence_number BIGINT NOT NULL, + | created BIGINT NOT NULL, + | + | snapshot_ser_id INTEGER NOT NULL, + | snapshot_ser_manifest VARCHAR(255) NOT NULL, + | snapshot_payload BYTEA NOT NULL, + | + | meta_ser_id INTEGER, + | meta_ser_manifest VARCHAR(255), + | meta_payload BYTEA, + | + | PRIMARY KEY(persistence_id, sequence_number) + |)""".stripMargin + + private val createSnapshotTableSqlServer = + """IF object_id('jdbc_snapshot') is null + |CREATE TABLE "jdbc_snapshot" ( + | "persistence_id" NVARCHAR(255) NOT NULL, + | "sequence_number" NUMERIC(10,0) NOT NULL, + | "created" BIGINT NOT NULL, + | "snapshot_ser_id" INTEGER NOT NULL, + | "snapshot_ser_manifest" NVARCHAR(255) NOT NULL, + | "snapshot_payload" VARBINARY(MAX) NOT NULL, + | "meta_ser_id" INTEGER, + | "meta_ser_manifest" NVARCHAR(255), + | "meta_payload" VARBINARY(MAX), + | PRIMARY KEY ("persistence_id", "sequence_number") + | ) + |""".stripMargin + + private val createJournalTableSql = + if (dialect == "sqlserver") createJournalTableSqlServer else createJournalTablePostgres + private val createSnapshotTableSql = + if (dialect == "sqlserver") createSnapshotTableSqlServer else createSnapshotTablePostgres override protected def beforeAll(): Unit = { super.beforeAll() @@ -94,64 +200,33 @@ class MigrationToolSpec if (testEnabled) { Await.result( r2dbcExecutor.executeDdl("beforeAll create jdbc tables") { connection => - connection.createStatement("""CREATE TABLE IF NOT EXISTS jdbc_event_journal( - | ordering BIGSERIAL, - | persistence_id VARCHAR(255) NOT NULL, - | sequence_number BIGINT NOT NULL, - | deleted BOOLEAN DEFAULT FALSE NOT NULL, - | - | writer VARCHAR(255) NOT NULL, - | write_timestamp BIGINT, - | adapter_manifest VARCHAR(255), - | - | event_ser_id INTEGER NOT NULL, - | event_ser_manifest VARCHAR(255) NOT NULL, - | event_payload BYTEA NOT NULL, - | - | meta_ser_id INTEGER, - | meta_ser_manifest VARCHAR(255), - | meta_payload BYTEA, - | - | PRIMARY KEY(persistence_id, sequence_number) - |)""".stripMargin) + connection.createStatement(createJournalTableSql) }, 10.seconds) Await.result( r2dbcExecutor.executeDdl("beforeAll create jdbc tables") { connection => - connection.createStatement("""CREATE TABLE IF NOT EXISTS jdbc_snapshot ( - | persistence_id VARCHAR(255) NOT NULL, - | sequence_number BIGINT NOT NULL, - | created BIGINT NOT NULL, - | - | snapshot_ser_id INTEGER NOT NULL, - | snapshot_ser_manifest VARCHAR(255) NOT NULL, - | snapshot_payload BYTEA NOT NULL, - | - | meta_ser_id INTEGER, - | meta_ser_manifest VARCHAR(255), - | meta_payload BYTEA, - | - | PRIMARY KEY(persistence_id, sequence_number) - |)""".stripMargin) + connection.createStatement(createSnapshotTableSql) }, 10.seconds) - Await.result( - r2dbcExecutor.executeDdl("beforeAll create jdbc tables") { connection => - connection.createStatement("""CREATE TABLE IF NOT EXISTS jdbc_durable_state ( - | global_offset BIGSERIAL, - | persistence_id VARCHAR(255) NOT NULL, - | revision BIGINT NOT NULL, - | state_payload BYTEA NOT NULL, - | state_serial_id INTEGER NOT NULL, - | state_serial_manifest VARCHAR(255), - | tag VARCHAR, - | state_timestamp BIGINT NOT NULL, - | PRIMARY KEY(persistence_id) - |);""".stripMargin) - }, - 10.seconds) + if (postgresTest) { + Await.result( + r2dbcExecutor.executeDdl("beforeAll create jdbc tables") { connection => + connection.createStatement("""CREATE TABLE IF NOT EXISTS jdbc_durable_state ( + | global_offset BIGSERIAL, + | persistence_id VARCHAR(255) NOT NULL, + | revision BIGINT NOT NULL, + | state_payload BYTEA NOT NULL, + | state_serial_id INTEGER NOT NULL, + | state_serial_manifest VARCHAR(255), + | tag VARCHAR, + | state_timestamp BIGINT NOT NULL, + | PRIMARY KEY(persistence_id) + |);""".stripMargin) + }, + 10.seconds) + } Await.result( r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_event_journal")), @@ -159,9 +234,11 @@ class MigrationToolSpec Await.result( r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_snapshot")), 10.seconds) - Await.result( - r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_durable_state")), - 10.seconds) + if (postgresTest) { + Await.result( + r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_durable_state")), + 10.seconds) + } Await.result(migration.migrationDao.createProgressTable(), 10.seconds) Await.result( @@ -211,10 +288,10 @@ class MigrationToolSpec probe.expectMessage(Done) } - "MigrationTool" should { + "MigrationTool Events" should { if (!testEnabled) { info( - s"MigrationToolSpec not enabled for ${system.settings.config.getString("akka.persistence.r2dbc.connection-factory.dialect")}") + s"MigrationToolSpec (Events) not enabled for ${system.settings.config.getString("akka.persistence.r2dbc.connection-factory.dialect")}") pending } @@ -321,6 +398,16 @@ class MigrationToolSpec } } + } + + "MigrationTool State" should { + + if (!postgresTest) { + info( + s"MigrationToolSpec (State) not enabled for ${system.settings.config.getString("akka.persistence.r2dbc.connection-factory.dialect")}") + pending + } + "migrate durable state of one persistenceId" in { val pid = PersistenceId.ofUniqueId(nextPid()) persistDurableState(pid, "s-1") @@ -373,6 +460,6 @@ class MigrationToolSpec assertDurableState(pid, s"s-$pid") } } - } + } diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala index 0ff25b84..fde26282 100644 --- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala +++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala @@ -158,7 +158,13 @@ class MigrationTool(system: ActorSystem[_]) { if (targetR2dbcSettings.dialectName == "h2") { log.error("Migrating to H2 using the migration tool not currently supported") } - private[r2dbc] val migrationDao = new MigrationToolDao(targetExecutorProvider) + //private[r2dbc] val migratioDao = new MigrationToolDao(targetExecutorProvider) + private[r2dbc] val migrationDao = { + targetR2dbcSettings.dialectName match { + case "sqlserver" => new SqlServerMigrationToolDao(targetExecutorProvider) + case _ => new MigrationToolDao(targetExecutorProvider) + } + } private lazy val createProgressTable: Future[Done] = migrationDao.createProgressTable() diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala index 0db12e59..a42230ea 100644 --- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala +++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala @@ -11,11 +11,12 @@ import akka.Done import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts +import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.IdentityAdapter import akka.persistence.r2dbc.internal.codec.QueryAdapter import org.slf4j.LoggerFactory - +import io.r2dbc.spi.Statement import akka.persistence.r2dbc.internal.R2dbcExecutorProvider /** @@ -38,36 +39,51 @@ import akka.persistence.r2dbc.internal.R2dbcExecutorProvider ec: ExecutionContext, system: ActorSystem[_]) { import MigrationToolDao._ - implicit val queryAdapter: QueryAdapter = IdentityAdapter + + protected val settings: R2dbcSettings = executorProvider.settings + import settings.codecSettings.JournalImplicits._ + // progress always in data partition 0 private val r2dbcExecutor = executorProvider.executorFor(slice = 0) + protected def createMigrationProgressTableSql(): String = { + sql""" + CREATE TABLE IF NOT EXISTS migration_progress( + persistence_id VARCHAR(255) NOT NULL, + event_seq_nr BIGINT, + snapshot_seq_nr BIGINT, + state_revision BIGINT, + PRIMARY KEY(persistence_id) + )""" + } + def createProgressTable(): Future[Done] = { r2dbcExecutor.executeDdl("create migration progress table") { connection => - connection.createStatement(sql""" - CREATE TABLE IF NOT EXISTS migration_progress( - persistence_id VARCHAR(255) NOT NULL, - event_seq_nr BIGINT, - snapshot_seq_nr BIGINT, - state_revision BIGINT, - PRIMARY KEY(persistence_id) - )""") + connection.createStatement(createMigrationProgressTableSql()) } } + protected def baseUpsertSql(column: String): String = { + sql""" + INSERT INTO migration_progress + (persistence_id, $column) + VALUES (?, ?) + ON CONFLICT (persistence_id) + DO UPDATE SET + $column = excluded.$column""" + } + + protected def bindBaseUpsertSql(stmt: Statement, persistenceId: String, seqNr: Long): Statement = { + stmt + .bind(0, persistenceId) + .bind(1, seqNr) + } + def updateEventProgress(persistenceId: String, seqNr: Long): Future[Done] = { r2dbcExecutor .updateOne(s"upsert migration progress [$persistenceId]") { connection => - connection - .createStatement(sql""" - INSERT INTO migration_progress - (persistence_id, event_seq_nr) - VALUES (?, ?) - ON CONFLICT (persistence_id) - DO UPDATE SET - event_seq_nr = excluded.event_seq_nr""") - .bind(0, persistenceId) - .bind(1, seqNr) + val stmt = connection.createStatement(baseUpsertSql("event_seq_nr")) + bindBaseUpsertSql(stmt, persistenceId, seqNr) } .map(_ => Done)(ExecutionContexts.parasitic) } @@ -75,16 +91,8 @@ import akka.persistence.r2dbc.internal.R2dbcExecutorProvider def updateSnapshotProgress(persistenceId: String, seqNr: Long): Future[Done] = { r2dbcExecutor .updateOne(s"upsert migration progress [$persistenceId]") { connection => - connection - .createStatement(sql""" - INSERT INTO migration_progress - (persistence_id, snapshot_seq_nr) - VALUES (?, ?) - ON CONFLICT (persistence_id) - DO UPDATE SET - snapshot_seq_nr = excluded.snapshot_seq_nr""") - .bind(0, persistenceId) - .bind(1, seqNr) + val stmt = connection.createStatement(baseUpsertSql("snapshot_seq_nr")) + bindBaseUpsertSql(stmt, persistenceId, seqNr) } .map(_ => Done)(ExecutionContexts.parasitic) } @@ -92,16 +100,8 @@ import akka.persistence.r2dbc.internal.R2dbcExecutorProvider def updateDurableStateProgress(persistenceId: String, revision: Long): Future[Done] = { r2dbcExecutor .updateOne(s"upsert migration progress [$persistenceId]") { connection => - connection - .createStatement(sql""" - INSERT INTO migration_progress - (persistence_id, state_revision) - VALUES (?, ?) - ON CONFLICT (persistence_id) - DO UPDATE SET - state_revision = excluded.state_revision""") - .bind(0, persistenceId) - .bind(1, revision) + val stmt = connection.createStatement(baseUpsertSql("state_revision")) + bindBaseUpsertSql(stmt, persistenceId, revision) } .map(_ => Done)(ExecutionContexts.parasitic) } diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/SqlServerMigrationToolDao.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/SqlServerMigrationToolDao.scala new file mode 100644 index 00000000..f52fbdc3 --- /dev/null +++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/SqlServerMigrationToolDao.scala @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2022 - 2023 Lightbend Inc. + */ + +package akka.persistence.r2dbc.migration + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration + +import io.r2dbc.spi.ConnectionFactory +import io.r2dbc.spi.Statement +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi +import akka.persistence.r2dbc.internal.R2dbcExecutorProvider +import akka.persistence.r2dbc.internal.Sql.Interpolation +import akka.persistence.r2dbc.internal.codec.QueryAdapter +import akka.persistence.r2dbc.internal.codec.SqlServerQueryAdapter + +/** + * INTERNAL API + */ +@InternalApi private[r2dbc] object SqlServerMigrationToolDao { + private val log = LoggerFactory.getLogger(classOf[SqlServerMigrationToolDao]) + +} + +/** + * INTERNAL API + */ +@InternalApi private[r2dbc] class SqlServerMigrationToolDao(executorProvider: R2dbcExecutorProvider)(implicit + ec: ExecutionContext, + system: ActorSystem[_]) + extends MigrationToolDao(executorProvider) { + + override protected def createMigrationProgressTableSql(): String = { + sql""" + IF object_id('migration_progress') is null + CREATE TABLE migration_progress( + persistence_id NVARCHAR(255) NOT NULL, + event_seq_nr BIGINT, + snapshot_seq_nr BIGINT, + state_revision BIGINT, + PRIMARY KEY(persistence_id) + )""" + } + + override def baseUpsertSql(column: String): String = { + sql""" + UPDATE migration_progress SET + $column = @bindColumn + WHERE persistence_id = @persistenceId + if @@ROWCOUNT = 0 + INSERT INTO migration_progress + (persistence_id, $column) + VALUES(@persistenceId, @bindColumn) + + """ + } + + // necessary, otherwise we would need to bind both columns multiple times + override protected def bindBaseUpsertSql(stmt: Statement, persistenceId: String, columnValue: Long): Statement = { + stmt + .bind("@persistenceId", persistenceId) + .bind("@bindColumn", columnValue) + } + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ff7a0e44..7fcdc2ab 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -15,7 +15,8 @@ object Dependencies { val AkkaProjectionVersionInDocs = "current" val H2Version = "2.2.224" val R2dbcH2Version = "1.0.0.RELEASE" - val SqlServerVersion = "1.0.2.RELEASE" + val SqlServerR2dbcVersion = "1.0.2.RELEASE" + val SqlServerJdbcVersion = "7.4.1.jre8" object Compile { val akkaActorTyped = "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion @@ -30,7 +31,7 @@ object Dependencies { val h2 = "com.h2database" % "h2" % H2Version % Provided // EPL 1.0 val r2dbcH2 = "io.r2dbc" % "r2dbc-h2" % R2dbcH2Version % Provided // ApacheV2 - val r2dbcSqlServer = "io.r2dbc" % "r2dbc-mssql" % SqlServerVersion % Provided // ApacheV2 + val r2dbcSqlServer = "io.r2dbc" % "r2dbc-mssql" % SqlServerR2dbcVersion % Provided // ApacheV2 } object TestDeps { @@ -72,6 +73,7 @@ object Dependencies { val migrationTests = Seq( "com.lightbend.akka" %% "akka-persistence-jdbc" % AkkaPersistenceJdbcVersion % Test, + "com.microsoft.sqlserver" % "mssql-jdbc" % SqlServerJdbcVersion % Test, TestDeps.postgresql, TestDeps.logback, TestDeps.scalaTest, From 7e7fd98c71a2f15c69dbd8c849d271e0824f4108 Mon Sep 17 00:00:00 2001 From: sebastian-alfers Date: Mon, 5 Feb 2024 16:45:53 +0100 Subject: [PATCH 2/2] used InterpolationWithAdapter in sql migration dao --- .../migration/SqlServerMigrationToolDao.scala | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/SqlServerMigrationToolDao.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/SqlServerMigrationToolDao.scala index f52fbdc3..4df347d7 100644 --- a/migration/src/main/scala/akka/persistence/r2dbc/migration/SqlServerMigrationToolDao.scala +++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/SqlServerMigrationToolDao.scala @@ -5,26 +5,12 @@ package akka.persistence.r2dbc.migration import scala.concurrent.ExecutionContext -import scala.concurrent.duration.FiniteDuration -import io.r2dbc.spi.ConnectionFactory -import io.r2dbc.spi.Statement -import org.slf4j.Logger -import org.slf4j.LoggerFactory import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.r2dbc.internal.R2dbcExecutorProvider -import akka.persistence.r2dbc.internal.Sql.Interpolation -import akka.persistence.r2dbc.internal.codec.QueryAdapter -import akka.persistence.r2dbc.internal.codec.SqlServerQueryAdapter - -/** - * INTERNAL API - */ -@InternalApi private[r2dbc] object SqlServerMigrationToolDao { - private val log = LoggerFactory.getLogger(classOf[SqlServerMigrationToolDao]) - -} +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter +import io.r2dbc.spi.Statement /** * INTERNAL API @@ -34,6 +20,8 @@ import akka.persistence.r2dbc.internal.codec.SqlServerQueryAdapter system: ActorSystem[_]) extends MigrationToolDao(executorProvider) { + import settings.codecSettings.JournalImplicits._ + override protected def createMigrationProgressTableSql(): String = { sql""" IF object_id('migration_progress') is null