Skip to content

Commit

Permalink
Integrate SupabaseLogger
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Jan 17, 2024
1 parent e6a566a commit 9d79105
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 265 deletions.
163 changes: 82 additions & 81 deletions Sources/Realtime/Deprecated.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,87 +10,88 @@ import Foundation
@available(*, deprecated, renamed: "RealtimeMessage")
public typealias Message = RealtimeMessage

extension RealtimeChannel {
@available(
*,
deprecated,
message: "Please use one of postgresChanges, presenceChange, or broadcast methods that returns an AsyncSequence instead."
)
@discardableResult
public func on(
_ event: String,
filter: ChannelFilter,
handler: @escaping (_RealtimeMessage) -> Void
) -> RealtimeChannel {
let stream: AsyncStream<HasRawMessage>

switch event.lowercased() {
case "postgres_changes":
switch filter.event?.uppercased() {
case "UPDATE":
stream = postgresChange(
UpdateAction.self,
schema: filter.schema ?? "public",
table: filter.table!,
filter: filter.filter
)
.map { $0 as HasRawMessage }
.eraseToStream()
case "INSERT":
stream = postgresChange(
InsertAction.self,
schema: filter.schema ?? "public",
table: filter.table!,
filter: filter.filter
)
.map { $0 as HasRawMessage }
.eraseToStream()
case "DELETE":
stream = postgresChange(
DeleteAction.self,
schema: filter.schema ?? "public",
table: filter.table!,
filter: filter.filter
)
.map { $0 as HasRawMessage }
.eraseToStream()
case "SELECT":
stream = postgresChange(
SelectAction.self,
schema: filter.schema ?? "public",
table: filter.table!,
filter: filter.filter
)
.map { $0 as HasRawMessage }
.eraseToStream()
default:
stream = postgresChange(
AnyAction.self,
schema: filter.schema ?? "public",
table: filter.table!,
filter: filter.filter
)
.map { $0 as HasRawMessage }
.eraseToStream()
}

case "presence":
stream = presenceChange().map { $0 as HasRawMessage }.eraseToStream()
case "broadcast":
stream = broadcast(event: filter.event!).map { $0 as HasRawMessage }.eraseToStream()
default:
fatalError(
"Unsupported event '\(event)'. Expected one of: postgres_changes, presence, or broadcast."
)
}

Task {
for await action in stream {
handler(action.rawMessage)
}
}

return self
extension RealtimeChannelV2 {
// @available(
// *,
// deprecated,
// message: "Please use one of postgresChanges, presenceChange, or broadcast methods that returns an AsyncSequence instead."
// )
// @discardableResult
// public func on(
// _ event: String,
// filter: ChannelFilter,
// handler: @escaping (Message) -> Void
// ) -> RealtimeChannel {
// let stream: AsyncStream<HasRawMessage>
//
// switch event.lowercased() {
// case "postgres_changes":
// switch filter.event?.uppercased() {
// case "UPDATE":
// stream = postgresChange(
// UpdateAction.self,
// schema: filter.schema ?? "public",
// table: filter.table!,
// filter: filter.filter
// )
// .map { $0 as HasRawMessage }
// .eraseToStream()
// case "INSERT":
// stream = postgresChange(
// InsertAction.self,
// schema: filter.schema ?? "public",
// table: filter.table!,
// filter: filter.filter
// )
// .map { $0 as HasRawMessage }
// .eraseToStream()
// case "DELETE":
// stream = postgresChange(
// DeleteAction.self,
// schema: filter.schema ?? "public",
// table: filter.table!,
// filter: filter.filter
// )
// .map { $0 as HasRawMessage }
// .eraseToStream()
// case "SELECT":
// stream = postgresChange(
// SelectAction.self,
// schema: filter.schema ?? "public",
// table: filter.table!,
// filter: filter.filter
// )
// .map { $0 as HasRawMessage }
// .eraseToStream()
// default:
// stream = postgresChange(
// AnyAction.self,
// schema: filter.schema ?? "public",
// table: filter.table!,
// filter: filter.filter
// )
// .map { $0 as HasRawMessage }
// .eraseToStream()
// }
//
// case "presence":
// stream = presenceChange().map { $0 as HasRawMessage }.eraseToStream()
// case "broadcast":
// stream = broadcast(event: filter.event!).map { $0 as HasRawMessage }.eraseToStream()
// default:
// fatalError(
// "Unsupported event '\(event)'. Expected one of: postgres_changes, presence, or broadcast."
// )
// }
//
// Task {
// for await action in stream {
// handler(action.rawMessage)
// }
// }
//
// return self
// }
}

extension RealtimeClient {
Expand Down
41 changes: 25 additions & 16 deletions Sources/Realtime/V2/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public actor RealtimeChannelV2 {

let topic: String
let config: RealtimeChannelConfig
let logger: SupabaseLogger?

private let callbackManager = CallbackManager()
let statusStreamManager = AsyncStreamManager<Status>()
Expand All @@ -45,11 +46,13 @@ public actor RealtimeChannelV2 {
init(
topic: String,
config: RealtimeChannelConfig,
socket: RealtimeClientV2
socket: RealtimeClientV2,
logger: SupabaseLogger?
) {
self.socket = socket
self.topic = topic
self.config = config
self.logger = logger
}

deinit {
Expand All @@ -72,7 +75,7 @@ public actor RealtimeChannelV2 {
await socket?.addChannel(self)

statusStreamManager.yield(.subscribing)
debug("subscribing to channel \(topic)")
logger?.debug("subscribing to channel \(topic)")

let accessToken = await socket?.accessToken

Expand All @@ -87,7 +90,7 @@ public actor RealtimeChannelV2 {

joinRef = await socket?.makeRef().description

debug("subscribing to channel with body: \(joinConfig)")
logger?.debug("subscribing to channel with body: \(joinConfig)")

await push(
RealtimeMessageV2(
Expand All @@ -106,7 +109,7 @@ public actor RealtimeChannelV2 {

public func unsubscribe() async {
statusStreamManager.yield(.unsubscribing)
debug("unsubscribing from channel \(topic)")
logger?.debug("unsubscribing from channel \(topic)")

await push(
RealtimeMessageV2(
Expand All @@ -120,7 +123,7 @@ public actor RealtimeChannelV2 {
}

public func updateAuth(jwt: String) async {
debug("Updating auth token for channel \(topic)")
logger?.debug("Updating auth token for channel \(topic)")
await push(
RealtimeMessageV2(
joinRef: joinRef,
Expand Down Expand Up @@ -196,18 +199,18 @@ public actor RealtimeChannelV2 {
func onMessage(_ message: RealtimeMessageV2) {
do {
guard let eventType = message.eventType else {
debug("Received message without event type: \(message)")
logger?.debug("Received message without event type: \(message)")
return
}

switch eventType {
case .tokenExpired:
debug(
logger?.debug(
"Received token expired event. This should not happen, please report this warning."
)

case .system:
debug("Subscribed to channel \(message.topic)")
logger?.debug("Subscribed to channel \(message.topic)")
statusStreamManager.yield(.subscribed)

case .reply:
Expand All @@ -231,13 +234,13 @@ public actor RealtimeChannelV2 {

if statusStreamManager.value != .subscribed {
statusStreamManager.yield(.subscribed)
debug("Subscribed to channel \(message.topic)")
logger?.debug("Subscribed to channel \(message.topic)")
}
}

case .postgresChanges:
guard let data = message.payload["data"] else {
debug("Expected \"data\" key in message payload.")
logger?.debug("Expected \"data\" key in message payload.")
return
}

Expand Down Expand Up @@ -307,11 +310,11 @@ public actor RealtimeChannelV2 {
guard let self else { return }

await socket?.removeChannel(self)
debug("Unsubscribed from channel \(message.topic)")
logger?.debug("Unsubscribed from channel \(message.topic)")
}

case .error:
debug(
logger?.debug(
"Received an error in channel \(message.topic). That could be as a result of an invalid access token"
)

Expand All @@ -325,7 +328,7 @@ public actor RealtimeChannelV2 {
callbackManager.triggerPresenceDiffs(joins: joins, leaves: [:], rawMessage: message)
}
} catch {
debug("Failed: \(error)")
logger?.debug("Failed: \(error)")
}
}

Expand All @@ -337,8 +340,10 @@ public actor RealtimeChannelV2 {
continuation.yield($0)
}

let logger = logger

continuation.onTermination = { [weak callbackManager] _ in
debug("Removing presence callback with id: \(id)")
logger?.debug("Removing presence callback with id: \(id)")
callbackManager?.removeCallback(id: id)
}

Expand Down Expand Up @@ -429,8 +434,10 @@ public actor RealtimeChannelV2 {
continuation.yield(action)
}

let logger = logger

continuation.onTermination = { [weak callbackManager] _ in
debug("Removing postgres callback with id: \(id)")
logger?.debug("Removing postgres callback with id: \(id)")
callbackManager?.removeCallback(id: id)
}

Expand All @@ -446,8 +453,10 @@ public actor RealtimeChannelV2 {
continuation.yield($0)
}

let logger = logger

continuation.onTermination = { [weak callbackManager] _ in
debug("Removing broadcast callback with id: \(id)")
logger?.debug("Removing broadcast callback with id: \(id)")
callbackManager?.removeCallback(id: id)
}

Expand Down
Loading

0 comments on commit 9d79105

Please sign in to comment.