When used for recovery, pin query to journal dispatcher #880
Conversation
agreed, good to run those on the journal dispatcher
Note this targets
Travis can't download the Jabba installer script. 🤦🏼‍♂️
```diff
@@ -88,6 +92,8 @@ trait CassandraRecovery extends CassandraTagRecovery with TaggedPreparedStatemen
         someReadConsistency,
         someReadRetryPolicy,
         extractor = Extractors.persistentRepr(eventDeserializer, serialization))
+        // run the query on the journal dispatcher (not the queries dispatcher)
+        .withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher))
```
We have to be sure that this isn't just adding an outer asynchronous boundary. Might it be clearer and safer to pass the dispatcher in as a parameter to `queries.eventsByPersistenceId`?
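A rough sketch of what passing the dispatcher in as a parameter could look like (method name, shape and types here are illustrative, not the actual plugin API):

```scala
import akka.NotUsed
import akka.stream.ActorAttributes
import akka.stream.scaladsl.Source

object QuerySketch {
  // Hypothetical: the query method takes the dispatcher name explicitly and
  // pins its own source to it, so callers cannot accidentally move the stage
  // onto another dispatcher by setting attributes further out.
  def eventsForRecovery(events: List[String], dispatcher: String): Source[String, NotUsed] =
    Source(events)
      .withAttributes(ActorAttributes.dispatcher(dispatcher))
}
```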
Force-pushed cc4abdb to 12bbf34.
The job that runs tests on JDK 11 seems to be slow enough that the 50 min timeout in Travis fails the job.
core/src/main/scala/akka/persistence/cassandra/CassandraPluginConfig.scala (outdated review thread, resolved)
```diff
@@ -569,7 +569,8 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config)
       queryPluginConfig.fetchSize,
       None,
       s"currentEventsByPersistenceId-$persistenceId",
-      extractor = Extractors.persistentRepr(eventsByPersistenceIdDeserializer, serialization))
+      extractor = Extractors.persistentRepr(eventsByPersistenceIdDeserializer, serialization),
+      dispatcher = queryPluginConfig.pluginDispatcher)
```
Since the user part of a stream run through the public API queries will run on the default dispatcher, this means an async boundary is introduced in every app that uses the query side. Is that really something we want to do?
I'm not sure I understand the previous comment.
@johanandren this is not changing that, the dispatcher parameter is delegated down to https://github.com/akka/akka-persistence-cassandra/pull/880/files#diff-279ff4d092baf64b28404c59938d5a8f4053dffe99b8679826391980f66a6f24R647
We must have a separate dispatcher at the inner stage because some operations are blocking.
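For background, such a dedicated dispatcher is defined in configuration and referenced by name from `ActorAttributes.dispatcher`; a minimal sketch with illustrative names (the plugin ships its own dispatcher configuration):

```scala
import com.typesafe.config.ConfigFactory

// Illustrative only: a fixed-size thread pool that isolates blocking calls
// from the default dispatcher; it would be referenced by name, e.g.
// ActorAttributes.dispatcher("my-plugin.blocking-dispatcher").
val config = ConfigFactory.parseString("""
  my-plugin.blocking-dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor.fixed-pool-size = 16
    throughput = 1
  }
""")
```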
Yes, so if we run that inner `EventsByPersistenceIdStage` and `mapAsync` stage on an internal dispatcher, the user flow running on the default dispatcher will always introduce an async boundary, since the dispatchers differ.
If that's unavoidable I guess that is fine, as long as it is understood that this is the decision.
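To illustrate the point with a self-contained sketch (the dispatcher name is one that ships with Akka; any non-default dispatcher shows the same effect):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorAttributes
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem("boundary-demo")

// The source is pinned to a non-default dispatcher while map and sink run on
// the default dispatcher, so the stages cannot be fused into one island and
// an async boundary is inserted between them.
Source(1 to 10)
  .withAttributes(ActorAttributes.dispatcher("akka.actor.default-blocking-io-dispatcher"))
  .map(_ * 2)
  .runWith(Sink.ignore)
```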
That's intended, because otherwise we would propagate the blocking out to user responsibility (and it would end up on the default-dispatcher). That isn't changed by this PR, or did I miss something?
See #870
That same problem goes for this entire PR though. So it wouldn't be more "blindly" than the current changes?
I tried to keep things as small as possible.
I only extracted a hardcoded dispatcher name and set the appropriate value in 5 places (CassandraJournal, EventsByTagMigration and CassandraRecovery (x3)).
I got the impression you were suggesting a full review of the whole codebase; that's a different order of magnitude.
We should trust that Akka Streams does the right thing, so additional automated tests shouldn't be needed here, but since we know there were problems with setting the attributes, a manual verification with println is what I was suggesting. I can do that before approving this PR.
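A sketch of that kind of manual verification, independent of the plugin code: the dispatcher's name appears in stream thread names, so a println at the stage in question shows where it actually runs:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorAttributes
import akka.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem("verify")

// Prints e.g. "2 on verify-akka.actor.default-blocking-io-dispatcher-7",
// confirming which dispatcher the map stage was scheduled on.
Source(1 to 3)
  .map { n => println(s"$n on ${Thread.currentThread().getName}"); n }
  .withAttributes(ActorAttributes.dispatcher("akka.actor.default-blocking-io-dispatcher"))
  .runWith(Sink.ignore)
```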
Actually, after reviewing this a bit more in detail I see how tricky it is because of the `futureSource` bug: setting the dispatcher needs to be repeated in lots of places.
Perhaps it's even preferable to set the dispatcher where you'd expect to set it (outermost) so that this is fixed once that is fixed in Akka?
Force-pushed 2343699 to d76e066.
```diff
-      extractor = Extractors.sequenceNumber(eventDeserializer, serialization))
+      extractor = Extractors.sequenceNumber(eventDeserializer, serialization),
+      // run the query on the journal dispatcher (not the queries dispatcher)
+      dispatcher = sessionSettings.pluginDispatcher)
       .map(_.sequenceNr)
       .runWith(Sink.headOption)
```
Here the `.map` and the sink are running on the default dispatcher, so one async boundary is added that we probably do not want, since it is an entirely internal stream.
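One way to keep a fully internal stream like this on a single dispatcher is to set the attribute on the completed graph, so the `.map` and the sink share the island with the source; a sketch with stand-in stages:

```scala
import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.ActorAttributes
import akka.stream.scaladsl.{ Keep, Sink, Source }

implicit val system: ActorSystem = ActorSystem("internal-stream")

// Stand-ins for the sequence-number query, its .map(_.sequenceNr) and
// Sink.headOption; the attribute on the whole RunnableGraph keeps every
// stage on one dispatcher instead of only pinning the inner source.
val highest: Future[Option[Long]] =
  Source(List(1L, 5L, 9L))
    .map(identity)
    .toMat(Sink.headOption)(Keep.right)
    .withAttributes(ActorAttributes.dispatcher("akka.actor.default-blocking-io-dispatcher"))
    .run()
```

As the `futureSource` discussion in this thread notes, attribute propagation is buggy in the affected Akka versions, so in practice the attribute may still need to be repeated on the inner stream.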
```diff
-      extractor = Extractors.taggedPersistentRepr(eventDeserializer, serialization))
+      extractor = Extractors.taggedPersistentRepr(eventDeserializer, serialization),
+      // run the query on the journal dispatcher (not the queries dispatcher)
+      dispatcher = sessionSettings.pluginDispatcher)
       .mapAsync(1)(sendMissingTagWrite(tp, tagWrites.get))
```
Here `mapAsync`, the "outer" `map` and `runForeach` run on the default dispatcher; we probably do not want that since it is an entirely internal stream.
This is one case where the bug that makes the `futureSource` operator not propagate attributes like it should means the dispatcher needs to be set on both the complete "inner" stream and the "outer" stream.
This one can be very important: it could cause a performance regression. We should fix that here or in a follow-up before releasing.
Ok, we have an urgent need to fix this now and release. So let's follow up on the outer dispatcher things. Created issue #886 for follow up.
LGTM, given that we follow up on the async boundary introduction, especially the completely internal streams for recovery etc. that could live entirely on the same dispatcher/stream island and have now gotten an async boundary/two actors.
I agree; for the internal usage, recovery, we have to fix it before releasing. It could cause a performance regression otherwise.
I'll forward port this first PR to
Recovering an entity should not be clogged by the load on the read-side.
When using a query for recovery, it should run on the journal dispatcher, not the read dispatcher.