Skip to content

Commit

Permalink
Support async SubscriptionSet.waitForStateChange
Browse files Browse the repository at this point in the history
This requires support for out-of-isolate callbacks. To do this a dart
specific EventLoopDispatcher and DispatchFreeUserdata is introduced.
  • Loading branch information
nielsenko committed Apr 29, 2022
1 parent 9835ede commit 483a3b0
Show file tree
Hide file tree
Showing 14 changed files with 353 additions and 29 deletions.
2 changes: 2 additions & 0 deletions ffigen/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ headers:
- 'realm_dart.h'
- 'realm_dart_scheduler.h'
- 'realm_android_platform.h'
- 'subscription_set.h'
include-directives: #generate only for these headers
- 'realm.h'
- 'realm_dart.h'
- 'realm_dart_scheduler.h'
- 'realm_android_platform.h'
- 'subscription_set.h'
compiler-opts:
- '-DRLM_NO_ANON_UNIONS'
- '-DFFI_GEN'
Expand Down
1 change: 1 addition & 0 deletions ffigen/subscription_set.h
2 changes: 1 addition & 1 deletion lib/src/app.dart
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class App {
/// Removes the user's local credentials and attempts to invalidate their refresh token from the server.
///
/// If [user] is null logs out [currentUser] if it exists.
Future<void> logout([User? user]) async {
Future<void> logOut([User? user]) async {
user ??= currentUser;
if (user == null) {
return;
Expand Down
86 changes: 72 additions & 14 deletions lib/src/native/realm_bindings.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7028,7 +7028,7 @@ class RealmLibrary {
ffi.Pointer<realm_flx_sync_subscription_set_t> Function(
ffi.Pointer<realm_t>)>();

/// Wait uptill subscripton set state is equal to the state passed as parameter.
/// Wait until subscripton set state is equal to the state passed as parameter.
/// This is a blocking operation.
/// @return the current subscription state
int realm_sync_on_subscription_set_state_change_wait(
Expand All @@ -7053,27 +7053,39 @@ class RealmLibrary {
/// This is an asynchronous operation.
/// @return true/false if the handler was registered correctly
bool realm_sync_on_subscription_set_state_change_async(
ffi.Pointer<realm_flx_sync_subscription_set_t> arg0,
int arg1,
realm_sync_on_subscription_state_changed arg2,
ffi.Pointer<realm_flx_sync_subscription_set_t> subscription_set,
int notify_when,
realm_sync_on_subscription_state_changed callback,
ffi.Pointer<ffi.Void> userdata,
realm_free_userdata_func_t userdata_free,
) {
return _realm_sync_on_subscription_set_state_change_async(
arg0,
arg1,
arg2,
subscription_set,
notify_when,
callback,
userdata,
userdata_free,
) !=
0;
}

late final _realm_sync_on_subscription_set_state_change_asyncPtr = _lookup<
ffi.NativeFunction<
ffi.Uint8 Function(ffi.Pointer<realm_flx_sync_subscription_set_t>,
ffi.Int32, realm_sync_on_subscription_state_changed)>>(
ffi.Uint8 Function(
ffi.Pointer<realm_flx_sync_subscription_set_t>,
ffi.Int32,
realm_sync_on_subscription_state_changed,
ffi.Pointer<ffi.Void>,
realm_free_userdata_func_t)>>(
'realm_sync_on_subscription_set_state_change_async');
late final _realm_sync_on_subscription_set_state_change_async =
_realm_sync_on_subscription_set_state_change_asyncPtr.asFunction<
int Function(ffi.Pointer<realm_flx_sync_subscription_set_t>, int,
realm_sync_on_subscription_state_changed)>();
int Function(
ffi.Pointer<realm_flx_sync_subscription_set_t>,
int,
realm_sync_on_subscription_state_changed,
ffi.Pointer<ffi.Void>,
realm_free_userdata_func_t)>();

/// Retrieve version for the subscription set passed as parameter
/// @return subscription set version if the poiter to the subscription is valid
Expand Down Expand Up @@ -8002,6 +8014,54 @@ class RealmLibrary {
'realm_dart_get_files_path');
late final _realm_dart_get_files_path = _realm_dart_get_files_pathPtr
.asFunction<ffi.Pointer<ffi.Int8> Function()>();

/// Register a handler in order to be notified when subscription set is equal to the one passed as parameter
/// This is an asynchronous operation.
///
/// @return true/false if the handler was registered correctly
///
/// This is dart specific version of realm_dart_on_subscription_set_state_change_async.
/// Unlike the original method, this one uses event_loop_dispatcher to ensure the callback
/// is handled on the correct isolate thread.
bool realm_dart_sync_on_subscription_set_state_change_async(
ffi.Pointer<realm_flx_sync_subscription_set_t> subscription_set,
int notify_when,
realm_sync_on_subscription_state_changed callback,
ffi.Pointer<ffi.Void> userdata,
realm_free_userdata_func_t userdata_free,
ffi.Pointer<realm_scheduler_t> scheduler,
) {
return _realm_dart_sync_on_subscription_set_state_change_async(
subscription_set,
notify_when,
callback,
userdata,
userdata_free,
scheduler,
) !=
0;
}

late final _realm_dart_sync_on_subscription_set_state_change_asyncPtr =
_lookup<
ffi.NativeFunction<
ffi.Uint8 Function(
ffi.Pointer<realm_flx_sync_subscription_set_t>,
ffi.Int32,
realm_sync_on_subscription_state_changed,
ffi.Pointer<ffi.Void>,
realm_free_userdata_func_t,
ffi.Pointer<realm_scheduler_t>)>>(
'realm_dart_sync_on_subscription_set_state_change_async');
late final _realm_dart_sync_on_subscription_set_state_change_async =
_realm_dart_sync_on_subscription_set_state_change_asyncPtr.asFunction<
int Function(
ffi.Pointer<realm_flx_sync_subscription_set_t>,
int,
realm_sync_on_subscription_state_changed,
ffi.Pointer<ffi.Void>,
realm_free_userdata_func_t,
ffi.Pointer<realm_scheduler_t>)>();
}

class shared_realm extends ffi.Opaque {}
Expand Down Expand Up @@ -8924,9 +8984,7 @@ typedef realm_sync_ssl_verify_func_t = ffi.Pointer<
ffi.Int32)>>;
typedef realm_flx_sync_subscription_set_t = realm_flx_sync_subscription_set;
typedef realm_sync_on_subscription_state_changed = ffi.Pointer<
ffi.NativeFunction<
ffi.Void Function(
ffi.Pointer<realm_flx_sync_subscription_set_t>, ffi.Int32)>>;
ffi.NativeFunction<ffi.Void Function(ffi.Pointer<ffi.Void>, ffi.Int32)>>;
typedef realm_flx_sync_subscription_t = realm_flx_sync_subscription;
typedef realm_flx_sync_mutable_subscription_set_t
= realm_flx_sync_mutable_subscription_set;
Expand Down
29 changes: 26 additions & 3 deletions lib/src/native/realm_core.dart
Original file line number Diff line number Diff line change
Expand Up @@ -229,18 +229,39 @@ class _RealmCore {
)));
}

static void _stateChangeCallback(Pointer<realm_flx_sync_subscription_set> subscriptionSetPtr, int state) {
static void _stateChangeCallback(Pointer<Void> userdata, int state) {
final completer = userdata.toObject<Completer<int>>(isPersistent: true);
if (completer == null) {
return;
}

// TODO: What about errors?!

completer.complete(state);
}

void waitForSubscriptionSetStateChangeSync(SubscriptionSet subscriptions, SubscriptionSetState state) {
_realmLib.realm_sync_on_subscription_set_state_change_wait(subscriptions.handle._pointer, state.index);
}

Future<int> waitForSubscriptionSetStateChange(SubscriptionSet subscriptions, SubscriptionSetState notifyWhen) {
final completer = Completer<int>();
_realmLib.realm_dart_sync_on_subscription_set_state_change_async(
subscriptions.handle._pointer,
notifyWhen.index,
Pointer.fromFunction(_stateChangeCallback),
completer.toPersistentHandle(),
_deletePersistentHandleFuncPtr,
subscriptions.realm.scheduler.handle._pointer,
);
return completer.future;
}

MutableSubscriptionSetHandle makeSubscriptionSetMutable(SubscriptionSet subscriptions) {
return MutableSubscriptionSetHandle._(_realmLib.invokeGetPointer(() => _realmLib.realm_sync_make_subscription_set_mutable(subscriptions.handle._pointer)));
}

SubscriptionSetHandle subscriptionSetCommit(MutableSubscriptionSet subscriptions){
SubscriptionSetHandle subscriptionSetCommit(MutableSubscriptionSet subscriptions) {
return SubscriptionSetHandle._(_realmLib.invokeGetPointer(() => _realmLib.realm_sync_subscription_set_commit(subscriptions.mutableHandle._pointer)));
}

Expand Down Expand Up @@ -948,8 +969,10 @@ class _RealmCore {
});
}

final doNotDie = Set<dynamic>();
AppHandle getApp(AppConfiguration configuration) {
final httpTransportHandle = _createHttpTransport(configuration.httpClient);
doNotDie.add(httpTransportHandle);
final appConfigHandle = _createAppConfig(configuration, httpTransportHandle);
final syncClientConfigHandle = _createSyncClientConfig(configuration);
final realmAppPtr = _realmLib.invokeGetPointer(() => _realmLib.realm_app_get(appConfigHandle._pointer, syncClientConfigHandle._pointer));
Expand Down Expand Up @@ -1119,7 +1142,7 @@ class _RealmCore {
}

static void _logOutCallback(Pointer<Void> userdata, Pointer<realm_app_error> error) {
final Completer<void>? completer = userdata.toObject();
final Completer<void>? completer = userdata.toObject(isPersistent: true);
if (completer == null) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/src/realm_class.dart
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ class Realm {
SubscriptionSet? _subscriptions;
SubscriptionSet? get subscriptions {
if (config is FlexibleSyncConfiguration) {
_subscriptions ??= SubscriptionSetInternal.create(realmCore.getSubscriptions(this));
_subscriptions ??= SubscriptionSetInternal.create(this, realmCore.getSubscriptions(this));
// TODO: Refresh _subscriptions, if needed.
}
return _subscriptions;
Expand Down
12 changes: 7 additions & 5 deletions lib/src/subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ enum SubscriptionSetState {
}

abstract class SubscriptionSet with IterableMixin<Subscription> {
Realm _realm;
SubscriptionSetHandle _handle;

SubscriptionSet._(this._handle);
SubscriptionSet._(this._realm, this._handle);

Subscription? find<T extends RealmObject>(RealmResults<T> query) {
return Subscription._(realmCore.findSubscriptionByQuery(this, query));
Expand All @@ -63,8 +64,8 @@ abstract class SubscriptionSet with IterableMixin<Subscription> {
return Subscription._(realmCore.findSubscriptionByName(this, name));
}

void waitForStateChange(SubscriptionSetState state) {
realmCore.waitForSubscriptionSetStateChangeSync(this, state);
Future<SubscriptionSetState> waitForStateChange(SubscriptionSetState state) async {
return SubscriptionSetState.values[await realmCore.waitForSubscriptionSetStateChange(this, state)];
}

@override
Expand All @@ -82,15 +83,16 @@ abstract class SubscriptionSet with IterableMixin<Subscription> {
}

extension SubscriptionSetInternal on SubscriptionSet {
Realm get realm => _realm;
SubscriptionSetHandle get handle => _handle;

static SubscriptionSet create(SubscriptionSetHandle handle) => MutableSubscriptionSet._(handle);
static SubscriptionSet create(Realm realm, SubscriptionSetHandle handle) => MutableSubscriptionSet._(realm, handle);
}

class MutableSubscriptionSet extends SubscriptionSet {
MutableSubscriptionSetHandle? _mutableHandle;

MutableSubscriptionSet._(SubscriptionSetHandle handle) : super._(handle);
MutableSubscriptionSet._(Realm realm, SubscriptionSetHandle handle) : super._(realm, handle);

@override
void update(void Function(MutableSubscriptionSet mutableSubscriptions) action) {
Expand Down
6 changes: 6 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ set(SOURCES
realm_dart.cpp
realm_dart_scheduler.cpp
dart-include/dart_api_dl.c
subscription_set.cpp
)

set(HEADERS
realm_dart.h
realm_dart_scheduler.h
realm-core/src/realm.h
subscription_set.h
)

set (INCLUDE_DIRS
dart-include
dart-include/internal
${CMAKE_BINARY_DIR}/src/realm-core/src
)

if(NOT CMAKE_SYSTEM_NAME STREQUAL iOS)
Expand Down Expand Up @@ -75,6 +78,9 @@ if(ANDROID)
endif()
endif()

if(REALM_ENABLE_SYNC)
target_compile_definitions(realm_dart PUBLIC REALM_ENABLE_SYNC=1)
endif()

string(APPEND OUTPUT_DIR "${PROJECT_SOURCE_DIR}/binary")
if(CMAKE_SYSTEM_NAME STREQUAL "Windows")
Expand Down
Loading

0 comments on commit 483a3b0

Please sign in to comment.