Skip to content

Commit 1887f4f

Browse files
authored
fix(realtime): auto reconnect after calling disconnect, and several refactors (#627)
* fix: trigger socket reconnection on a new Task * use factory function for returning ws connection * code format * use FakeWebSocket on tests * integration tests * fix FakeWebSocket * add docs for EventEmitter * use SupabaseClient for integration test * fire channel rejoins on background * allow multiple channels to the same topic * remove Socket abstraction and use client directly * skip linux CI * fix build
1 parent bb0a89b commit 1887f4f

15 files changed

+1043
-742
lines changed

.github/workflows/ci.yml

+23-23
Original file line numberDiff line numberDiff line change
@@ -78,29 +78,29 @@ jobs:
7878
if: matrix.skip_release != '1'
7979
run: make XCODEBUILD_ARGUMENT="${{ matrix.command }}" CONFIG=Release PLATFORM="${{ matrix.platform }}" xcodebuild
8080

81-
linux:
82-
name: linux
83-
strategy:
84-
matrix:
85-
swift-version: ["5.10"]
86-
runs-on: ubuntu-latest
87-
steps:
88-
- uses: actions/checkout@v4
89-
- uses: swift-actions/setup-swift@v2
90-
with:
91-
swift-version: ${{ matrix.swift-version }}
92-
- name: Cache build
93-
uses: actions/cache@v3
94-
with:
95-
path: |
96-
.build
97-
key: |
98-
build-spm-linux-${{ matrix.swift-version }}-${{ hashFiles('**/Sources/**/*.swift', '**/Tests/**/*.swift', '**/Package.resolved') }}
99-
restore-keys: |
100-
build-spm-linux-${{ matrix.swift-version }}-
101-
- run: make dot-env
102-
- name: Run tests
103-
run: swift test --skip IntegrationTests
81+
# linux:
82+
# name: linux
83+
# strategy:
84+
# matrix:
85+
# swift-version: ["5.10"]
86+
# runs-on: ubuntu-latest
87+
# steps:
88+
# - uses: actions/checkout@v4
89+
# - uses: swift-actions/setup-swift@v2
90+
# with:
91+
# swift-version: ${{ matrix.swift-version }}
92+
# - name: Cache build
93+
# uses: actions/cache@v3
94+
# with:
95+
# path: |
96+
# .build
97+
# key: |
98+
# build-spm-linux-${{ matrix.swift-version }}-${{ hashFiles('**/Sources/**/*.swift', '**/Tests/**/*.swift', '**/Package.resolved') }}
99+
# restore-keys: |
100+
# build-spm-linux-${{ matrix.swift-version }}-
101+
# - run: make dot-env
102+
# - name: Run tests
103+
# run: swift test --skip IntegrationTests
104104

105105
# library-evolution:
106106
# name: Library (evolution)

Package.swift

+1-4
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,7 @@ let package = Package(
9292
.product(name: "InlineSnapshotTesting", package: "swift-snapshot-testing"),
9393
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
9494
"Helpers",
95-
"Auth",
96-
"PostgREST",
97-
"Realtime",
98-
"Storage",
95+
"Supabase",
9996
"TestHelpers",
10097
],
10198
resources: [.process("Fixtures")]

Sources/Helpers/EventEmitter.swift

+26-12
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import ConcurrencyExtras
99
import Foundation
1010

11+
/// A token for cancelling observations.
12+
///
13+
/// When this token gets deallocated it cancels the observation it was associated with. Store this token in another object to keep the observation alive.
1114
public final class ObservationToken: @unchecked Sendable, Hashable {
1215
private let _isCancelled = LockIsolated(false)
1316
package var onCancel: @Sendable () -> Void
@@ -44,9 +47,7 @@ public final class ObservationToken: @unchecked Sendable, Hashable {
4447
public func hash(into hasher: inout Hasher) {
4548
hasher.combine(ObjectIdentifier(self))
4649
}
47-
}
4850

49-
extension ObservationToken {
5051
public func store(in collection: inout some RangeReplaceableCollection<ObservationToken>) {
5152
collection.append(self)
5253
}
@@ -59,20 +60,29 @@ extension ObservationToken {
5960
package final class EventEmitter<Event: Sendable>: Sendable {
6061
public typealias Listener = @Sendable (Event) -> Void
6162

62-
private let listeners = LockIsolated<[(key: ObjectIdentifier, listener: Listener)]>([])
63-
private let _lastEvent: LockIsolated<Event>
64-
package var lastEvent: Event { _lastEvent.value }
63+
struct MutableState {
64+
var listeners: [(key: ObjectIdentifier, listener: Listener)] = []
65+
var lastEvent: Event
66+
}
67+
68+
let mutableState: LockIsolated<MutableState>
69+
70+
/// The last event emitted by this Emiter, or the initial event.
71+
package var lastEvent: Event { mutableState.lastEvent }
6572

6673
let emitsLastEventWhenAttaching: Bool
6774

6875
package init(
6976
initialEvent event: Event,
7077
emitsLastEventWhenAttaching: Bool = true
7178
) {
72-
_lastEvent = LockIsolated(event)
79+
mutableState = LockIsolated(MutableState(lastEvent: event))
7380
self.emitsLastEventWhenAttaching = emitsLastEventWhenAttaching
7481
}
7582

83+
/// Attaches a new listener for observing event emissions.
84+
///
85+
/// If emitter initialized with `emitsLastEventWhenAttaching = true`, listener gets called right away with last event.
7686
package func attach(_ listener: @escaping Listener) -> ObservationToken {
7787
defer {
7888
if emitsLastEventWhenAttaching {
@@ -84,21 +94,24 @@ package final class EventEmitter<Event: Sendable>: Sendable {
8494
let key = ObjectIdentifier(token)
8595

8696
token.onCancel = { [weak self] in
87-
self?.listeners.withValue {
88-
$0.removeAll { $0.key == key }
97+
self?.mutableState.withValue {
98+
$0.listeners.removeAll { $0.key == key }
8999
}
90100
}
91101

92-
listeners.withValue {
93-
$0.append((key, listener))
102+
mutableState.withValue {
103+
$0.listeners.append((key, listener))
94104
}
95105

96106
return token
97107
}
98108

109+
/// Trigger a new event on all attached listeners, or a specific listener owned by the `token` provided.
99110
package func emit(_ event: Event, to token: ObservationToken? = nil) {
100-
_lastEvent.setValue(event)
101-
let listeners = listeners.value
111+
let listeners = mutableState.withValue {
112+
$0.lastEvent = event
113+
return $0.listeners
114+
}
102115

103116
if let token {
104117
listeners.first { $0.key == ObjectIdentifier(token) }?.listener(event)
@@ -109,6 +122,7 @@ package final class EventEmitter<Event: Sendable>: Sendable {
109122
}
110123
}
111124

125+
/// Returns a new ``AsyncStream`` for observing events emitted by this emitter.
112126
package func stream() -> AsyncStream<Event> {
113127
AsyncStream { continuation in
114128
let token = attach { status in

Sources/Realtime/V2/PushV2.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ actor PushV2 {
3131
return .error
3232
}
3333

34-
await channel.socket.push(message)
34+
channel.socket.push(message)
3535

3636
if !channel.config.broadcast.acknowledgeBroadcasts {
3737
// channel was configured with `ack = false`,
@@ -40,7 +40,7 @@ actor PushV2 {
4040
}
4141

4242
do {
43-
return try await withTimeout(interval: channel.socket.options().timeoutInterval) {
43+
return try await withTimeout(interval: channel.socket.options.timeoutInterval) {
4444
await withCheckedContinuation { continuation in
4545
self.receivedContinuation = continuation
4646
}

Sources/Realtime/V2/RealtimeChannelV2.swift

+15-56
Original file line numberDiff line numberDiff line change
@@ -25,46 +25,6 @@ public struct RealtimeChannelConfig: Sendable {
2525
public var isPrivate: Bool
2626
}
2727

28-
struct Socket: Sendable {
29-
var broadcastURL: @Sendable () -> URL
30-
var status: @Sendable () -> RealtimeClientStatus
31-
var options: @Sendable () -> RealtimeClientOptions
32-
var accessToken: @Sendable () async -> String?
33-
var apiKey: @Sendable () -> String?
34-
var makeRef: @Sendable () -> Int
35-
36-
var connect: @Sendable () async -> Void
37-
var addChannel: @Sendable (_ channel: RealtimeChannelV2) -> Void
38-
var removeChannel: @Sendable (_ channel: RealtimeChannelV2) async -> Void
39-
var push: @Sendable (_ message: RealtimeMessageV2) async -> Void
40-
var httpSend: @Sendable (_ request: Helpers.HTTPRequest) async throws -> Helpers.HTTPResponse
41-
}
42-
43-
extension Socket {
44-
init(client: RealtimeClientV2) {
45-
self.init(
46-
broadcastURL: { [weak client] in client?.broadcastURL ?? URL(string: "http://localhost")! },
47-
status: { [weak client] in client?.status ?? .disconnected },
48-
options: { [weak client] in client?.options ?? .init() },
49-
accessToken: { [weak client] in
50-
if let accessToken = try? await client?.options.accessToken?() {
51-
return accessToken
52-
}
53-
return client?.mutableState.accessToken
54-
},
55-
apiKey: { [weak client] in client?.apikey },
56-
makeRef: { [weak client] in client?.makeRef() ?? 0 },
57-
connect: { [weak client] in await client?.connect() },
58-
addChannel: { [weak client] in client?.addChannel($0) },
59-
removeChannel: { [weak client] in await client?.removeChannel($0) },
60-
push: { [weak client] in await client?.push($0) },
61-
httpSend: { [weak client] in
62-
try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse())
63-
}
64-
)
65-
}
66-
}
67-
6828
public final class RealtimeChannelV2: Sendable {
6929
struct MutableState {
7030
var clientChanges: [PostgresJoinConfig] = []
@@ -77,7 +37,8 @@ public final class RealtimeChannelV2: Sendable {
7737
let topic: String
7838
let config: RealtimeChannelConfig
7939
let logger: (any SupabaseLogger)?
80-
let socket: Socket
40+
let socket: RealtimeClientV2
41+
var joinRef: String? { mutableState.joinRef }
8142

8243
let callbackManager = CallbackManager()
8344
private let statusEventEmitter = EventEmitter<RealtimeChannelStatus>(initialEvent: .unsubscribed)
@@ -105,7 +66,7 @@ public final class RealtimeChannelV2: Sendable {
10566
init(
10667
topic: String,
10768
config: RealtimeChannelConfig,
108-
socket: Socket,
69+
socket: RealtimeClientV2,
10970
logger: (any SupabaseLogger)?
11071
) {
11172
self.topic = topic
@@ -120,8 +81,8 @@ public final class RealtimeChannelV2: Sendable {
12081

12182
/// Subscribes to the channel
12283
public func subscribe() async {
123-
if socket.status() != .connected {
124-
if socket.options().connectOnSubscribe != true {
84+
if socket.status != .connected {
85+
if socket.options.connectOnSubscribe != true {
12586
reportIssue(
12687
"You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?"
12788
)
@@ -130,8 +91,6 @@ public final class RealtimeChannelV2: Sendable {
13091
await socket.connect()
13192
}
13293

133-
socket.addChannel(self)
134-
13594
status = .subscribing
13695
logger?.debug("Subscribing to channel \(topic)")
13796

@@ -144,10 +103,10 @@ public final class RealtimeChannelV2: Sendable {
144103

145104
let payload = RealtimeJoinPayload(
146105
config: joinConfig,
147-
accessToken: await socket.accessToken()
106+
accessToken: await socket._getAccessToken()
148107
)
149108

150-
let joinRef = socket.makeRef().description
109+
let joinRef = socket.makeRef()
151110
mutableState.withValue { $0.joinRef = joinRef }
152111

153112
logger?.debug("Subscribing to channel with body: \(joinConfig)")
@@ -159,7 +118,7 @@ public final class RealtimeChannelV2: Sendable {
159118
)
160119

161120
do {
162-
try await withTimeout(interval: socket.options().timeoutInterval) { [self] in
121+
try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
163122
_ = await statusChange.first { @Sendable in $0 == .subscribed }
164123
}
165124
} catch {
@@ -215,17 +174,17 @@ public final class RealtimeChannelV2: Sendable {
215174
}
216175

217176
var headers: HTTPFields = [.contentType: "application/json"]
218-
if let apiKey = socket.apiKey() {
177+
if let apiKey = socket.options.apikey {
219178
headers[.apiKey] = apiKey
220179
}
221-
if let accessToken = await socket.accessToken() {
180+
if let accessToken = await socket._getAccessToken() {
222181
headers[.authorization] = "Bearer \(accessToken)"
223182
}
224183

225184
let task = Task { [headers] in
226-
_ = try? await socket.httpSend(
185+
_ = try? await socket.http.send(
227186
HTTPRequest(
228-
url: socket.broadcastURL(),
187+
url: socket.broadcastURL,
229188
method: .post,
230189
headers: headers,
231190
body: JSONEncoder().encode(
@@ -245,7 +204,7 @@ public final class RealtimeChannelV2: Sendable {
245204
}
246205

247206
if config.broadcast.acknowledgeBroadcasts {
248-
try? await withTimeout(interval: socket.options().timeoutInterval) {
207+
try? await withTimeout(interval: socket.options.timeoutInterval) {
249208
await task.value
250209
}
251210
}
@@ -406,7 +365,7 @@ public final class RealtimeChannelV2: Sendable {
406365
callbackManager.triggerBroadcast(event: event, json: payload)
407366

408367
case .close:
409-
await socket.removeChannel(self)
368+
socket._remove(self)
410369
logger?.debug("Unsubscribed from channel \(message.topic)")
411370
status = .unsubscribed
412371

@@ -582,7 +541,7 @@ public final class RealtimeChannelV2: Sendable {
582541
let push = mutableState.withValue {
583542
let message = RealtimeMessageV2(
584543
joinRef: $0.joinRef,
585-
ref: ref ?? socket.makeRef().description,
544+
ref: ref ?? socket.makeRef(),
586545
topic: self.topic,
587546
event: event,
588547
payload: payload

0 commit comments

Comments
 (0)