Skip to content

Commit

Permalink
Use plugin-dispatcher all the way for internal eventsByPersistenceId, #…
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Apr 22, 2021
1 parent b7dd238 commit 4ce9ae1
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.persistence.cassandra.journal
import java.lang.{ Long => JLong }
import java.nio.ByteBuffer
import java.util.{ UUID, HashMap => JHMap, Map => JMap }

import akka.Done
import akka.actor.SupervisorStrategy.Stop
import akka.actor.ActorRef
Expand Down Expand Up @@ -38,14 +39,16 @@ import com.datastax.driver.core.policies.RetryPolicy.RetryDecision
import com.datastax.driver.core.policies.{ LoggingRetryPolicy, RetryPolicy }
import com.datastax.driver.core.utils.{ Bytes, UUIDs }
import com.typesafe.config.Config

import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.collection.immutable.Seq
import scala.concurrent._
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }

import akka.stream.ActorAttributes
import akka.stream.scaladsl.Keep

/**
* Journal implementation of the cassandra plugin.
* Inheritance is possible but without any guarantees for future source compatibility.
Expand Down Expand Up @@ -641,11 +644,13 @@ class CassandraJournal(cfg: Config)
// run the query on the journal dispatcher (not the queries dispatcher)
dispatcher = sessionSettings.pluginDispatcher)
.map(_.sequenceNr)
.runWith(Sink.headOption)
.toMat(Sink.headOption)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher))
.run()
.map {
case Some(sequenceNr) => sequenceNr
case None => fromSequenceNr
}
}(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
}

private[akka] def asyncFindHighestSequenceNr(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import akka.persistence.cassandra.journal.TagWriters.TagWrite
import akka.persistence.cassandra.query.EventsByPersistenceIdStage.{ Extractors, TaggedPersistentRepr }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.OptionVal

import scala.concurrent._

import akka.stream.ActorAttributes
import akka.stream.scaladsl.Keep

trait CassandraRecovery extends CassandraTagRecovery with TaggedPreparedStatements {
this: CassandraJournal =>

Expand Down Expand Up @@ -75,8 +77,11 @@ trait CassandraRecovery extends CassandraTagRecovery with TaggedPreparedStatemen
.mapAsync(1)(sendMissingTagWrite(tp, tagWrites.get))
}))
.map(te => queries.mapEvent(te.pr))
.runForeach(replayCallback)
.map(_ => ())
.map(replayCallback)
.toMat(Sink.ignore)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher))
.run()
.map(_ => ())(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)

} else {
queries
Expand All @@ -94,8 +99,11 @@ trait CassandraRecovery extends CassandraTagRecovery with TaggedPreparedStatemen
// run the query on the journal dispatcher (not the queries dispatcher)
dispatcher = sessionSettings.pluginDispatcher)
.map(p => queries.mapEvent(p.persistentRepr))
.runForeach(replayCallback)
.map(_ => ())
.map(replayCallback)
.toMat(Sink.ignore)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher))
.run()
.map(_ => ())(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
}
}

Expand Down Expand Up @@ -133,7 +141,9 @@ trait CassandraRecovery extends CassandraTagRecovery with TaggedPreparedStatemen
case OptionVal.None => FutureDone // no tags, skip
}
}
.runWith(Sink.ignore)
.toMat(Sink.ignore)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher))
.run()
} else {
log.debug(
"[{}] Recovery is starting before the latest tag writes tag progress. Min progress [{}]. From sequence nr of recovery: [{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.control.NonFatal

import akka.Done
import akka.actor._
import akka.annotation.InternalApi
Expand All @@ -28,7 +29,9 @@ import akka.serialization.AsyncSerializer
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import akka.serialization.Serializers
import akka.stream.ActorAttributes
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import com.datastax.driver.core._
import com.datastax.driver.core.policies.LoggingRetryPolicy
Expand Down Expand Up @@ -257,10 +260,14 @@ class CassandraSnapshotStore(cfg: Config)
SnapshotMetadata(row.getString("persistence_id"), row.getLong("sequence_nr"), row.getLong("timestamp")))
.dropWhile(_.timestamp > criteria.maxTimestamp)

limit match {
case Some(n) => source.take(n.toLong).runWith(Sink.seq)
case None => source.runWith(Sink.seq)
val limitedSource = limit match {
case Some(n) => source.take(n.toLong)
case None => source
}
limitedSource
.toMat(Sink.seq)(Keep.right)
.withAttributes(ActorAttributes.dispatcher(snapshotConfig.sessionSettings.pluginDispatcher))
.run()
}

}
Expand Down

0 comments on commit 4ce9ae1

Please sign in to comment.