Skip to content

Commit

Permalink
Reduce channel list updates when syncing (#3450)
Browse files Browse the repository at this point in the history
  • Loading branch information
laevandus authored Oct 11, 2024
1 parent 67d3465 commit a70bb54
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Fix a rare crash caused by missing uniqueness constraints in polls [#3454](https://github.com/GetStream/stream-chat-swift/pull/3454)
- Fix rare crash in `WebSocketPingController.connectionStateDidChange` [#3451](https://github.com/GetStream/stream-chat-swift/pull/3451)
- Improve reliability and performance of resetting ephemeral values [#3439](https://github.com/GetStream/stream-chat-swift/pull/3439)
- Reduce channel list updates when updating the local state [#3450](https://github.com/GetStream/stream-chat-swift/pull/3450)

## StreamChatUI
### 🐞 Fixed
Expand Down
29 changes: 17 additions & 12 deletions Sources/StreamChat/Repositories/SyncOperations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import Foundation
/// A final class that holds the context for the ongoing operations during the sync process
final class SyncContext {
let lastSyncAt: Date
var localChannelIds: [ChannelId] = []
var localChannelIds: Set<ChannelId> = Set()
var synchedChannelIds: Set<ChannelId> = Set()
var watchedAndSynchedChannelIds: Set<ChannelId> = Set()
var unwantedChannelIds: Set<ChannelId> = Set()
Expand All @@ -31,13 +31,13 @@ final class ActiveChannelIdsOperation: AsyncOperation, @unchecked Sendable {
}

let completion: () -> Void = {
context.localChannelIds = Array(Set(context.localChannelIds))
context.localChannelIds = Set(context.localChannelIds)
log.info("Found \(context.localChannelIds.count) active channels", subsystems: .offlineSupport)
done(.continue)
}

context.localChannelIds.append(contentsOf: syncRepository.activeChannelControllers.allObjects.compactMap(\.cid))
context.localChannelIds.append(contentsOf:
context.localChannelIds.formUnion(syncRepository.activeChannelControllers.allObjects.compactMap(\.cid))
context.localChannelIds.formUnion(
syncRepository.activeChannelListControllers.allObjects
.map(\.channels)
.flatMap { $0 }
Expand All @@ -51,8 +51,8 @@ final class ActiveChannelIdsOperation: AsyncOperation, @unchecked Sendable {
} else {
// Main actor requirement
DispatchQueue.main.async {
context.localChannelIds.append(contentsOf: syncRepository.activeChats.allObjects.compactMap { try? $0.cid })
context.localChannelIds.append(contentsOf:
context.localChannelIds.formUnion(syncRepository.activeChats.allObjects.compactMap { try? $0.cid })
context.localChannelIds.formUnion(
syncRepository.activeChannelLists.allObjects
.map(\.state.channels)
.flatMap { $0 }
Expand Down Expand Up @@ -119,7 +119,7 @@ final class GetChannelIdsOperation: AsyncOperation, @unchecked Sendable {
.flatMap(\.channels)
.compactMap { try? ChannelId(cid: $0.cid) }
log.info("0. Retrieved channels from existing queries from DB. Count \(cids.count)", subsystems: .offlineSupport)
context.localChannelIds = Array(Set(cids + activeChannelIds))
context.localChannelIds = Set(cids + activeChannelIds)
done(.continue)
}
}
Expand All @@ -134,17 +134,22 @@ final class SyncEventsOperation: AsyncOperation, @unchecked Sendable {
subsystems: .offlineSupport
)

let channelIds = Set(context.localChannelIds).subtracting(context.synchedChannelIds)
guard !channelIds.isEmpty else {
done(.continue)
return
}

syncRepository?.syncChannelsEvents(
channelIds: context.localChannelIds,
channelIds: Array(channelIds),
lastSyncAt: context.lastSyncAt,
isRecovery: recovery
) { result in
switch result {
case let .success(channelIds):
context.synchedChannelIds = Set(channelIds)
context.synchedChannelIds.formUnion(channelIds)
done(.continue)
case let .failure(error):
context.synchedChannelIds = Set([])
done(error.shouldRetry ? .retry : .continue)
}
}
Expand Down Expand Up @@ -247,8 +252,8 @@ final class RefetchChannelListQueryOperation: AsyncOperation, @unchecked Sendabl
case let .success((watchedChannels, unwantedCids)):
log.info("Successfully refetched query for \(query.debugDescription)", subsystems: .offlineSupport)
let queryChannelIds = watchedChannels.map(\.cid)
context.watchedAndSynchedChannelIds = context.watchedAndSynchedChannelIds.union(queryChannelIds)
context.unwantedChannelIds = context.unwantedChannelIds.union(unwantedCids)
context.watchedAndSynchedChannelIds.formUnion(queryChannelIds)
context.unwantedChannelIds.formUnion(unwantedCids)
done(.continue)
case let .failure(error):
log.error(
Expand Down
16 changes: 9 additions & 7 deletions Sources/StreamChat/Repositories/SyncRepository.swift
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,10 @@ class SyncRepository {
///
/// Background mode (other regular API requests are allowed to run at the same time)
/// 1. Collect all the **active** channel ids (from instances of `Chat`, `ChannelList`, `ChatChannelController`, `ChatChannelListController`)
/// 2. Apply updates from the /sync endpoint for these channels
/// 3. Refresh channel lists (channels for current pages in `ChannelList`, `ChatChannelListController`)
/// 2. Refresh channel lists (channels for current pages in `ChannelList`, `ChatChannelListController`)
/// 3. Apply updates from the /sync endpoint for channels not in active channel lists (max 2000 events is supported)
/// * channel controllers targeting other channels
/// * no channel lists active, but channel controllers are
/// 4. Re-watch channels what we were watching before disconnect
private func syncLocalStateV2(lastSyncAt: Date, completion: @escaping () -> Void) {
let context = SyncContext(lastSyncAt: lastSyncAt)
Expand All @@ -183,12 +185,12 @@ class SyncRepository {
/// 1. Collect all the **active** channel ids
operations.append(ActiveChannelIdsOperation(syncRepository: self, context: context))

// 2. /sync
operations.append(SyncEventsOperation(syncRepository: self, context: context, recovery: false))

// 3. Refresh channel lists (required even after applying events)
// 2. Refresh channel lists
operations.append(contentsOf: activeChannelLists.allObjects.map { RefreshChannelListOperation(channelList: $0, context: context) })
operations.append(contentsOf: activeChannelListControllers.allObjects.map { RefreshChannelListOperation(controller: $0, context: context) })

// 3. /sync (for channels what not part of active channel lists)
operations.append(SyncEventsOperation(syncRepository: self, context: context, recovery: false))

// 4. Re-watch channels what we were watching before disconnect
// Needs to be done explicitly after reconnection, otherwise SDK users need to handle connection changes
Expand Down Expand Up @@ -407,7 +409,7 @@ class SyncRepository {
completion(.failure(.syncEndpointFailed(error)))
return
}
// Backend responds with 400 if there were more than 1000 events to return
// Backend responds with 400 if there were more than 2000 events to return
// Cleaning local channels data and refetching it from scratch
log.info("/sync returned too many events. Continuing...", subsystems: .offlineSupport)

Expand Down
10 changes: 6 additions & 4 deletions Tests/StreamChatTests/Repositories/SyncOperations_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ final class SyncOperations_Tests: XCTestCase {

func test_SyncEventsOperation_pendingDate_syncFailure_shouldRetry() throws {
let context = SyncContext(lastSyncAt: .init())
context.localChannelIds = [ChannelId.unique]
try database.createCurrentUser()
let originalDate = Date().addingTimeInterval(-3600)
try database.writeSynchronously { session in
Expand All @@ -96,6 +97,7 @@ final class SyncOperations_Tests: XCTestCase {

func test_SyncEventsOperation_pendingDate_syncSuccess_shouldUpdateLastPendingConnectionDate() throws {
let context = SyncContext(lastSyncAt: .init())
context.localChannelIds = [ChannelId.unique]
try database.createCurrentUser()
try database.writeSynchronously { session in
session.currentUser?.lastSynchedEventDate = DBDate().addingTimeInterval(-3600)
Expand Down Expand Up @@ -189,7 +191,7 @@ final class SyncOperations_Tests: XCTestCase {
func test_RefetchChannelListQueryOperation_notAvailableOnRemote() {
let context = SyncContext(lastSyncAt: .init())
let controller = ChatChannelListController_Mock(query: .init(filter: .exists(.cid)), client: client)
controller.state = .initialized
controller.state_mock = .initialized
let operation = RefetchChannelListQueryOperation(
controller: controller,
context: context
Expand All @@ -207,7 +209,7 @@ final class SyncOperations_Tests: XCTestCase {
func test_RefetchChannelListQueryOperation_availableOnRemote_resetFailure_shouldRetry() {
let context = SyncContext(lastSyncAt: .init())
let controller = ChatChannelListController_Mock(query: .init(filter: .exists(.cid)), client: client)
controller.state = .remoteDataFetched
controller.state_mock = .remoteDataFetched
let operation = RefetchChannelListQueryOperation(
controller: controller,
context: context
Expand All @@ -226,7 +228,7 @@ final class SyncOperations_Tests: XCTestCase {
func test_RefetchChannelListQueryOperation_availableOnRemote_resetSuccess_shouldAddToContext() throws {
let context = SyncContext(lastSyncAt: .init())
let controller = ChatChannelListController_Mock(query: .init(filter: .exists(.cid)), client: client)
controller.state = .remoteDataFetched
controller.state_mock = .remoteDataFetched
let channelId = ChannelId.unique
try database.writeSynchronously { session in
try session.saveChannel(payload: self.dummyPayload(with: channelId))
Expand Down Expand Up @@ -258,7 +260,7 @@ final class SyncOperations_Tests: XCTestCase {
func test_RefetchChannelListQueryOperation_availableOnRemote_resetSuccess_shouldNotAddToContextWhenAlreadyExisting() throws {
let context = SyncContext(lastSyncAt: .init())
let controller = ChatChannelListController_Mock(query: .init(filter: .exists(.cid)), client: client)
controller.state = .remoteDataFetched
controller.state_mock = .remoteDataFetched
let channelId = ChannelId.unique
try database.writeSynchronously { session in
try session.saveChannel(payload: self.dummyPayload(with: channelId))
Expand Down
40 changes: 28 additions & 12 deletions Tests/StreamChatTests/Repositories/SyncRepository_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,27 @@ class SyncRepositoryV2_Tests: SyncRepository_Tests {
super.setUp()
repository.usesV2Sync = true
}

func test_syncLocalEvents_bySkippingAlreadyFetchedChannelIds() throws {
let lastSyncDate = Date()
let cid = ChannelId.unique
try prepareForSyncLocalStorage(
createUser: true,
lastSynchedEventDate: lastSyncDate,
createChannel: true,
cid: cid
)

// One channel list controller which fetches the state for cid
let chatListController = ChatChannelListController_Mock(query: .init(filter: .exists(.cid)), client: client)
chatListController.state_mock = .remoteDataFetched
chatListController.channels_mock = [.mock(cid: cid)]
repository.startTrackingChannelListController(chatListController)
chatListController.refreshLoadedChannelsResult = .success(Set([cid]))

// If it fails, it means /sync was called but we expect it to be skipped because channel list refresh already refreshed the channel
waitForSyncLocalStateRun()
}
}

class SyncRepository_Tests: XCTestCase {
Expand Down Expand Up @@ -247,7 +268,7 @@ class SyncRepository_Tests: XCTestCase {
)

let chatListController = ChatChannelListController_Mock(query: .init(filter: .exists(.cid)), client: client)
chatListController.state = .remoteDataFetched
chatListController.state_mock = .remoteDataFetched
chatListController.channels_mock = [.mock(cid: cid)]
repository.startTrackingChannelListController(chatListController)
if repository.usesV2Sync {
Expand Down Expand Up @@ -296,7 +317,7 @@ class SyncRepository_Tests: XCTestCase {
)

let chatListController = ChatChannelListController_Mock(query: .init(filter: .exists(.cid)), client: client)
chatListController.state = .remoteDataFetched
chatListController.state_mock = .remoteDataFetched
repository.startTrackingChannelListController(chatListController)
let unwantedId = ChannelId.unique
chatListController.resetChannelsQueryResult = .success(([], [unwantedId]))
Expand Down Expand Up @@ -329,14 +350,9 @@ class SyncRepository_Tests: XCTestCase {
cid: cid
)

// At least one active controller is needed for sync to happen
let chatListController = ChatChannelListController_Mock(query: .init(filter: .exists(.cid)), client: client)
let channelController = ChatChannelController_Mock(channelQuery: ChannelQuery(cid: .unique), channelListQuery: nil, client: client)
if repository.usesV2Sync {
chatListController.state = .remoteDataFetched
chatListController.channels_mock = [.mock(cid: cid)]
repository.startTrackingChannelListController(chatListController)

chatListController.refreshLoadedChannelsResult = .success(Set([cid]))
repository.startTrackingChannelController(channelController)
}

let firstDate = lastSyncDate.addingTimeInterval(1)
Expand All @@ -352,7 +368,7 @@ class SyncRepository_Tests: XCTestCase {

XCTAssertNearlySameDate(lastSyncAtValue, thirdDate)

repository.stopTrackingChannelListController(chatListController)
repository.stopTrackingChannelController(channelController)
}

// MARK: - Sync existing channels events
Expand Down Expand Up @@ -681,6 +697,7 @@ class SyncRepository_Tests: XCTestCase {
}

func test_cancelRecoveryFlow_cancelsAllOperations() throws {
try XCTSkipIf(repository.usesV2Sync, "V2 has different implementation")
// Prepare environment
try prepareForSyncLocalStorage(
createUser: true,
Expand All @@ -696,7 +713,7 @@ class SyncRepository_Tests: XCTestCase {

// Add active channel list component
let channelListController = ChatChannelListController_Mock(query: .init(filter: .exists(.cid)), client: client)
channelListController.state = .remoteDataFetched
channelListController.state_mock = .remoteDataFetched
repository.startTrackingChannelListController(channelListController)

// Sync local state
Expand Down Expand Up @@ -842,7 +859,6 @@ extension SyncRepository_Tests {
}

if let result = requestResult {
// Simulate API Failure
if repository.usesV2Sync {
apiClient.waitForRequest()
guard let callback = apiClient.request_completion as? (Result<MissingEventsPayload, Error>) -> Void else {
Expand Down

0 comments on commit a70bb54

Please sign in to comment.