diff --git a/packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart b/packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart index e52e4075e..bfbff6972 100644 --- a/packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart +++ b/packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart @@ -31,7 +31,9 @@ class KdfEventStreamingService { SseConnectionState _connectionState = SseConnectionState.disconnected; Stream get events => _events.stream; - + bool _disposed = false; + bool _initialized = false; + /// Future that completes when the first byte is received from the SSE stream. /// This indicates the server's event loop is fully flowing and the client is registered. Future get firstByteReceived => _firstByteCompleter.future; @@ -50,6 +52,8 @@ class KdfEventStreamingService { /// but should not be called at app startup. SSE connection should be tied to authentication state. @Deprecated('Use connectIfNeeded() instead') void initialize() { + if (_initialized) return; + _initialized = true; connectIfNeeded(); } @@ -109,8 +113,12 @@ class KdfEventStreamingService { } void _onIncomingData(Object? data) { - try { - if (data == null) return; + if (_disposed) return; + // Break synchronous call stacks to avoid re-entrancy into disposed closures + scheduleMicrotask(() { + if (_disposed) return; + try { + if (data == null) return; JsonMap? map; if (data is String) { @@ -137,17 +145,20 @@ class KdfEventStreamingService { } else { throw ArgumentError('Unsupported event data type: ${data.runtimeType}'); } - final event = KdfEvent.fromJson(map); - if (kDebugMode) { - final summary = _summarizeEvent(event); - print('[EventStream] Received ${event.typeEnum.value}: $summary'); - } - _events.add(event); - } catch (e) { - if (kDebugMode) { - print('Failed to parse stream event: $e'); + final event = KdfEvent.fromJson(map); + if (kDebugMode) { + final summary = _summarizeEvent(event); + print('[EventStream] Received ${event.typeEnum.value}: $summary'); + } + if (!_events.isClosed) { + _events.add(event); + } + } catch (e) { + if (kDebugMode) { + print('Failed to parse stream event: $e'); + } } - } + }); } /// Generic filter for a specific event type with proper type casting @@ -198,6 +209,7 @@ class KdfEventStreamingService { /// Cleanup Future dispose() async { + _disposed = true; _unsubscribe?.call(); await _events.close(); } diff --git a/packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart b/packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart index 45060eff0..6b1a09dba 100644 --- a/packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart +++ b/packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart @@ -70,12 +70,32 @@ class ActivationManager { continue; } - // Register activation attempt + // Register activation attempt; if one is already in progress, join it final primaryCompleter = await _registerActivation(group.primary.id); if (primaryCompleter == null) { debugPrint( - 'Activation already in progress for ${group.primary.id.name}', + 'Activation already in progress for ${group.primary.id.name} (joining)', ); + // Join the existing activation and emit a synthetic completion when done + final existing = _activationCompleters[group.primary.id]; + if (existing != null) { + yield ActivationProgress( + status: 'Joining existing activation for ${group.primary.id.name}...', + progressDetails: const ActivationProgressDetails( + currentStep: ActivationStep.initialization, + stepCount: 1, + ), + ); + try { + await existing.future; + yield ActivationProgress.alreadyActiveSuccess( + assetName: group.primary.id.name, + childCount: group.children?.length ?? 0, + ); + } catch (e) { + yield ActivationProgress.error(message: e.toString()); + } + } continue; } @@ -176,10 +196,9 @@ class ActivationManager { /// Register new activation attempt Future?> _registerActivation(AssetId assetId) async { return _protectedOperation(() async { - // Return the existing completer if activation is already in progress - // This ensures subsequent callers properly wait for the activation to complete + // If activation is already in progress for this asset, signal caller to join if (_activationCompleters.containsKey(assetId)) { - return _activationCompleters[assetId]; + return null; } final completer = Completer(); diff --git a/packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart b/packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart index ea19febfc..b7c6898b1 100644 --- a/packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart +++ b/packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart @@ -1,4 +1,4 @@ -import 'dart:async' show StreamSubscription; +import 'dart:async' show StreamSubscription, StreamController, scheduleMicrotask; import 'package:flutter/foundation.dart' show ValueGetter; import 'package:komodo_coins/komodo_coins.dart'; @@ -48,8 +48,10 @@ class AssetManager implements IAssetProvider { ValueGetter activationManager, this._coins, ValueGetter activatedAssetsCache, + [ValueGetter? activationCoordinator], ) : _activationManager = activationManager, - _activatedAssetsCache = activatedAssetsCache { + _activatedAssetsCache = activatedAssetsCache, + _activationCoordinator = activationCoordinator { _authSubscription = _auth.authStateChanges.listen(_handleAuthStateChange); } final KomodoDefiLocalAuth _auth; @@ -61,6 +63,9 @@ class AssetManager implements IAssetProvider { /// See [activateAsset] and [activateAssets] for more details. final ValueGetter _activationManager; + /// Lazily resolved shared activation coordinator to ensure global dedup. + final ValueGetter? _activationCoordinator; + /// Activated assets cache shared across SDK consumers. final ValueGetter _activatedAssetsCache; StreamSubscription? _authSubscription; @@ -198,8 +203,13 @@ class AssetManager implements IAssetProvider { /// activation logic internally and seamlessly. /// /// Returns a stream of [ActivationProgress] updates. - Stream activateAsset(Asset asset) => - _activationManager().activateAsset(asset); + Stream activateAsset(Asset asset) { + final coordinator = _activationCoordinator?.call(); + if (coordinator != null) { + return coordinator.activateAssetStream(asset); + } + return _activationManager().activateAsset(asset); + } /// Activates multiple assets at once. /// @@ -211,8 +221,24 @@ class AssetManager implements IAssetProvider { /// activation logic internally and seamlessly. /// /// Returns a stream of [ActivationProgress] updates. - Stream activateAssets(List assets) => - _activationManager().activateAssets(assets); + Stream activateAssets(List assets) { + final coordinator = _activationCoordinator?.call(); + if (coordinator == null) { + return _activationManager().activateAssets(assets); + } + + final controller = StreamController.broadcast(); + + scheduleMicrotask(() { + for (final asset in assets) { + coordinator + .activateAssetStream(asset) + .listen(controller.add, onError: controller.addError); + } + }); + + return controller.stream; + } /// Disposes of the asset manager, cleaning up resources. /// diff --git a/packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart b/packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart index 84cbd4795..f15428f6f 100644 --- a/packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart +++ b/packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart @@ -278,6 +278,16 @@ class BalanceManager implements IBalanceManager { } if (!activateIfNeeded) { + // If activation isn't allowed but one is in progress, join it to avoid races + if (_activationCoordinator!.isActivationInProgress(asset.id)) { + try { + final result = await _activationCoordinator!.activateAsset(asset); + return result.isSuccess; + } catch (e) { + _logger.fine('Failed while awaiting in-progress activation: $e'); + return false; + } + } return _activationCoordinator!.isAssetActive(asset.id); } diff --git a/packages/komodo_defi_sdk/lib/src/bootstrap.dart b/packages/komodo_defi_sdk/lib/src/bootstrap.dart index 3ea76836f..bdc994082 100644 --- a/packages/komodo_defi_sdk/lib/src/bootstrap.dart +++ b/packages/komodo_defi_sdk/lib/src/bootstrap.dart @@ -111,6 +111,7 @@ Future bootstrap({ () => container(), container(), () => container(), + () => container(), ); await assetManager.init(); // Will be removed in near future after KW is fully migrated to KDF