-
Notifications
You must be signed in to change notification settings - Fork 135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use plugin-dispatcher all the way for internal eventsByPersistenceId, #886 #888
Conversation
} | ||
.withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher)) | ||
.runWith(Sink.ignore) | ||
.map(_ => ())(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have only looked at the recovery usage of eventsByPersistenceId so far. With these changes it's all on the plugin dispatcher, which it wasn't before.
Wonder if the inner ActorAttributes.dispatcher in CassandraReadJournal will still create an unwanted async boundary, even though same dispatcher? I'll check.
I have to verify with Akka 2.6 also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wonder if the inner ActorAttributes.dispatcher in CassandraReadJournal will still create an unwanted async boundary, even though same dispatcher?
That's how it's documented anyway, so we would have to get rid of that if we are looking for optimal (and then maybe the attribute bugs will make that impossible).
I think I'll leave that part of the investigation to the 1.0 (master) version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The concern with the read side was if the user runs their side on a different dispatcher and get two async boundaries, but I think I was wrong about that, since the operators with unspecified dispatcher would "inherit" the one the user selected, so it would only be the one between the plugin dispatcher and "the rest of the stream".
println(s"# asyncReplayMessages replayCallback ${Thread.currentThread().getName}") // FIXME | ||
replayCallback(p) | ||
} | ||
.withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want the Sink.ignore on the same dispatcher as well, so .to(Sink.ignore)
and then the dispatcher attribute for the whole graph
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, thanks
} | ||
.withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher)) | ||
.runWith(Sink.ignore) | ||
.map(_ => ())(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The concern with the read side was if the user runs their side on a different dispatcher and get two async boundaries, but I think I was wrong about that, since the operators with unspecified dispatcher would "inherit" the one the user selected, so it would only be the one between the plugin dispatcher and "the rest of the stream".
@johanandren I changed a few more. Could you take another look? (modulo the println that I'll remove) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Travis complains: |
I have also verified this with an old sample with Akka 2.6.x. I think this is enough, at least for releasing 0.107. I'll remove the println now. |
ad3ce37
to
e0d1676
Compare
merging anyway because it was only on of the jobs that failed and others were successful |
References #886