Skip to content

Commit

Permalink
refactor: use Clock protocol when available (#633)
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Jan 13, 2025
1 parent a7f4246 commit bd554c5
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 38 deletions.
2 changes: 2 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ let package = Package(
.package(url: "https://github.com/pointfreeco/swift-custom-dump", from: "1.3.2"),
.package(url: "https://github.com/pointfreeco/swift-snapshot-testing", from: "1.17.2"),
.package(url: "https://github.com/pointfreeco/xctest-dynamic-overlay", from: "1.2.2"),
.package(url: "https://github.com/pointfreeco/swift-clocks", from: "1.0.0"),
],
targets: [
.target(
name: "Helpers",
dependencies: [
.product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"),
.product(name: "HTTPTypes", package: "swift-http-types"),
.product(name: "Clocks", package: "swift-clocks"),
]
),
.testTarget(
Expand Down
124 changes: 124 additions & 0 deletions Sources/Helpers/AsyncValueSubject.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
//
// AsyncValueSubject.swift
// Supabase
//
// Created by Guilherme Souza on 31/10/24.
//

import ConcurrencyExtras
import Foundation

/// A thread-safe subject that wraps a single value and provides async access to its updates.
/// Similar to Combine's CurrentValueSubject, but designed for async/await usage.
package final class AsyncValueSubject<Value: Sendable>: Sendable {

/// Defines how values are buffered in the underlying AsyncStream.
package typealias BufferingPolicy = AsyncStream<Value>.Continuation.BufferingPolicy

/// Internal state container for the subject.
struct MutableState {
var value: Value
var continuations: [UInt: AsyncStream<Value>.Continuation] = [:]
var count: UInt = 0
}

let bufferingPolicy: BufferingPolicy
let mutableState: LockIsolated<MutableState>

/// Creates a new AsyncValueSubject with an initial value.
/// - Parameters:
/// - initialValue: The initial value to store
/// - bufferingPolicy: Determines how values are buffered in the AsyncStream (defaults to .unbounded)
package init(_ initialValue: Value, bufferingPolicy: BufferingPolicy = .unbounded) {
self.mutableState = LockIsolated(MutableState(value: initialValue))
self.bufferingPolicy = bufferingPolicy
}

deinit {
finish()
}

/// The current value stored in the subject.
package var value: Value {
mutableState.value
}

/// Sends a new value to the subject and notifies all observers.
/// - Parameter value: The new value to send
package func yield(_ value: Value) {
mutableState.withValue {
$0.value = value

for (_, continuation) in $0.continuations {
continuation.yield(value)
}
}
}

/// Resume the task awaiting the next iteration point by having it return
/// nil, which signifies the end of the iteration.
///
/// Calling this function more than once has no effect. After calling
/// finish, the stream enters a terminal state and doesn't produce any
/// additional elements.
package func finish() {
for (_, continuation) in mutableState.continuations {
continuation.finish()
}
}

/// An AsyncStream that emits the current value and all subsequent updates.
package var values: AsyncStream<Value> {
AsyncStream(bufferingPolicy: bufferingPolicy) { continuation in
insert(continuation)
}
}

/// Observes changes to the subject's value by executing the provided handler.
/// - Parameters:
/// - priority: The priority of the task that will observe changes (optional)
/// - handler: A closure that will be called with each new value
/// - Returns: A task that can be cancelled to stop observing changes
@discardableResult
package func onChange(
priority: TaskPriority? = nil,
_ handler: @escaping @Sendable (Value) -> Void
) -> Task<Void, Never> {
let stream = self.values
return Task(priority: priority) {
for await value in stream {
if Task.isCancelled {
break
}
handler(value)
}
}
}

/// Adds a new continuation to the subject and yields the current value.
private func insert(_ continuation: AsyncStream<Value>.Continuation) {
mutableState.withValue { state in
continuation.yield(state.value)
let id = state.count + 1
state.count = id
state.continuations[id] = continuation

continuation.onTermination = { [weak self] _ in
self?.remove(continuation: id)
}
}
}

/// Removes a continuation when it's terminated.
private func remove(continuation id: UInt) {
mutableState.withValue {
_ = $0.continuations.removeValue(forKey: id)
}
}
}

extension AsyncValueSubject where Value == Void {
package func yield() {
self.yield(())
}
}
2 changes: 1 addition & 1 deletion Sources/Helpers/Task+withTimeout.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ package func withTimeout<R: Sendable>(
group.addTask {
let interval = deadline.timeIntervalSinceNow
if interval > 0 {
try await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(interval))
try await _clock.sleep(for: interval)
}
try Task.checkCancellation()
throw TimeoutError()
Expand Down
52 changes: 52 additions & 0 deletions Sources/Helpers/_Clock.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// _Clock.swift
// Supabase
//
// Created by Guilherme Souza on 08/01/25.
//

import Clocks
import Foundation

package protocol _Clock: Sendable {
func sleep(for duration: TimeInterval) async throws
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
extension ContinuousClock: _Clock {
package func sleep(for duration: TimeInterval) async throws {
try await sleep(for: .seconds(duration))
}
}
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
extension TestClock<Duration>: _Clock {
package func sleep(for duration: TimeInterval) async throws {
try await sleep(for: .seconds(duration))
}
}

/// `_Clock` used on platforms where ``Clock`` protocol isn't available.
struct FallbackClock: _Clock {
func sleep(for duration: TimeInterval) async throws {
try await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(duration))
}
}

// Resolves clock instance based on platform availability.
let _resolveClock: @Sendable () -> any _Clock = {
if #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) {
ContinuousClock()
} else {
FallbackClock()
}
}

// For overriding clock on tests, we use a mutable _clock in DEBUG builds.
// nonisolated(unsafe) is safe to use if making sure we assign _clock once in test set up.
//
// _clock is read-only in RELEASE builds.
#if DEBUG
nonisolated(unsafe) package var _clock = _resolveClock()
#else
package let _clock = _resolveClock()
#endif
13 changes: 7 additions & 6 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ public final class RealtimeChannelV2: Sendable {
var joinRef: String? { mutableState.joinRef }

let callbackManager = CallbackManager()
private let statusEventEmitter = EventEmitter<RealtimeChannelStatus>(initialEvent: .unsubscribed)
private let statusSubject = AsyncValueSubject<RealtimeChannelStatus>(.unsubscribed)

public private(set) var status: RealtimeChannelStatus {
get { statusEventEmitter.lastEvent }
set { statusEventEmitter.emit(newValue) }
get { statusSubject.value }
set { statusSubject.yield(newValue) }
}

public var statusChange: AsyncStream<RealtimeChannelStatus> {
statusEventEmitter.stream()
statusSubject.values
}

/// Listen for connection status changes.
Expand All @@ -59,8 +59,9 @@ public final class RealtimeChannelV2: Sendable {
/// - Note: Use ``statusChange`` if you prefer to use Async/Await.
public func onStatusChange(
_ listener: @escaping @Sendable (RealtimeChannelStatus) -> Void
) -> ObservationToken {
statusEventEmitter.attach(listener)
) -> RealtimeSubscription {
let task = statusSubject.onChange { listener($0) }
return RealtimeSubscription { task.cancel() }
}

init(
Expand Down
15 changes: 8 additions & 7 deletions Sources/Realtime/V2/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@ public final class RealtimeClientV2: Sendable {
)
}

private let statusEventEmitter = EventEmitter<RealtimeClientStatus>(initialEvent: .disconnected)
private let statusSubject = AsyncValueSubject<RealtimeClientStatus>(.disconnected)

/// Listen for connection status changes.
///
/// You can also use ``onStatusChange(_:)`` for a closure based method.
public var statusChange: AsyncStream<RealtimeClientStatus> {
statusEventEmitter.stream()
statusSubject.values
}

/// The current connection status.
public private(set) var status: RealtimeClientStatus {
get { statusEventEmitter.lastEvent }
set { statusEventEmitter.emit(newValue) }
get { statusSubject.value }
set { statusSubject.yield(newValue) }
}

/// Listen for connection status changes.
Expand All @@ -79,7 +79,8 @@ public final class RealtimeClientV2: Sendable {
public func onStatusChange(
_ listener: @escaping @Sendable (RealtimeClientStatus) -> Void
) -> RealtimeSubscription {
statusEventEmitter.attach(listener)
let task = statusSubject.onChange { listener($0) }
return RealtimeSubscription { task.cancel() }
}

public convenience init(url: URL, options: RealtimeClientOptions) {
Expand Down Expand Up @@ -150,7 +151,7 @@ public final class RealtimeClientV2: Sendable {
if status == .disconnected {
let connectionTask = Task {
if reconnect {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(options.reconnectDelay))
try? await _clock.sleep(for: options.reconnectDelay)

if Task.isCancelled {
options.logger?.debug("Reconnect cancelled, returning")
Expand Down Expand Up @@ -348,7 +349,7 @@ public final class RealtimeClientV2: Sendable {
private func startHeartbeating() {
let heartbeatTask = Task { [weak self, options] in
while !Task.isCancelled {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(options.heartbeatInterval))
try? await _clock.sleep(for: options.heartbeatInterval)
if Task.isCancelled {
break
}
Expand Down
9 changes: 9 additions & 0 deletions Supabase.xcworkspace/xcshareddata/swiftpm/Package.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@
"version" : "1.5.6"
}
},
{
"identity" : "swift-clocks",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-clocks",
"state" : {
"revision" : "cc46202b53476d64e824e0b6612da09d84ffde8e",
"version" : "1.0.6"
}
},
{
"identity" : "swift-collections",
"kind" : "remoteSourceControl",
Expand Down
20 changes: 11 additions & 9 deletions Tests/IntegrationTests/RealtimeIntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Created by Guilherme Souza on 27/03/24.
//

import Clocks
import ConcurrencyExtras
import CustomDump
import Helpers
Expand All @@ -22,20 +23,22 @@ struct TestLogger: SupabaseLogger {
}
}

@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
final class RealtimeIntegrationTests: XCTestCase {

static let reconnectDelay: TimeInterval = 1
let testClock = TestClock<Duration>()

let client = SupabaseClient(
supabaseURL: URL(string: DotEnv.SUPABASE_URL)!,
supabaseKey: DotEnv.SUPABASE_ANON_KEY,
options: SupabaseClientOptions(
realtime: RealtimeClientOptions(
reconnectDelay: reconnectDelay
)
)
supabaseKey: DotEnv.SUPABASE_ANON_KEY
)

override func setUp() {
super.setUp()

_clock = testClock
}

override func invokeTest() {
withMainSerialExecutor {
super.invokeTest()
Expand All @@ -49,8 +52,7 @@ final class RealtimeIntegrationTests: XCTestCase {
client.realtimeV2.disconnect()

/// Wait for the reconnection delay
try? await Task.sleep(
nanoseconds: NSEC_PER_SEC * UInt64(Self.reconnectDelay) + 1)
await testClock.advance(by: .seconds(RealtimeClientOptions.defaultReconnectDelay))

XCTAssertEqual(client.realtimeV2.status, .disconnected)
}
Expand Down
Loading

0 comments on commit bd554c5

Please sign in to comment.