diff --git a/Sources/AsyncWebSocketOperators/Operators.swift b/Sources/AsyncWebSocketOperators/Operators.swift index 957bf93..f392397 100644 --- a/Sources/AsyncWebSocketOperators/Operators.swift +++ b/Sources/AsyncWebSocketOperators/Operators.swift @@ -60,3 +60,42 @@ extension AsyncStream where Element == AsyncWebSocketClient.ConnectionStatus { } } +extension AsyncStream where Element: Sendable { + /// Transforms a stream of Element into a stream of Result of transformed Element or Error. + /// + /// It allows the stream to continue even upon failure when attempting to transform the element. + public func toResult( + transform: @Sendable @escaping (Element) throws -> T + ) -> AsyncStream> { + self + .map { element in + return Result { try transform(element) } + } + .eraseToStream() + } +} + +extension AsyncStream where Element == AsyncWebSocketClient.Frame { + /// Attemps to decode into JSON if the frame is message.data or message.text. + public func json( + _ type: T.Type + ) -> AsyncStream> where T: Codable, T: Sendable { + self + .on(\.message) + .toResult { + switch $0 { + case let .binary(data): + return try JSONDecoder().decode(T.self, from: data) + case let .text(string): + guard let data = string.data(using: .utf8) else { + throw NSError( + domain: "String To Data Conversion", + code: 0, + userInfo: ["reason": "Faield to convert string to data"] + ) + } + return try JSONDecoder().decode(T.self, from: data) + } + } + } +}