diff --git a/Sources/Atoms/Atom/ObservableObjectAtom.swift b/Sources/Atoms/Atom/ObservableObjectAtom.swift index 731ddcda..d340f362 100644 --- a/Sources/Atoms/Atom/ObservableObjectAtom.swift +++ b/Sources/Atoms/Atom/ObservableObjectAtom.swift @@ -1,3 +1,4 @@ +import Combine import Foundation /// An atom type that instantiates an observable object. @@ -70,24 +71,85 @@ public extension ObservableObjectAtom { AtomProducer { context in context.transaction(object) } manageValue: { object, context in - var task: Task? let cancellable = object .objectWillChange - .sink { [weak object] _ in - // Wait until the object's property is set, because `objectWillChange` - // emits an event before the property is updated. - task?.cancel() - task = Task { @MainActor in - if let object, !Task.isCancelled, !context.isTerminated { - context.update(with: object) - } + .map { @Sendable _ in } + .sinkLatest { [weak object] _ in + // A custom subscriber is used here, encompassing the following + // three behaviours. + // + // 1. It ensures that updates are performed on the main actor because `ObservableObject` + // is not constrained to be isolated to the main actor. + // 2. It always performs updates asynchronously to ensure the object to be updated as + // `objectWillChange` emits events before the update. + // 3. It adopts the latest event and cancels the previous update when successive events + // arrive. + if let object, !context.isTerminated { + context.update(with: object) } } context.onTermination = { - task?.cancel() cancellable.cancel() } } } } + +private extension Publisher where Output: Sendable, Failure == Never { + func sinkLatest(receiveValue: @MainActor @escaping (Output) -> Void) -> AnyCancellable { + let subscriber = Subscribers.SinkLatestOnMainActor(receiveValue: receiveValue) + receive(subscriber: subscriber) + return AnyCancellable(subscriber) + } +} + +private extension Subscribers { + final class SinkLatestOnMainActor: Combine.Subscriber, Cancellable { + private var receiveValue: (@MainActor (Input) -> Void)? + private var currentTask: Task? + private var lock = os_unfair_lock_s() + + init(receiveValue: @MainActor @escaping (Input) -> Void) { + self.receiveValue = receiveValue + } + + func receive(subscription: any Combine.Subscription) { + subscription.request(.unlimited) + } + + func receive(_ input: Input) -> Demand { + withLock { + guard let receiveValue else { + return .none + } + + currentTask?.cancel() + currentTask = Task { @MainActor in + guard !Task.isCancelled else { + return + } + receiveValue(input) + } + + return .unlimited + } + } + + func receive(completion: Completion) {} + + func cancel() { + withLock { + currentTask?.cancel() + currentTask = nil + receiveValue = nil + } + } + + func withLock(_ body: () -> R) -> R { + os_unfair_lock_lock(&lock) + defer { os_unfair_lock_unlock(&lock) } + return body() + } + } +} diff --git a/Tests/AtomsTests/Atom/ObservableObjectAtomTests.swift b/Tests/AtomsTests/Atom/ObservableObjectAtomTests.swift index 5532012c..054cd76a 100644 --- a/Tests/AtomsTests/Atom/ObservableObjectAtomTests.swift +++ b/Tests/AtomsTests/Atom/ObservableObjectAtomTests.swift @@ -147,13 +147,46 @@ final class ObservableObjectAtomTests: XCTestCase { } object.update() - - await context.wait(for: atom) { - $0.value0 == 1 && $0.value1 == 1 - } + await context.waitForUpdate() XCTAssertEqual(updatedCount, 1) XCTAssertEqual(object.value0, 1) XCTAssertEqual(object.value1, 1) } + + @MainActor + func testUpdateOnNonIsolatedContext() async { + final class TestObject: ObservableObject, @unchecked Sendable { + @Published + var value = 0 + + func update() { + value += 1 + } + } + + struct TestAtom: ObservableObjectAtom, Hashable { + func object(context: Context) -> TestObject { + TestObject() + } + } + + let atom = TestAtom() + let context = AtomTestContext() + let object = context.watch(atom) + var updatedCount = 0 + + context.onUpdate = { + updatedCount += 1 + } + + Task.detached { + object.update() + } + + await context.waitForUpdate() + + XCTAssertEqual(updatedCount, 1) + XCTAssertEqual(object.value, 1) + } }