Skip to content

Commit

Permalink
Clean up Sendability for ChannelInvoker (#2955)
Browse files Browse the repository at this point in the history
Motivation:

The ChannelInvoker protocols are an awkward beast. They aren't really
something that people can do generic programming against. Instead, they
were designed to do API sharing. Of course, they didn't do that very
well, and the strict concurrency checking world has revealed this.

Much of the API surface on ChannelInvoker is confused. There are
NIOAnys, which aren't Sendable. We allow sending user events without
requiring Sendable. And our two main conforming types are
ChannelPipeline and ChannelHandlerContext, two types with wildly
differing thread-safety semantics.

This PR aims to clean that up.

Modifications:

- Deprecated all API surface on ChannelInvoker protocols that uses
NIOAny.

    ChannelInvoker has to be assumed to be a cross-thread protocol,
    and that requires that it only use Sendable types. NIOAny isn't,
    so these methods are no longer sound.

- Re-add non-deprecated versions on ChannelHandlerContext.

    While it's not safe to use the NIOAny methods on Channel or
    ChannelPipeline, it's totally safe to use them on
    ChannelHandlerContext. So we keep those available and
    undeprecated.

- Provide typed generic replacements on ChannelPipeline and on Channel

    To replace the NIOAny methods on ChannelPipeline and Channel
    we can use some typed generic ones instead. These are not
    defined on ChannelInvoker, as the methods are useless on
    ChannelHandlerContext. This begins the acknowledgement that
    ChannelHandlerContext should not have conformed to these
    protocols at all.

- Add Sendable constraints to the user event witnesses on ChannelInvoker

    Again, these were missing, but must be there for Channel and
    ChannelPipeline.

- Provide non-Sendable overloads on ChannelHandlerContext

    ChannelHandlerContext is thread-bound, and so may safely pass
    non-Sendable user events.

Result:

One step closer to strict concurrency cleanliness for NIOCore.
  • Loading branch information
Lukasa authored Oct 31, 2024
1 parent 9356598 commit de116b7
Show file tree
Hide file tree
Showing 29 changed files with 719 additions and 270 deletions.
15 changes: 13 additions & 2 deletions Sources/NIOCore/AsyncAwaitSupport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ extension Channel {
/// or `nil` if not interested in the outcome of the operation.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@inlinable
public func writeAndFlush<T>(_ any: T) async throws {
@preconcurrency
public func writeAndFlush<T: Sendable>(_ any: T) async throws {
try await self.writeAndFlush(any).get()
}

Expand Down Expand Up @@ -140,6 +141,11 @@ extension ChannelOutboundInvoker {
/// - data: the data to write
/// - returns: the future which will be notified once the `write` operation completes.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@available(
*,
deprecated,
message: "NIOAny is not Sendable: avoid wrapping the value in NIOAny to silence this warning."
)
public func writeAndFlush(_ data: NIOAny, file: StaticString = #fileID, line: UInt = #line) async throws {
try await self.writeAndFlush(data, file: file, line: line).get()
}
Expand All @@ -159,8 +165,13 @@ extension ChannelOutboundInvoker {
/// - parameters:
/// - event: the event itself.
/// - returns: the future which will be notified once the operation completes.
@preconcurrency
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public func triggerUserOutboundEvent(_ event: Any, file: StaticString = #fileID, line: UInt = #line) async throws {
public func triggerUserOutboundEvent(
_ event: Any & Sendable,
file: StaticString = #fileID,
line: UInt = #line
) async throws {
try await self.triggerUserOutboundEvent(event, file: file, line: line).get()
}
}
Expand Down
77 changes: 56 additions & 21 deletions Sources/NIOCore/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,30 @@ public protocol Channel: AnyObject, ChannelOutboundInvoker, _NIOPreconcurrencySe
/// The default implementation returns `nil`, and `Channel` implementations must opt in to
/// support this behavior.
var syncOptions: NIOSynchronousChannelOptions? { get }

/// Write data into the `Channel`, automatically wrapping with `NIOAny`.
///
/// - seealso: `ChannelOutboundInvoker.write`.
@preconcurrency
func write<T: Sendable>(_ any: T) -> EventLoopFuture<Void>

/// Write data into the `Channel`, automatically wrapping with `NIOAny`.
///
/// - seealso: `ChannelOutboundInvoker.write`.
@preconcurrency
func write<T: Sendable>(_ any: T, promise: EventLoopPromise<Void>?)

/// Write and flush data into the `Channel`, automatically wrapping with `NIOAny`.
///
/// - seealso: `ChannelOutboundInvoker.writeAndFlush`.
@preconcurrency
func writeAndFlush<T: Sendable>(_ any: T) -> EventLoopFuture<Void>

/// Write and flush data into the `Channel`, automatically wrapping with `NIOAny`.
///
/// - seealso: `ChannelOutboundInvoker.writeAndFlush`.
@preconcurrency
func writeAndFlush<T: Sendable>(_ any: T, promise: EventLoopPromise<Void>?)
}

extension Channel {
Expand Down Expand Up @@ -177,18 +201,36 @@ extension Channel {
pipeline.connect(to: address, promise: promise)
}

@available(
*,
deprecated,
message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
)
public func write(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
pipeline.write(data, promise: promise)
}

public func write<T: Sendable>(_ data: T, promise: EventLoopPromise<Void>?) {
pipeline.write(data, promise: promise)
}

public func flush() {
pipeline.flush()
}

@available(
*,
deprecated,
message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
)
public func writeAndFlush(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
pipeline.writeAndFlush(data, promise: promise)
}

public func writeAndFlush<T: Sendable>(_ data: T, promise: EventLoopPromise<Void>?) {
pipeline.writeAndFlush(data, promise: promise)
}

public func read() {
pipeline.read()
}
Expand All @@ -205,40 +247,33 @@ extension Channel {
promise?.fail(ChannelError._operationUnsupported)
}

public func triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise<Void>?) {
@preconcurrency
public func triggerUserOutboundEvent(_ event: Any & Sendable, promise: EventLoopPromise<Void>?) {
pipeline.triggerUserOutboundEvent(event, promise: promise)
}
}

/// Provides special extension to make writing data to the `Channel` easier by removing the need to wrap data in `NIOAny` manually.
extension Channel {

/// Write data into the `Channel`, automatically wrapping with `NIOAny`.
/// Write data into the `Channel`.
///
/// - seealso: `ChannelOutboundInvoker.write`.
public func write<T>(_ any: T) -> EventLoopFuture<Void> {
self.write(NIOAny(any))
@preconcurrency
public func write<T: Sendable>(_ any: T) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self.write(any, promise: promise)
return promise.futureResult
}

/// Write data into the `Channel`, automatically wrapping with `NIOAny`.
///
/// - seealso: `ChannelOutboundInvoker.write`.
public func write<T>(_ any: T, promise: EventLoopPromise<Void>?) {
self.write(NIOAny(any), promise: promise)
}

/// Write and flush data into the `Channel`, automatically wrapping with `NIOAny`.
///
/// - seealso: `ChannelOutboundInvoker.writeAndFlush`.
public func writeAndFlush<T>(_ any: T) -> EventLoopFuture<Void> {
self.writeAndFlush(NIOAny(any))
}

/// Write and flush data into the `Channel`, automatically wrapping with `NIOAny`.
/// Write and flush data into the `Channel`.
///
/// - seealso: `ChannelOutboundInvoker.writeAndFlush`.
public func writeAndFlush<T>(_ any: T, promise: EventLoopPromise<Void>?) {
self.writeAndFlush(NIOAny(any), promise: promise)
@preconcurrency
public func writeAndFlush<T: Sendable>(_ any: T) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self.writeAndFlush(any, promise: promise)
return promise.futureResult
}
}

Expand Down
34 changes: 31 additions & 3 deletions Sources/NIOCore/ChannelInvoker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public protocol ChannelOutboundInvoker {
/// - data: the data to write
/// - promise: the `EventLoopPromise` that will be notified once the operation completes,
/// or `nil` if not interested in the outcome of the operation.
@available(
*,
deprecated,
message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
)
func write(_ data: NIOAny, promise: EventLoopPromise<Void>?)

/// Flush data that was previously written via `write` to the remote peer.
Expand All @@ -56,6 +61,11 @@ public protocol ChannelOutboundInvoker {
/// - data: the data to write
/// - promise: the `EventLoopPromise` that will be notified once the `write` operation completes,
/// or `nil` if not interested in the outcome of the operation.
@available(
*,
deprecated,
message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
)
func writeAndFlush(_ data: NIOAny, promise: EventLoopPromise<Void>?)

/// Signal that we want to read from the `Channel` once there is data ready.
Expand All @@ -78,7 +88,8 @@ public protocol ChannelOutboundInvoker {
/// - parameters:
/// - promise: the `EventLoopPromise` that will be notified once the operation completes,
/// or `nil` if not interested in the outcome of the operation.
func triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise<Void>?)
@preconcurrency
func triggerUserOutboundEvent(_ event: Any & Sendable, promise: EventLoopPromise<Void>?)

/// The `EventLoop` which is used by this `ChannelOutboundInvoker` for execution.
var eventLoop: EventLoop { get }
Expand Down Expand Up @@ -135,6 +146,11 @@ extension ChannelOutboundInvoker {
/// - parameters:
/// - data: the data to write
/// - returns: the future which will be notified once the operation completes.
@available(
*,
deprecated,
message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
)
public func write(_ data: NIOAny, file: StaticString = #fileID, line: UInt = #line) -> EventLoopFuture<Void> {
let promise = makePromise(file: file, line: line)
write(data, promise: promise)
Expand All @@ -146,6 +162,11 @@ extension ChannelOutboundInvoker {
/// - parameters:
/// - data: the data to write
/// - returns: the future which will be notified once the `write` operation completes.
@available(
*,
deprecated,
message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
)
public func writeAndFlush(_ data: NIOAny, file: StaticString = #fileID, line: UInt = #line) -> EventLoopFuture<Void>
{
let promise = makePromise(file: file, line: line)
Expand All @@ -170,8 +191,9 @@ extension ChannelOutboundInvoker {
/// - parameters:
/// - event: the event itself.
/// - returns: the future which will be notified once the operation completes.
@preconcurrency
public func triggerUserOutboundEvent(
_ event: Any,
_ event: Any & Sendable,
file: StaticString = #fileID,
line: UInt = #line
) -> EventLoopFuture<Void> {
Expand Down Expand Up @@ -210,6 +232,11 @@ public protocol ChannelInboundInvoker {
///
/// - parameters:
/// - data: the data that was read and is ready to be processed.
@available(
*,
deprecated,
message: "NIOAny is not Sendable. Avoid wrapping the value in NIOAny to silence this warning."
)
func fireChannelRead(_ data: NIOAny)

/// Called once there is no more data to read immediately on a `Channel`. Any new data received will be handled later.
Expand Down Expand Up @@ -238,7 +265,8 @@ public protocol ChannelInboundInvoker {
///
/// - parameters:
/// - event: the event itself.
func fireUserInboundEventTriggered(_ event: Any)
@preconcurrency
func fireUserInboundEventTriggered(_ event: Any & Sendable)
}

/// A protocol that signals that outbound and inbound events are triggered by this invoker.
Expand Down
Loading

0 comments on commit de116b7

Please sign in to comment.