Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,28 @@ class KdfEventStreamingService {
final StreamController<KdfEvent> _events = StreamController.broadcast();

Stream<KdfEvent> get events => _events.stream;
bool _disposed = false;
bool _initialized = false;

/// Start listening to stream events.
/// - Web: Connects to SharedWorker forwarded messages.
/// - Native (IO): Connects to SSE endpoint exposed by KDF RPC server.
void initialize() {
if (_initialized) return;
_initialized = true;
_unsubscribe ??= connectEventStream(
hostConfig: _hostConfig,
onMessage: _onIncomingData,
);
}

void _onIncomingData(Object? data) {
try {
if (data == null) return;
if (_disposed) return;
// Break synchronous call stacks to avoid re-entrancy into disposed closures
scheduleMicrotask(() {
if (_disposed) return;
try {
if (data == null) return;
JsonMap? map;

if (data is String) {
Expand All @@ -63,17 +71,20 @@ class KdfEventStreamingService {
} else {
throw ArgumentError('Unsupported event data type: ${data.runtimeType}');
}
final event = KdfEvent.fromJson(map);
if (kDebugMode) {
final summary = _summarizeEvent(event);
print('[EventStream] Received ${event.typeEnum.value}: $summary');
}
_events.add(event);
} catch (e) {
if (kDebugMode) {
print('Failed to parse stream event: $e');
final event = KdfEvent.fromJson(map);
if (kDebugMode) {
final summary = _summarizeEvent(event);
print('[EventStream] Received ${event.typeEnum.value}: $summary');
}
if (!_events.isClosed) {
_events.add(event);
}
} catch (e) {
if (kDebugMode) {
print('Failed to parse stream event: $e');
}
}
}
});
}

/// Generic filter for a specific event type with proper type casting
Expand Down Expand Up @@ -124,6 +135,7 @@ class KdfEventStreamingService {

/// Cleanup
Future<void> dispose() async {
_disposed = true;
_unsubscribe?.call();
await _events.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,32 @@ class ActivationManager {
continue;
}

// Register activation attempt
// Register activation attempt; if one is already in progress, join it
final primaryCompleter = await _registerActivation(group.primary.id);
if (primaryCompleter == null) {
debugPrint(
'Activation already in progress for ${group.primary.id.name}',
'Activation already in progress for ${group.primary.id.name} (joining)',
);
// Join the existing activation and emit a synthetic completion when done
final existing = _activationCompleters[group.primary.id];
if (existing != null) {
yield ActivationProgress(
status: 'Joining existing activation for ${group.primary.id.name}...',
progressDetails: const ActivationProgressDetails(
currentStep: ActivationStep.initialization,
stepCount: 1,
),
);
try {
await existing.future;
yield ActivationProgress.alreadyActiveSuccess(
assetName: group.primary.id.name,
childCount: group.children?.length ?? 0,
);
} catch (e) {
yield ActivationProgress.error(message: e.toString());
}
}
continue;
}

Expand Down Expand Up @@ -176,10 +196,9 @@ class ActivationManager {
/// Register new activation attempt
Future<Completer<void>?> _registerActivation(AssetId assetId) async {
return _protectedOperation(() async {
// Return the existing completer if activation is already in progress
// This ensures subsequent callers properly wait for the activation to complete
// If activation is already in progress for this asset, signal caller to join
if (_activationCompleters.containsKey(assetId)) {
return _activationCompleters[assetId];
return null;
}

final completer = Completer<void>();
Expand Down
38 changes: 32 additions & 6 deletions packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import 'dart:async' show StreamSubscription;
import 'dart:async' show StreamSubscription, StreamController, scheduleMicrotask;

import 'package:flutter/foundation.dart' show ValueGetter;
import 'package:komodo_coins/komodo_coins.dart';
Expand Down Expand Up @@ -48,8 +48,10 @@ class AssetManager implements IAssetProvider {
ValueGetter<ActivationManager> activationManager,
this._coins,
ValueGetter<ActivatedAssetsCache> activatedAssetsCache,
[ValueGetter<SharedActivationCoordinator>? activationCoordinator],
) : _activationManager = activationManager,
_activatedAssetsCache = activatedAssetsCache {
_activatedAssetsCache = activatedAssetsCache,
_activationCoordinator = activationCoordinator {
_authSubscription = _auth.authStateChanges.listen(_handleAuthStateChange);
}
final KomodoDefiLocalAuth _auth;
Expand All @@ -61,6 +63,9 @@ class AssetManager implements IAssetProvider {
/// See [activateAsset] and [activateAssets] for more details.
final ValueGetter<ActivationManager> _activationManager;

/// Lazily resolved shared activation coordinator to ensure global dedup.
final ValueGetter<SharedActivationCoordinator>? _activationCoordinator;

/// Activated assets cache shared across SDK consumers.
final ValueGetter<ActivatedAssetsCache> _activatedAssetsCache;
StreamSubscription<KdfUser?>? _authSubscription;
Expand Down Expand Up @@ -198,8 +203,13 @@ class AssetManager implements IAssetProvider {
/// activation logic internally and seamlessly.
///
/// Returns a stream of [ActivationProgress] updates.
Stream<ActivationProgress> activateAsset(Asset asset) =>
_activationManager().activateAsset(asset);
Stream<ActivationProgress> activateAsset(Asset asset) {
final coordinator = _activationCoordinator?.call();
if (coordinator != null) {
return coordinator.activateAssetStream(asset);
}
return _activationManager().activateAsset(asset);
}

/// Activates multiple assets at once.
///
Expand All @@ -211,8 +221,24 @@ class AssetManager implements IAssetProvider {
/// activation logic internally and seamlessly.
///
/// Returns a stream of [ActivationProgress] updates.
Stream<ActivationProgress> activateAssets(List<Asset> assets) =>
_activationManager().activateAssets(assets);
Stream<ActivationProgress> activateAssets(List<Asset> assets) {
final coordinator = _activationCoordinator?.call();
if (coordinator == null) {
return _activationManager().activateAssets(assets);
}

final controller = StreamController<ActivationProgress>.broadcast();

scheduleMicrotask(() {
for (final asset in assets) {
coordinator
.activateAssetStream(asset)
.listen(controller.add, onError: controller.addError);
}
});

return controller.stream;
Comment on lines +224 to +240
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Close multi-asset activation stream after forwarding child streams

The new coordinator-backed activateAssets path builds a broadcast controller but never closes it when the underlying per‑asset streams finish. Consumers waiting on the returned stream to complete (e.g. await manager.activateAssets([...]).last or using await for expecting the loop to end) will now hang indefinitely and keep subscriptions alive, whereas the previous ActivationManager.activateAssets stream finished once all activations completed. Consider closing the controller once all child streams emit a terminal event.

Useful? React with 👍 / 👎.

}

/// Disposes of the asset manager, cleaning up resources.
///
Expand Down
10 changes: 10 additions & 0 deletions packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,16 @@ class BalanceManager implements IBalanceManager {
}

if (!activateIfNeeded) {
// If activation isn't allowed but one is in progress, join it to avoid races
if (_activationCoordinator!.isActivationInProgress(asset.id)) {
try {
final result = await _activationCoordinator!.activateAsset(asset);
return result.isSuccess;
} catch (e) {
_logger.fine('Failed while awaiting in-progress activation: $e');
return false;
}
}
return _activationCoordinator!.isAssetActive(asset.id);
}

Expand Down
1 change: 1 addition & 0 deletions packages/komodo_defi_sdk/lib/src/bootstrap.dart
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ Future<void> bootstrap({
() => container<ActivationManager>(),
container<KomodoAssetsUpdateManager>(),
() => container<ActivatedAssetsCache>(),
() => container<SharedActivationCoordinator>(),
);
await assetManager.init();
// Will be removed in near future after KW is fully migrated to KDF
Expand Down
Loading