Skip to content

Commit

Permalink
Merge branch 'main' into jw-static-wrap
Browse files Browse the repository at this point in the history
  • Loading branch information
weissi authored Jul 18, 2024
2 parents e9f2faa + 223b0e8 commit 5b56715
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 130 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"mallocCountTotal" : 10
"mallocCountTotal" : 8
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"mallocCountTotal" : 10
"mallocCountTotal" : 8
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"mallocCountTotal" : 10
"mallocCountTotal" : 8
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"mallocCountTotal" : 10
"mallocCountTotal" : 8
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"mallocCountTotal" : 10
"mallocCountTotal" : 8
}
40 changes: 40 additions & 0 deletions Sources/NIOConcurrencyHelpers/NIOLockedValueBox.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,46 @@ public struct NIOLockedValueBox<Value> {
public func withLockedValue<T>(_ mutate: (inout Value) throws -> T) rethrows -> T {
return try self._storage.withLockedValue(mutate)
}

/// Provides an unsafe view over the lock and its value.
///
/// This can be beneficial when you require fine grained control over the lock in some
/// situations but don't want lose the benefits of ``withLockedValue(_:)`` in others by
/// switching to ``NIOLock``.
public var unsafe: Unsafe {
Unsafe(_storage: self._storage)
}

/// Provides an unsafe view over the lock and its value.
public struct Unsafe {
@usableFromInline
let _storage: LockStorage<Value>

/// Manually acquire the lock.
@inlinable
public func lock() {
self._storage.lock()
}

/// Manually release the lock.
@inlinable
public func unlock() {
self._storage.unlock()
}

/// Mutate the value, assuming the lock has been acquired manually.
///
/// - Parameter mutate: A closure with scoped access to the value.
/// - Returns: The result of the `mutate` closure.
@inlinable
public func withValueAssumingLockIsAcquired<Result>(
_ mutate: (_ value: inout Value) throws -> Result
) rethrows -> Result {
return try self._storage.withUnsafeMutablePointerToHeader { value in
try mutate(&value.pointee)
}
}
}
}

extension NIOLockedValueBox: Sendable where Value: Sendable {}
117 changes: 68 additions & 49 deletions Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -414,12 +414,12 @@ extension NIOAsyncWriter {
extension NIOAsyncWriter {
/// This is the underlying storage of the writer. The goal of this is to synchronize the access to all state.
@usableFromInline
/* fileprivate */ internal final class Storage: @unchecked Sendable {
/* fileprivate */ internal struct Storage: Sendable {
/// Internal type to generate unique yield IDs.
///
/// This type has reference semantics.
@usableFromInline
struct YieldIDGenerator {
struct YieldIDGenerator: Sendable {
/// A struct representing a unique yield ID.
@usableFromInline
struct YieldID: Equatable, Sendable {
Expand All @@ -445,47 +445,61 @@ extension NIOAsyncWriter {
}
}

/// The lock that protects our state.
@usableFromInline
/* private */ internal let _lock = NIOLock()
/// The counter used to assign an ID to all our yields.
@usableFromInline
/* private */ internal let _yieldIDGenerator = YieldIDGenerator()
/// The state machine.
@usableFromInline
/* private */ internal var _stateMachine: StateMachine
/* private */ internal let _state: NIOLockedValueBox<State>

@usableFromInline
struct State: Sendable {
@usableFromInline
var stateMachine: StateMachine
@usableFromInline
var didSuspend: (@Sendable () -> Void)?

@inlinable
init(stateMachine: StateMachine) {
self.stateMachine = stateMachine
self.didSuspend = nil
}
}

/// Hook used in testing.
@usableFromInline
internal var _didSuspend: (() -> Void)?
internal func _setDidSuspend(_ didSuspend: (@Sendable () -> Void)?) {
self._state.withLockedValue {
$0.didSuspend = didSuspend
}
}

@inlinable
internal var isWriterFinished: Bool {
self._lock.withLock { self._stateMachine.isWriterFinished }
self._state.withLockedValue { $0.stateMachine.isWriterFinished }
}

@inlinable
internal var isSinkFinished: Bool {
self._lock.withLock { self._stateMachine.isSinkFinished }
self._state.withLockedValue { $0.stateMachine.isSinkFinished }
}

@inlinable
/* fileprivate */ internal init(
isWritable: Bool,
delegate: Delegate
) {
self._stateMachine = .init(
isWritable: isWritable,
delegate: delegate
)
let state = State(stateMachine: StateMachine(isWritable: isWritable, delegate: delegate))
self._state = NIOLockedValueBox(state)
}

@inlinable
/* fileprivate */ internal func setWritability(to writability: Bool) {
// We must not resume the continuation while holding the lock
// because it can deadlock in combination with the underlying ulock
// in cases where we race with a cancellation handler
let action = self._lock.withLock {
self._stateMachine.setWritability(to: writability)
let action = self._state.withLockedValue {
$0.stateMachine.setWritability(to: writability)
}

switch action {
Expand Down Expand Up @@ -516,39 +530,42 @@ extension NIOAsyncWriter {

return try await withTaskCancellationHandler {
// We are manually locking here to hold the lock across the withCheckedContinuation call
self._lock.lock()
let unsafe = self._state.unsafe
unsafe.lock()

let action = self._stateMachine.yield(yieldID: yieldID)
let action = unsafe.withValueAssumingLockIsAcquired {
$0.stateMachine.yield(yieldID: yieldID)
}

switch action {
case .callDidYield(let delegate):
// We are allocating a new Deque for every write here
self._lock.unlock()
unsafe.unlock()
delegate.didYield(contentsOf: Deque(sequence))
self.unbufferQueuedEvents()
return .yielded

case .throwError(let error):
self._lock.unlock()
unsafe.unlock()
throw error

case .suspendTask:
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<StateMachine.YieldResult, Error>) in
self._stateMachine.yield(
continuation: continuation,
yieldID: yieldID
)
let didSuspend = unsafe.withValueAssumingLockIsAcquired {
$0.stateMachine.yield(continuation: continuation, yieldID: yieldID)
return $0.didSuspend
}

self._lock.unlock()
self._didSuspend?()
unsafe.unlock()
didSuspend?()
}
}
} onCancel: {
// We must not resume the continuation while holding the lock
// because it can deadlock in combination with the underlying ulock
// in cases where we race with a cancellation handler
let action = self._lock.withLock {
self._stateMachine.cancel(yieldID: yieldID)
let action = self._state.withLockedValue {
$0.stateMachine.cancel(yieldID: yieldID)
}

switch action {
Expand Down Expand Up @@ -580,39 +597,41 @@ extension NIOAsyncWriter {

return try await withTaskCancellationHandler {
// We are manually locking here to hold the lock across the withCheckedContinuation call
self._lock.lock()
let unsafe = self._state.unsafe
unsafe.lock()

let action = self._stateMachine.yield(yieldID: yieldID)
let action = unsafe.withValueAssumingLockIsAcquired {
$0.stateMachine.yield(yieldID: yieldID)
}

switch action {
case .callDidYield(let delegate):
// We are allocating a new Deque for every write here
self._lock.unlock()
unsafe.unlock()
delegate.didYield(element)
self.unbufferQueuedEvents()
return .yielded

case .throwError(let error):
self._lock.unlock()
unsafe.unlock()
throw error

case .suspendTask:
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<StateMachine.YieldResult, Error>) in
self._stateMachine.yield(
continuation: continuation,
yieldID: yieldID
)

self._lock.unlock()
self._didSuspend?()
let didSuspend = unsafe.withValueAssumingLockIsAcquired {
$0.stateMachine.yield(continuation: continuation, yieldID: yieldID)
return $0.didSuspend
}
unsafe.unlock()
didSuspend?()
}
}
} onCancel: {
// We must not resume the continuation while holding the lock
// because it can deadlock in combination with the underlying ulock
// because it can deadlock in combination with the underlying lock
// in cases where we race with a cancellation handler
let action = self._lock.withLock {
self._stateMachine.cancel(yieldID: yieldID)
let action = self._state.withLockedValue {
$0.stateMachine.cancel(yieldID: yieldID)
}

switch action {
Expand All @@ -630,8 +649,8 @@ extension NIOAsyncWriter {
// We must not resume the continuation while holding the lock
// because it can deadlock in combination with the underlying ulock
// in cases where we race with a cancellation handler
let action = self._lock.withLock {
self._stateMachine.writerFinish(error: error)
let action = self._state.withLockedValue {
$0.stateMachine.writerFinish(error: error)
}

switch action {
Expand All @@ -651,8 +670,8 @@ extension NIOAsyncWriter {
// We must not resume the continuation while holding the lock
// because it can deadlock in combination with the underlying ulock
// in cases where we race with a cancellation handler
let action = self._lock.withLock {
self._stateMachine.sinkFinish(error: error)
let action = self._state.withLockedValue {
$0.stateMachine.sinkFinish(error: error)
}

switch action {
Expand All @@ -667,7 +686,7 @@ extension NIOAsyncWriter {

@inlinable
/* fileprivate */ internal func unbufferQueuedEvents() {
while let action = self._lock.withLock({ self._stateMachine.unbufferQueuedEvents()}) {
while let action = self._state.withLockedValue({ $0.stateMachine.unbufferQueuedEvents()}) {
switch action {
case .callDidTerminate(let delegate, let error):
delegate.didTerminate(error: error)
Expand All @@ -684,12 +703,12 @@ extension NIOAsyncWriter {
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension NIOAsyncWriter {
@usableFromInline
/* private */ internal struct StateMachine {
/* private */ internal struct StateMachine: Sendable {
@usableFromInline
typealias YieldID = Storage.YieldIDGenerator.YieldID
/// This is a small helper struct to encapsulate the two different values for a suspended yield.
@usableFromInline
/* private */ internal struct SuspendedYield {
/* private */ internal struct SuspendedYield: Sendable {
/// The yield's ID.
@usableFromInline
var yieldID: YieldID
Expand All @@ -715,7 +734,7 @@ extension NIOAsyncWriter {

/// The current state of our ``NIOAsyncWriter``.
@usableFromInline
/* private */ internal enum State: CustomStringConvertible {
/* private */ internal enum State: Sendable, CustomStringConvertible {
/// The initial state before either a call to ``NIOAsyncWriter/yield(contentsOf:)`` or
/// ``NIOAsyncWriter/finish(completion:)`` happened.
case initial(
Expand Down
Loading

0 comments on commit 5b56715

Please sign in to comment.