Skip to content

Commit

Permalink
Use plugin-dispatcher all the way for internal eventsByPersistenceId, #…
Browse files Browse the repository at this point in the history
…886

* and a few more
  • Loading branch information
patriknw committed Apr 21, 2021
1 parent b7dd238 commit 406facd
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.persistence.cassandra.journal
import java.lang.{ Long => JLong }
import java.nio.ByteBuffer
import java.util.{ UUID, HashMap => JHMap, Map => JMap }

import akka.Done
import akka.actor.SupervisorStrategy.Stop
import akka.actor.ActorRef
Expand Down Expand Up @@ -38,14 +39,16 @@ import com.datastax.driver.core.policies.RetryPolicy.RetryDecision
import com.datastax.driver.core.policies.{ LoggingRetryPolicy, RetryPolicy }
import com.datastax.driver.core.utils.{ Bytes, UUIDs }
import com.typesafe.config.Config

import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.collection.immutable.Seq
import scala.concurrent._
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }

import akka.stream.ActorAttributes
import akka.stream.scaladsl.Keep

/**
* Journal implementation of the cassandra plugin.
* Inheritance is possible but without any guarantees for future source compatibility.
Expand Down Expand Up @@ -641,11 +644,13 @@ class CassandraJournal(cfg: Config)
// run the query on the journal dispatcher (not the queries dispatcher)
dispatcher = sessionSettings.pluginDispatcher)
.map(_.sequenceNr)
.runWith(Sink.headOption)
.toMat(Sink.headOption)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher))
.run()
.map {
case Some(sequenceNr) => sequenceNr
case None => fromSequenceNr
}
}(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
}

private[akka] def asyncFindHighestSequenceNr(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import akka.persistence.cassandra.journal.TagWriters.TagWrite
import akka.persistence.cassandra.query.EventsByPersistenceIdStage.{ Extractors, TaggedPersistentRepr }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.OptionVal

import scala.concurrent._

import akka.stream.ActorAttributes
import akka.stream.scaladsl.Keep

trait CassandraRecovery extends CassandraTagRecovery with TaggedPreparedStatements {
this: CassandraJournal =>

Expand Down Expand Up @@ -74,9 +76,18 @@ trait CassandraRecovery extends CassandraTagRecovery with TaggedPreparedStatemen
dispatcher = sessionSettings.pluginDispatcher)
.mapAsync(1)(sendMissingTagWrite(tp, tagWrites.get))
}))
.map(te => queries.mapEvent(te.pr))
.runForeach(replayCallback)
.map(_ => ())
.map { te =>
println(s"# asyncReplayMessages mapEvent ${Thread.currentThread().getName}") // FIXME
queries.mapEvent(te.pr)
}
.map { p =>
println(s"# asyncReplayMessages replayCallback ${Thread.currentThread().getName}") // FIXME
replayCallback(p)
}
.toMat(Sink.ignore)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher))
.run()
.map(_ => ())(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)

} else {
queries
Expand All @@ -93,9 +104,18 @@ trait CassandraRecovery extends CassandraTagRecovery with TaggedPreparedStatemen
extractor = Extractors.persistentRepr(eventDeserializer, serialization),
// run the query on the journal dispatcher (not the queries dispatcher)
dispatcher = sessionSettings.pluginDispatcher)
.map(p => queries.mapEvent(p.persistentRepr))
.runForeach(replayCallback)
.map(_ => ())
.map { p =>
println(s"# asyncReplayMessages mapEvent ${Thread.currentThread().getName}") // FIXME
queries.mapEvent(p.persistentRepr)
}
.map { p =>
println(s"# asyncReplayMessages replayCallback ${Thread.currentThread().getName}") // FIXME
replayCallback(p)
}
.toMat(Sink.ignore)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher))
.run()
.map(_ => ())(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
}
}

Expand Down Expand Up @@ -133,7 +153,9 @@ trait CassandraRecovery extends CassandraTagRecovery with TaggedPreparedStatemen
case OptionVal.None => FutureDone // no tags, skip
}
}
.runWith(Sink.ignore)
.toMat(Sink.ignore)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher))
.run()
} else {
log.debug(
"[{}] Recovery is starting before the latest tag writes tag progress. Min progress [{}]. From sequence nr of recovery: [{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ import akka.util.OptionVal
executeStatement(selectDeletedToQuery.bind(persistenceId)).map(r =>
Option(r.one()).map(_.getLong("deleted_to")).getOrElse(0))

private def executeStatement(statement: Statement)(implicit ec: ExecutionContext): Future[ResultSet] =
private def executeStatement(statement: Statement)(implicit ec: ExecutionContext): Future[ResultSet] = {
println(s"# EventsByPersistenceIdStage: ${Thread.currentThread().getName}") // FIXME
session.executeAsync(withCustom(statement)).asScala
}

private def withCustom(statement: Statement): Statement = {
customConsistencyLevel.foreach(statement.setConsistencyLevel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config)
fastForwardEnabled: Boolean = false,
dispatcher: String): Source[T, Future[EventsByPersistenceIdStage.Control]] = {

println(s"# eventsByPersistenceId call on ${Thread.currentThread().getName}") // FIXME

val deserializeEventAsync = queryPluginConfig.deserializationParallelism > 1

createFutureSource(combinedEventsByPersistenceIdStmts) { (s, c) =>
Expand All @@ -642,6 +644,7 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config)
fastForwardEnabled))
.named(name)
}.mapAsync(queryPluginConfig.deserializationParallelism) { row =>
println(s"# eventsByPersistenceId deserialization mapAsync ${Thread.currentThread().getName}") // FIXME
extractor.extract(row, deserializeEventAsync)
}
.withAttributes(ActorAttributes.dispatcher(dispatcher))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.control.NonFatal

import akka.Done
import akka.actor._
import akka.annotation.InternalApi
Expand All @@ -28,7 +29,9 @@ import akka.serialization.AsyncSerializer
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import akka.serialization.Serializers
import akka.stream.ActorAttributes
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import com.datastax.driver.core._
import com.datastax.driver.core.policies.LoggingRetryPolicy
Expand Down Expand Up @@ -257,10 +260,14 @@ class CassandraSnapshotStore(cfg: Config)
SnapshotMetadata(row.getString("persistence_id"), row.getLong("sequence_nr"), row.getLong("timestamp")))
.dropWhile(_.timestamp > criteria.maxTimestamp)

limit match {
case Some(n) => source.take(n.toLong).runWith(Sink.seq)
case None => source.runWith(Sink.seq)
val limitedSource = limit match {
case Some(n) => source.take(n.toLong)
case None => source
}
limitedSource
.toMat(Sink.seq)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(snapshotConfig.sessionSettings.pluginDispatcher))
.run()
}

}
Expand Down

0 comments on commit 406facd

Please sign in to comment.