Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SCRUM-58] Stream Listener #62

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Projects/Core/StreamListener/Interface/AnyStreamContinuation.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import Foundation

public class AnyStreamContinuation {
private let _yield: (Any) -> Void

public init<T>(_ continuation: AsyncStream<T>.Continuation) {
self._yield = { value in
guard let value = value as? T else { return }
continuation.yield(value)
}
}

public func yield(_ value: Any) {
_yield(value)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,49 @@ import Foundation
import Dependencies
import DependenciesMacros

/*
TODO:
StreamListener를 토스트나 다이얼로그, 에러뷰 등 다양한 상황의 스트림을 만들 수 있도록 둘 것인지, serverState 추적만을 하도록 둘 것인지 정해야함 (네이밍 다시 해야함)
++ NetworkTracking도 그 목적이 StreamListener와 유사함
*/
public protocol StreamListenerProtocol {
func send<T: StreamTypeProtocol>(_ state: T) async
func receive<T: StreamTypeProtocol>(_ type: T.Type) -> AsyncStream<T>
}

@DependencyClient
public struct StreamListener {
public var sendServerState: @Sendable (_ state: ServerState) async -> Void
public var updateServerState: @Sendable () -> AsyncStream<ServerState> = { .never }
public var protocolAdapter: StreamListenerProtocol

public init(protocolAdapter: StreamListenerProtocol) {
self.protocolAdapter = protocolAdapter
}
}

extension StreamListener: TestDependencyKey {
public static let previewValue = Self()
public static let testValue = Self()
public static let previewValue = Self(protocolAdapter: StreamListenerTestImpl())
public static let testValue = Self(protocolAdapter: StreamListenerTestImpl())
}

public enum ServerState {
case requestStarted
case requestCompleted
case errorOccured
case networkDisabled
private struct StreamListenerTestImpl: StreamListenerProtocol {
func send<T: StreamTypeProtocol>(_ state: T) async {}
func receive<T: StreamTypeProtocol>(_ type: T.Type) -> AsyncStream<T> { .never }
}

/*
// 서버 상태 변경 시
await send(ServerState.requestStarted)
어떤 stream에 어떤 state를 보낼 것인지

// 스트림 구독
let serverStateStream: AsyncStream<ServerState> = receive(for: ServerState.self)
어떤 stream을 구독할 것인지 , , , , . . ..
*/

/*
catCore - serverstate 네트워크 에러 send
catcore - send와 동시에 core.action에 networkerror 같은 action만들고 어떤 api인지 같이 보냄
catcore - core.action.networkerror([api종류]) 에선, retry 구독, 스트림 받으면 어떤 api인지 알고잇으니 재요청 하면됨 (재요청하고나선 구독한 스트림 꼭 종료)
appCore - serverstate 네트워크 에러 receive
appCore - 네트워크에러 receive 네트워크에러뷰 띄움
retryCore - 재시도 뷰에서 재시도버튼 클릭 시 재시도 send
*/

/*
해야할 것: receive, send 조금 더 수정하기 . . . ..
*/
21 changes: 21 additions & 0 deletions Projects/Core/StreamListener/Interface/Streams.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import Foundation

// MARK: StreamType

public protocol StreamTypeProtocol: Hashable {
static var key: StreamType { get }
}

public enum StreamType: Hashable {
case serverState
case retry
case toast
}

public enum ServerState: StreamTypeProtocol {
public static var key: StreamType { .serverState }
case requestStarted
case requestCompleted
case errorOccured
case networkDisabled
}
49 changes: 28 additions & 21 deletions Projects/Core/StreamListener/Sources/StreamListener.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,39 @@ extension StreamListener: DependencyKey {
public static let liveValue: StreamListener = .live()

public static func live() -> StreamListener {
return .init(protocolAdapter: StreamListenerImpl())
}
}

// 네이밍 추천 plz .,
actor ContinuationActor {
var continuation: AsyncStream<ServerState>.Continuation?
final class StreamListenerImpl: StreamListenerProtocol {
private let actor = StreamActor()

func set(_ newContinuation: AsyncStream<ServerState>.Continuation) {
continuation = newContinuation
}
func send<T: StreamTypeProtocol>(_ state: T) async {
await actor.yield(type: T.key, value: T.self)
}

func yield(_ state: ServerState) {
continuation?.yield(state)
}
func receive<T: StreamTypeProtocol>(_ type: T.Type) -> AsyncStream<T> {
let (stream, continuation) = AsyncStream<T>.makeStream()
Task {
await actor.register(key: T.key, continuation: continuation)
}
return stream
}
}

let continuationActor = ContinuationActor()
let asyncStream = AsyncStream<ServerState> { continuation in
Task { await continuationActor.set(continuation) }
}
private actor StreamActor {
private var streams: [StreamType: AnyStreamContinuation] = [:]

func register<T: StreamTypeProtocol>(key: StreamType, continuation: AsyncStream<T>.Continuation) {
streams[key] = AnyStreamContinuation(continuation)
}

func yield<T: StreamTypeProtocol>(type: StreamType, value: T.Type) {
guard let continuation = streams[type] else { return }
continuation.yield(value)
}

return StreamListener(
sendServerState: { state in
await continuationActor.yield(state)
},
updateServerState: {
return asyncStream
}
)
func remove(type: StreamType) {
streams[type] = nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public struct NamingCatCore {

case let ._postNamedCatRequest(request):
return .run { send in
await self.streamListener.sendServerState(state: .requestStarted)
await self.streamListener.protocolAdapter.send(ServerState.requestStarted)
await send(._postNamedCatResponse(Result {
try await self.catService.changeCatName(apiClient: apiClient, request: request)
}))
Expand All @@ -126,7 +126,7 @@ public struct NamingCatCore {
case ._postNamedCatResponse(.success(_)):
return .run { send in
try await self.userService.syncUserInfo(apiClient: self.apiClient, databaseClient: self.databaseClient)
await self.streamListener.sendServerState(state: .requestCompleted)
await self.streamListener.protocolAdapter.send(ServerState.requestCompleted)
await send(._setNextAction)
}

Expand Down Expand Up @@ -172,14 +172,14 @@ extension NamingCatCore {
networkError.code == .networkConnectionLost ||
networkError.code == .notConnectedToInternet {
return .run { send in
await streamListener.sendServerState(state: .networkDisabled)
await self.streamListener.protocolAdapter.send(ServerState.networkDisabled)
}
}
guard let error = error as? NetworkError else { return .none }
switch error {
case .apiError(_):
return .run { send in
await streamListener.sendServerState(state: .errorOccured)
await self.streamListener.protocolAdapter.send(ServerState.errorOccured)
}
default:
return .none
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public struct SelectCatCore {

case ._fetchCatListRequest:
return .run { send in
await streamListener.sendServerState(state: .requestStarted)
await self.streamListener.protocolAdapter.send(ServerState.requestStarted)
await send(._fetchCatListResponse(Result {
try await catService.getCatList(apiClient)
}))
Expand All @@ -134,15 +134,15 @@ public struct SelectCatCore {
case let ._fetchCatListResponse(.success(response)):
state.catList = response.map { SomeCat(baseInfo: $0) }
return .run { send in
await streamListener.sendServerState(state: .requestCompleted)
await self.streamListener.protocolAdapter.send(ServerState.requestCompleted)
}

case let ._fetchCatListResponse(.failure(error)):
return handleError(error: error)

case let ._postSelectedCatRequest(request):
return .run { send in
await streamListener.sendServerState(state: .requestStarted)
await self.streamListener.protocolAdapter.send(ServerState.requestStarted)
await send(._postSelectedCatResponse(Result {
try await userService.selectCat(apiClient: self.apiClient, request: request)
}))
Expand All @@ -151,7 +151,7 @@ public struct SelectCatCore {
case ._postSelectedCatResponse(.success(_)):
return .run { send in
try await userService.syncUserInfo(apiClient: self.apiClient, databaseClient: self.databaseClient)
await streamListener.sendServerState(state: .requestCompleted)
await self.streamListener.protocolAdapter.send(ServerState.requestCompleted)
await send(._setNextAction)
}

Expand All @@ -174,14 +174,14 @@ extension SelectCatCore {
networkError.code == .networkConnectionLost ||
networkError.code == .notConnectedToInternet {
return .run { send in
await streamListener.sendServerState(state: .networkDisabled)
await self.streamListener.protocolAdapter.send(ServerState.networkDisabled)
}
}
guard let error = error as? NetworkError else { return .none }
switch error {
case .apiError(_):
return .run { send in
await streamListener.sendServerState(state: .errorOccured)
await self.streamListener.protocolAdapter.send(ServerState.errorOccured)
}
default:
return .none
Expand Down
5 changes: 3 additions & 2 deletions Projects/Feature/Feature/Sources/AppCore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Feature
//
// Created by devMinseok on 7/22/24.
// Copyright © 2024 PomoNyang. All rights reserved.
// Copyright 2024 PomoNyang. All rights reserved.
//

import SwiftUI
Expand Down Expand Up @@ -92,7 +92,8 @@ public struct AppCore {
case .onLoad:
state.splash = SplashCore.State()
return .run { send in
for await serverState in streamListener.updateServerState() {
let serverStateStream: AsyncStream<ServerState> = streamListener.protocolAdapter.receive(ServerState.self)
for await serverState in serverStateStream {
await send(.serverState(serverState))
}
}
Expand Down
2 changes: 1 addition & 1 deletion XCFramework/Binary/RealmSwift.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"10.52.3": "https://github.com/realm/realm-swift/releases/download/v10.52.3/Carthage.xcframework.zip"
"10.54.1": "https://github.com/realm/realm-swift/releases/download/v10.54.1/Carthage.xcframework.zip"
}
Loading