Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Jan 18, 2024
1 parent fd1b0f6 commit bf0ef03
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 86 deletions.
2 changes: 1 addition & 1 deletion Examples/SlackClone/ChannelsViewModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ final class ChannelsViewModel: ChannelsStore {
let insertions = await channel.postgresChange(InsertAction.self, table: "channels")
let deletions = await channel.postgresChange(DeleteAction.self, table: "channels")

await channel.subscribe(blockUntilSubscribed: true)
await channel.subscribe()

Task {
for await insertion in insertions {
Expand Down
2 changes: 1 addition & 1 deletion Examples/SlackClone/MessagesViewModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ final class MessagesViewModel: MessagesStore {
let updates = await channel.postgresChange(UpdateAction.self, table: "messages")
let deletions = await channel.postgresChange(DeleteAction.self, table: "messages")

await channel.subscribe(blockUntilSubscribed: true)
await channel.subscribe()

Task {
for await insertion in insertions {
Expand Down
2 changes: 1 addition & 1 deletion Examples/SlackClone/UserStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ final class UserStore {
let channel = await supabase.realtimeV2.channel("public:users")
let changes = await channel.postgresChange(AnyAction.self, table: "users")

await channel.subscribe(blockUntilSubscribed: true)
await channel.subscribe()

for await change in changes {
handleChangedUser(change)
Expand Down
52 changes: 52 additions & 0 deletions Sources/Realtime/SharedStream.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// SharedStream.swift
//
//
// Created by Guilherme Souza on 12/01/24.
//

import ConcurrencyExtras
import Foundation

final class SharedStream<Element>: Sendable where Element: Sendable {
private let storage = LockIsolated<[UUID: AsyncStream<Element>.Continuation]>([:])
private let _value: LockIsolated<Element>

var lastElement: Element { _value.value }

init(initialElement: Element) {
_value = LockIsolated(initialElement)
}

func makeStream() -> AsyncStream<Element> {
let (stream, continuation) = AsyncStream<Element>.makeStream()
let id = UUID()

continuation.onTermination = { _ in
self.storage.withValue {
$0[id] = nil
}
}

storage.withValue {
$0[id] = continuation
}

continuation.yield(lastElement)

return stream
}

func yield(_ value: Element) {
_value.setValue(value)
for continuation in storage.value.values {
continuation.yield(value)
}
}

func finish() {
for continuation in storage.value.values {
continuation.finish()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Channel.swift
// RealtimeChannelV2.swift
//
//
// Created by Guilherme Souza on 26/12/23.
Expand Down Expand Up @@ -33,14 +33,19 @@ public actor RealtimeChannelV2 {
let logger: SupabaseLogger?

private let callbackManager = CallbackManager()
let statusStreamManager = AsyncStreamManager<Status>()
private let statusStream = SharedStream<Status>(initialElement: .unsubscribed)

private var clientChanges: [PostgresJoinConfig] = []
private var joinRef: String?
private var pushes: [String: _Push] = [:]

public var status: AsyncStream<Status> {
statusStreamManager.makeStream()
public private(set) var status: Status {
get { statusStream.lastElement }
set { statusStream.yield(newValue) }
}

public var statusChange: AsyncStream<Status> {
statusStream.makeStream()
}

init(
Expand All @@ -60,10 +65,8 @@ public actor RealtimeChannelV2 {
}

/// Subscribes to the channel
/// - Parameter blockUntilSubscribed: if true, the method will block the current Task until the
/// ``status-swift.property`` is ``Status-swift.enum/subscribed``.
public func subscribe(blockUntilSubscribed: Bool = false) async {
if socket?.statusStreamManager.value != .connected {
public func subscribe() async {
if await socket?.status != .connected {
if socket?.config.connectOnSubscribe != true {
fatalError(
"You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?"
Expand All @@ -74,20 +77,19 @@ public actor RealtimeChannelV2 {

await socket?.addChannel(self)

statusStreamManager.yield(.subscribing)
status = .subscribing
logger?.debug("subscribing to channel \(topic)")

let accessToken = await socket?.accessToken

let postgresChanges = clientChanges

let joinConfig = RealtimeJoinConfig(
broadcast: config.broadcast,
presence: config.presence,
postgresChanges: postgresChanges
postgresChanges: clientChanges
)

let payload = RealtimeJoinPayload(config: joinConfig, accessToken: accessToken)
let payload = await RealtimeJoinPayload(
config: joinConfig,
accessToken: socket?.accessToken
)

joinRef = await socket?.makeRef().description

Expand All @@ -99,17 +101,15 @@ public actor RealtimeChannelV2 {
ref: joinRef,
topic: topic,
event: ChannelEvent.join,
payload: (try? JSONObject(payload)) ?? [:]
payload: try! JSONObject(payload)
)
)

if blockUntilSubscribed {
_ = await status.first { $0 == .subscribed }
}
_ = await statusChange.first { $0 == .subscribed }
}

public func unsubscribe() async {
statusStreamManager.yield(.unsubscribing)
status = .unsubscribing
logger?.debug("unsubscribing from channel \(topic)")

await push(
Expand All @@ -136,9 +136,9 @@ public actor RealtimeChannelV2 {
)
}

public func broadcast(event: String, message: [String: AnyJSON]) async {
public func broadcast(event: String, message: JSONObject) async {
assert(
statusStreamManager.value == .subscribed,
status == .subscribed,
"You can only broadcast after subscribing to the channel. Did you forget to call `channel.subscribe()`?"
)

Expand All @@ -163,7 +163,7 @@ public actor RealtimeChannelV2 {

public func track(state: JSONObject) async {
assert(
statusStreamManager.value == .subscribed,
status == .subscribed,
"You can only track your presence after subscribing to the channel. Did you forget to call `channel.subscribe()`?"
)

Expand Down Expand Up @@ -212,7 +212,7 @@ public actor RealtimeChannelV2 {

case .system:
logger?.debug("Subscribed to channel \(message.topic)")
statusStreamManager.yield(.subscribed)
status = .subscribed

case .reply:
guard
Expand All @@ -233,8 +233,8 @@ public actor RealtimeChannelV2 {

callbackManager.setServerChanges(changes: serverPostgresChanges ?? [])

if statusStreamManager.value != .subscribed {
statusStreamManager.yield(.subscribed)
if self.status != .subscribed {
self.status = .subscribed
logger?.debug("Subscribed to channel \(message.topic)")
}
}
Expand Down Expand Up @@ -416,7 +416,7 @@ public actor RealtimeChannelV2 {
filter: String?
) -> AsyncStream<AnyAction> {
precondition(
statusStreamManager.value != .subscribed,
status != .subscribed,
"You cannot call postgresChange after joining the channel"
)

Expand Down
36 changes: 23 additions & 13 deletions Sources/Realtime/V2/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,15 @@ public actor RealtimeClientV2 {
let config: Configuration
let makeWebSocketClient: (_ url: URL, _ headers: [String: String]) -> WebSocketClientProtocol

let statusStreamManager = AsyncStreamManager<Status>()
private let statusStream = SharedStream<Status>(initialElement: .disconnected)

public var status: AsyncStream<Status> {
statusStreamManager.makeStream()
public var statusChange: AsyncStream<Status> {
statusStream.makeStream()
}

public private(set) var status: Status {
get { statusStream.lastElement }
set { statusStream.yield(newValue) }
}

init(
Expand Down Expand Up @@ -119,7 +124,7 @@ public actor RealtimeClientV2 {
return await inFlightConnectionTask.value
}

inFlightConnectionTask = Task {
inFlightConnectionTask = Task { [self] in
defer { inFlightConnectionTask = nil }
if reconnect {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.reconnectDelay))
Expand All @@ -130,12 +135,12 @@ public actor RealtimeClientV2 {
}
}

if statusStreamManager.value == .connected {
if status == .connected {
config.logger?.debug("Websocket already connected")
return
}

statusStreamManager.yield(.connecting)
status = .connecting

let realtimeURL = realtimeWebSocketURL

Expand All @@ -150,7 +155,7 @@ public actor RealtimeClientV2 {

switch connectionStatus {
case .open:
statusStreamManager.yield(.connected)
status = .connected
config.logger?.debug("Connected to realtime websocket")
listenForMessages()
startHeartbeating()
Expand Down Expand Up @@ -193,7 +198,7 @@ public actor RealtimeClientV2 {
}

public func removeChannel(_ channel: RealtimeChannelV2) async {
if channel.statusStreamManager.value == .subscribed {
if await channel.status == .subscribed {
await channel.unsubscribe()
}

Expand All @@ -206,9 +211,14 @@ public actor RealtimeClientV2 {
}

private func rejoinChannels() async {
// TODO: should we fire all subscribe calls concurrently?
for channel in subscriptions.values {
await channel.subscribe()
await withTaskGroup(of: Void.self) { group in
for channel in subscriptions.values {
_ = group.addTaskUnlessCancelled {
await channel.subscribe()
}

await group.waitForAll()
}
}
}

Expand Down Expand Up @@ -273,14 +283,14 @@ public actor RealtimeClientV2 {
heartbeatTask?.cancel()
ws?.cancel()
ws = nil
statusStreamManager.yield(.disconnected)
status = .disconnected
}

public func setAuth(_ token: String?) async {
accessToken = token

for channel in subscriptions.values {
if let token, channel.statusStreamManager.value == .subscribed {
if let token, await channel.status == .subscribed {
await channel.updateAuth(jwt: token)
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/Realtime/V2/WebSocketClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ final class WebSocketClient: NSObject, URLSessionWebSocketDelegate, WebSocketCli
}

private let continuation: AsyncStream<ConnectionStatus>.Continuation
var status: AsyncStream<ConnectionStatus>
let status: AsyncStream<ConnectionStatus>

func connect() {
mutableState.withValue {
Expand Down
42 changes: 0 additions & 42 deletions Sources/_Helpers/StreamManager.swift

This file was deleted.

19 changes: 19 additions & 0 deletions Tests/RealtimeTests/StreamManagerTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//
// StreamManagerTests.swift
//
//
// Created by Guilherme Souza on 18/01/24.
//

import Foundation
@testable import Realtime
import XCTest

final class StreamManagerTests: XCTestCase {
func testYieldInitialValue() async {
let manager = SharedStream(initialElement: 0)

let value = await manager.makeStream().first(where: { _ in true })
XCTAssertEqual(value, 0)
}
}

0 comments on commit bf0ef03

Please sign in to comment.