@@ -414,12 +414,12 @@ extension NIOAsyncWriter {
414
414
extension NIOAsyncWriter {
415
415
/// This is the underlying storage of the writer. The goal of this is to synchronize the access to all state.
416
416
@usableFromInline
417
- /* fileprivate */ internal final class Storage : @ unchecked Sendable {
417
+ /* fileprivate */ internal struct Storage : Sendable {
418
418
/// Internal type to generate unique yield IDs.
419
419
///
420
420
/// This type has reference semantics.
421
421
@usableFromInline
422
- struct YieldIDGenerator {
422
+ struct YieldIDGenerator : Sendable {
423
423
/// A struct representing a unique yield ID.
424
424
@usableFromInline
425
425
struct YieldID : Equatable , Sendable {
@@ -445,47 +445,61 @@ extension NIOAsyncWriter {
445
445
}
446
446
}
447
447
448
- /// The lock that protects our state.
449
- @usableFromInline
450
- /* private */ internal let _lock = NIOLock ( )
451
448
/// The counter used to assign an ID to all our yields.
452
449
@usableFromInline
453
450
/* private */ internal let _yieldIDGenerator = YieldIDGenerator ( )
454
451
/// The state machine.
455
452
@usableFromInline
456
- /* private */ internal var _stateMachine : StateMachine
453
+ /* private */ internal let _state : NIOLockedValueBox < State >
454
+
455
+ @usableFromInline
456
+ struct State : Sendable {
457
+ @usableFromInline
458
+ var stateMachine : StateMachine
459
+ @usableFromInline
460
+ var didSuspend : ( @Sendable ( ) -> Void ) ?
461
+
462
+ @inlinable
463
+ init ( stateMachine: StateMachine ) {
464
+ self . stateMachine = stateMachine
465
+ self . didSuspend = nil
466
+ }
467
+ }
468
+
457
469
/// Hook used in testing.
458
470
@usableFromInline
459
- internal var _didSuspend : ( ( ) -> Void ) ?
471
+ internal func _setDidSuspend( _ didSuspend: ( @Sendable ( ) -> Void ) ? ) {
472
+ self . _state. withLockedValue {
473
+ $0. didSuspend = didSuspend
474
+ }
475
+ }
460
476
461
477
@inlinable
462
478
internal var isWriterFinished : Bool {
463
- self . _lock . withLock { self . _stateMachine . isWriterFinished }
479
+ self . _state . withLockedValue { $0 . stateMachine . isWriterFinished }
464
480
}
465
481
466
482
@inlinable
467
483
internal var isSinkFinished : Bool {
468
- self . _lock . withLock { self . _stateMachine . isSinkFinished }
484
+ self . _state . withLockedValue { $0 . stateMachine . isSinkFinished }
469
485
}
470
486
471
487
@inlinable
472
488
/* fileprivate */ internal init (
473
489
isWritable: Bool ,
474
490
delegate: Delegate
475
491
) {
476
- self . _stateMachine = . init(
477
- isWritable: isWritable,
478
- delegate: delegate
479
- )
492
+ let state = State ( stateMachine: StateMachine ( isWritable: isWritable, delegate: delegate) )
493
+ self . _state = NIOLockedValueBox ( state)
480
494
}
481
495
482
496
@inlinable
483
497
/* fileprivate */ internal func setWritability( to writability: Bool ) {
484
498
// We must not resume the continuation while holding the lock
485
499
// because it can deadlock in combination with the underlying ulock
486
500
// in cases where we race with a cancellation handler
487
- let action = self . _lock . withLock {
488
- self . _stateMachine . setWritability ( to: writability)
501
+ let action = self . _state . withLockedValue {
502
+ $0 . stateMachine . setWritability ( to: writability)
489
503
}
490
504
491
505
switch action {
@@ -516,39 +530,42 @@ extension NIOAsyncWriter {
516
530
517
531
return try await withTaskCancellationHandler {
518
532
// We are manually locking here to hold the lock across the withCheckedContinuation call
519
- self . _lock. lock ( )
533
+ let unsafe = self . _state. unsafe
534
+ unsafe. lock ( )
520
535
521
- let action = self . _stateMachine. yield ( yieldID: yieldID)
536
+ let action = unsafe. withValueAssumingLockIsAcquired {
537
+ $0. stateMachine. yield ( yieldID: yieldID)
538
+ }
522
539
523
540
switch action {
524
541
case . callDidYield( let delegate) :
525
542
// We are allocating a new Deque for every write here
526
- self . _lock . unlock ( )
543
+ unsafe . unlock ( )
527
544
delegate. didYield ( contentsOf: Deque ( sequence) )
528
545
self . unbufferQueuedEvents ( )
529
546
return . yielded
530
547
531
548
case . throwError( let error) :
532
- self . _lock . unlock ( )
549
+ unsafe . unlock ( )
533
550
throw error
534
551
535
552
case . suspendTask:
536
553
return try await withCheckedThrowingContinuation { ( continuation: CheckedContinuation < StateMachine . YieldResult , Error > ) in
537
- self . _stateMachine . yield (
538
- continuation: continuation,
539
- yieldID : yieldID
540
- )
554
+ let didSuspend = unsafe . withValueAssumingLockIsAcquired {
555
+ $0 . stateMachine . yield ( continuation: continuation, yieldID : yieldID )
556
+ return $0 . didSuspend
557
+ }
541
558
542
- self . _lock . unlock ( )
543
- self . _didSuspend ? ( )
559
+ unsafe . unlock ( )
560
+ didSuspend ? ( )
544
561
}
545
562
}
546
563
} onCancel: {
547
564
// We must not resume the continuation while holding the lock
548
565
// because it can deadlock in combination with the underlying ulock
549
566
// in cases where we race with a cancellation handler
550
- let action = self . _lock . withLock {
551
- self . _stateMachine . cancel ( yieldID: yieldID)
567
+ let action = self . _state . withLockedValue {
568
+ $0 . stateMachine . cancel ( yieldID: yieldID)
552
569
}
553
570
554
571
switch action {
@@ -580,39 +597,41 @@ extension NIOAsyncWriter {
580
597
581
598
return try await withTaskCancellationHandler {
582
599
// We are manually locking here to hold the lock across the withCheckedContinuation call
583
- self . _lock. lock ( )
600
+ let unsafe = self . _state. unsafe
601
+ unsafe. lock ( )
584
602
585
- let action = self . _stateMachine. yield ( yieldID: yieldID)
603
+ let action = unsafe. withValueAssumingLockIsAcquired {
604
+ $0. stateMachine. yield ( yieldID: yieldID)
605
+ }
586
606
587
607
switch action {
588
608
case . callDidYield( let delegate) :
589
609
// We are allocating a new Deque for every write here
590
- self . _lock . unlock ( )
610
+ unsafe . unlock ( )
591
611
delegate. didYield ( element)
592
612
self . unbufferQueuedEvents ( )
593
613
return . yielded
594
614
595
615
case . throwError( let error) :
596
- self . _lock . unlock ( )
616
+ unsafe . unlock ( )
597
617
throw error
598
618
599
619
case . suspendTask:
600
620
return try await withCheckedThrowingContinuation { ( continuation: CheckedContinuation < StateMachine . YieldResult , Error > ) in
601
- self . _stateMachine. yield (
602
- continuation: continuation,
603
- yieldID: yieldID
604
- )
605
-
606
- self . _lock. unlock ( )
607
- self . _didSuspend ? ( )
621
+ let didSuspend = unsafe. withValueAssumingLockIsAcquired {
622
+ $0. stateMachine. yield ( continuation: continuation, yieldID: yieldID)
623
+ return $0. didSuspend
624
+ }
625
+ unsafe. unlock ( )
626
+ didSuspend ? ( )
608
627
}
609
628
}
610
629
} onCancel: {
611
630
// We must not resume the continuation while holding the lock
612
- // because it can deadlock in combination with the underlying ulock
631
+ // because it can deadlock in combination with the underlying lock
613
632
// in cases where we race with a cancellation handler
614
- let action = self . _lock . withLock {
615
- self . _stateMachine . cancel ( yieldID: yieldID)
633
+ let action = self . _state . withLockedValue {
634
+ $0 . stateMachine . cancel ( yieldID: yieldID)
616
635
}
617
636
618
637
switch action {
@@ -630,8 +649,8 @@ extension NIOAsyncWriter {
630
649
// We must not resume the continuation while holding the lock
631
650
// because it can deadlock in combination with the underlying ulock
632
651
// in cases where we race with a cancellation handler
633
- let action = self . _lock . withLock {
634
- self . _stateMachine . writerFinish ( error: error)
652
+ let action = self . _state . withLockedValue {
653
+ $0 . stateMachine . writerFinish ( error: error)
635
654
}
636
655
637
656
switch action {
@@ -651,8 +670,8 @@ extension NIOAsyncWriter {
651
670
// We must not resume the continuation while holding the lock
652
671
// because it can deadlock in combination with the underlying ulock
653
672
// in cases where we race with a cancellation handler
654
- let action = self . _lock . withLock {
655
- self . _stateMachine . sinkFinish ( error: error)
673
+ let action = self . _state . withLockedValue {
674
+ $0 . stateMachine . sinkFinish ( error: error)
656
675
}
657
676
658
677
switch action {
@@ -667,7 +686,7 @@ extension NIOAsyncWriter {
667
686
668
687
@inlinable
669
688
/* fileprivate */ internal func unbufferQueuedEvents( ) {
670
- while let action = self . _lock . withLock ( { self . _stateMachine . unbufferQueuedEvents ( ) } ) {
689
+ while let action = self . _state . withLockedValue ( { $0 . stateMachine . unbufferQueuedEvents ( ) } ) {
671
690
switch action {
672
691
case . callDidTerminate( let delegate, let error) :
673
692
delegate. didTerminate ( error: error)
@@ -684,12 +703,12 @@ extension NIOAsyncWriter {
684
703
@available ( macOS 10 . 15 , iOS 13 , tvOS 13 , watchOS 6 , * )
685
704
extension NIOAsyncWriter {
686
705
@usableFromInline
687
- /* private */ internal struct StateMachine {
706
+ /* private */ internal struct StateMachine : Sendable {
688
707
@usableFromInline
689
708
typealias YieldID = Storage . YieldIDGenerator . YieldID
690
709
/// This is a small helper struct to encapsulate the two different values for a suspended yield.
691
710
@usableFromInline
692
- /* private */ internal struct SuspendedYield {
711
+ /* private */ internal struct SuspendedYield : Sendable {
693
712
/// The yield's ID.
694
713
@usableFromInline
695
714
var yieldID : YieldID
@@ -715,7 +734,7 @@ extension NIOAsyncWriter {
715
734
716
735
/// The current state of our ``NIOAsyncWriter``.
717
736
@usableFromInline
718
- /* private */ internal enum State : CustomStringConvertible {
737
+ /* private */ internal enum State : Sendable , CustomStringConvertible {
719
738
/// The initial state before either a call to ``NIOAsyncWriter/yield(contentsOf:)`` or
720
739
/// ``NIOAsyncWriter/finish(completion:)`` happened.
721
740
case initial(
0 commit comments