Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flexible sync subscribe/unsubscribe API #1354

Merged
merged 50 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
9bc4fe0
Implement results subscribe/unsubscribe API
desistefanova Jul 13, 2023
beb4b23
Add a simple test for subscriptions
desistefanova Jul 14, 2023
49b01d7
Update changelog and fix build errors
desistefanova Jul 14, 2023
499b6a3
Use subscribe for local realm throws
desistefanova Jul 14, 2023
1388af0
More tests added
desistefanova Jul 24, 2023
1043eb4
Introduced _SubscribedRealmResult
desistefanova Jul 25, 2023
4099766
Fix wait for download
desistefanova Jul 25, 2023
ff8d2a2
Implement unsubscribeAll
desistefanova Jul 25, 2023
6880da4
Implement unsubscribeAll
desistefanova Jul 25, 2023
877df5b
Fix CHANGELOG
desistefanova Jul 25, 2023
1fc1edc
Skip one the slowest test
desistefanova Jul 25, 2023
b94650f
Enable all tests
desistefanova Jul 26, 2023
2270992
Small fixes
desistefanova Jul 26, 2023
15233de
Fix unsubscribeAll
desistefanova Jul 26, 2023
0815dd7
api doc fix
desistefanova Jul 26, 2023
37f034d
Fix unsubscribe
desistefanova Jul 26, 2023
5f4fd76
Merge branch 'main' into ds/subscription_api
desistefanova Jul 26, 2023
30e02b0
Remove UnsubscribeAll
desistefanova Jul 27, 2023
4130b00
Fix unsubscribe method
desistefanova Jul 27, 2023
32727ec
Update lib/src/subscription.dart
desistefanova Jul 31, 2023
1816575
code review changes
desistefanova Jul 31, 2023
6a36909
Update API doc
desistefanova Jul 31, 2023
a3525a1
Merge branch 'main' into ds/subscription_api
desistefanova Aug 15, 2023
0efe4e4
Added cancellation token to subscripe API
desistefanova Aug 15, 2023
e500989
CancellationToken added to waitForDownload/waitForUpload and waitForS…
desistefanova Aug 15, 2023
12b6891
fix subscribe method
desistefanova Aug 16, 2023
1e87963
clear unnamed
desistefanova Aug 16, 2023
632ed33
update changelog
desistefanova Aug 16, 2023
647791f
update changelog about the cancellationToken
desistefanova Aug 16, 2023
d8cbc97
small fixes
desistefanova Aug 16, 2023
16723f6
remove import
desistefanova Aug 16, 2023
5e66404
code review changes
desistefanova Aug 16, 2023
faf64d9
Merge branch 'main' into ds/subscription_api
desistefanova Aug 17, 2023
321f347
Merge branch 'main' into ds/subscription_api
desistefanova Aug 17, 2023
9c1ba93
Merge branch 'main' into ds/subscription_api
desistefanova Aug 17, 2023
e66a13c
Merge branch 'main' into ds/subscription_api
desistefanova Aug 29, 2023
4e81a7b
Merge branch 'main' into ds/subscription_api
desistefanova Sep 1, 2023
3a00b8b
Merge branch 'main' into ds/subscription_api
desistefanova Sep 1, 2023
98aa892
Merge branch 'main' into ds/subscription_api
nirinchev Nov 9, 2023
620429f
Try to fix tests
nirinchev Nov 14, 2023
59c3f78
Add extra app exception logs
nirinchev Nov 14, 2023
694a6d9
Add print statements for test run start
nirinchev Nov 14, 2023
9da1d1e
Log more info in http logs
nirinchev Nov 14, 2023
f08b999
Merge branch 'main' into ds/subscription_api
nirinchev Nov 14, 2023
c8f202c
Merge branch 'main' into ds/subscription_api
nirinchev Nov 14, 2023
3aaedc6
Use meaningful timeouts on CI
nirinchev Nov 14, 2023
06b9c2b
Don't report to slack always
nirinchev Nov 14, 2023
3e1de5d
Change the concurrency group
nirinchev Nov 14, 2023
5144062
Revert the concurrency group change
nirinchev Nov 14, 2023
f067e02
Final touches
nirinchev Nov 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
* Support ReamSet.freeze() ([#1342](https://github.com/realm/realm-dart/pull/1342))
* Added support for query on `RealmSet`. ([#1346](https://github.com/realm/realm-dart/pull/1346))
* Support for passing `List`, `Set` or `Iterable` arguments to queries with `IN`-operators. ([#1346](https://github.com/realm/realm-dart/pull/1346))

* Added new flexible sync API `RealmResults.subscribe()` and `RealmResults.unsubscribe()` as an easy way to create subscriptions and download data in background. Added named parameter to `MutableSubscriptionSet.clear({bool unnamedOnly = false})` for removing all the unnamed subscriptions. ([#1354](https://github.com/realm/realm-dart/pull/1354))
* Added `cancellationToken` parameter to `Session.waitForDownload()`, `Session.waitForUpload()` and `SubscriptionSet.waitForSynchronization()`. ([#1354](https://github.com/realm/realm-dart/pull/1354))

### Fixed
* Fixed an early unlock race condition during client reset callbacks. ([#1335](https://github.com/realm/realm-dart/pull/1335))
Expand Down
66 changes: 38 additions & 28 deletions lib/src/native/realm_core.dart
Original file line number Diff line number Diff line change
Expand Up @@ -413,17 +413,22 @@ class _RealmCore {
}

static void _stateChangeCallback(Object userdata, int state) {
final completer = userdata as Completer<SubscriptionSetState>;

final completer = userdata as CancellableCompleter<SubscriptionSetState>;
if (completer.isCompleted || completer.isCancelled) {
return;
}
completer.complete(SubscriptionSetState.values[state]);
desistefanova marked this conversation as resolved.
Show resolved Hide resolved
}

Future<SubscriptionSetState> waitForSubscriptionSetStateChange(SubscriptionSet subscriptions, SubscriptionSetState notifyWhen) {
final completer = Completer<SubscriptionSetState>();
final callback = Pointer.fromFunction<Void Function(Handle, Int32)>(_stateChangeCallback);
final userdata = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer);
_realmLib.realm_sync_on_subscription_set_state_change_async(subscriptions.handle._pointer, notifyWhen.index,
_realmLib.addresses.realm_dart_sync_on_subscription_state_changed_callback, userdata.cast(), _realmLib.addresses.realm_dart_userdata_async_free);
Future<SubscriptionSetState> waitForSubscriptionSetStateChange(SubscriptionSet subscriptions, SubscriptionSetState notifyWhen,
[CancellationToken? cancellationToken]) {
final completer = CancellableCompleter<SubscriptionSetState>(cancellationToken);
if (!completer.isCancelled) {
final callback = Pointer.fromFunction<Void Function(Handle, Int32)>(_stateChangeCallback);
final userdata = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer);
_realmLib.realm_sync_on_subscription_set_state_change_async(subscriptions.handle._pointer, notifyWhen.index,
_realmLib.addresses.realm_dart_sync_on_subscription_state_changed_callback, userdata.cast(), _realmLib.addresses.realm_dart_userdata_async_free);
}
return completer.future;
}

Expand Down Expand Up @@ -666,23 +671,26 @@ class _RealmCore {

Future<RealmHandle> openRealmAsync(RealmAsyncOpenTaskHandle handle, CancellationToken? cancellationToken) {
final completer = CancellableCompleter<RealmHandle>(cancellationToken);
final callback =
Pointer.fromFunction<Void Function(Handle, Pointer<realm_thread_safe_reference> realm, Pointer<realm_async_error_t> error)>(_openRealmAsyncCallback);
final userData = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer);
_realmLib.realm_async_open_task_start(
handle._pointer,
_realmLib.addresses.realm_dart_async_open_task_callback,
userData.cast(),
_realmLib.addresses.realm_dart_userdata_async_free,
);

if (!completer.isCancelled) {
desistefanova marked this conversation as resolved.
Show resolved Hide resolved
final callback =
Pointer.fromFunction<Void Function(Handle, Pointer<realm_thread_safe_reference> realm, Pointer<realm_async_error_t> error)>(_openRealmAsyncCallback);
final userData = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer);
_realmLib.realm_async_open_task_start(
handle._pointer,
_realmLib.addresses.realm_dart_async_open_task_callback,
userData.cast(),
_realmLib.addresses.realm_dart_userdata_async_free,
);
}
return completer.future;
}

static void _openRealmAsyncCallback(Object userData, Pointer<realm_thread_safe_reference> realmSafePtr, Pointer<realm_async_error_t> error) {
return using((Arena arena) {
final completer = userData as Completer<RealmHandle>;

final completer = userData as CancellableCompleter<RealmHandle>;
if (completer.isCompleted || completer.isCancelled) {
return;
}
desistefanova marked this conversation as resolved.
Show resolved Hide resolved
if (error != nullptr) {
final err = arena<realm_error>();
_realmLib.realm_get_async_error(error, err);
Expand Down Expand Up @@ -2308,12 +2316,14 @@ class _RealmCore {
controller.onConnectionStateChange(ConnectionState.values[oldState], ConnectionState.values[newState]);
}

Future<void> sessionWaitForUpload(Session session) {
final completer = Completer<void>();
final callback = Pointer.fromFunction<Void Function(Handle, Pointer<realm_sync_error_code_t>)>(_sessionWaitCompletionCallback);
final userdata = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer);
_realmLib.realm_sync_session_wait_for_upload_completion(session.handle._pointer, _realmLib.addresses.realm_dart_sync_wait_for_completion_callback,
userdata.cast(), _realmLib.addresses.realm_dart_userdata_async_free);
Future<void> sessionWaitForUpload(Session session, [CancellationToken? cancellationToken]) {
final completer = CancellableCompleter<void>(cancellationToken);
if (!completer.isCancelled) {
final callback = Pointer.fromFunction<Void Function(Handle, Pointer<realm_sync_error_code_t>)>(_sessionWaitCompletionCallback);
final userdata = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer);
_realmLib.realm_sync_session_wait_for_upload_completion(session.handle._pointer, _realmLib.addresses.realm_dart_sync_wait_for_completion_callback,
userdata.cast(), _realmLib.addresses.realm_dart_userdata_async_free);
}
return completer.future;
}

Expand All @@ -2329,8 +2339,8 @@ class _RealmCore {
}

static void _sessionWaitCompletionCallback(Object userdata, Pointer<realm_sync_error_code_t> errorCode) {
final completer = userdata as Completer<void>;
if (completer.isCompleted) {
final completer = userdata as CancellableCompleter<void>;
if (completer.isCompleted || completer.isCancelled) {
return;
}
desistefanova marked this conversation as resolved.
Show resolved Hide resolved
if (errorCode != nullptr) {
Expand Down
4 changes: 2 additions & 2 deletions lib/src/realm_class.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import 'session.dart';
import 'subscription.dart';
import 'set.dart';

export 'package:cancellation_token/cancellation_token.dart' show CancellationToken, CancelledException;
export 'package:cancellation_token/cancellation_token.dart' show CancellationToken, TimeoutCancellationToken, CancelledException;
Copy link
Contributor

@nielsenko nielsenko Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still now fan of exporting classes from other packages.. but I know you didn't start this 😄

export 'package:realm_common/realm_common.dart'
show
Backlink,
Expand Down Expand Up @@ -106,7 +106,7 @@ export 'realm_object.dart'
RealmObjectChanges,
UserCallbackException;
export 'realm_property.dart';
export 'results.dart' show RealmResultsOfObject, RealmResultsChanges, RealmResults;
export 'results.dart' show RealmResultsOfObject, RealmResultsChanges, RealmResults, WaitForSyncMode, RealmResultsOfRealmObject;
export 'session.dart'
show
ConnectionStateChange,
Expand Down
112 changes: 112 additions & 0 deletions lib/src/results.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import 'dart:async';
import 'dart:ffi';

import 'package:cancellation_token/cancellation_token.dart';

import 'collections.dart';
import 'native/realm_core.dart';
import 'realm_class.dart';
Expand Down Expand Up @@ -135,6 +137,99 @@ extension RealmResultsOfObject<T extends RealmObjectBase> on RealmResults<T> {
}
}

class _SubscribedRealmResult<T extends RealmObject> extends RealmResults<T> {
final String? subscriptionName;

_SubscribedRealmResult._(RealmResults<RealmObject> results, {this.subscriptionName})
: super._(
results.handle,
results.realm,
results.metadata,
);
}

extension RealmResultsOfRealmObject<T extends RealmObject> on RealmResults<T> {
nielsenko marked this conversation as resolved.
Show resolved Hide resolved
/// Adds this [RealmResults] query to the set of active subscriptions.
/// The query will be joined via an OR statement with any existing queries for the same type.
///
/// If a [name] is given this allows you to later refer to the subscription by name,
/// e.g. when calling [MutableSubscriptionSet.removeByName].
///
/// If [update] is specified to `true`, then any existing query
/// with the same name will be replaced.
/// Otherwise a [RealmException] is thrown, in case of duplicates.
///
/// [WaitForSyncMode] specifies how to wait or not wait for subscribed objects to be downloaded.
/// The default value is [WaitForSyncMode.firstTime].
///
/// The [cancellationToken] is optional and can be used to cancel
/// the waiting for objects to be downloaded.
/// If the operation is cancelled, a [CancelledException] is thrown and the download
/// continues in the background.
/// In case of using [TimeoutCancellationToken] and the time limit is exceeded,
/// a [TimeoutException] is thrown and the download continues in the background.
///
/// {@category Sync}
Future<RealmResults<T>> subscribe({
String? name,
WaitForSyncMode waitForSyncMode = WaitForSyncMode.firstTime,
CancellationToken? cancellationToken,
bool update = false,
}) async {
Subscription? existingSubscription = name == null ? realm.subscriptions.find(this) : realm.subscriptions.findByName(name);
late Subscription updatedSubscription;
realm.subscriptions.update((mutableSubscriptions) {
updatedSubscription = mutableSubscriptions.add(this, name: name, update: update);
});
bool shouldWait = waitForSyncMode == WaitForSyncMode.always ||
(waitForSyncMode == WaitForSyncMode.firstTime && subscriptionIsChanged(existingSubscription, updatedSubscription));

return await CancellableFuture.from<RealmResults<T>>(() async {
if (cancellationToken != null && cancellationToken.isCancelled) {
throw cancellationToken.exception!;
}
if (shouldWait) {
await realm.subscriptions.waitForSynchronization(cancellationToken);
await realm.syncSession.waitForDownload(cancellationToken);
}
return _SubscribedRealmResult._(this, subscriptionName: name);
}, cancellationToken);
}

/// Unsubscribe from this query result. It returns immediately
/// without waiting for synchronization.
///
/// If the subscription is unnamed, the subscription matching
/// the query will be removed.
/// Return `false` if the [RealmResults] is not created by [subscribe].
///
/// {@category Sync}
bool unsubscribe() {
bool unsubscribed = false;
if (realm.config is! FlexibleSyncConfiguration) {
throw RealmError('unsubscribe is only allowed on Realms opened with a FlexibleSyncConfiguration');
}
if (this is _SubscribedRealmResult<T>) {
final subscriptionName = (this as _SubscribedRealmResult<T>).subscriptionName;
realm.subscriptions.update((mutableSubscriptions) {
if (subscriptionName != null) {
unsubscribed = mutableSubscriptions.removeByName(subscriptionName);
} else {
unsubscribed = mutableSubscriptions.removeByQuery(this);
}
});
}
return unsubscribed;
}

bool subscriptionIsChanged(Subscription? existingSubscription, Subscription updatedSubscription) {
bool changed = existingSubscription == null ||
existingSubscription.objectClassName != updatedSubscription.objectClassName ||
existingSubscription.queryString != updatedSubscription.queryString;
return changed;
}
}

/// @nodoc
//RealmResults package internal members
extension RealmResultsInternal on RealmResults {
Expand Down Expand Up @@ -231,3 +326,20 @@ class _RealmResultsIterator<T extends Object?> implements Iterator<T> {
return true;
}
}

///
/// Behavior when waiting for subscribed objects to be synchronized/downloaded.
///
enum WaitForSyncMode {
/// Waits until the objects have been downloaded from the server
/// the first time the subscription is created. If the subscription
/// already exists, the [RealmResults] is returned immediately.
firstTime,

/// Always waits until the objects have been downloaded from the server.
always,

/// Never waits for the download to complete, but keeps downloading the
/// objects in the background.
never,
}
4 changes: 3 additions & 1 deletion lib/src/session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ class Session implements Finalizable {
void resume() => realmCore.sessionResume(this);

/// Waits for the [Session] to finish all pending uploads.
Future<void> waitForUpload() => realmCore.sessionWaitForUpload(this);
/// An optional [cancellationToken] can be used to cancel the wait operation.
Future<void> waitForUpload([CancellationToken? cancellationToken]) => realmCore.sessionWaitForUpload(this, cancellationToken);

/// Waits for the [Session] to finish all pending downloads.
/// An optional [cancellationToken] can be used to cancel the wait operation.
Future<void> waitForDownload([CancellationToken? cancellationToken]) => realmCore.sessionWaitForDownload(this, cancellationToken);

/// Gets a [Stream] of [SyncProgress] that can be used to track upload or download progress.
Expand Down
29 changes: 20 additions & 9 deletions lib/src/subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ abstract class SubscriptionSet with IterableMixin<Subscription> implements Final
return result == null ? null : Subscription._(result);
}

Future<SubscriptionSetState> _waitForStateChange(SubscriptionSetState state) async {
final result = await realmCore.waitForSubscriptionSetStateChange(this, state);
Future<SubscriptionSetState> _waitForStateChange(SubscriptionSetState state, [CancellationToken? cancellationToken]) async {
final result = await realmCore.waitForSubscriptionSetStateChange(this, state, cancellationToken);
realmCore.refreshSubscriptionSet(this);
return result;
}
Expand All @@ -159,8 +159,9 @@ abstract class SubscriptionSet with IterableMixin<Subscription> implements Final
/// the returned [Future] will complete immediately. If the state is
/// [SubscriptionSetState.error], the returned future will throw an
/// error.
Future<void> waitForSynchronization() async {
final result = await _waitForStateChange(SubscriptionSetState.complete);
/// An optional [cancellationToken] can be used to cancel the wait operation.
Future<void> waitForSynchronization([CancellationToken? cancellationToken]) async {
final result = await _waitForStateChange(SubscriptionSetState.complete, cancellationToken);
if (result == SubscriptionSetState.error) {
throw error!;
}
Expand Down Expand Up @@ -272,21 +273,22 @@ class MutableSubscriptionSet extends SubscriptionSet {
return Subscription._(realmCore.insertOrAssignSubscription(this, query, name, update));
}

/// Remove the [subscription] from the set, if it exists.
/// Removes the [subscription] from the set, if it exists.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change the tense? If we do, there are doc comments on other methods like clear that should change as well..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the descriptions in this file are like this, Removes, Gets, Adds ... I will change the rest of the methods in the file.

bool remove(Subscription subscription) {
return realmCore.eraseSubscriptionById(this, subscription);
}

/// Remove the [query] from the set, if it exists.
/// Removes the [query] from the set, if it exists.
bool removeByQuery<T extends RealmObject>(RealmResults<T> query) {
return realmCore.eraseSubscriptionByResults(this, query);
}

/// Remove the [query] from the set that matches by [name], if it exists.
/// Remove the subscription from the set that matches by [name], if it exists.
bool removeByName(String name) {
return realmCore.eraseSubscriptionByName(this, name);
}

/// Removes the subscriptions from the set that matches by type, if it exists.
bool removeByType<T extends RealmObject>() {
final name = realm.schema.singleWhere((e) => e.type == T).name;
var result = false;
Expand All @@ -301,8 +303,17 @@ class MutableSubscriptionSet extends SubscriptionSet {
}

/// Clear the subscription set.
void clear() {
realmCore.clearSubscriptionSet(this);
void clear({bool unnamedOnly = false}) {
if (unnamedOnly) {
for (var i = length - 1; i >= 0; i--) {
final subscription = this[i];
if (subscription.name == null) {
remove(subscription);
}
}
} else {
realmCore.clearSubscriptionSet(this);
}
}
}

Expand Down
7 changes: 3 additions & 4 deletions test/realm_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import 'package:test/test.dart' hide test, throws;
import 'package:timezone/timezone.dart' as tz;
import 'package:timezone/data/latest.dart' as tz;
import 'package:path/path.dart' as p;
import 'package:cancellation_token/cancellation_token.dart';
import '../lib/realm.dart';
import 'test.dart';
import '../lib/src/native/realm_core.dart';
Expand Down Expand Up @@ -1017,7 +1016,7 @@ Future<void> main([List<String>? args]) async {
Future<void>.delayed(Duration(milliseconds: 10), () => transaction.commit());

final transaction1 = await realm.beginWriteAsync();

await transaction1.commitAsync();

expect(transaction.isOpen, false);
Expand Down Expand Up @@ -1978,8 +1977,8 @@ Future<void> _addDataToAtlas(Realm realm, String productNamePrefix, {int itemsCo
await realm.syncSession.waitForDownload();
}

Future<void> _addSubscriptions(Realm realm, String searchByPreffix) async {
final query = realm.query<Product>(r'name BEGINSWITH $0', [searchByPreffix]);
Future<void> _addSubscriptions(Realm realm, String searchByPrefix) async {
final query = realm.query<Product>(r'name BEGINSWITH $0', [searchByPrefix]);
if (realm.subscriptions.find(query) == null) {
realm.subscriptions.update((mutableSubscriptions) => mutableSubscriptions.add(query));
}
Expand Down
13 changes: 13 additions & 0 deletions test/session_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,19 @@ Future<void> main([List<String>? args]) async {
await realm.syncSession.waitForDownload();
});

baasTest('SyncSession.waitForDownload/waitForUpload canceled', (configuration) async {
final realm = await getIntegrationRealm();
final cancellationDownloadToken = CancellationToken();
final waitForDownloadFuture = realm.syncSession.waitForDownload(cancellationDownloadToken);
cancellationDownloadToken.cancel();
expect(() async => await waitForDownloadFuture, throwsA(isA<CancelledException>()));

final cancellationUploadToken = CancellationToken();
final waitForUploadFuture = realm.syncSession.waitForUpload(cancellationUploadToken);
cancellationUploadToken.cancel();
expect(() async => await waitForUploadFuture, throwsA(isA<CancelledException>()));
});

baasTest('SyncSesison.waitForUpload with changes', (configuration) async {
final differentiator = ObjectId();

Expand Down
Loading
Loading