From 52c52bece6aeb9dd1cfd22e7db3af97cf7de02e6 Mon Sep 17 00:00:00 2001 From: Thibault Wittemberg Date: Tue, 15 Jun 2021 18:54:12 -0400 Subject: [PATCH] operators: add fromAsync, fromThrowingAsync, fromAsyncSequence to bridge async/await with Combine --- .github/workflows/tests.yml | 27 +- .../xcschemes/CombineExt.xcscheme | 91 ++++++ CombineExt.podspec | 2 +- README.md | 95 ++++++ Sources/Operators/FromAsync.swift | 191 +++++++++++ Tests/FromAsyncTests.swift | 309 ++++++++++++++++++ 6 files changed, 695 insertions(+), 20 deletions(-) create mode 100644 .swiftpm/xcode/xcshareddata/xcschemes/CombineExt.xcscheme create mode 100644 Sources/Operators/FromAsync.swift create mode 100644 Tests/FromAsyncTests.swift diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 58af8a7..a3ad7ab 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -5,32 +5,30 @@ on: [push, pull_request, workflow_dispatch] jobs: xcode-tests: name: "Test" - runs-on: macOS-latest + runs-on: macOS-11 strategy: matrix: platform: [macOS, iOS, tvOS] include: - platform: macOS - sdk: macosx + sdk: macosx11.3 destination: "arch=x86_64" - platform: iOS - sdk: iphonesimulator - destination: "name=iPhone 11" + sdk: iphoneos15.0 + destination: "name=iPhone 13" - platform: tvOS - sdk: appletvsimulator + sdk: appletvsimulator15.0 destination: "name=Apple TV" steps: - uses: actions/checkout@v2 - - name: Select Xcode 12 (beta) - run: sudo xcode-select -s /Applications/Xcode_12_beta.app - - name: Generate project - run: make project + - name: Select Xcode 13 + run: sudo xcode-select -s /Applications/Xcode_13.0.app - name: Run tests - run: set -o pipefail && xcodebuild -project CombineExt.xcodeproj -scheme CombineExt-Package -enableCodeCoverage YES -sdk ${{ matrix.sdk }} -destination "${{ matrix.destination }}" test | xcpretty -c -r html --output logs/${{ matrix.platform }}.html + run: set -o pipefail && xcodebuild -scheme CombineExt-Package -enableCodeCoverage YES -sdk ${{ matrix.sdk }} -destination "${{ matrix.destination }}" test | xcpretty -c -r html --output logs/${{ matrix.platform }}.html - uses: codecov/codecov-action@v1.0.13 with: token: 1519d58c-6fb9-483f-af6c-7f6f0b384345 @@ -39,12 +37,3 @@ jobs: with: name: build-logs-${{ github.run_id }} path: logs - - SPM: - name: "Test (SPM)" - runs-on: macOS-latest - - steps: - - uses: actions/checkout@v2 - - name: Run tests - run: set -o pipefail && swift test diff --git a/.swiftpm/xcode/xcshareddata/xcschemes/CombineExt.xcscheme b/.swiftpm/xcode/xcshareddata/xcschemes/CombineExt.xcscheme new file mode 100644 index 0000000..b833217 --- /dev/null +++ b/.swiftpm/xcode/xcshareddata/xcschemes/CombineExt.xcscheme @@ -0,0 +1,91 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/CombineExt.podspec b/CombineExt.podspec index 0e266a4..5856c9d 100644 --- a/CombineExt.podspec +++ b/CombineExt.podspec @@ -18,5 +18,5 @@ Pod::Spec.new do |s| s.source = { :git => "https://github.com/CombineCommunity/CombineExt.git", :tag => s.version } s.source_files = 'Sources/**/*.swift' s.frameworks = ['Combine'] - s.swift_version = '5.1' + s.swift_version = '5.5' end diff --git a/README.md b/README.md index 3f6b169..f16e84b 100644 --- a/README.md +++ b/README.md @@ -755,6 +755,101 @@ subscription = ints .finished ``` +------ + +### fromAsync(priority:_:) + +Creates a Combine Publisher from an async function. The Publisher emits a value and then completes when the async function returns its result. +The task that supports the async function is canceled when the publisher's subscription is canceled. +An optional priority indicates the priority of the Task supporting the execution of the async function. + +```swift +var value: Int { + get async { + 3 + } +} + +Publishers + .fromAsync { + await value + }.sink { + print($0) + } receiveValue: { + print($0) + } +``` + +#### Output: + +```none +3 +finished +``` + +------ + +### fromThrowingAsync(priority:_:) + +Creates a Combine Publisher from a throwing async function +The Publisher emits a value and completes or fail according the the async function execution result. +The task that supports the async function is canceled when the publisher's subscription is canceled. +An optional priority indicates the priority of the Task supporting the execution of the async function. + +```swift +struct MyError: Error, CustomStringConvertible { + var description: String { + "Async Error" + } + } + +Publishers + .fromAsync { () async throws -> String in + throw MyError() + }.sink { + print($0) + } receiveValue: { + print($0) + } +``` + +#### Output: + +```none +failure(Async Error) +``` + +### fromAsyncSequence(priority:_:) + +Creates a Combine Publisher from an async sequence. +The Publisher emits values or fail according the the async sequence execution result. +An optional priority indicates the priority of the Task supporting the execution of the async sequence. + +```swift +let sequence = AsyncStream(Int.self) { continuation in + continuation.yield(1) + continuation.yield(2) + continuation.yield(3) + continuation.finish() +} + +Publishers + .fromAsyncSequence(sequence).sink { + print($0) + } receiveValue: { + print($0) + } +``` + +#### Output: + +```none +1 +2 +3 +finished +``` + ## Publishers This section outlines some of the custom Combine publishers CombineExt provides diff --git a/Sources/Operators/FromAsync.swift b/Sources/Operators/FromAsync.swift new file mode 100644 index 0000000..0349fde --- /dev/null +++ b/Sources/Operators/FromAsync.swift @@ -0,0 +1,191 @@ +// +// FromAsync.swift +// CombineExt +// +// Created by Thibault Wittemberg on 2021-06-15. +// Copyright © 2021 Combine Community. All rights reserved. +// + +#if canImport(Combine) +import Combine + +@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) +public extension Publishers { + /// Creates a Combine Publisher from an async function + /// The Publisher emits a value and then completes when the async function returns its result. + /// The task that supports the async function is canceled when the publisher's subscription is canceled. + /// ``` + /// var value: Int { + /// get async { + /// 3 + /// } + /// } + /// + /// Publishers + /// .fromAsync { + /// await value + /// }.sink { + /// print($0) + /// } receiveValue: { + /// print($0) + /// } + /// + /// // will print: + /// // 3 + /// // finished + /// ``` + /// - parameter priority: Optional value indicating the priority of the Task supporting the execution of the async function + /// - Returns: The Combine Publisher wrapping the async function execution + static func fromAsync(priority: TaskPriority? = nil, + _ asyncFunction: @escaping () async -> Output) -> AnyPublisher { + AnyPublisher.create { subscriber in + let task = Task(priority: priority) { + let result = await asyncFunction() + subscriber.send(result) + subscriber.send(completion: .finished) + } + + return AnyCancellable { + task.cancel() + } + } + } + + /// Creates a Combine Publisher from a throwing async function + /// The Publisher emits a value or fail according the the async function execution result. + /// The task that supports the async function is canceled when the publisher's subscription is canceled. + /// + /// ``` + /// var value: Int { + /// get async { + /// 3 + /// } + /// } + /// + /// Publishers + /// .fromAsync { + /// await value + /// }.sink { + /// print($0) + /// } receiveValue: { + /// print($0) + /// } + /// + /// // will print: + /// // 3 + /// // finished + /// ``` + /// + /// Whenever the async function throws an error, the stream will faile: + /// + /// ``` + /// struct MyError: Error, CustomStringConvertible { + /// var description: String { + /// "Async Error" + /// } + /// } + /// + /// Publishers + /// .fromAsync { () async throws -> String in + /// throw MyError() + /// }.sink { + /// print($0) + /// } receiveValue: { + /// print($0) + /// } + /// + /// // will print: + /// // failure(Async Error) + ///``` + /// - parameter priority: Optional value indicating the priority of the Task supporting the execution of the async function + /// - Returns: The Combine Publisher wrapping the async function execution + static func fromThrowingAsync(priority: TaskPriority? = nil, + _ asyncThrowingFunction: @escaping () async throws -> Output) -> AnyPublisher { + AnyPublisher.create { subscriber in + let task = Task(priority: priority) { + do { + let result = try await asyncThrowingFunction() + subscriber.send(result) + subscriber.send(completion: .finished) + } catch { + subscriber.send(completion: .failure(error)) + } + } + + return AnyCancellable { + task.cancel() + } + } + } + + /// Creates a Combine Publisher from an async sequence. + /// The Publisher emits values or fail according the the async sequence execution result. + /// + /// ``` + /// let sequence = [1, 2, 3].publisher.values + /// + /// Publishers + /// .fromAsyncSequence(sequence).sink { + /// print($0) + /// } receiveValue: { + /// print($0) + /// } + /// + /// // will print: + /// // 1 + /// // 2 + /// // 3 + /// // finished + /// ``` + /// + /// If the asyncSequence faild: + /// + /// ``` + /// struct MyError: Error, CustomStringConvertible { + /// var description: String { + /// "Async Error" + /// } + /// } + /// + /// let sequence = AsyncThrowingStream(Int.self) { continuation in + /// continuation.yield(1) + /// continuation.yield(2) + /// continuation.finish(throwing: MockError(value: Int.random(in: 1...100))) + /// } + /// + /// Publishers + /// .fromAsyncSequence(sequence).sink { + /// print($0) + /// } receiveValue: { + /// print($0) + /// } + /// + /// // will print: + /// // 1 + /// // 2 + /// // failure(Async Error) + ///``` + /// - parameter priority: Optional value indicating the priority of the Task supporting the async sequence execution + /// - Returns: The Combine Publisher wrapping the async sequence iteration + static func fromAsyncSequence(priority: TaskPriority? = nil, + _ asyncSequence: AsyncSequenceType) -> AnyPublisher + where AsyncSequenceType: AsyncSequence, AsyncSequenceType.Element == Output { + AnyPublisher.create { subscriber in + let task = Task(priority: priority) { + do { + for try await result in asyncSequence { + subscriber.send(result) + } + subscriber.send(completion: .finished) + } catch { + subscriber.send(completion: .failure(error)) + } + } + + return AnyCancellable { + task.cancel() + } + } + } +} +#endif diff --git a/Tests/FromAsyncTests.swift b/Tests/FromAsyncTests.swift new file mode 100644 index 0000000..795ca69 --- /dev/null +++ b/Tests/FromAsyncTests.swift @@ -0,0 +1,309 @@ +// +// FromAsyncTests.swift +// CombineExtTests +// +// Created by Thibault Wittemberg on 2021-06-15. +// Copyright © 2021 Combine Community. All rights reserved. +// + +#if !os(watchOS) +import Combine +import CombineExt +import XCTest + +struct MockError: Error, Equatable { + let value: Int +} + +@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *) +final class FromAsyncTests: XCTestCase { + func testFromAsync_publishes_value() { + let exp = expectation(description: "fromAsync publishes the expected value when executing an async function") + + var asyncFunctionNumberOfExecutions = 0 + let expectedOutput = UUID().uuidString + var receivedOutput: String? + + // Given: an async function + // When: making the publisher from the function and subscribing to it + let cancelable = Publishers + .fromAsync { + asyncFunctionNumberOfExecutions += 1 + return expectedOutput + } + .handleEvents(receiveCompletion: { _ in exp.fulfill() }) + .sink { receivedOutput = $0 } + + waitForExpectations(timeout: 1) + + // Then: The value from the async function is published and then the stream completes + XCTAssertEqual(receivedOutput, expectedOutput) + XCTAssertEqual(asyncFunctionNumberOfExecutions, 1) + + cancelable.cancel() + } + + func testFromAsync_executes_asyncFunction_with_specified_priority_when_called_with_taskPriority() { + let exp = expectation(description: "fromAsync uses the expected priority when executing an async function with a priority") + + var asyncFunctionNumberOfExecutions = 0 + var receivedQueue: String? + + // Given: an async function + // When: making the publisher from the function and subscribing to it with a priority + let cancelable = Publishers + .fromAsync(priority: .userInitiated) { () async -> String in + asyncFunctionNumberOfExecutions += 1 + receivedQueue = DispatchQueue.currentLabel + return "" + } + .handleEvents(receiveCompletion: { _ in exp.fulfill() }) + .sink { _ in } + + waitForExpectations(timeout: 1) + + // Then: The async function is executed with the expected priority + XCTAssertTrue(receivedQueue!.contains("user-initiated")) + XCTAssertEqual(asyncFunctionNumberOfExecutions, 1) + + cancelable.cancel() + } + + func testFromAsync_cancels_task_when_subscription_is_canceled() { + let exp = expectation(description: "fromAsync cancels the task when the subscription is canceled") + + let semaphore = DispatchSemaphore(value: 0) + + var isTaskCanceled = false + + // Given: an async function that records the cancelation of its execution task + // When: making the publisher from the function and subscribing to it + let cancelable = Publishers + .fromAsync { () async -> String in + semaphore.wait() + isTaskCanceled = Task.isCancelled + exp.fulfill() + return "" + } + .sink { _ in } + + // When: canceling the subscription + cancelable.cancel() + semaphore.signal() + + waitForExpectations(timeout: 1) + + // Then: the aync task has been canceled + XCTAssertTrue(isTaskCanceled) + } + + func testFromThrowingAsync_publishes_value_when_throwing_asyncFunction_does_not_throw() { + let exp = expectation(description: "fromAsync publishes the expected value when executing an async function that does no throw") + + var asyncFunctionNumberOfExecutions = 0 + let expectedOutput = UUID().uuidString + var receivedOutput: String? + + // Given: a throwing async function that returns a value + // When: making the publisher from the function and subscribing to it + let cancelable = Publishers + .fromThrowingAsync { + asyncFunctionNumberOfExecutions += 1 + return expectedOutput + } + .handleEvents(receiveCompletion: { _ in exp.fulfill() }) + .sink(receiveCompletion: { _ in }, receiveValue: { asyncOutput in receivedOutput = asyncOutput }) + + waitForExpectations(timeout: 1) + + // Then: The value from the async function is published + XCTAssertEqual(receivedOutput, expectedOutput) + XCTAssertEqual(asyncFunctionNumberOfExecutions, 1) + + cancelable.cancel() + } + + func testFromThrowingAsync_executes_asynFunction_with_specified_priority_when_called_with_taskPriority() { + let exp = expectation(description: "fromAsync uses the expected priority when executing a throwable async function with a priority") + + var asyncFunctionNumberOfExecutions = 0 + var receivedQueue: String? + + // Given: a throwable async function + // When: making the publisher from the function and subscribing to it with a priority + let cancelable = Publishers + .fromThrowingAsync(priority: .userInitiated) { () async throws -> String in + asyncFunctionNumberOfExecutions += 1 + receivedQueue = DispatchQueue.currentLabel + return "" + } + .handleEvents(receiveCompletion: { _ in exp.fulfill() }) + .sink(receiveCompletion: { _ in }, receiveValue: { _ in }) + + waitForExpectations(timeout: 1) + + // Then: The async function is executed with the expected priority + XCTAssertTrue(receivedQueue!.contains("user-initiated")) + XCTAssertEqual(asyncFunctionNumberOfExecutions, 1) + + cancelable.cancel() + } + + func testFromThrowingAsync_completesWithFailure_when_asyncFunction_throws() { + let exp = expectation(description: "fromAsync completes with the expected failure when executing an async function that throws") + + var asyncFunctionNumberOfExecutions = 0 + let expectedError = MockError(value: Int.random(in: 0...100)) + var receivedError: Error? + + // Given: an async function that throws + // When: making the publisher from the function and subscribing to it + let cancelable = Publishers + .fromThrowingAsync { () async throws -> String in + asyncFunctionNumberOfExecutions += 1 + throw expectedError + } + .handleEvents(receiveCompletion: { _ in exp.fulfill() }) + .sink(receiveCompletion: { completion in + if case let .failure(error) = completion { + receivedError = error + } + }, receiveValue: { _ in }) + + waitForExpectations(timeout: 1) + + // Then: The error from the async function is catched in the completion + XCTAssertEqual(receivedError as? MockError, expectedError) + XCTAssertEqual(asyncFunctionNumberOfExecutions, 1) + + cancelable.cancel() + } + + func testFromThrowingAsync_cancels_task_when_subscription_is_canceled() { + let exp = expectation(description: "fromAsync cancels the task when the subscription is canceled") + + let semaphore = DispatchSemaphore(value: 0) + + var isTaskCanceled = false + + // Given: a throwing async function that records the cancelation of its execution task + // When: making the publisher from the function and subscribing to it + let cancelable = Publishers + .fromThrowingAsync { () async throws -> String in + semaphore.wait() + isTaskCanceled = Task.isCancelled + exp.fulfill() + return "" + } + .sink(receiveCompletion: { _ in }, receiveValue: { _ in }) + + // When: canceling the subscription + cancelable.cancel() + semaphore.signal() + + waitForExpectations(timeout: 1) + + // Then: the async task has been canceled + XCTAssertTrue(isTaskCanceled) + } + + func testFromAsyncSequence_publishes_the_values_from_a_non_throwing_asyncSequence() { + let exp = expectation(description: "fromAsync publishes the expected values when executing an async sequence") + + let expectedValues = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + var receivedValues: [Int]? + + // Given: an async sequence + let sut = expectedValues.publisher.values + + // When: making the publisher from the sequence and subscribing to it + let cancelable = Publishers + .fromAsyncSequence(sut) + .collect() + .sink { completion in + exp.fulfill() + } receiveValue: { output in + receivedValues = output + } + + waitForExpectations(timeout: 1) + + // Then: the publisher publishes the values from the sequence + XCTAssertEqual(receivedValues, expectedValues) + + cancelable.cancel() + } + + func testFromAsyncSequence_completes_with_failure_when_asyncSequence_throws() { + let exp = expectation(description: "fromAsync completes with error when async sequence fails") + + let expectedError = MockError(value: Int.random(in: 1...100)) + var receivedError: Error? + + // Given: an async sequence that throws + let sut = AsyncThrowingStream(Int.self) { continuation in + continuation.finish(throwing: expectedError) + } + + // When: making the publisher from the sequence and subscribing to it + let cancelable = Publishers + .fromAsyncSequence(sut) + .sink { completion in + if case let .failure(error) = completion { + receivedError = error + } + exp.fulfill() + } receiveValue: { _ in } + + waitForExpectations(timeout: 1) + + // Then: The error from the async sequence is catched in the completion + XCTAssertEqual(receivedError as? MockError, expectedError) + + cancelable.cancel() + } + + func testFromAsyncSequence_cancels_task_when_subscription_is_canceled() { + let exp = expectation(description: "fromAsync cancels the task when the subscription is canceled") + + class CancelRecorder { + var isCanceled = false + + init(isCanceled: Bool) { + self.isCanceled = isCanceled + } + + func setCanceled() { + self.isCanceled = true + } + } + + let cancelRecorder = CancelRecorder(isCanceled: false) + + // Given: An async sequence that records its cancelation + // When: making the publisher from the sequence and subscribing to it + let cancelable = Publishers + .fromAsyncSequence(AsyncStream(Int.self) { continuation in + continuation.onTermination = { @Sendable _ in + cancelRecorder.setCanceled() + exp.fulfill() + } + }) + .sink(receiveCompletion: { print($0) }, receiveValue: { print($0) }) + + // When: canceling the subscription + cancelable.cancel() + + waitForExpectations(timeout: 1) + + // Then: the async sequence has been canceled + XCTAssertTrue(cancelRecorder.isCanceled) + } +} + +fileprivate extension DispatchQueue { + class var currentLabel: String { + return String(validatingUTF8: __dispatch_queue_get_label(nil))! + } +} +#endif