Skip to content

Commit

Permalink
chore: Dao ExecutionContext in R2dbcExecutorProvider (#518)
Browse files Browse the repository at this point in the history
* H2 requires blocking dispatcher, which was lost in the data partitions refactoring
* reduce number of parameters of dao creation since everything needed is included in
  R2dbcExecutorProvider
  • Loading branch information
patriknw authored Feb 5, 2024
1 parent ba3dfbf commit 74274a1
Show file tree
Hide file tree
Showing 29 changed files with 227 additions and 276 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,13 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf
private val settings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath))

private val executorProvider =
new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass))
private val stateDao = settings.connectionFactorySettings.dialect.createDurableStateDao(settings, executorProvider)
new R2dbcExecutorProvider(
system,
settings.connectionFactorySettings.dialect.daoExecutionContext(settings, system),
settings,
sharedConfigPath + ".connection-factory",
LoggerFactory.getLogger(getClass))
private val stateDao = settings.connectionFactorySettings.dialect.createDurableStateDao(executorProvider)

/**
* Delete the state related to one single `persistenceId`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,14 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf
private val settings = R2dbcSettings(system.settings.config.getConfig(sharedConfigPath))

private val executorProvider =
new R2dbcExecutorProvider(settings, sharedConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass))
private val journalDao = settings.connectionFactorySettings.dialect.createJournalDao(settings, executorProvider)
private val snapshotDao = settings.connectionFactorySettings.dialect.createSnapshotDao(settings, executorProvider)
new R2dbcExecutorProvider(
system,
settings.connectionFactorySettings.dialect.daoExecutionContext(settings, system),
settings,
sharedConfigPath + ".connection-factory",
LoggerFactory.getLogger(getClass))
private val journalDao = settings.connectionFactorySettings.dialect.createJournalDao(executorProvider)
private val snapshotDao = settings.connectionFactorySettings.dialect.createSnapshotDao(executorProvider)

/**
* Delete all events before a sequenceNr for the given persistence id. Snapshots are not deleted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,15 @@ private[r2dbc] trait Dialect {
*/
def adaptSettings(settings: R2dbcSettings): R2dbcSettings = settings

def daoExecutionContext(settings: R2dbcSettings, system: ActorSystem[_]): ExecutionContext

def createConnectionFactory(config: Config): ConnectionFactory

def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): JournalDao
def createJournalDao(executorProvider: R2dbcExecutorProvider): JournalDao

def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): QueryDao
def createQueryDao(executorProvider: R2dbcExecutorProvider): QueryDao

def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): SnapshotDao
def createSnapshotDao(executorProvider: R2dbcExecutorProvider): SnapshotDao

def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): DurableStateDao
def createDurableStateDao(executorProvider: R2dbcExecutorProvider): DurableStateDao
}
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,11 @@ class R2dbcExecutor(
* INTERNAL API
*/
@InternalStableApi class R2dbcExecutorProvider(
val system: ActorSystem[_],
val ec: ExecutionContext,
val settings: R2dbcSettings,
connectionFactoryBaseConfigPath: String,
log: Logger)(implicit ec: ExecutionContext, system: ActorSystem[_]) {
log: Logger) {
private val connectionFactoryProvider = ConnectionFactoryProvider(system)
private var cache = IntMap.empty[R2dbcExecutor]

Expand All @@ -409,7 +411,7 @@ class R2dbcExecutor(
connectionFactory,
log,
settings.logDbCallsExceeding,
settings.connectionFactorySettings.poolSettings.closeCallsExceeding)
settings.connectionFactorySettings.poolSettings.closeCallsExceeding)(ec, system)
// it's just a cache so no need for guarding concurrent updates or visibility
cache = cache.updated(slice, executor)
executor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,28 +87,24 @@ private[r2dbc] object H2Dialect extends Dialect {
new H2ConnectionFactory(h2Config)
}

override def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): JournalDao =
new H2JournalDao(settings, executorProvider)(ecForDaos(system, settings), system)

override def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): SnapshotDao =
new H2SnapshotDao(settings, executorProvider)(ecForDaos(system, settings), system)

override def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): QueryDao =
new H2QueryDao(settings, executorProvider)(ecForDaos(system, settings), system)

override def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): DurableStateDao =
new H2DurableStateDao(settings, executorProvider, this)(ecForDaos(system, settings), system)

private def ecForDaos(system: ActorSystem[_], settings: R2dbcSettings): ExecutionContext = {
override def daoExecutionContext(settings: R2dbcSettings, system: ActorSystem[_]): ExecutionContext = {
// H2 R2DBC driver blocks in surprising places (Mono.toFuture in stmt.execute().asFuture())
system.dispatchers.lookup(
DispatcherSelector.fromConfig(settings.connectionFactorySettings.config.getString("use-dispatcher")))
}

override def createJournalDao(executorProvider: R2dbcExecutorProvider): JournalDao =
new H2JournalDao(executorProvider)

override def createSnapshotDao(executorProvider: R2dbcExecutorProvider): SnapshotDao =
new H2SnapshotDao(executorProvider)

override def createQueryDao(executorProvider: R2dbcExecutorProvider): QueryDao =
new H2QueryDao(executorProvider)

override def createDurableStateDao(executorProvider: R2dbcExecutorProvider): DurableStateDao =
new H2DurableStateDao(executorProvider, this)

private def dbSchema(config: Config, createSliceIndexes: Boolean, additionalInit: String): String = {
def optionalConfString(name: String): Option[String] = {
val s = config.getString(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@

package akka.persistence.r2dbc.internal.h2

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

import org.slf4j.Logger
import org.slf4j.LoggerFactory

import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.Dialect
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao
Expand All @@ -22,11 +19,8 @@ import akka.persistence.r2dbc.internal.postgres.PostgresDurableStateDao
* INTERNAL API
*/
@InternalApi
private[r2dbc] final class H2DurableStateDao(
settings: R2dbcSettings,
executorProvider: R2dbcExecutorProvider,
dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_])
extends PostgresDurableStateDao(settings, executorProvider, dialect) {
private[r2dbc] final class H2DurableStateDao(executorProvider: R2dbcExecutorProvider, dialect: Dialect)
extends PostgresDurableStateDao(executorProvider, dialect) {

override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2DurableStateDao])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,37 @@

package akka.persistence.r2dbc.internal.h2

import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.JournalDao
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao
import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Instant

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import io.r2dbc.spi.Connection
import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.r2dbc.internal.JournalDao
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.postgres.PostgresJournalDao

/**
* INTERNAL API
*/
@InternalApi
private[r2dbc] class H2JournalDao(journalSettings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresJournalDao(journalSettings, executorProvider) {
private[r2dbc] class H2JournalDao(executorProvider: R2dbcExecutorProvider)
extends PostgresJournalDao(executorProvider) {
import settings.codecSettings.JournalImplicits._

import JournalDao.SerializedJournalRow
import journalSettings.codecSettings.JournalImplicits._
override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2JournalDao])
// always app timestamp (db is same process) monotonic increasing
require(journalSettings.useAppTimestamp)
require(journalSettings.dbTimestampMonotonicIncreasing)
require(settings.useAppTimestamp)
require(settings.dbTimestampMonotonicIncreasing)

private def insertSql(slice: Int) = sql"INSERT INTO ${journalTable(slice)} " +
"(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp) " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,22 @@

package akka.persistence.r2dbc.internal.h2

import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.postgres.PostgresQueryDao
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Row
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

import org.slf4j.Logger
import org.slf4j.LoggerFactory

import akka.annotation.InternalApi
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.postgres.PostgresQueryDao

/**
* INTERNAL API
*/
@InternalApi
private[r2dbc] class H2QueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresQueryDao(settings, executorProvider) {
private[r2dbc] class H2QueryDao(executorProvider: R2dbcExecutorProvider) extends PostgresQueryDao(executorProvider) {
import settings.codecSettings.JournalImplicits._
override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2QueryDao])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,20 @@

package akka.persistence.r2dbc.internal.h2

import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.postgres.PostgresSnapshotDao
import io.r2dbc.spi.ConnectionFactory
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext

import io.r2dbc.spi.Row

import akka.annotation.InternalApi
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.postgres.PostgresSnapshotDao

/**
* INTERNAL API
*/
@InternalApi
private[r2dbc] final class H2SnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
ec: ExecutionContext,
system: ActorSystem[_])
extends PostgresSnapshotDao(settings, executorProvider) {
private[r2dbc] final class H2SnapshotDao(executorProvider: R2dbcExecutorProvider)
extends PostgresSnapshotDao(executorProvider) {
import settings.codecSettings.SnapshotImplicits._

override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2SnapshotDao])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.persistence.r2dbc.internal.postgres
import java.time.{ Duration => JDuration }
import java.util.Locale

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

import akka.actor.typed.ActorSystem
Expand Down Expand Up @@ -117,19 +118,18 @@ private[r2dbc] object PostgresDialect extends Dialect {
ConnectionFactories.get(builder.build())
}

override def createJournalDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): JournalDao =
new PostgresJournalDao(settings, executorProvider)(system.executionContext, system)
override def daoExecutionContext(settings: R2dbcSettings, system: ActorSystem[_]): ExecutionContext =
system.executionContext

override def createSnapshotDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): SnapshotDao =
new PostgresSnapshotDao(settings, executorProvider)(system.executionContext, system)
override def createJournalDao(executorProvider: R2dbcExecutorProvider): JournalDao =
new PostgresJournalDao(executorProvider)

override def createQueryDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): QueryDao =
new PostgresQueryDao(settings, executorProvider)(system.executionContext, system)
override def createSnapshotDao(executorProvider: R2dbcExecutorProvider): SnapshotDao =
new PostgresSnapshotDao(executorProvider)

override def createDurableStateDao(settings: R2dbcSettings, executorProvider: R2dbcExecutorProvider)(implicit
system: ActorSystem[_]): DurableStateDao =
new PostgresDurableStateDao(settings, executorProvider, this)(system.executionContext, system)
override def createQueryDao(executorProvider: R2dbcExecutorProvider): QueryDao =
new PostgresQueryDao(executorProvider)

override def createDurableStateDao(executorProvider: R2dbcExecutorProvider): DurableStateDao =
new PostgresDurableStateDao(executorProvider, this)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.R2dbcDataIntegrityViolationException
import io.r2dbc.spi.Row
import io.r2dbc.spi.Statement
Expand Down Expand Up @@ -79,11 +78,11 @@ private[r2dbc] object PostgresDurableStateDao {
* INTERNAL API
*/
@InternalApi
private[r2dbc] class PostgresDurableStateDao(
settings: R2dbcSettings,
executorProvider: R2dbcExecutorProvider,
dialect: Dialect)(implicit ec: ExecutionContext, system: ActorSystem[_])
private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProvider, dialect: Dialect)
extends DurableStateDao {
protected val settings: R2dbcSettings = executorProvider.settings
protected val system: ActorSystem[_] = executorProvider.system
implicit protected val ec: ExecutionContext = executorProvider.ec
import DurableStateDao._
import PostgresDurableStateDao._
import settings.codecSettings.DurableStateImplicits._
Expand All @@ -93,7 +92,7 @@ private[r2dbc] class PostgresDurableStateDao(
protected val r2dbcExecutor = executorProvider.executorFor(slice = 0) // FIXME support data partitions

// used for change events
private lazy val journalDao: JournalDao = dialect.createJournalDao(settings, executorProvider)
private lazy val journalDao: JournalDao = dialect.createJournalDao(executorProvider)

private lazy val additionalColumns: Map[String, immutable.IndexedSeq[AdditionalColumn[Any, Any]]] = {
settings.durableStateAdditionalColumnClasses.map { case (entityType, columnClasses) =>
Expand Down Expand Up @@ -487,7 +486,7 @@ private[r2dbc] class PostgresDurableStateDao(
handler: ChangeHandler[Any],
connection: Connection,
change: DurableStateChange[Any]): Future[Done] = {
val session = new R2dbcSession(connection)
val session = new R2dbcSession(connection)(ec, system)

def excMessage(cause: Throwable): String = {
val (changeType, revision) = change match {
Expand Down
Loading

0 comments on commit 74274a1

Please sign in to comment.