Skip to content

Commit 3e5aa17

Browse files
committed
test: realtime connect and subscribe
1 parent e973690 commit 3e5aa17

File tree

4 files changed

+179
-248
lines changed

4 files changed

+179
-248
lines changed

Sources/Realtime/V2/RealtimeClientV2.swift

+4-5
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ public actor RealtimeClientV2 {
6262
var inFlightConnectionTask: Task<Void, Never>?
6363

6464
public private(set) var subscriptions: [String: RealtimeChannelV2] = [:]
65-
var ws: WebSocketClientProtocol?
65+
var ws: WebSocketClient?
6666

6767
let config: Configuration
68-
let makeWebSocketClient: (_ url: URL, _ headers: [String: String]) -> WebSocketClientProtocol
68+
let makeWebSocketClient: (_ url: URL, _ headers: [String: String]) -> WebSocketClient
6969

7070
private let statusStream = SharedStream<Status>(initialElement: .disconnected)
7171

@@ -80,8 +80,7 @@ public actor RealtimeClientV2 {
8080

8181
init(
8282
config: Configuration,
83-
makeWebSocketClient: @escaping (_ url: URL, _ headers: [String: String])
84-
-> WebSocketClientProtocol
83+
makeWebSocketClient: @escaping (_ url: URL, _ headers: [String: String]) -> WebSocketClient
8584
) {
8685
self.config = config
8786
self.makeWebSocketClient = makeWebSocketClient
@@ -146,7 +145,7 @@ public actor RealtimeClientV2 {
146145
let ws = makeWebSocketClient(realtimeURL, config.headers)
147146
self.ws = ws
148147

149-
ws.connect()
148+
await ws.connect()
150149

151150
let connectionStatus = await ws.status.first { _ in true }
152151

Sources/Realtime/V2/WebSocketClient.swift

+83-65
Original file line numberDiff line numberDiff line change
@@ -13,106 +13,89 @@ import Foundation
1313
import FoundationNetworking
1414
#endif
1515

16-
protocol WebSocketClientProtocol: Sendable {
17-
var status: AsyncStream<WebSocketClient.ConnectionStatus> { get }
16+
struct WebSocketClient {
17+
enum ConnectionStatus {
18+
case open
19+
case close
20+
case error(Error)
21+
}
1822

19-
func send(_ message: RealtimeMessageV2) async throws
20-
func receive() -> AsyncThrowingStream<RealtimeMessageV2, Error>
21-
func connect()
22-
func cancel()
23+
var status: AsyncStream<WebSocketClient.ConnectionStatus>
24+
25+
var send: (_ message: RealtimeMessageV2) async throws -> Void
26+
var receive: () -> AsyncThrowingStream<RealtimeMessageV2, Error>
27+
var connect: () async -> Void
28+
var cancel: () -> Void
2329
}
2430

25-
final class WebSocketClient: NSObject, URLSessionWebSocketDelegate, WebSocketClientProtocol,
26-
@unchecked Sendable
27-
{
28-
struct MutableState {
29-
var session: URLSession?
30-
var task: URLSessionWebSocketTask?
31+
extension WebSocketClient {
32+
init(realtimeURL: URL, configuration: URLSessionConfiguration, logger: SupabaseLogger?) {
33+
let client = LiveWebSocketClient(
34+
realtimeURL: realtimeURL,
35+
configuration: configuration,
36+
logger: logger
37+
)
38+
self.init(
39+
status: client.status,
40+
send: { try await client.send($0) },
41+
receive: { client.receive() },
42+
connect: { await client.connect() },
43+
cancel: { client.cancel() }
44+
)
3145
}
46+
}
3247

48+
private actor LiveWebSocketClient {
3349
private let realtimeURL: URL
3450
private let configuration: URLSessionConfiguration
3551
private let logger: SupabaseLogger?
3652

37-
private let mutableState = LockIsolated(MutableState())
38-
39-
enum ConnectionStatus {
40-
case open
41-
case close
42-
case error(Error)
43-
}
53+
private var delegate: Delegate?
54+
private var session: URLSession?
55+
private var task: URLSessionWebSocketTask?
4456

4557
init(realtimeURL: URL, configuration: URLSessionConfiguration, logger: SupabaseLogger?) {
4658
self.realtimeURL = realtimeURL
4759
self.configuration = configuration
4860

49-
let (stream, continuation) = AsyncStream<ConnectionStatus>.makeStream()
61+
let (stream, continuation) = AsyncStream<WebSocketClient.ConnectionStatus>.makeStream()
5062
status = stream
5163
self.continuation = continuation
5264

5365
self.logger = logger
54-
super.init()
5566
}
5667

5768
deinit {
58-
mutableState.withValue {
59-
$0.task?.cancel()
60-
}
61-
69+
task?.cancel()
6270
continuation.finish()
6371
}
6472

65-
private let continuation: AsyncStream<ConnectionStatus>.Continuation
66-
let status: AsyncStream<ConnectionStatus>
73+
let continuation: AsyncStream<WebSocketClient.ConnectionStatus>.Continuation
74+
nonisolated let status: AsyncStream<WebSocketClient.ConnectionStatus>
6775

6876
func connect() {
69-
mutableState.withValue {
70-
$0.session = URLSession(configuration: configuration, delegate: self, delegateQueue: nil)
71-
$0.task = $0.session?.webSocketTask(with: realtimeURL)
72-
$0.task?.resume()
73-
}
74-
}
75-
76-
func cancel() {
77-
mutableState.withValue {
78-
$0.task?.cancel()
77+
delegate = Delegate { [weak self] status in
78+
self?.continuation.yield(status)
7979
}
80-
81-
continuation.finish()
82-
}
83-
84-
func urlSession(
85-
_: URLSession,
86-
webSocketTask _: URLSessionWebSocketTask,
87-
didOpenWithProtocol _: String?
88-
) {
89-
continuation.yield(.open)
80+
session = URLSession(configuration: configuration, delegate: delegate, delegateQueue: nil)
81+
task = session?.webSocketTask(with: realtimeURL)
82+
task?.resume()
9083
}
9184

92-
func urlSession(
93-
_: URLSession,
94-
webSocketTask _: URLSessionWebSocketTask,
95-
didCloseWith _: URLSessionWebSocketTask.CloseCode,
96-
reason _: Data?
97-
) {
98-
continuation.yield(.close)
85+
nonisolated func cancel() {
86+
Task { await _cancel() }
9987
}
10088

101-
func urlSession(
102-
_: URLSession,
103-
task _: URLSessionTask,
104-
didCompleteWithError error: Error?
105-
) {
106-
if let error {
107-
continuation.yield(.error(error))
108-
}
89+
private func _cancel() {
90+
task?.cancel()
91+
continuation.finish()
10992
}
11093

111-
func receive() -> AsyncThrowingStream<RealtimeMessageV2, Error> {
94+
nonisolated func receive() -> AsyncThrowingStream<RealtimeMessageV2, Error> {
11295
let (stream, continuation) = AsyncThrowingStream<RealtimeMessageV2, Error>.makeStream()
11396

11497
Task {
115-
while let message = try await self.mutableState.task?.receive() {
98+
while let message = try await self.task?.receive() {
11699
do {
117100
switch message {
118101
case let .string(stringMessage):
@@ -144,6 +127,41 @@ final class WebSocketClient: NSObject, URLSessionWebSocketDelegate, WebSocketCli
144127
let string = String(decoding: data, as: UTF8.self)
145128

146129
logger?.verbose("Sending message: \(string)")
147-
try await mutableState.task?.send(.string(string))
130+
try await task?.send(.string(string))
131+
}
132+
133+
final class Delegate: NSObject, URLSessionWebSocketDelegate {
134+
let onStatusChange: (_ status: WebSocketClient.ConnectionStatus) -> Void
135+
136+
init(onStatusChange: @escaping (_ status: WebSocketClient.ConnectionStatus) -> Void) {
137+
self.onStatusChange = onStatusChange
138+
}
139+
140+
func urlSession(
141+
_: URLSession,
142+
webSocketTask _: URLSessionWebSocketTask,
143+
didOpenWithProtocol _: String?
144+
) {
145+
onStatusChange(.open)
146+
}
147+
148+
func urlSession(
149+
_: URLSession,
150+
webSocketTask _: URLSessionWebSocketTask,
151+
didCloseWith _: URLSessionWebSocketTask.CloseCode,
152+
reason _: Data?
153+
) {
154+
onStatusChange(.close)
155+
}
156+
157+
func urlSession(
158+
_: URLSession,
159+
task _: URLSessionTask,
160+
didCompleteWithError error: Error?
161+
) {
162+
if let error {
163+
onStatusChange(.error(error))
164+
}
165+
}
148166
}
149167
}

Tests/RealtimeTests/MockWebSocketClient.swift

+10-54
Original file line numberDiff line numberDiff line change
@@ -8,58 +8,14 @@
88
import ConcurrencyExtras
99
import Foundation
1010
@testable import Realtime
11-
12-
final class MockWebSocketClient: WebSocketClientProtocol {
13-
private let continuation: AsyncStream<WebSocketClient.ConnectionStatus>.Continuation
14-
let status: AsyncStream<WebSocketClient.ConnectionStatus>
15-
16-
struct MutableState {
17-
var sentMessages: [RealtimeMessageV2] = []
18-
var responsesHandlers: [(RealtimeMessageV2) -> RealtimeMessageV2?] = []
19-
var receiveContinuation: AsyncThrowingStream<RealtimeMessageV2, Error>.Continuation?
20-
}
21-
22-
let mutableState = LockIsolated(MutableState())
23-
24-
init() {
25-
(status, continuation) = AsyncStream<WebSocketClient.ConnectionStatus>.makeStream()
26-
}
27-
28-
func connect() {}
29-
30-
func send(_ message: RealtimeMessageV2) async throws {
31-
mutableState.withValue {
32-
$0.sentMessages.append(message)
33-
34-
if let response = $0.responsesHandlers.lazy.compactMap({ $0(message) }).first {
35-
$0.receiveContinuation?.yield(response)
36-
}
37-
}
38-
}
39-
40-
func receive() -> AsyncThrowingStream<RealtimeMessageV2, Error> {
41-
mutableState.withValue {
42-
let (stream, continuation) = AsyncThrowingStream<RealtimeMessageV2, Error>.makeStream()
43-
$0.receiveContinuation = continuation
44-
return stream
45-
}
46-
}
47-
48-
func cancel() {
49-
mutableState.receiveContinuation?.finish()
50-
}
51-
52-
func when(_ handler: @escaping (RealtimeMessageV2) -> RealtimeMessageV2?) {
53-
mutableState.withValue {
54-
$0.responsesHandlers.append(handler)
55-
}
56-
}
57-
58-
func mockReceive(_ message: RealtimeMessageV2) {
59-
mutableState.receiveContinuation?.yield(message)
60-
}
61-
62-
func mockStatus(_ status: WebSocketClient.ConnectionStatus) {
63-
continuation.yield(status)
64-
}
11+
import XCTestDynamicOverlay
12+
13+
extension WebSocketClient {
14+
static let mock = WebSocketClient(
15+
status: .never,
16+
send: unimplemented("WebSocketClient.send"),
17+
receive: unimplemented("WebSocketClient.receive"),
18+
connect: unimplemented("WebSocketClient.connect"),
19+
cancel: unimplemented("WebSocketClient.cancel")
20+
)
6521
}

0 commit comments

Comments
 (0)