From 5c3da325ad8e30929f92c297d62d0199cd7b3bf4 Mon Sep 17 00:00:00 2001 From: smk762 Date: Thu, 30 Oct 2025 22:51:14 +0800 Subject: [PATCH 1/2] enhance activation coordinator --- .../src/activation/activation_manager.dart | 29 +++++++++++--- .../lib/src/assets/asset_manager.dart | 38 ++++++++++++++++--- .../lib/src/balances/balance_manager.dart | 10 +++++ .../komodo_defi_sdk/lib/src/bootstrap.dart | 1 + 4 files changed, 67 insertions(+), 11 deletions(-) diff --git a/packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart b/packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart index 45060eff0..6b1a09dba 100644 --- a/packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart +++ b/packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart @@ -70,12 +70,32 @@ class ActivationManager { continue; } - // Register activation attempt + // Register activation attempt; if one is already in progress, join it final primaryCompleter = await _registerActivation(group.primary.id); if (primaryCompleter == null) { debugPrint( - 'Activation already in progress for ${group.primary.id.name}', + 'Activation already in progress for ${group.primary.id.name} (joining)', ); + // Join the existing activation and emit a synthetic completion when done + final existing = _activationCompleters[group.primary.id]; + if (existing != null) { + yield ActivationProgress( + status: 'Joining existing activation for ${group.primary.id.name}...', + progressDetails: const ActivationProgressDetails( + currentStep: ActivationStep.initialization, + stepCount: 1, + ), + ); + try { + await existing.future; + yield ActivationProgress.alreadyActiveSuccess( + assetName: group.primary.id.name, + childCount: group.children?.length ?? 0, + ); + } catch (e) { + yield ActivationProgress.error(message: e.toString()); + } + } continue; } @@ -176,10 +196,9 @@ class ActivationManager { /// Register new activation attempt Future?> _registerActivation(AssetId assetId) async { return _protectedOperation(() async { - // Return the existing completer if activation is already in progress - // This ensures subsequent callers properly wait for the activation to complete + // If activation is already in progress for this asset, signal caller to join if (_activationCompleters.containsKey(assetId)) { - return _activationCompleters[assetId]; + return null; } final completer = Completer(); diff --git a/packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart b/packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart index ea19febfc..b7c6898b1 100644 --- a/packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart +++ b/packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart @@ -1,4 +1,4 @@ -import 'dart:async' show StreamSubscription; +import 'dart:async' show StreamSubscription, StreamController, scheduleMicrotask; import 'package:flutter/foundation.dart' show ValueGetter; import 'package:komodo_coins/komodo_coins.dart'; @@ -48,8 +48,10 @@ class AssetManager implements IAssetProvider { ValueGetter activationManager, this._coins, ValueGetter activatedAssetsCache, + [ValueGetter? activationCoordinator], ) : _activationManager = activationManager, - _activatedAssetsCache = activatedAssetsCache { + _activatedAssetsCache = activatedAssetsCache, + _activationCoordinator = activationCoordinator { _authSubscription = _auth.authStateChanges.listen(_handleAuthStateChange); } final KomodoDefiLocalAuth _auth; @@ -61,6 +63,9 @@ class AssetManager implements IAssetProvider { /// See [activateAsset] and [activateAssets] for more details. final ValueGetter _activationManager; + /// Lazily resolved shared activation coordinator to ensure global dedup. + final ValueGetter? _activationCoordinator; + /// Activated assets cache shared across SDK consumers. final ValueGetter _activatedAssetsCache; StreamSubscription? _authSubscription; @@ -198,8 +203,13 @@ class AssetManager implements IAssetProvider { /// activation logic internally and seamlessly. /// /// Returns a stream of [ActivationProgress] updates. - Stream activateAsset(Asset asset) => - _activationManager().activateAsset(asset); + Stream activateAsset(Asset asset) { + final coordinator = _activationCoordinator?.call(); + if (coordinator != null) { + return coordinator.activateAssetStream(asset); + } + return _activationManager().activateAsset(asset); + } /// Activates multiple assets at once. /// @@ -211,8 +221,24 @@ class AssetManager implements IAssetProvider { /// activation logic internally and seamlessly. /// /// Returns a stream of [ActivationProgress] updates. - Stream activateAssets(List assets) => - _activationManager().activateAssets(assets); + Stream activateAssets(List assets) { + final coordinator = _activationCoordinator?.call(); + if (coordinator == null) { + return _activationManager().activateAssets(assets); + } + + final controller = StreamController.broadcast(); + + scheduleMicrotask(() { + for (final asset in assets) { + coordinator + .activateAssetStream(asset) + .listen(controller.add, onError: controller.addError); + } + }); + + return controller.stream; + } /// Disposes of the asset manager, cleaning up resources. /// diff --git a/packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart b/packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart index 84cbd4795..f15428f6f 100644 --- a/packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart +++ b/packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart @@ -278,6 +278,16 @@ class BalanceManager implements IBalanceManager { } if (!activateIfNeeded) { + // If activation isn't allowed but one is in progress, join it to avoid races + if (_activationCoordinator!.isActivationInProgress(asset.id)) { + try { + final result = await _activationCoordinator!.activateAsset(asset); + return result.isSuccess; + } catch (e) { + _logger.fine('Failed while awaiting in-progress activation: $e'); + return false; + } + } return _activationCoordinator!.isAssetActive(asset.id); } diff --git a/packages/komodo_defi_sdk/lib/src/bootstrap.dart b/packages/komodo_defi_sdk/lib/src/bootstrap.dart index 3ea76836f..bdc994082 100644 --- a/packages/komodo_defi_sdk/lib/src/bootstrap.dart +++ b/packages/komodo_defi_sdk/lib/src/bootstrap.dart @@ -111,6 +111,7 @@ Future bootstrap({ () => container(), container(), () => container(), + () => container(), ); await assetManager.init(); // Will be removed in near future after KW is fully migrated to KDF From 28e8f2814618c48e4687076fa02715d00ba859f8 Mon Sep 17 00:00:00 2001 From: smk762 Date: Thu, 30 Oct 2025 22:51:59 +0800 Subject: [PATCH 2/2] Break synchronous call stacks to avoid re-entrancy into disposed closures --- .../streaming/event_streaming_service.dart | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart b/packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart index b51179391..6d48027a1 100644 --- a/packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart +++ b/packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart @@ -23,11 +23,15 @@ class KdfEventStreamingService { final StreamController _events = StreamController.broadcast(); Stream get events => _events.stream; + bool _disposed = false; + bool _initialized = false; /// Start listening to stream events. /// - Web: Connects to SharedWorker forwarded messages. /// - Native (IO): Connects to SSE endpoint exposed by KDF RPC server. void initialize() { + if (_initialized) return; + _initialized = true; _unsubscribe ??= connectEventStream( hostConfig: _hostConfig, onMessage: _onIncomingData, @@ -35,8 +39,12 @@ class KdfEventStreamingService { } void _onIncomingData(Object? data) { - try { - if (data == null) return; + if (_disposed) return; + // Break synchronous call stacks to avoid re-entrancy into disposed closures + scheduleMicrotask(() { + if (_disposed) return; + try { + if (data == null) return; JsonMap? map; if (data is String) { @@ -63,17 +71,20 @@ class KdfEventStreamingService { } else { throw ArgumentError('Unsupported event data type: ${data.runtimeType}'); } - final event = KdfEvent.fromJson(map); - if (kDebugMode) { - final summary = _summarizeEvent(event); - print('[EventStream] Received ${event.typeEnum.value}: $summary'); - } - _events.add(event); - } catch (e) { - if (kDebugMode) { - print('Failed to parse stream event: $e'); + final event = KdfEvent.fromJson(map); + if (kDebugMode) { + final summary = _summarizeEvent(event); + print('[EventStream] Received ${event.typeEnum.value}: $summary'); + } + if (!_events.isClosed) { + _events.add(event); + } + } catch (e) { + if (kDebugMode) { + print('Failed to parse stream event: $e'); + } } - } + }); } /// Generic filter for a specific event type with proper type casting @@ -124,6 +135,7 @@ class KdfEventStreamingService { /// Cleanup Future dispose() async { + _disposed = true; _unsubscribe?.call(); await _events.close(); }