Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ class KdfEventStreamingService {
SseConnectionState _connectionState = SseConnectionState.disconnected;

Stream<KdfEvent> get events => _events.stream;

bool _disposed = false;
bool _initialized = false;

/// Future that completes when the first byte is received from the SSE stream.
/// This indicates the server's event loop is fully flowing and the client is registered.
Future<void> get firstByteReceived => _firstByteCompleter.future;
Expand All @@ -50,6 +52,8 @@ class KdfEventStreamingService {
/// but should not be called at app startup. SSE connection should be tied to authentication state.
@Deprecated('Use connectIfNeeded() instead')
void initialize() {
if (_initialized) return;
_initialized = true;
connectIfNeeded();
}

Expand Down Expand Up @@ -109,8 +113,12 @@ class KdfEventStreamingService {
}

void _onIncomingData(Object? data) {
try {
if (data == null) return;
if (_disposed) return;
// Break synchronous call stacks to avoid re-entrancy into disposed closures
scheduleMicrotask(() {
if (_disposed) return;
try {
if (data == null) return;
JsonMap? map;

if (data is String) {
Expand All @@ -137,17 +145,20 @@ class KdfEventStreamingService {
} else {
throw ArgumentError('Unsupported event data type: ${data.runtimeType}');
}
final event = KdfEvent.fromJson(map);
if (kDebugMode) {
final summary = _summarizeEvent(event);
print('[EventStream] Received ${event.typeEnum.value}: $summary');
}
_events.add(event);
} catch (e) {
if (kDebugMode) {
print('Failed to parse stream event: $e');
final event = KdfEvent.fromJson(map);
if (kDebugMode) {
final summary = _summarizeEvent(event);
print('[EventStream] Received ${event.typeEnum.value}: $summary');
}
if (!_events.isClosed) {
_events.add(event);
}
} catch (e) {
if (kDebugMode) {
print('Failed to parse stream event: $e');
}
}
}
});
}

/// Generic filter for a specific event type with proper type casting
Expand Down Expand Up @@ -198,6 +209,7 @@ class KdfEventStreamingService {

/// Cleanup
Future<void> dispose() async {
_disposed = true;
_unsubscribe?.call();
await _events.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,32 @@ class ActivationManager {
continue;
}

// Register activation attempt
// Register activation attempt; if one is already in progress, join it
final primaryCompleter = await _registerActivation(group.primary.id);
if (primaryCompleter == null) {
debugPrint(
'Activation already in progress for ${group.primary.id.name}',
'Activation already in progress for ${group.primary.id.name} (joining)',
);
// Join the existing activation and emit a synthetic completion when done
final existing = _activationCompleters[group.primary.id];
if (existing != null) {
yield ActivationProgress(
status: 'Joining existing activation for ${group.primary.id.name}...',
progressDetails: const ActivationProgressDetails(
currentStep: ActivationStep.initialization,
stepCount: 1,
),
);
try {
await existing.future;
yield ActivationProgress.alreadyActiveSuccess(
assetName: group.primary.id.name,
childCount: group.children?.length ?? 0,
);
} catch (e) {
yield ActivationProgress.error(message: e.toString());
}
}
continue;
}

Expand Down Expand Up @@ -176,10 +196,9 @@ class ActivationManager {
/// Register new activation attempt
Future<Completer<void>?> _registerActivation(AssetId assetId) async {
return _protectedOperation(() async {
// Return the existing completer if activation is already in progress
// This ensures subsequent callers properly wait for the activation to complete
// If activation is already in progress for this asset, signal caller to join
if (_activationCompleters.containsKey(assetId)) {
return _activationCompleters[assetId];
return null;
}

final completer = Completer<void>();
Expand Down
38 changes: 32 additions & 6 deletions packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import 'dart:async' show StreamSubscription;
import 'dart:async' show StreamSubscription, StreamController, scheduleMicrotask;

import 'package:flutter/foundation.dart' show ValueGetter;
import 'package:komodo_coins/komodo_coins.dart';
Expand Down Expand Up @@ -48,8 +48,10 @@ class AssetManager implements IAssetProvider {
ValueGetter<ActivationManager> activationManager,
this._coins,
ValueGetter<ActivatedAssetsCache> activatedAssetsCache,
[ValueGetter<SharedActivationCoordinator>? activationCoordinator],
) : _activationManager = activationManager,
_activatedAssetsCache = activatedAssetsCache {
_activatedAssetsCache = activatedAssetsCache,
_activationCoordinator = activationCoordinator {
_authSubscription = _auth.authStateChanges.listen(_handleAuthStateChange);
}
final KomodoDefiLocalAuth _auth;
Expand All @@ -61,6 +63,9 @@ class AssetManager implements IAssetProvider {
/// See [activateAsset] and [activateAssets] for more details.
final ValueGetter<ActivationManager> _activationManager;

/// Lazily resolved shared activation coordinator to ensure global dedup.
final ValueGetter<SharedActivationCoordinator>? _activationCoordinator;

/// Activated assets cache shared across SDK consumers.
final ValueGetter<ActivatedAssetsCache> _activatedAssetsCache;
StreamSubscription<KdfUser?>? _authSubscription;
Expand Down Expand Up @@ -198,8 +203,13 @@ class AssetManager implements IAssetProvider {
/// activation logic internally and seamlessly.
///
/// Returns a stream of [ActivationProgress] updates.
Stream<ActivationProgress> activateAsset(Asset asset) =>
_activationManager().activateAsset(asset);
Stream<ActivationProgress> activateAsset(Asset asset) {
final coordinator = _activationCoordinator?.call();
if (coordinator != null) {
return coordinator.activateAssetStream(asset);
}
return _activationManager().activateAsset(asset);
}

/// Activates multiple assets at once.
///
Expand All @@ -211,8 +221,24 @@ class AssetManager implements IAssetProvider {
/// activation logic internally and seamlessly.
///
/// Returns a stream of [ActivationProgress] updates.
Stream<ActivationProgress> activateAssets(List<Asset> assets) =>
_activationManager().activateAssets(assets);
Stream<ActivationProgress> activateAssets(List<Asset> assets) {
final coordinator = _activationCoordinator?.call();
if (coordinator == null) {
return _activationManager().activateAssets(assets);
}

final controller = StreamController<ActivationProgress>.broadcast();

scheduleMicrotask(() {
for (final asset in assets) {
coordinator
.activateAssetStream(asset)
.listen(controller.add, onError: controller.addError);
Comment on lines +232 to +236
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Close activation stream controller so callers can complete

When SharedActivationCoordinator is available, activateAssets now creates a broadcast controller and forwards each asset’s activation stream, but the controller is never closed or completed once the forwarded streams finish. As a result the returned stream never emits done, so any caller awaiting completion (e.g. await activateAssets([...]).last) will hang indefinitely and the controller/listeners stay live even after all activations end. The old implementation from ActivationManager.activateAssets completed normally, so this is a regression.

Useful? React with 👍 / 👎.

}
});

return controller.stream;
}

/// Disposes of the asset manager, cleaning up resources.
///
Expand Down
10 changes: 10 additions & 0 deletions packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,16 @@ class BalanceManager implements IBalanceManager {
}

if (!activateIfNeeded) {
// If activation isn't allowed but one is in progress, join it to avoid races
if (_activationCoordinator!.isActivationInProgress(asset.id)) {
try {
final result = await _activationCoordinator!.activateAsset(asset);
return result.isSuccess;
} catch (e) {
_logger.fine('Failed while awaiting in-progress activation: $e');
return false;
}
}
return _activationCoordinator!.isAssetActive(asset.id);
}

Expand Down
1 change: 1 addition & 0 deletions packages/komodo_defi_sdk/lib/src/bootstrap.dart
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ Future<void> bootstrap({
() => container<ActivationManager>(),
container<KomodoAssetsUpdateManager>(),
() => container<ActivatedAssetsCache>(),
() => container<SharedActivationCoordinator>(),
);
await assetManager.init();
// Will be removed in near future after KW is fully migrated to KDF
Expand Down