From 223b0e8eac242485495a662f8345f199f739e12c Mon Sep 17 00:00:00 2001 From: George Barnett Date: Thu, 18 Jul 2024 09:16:59 +0100 Subject: [PATCH] Add manual control to NIOLockedValueBox (#2786) Motivation: NIOLockedValueBox has a 'safer' API than NIOLock as it only provides scoped access to its boxed value. NIOLock requires users to only access protected state while the lock is acquired. As such NIOLockedValueBox should be preferred where possible. However, there are cases where manual control must be used (such as storing a continuation) and users must use a NIOLock for this. There are two downsides to this: 1. All other access to the protected state must use the NIOLock API putting the onus on the developer to only access the protected state while the lock is held. 2. NIOLock can't store its protected state inline which typically results in users storing it on a class. Modifications: - Add an 'unsafe' view to NIOLockedValueBox which allows users to manually control the lock and access its protected state - Update NIOAsyncWriter and NIOThrowingAsyncSequenceProducer to use NIOLockedValueBox Result: - Safer locking API is used in more places - Fewer allocations --- ...reBenchmarks.NIOAsyncChannel.init.p90.json | 2 +- ...reBenchmarks.NIOAsyncChannel.init.p90.json | 2 +- ...reBenchmarks.NIOAsyncChannel.init.p90.json | 2 +- ...reBenchmarks.NIOAsyncChannel.init.p90.json | 2 +- ...reBenchmarks.NIOAsyncChannel.init.p90.json | 2 +- .../NIOLockedValueBox.swift | 40 +++++ .../AsyncSequences/NIOAsyncWriter.swift | 117 ++++++++------ .../NIOThrowingAsyncSequenceProducer.swift | 149 +++++++++++------- .../NIOAsyncSequenceTests.swift | 10 +- .../AsyncSequences/NIOAsyncWriterTests.swift | 2 +- .../NIOThrowingAsyncSequenceTests.swift | 30 ++-- 11 files changed, 228 insertions(+), 130 deletions(-) diff --git a/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 2554cd2c39..882a60c306 100644 --- a/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 10 + "mallocCountTotal" : 8 } diff --git a/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 2554cd2c39..882a60c306 100644 --- a/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 10 + "mallocCountTotal" : 8 } diff --git a/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 2554cd2c39..882a60c306 100644 --- a/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 10 + "mallocCountTotal" : 8 } diff --git a/Benchmarks/Thresholds/nightly-6.0/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/nightly-6.0/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 2554cd2c39..882a60c306 100644 --- a/Benchmarks/Thresholds/nightly-6.0/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/nightly-6.0/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 10 + "mallocCountTotal" : 8 } diff --git a/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 2554cd2c39..882a60c306 100644 --- a/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 10 + "mallocCountTotal" : 8 } diff --git a/Sources/NIOConcurrencyHelpers/NIOLockedValueBox.swift b/Sources/NIOConcurrencyHelpers/NIOLockedValueBox.swift index 2c57e01d33..06cf88529f 100644 --- a/Sources/NIOConcurrencyHelpers/NIOLockedValueBox.swift +++ b/Sources/NIOConcurrencyHelpers/NIOLockedValueBox.swift @@ -37,6 +37,46 @@ public struct NIOLockedValueBox { public func withLockedValue(_ mutate: (inout Value) throws -> T) rethrows -> T { return try self._storage.withLockedValue(mutate) } + + /// Provides an unsafe view over the lock and its value. + /// + /// This can be beneficial when you require fine grained control over the lock in some + /// situations but don't want lose the benefits of ``withLockedValue(_:)`` in others by + /// switching to ``NIOLock``. + public var unsafe: Unsafe { + Unsafe(_storage: self._storage) + } + + /// Provides an unsafe view over the lock and its value. + public struct Unsafe { + @usableFromInline + let _storage: LockStorage + + /// Manually acquire the lock. + @inlinable + public func lock() { + self._storage.lock() + } + + /// Manually release the lock. + @inlinable + public func unlock() { + self._storage.unlock() + } + + /// Mutate the value, assuming the lock has been acquired manually. + /// + /// - Parameter mutate: A closure with scoped access to the value. + /// - Returns: The result of the `mutate` closure. + @inlinable + public func withValueAssumingLockIsAcquired( + _ mutate: (_ value: inout Value) throws -> Result + ) rethrows -> Result { + return try self._storage.withUnsafeMutablePointerToHeader { value in + try mutate(&value.pointee) + } + } + } } extension NIOLockedValueBox: Sendable where Value: Sendable {} diff --git a/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift b/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift index fe7e6af235..163dcaf076 100644 --- a/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift +++ b/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift @@ -414,12 +414,12 @@ extension NIOAsyncWriter { extension NIOAsyncWriter { /// This is the underlying storage of the writer. The goal of this is to synchronize the access to all state. @usableFromInline - /* fileprivate */ internal final class Storage: @unchecked Sendable { + /* fileprivate */ internal struct Storage: Sendable { /// Internal type to generate unique yield IDs. /// /// This type has reference semantics. @usableFromInline - struct YieldIDGenerator { + struct YieldIDGenerator: Sendable { /// A struct representing a unique yield ID. @usableFromInline struct YieldID: Equatable, Sendable { @@ -445,27 +445,43 @@ extension NIOAsyncWriter { } } - /// The lock that protects our state. - @usableFromInline - /* private */ internal let _lock = NIOLock() /// The counter used to assign an ID to all our yields. @usableFromInline /* private */ internal let _yieldIDGenerator = YieldIDGenerator() /// The state machine. @usableFromInline - /* private */ internal var _stateMachine: StateMachine + /* private */ internal let _state: NIOLockedValueBox + + @usableFromInline + struct State: Sendable { + @usableFromInline + var stateMachine: StateMachine + @usableFromInline + var didSuspend: (@Sendable () -> Void)? + + @inlinable + init(stateMachine: StateMachine) { + self.stateMachine = stateMachine + self.didSuspend = nil + } + } + /// Hook used in testing. @usableFromInline - internal var _didSuspend: (() -> Void)? + internal func _setDidSuspend(_ didSuspend: (@Sendable () -> Void)?) { + self._state.withLockedValue { + $0.didSuspend = didSuspend + } + } @inlinable internal var isWriterFinished: Bool { - self._lock.withLock { self._stateMachine.isWriterFinished } + self._state.withLockedValue { $0.stateMachine.isWriterFinished } } @inlinable internal var isSinkFinished: Bool { - self._lock.withLock { self._stateMachine.isSinkFinished } + self._state.withLockedValue { $0.stateMachine.isSinkFinished } } @inlinable @@ -473,10 +489,8 @@ extension NIOAsyncWriter { isWritable: Bool, delegate: Delegate ) { - self._stateMachine = .init( - isWritable: isWritable, - delegate: delegate - ) + let state = State(stateMachine: StateMachine(isWritable: isWritable, delegate: delegate)) + self._state = NIOLockedValueBox(state) } @inlinable @@ -484,8 +498,8 @@ extension NIOAsyncWriter { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let action = self._lock.withLock { - self._stateMachine.setWritability(to: writability) + let action = self._state.withLockedValue { + $0.stateMachine.setWritability(to: writability) } switch action { @@ -516,39 +530,42 @@ extension NIOAsyncWriter { return try await withTaskCancellationHandler { // We are manually locking here to hold the lock across the withCheckedContinuation call - self._lock.lock() + let unsafe = self._state.unsafe + unsafe.lock() - let action = self._stateMachine.yield(yieldID: yieldID) + let action = unsafe.withValueAssumingLockIsAcquired { + $0.stateMachine.yield(yieldID: yieldID) + } switch action { case .callDidYield(let delegate): // We are allocating a new Deque for every write here - self._lock.unlock() + unsafe.unlock() delegate.didYield(contentsOf: Deque(sequence)) self.unbufferQueuedEvents() return .yielded case .throwError(let error): - self._lock.unlock() + unsafe.unlock() throw error case .suspendTask: return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - self._stateMachine.yield( - continuation: continuation, - yieldID: yieldID - ) + let didSuspend = unsafe.withValueAssumingLockIsAcquired { + $0.stateMachine.yield(continuation: continuation, yieldID: yieldID) + return $0.didSuspend + } - self._lock.unlock() - self._didSuspend?() + unsafe.unlock() + didSuspend?() } } } onCancel: { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let action = self._lock.withLock { - self._stateMachine.cancel(yieldID: yieldID) + let action = self._state.withLockedValue { + $0.stateMachine.cancel(yieldID: yieldID) } switch action { @@ -580,39 +597,41 @@ extension NIOAsyncWriter { return try await withTaskCancellationHandler { // We are manually locking here to hold the lock across the withCheckedContinuation call - self._lock.lock() + let unsafe = self._state.unsafe + unsafe.lock() - let action = self._stateMachine.yield(yieldID: yieldID) + let action = unsafe.withValueAssumingLockIsAcquired { + $0.stateMachine.yield(yieldID: yieldID) + } switch action { case .callDidYield(let delegate): // We are allocating a new Deque for every write here - self._lock.unlock() + unsafe.unlock() delegate.didYield(element) self.unbufferQueuedEvents() return .yielded case .throwError(let error): - self._lock.unlock() + unsafe.unlock() throw error case .suspendTask: return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - self._stateMachine.yield( - continuation: continuation, - yieldID: yieldID - ) - - self._lock.unlock() - self._didSuspend?() + let didSuspend = unsafe.withValueAssumingLockIsAcquired { + $0.stateMachine.yield(continuation: continuation, yieldID: yieldID) + return $0.didSuspend + } + unsafe.unlock() + didSuspend?() } } } onCancel: { // We must not resume the continuation while holding the lock - // because it can deadlock in combination with the underlying ulock + // because it can deadlock in combination with the underlying lock // in cases where we race with a cancellation handler - let action = self._lock.withLock { - self._stateMachine.cancel(yieldID: yieldID) + let action = self._state.withLockedValue { + $0.stateMachine.cancel(yieldID: yieldID) } switch action { @@ -630,8 +649,8 @@ extension NIOAsyncWriter { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let action = self._lock.withLock { - self._stateMachine.writerFinish(error: error) + let action = self._state.withLockedValue { + $0.stateMachine.writerFinish(error: error) } switch action { @@ -651,8 +670,8 @@ extension NIOAsyncWriter { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let action = self._lock.withLock { - self._stateMachine.sinkFinish(error: error) + let action = self._state.withLockedValue { + $0.stateMachine.sinkFinish(error: error) } switch action { @@ -667,7 +686,7 @@ extension NIOAsyncWriter { @inlinable /* fileprivate */ internal func unbufferQueuedEvents() { - while let action = self._lock.withLock({ self._stateMachine.unbufferQueuedEvents()}) { + while let action = self._state.withLockedValue({ $0.stateMachine.unbufferQueuedEvents()}) { switch action { case .callDidTerminate(let delegate, let error): delegate.didTerminate(error: error) @@ -684,12 +703,12 @@ extension NIOAsyncWriter { @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension NIOAsyncWriter { @usableFromInline - /* private */ internal struct StateMachine { + /* private */ internal struct StateMachine: Sendable { @usableFromInline typealias YieldID = Storage.YieldIDGenerator.YieldID /// This is a small helper struct to encapsulate the two different values for a suspended yield. @usableFromInline - /* private */ internal struct SuspendedYield { + /* private */ internal struct SuspendedYield: Sendable { /// The yield's ID. @usableFromInline var yieldID: YieldID @@ -715,7 +734,7 @@ extension NIOAsyncWriter { /// The current state of our ``NIOAsyncWriter``. @usableFromInline - /* private */ internal enum State: CustomStringConvertible { + /* private */ internal enum State: Sendable, CustomStringConvertible { /// The initial state before either a call to ``NIOAsyncWriter/yield(contentsOf:)`` or /// ``NIOAsyncWriter/finish(completion:)`` happened. case initial( diff --git a/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift b/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift index 7b852dd6b6..0477c01969 100644 --- a/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift +++ b/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift @@ -388,23 +388,41 @@ extension NIOThrowingAsyncSequenceProducer { extension NIOThrowingAsyncSequenceProducer { /// This is the underlying storage of the sequence. The goal of this is to synchronize the access to all state. @usableFromInline - /* fileprivate */ internal final class Storage: @unchecked Sendable { - /// The lock that protects our state. + /* fileprivate */ internal struct Storage: Sendable { @usableFromInline - /* private */ internal let _lock = NIOLock() - /// The state machine. - @usableFromInline - /* private */ internal var _stateMachine: StateMachine - /// The delegate. + struct State: Sendable { + @usableFromInline + var stateMachine: StateMachine + @usableFromInline + var delegate: Delegate? + @usableFromInline + var didSuspend: (@Sendable () -> Void)? + + @inlinable + init( + stateMachine: StateMachine, + delegate: Delegate? = nil, + didSuspend: (@Sendable () -> Void)? = nil + ) { + self.stateMachine = stateMachine + self.delegate = delegate + self.didSuspend = didSuspend + } + } + @usableFromInline - /* private */ internal var _delegate: Delegate? - /// Hook used in testing. + internal let _state: NIOLockedValueBox + @usableFromInline - internal var _didSuspend: (() -> Void)? + internal func _setDidSuspend(_ didSuspend: (@Sendable () -> Void)?) { + self._state.withLockedValue { + $0.didSuspend = didSuspend + } + } @inlinable var isFinished: Bool { - self._lock.withLock { self._stateMachine.isFinished } + self._state.withLockedValue { $0.stateMachine.isFinished } } @usableFromInline @@ -412,19 +430,22 @@ extension NIOThrowingAsyncSequenceProducer { backPressureStrategy: Strategy, delegate: Delegate ) { - self._stateMachine = .init(backPressureStrategy: backPressureStrategy) - self._delegate = delegate + let state = State( + stateMachine: .init(backPressureStrategy: backPressureStrategy), + delegate: delegate + ) + self._state = NIOLockedValueBox(state) } @inlinable /* fileprivate */ internal func sequenceDeinitialized() { - let delegate: Delegate? = self._lock.withLock { - let action = self._stateMachine.sequenceDeinitialized() + let delegate: Delegate? = self._state.withLockedValue { + let action = $0.stateMachine.sequenceDeinitialized() switch action { case .callDidTerminate: - let delegate = self._delegate - self._delegate = nil + let delegate = $0.delegate + $0.delegate = nil return delegate case .none: @@ -437,20 +458,20 @@ extension NIOThrowingAsyncSequenceProducer { @inlinable /* fileprivate */ internal func iteratorInitialized() { - self._lock.withLock { - self._stateMachine.iteratorInitialized() + self._state.withLockedValue { + $0.stateMachine.iteratorInitialized() } } @inlinable /* fileprivate */ internal func iteratorDeinitialized() { - let delegate: Delegate? = self._lock.withLock { - let action = self._stateMachine.iteratorDeinitialized() + let delegate: Delegate? = self._state.withLockedValue { + let action = $0.stateMachine.iteratorDeinitialized() switch action { case .callDidTerminate: - let delegate = self._delegate - self._delegate = nil + let delegate = $0.delegate + $0.delegate = nil return delegate @@ -467,8 +488,8 @@ extension NIOThrowingAsyncSequenceProducer { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let action = self._lock.withLock { - self._stateMachine.yield(sequence) + let action = self._state.withLockedValue { + $0.stateMachine.yield(sequence) } switch action { @@ -498,13 +519,13 @@ extension NIOThrowingAsyncSequenceProducer { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let (delegate, action): (Delegate?, NIOThrowingAsyncSequenceProducer.StateMachine.FinishAction) = self._lock.withLock { - let action = self._stateMachine.finish(failure) - + let (delegate, action): (Delegate?, NIOThrowingAsyncSequenceProducer.StateMachine.FinishAction) = self._state.withLockedValue { + let action = $0.stateMachine.finish(failure) + switch action { case .resumeContinuationWithFailureAndCallDidTerminate: - let delegate = self._delegate - self._delegate = nil + let delegate = $0.delegate + $0.delegate = nil return (delegate, action) case .none: @@ -530,28 +551,36 @@ extension NIOThrowingAsyncSequenceProducer { @inlinable /* fileprivate */ internal func next() async throws -> Element? { - try await withTaskCancellationHandler { - self._lock.lock() + try await withTaskCancellationHandler { () async throws -> Element? in + let unsafe = self._state.unsafe + unsafe.lock() - let action = self._stateMachine.next() + let action = unsafe.withValueAssumingLockIsAcquired { + $0.stateMachine.next() + } switch action { case .returnElement(let element): - self._lock.unlock() + unsafe.unlock() return element case .returnElementAndCallProduceMore(let element): - let delegate = self._delegate - self._lock.unlock() + let delegate = unsafe.withValueAssumingLockIsAcquired { + $0.delegate + } + unsafe.unlock() delegate?.produceMore() return element case .returnFailureAndCallDidTerminate(let failure): - let delegate = self._delegate - self._delegate = nil - self._lock.unlock() + let delegate = unsafe.withValueAssumingLockIsAcquired { + let delegate = $0.delegate + $0.delegate = nil + return delegate + } + unsafe.unlock() delegate?.didTerminate() @@ -564,7 +593,7 @@ extension NIOThrowingAsyncSequenceProducer { } case .returnCancellationError: - self._lock.unlock() + unsafe.unlock() // We have deprecated the generic Failure type in the public API and Failure should // now be `Swift.Error`. However, if users have not migrated to the new API they could // still use a custom generic Error type and this cast might fail. @@ -579,45 +608,55 @@ extension NIOThrowingAsyncSequenceProducer { return nil case .returnNil: - self._lock.unlock() + unsafe.unlock() return nil case .suspendTask: // It is safe to hold the lock across this method // since the closure is guaranteed to be run straight away - return try await withCheckedThrowingContinuation { continuation in - let action = self._stateMachine.next(for: continuation) + return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let (action, callDidSuspend) = unsafe.withValueAssumingLockIsAcquired { + let action = $0.stateMachine.next(for: continuation) + let callDidSuspend = $0.didSuspend != nil + return (action, callDidSuspend) + } switch action { case .callProduceMore: - let delegate = _delegate - self._lock.unlock() + let delegate = unsafe.withValueAssumingLockIsAcquired { + $0.delegate + } + unsafe.unlock() delegate?.produceMore() case .none: - self._lock.unlock() + unsafe.unlock() + } + + if callDidSuspend { + let didSuspend = self._state.withLockedValue { $0.didSuspend } + didSuspend?() } - self._didSuspend?() } } } onCancel: { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let (delegate, action): (Delegate?, NIOThrowingAsyncSequenceProducer.StateMachine.CancelledAction) = self._lock.withLock { - let action = self._stateMachine.cancelled() + let (delegate, action): (Delegate?, NIOThrowingAsyncSequenceProducer.StateMachine.CancelledAction) = self._state.withLockedValue { + let action = $0.stateMachine.cancelled() switch action { case .callDidTerminate: - let delegate = self._delegate - self._delegate = nil + let delegate = $0.delegate + $0.delegate = nil return (delegate, action) case .resumeContinuationWithCancellationErrorAndCallDidTerminate: - let delegate = self._delegate - self._delegate = nil + let delegate = $0.delegate + $0.delegate = nil return (delegate, action) @@ -658,9 +697,9 @@ extension NIOThrowingAsyncSequenceProducer { @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension NIOThrowingAsyncSequenceProducer { @usableFromInline - /* private */ internal struct StateMachine { + /* private */ internal struct StateMachine: Sendable { @usableFromInline - /* private */ internal enum State { + /* private */ internal enum State: Sendable { /// The initial state before either a call to `yield()` or a call to `next()` happened case initial( backPressureStrategy: Strategy, diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift index db2471c43b..4f7b9ec361 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift @@ -269,7 +269,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._throwingSequence._storage._didSuspend = { suspended.fulfill() } + sequence._throwingSequence._storage._setDidSuspend { suspended.fulfill() } async let element = sequence.first { _ in true } @@ -351,7 +351,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in let suspended = expectation(description: "task suspended") - sequence!._throwingSequence._storage._didSuspend = { suspended.fulfill() } + sequence!._throwingSequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { let element = await sequence!.first { _ in true } @@ -439,7 +439,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._throwingSequence._storage._didSuspend = { suspended.fulfill() } + sequence._throwingSequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { @@ -462,7 +462,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let resumed = expectation(description: "task resumed") let cancelled = expectation(description: "task cancelled") - sequence._throwingSequence._storage._didSuspend = { suspended.fulfill() } + sequence._throwingSequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { let iterator = sequence.makeAsyncIterator() @@ -490,7 +490,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._throwingSequence._storage._didSuspend = { suspended.fulfill() } + sequence._throwingSequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { let iterator = sequence.makeAsyncIterator() diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift index 9b93841252..5787068510 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift @@ -77,7 +77,7 @@ final class NIOAsyncWriterTests: XCTestCase { ) self.writer = newWriter.writer self.sink = newWriter.sink - self.sink._storage._didSuspend = self.delegate.didSuspend + self.sink._storage._setDidSuspend { self.delegate.didSuspend() } } override func tearDown() { diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift index 237be2c28b..0d9acc1018 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift @@ -107,7 +107,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { try await sequence.first { _ in true } @@ -135,7 +135,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { try await sequence.first { _ in true } @@ -163,7 +163,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { await withThrowingTaskGroup(of: Void.self) { group in let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { _ = try await sequence.first { _ in true } @@ -188,7 +188,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { await withThrowingTaskGroup(of: Void.self) { group in let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { _ = try await sequence.first { _ in true } @@ -247,7 +247,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { let element = try await sequence.first { _ in true } @@ -330,7 +330,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { await XCTAssertThrowsError(try await withThrowingTaskGroup(of: Void.self) { group in let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { _ = try await sequence.first { _ in true } @@ -442,7 +442,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in let suspended = expectation(description: "task suspended") - sequence!._storage._didSuspend = { suspended.fulfill() } + sequence!._storage._setDidSuspend { suspended.fulfill() } group.addTask { let element = try await sequence!.first { _ in true } @@ -532,7 +532,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { let iterator = sequence.makeAsyncIterator() @@ -563,7 +563,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = new.sequence let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { let iterator = sequence.makeAsyncIterator() @@ -587,7 +587,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let resumed = expectation(description: "task resumed") let cancelled = expectation(description: "task cancelled") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { let iterator = sequence.makeAsyncIterator() @@ -615,7 +615,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { let iterator = sequence.makeAsyncIterator() @@ -689,7 +689,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } Task { // Would prefer to use async let _ here but that is not allowed yet @@ -707,7 +707,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } Task { // Would prefer to use async let _ here but that is not allowed yet @@ -727,7 +727,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } Task { // Would prefer to use async let _ here but that is not allowed yet @@ -747,7 +747,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } Task { // Would prefer to use async let _ here but that is not allowed yet