diff --git a/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 2554cd2c39..882a60c306 100644 --- a/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 10 + "mallocCountTotal" : 8 } diff --git a/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 2554cd2c39..882a60c306 100644 --- a/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 10 + "mallocCountTotal" : 8 } diff --git a/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 2554cd2c39..882a60c306 100644 --- a/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 10 + "mallocCountTotal" : 8 } diff --git a/Benchmarks/Thresholds/nightly-6.0/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/nightly-6.0/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 2554cd2c39..882a60c306 100644 --- a/Benchmarks/Thresholds/nightly-6.0/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/nightly-6.0/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 10 + "mallocCountTotal" : 8 } diff --git a/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 2554cd2c39..882a60c306 100644 --- a/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 10 + "mallocCountTotal" : 8 } diff --git a/Sources/NIOConcurrencyHelpers/NIOLockedValueBox.swift b/Sources/NIOConcurrencyHelpers/NIOLockedValueBox.swift index 2c57e01d33..06cf88529f 100644 --- a/Sources/NIOConcurrencyHelpers/NIOLockedValueBox.swift +++ b/Sources/NIOConcurrencyHelpers/NIOLockedValueBox.swift @@ -37,6 +37,46 @@ public struct NIOLockedValueBox { public func withLockedValue(_ mutate: (inout Value) throws -> T) rethrows -> T { return try self._storage.withLockedValue(mutate) } + + /// Provides an unsafe view over the lock and its value. + /// + /// This can be beneficial when you require fine grained control over the lock in some + /// situations but don't want lose the benefits of ``withLockedValue(_:)`` in others by + /// switching to ``NIOLock``. + public var unsafe: Unsafe { + Unsafe(_storage: self._storage) + } + + /// Provides an unsafe view over the lock and its value. + public struct Unsafe { + @usableFromInline + let _storage: LockStorage + + /// Manually acquire the lock. + @inlinable + public func lock() { + self._storage.lock() + } + + /// Manually release the lock. + @inlinable + public func unlock() { + self._storage.unlock() + } + + /// Mutate the value, assuming the lock has been acquired manually. + /// + /// - Parameter mutate: A closure with scoped access to the value. + /// - Returns: The result of the `mutate` closure. + @inlinable + public func withValueAssumingLockIsAcquired( + _ mutate: (_ value: inout Value) throws -> Result + ) rethrows -> Result { + return try self._storage.withUnsafeMutablePointerToHeader { value in + try mutate(&value.pointee) + } + } + } } extension NIOLockedValueBox: Sendable where Value: Sendable {} diff --git a/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift b/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift index fe7e6af235..163dcaf076 100644 --- a/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift +++ b/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift @@ -414,12 +414,12 @@ extension NIOAsyncWriter { extension NIOAsyncWriter { /// This is the underlying storage of the writer. The goal of this is to synchronize the access to all state. @usableFromInline - /* fileprivate */ internal final class Storage: @unchecked Sendable { + /* fileprivate */ internal struct Storage: Sendable { /// Internal type to generate unique yield IDs. /// /// This type has reference semantics. @usableFromInline - struct YieldIDGenerator { + struct YieldIDGenerator: Sendable { /// A struct representing a unique yield ID. @usableFromInline struct YieldID: Equatable, Sendable { @@ -445,27 +445,43 @@ extension NIOAsyncWriter { } } - /// The lock that protects our state. - @usableFromInline - /* private */ internal let _lock = NIOLock() /// The counter used to assign an ID to all our yields. @usableFromInline /* private */ internal let _yieldIDGenerator = YieldIDGenerator() /// The state machine. @usableFromInline - /* private */ internal var _stateMachine: StateMachine + /* private */ internal let _state: NIOLockedValueBox + + @usableFromInline + struct State: Sendable { + @usableFromInline + var stateMachine: StateMachine + @usableFromInline + var didSuspend: (@Sendable () -> Void)? + + @inlinable + init(stateMachine: StateMachine) { + self.stateMachine = stateMachine + self.didSuspend = nil + } + } + /// Hook used in testing. @usableFromInline - internal var _didSuspend: (() -> Void)? + internal func _setDidSuspend(_ didSuspend: (@Sendable () -> Void)?) { + self._state.withLockedValue { + $0.didSuspend = didSuspend + } + } @inlinable internal var isWriterFinished: Bool { - self._lock.withLock { self._stateMachine.isWriterFinished } + self._state.withLockedValue { $0.stateMachine.isWriterFinished } } @inlinable internal var isSinkFinished: Bool { - self._lock.withLock { self._stateMachine.isSinkFinished } + self._state.withLockedValue { $0.stateMachine.isSinkFinished } } @inlinable @@ -473,10 +489,8 @@ extension NIOAsyncWriter { isWritable: Bool, delegate: Delegate ) { - self._stateMachine = .init( - isWritable: isWritable, - delegate: delegate - ) + let state = State(stateMachine: StateMachine(isWritable: isWritable, delegate: delegate)) + self._state = NIOLockedValueBox(state) } @inlinable @@ -484,8 +498,8 @@ extension NIOAsyncWriter { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let action = self._lock.withLock { - self._stateMachine.setWritability(to: writability) + let action = self._state.withLockedValue { + $0.stateMachine.setWritability(to: writability) } switch action { @@ -516,39 +530,42 @@ extension NIOAsyncWriter { return try await withTaskCancellationHandler { // We are manually locking here to hold the lock across the withCheckedContinuation call - self._lock.lock() + let unsafe = self._state.unsafe + unsafe.lock() - let action = self._stateMachine.yield(yieldID: yieldID) + let action = unsafe.withValueAssumingLockIsAcquired { + $0.stateMachine.yield(yieldID: yieldID) + } switch action { case .callDidYield(let delegate): // We are allocating a new Deque for every write here - self._lock.unlock() + unsafe.unlock() delegate.didYield(contentsOf: Deque(sequence)) self.unbufferQueuedEvents() return .yielded case .throwError(let error): - self._lock.unlock() + unsafe.unlock() throw error case .suspendTask: return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - self._stateMachine.yield( - continuation: continuation, - yieldID: yieldID - ) + let didSuspend = unsafe.withValueAssumingLockIsAcquired { + $0.stateMachine.yield(continuation: continuation, yieldID: yieldID) + return $0.didSuspend + } - self._lock.unlock() - self._didSuspend?() + unsafe.unlock() + didSuspend?() } } } onCancel: { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let action = self._lock.withLock { - self._stateMachine.cancel(yieldID: yieldID) + let action = self._state.withLockedValue { + $0.stateMachine.cancel(yieldID: yieldID) } switch action { @@ -580,39 +597,41 @@ extension NIOAsyncWriter { return try await withTaskCancellationHandler { // We are manually locking here to hold the lock across the withCheckedContinuation call - self._lock.lock() + let unsafe = self._state.unsafe + unsafe.lock() - let action = self._stateMachine.yield(yieldID: yieldID) + let action = unsafe.withValueAssumingLockIsAcquired { + $0.stateMachine.yield(yieldID: yieldID) + } switch action { case .callDidYield(let delegate): // We are allocating a new Deque for every write here - self._lock.unlock() + unsafe.unlock() delegate.didYield(element) self.unbufferQueuedEvents() return .yielded case .throwError(let error): - self._lock.unlock() + unsafe.unlock() throw error case .suspendTask: return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - self._stateMachine.yield( - continuation: continuation, - yieldID: yieldID - ) - - self._lock.unlock() - self._didSuspend?() + let didSuspend = unsafe.withValueAssumingLockIsAcquired { + $0.stateMachine.yield(continuation: continuation, yieldID: yieldID) + return $0.didSuspend + } + unsafe.unlock() + didSuspend?() } } } onCancel: { // We must not resume the continuation while holding the lock - // because it can deadlock in combination with the underlying ulock + // because it can deadlock in combination with the underlying lock // in cases where we race with a cancellation handler - let action = self._lock.withLock { - self._stateMachine.cancel(yieldID: yieldID) + let action = self._state.withLockedValue { + $0.stateMachine.cancel(yieldID: yieldID) } switch action { @@ -630,8 +649,8 @@ extension NIOAsyncWriter { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let action = self._lock.withLock { - self._stateMachine.writerFinish(error: error) + let action = self._state.withLockedValue { + $0.stateMachine.writerFinish(error: error) } switch action { @@ -651,8 +670,8 @@ extension NIOAsyncWriter { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let action = self._lock.withLock { - self._stateMachine.sinkFinish(error: error) + let action = self._state.withLockedValue { + $0.stateMachine.sinkFinish(error: error) } switch action { @@ -667,7 +686,7 @@ extension NIOAsyncWriter { @inlinable /* fileprivate */ internal func unbufferQueuedEvents() { - while let action = self._lock.withLock({ self._stateMachine.unbufferQueuedEvents()}) { + while let action = self._state.withLockedValue({ $0.stateMachine.unbufferQueuedEvents()}) { switch action { case .callDidTerminate(let delegate, let error): delegate.didTerminate(error: error) @@ -684,12 +703,12 @@ extension NIOAsyncWriter { @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension NIOAsyncWriter { @usableFromInline - /* private */ internal struct StateMachine { + /* private */ internal struct StateMachine: Sendable { @usableFromInline typealias YieldID = Storage.YieldIDGenerator.YieldID /// This is a small helper struct to encapsulate the two different values for a suspended yield. @usableFromInline - /* private */ internal struct SuspendedYield { + /* private */ internal struct SuspendedYield: Sendable { /// The yield's ID. @usableFromInline var yieldID: YieldID @@ -715,7 +734,7 @@ extension NIOAsyncWriter { /// The current state of our ``NIOAsyncWriter``. @usableFromInline - /* private */ internal enum State: CustomStringConvertible { + /* private */ internal enum State: Sendable, CustomStringConvertible { /// The initial state before either a call to ``NIOAsyncWriter/yield(contentsOf:)`` or /// ``NIOAsyncWriter/finish(completion:)`` happened. case initial( diff --git a/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift b/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift index 7b852dd6b6..0477c01969 100644 --- a/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift +++ b/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift @@ -388,23 +388,41 @@ extension NIOThrowingAsyncSequenceProducer { extension NIOThrowingAsyncSequenceProducer { /// This is the underlying storage of the sequence. The goal of this is to synchronize the access to all state. @usableFromInline - /* fileprivate */ internal final class Storage: @unchecked Sendable { - /// The lock that protects our state. + /* fileprivate */ internal struct Storage: Sendable { @usableFromInline - /* private */ internal let _lock = NIOLock() - /// The state machine. - @usableFromInline - /* private */ internal var _stateMachine: StateMachine - /// The delegate. + struct State: Sendable { + @usableFromInline + var stateMachine: StateMachine + @usableFromInline + var delegate: Delegate? + @usableFromInline + var didSuspend: (@Sendable () -> Void)? + + @inlinable + init( + stateMachine: StateMachine, + delegate: Delegate? = nil, + didSuspend: (@Sendable () -> Void)? = nil + ) { + self.stateMachine = stateMachine + self.delegate = delegate + self.didSuspend = didSuspend + } + } + @usableFromInline - /* private */ internal var _delegate: Delegate? - /// Hook used in testing. + internal let _state: NIOLockedValueBox + @usableFromInline - internal var _didSuspend: (() -> Void)? + internal func _setDidSuspend(_ didSuspend: (@Sendable () -> Void)?) { + self._state.withLockedValue { + $0.didSuspend = didSuspend + } + } @inlinable var isFinished: Bool { - self._lock.withLock { self._stateMachine.isFinished } + self._state.withLockedValue { $0.stateMachine.isFinished } } @usableFromInline @@ -412,19 +430,22 @@ extension NIOThrowingAsyncSequenceProducer { backPressureStrategy: Strategy, delegate: Delegate ) { - self._stateMachine = .init(backPressureStrategy: backPressureStrategy) - self._delegate = delegate + let state = State( + stateMachine: .init(backPressureStrategy: backPressureStrategy), + delegate: delegate + ) + self._state = NIOLockedValueBox(state) } @inlinable /* fileprivate */ internal func sequenceDeinitialized() { - let delegate: Delegate? = self._lock.withLock { - let action = self._stateMachine.sequenceDeinitialized() + let delegate: Delegate? = self._state.withLockedValue { + let action = $0.stateMachine.sequenceDeinitialized() switch action { case .callDidTerminate: - let delegate = self._delegate - self._delegate = nil + let delegate = $0.delegate + $0.delegate = nil return delegate case .none: @@ -437,20 +458,20 @@ extension NIOThrowingAsyncSequenceProducer { @inlinable /* fileprivate */ internal func iteratorInitialized() { - self._lock.withLock { - self._stateMachine.iteratorInitialized() + self._state.withLockedValue { + $0.stateMachine.iteratorInitialized() } } @inlinable /* fileprivate */ internal func iteratorDeinitialized() { - let delegate: Delegate? = self._lock.withLock { - let action = self._stateMachine.iteratorDeinitialized() + let delegate: Delegate? = self._state.withLockedValue { + let action = $0.stateMachine.iteratorDeinitialized() switch action { case .callDidTerminate: - let delegate = self._delegate - self._delegate = nil + let delegate = $0.delegate + $0.delegate = nil return delegate @@ -467,8 +488,8 @@ extension NIOThrowingAsyncSequenceProducer { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let action = self._lock.withLock { - self._stateMachine.yield(sequence) + let action = self._state.withLockedValue { + $0.stateMachine.yield(sequence) } switch action { @@ -498,13 +519,13 @@ extension NIOThrowingAsyncSequenceProducer { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let (delegate, action): (Delegate?, NIOThrowingAsyncSequenceProducer.StateMachine.FinishAction) = self._lock.withLock { - let action = self._stateMachine.finish(failure) - + let (delegate, action): (Delegate?, NIOThrowingAsyncSequenceProducer.StateMachine.FinishAction) = self._state.withLockedValue { + let action = $0.stateMachine.finish(failure) + switch action { case .resumeContinuationWithFailureAndCallDidTerminate: - let delegate = self._delegate - self._delegate = nil + let delegate = $0.delegate + $0.delegate = nil return (delegate, action) case .none: @@ -530,28 +551,36 @@ extension NIOThrowingAsyncSequenceProducer { @inlinable /* fileprivate */ internal func next() async throws -> Element? { - try await withTaskCancellationHandler { - self._lock.lock() + try await withTaskCancellationHandler { () async throws -> Element? in + let unsafe = self._state.unsafe + unsafe.lock() - let action = self._stateMachine.next() + let action = unsafe.withValueAssumingLockIsAcquired { + $0.stateMachine.next() + } switch action { case .returnElement(let element): - self._lock.unlock() + unsafe.unlock() return element case .returnElementAndCallProduceMore(let element): - let delegate = self._delegate - self._lock.unlock() + let delegate = unsafe.withValueAssumingLockIsAcquired { + $0.delegate + } + unsafe.unlock() delegate?.produceMore() return element case .returnFailureAndCallDidTerminate(let failure): - let delegate = self._delegate - self._delegate = nil - self._lock.unlock() + let delegate = unsafe.withValueAssumingLockIsAcquired { + let delegate = $0.delegate + $0.delegate = nil + return delegate + } + unsafe.unlock() delegate?.didTerminate() @@ -564,7 +593,7 @@ extension NIOThrowingAsyncSequenceProducer { } case .returnCancellationError: - self._lock.unlock() + unsafe.unlock() // We have deprecated the generic Failure type in the public API and Failure should // now be `Swift.Error`. However, if users have not migrated to the new API they could // still use a custom generic Error type and this cast might fail. @@ -579,45 +608,55 @@ extension NIOThrowingAsyncSequenceProducer { return nil case .returnNil: - self._lock.unlock() + unsafe.unlock() return nil case .suspendTask: // It is safe to hold the lock across this method // since the closure is guaranteed to be run straight away - return try await withCheckedThrowingContinuation { continuation in - let action = self._stateMachine.next(for: continuation) + return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let (action, callDidSuspend) = unsafe.withValueAssumingLockIsAcquired { + let action = $0.stateMachine.next(for: continuation) + let callDidSuspend = $0.didSuspend != nil + return (action, callDidSuspend) + } switch action { case .callProduceMore: - let delegate = _delegate - self._lock.unlock() + let delegate = unsafe.withValueAssumingLockIsAcquired { + $0.delegate + } + unsafe.unlock() delegate?.produceMore() case .none: - self._lock.unlock() + unsafe.unlock() + } + + if callDidSuspend { + let didSuspend = self._state.withLockedValue { $0.didSuspend } + didSuspend?() } - self._didSuspend?() } } } onCancel: { // We must not resume the continuation while holding the lock // because it can deadlock in combination with the underlying ulock // in cases where we race with a cancellation handler - let (delegate, action): (Delegate?, NIOThrowingAsyncSequenceProducer.StateMachine.CancelledAction) = self._lock.withLock { - let action = self._stateMachine.cancelled() + let (delegate, action): (Delegate?, NIOThrowingAsyncSequenceProducer.StateMachine.CancelledAction) = self._state.withLockedValue { + let action = $0.stateMachine.cancelled() switch action { case .callDidTerminate: - let delegate = self._delegate - self._delegate = nil + let delegate = $0.delegate + $0.delegate = nil return (delegate, action) case .resumeContinuationWithCancellationErrorAndCallDidTerminate: - let delegate = self._delegate - self._delegate = nil + let delegate = $0.delegate + $0.delegate = nil return (delegate, action) @@ -658,9 +697,9 @@ extension NIOThrowingAsyncSequenceProducer { @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension NIOThrowingAsyncSequenceProducer { @usableFromInline - /* private */ internal struct StateMachine { + /* private */ internal struct StateMachine: Sendable { @usableFromInline - /* private */ internal enum State { + /* private */ internal enum State: Sendable { /// The initial state before either a call to `yield()` or a call to `next()` happened case initial( backPressureStrategy: Strategy, diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift index db2471c43b..4f7b9ec361 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncSequenceTests.swift @@ -269,7 +269,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._throwingSequence._storage._didSuspend = { suspended.fulfill() } + sequence._throwingSequence._storage._setDidSuspend { suspended.fulfill() } async let element = sequence.first { _ in true } @@ -351,7 +351,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in let suspended = expectation(description: "task suspended") - sequence!._throwingSequence._storage._didSuspend = { suspended.fulfill() } + sequence!._throwingSequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { let element = await sequence!.first { _ in true } @@ -439,7 +439,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._throwingSequence._storage._didSuspend = { suspended.fulfill() } + sequence._throwingSequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { @@ -462,7 +462,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let resumed = expectation(description: "task resumed") let cancelled = expectation(description: "task cancelled") - sequence._throwingSequence._storage._didSuspend = { suspended.fulfill() } + sequence._throwingSequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { let iterator = sequence.makeAsyncIterator() @@ -490,7 +490,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._throwingSequence._storage._didSuspend = { suspended.fulfill() } + sequence._throwingSequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { let iterator = sequence.makeAsyncIterator() diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift index 9b93841252..5787068510 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift @@ -77,7 +77,7 @@ final class NIOAsyncWriterTests: XCTestCase { ) self.writer = newWriter.writer self.sink = newWriter.sink - self.sink._storage._didSuspend = self.delegate.didSuspend + self.sink._storage._setDidSuspend { self.delegate.didSuspend() } } override func tearDown() { diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift index 237be2c28b..0d9acc1018 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift @@ -107,7 +107,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { try await sequence.first { _ in true } @@ -135,7 +135,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { try await sequence.first { _ in true } @@ -163,7 +163,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { await withThrowingTaskGroup(of: Void.self) { group in let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { _ = try await sequence.first { _ in true } @@ -188,7 +188,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { await withThrowingTaskGroup(of: Void.self) { group in let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { _ = try await sequence.first { _ in true } @@ -247,7 +247,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { let element = try await sequence.first { _ in true } @@ -330,7 +330,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { await XCTAssertThrowsError(try await withThrowingTaskGroup(of: Void.self) { group in let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } group.addTask { _ = try await sequence.first { _ in true } @@ -442,7 +442,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in let suspended = expectation(description: "task suspended") - sequence!._storage._didSuspend = { suspended.fulfill() } + sequence!._storage._setDidSuspend { suspended.fulfill() } group.addTask { let element = try await sequence!.first { _ in true } @@ -532,7 +532,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { let iterator = sequence.makeAsyncIterator() @@ -563,7 +563,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = new.sequence let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { let iterator = sequence.makeAsyncIterator() @@ -587,7 +587,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let resumed = expectation(description: "task resumed") let cancelled = expectation(description: "task cancelled") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { let iterator = sequence.makeAsyncIterator() @@ -615,7 +615,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } let task: Task = Task { let iterator = sequence.makeAsyncIterator() @@ -689,7 +689,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } Task { // Would prefer to use async let _ here but that is not allowed yet @@ -707,7 +707,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } Task { // Would prefer to use async let _ here but that is not allowed yet @@ -727,7 +727,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } Task { // Would prefer to use async let _ here but that is not allowed yet @@ -747,7 +747,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { let sequence = try XCTUnwrap(self.sequence) let suspended = expectation(description: "task suspended") - sequence._storage._didSuspend = { suspended.fulfill() } + sequence._storage._setDidSuspend { suspended.fulfill() } Task { // Would prefer to use async let _ here but that is not allowed yet