-
Notifications
You must be signed in to change notification settings - Fork 723
Cancel pending in/outbound consumers in EmbeddedChannelCore upon channel close #3464
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cancel pending in/outbound consumers in EmbeddedChannelCore upon channel close #3464
Conversation
…on channel close.
### Motivation:
Currently, pending consumer closures remain in the `{in}{out}boundBufferConsumer` queues of `EmbeddedChannelCore` even after the channel closes. Also, it is possible to enqueue consumers *after* the channel closes. In these cases, the consumer closures will never be invoked and this can lead to unfavourable behaviour, as observed in `NIOAsyncTestingChannel`'s `waitFor{In}{Out}boundWrite` methods (the only place these queues are currently used).
`NIOAsyncTestingChannel`'s `waitFor{In}{Out}boundWrite` methods complete a continuation *inside* the consumer closure. In the cases described above, the continuation never completes and therefore `waitFor{In}{Out}boundWrite` never returns.
### Modifications:
- Updated the element type in `EmbeddedChannelCore`'s `{in}{out}boundBufferConsumer` from `(NIOAny) -> Void` to `(Result<NIOAny, Error>) -> Void`.
- This is so that the `.failure` case can be used to notify the consumer closure that the channel has closed.
- Changed the visibility of the `{in}{out}boundBufferConsumer` properties from `internal` to `private` in order to prevent the queues from being accessed and being appended to without the call-site considering whether the channel has been closed.
- Added new methods named `enqueue{In}{Out}boundBufferConsumer(_:)` which take the consumer closure as an argument and only append to the corresponding queue if the channel isn't closed. If the channel is closed, the consumer closure is invoked immediately with `.failure(ChannelError.ioOnClosedChannel)`.
- Updated `EmbeddedChannelCore`'s `close0` method to empty out all pending closures in `{in}{out}boundBufferConsumer` and return a `.failure(ChannelError.ioOnClosedChannel)` result to each closure.
- Updated `NIOAsyncTestingChannel`'s `waitFor{In}{Out}boundWrite` to throw an error in the continuation upon receiving a `.failure` result.
- Added associated test cases.
### Result:
`EmbeddedChannelCore`'s `{in}{out}boundBufferConsumer` queues can be used more safely and all pending closures are invoked upon channel close. As a result, `NIOAsyncTestingChannel`'s `waitFor{In}{Out}boundWrite` no longer indefinitely blocks when the channel closes.
josephnoir
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Sources/NIOEmbedded/Embedded.swift
Outdated
| /// Enqueue a consumer closure that will be invoked upon the next pending inbound write. | ||
| /// - Parameter newElement: The consumer closure to enqueue. Returns a `.failure` result if the channel has already | ||
| /// closed. | ||
| func enqueueInboundBufferConsumer(_ newElement: @escaping (Result<NIOAny, Error>) -> Void) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: we typically annotate methods only supposed to be called on the EL with a leading underscore, and I recommend making this private if it can be.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a leading underscore to both methods (enqueue{In}{Out}boundBufferConsumer(_:)).
Those methods are defined in EmbeddedChannelCore and are also called from NIOAsyncTestingChannel, so unfortunately, they cannot be made private.
Motivation:
Currently, pending consumer closures remain in the
{in}{out}boundBufferConsumerqueues ofEmbeddedChannelCoreeven after the channel closes. It is also possible to enqueue consumers after the channel closes. In these cases, the consumer closures will never be invoked and this can lead to unfavourable behaviour, as observed inNIOAsyncTestingChannel'swaitFor{In}{Out}boundWritemethods (the only places these queues are currently used).NIOAsyncTestingChannel'swaitFor{In}{Out}boundWritemethods complete a continuation inside the consumer closure. In the cases described above, the continuation never completes and thereforewaitFor{In}{Out}boundWritenever returns.Modifications:
EmbeddedChannelCore's{in}{out}boundBufferConsumerfrom(NIOAny) -> Voidto(Result<NIOAny, Error>) -> Void..failurecase can be used to notify the consumer closure that the channel has closed.{in}{out}boundBufferConsumerproperties frominternaltoprivatein order to prevent the queues from being accessed and being appended to without the call site considering whether the channel has been closed.internalmethods namedenqueue{In}{Out}boundBufferConsumer(_:)which take the consumer closure as an argument and only append to the corresponding queue if the channel isn't closed..failure(ChannelError.ioOnClosedChannel).EmbeddedChannelCore'sclose0method to return a.failure(ChannelError.ioOnClosedChannel)result to each closure in{in}{out}boundBufferConsumerand empty both buffers.NIOAsyncTestingChannel'swaitFor{In}{Out}boundWriteto throw an error in the continuation upon receiving a.failureresult.Result:
EmbeddedChannelCore's{in}{out}boundBufferConsumerqueues can be used more safely: all pending closures will be invoked upon channel close. As a result,NIOAsyncTestingChannel'swaitFor{In}{Out}boundWriteno longer indefinitely blocks when the channel closes.